The stream configuration is the execution plan of the cep. Cep uses the stream configuration to build the Siddhi processing model using Kafka. You can configure how the cep gets the stream conf to do it you need to use the Bootstrappers.
Stream configuration has two main sections:
Streams
This section is used to define the streams attributes and types.
{
"streams": [
{
"streamName": "streaminput",
"attributes": [
{
"name": "timestamp",
"type": "long"
}
]
}
]
}
This example defines a stream called streaminput
with a one field called timestamp
of type long
.
The supported attributes types are:
- boolean
- long
- string
- float
- double
- object
You can add the number of streams that you want. You must define an stream if you will use it on the next section.
Rules
The rules
section is made by processing rules:
{
"rules": [
{
"id": 1,
"version": "v1",
"streams": {
"in": [
{
"streamName": "streaminput",
"kafkaTopic": "kafkainput"
}
],
"out": [
{
"streamName": "streamoutput",
"kafkaTopic": "kafkaoutput"
}
]
},
"executionPlan": "from streaminput select * insert into streamoutput",
"options": {"filterOutputNullDimension": true}
},
{
"id": 2,
"version": "v1",
"streams": {
"in": [
{
"streamName": "streaminput",
"kafkaTopic": "kafkainput"
}
],
"out": [
{
"streamName": "streamoutput",
"kafkaTopic": "kafkaoutput"
}
]
},
"executionPlan": "from streaminput select * insert into streamoutput"
}
]
}
The rules objects are made by: id
, version
, streams
and executionPlan
id
: The name of the rule.version
: an string that identifies the version of the rule.streams
: a map that contains thein
andout
relations between Siddhi and Kafka.executionPlan
: the SiddhiQL query that will be used to process this rule.options
: An optional map field to modify the rule behaviour.
The current options
available value is:
"filterOutputNullDimension"
: This value can be set totrue
if the rule should filter out the null values at the output when the event is sended to Kafka.
The configuration for streams
is:
"streams": {
"in": [
{
"streamName": "streaminput",
"kafkaTopic": "kafkainput"
"dimMapper": {"fieldNameToConvertTo": "originalKafkaFieldName"}
}
],
"out": [
{
"streamName": "streamoutput",
"kafkaTopic": "kafkaoutput",
"dimMapper": {"fieldNameToConvertTo": "originalKafkaFieldName"}
}
]
}
streamName
: The stream name. This must match the stream name used at the execution plan.kafkaTopic
: The topic name where the data will be readed/written.dimMapper
: As Siddhi does not support field names with characters as.
,-
or_
. This value may be used to rename the field names to ones supported by Siddhi and then reconvert them at the output. This field is optional.
So, the full stream configuration should be:
{
"streams": [
{
"streamName": "streaminput",
"attributes": [
{
"name": "timestamp",
"type": "long"
}
]
}
],
"rules": [
{
"id": "1",
"version": "v1",
"streams": {
"in": [
{
"streamName": "streaminput",
"kafkaTopic": "kafkainput"
}
],
"out": [
{
"streamName": "streamoutput",
"kafkaTopic": "kafkaoutput"
}
]
},
"executionPlan": "from streaminput select * insert into streamoutput"
},
{
"id": "2",
"version": "v1",
"streams": {
"in": [
{
"streamName": "streaminput",
"kafkaTopic": "kafkainput"
}
],
"out": [
{
"streamName": "streamoutput",
"kafkaTopic": "kafkaoutput"
}
]
},
"executionPlan": "from streaminput select * insert into streamoutput"
}
]
}
Important notes: you must define an stream if you use it at one rule. If you want to update and overwrite a rule, you must send the same rule with a different version. If you send a rule with the same version, the existing rule will not be overwritten. The execution plans you can use are defined at SiddhiQL Documentation