Enrichment Query Language (EQL) is a query language generated with ANTLR that allows us define that streams and fields to use and join. Before we look into details of the EQL, let’s take a look at a few definitions of terms.
Term | Definition |
---|---|
Stream | A stream represents continuosly updating data set of unknown size. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records |
Data record | A data record is defined as key-value pair |
Table | A table is a collection of key-value pairs |
Joiner | A joiner merges two streams or tables based on the keys of their data records, and yields a new stream |
Enricher | An enricher add relational information about events |
Queries
Following provides an abstract diagram definition for EQL.
Joiners
A joiner merges two streams or tables based on the keys of their data records, and yields a new stream.
Joiner basic syntax is as follows:
JOIN SELECT <comma-separated-fields or *> FROM (TABLE|STREAM) <stream-name> [[PARTITION] BY <field-name>] USING <joiner-name>
Where:
<comma-sparated-fields or *>
or<field-name>
: The selected fields to join. The wildcard*
indicates ‘all fields of stream’.<stream-name>
: The selected stream from which enricher will get data.<joiner-name>
: The selected joiner in enrichment config json.
GLOBAL TABLE
The FROM GLOBAL TABLE
clause allows us use a stream as Kafka Stream GlobalKTable.
Optional PARTITION BY
The [PARTITION] BY
clause allows us partition by field instead of stream’s key. We are going to illustrate this behaviour in next diagram:
Enrichers
An enricher add relational information about events.
Enricher basic syntax is as follows:
ENRICH WITH <enricher-name>
Where:
<enricher-name>
: The selected enricher in enrichment configuration json.
An enricher can be any data source that works with Json messages.
Output Stream
The INSERT INTO
clause allows us to define what stream to use as the output stream, this clause can be complemented with PARTITION BY
clause that allows us repartition the output stream by a specific field. If we omit the PARTITION BY
clause, the enricher will use the current partition key of the input stream.
Examples
Simple field extraction
Suppose that we have two streams with fields:
- inputStream1: fieldA and fieldB
- inputStream2: fieldC and fieldD
If we define next query:
SELECT fieldA, fieldB, fieldC FROM inputStream1, inputStream2 INSERT INTO STREAM output
Enricher extracts fieldA, fieldB and fieldC from both inputStream1 and inputStream2 if exists and inserts them in output.
This query is very simple and not enrich, Enricher only extracts and inserts fields.
Simple streams join
If we define next query
SELECT * FROM STREAM inputStream1 JOIN SELECT fieldY FROM STREAM inputStream2 USING simpleStreamPreferredJoiner INSERT INTO STREAM output
Enricher extracts all fields from intputStream1 and join them with the fieldY from inputStream2 using simpleStreamPreferredJoiner in order to do it is necessary that the streams share the same key.
Simple streams enrich
If we define next query
SELECT * FROM STREAM input ENRICH WITH simpleStreamEnrich INSERT INTO STREAM output
Enricher extracts all fields from input and enrich with simpleStreamEnrich using as data source a relational database.
Complex streams enrich and join
Suppose that we have a system with data about flow and location and we have a key-value store with information about ip reputation:
We need enrich and join this information and send data to final stream enrichflow. In order to do this we have to define next EQL:
SELECT src, protocol FROM STREAM flow JOIN SELECT * FROM STREAM location USING simpleStreamPreferredJoiner ENRICH WITH reputationStreamEnrich INSERT INTO STREAM enrichflow
These are some examples about EQL and how Enricher works.