Skip to main content
Version: Next

MongoDB Bridge

Premium

The MongoDB-Bridge plugin can be used to insert data published to the Mosquitto broker into a MongoDB database. The plugin can handle multiple MongoDB client connections. Each connection applies changes to a configured database. Currently, the integrated client supports only basic authentication (username, password). Topic mappings are used to specify which MQTT topic payloads should be inserted into which collection. Each insert can contain the fields payload, jsonPayload, topic, qos, client_id, hostname, retain, timestamp of the received MQTT message. By default, all these fields are included in the database insert command. If needed, Schema mappings are used to filter or rename these fields. The queueSize can be configured per MongoDB client, to specify a limit of not-yet-processed MQTT messages, before the plugin starts to drop them. More information in the Json Schema section.

Plugin Activation

To load and enable the plugin into the broker, the "mosquitto.conf" must be extended by:

plugin /usr/lib/cedalo_mongodb_bridge.so
plugin_opt_config_file mongodb-bridge.json

persistence_location /mosquitto/data

To make use of the hostname parameter make sure to set the environment variable in the docker-compose-yml called hostname as wished.

Config File Format

The config file contains a JSON of the following structure. Available log fields and BSON types used by the default schema or a custom schema are:

  • hostname: The hostname of broker node (type: string). The hostname field is set to the value of the HOSTNAME environment variable, if not set to Unknown .
  • topic: The MQTT topic of the message (type: string).
  • payload: The payload of the message (type: string).
  • jsonPayload: The payload converted to a BSON object (type: object, not in default schema).
  • qos: The quality of service level of the message (type: int, minimum: 0, maximum: 2).
  • retain: Whether or not the message is a retained message (type: bool, true or false).
  • timestamp: The timestamp of when the message was received by the Mosquitto broker (type: date in UTC).
  • client_id: The ID of the MQTT client that published the message (type: string)

An example for the plugin configuration (mongodb-bridge.json) is:

[
{
"name": "connection-to-db1",
"mongodb": {
"hostname": "mongodb",
"port": 27017,
"database": "db1",
"credentials": {
"username": "user1",
"password": "secret123"
},
"queueSize": 100000,
"reconnectMinDelay": 5,
"reconnectMaxDelay": 25000
},
"schemaMappings": [
{
"name": "reduced-mapping",
"schema": {
"data": "payload",
"nodeId": "hostname"
}
}
],
"topicMappings": [
{
"name": "topic-mapping",
"collection": "sensorData",
"schema": "reduced-mapping",
"topics": ["sensor_data/#"]
}
]
}
]

With this example the plugin will create a single client to connect to a MongoDB instance with the URI: mongodb://user1:secret123@mongodb:27017.

All data received on sensor_data/# topics will be published to the collection sensorData of the database db1. This is configured using topic mappings, which define the MQTT topics inserted to MongoDB. Without a topic mapping, no messages will be written to MongoDB. Each topic mapping defines a list of MQTT topic filters, and the MongoDB collection where matching messages will be written.

A topic mapping can also reference a custom schema, or use the default schema. In the example above the data is reduced to:

  • payload of the MQTT message stored in a data column of the collection
  • hostname stored into the nodeId column of the collection

Instead of using the default schema mapping containing all information of the default schema.

info

This is an example configuration snippet, which applies to the docker container setup. For installation not running in a container the above configuration needs to be adjusted accordingly.

plugin_opt_config_file must be a file name, not a path. persistence_location is used as the search path for the plugins' config file.

MongoDB Atlas Bridge

In case the plugin should connect to a MongoDB Atlas cluster or custom options should be used the original connection string can be provided as follows:

[
{
"name": "connection-to-db-cluster",
"mongodb": {
"connectionURI": "mongodb+srv://user1:secret123@mongodb/db_name",
"queueSize": 100000
},
"schemaMappings": [
{
"name": "reduced-mapping",
"schema": {
"data": "payload",
"nodeId": "hostname"
}
}
],
"topicMappings": [
{
"name": "topic-mapping",
"collection": "sensorData",
"schema": "reduced-mapping",
"topics": ["sensor_data/#"]
}
]
}
]

In this example the plugin takes the connection string as a whole. The plugin will automatically resolve the host from the connection string provided as the connectionURI parameter and parse the parameters hostname, port, database, credentials. Those can therefore be omitted in the config. The password in the connection string always has to be URL encoded (percent-encoded). Additionally, standard MongoDB connection strings can also be provided as in the previous snippet.

JSON Schema

Overview over all possible parameter for the mongodb-bridge.json:

{
"type": "array",
"description": "List of sub-configurations per MongoDB connection/database.",
"items": {
"type": "object",
"description": "Sub-configurations per MongoDB connection/database.",
"properties": {
"name": {
"type": "string",
"description": "Textual identifier of this configuration."
},
"mongodb": {
"type": "object",
"description": "MongoDB server specific configurations.",
"properties": {
"connectionURI": {
"type": "string",
"description": "MongoDB connection string that contains protocol, hostname, port, database name, optional credentials and options."
},
"hostname": {
"type": "string",
"description": "Hostname or IP address of the MongoDB server."
},
"port": {
"type": "integer",
"description": "Port the MongoDB server is listening on."
},
"database": {
"type": "string",
"description": "Name of the database, the data should be inserted to."
},
"credentials": {
"type": "object",
"description": "Basic authentication configuration.",
"properties": {
"username": {
"type": "string"
},
"password": {
"type": "string"
}
}
},
"queueSize": {
"type": "integer",
"minimum": 1,
"description": "Specifies the limit of not-yet-processed/inserted MQTT messages, before the plugin starts to drop them."
},
"retryInsertMinDelay": {
"type": "integer",
"minimum": 1,
"description": "Initial delay in milliseconds before the plugin tries to insert a message again, once the server returned an error or was not available. The increment follows a pattern of double growth, where each increase is doubled compared to the previous increment. If not specified, the plugin won't try to resend messages again after the first try."
},
"retryInsertMaxDelay": {
"type": "integer",
"minimum": 1,
"description": "Maximum delay in milliseconds before the plugin tries to insert a message again, once the server returned an error or was not available. If not specified, the plugin won't try to resend messages again after the first try."
}
},
"oneOf": [
{
"required": ["hostname", "port", "database", "queueSize"]
},
{
"required": ["connectionURI", "queueSize"]
}
]
},
"schemaMappings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Textual identifier of this schema mapping."
},
"schema": {
"type": "object",
"description": "Mapping where <key> is the target column name and <value> is the MQTT message information field.",
"patternProperties": {
".*": {
"type": "string",
"enum": ["hostname", "payload", "jsonPayload", "topic", "qos", "client_id", "retain", "timestamp"]
}
},
"additionalProperties": false
}
},
"required": ["name", "schema"]
}
},
"topicMappings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Textual identifier of this topic mapping."
},
"collection": {
"type": "string",
"description": "Name of the collection, the MQTT data should inserted to."
},
"schema": {
"type": "string",
"description": "Name of a schema mapping, which should be applied to this topic mapping. If not specified the default schema mapping including all fields will be used."
},
"topics": {
"type": "array",
"description": "List of topic filters the plugin forwards messages from.",
"items": {
"type": "string"
}
}
},
"required": ["name", "collection", "topics"]
}
}
},
"required": ["name", "mongodb", "schemaMappings", "topicMappings"]
}
}