Streams
Cedalo Stream Processing plugin for Mosquitto
The Cedalo Stream Processing (CSP) plugin allows the creation of topic streams in the Mosquitto broker. A topic stream receives all messages on a specified topic and can then perform the following tasks:
- Republish messages on a different topic, with optional QoS and retain control
- Persist messages to disk, with the ability to replay messages in the future
- Process JSON payload to extract particular data values prior to republishing
- Apply aggregation functions to data prior to republishing/persisting
- Select which messages are processed/persisted based on data values in the payload
The plugin is controlled through an MQTT topic-based API. The Management Center for Mosquitto web tool provides a convenient front end to the API, the mosquitto_ctrl tool provides command-line access to it, and custom tools can also be written to control it.
Quick start
To activate the plugin, make sure your configuration file at /mosquitto/config/mosquitto.conf contains the following lines:
plugin /usr/lib/cedalo_stream_processing.so
plugin_opt_data_dir /mosquitto/data/csp
After the start of Mosquitto the logs should show your license information:
CSP: Cedalo Streaming Plugin initializing.
CSP: License serial: decc1940-a806-11eb-ac1e-9d31c1d948ef
CSP: License issued by: Cedalo AG
CSP: License issued to: info@cedalo.com
CSP: License comment: This is a demo license.
CSP: Licensed stream count: 5 AVAILABLE until 2022-04-28T10:48:22
CSP: Processing AVAILABLE until 2022-04-28T10:48:22
CSP: Persistence AVAILABLE until 2022-04-28T10:48:22
Changing the configuration is only necessary if you add the Streams plugin manually at a later stage.
Stream processing
The stream processing feature allows messages with JSON payloads to be modified before they are republished and/or persisted to disk. This is managed with a user defined query that has similar concepts to a SQL statement.
API description
The CSP plugin is controlled over the $CONTROL/stream-processing/v1 topic, with replies published to $CONTROL/stream-processing/v1/response. You should ensure that your access control solution denies access to these topics for unauthorized users.
Commands are sent as JSON payloads, as described in the command sections below. The examples in each section show only a single command in the command array, but multiple commands can be sent at once and will be processed in order.
Commands will generate a response on the response topic. Unless otherwise specified, this will consist of the command string, the optional correlationData string, and an error string. If the command completed successfully, the error string will not be present. Where a command returns data in the response, there will be a data object with contents as described in the individual command sections.
Response payload:
{
"responses":[
{
"command": "getStreamMessageCount",
"correlationData": "3c079967-5bee-4409-aa9f-a963180cde94",
"error": "Persistence not configured for this stream"
}
]
}
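To illustrate the command and response envelopes described above, the following Python sketch wraps several commands into one payload and picks the failed entries out of a reply. The helper names build_payload and failed_responses and the stream name "example" are made up for this illustration, and publishing the payload with an MQTT client is deliberately left out.

```python
import json
import uuid

CONTROL_TOPIC = "$CONTROL/stream-processing/v1"
RESPONSE_TOPIC = CONTROL_TOPIC + "/response"


def build_payload(commands):
    """Wrap command objects in a "commands" array, adding correlationData if missing."""
    prepared = []
    for command in commands:
        command = dict(command)  # do not modify the caller's dict
        command.setdefault("correlationData", str(uuid.uuid4()))
        prepared.append(command)
    return json.dumps({"commands": prepared})


def failed_responses(response_payload):
    """Return (command, correlationData, error) for each response carrying an error."""
    responses = json.loads(response_payload).get("responses", [])
    return [
        (r.get("command"), r.get("correlationData"), r["error"])
        for r in responses
        if "error" in r
    ]


# Two commands sent at once; they are processed in order.
payload = build_payload([
    {"command": "getStreamMessageCount", "streamname": "example"},
    {"command": "getLicense"},
])

# The response payload shown above, as it would arrive on RESPONSE_TOPIC.
reply = json.dumps({"responses": [{
    "command": "getStreamMessageCount",
    "correlationData": "3c079967-5bee-4409-aa9f-a963180cde94",
    "error": "Persistence not configured for this stream",
}]})
print(failed_responses(reply))
```

A response without an error member is treated as successful, which matches the rule that the error string is absent on success.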
Modifying the payload with select statements
When processing is enabled the JSON payload can be manipulated using select
statements. If no select statements are defined, then the payload will be left
unmodified. If select statements are defined, then only the parts of the
message that are defined in the select will be added to the outgoing payload,
and only messages that match the select specification will be republished or
persisted.
The following examples demonstrate how to define the select queries. Only the select part of the command payload is shown; see the createStream command for the full command payload requirement.
The example incoming payload is:
{
"Machine Data": {
"Speed": 7.5,
"Angle Deg": 55.3,
"Angle Rad": 0.97,
"Power": 601.6
},
"Location": "Freiburg"
}
A basic select picks a JSON object from the source payload and optionally places it in the target payload. To access items in the JSON object, each element of the object hierarchy is described by the object name in square brackets. For example, to select the Speed element the source item would be set to [Machine Data][Speed]. The same technique is used when choosing the target location. It is perfectly valid to have the source and target paths be the same.
Basic payload manipulation
This example extracts the Speed and makes it a top level object, keeps the Location and discards all other payload entries.
"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Speed]"
},{
"source": "[Location]",
"target": "[Location]"
}
]
With the example payload this would result in the following output payload:
{
"Speed": 7.5,
"Location": "Freiburg"
}
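The effect of this select can be modelled in a few lines of Python. This is only a sketch of the behaviour described above, not the plugin's implementation; parse_path and apply_select are hypothetical helper names, and returning None for a message with a missing source path is an assumption based on the statement that only matching messages are republished or persisted.

```python
import re


def parse_path(path):
    """Split a path such as "[Machine Data][Speed]" into its keys."""
    return re.findall(r"\[([^\]]+)\]", path)


def apply_select(payload, select):
    """Return a payload holding only the selected values, or None if a
    source path is missing from the incoming payload."""
    output = {}
    for item in select:
        value = payload
        try:
            for key in parse_path(item["source"]):
                value = value[key]
        except (KeyError, TypeError):
            return None  # the message does not match the select specification
        target_keys = parse_path(item.get("target", ""))
        if not target_keys:
            continue  # no target: the value is not placed in the output
        node = output
        for key in target_keys[:-1]:
            node = node.setdefault(key, {})  # create intermediate objects
        node[target_keys[-1]] = value
    return output


incoming = {
    "Machine Data": {"Speed": 7.5, "Angle Deg": 55.3, "Angle Rad": 0.97, "Power": 601.6},
    "Location": "Freiburg",
}
select = [
    {"source": "[Machine Data][Speed]", "target": "[Speed]"},
    {"source": "[Location]", "target": "[Location]"},
]
print(apply_select(incoming, select))  # {'Speed': 7.5, 'Location': 'Freiburg'}
```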
$-topics (topics beginning with $) cannot be used with the streams plugin.
Select Aliases
When defining a select statement, an alias can be created. This is a text string, which must not begin or end with a square bracket, that can be used in a where statement instead of the full JSON path.
"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Speed]"
},{
"source": "[Location]",
"target": "[Location]",
"alias": "location"
}
]
Aggregate functions
Simple time aggregations can be defined for select statements for calculations over a time bucket. The available functions are min, max, sum, count, and time_bucket. When a function is defined, the current value for each select statement is stored, and when a message is received, the payload is used to calculate the new value depending on the function in use. A new message will be republished/persisted for every message that is received, and will contain the current values. The current time bucket can be added to the target payload to identify which bucket a message is in.
The time_bucket function defines the parameters of the time bucket to be used. It has the parameters intervalsize and intervaloffset, which are both in seconds. intervalsize defines the length of the time bucket and must be greater than 0. intervaloffset defines the start offset of the time bucket. So to create a 60 second time interval that starts at 15 seconds into the minute you would use:
"select": [
{
"source": "", # source is ignored for the time_bucket function
"target": "", # set to a JSON path to output to the payload, or blank to ignore
"function": {
"name": "time_bucket",
"intervalsize": 60,
"intervaloffset": 15
}
}
]
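One way to picture how intervalsize and intervaloffset place a timestamp in a bucket is the sketch below. It assumes buckets are aligned to the Unix epoch and shifted by the offset, which is consistent with "starts at 15 seconds into the minute" but is not confirmed by this documentation; bucket_bounds is a hypothetical helper name.

```python
def bucket_bounds(timestamp, intervalsize, intervaloffset=0):
    """Return (start, end) of the bucket containing timestamp, in seconds.

    Assumes epoch-aligned buckets shifted by intervaloffset.
    """
    if intervalsize <= 0:
        raise ValueError("intervalsize must be greater than 0")
    start = (timestamp - intervaloffset) // intervalsize * intervalsize + intervaloffset
    return start, start + intervalsize


# A 60 second bucket starting 15 seconds into the minute:
print(bucket_bounds(100, 60, 15))  # (75, 135)
print(bucket_bounds(74, 60, 15))   # (15, 75)
```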
Only one time_bucket is allowed per query. The other functions are:
- max: the maximum value in the time bucket
- min: the minimum value in the time bucket
- sum: the sum of all values in the time bucket
- count: the count of messages received in the time bucket. This function does not require a source.
A more complete example:
"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Data][SpeedMax]",
"alias" : "speedmax",
"function": {
"name" : "max"
}
},
{
"source": "[Machine Data][Power]",
"target": "[Data][PowerSum]",
"alias" : "powersum",
"function": {
"name" : "sum"
}
},
{
"target": "[Data][Count]",
"alias" : "IntervalCount",
"function": {
"name" : "count"
}
},
{
"source": "*",
"target": "",
"alias" : "timeInterval",
"function": {
"name" : "time_bucket",
"intervalsize": 60,
"intervaloffset": 30
}
},
{
"source": "[Location]",
"target": "[Location]",
"alias" : "Location"
}
]
This would produce the output:
{
"Data": {
"SpeedMax": 10,
"PowerSum": 123,
"Count": 3
},
"Location": "Freiburg"
}
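The running nature of these aggregates can be sketched as follows. The three incoming messages are invented for illustration and chosen so that the last output equals the payload above; the aggregate helper is hypothetical and only models max, sum, and count within a single time bucket.

```python
def aggregate(messages):
    """Return the payload produced after each message within one time bucket."""
    speed_max = None
    power_sum = 0
    count = 0
    outputs = []
    for message in messages:
        speed = message["Machine Data"]["Speed"]
        speed_max = speed if speed_max is None else max(speed_max, speed)
        power_sum += message["Machine Data"]["Power"]
        count += 1
        # One output per received message, carrying the current values.
        outputs.append({
            "Data": {"SpeedMax": speed_max, "PowerSum": power_sum, "Count": count},
            "Location": message["Location"],
        })
    return outputs


# Hypothetical messages that all fall into the same bucket.
messages = [
    {"Machine Data": {"Speed": 7.5, "Power": 40}, "Location": "Freiburg"},
    {"Machine Data": {"Speed": 10, "Power": 41}, "Location": "Freiburg"},
    {"Machine Data": {"Speed": 9, "Power": 42}, "Location": "Freiburg"},
]
for output in aggregate(messages):
    print(output)
```

Each received message yields one output, so three messages produce three payloads, with the last one holding SpeedMax 10, PowerSum 123, and Count 3.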
The processing feature has some internal sources that can be used:
- timestamp: the current Unix timestamp in seconds
- windowstart: the start of the current time bucket, if enabled
- windowend: the end of the current time bucket, if enabled
For example:
"select": [
{
"source": "timestamp",
"target": "[Data][Now]"
}
]
Choosing which messages are used with where statements
It is possible to restrict which messages are processed and persisted by using a where statement. The members of the where object are described in the createStream and modifyStream section.
Stream persistence and replay
The stream persistence feature allows all messages received on a stream to be saved to disk and then replayed to a new topic at a later point in time. Persistence can be turned on or off at any point for any stream, if the feature is available.
When used on a stream that has processing configured, the processed payload is stored, not the original payload.
Stream persistence is not available for 32-bit platforms.
Replaying
Replaying a stream means republishing its messages on a different topic. A single replay may be running for each stream at once.
To replay a stream, use the replayStream command. The only requirement is to specify the destination topic. Further control is possible using the gte, lte, limit, speed, and reverse parameters.
If only the replaytopic parameter is provided, all of the messages stored for the stream will be republished in order, with the first message published immediately and subsequent messages published at approximately the same interval as the original messages.
The range and number of messages to be replayed can be limited using gte, lte, and limit. gte and lte are the lower and upper bounds of the time range that should be replayed, respectively. They are both Unix timestamps in seconds. Both are optional, so it is possible to specify "all messages up to this point", "all messages since this point", or "only messages within this range". The limit parameter limits the total number of messages that will be replayed, and can be used in conjunction with gte and/or lte, or on its own. The default value of -1 means there is no numerical limit.
The speed of the replay can be controlled using the speed parameter. Set it to the string "original" to keep the original speed, or to "fastest" to republish at the fastest rate possible. Alternatively, set it to a number to specify an exact speed multiple, e.g. setting it to 2.5 would replay the stream at 2.5x the original speed.
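The effect of the speed parameter on message spacing can be sketched as below. This is a model of the description above rather than the plugin's scheduler; replay_delays is a hypothetical helper, and the timestamps are invented.

```python
def replay_delays(timestamps, speed="original"):
    """Return the delay in seconds before each message is republished.

    timestamps are the original arrival times in seconds, in order.
    """
    if not timestamps:
        return []
    if speed == "fastest":
        return [0.0] * len(timestamps)
    factor = 1.0 if speed == "original" else float(speed)
    if factor <= 0:
        raise ValueError("speed factor must be positive")
    delays = [0.0]  # the first message is published immediately
    for previous, current in zip(timestamps, timestamps[1:]):
        delays.append((current - previous) / factor)
    return delays


print(replay_delays([0, 10, 15], "original"))  # [0.0, 10.0, 5.0]
print(replay_delays([0, 10, 15], 2.5))         # [0.0, 4.0, 2.0]
print(replay_delays([0, 10, 15], "fastest"))   # [0.0, 0.0, 0.0]
```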
An in-progress replay for a stream can be stopped using the stopStreamReplay command.
Message count
It is possible to obtain the estimated number of messages currently persisted using the getStreamMessageCount command.
Clearing messages
Persisted messages can be manually cleared using the clearStreamMessages API command. This completely removes the messages from disk and cannot be undone. All messages can be cleared, or a range of messages can be cleared based on the message timestamp, given as a Unix timestamp in seconds. Use the gte parameter to specify the lower bound of the range to be cleared, and the lte parameter to specify the upper bound. See the API description for details of gte and lte.
Time to Live (TTL)
When creating a new stream and enabling persistence, the Time to Live (TTL) property will be used, which allows the disk usage of streams to be limited. Setting TTL to 0 means that all messages received will be kept on disk forever, unless they are manually cleared. Setting TTL to a positive integer means that messages will be removed from the persistence store in the future. TTL is measured in seconds, and guarantees that messages that are younger than that number of seconds will be kept in the database. Messages older than the TTL interval are not immediately removed from the database: they can remain in the database until at most twice the TTL interval before they are removed, depending on when they were received.
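The retention guarantee can be summarised with a small sketch. retention_state is a hypothetical helper, and treating a message whose age equals the TTL exactly as removable is an assumption; the text above only guarantees messages younger than the TTL.

```python
def retention_state(age, ttl):
    """Classify a persisted message by its age in seconds for a given TTL."""
    if ttl == 0:
        return "kept"  # TTL 0: kept until manually cleared
    if age < ttl:
        return "kept"  # guaranteed to still be in the store
    if age <= 2 * ttl:
        return "may be removed"  # removal happens somewhere in this window
    return "removed"


ttl = 86400  # one day
for age in (3600, 100000, 200000):
    print(age, retention_state(age, ttl))
```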
clearStreamMessages
Clear persisted messages within a defined range.
Parameters:
- streamname: the name of the stream to be cleared.
- gte: optional lower bound of the range to be cleared, i.e. all messages more recent than this will be cleared. If not present or set to "", then no lower bound will be used. Unix timestamp in seconds.
- lte: optional upper bound of the range to be cleared, i.e. all messages older than this will be cleared. If not present or set to "", then no upper bound will be used. Unix timestamp in seconds.
- correlationData: optional string that can be used to correlate commands with responses. Use a random/unique value.
Non-code examples:
# Clear all messages that are more recent than 1620131088.
clearStreamMessages gte=1620131088
# Clear all messages that are older than 1620131088.
clearStreamMessages lte=1620131088
# Clear all messages that are more recent than 1620044688 and are also older
# than 1620131088.
clearStreamMessages gte=1620044688 lte=1620131088
Command payload:
{
"commands": [
{
"command": "clearStreamMessages",
"streamname": "<name of stream>",
"gte": <optional lower bound as unix timestamp number>,
"lte": <optional upper bound as unix timestamp number>,
"correlationData": ""
}
]
}
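Because gte and lte are optional, a client can simply leave them out of the payload. The following sketch builds the command accordingly; clear_stream_messages is a hypothetical helper name, the stream name "example" is invented, and the check that gte does not exceed lte is a client-side sanity check added for this illustration, not documented plugin behaviour.

```python
import json


def clear_stream_messages(streamname, gte=None, lte=None, correlation_data=""):
    """Build a clearStreamMessages payload, omitting bounds that are None."""
    if gte is not None and lte is not None and gte > lte:
        raise ValueError("gte must not be greater than lte")
    command = {"command": "clearStreamMessages", "streamname": streamname}
    if gte is not None:
        command["gte"] = gte
    if lte is not None:
        command["lte"] = lte
    command["correlationData"] = correlation_data
    return json.dumps({"commands": [command]})


# Clear messages between the two timestamps used in the examples above.
print(clear_stream_messages("example", gte=1620044688, lte=1620131088))
# Clear everything stored for the stream.
print(clear_stream_messages("example"))
```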
createStream and modifyStream
Create a new stream, or modify an existing stream.
When modifying a stream, only parameters that are present in the JSON payload are updated.
Parameters:
- streamname: the name of the stream to be created/modified.
- correlationData: optional string that can be used to correlate commands with responses. Use a random/unique value.
- sourcetopic: the MQTT topic that will be used as the source of messages.
- targettopic: optional MQTT topic where stream messages will be republished.
- targetqos: optional QoS that will be used when republishing messages. If set to -1 then the original QoS will be used.
- ttl: optional Time to Live in seconds that will be used when persisting messages.
- active: boolean, if set to true the stream will be enabled and operational. If set to false, none of the stream features will be used.
- process: boolean, if set to true the stream processing feature will be enabled. If set to false, stream processing will be disabled and payloads will be republished without modification.
- persist: boolean, if set to true the stream persistence feature will be enabled. If set to false, stream persistence will be disabled and messages will not be persisted to disk.
- textname: optional user string to name the stream.
- textdescription: optional user string to describe the stream.
- query: optional object used with the processing feature, can contain a select and/or a where array.
  - select: optional query array.
  - where: optional query array. At the moment this array can only contain a single object with operator, left, and right members. The left string is a JSON path field or alias. The right item can only be a value and can be a string, number, or boolean. The operator is a string which can be one of ==, !=, >, <, >=, or <=.
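The where comparison can be pictured with the following sketch. It assumes the left-hand path or alias has already been resolved to a value from the message and collected in a dictionary; matches and OPERATORS are hypothetical names, and this is a model of the description above rather than the plugin's own evaluation logic.

```python
import operator

# The six operators listed for the where object.
OPERATORS = {
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
}


def matches(where, values):
    """Evaluate a single-object where array.

    values maps a JSON path or alias to the value taken from the message.
    """
    condition = where[0]  # only one condition object is supported at the moment
    compare = OPERATORS[condition["operator"]]
    return compare(values[condition["left"]], condition["right"])


where = [{"operator": ">", "left": "speedmax", "right": 8}]
print(matches(where, {"speedmax": 10}))   # True
print(matches(where, {"speedmax": 7.5}))  # False
```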
Command payload:
{
"commands": [
{
"command": "createStream",
"streamname": "<name>",
"correlationData": "",
"sourcetopic": "",
"targettopic": "",
"targetqos": -1|0|1|2,
"ttl": 86400,
"active": true|false,
"process": true|false,
"persist": true|false,
"textname": "",
"textdescription": "",
"query": {
"select": [
{
"source": "",
"target": "",
"alias": "",
"function": {
"name": ""
}
}
],
"where": [
{
"operator": "==",
"left": "",
"right": ""
}
]
}
}
]
}
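As a concrete instance of the template above, this sketch assembles a createStream payload that republishes only the Speed and Location values when the location is Freiburg. The stream name and topics are invented for the example, create_stream is a hypothetical helper, and the fixed values for targetqos, ttl, active, process, and persist are choices made for this illustration.

```python
import json


def create_stream(streamname, sourcetopic, targettopic, select, where=None):
    """Build a createStream payload with processing enabled."""
    query = {"select": select}
    if where is not None:
        query["where"] = [where]  # the where array holds a single object
    command = {
        "command": "createStream",
        "streamname": streamname,
        "correlationData": "",
        "sourcetopic": sourcetopic,
        "targettopic": targettopic,
        "targetqos": -1,  # keep the original QoS
        "ttl": 86400,
        "active": True,
        "process": True,
        "persist": False,
        "query": query,
    }
    return json.dumps({"commands": [command]}, indent=2)


payload = create_stream(
    "machine-speed",           # hypothetical stream name
    "factory/machine1",        # hypothetical source topic
    "factory/machine1/speed",  # hypothetical target topic
    select=[
        {"source": "[Machine Data][Speed]", "target": "[Speed]"},
        {"source": "[Location]", "target": "[Location]", "alias": "location"},
    ],
    where={"operator": "==", "left": "location", "right": "Freiburg"},
)
print(payload)
```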
deleteStream
Delete a stream. This cannot be undone. If the stream is currently carrying out a replay, the replay will be stopped.
Parameters:
- streamname: the name of the stream to be deleted.
- correlationData: optional string that can be used to correlate commands with responses. Use a random/unique value.
Command payload:
{
"commands": [
{
"command": "deleteStream",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}
disableStream
Disable an existing stream. Enabling / setting active here means that the stream will accept messages and process, persist, and republish them as it is configured. If a stream is disabled / set inactive then it will not process, persist, or republish messages.
Parameters:
- streamname: the name of the stream to be disabled.
- correlationData: optional string that can be used to correlate commands with responses. Use a random/unique value.
Command payload:
{
"commands": [
{
"command": "disableStream",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}
enableStream
Enable an existing stream. Enabling / setting active here means that the stream will accept messages and process, persist, and republish them as it is configured. If a stream is disabled / set inactive then it will not process, persist, or republish messages.
Parameters:
- streamname: the name of the stream to be enabled.
- correlationData: optional string that can be used to correlate commands with responses. Use a random/unique value.
Command payload:
{
"commands": [
{
"command": "enableStream",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}
getLicense
Retrieve the current licensed features.
Command payload:
{
"commands": [
{
"command": "getLicense",
"correlationData": ""
}
]
}
Response payload:
{
"responses":[
{
"command": "getLicense",
"correlationData": "",
"data": {
"issuedBy": "Cedalo AG",
"issuedTo": "name@customer.com",
"comment": "Comment",
"serial": "00000000-0000-0000-0000-000000000000",
"features": [
{
"name": "stream-count",
"version": "1.0",
"validSince": 1610219382000,
"validUntil": 1610305782000,
"count": -1
},
{
"name": "stream-processing",
"version": "1.0",
"validSince": 1610219382000,
"validUntil": 1610305782000
},
{
"name": "stream-persistence",
"version": "1.0",
"validSince": 1610219382000,
"validUntil": 1610305782000
}
]
}
}
]
}
getStream
Get information on a stream.
Parameters:
- streamname: the name of the stream to be queried.
- correlationData: optional string that can be used to correlate commands with responses. Use a random/unique value.
Command payload:
{
"commands": [
{
"command": "getStream",
"streamname": "<name>",
"correlationData": ""
}
]
}
Response payload:
{
"responses":[
{
"command": "getStream",
"correlationData": "",
"data": {
"streamname": "<name>",
"sourcetopic": "",
"targettopic": "",
"textname": "",
"textdescription": "",
"targetqos": -1,
"ttl": 86400,
"active": true|false,
"process": true|false,
"persist": true|false,
"replaying": true|false,
"query": {
# Query spec
}
}
}
]
}
getStreamMessageCount
Retrieve the approximate count of messages that are currently persisted.
Parameters:
- streamname: the name of the stream to be queried.
- correlationData: optional string that can be used to correlate commands with responses. Use a random/unique value.
Command payload:
{
"commands": [
{
"command": "getStreamMessageCount",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}
Response payload:
{
"responses":[
{
"command": "getStreamMessageCount",
"correlationData": "",
"data": {
"streamname": "<name of stream>",
"count": <count>
}
}
]
}
listStreams
List all configured streams.
Parameters:
- verbose: boolean, if set to false a simple list of stream names will be returned. If set to true, the list of streams will be returned as objects in the same format as when using the getStream command.
- count: optional maximum number of streams to return.
- offset: optional offset to start the stream list.
- correlationData: optional string that can be used to correlate commands with responses. Use a random/unique value.
Command payload:
{
"commands": [
{
"command": "listStreams",
"streamname": "<name>",
"verbose": true|false,
"count": <number>,
"offset": <number>,
"correlationData": ""
}
]
}
Response payload:
{
"responses":[
{
"command": "listStreams",
"correlationData": "",
"data": {
"streamname": "<name>",
"sourcetopic": "",
"targettopic": "",
"textname": "",
"textdescription": "",
"targetqos": -1|0|1|2,
"ttl": 86400,
"active": true|false,
"process": true|false,
"persist": true|false,
"replaying": true|false,
"query": {
# Query spec
}
}
}
]
}
replayStream
Replay a persisted stream to a different topic.
Parameters:
- streamname: the name of the stream to be replayed.
- replaytopic: the topic where the messages will be published.
- gte: optional lower bound of the range to be replayed, i.e. all messages more recent than this will be replayed. If not present or set to "", then no lower bound will be used. Unix timestamp in seconds.
- lte: optional upper bound of the range to be replayed, i.e. all messages older than this will be replayed. If not present or set to "", then no upper bound will be used. Unix timestamp in seconds.
- limit: optionally limit the number of messages replayed. This number represents a maximum value and may not be reached if there are not enough messages persisted. A value of -1, the default, means that no numerical limit will be applied.
- speed: optionally change the speed at which the messages are replayed. This can be one of the strings "original" or "fastest", or a number indicating the factor. For example, a speed of 2.5 means replay at 2.5x the original speed.
- reverse: optionally set to true to play the messages in reverse order.
- correlationData: optional string that can be used to correlate commands with responses. Use a random/unique value.
Command payload:
{
"commands": [
{
"command": "replayStream",
"streamname": "<name of stream>",
"replaytopic": "<replay destination topic>",
"gte": <optional lower bound as unix timestamp number>,
"lte": <optional upper bound as unix timestamp number>,
"limit": <optional message limit, number>,
"speed": <optional replay speed, "original", "fastest", or number>,
"reverse": <optional reverse playback mode, boolean>,
"correlationData": ""
}
]
}
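A filled-in replayStream command might be assembled as in the sketch below. replay_stream is a hypothetical helper, the stream and topic names are invented, and rejecting unknown speed strings on the client side is an addition for this example rather than documented plugin behaviour.

```python
import json


def replay_stream(streamname, replaytopic, gte=None, lte=None,
                  limit=-1, speed="original", reverse=False):
    """Build a replayStream payload; gte and lte are omitted when None."""
    if isinstance(speed, str) and speed not in ("original", "fastest"):
        raise ValueError('speed must be "original", "fastest", or a number')
    command = {
        "command": "replayStream",
        "streamname": streamname,
        "replaytopic": replaytopic,
    }
    if gte is not None:
        command["gte"] = gte
    if lte is not None:
        command["lte"] = lte
    command.update({
        "limit": limit,
        "speed": speed,
        "reverse": reverse,
        "correlationData": "",
    })
    return json.dumps({"commands": [command]})


# Replay at most 100 messages received since 1620044688 at 2.5x speed.
print(replay_stream("example", "replay/example", gte=1620044688, limit=100, speed=2.5))
```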
stopStreamReplay
Stop a stream replay, if it is running.
Parameters:
- streamname: the name of the stream to be stopped.
- correlationData: optional string that can be used to correlate commands with responses. Use a random/unique value.
Command payload:
{
"commands": [
{
"command": "stopStreamReplay",
"streamname": "<name of stream>",
"correlationData": ""
}
]
}