Databricks Bridge
This plugin is configurable via the Cedalo MQTT platform UI, which simplifies the process of updating config values without the need to restart the broker. Additionally, config updates are synchronized accross cluster nodes, reducing the administrative overhead in clustered environments. Find more information about this here.
Introduction
The Databricks Bridge plugin can be used to forward data published to the Mosquitto broker to the Databricks compute resources (clusters or SQL warehouses). The plugin can handle multiple Databricks connections. For each connection, a mapping between certain MQTT topics and SQL-table columns can be defined (see example configuration).
Databricks Bridge supports both personal access token authentication and OAuth machine-to-machine (M2M) authentication.
The plugin manages incoming MQTT messages in a buffer to perform batching and improve performance when writing to the Databricks resources. The size properties of the buffer can be controlled via the configuration in the options section, with bufferSize and timeoutMs (refer to config format).
Plugin activation
To enable the Databricks Bridge plugin in the broker, add the following to the mosquitto.conf file:
plugin /usr/lib/cedalo_databricks_bridge.so
persistence_location /mosquitto/data
The example configuration snippet above applies to the docker container setup. For installations that do not run in a container, the above configuration needs to be adjusted accordingly (namely the location of the cedalo_databricks_bridge.so dynamic library and persistence_location may differ. Refer to the respective documentation pages for different installation, e.g. RPM-package).
persistence_location is used as the search path for the plugin's config file in the context of a single node broker. The file is be called Databricks-Bridge.json. To apply configuration changes, you can make changes to this file and restart the broker but it's recommended to use Cedalo Platform UI or a respective control topic instead (see the section below). In the context of an HA cluster, configuration is fully managed by the Cedalo Platform and the respective control topic: modification via changes to the file on the filesystem is not supported.
In addition to modifying mosquitto.conf, ensure that you have the appropriate license to use the plugin.
Control topic
The plugin can be controlled using the $CONTROL/cedalo/databricks-bridge/v1 control topic. The matching response topic is $CONTROL/cedalo/databricks-bridge/v1/response.
The commands that are currently supported are:
updateConfig- updates the bridge's configuration.getConfig- fetches the configuration currently used by the bridge.getStatus- fetches status information of the bridge.getSchema- fetches schema information about the JSON schema of the configuration (as defined by IETF standard).
The examples of the control request are presented below, however, more detailed information including response formats can be found in the MQTT API documentation.
To update bridge's configuration a message in the following format should be sent to the control topic:
{
    "commands": [
        {
            "command": "updateConfig",
            "correlationData": "1234",
            "configChange": <Config JSON object>
        }
    ]
}
For example:
{
    "commands": [
        {
            "command": "updateConfig",
            "correlationData": "1234",
            "configChange": {
                "version": 1,
                "connections": [
                    {
                        "name": "databricks-connection-1",
                        "connection": {
                            "hostname": "databricks.test.com",
                            "path": "/warehouse",
                            "credentials": {
                                "token": "*****"
                            },
                            "catalog": "target_catalog"
                        },
                        "options": {
                            "bufferSize": 10000,
                            "batchSize": 1000,
                            "timeoutMs": 4000,
                            "queueMaxSize": 10000,
                            "maxRetries": 15,
                            "retryDelayMs": 1000
                        },
                        "topicMappings": [
                            {
                                "name": "topic-mapping-to-databricks-1",
                                "schemaMapping": "schema-mapping-1",
                                "target": "target_table",
                                "options": {
                                    "schema": "target_schema"
                                },
                                "mqttTopics": ["db/qa/#", "db2/dev/databricks"]
                            }
                        ]
                    }
                ],
                "schemaMappings": [
                    {
                        "name": "schema-mapping-1",
                        "mapping": [
                            {
                                "source": "[topic]",
                                "target": "topic_in"
                            },
                            {
                                "source": "[payload][label]",
                                "target": "label"
                            },
                            {
                                "source": "plant1",
                                "target": "building_id",
                                "options": { "isConst": true }
                            },
                            {
                                "source": "[client_id]",
                                "target": "client_id"
                            },
                            {
                                "source": "[payload][temperature][avg]",
                                "target": "temperature",
                                "type": "number"
                            },
                            {
                                "source": "[timestamp]",
                                "target": "dt"
                            }
                        ]
                    }
                ]
            }
        }
    ]
}
Remember the correlation data should be a unique random string. This is used to correlate requests to the matching responses arriving at the response topic. If you are performing a few manual requests "by hand" you may omit it. However, for broker management applications and scripts it is highly recommended to make use of this field to be able to properly identify and map responses to the respective requests later.
Example of the getConfig command:
{
    "commands": [
        {
            "command": "getConfig",
            "correlationData": "1234"
        }
    ]
}
Example of getStatus:
{
    "commands": [
        {
            "command": "getStatus",
            "correlationData": "1234"
        }
    ]
}
This command will return data about the availability of each connection, the number of buffers and queues, all the dropped as well as currently queued messages. As the plugin creates a queue and buffer per target Databricks table, the information about queues and buffers will be presented as an array of numbers with each element corresponding to a separate queue/buffer.
Example of getSchema:
{
    "commands": [
        {
            "command": "getSchema",
            "correlationData": "1234"
        }
    ]
}
Config format
The configuration of the plugin is a JSON object consisting of an array of connection objects as well as an array of so-called "schema mappings". Each connection object defines its own Databricks connection and MQTT topics to listen to. It also references a mapping of the MQTT message properties to the Databricks table columns. Such mappings are called schema mappings and are described in a different array within the same file under the schemaMappings field. This is to allow multiple Databricks connections to reuse the same mappings and avoid repetitiveness.
The fields of the configuration object are described below. To see an example of the config file refer to the configuration example section. Refer to the configuration schema
The following fields of the config are mandatory:
connections(path:$.connections):
An array of Databricks connection objects. Apart from database connection settings, each connection object also contains MQTT topic mapping information (type:array).name(path:$.connections[].name):
Unique identifier of the Databricks Bridge connection object. A convenience name for the database connection used in the plugin's logs (type:string).connection(path:$.connections[].connection):
Object holding the actual database connection information (type:object).path(path:$.connections[].connection.driver):
HTTP Path of your Databricks cluster or SQL warehouse (type:string).hostname(path:$.connections[].connection.hostname):
Server Hostname of your Databricks cluster or SQL warehouse (type:string).catalog(path:$.connections[].connection.database):
Name of the Databricks catalog to connect to (type:string).credentials(path:$.connections[].connection.credentials):
Object holding authentication information. (type:object)- For OAuth machine-to-machine (M2M) authentication (recommended):
clientId(path:$.connections[].connection.credentials.clientId):
Service principal's UUID or Application ID. See Databricks docs (type:string)clientSecret(path:$.connections[].connection.credentials.clientSecret):
Secret value for the service principal's OAuth secret. See Databricks docs (type:string)
 - For personal-access token authentication:
token(path:$.connections[].connection.credentials.token):
Databricks personal access token. See Databricks docs (type:string)
 
- For OAuth machine-to-machine (M2M) authentication (recommended):
 
topicMappings(path:$.connections[].topicMappings):
List of mappings specifying a mapping between MQTT topics and Databricks tables to forward data to (type:arrayof typeobject).name(path:$.connections[].topicMappings.name):
Unique identifier of the current topic mapping (type:string).target(path:$.connections[].topicMappings.target):
Name of the target table in Databricks into which MQTT data is going to be inserted (type:string).mqttTopics(path:$.connections[].topicMappings.mqttTopics):
An array of MQTT topics that should be forwarded to Databricks. MQTT wildcards (+and#) can also be used (type:arrayof typestring).schemaMapping(path:$.connections[].topicMappings.schemaMapping):
Name of a particular schema mapping from$.schemaMappingsarray to be used as a mapping betweentargetmeasurement andmqttTopics.
schemaMappings(path:$.schemaMappings):
An array of schema mappings. Each element of the array specifies a mapping between MQTT message properties and Databricks table columns (type:arrayof typeobject).
Optional fields:
connections(path:$.connections).connection(path:$.connections[].connection).credentials(path:$.connections[].connection.credentials):
Object holding authentication information. (type:object).azureTenantId(path:$.connections[].connection.credentials): Provide custom Azure tenant ID (type:string).
debug(path:$.connections[].connection.debug):
Whether to log debug information about Databricks connection (type:bool, defaults to:false).driverDebug(path:$.connections[].connection.driverDebug):
Whether to log Databricks' driver-specific debug information. Notice that driver-specific logs don't follow the normal msosquitto log format, e.g. they do not have timestamp, plugin, and connection name in front of each record. By default only error messages comming from teh Databricks driver itself will be logged (type:bool, defaults to:false).lazyConnect(path:$.connections[].connection.lazyConnect):
Whether to allow the plugin to continue if the initial connection to the database cannot be established and instead try to connect again right before inserting data records (type:bool, defaults to:false).poolOptions(path:$.connections[].connection.poolOptions):
Options to configure internal connection pool used to open sessions to the Databricks (type:object).maxSize(path:$.connections[].connection.poolOptions.maxSize):
Max pool size. Must be greater than minSize (type:integer, defaults to:10).minSize(path:$.connections[].connection.poolOptions.minSize):
Min pool size, can be 0 if you want to free all sessions when by an eviction routine after they are considered idle. Must be smaller than maxSize (type:integer, defaults to:0).testOnBorrow(path:$.connections[].connection.poolOptions.testInBorrow):
Whether to run empty select queries to validate session health right after the session acquisition (type:boolean, defaults to:true).acquireTimeoutMs(path:$.connections[].connection.poolOptions.acquireTimeoutMs):
Timeout for sessions acquisition in milliseconds (type:integer, defaults to:10000).idleTimeoutMs(path:$.connections[].connection.poolOptions.idleTimeoutMs):
Interval in milliseconds after which a session is considered idle (type:integer, defaults to:15000).evictionRunIntervalMs(path:$.connections[].connection.poolOptions.evictionRunIntervalMs):
Interval in milliseconds after which evictor routine is executed. Evictor routine closes idle sessions (type:integer, defaults to:20000).
topicMappings(path:$.connections[].topicMappings):
List of mappings specifying a mapping between MQTT topics and Databricks tables to forward data to (type:arrayof typeobject).options(path:$.connections[].topicMappings.options):
Additional options for the mqtt-topic-to-target-table mapping (type:object).schema(path:$.connections[].topicMappings.options.schema):
Schema where the target Databricks table is located. If not specified,defaultschema will be used (type:string).
options(path:$.connections[].options):
Object holding additional plugin settings (type:object).bufferSize(path:$.connections[].options.bufferSize):
Defines the size of the buffer, which is used to batch multiple MQTT messages together before inserting them into Databricks. For more information, see the section on buffering (type:integer, defaults to:1000).timeoutMs(path:$.connections[].options.timeoutMs):
Interval in milliseconds after which buffer is forcefully released if it is not empty. In other words, it specifies a maximum latency of the buffer release (type:integer, defaults to:5000).queueMaxSize(path:$.connections[].options.queueMaxSize):
Defines the max size of a retry queue used in case a database insert fails due to connection problems. If set to-1, the size of the queue is not limited. For more information see the section on retry queue (type:integer, defaults to:-1).maxRetries(path:$.connections[].options.maxRetries):
Defines a number of retries to be performed by a retry queue before dropping the message. For more information, see the section on retry queue (type:integer, defaults to:10).retryDelayMs(path:$.connections[].options.retryDelayMs):
Defines a delay in milliseconds between the retry attempts of the queue. For more information, see the section on retry queue (type:integer, defaults to:1000).
schemaMappings(path:$.schemaMappings):
An array of schema mappings. Each element of the array specifies a mapping between MQTT message properties and Databricks fields and tags (type:arrayof typeobject).name(path:$.schemaMappings[].name):
Unique identifier of the current schema mapping. This name is matched against$.connections[].topicMappings.schemaMappingto determine which schema mapping to use for particular measurement (type:string).mapping(path:$.schemaMappings[].mapping):
An array holding source-to-target mapper object with information about which properties or metadata to pick out from your MQTT messages as well as which Databricks fields or tags those properties should be inserted into (type:arrayof typeobject).source(path:$.schemaMappings[].mapping[].source):
Source of the data to be inserted. This can be a property of the MQTT message payload (in case the payload is a JSON object) or some metadata property (e.g. client_id, timestamp, etc). This field follows a particular unpacking syntax. For more information, see below (type:string).target(path:$.schemaMappings[].mapping[].target):
A column of the Databrick's target table into which data from thesourcefield will be inserted (type:string).type(path:$.schemaMappings[].mapping[].type):
Optional type casting of thesourcefield before it gets inserted into Databricks. See more information about typecasting here (type:string, one of:datetime,variant,number,integer,boolean,string,raw).options(path:$.schemaMappings[].mapping[].options):
Object holding information about additional transformations to be performed onsourcebefore it is inserted into Databricks.isConst(path:$.schemaMappings[].mapping[].options.isConst):
Used to indicate that the field insourceis a constant that should be inserted into Databricks as is without trying to unpack it. This is relevant in case thesourcefield is a string that follows the unpacking syntax but should be treated as a constant. Otherwise (i.e. for constants that don't follow the unpacking syntax), this field is not required and can therefore be omitted or only added for clarity and additional context (type:bool).replace(path:$.schemaMappings[].mapping[].options.replace):
Specifies an array of two strings. Replaces all occurrences of the first string with the second one before inserting the data into the database (type:arrayconsisting of two elements of typestring).truncate(path:$.schemaMappings[].mapping[].options.truncate):
Specifies the length to which thesourceis to be reduced (type:integer).replaceNullWith(path:$.schemaMappings[].mapping[].options.replaceNullWith):
Specifies a value that null values in thesourcewill be replaced with (type:any).replaceUndefinedWith(path:$.schemaMappings[].mapping[].options.replaceUndefinedWith):
In case that a property specified insourceis missing on the MQTT message, it will be replaced with the value specified in this option (type:any).nullValue(path:$.schemaMappings[].mapping[].options.nullValue):
If the value specified in this field is encountered insource, it will be replaced withnullbefore inserting into Databricks (type:any).defaultValue(path:$.schemaMappings[].mapping[].options.defaultValue):
If the value specified in this field is encountered insource, it will be replaced with the default specified on the target table when inserting into Databricks (type:any).format(path:$.schemaMappings[].mapping[].options.format):
Used when casting into datetime datatype (i.e. strings representing date and time). Specifies a datetime format string that values from source are formatted as. See format string documentation for more information on available format strings. Defaults toyyyy-MM-ddTHH:mm:ss.sssZ(with an optional timezone information). However, by default if an integer value representing a unix timestamp in milliseconds is encountered in source, it will also be accepted and appropriately converted to the datetime datatype (type:string).dbFormat(path:$.schemaMappings[].mapping[].options.dbFormat):
Used when casting into datetime datatype. Specifies a format string that datetime values should be converted to before they are inserted into the table (type:string).encoding(path:$.schemaMAppings[].mapping[].options.encoding):
Used when casting intorawdatatype (i.e.typeneeds to be set toraw). Specifies encoding of thesourcestring. Notice that sending binary data to the database is currently only supported via encoded strings (type:string, one of:hex,base64,base64url,utf8,utf16le,ascii,latin1,ucs2).replaceDelimiter(path:$.schemaMappings[].mapping[].options.replaceDelimiter):
Used when casting into datetime datatype. The format of the datetime values inserted into the database will typically be an ISO format as in the following example: 2017-01-29T14:07:00. Databricks can already understand and parse this format but if you would like to change the delimiter to something else like a space, you can still do it with this option. For more complex changes use thedbFormatfield (type:string).splitBy(path:$.schemaMappings[].mapping[].options.splitBy):
Used to split a string fromsourceinto an array by a certain delimiter. Can be combined withjoinBy,fetchByIndex, orfetchByRangeor used on its own (type:string)joinBy(path:$.schemaMappings[].mapping[].options.joinBy):
Used to merge an array fromsourceinto a string inserting the specified delimiter between each pair of array's elements (type:string)fetchByIndex(path:$.schemaMappings[].mapping[].options.fetchByIndex):
Used to fetch an element of an array fromsource, which is specified by a (zero-based) index (type:integer)fetchByRange(path:$.schemaMappings[].mapping[].options.fetchByRange):
Used to fetch a range of elements from the array taken fromsource. Can be combined withsplitByorjoinBy. When used withjoinBy, the specified array slice will be joined using the character fromjoinBy. When used withsplitBya string fromsourceis first split into an array and then the specified range is extracted from it. Then, ifjoinByis not set, the delimiter fromsplitBywill be used to join the extracted array back into a string (type:arrayconsisting of two elements of type:integer)
Mapping MQTT packets to Databricks
The plugin configuration specifies the mapping between MQTT packets and columns of Databricks SQL-tables in the schemaMappings field. Elements of the schemaMappings array must have unique names that will be matched against the schemaMapping field of the respective topicMappings in the connection object.
mapping specifies an array of extractor objects (you may also see them being referred to as selectors in other documentation on bridges) between sources and targets. The source field represents the source of the data. It comes from the MQTT message or its metadata. The target field is the name of the column that source will be inserted into.
The source field can be a constant, a path to a particular metadata property of an MQTT message, or a data property within its payload. The latter is relevant for JSON MQTT payloads: in case the MQTT payload is a JSON, the path to some data property of such payload can be specified as a source.
Below are some examples with clarifications to put the above-mentioned into context:
It is possible to unpack MQTT payload in case it is a JSON and extract (pick out) certain properties from it. This can be achieved by specifying a JSON path to such property in the source field. However, the properties in this path should be separated not by dots, but rather by square brackets (e.g.: not payload.property1.innerProperty but [payload][property1][innerProperty]).
Note that currently the only supported characters for the JSON payload properties are
[a-zA-Z0-9_-](alphanumeric characters as well as_(underscore) and-(dash))
Example:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "[payload][temperature]",
            "target": "temp"
        }
    ],
    ...
}
In the above example, the temperature property from the payload is selected to be inserted into the column called temp of the Databricks' target table.
This means that if the MQTT payload looks like below:
{
    "temperature": 21.5,
    "humidity": 45,
    "status": "normal"
}
then the value 21.5 will be inserted into the temp column.
Another example:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "[payload][sensor][id]",
            "target": "sensor_id"
        }
    ],
    ...
}
The extractor object above expects an MQTT message payload to be in JSON format. It extracts a property id from the sensor object in this payload and inserts (maps) it into sensor_id column in the target table.
Apart from extracting data from payloads it's also possible to insert constant values. Here is another example which maps a constant value instead of the one taken from an MQTT message:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "sensor-reading",
            "target": "sensor_type",
            "options": {
                "isConst": true
            }
        }
    ],
    ...
}
This extractor maps a constant string ("sensor-reading") to the sensorType column of the target table. This means that for every MQTT message for which this mapping is invoked, "sensor-reading" value will be inserted into the sensor_type column.
To insert the entire MQTT payload (useful when the payload is a string), the extractor would look similar to the following:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "[payload]",
            "target": "status",
        }
    ],
    ...
}
Note that apart from [payload], various special metadata properties of the MQTT message may be selected. Here is the entire list of special (meta)data properties:
hostname: Hostname of the broker. Note that theHOSTNAMEenvironment variable must be specified for the broker, or the default value will be used (type:string, defaults to:<Unknown>).topic: Name of the topic to which the MQTT message was published (type:string).payload: Payload of the MQTT message. Can be used to further unpack (pick out) child properties of the payload object if the latter is a JSON. See below for more information. If the payload was originally a string or a number, it will be treated as such, and no typecasting is necessary (type:any).qos: QoS level of the MQTT message (type:integer, one of:0,1,2).retain- Retain flag of the MQTT message (type:bool).timestamp- Unix timestamp representing the time when the MQTT message arrived at the broker (type:integer).datetime- Datetime string representing the time when the MQTT message arrived at the broker (type:stringrepresenting the date in ISO format).uuid- unique identifier (UUID) of the message. Note that it is a convenience property. This ID is internally generated by the Databricks Bridge plugin and is not coming from the original message's metadata (type:string).client_id- client_id of the device that published the MQTT message (type:string).
To select one of the above-mentioned properties, it must be enclosed in square brackets ([]) and specified in the source field of the configuration file.
For example, to use the client ID of the device that published the message, the following selector can be defined:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "[client_id]",
            "target": "deviceId",
        }
    ],
    ...
}
Data type casting
Databricks Bridge provides a feature to typecast properties of the MQTT messages before they are inserted into the respective table columns.
In order to cast a certain property to a certain type, the following configuration must be specified:
{
    ...
    "name": "<your mapping name here>",
    "mapping": [
        {
            "source": "<your source mqtt message property>",
            "target": "<your target databricks table column name>",
            "type": "<specify type to cast 'source' into>"
        }
    ],
    ...
}
For example:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "[payload][temperature_str]",
            "target": "temperature",
            "type": "float"
        }
    ],
    ...
}
In Databricks bridge the following typecasting is provided and makes sense:
- For fields:
datetime: casts integers or strings into a datetime (format,dbFormatandreplaceDelimitermight need to be specified. See config options section for details). There is no need to typecast datetime strings if they follow ISO 8601 format (note that the timezone portion is not required here since many databases don't handle timezones).number: casts strings into either a float or integer (supports float numbers separated by both dots or commas (e.g. both1.5and1,5are handled)).integer: casts string into an integer truncating the fractional part.boolean: casts strings or numbers into a boolean value (note that string"false"is cast to thefalseboolean value, as is number0,null, empty string, or a missing property. Everything else is cast totrue).string: can cast numbers into strings. There is typically no need to typecast JSON objects to strings as this is done automatically.raw: facilitates inserting binary data into Databricks as encoded stringsvariant: facilitates support for Databrick's variant datatype
 
Note that while it might be tempting to explicitly specify typecasting for all the
sources, this is not recommended as typecasting is often associated with additional overhead. The rule of thumb would be to cast only what is necessary.
Variant datatype support
Databricks bridge supports inserting sources of VARIANT datatype, which is able to accomodate complex datatypes such as arrays or JSON objects as well primitives such as numbers or strings. To safely insert a record into a VARIANT column, set variant as the value in the type field of the respective extractor. For example:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "[payload][my_object]",
            "target": "my_object",
            "type": "variant"
        }
    ],
    ...
}
Extractor options
Each extractor under the mapping section of the configuration file can have additional options that define transformations to be performed on the source data before it is inserted into target. For example:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "[payload][property1]",
            "target": "column_a",
            "options": {
                "replaceNullWith": "<null>"
            }
        }
    ],
    ...
}
The above specifies an extractor that maps property1 of the MQTT message to the column_a in the target Databricks table. Once the null value is encountered in the property1 property of the MQTT message's payload, it will be converted to "<null>" string before being inserted into the database.
Note that some options can only be used with certain (or when typecasting to certain) datatypes, while others can be used with any type.
Options which can only be used when typecasting to datetime:
formatdbFormatreplaceDelimiter
Example:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "[payload][property1]",
            "target": "column_a",
            "type": "datetime",
            "options": {
                "nullValue": "<null>",
                "format": "yyyy-MM-dd HH:mm:ss",
                "dbFormat": "dd-MMM-yy hh:mm:ss",
                "replaceDelimiter": " "
            }
        }
    ],
    ...
}
Options which can be used as long as the underlying value is a string:
replacetruncatesplitBy
Example:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "[payload][property1]",
            "target": "column_a",
            "options": {
                "replace": ["hello world", "hello SQL"]
            }
        }
    ],
    ...
}
Options which can only be used with arrays of after the source was split to an array using the splitBy option:
joinByfetchByIndexfetchByRange
Example:
{
    ...
    "name": "mapping1",
    "mapping": [
        {
            "source": "[payload][property1]",
            "target": "column_a",
            "options": {
                "splitBy": "/", "fetchByRange": [0, 3]
            }
        }
    ],
    ...
}
Options which can be used regardless of types and typecasting:
replaceNullWithreplaceUndefinedWithisConstnullValuedefaultValue
See configuration options for more information on the listed options (their descriptions, datatypes, allowed values).
Configuration example
An example of the configuration with two database connections is shown below:
{
    "connections": [
        {
            "name": "conn1",
            "connection": {
                "hostname": "abc.cloud.databricks.example",
                "httpPath": "/sql/1.0/warehouses/123",
                "catalog": "my_catalog",
                "credentials": {
                    "token": "<myToken>"
                },
                "debug": true,
                "driverDebug": false
            },
            "options": {
                "bufferSize": 5000,
                "timeoutMs": 3000,
                "queueMaxSize": 10000,
                "maxRetries": 15,
                "retryDelayMs": 1000
            },
            "topicMappings": [
                {
                    "name": "map1",
                    "schemaMapping": "schema1",
                    "target": "node_data",
                    "mqttTopics": ["namespace/+/node/#"],
                    "options": { "schema": "test" }
                }
            ]
        },
        {
            "name": "conn2",
            "connection": {
                "hostname": "cba.cloud.databricks.example",
                "httpPath": "/sql/1.0/warehouses/321",
                "catalog": "my_catalog",
                "credentials": {
                    "clientId": "<myClientId>",
                    "clientSecret": "<mySecret>"
                }
            },
            "options": {
                "bufferSize": 1000,
                "timeoutMs": 4000,
                "queueMaxSize": 5000,
                "maxRetries": 10,
                "retryDelayMs": 2000
            },
            "topicMappings": [
                {
                    "name": "map1",
                    "schemaMapping": "schema1",
                    "target": "cluster_data",
                    "mqttTopics": ["namespace/+/cluster/#"],
                    "options": { "schema": "test" }
                }
            ]
        }
    ],
    "schemaMappings": [
        {
            "name": "schema1",
            "mapping": [
                { "source": "[timestamp]", "target": "insert_dt" },
                { "source": "[client_id]", "target": "device_id" },
                {
                    "source": "[payload][dt]",
                    "type": "datetime",
                    "options": {
                        "format": "DD-MM-YYYY HH:mm:ss"
                    },
                    "target": "dt"
                },
                {
                    "source": "[topic]",
                    "target": "id",
                    "options": { "splitBy": "/", "fetchByIndex": 1 }
                },
                { "source": "[payload][label]", "target": "label" },
                {
                    "source": "[payload][payload]",
                    "target": "payload",
                    "type": "variant"
                },
                { "source": "[payload][temperature]", "target": "temperature" },
                {
                    "source": "[payload][bin]",
                    "target": "bin",
                    "type": "raw",
                    "options": { "encoding": "base64" }
                }
            ]
        }
    ]
}
As per the example config above, Databricks bridge will connect to two instances: at abc.cloud.databricks.example and another instance at cba.cloud.databricks.example. It will use a database called my_catalog for both connections.
The optional options field is specified and overrides some defaults of the plugin's buffering. For the first connection, it sets bufferSize to 5000 and timeoutMs to 3000 milliseconds, while for the second connection, it sets bufferSize to 1000 and timeoutMs to 4000 milliseconds. This illustrates that you can set different queue options per database connection, making it adjustable to different traffic loads.
Also see section about buffering for more information.
The retry queue parameters are in this case also configured differently depending on the database connection. See section about retry queue for more information.
Notice that the first connection uses OAuth machine-to-machine (M2M) authentication, while the second one makes use of personal token authentication. Note, however, that OAuth M2M flow is recommended over the personal token.
In the topicMappings section, we define which MQTT topics should be forwarded to which Databricks tables. The first connection maps topics matching namespace/+/node/# to the node_data measurement, while the second connection maps topics matching namespace/+/node/# to the table called cluster_data.
The schemaMappings section defines how the MQTT message data should be mapped to columns of the database table. In the schema1, we extract:
- The client ID into the column called 
device_id. - MQTT message timestamp into the column called 
insert_dt. - Datetime string in the format 
DD-MM-YYYY HH:mm:ssfrom thedtproperty of the MQTT message paylod into the column calleddt. If the date is in the ISO 8601 format, typecasting todatetimecan be ommited. - Split the MQTT topic by 
/character and insert the second element (index 1) into theidcolumn. labelproperty of the MQTT message into thelablecolumn.payloadproperty inside the MQTT message into thevariant-typed column calledpayload.temperatureproperty of the MQTT message into the column calledtemperature.- Base64 encoded string from the 
binproperty of the MQTT message into thebinary-typed column calledbin. 
Error handling
Any configuration or license errors will prevent the plugin from being loaded, and respective messages will appear in stderr and stdout.
If recoverable errors occur during the operation of the plugin, it will generate respective error messages in its stderr and prefix them with an ERR: prefix. This includes errors encountered during config validation, connection, and data insertion.
Note that while error messages generated by the plugin will be preceded by
ERR:, warning messages will get aWARN:prefix, while debug messages - a respectiveDEBUG:.
Debugging
To print detailed information about queries being performed against the Databricks database to the stdout as well as other information, define DEBUG and DEBUG_DATABRICKS_BRIDGE environment variables on the broker. However, never set this variable in production as it will reveal log parametrised database queries with respective parameter values into logs.
You may also receive additional output by setting debug and driverDebug properties under $.connections[].connection.
Buffering
Buffers are created per target table in every Databricks connection. The size of the buffer is controlled with bufferSize field under options ($.connections[].options) and indicates the capacity of the buffer. The default size of the buffer is 1000. Buffer holds records representing unpacked MQTT messages (or line protocol strings if this is the configured format of your MQTT messages) which are to be inserted to the respective measurement once the buffer is saturated.
Once the buffer fills up, its flushing is triggered, and a batch write is issued against the measurement, inserting the buffer contents. The condition for buffer flush is bufferSize + 1. Once this number of records is reached, the buffer is released.
Another parameter timeoutMs - indicates the time interval (in milliseconds) after which all the non-empty buffers will be forcefully released triggering the batch writes of the buffers' contents. The default value is 5000 ms.
However, even though it might seem that buffer is only released after saturation or a timeout, it also has a mechanism to account for infrequent MQTT messages and decrease the latency of database inserts for such records.
The idea is that the time interval between consecutive MQTT messages is kept track of. If this interval is long enough, the record will be inserted instantly. However, if the interval between the messages is short, the plugin will keep buffering the messages anticipating a possible influx. The definition of "long enough" might change when updates to the plugin are made, but currently, it's a fraction of the timeoutMs.
Note that this leads to the first MQTT message never being buffered.
Also, notice that buffer options can be defined per database connection, allowing for more flexibility and fine-tuning when needed.
Retry queue
The Databricks bridge plugin provides a retry mechanism for failed writes to ensure data is not lost in case of temporary connection issues. To prevent data loss, Databricks bridge has a retry queue that is responsible for retrying message writes in case of connection problems.
Each retry queue can be configured with the following settings (under options field of the configuration ($.connections[].options)):
queueMaxSize option is used to set the maximum amount of messages that the queue is able to accommodate before it starts to drop them. This limit can be set to infinity using the value -1, which is the default. Note that if this value is not set to -1, then careful considerations and testing should be performed to ensure that the queue will not overfill too quickly.
maxRetries is used to specify the maximum number of retry attempts the queue will make before dropping a message. The default is 10.
retryDelayMs is a minimum amount of time in milliseconds that must pass between the retry queue attempts. The default is 1000 ms (1 second).
Note that the elements in a retry queue can be both single MQTT messages as well as entire buffers of MQTT messages that are ready to be batch-written into Databricks.
An important point is that retries happen only in case of connection errors. In case of invalid data the error is logged out but no retries are performed. If a batch of messages is being inserted and only some of these messages are invalid, then the plugin will try to reinsert the records one by one skipping invalid ones.
JSON schema
Databricks bridge plugin uses the following JSON schema for its configuration:
{
    "title": "Databricks-Bridge Plugin Config",
    "type": "object",
    "properties": {
        "version": {
            "type": ["string", "integer"],
            "nullable": true,
            "description": "Version of the configuration file"
        },
        "connections": {
            "type": "array",
            "description": "List of sub-configurations per Databricks connection",
            "items": {
                "type": "object",
                "description": "Configuration per Databricks connection",
                "properties": {
                    "name": {
                        "type": "string",
                        "nullable": false,
                        "description": "Unique textual identifier of the configuration"
                    },
                    "connection": {
                        "type": "object",
                        "description": "Databricks connection specific configuration",
                        "properties": {
                            "hostname": {
                                "type": "string",
                                "nullable": false,
                                "description": "Hostname or IP of the Databricks server to connect to"
                            },
                            "httpPath": {
                                "type": "string",
                                "nullable": false,
                                "description": "Http path of the Databricks server"
                            },
                            "catalog": {
                                "type": "string",
                                "nullable": false,
                                "description": ""
                            },
                            "credentials": {
                                "type": "object",
                                "nullable": false,
                                "description": "Credentials to authenticate to the Databricks instance. Supports both personal access token (token property) as well as machine-to-machine (M2M) OAuth authentication flow (clientId, clientSecret property)",
                                "properties": {
                                    "token": {
                                        "type": "string",
                                        "nullable": true,
                                        "description": "Authentication token for the databricks personal access token authentication flow"
                                    },
                                    "clientId": {
                                        "type": "string",
                                        "nullable": true,
                                        "description": "Client id (service principal's UUID or Application ID) used for the machine-to-machine OAuth authentication flow"
                                    },
                                    "clientSecret": {
                                        "type": "string",
                                        "nullable": true,
                                        "description": "Client secret (value of the service prinaipal's OAuth secret) used for the machine-to-machine OAuth authentication flow"
                                    },
                                    "useDatabricksOAuthInAzure": {
                                        "type": "boolean",
                                        "nullable": true,
                                        "description": "Whether the Databricks OAuth authentication flow is used in Azure"
                                    },
                                    "azureTenantId": {
                                        "type": "string",
                                        "nullable": true,
                                        "description": "Tenant ID in Azure"
                                    }
                                }
                            },
                            "debug": {
                                "type": "boolean",
                                "nullable": true,
                                "description": "Whether to enable debug query logging for connection and driver output"
                            },
                            "driverDebug": {
                                "type": "boolean",
                                "nullable": true,
                                "description": "Whether to enable Databricks driver-specific debug information. This information will be logged into the console and will differ in format from the broker's logs"
                            },
                            "poolOptions": {
                                "type": "object",
                                "nullable": true,
                                "description": "A set of options to configure an internal Databricks connection pool",
                                "properties": {
                                    "maxSize": {
                                        "type": "integer",
                                        "nullable": true,
                                        "minimum": 1,
                                        "default": 10,
                                        "description": "Max pool size, i.e. the maximum number of open sessions allowed. Must be greater than minSize"
                                    },
                                    "minSize": {
                                        "type": "integer",
                                        "nullable": true,
                                        "minimum": 0,
                                        "default": 0,
                                        "description": "Min pool size, i.e. the minimum number of open sessions allowed. Setting to 0 allows the pool to clean all it's resources up when idle. Must be smaller than maxSize"
                                    },
                                    "testOnBorrow": {
                                        "type": "boolean",
                                        "nullable": true,
                                        "default": true,
                                        "description": "Whether to test the health of the newly acquired pool connection by running a dummy SQL query. If this query fails the connection is deemed unhealthy and discarded, connection acquisition is then repeated"
                                    },
                                    "acquireTimeoutMs": {
                                        "type": "integer",
                                        "nullable": true,
                                        "default": 10000,
                                        "description": "A millisecond timeout on waiting for the Databricks connection creation"
                                    },
                                    "idleTimeoutMs": {
                                        "type": "integer",
                                        "nullable": true,
                                        "default": 15000,
                                        "description": "A timespan in milliseconds before the acquired/created connection is deemed idle. Idle connections are elligible for pool eviction once evictor runs"
                                    },
                                    "evictionRunIntervalMs": {
                                        "type": "integer",
                                        "nullable": true,
                                        "default": 20000,
                                        "description": "A timespan in milliseconds before pool's resource evictor runs. When evictor runs all idle connections are destroyed"
                                    }
                                }
                            }
                        },
                        "nullable": false,
                        "required": ["hostname", "httpPath", "catalog", "credentials"]
                    },
                    "options": {
                        "type": "object",
                        "nullable": true,
                        "description": "Plugin options for configuring internal queues and buffers",
                        "properties": {
                            "bufferSize": {
                                "type": "integer",
                                "nullable": true,
                                "minimum": 1,
                                "description": "Maximum number of messages to buffer before flushing to the database"
                            },
                            "timeoutMs": {
                                "type": "integer",
                                "nullable": true,
                                "minimum": 0,
                                "description": "Maximum time in milliseconds to buffer messages before flushing to the database"
                            },
                            "queueMaxSize": {
                                "type": "integer",
                                "nullable": true,
                                "minimum": -1,
                                "description": "Maximum number of messages in retry queue"
                            },
                            "maxRetries": {
                                "type": "integer",
                                "nullable": true,
                                "minimum": 0,
                                "description": "Maximum number of retries for failed inserts due to lost connection"
                            },
                            "retryDelayMs": {
                                "type": "integer",
                                "nullable": true,
                                "minimum": 0,
                                "description": "Delay in milliseconds before retrying inserts failed due to lost connection"
                            }
                        }
                    },
                    "topicMappings": {
                        "type": "array",
                        "description": "List of topic mapping objects specifying which MQTT topics are to be forwarded to a specific Databricks table",
                        "items": {
                            "type": "object",
                            "properties": {
                                "name": {
                                    "type": "string",
                                    "nullable": false,
                                    "description": "Unique identifier of this mapping"
                                },
                                "schemaMapping": {
                                    "type": "string",
                                    "nullable": false,
                                    "description": "Name of the schema mapping which should be applied here"
                                },
                                "target": {
                                    "type": "string",
                                    "description": "Databricks table to forward MQTT messages to"
                                },
                                "options": {
                                    "type": "object",
                                    "nullable": false,
                                    "description": "Additional options",
                                    "properties": {
                                        "schema": {
                                            "type": "string",
                                            "nullable": false,
                                            "description": "Name of the database schema where the target table lives"
                                        }
                                    },
                                    "required": ["schema"]
                                },
                                "mqttTopics": {
                                    "type": "array",
                                    "items": {
                                        "type": "string",
                                        "description": "List of MQTT topics to forward messages from"
                                    }
                                }
                            },
                            "required": ["name", "target", "options", "mqttTopics"]
                        }
                    }
                },
                "required": ["name", "connection", "topicMappings"]
            }
        },
        "schemaMappings": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "nullable": false,
                        "description": "Unique identifier of the mapping"
                    },
                    "mapping": {
                        "type": "array",
                        "description": "List of mappings to apply to Databricks. Each schema contains a mapping of Databricks table columns to MQTT message properties",
                        "items": {
                            "type": "object",
                            "description": "Properties to select from the MQTT message and insert into the Databricks table",
                            "properties": {
                                "source": {
                                    "oneOf": [
                                        { "type": "string" },
                                        { "type": "boolean" },
                                        { "type": "number" },
                                        { "type": "null", "nullable": true }
                                    ],
                                    "description": "JSON path to the value to insert or constant value"
                                },
                                "target": {
                                    "type": "string",
                                    "description": "Name of the field or tag to insert data from \"source\" into"
                                },
                                "type": {
                                    "type": "string",
                                    "nullable": true,
                                    "description": "Data type of the value to cast source to before inserting"
                                },
                                "options": {
                                    "type": "object",
                                    "nullable": true,
                                    "description": "Options",
                                    "properties": {
                                        "isConst": {
                                            "type": "boolean",
                                            "nullable": true,
                                            "description": "Whether the source is a constant value"
                                        }
                                    },
                                    "additionalProperties": true
                                }
                            },
                            "required": ["source", "target"]
                        }
                    }
                },
                "required": ["name", "mapping"]
            }
        }
    },
    "required": ["connections", "schemaMappings"]
}