/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hcatalog.pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
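
/**
 * Tests that HCatLoader and HCatStorer correctly round-trip complex nested schemas
 * (tuples, bags, and maps) between Pig and Hive table types.
 */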
public class TestHCatLoaderComplexSchema {
  //private static MiniCluster cluster = MiniCluster.buildCluster();
  private static Driver driver;
  //private static Properties props;
  private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderComplexSchema.class);
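
  /** Drops the given table via the Hive driver. */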
  private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
    driver.run("drop table " + tablename);
  }
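
  /**
   * Creates an RCFile-backed table with the HCatalog RCFile storage drivers, optionally
   * partitioned, and fails with an IOException if the Hive driver reports a non-zero return code.
   */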
  private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
    String createTable = "create table " + tablename + "(" + schema + ") ";
    if (partitionedBy != null && !partitionedBy.trim().isEmpty()) {
      createTable = createTable + "partitioned by (" + partitionedBy + ") ";
    }
    createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
        "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
    LOG.info("Creating table:\n {}", createTable);
    CommandProcessorResponse result = driver.run(createTable);
    int retCode = result.getResponseCode();
    if (retCode != 0) {
      throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver: [" + retCode + " " + result.getErrorMessage() + "]");
    }
  }

  private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
    createTable(tablename, schema, null);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    HiveConf hiveConf = new HiveConf(TestHCatLoaderComplexSchema.class);
    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
    driver = new Driver(hiveConf);
    SessionState.start(new CliSessionState(hiveConf));
    //props = new Properties();
    //props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
  }
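
  // Shorthand builders: t(...) wraps its arguments in a Pig Tuple, b(...) wraps tuples in a DataBag.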
  private static final TupleFactory tf = TupleFactory.getInstance();
  private static final BagFactory bf = BagFactory.getInstance();

  private Tuple t(Object... objects) {
    return tf.newTuple(Arrays.asList(objects));
  }

  private DataBag b(Tuple... objects) {
    return bf.newDefaultBag(Arrays.asList(objects));
  }

  /**
   * Tests an artificially complex nested schema to exercise nested schema conversion.
   * @throws Exception
   */
  @Test
  public void testSyntheticComplexSchema() throws Exception {
    String pigSchema =
        "(" +
        "a: " +
        "(" +
        "aa: chararray, " +
        "ab: long, " +
        "ac: map[], " +
        "ad: { t: (ada: long) }, " +
        "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
        "af: (afa: chararray, afb: long) " +
        ")," +
        "b: chararray, " +
        "c: long, " +
        "d: { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " +
        ")";

    // the Hive schema with extra structs wrapping the bag tuples
    String tableSchema =
        "a struct<" +
        "aa: string, " +
        "ab: bigint, " +
        "ac: map<string, string>, " +
        "ad: array<struct<ada:bigint>>, " +
        "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
        "af: struct<afa: string, afb: bigint> " +
        ">, " +
        "b string, " +
        "c bigint, " +
        "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<struct<dca: bigint>>>>";

    // the same schema without the extra structs
    String tableSchema2 =
        "a struct<" +
        "aa: string, " +
        "ab: bigint, " +
        "ac: map<string, string>, " +
        "ad: array<bigint>, " +
        "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
        "af: struct<afa: string, afb: bigint> " +
        ">, " +
        "b string, " +
        "c bigint, " +
        "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<bigint>>>";
    List<Tuple> data = new ArrayList<Tuple>();
    for (int i = 0; i < 10; i++) {
      Tuple t = t(
          t(
              "aa test",
              2L,
              new HashMap<String, String>() {
                {
                  put("ac test1", "test 1");
                  put("ac test2", "test 2");
                }
              },
              b(t(3L), t(4L)),
              b(t(5L, t("aeba test", 6L))),
              t("afa test", 7L)
          ),
          "b test",
          (long) i,
          b(t(8L, t("dba test", 9L), b(t(10L)))));
      data.add(t);
    }
    verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, true);
    verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, false);
    verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, true);
    verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, false);
  }
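
  /**
   * Round-trips the data: loads it with MockLoader under the given Pig schema, stores it into an
   * HCatalog table with the given Hive schema, reads it back with HCatLoader, and asserts that
   * both the tuples and the schema (compared ignoring field names) survive the trip.
   */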
  private void verifyWriteRead(String tablename, String pigSchema, String tableSchema, List<Tuple> data, boolean provideSchemaToStorer)
      throws IOException, CommandNeedRetryException, ExecException, FrontendException {
    MockLoader.setData(tablename + "Input", data);
    try {
      createTable(tablename, tableSchema);
      PigServer server = new PigServer(ExecType.LOCAL);
      server.setBatchOn();
      server.registerQuery("A = load '" + tablename + "Input' using org.apache.hcatalog.pig.MockLoader() AS " + pigSchema + ";");
      Schema dumpedASchema = server.dumpSchema("A");
      server.registerQuery("STORE A into '" + tablename + "' using org.apache.hcatalog.pig.HCatStorer("
          + (provideSchemaToStorer ? "'', '" + pigSchema + "'" : "")
          + ");");
      ExecJob execJob = server.executeBatch().get(0);
      if (!execJob.getStatistics().isSuccessful()) {
        throw new RuntimeException("Import failed", execJob.getException());
      }
      // test that the schema was loaded correctly
      server.registerQuery("X = load '" + tablename + "' using org.apache.hcatalog.pig.HCatLoader();");
      server.dumpSchema("X");
      Iterator<Tuple> it = server.openIterator("X");
      int i = 0;
      while (it.hasNext()) {
        Tuple input = data.get(i++);
        Tuple output = it.next();
        Assert.assertEquals(input.toString(), output.toString());
        LOG.info("tuple: {}", output);
      }
      Schema dumpedXSchema = server.dumpSchema("X");
      Assert.assertEquals(
          "expected " + dumpedASchema + " but was " + dumpedXSchema + " (ignoring field names)",
          "",
          compareIgnoreFieldNames(dumpedASchema, dumpedXSchema));
    } finally {
      dropTable(tablename);
    }
  }
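
  /**
   * Recursively compares two Pig schemas by position and type only, ignoring field names.
   * Returns an empty string when they match, otherwise a description of each mismatch.
   */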
  private String compareIgnoreFieldNames(Schema expected, Schema got) throws FrontendException {
    if (expected == null || got == null) {
      if (expected == got) {
        return "";
      } else {
        return "\nexpected " + expected + " got " + got;
      }
    }
    if (expected.size() != got.size()) {
      return "\nsize expected " + expected.size() + " (" + expected + ") got " + got.size() + " (" + got + ")";
    }
    String message = "";
    for (int i = 0; i < expected.size(); i++) {
      FieldSchema expectedField = expected.getField(i);
      FieldSchema gotField = got.getField(i);
      if (expectedField.type != gotField.type) {
        message += "\ntype expected " + expectedField.type + " (" + expectedField + ") got " + gotField.type + " (" + gotField + ")";
      } else {
        message += compareIgnoreFieldNames(expectedField.schema, gotField.schema);
      }
    }
    return message;
  }

  /**
   * Tests that unnecessary tuples are dropped while converting the schema
   * (Pig requires tuples inside bags).
   * @throws Exception
   */
  @Test
  public void testTupleInBagInTupleInBag() throws Exception {
    String pigSchema = "(a: { b : ( c: { d: (i : long) } ) })";
    String tableSchema = "a array< array< bigint > >";
    List<Tuple> data = new ArrayList<Tuple>();
    data.add(t(b(t(b(t(100L), t(101L))), t(b(t(110L))))));
    data.add(t(b(t(b(t(200L))), t(b(t(210L))), t(b(t(220L))))));
    data.add(t(b(t(b(t(300L), t(301L))))));
    data.add(t(b(t(b(t(400L))), t(b(t(410L), t(411L), t(412L))))));
    verifyWriteRead("TupleInBagInTupleInBag1", pigSchema, tableSchema, data, true);
    verifyWriteRead("TupleInBagInTupleInBag2", pigSchema, tableSchema, data, false);

    // test that we don't drop the tuple when the table schema has the corresponding struct
    String tableSchema2 = "a array< struct< c: array< struct< i: bigint > > > >";
    verifyWriteRead("TupleInBagInTupleInBag3", pigSchema, tableSchema2, data, true);
    verifyWriteRead("TupleInBagInTupleInBag4", pigSchema, tableSchema2, data, false);
  }
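
  /** Tests that an untyped Pig map ([]) round-trips against a Hive map whose values are structs. */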
  @Test
  public void testMapWithComplexData() throws Exception {
    String pigSchema = "(a: long, b: map[])";
    String tableSchema = "a bigint, b map<string, struct<aa:bigint, ab:string>>";
    List<Tuple> data = new ArrayList<Tuple>();
    for (int i = 0; i < 10; i++) {
      Tuple t = t(
          (long) i,
          new HashMap<String, Object>() {
            {
              put("b test 1", t(1L, "test 1"));
              put("b test 2", t(2L, "test 2"));
            }
          });
      data.add(t);
    }
    verifyWriteRead("testMapWithComplexData", pigSchema, tableSchema, data, true);
    verifyWriteRead("testMapWithComplexData2", pigSchema, tableSchema, data, false);
  }
}