blob: 2737f4dfd91d8ecb62bd7dbf87d43dc9ed40116a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.data;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.utils.SedesHelper;
import org.apache.pig.impl.util.ObjectSerializer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
/**
* A class to handle reading and writing of intermediate results of data types. The serialization format used by this
* class is more efficient than the one used by DataReaderWriter. The format used by the functions in this class is
* subject to change, so it should be used ONLY to store intermediate results within a pig query.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class BinInterSedes implements InterSedes {
// Milliseconds in one minute. DATETIME values store their time-zone offset as a whole number of minutes;
// this converts between those minutes and the millisecond offsets that Joda-Time works with.
private static final int ONE_MINUTE = 60000;
// ---------------------------------------------------------------------------------------------------------
// Type markers. Every serialized datum begins with exactly one of these bytes, which tells the reader how to
// decode the payload that follows. Changing any value changes the binary format.
// ---------------------------------------------------------------------------------------------------------
// Booleans: the value is carried by the marker itself, no payload.
public static final byte BOOLEAN_TRUE = 0;
public static final byte BOOLEAN_FALSE = 1;
// One signed byte of payload.
public static final byte BYTE = 2;
// Full 4-byte int payload.
public static final byte INTEGER = 3;
// Boolean was not supported as an external type (as of v0.7), so many users store flags as ints, and data with
// such old schemas is likely to remain for some time. Optimize for that case too: 0 and 1 get dedicated
// markers with no payload.
public static final byte INTEGER_0 = 4;
public static final byte INTEGER_1 = 5;
// int values that fit in a signed short / signed byte are written with that narrower payload.
public static final byte INTEGER_INSHORT = 6;
public static final byte INTEGER_INBYTE = 7;
// Full 8-byte long payload.
public static final byte LONG = 8;
public static final byte FLOAT = 9;
public static final byte DOUBLE = 10;
// Byte arrays, in three size-dependent variants (decoded by SedesHelper.readBytes).
public static final byte BYTEARRAY = 11;
public static final byte SMALLBYTEARRAY = 12;
public static final byte TINYBYTEARRAY = 13;
// UTF-8 strings, in two size-dependent variants (decoded by SedesHelper.readChararray).
public static final byte CHARARRAY = 14;
public static final byte SMALLCHARARRAY = 15;
// Maps: entry count as int / unsigned short / unsigned byte, then alternating key and value datums.
public static final byte MAP = 16;
public static final byte SMALLMAP = 17;
public static final byte TINYMAP = 18;
// Tuples: field count as int / unsigned short / unsigned byte, then the fields.
public static final byte TUPLE = 19;
public static final byte SMALLTUPLE = 20;
public static final byte TINYTUPLE = 21;
// Bags: tuple count as long / unsigned short / unsigned byte, then the tuples.
public static final byte BAG = 22;
public static final byte SMALLBAG = 23;
public static final byte TINYBAG = 24;
// Arbitrary WritableComparable: class-name chararray datum followed by the object's own write() output.
public static final byte GENERIC_WRITABLECOMPARABLE = 25;
// Map with arbitrary keys: int entry count, then alternating key and value datums.
public static final byte INTERNALMAP = 26;
// Null value, no payload.
public static final byte NULL = 27;
// Schema tuples: SchemaTupleFactory id as unsigned byte / unsigned short / int, then the tuple's own fields.
public static final byte SCHEMA_TUPLE_BYTE_INDEX = 28;
public static final byte SCHEMA_TUPLE_SHORT_INDEX = 29;
public static final byte SCHEMA_TUPLE = 30;
// long values that fit in a signed byte / short / int are written with that narrower payload.
public static final byte LONG_INBYTE = 31;
public static final byte LONG_INSHORT = 32;
public static final byte LONG_ININT = 33;
// Long 0 and 1: value carried by the marker, no payload.
public static final byte LONG_0 = 34;
public static final byte LONG_1 = 35;
// Tuples with 0..9 fields: the arity is implied by the marker, so no size field follows.
public static final byte TUPLE_0 = 36;
public static final byte TUPLE_1 = 37;
public static final byte TUPLE_2 = 38;
public static final byte TUPLE_3 = 39;
public static final byte TUPLE_4 = 40;
public static final byte TUPLE_5 = 41;
public static final byte TUPLE_6 = 42;
public static final byte TUPLE_7 = 43;
public static final byte TUPLE_8 = 44;
public static final byte TUPLE_9 = 45;
// Big numbers: payload is their decimal string, written as a chararray datum.
public static final byte BIGINTEGER = 46;
public static final byte BIGDECIMAL = 47;
// Date-time: 8-byte epoch millis followed by a 2-byte zone offset in minutes.
public static final byte DATETIME = 48;
// NOTE(review): mTupleFactory is not referenced in the visible part of this file.
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
// Used by readBag to create the bags it fills.
private static BagFactory mBagFactory = BagFactory.getInstance();
// Size thresholds used by writeMap/writeBag to choose between the TINY*, SMALL* and full-size markers.
public static final int UNSIGNED_SHORT_MAX = 65535;
public static final int UNSIGNED_BYTE_MAX = 255;
// Charset name used when decoding serialized chararray bytes during raw comparison.
public static final String UTF8 = "UTF-8";
/**
 * Reads a tuple whose BinInterSedes marker has already been consumed from {@code in}.
 *
 * @param in   stream positioned just after the tuple marker
 * @param type the tuple marker that was read (a generic TUPLE* marker or a SCHEMA_TUPLE* marker)
 * @return the decoded tuple
 * @throws IOException     if the stream cannot be read
 * @throws ExecException   if {@code type} is not a tuple marker
 */
public Tuple readTuple(DataInput in, byte type) throws IOException {
    switch (type) {
        case SCHEMA_TUPLE_BYTE_INDEX:
        case SCHEMA_TUPLE_SHORT_INDEX:
        case SCHEMA_TUPLE:
            // schema tuples are rebuilt through their generated factory
            return readSchemaTuple(in, type);
        case TUPLE:
        case TINYTUPLE:
        case SMALLTUPLE:
        case TUPLE_0:
        case TUPLE_1:
        case TUPLE_2:
        case TUPLE_3:
        case TUPLE_4:
        case TUPLE_5:
        case TUPLE_6:
        case TUPLE_7:
        case TUPLE_8:
        case TUPLE_9:
            return SedesHelper.readGenericTuple(in, type);
        default:
            throw new ExecException("Unknown Tuple type found in stream: " + type);
    }
}
/**
 * Reads a schema tuple: the SchemaTupleFactory id (its width given by {@code type}), followed by the
 * tuple's own serialized fields.
 *
 * @param in   stream positioned just after the SCHEMA_TUPLE* marker
 * @param type SCHEMA_TUPLE_BYTE_INDEX, SCHEMA_TUPLE_SHORT_INDEX or SCHEMA_TUPLE
 * @return the populated schema tuple
 * @throws IOException if the stream cannot be read
 * @throws RuntimeException if {@code type} is not a schema-tuple marker
 */
private Tuple readSchemaTuple(DataInput in, byte type) throws IOException {
    final int factoryId;
    if (type == SCHEMA_TUPLE_BYTE_INDEX) {
        factoryId = in.readUnsignedByte();
    } else if (type == SCHEMA_TUPLE_SHORT_INDEX) {
        factoryId = in.readUnsignedShort();
    } else if (type == SCHEMA_TUPLE) {
        factoryId = in.readInt();
    } else {
        throw new RuntimeException("Invalid type given to readSchemaTuple: " + type);
    }
    Tuple tuple = SchemaTupleFactory.getInstance(factoryId).newTuple();
    tuple.readFields(in);
    return tuple;
}
/**
 * Returns the number of fields in a serialized tuple whose marker has already been read.
 * <p>
 * For TUPLE_0 .. TUPLE_9 the arity is implied by the marker and nothing is read. For TINYTUPLE,
 * SMALLTUPLE and TUPLE the count is read from {@code in} as an unsigned byte, unsigned short or int.
 *
 * @param in   stream positioned just after the tuple marker
 * @param type the tuple marker that was read
 * @return the field count (0 is allowed and denotes an empty tuple)
 * @throws ExecException (error 2112) if {@code type} is not a generic tuple marker
 * @throws IOException   if the stored count is negative or the stream cannot be read
 */
