Skip to main content
Version: Next

Streams

Premium

The Cedalo Stream Processing (CSP) plugin allows the creation of topic streams in the Mosquitto broker. A topic stream receives all messages on a specified topic and can then perform the following tasks:

  • Republish messages on a different topic, with optional QoS and retain control
  • Persist messages to disk, with the ability to replay messages in the future
  • Process JSON payload to extract particular data values prior to republishing
  • Apply aggregation functions to data prior to republishing/persisting
  • Select which messages are processed/persisted based on data values in the payload

Controlling the plugin is carried out with an MQTT topic based API. The Management Center for Mosquitto web tool provides a convenient front end to access the API, the mosquitto_ctrl tool provides command line access to the API, or custom tools can be written to control it.

The streams plugin can be controlled via the Mosquitto Management Center

Stream processing

The stream processing feature allows messages with JSON payloads to be modified before they are republished and/or persisted to disk. This is managed with a user defined query that has similar concepts to a SQL statement.

API description

The CSP plugin is controlled over the $CONTROL/stream-processing/v1 topic, with replies published to $CONTROL/stream-processing/v1/response. You should ensure that your access control solution denies access to these topics for unauthorized users.

Commands are sent as JSON payloads, as described in the command sections below. The examples in each section show only a single command in the command array, but multiple commands can be sent at once and will be processed in order.

Commands will generate a response on the response topic. Unless otherwise specified, this will consist of the command string, the optional correlationData string, and an error string. If the command completed successfully, the error string will not be present. Where a command returns data in the response there will be a data object with contents as described in the individual command sections.

Response payload:

{
"responses":[
{
"command": "getStreamMessageCount",
"correlationData": "3c079967-5bee-4409-aa9f-a963180cde94",
"error": "Persistence not configured for this stream"
}
]
}

Modifying the payload with select statements

When processing is enabled the JSON payload can be manipulated using select statements. If no select statements are defined, then the payload will be left unmodified. If select statements are defined, then only the parts of the message that are defined in the select will be added to the outgoing payload, and only messages that match the select specification will be republished or persisted.

The following examples demonstrate how to use define the select queries. Only the select part of the command payload is shown, see the createStream command for the full command payload requirement.

The example incoming payload is:

 {
"Machine Data": {
"Speed": 7.5,
"Angle Deg": 55.3,
"Angle Rad": 0.97,
"Power": 601.6
},
"Location": "Freiburg"
}

A basic select picks a JSON object from the source payload and optionally places it in the target payload. To access items in the JSON object each element of the object hierarchy is described by the object name in square brackets. For example, to select the Speed element the source item would be set to [Machine Data][Speed]. The same technique is used when choosing the target location. It is perfectly valid to have the source and target paths be the same.

Basic payload manipulation

This example extracts the Speed and makes it a top level object, keeps the Location and discards all other payload entries.

"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Speed]"
},{
"source": "[Location]",
"target": "[Location]
}
]

With the example payload this would result in the following output payload:

{
"Speed": 7.5,
"Location": "Freiburg"
}
note

$ - topics can not be used with the streams plugin

Select Aliases

When defining a select statement an alias can be created. This is a text string, that must not begin or end with a square bracket, that can be used is a where statement instead of the full JSON path.

"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Speed]"
},{
"source": "[Location]",
"target": "[Location],
"alias": "location"
}
]

Aggregate functions

Simple time aggregations can be defined for select statements for calculations over a time bucket. This includes the min, max, sum, count, and time_bucket functions. When a function is defined, the current values for each select statement is stored, and when a message is received, the payload is used to calculate the new value depending on the function in use. A new message will be republished/persisted for every message that is received, and will contain the current values. The current time bucket can be added to the target payload for identifying which bucket a message is in.

The time_bucket function defines the parameters of the time bucket to be used. It has the parameters intervalsize and intervaloffset, which are both in seconds. intervalsize defines the length of the time bucket and must be greater than 0. intervaloffset defines the start offset of the time bucket. So to create a 60 second time interval that starts at 15 seconds into the minute you would use:

"select": [
{
"source": "", # source is ignored for the time_bucket function
"target": "", # set to a JSON path to output to the payload, or blank to ignore
"function": {
"name": "time_bucket",
"intervalsize": 60,
"intervaloffset": 15
}
}
]

