// blob: ae97bf3eb71eb4c07fc4cab5ba5d3588d833a376 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.joda.time.DateTime;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleDefaultRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.util.ObjectSerializer;
import org.junit.Before;
import org.junit.Test;
/**
 * Checks that {@link PigTupleSortComparator}, which compares tuples in their serialized form,
 * produces results whose sign agrees with the object-level {@code compareTo} of
 * {@link NullableTuple}.
 *
 * <p>Every test starts from the same ten-field prototype tuple built in {@link #setUp()}, changes
 * one field (or appends one), and compares the changed tuple against the prototype.
 *
 * <p>{@link #main(String[])} is a small manual benchmark that times the raw comparator against
 * {@link PigTupleDefaultRawComparator}.
 */
public class TestPigTupleRawComparator {
    /** Number of random tuples used by the random test and by the benchmark. */
    private static final int TUPLE_NUMBER = (int) 1e3;
    /** Number of passes the benchmark makes over the serialized tuples. */
    private static final int TIMES = (int) 1e5;
    /** Fixed seed so that the random tuples are reproducible. */
    private static final int SEED = 123456789;
    /** Characters that {@link #randomString(int, Random)} draws from. */
    private static final String AB = "0123456789abcdefghijklmnopqrstuwxyz!?-_ ";
    private static final long NANOS_PER_MILLI = 1000000L;

    private final TupleFactory tf = TupleFactory.getInstance();
    private final PigTupleSortComparator comparator = new PigTupleSortComparator();
    private final PigTupleDefaultRawComparator oldComparator = new PigTupleDefaultRawComparator();
    /** Field values of the prototype; tests mutate this list to derive the tuple to compare. */
    private List<Object> list;
    private NullableTuple prototype;
    private final ByteArrayOutputStream baos1 = new ByteArrayOutputStream();
    private final ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
    private final DataOutputStream dos1 = new DataOutputStream(baos1);
    private final DataOutputStream dos2 = new DataOutputStream(baos2);

    /** Configures both comparators with a fresh {@link JobConf} and rebuilds the prototype tuple. */
    @Before
    public void setUp() {
        JobConf jobConf = new JobConf();
        comparator.setConf(jobConf);
        oldComparator.setConf(jobConf);
        // Field indices: 0 float, 1 int, 2 double, 3 long, 4 byte, 5 boolean,
        // 6 bytearray, 7 chararray, 8 nested tuple, 9 datetime.
        list = Arrays.<Object> asList(1f, 2, 3.0, 4L, (byte) 5, true,
                new DataByteArray(new byte[] { 0x10, 0x2a, 0x5e }), "hello world!",
                tf.newTuple(Arrays.<Object> asList(8.0, 9f, 10L, 11)), new DateTime(12L));
        prototype = new NullableTuple(tf.newTuple(list));
        baos1.reset();
        baos2.reset();
    }

    /** A tuple built from the unchanged field list must compare equal to the prototype. */
    @Test
    public void testCompareEquals() throws IOException {
        int res = assertRawMatchesObjectOrder(prototype, tupleFromList());
        assertEquals(0, res);
    }

    /** A smaller float in field 0 must sort before the prototype. */
    @Test
    public void testCompareFloat() throws IOException {
        int res = compareWithFieldSet(prototype, 0, (Float) list.get(0) - 1);
        assertTrue(res > 0);
    }

    /** A larger int in field 1 must sort after the prototype. */
    @Test
    public void testCompareInt() throws IOException {
        int res = compareWithFieldSet(prototype, 1, (Integer) list.get(1) + 1);
        assertTrue(res < 0);
    }

    /** A larger double in field 2 must sort after the prototype. */
    @Test
    public void testCompareDouble() throws IOException {
        int res = compareWithFieldSet(prototype, 2, (Double) list.get(2) + 0.1);
        assertTrue(res < 0);
    }

    /** A larger byte in field 4 must sort after the prototype. */
    @Test
    public void testCompareByte() throws IOException {
        // Byte + int is promoted to int; cast back so the field stays a Byte and the test
        // compares two byte values rather than a Byte against an Integer.
        byte larger = (byte) ((Byte) list.get(4) + 1);
        int res = compareWithFieldSet(prototype, 4, larger);
        assertTrue(res < 0);
    }

