/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.hivesterix.serde.lazy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.RecordInfo;
import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
import edu.uci.ics.hivesterix.serde.lazy.objectinspector.LazyListObjectInspector;

/**
 * LazyArray is serialized as follows: start A b b b b b b end bytes[] ->
 * |--------|---|---|---|---| ... |---|---|
 * 
 * Section A is the null-bytes. Suppose the list has N elements, then there are
 * (N+7)/8 bytes used as null-bytes. Each bit corresponds to an element and it
 * indicates whether that element is null (0) or not null (1).
 * 
 * After A, all b(s) represent the elements of the list. Each of them is again a
 * LazyObject.
 * 
 */

public class LazyArray extends LazyNonPrimitive<LazyListObjectInspector> {

	/**
	 * Whether the data is already parsed or not.
	 */
	boolean parsed = false;
	/**
	 * The length of the array. Only valid when the data is parsed.
	 */
	int arraySize = 0;

	/**
	 * The start positions and lengths of array elements. Only valid when the
	 * data is parsed.
	 */
	int[] elementStart;
	int[] elementLength;

	/**
	 * Whether an element is initialized or not.
	 */
	boolean[] elementInited;

	/**
	 * Whether an element is null or not. Because length is 0 does not means the
	 * field is null. In particular, a 0-length string is not null.
	 */
	boolean[] elementIsNull;

	/**
	 * The elements of the array. Note that we call arrayElements[i].init(bytes,
	 * begin, length) only when that element is accessed.
	 */
	@SuppressWarnings("rawtypes")
	LazyObject[] arrayElements;

	/**
	 * Construct a LazyArray object with the ObjectInspector.
	 * 
	 * @param oi
	 *            the oi representing the type of this LazyArray
	 */
	protected LazyArray(LazyListObjectInspector oi) {
		super(oi);
	}

	/**
	 * Set the row data for this LazyArray.
	 * 
	 * @see LazyObject#init(ByteArrayRef, int, int)
	 */
	@Override
	public void init(byte[] bytes, int start, int length) {
		super.init(bytes, start, length);
		parsed = false;
	}

	/**
	 * Enlarge the size of arrays storing information for the elements inside
	 * the array.
	 */
	private void adjustArraySize(int newSize) {
		if (elementStart == null || elementStart.length < newSize) {
			elementStart = new int[newSize];
			elementLength = new int[newSize];
			elementInited = new boolean[newSize];
			elementIsNull = new boolean[newSize];
			arrayElements = new LazyObject[newSize];
		}
	}

	VInt vInt = new LazyUtils.VInt();
	RecordInfo recordInfo = new LazyUtils.RecordInfo();

	/**
	 * Parse the bytes and fill elementStart, elementLength, elementInited and
	 * elementIsNull.
	 */
	private void parse() {

		// get the vlong that represents the map size
		LazyUtils.readVInt(bytes, start, vInt);
		arraySize = vInt.value;
		if (0 == arraySize) {
			parsed = true;
			return;
		}

		// adjust arrays
		adjustArraySize(arraySize);
		// find out the null-bytes
		int arryByteStart = start + vInt.length;
		int nullByteCur = arryByteStart;
		int nullByteEnd = arryByteStart + (arraySize + 7) / 8;
		// the begin the real elements
		int lastElementByteEnd = nullByteEnd;
		// the list element object inspector
		ObjectInspector listEleObjectInspector = ((ListObjectInspector) oi)
				.getListElementObjectInspector();
		// parsing elements one by one
		for (int i = 0; i < arraySize; i++) {
			elementIsNull[i] = true;
			if ((bytes[nullByteCur] & (1 << (i % 8))) != 0) {
				elementIsNull[i] = false;
				LazyUtils.checkObjectByteInfo(listEleObjectInspector, bytes,
						lastElementByteEnd, recordInfo);
				elementStart[i] = lastElementByteEnd + recordInfo.elementOffset;
				elementLength[i] = recordInfo.elementSize;
				lastElementByteEnd = elementStart[i] + elementLength[i];
			}
			// move onto the next null byte
			if (7 == (i % 8)) {
				nullByteCur++;
			}
		}

		Arrays.fill(elementInited, 0, arraySize, false);
		parsed = true;
	}

	/**
	 * Returns the actual primitive object at the index position inside the
	 * array represented by this LazyObject.
	 */
	public Object getListElementObject(int index) {
		if (!parsed) {
			parse();
		}
		if (index < 0 || index >= arraySize) {
			return null;
		}
		return uncheckedGetElement(index);
	}

	/**
	 * Get the element without checking out-of-bound index.
	 * 
	 * @param index
	 *            index to the array element
	 */
	private Object uncheckedGetElement(int index) {

		if (elementIsNull[index]) {
			return null;
		} else {
			if (!elementInited[index]) {
				elementInited[index] = true;
				if (arrayElements[index] == null) {
					arrayElements[index] = LazyFactory.createLazyObject((oi)
							.getListElementObjectInspector());
				}
				arrayElements[index].init(bytes, elementStart[index],
						elementLength[index]);
			}
		}
		return arrayElements[index].getObject();
	}

	/**
	 * Returns the array size.
	 */
	public int getListLength() {
		if (!parsed) {
			parse();
		}
		return arraySize;
	}

	/**
	 * cachedList is reused every time getList is called. Different
	 * LazyBianryArray instances cannot share the same cachedList.
	 */
	ArrayList<Object> cachedList;

	/**
	 * Returns the List of actual primitive objects. Returns null for null
	 * array.
	 */
	public List<Object> getList() {
		if (!parsed) {
			parse();
		}
		if (cachedList == null) {
			cachedList = new ArrayList<Object>(arraySize);
		} else {
			cachedList.clear();
		}
		for (int index = 0; index < arraySize; index++) {
			cachedList.add(uncheckedGetElement(index));
		}
		return cachedList;
	}
}