Only one time_bucket is allowed per query. The other functions are:

  • max : the maximum value in the time bucket
  • min : the minimum value in the time bucket
  • sum : the sum of all values in the time bucket
  • count : the count of messages received in the time bucket. This function does not require a source.

A more complete example:

"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Data][SpeedMax]",
"alias" : "speedmax",
"function": {
"name" : "max"
}
},
{
"source": "[Machine Data][Power]",
"target": "[Data][PowerSum]",
"alias" : "powersum",
"function": {
"name" : "sum"
}

},
{
"target": "[Data][Count]",
"alias" : "IntervalCount",
"function": {
"name" : "count"
}

},
{
"source": "*",
"target": "",
"alias" : "timeInterval",
"function": {
"name" : "time_bucket",
"intervalsize": 60,
"intervaloffset": 30
}
},
{
"source": "[Location]",
"target": "[Location]",
"alias" : "Location"
}
]

This would produce the output:

{
"Data" {
"SpeedMax": 10,
"PowerSum": 123,
"Count": 3
},
"Location": "Freiburg"
}

The processing feature has some internal sources that can be used:

  • timestamp : the current Unix timestamp in seconds
  • windowstart : the start of the current time bucket, if enabled
  • windowend : the end of the current time bucket, if enabled

For example:

"select": [
{
"source": "timestamp",
"target": "[Data][Now]",
}
]

Choosing messages are used with where statements

It is possible to restrict which messages are processed and persisted by using a where statement. The

Stream persistence and replay

The stream persistence feature allows all messages received on a stream to be saved to disk and then replayed to a new topic at a later point in time. Persistence can be turned on or off at any point for any stream, if the feature is available.

When used on a stream that has processing configured, the processed payload will be stored not the original payload.

Stream persistence is not available for 32-bit platforms.

Replaying

Replaying a stream means republishing its messages on a different topic. A single replay may be running for each stream at once.

To replay a stream, use the replayStream command. The only requirement for is to specify the destination topic. Further control is possible using the gte, lte, limit, speed, and reverse parameters.

If only the replaytopic parameter is provided, all of the messages stored for the stream will be republished in order, with the first message published immediately and subsequent messages published at approximately the same interval as the original messages.

The range and number of messages to be replayed can be limited using gte, lte, and limit. gte and lte are the lower and upper bounds of the time range that should be replayed, respectively. They are both Unix timestamps in seconds. Both are optional, so specifying a range "all messages up to this point" or "all messages since this point" are both possible, as well as "only messages within this range". The limit parameter limits the total number of messages that will be replayed, and can be used in conjunction with gte and/or lte, or on its own. The default value of -1 means there is no numerical limit.

The speed of the replay can be controlled using the speed parameter. Set to the string "original" to keep the original speed, or to "fastest" to be republished at the fastest rate possible. Alternatively, set to a number to specify an exact speed multiple, e.g. setting to 2.5 would replay the stream 2.5x faster than the original.

Stopping an in-progress replay for a stream can be done using the stopStreamReplay command.

Message count

It is possible to obtain the estimated number of message currently persisted using the getStreamMessageCount command.

Clearing messages

Persisted messages can be manually cleared using the clearStreamMessages API command. This completely removes the messages from disk and can not be undone.

All messages can be cleared, or a range of messages can be cleared based on the message timestamp in unix timestamp seconds format. Use the gte parameter to specify the lower bound of the timestamp that will be cleared, and the lte parameter to specify the upper bound of the range to be cleared.

See the API description for details of gte and lte.

Time to Live (TTL)

When creating a new stream and enabling persistence, the Time to Live (TTL) property will be used, which allows the disk usage of streams to be limited. Setting TTL to 0 means that all messages received will be kept on disk forever, unless they are manually cleared. Setting TTL to a positive integer means that messages will be removed from the persistence store in the future. TTL is measured in seconds, and guarantees that messages that are younger than that number of seconds will be kept in the database. Messages older than the TTL interval are not immediately removed from the database: they can remain in the database until at most twice the TTL interval before they are removed, depending on when they were received.

clearStreamMessages

Clear persisted messages within a defined range.

Parameters:

  • streamname : the name of the stream to be cleared.
  • gte : optional lower bound of the range to be cleared, i.e. all messages more recent than this will be cleared. If not present or set to "", then no lower bound will be used. Unix timestamp in seconds.
  • lte : optional upper bound of the range to be cleared, i.e. all messages older than this will be cleared. If not present or set to "", then no upper bound will be used. Unix timestamp in seconds.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Non-code examples:

