[SYSTEMDS-352] Builtin error correction by length

- New builtin for identifying cells which violate a length constraint.
- Replacing OutputInfo.CSVOutputInfo with Types.FileFormat.CSV

1. Operations are now consistent with their semantics i.e.,
   dropInvalidLength and dropInvalidType
2. Instead of identifying the invalid cells, "dropInvalidLength"
   now replaces the invalid values with null and returns a frame
3. Binary method changed from MMBinaryMethod.MR_BINARY_R to
   MMBinaryMethod.MR_BINARY_M
4. Spark broadcast replaced with PartitionedBroadcast
diff --git a/dev/Tasks.txt b/dev/Tasks.txt
index 3755b5d..c06bd46 100644
--- a/dev/Tasks.txt
+++ b/dev/Tasks.txt
@@ -308,6 +308,7 @@
 
 SYSTEMDS-350 Data Cleaning Framework
  * 351 New builtin function for error correction by schema            OK
+ * 352 New builtin function for error correction by length            OK
 
 SYSTEMDS-360 Privacy/Data Exchange Constraints
  * 361 Initial privacy meta data (compiler/runtime)                   OK
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java
index 5ee7a79..602ffd6 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -85,7 +85,8 @@
 	DETECTSCHEMA("detectSchema", false),
 	DIAG("diag", false),
 	DISCOVER_FD("discoverFD", true),
-	DROP_INVALID("dropInvalid", false),
+	DROP_INVALID_TYPE("dropInvalidType", false),
+	DROP_INVALID_LENGTH("dropInvalidLength", false),
 	EIGEN("eigen", false, ReturnType.MULTI_RETURN),
 	EXISTS("exists", false),
 	EXP("exp", false),
diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java
index 02b1519..1ec0b13 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -266,7 +266,8 @@
 	public enum OpOp2 {
 		AND(true), BITWAND(true), BITWOR(true), BITWSHIFTL(true), BITWSHIFTR(true),
 		BITWXOR(true), CBIND(false), CONCAT(false), COV(false), DIV(true),
-		DROP_INVALID(false), EQUAL(true), GREATER(true), GREATEREQUAL(true),
+		DROP_INVALID_TYPE(false), DROP_INVALID_LENGTH(false),
+		EQUAL(true), GREATER(true), GREATEREQUAL(true),
 		INTDIV(true), INTERQUANTILE(false), IQM(false), LESS(true), LESSEQUAL(true),
 		LOG(true), MAX(true), MEDIAN(false), MIN(true), MINUS(true), MODULUS(true),
 		MOMENT(false), MULT(true), NOTEQUAL(true), OR(true), PLUS(true), POW(true),
@@ -313,7 +314,8 @@
 				case BITWXOR:      return "bitwXor";
 				case BITWSHIFTL:   return "bitwShiftL";
 				case BITWSHIFTR:   return "bitwShiftR";
-				case DROP_INVALID: return "dropInvalid";
+				case DROP_INVALID_TYPE: return "dropInvalidType";
+				case DROP_INVALID_LENGTH: return "dropInvalidLength";
 				default:           return name().toLowerCase();
 			}
 		}
@@ -345,7 +347,8 @@
 				case "bitwXor":     return BITWXOR;
 				case "bitwShiftL":  return BITWSHIFTL;
 				case "bitwShiftR":  return BITWSHIFTR;
-				case "dropInvalid": return DROP_INVALID;
+				case "dropInvalidType": return DROP_INVALID_TYPE;
+				case "dropInvalidLength": return DROP_INVALID_LENGTH;
 				default:            return valueOf(opcode.toUpperCase());
 			}
 		}
diff --git a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
index eeb44f9..d3966e2 100644
--- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -1528,7 +1528,7 @@
 			output.setBlocksize(0);
 			break;
 
-		case DROP_INVALID:
+		case DROP_INVALID_TYPE:
 			checkNumParameters(2);
 			checkMatrixFrameParam(getFirstExpr());
 			checkMatrixFrameParam(getSecondExpr());
@@ -1538,6 +1538,16 @@
 			output.setValueType(ValueType.STRING);
 			break;
 
