/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.data;

import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.InterRecordReader;
import org.apache.pig.impl.io.InterRecordWriter;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.util.Utils;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Maps;
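
/**
* Tests for SchemaTuple: frontend schema registration and code generation,
* backend resolution of the generated classes, serialization round-trips,
* and joins that exercise the generated Tuples end to end.
*/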
public class TestSchemaTuple {
private Properties props;
private Configuration conf;
private PigContext pigContext;
private static final BinInterSedes bis = new BinInterSedes();
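
// Each test runs against a fresh local PigContext with SchemaTuple generation enabled.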
@Before
public void perTestInitialize() {
props = new Properties();
props.setProperty(PigConfiguration.PIG_SCHEMA_TUPLE_ENABLED, "true");
conf = ConfigurationUtil.toConfiguration(props);
pigContext = new PigContext(ExecType.LOCAL, props);
}
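
// Walks the full lifecycle: register schemas on the frontend, "ship" the
// generated classes, then resolve them on the backend and exercise each factory.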
@Test
public void testCompileAndResolve() throws Exception {
// frontend: register a variety of schemas for SchemaTuple code generation
Schema udfSchema = Utils.getSchemaFromString("a:int");
boolean isAppendable = false;
GenContext context = GenContext.UDF;
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
udfSchema = Utils.getSchemaFromString("a:long");
isAppendable = true;
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
udfSchema = Utils.getSchemaFromString("a:chararray,(a:chararray)");
isAppendable = false;
context = GenContext.FOREACH;
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
udfSchema = Utils.getSchemaFromString("a:int,(a:int,(a:int,(a:int,(a:int,(a:int,(a:int))))))");
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
udfSchema = Utils.getSchemaFromString("((a:int,b:int),(a:int,b:int),(a:int,b:int)),((a:int,b:int),(a:int,b:int),(a:int,b:int))");
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
udfSchema = Utils.getSchemaFromString(
"a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double, dt:datetime, bd: bigdecimal, bi: biginteger,"
+"(a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double, dt:datetime, bd: bigdecimal, bi: biginteger,"
+"(a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double, dt:datetime, bd: bigdecimal, bi: biginteger))");
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
udfSchema = Utils.getSchemaFromString("boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean");
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
udfSchema = Utils.getSchemaFromString(
"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger))");
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
isAppendable = true;
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
isAppendable = false;
udfSchema = Utils.getSchemaFromString("int, b:bag{(int,int,int)}");
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
isAppendable = false;
udfSchema = Utils.getSchemaFromString("int, m:map[(int,int,int)]");
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
isAppendable = false;
udfSchema = new Schema();
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
isAppendable = false;
udfSchema = new Schema(new FieldSchema(null, DataType.BAG));
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
// this compiles the registered schemas and "ships" the generated classes via the distributed cache
SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
// backend: initialize and resolve the generated SchemaTuple classes
SchemaTupleBackend.initialize(conf, pigContext);
udfSchema = Utils.getSchemaFromString("a:int");
isAppendable = false;
context = GenContext.UDF;
SchemaTupleFactory tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
context = GenContext.MERGE_JOIN;
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
assertNull(tf);
udfSchema = Utils.getSchemaFromString("a:long");
context = GenContext.UDF;
isAppendable = true;
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
isAppendable = false;
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
assertNull(tf);
udfSchema = Utils.getSchemaFromString("a:chararray,(a:chararray)");
isAppendable = false;
context = GenContext.FOREACH;
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
udfSchema = Utils.getSchemaFromString("(a:chararray)");
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
assertNull(tf);
udfSchema = Utils.getSchemaFromString("a:int,(a:int,(a:int,(a:int,(a:int,(a:int,(a:int))))))");
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
udfSchema = Utils.getSchemaFromString("((a:int,b:int),(a:int,b:int),(a:int,b:int)),((a:int,b:int),(a:int,b:int),(a:int,b:int))");
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
udfSchema = Utils.getSchemaFromString(
"a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double, dt: datetime, bd: bigdecimal, bi: biginteger,"
+"(a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double, dt: datetime, bd: bigdecimal, bi: biginteger,"
+"(a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double, dt: datetime, bd: bigdecimal, bi: biginteger))");
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
udfSchema = Utils.getSchemaFromString("boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+ "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean");
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
udfSchema = Utils.getSchemaFromString(
"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger)),"
+"int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger,"
+"(int, long, chararray, boolean, bytearray, float, double, datetime, bigdecimal, biginteger))");
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
isAppendable = true;
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
isAppendable = false;
udfSchema = Utils.getSchemaFromString("int, b:bag{(int,int,int)}");
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
isAppendable = false;
udfSchema = Utils.getSchemaFromString("int, m:map[(int,int,int)]");
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
isAppendable = false;
udfSchema = new Schema();
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
assertNull(tf);
isAppendable = false;
udfSchema = new Schema(new FieldSchema(null, DataType.BAG));
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
}
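
/**
* Common battery of checks run against a resolved factory: appendability,
* schema agreement, null defaults, copying, and both SerDe paths.
*/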
private void putThroughPaces(SchemaTupleFactory tf, Schema udfSchema, boolean isAppendable) throws Exception {
assertNotNull(tf);
if (isAppendable) {
assertTrue(tf.newTuple() instanceof AppendableSchemaTuple);
}
testNotAppendable(tf, udfSchema);
if (isAppendable) {
testAppendable(tf, udfSchema);
}
}
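
// An appendable tuple should grow beyond its generated schema when appended to.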
private void testAppendable(SchemaTupleFactory tf, Schema udfSchema) {
SchemaTuple<?> st = tf.newTuple();
st.append("woah");
assertEquals(udfSchema.size() + 1, st.size());
}
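
// Checks that apply to every generated tuple, appendable or not.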
private void testNotAppendable(SchemaTupleFactory tf, Schema udfSchema) throws Exception {
SchemaTuple<?> st = tf.newTuple();
// assert (not merely compute) that the generated tuple reports the registered schema, relaxing aliases
assertTrue(Schema.equals(udfSchema, st.getSchema(), false, true));
assertEquals(udfSchema.size(), st.size());
testHashCodeEqualAsNormalTuple(tf);
shouldAllBeNull(tf);
copyThenCompare(tf);
testSerDe(tf);
testInterStorageSerDe(tf);
}
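
// Copying via set(Tuple) must preserve equals/compareTo semantics, including nulls.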
private void copyThenCompare(SchemaTupleFactory tf) throws ExecException {
SchemaTuple<?> st = tf.newTuple();
SchemaTuple<?> st2 = tf.newTuple();
fillWithData(st);
st2.set(st);
assertTrue(st.equals(st2));
assertEquals(0, st.compareTo(st2));
st.set(0, null);
assertFalse(st.equals(st2));
assertEquals(-1, st.compareTo(st2));
assertTrue(st.isNull(0));
st2.set(0, null);
assertTrue(st.equals(st2));
assertEquals(0, st.compareTo(st2));
}

/**
* This ensures that a fresh Tuple out of a SchemaTupleFactory
* will be full of null fields.
* @param tf a SchemaTupleFactory
* @throws ExecException
*/
private void shouldAllBeNull(SchemaTupleFactory tf) throws ExecException {
Tuple t = tf.newTuple();
for (Object o : t) {
assertNull(o);
}
for (int i = 0; i < t.size(); i++) {
assertNull(t.get(i));
assertTrue(t.isNull(i));
}
}
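
// Populates every field with random data of the declared type, recursing into nested tuples.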
private void fillWithData(SchemaTuple<?> st) throws ExecException {
Schema udfSchema = st.getSchema();
int pos = 0;
for (FieldSchema fs : udfSchema.getFields()) {
Object val;
if (fs.type == DataType.TUPLE) {
val = SchemaTupleFactory
.getInstance(fs.schema, false, GenContext.FORCE_LOAD)
.newTuple();
fillWithData((SchemaTuple<?>)val);
} else {
val = randData(fs);
}
st.set(pos++, val);
}
}
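
// Fixed seed so the generated data (and any failure) is reproducible across runs.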
private Random r = new Random(100L);
private Object randData(FieldSchema fs) throws ExecException {
switch (fs.type) {
case DataType.BIGDECIMAL: return new BigDecimal(r.nextDouble());
case DataType.BIGINTEGER: return new BigInteger(130, r);
case DataType.BOOLEAN: return r.nextBoolean();
case DataType.BYTEARRAY: return new DataByteArray(new BigInteger(130, r).toByteArray());
case DataType.CHARARRAY: return new BigInteger(130, r).toString(32);
case DataType.INTEGER: return r.nextInt();
case DataType.LONG: return r.nextLong();
case DataType.FLOAT: return r.nextFloat();
case DataType.DOUBLE: return r.nextDouble();
case DataType.DATETIME: return new DateTime(r.nextLong());
case DataType.BAG:
DataBag db = BagFactory.getInstance().newDefaultBag();
int sz = r.nextInt(100);
for (int i = 0; i < sz; i++) {
int tSz = r.nextInt(10);
Tuple t = TupleFactory.getInstance().newTuple(tSz);
for (int j = 0; j < tSz; j++) {
t.set(j, r.nextInt());
}
db.add(t);
}
return db;
case DataType.MAP:
Map<String, Object> map = Maps.newHashMap();
int keys = r.nextInt(10);
for (int i = 0; i < keys; i++) {
String key = (String) randData(new FieldSchema(null, DataType.CHARARRAY));
int values = r.nextInt(100);
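// Note: every put below reuses the same key, so only the last tuple is kept per key.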
for (int j = 0; j < values; j++) {
int tSz = r.nextInt(10);
Tuple t = TupleFactory.getInstance().newTuple(tSz);
for (int k = 0; k < tSz; k++) {
t.set(k, r.nextInt());
}
map.put(key, t);
}
}
return map;
default: throw new RuntimeException("Cannot generate data for given FieldSchema: " + fs);
}
}
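
// Helper (not a JUnit test: it takes a parameter). Typed getters on a fresh,
// all-null tuple must throw rather than return defaults.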
public void testTypeAwareGetSetting(TupleFactory tf) throws ExecException {
SchemaTuple<?> st = (SchemaTuple<?>)tf.newTuple();
checkNullGetThrowsError(st);
}
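
// For each field, the type-specific getter must throw FieldIsNullException while unset.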
private void checkNullGetThrowsError(SchemaTuple<?> st) throws ExecException {
Schema schema = st.getSchema();
int i = 0;
for (Schema.FieldSchema fs : schema.getFields()) {
boolean fieldIsNull = false;
try {
switch (fs.type) {
case DataType.BIGDECIMAL: st.getBigDecimal(i); break;
case DataType.BIGINTEGER: st.getBigInteger(i); break;
case DataType.BOOLEAN: st.getBoolean(i); break;
case DataType.BYTEARRAY: st.getBytes(i); break;
case DataType.CHARARRAY: st.getString(i); break;
case DataType.INTEGER: st.getInt(i); break;
case DataType.LONG: st.getLong(i); break;
case DataType.FLOAT: st.getFloat(i); break;
case DataType.DOUBLE: st.getDouble(i); break;
case DataType.DATETIME: st.getDateTime(i); break;
case DataType.TUPLE: st.getTuple(i); break;
case DataType.BAG: st.getDataBag(i); break;
case DataType.MAP: st.getMap(i); break;
default: throw new RuntimeException("Unsupported FieldSchema in SchemaTuple: " + fs);
}
} catch (FieldIsNullException e) {
fieldIsNull = true;
}
assertTrue(fieldIsNull);
i++;
}
}

/**
* This tests that InterStorage will work as expected with the Tuples generated by
* the given SchemaTupleFactory.
* @param tf the SchemaTupleFactory whose Tuples will be round-tripped through InterStorage
* @throws Exception
*/
public void testInterStorageSerDe(SchemaTupleFactory tf) throws Exception {
int sz = 4096;
List<Tuple> written = new ArrayList<Tuple>(sz);
File temp = File.createTempFile("tmp", "tmp");
temp.deleteOnExit();
FileOutputStream fos = new FileOutputStream(temp);
FSDataOutputStream dos = new FSDataOutputStream(fos, null);
InterRecordWriter writer = new InterRecordWriter(dos,
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT,
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT);
// This test does not cover the case of overlapping record bytes that may be present at the
// beginning of a split, for that see org.apache.pig.test.TestBinInterSedes#testInterStorageSyncMarker()
for (int i = 0; i < sz; i++) {
SchemaTuple<?> st = (SchemaTuple<?>)tf.newTuple();
fillWithData(st);
writer.write(null, st);
written.add(st);
}
writer.close(null);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
TaskAttemptID taskId = HadoopShims.createTaskAttemptID("jt", 1, true, 1, 1);
conf.set(MRConfiguration.TASK_ID, taskId.toString());
InputSplit is = new FileSplit(new Path(temp.getAbsolutePath()), 0, temp.length(), null);
InterRecordReader reader = new InterRecordReader(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT,
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT);
reader.initialize(is, HadoopShims.createTaskAttemptContext(conf, taskId));
for (int i = 0; i < sz; i++) {
assertTrue(reader.nextKeyValue());
SchemaTuple<?> st = (SchemaTuple<?>)reader.getCurrentValue();
assertEquals(written.get(i), st);
}
reader.close();
}
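
/**
* Round-trips tuples through BinInterSedes to a temp file and verifies equality.
* @param tf the SchemaTupleFactory under test
* @throws IOException
*/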
public void testSerDe(SchemaTupleFactory tf) throws IOException {
int sz = 4096;
List<Tuple> written = new ArrayList<Tuple>(sz);
File temp = File.createTempFile("tmp", "tmp");
temp.deleteOnExit();
FileOutputStream fos = new FileOutputStream(temp);
DataOutput dos = new DataOutputStream(fos);
for (int i = 0; i < sz; i++) {
SchemaTuple<?> st = (SchemaTuple<?>)tf.newTuple();
fillWithData(st);
bis.writeDatum(dos, st);
written.add(st);
}
fos.close();
assertEquals(sz, written.size());
FileInputStream fis = new FileInputStream(temp);
DataInput din = new DataInputStream(fis);
for (int i = 0; i < sz; i++) {
SchemaTuple<?> st = (SchemaTuple<?>)bis.readDatum(din);
assertEquals(written.get(i), st);
}
fis.close();
}
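
// End-to-end join queries with SchemaTuple enabled, using mock.Storage for I/O.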
@Test
public void testFRJoinWithSchemaTuple() throws Exception {
testJoinType("replicated", false);
}

@Test
public void testMergeJoinWithSchemaTuple() throws Exception {
testJoinType("merge", true);
}
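
// Runs the given join type over two identical 10-row inputs and verifies that
// both single-key and compound-key joins produce the expected (i, i, i, i) rows.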
public void testJoinType(String joinType, boolean preSort) throws Exception {
Properties props = PropertiesUtil.loadDefaultProperties();
props.setProperty(PigConfiguration.PIG_SCHEMA_TUPLE_ENABLED, "true");
PigServer pigServer = new PigServer(ExecType.LOCAL, props);
Data data = resetData(pigServer);
data.set("foo1",
tuple(0, 0),
tuple(1, 1),
tuple(2, 2),
tuple(3, 3),
tuple(4, 4),
tuple(5, 5),
tuple(6, 6),
tuple(7, 7),
tuple(8, 8),
tuple(9, 9)
);
data.set("foo2",
tuple(0, 0),
tuple(1, 1),
tuple(2, 2),
tuple(3, 3),
tuple(4, 4),
tuple(5, 5),
tuple(6, 6),
tuple(7, 7),
tuple(8, 8),
tuple(9, 9)
);
pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int, y:int);");
pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int, y:int);");
if (preSort) {
pigServer.registerQuery("A = ORDER A BY x ASC;");
pigServer.registerQuery("B = ORDER B BY x ASC;");
}
pigServer.registerQuery("C = JOIN A by x, B by x using '"+joinType+"';");
pigServer.registerQuery("D = ORDER C BY $0 ASC;");
Iterator<Tuple> out = pigServer.openIterator("D");
for (int i = 0; i < 10; i++) {
if (!out.hasNext()) {
throw new Exception("Output should have had more elements! Failed on element: " + i);
}
assertEquals(tuple(i, i, i, i), out.next());
}
assertFalse(out.hasNext());
pigServer.registerQuery("STORE D INTO 'bar1' USING mock.Storage();");
pigServer.registerQuery("E = JOIN A by (x, y), B by (x, y) using '"+joinType+"';");
pigServer.registerQuery("F = ORDER E BY $0 ASC;");
pigServer.registerQuery("STORE F INTO 'bar2' USING mock.Storage();");
List<Tuple> bar1 = data.get("bar1");
List<Tuple> bar2 = data.get("bar2");
assertEquals("Output does not have enough elements! List: " + bar1, 10, bar1.size());
assertEquals("Output does not have enough elements! List: " + bar2, 10, bar2.size());
for (int i = 0; i < 10; i++) {
assertEquals(tuple(i, i, i, i), bar1.get(i));
assertEquals(tuple(i, i, i, i), bar2.get(i));
}
}
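
// A SchemaTuple must be interchangeable with a plain Tuple holding the same
// fields: equal in both directions, with matching hashCode.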
public void testHashCodeEqualAsNormalTuple(SchemaTupleFactory tf) {
SchemaTuple<?> st = tf.newTuple();
Tuple t = TupleFactory.getInstance().newTuple();
for (Object o : st) {
t.append(o);
}
assertEquals(t.hashCode(), st.hashCode());
assertEquals(st, t);
assertEquals(t, st);
}
}