[SYSTEMDS-2576] Fix dop unoptimized functions (parfor-eval/parmserv)
This patch fixes the degree of parallelism of unoptimized functions as
called in second-order functions such as eval and paramserv to avoid
excessive CPU over-commitment. For example, on a box with 32 threads,
paramserv would run 32 local workers and each worker use 32 threads for
individual operations (1024 threads total).
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
index 699b72f..968cb1d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -218,10 +218,11 @@
{
Program prog = ec.getProgram();
- // 1. Recompile the internal program blocks
+ // 1. Recompile the internal program blocks
recompileProgramBlocks(k, prog.getProgramBlocks());
// 2. Recompile the imported function blocks
- prog.getFunctionProgramBlocks().forEach((fname, fvalue) -> recompileProgramBlocks(k, fvalue.getChildBlocks()));
+ prog.getFunctionProgramBlocks(false)
+ .forEach((fname, fvalue) -> recompileProgramBlocks(k, fvalue.getChildBlocks()));
// 3. Copy all functions
return ExecutionContextFactory.createContext(
@@ -247,7 +248,7 @@
return newProg;
}
- private static void recompileProgramBlocks(int k, List<ProgramBlock> pbs) {
+ public static void recompileProgramBlocks(int k, List<ProgramBlock> pbs) {
// Reset the visit status from root
for (ProgramBlock pb : pbs)
DMLTranslator.resetHopsDAGVisitStatus(pb.getStatementBlock());
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java
index eced04f..956ca12 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java
@@ -84,6 +84,10 @@
return ret;
}
+ public ProgramBlock getMappedProgramBlock(long id) {
+ return (ProgramBlock) _id_rtprog.get(id);
+ }
+
public void replaceMapping( ProgramBlock pb, OptNode n ) {
long id = n.getID();
_id_rtprog.put(id, pb);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 7a6933e..63ae8af 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.OpOpN;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.AggBinaryOp;
import org.apache.sysds.hops.AggBinaryOp.MMultMethod;
@@ -66,6 +67,7 @@
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.controlprogram.paramserv.ParamservUtils;
import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalFile;
import org.apache.sysds.runtime.controlprogram.parfor.opt.CostEstimator.ExcludeType;
import org.apache.sysds.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure;
@@ -1247,7 +1249,7 @@
long id = c.getID();
c.setK(tmpK);
ParForProgramBlock pfpb = (ParForProgramBlock)
- OptTreeConverter.getAbstractPlanMapping().getMappedProg(id)[1];
+ OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(id);
pfpb.setDegreeOfParallelism(tmpK);
//distribute remaining parallelism
@@ -1275,6 +1277,13 @@
mhop.setMaxNumThreads(1); //set max constraint in hop
c.setK(1); //set optnode k (for explain)
}
+
+ //if parfor contains eval call, make unoptimized functions single-threaded
+ if( HopRewriteUtils.isNary(h, OpOpN.EVAL) ) {
+ ProgramBlock pb = OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
+ pb.getProgram().getFunctionProgramBlocks(false)
+ .forEach((fname, fvalue) -> ParamservUtils.recompileProgramBlocks(1, fvalue.getChildBlocks()));
+ }
}
else
rAssignRemainingParallelism(c, parforK, opsK);
@@ -1284,7 +1293,7 @@
if( recompileSB ) {
try {
//guaranteed to be a last-level block (see hop change)
- ProgramBlock pb = (ProgramBlock) OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ProgramBlock pb = OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
Recompiler.recompileProgramBlockInstructions(pb);
}
catch(Exception ex){
@@ -1356,7 +1365,7 @@
// modify rtprog
ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(id)[1];
+ .getAbstractPlanMapping().getMappedProgramBlock(id);
pfpb.setTaskPartitioner(partitioner);
// modify plan
@@ -2403,10 +2412,9 @@
{
boolean ret = false;
- if( n.getNodeType() == NodeType.PARFOR )
- {
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ if( n.getNodeType() == NodeType.PARFOR ) {
+ ProgramBlock pfpb = OptTreeConverter
+ .getAbstractPlanMapping().getMappedProgramBlock(n.getID());
ret = (parfor == pfpb);
}
@@ -2425,7 +2433,7 @@
if( n.getNodeType()==NodeType.PARFOR )
{
ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ .getAbstractPlanMapping().getMappedProgramBlock(n.getID());
pbs.add(pfpb);
}
diff --git a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
index 9181351..89ec265 100644
--- a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
@@ -48,6 +48,7 @@
private final static String TEST_NAME17 = "FunPotpourriNamedArgsQuotedAssign";
private final static String TEST_NAME18 = "FunPotpourriMultiReturnBuiltin1";
private final static String TEST_NAME19 = "FunPotpourriMultiReturnBuiltin2";
+ private final static String TEST_NAME20 = "FunPotpourriNestedParforEval";
private final static String TEST_DIR = "functions/misc/";
private final static String TEST_CLASS_DIR = TEST_DIR + FunctionPotpourriTest.class.getSimpleName() + "/";
@@ -74,6 +75,7 @@
addTestConfiguration( TEST_NAME17, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME17, new String[] { "R" }) );
addTestConfiguration( TEST_NAME18, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME18, new String[] { "R" }) );
addTestConfiguration( TEST_NAME19, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME19, new String[] { "R" }) );
+ addTestConfiguration( TEST_NAME20, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME20, new String[] { "R" }) );
}
@Test
@@ -181,6 +183,11 @@
runFunctionTest( TEST_NAME19, false );
}
+ @Test
+ public void testFunctionNestedParforEval() {
+ runFunctionTest( TEST_NAME20, false );
+ }
+
private void runFunctionTest(String testName, boolean error) {
TestConfiguration config = getTestConfiguration(testName);
loadTestConfiguration(config);
diff --git a/src/test/scripts/functions/misc/FunPotpourriNestedParforEval.dml b/src/test/scripts/functions/misc/FunPotpourriNestedParforEval.dml
new file mode 100644
index 0000000..4dc37cb
--- /dev/null
+++ b/src/test/scripts/functions/misc/FunPotpourriNestedParforEval.dml
@@ -0,0 +1,36 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+foo1 = function(Matrix[Double] A, Matrix[Double] B) return (Matrix[Double] C) {
+ while(FALSE){} # no inlining
+ C = A %*% B + 7;
+}
+
+X1 = matrix(1.1, 100, 100)
+X2 = matrix(1.2, 100, 100)
+f = "foo1";
+
+R = matrix(0, 100, 100)
+parfor(i in 1:nrow(R) )
+ parfor(j in 1:ncol(R) )
+ R[i,j] = sum(eval(f, list(A=X1, B=X2)));
+
+print("out: " + sum(R))