The CEP is able to bypass the incoming Kafka key through Siddhi. This is how it is made:
When the CEP read a Kafka message that is keyed:
aabb,{"timestamp":1234, "value":3}
It inserts the key (“aabb”) as an event field with the key “KAFKA_KEY”. The event will be readed by Siddhi as:
{"timestamp":1234, "value":3, "KAFKA_KEY":"aabb"}
Also, when you define an stream:
{
"streamName": "streaminput",
"attributes": [
{
"name": "timestamp",
"type": "long"
},
{
"name": "value",
"type": "integer"
}
]
}
The actual stream that is defined is:
{
"streamName": "streaminput",
"attributes": [
{
"name": "timestamp",
"type": "long"
},
{
"name": "value",
"type": "integer"
},
{
"name": "KAFKA_KEY",
"type": "string"
}
]
}
This allows the CEP to pass the key through Siddhi core.
Finally, when Siddhi sends the output to Kafka, the "KAFKA_KEY"
field will be removed from the event and will be used as the key.
So, if you send the next event with the following processing definition:
{
"streams": [
{
"streamName": "streaminput",
"attributes": [
{
"name": "timestamp",
"type": "long"
},
{
"name": "value",
"type": "integer"
}
]
}
],
"rules": [
{
"id": 1,
"version": "v1",
"streams": {
"in": [
{
"streamName": "streaminput",
"kafkaTopic": "kafkainput"
}
],
"out": [
{
"streamName": "streamoutput",
"kafkaTopic": "kafkaoutput"
}
]
},
"executionPlan": "from streaminput select * insert into streamoutput"
}
]
}
And you send:
aabb,{"timestamp":1234, "value":3}
You will get at the output kafka topic:
aabb,{"timestamp":1234, "value":3}
The user has to ensure that the KAFKA_KEY field is selected to be at the Siddhi output. If it isn’t selected the KAFKA_KEY will not be used to write the message to Kafka.
Overwriting the KAFKA_KEY
You can also use other fields of the message as the KAKFA_KEY. If you use a query like this:
"from stream select timestamp, value, value as KAFKA_KEY insert into streamoutput"
The field “value” will be used as the KAFKA_KEY. So the output will be:
3,{"timestamp":1234, "value":3}