public int getTupleSize(DataInput in, byte type) throws IOException {
    int sz;
    switch (type) {
    case TUPLE_0:
        return 0;
    case TUPLE_1:
        return 1;
    case TUPLE_2:
        return 2;
    case TUPLE_3:
        return 3;
    case TUPLE_4:
        return 4;
    case TUPLE_5:
        return 5;
    case TUPLE_6:
        return 6;
    case TUPLE_7:
        return 7;
    case TUPLE_8:
        return 8;
    case TUPLE_9:
        return 9;
    case TINYTUPLE:
        sz = in.readUnsignedByte();
        break;
    case SMALLTUPLE:
        sz = in.readUnsignedShort();
        break;
    case TUPLE:
        sz = in.readInt();
        break;
    default: {
        int errCode = 2112;
        // note the space before "from": the message previously read "...reading tuplefrom binary file."
        String msg = "Unexpected datatype " + type + " while reading tuple from binary file.";
        throw new ExecException(msg, errCode, PigException.BUG);
    }
    }
    // if sz == 0, we construct an "empty" tuple - presumably the writer wrote an empty tuple!
    if (sz < 0) {
        throw new IOException("Invalid size " + sz + " for a tuple");
    }
    return sz;
}
/**
 * Reads a bag whose marker (TINYBAG, SMALLBAG or BAG) has already been consumed.
 * <p>
 * The payload is the tuple count, stored as an unsigned byte, unsigned short or long respectively,
 * followed by that many serialized tuple datums.
 *
 * @param in   stream positioned just after the bag marker
 * @param type the bag marker that was read
 * @return a new default bag holding the decoded tuples
 * @throws ExecException (error 2219) if {@code type} is not a bag marker
 * @throws IOException   if the stored count is negative or the stream cannot be read
 */
private DataBag readBag(DataInput in, byte type) throws IOException {
    DataBag bag = mBagFactory.newDefaultBag();
    long size;
    // determine size of bag
    switch (type) {
    case TINYBAG:
        size = in.readUnsignedByte();
        break;
    case SMALLBAG:
        size = in.readUnsignedShort();
        break;
    case BAG:
        size = in.readLong();
        break;
    default: {
        int errCode = 2219;
        String msg = "Unexpected data while reading bag " + "from binary file.";
        throw new ExecException(msg, errCode, PigException.BUG);
    }
    }
    // A negative count can only come from corrupt input: fail loudly instead of returning an empty bag.
    if (size < 0) {
        throw new IOException("Invalid size " + size + " for a bag");
    }
    for (long i = 0; i < size; i++) {
        // every element of a bag is serialized as a tuple datum
        bag.add((Tuple) readDatum(in));
    }
    return bag;
}
/**
 * Reads a string-keyed map whose marker (TINYMAP, SMALLMAP or MAP) has already been consumed.
 * <p>
 * The payload is the entry count, stored as an unsigned byte, unsigned short or int respectively,
 * followed by that many (key datum, value datum) pairs. Keys are expected to be chararrays.
 *
 * @param in   stream positioned just after the map marker
 * @param type the map marker that was read
 * @return a new HashMap holding the decoded entries
 * @throws ExecException (error 2220) if {@code type} is not a map marker
 * @throws IOException   if the stored count is negative or the stream cannot be read
 */
private Map<String, Object> readMap(DataInput in, byte type) throws IOException {
    int size;
    switch (type) {
    case TINYMAP:
        size = in.readUnsignedByte();
        break;
    case SMALLMAP:
        size = in.readUnsignedShort();
        break;
    case MAP:
        size = in.readInt();
        break;
    default: {
        int errCode = 2220;
        // note the space before "from": the message previously read "...reading mapfrom binary file."
        String msg = "Unexpected data while reading map from binary file.";
        throw new ExecException(msg, errCode, PigException.BUG);
    }
    }
    // A negative count would otherwise surface as an IllegalArgumentException from the HashMap
    // constructor; report it as corrupt input, like getTupleSize does for tuples.
    if (size < 0) {
        throw new IOException("Invalid size " + size + " for a map");
    }
    Map<String, Object> m = new HashMap<String, Object>(size);
    for (int i = 0; i < size; i++) {
        // the key must be read before its value
        String key = (String) readDatum(in);
        m.put(key, readDatum(in));
    }
    return m;
}
/**
 * Reads an INTERNALMAP payload: an int entry count followed by that many (key datum, value datum) pairs.
 * Unlike {@code readMap}, keys may be of any datum type.
 *
 * @param in stream positioned just after the INTERNALMAP marker
 * @return the decoded map
 * @throws IOException if the stored count is negative or the stream cannot be read
 */
private InternalMap readInternalMap(DataInput in) throws IOException {
    int size = in.readInt();
    // reject corrupt counts up front, consistent with the tuple/map readers
    if (size < 0) {
        throw new IOException("Invalid size " + size + " for an internal map");
    }
    // NOTE(review): size is presumably used as the initial capacity of InternalMap — confirm.
    InternalMap m = new InternalMap(size);
    for (int i = 0; i < size; i++) {
        // the key must be read before its value
        Object key = readDatum(in);
        m.put(key, readDatum(in));
    }
    return m;
}
/**
 * Reads a GENERIC_WRITABLECOMPARABLE payload: the class-name chararray datum, followed by the bytes the
 * object produced with {@code write()}. The class is loaded with {@link Class#forName(String)} and
 * instantiated through its no-argument constructor before {@code readFields} restores its state.
 *
 * @param in stream positioned just after the GENERIC_WRITABLECOMPARABLE marker
 * @return the reconstructed object
 * @throws IOException if the class cannot be found or instantiated, or the stream cannot be read
 */
private WritableComparable readWritable(DataInput in) throws IOException {
    String className = (String) readDatum(in);
    // create the writable class. It needs to have a default constructor
    Class<?> objClass;
    try {
        objClass = Class.forName(className);
    } catch (ClassNotFoundException e) {
        throw new IOException("Could not find class " + className + ", while attempting to de-serialize it ", e);
    }
    WritableComparable writable;
    try {
        writable = (WritableComparable) objClass.newInstance();
    } catch (Exception e) {
        // the message previously said "Could create instance", the opposite of what happened
        String msg = "Could not create instance of class " + className
                + ", while attempting to de-serialize it. (no default constructor ?)";
        throw new IOException(msg, e);
    }
    // read the fields of the object from DataInput
    writable.readFields(in);
    return writable;
}
/**
 * Reads the next datum from {@code in}: a one-byte BinInterSedes type marker followed by the payload that
 * marker describes.
 *
 * @see org.apache.pig.data.InterSedes#readDatum(java.io.DataInput)
 */
@Override
public Object readDatum(DataInput in) throws IOException, ExecException {
    // the marker is consumed here and the payload decoding is delegated
    return readDatum(in, in.readByte());
}
/**
 * Reads exactly {@code size} raw bytes from {@code in} and wraps them in a {@link DataByteArray}.
 * NOTE(review): not referenced in the visible part of this file; readDatum uses SedesHelper.readBytes.
 */
private static Object readBytes(DataInput in, int size) throws IOException {
    final byte[] payload = new byte[size];
    in.readFully(payload);
    return new DataByteArray(payload);
}
/**
 * Decodes one datum whose type marker has already been read.
 * <p>
 * {@code type} must be a BinInterSedes marker declared in this class (NOT a {@link DataType} code).
 * Markers that encode the whole value (NULL, the booleans, INTEGER_0/1, LONG_0/1) consume nothing more
 * from {@code in}; every other marker reads its payload from {@code in}.
 *
 * @param in   stream positioned just after the type marker
 * @param type BinInterSedes type marker
 * @return the decoded value, or {@code null} for NULL
 * @throws IOException if the stream cannot be read
 * @throws RuntimeException if {@code type} is not a known marker
 * @see org.apache.pig.data.InterSedes#readDatum(java.io.DataInput, byte)
 */
