
Azure EventHubs Bridge

Premium
Version 3.1
info

This plugin is configurable via the Cedalo MQTT platform UI, which simplifies updating config values without the need to restart the broker. Additionally, config updates are synchronized across cluster nodes, reducing the administrative overhead in clustered environments. Find more information about this here.

The Azure EventHubs Bridge plugin can be used to send messages from the Mosquitto broker to one or several Azure EventHubs resources.

Features:

  • Topic based message routing, supporting 1:1, 1:n, m:1, and m:n mappings between topics and event hubs (see Topic Mappings)
  • Authentication methods including Shared Access Signature (SAS) keys and Role-Based Access Control (RBAC) (see Eventhubs)
  • Support for partition-based message ordering via partitionKey (when using JSON payloads) or partitionId (see Producers)
  • Dynamic functions to provide timestamp-based values for correlation tracking (see Producers)
  • Batched or buffered message forwarding (see Producers)
  • Configurable retry mechanisms (see Producers)

Note: the plugin cannot configure any referenced Azure EventHubs resources. This has to be done upfront.

Plugin activation

Ensure that the broker has a suitable license for using the Azure EventHubs Bridge plugin. If so, the plugin can be activated by adding the following to the mosquitto.conf file:

plugin /usr/lib/cedalo_azure_eventhubs.so
persistence_location /mosquitto/data

Configuration

The configuration of the bridge is achieved by providing a configuration file following the JSON schema below. You can modify the configuration file using the MQTT API for the Azure EventHubs Bridge; this is described in detail in the Control topic section below. Another option is to use the user interface provided with the Cedalo Mosquitto Platform, which provides a webpage to modify and update the configuration. This option is described below in the Bridges Platform UI section.

Configuration File Format

The configuration consists of three main parts as well as an optional schemaMappings section:

  • eventhubs: define one or several Azure EventHubs resources (Eventhubs)
  • producers: define resource targets to forward messages to (Producers)
  • topicMappings: define which messages are to be forwarded and where to (Topic Mappings)
  • schemaMappings: optionally define how payloads need to be formatted and which metadata they must include (Schema Mappings)

Eventhubs

Each Azure EventHubs resource is referenced by its name (i.e. its namespace) and is authenticated either via a provided Shared Access Signature (SAS) key or through Role-Based Access Control (RBAC). Note that RBAC authentication uses a fully qualified namespace to reference the Azure resource. If this property is not provided, it defaults to the specified name suffixed with servicebus.windows.net, i.e. <name>.servicebus.windows.net. For SAS this is not required, as the key already contains the endpoint.

Following is an example of a resource configuration which uses RBAC for authentication:

{
  "name": "EventHubsResource",
  "auth": {
    "rbac": {
      "tenantId": "12345677-6171-47aa-b47b-1234567d89de",
      "clientId": "abcdef12-aa90-4190-9cfc-123456789abc",
      "clientSecret": "aVerySecretClientSecret",
      "fullyQualifiedNamespace": "resource_namespace.servicebus.windows.net"
    }
  }
}
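
As an illustration of the default rule above, the endpoint used for RBAC could be derived like this (a minimal Python sketch; resolve_fqn is a hypothetical helper, not part of the plugin):

```python
def resolve_fqn(name: str, rbac: dict) -> str:
    # Fall back to '<name>.servicebus.windows.net' when the
    # 'fullyQualifiedNamespace' property is absent.
    return rbac.get("fullyQualifiedNamespace") or f"{name}.servicebus.windows.net"
```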

Producers

The producer settings control how messages are sent to individual event hubs. To authenticate against the Azure resource the auth property of the referenced eventhub is used. However, it is also possible to override these auth settings for each producer.

The bridge supports two types of producers: batch and buffered. The batch producer forwards messages immediately, whereas a buffered producer collects messages before sending them to optimize throughput. If the type is not specified, a batch producer is created by default.

The following options can be used to define how messages are sent to different hubs:

  • name: a unique name for this producer to identify in mappings
  • hub: the event hub name where messages will be sent to
  • type: either batch to create a batch producer or buffered to use a buffered one
  • eventhub: name of defined EventHubs resource
  • auth: optional authentication credentials for this producer. If provided these credentials override the ones from referenced eventhub
  • options: general producer options
    • messageId: used for tracking the source of an event in logs
    • correlationId: used for debugging or tracking events. Supports dynamic placeholders like {NOW} or {NOW_ISO}
    • partitionId: specifies the Azure event hub partition for in-order message processing. Do not use partitionKey at the same time
    • partitionKey: specifies a dynamic partition key (only for JSON payloads). Supports JSON-path-based extraction
      • field: specifies the JSON-path to extract a value for partitioning
      • default: defines a fallback value if the JSON-path cannot be resolved
  • retryOptions: configures message retry behavior when an event fails to send
    • mode: defines the retry strategy, use one of following values:
      • fix: uses a fixed retry delay
      • exponential: increases retry delay exponentially until maxRetryDelayInMs is reached
    • delayInMs: specifies the fixed delay in milliseconds before retrying
    • maxDelayInMs: defines the maximum delay before a retry in exponential mode
    • maxRetries: specifies the maximum number of retry attempts before giving up
    • timeoutInMs: defines the wait time before the request is declared as timed out
  • batch: options that only apply to batch producer
    • maxSizeInBytes: defines the maximum batch size for sending messages. If not set, the default value from Azure EventHubs will apply.
    • maxOpenBatches: defines the maximum number of parallel open batches that can be sent or retried at the same time. If not set, the default value is 10
  • buffer: options that only apply to buffered producer
    • maxWaitTimeInMs: milliseconds to wait for the next message; if none is received within this period the buffer sends all of its messages
    • maxEventBufferLengthPerPartition: defines the maximum number of messages buffered per partition before sending
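
The partitionKey resolution described above (a JSON-path into the payload with a default fallback) can be sketched as follows. This is an illustrative Python helper, not the plugin's implementation; the JSON-path dialect is assumed here to match the bracket syntax used by schema mappings:

```python
import re

def resolve_partition_key(payload: dict, partition_key: dict):
    # 'field' points into the JSON payload; 'default' is the fallback
    # (or the fixed value, when no 'field' is given at all).
    field = partition_key.get("field")
    default = partition_key.get("default")
    if not field:
        return default  # fixed value: only 'default' was specified
    value = payload
    for segment in re.findall(r"\[([a-zA-Z0-9_-]+)\]", field):
        if not isinstance(value, dict) or segment not in value:
            return default  # path could not be resolved
        value = value[segment]
    return str(value)
```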

The following example creates a buffered producer that uses the previously defined eventhubs resource but overrides its fullyQualifiedNamespace property to target a different endpoint.

{
  "name": "Producer1",
  "type": "buffered",
  "hub": "hub1",
  "eventhub": "EventHubsResource",
  "options": {
    "messageId": "Event sent at {NOW_ISO}"
  },
  "auth": {
    "rbac": {
      "fullyQualifiedNamespace": "another_resource_namespace.servicebus.windows.net"
    }
  },
  "retryOptions": {
    "mode": "fix",
    "maxRetries": 100,
    "timeoutInMs": 1000
  },
  "buffer": {
    "maxWaitTimeInMs": 1000
  }
}

Topic Mappings

Topic mappings define how broker messages are forwarded to EventHubs endpoints. Wildcards like + (single level) or # (multi-level) are supported too. The bridge allows various types of mappings between MQTT topics and event hubs:

  • 1:1: a single topic is mapped to a single event hub
  • 1:n: a single topic is mapped to multiple event hubs
  • m:1: multiple topics are forwarded to a single event hub
  • m:n: multiple topics are forwarded to multiple event hubs

Following is an example of an m:n topic mapping:

[
  {
    "mqttTopics": ["topic/test1", "topic/test2"],
    "target": "Producer1"
  },
  {
    "mqttTopics": ["topic/test1", "topic/test2"],
    "target": "Producer2"
  }
]
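
For illustration, the standard MQTT wildcard rules used by such mappings can be sketched like this (a minimal matcher, not the plugin's own code; edge cases such as $-prefixed topics are ignored):

```python
def topic_matches(filter_: str, topic: str) -> bool:
    # Compare the topic against a filter level by level:
    # '+' matches exactly one level, '#' matches the remainder.
    f_levels = filter_.split("/")
    t_levels = topic.split("/")
    for i, f in enumerate(f_levels):
        if f == "#":
            return True  # multi-level wildcard matches everything below
        if i >= len(t_levels):
            return False
        if f != "+" and f != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)
```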

If you want to format your resulting payloads in a certain way or include some MQTT message metadata in them, you can specify an additional key called schemaMapping, which references the name of the respective schema mapping under schemaMappings.

[
  {
    "mqttTopics": ["topic/test1", "topic/test2"],
    "target": "Producer1",
    "schemaMapping": "Mapping1"
  },
  {
    "mqttTopics": ["topic/+/prod/#"],
    "target": "Producer2",
    "schemaMapping": "Mapping1"
  },
  {
    "mqttTopics": ["topic/test1", "topic/test2"],
    "target": "Producer2",
    "schemaMapping": "Mapping2"
  }
]

Alternatively, you can specify the entire schema mapping in-place:

[
  {
    "mqttTopics": ["topic/test1", "topic/test2"],
    "target": "Producer1",
    "schemaMapping": {
      "mapping": [
        { "source": "[topic]", "target": "[metadata][topic]" },
        { "source": "[payload]", "target": "[payload]" }
      ]
    }
  }
]

Schema Mappings

The bridge sends its payloads to Azure EventHubs as JSON. If you want to restructure your JSON payloads in a specific way and include additional MQTT message metadata, you may use schema mappings.

Schema mappings are defined as an array of schema mapping objects under the optional schemaMappings key of the configuration object. Unlike the in-place schema mappings defined directly as part of topicMappings (see above), each schema mapping defined under the schemaMappings key must include a unique name.

Each schema mapping must contain a mapping key which defines an array of so-called extractors (sometimes referred to as selectors). An extractor is a mapping between a source and a target, with additional transformations and type casting if needed. Sources are fields of the incoming MQTT message, while targets are fields in the outgoing JSON object.

An example of schema mappings is presented below:

[
  {
    "name": "M1",
    "mapping": [
      { "source": "[datetime]", "target": "[metadata][dt]" },
      { "source": "[client_id]", "target": "[metadata][client][id]" },
      { "source": "[retain]", "target": "[metadata][message][ret_flag]" },
      { "source": "[qos]", "target": "[metadata][message][qos]" },
      { "source": "[topic]", "target": "[metadata][message][topic]" },
      { "source": "[payload]", "target": "[payload]" }
    ]
  },
  {
    "name": "M2",
    "mapping": [
      { "source": "[topic]", "target": "[metadata][topic]" },
      { "source": "[payload][temperature]", "target": "[reading]" }
    ]
  }
]

This example demonstrates two schema mappings: M1 and M2.

M1 uses extractors to pick out metadata fields from the incoming MQTT message, namely datetime, client_id, retain, qos, and topic, and adds them to the outgoing JSON to be sent to Azure. It also includes the message payload under the payload key of the outgoing JSON.

The resulting JSON after the M1 mapping would look like:

{
  "metadata": {
    "dt": "<Date in the ISO 8601 format, e.g.: 1970-01-01T00:00:00.000Z>",
    "message": {
      "ret_flag": "<retain>",
      "qos": "<qos>",
      "topic": "<topic>"
    },
    "client": {
      "id": "<client_id>"
    }
  },
  "payload": <mqtt message payload>
}

The M2 mapping copies the topic and the temperature field of the MQTT payload into the resulting JSON. The resulting JSON sent to Azure would be as follows:

{
  "topic": "<topic>",
  "reading": <temperature>
}
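
The extraction performed by a mapping like M2 can be sketched as follows (illustrative Python, with the bracket paths pre-parsed into lists for brevity; not the plugin's code):

```python
def set_nested(obj: dict, path: list, value) -> None:
    # Create intermediate objects for a nested target such as ['metadata', 'topic'].
    for key in path[:-1]:
        obj = obj.setdefault(key, {})
    obj[path[-1]] = value

def apply_mapping(message: dict, mapping: list) -> dict:
    # Each extractor copies a source field of the incoming message
    # into the target path of the outgoing JSON object.
    out = {}
    for extractor in mapping:
        value = message
        for key in extractor["source"]:
            value = value[key]
        set_nested(out, extractor["target"], value)
    return out
```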

Notice that square brackets ([]) are used to "unpack" JSON objects.

Note that currently the only supported characters for the JSON payload fields (i.e. values inside [] when unpacking JSON objects) are [a-zA-Z0-9_-] (alphanumeric characters as well as _ (underscore) and - (dash)).
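
This restriction can be expressed as a small validator/parser sketch (a hypothetical helper, assuming the character set documented above):

```python
import re

# One bracketed segment; only [a-zA-Z0-9_-] is allowed inside the brackets.
PATH_SEGMENT = re.compile(r"\[([a-zA-Z0-9_-]+)\]")

def parse_path(expression: str) -> list:
    # Validate the whole expression, then split it into its segments.
    if not re.fullmatch(r"(?:\[[a-zA-Z0-9_-]+\])+", expression):
        raise ValueError(f"not a valid unpacking expression: {expression!r}")
    return PATH_SEGMENT.findall(expression)
```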

It is also possible to insert a constant value into the resulting JSON:

[
  {
    "name": "M2",
    "mapping": [
      { "source": "[label]", "target": "test data" },
      { "source": "[payload]", "target": "[payload]" }
    ]
  }
]

In this case the string "test data" will be inserted into the label field of the resulting JSON. However, if your constant value contains square brackets, then you have to add an options field set to an object with the isConst field set to true, e.g.:

[
  {
    "name": "M2",
    "mapping": [
      { "source": "[label]", "target": "[test_data]", "options": { "isConst": true } },
      { "source": "[payload]", "target": "[payload]" }
    ]
  }
]

Currently the following metadata properties of the incoming MQTT message are supported:

  • hostname: Hostname of the broker. Note that the HOSTNAME environment variable must be specified for the broker, or the default value will be used (type: string, defaults to: <Unknown>).
  • topic: Name of the topic to which the MQTT message was published (type: string).
  • payload: Payload of the MQTT message. Can be used to further unpack (pick out) child properties of the payload object if the latter is a JSON. See below for more information. If the payload was originally a string or a number, it will be treated as such, and no typecasting is necessary (type: any).
  • qos: QoS level of the MQTT message (type: integer, one of: 0, 1, 2).
  • retain: Retain flag of the MQTT message (type: bool).
  • timestamp: Unix timestamp representing the time when the MQTT message arrived at the broker (type: integer).
  • datetime: Datetime string representing the time when the MQTT message arrived at the broker (type: string representing the date in ISO format).
  • uuid: Unique identifier (UUID) of the message. Note that it is a convenience property. This ID is internally generated by the bridge itself and does not come from the original message's metadata (type: string).
  • client_id: Client ID of the device that published the MQTT message (type: string).

Extractor options

Each extractor under the mapping section of the configuration file can have additional options that define transformations to be performed on the source data before it is inserted into the target. For example:

{
  ...
  "name": "mapping1",
  "mapping": [
    {
      "source": "[payload][property1]",
      "target": "[my_property]",
      "options": {
        "replaceNullWith": "<null>"
      }
    }
  ],
  ...
}

The above specifies an extractor that maps property1 of the MQTT message's payload to my_property in the resulting JSON object. Whenever a null value is encountered in the property1 field, it is converted to the string "<null>" and put into the outgoing JSON.

Here is the list of the currently supported extractor options:

  • isConst (path: $.schemaMappings[].mapping[].options.isConst):
    Used to indicate that the field in source is a constant that should be inserted into Azure EventHubs as is, without trying to unpack it. This is relevant in case the source field is a string that follows the unpacking syntax but should be treated as a constant. Otherwise (i.e. for constants that don't follow the unpacking syntax), this field is not required and can therefore be omitted or added only for clarity and additional context (type: bool).
  • replace (path: $.schemaMappings[].mapping[].options.replace):
    Specifies an array of two strings. Replaces all occurrences of the first string with the second one before inserting the data into the outgoing JSON (type: array consisting of two elements of type string).
  • replaceNullWith (path: $.schemaMappings[].mapping[].options.replaceNullWith):
    Specifies a value that null values on the source will be replaced with (type: any).
  • replaceUndefinedWith (path: $.schemaMappings[].mapping[].options.replaceUndefinedWith):
    In case that a property specified in source is missing on the MQTT message, it will be replaced with the value specified in this option (type: any).
  • format (path: $.schemaMappings[].mapping[].options.format):
    Used when casting into datetime datatype (i.e. strings representing date and time). Specifies a datetime format string that values from source are formatted as. See format string documentation for more information on available format strings. Defaults to yyyy-MM-ddTHH:mm:ss.sssZ (with an optional timezone information). However, by default if an integer value representing a unix timestamp in milliseconds is encountered in source, it will also be accepted and appropriately converted to the datetime datatype (type: string).
  • dbFormat (path: $.schemaMappings[].mapping[].options.dbFormat):
    Used when casting into datetime datatype. Specifies a format string that datetime values should be converted to before they are inserted into the table (type: string).
  • replaceDelimiter (path: $.schemaMappings[].mapping[].options.replaceDelimiter):
    Used when casting into the datetime datatype. The format of the datetime values inserted into the database will typically be an ISO format as in the following example: 2017-01-29T14:07:00. With this option you may change the T to a space or whatever suits your schema (type: string).
  • splitBy (path: $.schemaMappings[].mapping[].options.splitBy):
    Used to split a string from source into an array by a certain delimiter. Can be combined with joinBy, fetchByIndex, or fetchByRange or used on its own (type: string)
  • joinBy (path: $.schemaMappings[].mapping[].options.joinBy):
    Used to merge an array from source into a string inserting the specified delimiter between each pair of array's elements (type: string)
  • fetchByIndex (path: $.schemaMappings[].mapping[].options.fetchByIndex):
    Used to fetch an element of an array from source, which is specified by a (zero-based) index (type: integer)
  • fetchByRange (path: $.schemaMappings[].mapping[].options.fetchByRange):
    Used to fetch a range of elements from the array taken from source. Can be combined with splitBy or joinBy. When used with joinBy, the specified array slice will be joined using the character from joinBy. When used with splitBy a string from source is first split into an array and then the specified range is extracted from it. Then, if joinBy is not set, the delimiter from splitBy will be used to join the extracted array back into a string (type: array consisting of two elements of type: integer)

Note that some options can only be used with certain (or when typecasting to certain) datatypes, while others can be used with any type.

Options which can only be used when typecasting to datetime:

  • format
  • dbFormat
  • replaceDelimiter

Example:

{
  ...
  "name": "mapping1",
  "mapping": [
    {
      "source": "[payload][property1]",
      "target": "[my_property]",
      "type": "datetime",
      "options": {
        "nullValue": "<null>",
        "format": "yyyy-MM-dd HH:mm:ss",
        "dbFormat": "dd-MMM-yy hh:mm:ss",
        "replaceDelimiter": " "
      }
    }
  ],
  ...
}

Options which can be used as long as the underlying value is a string:

  • replace
  • truncate
  • splitBy

Example:

{
  ...
  "name": "mapping1",
  "mapping": [
    {
      "source": "[payload][property1]",
      "target": "[my_property]",
      "options": {
        "replace": ["hello world", "hello Azure"]
      }
    }
  ],
  ...
}

Options which can only be used with arrays or after the source was split into an array using the splitBy option:

  • joinBy
  • fetchByIndex
  • fetchByRange

Example:

{
  ...
  "name": "mapping1",
  "mapping": [
    {
      "source": "[payload][property1]",
      "target": "[my_property]",
      "options": {
        "splitBy": "/",
        "fetchByRange": [0, 3]
      }
    }
  ],
  ...
}
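
Assuming the semantics described above (including a half-open range for fetchByRange, which the text does not pin down), the interplay of splitBy, fetchByIndex, fetchByRange, and joinBy can be sketched as:

```python
def apply_split_options(value: str, options: dict):
    # Illustrative sketch, not the plugin's implementation.
    split_by = options.get("splitBy")
    parts = value.split(split_by) if split_by else value
    if "fetchByIndex" in options:
        return parts[options["fetchByIndex"]]  # single element, zero-based
    if "fetchByRange" in options:
        lo, hi = options["fetchByRange"]
        parts = parts[lo:hi]  # assumed half-open slice
    join_by = options.get("joinBy")
    if join_by is not None:
        return join_by.join(parts)
    if split_by and isinstance(parts, list):
        # no joinBy given: re-join with the splitBy delimiter
        return split_by.join(parts)
    return parts
```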

Options which can be used regardless of types and typecasting:

  • replaceNullWith
  • replaceUndefinedWith
  • isConst
  • nullValue
  • defaultValue

Data type casting

Azure Eventhubs Bridge provides a feature to typecast properties of the MQTT messages before they are inserted into the outgoing JSON.

In order to cast a certain property to a certain type, the following configuration must be specified:

{
  ...
  "name": "<your mapping name here>",
  "mapping": [
    {
      "source": "<your source mqtt message property>",
      "target": "<your target property in the outgoing JSON object>",
      "type": "<specify type to cast 'source' into>"
    }
  ],
  ...
}

For example:

{
  ...
  "name": "mapping1",
  "mapping": [
    {
      "source": "[payload][temperature_str]",
      "target": "[temperature]",
      "type": "number"
    }
  ],
  ...
}

The following typecasts are supported:

  • datetime: casts integers or strings into a datetime (format, dbFormat, and replaceDelimiter might need to be specified; see the Extractor Options section). There is no need to typecast datetime strings if they follow the ISO 8601 format (note that the timezone portion is not required here since many databases don't handle timezones).
  • number: casts strings into either a float or integer (supports float numbers separated by both dots or commas (e.g. both 1.5 and 1,5 are handled)).
  • integer: casts strings into an integer, truncating the fractional part.
  • boolean: casts strings or numbers into a boolean value (note that string "false" is cast to the false boolean value, as is number 0, null, empty string, or a missing property. Everything else is cast to true).
  • string: can cast numbers into strings. There is typically no need to typecast JSON objects to strings as this is done automatically.

Note that while it might be tempting to explicitly specify typecasting for all the sources, this is not recommended as typecasting is often associated with additional overhead. The rule of thumb would be to cast only what is necessary.
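
A minimal sketch of the casts listed above (assumed semantics, especially around whole-number results; not the plugin's implementation):

```python
def cast_value(value, type_: str):
    # 'number' accepts both dot and comma decimal separators;
    # 'boolean' treats "false", 0, None, and "" as false.
    if type_ == "number":
        f = float(str(value).replace(",", "."))
        return int(f) if f.is_integer() else f
    if type_ == "integer":
        return int(float(str(value).replace(",", ".")))  # truncates the fraction
    if type_ == "boolean":
        return value not in (None, 0, "", "false")
    if type_ == "string":
        return str(value)
    return value
```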

Configuration example

A simple configuration that includes all the above-described parts may look as follows:

{
  "eventhubs": [
    {
      "name": "EvHub1",
      "auth": {
        "sas": {
          "key": "A_SAS_Key_Token"
        }
      }
    }
  ],
  "producers": [
    {
      "name": "P1",
      "hub": "hub1",
      "eventhub": "EvHub1",
      "options": {
        "messageId": "{NOW_ISO}"
      }
    }
  ],
  "topicMappings": [
    {
      "mqttTopics": ["test/topic"],
      "target": "P1",
      "schemaMapping": "M1"
    }
  ],
  "schemaMappings": [
    {
      "name": "M1",
      "mapping": [
        { "source": "[datetime]", "target": "[metadata][dt]" },
        { "source": "[topic]", "target": "[metadata][topic]" },
        { "source": "[payload]", "target": "[payload]" }
      ]
    }
  ]
}

Control topic

The plugin can be controlled by sending command messages to the $CONTROL/cedalo/azure-eventhubs-bridge/v1 topic. Responses are published to the $CONTROL/cedalo/azure-eventhubs-bridge/v1/response topic.

The following commands are currently available:

  • getStatus to fetch information about current bridge status
  • getSchema to get the schema used to validate the config JSON along with some examples
  • getConfig to fetch current config as a JSON object
  • updateConfig to update the complete configuration of the Azure EventHubs Bridge
  • testProducer to test producer settings by checking whether a connection can be established to the specified hub

Each control message has the following format:

{
  "commands": [
    {
      "command": "<A known control api command>",
      "correlationData": "<Optional UUID to identify sender. If specified it is passed with response.>",
      "configChange": "<Only for updateConfig command: provide a valid config JSON object>"
    }
  ]
}

For example, to fetch the current config send a getConfig command:

{
  "commands": [
    {
      "command": "getConfig",
      "correlationData": "1234"
    }
  ]
}

And to update it use the updateConfig command:

{
  "commands": [
    {
      "command": "updateConfig",
      "correlationData": "12345",
      "configChange": {
        "eventhubs": [
          {
            "name": "EvHub1",
            "auth": {
              "sas": {
                "key": "A_SAS_Key_Token"
              }
            }
          }
        ],
        "producers": [
          {
            "name": "P1",
            "hub": "hub1",
            "eventhub": "EvHub1",
            "options": {
              "messageId": "{NOW_ISO}"
            }
          }
        ],
        "topicMappings": [
          {
            "mqttTopics": ["test/topic"],
            "target": "P1"
          }
        ]
      }
    }
  ]
}
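
For scripting, the control payloads above can be assembled like this (illustrative Python; publish the result to the control topic with any MQTT client and subscribe to the response topic for the reply):

```python
import json
import uuid

CONTROL_TOPIC = "$CONTROL/cedalo/azure-eventhubs-bridge/v1"
RESPONSE_TOPIC = CONTROL_TOPIC + "/response"

def build_command(command: str, config_change=None) -> str:
    # correlationData is optional; when set it is echoed back in the response.
    cmd = {"command": command, "correlationData": str(uuid.uuid4())}
    if config_change is not None:
        cmd["configChange"] = config_change  # only used by updateConfig
    return json.dumps({"commands": [cmd]})
```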

For more information on the control requests for the bridge consult the MQTT API spec here.

JSON Schema

Schema for the Microsoft Azure® Bridge, which describes all configuration options for the bridge.

{
"title": "Azure Bridge Configuration",
"description": "",
"type": "object",
"properties": {
"eventhubs": {
"description": "The Azure EventHubs Bridge can forward messages to multiple EventHubs resources.",
"type": "array",
"items": {
"description": "An Azure EventHubs resource.",
"type": "object",
"properties": {
"name": {
"description": "Unique namespace identifier of an EventHubs",
"type": "string",
"minLength": 1
},
"auth": {
"description": "EventHubs specific authentication. Either specify a Shared Access Signature (SAS) key or use Role Based Access Control (RBAC)",
"type": "object",
"properties": {
"sas": {
"type": "object",
"properties": {
"key": {
"type": "string",
"minLength": 1
}
},
"required": [
"key"
]
},
"rbac": {
"type": "object",
"properties": {
"tenantId": {
"type": "string",
"minLength": 1
},
"clientId": {
"type": "string",
"minLength": 1
},
"clientSecret": {
"type": "string",
"minLength": 1
},
"fullyQualifiedNamespace": {
"description": "Full namespace qualifier. If not specified \"<namespace>.servicebus.windows.net\" will be used",
"type": "string",
"minLength": 1
}
},
"required": []
}
},
"oneOf": [
{
"required": [
"sas"
]
},
{
"required": [
"rbac"
]
}
]
}
},
"required": [
"name",
"auth"
]
}
},
"producers": {
"description": "Per producer settings for each event hub within an EventHubs resource",
"type": "array",
"items": {
"description": "Producer settings for each event hub within an EventHubs resource",
"type": "object",
"properties": {
"name": {
"description": "Unique name for this producer to use and identify it in mappings",
"type": "string",
"minLength": 1
},
"type": {
"description": "Use a batch producer for direct message passing or a buffered producer to collect several messages before sending to eventhub",
"type": "string",
"default": "batch",
"enum": [
"batch",
"buffered"
]
},
"hub": {
"description": "Name of the hub to send messages to",
"type": "string",
"minLength": 1
},
"eventhub": {
"description": "Namespace of the EventHubs to use",
"type": "string",
"minLength": 1
},
"auth": {
"description": "Producer specific authentication. Specify only if differ from EventHubs auth",
"type": "object",
"properties": {
"sas": {
"type": "object",
"properties": {
"key": {
"type": "string",
"minLength": 1
}
},
"required": [
"key"
]
},
"rbac": {
"type": "object",
"properties": {
"tenantId": {
"type": "string",
"minLength": 1
},
"clientId": {
"type": "string",
"minLength": 1
},
"clientSecret": {
"type": "string",
"minLength": 1
},
"fullyQualifiedNamespace": {
"description": "Full namespace qualifier. If not specified \"<namespace>.servicebus.windows.net\" will be used",
"type": "string",
"minLength": 1
}
},
"required": []
}
},
"oneOf": [
{
"required": [
"sas"
]
},
{
"required": [
"rbac"
]
}
]
},
"options": {
"description": "General producer options",
"type": "object",
"properties": {
"messageId": {
"description": "Custom, may be useful for tracking event source",
"type": "string"
},
"correlationId": {
"description": "Custom, may be useful for debbuing or tracking events",
"type": "string"
},
"partitionId": {
"description": "Partition id like \"0\", \"1\",... This enables ordered event process on same partition. Either specify this or partitionKey, but not both!",
"type": "string"
},
"partitionKey": {
"description": "A fix or dynamic value. This enables ordered event process on same partition. Either specify this or partitionId, but not both!",
"type": "object",
"properties": {
"field": {
"description": "JSON-path wich specifies a field within message payload. To use a fix value specify default only.",
"type": "string"
},
"default": {
"description": "Either used as default if field could not be resolved or as fix value if no field is specified"
}
}
}
},
"anyOf": [
{
"required": [
"partitionId"
],
"not": {
"required": [
"partitionKey"
]
}
},
{
"required": [
"partitionKey"
],
"not": {
"required": [
"partitionId"
]
}
},
{
"not": {
"required": [
"partitionId",
"partitionKey"
]
}
}
]
},
"retryOptions": {
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": [
"fix",
"exponential"
]
},
"delayInMs": {
"description": "Period in milliseconds to wait until next retry",
"type": "number"
},
"maxDelayInMs": {
"description": "Max. delay in milliseconds to wait before next retry. Only useful if mode is exponential",
"type": "number"
},
"maxRetries": {
"description": "Max. number of retries",
"type": "number"
},
"timeoutInMs": {
"description": "Milliseconds to wait before declaring current request as timed out",
"type": "number"
}
}
},
"batch": {
"description": "Options to apply for batch producer only",
"type": "object",
"properties": {
"maxSizeInBytes": {
"description": "Max. batch size",
"type": "number"
},
"maxOpenBatches": {
"description": "Max. number of batches sending or retrying at the same time. Default is 10",
"type": "number"
}
}
},
"buffer": {
"description": "Options for buffered producer only",
"type": "object",
"properties": {
"maxWaitTimeInMs": {
"description": "Milliseconds to wait for next message. If none is received during this period the buffered producer sends all of its messages.",
"type": "number"
},
"maxEventBufferLengthPerPartitions": {
"description": "Max. buffer length. If this limit is reached the buffer sends all of its messages.",
"type": "number"
}
}
}
},
"required": [
"name",
"hub",
"eventhub"
]
}
},
"topicMappings": {
"description": "Descriptions for how to map MQTT topics to azure event hubs",
"type": "array",
"items": {
"type": "object",
"properties": {
"mqttTopics": {
"description": "List of MQTT topics to map",
"type": "array",
"items": {
"description": "MQTT topic to map",
"type": "string",
"minLength": 1
}
},
"target": {
"description": "Name of defined producer to send message to",
"type": "string",
"minLength": 1
}
},
"required": [
"mqttTopics",
"target"
]
}
}
},
"required": [
"eventhubs",
"producers"
]
}