Streams
The Stream Processing plugin allows the creation of topic streams in the Mosquitto broker. A topic stream receives all messages on a specified topic and can then perform the following tasks:
- Republish messages on a different topic, with optional QoS and retain control
- Persist messages to disk, with the ability to replay messages in the future
- Process JSON payloads to extract particular data values prior to republishing
- Apply aggregation functions to data prior to republishing/persisting
- Select which messages are processed/persisted based on data values in the payload
The plugin is controlled through an MQTT topic-based API. The Management Center provides a convenient front end to this API. For more information about the Stream Processing plugin and how to enable it, see the Mosquitto Streams documentation.
Streams Overview
The streams UI overview shows the list of defined streams with details such as the name, description, source topic and target topic. You can also modify the status of a stream and execute actions as described below. In the table below, three sample streams are defined.
The following states can be changed:
- Process: Apply the query defined while editing the stream, or disable processing.
- Persist: Persist stream messages to disk for replay, or do not persist them.
- Active: If active, the stream is enabled; otherwise it will not execute.
The following actions are available:
Clear Stream Messages
Clears all persisted stream data from disk.
Replay
Replaying a stream means republishing its messages on a different topic. Only one replay can run per stream at a time. Replaying is only possible if persistence is turned on. A dialog will open where you can set the parameters for the replay:
- Replay topic: The topic on which the replayed messages are published.
- gte/lte: Optional. Use UTC timestamps to restrict the replay to a time interval. "gte" = greater than or equal. "lte" = less than or equal.
- Reverse: If active, the message stream will be replayed in reverse.
- Limit: Optional. Limit the number of messages replayed. This number represents a maximum value and may not be reached if there are not enough messages persisted. A value of -1, the default, means that no numerical limit will be applied.
- Speed: Optional. Change the speed at which the messages are replayed. This can be one of the strings "original" or "fastest", or a number indicating the factor. For example, a speed of 2.5 means replay at 2.5x the original speed.
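To make the interplay of the replay parameters concrete, here is a minimal Python sketch of how a replay could be planned. It is an illustration only, not the plugin's implementation: the function name `plan_replay`, the use of seconds for timestamps, and the choice to apply the limit after ordering are all assumptions made for this example.

```python
from typing import List, Optional, Tuple, Union

# Hypothetical in-memory model of a persisted message:
# (UTC timestamp in seconds, payload). The unit is an assumption.
Message = Tuple[float, bytes]


def plan_replay(
    messages: List[Message],
    gte: Optional[float] = None,
    lte: Optional[float] = None,
    reverse: bool = False,
    limit: int = -1,
    speed: Union[str, float] = "original",
) -> List[Tuple[float, bytes]]:
    """Return (delay_before_publish_in_seconds, payload) pairs."""
    # gte/lte: keep only messages inside the optional time interval.
    selected = [
        m for m in messages
        if (gte is None or m[0] >= gte) and (lte is None or m[0] <= lte)
    ]
    # Reverse: replay newest first instead of oldest first.
    selected.sort(key=lambda m: m[0], reverse=reverse)
    # Limit: -1 means no numerical limit (assumed to apply after ordering).
    if limit >= 0:
        selected = selected[:limit]

    # Speed: "fastest" removes all gaps, "original" keeps them,
    # a number divides the original gaps by that factor.
    if speed == "fastest":
        factor = None
    elif speed == "original":
        factor = 1.0
    else:
        factor = float(speed)
        if factor <= 0:
            raise ValueError("speed factor must be positive")

    plan = []
    previous_ts = None
    for ts, payload in selected:
        if previous_ts is None or factor is None:
            delay = 0.0
        else:
            delay = abs(ts - previous_ts) / factor
        plan.append((delay, payload))
        previous_ts = ts
    return plan
```

For instance, with messages recorded at 0, 5, 10 and 20 seconds, `gte=5.0` and `speed=2.5`, this sketch selects the last three messages and schedules them with delays of 0, 2 and 4 seconds.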
Delete stream
Click to delete the stream from the server.
Create Stream
To create a stream, click on the "New Stream" button and the following page will appear:
Here, the following properties can be defined:
- Name: Name of the stream.
- Source topic: The topic to be processed with this stream.
- Target topic: The topic to forward the processed data to.
- Target QoS: Quality of service of the target topic.
- TTL: Time to live for persisted stream data.
- Edit field: Field to specify the stream query. Leave it empty to create a stream that simply reroutes messages from the source topic to the target topic.
The stream processing feature allows messages with JSON payloads to be modified before they are republished and/or persisted to disk. This is managed with a user-defined query that is similar in concept to an SQL statement. For the syntax of stream queries, see the Mosquitto Docs page.
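The query syntax itself is covered in the Mosquitto documentation and is not reproduced here. As a purely conceptual illustration of what a select-and-extract step does to a JSON payload, the following Python sketch keeps a single field and drops messages whose value is below a threshold. The function name `process_message`, its parameters, and the handling of non-JSON payloads are assumptions for this example, not plugin behavior.

```python
import json
from typing import Optional


def process_message(payload: bytes, field: str, minimum: float) -> Optional[bytes]:
    """Return a reduced payload to republish, or None to skip the message."""
    try:
        data = json.loads(payload)
    except ValueError:
        # Not valid JSON: this sketch simply skips the message (assumption).
        return None
    if not isinstance(data, dict):
        return None

    value = data.get(field)
    # Only numeric values can be compared against the threshold.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None

    # Selection: drop messages whose value is below the threshold.
    if value < minimum:
        return None

    # Extraction: republish only the field of interest.
    return json.dumps({field: value}).encode("utf-8")
```

A payload such as `{"temperature": 25.5, "unit": "C"}` with `field="temperature"` and `minimum=20` would be reduced to the temperature field alone, while a reading of 15 would be skipped.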
Edit Stream
To edit a stream, click on an existing stream. The following page will appear:
Click on "Edit" to change the settings.
In addition to the properties described above, the following settings can be modified:
- Process: If active, the query statement is applied.
- Persist: If active, stream data is persisted.
- Active: If active, stream is enabled.
Time to Live (TTL)
When persistence is enabled for a stream, the Time to Live (TTL) property is used to limit the stream's disk usage. Setting TTL to 0 means that all messages received will be kept on disk forever, unless they are manually cleared. Setting TTL to a positive integer means that messages will eventually be removed from the persistence store. TTL is measured in seconds, and guarantees that messages younger than that number of seconds will be kept in the database. Messages older than the TTL interval are not removed immediately: depending on when they were received, they can remain in the database for up to twice the TTL interval before they are removed.
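The retention rule can be summarized with a small Python sketch that classifies a message by its age. The function name `retention_state` and the three labels are hypothetical, and the handling of the exact boundary values is an assumption; the sketch only restates the rule described above.

```python
def retention_state(age_seconds: float, ttl_seconds: int) -> str:
    """Classify a persisted message by age under the TTL rule described above."""
    if ttl_seconds < 0:
        raise ValueError("TTL must be 0 or a positive integer")
    if ttl_seconds == 0:
        # TTL 0: messages are kept until they are manually cleared.
        return "kept"
    if age_seconds < ttl_seconds:
        # Younger than the TTL: guaranteed to still be in the database.
        return "kept"
    if age_seconds <= 2 * ttl_seconds:
        # Between one and two TTL intervals: may or may not be gone yet.
        return "eligible for removal"
    # Older than twice the TTL: expected to have been removed.
    return "removed"
```

With a TTL of 60 seconds, a 30-second-old message is guaranteed to be kept, a 90-second-old message may or may not still be present, and a message older than 120 seconds is expected to be gone.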