# Clear all messages that are more recent than 1620131088.
clearStreamMessages gte=1620131088

# Clear all messages that are older than 1620131088.
clearStreamMessages lte=1620131088

# Clear all messages that are more recent than 1620044688 and are also older
# than 1620131088.
clearStreamMessages gte=1620044688 lte=1620131088

Command payload:

{
"commands": [
{
"command": "clearStreamMessages",
"streamname": "<name of stream>",
"gte": <optional lower bound as unix timestamp number>,
"lte": <optional upper bound as unix timestamp number>,
"correlationData": ""
}
]
}

Create Stream and Modify Stream

Create a new stream, or modify an existing stream.

When modifying a stream, only parameters that are present in the JSON payload are updated.

Parameters:

  • streamname : the name of the stream to be created/modified.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.
  • sourcetopic : the MQTT topic that will be used as the source of messages.
  • targettopic : optional MQTT topic where stream messages will be republished.
  • targetqos : optional QoS that will be used when republishing messages. If set to -1 then the original QoS will be used.
  • ttl : optional Time to Live in seconds that will be used when persisting messages.
  • active : boolean, if set to true the stream will be enabled and operational. If set to false, none of the stream features will be used.
  • process : boolean, if set to true the stream processing feature will be enabled. If set to false, stream processing will be disabled and payloads will be republished without modification.
  • persist : boolean, if set to true the stream persistence feature will be enabled. If set to false, stream persistence will be disabled and messages will not be persisted to disk.
  • textname : optional user string to name the stream.
  • textdescription : optional user string to describe the stream.
  • query : optional object used with the processing feature, can contain a select and/or a where array.
  • select : optional query array
  • where : optional query array. At the moment this array can only contain a single object with an operator, left, and right members. The left string is a JSON path field or alias. The right item can only be a value and can be a string, number, or boolean. The operator is a string which can be one of ==, !=, >, <, >=, or <=.

Command payload:

{
"commands": [
{
"command": "createStream",
"streamname": "<name>",
"correlationData": "",
"sourcetopic": "",
"targettopic": "",
"targetqos": -1|0|1|2,
"ttl": 86400,
"active": true|false,
"process": true|false,
"persist": true|false,
"textname": "",
"textdescription": "",
"query": {
"select": [
{
"source": "",
"target": "",
"alias": "",
"function": {
"name": ""
}
}
],
"where": [
{
"operator": "==",
"left": "",
"right: ""
}
]

}
}
]
}

Delete Stream

Delete a stream. This cannot be undone. If the stream is currently carrying out a replay, the replay will be stopped.

  • streamname : the name of the stream to be deleted.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