    /** {@code false} in field 5 must sort before the prototype's {@code true}. */
    @Test
    public void testCompareBoolean() throws IOException {
        int res = compareWithFieldSet(prototype, 5, false);
        assertTrue(res > 0);
    }

    /** Byte arrays of various lengths in field 6 must be ordered consistently with compareTo. */
    @Test
    public void testCompareByteArray() throws IOException {
        compareWithFieldSet(prototype, 6, new DataByteArray(new byte[] { 0x10, 0x1a }));
        compareWithFieldSet(prototype, 6, new DataByteArray(new byte[] { 0x20 }));

        // bytearray that will fit into BinInterSedes.TINYBYTEARRAY
        compareWithFieldSet(prototype, 6, new DataByteArray(appendChars("abc", 'x', 255 - 10)));

        // longest bytearray that will fit into BinInterSedes.TINYBYTEARRAY
        compareWithFieldSet(prototype, 6, new DataByteArray(appendChars("", 'x', 255)));

        // bytearray that will fit into BinInterSedes.SMALLBYTEARRAY
        compareWithFieldSet(prototype, 6, new DataByteArray(appendChars("abc", 'x', 65535 - 100)));

        // bytearray that will fit into BinInterSedes.BYTEARRAY
        compareWithFieldSet(prototype, 6, new DataByteArray(appendChars("abc", 'x', 65535 + 10000)));
    }

    /** Strings in field 7 that are equal, smaller, larger, shorter and longer than the prototype's. */
    @Test
    public void testCompareCharArray() throws IOException {
        int res = compareWithFieldSet(prototype, 7, "hello world!");
        assertEquals(0, res);

        res = compareWithFieldSet(prototype, 7, "hello worlc!");
        assertTrue(res > 0);

        // chararray that will fit into BinInterSedes.SMALLCHARARRAY
        res = compareWithFieldSet(prototype, 7, appendChars("hello worlc!", 'x', 300));
        assertTrue(res > 0);

        res = compareWithFieldSet(prototype, 7, "hello worlz!");
        assertTrue(res < 0);

        res = compareWithFieldSet(prototype, 7, "hello");
        assertTrue(res > 0);

        res = compareWithFieldSet(prototype, 7, "hello world!?");
        assertTrue(res < 0);
    }

    /**
     * Returns {@code str} followed by {@code rep} copies of {@code c}.
     *
     * @param str prefix of the result
     * @param c character to append
     * @param rep number of times {@code c} is appended
     */
    private String appendChars(String str, char c, int rep) {
        StringBuilder sb = new StringBuilder(str.length() + rep);
        sb.append(str);
        for (int i = 0; i < rep; i++) {
            sb.append(c);
        }
        return sb.toString();
    }

    /** Nested tuples in field 8, a field following a nested tuple, and tuples nested in themselves. */
    @Test
    public void compareInnerTuples() throws IOException {
        int res = assertRawMatchesObjectOrder(prototype, tupleFromList());
        assertEquals(0, res);

        res = compareWithFieldSet(prototype, 8, tf.newTuple(Arrays.<Object> asList(8.0, 9f, 10L, 12)));
        assertTrue(res < 0);

        res = compareWithFieldSet(prototype, 8, tf.newTuple(Arrays.<Object> asList(8.0, 9f, 9L, 12)));
        assertTrue(res > 0);

        res = compareWithFieldSet(prototype, 8, tf.newTuple(Arrays.<Object> asList(7.0, 9f, 9L, 12)));
        assertTrue(res > 0);

        // Tuple that will fit into BinInterSedes.TINYTUPLE
        compareWithFieldSet(prototype, 8, createLargeTuple(1, 200, tf));

        // Tuple that will fit into BinInterSedes.SMALLTUPLE
        compareWithFieldSet(prototype, 8, createLargeTuple(1, 1000, tf));

        // DataType.LONG < DataType.DOUBLE
        res = compareWithFieldSet(prototype, 8, tf.newTuple(Arrays.<Object> asList(8L, 9f, 9L, 12)));
        assertTrue(res > 0);

        // object after tuple (the list from setUp is fixed-size, so copy before appending)
        list = new ArrayList<Object>(list);
        list.add(10);
        int last = list.size() - 1;
        NullableTuple t1 = tupleFromList();
        res = compareWithFieldSet(t1, last, 11);
        assertTrue(res < 0);

        // fancy tuple nesting: the last field becomes a tuple built from the list itself,
        // once while the last field is 11 and once while it is 10
        list.set(last, tf.newTuple(list));
        t1 = tupleFromList();
        list.set(last, 10);
        list.set(last, tf.newTuple(list));
        NullableTuple t2 = tupleFromList();
        res = assertRawMatchesObjectOrder(t1, t2);
        assertTrue(res > 0);
    }

