blob: d7d5412d640f1ef772eca57cc5ee33e6c7c348b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.DateTimeWritable;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.data.BinSedesTuple;
import org.apache.pig.data.BinSedesTupleFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.TestHelper;
import org.joda.time.DateTime;
import org.junit.Test;
/**
 * Randomized tests for {@link POPackage}.
 *
 * <p>For each type code returned by {@link DataType#genAllTypes()} (except
 * GENERIC_WRITABLECOMPARABLE and the types skipped in {@code pickTest}), two
 * random bags tagged with input indexes 0 and 1 are packaged under a random
 * key, and the emitted tuple is compared against the inputs.
 */
public class TestPackage {
    private static final TupleFactory binfactory = BinSedesTupleFactory.getInstance();

    /** Number of randomized trials per key type and per inner-flag configuration. */
    private static final int NUM_TRIALS = 10;

    /**
     * Minimum length of the string generated for the BIGCHARARRAY case.
     * NOTE(review): presumably chosen to exceed what a regular chararray
     * encoding holds, so the big-chararray path is exercised — confirm.
     */
    private static final int BIG_CHARARRAY_MIN_LENGTH = 65535;

    /**
     * Packages two random bags under {@code key} and checks the output tuple.
     *
     * <p>When {@code keyType} is not {@link DataType#BAG}, the key is also
     * round-tripped through {@link NullablePartitionWritable} serialization.
     *
     * @param key     grouping key attached to the package input
     * @param inner   per-input inner flags handed to the packager; when
     *                {@code inner[0]} is true, input 0 contributes no tuples
     * @param keyType Pig {@link DataType} code used to wrap {@code key}
     * @param r       source of randomness for the bags and the operator key
     */
    private void runTest(Object key, boolean[] inner, byte keyType, Random r)
            throws ExecException, IOException {
        DataBag db1 = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
        DataBag db2 = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
        List<NullableTuple> db = new ArrayList<NullableTuple>(200);
        // With inner[0] set, input 0 is left empty; a STATUS_NULL result is
        // then accepted below.
        if (!inner[0]) {
            addTagged(db, db1, (byte) 0);
        }
        addTagged(db, db2, (byte) 1);

        POPackage pop = new POPackage(new OperatorKey("", r.nextLong()));
        pop.setNumInps(2);
        pop.getPkgr().setInner(inner);
        PigNullableWritable k = HDataType.getWritableComparableTypes(key, keyType);
        pop.attachInput(k, db.iterator());

        if (keyType != DataType.BAG) {
            checkKeySerialization(k, keyType);
        }

        // we are not doing any optimization to remove
        // parts of the "value" which are present in the "key" in this
        // unit test - so set up the "keyInfo" accordingly in
        // the POPackage
        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo =
                new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
        Pair<Boolean, Map<Integer, Integer>> p =
                new Pair<Boolean, Map<Integer, Integer>>(false, new HashMap<Integer, Integer>());
        keyInfo.put(0, p);
        keyInfo.put(1, p);
        pop.getPkgr().setKeyInfo(keyInfo);

        Result res = pop.getNextTuple();
        if (res.returnStatus == POStatus.STATUS_NULL && inner[0]) {
            return;
        }
        assertEquals(POStatus.STATUS_OK, res.returnStatus);
        Tuple t = (Tuple) res.result;
        Object outKey = t.get(0);
        DataBag outDb1 = (DataBag) t.get(1);
        DataBag outDb2 = (DataBag) t.get(2);
        assertEquals(key, outKey);
        assertTrue(TestHelper.compareBags(db1, outDb1));
        assertTrue(TestHelper.compareBags(db2, outDb2));
    }

    /** Appends every tuple of {@code bag} to {@code out}, wrapped in a NullableTuple tagged with {@code index}. */
    private static void addTagged(List<NullableTuple> out, DataBag bag, byte index) {
        Iterator<Tuple> it = bag.iterator();
        while (it.hasNext()) {
            NullableTuple tagged = new NullableTuple(it.next());
            tagged.setIndex(index);
            out.add(tagged);
        }
    }

