[SYSTEMDS-2782] MDedup Builtin for finding duplicate rows
DIA project WS2020/21.
Closes #1139.
Date:      Mon Jan 11 23:50:57 2021 +0100
diff --git a/docs/site/builtins-reference.md b/docs/site/builtins-reference.md
index d960b25..022b75e 100644
--- a/docs/site/builtins-reference.md
+++ b/docs/site/builtins-reference.md
@@ -56,6 +56,7 @@
     * [`slicefinder`-Function](#slicefinder-function)
     * [`normalize`-Function](#normalize-function)
     * [`gnmf`-Function](#gnmf-function)
+    * [`mdedup`-Function](#mdedup-function)
     * [`msvm`-Function](#msvm-function)
     * [`naivebayes`-Function](#naivebayes-function)
     * [`outlier`-Function](#outlier-function)
@@ -1275,6 +1276,48 @@
 y = toOneHot(X,numClasses)
 ```
 
+## `mdedup`-Function
+
+The `mdedup`-function implements builtin for deduplication using matching dependencies 
+(e.g. Street 0.95, City 0.90 -> ZIP 1.0) by Jaccard distance.
+
+### Usage
+
+```r
+mdedup(X, Y, intercept, epsilon, lamda, maxIterations, verbose)
+```
+
+
+### Arguments
+
+| Name          | Type             | Default    | Description |
+| :------       | :-------------   | --------   | :---------- |
+| X             | Frame            | ---        | Input Frame X |
+| LHSfeatures   | Matrix[Integer]  | ---        | A matrix 1xd with numbers of columns for MDs |
+| LHSthreshold  | Matrix[Double]   | ---        | A matrix 1xd with threshold values in interval [0, 1] for MDs |
+| RHSfeatures   | Matrix[Integer]  | ---        | A matrix 1xd with numbers of columns for MDs |
+| RHSthreshold  | Matrix[Double]   | ---        | A matrix 1xd with threshold values in interval [0, 1] for MDs |
+| verbose       | Boolean          | False      | Set to true to print duplicates.|
+
+
+### Returns
+
+| Type            | Default  | Description |
+| :-------------- | -------- | :---------- |
+| Matrix[Integer] | ---      | Matrix of duplicates (rows). |
+
+
+### Example
+
+```r
+X = as.frame(rand(rows = 50, cols = 10))
+LHSfeatures = matrix("1 3 19", 1, 2)
+LHSthreshold = matrix("0.85 0.85", 1, 2)
+RHSfeatures = matrix("30", 1, 1)
+RHSthreshold = matrix("1.0", 1, 1)
+duplicates = mdedup(X, LHSfeatures, LHSthreshold, RHSfeatures, RHSthreshold, verbose = FALSE)
+```
+
 ## `msvm`-Function
 
 The `msvm`-function implements builtin multiclass SVM with squared slack variables
diff --git a/docs/site/dml-language-reference.md b/docs/site/dml-language-reference.md
index 2f3bd16..27bbbc6 100644
--- a/docs/site/dml-language-reference.md
+++ b/docs/site/dml-language-reference.md
@@ -2067,7 +2067,24 @@
       WEST
       EAST
 
-
+It is also possible to compute Jaccard similarity matrix of rows of a vector.
+<code> dist = map(Xi, "(x, y) -> UtilFunctions.jaccardSim(x, y)") <br/> 
+print(toString(dist)) </code>
+     
+    # FRAME: nrow = 10, ncol = 10 
+    # DOUBLE 
+    # 0,000 0,286 0,125 0,600 0,286 0,125 0,125 1,000 1,000 0,600 
+      0,286 0,000 0,429 0,286 1,000 0,429 0,429 0,286 0,286 0,286 
+      0,125 0,429 0,000 0,125 0,429 1,000 1,000 0,125 0,125 0,125 
+      0,600 0,286 0,125 0,000 0,286 0,125 0,125 0,600 0,600 1,000 
+      0,286 1,000 0,429 0,286 0,000 0,429 0,429 0,286 0,286 0,286 
+      0,125 0,429 1,000 0,125 0,429 0,000 1,000 0,125 0,125 0,125 
+      0,125 0,429 1,000 0,125 0,429 1,000 0,000 0,125 0,125 0,125 
+      1,000 0,286 0,125 0,600 0,286 0,125 0,125 0,000 1,000 0,600 
+      1,000 0,286 0,125 0,600 0,286 0,125 0,125 1,000 0,000 0,600 
+      0,600 0,286 0,125 1,000 0,286 0,125 0,125 0,600 0,600 0,000
+    #
+    
 * * *
 
 ## Modules
diff --git a/scripts/builtin/discoverFD.dml b/scripts/builtin/discoverFD.dml
index 0d787bd..49d013b 100644
--- a/scripts/builtin/discoverFD.dml
+++ b/scripts/builtin/discoverFD.dml
@@ -56,12 +56,12 @@
   # allocate output and working sets
   n = nrow(X)
   d = ncol(X)
-  FD = matrix(0, d, d)
+  FD = diag(matrix(1, d, 1))
   cm = matrix(0, 1, d)
 
   # num distinct per column
   parfor(i in 1:d)
-    cm[1,i] = colDistinct(X[,i]) 
+    cm[1,i] = colDistinct(X[,i])
 
   # add know functional dependencies
   FD = FD + (cm == 1)    # constant columns determined by all columns
diff --git a/scripts/builtin/mdedup.dml b/scripts/builtin/mdedup.dml
new file mode 100644
index 0000000..f6af1d4
--- /dev/null
+++ b/scripts/builtin/mdedup.dml
@@ -0,0 +1,119 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#------------------------------------------------------------------------------------------------------------------
+
+# Implements builtin for deduplication using matching dependencies (e.g. Street 0.95, City 0.90 -> ZIP 1.0)
+# and Jaccard distance.
+# 
+# INPUT PARAMETERS:
+# -----------------------------------------------------------------------------------------------------------------
+# NAME            TYPE              DEFAULT     MEANING
+# -----------------------------------------------------------------------------------------------------------------
+# X               Frame               --       Input Frame X
+# LHSfeatures     Matrix[Integer]     --       A matrix 1xd with numbers of columns for MDs
+#                                              (e.g. Street 0.95, City 0.90 -> ZIP 1.0)
+# LHSthreshold    Matrix[Double]      --       A matrix 1xd with threshold values in interval [0, 1] for MDs
+# RHSfeatures     Matrix[Integer]     --       A matrix 1xd with numbers of columns for MDs
+# RHSthreshold    Matrix[Double]      --       A matrix 1xd with threshold values in interval [0, 1] for MDs
+# verbose         Boolean             --       To print the output
+# -----------------------------------------------------------------------------------------------------------------
+#
+# Output(s)
+# -----------------------------------------------------------------------------------------------------------------
+# NAME                 TYPE         DEFAULT     MEANING
+# -----------------------------------------------------------------------------------------------------------------
+# MD              Matrix[Double]      ---       Matrix nx1 of duplicates
+
+s_mdedup = function(Frame[String] X, Matrix[Double] LHSfeatures, Matrix[Double] LHSthreshold,
+    Matrix[Double] RHSfeatures, Matrix[Double] RHSthreshold, Boolean verbose)
+  return(Matrix[Double] MD)
+{
+  n = nrow(X)
+  d = ncol(X)
+
+  if (0 > (ncol(LHSfeatures) + ncol(RHSfeatures)) > d)
+    stop("Invalid input: thresholds should in interval [0, " + d + "]")
+
+  if ((ncol(LHSfeatures) != ncol(LHSthreshold)) | (ncol(RHSfeatures) != ncol(RHSthreshold)))
+      stop("Invalid input: number of thresholds and columns to compare should be equal for LHS and RHS.")
+
+  if (max(LHSfeatures) > d | max(RHSfeatures) > d)
+    stop("Invalid input: feature values should be less than " + d)
+
+  if (sum(LHSthreshold > 1) > 0 | sum(RHSthreshold > 1) > 0)
+    stop("Invalid input: threshold values should be in the interval [0, 1].")
+
+  MD = matrix(0, n, 1)
+  LHS_MD = getMDAdjacency(X, LHSfeatures, LHSthreshold)
+  RHS_MD = matrix(0, n, n)
+
+  if (sum(LHS_MD) > 0) {
+    RHS_MD = getMDAdjacency(X, RHSfeatures, RHSthreshold)
+  }
+
+  MD = detectDuplicates(LHS_MD, RHS_MD)
+
+  if(verbose)
+    print(toString(MD))
+}
+
+getMDAdjacency = function(Frame[String] X, Matrix[Double] features, Matrix[Double] thresholds)
+  return(Matrix[Double] adjacency)
+{
+  n = nrow(X)
+  d = ncol(X)
+  adjacency = matrix(0, n, n)
+
+  i = 1
+  while (i <= ncol(features)) {
+    # slice col
+    pos = as.scalar(features[1, i])
+    Xi = X[, pos]
+    # distances between words in each row of col
+    dist = map(Xi, "(x, y) -> UtilFunctions.jaccardSim(x, y)")
+    jaccardDist = as.matrix(dist)
+    jaccardDist = jaccardDist + t(jaccardDist)
+    threshold = as.scalar(thresholds[1, i])
+
+    if(i == 1) {
+      adjacency = jaccardDist >= threshold
+    } else {
+      adjacency = adjacency & (jaccardDist >= threshold)
+    }
+
+    # break if one of MDs is false
+    if (sum(adjacency) == 0)
+      i = ncol(features)
+
+    i = i + 1
+  }
+}
+
+detectDuplicates = function(Matrix[Double] LHS_adj, Matrix[Double] RHS_adj)
+  return(Matrix[Double] MD)
+{
+
+  n = nrow(LHS_adj)
+  adjacency = LHS_adj * RHS_adj  
+  # find duplicates
+  # TODO size propagation issue of adjacency matrix inside components call
+  colDuplicates = components(G=adjacency[1:n, 1:n], verbose=FALSE)
+  MD = colDuplicates * (rowSums(adjacency[1:n, 1:n]) > 0)
+}
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java
index 1a230bf..f18ec9c 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -97,6 +97,7 @@
 	DETECTSCHEMA("detectSchema", false),
 	DIAG("diag", false),
 	DISCOVER_FD("discoverFD", true),
+	DISCOVER_MD("mdedup", true),
 	DIST("dist", true),
 	DMV("dmv", true),
 	DROP_INVALID_TYPE("dropInvalidType", false),
diff --git a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
index d4d8296..ba3fbe3 100644
--- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -1562,9 +1562,16 @@
 			checkMatrixFrameParam(getFirstExpr());
 			checkScalarParam(getSecondExpr());
 			output.setDataType(DataType.FRAME);
-			output.setDimensions(id.getDim1(), 1);
+			if(_args[1].getText().contains("jaccardSim")) {
+				output.setDimensions(id.getDim1(), id.getDim1());
+				output.setValueType(ValueType.FP64);
+			}
+			else {
+				output.setDimensions(id.getDim1(), 1);
+				output.setValueType(ValueType.STRING);
+			}
 			output.setBlocksize (id.getBlocksize());
-			output.setValueType(ValueType.STRING);
+
 			break;
 
 		default:
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameScalarSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameScalarSPInstruction.java
index a395c16..b5cf078 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameScalarSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameScalarSPInstruction.java
@@ -44,6 +44,11 @@
 		// Create local compiled functions (once) and execute on RDD
 		JavaPairRDD<Long, FrameBlock> out = in1.mapValues(new RDDStringProcessing(expression));
 
+		if(expression.contains("jaccardSim")) {
+			long rows = sec.getDataCharacteristics(output.getName()).getRows();
+			sec.getDataCharacteristics(output.getName()).setDimension(rows, rows);
+		}
+
 		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index e33052d..227fa0c 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -70,33 +70,33 @@
 
 	//internal configuration
 	private static final boolean REUSE_RECODE_MAPS = true;
-	
+
 	/** The number of rows of the FrameBlock */
 	private int _numRows = -1;
-	
+
 	/** The schema of the data frame as an ordered list of value types */
-	private ValueType[] _schema = null; 
-	
+	private ValueType[] _schema = null;
+
 	/** The column names of the data frame as an ordered list of strings, allocated on-demand */
 	private String[] _colnames = null;
-	
+
 	private ColumnMetadata[] _colmeta = null;
-	
+
 	/** The data frame data as an ordered list of columns */
 	private Array[] _coldata = null;
-	
+
 	/** Cached size in memory to avoid repeated scans of string columns */
 	long _msize = -1;
-	
+
 	public FrameBlock() {
 		_numRows = 0;
 	}
-	
+
 	/**
 	 * Copy constructor for frame blocks, which uses a shallow copy for
-	 * the schema (column types and names) but a deep copy for meta data 
+	 * the schema (column types and names) but a deep copy for meta data
 	 * and actual column data.
-	 * 
+	 *
 	 * @param that frame block
 	 */
 	public FrameBlock(FrameBlock that) {
@@ -104,7 +104,7 @@
 		copy(that);
 		setColumnMetadata(that.getColumnMetadata());
 	}
-	
+
 	public FrameBlock(int ncols, ValueType vt) {
 		this();
 		_schema = UtilFunctions.nCopies(ncols, vt);
@@ -113,21 +113,21 @@
 		for( int j=0; j<ncols; j++ )
 			_colmeta[j] = new ColumnMetadata(0);
 	}
-	
+
 	public FrameBlock(ValueType[] schema) {
 		this(schema, new String[0][]);
 	}
-	
+
 	public FrameBlock(ValueType[] schema, String[] names) {
 		this(schema, names, new String[0][]);
 	}
-	
+
 	public FrameBlock(ValueType[] schema, String[][] data) {
 		//default column names not materialized
 		this(schema, null, data);
 
 	}
-	
+
 	public FrameBlock(ValueType[] schema, String[] names, String[][] data) {
 		_numRows = 0; //maintained on append
 		_schema = schema;
@@ -138,10 +138,10 @@
 		for( int i=0; i<data.length; i++ )
 			appendRow(data[i]);
 	}
-	
+
 	/**
 	 * Get the number of rows of the frame block.
-	 * 
+	 *
 	 * @return number of rows
 	 */
 	@Override
@@ -152,30 +152,30 @@
 	public void setNumRows(int numRows) {
 		_numRows = numRows;
 	}
-	
+
 	/**
 	 * Get the number of columns of the frame block, that is
 	 * the number of columns defined in the schema.
-	 * 
+	 *
 	 * @return number of columns
 	 */
 	@Override
 	public int getNumColumns() {
 		return (_schema != null) ? _schema.length : 0;
 	}
-	
+
 	/**
 	 * Returns the schema of the frame block.
-	 * 
+	 *
 	 * @return schema as array of ValueTypes
 	 */
 	public ValueType[] getSchema() {
 		return _schema;
 	}
-	
+
 	/**
 	 * Sets the schema of the frame block.
-	 * 
+	 *
 	 * @param schema schema as array of ValueTypes
 	 */
 	public void setSchema(ValueType[] schema) {
@@ -183,26 +183,26 @@
 	}
 
 	/**
-	 * Returns the column names of the frame block. This method 
+	 * Returns the column names of the frame block. This method
 	 * allocates default column names if required.
-	 * 
+	 *
 	 * @return column names
 	 */
 	public String[] getColumnNames() {
 		return getColumnNames(true);
 	}
-	
-	
+
+
 	public FrameBlock getColumnNamesAsFrame() {
 		FrameBlock fb = new FrameBlock(getNumColumns(), ValueType.STRING);
 		fb.appendRow(getColumnNames());
 		return fb;
 	}
-	
+
 	/**
-	 * Returns the column names of the frame block. This method 
+	 * Returns the column names of the frame block. This method
 	 * allocates default column names if required.
-	 * 
+	 *
 	 * @param alloc if true, create column names
 	 * @return array of column names
 	 */
@@ -211,11 +211,11 @@
 			_colnames = createColNames(getNumColumns());
 		return _colnames;
 	}
-	
+
 	/**
-	 * Returns the column name for the requested column. This 
+	 * Returns the column name for the requested column. This
 	 * method allocates default column names if required.
-	 * 
+	 *
 	 * @param c column index
 	 * @return column name
 	 */
@@ -256,24 +256,24 @@
 	public void setColumnMetadata(int c, ColumnMetadata colmeta) {
 		_colmeta[c] = colmeta;
 	}
-	
+
 	/**
-	 * Creates a mapping from column names to column IDs, i.e., 
+	 * Creates a mapping from column names to column IDs, i.e.,
 	 * 1-based column indexes
-	 * 
+	 *
 	 * @return map of column name keys and id values
 	 */
 	public Map<String,Integer> getColumnNameIDMap() {
 		Map<String, Integer> ret = new HashMap<>();
 		for( int j=0; j<getNumColumns(); j++ )
 			ret.put(getColumnName(j), j+1);
-		return ret;	
+		return ret;
 	}
-	
+
 	/**
 	 * Allocate column data structures if necessary, i.e., if schema specified
 	 * but not all column data structures created yet.
-	 * 
+	 *
 	 * @param numRows number of rows
 	 */
 	public void ensureAllocatedColumns(int numRows) {
@@ -310,10 +310,10 @@
 		}
 		_numRows = numRows;
 	}
-	
+
 	/**
 	 * Checks for matching column sizes in case of existing columns.
-	 * 		
+	 *
 	 * @param newlen number of rows to compare with existing number of rows
 	 */
 	public void ensureColumnCompatibility(int newlen) {
@@ -340,11 +340,11 @@
 		boolean ret = (_colnames != null);
 		for( int j=0; j<getNumColumns() && ret; j++ )
 			ret &= isColNameDefault(j);
-		return ret;	
+		return ret;
 	}
 
 	public boolean isColNameDefault(int i) {
-		return _colnames==null 
+		return _colnames==null
 			|| _colnames[i].equals("C"+(i+1));
 	}
 
@@ -359,10 +359,10 @@
 
 	///////
 	// basic get and set functionality
-	
+
 	/**
 	 * Gets a boxed object of the value in position (r,c).
-	 * 
+	 *
 	 * @param r	row index, 0-based
 	 * @param c	column index, 0-based
 	 * @return object of the value at specified position
@@ -370,11 +370,11 @@
 	public Object get(int r, int c) {
 		return _coldata[c].get(r);
 	}
-	
+
 	/**
 	 * Sets the value in position (r,c), where the input is assumed
 	 * to be a boxed object consistent with the schema definition.
-	 * 
+	 *
 	 * @param r row index
 	 * @param c column index
 	 * @param val value to set at specified position
@@ -404,12 +404,12 @@
 	public void reset() {
 		reset(0, true);
 	}
-	
+
 
 	/**
 	 * Append a row to the end of the data frame, where all row fields
 	 * are boxed objects according to the schema.
-	 * 
+	 *
 	 * @param row array of objects
 	 */
 	public void appendRow(Object[] row) {
@@ -418,11 +418,11 @@
 			_coldata[j].append(row[j]);
 		_numRows++;
 	}
-	
+
 	/**
 	 * Append a row to the end of the data frame, where all row fields
 	 * are string encoded.
-	 * 
+	 *
 	 * @param row array of strings
 	 */
 	public void appendRow(String[] row) {
@@ -431,12 +431,12 @@
 			_coldata[j].append(row[j]);
 		_numRows++;
 	}
-	
+
 	/**
-	 * Append a column of value type STRING as the last column of 
-	 * the data frame. The given array is wrapped but not copied 
+	 * Append a column of value type STRING as the last column of
+	 * the data frame. The given array is wrapped but not copied
 	 * and hence might be updated in the future.
-	 * 
+	 *
 	 * @param col array of strings
 	 */
 	public void appendColumn(String[] col) {
@@ -449,12 +449,12 @@
 		_numRows = col.length;
 		_msize = -1;
 	}
-	
+
 	/**
-	 * Append a column of value type BOOLEAN as the last column of 
-	 * the data frame. The given array is wrapped but not copied 
+	 * Append a column of value type BOOLEAN as the last column of
+	 * the data frame. The given array is wrapped but not copied
 	 * and hence might be updated in the future.
-	 * 
+	 *
 	 * @param col array of booleans
 	 */
 	public void appendColumn(boolean[] col) {
@@ -463,16 +463,16 @@
 		_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.BOOLEAN);
 		_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
 		_coldata = (_coldata==null) ? new Array[]{new BooleanArray(col)} :
-			(Array[]) ArrayUtils.add(_coldata, new BooleanArray(col));	
+			(Array[]) ArrayUtils.add(_coldata, new BooleanArray(col));
 		_numRows = col.length;
 		_msize = -1;
 	}
-	
+
 	/**
-	 * Append a column of value type INT as the last column of 
-	 * the data frame. The given array is wrapped but not copied 
+	 * Append a column of value type INT as the last column of
+	 * the data frame. The given array is wrapped but not copied
 	 * and hence might be updated in the future.
-	 * 
+	 *
 	 * @param col array of longs
 	 */
 	public void appendColumn(int[] col) {
@@ -502,12 +502,12 @@
 		_numRows = col.length;
 		_msize = -1;
 	}
-	
+
 	/**
 	 * Append a column of value type float as the last column of
 	 * the data frame. The given array is wrapped but not copied
 	 * and hence might be updated in the future.
-	 * 
+	 *
 	 * @param col array of doubles
 	 */
 	public void appendColumn(float[] col) {
@@ -537,12 +537,12 @@
 		_numRows = col.length;
 		_msize = -1;
 	}
-	
+
 	/**
 	 * Append a set of column of value type DOUBLE at the end of the frame
-	 * in order to avoid repeated allocation with appendColumns. The given 
+	 * in order to avoid repeated allocation with appendColumns. The given
 	 * array is wrapped but not copied and hence might be updated in the future.
-	 * 
+	 *
 	 * @param cols 2d array of doubles
 	 */
 	public void appendColumns(double[][] cols) {
@@ -552,9 +552,9 @@
 		Array[] tmpData = new Array[ncol];
 		for( int j=0; j<ncol; j++ )
 			tmpData[j] = new DoubleArray(cols[j]);
-		_colnames = empty ? null : (String[]) ArrayUtils.addAll(getColumnNames(), 
+		_colnames = empty ? null : (String[]) ArrayUtils.addAll(getColumnNames(),
 			createColNames(getNumColumns(), ncol)); //before schema modification
-		_schema = empty ? tmpSchema : (ValueType[]) ArrayUtils.addAll(_schema, tmpSchema); 
+		_schema = empty ? tmpSchema : (ValueType[]) ArrayUtils.addAll(_schema, tmpSchema);
 		_coldata = empty ? tmpData : (Array[]) ArrayUtils.addAll(_coldata, tmpData);
 		_numRows = cols[0].length;
 		_msize = -1;
@@ -562,50 +562,50 @@
 
 	public Object getColumnData(int c) {
 		switch(_schema[c]) {
-			case STRING:  return ((StringArray)_coldata[c])._data; 
+			case STRING:  return ((StringArray)_coldata[c])._data;
 			case BOOLEAN: return ((BooleanArray)_coldata[c])._data;
 			case INT64:     return ((LongArray)_coldata[c])._data;
 			case FP64:  return ((DoubleArray)_coldata[c])._data;
 			default:      return null;
 	 	}
 	}
-	
+
 	public Array getColumn(int c) {
-		return _coldata[c]; 
+		return _coldata[c];
 	}
-	
+
 	public void setColumn(int c, Array column) {
 		if( _coldata == null )
 			_coldata = new Array[getNumColumns()];
 		_coldata[c] = column;
 		_msize = -1;
 	}
-	
+
 	/**
 	 * Get a row iterator over the frame where all fields are encoded
 	 * as strings independent of their value types.
-	 * 
+	 *
 	 * @return string array iterator
 	 */
 	public Iterator<String[]> getStringRowIterator() {
 		return new StringRowIterator(0, _numRows);
 	}
-	
+
 	/**
-	 * Get a row iterator over the frame where all selected fields are 
-	 * encoded as strings independent of their value types.  
-	 * 
+	 * Get a row iterator over the frame where all selected fields are
+	 * encoded as strings independent of their value types.
+	 *
 	 * @param cols column selection, 1-based
 	 * @return string array iterator
 	 */
 	public Iterator<String[]> getStringRowIterator(int[] cols) {
 		return new StringRowIterator(0, _numRows, cols);
 	}
-	
+
 	/**
 	 * Get a row iterator over the frame where all fields are encoded
-	 * as strings independent of their value types.  
-	 * 
+	 * as strings independent of their value types.
+	 *
 	 * @param rl lower row index
 	 * @param ru upper row index
 	 * @return string array iterator
@@ -613,11 +613,11 @@
 	public Iterator<String[]> getStringRowIterator(int rl, int ru) {
 		return new StringRowIterator(rl, ru);
 	}
-	
+
 	/**
-	 * Get a row iterator over the frame where all selected fields are 
-	 * encoded as strings independent of their value types.  
-	 * 
+	 * Get a row iterator over the frame where all selected fields are
+	 * encoded as strings independent of their value types.
+	 *
 	 * @param rl lower row index
 	 * @param ru upper row index
 	 * @param cols column selection, 1-based
@@ -626,22 +626,22 @@
 	public Iterator<String[]> getStringRowIterator(int rl, int ru, int[] cols) {
 		return new StringRowIterator(rl, ru, cols);
 	}
-	
+
 	/**
 	 * Get a row iterator over the frame where all fields are encoded
-	 * as boxed objects according to their value types.  
-	 * 
+	 * as boxed objects according to their value types.
+	 *
 	 * @return object array iterator
 	 */
 	public Iterator<Object[]> getObjectRowIterator() {
 		return new ObjectRowIterator(0, _numRows);
 	}
-	
+
 	/**
 	 * Get a row iterator over the frame where all fields are encoded
 	 * as boxed objects according to the value types of the provided
 	 * target schema.
-	 * 
+	 *
 	 * @param schema target schema of objects
 	 * @return object array iterator
 	 */
@@ -650,22 +650,22 @@
 		iter.setSchema(schema);
 		return iter;
 	}
-	
+
 	/**
-	 * Get a row iterator over the frame where all selected fields are 
-	 * encoded as boxed objects according to their value types.  
-	 * 
+	 * Get a row iterator over the frame where all selected fields are
+	 * encoded as boxed objects according to their value types.
+	 *
 	 * @param cols column selection, 1-based
 	 * @return object array iterator
 	 */
 	public Iterator<Object[]> getObjectRowIterator(int[] cols) {
 		return new ObjectRowIterator(0, _numRows, cols);
 	}
-	
+
 	/**
 	 * Get a row iterator over the frame where all fields are encoded
-	 * as boxed objects according to their value types.  
-	 * 
+	 * as boxed objects according to their value types.
+	 *
 	 * @param rl lower row index
 	 * @param ru upper row index
 	 * @return object array iterator
@@ -673,11 +673,11 @@
 	public Iterator<Object[]> getObjectRowIterator(int rl, int ru) {
 		return new ObjectRowIterator(rl, ru);
 	}
-	
+
 	/**
-	 * Get a row iterator over the frame where all selected fields are 
-	 * encoded as boxed objects according to their value types.  
-	 * 
+	 * Get a row iterator over the frame where all selected fields are
+	 * encoded as boxed objects according to their value types.
+	 *
 	 * @param rl lower row index
 	 * @param ru upper row index
 	 * @param cols column selection, 1-based
@@ -692,7 +692,7 @@
 	// FIXME for FrameBlock fix write and readFields, it does not work if the Arrays are not yet
 	// allocated (after fixing remove hack in FederatedWorkerHandler.createFrameEncodeMeta(FederatedRequest) call to
 	// FrameBlock.ensureAllocatedColumns())
-	
+
 	@Override
 	public void write(DataOutput out) throws IOException {
 		boolean isDefaultMeta = isColNamesDefault()
@@ -707,7 +707,7 @@
 			if( !isDefaultMeta ) {
 				out.writeUTF(getColumnName(j));
 				out.writeLong(_colmeta[j].getNumDistinct());
-				out.writeUTF( (_colmeta[j].getMvValue()!=null) ? 
+				out.writeUTF( (_colmeta[j].getMvValue()!=null) ?
 						_colmeta[j].getMvValue() : "" );
 			}
 			_coldata[j].write(out);
@@ -721,13 +721,13 @@
 		int numCols = in.readInt();
 		boolean isDefaultMeta = in.readBoolean();
 		//allocate schema/meta data arrays
-		_schema = (_schema!=null && _schema.length==numCols) ? 
+		_schema = (_schema!=null && _schema.length==numCols) ?
 				_schema : new ValueType[numCols];
-		_colnames = (_colnames != null && _colnames.length==numCols) ? 
+		_colnames = (_colnames != null && _colnames.length==numCols) ?
 				_colnames : new String[numCols];
-		_colmeta = (_colmeta != null && _colmeta.length==numCols) ? 
+		_colmeta = (_colmeta != null && _colmeta.length==numCols) ?
 				_colmeta : new ColumnMetadata[numCols];
-		_coldata = (_coldata!=null && _coldata.length==numCols) ? 
+		_coldata = (_coldata!=null && _coldata.length==numCols) ?
 				_coldata : new Array[numCols];
 		//read columns (value type, meta, data)
 		for( int j=0; j<numCols; j++ ) {
@@ -748,7 +748,7 @@
 			arr.readFields(in);
 			_schema[j] = vt;
 			_colnames[j] = name;
-			_colmeta[j] = new ColumnMetadata(ndistinct, 
+			_colmeta[j] = new ColumnMetadata(ndistinct,
 					(mvvalue==null || mvvalue.isEmpty()) ? null : mvvalue);
 			_coldata[j] = arr;
 		}
@@ -766,35 +766,35 @@
 		//redirect deserialization to writable impl
 		readFields(in);
 	}
-	
+
 	////////
 	// CacheBlock implementation
-	
+
 	@Override
 	public long getInMemorySize() {
 		//reuse previously computed size
 		if( _msize > 0 )
 			return _msize;
-		
+
 		//frame block header
 		long size = 16 + 4; //object, num rows
-		
+
 		//schema array (overhead and int entries)
 		int clen = getNumColumns();
 		size += 8 + 32 + clen * 4;
-		
+
 		//colname array (overhead and string entries)
 		size += 8 + ((_colnames!=null) ? 32 : 0);
 		for( int j=0; j<clen && _colnames!=null; j++ )
 			size += getInMemoryStringSize(getColumnName(j));
-		
+
 		//meta data array (overhead and entries)
 		size += 8 + 32;
 		for( int j=0; j<clen; j++ ) {
-			size += 16 + 8 + 8 //object, long num distinct, ref mv 
+			size += 16 + 8 + 8 //object, long num distinct, ref mv
 				+ getInMemoryStringSize(_colmeta[j].getMvValue());
 		}
-		
+
 		//data array (overhead and entries)
 		size += 8 + 32 + clen * (16+4+8+32);
 		for( int j=0; j<clen; j++ ) {
@@ -802,7 +802,7 @@
 				case BOOLEAN: size += _numRows; break;
 				case INT64:
 				case FP64: size += 8*_numRows; break;
-				case STRING: 
+				case STRING:
 					StringArray arr = (StringArray)_coldata[j];
 					for( int i=0; i<_numRows; i++ )
 						size += getInMemoryStringSize(arr.get(i));
@@ -810,15 +810,15 @@
 				default: //not applicable
 			}
 		}
-		
+
 		return _msize = size;
 	}
-	
+
 	@Override
 	public long getExactSerializedSize() {
 		//header: 2xint, boolean
 		long size = 9;
-		
+
 		//column sizes
 		boolean isDefaultMeta = isColNamesDefault()
 				&& isColumnMetadataDefault();
@@ -833,7 +833,7 @@
 				case BOOLEAN: size += _numRows; break;
 				case INT64:
 				case FP64: size += 8*_numRows; break;
-				case STRING: 
+				case STRING:
 					StringArray arr = (StringArray)_coldata[j];
 					for( int i=0; i<_numRows; i++ )
 						size += IOUtilFunctions.getUTFSize(arr.get(i));
@@ -841,15 +841,15 @@
 				default: //not applicable
 			}
 		}
-		
+
 		return size;
 	}
-	
+
 	@Override
 	public boolean isShallowSerialize() {
 		return isShallowSerialize(false);
 	}
-	
+
 	@Override
 	public boolean isShallowSerialize(boolean inclConvert) {
 		//shallow serialize if non-string schema because a frame block
@@ -859,20 +859,20 @@
 			ret &= (_schema[j] != ValueType.STRING);
 		return ret;
 	}
-	
-	@Override 
+
+	@Override
 	public void toShallowSerializeBlock() {
 		//do nothing (not applicable).
 	}
-	
+
 	@Override
 	public void compactEmptyBlock() {
 		//do nothing
 	}
-	
+
 	/**
-	 * Returns the in-memory size in bytes of the given string value. 
-	 * 
+	 * Returns the in-memory size in bytes of the given string value.
+	 *
 	 * @param value string value
 	 * @return in-memory size of string value
 	 */
@@ -880,9 +880,9 @@
 		if( value == null )
 			return 0;
 		return 16 + 4 + 8 //object, hash, array ref
-			+ 32 + value.length();     //char array 
+			+ 32 + value.length();     //char array
 	}
-	
+
 	/**
 	 *  This method performs the value comparison on two frames
 	 *  if the values in both frames are equal, not equal, less than, greater than, less than/greater than and equal to
@@ -947,7 +947,7 @@
 
 		return new FrameBlock(UtilFunctions.nCopies(this.getNumColumns(), ValueType.BOOLEAN), outputData);
 	}
-	
+
 	private static boolean checkAndSetEmpty(FrameBlock fb1, FrameBlock fb2, String[][] out, int r, int c) {
 		if(fb1.get(r, c) == null || fb2.get(r, c) == null) {
 			out[r][c] = (fb1.get(r, c) == null && fb2.get(r, c) == null) ? "true" : "false";
@@ -955,13 +955,13 @@
 		}
 		return false;
 	}
-	
+
 	///////
 	// indexing and append operations
-	
+
 	public FrameBlock leftIndexingOperations(FrameBlock rhsFrame, IndexRange ixrange, FrameBlock ret) {
-		return leftIndexingOperations(rhsFrame, 
-				(int)ixrange.rowStart, (int)ixrange.rowEnd, 
+		return leftIndexingOperations(rhsFrame,
+				(int)ixrange.rowStart, (int)ixrange.rowEnd,
 				(int)ixrange.colStart, (int)ixrange.colEnd, ret);
 	}
 
@@ -971,7 +971,7 @@
 			|| cl < 0 || cu >= getNumColumns() || cu < cl || cu >= getNumColumns() ) {
 			throw new DMLRuntimeException("Invalid values for frame indexing: ["+(rl+1)+":"+(ru+1)+"," + (cl+1)+":"+(cu+1)+"] " +
 							"must be within frame dimensions ["+getNumRows()+","+getNumColumns()+"].");
-		}		
+		}
 
 		if ( (ru-rl+1) < rhsFrame.getNumRows() || (cu-cl+1) < rhsFrame.getNumColumns()) {
 			throw new DMLRuntimeException("Invalid values for frame indexing: " +
@@ -979,8 +979,8 @@
 					"do not match the shape of the frame specified by indices [" +
 					(rl+1) +":" + (ru+1) + ", " + (cl+1) + ":" + (cu+1) + "].");
 		}
-		
-		
+
+
 		//allocate output frame (incl deep copy schema)
 		if( ret == null )
 			ret = new FrameBlock();
@@ -989,7 +989,7 @@
 		ret._colnames = (_colnames != null) ? _colnames.clone() : null;
 		ret._colmeta = _colmeta.clone();
 		ret._coldata = new Array[getNumColumns()];
-		
+
 		//copy data to output and partial overwrite w/ rhs
 		for( int j=0; j<getNumColumns(); j++ ) {
 			Array tmp = _coldata[j].clone();
@@ -1006,7 +1006,7 @@
 			}
 			ret._coldata[j] = tmp;
 		}
-		
+
 		return ret;
 	}
 
@@ -1015,11 +1015,11 @@
 				(int)ixrange.rowStart, (int)ixrange.rowEnd,
 				(int)ixrange.colStart, (int)ixrange.colEnd, ret);
 	}
-	
+
 	/**
-	 * Right indexing operations to slice a subframe out of this frame block. 
+	 * Right indexing operations to slice a subframe out of this frame block.
 	 * Note that the existing column value types are preserved.
-	 * 
+	 *
 	 * @param rl row lower index, inclusive, 0-based
 	 * @param ru row upper index, inclusive, 0-based
 	 * @param cl column lower index, inclusive, 0-based
@@ -1036,34 +1036,34 @@
 			throw new DMLRuntimeException("Invalid values for frame indexing: ["+(rl+1)+":"+(ru+1)+"," + (cl+1)+":"+(cu+1)+"] " +
 							"must be within frame dimensions ["+getNumRows()+","+getNumColumns()+"]");
 		}
-		
+
 		//allocate output frame
 		if( ret == null )
 			ret = new FrameBlock();
 		else
 			ret.reset(ru-rl+1, true);
-		
+
 		//copy output schema and colnames
 		int numCols = cu-cl+1;
 		boolean isDefNames = isColNamesDefault();
 		ret._schema = new ValueType[numCols];
 		ret._colnames = !isDefNames ? new String[numCols] : null;
 		ret._colmeta = new ColumnMetadata[numCols];
-		
+
 		for( int j=cl; j<=cu; j++ ) {
 			ret._schema[j-cl] = _schema[j];
 			ret._colmeta[j-cl] = _colmeta[j];
 			if( !isDefNames )
 				ret._colnames[j-cl] = getColumnName(j);
-		}	
+		}
 		ret._numRows = ru-rl+1;
 		if(ret._coldata == null )
 			ret._coldata = new Array[numCols];
-		
-		//fast-path: shallow copy column indexing 
+
+		//fast-path: shallow copy column indexing
 		if( ret._numRows == _numRows ) {
 			//this shallow copy does not only avoid an array copy, but
-			//also allows for bi-directional reuses of recodemaps 
+			//also allows for bi-directional reuses of recodemaps
 			for( int j=cl; j<=cu; j++ )
 				ret._coldata[j-cl] = _coldata[j];
 		}
@@ -1078,23 +1078,23 @@
 		}
 		return ret;
 	}
-	
-	
+
+
 	public void slice(ArrayList<Pair<Long,FrameBlock>> outlist, IndexRange range, int rowCut)
 	{
 		FrameBlock top=null, bottom=null;
 		Iterator<Pair<Long,FrameBlock>> p=outlist.iterator();
-		
+
 		if(range.rowStart<rowCut)
 			top = p.next().getValue();
-		
+
 		if(range.rowEnd>=rowCut)
 			bottom = p.next().getValue();
-		
+
 		if(getNumRows() > 0)
 		{
 			int r=(int) range.rowStart;
-			
+
 			for(; r<Math.min(rowCut, range.rowEnd+1); r++)
 			{
 				Object[] row = new Object[(int) (range.colEnd-range.colStart+1)];
@@ -1114,11 +1114,11 @@
 	}
 
 	/**
-	 * Appends the given argument frameblock 'that' to this frameblock by 
+	 * Appends the given argument frameblock 'that' to this frameblock by
 	 * creating a deep copy to prevent side effects. For cbind, the frames
-	 * are appended column-wise (same number of rows), while for rbind the 
-	 * frames are appended row-wise (same number of columns).   
-	 * 
+	 * are appended column-wise (same number of rows), while for rbind the
+	 * frames are appended row-wise (same number of columns).
+	 *
 	 * @param that frame block to append to current frame block
 	 * @param ret frame block to return, can be null
 	 * @param cbind if true, column append
@@ -1132,21 +1132,21 @@
 				throw new DMLRuntimeException("Incompatible number of rows for cbind: "+
 						that.getNumRows()+" (expected: "+getNumRows()+")");
 			}
-			
+
 			//allocate output frame
 			if( ret == null )
 				ret = new FrameBlock();
 			ret._numRows = _numRows;
-			
+
 			//concatenate schemas (w/ deep copy to prevent side effects)
 			ret._schema = (ValueType[]) ArrayUtils.addAll(_schema, that._schema);
 			ret._colnames = (String[]) ArrayUtils.addAll(getColumnNames(), that.getColumnNames());
 			ret._colmeta = (ColumnMetadata[]) ArrayUtils.addAll(_colmeta, that._colmeta);
-			
+
 			//check and enforce unique columns names
 			if( !Arrays.stream(ret._colnames).allMatch(new HashSet<>()::add) )
 				ret._colnames = createColNames(ret.getNumColumns());
-			
+
 			//concatenate column data (w/ shallow copy which is safe due to copy on write semantics)
 			ret._coldata = (Array[]) ArrayUtils.addAll(_coldata, that._coldata);
 		}
@@ -1157,7 +1157,7 @@
 				throw new DMLRuntimeException("Incompatible number of columns for rbind: "+
 						that.getNumColumns()+" (expected: "+getNumColumns()+")");
 			}
-			
+
 			//allocate output frame (incl deep copy schema)
 			if( ret == null )
 				ret = new FrameBlock();
@@ -1167,7 +1167,7 @@
 			ret._colmeta = new ColumnMetadata[getNumColumns()];
 			for( int j=0; j<_schema.length; j++ )
 				ret._colmeta[j] = new ColumnMetadata(0);
-			
+
 			//concatenate data (deep copy first, append second)
 			ret._coldata = new Array[getNumColumns()];
 			for( int j=0; j<getNumColumns(); j++ )
@@ -1183,32 +1183,32 @@
 		copy(0, src.getNumRows()-1, 0, src.getNumColumns()-1, src);
 	}
 
-	public void copy(int rl, int ru, int cl, int cu, FrameBlock src) 
+	public void copy(int rl, int ru, int cl, int cu, FrameBlock src)
 	{
 		//allocate columns if necessary
 		ensureAllocatedColumns(ru-rl+1);
-		
+
 		//copy values
 		for( int j=cl; j<=cu; j++ ) {
-			//special case: column memcopy 
+			//special case: column memcopy
 			if( _schema[j].equals(src._schema[j-cl]) )
 				_coldata[j].set(rl, ru, src._coldata[j-cl]);
 			//general case w/ schema transformation
-			else 
+			else
 				for( int i=rl; i<=ru; i++ ) {
 					String tmp = src.get(i-rl, j-cl)!=null ? src.get(i-rl, j-cl).toString() : null;
 					set(i, j, UtilFunctions.stringToObject(_schema[j], tmp));
 				}
 		}
 	}
-	
-	
+
+
 	///////
 	// transform specific functionality
-	
+
 	/**
-	 * This function will split every Recode map in the column using delimiter Lop.DATATYPE_PREFIX, 
-	 * as Recode map generated earlier in the form of Code+Lop.DATATYPE_PREFIX+Token and store it in a map 
+	 * This function will split every Recode map in the column using delimiter Lop.DATATYPE_PREFIX,
+	 * as Recode map generated earlier in the form of Code+Lop.DATATYPE_PREFIX+Token and store it in a map
 	 * which contains token and code for every unique tokens.
 	 *
 	 * @param col	is the column # from frame data which contains Recode map generated earlier.
@@ -1221,10 +1221,10 @@
 			HashMap<String,Long> map = (tmp!=null) ? tmp.get() : null;
 			if( map != null ) return map;
 		}
-		
+
 		//construct recode map
 		HashMap<String,Long> map = new HashMap<>();
-		Array ldata = _coldata[col]; 
+		Array ldata = _coldata[col];
 		for( int i=0; i<getNumRows(); i++ ) {
 			Object val = ldata.get(i);
 			if( val != null ) {
@@ -1232,11 +1232,11 @@
 				map.put(tmp[0], Long.parseLong(tmp[1]));
 			}
 		}
-		
+
 		//put created map into cache
 		if( REUSE_RECODE_MAPS )
 			_coldata[col]._rcdMapCache = new SoftReference<>(map);
-		
+
 		return map;
 	}
 
@@ -1249,22 +1249,22 @@
 		//check for empty input source (nothing to merge)
 		if( that == null || that.getNumRows() == 0 )
 			return;
-		
-		//check dimensions (before potentially copy to prevent implicit dimension change) 
+
+		//check dimensions (before potentially copy to prevent implicit dimension change)
 		if ( getNumRows() != that.getNumRows() || getNumColumns() != that.getNumColumns() )
 			throw new DMLRuntimeException("Dimension mismatch on merge disjoint (target="+getNumRows()+"x"+getNumColumns()+", source="+that.getNumRows()+"x"+that.getNumColumns()+")");
-		
+
 		//meta data copy if necessary
 		for( int j=0; j<getNumColumns(); j++ )
 			if( !that.isColumnMetadataDefault(j) ) {
 				_colmeta[j].setNumDistinct(that._colmeta[j].getNumDistinct());
 				_colmeta[j].setMvValue(that._colmeta[j].getMvValue());
 			}
-		
+
 		//core frame block merge through cell copy
 		//with column-wide access pattern
 		for( int j=0; j<getNumColumns(); j++ ) {
-			//special case: copy non-zeros of column 
+			//special case: copy non-zeros of column
 			if( _schema[j].equals(that._schema[j]) )
 				_coldata[j].setNz(0, _numRows-1, that._coldata[j]);
 			//general case w/ schema transformation
@@ -1278,10 +1278,10 @@
 			}
 		}
 	}
-	
+
 	/**
 	 * This function ZERO OUT the data in the slicing window applicable for this block.
-	 * 
+	 *
 	 * @param result frame block
 	 * @param range index range
 	 * @param complementary ?
@@ -1293,16 +1293,16 @@
 	 */
 	public FrameBlock zeroOutOperations(FrameBlock result, IndexRange range, boolean complementary, int iRowStartSrc, int iRowStartDest, int blen, int iMaxRowsToCopy) {
 		int clen = getNumColumns();
-		
+
 		if(result==null)
 			result=new FrameBlock(getSchema());
-		else 
+		else
 		{
 			result.reset(0, true);
 			result.setSchema(getSchema());
 		}
 		result.ensureAllocatedColumns(blen);
-		
+
 		if(complementary)
 		{
 			for(int r=(int) range.rowStart; r<=range.rowEnd&&r+iRowStartDest<blen; r++)
@@ -1316,7 +1316,7 @@
 			for(; r<(int)range.rowStart && r-iRowStartDest<iMaxRowsToCopy ; r++)
 				for(int c=0; c<clen; c++/*, offset++*/)
 					result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
-			
+
 			for(; r<=(int)range.rowEnd && r-iRowStartDest<iMaxRowsToCopy ; r++)
 			{
 				for(int c=0; c<(int)range.colStart; c++)
@@ -1325,12 +1325,12 @@
 				for(int c=(int)range.colEnd+1; c<clen; c++)
 					result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
 			}
-			
+
 			for(; r-iRowStartDest<iMaxRowsToCopy ; r++)
 				for(int c=0; c<clen; c++)
 					result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
 		}
-		
+
 		return result;
 	}
 
@@ -1341,7 +1341,7 @@
 			.map(vt -> vt.toString()).toArray(String[]::new));
 		return fb;
 	}
-	
+
 	///////
 	// row iterators (over strings and boxed objects)
 
@@ -1350,18 +1350,18 @@
 		protected final T[] _curRow;
 		protected final int _maxPos;
 		protected int _curPos = -1;
-		
+
 		protected RowIterator(int rl, int ru) {
 			this(rl, ru, UtilFunctions.getSeqArray(1, getNumColumns(), 1));
 		}
-		
+
 		protected RowIterator(int rl, int ru, int[] cols) {
 			_curRow = createRow(cols.length);
 			_cols = cols;
 			_maxPos = ru;
 			_curPos = rl;
 		}
-		
+
 		@Override
 		public boolean hasNext() {
 			return (_curPos < _maxPos);
@@ -1369,9 +1369,9 @@
 
 		@Override
 		public void remove() {
-			throw new RuntimeException("RowIterator.remove is unsupported!");			
+			throw new RuntimeException("RowIterator.remove is unsupported!");
 		}
-		
+
 		protected abstract T[] createRow(int size);
 	}
 
@@ -1379,16 +1379,16 @@
 		public StringRowIterator(int rl, int ru) {
 			super(rl, ru);
 		}
-		
+
 		public StringRowIterator(int rl, int ru, int[] cols) {
 			super(rl, ru, cols);
 		}
-		
+
 		@Override
 		protected String[] createRow(int size) {
 			return new String[size];
 		}
-		
+
 		@Override
 		public String[] next( ) {
 			for( int j=0; j<_cols.length; j++ ) {
@@ -1402,24 +1402,24 @@
 
 	private class ObjectRowIterator extends RowIterator<Object> {
 		private ValueType[] _tgtSchema = null;
-		
+
 		public ObjectRowIterator(int rl, int ru) {
 			super(rl, ru);
 		}
-		
+
 		public ObjectRowIterator(int rl, int ru, int[] cols) {
 			super(rl, ru, cols);
 		}
-		
+
 		public void setSchema(ValueType[] schema) {
 			_tgtSchema = schema;
 		}
-		
+
 		@Override
 		protected Object[] createRow(int size) {
 			return new Object[size];
 		}
-		
+
 		@Override
 		public Object[] next( ) {
 			for( int j=0; j<_cols.length; j++ )
@@ -1427,7 +1427,7 @@
 			_curPos++;
 			return _curRow;
 		}
-		
+
 		private Object getValue(int i, int j) {
 			Object val = get(i, j);
 			if( _tgtSchema != null )
@@ -1435,21 +1435,21 @@
 			return val;
 		}
 	}
-	
+
 	///////
-	// generic, resizable native arrays 
-	
+	// generic, resizable native arrays
+
 	/**
-	 * Base class for generic, resizable array of various value types. We 
-	 * use this custom class hierarchy instead of Trove or other libraries 
+	 * Base class for generic, resizable array of various value types. We
+	 * use this custom class hierarchy instead of Trove or other libraries
 	 * in order to avoid unnecessary dependencies.
 	 */
 	private abstract static class Array<T> implements Writable {
 		protected SoftReference<HashMap<String,Long>> _rcdMapCache = null;
-		
+
 		protected int _size = 0;
 		protected int newSize() {
-			return Math.max(_size*2, 4); 
+			return Math.max(_size*2, 4);
 		}
 		public abstract T get(int index);
 		public abstract void set(int index, T value);
@@ -1461,12 +1461,12 @@
 		@Override
 		public abstract Array clone();
 		public abstract Array slice(int rl, int ru);
-		public abstract void reset(int size); 
+		public abstract void reset(int size);
 	}
 
 	private static class StringArray extends Array<String> {
 		private String[] _data = null;
-		
+
 		public StringArray(String[] data) {
 			_data = data;
 			_size = _data.length;
@@ -1531,7 +1531,7 @@
 
 	private static class BooleanArray extends Array<Boolean> {
 		private boolean[] _data = null;
-		
+
 		public BooleanArray(boolean[] data) {
 			_data = data;
 			_size = _data.length;
@@ -1598,7 +1598,7 @@
 
 	private static class LongArray extends Array<Long> {
 		private long[] _data = null;
-		
+
 		public LongArray(long[] data) {
 			_data = data;
 			_size = _data.length;
@@ -1798,7 +1798,7 @@
 
 	private static class DoubleArray extends Array<Double> {
 		private double[] _data = null;
-		
+
 		public DoubleArray(double[] data) {
 			_data = data;
 			_size = _data.length;
@@ -1865,10 +1865,10 @@
 
 	public static class ColumnMetadata implements Serializable {
 		private static final long serialVersionUID = -90094082422100311L;
-		
+
 		private long _ndistinct = 0;
 		private String _mvValue = null;
-		
+
 		public ColumnMetadata(long ndistinct) {
 			_ndistinct = ndistinct;
 		}
@@ -1880,10 +1880,10 @@
 			_ndistinct = that._ndistinct;
 			_mvValue = that._mvValue;
 		}
-		
+
 		public long getNumDistinct() {
 			return _ndistinct;
-		}		
+		}
 		public void setNumDistinct(long ndistinct) {
 			_ndistinct = ndistinct;
 		}
@@ -2042,7 +2042,7 @@
 	 *  This method validates the frame data against an attribute length constrain
 	 *  if data value in any cell is greater than the specified threshold of that attribute
 	 *  the output frame will store a null on that cell position, thus removing the length-violating values.
-	 * 
+	 *
 	 *  @param feaLen vector of valid lengths
 	 *  @return FrameBlock with invalid values converted into missing values (null)
 	 */
@@ -2079,7 +2079,7 @@
 				+ "mismatch: "+rowTemp1.length+" vs "+rowTemp2.length);
 
 		for(int i=0; i< rowTemp1.length; i++ ) {
-			//modify schema1 if necessary (different schema2) 
+			//modify schema1 if necessary (different schema2)
 			if(!rowTemp1[i].equals(rowTemp2[i])) {
 				if(rowTemp1[i].equals("STRING") || rowTemp2[i].equals("STRING"))
 					rowTemp1[i] = "STRING";
@@ -2101,7 +2101,7 @@
 		return mergedFrame;
 	}
 
-	public FrameBlock map(String lambdaExpr) {
+	public FrameBlock map (String lambdaExpr){
 		if(!lambdaExpr.contains("->")) {
 			String args = lambdaExpr.substring(lambdaExpr.indexOf('(') + 1, lambdaExpr.indexOf(')'));
 			if(args.contains(",")) {
@@ -2109,86 +2109,73 @@
 				return DMVUtils.syntacticalPatternDiscovery(this, Double.parseDouble(arguments[0]), arguments[1]);
 			}
 		}
+		if(lambdaExpr.contains("jaccardSim"))
+			return mapDist(getCompiledFunction(lambdaExpr));
 		return map(getCompiledFunction(lambdaExpr));
 	}
 
-	public FrameBlock map(FrameBlockMapFunction lambdaExpression) {
-		return lambdaExpression.apply();
-	}
-	
-	public FrameBlock map(FrameMapFunction lambdaExpr) {
+	public FrameBlock map (FrameMapFunction lambdaExpr) {
 		// Prepare temporary output array
 		String[][] output = new String[getNumRows()][getNumColumns()];
-
 		// Execute map function on all cells
-		for(int j=0; j<getNumColumns(); j++) {
+		for(int j = 0; j < getNumColumns(); j++) {
 			Array input = getColumn(j);
-			for (int i = 0; i < input._size; i++)
+			for(int i = 0; i < input._size; i++)
 				if(input.get(i) != null)
 					output[i][j] = lambdaExpr.apply(String.valueOf(input.get(i)));
 		}
 
-		return  new FrameBlock(UtilFunctions.nCopies(getNumColumns(), ValueType.STRING), output);
+		return new FrameBlock(UtilFunctions.nCopies(getNumColumns(), ValueType.STRING), output);
 	}
 
-	public static FrameMapFunction getCompiledFunction(String lambdaExpr) {
-		String cname = "StringProcessing"+CLASS_ID.getNextID();
+	public FrameBlock mapDist (FrameMapFunction lambdaExpr) {
+		String[][] output = new String[getNumRows()][getNumRows()];
+		for(String[] row : output)
+			Arrays.fill(row, "0.0");
+		Array input = getColumn(0);
+		for(int j = 0; j < input._size - 1; j++) {
+			for(int i = j + 1; i < input._size; i++)
+				if(input.get(i) != null && input.get(j) != null) {
+					output[j][i] = lambdaExpr.apply(String.valueOf(input.get(j)), String.valueOf(input.get(i)));
+					//					output[i][j] = output[j][i];
+				}
+		}
+		return new FrameBlock(UtilFunctions.nCopies(getNumRows(), ValueType.STRING), output);
+	}
+
+	public static FrameMapFunction getCompiledFunction (String lambdaExpr) {
+		String cname = "StringProcessing" + CLASS_ID.getNextID();
 		StringBuilder sb = new StringBuilder();
 		String[] parts = lambdaExpr.split("->");
-		
-		if( parts.length != 2 )
-			throw new DMLRuntimeException("Unsupported lambda expression: "+lambdaExpr);
-		
-		String varname = parts[0].trim();
+		if(parts.length != 2)
+			throw new DMLRuntimeException("Unsupported lambda expression: " + lambdaExpr);
+		String[] varname = parts[0].replaceAll("[()]", "").split(",");
 		String expr = parts[1].trim();
 
 		// construct class code
 		sb.append("import org.apache.sysds.runtime.util.UtilFunctions;\n");
 		sb.append("import org.apache.sysds.runtime.matrix.data.FrameBlock.FrameMapFunction;\n");
-		sb.append("public class "+cname+" extends FrameMapFunction {\n");
-		sb.append("@Override\n");
-		sb.append("public String apply(String "+varname+") {\n");
-		sb.append("  return String.valueOf("+expr+"); }}\n");
-
+		sb.append("public class " + cname + " extends FrameMapFunction {\n");
+		if(varname.length == 1) {
+			sb.append("public String apply(String " + varname[0].trim() + ") {\n");
+			sb.append("  return String.valueOf(" + expr + "); }}\n");
+		}
+		else if(varname.length == 2) {
+			sb.append("public String apply(String " + varname[0].trim() + ", String " + varname[1].trim() + ") {\n");
+			sb.append("  return String.valueOf(" + expr + "); }}\n");
+		}
 		// compile class, and create FrameMapFunction object
 		try {
-			return (FrameMapFunction) CodegenUtils
-				.compileClass(cname, sb.toString()).newInstance();
+			return (FrameMapFunction) CodegenUtils.compileClass(cname, sb.toString()).newInstance();
 		}
 		catch(InstantiationException | IllegalAccessException e) {
 			throw new DMLRuntimeException("Failed to compile FrameMapFunction.", e);
 		}
 	}
 
-
-	public FrameBlockMapFunction getCompiledFunctionBlock(String lambdaExpression) {
-		String cname = "StringProcessing"+CLASS_ID.getNextID();
-		StringBuilder sb = new StringBuilder();
-		String expr = lambdaExpression;
-
-		sb.append("import org.apache.sysds.runtime.util.UtilFunctions;\n");
-		sb.append("import org.apache.sysds.runtime.matrix.data.FrameBlock.FrameBlockMapFunction;\n");
-		sb.append("public class "+cname+" extends FrameBlockMapFunction {\n");
-		sb.append("@Override\n");
-		sb.append("public FrameBlock apply() {\n");
-		sb.append("  return "+expr+"; }}\n");
-
-		try {
-			return (FrameBlockMapFunction) CodegenUtils
-				.compileClass(cname, sb.toString()).newInstance();
-		}
-		catch(InstantiationException | IllegalAccessException e) {
-			throw new DMLRuntimeException("Failed to compile FrameBlockMapFunction.", e);
-		}
-	}
-
-	public static abstract class FrameMapFunction implements Serializable {
+	public static class FrameMapFunction implements Serializable {
 		private static final long serialVersionUID = -8398572153616520873L;
-		public abstract String apply(String input);
-	}
-
-	public static abstract class FrameBlockMapFunction implements Serializable {
-		private static final long serialVersionUID = -8398573333616520876L;
-		public abstract FrameBlock apply();
+		public String apply(String input) {return null;}
+		public String apply(String input1, String input2) {	return null;}
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index a7fdaf4..5c8ed95 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -19,10 +19,6 @@
 
 package org.apache.sysds.runtime.util;
 
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.math3.random.RandomDataGenerator;
 import org.apache.sysds.common.Types.ValueType;
@@ -35,6 +31,10 @@
 import org.apache.sysds.runtime.matrix.data.Pair;
 import org.apache.sysds.runtime.meta.TensorCharacteristics;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
 public class UtilFunctions {
 	// private static final Log LOG = LogFactory.getLog(UtilFunctions.class.getName());
 
@@ -835,6 +835,17 @@
 			.map(DATE_FORMATS::get).orElseThrow(() -> new NullPointerException("Unknown date format."));
 	}
 
+	 public static double jaccardSim(String x, String y) {
+		Set<String> charsX = new LinkedHashSet<>(Arrays.asList(x.split("(?!^)")));
+		Set<String> charsY = new LinkedHashSet<>(Arrays.asList(y.split("(?!^)")));
+
+		final int sa = charsX.size();
+		final int sb = charsY.size();
+		charsX.retainAll(charsY);
+		final int intersection = charsX.size();
+		return 1d / (sa + sb - charsX.size()) * intersection;
+	}
+
 	/**
 	 * Generates a random FrameBlock with given parameters.
 	 * 
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinMDTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinMDTest.java
new file mode 100644
index 0000000..ec3c502
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinMDTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.sysds.test.functions.builtin;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class BuiltinMDTest extends AutomatedTestBase {
+	private final static String TEST_NAME = "matching_dependency";
+	private final static String TEST_DIR = "functions/builtin/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + BuiltinMDTest.class.getSimpleName() + "/";
+
+	@Parameterized.Parameter()
+	public double[][] LHSf;
+
+	@Parameterized.Parameter(1)
+	public double[][] LHSt;
+
+	@Parameterized.Parameter(2)
+	public double[][] RHSf;
+
+	@Parameterized.Parameter(3)
+	public double[][] RHSt;
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {
+			{new double[][] {{1}}, new double[][] {{0.95}},
+				new double[][] {{5}}, new double[][] {{0.65}}},
+
+			{new double[][] {{1,3}}, new double[][] {{0.7,0.8}},
+				new double[][] {{5}}, new double[][] {{0.8}}},
+
+			{new double[][] {{1,4,5}}, new double[][] {{0.9,0.9,0.9}},
+				new double[][] {{6}}, new double[][] {{0.9}}},
+
+			{new double[][] {{1,4,5}}, new double[][] {{0.75,0.6,0.9}},
+				new double[][] {{3}}, new double[][] {{0.8}}},
+		});
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"D"}));
+		if (TEST_CACHE_ENABLED) {
+			setOutAndExpectedDeletionDisabled(true);
+		}
+	}
+
+	@Test
+	public void testMDCP() {
+		double[][] D =  {
+			{7567, 231, 1231, 1232, 122, 321},
+			{5321, 23123, 122, 123, 1232, 11},
+			{7267, 3, 223, 432, 1132, 0},
+			{7267, 3, 223, 432, 1132, 500},
+			{7254, 3, 223, 432, 1132, 0},
+		};
+		runMDTests(D, LHSf, LHSt, RHSf, RHSt, LopProperties.ExecType.CP);
+	}
+
+	@Test
+	public void testMDSP() {
+		double[][] D =  {
+			{7567, 231, 1231, 1232, 122, 321},
+			{5321, 23123, 122, 123, 1232, 11},
+			{7267, 3, 223, 432, 1132, 0},
+			{7267, 3, 223, 432, 1132, 500},
+			{7254, 3, 223, 432, 1132, 0},
+		};
+		runMDTests(D, LHSf, LHSt, RHSf, RHSt, LopProperties.ExecType.SPARK);
+	}
+	
+	private void runMDTests(double [][] X , double[][] LHSf, double[][] LHSt, double[][] RHSf, double[][] RHSt, LopProperties.ExecType instType) {
+		Types.ExecMode platformOld = setExecMode(instType);
+		try
+		{
+			loadTestConfiguration(getTestConfiguration(TEST_NAME));
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[]{"-stats","-args", input("X"),
+				input("LHSf"), input("LHSt"), input("RHSf"), input("RHSt"), output("B")};
+
+			double[][] A = getRandomMatrix(20, 6, 50, 500, 1, 2);
+			System.arraycopy(X, 0, A, 0, X.length);
+
+			writeInputMatrixWithMTD("X", A, false);
+			writeInputMatrixWithMTD("LHSf", LHSf, true);
+			writeInputMatrixWithMTD("LHSt", LHSt, true);
+			writeInputMatrixWithMTD("RHSf", RHSf, true);
+			writeInputMatrixWithMTD("RHSt", RHSt, true);
+
+			runTest(true, false, null, -1);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+		}
+		finally {
+			rtplatform = platformOld;
+		}
+	}
+}
diff --git a/src/test/scripts/functions/builtin/matching_dependency.dml b/src/test/scripts/functions/builtin/matching_dependency.dml
new file mode 100644
index 0000000..0256cc8
--- /dev/null
+++ b/src/test/scripts/functions/builtin/matching_dependency.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# X = read($1, data_type = "frame", format = "csv", header = FALSE);
+X = as.frame(read($1))
+LHSf = read($2);
+LHSt = read($3);
+RHSf = read($4);
+RHSt = read($5);
+B = mdedup(X, LHSf, LHSt, RHSf, RHSt, TRUE);
+write(B, $6);