Azure Bridge
Azure EventHubs Bridge Plugin
Introduction
The Azure EventHubs Bridge plugin can be used to send messages from the Mosquitto broker to one or several Azure EventHubs resources.
Features:
- Topic based message routing, supporting 1:1, 1:n, m:1, and m:n mappings between topics and event hubs (see Topic Mappings)
- Authentication methods including Shared Access Signature (SAS) keys and Role-Based Access Control (RBAC) (see Eventhubs)
- Support partition-based message ordering via partitionKey (when using JSON payloads) or partitionId (see Producers)
- Dynamic functions to provide timestamp-based values for correlation tracking (see Producers)
- Batched or buffered message forwarding (see Producers)
- Configurable retry mechanisms (see Producers)
Note: the plugin cannot configure any referenced Azure EventHubs resources. This has to be done upfront.
Plugin activation
Ensure that the broker has a suitable license for using the Azure EventHubs Bridge plugin.
If that is the case the plugin can be activated by adding the following to the mosquitto.conf
file:
plugin /usr/lib/cedalo_azure_eventhubs.so
persistence_location /mosquitto/data
Note: for a single node broker scenario it is currently required to add a file named Azure-EventHubs-Bridge.json
within the specified persistence_location
folder.
This file may be empty as the configuration is managed via corresponding control api.
Configuration
The configuration of the bridge is achieved by providing a configuration file following the JSON schema below. You can modify the configuration file using the MQTT API for the Azure Eventhub Bridge. This is described in detail here. Another option is to use the user interface provided with the Cedalo Mosquitto Platform, which provides a webpage to modify and update the configuration. This option is described below in the Platform UI section
Configuration File Format
The configuration consist of three main parts:
eventhubs
: define one or several Azure EventHubs resources (Eventhubs)producers
: define resource targets to forward messages to (Producers)topicMappings
: define which messages should be forwarded and where (Topic Mappings)
Eventhubs
Each Azure EventHubs resource is referenced by its name (aka namespace) and is authenticated either via a provided
Shared Access Signature (SAS) key or through Role-Based Access Control (RBAC).
Note that the RBAC authentication uses a fully qualified namespace to reference the Azure resource.
If this property is not provided it defaults to the specified name postfixed with servicebus.windows.net
, i.e. <name>.servicebus.windows.net
.
For SAS this is not required as the key contains the endpoint already.
Following is an example of a resource configuration which uses RBAC for authentication:
{
"name": "EventHubsResource",
"auth": {
"rbac": {
"tenantId": "12345677-6171-47aa-b47b-1234567d89de",
"clientId": "abcdef12-aa90-4190-9cfc-123456789abc",
"clientSecret": "aVerySecretClientSecret",
"fullyQualifiedNamespace": "resource_namespace.servicebus.windows.net"
}
}
}
Producers
The producer settings control how messages are sent to individual event hubs. To authenticate against the
Azure resource the auth
property of the referenced eventhub
is used. However, it is also possible to override these
auth settings for each producer.
The bridge supports two types of producers, batch
or buffered
. The batch producer forwards messages immediately,
whereas a buffered producer collects messages before sending them to optimize throughput. If the type is not specified
a batch producer is created by default.
Following options can be used to define how messages are sent to different hubs:
name
: a unique name for this producer to identify in mappingshub
: the event hub name where messages will be sent totype
: eitherbatch
to create a batch producer orbuffered
to use a buffered oneeventhub
: name of defined EventHubs resourceauth
: optional authentication credentials for this producer. If provided these credentials override the ones from referenced eventhuboptions
: general producer optionsmessageId
: used for tracking the source of an event in logscorrelationId
: used for debugging or tracking events. Supports dynamic placeholders like{NOW}
or{NOW_ISO}
partitionId
: specifies the Azure event hub partition for in-order message processing. Do not usepartitionKey
at the same timepartitionKey
: specifies a dynamic partition key (only for JSON payloads). Supports JSON-path-based extractionfield
: specifies the JSON-path to extract a value for partitioningdefault
: defines a fallback value if the JSON-path cannot be resolved
retryOptions
: configures message retry behavior when an event fails to sendmode
: defines the retry strategy, use one of following values:fix
: uses a fixed retry delayexponential
: increases retry delay exponentially untilmaxRetryDelayInMs
is reached
delayInMs
: specifies the fixed delay in milliseconds before retryingmaxDelayInMs
: defines the maximum delay before a retry in exponential modemaxRetries
specifies the maximum number of retry attempts before giving uptimeoutInMs
: defines the wait time before the request is declared as timed out
batch
: options that only apply to batch producermaxSizeInBytes
defines the maximum batch size for sending messages. If not set, the default value from Azure EventHubs will apply.maxOpenBatches
: defines the maximum number of parallel open batches which currently sent or retry sending at the same time. If not set, the default value is 10
buffer
: options that only apply to buffered producermaxWaitTimeInMs
: milliseconds to wait for next message, if none is received within this period the buffer will send all its messagesmaxEventBufferLengthPerPartition
: defines the maximum number of messages buffered per partition before sending
Following producer example creates a buffered producer and uses the previously defined eventhubs resource but overrides
its fullQualifiedNamespace
property to target a different endpoint.
{
"name": "Producer1",
"type": "buffered",
"hub": "hub1",
"eventhub": "EventHubsResource",
"options": {
"messageId": "Event sent at {NOW_ISO}"
},
"auth": {
"rbac": {
"fullyQualifiedNamespace": "another_resource_namespace.servicebus.windows.net"
}
},
"retryOptions": {
"mode": "fix",
"maxRetries": 100,
"timeoutInMs": 1000
},
"buffer": {
"maxWaitTimeInMs": 1000
}
}
Topic Mappings
Topic mappings define how broker messages are forwarded to EventHubs endpoints.
Wildcards like +
(single level) or #
(multi-level) are supported too.
The bridge allows various types of
mappings between MQTT topics and event hubs:
1:1
: a single topic is mapped to a single event hub1:n
: a single topic is mapped to multiple event hubsm:1
: multiple topics are forwarded to a single event hubm:n
: multiple topics are forwarded to multiple event hubs
Following is an example of a m:n
topic mapping:
[
{
"mqttTopics": ["topic/test1","topic/test2"],
"target": "Producer1"
},
{
"mqttTopics": ["topic/test1", "topic/test2"],
"target": "Producer2"
}
]
Platform UI
JSON Schema
Schema for the Microsoft Azure® Bridge, which describes all configuration options for the bridge.
{
"title": "Azure Bridge Configuration",
"description": "",
"type": "object",
"properties": {
"eventhubs": {
"description": "The Azure EventHubs Bridge can forward messages to multiple EventHubs resources.",
"type": "array",
"items": {
"description": "An Azure EventHubs resource.",
"type": "object",
"properties": {
"name": {
"description": "Unique namespace identifier of an EventHubs",
"type": "string",
"minLength": 1
},
"auth": {
"description": "EventHubs specific authentication. Either specify a Shared Access Signature (SAS) key or use Role Based Access Control (RBAC)",
"type": "object",
"properties": {
"sas": {
"type": "object",
"properties": {
"key": {
"type": "string",
"minLength": 1
}
},
"required": [
"key"
]
},
"rbac": {
"type": "object",
"properties": {
"tenantId": {
"type": "string",
"minLength": 1
},
"clientId": {
"type": "string",
"minLength": 1
},
"clientSecret": {
"type": "string",
"minLength": 1
},
"fullyQualifiedNamespace": {
"description": "Full namespace qualifier. If not specified \"<namespace>.servicebus.windows.net\" will be used",
"type": "string",
"minLength": 1
}
},
"required": []
}
},
"oneOf": [
{
"required": [
"sas"
]
},
{
"required": [
"rbac"
]
}
]
}
},
"required": [
"name",
"auth"
]
}
},
"producers": {
"description": "Per producer settings for each event hub within an EventHubs resource",
"type": "array",
"items": {
"description": "Producer settings for each event hub within an EventHubs resource",
"type": "object",
"properties": {
"name": {
"description": "Unique name for this producer to use and identify it in mappings",
"type": "string",
"minLength": 1
},
"type": {
"description": "Use a batch producer for direct message passing or a buffered producer to collect several messages before sending to eventhub",
"type": "string",
"default": "batch",
"enum": [
"batch",
"buffered"
]
},
"hub": {
"description": "Name of the hub to send messages to",
"type": "string",
"minLength": 1
},
"eventhub": {
"description": "Namespace of the EventHubs to use",
"type": "string",
"minLength": 1
},
"auth": {
"description": "Producer specific authentication. Specify only if differ from EventHubs auth",
"type": "object",
"properties": {
"sas": {
"type": "object",
"properties": {
"key": {
"type": "string",
"minLength": 1
}
},
"required": [
"key"
]
},
"rbac": {
"type": "object",
"properties": {
"tenantId": {
"type": "string",
"minLength": 1
},
"clientId": {
"type": "string",
"minLength": 1
},
"clientSecret": {
"type": "string",
"minLength": 1
},
"fullyQualifiedNamespace": {
"description": "Full namespace qualifier. If not specified \"<namespace>.servicebus.windows.net\" will be used",
"type": "string",
"minLength": 1
}
},
"required": []
}
},
"oneOf": [
{
"required": [
"sas"
]
},
{
"required": [
"rbac"
]
}
]
},
"options": {
"description": "General producer options",
"type": "object",
"properties": {
"messageId": {
"description": "Custom, may be useful for tracking event source",
"type": "string"
},
"correlationId": {
"description": "Custom, may be useful for debbuing or tracking events",
"type": "string"
},
"partitionId": {
"description": "Partition id like \"0\", \"1\",... This enables ordered event process on same partition. Either specify this or partitionKey, but not both!",
"type": "string"
},
"partitionKey": {
"description": "A fix or dynamic value. This enables ordered event process on same partition. Either specify this or partitionId, but not both!",
"type": "object",
"properties": {
"field": {
"description": "JSON-path wich specifies a field within message payload. To use a fix value specify default only.",
"type": "string"
},
"default": {
"description": "Either used as default if field could not be resolved or as fix value if no field is specified"
}
}
}
},
"anyOf": [
{
"required": [
"partitionId"
],
"not": {
"required": [
"partitionKey"
]
}
},
{
"required": [
"partitionKey"
],
"not": {
"required": [
"partitionId"
]
}
},
{
"not": {
"required": [
"partitionId",
"partitionKey"
]
}
}
]
},
"retryOptions": {
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": [
"fix",
"exponential"
]
},
"delayInMs": {
"description": "Period in milliseconds to wait until next retry",
"type": "number"
},
"maxDelayInMs": {
"description": "Max. delay in milliseconds to wait before next retry. Only useful if mode is exponential",
"type": "number"
},
"maxRetries": {
"description": "Max. number of retries",
"type": "number"
},
"timeoutInMs": {
"description": "Milliseconds to wait before declaring current request as timed out",
"type": "number"
}
}
},
"batch": {
"description": "Options to apply for batch producer only",
"type": "object",
"properties": {
"maxSizeInBytes": {
"description": "Max. batch size",
"type": "number"
},
"maxOpenBatches": {
"description": "Max. number of batches sending or retrying at the same time. Default is 10",
"type": "number"
}
}
},
"buffer": {
"description": "Options for buffered producer only",
"type": "object",
"properties": {
"maxWaitTimeInMs": {
"description": "Milliseconds to wait for next message. If none is received during this period the buffered producer sends all of its messages.",
"type": "number"
},
"maxEventBufferLengthPerPartitions": {
"description": "Max. buffer length. If this limit is reached the buffer sends all of its messages.",
"type": "number"
}
}
}
},
"required": [
"name",
"hub",
"eventhub"
]
}
},
"topicMappings": {
"description": "Descriptions for how to map MQTT topics to azure event hubs",
"type": "array",
"items": {
"type": "object",
"properties": {
"mqttTopics": {
"description": "List of MQTT topics to map",
"type": "array",
"items": {
"description": "MQTT topic to map",
"type": "string",
"minLength": 1
}
},
"target": {
"description": "Name of defined producer to send message to",
"type": "string",
"minLength": 1
}
},
"required": [
"mqttTopics",
"target"
]
}
}
},
"required": [
"eventhubs",
"producers"
]
}