@Override
public Object readDatum(DataInput in, byte type) throws IOException, ExecException {
    switch (type) {
        // --- markers that carry the whole value ---
        case NULL:
            return null;
        case BOOLEAN_TRUE:
            return Boolean.valueOf(true);
        case BOOLEAN_FALSE:
            return Boolean.valueOf(false);
        case INTEGER_0:
            return Integer.valueOf(0);
        case INTEGER_1:
            return Integer.valueOf(1);
        case LONG_0:
            return Long.valueOf(0);
        case LONG_1:
            return Long.valueOf(1);

        // --- fixed-width scalars; integral values use the narrowest width chosen by the writer ---
        case BYTE:
            return Byte.valueOf(in.readByte());
        case INTEGER_INBYTE:
            return Integer.valueOf(in.readByte());
        case INTEGER_INSHORT:
            return Integer.valueOf(in.readShort());
        case INTEGER:
            return Integer.valueOf(in.readInt());
        case LONG_INBYTE:
            return Long.valueOf(in.readByte());
        case LONG_INSHORT:
            return Long.valueOf(in.readShort());
        case LONG_ININT:
            return Long.valueOf(in.readInt());
        case LONG:
            return Long.valueOf(in.readLong());
        case FLOAT:
            return Float.valueOf(in.readFloat());
        case DOUBLE:
            return Double.valueOf(in.readDouble());
        case DATETIME: {
            // epoch millis first, then the zone offset in whole minutes
            long millis = in.readLong();
            int offsetMinutes = in.readShort();
            return new DateTime(millis, DateTimeZone.forOffsetMillis(offsetMinutes * ONE_MINUTE));
        }
        case BIGINTEGER:
            return readBigInteger(in);
        case BIGDECIMAL:
            return readBigDecimal(in);

        // --- variable-length scalars ---
        case TINYBYTEARRAY:
        case SMALLBYTEARRAY:
        case BYTEARRAY:
            return new DataByteArray(SedesHelper.readBytes(in, type));
        case SMALLCHARARRAY:
        case CHARARRAY:
            return SedesHelper.readChararray(in, type);
        case GENERIC_WRITABLECOMPARABLE:
            return readWritable(in);

        // --- containers ---
        case TUPLE_0:
        case TUPLE_1:
        case TUPLE_2:
        case TUPLE_3:
        case TUPLE_4:
        case TUPLE_5:
        case TUPLE_6:
        case TUPLE_7:
        case TUPLE_8:
        case TUPLE_9:
        case TINYTUPLE:
        case SMALLTUPLE:
        case TUPLE:
            return SedesHelper.readGenericTuple(in, type);
        case SCHEMA_TUPLE_BYTE_INDEX:
        case SCHEMA_TUPLE_SHORT_INDEX:
        case SCHEMA_TUPLE:
            return readSchemaTuple(in, type);
        case TINYBAG:
        case SMALLBAG:
        case BAG:
            return readBag(in, type);
        case TINYMAP:
        case SMALLMAP:
        case MAP:
            return readMap(in, type);
        case INTERNALMAP:
            return readInternalMap(in);

        default:
            throw new RuntimeException("Unexpected data type " + type + " found in stream.");
    }
}
/** Reads a BIGDECIMAL payload: its decimal string stored as a chararray datum. */
private Object readBigDecimal(DataInput in) throws IOException {
    String text = (String) readDatum(in);
    return new BigDecimal(text);
}
/** Reads a BIGINTEGER payload: its decimal string stored as a chararray datum. */
private Object readBigInteger(DataInput in) throws IOException {
    String text = (String) readDatum(in);
    return new BigInteger(text);
}
/** Writes a BIGINTEGER payload as its decimal string (serialized as a chararray datum). */
private void writeBigInteger(DataOutput out, BigInteger bi) throws IOException {
    String text = bi.toString();
    writeDatum(out, text);
}
/** Writes a BIGDECIMAL payload as its {@code toString()} form (serialized as a chararray datum). */
private void writeBigDecimal(DataOutput out, BigDecimal bd) throws IOException {
    String text = bd.toString();
    writeDatum(out, text);
}
/**
 * Serializes {@code val}, inferring its Pig type with {@link DataType#findType(Object)}.
 *
 * @see org.apache.pig.data.InterSedes#writeDatum(java.io.DataOutput, java.lang.Object)
 */
@Override
public void writeDatum(DataOutput out, Object val) throws IOException {
    // determine the Pig data type of the value, then delegate to the typed overload
    writeDatum(out, val, DataType.findType(val));
}
/**
 * Serializes {@code val} using the compact BinInterSedes encoding for the given Pig type.
 * <p>
 * {@code type} is a {@link DataType} code, not a BinInterSedes marker. Each branch emits one BinInterSedes
 * marker followed by its payload. Integral values pick the narrowest marker that can hold them. Maps and bags
 * are delegated to writeMap/writeBag, which write their own size-dependent marker. Tuples, byte arrays and
 * chararrays are delegated to helpers that presumably also write a marker, since readDatum(DataInput) expects
 * one before every datum.
 *
 * @param out  destination stream
 * @param val  value to write; must be an instance matching {@code type}
 * @param type Pig {@link DataType} code describing {@code val}
 * @throws IOException on write failure
 * @throws RuntimeException if {@code type} is not handled by this serializer
 */
@Override
@SuppressWarnings("unchecked")
public void writeDatum(DataOutput out, Object val, byte type) throws IOException {
switch (type) {
case DataType.TUPLE:
writeTuple(out, (Tuple) val);
break;
case DataType.BAG:
writeBag(out, (DataBag) val);
break;
case DataType.MAP: {
writeMap(out, (Map<String, Object>) val);
break;
}
case DataType.INTERNALMAP: {
// marker, int entry count, then key/value datum pairs (mirrors readInternalMap)
out.writeByte(INTERNALMAP);
Map<Object, Object> m = (Map<Object, Object>) val;
out.writeInt(m.size());
Iterator<Map.Entry<Object, Object>> i = m.entrySet().iterator();
while (i.hasNext()) {
Map.Entry<Object, Object> entry = i.next();
writeDatum(out, entry.getKey());
writeDatum(out, entry.getValue());
}
break;
}
case DataType.INTEGER:
// 0 and 1 need only the marker; otherwise use the narrowest of byte / short / int payloads
int i = (Integer) val;
if (i == 0) {
out.writeByte(INTEGER_0);
} else if (i == 1) {
out.writeByte(INTEGER_1);
} else if (Byte.MIN_VALUE <= i && i <= Byte.MAX_VALUE) {
out.writeByte(INTEGER_INBYTE);
out.writeByte(i);
} else if (Short.MIN_VALUE <= i && i <= Short.MAX_VALUE) {
out.writeByte(INTEGER_INSHORT);
out.writeShort(i);
} else {
out.writeByte(INTEGER);
out.writeInt(i);
}
break;
case DataType.LONG:
// same scheme as INTEGER, with an extra int-width step before the full 8-byte payload
long lng = (Long) val;
if (lng == 0) {
out.writeByte(LONG_0);
} else if (lng == 1) {
out.writeByte(LONG_1);
} else if (Byte.MIN_VALUE <= lng && lng <= Byte.MAX_VALUE) {
out.writeByte(LONG_INBYTE);
out.writeByte((int)lng);
} else if (Short.MIN_VALUE <= lng && lng <= Short.MAX_VALUE) {
out.writeByte(LONG_INSHORT);
out.writeShort((int)lng);
} else if (Integer.MIN_VALUE <= lng && lng <= Integer.MAX_VALUE) {
out.writeByte(LONG_ININT);
out.writeInt((int)lng);
} else {
out.writeByte(LONG);
out.writeLong(lng);
}
break;
case DataType.DATETIME:
// epoch millis, then the zone's offset at that instant in whole minutes (integer division truncates
// any sub-minute part of the offset)
out.writeByte(DATETIME);
out.writeLong(((DateTime) val).getMillis());
out.writeShort(((DateTime) val).getZone().getOffset((DateTime) val) / ONE_MINUTE);
break;
case DataType.FLOAT:
out.writeByte(FLOAT);
out.writeFloat((Float) val);
break;
case DataType.BIGINTEGER:
// marker followed by the decimal string as a chararray datum
out.writeByte(BIGINTEGER);
writeBigInteger(out, (BigInteger)val);
break;
case DataType.BIGDECIMAL:
out.writeByte(BIGDECIMAL);
writeBigDecimal(out, (BigDecimal)val);
break;
case DataType.DOUBLE:
out.writeByte(DOUBLE);
out.writeDouble((Double) val);
break;
case DataType.BOOLEAN:
// the value is carried entirely by the marker
if ((Boolean) val)
out.writeByte(BOOLEAN_TRUE);
else
out.writeByte(BOOLEAN_FALSE);
break;
case DataType.BYTE:
out.writeByte(BYTE);
out.writeByte((Byte) val);
break;
case DataType.BYTEARRAY: {
DataByteArray bytes = (DataByteArray) val;
SedesHelper.writeBytes(out, bytes.mData);
break;
}
case DataType.CHARARRAY: {
SedesHelper.writeChararray(out, (String) val);
break;
}
case DataType.GENERIC_WRITABLECOMPARABLE:
out.writeByte(GENERIC_WRITABLECOMPARABLE);
// store the class name, so we know the class to create on read
writeDatum(out, val.getClass().getName());
Writable writable = (Writable) val;
writable.write(out);
break;
case DataType.NULL:
// marker only, no payload
out.writeByte(NULL);
break;
default:
throw new RuntimeException("Unexpected data type " + val.getClass().getName() + " found in stream. " +
"Note only standard Pig type is supported when you output from UDF/LoadFunc");
}
}
/**
 * Writes a string-keyed map: a size-dependent marker plus entry count, followed by each key datum and its
 * value datum.
 * <p>
 * Maps with fewer than {@code UNSIGNED_BYTE_MAX} entries use TINYMAP (unsigned-byte count). Maps with fewer
 * than {@code UNSIGNED_SHORT_MAX} entries use SMALLMAP (unsigned-short count). Anything larger uses MAP
 * (int count).
 */
