| |
| 1. ORC read process: |
| |
| FileInputStream -> SnappyDecompressionStream -> RLE decoder -> raw bytes |
| |
| 2. ORC write process |
| |
| raw bytes -> RLE encoder -> SnappyCompressionStream -> FileOutputStream |
| |
| 3. TODOs |
| |
| 1) url format “hdfs://localhost:8020/user/hive/warehouse/tsmallint/” or “hdfs://localhost:8020//user/hive/warehouse/tsmallint” are not correct |
| |
| 2) now some types are supported: tinyint, smallint, int, bigint,float, double, string, varchar |
| need more type support: boolean/date/char(x)/timestamp/decimal/struct et al |
| |
| TO add a type support, we need to pay special attention to: |
| a. TypeImpl::createRowBatch: this function returns correct ColumnVectorBatch |
| b. buildReader -- this function returns reader of correct type |
| |
| 3) null value handling performance enhancement |
| |
| 4) writer |
| |
| orc::WriterOptions opts; |
| std::unique_ptr<orc::Writer> writer; |
| dbcommon::URL url(filename); |
| dbcommon::FileSystemManager fsm; |
| dbcommon::FileSystem *fs = fsm.get(url.getNormalizedServiceName()); |
| writer = orc::createWriter(orc::writeFile(fs, url.getPath()), opts); |
| |
| std::unique_ptr<orc::ColumnVectorBatch> batch = writer->createRowBatch(1000); |
| for (uint32_t i = 0; i < 100; i++) { |
| writer->addRowBatch(batch); |
| } |
| writer->close(); |
| |
| 5) hasEnoughSpaceForBatch needs to be revisited |
| // estimated tuple batch size |
| // TODO(lei): might need to be revised after we figure out how to |
| // store other types |
| |
| 6) read footer only once on master, then dispatch it to worker. this can potentially |
| avoid all opening footers at the same time |
| |
| 7) add some boundary numbers tests for orc format: for example, max(int32_t) for different |
| encoding schemes - delta, direct, patchedbase, short repeat. |
| |
| 8) need to compare the performance & compression ration for lz4 and snappy |
| |
| 9) write more information to orc file: statistics, indexes. otherwise, |
| it needs data reloading when we added the feature in reader. |
| |
| 10) add tests for snappy (since lz4 is now default), and add tests for snappycompressor/lz4compressor |
| |
| 4. How to use |
| |
| -- hive |
| create table tcn(t tinyint, s smallint, i int, b bigint, f float, d double, str string, v varchar(10), c char(4), bin binary) stored as orc; |
| |
| insert into tcn values (1, 2, 3, 4, 1.1, 1.2, 'string', 'var', 'char', 'binary'); |
| |
| select * from tcn; |
| |
| -- computenode |
| create table tcn(t tinyint, s smallint, i int, b bigint, f float, d double, str string, v varchar(10), c string, bin string) |
| with (format = orc, location= 'hdfs://localhost:8020/user/hive/warehouse/tcn'); |
| |
| select * from tcn; |
| |
| 5. micro benchmark: based on 2016 Oct 15 version (after analyze) |
| |
| NOTE: Analyze is very important for aggregation. |
| |
| lz4 fast snappy orcnone postgres |
| filesize 426MB 1042MB 402MB 696M 985MB |
| load 20239ms 20784ms 20098ms 19944ms 22767ms(copy) |
| count* 32ms 32ms 31ms 30ms 639ms |
| countint 96ms 42ms 98ms 97ms 745ms |
| countstring 179ms 85ms 235ms 120ms 1154ms |
| count2int2string 444ms 165ms 501ms 365ms 1596ms |
| tpch-Q1 1025ms 386ms 1072ms 896ms 3830ms |
| |
| |
| The benchmark used is: |
| 1) schema |
| CREATE TABLE e_LINEITEM ( L_ORDERKEY int, |
| L_PARTKEY int, |
| L_SUPPKEY int, |
| L_LINENUMBER int, |
| L_QUANTITY double, |
| L_EXTENDEDPRICE double, |
| L_DISCOUNT double, |
| L_TAX double, |
| L_RETURNFLAG string, |
| L_LINESTATUS string, |
| L_SHIPDATE string, |
| L_COMMITDATE string, |
| L_RECEIPTDATE string, |
| L_SHIPINSTRUCT string, |
| L_SHIPMODE string, |
| L_COMMENT string) with (FORMAT = command, COMMANDS = '/Users/ChangLei/curwork/dev/computenode/inst/bin/dbgen -b /Users/ChangLei/curwork/dev/computenode/inst/bin/dists.dss -T L -s 1 -C 2 -S $TASKNO', TaskCount = 2); |
| |
| CREATE TABLE lineitem_orc_lz4 ( L_ORDERKEY int, |
| L_PARTKEY int, |
| L_SUPPKEY int, |
| L_LINENUMBER int, |
| L_QUANTITY double, |
| L_EXTENDEDPRICE double, |
| L_DISCOUNT double, |
| L_TAX double, |
| L_RETURNFLAG string, |
| L_LINESTATUS string, |
| L_SHIPDATE string, |
| L_COMMITDATE string, |
| L_RECEIPTDATE string, |
| L_SHIPINSTRUCT string, |
| L_SHIPMODE string, |
| L_COMMENT string) with(format = orc, location='file:///tmp/lineitem_orc_lz4'); |
| |
| CREATE TABLE lineitem_fast ( L_ORDERKEY int, |
| L_PARTKEY int, |
| L_SUPPKEY int, |
| L_LINENUMBER int, |
| L_QUANTITY double, |
| L_EXTENDEDPRICE double, |
| L_DISCOUNT double, |
| L_TAX double, |
| L_RETURNFLAG string, |
| L_LINESTATUS string, |
| L_SHIPDATE string, |
| L_COMMITDATE string, |
| L_RECEIPTDATE string, |
| L_SHIPINSTRUCT string, |
| L_SHIPMODE string, |
| L_COMMENT string) with(format = fast, location='file:///tmp/lineitem_fast'); |
| |
| CREATE TABLE lineitem_orc_snappy ( L_ORDERKEY int, |
| L_PARTKEY int, |
| L_SUPPKEY int, |
| L_LINENUMBER int, |
| L_QUANTITY double, |
| L_EXTENDEDPRICE double, |
| L_DISCOUNT double, |
| L_TAX double, |
| L_RETURNFLAG string, |
| L_LINESTATUS string, |
| L_SHIPDATE string, |
| L_COMMITDATE string, |
| L_RECEIPTDATE string, |
| L_SHIPINSTRUCT string, |
| L_SHIPMODE string, |
| L_COMMENT string) with(format = orc, location='file:///tmp/lineitem_orc_snappy'); |
| |
| CREATE TABLE lineitem_orc_none ( L_ORDERKEY int, |
| L_PARTKEY int, |
| L_SUPPKEY int, |
| L_LINENUMBER int, |
| L_QUANTITY double, |
| L_EXTENDEDPRICE double, |
| L_DISCOUNT double, |
| L_TAX double, |
| L_RETURNFLAG string, |
| L_LINESTATUS string, |
| L_SHIPDATE string, |
| L_COMMITDATE string, |
| L_RECEIPTDATE string, |
| L_SHIPINSTRUCT string, |
| L_SHIPMODE string, |
| L_COMMENT string) with(format = orc, location='file:///tmp/lineitem_orc_none'); |
| |
| CREATE TABLE lineitem_pg ( L_ORDERKEY int, |
| L_PARTKEY int, |
| L_SUPPKEY int, |
| L_LINENUMBER int, |
| L_QUANTITY double precision, |
| L_EXTENDEDPRICE double precision, |
| L_DISCOUNT double precision, |
| L_TAX double precision, |
| L_RETURNFLAG varchar, |
| L_LINESTATUS varchar, |
| L_SHIPDATE varchar, |
| L_COMMITDATE varchar, |
| L_RECEIPTDATE varchar, |
| L_SHIPINSTRUCT varchar, |
| L_SHIPMODE varchar, |
| L_COMMENT varchar); |
| |
| 2) loading |
| insert into lineitem_orc_lz4 select * from e_lineitem; |
| insert into lineitem_fast select * from e_lineitem; |
| insert into lineitem_orc_snappy select * from e_lineitem; |
| insert into lineitem_orc_none select * from e_lineitem; |
| copy lineitem_pg from '/Users/ChangLei/curwork/dev/tpch-dbgen/lineitem.tbl' with delimiter '|'; |
| |
| analyze lineitem_orc_lz4; |
| analyze lineitem_fast; |
| analyze lineitem_orc_snappy; |
| analyze lineitem_orc_none; |
| analyze lineitem_pg; |
| |
| |
| 3) count* |
| select count(*) from lineitem_orc_lz4; |
| select count(*) from lineitem_fast; |
| select count(*) from lineitem_orc_snappy; |
| select count(*) from lineitem_orc_none; |
| select count(*) from lineitem_pg; |
| |
| 4) countint |
| select count(L_ORDERKEY) from lineitem_orc_lz4; |
| select count(L_ORDERKEY) from lineitem_fast; |
| select count(L_ORDERKEY) from lineitem_orc_snappy; |
| select count(L_ORDERKEY) from lineitem_orc_none; |
| select count(L_ORDERKEY) from lineitem_pg; |
| |
| 5) countstring |
| select count(L_COMMENT) from lineitem_orc_lz4; |
| select count(L_COMMENT) from lineitem_fast; |
| select count(L_COMMENT) from lineitem_orc_snappy; |
| select count(L_COMMENT) from lineitem_orc_none; |
| select count(L_COMMENT) from lineitem_pg; |
| |
| 6) count2int2string |
| select count(L_ORDERKEY), count(L_SUPPKEY), count(L_SHIPMODE), count(L_COMMENT) from lineitem_orc_lz4; |
| select count(L_ORDERKEY), count(L_SUPPKEY), count(L_SHIPMODE), count(L_COMMENT) from lineitem_fast; |
| select count(L_ORDERKEY), count(L_SUPPKEY), count(L_SHIPMODE), count(L_COMMENT) from lineitem_orc_snappy; |
| select count(L_ORDERKEY), count(L_SUPPKEY), count(L_SHIPMODE), count(L_COMMENT) from lineitem_orc_none; |
| select count(L_ORDERKEY), count(L_SUPPKEY), count(L_SHIPMODE), count(L_COMMENT) from lineitem_pg; |
| |
| 7) tpch-Q1 |
| |
| SELECT |
| l_returnflag, |
| l_linestatus, |
| sum(l_quantity) as sum_qty, |
| sum(l_extendedprice) as sum_base_price, |
| sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, |
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, |
| sum(l_quantity) as avg_qty, |
| sum(l_extendedprice) as avg_price, |
| sum(l_discount) as avg_disc, |
| count(*) as count_order |
| FROM |
| lineitem_orc_lz4 |
| GROUP BY |
| l_returnflag, |
| l_linestatus; |
| |
| SELECT |
| l_returnflag, |
| l_linestatus, |
| sum(l_quantity) as sum_qty, |
| sum(l_extendedprice) as sum_base_price, |
| sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, |
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, |
| sum(l_quantity) as avg_qty, |
| sum(l_extendedprice) as avg_price, |
| sum(l_discount) as avg_disc, |
| count(*) as count_order |
| FROM |
| lineitem_fast |
| GROUP BY |
| l_returnflag, |
| l_linestatus; |
| |
| |
| SELECT |
| l_returnflag, |
| l_linestatus, |
| sum(l_quantity) as sum_qty, |
| sum(l_extendedprice) as sum_base_price, |
| sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, |
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, |
| sum(l_quantity) as avg_qty, |
| sum(l_extendedprice) as avg_price, |
| sum(l_discount) as avg_disc, |
| count(*) as count_order |
| FROM |
| lineitem_orc_snappy |
| GROUP BY |
| l_returnflag, |
| l_linestatus; |
| |
| |
| SELECT |
| l_returnflag, |
| l_linestatus, |
| sum(l_quantity) as sum_qty, |
| sum(l_extendedprice) as sum_base_price, |
| sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, |
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, |
| sum(l_quantity) as avg_qty, |
| sum(l_extendedprice) as avg_price, |
| sum(l_discount) as avg_disc, |
| count(*) as count_order |
| FROM |
| lineitem_orc_none |
| GROUP BY |
| l_returnflag, |
| l_linestatus; |
| |
| SELECT |
| l_returnflag, |
| l_linestatus, |
| sum(l_quantity) as sum_qty, |
| sum(l_extendedprice) as sum_base_price, |
| sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, |
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, |
| sum(l_quantity) as avg_qty, |
| sum(l_extendedprice) as avg_price, |
| sum(l_discount) as avg_disc, |
| count(*) as count_order |
| FROM |
| lineitem_pg |
| GROUP BY |
| l_returnflag, |
| l_linestatus; |
| |
| |