blob: e9406f76b951ecd583137ff9029e0f142594c8db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.test.utils.TypeCheckingTestUtil;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestPigStorage {
private static PigServer pig;
private static PigContext pigContext;
private static final String datadir = "build/test/tmpdata/";
@Before
public void setup() throws Exception {
    // Some tests run in map-reduce mode and some in local mode, so before
    // each test we re-initialize FileLocalizer so that temp files are
    // created correctly depending on the ExecType used by the test.
    Util.resetStateForExecModeSwitch();
    // Default to local mode to save typing; a test that needs a cluster
    // creates its own PigServer.
    pig = new PigServer(ExecType.LOCAL);
    Util.deleteDirectory(new File(datadir));
    try {
        pig.mkdirs(datadir);
    } catch (IOException ignored) {
        // Best effort: the directory may already exist; the write below
        // fails loudly anyway if the directory is truly unusable.
    }
    Util.createLocalInputFile(datadir + "originput",
            new String[] {"A,1", "B,2", "C,3", "D,2",
                          "A,5", "B,5", "C,8", "A,8",
                          "D,8", "A,9"});
}
@After
public void tearDown() throws Exception {
    // Remove the per-test scratch directory, then release the PigServer.
    File scratch = new File(datadir);
    Util.deleteDirectory(scratch);
    pig.shutdown();
}
@BeforeClass
public static void oneTimeSetup() {
    // One shared local-mode context is enough; tests that need a live
    // connection call pigContext.connect() themselves.
    Properties emptyProps = new Properties();
    pigContext = new PigContext(ExecType.LOCAL, emptyProps);
}
/**
 * Dumps the given alias and checks that it yields exactly the expected
 * tuples, in order (compared by their string form).
 */
private static void assertAliasIs(String alias, List<Tuple> expectedResults)
        throws IOException {
    Iterator<Tuple> actual = pig.openIterator(alias);
    int seen = 0;
    for (; actual.hasNext(); seen++) {
        Assert.assertEquals(expectedResults.get(seen).toString(), actual.next().toString());
    }
    Assert.assertEquals(expectedResults.size(), seen);
}
@Test
public void testBlockBoundary() throws ExecException {
    // Tests the PigStorage loader with records that fall exactly on the
    // boundary of the file blocks (split size forced to 20 bytes; each
    // record is 10 bytes including the newline).
    MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
    Properties properties = cluster.getProperties();
    Properties props = new Properties();
    for (Entry<Object, Object> entry : properties.entrySet()) {
        props.put(entry.getKey(), entry.getValue());
    }
    props.setProperty(MRConfiguration.MAX_SPLIT_SIZE, "20");
    Util.resetStateForExecModeSwitch();
    PigServer pigServer = new PigServer(cluster.getExecType(), props);
    String[] inputs = {
            "abcdefgh1", "abcdefgh2", "abcdefgh3",
            "abcdefgh4", "abcdefgh5", "abcdefgh6",
            "abcdefgh7", "abcdefgh8", "abcdefgh9"
    };
    String[] expected = {
            "(abcdefgh1)", "(abcdefgh2)", "(abcdefgh3)",
            "(abcdefgh4)", "(abcdefgh5)", "(abcdefgh6)",
            "(abcdefgh7)", "(abcdefgh8)", "(abcdefgh9)"
    };
    System.setProperty("pig.overrideBlockSize", "20");
    String INPUT_FILE = "tmp.txt";
    try {
        // try-with-resources: the writer is closed even if a write fails,
        // so the file handle never leaks
        try (PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE))) {
            for (String s : inputs) {
                w.println(s);
            }
        }
        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
        pigServer.registerQuery("a = load '" + INPUT_FILE + "';");
        Iterator<Tuple> iter = pigServer.openIterator("a");
        int counter = 0;
        while (iter.hasNext()) {
            // expected[] entries are already Strings; no toString() needed
            assertEquals(expected[counter++], iter.next().toString());
        }
        assertEquals(expected.length, counter);
    } catch (Exception e) {
        e.printStackTrace();
        // carry the cause into the failure message instead of a bare fail()
        Assert.fail("query failed: " + e.getMessage());
    } finally {
        new File(INPUT_FILE).delete();
        try {
            Util.deleteFile(cluster, INPUT_FILE);
        } catch (IOException e) {
            e.printStackTrace();
            Assert.fail("cleanup failed: " + e.getMessage());
        }
        cluster.shutDown();
    }
}
/**
 * Verifies PigStorage in the following scenario: the column-prune
 * optimization decides only columns 2 and 3 are needed, but some records
 * in the data carry a single column (malformed data). PigStorage then
 * returns an empty tuple for those records, and {@link POProject} catches
 * the resulting {@link IndexOutOfBoundsException} when a field is accessed
 * and substitutes a null.
 */
