blob: 4c9a579b174f866085cb0661eaadea049c13153c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.LevelTimeSeriesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodeManagementMemoryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsConvertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.NodePathsSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.PathsUsingTemplateScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import com.google.common.base.Function;
import org.apache.commons.lang.Validate;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.COLUMN_DEVICE;
public class LogicalPlanBuilder {
private PlanNode root;
private final MPPQueryContext context;
private final Function<Expression, TSDataType> getPreAnalyzedType;
public LogicalPlanBuilder(Analysis analysis, MPPQueryContext context) {
this.getPreAnalyzedType = analysis::getType;
this.context = context;
}
public PlanNode getRoot() {
return root;
}
public LogicalPlanBuilder withNewRoot(PlanNode newRoot) {
this.root = newRoot;
return this;
}
private void updateTypeProvider(Collection<Expression> expressions) {
expressions.forEach(
expression -> {
if (!expression.getExpressionString().equals(COLUMN_DEVICE)) {
context
.getTypeProvider()
.setType(expression.toString(), getPreAnalyzedType.apply(expression));
}
});
}
public LogicalPlanBuilder planRawDataSource(
Set<Expression> sourceExpressions, Ordering scanOrder, Filter timeFilter) {
List<PlanNode> sourceNodeList = new ArrayList<>();
List<PartialPath> selectedPaths =
sourceExpressions.stream()
.map(expression -> ((TimeSeriesOperand) expression).getPath())
.collect(Collectors.toList());
List<PartialPath> groupedPaths = MetaUtils.groupAlignedSeries(selectedPaths);
for (PartialPath path : groupedPaths) {
if (path instanceof MeasurementPath) { // non-aligned series
SeriesScanNode seriesScanNode =
new SeriesScanNode(
context.getQueryId().genPlanNodeId(), (MeasurementPath) path, scanOrder);
seriesScanNode.setTimeFilter(timeFilter);
sourceNodeList.add(seriesScanNode);
} else if (path instanceof AlignedPath) { // aligned series
AlignedSeriesScanNode alignedSeriesScanNode =
new AlignedSeriesScanNode(
context.getQueryId().genPlanNodeId(), (AlignedPath) path, scanOrder);
alignedSeriesScanNode.setTimeFilter(timeFilter);
sourceNodeList.add(alignedSeriesScanNode);
} else {
throw new IllegalArgumentException("unexpected path type");
}
}
updateTypeProvider(sourceExpressions);
this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
return this;
}
public LogicalPlanBuilder planLast(
Set<Expression> sourceExpressions,
Filter globalTimeFilter,
OrderByParameter mergeOrderParameter) {
List<PlanNode> sourceNodeList = new ArrayList<>();
for (Expression sourceExpression : sourceExpressions) {
MeasurementPath selectPath =
(MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath();
if (selectPath.isUnderAlignedEntity()) {
sourceNodeList.add(
new AlignedLastQueryScanNode(
context.getQueryId().genPlanNodeId(), new AlignedPath(selectPath)));
} else {
sourceNodeList.add(new LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath));
}
}
updateTypeProvider(sourceExpressions);
this.root =
new LastQueryNode(
context.getQueryId().genPlanNodeId(),
sourceNodeList,
globalTimeFilter,
mergeOrderParameter);
ColumnHeaderConstant.lastQueryColumnHeaders.forEach(
columnHeader ->
context
.getTypeProvider()
.setType(columnHeader.getColumnName(), columnHeader.getColumnType()));
return this;
}
public LogicalPlanBuilder planAggregationSource(
Set<Expression> sourceExpressions,
AggregationStep curStep,
Ordering scanOrder,
Filter timeFilter,
GroupByTimeParameter groupByTimeParameter,
Set<Expression> aggregationExpressions,
Set<Expression> aggregationTransformExpressions,
Map<Expression, Set<Expression>> groupByLevelExpressions) {
boolean needCheckAscending = groupByTimeParameter == null;
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
for (Expression sourceExpression : sourceExpressions) {
createAggregationDescriptor(
(FunctionExpression) sourceExpression,
curStep,
scanOrder,
needCheckAscending,
ascendingAggregations,
descendingAggregations);
}
List<PlanNode> sourceNodeList =
constructSourceNodeFromAggregationDescriptors(
ascendingAggregations,
descendingAggregations,
scanOrder,
timeFilter,
groupByTimeParameter);
updateTypeProvider(sourceExpressions);
updateTypeProvider(aggregationTransformExpressions);
return convergeAggregationSource(
sourceNodeList,
curStep,
scanOrder,
groupByTimeParameter,
aggregationExpressions,
groupByLevelExpressions);
}
public LogicalPlanBuilder planAggregationSourceWithIndexAdjust(
Set<Expression> sourceExpressions,
AggregationStep curStep,
Ordering scanOrder,
Filter timeFilter,
GroupByTimeParameter groupByTimeParameter,
List<Integer> measurementIndexes,
Set<Expression> aggregationExpressions,
Set<Expression> aggregationTransformExpressions,
Map<Expression, Set<Expression>> groupByLevelExpressions) {
checkArgument(
sourceExpressions.size() == measurementIndexes.size(),
"Each aggregate should correspond to a column of output.");
boolean needCheckAscending = groupByTimeParameter == null;
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
Map<AggregationDescriptor, Integer> aggregationToMeasurementIndexMap = new HashMap<>();
int index = 0;
for (Expression sourceExpression : sourceExpressions) {
AggregationDescriptor aggregationDescriptor =
createAggregationDescriptor(
(FunctionExpression) sourceExpression,
curStep,
scanOrder,
needCheckAscending,
ascendingAggregations,
descendingAggregations);
aggregationToMeasurementIndexMap.put(aggregationDescriptor, measurementIndexes.get(index));
index++;
}
List<PlanNode> sourceNodeList =
constructSourceNodeFromAggregationDescriptors(
ascendingAggregations,
descendingAggregations,
scanOrder,
timeFilter,
groupByTimeParameter);
updateTypeProvider(sourceExpressions);
updateTypeProvider(aggregationTransformExpressions);
if (!curStep.isOutputPartial()) {
// update measurementIndexes
measurementIndexes.clear();
measurementIndexes.addAll(
sourceNodeList.stream()
.map(
planNode ->
((SeriesAggregationSourceNode) planNode).getAggregationDescriptorList())
.flatMap(List::stream)
.map(aggregationToMeasurementIndexMap::get)
.collect(Collectors.toList()));
}
return convergeAggregationSource(
sourceNodeList,
curStep,
scanOrder,
groupByTimeParameter,
aggregationExpressions,
groupByLevelExpressions);
}
private AggregationDescriptor createAggregationDescriptor(
FunctionExpression sourceExpression,
AggregationStep curStep,
Ordering scanOrder,
boolean needCheckAscending,
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
Map<PartialPath, List<AggregationDescriptor>> descendingAggregations) {
AggregationDescriptor aggregationDescriptor =
new AggregationDescriptor(
sourceExpression.getFunctionName(), curStep, sourceExpression.getExpressions());
if (curStep.isOutputPartial()) {
updateTypeProviderByPartialAggregation(aggregationDescriptor, context.getTypeProvider());
}
PartialPath selectPath =
((TimeSeriesOperand) sourceExpression.getExpressions().get(0)).getPath();
if (!needCheckAscending
|| SchemaUtils.isConsistentWithScanOrder(
aggregationDescriptor.getAggregationType(), scanOrder)) {
ascendingAggregations
.computeIfAbsent(selectPath, key -> new ArrayList<>())
.add(aggregationDescriptor);
} else {
descendingAggregations
.computeIfAbsent(selectPath, key -> new ArrayList<>())
.add(aggregationDescriptor);
}
return aggregationDescriptor;
}
private List<PlanNode> constructSourceNodeFromAggregationDescriptors(
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
Ordering scanOrder,
Filter timeFilter,
GroupByTimeParameter groupByTimeParameter) {
List<PlanNode> sourceNodeList = new ArrayList<>();
boolean needCheckAscending = groupByTimeParameter == null;
Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations =
MetaUtils.groupAlignedAggregations(ascendingAggregations);
for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry :
groupedAscendingAggregations.entrySet()) {
sourceNodeList.add(
createAggregationScanNode(
pathAggregationsEntry.getKey(),
pathAggregationsEntry.getValue(),
scanOrder,
groupByTimeParameter,
timeFilter));
}
if (needCheckAscending) {
Map<PartialPath, List<AggregationDescriptor>> groupedDescendingAggregations =
MetaUtils.groupAlignedAggregations(descendingAggregations);
for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry :
groupedDescendingAggregations.entrySet()) {
sourceNodeList.add(
createAggregationScanNode(
pathAggregationsEntry.getKey(),
pathAggregationsEntry.getValue(),
scanOrder.reverse(),
null,
timeFilter));
}
}
return sourceNodeList;
}
private LogicalPlanBuilder convergeAggregationSource(
List<PlanNode> sourceNodeList,
AggregationStep curStep,
Ordering scanOrder,
GroupByTimeParameter groupByTimeParameter,
Set<Expression> aggregationExpressions,
Map<Expression, Set<Expression>> groupByLevelExpressions) {
if (curStep.isOutputPartial()) {
if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) {
curStep =
groupByLevelExpressions != null ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL;
this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
this.root =
createSlidingWindowAggregationNode(
this.getRoot(), aggregationExpressions, groupByTimeParameter, curStep, scanOrder);
if (groupByLevelExpressions != null) {
curStep = AggregationStep.FINAL;
this.root =
createGroupByTLevelNode(
Collections.singletonList(this.getRoot()),
groupByLevelExpressions,
curStep,
groupByTimeParameter,
scanOrder);
}
} else {
if (groupByLevelExpressions != null) {
curStep = AggregationStep.FINAL;
this.root =
createGroupByTLevelNode(
sourceNodeList,
groupByLevelExpressions,
curStep,
groupByTimeParameter,
scanOrder);
}
}
} else {
this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
}
return this;
}
public static void updateTypeProviderByPartialAggregation(
AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
List<AggregationType> splitAggregations =
SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
PartialPath path =
((TimeSeriesOperand) aggregationDescriptor.getInputExpressions().get(0)).getPath();
for (AggregationType aggregationType : splitAggregations) {
String functionName = aggregationType.toString().toLowerCase();
typeProvider.setType(
String.format("%s(%s)", functionName, path.getFullPath()),
SchemaUtils.getSeriesTypeByPath(path, functionName));
}
}
public static void updateTypeProviderByPartialAggregation(
GroupByLevelDescriptor aggregationDescriptor, TypeProvider typeProvider) {
List<AggregationType> splitAggregations =
SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
PartialPath path = ((TimeSeriesOperand) aggregationDescriptor.getOutputExpression()).getPath();
for (AggregationType aggregationType : splitAggregations) {
String functionName = aggregationType.toString().toLowerCase();
typeProvider.setType(
String.format("%s(%s)", functionName, path.getFullPath()),
SchemaUtils.getSeriesTypeByPath(path, functionName));
}
}
private PlanNode convergeWithTimeJoin(List<PlanNode> sourceNodes, Ordering mergeOrder) {
PlanNode tmpNode;
if (sourceNodes.size() == 1) {
tmpNode = sourceNodes.get(0);
} else {
tmpNode = new TimeJoinNode(context.getQueryId().genPlanNodeId(), mergeOrder, sourceNodes);
}
return tmpNode;
}
public LogicalPlanBuilder planDeviceView(
Map<String, PlanNode> deviceNameToSourceNodesMap,
Set<Expression> deviceViewOutput,
Map<String, List<Integer>> deviceToMeasurementIndexesMap,
Ordering mergeOrder) {
List<String> outputColumnNames =
deviceViewOutput.stream().map(Expression::getExpressionString).collect(Collectors.toList());
DeviceViewNode deviceViewNode =
new DeviceViewNode(
context.getQueryId().genPlanNodeId(),
new OrderByParameter(
Arrays.asList(
new SortItem(SortKey.DEVICE, Ordering.ASC),
new SortItem(SortKey.TIME, mergeOrder))),
outputColumnNames,
deviceToMeasurementIndexesMap);
for (Map.Entry<String, PlanNode> entry : deviceNameToSourceNodesMap.entrySet()) {
String deviceName = entry.getKey();
PlanNode subPlan = entry.getValue();
deviceViewNode.addChildDeviceNode(deviceName, subPlan);
}
context.getTypeProvider().setType(COLUMN_DEVICE, TSDataType.TEXT);
updateTypeProvider(deviceViewOutput);
this.root = deviceViewNode;
return this;
}
public LogicalPlanBuilder planGroupByLevel(
Map<Expression, Set<Expression>> groupByLevelExpressions,
AggregationStep curStep,
GroupByTimeParameter groupByTimeParameter,
Ordering scanOrder) {
if (groupByLevelExpressions == null) {
return this;
}
this.root =
createGroupByTLevelNode(
Collections.singletonList(this.getRoot()),
groupByLevelExpressions,
curStep,
groupByTimeParameter,
scanOrder);
return this;
}
public LogicalPlanBuilder planAggregation(
Set<Expression> aggregationExpressions,
GroupByTimeParameter groupByTimeParameter,
AggregationStep curStep,
Ordering scanOrder) {
if (aggregationExpressions == null) {
return this;
}
List<AggregationDescriptor> aggregationDescriptorList =
constructAggregationDescriptorList(aggregationExpressions, curStep);
updateTypeProvider(aggregationExpressions);
if (curStep.isOutputPartial()) {
aggregationDescriptorList.forEach(
aggregationDescriptor ->
updateTypeProviderByPartialAggregation(
aggregationDescriptor, context.getTypeProvider()));
}
this.root =
new AggregationNode(
context.getQueryId().genPlanNodeId(),
Collections.singletonList(this.getRoot()),
aggregationDescriptorList,
groupByTimeParameter,
scanOrder);
return this;
}
public LogicalPlanBuilder planSlidingWindowAggregation(
Set<Expression> aggregationExpressions,
GroupByTimeParameter groupByTimeParameter,
AggregationStep curStep,
Ordering scanOrder) {
if (aggregationExpressions == null) {
return this;
}
this.root =
createSlidingWindowAggregationNode(
this.getRoot(), aggregationExpressions, groupByTimeParameter, curStep, scanOrder);
return this;
}
private PlanNode createSlidingWindowAggregationNode(
PlanNode child,
Set<Expression> aggregationExpressions,
GroupByTimeParameter groupByTimeParameter,
AggregationStep curStep,
Ordering scanOrder) {
List<AggregationDescriptor> aggregationDescriptorList =
constructAggregationDescriptorList(aggregationExpressions, curStep);
return new SlidingWindowAggregationNode(
context.getQueryId().genPlanNodeId(),
child,
aggregationDescriptorList,
groupByTimeParameter,
scanOrder);
}
private PlanNode createGroupByTLevelNode(
List<PlanNode> children,
Map<Expression, Set<Expression>> groupByLevelExpressions,
AggregationStep curStep,
GroupByTimeParameter groupByTimeParameter,
Ordering scanOrder) {
List<GroupByLevelDescriptor> groupByLevelDescriptors = new ArrayList<>();
for (Expression groupedExpression : groupByLevelExpressions.keySet()) {
groupByLevelDescriptors.add(
new GroupByLevelDescriptor(
((FunctionExpression) groupedExpression).getFunctionName(),
curStep,
groupByLevelExpressions.get(groupedExpression).stream()
.map(Expression::getExpressions)
.flatMap(List::stream)
.collect(Collectors.toList()),
groupedExpression.getExpressions().get(0)));
}
updateTypeProvider(groupByLevelExpressions.keySet());
updateTypeProvider(
groupByLevelDescriptors.stream()
.map(GroupByLevelDescriptor::getOutputExpression)
.collect(Collectors.toList()));
return new GroupByLevelNode(
context.getQueryId().genPlanNodeId(),
children,
groupByLevelDescriptors,
groupByTimeParameter,
scanOrder);
}
private SeriesAggregationSourceNode createAggregationScanNode(
PartialPath selectPath,
List<AggregationDescriptor> aggregationDescriptorList,
Ordering scanOrder,
GroupByTimeParameter groupByTimeParameter,
Filter timeFilter) {
if (selectPath instanceof MeasurementPath) { // non-aligned series
SeriesAggregationScanNode seriesAggregationScanNode =
new SeriesAggregationScanNode(
context.getQueryId().genPlanNodeId(),
(MeasurementPath) selectPath,
aggregationDescriptorList,
scanOrder,
groupByTimeParameter);
seriesAggregationScanNode.setTimeFilter(timeFilter);
return seriesAggregationScanNode;
} else if (selectPath instanceof AlignedPath) { // aligned series
AlignedSeriesAggregationScanNode alignedSeriesAggregationScanNode =
new AlignedSeriesAggregationScanNode(
context.getQueryId().genPlanNodeId(),
(AlignedPath) selectPath,
aggregationDescriptorList,
scanOrder,
groupByTimeParameter);
alignedSeriesAggregationScanNode.setTimeFilter(timeFilter);
return alignedSeriesAggregationScanNode;
} else {
throw new IllegalArgumentException("unexpected path type");
}
}
private List<AggregationDescriptor> constructAggregationDescriptorList(
Set<Expression> aggregationExpressions, AggregationStep curStep) {
return aggregationExpressions.stream()
.map(
expression -> {
Validate.isTrue(expression instanceof FunctionExpression);
return new AggregationDescriptor(
((FunctionExpression) expression).getFunctionName(),
curStep,
expression.getExpressions());
})
.collect(Collectors.toList());
}
public LogicalPlanBuilder planFilterAndTransform(
Expression queryFilter,
Set<Expression> selectExpressions,
boolean isGroupByTime,
ZoneId zoneId,
Ordering scanOrder) {
if (queryFilter == null || selectExpressions.isEmpty()) {
return this;
}
this.root =
new FilterNode(
context.getQueryId().genPlanNodeId(),
this.getRoot(),
selectExpressions.toArray(new Expression[0]),
queryFilter,
isGroupByTime,
zoneId,
scanOrder);
updateTypeProvider(selectExpressions);
return this;
}
public LogicalPlanBuilder planTransform(
Set<Expression> transformExpressions,
boolean isGroupByTime,
ZoneId zoneId,
Ordering scanOrder) {
boolean needTransform = false;
for (Expression expression : transformExpressions) {
if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
needTransform = true;
break;
}
}
if (!needTransform) {
return this;
}
this.root =
new TransformNode(
context.getQueryId().genPlanNodeId(),
this.getRoot(),
transformExpressions.toArray(new Expression[0]),
isGroupByTime,
zoneId,
scanOrder);
updateTypeProvider(transformExpressions);
return this;
}
public LogicalPlanBuilder planFill(FillDescriptor fillDescriptor, Ordering scanOrder) {
if (fillDescriptor == null) {
return this;
}
this.root =
new FillNode(
context.getQueryId().genPlanNodeId(), this.getRoot(), fillDescriptor, scanOrder);
return this;
}
public LogicalPlanBuilder planLimit(int rowLimit) {
if (rowLimit == 0) {
return this;
}
this.root = new LimitNode(context.getQueryId().genPlanNodeId(), this.getRoot(), rowLimit);
return this;
}
public LogicalPlanBuilder planOffset(int rowOffset) {
if (rowOffset == 0) {
return this;
}
this.root = new OffsetNode(context.getQueryId().genPlanNodeId(), this.getRoot(), rowOffset);
return this;
}
/** Meta Query* */
public LogicalPlanBuilder planTimeSeriesSchemaSource(
PartialPath pathPattern,
String key,
String value,
int limit,
int offset,
boolean orderByHeat,
boolean contains,
boolean prefixPath,
Map<Integer, Template> templateMap) {
this.root =
new TimeSeriesSchemaScanNode(
context.getQueryId().genPlanNodeId(),
pathPattern,
key,
value,
limit,
offset,
orderByHeat,
contains,
prefixPath,
templateMap);
return this;
}
public LogicalPlanBuilder planDeviceSchemaSource(
PartialPath pathPattern, int limit, int offset, boolean prefixPath, boolean hasSgCol) {
this.root =
new DevicesSchemaScanNode(
context.getQueryId().genPlanNodeId(), pathPattern, limit, offset, prefixPath, hasSgCol);
return this;
}
public LogicalPlanBuilder planSchemaQueryMerge(boolean orderByHeat) {
SchemaQueryMergeNode schemaMergeNode =
new SchemaQueryMergeNode(context.getQueryId().genPlanNodeId(), orderByHeat);
schemaMergeNode.addChild(this.getRoot());
this.root = schemaMergeNode;
return this;
}
public LogicalPlanBuilder planSchemaQueryOrderByHeat(PlanNode lastPlanNode) {
SchemaQueryOrderByHeatNode node =
new SchemaQueryOrderByHeatNode(context.getQueryId().genPlanNodeId());
node.addChild(this.getRoot());
node.addChild(lastPlanNode);
this.root = node;
return this;
}
public LogicalPlanBuilder planSchemaFetchMerge(List<String> storageGroupList) {
this.root = new SchemaFetchMergeNode(context.getQueryId().genPlanNodeId(), storageGroupList);
return this;
}
@SuppressWarnings({"checkstyle:Indentation", "checkstyle:CommentsIndentation"})
public LogicalPlanBuilder planSchemaFetchSource(
List<String> storageGroupList,
PathPatternTree patternTree,
Map<Integer, Template> templateMap) {
PartialPath storageGroupPath;
for (String storageGroup : storageGroupList) {
try {
storageGroupPath = new PartialPath(storageGroup);
PathPatternTree overlappedPatternTree = new PathPatternTree();
for (PartialPath pathPattern :
patternTree.getOverlappedPathPatterns(
storageGroupPath.concatNode(MULTI_LEVEL_PATH_WILDCARD))) {
// pathPattern has been deduplicated, no need to deduplicate again
overlappedPatternTree.appendFullPath(pathPattern);
}
this.root.addChild(
new SchemaFetchScanNode(
context.getQueryId().genPlanNodeId(),
storageGroupPath,
overlappedPatternTree,
templateMap));
} catch (IllegalPathException e) {
// definitely won't happen
throw new RuntimeException(e);
}
}
return this;
}
public LogicalPlanBuilder planCountMerge() {
CountSchemaMergeNode countMergeNode =
new CountSchemaMergeNode(context.getQueryId().genPlanNodeId());
countMergeNode.addChild(this.getRoot());
this.root = countMergeNode;
return this;
}
public LogicalPlanBuilder planDevicesCountSource(PartialPath partialPath, boolean prefixPath) {
this.root = new DevicesCountNode(context.getQueryId().genPlanNodeId(), partialPath, prefixPath);
return this;
}
public LogicalPlanBuilder planTimeSeriesCountSource(
PartialPath partialPath,
boolean prefixPath,
String key,
String value,
boolean isContains,
Map<Integer, Template> templateMap) {
this.root =
new TimeSeriesCountNode(
context.getQueryId().genPlanNodeId(),
partialPath,
prefixPath,
key,
value,
isContains,
templateMap);
return this;
}
public LogicalPlanBuilder planLevelTimeSeriesCountSource(
PartialPath partialPath,
boolean prefixPath,
int level,
String key,
String value,
boolean isContains) {
this.root =
new LevelTimeSeriesCountNode(
context.getQueryId().genPlanNodeId(),
partialPath,
prefixPath,
level,
key,
value,
isContains);
return this;
}
public LogicalPlanBuilder planNodePathsSchemaSource(PartialPath partialPath, Integer level) {
this.root =
new NodePathsSchemaScanNode(context.getQueryId().genPlanNodeId(), partialPath, level);
return this;
}
public LogicalPlanBuilder planNodePathsConvert() {
NodePathsConvertNode nodePathsConvertNode =
new NodePathsConvertNode(context.getQueryId().genPlanNodeId());
nodePathsConvertNode.addChild(this.getRoot());
this.root = nodePathsConvertNode;
return this;
}
public LogicalPlanBuilder planNodePathsCount() {
NodePathsCountNode nodePathsCountNode =
new NodePathsCountNode(context.getQueryId().genPlanNodeId());
nodePathsCountNode.addChild(this.getRoot());
this.root = nodePathsCountNode;
return this;
}
public LogicalPlanBuilder planNodeManagementMemoryMerge(Set<TSchemaNode> data) {
NodeManagementMemoryMergeNode memorySourceNode =
new NodeManagementMemoryMergeNode(context.getQueryId().genPlanNodeId(), data);
memorySourceNode.addChild(this.getRoot());
this.root = memorySourceNode;
return this;
}
public LogicalPlanBuilder planPathsUsingTemplateSource(int templateId) {
this.root = new PathsUsingTemplateScanNode(context.getQueryId().genPlanNodeId(), templateId);
return this;
}
}