Skip to main content
Version: Next

InfluxDB Bridge

Introduction

The InfluxDB Bridge plugin can be used to forward data published to the Mosquitto broker to InfluxDB time-series databases. The plugin can handle multiple database connections. For each connection, a mapping between certain MQTT topics and InfluxDB measurements can be defined (see example configuration).

The list of currently supported InfluxDB versions:

  • InfluxDB v1.x (using the influx/v1 driver)
  • InfluxDB v2.x (using the influx/v2 driver)
  • InfluxDB v3.x (using the influx/v2 driver (same as InfluxDB v2.x))

InfluxDB Bridge supports both basic (username, password) authentication for v1 and token-based authentication for v2.

The plugin manages incoming MQTT messages in a buffer to perform batching and improve performance when writing to InfluxDB. The size properties of the buffer can be controlled via the configuration in the options section, with bufferSize and timeoutMs (refer to Configuration File Format).

Plugin activation

To enable the InfluxDB Bridge plugin in the broker, add the following to the mosquitto.conf file:

plugin /usr/lib/cedalo_influxdb_bridge.so

persistence_location /mosquitto/data

This is an example configuration snippet, which applies to the docker container setup. For installation not running in a container, the above configuration needs to be adjusted accordingly (namely the location of cedalo_influxdb_bridge.so dynamic library and persistence_location may differ).

persistence_location is used as the search path for the plugin's config file in the context of a single node broker. The file must be called Influxdb-Bridge.json. This file must be created to allow the plugin to start, but it may be left empty. The ownership of this file should be given to the mosquitto user. In the context of an HA cluster, configuration is fully managed by the Cedalo Platform or manually via a respective control topic (see the section below).

In addition to modifying mosquitto.conf, ensure that you have the appropriate license to use the plugin.

Configuration

The configuration of the bridge is achieved by providing a configuration file following the JSON schema below. You can modify the configuration file using the MQTT API for the Mongo Bridge. This is described in detail here. Another option is to use the user interface provided with the Cedalo Mosquitto Platform, which provides a webpage to modify and update the configuration. This option is described below in the Platform UI section

Configuration File Format

The configuration is managed by the control topic with the latest update being stored to disk with a filename of Influxdb-Bridge.json under the persistence location (as defined in mosquitto.conf).

Note that in the single node setting, the plugin will not be able to start unless the Influxdb-Bridge.json file exists under the persistence location. When first starting the plugin this file should be created but may be left empty. The owner of this file should be the mosquitto user.

The configuration of the plugin is a JSON object consisting of an array of connection objects as well as an array of so-called "schema mappings". Each connection object defines its own InfluxDB connection and MQTT topics to listen to. It also references a mapping of the MQTT message properties to InfluxDB fields and tags (or an entire line protocol record if the MQTT message is a line protocol string). Such mappings are called schema mappings and are described in a different array within the same file under schemaMappings field. This is to allow multiple InfluxDB connections to reuse the same mappings.

The fields of the configuration object are described below. To see an example of the config file refer to the configuration example section. Refer to the configuration schema