@Test
public void testPruneColumnsWithMissingFields() throws IOException {
    String inputFile = "TestPigStorage-testPruneColumnsWithMissingFields-input.txt";
    Util.createLocalInputFile(
            inputFile,
            new String[] {"1\t2\t3", "4", "5\t6\t7"});
    String script = "a = load '" + inputFile + "' as (i:int, j:int, k:int);"
            + "b = foreach a generate j, k;";
    Util.registerMultiLineQuery(pig, script);
    Iterator<Tuple> results = pig.openIterator("b");
    // complete record: both projected columns present
    assertEquals(Util.createTuple(new Integer[] {2, 3}), results.next());
    // truncated record: pruned columns come back as nulls
    assertEquals(Util.createTuple(new Integer[] {null, null}), results.next());
    assertEquals(Util.createTuple(new Integer[] {6, 7}), results.next());
    assertFalse(results.hasNext());
}
@Test
public void testPigStorageNoSchema() throws Exception {
    // When the '-schema' option is given but no schema file exists,
    // the load must fail with error code 1000.
    pigContext.connect();
    String query = "a = LOAD '" + datadir + "originput' using PigStorage('\\t', '-schema') "
            + "as (f1:chararray, f2:int);";
    try {
        pig.registerQuery(query);
        pig.dumpSchema("a");
        fail("no exception caught");
    } catch (FrontendException ex) {
        assertEquals(ex.toString(), 1000, ex.getErrorCode());
    }
}
@Test
public void testPigStorageSchema() throws Exception {
    pigContext.connect();
    pig.registerQuery("a = LOAD '" + datadir + "originput' using PigStorage('\\t') "
            + "as (f1:chararray, f2:int);");
    Schema origSchema = pig.dumpSchema("a");
    pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
    // 'aout' now carries a stored schema. Loading it back without an
    // explicit schema must reproduce the original one.
    pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t');");
    Schema genSchema = pig.dumpSchema("b");
    Assert.assertTrue("generated schema equals original",
            Schema.equals(genSchema, origSchema, true, false));
    // An explicit 'as' clause must win over the stored metadata.
    String[] aliases = {"foo", "bar"};
    byte[] types = {DataType.INTEGER, DataType.LONG};
    Schema newSchema = TypeCheckingTestUtil.genFlatSchema(aliases, types);
    pig.registerQuery("c = LOAD '" + datadir + "aout' using PigStorage('\\t', '-schema') "
            + "as (foo:int, bar:long);");
    Schema newGenSchema = pig.dumpSchema("c");
    Assert.assertTrue("explicit schema overrides metadata",
            Schema.equals(newSchema, newGenSchema, true, false));
    // '-noschema' must ignore the stored metadata entirely.
    pig.registerQuery("d = LOAD '" + datadir + "aout' using PigStorage('\\t', '-noschema');");
    genSchema = pig.dumpSchema("d");
    assertNull(genSchema);
}
@Test
public void testPruneColumnsWithSchema() throws Exception {
    pigContext.connect();
    pig.registerQuery("a = LOAD '" + datadir + "originput' using PigStorage(',') "
            + "as (f1:chararray, f2:int);");
    pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
    // 'aout' now has a stored schema; after pruning down to f2, the
    // loaded values must still carry the declared int type.
    pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t'); c = FOREACH b GENERATE f2;");
    Iterator<Tuple> results = pig.openIterator("c");
    Assert.assertTrue("results were produced", results.hasNext());
    Tuple first = results.next();
    Assert.assertTrue("data is correct type", first.get(0) instanceof Integer);
}
@Test
public void testSchemaConversion() throws Exception {
    Util.createLocalInputFile(datadir + "originput2",
            new String[] {"1", "2", "3", "2",
                          "5", "5", "8", "8",
                          "8", "9"});
    pig.registerQuery("A = LOAD '" + datadir + "originput2' using PigStorage('\\t') " +
            "as (f:int);");
    pig.registerQuery("B = group A by f;");
    Schema origSchema = pig.dumpSchema("B");
    ResourceSchema rs1 = new ResourceSchema(origSchema);
    pig.registerQuery("STORE B into '" + datadir + "bout' using PigStorage('\\t', '-schema');");
    pig.registerQuery("C = LOAD '" + datadir + "bout' using PigStorage('\\t', '-schema');");
    Schema genSchema = pig.dumpSchema("C");
    ResourceSchema rs2 = new ResourceSchema(genSchema);
    Assert.assertTrue("generated schema equals original", ResourceSchema.equals(rs1, rs2));
    // Re-load with an explicit bag schema and aggregate, to make sure the
    // stored data round-trips.
    pig.registerQuery("C1 = LOAD '" + datadir + "bout' as (a0:int, A: {t: (f:int) } );");
    pig.registerQuery("D = foreach C1 generate a0, SUM(A);");
    List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
            new String[] {
                    "(1,1L)",
                    "(2,4L)",
                    "(3,3L)",
                    "(5,10L)",
                    "(8,24L)",
                    "(9,9L)"
            });
    // use the shared helper instead of duplicating its comparison loop
    assertAliasIs("D", expectedResults);
}
@Test
public void testSchemaConversion2() throws Exception {
    pig.registerQuery("A = LOAD '" + datadir + "originput' using PigStorage(',') " +
            "as (f1:chararray, f2:int);");
    pig.registerQuery("B = group A by f1;");
    Schema origSchema = pig.dumpSchema("B");
    ResourceSchema rs1 = new ResourceSchema(origSchema);
    pig.registerQuery("STORE B into '" + datadir + "cout' using PigStorage('\\t', '-schema');");
    pig.registerQuery("C = LOAD '" + datadir + "cout' using PigStorage('\\t', '-schema');");
    Schema genSchema = pig.dumpSchema("C");
    ResourceSchema rs2 = new ResourceSchema(genSchema);
    Assert.assertTrue("generated schema equals original", ResourceSchema.equals(rs1, rs2));
    // Re-load with an explicit bag schema and aggregate, to verify the
    // stored data round-trips.
    pig.registerQuery("C1 = LOAD '" + datadir + "cout' as (a0:chararray, A: {t: (f1:chararray, f2:int) } );");
    pig.registerQuery("D = foreach C1 generate a0, SUM(A.f2);");
    List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
            new String[] {
                    "('A',23L)",
                    "('B',7L)",
                    "('C',11L)",
                    "('D',10L)"
            });
    // use the shared helper instead of duplicating its comparison loop
    assertAliasIs("D", expectedResults);
}
@Test
public void testSchemaDataNotMatchWITHCast() throws Exception {
    // Two columns in the data but only one declared, with a chararray
    // cast: the extra column is dropped.
    pig.registerQuery("A = LOAD '" + datadir + "originput' using PigStorage(',') as (x:chararray);");
    List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
            new String[] {
                "('A')", "('B')", "('C')", "('D')", "('A')",
                "('B')", "('C')", "('A')", "('D')", "('A')",
            });
    assertAliasIs("A", expectedResults);
}
@Test
public void testSchemaDataNotMatchNOCast() throws Exception {
    // Same as the WITHCast variant, but the declared type is bytearray so
    // no cast is inserted; the extra column is still dropped.
    pig.registerQuery("A = LOAD '" + datadir + "originput' using PigStorage(',') as (x:bytearray);");
    List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
            new String[] {
                "('A')", "('B')", "('C')", "('D')", "('A')",
                "('B')", "('C')", "('A')", "('D')", "('A')",
            });
    assertAliasIs("A", expectedResults);
}
// NOTE(review): "Coumns" typo in the method name is kept so any external
// test-run configuration that references it keeps resolving.
@Test
public void testSchemaDataNotMatchAsEXTRACoumns() throws Exception {
    // Three columns declared but only two present in the data: the
    // missing third column comes back as null.
    pig.registerQuery("A = LOAD '" + datadir + "originput' using PigStorage(',') as (x,y,z);");
    List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
            new String[] {
                "('A',1,NULL)", "('B',2,NULL)", "('C',3,NULL)",
                "('D',2,NULL)", "('A',5,NULL)", "('B',5,NULL)",
                "('C',8,NULL)", "('A',8,NULL)", "('D',8,NULL)",
                "('A',9,NULL)",
            });
    assertAliasIs("A", expectedResults);
}
/**
 * See PIG-1830: grouping data loaded with a stored schema whose "val"
 * column (declared int) contains an empty field must not fail on the
 * bytearray conversion.
 * @throws IOException
 */
