tree: bc764bf5a6e8b3c62d707b76181f8fcafa9b0fe0 [path history] [tgz]
  1. src/
  2. pom.xml
  3. README.md
contrib/storage-kafka/README.md

Drill Kafka Plugin

Drill kafka storage plugin allows you to perform interactive analysis using SQL against Apache Kafka.

Kafka topics and message offsets

$bin/kafka-topics --list --zookeeper localhost:2181
clicks
clickstream
clickstream-json-demo

$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic clickstream-json-demo --from-beginning | more
{"userID":"055e9af4-8c3c-4834-8482-8e05367a7bef","sessionID":"7badf08e-1e1d-4aeb-b853-7df2df4431ac","pageName":"shoes","refferalUrl":"yelp","ipAddress":"20.44.183.126","userAgent":"Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1","client_ts":1509926023099}
{"userID":"a29454b3-642d-481e-9dd8-0e0d7ef32ef5","sessionID":"b4a89204-b98c-4b4b-a1a9-f28f22d5ead3","pageName":"books","refferalUrl":"yelp","ipAddress":"252.252.113.190","userAgent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.106 Safari/537.36 OPR/38.0.2220.41","client_ts":1509926023100}
{"userID":"8c53b1c6-da47-4b5a-989d-61b5594f3a1d","sessionID":"baae3a1d-25b2-4955-8d07-20191f29ab32","pageName":"login","refferalUrl":"yelp","ipAddress":"110.170.214.255","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0","client_ts":1509926023100}

$ bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic clickstream-json-demo --time -2
clickstream-json-demo:2:2765000
clickstream-json-demo:1:2765000
clickstream-json-demo:3:2765000
clickstream-json-demo:0:2765000

$ bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic clickstream-json-demo --time -1
clickstream-json-demo:2:2765245
clickstream-json-demo:1:2765245
clickstream-json-demo:3:2765245
clickstream-json-demo:0:2765245


Drill queries on Kafka

$ bin/sqlline -u jdbc:drill:zk=localhost:2181
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0
apache drill 1.12.0-SNAPSHOT
"json ain't no thang"
0: jdbc:drill:zk=localhost:2181> use kafka;
+-------+------------------------------------+
|  ok   |              summary               |
+-------+------------------------------------+
| true  | Default schema changed to [kafka]  |
+-------+------------------------------------+
1 row selected (0.564 seconds)
0: jdbc:drill:zk=localhost:2181> show tables;
+---------------+------------------------------+
| TABLE_SCHEMA  |          TABLE_NAME          |
+---------------+------------------------------+
| kafka         | clickstream-json-demo        |
| kafka         | clickstream                  |
| kafka         | clicks                       |
+---------------+------------------------------+
17 rows selected (1.908 seconds)
0: jdbc:drill:zk=localhost:2181> ALTER SESSION SET `store.kafka.poll.timeout` = 200;
+-------+------------------------------------+
|  ok   |              summary               |
+-------+------------------------------------+
| true  | store.kafka.poll.timeout updated.  |
+-------+------------------------------------+
1 row selected (0.102 seconds)
0: jdbc:drill:zk=localhost:2181> ALTER SESSION SET `store.kafka.record.reader` = 'org.apache.drill.exec.store.kafka.decoders.JsonMessageReader';
+-------+-------------------------------------+
|  ok   |               summary               |
+-------+-------------------------------------+
| true  | store.kafka.record.reader updated.  |
+-------+-------------------------------------+
1 row selected (0.082 seconds)
0: jdbc:drill:zk=localhost:2181> select * from kafka.`clickstream-json-demo` limit 2;
+---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
|                userID                 |               sessionID               |  pageName   | refferalUrl  |    ipAddress     |                                     userAgent                                     |   client_ts    |       kafkaTopic       | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  |
+---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
| 6b55a8fa-d0fd-41f0-94e3-7f6b551cdede  | e3bd34a8-b546-4cd5-a0c6-5438589839fc  | categories  | bing         | 198.105.119.221  | Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0  | 1509926023098  | clickstream-json-demo  | 2                 | 2765000         | 1509926023098      |
| 74cffc37-2df0-4db4-aff9-ed0027a12d03  | 339e3821-5254-4d79-bbae-69bc12808eca  | furniture   | bing         | 161.169.50.60    | Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0     | 1509926023099  | clickstream-json-demo  | 2                 | 2765001         | 1509926023099      |
+---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
2 rows selected (1.18 seconds)
0: jdbc:drill:zk=localhost:2181> select count(*) from kafka.`clickstream-json-demo`;
+---------+
| EXPR$0  |
+---------+
| 980     |
+---------+
1 row selected (0.732 seconds)
0: jdbc:drill:zk=localhost:2181> select kafkaPartitionId, MIN(kafkaMsgOffset) as minOffset, MAX(kafkaMsgOffset) as maxOffset from kafka.`clickstream-json-demo` group by kafkaPartitionId;
+-------------------+------------+------------+
| kafkaPartitionId  | minOffset  | maxOffset  |
+-------------------+------------+------------+
| 2                 | 2765000    | 2765244    |
| 1                 | 2765000    | 2765244    |
| 3                 | 2765000    | 2765244    |
| 0                 | 2765000    | 2765244    |
+-------------------+------------+------------+
4 rows selected (3.081 seconds)
0: jdbc:drill:zk=localhost:2181> select kafkaPartitionId, from_unixtime(MIN(kafkaMsgTimestamp)/1000) as minKafkaTS, from_unixtime(MAX(kafkaMsgTimestamp)/1000) as maxKafkaTs from kafka.`clickstream-json-demo` group by kafkaPartitionId;
+-------------------+----------------------+----------------------+
| kafkaPartitionId  |      minKafkaTS      |      maxKafkaTs      |
+-------------------+----------------------+----------------------+
| 2                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
| 1                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
| 3                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
| 0                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
+-------------------+----------------------+----------------------+
4 rows selected (2.758 seconds)
0: jdbc:drill:zk=localhost:2181> select distinct(refferalUrl) from kafka.`clickstream-json-demo`;
+--------------+
| refferalUrl  |
+--------------+
| bing         |
| yahoo        |
| yelp         |
| google       |
+--------------+
4 rows selected (2.944 seconds)
0: jdbc:drill:zk=localhost:2181> select pageName, count(*) from kafka.`clickstream-json-demo` group by pageName;
+--------------+---------+
|   pageName   | EXPR$1  |
+--------------+---------+
| categories   | 89      |
| furniture    | 89      |
| mobiles      | 89      |
| clothing     | 89      |
| sports       | 89      |
| offers       | 89      |
| shoes        | 89      |
| books        | 89      |
| login        | 90      |
| electronics  | 89      |
| toys         | 89      |
+--------------+---------+
11 rows selected (2.493 seconds)

Note:

  • store.kafka.record.reader system option can be used for setting record reader and default is org.apache.drill.exec.store.kafka.decoders.JsonMessageReader
  • Default store.kafka.poll.timeout is set to 200, user has to set this accordingly
  • Custom record reader can be implemented by extending org.apache.drill.exec.store.kafka.decoders.MessageReader and setting store.kafka.record.reader accordinlgy

In case of JSON message format, following system options can be used accordingly. More details can be found in Drill Json Model and in Drill system options configurations