Version: Mosquitto 3.0

Streams Select Query

When processing is enabled the JSON payload can be manipulated using select statements. If no select statements are defined, then the payload will be left unmodified. If select statements are defined, then only the parts of the message that are defined in the select will be added to the outgoing payload, and only messages that match the select specification will be republished or persisted.

The following examples demonstrate how to define select queries. Only the select part of the command payload is shown; see the createStream command for the full command payload requirements.

The example incoming payload is:

 {
"Machine Data": {
"Speed": 7.5,
"Angle Deg": 55.3,
"Angle Rad": 0.97,
"Power": 601.6
},
"Location": "Freiburg"
}

A basic select picks a JSON object from the source payload and optionally places it in the target payload. To access items in the JSON object each element of the object hierarchy is described by the object name in square brackets. For example, to select the Speed element the source item would be set to [Machine Data][Speed]. The same technique is used when choosing the target location. It is perfectly valid to have the source and target paths be the same.
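To make the bracket notation concrete, here is a minimal Python sketch of how such a path could be split into keys and resolved against the example payload. The helper names `parse_path` and `get_path` are hypothetical and only illustrate the notation; they are not part of the streams plugin.

```python
import re

def parse_path(path):
    """Split a bracketed path such as "[Machine Data][Speed]" into its keys."""
    keys = re.findall(r"\[([^\[\]]+)\]", path)
    # Reject anything that is not made up purely of [name] segments.
    if not keys or "".join(f"[{k}]" for k in keys) != path:
        raise ValueError(f"not a bracketed path: {path!r}")
    return keys

def get_path(payload, path):
    """Walk the payload one key at a time and return the selected value."""
    node = payload
    for key in parse_path(path):
        node = node[key]
    return node

payload = {
    "Machine Data": {"Speed": 7.5, "Angle Deg": 55.3, "Angle Rad": 0.97, "Power": 601.6},
    "Location": "Freiburg",
}

print(parse_path("[Machine Data][Speed]"))
print(get_path(payload, "[Machine Data][Speed]"))
```

Note that key names may contain spaces, which is why each level is delimited by brackets rather than by dots.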

Basic payload manipulation

This example extracts the Speed and makes it a top level object, keeps the Location and discards all other payload entries.

"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Speed]"
},{
"source": "[Location]",
"target": "[Location]"
}
]

With the example payload this would result in the following output payload:

{
"Speed": 7.5,
"Location": "Freiburg"
}
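The transformation above can be mimicked in a few lines of Python. This is only a sketch of the described behaviour, not the plugin's implementation: `apply_select` is a hypothetical helper, and returning `None` when a source path is missing is an assumption based on the statement that only matching messages are republished.

```python
import re

def keys_of(path):
    """Return the key names in a bracketed path like "[A][B]"."""
    return re.findall(r"\[([^\[\]]+)\]", path)

def apply_select(payload, select):
    """Build a new payload that contains only the selected items."""
    out = {}
    for item in select:
        node = payload
        for key in keys_of(item["source"]):
            if not isinstance(node, dict) or key not in node:
                return None  # assumption: a non-matching message is dropped
            node = node[key]
        target_keys = keys_of(item.get("target", ""))
        if not target_keys:
            continue  # no target: the value is not placed in the output
        dest = out
        for key in target_keys[:-1]:
            dest = dest.setdefault(key, {})  # create intermediate objects
        dest[target_keys[-1]] = node
    return out

payload = {
    "Machine Data": {"Speed": 7.5, "Angle Deg": 55.3, "Angle Rad": 0.97, "Power": 601.6},
    "Location": "Freiburg",
}
select = [
    {"source": "[Machine Data][Speed]", "target": "[Speed]"},
    {"source": "[Location]", "target": "[Location]"},
]

result = apply_select(payload, select)
print(result)
```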
Note: topics beginning with $ cannot be used with the streams plugin.

Select Aliases

When defining a select statement, an alias can be created. This is a text string, which must not begin or end with a square bracket, that can be used in a where statement instead of the full JSON path.

"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Speed]"
},{
"source": "[Location]",
"target": "[Location]",
"alias": "location"
}
]

Aggregate functions

Simple time aggregations can be defined for select statements to perform calculations over a time bucket. The available functions are min, max, sum, count, and time_bucket. When a function is defined, the current value for each select statement is stored, and when a message is received, its payload is used to calculate the new value according to the function in use. A new message is republished/persisted for every message that is received and contains the current values. The current time bucket can be added to the target payload to identify which bucket a message belongs to.

The time_bucket function defines the parameters of the time bucket to be used. It has the parameters intervalsize and intervaloffset, which are both in seconds. intervalsize defines the length of the time bucket and must be greater than 0. intervaloffset defines the start offset of the time bucket. So to create a 60 second time interval that starts at 15 seconds into the minute you would use:

"select": [
{
"source": "", # source is ignored for the time_bucket function
"target": "", # set to a JSON path to output to the payload, or blank to ignore
"function": {
"name": "time_bucket",
"intervalsize": 60,
"intervaloffset": 15
}
}
]
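As an illustration of how intervalsize and intervaloffset relate to bucket boundaries, the following Python sketch computes the bucket that a given Unix timestamp falls into. It assumes that buckets are aligned to the Unix epoch shifted by the offset and that each bucket is half-open (start inclusive, end exclusive); the documentation above does not state this explicitly, and `bucket_bounds` is a hypothetical helper.

```python
def bucket_bounds(timestamp, intervalsize, intervaloffset=0):
    """Return (start, end) of the bucket containing timestamp, in seconds."""
    if intervalsize <= 0:
        raise ValueError("intervalsize must be greater than 0")
    # Shift by the offset, round down to a multiple of the size, shift back.
    start = (timestamp - intervaloffset) // intervalsize * intervalsize + intervaloffset
    return start, start + intervalsize

# A 60 second bucket that starts 15 seconds into the minute.
start, end = bucket_bounds(1000, intervalsize=60, intervaloffset=15)
print(start, end, start % 60)
```

Under these assumptions, every bucket start leaves a remainder of 15 when divided by 60, which corresponds to "15 seconds into the minute".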

Only one time_bucket is allowed per query. The other functions are:

  • max : the maximum value in the time bucket
  • min : the minimum value in the time bucket
  • sum : the sum of all values in the time bucket
  • count : the count of messages received in the time bucket. This function does not require a source.

A more complete example:

"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Data][SpeedMax]",
"alias" : "speedmax",
"function": {
"name" : "max"
}
},
{
"source": "[Machine Data][Power]",
"target": "[Data][PowerSum]",
"alias" : "powersum",
"function": {
"name" : "sum"
}

},
{
"target": "[Data][Count]",
"alias" : "IntervalCount",
"function": {
"name" : "count"
}

},
{
"source": "*",
"target": "",
"alias" : "timeInterval",
"function": {
"name" : "time_bucket",
"intervalsize": 60,
"intervaloffset": 30
}
},
{
"source": "[Location]",
"target": "[Location]",
"alias" : "Location"
}
]

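This would produce output of the following form, where the values depend on the messages received so far in the current time bucket: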

{
"Data": {
"SpeedMax": 10,
"PowerSum": 123,
"Count": 3
},
"Location": "Freiburg"
}
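The running calculation can be sketched in Python as follows. This is an illustrative model only: the class `BucketAggregator` is hypothetical, and it assumes that stored values are reset when a message arrives in a new time bucket and that buckets are aligned to the Unix epoch plus the offset. It emits the current values after every message, as described above.

```python
class BucketAggregator:
    """Track max, min, sum and count of values within the current time bucket."""

    def __init__(self, intervalsize, intervaloffset=0):
        self.size = intervalsize
        self.offset = intervaloffset
        self.bucket_start = None
        self._reset()

    def _reset(self):
        self.max = None
        self.min = None
        self.sum = 0
        self.count = 0

    def update(self, value, timestamp):
        """Fold one value into the aggregates and return the current values."""
        start = (timestamp - self.offset) // self.size * self.size + self.offset
        if start != self.bucket_start:
            # Assumption: a new bucket starts with fresh aggregates.
            self.bucket_start = start
            self._reset()
        self.max = value if self.max is None else max(self.max, value)
        self.min = value if self.min is None else min(self.min, value)
        self.sum += value
        self.count += 1
        return {"max": self.max, "min": self.min, "sum": self.sum,
                "count": self.count, "windowstart": start}

agg = BucketAggregator(intervalsize=60, intervaloffset=30)
outputs = [agg.update(v, t) for v, t in [(4, 100), (10, 110), (7, 120), (5, 155)]]
for out in outputs:
    print(out)
```

The first three messages fall into the same bucket, so the third output reports a count of 3; the fourth message arrives after the bucket boundary and starts again from a count of 1.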

The processing feature has some internal sources that can be used:

  • timestamp : the current Unix timestamp in seconds
  • windowstart : the start of the current time bucket, if enabled
  • windowend : the end of the current time bucket, if enabled

For example:

"select": [
{
"source": "timestamp",
"target": "[Data][Now]"
}
]

Choosing which messages are used with where statements

It is possible to restrict which messages are processed and persisted by using a where statement. The