The WDP Indexer works with streaming and batch data, and it is based on Druid.
Stream Indexing
Stream indexing is based on the Druid Kafka Indexing Service. WDP helps to build the Druid task spec using a simplified spec called KafkaSupervisorSimplifySpec:
{
  "dimensions": [
    "dim1", "dim2"
  ],
  "dimensionExclusions": [],
  "metrics": [
    {
      "type": "count",
      "name": "messages"
    }
  ],
  "spatialDimensions": [
    {
      "dimName": "coordinates",
      "dims": [
        "dim_lat", "dim_long"
      ]
    }
  ],
  "queryGranularity": "MINUTE",
  "kafkaTopic": "kafka-example-topic",
  "config": {
    "kafka": {
      "resetOffsetAutomatically": true,
      "useEarliestOffset": false,
      "skipOffsetGaps": true
    },
    "rejectPeriod": {
      "lateMessageRejectionPeriod": "P7D",
      "earlyMessageRejectionPeriod": "P1D"
    }
  },
  "taskCount": 1,
  "taskReplicas": 1,
  "context": {}
}
Dimensions
The dimensions are indexed as STRING. Metrics are aggregated over the dimension values.
Field | Description |
---|---|
dimensions | The message fields that you want to index to generate rows |
dimensionExclusions | The message fields that you want to exclude; all other fields will be indexed |
spatialDimensions | The spatial dimensions index coordinate data. You need to indicate the latitude and longitude fields |
You cannot set dimensions and dimensionExclusions at the same time.
Metrics
The metrics indicate which fields to aggregate and how to aggregate them. You can define any of the Druid indexing aggregators in this section.
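For example, a sketch that keeps the row count and also sums a numeric field might look like the following (the price and total_price field names are invented; doubleSum is a standard Druid aggregator):

  "metrics": [
    {
      "type": "count",
      "name": "messages"
    },
    {
      "type": "doubleSum",
      "name": "total_price",
      "fieldName": "price"
    }
  ]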
Granularity
The granularity field determines how data gets bucketed across the time dimension. Supported granularity strings are: all, none, second, minute, fifteen_minute, thirty_minute, hour, day, week, month, quarter and year.
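For illustration (the event fields and timestamps here are invented), with "queryGranularity": "MINUTE" and the count metric defined above, the following two input events:

  {"timestamp": "2020-01-01T10:15:12Z", "dim1": "a"}
  {"timestamp": "2020-01-01T10:15:48Z", "dim1": "a"}

would be expected to roll up into a single row whose timestamp is truncated to the minute:

  {"timestamp": "2020-01-01T10:15:00Z", "dim1": "a", "messages": 2}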
Configs
Kafka
Field | Description | Default |
---|---|---|
kafkaTopic | The Kafka topic from which the task consumes the stream data. | - |
resetOffsetAutomatically | Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on useEarliestOffset. | true |
useEarliestOffset | If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended, so this flag is only used on the first run. | false |
skipOffsetGaps | Whether or not to allow gaps of missing offsets in the Kafka stream. | true |
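For instance, to have a brand-new supervisor start reading from the beginning of the topic, the block would presumably look like this sketch (only the fields described above are used):

  "config": {
    "kafka": {
      "resetOffsetAutomatically": true,
      "useEarliestOffset": true,
      "skipOffsetGaps": true
    }
  }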
RejectPeriods
The reject periods use the ISO8601 Period format.
Field | Description | Default |
---|---|---|
lateMessageRejectionPeriod | Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to PT1H and the supervisor creates a task at 2016-01-01T12:00Z, messages with timestamps earlier than 2016-01-01T11:00Z will be dropped. | P7D |
earlyMessageRejectionPeriod | Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example, if this is set to PT1H, taskDuration is PT1H and the supervisor creates a task at 2016-01-01T12:00Z, messages with timestamps later than 2016-01-01T14:00Z will be dropped. | P1D |
Context
The context is used to set specific Druid context properties; it can be used to override default configurations.
Task Scale & High-Availability
Field | Description | Default |
---|---|---|
taskCount | The maximum number of reading tasks in a replica set. This means that the maximum number of reading tasks will be taskCount * taskReplicas, and the total number of tasks (reading + publishing) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than taskCount if taskCount > {numKafkaPartitions}. | 1 |
taskReplicas | The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different MiddleManagers to provide resiliency against node failure. | 1 |
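As a sketch, assuming a topic with six Kafka partitions, the following settings would give 3 reading tasks per replica set (each reading two partitions) and 3 * 2 = 6 reading tasks in total, with replicas placed on different MiddleManagers:

  "taskCount": 3,
  "taskReplicas": 2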
Batch Indexing
Batch indexing is based on Druid Native Batch Ingestion. WDP helps to build the Druid task spec using a simplified spec called BatchIndexSimplifySpec:
{
  "firehose": <firehose_spec>,
  "dimensions": [
    "dim1", "dim2", "dim3"
  ],
  "dimensionExclusions": [],
  "metrics": [
    {
      "type": "count",
      "name": "messages"
    }
  ],
  "spatialDimensions": [
    {
      "dimName": "coordinates",
      "dims": [
        "dim_lat", "dim_long"
      ]
    }
  ],
  "queryGranularity": "MINUTE",
  "intervals": [
    "2017/2019"
  ],
  "context": {}
}
Firehose
In this section you can use any Druid firehose, including extension firehoses.
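For example, the Druid ingestSegment firehose can be used to re-index data that is already stored in Druid; the dataSource name and interval below are placeholders:

  "firehose": {
    "type": "ingestSegment",
    "dataSource": "existing_datasource",
    "interval": "2018-01-01/2018-02-01"
  }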
Dimensions
The dimensions are indexed as STRING. Metrics are aggregated over the dimension values.
Field | Description |
---|---|
dimensions | The message fields that you want to index to generate rows |
dimensionExclusions | The message fields that you want to exclude; all other fields will be indexed |
spatialDimensions | The spatial dimensions index coordinate data. You need to indicate the latitude and longitude fields |
You cannot set dimensions and dimensionExclusions at the same time.
Metrics
The metrics indicate which fields to aggregate and how to aggregate them. You can define any of the Druid indexing aggregators in this section.
Granularity
The granularity field determines how data gets bucketed across the time dimension. Supported granularity strings are: all, none, second, minute, fifteen_minute, thirty_minute, hour, day, week, month, quarter and year.
Intervals
A JSON array of ISO-8601 intervals. This defines the time ranges over which data is indexed.
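For example, both coarse and precise interval strings are valid (the dates below are placeholders):

  "intervals": [
    "2018-01-01/2018-02-01",
    "2018-03-01T00:00:00.000Z/2018-03-15T00:00:00.000Z"
  ]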
Context
The context is used to set specific Druid context properties; it can be used to override default configurations.
Examples
S3 Indexing
{
  "firehose": {
    "type": "static-s3",
    "uris": [
      "s3://bucket/file.gz"
    ]
  },
  "dimensions": [
    "dim1"
  ],
  "metrics": [
    {
      "type": "count",
      "name": "count"
    }
  ],
  "queryGranularity": "MINUTE",
  "intervals": [
    "2013/2019"
  ],
  "context": {
    "druid.s3.accessKey": "XXXXXX",
    "druid.indexer.fork.property.druid.s3.accessKey": "XXXXXX",
    "druid.s3.secretKey": "YYYYY",
    "druid.indexer.fork.property.druid.s3.secretKey": "YYYYY"
  }
}
Remote HTTP Indexing
{
  "firehose": {
    "type": "http",
    "uris": ["http://example.com/uri1", "http://example2.com/uri2"]
  },
  "dimensions": [
    "dim1"
  ],
  "metrics": [
    {
      "type": "count",
      "name": "count"
    }
  ],
  "queryGranularity": "MINUTE",
  "intervals": [
    "2013/2019"
  ],
  "context": {}
}
Local File Indexing
{
  "firehose": {
    "type": "local",
    "baseDir": "examples/indexing/",
    "filter": "file_data.json"
  },
  "dimensions": [
    "dim1"
  ],
  "metrics": [
    {
      "type": "count",
      "name": "count"
    }
  ],
  "queryGranularity": "MINUTE",
  "intervals": [
    "2013/2019"
  ],
  "context": {}
}