    /**
     * Builds a tuple whose {@code repetitions} fields all hold {@code num}.
     *
     * @param num value stored in every field
     * @param repetitions number of fields
     * @param tf factory used to create the tuple
     */
    private Tuple createLargeTuple(int num, int repetitions, TupleFactory tf) {
        List<Integer> fields = new ArrayList<Integer>(Collections.nCopies(repetitions, num));
        return tf.newTuple(fields);
    }

    /** Tuples with a trailing bag field, and tuples built directly from larger bags. */
    @Test
    public void testCompareDataBag() throws IOException {
        list = new ArrayList<Object>(list);
        list.add(singleTupleBag(0));
        NullableTuple t1 = tupleFromList();
        int res = compareWithFieldSet(t1, list.size() - 1, singleTupleBag(1));
        assertTrue(res < 0);

        // NOTE(review): the two tuples below are built from the bag itself, not from `list`.
        // Presumably that yields a single-field tuple, in which case the comparison against the
        // eleven-field t1 may be decided before the bag contents are looked at - confirm intent.

        // bag that will fit into BinInterSedes.TINYBAG
        DataBag largeBag = createLargeBag(200, tf);
        NullableTuple t2 = new NullableTuple(tf.newTuple(largeBag));
        assertRawMatchesObjectOrder(t1, t2);

        // bag that will fit into BinInterSedes.SMALLBAG
        largeBag = createLargeBag(3000, tf);
        t2 = new NullableTuple(tf.newTuple(largeBag));
        assertRawMatchesObjectOrder(t1, t2);
    }

    /** Returns a bag holding one single-field tuple whose field is {@code value}. */
    private DataBag singleTupleBag(int value) {
        return new DefaultDataBag(Arrays.asList(tf.newTuple(Arrays.asList(value))));
    }

    /**
     * Builds a bag containing {@code size} references to the same single-field tuple {@code (0)}.
     *
     * @param size number of tuples in the bag
     * @param tf factory used to create the tuple
     */
    private DataBag createLargeBag(int size, TupleFactory tf) {
        Tuple t = tf.newTuple(Arrays.asList(0));
        ArrayList<Tuple> tuplist = new ArrayList<Tuple>(size);
        for (int i = 0; i < size; i++) {
            tuplist.add(t);
        }
        return new DefaultDataBag(tuplist);
    }

    /** Tuples with a trailing single-entry map field, and a field following the map. */
    @Test
    public void testCompareMap() throws IOException {
        list = new ArrayList<Object>(list);
        list.add(Collections.singletonMap("pig", "scalability"));
        int mapIndex = list.size() - 1;
        NullableTuple t1 = tupleFromList();

        int res = compareWithFieldSet(t1, mapIndex, Collections.singletonMap("pig", "scalability"));
        assertEquals(0, res);

        res = compareWithFieldSet(t1, mapIndex, Collections.singletonMap("pigg", "scalability"));
        assertTrue(res < 0);

        res = compareWithFieldSet(t1, mapIndex, Collections.singletonMap("pig", "Scalability"));
        assertTrue(res > 0);

        res = compareWithFieldSet(t1, mapIndex, Collections.singletonMap("pii", "scalability"));
        assertTrue(res < 0);

        // object after map
        list.add(107);
        t1 = tupleFromList();
        res = compareWithFieldSet(t1, list.size() - 1, 108);
        assertTrue(res < 0);
    }

