The stream configuration is the execution plan of the normalizer. The normalizer uses it to build the Kafka Streams topology with the DSL API. You can configure how the normalizer obtains the stream configuration by using the Bootstrappers.
The stream configuration has two main sections:
inputs
This section defines the mapping between Kafka topics and streams.
{
  "inputs": {
    "topic1": [
      "stream1",
      "stream2"
    ],
    "topic2": [
      "stream3",
      "stream1"
    ]
  }
}
This example defines three streams: stream1 reads from both Kafka topics topic1 and topic2, stream2 reads from topic1, and stream3 reads from topic2. Later, you can use the defined streams inside the streams section.
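Because the inputs section is keyed by topic, it can help to look at it from the stream side. The following is a minimal sketch in Python, not part of the normalizer: the helper name topics_by_stream is hypothetical, and it only inverts the mapping from the example above.

```python
import json

# The "inputs" section from the example above: Kafka topic -> streams that read it.
config = json.loads("""
{
  "inputs": {
    "topic1": ["stream1", "stream2"],
    "topic2": ["stream3", "stream1"]
  }
}
""")

def topics_by_stream(inputs):
    """Invert topic -> streams into stream -> topics (hypothetical helper)."""
    result = {}
    for topic, streams in inputs.items():
        for stream in streams:
            result.setdefault(stream, []).append(topic)
    return result

stream_topics = topics_by_stream(config["inputs"])
for stream in sorted(stream_topics):
    print(stream, "reads", stream_topics[stream])
```

The printed result matches the description above: stream1 reads two topics, while stream2 and stream3 read one each.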
streams
The streams section is made up of streams (JSON objects):
{
  "streams": {
    "stream1": {
      "funcs": [
        {
          "name": "myMapper",
          "className": "io.wizzie.normalizer.funcs.impl.SimpleMapper",
          "properties": {
            "maps": [
              {"dimPath": ["timestamp"]}
            ]
          }
        }
      ],
      "sinks": [
        {"topic": "output1", "type": "kafka"}
      ]
    }
  }
}
Each stream object is made up of a key and a body. The key is the name of a stream that you defined previously in the inputs section, and the body is made up of funcs and sinks.
funcs
The funcs field is a JSON array in which you define the functions that transform the stream. The transformations are applied sequentially, so the order in which you define the functions matters.
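To see why the order matters, here is a small sketch in Python. The functions rename and keep_only are hypothetical stand-ins written for this illustration; they are not the normalizer's function classes. The sketch only shows that applying the same two steps in a different order gives a different message.

```python
from functools import reduce

def rename(old, new):
    """Return a step that renames one field if it is present (illustrative stand-in)."""
    def apply(message):
        out = dict(message)
        if old in out:
            out[new] = out.pop(old)
        return out
    return apply

def keep_only(*fields):
    """Return a step that drops every field not listed (illustrative stand-in)."""
    def apply(message):
        return {k: v for k, v in message.items() if k in fields}
    return apply

def run_funcs(funcs, message):
    """Apply the steps one after another, in list order."""
    return reduce(lambda msg, func: func(msg), funcs, message)

message = {"client_mac": "00:11:22:33:44:55", "bytes": 10}

renamed_then_kept = run_funcs(
    [rename("client_mac", "device_id"), keep_only("device_id")], message)
kept_then_renamed = run_funcs(
    [keep_only("device_id"), rename("client_mac", "device_id")], message)

print(renamed_then_kept)
print(kept_then_renamed)
```

In the first order the renamed field survives. In the second order the field is dropped before it can be renamed, so the result is an empty message.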
sinks
The sinks field is a JSON array in which you define sinks. You can define two types of sinks:
- kafka: sends the data to a Kafka topic.
- stream: creates a new stream that you can use later in the streams section.
You select the sink type with the type field. The default type is kafka.
Sinks can also have a filter, which is applied to the stream before the messages are sent through the sink. To define one, set the filter field to a filter function.
"sinks": [
  {
    "topic": "mapper-stream",
    "type": "stream",
    "filter": {
      "name": "myFilter",
      "className": "io.wizzie.normalizer.funcs.impl.ContainsDimensionFilter",
      "properties": {
        "dimensions": ["A"]
      }
    }
  },
  {
    "topic": "diff-splitter-stream",
    "type": "stream",
    "filter": {
      "name": "myFilter",
      "className": "io.wizzie.normalizer.funcs.impl.ContainsDimensionFilter",
      "properties": {
        "dimensions": ["A"],
        "__MATCH": false
      }
    }
  }
]
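The two filtered sinks above split one stream into two. The following Python sketch imitates that routing under stated assumptions: it assumes that the filter passes messages that contain all listed dimensions, and that setting __MATCH to false inverts the result. Neither assumption is confirmed here, and contains_dimensions and route are hypothetical names.

```python
def contains_dimensions(dimensions, match=True):
    """Build a predicate (assumed semantics): true when the message has all
    listed dimensions, inverted when match is False."""
    def predicate(message):
        has_all = all(dim in message for dim in dimensions)
        return has_all if match else not has_all
    return predicate

# Same shape as the sinks above, with the filter replaced by a Python predicate.
sinks = [
    {"topic": "mapper-stream", "filter": contains_dimensions(["A"])},
    {"topic": "diff-splitter-stream", "filter": contains_dimensions(["A"], match=False)},
]

def route(message, sinks):
    """Return the sink topics whose filter accepts the message."""
    return [sink["topic"] for sink in sinks if sink["filter"](message)]

with_a = route({"A": 1, "B": 2}, sinks)
without_a = route({"B": 2}, sinks)
print(with_a)
print(without_a)
```

Under these assumptions, every message goes to exactly one of the two streams, depending on whether it has the dimension A.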
Finally, when you define a sink you can repartition the stream using a different key. To do so, use the partitionBy field.
"sinks": [
  {"topic": "output", "partitionBy": "X"},
  {"topic": "output1"}
]
By default, partitioning uses the Kafka message key, which is indicated by the reserved value __KEY.
The following two sink definitions therefore partition in the same way:
"sinks": [
  {"topic": "output", "partitionBy": "__KEY"}
]

"sinks": [
  {"topic": "output1"}
]
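The sink defaults described in this section can be summarized in a short Python sketch. It is only an illustration: with_defaults and partition_key are hypothetical helpers, and the sketch assumes that partitionBy names a message field whose value becomes the partitioning key.

```python
def with_defaults(sink):
    """Fill in the documented defaults: type kafka and partitionBy __KEY."""
    return {"type": "kafka", "partitionBy": "__KEY", **sink}

def partition_key(sink, kafka_key, message):
    """Pick the key used for partitioning (assumed semantics, see the lead-in)."""
    field = with_defaults(sink)["partitionBy"]
    return kafka_key if field == "__KEY" else message.get(field)

message = {"X": "device-7", "bytes": 10}

# Omitting partitionBy is the same as writing "__KEY" explicitly.
same = with_defaults({"topic": "output"}) == with_defaults(
    {"topic": "output", "partitionBy": "__KEY"})

default_key = partition_key({"topic": "output1"}, "k1", message)
custom_key = partition_key({"topic": "output", "partitionBy": "X"}, "k1", message)
print(same, default_key, custom_key)
```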
stream-builder.sh
bin/stream-builder.sh is a script that reads the stream configurations from a folder and generates a single file with all the necessary data.
The usage is:
./stream-builder.sh streams_folder [output_file]
Here, streams_folder is the folder that contains the stream files, and the optional argument output_file is the path of the generated output.
streams_folder must contain only the files that you want this tool to use. For example:
streams_folder/flow.json
{
  "inputs": {
    "flow": ["flow"]
  },
  "streams": {
    "flow": {
      "funcs": [
        {
          "name": "flowNormMap",
          "className": "io.wizzie.normalizer.funcs.impl.SimpleMapper",
          "properties": {
            "maps": [
              {"dimPath": ["client_name"], "as": "user_id"},
              {"dimPath": ["client_mac"], "as": "device_id"}
            ]
          }
        },
        {
          "name": "flowNormField",
          "className": "io.wizzie.normalizer.funcs.impl.FieldMapper",
          "properties": {
            "dimensions": [
              {
                "dimension": "stream_type",
                "value": "flow",
                "overwrite": false
              }
            ]
          }
        }
      ],
      "sinks": [
        {"topic": "flow-norm", "type": "kafka", "partitionBy": "device_id"}
      ]
    }
  }
}
streams_folder/ntop.json
{
  "inputs": {
    "ntop": ["ntop"]
  },
  "streams": {
    "ntop": {
      "funcs": [
        {
          "name": "ntopNormMap",
          "className": "io.wizzie.normalizer.funcs.impl.SimpleMapper",
          "properties": {
            "maps": [
              {"dimPath": ["APPLICATION_ID"], "as": "application"},
              {"dimPath": ["95"], "as": "application"}
            ]
          }
        }
      ],
      "sinks": [
        {"topic": "flow-norm", "type": "kafka"}
      ]
    }
  }
}
Generated output:
{
  "inputs": {
    "flow": [
      "flow"
    ],
    "ntop": [
      "ntop"
    ]
  },
  "streams": {
    "flow": {
      "funcs": [
        {
          "name": "flowNormMap",
          "className": "io.wizzie.normalizer.funcs.impl.SimpleMapper",
          "properties": {
            "maps": [
              {
                "dimPath": [
                  "client_name"
                ],
                "as": "user_id"
              },
              {
                "dimPath": [
                  "client_mac"
                ],
                "as": "device_id"
              }
            ]
          }
        },
        {
          "name": "flowNormField",
          "className": "io.wizzie.normalizer.funcs.impl.FieldMapper",
          "properties": {
            "dimensions": [
              {
                "dimension": "stream_type",
                "value": "flow",
                "overwrite": false
              }
            ]
          }
        }
      ],
      "sinks": [
        {
          "topic": "flow-norm",
          "type": "kafka",
          "partitionBy": "device_id"
        }
      ]
    },
    "ntop": {
      "funcs": [
        {
          "name": "ntopNormMap",
          "className": "io.wizzie.normalizer.funcs.impl.SimpleMapper",
          "properties": {
            "maps": [
              {
                "dimPath": [
                  "APPLICATION_ID"
                ],
                "as": "application"
              },
              {
                "dimPath": [
                  "95"
                ],
                "as": "application"
              }
            ]
          }
        }
      ],
      "sinks": [
        {
          "topic": "flow-norm",
          "type": "kafka"
        }
      ]
    }
  }
}
The script does not care about file names. It takes the sections defined in all the files and merges them.
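The merge can be pictured with a short Python sketch. This is not the script's implementation: merge_configs is a hypothetical helper, and its handling of a stream name that appears in more than one file (it raises an error) is an assumption, since that case is not described here. The funcs lists are left empty to keep the example short.

```python
import json

def merge_configs(configs):
    """Merge the inputs and streams sections of several stream configurations."""
    merged = {"inputs": {}, "streams": {}}
    for config in configs:
        # Union the stream lists per topic, keeping first-seen order.
        for topic, streams in config.get("inputs", {}).items():
            target = merged["inputs"].setdefault(topic, [])
            for stream in streams:
                if stream not in target:
                    target.append(stream)
        # Assumption of this sketch: a stream may be defined only once.
        for name, body in config.get("streams", {}).items():
            if name in merged["streams"]:
                raise ValueError(f"stream {name!r} is defined more than once")
            merged["streams"][name] = body
    return merged

flow = {
    "inputs": {"flow": ["flow"]},
    "streams": {"flow": {"funcs": [], "sinks": [
        {"topic": "flow-norm", "type": "kafka", "partitionBy": "device_id"}]}},
}
ntop = {
    "inputs": {"ntop": ["ntop"]},
    "streams": {"ntop": {"funcs": [], "sinks": [
        {"topic": "flow-norm", "type": "kafka"}]}},
}

merged = merge_configs([flow, ntop])
print(json.dumps(merged, indent=2))
```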
Other Notes
- If you build an invalid plan, a PlanBuilderException is thrown.
- Loops are not allowed in the stream topology. If you define one, a TryToDoLoopException is thrown.
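To check a configuration for loops before you deploy it, you could use something like the following Python sketch. It is not the normalizer's own check: find_loop is a hypothetical helper, and it assumes that the topic of a stream sink is the name of the stream that the sink feeds.

```python
def find_loop(streams):
    """Return a list of stream names that form a loop, or None if there is none."""
    # Edges: stream -> streams it feeds through sinks of type "stream".
    edges = {
        name: [sink["topic"] for sink in body.get("sinks", [])
               if sink.get("type", "kafka") == "stream"]
        for name, body in streams.items()
    }
    visiting, done = [], set()

    def visit(node):
        if node in done:
            return None
        if node in visiting:
            # Found a path that comes back to a stream already on the stack.
            return visiting[visiting.index(node):] + [node]
        visiting.append(node)
        for target in edges.get(node, []):
            loop = visit(target)
            if loop:
                return loop
        visiting.pop()
        done.add(node)
        return None

    for name in edges:
        loop = visit(name)
        if loop:
            return loop
    return None

looping = {
    "a": {"sinks": [{"topic": "b", "type": "stream"}]},
    "b": {"sinks": [{"topic": "a", "type": "stream"}]},
}
valid = {
    "a": {"sinks": [{"topic": "b", "type": "stream"}]},
    "b": {"sinks": [{"topic": "out"}]},
}
print(find_loop(looping))
print(find_loop(valid))
```

The first configuration sends stream a to stream b and back again, so the helper reports the loop. The second ends in a Kafka sink and has no loop.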