"commands": [
{
"command": "deleteStream",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}

Disable Stream

Disable an existing stream. Enabling / setting active here means that the stream will accept messages and process, persist, and republish them as it is configured. If a stream is disabled / set inactive then it will not process, persist, or republish messages.

  • streamname : the name of the stream to be disabled.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
"commands": [
{
"command": "disableStream",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}

Enable Stream

Enable an existing stream. Enabling / setting active here means that the stream will accept messages and process, persist, and republish them as it is configured. If a stream is disabled / set inactive then it will not process, persist, or republish messages.

  • streamname : the name of the stream to be enabled.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
"commands": [
{
"command": "enableStream",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}

getLicense

Retrieve the current licensed features.

Command payload:

{
"commands": [
{
"command": "getLicense",
"correlationData": ""
}
]
}

Response payload:

{
"responses":[
{
"command": "getLicense",
"correlationData": "",
"data": {
"issuedBy": "Cedalo AG",
"issuedTo": "name@customer.com",
"comment": "Comment",
"serial": "00000000-0000-0000-0000-000000000000",
"features": [
{
"name": "stream-count",
"version": "1.0",
"validSince": 1610219382000,
"validUntil": 1610305782000,
"count": -1
},
{
"name": "stream-processing",
"version": "1.0",
"validSince": 1610219382000,
"validUntil": 1610305782000
},
{
"name": "stream-persistence",
"version": "1.0",
"validSince": 1610219382000,
"validUntil": 1610305782000
}
]
}
}
]
}

getStream

Get information on a stream.

Parameters:

  • streamname : the name of the stream to be queried.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
"commands": [
{
"command": "getStream",
"streamname": "<name>",
"correlationData": ""
}
]
}

Response payload:

{
"responses":[
{
"command": "getStream",
"correlationData": "",
"data": {
"streamname": "<name>",
"sourcetopic": "",
"targettopic": "",
"textname": "",
"textdescription": "",
"targetqos": -1,
"ttl": 86400,
"active": true|false,
"process": true|false,
"persist": true|false,
"replaying": true|false,
"query": {
# Query spec
}
}
}
]
}

getStreamMessageCount

Retrieve the approximate count of messages that are currently persisted.

Parameters:

  • streamname : the name of the stream to be queried.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
"commands": [
{
"command": "getStreamMessageCount",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}

Response payload:

{
"responses":[
{
"command": "getStreamMessageCount",
"correlationData": "",
"data": {
"streamname": "<name of stream>",
"count": <count>
}
}
]
}

listStreams

List all configured streams.

Parameters:

  • verbose : boolean, if set to false a simple list of stream names will be returned. If set to true, the list of streams will be returned as objects in the same format as when using the getStream command.
  • count : optional maximum number of streams to return.
  • offset : optional offset to start the stream list.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
"commands": [
{
"command": "listStreams",
"streamname": "<name>",
"verbose": true|false,
"count": <number>,
"offset": <number>
"correlationData": ""
}
]
}

Response payload:

{
"responses":[
{
"command": "listStreams",
"correlationData": "",
"data": {
"streamname": "<name>",
"sourcetopic": "",
"targettopic": "",
"textname": "",
"textdescription": "",
"targetqos": -1|0|1|2,
"ttl": 86400,
"active": true|false,
"process": true|false,
"persist": true|false,
"replaying": true|false,
"query": {
# Query spec
}
}
}
]
}

Replay Stream

Replay a persisted stream to a different topic.

Parameters:

  • streamname : the name of the stream to be replayed.
  • replaytopic : The topic where the messages will be published.
  • gte : optional lower bound of the range to be cleared, i.e. all messages more recent than this will be replayed. If not present or set to "", then no lower bound will be used. Unix timestamp in seconds.
  • lte : optional upper bound of the range to be cleared, i.e. all messages older than this will be replayed. If not present or set to "", then no upper bound will be used. Unix timestamp in seconds.
  • limit : optionally limit the number of messages replayed. This number represents a maximum value and may not be reached if there are not enough messages persisted. A value of -1, the default, means that no numerical limit will be applied.
  • speed : optionally change the speed at which the messages are replayed. This can be one of the strings "original" or "fastest", or a number indicating the factor. For example, a speed of 2.5 means replay at 2.5x the original speed.
  • reverse : optionally set to true to play the messages in reverse order.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
"commands": [
{
"command": "replayStream",
"streamname": "<name of stream>",
"replaytopic": "<replay destination topic>",
"gte": <optional lower bound as unix timestamp number>,
"lte": <optional upper bound as unix timestamp number>,
"limit": <optional message limit, number>,
"speed": <optional replay speed, number>,
"reverse": <optional reverse playback mode, boolean>,
"correlationData": ""
}
]
}

Stop StreamReplay

Stop a stream replay, if it is running.

Parameters:

  • streamname : the name of the stream to be stopped.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
"commands": [
{
"command": "stopStreamReplay",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}

Plugin activation

To activate the plugin, make sure your configuration file at /mosquitto/config/mosquitto.conf is using the following lines:

plugin /usr/lib/cedalo_stream_processing.so
plugin_opt_data_dir /mosquitto/data/csp

After the start of Mosquitto the logs should show your license information:

CSP: Cedalo Streaming Plugin initializing. CSP: License serial: decc1940-a806-11eb-ac1e-9d31c1d948ef CSP: License issued by: Cedalo AG CSP: License issued to: info@cedalo.com CSP: License comment: This is a demo license. CSP: Licensed stream count: 5 AVAILABLE until 2022-04-28T10:48:22 CSP: Processing AVAILABLE until 2022-04-28T10:48:22 CSP: Persistence AVAILABLE until 2022-04-28T10:48:22

note

Changing the configuration is only necessary, if you manually add the Streams Plugin in a later stage.