MongoDb Bridge
The MongoDB-Bridge plugin can be used to insert data published to the Mosquitto broker into a MongoDB database. The plugin can handle multiple MongoDB client connections. Each connection applies changes to a configured database. Currently, the integrated client supports only basic authentication (username, password). Topic mappings are used to specify which MQTT topic payloads should be inserted into which collection. Each insert can contain the fields payload
, topic
, qos
, client_id
, hostname
, retain
, timestamp
of the received MQTT message. By default, all these fields are included in the database insert command. If needed, Schema mappings are used to filter or rename these fields. The queueSize
can be configured per MongoDB client, to specify a limit of not-yet-processed MQTT messages, before the plugin starts to drop them. More information in the Json Schema section.
Plugin Activation
To load and enable the plugin into the broker, the "mosquitto.conf" must be extended by:
plugin /usr/lib/cedalo_mongodb_bridge.so
plugin_opt_config_file mongodb-bridge.json
persistence_location /mosquitto/data
To make use of the hostname parameter make sure to set the environment variable in the docker-compose-yml
called hostname
as wished.
Config File Format
The config file contains a JSON of the following structure. Available log fields and BSON types used by the default schema or a custom schema are:
hostname
: The hostname of broker node (type:string
). The hostname field is set to the value of the HOSTNAME environment variable, if not set toUnknown
.topic
: The MQTT topic of the message (type:string
).payload
: The payload of the message (type:string
).qos
: The quality of service level of the message (type:int
, minimum:0
, maximum:2
).retain
: Whether or not the message is a retained message (type:bool
,true
orfalse
).timestamp
: The timestamp of when the message was received by the Mosquitto broker (type:date
in UTC).client_id
: The ID of the MQTT client that published the message (type:string
)
An example for the plugin configuration (mongodb-bridge.json
) is:
[
{
"name": "connection-to-db1",
"mongodb": {
"hostname": "mongodb",
"port": 27017,
"database": "db1",
"credentials": {
"username": "user1",
"password": "secret123"
},
"queueSize": 100000,
"reconnectMinDelay": 5,
"reconnectMaxDelay": 25000
},
"schemaMappings": [
{
"name": "reduced-mapping",
"schema": {
"data": "payload",
"nodeId": "hostname"
}
}
],
"topicMappings": [
{
"name": "topic-mapping",
"collection": "sensorData",
"schema": "reduced-mapping",
"topics": [
"sensor_data/#"
]
}
]
}
]
With this example the plugin will create a single client to connect to a MongoDB
instance with the URI: mongodb://user1:secret123@mongodb:27017
.
All data received on sensor_data/#
topics will be published to the collection sensorData
of the database db1
. This is configured using topic mappings, which define the MQTT topics inserted to MongoDB. Without a topic mapping, no messages will be written to MongoDB. Each topic mapping defines a list of MQTT topic filters, and the MongoDB collection where matching messages will be written.
A topic mapping can also reference a custom schema, or use the default schema. In the example above the data is reduced to:
payload
of the MQTT message stored in adata
column of the collectionhostname
stored into thenodeId
column of the collection
Instead of using the default schema mapping containing all information of the default schema.
This is an example configuration snipped, which applies to the docker container setup. For installation not running in a container the above configuration needs to be adjusted accordingly.
plugin_opt_config_file
must be a file name, not a path.
persistence_location
is used as the search path for the plugins' config file.
JSON Schema
Overview over all possible parameter for the mongodb-bridge.json
:
{
"type": "array",
"description": "List of sub-configurations per MongoDB connection/database.",
"items": {
"type": "object",
"description": "Sub-configurations per MongoDB connection/database.",
"properties": {
"name": {
"type": "string",
"description": "Textual identifier of this configuration."
},
"mongodb": {
"type": "object",
"description": "MongoDB server specific configurations.",
"properties": {
"hostname": {
"type": "string",
"description": "Hostname or IP address of the MongoDB server."
},
"port": {
"type": "integer",
"description": "Port the MongoDB server is listening on."
},
"database": {
"type": "string",
"description": "Name of the database, the data should be inserted to."
},
"credentials": {
"type": "object",
"description": "Basic authentication configuration.",
"properties": {
"username": {
"type": "string"
},
"password": {
"type": "string"
}
}
},
"queueSize": {
"type": "integer",
"minimum": 1,
"description": "Specifies the limit of not-yet-processed/inserted MQTT messages, before the plugin starts to drop them."
},
"retryInsertMinDelay ": {
"type": "integer",
"minimum": 1,
"description": "Initial delay in milliseconds before the plugin tries to insert a message again, once the server returned an error or was not available. The increment follows a pattern of double growth, where each increase is doubled compared to the previous increment. If not specified, the plugin won't try to resend messages again after the first try."
},
"retryInsertMaxDelay": {
"type": "integer",
"minimum": 1,
"description": "Maximum delay in milliseconds before the plugin tries to insert a message again, once the server returned an error or was not available. If not specified, the plugin won't try to resend messages again after the first try."
},
"required": [
"hostname",
"port",
"database",
"queueSize"
]
},
"schemaMappings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Textual identifier of this schema mapping."
},
"schema": {
"type": "object",
"description": "Mapping where <key> is the target column name and <value> is the MQTT message information field.",
"patternProperties": {
".*": {
"type": "string",
"enum": [
"hostname",
"payload",
"topic",
"qos",
"client_id",
"retain",
"timestamp"
]
}
},
"additionalProperties": false
}
},
"required": [
"name",
"schema"
]
}
},
"topicMappings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Textual identifier of this topic mapping."
},
"collection": {
"type": "string",
"description": "Name of the collection, the MQTT data should inserted to."
},
"schema": {
"type": "string",
"description": "Name of a schema mapping, which should be applied to this topic mapping. If not specified the default schema mapping including all fields will be used."
},
"topics": {
"type": "array",
"description": "List of topic filters the plugin forwards messages from.",
"items": {
"type": "string"
}
}
},
"required": [
"name",
"collection",
"topics"
]
}
}
},
"required": [
"name",
"mongodb",
"schemaMappings",
"topicMappings"
]
}
}