On this page we walk through a streaming example using a real Kafka cluster and the cep jar artifact. We assume that you have built the cep distribution as explained in the Building section.
Explanation
First of all, we need to define the config used to launch a cep application.
CEP Config Json (config.json)
{
  "application.id": "cep-instance-id",
  "bootstrap.servers": "localhost:9092",
  "num.stream.threads": 1,
  "bootstrapper.classname": "io.wizzie.bootstrapper.bootstrappers.impl.KafkaBootstrapper",
  "bootstrap.kafka.topics": ["__cep_bootstrapper"],
  "metric.enable": true,
  "metric.listeners": ["io.wizzie.metrics.ConsoleMetricListener"],
  "metric.interval": 60000
}
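Before launching, it can help to sanity-check the config file. The following is a minimal Python sketch and is not part of the cep distribution. The `check_config` helper is hypothetical, and treating the keys below as required is an assumption taken from the example above, not from a documented cep schema.

```python
import json

# Keys copied from the example config above. Treating them as required,
# with these types, is an assumption of this sketch.
EXPECTED_KEYS = {
    "application.id": str,
    "bootstrap.servers": str,
    "num.stream.threads": int,
    "bootstrapper.classname": str,
    "bootstrap.kafka.topics": list,
}


def check_config(text):
    """Return a list of problems found in a config JSON string."""
    config = json.loads(text)
    problems = []
    for key, expected_type in EXPECTED_KEYS.items():
        if key not in config:
            problems.append(f"missing key: {key}")
        elif not isinstance(config[key], expected_type):
            problems.append(f"{key} should be of type {expected_type.__name__}")
    return problems


sample = """{
  "application.id": "cep-instance-id",
  "bootstrap.servers": "localhost:9092",
  "num.stream.threads": 1,
  "bootstrapper.classname": "io.wizzie.bootstrapper.bootstrappers.impl.KafkaBootstrapper",
  "bootstrap.kafka.topics": ["__cep_bootstrapper"]
}"""

# An empty list means no problems were found.
print(check_config(sample))
```

To check a file on disk, read it first and pass its content to `check_config`.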
You can now start the cep application:
./cep-start.sh config/config.json
At this point the CEP application is listening for new stream processing definitions. You can send one with the “streamer-kafka.sh” tool. As an example:
Stream processing definition
{
  "streams": [
    {
      "streamName": "streaminput",
      "attributes": [
        {
          "name": "timestamp",
          "type": "long"
        }
      ]
    }
  ],
  "rules": [
    {
      "id": "1",
      "version": "v1",
      "streams": {
        "in": [
          {
            "streamName": "streaminput",
            "kafkaTopic": "kafkainput"
          }
        ],
        "out": [
          {
            "streamName": "streamoutput",
            "kafkaTopic": "kafkaoutput"
          }
        ]
      },
      "executionPlan": "from streaminput select * insert into streamoutput"
    },
    {
      "id": "2",
      "version": "v1",
      "streams": {
        "in": [
          {
            "streamName": "streaminput",
            "kafkaTopic": "kafkainput"
          }
        ],
        "out": [
          {
            "streamName": "streamoutput",
            "kafkaTopic": "kafkaoutput"
          }
        ]
      },
      "executionPlan": "from streaminput select * insert into streamoutput"
    }
  ]
}
./bin/streamer-kafka.sh localhost:9092 cep-instance-id config/sample_processing_model.json
In this example we read the Kafka topic kafkainput, map it to the stream streaminput, and insert every field into the stream streamoutput, which is mapped to the Kafka topic kafkaoutput.
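The topic-to-stream mapping described here can be read straight off the definition. The sketch below is a hypothetical Python helper, not a cep tool, that lists each route per rule. Pairing every "in" entry with every "out" entry of a rule is a simplifying assumption; the real data flow is decided by the rule's executionPlan.

```python
def topic_routes(definition):
    """Return one (rule id, in topic, in stream, out stream, out topic) tuple per route."""
    routes = []
    for rule in definition["rules"]:
        for src in rule["streams"]["in"]:
            for dst in rule["streams"]["out"]:
                routes.append((
                    rule["id"],
                    src["kafkaTopic"],
                    src["streamName"],
                    dst["streamName"],
                    dst["kafkaTopic"],
                ))
    return routes


# Trimmed copy of the stream processing definition, keeping only rule "1".
definition = {
    "streams": [
        {"streamName": "streaminput",
         "attributes": [{"name": "timestamp", "type": "long"}]}
    ],
    "rules": [
        {
            "id": "1",
            "version": "v1",
            "streams": {
                "in": [{"streamName": "streaminput", "kafkaTopic": "kafkainput"}],
                "out": [{"streamName": "streamoutput", "kafkaTopic": "kafkaoutput"}],
            },
            "executionPlan": "from streaminput select * insert into streamoutput",
        }
    ],
}

for rule_id, in_topic, in_stream, out_stream, out_topic in topic_routes(definition):
    # Prints: rule 1: kafkainput -> streaminput -> streamoutput -> kafkaoutput
    print(f"rule {rule_id}: {in_topic} -> {in_stream} -> {out_stream} -> {out_topic}")
```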
Now we can send an event to the Kafka topic kafkainput.
{"timestamp":123456789}
The CEP instance should output the same event to the Kafka topic kafkaoutput:
{"timestamp":123456789}
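To see why this event fits the streaminput declaration, the following Python sketch checks an event against a stream's attribute list. It is illustrative only: the `matches_stream` helper and the mapping from attribute type names to Python types are assumptions of this sketch, and how cep itself treats events that do not match is not covered here.

```python
import json

# Assumed mapping from attribute type names to Python types (sketch only).
PYTHON_TYPES = {
    "int": int,
    "long": int,
    "float": float,
    "double": float,
    "string": str,
    "bool": bool,
}


def matches_stream(event_text, attributes):
    """Return True if the JSON event has every declared attribute with a matching type."""
    event = json.loads(event_text)
    for attr in attributes:
        value = event.get(attr["name"])
        expected = PYTHON_TYPES[attr["type"]]
        # bool is a subclass of int in Python, so reject it for non-bool types.
        if isinstance(value, bool) and expected is not bool:
            return False
        if not isinstance(value, expected):
            return False
    return True


# Attribute list of streaminput, as declared in the definition.
attributes = [{"name": "timestamp", "type": "long"}]

print(matches_stream('{"timestamp":123456789}', attributes))    # True
print(matches_stream('{"timestamp":"yesterday"}', attributes))  # False
```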
Execution
First, we need a running Kafka cluster and the decompressed cep distribution.
Config file
We need to modify the config file config/sample_config.json. We can either edit it or delete it and create a new one with this content:
{
  "application.id": "cep-instance-id",
  "bootstrap.servers": "localhost:9092",
  "num.stream.threads": 1,
  "bootstrapper.classname": "io.wizzie.bootstrapper.bootstrappers.impl.KafkaBootstrapper",
  "bootstrap.kafka.topics": ["__cep_bootstrapper"],
  "metric.enable": true,
  "metric.listeners": ["io.wizzie.cep.metrics.ConsoleMetricListener"],
  "metric.interval": 60000
}
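In this config file we set the application.id, which identifies our group of instances, and bootstrap.servers, which lists one or more running Kafka brokers (the config shown here has no separate Zookeeper entry). In this example we use the KafkaBootstrapper, so the stream processing definition is read from the Kafka cluster, using the topic listed in bootstrap.kafka.topics.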
Now we can start the cep service. To do that, we can use the init script inside the bin folder:
cep/bin/cep-start.sh cep/config/sample_config.json
Now you can produce some input messages to the kafkainput Kafka topic, but first you may want to open a Kafka consumer to check the output messages.
- Consumer
kafka_dist/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic kafkaoutput --new-consumer
- Producer
kafka_dist/bin/kafka-console-producer.sh --broker-list localhost:9092 --property parse.key=true --property key.separator=, --topic kafkainput
You can write a message into the console producer:
null,{"timestamp":123456789}
and you should see the output message on the console consumer:
null,{"timestamp":123456789}
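Because the key separator is a comma and JSON values usually contain commas too, a keyed line like the ones above only makes sense when it is split at the first separator. The Python sketch below illustrates that reading; `split_keyed_line` is a hypothetical helper for inspecting such lines, not part of Kafka or cep.

```python
import json


def split_keyed_line(line, separator=","):
    """Split a 'key<separator>value' line at the first separator only."""
    key, found, value = line.partition(separator)
    if not found:
        raise ValueError("no key separator in line")
    return key, value


key, value = split_keyed_line('null,{"timestamp":123456789}')
print(key)                # null
print(json.loads(value))  # {'timestamp': 123456789}

# Commas inside the JSON value stay part of the value.
print(split_keyed_line('null,{"a":1,"b":2}'))  # ('null', '{"a":1,"b":2}')
```

Note that the key returned here is the text "null" as typed on the line, which this sketch does not convert to an empty key.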
This is the end of the tutorial!! Congratulations! ;)