@Test
public void testByteArrayConversion() throws IOException {
    Util.createLocalInputFile(datadir + "originput2",
            new String[] {"peter\t1", "samir\t2", "michael\t4",
                    "peter\t2", "peter\t4", "samir\t1", "john\t"
    });
    // stored schema: name (type 55), val (type 10)
    Util.createLocalInputFile(datadir + ".pig_schema",
            new String[] {
            "{\"fields\":[{\"name\":\"name\",\"type\":55,\"schema\":null," +
            "\"description\":\"autogenerated from Pig Field Schema\"}," +
            "{\"name\":\"val\",\"type\":10,\"schema\":null,\"description\":"+
            "\"autogenerated from Pig Field Schema\"}],\"version\":0," +
            "\"sortKeys\":[],\"sortKeyOrders\":[]}"
    });
    pig.registerQuery("Events = LOAD '" + datadir + "originput2' USING PigStorage('\\t', '-schema');");
    pig.registerQuery("Sessions = GROUP Events BY name;");
    Iterator<Tuple> sessions = pig.openIterator("Sessions");
    // This loop previously only printed the groups and asserted nothing;
    // now verify the grouping yields one group per distinct name
    // (peter, samir, michael, john).
    int groups = 0;
    while (sessions.hasNext()) {
        Assert.assertNotNull(sessions.next());
        groups++;
    }
    Assert.assertEquals("one group per distinct name", 4, groups);
}
// See PIG-1993: column pruning on data loaded with a stored .pig_schema.
@Test
public void testColumnPrune() throws IOException {
    Util.createLocalInputFile(datadir + "originput2",
            new String[] {"peter\t1", "samir\t2", "michael\t4",
                    "peter\t2", "peter\t4", "samir\t1", "john\t"
    });
    // stored schema: name (type 55), val (type 10)
    Util.createLocalInputFile(datadir + ".pig_schema",
            new String[] {
            "{\"fields\":[{\"name\":\"name\",\"type\":55,\"schema\":null," +
            "\"description\":\"autogenerated from Pig Field Schema\"}," +
            "{\"name\":\"val\",\"type\":10,\"schema\":null,\"description\":"+
            "\"autogenerated from Pig Field Schema\"}],\"version\":0," +
            "\"sortKeys\":[],\"sortKeyOrders\":[]}"
    });
    pig.registerQuery("Events = LOAD '" + datadir + "originput2' USING PigStorage('\\t', '-schema');");
    pig.registerQuery("EventsName = foreach Events generate name;");
    Iterator<Tuple> sessions = pig.openIterator("EventsName");
    // The original code discarded the boolean results of String.equals(),
    // so none of these checks ever ran (and they compared the projected
    // 'name' column against values from the 'val' column). Assert the
    // projected names, in input order.
    String[] expected = {"(peter)", "(samir)", "(michael)",
            "(peter)", "(peter)", "(samir)", "(john)"};
    for (String name : expected) {
        Assert.assertEquals(name, sessions.next().toString());
    }
    Assert.assertFalse(sessions.hasNext());
}
@Test
public void testPigStorageSchemaHeader() throws Exception {
    pigContext.connect();
    pig.registerQuery("a = LOAD '" + datadir + "originput' using PigStorage(',') "
            + "as (foo:chararray, bar:int);");
    // The generated constant column gets a null schema name, which the
    // header writer renders as the positional alias $2.
    pig.registerQuery("a2 = FOREACH a GENERATE *, 1;");
    pig.registerQuery("STORE a2 into '" + datadir + "nout' using PigStorage('\\t', '-schema');");
    String headerPath = FileLocalizer.fullPath(datadir + "nout/.pig_header", pig.getPigContext());
    Assert.assertTrue(FileLocalizer.fileExists(headerPath, pig.getPigContext()));
    String[] header = Util.readOutput(pig.getPigContext(), headerPath);
    Assert.assertArrayEquals("Headers are not the same.", new String[] {"foo\tbar\t$2"}, header);
}
@Test
public void testPigStorageSchemaHeaderDelimiter() throws Exception {
    pigContext.connect();
    pig.registerQuery("a = LOAD '" + datadir + "originput' using PigStorage(',') "
            + "as (foo:chararray, bar:int);");
    // Store twice with different field delimiters; each .pig_header must
    // use the delimiter of its own store.
    pig.registerQuery("STORE a into '" + datadir + "dout' using PigStorage('#', '-schema');");
    pig.registerQuery("STORE a into '" + datadir + "eout' using PigStorage('\\t', '-schema');");
    String headerPath = FileLocalizer.fullPath(datadir + "dout/.pig_header", pig.getPigContext());
    Assert.assertTrue(FileLocalizer.fileExists(headerPath, pig.getPigContext()));
    String[] header = Util.readOutput(pig.getPigContext(), headerPath);
    Assert.assertArrayEquals("Headers are not the same.", new String[] {"foo#bar"}, header);
    headerPath = FileLocalizer.fullPath(datadir + "eout/.pig_header", pig.getPigContext());
    Assert.assertTrue(FileLocalizer.fileExists(headerPath, pig.getPigContext()));
    header = Util.readOutput(pig.getPigContext(), headerPath);
    Assert.assertArrayEquals("Headers are not the same.", new String[] {"foo\tbar"}, header);
}
// Creates an empty local input file at the given path.
private void putInputFile(String filename) throws IOException {
    Util.createLocalInputFile(filename, new String[0]);
}
// Serializes the given schema as JSON into a schema file at the given path.
private void putSchemaFile(String schemaFilename, ResourceSchema testSchema) throws JsonGenerationException, JsonMappingException, IOException {
    ObjectMapper mapper = new ObjectMapper();
    mapper.writeValue(new File(schemaFilename), testSchema);
}
@SuppressWarnings("deprecation")
@Test
public void testPigStorageSchemaSearch() throws Exception {
    String globtestdir = "build/test/tmpglobbingdata/";
    ResourceSchema testSchema = new ResourceSchema(Utils.parseSchema("a0:chararray"));
    PigStorage pigStorage = new PigStorage();
    pigContext.connect();
    try {
        // NOTE(review): this deletes datadir, not globtestdir — possibly
        // meant to clear globtestdir; left as-is to preserve behavior.
        // Also note globtestdir is never cleaned up after the test.
        Util.deleteDirectory(new File(datadir));
        pig.mkdirs(globtestdir + "a");
        pig.mkdirs(globtestdir + "a/a0");
        putInputFile(globtestdir + "a/a0/input");
        pig.mkdirs(globtestdir + "a/b0");
        putInputFile(globtestdir + "a/b0/input");
        pig.mkdirs(globtestdir + "b");
    } catch (IOException ignored) {
        // best effort: directories may already exist from a previous run
    }
    Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
    // if schema file is not found, schema is null
    ResourceSchema schema = pigStorage.getSchema(globtestdir, new Job(conf));
    Assert.assertNull(schema);
    // if .pig_schema is in the input directory
    putSchemaFile(globtestdir + "a/a0/.pig_schema", testSchema);
    schema = pigStorage.getSchema(globtestdir + "a/a0", new Job(conf));
    Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
    new File(globtestdir + "a/a0/.pig_schema").delete();
    // .pig_schema in one of the directories returned by globStatus
    putSchemaFile(globtestdir + "a/.pig_schema", testSchema);
    schema = pigStorage.getSchema(globtestdir + "*", new Job(conf));
    Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
    new File(globtestdir + "a/.pig_schema").delete();
    putSchemaFile(globtestdir + "b/.pig_schema", testSchema);
    schema = pigStorage.getSchema(globtestdir + "*", new Job(conf));
    Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
    new File(globtestdir + "b/.pig_schema").delete();
    // if .pig_schema is deep in the globbing, it will not get used
    putSchemaFile(globtestdir + "a/a0/.pig_schema", testSchema);
    schema = pigStorage.getSchema(globtestdir + "*", new Job(conf));
    Assert.assertNull(schema);
    putSchemaFile(globtestdir + "a/.pig_schema", testSchema);
    schema = pigStorage.getSchema(globtestdir + "*", new Job(conf));
    Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
    new File(globtestdir + "a/a0/.pig_schema").delete();
    new File(globtestdir + "a/.pig_schema").delete();
    // the '-schema' option with a {a,b} glob still finds the schema file
    pigStorage = new PigStorage("\t", "-schema");
    putSchemaFile(globtestdir + "a/.pig_schema", testSchema);
    schema = pigStorage.getSchema(globtestdir + "{a,b}", new Job(conf));
    Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
}
/**
 * Tests the source-tagging options of PigStorage. With '-tagFile' the
 * loader prepends the input file name (field "INPUT_FILE_NAME") to each
 * tuple and to the schema; with '-tagPath' it prepends the full input
 * path (field "INPUT_FILE_PATH").
 *
 * @throws Exception
 */