private void writeMap(DataOutput out, Map<String, Object> m) throws IOException {
    final int sz = m.size();
    if (sz >= UNSIGNED_SHORT_MAX) {
        out.writeByte(MAP);
        out.writeInt(sz);
    } else if (sz >= UNSIGNED_BYTE_MAX) {
        out.writeByte(SMALLMAP);
        out.writeShort(sz);
    } else {
        out.writeByte(TINYMAP);
        out.writeByte(sz);
    }
    for (Map.Entry<String, Object> entry : m.entrySet()) {
        writeDatum(out, entry.getKey());
        writeDatum(out, entry.getValue());
    }
}
/**
 * Writes a bag: a size-dependent marker plus tuple count, followed by every tuple in iteration order.
 * <p>
 * Sortedness or distinctness of the bag is not recorded. Writing through the iterator already emits tuples in
 * the bag's order, so the reader has no need to re-sort or re-apply distinct.
 */
private void writeBag(DataOutput out, DataBag bag) throws IOException {
    final long sz = bag.size();
    if (sz >= UNSIGNED_SHORT_MAX) {
        out.writeByte(BAG);
        out.writeLong(sz);
    } else if (sz >= UNSIGNED_BYTE_MAX) {
        out.writeByte(SMALLBAG);
        out.writeShort((int) sz);
    } else {
        out.writeByte(TINYBAG);
        out.writeByte((int) sz);
    }
    for (Iterator<Tuple> tuples = bag.iterator(); tuples.hasNext();) {
        writeTuple(out, tuples.next());
    }
}
/**
 * Writes a tuple. A {@link TypeAwareTuple} serializes itself through its own {@code write()}; any other tuple
 * uses the generic encoding from SedesHelper.
 */
private void writeTuple(DataOutput out, Tuple t) throws IOException {
    if (!(t instanceof TypeAwareTuple)) {
        SedesHelper.writeGenericTuple(out, t);
        return;
    }
    t.write(out);
}
/**
 * Reads a serialized tuple header (marker plus optional field count) from {@code in} and appends each
 * decoded field to {@code t}.
 *
 * @see org.apache.pig.data.InterSedes#addColsToTuple(java.io.DataInput, org.apache.pig.data.Tuple)
 */
@Override
public void addColsToTuple(DataInput in, Tuple t) throws IOException {
    final int columnCount = getTupleSize(in, in.readByte());
    for (int col = 0; col < columnCount; col++) {
        t.append(readDatum(in));
    }
}
public static class BinInterSedesTupleRawComparator extends WritableComparator implements TupleRawComparator {
// Logger for comparison errors and for fallbacks to object deserialization.
private final Log mLog = LogFactory.getLog(getClass());
// Key sort order loaded from "pig.sortOrder" (true = ascending). With a single entry it applies to the whole
// tuple; otherwise entry i applies to top-level field i.
private boolean[] mAsc;
// Secondary-key sort order loaded from "pig.secondarySortOrder"; null when there is no secondary sort.
private boolean[] mSecondaryAsc;
// Zero-length order array passed for top-level fields in non-secondary mode. Being non-null marks the call as
// top-level (for null tracking), and being zero-length means no order is applied inside the datum comparison.
private static final boolean[] EMPTY_ASC = new boolean[] {};
// True when mAsc has exactly one entry, i.e. a single order for the whole tuple.
private boolean mWholeTuple;
// True when a secondary sort order was configured; keys are then (main_key, secondary_key).
private boolean mIsSecondarySort;
// Set during a comparison when a compared top-level field is null (reported by hasComparedTupleNull).
private boolean mHasNullField;
// Factory for the tuples built when falling back to object comparison.
private TupleFactory mFact;
// Deserializer used to decode bags, maps and writables during raw comparison.
private InterSedes mSedes;
// The comparator works on serialized BinSedesTuple keys.
public BinInterSedesTupleRawComparator() {
super(BinSedesTuple.class);
}
/**
 * Returns {@code null}: this comparator keeps no reference to the Configuration passed to setConf.
 */
@Override
public Configuration getConf() {
    return null;
}

/**
 * Loads the sort orders from the job configuration and prepares the comparator.
 * <p>
 * {@code pig.sortOrder} holds the serialized key order. When it deserializes to null, a single ascending
 * entry is used. {@code pig.secondarySortOrder}, when non-null, switches the comparator to secondary-sort
 * mode. A single-entry key order applies to the tuple as a whole.
 *
 * @param conf job configuration
 * @throws RuntimeException if either sort order cannot be deserialized
 */
@Override
public void setConf(Configuration conf) {
    try {
        mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
        mSecondaryAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.secondarySortOrder"));
    } catch (IOException ioe) {
        mLog.error("Unable to deserialize sort order object" + ioe.getMessage());
        throw new RuntimeException(ioe);
    }
    if (mAsc == null) {
        // no explicit order: treat the whole tuple as one ascending key
        mAsc = new boolean[] { true };
    }
    mIsSecondarySort = (mSecondaryAsc != null);
    // a single order entry describes the whole tuple rather than individual columns
    mWholeTuple = (mAsc.length == 1);
    mFact = TupleFactory.getInstance();
    mSedes = InterSedesFactory.getInterSedesInstance();
}
/** Reports whether the most recent comparison encountered a null top-level field. */
@Override
public boolean hasComparedTupleNull() {
    return mHasNullField;
}

/**
 * Compares two serialized BinSedesTuples on their raw bytes. The tuples are assumed NOT to be
 * PigNullableWritable, so callers must handle null markers and index bytes themselves.
 *
 * @throws RuntimeException wrapping any IOException raised while reading the bytes
 */
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
        return compareBinSedesTuple(ByteBuffer.wrap(b1, s1, l1), ByteBuffer.wrap(b2, s2, l2));
    } catch (IOException ioe) {
        mLog.error("I/O error during tuple comparison: " + ioe.getMessage());
        throw new RuntimeException(ioe.getMessage(), ioe);
    }
}
/**
 * Compares two serialized BinSedesTuples as raw bytes, applying the configured sort order.
 * <p>
 * The outermost tuple is handled here because the sort order applies to its fields. In secondary-sort mode
 * the key is (main_key, secondary_key), and each half is compared with its own order array. Otherwise each
 * top-level field is compared in turn and flipped according to {@code mAsc}. If a field type has no raw-byte
 * comparison, {@code compareBinInterSedesDatum} throws {@link UnsupportedEncodingException}; both tuples are
 * then deserialized from their start and compared as objects.
 *
 * @param bb1 buffer positioned at the first serialized tuple
 * @param bb2 buffer positioned at the second serialized tuple
 * @return negative, zero or positive as the first tuple sorts before, equal to, or after the second
 * @throws IOException if either tuple cannot be read
 */
