/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.runtime.matrix.data;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.io.Writable;
import org.apache.sysds.api.DMLException;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.functionobjects.ValueComparisonFunction;
import org.apache.sysds.runtime.instructions.cp.*;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
import org.apache.sysds.runtime.transform.encode.EncoderRecode;
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.runtime.util.UtilFunctions;

@SuppressWarnings({"rawtypes","unchecked"}) //allow generic native arrays
public class FrameBlock implements CacheBlock, Externalizable  
{
	private static final long serialVersionUID = -3993450030207130665L;
	
	public static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements, size of default matrix block 

	//internal configuration
	private static final boolean REUSE_RECODE_MAPS = true;
	
	/** The number of rows of the FrameBlock */
	private int _numRows = -1;
	
	/** The schema of the data frame as an ordered list of value types */
	private ValueType[] _schema = null; 
	
	/** The column names of the data frame as an ordered list of strings, allocated on-demand */
	private String[] _colnames = null;
	
	/** The column meta data (number of distinct values, mv value) as an ordered list with one entry per column */
	private ColumnMetadata[] _colmeta = null;
	
	/** The data frame data as an ordered list of columns */
	private Array[] _coldata = null;
	
	/**
	 * Creates an empty frame block with zero rows; schema, column names,
	 * column meta data, and column data are left unallocated.
	 */
	public FrameBlock() {
		this._numRows = 0;
	}
	
	/**
	 * Copy constructor for frame blocks. The schema and column names arrays
	 * of the given block are taken over by reference (column names are not
	 * materialized if absent), the contents are transferred via
	 * {@code copy(that)}, and finally the column meta data entries of the
	 * given block are taken over via {@link #setColumnMetadata(ColumnMetadata[])},
	 * which copies the entry references into the meta data array of this block.
	 * 
	 * @param that frame block to copy from
	 */
	public FrameBlock(FrameBlock that) {
		this(that.getSchema(), that.getColumnNames(false));
		this.copy(that);
		this.setColumnMetadata(that.getColumnMetadata());
	}
	
	public FrameBlock(int ncols, ValueType vt) {
		this();
		_schema = UtilFunctions.nCopies(ncols, vt);
		_colnames = null; //default not materialized
		_colmeta = new ColumnMetadata[ncols];
		for( int j=0; j<ncols; j++ )
			_colmeta[j] = new ColumnMetadata(0);
	}
	
	/**
	 * Creates a frame block with the given schema and no rows; delegates
	 * to the (schema, data) constructor with an empty data array, hence
	 * column names are not materialized.
	 * 
	 * @param schema schema as array of ValueTypes
	 */
	public FrameBlock(ValueType[] schema) {
		this(schema, new String[0][]);
	}
	
	/**
	 * Creates a frame block with the given schema and column names and
	 * no rows; delegates to the (schema, names, data) constructor with an
	 * empty data array.
	 * 
	 * @param schema schema as array of ValueTypes
	 * @param names column names, taken over by reference
	 */
	public FrameBlock(ValueType[] schema, String[] names) {
		this(schema, names, new String[0][]);
	}
	
	/**
	 * Creates a frame block with the given schema and appends the given
	 * string-encoded rows; column names are passed as null and hence
	 * not materialized.
	 * 
	 * @param schema schema as array of ValueTypes
	 * @param data rows of string-encoded cell values
	 */
	public FrameBlock(ValueType[] schema, String[][] data) {
		//default column names not materialized
		this(schema, null, data);

	}
	
	public FrameBlock(ValueType[] schema, String[] names, String[][] data) {
		_numRows = 0; //maintained on append
		_schema = schema;
		_colnames = names;
		_colmeta = new ColumnMetadata[_schema.length];
		for( int j=0; j<_schema.length; j++ )
			_colmeta[j] = new ColumnMetadata(0);
		for( int i=0; i<data.length; i++ )
			appendRow(data[i]);
	}
	
	/**
	 * Returns the current row count of this frame block.
	 * 
	 * @return number of rows
	 */
	@Override
	public int getNumRows() {
		return this._numRows;
	}

	/**
	 * Overwrites the row counter of this frame block; only the counter
	 * is updated, the column data is not touched.
	 * 
	 * @param numRows new number of rows
	 */
	public void setNumRows(int numRows) {
		this._numRows = numRows;
	}
	
	/**
	 * Returns the number of columns as defined by the schema, or
	 * zero if no schema has been set.
	 * 
	 * @return number of columns
	 */
	@Override
	public int getNumColumns() {
		if( _schema == null )
			return 0;
		return _schema.length;
	}
	
	/**
	 * Returns the schema array of this frame block (by reference,
	 * no copy is made).
	 * 
	 * @return schema as array of ValueTypes, null if not set
	 */
	public ValueType[] getSchema() {
		return this._schema;
	}
	
	/**
	 * Replaces the schema of this frame block with the given array
	 * (taken over by reference); no other state is adjusted.
	 * 
	 * @param schema schema as array of ValueTypes
	 */
	public void setSchema(ValueType[] schema) {
		this._schema = schema;
	}

	/**
	 * Returns the column names of this frame block and materializes
	 * the default column names first if none are present.
	 * 
	 * @return column names
	 */
	public String[] getColumnNames() {
		final boolean materialize = true;
		return getColumnNames(materialize);
	}
		
	/**
	 * Returns the column names of this frame block. If no names are
	 * present and allocation is requested, default column names are
	 * created and stored; otherwise the current names (possibly null)
	 * are returned as is.
	 * 
	 * @param alloc if true, create default column names when absent
	 * @return array of column names, null if absent and not allocated
	 */
	public String[] getColumnNames(boolean alloc) {
		if( alloc && _colnames == null )
			_colnames = createColNames(getNumColumns());
		return _colnames;
	}
	
	/**
	 * Returns the name of the requested column and materializes the
	 * default column names first if none are present.
	 * 
	 * @param c column index, 0-based
	 * @return column name
	 */
	public String getColumnName(int c) {
		String[] names = _colnames;
		if( names == null ) {
			names = createColNames(getNumColumns());
			_colnames = names;
		}
		return names[c];
	}

	/**
	 * Replaces the column names of this frame block with the given
	 * array (taken over by reference).
	 * 
	 * @param colnames column names
	 */
	public void setColumnNames(String[] colnames) {
		this._colnames = colnames;
	}

	/**
	 * Returns the column meta data array of this frame block (by
	 * reference, no copy is made).
	 * 
	 * @return array of column meta data
	 */
	public ColumnMetadata[] getColumnMetadata() {
		return this._colmeta;
	}

	/**
	 * Returns the meta data of the requested column.
	 * 
	 * @param c column index, 0-based
	 * @return column meta data
	 */
	public ColumnMetadata getColumnMetadata(int c) {
		return this._colmeta[c];
	}

	/**
	 * Indicates if the meta data of all columns is in default state,
	 * see {@link #isColumnMetadataDefault(int)}.
	 * 
	 * @return true if every column has default meta data (also for zero columns)
	 */
	public boolean isColumnMetadataDefault() {
		for( int c=0; c<getNumColumns(); c++ ) {
			if( !isColumnMetadataDefault(c) )
				return false;
		}
		return true;
	}

	/**
	 * Indicates if the meta data of the given column is in default
	 * state, i.e., it has no mv value and zero distinct values.
	 * 
	 * @param c column index, 0-based
	 * @return true if the column meta data is default
	 */
	public boolean isColumnMetadataDefault(int c) {
		ColumnMetadata meta = _colmeta[c];
		if( meta.getMvValue() != null )
			return false;
		return meta.getNumDistinct() == 0;
	}

	/**
	 * Copies the entry references of the given meta data array into the
	 * existing meta data array of this block; as many entries are copied
	 * as this block's meta data array holds.
	 * 
	 * @param colmeta source array of column meta data
	 */
	public void setColumnMetadata(ColumnMetadata[] colmeta) {
		final int len = _colmeta.length;
		System.arraycopy(colmeta, 0, _colmeta, 0, len);
	}

	/**
	 * Replaces the meta data of the given column.
	 * 
	 * @param c column index, 0-based
	 * @param colmeta new column meta data
	 */
	public void setColumnMetadata(int c, ColumnMetadata colmeta) {
		this._colmeta[c] = colmeta;
	}
	
	/**
	 * Builds a new map from column names to column IDs, where IDs are
	 * 1-based column indexes. Default column names are materialized
	 * if no names are present.
	 * 
	 * @return map of column name keys and id values
	 */
	public Map<String,Integer> getColumnNameIDMap() {
		final int ncol = getNumColumns();
		Map<String, Integer> name2id = new HashMap<>();
		for( int id=1; id<=ncol; id++ )
			name2id.put(getColumnName(id-1), id);
		return name2id;
	}
	
	/**
	 * Allocates the column data structures if necessary, i.e., if a schema is
	 * specified but not all column arrays exist yet. If all columns exist
	 * already, rows of null values are appended until the requested number
	 * of rows is reached. Otherwise, default column meta data is created if
	 * absent or of mismatching length, and every column is newly allocated
	 * with the requested number of rows according to its value type.
	 * 
	 * @param numRows number of rows
	 */
	public void ensureAllocatedColumns(int numRows) {
		final int ncol = _schema.length;
		
		//all columns exist: only pad missing rows and exit
		if( _coldata != null && ncol == _coldata.length ) {
			if( _numRows < numRows ) {
				String[] emptyRow = new String[getNumColumns()];
				for( int missing = numRows - _numRows; missing > 0; missing-- )
					appendRow(emptyRow);
			}
			return;
		}
		
		//default meta data if absent or of wrong length
		if( _colmeta == null || _colmeta.length != ncol ) {
			_colmeta = new ColumnMetadata[ncol];
			for( int c=0; c<ncol; c++ )
				_colmeta[c] = new ColumnMetadata(0);
		}
		
		//one typed array of requested length per column
		_coldata = new Array[ncol];
		for( int c=0; c<ncol; c++ ) {
			final Array col;
			switch( _schema[c] ) {
				case STRING:
					col = new StringArray(new String[numRows]);
					break;
				case BOOLEAN:
					col = new BooleanArray(new boolean[numRows]);
					break;
				case INT32:
					col = new IntegerArray(new int[numRows]);
					break;
				case INT64:
					col = new LongArray(new long[numRows]);
					break;
				case FP32:
					col = new FloatArray(new float[numRows]);
					break;
				case FP64:
					col = new DoubleArray(new double[numRows]);
					break;
				default:
					throw new RuntimeException("Unsupported value type: "+_schema[c]);
			}
			_coldata[c] = col;
		}
		_numRows = numRows;
	}
	
	/**
	 * Verifies that a column of the given length matches the current
	 * number of rows, provided that at least one column exists already.
	 * 
	 * @param newlen number of rows to compare with existing number of rows
	 * @throws RuntimeException if columns exist and the lengths differ
	 */
	public void ensureColumnCompatibility(int newlen) {
		boolean hasColumns = (_coldata != null) && (_coldata.length > 0);
		if( !hasColumns || _numRows == newlen )
			return;
		throw new RuntimeException("Mismatch in number of rows: "+newlen+" (expected: "+_numRows+")");
	}

	/**
	 * Creates the default column names for the given number of columns,
	 * starting at column ID 1.
	 * 
	 * @param size number of column names
	 * @return array of default column names
	 */
	public static String[] createColNames(int size) {
		final int offset = 0;
		return createColNames(offset, size);
	}

	/**
	 * Creates default column names for the column IDs off+1 to off+size.
	 * 
	 * @param off offset added to the 1-based column IDs
	 * @param size number of column names
	 * @return array of default column names
	 */
	public static String[] createColNames(int off, int size) {
		String[] names = new String[size];
		for( int k=0; k<size; k++ )
			names[k] = createColName(off+k+1);
		return names;
	}

	/**
	 * Creates the default column name for the given column ID, i.e.,
	 * the prefix "C" followed by the ID.
	 * 
	 * @param i column ID
	 * @return default column name
	 */
	public static String createColName(int i) {
		return "C".concat(Integer.toString(i));
	}