    /** Writes {@code k} as a NullablePartitionWritable, reads it back, and asserts both are equal. */
    private static void checkKeySerialization(PigNullableWritable k, byte keyType)
            throws ExecException, IOException {
        NullablePartitionWritable wr;
        if (keyType == DataType.TUPLE) {
            // Tuple keys are re-wrapped as a BinSedesTuple before serialization.
            // NOTE(review): presumably so write/readFields use the BinSedes
            // encoding for the inner tuple — confirm.
            BinSedesTuple tup = (BinSedesTuple) binfactory.newTupleNoCopy(
                    ((Tuple) k.getValueAsPigType()).getAll());
            wr = new NullablePartitionWritable(new NullableTuple(tup));
        } else {
            wr = new NullablePartitionWritable(k);
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        wr.write(out);
        out.flush();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        NullablePartitionWritable re = new NullablePartitionWritable();
        re.readFields(in);
        // expected (what was written) first, actual (what was read) second
        assertEquals(wr, re);
    }

    /**
     * Generates a random key of the given type and runs {@code runTest} with it.
     * The same body serves every type, which shows that the package code has no
     * type-specific handling.
     *
     * @param type  Pig {@link DataType} code to generate a key for
     * @param inner per-input inner flags forwarded to {@code runTest}
     * @param r     source of randomness for the key and the test data
     */
    private void pickTest(byte type, boolean[] inner, Random r) throws ExecException, IOException {
        switch (type) {
        case DataType.BAG:
            runTest(GenRandomData.genRandSmallTupDataBag(r, 10, 100), inner, DataType.BAG, r);
            break;
        case DataType.BOOLEAN:
            runTest(r.nextBoolean(), inner, DataType.BOOLEAN, r);
            break;
        case DataType.BYTEARRAY:
            runTest(GenRandomData.genRandDBA(r), inner, DataType.BYTEARRAY, r);
            break;
        case DataType.BIGCHARARRAY: {
            // StringBuilder avoids re-copying the whole string on every append.
            StringBuilder sb = new StringBuilder(GenRandomData.genRandString(r));
            while (sb.length() < BIG_CHARARRAY_MIN_LENGTH) {
                sb.append(GenRandomData.genRandString(r));
            }
            // A long string value is still passed with the CHARARRAY key type.
            runTest(sb.toString(), inner, DataType.CHARARRAY, r);
            break;
        }
        case DataType.CHARARRAY:
            runTest(GenRandomData.genRandString(r), inner, DataType.CHARARRAY, r);
            break;
        case DataType.DOUBLE:
            runTest(r.nextDouble(), inner, DataType.DOUBLE, r);
            break;
        case DataType.FLOAT:
            runTest(r.nextFloat(), inner, DataType.FLOAT, r);
            break;
        case DataType.INTEGER:
            runTest(r.nextInt(), inner, DataType.INTEGER, r);
            break;
        case DataType.LONG:
            runTest(r.nextLong(), inner, DataType.LONG, r);
            break;
        case DataType.DATETIME:
            DateTimeWritable.setupAvailableZoneIds();
            runTest(new DateTime(r.nextLong()), inner, DataType.DATETIME, r);
            break;
        case DataType.MAP:
        case DataType.INTERNALMAP:
        case DataType.BYTE:
            return; // map not key type
        case DataType.TUPLE:
            runTest(GenRandomData.genRandSmallBagTuple(r, 10, 100), inner, DataType.TUPLE, r);
            break;
        case DataType.BIGINTEGER:
            runTest(new BigInteger(256, r), inner, DataType.BIGINTEGER, r);
            break;
        case DataType.BIGDECIMAL:
            runTest(new BigDecimal(r.nextDouble()), inner, DataType.BIGDECIMAL, r);
            break;
        default:
            fail("No test case for type " + DataType.findTypeName(type));
        }
    }

    @Test
    public void testOperator() throws ExecException, IOException {
        // One seeded generator drives every trial. The seed is printed so a
        // failing run can be reproduced by substituting it here.
        long seed = System.nanoTime();
        System.out.println("Random seed " + seed);
        Random r = new Random(seed);

        byte[] types = DataType.genAllTypes();
        for (byte b : types) {
            if (b == DataType.GENERIC_WRITABLECOMPARABLE) {
                // genericwritablecomparable is only used in
                // merge join which we will not test here
                continue;
            }
            System.out.println("Type " + DataType.findTypeName(b));
            boolean[] inner1 = { false, false };
            for (int i = 0; i < NUM_TRIALS; i++) {
                pickTest(b, inner1, r);
            }
            boolean[] inner2 = { true, false };
            for (int i = 0; i < NUM_TRIALS; i++) {
                pickTest(b, inner2, r);
            }
        }
    }
}