+		case DROP_INVALID_LENGTH:
+			checkNumParameters(2);
+			checkMatrixFrameParam(getFirstExpr());
+			checkMatrixFrameParam(getSecondExpr());
+			output.setDataType(DataType.FRAME);
+			output.setDimensions(id.getDim1(), id.getDim2());
+			output.setBlocksize (id.getBlocksize());
+			output.setValueType(id.getValueType());
+			break;
+
 		default:
 			if( isMathFunction() ) {
 				checkMathFunctionParam();
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index b8c7bcf..87fd18a 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -2503,7 +2503,8 @@
 			currBuiltinOp = new UnaryOp(target.getName(), target.getDataType(), target.getValueType(),
 				OpOp1.valueOf(source.getOpCode().name()), expr);
 			break;
-		case DROP_INVALID:
+		case DROP_INVALID_TYPE:
+		case DROP_INVALID_LENGTH:
 			currBuiltinOp = new BinaryOp(target.getName(), target.getDataType(),
 				target.getValueType(), OpOp2.valueOf(source.getOpCode().name()), expr, expr2);
 			break;
diff --git a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
index 7e5b73a..5d66c15 100644
--- a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
+++ b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
@@ -50,8 +50,10 @@
 	public enum BuiltinCode { SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS, ATAN, LOG, LOG_NZ, MIN,
 		MAX, ABS, SIGN, SQRT, EXP, PLOGP, PRINT, PRINTF, NROW, NCOL, LENGTH, LINEAGE, ROUND, MAXINDEX, MININDEX,
 		STOP, CEIL, FLOOR, CUMSUM, CUMPROD, CUMMIN, CUMMAX, CUMSUMPROD, INVERSE, SPROP, SIGMOID, EVAL, LIST,
-		TYPEOF, DETECTSCHEMA, ISNA, ISNAN, ISINF, DROP_INVALID, COUNT_DISTINCT, COUNT_DISTINCT_APPROX}
-		
+		TYPEOF, DETECTSCHEMA, ISNA, ISNAN, ISINF, DROP_INVALID_TYPE, DROP_INVALID_LENGTH,
+		COUNT_DISTINCT, COUNT_DISTINCT_APPROX}
+
+
 	public BuiltinCode bFunc;
 	
 	private static final boolean FASTMATH = true;
@@ -103,7 +105,8 @@
 		String2BuiltinCode.put( "isna", BuiltinCode.ISNA);
 		String2BuiltinCode.put( "isnan", BuiltinCode.ISNAN);
 		String2BuiltinCode.put( "isinf", BuiltinCode.ISINF);
-		String2BuiltinCode.put( "dropInvalid", BuiltinCode.DROP_INVALID);
+		String2BuiltinCode.put( "dropInvalidType", BuiltinCode.DROP_INVALID_TYPE);
+		String2BuiltinCode.put( "dropInvalidLength", BuiltinCode.DROP_INVALID_LENGTH);
 	}
 	
 	private Builtin(BuiltinCode bf) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
index d4a78ef..669ea79 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
@@ -151,7 +151,8 @@
 		String2CPInstructionType.put( "solve"  , CPType.Binary);
 		String2CPInstructionType.put( "max"  , CPType.Binary);
 		String2CPInstructionType.put( "min"  , CPType.Binary);
-		String2CPInstructionType.put( "dropInvalid"  , CPType.Binary);
+		String2CPInstructionType.put( "dropInvalidType"  , CPType.Binary);
+		String2CPInstructionType.put( "dropInvalidLength"  , CPType.Binary);
 
 		String2CPInstructionType.put( "nmax", CPType.BuiltinNary);
 		String2CPInstructionType.put( "nmin", CPType.BuiltinNary);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
index 4377918..904f46d 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
@@ -21,6 +21,7 @@
 
 import java.util.StringTokenizer;
 
+import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.AggOp;
 import org.apache.sysds.common.Types.CorrectionLocationType;
 import org.apache.sysds.common.Types.Direction;