private int compareBinSedesTuple(ByteBuffer bb1, ByteBuffer bb2) throws IOException {
    mHasNullField = false;
    // remember where each record starts so the fallback path can deserialize it from the beginning
    int s1 = bb1.position();
    int s2 = bb2.position();
    int result = 0;
    try {
        // tuples with more fields sort after tuples with fewer fields
        int tsz1 = readSize(bb1);
        int tsz2 = readSize(bb2);
        if (tsz1 > tsz2) {
            return 1;
        } else if (tsz1 < tsz2) {
            return -1;
        }
        if (mIsSecondarySort) {
            // compound key (main_key, secondary_key); each half carries its own sort order, which is
            // applied inside compareBinInterSedesDatum
            assert (tsz1 == 2); // main_key, secondary_key
            result = compareBinInterSedesDatum(bb1, bb2, mAsc);
            if (result == 0) {
                result = compareBinInterSedesDatum(bb1, bb2, mSecondaryAsc);
            }
        } else {
            for (int i = 0; i < tsz1 && result == 0; i++) {
                // EMPTY_ASC marks a top-level field (non-null) without applying any order inside the call
                result = compareBinInterSedesDatum(bb1, bb2, EMPTY_ASC);
                if (result != 0) {
                    // one order for the whole tuple, or one per column
                    boolean descending = mWholeTuple ? !mAsc[0] : !mAsc[i];
                    if (descending) {
                        result *= -1;
                    }
                }
            }
        }
    } catch (UnsupportedEncodingException uee) {
        Tuple t1 = mFact.newTuple();
        Tuple t2 = mFact.newTuple();
        // ByteArrayInputStream takes (offset, LENGTH): read from the record start up to the buffer limit.
        // Passing limit() directly, as before, treated an absolute index as a length and overshot the record.
        t1.readFields(new DataInputStream(new ByteArrayInputStream(bb1.array(), s1, bb1.limit() - s1)));
        t2.readFields(new DataInputStream(new ByteArrayInputStream(bb2.array(), s2, bb2.limit() - s2)));
        // delegate to the object-based compare()
        result = compare(t1, t2);
    }
    return result;
}
/**
 * Compares the next serialized datum in each buffer directly on the raw bytes, advancing both buffers past
 * what was read.
 * <p>
 * Each datum starts with its BinInterSedes marker. Both markers are mapped to a generalized {@link DataType}.
 * When those differ, the result orders by the DataType code and the payloads are not read. When they match,
 * the payloads are compared value by value.
 *
 * @param bb1 first buffer, positioned at a datum marker
 * @param bb2 second buffer, positioned at a datum marker
 * @param asc sort-order hint. {@code null} for fields nested inside another datum. {@code EMPTY_ASC} for
 *            top-level fields when there is no secondary sort. {@code mAsc} or {@code mSecondaryAsc} for the
 *            two halves of a secondary-sort key. Any non-null value marks a top-level field for null tracking.
 *            A single-entry array whose entry is false reverses the result. A longer array is applied per
 *            field when the datum is a tuple.
 * @return negative, zero or positive
 * @throws UnsupportedEncodingException if the first datum's marker has no raw-byte comparison here; the
 *         caller then falls back to deserializing the tuples
 * @throws IOException on other read failures
 */
