Azure Service Bus
Azure Service Bus is a fully managed enterprise message broker with message queues and publish-subscribe topics. Eventuous supports Azure Service Bus as both a producer and a subscription target using the Eventuous.Azure.ServiceBus package.
Producer
The Azure Service Bus producer publishes messages to a queue or topic.
Configuration
The producer requires an Azure ServiceBusClient instance registered in the DI container:

```csharp
builder.Services.AddSingleton(new ServiceBusClient(connectionString));
```

Then register the producer with its options:

```csharp
builder.Services.AddProducer<ServiceBusProducer, ServiceBusProducerOptions>(
    options => options.QueueOrTopicName = "my-topic"
);
```

Producer options:
| Option | Description | Default |
|---|---|---|
| QueueOrTopicName | Target queue or topic name (required) | (none) |
| SenderOptions | Azure SDK ServiceBusSenderOptions for advanced tuning | null |
| AttributeNames | Customizable message attribute name mapping | defaults |
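
To show how the client registration and the options above might fit together, here is a minimal sketch that authenticates with a token credential instead of a connection string and sets a sender identifier through SenderOptions. The helper name AddServiceBusProducer and the identifier value are hypothetical, DefaultAzureCredential comes from the separate Azure.Identity package, and the sketch assumes SenderOptions can be assigned in the same configuration lambda as QueueOrTopicName.

```csharp
using Azure.Identity;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;
// Also import the Eventuous namespaces that expose AddProducer,
// ServiceBusProducer and ServiceBusProducerOptions for your package version.

public static class ServiceBusProducerRegistration {
    // Hypothetical helper, not part of Eventuous.
    public static IServiceCollection AddServiceBusProducer(
        this IServiceCollection services,
        string fullyQualifiedNamespace, // e.g. "my-namespace.servicebus.windows.net"
        string queueOrTopicName
    ) {
        // One shared client per application, as recommended by the Azure SDK.
        services.AddSingleton(
            new ServiceBusClient(fullyQualifiedNamespace, new DefaultAzureCredential())
        );

        services.AddProducer<ServiceBusProducer, ServiceBusProducerOptions>(options => {
            options.QueueOrTopicName = queueOrTopicName;
            // Assumption: SenderOptions is settable here, like QueueOrTopicName.
            options.SenderOptions = new ServiceBusSenderOptions {
                Identifier = "payments-producer" // illustrative value
            };
        });

        return services;
    }
}
```

With this in place, the registration could be reduced to a single call such as `builder.Services.AddServiceBusProducer("my-namespace.servicebus.windows.net", "my-topic")`.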
Produce options
When producing messages, you can supply per-message options using ServiceBusProduceOptions:
| Option | Description |
|---|---|
| Subject | Message subject |
| To | Forward-to address |
| ReplyTo | Reply-to address |
| SessionId | Session id for session-enabled queues/topics |
| ReplyToSessionId | Reply-to session id |
| TimeToLive | Message time-to-live; defaults to TimeSpan.MaxValue |
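
As a rough illustration of the options in the table, the sketch below builds per-message options for a session-enabled queue. The helper name OptionsForOrder, the subject value, and the one-hour time-to-live are hypothetical. The sketch also assumes that Subject and SessionId are strings, that TimeToLive is a TimeSpan, and that the type supports object-initializer syntax.

```csharp
using System;
// Also import the Eventuous namespace that exposes ServiceBusProduceOptions.

public static class ProduceOptionsExamples {
    // Hypothetical helper: messages for the same order share a session id,
    // so a session-aware consumer receives them in order.
    public static ServiceBusProduceOptions OptionsForOrder(string orderId)
        => new() {
            Subject    = "order-events",         // illustrative subject
            SessionId  = orderId,                // groups messages per order
            TimeToLive = TimeSpan.FromHours(1)   // overrides the TimeSpan.MaxValue default
        };
}
```

The resulting object is then supplied alongside the message when calling the producer. The exact produce call depends on the Eventuous version in use and is not shown here.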
Message metadata is mapped to Azure Service Bus application properties. The attribute names used for standard properties (message type, stream name, correlation id, etc.) can be customized via ServiceBusMessageAttributeNames.
Subscriptions
Eventuous supports consuming messages from Azure Service Bus queues and topics.
Configuration
Register a subscription using AddSubscription:

```csharp
builder.Services.AddSubscription<ServiceBusSubscription, ServiceBusSubscriptionOptions>(
    "PaymentsIntegration",
    b => b
        .Configure(cfg => cfg.QueueOrTopic = new Queue("payments-queue"))
        .AddEventHandler<PaymentsHandler>()
);
```

The QueueOrTopic property determines the source of messages. Three options are available:
| Type | Description |
|---|---|
| Queue(name) | Subscribe to a queue |
| Topic(name) | Subscribe to a topic using the subscription id as the subscription name |
| TopicAndSubscription(topic, subscription) | Subscribe to a topic with an explicit subscription name |
Examples:

```csharp
// Queue subscription
cfg.QueueOrTopic = new Queue("my-queue");

// Topic subscription (uses subscription id as the subscription name)
cfg.QueueOrTopic = new Topic("my-topic");

// Topic subscription with explicit subscription name
cfg.QueueOrTopic = new TopicAndSubscription("my-topic", "my-subscription");
```

Subscription options

| Option | Description |
|---|---|
| QueueOrTopic | Queue or topic to subscribe to (required) |
| ProcessorOptions | Azure SDK ServiceBusProcessorOptions for standard processing |
| SessionProcessorOptions | Enable session-based processing (see below) |
| AttributeNames | Message attribute name mapping |
| ErrorHandler | Custom error handling delegate |
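
As a sketch of how these options might combine, the registration below reads from a topic with an explicit subscription name and tunes the standard processor. The topic and subscription names and the numeric values are placeholders, and `builder` is assumed to be the application builder used in the earlier snippets. MaxConcurrentCalls and PrefetchCount are properties of the Azure SDK's ServiceBusProcessorOptions. The sketch assumes ProcessorOptions can be assigned inside Configure in the same way as QueueOrTopic.

```csharp
using Azure.Messaging.ServiceBus;
// Also import the Eventuous namespaces that expose AddSubscription,
// ServiceBusSubscription and ServiceBusSubscriptionOptions.

builder.Services.AddSubscription<ServiceBusSubscription, ServiceBusSubscriptionOptions>(
    "PaymentsIntegration",
    b => b
        .Configure(cfg => {
            // Explicit subscription name instead of reusing the subscription id.
            cfg.QueueOrTopic = new TopicAndSubscription("payments-topic", "payments-integration");

            // Placeholder values; tune them for your workload.
            cfg.ProcessorOptions = new ServiceBusProcessorOptions {
                MaxConcurrentCalls = 4,  // process up to four messages in parallel
                PrefetchCount      = 20  // buffer messages locally to reduce round trips
            };
        })
        .AddEventHandler<PaymentsHandler>()
);
```

Raising MaxConcurrentCalls increases throughput but gives up any ordering between messages, so session-based processing is the better fit when order matters.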
Session support
Azure Service Bus supports sessions for ordered message processing within a session group. To enable session-based processing, configure SessionProcessorOptions:

```csharp
builder.Services.AddSubscription<ServiceBusSubscription, ServiceBusSubscriptionOptions>(
    "OrderProcessing",
    b => b
        .Configure(cfg => {
            cfg.QueueOrTopic = new Queue("orders-session-queue");
            cfg.SessionProcessorOptions = new ServiceBusSessionProcessorOptions {
                MaxConcurrentSessions = 8
            };
        })
        .AddEventHandler<OrderHandler>()
);
```

When SessionProcessorOptions is set, the subscription uses ServiceBusSessionProcessor instead of the standard ServiceBusProcessor. This ensures messages with the same SessionId are processed in order.
To set the session id when producing messages, use ServiceBusProduceOptions.SessionId.
Error handling
By default, errors are logged. You can override this by providing a custom ErrorHandler delegate on the subscription options:

```csharp
cfg.ErrorHandler = args => {
    logger.LogError(args.Exception, "Service Bus error");
    return Task.CompletedTask;
};
```
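
A slightly richer handler can separate transient broker errors from persistent ones. The sketch below is one possible approach, not the library's built-in behavior. It assumes ErrorHandler accepts a `Func<ProcessErrorEventArgs, Task>`, which the lambda above suggests but this section does not confirm, and the class name ServiceBusErrorHandling is hypothetical. ErrorSource, EntityPath, and ServiceBusException.IsTransient come from the Azure SDK.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;

public static class ServiceBusErrorHandling {
    // Hypothetical factory: returns a delegate that logs transient errors
    // as warnings and everything else as errors.
    public static Func<ProcessErrorEventArgs, Task> Create(ILogger logger)
        => args => {
            // Transient failures (for example timeouts) are retried by the SDK.
            var transient = args.Exception is ServiceBusException { IsTransient: true };
            var level     = transient ? LogLevel.Warning : LogLevel.Error;

            logger.Log(
                level,
                args.Exception,
                "Service Bus error in {ErrorSource} on {EntityPath}",
                args.ErrorSource,
                args.EntityPath
            );

            return Task.CompletedTask;
        };
}
```

Under the assumption stated above, it would be wired up with `cfg.ErrorHandler = ServiceBusErrorHandling.Create(logger);`.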