@@ -485,7 +486,7 @@
 	}
 	
 	public static Operator parseExtendedBinaryOrBuiltinOperator(String opcode, CPOperand in1, CPOperand in2) {
-		boolean matrixScalar = (in1.getDataType() != in2.getDataType());
+		boolean matrixScalar = (in1.getDataType() != in2.getDataType() && (in1.getDataType() != Types.DataType.FRAME && in2.getDataType() != Types.DataType.FRAME));
 		return Builtin.isBuiltinFnObject(opcode) ?
 			(matrixScalar ? new RightScalarOperator( Builtin.getBuiltinFnObject(opcode), 0) :
 				new BinaryOperator( Builtin.getBuiltinFnObject(opcode))) :
@@ -548,9 +549,11 @@
 			return new BinaryOperator(Builtin.getBuiltinFnObject("max"));
 		else if ( opcode.equalsIgnoreCase("min") ) 
 			return new BinaryOperator(Builtin.getBuiltinFnObject("min"));
-		else if( opcode.equalsIgnoreCase("dropInvalid"))
-			return new BinaryOperator(Builtin.getBuiltinFnObject("dropInvalid"));
-		
+		else if( opcode.equalsIgnoreCase("dropInvalidType"))
+			return new BinaryOperator(Builtin.getBuiltinFnObject("dropInvalidType"));
+		else if( opcode.equalsIgnoreCase("dropInvalidLength"))
+			return new BinaryOperator(Builtin.getBuiltinFnObject("dropInvalidLength"));
+
 		throw new RuntimeException("Unknown binary opcode " + opcode);
 	}
 	
@@ -777,6 +780,8 @@
 			return new BinaryOperator(Builtin.getBuiltinFnObject("max"));
 		else if ( opcode.equalsIgnoreCase("min") || opcode.equalsIgnoreCase("mapmin") ) 
 			return new BinaryOperator(Builtin.getBuiltinFnObject("min"));
+		else if ( opcode.equalsIgnoreCase("dropInvalidLength") || opcode.equalsIgnoreCase("mapdropInvalidLength") )
+			return new BinaryOperator(Builtin.getBuiltinFnObject("dropInvalidLength"));
 		
 		throw new DMLRuntimeException("Unknown binary opcode " + opcode);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
index 5891d30..a53acb9 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
@@ -177,8 +177,9 @@
 		String2SPInstructionType.put( "map^"    , SPType.Binary);
 		String2SPInstructionType.put( "map+*"   , SPType.Binary);
 		String2SPInstructionType.put( "map-*"   , SPType.Binary);
-		String2SPInstructionType.put( "dropInvalid", SPType.Binary);
-		// Relational Instruction Opcodes 
+		String2SPInstructionType.put( "dropInvalidType", SPType.Binary);
+		String2SPInstructionType.put( "mapdropInvalidLength", SPType.Binary);
+		// Relational Instruction Opcodes
 		String2SPInstructionType.put( "=="   , SPType.Binary);
 		String2SPInstructionType.put( "!="   , SPType.Binary);
 		String2SPInstructionType.put( "<"    , SPType.Binary);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
index 1e4e7f6..bd87a15 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
@@ -41,8 +41,9 @@
 		CPOperand in2 = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN);
 		CPOperand out = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN);
 		String opcode = parseBinaryInstruction(str, in1, in2, out);
-		
-		checkOutputDataType(in1, in2, out);
+
+		if(!(in1.getDataType() == DataType.FRAME || in2.getDataType() == DataType.FRAME))
+			checkOutputDataType(in1, in2, out);
 		
 		Operator operator = InstructionUtils.parseBinaryOrBuiltinOperator(opcode, in1, in2);
 
@@ -54,6 +55,8 @@
 			return new BinaryTensorTensorCPInstruction(operator, in1, in2, out, opcode, str);
 		else if (in1.getDataType() == DataType.FRAME && in2.getDataType() == DataType.FRAME)
 			return new BinaryFrameFrameCPInstruction(operator, in1, in2, out, opcode, str);
+		else if (in1.getDataType() == DataType.FRAME && in2.getDataType() == DataType.MATRIX)
+			return new BinaryFrameMatrixCPInstruction(operator, in1, in2, out, opcode, str);
 		else
 			return new BinaryMatrixScalarCPInstruction(operator, in1, in2, out, opcode, str);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
