Version: Mosquitto 2.5

Mosquitto Broker

System requirements

The Mosquitto broker is one of the most efficient MQTT brokers available.

When considering how heavily an MQTT broker is used and the work it has to perform, the key factor is not so much the number of clients but the amount of data sent per second, the "traffic".

Suppose you have 1.5 million clients, but each client sends only a little data, and does so quite rarely, e.g. only every couple of minutes.

This leads to much less traffic and workload on the broker than a few thousand clients sending large messages at high frequency.

A different scenario that can incur a lot of traffic and workload for the broker is transferring pictures every millisecond using QoS level 2 (QoS 2), a Quality of Service level that occupies far more broker resources than the other levels.

Read more about Quality of Service.
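For illustration, publishing a single image file at QoS 2 with the Mosquitto command line client would look something like the following; the host, topic, and file name are placeholders rather than part of any particular setup:

mosquitto_pub -h localhost -t 'factory/camera/frame' -q 2 -f frame.jpg

Doing this every millisecond is exactly the kind of workload that drives broker resource usage far more than the raw client count.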

Additionally, the circumstances of the setup influence performance. With remote devices, very good latency is certainly harder to achieve than with devices that are close together.

High Availability (HA)

note

Premium feature

HA is the ability of a system to work at optimal performance continuously over a long period.

This means that the system should work without failure. The Mosquitto MQTT broker is designed to be stable and continuously available through clustering.

Clustering protects against the failure of an individual broker, e.g. due to a hardware fault. For this reason, usually three brokers are clustered.

If broker number one breaks down, the so-called "load balancer" shifts the workload to a passive broker number two.

To be safe, the cluster therefore always contains at least one passive broker.

The HA architecture requires that the cluster has a majority of nodes active and connected in order for the cluster to be operable and stable. So for 3 nodes, 2 must be available. For 5 nodes, 3 must be available and so on. The third node of three isn't just an extra backup, but required as part of the cluster algorithm.

Minimum Hardware Requirements

Hardware

Hardware requirements for Eclipse Mosquitto can vary depending on the client connections and message rates expected. The minimum specification below is for a low power device supporting 10s of clients with a low message rate. The recommended specification is for a device capable of supporting 10,000s of clients with reasonable message rates. The "with stream processing" specification includes extra RAM and storage for stream processing support.

|  | Minimum | Recommended | Recommended (with stream processing) | Comment |
| --- | --- | --- | --- | --- |
| RAM | 128 MB | 4 GB | 16 GB |  |
| CPU | 1 core 32-bit ARM | Recent 4+ core AMD/Intel | Recent 4+ core AMD/Intel |  |
| Storage | 20 MB | 200 MB | 40 GB |  |

Software

Eclipse Mosquitto is available natively on Linux, Windows, and other Unix-like systems, and using Docker.

On Windows, a Docker installation gives the best performance.

Docker

The minimum Docker version is 19.03.

https://hub.docker.com/u/cedalo

Mosquitto MQTT Broker versions

Open-Source

The open source Eclipse Mosquitto is the most popular and downloaded MQTT broker in the world. The Mosquitto MQTT broker enables you to connect sensors, devices, and applications with more flexibility and speed since it is written in C.

Eclipse Mosquitto is open source (EPL/EDL licensed).

It implements the MQTT protocol versions 5.0, 3.1.1 and 3.1.

Report bugs or submit changes on the GitHub repository or the forums.

https://github.com/eclipse/mosquitto

https://forum.cedalo.com/

Talk to other users on the Mosquitto mailing list or on Slack.

https://accounts.eclipse.org/mailing-list/mosquitto-dev

https://eclipse-iot-wg.slack.com/join/shared_invite/zt-d8zil9s0-NF5UHh92Odf3AbonspswHA#/shared-invite/email

Get help from the forums.

https://forum.cedalo.com/

Cite Mosquitto in your academic work.

https://mosquitto.org/blog/2017/06/citing-eclipse-mosquitto/

Premium

Cedalo Enterprise Platform for Eclipse Mosquitto builds on the core of the open source Mosquitto. It comes with premium support and plugins like Stream Processing and High Availability.

Take a look at the features and capabilities of Eclipse Mosquitto versus the Cedalo Enterprise Platform for Eclipse Mosquitto offerings and choose your MQTT broker:

Eclipse Mosquitto as a managed Service

Mosquitto hosted in the Cedalo Cloud

Cedalo Enterprise Platform for Eclipse Mosquitto On-Premise

Mosquitto with Premium Management Center.

Mosquitto Broker Client Library

The Mosquitto MQTT broker supports any client library that implements the MQTT protocol.

Here is an overview.
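As a quick, hedged sanity check with the Mosquitto command line clients (host and topic are placeholders), you can subscribe in one terminal and publish from another:

mosquitto_sub -h localhost -t 'test/topic' -v
mosquitto_pub -h localhost -t 'test/topic' -m 'hello'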

Proxy Protocol

In computer networking, a proxy server is a server application that acts as an intermediary between a client requesting a resource and the server providing that resource.

Instead of connecting directly to a server that can fulfill a request for a resource, such as a file or web page, the client directs the request to the proxy server, which evaluates the request and performs the required network transactions.

This serves as a method to simplify or control the complexity of the request, or provide additional benefits such as load balancing, privacy, or security.

Proxies were devised to add structure and encapsulation to distributed systems. A proxy server thus functions on behalf of the client when requesting service, potentially masking the true origin of the request to the resource server.

Idle Timeout

If you wish to minimize workload, you might want to close client connections that have no traffic. Idle Timeout lets you set a time interval; at the end of each interval the broker checks the connection for activity.

If no packets were exchanged, the connection is closed.

Cedalo Stream Processing (CSP) plugin

note

Premium feature

Cedalo Stream Processing (CSP) plugin is a Mosquitto Premium feature. The plugin allows the creation of topic streams in the Mosquitto broker. A topic stream receives all messages on a specified topic and can then perform the following tasks:

  • Republish messages on a different topic, with optional QoS and retain control.
  • Persist messages to disk, with the ability to replay messages in the future.
  • Process JSON payload to extract particular data values prior to republishing.
  • Apply aggregation functions to data prior to republishing/persisting.
  • Select which messages are processed/persisted based on data values in the payload.

Controlling the plugin is carried out with an MQTT topic-based API. The Management Center for Mosquitto web tool provides a convenient front end to access the API, the mosquitto_ctrl tool provides command line access to the API, or custom tools can be written to control it.

Read more about the Management Center.

Quick start

After installing the Cedalo Platform Docker image, the Mosquitto configuration file must be edited to enable the CSP plugin.

Edit cedalo_platform/mosquitto/config/mosquitto.conf, and add the following lines:

  • plugin /usr/lib/cedalo_stream_processing.so
  • plugin_opt_data_dir /mosquitto/data/csp
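As a rough sketch, the relevant part of the resulting mosquitto.conf could then look like the excerpt below. Only the two plugin lines are taken from above; the listener and persistence directives are illustrative and depend on your existing setup:

# existing broker settings (illustrative)
listener 1883
persistence true
persistence_location /mosquitto/data/

# Cedalo Stream Processing plugin
plugin /usr/lib/cedalo_stream_processing.so
plugin_opt_data_dir /mosquitto/data/csp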

Find more information about the installation here:

Read more about the Download.

Important: when using a plugin, remember to place the plugin in the configuration data as well.

After that, restart the Mosquitto broker and check, straight after the restart, that the Cedalo Stream Processing (CSP) plugin has been loaded.

Changing the configuration is only necessary if you want to switch from an existing open source installation to a premium version.

Stream Processing

The stream processing feature allows messages with JSON payloads to be modified before they are republished and/or persisted to disk. This is managed with a user-defined query that uses concepts similar to a SQL statement.

Modifying the payload with select statements

When processing is enabled the JSON payload can be manipulated using select statements. If no select statements are defined, then the payload will be left unmodified. If select statements are defined, then only the parts of the message that are defined in the select will be added to the outgoing payload, and only messages that match the select specification will be republished or persisted.

The following examples demonstrate how to define select queries. Only the select part of the command payload is shown; see the createStream command for the full command payload. The example incoming payload is:

{
  "Machine Data": {
    "Speed": 7.5,
    "Angle Deg": 55.3,
    "Angle Rad": 0.97,
    "Power": 601.6
  },
  "Location": "Freiburg"
}

A basic select picks a JSON object from the source payload and optionally places it in the target payload. To access items in the JSON object each element of the object hierarchy is described by the object name in square brackets. For example, to select the Speed element the source item would be set to [Machine Data][Speed]. The same technique is used when choosing the target location. It is perfectly valid to have the source and target paths be the same.

Basic payload manipulation

This example extracts the Speed and makes it a top level object, keeps the Location and discards all other payload entries.

"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Speed]"
},{
"source": "[Location]",
"target": "[Location]
}
]

With the example payload this would result in the following output payload:

{
  "Speed": 7.5,
  "Location": "Freiburg"
}

Select Aliases

When defining a select statement, an alias can be created. This is a text string that must not begin or end with a square bracket and that can be used in a where statement instead of the full JSON path.

"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Speed]"
},{
"source": "[Location]",
"target": "[Location],
"alias": "location"
}
]

Aggregate functions

Simple time aggregations can be defined for select statements for calculations over a time bucket.

This includes the min, max, sum, count, and time_bucket functions.

When a function is defined, the current value for each select statement is stored, and when a message is received, its payload is used to calculate the new value depending on the function in use.

A new message will be republished/persisted for every message that is received, and will contain the current values.

The current time bucket can be added to the target payload for identifying which bucket a message is in.

The time_bucket function defines the parameters of the time bucket to be used.

It has the parameters intervalsize and intervaloffset, which are both in seconds. intervalsize defines the length of the time bucket and must be greater than 0. intervaloffset defines the start offset of the time bucket.

So to create a 60 second time interval that starts at 15 seconds into the minute you would use:

"select": [
{
"source": "", # source is ignored for the `time_bucket` function
"target": "", # set to a JSON path to output to the payload, or blank to ignore
"function": {
"name": "time_bucket",
"intervalsize": 60,
"intervaloffset": 15
}
}
]

Only one time_bucket is allowed per query. The other functions are:

  • max : the maximum value in the time bucket
  • min : the minimum value in the time bucket
  • sum : the sum of all values in the time bucket
  • count : the count of messages received in the time bucket. This function does not require a source.

A more complete example:

"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Data][SpeedMax]",
"alias" : "speedmax",
"function": {
"name" : "max"
}
},
{
"source": "[Machine Data][Power]",
"target": "[Data][PowerSum]",
"alias" : "powersum",
"function": {
"name" : "sum"
}
},
{
"target": "[Data][Count]",
"alias" : "IntervalCount",
"function": {
"name" : "count"
}
},
{
"source": "*",
"target": "",
"alias" : "timeInterval",
"function": {
"name" : "time_bucket",
"intervalsize": 60,
"intervaloffset": 30
}
},
{
"source": "[Location]",
"target": "[Location]",
"alias" : "Location"
}
]

This would produce the output:

{
  "Data": {
    "SpeedMax": 10,
    "PowerSum": 123,
    "Count": 3
  },
  "Location": "Freiburg"
}

The processing feature has some internal sources that can be used:

  • timestamp : the current Unix timestamp in seconds
  • windowstart : the start of the current time bucket, if enabled
  • windowend : the end of the current time bucket, if enabled

For example:

"select": [
{
"source": "timestamp",
"target": "[Data][Now]",
}
]

Choosing which messages are used with where statements

It is possible to restrict which messages are processed and persisted by using a where statement.
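For example, building on the "location" alias defined above and the where format documented for the createStream command (operator, left, right), a sketch of a where array that only passes messages from Freiburg could look like this; the values are illustrative:

"where": [
  {
    "operator": "==",
    "left": "location",
    "right": "Freiburg"
  }
]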

Stream persistence and replay

The stream persistence feature allows all messages received on a stream to be saved to disk and then replayed to a new topic at a later point in time.

Persistence can be turned on or off at any point for any stream, if the feature is available.

When used on a stream that has processing configured, the processed payload will be stored, not the original payload.

Stream persistence is not available for 32-bit platforms.

autosave_interval seconds

The number of seconds that mosquitto will wait between each time it saves the in-memory database to disk. If set to 0, the in-memory database will only be saved when mosquitto exits or when receiving the SIGUSR1 signal. Note that this setting only has an effect if persistence is enabled. Defaults to 1800 seconds (30 minutes).

This option applies globally.

Reloaded on reload signal.
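For example, to save the in-memory database every five minutes instead of the default 30 minutes, the mosquitto.conf entry would be:

autosave_interval 300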

Replaying

Replaying a stream means republishing its messages on a different topic. Only a single replay may be running for each stream at any one time.

To replay a stream, use the replayStream command. The only required parameter is the destination topic. Further control is possible using the gte, lte, limit, speed, and reverse parameters.

If only the replaytopic parameter is provided, all of the messages stored for the stream will be republished in order, with the first message published immediately and subsequent messages published at approximately the same interval as the original messages.

The range and number of messages to be replayed can be limited using gte, lte, and limit.

gte and lte are the lower and upper bounds of the time range that should be replayed, respectively. They are both Unix timestamps in seconds.

Both are optional, so "all messages up to this point", "all messages since this point", and "only messages within this range" are all possible.

The limit parameter limits the total number of messages that will be replayed, and can be used in conjunction with gte and/or lte, or on its own. The default value of -1 means there is no numerical limit.

The speed of the replay can be controlled using the speed parameter. Set to the string "original" to keep the original speed, or to "fastest" to be republished at the fastest rate possible.

Alternatively, set to a number to specify an exact speed multiple, e.g. setting to 2.5 would replay the stream 2.5x faster than the original.

Stopping an in-progress replay for a stream can be done using the stopStreamReplay command.
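As an illustration, a replayStream command payload that replays one hour of persisted messages at 2.5x the original speed could look like the sketch below; the stream name, topic, and timestamps are placeholders, and the full parameter list is documented in the API description:

{
  "commands": [
    {
      "command": "replayStream",
      "streamname": "machine-speed",
      "replaytopic": "replay/machine-speed",
      "gte": 1620044688,
      "lte": 1620048288,
      "speed": 2.5,
      "correlationData": "replay-example-1"
    }
  ]
}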

persistence_location path

The path where the persistence database should be stored. If not given, then the current directory is used.

This option applies globally.

Current versions (2.0 and earlier) only use this for internal persistence location, but later versions will allow this to be set for use by all plugins.

Reloaded on reload signal.
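For example, in the Docker-based setup described earlier, the persistence database is commonly kept under the mounted data directory; the path below is illustrative:

persistence_location /mosquitto/data/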

Message count

It is possible to obtain the estimated number of messages currently persisted using the getStreamMessageCount command.

Clearing messages

Persisted messages can be manually cleared using the clearStreamMessages API command.

This completely removes the messages from disk and can not be undone.

All messages can be cleared, or a range of messages can be cleared based on the message timestamp in unix timestamp seconds format.

Use the gte parameter to specify the lower bound of the timestamp that will be cleared, and the lte parameter to specify the upper bound of the range to be cleared.

See the API description for details of gte and lte.

Time to Live (TTL)

When creating a new stream and enabling persistence, the Time to Live (TTL) property will be used, which allows the disk usage of streams to be limited.

Setting TTL to 0 means that all messages received will be kept on disk forever, unless they are manually cleared.

Setting TTL to a positive integer means that messages will be removed from the persistence store in the future.

TTL is measured in seconds, and guarantees that messages that are younger than that number of seconds will be kept in the database.

Messages older than the TTL interval are not immediately removed from the database: they can remain in the database until at most twice the TTL interval before they are removed, depending on when they were received.
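For example, with a TTL of 86400 seconds (24 hours), every message is guaranteed to remain on disk for at least 24 hours after it was received, and will be removed at some point before it is 48 hours old.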

API description

The CSP plugin is controlled over the $CONTROL/stream-processing/v1 topic, with replies published to $CONTROL/stream-processing/v1/response. You should ensure that your access control solution denies access to these topics for unauthorized users.

Commands are sent as JSON payloads, as described in the command sections below. The examples in each section show only a single command in the command array, but multiple commands can be sent at once and will be processed in order.

Commands will generate a response on the response topic. Unless otherwise specified, this will consist of the command string, the optional correlationData string, and an error string. If the command completed successfully, the error string will not be present. Where a command returns data in the response there will be a data object with contents as described in the individual command sections.

Response payload:

{
  "responses": [
    {
      "command": "getStreamMessageCount",
      "correlationData": "3c079967-5bee-4409-aa9f-a963180cde94",
      "error": "Persistence not configured for this stream"
    }
  ]
}
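As a hedged example of interacting with the API from the command line, you can subscribe to the response topic and publish a command payload stored in a file; the hostname and file name are placeholders, and authentication options depend on your setup:

mosquitto_sub -h localhost -t '$CONTROL/stream-processing/v1/response' -v
mosquitto_pub -h localhost -t '$CONTROL/stream-processing/v1' -f command.json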

clearStreamMessages

Clear persisted messages within a defined range. Parameters:

  • streamname : the name of the stream to be cleared.
  • gte : optional lower bound of the range to be cleared, i.e. all messages more recent than this will be cleared. If not present or set to "", then no lower bound will be used. Unix timestamp in seconds.
  • lte : optional upper bound of the range to be cleared, i.e. all messages older than this will be cleared. If not present or set to "", then no upper bound will be used. Unix timestamp in seconds.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Non-code examples:

Clear all messages that are more recent than 1620131088.

clearStreamMessages gte=1620131088

Clear all messages that are older than 1620131088.

clearStreamMessages lte=1620131088

Clear all messages that are more recent than 1620044688 and are also older than 1620131088.

clearStreamMessages gte=1620044688 lte=1620131088

Command payload:

{
  "commands": [
    {
      "command": "clearStreamMessages",
      "streamname": "<name of stream>",
      "gte": <optional lower bound as unix timestamp number>,
      "lte": <optional upper bound as unix timestamp number>,
      "correlationData": ""
    }
  ]
}

createStream and modifyStream

Create a new stream, or modify an existing stream. When modifying a stream, only parameters that are present in the JSON payload are updated.

Parameters:

  • streamname : the name of the stream to be created/modified.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.
  • sourcetopic : the MQTT topic that will be used as the source of messages.
  • targettopic : optional MQTT topic where stream messages will be republished.
  • targetqos : optional QoS that will be used when republishing messages. If set to -1 then the original QoS will be used.
  • ttl : optional Time to Live in seconds that will be used when persisting messages.
  • active : boolean, if set to true the stream will be enabled and operational. If set to false, none of the stream features will be used.
  • process : boolean, if set to true the stream processing feature will be enabled. If set to false, stream processing will be disabled and payloads will be republished without modification.
  • persist : boolean, if set to true the stream persistence feature will be enabled. If set to false, stream persistence will be disabled and messages will not be persisted to disk.
  • textname : optional user string to name the stream.
  • textdescription : optional user string to describe the stream.
  • query : optional object used with the processing feature, can contain a select and/or a where array.
  • select : optional query array
  • where : optional query array. At the moment this array can only contain a single object with an operator, left, and right members. The left string is a JSON path field or alias. The right item can only be a value and can be a string, number, or boolean. The operator is a string which can be one of ==, !=, >, <, >=, or <=.

Command payload:

{
  "commands": [
    {
      "command": "createStream",
      "streamname": "<name>",
      "correlationData": "",
      "sourcetopic": "",
      "targettopic": "",
      "targetqos": -1|0|1|2,
      "ttl": 86400,
      "active": true|false,
      "process": true|false,
      "persist": true|false,
      "textname": "",
      "textdescription": "",
      "query": {
        "select": [
          {
            "source": "",
            "target": "",
            "alias": "",
            "function": {
              "name": ""
            }
          }
        ],
        "where": [
          {
            "operator": "==",
            "left": "",
            "right": ""
          }
        ]
      }
    }
  ]
}
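As a worked sketch, a createStream command that combines the documented parameters with the earlier select and alias examples might look like this; the stream name, topics, and values are illustrative only:

{
  "commands": [
    {
      "command": "createStream",
      "streamname": "machine-speed",
      "sourcetopic": "factory/machine1/data",
      "targettopic": "factory/machine1/speed",
      "targetqos": 1,
      "ttl": 86400,
      "active": true,
      "process": true,
      "persist": true,
      "textname": "Machine speed",
      "textdescription": "Republishes the speed value and keeps one day of history",
      "query": {
        "select": [
          {
            "source": "[Machine Data][Speed]",
            "target": "[Speed]",
            "alias": "speed"
          }
        ],
        "where": [
          {
            "operator": ">",
            "left": "speed",
            "right": 5.0
          }
        ]
      },
      "correlationData": "create-example-1"
    }
  ]
}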

deleteStream

Delete a stream.

This cannot be undone.

If the stream is currently carrying out a replay, the replay will be stopped.

  • streamname : the name of the stream to be deleted.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

    "commands": [
{
"command": "deleteStream",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}

disableStream

Disable an existing stream.

Disabling / setting a stream inactive means that it will not process, persist, or republish messages.

Enabling / setting it active again means that the stream will accept messages and process, persist, and republish them as it is configured.

  • streamname : the name of the stream to be disabled.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

 "commands": [
{
"command": "disableStream",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}

enableStream

Enable an existing stream. Enabling / setting active here means that the stream will accept messages and process, persist, and republish them as it is configured.

If a stream is disabled / set inactive then it will not process, persist, or republish messages.

  • streamname : the name of the stream to be enabled.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
  "commands": [
    {
      "command": "enableStream",
      "streamname": "<name of stream>",
      "correlationData": ""
    }
  ]
}

getLicense

Retrieve the current licensed features.

Command payload:

{
  "commands": [
    {
      "command": "getLicense",
      "correlationData": ""
    }
  ]
}

Response payload:

{
  "responses": [
    {
      "command": "getLicense",
      "correlationData": "",
      "data": {
        "issuedBy": "Cedalo AG",
        "issuedTo": "name@customer.com",
        "comment": "Comment",
        "serial": "00000000-0000-0000-0000-000000000000",
        "features": [
          {
            "name": "stream-count",
            "version": "1.0",
            "validSince": 1610219382000,
            "validUntil": 1610305782000,
            "count": -1
          },
          {
            "name": "stream-processing",
            "version": "1.0",
            "validSince": 1610219382000,
            "validUntil": 1610305782000
          },
          {
            "name": "stream-persistence",
            "version": "1.0",
            "validSince": 1610219382000,
            "validUntil": 1610305782000
          }
        ]
      }
    }
  ]
}

getStream

Get information on a stream.

Parameters:

  • streamname : the name of the stream to be queried.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
  "commands": [
    {
      "command": "getStream",
      "streamname": "<name>",
      "correlationData": ""
    }
  ]
}

Response payload:

{
  "responses": [
    {
      "command": "getStream",
      "correlationData": "",
      "data": {
        "streamname": "<name>",
        "sourcetopic": "",
        "targettopic": "",
        "textname": "",
        "textdescription": "",
        "targetqos": -1,
        "ttl": 86400,
        "active": true|false,
        "process": true|false,
        "persist": true|false,
        "replaying": true|false,
        "query": {
          # Query spec
        }
      }
    }
  ]
}

getStreamMessageCount

Retrieve the approximate count of messages that are currently persisted.

Parameters:

  • streamname : the name of the stream to be queried.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
  "commands": [
    {
      "command": "getStreamMessageCount",
      "streamname": "<name of stream>",
      "correlationData": ""
    }
  ]
}

Response payload:

{
  "responses": [
    {
      "command": "getStreamMessageCount",
      "correlationData": "",
      "data": {
        "streamname": "<name of stream>",
        "count": <count>
      }
    }
  ]
}

listStreams

List all configured streams.

Parameters:

  • verbose : boolean, if set to false a simple list of stream names will be returned. If set to true, the list of streams will be returned as objects in the same format as when using the getStream command.
  • count : optional maximum number of streams to return.
  • offset : optional offset to start the stream list.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
  "commands": [
    {
      "command": "listStreams",
      "verbose": true|false,
      "count": <number>,
      "offset": <number>,
      "correlationData": ""
    }
  ]
}

Response payload:

{
  "responses": [
    {
      "command": "listStreams",
      "correlationData": "",
      "data": {
        "streamname": "<name>",
        "sourcetopic": "",
        "targettopic": "",
        "textname": "",
        "textdescription": "",
        "targetqos": -1|0|1|2,
        "ttl": 86400,
        "active": true|false,
        "process": true|false,
        "persist": true|false,
        "replaying": true|false,
        "query": {
          # Query spec
        }
      }
    }
  ]
}

replayStream

Replay a persisted stream to a different topic.

Parameters:

  • streamname : the name of the stream to be replayed.
  • replaytopic : The topic where the messages will be published.
  • gte : optional lower bound of the range to be replayed, i.e. all messages more recent than this will be replayed. If not present or set to "", then no lower bound will be used. Unix timestamp in seconds.
  • lte : optional upper bound of the range to be replayed, i.e. all messages older than this will be replayed. If not present or set to "", then no upper bound will be used. Unix timestamp in seconds.
  • limit : optionally limit the number of messages replayed. This number represents a maximum value and may not be reached if there are not enough messages persisted. A value of -1, the default, means that no numerical limit will be applied.
  • speed : optionally change the speed at which the messages are replayed. This can be one of the strings "original" or "fastest", or a number indicating the factor. For example, a speed of 2.5 means replay at 2.5x the original speed.
  • reverse : optionally set to true to play the messages in reverse order.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
  "commands": [
    {
      "command": "replayStream",
      "streamname": "<name of stream>",
      "replaytopic": "<replay destination topic>",
      "gte": <optional lower bound as unix timestamp number>,
      "lte": <optional upper bound as unix timestamp number>,
      "limit": <optional message limit, number>,
      "speed": <optional replay speed, number>,
      "reverse": <optional reverse playback mode, boolean>,
      "correlationData": ""
    }
  ]
}

stopStreamReplay

Stop a stream replay, if it is running.

Parameters:

  • streamname : the name of the stream to be stopped.
  • correlationData : optional string that can be used to correlate commands with responses. Use a random/unique value.

Command payload:

{
  "commands": [
    {
      "command": "stopStreamReplay",
      "streamname": "<name of stream>",
      "correlationData": ""
    }
  ]
}