Skip to main content
Version: Mosquitto 2.7

Kafka Bridge

Premium

The Kafka Bridge plugin can be used to publish data published to the Mosquitto broker to a Kafka broker. The plugin can handle multiple Kafka broker connections. To define which MQTT topics should be bridged to the Kafka broker multiple topping mappings can be specified (see example configuration). The implemented Kafka client additionally allowes basic authentication (username, password) with SASL (included mechanisms: plain, scram-sha-256, scram-sha-512). Should the Kafka Bridge be unable to establish a connection to the Kafka broker, incoming messages will be stored in a queue. The size of the queue can be set with the queueSize parameter and is valid for each Kafka topic specified in the topic mappings. This is due to the type of queue we use (QueueMap). This type of queue allows a higher throughput because messages of all topics get merged into so called ProducerBatches before they are sent to Kafka.

Plugin activation

To enable the NodeJS Kafka Bridge plugin in the broker add the following to the mosquitto.conf file:

plugin /usr/lib/cedalo_kafka_bridge.so

persistence_location /mosquitto/data

This is an example configuration snipped, which applies to the docker container setup. For installation not running in a container the above configuration needs to be adjusted accordingly.

persistence_location is used as the search path for the plugins config file.

Config file format

The config is stored in a single JSON file (named kafka-bridge.json) located inside the persistence_location which is defined in the mosquitto.conf. There is also a schema at the bottom of this page which contains all possible field. The following fields of the config are mandatory:

  • name: Name of the Kafka Bridge config (type: string).
  • brokers: List of the Kafka broker's adresses (type: array of type string).
  • clientId: ID of the Kafka client that published the message. The clientId can be set arbitrarily. The purpose of this ID is to provide more data about the source of requests than IP address and port (type: string).
  • allowAutoTopicCreation: If set to true, topics will be automatically created in the Kafka broker in case the topic does not exist (type: bool, true or false).
  • queueSize: Size of the internal queue that is valid for each Kafka topic. The queue is used to store messages in case that the plugin cannot establish a connection to the Kafka broker. If the queueSize is reached incoming messages will be dropped. The plugin will produce log messages in those cases (type: int, minimum: 1).
  • topicMappings: List of topic mappings where (multiple) MQTT topics can be mapped to one Kafka topic (type: array of type object).

Optional fields:

  • sasl: Used to configure basic authentication (type: object) (see more detailed information below the example kafka-bridge.json).
  • retryPublishMinDelay: Defines the minimum delay in milliseconds before the plugin tries to resend a message, once the first publish failed. If not specified there will be no retry after first failed publish. (type: int, minimum: 1).
  • retryPublishMaxDelay: Defines the maximum delay in milliseconds before the plugin tries to publish a message again, once the first publish failed. If not specified there will be no retry after first failed publish. (type: int).
  • ssl: If set to true, SSL will be enabled for the connection between the Kafka client and Kafka broker (type: bool, true or false).

An example for the kafka-bridge.json is (see all available options here):

[
{
"name": "sensorData",
"connection": {
"brokers": [
"127.0.0.1:29092"
],
"clientId": "mosquitto-broker",
"allowAutoTopicCreation": true,
"queueSize": 10,
"sasl": {
"mechanism": "plain",
"username": "user",
"password": "secret_password"
},
"retryPublishMinDelay": 250,
"retryPublishMaxDelay": 2500
},
"topicMappings": [
{
"name": "sensorAll",
"kafkaTopic": "sensorAll",
"mqttTopics": [
"sensor/#"
]
}
]
}
]

In this example config named "sensorData" it is expected that a Kafka broker is already running on localhost listening on port 29092. In general there could be more than one broker in the brokers array.

The queueSize parameter is set to 10. That means that each Kafka topic mapped in the topicMappings has a queue that stores up to 10 messages in case that the Kafka client cannot establish a connection to the Kafka broker.

To configure basic authentication (username, password) with SASL assign the fields mechanism, username and password in the sasl block (available mechanisms for SASL: plain, scram-sha-256, scram-sha-512). The mechanism and credentials in the config file must match to those in the Kafka broker.

In case that the client failed to publish a message (e.g., Kafka broker returned an error) the plugin will wait at least 250 milliseconds and at most 2500 milliseconds until it retries to publish the message again. This behavior is defined by the retryPublishMinDelay and retryPublishMaxDelay values in the plugin's config file. These values are passed to the implemented Kafka client, which automatically manages the retry mechanism.

In this config each MQTT topic payload published to sensor/# will be published to the Kafka sensorAll topic. This is achieved by defining a topic mapping in the topicMappings section of the configuration file. So, for all topics that should be bridged to the Kafka broker there has to be a topic mapping configured. Each topic mapping defines a list of MQTT topic filters, and the Kafka topic where matching messages will be written. If there is no topic mapping, no messages of the respective MQTT topic will be forwarded.

JSON Schema

Overview over all possible parameters for the kafka-bridge.json:

{
"type": "array",
"description": "List of sub-configurations per Kafka connection.",
"items": {
"type": "object",
"description": "Sub-configurations per Kafka connection.",
"properties": {
"name": {
"type": "string",
"description": "Textual identifier of this configuration."
},
"connection": {
"type": "object",
"description": "Kafka connection specific configurations.",
"properties": {
"brokers": {
"type": "array",
"description": "List of brokers, e.g. \"localhost:9092\"",
"items": {
"type": "string"
}
},
"clientId": {
"type": "string"
},
"ssl": {
"type": "boolean",
"nullable": true
},
"sasl": {
"type": "object",
"description": "SASL Configuration.",
"properties": {
"mechanism": {
"type": "string",
"enum": [
"plain",
"scram-sha-256",
"scram-sha-512"
]
},
"username": {
"type": "string"
},
"password": {
"type": "string"
}
},
"nullable": true,
"required": [
"username",
"password",
"mechanism"
]
},
"allowAutoTopicCreation": {
"type": "boolean"
},
"queueSize": {
"type": "integer",
"description": "Specifies the limit of not-yet-processed/published MQTT messages, before the plugin starts to drop them.",
"minimum": 1
},
"retryPublishMinDelay": {
"type": "integer",
"description": "Initial delay in milliseconds before the plugin tries to publish a message again, once the server returned an error or was not available. The increment follows a pattern of double growth, where each increase is doubled compared to the previous increment. If not specified, the plugin won't try to resend messages again after the first try.",
"minimum": 1,
"nullable": true
},
"retryPublishMaxDelay": {
"type": "integer",
"description": "Maximum delay in milliseconds before the plugin tries to publish a message again, once the server returned an error or was not available. If not specified, the plugin won't try to resend messages again after the first try.",
"minimum": 1,
"nullable": true
}
},
"required": [
"brokers",
"clientId",
"queueSize"
]
},
"topicMappings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Textual identifier of this topic mapping."
},
"kafkaTopic": {
"type": "string",
"description": "Name of the topic, the MQTT data should published to."
},
"mqttTopics": {
"type": "array",
"description": "List of topic filters the plugin forwards messages from.",
"items": {
"type": "string"
},
"minItems": 1
}
},
"required": [
"name",
"kafkaTopic",
"mqttTopics"
]
}
}
},
"required": [
"name",
"connection",
"topicMappings"
]
}
}