[MINOR] Multi-threaded parfor result file deletion (latency mitigation)

After parfor operations a result-merge implementation merges the partial
results from parfor workers into the final result variables. In case of
remote parfor, we have #result variables x #parfor tasks files, which -
in case of in-memory result merge - are into the driver, aggregated, and
finally deleted. In sub-optimal cluster configurations, the delete can
have substantial latency (independent of file size). To mitigate this
latency we now delete these files in a multi-threaded manner, which
showed good performance. Note that we refrain from asynchronous deletion
to avoid synchronization in case of parfor loops in surrounding
while/for loops (where the same files might be written multiple times).
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index d5ef760..1e4281c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -99,6 +99,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 
 
@@ -1046,12 +1047,12 @@
 	 * @param out output matrix
 	 * @param in array of input matrix objects
 	 */
-	private static void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in) {
-		for( MatrixObject tmp : in ) {
-			//check for empty inputs (no iterations executed)
-			if( tmp != null && tmp != out )
-				ec.cleanupCacheableData(tmp);
-		}
+	private static void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in, boolean parallel) {
+		//check for empty inputs (no iterations executed)
+		Stream<MatrixObject> results = Arrays.stream(in).filter(m -> m!=null && m!=out);
+		//perform cleanup (parallel to mitigate file deletion bottlenecks)
+		(parallel ? results.parallel() : results)
+			.forEach(m -> ec.cleanupCacheableData(m));
 	}
 	
 	/**
@@ -1432,7 +1433,7 @@
 						ec.cleanupDataObject(exdata);
 					
 					//cleanup of intermediate result variables
-					cleanWorkerResultVariables( ec, out, in );
+					cleanWorkerResultVariables( ec, out, in, true );
 					
 					//set merged result variable
 					ec.setVariable(var._name, outNew);
@@ -1657,13 +1658,12 @@
 					}
 		
 					//cleanup of intermediate result variables
-					cleanWorkerResultVariables( _ec, out, in );
+					cleanWorkerResultVariables( _ec, out, in, false );
 				}
 				
 				_success = true;
 			}
-			catch(Exception ex)
-			{
+			catch(Exception ex) {
 				LOG.error("Error executing result merge: ", ex);
 			}
 		}
diff --git a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
index 561bdf1..0106099 100644
--- a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
+++ b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
@@ -19,40 +19,35 @@
 
 package org.apache.sysds.test.usertest.pythonapi;
 
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-
 import org.apache.sysds.api.PythonDMLScript;
 import org.junit.Test;
 
 /** Simple tests to verify startup of Python Gateway server happens without crashes */
 public class StartupTest {
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testStartupIncorrect_1() {
-        PythonDMLScript.main(new String[] {});
-    }
+	@Test(expected = IllegalArgumentException.class)
+	public void testStartupIncorrect_1() {
+		PythonDMLScript.main(new String[] {});
+	}
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testStartupIncorrect_2() {
-        PythonDMLScript.main(new String[] {""});
-    }
+	@Test(expected = IllegalArgumentException.class)
+	public void testStartupIncorrect_2() {
+		PythonDMLScript.main(new String[] {""});
+	}
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testStartupIncorrect_3() {
-        PythonDMLScript.main(new String[] {"131", "131"});
-    }
+	@Test(expected = IllegalArgumentException.class)
+	public void testStartupIncorrect_3() {
+		PythonDMLScript.main(new String[] {"131", "131"});
+	}
 
-    @Test(expected = NumberFormatException.class)
-    public void testStartupIncorrect_4() {
-        PythonDMLScript.main(new String[] {"Hello"});
-    }
+	@Test(expected = NumberFormatException.class)
+	public void testStartupIncorrect_4() {
+		PythonDMLScript.main(new String[] {"Hello"});
+	}
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testStartupIncorrect_5() {
-        // Number out of range
-        PythonDMLScript.main(new String[] {"918757"});
-    }
+	@Test(expected = IllegalArgumentException.class)
+	public void testStartupIncorrect_5() {
+		// Number out of range
+		PythonDMLScript.main(new String[] {"918757"});
+	}
 }