Streams

The Management Center allows the administration of the CSP Mosquitto plugin.

Cedalo Stream Processing plugin for Mosquitto#

The Cedalo Stream Processing (CSP) plugin allows the creation of topic streams in the Mosquitto broker. A topic stream receives all messages on a specified topic and can then perform the following tasks:

  • Republish messages on a different topic, with optional QoS and retain control
  • Persist messages to disk, with the ability to replay messages in the future
  • Process JSON payload to extract particular data values prior to republishing
  • Apply aggregation functions to data prior to republishing/persisting
  • Select which messages are processed/persisted based on data values in the payload

The plugin is controlled through an MQTT topic-based API. The Management Center for Mosquitto web tool provides a convenient front end to that API, the mosquitto_ctrl tool provides command-line access to it, and custom tools can also be written to control it.

Mosquitto MQTT Streams#

To learn more about the Streams plugin and how to enable it, see the explanation page in the Mosquitto docs. Streams can be created and modified using the Management Center. Select the "Streams" tab to access the plugin in the UI:

Streams#

image

The Streams UI gives an overview of the streams you have set up and their state. New streams can also be created here if necessary. A stream has the following settings:

image

Source topic: The topic to be processed with this stream.

Target topic: The topic to forward the processed data to.

Target QoS: Quality of service (QoS) for messages published to the target topic.

TTL: Time to live for persisted stream data.

Edit field: Field to specify the aggregations. Leave empty to create a stream that simply reroutes messages from the source topic to the target topic.

Process Switch: If active, the aggregation statement from the "Edit field" is applied.

Persist Switch: If active, stream data is saved to disk.

Active Switch: If active, stream is enabled.
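The settings above can be pictured as a single record per stream. The following is a minimal sketch, not the plugin's actual data model: the class name, field names, and defaults are assumptions chosen for illustration. Only the QoS range (0, 1, or 2, as defined by MQTT) and the TTL being 0 or a positive number of seconds come from the text.

```python
from dataclasses import dataclass, asdict


@dataclass
class StreamSettings:
    """Hypothetical container mirroring the fields of the stream dialog."""
    source_topic: str          # topic to be processed by the stream
    target_topic: str          # topic the processed data is forwarded to
    target_qos: int = 0        # QoS for messages on the target topic
    ttl: int = 0               # seconds; 0 keeps persisted data forever
    query: str = ""            # content of the "Edit field"; empty = plain rerouting
    process: bool = False      # Process Switch
    persist: bool = False      # Persist Switch
    active: bool = True        # Active Switch

    def validate(self) -> None:
        """Raise ValueError for values the UI description rules out."""
        if not self.source_topic:
            raise ValueError("a source topic is required")
        if self.target_qos not in (0, 1, 2):
            raise ValueError("MQTT QoS must be 0, 1 or 2")
        if self.ttl < 0:
            raise ValueError("TTL must be 0 or a positive number of seconds")


# Example: a persisted stream that reroutes messages without processing.
settings = StreamSettings(
    source_topic="sensors/temperature",
    target_topic="processed/temperature",
    target_qos=1,
    ttl=3600,
    persist=True,
)
settings.validate()
print(asdict(settings))
```

Keeping the three switches as independent booleans reflects the UI, where processing, persistence, and activation can be toggled separately.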

Stream processing#

The stream processing feature allows messages with JSON payloads to be modified before they are republished and/or persisted to disk. This is managed with a user-defined query whose concepts are similar to those of a SQL statement. The aggregations are specified in the "Edit field". For details on the syntax of stream aggregations, see the Mosquitto docs page.
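To make the idea concrete, here is a conceptual sketch in Python of what extracting a value and selecting messages by a data value amounts to. It does not use the plugin's query syntax, which is described in the Mosquitto docs; the function name, its parameters, and the choice of a minimum threshold as the selection rule are assumptions for illustration only.

```python
import json
from typing import Optional


def process_message(payload: bytes, field: str, minimum: float) -> Optional[bytes]:
    """Keep only `field` from a JSON object payload.

    Returns the reduced payload, or None when the message should be
    dropped (unparseable payload, missing/non-numeric field, or a value
    below `minimum`).
    """
    try:
        data = json.loads(payload)
    except ValueError:  # covers JSON syntax errors and undecodable bytes
        return None
    if not isinstance(data, dict):
        return None
    value = data.get(field)
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    if value < minimum:
        return None
    return json.dumps({field: value}).encode("utf-8")


# Example: forward only temperatures of at least 20 degrees.
print(process_message(b'{"temperature": 23.5, "unit": "C"}', "temperature", 20.0))
print(process_message(b'{"temperature": 12}', "temperature", 20.0))
```

In the plugin, the equivalent selection and extraction are expressed in the query entered in the "Edit field" rather than in code.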

Stream persistence and replay#

The stream persistence feature allows all messages received on a stream to be saved to disk and then replayed to a new topic at a later point in time. Persistence can be turned on or off at any point for any stream, if the feature is available.

When used on a stream that has processing configured, the processed payload is stored, not the original payload.

Stream persistence is not available for 32-bit platforms.

Replaying#

Replaying a stream means republishing its messages on a different topic. Only one replay can run per stream at a time. Replaying is only possible if persistence is turned on.

To replay a stream in the Management Center, navigate to the stream overview page and click the play button to the right of your stream.

image

A dialog opens in which you can set the parameters for the replay:

image

Replay topic: The topic on which the replayed messages are published.

gte/lte: Optional. Use UTC timestamps to restrict the replay to a time interval. "gte" = greater than or equal. "lte" = less than or equal.

Reverse: If active, the message stream will be replayed in reverse.

Limit: Optional. Limit the number of messages replayed. This number represents a maximum value and may not be reached if there are not enough messages persisted. A value of -1, the default, means that no numerical limit will be applied.

Speed: Optional. Change the speed at which the messages are replayed. This can be one of the strings "original" or "fastest", or a number indicating the factor. For example, a speed of 2.5 means replay at 2.5x the original speed.
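How these parameters combine can be sketched with a small simulation. This is an illustration under stated assumptions, not the plugin's implementation: the function name `plan_replay`, timestamps expressed in seconds, and the order of operations (filter by time, then reverse, then apply the limit) are all assumptions. The meaning of "original", "fastest", a numeric factor, and the default limit of -1 follows the descriptions above.

```python
from typing import List, Optional, Sequence, Tuple, Union

Message = Tuple[float, bytes]  # (UTC timestamp in seconds, payload); unit is assumed


def plan_replay(
    messages: Sequence[Message],
    gte: Optional[float] = None,
    lte: Optional[float] = None,
    reverse: bool = False,
    limit: int = -1,
    speed: Union[str, float] = "original",
) -> List[Tuple[float, bytes]]:
    """Return (delay_before_publish_in_seconds, payload) pairs."""
    # Keep only messages inside the optional [gte, lte] interval.
    selected = [
        m for m in sorted(messages, key=lambda m: m[0])
        if (gte is None or m[0] >= gte) and (lte is None or m[0] <= lte)
    ]
    if reverse:
        selected.reverse()
    if limit >= 0:  # -1 means no numerical limit
        selected = selected[:limit]

    if speed == "fastest":
        factor = None  # no waiting between messages
    elif speed == "original":
        factor = 1.0
    else:
        factor = float(speed)
        if factor <= 0:
            raise ValueError("speed factor must be positive")

    plan = []
    previous = None
    for timestamp, payload in selected:
        if previous is None or factor is None:
            delay = 0.0
        else:
            # The original gap between two messages, scaled by the factor.
            delay = abs(timestamp - previous) / factor
        plan.append((delay, payload))
        previous = timestamp
    return plan


stored = [(100.0, b"a"), (105.0, b"b"), (110.0, b"c"), (120.0, b"d")]
print(plan_replay(stored, gte=105, lte=120, speed=2.5))
print(plan_replay(stored, reverse=True, limit=2, speed="fastest"))
```

With a speed of 2.5, a 5 second gap between two stored messages becomes a 2 second gap in the replay, matching the "2.5x the original speed" example.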

Time to Live (TTL)#

When persistence is enabled for a stream, its Time to Live (TTL) property applies, which allows the disk usage of streams to be limited. Setting TTL to 0 means that all messages received will be kept on disk forever, unless they are manually cleared. Setting TTL to a positive integer means that messages will be removed from the persistence store in the future. TTL is measured in seconds, and guarantees that messages that are younger than that number of seconds will be kept in the database. Messages older than the TTL interval are not immediately removed from the database: they can remain in the database until at most twice the TTL interval before they are removed, depending on when they were received.
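The retention rule can be written out as a small function. This is a sketch of the rule as described here, not code from the plugin; the function name and the three state labels are made up for illustration, and treating a message at exactly twice the TTL as removed is an assumption about the boundary.

```python
def retention_state(age_seconds: float, ttl: int) -> str:
    """Classify a persisted message by its age relative to the stream TTL."""
    if ttl < 0:
        raise ValueError("TTL must be 0 or a positive number of seconds")
    if ttl == 0:
        return "kept"            # TTL 0: kept until manually cleared
    if age_seconds < ttl:
        return "kept"            # younger than TTL: guaranteed to be kept
    if age_seconds < 2 * ttl:
        return "may be removed"  # between TTL and 2x TTL: removal is pending
    return "removed"             # gone by twice the TTL interval


# Example with a TTL of 60 seconds.
for age in (30, 60, 119, 120):
    print(age, retention_state(age, 60))
```

For example, with a TTL of 3600, every message from the last hour is available for replay, while a message received 90 minutes ago may or may not still be present.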

Clear stream messages#

Clears all persisted stream data from disk.

image

Delete stream#

Deletes the whole stream.

image