package edu.uci.ics.hivesterix.runtime.factory.normalize;

import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VLong;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;

public class HiveLongAscNormalizedKeyComputerFactory implements
		INormalizedKeyComputerFactory {

	private static final long serialVersionUID = 1L;

	@Override
	public INormalizedKeyComputer createNormalizedKeyComputer() {

		return new INormalizedKeyComputer() {
			private static final int POSTIVE_LONG_MASK = (3 << 30);
			private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
			private static final int NEGATIVE_LONG_MASK = (0 << 30);
			private VLong vlong = new VLong();

			@Override
			public int normalize(byte[] bytes, int start, int length) {
				LazyUtils.readVLong(bytes, start, vlong);
				if (vlong.length != length)
					throw new IllegalArgumentException(
							"length mismatch in int comparator function actual: "
									+ vlong.length + " expected " + length);
				long value = (long) vlong.value;
				int highValue = (int) (value >> 32);
				if (highValue > 0) {
					/**
					 * larger than Integer.MAX
					 */
					int highNmk = getKey(highValue);
					highNmk >>= 2;
					highNmk |= POSTIVE_LONG_MASK;
					return highNmk;
				} else if (highValue == 0) {
					/**
					 * smaller than Integer.MAX but >=0
					 */
					int lowNmk = (int) value;
					lowNmk >>= 2;
					lowNmk |= NON_NEGATIVE_INT_MASK;
					return lowNmk;
				} else {
					/**
					 * less than 0; TODO: have not optimized for that
					 */
					int highNmk = getKey(highValue);
					highNmk >>= 2;
					highNmk |= NEGATIVE_LONG_MASK;
					return highNmk;
				}
			}

			private int getKey(int value) {
				long unsignedFirstValue = (long) value;
				int nmk = (int) ((unsignedFirstValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
				return nmk;
			}
		};
	}
}
