/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hcatalog.pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

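/**
 * Tests that complex (nested) schemas survive a write/read round trip between Pig and
 * HCatalog: tuples are stored with HCatStorer into an RCFile-backed table and read back
 * with HCatLoader, then both the data and the schemas are compared.
 */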
public class TestHCatLoaderComplexSchema {

  private static Driver driver;
  private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderComplexSchema.class);

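  /** Drops the given table through the Hive driver. */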
  private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
    driver.run("drop table " + tablename);
  }

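  /**
   * Creates an RCFile-backed table through the Hive driver, optionally partitioned, with the
   * HCatalog RCFile input/output storage drivers declared in the table properties.
   */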
  private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
    String createTable = "create table " + tablename + "(" + schema + ") ";
    if (partitionedBy != null && !partitionedBy.trim().isEmpty()) {
      createTable += "partitioned by (" + partitionedBy + ") ";
    }
    createTable += "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
      + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
    LOG.info("Creating table:\n {}", createTable);
    CommandProcessorResponse result = driver.run(createTable);
    int retCode = result.getResponseCode();
    if (retCode != 0) {
      throw new IOException("Failed to create table [" + createTable + "], return code from Hive driver: [" + retCode + " " + result.getErrorMessage() + "]");
    }
  }

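  /** Creates an unpartitioned table with the given schema. */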
  private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
    createTable(tablename, schema, null);
  }

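  /** Creates the Hive driver and session shared by all tests; exec hooks and concurrency support are disabled. */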
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    HiveConf hiveConf = new HiveConf(TestHCatLoaderComplexSchema.class);
    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
    driver = new Driver(hiveConf);
    SessionState.start(new CliSessionState(hiveConf));
  }

  private static final TupleFactory tf = TupleFactory.getInstance();
  private static final BagFactory bf = BagFactory.getInstance();

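  /** Shorthand for building a Pig {@link Tuple} from the given values. */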
  private Tuple t(Object... objects) {
    return tf.newTuple(Arrays.asList(objects));
  }

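  /** Shorthand for building a Pig {@link DataBag} from the given tuples. */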
  private DataBag b(Tuple... objects) {
    return bf.newDefaultBag(Arrays.asList(objects));
  }

  /**
   * Tests an artificially complex nested schema to exercise nested schema conversion.
   * @throws Exception
   */
  @Test
  public void testSyntheticComplexSchema() throws Exception {
    String pigSchema =
      "(" +
        "a: " +
        "(" +
          "aa: chararray, " +
          "ab: long, " +
          "ac: map[], " +
          "ad: { t: (ada: long) }, " +
          "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
          "af: (afa: chararray, afb: long) " +
        ")," +
        "b: chararray, " +
        "c: long, " +
        "d: { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " +
      ")";

    // the corresponding Hive schema, with extra single-field structs inside the arrays (ad, dc)
    String tableSchema =
      "a struct<" +
        "aa: string, " +
        "ab: bigint, " +
        "ac: map<string, string>, " +
        "ad: array<struct<ada:bigint>>, " +
        "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
        "af: struct<afa: string, afb: bigint> " +
      ">, " +
      "b string, " +
      "c bigint, " +
      "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<struct<dca: bigint>>>>";

    // the same Hive schema without the extra structs (bare bigint arrays for ad, dc)
    String tableSchema2 =
      "a struct<" +
        "aa: string, " +
        "ab: bigint, " +
        "ac: map<string, string>, " +
        "ad: array<bigint>, " +
        "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
        "af: struct<afa: string, afb: bigint> " +
      ">, " +
      "b string, " +
      "c bigint, " +
      "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<bigint>>>";

    List<Tuple> data = new ArrayList<Tuple>();
    for (int i = 0; i < 10; i++) {
      Tuple t = t(
        t(
          "aa test",
          2L,
          new HashMap<String, String>() {
            {
              put("ac test1", "test 1");
              put("ac test2", "test 2");
            }
          },
          b(t(3L), t(4L)),
          b(t(5L, t("aeba test", 6L))),
          t("afa test", 7L)
        ),
        "b test",
        (long) i,
        b(t(8L, t("dba test", 9L), b(t(10L)))));
      data.add(t);
    }
    verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, true);
    verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, false);
    verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, true);
    verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, false);
  }

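  /**
   * Writes {@code data} into a new HCatalog table via HCatStorer and reads it back via
   * HCatLoader, asserting that the tuples and the schema (modulo field names) survive
   * the round trip.
   *
   * @param provideSchemaToStorer if true, the Pig schema is passed explicitly to HCatStorer
   */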
  private void verifyWriteRead(String tablename, String pigSchema, String tableSchema, List<Tuple> data, boolean provideSchemaToStorer)
      throws IOException, CommandNeedRetryException, ExecException, FrontendException {
    MockLoader.setData(tablename + "Input", data);
    try {
      createTable(tablename, tableSchema);
      PigServer server = new PigServer(ExecType.LOCAL);
      server.setBatchOn();
      server.registerQuery("A = load '" + tablename + "Input' using org.apache.hcatalog.pig.MockLoader() AS " + pigSchema + ";");
      Schema dumpedASchema = server.dumpSchema("A");
      server.registerQuery("STORE A into '" + tablename + "' using org.apache.hcatalog.pig.HCatStorer("
        + (provideSchemaToStorer ? "'', '" + pigSchema + "'" : "")
        + ");");

      ExecJob execJob = server.executeBatch().get(0);
      if (!execJob.getStatistics().isSuccessful()) {
        throw new RuntimeException("Import failed", execJob.getException());
      }
      // read the data back and check that it matches what was stored
      server.registerQuery("X = load '" + tablename + "' using org.apache.hcatalog.pig.HCatLoader();");
      Iterator<Tuple> it = server.openIterator("X");
      int i = 0;
      while (it.hasNext()) {
        Tuple input = data.get(i++);
        Tuple output = it.next();
        Assert.assertEquals(input.toString(), output.toString());
        LOG.info("tuple : {} ", output);
      }
      // make sure no rows were dropped
      Assert.assertEquals(data.size(), i);

      // check that the loaded schema matches the stored schema, ignoring field names
      Schema dumpedXSchema = server.dumpSchema("X");
      Assert.assertEquals(
        "expected " + dumpedASchema + " but was " + dumpedXSchema + " (ignoring field names)",
        "",
        compareIgnoreFieldNames(dumpedASchema, dumpedXSchema));
    } finally {
      dropTable(tablename);
    }
  }

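  /**
   * Recursively compares two schemas by position and type, ignoring field names.
   * Returns an empty string if they match, otherwise a description of each mismatch.
   */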
  private String compareIgnoreFieldNames(Schema expected, Schema got) throws FrontendException {
    if (expected == null || got == null) {
      if (expected == got) {
        return "";
      } else {
        return "\nexpected " + expected + " got " + got;
      }
    }
    if (expected.size() != got.size()) {
      return "\nsize expected " + expected.size() + " (" + expected + ") got " + got.size() + " (" + got + ")";
    }
    String message = "";
    for (int i = 0; i < expected.size(); i++) {
      FieldSchema expectedField = expected.getField(i);
      FieldSchema gotField = got.getField(i);
      if (expectedField.type != gotField.type) {
        message += "\ntype expected " + expectedField.type + " (" + expectedField + ") got " + gotField.type + " (" + gotField + ")";
      } else {
        message += compareIgnoreFieldNames(expectedField.schema, gotField.schema);
      }
    }
    return message;
  }

  /**
   * Tests that unnecessary wrapper tuples are dropped while converting the schema
   * (Pig requires tuples inside bags).
   * @throws Exception
   */
  @Test
  public void testTupleInBagInTupleInBag() throws Exception {
    String pigSchema = "(a: { b : ( c: { d: (i : long) } ) })";

    String tableSchema = "a array< array< bigint > >";

    List<Tuple> data = new ArrayList<Tuple>();
    data.add(t(b(t(b(t(100L), t(101L))), t(b(t(110L))))));
    data.add(t(b(t(b(t(200L))), t(b(t(210L))), t(b(t(220L))))));
    data.add(t(b(t(b(t(300L), t(301L))))));
    data.add(t(b(t(b(t(400L))), t(b(t(410L), t(411L), t(412L))))));

    verifyWriteRead("TupleInBagInTupleInBag1", pigSchema, tableSchema, data, true);
    verifyWriteRead("TupleInBagInTupleInBag2", pigSchema, tableSchema, data, false);

    // test that we don't drop the unnecessary tuple if the table has the corresponding struct
    String tableSchema2 = "a array< struct< c: array< struct< i: bigint > > > >";

    verifyWriteRead("TupleInBagInTupleInBag3", pigSchema, tableSchema2, data, true);
    verifyWriteRead("TupleInBagInTupleInBag4", pigSchema, tableSchema2, data, false);
  }

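  /**
   * Tests that tuple values stored in an untyped Pig map ({@code map[]}) round-trip
   * through a Hive map with a struct value type.
   */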
  @Test
  public void testMapWithComplexData() throws Exception {
    String pigSchema = "(a: long, b: map[])";
    String tableSchema = "a bigint, b map<string, struct<aa:bigint, ab:string>>";

    List<Tuple> data = new ArrayList<Tuple>();
    for (int i = 0; i < 10; i++) {
      Tuple t = t(
        (long) i,
        new HashMap<String, Object>() {
          {
            put("b test 1", t(1L, "test 1"));
            put("b test 2", t(2L, "test 2"));
          }
        });
      data.add(t);
    }
    verifyWriteRead("testMapWithComplexData", pigSchema, tableSchema, data, true);
    verifyWriteRead("testMapWithComplexData2", pigSchema, tableSchema, data, false);
  }
}