blob: 2ba5af3ae50e665ad313952d6d333f2ebdaf8def [file] [log] [blame]
1. ORC read process:
FileInputStream -> SnappyDecompressionStream -> RLE decoder -> raw bytes
2. ORC write process:
raw bytes -> RLE encoder -> SnappyCompressionStream -> FileOutputStream
3. TODOs
1) URL formats such as "hdfs://localhost:8020/user/hive/warehouse/tsmallint/" (trailing slash) or "hdfs://localhost:8020//user/hive/warehouse/tsmallint" (double slash) are not handled correctly.
2) Currently supported types: tinyint, smallint, int, bigint, float, double, string, varchar.
More types need support: boolean, date, char(x), timestamp, decimal, struct, etc.
To add support for a new type, pay special attention to:
a. TypeImpl::createRowBatch: must return the correct ColumnVectorBatch for the type.
b. buildReader: must return a reader of the correct type.
3) Improve the performance of null-value handling.
4) Writer usage example:
orc::WriterOptions opts;
std::unique_ptr<orc::Writer> writer;
dbcommon::URL url(filename);
dbcommon::FileSystemManager fsm;
dbcommon::FileSystem *fs = fsm.get(url.getNormalizedServiceName());
writer = orc::createWriter(orc::writeFile(fs, url.getPath()), opts);
std::unique_ptr<orc::ColumnVectorBatch> batch = writer->createRowBatch(1000);
for (uint32_t i = 0; i < 100; i++) {
writer->addRowBatch(batch);
}
writer->close();
5) hasEnoughSpaceForBatch needs to be revisited
// estimated tuple batch size
// TODO(lei): might need to be revised after we figure out how to
// store other types
6) Read the footer only once on the master, then dispatch it to the workers. This can
avoid having all workers open the footers at the same time.
7) Add boundary-value tests for the ORC format, for example max(int32_t) under each
encoding scheme: delta, direct, patched base, short repeat.
8) Compare the performance and compression ratio of lz4 and snappy.
9) Write more information to ORC files (statistics, indexes). Otherwise,
existing data will have to be reloaded once the reader adds support for these features.
10) Add tests for snappy (lz4 is now the default), and add tests for snappycompressor/lz4compressor.
4. How to use
-- hive: create a sample ORC table with one column per basic type, insert a
-- single row, and read it back.
CREATE TABLE tcn (
    t tinyint,
    s smallint,
    i int,
    b bigint,
    f float,
    d double,
    str string,
    v varchar(10),
    c char(4),
    bin binary
) STORED AS ORC;
INSERT INTO tcn VALUES (1, 2, 3, 4, 1.1, 1.2, 'string', 'var', 'char', 'binary');
SELECT * FROM tcn;
-- computenode: declare a table over the Hive warehouse directory of tcn.
-- The Hive char(4) and binary columns are declared as string here, because
-- neither type is in the supported-type list under TODO 2).
-- The location is written without a trailing slash (see TODO 1)).
CREATE TABLE tcn (
    t tinyint,
    s smallint,
    i int,
    b bigint,
    f float,
    d double,
    str string,
    v varchar(10),
    c string,
    bin string
) WITH (format = orc, location = 'hdfs://localhost:8020/user/hive/warehouse/tcn');
SELECT * FROM tcn;
5. Micro benchmark: based on the 2016-10-15 version, measured after ANALYZE.
NOTE: ANALYZE is very important for aggregation.
                  lz4       fast      snappy    orcnone   postgres