@Test
public void testPigStorageSourceTagSchema() throws Exception {
    pigContext.connect();
    pig.registerQuery("a = LOAD '" + datadir + "originput' using PigStorage('\\t') "
            + "as (f1:chararray, f2:int);");
    pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
    // 'aout' now carries a stored schema. Loading with '-tagFile' must
    // yield the original schema with 'INPUT_FILE_NAME' prepended.
    pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagFile');");
    Schema genSchema = pig.dumpSchema("b");
    String[] fileAliases = {"INPUT_FILE_NAME", "f1", "f2"};
    byte[] fileTypes = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
    Schema expectedSchema = TypeCheckingTestUtil.genFlatSchema(fileAliases, fileTypes);
    Assert.assertTrue("schema with -tagFile preprends INPUT_FILE_NAME",
            Schema.equals(expectedSchema, genSchema, true, false));
    // '-tagPath' prepends 'INPUT_FILE_PATH' instead.
    pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagPath');");
    genSchema = pig.dumpSchema("b");
    String[] pathAliases = {"INPUT_FILE_PATH", "f1", "f2"};
    byte[] pathTypes = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
    expectedSchema = TypeCheckingTestUtil.genFlatSchema(pathAliases, pathTypes);
    Assert.assertTrue("schema with -tagPath preprends INPUT_FILE_PATH",
            Schema.equals(expectedSchema, genSchema, true, false));
    // Explicitly requesting no schema still works.
    pig.registerQuery("d = LOAD '" + datadir + "aout' using PigStorage('\\t', '-noschema');");
    genSchema = pig.dumpSchema("d");
    assertNull(genSchema);
    // A user-supplied schema overrides the stored metadata.
    pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagFile') "
            + "as (input_file:chararray, foo:chararray, bar:int);");
    genSchema = pig.dumpSchema("b");
    String[] userAliases = {"input_file", "foo", "bar"};
    byte[] userTypes = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
    expectedSchema = TypeCheckingTestUtil.genFlatSchema(userAliases, userTypes);
    Assert.assertTrue("explicit schema overrides metadata",
            Schema.equals(expectedSchema, genSchema, true, false));
}
@Test
public void testPigStorageSourceTagValue() throws Exception {
    final String storeFileName = "part-m-00000";
    pigContext.connect();
    pig.registerQuery("a = LOAD '" + datadir + "' using PigStorage('\\t') "
            + "as (f1:chararray, f2:int);");
    // Storing in the 'aout' directory writes the contents to part-m-00000.
    pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
    // Verify the input source tag is present when using -tagFile.
    pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagFile');");
    pig.registerQuery("c = foreach b generate INPUT_FILE_NAME;");
    Iterator<Tuple> iter = pig.openIterator("c");
    int rows = 0;
    while (iter.hasNext()) {
        Tuple tuple = iter.next();
        String inputFileName = (String) tuple.get(0);
        // JUnit's assertEquals takes (message, expected, actual) — the
        // original call had expected and actual swapped
        assertEquals("tagFile value must be part-m-00000", storeFileName, inputFileName);
        rows++;
    }
    // the per-row check is vacuous if nothing came back
    Assert.assertTrue("expected at least one row", rows > 0);
}
@Test
public void testIncompleteDataWithPigSchema() throws Exception {
// Verifies that rows shorter than the stored .pig_schema are padded with
// nulls, both with and without column pruning.
Data data = resetData(pig);
// schema for fields x (type 10 -> expected as int below), y and z
// (type 55 -> chararray)
String schema = "{\"fields\":[{\"name\":\"x\",\"type\":10,\"schema\":null},"
+ "{\"name\":\"y\",\"type\":55,\"schema\":null},"
+ "{\"name\":\"z\",\"type\":55,\"schema\":null}],"
+ "\"version\":0,\"sortKeys\":[],\"sortKeyOrders\":[]}";
File parent = new File(datadir, "incomplete_data_with_pig_schema_2");
parent.deleteOnExit();
File inputDir = new File(parent, "input");
inputDir.mkdirs();
File inputSchemaFile = new File(inputDir, ".pig_schema");
Util.writeToFile(inputSchemaFile, new String[] {schema});
// single data row carrying only the first of the three declared columns
File inputFile = new File(inputDir, "data");
Util.writeToFile(inputFile, new String[]{"1"});
pig.registerQuery("a = load '"+Util.encodeEscape(inputDir.getAbsolutePath())+"';");
pig.registerQuery("store a into 'actual' using mock.Storage();");
// the missing columns y and z must come back as nulls
data.set("expected", tuple(1, null, null));
Assert.assertEquals(data.get("expected"), data.get("actual"));
// Now, test with prune
data = resetData(pig);
data.set("expected", tuple(1, null));
pig.registerQuery("a = load '"+Util.encodeEscape(inputDir.getAbsolutePath())+"'; b = foreach a generate x, z;");
pig.registerQuery("store b into 'actual' using mock.Storage();");
Assert.assertEquals(data.get("expected"), data.get("actual"));
// TODO: TypeCaster should be adding a cast for this case but it always uses the file schema
// data = resetData(pig);
// data.set("expected", tuple(new DataByteArray("1"), null));
// pig.registerQuery("a = load '"+Util.encodeEscape(inputDir.getAbsolutePath())+"' as (x: bytearray, y:bytearray, z:bytearray);");
// pig.registerQuery("b = foreach a generate x, z;");
// pig.registerQuery("store b into 'actual' using mock.Storage();");
// Assert.assertEquals(data.get("expected"), data.get("actual"));
// Rewrite the schema file with type 50 for all three fields (expected as
// bytearray below) and repeat the pruned check with an explicit
// bytearray 'as' clause.
schema = "{\"fields\":[{\"name\":\"x\",\"type\":50,\"schema\":null},"
+ "{\"name\":\"y\",\"type\":50,\"schema\":null},"
+ "{\"name\":\"z\",\"type\":50,\"schema\":null}],"
+ "\"version\":0,\"sortKeys\":[],\"sortKeyOrders\":[]}";
Util.writeToFile(inputSchemaFile, new String[] {schema});
data = resetData(pig);
data.set("expected", tuple(new DataByteArray("1"), null));
pig.registerQuery("a = load '"+Util.encodeEscape(inputDir.getAbsolutePath())+"' as (x: bytearray, y:bytearray, z:bytearray);");
pig.registerQuery("b = foreach a generate x, z;");
pig.registerQuery("store b into 'actual' using mock.Storage();");
Assert.assertEquals(data.get("expected"), data.get("actual"));
}
@Test
public void testIncompleteDataNoPigSchema() throws Exception {
    // Without a .pig_schema file, a declared-but-missing trailing column
    // is filled with null.
    File inputFile = new File(datadir, "incomplete_data_no_pigschema");
    inputFile.deleteOnExit();
    Util.writeToFile(inputFile, new String[]{"1\t2", "2\t3"});
    Data mockData = resetData(pig);
    String script = "A = LOAD '"+ Util.encodeEscape(inputFile.getAbsolutePath()) + "' as (x, y, z);"
            + "store A into 'actual' using mock.Storage();";
    Util.registerMultiLineQuery(pig, script);
    mockData.set("expected",
            tuple(new DataByteArray("1"), new DataByteArray("2"), null),
            tuple(new DataByteArray("2"), new DataByteArray("3"), null));
    Assert.assertEquals(mockData.get("expected"), mockData.get("actual"));
    // A loader that appends its own extra column fills that slot instead.
    mockData = resetData(pig);
    script = "A = LOAD '"+ Util.encodeEscape(inputFile.getAbsolutePath())
            + "' using " + PigExtendedStorage.class.getName() + " as (x, y, z);"
            + "store A into 'actual' using mock.Storage();";
    pig.registerQuery(script);
    mockData.set("expected",
            tuple(new DataByteArray("1"), new DataByteArray("2"), new DataByteArray("extracolumn")),
            tuple(new DataByteArray("2"), new DataByteArray("3"), new DataByteArray("extracolumn")));
    Assert.assertEquals(mockData.get("expected"), mockData.get("actual"));
}
/**
 * A PigStorage variant that appends an extra bytearray column
 * ("extracolumn") to every loaded tuple. Used by
 * {@code testIncompleteDataNoPigSchema}.
 */
