/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.plan.planner;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.NodeRef;
import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
import org.apache.iotdb.db.queryengine.execution.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
import org.apache.iotdb.db.queryengine.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkChannel;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil;
import org.apache.iotdb.db.queryengine.execution.operator.ExplainAnalyzeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.execution.operator.process.AggregationMergeSortOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.ColumnInjectOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.FillOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProjectOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.IntoOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.MergeSortOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.ProjectOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.RawDataAggregationOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.SlidingWindowAggregationOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.TagAggregationOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.TransformOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.IFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.IFillFilter;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.ILinearFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.BinaryConstantFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.BooleanConstantFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.DoubleConstantFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.FloatConstantFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.IntConstantFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.LongConstantFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.filter.FixedIntervalFillFilter;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.filter.MonthIntervalMSFillFilter;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.filter.MonthIntervalNSFillFilter;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.filter.MonthIntervalUSFillFilter;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.identity.IdentityFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.identity.IdentityLinearFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.linear.DoubleLinearFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.linear.FloatLinearFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.linear.IntLinearFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.linear.LongLinearFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.previous.BinaryPreviousFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.previous.BooleanPreviousFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.previous.DoublePreviousFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.previous.FloatPreviousFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.previous.IntPreviousFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.previous.LongPreviousFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.HorizontallyConcatOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.InnerTimeJoinOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.DescTimeComparator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MultiColumnMerger;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.AlignedUpdateLastCacheOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.AlignedUpdateViewPathLastCacheOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryCollectOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryMergeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQuerySortOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryTransformOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.UpdateLastCacheOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.UpdateViewPathLastCacheOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.CountGroupByLevelMergeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.CountGroupByLevelScanOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.CountMergeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.NodeManageMemoryMergeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.NodePathsConvertOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.NodePathsCountOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaCountOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaFetchMergeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaFetchScanOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryMergeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryOrderByHeatOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryScanOperator;
import org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory;
import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
import org.apache.iotdb.db.queryengine.execution.operator.sink.ShuffleHelperOperator;
import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesAggregationScanOperator;
import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.queryengine.execution.operator.source.ShowQueriesOperator;
import org.apache.iotdb.db.queryengine.execution.operator.window.ConditionWindowParameter;
import org.apache.iotdb.db.queryengine.execution.operator.window.CountWindowParameter;
import org.apache.iotdb.db.queryengine.execution.operator.window.SessionWindowParameter;
import org.apache.iotdb.db.queryengine.execution.operator.window.TimeWindowParameter;
import org.apache.iotdb.db.queryengine.execution.operator.window.VariationWindowParameter;
import org.apache.iotdb.db.queryengine.execution.operator.window.WindowParameter;
import org.apache.iotdb.db.queryengine.execution.operator.window.WindowType;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer;
import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
import org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.queryengine.plan.expression.visitor.ColumnTransformerVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.ExplainAnalyzeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.LogicalViewSchemaScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.NodeManagementMemoryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.NodePathsConvertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.NodePathsCountNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.NodePathsSchemaScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.PathsUsingTemplateScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TwoChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByConditionParameter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByCountParameter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByParameter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupBySessionParameter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByVariationParameter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OutputColumn;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
import org.apache.iotdb.db.queryengine.plan.statement.literal.Literal;
import org.apache.iotdb.db.queryengine.statistics.StatisticsManager;
import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
import org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.LeafColumnTransformer;
import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFContext;
import org.apache.iotdb.db.utils.columngenerator.ColumnGenerator;
import org.apache.iotdb.db.utils.columngenerator.ColumnGeneratorType;
import org.apache.iotdb.db.utils.columngenerator.SlidingTimeColumnGenerator;
import org.apache.iotdb.db.utils.columngenerator.parameter.ColumnGeneratorParameter;
import org.apache.iotdb.db.utils.columngenerator.parameter.SlidingTimeColumnGeneratorParameter;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.TimeFilterOperators.TimeGt;
import org.apache.iotdb.tsfile.read.filter.operator.TimeFilterOperators.TimeGtEq;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TimeDuration;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.calculateMaxAggregationResultSize;
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.getOutputColumnSizePerLine;
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparator;
import static org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter;
import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
import static org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor.getAggregationTypeByFuncName;
import static org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.updateFilterUsingTTL;
import static org.apache.iotdb.db.utils.TimestampPrecisionUtils.TIMESTAMP_PRECISION;

/** This visitor is responsible for transforming a PlanNode tree into an Operator tree. */
public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionPlanContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(OperatorTreeGenerator.class);
private static final MPPDataExchangeManager MPP_DATA_EXCHANGE_MANAGER =
MPPDataExchangeService.getInstance().getMPPDataExchangeManager();
private static final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE =
DataNodeSchemaCache.getInstance();
private static final TimeComparator ASC_TIME_COMPARATOR = new AscTimeComparator();
private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator();
private static final IdentityFill IDENTITY_FILL = new IdentityFill();
private static final IdentityLinearFill IDENTITY_LINEAR_FILL = new IdentityLinearFill();
private static final Comparator<Binary> ASC_BINARY_COMPARATOR = Comparator.naturalOrder();
private static final Comparator<Binary> DESC_BINARY_COMPARATOR = Comparator.reverseOrder();
private static final String UNKNOWN_DATATYPE = "Unknown data type: ";
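
  /**
   * Fallback visit method: every supported PlanNode must be handled by a concrete visitXXX()
   * override.
   */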
@Override
public Operator visitPlan(PlanNode node, LocalExecutionPlanContext context) {
    throw new UnsupportedOperationException("should call the concrete visitXXX() method");
}
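
  /**
   * Builds a {@link SeriesScanOperator} for a non-aligned series. Limit, offset and the push-down
   * predicate are pushed into the scan when possible; a predicate that cannot be pushed down is
   * applied by a filter operator wrapped around the scan.
   */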
@Override
public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext context) {
PartialPath seriesPath = node.getSeriesPath();
SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(node, context);
scanOptionsBuilder.withAllSensors(
context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()));
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
Expression pushDownPredicate = node.getPushDownPredicate();
boolean predicateCanPushIntoScan = canPushIntoScan(pushDownPredicate);
if (pushDownPredicate != null && predicateCanPushIntoScan) {
scanOptionsBuilder.withPushDownFilter(
convertPredicateToFilter(
pushDownPredicate,
Collections.singletonList(node.getSeriesPath().getMeasurement()),
context.getTypeProvider().getTemplatedInfo() != null,
context.getTypeProvider()));
}
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SeriesScanOperator.class.getSimpleName());
operatorContext.recordSpecifiedInfo("SeriesPath", seriesPath.getFullPath());
SeriesScanOperator seriesScanOperator =
new SeriesScanOperator(
operatorContext,
node.getPlanNodeId(),
seriesPath,
node.getScanOrder(),
scanOptionsBuilder.build());
((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
context.getDriverContext().setInputDriver(true);
if (!predicateCanPushIntoScan) {
checkState(!context.isBuildPlanUseTemplate(), "Push down predicate is not supported yet");
return constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
Collections.singletonList(ExpressionFactory.timeSeries(node.getSeriesPath()))
.toArray(new Expression[0]),
Collections.singletonList(node.getSeriesPath().getSeriesType()),
makeLayout(Collections.singletonList(node)),
false,
node.getPlanNodeId(),
node.getScanOrder(),
context);
}
return seriesScanOperator;
}
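
  /**
   * Builds an {@link AlignedSeriesScanOperator}. Mirrors {@link #visitSeriesScan} but resolves
   * measurement lists and data types per sub-sensor of the aligned device, honoring templated
   * plans where applicable.
   */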
@Override
public Operator visitAlignedSeriesScan(
AlignedSeriesScanNode node, LocalExecutionPlanContext context) {
AlignedPath seriesPath = node.getAlignedPath();
SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(node, context);
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
scanOptionsBuilder.withAllSensors(
new HashSet<>(
context.isBuildPlanUseTemplate()
? context.getTemplatedInfo().getMeasurementList()
: seriesPath.getMeasurementList()));
Expression pushDownPredicate = node.getPushDownPredicate();
boolean predicateCanPushIntoScan = canPushIntoScan(pushDownPredicate);
if (pushDownPredicate != null && predicateCanPushIntoScan) {
scanOptionsBuilder.withPushDownFilter(
convertPredicateToFilter(
pushDownPredicate,
node.getAlignedPath().getMeasurementList(),
context.getTypeProvider().getTemplatedInfo() != null,
context.getTypeProvider()));
}
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
AlignedSeriesScanOperator.class.getSimpleName());
int maxTsBlockLineNum = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
if (context.getTypeProvider().getTemplatedInfo() != null) {
maxTsBlockLineNum =
(int)
Math.min(
context.getTypeProvider().getTemplatedInfo().getLimitValue(), maxTsBlockLineNum);
}
AlignedSeriesScanOperator seriesScanOperator =
new AlignedSeriesScanOperator(
operatorContext,
node.getPlanNodeId(),
seriesPath,
node.getScanOrder(),
scanOptionsBuilder.build(),
node.isQueryAllSensors(),
context.getTypeProvider().getTemplatedInfo() != null
? context.getTypeProvider().getTemplatedInfo().getDataTypes()
: null,
maxTsBlockLineNum);
((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
context.getDriverContext().setInputDriver(true);
if (!predicateCanPushIntoScan) {
if (context.isBuildPlanUseTemplate()) {
TemplatedInfo templatedInfo = context.getTemplatedInfo();
return constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
templatedInfo.getProjectExpressions(),
templatedInfo.getDataTypes(),
templatedInfo.getLayoutMap(),
templatedInfo.isKeepNull(),
node.getPlanNodeId(),
templatedInfo.getScanOrder(),
context);
}
AlignedPath alignedPath = node.getAlignedPath();
List<Expression> expressions = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
for (int i = 0; i < alignedPath.getMeasurementList().size(); i++) {
expressions.add(ExpressionFactory.timeSeries(alignedPath.getSubMeasurementPath(i)));
dataTypes.add(alignedPath.getSubMeasurementDataType(i));
}
return constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
expressions.toArray(new Expression[0]),
dataTypes,
makeLayout(Collections.singletonList(node)),
false,
node.getPlanNodeId(),
node.getScanOrder(),
context);
}
return seriesScanOperator;
}
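
  /** A null predicate can always be pushed into the scan; otherwise PredicateUtils decides. */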
private boolean canPushIntoScan(Expression pushDownPredicate) {
return pushDownPredicate == null || PredicateUtils.predicateCanPushIntoScan(pushDownPredicate);
}
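
  /**
   * Builds a {@link ProjectOperator} that keeps only the requested output columns; if the child's
   * output already matches, the child operator is returned unchanged.
   */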
@Override
public Operator visitProject(ProjectNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
ProjectOperator.class.getSimpleName());
List<String> inputColumnNames;
List<String> outputColumnNames = node.getOutputColumnNames();
if (outputColumnNames == null) {
outputColumnNames = context.getTypeProvider().getTemplatedInfo().getSelectMeasurements();
// skip device column
outputColumnNames = outputColumnNames.subList(1, outputColumnNames.size());
inputColumnNames = context.getTypeProvider().getTemplatedInfo().getMeasurementList();
} else {
inputColumnNames = node.getChild().getOutputColumnNames();
}
if (inputColumnNames.equals(outputColumnNames)) {
// no need to project
return child;
}
List<Integer> remainingColumnIndexList = new ArrayList<>();
for (String outputColumnName : outputColumnNames) {
int index = inputColumnNames.indexOf(outputColumnName);
if (index < 0) {
throw new IllegalStateException(
String.format("Cannot find column [%s] in child's output", outputColumnName));
}
remainingColumnIndexList.add(index);
}
return new ProjectOperator(operatorContext, child, remainingColumnIndexList);
}
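
  /**
   * Builds a {@link SeriesAggregationScanOperator} with one {@link Aggregator} per aggregation
   * descriptor, iterating over the (optional) GROUP BY TIME ranges.
   */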
@Override
public Operator visitSeriesAggregationScan(
SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
PartialPath seriesPath = node.getSeriesPath();
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
List<Aggregator> aggregators = new ArrayList<>();
aggregationDescriptors.forEach(
o ->
aggregators.add(
new Aggregator(
AccumulatorFactory.createAccumulator(
o.getAggregationFuncName(),
o.getAggregationType(),
Collections.singletonList(node.getSeriesPath().getSeriesType()),
o.getInputExpressions(),
o.getInputAttributes(),
ascending,
o.getStep().isInputRaw()),
o.getStep())));
GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, true);
long maxReturnSize =
AggregationUtil.calculateMaxAggregationResultSize(
node.getAggregationDescriptorList(), timeRangeIterator, context.getTypeProvider());
SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(node, context);
scanOptionsBuilder.withAllSensors(
context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()));
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SeriesAggregationScanOperator.class.getSimpleName());
SeriesAggregationScanOperator aggregateScanOperator =
new SeriesAggregationScanOperator(
node.getPlanNodeId(),
seriesPath,
node.getScanOrder(),
node.isOutputEndTime(),
scanOptionsBuilder.build(),
operatorContext,
aggregators,
timeRangeIterator,
node.getGroupByTimeParameter(),
maxReturnSize);
((DataDriverContext) context.getDriverContext()).addSourceOperator(aggregateScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
context.getDriverContext().setInputDriver(true);
return aggregateScanOperator;
}
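
  /**
   * Builds an {@link AlignedSeriesAggregationScanOperator}. Each aggregator is bound to the column
   * index of its input sub-measurement, or to the timestamp column (index -1).
   */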
@Override
public Operator visitAlignedSeriesAggregationScan(
AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext context) {
AlignedPath seriesPath = node.getAlignedPath();
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
      checkArgument(
          descriptor.getInputExpressions().size() == 1,
          "descriptor's input expression size must be 1");
Expression expression = descriptor.getInputExpressions().get(0);
if (expression instanceof TimeSeriesOperand) {
String inputSeries =
((TimeSeriesOperand) (descriptor.getInputExpressions().get(0)))
.getPath()
.getMeasurement();
int seriesIndex = seriesPath.getMeasurementList().indexOf(inputSeries);
TSDataType seriesDataType =
seriesPath.getMeasurementSchema().getSubMeasurementsTSDataTypeList().get(seriesIndex);
aggregators.add(
new Aggregator(
AccumulatorFactory.createAccumulator(
descriptor.getAggregationFuncName(),
descriptor.getAggregationType(),
Collections.singletonList(seriesDataType),
descriptor.getInputExpressions(),
descriptor.getInputAttributes(),
ascending,
descriptor.getStep().isInputRaw()),
descriptor.getStep(),
Collections.singletonList(
new InputLocation[] {new InputLocation(0, seriesIndex)})));
} else if (expression instanceof TimestampOperand) {
aggregators.add(
new Aggregator(
AccumulatorFactory.createAccumulator(
descriptor.getAggregationFuncName(),
descriptor.getAggregationType(),
Collections.singletonList(TSDataType.INT64),
descriptor.getInputExpressions(),
descriptor.getInputAttributes(),
ascending,
descriptor.getStep().isInputRaw()),
descriptor.getStep(),
Collections.singletonList(new InputLocation[] {new InputLocation(0, -1)})));
      } else {
        throw new IllegalArgumentException(
            "descriptor's input expression must be a TimeSeriesOperand or TimestampOperand, but was "
                + expression);
      }
}
GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, true);
long maxReturnSize =
AggregationUtil.calculateMaxAggregationResultSize(
node.getAggregationDescriptorList(), timeRangeIterator, context.getTypeProvider());
SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(node, context);
scanOptionsBuilder.withAllSensors(new HashSet<>(seriesPath.getMeasurementList()));
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
AlignedSeriesAggregationScanOperator.class.getSimpleName());
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
node.getPlanNodeId(),
seriesPath,
node.getScanOrder(),
node.isOutputEndTime(),
scanOptionsBuilder.build(),
operatorContext,
aggregators,
timeRangeIterator,
groupByTimeParameter,
maxReturnSize);
((DataDriverContext) context.getDriverContext())
.addSourceOperator(seriesAggregationScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
context.getDriverContext().setInputDriver(true);
return seriesAggregationScanOperator;
}
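
  /** Creates scan options pre-populated with a copy of the global time filter, if present. */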
private SeriesScanOptions.Builder getSeriesScanOptionsBuilder(
SeriesSourceNode node, LocalExecutionPlanContext context) {
SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
Filter globalTimeFilter = context.getGlobalTimeFilter();
if (globalTimeFilter != null) {
// time filter may be stateful, so we need to copy it
scanOptionsBuilder.withGlobalTimeFilter(globalTimeFilter.copy());
}
return scanOptionsBuilder;
}
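
  /** Merges children's schema query results for ORDER BY HEAT schema queries. */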
@Override
public Operator visitSchemaQueryOrderByHeat(
SchemaQueryOrderByHeatNode node, LocalExecutionPlanContext context) {
List<Operator> children =
node.getChildren().stream().map(n -> n.accept(this, context)).collect(Collectors.toList());
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryOrderByHeatOperator.class.getSimpleName());
return new SchemaQueryOrderByHeatOperator(operatorContext, children);
}
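
  /** Dispatches a generic schema scan node to the visit method of its concrete subtype. */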
@Override
public Operator visitSchemaQueryScan(
SchemaQueryScanNode node, LocalExecutionPlanContext context) {
if (node instanceof TimeSeriesSchemaScanNode) {
return visitTimeSeriesSchemaScan((TimeSeriesSchemaScanNode) node, context);
} else if (node instanceof DevicesSchemaScanNode) {
return visitDevicesSchemaScan((DevicesSchemaScanNode) node, context);
} else if (node instanceof DevicesCountNode) {
return visitDevicesCount((DevicesCountNode) node, context);
} else if (node instanceof TimeSeriesCountNode) {
return visitTimeSeriesCount((TimeSeriesCountNode) node, context);
} else if (node instanceof LevelTimeSeriesCountNode) {
return visitLevelTimeSeriesCount((LevelTimeSeriesCountNode) node, context);
} else if (node instanceof NodePathsSchemaScanNode) {
return visitNodePathsSchemaScan((NodePathsSchemaScanNode) node, context);
} else if (node instanceof PathsUsingTemplateScanNode) {
return visitPathsUsingTemplateScan((PathsUsingTemplateScanNode) node, context);
} else if (node instanceof LogicalViewSchemaScanNode) {
return visitLogicalViewSchemaScan((LogicalViewSchemaScanNode) node, context);
}
return visitPlan(node, context);
}
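
  /** Scans time series schema metadata matching the given path pattern and schema filter. */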
@Override
public Operator visitTimeSeriesSchemaScan(
TimeSeriesSchemaScanNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryScanOperator.class.getSimpleName());
return new SchemaQueryScanOperator<>(
node.getPlanNodeId(),
operatorContext,
SchemaSourceFactory.getTimeSeriesSchemaScanSource(
node.getPath(),
node.isPrefixPath(),
node.getLimit(),
node.getOffset(),
node.getSchemaFilter(),
node.getTemplateMap(),
node.getScope()));
}
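
  /** Scans device schema metadata matching the given path pattern. */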
@Override
public Operator visitDevicesSchemaScan(
DevicesSchemaScanNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryScanOperator.class.getSimpleName());
return new SchemaQueryScanOperator<>(
node.getPlanNodeId(),
operatorContext,
SchemaSourceFactory.getDeviceSchemaSource(
node.getPath(),
node.isPrefixPath(),
node.getLimit(),
node.getOffset(),
node.isHasSgCol(),
node.getSchemaFilter(),
node.getScope()));
}
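
  /** Merges schema query results from children, consuming them one by one. */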
@Override
public Operator visitSchemaQueryMerge(
SchemaQueryMergeNode node, LocalExecutionPlanContext context) {
List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryMergeOperator.class.getSimpleName());
return new SchemaQueryMergeOperator(operatorContext, children);
}
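
  /**
   * Merges count results from children; per-level time series counts need the dedicated
   * group-by-level merge operator.
   */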
@Override
public Operator visitCountMerge(CountSchemaMergeNode node, LocalExecutionPlanContext context) {
List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
CountMergeOperator.class.getSimpleName());
if (node.getChildren().get(0) instanceof LevelTimeSeriesCountNode) {
return new CountGroupByLevelMergeOperator(operatorContext, children);
} else {
return new CountMergeOperator(operatorContext, children);
}
}
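
  /** Counts devices matching the given path pattern. */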
@Override
public Operator visitDevicesCount(DevicesCountNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaCountOperator.class.getSimpleName());
return new SchemaCountOperator<>(
node.getPlanNodeId(),
operatorContext,
SchemaSourceFactory.getDeviceSchemaSource(
node.getPath(), node.isPrefixPath(), node.getScope()));
}
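
  /** Counts time series matching the given path pattern and schema filter. */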
@Override
public Operator visitTimeSeriesCount(
TimeSeriesCountNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaCountOperator.class.getSimpleName());
return new SchemaCountOperator<>(
node.getPlanNodeId(),
operatorContext,
SchemaSourceFactory.getTimeSeriesSchemaCountSource(
node.getPath(),
node.isPrefixPath(),
node.getSchemaFilter(),
node.getTemplateMap(),
node.getScope()));
}
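
  /** Counts time series grouped by the given path level. */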
@Override
public Operator visitLevelTimeSeriesCount(
LevelTimeSeriesCountNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
CountGroupByLevelScanOperator.class.getSimpleName());
return new CountGroupByLevelScanOperator<>(
node.getPlanNodeId(),
operatorContext,
node.getLevel(),
SchemaSourceFactory.getTimeSeriesSchemaCountSource(
node.getPath(),
node.isPrefixPath(),
node.getSchemaFilter(),
node.getTemplateMap(),
node.getScope()));
}
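
  /** Scans child node paths under the given prefix at the given level. */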
@Override
public Operator visitNodePathsSchemaScan(
NodePathsSchemaScanNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryScanOperator.class.getSimpleName());
return new SchemaQueryScanOperator<>(
node.getPlanNodeId(),
operatorContext,
SchemaSourceFactory.getNodeSchemaSource(
node.getPrefixPath(), node.getLevel(), node.getScope()));
}
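
  /** Merges the child's node management results with the data already collected in memory. */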
@Override
public Operator visitNodeManagementMemoryMerge(
NodeManagementMemoryMergeNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
NodeManageMemoryMergeOperator.class.getSimpleName());
return new NodeManageMemoryMergeOperator(operatorContext, node.getData(), child);
}
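
  /** Wraps the child with a {@link NodePathsConvertOperator} to convert its node-path output. */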
@Override
public Operator visitNodePathConvert(
NodePathsConvertNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
NodePathsConvertOperator.class.getSimpleName());
return new NodePathsConvertOperator(operatorContext, child);
}
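
  /** Counts the node paths produced by the child. */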
@Override
public Operator visitNodePathsCount(NodePathsCountNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
NodePathsCountOperator.class.getSimpleName());
return new NodePathsCountOperator(operatorContext, child);
}
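
  /**
   * Wraps the child with a {@link SingleDeviceViewOperator} that prepends the device column. The
   * output column types come from the type provider or from data types cached by an upstream node.
   */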
@Override
public Operator visitSingleDeviceView(
SingleDeviceViewNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SingleDeviceViewOperator.class.getSimpleName());
Operator child = node.getChild().accept(this, context);
List<Integer> deviceColumnIndex = node.getDeviceToMeasurementIndexes();
List<TSDataType> outputColumnTypes =
node.isCacheOutputColumnNames()
? getOutputColumnTypes(node, context.getTypeProvider())
: context.getCachedDataTypes();
if (outputColumnTypes == null || outputColumnTypes.isEmpty()) {
      throw new IllegalStateException("outputColumnTypes should not be null or empty");
}
return new SingleDeviceViewOperator(
operatorContext, node.getDevice(), child, deviceColumnIndex, outputColumnTypes);
}
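
  /**
   * Builds a {@link DeviceViewOperator} that concatenates per-device children and maps each
   * device's measurement columns into the unified output layout.
   */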
@Override
public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
DeviceViewOperator.class.getSimpleName());
List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context);
List<List<Integer>> deviceColumnIndex =
node.getDevices().stream()
.map(deviceName -> node.getDeviceToMeasurementIndexesMap().get(deviceName))
.collect(Collectors.toList());
List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
return new DeviceViewOperator(
operatorContext, node.getDevices(), children, deviceColumnIndex, outputColumnTypes);
}
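
  /**
   * Builds a {@link MergeSortOperator} over pipeline-breaking children, comparing rows by the
   * resolved sort-item column indexes and data types.
   */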
@Override
public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
MergeSortOperator.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
context.setCachedDataTypes(dataTypes);
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
List<SortItem> sortItemList = node.getMergeOrderParameter().getSortItemList();
List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size());
genSortInformation(
node.getOutputColumnNames(),
dataTypes,
sortItemList,
sortItemIndexList,
sortItemDataTypeList);
return new MergeSortOperator(
operatorContext,
children,
dataTypes,
getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
}
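
  /**
   * Builds an {@link AggregationMergeSortOperator} that merges partial aggregation results across
   * devices; only supported for 'order by device' queries with 'align by device'.
   */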
@Override
public Operator visitAggregationMergeSort(
AggregationMergeSortNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
MergeSortOperator.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
List<SortItem> sortItemList = node.getMergeOrderParameter().getSortItemList();
if (!sortItemList.get(0).getSortKey().equalsIgnoreCase("Device")) {
      throw new IllegalArgumentException(
          "AggregationMergeSortNode is only supported by 'order by device' queries with 'align by device'.");
}
boolean timeAscending = true;
for (SortItem sortItem : sortItemList) {
if (TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(sortItem.getSortKey())
&& (sortItem.getOrdering() == Ordering.DESC)) {
timeAscending = false;
break;
}
}
List<Accumulator> accumulators = new ArrayList<>();
for (Expression expression : node.getSelectExpressions()) {
if (expression instanceof FunctionExpression) {
FunctionExpression functionExpression = (FunctionExpression) expression;
String aggregationName = functionExpression.getFunctionName();
Accumulator accumulator =
AccumulatorFactory.createAccumulator(
aggregationName,
getAggregationTypeByFuncName(aggregationName),
Collections.singletonList(
context.getTypeProvider().getType(functionExpression.getOutputSymbol())),
functionExpression.getExpressions(),
functionExpression.getFunctionAttributes(),
timeAscending,
false);
accumulators.add(accumulator);
}
}
List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size());
genSortInformation(
node.getOutputColumnNames(),
dataTypes,
sortItemList,
sortItemIndexList,
sortItemDataTypeList);
return new AggregationMergeSortOperator(
operatorContext,
children,
dataTypes,
accumulators,
node.isHasGroupBy(),
getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
}
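
  /**
   * Builds a {@link TopKOperator}. The final flag tells the operator whether its input is already
   * time-ordered, i.e. the sort keys are limited to time and device, led by time.
   */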
@Override
public Operator visitTopK(TopKNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
TopKOperator.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
context.setCachedDataTypes(dataTypes);
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
List<SortItem> sortItemList = node.getMergeOrderParameter().getSortItemList();
List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size());
genSortInformation(
node.getOutputColumnNames(),
dataTypes,
sortItemList,
sortItemIndexList,
sortItemDataTypeList);
return new TopKOperator(
operatorContext,
children,
dataTypes,
getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList),
node.getTopValue(),
!sortItemList.isEmpty()
&& sortItemList.get(0).getSortKey().equalsIgnoreCase(OrderByKey.TIME)
&& sortItemList.stream()
.allMatch(
i ->
(i.getSortKey().equals(OrderByKey.TIME)
|| i.getSortKey().equals(OrderByKey.DEVICE))));
}
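
  /**
   * Resolves each sort item to a column index and data type: -1 with INT64 for the time column,
   * and -2 with a null type when no matching output column exists.
   */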
private void genSortInformation(
List<String> outputColumnNames,
List<TSDataType> dataTypes,
List<SortItem> sortItemList,
List<Integer> sortItemIndexList,
List<TSDataType> sortItemDataTypeList) {
sortItemList.forEach(
sortItem -> {
if (sortItem.getSortKey().equals(OrderByKey.TIME)) {
sortItemIndexList.add(-1);
sortItemDataTypeList.add(TSDataType.INT64);
} else {
for (int i = 0; i < outputColumnNames.size(); i++) {
if (sortItem.getSortKey().equalsIgnoreCase(outputColumnNames.get(i))) {
sortItemIndexList.add(i);
sortItemDataTypeList.add(dataTypes.get(i));
break;
}
// there is no related column in outputColumnNames
if (i == outputColumnNames.size() - 1) {
sortItemIndexList.add(-2);
sortItemDataTypeList.add(null);
}
}
}
});
}
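
  /** Applies the FILL clause on top of the child operator. */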
@Override
public Operator visitFill(FillNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
return getFillOperator(node, context, child);
}
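
  /**
   * Chooses the fill operator by policy: constant VALUE fill, PREVIOUS fill (optionally bounded by
   * a time-duration threshold), or LINEAR interpolation.
   */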
private ProcessOperator getFillOperator(
FillNode node, LocalExecutionPlanContext context, Operator child) {
FillDescriptor descriptor = node.getFillDescriptor();
List<TSDataType> inputDataTypes =
getOutputColumnTypes(node.getChild(), context.getTypeProvider());
int inputColumns = inputDataTypes.size();
FillPolicy fillPolicy = descriptor.getFillPolicy();
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
FillOperator.class.getSimpleName());
switch (fillPolicy) {
case VALUE:
Literal literal = descriptor.getFillValue();
return new FillOperator(
operatorContext, getConstantFill(inputColumns, inputDataTypes, literal), child);
case PREVIOUS:
return new FillOperator(
operatorContext,
getPreviousFill(
inputColumns,
inputDataTypes,
descriptor.getTimeDurationThreshold(),
context.getZoneId()),
child);
case LINEAR:
return new LinearFillOperator(
operatorContext, getLinearFill(inputColumns, inputDataTypes), child);
default:
throw new IllegalArgumentException("Unknown fill policy: " + fillPolicy);
}
}
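
  /**
   * Creates one constant fill per column; columns whose data type does not match the fill literal
   * fall back to identity fill.
   */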
private IFill[] getConstantFill(
int inputColumns, List<TSDataType> inputDataTypes, Literal literal) {
IFill[] constantFill = new IFill[inputColumns];
for (int i = 0; i < inputColumns; i++) {
if (!literal.isDataTypeConsistency(inputDataTypes.get(i))) {
constantFill[i] = IDENTITY_FILL;
continue;
}
switch (inputDataTypes.get(i)) {
case BOOLEAN:
constantFill[i] = new BooleanConstantFill(literal.getBoolean());
break;
case TEXT:
constantFill[i] = new BinaryConstantFill(literal.getBinary());
break;
case INT32:
constantFill[i] = new IntConstantFill(literal.getInt());
break;
case INT64:
constantFill[i] = new LongConstantFill(literal.getLong());
break;
case FLOAT:
constantFill[i] = new FloatConstantFill(literal.getFloat());
break;
case DOUBLE:
constantFill[i] = new DoubleConstantFill(literal.getDouble());
break;
default:
throw new IllegalArgumentException(UNKNOWN_DATATYPE + inputDataTypes.get(i));
}
}
return constantFill;
}
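
  /**
   * Creates one previous fill per column. A non-null threshold becomes an {@link IFillFilter}:
   * fixed-interval when the duration contains no month component, otherwise month-aware for the
   * configured timestamp precision.
   */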
private IFill[] getPreviousFill(
int inputColumns,
List<TSDataType> inputDataTypes,
TimeDuration timeDurationThreshold,
ZoneId zoneId) {
IFillFilter filter;
if (timeDurationThreshold == null) {
filter = IFillFilter.TRUE;
} else if (!timeDurationThreshold.containsMonth()) {
filter = new FixedIntervalFillFilter(timeDurationThreshold.nonMonthDuration);
} else {
switch (TIMESTAMP_PRECISION) {
case "ms":
filter =
new MonthIntervalMSFillFilter(
timeDurationThreshold.monthDuration,
timeDurationThreshold.nonMonthDuration,
zoneId);
break;
case "us":
filter =
new MonthIntervalUSFillFilter(
timeDurationThreshold.monthDuration,
timeDurationThreshold.nonMonthDuration,
zoneId);
break;
case "ns":
filter =
new MonthIntervalNSFillFilter(
timeDurationThreshold.monthDuration,
timeDurationThreshold.nonMonthDuration,
zoneId);
break;
        default:
          // unreachable: TIMESTAMP_PRECISION can only be ms, us, or ns
          throw new UnsupportedOperationException(
              "unsupported time_precision: " + TIMESTAMP_PRECISION);
}
}
IFill[] previousFill = new IFill[inputColumns];
for (int i = 0; i < inputColumns; i++) {
switch (inputDataTypes.get(i)) {
case BOOLEAN:
previousFill[i] = new BooleanPreviousFill(filter);
break;
case TEXT:
previousFill[i] = new BinaryPreviousFill(filter);
break;
case INT32:
previousFill[i] = new IntPreviousFill(filter);
break;
case INT64:
previousFill[i] = new LongPreviousFill(filter);
break;
case FLOAT:
previousFill[i] = new FloatPreviousFill(filter);
break;
case DOUBLE:
previousFill[i] = new DoublePreviousFill(filter);
break;
default:
throw new IllegalArgumentException(UNKNOWN_DATATYPE + inputDataTypes.get(i));
}
}
return previousFill;
}
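
  /**
   * Creates one linear fill per numeric column; BOOLEAN and TEXT columns cannot be interpolated
   * and fall back to identity fill.
   */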
private ILinearFill[] getLinearFill(int inputColumns, List<TSDataType> inputDataTypes) {
ILinearFill[] linearFill = new ILinearFill[inputColumns];
for (int i = 0; i < inputColumns; i++) {
switch (inputDataTypes.get(i)) {
case INT32:
linearFill[i] = new IntLinearFill();
break;
case INT64:
linearFill[i] = new LongLinearFill();
break;
case FLOAT:
linearFill[i] = new FloatLinearFill();
break;
case DOUBLE:
linearFill[i] = new DoubleLinearFill();
break;
case BOOLEAN:
case TEXT:
linearFill[i] = IDENTITY_LINEAR_FILL;
break;
default:
throw new IllegalArgumentException(UNKNOWN_DATATYPE + inputDataTypes.get(i));
}
}
return linearFill;
}
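
  /**
   * Builds a {@link FilterAndProjectOperator} when all project expressions are mappable
   * (evaluable row by row); falls back to the heavier {@link TransformOperator} when a
   * non-mappable UDF is present.
   */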
@Override
public Operator visitTransform(TransformNode node, LocalExecutionPlanContext context) {
final OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
TransformOperator.class.getSimpleName());
final Operator inputOperator = generateOnlyChildOperator(node, context);
final List<TSDataType> inputDataTypes = getInputColumnTypes(node, context.getTypeProvider());
final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
final Expression[] projectExpressions = node.getOutputExpressions();
final Map<NodeRef<Expression>, TSDataType> expressionTypes = new HashMap<>();
for (Expression projectExpression : projectExpressions) {
ExpressionTypeAnalyzer.analyzeExpression(expressionTypes, projectExpression);
}
boolean hasNonMappableUDF = false;
for (Expression expression : projectExpressions) {
if (!expression.isMappable(expressionTypes)) {
hasNonMappableUDF = true;
break;
}
}
// Use FilterAndProject Operator when project expressions are all mappable
if (!hasNonMappableUDF) {
// init project UDTFContext
UDTFContext projectContext = new UDTFContext(context.getZoneId());
projectContext.constructUdfExecutors(projectExpressions);
List<ColumnTransformer> projectOutputTransformerList = new ArrayList<>();
Map<Expression, ColumnTransformer> projectExpressionColumnTransformerMap = new HashMap<>();
// records LeafColumnTransformer of project expressions
List<LeafColumnTransformer> projectLeafColumnTransformerList = new ArrayList<>();
ColumnTransformerVisitor visitor = new ColumnTransformerVisitor();
ColumnTransformerVisitor.ColumnTransformerVisitorContext projectColumnTransformerContext =
new ColumnTransformerVisitor.ColumnTransformerVisitorContext(
projectContext,
expressionTypes,
projectLeafColumnTransformerList,
inputLocations,
projectExpressionColumnTransformerMap,
ImmutableMap.of(),
ImmutableList.of(),
inputDataTypes,
inputLocations.size() - 1,
null);
for (Expression expression : projectExpressions) {
projectOutputTransformerList.add(
visitor.process(expression, projectColumnTransformerContext));
}
return new FilterAndProjectOperator(
operatorContext,
inputOperator,
inputDataTypes,
ImmutableList.of(),
null,
ImmutableList.of(),
projectLeafColumnTransformerList,
projectOutputTransformerList,
false,
false);
}
try {
return new TransformOperator(
operatorContext,
inputOperator,
inputDataTypes,
inputLocations,
node.getOutputExpressions(),
node.isKeepNull(),
context.getZoneId(),
expressionTypes,
node.getScanOrder() == Ordering.ASC);
} catch (QueryProcessException e) {
throw new RuntimeException(e);
}
}
@Override
public Operator visitFilter(FilterNode node, LocalExecutionPlanContext context) {
if (context.isBuildPlanUseTemplate()) {
TemplatedInfo templatedInfo = context.getTemplatedInfo();
return constructFilterOperator(
node.getPredicate(),
generateOnlyChildOperator(node, context),
templatedInfo.getProjectExpressions(),
templatedInfo.getDataTypes(),
templatedInfo.getLayoutMap(),
templatedInfo.isKeepNull(),
node.getPlanNodeId(),
templatedInfo.getScanOrder(),
context);
}
return constructFilterOperator(
node.getPredicate(),
generateOnlyChildOperator(node, context),
node.getOutputExpressions(),
getInputColumnTypes(node, context.getTypeProvider()),
makeLayout(node),
node.isKeepNull(),
node.getPlanNodeId(),
node.getScanOrder(),
context);
}
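// Builds a FilterAndProjectOperator that evaluates the predicate and, when every project
// expression is mappable, the projection as well. The predicate itself must always be mappable;
// if any project expression contains a non-mappable UDF, the projection is deferred to a
// TransformOperator wrapped around the filter at the tail of this method.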
private Operator constructFilterOperator(
Expression predicate,
Operator inputOperator,
Expression[] projectExpressions,
List<TSDataType> inputDataTypes,
Map<String, List<InputLocation>> inputLocations,
boolean isKeepNull,
PlanNodeId planNodeId,
Ordering scanOrder,
LocalExecutionPlanContext context) {
final Map<NodeRef<Expression>, TSDataType> expressionTypes = new HashMap<>();
if (context.isBuildPlanUseTemplate()) {
ExpressionTypeAnalyzer.analyzeExpressionUsingTemplatedInfo(
expressionTypes, predicate, context.getTypeProvider().getTemplatedInfo());
} else {
ExpressionTypeAnalyzer.analyzeExpression(expressionTypes, predicate);
}
// check whether the predicate contains a non-mappable UDF
if (!predicate.isMappable(expressionTypes)) {
throw new UnsupportedOperationException("Filter cannot contain non-mappable UDFs");
}
final List<TSDataType> filterOutputDataTypes = new ArrayList<>(inputDataTypes);
for (Expression projectExpression : projectExpressions) {
if (context.isBuildPlanUseTemplate()) {
ExpressionTypeAnalyzer.analyzeExpressionUsingTemplatedInfo(
expressionTypes, projectExpression, context.getTypeProvider().getTemplatedInfo());
} else {
ExpressionTypeAnalyzer.analyzeExpression(expressionTypes, projectExpression);
}
}
boolean hasNonMappableUdf = false;
for (Expression expression : projectExpressions) {
if (!expression.isMappable(expressionTypes)) {
hasNonMappableUdf = true;
break;
}
}
// init UDTFContext
UDTFContext filterContext = new UDTFContext(context.getZoneId());
filterContext.constructUdfExecutors(new Expression[] {predicate});
// records LeafColumnTransformer of filter
List<LeafColumnTransformer> filterLeafColumnTransformerList = new ArrayList<>();
// records common ColumnTransformer between filter and project expressions
List<ColumnTransformer> commonTransformerList = new ArrayList<>();
// records LeafColumnTransformer of project expressions
List<LeafColumnTransformer> projectLeafColumnTransformerList = new ArrayList<>();
// records subexpression -> ColumnTransformer for filter
Map<Expression, ColumnTransformer> filterExpressionColumnTransformerMap = new HashMap<>();
ColumnTransformerVisitor visitor = new ColumnTransformerVisitor();
ColumnTransformerVisitor.ColumnTransformerVisitorContext filterColumnTransformerContext =
new ColumnTransformerVisitor.ColumnTransformerVisitorContext(
filterContext,
expressionTypes,
filterLeafColumnTransformerList,
inputLocations,
filterExpressionColumnTransformerMap,
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
0,
null);
ColumnTransformer filterOutputTransformer =
visitor.process(predicate, filterColumnTransformerContext);
List<ColumnTransformer> projectOutputTransformerList = new ArrayList<>();
Map<Expression, ColumnTransformer> projectExpressionColumnTransformerMap = new HashMap<>();
// init project transformer when project expressions are all mappable
if (!hasNonMappableUdf) {
// init project UDTFContext
UDTFContext projectContext = new UDTFContext(context.getZoneId());
projectContext.constructUdfExecutors(projectExpressions);
ColumnTransformerVisitor.ColumnTransformerVisitorContext projectColumnTransformerContext =
new ColumnTransformerVisitor.ColumnTransformerVisitorContext(
projectContext,
expressionTypes,
projectLeafColumnTransformerList,
inputLocations,
projectExpressionColumnTransformerMap,
filterExpressionColumnTransformerMap,
commonTransformerList,
filterOutputDataTypes,
inputLocations.size() - 1,
null);
for (Expression expression : projectExpressions) {
projectOutputTransformerList.add(
visitor.process(expression, projectColumnTransformerContext));
}
}
final OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
planNodeId,
FilterAndProjectOperator.class.getSimpleName());
Operator filter =
new FilterAndProjectOperator(
operatorContext,
inputOperator,
filterOutputDataTypes,
filterLeafColumnTransformerList,
filterOutputTransformer,
commonTransformerList,
projectLeafColumnTransformerList,
projectOutputTransformerList,
hasNonMappableUdf,
true);
// Project expressions contain no non-mappable UDFs, so no TransformOperator is needed
if (!hasNonMappableUdf) {
return filter;
}
// There is a non-mappable UDF, so wrap the filter in a TransformOperator for further calculation
try {
final OperatorContext transformContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(), planNodeId, TransformOperator.class.getSimpleName());
return new TransformOperator(
transformContext,
filter,
inputDataTypes,
inputLocations,
projectExpressions,
isKeepNull,
context.getZoneId(),
expressionTypes,
scanOrder == Ordering.ASC);
} catch (QueryProcessException e) {
throw new RuntimeException(e);
}
}
@Override
public Operator visitGroupByLevel(GroupByLevelNode node, LocalExecutionPlanContext context) {
checkArgument(
!node.getGroupByLevelDescriptors().isEmpty(),
"GroupByLevel descriptorList cannot be empty");
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
List<CrossSeriesAggregationDescriptor> aggregationDescriptors =
node.getGroupByLevelDescriptors();
for (CrossSeriesAggregationDescriptor descriptor : aggregationDescriptors) {
List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
// Use the first set of input expressions to infer the input data types
List<TSDataType> inputDataTypes =
IntStream.range(0, descriptor.getExpressionNumOfOneInput())
.mapToObj(
x ->
context
.getTypeProvider()
.getType(descriptor.getInputExpressions().get(x).getExpressionString()))
.collect(Collectors.toList());
aggregators.add(
new Aggregator(
AccumulatorFactory.createAccumulator(
descriptor.getAggregationFuncName(),
descriptor.getAggregationType(),
inputDataTypes,
descriptor.getInputExpressions(),
descriptor.getInputAttributes(),
ascending,
descriptor.getStep().isInputRaw()),
descriptor.getStep(),
inputLocationList));
}
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
AggregationOperator.class.getSimpleName());
GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, false);
long maxReturnSize =
calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
return new AggregationOperator(
operatorContext, aggregators, timeRangeIterator, children, false, maxReturnSize);
}
@Override
public Operator visitGroupByTag(GroupByTagNode node, LocalExecutionPlanContext context) {
checkArgument(!node.getTagKeys().isEmpty(), "GroupByTag tag keys cannot be empty");
checkArgument(
!node.getTagValuesToAggregationDescriptors().isEmpty(),
"GroupByTag aggregation descriptors cannot be empty");
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
boolean ascending = node.getScanOrder() == Ordering.ASC;
Map<String, List<InputLocation>> layout = makeLayout(node);
List<List<String>> groups = new ArrayList<>();
List<List<Aggregator>> groupedAggregators = new ArrayList<>();
int aggregatorCount = 0;
for (Map.Entry<List<String>, List<CrossSeriesAggregationDescriptor>> entry :
node.getTagValuesToAggregationDescriptors().entrySet()) {
groups.add(entry.getKey());
List<Aggregator> aggregators = new ArrayList<>();
for (CrossSeriesAggregationDescriptor aggregationDescriptor : entry.getValue()) {
if (aggregationDescriptor == null) {
aggregators.add(null);
continue;
}
List<InputLocation[]> inputLocations = calcInputLocationList(aggregationDescriptor, layout);
List<TSDataType> inputDataTypes =
aggregationDescriptor.getInputExpressions().stream()
.map(x -> context.getTypeProvider().getType(x.getExpressionString()))
.collect(Collectors.toList());
aggregators.add(
new Aggregator(
AccumulatorFactory.createAccumulator(
aggregationDescriptor.getAggregationFuncName(),
aggregationDescriptor.getAggregationType(),
inputDataTypes,
aggregationDescriptor.getInputExpressions(),
aggregationDescriptor.getInputAttributes(),
ascending,
aggregationDescriptor.getStep().isInputRaw()),
aggregationDescriptor.getStep(),
inputLocations));
}
groupedAggregators.add(aggregators);
aggregatorCount += aggregators.size();
}
GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, false);
List<AggregationDescriptor> aggregationDescriptors =
node.getTagValuesToAggregationDescriptors().values().stream()
.flatMap(Collection::stream)
.filter(Objects::nonNull)
.collect(Collectors.toList());
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
TagAggregationOperator.class.getSimpleName());
long maxReturnSize =
calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
return new TagAggregationOperator(
operatorContext, groups, groupedAggregators, children, maxReturnSize);
}
@Override
public Operator visitSlidingWindowAggregation(
SlidingWindowAggregationNode node, LocalExecutionPlanContext context) {
checkArgument(
!node.getAggregationDescriptorList().isEmpty(),
"Aggregation descriptorList cannot be empty");
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SlidingWindowAggregationOperator.class.getSimpleName());
Operator child = node.getChild().accept(this, context);
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
for (AggregationDescriptor descriptor : aggregationDescriptors) {
List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
aggregators.add(
SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
descriptor.getAggregationFuncName(),
descriptor.getAggregationType(),
descriptor.getInputExpressions().stream()
.map(x -> context.getTypeProvider().getType(x.getExpressionString()))
.collect(Collectors.toList()),
descriptor.getInputExpressions(),
descriptor.getInputAttributes(),
ascending,
inputLocationList,
descriptor.getStep()));
}
GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, false);
long maxReturnSize =
calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
return new SlidingWindowAggregationOperator(
operatorContext,
aggregators,
timeRangeIterator,
child,
ascending,
node.isOutputEndTime(),
groupByTimeParameter,
maxReturnSize);
}
@Override
public Operator visitLimit(LimitNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LimitOperator.class.getSimpleName());
return new LimitOperator(operatorContext, node.getLimit(), child);
}
@Override
public Operator visitOffset(OffsetNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
OffsetOperator.class.getSimpleName());
return new OffsetOperator(operatorContext, node.getOffset(), child);
}
@Override
public Operator visitAggregation(AggregationNode node, LocalExecutionPlanContext context) {
checkArgument(
!node.getAggregationDescriptorList().isEmpty(),
"Aggregation descriptorList cannot be empty");
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
for (AggregationDescriptor descriptor : aggregationDescriptors) {
List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
aggregators.add(
new Aggregator(
AccumulatorFactory.createAccumulator(
descriptor.getAggregationFuncName(),
descriptor.getAggregationType(),
descriptor.getInputExpressions().stream()
.map(x -> context.getTypeProvider().getType(x.getExpressionString()))
.collect(Collectors.toList()),
descriptor.getInputExpressions(),
descriptor.getInputAttributes(),
ascending,
descriptor.getStep().isInputRaw()),
descriptor.getStep(),
inputLocationList));
}
boolean inputRaw = aggregationDescriptors.get(0).getStep().isInputRaw();
GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
GroupByParameter groupByParameter = node.getGroupByParameter();
if (inputRaw) {
checkArgument(children.size() == 1, "RawDataAggregationOperator can only accept one input");
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
RawDataAggregationOperator.class.getSimpleName());
ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, true);
long maxReturnSize =
calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
// groupByParameter is set: event-controlled windows (variation/condition/session/count)
if (groupByParameter != null) {
WindowType windowType = groupByParameter.getWindowType();
WindowParameter windowParameter;
switch (windowType) {
case VARIATION_WINDOW:
Expression groupByVariationExpression = node.getGroupByExpression();
if (groupByVariationExpression == null) {
throw new IllegalArgumentException("groupByVariationExpression can't be null");
}
String controlColumn = groupByVariationExpression.getExpressionString();
TSDataType controlColumnType = context.getTypeProvider().getType(controlColumn);
windowParameter =
new VariationWindowParameter(
controlColumnType,
layout.get(controlColumn).get(0).getValueColumnIndex(),
node.isOutputEndTime(),
((GroupByVariationParameter) groupByParameter).isIgnoringNull(),
((GroupByVariationParameter) groupByParameter).getDelta());
break;
case CONDITION_WINDOW:
Expression groupByConditionExpression = node.getGroupByExpression();
if (groupByConditionExpression == null) {
throw new IllegalArgumentException("groupByConditionExpression can't be null");
}
windowParameter =
new ConditionWindowParameter(
node.isOutputEndTime(),
((GroupByConditionParameter) groupByParameter).isIgnoringNull(),
layout
.get(groupByConditionExpression.getExpressionString())
.get(0)
.getValueColumnIndex(),
((GroupByConditionParameter) groupByParameter).getKeepExpression());
break;
case SESSION_WINDOW:
windowParameter =
new SessionWindowParameter(
((GroupBySessionParameter) groupByParameter).getTimeInterval(),
node.isOutputEndTime());
break;
case COUNT_WINDOW:
Expression groupByCountExpression = node.getGroupByExpression();
if (groupByCountExpression == null) {
throw new IllegalArgumentException("groupByCountExpression can't be null");
}
windowParameter =
new CountWindowParameter(
((GroupByCountParameter) groupByParameter).getCountNumber(),
layout
.get(groupByCountExpression.getExpressionString())
.get(0)
.getValueColumnIndex(),
node.isOutputEndTime(),
((GroupByCountParameter) groupByParameter).isIgnoreNull());
break;
default:
throw new IllegalArgumentException("Unsupported window type");
}
return new RawDataAggregationOperator(
operatorContext,
aggregators,
timeRangeIterator,
children.get(0),
ascending,
maxReturnSize,
windowParameter);
}
WindowParameter windowParameter = new TimeWindowParameter(node.isOutputEndTime());
return new RawDataAggregationOperator(
operatorContext,
aggregators,
timeRangeIterator,
children.get(0),
ascending,
maxReturnSize,
windowParameter);
} else {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
AggregationOperator.class.getSimpleName());
ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, true);
long maxReturnSize =
calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
return new AggregationOperator(
operatorContext,
aggregators,
timeRangeIterator,
children,
node.isOutputEndTime(),
maxReturnSize);
}
}
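// Resolves the input columns of an aggregation against the child layout. Each element of the
// returned list holds the InputLocations of one logical input; an input may span one or two
// value columns (see the two-part case below), hence the InputLocation[] element type.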
private List<InputLocation[]> calcInputLocationList(
AggregationDescriptor descriptor, Map<String, List<InputLocation>> layout) {
List<List<String>> inputColumnNames = descriptor.getInputColumnNamesList();
List<InputLocation[]> inputLocationList = new ArrayList<>();
for (List<String> inputColumnNamesOfOneInput : inputColumnNames) {
// one input may consist of two column parts
List<List<InputLocation>> inputLocationParts = new ArrayList<>();
inputColumnNamesOfOneInput.forEach(o -> inputLocationParts.add(layout.get(o)));
for (int i = 0; i < inputLocationParts.get(0).size(); i++) {
if (inputColumnNamesOfOneInput.size() == 1) {
inputLocationList.add(new InputLocation[] {inputLocationParts.get(0).get(i)});
} else {
inputLocationList.add(
new InputLocation[] {
inputLocationParts.get(0).get(i), inputLocationParts.get(1).get(i)
});
}
}
}
return inputLocationList;
}
@Override
public Operator visitSort(SortNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SortOperator.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
List<SortItem> sortItemList = node.getOrderByParameter().getSortItemList();
List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size());
genSortInformation(
node.getOutputColumnNames(),
dataTypes,
sortItemList,
sortItemIndexList,
sortItemDataTypeList);
String filePrefix =
IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
+ File.separator
+ operatorContext.getDriverContext().getFragmentInstanceContext().getId().getFullId()
+ File.separator
+ operatorContext.getDriverContext().getPipelineId()
+ File.separator;
context.getDriverContext().setHaveTmpFile(true);
context.getDriverContext().getFragmentInstanceContext().setMayHaveTmpFile(true);
Operator child = node.getChild().accept(this, context);
return new SortOperator(
operatorContext,
child,
dataTypes,
filePrefix,
getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
}
@Override
public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
IntoOperator.class.getSimpleName());
IntoPathDescriptor intoPathDescriptor = node.getIntoPathDescriptor();
Map<String, InputLocation> sourceColumnToInputLocationMap =
constructSourceColumnToInputLocationMap(node);
Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
new HashMap<>();
processTargetPathToSourceMap(
intoPathDescriptor.getTargetPathToSourceMap(),
targetPathToSourceInputLocationMap,
sourceColumnToInputLocationMap);
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
intoPathDescriptor.getTargetPathToDataTypeMap();
long statementSizePerLine = calculateStatementSizePerLine(targetPathToDataTypeMap);
List<Pair<String, PartialPath>> sourceTargetPathPairList =
intoPathDescriptor.getSourceTargetPathPairList();
List<String> sourceColumnToViewList = intoPathDescriptor.getSourceColumnToViewList();
List<Pair<String, PartialPath>> sourceTargetPathPairWithViewList =
new ArrayList<>(sourceTargetPathPairList);
for (int i = 0; i < sourceColumnToViewList.size(); i++) {
String viewPath = sourceColumnToViewList.get(i);
if (StringUtils.isNotEmpty(viewPath)) {
sourceTargetPathPairWithViewList.get(i).setLeft(viewPath);
}
}
return new IntoOperator(
operatorContext,
child,
getInputColumnTypes(node, context.getTypeProvider()),
targetPathToSourceInputLocationMap,
targetPathToDataTypeMap,
intoPathDescriptor.getTargetDeviceToAlignedMap(),
sourceTargetPathPairWithViewList,
FragmentInstanceManager.getInstance().getIntoOperationExecutor(),
statementSizePerLine);
}
@Override
public Operator visitDeviceViewInto(DeviceViewIntoNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
DeviceViewIntoOperator.class.getSimpleName());
DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor =
node.getDeviceViewIntoPathDescriptor();
Map<String, InputLocation> sourceColumnToInputLocationMap =
constructSourceColumnToInputLocationMap(node);
Map<String, Map<PartialPath, Map<String, InputLocation>>>
deviceToTargetPathSourceInputLocationMap = new HashMap<>();
Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap =
deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathDataTypeMap();
Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap();
long statementSizePerLine = 0L;
for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry :
sourceDeviceToTargetPathMap.entrySet()) {
String sourceDevice = deviceEntry.getKey();
Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
new HashMap<>();
processTargetPathToSourceMap(
deviceEntry.getValue(),
targetPathToSourceInputLocationMap,
sourceColumnToInputLocationMap);
deviceToTargetPathSourceInputLocationMap.put(
sourceDevice, targetPathToSourceInputLocationMap);
statementSizePerLine +=
calculateStatementSizePerLine(deviceToTargetPathDataTypeMap.get(sourceDevice));
}
return new DeviceViewIntoOperator(
operatorContext,
child,
getInputColumnTypes(node, context.getTypeProvider()),
deviceToTargetPathSourceInputLocationMap,
deviceToTargetPathDataTypeMap,
deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
sourceColumnToInputLocationMap,
FragmentInstanceManager.getInstance().getIntoOperationExecutor(),
statementSizePerLine);
}
private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
Map<String, InputLocation> sourceColumnToInputLocationMap = new HashMap<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
for (Map.Entry<String, List<InputLocation>> layoutEntry : layout.entrySet()) {
sourceColumnToInputLocationMap.put(layoutEntry.getKey(), layoutEntry.getValue().get(0));
}
return sourceColumnToInputLocationMap;
}
private void processTargetPathToSourceMap(
Map<PartialPath, Map<String, String>> targetPathToSourceMap,
Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
Map<String, InputLocation> sourceColumnToInputLocationMap) {
for (Map.Entry<PartialPath, Map<String, String>> entry : targetPathToSourceMap.entrySet()) {
PartialPath targetDevice = entry.getKey();
Map<String, InputLocation> measurementToInputLocationMap = new HashMap<>();
for (Map.Entry<String, String> measurementEntry : entry.getValue().entrySet()) {
String targetMeasurement = measurementEntry.getKey();
String sourceColumn = measurementEntry.getValue();
measurementToInputLocationMap.put(
targetMeasurement, sourceColumnToInputLocationMap.get(sourceColumn));
}
targetPathToSourceInputLocationMap.put(targetDevice, measurementToInputLocationMap);
}
}
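// Estimates the serialized size of one inserted row: Long.BYTES for the timestamp plus a
// per-column value size. For TEXT columns the estimate falls back to the maximum binary size
// tracked by StatisticsManager, since the actual string length is unknown at planning time.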
private long calculateStatementSizePerLine(
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap) {
long maxStatementSize = Long.BYTES;
List<TSDataType> dataTypes =
targetPathToDataTypeMap.values().stream()
.flatMap(stringTSDataTypeMap -> stringTSDataTypeMap.values().stream())
.collect(Collectors.toList());
for (TSDataType dataType : dataTypes) {
maxStatementSize += getValueSizePerLine(dataType);
}
return maxStatementSize;
}
private static long getValueSizePerLine(TSDataType tsDataType) {
switch (tsDataType) {
case INT32:
return Integer.BYTES;
case INT64:
return Long.BYTES;
case FLOAT:
return Float.BYTES;
case DOUBLE:
return Double.BYTES;
case BOOLEAN:
return Byte.BYTES;
case TEXT:
return StatisticsManager.getInstance().getMaxBinarySizeInBytes();
default:
throw new UnsupportedOperationException("Unknown data type " + tsDataType);
}
}
@Override
public Operator visitFullOuterTimeJoin(
FullOuterTimeJoinNode node, LocalExecutionPlanContext context) {
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
FullOuterTimeJoinOperator.class.getSimpleName());
TimeComparator timeComparator =
node.getMergeOrder() == Ordering.ASC ? ASC_TIME_COMPARATOR : DESC_TIME_COMPARATOR;
List<OutputColumn> outputColumns = generateOutputColumnsFromChildren(node);
List<ColumnMerger> mergers = createColumnMergers(outputColumns, timeComparator);
List<TSDataType> outputColumnTypes =
context.getTypeProvider().getTemplatedInfo() != null
? getOutputColumnTypesOfTimeJoinNode(node)
: getOutputColumnTypes(node, context.getTypeProvider());
return new FullOuterTimeJoinOperator(
operatorContext,
children,
node.getMergeOrder(),
outputColumnTypes,
mergers,
timeComparator);
}
@Override
public Operator visitInnerTimeJoin(InnerTimeJoinNode node, LocalExecutionPlanContext context) {
node.getTimePartitions().ifPresent(context::setTimePartitions);
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
InnerTimeJoinOperator.class.getSimpleName());
TimeComparator timeComparator =
node.getMergeOrder() == Ordering.ASC ? ASC_TIME_COMPARATOR : DESC_TIME_COMPARATOR;
List<TSDataType> outputColumnTypes =
context.getTypeProvider().getTemplatedInfo() != null
? getOutputColumnTypesOfTimeJoinNode(node)
: getOutputColumnTypes(node, context.getTypeProvider());
return new InnerTimeJoinOperator(
operatorContext, children, outputColumnTypes, timeComparator, getOutputColumnMap(node));
}
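// Maps each input column, identified by (child index, value column index), to its position in
// the join output. If the node declares no output column names, child columns are concatenated
// in child order; otherwise each column is placed at the index of its name in the output list.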
private Map<InputLocation, Integer> getOutputColumnMap(InnerTimeJoinNode innerTimeJoinNode) {
Map<InputLocation, Integer> result = new HashMap<>();
if (innerTimeJoinNode.outputColumnNamesIsNull()) {
int outputIndex = 0;
for (int i = 0, size = innerTimeJoinNode.getChildren().size(); i < size; i++) {
PlanNode child = innerTimeJoinNode.getChildren().get(i);
List<String> childOutputColumns = child.getOutputColumnNames();
for (int j = 0, childSize = childOutputColumns.size(); j < childSize; j++) {
result.put(new InputLocation(i, j), outputIndex++);
}
}
} else {
List<String> outputColumns = innerTimeJoinNode.getOutputColumnNames();
Map<String, Integer> outputColumnIndexMap = new HashMap<>();
for (int i = 0; i < outputColumns.size(); i++) {
outputColumnIndexMap.put(outputColumns.get(i), i);
}
for (int i = 0, size = innerTimeJoinNode.getChildren().size(); i < size; i++) {
PlanNode child = innerTimeJoinNode.getChildren().get(i);
List<String> childOutputColumns = child.getOutputColumnNames();
for (int j = 0, childSize = childOutputColumns.size(); j < childSize; j++) {
result.put(new InputLocation(i, j), outputColumnIndexMap.get(childOutputColumns.get(j)));
}
}
}
return result;
}
@Override
public Operator visitLeftOuterTimeJoin(
LeftOuterTimeJoinNode node, LocalExecutionPlanContext context) {
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
checkState(children.size() == 2);
Operator leftChild = children.get(0);
Operator rightChild = children.get(1);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LeftOuterTimeJoinOperator.class.getSimpleName());
TimeComparator timeComparator =
node.getMergeOrder() == Ordering.ASC ? ASC_TIME_COMPARATOR : DESC_TIME_COMPARATOR;
List<TSDataType> outputColumnTypes =
context.getTypeProvider().getTemplatedInfo() != null
? getOutputColumnTypesOfTimeJoinNode(node)
: getOutputColumnTypes(node, context.getTypeProvider());
return new LeftOuterTimeJoinOperator(
operatorContext,
leftChild,
node.getLeftChild().getOutputColumnNames().size(),
rightChild,
outputColumnTypes,
timeComparator);
}
@Override
public Operator visitHorizontallyConcat(
HorizontallyConcatNode node, LocalExecutionPlanContext context) {
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
HorizontallyConcatOperator.class.getSimpleName());
List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
return new HorizontallyConcatOperator(operatorContext, children, outputColumnTypes);
}
@Override
public Operator visitShowQueries(ShowQueriesNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
ShowQueriesOperator.class.getSimpleName());
return new ShowQueriesOperator(
operatorContext, node.getPlanNodeId(), Coordinator.getInstance());
}
private List<OutputColumn> generateOutputColumnsFromChildren(MultiChildProcessNode node) {
// TODO we should also sort the InputLocation for each column if they are not overlapped
return makeLayout(node).entrySet().stream()
.filter(entry -> !TIMESTAMP_EXPRESSION_STRING.equals(entry.getKey()))
.map(entry -> new OutputColumn(entry.getValue(), entry.getValue().size() > 1))
.collect(Collectors.toList());
}
private List<ColumnMerger> createColumnMergers(
List<OutputColumn> outputColumns, TimeComparator timeComparator) {
List<ColumnMerger> mergers = new ArrayList<>(outputColumns.size());
for (OutputColumn outputColumn : outputColumns) {
ColumnMerger merger;
// only has one input column
if (outputColumn.isSingleInputColumn()) {
merger = new SingleColumnMerger(outputColumn.getSourceLocation(0), timeComparator);
} else if (outputColumn.isOverlapped()) {
// has more than one input column, and the times of these input columns overlap
merger = new MultiColumnMerger(outputColumn.getSourceLocations());
} else {
// has more than one input column, and the times of these input columns do not overlap
merger =
new NonOverlappedMultiColumnMerger(outputColumn.getSourceLocations(), timeComparator);
}
mergers.add(merger);
}
return mergers;
}
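// An ExchangeOperator reads from an upstream fragment instance. If the upstream instance runs
// on this node, a cheaper local source handle is used and the exchange does not count toward
// the fragment's exchange number; otherwise a remote source handle is created.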
@Override
public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
ExchangeOperator.class.getSimpleName());
FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
TEndPoint upstreamEndPoint = node.getUpstreamEndpoint();
boolean isSameNode = isSameNode(upstreamEndPoint);
ISourceHandle sourceHandle =
isSameNode
? MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForFragment(
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
node.getUpstreamPlanNodeId().getId(),
remoteInstanceId.toThrift(),
node.getIndexOfUpstreamSinkHandle(),
context.getInstanceContext()::failed)
: MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
node.getIndexOfUpstreamSinkHandle(),
upstreamEndPoint,
remoteInstanceId.toThrift(),
context.getInstanceContext()::failed);
if (!isSameNode) {
context.addExchangeSumNum(1);
}
sourceHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve());
ExchangeOperator exchangeOperator =
new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
context.addExchangeOperator(exchangeOperator);
return exchangeOperator;
}
@Override
public Operator visitIdentitySink(IdentitySinkNode node, LocalExecutionPlanContext context) {
context.addExchangeSumNum(1);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
IdentitySinkOperator.class.getSimpleName());
checkArgument(
MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
DownStreamChannelIndex downStreamChannelIndex = new DownStreamChannelIndex(0);
ISinkHandle sinkHandle =
MPP_DATA_EXCHANGE_MANAGER.createShuffleSinkHandle(
node.getDownStreamChannelLocationList(),
downStreamChannelIndex,
ShuffleSinkHandle.ShuffleStrategyEnum.PLAIN,
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
context.getInstanceContext());
sinkHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve());
context.getDriverContext().setSink(sinkHandle);
List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context);
return new IdentitySinkOperator(operatorContext, children, downStreamChannelIndex, sinkHandle);
}
@Override
public Operator visitShuffleSink(ShuffleSinkNode node, LocalExecutionPlanContext context) {
context.addExchangeSumNum(1);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
ShuffleHelperOperator.class.getSimpleName());
// TODO implement pipeline division for shuffle sink
context.setDegreeOfParallelism(1);
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
checkArgument(
MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
DownStreamChannelIndex downStreamChannelIndex = new DownStreamChannelIndex(0);
ISinkHandle sinkHandle =
MPP_DATA_EXCHANGE_MANAGER.createShuffleSinkHandle(
node.getDownStreamChannelLocationList(),
downStreamChannelIndex,
ShuffleSinkHandle.ShuffleStrategyEnum.SIMPLE_ROUND_ROBIN,
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
context.getInstanceContext());
sinkHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve());
context.getDriverContext().setSink(sinkHandle);
return new ShuffleHelperOperator(operatorContext, children, downStreamChannelIndex, sinkHandle);
}
@Override
public Operator visitSchemaFetchMerge(
SchemaFetchMergeNode node, LocalExecutionPlanContext context) {
List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaFetchMergeOperator.class.getSimpleName());
return new SchemaFetchMergeOperator(operatorContext, children, node.getStorageGroupList());
}
@Override
public Operator visitSchemaFetchScan(
SchemaFetchScanNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaFetchScanOperator.class.getSimpleName());
return new SchemaFetchScanOperator(
node.getPlanNodeId(),
operatorContext,
node.getPatternTree(),
node.getTemplateMap(),
((SchemaDriverContext) (context.getDriverContext())).getSchemaRegion(),
node.isWithTags(),
node.isWithTemplate());
}
@Override
public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
TimeValuePair timeValuePair = null;
context.dataNodeQueryContext.lock();
try {
if (!context.dataNodeQueryContext.unCached(seriesPath)) {
timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
if (timeValuePair == null) {
context.dataNodeQueryContext.addUnCachePath(seriesPath, node.getDataNodeSeriesScanNum());
}
}
} finally {
context.dataNodeQueryContext.unLock();
}
if (timeValuePair == null) { // last value is not cached
return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
} else if (timeValuePair.getValue() == null) { // there is no data for this time series
return null;
} else if (!LastQueryUtil.satisfyFilter(
updateFilterUsingTTL(context.getGlobalTimeFilter(), context.getDataRegionTTL()),
timeValuePair)) { // cached last value does not satisfy the time filter
if (!isFilterGtOrGe(context.getGlobalTimeFilter())) {
// time filter is not > or >=, we still need to read from disk
return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
} else { // otherwise, we just ignore it and return null
return null;
}
} else { // cached last value satisfies the filter, put it into LastCacheScanOperator
context.addCachedLastValue(timeValuePair, node.outputPathSymbol());
return null;
}
}
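// A time filter of the form time > t or time >= t can be answered from the last cache alone:
// if the cached last value (the newest point) fails such a filter, no older on-disk value can
// pass it either, so the scan can be skipped.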
private boolean isFilterGtOrGe(Filter filter) {
return filter instanceof TimeGt || filter instanceof TimeGtEq;
}
private UpdateLastCacheOperator createUpdateLastCacheOperator(
LastQueryScanNode node, LocalExecutionPlanContext context, MeasurementPath fullPath) {
SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
if (node.getOutputViewPath() == null) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
UpdateLastCacheOperator.class.getSimpleName());
return new UpdateLastCacheOperator(
operatorContext,
lastQueryScan,
fullPath,
node.getSeriesPath().getSeriesType(),
DATA_NODE_SCHEMA_CACHE,
context.isNeedUpdateLastCache(),
context.isNeedUpdateNullEntry());
} else {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
UpdateViewPathLastCacheOperator.class.getSimpleName());
return new UpdateViewPathLastCacheOperator(
operatorContext,
lastQueryScan,
fullPath,
node.getSeriesPath().getSeriesType(),
DATA_NODE_SCHEMA_CACHE,
context.isNeedUpdateLastCache(),
context.isNeedUpdateNullEntry(),
node.getOutputViewPath());
}
}
private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator(
AlignedLastQueryScanNode node, AlignedPath unCachedPath, LocalExecutionPlanContext context) {
AlignedSeriesAggregationScanOperator lastQueryScan =
createLastQueryScanOperator(node, unCachedPath, context);
if (node.getOutputViewPath() == null) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
AlignedUpdateLastCacheOperator.class.getSimpleName());
return new AlignedUpdateLastCacheOperator(
operatorContext,
lastQueryScan,
unCachedPath,
DATA_NODE_SCHEMA_CACHE,
context.isNeedUpdateLastCache(),
context.isNeedUpdateNullEntry());
} else {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
AlignedUpdateViewPathLastCacheOperator.class.getSimpleName());
return new AlignedUpdateViewPathLastCacheOperator(
operatorContext,
lastQueryScan,
unCachedPath,
DATA_NODE_SCHEMA_CACHE,
context.isNeedUpdateLastCache(),
context.isNeedUpdateNullEntry(),
node.getOutputViewPath());
}
}
private SeriesAggregationScanOperator createLastQueryScanOperator(
LastQueryScanNode node, LocalExecutionPlanContext context) {
MeasurementPath seriesPath = node.getSeriesPath();
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SeriesAggregationScanOperator.class.getSimpleName());
// last_time, last_value
List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
long maxReturnSize = calculateMaxAggregationResultSizeForLastQuery(aggregators);
SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
scanOptionsBuilder.withAllSensors(
context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()));
scanOptionsBuilder.withGlobalTimeFilter(context.getGlobalTimeFilter());
SeriesAggregationScanOperator seriesAggregationScanOperator =
new SeriesAggregationScanOperator(
node.getPlanNodeId(),
seriesPath,
Ordering.DESC,
scanOptionsBuilder.build(),
operatorContext,
aggregators,
timeRangeIterator,
null,
maxReturnSize);
((DataDriverContext) context.getDriverContext())
.addSourceOperator(seriesAggregationScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
return seriesAggregationScanOperator;
}
private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(
AlignedLastQueryScanNode node, AlignedPath unCachedPath, LocalExecutionPlanContext context) {
// last_time, last_value
List<Aggregator> aggregators = new ArrayList<>();
for (int i = 0; i < unCachedPath.getMeasurementList().size(); i++) {
aggregators.addAll(
LastQueryUtil.createAggregators(unCachedPath.getSchemaList().get(i).getType(), i));
}
ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
long maxReturnSize = calculateMaxAggregationResultSizeForLastQuery(aggregators);
SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
scanOptionsBuilder.withAllSensors(new HashSet<>(unCachedPath.getMeasurementList()));
scanOptionsBuilder.withGlobalTimeFilter(context.getGlobalTimeFilter());
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
AlignedSeriesAggregationScanOperator.class.getSimpleName());
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
node.getPlanNodeId(),
unCachedPath,
Ordering.DESC,
scanOptionsBuilder.build(),
operatorContext,
aggregators,
timeRangeIterator,
null,
maxReturnSize);
((DataDriverContext) context.getDriverContext())
.addSourceOperator(seriesAggregationScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(unCachedPath);
return seriesAggregationScanOperator;
}
@Override
public Operator visitAlignedLastQueryScan(
AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
AlignedPath alignedPath = node.getSeriesPath();
PartialPath devicePath = alignedPath.getDevicePath();
// collect the series under this aligned device whose last values are not cached
List<Integer> unCachedMeasurementIndexes = new ArrayList<>();
List<String> measurementList = alignedPath.getMeasurementList();
for (int i = 0; i < measurementList.size(); i++) {
PartialPath measurementPath = devicePath.concatNode(measurementList.get(i));
TimeValuePair timeValuePair = null;
context.dataNodeQueryContext.lock();
try {
if (!context.dataNodeQueryContext.unCached(measurementPath)) {
timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath);
if (timeValuePair == null) {
context.dataNodeQueryContext.addUnCachePath(
measurementPath, new AtomicInteger(node.getDataNodeSeriesScanNum().get()));
}
}
} finally {
context.dataNodeQueryContext.unLock();
}
if (timeValuePair == null) { // last value is not cached
unCachedMeasurementIndexes.add(i);
} else if (timeValuePair.getValue() == null) {
// there is no data for this time series, just ignore
} else if (!LastQueryUtil.satisfyFilter(
updateFilterUsingTTL(context.getGlobalTimeFilter(), context.getDataRegionTTL()),
timeValuePair)) { // cached last value does not satisfy the time filter
if (!isFilterGtOrGe(context.getGlobalTimeFilter())) {
// time filter is not > or >=, we still need to read from disk
unCachedMeasurementIndexes.add(i);
}
} else { // cached last value satisfies the filter, put it into LastCacheScanOperator
if (node.getOutputViewPath() != null) {
context.addCachedLastValue(timeValuePair, node.getOutputViewPath());
} else {
context.addCachedLastValue(timeValuePair, measurementPath.getFullPath());
}
}
}
if (unCachedMeasurementIndexes.isEmpty()) {
return null;
} else {
AlignedPath unCachedPath = new AlignedPath(alignedPath.getDevicePath());
for (int i : unCachedMeasurementIndexes) {
unCachedPath.addMeasurement(measurementList.get(i), alignedPath.getSchemaList().get(i));
}
return createAlignedUpdateLastCacheOperator(node, unCachedPath, context);
}
}
@Override
public Operator visitLastQuery(LastQueryNode node, LocalExecutionPlanContext context) {
Filter globalTimeFilter = context.getGlobalTimeFilter();
context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(globalTimeFilter));
context.setNeedUpdateNullEntry(LastQueryUtil.needUpdateNullEntry(globalTimeFilter));
List<Operator> operatorList =
node.getChildren().stream()
.map(child -> child.accept(this, context))
.filter(Objects::nonNull)
.collect(Collectors.toList());
List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList =
context.getCachedLastValueAndPathList();
int initSize = cachedLastValueAndPathList != null ? cachedLastValueAndPathList.size() : 0;
// no need to order by timeseries
if (!node.needOrderByTimeseries()) {
TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(initSize);
for (int i = 0; i < initSize; i++) {
TimeValuePair timeValuePair = cachedLastValueAndPathList.get(i).left;
LastQueryUtil.appendLastValue(
builder,
timeValuePair.getTimestamp(),
cachedLastValueAndPathList.get(i).right,
timeValuePair.getValue().getStringValue(),
timeValuePair.getValue().getDataType().name());
}
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LastQueryOperator.class.getSimpleName());
return new LastQueryOperator(operatorContext, operatorList, builder);
} else {
// order by timeseries
Comparator<Binary> comparator =
node.getTimeseriesOrdering() == Ordering.ASC
? ASC_BINARY_COMPARATOR
: DESC_BINARY_COMPARATOR;
// sort values from last cache
if (initSize > 0) {
cachedLastValueAndPathList.sort(Comparator.comparing(Pair::getRight, comparator));
}
TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(initSize);
for (int i = 0; i < initSize; i++) {
TimeValuePair timeValuePair = cachedLastValueAndPathList.get(i).left;
LastQueryUtil.appendLastValue(
builder,
timeValuePair.getTimestamp(),
cachedLastValueAndPathList.get(i).right,
timeValuePair.getValue().getStringValue(),
timeValuePair.getValue().getDataType().name());
}
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LastQuerySortOperator.class.getSimpleName());
return new LastQuerySortOperator(operatorContext, builder.build(), operatorList, comparator);
}
}
@Override
public Operator visitLastQueryMerge(LastQueryMergeNode node, LocalExecutionPlanContext context) {
List<Operator> children =
node.getChildren().stream()
.map(child -> child.accept(this, context))
.collect(Collectors.toList());
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LastQueryMergeOperator.class.getSimpleName());
Ordering timeseriesOrdering = node.getTimeseriesOrdering();
Comparator<Binary> comparator =
(timeseriesOrdering == null || timeseriesOrdering == Ordering.ASC)
? ASC_BINARY_COMPARATOR
: DESC_BINARY_COMPARATOR;
return new LastQueryMergeOperator(operatorContext, children, comparator);
}
@Override
public Operator visitLastQueryCollect(
LastQueryCollectNode node, LocalExecutionPlanContext context) {
List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LastQueryCollectOperator.class.getSimpleName());
return new LastQueryCollectOperator(operatorContext, children);
}
@Override
public Operator visitLastQueryTransform(
LastQueryTransformNode node, LocalExecutionPlanContext context) {
Operator operator = node.getChild().accept(this, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LastQueryTransformOperator.class.getSimpleName());
return new LastQueryTransformOperator(
node.getViewPath(), node.getDataType(), operatorContext, operator);
}
private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
return makeLayout(node.getChildren());
}
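// Builds the input layout of a node: for every child TsBlock, the timestamp column is
// registered under TIMESTAMP_EXPRESSION_STRING with value index -1, and each output column is
// registered under its name with its value column index. A column name that appears in several
// children therefore maps to several InputLocations.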
private Map<String, List<InputLocation>> makeLayout(List<PlanNode> children) {
Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
int tsBlockIndex = 0;
for (PlanNode childNode : children) {
outputMappings
.computeIfAbsent(TIMESTAMP_EXPRESSION_STRING, key -> new ArrayList<>())
.add(new InputLocation(tsBlockIndex, -1));
int valueColumnIndex = 0;
for (String columnName : childNode.getOutputColumnNames()) {
outputMappings
.computeIfAbsent(columnName, key -> new ArrayList<>())
.add(new InputLocation(tsBlockIndex, valueColumnIndex));
valueColumnIndex++;
}
tsBlockIndex++;
}
return outputMappings;
}
private List<TSDataType> getInputColumnTypes(PlanNode node, TypeProvider typeProvider) {
if (typeProvider.getTemplatedInfo() == null) {
return node.getChildren().stream()
.map(PlanNode::getOutputColumnNames)
.flatMap(List::stream)
.map(typeProvider::getType)
.collect(Collectors.toList());
} else {
return getInputColumnTypesUseTemplate(node);
}
}
private List<TSDataType> getInputColumnTypesUseTemplate(PlanNode node) {
// Only the templated-device + filter situation can invoke this method;
// the children of FilterNode/TransformNode can be TimeJoinNode, ScanNode, possibly others
List<TSDataType> dataTypes = new ArrayList<>();
for (PlanNode child : node.getChildren()) {
if (child instanceof SeriesScanNode) {
dataTypes.add(((SeriesScanNode) child).getSeriesPath().getSeriesType());
} else if (child instanceof AlignedSeriesScanNode) {
AlignedSeriesScanNode alignedSeriesScanNode = (AlignedSeriesScanNode) child;
alignedSeriesScanNode
.getAlignedPath()
.getSchemaList()
.forEach(c -> dataTypes.add(c.getType()));
} else {
dataTypes.addAll(getInputColumnTypesUseTemplate(child));
}
}
return dataTypes;
}
private List<TSDataType> getOutputColumnTypes(PlanNode node, TypeProvider typeProvider) {
return node.getOutputColumnNames().stream()
.map(typeProvider::getType)
.collect(Collectors.toList());
}
private List<TSDataType> getOutputColumnTypesOfTimeJoinNode(PlanNode node) {
// Only the templated-device situation can invoke this method;
// the children of a TimeJoinNode can only be ScanNodes or TimeJoinNodes
List<TSDataType> dataTypes = new ArrayList<>();
for (PlanNode child : node.getChildren()) {
if (child instanceof SeriesScanNode) {
dataTypes.add(((SeriesScanNode) child).getSeriesPath().getSeriesType());
} else if (child instanceof AlignedSeriesScanNode) {
dataTypes.add(((AlignedSeriesScanNode) child).getAlignedPath().getSeriesType());
} else if (child instanceof FullOuterTimeJoinNode
|| child instanceof InnerTimeJoinNode
|| child instanceof LeftOuterTimeJoinNode) {
dataTypes.addAll(getOutputColumnTypesOfTimeJoinNode(child));
} else {
LOGGER.error(
"Unexpected PlanNode in getOutputColumnTypesOfTimeJoinNode, type: {}",
child.getClass().getName());
}
}
return dataTypes;
}
private Operator generateOnlyChildOperator(PlanNode node, LocalExecutionPlanContext context) {
List<Operator> children =
node.getChildren().stream()
.map(child -> child.accept(this, context))
.collect(Collectors.toList());
Validate.isTrue(children.size() == 1);
return children.get(0);
}
public Operator visitPathsUsingTemplateScan(
PathsUsingTemplateScanNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryScanOperator.class.getSimpleName());
return new SchemaQueryScanOperator<>(
node.getPlanNodeId(),
operatorContext,
SchemaSourceFactory.getPathsUsingTemplateSource(
node.getPathPatternList(), node.getTemplateId(), node.getScope()));
}
public Operator visitLogicalViewSchemaScan(
LogicalViewSchemaScanNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryScanOperator.class.getSimpleName());
return new SchemaQueryScanOperator<>(
node.getPlanNodeId(),
operatorContext,
SchemaSourceFactory.getLogicalViewSchemaSource(
node.getPath(),
node.getLimit(),
node.getOffset(),
node.getSchemaFilter(),
node.getScope()));
}
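// Splits the children of a pipeline breaker (a node that must consume all of its children,
// e.g. a join or a multi-input aggregation) into separate driver pipelines according to the
// degree of parallelism (dop). ExchangeNode children are planned in the parent pipeline as-is;
// only local children are distributed across new pipelines.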
public List<Operator> dealWithConsumeAllChildrenPipelineBreaker(
PlanNode node, LocalExecutionPlanContext context) {
// children after pipelining
List<Operator> parentPipelineChildren = new ArrayList<>();
if (context.getDegreeOfParallelism() == 1 || node.getChildren().size() == 1) {
// If dop == 1 or there is only one child, we don't create extra pipelines
for (PlanNode localChild : node.getChildren()) {
Operator childOperation = localChild.accept(this, context);
parentPipelineChildren.add(childOperation);
}
} else {
int finalExchangeNum = context.getExchangeSumNum();
// Keep this list since we may change the structure of the original children nodes
List<PlanNode> afterwardsNodes = new ArrayList<>();
// 1. Count the local (non-ExchangeNode) children
int localChildrenSize = 0;
int firstChildIndex = -1;
for (int i = 0; i < node.getChildren().size(); i++) {
if (!(node.getChildren().get(i) instanceof ExchangeNode)) {
localChildrenSize++;
firstChildIndex = firstChildIndex == -1 ? i : firstChildIndex;
// deal with ExchangeNodes at the head of the children list
} else if (firstChildIndex == -1) {
Operator childOperation = node.getChildren().get(i).accept(this, context);
finalExchangeNum += 1;
parentPipelineChildren.add(childOperation);
afterwardsNodes.add(node.getChildren().get(i));
}
}
if (firstChildIndex == -1) {
context.setExchangeSumNum(finalExchangeNum);
return parentPipelineChildren;
}
// If dop > localChildrenSize, we can allocate the extra dop to child nodes.
// Extra dop = dop - localChildrenSize, since dop = 1 means serial execution rather than 0
int dopForChild = Math.max(1, context.getDegreeOfParallelism() - localChildrenSize);
// If dop > localChildrenSize, we create one new pipeline for each child
if (context.getDegreeOfParallelism() > localChildrenSize) {
for (int i = firstChildIndex; i < node.getChildren().size(); i++) {
PlanNode childNode = node.getChildren().get(i);
if (childNode instanceof ExchangeNode) {
Operator childOperation = childNode.accept(this, context);
finalExchangeNum += 1;
parentPipelineChildren.add(childOperation);
} else {
LocalExecutionPlanContext subContext = context.createSubContext();
subContext.setDegreeOfParallelism(dopForChild);
int originPipeNum = context.getPipelineNumber();
Operator sourceOperator = createNewPipelineForChildNode(context, subContext, childNode);
parentPipelineChildren.add(sourceOperator);
dopForChild =
Math.max(1, dopForChild - (subContext.getPipelineNumber() - 1 - originPipeNum));
finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
}
}
} else {
// If dop <= localChildrenSize, we have to distribute the local children across the pipelines,
// childNumInEachPipeline[i] children to the i-th pipeline
int[] childNumInEachPipeline =
getChildNumInEachPipeline(
node.getChildren(), localChildrenSize, context.getDegreeOfParallelism());
int childGroupNum = Math.min(context.getDegreeOfParallelism(), localChildrenSize);
int startIndex;
int endIndex = firstChildIndex;
for (int i = 0; i < childGroupNum; i++) {
startIndex = endIndex;
endIndex += childNumInEachPipeline[i];
// Only if dop >= size(children) + 1 are all children split into new pipelines;
// otherwise, the first group will belong to the parent pipeline
if (i == 0) {
for (int j = startIndex; j < endIndex; j++) {
context.setDegreeOfParallelism(1);
Operator childOperation = node.getChildren().get(j).accept(this, context);
parentPipelineChildren.add(childOperation);
afterwardsNodes.add(node.getChildren().get(j));
}
continue;
}
LocalExecutionPlanContext subContext = context.createSubContext();
subContext.setDegreeOfParallelism(1);
// Create partial parent operator for children
PlanNode partialParentNode;
if (endIndex - startIndex == 1) {
partialParentNode = node.getChildren().get(startIndex);
} else {
// The partial parent node is identical to the parent node except for its children
partialParentNode = node.createSubNode(i, startIndex, endIndex);
}
Operator sourceOperator =
createNewPipelineForChildNode(context, subContext, partialParentNode);
parentPipelineChildren.add(sourceOperator);
afterwardsNodes.add(partialParentNode);
finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
}
if (node instanceof MultiChildProcessNode) {
((MultiChildProcessNode) node).setChildren(afterwardsNodes);
} else if (node instanceof TwoChildProcessNode) {
checkState(afterwardsNodes.size() == 2);
((TwoChildProcessNode) node).setLeftChild(afterwardsNodes.get(0));
((TwoChildProcessNode) node).setRightChild(afterwardsNodes.get(1));
} else {
throw new IllegalArgumentException("Unknown node type: " + node.getClass().getName());
}
}
context.setExchangeSumNum(finalExchangeNum);
}
return parentPipelineChildren;
}
/**
 * For now, we allocate children to each pipeline as evenly as possible. For example, 5 children
 * with dop = 3 will be grouped as [1, 2, 2]. Once we can estimate the workload of each operator,
 * we may allocate based on workload rather than child count.
 *
 * <p>If a child is an ExchangeNode, it does not count toward the children number of the current
 * group.
 *
 * <p>This method can only be invoked when dop <= localChildrenSize.
 */
public int[] getChildNumInEachPipeline(
List<PlanNode> allChildren, int localChildrenSize, int dop) {
int maxPipelineNum = Math.min(localChildrenSize, dop);
int[] childNumInEachPipeline = new int[maxPipelineNum];
int avgChildNum = Math.max(1, localChildrenSize / dop);
// Groups from splitIndex onward take one extra child to absorb the remainder
int splitIndex = maxPipelineNum - localChildrenSize % dop;
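// e.g., 7 local children with dop = 3: avgChildNum = 2, splitIndex = 2, groups = [2, 2, 3]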
int childIndex = 0;
// Skip ExchangeNodes at the head of the children list
while (childIndex < allChildren.size() && allChildren.get(childIndex) instanceof ExchangeNode) {
childIndex++;
}
int pipelineIndex = 0;
while (pipelineIndex < maxPipelineNum) {
int childNum = pipelineIndex < splitIndex ? avgChildNum : avgChildNum + 1;
int originChildIndex = childIndex;
while (childNum >= 0 && childIndex < allChildren.size()) {
if (!(allChildren.get(childIndex) instanceof ExchangeNode)) {
childNum--;
// Step back so the next group starts with this non-Exchange child; trailing
// ExchangeNodes are absorbed into the current group
if (childNum == -1) {
childIndex--;
}
}
childIndex++;
}
childNumInEachPipeline[pipelineIndex++] = childIndex - originChildIndex;
}
return childNumInEachPipeline;
}
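/**
 * A short summary of the helper below, inferred from its body: it plans childNode in subContext
 * as a new pipeline whose operator writes into a local sink channel, and returns an
 * ExchangeOperator, registered in the parent context, that reads from the paired local source
 * handle.
 */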
private Operator createNewPipelineForChildNode(
LocalExecutionPlanContext context, LocalExecutionPlanContext subContext, PlanNode childNode) {
Operator childOperation = childNode.accept(this, subContext);
ISinkChannel localSinkChannel =
MPP_DATA_EXCHANGE_MANAGER.createLocalSinkChannelForPipeline(
// Attention: there is no parent node here, so use the child node's id instead
subContext.getDriverContext(), childNode.getPlanNodeId().getId());
subContext.setISink(localSinkChannel);
subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext(), 0);
ExchangeOperator sourceOperator =
new ExchangeOperator(
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(), null, ExchangeOperator.class.getSimpleName()),
MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
((LocalSinkChannel) localSinkChannel).getSharedTsBlockQueue(),
context.getDriverContext()),
childNode.getPlanNodeId(),
childOperation.calculateMaxReturnSize());
context.addExchangeOperator(sourceOperator);
return sourceOperator;
}
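/**
 * A descriptive summary of the method below, inferred from its body: it divides the children of
 * a node that consumes its children one by one into pipelines. Each local child is planned with
 * dop - 1 parallelism in its own pipeline, and pipeline dependencies cap the number of child
 * pipelines running concurrently at the dop budget. Since children run one after another, the
 * exchange budget is the maximum over children rather than their sum.
 */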
public List<Operator> dealWithConsumeChildrenOneByOneNode(
PlanNode node, LocalExecutionPlanContext context) {
List<Operator> parentPipelineChildren = new ArrayList<>();
int originExchangeNum = context.getExchangeSumNum();
int finalExchangeNum = context.getExchangeSumNum();
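// Children run one after another, so track the max exchange count across children rather than the sum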
// 1. Divide each child into its own pipeline using the max dop
if (context.getDegreeOfParallelism() == 1 || node.getChildren().size() == 1) {
// If dop = 1 or there is only one child, we don't create extra pipelines
for (PlanNode childSource : node.getChildren()) {
Operator childOperation = childSource.accept(this, context);
finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum());
context.setExchangeSumNum(originExchangeNum);
parentPipelineChildren.add(childOperation);
}
} else {
List<Integer> childPipelineNums = new ArrayList<>();
List<Integer> childExchangeNums = new ArrayList<>();
int sumOfChildPipelines = 0;
int sumOfChildExchangeNums = 0;
int dependencyChildNode = 0;
int dependencyPipeId = 0;
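// dependencyChildNode indexes the earliest child whose pipelines still count against the dop
// budget; dependencyPipeId is the pipeline that newly created pipelines must wait on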
for (PlanNode childNode : node.getChildren()) {
if (childNode instanceof ExchangeNode) {
Operator childOperation = childNode.accept(this, context);
finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum());
context.setExchangeSumNum(originExchangeNum);
parentPipelineChildren.add(childOperation);
} else {
LocalExecutionPlanContext subContext = context.createSubContext();
// Only context.getDegreeOfParallelism() - 1 can be allocated to the child, since the parent pipeline occupies one slot
int dopForChild = context.getDegreeOfParallelism() - 1;
subContext.setDegreeOfParallelism(dopForChild);
int originPipeNum = context.getPipelineNumber();
Operator childOperation = childNode.accept(this, subContext);
ISinkChannel localSinkChannel =
MPP_DATA_EXCHANGE_MANAGER.createLocalSinkChannelForPipeline(
// Attention: there is no parent node here, so use the child node's id instead
context.getDriverContext(), childNode.getPlanNodeId().getId());
subContext.setISink(localSinkChannel);
subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext(), 0);
// A one-by-one child may be divided into more than dop pipelines, but the number actually
// running concurrently is capped at dop
int curChildPipelineNum =
Math.min(dopForChild, subContext.getPipelineNumber() - originPipeNum);
childPipelineNums.add(curChildPipelineNum);
sumOfChildPipelines += curChildPipelineNum;
// If sumOfChildPipelines > dopForChild, we have to wait until some pipelines finish
if (sumOfChildPipelines > dopForChild) {
// Update dependencyPipeId; the current child pipelines can only be submitted after it finishes
while (sumOfChildPipelines > dopForChild) {
sumOfChildPipelines -= childPipelineNums.get(dependencyChildNode);
// The dependency pipeline must be a parent pipeline rather than a child pipeline
dependencyPipeId = context.getPipelineNumber() - sumOfChildPipelines - 1;
sumOfChildExchangeNums -= childExchangeNums.get(dependencyChildNode);
dependencyChildNode++;
}
}
// Add the dependency for all pipelines created for the current child
if (dependencyChildNode != 0) {
for (int i = originPipeNum; i < subContext.getPipelineNumber(); i++) {
context.getPipelineDriverFactories().get(i).setDependencyPipeline(dependencyPipeId);
}
}
ExchangeOperator sourceOperator =
new ExchangeOperator(
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
null,
ExchangeOperator.class.getSimpleName()),
MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
((LocalSinkChannel) localSinkChannel).getSharedTsBlockQueue(),
context.getDriverContext()),
childNode.getPlanNodeId(),
childOperation.calculateMaxReturnSize());
context.getCurrentPipelineDriverFactory().setDownstreamOperator(sourceOperator);
parentPipelineChildren.add(sourceOperator);
context.addExchangeOperator(sourceOperator);
int childExchangeNum = subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
sumOfChildExchangeNums += childExchangeNum;
childExchangeNums.add(childExchangeNum);
finalExchangeNum =
Math.max(finalExchangeNum, context.getExchangeSumNum() + sumOfChildExchangeNums);
}
}
}
context.setExchangeSumNum(finalExchangeNum);
return parentPipelineChildren;
}
@Override
public Operator visitColumnInject(ColumnInjectNode node, LocalExecutionPlanContext context) {
final OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
ColumnInjectOperator.class.getSimpleName());
Operator childOperator = node.getChild().accept(this, context);
ColumnGeneratorParameter parameter = node.getColumnGeneratorParameter();
ColumnGenerator columnGenerator = genColumnGeneratorAccordingToParameter(parameter);
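// Upper bound on the extra bytes the generated columns may add to a single TsBlock:
// the per-line size of each generated column times the max line count of a TsBlock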
long maxExtraColumnSize = 0;
for (TSDataType dataType : node.getGeneratedColumnTypes()) {
maxExtraColumnSize += getOutputColumnSizePerLine(dataType);
}
maxExtraColumnSize *= TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
return new ColumnInjectOperator(
operatorContext, childOperator, columnGenerator, node.getTargetIndex(), maxExtraColumnSize);
}
private ColumnGenerator genColumnGeneratorAccordingToParameter(
ColumnGeneratorParameter columnGeneratorParameter) {
ColumnGeneratorType type = columnGeneratorParameter.getGeneratorType();
if (type == ColumnGeneratorType.SLIDING_TIME) {
SlidingTimeColumnGeneratorParameter slidingTimeColumnGeneratorParameter =
(SlidingTimeColumnGeneratorParameter) columnGeneratorParameter;
return new SlidingTimeColumnGenerator(
initTimeRangeIterator(
slidingTimeColumnGeneratorParameter.getGroupByTimeParameter(),
slidingTimeColumnGeneratorParameter.isAscending(),
false));
} else {
throw new UnsupportedOperationException("Unsupported column generator type: " + type);
}
}
@Override
public Operator visitExplainAnalyze(ExplainAnalyzeNode node, LocalExecutionPlanContext context) {
Operator operator = node.getChild().accept(this, context);
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
ExplainAnalyzeOperator.class.getSimpleName());
return new ExplainAnalyzeOperator(
operatorContext, operator, node.getQueryId(), node.isVerbose(), node.getTimeout());
}
}