/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.hivesterix.serde.lazy;

import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.WritableUtils;

import edu.uci.ics.hivesterix.serde.lazy.objectinspector.LazyObjectInspectorFactory;
import edu.uci.ics.hivesterix.serde.lazy.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * LazyUtils.
 * 
 */
public final class LazyUtils {

	/**
	 * Convert the byte array to an int starting from the given offset. Refer to
	 * code by aeden on DZone Snippets:
	 * 
	 * @param b
	 *            the byte array
	 * @param offset
	 *            the array offset
	 * @return the integer
	 */
	public static int byteArrayToInt(byte[] b, int offset) {
		int value = 0;
		for (int i = 0; i < 4; i++) {
			int shift = (4 - 1 - i) * 8;
			value += (b[i + offset] & 0x000000FF) << shift;
		}
		return value;
	}

	/**
	 * Convert the byte array to a long starting from the given offset.
	 * 
	 * @param b
	 *            the byte array
	 * @param offset
	 *            the array offset
	 * @return the long
	 */
	public static long byteArrayToLong(byte[] b, int offset) {
		long value = 0;
		for (int i = 0; i < 8; i++) {
			int shift = (8 - 1 - i) * 8;
			value += ((long) (b[i + offset] & 0x00000000000000FF)) << shift;
		}
		return value;
	}

	/**
	 * Convert the byte array to a short starting from the given offset.
	 * 
	 * @param b
	 *            the byte array
	 * @param offset
	 *            the array offset
	 * @return the short
	 */
	public static short byteArrayToShort(byte[] b, int offset) {
		short value = 0;
		value += (b[offset] & 0x000000FF) << 8;
		value += (b[offset + 1] & 0x000000FF);
		return value;
	}

	/**
	 * Record is the unit that data is serialized in. A record includes two
	 * parts. The first part stores the size of the element and the second part
	 * stores the real element. size element record ->
	 * |----|-------------------------|
	 * 
	 * A RecordInfo stores two information of a record, the size of the "size"
	 * part which is the element offset and the size of the element part which
	 * is element size.
	 */
	public static class RecordInfo {
		public RecordInfo() {
			elementOffset = 0;
			elementSize = 0;
		}

		public byte elementOffset;
		public int elementSize;

		@Override
		public String toString() {
			return "(" + elementOffset + ", " + elementSize + ")";
		}
	}

	static VInt vInt = new LazyUtils.VInt();

