Premium

Azure EventHubs Bridge Plugin

Introduction

The Azure EventHubs Bridge plugin can be used to send messages from the Mosquitto broker to one or several Azure EventHubs resources.

Features:

  • Topic based message routing, supporting 1:1, 1:n, m:1, and m:n mappings between topics and event hubs (see Topic Mappings)
  • Authentication methods including Shared Access Signature (SAS) keys and Role-Based Access Control (RBAC) (see Eventhubs)
  • Support for partition-based message ordering via partitionKey (when using JSON payloads) or partitionId (see Producers)
  • Dynamic functions to provide timestamp-based values for correlation tracking (see Producers)
  • Batched or buffered message forwarding (see Producers)
  • Configurable retry mechanisms (see Producers)

Note: the plugin cannot configure any of the referenced Azure EventHubs resources. They must be set up in advance.

Plugin activation

Ensure that the broker has a suitable license for using the Azure EventHubs Bridge plugin. If that is the case, the plugin can be activated by adding the following lines to the mosquitto.conf file:

plugin /usr/lib/cedalo_azure_eventhubs.so
persistence_location /mosquitto/data

Note: for a single-node broker scenario, it is currently required to add a file named Azure-EventHubs-Bridge.json to the specified persistence_location folder. This file may be empty, as the configuration is managed via the corresponding control API.

Configuration

The bridge is configured through a configuration file that follows the JSON schema below. You can modify the configuration file using the MQTT API for the Azure EventHubs Bridge. This is described in detail here. Another option is the user interface provided with the Cedalo Mosquitto Platform, which offers a web page to modify and update the configuration. This option is described below in the Platform UI section.

Configuration File Format

The configuration consists of three main parts:

  • eventhubs: define one or several Azure EventHubs resources (Eventhubs)
  • producers: define resource targets to forward messages to (Producers)
  • topicMappings: define which messages should be forwarded and where (Topic Mappings)
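
To make the overall structure concrete, the following sketch shows a minimal configuration file containing all three parts. It only combines fragments that are explained in the sections below. The resource, hub, producer, and topic names as well as the credential values are placeholders, not real resources.

```json
{
  "eventhubs": [
    {
      "name": "EventHubsResource",
      "auth": {
        "rbac": {
          "tenantId": "<tenant-id>",
          "clientId": "<client-id>",
          "clientSecret": "<client-secret>"
        }
      }
    }
  ],
  "producers": [
    {
      "name": "Producer1",
      "hub": "hub1",
      "eventhub": "EventHubsResource"
    }
  ],
  "topicMappings": [
    {
      "mqttTopics": ["topic/test1"],
      "target": "Producer1"
    }
  ]
}
```

According to the JSON schema, only eventhubs and producers are required at the top level. Because no type is given, Producer1 would be created as a batch producer, and since fullyQualifiedNamespace is omitted, it would default to EventHubsResource.servicebus.windows.net.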

Eventhubs

Each Azure EventHubs resource is referenced by its name (aka namespace) and is authenticated either via a Shared Access Signature (SAS) key or through Role-Based Access Control (RBAC). Note that RBAC authentication uses a fully qualified namespace to reference the Azure resource. If the fullyQualifiedNamespace property is not provided, it defaults to the specified name suffixed with .servicebus.windows.net, i.e. <name>.servicebus.windows.net. For SAS this is not required, as the key already contains the endpoint.

The following is an example of a resource configuration that uses RBAC for authentication:

{
  "name": "EventHubsResource",
  "auth": {
    "rbac": {
      "tenantId": "12345677-6171-47aa-b47b-1234567d89de",
      "clientId": "abcdef12-aa90-4190-9cfc-123456789abc",
      "clientSecret": "aVerySecretClientSecret",
      "fullyQualifiedNamespace": "resource_namespace.servicebus.windows.net"
    }
  }
}
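
For comparison, a resource that authenticates with a SAS key might be configured as sketched below. The schema only requires a non-empty key string. The connection-string layout shown here (Endpoint, SharedAccessKeyName, SharedAccessKey) is an assumption based on the usual Azure Event Hubs connection-string format and on the remark above that the key already contains the endpoint. All values are placeholders.

```json
{
  "name": "EventHubsResource",
  "auth": {
    "sas": {
      "key": "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy-name>;SharedAccessKey=<key>"
    }
  }
}
```

Note that the schema accepts either sas or rbac within auth, but not both at the same time.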

Producers

The producer settings control how messages are sent to individual event hubs. To authenticate against the Azure resource the auth property of the referenced eventhub is used. However, it is also possible to override these auth settings for each producer.

The bridge supports two types of producers: batch and buffered. The batch producer forwards messages immediately, whereas a buffered producer collects messages before sending them to optimize throughput. If the type is not specified, a batch producer is created by default.

The following options can be used to define how messages are sent to different hubs:

  • name: a unique name for this producer to identify in mappings
  • hub: the event hub name where messages will be sent to
  • type: either batch to create a batch producer or buffered to use a buffered one
  • eventhub: name of defined EventHubs resource
  • auth: optional authentication credentials for this producer. If provided, these credentials override those of the referenced eventhub
  • options: general producer options
    • messageId: used for tracking the source of an event in logs
    • correlationId: used for debugging or tracking events. Supports dynamic placeholders like {NOW} or {NOW_ISO}
    • partitionId: specifies the Azure event hub partition for in-order message processing. Do not use partitionKey at the same time
    • partitionKey: specifies a dynamic partition key (only for JSON payloads). Supports JSON-path-based extraction
      • field: specifies the JSON-path to extract a value for partitioning
      • default: defines a fallback value if the JSON-path cannot be resolved
  • retryOptions: configures message retry behavior when an event fails to send
    • mode: defines the retry strategy. Use one of the following values:
      • fix: uses a fixed retry delay
      • exponential: increases the retry delay exponentially until maxDelayInMs is reached
    • delayInMs: specifies the fixed delay in milliseconds before retrying
    • maxDelayInMs: defines the maximum delay before a retry in exponential mode
    • maxRetries: specifies the maximum number of retry attempts before giving up
    • timeoutInMs: defines the wait time before the request is declared as timed out
  • batch: options that only apply to batch producer
    • maxSizeInBytes: defines the maximum batch size for sending messages. If not set, the default value from Azure EventHubs applies.
    • maxOpenBatches: defines the maximum number of batches that may be sending or retrying at the same time. If not set, the default value is 10
  • buffer: options that only apply to buffered producer
    • maxWaitTimeInMs: milliseconds to wait for the next message. If none is received within this period, the buffer sends all of its messages
    • maxEventBufferLengthPerPartition: defines the maximum number of messages buffered per partition before sending
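
As a small illustration of the general options above, the following fragment sets a correlation ID with a dynamic placeholder and derives the partition key from the message payload. The field value assumes a JSON-path of the form $.deviceId and a payload that carries a deviceId property. Both are hypothetical, so check the path syntax the bridge expects before relying on it.

```json
{
  "options": {
    "correlationId": "bridge-{NOW_ISO}",
    "partitionKey": {
      "field": "$.deviceId",
      "default": "unknown-device"
    }
  }
}
```

If the path cannot be resolved, the default value is used as the partition key. partitionId is left out on purpose, because it must not be combined with partitionKey.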

The following producer example creates a buffered producer and uses the previously defined eventhubs resource but overrides its fullyQualifiedNamespace property to target a different endpoint.

{
  "name": "Producer1",
  "type": "buffered",
  "hub": "hub1",
  "eventhub": "EventHubsResource",
  "options": {
    "messageId": "Event sent at {NOW_ISO}"
  },
  "auth": {
    "rbac": {
      "fullyQualifiedNamespace": "another_resource_namespace.servicebus.windows.net"
    }
  },
  "retryOptions": {
    "mode": "fix",
    "maxRetries": 100,
    "timeoutInMs": 1000
  },
  "buffer": {
    "maxWaitTimeInMs": 1000
  }
}
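
A batch producer with exponential retries could be sketched in the same way. The producer name, the hub name, and all numeric values below are illustrative assumptions, not recommended settings.

```json
{
  "name": "Producer2",
  "type": "batch",
  "hub": "hub2",
  "eventhub": "EventHubsResource",
  "options": {
    "partitionId": "0"
  },
  "retryOptions": {
    "mode": "exponential",
    "maxDelayInMs": 30000,
    "maxRetries": 5,
    "timeoutInMs": 60000
  },
  "batch": {
    "maxOpenBatches": 10
  }
}
```

In this sketch all messages are sent to partition "0" to keep them in order, and no auth block is given, so the credentials of the referenced EventHubsResource are used.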

Topic Mappings

Topic mappings define how broker messages are forwarded to EventHubs endpoints. Wildcards like + (single level) or # (multi-level) are supported too. The bridge allows various types of mappings between MQTT topics and event hubs:

  • 1:1: a single topic is mapped to a single event hub
  • 1:n: a single topic is mapped to multiple event hubs
  • m:1: multiple topics are forwarded to a single event hub
  • m:n: multiple topics are forwarded to multiple event hubs

The following is an example of an m:n topic mapping:

[
  {
    "mqttTopics": ["topic/test1", "topic/test2"],
    "target": "Producer1"
  },
  {
    "mqttTopics": ["topic/test1", "topic/test2"],
    "target": "Producer2"
  }
]
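
Wildcards can be used in the same structure. The sketch below assumes standard MQTT wildcard semantics and hypothetical topic names: the first pattern matches exactly one level between sensors and temperature, the second matches alerts and all topics below it.

```json
[
  {
    "mqttTopics": ["sensors/+/temperature", "alerts/#"],
    "target": "Producer1"
  }
]
```

This is an m:1 mapping, since several topics are forwarded to the single producer Producer1.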

Platform UI

JSON Schema

Schema for the Microsoft Azure® Bridge, which describes all configuration options for the bridge.

{
  "title": "Azure Bridge Configuration",
  "description": "",
  "type": "object",
  "properties": {
    "eventhubs": {
      "description": "The Azure EventHubs Bridge can forward messages to multiple EventHubs resources.",
      "type": "array",
      "items": {
        "description": "An Azure EventHubs resource.",
        "type": "object",
        "properties": {
          "name": {
            "description": "Unique namespace identifier of an EventHubs",
            "type": "string",
            "minLength": 1
          },
          "auth": {
            "description": "EventHubs specific authentication. Either specify a Shared Access Signature (SAS) key or use Role Based Access Control (RBAC)",
            "type": "object",
            "properties": {
              "sas": {
                "type": "object",
                "properties": {
                  "key": {
                    "type": "string",
                    "minLength": 1
                  }
                },
                "required": [
                  "key"
                ]
              },
              "rbac": {
                "type": "object",
                "properties": {
                  "tenantId": {
                    "type": "string",
                    "minLength": 1
                  },
                  "clientId": {
                    "type": "string",
                    "minLength": 1
                  },
                  "clientSecret": {
                    "type": "string",
                    "minLength": 1
                  },
                  "fullyQualifiedNamespace": {
                    "description": "Full namespace qualifier. If not specified \"<namespace>.servicebus.windows.net\" will be used",
                    "type": "string",
                    "minLength": 1
                  }
                },
                "required": []
              }
            },
            "oneOf": [
              {
                "required": [
                  "sas"
                ]
              },
              {
                "required": [
                  "rbac"
                ]
              }
            ]
          }
        },
        "required": [
          "name",
          "auth"
        ]
      }
    },
    "producers": {
      "description": "Per producer settings for each event hub within an EventHubs resource",
      "type": "array",
      "items": {
        "description": "Producer settings for each event hub within an EventHubs resource",
        "type": "object",
        "properties": {
          "name": {
            "description": "Unique name for this producer to use and identify it in mappings",
            "type": "string",
            "minLength": 1
          },
          "type": {
            "description": "Use a batch producer for direct message passing or a buffered producer to collect several messages before sending to eventhub",
            "type": "string",
            "default": "batch",
            "enum": [
              "batch",
              "buffered"
            ]
          },
          "hub": {
            "description": "Name of the hub to send messages to",
            "type": "string",
            "minLength": 1
          },
          "eventhub": {
            "description": "Namespace of the EventHubs to use",
            "type": "string",
            "minLength": 1
          },
          "auth": {
            "description": "Producer specific authentication. Specify only if it differs from the EventHubs auth",
            "type": "object",
            "properties": {
              "sas": {
                "type": "object",
                "properties": {
                  "key": {
                    "type": "string",
                    "minLength": 1
                  }
                },
                "required": [
                  "key"
                ]
              },
              "rbac": {
                "type": "object",
                "properties": {
                  "tenantId": {
                    "type": "string",
                    "minLength": 1
                  },
                  "clientId": {
                    "type": "string",
                    "minLength": 1
                  },
                  "clientSecret": {
                    "type": "string",
                    "minLength": 1
                  },
                  "fullyQualifiedNamespace": {
                    "description": "Full namespace qualifier. If not specified \"<namespace>.servicebus.windows.net\" will be used",
                    "type": "string",
                    "minLength": 1
                  }
                },
                "required": []
              }
            },
            "oneOf": [
              {
                "required": [
                  "sas"
                ]
              },
              {
                "required": [
                  "rbac"
                ]
              }
            ]
          },
          "options": {
            "description": "General producer options",
            "type": "object",
            "properties": {
              "messageId": {
                "description": "Custom, may be useful for tracking event source",
                "type": "string"
              },
              "correlationId": {
                "description": "Custom, may be useful for debugging or tracking events",
                "type": "string"
              },
              "partitionId": {
                "description": "Partition id like \"0\", \"1\",... This enables ordered event processing on the same partition. Either specify this or partitionKey, but not both!",
                "type": "string"
              },
              "partitionKey": {
                "description": "A fixed or dynamic value. This enables ordered event processing on the same partition. Either specify this or partitionId, but not both!",
                "type": "object",
                "properties": {
                  "field": {
                    "description": "JSON-path which specifies a field within the message payload. To use a fixed value specify default only.",
                    "type": "string"
                  },
                  "default": {
                    "description": "Either used as default if field could not be resolved or as fixed value if no field is specified"
                  }
                }
              }
            },
            "anyOf": [
              {
                "required": [
                  "partitionId"
                ],
                "not": {
                  "required": [
                    "partitionKey"
                  ]
                }
              },
              {
                "required": [
                  "partitionKey"
                ],
                "not": {
                  "required": [
                    "partitionId"
                  ]
                }
              },
              {
                "not": {
                  "required": [
                    "partitionId",
                    "partitionKey"
                  ]
                }
              }
            ]
          },
          "retryOptions": {
            "type": "object",
            "properties": {
              "mode": {
                "type": "string",
                "enum": [
                  "fix",
                  "exponential"
                ]
              },
              "delayInMs": {
                "description": "Period in milliseconds to wait until next retry",
                "type": "number"
              },
              "maxDelayInMs": {
                "description": "Max. delay in milliseconds to wait before next retry. Only useful if mode is exponential",
                "type": "number"
              },
              "maxRetries": {
                "description": "Max. number of retries",
                "type": "number"
              },
              "timeoutInMs": {
                "description": "Milliseconds to wait before declaring current request as timed out",
                "type": "number"
              }
            }
          },
          "batch": {
            "description": "Options to apply for batch producer only",
            "type": "object",
            "properties": {
              "maxSizeInBytes": {
                "description": "Max. batch size",
                "type": "number"
              },
              "maxOpenBatches": {
                "description": "Max. number of batches sending or retrying at the same time. Default is 10",
                "type": "number"
              }
            }
          },
          "buffer": {
            "description": "Options for buffered producer only",
            "type": "object",
            "properties": {
              "maxWaitTimeInMs": {
                "description": "Milliseconds to wait for next message. If none is received during this period the buffered producer sends all of its messages.",
                "type": "number"
              },
              "maxEventBufferLengthPerPartitions": {
                "description": "Max. buffer length. If this limit is reached the buffer sends all of its messages.",
                "type": "number"
              }
            }
          }
        },
        "required": [
          "name",
          "hub",
          "eventhub"
        ]
      }
    },
    "topicMappings": {
      "description": "Descriptions for how to map MQTT topics to azure event hubs",
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "mqttTopics": {
            "description": "List of MQTT topics to map",
            "type": "array",
            "items": {
              "description": "MQTT topic to map",
              "type": "string",
              "minLength": 1
            }
          },
          "target": {
            "description": "Name of defined producer to send message to",
            "type": "string",
            "minLength": 1
          }
        },
        "required": [
          "mqttTopics",
          "target"
        ]
      }
    }
  },
  "required": [
    "eventhubs",
    "producers"
  ]
}