    /** A later DateTime in field 9 must sort after the prototype. */
    @Test
    public void testCompareDateTime() throws IOException {
        int res = compareWithFieldSet(prototype, 9, ((DateTime) list.get(9)).plus(1L));
        assertTrue(res < 0);
    }

    /** An Integer in field 3, where the prototype holds a Long, must sort before the prototype. */
    @Test
    public void testCompareDiffertTypes() throws IOException {
        // DataType.INTEGER < DataType.LONG
        int res = compareWithFieldSet(prototype, 3, 4);
        assertTrue(res > 0);
    }

    /** A tuple with one extra field must sort after the prototype. */
    @Test
    public void testCompareDifferentSizes() throws IOException {
        list = new ArrayList<Object>(list);
        // this object should be never get into the comparison loop
        list.add(new DefaultDataBag());
        int res = assertRawMatchesObjectOrder(prototype, tupleFromList());
        assertTrue(res < 0);
    }

    /** Compares the prototype against {@link #TUPLE_NUMBER} reproducible random tuples. */
    @Test
    public void testRandomTuples() throws IOException {
        Random rand = new Random(SEED);
        for (int i = 0; i < TUPLE_NUMBER; i++) {
            NullableTuple t = new NullableTuple(getRandomTuple(rand));
            assertRawMatchesObjectOrder(prototype, t);
        }
    }

    /** With a descending sort order configured, the raw result must be the inverse of compareTo. */
    @Test
    public void testSortOrder() throws IOException {
        // prototype < t but we use inverse sort order
        list.set(2, (Double) list.get(2) + 0.1);
        NullableTuple t = tupleFromList();
        JobConf jobConf = new JobConf();

        // a single flag
        jobConf.set("pig.sortOrder", ObjectSerializer.serialize(new boolean[] {false}));
        comparator.setConf(jobConf);
        int res = compareHelper(prototype, t, comparator);
        assertEquals(-Integer.signum(prototype.compareTo(t)), Integer.signum(res));
        assertTrue(res > 0);

        // one flag per field, only field 2 (the one that differs) descending
        // NOTE(review): nine flags are given for ten-field tuples; presumably harmless because
        // the tuples already differ at index 2 - confirm.
        jobConf.set("pig.sortOrder", ObjectSerializer.serialize(new boolean[] {true,true,false,true,true,true,true,true,true}));
        comparator.setConf(jobConf);
        res = compareHelper(prototype, t, comparator);
        assertEquals(-Integer.signum(prototype.compareTo(t)), Integer.signum(res));
        assertTrue(res > 0);
    }

    /**
     * Returns a copy of the current field list as a tuple with one randomly chosen field replaced
     * by a random value of the type the prototype holds at that position. Position 8 is replaced
     * by another random tuple produced by a recursive call.
     *
     * @param rand source of randomness
     */
    private Tuple getRandomTuple(Random rand) throws IOException {
        int pos = rand.nextInt(list.size());
        Tuple t = tf.newTuple(list);
        switch (pos) {
        case 0:
            t.set(pos, rand.nextFloat());
            break;
        case 1:
            t.set(pos, rand.nextInt());
            break;
        case 2:
            t.set(pos, rand.nextDouble());
            break;
        case 3:
            t.set(pos, rand.nextLong());
            break;
        case 4:
            t.set(pos, (byte) rand.nextInt());
            break;
        case 5:
            t.set(pos, rand.nextBoolean());
            break;
        case 6:
            byte[] ba = new byte[3];
            rand.nextBytes(ba);
            t.set(pos, new DataByteArray(ba));
            break;
        case 7:
            t.set(pos, randomString(rand.nextInt(15), rand));
            break;
        case 8:
            // The drawn value is not used; the draw is kept so that the seeded sequence of
            // generated tuples stays the same as before.
            rand.nextInt(6);
            t.set(pos, getRandomTuple(rand));
            break;
        case 9:
            t.set(pos, new DateTime(rand.nextLong()));
            break;
        default:
            // positions beyond the ten prototype fields are left untouched
            break;
        }
        return t;
    }

    /** Builds a {@link NullableTuple} from a copy of the current contents of {@link #list}. */
    private NullableTuple tupleFromList() {
        return new NullableTuple(tf.newTuple(list));
    }