	/**
	 * Indicates if column names are materialized and all of them equal
	 * the default column names. Returns false if no column names are
	 * materialized.
	 * 
	 * @return true if names exist and all are default names
	 */
	public boolean isColNamesDefault() {
		if( _colnames == null )
			return false;
		for( int c=0; c<getNumColumns(); c++ ) {
			if( !isColNameDefault(c) )
				return false;
		}
		return true;
	}

	/**
	 * Indicates if the name of the given column is the default name,
	 * which is the case if no names are materialized or the name equals
	 * "C" followed by the 1-based column index.
	 * 
	 * @param i column index, 0-based
	 * @return true if the column name is default
	 */
	public boolean isColNameDefault(int i) {
		if( _colnames == null )
			return true;
		String expected = "C"+(i+1);
		return _colnames[i].equals(expected);
	}

	/**
	 * Recomputes, for every column, the number of non-null cells and
	 * stores this count as the number of distinct values in the
	 * meta data of the column.
	 */
	public void recomputeColumnCardinality() {
		final int ncol = getNumColumns();
		for( int c=0; c<ncol; c++ ) {
			int nonNull = 0;
			for( int r=0; r<getNumRows(); r++ ) {
				if( get(r, c) != null )
					nonNull++;
			}
			_colmeta[c].setNumDistinct(nonNull);
		}
	}

	///////
	// basic get and set functionality
	
	/**
	 * Returns the value at position (r,c) as returned by the
	 * underlying column array.
	 * 
	 * @param r	row index, 0-based
	 * @param c	column index, 0-based
	 * @return object of the value at specified position
	 */
	public Object get(int r, int c) {
		Array column = _coldata[c];
		return column.get(r);
	}
	
	/**
	 * Sets the value at position (r,c). The given object is first
	 * converted according to the value type of the column as defined
	 * by the schema and then stored in the column array.
	 * 
	 * @param r row index
	 * @param c column index
	 * @param val value to set at specified position
	 */
	public void set(int r, int c, Object val) {
		_coldata[c].set(
			r,
			UtilFunctions.objectToObject(_schema[c], val));
	}

	/**
	 * Resets this frame block. If requested, schema and column names are
	 * dropped and all non-default column meta data entries are replaced by
	 * default entries. Independent of that, every existing column array is
	 * reset to the given number of rows. The row counter of the frame block
	 * itself is not modified.
	 * 
	 * @param nrow number of rows passed to the reset of each column
	 * @param clearMeta if true, clear schema, column names, and meta data
	 */
	public void reset(int nrow, boolean clearMeta) {
		if( clearMeta ) {
			_schema = null;
			_colnames = null;
			final int nmeta = (_colmeta != null) ? _colmeta.length : 0;
			for( int c=0; c<nmeta; c++ ) {
				if( isColumnMetadataDefault(c) )
					continue;
				_colmeta[c] = new ColumnMetadata(0);
			}
		}
		if( _coldata == null )
			return;
		for( Array column : _coldata )
			column.reset(nrow);
	}

	/**
	 * Resets this frame block to zero rows per column and clears
	 * schema, column names, and column meta data.
	 */
	public void reset() {
		final int nrow = 0;
		final boolean clearMeta = true;
		reset(nrow, clearMeta);
	}
	

	/**
	 * Appends a row to the end of the data frame, where the row fields
	 * are boxed objects according to the schema. Column data structures
	 * are allocated first if necessary; field j is appended to column j.
	 * 
	 * @param row array of objects
	 */
	public void appendRow(Object[] row) {
		ensureAllocatedColumns(0);
		int c = 0;
		for( Object cell : row )
			_coldata[c++].append(cell);
		_numRows++;
	}
	
	/**
	 * Appends a row to the end of the data frame, where the row fields
	 * are string encoded. Column data structures are allocated first
	 * if necessary; field j is appended to column j.
	 * 
	 * @param row array of strings
	 */
	public void appendRow(String[] row) {
		ensureAllocatedColumns(0);
		int c = 0;
		for( String cell : row )
			_coldata[c++].append(cell);
		_numRows++;
	}
	