The following fields of the config are mandatory:

  • connections (path: $.connections):
    An array of InfluxDB connection objects. Apart from database connection settings, each connection object also contains MQTT topic mapping information (type: array).

    • name (path: $.connections[].name):
      Unique identifier of the InfluxDB Bridge connection object. A convenience name for the database connection used in the plugin's logs (type: string).
    • connection (path: $.connections[].connection):
      Object holding the actual database connection information (type: object).
      • driver (path: $.connections[].connection.driver):
        Name of the driver to be used with the database connection. (type: string, one of: influxdbv1, influxdbv2).
      • hostname (path: $.connections[].connection.hostname):
        Hostname or IP address of the InfluxDB server (type: string).
      • port (path: $.connections[].connection.port):
        Port that the InfluxDB server listens for client connections on (type: integer).
      • database (path: $.connections[].connection.database):
        Name of the database for InfluxDB v1 or bucket for InfluxDB v2 to connect to (type: string).
      • credentials (path: $.connections[].connection.credentials):
        Object holding authentication information. (type: object)
        • For InfluxDB v1:
          • username (path: $.connections[].connection.credentials.username):
            Username used for authenticating against the database (type: string)
          • password (path: $.connections[].connection.credentials.password):
            Password of the user used for authenticating against the database. (type: string)
        • For InfluxDB v2:
          • token (path: $.connections[].connection.credentials.token):
            Authentication token for InfluxDB v2 connection (type: string)
      • organization (path: $.connections[].connection.organization):
        Organization that a bucket belongs to in InfluxDB v2. This field is only required by InfluxDB v2 itself. But it's not a required field in the JSON schema of the plugin (type: string)
    • topicMappings (path: $.connections[].topicMappings):
      List of mappings specifying a mapping between MQTT topics and InfluxDB measurements to forward data to (type: array of type object).
      • name (path: $.connections[].topicMappings.name):
        Unique identifier of the current topic mapping ( type: string).
      • target (path: $.connections[].topicMappings.target):
        Name of the InfluxDB measurement into which MQTT data needs to be inserted. If your MQTT messages are line protocol strings already, this field should be set to an empty string (type: string).
      • mqttTopics (path: $.connections[].topicMappings.mqttTopics):
        An array of MQTT topics that should be forwarded to InfluxDB. MQTT wildcards (+ and #) can also be used (type: array of type string).
      • schemaMapping (path: $.connections[].topicMappings.schemaMapping):
        Name of a particular schema mapping from $.schemaMappings array to be used as a mapping between target measurement and mqttTopics.
  • schemaMappings (path: $.schemaMappings):
    An array of schema mappings. Each element of the array specifies a mapping between MQTT message properties and InfluxDB fields and tags (type: array of type object).

Optional fields:

  • connections (path: $.connections)

    • connection (path: $.connections[].connection)
      • compression (path: $.connections[].connection.compression):
        Properties related to data compression for InfluxDB v2. Not applicable for v1 (type: object)
        • enable (path: $.connections[].connection.compression.enable):
          Whether to compress data before sending to InfluxDB. Less bandwidth required but inflicts more strain on the server (type: bool, defaults to: false)
      • ssl (path: $.connections[].connection.ssl):
        SSL options. SSL connection with the database will not be established if this option is not defined (type: object)
        • rejectUnauthorized (path: $.connections[].connection.ssl.rejectUnauthorized):
          Whether to refuse unauthorized certificates from the InfluxDB server. (type: bool, defaults to: false)
        • caPath (path: $.connections[].connection.ssl.caPath):
          Path to CA that signed InfluxDB server certificate (type: string)
        • ca (path: $.connections[].connection.ssl.ca):
          String with CA that signed InfluxDB server certificate. Use either this property or caPath (type: string)
      • debug (path: $.connections[].connection.debug):
        Whether to log debug information about queries performed by the plugin. This option is currently not in use by the plugin. It's reserved for the future versions (type: bool, defaults to: false).
      • lazyConnect (path: $.connections[].connection.lazyConnect):
        Whether to allow the plugin to continue if the connection to the database cannot be established (type: bool, defaults to: false).
    • options (path: $.connections[].options):
      Object holding additional plugin settings (type: object).
      • bufferSize (path: $.connections[].options.bufferSize):
        Defines the size of the buffer, which is used to batch multiple MQTT messages together before inserting them into InfluxDB. For more information, see the section on buffering (type: integer, defaults to: 1000).
      • timeoutMs (path: $.connections[].options.timeoutMs):
        Interval in milliseconds after which buffer is forcefully released if it is not empty. In other words, it specifies a maximum latency of the buffer release ( type: integer, defaults to: 5000).
      • queueMaxSize (path: $.connections[].options.queueMaxSize):
        Defines the max size of a retry queue used in case a database insert fails due to connection problems. If set to -1, the size of the queue is not limited. For more information see the section on retry queue (type: integer, defaults to: -1).
      • maxRetries (path: $.connections[].options.maxRetries):
        Defines a number of retries to be performed by a retry queue before dropping the message. For more information, see the section on retry queue (type: integer, defaults to: 10).
      • retryDelayMs (path: $.connections[].options.retryDelayMs):
        Defines a delay in milliseconds between the retry attempts of the queue. For more information, see the section on retry queue ( type: integer, defaults to: 1000).
  • schemaMappings (path: $.schemaMappings):
    An array of schema mappings. Each element of the array specifies a mapping between MQTT message properties and InfluxDB fields and tags (type: array of type object).

    • name (path: $.schemaMappings[].name):
      Unique identifier of the current schema mapping. This name is matched against $.connections[].topicMappings.schemaMapping to determine which schema mapping to use for particular measurement (type: string).
    • mapping (path: $.schemaMappings[].mapping):
      An array holding source-to-target mapper object with information about which properties or metadata to pick out from your MQTT messages as well as which InfluxDB fields or tags those properties should be inserted into (type: array of type object).
      • source (path: $.schemaMappings[].mapping[].source):
        Source of the data to be inserted. This can be a property of the MQTT message payload (in case the payload is a JSON object) or some metadata property (e.g. client_id, timestamp, etc). This field follows a particular unpacking syntax. For more information, see below (type: string).
      • target (path: $.schemaMappings[].mapping[].target):
        InfluxDB target field or tag name into which data from the source field will be inserted. For the MQTT messages which are already line protocol strings this field should be an empty string (type: string).
      • targetType (path: $.schemaMappings[].mapping[].targetType):
        Whether the target is a tag, field, or a line protocol string (type: string, one of: tag, field, lineProtocol).
      • type (path: $.schemaMappings[].mapping[].type):
        Optional type casting of the source field before it gets inserted into InfluxDB. See more information about typecasting here ( type: string).
      • options (path: $.schemaMappings[].mapping[].options):
        Object holding information about additional transformations to be performed on source before it is inserted into InfluxDB.
        • isConst(path: $.schemaMappings[].mapping[].options.isConst):
          Used to indicate that the field in source is a constant that should be inserted into InfluxDB as is without trying to unpack it. This is relevant in case the source field is a string that follows the unpacking syntax but should be treated as a constant. Otherwise (i.e. for constants that don't follow the unpacking syntax), this field is not required and therefore can be omitted or only added for clarity and additional context (type: bool).
        • replace (path: $.schemaMappings[].mapping[].options.replace):
          Specifies an array of two strings. Replaces all occurrences of the first string with the second one before inserting the data into the database (type: array consisting of two elements of type string).
        • replaceNullWith (path: $.schemaMappings[].mapping[].options.replaceNullWith):
          Specifies a value that null values on the source will be replaced with (type: any).
        • replaceUndefinedWith (path: $.schemaMappings[].mapping[].options.replaceUndefinedWith):
          In case that a property specified in source is missing on the MQTT message, it will be replaced with the value specified in this option (type: any).
        • format (path: $.schemaMappings[].mapping[].options.format):
          Used when casting into datetime datatype (i.e. strings representing date and time). Specifies a datetime format string that values from source are formatted as. See format string documentation for more information on available format strings. Defaults to yyyy-MM-ddTHH:mm:ss.sssZ (with an optional timezone information). However, by default if an integer value representing a unix timestamp in milliseconds is encountered in source, it will also be accepted and appropriately converted to the datetime datatype (type: string).
        • dbFormat (path: $.schemaMappings[].mapping[].options.dbFormat):
          Used when casting into datetime datatype. Specifies a format string that datetime values should be converted to before they are inserted into the table (type: string).
        • replaceDelimiter (path: $.schemaMappings[].mapping[].options.replaceDelimiter):
          Used when casting into datetime datatype. The format of the datetime values inserted into the database will typically be an ISO format as in the following example: 2017-01-29T14:07:00. InfluxDB can understand and parse this format but if you would like to change the delimiter to something else like a space, you can still do it with this option. For more complex changes use the dbFormat field (type: string).

Mapping MQTT packets to InfluxDB

The plugin configuration specifies the mapping between MQTT packets and InfluxDB fields/tags in the schemaMappings field. Elements of the schemaMappings array must have unique names that will be matched against the schemaMapping field of the respective connection object.

mapping specifies an array of extractor objects (you may also see them being referred to as selectors in other documentation on bridges) between sources and targets. The source field represents the source of the data. It comes from the MQTT message or its metadata. The target field is the name of the field or tag that source will be inserted into in InfluxDB. targetType specifies whether the target is a field or tag in InfluxDB, or a line protocol string.

To be more precise, the source field can be a constant, a path to a particular metadata property of an MQTT message, or a data property within its payload. The latter is relevant for JSON MQTT payloads: in case the MQTT payload is a JSON, the path to some data property of such payload can be specified as a source.

Below are some examples with clarifications to put the above-mentioned into context:

It is possible to unpack MQTT payload in case it is a JSON and extract (pick out) certain properties from it. This can be achieved by specifying a JSON path to such property in the source field. However, the properties in this path should be separated not by dots, but rather by square brackets (e.g.: not payload.property1.innerProperty but [payload][property1][innerProperty]).

Note that currently the only supported characters for the JSON payload properties are [a-zA-Z0-9_-] (alphanumeric characters as well as _ (underscore) and - (dash))

Example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][temperature]",
"target": "temp",
"targetType": "field"
}
],
...
}