private int compareBinInterSedesDatum(ByteBuffer bb1, ByteBuffer bb2, boolean[] asc) throws IOException {
int rc = 0;
byte type1, type2;
// consume both type markers
byte dt1 = bb1.get();
byte dt2 = bb2.get();
switch (dt1) {
case BinInterSedes.NULL: {
type1 = DataType.NULL;
type2 = getGeneralizedDataType(dt2);
if (asc != null) // we are scanning the top-level Tuple (original call)
mHasNullField = true;
if (type1 == type2)
rc = 0;
break;
}
case BinInterSedes.BOOLEAN_TRUE:
case BinInterSedes.BOOLEAN_FALSE: {
type1 = DataType.BOOLEAN;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
// false < true
int bv1 = (dt1 == BinInterSedes.BOOLEAN_TRUE) ? 1 : 0;
int bv2 = (dt2 == BinInterSedes.BOOLEAN_TRUE) ? 1 : 0;
rc = bv1 - bv2;
}
break;
}
case BinInterSedes.BYTE: {
type1 = DataType.BYTE;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
byte bv1 = bb1.get();
byte bv2 = bb2.get();
rc = (bv1 < bv2 ? -1 : (bv1 == bv2 ? 0 : 1));
}
break;
}
// all int encodings compare by value, whichever width each side was written with
case BinInterSedes.INTEGER_0:
case BinInterSedes.INTEGER_1:
case BinInterSedes.INTEGER_INBYTE:
case BinInterSedes.INTEGER_INSHORT:
case BinInterSedes.INTEGER: {
type1 = DataType.INTEGER;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
int iv1 = readInt(bb1, dt1);
int iv2 = readInt(bb2, dt2);
rc = (iv1 < iv2 ? -1 : (iv1 == iv2 ? 0 : 1));
}
break;
}
// all long encodings compare by value, whichever width each side was written with
case BinInterSedes.LONG_0:
case BinInterSedes.LONG_1:
case BinInterSedes.LONG_INBYTE:
case BinInterSedes.LONG_INSHORT:
case BinInterSedes.LONG_ININT:
case BinInterSedes.LONG: {
type1 = DataType.LONG;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
long lv1 = readLong(bb1, dt1);
long lv2 = readLong(bb2, dt2);
rc = (lv1 < lv2 ? -1 : (lv1 == lv2 ? 0 : 1));
}
break;
}
case BinInterSedes.DATETIME: {
type1 = DataType.DATETIME;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
// only the instant (epoch millis) is compared; the 2-byte zone offset is skipped
long lv1 = bb1.getLong();
bb1.position(bb1.position() + 2); // move cursor forward without read the timezone bytes
long lv2 = bb2.getLong();
bb2.position(bb2.position() + 2);
rc = (lv1 < lv2 ? -1 : (lv1 == lv2 ? 0 : 1));
}
break;
}
case BinInterSedes.FLOAT: {
type1 = DataType.FLOAT;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
float fv1 = bb1.getFloat();
float fv2 = bb2.getFloat();
rc = Float.compare(fv1, fv2);
}
break;
}
case BinInterSedes.DOUBLE: {
type1 = DataType.DOUBLE;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
double dv1 = bb1.getDouble();
double dv2 = bb2.getDouble();
rc = Double.compare(dv1, dv2);
}
break;
}
case BinInterSedes.BIGINTEGER: {
type1 = DataType.BIGINTEGER;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
// payload is a chararray datum: its own marker, its size, then the UTF-8 decimal text
int sz1 = readSize(bb1, bb1.get());
int sz2 = readSize(bb2, bb2.get());
byte[] ca1 = new byte[sz1];
byte[] ca2 = new byte[sz2];
bb1.get(ca1);
bb2.get(ca2);
String str1 = null, str2 = null;
try {
str1 = new String(ca1, BinInterSedes.UTF8);
str2 = new String(ca2, BinInterSedes.UTF8);
} catch (UnsupportedEncodingException uee) {
mLog.warn("Unsupported string encoding", uee);
uee.printStackTrace();
}
if (str1 != null && str2 != null) {
rc = new BigInteger(str1).compareTo(new BigInteger(str2));
}
}
break;
}
case BinInterSedes.BIGDECIMAL: {
type1 = DataType.BIGDECIMAL;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
// payload is a chararray datum: its own marker, its size, then the UTF-8 decimal text
int sz1 = readSize(bb1, bb1.get());
int sz2 = readSize(bb2, bb2.get());
byte[] ca1 = new byte[sz1];
byte[] ca2 = new byte[sz2];
bb1.get(ca1);
bb2.get(ca2);
String str1 = null, str2 = null;
try {
str1 = new String(ca1, BinInterSedes.UTF8);
str2 = new String(ca2, BinInterSedes.UTF8);
} catch (UnsupportedEncodingException uee) {
mLog.warn("Unsupported string encoding", uee);
uee.printStackTrace();
}
if (str1 != null && str2 != null) {
rc = new BigDecimal(str1).compareTo(new BigDecimal(str2));
}
}
break;
}
case BinInterSedes.TINYBYTEARRAY:
case BinInterSedes.SMALLBYTEARRAY:
case BinInterSedes.BYTEARRAY: {
type1 = DataType.BYTEARRAY;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
// lexicographic comparison directly on the backing arrays, then skip the payloads
int basz1 = readSize(bb1, dt1);
int basz2 = readSize(bb2, dt2);
rc = WritableComparator.compareBytes(
bb1.array(), bb1.position(), basz1,
bb2.array(), bb2.position(), basz2);
bb1.position(bb1.position() + basz1);
bb2.position(bb2.position() + basz2);
}
break;
}
case BinInterSedes.SMALLCHARARRAY:
case BinInterSedes.CHARARRAY: {
type1 = DataType.CHARARRAY;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
// decode as UTF-8 and compare as Strings; cursors advance even if decoding fails
int casz1 = readSize(bb1, dt1);
int casz2 = readSize(bb2, dt2);
String str1 = null, str2 = null;
try {
str1 = new String(bb1.array(), bb1.position(), casz1, BinInterSedes.UTF8);
str2 = new String(bb2.array(), bb2.position(), casz2, BinInterSedes.UTF8);
} catch (UnsupportedEncodingException uee) {
mLog.warn("Unsupported string encoding", uee);
uee.printStackTrace();
} finally {
bb1.position(bb1.position() + casz1);
bb2.position(bb2.position() + casz2);
}
if (str1 != null && str2 != null)
rc = str1.compareTo(str2);
}
break;
}
case BinInterSedes.TUPLE_0:
case BinInterSedes.TUPLE_1:
case BinInterSedes.TUPLE_2:
case BinInterSedes.TUPLE_3:
case BinInterSedes.TUPLE_4:
case BinInterSedes.TUPLE_5:
case BinInterSedes.TUPLE_6:
case BinInterSedes.TUPLE_7:
case BinInterSedes.TUPLE_8:
case BinInterSedes.TUPLE_9:
case BinInterSedes.TINYTUPLE:
case BinInterSedes.SMALLTUPLE:
case BinInterSedes.TUPLE: {
type1 = DataType.TUPLE;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2) {
// first compare sizes
int tsz1 = readSize(bb1, dt1);
int tsz2 = readSize(bb2, dt2);
// NOTE(review): these early returns skip the whole-key order flip at the end of this method; the
// object path (compareDatum) behaves the same way for tuples of different sizes.
if (tsz1 > tsz2)
return 1;
else if (tsz1 < tsz2)
return -1;
else {
// if sizes are the same, compare field by field. If we are doing secondary sort, use the sort
// order passed by the caller. Inner tuples never have sort order (so we pass null).
for (int i = 0; i < tsz1 && rc == 0; i++) {
rc = compareBinInterSedesDatum(bb1, bb2, null);
if (rc != 0 && asc != null && asc.length > 1 && !asc[i])
rc *= -1;
}
}
}
break;
}
// bags, maps and writables are decoded into objects by helpers that also advance the cursors
case BinInterSedes.TINYBAG:
case BinInterSedes.SMALLBAG:
case BinInterSedes.BAG: {
type1 = DataType.BAG;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2)
rc = compareBinInterSedesBag(bb1, bb2, dt1, dt2);
break;
}
case BinInterSedes.TINYMAP:
case BinInterSedes.SMALLMAP:
case BinInterSedes.MAP: {
type1 = DataType.MAP;
type2 = getGeneralizedDataType(dt2);
if (type1 == type2)
rc = compareBinInterSedesMap(bb1, bb2, dt1, dt2);
break;
}
case BinInterSedes.GENERIC_WRITABLECOMPARABLE: {
type1 = DataType.GENERIC_WRITABLECOMPARABLE;
type2 = getGeneralizedDataType(dt2);
// the helper receives buffers positioned just after the GENERIC_WRITABLECOMPARABLE marker
if (type1 == type2)
rc = compareBinInterSedesGenericWritableComparable(bb1, bb2);
break;
}
default: {
// e.g. schema tuples and internal maps: signal the caller to compare deserialized objects instead
mLog.info("Unsupported DataType for binary comparison, switching to object deserialization: "
+ DataType.genTypeToNameMap().get(dt1) + "(" + dt1 + ")");
throw new UnsupportedEncodingException();
}
}
// compare generalized data types
if (type1 != type2)
rc = (type1 < type2) ? -1 : 1;
// apply sort order for keys that are not tuples or for whole tuples
if (asc != null && asc.length == 1 && !asc[0])
rc *= -1;
return rc;
}
/**
 * Compares two deserialized tuples with the configured sort order. This is the object-based counterpart
 * of {@code compareBinSedesTuple}.
 * <p>
 * Tuples with more fields sort after tuples with fewer fields. In secondary-sort mode the two fields
 * (main_key, secondary_key) are compared with their own sort orders. Otherwise {@code compareDatum}
 * applies {@code mAsc} to the whole tuple.
 *
 * @throws RuntimeException if a field cannot be read
 */
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable o1, WritableComparable o2) {
    Tuple t1 = (Tuple) o1;
    Tuple t2 = (Tuple) o2;
    mHasNullField = false;
    int tsz1 = t1.size();
    int tsz2 = t2.size();
    if (tsz1 != tsz2) {
        return tsz1 > tsz2 ? 1 : -1;
    }
    try {
        if (!mIsSecondarySort) {
            // single tuple key: compareDatum handles both per-column and whole-tuple ordering
            return compareDatum(t1, t2, mAsc);
        }
        assert (tsz1 == 2); // main_key, secondary_key
        int mainResult = compareDatum(t1.get(0), t2.get(0), mAsc);
        return mainResult != 0 ? mainResult : compareDatum(t1.get(1), t2.get(1), mSecondaryAsc);
    } catch (ExecException e) {
        throw new RuntimeException("Unable to compare tuples", e);
    }
}
/**
 * Compares two deserialized key values, applying {@code asc}.
 * <p>
 * When both values are tuples they are compared field by field with {@code DataType.compare}, after their
 * sizes. A multi-entry {@code asc} flips the result of field i when {@code asc[i]} is false, and a null field
 * in the first tuple sets {@code mHasNullField}. Non-tuple values are delegated to {@code DataType.compare}.
 * A single-entry {@code asc} whose entry is false reverses the final result, except when the tuple sizes
 * differ.
 */