filesize          426MB     1042MB    402MB     696MB     985MB
load              20239ms   20784ms   20098ms   19944ms   22767ms (copy)
count*            32ms      32ms      31ms      30ms      639ms
countint          96ms      42ms      98ms      97ms      745ms
countstring       179ms     85ms      235ms     120ms     1154ms
count2int2string  444ms     165ms     501ms     365ms     1596ms
tpch-Q1           1025ms    386ms     1072ms    896ms     3830ms
The benchmark used is:
1) schema
-- Source table for the benchmark: its rows are produced by running TPC-H
-- dbgen as an external command.
-- Date columns are declared as string because date is not yet a supported
-- type (see TODO 2)).
-- NOTE(review): the dbgen and dists.dss paths are absolute paths on the
-- author's machine; adjust them to the local install before running.
-- NOTE(review): "-C 2" presumably splits generation into 2 chunks to match
-- TaskCount = 2, with $TASKNO selecting each task's chunk; keep them in sync.
CREATE TABLE e_LINEITEM (
    L_ORDERKEY int,
    L_PARTKEY int,
    L_SUPPKEY int,
    L_LINENUMBER int,
    L_QUANTITY double,
    L_EXTENDEDPRICE double,
    L_DISCOUNT double,
    L_TAX double,
    L_RETURNFLAG string,
    L_LINESTATUS string,
    L_SHIPDATE string,
    L_COMMITDATE string,
    L_RECEIPTDATE string,
    L_SHIPINSTRUCT string,
    L_SHIPMODE string,
    L_COMMENT string
) WITH (FORMAT = command, COMMANDS = '/Users/ChangLei/curwork/dev/computenode/inst/bin/dbgen -b /Users/ChangLei/curwork/dev/computenode/inst/bin/dists.dss -T L -s 1 -C 2 -S $TASKNO', TaskCount = 2);
-- Target tables, one per storage format being compared. All of them reuse the
-- e_LINEITEM column list.
-- NOTE(review): the three ORC tables are declared with identical options
-- (format = orc), so nothing in this DDL selects lz4 vs. snappy vs. no
-- compression. The codec was presumably chosen outside the DDL when each
-- table was loaded; record how before re-running the benchmark.
CREATE TABLE lineitem_orc_lz4 (
    L_ORDERKEY int,
    L_PARTKEY int,
    L_SUPPKEY int,
    L_LINENUMBER int,
    L_QUANTITY double,
    L_EXTENDEDPRICE double,
    L_DISCOUNT double,
    L_TAX double,
    L_RETURNFLAG string,
    L_LINESTATUS string,
    L_SHIPDATE string,
    L_COMMITDATE string,
    L_RECEIPTDATE string,
    L_SHIPINSTRUCT string,
    L_SHIPMODE string,
    L_COMMENT string
) WITH (format = orc, location = 'file:///tmp/lineitem_orc_lz4');
CREATE TABLE lineitem_fast (
    L_ORDERKEY int,
    L_PARTKEY int,
    L_SUPPKEY int,
    L_LINENUMBER int,
    L_QUANTITY double,
    L_EXTENDEDPRICE double,
    L_DISCOUNT double,
    L_TAX double,
    L_RETURNFLAG string,
    L_LINESTATUS string,
    L_SHIPDATE string,
    L_COMMITDATE string,
    L_RECEIPTDATE string,
    L_SHIPINSTRUCT string,
    L_SHIPMODE string,
    L_COMMENT string
) WITH (format = fast, location = 'file:///tmp/lineitem_fast');
CREATE TABLE lineitem_orc_snappy (
    L_ORDERKEY int,
    L_PARTKEY int,
    L_SUPPKEY int,
    L_LINENUMBER int,
    L_QUANTITY double,
    L_EXTENDEDPRICE double,
    L_DISCOUNT double,
    L_TAX double,
    L_RETURNFLAG string,
    L_LINESTATUS string,
    L_SHIPDATE string,
    L_COMMITDATE string,
    L_RECEIPTDATE string,
    L_SHIPINSTRUCT string,
    L_SHIPMODE string,
    L_COMMENT string
) WITH (format = orc, location = 'file:///tmp/lineitem_orc_snappy');
CREATE TABLE lineitem_orc_none (
    L_ORDERKEY int,
    L_PARTKEY int,
    L_SUPPKEY int,
    L_LINENUMBER int,
    L_QUANTITY double,
    L_EXTENDEDPRICE double,
    L_DISCOUNT double,
    L_TAX double,
    L_RETURNFLAG string,
    L_LINESTATUS string,
    L_SHIPDATE string,
    L_COMMITDATE string,
    L_RECEIPTDATE string,
    L_SHIPINSTRUCT string,
    L_SHIPMODE string,
    L_COMMENT string
) WITH (format = orc, location = 'file:///tmp/lineitem_orc_none');
-- Plain PostgreSQL baseline table. It has the same columns as the other
-- tables, with double spelled as double precision and string as varchar.
CREATE TABLE lineitem_pg (
    L_ORDERKEY int,
    L_PARTKEY int,
    L_SUPPKEY int,
    L_LINENUMBER int,
    L_QUANTITY double precision,
    L_EXTENDEDPRICE double precision,
    L_DISCOUNT double precision,
    L_TAX double precision,
    L_RETURNFLAG varchar,
    L_LINESTATUS varchar,
    L_SHIPDATE varchar,
    L_COMMITDATE varchar,
    L_RECEIPTDATE varchar,
    L_SHIPINSTRUCT varchar,
    L_SHIPMODE varchar,
    L_COMMENT varchar
);
2) loading
-- Load each format-specific table from the same dbgen source table.
INSERT INTO lineitem_orc_lz4 SELECT * FROM e_lineitem;
INSERT INTO lineitem_fast SELECT * FROM e_lineitem;
INSERT INTO lineitem_orc_snappy SELECT * FROM e_lineitem;
INSERT INTO lineitem_orc_none SELECT * FROM e_lineitem;
-- The PostgreSQL baseline is bulk-loaded from a dbgen .tbl file instead.
-- NOTE(review): this is an absolute path on the author's machine. Stock dbgen
-- rows also end with a trailing '|', which COPY would read as an extra
-- column, so the file was presumably pre-processed; confirm.
COPY lineitem_pg FROM '/Users/ChangLei/curwork/dev/tpch-dbgen/lineitem.tbl' WITH DELIMITER '|';
-- Gather statistics on every table; ANALYZE matters for the aggregation
-- queries below (see the benchmark notes).
ANALYZE lineitem_orc_lz4;
ANALYZE lineitem_fast;
ANALYZE lineitem_orc_snappy;
ANALYZE lineitem_orc_none;
ANALYZE lineitem_pg;
3) count*
-- Row count per table (no column data needs to be decoded).
SELECT COUNT(*) FROM lineitem_orc_lz4;
SELECT COUNT(*) FROM lineitem_fast;
SELECT COUNT(*) FROM lineitem_orc_snappy;
SELECT COUNT(*) FROM lineitem_orc_none;
SELECT COUNT(*) FROM lineitem_pg;
4) countint
-- Non-null count over a single int column.
SELECT COUNT(L_ORDERKEY) FROM lineitem_orc_lz4;
SELECT COUNT(L_ORDERKEY) FROM lineitem_fast;
SELECT COUNT(L_ORDERKEY) FROM lineitem_orc_snappy;
SELECT COUNT(L_ORDERKEY) FROM lineitem_orc_none;
SELECT COUNT(L_ORDERKEY) FROM lineitem_pg;
5) countstring
-- Non-null count over a single string column.
SELECT COUNT(L_COMMENT) FROM lineitem_orc_lz4;
SELECT COUNT(L_COMMENT) FROM lineitem_fast;
SELECT COUNT(L_COMMENT) FROM lineitem_orc_snappy;
SELECT COUNT(L_COMMENT) FROM lineitem_orc_none;
SELECT COUNT(L_COMMENT) FROM lineitem_pg;
6) count2int2string
-- Non-null counts over two int columns and two string columns in one scan.
SELECT
    COUNT(L_ORDERKEY),
    COUNT(L_SUPPKEY),
    COUNT(L_SHIPMODE),
    COUNT(L_COMMENT)
