Getting started with Azure Service Bus Event Hubs: building a real-time log stream

More posts on Event Hubs:
IoT with Azure Service Bus Event Hubs: authenticating and sending from any type of device (.NET and JS samples)

At WPC 2014 Microsoft announced the preview of Azure Event Hubs, a part of Azure Service Bus that enables the collection of event streams at high throughput, which makes it a great fit for IoT, data processing, … scenarios. In this article I’ll start with a simple example to give you a first hands-on experience with Event Hubs.

We’ll be building a simple TraceListener that publishes logs to an Event Hub, where they can be processed by one or more worker processes. I’m not fond of the “logging” examples, but in this case this could really be useful. Think of all those legacy .NET desktop applications deployed on hundreds or even thousands of machines. Some of those applications are probably deployed on laptops, POS devices, … that don’t have a direct link with the main office, so logging to SQL Server is probably not an option. Wouldn’t it be useful if all these applications could send their logs (performance, error, …) to a single ‘hub’ instead of local files or the event log, even if they’re not in the main office? And what if we could then process all those logs for reporting of bugs, performance, …? These are the scenarios that Event Hubs make possible.

Creating our Event Hub

So the first thing we’ll do is create a new Event Hub in a Service Bus namespace.

On the next page we’ll need to define the following settings:

  • Partition Count: the unit of scale (each partition can handle an ingress of 1 MB/s and an egress of 2 MB/s). For each partition you’ll also need a dedicated worker process (your code running in a Thread, a Task or a separate process), and a partition should always be processed by a single worker process.
  • Message Retention: how long messages are retained in the hub. When you process a message from the hub, the message is not removed; it simply stays in the hub until it expires. That’s why, when reading from a hub, you need to specify where to start: an offset, or a start time for example.
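The per-partition figures above turn choosing a partition count into simple arithmetic: you need enough partitions to cover both the ingress and the egress you expect. The following is a minimal sketch, assuming the 1 MB/s ingress and 2 MB/s egress limits quoted above (preview figures; check the current service limits before relying on them). The class and method names are hypothetical.

```csharp
using System;

// Hypothetical helper: estimates a partition count from the per-partition
// limits quoted in the text (assumed: 1 MB/s ingress, 2 MB/s egress).
public static class PartitionSizing
{
    public const double IngressMBPerSecondPerPartition = 1.0;
    public const double EgressMBPerSecondPerPartition = 2.0;

    // Smallest partition count that covers both the expected ingress and
    // egress rates (in MB/s). Never returns less than 1.
    public static int RequiredPartitions(double ingressMBPerSecond, double egressMBPerSecond)
    {
        if (ingressMBPerSecond < 0 || egressMBPerSecond < 0)
            throw new ArgumentException("Rates must be non-negative.");

        int forIngress = (int)Math.Ceiling(ingressMBPerSecond / IngressMBPerSecondPerPartition);
        int forEgress = (int)Math.Ceiling(egressMBPerSecond / EgressMBPerSecondPerPartition);
        return Math.Max(1, Math.Max(forIngress, forEgress));
    }
}
```

For example, `RequiredPartitions(5, 6)` yields 5 (ingress is the bottleneck), and since every partition needs its own worker, that also tells you how many workers to plan for.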

Finally, once the Event Hub is created, you need to define 2 Shared Access Policies in the Configure tab. In this scenario we control the senders, so it’s OK to simply use the policy directly (with a shared secret), but for IoT scenarios you’ll do things a little differently: you will need to generate a signature based on the Shared Access Policy and store that signature on the device (something I’ll be covering in a future blog post).
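The signature mentioned above is a shared access signature (SAS) token. As a rough idea of what “generating a signature based on the policy” involves, here is a minimal sketch assuming the Service Bus SAS token format: an HMAC-SHA256, keyed with the policy key, over the URL-encoded resource URI, a newline and an expiry in seconds since the Unix epoch. The class name is hypothetical, and `WebUtility.UrlEncode` is an assumption chosen for portability; the policy key itself should never leave your backend.

```csharp
using System;
using System.Globalization;
using System.Net;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper: builds a SAS token string for a resource URI
// (for example an Event Hub) from a Shared Access Policy name and key.
public static class SasTokenSketch
{
    public static string CreateToken(string resourceUri, string keyName, string key, DateTime expiresUtc)
    {
        // Expiry is expressed in whole seconds since 1970-01-01 UTC.
        var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        long expiry = (long)(expiresUtc - epoch).TotalSeconds;

        // String to sign: URL-encoded resource URI + "\n" + expiry.
        string encodedUri = WebUtility.UrlEncode(resourceUri);
        string stringToSign = encodedUri + "\n" + expiry.ToString(CultureInfo.InvariantCulture);

        // The policy key is used as UTF-8 bytes for the HMAC.
        using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key)))
        {
            string signature = Convert.ToBase64String(
                hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));

            return string.Format(CultureInfo.InvariantCulture,
                "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
                encodedUri, WebUtility.UrlEncode(signature), expiry, keyName);
        }
    }
}
```

A device would receive only the resulting token (for a hypothetical hub URI such as `https://contoso.servicebus.windows.net/logs` and a hypothetical `SendRule` policy), so a compromised device exposes a time-limited token instead of the policy key.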

Sending Logs

The next thing we’ll do is create a new ASP.NET MVC 5 application with a custom TraceListener. Please don’t use this in production (it isn’t asynchronous, doesn’t batch messages, has no fault resilience, …).

Here’s what’s happening:

  1. In the connection settings we specify AMQP as the transport type (required by Event Hubs).
  2. We create a TokenProvider with the information from our Shared Access Policy.
  3. We create a new MessagingFactory and an EventHubClient which uses our “Logs” Event Hub.
  4. Each time a message is written we create a LogMessageEvent, serialize it, add it to an EventData object, set its partition key and send it. The partition key is what makes this system scale, and it determines which worker process receives the message: since we created the Event Hub with 8 partitions, a hash of the PartitionKey maps each message to 1 of the 8 partitions. Because we use the instance ID as the partition key, all messages from a given instance end up in the same partition.

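    using System;
    using System.Configuration;
    using System.Diagnostics;
    using System.Globalization;
    using System.Text;
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;
    using Newtonsoft.Json;

    // The payload sent to the Event Hub (serialized as JSON).
    public class LogMessageEvent
    {
        public string InstanceId { get; set; }
        public string MachineName { get; set; }
        public string SiteName { get; set; }
        public string Value { get; set; }
    }

    public class EventHubsTraceListener : TraceListener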
    {
        private readonly EventHubClient _client;
        private readonly string _siteName;
        private readonly string _instanceId;

        public EventHubsTraceListener()
        {
            var settings = new MessagingFactorySettings()
            {
                TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(
                      ConfigurationManager.AppSettings["ServiceBus.KeyName"], ConfigurationManager.AppSettings["ServiceBus.Key"]),
                TransportType = TransportType.Amqp
            };
            var factory = MessagingFactory.Create(
                 ServiceBusEnvironment.CreateServiceUri("sb",
                 ConfigurationManager.AppSettings["ServiceBus.Namespace"], ""), settings);
            _client = factory.CreateEventHubClient("Logs");

            // Event information.
            _instanceId = Environment.GetEnvironmentVariable("WEBSITE_INSTANCE_ID") ?? DateTime.Now.Ticks.ToString(CultureInfo.InvariantCulture);
            _siteName = Environment.GetEnvironmentVariable("WEBSITE_SITE_NAME") ?? "CorporateWebApp";

        }

        public override void Write(string message)
        {
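            // Use UTF-8 (not Encoding.Default) so senders and receivers agree regardless of each machine's code page.
            var eventData = new EventData(Encoding.UTF8.GetBytes(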
              JsonConvert.SerializeObject(new LogMessageEvent()
              {
                InstanceId = _instanceId,
                MachineName = Environment.MachineName,
                SiteName = _siteName,
                Value = message
              })));

            eventData.PartitionKey = _instanceId;

            _client.Send(eventData);
        }

        public override void WriteLine(string message)
        {
            Write(message);
        }
    }
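To see why a partition key pins a sender to one partition, the idea can be illustrated with any stable hash reduced modulo the partition count. This is only an illustration: it uses 32-bit FNV-1a, which is an assumption for the example and NOT the hash Event Hubs uses internally (that is a service detail you don’t control). The class and method names are hypothetical.

```csharp
using System;
using System.Text;

// Illustration only: maps a partition key to one of N partitions with a stable
// hash. Event Hubs uses its own internal hash; this just shows the principle.
public static class PartitionKeyIllustration
{
    // 32-bit FNV-1a over the UTF-8 bytes of the key. Unlike string.GetHashCode(),
    // the result is the same in every process and on every machine.
    public static uint Fnv1a(string key)
    {
        uint hash = 2166136261u;
        foreach (byte b in Encoding.UTF8.GetBytes(key))
        {
            hash ^= b;
            hash = unchecked(hash * 16777619u);
        }
        return hash;
    }

    // Same key => same hash => same partition, every time.
    public static int PartitionFor(string partitionKey, int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentException("partitionCount must be positive.");
        return (int)(Fnv1a(partitionKey) % (uint)partitionCount);
    }
}
```

With 8 partitions, every message sent with a given `_instanceId` lands on the same partition, which is why one worker per partition sees the complete log stream of the instances that hash to it.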

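In the web.config I also added the required appSettings (ServiceBus.Namespace, ServiceBus.KeyName and ServiceBus.Key) and registered the TraceListener. Note that ServiceBus.Namespace is just the name of the namespace, not its full URI: the code builds the URI itself with ServiceBusEnvironment.CreateServiceUri.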

And finally I modified my HomeController to log something each time someone visits the site:

    using System;
    using System.Diagnostics;
    using System.Web.Mvc;

    public class HomeController : Controller
    {
        public ActionResult Index()
        {
            Trace.WriteLine(String.Format("Request on /Home/Index by {0}", Request.UserAgent));
            return View();
        }

        public ActionResult About()
        {
            Trace.WriteLine(String.Format("Request on /Home/About by {0}", Request.UserAgent));
            return View();
        }

        public ActionResult Contact()
        {
            Trace.WriteLine(String.Format("Request on /Home/Contact by {0}", Request.UserAgent));
            return View();
        }
    }

Processing the log stream

Our site (possibly running on many instances) is now sending all its log messages to our Event Hub. The following console application simulates 8 worker processes, each processing a single partition. Again, this is just a simple demo that doesn’t take retry policies, fault resilience, … into account.

    using System;
    using System.Configuration;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;
    using Newtonsoft.Json;

    internal class Program
    {
        // The Logs Event Hub was created with 8 partitions ("0" through "7").
        private const int PartitionCount = 8;

        private static void Main(string[] args)
        {
            var cts = new CancellationTokenSource();
            var workers = new Task[PartitionCount];

            for (int i = 0; i < PartitionCount; i++)
            {
                // One long-running worker per partition, since Receive() blocks.
                workers[i] = Task.Factory.StartNew(state =>
                {
                    Console.WriteLine("Starting worker to process partition: {0}", state);

                    var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", ConfigurationManager.AppSettings["ServiceBus.Namespace"], ""), new MessagingFactorySettings()
                    {
                        TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(ConfigurationManager.AppSettings["ServiceBus.KeyName"], ConfigurationManager.AppSettings["ServiceBus.Key"]),
                        TransportType = TransportType.Amqp
                    });

                    var client = factory.CreateEventHubClient("Logs");
                    var grp = client.GetDefaultConsumerGroup();

                    Console.WriteLine("Group: {0}", grp.GroupName);

                    // Only read events that arrive from now on (older events are still retained in the hub).
                    var receiver = grp.CreateReceiver(state.ToString(), DateTime.UtcNow);

                    while (!cts.IsCancellationRequested)
                    {
                        // Receive could fail, I would need a retry policy etc...
                        var messages = receiver.Receive(10);
                        foreach (var message in messages)
                        {
                            var logMessage = JsonConvert.DeserializeObject<LogMessageEvent>(Encoding.UTF8.GetString(message.GetBytes()));

                            Console.WriteLine("{0} [{1}] {2}/{3}: {4}", DateTime.Now, state, logMessage.MachineName, logMessage.SiteName, logMessage.Value);
                            Console.WriteLine(" > Instance/PartitionKey: {0}", message.PartitionKey);
                        }
                    }

                    receiver.Close();
                    factory.Close();
                }, i, TaskCreationOptions.LongRunning);
            }

            Console.ReadLine();

            // Ask every worker to stop, then wait until they have closed their receivers.
            cts.Cancel();
            Task.WaitAll(workers);
        }
    }

Just like in the TraceListener, I’m connecting to the Event Hub over AMQP and creating an EventHubClient. Using the default consumer group I create a receiver, listen for messages on it and process them in batches of up to 10 messages (this happens once for every partition). Since I created the Event Hub with 8 partitions, I create a receiver for “0”, “1”, “2”, “3”, “4”, “5”, “6” and “7” (partition IDs start at 0 and end at PartitionCount - 1).
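The receive loop notes that `Receive` could fail and would need a retry policy. One common approach is retrying with exponential backoff. The following is a minimal sketch of such a helper; the class, method and parameter names are hypothetical and not part of the Service Bus SDK, and deciding which exceptions count as transient is left to the caller.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of the Service Bus SDK): runs an operation and
// retries it with exponential backoff when the caller classifies the error as transient.
public static class RetryHelper
{
    public static T Execute<T>(Func<T> operation, int maxAttempts, TimeSpan initialDelay,
        Func<Exception, bool> isTransient)
    {
        if (maxAttempts < 1)
            throw new ArgumentException("maxAttempts must be at least 1.");

        TimeSpan delay = initialDelay;
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return operation();
            }
            catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
            {
                // Wait before the next attempt, doubling the delay each time.
                // On the last attempt (or a non-transient error) the exception propagates.
                Thread.Sleep(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}
```

In the worker above this could wrap the receive call, for example `RetryHelper.Execute(() => receiver.Receive(10), 5, TimeSpan.FromSeconds(1), ex => ex is MessagingException)`, assuming you treat every `MessagingException` as worth retrying; a production policy would be more selective.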

The complete code is available here: https://github.com/sandrinodimattia/EventHubsDemo

We’re not done yet!

This is only the tip of the iceberg and I’ll be blogging more about this soon to cover IoT scenarios (with device registration), reliable processing of event streams, how you can use Event Hubs with my RedDog library, …

And in the meantime, here’s the documentation:

Enjoy!

About Sandrino Di Mattia

Sandrino Di Mattia is a Windows Azure Consultant at RealDolmen and a Windows Azure Insider. He lives and breathes Windows Azure.

  • http://weblogs.asp.net/sfeldman/ Sean Feldman

    This is a good demo, thank you.
    Just wanted to mention that if your config file provided values close to real ones, it would be easier for beginners to understand what to enter. I failed on the Service Bus namespace: I was entering the full URI instead… silly :)

  • Orr Ganani

    Hi,
    I wanted to know how to define different consumer groups for the Event Hub.
    I tried using anything other than $Default, but then the events aren’t received.
    I also tried simply getting a different consumer group from the client, but that didn’t help either.

    Thanks!

    • winterTTr

      I have the same question: what do the different consumer groups mean?
      How do they control the reading of events from the Event Hub?
      Are consumer groups related to partitions?