    /**
     * Sets field {@code index} of {@link #list} to {@code value}, builds a tuple from the list and
     * checks it against {@code reference} with {@link #assertRawMatchesObjectOrder}.
     *
     * @return the raw comparator's result for {@code reference} versus the new tuple
     */
    private int compareWithFieldSet(NullableTuple reference, int index, Object value) throws IOException {
        list.set(index, value);
        return assertRawMatchesObjectOrder(reference, tupleFromList());
    }

    /**
     * Compares the two tuples with {@link #comparator} and asserts that the result has the same
     * sign as {@code t1.compareTo(t2)}.
     *
     * @return the raw comparator's result, so callers can additionally assert its sign
     */
    private int assertRawMatchesObjectOrder(NullableTuple t1, NullableTuple t2) throws IOException {
        int res = compareHelper(t1, t2, comparator);
        assertEquals(Integer.signum(t1.compareTo(t2)), Integer.signum(res));
        return res;
    }

    /**
     * Serializes both tuples into the shared buffers and hands the resulting bytes to
     * {@code comparator}. The buffers are emptied again before comparing so the next call starts
     * clean.
     *
     * @return whatever the comparator returns for the two serialized tuples
     */
    private int compareHelper(NullableTuple t1, NullableTuple t2, RawComparator<?> comparator) throws IOException {
        t1.write(dos1);
        t2.write(dos2);
        byte[] b1 = baos1.toByteArray();
        byte[] b2 = baos2.toByteArray();
        baos1.reset();
        baos2.reset();
        return comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
    }

    /** Returns a string of {@code length} characters drawn from {@link #AB} using {@code rand}. */
    private String randomString(int length, Random rand) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(AB.charAt(rand.nextInt(AB.length())));
        }
        return sb.toString();
    }

    /**
     * Serializes {@code count} random tuples, one byte array per tuple.
     *
     * @param rand source of randomness for {@link #getRandomTuple(Random)}
     * @param count number of tuples to serialize
     * @param buffer buffer that {@code out} writes into
     * @param out stream the tuples are written to
     */
    private byte[][] serializeRandomTuples(Random rand, int count, ByteArrayOutputStream buffer,
            DataOutputStream out) throws IOException {
        byte[][] serialized = new byte[count][];
        for (int i = 0; i < count; i++) {
            new NullableTuple(getRandomTuple(rand)).write(out);
            serialized[i] = buffer.toByteArray();
            // Without the reset every snapshot would also contain all previously written tuples,
            // so each entry would start with the bytes of the very first tuple.
            buffer.reset();
        }
        return serialized;
    }

    /**
     * Runs {@link #TIMES} passes of pairwise comparisons over the two arrays.
     *
     * @return elapsed wall-clock time in milliseconds
     */
    private static long timeComparisons(RawComparator<?> cmp, byte[][] left, byte[][] right) {
        long start = System.nanoTime();
        for (int loop = 0; loop < TIMES; loop++) {
            for (int i = 0; i < left.length; i++) {
                cmp.compare(left[i], 0, left[i].length, right[i], 0, right[i].length);
            }
        }
        return (System.nanoTime() - start) / NANOS_PER_MILLI;
    }

    /**
     * Manual benchmark: serializes two sets of random tuples and prints how long the raw
     * comparator and the old default comparator each take to compare them pairwise.
     */
    public static void main(String[] args) throws Exception {
        Random rand = new Random(SEED);
        TestPigTupleRawComparator test = new TestPigTupleRawComparator();
        test.setUp();
        byte[][] toCompare1 = test.serializeRandomTuples(rand, TUPLE_NUMBER, test.baos1, test.dos1);
        byte[][] toCompare2 = test.serializeRandomTuples(rand, TUPLE_NUMBER, test.baos2, test.dos2);

        long rawMillis = timeComparisons(test.comparator, toCompare1, toCompare2);
        System.out.println("Raw Version - elapsed time: " + Long.toString(rawMillis) + " milliseconds");

        long oldMillis = timeComparisons(test.oldComparator, toCompare1, toCompare2);
        System.out.println("Old Version - elapsed time: " + Long.toString(oldMillis) + " milliseconds");
    }
}