Azure Event Hubs


Last Updated: 3/26/2022

Overview

  • Azure Event Hubs is a big data streaming platform and event ingestion service (it takes in event data).
  • It can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider such as Azure Stream Analytics or batching/storage adapters.
  • Azure Event Hubs represents the front door for an event pipeline, often called an event ingestor in solution architectures.
  • An event ingestor is a component or service that sits between event publishers and event consumers to decouple the production of an event stream from the consumption of those events.
  • Event Hubs is a fully managed Platform-as-a-Service (PaaS) with little configuration or management overhead.
  • Event Hubs is a time-retention durable buffer for telemetry ingress, similar to a distributed log.
  • Scaling options, like Auto-inflate, scale the number of throughput units to meet your usage needs.
  • Event Hubs uses a partitioned consumer model, enabling multiple applications to process the stream concurrently and letting you control the speed of processing.
  • Capture your data in near-real time in Azure Blob storage or Azure Data Lake Storage for long-term retention or micro-batch processing.
  • Event Hubs for Apache Kafka ecosystems enables Apache Kafka (1.0 and later) clients and applications to talk to Event Hubs. You don't need to set up, configure, or manage your own Kafka and ZooKeeper clusters, or use a Kafka-as-a-Service offering that isn't native to Azure.
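For example, an existing Kafka producer can publish to an event hub just by changing its connection configuration. Below is a minimal sketch assuming the Confluent.Kafka package for .NET; the namespace, event hub name, and connection string are placeholders.

using Confluent.Kafka;

// Event Hubs exposes a Kafka-compatible endpoint on port 9093 that accepts
// SASL PLAIN authentication: the username is the literal "$ConnectionString"
// and the password is the namespace connection string.
var config = new ProducerConfig
{
    BootstrapServers = "<EVENT HUBS NAMESPACE>.servicebus.windows.net:9093",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "$ConnectionString",
    SaslPassword = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"
};

using var producer = new ProducerBuilder<Null, string>(config).Build();

// The event hub name is used as the Kafka topic.
await producer.ProduceAsync("<EVENT HUB NAME>", new Message<Null, string> { Value = "Event from a Kafka client" });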

Applications

  • Application logging
  • Live dashboards
  • Transaction processing

Key architecture components

Event Hubs contains the following key components:

Event producers

Any entity that sends data to an event hub. Event publishers can publish events using HTTPS, AMQP 1.0, or the Apache Kafka protocol (1.0 and above).

Partitions

  • Event Hubs organizes sequences of events sent to an event hub into one or more partitions. As newer events arrive, they're added to the end of this sequence.
  • A partition can be thought of as a "commit log". Partitions hold event data that contains the body of the event, a user-defined property bag describing the event, and metadata such as its offset in the partition, its number in the stream sequence, and the service-side timestamp at which it was accepted.
  • Partitioning allows multiple parallel logs to be used for the same event hub and supports parallel processing.
  • You can use a partition key to map incoming event data into specific partitions for the purpose of data organization (see the sketch after this list).
  • A per-device or per-user unique identity, or a geography, makes a good partition key.
  • Each consumer only reads a specific subset, or partition, of the message stream.
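
A minimal sketch of supplying a partition key with the Azure.Messaging.EventHubs producer used later in the demo; the connection string, event hub name, and the "device-42" key are placeholders.

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System.Text;

var producer = new EventHubProducerClient("<EVENT HUBS NAMESPACE - CONNECTION STRING>", "<EVENT HUB NAME>");

// All events sent with the same partition key land in the same partition,
// so their relative order is preserved for that key.
var options = new SendEventOptions { PartitionKey = "device-42" };
await producer.SendAsync(new[] { new EventData(Encoding.UTF8.GetBytes("telemetry reading")) }, options);

await producer.DisposeAsync();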

Consumer groups

A consumer group is a view (state, position, or offset) of an entire event hub. Consumer groups enable consuming applications to each have a separate view of the event stream. They read the stream independently at their own pace and with their own offsets.
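
For instance, a second application could read the same event hub through its own consumer group without affecting other readers. A minimal sketch, assuming a consumer group named "dashboard" has already been created on the event hub (standard tier or above is required for consumer groups beyond $Default):

using Azure.Messaging.EventHubs.Consumer;

// Each consumer group maintains its own view (offsets) of the stream.
await using var consumer = new EventHubConsumerClient(
    "dashboard",
    "<EVENT HUBS NAMESPACE - CONNECTION STRING>",
    "<EVENT HUB NAME>");

using var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));

try
{
    await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        Console.WriteLine($"dashboard view received: {partitionEvent.Data.EventBody}");
    }
}
catch (TaskCanceledException)
{
    // Reading stops when the 30-second window expires.
}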

Throughput Units

Throughput units (standard tier), processing units (premium tier), or capacity units (dedicated tier) are pre-purchased units of capacity that control the throughput capacity of Event Hubs.

Checkpointing

  • Checkpointing is a process by which readers mark or commit their position within a partition event sequence.
  • Checkpointing is the responsibility of the consumer and occurs on a per-partition basis within a consumer group.
  • If an event processor disconnects from a partition, another instance can resume processing the partition at the checkpoint that was previously committed by the last processor of that partition in that consumer group.
  • When the processor connects, it can pass the offset to the event hub to specify the location at which to start reading.
  • You use Azure Blob storage as the checkpoint store.
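
When no checkpoint has been committed yet for a partition, the processor falls back to a default starting position, which can be overridden when the partition is initialized. A minimal sketch using the same EventProcessorClient setup as the receiver demo below; the connection strings and names are placeholders.

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Storage.Blobs;

var storageClient = new BlobContainerClient("<AZURE STORAGE CONNECTION STRING>", "<BLOB CONTAINER NAME>");
var processor = new EventProcessorClient(
    storageClient,
    EventHubConsumerClient.DefaultConsumerGroupName,
    "<EVENT HUBS NAMESPACE - CONNECTION STRING>",
    "<EVENT HUB NAME>");

// Applied only when the checkpoint store has no committed position for the
// partition; otherwise the checkpoint wins.
processor.PartitionInitializingAsync += args =>
{
    args.DefaultStartingPosition = EventPosition.Latest;
    return Task.CompletedTask;
};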

Event receivers

  • Any entity that reads event data from an event hub. All Event Hubs consumers connect via an AMQP 1.0 session.
  • The Event Hubs service delivers events through the session as they become available. Kafka consumers connect via the Kafka protocol (1.0 and later).

Demo - Create Event Hub

Create a resource group

  • Sign in to the Azure portal
  • Left navigation > Resource groups > Create
  • Choose subscription
  • Enter name: demo-azure
  • Region: Central India
  • Review & Create > Create

Create an Event Hubs namespace

  • Left navigation > All Services > Analytics > Event Hubs
  • Click Create or Create event hubs namespace
  • Choose subscription
  • Choose resource group
  • Enter name: demoeventhubnamespace
  • Location: Central India
  • Pricing Tier: Basic
  • Review and Create > Create
  • Go to resource

Create Event Hub

  • Go to the event hub namespace resource
  • Click Create Event Hub
  • Name: demoeventhubname
  • Create
  • Go to Sidebar > Event Hubs

Create Event Sender Application

  • Open Visual Studio 2022
  • Create new project
  • Choose language: C#
  • Choose project types: Console
  • Select Console App
  • Project name: EventHubsSender
  • Solution name: AzureDemoEventHubs
  • Choose Framework: .NET 6.0
  • Create
  • Select Tools > NuGet Package Manager > Package Manager Console from the menu.
  • Install the NuGet package: Install-Package Azure.Messaging.EventHubs
  • Add code
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System.Text;

namespace EventHubsSender;

public class Program
{
    // connection string to the Event Hubs namespace
    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";

    // name of the event hub
    private const string eventHubName = "<EVENT HUB NAME>";

    // number of events to be sent to the event hub
    private const int numOfEvents = 3;

    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    static EventHubProducerClient producerClient;

    static async Task Main()
    {
        // Create a producer client that you can use to send events to an event hub
        producerClient = new EventHubProducerClient(connectionString, eventHubName);

        // Create a batch of events 
        using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

        for (int i = 1; i <= numOfEvents; i++)
        {
            if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"Event {i}"))))
            {
                // if it is too large for the batch
                throw new Exception($"Event {i} is too large for the batch and cannot be sent.");
            }
        }

        try
        {
            // Use the producer client to send the batch of events to the event hub
            await producerClient.SendAsync(eventBatch);
            Console.WriteLine($"A batch of {numOfEvents} events has been published.");
        }
        finally
        {
            await producerClient.DisposeAsync();
        }
    }
} 


  • Go to event hub namespace resource > Sidebar > Settings > Shared access policies > Click RootManageSharedAccessKey
  • Copy the primary connection string and paste it into the connectionString variable
  • Set the eventHubName to demoeventhubname
  • Run the project; the console should print "A batch of 3 events has been published."

View Messages

  • Go to the event hub namespace resource
  • On the Overview page, check the Messages chart to confirm the events were received

Create Azure Storage Account

You use Azure Storage as the checkpoint store.

  • Go to portal > Click Show Portal Menu > Storage Accounts > Create
  • Choose subscription
  • Choose Resource Group
  • Name: demoazurestorageaccount
  • Region: Central India
  • Performance: Standard
  • Redundancy: LRS
  • Review and Create
  • Go to resource

Create Storage Container

  • Sidebar > Data Storage > Containers > Click +
  • Name: data
  • Public access level: Private (no anonymous access) - the default
  • Ok
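
Alternatively, the container can be created from code. A small sketch using the Azure.Storage.Blobs package; the storage connection string is a placeholder and "data" matches the container name used in this demo.

using Azure.Storage.Blobs;

// Creates the "data" container used as the checkpoint store if it does not already exist.
var containerClient = new BlobContainerClient("<AZURE STORAGE CONNECTION STRING>", "data");
await containerClient.CreateIfNotExistsAsync();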

Copy Connection String

  • Go to storage account > Sidebar > Security + Networking > Access Keys > Show Keys
  • Copy the connection string shown under key1 (the receiver app uses it as the storage connection string)

Create EventHubsReceiver project

For most production scenarios, it is recommended that the EventProcessorClient be used for reading and processing events. Since the EventProcessorClient has a dependency on Azure Storage blobs for persistence of its state, you'll need to provide a BlobContainerClient for the processor, which has been configured for the storage account and container that should be used.

  • Right click solution
  • Add new project - Console App
  • Project Name: EventHubsReceiver
  • Framework: .NET 6.0
  • Create
  • Install the NuGet packages: Install-Package Azure.Messaging.EventHubs and Install-Package Azure.Messaging.EventHubs.Processor
  • Add Code
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System.Text;

namespace EventHubsReceiver;

public class Program
{
    private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
    private const string blobContainerName = "<BLOB CONTAINER NAME>";

    static BlobContainerClient storageClient;

    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.        
    static EventProcessorClient processor;

    static async Task Main()
    {
        // Read from the default consumer group: $Default
        string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

        // Create a blob container client that the event processor will use 
        storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);

        // Create an event processor client to process events in the event hub
        processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);

        // Register handlers for processing events and handling errors
        processor.ProcessEventAsync += ProcessEventHandler;
        processor.ProcessErrorAsync += ProcessErrorHandler;

        // Start the processing
        await processor.StartProcessingAsync();

        // Wait for 30 seconds for the events to be processed
        await Task.Delay(TimeSpan.FromSeconds(30));

        // Stop the processing
        await processor.StopProcessingAsync();
    }

    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Write the body of the event to the console window
        Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));

        // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
        await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
    }

    static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }
}

  • Replace the Event Hubs namespace connection string and the storage account connection string
  • Set the event hub name to demoeventhubname and the blob container name to data
  • Run the project; the events sent earlier should be printed to the console

Read events from an Event Hub

The EventHubConsumerClient offers a simpler way to read events directly. It is not recommended for production scenarios, but it can be used for prototyping.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}