index ba0bfe1..1116675 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
@@ -26,7 +26,7 @@
 public class BinaryFrameFrameCPInstruction extends BinaryCPInstruction
 {
 	protected BinaryFrameFrameCPInstruction(Operator op, CPOperand in1,
-		CPOperand in2, CPOperand out, String opcode, String istr) {
+			CPOperand in2, CPOperand out, String opcode, String istr) {
 		super(CPType.Binary, op, in1, in2, out, opcode, istr);
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameMatrixCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameMatrixCPInstruction.java
new file mode 100644
index 0000000..25b0988
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameMatrixCPInstruction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.sysds.runtime.instructions.cp;
+
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class BinaryFrameMatrixCPInstruction extends BinaryCPInstruction {
+	protected BinaryFrameMatrixCPInstruction(Operator op, CPOperand in1,
+			CPOperand in2, CPOperand out, String opcode, String istr) {
+		super(CPInstruction.CPType.Binary, op, in1, in2, out, opcode, istr);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		// Read input frame
+		FrameBlock inBlock1 = ec.getFrameInput(input1.getName());
+		// the vector with valid column lengths
+		MatrixBlock featurelength = ec.getMatrixInput(input2.getName());
+		// replace values exceeding the valid column length with null
+		FrameBlock out = inBlock1.invalidByLength(featurelength);
+		// Release the memory occupied by inputs
+		ec.releaseFrameInput(input1.getName());
+		ec.releaseMatrixInput(input2.getName());
+		// Attach result frame with output
+		ec.setFrameOutput(output.getName(),out);
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameMatrixSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameMatrixSPInstruction.java
new file mode 100644
index 0000000..6135d45
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameMatrixSPInstruction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class BinaryFrameMatrixSPInstruction extends BinarySPInstruction {
+	protected BinaryFrameMatrixSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) {
+		super(SPType.Binary, op, in1, in2, out, opcode, istr);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		SparkExecutionContext sec = (SparkExecutionContext) ec;
+		// Get input RDDs
+		JavaPairRDD<Long, FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable(input1.getName());
+		// get feature length matrix
+		PartitionedBroadcast<MatrixBlock> feaLen =  sec.getBroadcastForVariable(input2.getName());
+		JavaPairRDD<Long, FrameBlock> out = in1.mapValues(new DropInvalidLengths(feaLen));
+
+		//set output RDD
+		sec.setRDDHandleForVariable(output.getName(), out);
+		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.addLineageBroadcast(output.getName(), input2.getName());
+	}
+
+	private static class DropInvalidLengths implements  Function<FrameBlock,FrameBlock> {
+		private static final long serialVersionUID = 5850400295183766400L;
+
+		private PartitionedBroadcast<MatrixBlock> featureLength = null;
+
+		public DropInvalidLengths(PartitionedBroadcast<MatrixBlock> fl) {
+			featureLength = fl;
+		}
+
+		@Override public FrameBlock call(FrameBlock frameBlock) throws Exception {
+			int idx = (int)featureLength.getNumRows();
+			FrameBlock fb = frameBlock.invalidByLength(featureLength.getBlock(1, idx));
+			return fb;
+		}
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
index 3c3e5b6..ee96dc9 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
@@ -92,6 +92,8 @@
 				else
 					return new BinaryMatrixMatrixSPInstruction(operator, in1, in2, out, opcode, str);
 			}
+			else if(dt1 == DataType.FRAME && dt2 == DataType.MATRIX)
+				return  new BinaryFrameMatrixSPInstruction(operator, in1, in2, out, opcode, str);
 			else
 				return new BinaryMatrixScalarSPInstruction(operator, in1, in2, out, opcode, str);
 		}
@@ -106,7 +108,9 @@
 				throw new DMLRuntimeException("Tensor binary operation not yet implemented for tensor-scalar, or tensor-matrix");
 		}
 		else if( dt1 == DataType.FRAME || dt2 == DataType.FRAME ) {
-			return BinaryFrameFrameSPInstruction.parseInstruction(str);
+			if(dt1 == DataType.FRAME && dt2 == DataType.FRAME)
+				return BinaryFrameFrameSPInstruction.parseInstruction(str);
+
 		}
 
 		return null;
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 7456cc4..a27511a 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -1830,6 +1830,11 @@
 		return fb;
 	}
 
+	/**
+	 * Drop the cell value which does not conform to the data type of its column
+	 * @param schema of the frame
+	 * @return original frame where invalid values are replaced with null
+	 */
 	public FrameBlock dropInvalid(FrameBlock schema) {
 		//sanity checks
 		if(this.getNumColumns() != schema.getNumColumns())
@@ -1865,6 +1870,38 @@
 		return this;
 	}
 
+	/**
+	 *  This method validates the frame data against a per-attribute length constraint:
+	 *  if the string length of a cell value is greater than the specified threshold of that attribute,
+	 *  the output frame will store a null on that cell position, thus removing the length-violating values.
+	 * 
+	 *  @param feaLen row vector of valid lengths (-1 exempts the corresponding column)
+	 *  @return FrameBlock with invalid values converted into missing values (null)
+	 */
+	public FrameBlock invalidByLength(MatrixBlock feaLen) {
+		//sanity checks
+		if(this.getNumColumns() != feaLen.getNumColumns())
+			throw new DMLException("mismatch in number of columns in frame and corresponding feature-length vector");
+
+		FrameBlock outBlock = new FrameBlock(this);
+		for (int i = 0; i < this.getNumColumns(); i++) {
+			if(feaLen.quickGetValue(0, i) == -1)
+				continue;
+			int validLength = (int)feaLen.quickGetValue(0, i);
+			Array obj = this.getColumn(i);
+			for (int j = 0; j < obj._size; j++)
+			{
+				if(obj.get(j) == null)
+					continue;
+				String dataValue = obj.get(j).toString();
+				if(dataValue.length() > validLength)
+					outBlock.set(j, i, null);
+			}
+		}
+
+		return outBlock;
+	}
+
 	public static FrameBlock mergeSchema(FrameBlock temp1, FrameBlock temp2) {
 		String[] rowTemp1 = temp1.getStringRowIterator().next();
 		String[] rowTemp2 = temp2.getStringRowIterator().next();
diff --git a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidLengthTest.java b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidLengthTest.java
new file mode 100644
index 0000000..c938b28
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidLengthTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.frame;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.runtime.io.FrameWriter;
+import org.apache.sysds.runtime.io.FrameWriterFactory;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+public class FrameDropInvalidLengthTest extends AutomatedTestBase {
+	private final static String TEST_NAME = "DropInvalidLength";
+	private final static String TEST_DIR = "functions/frame/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FrameDropInvalidLengthTest.class.getSimpleName() + "/";
+
+	private final static int rows = 800;
+	private final static int cols = 4;
+	private final static Types.ValueType[] schemaStrings = {Types.ValueType.FP64, Types.ValueType.STRING, Types.ValueType.STRING, Types.ValueType.INT64};
+
+	public static void init() {
+		TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+	}
+
+	public static void cleanUp() {
+		if (TEST_CACHE_ENABLED) {
+			TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+		}
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+		if (TEST_CACHE_ENABLED) {
+			setOutAndExpectedDeletionDisabled(true);
+		}
+	}
+
+	@Test
+	public void testTwoBadColCP() {
+		double[][] invalidLength =  {{-1,30,20,-1}};
+		runDropInvalidLenTest( invalidLength,1, LopProperties.ExecType.CP);
+	}
+
+	@Test
+	public void testTwoBadColSP() {
+		double[][] invalidLength =  {{-1,30,20,-1}};
+		runDropInvalidLenTest( invalidLength,1, LopProperties.ExecType.SPARK);
+	}
+
+	@Test
+	public void testOneBadColCP() {
+		double[][] invalidLength =  {{-1,-1,20,-1}};
+		runDropInvalidLenTest( invalidLength,2, LopProperties.ExecType.CP);
+	}
+
+	@Test
+	public void testOneBadColSP() {
+		double[][] invalidLength =  {{-1,-1,20,-1}};
+		runDropInvalidLenTest( invalidLength,2, LopProperties.ExecType.SPARK);
+	}
+
+	@Test
+	public void testAllBadColCP() {
+		double[][] invalidLength =  {{2,2,2,1}};
+		runDropInvalidLenTest( invalidLength,3, LopProperties.ExecType.CP);
+	}
+
+	@Test
+	public void testAllBadColSP() {
+		double[][] invalidLength =  {{2,2,2,1}};
+		runDropInvalidLenTest( invalidLength,3, LopProperties.ExecType.SPARK);
+	}
+
+	@Test
+	public void testNoneBadColCP() {
+		double[][] invalidLength =  {{-1,20,20,-1}};
+		runDropInvalidLenTest( invalidLength,4, LopProperties.ExecType.CP);
+	}
+
+	@Test
+	public void testNoneBadColSP() {
+		double[][] invalidLength =  {{-1,20,20,-1}};
+		runDropInvalidLenTest( invalidLength,4, LopProperties.ExecType.SPARK);
+	}
+
+	private void runDropInvalidLenTest(double[][] colInvalidLength, int test, LopProperties.ExecType et)
+	{
+		Types.ExecMode platformOld = setExecMode(et);
+		boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		try {
+			getAndLoadTestConfiguration(TEST_NAME);
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[] {"-args", input("A"), input("M"),
+					String.valueOf(rows), Integer.toString(cols), output("B")};
+			FrameBlock frame1 = new FrameBlock(schemaStrings);
+			double[][] A = getRandomMatrix(rows, cols, 10, 100, 1, 2373);
+			initFrameDataString(frame1,A, schemaStrings); // initialize a frame with one column
+			FrameWriter writer = FrameWriterFactory.createFrameWriter(Types.FileFormat.CSV);
+
+			ArrayList<Integer> badIndex = getBadIndexes(rows/4);
+			int expected = 0;
+
+			switch (test) { //Double in String
+				case 1:
+					for (int i = 0; i < badIndex.size(); i++) {
+						frame1.set(badIndex.get(i),1,"This is a very long sentence that could" +
+								" count up to multiple characters");
+					}
+					expected += badIndex.size();
+				case 2:
+					for (int i = 0; i < badIndex.size(); i++) {
+						frame1.set(badIndex.get(i), 2, "This is out of length");
+					}
+					expected += badIndex.size();
+					break;
+				case 3:
+					expected += rows*cols;
+					break;
+				case 4:
+					expected += 0.0;
+					break;
+			}
+			// write data frame
+			writer.writeFrameToHDFS(
+					frame1.slice(0, rows - 1, 0, cols-1, new FrameBlock()),
+					input("A"), rows, schemaStrings.length);
+
+			// write expected feature length vector
+			writeInputMatrixWithMTD("M", colInvalidLength, true);
+
+			runTest(true, false, null, -1);
+
+			// compare output
+			FrameBlock frameOut = readDMLFrameFromHDFS("B", Types.FileFormat.BINARY);
+
+			//read output data and compare results
+			ArrayList<Object> data = new ArrayList<>();
+			for (int i = 0; i < frameOut.getNumRows(); i++)
+				for(int j=0; j < frameOut.getNumColumns(); j++ )
+					data.add(frameOut.get(i, j));
+
+			int nullNum = Math.toIntExact(data.stream().filter(s -> s == null).count());
+			Assert.assertEquals(expected, nullNum, 1e-5);
+		}
+		catch (Exception ex) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlag;
+			OptimizerUtils.ALLOW_AUTO_VECTORIZATION = true;
+			OptimizerUtils.ALLOW_OPERATOR_FUSION = true;
+		}
+	}
+
+	private ArrayList<Integer> getBadIndexes(int length) {
+		ArrayList<Integer> list = new ArrayList();
+		for(int i =0; i<length; i++)
+		{
+			int r = ThreadLocalRandom.current().nextInt(0, rows);
+			list.add(r);
+		}
+		return (ArrayList) list.stream().distinct().collect(Collectors.toList());
+	}
+
+	public static void initFrameDataString(FrameBlock frame1, double[][] data, Types.ValueType[] lschema) {
+		for (int j = 0; j < lschema.length; j++) {
+			Types.ValueType vt = lschema[j];
+			switch (vt) {
+				case STRING:
+					String[] tmp1 = new String[rows];
+					for (int i = 0; i < rows; i++)
+						tmp1[i] = (String) UtilFunctions.doubleToObject(vt, data[i][j]);
+					frame1.appendColumn(tmp1);
+					break;
+				case INT64:
+					long[] tmp4 = new long[rows];
+					for (int i = 0; i < rows; i++)
+						data[i][j] = tmp4[i] = (Long) UtilFunctions.doubleToObject(Types.ValueType.INT64,
+								data[i][j], false);
+					frame1.appendColumn(tmp4);
+					break;
+				case FP64:
+					double[] tmp6 = new double[rows];
+					for (int i = 0; i < rows; i++)
+						tmp6[i] = (Double) UtilFunctions.doubleToObject(vt, data[i][j], false);
+					frame1.appendColumn(tmp6);
+					break;
+				default:
+					throw new RuntimeException("Unsupported value type: " + vt);
+			}
+		}
+	}
+}
+
diff --git a/src/test/java/org/apache/sysds/test/functions/frame/FrameIsCorrectTypeTest.java b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
similarity index 96%
rename from src/test/java/org/apache/sysds/test/functions/frame/FrameIsCorrectTypeTest.java
rename to src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
index 433f23f..d70b255 100644
--- a/src/test/java/org/apache/sysds/test/functions/frame/FrameIsCorrectTypeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
@@ -38,11 +38,11 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 
-public class FrameIsCorrectTypeTest extends AutomatedTestBase
+public class FrameDropInvalidTypeTest extends AutomatedTestBase
 {
-	private final static String TEST_NAME = "DropInvalid";
+	private final static String TEST_NAME = "DropInvalidType";
 	private final static String TEST_DIR = "functions/frame/";
-	private static final String TEST_CLASS_DIR = TEST_DIR + FrameIsCorrectTypeTest.class.getSimpleName() + "/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FrameDropInvalidTypeTest.class.getSimpleName() + "/";
 
 	private final static int rows = 20;
 	private final static ValueType[] schemaStrings = {ValueType.FP64, ValueType.STRING};
diff --git a/src/test/scripts/functions/frame/DropInvalid.dml b/src/test/scripts/functions/frame/DropInvalidLength.dml
similarity index 82%
copy from src/test/scripts/functions/frame/DropInvalid.dml
copy to src/test/scripts/functions/frame/DropInvalidLength.dml
index 38d2436..c711ff4 100644
--- a/src/test/scripts/functions/frame/DropInvalid.dml
+++ b/src/test/scripts/functions/frame/DropInvalidLength.dml
@@ -20,6 +20,9 @@
 #-------------------------------------------------------------
 
 X = read($1, rows=$3, cols=$4, data_type="frame", format="csv");
-M = read($2, rows=1, cols=$4, data_type="frame", format="csv");
-R = dropInvalid(X,M);
+colLength = read($2); # row vector with -1 for exempted features and 
+                      # a valid character length for features to be processed 
+R = dropInvalidLength(X, colLength);
+# print(toString(R))
 write(R, $5, format="binary");
+
diff --git a/src/test/scripts/functions/frame/DropInvalid.dml b/src/test/scripts/functions/frame/DropInvalidType.dml
similarity index 97%
rename from src/test/scripts/functions/frame/DropInvalid.dml
rename to src/test/scripts/functions/frame/DropInvalidType.dml
index 38d2436..89d3954 100644
--- a/src/test/scripts/functions/frame/DropInvalid.dml
+++ b/src/test/scripts/functions/frame/DropInvalidType.dml
@@ -21,5 +21,5 @@
 
 X = read($1, rows=$3, cols=$4, data_type="frame", format="csv");
 M = read($2, rows=1, cols=$4, data_type="frame", format="csv");
-R = dropInvalid(X,M);
+R = dropInvalidType(X,M);
 write(R, $5, format="binary");