The WDP Indexer works with both streaming and batch data and is based on Druid.

Stream Indexing

Stream indexing is based on the Druid Kafka Indexing Service. WDP helps you build the Druid task spec from a simplified spec called KafkaSupervisorSimplifySpec:

{
  "dimensions": [
    "dim1", "dim2"
  ],
  "dimensionExclusions": [],
  "metrics": [
    {
      "type":"count",
      "name":"messages"
    }
  ],
  "spatialDimensions": [
    {
      "dimName": "coordinates",
      "dims": [
        "dim_lat", "dim_long"
      ]
    }
  ],
  "queryGranularity": "MINUTE",
  "kafkaTopic": "kafka-example-topic",
  "config": {
    "kafka": {
      "resetOffsetAutomatically": true,
      "useEarliestOffset": false,
      "skipOffsetGaps": true
    },
    "rejectPeriod": {
      "lateMessageRejectionPeriod": "P7D",
      "earlyMessageRejectionPeriod": "P1D"
    }
  },
  "taskCount": 1,
  "taskReplicas": 1,
  "context": {}
}

Dimensions

Dimensions are indexed as STRING. Metrics are aggregated over the dimensions you define.

Field Description
dimensions The message fields that you want to index to generate rows
dimensionExclusions The message fields that you want to exclude. All other fields will be indexed (see the example below)
spatialDimensions The spatial dimensions index coordinate data. You must indicate the latitude and longitude fields
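
For example, assuming WDP follows Druid's schemaless-dimensions behavior when dimensions is left empty, you could index every message field except an internal field (the name _debug below is illustrative):

"dimensions": [],
"dimensionExclusions": [
  "_debug"
]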

Metrics

Metrics indicate which fields to aggregate and how to aggregate them. You can use any of the Druid Indexing Aggregators in this section.
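
For example, a metrics section that counts rows and also sums and tracks the maximum of numeric message fields could look like the following sketch (the field names bytes and latency_ms are illustrative; count, longSum and doubleMax are standard Druid aggregator types):

"metrics": [
  {
    "type": "count",
    "name": "messages"
  },
  {
    "type": "longSum",
    "name": "total_bytes",
    "fieldName": "bytes"
  },
  {
    "type": "doubleMax",
    "name": "max_latency",
    "fieldName": "latency_ms"
  }
]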

Granularity

The granularity field determines how data gets bucketed across the time dimension. Supported granularity strings are: all, none, second, minute, fifteen_minute, thirty_minute, hour, day, week, month, quarter and year.
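
For example, to truncate row timestamps to hourly buckets instead of per-minute buckets, assuming the uppercase form shown in the spec above:

"queryGranularity": "HOUR"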

Configs

Kafka

Field Description Default
kafkaTopic The Kafka topic from which the task consumes the stream data. -
resetOffsetAutomatically Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on useEarliestOffset. true
useEarliestOffset If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended, so this flag is only used on the first run. false
skipOffsetGaps Whether or not to allow gaps of missing offsets in the Kafka stream. true

RejectPeriods

The reject periods use the ISO8601 Period format.

Field Description Default
lateMessageRejectionPeriod Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to PT1H and the supervisor creates a task at 2016-01-01T12:00Z, messages with timestamps earlier than 2016-01-01T11:00Z will be dropped. P7D
earlyMessageRejectionPeriod Configure tasks to reject messages with timestamps later than this period after the task reaches its taskDuration (the taskDuration is PT1H). P1D

Context

The context field sets the Druid task context; it is used to override default configurations.
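
As an illustrative sketch, assuming you only want to raise the task priority through the standard Druid task context parameter priority:

"context": {
  "priority": 75
}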

Task Scale & High-Availability

Field Description Default
taskCount The maximum number of reading tasks in a replica set. This means that the maximum number of reading tasks will be taskCount * taskReplicas, and the total number of tasks (reading + publishing) will be higher than this. See the Druid Kafka Indexing Service capacity planning documentation for more details. The number of reading tasks will be less than taskCount if taskCount is greater than the number of Kafka partitions; see the example after this table. 1
taskReplicas The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different MiddleManagers to provide resiliency against node failure. 1
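
For example, assuming the topic has at least two partitions, the following settings run two reading tasks per replica set with one replica each, for a total of four reading tasks:

"taskCount": 2,
"taskReplicas": 2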

Batch Indexing

Batch indexing is based on Druid Native Batch Ingestion. WDP helps you build the Druid task spec from a simplified spec called BatchIndexSimplifySpec:

{
  "firehose": <firehose_spec>,
  "dimensions": [
    "dim1", "dim2", "dim3"
  ],
  "dimensionExclusions": [],
  "metrics": [
    {
      "type":"count",
      "name":"messages"
    }
  ],
  "spatialDimensions": [
    {
      "dimName": "coordinates",
      "dims": [
        "dim_lat", "dim_long"
      ]
    }
  ],
  "queryGranularity": "MINUTE",
  "intervals": [
    "2017/2019"
  ],
  "context": {}
}

Firehose

In this section you can use any Druid firehose, including extension firehoses. See the examples below.

Dimensions

Dimensions are indexed as STRING. Metrics are aggregated over the dimensions you define.

Field Description
dimensions The message fields that you want to index to generate rows
dimensionExclusions The message fields that you want to exclude. All other fields will be indexed
spatialDimensions The spatial dimensions index coordinate data. You must indicate the latitude and longitude fields

Metrics

Metrics indicate which fields to aggregate and how to aggregate them. You can use any of the Druid Indexing Aggregators in this section.

Granularity

The granularity field determines how data gets bucketed across the time dimension. Supported granularity strings are: all, none, second, minute, fifteen_minute, thirty_minute, hour, day, week, month, quarter and year.

Intervals

A JSON array of ISO-8601 intervals that defines the time ranges of the data to index.
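
For example, to index only the first half of 2018 and the last quarter of 2018:

"intervals": [
  "2018-01-01/2018-07-01",
  "2018-10-01/2019-01-01"
]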

Context

The context field sets the Druid task context; it is used to override default configurations.

Examples

S3 Indexing

{
  "firehose": {
    "type": "static-s3",
    "uris": [
      "s3://bucket/file.gz"
    ]
  },
  "dimensions": [
    "dim1"
  ],
  "metrics": [
    {
      "type": "count",
      "name": "count"
    }
  ],
  "queryGranularity": "MINUTE",
  "intervals": [
    "2013/2019"
  ],
  "context": {
    "druid.s3.accessKey": "XXXXXX",
    "druid.indexer.fork.property.druid.s3.accessKey": "XXXXXX",
    "druid.s3.secretKey": "YYYYY",
    "druid.indexer.fork.property.druid.s3.secretKey": "YYYYY"
  }
}

Remote HTTP Indexing

{
  "firehose": {
    "type"    : "http",
    "uris"  : ["http://example.com/uri1", "http://example2.com/uri2"]
  },
  "dimensions": [
    "dim1"
  ],
  "metrics": [
    {
      "type": "count",
      "name": "count"
    }
  ],
  "queryGranularity": "MINUTE",
  "intervals": [
    "2013/2019"
  ],
  "context": {
  }
}

Local File Indexing

{
  "firehose": {
        "type" : "local",
        "baseDir" : "examples/indexing/",
        "filter" : "file_data.json"
  },
  "dimensions": [
    "dim1"
  ],
  "metrics": [
    {
      "type": "count",
      "name": "count"
    }
  ],
  "queryGranularity": "MINUTE",
  "intervals": [
    "2013/2019"
  ],
  "context": {
  }
}