private int compareDatum(Object o1, Object o2, boolean[] asc) {
    int rc = 0;
    if (o1 instanceof Tuple && o2 instanceof Tuple) {
        Tuple t1 = (Tuple) o1;
        Tuple t2 = (Tuple) o2;
        int sz1 = t1.size();
        int sz2 = t2.size();
        if (sz1 != sz2) {
            return sz1 > sz2 ? 1 : -1;
        }
        boolean perColumnOrder = asc != null && asc.length > 1;
        for (int col = 0; col < sz1 && rc == 0; col++) {
            try {
                Object left = t1.get(col);
                rc = DataType.compare(left, t2.get(col));
                if (rc != 0 && perColumnOrder && !asc[col]) {
                    rc *= -1;
                }
                if (left == null) {
                    // (PIG-927) record if the tuple has a null field
                    mHasNullField = true;
                }
            } catch (ExecException e) {
                throw new RuntimeException("Unable to compare tuples", e);
            }
        }
    } else {
        // not both tuples: plain DataType comparison
        rc = DataType.compare(o1, o2);
    }
    // apply sort order for keys that are not tuples or for whole tuples
    if (asc != null && asc.length == 1 && !asc[0]) {
        rc *= -1;
    }
    return rc;
}
/**
 * Compares two GENERIC_WRITABLECOMPARABLE values by deserializing them, then advances both buffers past
 * the values.
 * <p>
 * The buffers must be positioned just after the GENERIC_WRITABLECOMPARABLE marker, which
 * {@code compareBinInterSedesDatum} has already consumed. The remaining payload is the class-name
 * chararray followed by the object's own {@code write()} output.
 *
 * @param bb1 first buffer, positioned just after the marker
 * @param bb2 second buffer, positioned just after the marker
 * @return the result of the first object's {@code compareTo} applied to the second
 * @throws IOException if either value cannot be deserialized
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private int compareBinInterSedesGenericWritableComparable(ByteBuffer bb1, ByteBuffer bb2) throws ExecException,
        IOException {
    DataInputBuffer buffer1 = new DataInputBuffer();
    DataInputBuffer buffer2 = new DataInputBuffer();
    buffer1.reset(bb1.array(), bb1.position(), bb1.remaining());
    buffer2.reset(bb2.array(), bb2.position(), bb2.remaining());
    // The marker has already been consumed, so pass it explicitly. readDatum(DataInput) would read another
    // marker here, decode only the class-name chararray, compare class names instead of values, and leave
    // the object's bytes unconsumed for the next field comparison.
    Comparable writable1 = (Comparable) mSedes.readDatum(buffer1, BinInterSedes.GENERIC_WRITABLECOMPARABLE);
    Comparable writable2 = (Comparable) mSedes.readDatum(buffer2, BinInterSedes.GENERIC_WRITABLECOMPARABLE);
    // DataInputBuffer positions are absolute offsets into the shared backing array
    bb1.position(buffer1.getPosition());
    bb2.position(buffer2.getPosition());
    return writable1.compareTo(writable2);
}
/**
 * Compares two serialized bags. Sizes are compared first; bags of equal size are deserialized and compared
 * with {@code DataBag.compareTo}. Both buffers end up positioned after their bag.
 *
 * @param bb1 first buffer, positioned just after the bag marker {@code dt1}
 * @param bb2 second buffer, positioned just after the bag marker {@code dt2}
 */
@SuppressWarnings("unchecked")
private int compareBinInterSedesBag(ByteBuffer bb1, ByteBuffer bb2, byte dt1, byte dt2) throws IOException {
    // remember where each payload (starting at its size field) begins so it can be re-read in full
    final int start1 = bb1.position();
    final int start2 = bb2.position();
    final int avail1 = bb1.remaining();
    final int avail2 = bb2.remaining();
    final int count1 = readSize(bb1, dt1);
    final int count2 = readSize(bb2, dt2);
    if (count1 != count2) {
        return count1 > count2 ? 1 : -1;
    }
    DataInputBuffer in1 = new DataInputBuffer();
    in1.reset(bb1.array(), start1, avail1);
    DataInputBuffer in2 = new DataInputBuffer();
    in2.reset(bb2.array(), start2, avail2);
    DataBag left = (DataBag) mSedes.readDatum(in1, dt1);
    DataBag right = (DataBag) mSedes.readDatum(in2, dt2);
    bb1.position(in1.getPosition());
    bb2.position(in2.getPosition());
    return left.compareTo(right);
}
/**
 * Compares two serialized maps. Entry counts are compared first; maps of equal size are deserialized and
 * compared with {@code DataType.compare}. Both buffers end up positioned after their map.
 *
 * @param bb1 first buffer, positioned just after the map marker {@code dt1}
 * @param bb2 second buffer, positioned just after the map marker {@code dt2}
 */
@SuppressWarnings("unchecked")
private int compareBinInterSedesMap(ByteBuffer bb1, ByteBuffer bb2, byte dt1, byte dt2) throws ExecException,
        IOException {
    // remember where each payload (starting at its size field) begins so it can be re-read in full
    final int start1 = bb1.position();
    final int start2 = bb2.position();
    final int avail1 = bb1.remaining();
    final int avail2 = bb2.remaining();
    final int count1 = readSize(bb1, dt1);
    final int count2 = readSize(bb2, dt2);
    if (count1 != count2) {
        return count1 > count2 ? 1 : -1;
    }
    DataInputBuffer in1 = new DataInputBuffer();
    in1.reset(bb1.array(), start1, avail1);
    DataInputBuffer in2 = new DataInputBuffer();
    in2.reset(bb2.array(), start2, avail2);
    Map<String, Object> left = (Map<String, Object>) mSedes.readDatum(in1, dt1);
    Map<String, Object> right = (Map<String, Object>) mSedes.readDatum(in2, dt2);
    bb1.position(in1.getPosition());
    bb2.position(in2.getPosition());
    return DataType.compare(left, right, DataType.MAP, DataType.MAP);
}
/**
 * Collapses a BinInterSedes wire type code into the corresponding DataType constant.
 * All compact encodings of one logical type (for example INTEGER_0, INTEGER_INBYTE and INTEGER)
 * map to the same DataType value.
 *
 * @param type BinInterSedes type code read from the stream
 * @return the matching DataType constant
 * @throws RuntimeException if {@code type} is not a code handled here
 */
private static byte getGeneralizedDataType(byte type) {
    final byte generalized;
    switch (type) {
    case BinInterSedes.NULL:
        generalized = DataType.NULL;
        break;
    case BinInterSedes.BOOLEAN_TRUE:
    case BinInterSedes.BOOLEAN_FALSE:
        generalized = DataType.BOOLEAN;
        break;
    case BinInterSedes.BYTE:
        generalized = DataType.BYTE;
        break;
    case BinInterSedes.INTEGER_0:
    case BinInterSedes.INTEGER_1:
    case BinInterSedes.INTEGER_INBYTE:
    case BinInterSedes.INTEGER_INSHORT:
    case BinInterSedes.INTEGER:
        generalized = DataType.INTEGER;
        break;
    case BinInterSedes.LONG_0:
    case BinInterSedes.LONG_1:
    case BinInterSedes.LONG_INBYTE:
    case BinInterSedes.LONG_INSHORT:
    case BinInterSedes.LONG_ININT:
    case BinInterSedes.LONG:
        generalized = DataType.LONG;
        break;
    case BinInterSedes.DATETIME:
        generalized = DataType.DATETIME;
        break;
    case BinInterSedes.FLOAT:
        generalized = DataType.FLOAT;
        break;
    case BinInterSedes.DOUBLE:
        generalized = DataType.DOUBLE;
        break;
    case BinInterSedes.BIGINTEGER:
        generalized = DataType.BIGINTEGER;
        break;
    case BinInterSedes.BIGDECIMAL:
        generalized = DataType.BIGDECIMAL;
        break;
    case BinInterSedes.TINYBYTEARRAY:
    case BinInterSedes.SMALLBYTEARRAY:
    case BinInterSedes.BYTEARRAY:
        generalized = DataType.BYTEARRAY;
        break;
    case BinInterSedes.SMALLCHARARRAY:
    case BinInterSedes.CHARARRAY:
        generalized = DataType.CHARARRAY;
        break;
    case BinInterSedes.TUPLE_0:
    case BinInterSedes.TUPLE_1:
    case BinInterSedes.TUPLE_2:
    case BinInterSedes.TUPLE_3:
    case BinInterSedes.TUPLE_4:
    case BinInterSedes.TUPLE_5:
    case BinInterSedes.TUPLE_6:
    case BinInterSedes.TUPLE_7:
    case BinInterSedes.TUPLE_8:
    case BinInterSedes.TUPLE_9:
    case BinInterSedes.TUPLE:
    case BinInterSedes.TINYTUPLE:
    case BinInterSedes.SMALLTUPLE:
        generalized = DataType.TUPLE;
        break;
    case BinInterSedes.BAG:
    case BinInterSedes.TINYBAG:
    case BinInterSedes.SMALLBAG:
        generalized = DataType.BAG;
        break;
    case BinInterSedes.MAP:
    case BinInterSedes.TINYMAP:
    case BinInterSedes.SMALLMAP:
        generalized = DataType.MAP;
        break;
    case BinInterSedes.INTERNALMAP:
        generalized = DataType.INTERNALMAP;
        break;
    case BinInterSedes.GENERIC_WRITABLECOMPARABLE:
        generalized = DataType.GENERIC_WRITABLECOMPARABLE;
        break;
    default:
        throw new RuntimeException("Unexpected data type " + type + " found in stream.");
    }
    return generalized;
}
/**
 * Decodes a long whose BinInterSedes type code has already been consumed from {@code bb}.
 * LONG_0 and LONG_1 carry no payload. The other encodings read a byte, short, int or long
 * payload and widen it to {@code long}, so the narrow encodings are sign-extended.
 *
 * @param bb buffer positioned at the payload for {@code type}; it is advanced past that payload
 * @param type one of the BinInterSedes LONG* type codes
 * @return the decoded value
 * @throws RuntimeException if {@code type} is not a long encoding
 */