FROM lineitem_orc_lz4;
SELECT
    COUNT(L_ORDERKEY),
    COUNT(L_SUPPKEY),
    COUNT(L_SHIPMODE),
    COUNT(L_COMMENT)
FROM lineitem_fast;
SELECT
    COUNT(L_ORDERKEY),
    COUNT(L_SUPPKEY),
    COUNT(L_SHIPMODE),
    COUNT(L_COMMENT)
FROM lineitem_orc_snappy;
SELECT
    COUNT(L_ORDERKEY),
    COUNT(L_SUPPKEY),
    COUNT(L_SHIPMODE),
    COUNT(L_COMMENT)
FROM lineitem_orc_none;
SELECT
    COUNT(L_ORDERKEY),
    COUNT(L_SUPPKEY),
    COUNT(L_SHIPMODE),
    COUNT(L_COMMENT)
FROM lineitem_pg;
7) tpch-Q1
-- Simplified TPC-H Q1 (pricing summary report), repeated once per table.
-- NOTE(review): the official Q1 computes avg_qty / avg_price / avg_disc with
-- avg(), filters on l_shipdate and orders the result. This variant uses sum()
-- for the avg_* columns and has neither the filter nor an ORDER BY. The
-- tpch-Q1 timings in the benchmark table were taken with these exact queries,
-- so re-measure if you switch them to avg().
SELECT
    l_returnflag,
    l_linestatus,
    SUM(l_quantity) AS sum_qty,
    SUM(l_extendedprice) AS sum_base_price,
    SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    SUM(l_quantity) AS avg_qty,
    SUM(l_extendedprice) AS avg_price,
    SUM(l_discount) AS avg_disc,
    COUNT(*) AS count_order
FROM lineitem_orc_lz4
GROUP BY l_returnflag, l_linestatus;
SELECT
    l_returnflag,
    l_linestatus,
    SUM(l_quantity) AS sum_qty,
    SUM(l_extendedprice) AS sum_base_price,
    SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    SUM(l_quantity) AS avg_qty,
    SUM(l_extendedprice) AS avg_price,
    SUM(l_discount) AS avg_disc,
    COUNT(*) AS count_order
FROM lineitem_fast
GROUP BY l_returnflag, l_linestatus;
SELECT
    l_returnflag,
    l_linestatus,
    SUM(l_quantity) AS sum_qty,
    SUM(l_extendedprice) AS sum_base_price,
    SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    SUM(l_quantity) AS avg_qty,
    SUM(l_extendedprice) AS avg_price,
    SUM(l_discount) AS avg_disc,
    COUNT(*) AS count_order
FROM lineitem_orc_snappy
GROUP BY l_returnflag, l_linestatus;
SELECT
    l_returnflag,
    l_linestatus,
    SUM(l_quantity) AS sum_qty,
    SUM(l_extendedprice) AS sum_base_price,
    SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    SUM(l_quantity) AS avg_qty,
    SUM(l_extendedprice) AS avg_price,
    SUM(l_discount) AS avg_disc,
    COUNT(*) AS count_order
FROM lineitem_orc_none
GROUP BY l_returnflag, l_linestatus;
SELECT
    l_returnflag,
    l_linestatus,
    SUM(l_quantity) AS sum_qty,
    SUM(l_extendedprice) AS sum_base_price,
    SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    SUM(l_quantity) AS avg_qty,
    SUM(l_extendedprice) AS avg_price,
    SUM(l_discount) AS avg_disc,
    COUNT(*) AS count_order
FROM lineitem_pg
GROUP BY l_returnflag, l_linestatus;