Streams Select Query
When processing is enabled, the JSON payload can be manipulated using select
statements. If no select statements are defined, the payload is left
unmodified. If select statements are defined, only the parts of the
message that are defined in the select are added to the outgoing payload,
and only messages that match the select specification are republished or
persisted.
The following examples demonstrate how to define the select queries. Only the
select part of the command payload is shown; see the createStream command for
the full command payload requirements.
The example incoming payload is:
{
"Machine Data": {
"Speed": 7.5,
"Angle Deg": 55.3,
"Angle Rad": 0.97,
"Power": 601.6
},
"Location": "Freiburg"
}
A basic select picks a JSON object from the source payload and optionally
places it in the target payload. To access items in the JSON object, each
element of the object hierarchy is described by the object name in square
brackets. For example, to select the Speed element, the source item would be
set to [Machine Data][Speed]. The same technique is used when choosing the
target location. It is perfectly valid for the source and target paths to be
the same.
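As a rough illustration of this path notation, the Python sketch below splits a bracketed path into element names and uses them to read from or write into a nested dictionary. It is not the plugin's implementation: the helper names parse_path, get_path and set_path are hypothetical, and the sketch assumes element names never contain square brackets themselves.

```python
import re

# Hypothetical sketch: one "[name]" segment per level of the JSON hierarchy.
# Assumes element names do not themselves contain square brackets.
SEGMENT = re.compile(r"\[([^\[\]]+)\]")


def parse_path(path):
    """Split "[Machine Data][Speed]" into ["Machine Data", "Speed"]."""
    names = SEGMENT.findall(path)
    # Reject strings that are not made up entirely of [name] segments.
    if not names or "".join(f"[{n}]" for n in names) != path:
        raise ValueError(f"not a bracketed path: {path!r}")
    return names


def get_path(payload, path):
    """Walk the payload one element at a time (KeyError if absent)."""
    node = payload
    for name in parse_path(path):
        node = node[name]
    return node


def set_path(payload, path, value):
    """Create intermediate objects as needed, then store value at the path."""
    *parents, leaf = parse_path(path)
    node = payload
    for name in parents:
        node = node.setdefault(name, {})
    node[leaf] = value


example = {
    "Machine Data": {"Speed": 7.5, "Angle Deg": 55.3, "Angle Rad": 0.97, "Power": 601.6},
    "Location": "Freiburg",
}
print(get_path(example, "[Machine Data][Speed]"))  # 7.5
```

Because the same helpers serve both reading and writing, a source and a target can share one path, matching the statement above.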
Basic payload manipulation
This example extracts Speed and makes it a top-level object, keeps Location, and discards all other payload entries.
"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Speed]"
},{
"source": "[Location]",
"target": "[Location]"
}
]
With the example payload this would result in the following output payload:
{
"Speed": 7.5,
"Location": "Freiburg"
}
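The transformation above can be approximated in Python as follows. This is a minimal sketch, assuming each select entry copies one source value to one target path in a fresh output payload; the function name apply_select is hypothetical, and treating a missing source as "message does not match" (returning None) is an assumption, not documented plugin behavior.

```python
import re


def _names(path):
    # "[Machine Data][Speed]" -> ["Machine Data", "Speed"]; "" -> []
    return re.findall(r"\[([^\[\]]+)\]", path)


def apply_select(select, payload):
    """Copy each source path to its target path in a new output payload.
    Anything not selected is dropped. Returning None when a source is
    missing (the message "does not match") is an assumption of this sketch."""
    out = {}
    for entry in select:
        names = _names(entry["source"])
        value = payload
        try:
            for name in names:
                value = value[name]
        except (KeyError, TypeError):
            return None
        target = _names(entry.get("target", ""))
        if not target:  # blank target: value is not written to the output
            continue
        node = out
        for name in target[:-1]:
            node = node.setdefault(name, {})
        node[target[-1]] = value
    return out


payload = {
    "Machine Data": {"Speed": 7.5, "Angle Deg": 55.3, "Angle Rad": 0.97, "Power": 601.6},
    "Location": "Freiburg",
}
select = [
    {"source": "[Machine Data][Speed]", "target": "[Speed]"},
    {"source": "[Location]", "target": "[Location]"},
]
print(apply_select(select, payload))  # {'Speed': 7.5, 'Location': 'Freiburg'}
```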
$ topics (topics whose names begin with $) cannot be used with the streams plugin.
Select Aliases
When defining a select statement, an alias can be created. An alias is a text
string, which must not begin or end with a square bracket, that can be used in
a where statement instead of the full JSON path.
"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Speed]"
},{
"source": "[Location]",
"target": "[Location]",
"alias": "location"
}
]
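A sketch of how aliases might be checked and collected, in Python. The bracket rule comes from the text above; the helper names validate_alias and alias_map are hypothetical, rejecting an empty alias is an extra assumption, and mapping each alias to its source path (rather than its target) is an assumption about what a where statement resolves.

```python
def validate_alias(alias):
    """Enforce the documented rule: no leading or trailing square bracket.
    Rejecting an empty alias is an extra assumption of this sketch."""
    if not alias or alias[0] in "[]" or alias[-1] in "[]":
        raise ValueError(f"invalid alias: {alias!r}")
    return alias


def alias_map(select):
    """Hypothetical lookup a where statement could use: alias -> source path."""
    return {validate_alias(e["alias"]): e["source"] for e in select if "alias" in e}


select = [
    {"source": "[Machine Data][Speed]", "target": "[Speed]"},
    {"source": "[Location]", "target": "[Location]", "alias": "location"},
]
print(alias_map(select))  # {'location': '[Location]'}
```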
Aggregate functions
Simple time aggregations can be defined for select statements to calculate
values over a time bucket. The available functions are min, max, sum, count,
and time_bucket. When a function is defined, the current value for each select
statement is stored; when a message is received, its payload is used to
calculate the new value according to the function in use. A new message is
republished or persisted for every message that is received, and it contains
the current values. The current time bucket can be added to the target payload
to identify which bucket a message belongs to.
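The store-update-emit cycle described above can be sketched in Python as a generator that yields one snapshot of the current values per incoming message. This is a minimal sketch, not the plugin's code: aggregate_stream is a hypothetical name, values are assumed to have already been extracted from each payload and keyed by alias, and time buckets are left out entirely.

```python
def aggregate_stream(functions, messages):
    """Keep one stored value per alias and yield a snapshot of the current
    values after every incoming message (one output per input).

    functions: alias -> "min" | "max" | "sum" | "count"
    messages:  iterable of dicts mapping alias -> value extracted from a payload
    """
    current = {}
    for values in messages:
        for alias, fn in functions.items():
            prev = current.get(alias)
            if fn == "count":
                current[alias] = (prev or 0) + 1  # count needs no source value
            elif prev is None:
                current[alias] = values[alias]  # first value seeds min/max/sum
            elif fn == "min":
                current[alias] = min(prev, values[alias])
            elif fn == "max":
                current[alias] = max(prev, values[alias])
            elif fn == "sum":
                current[alias] = prev + values[alias]
            else:
                raise ValueError(f"unsupported function: {fn!r}")
        yield dict(current)  # copy, so later updates don't alter earlier outputs


# Hypothetical input values, chosen only to show the update rules.
functions = {"speedmax": "max", "powersum": "sum", "n": "count"}
messages = [{"speedmax": 7.5, "powersum": 600.0}, {"speedmax": 5.0, "powersum": 100.0}]
for snapshot in aggregate_stream(functions, messages):
    print(snapshot)
# {'speedmax': 7.5, 'powersum': 600.0, 'n': 1}
# {'speedmax': 7.5, 'powersum': 700.0, 'n': 2}
```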
The time_bucket function defines the parameters of the time bucket to be
used. It has the parameters intervalsize and intervaloffset, which are both in
seconds. intervalsize defines the length of the time bucket and must be
greater than 0. intervaloffset defines the start offset of the time bucket.
So to create a 60-second time bucket that starts 15 seconds into each minute,
you would use the following. The source is ignored for the time_bucket
function; set target to a JSON path to add the bucket to the output payload,
or leave it blank to omit it.
"select": [
{
"source": "",
"target": "",
"function": {
"name": "time_bucket",
"intervalsize": 60,
"intervaloffset": 15
}
}
]
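The bucket arithmetic can be written out as a small Python function. This sketch assumes bucket boundaries fall at intervaloffset + k × intervalsize seconds on the Unix clock; the document does not state the alignment, and the name time_bucket here is just a Python helper, not the plugin's API.

```python
def time_bucket(t, intervalsize, intervaloffset=0):
    """Return (start, end) of the bucket containing Unix time t, in seconds.
    Assumes bucket boundaries fall at intervaloffset + k * intervalsize."""
    if intervalsize <= 0:
        raise ValueError("intervalsize must be greater than 0")
    # Floor division keeps the arithmetic correct even when t < intervaloffset.
    start = intervaloffset + (t - intervaloffset) // intervalsize * intervalsize
    return start, start + intervalsize


# 1_700_000_000 is 20 s past a minute boundary, so its bucket opened at :15.
print(time_bucket(1_700_000_000, 60, 15))  # (1699999995, 1700000055)
```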
Only one time_bucket function is allowed per query. The other functions are:
max: the maximum value in the time bucket
min: the minimum value in the time bucket
sum: the sum of all values in the time bucket
count: the count of messages received in the time bucket. This function does not require a source.
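Since each value is defined "in the time bucket", one plausible reading is that stored values restart when a message falls into a new bucket. The Python sketch below models that reading; the class BucketedAggregator is hypothetical, and both the reset rule and the epoch-plus-intervaloffset alignment are assumptions rather than documented behavior.

```python
class BucketedAggregator:
    """Aggregates that restart whenever a message falls into a new time bucket.
    Assumed: buckets start at intervaloffset + k * intervalsize (Unix seconds)."""

    def __init__(self, functions, intervalsize, intervaloffset=0):
        if intervalsize <= 0:
            raise ValueError("intervalsize must be greater than 0")
        self.functions = functions  # alias -> "min" | "max" | "sum" | "count"
        self.size = intervalsize
        self.offset = intervaloffset
        self.bucket = None  # start time of the bucket being aggregated
        self.current = {}

    def on_message(self, t, values):
        """t: message time in Unix seconds; values: alias -> extracted value."""
        start = self.offset + (t - self.offset) // self.size * self.size
        if start != self.bucket:  # new bucket: discard the previous bucket's values
            self.bucket, self.current = start, {}
        for alias, fn in self.functions.items():
            prev = self.current.get(alias)
            if fn == "count":
                self.current[alias] = (prev or 0) + 1  # no source needed
            elif prev is None:
                self.current[alias] = values[alias]
            elif fn == "min":
                self.current[alias] = min(prev, values[alias])
            elif fn == "max":
                self.current[alias] = max(prev, values[alias])
            elif fn == "sum":
                self.current[alias] = prev + values[alias]
            else:
                raise ValueError(f"unsupported function: {fn!r}")
        return dict(self.current)


agg = BucketedAggregator({"speedmax": "max", "n": "count"}, intervalsize=60, intervaloffset=30)
print(agg.on_message(100, {"speedmax": 5}))  # bucket [90, 150): {'speedmax': 5, 'n': 1}
print(agg.on_message(120, {"speedmax": 8}))  # same bucket: {'speedmax': 8, 'n': 2}
print(agg.on_message(150, {"speedmax": 3}))  # new bucket [150, 210): {'speedmax': 3, 'n': 1}
```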
A more complete example:
"select": [
{
"source": "[Machine Data][Speed]",
"target": "[Data][SpeedMax]",
"alias" : "speedmax",
"function": {
"name" : "max"
}
},
{
"source": "[Machine Data][Power]",
"target": "[Data][PowerSum]",
"alias" : "powersum",
"function": {
"name" : "sum"
}
},
{
"target": "[Data][Count]",
"alias" : "IntervalCount",
"function": {
"name" : "count"
}
},
{
"source": "*",
"target": "",
"alias" : "timeInterval",
"function": {
"name" : "time_bucket",
"intervalsize": 60,
"intervaloffset": 30
}
},
{
"source": "[Location]",
"target": "[Location]",
"alias" : "Location"
}
]
This would produce output such as the following:
{
"Data": {
"SpeedMax": 10,
"PowerSum": 123,
"Count": 3
},
"Location": "Freiburg"
}
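How the nested output above is assembled from the stored values can be sketched in Python: each entry's current value is written at its target path, and entries with a blank target (the time_bucket entry here) contribute nothing. The helper build_output is hypothetical, and keying the stored values by alias is an assumption; the values used below are the ones shown in the output above.

```python
import re


def build_output(select, current):
    """Place each entry's current value (looked up by alias) at its target
    path. Entries with a blank target, like the time_bucket entry, are skipped.
    Keying current values by alias is an assumption of this sketch."""
    out = {}
    for entry in select:
        names = re.findall(r"\[([^\[\]]+)\]", entry.get("target", ""))
        if not names:
            continue
        node = out
        for name in names[:-1]:
            node = node.setdefault(name, {})
        node[names[-1]] = current[entry["alias"]]
    return out


# Targets and aliases from the complete example (function details omitted).
select = [
    {"target": "[Data][SpeedMax]", "alias": "speedmax"},
    {"target": "[Data][PowerSum]", "alias": "powersum"},
    {"target": "[Data][Count]", "alias": "IntervalCount"},
    {"target": "", "alias": "timeInterval"},
    {"target": "[Location]", "alias": "Location"},
]
current = {"speedmax": 10, "powersum": 123, "IntervalCount": 3, "Location": "Freiburg"}
print(build_output(select, current))
# {'Data': {'SpeedMax': 10, 'PowerSum': 123, 'Count': 3}, 'Location': 'Freiburg'}
```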
The processing feature has some internal sources that can be used:
timestamp: the current Unix timestamp in seconds
windowstart: the start of the current time bucket, if enabled
windowend: the end of the current time bucket, if enabled
For example:
"select": [
{
"source": "timestamp",
"target": "[Data][Now]"
}
]
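A Python sketch of how a source such as "timestamp" might be told apart from a payload path. The rule that a bare name (no square brackets) refers to an internal source is an assumption drawn from the example above, as is truncating the timestamp to whole seconds; internal_sources and resolve_source are hypothetical helpers.

```python
import re
import time


def internal_sources(bucket=None):
    """timestamp: current Unix time in whole seconds (truncation is assumed).
    windowstart/windowend: only present when a time bucket is enabled."""
    sources = {"timestamp": int(time.time())}
    if bucket is not None:
        sources["windowstart"], sources["windowend"] = bucket
    return sources


def resolve_source(source, payload, internals):
    """Assumed rule: bracketed sources address the payload, while bare names
    such as "timestamp" refer to internal sources."""
    names = re.findall(r"\[([^\[\]]+)\]", source)
    if not names:
        return internals[source]
    node = payload
    for name in names:
        node = node[name]
    return node


internals = internal_sources(bucket=(1699999995, 1700000055))
now = resolve_source("timestamp", {"Location": "Freiburg"}, internals)
print({"Data": {"Now": now}})
```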
Choosing which messages are used with where statements
It is possible to restrict which messages are processed and persisted by using
a where statement. The