private static long readLong(ByteBuffer bb, byte type) {
    switch (type) {
    case BinInterSedes.LONG_0:
        return 0L;
    case BinInterSedes.LONG_1:
        return 1L;
    case BinInterSedes.LONG_INBYTE:
        return bb.get();
    case BinInterSedes.LONG_INSHORT:
        return bb.getShort();
    case BinInterSedes.LONG_ININT:
        return bb.getInt();
    case BinInterSedes.LONG:
        return bb.getLong();
    default:
        throw new RuntimeException("Unexpected data type " + type + " found in stream.");
    }
}
/**
 * Decodes an int whose BinInterSedes type code has already been consumed from {@code bb}.
 * INTEGER_0 and INTEGER_1 carry no payload. The byte and short encodings are widened with
 * sign extension.
 *
 * @param bb buffer positioned at the payload for {@code type}; it is advanced past that payload
 * @param type one of the BinInterSedes INTEGER* type codes
 * @return the decoded value
 * @throws RuntimeException if {@code type} is not an integer encoding
 */
private static int readInt(ByteBuffer bb, byte type) {
    final int value;
    switch (type) {
    case BinInterSedes.INTEGER:
        value = bb.getInt();
        break;
    case BinInterSedes.INTEGER_INSHORT:
        value = bb.getShort();
        break;
    case BinInterSedes.INTEGER_INBYTE:
        value = bb.get();
        break;
    case BinInterSedes.INTEGER_1:
        value = 1;
        break;
    case BinInterSedes.INTEGER_0:
        value = 0;
        break;
    default:
        throw new RuntimeException("Unexpected data type " + type + " found in stream.");
    }
    return value;
}
/**
 * Reads a type byte from {@code bb} and then decodes the size that goes with it.
 *
 * @param bb ByteBuffer having serialized object, including the type information; it is advanced
 *        past the type byte and any size field that follows it
 * @return the size encoded for the type read from the buffer
 * @throws RuntimeException if the type byte is not one that carries a size
 */
private static int readSize(ByteBuffer bb) {
    final byte type = bb.get();
    return readSize(bb, type);
}
/**
 * Decodes the size for a sized BinInterSedes type whose type code has already been read.
 * TINY* codes use an unsigned byte and SMALL* codes an unsigned short. The full-width codes use a
 * signed int. TUPLE_0 through TUPLE_9 encode the size in the type code itself and read nothing.
 *
 * @param bb ByteBuffer having serialized object, minus the type information
 * @param type serialized type information
 * @return the size of this type
 * @throws RuntimeException if {@code type} does not carry a size
 */
private static int readSize(ByteBuffer bb, byte type) {
    final int size;
    switch (type) {
    case BinInterSedes.TUPLE_0:
        size = 0;
        break;
    case BinInterSedes.TUPLE_1:
        size = 1;
        break;
    case BinInterSedes.TUPLE_2:
        size = 2;
        break;
    case BinInterSedes.TUPLE_3:
        size = 3;
        break;
    case BinInterSedes.TUPLE_4:
        size = 4;
        break;
    case BinInterSedes.TUPLE_5:
        size = 5;
        break;
    case BinInterSedes.TUPLE_6:
        size = 6;
        break;
    case BinInterSedes.TUPLE_7:
        size = 7;
        break;
    case BinInterSedes.TUPLE_8:
        size = 8;
        break;
    case BinInterSedes.TUPLE_9:
        size = 9;
        break;
    case BinInterSedes.TINYBYTEARRAY:
    case BinInterSedes.TINYTUPLE:
    case BinInterSedes.TINYBAG:
    case BinInterSedes.TINYMAP:
        size = getUnsignedByte(bb);
        break;
    case BinInterSedes.SMALLBYTEARRAY:
    case BinInterSedes.SMALLCHARARRAY:
    case BinInterSedes.SMALLTUPLE:
    case BinInterSedes.SMALLBAG:
    case BinInterSedes.SMALLMAP:
        size = getUnsignedShort(bb);
        break;
    case BinInterSedes.BYTEARRAY:
    case BinInterSedes.CHARARRAY:
    case BinInterSedes.TUPLE:
    case BinInterSedes.BAG:
    case BinInterSedes.MAP:
        size = bb.getInt();
        break;
    default:
        throw new RuntimeException("Unexpected data type " + type + " found in stream.");
    }
    return size;
}
/**
 * Reads two bytes, high byte first, as an unsigned 16-bit value in the range 0-65535.
 * This matches the layout DataInput/DataOutput use for unsigned shorts and does not depend on the
 * buffer's configured byte order.
 *
 * @param bb buffer to read from; advanced by two bytes
 * @return the unsigned value
 */
private static int getUnsignedShort(ByteBuffer bb) {
    final int high = bb.get() & 0xFF;
    final int low = bb.get() & 0xFF;
    return (high << 8) | low;
}
/**
 * Reads one byte as an unsigned value in the range 0-255, matching DataInput's unsigned-byte format.
 *
 * @param bb buffer to read from; advanced by one byte
 * @return the unsigned value
 */
private static int getUnsignedByte(ByteBuffer bb) {
    final byte raw = bb.get();
    return raw & 0xFF;
}
}
/**
 * Returns the {@link TupleRawComparator} implementation that goes with this sedes.
 *
 * @return the class object of {@code BinInterSedesTupleRawComparator}
 */
@Override
public Class<? extends TupleRawComparator> getTupleRawComparatorClass() {
    final Class<? extends TupleRawComparator> comparatorClass = BinInterSedesTupleRawComparator.class;
    return comparatorClass;
}
/**
 * Reads a tuple, starting with its type byte, from {@code in}.
 * The type byte is consumed first and then handed to the two-argument overload.
 *
 * @param in input positioned at the tuple's type byte
 * @return the deserialized tuple
 * @throws IOException if reading from {@code in} fails
 */
public Tuple readTuple(DataInput in) throws IOException {
    final byte tupleType = in.readByte();
    return readTuple(in, tupleType);
}
/**
 * Reports whether {@code b} is a BinInterSedes type code for a tuple. That covers the
 * TUPLE/SMALLTUPLE/TINYTUPLE encodings, the fixed-arity TUPLE_0 through TUPLE_9 codes, and the
 * SCHEMA_TUPLE variants.
 *
 * @param b type byte to test
 * @return {@code true} if {@code b} introduces a tuple, {@code false} otherwise
 */
public static boolean isTupleByte(byte b) {
    switch (b) {
    case BinInterSedes.TUPLE_0:
    case BinInterSedes.TUPLE_1:
    case BinInterSedes.TUPLE_2:
    case BinInterSedes.TUPLE_3:
    case BinInterSedes.TUPLE_4:
    case BinInterSedes.TUPLE_5:
    case BinInterSedes.TUPLE_6:
    case BinInterSedes.TUPLE_7:
    case BinInterSedes.TUPLE_8:
    case BinInterSedes.TUPLE_9:
    case BinInterSedes.TINYTUPLE:
    case BinInterSedes.SMALLTUPLE:
    case BinInterSedes.TUPLE:
    case BinInterSedes.SCHEMA_TUPLE:
    case BinInterSedes.SCHEMA_TUPLE_BYTE_INDEX:
    case BinInterSedes.SCHEMA_TUPLE_SHORT_INDEX:
        return true;
    default:
        return false;
    }
}
}