Version: Next

Google Pub/Sub Bridge

Premium
info

Coming soon: Configuring this plugin will soon be easier than ever before. Find more information about this here.

The Google Pub/Sub Plugin acts as a bridge between the Mosquitto broker and the Google Pub/Sub service, facilitating the exchange of messages between MQTT and Google Pub/Sub. This interoperability supports various messaging patterns, including 1:1, 1:n, and n:1, as dictated by the designated topic mappings. For instance, a message that arrives at a Mosquitto broker can be relayed to one or multiple topics within the Google Pub/Sub service. Similarly, a Mosquitto broker can fetch messages from one or more Google Pub/Sub topics and distribute them to one or several MQTT topics. For further details, refer to the configuration format and example.

info

The plugin cannot configure the Google Pub/Sub service itself. This has to be done beforehand.

info

The push feature is currently not supported, and the pull feature is experimental and therefore subject to change!

Plugin state

The push feature is currently not supported, and the pull feature is in an experimental state and therefore subject to change!

Plugin activation and configuration

To enable the Google Pub/Sub plugin in the broker, add the following to the mosquitto.conf file:

plugin /usr/lib/cedalo_google_pubsub.so

Configuration is done via two JSON files. One is a key file, which contains the Pub/Sub project and the credentials to use; this file is provided by Google. The other file configures the plugin itself (see next section). The file names can be chosen freely, but their paths are resolved relative to the persistence_location property. The paths to both files are specified in mosquitto.conf using the following plugin options:

plugin_opt_key_file path/to/pubsub-key.json
plugin_opt_config_file path/to/pubsub-config.json

Note: the names of the files do not matter, but because both paths are resolved relative to the persistence_location property, this property must be specified as well, for example:

persistence_location /mosquitto/data
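
The relationship between persistence_location and the two plugin options can be checked before the broker is started. The following is a minimal sketch, assuming that "resolved relative to" means a plain path join; the plugin's exact resolution logic is not described on this page, and the helper name resolve_plugin_file is hypothetical.

```python
import posixpath


def resolve_plugin_file(persistence_location: str, option_value: str) -> str:
    """Join a plugin_opt_* path onto persistence_location (assumed behaviour)."""
    # Assumption: a plain join followed by normalisation. The plugin's real
    # resolution logic is not documented here.
    return posixpath.normpath(posixpath.join(persistence_location, option_value))


# Values taken from the mosquitto.conf snippets above.
key_file = resolve_plugin_file("/mosquitto/data", "path/to/pubsub-key.json")
config_file = resolve_plugin_file("/mosquitto/data", "path/to/pubsub-config.json")
print(key_file)
print(config_file)
```

Under this assumption, both files would be expected below /mosquitto/data.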

Config file format

The configuration is based on the Cedalo plugin config guidelines. A connection is defined for each Google Pub/Sub feature, namely publish, pull and (in a future version) push. The core setting for each connection is the topic mapping defined by the topicMappings property. For publish, it defines how messages are forwarded from the Mosquitto broker to Google Pub/Sub; for pull and push, it defines the opposite direction, i.e. how messages received from Google Pub/Sub are forwarded via the Mosquitto broker.

Current connection options:

  • publish: Settings for the publish feature.
    • topicMappings: The topic mapping which defines how messages received by Mosquitto broker are forwarded to Google PubSub topics (type: array).
      • mqttTopics: MQTT topics to subscribe to (type: array).
      • target: Google PubSub topic to publish (type: string).
      • targets: Several Google PubSub topics to publish (type: array).
    • options: Currently contains only settings for the internal message queue, which is used to retry publishing failed messages (Optional, type: object).
      • maxQueuedMessages: Maximum total number of queued messages. Once this limit is reached, further received messages are dropped. (Optional, defaults to 100)
      • retryDelayMs: Delay in milliseconds to wait before retrying to publish queued messages. (Optional, defaults to 10000ms)
      • maxRetries: Maximum number of attempts before retrying is stopped. Use -1 for unlimited retries. (Optional, defaults to -1)
  • pull: EXPERIMENTAL - Settings for the pull feature.
    • topicMappings: The topic mapping which defines how messages received from Google PubSub are forwarded to MQTT topics (type: array).
      • mqttTopics: MQTT topics to publish to (type: array).
      • source: Google PubSub topic to subscribe to (type: string).
      • sources: Several Google PubSub topics to subscribe to (type: array).
    • options:
      • pullTriggerTopic: The MQTT topic used to trigger message pull (type: string).
      • pullDurationMs: The duration in milliseconds for how long messages should be pulled from Google PubSub. (Optional, defaults to 30000ms)
      • qos: The quality of service level to use for MQTT publish (type: integer, one of 0, 1, 2). (Optional, defaults to 0)
  • push: Currently not supported. (type: object).
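
The optional settings and their defaults listed above can be summarized in a few lines of code. The following is a minimal sketch that fills in the documented defaults for a connection; the helper name effective_options is hypothetical and this is not how the plugin itself is implemented.

```python
import copy

# Defaults as stated in the option descriptions above.
DEFAULT_OPTIONS = {
    "publish": {"maxQueuedMessages": 100, "retryDelayMs": 10000, "maxRetries": -1},
    "pull": {"pullDurationMs": 30000, "qos": 0},
}


def effective_options(connection: dict) -> dict:
    """Return the connection's options with the documented defaults filled in."""
    merged = copy.deepcopy(DEFAULT_OPTIONS.get(connection["name"], {}))
    # Explicitly configured values override the defaults.
    merged.update(connection.get("options", {}))
    return merged


pull = {"name": "pull", "options": {"pullTriggerTopic": "cedalo/test/pull"}}
print(effective_options(pull))
```

Note that pullTriggerTopic has no documented default, so it only appears in the result when it is set explicitly.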

For a more technical overview refer to the format schema at the bottom of this page as well.

The following is an example of a pubsub-config.json:

{
  "connections": [
    {
      "name": "publish",
      "topicMappings": [
        {
          "mqttTopics": ["cedalo/test"],
          "targets": ["test_topic", "cedalo_test_topic"]
        }
      ],
      "options": {
        "maxQueuedMessages": 5,
        "retryDelayMs": 10000,
        "maxRetries": 5
      }
    },
    {
      "name": "pull",
      "topicMappings": [
        {
          "mqttTopics": ["cedalo/app"],
          "source": "cedalo_test_topic"
        }
      ],
      "options": {
        "pullDurationMs": 10000,
        "pullTriggerTopic": "cedalo/test/pull"
      }
    }
  ]
}

In this example, two connections are defined: a publish connection and a pull connection. The publish connection defines a 1:n topic mapping, i.e. each message that is sent to the Mosquitto broker on topic cedalo/test is published to two Google Pub/Sub topics, namely test_topic and cedalo_test_topic. The publish options limit the message queue to 5 failed messages and specify that the queue retries sending failed messages every 10 seconds, at most 5 times. The second connection, pull, defines a 1:1 topic mapping and a pull duration of 10 seconds. That means that once the broker has started to pull messages, each message received on the Google Pub/Sub topic cedalo_test_topic within the following 10 seconds is published by the Mosquitto broker on topic cedalo/app. Pulling is started by sending an empty message to the broker on the specified trigger topic cedalo/test/pull.
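
To make the 1:n and 1:1 mappings of the example concrete, the following sketch expands a connection's topicMappings into individual routes. It only illustrates how the mapping can be read; the function name expand_routes is hypothetical and the plugin's internal handling may differ.

```python
def expand_routes(connection: dict) -> list:
    """Return (from_topic, to_topic) pairs implied by a connection's topicMappings."""
    is_publish = connection["name"] == "publish"
    single, plural = ("target", "targets") if is_publish else ("source", "sources")
    routes = []
    for mapping in connection["topicMappings"]:
        # Each mapping carries either the singular key or the plural one.
        pubsub_topics = mapping[plural] if plural in mapping else [mapping[single]]
        for mqtt_topic in mapping["mqttTopics"]:
            for pubsub_topic in pubsub_topics:
                if is_publish:
                    routes.append((mqtt_topic, pubsub_topic))  # MQTT -> Pub/Sub
                else:
                    routes.append((pubsub_topic, mqtt_topic))  # Pub/Sub -> MQTT
    return routes


# The two connections from the example configuration (options omitted).
publish = {
    "name": "publish",
    "topicMappings": [
        {"mqttTopics": ["cedalo/test"], "targets": ["test_topic", "cedalo_test_topic"]}
    ],
}
pull = {
    "name": "pull",
    "topicMappings": [{"mqttTopics": ["cedalo/app"], "source": "cedalo_test_topic"}],
}

for connection in (publish, pull):
    for origin, destination in expand_routes(connection):
        print(f"{connection['name']}: {origin} -> {destination}")
```

The publish connection yields two routes starting at cedalo/test, while the pull connection yields a single route from cedalo_test_topic to cedalo/app.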

info

The pull feature is still experimental and subject to change!
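
The pull trigger used in the example, an empty message on cedalo/test/pull, could be sent with the mosquitto_pub command-line client. The sketch below only builds the command; the host localhost and port 1883 are placeholder assumptions, and the helper name pull_trigger_command is hypothetical.

```python
import shlex


def pull_trigger_command(trigger_topic: str, host: str = "localhost", port: int = 1883) -> list:
    """Build a mosquitto_pub call that sends an empty message to the trigger topic."""
    # -n sends a null (zero-length) message. Host and port are placeholders;
    # add credentials or TLS options as your broker requires.
    return ["mosquitto_pub", "-h", host, "-p", str(port), "-t", trigger_topic, "-n"]


command = pull_trigger_command("cedalo/test/pull")
print(shlex.join(command))
# To actually send the trigger (requires mosquitto_pub and a running broker):
#   import subprocess
#   subprocess.run(command, check=True)
```

Any MQTT client that can publish a zero-length payload to the trigger topic should work equally well.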

JSON Schema

Following are the connection schemata for publish and pull. These schemata adjust the overall base plugin schema as defined in the Cedalo plugin guidelines.

  • publish connection:
{
  "name": { "type": "string", "const": "publish" },
  "options": {
    "type": "object",
    "properties": {
      "maxQueuedMessages": {
        "type": "integer",
        "description": "The max. number of queued messages. Use -1 for unlimited queue",
        "minimum": 0,
        "default": 100
      },
      "retryDelayMs": {
        "type": "number",
        "description": "Delay in milliseconds to wait before retrying to send queued messages.",
        "minimum": 1000,
        "default": 10000
      },
      "maxRetries": {
        "type": "integer",
        "description": "Max. number of attempts before retry is stopped. Use -1 for unlimited retries.",
        "default": -1
      }
    },
    "required": [],
    "additionalProperties": false
  },
  "topicMappings": {
    "description": "Topic mapping",
    "type": "array",
    "items": {
      "type": "object",
      "properties": {
        "mqttTopics": {
          "description": "MQTT Topics which the plugin should gather messages to forwarded them to the target (e.g. Topic/Database/URI)",
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "target": {
          "description": " 'Describing the target (message sink), which can be a topic, DB table, URI, ..., depending on the use case. Note: either target or targets should be specified but not both",
          "type": "string"
        },
        "targets": {
          "description": " 'Describing the targets (message sinks), which can be topics, like DB table, URI, ..., depending on the use case. Note: either target or targets should be specified but not both",
          "type": "array",
          "items": {
            "type": "string"
          }
        }
      },
      "oneOf": [{ "required": ["target"] }, { "required": ["targets"] }],
      "required": ["mqttTopics"]
    }
  }
}
  • pull connection:
{
  "name": {
    "type": "string",
    "const": "pull"
  },
  "options": {
    "type": "object",
    "properties": {
      "pullDurationMs": {
        "type": "number",
        "description": "Specifies in milliseconds for how long incoming messages should be pulled.",
        "minimum": 1000,
        "default": 30000
      },
      "pullTriggerTopic": {
        "type": "string",
        "description": "The mqtt topic which triggers the pubsub pull.",
        "minLength": 1
      },
      "qos": {
        "type": "integer",
        "enum": [0, 1, 2],
        "description": "The quality of service value to use for publishing received messages to mqtt topic.",
        "default": 0
      }
    },
    "required": [],
    "additionalProperties": false
  },
  "topicMappings": {
    "items": {
      "type": "object",
      "properties": {
        "mqttTopics": {
          "description": "MQTT Topics which the plugin should gather messages to forwarded them to the target (e.g. Topic/Database/URI)",
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "source": {
          "description": "Describing the source (message source), which can be a topic. Note: either source or sources should be specified but not both",
          "type": "string"
        },
        "sources": {
          "description": "Describing the sources (message sources), which can be topics. Note: either source or sources should be specified but not both",
          "type": "array",
          "items": {
            "type": "string"
          }
        }
      },
      "oneOf": [{ "required": ["source"] }, { "required": ["sources"] }],
      "required": ["mqttTopics"]
    }
  }
}
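
The topicMappings rules shared by both schemata (mqttTopics is required, and exactly one of the singular or plural key must be present) can be checked by hand before deploying a configuration. The following is a minimal sketch covering only these rules; it is not a replacement for a full JSON Schema validator, and the function name check_mapping is hypothetical.

```python
def is_string_list(value) -> bool:
    """True if value is a list whose items are all strings."""
    return isinstance(value, list) and all(isinstance(item, str) for item in value)


def check_mapping(mapping: dict, single: str, plural: str) -> list:
    """Return a list of problems for one topicMappings entry (empty if none)."""
    problems = []
    # "required": ["mqttTopics"], typed as an array of strings.
    if not is_string_list(mapping.get("mqttTopics")):
        problems.append("mqttTopics must be an array of strings")
    # "oneOf": exactly one of the singular and the plural key must be present.
    present = [key for key in (single, plural) if key in mapping]
    if len(present) != 1:
        problems.append(f"exactly one of '{single}' or '{plural}' is required")
    elif present[0] == single and not isinstance(mapping[single], str):
        problems.append(f"{single} must be a string")
    elif present[0] == plural and not is_string_list(mapping[plural]):
        problems.append(f"{plural} must be an array of strings")
    return problems


# publish mappings use target/targets, pull mappings use source/sources.
ok = {"mqttTopics": ["cedalo/test"], "targets": ["test_topic", "cedalo_test_topic"]}
both = {"mqttTopics": ["cedalo/app"], "source": "a", "sources": ["a", "b"]}
print(check_mapping(ok, "target", "targets"))
print(check_mapping(both, "source", "sources"))
```

The first mapping passes, while the second is rejected because it specifies both source and sources.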