Skip to content

Azure Service Bus

Azure Service Bus is a fully managed enterprise message broker with message queues and publish-subscribe topics. Eventuous supports Azure Service Bus as both a producer and a subscription target using the Eventuous.Azure.ServiceBus package.

The Azure Service Bus producer publishes messages to a queue or topic.

The producer requires an Azure ServiceBusClient instance registered in the DI container:

builder.Services.AddSingleton(new ServiceBusClient(connectionString));

Then register the producer with its options:

builder.Services.AddProducer<ServiceBusProducer, ServiceBusProducerOptions>(
options => options.QueueOrTopicName = "my-topic"
);

Producer options:

OptionDescriptionDefault
QueueOrTopicNameTarget queue or topic name (required)
SenderOptionsAzure SDK ServiceBusSenderOptions for advanced tuningnull
AttributeNamesCustomizable message attribute name mappingdefaults

When producing messages, you can supply per-message options using ServiceBusProduceOptions:

OptionDescription
SubjectMessage subject
ToForward-to address
ReplyToReply-to address
SessionIdSession id for session-enabled queues/topics
ReplyToSessionIdReply-to session id
TimeToLiveMessage time-to-live, default is TimeSpan.MaxValue

Message metadata is mapped to Azure Service Bus application properties. The attribute names used for standard properties (message type, stream name, correlation id, etc.) can be customized via ServiceBusMessageAttributeNames.

Eventuous supports consuming messages from Azure Service Bus queues and topics.

Register a subscription using AddSubscription:

builder.Services.AddSubscription<ServiceBusSubscription, ServiceBusSubscriptionOptions>(
"PaymentsIntegration",
b => b
.Configure(cfg => cfg.QueueOrTopic = new Queue("payments-queue"))
.AddEventHandler<PaymentsHandler>()
);

The QueueOrTopic property determines the source of messages. Three options are available:

TypeDescription
Queue(name)Subscribe to a queue
Topic(name)Subscribe to a topic using the subscription id as the subscription name
TopicAndSubscription(topic, subscription)Subscribe to a topic with an explicit subscription name

Examples:

// Queue subscription
cfg.QueueOrTopic = new Queue("my-queue");
// Topic subscription (uses subscription id as the subscription name)
cfg.QueueOrTopic = new Topic("my-topic");
// Topic subscription with explicit subscription name
cfg.QueueOrTopic = new TopicAndSubscription("my-topic", "my-subscription");
OptionDescription
QueueOrTopicQueue or topic to subscribe to (required)
ProcessorOptionsAzure SDK ServiceBusProcessorOptions for standard processing
SessionProcessorOptionsEnable session-based processing (see below)
AttributeNamesMessage attribute name mapping
ErrorHandlerCustom error handling delegate

Azure Service Bus supports sessions for ordered message processing within a session group. To enable session-based processing, configure SessionProcessorOptions:

builder.Services.AddSubscription<ServiceBusSubscription, ServiceBusSubscriptionOptions>(
"OrderProcessing",
b => b
.Configure(cfg => {
cfg.QueueOrTopic = new Queue("orders-session-queue");
cfg.SessionProcessorOptions = new ServiceBusSessionProcessorOptions {
MaxConcurrentSessions = 8
};
})
.AddEventHandler<OrderHandler>()
);

When SessionProcessorOptions is set, the subscription uses ServiceBusSessionProcessor instead of the standard ServiceBusProcessor. This ensures messages with the same SessionId are processed in order.

To set the session id when producing messages, use ServiceBusProduceOptions.SessionId.

By default, errors are logged. You can override this by providing a custom ErrorHandler delegate on the subscription options:

cfg.ErrorHandler = args => {
logger.LogError(args.Exception, "Service Bus error");
return Task.CompletedTask;
};