blob: 175d400f4ebcea3134f4d348c5ab661d965b0b14 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLException;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.codegen.CodegenUtils;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.functionobjects.ValueComparisonFunction;
import org.apache.sysds.runtime.instructions.cp.*;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
import org.apache.sysds.runtime.transform.encode.EncoderRecode;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.runtime.util.UtilFunctions;
@SuppressWarnings({"rawtypes","unchecked"}) //allow generic native arrays
public class FrameBlock implements CacheBlock, Externalizable {
private static final long serialVersionUID = -3993450030207130665L;
private static final Log LOG = LogFactory.getLog(FrameBlock.class.getName());
private static final IDSequence CLASS_ID = new IDSequence();
public static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements, size of default matrix block
//internal configuration
private static final boolean REUSE_RECODE_MAPS = true;
/** The number of rows of the FrameBlock */
private int _numRows = -1;
/** The schema of the data frame as an ordered list of value types */
private ValueType[] _schema = null;
/** The column names of the data frame as an ordered list of strings, allocated on-demand */
private String[] _colnames = null;
private ColumnMetadata[] _colmeta = null;
/** The data frame data as an ordered list of columns */
private Array[] _coldata = null;
public FrameBlock() {
_numRows = 0;
* Copy constructor for frame blocks, which uses a shallow copy for
* the schema (column types and names) but a deep copy for meta data
* and actual column data.
* @param that frame block
public FrameBlock(FrameBlock that) {
this(that.getSchema(), that.getColumnNames(false));
public FrameBlock(int ncols, ValueType vt) {
_schema = UtilFunctions.nCopies(ncols, vt);
_colnames = null; //default not materialized
_colmeta = new ColumnMetadata[ncols];
for( int j=0; j<ncols; j++ )
_colmeta[j] = new ColumnMetadata(0);
public FrameBlock(ValueType[] schema) {
this(schema, new String[0][]);
public FrameBlock(ValueType[] schema, String[] names) {
this(schema, names, new String[0][]);
public FrameBlock(ValueType[] schema, String[][] data) {
//default column names not materialized
this(schema, null, data);
public FrameBlock(ValueType[] schema, String[] names, String[][] data) {
_numRows = 0; //maintained on append
_schema = schema;
_colnames = names;
_colmeta = new ColumnMetadata[_schema.length];
for( int j=0; j<_schema.length; j++ )
_colmeta[j] = new ColumnMetadata(0);
for( int i=0; i<data.length; i++ )
* Get the number of rows of the frame block.
* @return number of rows
public int getNumRows() {
return _numRows;
public void setNumRows(int numRows) {
_numRows = numRows;
* Get the number of columns of the frame block, that is
* the number of columns defined in the schema.
* @return number of columns
public int getNumColumns() {
return (_schema != null) ? _schema.length : 0;
* Returns the schema of the frame block.
* @return schema as array of ValueTypes
public ValueType[] getSchema() {
return _schema;
* Sets the schema of the frame block.
* @param schema schema as array of ValueTypes
public void setSchema(ValueType[] schema) {
_schema = schema;
* Returns the column names of the frame block. This method
* allocates default column names if required.
* @return column names
public String[] getColumnNames() {
return getColumnNames(true);
public FrameBlock getColumnNamesAsFrame() {
FrameBlock fb = new FrameBlock(getNumColumns(), ValueType.STRING);
return fb;
* Returns the column names of the frame block. This method
* allocates default column names if required.
* @param alloc if true, create column names
* @return array of column names
public String[] getColumnNames(boolean alloc) {
if( _colnames == null && alloc )
_colnames = createColNames(getNumColumns());
return _colnames;
* Returns the column name for the requested column. This
* method allocates default column names if required.
* @param c column index
* @return column name
public String getColumnName(int c) {
if( _colnames == null )
_colnames = createColNames(getNumColumns());
return _colnames[c];
public void setColumnNames(String[] colnames) {
_colnames = colnames;
public ColumnMetadata[] getColumnMetadata() {
return _colmeta;
public ColumnMetadata getColumnMetadata(int c) {
return _colmeta[c];
public boolean isColumnMetadataDefault() {
boolean ret = true;
for( int j=0; j<getNumColumns() && ret; j++ )
ret &= isColumnMetadataDefault(j);
return ret;
public boolean isColumnMetadataDefault(int c) {
return _colmeta[c].getMvValue() == null
&& _colmeta[c].getNumDistinct() == 0;
public void setColumnMetadata(ColumnMetadata[] colmeta) {
System.arraycopy(colmeta, 0, _colmeta, 0, _colmeta.length);
public void setColumnMetadata(int c, ColumnMetadata colmeta) {
_colmeta[c] = colmeta;
* Creates a mapping from column names to column IDs, i.e.,
* 1-based column indexes
* @return map of column name keys and id values
public Map<String,Integer> getColumnNameIDMap() {
Map<String, Integer> ret = new HashMap<>();
for( int j=0; j<getNumColumns(); j++ )
ret.put(getColumnName(j), j+1);
return ret;
* Allocate column data structures if necessary, i.e., if schema specified
* but not all column data structures created yet.
* @param numRows number of rows
public void ensureAllocatedColumns(int numRows) {
//early abort if already allocated
if( _coldata != null && _schema.length == _coldata.length ) {
//handle special case that to few rows allocated
if( _numRows < numRows ) {
String[] tmp = new String[getNumColumns()];
int len = numRows - _numRows;
for(int i=0; i<len; i++)
//allocate column meta data if necessary
if( _colmeta == null || _schema.length != _colmeta.length ) {
_colmeta = new ColumnMetadata[_schema.length];
for( int j=0; j<_schema.length; j++ )
_colmeta[j] = new ColumnMetadata(0);
//allocate columns if necessary
_coldata = new Array[_schema.length];
for( int j=0; j<_schema.length; j++ ) {
switch( _schema[j] ) {
case STRING: _coldata[j] = new StringArray(new String[numRows]); break;
case BOOLEAN: _coldata[j] = new BooleanArray(new boolean[numRows]); break;
case INT32: _coldata[j] = new IntegerArray(new int[numRows]); break;
case INT64: _coldata[j] = new LongArray(new long[numRows]); break;
case FP32: _coldata[j] = new FloatArray(new float[numRows]); break;
case FP64: _coldata[j] = new DoubleArray(new double[numRows]); break;
default: throw new RuntimeException("Unsupported value type: "+_schema[j]);
_numRows = numRows;
* Checks for matching column sizes in case of existing columns.
* @param newlen number of rows to compare with existing number of rows
public void ensureColumnCompatibility(int newlen) {
if( _coldata!=null && _coldata.length > 0 && _numRows != newlen )
throw new RuntimeException("Mismatch in number of rows: "+newlen+" (expected: "+_numRows+")");
public static String[] createColNames(int size) {
return createColNames(0, size);
public static String[] createColNames(int off, int size) {
String[] ret = new String[size];
for( int i=off+1; i<=off+size; i++ )
ret[i-off-1] = createColName(i);
return ret;
public static String createColName(int i) {
return "C" + i;
public boolean isColNamesDefault() {
boolean ret = (_colnames != null);
for( int j=0; j<getNumColumns() && ret; j++ )
ret &= isColNameDefault(j);
return ret;
public boolean isColNameDefault(int i) {
return _colnames==null
|| _colnames[i].equals("C"+(i+1));
public void recomputeColumnCardinality() {
for( int j=0; j<getNumColumns(); j++ ) {
int card = 0;
for( int i=0; i<getNumRows(); i++ )
card += (get(i, j) != null) ? 1 : 0;
// basic get and set functionality
* Gets a boxed object of the value in position (r,c).
* @param r row index, 0-based
* @param c column index, 0-based
* @return object of the value at specified position
public Object get(int r, int c) {
return _coldata[c].get(r);
* Sets the value in position (r,c), where the input is assumed
* to be a boxed object consistent with the schema definition.
* @param r row index
* @param c column index
* @param val value to set at specified position
public void set(int r, int c, Object val) {
_coldata[c].set(r, UtilFunctions.objectToObject(_schema[c], val));
public void reset(int nrow, boolean clearMeta) {
if( clearMeta ) {
_schema = null;
_colnames = null;
if( _colmeta != null ) {
for( int i=0; i<_colmeta.length; i++ )
if( !isColumnMetadataDefault(i) )
_colmeta[i] = new ColumnMetadata(0);
if(_coldata != null) {
for( int i=0; i < _coldata.length; i++ )
public void reset() {
reset(0, true);
* Append a row to the end of the data frame, where all row fields
* are boxed objects according to the schema.
* @param row array of objects
public void appendRow(Object[] row) {
for( int j=0; j<row.length; j++ )
* Append a row to the end of the data frame, where all row fields
* are string encoded.
* @param row array of strings
public void appendRow(String[] row) {
for( int j=0; j<row.length; j++ )
* Append a column of value type STRING as the last column of
* the data frame. The given array is wrapped but not copied
* and hence might be updated in the future.
* @param col array of strings
public void appendColumn(String[] col) {
String[] colnames = getColumnNames(); //before schema modification
_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.STRING);
_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
_coldata = (_coldata==null) ? new Array[]{new StringArray(col)} :
(Array[]) ArrayUtils.add(_coldata, new StringArray(col));
_numRows = col.length;
* Append a column of value type BOOLEAN as the last column of
* the data frame. The given array is wrapped but not copied
* and hence might be updated in the future.
* @param col array of booleans
public void appendColumn(boolean[] col) {
String[] colnames = getColumnNames(); //before schema modification
_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.BOOLEAN);
_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
_coldata = (_coldata==null) ? new Array[]{new BooleanArray(col)} :
(Array[]) ArrayUtils.add(_coldata, new BooleanArray(col));
_numRows = col.length;
* Append a column of value type INT as the last column of
* the data frame. The given array is wrapped but not copied
* and hence might be updated in the future.
* @param col array of longs
public void appendColumn(int[] col) {
String[] colnames = getColumnNames(); //before schema modification
_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.INT32);
_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
_coldata = (_coldata==null) ? new Array[]{new IntegerArray(col)} :
(Array[]) ArrayUtils.add(_coldata, new IntegerArray(col));
_numRows = col.length;
* Append a column of value type LONG as the last column of
* the data frame. The given array is wrapped but not copied
* and hence might be updated in the future.
* @param col array of longs
public void appendColumn(long[] col) {
String[] colnames = getColumnNames(); //before schema modification
_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.INT64);
_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
_coldata = (_coldata==null) ? new Array[]{new LongArray(col)} :
(Array[]) ArrayUtils.add(_coldata, new LongArray(col));
_numRows = col.length;
* Append a column of value type float as the last column of
* the data frame. The given array is wrapped but not copied
* and hence might be updated in the future.
* @param col array of doubles
public void appendColumn(float[] col) {
String[] colnames = getColumnNames(); //before schema modification
_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.FP32);
_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
_coldata = (_coldata==null) ? new Array[]{new FloatArray(col)} :
(Array[]) ArrayUtils.add(_coldata, new FloatArray(col));
_numRows = col.length;
* Append a column of value type DOUBLE as the last column of
* the data frame. The given array is wrapped but not copied
* and hence might be updated in the future.
* @param col array of doubles
public void appendColumn(double[] col) {
String[] colnames = getColumnNames(); //before schema modification
_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.FP64);
_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
_coldata = (_coldata==null) ? new Array[]{new DoubleArray(col)} :
(Array[]) ArrayUtils.add(_coldata, new DoubleArray(col));
_numRows = col.length;
* Append a set of column of value type DOUBLE at the end of the frame
* in order to avoid repeated allocation with appendColumns. The given
* array is wrapped but not copied and hence might be updated in the future.
* @param cols 2d array of doubles
public void appendColumns(double[][] cols) {
int ncol = cols.length;
boolean empty = (_schema == null);
ValueType[] tmpSchema = UtilFunctions.nCopies(ncol, ValueType.FP64);
Array[] tmpData = new Array[ncol];
for( int j=0; j<ncol; j++ )
tmpData[j] = new DoubleArray(cols[j]);
_colnames = empty ? null : (String[]) ArrayUtils.addAll(getColumnNames(),
createColNames(getNumColumns(), ncol)); //before schema modification
_schema = empty ? tmpSchema : (ValueType[]) ArrayUtils.addAll(_schema, tmpSchema);
_coldata = empty ? tmpData : (Array[]) ArrayUtils.addAll(_coldata, tmpData);
_numRows = cols[0].length;
public Object getColumnData(int c) {
switch(_schema[c]) {
case STRING: return ((StringArray)_coldata[c])._data;
case BOOLEAN: return ((BooleanArray)_coldata[c])._data;
case INT64: return ((LongArray)_coldata[c])._data;
case FP64: return ((DoubleArray)_coldata[c])._data;
default: return null;
public Array getColumn(int c) {
return _coldata[c];
public void setColumn(int c, Array column) {
if( _coldata == null )
_coldata = new Array[getNumColumns()];
_coldata[c] = column;
* Get a row iterator over the frame where all fields are encoded
* as strings independent of their value types.
* @return string array iterator
public Iterator<String[]> getStringRowIterator() {
return new StringRowIterator(0, _numRows);
* Get a row iterator over the frame where all selected fields are
* encoded as strings independent of their value types.
* @param cols column selection, 1-based
* @return string array iterator
public Iterator<String[]> getStringRowIterator(int[] cols) {
return new StringRowIterator(0, _numRows, cols);
* Get a row iterator over the frame where all fields are encoded
* as strings independent of their value types.
* @param rl lower row index
* @param ru upper row index
* @return string array iterator
public Iterator<String[]> getStringRowIterator(int rl, int ru) {
return new StringRowIterator(rl, ru);
* Get a row iterator over the frame where all selected fields are
* encoded as strings independent of their value types.
* @param rl lower row index
* @param ru upper row index
* @param cols column selection, 1-based
* @return string array iterator
public Iterator<String[]> getStringRowIterator(int rl, int ru, int[] cols) {
return new StringRowIterator(rl, ru, cols);
* Get a row iterator over the frame where all fields are encoded
* as boxed objects according to their value types.
* @return object array iterator
public Iterator<Object[]> getObjectRowIterator() {
return new ObjectRowIterator(0, _numRows);
* Get a row iterator over the frame where all fields are encoded
* as boxed objects according to the value types of the provided
* target schema.
* @param schema target schema of objects
* @return object array iterator
public Iterator<Object[]> getObjectRowIterator(ValueType[] schema) {
ObjectRowIterator iter = new ObjectRowIterator(0, _numRows);
return iter;
* Get a row iterator over the frame where all selected fields are
* encoded as boxed objects according to their value types.
* @param cols column selection, 1-based
* @return object array iterator
public Iterator<Object[]> getObjectRowIterator(int[] cols) {
return new ObjectRowIterator(0, _numRows, cols);
* Get a row iterator over the frame where all fields are encoded
* as boxed objects according to their value types.
* @param rl lower row index
* @param ru upper row index
* @return object array iterator
public Iterator<Object[]> getObjectRowIterator(int rl, int ru) {
return new ObjectRowIterator(rl, ru);
* Get a row iterator over the frame where all selected fields are
* encoded as boxed objects according to their value types.
* @param rl lower row index
* @param ru upper row index
* @param cols column selection, 1-based
* @return object array iterator
public Iterator<Object[]> getObjectRowIterator(int rl, int ru, int[] cols) {
return new ObjectRowIterator(rl, ru, cols);
// serialization / deserialization (implementation of writable and externalizable)
// FIXME for FrameBlock fix write and readFields, it does not work if the Arrays are not yet
// allocated (after fixing remove hack in FederatedWorkerHandler.createFrameEncodeMeta(FederatedRequest) call to
// FrameBlock.ensureAllocatedColumns())
public void write(DataOutput out) throws IOException {
boolean isDefaultMeta = isColNamesDefault()
&& isColumnMetadataDefault();
//write header (rows, cols, default)
//write columns (value type, data)
for( int j=0; j<getNumColumns(); j++ ) {
if( !isDefaultMeta ) {
out.writeUTF( (_colmeta[j].getMvValue()!=null) ?
_colmeta[j].getMvValue() : "" );
public void readFields(DataInput in) throws IOException {
//read head (rows, cols)
_numRows = in.readInt();
int numCols = in.readInt();
boolean isDefaultMeta = in.readBoolean();
//allocate schema/meta data arrays
_schema = (_schema!=null && _schema.length==numCols) ?
_schema : new ValueType[numCols];
_colnames = (_colnames != null && _colnames.length==numCols) ?
_colnames : new String[numCols];
_colmeta = (_colmeta != null && _colmeta.length==numCols) ?
_colmeta : new ColumnMetadata[numCols];
_coldata = (_coldata!=null && _coldata.length==numCols) ?
_coldata : new Array[numCols];
//read columns (value type, meta, data)
for( int j=0; j<numCols; j++ ) {
ValueType vt = ValueType.values()[in.readByte()];
String name = isDefaultMeta ? createColName(j) : in.readUTF();
long ndistinct = isDefaultMeta ? 0 : in.readLong();
String mvvalue = isDefaultMeta ? null : in.readUTF();
Array arr = null;
switch( vt ) {
case STRING: arr = new StringArray(new String[_numRows]); break;
case BOOLEAN: arr = new BooleanArray(new boolean[_numRows]); break;
case INT64: arr = new LongArray(new long[_numRows]); break;
case FP64: arr = new DoubleArray(new double[_numRows]); break;
case INT32: arr = new IntegerArray(new int[_numRows]); break;
case FP32: arr = new FloatArray(new float[_numRows]); break;
default: throw new IOException("Unsupported value type: "+vt);
_schema[j] = vt;
_colnames[j] = name;
_colmeta[j] = new ColumnMetadata(ndistinct,
(mvvalue==null || mvvalue.isEmpty()) ? null : mvvalue);
_coldata[j] = arr;
public void writeExternal(ObjectOutput out) throws IOException {
//redirect serialization to writable impl
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
//redirect deserialization to writable impl
// CacheBlock implementation
public long getInMemorySize() {
//frame block header
long size = 16 + 4; //object, num rows
//schema array (overhead and int entries)
int clen = getNumColumns();
size += 8 + 32 + clen * 4;
//colname array (overhead and string entries)
size += 8 + ((_colnames!=null) ? 32 : 0);
for( int j=0; j<clen && _colnames!=null; j++ )
size += getInMemoryStringSize(getColumnName(j));
//meta data array (overhead and entries)
size += 8 + 32;
for( int j=0; j<clen; j++ ) {
size += 16 + 8 + 8 //object, long num distinct, ref mv
+ getInMemoryStringSize(_colmeta[j].getMvValue());
//data array (overhead and entries)
size += 8 + 32 + clen * (16+4+8+32);
for( int j=0; j<clen; j++ ) {
switch( _schema[j] ) {
case BOOLEAN: size += _numRows; break;
case INT64:
case FP64: size += 8*_numRows; break;
case STRING:
StringArray arr = (StringArray)_coldata[j];
for( int i=0; i<_numRows; i++ )
size += getInMemoryStringSize(arr.get(i));
default: //not applicable
return size;
public long getExactSerializedSize() {
//header: 2xint, boolean
long size = 9;
//column sizes
boolean isDefaultMeta = isColNamesDefault()
&& isColumnMetadataDefault();
for( int j=0; j<getNumColumns(); j++ ) {
size += 1; //column schema
if( !isDefaultMeta ) {
size += IOUtilFunctions.getUTFSize(getColumnName(j));
size += 8;
size += IOUtilFunctions.getUTFSize(_colmeta[j].getMvValue());
switch( _schema[j] ) {
case BOOLEAN: size += _numRows; break;
case INT64:
case FP64: size += 8*_numRows; break;
case STRING:
StringArray arr = (StringArray)_coldata[j];
for( int i=0; i<_numRows; i++ )
size += IOUtilFunctions.getUTFSize(arr.get(i));
default: //not applicable
return size;
public boolean isShallowSerialize() {
return isShallowSerialize(false);
public boolean isShallowSerialize(boolean inclConvert) {
//shallow serialize if non-string schema because a frame block
//is always dense but strings have large array overhead per cell
boolean ret = true;
for( int j=0; j<_schema.length && ret; j++ )
ret &= (_schema[j] != ValueType.STRING);
return ret;
public void toShallowSerializeBlock() {
//do nothing (not applicable).
public void compactEmptyBlock() {
//do nothing
* Returns the in-memory size in bytes of the given string value.
* @param value string value
* @return in-memory size of string value
private static long getInMemoryStringSize(String value) {
if( value == null )
return 0;
return 16 + 4 + 8 //object, hash, array ref
+ 32 + value.length(); //char array
* This method performs the value comparison on two frames
* if the values in both frames are equal, not equal, less than, greater than, less than/greater than and equal to
* the output frame will store boolean value for each each comparison
* @param bop binary operator
* @param that frame block of rhs of m * n dimensions
* @param out output frame block
* @return a boolean frameBlock
public FrameBlock binaryOperations(BinaryOperator bop, FrameBlock that, FrameBlock out) {
if(getNumColumns() != that.getNumColumns() && getNumRows() != that.getNumColumns())
throw new DMLRuntimeException("Frame dimension mismatch "+getNumRows()+" * "+getNumColumns()+
" != "+that.getNumRows()+" * "+that.getNumColumns());
String[][] outputData = new String[getNumRows()][getNumColumns()];
//compare output value, incl implicit type promotion if necessary
if( !(bop.fn instanceof ValueComparisonFunction) )
throw new DMLRuntimeException("Unsupported binary operation on frames (only comparisons supported)");
ValueComparisonFunction vcomp = (ValueComparisonFunction) bop.fn;
for (int i = 0; i < getNumColumns(); i++) {
if (getSchema()[i] == ValueType.STRING || that.getSchema()[i] == ValueType.STRING) {
for (int j = 0; j < getNumRows(); j++) {
if(checkAndSetEmpty(this, that, outputData, j, i))
String v1 = UtilFunctions.objectToString(get(j, i));
String v2 = UtilFunctions.objectToString(that.get(j, i));
outputData[j][i] = String.valueOf(, v2));
else if (getSchema()[i] == ValueType.FP64 || that.getSchema()[i] == ValueType.FP64 ||
getSchema()[i] == ValueType.FP32 || that.getSchema()[i] == ValueType.FP32) {
for (int j = 0; j < getNumRows(); j++) {
if(checkAndSetEmpty(this, that, outputData, j, i))
ScalarObject so1 = new DoubleObject(Double.parseDouble(get(j, i).toString()));
ScalarObject so2 = new DoubleObject(Double.parseDouble(that.get(j, i).toString()));
outputData[j][i] = String.valueOf(, so2.getDoubleValue()));
else if (getSchema()[i] == ValueType.INT64 || that.getSchema()[i] == ValueType.INT64 ||
getSchema()[i] == ValueType.INT32 || that.getSchema()[i] == ValueType.INT32) {
for (int j = 0; j < this.getNumRows(); j++) {
if(checkAndSetEmpty(this, that, outputData, j, i))
ScalarObject so1 = new IntObject(Integer.parseInt(get(j, i).toString()));
ScalarObject so2 = new IntObject(Integer.parseInt(that.get(j, i).toString()));
outputData[j][i] = String.valueOf(, so2.getLongValue()));
else {
for (int j = 0; j < getNumRows(); j++) {
if(checkAndSetEmpty(this, that, outputData, j, i))
ScalarObject so1 = new BooleanObject( Boolean.parseBoolean(get(j, i).toString()));
ScalarObject so2 = new BooleanObject( Boolean.parseBoolean(that.get(j, i).toString()));
outputData[j][i] = String.valueOf(, so2.getBooleanValue()));
return new FrameBlock(UtilFunctions.nCopies(this.getNumColumns(), ValueType.BOOLEAN), outputData);
private static boolean checkAndSetEmpty(FrameBlock fb1, FrameBlock fb2, String[][] out, int r, int c) {
if(fb1.get(r, c) == null || fb2.get(r, c) == null) {
out[r][c] = (fb1.get(r, c) == null && fb2.get(r, c) == null) ? "true" : "false";
return true;
return false;
// indexing and append operations
public FrameBlock leftIndexingOperations(FrameBlock rhsFrame, IndexRange ixrange, FrameBlock ret) {
return leftIndexingOperations(rhsFrame,
(int)ixrange.rowStart, (int)ixrange.rowEnd,
(int)ixrange.colStart, (int)ixrange.colEnd, ret);
public FrameBlock leftIndexingOperations(FrameBlock rhsFrame, int rl, int ru, int cl, int cu, FrameBlock ret) {
// check the validity of bounds
if ( rl < 0 || rl >= getNumRows() || ru < rl || ru >= getNumRows()
|| cl < 0 || cu >= getNumColumns() || cu < cl || cu >= getNumColumns() ) {
throw new DMLRuntimeException("Invalid values for frame indexing: ["+(rl+1)+":"+(ru+1)+"," + (cl+1)+":"+(cu+1)+"] " +
"must be within frame dimensions ["+getNumRows()+","+getNumColumns()+"].");
if ( (ru-rl+1) < rhsFrame.getNumRows() || (cu-cl+1) < rhsFrame.getNumColumns()) {
throw new DMLRuntimeException("Invalid values for frame indexing: " +
"dimensions of the source frame ["+rhsFrame.getNumRows()+"x" + rhsFrame.getNumColumns() + "] " +
"do not match the shape of the frame specified by indices [" +
(rl+1) +":" + (ru+1) + ", " + (cl+1) + ":" + (cu+1) + "].");
//allocate output frame (incl deep copy schema)
if( ret == null )
ret = new FrameBlock();
ret._numRows = _numRows;
ret._schema = _schema.clone();
ret._colnames = (_colnames != null) ? _colnames.clone() : null;
ret._colmeta = _colmeta.clone();
ret._coldata = new Array[getNumColumns()];
//copy data to output and partial overwrite w/ rhs
for( int j=0; j<getNumColumns(); j++ ) {
Array tmp = _coldata[j].clone();
if( j>=cl && j<=cu ) {
//fast-path for homogeneous column schemas
if( _schema[j]==rhsFrame._schema[j-cl] )
tmp.set(rl, ru, rhsFrame._coldata[j-cl]);
//general-path for heterogeneous column schemas
else {
for( int i=rl; i<=ru; i++ )
tmp.set(i, UtilFunctions.objectToObject(
_schema[j], rhsFrame._coldata[j-cl].get(i-rl)));
ret._coldata[j] = tmp;
return ret;
public FrameBlock slice(IndexRange ixrange, FrameBlock ret) {
return slice(
(int)ixrange.rowStart, (int)ixrange.rowEnd,
(int)ixrange.colStart, (int)ixrange.colEnd, ret);
* Right indexing operations to slice a subframe out of this frame block.
* Note that the existing column value types are preserved.
* @param rl row lower index, inclusive, 0-based
* @param ru row upper index, inclusive, 0-based
* @param cl column lower index, inclusive, 0-based
* @param cu column upper index, inclusive, 0-based
* @param retCache cache block
* @return frame block
public FrameBlock slice(int rl, int ru, int cl, int cu, CacheBlock retCache) {
FrameBlock ret = (FrameBlock)retCache;
// check the validity of bounds
if ( rl < 0 || rl >= getNumRows() || ru < rl || ru >= getNumRows()
|| cl < 0 || cu >= getNumColumns() || cu < cl || cu >= getNumColumns() ) {
throw new DMLRuntimeException("Invalid values for frame indexing: ["+(rl+1)+":"+(ru+1)+"," + (cl+1)+":"+(cu+1)+"] " +
"must be within frame dimensions ["+getNumRows()+","+getNumColumns()+"]");
//allocate output frame
if( ret == null )
ret = new FrameBlock();
ret.reset(ru-rl+1, true);
//copy output schema and colnames
int numCols = cu-cl+1;
boolean isDefNames = isColNamesDefault();
ret._schema = new ValueType[numCols];
ret._colnames = !isDefNames ? new String[numCols] : null;
ret._colmeta = new ColumnMetadata[numCols];
for( int j=cl; j<=cu; j++ ) {
ret._schema[j-cl] = _schema[j];
ret._colmeta[j-cl] = _colmeta[j];
if( !isDefNames )
ret._colnames[j-cl] = getColumnName(j);
ret._numRows = ru-rl+1;
if(ret._coldata == null )
ret._coldata = new Array[numCols];
//fast-path: shallow copy column indexing
if( ret._numRows == _numRows ) {
//this shallow copy does not only avoid an array copy, but
//also allows for bi-directional reuses of recodemaps
for( int j=cl; j<=cu; j++ )
ret._coldata[j-cl] = _coldata[j];
//copy output data
else {
for( int j=cl; j<=cu; j++ ) {
if( ret._coldata[j-cl] == null )
ret._coldata[j-cl] = _coldata[j].slice(rl,ru);
ret._coldata[j-cl].set(0, ru-rl, _coldata[j], rl);
return ret;
public void slice(ArrayList<Pair<Long,FrameBlock>> outlist, IndexRange range, int rowCut)
FrameBlock top=null, bottom=null;
Iterator<Pair<Long,FrameBlock>> p=outlist.iterator();
top =;
bottom =;
if(getNumRows() > 0)
int r=(int) range.rowStart;
for(; r<Math.min(rowCut, range.rowEnd+1); r++)
Object[] row = new Object[(int) (range.colEnd-range.colStart+1)];
for(int c=(int) range.colStart; c<range.colEnd+1; c++)
row[(int) (c-range.colStart)] = get(r,c);
for(; r<=range.rowEnd; r++)
Object[] row = new Object[(int) (range.colEnd-range.colStart+1)];
for(int c=(int) range.colStart; c<range.colEnd+1; c++)
row[(int) (c-range.colStart)] = get(r,c);
* Appends the given argument frameblock 'that' to this frameblock by
* creating a deep copy to prevent side effects. For cbind, the frames
* are appended column-wise (same number of rows), while for rbind the
* frames are appended row-wise (same number of columns).
* @param that frame block to append to current frame block
* @param ret frame block to return, can be null
* @param cbind if true, column append
* @return frame block
public FrameBlock append( FrameBlock that, FrameBlock ret, boolean cbind ) {
if( cbind ) //COLUMN APPEND
//sanity check row dimension mismatch
if( getNumRows() != that.getNumRows() ) {
throw new DMLRuntimeException("Incompatible number of rows for cbind: "+
that.getNumRows()+" (expected: "+getNumRows()+")");
//allocate output frame
if( ret == null )
ret = new FrameBlock();
ret._numRows = _numRows;
//concatenate schemas (w/ deep copy to prevent side effects)
ret._schema = (ValueType[]) ArrayUtils.addAll(_schema, that._schema);
ret._colnames = (String[]) ArrayUtils.addAll(getColumnNames(), that.getColumnNames());
ret._colmeta = (ColumnMetadata[]) ArrayUtils.addAll(_colmeta, that._colmeta);
//check and enforce unique columns names
if( ! HashSet<>()::add) )
ret._colnames = createColNames(ret.getNumColumns());
//concatenate column data (w/ shallow copy which is safe due to copy on write semantics)
ret._coldata = (Array[]) ArrayUtils.addAll(_coldata, that._coldata);
//sanity check column dimension mismatch
if( getNumColumns() != that.getNumColumns() ) {
throw new DMLRuntimeException("Incompatible number of columns for rbind: "+
that.getNumColumns()+" (expected: "+getNumColumns()+")");
//allocate output frame (incl deep copy schema)
if( ret == null )
ret = new FrameBlock();
ret._numRows = _numRows;
ret._schema = _schema.clone();
ret._colnames = (_colnames!=null) ? _colnames.clone() : null;
ret._colmeta = new ColumnMetadata[getNumColumns()];
for( int j=0; j<_schema.length; j++ )
ret._colmeta[j] = new ColumnMetadata(0);
//concatenate data (deep copy first, append second)
ret._coldata = new Array[getNumColumns()];
for( int j=0; j<getNumColumns(); j++ )
ret._coldata[j] = _coldata[j].clone();
Iterator<Object[]> iter = that.getObjectRowIterator(_schema);
while( iter.hasNext() )
return ret;
public void copy(FrameBlock src) {
copy(0, src.getNumRows()-1, 0, src.getNumColumns()-1, src);
public void copy(int rl, int ru, int cl, int cu, FrameBlock src)
//allocate columns if necessary
//copy values
for( int j=cl; j<=cu; j++ ) {
//special case: column memcopy
if( _schema[j].equals(src._schema[j-cl]) )
_coldata[j].set(rl, ru, src._coldata[j-cl]);
//general case w/ schema transformation
for( int i=rl; i<=ru; i++ ) {
String tmp = src.get(i-rl, j-cl)!=null ? src.get(i-rl, j-cl).toString() : null;
set(i, j, UtilFunctions.stringToObject(_schema[j], tmp));
// transform specific functionality
* This function will split every Recode map in the column using delimiter Lop.DATATYPE_PREFIX,
* as Recode map generated earlier in the form of Code+Lop.DATATYPE_PREFIX+Token and store it in a map
* which contains token and code for every unique tokens.
* @param col is the column # from frame data which contains Recode map generated earlier.
* @return map of token and code for every element in the input column of a frame containing Recode map
public HashMap<String,Long> getRecodeMap(int col) {
//probe cache for existing map
SoftReference<HashMap<String,Long>> tmp = _coldata[col]._rcdMapCache;
HashMap<String,Long> map = (tmp!=null) ? tmp.get() : null;
if( map != null ) return map;
//construct recode map
HashMap<String,Long> map = new HashMap<>();
Array ldata = _coldata[col];
for( int i=0; i<getNumRows(); i++ ) {
Object val = ldata.get(i);
if( val != null ) {
String[] tmp = EncoderRecode.splitRecodeMapEntry(val.toString());
map.put(tmp[0], Long.parseLong(tmp[1]));
//put created map into cache
_coldata[col]._rcdMapCache = new SoftReference<>(map);
return map;
public void merge(CacheBlock that, boolean bDummy) {
public void merge(FrameBlock that) {
//check for empty input source (nothing to merge)
if( that == null || that.getNumRows() == 0 )
//check dimensions (before potentially copy to prevent implicit dimension change)
if ( getNumRows() != that.getNumRows() || getNumColumns() != that.getNumColumns() )
throw new DMLRuntimeException("Dimension mismatch on merge disjoint (target="+getNumRows()+"x"+getNumColumns()+", source="+that.getNumRows()+"x"+that.getNumColumns()+")");
//meta data copy if necessary
for( int j=0; j<getNumColumns(); j++ )
if( !that.isColumnMetadataDefault(j) ) {
//core frame block merge through cell copy
//with column-wide access pattern
for( int j=0; j<getNumColumns(); j++ ) {
//special case: copy non-zeros of column
if( _schema[j].equals(that._schema[j]) )
_coldata[j].setNz(0, _numRows-1, that._coldata[j]);
//general case w/ schema transformation
else {
for( int i=0; i<_numRows; i++ ) {
Object obj = UtilFunctions.objectToObject(
_schema[j], that.get(i,j), true);
if (obj != null) //merge non-zeros
set(i, j,obj);
* This function ZERO OUT the data in the slicing window applicable for this block.
* @param result frame block
* @param range index range
* @param complementary ?
* @param iRowStartSrc ?
* @param iRowStartDest ?
* @param blen ?
* @param iMaxRowsToCopy ?
* @return frame block
public FrameBlock zeroOutOperations(FrameBlock result, IndexRange range, boolean complementary, int iRowStartSrc, int iRowStartDest, int blen, int iMaxRowsToCopy) {
int clen = getNumColumns();
result=new FrameBlock(getSchema());
result.reset(0, true);
for(int r=(int) range.rowStart; r<=range.rowEnd&&r+iRowStartDest<blen; r++)
for(int c=(int) range.colStart; c<=range.colEnd; c++)
result.set(r+iRowStartDest, c, get(r+iRowStartSrc,c));
int r=iRowStartDest;
for(; r<(int)range.rowStart && r-iRowStartDest<iMaxRowsToCopy ; r++)
for(int c=0; c<clen; c++/*, offset++*/)
result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
for(; r<=(int)range.rowEnd && r-iRowStartDest<iMaxRowsToCopy ; r++)
for(int c=0; c<(int)range.colStart; c++)
result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
for(int c=(int)range.colEnd+1; c<clen; c++)
result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
for(; r-iRowStartDest<iMaxRowsToCopy ; r++)
for(int c=0; c<clen; c++)
result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
return result;
public FrameBlock getSchemaTypeOf() {
FrameBlock fb = new FrameBlock(
UtilFunctions.nCopies(getNumColumns(), ValueType.STRING));
.map(vt -> vt.toString()).toArray(String[]::new));
return fb;
// row iterators (over strings and boxed objects)
private abstract class RowIterator<T> implements Iterator<T[]> {
protected final int[] _cols;
protected final T[] _curRow;
protected final int _maxPos;
protected int _curPos = -1;
protected RowIterator(int rl, int ru) {
this(rl, ru, UtilFunctions.getSeqArray(1, getNumColumns(), 1));
protected RowIterator(int rl, int ru, int[] cols) {
_curRow = createRow(cols.length);
_cols = cols;
_maxPos = ru;
_curPos = rl;
public boolean hasNext() {
return (_curPos < _maxPos);
public void remove() {
throw new RuntimeException("RowIterator.remove is unsupported!");
protected abstract T[] createRow(int size);
private class StringRowIterator extends RowIterator<String> {
public StringRowIterator(int rl, int ru) {
super(rl, ru);
public StringRowIterator(int rl, int ru, int[] cols) {
super(rl, ru, cols);
protected String[] createRow(int size) {
return new String[size];
public String[] next( ) {
for( int j=0; j<_cols.length; j++ ) {
Object tmp = get(_curPos, _cols[j]-1);
_curRow[j] = (tmp!=null) ? tmp.toString() : null;
return _curRow;
private class ObjectRowIterator extends RowIterator<Object> {
private ValueType[] _tgtSchema = null;
public ObjectRowIterator(int rl, int ru) {
super(rl, ru);
public ObjectRowIterator(int rl, int ru, int[] cols) {
super(rl, ru, cols);
public void setSchema(ValueType[] schema) {
_tgtSchema = schema;
protected Object[] createRow(int size) {
return new Object[size];
public Object[] next( ) {
for( int j=0; j<_cols.length; j++ )
_curRow[j] = getValue(_curPos, _cols[j]-1);
return _curRow;
private Object getValue(int i, int j) {
Object val = get(i, j);
if( _tgtSchema != null )
val = UtilFunctions.objectToObject(_tgtSchema[j], val);
return val;
// generic, resizable native arrays
* Base class for generic, resizable array of various value types. We
* use this custom class hierarchy instead of Trove or other libraries
* in order to avoid unnecessary dependencies.
private abstract static class Array<T> implements Writable {
protected SoftReference<HashMap<String,Long>> _rcdMapCache = null;
protected int _size = 0;
protected int newSize() {
return Math.max(_size*2, 4);
public abstract T get(int index);
public abstract void set(int index, T value);
public abstract void set(int rl, int ru, Array value);
public abstract void set(int rl, int ru, Array value, int rlSrc);
public abstract void setNz(int rl, int ru, Array value);
public abstract void append(String value);
public abstract void append(T value);
public abstract Array clone();
public abstract Array slice(int rl, int ru);
public abstract void reset(int size);
private static class StringArray extends Array<String> {
private String[] _data = null;
public StringArray(String[] data) {
_data = data;
_size = _data.length;
public String get(int index) {
return _data[index];
public void set(int index, String value) {
_data[index] = value;
public void set(int rl, int ru, Array value) {
set(rl, ru, value, 0);
public void set(int rl, int ru, Array value, int rlSrc) {
System.arraycopy(((StringArray)value)._data, rlSrc, _data, rl, ru-rl+1);
public void setNz(int rl, int ru, Array value) {
String[] data2 = ((StringArray)value)._data;
for( int i=rl; i<ru+1; i++ )
if( data2[i]!=null )
_data[i] = data2[i];
public void append(String value) {
if( _data.length <= _size )
_data = Arrays.copyOf(_data, newSize());
_data[_size++] = value;
public void write(DataOutput out) throws IOException {
for( int i=0; i<_size; i++ )
public void readFields(DataInput in) throws IOException {
_size = _data.length;
for( int i=0; i<_size; i++ ) {
String tmp = in.readUTF();
_data[i] = (!tmp.isEmpty()) ? tmp : null;
public Array clone() {
return new StringArray(Arrays.copyOf(_data, _size));
public Array slice(int rl, int ru) {
return new StringArray(Arrays.copyOfRange(_data,rl,ru+1));
public void reset(int size) {
if( _data.length < size )
_data = new String[size];
_size = size;
private static class BooleanArray extends Array<Boolean> {
private boolean[] _data = null;
public BooleanArray(boolean[] data) {
_data = data;
_size = _data.length;
public Boolean get(int index) {
return _data[index];
public void set(int index, Boolean value) {
_data[index] = (value!=null) ? value : false;
public void set(int rl, int ru, Array value) {
set(rl, ru, value, 0);
public void set(int rl, int ru, Array value, int rlSrc) {
System.arraycopy(((BooleanArray)value)._data, rlSrc, _data, rl, ru-rl+1);
public void setNz(int rl, int ru, Array value) {
boolean[] data2 = ((BooleanArray)value)._data;
for( int i=rl; i<ru+1; i++ )
if( data2[i] )
_data[i] = data2[i];
public void append(String value) {
public void append(Boolean value) {
if( _data.length <= _size )
_data = Arrays.copyOf(_data, newSize());
_data[_size++] = (value!=null) ? value : false;
public void write(DataOutput out) throws IOException {
for( int i=0; i<_size; i++ )
public void readFields(DataInput in) throws IOException {
_size = _data.length;
for( int i=0; i<_size; i++ )
_data[i] = in.readBoolean();
public Array clone() {
return new BooleanArray(Arrays.copyOf(_data, _size));
public Array slice(int rl, int ru) {
return new BooleanArray(Arrays.copyOfRange(_data,rl,ru+1));
public void reset(int size) {
if( _data.length < size )
_data = new boolean[size];
_size = size;
private static class LongArray extends Array<Long> {
private long[] _data = null;
public LongArray(long[] data) {
_data = data;
_size = _data.length;
public Long get(int index) {
return _data[index];
public void set(int index, Long value) {
_data[index] = (value!=null) ? value : 0L;
public void set(int rl, int ru, Array value) {
set(rl, ru, value, 0);
public void set(int rl, int ru, Array value, int rlSrc) {
System.arraycopy(((LongArray)value)._data, rlSrc, _data, rl, ru-rl+1);
public void setNz(int rl, int ru, Array value) {
long[] data2 = ((LongArray)value)._data;
for( int i=rl; i<ru+1; i++ )
if( data2[i]!=0 )
_data[i] = data2[i];
public void append(String value) {
public void append(Long value) {
if( _data.length <= _size )
_data = Arrays.copyOf(_data, newSize());
_data[_size++] = (value!=null) ? value : 0L;
public void write(DataOutput out) throws IOException {
for( int i=0; i<_size; i++ )
public void readFields(DataInput in) throws IOException {
_size = _data.length;
for( int i=0; i<_size; i++ )
_data[i] = in.readLong();
public Array clone() {
return new LongArray(Arrays.copyOf(_data, _size));
public Array slice(int rl, int ru) {
return new LongArray(Arrays.copyOfRange(_data,rl,ru+1));
public void reset(int size) {
if( _data.length < size )
_data = new long[size];
_size = size;
private static class IntegerArray extends Array<Integer> {
private int[] _data = null;
public IntegerArray(int[] data) {
_data = data;
_size = _data.length;
public Integer get(int index) {
return _data[index];
public void set(int index, Integer value) { _data[index] = (value!=null) ? value : 0;}
public void set(int rl, int ru, Array value) {
set(rl, ru, value, 0);
public void set(int rl, int ru, Array value, int rlSrc) {
System.arraycopy(((IntegerArray)value)._data, rlSrc, _data, rl, ru-rl+1);
public void setNz(int rl, int ru, Array value) {
int[] data2 = ((IntegerArray)value)._data;
for( int i=rl; i<ru+1; i++ )
if( data2[i]!=0 )
_data[i] = data2[i];
public void append(String value) {
public void append(Integer value) {
if( _data.length <= _size )
_data = Arrays.copyOf(_data, newSize());
_data[_size++] = (value!=null) ? value : 0;
public void write(DataOutput out) throws IOException {
for( int i=0; i<_size; i++ )
public void readFields(DataInput in) throws IOException {
_size = _data.length;
for( int i=0; i<_size; i++ )
_data[i] = in.readInt();
public Array clone() {
return new IntegerArray(Arrays.copyOf(_data, _size));
public Array slice(int rl, int ru) {
return new IntegerArray(Arrays.copyOfRange(_data,rl,ru+1));
public void reset(int size) {
if( _data.length < size )
_data = new int[size];
_size = size;
private static class FloatArray extends Array<Float> {
private float[] _data = null;
public FloatArray(float[] data) {
_data = data;
_size = _data.length;
public Float get(int index) {
return _data[index];
public void set(int index, Float value) { _data[index] = (value!=null) ? value : 0f; }
public void set(int rl, int ru, Array value) {
set(rl,ru, value, 0);
public void set(int rl, int ru, Array value, int rlSrc) {
System.arraycopy(((FloatArray)value)._data, rlSrc, _data, rl, ru-rl+1);
public void setNz(int rl, int ru, Array value) {
float[] data2 = ((FloatArray)value)._data;
for( int i=rl; i<ru+1; i++ )
if( data2[i]!=0 )
_data[i] = data2[i];
public void append(String value) { append((value!=null)? Float.parseFloat(value):null); }
public void append(Float value) {
if( _data.length <= _size )
_data = Arrays.copyOf(_data, newSize());
_data[_size++] = (value!=null) ? value : 0f;
public void write(DataOutput out) throws IOException {
for( int i=0; i<_size; i++ )
public void readFields(DataInput in) throws IOException {
_size = _data.length;
for( int i=0; i<_size; i++ )
_data[i] = in.readFloat();
public Array clone() {
return new FloatArray(Arrays.copyOf(_data, _size));
public Array slice(int rl, int ru) {
return new FloatArray(Arrays.copyOfRange(_data,rl,ru+1));
public void reset(int size) {
if( _data.length < size )
_data = new float[size];
_size = size;
private static class DoubleArray extends Array<Double> {
private double[] _data = null;
public DoubleArray(double[] data) {
_data = data;
_size = _data.length;
public Double get(int index) {
return _data[index];
public void set(int index, Double value) {
_data[index] = (value!=null) ? value : 0d;
public void set(int rl, int ru, Array value) {
set(rl,ru, value, 0);
public void set(int rl, int ru, Array value, int rlSrc) {
System.arraycopy(((DoubleArray)value)._data, rlSrc, _data, rl, ru-rl+1);
public void setNz(int rl, int ru, Array value) {
double[] data2 = ((DoubleArray)value)._data;
for( int i=rl; i<ru+1; i++ )
if( data2[i]!=0 )
_data[i] = data2[i];
public void append(String value) {
public void append(Double value) {
if( _data.length <= _size )
_data = Arrays.copyOf(_data, newSize());
_data[_size++] = (value!=null) ? value : 0d;
public void write(DataOutput out) throws IOException {
for( int i=0; i<_size; i++ )
public void readFields(DataInput in) throws IOException {
_size = _data.length;
for( int i=0; i<_size; i++ )
_data[i] = in.readDouble();
public Array clone() {
return new DoubleArray(Arrays.copyOf(_data, _size));
public Array slice(int rl, int ru) {
return new DoubleArray(Arrays.copyOfRange(_data,rl,ru+1));
public void reset(int size) {
if( _data.length < size )
_data = new double[size];
_size = size;
public static class ColumnMetadata implements Serializable {
private static final long serialVersionUID = -90094082422100311L;
private long _ndistinct = 0;
private String _mvValue = null;
public ColumnMetadata(long ndistinct) {
_ndistinct = ndistinct;
public ColumnMetadata(long ndistinct, String mvval) {
_ndistinct = ndistinct;
_mvValue = mvval;
public ColumnMetadata(ColumnMetadata that) {
_ndistinct = that._ndistinct;
_mvValue = that._mvValue;
public long getNumDistinct() {
return _ndistinct;
public void setNumDistinct(long ndistinct) {
_ndistinct = ndistinct;
public String getMvValue() {
return _mvValue;
public void setMvValue(String mvVal) {
_mvValue = mvVal;
private static ValueType isType(String val) {
val = val.trim().toLowerCase().replaceAll("\"", "");
if (val.matches("(true|false|t|f|0|1)"))
return ValueType.BOOLEAN;
else if (val.matches("[-+]?\\d+")){
long maxValue = Long.parseLong(val);
if ((maxValue >= Integer.MIN_VALUE) && (maxValue <= Integer.MAX_VALUE))
return ValueType.INT32;
return ValueType.INT64;
else if (val.matches("[-+]?[0-9]+\\.?[0-9]*([e]?[-+]?[0-9]+)")){
double maxValue = Double.parseDouble(val);
if ((maxValue >= (-Float.MAX_VALUE)) && (maxValue <= Float.MAX_VALUE))
return ValueType.FP32;
return ValueType.FP64;
else if (val.equals("infinity") || val.equals("-infinity") || val.equals("nan"))
return ValueType.FP64;
else return ValueType.STRING;
public FrameBlock detectSchemaFromRow(double sampleFraction) {
int rows = this.getNumRows();
int cols = this.getNumColumns();
String[] schemaInfo = new String[cols];
int sample = (int)Math.min(Math.max(sampleFraction*rows, 256), rows);
ExecutorService pool = CommonThreadPool.get(cols);
ArrayList<DetectValueTypeTask> tasks = new ArrayList<>();
for (int i = 0; i < cols; i++) {
FrameBlock.Array obj = this.getColumn(i);
tasks.add(new DetectValueTypeTask(obj,rows, sample));
List<Future<String>> ret;
try {
ret = pool.invokeAll(tasks);
for(int i = 0; i < cols; i++){
schemaInfo[i] = ret.get(i).get();
} catch (ExecutionException | InterruptedException e) {
throw new DMLRuntimeException("Exception Interupted or Exception thrown in Detect Schema", e);
//create output block one row representing the schema as strings
FrameBlock fb = new FrameBlock(UtilFunctions.nCopies(cols, ValueType.STRING));
return fb;
private static class DetectValueTypeTask implements Callable<String>
private final Array _obj;
private final int _rows;
private final int _sampleSize;
protected DetectValueTypeTask(Array obj, int rows, int sampleSize ) {
_obj = obj;
_rows = rows;
_sampleSize = sampleSize;
public String call() {
ValueType state = ValueType.UNKNOWN;
for (int j = 0; j < _sampleSize; j++) {
int randomIndex = ThreadLocalRandom.current().nextInt(0, _rows - 1);
String dataValue = ((_obj.get(randomIndex) != null)?_obj.get(randomIndex).toString().trim().replace("\"", "").toLowerCase():null);
if(dataValue != null){
ValueType current = isType(dataValue);
if (current == ValueType.STRING) {
state = ValueType.STRING;
else if (current== ValueType.FP64) {
state = ValueType.FP64;
else if (current== ValueType.FP32) {
state = (state == ValueType.FP64 ? state : ValueType.FP32);
else if (current == ValueType.INT64) {
state = ((state == ValueType.FP64 || state == ValueType.FP32) ? state : ValueType.INT64);
else if (current == ValueType.INT32) {
state = ((state == ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64) ? state : ValueType.INT32);
else if (current == ValueType.BOOLEAN)
state = ((state == ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64 || state == ValueType.INT32) ? state : ValueType.BOOLEAN);
* Drop the cell value which does not confirms to the data type of its column
* @param schema of the frame
* @return original frame where invalid values are replaced with null
public FrameBlock dropInvalidType(FrameBlock schema) {
//sanity checks
if(this.getNumColumns() != schema.getNumColumns())
throw new DMLException("mismatch in number of columns in frame and its schema ");
String[] schemaString = schema.getStringRowIterator().next(); // extract the schema in String array
for (int i = 0; i < this.getNumColumns(); i++) {
Array obj = this.getColumn(i);
String schemaCol = schemaString[i];
String type;
type = "FP";
} else if (schemaCol.contains("INT")){
type = "INT";
} else if (schemaCol.contains("STRING")){
// In case of String columns, don't do any verification or replacements.
} else{
type = schemaCol;
for (int j = 0; j < this.getNumRows(); j++)
if(obj.get(j) == null)
String dataValue = obj.get(j).toString().trim().replace("\"", "").toLowerCase() ;
ValueType dataType = isType(dataValue);
if(!dataType.toString().contains(type) && !(dataType == ValueType.BOOLEAN && type == "INT")){
LOG.warn("Datatype detected: " + dataType + " where expected: " + schemaString[i] + " index: " + i + "," +j);
return this;
* This method validates the frame data against an attribute length constrain
* if data value in any cell is greater than the specified threshold of that attribute
* the output frame will store a null on that cell position, thus removing the length-violating values.
* @param feaLen vector of valid lengths
* @return FrameBlock with invalid values converted into missing values (null)
public FrameBlock invalidByLength(MatrixBlock feaLen) {
//sanity checks
if(this.getNumColumns() != feaLen.getNumColumns())
throw new DMLException("mismatch in number of columns in frame and corresponding feature-length vector");
FrameBlock outBlock = new FrameBlock(this);
for (int i = 0; i < this.getNumColumns(); i++) {
if(feaLen.quickGetValue(0, i) == -1)
int validLength = (int)feaLen.quickGetValue(0, i);
Array obj = this.getColumn(i);
for (int j = 0; j < obj._size; j++)
if(obj.get(j) == null)
String dataValue = obj.get(j).toString();
if(dataValue.length() > validLength)
outBlock.set(j, i, null);
return outBlock;
public static FrameBlock mergeSchema(FrameBlock temp1, FrameBlock temp2) {
String[] rowTemp1 = temp1.getStringRowIterator().next();
String[] rowTemp2 = temp2.getStringRowIterator().next();
if(rowTemp1.length != rowTemp2.length)
throw new DMLRuntimeException("Schema dimension "
+ "mismatch: "+rowTemp1.length+" vs "+rowTemp2.length);
for(int i=0; i< rowTemp1.length; i++ ) {
//modify schema1 if necessary (different schema2)
if(!rowTemp1[i].equals(rowTemp2[i])) {
if(rowTemp1[i].equals("STRING") || rowTemp2[i].equals("STRING"))
rowTemp1[i] = "STRING";
else if (rowTemp1[i].equals("FP64") || rowTemp2[i].equals("FP64"))
rowTemp1[i] = "FP64";
else if (rowTemp1[i].equals("FP32") && new ArrayList<>(Arrays.asList("INT64", "INT32", "CHARACTER")).contains(rowTemp2[i]) )
rowTemp1[i] = "FP32";
else if (rowTemp1[i].equals("INT64") && new ArrayList<>(Arrays.asList("INT32", "CHARACTER")).contains(rowTemp2[i]))
rowTemp1[i] = "INT64";
else if (rowTemp1[i].equals("INT32") || rowTemp2[i].equals("CHARACTER"))
rowTemp1[i] = "INT32";
//create output block one row representing the schema as strings
FrameBlock mergedFrame = new FrameBlock(
UtilFunctions.nCopies(temp1.getNumColumns(), ValueType.STRING));
return mergedFrame;
public FrameBlock map(String lambdaExpr) {
return map(getCompiledFunction(lambdaExpr));
public FrameBlock map(FrameMapFunction lambdaExpr) {
// Prepare temporary output array
String[][] output = new String[getNumRows()][getNumColumns()];
// Execute map function on all cells
for(int j=0; j<getNumColumns(); j++) {
Array input = getColumn(j);
for (int i = 0; i < input._size; i++)
if(input.get(i) != null)
output[i][j] = lambdaExpr.apply(String.valueOf(input.get(i)));
return new FrameBlock(UtilFunctions.nCopies(getNumColumns(), ValueType.STRING), output);
public static FrameMapFunction getCompiledFunction(String lambdaExpr) {
// split lambda expression
String[] parts = lambdaExpr.split("->");
if( parts.length != 2 )
throw new DMLRuntimeException("Unsupported lambda expression: "+lambdaExpr);
String varname = parts[0].trim();
String expr = parts[1].trim();
// construct class code
String cname = "StringProcessing"+CLASS_ID.getNextID();
StringBuilder sb = new StringBuilder();
sb.append("import org.apache.sysds.runtime.util.UtilFunctions;\n");
sb.append("public class "+cname+" extends FrameMapFunction {\n");
sb.append("public String apply(String "+varname+") {\n");
sb.append(" return String.valueOf("+expr+"); }}\n");
// compile class, and create FrameMapFunction object
try {
return (FrameMapFunction) CodegenUtils
.compileClass(cname, sb.toString()).newInstance();
catch(InstantiationException | IllegalAccessException e) {
throw new DMLRuntimeException("Failed to compile FrameMapFunction.", e);
public static abstract class FrameMapFunction implements Serializable {
private static final long serialVersionUID = -8398572153616520873L;
public abstract String apply(String input);