InfluxDB Bridge
Introduction
The InfluxDB Bridge plugin can be used to forward data published to the Mosquitto broker to InfluxDB time-series databases. The plugin can handle multiple database connections. For each connection, a mapping between certain MQTT topics and InfluxDB measurements can be defined (see example configuration).
The list of currently supported InfluxDB versions:
- InfluxDB v1.x (using the influx/v1 driver)
- InfluxDB v2.x (using the influx/v2 driver)
- InfluxDB v3.x (using the influx/v2 driver, the same as InfluxDB v2.x)
InfluxDB Bridge supports both basic (username, password) authentication for v1 and token-based authentication for v2.
The plugin manages incoming MQTT messages in a buffer to perform batching and improve performance when writing to InfluxDB. The size properties of the buffer can be controlled via the configuration in the options section, with bufferSize and timeoutMs (refer to Configuration File Format).
Plugin activation
To enable the InfluxDB Bridge plugin in the broker, add the following to the mosquitto.conf file:
plugin /usr/lib/cedalo_influxdb_bridge.so
persistence_location /mosquitto/data
This is an example configuration snippet, which applies to the docker container setup. For installations not running in a container, the above configuration needs to be adjusted accordingly (namely, the location of the cedalo_influxdb_bridge.so dynamic library and the persistence_location may differ).
persistence_location is used as the search path for the plugin's config file in the context of a single node broker. The file must be called Influxdb-Bridge.json. This file must be created to allow the plugin to start, but it may be left empty. The ownership of this file should be given to the mosquitto user.
In the context of an HA cluster, configuration is fully managed by the Cedalo Platform or manually via a respective control topic (see the section below).
In addition to modifying mosquitto.conf, ensure that you have the appropriate license to use the plugin.
Configuration
The configuration of the bridge is achieved by providing a configuration file following the JSON schema below. You can modify the configuration file using the MQTT API for the InfluxDB Bridge. This is described in detail here. Another option is to use the user interface provided with the Cedalo Mosquitto Platform, which provides a webpage to modify and update the configuration. This option is described below in the Platform UI section.
Configuration File Format
The configuration is managed by the control topic, with the latest update being stored to disk with a filename of Influxdb-Bridge.json under the persistence location (as defined in mosquitto.conf).
Note that in the single node setting, the plugin will not be able to start unless the Influxdb-Bridge.json file exists under the persistence location. When first starting the plugin this file should be created but may be left empty. The owner of this file should be the mosquitto user.
The configuration of the plugin is a JSON object consisting of an array of connection objects as well as an array of so-called "schema mappings". Each connection object defines its own InfluxDB connection and MQTT topics to listen to. It also references a mapping of the MQTT message properties to InfluxDB fields and tags (or an entire line protocol record if the MQTT message is a line protocol string). Such mappings are called schema mappings and are described in a different array within the same file under the schemaMappings field. This allows multiple InfluxDB connections to reuse the same mappings.
The fields of the configuration object are described below. To see an example of the config file, refer to the configuration example section. Also refer to the configuration schema.
The following fields of the config are mandatory:
- connections (path: $.connections): An array of InfluxDB connection objects. Apart from database connection settings, each connection object also contains MQTT topic mapping information (type: array).
  - name (path: $.connections[].name): Unique identifier of the InfluxDB Bridge connection object. A convenience name for the database connection used in the plugin's logs (type: string).
  - connection (path: $.connections[].connection): Object holding the actual database connection information (type: object).
    - driver (path: $.connections[].connection.driver): Name of the driver to be used with the database connection (type: string, one of: influxdbv1, influxdbv2).
    - hostname (path: $.connections[].connection.hostname): Hostname or IP address of the InfluxDB server (type: string).
    - port (path: $.connections[].connection.port): Port on which the InfluxDB server listens for client connections (type: integer).
    - database (path: $.connections[].connection.database): Name of the database (for InfluxDB v1) or bucket (for InfluxDB v2) to connect to (type: string).
    - credentials (path: $.connections[].connection.credentials): Object holding authentication information (type: object).
      - For InfluxDB v1:
        - username (path: $.connections[].connection.credentials.username): Username used for authenticating against the database (type: string).
        - password (path: $.connections[].connection.credentials.password): Password of the user used for authenticating against the database (type: string).
      - For InfluxDB v2:
        - token (path: $.connections[].connection.credentials.token): Authentication token for the InfluxDB v2 connection (type: string).
    - For InfluxDB v2:
      - organization (path: $.connections[].connection.organization): Organization that a bucket belongs to in InfluxDB v2. This field is required by InfluxDB v2 itself, but it is not a required field in the JSON schema of the plugin (type: string).
  - topicMappings (path: $.connections[].topicMappings): List of mappings between MQTT topics and the InfluxDB measurements to forward data to (type: array of type object).
    - name (path: $.connections[].topicMappings.name): Unique identifier of the current topic mapping (type: string).
    - target (path: $.connections[].topicMappings.target): Name of the InfluxDB measurement into which MQTT data needs to be inserted. If your MQTT messages are line protocol strings already, this field should be set to an empty string (type: string).
    - mqttTopics (path: $.connections[].topicMappings.mqttTopics): An array of MQTT topics that should be forwarded to InfluxDB. MQTT wildcards (+ and #) can also be used (type: array of type string).
    - schemaMapping (path: $.connections[].topicMappings.schemaMapping): Name of a particular schema mapping from the $.schemaMappings array to be used as a mapping between the target measurement and mqttTopics.
- schemaMappings (path: $.schemaMappings): An array of schema mappings. Each element of the array specifies a mapping between MQTT message properties and InfluxDB fields and tags (type: array of type object).
Optional fields:
- connections (path: $.connections)
  - connection (path: $.connections[].connection)
    - compression (path: $.connections[].connection.compression): Properties related to data compression for InfluxDB v2. Not applicable for v1 (type: object).
      - enable (path: $.connections[].connection.compression.enable): Whether to compress data before sending it to InfluxDB. Less bandwidth is required, but more strain is put on the server (type: bool, defaults to: false).
    - ssl (path: $.connections[].connection.ssl): SSL options. An SSL connection with the database will not be established if this option is not defined (type: object).
      - rejectUnauthorized (path: $.connections[].connection.ssl.rejectUnauthorized): Whether to refuse unauthorized certificates from the InfluxDB server (type: bool, defaults to: false).
      - caPath (path: $.connections[].connection.ssl.caPath): Path to the CA that signed the InfluxDB server certificate (type: string).
      - ca (path: $.connections[].connection.ssl.ca): String with the CA that signed the InfluxDB server certificate. Use either this property or caPath (type: string).
    - debug (path: $.connections[].connection.debug): Whether to log debug information about queries performed by the plugin. This option is currently not in use by the plugin. It is reserved for future versions (type: bool, defaults to: false).
    - lazyConnect (path: $.connections[].connection.lazyConnect): Whether to allow the plugin to continue if the connection to the database cannot be established (type: bool, defaults to: false).
  - options (path: $.connections[].options): Object holding additional plugin settings (type: object).
    - bufferSize (path: $.connections[].options.bufferSize): Defines the size of the buffer, which is used to batch multiple MQTT messages together before inserting them into InfluxDB. For more information, see the section on buffering (type: integer, defaults to: 1000).
    - timeoutMs (path: $.connections[].options.timeoutMs): Interval in milliseconds after which the buffer is forcefully released if it is not empty. In other words, it specifies the maximum latency of the buffer release (type: integer, defaults to: 5000).
    - queueMaxSize (path: $.connections[].options.queueMaxSize): Defines the maximum size of the retry queue used in case a database insert fails due to connection problems. If set to -1, the size of the queue is not limited. For more information, see the section on retry queue (type: integer, defaults to: -1).
    - maxRetries (path: $.connections[].options.maxRetries): Defines the number of retries to be performed by the retry queue before dropping the message. For more information, see the section on retry queue (type: integer, defaults to: 10).
    - retryDelayMs (path: $.connections[].options.retryDelayMs): Defines the delay in milliseconds between the retry attempts of the queue. For more information, see the section on retry queue (type: integer, defaults to: 1000).
- schemaMappings (path: $.schemaMappings): An array of schema mappings. Each element of the array specifies a mapping between MQTT message properties and InfluxDB fields and tags (type: array of type object).
  - name (path: $.schemaMappings[].name): Unique identifier of the current schema mapping. This name is matched against $.connections[].topicMappings.schemaMapping to determine which schema mapping to use for a particular measurement (type: string).
  - mapping (path: $.schemaMappings[].mapping): An array holding source-to-target mapper objects with information about which properties or metadata to pick out from your MQTT messages as well as which InfluxDB fields or tags those properties should be inserted into (type: array of type object).
    - source (path: $.schemaMappings[].mapping[].source): Source of the data to be inserted. This can be a property of the MQTT message payload (in case the payload is a JSON object) or some metadata property (e.g. client_id, timestamp, etc). This field follows a particular unpacking syntax. For more information, see below (type: string).
    - target (path: $.schemaMappings[].mapping[].target): InfluxDB target field or tag name into which data from the source field will be inserted. For MQTT messages which are already line protocol strings this field should be an empty string (type: string).
    - targetType (path: $.schemaMappings[].mapping[].targetType): Whether the target is a tag, field, or a line protocol string (type: string, one of: tag, field, lineProtocol).
    - type (path: $.schemaMappings[].mapping[].type): Optional type casting of the source field before it gets inserted into InfluxDB. See more information about typecasting here (type: string).
    - options (path: $.schemaMappings[].mapping[].options): Object holding information about additional transformations to be performed on source before it is inserted into InfluxDB.
      - isConst (path: $.schemaMappings[].mapping[].options.isConst): Used to indicate that the field in source is a constant that should be inserted into InfluxDB as is, without trying to unpack it. This is relevant in case the source field is a string that follows the unpacking syntax but should be treated as a constant. Otherwise (i.e. for constants that don't follow the unpacking syntax), this field is not required and can be omitted, or added only for clarity and additional context (type: bool).
      - replace (path: $.schemaMappings[].mapping[].options.replace): Specifies an array of two strings. Replaces all occurrences of the first string with the second one before inserting the data into the database (type: array consisting of two elements of type string).
      - replaceNullWith (path: $.schemaMappings[].mapping[].options.replaceNullWith): Specifies a value that null values on the source will be replaced with (type: any).
      - replaceUndefinedWith (path: $.schemaMappings[].mapping[].options.replaceUndefinedWith): In case a property specified in source is missing on the MQTT message, it will be replaced with the value specified in this option (type: any).
      - format (path: $.schemaMappings[].mapping[].options.format): Used when casting into the datetime datatype (i.e. strings representing date and time). Specifies the datetime format string that values from source are formatted as. See the format string documentation for more information on available format strings. Defaults to yyyy-MM-ddTHH:mm:ss.sssZ (with optional timezone information). However, by default, if an integer value representing a unix timestamp in milliseconds is encountered in source, it will also be accepted and appropriately converted to the datetime datatype (type: string).
      - dbFormat (path: $.schemaMappings[].mapping[].options.dbFormat): Used when casting into the datetime datatype. Specifies a format string that datetime values should be converted to before they are inserted into the database (type: string).
      - replaceDelimiter (path: $.schemaMappings[].mapping[].options.replaceDelimiter): Used when casting into the datetime datatype. The format of the datetime values inserted into the database will typically be an ISO format as in the following example: 2017-01-29T14:07:00. InfluxDB can understand and parse this format, but if you would like to change the delimiter to something else, like a space, you can do it with this option. For more complex changes use the dbFormat field (type: string).
Mapping MQTT packets to InfluxDB
The plugin configuration specifies the mapping between MQTT packets and InfluxDB fields/tags in the schemaMappings field. Elements of the schemaMappings array must have unique names that will be matched against the schemaMapping field of the respective connection object.
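The name matching described above can be checked before a configuration is handed to the broker. The following is a minimal sketch in Python, not part of the plugin: the function name find_dangling_schema_refs and the sample names in the config dictionary are made up for illustration, and only the connections, topicMappings, schemaMapping, and schemaMappings keys come from the configuration format described here.

```python
def find_dangling_schema_refs(config: dict) -> list[tuple[str, str]]:
    """Return (topic mapping name, schemaMapping reference) pairs that
    do not match the name of any entry in config["schemaMappings"]."""
    known = {m["name"] for m in config.get("schemaMappings", [])}
    dangling = []
    for conn in config.get("connections", []):
        for topic_mapping in conn.get("topicMappings", []):
            ref = topic_mapping.get("schemaMapping")
            if ref not in known:
                dangling.append((topic_mapping.get("name"), ref))
    return dangling


# Hypothetical configuration fragment: the second topic mapping points
# at a schema mapping that is not defined.
config = {
    "connections": [
        {
            "name": "example-connection",
            "topicMappings": [
                {"name": "tm-1", "schemaMapping": "weather-schema-mapping"},
                {"name": "tm-2", "schemaMapping": "missing-mapping"},
            ],
        }
    ],
    "schemaMappings": [{"name": "weather-schema-mapping", "mapping": []}],
}

print(find_dangling_schema_refs(config))
```

An empty result means every topic mapping refers to a defined schema mapping. The check says nothing about how the plugin itself reacts to a dangling reference.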
mapping specifies an array of extractor objects (you may also see them being referred to as selectors in other documentation on bridges) between sources and targets. The source field represents the source of the data. It comes from the MQTT message or its metadata. The target field is the name of the field or tag that source will be inserted into in InfluxDB. targetType specifies whether the target is a field or tag in InfluxDB, or a line protocol string.
To be more precise, the source field can be a constant, a path to a particular metadata property of an MQTT message, or a data property within its payload. The latter is relevant for JSON MQTT payloads: in case the MQTT payload is a JSON, the path to some data property of such payload can be specified as a source.
Below are some examples with clarifications to put the above-mentioned into context:
It is possible to unpack the MQTT payload in case it is a JSON and extract (pick out) certain properties from it. This can be achieved by specifying a JSON path to such a property in the source field. However, the properties in this path should be separated not by dots, but rather by square brackets (e.g.: not payload.property1.innerProperty but [payload][property1][innerProperty]).
Note that currently the only supported characters for the JSON payload properties are [a-zA-Z0-9_-] (alphanumeric characters as well as _ (underscore) and - (dash)).
Example:
{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][temperature]",
"target": "temp",
"targetType": "field"
}
],
...
}
In the above example, the temperature property from the payload is selected as a field named temp in InfluxDB.
This means that if the MQTT payload looks like below:
{
"temperature": 21.5,
"humidity": 45,
"status": "normal"
}
then the value 21.5 will be inserted as a field called temp in the InfluxDB measurement.
Another example:
{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][sensor][id]",
"target": "sensorId",
"targetType": "tag"
}
],
...
}
The extractor object above expects an MQTT message payload to be in JSON format. It extracts a property id from the sensor object in this payload and inserts (maps) it into the sensorId tag in InfluxDB.
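To make the bracket syntax concrete, here is a small Python sketch of how a source path could be resolved against a message. It is an illustration only and not the plugin's code: resolve_source and the layout of the message dictionary are assumptions, and a missing property simply yields a default value here.

```python
import re

# One path segment: a name made of the supported characters, in brackets.
_SEGMENT = re.compile(r"\[([a-zA-Z0-9_-]+)\]")


def resolve_source(source: str, message: dict, default=None):
    """Resolve a source such as "[payload][sensor][id]" against a message.

    Strings that are not made up entirely of bracketed segments are
    returned unchanged, i.e. treated as constants (an assumption of
    this sketch).
    """
    segments = _SEGMENT.findall(source)
    if not segments or "".join(f"[{s}]" for s in segments) != source:
        return source
    current = message
    for key in segments:
        if not isinstance(current, dict) or key not in current:
            return default  # property is missing on the message
        current = current[key]
    return current


# Hypothetical message: metadata and an already parsed JSON payload.
message = {
    "client_id": "device-42",
    "payload": {"temperature": 21.5, "sensor": {"id": "s-17"}},
}

print(resolve_source("[payload][sensor][id]", message))   # s-17
print(resolve_source("[payload][temperature]", message))  # 21.5
print(resolve_source("sensor-reading", message))          # sensor-reading
```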
Apart from extracting data from payloads it's also possible to insert constant values, which may be useful for InfluxDB tags. Here is another example which maps a constant value instead of the one taken from an MQTT message:
{
...
"name": "mapping1",
"mapping": [
{
"source": "sensor-reading",
"target": "sensorType",
"targetType": "tag",
"options": {
"isConst": true
}
}
],
...
}
This extractor maps a constant string ("sensor-reading") to the tag sensorType in InfluxDB. This means that for every MQTT message for which this mapping is invoked, the tag sensorType with value "sensor-reading" will be added to the InfluxDB measurement.
To insert the entire MQTT payload (useful when the payload is of a primitive type like a string or a number), the extractor would look similar to the following:
{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload]",
"target": "status",
"targetType": "field"
}
],
...
}
Note that apart from [payload], various special metadata properties of the MQTT message may be selected. Here is the entire list of special (meta)data properties:
- hostname: Hostname of the broker. Note that the HOSTNAME environment variable must be specified for the broker, or the default value will be used (type: string, defaults to: <Unknown>).
- topic: Name of the topic to which the MQTT message was published (type: string).
- payload: Payload of the MQTT message. Can be used to further unpack (pick out) child properties of the payload object if the latter is a JSON, as described above. If the payload was originally a string or a number, it will be treated as such, and no typecasting is necessary (type: any).
- qos: QoS level of the MQTT message (type: integer, one of: 0, 1, 2).
- retain: Retain flag of the MQTT message (type: bool).
- timestamp: Unix timestamp representing the time when the MQTT message arrived at the broker (type: integer).
- datetime: Datetime string representing the time when the MQTT message arrived at the broker (type: string representing the date in ISO format).
- uuid: Unique identifier (UUID) of the message. Note that it is a convenience property. This ID is internally generated by the InfluxDB Bridge plugin and does not come from the original message's metadata (type: string).
- client_id: client_id of the device that published the MQTT message (type: string).
To select one of the above-mentioned properties, it should be enclosed in square brackets ([]) and specified in the source field of the configuration file.
For example, to use the client ID of the device that published the message as a tag in InfluxDB, the following selector can be defined:
{
...
"name": "mapping1",
"mapping": [
{
"source": "[client_id]",
"target": "deviceId",
"targetType": "tag"
}
],
...
}
Data type casting
InfluxDB Bridge provides a feature to typecast properties of the MQTT messages before they are inserted into measurements.
In order to cast a certain property to a certain type, the following configuration must be specified:
{
...
"name": "<your mapping name here>",
"mapping": [
{
"source": "<your source mqtt message property>",
"target": "<your target influxdb field/tag name>",
"targetType": "<field or tag>",
"type": "<specify type to cast 'source' into>"
}
],
...
}
For example:
{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][temperature_str]",
"target": "temperature",
"targetType": "field",
"type": "float"
}
],
...
}
Appropriate type casting depends on whether the target is a field or a tag in InfluxDB:
For tags: Tags are always stored as strings in InfluxDB.
For fields: InfluxDB supports the following field types:
- float/double: for floating point values
- integer: for integer values
- string: for text values
- boolean: for true/false values
Keeping the above in mind, in InfluxDB bridge the following typecasting is provided and makes sense:
For tags: Tags are automatically converted to strings from any type. Therefore the only typecasting that makes sense here is datetime, if you are using a date as a tag and want to change its format. See below for more information on datetime casting.
For fields:
- datetime: casts integers or strings into a datetime (format, dbFormat and replaceDelimiter might need to be specified. See config options section for details). There is no need to typecast datetime strings if they follow ISO format (note that the timezone portion is not required here since many databases don't handle timezones).
- number: casts strings into either a float or integer (supports float numbers separated by both dots or commas, e.g. both 1.5 and 1,5 are handled).
- integer: casts a string into an integer, truncating the fractional part.
- boolean: casts strings or numbers into a boolean value (note that the string "false" is cast to the false boolean value, as is the number 0, null, an empty string, or a missing property. Everything else is cast to true).
- string: can cast numbers into strings. There is typically no need to typecast JSON objects to strings as this is done automatically.
Note that while it might be tempting to explicitly specify typecasting for all the sources, this is not recommended, as typecasting is often associated with additional overhead. The rule of thumb is to cast only what is necessary.
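The number, integer, and boolean rules listed above can be restated as a short Python sketch. This only mirrors the documented rules and is not the plugin's implementation: the function names and the MISSING sentinel are hypothetical, and details the text does not cover (such as whether "False" with a capital letter counts as false) are left out on purpose.

```python
MISSING = object()  # hypothetical marker for "property not present"


def to_number(text: str):
    """Cast a string to int or float; both "1.5" and "1,5" are accepted."""
    normalized = text.strip().replace(",", ".")
    try:
        return int(normalized)
    except ValueError:
        return float(normalized)


def to_integer(text: str) -> int:
    """Cast a string to an integer, truncating the fractional part."""
    return int(to_number(text))


def to_boolean(value=MISSING) -> bool:
    """"false", 0, null (None), "" and a missing property give False;
    everything else gives True."""
    if value is MISSING or value is None:
        return False
    if isinstance(value, bool):
        return value  # assumption: real booleans pass through unchanged
    if isinstance(value, str):
        return value not in ("false", "")
    if isinstance(value, (int, float)):
        return value != 0
    return True


print(to_number("1,5"), to_integer("21.9"), to_boolean("false"), to_boolean("no"))
```

Note the last call: any non-empty string other than "false" is cast to true, including strings such as "no".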
Line Protocol
InfluxDB natively understands the Line Protocol format. If your MQTT payload already contains data in Line Protocol format, you can use the special lineProtocol target type to directly insert it:
{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload]",
"target": "",
"targetType": "lineProtocol"
}
],
...
}
Note that the target is empty. This is because line protocol strings already include all the required data (which measurements, tags, and fields to insert). Apart from setting the target of the mapping to an empty string, you will also have to set the target field of the topicMappings to an empty string. This is again because the name of the measurement has to be included as part of the line protocol string itself.
With the configuration above, if your MQTT payload contains valid Line Protocol data like:
weather,location=us-midwest temperature=82.0,humidity=54 1465839830100400200
It will be directly parsed and inserted by InfluxDB.
Also note that the timestamp at the end is in nanoseconds. The timestamp may also be omitted, in which case the current time of message reception by the InfluxDB will be used instead.
Extractor options
Each extractor under the mapping section of the configuration file can have additional options that define transformations to be performed on the source data before it is inserted into target. For example:
{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"options": {
"replaceNullWith": "<null>"
}
}
],
...
}
The above specifies an extractor that maps property1 of the MQTT message to column_a in the InfluxDB measurement. Once a null value is encountered in the property1 property of the MQTT message's payload, it will be converted to the "<null>" string before being inserted into the database.
Note that some options can only be used with certain (or when typecasting to certain) datatypes, while others can be used with any type.
Options which can only be used when typecasting to datetime:
- format
- dbFormat
- replaceDelimiter
Example:
{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"type": "datetime",
"options": {
"replaceNullWith": "<null>",
"format": "yyyy-MM-dd HH:mm:ss",
"dbFormat": "dd-MMM-yy hh:mm:ss",
"replaceDelimiter": " "
}
}
],
...
}
Options which can be used as long as the underlying value is a string:
- replace
Example:
{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"options": {
"replace": [
"hello world",
"hello SQL"
]
}
}
],
...
}
Options which can be used regardless of types and typecasting:
- replaceNullWith
- replaceUndefinedWith
- isConst
See configuration options for more information on the listed options (their descriptions, datatypes, allowed values).
Choosing InfluxDB version
To work with different InfluxDB versions you have to change the driver field under connection in the respective connection object of the configuration. The allowed values are influxdbv1 and influxdbv2. To work with InfluxDB v3, choose the influxdbv2 driver, as the REST API for v2 and v3 is the same.
There are also some additional properties specific to the InfluxDB version. For example, the username and password properties under credentials are only relevant to InfluxDB v1, while the token property is only used for v2.
Apart from that, there is a compression object under connection, which is applicable only to InfluxDB v2.
SSL connection options
As InfluxDB (both v1 and v2) supports custom server certificates, you can specify TLS/SSL options under the ssl field to allow Mosquitto to connect to such an InfluxDB instance.
As InfluxDB does not support mTLS, you can only specify a CA certificate that signed the custom certificate of your InfluxDB under SSL options. To that end, there are two fields: ca and caPath. The first one specifies a certificate as a PEM-formatted string, while the other specifies a path to a certificate file on the filesystem.
Another field under SSL options is rejectUnauthorized, a boolean specifying whether to reject unknown certificates. It must not be set to false in production, as this opens your connection up to Man-in-the-Middle (MITM) attacks. The default for this setting is false.
If instead of custom certificates your InfluxDB instance is using a certificate signed by a well-known certificate authority (one of the CAs whose certificates are shipped together with major browsers like Firefox), you may set the ssl field to an empty object. This is the same as specifying an ssl object with a single property rejectUnauthorized set to true. This way a TLS connection will still be initiated by the broker.
Compression
InfluxDB v2 supports data compression of incoming records with gzip, as noted in the InfluxDB documentation. Compression in InfluxDB is one of the techniques to optimize writes to the database.
However, be aware that while compression reduces network bandwidth, it increases the load on the server running the broker, as it has to perform compression every time before sending the data over. This means that careful benchmarking needs to be performed before deciding if compression helps your use case.
To enable compression, set the enable field under the compression object in connection to true. The default value is false. This feature only works for InfluxDB v2 and higher.
Configuration example
An example of the configuration with two database connections is shown below:
{
"version": "1",
"connections": [
{
"name": "InfluxDB v1 Connection",
"connection": {
"driver": "influxdbv1",
"hostname": "influxdb-v1.example.com",
"port": 8086,
"database": "mqtt_data",
"credentials": {
"username": "admin",
"password": "password123"
}
},
"options": {
"bufferSize": 5000,
"timeoutMs": 3000,
"queueMaxSize": 10000,
"maxRetries": 15,
"retryDelayMs": 1000
},
"topicMappings": [
{
"name": "topic-mapping-to-weather-measurement",
"schemaMapping": "weather-schema-mapping",
"target": "weather_data",
"mqttTopics": [
"sensors/weather/#"
]
}
]
},
{
"name": "InfluxDB v2 Connection",
"connection": {
"driver": "influxdbv2",
"hostname": "influxdb-v2.example.com",
"port": 8086,
"database": "mqtt_metrics",
"organization": "my-org",
"credentials": {
"token": "your-influxdb-v2-token"
},
"compression": {
"enable": true
}
},
"options": {
"bufferSize": 1000,
"timeoutMs": 4000,
"queueMaxSize": 5000,
"maxRetries": 10,
"retryDelayMs": 2000
},
"topicMappings": [
{
"name": "topic-mapping-to-system-measurement",
"schemaMapping": "system-schema-mapping",
"target": "system_metrics",
"mqttTopics": [
"system/metrics/#"
]
}
]
}
],
"schemaMappings": [
{
"name": "weather-schema-mapping",
"mapping": [
{
"source": "[client_id]",
"target": "device_id",
"targetType": "tag"
},
{
"source": "[payload][temperature]",
"target": "temperature",
"targetType": "field"
},
{
"source": "[payload][humidity]",
"target": "humidity",
"targetType": "field"
},
{
"source": "[payload][location]",
"target": "location",
"targetType": "tag"
},
{
"source": "[timestamp]",
"target": "recorded_at",
"targetType": "field"
}
]
},
{
"name": "system-schema-mapping",
"mapping": [
{
"source": "[client_id]",
"target": "host",
"targetType": "tag"
},
{
"source": "[payload][cpu]",
"target": "cpu_usage",
"targetType": "field"
},
{
"source": "[payload][memory]",
"target": "memory_usage",
"targetType": "field"
},
{
"source": "[payload][disk]",
"target": "disk_usage",
"targetType": "field"
},
{
"source": "system",
"target": "data_type",
"targetType": "tag",
"options": {
"isConst": true
}
}
]
}
]
}
As per the example config above, InfluxDB bridge will connect to two databases: an InfluxDB v1 instance at influxdb-v1.example.com:8086 and an InfluxDB v2 instance at influxdb-v2.example.com:8086. It will use a database called mqtt_data for v1 and a bucket called mqtt_metrics in the organization my-org for v2.
The optional options field is specified and overrides some defaults of the plugin's buffering. For the v1 connection, it sets bufferSize to 5000 and timeoutMs to 3000 milliseconds, while for the v2 connection, it sets bufferSize to 1000 and timeoutMs to 4000 milliseconds. This illustrates that you can set different buffer options per database connection, making it adjustable to different traffic loads.
Also see the section about buffering for more information.
The retry queue parameters are in this case also configured differently depending on the database connection. See the section about retry queue for more information.
Notice that the InfluxDB v2 connection has compression enabled, which will compress data before sending it to the database, reducing bandwidth at the cost of increased server load. This option is only available for InfluxDB v2.
In the topicMappings section, we define which MQTT topics should be forwarded to which InfluxDB measurements. The v1 connection maps topics matching sensors/weather/# to the weather_data measurement, while the v2 connection maps topics matching system/metrics/# to the measurement called system_metrics.
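For readers less familiar with MQTT wildcards, the sketch below shows in Python how topic filters such as sensors/weather/# select topics. It follows the standard MQTT wildcard semantics (+ matches exactly one level, # matches the parent level and everything below it) and ignores the special handling of topics starting with $. The helper names are hypothetical, the two mappings are flattened into one list although they belong to different connections in the example, and this is not the plugin's own matching code.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            return True  # matches the parent level and all levels below
        if index >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[index]:
            return False
    return len(filter_levels) == len(topic_levels)


# The two topic mappings of the example configuration, reduced to the
# fields that matter for routing.
topic_mappings = [
    {"target": "weather_data", "mqttTopics": ["sensors/weather/#"]},
    {"target": "system_metrics", "mqttTopics": ["system/metrics/#"]},
]


def targets_for(topic: str) -> list[str]:
    """Return the measurements a message on this topic would be routed to."""
    return [
        mapping["target"]
        for mapping in topic_mappings
        if any(topic_matches(f, topic) for f in mapping["mqttTopics"])
    ]


print(targets_for("sensors/weather/berlin/temperature"))  # ['weather_data']
print(targets_for("system/metrics/cpu"))                  # ['system_metrics']
print(targets_for("sensors/door/1"))                      # []
```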
The schemaMappings section defines how the MQTT message data should be mapped to InfluxDB fields and tags. For example, in the weather-schema-mapping, we extract:
- The client ID as a tag called device_id
- The temperature and humidity from the payload as fields
- The location from the payload as a tag
- The message timestamp (the [timestamp] metadata property) as a field called recorded_at
In the system-schema-mapping, we extract:
- The client ID as a tag called host
- CPU, memory, and disk usage from the payload as fields
- We also add a constant tag called data_type with the value system. This value will stay the same for all records.
Error handling
Any configuration or license errors will prevent the plugin from being loaded, and respective messages will appear in stderr and stdout.
If recoverable errors occur during the operation of the plugin, it will generate respective error messages in its stderr and prefix them with an ERR: prefix. This includes errors encountered when converting MQTT messages to InfluxDB format as well as failures during data insertion.
Note that while error messages generated by the plugin will be preceded by ERR:, warning messages will get a WARN: prefix, and debug messages a DEBUG: prefix.
Debugging
To print detailed information about operations being performed against the InfluxDB database to the stdout, define the DEBUG and DEBUG_INFLUXDB_BRIDGE environment variables on the broker.
In future versions the functionality of debug environment variables will move to the debug property in the configuration file under the connection section.
Notes on data insertions and data format
Under the hood, the plugin converts all data to InfluxDB line protocol records, which are then inserted using the InfluxDB REST API endpoint. While converting data to line protocol, the InfluxDB Bridge takes care of escaping spaces and special characters in measurement names and field names, so you don't have to worry about escaping when writing the plugin's configuration.
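To make the escaping concrete, here is a minimal Python sketch that builds a line protocol record following the publicly documented InfluxDB line protocol escaping rules. It is not the plugin's implementation: all function names and sample values are hypothetical, and the handling of value types (for example, the integer suffix) is an assumption about how a converter might behave.

```python
def escape_measurement(name: str) -> str:
    """Escape commas and spaces in a measurement name."""
    return name.replace(",", "\\,").replace(" ", "\\ ")


def escape_key(key: str) -> str:
    """Escape commas, equals signs, and spaces in tag keys, tag values, and field keys."""
    return key.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")


def format_field_value(value) -> str:
    """Render a field value: booleans as true/false, integers with an 'i' suffix,
    floats as-is, and everything else as a quoted, escaped string."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def to_line_protocol(measurement, tags, fields, timestamp_ns=None) -> str:
    """Build one line protocol record from a measurement, tags, and fields."""
    tag_part = "".join(
        f",{escape_key(k)}={escape_key(str(v))}" for k, v in sorted(tags.items())
    )
    field_part = ",".join(
        f"{escape_key(k)}={format_field_value(v)}" for k, v in fields.items()
    )
    line = f"{escape_measurement(measurement)}{tag_part} {field_part}"
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line


# Hypothetical record with spaces in the measurement name and in a tag value.
print(to_line_protocol("weather data", {"device_id": "sensor 1"}, {"temperature": 21.5}))
```

The space in the measurement name and in the tag value is escaped with a backslash, so the record still splits cleanly into its measurement, tag, and field parts.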
Buffering
Buffers are created per measurement for every InfluxDB connection. The size of the buffer is controlled with the bufferSize field under options ($.connections[].options) and indicates the capacity of the buffer. The default size of the buffer is 1000. The buffer holds records representing unpacked MQTT messages (or line protocol strings, if this is the configured format of your MQTT messages), which are inserted into the respective measurement once the buffer is saturated.
Once the buffer fills up, a flush is triggered and a batch write is issued against the measurement, inserting the buffer contents. The flush condition is bufferSize + 1: once this number of records is reached, the buffer is released.
Another parameter, timeoutMs, indicates the time interval (in milliseconds) after which all non-empty buffers are forcefully released, triggering batch writes of their contents. The default value is 5000 ms.
Saturation and timeout are not the only release triggers, however. The buffer also has a mechanism to account for infrequent MQTT messages and to decrease the latency of database inserts for such records.
The plugin keeps track of the time interval between consecutive MQTT messages. If this interval is long enough, the record is inserted immediately. If the interval is short, the plugin keeps buffering messages, anticipating a possible influx. The definition of "long enough" might change when the plugin is updated, but currently it is a fraction of timeoutMs.
Note that, as a consequence, the first MQTT message is never buffered.
Also, notice that buffer options can be defined per database connection, allowing for more flexibility and fine-tuning when needed.
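The three release triggers described in this section (saturation at bufferSize + 1, the timeoutMs timer, and the shortcut for infrequent messages) can be sketched in Python as follows. This is a simplified model under stated assumptions, not the plugin's code: the class and method names are hypothetical, the fraction of timeoutMs used as the "long enough" threshold is not documented and is set to 0.5 only as a placeholder, and the timer is modeled as an explicit on_timeout() call.

```python
class MeasurementBuffer:
    """Simplified model of one buffer (one measurement on one connection)."""

    def __init__(self, write_batch, buffer_size=1000, timeout_ms=5000, idle_fraction=0.5):
        self.write_batch = write_batch  # callable that receives a list of records
        self.buffer_size = buffer_size
        # Assumption: the real fraction is not documented; 0.5 is a placeholder.
        self.idle_threshold_ms = timeout_ms * idle_fraction
        self.records = []
        self.last_message_ms = None

    def add(self, record, now_ms):
        """Buffer a record, flushing on saturation or after a long gap between messages."""
        first = self.last_message_ms is None
        idle = first or (now_ms - self.last_message_ms) >= self.idle_threshold_ms
        self.last_message_ms = now_ms
        self.records.append(record)
        if idle or len(self.records) >= self.buffer_size + 1:
            self.flush()

    def on_timeout(self):
        """Called by a timer every timeoutMs; releases the buffer if it is non-empty."""
        if self.records:
            self.flush()

    def flush(self):
        """Hand the buffered records over as one batch and start a new buffer."""
        batch, self.records = self.records, []
        self.write_batch(batch)


batches = []
buf = MeasurementBuffer(batches.append, buffer_size=2, timeout_ms=5000)
buf.add("r1", now_ms=0)   # first message: written immediately
buf.add("r2", now_ms=10)  # short interval: buffered
buf.add("r3", now_ms=20)  # buffered
buf.add("r4", now_ms=30)  # buffer_size + 1 records reached: flushed as one batch
buf.add("r5", now_ms=40)  # buffered
buf.on_timeout()          # timer fires: remaining record is flushed
print(batches)
```

In this model a smaller bufferSize or timeoutMs lowers insert latency at the cost of more, smaller batch writes.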
Retry queue
To prevent data loss during temporary connection issues, the InfluxDB Bridge plugin provides a retry queue that is responsible for retrying failed message writes.
Each retry queue can be configured with the following settings (under the options field of the configuration, $.connections[].options):
- queueMaxSize sets the maximum number of messages that the queue can hold before it starts to drop them. This limit can be set to infinity using the value -1, which is the default. If this value is not set to -1, careful consideration and testing are needed to ensure that the queue does not fill up too quickly.
- maxRetries specifies the maximum number of retry attempts the queue makes before dropping a message. The default is 10.
- retryDelayMs is the minimum amount of time in milliseconds that must pass between retry attempts. The default is 1000 ms (1 second).
Note that the elements in a retry queue can be either single MQTT messages or entire buffers of MQTT messages that are ready to be batch-written to InfluxDB.
An important point is that retries happen only in case of connection errors. In case of invalid data, the error is logged but no retries are performed. If a batch of messages is being inserted and only some of these messages are invalid, the invalid records are filtered out from the valid ones by InfluxDB itself, using its native mechanisms for handling invalid data.
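The interplay of queueMaxSize, maxRetries, and retryDelayMs can be modeled with a short Python sketch. It is an illustration under stated assumptions rather than the plugin's implementation: the class and method names are hypothetical, each queue element (a single message or a whole batch) is counted as one entry against the size limit, and a connection failure is represented by Python's built-in ConnectionError.

```python
from collections import deque


class RetryQueue:
    """Simplified model of a retry queue for failed writes."""

    def __init__(self, write, queue_max_size=-1, max_retries=10, retry_delay_ms=1000):
        self.write = write  # callable that raises ConnectionError on connection problems
        self.queue_max_size = queue_max_size
        self.max_retries = max_retries
        self.retry_delay_ms = retry_delay_ms
        self.items = deque()  # entries of the form [payload, failed_attempts]
        self.dropped = []
        self.last_attempt_ms = None

    def enqueue(self, payload) -> bool:
        """Add a failed write; drop it if the queue is full (-1 means unlimited)."""
        if self.queue_max_size != -1 and len(self.items) >= self.queue_max_size:
            self.dropped.append(payload)
            return False
        self.items.append([payload, 0])
        return True

    def process(self, now_ms):
        """Retry every queued entry once, but only if retry_delay_ms has passed."""
        if self.last_attempt_ms is not None and now_ms - self.last_attempt_ms < self.retry_delay_ms:
            return
        self.last_attempt_ms = now_ms
        for _ in range(len(self.items)):
            payload, attempts = self.items.popleft()
            try:
                self.write(payload)
            except ConnectionError:
                # Only connection errors are retried, up to max_retries attempts.
                attempts += 1
                if attempts >= self.max_retries:
                    self.dropped.append(payload)
                else:
                    self.items.append([payload, attempts])


calls = []


def flaky_write(payload):
    """Hypothetical writer that fails twice before the connection recovers."""
    calls.append(payload)
    if len(calls) < 3:
        raise ConnectionError("InfluxDB unreachable")


queue = RetryQueue(flaky_write, queue_max_size=1, max_retries=10, retry_delay_ms=1000)
queue.enqueue(["r1", "r2"])  # a whole batch occupies one queue element
queue.enqueue("r3")          # queue is full, so this message is dropped
queue.process(now_ms=0)      # first attempt fails
queue.process(now_ms=500)    # skipped: retry delay has not passed yet
queue.process(now_ms=1000)   # second attempt fails
queue.process(now_ms=2000)   # third attempt succeeds
print(len(calls), list(queue.items), queue.dropped)
```

The dropped message in this run shows why a finite queueMaxSize needs testing against the expected outage duration and message rate.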
Platform UI
JSON schema
The InfluxDB Bridge plugin uses the following JSON schema for its configuration:
{
  "title": "InfluxDB-Bridge Plugin Config",
  "type": "object",
  "properties": {
    "version": {
      "type": [
        "string",
        "integer"
      ],
      "nullable": true,
      "description": "Version of the configuration file"
    },
    "connections": {
      "type": "array",
      "description": "List of sub-configurations per InfluxDB connection",
      "items": {
        "type": "object",
        "description": "Configuration per InfluxDB connection",
        "properties": {
          "name": {
            "type": "string",
            "nullable": false,
            "description": "Unique textual identifier of the configuration"
          },
          "connection": {
            "type": "object",
            "description": "InfluxDB connection specific configuration",
            "properties": {
              "driver": {
                "type": "string",
                "enum": [
                  "influxdbv1",
                  "influxdbv2"
                ],
                "nullable": false,
                "description": "Name of the InfluxDB driver to use (for InfluxDB v1 or InfluxDB v2/v3)"
              },
              "hostname": {
                "type": "string",
                "nullable": false,
                "description": "Hostname or IP of the InfluxDB server to connect to"
              },
              "port": {
                "type": "integer",
                "nullable": false,
                "description": "Port of the InfluxDB server to connect to"
              },
              "database": {
                "type": "string",
                "nullable": false,
                "description": "Name of the database (or bucket for influxdb v2) to connect to"
              },
              "organization": {
                "type": "string",
                "nullable": true,
                "description": "Organization that a bucket belongs to in InfluxDB v2"
              },
              "compression": {
                "type": "object",
                "nullable": true,
                "description": "Properties related to data compression happening before sending it over to InfluxDB",
                "properties": {
                  "enable": {
                    "type": "boolean",
                    "nullable": true,
                    "default": false,
                    "description": "Whether to compress data before influxdb. Less bandwidth required but inflicts more strain on the server"
                  }
                }
              },
              "credentials": {
                "type": "object",
                "nullable": false,
                "description": "Credentials to authenticate to InfluxDB. Combine both v1 credentials (username and password) and v2 credentials (tokens). Only one of the two which corresponds to the InfluxDB version in use must be selected",
                "properties": {
                  "token": {
                    "type": "string",
                    "nullable": true,
                    "description": "Authentication token for InfluxDB v2 connection"
                  },
                  "username": {
                    "type": "string",
                    "nullable": true,
                    "description": "Username for InfluxDB v1"
                  },
                  "password": {
                    "type": "string",
                    "nullable": true,
                    "description": "Password for InfluxDB v1"
                  }
                }
              },
              "ssl": {
                "type": "object",
                "nullable": true,
                "description": "SSL configuration for the connection",
                "properties": {
                  "ca": {
                    "type": "string",
                    "nullable": true,
                    "description": "CA certificate file to use for the connection"
                  },
                  "caPath": {
                    "type": "string",
                    "nullable": true,
                    "description": "CA certificate path to use for the connection"
                  },
                  "rejectUnauthorized": {
                    "type": "boolean",
                    "nullable": true,
                    "description": "Whether to reject unauthorized connections or not"
                  }
                }
              },
              "debug": {
                "type": "boolean",
                "nullable": true,
                "description": "Whether to enable debug query logging for connection"
              },
              "lazyConnect": {
                "type": "boolean",
                "nullable": true,
                "default": false,
                "description": "Whether to allow the plugin to continue if the connection to the database cannot be established"
              }
            },
            "nullable": false,
            "required": [
              "driver"
            ]
          },
          "topicMappings": {
            "type": "array",
            "description": "List of topic mapping objects to forward MQTT messages to InfluxDB",
            "items": {
              "type": "object",
              "properties": {
                "name": {
                  "type": "string",
                  "nullable": false,
                  "description": "Unique identifier of this mapping"
                },
                "schemaMapping": {
                  "type": "string",
                  "nullable": false,
                  "description": "Name of the schema mapping which should be applied here"
                },
                "target": {
                  "type": "string",
                  "description": "InfluxDB measurement to forward MQTT messages to"
                },
                "options": {
                  "type": "object",
                  "nullable": true,
                  "description": "Optional options for the measurement"
                },
                "mqttTopics": {
                  "type": "array",
                  "items": {
                    "type": "string",
                    "description": "List of MQTT topics to forward messages from",
                    "pattern": "^((\\+?|[\\w-]+)((\\/[\\w-]+)?(\\/\\+)?(\\/[\\w-]+)?((?<!\\+)\\/)?)*(\\/#)?|(\\/)?#)$"
                  }
                }
              },
              "required": [
                "name",
                "target",
                "mqttTopics"
              ]
            }
          },
          "options": {
            "type": "object",
            "nullable": true,
            "description": "Plugin options for configuring internal queues and buffers",
            "properties": {
              "bufferSize": {
                "type": "integer",
                "nullable": true,
                "minimum": 1,
                "description": "Maximum number of messages to buffer before flushing to the database"
              },
              "timeoutMs": {
                "type": "integer",
                "nullable": true,
                "minimum": 0,
                "description": "Maximum time in milliseconds to buffer messages before flushing to the database"
              },
              "queueMaxSize": {
                "type": "integer",
                "nullable": true,
                "minimum": -1,
                "description": "Maximum number of messages in retry queue"
              },
              "maxRetries": {
                "type": "integer",
                "nullable": true,
                "minimum": 0,
                "description": "Maximum number of retries for failed inserts due to lost connection"
              },
              "retryDelayMs": {
                "type": "integer",
                "nullable": true,
                "minimum": 0,
                "description": "Delay in milliseconds before retrying inserts failed due to lost connection"
              }
            }
          }
        },
        "required": [
          "name",
          "connection",
          "topicMappings"
        ]
      }
    },
    "schemaMappings": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": {
            "type": "string",
            "nullable": false,
            "description": "Unique identifier of the mapping"
          },
          "mapping": {
            "type": "array",
            "description": "List of mappings to apply to InfluxDB. Each schema contains a mapping of InfluxDB fields/tags to MQTT message properties",
            "items": {
              "type": "object",
              "description": "Properties to select from the MQTT message and insert into InfluxDB",
              "properties": {
                "source": {
                  "oneOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "number"
                    },
                    {
                      "type": "null",
                      "nullable": true
                    }
                  ],
                  "description": "JSON path to the value to insert or constant value"
                },
                "target": {
                  "type": "string",
                  "description": "Name of the field or tag to insert data from \"source\" into"
                },
                "targetType": {
                  "type": "string",
                  "enum": [
                    "tag",
                    "field",
                    "lineProtocol"
                  ],
                  "description": "Whether the target is a tag or a field in InfluxDB"
                },
                "type": {
                  "type": "string",
                  "nullable": true,
                  "description": "Data type of the value to cast source to before inserting"
                },
                "options": {
                  "type": "object",
                  "nullable": true,
                  "description": "Optional options for the field or tag",
                  "properties": {
                    "isConst": {
                      "type": "boolean",
                      "nullable": true,
                      "description": "Whether the source is a constant value"
                    }
                  }
                }
              },
              "required": [
                "source",
                "target",
                "targetType"
              ]
            }
          }
        },
        "required": [
          "name",
          "mapping"
        ]
      }
    }
  },
  "required": [
    "connections",
    "schemaMappings"
  ]
}
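As a quick way to sanity-check a configuration against the schema above, the following Python sketch verifies only the "required" lists and the mqttTopics pattern. It is not a full JSON Schema validation and is not part of the plugin: the helper names are hypothetical, every value in the sample configuration is a placeholder, and the driver value is simply taken from the schema's enum.

```python
import re

# Pattern copied from the "mqttTopics" items in the schema, with JSON escaping removed.
TOPIC_PATTERN = re.compile(
    r"^((\+?|[\w-]+)((\/[\w-]+)?(\/\+)?(\/[\w-]+)?((?<!\+)\/)?)*(\/#)?|(\/)?#)$"
)


def missing_keys(obj, required, where):
    """Return one message per required key that is absent from obj."""
    return [f"{where}: missing '{key}'" for key in required if key not in obj]


def find_problems(config):
    """Check required keys and topic filters only (not a full schema validation)."""
    problems = missing_keys(config, ("connections", "schemaMappings"), "config")
    for conn in config.get("connections", []):
        problems += missing_keys(conn, ("name", "connection", "topicMappings"), "connection")
        problems += missing_keys(conn.get("connection", {}), ("driver",), "connection.connection")
        for mapping in conn.get("topicMappings", []):
            problems += missing_keys(mapping, ("name", "target", "mqttTopics"), "topicMapping")
            for topic in mapping.get("mqttTopics", []):
                if not TOPIC_PATTERN.match(topic):
                    problems.append(f"topicMapping: invalid topic filter '{topic}'")
    for schema in config.get("schemaMappings", []):
        problems += missing_keys(schema, ("name", "mapping"), "schemaMapping")
        for entry in schema.get("mapping", []):
            problems += missing_keys(entry, ("source", "target", "targetType"), "mapping entry")
    return problems


# Hypothetical configuration; every name and value below is a placeholder.
config = {
    "connections": [
        {
            "name": "example-connection",
            "connection": {
                "driver": "influxdbv2",  # value from the schema's enum
                "hostname": "localhost",
                "port": 8086,
                "database": "example-bucket",
                "organization": "example-org",
                "credentials": {"token": "<token>"},
            },
            "options": {"bufferSize": 1000, "timeoutMs": 5000},
            "topicMappings": [
                {
                    "name": "example-topic-mapping",
                    "schemaMapping": "example-schema-mapping",
                    "target": "example_measurement",
                    "mqttTopics": ["sensors/weather/#"],
                }
            ],
        }
    ],
    "schemaMappings": [
        {
            "name": "example-schema-mapping",
            "mapping": [
                {
                    "source": "system",
                    "target": "data_type",
                    "targetType": "tag",
                    "options": {"isConst": True},
                }
            ],
        }
    ],
}

print(find_problems(config))
```

For complete validation, including types, enums, and minimum values, a JSON Schema validator that understands the "nullable" keyword would be needed.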