Give running priority and dop to the following children of oneByOneNode, and disallow parallel scan operators
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java index 6734c75..7de1298 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -61,6 +61,7 @@ private final Map<String, Set<String>> allSensorsMap; private int degreeOfParallelism = IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism(); + private boolean allowParallelScanOperator = true; // this is shared with all subContexts private final AtomicInteger nextPipelineId; private final List<PipelineDriverFactory> pipelineDriverFactories; @@ -105,6 +106,7 @@ this.cachedDataTypes = parentContext.cachedDataTypes; this.driverContext = parentContext.getDriverContext().createSubDriverContext(getNextPipelineId()); + this.allowParallelScanOperator = parentContext.allowParallelScanOperator; } // for schema region @@ -266,4 +268,12 @@ public long getDataRegionTTL() { return dataRegionTTL; } + + public boolean isAllowParallelScanOperator() { + return allowParallelScanOperator; + } + + public void setAllowParallelScanOperator(boolean allowParallelScanOperator) { + this.allowParallelScanOperator = allowParallelScanOperator; + } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index d64ac85..2653216 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -293,7 +293,7 @@ seriesScanOptionsBuilder.withLimit(node.getLimit()); seriesScanOptionsBuilder.withOffset(node.getOffset()); - if (context.getDegreeOfParallelism() == 1) { + if (context.getDegreeOfParallelism() == 1 || !context.isAllowParallelScanOperator()) { OperatorContext operatorContext = context .getDriverContext() @@ -338,7 +338,7 @@ seriesScanOptionsBuilder.withOffset(node.getOffset()); seriesScanOptionsBuilder.withAllSensors(new HashSet<>(seriesPath.getMeasurementList())); - if (context.getDegreeOfParallelism() == 1) { + if (context.getDegreeOfParallelism() == 1 || !context.isAllowParallelScanOperator()) { OperatorContext operatorContext = context .getDriverContext() @@ -402,7 +402,7 @@ scanOptionsBuilder.withGlobalTimeFilter(timeFilter.copy()); } - if (context.getDegreeOfParallelism() == 1) { + if (context.getDegreeOfParallelism() == 1 || !context.isAllowParallelScanOperator()) { OperatorContext operatorContext = context .getDriverContext() @@ -487,7 +487,7 @@ scanOptionsBuilder.withGlobalTimeFilter(timeFilter.copy()); } - if (context.getDegreeOfParallelism() == 1) { + if (context.getDegreeOfParallelism() == 1 || !context.isAllowParallelScanOperator()) { OperatorContext operatorContext = context .getDriverContext() @@ -2770,7 +2770,7 @@ // 1. 
divide every child to pipeline using the max dop if (context.getDegreeOfParallelism() == 1 || node.getChildren().size() == 1) { - // If dop = 1, we don't create extra pipeline + // If dop = 1 or the size of children = 1, we don't create extra pipeline for current operator for (PlanNode childSource : node.getChildren()) { Operator childOperation = childSource.accept(this, context); finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum()); @@ -2778,6 +2778,9 @@ parentPipelineChildren.add(childOperation); } } else { + // For oneByOneNode with more than one child, we offer running priority and dop to the + // following children, and don't allow parallel scan operator + context.setAllowParallelScanOperator(false); List<Integer> childPipelineNums = new ArrayList<>(); List<Integer> childExchangeNums = new ArrayList<>(); int sumOfChildPipelines = 0, sumOfChildExchangeNums = 0; @@ -2792,11 +2795,7 @@ LocalExecutionPlanContext subContext = context.createSubContext(); // Only context.getDegreeOfParallelism() - 1 can be allocated to child int dopForChild = context.getDegreeOfParallelism() - 1; - if (childNode instanceof SeriesSourceNode && node.getChildren().size() > 1) { - subContext.setDegreeOfParallelism(1); - } else { - subContext.setDegreeOfParallelism(dopForChild); - } + subContext.setDegreeOfParallelism(dopForChild); int originPipeNum = context.getPipelineNumber(); Operator childOperation = childNode.accept(this, subContext); ISinkChannel localSinkChannel =