	/**
	 * Check a particular field and set its size and offset in bytes based on
	 * the field type and the bytes arrays.
	 * 
	 * For void, boolean, byte, short, int, long, float and double, there is no
	 * offset and the size is fixed. For string, map, list, struct, the first
	 * four bytes are used to store the size. So the offset is 4 and the size is
	 * computed by concating the first four bytes together. The first four bytes
	 * are defined with respect to the offset in the bytes arrays.
	 * 
	 * @param objectInspector
	 *            object inspector of the field
	 * @param bytes
	 *            bytes arrays store the table row
	 * @param offset
	 *            offset of this field
	 * @param recordInfo
	 *            modify this byteinfo object and return it
	 */
	public static void checkObjectByteInfo(ObjectInspector objectInspector,
			byte[] bytes, int offset, RecordInfo recordInfo) {
		Category category = objectInspector.getCategory();
		switch (category) {
		case PRIMITIVE:
			PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) objectInspector)
					.getPrimitiveCategory();
			switch (primitiveCategory) {
			case VOID:
				recordInfo.elementOffset = 0;
				recordInfo.elementSize = 0;
				break;
			case BOOLEAN:
			case BYTE:
				recordInfo.elementOffset = 0;
				recordInfo.elementSize = 1;
				break;
			case SHORT:
				recordInfo.elementOffset = 0;
				recordInfo.elementSize = 2;
				break;
			case FLOAT:
				recordInfo.elementOffset = 0;
				recordInfo.elementSize = 4;
				break;
			case DOUBLE:
				recordInfo.elementOffset = 0;
				recordInfo.elementSize = 8;
				break;
			case INT:
				recordInfo.elementOffset = 0;
				recordInfo.elementSize = WritableUtils
						.decodeVIntSize(bytes[offset]);
				break;
			case LONG:
				recordInfo.elementOffset = 0;
				recordInfo.elementSize = WritableUtils
						.decodeVIntSize(bytes[offset]);
				break;
			case STRING:
				// using vint instead of 4 bytes
				LazyUtils.readVInt(bytes, offset, vInt);
				recordInfo.elementOffset = vInt.length;
				recordInfo.elementSize = vInt.value;
				break;
			default: {
				throw new RuntimeException("Unrecognized primitive type: "
						+ primitiveCategory);
			}
			}
			break;
		case LIST:
		case MAP:
		case STRUCT:
			recordInfo.elementOffset = 4;
			recordInfo.elementSize = LazyUtils.byteArrayToInt(bytes, offset);
			break;
		default: {
			throw new RuntimeException("Unrecognized non-primitive type: "
					+ category);
		}
		}
	}

	/**
	 * A zero-compressed encoded long.
	 */
	public static class VLong {
		public VLong() {
			value = 0;
			length = 0;
		}

		public long value;
		public byte length;
	};

	/**
	 * Reads a zero-compressed encoded long from a byte array and returns it.
	 * 
	 * @param bytes
	 *            the byte array
	 * @param offset
	 *            offset of the array to read from
	 * @param vlong
	 *            storing the deserialized long and its size in byte
	 */
	public static void readVLong(byte[] bytes, int offset, VLong vlong) {
		byte firstByte = bytes[offset];
		vlong.length = (byte) WritableUtils.decodeVIntSize(firstByte);
		if (vlong.length == 1) {
			vlong.value = firstByte;
			return;
		}
		long i = 0;
		for (int idx = 0; idx < vlong.length - 1; idx++) {
			byte b = bytes[offset + 1 + idx];
			i = i << 8;
			i = i | (b & 0xFF);
		}
		vlong.value = (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
	}

	/**
	 * A zero-compressed encoded integer.
	 */
	public static class VInt implements Serializable {
		private static final long serialVersionUID = 1L;

		public VInt() {
			value = 0;
			length = 0;
		}

		public int value;
		public byte length;
	};

	/**
	 * Reads a zero-compressed encoded int from a byte array and returns it.
	 * 
	 * @param bytes
	 *            the byte array
	 * @param offset
	 *            offset of the array to read from
	 * @param vInt
	 *            storing the deserialized int and its size in byte
	 */
	public static void readVInt(byte[] bytes, int offset, VInt vInt) {
		byte firstByte = bytes[offset];
		vInt.length = (byte) WritableUtils.decodeVIntSize(firstByte);
		if (vInt.length == 1) {
			vInt.value = firstByte;
			return;
		}
		int i = 0;
		for (int idx = 0; idx < vInt.length - 1; idx++) {
			byte b = bytes[offset + 1 + idx];
			i = i << 8;
			i = i | (b & 0xFF);
		}
		vInt.value = (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1) : i);
	}

	/**
	 * Writes a zero-compressed encoded int to a byte array.
	 * 
	 * @param byteStream
	 *            the byte array/stream
	 * @param i
	 *            the int
	 */
	public static void writeVInt(Output byteStream, int i) {
		writeVLong(byteStream, i);
	}

	/**
	 * Write a zero-compressed encoded long to a byte array.
	 * 
	 * @param byteStream
	 *            the byte array/stream
	 * @param l
	 *            the long
	 */
	public static void writeVLong(Output byteStream, long l) {
		if (l >= -112 && l <= 127) {
			byteStream.write((byte) l);
			return;
		}

		int len = -112;
		if (l < 0) {
			l ^= -1L; // take one's complement'
			len = -120;
		}

		long tmp = l;
		while (tmp != 0) {
			tmp = tmp >> 8;
			len--;
		}

		byteStream.write((byte) len);

		len = (len < -120) ? -(len + 120) : -(len + 112);

		for (int idx = len; idx != 0; idx--) {
			int shiftbits = (idx - 1) * 8;
			long mask = 0xFFL << shiftbits;
			byteStream.write((byte) ((l & mask) >> shiftbits));
		}
	}

	static Map<TypeInfo, ObjectInspector> cachedLazyObjectInspector = new ConcurrentHashMap<TypeInfo, ObjectInspector>();

	/**
	 * Returns the lazy binary object inspector that can be used to inspect an
	 * lazy binary object of that typeInfo
	 * 
	 * For primitive types, we use the standard writable object inspector.
	 */
	public static ObjectInspector getLazyObjectInspectorFromTypeInfo(
			TypeInfo typeInfo, boolean topLevel) {
		if (typeInfo == null)
			throw new IllegalStateException("illegal type null ");
		ObjectInspector result = cachedLazyObjectInspector.get(typeInfo);
		if (result == null) {
			switch (typeInfo.getCategory()) {
			case PRIMITIVE: {
				result = PrimitiveObjectInspectorFactory
						.getPrimitiveLazyObjectInspector(((PrimitiveTypeInfo) typeInfo)
								.getPrimitiveCategory());
				break;
			}
			case LIST: {
				ObjectInspector elementObjectInspector = getLazyObjectInspectorFromTypeInfo(
						((ListTypeInfo) typeInfo).getListElementTypeInfo(),
						false);
				result = LazyObjectInspectorFactory
						.getLazyListObjectInspector(elementObjectInspector);
				break;
			}
			case MAP: {
				MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
				ObjectInspector keyObjectInspector = getLazyObjectInspectorFromTypeInfo(
						mapTypeInfo.getMapKeyTypeInfo(), false);
				ObjectInspector valueObjectInspector = getLazyObjectInspectorFromTypeInfo(
						mapTypeInfo.getMapValueTypeInfo(), false);
				result = LazyObjectInspectorFactory.getLazyMapObjectInspector(
						keyObjectInspector, valueObjectInspector);
				break;
			}
			case STRUCT: {
				StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
				List<String> fieldNames = structTypeInfo
						.getAllStructFieldNames();
				List<TypeInfo> fieldTypeInfos = structTypeInfo
						.getAllStructFieldTypeInfos();
				List<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>(
						fieldTypeInfos.size());

				for (int i = 0; i < fieldTypeInfos.size(); i++) {
					fieldObjectInspectors
							.add(getLazyObjectInspectorFromTypeInfo(
									fieldTypeInfos.get(i), false));
				}

				// if it is top level then create columnar
				if (topLevel)
					result = LazyObjectInspectorFactory
							.getLazyColumnarObjectInspector(fieldNames,
									fieldObjectInspectors);
				// if it is not top level then create struct
				else
					result = LazyObjectInspectorFactory
							.getLazyStructObjectInspector(fieldNames,
									fieldObjectInspectors);

				break;
			}
			default: {
				result = null;
			}
			}
			cachedLazyObjectInspector.put(typeInfo, result);
		}
		return result;
	}

	/**
	 * get top-level lazy object inspector
	 * 
	 * @param fieldNames
	 * @param fieldTypeInfos
	 * @return
	 */
	public static ObjectInspector getLazyObjectInspector(
			List<String> fieldNames, List<TypeInfo> fieldTypeInfos) {
		List<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>(
				fieldTypeInfos.size());
		for (int i = 0; i < fieldTypeInfos.size(); i++) {
			fieldObjectInspectors.add(getLazyObjectInspectorFromTypeInfo(
					fieldTypeInfos.get(i), false));
		}

		return LazyObjectInspectorFactory.getLazyColumnarObjectInspector(
				fieldNames, fieldObjectInspectors);
	}

	private LazyUtils() {
		// prevent instantiation
	}

	/**
	 * Returns -1 if the first byte sequence is lexicographically less than the
	 * second; returns +1 if the second byte sequence is lexicographically less
	 * than the first; otherwise return 0.
	 */
	public static int compare(byte[] b1, int start1, int length1, byte[] b2,
			int start2, int length2) {

		int min = Math.min(length1, length2);

		for (int i = 0; i < min; i++) {
			if (b1[start1 + i] == b2[start2 + i]) {
				continue;
			}
			if (b1[start1 + i] < b2[start2 + i]) {
				return -1;
			} else {
				return 1;
			}
		}

		if (length1 < length2) {
			return -1;
		}
		if (length1 > length2) {
			return 1;
		}
		return 0;
	}

	public static int hashBytes(byte[] data, int start, int len) {
		int hash = 1;
		for (int i = start; i < len; i++) {
			hash = (31 * hash) + data[i];
		}
		return hash;
	}

	/**
	 * Writes a zero-compressed encoded int to a byte array.
	 * 
	 * @param byteStream
	 *            the byte array/stream
	 * @param i
	 *            the int
	 */
	public static void writeVInt(DataOutput byteStream, int i)
			throws IOException {
		writeVLong(byteStream, i);
	}

	/**
	 * Write a zero-compressed encoded long to a byte array.
	 * 
	 * @param byteStream
	 *            the byte array/stream
	 * @param l
	 *            the long
	 */
	public static void writeVLong(DataOutput byteStream, long l)
			throws IOException {
		if (l >= -112 && l <= 127) {
			byteStream.write((byte) l);
			return;
		}

		int len = -112;
		if (l < 0) {
			l ^= -1L; // take one's complement'
			len = -120;
		}

		long tmp = l;
		while (tmp != 0) {
			tmp = tmp >> 8;
			len--;
		}

		byteStream.write((byte) len);

		len = (len < -120) ? -(len + 120) : -(len + 112);

		for (int idx = len; idx != 0; idx--) {
			int shiftbits = (idx - 1) * 8;
			long mask = 0xFFL << shiftbits;
			byteStream.write((byte) ((l & mask) >> shiftbits));
		}
	}
}