In the above example, the temperature property from the payload is selected as a field named temp in InfluxDB.

This means that if the MQTT payload looks like below:

{
"temperature": 21.5,
"humidity": 45,
"status": "normal"
}

then the value 21.5 will be inserted as a field called temp in the InfluxDB measurement.

Another example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][sensor][id]",
"target": "sensorId",
"targetType": "tag"
}
],
...
}

The extractor object above expects an MQTT message payload to be in JSON format. It extracts a property id from the sensor object in this payload and inserts (maps) it into sensorId tag in InfluxDB.

Apart from extracting data from payloads it's also possible to insert constant values, which may be useful for InfluxDB tags. Here is another example which maps a constant value instead of the one taken from an MQTT message:

{
...
"name": "mapping1",
"mapping": [
{
"source": "sensor-reading",
"target": "sensorType",
"targetType": "tag",
"options": {
"isConst": true
}
}
],
...
}

This extractor maps a constant string ("sensor-reading") to the tag sensorType in InfluxDB. This means that for every MQTT message for which this mapping is invoked, the tag sensorType with value "sensor-reading" will be added to the InfluxDB measurement.

To insert the entire MQTT payload (useful when the payload is of a primitive type like a string or a number), the extractor would look similar to the following:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload]",
"target": "status",
"targetType": "field"
}
],
...
}

