blob: 75222efb6fc99835e1551dbca0010f1a440d9eb6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.planner;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchSourceExpressions;
import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo.makeLayout;
import static org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanVisitor.pushDownLimitToScanNode;
/**
* This class provides accelerated implementation for multiple devices align by device query. This
* optimization is only used for devices set in only one template, using template can avoid many
* unnecessary judgements.
*/
public class TemplatedLogicalPlan {
private final Analysis analysis;
private final QueryStatement queryStatement;
private final MPPQueryContext context;
private final List<String> measurementList;
private final List<IMeasurementSchema> schemaList;
private final long limitValue;
private static final long OFFSET_VALUE = 0;
private final Expression whereExpression;
// to fix this query: `select s1 from root.** where s2>1 align by device`,
// while project measurements are [s1], but newMeasurements should be [s1,s2]
private List<String> newMeasurementList;
private List<IMeasurementSchema> newSchemaList;
private Map<String, List<InputLocation>> filterLayoutMap;
public TemplatedLogicalPlan(
Analysis analysis, QueryStatement queryStatement, MPPQueryContext context) {
this.analysis = analysis;
this.queryStatement = queryStatement;
this.context = context;
this.measurementList = analysis.getMeasurementList();
this.schemaList = analysis.getMeasurementSchemaList();
this.newMeasurementList = measurementList;
this.newSchemaList = schemaList;
this.limitValue = pushDownLimitToScanNode(queryStatement, analysis);
this.whereExpression = analysis.getWhereExpression();
// for align by device query with template, most used variables are same
initCommonVariables();
}
private void initCommonVariables() {
if (whereExpression != null) {
if (!analysis.isTemplateWildCardQuery()) {
newMeasurementList = new ArrayList<>(measurementList);
newSchemaList = new ArrayList<>(schemaList);
Set<String> selectExpressions = new HashSet<>(measurementList);
List<Expression> whereSourceExpressions = searchSourceExpressions(whereExpression);
for (Expression expression : whereSourceExpressions) {
if (expression instanceof TimeSeriesOperand) {
String measurement = ((TimeSeriesOperand) expression).getPath().getMeasurement();
if (!analysis.getDeviceTemplate().getSchemaMap().containsKey(measurement)) {
continue;
}
if (!selectExpressions.contains(measurement)) {
selectExpressions.add(measurement);
newMeasurementList.add(measurement);
newSchemaList.add(analysis.getDeviceTemplate().getSchema(measurement));
}
}
}
}
filterLayoutMap = makeLayout(newMeasurementList);
analysis
.getExpressionTypes()
.forEach(
(key, value) ->
context.getTypeProvider().setType(key.getNode().getOutputSymbol(), value));
}
context
.getTypeProvider()
.setTemplatedInfo(
new TemplatedInfo(
newMeasurementList,
newSchemaList,
newSchemaList.stream()
.map(IMeasurementSchema::getType)
.collect(Collectors.toList()),
queryStatement.getResultTimeOrder(),
analysis.isLastLevelUseWildcard(),
analysis.getDeviceViewOutputExpressions().stream()
.map(Expression::getExpressionString)
.collect(Collectors.toList()),
analysis.getDeviceViewInputIndexesMap().values().iterator().next(),
OFFSET_VALUE,
limitValue,
whereExpression,
queryStatement.isGroupByTime(),
analysis.getDeviceTemplate().getSchemaMap(),
filterLayoutMap,
null));
}
public PlanNode visitQuery() {
LogicalPlanBuilder planBuilder =
new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList);
Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>();
for (PartialPath devicePath : analysis.getDeviceList()) {
String deviceName = devicePath.getFullPath();
PlanNode rootNode = visitQueryBody(devicePath);
LogicalPlanBuilder subPlanBuilder =
new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList)
.withNewRoot(rootNode);
// order by device, expression, push down sortOperator
if (queryStatement.needPushDownSort()) {
subPlanBuilder =
subPlanBuilder.planOrderBy(
analysis.getDeviceToOrderByExpressions().get(deviceName),
analysis.getDeviceToSortItems().get(deviceName));
}
deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
}
// convert to ALIGN BY DEVICE view
planBuilder =
planBuilder.planDeviceView(
deviceToSubPlanMap,
analysis.getDeviceViewOutputExpressions(),
analysis.getDeviceViewInputIndexesMap(),
analysis.getSelectExpressions(),
queryStatement,
analysis);
if (!queryStatement.needPushDownSort()) {
planBuilder = planBuilder.planOrderBy(queryStatement, analysis);
}
planBuilder =
planBuilder
.planFill(analysis.getFillDescriptor(), queryStatement.getResultTimeOrder())
.planOffset(queryStatement.getRowOffset());
if (!analysis.isUseTopKNode() || queryStatement.hasOffset()) {
planBuilder = planBuilder.planLimit(queryStatement.getRowLimit());
}
return planBuilder.getRoot();
}
public PlanNode visitQueryBody(PartialPath devicePath) {
TemplatedLogicalPlanBuilder planBuilder =
new TemplatedLogicalPlanBuilder(analysis, context, newMeasurementList, newSchemaList);
planBuilder =
planBuilder
.planRawDataSource(
devicePath,
queryStatement.getResultTimeOrder(),
OFFSET_VALUE,
limitValue,
analysis.isLastLevelUseWildcard())
.planFilter(
whereExpression,
queryStatement.isGroupByTime(),
queryStatement.getResultTimeOrder());
return planBuilder.getRoot();
}
}