| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.queryengine.plan.planner; |
| |
| import org.apache.iotdb.common.rpc.thrift.TAggregationType; |
| import org.apache.iotdb.common.rpc.thrift.TEndPoint; |
| import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; |
| import org.apache.iotdb.commons.exception.IllegalPathException; |
| import org.apache.iotdb.commons.path.AlignedPath; |
| import org.apache.iotdb.commons.path.MeasurementPath; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; |
| import org.apache.iotdb.db.queryengine.common.PlanFragmentId; |
| import org.apache.iotdb.db.queryengine.common.QueryId; |
| import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext; |
| import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; |
| import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; |
| import org.apache.iotdb.db.queryengine.execution.operator.Operator; |
| import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator; |
| import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator; |
| import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; |
| import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; |
| import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; |
| import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter; |
| import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey; |
| import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; |
| import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; |
| import org.apache.iotdb.db.storageengine.dataregion.DataRegion; |
| |
| import org.apache.tsfile.enums.TSDataType; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.ExecutorService; |
| |
| import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; |
| import static org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey.DEVICE; |
| import static org.junit.Assert.assertEquals; |
| |
| public class PipelineBuilderTest { |
| |
| OperatorTreeGenerator operatorTreeGenerator = new OperatorTreeGenerator(); |
| |
| /** |
| * The operator structure is [TimeJoin1 - [SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]]. |
| * |
| * <p>The next six tests, I will test this TimeJoinOperator with different dop. |
| * |
| * <p>The first test will test dop = 1. Expected result is that no child pipelines will be |
| * divided. |
| */ |
| @Test |
| public void testConsumeAllChildrenPipelineBuilder1() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| FullOuterTimeJoinNode fullOuterTimeJoinNode = initFullOuterTimeJoinNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(1); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| fullOuterTimeJoinNode, context); |
| assertEquals(0, context.getPipelineNumber()); |
| assertEquals(4, childrenOperator.size()); |
| assertEquals(4, fullOuterTimeJoinNode.getChildren().size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(SeriesScanOperator.class, childrenOperator.get(i).getClass()); |
| assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); |
| assertEquals( |
| String.format("root.sg.d%d.s1", i), |
| fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); |
| } |
| |
| // Validate the number exchange operator |
| assertEquals(0, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 2. Expected result is two pipelines: |
| * |
| * <p>The first is: TimeJoin1 - [SeriesScan1, SeriesScan0, ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - TimeJoin1-1[SeriesScan2, SeriesScan3]. |
| */ |
| @Test |
| public void testConsumeAllChildrenPipelineBuilder2() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| FullOuterTimeJoinNode fullOuterTimeJoinNode = initFullOuterTimeJoinNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(2); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| fullOuterTimeJoinNode, context); |
| // The number of pipeline is 1, since parent pipeline hasn't joined |
| assertEquals(1, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(3, childrenOperator.size()); |
| assertEquals(3, fullOuterTimeJoinNode.getChildren().size()); |
| for (int i = 0; i < 2; i++) { |
| assertEquals(SeriesScanOperator.class, childrenOperator.get(i).getClass()); |
| assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); |
| } |
| assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass()); |
| |
| // Validate the changes of node structure |
| assertEquals( |
| "root.sg.d0.s1", fullOuterTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); |
| assertEquals( |
| "root.sg.d1.s1", fullOuterTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); |
| assertEquals( |
| FullOuterTimeJoinNode.class, fullOuterTimeJoinNode.getChildren().get(2).getClass()); |
| |
| // Validate the second pipeline |
| FullOuterTimeJoinNode subFullOuterTimeJoinNode = |
| (FullOuterTimeJoinNode) fullOuterTimeJoinNode.getChildren().get(2); |
| assertEquals(2, subFullOuterTimeJoinNode.getChildren().size()); |
| assertEquals( |
| "root.sg.d2.s1", |
| subFullOuterTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); |
| assertEquals( |
| "root.sg.d3.s1", |
| subFullOuterTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); |
| |
| // Validate the number exchange operator |
| assertEquals(1, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 3. Expected result is three pipelines: |
| * |
| * <p>The first is: TimeJoin1 - [SeriesScan0, ExchangeOperator, ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SeriesScan1. |
| * |
| * <p>The third is: ExchangeOperator - TimeJoin1-1[SeriesScan2, SeriesScan3]. |
| */ |
| @Test |
| public void testConsumeAllChildrenPipelineBuilder3() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| FullOuterTimeJoinNode fullOuterTimeJoinNode = initFullOuterTimeJoinNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(3); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| fullOuterTimeJoinNode, context); |
| // The number of pipeline is 2, since parent pipeline hasn't joined |
| assertEquals(2, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(3, childrenOperator.size()); |
| assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass()); |
| assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass()); |
| assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass()); |
| |
| // Validate the changes of node structure |
| assertEquals(3, fullOuterTimeJoinNode.getChildren().size()); |
| assertEquals( |
| "root.sg.d0.s1", fullOuterTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); |
| assertEquals( |
| "root.sg.d1.s1", fullOuterTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); |
| assertEquals( |
| FullOuterTimeJoinNode.class, fullOuterTimeJoinNode.getChildren().get(2).getClass()); |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId()); |
| |
| // Validate the third pipeline |
| FullOuterTimeJoinNode subFullOuterTimeJoinNode = |
| (FullOuterTimeJoinNode) fullOuterTimeJoinNode.getChildren().get(2); |
| assertEquals(2, subFullOuterTimeJoinNode.getChildren().size()); |
| assertEquals( |
| "root.sg.d2.s1", |
| subFullOuterTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); |
| assertEquals( |
| "root.sg.d3.s1", |
| subFullOuterTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(2); |
| assertEquals(exchangeOperator2.getSourceId(), subFullOuterTimeJoinNode.getPlanNodeId()); |
| |
| // Validate the number exchange operator |
| assertEquals(2, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 4. Expected result is four pipelines: |
| * |
| * <p>The first is: TimeJoin1 - [SeriesScan0, ExchangeOperator, ExchangeOperator, |
| * ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SeriesScan1. |
| * |
| * <p>The third is: ExchangeOperator - SeriesScan2. |
| * |
| * <p>The forth is: ExchangeOperator - SeriesScan3. |
| */ |
| @Test |
| public void testConsumeAllChildrenPipelineBuilder4() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| FullOuterTimeJoinNode fullOuterTimeJoinNode = initFullOuterTimeJoinNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(4); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| fullOuterTimeJoinNode, context); |
| // The number of pipeline is 3, since parent pipeline hasn't joined |
| assertEquals(3, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(4, childrenOperator.size()); |
| assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass()); |
| assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass()); |
| assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass()); |
| assertEquals(ExchangeOperator.class, childrenOperator.get(3).getClass()); |
| |
| // Validate the changes of node structure |
| assertEquals(4, fullOuterTimeJoinNode.getChildren().size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); |
| assertEquals( |
| String.format("root.sg.d%d.s1", i), |
| fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); |
| } |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId()); |
| |
| // Validate the third pipeline |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(2); |
| assertEquals("SeriesScanNode2", exchangeOperator2.getSourceId().getId()); |
| |
| // Validate the forth pipeline |
| ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(3); |
| assertEquals("SeriesScanNode3", exchangeOperator3.getSourceId().getId()); |
| |
| // Validate the number exchange operator |
| assertEquals(3, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 5. Expected result is five pipelines: |
| * |
| * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, ExchangeOperator, |
| * ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SeriesScan0. |
| * |
| * <p>The third is: ExchangeOperator - SeriesScan1. |
| * |
| * <p>The forth is: ExchangeOperator - SeriesScan2. |
| * |
| * <p>The fifth is: ExchangeOperator - SeriesScan3. |
| */ |
| @Test |
| public void testConsumeAllChildrenPipelineBuilder5() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| FullOuterTimeJoinNode fullOuterTimeJoinNode = initFullOuterTimeJoinNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(5); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| fullOuterTimeJoinNode, context); |
| // The number of pipeline is 4, since parent pipeline hasn't joined |
| assertEquals(4, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(4, childrenOperator.size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); |
| } |
| |
| // Validate the changes of node structure |
| assertEquals(4, fullOuterTimeJoinNode.getChildren().size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); |
| assertEquals( |
| String.format("root.sg.d%d.s1", i), |
| fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); |
| } |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); |
| assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId()); |
| |
| // Validate the third pipeline |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId()); |
| |
| // Validate the forth pipeline |
| ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); |
| assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId()); |
| |
| // Validate the fifth pipeline |
| ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); |
| assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId()); |
| |
| // Validate the number exchange operator |
| assertEquals(4, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 6. Expected result is still five pipelines: |
| * |
| * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, ExchangeOperator, |
| * ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SeriesScan0. |
| * |
| * <p>The third is: ExchangeOperator - SeriesScan1. |
| * |
| * <p>The forth is: ExchangeOperator - SeriesScan2. |
| * |
| * <p>The fifth is: ExchangeOperator - SeriesScan3. |
| */ |
| @Test |
| public void testConsumeAllChildrenPipelineBuilder6() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| FullOuterTimeJoinNode fullOuterTimeJoinNode = initFullOuterTimeJoinNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(6); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| fullOuterTimeJoinNode, context); |
| // The number of pipeline is 4, since parent pipeline hasn't joined |
| assertEquals(4, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(4, childrenOperator.size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); |
| } |
| |
| // Validate the changes of node structure |
| assertEquals(4, fullOuterTimeJoinNode.getChildren().size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); |
| assertEquals( |
| String.format("root.sg.d%d.s1", i), |
| fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); |
| } |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); |
| assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId()); |
| |
| // Validate the third pipeline |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId()); |
| |
| // Validate the forth pipeline |
| ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); |
| assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId()); |
| |
| // Validate the fifth pipeline |
| ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); |
| assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId()); |
| |
| // Validate the number exchange operator |
| assertEquals(4, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 3. Expected result is three pipelines: |
| * |
| * <p>The first is: TopKNode1 - [SingleDeviceViewNode0, ExchangeOperator, ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SingleDeviceViewNode1. |
| * |
| * <p>The third is: ExchangeOperator - TopKNode1-1[SingleDeviceViewNode2, SingleDeviceViewNode3]. |
| */ |
| @Test |
| public void testTopKConsumeAllChildrenPipelineBuilder3() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| TopKNode topKNode = initTopKNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(3); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(topKNode, context); |
| // The number of pipeline is 2, since parent pipeline hasn't joined |
| assertEquals(2, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(3, childrenOperator.size()); |
| assertEquals(SingleDeviceViewOperator.class, childrenOperator.get(0).getClass()); |
| assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass()); |
| assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass()); |
| |
| // Validate the changes of node structure |
| assertEquals(3, topKNode.getChildren().size()); |
| assertEquals("Time", topKNode.getChildren().get(0).getOutputColumnNames().get(0)); |
| assertEquals("Time", topKNode.getChildren().get(1).getOutputColumnNames().get(0)); |
| assertEquals("Time", topKNode.getChildren().get(2).getOutputColumnNames().get(0)); |
| assertEquals(TopKNode.class, topKNode.getChildren().get(2).getClass()); |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("SingleDeviceViewNode1", exchangeOperator1.getSourceId().getId()); |
| |
| // Validate the third pipeline |
| TopKNode subTimeJoinNode = (TopKNode) topKNode.getChildren().get(2); |
| assertEquals(2, subTimeJoinNode.getChildren().size()); |
| assertEquals("Time", subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); |
| assertEquals("Time", subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(2); |
| assertEquals(exchangeOperator2.getSourceId(), subTimeJoinNode.getPlanNodeId()); |
| |
| // Validate the number exchange operator |
| assertEquals(2, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * The operator structure is [DeviceView - [SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]]. |
| * |
| * <p>The next six tests, I will test this DeviceViewOperator with different dop. |
| * |
| * <p>The first test will test dop = 1. Expected result is that no child pipelines will be |
| * divided. |
| */ |
| @Test |
| public void testConsumeOneByOneChildrenPipelineBuilder1() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(1); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); |
| assertEquals(0, context.getPipelineNumber()); |
| assertEquals(4, childrenOperator.size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(AlignedSeriesScanOperator.class, childrenOperator.get(i).getClass()); |
| assertEquals( |
| String.format("root.sg.d%d.s1", i), |
| deviceViewNode.getChildren().get(i).getOutputColumnNames().get(0)); |
| } |
| |
| // Validate the number exchange operator |
| assertEquals(0, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 2. Expected result is five pipelines with dependency: |
| * |
| * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, |
| * ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SeriesScan0. |
| * |
| * <p>The third is: ExchangeOperator - SeriesScan1, which has dependency second pipeline. |
| * |
| * <p>The forth is: ExchangeOperator - SeriesScan2, which has dependency third pipeline. |
| * |
| * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency forth pipeline. |
| */ |
| @Test |
| public void testConsumeOneByOneChildrenPipelineBuilder2() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(2); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); |
| // The number of pipeline is 4, since parent pipeline hasn't joined |
| assertEquals(4, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(4, childrenOperator.size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); |
| } |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); |
| assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); |
| |
| // Validate the third pipeline |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId()); |
| assertEquals(0, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); |
| |
| // Validate the forth pipeline |
| ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); |
| assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId()); |
| assertEquals(1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); |
| |
| // Validate the fifth pipeline |
| ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); |
| assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId()); |
| assertEquals(2, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex()); |
| |
| // Validate the number exchange operator |
| assertEquals(1, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 3. Expected result is five pipelines with dependency: |
| * |
| * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, |
| * ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SeriesScan0. |
| * |
| * <p>The third is: ExchangeOperator - SeriesScan1. |
| * |
| * <p>The forth is: ExchangeOperator - SeriesScan2, which has dependency second pipeline. |
| * |
| * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency third pipeline. |
| */ |
| @Test |
| public void testConsumeOneByOneChildrenPipelineBuilder3() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(3); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); |
| // The number of pipeline is 4, since parent pipeline hasn't joined |
| assertEquals(4, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(4, childrenOperator.size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); |
| } |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); |
| assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); |
| |
| // Validate the third pipeline |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); |
| |
| // Validate the forth pipeline |
| ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); |
| assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId()); |
| assertEquals(0, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); |
| |
| // Validate the fifth pipeline |
| ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); |
| assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId()); |
| assertEquals(1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex()); |
| |
| // Validate the number exchange operator |
| assertEquals(2, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 4. Expected result is five pipelines with dependency: |
| * |
| * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, |
| * ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SeriesScan0. |
| * |
| * <p>The third is: ExchangeOperator - SeriesScan1. |
| * |
| * <p>The forth is: ExchangeOperator - SeriesScan2. |
| * |
| * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency second pipeline. |
| */ |
| @Test |
| public void testConsumeOneByOneChildrenPipelineBuilder4() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(4); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); |
| // The number of pipeline is 4, since parent pipeline hasn't joined |
| assertEquals(4, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(4, childrenOperator.size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); |
| } |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); |
| assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); |
| |
| // Validate the third pipeline |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); |
| |
| // Validate the forth pipeline |
| ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); |
| assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); |
| |
| // Validate the fifth pipeline |
| ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); |
| assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId()); |
| assertEquals(0, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex()); |
| |
| // Validate the number exchange operator |
| assertEquals(3, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 5. Expected result is five pipelines without dependency: |
| * |
| * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, |
| * ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SeriesScan0. |
| * |
| * <p>The third is: ExchangeOperator - SeriesScan1. |
| * |
| * <p>The forth is: ExchangeOperator - SeriesScan2. |
| * |
| * <p>The fifth is: ExchangeOperator - SeriesScan3. |
| */ |
| @Test |
| public void testConsumeOneByOneChildrenPipelineBuilder5() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(5); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); |
| // The number of pipeline is 4, since parent pipeline hasn't joined |
| assertEquals(4, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(4, childrenOperator.size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); |
| } |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); |
| assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); |
| |
| // Validate the third pipeline |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); |
| |
| // Validate the forth pipeline |
| ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); |
| assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); |
| |
| // Validate the fifth pipeline |
| ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); |
| assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex()); |
| |
| // Validate the number exchange operator |
| assertEquals(4, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 6. Expected result is five pipelines without dependency: |
| * |
| * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, |
| * ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SeriesScan0. |
| * |
| * <p>The third is: ExchangeOperator - SeriesScan1. |
| * |
| * <p>The forth is: ExchangeOperator - SeriesScan2. |
| * |
| * <p>The fifth is: ExchangeOperator - SeriesScan3. |
| */ |
| @Test |
| public void testConsumeOneByOneChildrenPipelineBuilder6() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(5); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); |
| // The number of pipeline is 4, since parent pipeline hasn't joined |
| assertEquals(4, context.getPipelineNumber()); |
| |
| // Validate the first pipeline |
| assertEquals(4, childrenOperator.size()); |
| for (int i = 0; i < 4; i++) { |
| assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); |
| } |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); |
| assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); |
| |
| // Validate the third pipeline |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); |
| |
| // Validate the forth pipeline |
| ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); |
| assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); |
| |
| // Validate the fifth pipeline |
| ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); |
| assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex()); |
| |
| // Validate the number exchange operator |
| assertEquals(4, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * The operator structure is: |
| * |
| * <p>DeviceViewOperator - [AggregationOperator1 - [SeriesAggregationScanOperator1, |
| * ExchangeOperator1]], [AggregationOperator2 - [SeriesAggregationScanOperator2, |
| * ExchangeOperator2]]. |
| * |
| * <p>This test will test dop = 3. Expected result is five pipelines with dependency: |
| * |
| * <p>The pipeline0 is: ExchangeOperator - SeriesAggregationScanOperator1. |
| * |
| * <p>The pipeline1 is: ExchangeOperator - AggregationOperator1. |
| * |
| * <p>The pipeline2 is: ExchangeOperator - SeriesAggregationScanOperator2, which has dependency 1. |
| * |
| * <p>The pipeline3 is: ExchangeOperator - AggregationOperator2, which has dependency1. |
| * |
| * <p>The pipeline4 is: DeviceView - [ExchangeOperator, ExchangeOperator] |
| */ |
| @Test |
| public void testConsumeOneByOneChildrenPipelineBuilderDependency() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| typeProvider.setType("root.sg.d0.s1", TSDataType.INT64); |
| typeProvider.setType("root.sg.d1.s1", TSDataType.INT64); |
| typeProvider.setType("count(root.sg.d0.s1)", TSDataType.INT64); |
| typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64); |
| DeviceViewNode deviceViewNode = |
| new DeviceViewNode(new PlanNodeId("DeviceViewNode"), null, null, null); |
| for (int i = 0; i < 2; i++) { |
| PartialPath path = new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT64); |
| AggregationNode aggregationNode = |
| new AggregationNode( |
| new PlanNodeId(String.format("AggregationOperator%d", i)), |
| Collections.singletonList( |
| new AggregationDescriptor( |
| TAggregationType.COUNT.name().toLowerCase(), |
| AggregationStep.FINAL, |
| Collections.singletonList(new TimeSeriesOperand(path)))), |
| null, |
| Ordering.ASC); |
| SeriesAggregationScanNode seriesAggregationScanNode = |
| new SeriesAggregationScanNode( |
| new PlanNodeId(String.format("seriesAggregationScanNode%d", i)), |
| (MeasurementPath) path, |
| Collections.singletonList( |
| new AggregationDescriptor( |
| TAggregationType.COUNT.name().toLowerCase(), |
| AggregationStep.PARTIAL, |
| Collections.singletonList(new TimeSeriesOperand(path))))); |
| ExchangeNode exchangeNode = |
| new ExchangeNode(new PlanNodeId(String.format("ExchangeNode%d", i))); |
| exchangeNode.setUpstream( |
| new TEndPoint("127.0.0.1", 6667), |
| new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"), |
| new PlanNodeId("test")); |
| aggregationNode.addChild(seriesAggregationScanNode); |
| aggregationNode.addChild(exchangeNode); |
| deviceViewNode.addChild(aggregationNode); |
| } |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(3); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); |
| // The number of pipeline is 4, since parent pipeline hasn't joined |
| assertEquals(4, context.getPipelineNumber()); |
| |
| assertEquals(2, childrenOperator.size()); |
| for (int i = 0; i < 2; i++) { |
| assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); |
| } |
| |
| // Validate the first pipeline |
| assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); |
| |
| // Validate the second pipeline |
| assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); |
| |
| // Validate the third pipeline |
| assertEquals(1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); |
| |
| // Validate the forth pipeline |
| assertEquals(1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); |
| } |
| |
| @Test |
| public void testGetChildNumInEachPipeline() { |
| List<PlanNode> allChildren = new ArrayList<>(); |
| allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode1"))); |
| allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null)); |
| allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2"))); |
| allChildren.add(new SeriesScanNode(new PlanNodeId("localNode2"), null)); |
| |
| int[] childNumInEachPipeline = |
| operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 2); |
| assertEquals(2, childNumInEachPipeline.length); |
| assertEquals(2, childNumInEachPipeline[0]); |
| assertEquals(1, childNumInEachPipeline[1]); |
| |
| allChildren.add(new SeriesScanNode(new PlanNodeId("localNode3"), null)); |
| allChildren.add(new SeriesScanNode(new PlanNodeId("localNode4"), null)); |
| allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode3"))); |
| allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode4"))); |
| allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null)); |
| allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode5"))); |
| childNumInEachPipeline = operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 5, 3); |
| assertEquals(3, childNumInEachPipeline.length); |
| assertEquals(2, childNumInEachPipeline[0]); |
| assertEquals(2, childNumInEachPipeline[1]); |
| assertEquals(5, childNumInEachPipeline[2]); |
| } |
| |
| /** |
| * The operator structure is [LeftOuterTimeJoin - [SeriesScan0,SeriesScan1]]. |
| * |
| * <p>The next three tests, I will test this LeftOuterTimeJoinOperator with different dop. |
| * |
| * <p>The first test will test dop = 1. Expected result is that no child pipelines will be |
| * divided. |
| */ |
| @Test |
| public void testLeftOuterTimeJoinPipelineBuilder1() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| LeftOuterTimeJoinNode leftOuterTimeJoinNode = initLeftOuterTimeJoinNode(typeProvider); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(1); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| leftOuterTimeJoinNode, context); |
| assertEquals(0, context.getPipelineNumber()); |
| assertEquals(2, childrenOperator.size()); |
| assertEquals(2, leftOuterTimeJoinNode.getChildren().size()); |
| |
| assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass()); |
| assertEquals(SeriesScanNode.class, leftOuterTimeJoinNode.getLeftChild().getClass()); |
| assertEquals( |
| "root.sg.d0.s1", leftOuterTimeJoinNode.getLeftChild().getOutputColumnNames().get(0)); |
| |
| assertEquals(SeriesScanOperator.class, childrenOperator.get(1).getClass()); |
| assertEquals(SeriesScanNode.class, leftOuterTimeJoinNode.getRightChild().getClass()); |
| assertEquals( |
| "root.sg.d1.s1", leftOuterTimeJoinNode.getRightChild().getOutputColumnNames().get(0)); |
| |
| // Validate the number exchange operator |
| assertEquals(0, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 2. Expected result is two pipelines: |
| * |
| * <p>The first is: LeftOuterTimeJoin1 - [SeriesScan0, ExchangeOperator]; |
| * |
| * <p>The second is: ExchangeOperator - SeriesScan1. |
| */ |
| @Test |
| public void testLeftOuterTimeJoinPipelineBuilder2() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| LeftOuterTimeJoinNode leftOuterTimeJoinNode = initLeftOuterTimeJoinNode(typeProvider); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(2); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| leftOuterTimeJoinNode, context); |
| assertEquals(1, context.getPipelineNumber()); |
| assertEquals(2, childrenOperator.size()); |
| |
| // Validate the first pipeline |
| assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass()); |
| assertEquals( |
| "root.sg.d0.s1", leftOuterTimeJoinNode.getLeftChild().getOutputColumnNames().get(0)); |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("SeriesScanNode1", exchangeOperator.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); |
| |
| // Validate the number exchange operator |
| assertEquals(1, context.getExchangeSumNum()); |
| } |
| |
| /** |
| * This test will test dop = 3. Expected result is two pipelines: |
| * |
| * <p>The first is: LeftOuterTimeJoin1 - [ExchangeOperator1, ExchangeOperator2]; |
| * |
| * <p>The second is: ExchangeOperator1 - SeriesScan0. |
| * |
| * <p>The third is: ExchangeOperator2 - SeriesScan1. |
| */ |
| @Test |
| public void testLeftOuterTimeJoinPipelineBuilder3() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| LeftOuterTimeJoinNode leftOuterTimeJoinNode = initLeftOuterTimeJoinNode(typeProvider); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(3); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| leftOuterTimeJoinNode, context); |
| assertEquals(2, context.getPipelineNumber()); |
| assertEquals(2, childrenOperator.size()); |
| |
| // Validate the first pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); |
| assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); |
| |
| // Validate the number exchange operator |
| assertEquals(2, context.getExchangeSumNum()); |
| } |
| |
| /** This test will test dop > 3. Expected result is same as dop = 3. */ |
| @Test |
| public void testLeftOuterTimeJoinPipelineBuilder4() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| LeftOuterTimeJoinNode leftOuterTimeJoinNode = initLeftOuterTimeJoinNode(typeProvider); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(4); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| leftOuterTimeJoinNode, context); |
| assertEquals(2, context.getPipelineNumber()); |
| assertEquals(2, childrenOperator.size()); |
| |
| // Validate the first pipeline |
| ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); |
| assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); |
| |
| // Validate the second pipeline |
| ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); |
| assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId()); |
| assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); |
| |
| // Validate the number exchange operator |
| assertEquals(2, context.getExchangeSumNum()); |
| } |
| |
| @Test |
| public void testConsumeAllChildrenPipelineBuilderWithExchange() throws IllegalPathException { |
| TypeProvider typeProvider = new TypeProvider(); |
| FullOuterTimeJoinNode fullOuterTimeJoinNode = |
| initFullOuterTimeJoinNodeWithExchangeNode(typeProvider, 3, 3); |
| LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); |
| context.setDegreeOfParallelism(1); |
| |
| List<Operator> childrenOperator = |
| operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( |
| fullOuterTimeJoinNode, context); |
| assertEquals(0, context.getPipelineNumber()); |
| assertEquals(6, childrenOperator.size()); |
| assertEquals(6, fullOuterTimeJoinNode.getChildren().size()); |
| for (int i = 0; i < 3; i++) { |
| assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); |
| assertEquals(ExchangeNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); |
| } |
| |
| for (int i = 3; i < 6; i++) { |
| assertEquals(SeriesScanOperator.class, childrenOperator.get(i).getClass()); |
| assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); |
| assertEquals( |
| String.format("root.sg.d%d.s1", i - 3), |
| fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); |
| } |
| |
| // Validate the number exchange operator |
| assertEquals(3, context.getExchangeSumNum()); |
| } |
| |
| private LocalExecutionPlanContext createLocalExecutionPlanContext(TypeProvider typeProvider) { |
| ExecutorService instanceNotificationExecutor = |
| IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); |
| |
| QueryId queryId = new QueryId("stub_query"); |
| FragmentInstanceId instanceId = |
| new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); |
| FragmentInstanceStateMachine stateMachine = |
| new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); |
| DataRegion dataRegion = Mockito.mock(DataRegion.class); |
| FragmentInstanceContext fragmentInstanceContext = |
| createFragmentInstanceContext(instanceId, stateMachine); |
| fragmentInstanceContext.setDataRegion(dataRegion); |
| |
| return new LocalExecutionPlanContext( |
| typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1)); |
| } |
| |
| /** |
| * This method will init a timeJoinNode with @childNum seriesScanNode as children. |
| * |
| * @param childNum the number of children |
| * @return a timeJoinNode with @childNum seriesScanNode as children |
| */ |
| private FullOuterTimeJoinNode initFullOuterTimeJoinNode(TypeProvider typeProvider, int childNum) |
| throws IllegalPathException { |
| FullOuterTimeJoinNode fullOuterTimeJoinNode = |
| new FullOuterTimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.ASC); |
| for (int i = 0; i < childNum; i++) { |
| SeriesScanNode seriesScanNode = |
| new SeriesScanNode( |
| new PlanNodeId(String.format("SeriesScanNode%d", i)), |
| new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT32)); |
| typeProvider.setType(seriesScanNode.getSeriesPath().toString(), TSDataType.INT32); |
| fullOuterTimeJoinNode.addChild(seriesScanNode); |
| } |
| return fullOuterTimeJoinNode; |
| } |
| |
| private FullOuterTimeJoinNode initFullOuterTimeJoinNodeWithExchangeNode( |
| TypeProvider typeProvider, int exchangeNum, int scanNum) throws IllegalPathException { |
| FullOuterTimeJoinNode fullOuterTimeJoinNode = |
| new FullOuterTimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.ASC); |
| for (int i = 0; i < exchangeNum; i++) { |
| ExchangeNode exchangeNode = |
| new ExchangeNode(new PlanNodeId(String.format("FullOuterTimeJoinWithExchangeNode%d", i))); |
| exchangeNode.setUpstream( |
| new TEndPoint("127.0.0.2", 6667), |
| new FragmentInstanceId(new PlanFragmentId("q", i), "ds"), |
| new PlanNodeId("test")); |
| fullOuterTimeJoinNode.addChild(exchangeNode); |
| } |
| for (int i = 0; i < scanNum; i++) { |
| SeriesScanNode seriesScanNode = |
| new SeriesScanNode( |
| new PlanNodeId(String.format("SeriesScanNode%d", i)), |
| new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT32)); |
| typeProvider.setType(seriesScanNode.getSeriesPath().toString(), TSDataType.INT32); |
| fullOuterTimeJoinNode.addChild(seriesScanNode); |
| } |
| return fullOuterTimeJoinNode; |
| } |
| |
| private LeftOuterTimeJoinNode initLeftOuterTimeJoinNode(TypeProvider typeProvider) |
| throws IllegalPathException { |
| LeftOuterTimeJoinNode leftOuterTimeJoinNode = |
| new LeftOuterTimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.ASC); |
| for (int i = 0; i < 2; i++) { |
| SeriesScanNode seriesScanNode = |
| new SeriesScanNode( |
| new PlanNodeId(String.format("SeriesScanNode%d", i)), |
| new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT32)); |
| typeProvider.setType(seriesScanNode.getSeriesPath().toString(), TSDataType.INT32); |
| leftOuterTimeJoinNode.addChild(seriesScanNode); |
| } |
| return leftOuterTimeJoinNode; |
| } |
| |
| /** |
| * This method will init a DeviceViewNode with @childNum alignedSeriesScanNode as children. |
| * |
| * @param childNum the number of children |
| * @return a DeviceViewNode with @childNum alignedSeriesScanNode as children |
| */ |
| private DeviceViewNode initDeviceViewNode(TypeProvider typeProvider, int childNum) |
| throws IllegalPathException { |
| DeviceViewNode deviceViewNode = |
| new DeviceViewNode(new PlanNodeId("DeviceViewNode"), null, null, null); |
| for (int i = 0; i < childNum; i++) { |
| AlignedSeriesScanNode alignedSeriesScanNode = |
| new AlignedSeriesScanNode( |
| new PlanNodeId(String.format("AlignedSeriesScanNode%d", i)), |
| new AlignedPath(String.format("root.sg.d%d", i), "s1")); |
| deviceViewNode.addChild(alignedSeriesScanNode); |
| } |
| return deviceViewNode; |
| } |
| |
| private TopKNode initTopKNode(TypeProvider typeProvider, int childNum) |
| throws IllegalPathException { |
| TopKNode topKNode = |
| new TopKNode( |
| new PlanNodeId("TopKNode"), |
| 10, |
| new OrderByParameter( |
| Arrays.asList( |
| new SortItem(OrderByKey.TIME, Ordering.ASC), |
| new SortItem(DEVICE, Ordering.ASC))), |
| Arrays.asList("Time", "Device", "s1")); |
| for (int i = 0; i < childNum; i++) { |
| SingleDeviceViewNode singleDeviceViewNode = |
| new SingleDeviceViewNode( |
| new PlanNodeId(String.format("SingleDeviceViewNode%d", i)), |
| Arrays.asList("Time", "Device", "s1"), |
| "root.sg.d" + i, |
| Arrays.asList(0, 1, 2)); |
| singleDeviceViewNode.setCacheOutputColumnNames(true); |
| SeriesScanNode seriesScanNode = |
| new SeriesScanNode( |
| new PlanNodeId(String.format("SeriesScanNode%d", i)), |
| new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT32)); |
| typeProvider.setType(seriesScanNode.getSeriesPath().toString(), TSDataType.INT32); |
| singleDeviceViewNode.addChild(seriesScanNode); |
| typeProvider.setType("Time", TSDataType.INT64); |
| typeProvider.setType("Device", TSDataType.TEXT); |
| typeProvider.setType("s1", TSDataType.INT32); |
| topKNode.addChild(singleDeviceViewNode); |
| } |
| return topKNode; |
| } |
| } |