| --! qt:disabled:HIVE-23985 |
| |
| -- Turn vectorized execution on for the statements that follow. |
| SET hive.vectorized.execution.enabled=true; |
| |
| -- Kafka-backed table over topic test-topic on localhost:9093, decoded with JsonSerDe |
| -- and carrying a custom timestamp.formats serde property. |
| CREATE EXTERNAL TABLE kafka_table ( |
|   `__time` TIMESTAMP, |
|   `page` STRING, |
|   `user` STRING, |
|   `language` STRING, |
|   `country` STRING, |
|   `continent` STRING, |
|   `namespace` STRING, |
|   `newPage` BOOLEAN, |
|   `unpatrolled` BOOLEAN, |
|   `anonymous` BOOLEAN, |
|   `robot` BOOLEAN, |
|   added INT, |
|   deleted INT, |
|   delta BIGINT |
| ) |
| STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' |
| WITH SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss\'Z\'") |
| TBLPROPERTIES ( |
|   "kafka.topic" = "test-topic", |
|   "kafka.bootstrap.servers"="localhost:9093", |
|   "kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe" |
| ); |
| |
| -- Print the resulting table definition. |
| DESCRIBE EXTENDED kafka_table; |
| |
| -- Full scan: partition, offset and key metadata followed by every payload column. |
| SELECT |
|   `__partition`, `__offset`, `__key`, |
|   `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|   `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|   added, deleted, delta |
| FROM kafka_table; |
| |
| -- Total number of records visible through the table. |
| SELECT count(*) |
| FROM kafka_table; |
| |
| -- Lower bound on the record timestamp column, given as an epoch-millisecond literal. |
| SELECT |
|   `__partition`, `__offset`, |
|   `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|   `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|   added, deleted, delta |
| FROM kafka_table |
| WHERE `__timestamp` > 1533960760123; |
| |
| -- Same projection with a much smaller timestamp lower bound. |
| SELECT |
|   `__partition`, `__offset`, |
|   `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|   `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|   added, deleted, delta |
| FROM kafka_table |
| WHERE `__timestamp` > 533960760123; |
| |
| -- Three OR-ed offset windows on partition 0. The parentheses around the middle |
| -- group only spell out the default AND-before-OR precedence. |
| SELECT |
|   `__partition`, `__offset`, |
|   `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|   `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|   added, deleted, delta |
| FROM kafka_table |
| WHERE (`__offset` > 7 AND `__partition` = 0 AND `__offset` < 9) |
|    OR (`__offset` = 4 AND `__partition` = 0) |
|    OR (`__offset` <= 1 AND `__partition` = 0 AND `__offset` > 0); |
| |
| -- Single-offset lookup. |
| SELECT `__key`, `__partition`, `__offset`, `__time`, `page`, `user` |
| FROM kafka_table |
| WHERE `__offset` = 5; |
| |
| -- Everything strictly before offset 5. |
| SELECT `__key`, `__partition`, `__offset`, `__time`, `page`, `user` |
| FROM kafka_table |
| WHERE `__offset` < 5; |
| |
| -- Everything strictly after offset 5. |
| SELECT `__key`, `__partition`, `__offset`, `__time`, `page`, `user` |
| FROM kafka_table |
| WHERE `__offset` > 5; |
| |
| -- Timestamp filter evaluated at query time: records stamped within the last hour. |
| SELECT `__partition`, `__offset`, `user` |
| FROM kafka_table |
| WHERE `__timestamp` > to_epoch_milli(CURRENT_TIMESTAMP - interval '1' HOURS); |
| |
| -- Partition 1 is not expected to exist in the topic. |
| SELECT count(*) |
| FROM kafka_table |
| WHERE `__partition` = 1; |
| |
| -- Offset 100 is not expected to exist in the topic. |
| SELECT count(*) |
| FROM kafka_table |
| WHERE `__offset` = 100; |
| |
| -- Upper bounds that lie beyond the existing offsets and partitions. |
| SELECT count(*) |
| FROM kafka_table |
| WHERE `__offset` <= 100 AND `__partition` <= 100; |
| |
| -- Bookkeeping table: per Kafka partition, the highest offset already copied. |
| -- IF EXISTS keeps the drop from failing when the table is absent (for example on a |
| -- first run with hive.exec.drop.ignorenonexistent turned off). |
| DROP TABLE IF EXISTS kafka_table_offsets; |
| CREATE TABLE kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp); |
| |
| -- Seed every partition with its smallest offset minus one, so that a strictly |
| -- greater-than comparison against max_offset still includes the first record. |
| INSERT OVERWRITE TABLE kafka_table_offsets |
| SELECT `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP |
| FROM kafka_table |
| GROUP BY `__partition`, CURRENT_TIMESTAMP; |
| |
| -- Initial state: one row per partition with max_offset = min(__offset) - 1, |
| -- which is -1 (not 0) when the first offset of the partition is 0. |
| SELECT partition_id, max_offset |
| FROM kafka_table_offsets; |
| |
| -- ORC copy target: Kafka partition, offset and record timestamp, then the payload columns. |
| -- IF EXISTS keeps the drop from failing when the table is absent (for example on a |
| -- first run with hive.exec.drop.ignorenonexistent turned off). |
| DROP TABLE IF EXISTS orc_kafka_table; |
| CREATE TABLE orc_kafka_table ( |
|   partition_id INT, |
|   row_offset BIGINT, |
|   kafka_ts BIGINT, |
|   `__time` TIMESTAMP, |
|   `page` STRING, |
|   `user` STRING, |
|   `language` STRING, |
|   `country` STRING, |
|   `continent` STRING, |
|   `namespace` STRING, |
|   `newPage` BOOLEAN, |
|   `unpatrolled` BOOLEAN, |
|   `anonymous` BOOLEAN, |
|   `robot` BOOLEAN, |
|   added INT, |
|   deleted INT, |
|   delta BIGINT |
| ) STORED AS ORC; |
| |
| |
| -- First incremental load as a single multi-insert: rows past the stored offset and |
| -- below offset 3 are copied into ORC, and the offsets table is overwritten with the |
| -- new per-partition maximum taken from the same joined input. |
| FROM kafka_table ktable |
| INNER JOIN kafka_table_offsets offset_table |
|   ON (ktable.`__partition` = offset_table.partition_id |
|       AND ktable.`__offset` > offset_table.max_offset |
|       AND ktable.`__offset` < 3) |
| INSERT INTO TABLE orc_kafka_table |
|   SELECT |
|     `__partition`, `__offset`, `__timestamp`, |
|     `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|     `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|     added, deleted, delta |
| INSERT OVERWRITE TABLE kafka_table_offsets |
|   SELECT `__partition`, max(`__offset`), CURRENT_TIMESTAMP |
|   GROUP BY `__partition`, CURRENT_TIMESTAMP; |
| |
| -- Only the first 3 rows should have been ingested. |
| SELECT count(*) |
| FROM orc_kafka_table; |
| |
| -- The stored max offset should now be 2. |
| SELECT partition_id, max_offset |
| FROM kafka_table_offsets; |
| |
| -- The 3 copied rows, from offset 0 to 2. |
| SELECT |
|   `partition_id`, `row_offset`, |
|   `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|   `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|   added, deleted, delta |
| FROM orc_kafka_table; |
| |
| |
| -- Second incremental load through the same inner join, this time without an upper |
| -- bound, so every row past the stored offset is copied and the offsets are refreshed. |
| FROM kafka_table ktable |
| INNER JOIN kafka_table_offsets offset_table |
|   ON (ktable.`__partition` = offset_table.partition_id |
|       AND ktable.`__offset` > offset_table.max_offset) |
| INSERT INTO TABLE orc_kafka_table |
|   SELECT |
|     `__partition`, `__offset`, `__timestamp`, |
|     `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|     `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|     added, deleted, delta |
| INSERT OVERWRITE TABLE kafka_table_offsets |
|   SELECT `__partition`, max(`__offset`), CURRENT_TIMESTAMP |
|   GROUP BY `__partition`, CURRENT_TIMESTAMP; |
| |
| -- The stored max offset should now be 9. |
| SELECT partition_id, max_offset |
| FROM kafka_table_offsets; |
| |
| -- 10 rows are expected in total. |
| SELECT count(*) |
| FROM orc_kafka_table; |
| |
| -- The copy should contain neither duplicates nor gaps. |
| SELECT |
|   `partition_id`, `row_offset`, |
|   `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|   `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|   added, deleted, delta |
| FROM orc_kafka_table; |
| |
| -- LEFT OUTER JOIN variant for the case where the offsets metadata table is empty. |
| -- Both tables are recreated from scratch. IF EXISTS keeps each drop from failing when |
| -- the table is absent (for example with hive.exec.drop.ignorenonexistent turned off). |
| DROP TABLE IF EXISTS kafka_table_offsets; |
| CREATE TABLE kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp); |
| |
| DROP TABLE IF EXISTS orc_kafka_table; |
| CREATE TABLE orc_kafka_table ( |
|   partition_id INT, |
|   row_offset BIGINT, |
|   kafka_ts BIGINT, |
|   `__time` TIMESTAMP, |
|   `page` STRING, |
|   `user` STRING, |
|   `language` STRING, |
|   `country` STRING, |
|   `continent` STRING, |
|   `namespace` STRING, |
|   `newPage` BOOLEAN, |
|   `unpatrolled` BOOLEAN, |
|   `anonymous` BOOLEAN, |
|   `robot` BOOLEAN, |
|   added INT, |
|   deleted INT, |
|   delta BIGINT |
| ) STORED AS ORC; |
| |
| |
| -- The offsets table has just been recreated and holds no rows, so the left outer join |
| -- keeps every Kafka row. The same statement then records the per-partition max offset. |
| FROM kafka_table ktable |
| LEFT OUTER JOIN kafka_table_offsets offset_table |
|   ON (ktable.`__partition` = offset_table.partition_id |
|       AND ktable.`__offset` > offset_table.max_offset) |
| INSERT INTO TABLE orc_kafka_table |
|   SELECT |
|     `__partition`, `__offset`, `__timestamp`, |
|     `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|     `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|     added, deleted, delta |
| INSERT OVERWRITE TABLE kafka_table_offsets |
|   SELECT `__partition`, max(`__offset`), CURRENT_TIMESTAMP |
|   GROUP BY `__partition`, CURRENT_TIMESTAMP; |
| |
| -- Number of rows copied by the left-join load. |
| SELECT count(*) |
| FROM orc_kafka_table; |
| |
| -- Offsets recorded by the left-join load. |
| SELECT partition_id, max_offset |
| FROM kafka_table_offsets; |
| |
| -- Content copied by the left-join load. |
| SELECT |
|   `partition_id`, `row_offset`, |
|   `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|   `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|   added, deleted, delta |
| FROM orc_kafka_table; |
| |
| -- Second table over the same topic, declared without kafka.serde.class. Per the |
| -- original note this uses the basic flat JSON implementation, which will probably be removed. |
| -- Here __time is a timestamp with local time zone. |
| CREATE EXTERNAL TABLE kafka_table_2 ( |
|   `__time` TIMESTAMP WITH LOCAL TIME ZONE, |
|   `page` STRING, |
|   `user` STRING, |
|   `language` STRING, |
|   `country` STRING, |
|   `continent` STRING, |
|   `namespace` STRING, |
|   `newPage` BOOLEAN, |
|   `unpatrolled` BOOLEAN, |
|   `anonymous` BOOLEAN, |
|   `robot` BOOLEAN, |
|   added INT, |
|   deleted INT, |
|   delta BIGINT |
| ) |
| STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' |
| WITH SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss\'Z\'") |
| TBLPROPERTIES ( |
|   "kafka.topic" = "test-topic", |
|   "kafka.bootstrap.servers"="localhost:9093" |
| ); |
| |
| -- Full scan of the second table. |
| SELECT |
|   `__partition`, `__offset`, |
|   `__time`, `page`, `user`, `language`, `country`, `continent`, `namespace`, |
|   `newPage`, `unpatrolled`, `anonymous`, `robot`, |
|   added, deleted, delta |
| FROM kafka_table_2; |
| |
| -- Row count of the second table. |
| SELECT count(*) |
| FROM kafka_table_2; |
| |
| -- Avro-backed Kafka table over topic wiki_kafka_avro_table. No column list is given, |
| -- the record layout is supplied through the avro.schema.literal table property. |
| CREATE EXTERNAL TABLE wiki_kafka_avro_table |
|   STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' |
|   TBLPROPERTIES ( |
|     "kafka.topic" = "wiki_kafka_avro_table", |
|     "kafka.bootstrap.servers"="localhost:9093", |
|     "kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe", |
| 'avro.schema.literal'='{ |
| "type" : "record", |
| "name" : "Wikipedia", |
| "namespace" : "org.apache.hive.kafka", |
| "version": "1", |
| "fields" : [ { |
| "name" : "isrobot", |
| "type" : "boolean" |
| }, { |
| "name" : "channel", |
| "type" : "string" |
| }, { |
| "name" : "timestamp", |
| "type" : "string" |
| }, { |
| "name" : "flags", |
| "type" : "string" |
| }, { |
| "name" : "isunpatrolled", |
| "type" : "boolean" |
| }, { |
| "name" : "page", |
| "type" : "string" |
| }, { |
| "name" : "diffurl", |
| "type" : "string" |
| }, { |
| "name" : "added", |
| "type" : "long" |
| }, { |
| "name" : "comment", |
| "type" : "string" |
| }, { |
| "name" : "commentlength", |
| "type" : "long" |
| }, { |
| "name" : "isnew", |
| "type" : "boolean" |
| }, { |
| "name" : "isminor", |
| "type" : "boolean" |
| }, { |
| "name" : "delta", |
| "type" : "long" |
| }, { |
| "name" : "isanonymous", |
| "type" : "boolean" |
| }, { |
| "name" : "user", |
| "type" : "string" |
| }, { |
| "name" : "deltabucket", |
| "type" : "double" |
| }, { |
| "name" : "deleted", |
| "type" : "long" |
| }, { |
| "name" : "namespace", |
| "type" : "string" |
| } ] |
| }' |
|   ); |
| |
| -- Print the definition Hive derived for the Avro table. |
| DESCRIBE EXTENDED wiki_kafka_avro_table; |
| |
| |
| -- Avro table scan with the record timestamp cast to a timestamp value. |
| SELECT |
|   CAST(`__timestamp` AS TIMESTAMP) AS kafka_record_ts, |
|   `__partition`, `__offset`, |
|   `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` |
| FROM wiki_kafka_avro_table; |
| |
| -- Row count of the Avro table. |
| SELECT count(*) |
| FROM wiki_kafka_avro_table; |
| |
| -- Number of distinct users. |
| SELECT count(DISTINCT `user`) |
| FROM wiki_kafka_avro_table; |
| |
| -- Aggregates over a double column and a long column. |
| SELECT sum(deltabucket), min(commentlength) |
| FROM wiki_kafka_avro_table; |
| |
| -- Record timestamp shown both cast and raw, restricted by a lower bound on __timestamp. |
| SELECT |
|   CAST(`__timestamp` AS TIMESTAMP) AS kafka_record_ts, |
|   `__timestamp` AS kafka_record_ts_long, |
|   `__partition`, `__key`, `__offset`, |
|   `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` |
| FROM wiki_kafka_avro_table |
| WHERE `__timestamp` > 1534750625090; |
| |
| |
| -- Write target backed by topic test-topic-write-json, encoded with JsonSerDe. |
| CREATE EXTERNAL TABLE kafka_table_insert ( |
|   c_name STRING, |
|   c_int INT, |
|   c_float FLOAT |
| ) |
| STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' |
| WITH SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss\'Z\'") |
| TBLPROPERTIES ( |
|   "kafka.topic" = "test-topic-write-json", |
|   "kafka.bootstrap.servers"="localhost:9093", |
|   "kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe" |
| ); |
| |
| -- Row with an explicit key, a null partition, offset -1 and an explicit timestamp. |
| INSERT INTO TABLE kafka_table_insert |
|   (c_name, c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`) |
| VALUES ('test1', 5, 4.999, 'key', null, -1, 1536449552290); |
| |
| -- Row with a null key and a null partition. |
| INSERT INTO TABLE kafka_table_insert |
|   (c_name, c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`) |
| VALUES ('test2', 15, 14.9996666, null, null, -1, 1536449552285); |
| |
| -- Read back what was written. |
| SELECT * |
| FROM kafka_table_insert; |
| |
| |
| -- Write every row of the Avro table back into the same table. The payload columns, |
| -- __key, __partition and __timestamp are carried over and -1 is supplied as __offset. |
| INSERT INTO TABLE wiki_kafka_avro_table |
| SELECT |
|   isrobot AS isrobot, |
|   channel AS channel, |
|   `timestamp` AS `timestamp`, |
|   flags AS flags, |
|   isunpatrolled AS isunpatrolled, |
|   page AS page, |
|   diffurl AS diffurl, |
|   added AS added, |
|   comment AS comment, |
|   commentlength AS commentlength, |
|   isnew AS isnew, |
|   isminor AS isminor, |
|   delta AS delta, |
|   isanonymous AS isanonymous, |
|   `user` AS `user`, |
|   deltabucket AS deltabucket, |
|   deleted AS deleted, |
|   namespace AS namespace, |
|   `__key`, |
|   `__partition`, |
|   -1 AS `__offset`, |
|   `__timestamp` |
| FROM wiki_kafka_avro_table; |
| |
| -- Scan after the write-back. Here the record timestamp is divided by 1000 before the cast. |
| SELECT |
|   CAST((`__timestamp`/1000) AS TIMESTAMP) AS kafka_record_ts, |
|   `__partition`, `__offset`, |
|   `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` |
| FROM wiki_kafka_avro_table; |
| |
| -- Row count per record key. |
| SELECT `__key`, count(1) |
| FROM wiki_kafka_avro_table |
| GROUP BY `__key` |
| ORDER BY `__key`; |
| |
| -- Row count per record timestamp. |
| SELECT `__timestamp`, count(1) |
| FROM wiki_kafka_avro_table |
| GROUP BY `__timestamp` |
| ORDER BY `__timestamp`; |
| |
| |
| -- Write target backed by topic test-topic-write-csv, encoded with OpenCSVSerde. |
| CREATE EXTERNAL TABLE kafka_table_csv ( |
|   c_name STRING, |
|   c_int INT, |
|   c_float FLOAT |
| ) |
| STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' |
| TBLPROPERTIES ( |
|   "kafka.topic" = "test-topic-write-csv", |
|   "kafka.bootstrap.servers"="localhost:9093", |
|   "kafka.serde.class"="org.apache.hadoop.hive.serde2.OpenCSVSerde" |
| ); |
| |
| -- Before writing, disable optimistic commit and set the write semantic to EXACTLY_ONCE. |
| ALTER TABLE kafka_table_csv SET TBLPROPERTIES ("hive.kafka.optimistic.commit"="false", "kafka.write.semantic"="EXACTLY_ONCE"); |
| |
| -- Copy the JSON-backed rows into the CSV topic, supplying -1 as the offset. |
| INSERT INTO TABLE kafka_table_csv |
| SELECT c_name, c_int, c_float, `__key`, `__partition`, -1 AS `__offset`, `__timestamp` |
| FROM kafka_table_insert; |
| |
| -- Two literal rows with negative numeric values and explicit keys. |
| INSERT INTO TABLE kafka_table_csv |
|   (c_name, c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`) |
| VALUES ('test4', -5, -4.999, 'key-2', null, -1, 1536449552291); |
| |
| INSERT INTO TABLE kafka_table_csv |
|   (c_name, c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`) |
| VALUES ('test5', -15, -14.9996666, 'key-3', null, -1, 1536449552284); |
| |
| -- Read back the CSV topic, then the distinct key and name pairs. |
| SELECT * |
| FROM kafka_table_csv; |
| |
| SELECT DISTINCT `__key`, c_name |
| FROM kafka_table_csv; |
| |
| -- Same DISTINCT query planned and executed twice: first with vectorization off. |
| SET hive.vectorized.execution.enabled=false ; |
| EXPLAIN EXTENDED |
| SELECT DISTINCT `__offset`, CAST(`__timestamp` AS TIMESTAMP), `__key` |
| FROM wiki_kafka_avro_table; |
| |
| SELECT DISTINCT `__offset`, CAST(`__timestamp` AS TIMESTAMP), `__key` |
| FROM wiki_kafka_avro_table; |
| |
| -- Then again with vectorization back on. |
| SET hive.vectorized.execution.enabled=true ; |
| EXPLAIN EXTENDED |
| SELECT DISTINCT `__offset`, CAST(`__timestamp` AS TIMESTAMP), `__key` |
| FROM wiki_kafka_avro_table; |
| |
| SELECT DISTINCT `__offset`, CAST(`__timestamp` AS TIMESTAMP), `__key` |
| FROM wiki_kafka_avro_table; |