#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''This script creates a nested version of TPC-H. Non-nested TPC-H must already be
loaded.
'''

import logging
import os
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser

from tests.common.environ import HIVE_MAJOR_VERSION
import tests.comparison.cli_options as cli_options

LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])

# We use Hive to transform nested text tables into parquet/orc tables. The table
# property key that controls compression differs between the two file formats (see
# COMPRESSION_KEYS_MAP), and the property values differ from the short codec names
# used in Impala (e.g. snap, def), so COMPRESSION_VALUES_MAP defines that mapping.
COMPRESSION_KEYS_MAP = {
  "parquet": "parquet.compression",
  "orc": "orc.compress"
}
COMPRESSION_VALUES_MAP = {
  # Currently, only three codecs are supported in Hive for Parquet. See the codec
  # names in org.apache.parquet.hadoop.metadata.CompressionCodecName
  # (parquet-hadoop-bundle).
  "parquet": {
    "none": "SNAPPY",
    "snap": "SNAPPY",
    "gzip": "GZIP",
    "lzo": "LZO"
  },
  # Currently, only three codecs are supported in Hive for ORC. See the Hive code in
  # org.apache.orc.impl.WriterImpl#createCodec (in module hive-orc).
  "orc": {
    "none": "NONE",
    "def": "ZLIB",
    "snap": "SNAPPY"
  }
}
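
# For example, the default --table-format=parquet/none resolves to the Hive table
# property 'parquet.compression'='SNAPPY', which is embedded in the TBLPROPERTIES
# clause of the final tables created below.
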
# These vars are set after arg parsing.
cluster = None
source_db = None
target_db = None
file_format = None
compression_key = None
compression_value = None
chunks = None


def is_loaded():
  with cluster.impala.cursor() as cursor:
    try:
      # If the part table exists, assume everything is already loaded.
      cursor.execute("DESCRIBE %s.part" % target_db)
      return True
    except Exception as e:
      if "AnalysisException" not in str(e):
        raise
      return False


def load():
  # As of this writing, Impala isn't able to write nested data in parquet format.
  # Instead, the data will be written in text format, then Hive will be used to
  # convert it from text to parquet/orc.
  with cluster.impala.cursor() as impala:
    impala.ensure_empty_db(target_db)
    impala.execute("USE %s" % target_db)
    external = ""
    tblproperties = "'{0}' = '{1}'".format(compression_key, compression_value)
    # For Hive 3+, work around HIVE-22371 (CTAS puts files in the wrong place) by
    # explicitly creating an external table so that the files are in the external
    # warehouse directory. Use external.table.purge=true so that the table is
    # equivalent to a Hive 2 managed table.
    if HIVE_MAJOR_VERSION >= 3:
      external = "EXTERNAL"
      tblproperties += ",'external.table.purge'='TRUE'"
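    # At this point tblproperties is a ready-to-embed property list, e.g.
    # "'parquet.compression' = 'SNAPPY','external.table.purge'='TRUE'" on Hive 3.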
    sql_params = {
        "source_db": source_db,
        "target_db": target_db,
        "file_format": file_format,
        "chunks": chunks,
        "warehouse_dir": cluster.hive.warehouse_dir,
        "tblproperties": tblproperties,
        "external": external}

    # Split table creation into multiple queries or "chunks" so less memory is needed.
    for chunk_idx in xrange(chunks):
      sql_params["chunk_idx"] = chunk_idx
      # Create the nested data in text format. The \00#'s are nested field terminators,
      # where the numbers correspond to the nesting level.
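      # For example, an order's two lineitems serialize to
      #   l1_partkey\005l1_suppkey\005...\004l2_partkey\005l2_suppkey\005...
      # with \004 separating array items and \005 separating the fields of each item.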
      tmp_orders_sql = r"""
          SELECT STRAIGHT_JOIN
            o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
            o_orderpriority, o_clerk, o_shippriority, o_comment,
            GROUP_CONCAT(
              CONCAT(
                CAST(l_partkey AS STRING), '\005',
                CAST(l_suppkey AS STRING), '\005',
                CAST(l_linenumber AS STRING), '\005',
                CAST(l_quantity AS STRING), '\005',
                CAST(l_extendedprice AS STRING), '\005',
                CAST(l_discount AS STRING), '\005',
                CAST(l_tax AS STRING), '\005',
                CAST(l_returnflag AS STRING), '\005',
                CAST(l_linestatus AS STRING), '\005',
                CAST(l_shipdate AS STRING), '\005',
                CAST(l_commitdate AS STRING), '\005',
                CAST(l_receiptdate AS STRING), '\005',
                CAST(l_shipinstruct AS STRING), '\005',
                CAST(l_shipmode AS STRING), '\005',
                CAST(l_comment AS STRING)
              ), '\004'
            ) AS lineitems_string
          FROM {source_db}.lineitem
          INNER JOIN [SHUFFLE] {source_db}.orders ON o_orderkey = l_orderkey
          WHERE o_orderkey % {chunks} = {chunk_idx}
          GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9""".format(**sql_params)
      LOG.info("Creating temp orders (chunk {chunk} of {chunks})".format(
          chunk=(chunk_idx + 1), chunks=chunks))
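      # The first chunk creates the table; subsequent chunks append to it.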
      if chunk_idx == 0:
        impala.execute("CREATE TABLE tmp_orders_string AS " + tmp_orders_sql)
      else:
        impala.execute("INSERT INTO TABLE tmp_orders_string " + tmp_orders_sql)
    for chunk_idx in xrange(chunks):
      sql_params["chunk_idx"] = chunk_idx
      tmp_customer_sql = r"""
          SELECT STRAIGHT_JOIN
            c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal,
            c_mktsegment, c_comment,
            GROUP_CONCAT(
              CONCAT(
                CAST(o_orderkey AS STRING), '\003',
                CAST(o_orderstatus AS STRING), '\003',
                CAST(o_totalprice AS STRING), '\003',
                CAST(o_orderdate AS STRING), '\003',
                CAST(o_orderpriority AS STRING), '\003',
                CAST(o_clerk AS STRING), '\003',
                CAST(o_shippriority AS STRING), '\003',
                CAST(o_comment AS STRING), '\003',
                CAST(lineitems_string AS STRING)
              ), '\002'
            ) orders_string
          FROM tmp_orders_string
          RIGHT JOIN [SHUFFLE] {source_db}.customer ON c_custkey = o_custkey
          WHERE c_custkey % {chunks} = {chunk_idx}
          GROUP BY 1, 2, 3, 4, 5, 6, 7, 8""".format(**sql_params)
      LOG.info("Creating temp customers (chunk {chunk} of {chunks})".format(
          chunk=(chunk_idx + 1), chunks=chunks))
      if chunk_idx == 0:
        impala.execute("CREATE TABLE tmp_customer_string AS " + tmp_customer_sql)
      else:
        impala.execute("INSERT INTO TABLE tmp_customer_string " + tmp_customer_sql)

    # Create a table with a nested schema to read the text files we generated above.
    # Impala is currently unable to read from this table. We will use Hive to read
    # from it in order to convert the table to parquet/orc.
    impala.execute("""
        CREATE EXTERNAL TABLE tmp_customer (
          c_custkey BIGINT,
          c_name STRING,
          c_address STRING,
          c_nationkey SMALLINT,
          c_phone STRING,
          c_acctbal DECIMAL(12, 2),
          c_mktsegment STRING,
          c_comment STRING,
          c_orders ARRAY<STRUCT<
            o_orderkey: BIGINT,
            o_orderstatus: STRING,
            o_totalprice: DECIMAL(12, 2),
            o_orderdate: STRING,
            o_orderpriority: STRING,
            o_clerk: STRING,
            o_shippriority: INT,
            o_comment: STRING,
            o_lineitems: ARRAY<STRUCT<
              l_partkey: BIGINT,
              l_suppkey: BIGINT,
              l_linenumber: INT,
              l_quantity: DECIMAL(12, 2),
              l_extendedprice: DECIMAL(12, 2),
              l_discount: DECIMAL(12, 2),
              l_tax: DECIMAL(12, 2),
              l_returnflag: STRING,
              l_linestatus: STRING,
              l_shipdate: STRING,
              l_commitdate: STRING,
              l_receiptdate: STRING,
              l_shipinstruct: STRING,
              l_shipmode: STRING,
              l_comment: STRING>>>>)
        STORED AS TEXTFILE
        LOCATION '{warehouse_dir}/{target_db}.db/tmp_customer_string'"""\
        .format(**sql_params))
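
    # Note: the LOCATION above is the directory that the tmp_customer_string CTAS
    # wrote its text data to, so tmp_customer exposes the same data with a nested
    # schema.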

    # Create the temporary region table with nested nation. This table doesn't get
    # too big, so we don't chunk it (like we did with customer).
    LOG.info("Creating temp regions")
    impala.execute(r"""
        CREATE TABLE tmp_region_string
        AS SELECT
          r_regionkey, r_name, r_comment,
          GROUP_CONCAT(
            CONCAT(
              CAST(n_nationkey AS STRING), '\003',
              CAST(n_name AS STRING), '\003',
              CAST(n_comment AS STRING)
            ), '\002'
          ) nations_string
        FROM {source_db}.region
        JOIN {source_db}.nation ON r_regionkey = n_regionkey
        GROUP BY 1, 2, 3""".format(**sql_params))
    impala.execute("""
        CREATE EXTERNAL TABLE tmp_region (
          r_regionkey SMALLINT,
          r_name STRING,
          r_comment STRING,
          r_nations ARRAY<STRUCT<
            n_nationkey: SMALLINT,
            n_name: STRING,
            n_comment: STRING>>)
        STORED AS TEXTFILE
        LOCATION '{warehouse_dir}/{target_db}.db/tmp_region_string'"""\
        .format(**sql_params))

    # Several suppliers supply the same part, so the actual part data is not nested;
    # this avoids duplicating it.
    LOG.info("Creating temp suppliers")
    impala.execute(r"""
        CREATE TABLE tmp_supplier_string AS
        SELECT
          s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment,
          GROUP_CONCAT(
            CONCAT(
              CAST(ps_partkey AS STRING), '\003',
              CAST(ps_availqty AS STRING), '\003',
              CAST(ps_supplycost AS STRING), '\003',
              CAST(ps_comment AS STRING)
            ), '\002'
          ) partsupps_string
        FROM {source_db}.supplier
        JOIN {source_db}.partsupp ON s_suppkey = ps_suppkey
        GROUP BY 1, 2, 3, 4, 5, 6, 7""".format(**sql_params))
    impala.execute("""
        CREATE EXTERNAL TABLE tmp_supplier (
          s_suppkey BIGINT,
          s_name STRING,
          s_address STRING,
          s_nationkey SMALLINT,
          s_phone STRING,
          s_acctbal DECIMAL(12, 2),
          s_comment STRING,
          s_partsupps ARRAY<STRUCT<
            ps_partkey: BIGINT,
            ps_availqty: INT,
            ps_supplycost: DECIMAL(12, 2),
            ps_comment: STRING>>)
        STORED AS TEXTFILE
        LOCATION '{warehouse_dir}/{target_db}.db/tmp_supplier_string'"""\
        .format(**sql_params))

    # The part table doesn't have nesting. If it's a parquet table, we create it in
    # Impala.
    LOG.info("Creating parts")
    if file_format == "parquet":
      impala.execute("""
          CREATE EXTERNAL TABLE part
          STORED AS PARQUET
          AS SELECT * FROM {source_db}.part""".format(**sql_params))

  cluster.hdfs.ensure_home_dir()

  if file_format == "orc":
    # For the ORC format, we create the 'part' table with Hive.
    with cluster.hive.cursor(db_name=target_db) as hive:
      hive.execute("""
          CREATE {external} TABLE part
          STORED AS ORC
          TBLPROPERTIES({tblproperties})
          AS SELECT * FROM {source_db}.part""".format(**sql_params))

  # Hive is used to convert the text data into parquet/orc; the temp tables are
  # dropped through Impala afterwards. The Hive SET values below are necessary to
  # prevent Impala remote reads of the resulting parquet files. These values are
  # taken from http://blog.cloudera.com/blog/2014/12/the-impala-cookbook.
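  # For reference: 1073741824 bytes = 1 GiB (the min split and HDFS block sizes) and
  # 10737418240 bytes = 10 GiB (the parquet block size).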
  create_final_tables_sql = """
      SET mapred.min.split.size=1073741824;
      SET parquet.block.size=10737418240;
      SET dfs.block.size=1073741824;

      CREATE {external} TABLE region
      STORED AS {file_format}
      TBLPROPERTIES({tblproperties})
      AS SELECT * FROM tmp_region;

      CREATE {external} TABLE supplier
      STORED AS {file_format}
      TBLPROPERTIES({tblproperties})
      AS SELECT * FROM tmp_supplier;"""

  # A simple CTAS for tpch_nested_parquet.customer would create 3 files with Hive 3 vs
  # 4 files with Hive 2. This difference would break several tests, and it seemed
  # easier to hack the loading of the table than to add Hive version specific behavior
  # for each affected test. A small part of the table is inserted in a separate
  # statement to generate the +1 file (this needs hive.merge.tezfiles to avoid
  # creating +3 files).
  # TODO: find a less hacky way to ensure a fixed number of files
  if HIVE_MAJOR_VERSION >= 3 and file_format == "parquet":
    create_final_tables_sql += """
        CREATE {external} TABLE customer
        STORED AS {file_format}
        TBLPROPERTIES({tblproperties})
        AS SELECT * FROM tmp_customer
        WHERE c_custkey >= 10;

        INSERT INTO customer
        SELECT * FROM tmp_customer
        WHERE c_custkey < 10;"""
  else:
    create_final_tables_sql += """
        CREATE {external} TABLE customer
        STORED AS {file_format}
        TBLPROPERTIES({tblproperties})
        AS SELECT * FROM tmp_customer;"""
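
  # The cursor executes one statement at a time, so split the script on ';' (none of
  # the statements contains a literal semicolon).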
  with cluster.hive.cursor(db_name=target_db) as hive:
    LOG.info("Converting temp tables")
    for stmt in create_final_tables_sql.format(**sql_params).split(";"):
      if not stmt.strip():
        continue
      LOG.info("Executing: {0}".format(stmt))
      hive.execute(stmt)

  with cluster.impala.cursor(db_name=target_db) as impala:
    # Drop the temporary tables. These temporary tables were created in Impala, so
    # they exist in Impala's metadata. The drops are executed by Impala so that the
    # metadata is automatically updated.
    for stmt in """
        DROP TABLE tmp_orders_string;
        DROP TABLE tmp_customer_string;
        DROP TABLE tmp_customer;
        DROP TABLE tmp_region_string;
        DROP TABLE tmp_region;
        DROP TABLE tmp_supplier;
        DROP TABLE tmp_supplier_string;""".split(";"):
      if not stmt.strip():
        continue
      LOG.info("Executing: {0}".format(stmt))
      impala.execute(stmt)
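
    # The final tables were (mostly) created through Hive, so refresh Impala's view
    # of their metadata before computing stats.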
impala.invalidate_metadata(table_name="customer")
impala.invalidate_metadata(table_name="part")
impala.invalidate_metadata(table_name="region")
impala.invalidate_metadata(table_name="supplier")
impala.compute_stats()
LOG.info("Done loading nested TPCH data")


if __name__ == "__main__":
  parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
  cli_options.add_logging_options(parser)
  cli_options.add_cluster_options(parser)  # --cm-host and similar args added here
  cli_options.add_kerberos_options(parser)
  cli_options.add_ssl_options(parser)
  parser.add_argument("-s", "--source-db", default="tpch_parquet")
  parser.add_argument("-t", "--target-db", default="tpch_nested_parquet")
  parser.add_argument("-f", "--table-format", default="parquet/none")  # can be "orc/def"
  parser.add_argument("-c", "-p", "--chunks", type=int, default=1)
  args = parser.parse_args()

  cli_options.configure_logging(args.log_level, debug_log_file=args.debug_log_file)
  cluster = cli_options.create_cluster(args)
  source_db = args.source_db
  target_db = args.target_db
  file_format, compression_value = args.table_format.split("/")
  # 'compression_value' is one of the short names used in Impala (none, def, gzip,
  # bzip, snap, lzo). It must be translated into a value that Hive accepts.
  if file_format not in COMPRESSION_KEYS_MAP:
    raise Exception("Nested types in file format %s are not supported" % file_format)
  compression_key = COMPRESSION_KEYS_MAP[file_format]
  if compression_value not in COMPRESSION_VALUES_MAP[file_format]:
    raise Exception("Loading %s tables with %s compression is not supported by Hive. "
        "Supported compressions: %s"
        % (file_format, compression_value,
           str(COMPRESSION_VALUES_MAP[file_format].keys())))
  compression_value = COMPRESSION_VALUES_MAP[file_format][compression_value]
  chunks = args.chunks

  if is_loaded():
    LOG.info("Data is already loaded")
  else:
    load()