	/**
	 * Append a column of value type STRING as the last column of 
	 * the data frame. The given array is wrapped but not copied 
	 * and hence might be updated in the future. The column meta data
	 * is extended such that every column, including the appended one,
	 * has an entry (default meta data where none existed).
	 * 
	 * @param col array of strings
	 */
	public void appendColumn(String[] col) {
		ensureColumnCompatibility(col.length);
		String[] colnames = getColumnNames(); //before schema modification
		_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.STRING);
		_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
		_coldata = (_coldata==null) ? new Array[]{new StringArray(col)} :
			(Array[]) ArrayUtils.add(_coldata, new StringArray(col));
		//keep meta data aligned with the extended schema, otherwise
		//per-column meta data accesses fail for the appended column
		ColumnMetadata[] colmeta = new ColumnMetadata[_schema.length];
		for( int j=0; j<colmeta.length; j++ )
			colmeta[j] = (_colmeta != null && j < _colmeta.length && _colmeta[j] != null) ?
				_colmeta[j] : new ColumnMetadata(0);
		_colmeta = colmeta;
		_numRows = col.length;
	}
	
	/**
	 * Append a column of value type BOOLEAN as the last column of 
	 * the data frame. The given array is wrapped but not copied 
	 * and hence might be updated in the future. The column meta data
	 * is extended such that every column, including the appended one,
	 * has an entry (default meta data where none existed).
	 * 
	 * @param col array of booleans
	 */
	public void appendColumn(boolean[] col) {
		ensureColumnCompatibility(col.length);
		String[] colnames = getColumnNames(); //before schema modification
		_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.BOOLEAN);
		_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
		_coldata = (_coldata==null) ? new Array[]{new BooleanArray(col)} :
			(Array[]) ArrayUtils.add(_coldata, new BooleanArray(col));
		//keep meta data aligned with the extended schema, otherwise
		//per-column meta data accesses fail for the appended column
		ColumnMetadata[] colmeta = new ColumnMetadata[_schema.length];
		for( int j=0; j<colmeta.length; j++ )
			colmeta[j] = (_colmeta != null && j < _colmeta.length && _colmeta[j] != null) ?
				_colmeta[j] : new ColumnMetadata(0);
		_colmeta = colmeta;
		_numRows = col.length;
	}
	
	/**
	 * Append a column of value type INT32 as the last column of 
	 * the data frame. The given array is wrapped but not copied 
	 * and hence might be updated in the future. The column meta data
	 * is extended such that every column, including the appended one,
	 * has an entry (default meta data where none existed).
	 * 
	 * @param col array of ints
	 */
	public void appendColumn(int[] col) {
		ensureColumnCompatibility(col.length);
		String[] colnames = getColumnNames(); //before schema modification
		_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.INT32);
		_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
		_coldata = (_coldata==null) ? new Array[]{new IntegerArray(col)} :
			(Array[]) ArrayUtils.add(_coldata, new IntegerArray(col));
		//keep meta data aligned with the extended schema, otherwise
		//per-column meta data accesses fail for the appended column
		ColumnMetadata[] colmeta = new ColumnMetadata[_schema.length];
		for( int j=0; j<colmeta.length; j++ )
			colmeta[j] = (_colmeta != null && j < _colmeta.length && _colmeta[j] != null) ?
				_colmeta[j] : new ColumnMetadata(0);
		_colmeta = colmeta;
		_numRows = col.length;
	}
	/**
	 * Append a column of value type INT64 as the last column of
	 * the data frame. The given array is wrapped but not copied
	 * and hence might be updated in the future. The column meta data
	 * is extended such that every column, including the appended one,
	 * has an entry (default meta data where none existed).
	 *
	 * @param col array of longs
	 */
	public void appendColumn(long[] col) {
		ensureColumnCompatibility(col.length);
		String[] colnames = getColumnNames(); //before schema modification
		_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.INT64);
		_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
		_coldata = (_coldata==null) ? new Array[]{new LongArray(col)} :
			(Array[]) ArrayUtils.add(_coldata, new LongArray(col));
		//keep meta data aligned with the extended schema, otherwise
		//per-column meta data accesses fail for the appended column
		ColumnMetadata[] colmeta = new ColumnMetadata[_schema.length];
		for( int j=0; j<colmeta.length; j++ )
			colmeta[j] = (_colmeta != null && j < _colmeta.length && _colmeta[j] != null) ?
				_colmeta[j] : new ColumnMetadata(0);
		_colmeta = colmeta;
		_numRows = col.length;
	}
	
	/**
	 * Append a column of value type FP32 as the last column of
	 * the data frame. The given array is wrapped but not copied
	 * and hence might be updated in the future. The column meta data
	 * is extended such that every column, including the appended one,
	 * has an entry (default meta data where none existed).
	 * 
	 * @param col array of floats
	 */
	public void appendColumn(float[] col) {
		ensureColumnCompatibility(col.length);
		String[] colnames = getColumnNames(); //before schema modification
		_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.FP32);
		_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
		_coldata = (_coldata==null) ? new Array[]{new FloatArray(col)} :
			(Array[]) ArrayUtils.add(_coldata, new FloatArray(col));
		//keep meta data aligned with the extended schema, otherwise
		//per-column meta data accesses fail for the appended column
		ColumnMetadata[] colmeta = new ColumnMetadata[_schema.length];
		for( int j=0; j<colmeta.length; j++ )
			colmeta[j] = (_colmeta != null && j < _colmeta.length && _colmeta[j] != null) ?
				_colmeta[j] : new ColumnMetadata(0);
		_colmeta = colmeta;
		_numRows = col.length;
	}
	/**
	 * Append a column of value type FP64 as the last column of
	 * the data frame. The given array is wrapped but not copied
	 * and hence might be updated in the future. The column meta data
	 * is extended such that every column, including the appended one,
	 * has an entry (default meta data where none existed).
	 *
	 * @param col array of doubles
	 */
	public void appendColumn(double[] col) {
		ensureColumnCompatibility(col.length);
		String[] colnames = getColumnNames(); //before schema modification
		_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.FP64);
		_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
		_coldata = (_coldata==null) ? new Array[]{new DoubleArray(col)} :
			(Array[]) ArrayUtils.add(_coldata, new DoubleArray(col));
		//keep meta data aligned with the extended schema, otherwise
		//per-column meta data accesses fail for the appended column
		ColumnMetadata[] colmeta = new ColumnMetadata[_schema.length];
		for( int j=0; j<colmeta.length; j++ )
			colmeta[j] = (_colmeta != null && j < _colmeta.length && _colmeta[j] != null) ?
				_colmeta[j] : new ColumnMetadata(0);
		_colmeta = colmeta;
		_numRows = col.length;
	}
	
	/**
	 * Append a set of columns of value type FP64 at the end of the frame
	 * in order to avoid repeated allocation with appendColumn. The given 
	 * arrays are wrapped but not copied and hence might be updated in the
	 * future. The column meta data is extended such that every column,
	 * including the appended ones, has an entry (default meta data where
	 * none existed). An empty set of columns leaves the frame unchanged.
	 * 
	 * @param cols 2d array of doubles, one inner array per column
	 */
	public void appendColumns(double[][] cols) {
		int ncol = cols.length;
		if( ncol == 0 )
			return; //nothing to append, avoids the access to cols[0] below
		boolean empty = (_schema == null);
		ValueType[] tmpSchema = UtilFunctions.nCopies(ncol, ValueType.FP64);
		Array[] tmpData = new Array[ncol];
		for( int j=0; j<ncol; j++ )
			tmpData[j] = new DoubleArray(cols[j]);
		_colnames = empty ? null : (String[]) ArrayUtils.addAll(getColumnNames(), 
			createColNames(getNumColumns(), ncol)); //before schema modification
		_schema = empty ? tmpSchema : (ValueType[]) ArrayUtils.addAll(_schema, tmpSchema); 
		_coldata = empty ? tmpData : (Array[]) ArrayUtils.addAll(_coldata, tmpData);
		//keep meta data aligned with the extended schema, otherwise
		//per-column meta data accesses fail for the appended columns
		ColumnMetadata[] colmeta = new ColumnMetadata[_schema.length];
		for( int j=0; j<colmeta.length; j++ )
			colmeta[j] = (_colmeta != null && j < _colmeta.length && _colmeta[j] != null) ?
				_colmeta[j] : new ColumnMetadata(0);
		_colmeta = colmeta;
		_numRows = cols[0].length;
	}

	/**
	 * Returns the underlying native array of the given column, i.e., the
	 * array wrapped by the column's array object, selected by the value
	 * type of the column in the schema. All value types that can be
	 * allocated by this frame block are covered.
	 * 
	 * @param c column index, 0-based
	 * @return native array (String[], boolean[], int[], long[], float[], or
	 *         double[]), or null for any other value type
	 */
	public Object getColumnData(int c) {
		switch(_schema[c]) {
			case STRING:  return ((StringArray)_coldata[c])._data; 
			case BOOLEAN: return ((BooleanArray)_coldata[c])._data;
			case INT32:   return ((IntegerArray)_coldata[c])._data;
			case INT64:   return ((LongArray)_coldata[c])._data;
			case FP32:    return ((FloatArray)_coldata[c])._data;
			case FP64:    return ((DoubleArray)_coldata[c])._data;
			default:      return null;
	 	}
	}
	
	/**
	 * Returns the array object that holds the data of the given column
	 * (by reference).
	 * 
	 * @param c column index, 0-based
	 * @return column array
	 */
	public Array getColumn(int c) {
		Array column = _coldata[c];
		return column;
	}
	
	/**
	 * Replaces the array object of the given column. If no column data
	 * exists yet, the array of columns is allocated first according to
	 * the current number of columns.
	 * 
	 * @param c column index, 0-based
	 * @param column column array, taken over by reference
	 */
	public void setColumn(int c, Array column) {
		Array[] columns = _coldata;
		if( columns == null ) {
			columns = new Array[getNumColumns()];
			_coldata = columns;
		}
		columns[c] = column;
	}
	
	/**
	 * Creates a row iterator over all rows of the frame, where all
	 * fields are encoded as strings independent of their value types.
	 * 
	 * @return string array iterator
	 */
	public Iterator<String[]> getStringRowIterator() {
		final int rowEnd = _numRows;
		return new StringRowIterator(0, rowEnd);
	}
	
	/**
	 * Creates a row iterator over all rows of the frame, where the
	 * selected fields are encoded as strings independent of their
	 * value types.
	 * 
	 * @param cols column selection, 1-based
	 * @return string array iterator
	 */
	public Iterator<String[]> getStringRowIterator(int[] cols) {
		final int rowEnd = _numRows;
		return new StringRowIterator(0, rowEnd, cols);
	}
	
	/**
	 * Creates a row iterator over the given row range of the frame,
	 * where all fields are encoded as strings independent of their
	 * value types.
	 * 
	 * @param rl lower row index
	 * @param ru upper row index
	 * @return string array iterator
	 */
	public Iterator<String[]> getStringRowIterator(int rl, int ru) {
		Iterator<String[]> rows = new StringRowIterator(rl, ru);
		return rows;
	}
	
	/**
	 * Creates a row iterator over the given row range of the frame,
	 * where the selected fields are encoded as strings independent of
	 * their value types.
	 * 
	 * @param rl lower row index
	 * @param ru upper row index
	 * @param cols column selection, 1-based
	 * @return string array iterator
	 */
	public Iterator<String[]> getStringRowIterator(int rl, int ru, int[] cols) {
		Iterator<String[]> rows = new StringRowIterator(rl, ru, cols);
		return rows;
	}
	
	/**
	 * Creates a row iterator over all rows of the frame, where all
	 * fields are encoded as boxed objects according to their value types.
	 * 
	 * @return object array iterator
	 */
	public Iterator<Object[]> getObjectRowIterator() {
		final int rowEnd = _numRows;
		return new ObjectRowIterator(0, rowEnd);
	}
	
	/**
	 * Creates a row iterator over all rows of the frame and configures
	 * it with the provided target schema, such that fields are encoded
	 * as boxed objects according to the value types of that schema.
	 * 
	 * @param schema target schema of objects
	 * @return object array iterator
	 */
	public Iterator<Object[]> getObjectRowIterator(ValueType[] schema) {
		ObjectRowIterator rows = new ObjectRowIterator(0, _numRows);
		rows.setSchema(schema);
		return rows;
	}
	
	/**
	 * Creates a row iterator over all rows of the frame, where the
	 * selected fields are encoded as boxed objects according to their
	 * value types.
	 * 
	 * @param cols column selection, 1-based
	 * @return object array iterator
	 */
	public Iterator<Object[]> getObjectRowIterator(int[] cols) {
		final int rowEnd = _numRows;
		return new ObjectRowIterator(0, rowEnd, cols);
	}
	
	/**
	 * Creates a row iterator over the given row range of the frame,
	 * where all fields are encoded as boxed objects according to their
	 * value types.
	 * 
	 * @param rl lower row index
	 * @param ru upper row index
	 * @return object array iterator
	 */
	public Iterator<Object[]> getObjectRowIterator(int rl, int ru) {
		Iterator<Object[]> rows = new ObjectRowIterator(rl, ru);
		return rows;
	}
	
	/**
	 * Creates a row iterator over the given row range of the frame,
	 * where the selected fields are encoded as boxed objects according
	 * to their value types.
	 * 
	 * @param rl lower row index
	 * @param ru upper row index
	 * @param cols column selection, 1-based
	 * @return object array iterator
	 */
	public Iterator<Object[]> getObjectRowIterator(int rl, int ru, int[] cols) {
		Iterator<Object[]> rows = new ObjectRowIterator(rl, ru, cols);
		return rows;
	}

	///////
	// serialization / deserialization (implementation of writable and externalizable)
	
	/**
	 * Serializes this frame block. The header consists of the number of
	 * rows, the number of columns, and a flag indicating default meta data
	 * (default column names and default column meta data). Per column, the
	 * ordinal of the value type is written, followed by the column name,
	 * number of distinct values, and mv value (empty string if null) in case
	 * of non-default meta data, and finally the column data.
	 * 
	 * @param out data output to write to
	 * @throws IOException if the underlying output fails
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		final boolean defaultMeta = isColNamesDefault() && isColumnMetadataDefault();
		
		//header (rows, cols, default flag)
		out.writeInt(getNumRows());
		out.writeInt(getNumColumns());
		out.writeBoolean(defaultMeta);
		
		//columns (value type, optional meta data, data)
		final int ncol = getNumColumns();
		for( int c=0; c<ncol; c++ ) {
			out.writeByte(_schema[c].ordinal());
			if( !defaultMeta ) {
				out.writeUTF(getColumnName(c));
				out.writeLong(_colmeta[c].getNumDistinct());
				String mv = _colmeta[c].getMvValue();
				out.writeUTF( (mv != null) ? mv : "" );
			}
			_coldata[c].write(out);
		}
	}

	/**
	 * Deserializes this frame block from the format produced by
	 * {@link #write(DataOutput)}. Existing schema, column name, meta data,
	 * and column arrays are reused if their length matches the number of
	 * columns read. In case of default meta data, the default column names
	 * (1-based column IDs, consistent with {@link #createColNames(int)})
	 * and default column meta data are restored.
	 * 
	 * @param in data input to read from
	 * @throws IOException if the underlying input fails or an unsupported
	 *         value type is encountered
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		//read head (rows, cols)
		_numRows = in.readInt();
		int numCols = in.readInt();
		boolean isDefaultMeta = in.readBoolean();
		//allocate schema/meta data arrays
		_schema = (_schema!=null && _schema.length==numCols) ? 
				_schema : new ValueType[numCols];
		_colnames = (_colnames != null && _colnames.length==numCols) ? 
				_colnames : new String[numCols];
		_colmeta = (_colmeta != null && _colmeta.length==numCols) ? 
				_colmeta : new ColumnMetadata[numCols];
		_coldata = (_coldata!=null && _coldata.length==numCols) ? 
				_coldata : new Array[numCols];
		//read columns (value type, meta, data)
		for( int j=0; j<numCols; j++ ) {
			ValueType vt = ValueType.values()[in.readByte()];
			//default names use 1-based column IDs (C1..Cn); using the 0-based
			//index here would turn default names into non-default ones
			String name = isDefaultMeta ? createColName(j+1) : in.readUTF();
			long ndistinct = isDefaultMeta ? 0 : in.readLong();
			String mvvalue = isDefaultMeta ? null : in.readUTF();
			Array arr = null;
			switch( vt ) {
				case STRING:  arr = new StringArray(new String[_numRows]); break;
				case BOOLEAN: arr = new BooleanArray(new boolean[_numRows]); break;
				case INT64:     arr = new LongArray(new long[_numRows]); break;
				case FP64:  arr = new DoubleArray(new double[_numRows]); break;
				case INT32: arr = new IntegerArray(new int[_numRows]); break;
				case FP32:  arr = new FloatArray(new float[_numRows]); break;
				default: throw new IOException("Unsupported value type: "+vt);
			}
			arr.readFields(in);
			_schema[j] = vt;
			_colnames[j] = name;
			//an empty mv value on the wire encodes a null mv value
			_colmeta[j] = new ColumnMetadata(ndistinct, 
					(mvvalue==null || mvvalue.isEmpty()) ? null : mvvalue);
			_coldata[j] = arr;
		}
	}

	/**
	 * Externalizable serialization, which redirects to the writable
	 * implementation {@link #write(DataOutput)}.
	 * 
	 * @param out object output to write to
	 * @throws IOException if the underlying output fails
	 */
	@Override
	public void writeExternal(ObjectOutput out) throws IOException {
		//redirect serialization to writable impl
		this.write(out);
	}

	/**
	 * Externalizable deserialization, which redirects to the writable
	 * implementation {@link #readFields(DataInput)}.
	 * 
	 * @param in object input to read from
	 * @throws IOException if the underlying input fails
	 * @throws ClassNotFoundException declared by the interface
	 */
	@Override
	public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
		//redirect deserialization to writable impl
		this.readFields(in);
	}
	
	////////
	// CacheBlock implementation
	
	/**
	 * Estimates the in-memory size of this frame block in bytes, as the
	 * sum of the block header, the schema array, the materialized column
	 * names, the column meta data, and the column data. Fixed-width columns
	 * are accounted with their width per cell (1 byte boolean, 4 bytes
	 * INT32/FP32, 8 bytes INT64/FP64), string columns with the estimated
	 * size of every string.
	 * 
	 * @return estimated in-memory size in bytes
	 */
	@Override
	public long getInMemorySize() {
		//frame block header
		long size = 16 + 4; //object, num rows
		
		//schema array (overhead and int entries)
		int clen = getNumColumns();
		size += 8 + 32 + clen * 4;
		
		//colname array (overhead and string entries)
		size += 8 + ((_colnames!=null) ? 32 : 0);
		for( int j=0; j<clen && _colnames!=null; j++ )
			size += getInMemoryStringSize(getColumnName(j));
		
		//meta data array (overhead and entries)
		size += 8 + 32;
		for( int j=0; j<clen; j++ ) {
			size += 16 + 8 + 8 //object, long num distinct, ref mv 
				+ getInMemoryStringSize(_colmeta[j].getMvValue());
		}
		
		//data array (overhead and entries)
		size += 8 + 32 + clen * (16+4+8+32);
		for( int j=0; j<clen; j++ ) {
			switch( _schema[j] ) {
				case BOOLEAN: size += _numRows; break;
				case INT32:
				case FP32: size += 4L*_numRows; break; //long arithmetic to avoid int overflow
				case INT64:
				case FP64: size += 8L*_numRows; break; //long arithmetic to avoid int overflow
				case STRING: 
					StringArray arr = (StringArray)_coldata[j];
					for( int i=0; i<_numRows; i++ )
						size += getInMemoryStringSize(arr.get(i));
					break;
				default: //not applicable	
			}
		}
		
		return size;
	}
	
	/**
	 * Computes the serialized size of this frame block in bytes according
	 * to the format of {@link #write(DataOutput)}: a header of two ints and
	 * one boolean, and per column one byte for the value type, the optional
	 * meta data (column name, number of distinct values, mv value), and the
	 * column data.
	 * 
	 * @return serialized size in bytes
	 */
	@Override
	public long getExactSerializedSize() {
		//header: 2xint, boolean
		long size = 9;
		
		//column sizes
		boolean isDefaultMeta = isColNamesDefault()
				&& isColumnMetadataDefault();
		for( int j=0; j<getNumColumns(); j++ ) {
			size += 1; //column schema
			if( !isDefaultMeta ) {
				size += IOUtilFunctions.getUTFSize(getColumnName(j));
				size += 8;
				size += IOUtilFunctions.getUTFSize(_colmeta[j].getMvValue());
			}
			switch( _schema[j] ) {
				case BOOLEAN: size += _numRows; break;
				//NOTE(review): assumes 4 bytes per cell (writeInt/writeFloat)
				//for INT32/FP32 columns - confirm against the array writers
				case INT32:
				case FP32: size += 4L*_numRows; break;
				case INT64:
				case FP64: size += 8L*_numRows; break; //long arithmetic to avoid int overflow
				case STRING: 
					StringArray arr = (StringArray)_coldata[j];
					for( int i=0; i<_numRows; i++ )
						size += IOUtilFunctions.getUTFSize(arr.get(i));
					break;
				default: //not applicable	
			}
		}
		
		return size;
	}
	
	/**
	 * Indicates if this frame block is shallow serialized; delegates to
	 * {@link #isShallowSerialize(boolean)} without conversion.
	 * 
	 * @return true if shallow serialize
	 */
	@Override
	public boolean isShallowSerialize() {
		final boolean inclConvert = false;
		return isShallowSerialize(inclConvert);
	}
	
	/**
	 * Indicates if this frame block is shallow serialized, which is the
	 * case if the schema contains no string column: a frame block is
	 * always dense but strings have large array overhead per cell.
	 * 
	 * @param inclConvert not used by this implementation
	 * @return true if no column is of value type STRING
	 */
	@Override
	public boolean isShallowSerialize(boolean inclConvert) {
		for( ValueType vt : _schema ) {
			if( vt == ValueType.STRING )
				return false;
		}
		return true;
	}
	
	/**
	 * Conversion into a shallow serialized block; intentionally a
	 * no-op for frame blocks.
	 */
	@Override 
	public void toShallowSerializeBlock() {
		//do nothing (not applicable).
	}
	
	/**
	 * Compaction of empty blocks; intentionally a no-op for
	 * frame blocks.
	 */
	@Override
	public void compactEmptyBlock() {
		//do nothing
	}
	
	/**
	 * Estimates the in-memory size in bytes of the given string value,
	 * composed of a fixed overhead for the string object and its char
	 * array plus the length of the string. A null string has size zero.
	 * 
	 * @param value string value
	 * @return in-memory size of string value
	 */
	private static long getInMemoryStringSize(String value) {
		if( value != null ) {
			int header = 16 + 4 + 8;            //object, hash, array ref
			int chars = 32 + value.length();    //char array
			return header + chars;
		}
		return 0;
	}
	
	/**
	 *  Performs a cell-wise value comparison of two frames with equal
	 *  dimensions (equal, not equal, less than, greater than, less than or
	 *  equal, greater than or equal), and returns a boolean frame with the
	 *  comparison result of each cell. Per column, the comparison is done
	 *  on strings if either side is a string column, otherwise on doubles
	 *  if either side is a floating point column, otherwise on longs if
	 *  either side is an integer column, and on booleans otherwise. If
	 *  one or both cells are null, the result is true only if both are null.
	 *
	 *  @param bop binary operator, only value comparison functions are supported
	 *  @param that frame block of rhs of m * n dimensions
	 *  @param out output frame block (currently not used, a new frame block is returned)
	 *  @return a boolean frameBlock
	 *  @throws DMLRuntimeException if the dimensions of the two frames differ
	 *          or the operator is not a value comparison
	 */
	public FrameBlock binaryOperations(BinaryOperator bop, FrameBlock that, FrameBlock out) {
		//both dimensions need to match because cells are compared position-wise
		if(getNumColumns() != that.getNumColumns() || getNumRows() != that.getNumRows())
			throw new DMLRuntimeException("Frame dimension mismatch "+getNumRows()+" * "+getNumColumns()+
				" != "+that.getNumRows()+" * "+that.getNumColumns());

		//compare output value, incl implicit type promotion if necessary
		if( !(bop.fn instanceof ValueComparisonFunction) )
			throw new DMLRuntimeException("Unsupported binary operation on frames (only comparisons supported)");
		ValueComparisonFunction vcomp = (ValueComparisonFunction) bop.fn;

		String[][] outputData = new String[getNumRows()][getNumColumns()];
		for (int i = 0; i < getNumColumns(); i++) {
			if (getSchema()[i] == ValueType.STRING || that.getSchema()[i] == ValueType.STRING) {
				for (int j = 0; j < getNumRows(); j++) {
					if(checkAndSetEmpty(this, that, outputData, j, i))
						continue;
					String v1 = UtilFunctions.objectToString(get(j, i));
					String v2 = UtilFunctions.objectToString(that.get(j, i));
					outputData[j][i] = String.valueOf(vcomp.compare(v1, v2));
				}
			}
			else if (getSchema()[i] == ValueType.FP64 || that.getSchema()[i] == ValueType.FP64 ||
					getSchema()[i] == ValueType.FP32 || that.getSchema()[i] == ValueType.FP32) {
				for (int j = 0; j < getNumRows(); j++) {
					if(checkAndSetEmpty(this, that, outputData, j, i))
						continue;
					double v1 = Double.parseDouble(get(j, i).toString());
					double v2 = Double.parseDouble(that.get(j, i).toString());
					outputData[j][i] = String.valueOf(vcomp.compare(v1, v2));
				}
			}
			else if (getSchema()[i] == ValueType.INT64 || that.getSchema()[i] == ValueType.INT64 ||
					getSchema()[i] == ValueType.INT32 || that.getSchema()[i] == ValueType.INT32) {
				for (int j = 0; j < getNumRows(); j++) {
					if(checkAndSetEmpty(this, that, outputData, j, i))
						continue;
					//parse as long because INT64 cells can exceed the int range
					long v1 = Long.parseLong(get(j, i).toString());
					long v2 = Long.parseLong(that.get(j, i).toString());
					outputData[j][i] = String.valueOf(vcomp.compare(v1, v2));
				}
			}
			else {
				for (int j = 0; j < getNumRows(); j++) {
					if(checkAndSetEmpty(this, that, outputData, j, i))
						continue;
					boolean v1 = Boolean.parseBoolean(get(j, i).toString());
					boolean v2 = Boolean.parseBoolean(that.get(j, i).toString());
					outputData[j][i] = String.valueOf(vcomp.compare(v1, v2));
				}
			}
		}

		return new FrameBlock(UtilFunctions.nCopies(this.getNumColumns(), ValueType.BOOLEAN), outputData);
	}
	
	/**
	 * Handles the comparison of cell (r,c) if at least one of the two
	 * cells is null: the output cell is set to "true" if both cells are
	 * null and to "false" otherwise. If both cells are non-null, the
	 * output is not modified.
	 * 
	 * @param fb1 first frame block
	 * @param fb2 second frame block
	 * @param out output data, modified in place
	 * @param r row index, 0-based
	 * @param c column index, 0-based
	 * @return true if the output cell was set, false if both cells are non-null
	 */
	private static boolean checkAndSetEmpty(FrameBlock fb1, FrameBlock fb2, String[][] out, int r, int c) {
		Object left = fb1.get(r, c);
		Object right = fb2.get(r, c);
		if( left != null && right != null )
			return false;
		out[r][c] = (left == null && right == null) ? "true" : "false";
		return true;
	}
	
	///////
	// indexing and append operations
	
	/**
	 * Left indexing with an index range, which delegates to the left
	 * indexing with explicit row and column bounds after casting the
	 * range boundaries to int.
	 * 
	 * @param rhsFrame source frame block
	 * @param ixrange index range of rows and columns
	 * @param ret output frame block, may be null
	 * @return output frame block
	 */
	public FrameBlock leftIndexingOperations(FrameBlock rhsFrame, IndexRange ixrange, FrameBlock ret) {
		final int rl = (int)ixrange.rowStart;
		final int ru = (int)ixrange.rowEnd;
		final int cl = (int)ixrange.colStart;
		final int cu = (int)ixrange.colEnd;
		return leftIndexingOperations(rhsFrame, rl, ru, cl, cu, ret);
	}

	/**
	 * Left indexing operation, which creates an output frame with the
	 * contents of this frame, where the cells of the given index range are
	 * overwritten by the cells of the source frame. The output obtains
	 * clones of the schema, column name, and column meta data arrays as well
	 * as clones of all column arrays of this frame. Source values are
	 * converted to the value type of the target column if the column
	 * value types differ.
	 * 
	 * @param rhsFrame source frame block
	 * @param rl row lower index, inclusive, 0-based
	 * @param ru row upper index, inclusive, 0-based
	 * @param cl column lower index, inclusive, 0-based
	 * @param cu column upper index, inclusive, 0-based
	 * @param ret output frame block, newly allocated if null
	 * @return output frame block
	 * @throws DMLRuntimeException if the index range exceeds the frame
	 *         dimensions or the source frame is larger than the index range
	 */
	public FrameBlock leftIndexingOperations(FrameBlock rhsFrame, int rl, int ru, int cl, int cu, FrameBlock ret) {
		// check the validity of bounds
		boolean validRows = rl >= 0 && rl < getNumRows() && ru >= rl && ru < getNumRows();
		boolean validCols = cl >= 0 && cu >= cl && cu < getNumColumns();
		if( !validRows || !validCols ) {
			throw new DMLRuntimeException("Invalid values for frame indexing: ["+(rl+1)+":"+(ru+1)+"," + (cl+1)+":"+(cu+1)+"] " +
							"must be within frame dimensions ["+getNumRows()+","+getNumColumns()+"].");
		}

		// source frame must not exceed the indexed range
		final int rlen = ru-rl+1;
		final int clen = cu-cl+1;
		if( rlen < rhsFrame.getNumRows() || clen < rhsFrame.getNumColumns() ) {
			throw new DMLRuntimeException("Invalid values for frame indexing: " +
					"dimensions of the source frame ["+rhsFrame.getNumRows()+"x" + rhsFrame.getNumColumns() + "] " +
					"do not match the shape of the frame specified by indices [" +
					(rl+1) +":" + (ru+1) + ", " + (cl+1) + ":" + (cu+1) + "].");
		}
		
		//allocate output frame (incl cloned schema, names, meta data arrays)
		if( ret == null )
			ret = new FrameBlock();
		ret._numRows = _numRows;
		ret._schema = _schema.clone();
		ret._colnames = (_colnames == null) ? null : _colnames.clone();
		ret._colmeta = _colmeta.clone();
		ret._coldata = new Array[getNumColumns()];
		
		//copy data to output and partial overwrite w/ rhs
		for( int c=0; c<getNumColumns(); c++ ) {
			Array target = _coldata[c].clone();
			boolean inRange = (c >= cl) && (c <= cu);
			if( inRange && _schema[c]==rhsFrame._schema[c-cl] ) {
				//fast-path for homogeneous column schemas
				target.set(rl, ru, rhsFrame._coldata[c-cl]);
			}
			else if( inRange ) {
				//general-path for heterogeneous column schemas
				for( int r=rl; r<=ru; r++ )
					target.set(r, UtilFunctions.objectToObject(
						_schema[c], rhsFrame._coldata[c-cl].get(r-rl)));
			}
			ret._coldata[c] = target;
		}
		
		return ret;
	}

	/**
	 * Right indexing with an index range, which delegates to the slice
	 * with explicit row and column bounds after casting the range
	 * boundaries to int.
	 * 
	 * @param ixrange index range of rows and columns
	 * @param ret output frame block, may be null
	 * @return output frame block
	 */
	public FrameBlock slice(IndexRange ixrange, FrameBlock ret) {
		final int rl = (int)ixrange.rowStart;
		final int ru = (int)ixrange.rowEnd;
		final int cl = (int)ixrange.colStart;
		final int cu = (int)ixrange.colEnd;
		return slice(rl, ru, cl, cu, ret);
	}
	
	/**
	 * Right indexing operations to slice a subframe out of this frame block. 
	 * Note that the existing column value types are preserved. Column meta
	 * data entries are shared with this frame; column names are only
	 * materialized in the output if this frame has non-default or
	 * non-materialized names. If all rows are selected, the column arrays
	 * are shared with this frame as well, otherwise the selected rows are
	 * copied.
	 * 
	 * @param rl row lower index, inclusive, 0-based
	 * @param ru row upper index, inclusive, 0-based
	 * @param cl column lower index, inclusive, 0-based
	 * @param cu column upper index, inclusive, 0-based
	 * @param retCache cache block, reset and reused as output if not null
	 * @return frame block
	 * @throws DMLRuntimeException if the index range exceeds the frame dimensions
	 */
	@Override
	public FrameBlock slice(int rl, int ru, int cl, int cu, CacheBlock retCache) {
		FrameBlock ret = (FrameBlock)retCache;
		// check the validity of bounds
		boolean validRows = rl >= 0 && rl < getNumRows() && ru >= rl && ru < getNumRows();
		boolean validCols = cl >= 0 && cu >= cl && cu < getNumColumns();
		if( !validRows || !validCols ) {
			throw new DMLRuntimeException("Invalid values for frame indexing: ["+(rl+1)+":"+(ru+1)+"," + (cl+1)+":"+(cu+1)+"] " +
							"must be within frame dimensions ["+getNumRows()+","+getNumColumns()+"]");
		}
		
		//allocate or reset output frame
		//NOTE(review): a reused block keeps its existing column array; presumably
		//callers pass blocks with a matching number of columns - confirm
		if( ret != null )
			ret.reset(ru-rl+1, true);
		else
			ret = new FrameBlock();
		
		//copy output schema, meta data, and colnames
		final int numCols = cu-cl+1;
		final boolean defaultNames = isColNamesDefault();
		ret._schema = new ValueType[numCols];
		ret._colnames = defaultNames ? null : new String[numCols];
		ret._colmeta = new ColumnMetadata[numCols];
		for( int k=0; k<numCols; k++ ) {
			ret._schema[k] = _schema[cl+k];
			ret._colmeta[k] = _colmeta[cl+k];
			if( !defaultNames )
				ret._colnames[k] = getColumnName(cl+k);
		}
		ret._numRows = ru-rl+1;
		if( ret._coldata == null )
			ret._coldata = new Array[numCols];
		
		if( ret._numRows == _numRows ) {
			//fast-path: shallow copy column indexing, which does not only
			//avoid an array copy, but also allows for bi-directional reuses
			//of recodemaps
			for( int k=0; k<numCols; k++ )
				ret._coldata[k] = _coldata[cl+k];
		}
		else {
			//copy output data of the selected row range
			for( int k=0; k<numCols; k++ ) {
				if( ret._coldata[k] == null )
					ret._coldata[k] = _coldata[cl+k].slice(rl,ru);
				else
					ret._coldata[k].set(0, ru-rl, _coldata[cl+k], rl);
			}
		}
		return ret;
	}
	
	
	/**
	 * Appends the rows of the given range, restricted to the range columns,
	 * to the frame blocks of the output list: rows before rowCut go to one
	 * block and rows from rowCut on to another. The blocks are taken from
	 * the list in order, and only those that are needed are taken.
	 * 
	 * @param outlist list of output frame blocks
	 * @param range index range to copy (bounds used inclusively)
	 * @param rowCut first row index that belongs to the second block
	 */
	public void slice(ArrayList<Pair<Long,FrameBlock>> outlist, IndexRange range, int rowCut)
	{
		//pick the targets in list order: the block above the cut comes first
		Iterator<Pair<Long,FrameBlock>> targets = outlist.iterator();
		FrameBlock upper = (range.rowStart < rowCut) ? targets.next().getValue() : null;
		FrameBlock lower = (range.rowEnd >= rowCut) ? targets.next().getValue() : null;

		if( getNumRows() <= 0 )
			return;

		final int firstCol = (int) range.colStart;
		final int width = (int) (range.colEnd-range.colStart+1);
		for( int r=(int) range.rowStart; r<=range.rowEnd; r++ ) {
			//materialize the row restricted to the range columns
			Object[] row = new Object[width];
			for( int c=firstCol; c<range.colEnd+1; c++ )
				row[(int) (c-range.colStart)] = get(r,c);
			//route the row by its position relative to the cut
			if( r < rowCut )
				upper.appendRow(row);
			else
				lower.appendRow(row);
		}
	}

	/**
	 * Binds the given frame block 'that' to this frame block and returns the
	 * result in 'ret' (allocated if null). With cbind, the columns of 'that'
	 * are placed after the columns of this block (same number of rows
	 * required); otherwise the rows of 'that' are placed after the rows of
	 * this block (same number of columns required).
	 * 
	 * @param that frame block to append to current frame block
	 * @param ret frame block to return, can be null
	 * @param cbind if true, column append; otherwise row append
	 * @return frame block
	 */
	public FrameBlock append( FrameBlock that, FrameBlock ret, boolean cbind ) {
		if( !cbind ) //ROW APPEND
		{
			//sanity check column dimension mismatch
			if( that.getNumColumns() != getNumColumns() ) {
				throw new DMLRuntimeException("Incompatible number of columns for rbind: "+
						that.getNumColumns()+" (expected: "+getNumColumns()+")");
			}

			//output frame with cloned schema/names and fresh column metadata
			final FrameBlock rbound = (ret != null) ? ret : new FrameBlock();
			rbound._numRows = _numRows;
			rbound._schema = _schema.clone();
			rbound._colnames = (_colnames == null) ? null : _colnames.clone();
			rbound._colmeta = new ColumnMetadata[getNumColumns()];
			for( int j=0; j<_schema.length; j++ )
				rbound._colmeta[j] = new ColumnMetadata(0);

			//clone the own columns, then append the rows of 'that'
			//(read through an iterator that is given this block's schema)
			rbound._coldata = new Array[getNumColumns()];
			for( int j=0; j<getNumColumns(); j++ )
				rbound._coldata[j] = _coldata[j].clone();
			for( Iterator<Object[]> rows = that.getObjectRowIterator(_schema); rows.hasNext(); )
				rbound.appendRow(rows.next());
			return rbound;
		}

		//COLUMN APPEND
		//sanity check row dimension mismatch
		if( that.getNumRows() != getNumRows() ) {
			throw new DMLRuntimeException("Incompatible number of rows for cbind: "+
					that.getNumRows()+" (expected: "+getNumRows()+")");
		}

		//output frame with concatenated (newly allocated) schema, names, metadata arrays
		final FrameBlock cbound = (ret != null) ? ret : new FrameBlock();
		cbound._numRows = _numRows;
		cbound._schema = (ValueType[]) ArrayUtils.addAll(_schema, that._schema);
		cbound._colnames = (String[]) ArrayUtils.addAll(getColumnNames(), that.getColumnNames());
		cbound._colmeta = (ColumnMetadata[]) ArrayUtils.addAll(_colmeta, that._colmeta);

		//fall back to generated column names if the concatenated names are not unique
		HashSet<String> seen = new HashSet<>();
		boolean unique = true;
		for( String name : cbound._colnames ) {
			if( !seen.add(name) ) {
				unique = false;
				break;
			}
		}
		if( !unique )
			cbound._colnames = createColNames(cbound.getNumColumns());

		//the column data arrays themselves are shared with the inputs (not cloned)
		cbound._coldata = (Array[]) ArrayUtils.addAll(_coldata, that._coldata);
		return cbound;
	}

	/**
	 * Copies all rows and columns of the source frame into this frame,
	 * starting at row 0 and column 0.
	 * 
	 * @param src source frame block
	 */
	public void copy(FrameBlock src) {
		final int lastRow = src.getNumRows() - 1;
		final int lastCol = src.getNumColumns() - 1;
		copy(0, lastRow, 0, lastCol, src);
	}

	/**
	 * Copies the source frame into the cell range [rl:ru, cl:cu] of this
	 * frame; source cell (0,0) maps to target cell (rl,cl). Values are
	 * converted via their string representation if the column types differ.
	 * 
	 * @param rl row lower index in this frame, inclusive, 0-based
	 * @param ru row upper index in this frame, inclusive, 0-based
	 * @param cl column lower index in this frame, inclusive, 0-based
	 * @param cu column upper index in this frame, inclusive, 0-based
	 * @param src source frame block
	 */
	public void copy(int rl, int ru, int cl, int cu, FrameBlock src) 
	{
		//allocate columns if necessary
		ensureAllocatedColumns(ru-rl+1);

		for( int j=cl; j<=cu; j++ ) {
			final int srcCol = j - cl;
			if( !_schema[j].equals(src._schema[srcCol]) ) {
				//different value types: convert cell by cell through strings
				for( int i=rl; i<=ru; i++ ) {
					final Object cell = src.get(i-rl, srcCol);
					final String str = (cell == null) ? null : cell.toString();
					set(i, j, UtilFunctions.stringToObject(_schema[j], str));
				}
				continue;
			}
			//same value type: bulk copy of the column segment
			_coldata[j].set(rl, ru, src._coldata[srcCol]);
		}
	}
	
	
	///////
	// transform specific functionality
	
	/**
	 * Builds the recode map of a column that stores recode-map entries. Every
	 * non-null cell is split with EncoderRecode.splitRecodeMapEntry; the first
	 * part becomes the key (token) and the second part, parsed as long, the
	 * value (code). If REUSE_RECODE_MAPS is enabled, the map is cached on the
	 * column via a soft reference and returned from there on later calls.
	 *
	 * @param col column index of the frame column that holds the recode-map entries
	 * @return map from token to code for all non-null cells of the column
	 */
	public HashMap<String,Long> getRecodeMap(int col) {
		final Array column = _coldata[col];

		//probe cache for existing map
		if( REUSE_RECODE_MAPS ) {
			SoftReference<HashMap<String,Long>> ref = column._rcdMapCache;
			if( ref != null ) {
				HashMap<String,Long> cached = ref.get();
				if( cached != null )
					return cached;
			}
		}

		//construct recode map from the column cells
		HashMap<String,Long> rcdMap = new HashMap<>();
		final int nrow = getNumRows();
		for( int i=0; i<nrow; i++ ) {
			Object entry = column.get(i);
			if( entry == null )
				continue;
			String[] parts = EncoderRecode.splitRecodeMapEntry(entry.toString());
			rcdMap.put(parts[0], Long.parseLong(parts[1]));
		}

		//put created map into cache
		if( REUSE_RECODE_MAPS )
			column._rcdMapCache = new SoftReference<>(rcdMap);

		return rcdMap;
	}

	@Override
	public void merge(CacheBlock that, boolean bDummy) {
		//bDummy is not used here; delegate to the frame-typed merge
		final FrameBlock other = (FrameBlock) that;
		merge(other);
	}

	/**
	 * Merges the given frame into this frame: non-default column metadata
	 * of 'that' overwrites the metadata of this frame, and every non-zero
	 * (respectively non-null) cell of 'that' overwrites the corresponding
	 * cell of this frame. A null or zero-row input is ignored.
	 * 
	 * @param that frame block to merge into this frame block
	 */
	public void merge(FrameBlock that) {
		//nothing to merge for a missing or empty source
		if( that == null || that.getNumRows() == 0 )
			return;

		//dimensions must agree before any cell is touched
		final boolean rowsDiffer = getNumRows() != that.getNumRows();
		if ( rowsDiffer || getNumColumns() != that.getNumColumns() )
			throw new DMLRuntimeException("Dimension mismatch on merge disjoint (target="+getNumRows()+"x"+getNumColumns()+", source="+that.getNumRows()+"x"+that.getNumColumns()+")");

		//take over column metadata that is not at its default
		for( int j=0; j<getNumColumns(); j++ ) {
			if( that.isColumnMetadataDefault(j) )
				continue;
			final ColumnMetadata target = _colmeta[j];
			final ColumnMetadata source = that._colmeta[j];
			target.setNumDistinct(source.getNumDistinct());
			target.setMvValue(source.getMvValue());
		}

		//merge the cells column by column
		for( int j=0; j<getNumColumns(); j++ ) {
			if( _schema[j].equals(that._schema[j]) ) {
				//same value type: bulk copy of the non-zero cells
				_coldata[j].setNz(0, _numRows-1, that._coldata[j]);
				continue;
			}
			//different value types: convert each cell, skip null results
			for( int i=0; i<_numRows; i++ ) {
				final Object converted = UtilFunctions.objectToObject(
						_schema[j], that.get(i,j), true);
				if( converted != null )
					set(i, j, converted);
			}
		}
	}
	
	/**
	 * Copies cells of this block into the result block while skipping the cells of
	 * the given slicing window (non-complementary case), or copies only the cells
	 * inside the window (complementary case). Skipped cells are never written, so
	 * they keep whatever value the freshly allocated result columns hold
	 * (presumably the type default, i.e., "zeroed out" - verify against
	 * ensureAllocatedColumns).
	 * 
	 * @param result output frame block; if null, a new block with this block's schema is created, otherwise it is reset and given this block's schema
	 * @param range index range of the slicing window (row and column bounds are used inclusively)
	 * @param complementary if true, copy only the window cells; if false, copy everything but the window cells
	 * @param iRowStartSrc row offset applied when reading from this block
	 * @param iRowStartDest row offset applied when writing to the result block
	 * @param blen number of rows the result columns are allocated with; in the complementary case also the exclusive bound for destination rows
	 * @param iMaxRowsToCopy maximum number of rows copied in the non-complementary case
	 * @return frame block
	 */
	public FrameBlock zeroOutOperations(FrameBlock result, IndexRange range, boolean complementary, int iRowStartSrc, int iRowStartDest, int blen, int iMaxRowsToCopy) {
		int clen = getNumColumns();
		
		//allocate or recycle the result with this block's schema
		if(result==null)
			result=new FrameBlock(getSchema());
		else 
		{
			result.reset(0, true);
			result.setSchema(getSchema());
		}
		result.ensureAllocatedColumns(blen);
		
		if(complementary)
		{
			//copy only the window: r runs over the range rows, shifted by the
			//source offset for reads and by the destination offset for writes
			for(int r=(int) range.rowStart; r<=range.rowEnd&&r+iRowStartDest<blen; r++)
			{
				for(int c=(int) range.colStart; c<=range.colEnd; c++)
					result.set(r+iRowStartDest, c, get(r+iRowStartSrc,c));
			}
		}else
		{
			//r is the destination row and is carried over the three loops below;
			//the matching source row is r+iRowStartSrc-iRowStartDest, and every
			//loop stops once iMaxRowsToCopy rows have been handled
			int r=iRowStartDest;
			//rows before the window: copy all columns
			for(; r<(int)range.rowStart && r-iRowStartDest<iMaxRowsToCopy ; r++)
				for(int c=0; c<clen; c++/*, offset++*/)
					result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
			
			//rows of the window: copy only the columns left and right of it
			for(; r<=(int)range.rowEnd && r-iRowStartDest<iMaxRowsToCopy ; r++)
			{
				for(int c=0; c<(int)range.colStart; c++)
					result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));

				for(int c=(int)range.colEnd+1; c<clen; c++)
					result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
			}
			
			//rows after the window: copy all columns
			for(; r-iRowStartDest<iMaxRowsToCopy ; r++)
				for(int c=0; c<clen; c++)
					result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
		}
		
		return result;
	}

	/**
	 * Returns the schema of this frame as a one-row frame of strings,
	 * holding the value type name of every column.
	 * 
	 * @return frame block with a single row of value type names
	 */
	public FrameBlock getSchemaTypeOf() {
		FrameBlock fb = new FrameBlock(
			UtilFunctions.nCopies(getNumColumns(), ValueType.STRING));
		//render every value type through toString
		String[] typeNames = new String[_schema.length];
		for( int j=0; j<_schema.length; j++ )
			typeNames[j] = _schema[j].toString();
		fb.appendRow(typeNames);
		return fb;
	}
	
	///////
	// row iterators (over strings and boxed objects)

	/**
	 * Base class of the row iterators over the enclosing frame. It keeps the
	 * current row position, the exclusive end position, the selected column
	 * ids (subclasses subtract 1 before accessing a cell, i.e., the ids are
	 * 1-based) and a single row buffer that subclasses fill and return.
	 */
	private abstract class RowIterator<T> implements Iterator<T[]> {
		protected final int[] _cols;
		protected final T[] _curRow;
		protected final int _maxPos;
		protected int _curPos;

		/** Iterates over the rows [rl, ru) with the column ids from getSeqArray(1, #columns, 1). */
		protected RowIterator(int rl, int ru) {
			this(rl, ru, UtilFunctions.getSeqArray(1, getNumColumns(), 1));
		}

		/** Iterates over the rows [rl, ru) and the given column ids. */
		protected RowIterator(int rl, int ru, int[] cols) {
			_curRow = createRow(cols.length);
			_cols = cols;
			_curPos = rl;
			_maxPos = ru;
		}

		@Override
		public boolean hasNext() {
			return _maxPos > _curPos;
		}

		@Override
		public void remove() {
			throw new RuntimeException("RowIterator.remove is unsupported!");
		}

		/** Allocates the row buffer with one slot per selected column. */
		protected abstract T[] createRow(int size);
	}

	/**
	 * Row iterator that returns every cell as string (null cells stay null).
	 * The returned array is the shared row buffer, which is overwritten by
	 * the following call of next.
	 */
	private class StringRowIterator extends RowIterator<String> {
		public StringRowIterator(int rl, int ru) {
			super(rl, ru);
		}

		public StringRowIterator(int rl, int ru, int[] cols) {
			super(rl, ru, cols);
		}

		@Override
		protected String[] createRow(int size) {
			return new String[size];
		}

		@Override
		public String[] next( ) {
			int pos = 0;
			for( int colId : _cols ) {
				//column ids are 1-based, cell access is 0-based
				final Object cell = get(_curPos, colId-1);
				_curRow[pos++] = (cell == null) ? null : cell.toString();
			}
			_curPos++;
			return _curRow;
		}
	}

	/**
	 * Row iterator that returns every cell as boxed object, optionally
	 * converted to the value types of a target schema. The returned array is
	 * the shared row buffer, which is overwritten by the following call of next.
	 */
	private class ObjectRowIterator extends RowIterator<Object> {
		//target value types, indexed by 0-based frame column; null means no conversion
		private ValueType[] _tgtSchema = null;

		public ObjectRowIterator(int rl, int ru) {
			super(rl, ru);
		}

		public ObjectRowIterator(int rl, int ru, int[] cols) {
			super(rl, ru, cols);
		}

		public void setSchema(ValueType[] schema) {
			_tgtSchema = schema;
		}

		@Override
		protected Object[] createRow(int size) {
			return new Object[size];
		}

		@Override
		public Object[] next( ) {
			int pos = 0;
			for( int colId : _cols )
				_curRow[pos++] = getValue(_curPos, colId-1);
			_curPos++;
			return _curRow;
		}

		/** Reads cell (i,j) and converts it to the target type of column j, if a target schema is set. */
		private Object getValue(int i, int j) {
			final Object raw = get(i, j);
			if( _tgtSchema == null )
				return raw;
			return UtilFunctions.objectToObject(_tgtSchema[j], raw);
		}
	}
	
	///////
	// generic, resizable native arrays 
	
	/**
	 * Common base of the resizable, typed column arrays that back a frame
	 * block. The hierarchy is hand-written instead of relying on Trove or
	 * similar libraries in order to avoid additional dependencies.
	 */
	private abstract static class Array<T> implements Writable {
		//softly referenced cache for the recode map of this column
		protected SoftReference<HashMap<String,Long>> _rcdMapCache = null;
		//number of valid entries
		protected int _size = 0;

		//single-cell access
		public abstract T get(int index);
		public abstract void set(int index, T value);

		//range copies from another array into the positions [rl, ru]
		public abstract void set(int rl, int ru, Array value);
		public abstract void set(int rl, int ru, Array value, int rlSrc);
		public abstract void setNz(int rl, int ru, Array value);

		//appends at the end
		public abstract void append(String value);
		public abstract void append(T value);

		//copies and resets
		@Override
		public abstract Array clone();
		public abstract Array slice(int rl, int ru);
		public abstract void reset(int size);

		/** Capacity for the next growth step: twice the current size, but at least 4. */
		protected int newSize() {
			final int doubled = _size * 2;
			return (doubled < 4) ? 4 : doubled;
		}
	}

	/**
	 * Resizable array of strings. Null entries are written as empty strings,
	 * and empty strings are read back as null.
	 */
	private static class StringArray extends Array<String> {
		private String[] _data = null;

		public StringArray(String[] data) {
			_data = data;
			_size = data.length;
		}

		@Override
		public String get(int index) {
			return _data[index];
		}

		@Override
		public void set(int index, String value) {
			_data[index] = value;
		}

		@Override
		public void set(int rl, int ru, Array value) {
			//source positions start at 0
			set(rl, ru, value, 0);
		}

		@Override
		public void set(int rl, int ru, Array value, int rlSrc) {
			final String[] src = ((StringArray)value)._data;
			final int len = ru-rl+1;
			System.arraycopy(src, rlSrc, _data, rl, len);
		}

		@Override
		public void setNz(int rl, int ru, Array value) {
			final String[] src = ((StringArray)value)._data;
			final int end = ru+1;
			for( int i=rl; i<end; i++ ) {
				//only non-null source entries overwrite the target
				final String v = src[i];
				if( v != null )
					_data[i] = v;
			}
		}

		@Override
		public void append(String value) {
			//grow the backing array once it is full
			if( _size >= _data.length )
				_data = Arrays.copyOf(_data, newSize());
			_data[_size] = value;
			_size++;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			for( int i=0; i<_size; i++ ) {
				final String v = _data[i];
				if( v != null )
					out.writeUTF(v);
				else
					out.writeUTF("");
			}
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			//the number of entries to read is given by the allocated array
			_size = _data.length;
			for( int i=0; i<_size; i++ ) {
				final String v = in.readUTF();
				_data[i] = v.isEmpty() ? null : v;
			}
		}

		@Override
		public Array clone() {
			final String[] copy = Arrays.copyOf(_data, _size);
			return new StringArray(copy);
		}

		@Override
		public Array slice(int rl, int ru) {
			final String[] part = Arrays.copyOfRange(_data, rl, ru+1);
			return new StringArray(part);
		}

		@Override
		public void reset(int size) {
			//reallocate only if the current array is too small
			if( size > _data.length )
				_data = new String[size];
			_size = size;
		}
	}

	/**
	 * Resizable array of primitive booleans; null values are stored as false.
	 */
	private static class BooleanArray extends Array<Boolean> {
		private boolean[] _data = null;

		public BooleanArray(boolean[] data) {
			_data = data;
			_size = data.length;
		}

		@Override
		public Boolean get(int index) {
			return _data[index];
		}

		@Override
		public void set(int index, Boolean value) {
			_data[index] = (value != null) && value;
		}

		@Override
		public void set(int rl, int ru, Array value) {
			//source positions start at 0
			set(rl, ru, value, 0);
		}

		@Override
		public void set(int rl, int ru, Array value, int rlSrc) {
			final boolean[] src = ((BooleanArray)value)._data;
			final int len = ru-rl+1;
			System.arraycopy(src, rlSrc, _data, rl, len);
		}

		@Override
		public void setNz(int rl, int ru, Array value) {
			final boolean[] src = ((BooleanArray)value)._data;
			final int end = ru+1;
			for( int i=rl; i<end; i++ ) {
				//only true source entries overwrite the target
				if( src[i] )
					_data[i] = true;
			}
		}

		@Override
		public void append(String value) {
			//parseBoolean maps null and everything but "true" (any case) to false
			append(Boolean.valueOf(Boolean.parseBoolean(value)));
		}

		@Override
		public void append(Boolean value) {
			//grow the backing array once it is full
			if( _size >= _data.length )
				_data = Arrays.copyOf(_data, newSize());
			_data[_size] = (value != null) && value;
			_size++;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			for( int i=0; i<_size; i++ )
				out.writeBoolean(_data[i]);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			//the number of entries to read is given by the allocated array
			_size = _data.length;
			for( int i=0; i<_size; i++ )
				_data[i] = in.readBoolean();
		}

		@Override
		public Array clone() {
			final boolean[] copy = Arrays.copyOf(_data, _size);
			return new BooleanArray(copy);
		}

		@Override
		public Array slice(int rl, int ru) {
			final boolean[] part = Arrays.copyOfRange(_data, rl, ru+1);
			return new BooleanArray(part);
		}

		@Override
		public void reset(int size) {
			//reallocate only if the current array is too small
			if( size > _data.length )
				_data = new boolean[size];
			_size = size;
		}
	}

	/**
	 * Resizable array of primitive longs; null values are stored as 0.
	 */
	private static class LongArray extends Array<Long> {
		private long[] _data = null;

		public LongArray(long[] data) {
			_data = data;
			_size = data.length;
		}

		@Override
		public Long get(int index) {
			return _data[index];
		}

		@Override
		public void set(int index, Long value) {
			if( value != null )
				_data[index] = value;
			else
				_data[index] = 0L;
		}

		@Override
		public void set(int rl, int ru, Array value) {
			//source positions start at 0
			set(rl, ru, value, 0);
		}

		@Override
		public void set(int rl, int ru, Array value, int rlSrc) {
			final long[] src = ((LongArray)value)._data;
			final int len = ru-rl+1;
			System.arraycopy(src, rlSrc, _data, rl, len);
		}

		@Override
		public void setNz(int rl, int ru, Array value) {
			final long[] src = ((LongArray)value)._data;
			final int end = ru+1;
			for( int i=rl; i<end; i++ ) {
				//only non-zero source entries overwrite the target
				final long v = src[i];
				if( v != 0 )
					_data[i] = v;
			}
		}

		@Override
		public void append(String value) {
			//null stays null (stored as 0), everything else is trimmed and parsed
			Long parsed = null;
			if( value != null )
				parsed = Long.parseLong(value.trim());
			append(parsed);
		}

		@Override
		public void append(Long value) {
			//grow the backing array once it is full
			if( _size >= _data.length )
				_data = Arrays.copyOf(_data, newSize());
			_data[_size] = (value == null) ? 0L : value;
			_size++;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			for( int i=0; i<_size; i++ )
				out.writeLong(_data[i]);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			//the number of entries to read is given by the allocated array
			_size = _data.length;
			for( int i=0; i<_size; i++ )
				_data[i] = in.readLong();
		}

		@Override
		public Array clone() {
			final long[] copy = Arrays.copyOf(_data, _size);
			return new LongArray(copy);
		}

		@Override
		public Array slice(int rl, int ru) {
			final long[] part = Arrays.copyOfRange(_data, rl, ru+1);
			return new LongArray(part);
		}

		@Override
		public void reset(int size) {
			//reallocate only if the current array is too small
			if( size > _data.length )
				_data = new long[size];
			_size = size;
		}
	}

	/**
	 * Resizable array of primitive ints; null values are stored as 0.
	 * Entries are serialized as 4-byte ints, matching what readFields consumes.
	 */
	private static class IntegerArray extends Array<Integer> {
		private int[] _data = null;

		public IntegerArray(int[] data) {
			_data = data;
			_size = _data.length;
		}

		@Override
		public Integer get(int index) {
			return _data[index];
		}

		@Override
		public void set(int index, Integer value) {
			_data[index] = (value!=null) ? value : 0;
		}
		@Override
		public void set(int rl, int ru, Array value) {
			//source positions start at 0
			set(rl, ru, value, 0);
		}
		@Override
		public void set(int rl, int ru, Array value, int rlSrc) {
			System.arraycopy(((IntegerArray)value)._data, rlSrc, _data, rl, ru-rl+1);
		}
		@Override
		public void setNz(int rl, int ru, Array value) {
			//only non-zero source entries overwrite the target
			int[] data2 = ((IntegerArray)value)._data;
			for( int i=rl; i<ru+1; i++ )
				if( data2[i]!=0 )
					_data[i] = data2[i];
		}
		@Override
		public void append(String value) {
			//null stays null (stored as 0), everything else is trimmed and parsed
			append((value!=null)?Integer.parseInt(value.trim()):null);
		}
		@Override
		public void append(Integer value) {
			//grow the backing array once it is full
			if( _data.length <= _size )
				_data = Arrays.copyOf(_data, newSize());
			_data[_size++] = (value!=null) ? value : 0;
		}
		@Override
		public void write(DataOutput out) throws IOException {
			//must mirror readFields: writing longs here would emit 8 bytes
			//per entry while readFields consumes only 4, corrupting the
			//remainder of the stream
			for( int i=0; i<_size; i++ )
				out.writeInt(_data[i]);
		}
		@Override
		public void readFields(DataInput in) throws IOException {
			//the number of entries to read is given by the allocated array
			_size = _data.length;
			for( int i=0; i<_size; i++ )
				_data[i] = in.readInt();
		}
		@Override
		public Array clone() {
			return new IntegerArray(Arrays.copyOf(_data, _size));
		}
		@Override
		public Array slice(int rl, int ru) {
			return new IntegerArray(Arrays.copyOfRange(_data,rl,ru+1));
		}
		@Override
		public void reset(int size) {
			//reallocate only if the current array is too small
			if( _data.length < size )
				_data = new int[size];
			_size = size;
		}
	}

	/**
	 * Resizable array of primitive floats; null values are stored as 0.
	 */
	private static class FloatArray extends Array<Float> {
		private float[] _data = null;

		public FloatArray(float[] data) {
			_data = data;
			_size = data.length;
		}

		@Override
		public Float get(int index) {
			return _data[index];
		}

		@Override
		public void set(int index, Float value) {
			if( value != null )
				_data[index] = value;
			else
				_data[index] = 0f;
		}

		@Override
		public void set(int rl, int ru, Array value) {
			//source positions start at 0
			set(rl, ru, value, 0);
		}

		@Override
		public void set(int rl, int ru, Array value, int rlSrc) {
			final float[] src = ((FloatArray)value)._data;
			final int len = ru-rl+1;
			System.arraycopy(src, rlSrc, _data, rl, len);
		}

		@Override
		public void setNz(int rl, int ru, Array value) {
			final float[] src = ((FloatArray)value)._data;
			final int end = ru+1;
			for( int i=rl; i<end; i++ ) {
				//only non-zero source entries overwrite the target
				final float v = src[i];
				if( v != 0 )
					_data[i] = v;
			}
		}

		@Override
		public void append(String value) {
			//null stays null (stored as 0), everything else is parsed
			Float parsed = null;
			if( value != null )
				parsed = Float.parseFloat(value);
			append(parsed);
		}

		@Override
		public void append(Float value) {
			//grow the backing array once it is full
			if( _size >= _data.length )
				_data = Arrays.copyOf(_data, newSize());
			_data[_size] = (value == null) ? 0f : value;
			_size++;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			for( int i=0; i<_size; i++ )
				out.writeFloat(_data[i]);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			//the number of entries to read is given by the allocated array
			_size = _data.length;
			for( int i=0; i<_size; i++ )
				_data[i] = in.readFloat();
		}

		@Override
		public Array clone() {
			final float[] copy = Arrays.copyOf(_data, _size);
			return new FloatArray(copy);
		}

		@Override
		public Array slice(int rl, int ru) {
			final float[] part = Arrays.copyOfRange(_data, rl, ru+1);
			return new FloatArray(part);
		}

		@Override
		public void reset(int size) {
			//reallocate only if the current array is too small
			if( size > _data.length )
				_data = new float[size];
			_size = size;
		}
	}

	/**
	 * Resizable array of primitive doubles; null values are stored as 0.
	 */
	private static class DoubleArray extends Array<Double> {
		private double[] _data = null;

		public DoubleArray(double[] data) {
			_data = data;
			_size = data.length;
		}

		@Override
		public Double get(int index) {
			return _data[index];
		}

		@Override
		public void set(int index, Double value) {
			if( value != null )
				_data[index] = value;
			else
				_data[index] = 0d;
		}

		@Override
		public void set(int rl, int ru, Array value) {
			//source positions start at 0
			set(rl, ru, value, 0);
		}

		@Override
		public void set(int rl, int ru, Array value, int rlSrc) {
			final double[] src = ((DoubleArray)value)._data;
			final int len = ru-rl+1;
			System.arraycopy(src, rlSrc, _data, rl, len);
		}

		@Override
		public void setNz(int rl, int ru, Array value) {
			final double[] src = ((DoubleArray)value)._data;
			final int end = ru+1;
			for( int i=rl; i<end; i++ ) {
				//only non-zero source entries overwrite the target
				final double v = src[i];
				if( v != 0 )
					_data[i] = v;
			}
		}

		@Override
		public void append(String value) {
			//null stays null (stored as 0), everything else is parsed
			Double parsed = null;
			if( value != null )
				parsed = Double.parseDouble(value);
			append(parsed);
		}

		@Override
		public void append(Double value) {
			//grow the backing array once it is full
			if( _size >= _data.length )
				_data = Arrays.copyOf(_data, newSize());
			_data[_size] = (value == null) ? 0d : value;
			_size++;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			for( int i=0; i<_size; i++ )
				out.writeDouble(_data[i]);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			//the number of entries to read is given by the allocated array
			_size = _data.length;
			for( int i=0; i<_size; i++ )
				_data[i] = in.readDouble();
		}

		@Override
		public Array clone() {
			final double[] copy = Arrays.copyOf(_data, _size);
			return new DoubleArray(copy);
		}

		@Override
		public Array slice(int rl, int ru) {
			final double[] part = Arrays.copyOfRange(_data, rl, ru+1);
			return new DoubleArray(part);
		}

		@Override
		public void reset(int size) {
			//reallocate only if the current array is too small
			if( size > _data.length )
				_data = new double[size];
			_size = size;
		}
	}

	/**
	 * Per-column metadata: the number of distinct values and the
	 * missing-value replacement string of a column.
	 */
	public static class ColumnMetadata implements Serializable {
		private static final long serialVersionUID = -90094082422100311L;

		private long _ndistinct;
		private String _mvValue;

		/** Creates metadata with the given number of distinct values and no missing-value string. */
		public ColumnMetadata(long ndistinct) {
			this(ndistinct, null);
		}

		/** Creates metadata with the given number of distinct values and missing-value string. */
		public ColumnMetadata(long ndistinct, String mvval) {
			_ndistinct = ndistinct;
			_mvValue = mvval;
		}

		/** Copy constructor. */
		public ColumnMetadata(ColumnMetadata that) {
			this(that._ndistinct, that._mvValue);
		}

		public long getNumDistinct() {
			return _ndistinct;
		}

		public void setNumDistinct(long ndistinct) {
			_ndistinct = ndistinct;
		}

		public String getMvValue() {
			return _mvValue;
		}

		public void setMvValue(String mvVal) {
			_mvValue = mvVal;
		}
	}

	//patterns of isType, compiled once (String.matches recompiles its regex on every call)
	private static final java.util.regex.Pattern ISTYPE_BOOLEAN =
		java.util.regex.Pattern.compile("(true|false|t|f|0|1)");
	private static final java.util.regex.Pattern ISTYPE_INTEGER =
		java.util.regex.Pattern.compile("[-+]?\\d+");
	private static final java.util.regex.Pattern ISTYPE_FLOATING =
		java.util.regex.Pattern.compile("[-+]?[0-9]+\\.?[0-9]*([e]?[-+]?[0-9]+)");

	/**
	 * Classifies a single string value as BOOLEAN, INT64, FP64 or STRING.
	 * The value is trimmed, lower-cased and stripped of double quotes first.
	 * The checks run in this order, so "0" and "1" are reported as BOOLEAN
	 * and never as INT64.
	 * 
	 * @param val value to classify, must not be null
	 * @return detected value type, STRING if no other type matches
	 */
	private static ValueType isType(String val) {
		//locale-independent lower-casing, otherwise literals such as "INFINITY"
		//are not recognized under locales with special casing rules for 'I'
		val = val.trim().toLowerCase(java.util.Locale.ROOT).replace("\"", "");
		if (ISTYPE_BOOLEAN.matcher(val).matches())
			return ValueType.BOOLEAN;
		else if (ISTYPE_INTEGER.matcher(val).matches())
			return ValueType.INT64;
		else if (ISTYPE_FLOATING.matcher(val).matches() || val.equals("infinity") || val.equals("-infinity") || val.equals("nan"))
			return ValueType.FP64;
		else return ValueType.STRING;
	}

	/**
	 * Detects a value type per column from randomly sampled non-null cells
	 * and returns the detected type names as a one-row frame of strings.
	 * Per column, min(max(sampleFraction*rows, 1024), rows) samples are drawn
	 * (with replacement). A column without any non-null cell, as well as
	 * every column of an empty frame, is reported as UNKNOWN.
	 * 
	 * @param sampleFraction fraction of the rows to sample per column
	 * @return frame block with a single row of value type names
	 */
	public FrameBlock detectSchemaFromRow(double sampleFraction) {
		int rows = this.getNumRows();
		int cols = this.getNumColumns();
		String[] schemaInfo = new String[cols];
		int sample = (int)Math.min(Math.max(sampleFraction*rows, 1024), rows);
		for (int i = 0; i < cols; i++) {
			ValueType state = ValueType.UNKNOWN;
			Array obj = this.getColumn(i);

			//the rejection sampling below only terminates if the column holds
			//at least one non-null cell, so check for one upfront
			boolean hasValue = false;
			for (int r = 0; r < rows && !hasValue; r++)
				hasValue = (obj.get(r) != null);

			for (int j = 0; hasValue && j < sample; j++)
			{
				String dataValue = null;
				//read a not null sample value
				while (dataValue == null) {
					//the bound is exclusive: [0,rows) covers all rows and is valid for a single row
					int randomIndex = ThreadLocalRandom.current().nextInt(0, rows);
					Object cell = obj.get(randomIndex);
					dataValue = (cell != null) ?
						cell.toString().trim().replace("\"", "").toLowerCase(java.util.Locale.ROOT) : null;
				}

				//classify once per sampled value
				ValueType type = isType(dataValue);
				if (type == ValueType.STRING) {
					//a single string value decides the column
					state = ValueType.STRING;
					break;
				}
				else if (type == ValueType.FP64) {
					if (dataValue.equals("infinity") || dataValue.equals("-infinity") || dataValue.equals("nan")) {
						state = ValueType.FP64;
					}
					else {
						//values within the float range keep FP32 unless FP64 was seen before
						double maxValue = Double.parseDouble(dataValue);
						if ((maxValue >= (-Float.MAX_VALUE)) && (maxValue <= Float.MAX_VALUE))
							state = (state == ValueType.FP64 ? state : ValueType.FP32);
						else
							state = ValueType.FP64;
					}
				}
				else if (type == ValueType.INT64) {
					//NOTE(review): digit strings beyond the long range still fail in parseLong
					long maxValue = Long.parseLong(dataValue);
					if ((maxValue >= Integer.MIN_VALUE) && (maxValue <= Integer.MAX_VALUE))
						state = ((state == ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64) ? state : ValueType.INT32);
					else
						state = ((state == ValueType.FP64  || state == ValueType.FP32) ? state : ValueType.INT64);
				}
				else if (type == ValueType.BOOLEAN) {
					//a numeric type detected before is never narrowed to boolean
					boolean numeric = state == ValueType.FP64 || state == ValueType.FP32
						|| state == ValueType.INT64 || state == ValueType.INT32;
					if (!numeric)
						state = ValueType.BOOLEAN;
				}
			}
			schemaInfo[i] = state.name();
		}

		//create output block one row representing the schema as strings
		FrameBlock fb = new FrameBlock(UtilFunctions.nCopies(cols, ValueType.STRING));
		fb.appendRow(schemaInfo);
		return fb;
	}

	/**
	 * Drops the cell values that do not conform to the data type of their
	 * column by setting them to null, in place. Values that are detected as
	 * FP64 or INT64 are kept in FP32 or INT32 columns as long as they fit
	 * the narrower range; "0" and "1" are kept in INT32 columns.
	 * NOTE(review): every other value is dropped unless its detected type
	 * name equals the schema entry (e.g., an integer literal in an FP64
	 * column is dropped) - confirm that this strictness is intended.
	 * 
	 * @param schema one-row frame with the expected value type name per column
	 * @return this frame, with invalid values replaced by null
	 */
	public FrameBlock dropInvalid(FrameBlock schema) {
		//sanity checks
		if(this.getNumColumns() != schema.getNumColumns())
			throw new DMLException("mismatch in number of columns in frame and its schema ");

		String[] schemaString = schema.getStringRowIterator().next(); // extract the schema in String array
		for (int i = 0; i < this.getNumColumns(); i++) {
			Array obj = this.getColumn(i);
			for (int j = 0; j < this.getNumRows(); j++)
			{
				Object cell = obj.get(j);
				if(cell == null)
					continue;
				//normalize like isType does; the final trim removes whitespace that
				//was enclosed in quotes and would otherwise break the parsing below
				String dataValue = cell.toString().trim().replace("\"", "")
					.toLowerCase(java.util.Locale.ROOT).trim();
				String expected = schemaString[i].trim();

				ValueType dataType = isType(dataValue);
				if (dataType == ValueType.FP64 && expected.equals("FP32")) {
					//infinity/nan are valid floats, and parseDouble would reject their lower-case form
					boolean special = dataValue.equals("infinity")
						|| dataValue.equals("-infinity") || dataValue.equals("nan");
					if (!special) {
						double maxValue = Double.parseDouble(dataValue);
						if ((maxValue < (-Float.MAX_VALUE)) || (maxValue > Float.MAX_VALUE))
							this.set(j,i,null);
					}
				}
				else if (dataType == ValueType.INT64 && expected.equals("INT32")) {
					boolean fitsInt;
					try {
						long maxValue = Long.parseLong(dataValue);
						fitsInt = (maxValue >= Integer.MIN_VALUE) && (maxValue <= Integer.MAX_VALUE);
					}
					catch(NumberFormatException ex) {
						//digits-only value beyond the long range
						fitsInt = false;
					}
					if (!fitsInt)
						this.set(j,i,null);
				}
				else if (dataType == ValueType.BOOLEAN && expected.equals("INT32")
						&& (dataValue.equals("0") || dataValue.equals("1")))
					//compared as strings because boolean literals such as "true"
					//or "t" are not parseable as integers
					continue;
				else if (!dataType.toString().equals(expected))
					this.set(j,i,null);
			}
		}
		return this;
	}

	/**
	 * Validates the frame data against a per-column length constraint: in a
	 * copy of this frame, every cell whose string representation is longer
	 * than the valid length of its column is replaced by null. Columns with
	 * a length entry of -1 are not checked.
	 * 
	 * @param feaLen vector of valid lengths (read from row 0, one entry per column)
	 * @return FrameBlock with length-violating values converted into missing values (null)
	 */
	public FrameBlock invalidByLength(MatrixBlock feaLen) {
		//sanity checks
		if( this.getNumColumns() != feaLen.getNumColumns() )
			throw new DMLException("mismatch in number of columns in frame and corresponding feature-length vector");

		final FrameBlock checked = new FrameBlock(this);
		final int ncol = this.getNumColumns();
		for( int c = 0; c < ncol; c++ ) {
			final double limit = feaLen.quickGetValue(0, c);
			if( limit == -1 )
				continue; //no constraint on this column
			final int validLength = (int) limit;
			final Array column = this.getColumn(c);
			for( int r = 0; r < column._size; r++ ) {
				final Object cell = column.get(r);
				if( cell != null && cell.toString().length() > validLength )
					checked.set(r, c, null);
			}
		}

		return checked;
	}

	/**
	 * Merges two schemas, each given as a one-row frame of value type names,
	 * by picking per column the wider of the two types: STRING over FP64
	 * over FP32 over INT64 over INT32 over CHARACTER. The result does not
	 * depend on the order of the two inputs for these types.
	 * NOTE(review): other type names (e.g., BOOLEAN, UNKNOWN) are only
	 * handled by the STRING, FP64 and final INT32 rules; otherwise the entry
	 * of the first schema is kept - confirm the intended handling.
	 * 
	 * @param temp1 first schema frame
	 * @param temp2 second schema frame
	 * @return new one-row frame with the merged value type names
	 * @throws DMLRuntimeException if the schemas differ in their number of columns
	 */
	public static FrameBlock mergeSchema(FrameBlock temp1, FrameBlock temp2) {
		String[] rowTemp1 = temp1.getStringRowIterator().next();
		String[] rowTemp2 = temp2.getStringRowIterator().next();

		if(rowTemp1.length != rowTemp2.length)
			throw new DMLRuntimeException("Schema dimension "
				+ "mismatch: "+rowTemp1.length+" vs "+rowTemp2.length);

		for(int i=0; i< rowTemp1.length; i++ ) {
			String t1 = rowTemp1[i];
			String t2 = rowTemp2[i];
			//modify schema1 if necessary (different schema2)
			if(t1.equals(t2))
				continue;
			if(t1.equals("STRING") || t2.equals("STRING"))
				rowTemp1[i] = "STRING";
			else if(t1.equals("FP64") || t2.equals("FP64"))
				rowTemp1[i] = "FP64";
			//the widening rules below are checked in both directions so that
			//the wider type wins regardless of which schema it comes from
			else if( (t1.equals("FP32") && Arrays.asList("INT64", "INT32", "CHARACTER").contains(t2))
				|| (t2.equals("FP32") && Arrays.asList("INT64", "INT32", "CHARACTER").contains(t1)) )
				rowTemp1[i] = "FP32";
			else if( (t1.equals("INT64") && Arrays.asList("INT32", "CHARACTER").contains(t2))
				|| (t2.equals("INT64") && Arrays.asList("INT32", "CHARACTER").contains(t1)) )
				rowTemp1[i] = "INT64";
			else if( t1.equals("INT32") || t2.equals("CHARACTER")
				|| (t1.equals("CHARACTER") && t2.equals("INT32")) )
				rowTemp1[i] = "INT32";
		}

		//create output block one row representing the schema as strings
		FrameBlock mergedFrame = new FrameBlock(
			UtilFunctions.nCopies(temp1.getNumColumns(), ValueType.STRING));
		mergedFrame.appendRow(rowTemp1);
		return mergedFrame;
	}
}
