SQL Bridges

Premium

Introduction

The SQL Bridge plugin can be used to forward data published to the Mosquitto broker to various relational databases. The plugin can handle multiple database connections. For each connection, a mapping between certain MQTT topics and database tables can be defined (see example configuration).

The list of currently supported DBMS:

  • PostgreSQL
  • TimescaleDB
  • MySQL
  • AlloyDB
  • MariaDB
  • Amazon Redshift
  • Microsoft SQL
  • OracleDB
  • CockroachDB

SQL bridge supports basic (username, password) authentication.

The plugin manages incoming MQTT messages in a buffer to perform batching and leverage batch inserting capabilities of the databases. The size properties of the buffer can be controlled via the config file (sql-bridge.json) in the options section, with bufferSize, batchSize, and timeoutMs (refer to config file format).

Plugin Activation

To enable the SQL Bridge plugin in the broker add the following to the mosquitto.conf file:

plugin /usr/lib/cedalo_sql_bridge.so

persistence_location /mosquitto/data

This is an example configuration snippet for the Docker container setup. For installations not running in a container, the configuration above needs to be adjusted accordingly (in particular, the location of the cedalo_sql_bridge.so dynamic library and the persistence_location may differ).

persistence_location is used as the search base path for the plugin's config file.

In addition to modifying mosquitto.conf, ensure that you have the appropriate license (with the required database integrations) to use the plugin.

Config File Format

The configuration is stored in a single JSON file (called sql-bridge.json by default) located inside the persistence_location, which is defined in mosquitto.conf. To use a different config file name, specify the plugin_opt_config_file option under the SQL Bridge plugin in mosquitto.conf.

The file represents a configuration object consisting of an array of connection objects and an array of so-called "schema mappings". Each connection object defines its own database connection and the MQTT topics to listen to, and references a mapping of MQTT message properties to columns of a database table. Such mappings are called schema mappings and are described in a separate array within the same config file, under the schemaMappings field.

The fields of the configuration object are described below. To see the entire structure of the configuration file take a look at the JSON schema at the bottom of this page. To see an example of the config file refer to the configuration example section.

The following fields of the config are mandatory:

  • connections (path: $.connections):
    An array of database connection objects. Besides the database connection settings, each connection object also contains MQTT topic mapping information (type: array).
    • name (path: $.connections[].name):
      Unique identifier of the SQL Bridge connection object. In simple terms, one can think of it as a convenient name for the database connection (type: string).
    • connection (path: $.connections[].connection):
      Object holding the actual database connection information (type: object).
      • driver (path: $.connections[].connection.driver):
        Name of the driver to be used with the database connection. (type: string, one of: mysql, timescaledb, postgres, alloydb, mariadb, cockroachdb, redshift, oracle, mssql).
      • hostname (path: $.connections[].connection.hostname):
        Hostname or IP address of the database server (type: string).
      • port (path: $.connections[].connection.port):
        Port that the database server listens for client connections on (type: integer).
      • database (path: $.connections[].connection.database):
        Name of the database to connect to (type: string).
    • topicMappings (path: $.connections[].topicMappings):
      List of mappings specifying which MQTT topics should forward data to which database tables (type: array of type object).
      • name (path: $.connections[].topicMappings.name):
        Unique identifier of the current topic mapping. Used to provide useful information about configuration errors to the user (type: string).
      • target (path: $.connections[].topicMappings.target):
        Name of the database table that you want to insert your MQTT messages into (type: string).
      • mqttTopics (path: $.connections[].topicMappings.mqttTopics):
        An array of MQTT topics that should be forwarded to the database. MQTT wildcards (+ and #) can also be used (type: array of type string).
  • schemaMappings (path: $.schemaMappings):
    An array of schema mappings (selectors). May be empty, therefore elements of this array are optional and are described in the section below (type: array of type object).

Optional fields:

  • version (path: $.version):
    Version of the config file. Reserved for future use (type: string, defaults to: 1).

  • connections (path: $.connections)

    • connection (path: $.connections[].connection)
      • credentials (path: $.connections[].connection.credentials):
        Object holding basic authentication fields. (type: object)
        • username (path: $.connections[].connection.credentials.username):
          Username used for authenticating against the database (type: string)
        • password (path: $.connections[].connection.credentials.password):
          Password of the user used for authenticating against the database. May be an empty string (type: string)
      • ssl (path: $.connections[].connection.ssl):
        SSL options. SSL connection with DB will not be established if this option is not defined (type: object)
        • rejectUnauthorized (path: $.connections[].connection.ssl.rejectUnauthorized):
          Whether to reject unauthorized certificates from the DB server, i.e. certificates that are malformed or not signed by the CA certificate provided in the caPath or ca option (type: bool, defaults to: false)
        • caPath (path: $.connections[].connection.ssl.caPath):
          Path to CA that signed DB server certificate (type: string)
        • certPath (path: $.connections[].connection.ssl.certPath):
          Path to client certificate (type: string)
        • keyPath (path: $.connections[].connection.ssl.keyPath):
          Path to private client key (type: string)
        • ca (path: $.connections[].connection.ssl.ca):
          String containing the CA that signed the DB server certificate. Use either this or caPath (type: string)
        • cert (path: $.connections[].connection.ssl.cert):
          Client certificate string. Use either this or certPath (type: string)
        • key (path: $.connections[].connection.ssl.key):
          Private client key string. Use either this or keyPath (type: string)
      • lazyConnect (path: $.connections[].connection.lazyConnect):
        Whether to initialize the database connection lazily. When true, the connection pool is created but no connection to the database is made until there are records to insert. When false, the connection is established and verified on plugin startup; if it fails at this stage, the plugin logs an error and becomes inoperable, which makes connection issues visible early. Note that with this option set to false, the SQL Bridge plugin fails only if the initial connection cannot be established. If the connection was established successfully at startup and fails later, the plugin will try to reconnect as usual (type: bool, defaults to: false).
      • debug (path: $.connections[].connection.debug):
        Whether to log debug information about each SQL query performed by the plugin (type: bool, defaults to: false).
      • driverDebug (path: $.connections[].connection.driverDebug):
        Whether to log driver-specific debug information to stdout. Note that different database drivers log different information. Additionally, this option does not work for all drivers. The format of the logged messages is entirely driver-dependent and does not follow the typical structure of a Mosquitto log entry (type: bool, defaults to: false).
    • options (path: $.connections[].options):
      Object holding additional plugin settings (type: object).
      • bufferSize (path: $.connections[].options.bufferSize):
        Defines the size of the buffer, which is used to batch multiple MQTT messages together before inserting them into the database. For more information, see the section on buffering (type: integer, defaults to: 1000).
      • batchSize (path: $.connections[].options.batchSize):
        Defines the size of the batch inside the buffer. Buffer is broken down into batches and a separate batch insert is issued to the database for every batch in the buffer when the latter is released. (type: integer, defaults to: bufferSize).
      • timeoutMs (path: $.connections[].options.timeoutMs):
        Interval in milliseconds after which buffer is forcefully released if it is not empty. In other words, it specifies a maximum latency of the buffer release (type: integer, defaults to: 5000).
      • queueMaxSize (path: $.connections[].options.queueMaxSize):
        Defines the max size of a retry queue used in case a database insert fails due to connection problems (and not due to malformed data). If set to -1, the size of the queue is not limited. For more information, see the section on retry queue (type: integer, defaults to: -1).
      • maxRetries (path: $.connections[].options.maxRetries):
        Defines the number of retries to be performed by a retry queue before dropping the message. For more information, see the section on retry queue (type: integer, defaults to: 10).
      • retryDelayMs (path: $.connections[].options.retryDelayMs):
        Defines a delay in milliseconds between the retry attempts of the queue. For more information, see the section on retry queue (type: integer, defaults to: 1000).
    • topicMappings (path: $.connections[].topicMappings)
      • options (path: $.connections[].topicMappings.options):
        Additional options which are to be applied before inserting records into the database.
        • dbSchema (path: $.connections[].topicMappings.options.dbSchema):
          Name of the database schema in which the table is stored (if any, since not all the database management systems use schemas) (type: string).
        • uppercase (path: $.connections[].topicMappings.options.uppercase):
          Whether to cast database identifiers (tablename, schemaname, column names) to uppercase or use them as provided. (type: bool, defaults to: false).
      • schemaMapping (path: $.connections[].topicMappings.schemaMapping):
        Name of a particular schema mapping from $.schemaMappings array to be used as a mapping between target table and mqttTopics.
  • schemaMappings (path: $.schemaMappings):
    An array of schema mappings. Each element of the array specifies a mapping between MQTT message properties and database table columns, along with typecasting and transformations if needed (type: array of type object).

    • name (path: $.schemaMappings[].name):
      Unique identifier of the current schema mapping. This name is matched against $.connections[].topicMappings.schemaMapping to determine which schema mapping to use for a particular table (type: string).

    • mapping (path: $.schemaMappings[].mapping):
      An array of source-to-target mapper objects that specify which properties or metadata to pick out of your MQTT messages and which columns of the target database table those properties should be inserted into. These objects are also called selectors because they select which parts of the MQTT message are inserted into the DB table (type: array of type object).

      • source (path: $.schemaMappings[].mapping[].source):
        Source of the data to be inserted. This can be a property of the MQTT message payload (in case the payload is a JSON object) or some metadata property (e.g. client_id, timestamp etc). This field follows a particular unpacking syntax. For more information, see below (type: string).

      • target (path: $.schemaMappings[].mapping[].target):
        Target table column into which data from the source field will be inserted (type: string).

      • type (path: $.schemaMappings[].mapping[].type):
        Optional type casting of the source field before it gets inserted into the database. See more information about typecasting here (type: string, one of: number, integer, datetime, boolean, raw, json, string).

      • options (path: $.schemaMappings[].mapping[].options):
        Object holding information about additional transformations to be performed on source before it is inserted into the database.

        • encoding (path: $.schemaMappings[].mapping[].options.encoding):
          Used when casting a string data to raw datatype. Specifies the encoding of the string data from source (type: string, one of: ascii, base64, base64url, hex, ucs2, utf16le, utf8, binary, defaults to: utf8).

        • dbEncoding (path: $.schemaMappings[].mapping[].options.dbEncoding):
          Used when casting to the raw datatype. Specifies which string encoding the database expects for raw/binary datatypes. If not specified, the default encoding of the underlying database driver will be used (type: string, one of: ascii, base64, base64url, hex, ucs2, utf16le, utf8, binary).

        • replace (path: $.schemaMappings[].mapping[].options.replace):
          Specifies an array of two strings. Replaces all occurrences of the first string with the second one before inserting the data into the database (type: array consisting of two elements of type string).

        • isConst (path: $.schemaMappings[].mapping[].options.isConst):
          Used to indicate that the field in source is a constant that should be inserted into the database as is, without trying to unpack it. This is relevant when the constant in the source field is a string that follows the unpacking syntax (see below for more information) and, in particular, contains [ and ]. Otherwise, this field can be omitted (type: bool, defaults to: false if source is a string following the unpacking syntax, otherwise true).

        • treatNullAsDefault (path: $.schemaMappings[].mapping[].options.treatNullAsDefault): This parameter specifies whether null values in the MQTT message should be treated as not defined and handled by the database table's default values (type: bool, default: false).

        • nullValue (path: $.schemaMappings[].mapping[].options.nullValue): Used to specify which value encountered in source should be replaced with null when performing an insert into the table. For example, with the help of this field, you can specify to convert all "<None>" strings to null values (type: string, boolean, number, or null).

        • defaultValue (path: $.schemaMappings[].mapping[].options.defaultValue): Used to specify which value encountered in the source field should be treated as not defined and therefore replaced with the table's DEFAULT value of the target column when the insert is performed (type: string, boolean, number, or null).

        • replaceNullWith (path: $.schemaMappings[].mapping[].options.replaceNullWith): Specifies a value that null values will be replaced with (type: any).

        • replaceUndefinedWith (path: $.schemaMappings[].mapping[].options.replaceUndefinedWith): If a property specified in source is missing from the MQTT message, it will be replaced with the value specified in this option (type: any).

        • format (path: $.schemaMappings[].mapping[].options.format): Used when casting into datetime datatype. Specifies a datetime format string representing the values from source. See format string documentation for more information on available format strings. Defaults to yyyy-MM-ddTHH:mm:ss.sssZ (with an optional timezone information). However, by default if an integer value representing a unix timestamp in milliseconds is encountered in source, it will also be accepted and appropriately converted to the datetime datatype (type: string).

        • dbFormat (path: $.schemaMappings[].mapping[].options.dbFormat):
          Used when casting into datetime datatype. Specifies a format string that datetime values should be converted to before they are inserted into the table. Database drivers typically handle this conversion automatically and usually default to the ISO string format. However, in some specific cases (for example, with OracleDB, which uses dd-MMM-yy hh:mm:ss instead of the ISO format by default) an appropriate format might need to be defined by the user (type: string).

        • replaceDelimiter (path: $.schemaMappings[].mapping[].options.replaceDelimiter):
          Used when casting into datetime datatype. The format of the datetime values inserted into the database will typically be an ISO format as in the following example: 2017-01-29T14:07:00. If, for some reason, the database does not accept the T delimiter separating the date and time portion, a different delimiter may be specified using this option. Some database drivers will handle this transformation automatically, but in other cases, this field may become useful (type: string).

Mapping MQTT packets to database table columns

The plugin configuration specifies the mapping between MQTT packets and table columns in the schemaMappings field. Each element of the schemaMappings array must have a unique name, which is matched against the schemaMapping field of a topic mapping in the respective connection object, and a mapping field.

mapping specifies an array of mapper objects (also referred to as selectors) between sources and targets. The source field represents the source of the data, which comes from the MQTT message or its metadata. The target field, on the other hand, is the table column that source will be inserted into. It should be able to accommodate the datatype of source. Most of the time, consistency between the common datatypes of sources and targets (e.g. string to VARCHAR, number to NUMBER etc.) is handled by the SQL Bridge itself. However, in certain cases of datatype mismatch (e.g. when handling non-ISO formatted date strings), typecasting may be required. See the following section on typecasting.

To be more precise, the source field can either be a constant or a path to a particular metadata property of an MQTT message or a data property within its payload. In case MQTT payload is a JSON, the path to some data property of such payload can be specified.

Here are some examples to put the above-mentioned into context:

{
...
"name": "mapping1",
"mapping": [
{
"source": "I am a constant string",
"target": "column_a"
}
],
...
}

The selector above maps a constant string ("I am a constant string") to column_a of the database table. This means that for every MQTT message for which this mapping is invoked, an SQL statement inserting "I am a constant string" into the column column_a will be performed.

To insert the entire MQTT payload, the selector would look similar to the following:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload]",
"target": "column_a"
}
],
...
}

Note that apart from [payload], various special metadata properties of the MQTT message may be selected. Here is the entire list of special (meta)data properties:

  • hostname: Hostname of the broker. Note that the HOSTNAME environment variable must be specified for the broker, or the default value will be used (type: string, defaults to: <Unknown>).
  • topic: Name of the topic to which the MQTT message was published (type: string).
  • payload: Payload of the MQTT message. Can be used to further unpack (pick out) child properties of the payload object if the latter is a JSON. See below for more information. If the payload was originally a string or a number, it will be treated as such, and no typecasting is necessary (type: any).
  • qos: QoS level of the MQTT message (type: integer, one of: 0, 1, 2).
  • retain: Retain flag of the MQTT message (type: bool).
  • timestamp: Unix timestamp representing the time when the MQTT message arrived at the broker. To insert this property into the database a column of type INTEGER or BIGINT would typically be used (type: integer).
  • datetime: Datetime string representing the time when the MQTT message arrived at the broker. To insert this property into the database a column of type DATETIME would typically be used (type: string representing the date in ISO format).
  • uuid: Unique identifier (UUID) of the message. Note that it is a convenience property. This ID is internally generated by the SQL Bridge plugin and is not coming from the original message's metadata (type: string).
  • client_id: Client ID of the device that published the MQTT message (type: string).

To select one of the above-mentioned properties, it should be enclosed in square brackets ([]) and specified in the source field of the configuration file.

For example, to insert the client ID of the device that published the message into the client_id_column the following selector can be defined:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[client_id]",
"target": "client_id_column"
}
],
...
}

As mentioned, it is also possible to unpack MQTT payload in case it is a JSON and select/pick out/extract certain properties from it. This can be achieved by specifying a JSON path to such property in the source field. However, the properties in this path should be separated not by dots, but rather by square brackets (e.g.: not payload.property1.innerProperty but [payload][property1][innerProperty]).

Note that currently the only supported characters for the JSON payload properties are [a-zA-Z0-9_-] (alphanumeric characters as well as _ (underscore) and - (dash)).

Example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1][innerProperty]",
"target": "column_a"
}
],
...
}

In the above example payload.property1.innerProperty is selected into the column_a column.

This means that if the MQTT payload looks like below:

{
"property1": {
"innerProperty": "test"
},
"property2": {
"someProperty": 1
}
}

then the string "test" will be inserted into column_a.
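The unpacking syntax can be illustrated with a short Python sketch. This is not the plugin's implementation: the resolve_source function and the message dictionary are hypothetical, and returning None for a missing property is an assumption made only for this example.

```python
import re

# Assumption: a selector consists of one or more [name] segments that use
# only the documented characters a-z, A-Z, 0-9, _ and -.
SELECTOR_RE = re.compile(r"^(?:\[[A-Za-z0-9_-]+\])+$")
SEGMENT_RE = re.compile(r"\[([A-Za-z0-9_-]+)\]")


def resolve_source(source, message, is_const=False):
    """Return the value that a selector's source would pick from a message.

    `message` is a hypothetical dict with keys such as "payload", "topic"
    and "client_id"; it is not the plugin's internal representation.
    """
    if is_const or not SELECTOR_RE.match(source):
        return source  # constants are passed through unchanged
    value = message
    for key in SEGMENT_RE.findall(source):
        if not isinstance(value, dict) or key not in value:
            return None  # assumption: a missing property yields None here
        value = value[key]
    return value


message = {
    "client_id": "sensor-1",
    "payload": {"property1": {"innerProperty": "test"}},
}
print(resolve_source("[payload][property1][innerProperty]", message))  # test
print(resolve_source("[client_id]", message))                          # sensor-1
print(resolve_source("I am a constant string", message))               # I am a constant string
```

Passing is_const=True mirrors the isConst option: a string such as "[payload][data]" is then returned literally instead of being unpacked.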

Column type casting

SQL bridge provides a feature to typecast properties of MQTT messages before they are inserted into the respective tables.

In order to cast a certain property to a certain type, the following configuration must be specified:

{
...
"name": "<your mapping name here>",
"mapping": [
{
"source": "<your source mqtt message property>",
"target": "<your target db table column>",
"type": "<specify type to cast 'source' into>"
}
],
...
}

For example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][number_str]",
"target": "number_column",
"type": "number"
}
],
...
}

The following types are supported:

  • datetime: casts integers or strings into a datetime (format, dbFormat and replaceDelimiter might need to be specified. See options section for details). There is no need to typecast datetime strings if they follow ISO format (note that the timezone portion is not required here since many databases don't handle timezones).
  • number: casts strings into either a float or integer (supports float numbers separated by both dots or commas (e.g. both 1.5 and 1,5 are handled)).
  • integer: casts strings into an integer, truncating the fractional part.
  • boolean: casts strings or numbers into a boolean value (note that string "false" is cast to a false boolean value, as is number 0, null, empty string, or a missing property. Everything else is cast to true).
  • raw: casts strings, numbers, and objects into binary DB types like BLOB or BINARY. The specific datatype depends on the underlying database driver and a specified encoding (encoding and dbEncoding might need to be specified. See options section for details).
  • string: can cast numbers into strings. There is typically no need to typecast JSON objects to strings as this is done automatically.
  • json: casts strings into JSON objects. This type is used very rarely, since implicit JSON casting is handled very well by the SQL bridge itself.

Note that while it might be tempting to explicitly specify typecasting for all the sources, this is not recommended as typecasting is often associated with additional overhead. The rule of thumb would be to cast only what is necessary.
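The casting rules for boolean, number, and integer listed above can be approximated in Python as follows. The function names are hypothetical and the sketch only mirrors the documented rules; details such as case sensitivity of "false" or handling of invalid input are assumptions and may differ in the plugin.

```python
def cast_boolean(value):
    """Documented rule: "false", 0, null, "" and missing values become False."""
    if value is None or value is False:  # None stands in for null or missing
        return False
    if isinstance(value, str):
        return value not in ("", "false")  # assumption: exact, lowercase match
    if isinstance(value, (int, float)):
        return value != 0
    return True


def cast_number(text):
    """Parse a string as int or float; both "1.5" and "1,5" are accepted."""
    normalized = text.strip().replace(",", ".")
    try:
        return int(normalized)
    except ValueError:
        return float(normalized)


def cast_integer(text):
    """Parse a string as a number and truncate the fractional part."""
    return int(float(text.strip().replace(",", ".")))


print(cast_boolean("false"), cast_boolean("no"))  # False True
print(cast_number("1,5"), cast_number("42"))      # 1.5 42
print(cast_integer("21.9"))                       # 21
```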

Selector options

Each selector under the mapping section of the configuration file can have additional options that define transformations to be performed on the source data before it is inserted into target. For example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"options": {
"nullValue": "<null>"
}
}
],
...
}

The above specifies a selector that maps property1 of the MQTT message to the column_a in the database table. Once value "<null>" is encountered in property1 it will be converted to NULL before being inserted into the database.

Note that some options can only be used when typecasting to certain datatypes, while others can be used with any type or even without any typecasting.

Options which can only be used when typecasting to datetime:

  • format
  • dbFormat
  • replaceDelimiter

Example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"type": "datetime",
"options": {
"nullValue": "<null>",
"format": "yyyy-MM-dd HH:mm:ss",
"dbFormat": "dd-MMM-yy hh:mm:ss",
"replaceDelimiter": " "
}
}
],
...
}

Options which can only be used when typecasting a number, a string, or an object to raw:

  • dbEncoding

Example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"type": "raw",
"options": {
"dbEncoding": "hex"
}
}
],
...
}

Options which can only be used when typecasting a string to raw:

  • encoding

Example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"type": "raw",
"options": {
"encoding": "utf8",
"dbEncoding": "hex"
}
}
],
...
}

Options which can be used as long as the underlying value is a string:

  • replace

Example:

{
...
"name": "mapping1",
"mapping": [
{
"source": "[payload][property1]",
"target": "column_a",
"options": {
"replace": ["hello world", "hello SQL"]
}
}
],
...
}

Options which can be used regardless of types and typecasting:

  • replaceNullWith
  • replaceUndefinedWith
  • nullValue
  • defaultValue
  • treatNullAsDefault
  • isConst

See config file fields for more information on the listed options (their descriptions, datatypes, allowed values).

Configuration example

An example of the sql-bridge.json with two database connections is shown below (you may see all the available configuration options here):

{
"version": 1,
"connections": [
{
"name": "PostgreSQL - Bridge",
"connection": {
"driver": "postgres",
"hostname": "postgres.cedalo.com",
"port": 5432,
"database": "mosquitto",
"credentials": {
"username": "mosquittouser",
"password": "password"
},
"debug": true
},
"options": {
"bufferSize": 10000,
"batchSize": 1000,
"timeoutMs": 4000,
"queueMaxSize": 10000,
"maxRetries": 15,
"retryDelayMs": 1000
},
"topicMappings": [
{
"name": "topic-mapping-to-postgres-table",
"schemaMapping": "schema-mapping-1",
"target": "blood",
"options": {
"dbSchema": "public"
},
"mqttTopics": [
"db/qa/#"
]
}
]
},
{
"name": "MySQL - Bridge",
"connection": {
"driver": "mysql",
"hostname": "mysql.cedalo.com",
"port": 3306,
"database": "mosquitto",
"credentials": {
"username": "mosquittouser",
"password": "password"
},
"debug": true
},
"options": {
"bufferSize": 10000,
"batchSize": 1000,
"timeoutMs": 4000,
"queueMaxSize": 10000,
"maxRetries": 15,
"retryDelayMs": 1000
},
"topicMappings": [
{
"name": "topic-mapping-to-mysql-table",
"target": "blood",
"schemaMapping": "schema-mapping-1",
"mqttTopics": [
"db/qa/#"
]
}
]
}
],
"schemaMappings": [
{
"name": "schema-mapping-1",
"mapping": [
{
"source": "[client_id]",
"target": "humanname"
},
{
"source": "[payload][data]",
"target": "dummychars"
},
{
"source": "[uuid]",
"target": "uuidcol"
},
{
"source": "[payload][tmp]",
"target": "floatnumbercol",
"type": "integer"
},
{
"source": "[datetime]",
"target": "dt"
},
{
"source": "[payload][tmp_int]",
"target": "floatnumbercol2",
"type": "number"
},
{
"source": "[payload][name]",
"target": "textcol",
"options": {
"replace": ["Tom ", "Thomas "]
}
},
{
"source": "some constant desc",
"target": "descript"
},
{
"source": "[payload][data][temp]",
"target": "temp"
},
{
"source": "[payload][data][more][ok]",
"target": "isok",
"type": "boolean"
},
{
"source": "[payload][tmp_json]",
"target": "jsn"
},
{
"source": "[payload][jsonarray]",
"target": "jsonarray"
},
{
"source": "[payload][packet_dt]",
"target": "dt2",
"type": "datetime",
"options": {
"format": "DD-MM-YYYY HH:mm:ss"
}
},
{
"source": "[payload]",
"target": "payloadcolumn"
},
{
"source": "[payload][data]",
"target": "sourcecolumn",
"options": {
"isConst": true
}
}
]
}
]
}

As per the example config above, SQL bridge will connect to two databases at postgres.cedalo.com:5432 and mysql.cedalo.com:3306. It will use a database called mosquitto and will authenticate against it as mosquittouser.

Note that in this example both database connections have passwords. However, some databases, such as CockroachDB when started in insecure mode, do not require a password. In such cases, the password field may be left empty.

The optional options field is specified and overrides some defaults of the plugin's buffering. It sets bufferSize to 10000. This setting effectively assigns the specified buffer size for every buffer. Buffers are created per table to store MQTT message information for batching purposes and trigger fewer insert queries.

batchSize specifies the number of rows that should be inserted into the database per single batch insert. Effectively, it breaks a single batch insert into multiple smaller batch inserts of the specified size (1000 in this case).

timeoutMs is changed to 4000 milliseconds, which means that every 4 seconds all the buffers that are not empty will be forcefully cleared. Clearing the buffers triggers batch inserts even if they have not yet reached full capacity.

See section about buffering for more information.
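To make the relationship between bufferSize and batchSize concrete, the following Python sketch splits a full buffer into batches. It only illustrates the arithmetic; split_into_batches is a hypothetical helper and says nothing about how the plugin stores or flushes its buffers internally.

```python
def split_into_batches(rows, batch_size):
    """Split buffered rows into consecutive batches of at most batch_size."""
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


# Values taken from the example config: bufferSize 10000, batchSize 1000.
full_buffer = list(range(10000))
batches = split_into_batches(full_buffer, 1000)
print(len(batches))  # 10 batch inserts for one full buffer

# A buffer released early by timeoutMs may end with a smaller final batch.
partial = split_into_batches(list(range(2500)), 1000)
print([len(batch) for batch in partial])  # [1000, 1000, 500]
```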

queueMaxSize sets the retry queue size to 10000, the limit after which messages will be dropped. maxRetries is set to 15 and retryDelayMs to 1000. This means that an insert is retried once every second for up to 15 attempts, giving roughly 15 seconds of retrying for every message that could not be inserted due to a connection issue. This is quite long, so it is important to size queueMaxSize accordingly, or to make the queue unlimited with a value of -1, so that the queue does not overflow while inserts are being retried.

See section about retry queue for more information.
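A rough way to reason about these retry settings is sketched below in Python. The helper names are hypothetical, and the estimate assumes a constant publish rate and an outage that lasts for the entire retry window, which is a simplification rather than a description of the plugin's queueing logic.

```python
def retry_window_seconds(max_retries, retry_delay_ms):
    """Approximate time a message keeps being retried before it is dropped."""
    return max_retries * retry_delay_ms / 1000


def queue_can_overflow(messages_per_second, max_retries, retry_delay_ms,
                       queue_max_size):
    """Estimate whether the retry queue could fill up during one retry window."""
    if queue_max_size == -1:  # -1 means the queue size is not limited
        return False
    window = retry_window_seconds(max_retries, retry_delay_ms)
    return messages_per_second * window > queue_max_size


print(retry_window_seconds(15, 1000))             # 15.0
print(queue_can_overflow(1000, 15, 1000, 10000))  # True: about 15000 pending
print(queue_can_overflow(500, 15, 1000, 10000))   # False: about 7500 pending
```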

The configuration uses a mapping to the table blood, stored in the schema public in the case of the Postgres database. Notice that the MySQL connection does not define dbSchema under options in its topicMappings. That is because MySQL does not use database schemas.

Under the mqttTopics entry, there is an array of MQTT topics which should be mapped to the database table (blood). If this array were empty, no messages would be forwarded to the database.

Multiple topics can be specified in this array, but in the case above, there is just a single entry. However, this entry uses an MQTT wildcard (db/qa/#), specifying that any MQTT message published to any of the topics under db/qa/ will be unpacked and inserted into the database using selectors from the respective schema mapping specified in the schemaMapping key ("schema-mapping-1" in our case).
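For readers less familiar with MQTT wildcards, the matching rule can be sketched in Python as follows. This follows the general MQTT topic filter semantics (+ matches exactly one level, # matches the remaining levels) and ignores special cases such as topics starting with $; topic_matches is a hypothetical helper, not part of the plugin.

```python
def topic_matches(topic_filter, topic):
    """Check a topic against an MQTT topic filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard: matches everything from here
        if index >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[index]:
            return False  # literal level does not match
    return len(filter_levels) == len(topic_levels)


print(topic_matches("db/qa/#", "db/qa/sensors/room1"))  # True
print(topic_matches("db/qa/#", "db/prod/sensors"))      # False
print(topic_matches("db/+/temp", "db/qa/temp"))         # True
```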

The schemaMappings section defines an array of schema mappings that are used to unpack MQTT messages. The name field of each element in the array is a unique identifier of the schema mapping, matched against the schemaMapping field from topicMappings. Apart from name, each schema mapping contains a mapping field with selectors. Each selector under the mapping field defines a source and a target, mapping an MQTT message property to a database table column. Each source is some property of the MQTT message (or a constant value) and each target is a column of the table blood.
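Because topic mappings refer to schema mappings by name, a misspelled name is an easy mistake to make. The Python sketch below checks a parsed config for such dangling references before it is deployed. It is a hypothetical helper written for illustration; the plugin performs its own validation, which may behave differently.

```python
def find_unknown_schema_mappings(config):
    """Return (connection, topic mapping, reference) for unresolved names."""
    known = {entry["name"] for entry in config.get("schemaMappings", [])}
    problems = []
    for connection in config.get("connections", []):
        for topic_mapping in connection.get("topicMappings", []):
            reference = topic_mapping.get("schemaMapping")
            if reference is not None and reference not in known:
                problems.append(
                    (connection["name"], topic_mapping["name"], reference)
                )
    return problems


# Trimmed-down config with one valid and one misspelled reference.
config = {
    "connections": [
        {
            "name": "PostgreSQL - Bridge",
            "topicMappings": [
                {"name": "tm-1", "schemaMapping": "schema-mapping-1"},
                {"name": "tm-2", "schemaMapping": "schema-maping-1"},
            ],
        }
    ],
    "schemaMappings": [{"name": "schema-mapping-1", "mapping": []}],
}
print(find_unknown_schema_mappings(config))
# [('PostgreSQL - Bridge', 'tm-2', 'schema-maping-1')]
```

In practice the config dictionary would come from json.load on the sql-bridge.json file.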

Note that some sources specify metadata properties of the MQTT message, like [client_id] or [uuid] (representing the client id of the publisher or a unique identifier generated for the message). Other sources unpack the payload of the MQTT message and extract certain properties from it. For example, [payload][packet_dt] extracts packet_dt from the MQTT payload, assuming that the payload is JSON. Also note that this selector has a typecast to the datetime datatype, meaning that the plugin tries to interpret the value in packet_dt as a date string according to the DD-MM-YYYY HH:mm:ss format specified under options.
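The square-bracket selector syntax can be pictured as a walk through nested objects. The sketch below is illustrative only: the extract helper and the sample message are assumptions, and the translation of DD-MM-YYYY HH:mm:ss into Python's strptime directives is an interpretation of that format string rather than the plugin's parser.

```python
import re
from datetime import datetime


def extract(message: dict, selector: str):
    """Follow a selector such as '[payload][packet_dt]' through nested dicts."""
    value = message
    for key in re.findall(r"\[([^\[\]]+)\]", selector):
        value = value[key]
    return value


# Hypothetical message: metadata properties next to the decoded JSON payload.
message = {
    "client_id": "sensor-7",
    "payload": {"packet_dt": "24-12-2023 18:30:05"},
}

raw = extract(message, "[payload][packet_dt]")
parsed = datetime.strptime(raw, "%d-%m-%Y %H:%M:%S")  # DD-MM-YYYY HH:mm:ss

print(extract(message, "[client_id]"))  # sensor-7
print(parsed.isoformat())               # 2023-12-24T18:30:05
```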

Notice the typecasting on some other sources. For example, the source [payload][tmp] is mapped to a floatnumbercol column. The column name suggests that the value of tmp could be a float. However, it is typecast to integer, and therefore the fractional part of the number is truncated before the insert into the table occurs.

Another example is a mapping between a deeply nested property ok in source [payload][data][more][ok] and the column isok. This mapping has a boolean typecast, which means that even if the ok property is not an actual boolean but a string literal with a value of either "true" or "false", the cast will convert it to the proper boolean value true or false respectively.

There is also a [payload][name] property being unpacked and transformed. The transformation in this case replaces all occurrences of the substring "Tom " with "Thomas ".
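The effect of the integer and boolean typecasts described above can be sketched as follows. These helpers are assumptions for illustration: they reproduce only the documented behavior (dropping the fractional part, accepting "true"/"false" strings), and how the plugin treats other inputs is not specified here.

```python
def cast_integer(value) -> int:
    """Drop the fractional part of a numeric value or numeric string."""
    return int(float(value))


def cast_boolean(value) -> bool:
    """Accept real booleans and the string literals "true" / "false"."""
    if isinstance(value, bool):
        return value
    if value == "true":
        return True
    if value == "false":
        return False
    raise ValueError(f"cannot cast {value!r} to boolean")


print(cast_integer(36.6))     # 36
print(cast_boolean("true"))   # True
print(cast_boolean("false"))  # False
```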

If required, the entire MQTT payload can be inserted into a column. This is shown in the mapping between [payload] and payloadcolumn. If the payload is JSON and the database supports JSON types (like PostgreSQL does, for example), then the payload will be inserted as JSON. Otherwise, if the database of choice does not support this type, then the plugin will try to insert the payload as a string, and the respective database column should have an appropriate type to accommodate it.

If the payload of the MQTT message is not JSON but a plain string or a number, it will be inserted as such.

An interesting but uncommon case is the mapping of source [payload][data] to the sourcecolumn. Note that, since the isConst option is set to true, [payload][data] is treated as a constant string rather than as the data property of the MQTT message's payload. The isConst option is needed in this case because the constant string [payload][data] has the same syntax as a payload selector. If this option were not specified, the plugin would try to extract the property from the payload, leading to errors if it is not present.

However, if there is a need to insert a constant into the database and it does not look like a valid message unpacking syntax (e.g. doesn't use the square brackets notation), then there is no need to specify the isConst option. This can be seen in the following mapping:

{
    "source": "some constant desc",
    "target": "descript"
},

which will simply insert the string "some constant desc" into the descript column for every MQTT message.
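The decision between a selector and a constant can be summarized in a short sketch. It assumes that isConst lives under the options of a mapping entry, and the regular expression used to recognize selector syntax is a guess at the rule, not the plugin's actual implementation.

```python
import re

# A source made up solely of one or more [key] segments is treated as a selector.
SELECTOR_PATTERN = re.compile(r"^(\[[^\[\]]+\])+$")


def resolve_source(message: dict, entry: dict):
    """Return the value a mapping entry would contribute for a given message."""
    source = entry["source"]
    is_const = entry.get("options", {}).get("isConst", False)
    if is_const or not isinstance(source, str) or not SELECTOR_PATTERN.match(source):
        return source  # constants are inserted as they are
    value = message
    for key in re.findall(r"\[([^\[\]]+)\]", source):
        value = value[key]  # raises KeyError if the property is absent
    return value


message = {"payload": {"data": {"more": {"ok": "true"}}}}

print(resolve_source(message, {"source": "some constant desc", "target": "descript"}))
# some constant desc
print(resolve_source(message, {"source": "[payload][data]", "target": "sourcecolumn",
                               "options": {"isConst": True}}))
# [payload][data]
print(resolve_source(message, {"source": "[payload][data][more][ok]", "target": "isok"}))
# true
```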

Error handling

Any configuration or license errors will prevent the plugin from being loaded, and respective messages will appear in the logs.

If recoverable errors occur during the operation of the plugin, it writes the respective error messages to its logs with an ERR: prefix. This includes errors encountered when converting MQTT messages to SQL insert queries as well as failures during query execution.

Note that while error messages generated by the plugin are preceded by ERR:, warning messages get a WARN: prefix and debug messages a DEBUG: prefix.

Debugging

To debug the plugin, set the debug variable under the connection section of the configuration file to true. This prints all SQL queries performed against the database to stdout.

Another option is specifying driverDebug (under the connection section as well), which produces SQL driver-specific output. Note that some drivers do not produce such output at all. Also note that the records printed with driverDebug do not follow the typical Mosquitto log message structure. The logs generated when this option is enabled might also not be synchronized with Mosquitto logs, leading to malformed log records in certain cases. Therefore, using this option for anything other than debugging is strongly discouraged.

Case sensitivity

Different databases handle database identifiers a bit differently. While DBMSs such as PostgreSQL fold identifier names to lowercase (unless they were enclosed with double quotes upon creation), other databases, such as OracleDB, tend to fold unquoted identifiers to uppercase by default.

The SQL bridge config file is case-sensitive to the database identifier names, which, coupled with different conventions for name folding, might create some confusion and errors, especially when reusing parts of the config or specifying different databases with identically named tables.

For instance, if there was a table products in OracleDB and a table with the same name in PostgreSQL, the following config:

{
    "connections": [
        {
            "name": "PostgreSQL - Bridge",
            "connection": {
                "driver": "postgres",
                "hostname": "postgres.cedalo.com",
                "port": 5432,
                "database": "mosquitto",
                "credentials": {
                    "username": "test",
                    "password": "secret_password"
                },
                "debug": true
            },
            "topicMappings": [
                {
                    "name": "topic-mapping-to-postgers-table",
                    "schemaMapping": "schema-mapping-1",
                    "target": "products",
                    "options": {
                        "uppercase": false,
                        "dbSchema": "test"
                    },
                    "mqttTopics": [
                        "db/qa"
                    ]
                }
            ]
        },
        {
            "name": "OracleDB - Bridge",
            "connection": {
                "driver": "oracle",
                "hostname": "oracle.cedalo.com",
                "port": 1521,
                "database": "FREEPDB1",
                "credentials": {
                    "username": "test",
                    "password": "secret_password"
                }
            },
            "topicMappings": [
                {
                    "name": "topic-mapping-to-oracledb-table",
                    "schemaMapping": "schema-mapping-1",
                    "options": {
                        "dbSchema": "test"
                    },
                    "target": "products"
                }
            ]
        }
    ],
    "schemaMapping": [
        {
            "name": "schema-mapping-1",
            "mapping": [
                {
                    "source": "[payload][product]",
                    "target": "product_name"
                }
            ]
        }
    ]
}

could potentially lead to errors because of the default name folding. When SQL bridge generates SQL queries, it encloses identifier names in double quotes, which makes them case-sensitive. If you specify the table products, schema test, and column product_name as above (notice the lowercase) for the Oracle database, it will look up those identifiers exactly as written, in lowercase. The query would look something like this:

insert into "test"."products" ("product_name") values ('some_value');

If the products table was not created with double-quoted lowercase identifiers, Oracle stores its name in uppercase, so the above query would not find the table and would fail. In this case, a correct query would be:

insert into "TEST"."PRODUCTS" ("PRODUCT_NAME") values ('some_value');

To construct the correct query in such cases, there are two options: either specify the identifier names in the config file in uppercase (e.g. "target": "PRODUCTS") for databases that fold to uppercase, or set the uppercase property to true under options in the respective entry of the topicMappings array. E.g.:

{
    ...
    "topicMappings": [
        {
            "name": "topic-mapping-to-oracledb-table",
            "schemaMapping": "schema-mapping-1",
            "target": "products",
            "options": {
                "dbSchema": "test",
                "uppercase": true
            },
            ...
        },
        ...
    ]
    ...
}

Note again that this example would not have been a problem for PostgreSQL and other databases that fold identifiers to lowercase by default.
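The interplay of double-quoting and the uppercase option can be illustrated with a small sketch. The helpers and the ? placeholder style are assumptions made for illustration, and the statement they build is not claimed to be the plugin's literal output.

```python
def quote_ident(name: str, uppercase: bool = False) -> str:
    """Double-quote an identifier, optionally folding it to uppercase first."""
    if uppercase:
        name = name.upper()
    # Embedded double quotes are escaped by doubling them.
    return '"' + name.replace('"', '""') + '"'


def build_insert(schema: str, table: str, columns: list[str], uppercase: bool = False) -> str:
    """Build a parameterized insert statement with quoted identifiers."""
    target = ".".join(quote_ident(part, uppercase) for part in (schema, table))
    column_list = ", ".join(quote_ident(column, uppercase) for column in columns)
    placeholders = ", ".join("?" for _ in columns)
    return f"insert into {target} ({column_list}) values ({placeholders})"


print(build_insert("test", "products", ["product_name"]))
# insert into "test"."products" ("product_name") values (?)
print(build_insert("test", "products", ["product_name"], uppercase=True))
# insert into "TEST"."PRODUCTS" ("PRODUCT_NAME") values (?)
```

The first statement suits a database that folds unquoted identifiers to lowercase, the second one that folds them to uppercase.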

As to the rationale behind this documentation using mostly lowercase for database identifiers, it is because the majority of the supported DBMSs fold to lowercase.

An advantage of case-sensitive database identifiers in the SQL bridge config file is that tables created with case-sensitive names can be referenced. For example, producttable, PRODUCTTABLE, PrOdUcTtAbLe, and ProductTable are four completely different identifiers, each of which can be referenced within the configuration file. For this to work, ensure that the uppercase option is set to false or removed from the config.

Buffering

Buffers are created per table for every database connection. The capacity of the buffer is controlled with the bufferSize field under options ($.connections[].options). The default is 1000. The buffer holds records representing unpacked MQTT messages, which are inserted into the respective table once the buffer is saturated.

Once the buffer fills up, a flush is triggered, and one or more batch inserts are issued against the table, inserting the buffer contents. The flush is triggered when the number of buffered records reaches bufferSize + 1. The number of batch insert queries issued per buffer release is controlled via the batchSize config field, which indicates how many records should be inserted per single batch insert. By default, batchSize equals bufferSize, meaning that the buffer is not broken down into batches and a single batch insert query is performed.
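How a released buffer is divided into batch inserts can be shown with a short sketch. The helper name and the sample sizes are assumptions. Each inner list stands for the records of one batch insert query.

```python
def split_into_batches(records: list, batch_size: int) -> list[list]:
    """Cut the released buffer contents into chunks of at most batch_size records."""
    return [records[start:start + batch_size]
            for start in range(0, len(records), batch_size)]


buffer_contents = list(range(1000))  # stand-in for 1000 buffered records

print(len(split_into_batches(buffer_contents, 1000)))                # 1
print(len(split_into_batches(buffer_contents, 250)))                 # 4
print([len(batch) for batch in split_into_batches(buffer_contents, 300)])
# [300, 300, 300, 100]
```

A smaller batchSize therefore means more, but smaller, insert queries per buffer release.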

Another parameter, timeoutMs, indicates the time interval (in milliseconds) after which all non-empty buffers are forcefully released, triggering batch inserts of their contents. The default value is 5000 ms.

However, although it might seem that the buffer is released only after saturation or a timeout, it also has a mechanism to account for infrequent MQTT messages and to decrease the latency of database inserts for such records.

The plugin keeps track of the time interval between the current and the previous MQTT message. If this interval is long enough, the record is inserted instantly. If the interval is short, the plugin keeps buffering messages in anticipation of a possible influx. The definition of "long enough" might change when the plugin is updated, but currently it is a fraction of the timeout. For practical purposes, it can be assumed to be timeoutMs/2.

Note that the above leads to the first MQTT message never being buffered.
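The latency heuristic can be sketched as a small state machine. This is a simplified model under stated assumptions, not the plugin's code: the threshold is taken as timeoutMs/2, the periodic timeout flush is omitted, and any pending records are flushed together with an "instant" record so that ordering is preserved.

```python
class TableBuffer:
    """Simplified per-table buffer that inserts instantly after a quiet period."""

    def __init__(self, buffer_size: int = 1000, timeout_ms: int = 5000):
        self.buffer_size = buffer_size
        self.quiet_threshold_ms = timeout_ms / 2  # assumed meaning of "long enough"
        self.records = []
        self.last_seen_ms = None

    def on_message(self, record, now_ms: int) -> list:
        """Return the records to insert right now (an empty list means keep buffering)."""
        gap = None if self.last_seen_ms is None else now_ms - self.last_seen_ms
        self.last_seen_ms = now_ms
        if gap is None or gap >= self.quiet_threshold_ms:
            # First message or a long pause: do not wait, insert immediately.
            released, self.records = self.records + [record], []
            return released
        self.records.append(record)
        if len(self.records) > self.buffer_size:  # bufferSize + 1 reached
            released, self.records = self.records, []
            return released
        return []


buffer = TableBuffer(buffer_size=3, timeout_ms=5000)
print(buffer.on_message("a", now_ms=0))     # ['a']
print(buffer.on_message("b", now_ms=100))   # []
print(buffer.on_message("c", now_ms=200))   # []
print(buffer.on_message("d", now_ms=3000))  # ['b', 'c', 'd']
```

The first call returns the record immediately because there is no previous message to measure the interval against.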

Also, notice that buffer options can be defined per database connection, allowing for more flexibility and fine-tuning when needed.

Retry queue

The SQL bridge plugin guarantees the in-order insertion of all the non-malformed messages into the respective tables. However, in case of network issues or database downtime, some messages may be lost. To prevent this, SQL bridge has a retry queue on a per-table basis that is responsible for retrying message inserts in case of connection problems. Notice that this does not include cases when the data of the MQTT message is malformed and generates an SQL error upon insert (due to inconsistent types, exceeded limits, or violated constraints, etc).

Each retry queue can be configured with the following settings (under options field of the configuration file ($.connections[].options)):

The queueMaxSize option sets the maximum number of messages that the queue can accommodate before it starts to drop them. This limit can be removed by using the value -1, which is the default. Note that if this value is not set to -1, careful consideration and testing are needed to ensure that the queue does not overfill too quickly once it starts retrying inserts.

maxRetries specifies the maximum number of times the retry queue attempts to insert a message again before dropping it.

retryDelayMs is the minimum amount of time in milliseconds that must pass between retry attempts.

Note that an element in a retry queue can be either a single MQTT message or an entire buffer of MQTT messages that is ready to be batch-inserted into the database.
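The three settings can be pictured with a simplified model of a retry queue. It is a sketch under assumptions, not the plugin's implementation: it assumes that a full queue rejects the newest element, that retrying stops at the first failure to keep inserts in order, and that the caller waits retryDelayMs between passes.

```python
from collections import deque
from typing import Callable


class RetryQueue:
    """Bounded FIFO of failed inserts (single messages or whole buffers)."""

    def __init__(self, queue_max_size: int, max_retries: int):
        self.queue_max_size = queue_max_size  # -1 means unbounded
        self.max_retries = max_retries
        self.items = deque()  # each entry: [element, attempts_so_far]
        self.dropped = []

    def push(self, element) -> bool:
        """Queue a failed insert; return False if it had to be dropped."""
        if self.queue_max_size != -1 and len(self.items) >= self.queue_max_size:
            self.dropped.append(element)  # assumption: the newest element is dropped
            return False
        self.items.append([element, 0])
        return True

    def retry_pass(self, insert: Callable[[object], bool]) -> None:
        """Run one retry pass over the queue, oldest element first."""
        while self.items:
            entry = self.items[0]
            if insert(entry[0]):
                self.items.popleft()  # success: continue with the next element
                continue
            entry[1] += 1
            if entry[1] >= self.max_retries:
                self.dropped.append(self.items.popleft()[0])
            break  # stop at the first failure to preserve ordering


queue = RetryQueue(queue_max_size=2, max_retries=2)
queue.push("m1")
queue.push("m2")
print(queue.push("m3"))               # False
queue.retry_pass(lambda element: False)
queue.retry_pass(lambda element: False)
print(queue.dropped)                  # ['m3', 'm1']
queue.retry_pass(lambda element: True)
print(len(queue.items))               # 0
```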

JSON Schema

Schema for all possible parameters for the sql-bridge.json config file:

{
"title": "SQL-Bridge Plugin Config",
"type": "object",
"properties": {
"version": {
"type": "string",
"nullable": true,
"default": "1",
"description": "Version of the configuration file"
},
"connections": {
"type": "array",
"description": "List of sub-configurations per DB connection",
"items": {
"type": "object",
"description": "Configuration per DB connection",
"properties": {
"name": {
"type": "string",
"nullable": false,
"description": "Unique textual identifier of the configuration"
},
"options": {
"type": "object",
"nullable": true,
"description": "Plugin options for configuring internal queues and buffers",
"properties": {
"bufferSize": {
"type": "integer",
"nullable": true,
"minimum": 1,
"description": "Maximum number of messages to buffer before flushing to the database"
},
"timeoutMs": {
"type": "integer",
"nullable": true,
"minimum": 0,
"description": "Maximum time in milliseconds to buffer messages before flushing to the database"
},
"batchSize": {
"type": "integer",
"nullable": true,
"minimum": 0,
"description": "Size of batch for bulk insert when releasing the buffer. Defaults to bufferSize"
},
"queueMaxSize": {
"type": "integer",
"nullable": true,
"minimum": -1,
"description": "Maximum number of messages in retry queue"
},
"maxRetries": {
"type": "integer",
"nullable": true,
"minimum": 0,
"description": "Maximum number of retries for failed inserts due to lost connection"
},
"retryDelayMs": {
"type": "integer",
"nullable": true,
"minimum": 0,
"description": "Delay in milliseconds before retrying inserts failed due to lost connection"
}
}
},
"connection": {
"type": "object",
"description": "DB connection specific configuration",
"properties": {
"driver": {
"type": "string",
"enum": ["mysql", "postgres", "timescaledb", "alloydb", "mariadb", "cockroachdb", "redshift", "oracle", "mssql"],
"nullable": false,
"description": "Name of the DB driver to use, e.g. \"mysql\", \"postgres\", \"mssql\" etc."
},
"version": {
"type": "string",
"nullable": true,
"description": "Used to specify DB name for old postgres databases mainly"
},
"hostname": {
"type": "string",
"nullable": true,
"description": "Hostname or IP of the DB server to connect to"
},
"port": {
"type": "integer",
"nullable": true,
"description": "Port of the DB server to connect to"
},
"database": {
"type": "string",
"nullable": true,
"description": "Name of the database to connect to"
},
"filename": {
"type": "string",
"nullable": true,
"description": "Path to dbfile in case of sqlite only. If sqlite is used, then port, host, credentials, and database fields are not used"
},
"credentials": {
"type": "object",
"description": "Credentials of the database user used for connection to the database",
"properties": {
"username": {
"type": "string",
"description": "Username of the database user to use for connection"
},
"password": {
"type": "string",
"description": "Password of the database user to use for connection"
}
},
"nullable": true,
"required": ["username", "password"]
},
"ssl": {
"type": "object",
"nullable": true,
"description": "SSL configuration for the connection",
"properties": {
"ca": {
"type": "string",
"nullable": true,
"description": "CA certificate file to use for the connection"
},
"key": {
"type": "string",
"nullable": true,
"description": "Private key file to use for the connection"
},
"cert": {
"type": "string",
"nullable": true,
"description": "Certificate file to use for the connection"
},
"caPath": {
"type": "string",
"nullable": true,
"description": "CA certificate path to use for the connection"
},
"keyPath": {
"type": "string",
"nullable": true,
"description": "Private key path to use for the connection"
},
"certPath": {
"type": "string",
"nullable": true,
"description": "Certificate path to use for the connection"
},
"rejectUnauthorized": {
"type": "boolean",
"nullable": true,
"description": "Whether to reject unauthorized connections or not"
}
}
},
"debug": {
"type": "boolean",
"nullable": true,
"description": "Whether to enable debug query logging for connection"
},
"driverDebug": {
"type": "boolean",
"nullable": true,
"description": "Whether to enable db-driver-level debug information for queries. Note that it does not use Mosquitto logger, so the formatting of the output is different. Also not all drivers support this option"
},
"lazyConnect": {
"type": "boolean",
"nullable": true,
"default": false,
"description": "Whether to allow the plugin to continue if the connection to the database cannot be established"
}
},
"nullable": false,
"required": ["driver"]
},
"topicMappings": {
"type": "array",
"description": "List of topic mapping objects to forward MQTT messages to the database",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"nullable": false,
"description": "Unique identifier of this mapping"
},
"schemaMapping": {
"type": "string",
"nullable": false,
"description": "Name of the schema mapping which should be applied here"
},
"target": {
"type": "string",
"description": "Database table to forward MQTT messages to"
},
"options": {
"type": "object",
"nullable": true,
"properties": {
"schema": {
"type": "string",
"nullable": true,
"description": "Name of the schema where the table is located in"
}
},
"description": "Optional options for the table"
},
"mqttTopics": {
"type": "array",
"items": {
"type": "string",
"description": "List of MQTT topics to forward messages from",
"pattern": "^((\\+?|[\\w-]+)((\\/[\\w-]+)?(\\/\\+)?(\\/[\\w-]+)?((?<!\\+)\\/)?)*(\\/#)?|(\\/)?#)$"
}
}
},
"required": ["name", "target", "mqttTopics"]
}
}
},
"required": ["name", "connection", "topicMappings"]
}
},
"schemaMappings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"nullable": false,
"description": "Unique identifier of the mapping"
},
"mapping": {
"type": "array",
"description": "List of mappings to apply to the table. Each schema contains a mapping of database table columns to MQTT message properties",
"items": {
"type": "object",
"description": "Properties to select from the MQTT message and insert into the database table",
"properties": {
"source": {
"oneOf": [
{ "type": "string" },
{ "type": "number" },
{ "type": "null", "nullable": true }
],
"description": "JSON path to the value to insert into the column using square brackets, e.g. \"[foo][bar]\" AND NOT \"$.foo.bar\""
},
"target": {
"type": "string",
"description": "Name of the column to insert data from \"source\" into"
},
"type": {
"type": "string",
"nullable": true,
"description": "Data type of the value to cast source to before inserting into the column"
},
"options": {
"type": "object",
"nullable": true,
"description": "Optional options for the column"
}
},
"required": ["source", "target"]
}
}
},
"required": ["name", "mapping"]
}
}
},
"required": ["connections", "schemaMappings"]
}