Note that apart from [payload], various special metadata properties of the MQTT message may be selected. Here is the entire list of special (meta)data properties:

  • hostname: Hostname of the broker. Note that the HOSTNAME environment variable must be specified for the broker, or the default value will be used (type: string, defaults to: <Unknown>).
  • topic: Name of the topic to which the MQTT message was published (type: string).
  • payload: Payload of the MQTT message. Can be used to further unpack (pick out) child properties of the payload object if the latter is a JSON. See below for more information. If the payload was originally a string or a number, it will be treated as such, and no typecasting is necessary (type: any).
  • qos: QoS level of the MQTT message (type: integer, one of: 0, 1, 2).
  • retain - Retain flag of the MQTT message (type: bool).
  • timestamp - Unix timestamp representing the time when the MQTT message arrived at the broker (type: integer).
  • datetime - Datetime string representing the time when the MQTT message arrived at the broker (type: string representing the date in ISO format).
  • uuid - unique identifier (UUID) of the message. Note that it is a convenience property. This ID is internally generated by the InfluxDB Bridge plugin and is not coming from the original message's metadata (type: string).
  • client_id - client_id of the device that published the MQTT message (type: string).

To select one of the above-mentioned properties, it should be enclosed in square brackets ([]) and specified in the source field of the configuration file.

For example, to use the client ID of the device that published the message as a tag in InfluxDB, the following selector can be defined:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[client_id]",
"target": "deviceId",
"targetType": "tag"
}
],
...
}

Data type casting

InfluxDB Bridge provides a feature to typecast properties of the MQTT messages before they are inserted into measurements.

In order to cast a certain property to a certain type, the following configuration must be specified:

{
...
"name": "<your mapping name here>",
"mapping": [
{
"source": "<your source mqtt message property>",
"target": "<your target influxdb field/tag name>",
"targetType": "<field or tag>",
"type": "<specify type to cast 'source' into>"
}
],
...
}

For example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][temperature_str]",
"target": "temperature",
"targetType": "field",
"type": "float"
}
],
...
}

Appropriate type casting depends on whether the target is a field or a tag in InfluxDB:

  • For tags: Tags are always stored as strings in InfluxDB.

  • For fields: InfluxDB supports the following field types:

    • float / double: for floating point values
    • integer: for integer values
    • string: for text values
    • boolean: for true/false values

Keeping the above in mind, in InfluxDB bridge the following typecasting is provided and makes sense:

  • For tags: Tags are automatically converted to strings from any type. Therefore the only typecasting that makes sense here is a datetime if you are using a date as a tag and want to change it's format. See below for more information on datetime casting.

  • For fields:

    • datetime: casts integers or strings into a datetime (format, dbFormat and replaceDelimiter might need to be specified. See config options section for details). There is no need to typecast datetime strings if they follow ISO format (note that the timezone portion is not required here since many databases don't handle timezones).
    • number: casts strings into either a float or integer (supports float numbers separated by both dots or commas ( e.g. both 1.5 and 1,5 are handled)).
    • integer: casts string into an integer truncating the fractional part.
    • boolean: casts strings or numbers into a boolean value (note that string "false" is cast to the falseboolean value, as is number 0, null, empty string, or a missing property. Everything else is cast to true).
    • string: can cast numbers into strings. There is typically no need to typecast JSON objects to strings as this is done automatically.

Note that while it might be tempting to explicitly specify typecasting for all the sources, this is not recommended as typecasting is often associated with additional overhead. The rule of thumb would be to cast only what is necessary.

Line Protocol

InfluxDB natively understands the Line Protocol format. If your MQTT payload already contains data in Line Protocol format, you can use the special lineProtocol target type to directly insert it:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload]",
"target": "",
"targetType": "lineProtocol"
}
],
...
}

