[SYSTEMDS-2782] MDedup Builtin for finding duplicate rows
DIA project WS2020/21.
Closes #1139.
Date: Mon Jan 11 23:50:57 2021 +0100
diff --git a/docs/site/builtins-reference.md b/docs/site/builtins-reference.md
index d960b25..022b75e 100644
--- a/docs/site/builtins-reference.md
+++ b/docs/site/builtins-reference.md
@@ -56,6 +56,7 @@
* [`slicefinder`-Function](#slicefinder-function)
* [`normalize`-Function](#normalize-function)
* [`gnmf`-Function](#gnmf-function)
+ * [`mdedup`-Function](#mdedup-function)
* [`msvm`-Function](#msvm-function)
* [`naivebayes`-Function](#naivebayes-function)
* [`outlier`-Function](#outlier-function)
@@ -1275,6 +1276,48 @@
y = toOneHot(X,numClasses)
```
+## `mdedup`-Function
+
+The `mdedup`-function implements builtin for deduplication using matching dependencies
+(e.g. Street 0.95, City 0.90 -> ZIP 1.0) by Jaccard distance.
+
+### Usage
+
+```r
+mdedup(X, Y, intercept, epsilon, lamda, maxIterations, verbose)
+```
+
+
+### Arguments
+
+| Name | Type | Default | Description |
+| :------ | :------------- | -------- | :---------- |
+| X | Frame | --- | Input Frame X |
+| LHSfeatures | Matrix[Integer] | --- | A matrix 1xd with numbers of columns for MDs |
+| LHSthreshold | Matrix[Double] | --- | A matrix 1xd with threshold values in interval [0, 1] for MDs |
+| RHSfeatures | Matrix[Integer] | --- | A matrix 1xd with numbers of columns for MDs |
+| RHSthreshold | Matrix[Double] | --- | A matrix 1xd with threshold values in interval [0, 1] for MDs |
+| verbose | Boolean | False | Set to true to print duplicates.|
+
+
+### Returns
+
+| Type | Default | Description |
+| :-------------- | -------- | :---------- |
+| Matrix[Integer] | --- | Matrix of duplicates (rows). |
+
+
+### Example
+
+```r
+X = as.frame(rand(rows = 50, cols = 10))
+LHSfeatures = matrix("1 3 19", 1, 2)
+LHSthreshold = matrix("0.85 0.85", 1, 2)
+RHSfeatures = matrix("30", 1, 1)
+RHSthreshold = matrix("1.0", 1, 1)
+duplicates = mdedup(X, LHSfeatures, LHSthreshold, RHSfeatures, RHSthreshold, verbose = FALSE)
+```
+
## `msvm`-Function
The `msvm`-function implements builtin multiclass SVM with squared slack variables
diff --git a/docs/site/dml-language-reference.md b/docs/site/dml-language-reference.md
index 2f3bd16..27bbbc6 100644
--- a/docs/site/dml-language-reference.md
+++ b/docs/site/dml-language-reference.md
@@ -2067,7 +2067,24 @@
WEST
EAST
-
+It is also possible to compute Jaccard similarity matrix of rows of a vector.
+<code> dist = map(Xi, "(x, y) -> UtilFunctions.jaccardSim(x, y)") <br/>
+print(toString(dist)) </code>
+
+ # FRAME: nrow = 10, ncol = 10
+ # DOUBLE
+ # 0,000 0,286 0,125 0,600 0,286 0,125 0,125 1,000 1,000 0,600
+ 0,286 0,000 0,429 0,286 1,000 0,429 0,429 0,286 0,286 0,286
+ 0,125 0,429 0,000 0,125 0,429 1,000 1,000 0,125 0,125 0,125
+ 0,600 0,286 0,125 0,000 0,286 0,125 0,125 0,600 0,600 1,000
+ 0,286 1,000 0,429 0,286 0,000 0,429 0,429 0,286 0,286 0,286
+ 0,125 0,429 1,000 0,125 0,429 0,000 1,000 0,125 0,125 0,125
+ 0,125 0,429 1,000 0,125 0,429 1,000 0,000 0,125 0,125 0,125
+ 1,000 0,286 0,125 0,600 0,286 0,125 0,125 0,000 1,000 0,600
+ 1,000 0,286 0,125 0,600 0,286 0,125 0,125 1,000 0,000 0,600
+ 0,600 0,286 0,125 1,000 0,286 0,125 0,125 0,600 0,600 0,000
+ #
+
* * *
## Modules
diff --git a/scripts/builtin/discoverFD.dml b/scripts/builtin/discoverFD.dml
index 0d787bd..49d013b 100644
--- a/scripts/builtin/discoverFD.dml
+++ b/scripts/builtin/discoverFD.dml
@@ -56,12 +56,12 @@
# allocate output and working sets
n = nrow(X)
d = ncol(X)
- FD = matrix(0, d, d)
+ FD = diag(matrix(1, d, 1))
cm = matrix(0, 1, d)
# num distinct per column
parfor(i in 1:d)
- cm[1,i] = colDistinct(X[,i])
+ cm[1,i] = colDistinct(X[,i])
# add know functional dependencies
FD = FD + (cm == 1) # constant columns determined by all columns
diff --git a/scripts/builtin/mdedup.dml b/scripts/builtin/mdedup.dml
new file mode 100644
index 0000000..f6af1d4
--- /dev/null
+++ b/scripts/builtin/mdedup.dml
@@ -0,0 +1,119 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#------------------------------------------------------------------------------------------------------------------
+
+# Implements builtin for deduplication using matching dependencies (e.g. Street 0.95, City 0.90 -> ZIP 1.0)
+# and Jaccard distance.
+#
+# INPUT PARAMETERS:
+# -----------------------------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# -----------------------------------------------------------------------------------------------------------------
+# X Frame -- Input Frame X
+# LHSfeatures Matrix[Integer] -- A matrix 1xd with numbers of columns for MDs
+# (e.g. Street 0.95, City 0.90 -> ZIP 1.0)
+# LHSthreshold Matrix[Double] -- A matrix 1xd with threshold values in interval [0, 1] for MDs
+# RHSfeatures Matrix[Integer] -- A matrix 1xd with numbers of columns for MDs
+# RHSthreshold Matrix[Double] -- A matrix 1xd with threshold values in interval [0, 1] for MDs
+# verbose Boolean -- To print the output
+# -----------------------------------------------------------------------------------------------------------------
+#
+# Output(s)
+# -----------------------------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# -----------------------------------------------------------------------------------------------------------------
+# MD Matrix[Double] --- Matrix nx1 of duplicates
+
+s_mdedup = function(Frame[String] X, Matrix[Double] LHSfeatures, Matrix[Double] LHSthreshold,
+ Matrix[Double] RHSfeatures, Matrix[Double] RHSthreshold, Boolean verbose)
+ return(Matrix[Double] MD)
+{
+ n = nrow(X)
+ d = ncol(X)
+
+ if (0 > (ncol(LHSfeatures) + ncol(RHSfeatures)) > d)
+ stop("Invalid input: thresholds should in interval [0, " + d + "]")
+
+ if ((ncol(LHSfeatures) != ncol(LHSthreshold)) | (ncol(RHSfeatures) != ncol(RHSthreshold)))
+ stop("Invalid input: number of thresholds and columns to compare should be equal for LHS and RHS.")
+
+ if (max(LHSfeatures) > d | max(RHSfeatures) > d)
+ stop("Invalid input: feature values should be less than " + d)
+
+ if (sum(LHSthreshold > 1) > 0 | sum(RHSthreshold > 1) > 0)
+ stop("Invalid input: threshold values should be in the interval [0, 1].")
+
+ MD = matrix(0, n, 1)
+ LHS_MD = getMDAdjacency(X, LHSfeatures, LHSthreshold)
+ RHS_MD = matrix(0, n, n)
+
+ if (sum(LHS_MD) > 0) {
+ RHS_MD = getMDAdjacency(X, RHSfeatures, RHSthreshold)
+ }
+
+ MD = detectDuplicates(LHS_MD, RHS_MD)
+
+ if(verbose)
+ print(toString(MD))
+}
+
+getMDAdjacency = function(Frame[String] X, Matrix[Double] features, Matrix[Double] thresholds)
+ return(Matrix[Double] adjacency)
+{
+ n = nrow(X)
+ d = ncol(X)
+ adjacency = matrix(0, n, n)
+
+ i = 1
+ while (i <= ncol(features)) {
+ # slice col
+ pos = as.scalar(features[1, i])
+ Xi = X[, pos]
+ # distances between words in each row of col
+ dist = map(Xi, "(x, y) -> UtilFunctions.jaccardSim(x, y)")
+ jaccardDist = as.matrix(dist)
+ jaccardDist = jaccardDist + t(jaccardDist)
+ threshold = as.scalar(thresholds[1, i])
+
+ if(i == 1) {
+ adjacency = jaccardDist >= threshold
+ } else {
+ adjacency = adjacency & (jaccardDist >= threshold)
+ }
+
+ # break if one of MDs is false
+ if (sum(adjacency) == 0)
+ i = ncol(features)
+
+ i = i + 1
+ }
+}
+
+detectDuplicates = function(Matrix[Double] LHS_adj, Matrix[Double] RHS_adj)
+ return(Matrix[Double] MD)
+{
+
+ n = nrow(LHS_adj)
+ adjacency = LHS_adj * RHS_adj
+ # find duplicates
+ # TODO size propagation issue of adjacency matrix inside components call
+ colDuplicates = components(G=adjacency[1:n, 1:n], verbose=FALSE)
+ MD = colDuplicates * (rowSums(adjacency[1:n, 1:n]) > 0)
+}
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java
index 1a230bf..f18ec9c 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -97,6 +97,7 @@
DETECTSCHEMA("detectSchema", false),
DIAG("diag", false),
DISCOVER_FD("discoverFD", true),
+ DISCOVER_MD("mdedup", true),
DIST("dist", true),
DMV("dmv", true),
DROP_INVALID_TYPE("dropInvalidType", false),
diff --git a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
index d4d8296..ba3fbe3 100644
--- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -1562,9 +1562,16 @@
checkMatrixFrameParam(getFirstExpr());
checkScalarParam(getSecondExpr());
output.setDataType(DataType.FRAME);
- output.setDimensions(id.getDim1(), 1);
+ if(_args[1].getText().contains("jaccardSim")) {
+ output.setDimensions(id.getDim1(), id.getDim1());
+ output.setValueType(ValueType.FP64);
+ }
+ else {
+ output.setDimensions(id.getDim1(), 1);
+ output.setValueType(ValueType.STRING);
+ }
output.setBlocksize (id.getBlocksize());
- output.setValueType(ValueType.STRING);
+
break;
default:
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameScalarSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameScalarSPInstruction.java
index a395c16..b5cf078 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameScalarSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameScalarSPInstruction.java
@@ -44,6 +44,11 @@
// Create local compiled functions (once) and execute on RDD
JavaPairRDD<Long, FrameBlock> out = in1.mapValues(new RDDStringProcessing(expression));
+ if(expression.contains("jaccardSim")) {
+ long rows = sec.getDataCharacteristics(output.getName()).getRows();
+ sec.getDataCharacteristics(output.getName()).setDimension(rows, rows);
+ }
+
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
}
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index e33052d..227fa0c 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -70,33 +70,33 @@
//internal configuration
private static final boolean REUSE_RECODE_MAPS = true;
-
+
/** The number of rows of the FrameBlock */
private int _numRows = -1;
-
+
/** The schema of the data frame as an ordered list of value types */
- private ValueType[] _schema = null;
-
+ private ValueType[] _schema = null;
+
/** The column names of the data frame as an ordered list of strings, allocated on-demand */
private String[] _colnames = null;
-
+
private ColumnMetadata[] _colmeta = null;
-
+
/** The data frame data as an ordered list of columns */
private Array[] _coldata = null;
-
+
/** Cached size in memory to avoid repeated scans of string columns */
long _msize = -1;
-
+
public FrameBlock() {
_numRows = 0;
}
-
+
/**
* Copy constructor for frame blocks, which uses a shallow copy for
- * the schema (column types and names) but a deep copy for meta data
+ * the schema (column types and names) but a deep copy for meta data
* and actual column data.
- *
+ *
* @param that frame block
*/
public FrameBlock(FrameBlock that) {
@@ -104,7 +104,7 @@
copy(that);
setColumnMetadata(that.getColumnMetadata());
}
-
+
public FrameBlock(int ncols, ValueType vt) {
this();
_schema = UtilFunctions.nCopies(ncols, vt);
@@ -113,21 +113,21 @@
for( int j=0; j<ncols; j++ )
_colmeta[j] = new ColumnMetadata(0);
}
-
+
public FrameBlock(ValueType[] schema) {
this(schema, new String[0][]);
}
-
+
public FrameBlock(ValueType[] schema, String[] names) {
this(schema, names, new String[0][]);
}
-
+
public FrameBlock(ValueType[] schema, String[][] data) {
//default column names not materialized
this(schema, null, data);
}
-
+
public FrameBlock(ValueType[] schema, String[] names, String[][] data) {
_numRows = 0; //maintained on append
_schema = schema;
@@ -138,10 +138,10 @@
for( int i=0; i<data.length; i++ )
appendRow(data[i]);
}
-
+
/**
* Get the number of rows of the frame block.
- *
+ *
* @return number of rows
*/
@Override
@@ -152,30 +152,30 @@
public void setNumRows(int numRows) {
_numRows = numRows;
}
-
+
/**
* Get the number of columns of the frame block, that is
* the number of columns defined in the schema.
- *
+ *
* @return number of columns
*/
@Override
public int getNumColumns() {
return (_schema != null) ? _schema.length : 0;
}
-
+
/**
* Returns the schema of the frame block.
- *
+ *
* @return schema as array of ValueTypes
*/
public ValueType[] getSchema() {
return _schema;
}
-
+
/**
* Sets the schema of the frame block.
- *
+ *
* @param schema schema as array of ValueTypes
*/
public void setSchema(ValueType[] schema) {
@@ -183,26 +183,26 @@
}
/**
- * Returns the column names of the frame block. This method
+ * Returns the column names of the frame block. This method
* allocates default column names if required.
- *
+ *
* @return column names
*/
public String[] getColumnNames() {
return getColumnNames(true);
}
-
-
+
+
public FrameBlock getColumnNamesAsFrame() {
FrameBlock fb = new FrameBlock(getNumColumns(), ValueType.STRING);
fb.appendRow(getColumnNames());
return fb;
}
-
+
/**
- * Returns the column names of the frame block. This method
+ * Returns the column names of the frame block. This method
* allocates default column names if required.
- *
+ *
* @param alloc if true, create column names
* @return array of column names
*/
@@ -211,11 +211,11 @@
_colnames = createColNames(getNumColumns());
return _colnames;
}
-
+
/**
- * Returns the column name for the requested column. This
+ * Returns the column name for the requested column. This
* method allocates default column names if required.
- *
+ *
* @param c column index
* @return column name
*/
@@ -256,24 +256,24 @@
public void setColumnMetadata(int c, ColumnMetadata colmeta) {
_colmeta[c] = colmeta;
}
-
+
/**
- * Creates a mapping from column names to column IDs, i.e.,
+ * Creates a mapping from column names to column IDs, i.e.,
* 1-based column indexes
- *
+ *
* @return map of column name keys and id values
*/
public Map<String,Integer> getColumnNameIDMap() {
Map<String, Integer> ret = new HashMap<>();
for( int j=0; j<getNumColumns(); j++ )
ret.put(getColumnName(j), j+1);
- return ret;
+ return ret;
}
-
+
/**
* Allocate column data structures if necessary, i.e., if schema specified
* but not all column data structures created yet.
- *
+ *
* @param numRows number of rows
*/
public void ensureAllocatedColumns(int numRows) {
@@ -310,10 +310,10 @@
}
_numRows = numRows;
}
-
+
/**
* Checks for matching column sizes in case of existing columns.
- *
+ *
* @param newlen number of rows to compare with existing number of rows
*/
public void ensureColumnCompatibility(int newlen) {
@@ -340,11 +340,11 @@
boolean ret = (_colnames != null);
for( int j=0; j<getNumColumns() && ret; j++ )
ret &= isColNameDefault(j);
- return ret;
+ return ret;
}
public boolean isColNameDefault(int i) {
- return _colnames==null
+ return _colnames==null
|| _colnames[i].equals("C"+(i+1));
}
@@ -359,10 +359,10 @@
///////
// basic get and set functionality
-
+
/**
* Gets a boxed object of the value in position (r,c).
- *
+ *
* @param r row index, 0-based
* @param c column index, 0-based
* @return object of the value at specified position
@@ -370,11 +370,11 @@
public Object get(int r, int c) {
return _coldata[c].get(r);
}
-
+
/**
* Sets the value in position (r,c), where the input is assumed
* to be a boxed object consistent with the schema definition.
- *
+ *
* @param r row index
* @param c column index
* @param val value to set at specified position
@@ -404,12 +404,12 @@
public void reset() {
reset(0, true);
}
-
+
/**
* Append a row to the end of the data frame, where all row fields
* are boxed objects according to the schema.
- *
+ *
* @param row array of objects
*/
public void appendRow(Object[] row) {
@@ -418,11 +418,11 @@
_coldata[j].append(row[j]);
_numRows++;
}
-
+
/**
* Append a row to the end of the data frame, where all row fields
* are string encoded.
- *
+ *
* @param row array of strings
*/
public void appendRow(String[] row) {
@@ -431,12 +431,12 @@
_coldata[j].append(row[j]);
_numRows++;
}
-
+
/**
- * Append a column of value type STRING as the last column of
- * the data frame. The given array is wrapped but not copied
+ * Append a column of value type STRING as the last column of
+ * the data frame. The given array is wrapped but not copied
* and hence might be updated in the future.
- *
+ *
* @param col array of strings
*/
public void appendColumn(String[] col) {
@@ -449,12 +449,12 @@
_numRows = col.length;
_msize = -1;
}
-
+
/**
- * Append a column of value type BOOLEAN as the last column of
- * the data frame. The given array is wrapped but not copied
+ * Append a column of value type BOOLEAN as the last column of
+ * the data frame. The given array is wrapped but not copied
* and hence might be updated in the future.
- *
+ *
* @param col array of booleans
*/
public void appendColumn(boolean[] col) {
@@ -463,16 +463,16 @@
_schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.BOOLEAN);
_colnames = (String[]) ArrayUtils.add(colnames, createColName(_schema.length));
_coldata = (_coldata==null) ? new Array[]{new BooleanArray(col)} :
- (Array[]) ArrayUtils.add(_coldata, new BooleanArray(col));
+ (Array[]) ArrayUtils.add(_coldata, new BooleanArray(col));
_numRows = col.length;
_msize = -1;
}
-
+
/**
- * Append a column of value type INT as the last column of
- * the data frame. The given array is wrapped but not copied
+ * Append a column of value type INT as the last column of
+ * the data frame. The given array is wrapped but not copied
* and hence might be updated in the future.
- *
+ *
* @param col array of longs
*/
public void appendColumn(int[] col) {
@@ -502,12 +502,12 @@
_numRows = col.length;
_msize = -1;
}
-
+
/**
* Append a column of value type float as the last column of
* the data frame. The given array is wrapped but not copied
* and hence might be updated in the future.
- *
+ *
* @param col array of doubles
*/
public void appendColumn(float[] col) {
@@ -537,12 +537,12 @@
_numRows = col.length;
_msize = -1;
}
-
+
/**
* Append a set of column of value type DOUBLE at the end of the frame
- * in order to avoid repeated allocation with appendColumns. The given
+ * in order to avoid repeated allocation with appendColumns. The given
* array is wrapped but not copied and hence might be updated in the future.
- *
+ *
* @param cols 2d array of doubles
*/
public void appendColumns(double[][] cols) {
@@ -552,9 +552,9 @@
Array[] tmpData = new Array[ncol];
for( int j=0; j<ncol; j++ )
tmpData[j] = new DoubleArray(cols[j]);
- _colnames = empty ? null : (String[]) ArrayUtils.addAll(getColumnNames(),
+ _colnames = empty ? null : (String[]) ArrayUtils.addAll(getColumnNames(),
createColNames(getNumColumns(), ncol)); //before schema modification
- _schema = empty ? tmpSchema : (ValueType[]) ArrayUtils.addAll(_schema, tmpSchema);
+ _schema = empty ? tmpSchema : (ValueType[]) ArrayUtils.addAll(_schema, tmpSchema);
_coldata = empty ? tmpData : (Array[]) ArrayUtils.addAll(_coldata, tmpData);
_numRows = cols[0].length;
_msize = -1;
@@ -562,50 +562,50 @@
public Object getColumnData(int c) {
switch(_schema[c]) {
- case STRING: return ((StringArray)_coldata[c])._data;
+ case STRING: return ((StringArray)_coldata[c])._data;
case BOOLEAN: return ((BooleanArray)_coldata[c])._data;
case INT64: return ((LongArray)_coldata[c])._data;
case FP64: return ((DoubleArray)_coldata[c])._data;
default: return null;
}
}
-
+
public Array getColumn(int c) {
- return _coldata[c];
+ return _coldata[c];
}
-
+
public void setColumn(int c, Array column) {
if( _coldata == null )
_coldata = new Array[getNumColumns()];
_coldata[c] = column;
_msize = -1;
}
-
+
/**
* Get a row iterator over the frame where all fields are encoded
* as strings independent of their value types.
- *
+ *
* @return string array iterator
*/
public Iterator<String[]> getStringRowIterator() {
return new StringRowIterator(0, _numRows);
}
-
+
/**
- * Get a row iterator over the frame where all selected fields are
- * encoded as strings independent of their value types.
- *
+ * Get a row iterator over the frame where all selected fields are
+ * encoded as strings independent of their value types.
+ *
* @param cols column selection, 1-based
* @return string array iterator
*/
public Iterator<String[]> getStringRowIterator(int[] cols) {
return new StringRowIterator(0, _numRows, cols);
}
-
+
/**
* Get a row iterator over the frame where all fields are encoded
- * as strings independent of their value types.
- *
+ * as strings independent of their value types.
+ *
* @param rl lower row index
* @param ru upper row index
* @return string array iterator
@@ -613,11 +613,11 @@
public Iterator<String[]> getStringRowIterator(int rl, int ru) {
return new StringRowIterator(rl, ru);
}
-
+
/**
- * Get a row iterator over the frame where all selected fields are
- * encoded as strings independent of their value types.
- *
+ * Get a row iterator over the frame where all selected fields are
+ * encoded as strings independent of their value types.
+ *
* @param rl lower row index
* @param ru upper row index
* @param cols column selection, 1-based
@@ -626,22 +626,22 @@
public Iterator<String[]> getStringRowIterator(int rl, int ru, int[] cols) {
return new StringRowIterator(rl, ru, cols);
}
-
+
/**
* Get a row iterator over the frame where all fields are encoded
- * as boxed objects according to their value types.
- *
+ * as boxed objects according to their value types.
+ *
* @return object array iterator
*/
public Iterator<Object[]> getObjectRowIterator() {
return new ObjectRowIterator(0, _numRows);
}
-
+
/**
* Get a row iterator over the frame where all fields are encoded
* as boxed objects according to the value types of the provided
* target schema.
- *
+ *
* @param schema target schema of objects
* @return object array iterator
*/
@@ -650,22 +650,22 @@
iter.setSchema(schema);
return iter;
}
-
+
/**
- * Get a row iterator over the frame where all selected fields are
- * encoded as boxed objects according to their value types.
- *
+ * Get a row iterator over the frame where all selected fields are
+ * encoded as boxed objects according to their value types.
+ *
* @param cols column selection, 1-based
* @return object array iterator
*/
public Iterator<Object[]> getObjectRowIterator(int[] cols) {
return new ObjectRowIterator(0, _numRows, cols);
}
-
+
/**
* Get a row iterator over the frame where all fields are encoded
- * as boxed objects according to their value types.
- *
+ * as boxed objects according to their value types.
+ *
* @param rl lower row index
* @param ru upper row index
* @return object array iterator
@@ -673,11 +673,11 @@
public Iterator<Object[]> getObjectRowIterator(int rl, int ru) {
return new ObjectRowIterator(rl, ru);
}
-
+
/**
- * Get a row iterator over the frame where all selected fields are
- * encoded as boxed objects according to their value types.
- *
+ * Get a row iterator over the frame where all selected fields are
+ * encoded as boxed objects according to their value types.
+ *
* @param rl lower row index
* @param ru upper row index
* @param cols column selection, 1-based
@@ -692,7 +692,7 @@
// FIXME for FrameBlock fix write and readFields, it does not work if the Arrays are not yet
// allocated (after fixing remove hack in FederatedWorkerHandler.createFrameEncodeMeta(FederatedRequest) call to
// FrameBlock.ensureAllocatedColumns())
-
+
@Override
public void write(DataOutput out) throws IOException {
boolean isDefaultMeta = isColNamesDefault()
@@ -707,7 +707,7 @@
if( !isDefaultMeta ) {
out.writeUTF(getColumnName(j));
out.writeLong(_colmeta[j].getNumDistinct());
- out.writeUTF( (_colmeta[j].getMvValue()!=null) ?
+ out.writeUTF( (_colmeta[j].getMvValue()!=null) ?
_colmeta[j].getMvValue() : "" );
}
_coldata[j].write(out);
@@ -721,13 +721,13 @@
int numCols = in.readInt();
boolean isDefaultMeta = in.readBoolean();
//allocate schema/meta data arrays
- _schema = (_schema!=null && _schema.length==numCols) ?
+ _schema = (_schema!=null && _schema.length==numCols) ?
_schema : new ValueType[numCols];
- _colnames = (_colnames != null && _colnames.length==numCols) ?
+ _colnames = (_colnames != null && _colnames.length==numCols) ?
_colnames : new String[numCols];
- _colmeta = (_colmeta != null && _colmeta.length==numCols) ?
+ _colmeta = (_colmeta != null && _colmeta.length==numCols) ?
_colmeta : new ColumnMetadata[numCols];
- _coldata = (_coldata!=null && _coldata.length==numCols) ?
+ _coldata = (_coldata!=null && _coldata.length==numCols) ?
_coldata : new Array[numCols];
//read columns (value type, meta, data)
for( int j=0; j<numCols; j++ ) {
@@ -748,7 +748,7 @@
arr.readFields(in);
_schema[j] = vt;
_colnames[j] = name;
- _colmeta[j] = new ColumnMetadata(ndistinct,
+ _colmeta[j] = new ColumnMetadata(ndistinct,
(mvvalue==null || mvvalue.isEmpty()) ? null : mvvalue);
_coldata[j] = arr;
}
@@ -766,35 +766,35 @@
//redirect deserialization to writable impl
readFields(in);
}
-
+
////////
// CacheBlock implementation
-
+
@Override
public long getInMemorySize() {
//reuse previously computed size
if( _msize > 0 )
return _msize;
-
+
//frame block header
long size = 16 + 4; //object, num rows
-
+
//schema array (overhead and int entries)
int clen = getNumColumns();
size += 8 + 32 + clen * 4;
-
+
//colname array (overhead and string entries)
size += 8 + ((_colnames!=null) ? 32 : 0);
for( int j=0; j<clen && _colnames!=null; j++ )
size += getInMemoryStringSize(getColumnName(j));
-
+
//meta data array (overhead and entries)
size += 8 + 32;
for( int j=0; j<clen; j++ ) {
- size += 16 + 8 + 8 //object, long num distinct, ref mv
+ size += 16 + 8 + 8 //object, long num distinct, ref mv
+ getInMemoryStringSize(_colmeta[j].getMvValue());
}
-
+
//data array (overhead and entries)
size += 8 + 32 + clen * (16+4+8+32);
for( int j=0; j<clen; j++ ) {
@@ -802,7 +802,7 @@
case BOOLEAN: size += _numRows; break;
case INT64:
case FP64: size += 8*_numRows; break;
- case STRING:
+ case STRING:
StringArray arr = (StringArray)_coldata[j];
for( int i=0; i<_numRows; i++ )
size += getInMemoryStringSize(arr.get(i));
@@ -810,15 +810,15 @@
default: //not applicable
}
}
-
+
return _msize = size;
}
-
+
@Override
public long getExactSerializedSize() {
//header: 2xint, boolean
long size = 9;
-
+
//column sizes
boolean isDefaultMeta = isColNamesDefault()
&& isColumnMetadataDefault();
@@ -833,7 +833,7 @@
case BOOLEAN: size += _numRows; break;
case INT64:
case FP64: size += 8*_numRows; break;
- case STRING:
+ case STRING:
StringArray arr = (StringArray)_coldata[j];
for( int i=0; i<_numRows; i++ )
size += IOUtilFunctions.getUTFSize(arr.get(i));
@@ -841,15 +841,15 @@
default: //not applicable
}
}
-
+
return size;
}
-
+
@Override
public boolean isShallowSerialize() {
return isShallowSerialize(false);
}
-
+
@Override
public boolean isShallowSerialize(boolean inclConvert) {
//shallow serialize if non-string schema because a frame block
@@ -859,20 +859,20 @@
ret &= (_schema[j] != ValueType.STRING);
return ret;
}
-
- @Override
+
+ @Override
public void toShallowSerializeBlock() {
//do nothing (not applicable).
}
-
+
@Override
public void compactEmptyBlock() {
//do nothing
}
-
+
/**
- * Returns the in-memory size in bytes of the given string value.
- *
+ * Returns the in-memory size in bytes of the given string value.
+ *
* @param value string value
* @return in-memory size of string value
*/
@@ -880,9 +880,9 @@
if( value == null )
return 0;
return 16 + 4 + 8 //object, hash, array ref
- + 32 + value.length(); //char array
+ + 32 + value.length(); //char array
}
-
+
/**
* This method performs the value comparison on two frames
* if the values in both frames are equal, not equal, less than, greater than, less than/greater than and equal to
@@ -947,7 +947,7 @@
return new FrameBlock(UtilFunctions.nCopies(this.getNumColumns(), ValueType.BOOLEAN), outputData);
}
-
+
private static boolean checkAndSetEmpty(FrameBlock fb1, FrameBlock fb2, String[][] out, int r, int c) {
if(fb1.get(r, c) == null || fb2.get(r, c) == null) {
out[r][c] = (fb1.get(r, c) == null && fb2.get(r, c) == null) ? "true" : "false";
@@ -955,13 +955,13 @@
}
return false;
}
-
+
///////
// indexing and append operations
-
+
public FrameBlock leftIndexingOperations(FrameBlock rhsFrame, IndexRange ixrange, FrameBlock ret) {
- return leftIndexingOperations(rhsFrame,
- (int)ixrange.rowStart, (int)ixrange.rowEnd,
+ return leftIndexingOperations(rhsFrame,
+ (int)ixrange.rowStart, (int)ixrange.rowEnd,
(int)ixrange.colStart, (int)ixrange.colEnd, ret);
}
@@ -971,7 +971,7 @@
|| cl < 0 || cu >= getNumColumns() || cu < cl || cu >= getNumColumns() ) {
throw new DMLRuntimeException("Invalid values for frame indexing: ["+(rl+1)+":"+(ru+1)+"," + (cl+1)+":"+(cu+1)+"] " +
"must be within frame dimensions ["+getNumRows()+","+getNumColumns()+"].");
- }
+ }
if ( (ru-rl+1) < rhsFrame.getNumRows() || (cu-cl+1) < rhsFrame.getNumColumns()) {
throw new DMLRuntimeException("Invalid values for frame indexing: " +
@@ -979,8 +979,8 @@
"do not match the shape of the frame specified by indices [" +
(rl+1) +":" + (ru+1) + ", " + (cl+1) + ":" + (cu+1) + "].");
}
-
-
+
+
//allocate output frame (incl deep copy schema)
if( ret == null )
ret = new FrameBlock();
@@ -989,7 +989,7 @@
ret._colnames = (_colnames != null) ? _colnames.clone() : null;
ret._colmeta = _colmeta.clone();
ret._coldata = new Array[getNumColumns()];
-
+
//copy data to output and partial overwrite w/ rhs
for( int j=0; j<getNumColumns(); j++ ) {
Array tmp = _coldata[j].clone();
@@ -1006,7 +1006,7 @@
}
ret._coldata[j] = tmp;
}
-
+
return ret;
}
@@ -1015,11 +1015,11 @@
(int)ixrange.rowStart, (int)ixrange.rowEnd,
(int)ixrange.colStart, (int)ixrange.colEnd, ret);
}
-
+
/**
- * Right indexing operations to slice a subframe out of this frame block.
+ * Right indexing operations to slice a subframe out of this frame block.
* Note that the existing column value types are preserved.
- *
+ *
* @param rl row lower index, inclusive, 0-based
* @param ru row upper index, inclusive, 0-based
* @param cl column lower index, inclusive, 0-based
@@ -1036,34 +1036,34 @@
throw new DMLRuntimeException("Invalid values for frame indexing: ["+(rl+1)+":"+(ru+1)+"," + (cl+1)+":"+(cu+1)+"] " +
"must be within frame dimensions ["+getNumRows()+","+getNumColumns()+"]");
}
-
+
//allocate output frame
if( ret == null )
ret = new FrameBlock();
else
ret.reset(ru-rl+1, true);
-
+
//copy output schema and colnames
int numCols = cu-cl+1;
boolean isDefNames = isColNamesDefault();
ret._schema = new ValueType[numCols];
ret._colnames = !isDefNames ? new String[numCols] : null;
ret._colmeta = new ColumnMetadata[numCols];
-
+
for( int j=cl; j<=cu; j++ ) {
ret._schema[j-cl] = _schema[j];
ret._colmeta[j-cl] = _colmeta[j];
if( !isDefNames )
ret._colnames[j-cl] = getColumnName(j);
- }
+ }
ret._numRows = ru-rl+1;
if(ret._coldata == null )
ret._coldata = new Array[numCols];
-
- //fast-path: shallow copy column indexing
+
+ //fast-path: shallow copy column indexing
if( ret._numRows == _numRows ) {
//this shallow copy does not only avoid an array copy, but
- //also allows for bi-directional reuses of recodemaps
+ //also allows for bi-directional reuses of recodemaps
for( int j=cl; j<=cu; j++ )
ret._coldata[j-cl] = _coldata[j];
}
@@ -1078,23 +1078,23 @@
}
return ret;
}
-
-
+
+
public void slice(ArrayList<Pair<Long,FrameBlock>> outlist, IndexRange range, int rowCut)
{
FrameBlock top=null, bottom=null;
Iterator<Pair<Long,FrameBlock>> p=outlist.iterator();
-
+
if(range.rowStart<rowCut)
top = p.next().getValue();
-
+
if(range.rowEnd>=rowCut)
bottom = p.next().getValue();
-
+
if(getNumRows() > 0)
{
int r=(int) range.rowStart;
-
+
for(; r<Math.min(rowCut, range.rowEnd+1); r++)
{
Object[] row = new Object[(int) (range.colEnd-range.colStart+1)];
@@ -1114,11 +1114,11 @@
}
/**
- * Appends the given argument frameblock 'that' to this frameblock by
+ * Appends the given argument frameblock 'that' to this frameblock by
* creating a deep copy to prevent side effects. For cbind, the frames
- * are appended column-wise (same number of rows), while for rbind the
- * frames are appended row-wise (same number of columns).
- *
+ * are appended column-wise (same number of rows), while for rbind the
+ * frames are appended row-wise (same number of columns).
+ *
* @param that frame block to append to current frame block
* @param ret frame block to return, can be null
* @param cbind if true, column append
@@ -1132,21 +1132,21 @@
throw new DMLRuntimeException("Incompatible number of rows for cbind: "+
that.getNumRows()+" (expected: "+getNumRows()+")");
}
-
+
//allocate output frame
if( ret == null )
ret = new FrameBlock();
ret._numRows = _numRows;
-
+
//concatenate schemas (w/ deep copy to prevent side effects)
ret._schema = (ValueType[]) ArrayUtils.addAll(_schema, that._schema);
ret._colnames = (String[]) ArrayUtils.addAll(getColumnNames(), that.getColumnNames());
ret._colmeta = (ColumnMetadata[]) ArrayUtils.addAll(_colmeta, that._colmeta);
-
+
//check and enforce unique columns names
if( !Arrays.stream(ret._colnames).allMatch(new HashSet<>()::add) )
ret._colnames = createColNames(ret.getNumColumns());
-
+
//concatenate column data (w/ shallow copy which is safe due to copy on write semantics)
ret._coldata = (Array[]) ArrayUtils.addAll(_coldata, that._coldata);
}
@@ -1157,7 +1157,7 @@
throw new DMLRuntimeException("Incompatible number of columns for rbind: "+
that.getNumColumns()+" (expected: "+getNumColumns()+")");
}
-
+
//allocate output frame (incl deep copy schema)
if( ret == null )
ret = new FrameBlock();
@@ -1167,7 +1167,7 @@
ret._colmeta = new ColumnMetadata[getNumColumns()];
for( int j=0; j<_schema.length; j++ )
ret._colmeta[j] = new ColumnMetadata(0);
-
+
//concatenate data (deep copy first, append second)
ret._coldata = new Array[getNumColumns()];
for( int j=0; j<getNumColumns(); j++ )
@@ -1183,32 +1183,32 @@
copy(0, src.getNumRows()-1, 0, src.getNumColumns()-1, src);
}
- public void copy(int rl, int ru, int cl, int cu, FrameBlock src)
+ public void copy(int rl, int ru, int cl, int cu, FrameBlock src)
{
//allocate columns if necessary
ensureAllocatedColumns(ru-rl+1);
-
+
//copy values
for( int j=cl; j<=cu; j++ ) {
- //special case: column memcopy
+ //special case: column memcopy
if( _schema[j].equals(src._schema[j-cl]) )
_coldata[j].set(rl, ru, src._coldata[j-cl]);
//general case w/ schema transformation
- else
+ else
for( int i=rl; i<=ru; i++ ) {
String tmp = src.get(i-rl, j-cl)!=null ? src.get(i-rl, j-cl).toString() : null;
set(i, j, UtilFunctions.stringToObject(_schema[j], tmp));
}
}
}
-
-
+
+
///////
// transform specific functionality
-
+
/**
- * This function will split every Recode map in the column using delimiter Lop.DATATYPE_PREFIX,
- * as Recode map generated earlier in the form of Code+Lop.DATATYPE_PREFIX+Token and store it in a map
+ * This function will split every Recode map in the column using delimiter Lop.DATATYPE_PREFIX,
+ * as Recode map generated earlier in the form of Code+Lop.DATATYPE_PREFIX+Token and store it in a map
* which contains token and code for every unique tokens.
*
* @param col is the column # from frame data which contains Recode map generated earlier.
@@ -1221,10 +1221,10 @@
HashMap<String,Long> map = (tmp!=null) ? tmp.get() : null;
if( map != null ) return map;
}
-
+
//construct recode map
HashMap<String,Long> map = new HashMap<>();
- Array ldata = _coldata[col];
+ Array ldata = _coldata[col];
for( int i=0; i<getNumRows(); i++ ) {
Object val = ldata.get(i);
if( val != null ) {
@@ -1232,11 +1232,11 @@
map.put(tmp[0], Long.parseLong(tmp[1]));
}
}
-
+
//put created map into cache
if( REUSE_RECODE_MAPS )
_coldata[col]._rcdMapCache = new SoftReference<>(map);
-
+
return map;
}
@@ -1249,22 +1249,22 @@
//check for empty input source (nothing to merge)
if( that == null || that.getNumRows() == 0 )
return;
-
- //check dimensions (before potentially copy to prevent implicit dimension change)
+
+ //check dimensions (before potentially copy to prevent implicit dimension change)
if ( getNumRows() != that.getNumRows() || getNumColumns() != that.getNumColumns() )
throw new DMLRuntimeException("Dimension mismatch on merge disjoint (target="+getNumRows()+"x"+getNumColumns()+", source="+that.getNumRows()+"x"+that.getNumColumns()+")");
-
+
//meta data copy if necessary
for( int j=0; j<getNumColumns(); j++ )
if( !that.isColumnMetadataDefault(j) ) {
_colmeta[j].setNumDistinct(that._colmeta[j].getNumDistinct());
_colmeta[j].setMvValue(that._colmeta[j].getMvValue());
}
-
+
//core frame block merge through cell copy
//with column-wide access pattern
for( int j=0; j<getNumColumns(); j++ ) {
- //special case: copy non-zeros of column
+ //special case: copy non-zeros of column
if( _schema[j].equals(that._schema[j]) )
_coldata[j].setNz(0, _numRows-1, that._coldata[j]);
//general case w/ schema transformation
@@ -1278,10 +1278,10 @@
}
}
}
-
+
/**
* This function ZERO OUT the data in the slicing window applicable for this block.
- *
+ *
* @param result frame block
* @param range index range
* @param complementary ?
@@ -1293,16 +1293,16 @@
*/
public FrameBlock zeroOutOperations(FrameBlock result, IndexRange range, boolean complementary, int iRowStartSrc, int iRowStartDest, int blen, int iMaxRowsToCopy) {
int clen = getNumColumns();
-
+
if(result==null)
result=new FrameBlock(getSchema());
- else
+ else
{
result.reset(0, true);
result.setSchema(getSchema());
}
result.ensureAllocatedColumns(blen);
-
+
if(complementary)
{
for(int r=(int) range.rowStart; r<=range.rowEnd&&r+iRowStartDest<blen; r++)
@@ -1316,7 +1316,7 @@
for(; r<(int)range.rowStart && r-iRowStartDest<iMaxRowsToCopy ; r++)
for(int c=0; c<clen; c++/*, offset++*/)
result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
-
+
for(; r<=(int)range.rowEnd && r-iRowStartDest<iMaxRowsToCopy ; r++)
{
for(int c=0; c<(int)range.colStart; c++)
@@ -1325,12 +1325,12 @@
for(int c=(int)range.colEnd+1; c<clen; c++)
result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
}
-
+
for(; r-iRowStartDest<iMaxRowsToCopy ; r++)
for(int c=0; c<clen; c++)
result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c));
}
-
+
return result;
}
@@ -1341,7 +1341,7 @@
.map(vt -> vt.toString()).toArray(String[]::new));
return fb;
}
-
+
///////
// row iterators (over strings and boxed objects)
@@ -1350,18 +1350,18 @@
protected final T[] _curRow;
protected final int _maxPos;
protected int _curPos = -1;
-
+
protected RowIterator(int rl, int ru) {
this(rl, ru, UtilFunctions.getSeqArray(1, getNumColumns(), 1));
}
-
+
protected RowIterator(int rl, int ru, int[] cols) {
_curRow = createRow(cols.length);
_cols = cols;
_maxPos = ru;
_curPos = rl;
}
-
+
@Override
public boolean hasNext() {
return (_curPos < _maxPos);
@@ -1369,9 +1369,9 @@
@Override
public void remove() {
- throw new RuntimeException("RowIterator.remove is unsupported!");
+ throw new RuntimeException("RowIterator.remove is unsupported!");
}
-
+
protected abstract T[] createRow(int size);
}
@@ -1379,16 +1379,16 @@
public StringRowIterator(int rl, int ru) {
super(rl, ru);
}
-
+
public StringRowIterator(int rl, int ru, int[] cols) {
super(rl, ru, cols);
}
-
+
@Override
protected String[] createRow(int size) {
return new String[size];
}
-
+
@Override
public String[] next( ) {
for( int j=0; j<_cols.length; j++ ) {
@@ -1402,24 +1402,24 @@
private class ObjectRowIterator extends RowIterator<Object> {
private ValueType[] _tgtSchema = null;
-
+
public ObjectRowIterator(int rl, int ru) {
super(rl, ru);
}
-
+
public ObjectRowIterator(int rl, int ru, int[] cols) {
super(rl, ru, cols);
}
-
+
public void setSchema(ValueType[] schema) {
_tgtSchema = schema;
}
-
+
@Override
protected Object[] createRow(int size) {
return new Object[size];
}
-
+
@Override
public Object[] next( ) {
for( int j=0; j<_cols.length; j++ )
@@ -1427,7 +1427,7 @@
_curPos++;
return _curRow;
}
-
+
private Object getValue(int i, int j) {
Object val = get(i, j);
if( _tgtSchema != null )
@@ -1435,21 +1435,21 @@
return val;
}
}
-
+
///////
- // generic, resizable native arrays
-
+ // generic, resizable native arrays
+
/**
- * Base class for generic, resizable array of various value types. We
- * use this custom class hierarchy instead of Trove or other libraries
+ * Base class for generic, resizable array of various value types. We
+ * use this custom class hierarchy instead of Trove or other libraries
* in order to avoid unnecessary dependencies.
*/
private abstract static class Array<T> implements Writable {
protected SoftReference<HashMap<String,Long>> _rcdMapCache = null;
-
+
protected int _size = 0;
protected int newSize() {
- return Math.max(_size*2, 4);
+ return Math.max(_size*2, 4);
}
public abstract T get(int index);
public abstract void set(int index, T value);
@@ -1461,12 +1461,12 @@
@Override
public abstract Array clone();
public abstract Array slice(int rl, int ru);
- public abstract void reset(int size);
+ public abstract void reset(int size);
}
private static class StringArray extends Array<String> {
private String[] _data = null;
-
+
public StringArray(String[] data) {
_data = data;
_size = _data.length;
@@ -1531,7 +1531,7 @@
private static class BooleanArray extends Array<Boolean> {
private boolean[] _data = null;
-
+
public BooleanArray(boolean[] data) {
_data = data;
_size = _data.length;
@@ -1598,7 +1598,7 @@
private static class LongArray extends Array<Long> {
private long[] _data = null;
-
+
public LongArray(long[] data) {
_data = data;
_size = _data.length;
@@ -1798,7 +1798,7 @@
private static class DoubleArray extends Array<Double> {
private double[] _data = null;
-
+
public DoubleArray(double[] data) {
_data = data;
_size = _data.length;
@@ -1865,10 +1865,10 @@
public static class ColumnMetadata implements Serializable {
private static final long serialVersionUID = -90094082422100311L;
-
+
private long _ndistinct = 0;
private String _mvValue = null;
-
+
public ColumnMetadata(long ndistinct) {
_ndistinct = ndistinct;
}
@@ -1880,10 +1880,10 @@
_ndistinct = that._ndistinct;
_mvValue = that._mvValue;
}
-
+
public long getNumDistinct() {
return _ndistinct;
- }
+ }
public void setNumDistinct(long ndistinct) {
_ndistinct = ndistinct;
}
@@ -2042,7 +2042,7 @@
* This method validates the frame data against an attribute length constrain
* if data value in any cell is greater than the specified threshold of that attribute
* the output frame will store a null on that cell position, thus removing the length-violating values.
- *
+ *
* @param feaLen vector of valid lengths
* @return FrameBlock with invalid values converted into missing values (null)
*/
@@ -2079,7 +2079,7 @@
+ "mismatch: "+rowTemp1.length+" vs "+rowTemp2.length);
for(int i=0; i< rowTemp1.length; i++ ) {
- //modify schema1 if necessary (different schema2)
+ //modify schema1 if necessary (different schema2)
if(!rowTemp1[i].equals(rowTemp2[i])) {
if(rowTemp1[i].equals("STRING") || rowTemp2[i].equals("STRING"))
rowTemp1[i] = "STRING";
@@ -2101,7 +2101,7 @@
return mergedFrame;
}
- public FrameBlock map(String lambdaExpr) {
+ public FrameBlock map (String lambdaExpr){
if(!lambdaExpr.contains("->")) {
String args = lambdaExpr.substring(lambdaExpr.indexOf('(') + 1, lambdaExpr.indexOf(')'));
if(args.contains(",")) {
@@ -2109,86 +2109,73 @@
return DMVUtils.syntacticalPatternDiscovery(this, Double.parseDouble(arguments[0]), arguments[1]);
}
}
+ if(lambdaExpr.contains("jaccardSim"))
+ return mapDist(getCompiledFunction(lambdaExpr));
return map(getCompiledFunction(lambdaExpr));
}
- public FrameBlock map(FrameBlockMapFunction lambdaExpression) {
- return lambdaExpression.apply();
- }
-
- public FrameBlock map(FrameMapFunction lambdaExpr) {
+ public FrameBlock map (FrameMapFunction lambdaExpr) {
// Prepare temporary output array
String[][] output = new String[getNumRows()][getNumColumns()];
-
// Execute map function on all cells
- for(int j=0; j<getNumColumns(); j++) {
+ for(int j = 0; j < getNumColumns(); j++) {
Array input = getColumn(j);
- for (int i = 0; i < input._size; i++)
+ for(int i = 0; i < input._size; i++)
if(input.get(i) != null)
output[i][j] = lambdaExpr.apply(String.valueOf(input.get(i)));
}
- return new FrameBlock(UtilFunctions.nCopies(getNumColumns(), ValueType.STRING), output);
+ return new FrameBlock(UtilFunctions.nCopies(getNumColumns(), ValueType.STRING), output);
}
- public static FrameMapFunction getCompiledFunction(String lambdaExpr) {
- String cname = "StringProcessing"+CLASS_ID.getNextID();
+ public FrameBlock mapDist (FrameMapFunction lambdaExpr) {
+ String[][] output = new String[getNumRows()][getNumRows()];
+ for(String[] row : output)
+ Arrays.fill(row, "0.0");
+ Array input = getColumn(0);
+ for(int j = 0; j < input._size - 1; j++) {
+ for(int i = j + 1; i < input._size; i++)
+ if(input.get(i) != null && input.get(j) != null) {
+ output[j][i] = lambdaExpr.apply(String.valueOf(input.get(j)), String.valueOf(input.get(i)));
+ // output[i][j] = output[j][i];
+ }
+ }
+ return new FrameBlock(UtilFunctions.nCopies(getNumRows(), ValueType.STRING), output);
+ }
+
+ public static FrameMapFunction getCompiledFunction (String lambdaExpr) {
+ String cname = "StringProcessing" + CLASS_ID.getNextID();
StringBuilder sb = new StringBuilder();
String[] parts = lambdaExpr.split("->");
-
- if( parts.length != 2 )
- throw new DMLRuntimeException("Unsupported lambda expression: "+lambdaExpr);
-
- String varname = parts[0].trim();
+ if(parts.length != 2)
+ throw new DMLRuntimeException("Unsupported lambda expression: " + lambdaExpr);
+ String[] varname = parts[0].replaceAll("[()]", "").split(",");
String expr = parts[1].trim();
// construct class code
sb.append("import org.apache.sysds.runtime.util.UtilFunctions;\n");
sb.append("import org.apache.sysds.runtime.matrix.data.FrameBlock.FrameMapFunction;\n");
- sb.append("public class "+cname+" extends FrameMapFunction {\n");
- sb.append("@Override\n");
- sb.append("public String apply(String "+varname+") {\n");
- sb.append(" return String.valueOf("+expr+"); }}\n");
-
+ sb.append("public class " + cname + " extends FrameMapFunction {\n");
+ if(varname.length == 1) {
+ sb.append("public String apply(String " + varname[0].trim() + ") {\n");
+ sb.append(" return String.valueOf(" + expr + "); }}\n");
+ }
+ else if(varname.length == 2) {
+ sb.append("public String apply(String " + varname[0].trim() + ", String " + varname[1].trim() + ") {\n");
+ sb.append(" return String.valueOf(" + expr + "); }}\n");
+ }
// compile class, and create FrameMapFunction object
try {
- return (FrameMapFunction) CodegenUtils
- .compileClass(cname, sb.toString()).newInstance();
+ return (FrameMapFunction) CodegenUtils.compileClass(cname, sb.toString()).newInstance();
}
catch(InstantiationException | IllegalAccessException e) {
throw new DMLRuntimeException("Failed to compile FrameMapFunction.", e);
}
}
-
- public FrameBlockMapFunction getCompiledFunctionBlock(String lambdaExpression) {
- String cname = "StringProcessing"+CLASS_ID.getNextID();
- StringBuilder sb = new StringBuilder();
- String expr = lambdaExpression;
-
- sb.append("import org.apache.sysds.runtime.util.UtilFunctions;\n");
- sb.append("import org.apache.sysds.runtime.matrix.data.FrameBlock.FrameBlockMapFunction;\n");
- sb.append("public class "+cname+" extends FrameBlockMapFunction {\n");
- sb.append("@Override\n");
- sb.append("public FrameBlock apply() {\n");
- sb.append(" return "+expr+"; }}\n");
-
- try {
- return (FrameBlockMapFunction) CodegenUtils
- .compileClass(cname, sb.toString()).newInstance();
- }
- catch(InstantiationException | IllegalAccessException e) {
- throw new DMLRuntimeException("Failed to compile FrameBlockMapFunction.", e);
- }
- }
-
- public static abstract class FrameMapFunction implements Serializable {
+ public static class FrameMapFunction implements Serializable {
private static final long serialVersionUID = -8398572153616520873L;
- public abstract String apply(String input);
- }
-
- public static abstract class FrameBlockMapFunction implements Serializable {
- private static final long serialVersionUID = -8398573333616520876L;
- public abstract FrameBlock apply();
+ public String apply(String input) {return null;}
+ public String apply(String input1, String input2) { return null;}
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index a7fdaf4..5c8ed95 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -19,10 +19,6 @@
package org.apache.sysds.runtime.util;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.sysds.common.Types.ValueType;
@@ -35,6 +31,10 @@
import org.apache.sysds.runtime.matrix.data.Pair;
import org.apache.sysds.runtime.meta.TensorCharacteristics;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
public class UtilFunctions {
// private static final Log LOG = LogFactory.getLog(UtilFunctions.class.getName());
@@ -835,6 +835,17 @@
.map(DATE_FORMATS::get).orElseThrow(() -> new NullPointerException("Unknown date format."));
}
+ public static double jaccardSim(String x, String y) {
+ Set<String> charsX = new LinkedHashSet<>(Arrays.asList(x.split("(?!^)")));
+ Set<String> charsY = new LinkedHashSet<>(Arrays.asList(y.split("(?!^)")));
+
+ final int sa = charsX.size();
+ final int sb = charsY.size();
+ charsX.retainAll(charsY);
+ final int intersection = charsX.size();
+ return 1d / (sa + sb - charsX.size()) * intersection;
+ }
+
/**
* Generates a random FrameBlock with given parameters.
*
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinMDTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinMDTest.java
new file mode 100644
index 0000000..ec3c502
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinMDTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.sysds.test.functions.builtin;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class BuiltinMDTest extends AutomatedTestBase {
+ private final static String TEST_NAME = "matching_dependency";
+ private final static String TEST_DIR = "functions/builtin/";
+ private static final String TEST_CLASS_DIR = TEST_DIR + BuiltinMDTest.class.getSimpleName() + "/";
+
+ @Parameterized.Parameter()
+ public double[][] LHSf;
+
+ @Parameterized.Parameter(1)
+ public double[][] LHSt;
+
+ @Parameterized.Parameter(2)
+ public double[][] RHSf;
+
+ @Parameterized.Parameter(3)
+ public double[][] RHSt;
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {new double[][] {{1}}, new double[][] {{0.95}},
+ new double[][] {{5}}, new double[][] {{0.65}}},
+
+ {new double[][] {{1,3}}, new double[][] {{0.7,0.8}},
+ new double[][] {{5}}, new double[][] {{0.8}}},
+
+ {new double[][] {{1,4,5}}, new double[][] {{0.9,0.9,0.9}},
+ new double[][] {{6}}, new double[][] {{0.9}}},
+
+ {new double[][] {{1,4,5}}, new double[][] {{0.75,0.6,0.9}},
+ new double[][] {{3}}, new double[][] {{0.8}}},
+ });
+ }
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"D"}));
+ if (TEST_CACHE_ENABLED) {
+ setOutAndExpectedDeletionDisabled(true);
+ }
+ }
+
+ @Test
+ public void testMDCP() {
+ double[][] D = {
+ {7567, 231, 1231, 1232, 122, 321},
+ {5321, 23123, 122, 123, 1232, 11},
+ {7267, 3, 223, 432, 1132, 0},
+ {7267, 3, 223, 432, 1132, 500},
+ {7254, 3, 223, 432, 1132, 0},
+ };
+ runMDTests(D, LHSf, LHSt, RHSf, RHSt, LopProperties.ExecType.CP);
+ }
+
+ @Test
+ public void testMDSP() {
+ double[][] D = {
+ {7567, 231, 1231, 1232, 122, 321},
+ {5321, 23123, 122, 123, 1232, 11},
+ {7267, 3, 223, 432, 1132, 0},
+ {7267, 3, 223, 432, 1132, 500},
+ {7254, 3, 223, 432, 1132, 0},
+ };
+ runMDTests(D, LHSf, LHSt, RHSf, RHSt, LopProperties.ExecType.SPARK);
+ }
+
+ private void runMDTests(double [][] X , double[][] LHSf, double[][] LHSt, double[][] RHSf, double[][] RHSt, LopProperties.ExecType instType) {
+ Types.ExecMode platformOld = setExecMode(instType);
+ try
+ {
+ loadTestConfiguration(getTestConfiguration(TEST_NAME));
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+ programArgs = new String[]{"-stats","-args", input("X"),
+ input("LHSf"), input("LHSt"), input("RHSf"), input("RHSt"), output("B")};
+
+ double[][] A = getRandomMatrix(20, 6, 50, 500, 1, 2);
+ System.arraycopy(X, 0, A, 0, X.length);
+
+ writeInputMatrixWithMTD("X", A, false);
+ writeInputMatrixWithMTD("LHSf", LHSf, true);
+ writeInputMatrixWithMTD("LHSt", LHSt, true);
+ writeInputMatrixWithMTD("RHSf", RHSf, true);
+ writeInputMatrixWithMTD("RHSt", RHSt, true);
+
+ runTest(true, false, null, -1);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ }
+ finally {
+ rtplatform = platformOld;
+ }
+ }
+}
diff --git a/src/test/scripts/functions/builtin/matching_dependency.dml b/src/test/scripts/functions/builtin/matching_dependency.dml
new file mode 100644
index 0000000..0256cc8
--- /dev/null
+++ b/src/test/scripts/functions/builtin/matching_dependency.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# X = read($1, data_type = "frame", format = "csv", header = FALSE);
+X = as.frame(read($1))
+LHSf = read($2);
+LHSt = read($3);
+RHSf = read($4);
+RHSt = read($5);
+B = mdedup(X, LHSf, LHSt, RHSf, RHSt, TRUE);
+write(B, $6);