[MINOR] Multi-threaded parfor result file deletion (latency mitigation)
After parfor operations, a result-merge implementation merges the partial
results from parfor workers into the final result variables. In case of
remote parfor, we have #result variables x #parfor tasks files, which -
in case of in-memory result merge - are read into the driver, aggregated, and
finally deleted. In sub-optimal cluster configurations, the delete can
have substantial latency (independent of file size). To mitigate this
latency we now delete these files in a multi-threaded manner, which
showed good performance. Note that we refrain from asynchronous deletion
to avoid synchronization in case of parfor loops in surrounding
while/for loops (where the same files might be written multiple times).
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index d5ef760..1e4281c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -99,6 +99,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
@@ -1046,12 +1047,12 @@
* @param out output matrix
* @param in array of input matrix objects
*/
- private static void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in) {
- for( MatrixObject tmp : in ) {
- //check for empty inputs (no iterations executed)
- if( tmp != null && tmp != out )
- ec.cleanupCacheableData(tmp);
- }
+ private static void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in, boolean parallel) {
+ //check for empty inputs (no iterations executed)
+ Stream<MatrixObject> results = Arrays.stream(in).filter(m -> m!=null && m!=out);
+ //perform cleanup (parallel to mitigate file deletion bottlenecks)
+ (parallel ? results.parallel() : results)
+ .forEach(m -> ec.cleanupCacheableData(m));
}
/**
@@ -1432,7 +1433,7 @@
ec.cleanupDataObject(exdata);
//cleanup of intermediate result variables
- cleanWorkerResultVariables( ec, out, in );
+ cleanWorkerResultVariables( ec, out, in, true );
//set merged result variable
ec.setVariable(var._name, outNew);
@@ -1657,13 +1658,12 @@
}
//cleanup of intermediate result variables
- cleanWorkerResultVariables( _ec, out, in );
+ cleanWorkerResultVariables( _ec, out, in, false );
}
_success = true;
}
- catch(Exception ex)
- {
+ catch(Exception ex) {
LOG.error("Error executing result merge: ", ex);
}
}
diff --git a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
index 561bdf1..0106099 100644
--- a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
+++ b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
@@ -19,40 +19,35 @@
package org.apache.sysds.test.usertest.pythonapi;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-
import org.apache.sysds.api.PythonDMLScript;
import org.junit.Test;
/** Simple tests to verify startup of Python Gateway server happens without crashes */
public class StartupTest {
- @Test(expected = IllegalArgumentException.class)
- public void testStartupIncorrect_1() {
- PythonDMLScript.main(new String[] {});
- }
+ @Test(expected = IllegalArgumentException.class)
+ public void testStartupIncorrect_1() {
+ PythonDMLScript.main(new String[] {});
+ }
- @Test(expected = IllegalArgumentException.class)
- public void testStartupIncorrect_2() {
- PythonDMLScript.main(new String[] {""});
- }
+ @Test(expected = IllegalArgumentException.class)
+ public void testStartupIncorrect_2() {
+ PythonDMLScript.main(new String[] {""});
+ }
- @Test(expected = IllegalArgumentException.class)
- public void testStartupIncorrect_3() {
- PythonDMLScript.main(new String[] {"131", "131"});
- }
+ @Test(expected = IllegalArgumentException.class)
+ public void testStartupIncorrect_3() {
+ PythonDMLScript.main(new String[] {"131", "131"});
+ }
- @Test(expected = NumberFormatException.class)
- public void testStartupIncorrect_4() {
- PythonDMLScript.main(new String[] {"Hello"});
- }
+ @Test(expected = NumberFormatException.class)
+ public void testStartupIncorrect_4() {
+ PythonDMLScript.main(new String[] {"Hello"});
+ }
- @Test(expected = IllegalArgumentException.class)
- public void testStartupIncorrect_5() {
- // Number out of range
- PythonDMLScript.main(new String[] {"918757"});
- }
+ @Test(expected = IllegalArgumentException.class)
+ public void testStartupIncorrect_5() {
+ // Number out of range
+ PythonDMLScript.main(new String[] {"918757"});
+ }
}