
Drill Kafka Plugin

The Drill Kafka storage plugin lets you run interactive SQL analysis against Apache Kafka topics.

Kafka topics and message offsets

$ bin/kafka-topics --list --zookeeper localhost:2181
clicks
clickstream
clickstream-json-demo

$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic clickstream-json-demo --from-beginning | more
{"userID":"055e9af4-8c3c-4834-8482-8e05367a7bef","sessionID":"7badf08e-1e1d-4aeb-b853-7df2df4431ac","pageName":"shoes","refferalUrl":"yelp","ipAddress":"20.44.183.126","userAgent":"Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1","client_ts":1509926023099}
{"userID":"a29454b3-642d-481e-9dd8-0e0d7ef32ef5","sessionID":"b4a89204-b98c-4b4b-a1a9-f28f22d5ead3","pageName":"books","refferalUrl":"yelp","ipAddress":"252.252.113.190","userAgent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.106 Safari/537.36 OPR/38.0.2220.41","client_ts":1509926023100}
{"userID":"8c53b1c6-da47-4b5a-989d-61b5594f3a1d","sessionID":"baae3a1d-25b2-4955-8d07-20191f29ab32","pageName":"login","refferalUrl":"yelp","ipAddress":"110.170.214.255","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0","client_ts":1509926023100}

$ bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic clickstream-json-demo --time -2
clickstream-json-demo:2:2765000
clickstream-json-demo:1:2765000
clickstream-json-demo:3:2765000
clickstream-json-demo:0:2765000

$ bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic clickstream-json-demo --time -1
clickstream-json-demo:2:2765245
clickstream-json-demo:1:2765245
clickstream-json-demo:3:2765245
clickstream-json-demo:0:2765245
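GetOffsetShell prints one `topic:partition:offset` line per partition. `--time -2` requests the earliest retained offset and `--time -1` the latest offset (the next one to be written), so for a plain, non-compacted topic the difference is the number of messages currently stored in each partition. The Python sketch below does that arithmetic on the output captured above. The helpers `parse_offsets` and `messages_per_partition` are hypothetical and exist only for illustration; they are not part of Kafka or Drill.

```python
# Sketch: estimate per-partition message counts from kafka.tools.GetOffsetShell output.
# Assumes lines of the form "topic:partition:offset" (as in the transcript above) and a
# plain, non-compacted topic, where latest offset - earliest offset == message count.

EARLIEST = """\
clickstream-json-demo:2:2765000
clickstream-json-demo:1:2765000
clickstream-json-demo:3:2765000
clickstream-json-demo:0:2765000
"""

LATEST = """\
clickstream-json-demo:2:2765245
clickstream-json-demo:1:2765245
clickstream-json-demo:3:2765245
clickstream-json-demo:0:2765245
"""


def parse_offsets(text: str) -> dict[int, int]:
    """Map partition id -> offset (hypothetical helper)."""
    offsets = {}
    for line in text.strip().splitlines():
        # Split from the right into topic, partition and offset.
        _topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


def messages_per_partition(earliest: str, latest: str) -> dict[int, int]:
    """Latest offset minus earliest offset, per partition (hypothetical helper)."""
    start = parse_offsets(earliest)
    end = parse_offsets(latest)
    return {p: end[p] - start[p] for p in sorted(start)}


counts = messages_per_partition(EARLIEST, LATEST)
print(counts)                # {0: 245, 1: 245, 2: 245, 3: 245}
print(sum(counts.values()))  # 980
```

With 2765245 - 2765000 = 245 messages in each of the four partitions, the total is 980, which agrees with the count(*) that Drill returns below. It also explains why the MIN/MAX kafkaMsgOffset query reports 2765244 rather than 2765245 as the highest offset: the latest offset is one past the last stored message.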


Drill queries on Kafka

$ bin/sqlline -u jdbc:drill:zk=localhost:2181
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0
apache drill 1.12.0-SNAPSHOT
"json ain't no thang"
0: jdbc:drill:zk=localhost:2181> use kafka;
+-------+------------------------------------+
|  ok   |              summary               |
+-------+------------------------------------+
| true  | Default schema changed to [kafka]  |
+-------+------------------------------------+
1 row selected (0.564 seconds)
0: jdbc:drill:zk=localhost:2181> show tables;
+---------------+------------------------------+
| TABLE_SCHEMA  |          TABLE_NAME          |
+---------------+------------------------------+
| kafka         | clickstream-json-demo        |
| kafka         | clickstream                  |
| kafka         | clicks                       |
+---------------+------------------------------+
17 rows selected (1.908 seconds)
0: jdbc:drill:zk=localhost:2181> ALTER SESSION SET `store.kafka.poll.timeout` = 200;
+-------+------------------------------------+
|  ok   |              summary               |
+-------+------------------------------------+
| true  | store.kafka.poll.timeout updated.  |
+-------+------------------------------------+
1 row selected (0.102 seconds)
0: jdbc:drill:zk=localhost:2181> ALTER SESSION SET `store.kafka.record.reader` = 'org.apache.drill.exec.store.kafka.decoders.JsonMessageReader';
+-------+-------------------------------------+
|  ok   |               summary               |
+-------+-------------------------------------+
| true  | store.kafka.record.reader updated.  |
+-------+-------------------------------------+
1 row selected (0.082 seconds)
0: jdbc:drill:zk=localhost:2181> select * from kafka.`clickstream-json-demo` limit 2;
+---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
|                userID                 |               sessionID               |  pageName   | refferalUrl  |    ipAddress     |                                     userAgent                                     |   client_ts    |       kafkaTopic       | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  |
+---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
| 6b55a8fa-d0fd-41f0-94e3-7f6b551cdede  | e3bd34a8-b546-4cd5-a0c6-5438589839fc  | categories  | bing         | 198.105.119.221  | Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0  | 1509926023098  | clickstream-json-demo  | 2                 | 2765000         | 1509926023098      |
| 74cffc37-2df0-4db4-aff9-ed0027a12d03  | 339e3821-5254-4d79-bbae-69bc12808eca  | furniture   | bing         | 161.169.50.60    | Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0     | 1509926023099  | clickstream-json-demo  | 2                 | 2765001         | 1509926023099      |
+---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
2 rows selected (1.18 seconds)
0: jdbc:drill:zk=localhost:2181> select count(*) from kafka.`clickstream-json-demo`;
+---------+
| EXPR$0  |
+---------+
| 980     |
+---------+
1 row selected (0.732 seconds)
0: jdbc:drill:zk=localhost:2181> select kafkaPartitionId, MIN(kafkaMsgOffset) as minOffset, MAX(kafkaMsgOffset) as maxOffset from kafka.`clickstream-json-demo` group by kafkaPartitionId;
+-------------------+------------+------------+
| kafkaPartitionId  | minOffset  | maxOffset  |
+-------------------+------------+------------+
| 2                 | 2765000    | 2765244    |
| 1                 | 2765000    | 2765244    |
| 3                 | 2765000    | 2765244    |
| 0                 | 2765000    | 2765244    |
+-------------------+------------+------------+
4 rows selected (3.081 seconds)
0: jdbc:drill:zk=localhost:2181> select kafkaPartitionId, from_unixtime(MIN(kafkaMsgTimestamp)/1000) as minKafkaTS, from_unixtime(MAX(kafkaMsgTimestamp)/1000) as maxKafkaTs from kafka.`clickstream-json-demo` group by kafkaPartitionId;
+-------------------+----------------------+----------------------+
| kafkaPartitionId  |      minKafkaTS      |      maxKafkaTs      |
+-------------------+----------------------+----------------------+
| 2                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
| 1                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
| 3                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
| 0                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
+-------------------+----------------------+----------------------+
4 rows selected (2.758 seconds)
0: jdbc:drill:zk=localhost:2181> select distinct(refferalUrl) from kafka.`clickstream-json-demo`;
+--------------+
| refferalUrl  |
+--------------+
| bing         |
| yahoo        |
| yelp         |
| google       |
+--------------+
4 rows selected (2.944 seconds)
0: jdbc:drill:zk=localhost:2181> select pageName, count(*) from kafka.`clickstream-json-demo` group by pageName;
+--------------+---------+
|   pageName   | EXPR$1  |
+--------------+---------+
| categories   | 89      |
| furniture    | 89      |
| mobiles      | 89      |
| clothing     | 89      |
| sports       | 89      |
| offers       | 89      |
| shoes        | 89      |
| books        | 89      |
| login        | 90      |
| electronics  | 89      |
| toys         | 89      |
+--------------+---------+
11 rows selected (2.493 seconds)
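kafkaMsgTimestamp holds the Kafka record timestamp in milliseconds since the Unix epoch, which is why the query above divides it by 1000 before calling from_unixtime. The Python sketch below performs the same conversion but pins the result to UTC; `kafka_ts_to_utc` is a hypothetical helper. In UTC, 1509926023098 is 2017-11-05 23:53:43, while the transcript shows 15:53:43. That suggests, as an inference not stated in this README, that the value was rendered in the server's local time zone, eight hours behind UTC.

```python
# Sketch: convert a kafkaMsgTimestamp value (milliseconds since the Unix epoch) to a
# timestamp string, mirroring from_unixtime(kafkaMsgTimestamp / 1000) but fixed to UTC.
# kafka_ts_to_utc is a hypothetical helper for illustration.
from datetime import datetime, timezone


def kafka_ts_to_utc(ms: int) -> str:
    # Drop the millisecond part; the query output above shows whole seconds only.
    seconds = ms // 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(kafka_ts_to_utc(1509926023098))  # 2017-11-05 23:53:43
```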

Note:

  • The store.kafka.record.reader system option selects the record reader used to decode messages; the default is org.apache.drill.exec.store.kafka.decoders.JsonMessageReader.
  • The default value of store.kafka.poll.timeout is 200; adjust it to suit your environment.
  • A custom record reader can be implemented by extending org.apache.drill.exec.store.kafka.decoders.MessageReader; set store.kafka.record.reader to the new class to use it.
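Conceptually, a JSON record reader turns each Kafka message value into one row: every top-level JSON field becomes a column, and the query output also carries the kafkaTopic, kafkaPartitionId, kafkaMsgOffset and kafkaMsgTimestamp metadata columns seen in the `select *` result above. The Python sketch below models only that mapping, using a few fields from the first row of that result. It is not Drill's Java JsonMessageReader, and the function `message_to_row` and its parameters are hypothetical.

```python
# Conceptual model of how one JSON Kafka message becomes one row in the `select *`
# output above. Not Drill's JsonMessageReader; message_to_row is hypothetical.
import json


def message_to_row(value: bytes, topic: str, partition: int,
                   offset: int, timestamp_ms: int) -> dict:
    # Every top-level JSON field becomes a column (e.g. userID, pageName, client_ts).
    row = json.loads(value.decode("utf-8"))
    # Metadata columns shown alongside the message fields in the query output.
    row.update({
        "kafkaTopic": topic,
        "kafkaPartitionId": partition,
        "kafkaMsgOffset": offset,
        "kafkaMsgTimestamp": timestamp_ms,
    })
    return row


msg = (b'{"userID":"6b55a8fa-d0fd-41f0-94e3-7f6b551cdede",'
       b'"pageName":"categories","refferalUrl":"bing","client_ts":1509926023098}')
row = message_to_row(msg, "clickstream-json-demo", 2, 2765000, 1509926023098)
print(row["pageName"], row["kafkaPartitionId"], row["kafkaMsgOffset"])
# categories 2 2765000
```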

For JSON messages, the following system and session options can be used. More details can be found in [Drill JSON Model](https://drill.apache.org/docs/json-data-model/) and in the Drill system options configuration documentation.