Note that the target is empty. This is because line protocol strings already include all the required data (which measurements, tags, and fields to insert). Apart from setting target of the mapping to an empty string, you will also have to set a target field of the topicMappings to an empty string. This is again because the name of the measurement has to be included as part of the line protocol string itself.

With the configuration above, if your MQTT payload contains valid Line Protocol data like:

weather,location=us-midwest temperature=82.0,humidity=54 1465839830100400200

It will be directly parsed and inserted by InfluxDB.

Also note that the timestamp at the end is in nanoseconds. The timestamp may also be omitted, in which case the current time of message reception by the InfluxDB will be used instead.

Extractor options

Each extractor under the mapping section of the configuration file can have additional options that define transformations to be performed on the source data before it is inserted into target. For example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"options": {
"replaceNullWith": "<null>"
}
}
],
...
}

The above specifies an extractor that maps property1 of the MQTT message to the column_a in the InfluxDB measurement. Once the null value is encountered in the property1 property of the MQTT message's payload, it will be converted to "<null>" string before being inserted into the database.

Note that some options can only be used with certain (or when typecasting to certain) datatypes, while others can be used with any type.

Options which can only be used when typecasting to datetime:

  • format
  • dbFormat
  • replaceDelimiter

Example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"type": "datetime",
"options": {
"nullValue": "<null>",
"format": "yyyy-MM-dd HH:mm:ss",
"dbFormat": "dd-MMM-yy hh:mm:ss",
"replaceDelimiter": " "
}
}
],
...
}

Options which can be used as long as the underlying value is a string:

  • replace

Example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"options": {
"replace": [
"hello world",
"hello SQL"
]
}
}
],
...
}

Options which can be used regardless of types and typecasting:

  • replaceNullWith
  • replaceUndefinedWith
  • isConst

See configuration options for more information on the listed options (their descriptions, datatypes, allowed values).

Choosing InfluxDB version

To work with different InfluxDB versions you have to change the driver field under connection in a respective connection object of the configuration. The allowed values are influxdbv1 and influxdbv2. To work with InfluxDB v3 choose influxdbv2 driver as the REST API for v2 and v3 is the same.

There are also some additional properties specific to the InfluxDB version. For example, username and password properties under credentials are only relevant to InfluxDB v1, while token property is only used for v2.

Apart from that, there is a compression object under connection, which is applicable only to InfluxDB v2.

SSL connection options

As InfluxDB (both v1 and v2) supports custom server certificates, you can specify TLS/SSL options under ssl field to allow Mosquitto to connect to such an InfluxDB instance.

As InfluxDB does not support mTLS you can only specify a CA certificate that signed the custom certificate of your InfluxDB under SSL options. To that end, there are two fields: ca or caPath. The first one specifies a certificate as a PEM-formatted string, while the other field specifies a path to a certificate file on the filesystem.

Another field under SSL options is rejectUnauthorized which is a boolean specifying whether to reject unknown certificates. This option must not be specified in production as it opens your connection up to Man-in-the-Middle (MITM) attacks. The default for this setting is false.

If instead of custom certificates your InfluxDB instance is using a certificate signed by a well-known certificate authority (one of the CAs whose certificates are shipped together with major browsers like Firefox), you may set the ssl field to an empty object. This is the same as specifying an ssl object with a single property rejectUnauthorized set to true. This way a TLS connection will still be initiated by the broker.

Compression

InfluxDB v2 supports data compression of incoming records with gzip as noted in the InfluxDB documentation Compression in InfluxDB is one of the techniques to optimize writes to the database However, be aware that while compression reduces network bandwidth, it increases the load on the server as it has to perform compression every time before sending the data over. This means that careful benchmarking needs to be performed before deciding if compression helps your use case.

To enable compression set an enable field under compression object in connection to true. The default value is false. This feature only works for InfluxDB v2 and higher.

Configuration example

An example of the configuration with two database connections is shown below:

{
"version": "1",
"connections": [
{
"name": "InfluxDB v1 Connection",
"connection": {
"driver": "influxdbv1",
"hostname": "influxdb-v1.example.com",
"port": 8086,
"database": "mqtt_data",
"credentials": {
"username": "admin",
"password": "password123"
}
},
"options": {
"bufferSize": 5000,
"timeoutMs": 3000,
"queueMaxSize": 10000,
"maxRetries": 15,
"retryDelayMs": 1000
},
"topicMappings": [
{
"name": "topic-mapping-to-weather-measurement",
"schemaMapping": "weather-schema-mapping",
"target": "weather_data",
"mqttTopics": [
"sensors/weather/#"
]
}
]
},
{
"name": "InfluxDB v2 Connection",
"connection": {
"driver": "influxdbv2",
"hostname": "influxdb-v2.example.com",
"port": 8086,
"database": "mqtt_metrics",
"organization": "my-org",
"credentials": {
"token": "your-influxdb-v2-token"
},
"compression": {
"enable": true
}
},
"options": {
"bufferSize": 1000,
"timeoutMs": 4000,
"queueMaxSize": 5000,
"maxRetries": 10,
"retryDelayMs": 2000
},
"topicMappings": [
{
"name": "topic-mapping-to-system-measurement",
"schemaMapping": "system-schema-mapping",
"target": "system_metrics",
"mqttTopics": [
"system/metrics/#"
]
}
]
}
],
"schemaMappings": [
{
"name": "weather-schema-mapping",
"mapping": [
{
"source": "[client_id]",
"target": "device_id",
"targetType": "tag"
},
{
"source": "[payload][temperature]",
"target": "temperature",
"targetType": "field"
},
{
"source": "[payload][humidity]",
"target": "humidity",
"targetType": "field"
},
{
"source": "[payload][location]",
"target": "location",
"targetType": "tag"
},
{
"source": "[timestamp]",
"target": "recorded_at",
"targetType": "field"
}
]
},
{
"name": "system-schema-mapping",
"mapping": [
{
"source": "[client_id]",
"target": "host",
"targetType": "tag"
},
{
"source": "[payload][cpu]",
"target": "cpu_usage",
"targetType": "field"
},
{
"source": "[payload][memory]",
"target": "memory_usage",
"targetType": "field"
},
{
"source": "[payload][disk]",
"target": "disk_usage",
"targetType": "field"
},
{
"source": "system",
"target": "data_type",
"targetType": "tag",
"options": {
"isConst": true
}
}
]
}
]
}

As per the example config above, InfluxDB bridge will connect to two databases: an InfluxDB v1 instance at influxdb-v1.example.com:8086 and an InfluxDB v2 instance at influxdb-v2.example.com:8086. It will use a database called mqtt_data for v1 and a bucket called mqtt_metrics in the organization my-org for v2.

The optional options field is specified and overrides some defaults of the plugin's buffering. For the v1 connection, it sets bufferSize to 5000 and timeoutMs to 3000 milliseconds, while for the v2 connection, it sets bufferSize to 1000 and timeoutMs to 4000 milliseconds. This illustrates that you can set different queue options per database connection, making it adjustable to different traffic loads.

Also see section about buffering for more information.

The retry queue parameters are in this case also configured differently depending on the database connection. See section about retry queue for more information.

Notice that InfluxDB v2 connection has compression enabled, which will compress data before sending it to the database, reducing bandwidth at the cost of increased server load. This option is only available for InfluxDB v2.

In the topicMappings section, we define which MQTT topics should be forwarded to which InfluxDB measurements. The v1 connection maps topics matching sensors/weather/# to the weather_data measurement, while the v2 connection maps topics matching system/metrics/# to the measurement called system_metrics.

The schemaMappings section defines how the MQTT message data should be mapped to InfluxDB fields and tags. For example, in the weather-schema-mapping, we extract:

  • The client ID as a tag called device_id
  • The temperature, humidity, and timestamp from the payload as fields
  • The location from the payload as a tag

In the system-schema-mapping, we extract:

  • The client ID as a tag called host
  • CPU, memory, and disk usage from the payload as fields
  • We also add a constant tag called data_type with the value system. This value will stay the same for all records

Error handling

Any configuration or license errors will prevent the plugin from being loaded, and respective messages will appear in stderr and stdout.

If recoverable errors occur during the operation of the plugin, it will generate respective error messages in its stderr and prefix them with an ERR: prefix. This includes: encountering errors when converting MQTT messages to InfluxDB format as well as failures during data insertion.

Note that while error messages generated by the plugin will be preceded by ERR:, warning messages will get a WARN: prefix, while debug messages - a respective DEBUG:.

Debugging

To print detailed information about operations being performed against the InfluxDB database to the stdout, define the DEBUG and DEBUG_INFLUXDB_BRIDGE environment variables on the broker.

In future versions the functionality of debug environment variables will move to the debug property in the configuration file under connection section.

Notes on data insertions and data format