public static class PigExtendedStorage extends PigStorage {
    @Override
    public Tuple getNext() throws IOException {
        Tuple tuple = super.getNext();
        // super.getNext() returns null when input is exhausted; appending
        // unconditionally would throw a NullPointerException there
        if (tuple != null) {
            tuple.append(new DataByteArray("extracolumn"));
        }
        return tuple;
    }
}
@Test
public void testPigStorageSchemaWithOverwrite() throws Exception {
    pigContext.connect();
    String query = "a = LOAD '" + datadir
            + "originput' using PigStorage(',') "
            + "as (f1:chararray, f2:int);";
    List<Tuple> expectedResults = Util
            .getTuplesFromConstantTupleStrings(new String[] { "('A',1L)",
                    "('B',2L)", "('C',3L)", "('D',2L)", "('A',5L)",
                    "('B',5L)", "('C',8L)", "('A',8L)", "('D',8L)",
                    "('A',9L)", });
    pig.registerQuery(query);
    pig.store("a", datadir + "aout", "PigStorage(',')");
    // storing again with '--overwrite true' must not fail, and must leave
    // the same contents behind
    pig.store("a", datadir + "aout", "PigStorage(',', '--overwrite true')");
    pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage(',');");
    // use the shared helper instead of duplicating its comparison loop
    assertAliasIs("b", expectedResults);
}
// Storing twice into the same location without '--overwrite' must fail.
@Test(expected = Exception.class)
public void testPigStorageSchemaFailureWithoutOverwrite() throws Exception {
    pigContext.connect();
    pig.registerQuery("a = LOAD '" + datadir + "originput' using PigStorage(',') "
            + "as (f1:chararray, f2:int);");
    pig.store("a", datadir + "aout", "PigStorage(',')");
    // the second store targets an existing directory and must throw
    pig.store("a", datadir + "aout", "PigStorage(',')");
}
// NOTE(review): "Stroage" typo in the method name is kept so external
// test-run configurations keep resolving it.
@Test
public void testPigStroageSchemaWithMultipleSchema() throws Exception {
    pigContext.connect();
    pig.registerQuery("A = LOAD '" + datadir + "originput' using PigStorage(',') as (f1:chararray, f2:int);"
            + "B = FOREACH A generate f1, f2, 3 as (f3:int);");
    pig.store("A", datadir + "aout", "PigStorage('\\t', '-schema')");
    pig.store("B", datadir + "bout", "PigStorage('\\t', '-schema')");
    // We want to test the case where aout/.pig_schema is chosen when
    // loading aout AND bout. Which schema file gets picked is not
    // deterministic (it comes from a SET), so simply delete the other one.
    new File(datadir + "bout/.pig_schema" ).delete();
    // Load from both directories, containing 2-field and 3-field records
    // respectively.
    pig.registerQuery("C = LOAD '" + datadir + "aout," + datadir + "bout ' using PigStorage('\\t', '-schema');");
    Schema a_schema = pig.dumpSchema("A");
    Schema c_schema = pig.dumpSchema("C");
    Assert.assertEquals("PigStorage schema should pick up the .pig_schema from A", a_schema, c_schema);
    Iterator<Tuple> rows = pig.openIterator("C");
    int count = 0;
    while (rows.hasNext()) {
        Assert.assertEquals("All tuples should only contain 2 fields defined in schema",
                2, rows.next().size());
        count++;
    }
    Assert.assertEquals(20, count);
}
}