/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.data;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.utils.SedesHelper;
import org.apache.pig.impl.util.ObjectSerializer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

/**
 * A class to handle reading and writing of intermediate results of data types. The serialization format used by this
 * class more efficient than what was used in DataReaderWriter . The format used by the functions in this class is
 * subject to change, so it should be used ONLY to store intermediate results within a pig query.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public class BinInterSedes implements InterSedes {

    private static final int ONE_MINUTE = 60000;

    public static final byte BOOLEAN_TRUE = 0;
    public static final byte BOOLEAN_FALSE = 1;

    public static final byte BYTE = 2;

    public static final byte INTEGER = 3;
    // since boolean is not supported yet(v0.7) as external type, lot of people use int instead and some data with old
    // schema is likely stay for some time. so optimizing for that case as well
    public static final byte INTEGER_0 = 4;
    public static final byte INTEGER_1 = 5;
    public static final byte INTEGER_INSHORT = 6;
    public static final byte INTEGER_INBYTE = 7;

    public static final byte LONG = 8;
    public static final byte FLOAT = 9;
    public static final byte DOUBLE = 10;

    public static final byte BYTEARRAY = 11;
    public static final byte SMALLBYTEARRAY = 12;
    public static final byte TINYBYTEARRAY = 13;

    public static final byte CHARARRAY = 14;
    public static final byte SMALLCHARARRAY = 15;

    public static final byte MAP = 16;
    public static final byte SMALLMAP = 17;
    public static final byte TINYMAP = 18;

    public static final byte TUPLE = 19;
    public static final byte SMALLTUPLE = 20;
    public static final byte TINYTUPLE = 21;

    public static final byte BAG = 22;
    public static final byte SMALLBAG = 23;
    public static final byte TINYBAG = 24;

    public static final byte GENERIC_WRITABLECOMPARABLE = 25;
    public static final byte INTERNALMAP = 26;

    public static final byte NULL = 27;

    public static final byte SCHEMA_TUPLE_BYTE_INDEX = 28;
    public static final byte SCHEMA_TUPLE_SHORT_INDEX = 29;
    public static final byte SCHEMA_TUPLE = 30;

    public static final byte LONG_INBYTE = 31;
    public static final byte LONG_INSHORT = 32;
    public static final byte LONG_ININT = 33;
    public static final byte LONG_0 = 34;
    public static final byte LONG_1 = 35;

    public static final byte TUPLE_0 = 36;
    public static final byte TUPLE_1 = 37;
    public static final byte TUPLE_2 = 38;
    public static final byte TUPLE_3 = 39;
    public static final byte TUPLE_4 = 40;
    public static final byte TUPLE_5 = 41;
    public static final byte TUPLE_6 = 42;
    public static final byte TUPLE_7 = 43;
    public static final byte TUPLE_8 = 44;
    public static final byte TUPLE_9 = 45;

    public static final byte BIGINTEGER = 46;
    public static final byte BIGDECIMAL = 47;

    public static final byte DATETIME = 48;

    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
    private static BagFactory mBagFactory = BagFactory.getInstance();
    public static final int UNSIGNED_SHORT_MAX = 65535;
    public static final int UNSIGNED_BYTE_MAX = 255;
    public static final String UTF8 = "UTF-8";

    public Tuple readTuple(DataInput in, byte type) throws IOException {
        switch (type) {
        case TUPLE_0:
        case TUPLE_1:
        case TUPLE_2:
        case TUPLE_3:
        case TUPLE_4:
        case TUPLE_5:
        case TUPLE_6:
        case TUPLE_7:
        case TUPLE_8:
        case TUPLE_9:
        case TUPLE:
        case TINYTUPLE:
        case SMALLTUPLE:
            return SedesHelper.readGenericTuple(in, type);
        case SCHEMA_TUPLE_BYTE_INDEX:
        case SCHEMA_TUPLE_SHORT_INDEX:
        case SCHEMA_TUPLE:
            return readSchemaTuple(in, type);
        default:
            throw new ExecException("Unknown Tuple type found in stream: " + type);
        }
        }

    private Tuple readSchemaTuple(DataInput in, byte type) throws IOException {
        int id;
        switch (type) {
        case (SCHEMA_TUPLE_BYTE_INDEX): id = in.readUnsignedByte(); break;
        case (SCHEMA_TUPLE_SHORT_INDEX): id = in.readUnsignedShort(); break;
        case (SCHEMA_TUPLE): id = in.readInt(); break;
        default: throw new RuntimeException("Invalid type given to readSchemaTuple: " + type);
    }

        Tuple st = SchemaTupleFactory.getInstance(id).newTuple();
        st.readFields(in);

        return st;
    }

    public int getTupleSize(DataInput in, byte type) throws IOException {
        int sz;
        switch (type) {
        case TUPLE_0:
            return 0;
        case TUPLE_1:
            return 1;
        case TUPLE_2:
            return 2;
        case TUPLE_3:
            return 3;
        case TUPLE_4:
            return 4;
        case TUPLE_5:
            return 5;
        case TUPLE_6:
            return 6;
        case TUPLE_7:
            return 7;
        case TUPLE_8:
            return 8;
        case TUPLE_9:
            return 9;
        case TINYTUPLE:
            sz = in.readUnsignedByte();
            break;
        case SMALLTUPLE:
            sz = in.readUnsignedShort();
            break;
        case TUPLE:
            sz = in.readInt();
            break;
        default: {
            int errCode = 2112;
            String msg = "Unexpected datatype " + type + " while reading tuple" + "from binary file.";
            throw new ExecException(msg, errCode, PigException.BUG);
        }
        }
        // if sz == 0, we construct an "empty" tuple - presumably the writer wrote an empty tuple!
        if (sz < 0) {
            throw new IOException("Invalid size " + sz + " for a tuple");
        }
        return sz;
    }

    private DataBag readBag(DataInput in, byte type) throws IOException {
        DataBag bag = mBagFactory.newDefaultBag();
        long size;
        // determine size of bag
        switch (type) {
        case TINYBAG:
            size = in.readUnsignedByte();
            break;
        case SMALLBAG:
            size = in.readUnsignedShort();
            break;
        case BAG:
            size = in.readLong();
            break;
        default:
            int errCode = 2219;
            String msg = "Unexpected data while reading bag " + "from binary file.";
            throw new ExecException(msg, errCode, PigException.BUG);
        }

        for (long i = 0; i < size; i++) {
            try {
                Object o = readDatum(in);
                bag.add((Tuple) o);
            } catch (ExecException ee) {
                throw ee;
            }
        }
        return bag;
    }

    private Map<String, Object> readMap(DataInput in, byte type) throws IOException {
        int size;
        switch (type) {
        case TINYMAP:
            size = in.readUnsignedByte();
            break;
        case SMALLMAP:
            size = in.readUnsignedShort();
            break;
        case MAP:
            size = in.readInt();
            break;
        default: {
            int errCode = 2220;
            String msg = "Unexpected data while reading map" + "from binary file.";
            throw new ExecException(msg, errCode, PigException.BUG);
        }
        }
        Map<String, Object> m = new HashMap<String, Object>(size);
        for (int i = 0; i < size; i++) {
            String key = (String) readDatum(in);
            m.put(key, readDatum(in));
        }
        return m;
    }

    private InternalMap readInternalMap(DataInput in) throws IOException {
        int size = in.readInt();
        InternalMap m = new InternalMap(size);
        for (int i = 0; i < size; i++) {
            Object key = readDatum(in);
            m.put(key, readDatum(in));
        }
        return m;
    }

    private WritableComparable readWritable(DataInput in) throws IOException {
        String className = (String) readDatum(in);
        // create the writeable class . It needs to have a default constructor
        Class<?> objClass = null;
        try {
            objClass = Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IOException("Could not find class " + className + ", while attempting to de-serialize it ", e);
        }
        WritableComparable writable = null;
        try {
            writable = (WritableComparable) objClass.newInstance();
        } catch (Exception e) {
            String msg = "Could create instance of class " + className
                    + ", while attempting to de-serialize it. (no default constructor ?)";
            throw new IOException(msg, e);
        }

        // read the fields of the object from DataInput
        writable.readFields(in);
        return writable;
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.pig.data.InterSedes#readDatum(java.io.DataInput)
     */
    @Override
    public Object readDatum(DataInput in) throws IOException, ExecException {
        // Read the data type
        byte b = in.readByte();
        return readDatum(in, b);
    }

    private static Object readBytes(DataInput in, int size) throws IOException {
        byte[] ba = new byte[size];
        in.readFully(ba);
        return new DataByteArray(ba);
    }

    /**
     * Expects binInterSedes data types (NOT DataType types!)
     * <p>
     *
     * @see org.apache.pig.data.InterSedes#readDatum(java.io.DataInput, byte)
     */
    @Override
    public Object readDatum(DataInput in, byte type) throws IOException, ExecException {
        switch (type) {
        case TUPLE_0:
        case TUPLE_1:
        case TUPLE_2:
        case TUPLE_3:
        case TUPLE_4:
        case TUPLE_5:
        case TUPLE_6:
        case TUPLE_7:
        case TUPLE_8:
        case TUPLE_9:
        case TUPLE:
        case TINYTUPLE:
        case SMALLTUPLE:
            return SedesHelper.readGenericTuple(in, type);

        case BAG:
        case TINYBAG:
        case SMALLBAG:
            return readBag(in, type);

        case MAP:
        case TINYMAP:
        case SMALLMAP:
            return readMap(in, type);

        case INTERNALMAP:
            return readInternalMap(in);

        case INTEGER_0:
            return Integer.valueOf(0);
        case INTEGER_1:
            return Integer.valueOf(1);
        case INTEGER_INBYTE:
            return Integer.valueOf(in.readByte());
        case INTEGER_INSHORT:
            return Integer.valueOf(in.readShort());
        case INTEGER:
            return Integer.valueOf(in.readInt());

        case LONG_0:
            return Long.valueOf(0);
        case LONG_1:
            return Long.valueOf(1);
        case LONG_INBYTE:
            return Long.valueOf(in.readByte());
        case LONG_INSHORT:
            return Long.valueOf(in.readShort());
        case LONG_ININT:
            return Long.valueOf(in.readInt());
        case LONG:
            return Long.valueOf(in.readLong());

        case DATETIME:
            return new DateTime(in.readLong(), DateTimeZone.forOffsetMillis(in.readShort() * ONE_MINUTE));

        case FLOAT:
            return Float.valueOf(in.readFloat());

        case DOUBLE:
            return Double.valueOf(in.readDouble());

        case BIGINTEGER:
            return readBigInteger(in);

        case BIGDECIMAL:
            return readBigDecimal(in);

        case BOOLEAN_TRUE:
            return Boolean.valueOf(true);

        case BOOLEAN_FALSE:
            return Boolean.valueOf(false);

        case BYTE:
            return Byte.valueOf(in.readByte());

        case TINYBYTEARRAY:
        case SMALLBYTEARRAY:
        case BYTEARRAY:
            return new DataByteArray(SedesHelper.readBytes(in, type));

        case CHARARRAY:
        case SMALLCHARARRAY:
            return SedesHelper.readChararray(in, type);

        case GENERIC_WRITABLECOMPARABLE:
            return readWritable(in);

        case SCHEMA_TUPLE_BYTE_INDEX:
        case SCHEMA_TUPLE_SHORT_INDEX:
        case SCHEMA_TUPLE:
            return readSchemaTuple(in, type);

        case NULL:
            return null;

        default:
            throw new RuntimeException("Unexpected data type " + type + " found in stream.");
        }
    }

    private Object readBigDecimal(DataInput in) throws IOException {
        return  new BigDecimal((String)readDatum(in));
    }

    private Object readBigInteger(DataInput in) throws IOException {
        return new BigInteger((String)readDatum(in));
    }

    private void writeBigInteger(DataOutput out, BigInteger bi) throws IOException {
        writeDatum(out, bi.toString());
    }

    private void writeBigDecimal(DataOutput out, BigDecimal bd) throws IOException {
        writeDatum(out, bd.toString());
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.pig.data.InterSedes#writeDatum(java.io.DataOutput, java.lang.Object)
     */
    @Override
    public void writeDatum(DataOutput out, Object val) throws IOException {
        // Read the data type
        byte type = DataType.findType(val);
        writeDatum(out, val, type);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void writeDatum(DataOutput out, Object val, byte type) throws IOException {
        switch (type) {
        case DataType.TUPLE:
            writeTuple(out, (Tuple) val);
            break;

        case DataType.BAG:
            writeBag(out, (DataBag) val);
            break;

        case DataType.MAP: {
            writeMap(out, (Map<String, Object>) val);
            break;
        }

        case DataType.INTERNALMAP: {
            out.writeByte(INTERNALMAP);
            Map<Object, Object> m = (Map<Object, Object>) val;
            out.writeInt(m.size());
            Iterator<Map.Entry<Object, Object>> i = m.entrySet().iterator();
            while (i.hasNext()) {
                Map.Entry<Object, Object> entry = i.next();
                writeDatum(out, entry.getKey());
                writeDatum(out, entry.getValue());
            }
            break;
        }

        case DataType.INTEGER:
            int i = (Integer) val;
            if (i == 0) {
                out.writeByte(INTEGER_0);
            } else if (i == 1) {
                out.writeByte(INTEGER_1);
            } else if (Byte.MIN_VALUE <= i && i <= Byte.MAX_VALUE) {
                out.writeByte(INTEGER_INBYTE);
                out.writeByte(i);
            } else if (Short.MIN_VALUE <= i && i <= Short.MAX_VALUE) {
                out.writeByte(INTEGER_INSHORT);
                out.writeShort(i);
            } else {
                out.writeByte(INTEGER);
                out.writeInt(i);
            }
            break;

        case DataType.LONG:
            long lng = (Long) val;
            if (lng == 0) {
                out.writeByte(LONG_0);
            } else if (lng == 1) {
                out.writeByte(LONG_1);
            } else if (Byte.MIN_VALUE <= lng && lng <= Byte.MAX_VALUE) {
                out.writeByte(LONG_INBYTE);
                out.writeByte((int)lng);
            } else if (Short.MIN_VALUE <= lng && lng <= Short.MAX_VALUE) {
                out.writeByte(LONG_INSHORT);
                out.writeShort((int)lng);
            } else if (Integer.MIN_VALUE <= lng && lng <= Integer.MAX_VALUE) {
                out.writeByte(LONG_ININT);
                out.writeInt((int)lng);
            } else {
            out.writeByte(LONG);
                out.writeLong(lng);
            }
            break;

        case DataType.DATETIME:
            out.writeByte(DATETIME);
            out.writeLong(((DateTime) val).getMillis());
            out.writeShort(((DateTime) val).getZone().getOffset((DateTime) val) / ONE_MINUTE);
            break;

        case DataType.FLOAT:
            out.writeByte(FLOAT);
            out.writeFloat((Float) val);
            break;

        case DataType.BIGINTEGER:
            out.writeByte(BIGINTEGER);
            writeBigInteger(out, (BigInteger)val);
            break;

        case DataType.BIGDECIMAL:
            out.writeByte(BIGDECIMAL);
            writeBigDecimal(out, (BigDecimal)val);
            break;

        case DataType.DOUBLE:
            out.writeByte(DOUBLE);
            out.writeDouble((Double) val);
            break;

        case DataType.BOOLEAN:
            if ((Boolean) val)
                out.writeByte(BOOLEAN_TRUE);
            else
                out.writeByte(BOOLEAN_FALSE);
            break;

        case DataType.BYTE:
            out.writeByte(BYTE);
            out.writeByte((Byte) val);
            break;

        case DataType.BYTEARRAY: {
            DataByteArray bytes = (DataByteArray) val;
            SedesHelper.writeBytes(out, bytes.mData);
            break;

        }

        case DataType.CHARARRAY: {
            SedesHelper.writeChararray(out, (String) val);
            break;
        }
        case DataType.GENERIC_WRITABLECOMPARABLE:
            out.writeByte(GENERIC_WRITABLECOMPARABLE);
            // store the class name, so we know the class to create on read
            writeDatum(out, val.getClass().getName());
            Writable writable = (Writable) val;
            writable.write(out);
            break;

        case DataType.NULL:
            out.writeByte(NULL);
            break;

        default:
            throw new RuntimeException("Unexpected data type " + val.getClass().getName() + " found in stream. " +
                    "Note only standard Pig type is supported when you output from UDF/LoadFunc");
        }
    }

    private void writeMap(DataOutput out, Map<String, Object> m) throws IOException {

        final int sz = m.size();
        if (sz < UNSIGNED_BYTE_MAX) {
            out.writeByte(TINYMAP);
            out.writeByte(sz);
        } else if (sz < UNSIGNED_SHORT_MAX) {
            out.writeByte(SMALLMAP);
            out.writeShort(sz);
        } else {
            out.writeByte(MAP);
            out.writeInt(sz);
        }
        Iterator<Map.Entry<String, Object>> i = m.entrySet().iterator();
        while (i.hasNext()) {
            Map.Entry<String, Object> entry = i.next();
            writeDatum(out, entry.getKey());
            writeDatum(out, entry.getValue());
        }
    }

    private void writeBag(DataOutput out, DataBag bag) throws IOException {
        // We don't care whether this bag was sorted or distinct because
        // using the iterator to write it will guarantee those things come
        // correctly. And on the other end there'll be no reason to waste
        // time re-sorting or re-applying distinct.
        final long sz = bag.size();
        if (sz < UNSIGNED_BYTE_MAX) {
            out.writeByte(TINYBAG);
            out.writeByte((int) sz);
        } else if (sz < UNSIGNED_SHORT_MAX) {
            out.writeByte(SMALLBAG);
            out.writeShort((int) sz);
        } else {
            out.writeByte(BAG);
            out.writeLong(sz);
        }

        Iterator<Tuple> it = bag.iterator();
        while (it.hasNext()) {
            writeTuple(out, it.next());
        }

    }

    private void writeTuple(DataOutput out, Tuple t) throws IOException {
        if (t instanceof TypeAwareTuple) {
            t.write(out);
        } else {
            SedesHelper.writeGenericTuple(out, t);
    }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.pig.data.InterSedes#addColsToTuple(java.io.DataInput, org.apache.pig.data.Tuple)
     */
    @Override
    public void addColsToTuple(DataInput in, Tuple t) throws IOException {
        byte type = in.readByte();
        int sz = getTupleSize(in, type);
        for (int i = 0; i < sz; i++) {
            t.append(readDatum(in));
        }
    }

    public static class BinInterSedesTupleRawComparator extends WritableComparator implements TupleRawComparator {

        private final Log mLog = LogFactory.getLog(getClass());
        private boolean[] mAsc;
        private boolean[] mSecondaryAsc;
        private static final boolean[] EMPTY_ASC = new boolean[] {};
        private boolean mWholeTuple;
        private boolean mIsSecondarySort;
        private boolean mHasNullField;
        private TupleFactory mFact;
        private InterSedes mSedes;

        public BinInterSedesTupleRawComparator() {
            super(BinSedesTuple.class);
        }

        @Override
        public Configuration getConf() {
            return null;
        }

        @Override
        public void setConf(Configuration conf) {
            try {
                mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
                mSecondaryAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.secondarySortOrder"));
                mIsSecondarySort = true;
            } catch (IOException ioe) {
                mLog.error("Unable to deserialize sort order object" + ioe.getMessage());
                throw new RuntimeException(ioe);
            }
            if (mAsc == null) {
                mAsc = new boolean[1];
                mAsc[0] = true;
            }
            if (mSecondaryAsc == null) {
                mIsSecondarySort = false;
            }
            // If there's only one entry in mAsc, it means it's for the whole
            // tuple. So we can't be looking for each column.
            mWholeTuple = (mAsc.length == 1);
            mFact = TupleFactory.getInstance();
            mSedes = InterSedesFactory.getInterSedesInstance();
        }

        @Override
        public boolean hasComparedTupleNull() {
            return mHasNullField;
        }

        /**
         * Compare two BinSedesTuples as raw bytes. We assume the Tuples are NOT PigNullableWritable, so client classes
         * need to deal with Null and Index.
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int rc = 0;
            ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1);
            ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2);
            try {
                rc = compareBinSedesTuple(bb1, bb2);
            } catch (IOException ioe) {
                mLog.error("I/O error during tuple comparison: " + ioe.getMessage());
                throw new RuntimeException(ioe.getMessage(), ioe);
            }
            return rc;
        }

        /**
         * Compare two BinSedesTuples as raw bytes. We deal with sort ordering in this method.
         *
         * @throws IOException
         */
        private int compareBinSedesTuple(ByteBuffer bb1, ByteBuffer bb2) throws IOException {
            mHasNullField = false;
            // store the position in case of deserialization
            int s1 = bb1.position();
            int s2 = bb2.position();
            // treat the outermost tuple differently because we have to deal with sort order
            int result = 0;
            try {
                // first compare sizes
                int tsz1 = readSize(bb1);
                int tsz2 = readSize(bb2);
                if (tsz1 > tsz2)
                    return 1;
                else if (tsz1 < tsz2)
                    return -1;
                else {
                    // if sizes are the same, compare field by field
                    if (mIsSecondarySort) {
                        // we have a compound tuple key (main_key, secondary_key). Each key has its own sort order, so
                        // we have to deal with them separately. We delegate it to the first invocation of
                        // compareBinInterSedesDatum()
                        assert (tsz1 == 2); // main_key, secondary_key
                        result = compareBinInterSedesDatum(bb1, bb2, mAsc);
                        if (result == 0)
                            result = compareBinInterSedesDatum(bb1, bb2, mSecondaryAsc);
                    } else {
                        // we have just one tuple key, we deal with sort order here
                        for (int i = 0; i < tsz1 && result == 0; i++) {
                            // EMPTY_ASC is used to distinguish original calls from recursive ones (hack-ish)
                            result = compareBinInterSedesDatum(bb1, bb2, EMPTY_ASC);
                            // flip if the order is descending
                            if (result != 0) {
                                if (!mWholeTuple && !mAsc[i])
                                    result *= -1;
                                else if (mWholeTuple && !mAsc[0])
                                    result *= -1;
                            }
                        }
                    }
                }
            } catch (UnsupportedEncodingException uee) {
                Tuple t1 = mFact.newTuple();
                Tuple t2 = mFact.newTuple();
                t1.readFields(new DataInputStream(new ByteArrayInputStream(bb1.array(), s1, bb1.limit())));
                t2.readFields(new DataInputStream(new ByteArrayInputStream(bb2.array(), s2, bb2.limit())));
                // delegate to compare()
                result = compare(t1, t2);
            }
            return result;
        }

        private int compareBinInterSedesDatum(ByteBuffer bb1, ByteBuffer bb2, boolean[] asc) throws IOException {
            int rc = 0;
            byte type1, type2;
            byte dt1 = bb1.get();
            byte dt2 = bb2.get();
            switch (dt1) {
            case BinInterSedes.NULL: {
                type1 = DataType.NULL;
                type2 = getGeneralizedDataType(dt2);
                if (asc != null) // we are scanning the top-level Tuple (original call)
                    mHasNullField = true;
                if (type1 == type2)
                    rc = 0;
                break;
            }
            case BinInterSedes.BOOLEAN_TRUE:
            case BinInterSedes.BOOLEAN_FALSE: {
                type1 = DataType.BOOLEAN;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    // false < true
                    int bv1 = (dt1 == BinInterSedes.BOOLEAN_TRUE) ? 1 : 0;
                    int bv2 = (dt2 == BinInterSedes.BOOLEAN_TRUE) ? 1 : 0;
                    rc = bv1 - bv2;
                }
                break;
            }
            case BinInterSedes.BYTE: {
                type1 = DataType.BYTE;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    byte bv1 = bb1.get();
                    byte bv2 = bb2.get();
                    rc = (bv1 < bv2 ? -1 : (bv1 == bv2 ? 0 : 1));
                }
                break;
            }
            case BinInterSedes.INTEGER_0:
            case BinInterSedes.INTEGER_1:
            case BinInterSedes.INTEGER_INBYTE:
            case BinInterSedes.INTEGER_INSHORT:
            case BinInterSedes.INTEGER: {
                type1 = DataType.INTEGER;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    int iv1 = readInt(bb1, dt1);
                    int iv2 = readInt(bb2, dt2);
                    rc = (iv1 < iv2 ? -1 : (iv1 == iv2 ? 0 : 1));
                }
                break;
            }
            case BinInterSedes.LONG_0:
            case BinInterSedes.LONG_1:
            case BinInterSedes.LONG_INBYTE:
            case BinInterSedes.LONG_INSHORT:
            case BinInterSedes.LONG_ININT:
            case BinInterSedes.LONG: {
                type1 = DataType.LONG;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    long lv1 = readLong(bb1, dt1);
                    long lv2 = readLong(bb2, dt2);
                    rc = (lv1 < lv2 ? -1 : (lv1 == lv2 ? 0 : 1));
                }
                break;
            }
            case BinInterSedes.DATETIME: {
                type1 = DataType.DATETIME;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    long lv1 = bb1.getLong();
                    bb1.position(bb1.position() + 2); // move cursor forward without read the timezone bytes
                    long lv2 = bb2.getLong();
                    bb2.position(bb2.position() + 2);
                    rc = (lv1 < lv2 ? -1 : (lv1 == lv2 ? 0 : 1));
                }
                break;
            }
            case BinInterSedes.FLOAT: {
                type1 = DataType.FLOAT;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    float fv1 = bb1.getFloat();
                    float fv2 = bb2.getFloat();
                    rc = Float.compare(fv1, fv2);
                }
                break;
            }
            case BinInterSedes.DOUBLE: {
                type1 = DataType.DOUBLE;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    double dv1 = bb1.getDouble();
                    double dv2 = bb2.getDouble();
                    rc = Double.compare(dv1, dv2);
                }
                break;
            }
            case BinInterSedes.BIGINTEGER: {
                type1 = DataType.BIGINTEGER;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    int sz1 = readSize(bb1, bb1.get());
                    int sz2 = readSize(bb2, bb2.get());
                    byte[] ca1 = new byte[sz1];
                    byte[] ca2 = new byte[sz2];
                    bb1.get(ca1);
                    bb2.get(ca2);
                    String str1 = null, str2 = null;
                    try {
                        str1 = new String(ca1, BinInterSedes.UTF8);
                        str2 = new String(ca2, BinInterSedes.UTF8);
                    } catch (UnsupportedEncodingException uee) {
                        mLog.warn("Unsupported string encoding", uee);
                        uee.printStackTrace();
                    }
                    if (str1 != null && str2 != null) {
                        rc = new BigInteger(str1).compareTo(new BigInteger(str2));
                    }
                }
                break;
            }
            case BinInterSedes.BIGDECIMAL: {
                type1 = DataType.BIGDECIMAL;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    int sz1 = readSize(bb1, bb1.get());
                    int sz2 = readSize(bb2, bb2.get());
                    byte[] ca1 = new byte[sz1];
                    byte[] ca2 = new byte[sz2];
                    bb1.get(ca1);
                    bb2.get(ca2);
                    String str1 = null, str2 = null;
                    try {
                        str1 = new String(ca1, BinInterSedes.UTF8);
                        str2 = new String(ca2, BinInterSedes.UTF8);
                    } catch (UnsupportedEncodingException uee) {
                        mLog.warn("Unsupported string encoding", uee);
                        uee.printStackTrace();
                    }
                    if (str1 != null && str2 != null) {
                        rc = new BigDecimal(str1).compareTo(new BigDecimal(str2));
                    }
                }
                break;
            }
            case BinInterSedes.TINYBYTEARRAY:
            case BinInterSedes.SMALLBYTEARRAY:
            case BinInterSedes.BYTEARRAY: {
                type1 = DataType.BYTEARRAY;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    int basz1 = readSize(bb1, dt1);
                    int basz2 = readSize(bb2, dt2);
                    rc = WritableComparator.compareBytes(
                          bb1.array(), bb1.position(), basz1,
                          bb2.array(), bb2.position(), basz2);
                    bb1.position(bb1.position() + basz1);
                    bb2.position(bb2.position() + basz2);
                }
                break;
            }
            case BinInterSedes.SMALLCHARARRAY:
            case BinInterSedes.CHARARRAY: {
                type1 = DataType.CHARARRAY;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    int casz1 = readSize(bb1, dt1);
                    int casz2 = readSize(bb2, dt2);
                    String str1 = null, str2 = null;
                    try {
                        str1 = new String(bb1.array(), bb1.position(), casz1, BinInterSedes.UTF8);
                        str2 = new String(bb2.array(), bb2.position(), casz2, BinInterSedes.UTF8);
                    } catch (UnsupportedEncodingException uee) {
                        mLog.warn("Unsupported string encoding", uee);
                        uee.printStackTrace();
                    } finally {
                        bb1.position(bb1.position() + casz1);
                        bb2.position(bb2.position() + casz2);
                    }
                    if (str1 != null && str2 != null)
                        rc = str1.compareTo(str2);
                }
                break;
            }
            case BinInterSedes.TUPLE_0:
            case BinInterSedes.TUPLE_1:
            case BinInterSedes.TUPLE_2:
            case BinInterSedes.TUPLE_3:
            case BinInterSedes.TUPLE_4:
            case BinInterSedes.TUPLE_5:
            case BinInterSedes.TUPLE_6:
            case BinInterSedes.TUPLE_7:
            case BinInterSedes.TUPLE_8:
            case BinInterSedes.TUPLE_9:
            case BinInterSedes.TINYTUPLE:
            case BinInterSedes.SMALLTUPLE:
            case BinInterSedes.TUPLE: {
                type1 = DataType.TUPLE;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2) {
                    // first compare sizes
                    int tsz1 = readSize(bb1, dt1);
                    int tsz2 = readSize(bb2, dt2);
                    if (tsz1 > tsz2)
                        return 1;
                    else if (tsz1 < tsz2)
                        return -1;
                    else {
                        // if sizes are the same, compare field by field. If we are doing secondary sort, use the sort
                        // order passed by the caller. Inner tuples never have sort order (so we pass null).
                        for (int i = 0; i < tsz1 && rc == 0; i++) {
                            rc = compareBinInterSedesDatum(bb1, bb2, null);
                            if (rc != 0 && asc != null && asc.length > 1 && !asc[i])
                                rc *= -1;
                        }
                    }
                }
                break;
            }
            case BinInterSedes.TINYBAG:
            case BinInterSedes.SMALLBAG:
            case BinInterSedes.BAG: {
                type1 = DataType.BAG;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2)
                    rc = compareBinInterSedesBag(bb1, bb2, dt1, dt2);
                break;
            }
            case BinInterSedes.TINYMAP:
            case BinInterSedes.SMALLMAP:
            case BinInterSedes.MAP: {
                type1 = DataType.MAP;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2)
                    rc = compareBinInterSedesMap(bb1, bb2, dt1, dt2);
                break;
            }
            case BinInterSedes.GENERIC_WRITABLECOMPARABLE: {
                type1 = DataType.GENERIC_WRITABLECOMPARABLE;
                type2 = getGeneralizedDataType(dt2);
                if (type1 == type2)
                    rc = compareBinInterSedesGenericWritableComparable(bb1, bb2);
                break;
            }
            default: {
                mLog.info("Unsupported DataType for binary comparison, switching to object deserialization: "
                        + DataType.genTypeToNameMap().get(dt1) + "(" + dt1 + ")");
                throw new UnsupportedEncodingException();
            }
            }
            // compare generalized data types
            if (type1 != type2)
                rc = (type1 < type2) ? -1 : 1;
            // apply sort order for keys that are not tuples or for whole tuples
            if (asc != null && asc.length == 1 && !asc[0])
                rc *= -1;
            return rc;
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable o1, WritableComparable o2) {
            Tuple t1 = (Tuple) o1;
            Tuple t2 = (Tuple) o2;
            mHasNullField = false;
            // treat the outermost tuple differently because we have to deal with sort order
            int result = 0;

            // first compare sizes
            int tsz1 = t1.size();
            int tsz2 = t2.size();
            if (tsz1 > tsz2)
                return 1;
            else if (tsz1 < tsz2)
                return -1;
            else {
                try {
                    // if sizes are the same, compare field by field
                    if (mIsSecondarySort) {
                        // we have a compound tuple key (main_key, secondary_key). Each key has its own sort order, so
                        // we have to deal with them separately. We delegate it to the first invocation of
                        // compareDatum()
                        assert (tsz1 == 2); // main_key, secondary_key
                        result = compareDatum(t1.get(0), t2.get(0), mAsc);
                        if (result == 0)
                            result = compareDatum(t1.get(1), t2.get(1), mSecondaryAsc);
                    } else {
                        // we have just one tuple key and no chance of recursion, we delegate dealing with sort order to
                        // compareDatum()
                        result = compareDatum(t1, t2, mAsc);
                    }
                } catch (ExecException e) {
                    throw new RuntimeException("Unable to compare tuples", e);
                }
            }
            return result;
        }

        private int compareDatum(Object o1, Object o2, boolean[] asc) {
            int rc = 0;
            if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
                // objects are Tuples, we may need to apply sort order inside them
                Tuple t1 = (Tuple) o1;
                Tuple t2 = (Tuple) o2;
                int sz1 = t1.size();
                int sz2 = t2.size();
                if (sz2 < sz1) {
                    return 1;
                } else if (sz2 > sz1) {
                    return -1;
                } else {
                    for (int i = 0; i < sz1; i++) {
                        try {
                            rc = DataType.compare(t1.get(i), t2.get(i));
                            if (rc != 0 && asc != null && asc.length > 1 && !asc[i])
                                rc *= -1;
                            if (t1.get(i) == null) // (PIG-927) record if the tuple has a null field
                                mHasNullField = true;
                            if (rc!=0) break;
                        } catch (ExecException e) {
                            throw new RuntimeException("Unable to compare tuples", e);
                        }
                    }
                }
            } else {
                // objects are NOT Tuples, delegate to DataType.compare()
                rc = DataType.compare(o1, o2);
            }
            // apply sort order for keys that are not tuples or for whole tuples
            if (asc != null && asc.length == 1 && !asc[0])
                rc *= -1;
            return rc;
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        private int compareBinInterSedesGenericWritableComparable(ByteBuffer bb1, ByteBuffer bb2) throws ExecException,
                IOException {
            DataInputBuffer buffer1 = new DataInputBuffer();
            DataInputBuffer buffer2 = new DataInputBuffer();
            buffer1.reset(bb1.array(), bb1.position(), bb1.remaining());
            buffer2.reset(bb2.array(), bb2.position(), bb2.remaining());
            Comparable writable1 = (Comparable) mSedes.readDatum(buffer1);
            Comparable writable2 = (Comparable) mSedes.readDatum(buffer2);
            bb1.position(buffer1.getPosition());
            bb2.position(buffer2.getPosition());
            return writable1.compareTo(writable2);
        }

        @SuppressWarnings("unchecked")
        private int compareBinInterSedesBag(ByteBuffer bb1, ByteBuffer bb2, byte dt1, byte dt2) throws IOException {
            int s1 = bb1.position();
            int s2 = bb2.position();
            int l1 = bb1.remaining();
            int l2 = bb2.remaining();
            // first compare sizes
            int bsz1 = readSize(bb1, dt1);
            int bsz2 = readSize(bb2, dt2);
            if (bsz1 > bsz2)
                return 1;
            else if (bsz1 < bsz2)
                return -1;
            else {
                DataInputBuffer buffer1 = new DataInputBuffer();
                DataInputBuffer buffer2 = new DataInputBuffer();
                buffer1.reset(bb1.array(), s1, l1);
                buffer2.reset(bb2.array(), s2, l2);
                DataBag bag1 = (DataBag) mSedes.readDatum(buffer1, dt1);
                DataBag bag2 = (DataBag) mSedes.readDatum(buffer2, dt2);
                bb1.position(buffer1.getPosition());
                bb2.position(buffer2.getPosition());
                return bag1.compareTo(bag2);
            }
        }

        @SuppressWarnings("unchecked")
        private int compareBinInterSedesMap(ByteBuffer bb1, ByteBuffer bb2, byte dt1, byte dt2) throws ExecException,
                IOException {
            int s1 = bb1.position();
            int s2 = bb2.position();
            int l1 = bb1.remaining();
            int l2 = bb2.remaining();
            // first compare sizes
            int bsz1 = readSize(bb1, dt1);
            int bsz2 = readSize(bb2, dt2);
            if (bsz1 > bsz2)
                return 1;
            else if (bsz1 < bsz2)
                return -1;
            else {
                DataInputBuffer buffer1 = new DataInputBuffer();
                DataInputBuffer buffer2 = new DataInputBuffer();
                buffer1.reset(bb1.array(), s1, l1);
                buffer2.reset(bb2.array(), s2, l2);
                Map<String, Object> map1 = (Map<String, Object>) mSedes.readDatum(buffer1, dt1);
                Map<String, Object> map2 = (Map<String, Object>) mSedes.readDatum(buffer2, dt2);
                bb1.position(buffer1.getPosition());
                bb2.position(buffer2.getPosition());
                return DataType.compare(map1, map2, DataType.MAP, DataType.MAP);
            }
        }

        private static byte getGeneralizedDataType(byte type) {
            switch (type) {
            case BinInterSedes.NULL:
                return DataType.NULL;
            case BinInterSedes.BOOLEAN_TRUE:
            case BinInterSedes.BOOLEAN_FALSE:
                return DataType.BOOLEAN;
            case BinInterSedes.BYTE:
                return DataType.BYTE;
            case BinInterSedes.INTEGER_0:
            case BinInterSedes.INTEGER_1:
            case BinInterSedes.INTEGER_INBYTE:
            case BinInterSedes.INTEGER_INSHORT:
            case BinInterSedes.INTEGER:
                return DataType.INTEGER;
            case BinInterSedes.LONG_0:
            case BinInterSedes.LONG_1:
            case BinInterSedes.LONG_INBYTE:
            case BinInterSedes.LONG_INSHORT:
            case BinInterSedes.LONG_ININT:
            case BinInterSedes.LONG:
                return DataType.LONG;
            case BinInterSedes.DATETIME:
                return DataType.DATETIME;
            case BinInterSedes.FLOAT:
                return DataType.FLOAT;
            case BinInterSedes.DOUBLE:
                return DataType.DOUBLE;
            case BinInterSedes.BIGINTEGER:
                return DataType.BIGINTEGER;
            case BinInterSedes.BIGDECIMAL:
                return DataType.BIGDECIMAL;
            case BinInterSedes.TINYBYTEARRAY:
            case BinInterSedes.SMALLBYTEARRAY:
            case BinInterSedes.BYTEARRAY:
                return DataType.BYTEARRAY;
            case BinInterSedes.SMALLCHARARRAY:
            case BinInterSedes.CHARARRAY:
                return DataType.CHARARRAY;
            case BinInterSedes.TUPLE_0:
            case BinInterSedes.TUPLE_1:
            case BinInterSedes.TUPLE_2:
            case BinInterSedes.TUPLE_3:
            case BinInterSedes.TUPLE_4:
            case BinInterSedes.TUPLE_5:
            case BinInterSedes.TUPLE_6:
            case BinInterSedes.TUPLE_7:
            case BinInterSedes.TUPLE_8:
            case BinInterSedes.TUPLE_9:
            case BinInterSedes.TUPLE:
            case BinInterSedes.TINYTUPLE:
            case BinInterSedes.SMALLTUPLE:
                return DataType.TUPLE;
            case BinInterSedes.BAG:
            case BinInterSedes.TINYBAG:
            case BinInterSedes.SMALLBAG:
                return DataType.BAG;
            case BinInterSedes.MAP:
            case BinInterSedes.TINYMAP:
            case BinInterSedes.SMALLMAP:
                return DataType.MAP;
            case BinInterSedes.INTERNALMAP:
                return DataType.INTERNALMAP;
            case BinInterSedes.GENERIC_WRITABLECOMPARABLE:
                return DataType.GENERIC_WRITABLECOMPARABLE;
            default:
                throw new RuntimeException("Unexpected data type " + type + " found in stream.");
            }
        }

        private static long readLong(ByteBuffer bb, byte type) {
            int bytesToRead = 0;
            switch (type) {
            case BinInterSedes.LONG_0: return 0L;
            case BinInterSedes.LONG_1: return 1L;
            case BinInterSedes.LONG_INBYTE: return bb.get();
            case BinInterSedes.LONG_INSHORT: return bb.getShort();
            case BinInterSedes.LONG_ININT: return bb.getInt();
            case BinInterSedes.LONG: return bb.getLong();
            default:
                throw new RuntimeException("Unexpected data type " + type + " found in stream.");
            }
        }

        private static int readInt(ByteBuffer bb, byte type) {
            switch (type) {
            case BinInterSedes.INTEGER_0:
                return 0;
            case BinInterSedes.INTEGER_1:
                return 1;
            case BinInterSedes.INTEGER_INBYTE:
                return bb.get();
            case BinInterSedes.INTEGER_INSHORT:
                return bb.getShort();
            case BinInterSedes.INTEGER:
                return bb.getInt();
            default:
                throw new RuntimeException("Unexpected data type " + type + " found in stream.");
            }
        }

        /**
         * @param bb ByteBuffer having serialized object, including the type information
         * @param type serialized type information
         * @return the size of this type
         */
        private static int readSize(ByteBuffer bb) {
            return readSize(bb, bb.get());
        }

        /**
         * @param bb ByteBuffer having serialized object, minus the type information
         * @param type serialized type information
         * @return the size of this type
         */
        private static int readSize(ByteBuffer bb, byte type) {
            switch (type) {
            case BinInterSedes.TINYBYTEARRAY:
            case BinInterSedes.TINYTUPLE:
            case BinInterSedes.TINYBAG:
            case BinInterSedes.TINYMAP:
                return getUnsignedByte(bb);
            case BinInterSedes.SMALLBYTEARRAY:
            case BinInterSedes.SMALLCHARARRAY:
            case BinInterSedes.SMALLTUPLE:
            case BinInterSedes.SMALLBAG:
            case BinInterSedes.SMALLMAP:
                return getUnsignedShort(bb);
            case BinInterSedes.BYTEARRAY:
            case BinInterSedes.CHARARRAY:
            case BinInterSedes.TUPLE:
            case BinInterSedes.BAG:
            case BinInterSedes.MAP:
                return bb.getInt();
            case BinInterSedes.TUPLE_0:
                return 0;
            case BinInterSedes.TUPLE_1:
                return 1;
            case BinInterSedes.TUPLE_2:
                return 2;
            case BinInterSedes.TUPLE_3:
                return 3;
            case BinInterSedes.TUPLE_4:
                return 4;
            case BinInterSedes.TUPLE_5:
                return 5;
            case BinInterSedes.TUPLE_6:
                return 6;
            case BinInterSedes.TUPLE_7:
                return 7;
            case BinInterSedes.TUPLE_8:
                return 8;
            case BinInterSedes.TUPLE_9:
                return 9;
            default:
                throw new RuntimeException("Unexpected data type " + type + " found in stream.");
            }
        }

        //same as format used by DataInput/DataOutput for unsigned short
        private static int getUnsignedShort(ByteBuffer bb) {
            return (((bb.get() & 0xff) << 8) | (bb.get() & 0xff));
        }

        //same as format used by DataInput/DataOutput for unsigned byte
        private static int getUnsignedByte(ByteBuffer bb) {
            return bb.get() & 0xff;
        }
    }

    @Override
    public Class<? extends TupleRawComparator> getTupleRawComparatorClass() {
        return BinInterSedesTupleRawComparator.class;
    }

    public Tuple readTuple(DataInput in) throws IOException {
        return readTuple(in, in.readByte());
    }

    public static boolean isTupleByte(byte b) {
        return b == BinInterSedes.TUPLE
            || b == BinInterSedes.SMALLTUPLE
            || b == BinInterSedes.TINYTUPLE
            || b == BinInterSedes.SCHEMA_TUPLE
            || b == BinInterSedes.SCHEMA_TUPLE_BYTE_INDEX
            || b == BinInterSedes.SCHEMA_TUPLE_SHORT_INDEX
            || b == BinInterSedes.TUPLE_0
            || b == BinInterSedes.TUPLE_1
            || b == BinInterSedes.TUPLE_2
            || b == BinInterSedes.TUPLE_3
            || b == BinInterSedes.TUPLE_4
            || b == BinInterSedes.TUPLE_5
            || b == BinInterSedes.TUPLE_6
            || b == BinInterSedes.TUPLE_7
            || b == BinInterSedes.TUPLE_8
            || b == BinInterSedes.TUPLE_9;
    }
}
