[SYSTEMDS-2788] Fix list-append validation, write error handling

This patch fixes an issue of invalid list-append validation. For list
appends, we can append objects of any size and thus ignore inferred
dimensions (from cbind/rbind). However, if the dimensions of the
appended matrix were known, the existing validation code incorrectly
raised errors (for unknown dimensions it raised only warnings, which is
why the issue did not show up in other scripts).

Furthermore, this also improves the error handling for the case where,
due to script-level mistakes, we compile a spark write instruction for
lists.
diff --git a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
index 6b4c472..d4d8296 100644
--- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -800,6 +800,8 @@
 			
 			output.setDataType(id.getDataType());
 			output.setValueType(id.getValueType());
+			
+			//special handling of concatenating all list elements
 			if( id.getDataType() == DataType.LIST && getAllExpr().length == 1) {
 				output.setDataType(DataType.MATRIX);
 				output.setValueType(ValueType.FP64);
@@ -810,33 +812,35 @@
 			long m1clen = getFirstExpr().getOutput().getDim2();
 			long appendDim1 = m1rlen, appendDim2 = m1clen;
 			
-			for(int i=1; i<getAllExpr().length; i++) {
-				long m2rlen = getExpr(i).getOutput().getDim1();
-				long m2clen = getExpr(i).getOutput().getDim2();
-				
-				if( getOpCode() == Builtins.CBIND ) {
-					if (m1rlen >= 0 && m2rlen >= 0 && m1rlen!=m2rlen) {
-						raiseValidateError("inputs to cbind must have same number of rows: input 1 rows: " + 
-							m1rlen+", input 2 rows: "+m2rlen, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
-					}
-					appendDim1 = (m2rlen>=0) ? m2rlen : appendDim1;
-					appendDim2 = (appendDim2>=0 && m2clen>=0) ? appendDim2 + m2clen : -1;
-				}
-				else if( getOpCode() == Builtins.RBIND ) {
-					if (m1clen >= 0 && m2clen >= 0 && m1clen!=m2clen) {
-						raiseValidateError("inputs to rbind must have same number of columns: input 1 columns: " + 
-							m1clen+", input 2 columns: "+m2clen, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
-					}
-					appendDim1 = (appendDim1>=0 && m2rlen>=0)? appendDim1 + m2rlen : -1;
-					appendDim2 = (m2clen>=0) ? m2clen : appendDim2;
-				}
-			}
-			
+			// best-effort dimension propagation and validation
 			if( id.getDataType() == DataType.LIST ) {
 				appendDim1 = -1;
 				appendDim2 = -1;
 			}
-
+			else {
+				for(int i=1; i<getAllExpr().length; i++) {
+					long m2rlen = getExpr(i).getOutput().getDim1();
+					long m2clen = getExpr(i).getOutput().getDim2();
+					
+					if( getOpCode() == Builtins.CBIND ) {
+						if (m1rlen >= 0 && m2rlen >= 0 && m1rlen!=m2rlen) {
+							raiseValidateError("inputs to cbind must have same number of rows: input 1 rows: " + 
+								m1rlen+", input 2 rows: "+m2rlen, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+						}
+						appendDim1 = (m2rlen>=0) ? m2rlen : appendDim1;
+						appendDim2 = (appendDim2>=0 && m2clen>=0) ? appendDim2 + m2clen : -1;
+					}
+					else if( getOpCode() == Builtins.RBIND ) {
+						if (m1clen >= 0 && m2clen >= 0 && m1clen!=m2clen) {
+							raiseValidateError("inputs to rbind must have same number of columns: input 1 columns: " + 
+								m1clen+", input 2 columns: "+m2clen, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+						}
+						appendDim1 = (appendDim1>=0 && m2rlen>=0)? appendDim1 + m2rlen : -1;
+						appendDim2 = (m2clen>=0) ? m2clen : appendDim2;
+					}
+				}
+			}
+			
 			output.setDimensions(appendDim1, appendDim2);
 			output.setBlocksize (id.getBlocksize());
 			
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
index 76cd245..3f71d5c 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
@@ -145,10 +145,12 @@
 			FileFormat fmt = FileFormat.safeValueOf(input3.getName());
 			
 			//core matrix/frame write
-			if( input1.getDataType()==DataType.MATRIX )
-				processMatrixWriteInstruction(sec, fname, fmt);
-			else
-				processFrameWriteInstruction(sec, fname, fmt, schema);
+			switch( input1.getDataType() ) {
+				case MATRIX: processMatrixWriteInstruction(sec, fname, fmt); break;
+				case FRAME:  processFrameWriteInstruction(sec, fname, fmt, schema); break;
+				default: throw new DMLRuntimeException(
+					"Unsupported data type "+input1.getDataType()+" in WriteSPInstruction.");
+			}
 		}
 		catch(IOException ex)
 		{
diff --git a/src/test/java/org/apache/sysds/test/functions/misc/ListAppendRemove.java b/src/test/java/org/apache/sysds/test/functions/misc/ListAppendRemove.java
index 64daa4b..946ef67 100644
--- a/src/test/java/org/apache/sysds/test/functions/misc/ListAppendRemove.java
+++ b/src/test/java/org/apache/sysds/test/functions/misc/ListAppendRemove.java
@@ -29,9 +29,10 @@
 import org.apache.sysds.test.TestUtils;
 import org.apache.sysds.utils.Statistics;
 
-public class ListAppendRemove extends AutomatedTestBase 
+public class ListAppendRemove extends AutomatedTestBase
 {
 	private static final String TEST_NAME1 = "ListAppendRemove";
+	private static final String TEST_NAME2 = "ListAppend2788";
 	
 	private static final String TEST_DIR = "functions/misc/";
 	private static final String TEST_CLASS_DIR = TEST_DIR + ListAppendRemove.class.getSimpleName() + "/";
@@ -40,6 +41,7 @@
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
 		addTestConfiguration( TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) );
+		addTestConfiguration( TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R" }) );
 	}
 	
 	@Test
@@ -82,6 +84,15 @@
 		runListAppendRemove(TEST_NAME1, ExecType.SPARK, false, true);
 	}
 	
+	@Test
+	public void testStaticListAppendCP() {
+		runListAppendRemove(TEST_NAME2, ExecType.CP, true, false);
+	}
+	
+	@Test
+	public void testStaticListAppendSpark() {
+		runListAppendRemove(TEST_NAME2, ExecType.SPARK, true, false);
+	}
 	
 	private void runListAppendRemove(String testname, ExecType type, boolean rewrites, boolean conditional)
 	{
@@ -114,8 +125,7 @@
 			//check for properly compiled CP operations for list 
 			//(but spark instructions for sum, indexing, write)
 			int numExpected = (type == ExecType.CP) ? 0 :
-				conditional ? 5 : 4;
-			Assert.assertTrue(Statistics.getNoOfExecutedSPInst()==numExpected);
+				testname.equals(TEST_NAME1) ? (conditional ? 5 : 4) : 1;
 			Assert.assertTrue(Statistics.getNoOfExecutedSPInst()==numExpected);
 		}
 		finally {
diff --git a/src/test/scripts/functions/misc/ListAppend2788.dml b/src/test/scripts/functions/misc/ListAppend2788.dml
new file mode 100644
index 0000000..e6e0d8e
--- /dev/null
+++ b/src/test/scripts/functions/misc/ListAppend2788.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+mat = matrix("0 7 3 1", rows=4, cols=1)
+
+my_list = list()
+my_list = append(my_list, mat)
+
+while(FALSE){} #cut DAGs
+
+R = as.matrix(my_list[1])
+
+write(R, $2);