Under the hood the plugin converts all the data to InfluxDB line protocol records, which are then inserted using InfluxDB REST API endpoint. While converting data to line protocol, InfluxDB bridge takes care of escaping spaces and special characters in measurement names and field names, so you don't have to worry about escaping when writing plugin's configuration.

Buffering

Buffers are created per measurement for every InfluxDB connection. The size of the buffer is controlled with bufferSize field under options ($.connections[].options) and indicates the capacity of the buffer. The default size of the buffer is 1000. Buffer holds records representing unpacked MQTT messages (or line protocol strings if this is the configured format of your MQTT messages) which are to be inserted to the respective measurement once the buffer is saturated.

Once the buffer fills up, its flushing is triggered, and a batch write is issued against the measurement, inserting the buffer contents. The condition for buffer flush is bufferSize + 1. Once this number of records is reached, the buffer is released.

Another parameter timeoutMs - indicates the time interval (in milliseconds) after which all the non-empty buffers will be forcefully released triggering the batch writes of the buffers' contents. The default value is 5000 ms.

However, even though it might seem that buffer is only released after saturation or a timeout, it also has a mechanism to account for infrequent MQTT messages and decrease the latency of database inserts for such records.

The idea is that the time interval between consecutive MQTT messages is kept track of. If this interval is long enough, the record will be inserted instantly. However, if the interval between the messages is short, the plugin will keep buffering the messages anticipating a possible influx. The definition of "long enough" might change when updates to the plugin are made, but currently, it's a fraction of the timeoutMs.

Note that this leads to the first MQTT message never being buffered.

Also, notice that buffer options can be defined per database connection, allowing for more flexibility and fine-tuning when needed.

Retry queue

The InfluxDB bridge plugin provides a retry mechanism for failed writes to ensure data is not lost in case of temporary connection issues. To prevent data loss, InfluxDB bridge has a retry queue that is responsible for retrying message writes in case of connection problems.

Each retry queue can be configured with the following settings (under options field of the configuration ($.connections[].options)):

queueMaxSize option is used to set the maximum amount of messages that the queue is able to accommodate before it starts to drop them. This limit can be set to infinity using the value -1, which is the default. Note that if this value is not set to -1, then careful considerations and testing should be performed to ensure that the queue will not overfill too quickly.

maxRetries is used to specify the maximum number of retry attempts the queue will make before dropping a message. The default is 10.

retryDelayMs is a minimum amount of time in milliseconds that must pass between the retry queue attempts. The default is 1000 ms (1 second).

Note that the elements in a retry queue can be both single MQTT messages as well as entire buffers of MQTT messages that are ready to be batch-written to InfluxDB.

An important point is that retries happen only in case of connection errors. In case of invalid data the error is logged out but no retries are performed. If a batch of messages is being inserted and only some of these messages are invalid, then the filtering of invalid records from valid ones will happen on the side of the InfluxDB itself using its native mechanisms to handle invalid data.

Platform UI

JSON schema

InfluxDB bridge plugin uses the following JSON schema for its configuration:

{
"title": "InfluxDB-Bridge Plugin Config",
"type": "object",
"properties": {
"version": {
"type": [
"string",
"integer"
],
"nullable": true,
"description": "Version of the configuration file"
},
"connections": {
"type": "array",
"description": "List of sub-configurations per InfluxDB connection",
"items": {
"type": "object",
"description": "Configuration per InfluxDB connection",
"properties": {
"name": {
"type": "string",
"nullable": false,
"description": "Unique textual identifier of the configuration"
},
"connection": {
"type": "object",
"description": "InfluxDB connection specific configuration",
"properties": {
"driver": {
"type": "string",
"enum": [
"influxdbv1",
"influxdbv2"
],
"nullable": false,
"description": "Name of the InfluxDB driver to use (for InfluxDB v1 or InfluxDB v2/v3)"
},
"hostname": {
"type": "string",
"nullable": false,
"description": "Hostname or IP of the InfluxDB server to connect to"
},
"port": {
"type": "integer",
"nullable": false,
"description": "Port of the InfluxDB server to connect to"
},
"database": {
"type": "string",
"nullable": false,
"description": "Name of the database (or bucket for influxdb v2) to connect to"
},
"organization": {
"type": "string",
"nullable": true,
"description": "Organization that a bucket belongs to in InfluxDB v2"
},
"compression": {
"type": "object",
"nullable": true,
"description": "Properties related to data compression happening before sending it over to InfluxDB",
"properties": {
"enable": {
"type": "boolean",
"nullable": true,
"default": false,
"description": "Whether to compress data before influxdb. Less bandwidth required but inflicts more strain on the server"
}
}
},
"credentials": {
"type": "object",
"nullable": false,
"description": "Credentials to authenticate to InfluxDB. Combine both v1 credentials (username and password) and v2 credentials (tokens). Only one of the two which corresponds to the InfluxDB version in use must be selected",
"properties": {
"token": {
"type": "string",
"nullable": true,
"description": "Authentication token for InfluxDB v2 connection"
},
"username": {
"type": "string",
"nullable": true,
"description": "Username for InfluxDB v1"
},
"password": {
"type": "string",
"nullable": true,
"description": "Password for InfluxDB v1"
}
}
},
"ssl": {
"type": "object",
"nullable": true,
"description": "SSL configuration for the connection",
"properties": {
"ca": {
"type": "string",
"nullable": true,
"description": "CA certificate file to use for the connection"
},
"caPath": {
"type": "string",
"nullable": true,
"description": "CA certificate path to use for the connection"
},
"rejectUnauthorized": {
"type": "boolean",
"nullable": true,
"description": "Whether to reject unauthorized connections or not"
}
}
},
"debug": {
"type": "boolean",
"nullable": true,
"description": "Whether to enable debug query logging for connection"
},
"lazyConnect": {
"type": "boolean",
"nullable": true,
"default": false,
"description": "Whether to allow the plugin to continue if the connection to the database cannot be established"
}
},
"nullable": false,
"required": [
"driver"
]
},
"topicMappings": {
"type": "array",
"description": "List of topic mapping objects to forward MQTT messages to InfluxDB",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"nullable": false,
"description": "Unique identifier of this mapping"
},
"schemaMapping": {
"type": "string",
"nullable": false,
"description": "Name of the schema mapping which should be applied here"
},
"target": {
"type": "string",
"description": "InfluxDB measurement to forward MQTT messages to"
},
"options": {
"type": "object",
"nullable": true,
"description": "Optional options for the measurement"
},
"mqttTopics": {
"type": "array",
"items": {
"type": "string",
"description": "List of MQTT topics to forward messages from",
"pattern": "^((\\+?|[\\w-]+)((\\/[\\w-]+)?(\\/\\+)?(\\/[\\w-]+)?((?<!\\+)\\/)?)*(\\/#)?|(\\/)?#)$"
}
}
},
"required": [
"name",
"target",
"mqttTopics"
]
}
},
"options": {
"type": "object",
"nullable": true,
"description": "Plugin options for configuring internal queues and buffers",
"properties": {
"bufferSize": {
"type": "integer",
"nullable": true,
"minimum": 1,
"description": "Maximum number of messages to buffer before flushing to the database"
},
"timeoutMs": {
"type": "integer",
"nullable": true,
"minimum": 0,
"description": "Maximum time in milliseconds to buffer messages before flushing to the database"
},
"queueMaxSize": {
"type": "integer",
"nullable": true,
"minimum": -1,
"description": "Maximum number of messages in retry queue"
},
"maxRetries": {
"type": "integer",
"nullable": true,
"minimum": 0,
"description": "Maximum number of retries for failed inserts due to lost connection"
},
"retryDelayMs": {
"type": "integer",
"nullable": true,
"minimum": 0,
"description": "Delay in milliseconds before retrying inserts failed due to lost connection"
}
}
}
},
"required": [
"name",
"connection",
"topicMappings"
]
}
},
"schemaMappings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"nullable": false,
"description": "Unique identifier of the mapping"
},
"mapping": {
"type": "array",
"description": "List of mappings to apply to InfluxDB. Each schema contains a mapping of InfluxDB fields/tags to MQTT message properties",
"items": {
"type": "object",
"description": "Properties to select from the MQTT message and insert into InfluxDB",
"properties": {
"source": {
"oneOf": [
{
"type": "string"
},
{
"type": "number"
},
{
"type": "null",
"nullable": true
}
],
"description": "JSON path to the value to insert or constant value"
},
"target": {
"type": "string",
"description": "Name of the field or tag to insert data from \"source\" into"
},
"targetType": {
"type": "string",
"enum": [
"tag",
"field",
"lineProtocol"
],
"description": "Whether the target is a tag or a field in InfluxDB"
},
"type": {
"type": "string",
"nullable": true,
"description": "Data type of the value to cast source to before inserting"
},
"options": {
"type": "object",
"nullable": true,
"description": "Optional options for the field or tag",
"properties": {
"isConst": {
"type": "boolean",
"nullable": true,
"description": "Whether the source is a constant value"
}
}
}
},
"required": [
"source",
"target",
"targetType"
]
}
}
},
"required": [
"name",
"mapping"
]
}
}
},
"required": [
"connections",
"schemaMappings"
]
}