The Drill Kafka storage plugin lets you perform interactive analysis using SQL against Apache Kafka.
Kafka topics and message offsets
$ bin/kafka-topics --list --zookeeper localhost:2181
clicks
clickstream
clickstream-json-demo

$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic clickstream-json-demo --from-beginning | more
{"userID":"055e9af4-8c3c-4834-8482-8e05367a7bef","sessionID":"7badf08e-1e1d-4aeb-b853-7df2df4431ac","pageName":"shoes","refferalUrl":"yelp","ipAddress":"20.44.183.126","userAgent":"Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1","client_ts":1509926023099}
{"userID":"a29454b3-642d-481e-9dd8-0e0d7ef32ef5","sessionID":"b4a89204-b98c-4b4b-a1a9-f28f22d5ead3","pageName":"books","refferalUrl":"yelp","ipAddress":"252.252.113.190","userAgent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.106 Safari/537.36 OPR/38.0.2220.41","client_ts":1509926023100}
{"userID":"8c53b1c6-da47-4b5a-989d-61b5594f3a1d","sessionID":"baae3a1d-25b2-4955-8d07-20191f29ab32","pageName":"login","refferalUrl":"yelp","ipAddress":"110.170.214.255","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0","client_ts":1509926023100}

$ bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic clickstream-json-demo --time -2
clickstream-json-demo:2:2765000
clickstream-json-demo:1:2765000
clickstream-json-demo:3:2765000
clickstream-json-demo:0:2765000

$ bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic clickstream-json-demo --time -1
clickstream-json-demo:2:2765245
clickstream-json-demo:1:2765245
clickstream-json-demo:3:2765245
clickstream-json-demo:0:2765245
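The two GetOffsetShell calls above return the earliest (`--time -2`) and latest (`--time -1`) offset of each partition. The latest offset is the position of the next message to be written, so the difference between the two is the number of messages currently readable in a partition. A minimal Python sketch of that arithmetic, using the output shown above and assuming a topic without log compaction or transaction markers; the helper name `parse_offsets` is hypothetical and not part of Kafka or Drill:

```python
from typing import Dict


def parse_offsets(output: str) -> Dict[int, int]:
    """Parse GetOffsetShell output tokens of the form topic:partition:offset."""
    offsets = {}
    for token in output.split():
        # Split from the right so a topic name containing ':' would not break parsing.
        _topic, partition, offset = token.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


# Output of GetOffsetShell with --time -2 (earliest offsets), copied from above.
earliest = parse_offsets("""
clickstream-json-demo:2:2765000
clickstream-json-demo:1:2765000
clickstream-json-demo:3:2765000
clickstream-json-demo:0:2765000
""")

# Output of GetOffsetShell with --time -1 (latest offsets), copied from above.
latest = parse_offsets("""
clickstream-json-demo:2:2765245
clickstream-json-demo:1:2765245
clickstream-json-demo:3:2765245
clickstream-json-demo:0:2765245
""")

# Messages per partition = latest offset - earliest offset.
per_partition = {p: latest[p] - earliest[p] for p in sorted(earliest)}
total = sum(per_partition.values())

print(per_partition)  # {0: 245, 1: 245, 2: 245, 3: 245}
print(total)          # 980
```

The total of 980 is the row count a full scan of the topic should return, and the highest offset actually present in each partition is one less than the latest offset (2765244).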
Drill queries on Kafka
$ bin/sqlline -u jdbc:drill:zk=localhost:2181
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0
apache drill 1.12.0-SNAPSHOT
"json ain't no thang"

0: jdbc:drill:zk=localhost:2181> use kafka;
+-------+------------------------------------+
| ok    | summary                            |
+-------+------------------------------------+
| true  | Default schema changed to [kafka]  |
+-------+------------------------------------+
1 row selected (0.564 seconds)

0: jdbc:drill:zk=localhost:2181> show tables;
+---------------+------------------------------+
| TABLE_SCHEMA  | TABLE_NAME                   |
+---------------+------------------------------+
| kafka         | clickstream-json-demo        |
| kafka         | clickstream                  |
| kafka         | clicks                       |
+---------------+------------------------------+
17 rows selected (1.908 seconds)

0: jdbc:drill:zk=localhost:2181> ALTER SESSION SET `store.kafka.poll.timeout` = 200;
+-------+------------------------------------+
| ok    | summary                            |
+-------+------------------------------------+
| true  | store.kafka.poll.timeout updated.  |
+-------+------------------------------------+
1 row selected (0.102 seconds)

0: jdbc:drill:zk=localhost:2181> ALTER SESSION SET `store.kafka.record.reader` = 'org.apache.drill.exec.store.kafka.decoders.JsonMessageReader';
+-------+-------------------------------------+
| ok    | summary                             |
+-------+-------------------------------------+
| true  | store.kafka.record.reader updated.  |
+-------+-------------------------------------+
1 row selected (0.082 seconds)

0: jdbc:drill:zk=localhost:2181> select * from kafka.`clickstream-json-demo` limit 2;
+---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
| userID                                | sessionID                             | pageName    | refferalUrl  | ipAddress        | userAgent                                                                         | client_ts      | kafkaTopic             | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  |
+---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
| 6b55a8fa-d0fd-41f0-94e3-7f6b551cdede  | e3bd34a8-b546-4cd5-a0c6-5438589839fc  | categories  | bing         | 198.105.119.221  | Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0  | 1509926023098  | clickstream-json-demo  | 2                 | 2765000         | 1509926023098      |
| 74cffc37-2df0-4db4-aff9-ed0027a12d03  | 339e3821-5254-4d79-bbae-69bc12808eca  | furniture   | bing         | 161.169.50.60    | Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0     | 1509926023099  | clickstream-json-demo  | 2                 | 2765001         | 1509926023099      |
+---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
2 rows selected (1.18 seconds)

0: jdbc:drill:zk=localhost:2181> select count(*) from kafka.`clickstream-json-demo`;
+---------+
| EXPR$0  |
+---------+
| 980     |
+---------+
1 row selected (0.732 seconds)

0: jdbc:drill:zk=localhost:2181> select kafkaPartitionId, MIN(kafkaMsgOffset) as minOffset, MAX(kafkaMsgOffset) as maxOffset from kafka.`clickstream-json-demo` group by kafkaPartitionId;
+-------------------+------------+------------+
| kafkaPartitionId  | minOffset  | maxOffset  |
+-------------------+------------+------------+
| 2                 | 2765000    | 2765244    |
| 1                 | 2765000    | 2765244    |
| 3                 | 2765000    | 2765244    |
| 0                 | 2765000    | 2765244    |
+-------------------+------------+------------+
4 rows selected (3.081 seconds)

0: jdbc:drill:zk=localhost:2181> select kafkaPartitionId, from_unixtime(MIN(kafkaMsgTimestamp)/1000) as minKafkaTS, from_unixtime(MAX(kafkaMsgTimestamp)/1000) as maxKafkaTs from kafka.`clickstream-json-demo` group by kafkaPartitionId;
+-------------------+----------------------+----------------------+
| kafkaPartitionId  | minKafkaTS           | maxKafkaTs           |
+-------------------+----------------------+----------------------+
| 2                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
| 1                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
| 3                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
| 0                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
+-------------------+----------------------+----------------------+
4 rows selected (2.758 seconds)

0: jdbc:drill:zk=localhost:2181> select distinct(refferalUrl) from kafka.`clickstream-json-demo`;
+--------------+
| refferalUrl  |
+--------------+
| bing         |
| yahoo        |
| yelp         |
| google       |
+--------------+
4 rows selected (2.944 seconds)

0: jdbc:drill:zk=localhost:2181> select pageName, count(*) from kafka.`clickstream-json-demo` group by pageName;
+--------------+---------+
| pageName     | EXPR$1  |
+--------------+---------+
| categories   | 89      |
| furniture    | 89      |
| mobiles      | 89      |
| clothing     | 89      |
| sports       | 89      |
| offers       | 89      |
| shoes        | 89      |
| books        | 89      |
| login        | 90      |
| electronics  | 89      |
| toys         | 89      |
+--------------+---------+
11 rows selected (2.493 seconds)
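The kafkaMsgTimestamp column holds epoch milliseconds, which is why the timestamp query above divides by 1000 before calling from_unixtime. The same conversion can be sketched in Python as a quick sanity check. This assumes the session above ran in a UTC-8 time zone, which is inferred from the displayed values and not stated anywhere in the session; the helper `kafka_ts_to_datetime` is hypothetical:

```python
from datetime import datetime, timedelta, timezone


def kafka_ts_to_datetime(ts_millis, tz=timezone.utc):
    """Convert a Kafka message timestamp (epoch milliseconds) to a datetime."""
    # Split into whole seconds and leftover milliseconds to avoid float rounding.
    seconds, millis = divmod(ts_millis, 1000)
    return datetime.fromtimestamp(seconds, tz=tz) + timedelta(milliseconds=millis)


# kafkaMsgTimestamp of the first row returned by the select * query above.
ts = 1509926023098

utc_time = kafka_ts_to_datetime(ts)
# Assumed session time zone: a fixed UTC-8 offset.
local_time = kafka_ts_to_datetime(ts, timezone(timedelta(hours=-8)))

fmt = "%Y-%m-%d %H:%M:%S"
print(utc_time.strftime(fmt))    # 2017-11-05 23:53:43
print(local_time.strftime(fmt))  # 2017-11-05 15:53:43
```

Under that assumption the second value matches the minKafkaTS and maxKafkaTs columns shown above.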
Note:
For the JSON message format, the following system / session options can be used. More details can be found in the Drill JSON Model and Drill system options configuration documentation.