This Apache Druid extension enables Druid to ingest and understand the Protobuf data format. Make sure to include `druid-protobuf-extensions` as an extension.

The `druid-protobuf-extensions` extension provides the Protobuf Parser for stream ingestion. See the corresponding docs for details.
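As a quick illustration, loading the extension typically means adding it to the extension load list in `common.runtime.properties`; the exact path and the other entries in the list depend on your deployment, so treat this as a sketch:

```properties
# Load the Kafka indexing service (used later in this example) and the Protobuf extension.
# Adjust the list to match the extensions your cluster actually needs.
druid.extensions.loadList=["druid-kafka-indexing-service", "druid-protobuf-extensions"]
```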
This example demonstrates how to load Protobuf messages from Kafka. Please read the Load from Kafka tutorial first. This example will use the same “metrics” dataset.
Files used in this example are found at `./examples/quickstart/protobuf` in your Druid directory.
For this example:

- the Kafka broker host is `localhost:9092`
- the Kafka topic is `metrics_pb` instead of `metrics`
- the datasource name is `metrics-kafka-pb` instead of `metrics-kafka` to avoid confusion
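If your Kafka broker is not configured to auto-create topics, create the `metrics_pb` topic before publishing to it. This is a sketch for an older Kafka release that still manages topics through ZooKeeper (matching the `--zookeeper` style console consumer used later on this page); newer releases take `--bootstrap-server localhost:9092` instead:

```
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic metrics_pb
```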
{ "unit": "milliseconds", "http_method": "GET", "value": 44, "timestamp": "2017-04-06T02:36:22Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www1.example.com" }
The proto file should look like this. Save it as `metrics.proto`.

```protobuf
syntax = "proto3";

message Metrics {
  string unit = 1;
  string http_method = 2;
  int32 value = 3;
  string timestamp = 4;
  string http_code = 5;
  string page = 6;
  string metricType = 7;
  string server = 8;
}
```
Use the `protoc` Protobuf compiler to generate the descriptor file. Save the `metrics.desc` file either in the classpath or somewhere reachable by URL. In this example the descriptor file was saved at `/tmp/metrics.desc`.

```
protoc -o /tmp/metrics.desc metrics.proto
```
Below is the complete Supervisor spec JSON to be submitted to the Overlord. Please make sure these keys are properly configured for successful ingestion:

- `descriptor` for the descriptor file URL
- `protoMessageType` from the proto definition
- `format` must be `json`
- `topic` to subscribe to. The topic is `metrics_pb` instead of `metrics`
- `bootstrap.servers` is the Kafka broker host

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka2",
    "parser": {
      "type": "protobuf",
      "descriptor": "file:///tmp/metrics.desc",
      "protoMessageType": "Metrics",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "unit",
            "http_method",
            "http_code",
            "page",
            "metricType",
            "server"
          ],
          "dimensionExclusions": [
            "timestamp",
            "value"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics_pb",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}
```
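To start ingestion, submit the spec to the Overlord's supervisor endpoint. A sketch, assuming the spec was saved as `metrics-kafka.json` (a hypothetical filename) and the Overlord is listening on the default `localhost:8090`:

```
curl -X POST -H 'Content-Type: application/json' -d @metrics-kafka.json http://localhost:8090/druid/indexer/v1/supervisor
```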
Here is the sample script that publishes the metrics to Kafka in Protobuf format. Run `protoc` again, this time with the Python binding option. This command generates the `metrics_pb2.py` file.

```
protoc -o metrics.desc metrics.proto --python_out=.
```
This script requires the `protobuf` and `kafka-python` modules.
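If they are not already installed, a typical way to install them (assuming `pip` is available) is:

```
pip install protobuf kafka-python
```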
```python
#!/usr/bin/env python

import sys
import json

from kafka import KafkaProducer
from metrics_pb2 import Metrics

producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'metrics_pb'
metrics = Metrics()

# Read JSON metrics from stdin, copy each row's fields into the Protobuf
# message, and publish the serialized bytes to the Kafka topic.
for row in iter(sys.stdin):
    d = json.loads(row)
    for k, v in d.items():
        setattr(metrics, k, v)
    pb = metrics.SerializeToString()
    producer.send(topic, pb)
```
Run the following command to generate sample metrics and publish them to Kafka:

```
./bin/generate-example-metrics | ./pb_publisher.py
```
You can check the topic with the Kafka console consumer:

```
kafka-console-consumer --zookeeper localhost --topic metrics_pb
```

It should print messages like this:

```
millisecondsGETR"2017-04-06T03:23:56Z*2002/list:request/latencyBwww1.example.com
```