blob: 4cd62aed0256c78eff917467ec0e2329f0921e8a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.optimization;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TwoChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.tsfile.utils.TimeDuration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* <b>Optimization phase:</b> Distributed plan planning.
*
* <p><b>Rule:</b> The LIMIT OFFSET condition can be pushed down to the SeriesScanNode, when the
* following conditions are met:
* <li>Time series query (not aggregation query).
* <li>The query expressions are all scalar expression.
* <li>Functions that need to be calculated based on before or after values are not used, such as
* trend functions, FILL(previous), FILL(linear).
* <li>Only one scan node is included in the distributed plan. That is, only one single series or a
* group of series under an aligned device is queried, and all queried data is in one region.
*/
public class LimitOffsetPushDown implements PlanOptimizer {
@Override
public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) {
if (analysis.getStatement().getType() != StatementType.QUERY) {
return plan;
}
QueryStatement queryStatement = analysis.getQueryStatement();
if (queryStatement.isLastQuery()
|| queryStatement.isAggregationQuery()
|| (!queryStatement.hasLimit() && !queryStatement.hasOffset())) {
return plan;
}
return plan.accept(new Rewriter(), new RewriterContext(analysis));
}
private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> {
@Override
public PlanNode visitPlan(PlanNode node, RewriterContext context) {
PlanNode newNode = node.clone();
for (PlanNode child : node.getChildren()) {
context.setParent(node);
newNode.addChild(child.accept(this, context));
}
return newNode;
}
@Override
public PlanNode visitLimit(LimitNode node, RewriterContext context) {
context.setParent(node);
context.setLimit(node.getLimit());
node.setChild(node.getChild().accept(this, context));
if (context.isEnablePushDown()) {
return node.getChild();
}
return node;
}
@Override
public PlanNode visitOffset(OffsetNode node, RewriterContext context) {
context.setParent(node);
context.setOffset(node.getOffset());
node.setChild(node.getChild().accept(this, context));
if (context.isEnablePushDown()) {
return node.getChild();
}
return node;
}
@Override
public PlanNode visitFill(FillNode node, RewriterContext context) {
FillPolicy fillPolicy = node.getFillDescriptor().getFillPolicy();
if (fillPolicy == FillPolicy.VALUE) {
node.setChild(node.getChild().accept(this, context));
} else {
context.setEnablePushDown(false);
}
return node;
}
@Override
public PlanNode visitFilter(FilterNode node, RewriterContext context) {
// Value filtering push-down occurs during the logical planning phase. If there is still a
// FilterNode here, it means that there are read filter conditions that cannot be pushed
// down.
context.setEnablePushDown(false);
return node;
}
@Override
public PlanNode visitSort(SortNode node, RewriterContext context) {
// Limit/Offset in ORDER BY focus the sorted result, so it should not be pushed down.
context.setEnablePushDown(false);
return node;
}
@Override
public PlanNode visitTransform(TransformNode node, RewriterContext context) {
Expression[] outputExpressions = node.getOutputExpressions();
boolean enablePushDown = true;
for (Expression expression : outputExpressions) {
if (!ExpressionAnalyzer.checkIsScalarExpression(expression, context.getAnalysis())) {
enablePushDown = false;
break;
}
}
if (enablePushDown) {
node.setChild(node.getChild().accept(this, context));
} else {
context.setEnablePushDown(false);
}
return node;
}
@Override
public PlanNode visitMultiChildProcess(MultiChildProcessNode node, RewriterContext context) {
if (node.getChildren().size() == 1) {
PlanNode child = node.getChildren().get(0).accept(this, context);
node.setChildren(Collections.singletonList(child));
return node;
}
context.setEnablePushDown(false);
return node;
}
@Override
public PlanNode visitTwoChildProcess(TwoChildProcessNode node, RewriterContext context) {
context.setEnablePushDown(false);
return node;
}
@Override
public PlanNode visitLeftOuterTimeJoin(LeftOuterTimeJoinNode node, RewriterContext context) {
// TODO we may need to push limit and offset to left child
context.setEnablePushDown(false);
return node;
}
@Override
public PlanNode visitSeriesScan(SeriesScanNode node, RewriterContext context) {
if (context.isEnablePushDown()) {
node.setPushDownLimit(context.getLimit());
node.setPushDownOffset(context.getOffset());
}
return node;
}
@Override
public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, RewriterContext context) {
if (context.isEnablePushDown()) {
node.setPushDownLimit(context.getLimit());
node.setPushDownOffset(context.getOffset());
}
return node;
}
}
private static class RewriterContext {
private long limit;
private long offset;
private boolean enablePushDown = true;
private PlanNode parent;
private final Analysis analysis;
public RewriterContext(Analysis analysis) {
this.analysis = analysis;
}
public long getLimit() {
return limit;
}
public void setLimit(long limit) {
this.limit = limit;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
public boolean isEnablePushDown() {
return enablePushDown;
}
public void setEnablePushDown(boolean enablePushDown) {
this.enablePushDown = enablePushDown;
}
public PlanNode getParent() {
return parent;
}
public void setParent(PlanNode parent) {
this.parent = parent;
}
public Analysis getAnalysis() {
return analysis;
}
}
// following methods are used to push down limit/offset in group by time
// 1. push down limit/offset to group by time in align by time
public static boolean canPushDownLimitOffsetToGroupByTime(QueryStatement queryStatement) {
if (queryStatement.isGroupByTime()
&& !queryStatement.isAlignByDevice()
&& !queryStatement.hasHaving()
&& !queryStatement.hasFill()) {
return !queryStatement.hasOrderBy() || queryStatement.isOrderByBasedOnTime();
}
return false;
}
private static void pushDownLimitOffsetToTimeParameterContainingMonth(
QueryStatement queryStatement) {
GroupByTimeComponent groupByTimeComponent = queryStatement.getGroupByTimeComponent();
long startTime = groupByTimeComponent.getStartTime();
long endTime = groupByTimeComponent.getEndTime();
TimeDuration slidingStep = groupByTimeComponent.getSlidingStep();
TimeDuration interval = groupByTimeComponent.getInterval();
long limitSize = queryStatement.getRowLimit();
long offsetSize = queryStatement.getRowOffset();
// Evaluate the day of month as 28 days
long totalStep = slidingStep.getMinTotalDuration(TimeUnit.MILLISECONDS);
long size = (endTime - startTime + totalStep - 1) / totalStep;
if (size > offsetSize) {
// ordering in group by month must be ascending
long newStartTime =
DateTimeUtils.calcPositiveIntervalByMonth(startTime, slidingStep.multiple(offsetSize));
if (limitSize != 0) {
endTime =
Math.min(
endTime,
DateTimeUtils.calcPositiveIntervalByMonth(
startTime,
calculateEndTimeDuration(slidingStep, interval, limitSize, offsetSize)));
}
groupByTimeComponent.setEndTime(endTime);
groupByTimeComponent.setStartTime(newStartTime);
} else {
// finish the query, resultSet is empty
queryStatement.setResultSetEmpty(true);
}
// If windows overlap, we need to keep LIMIT because the window size can be less than interval
// which may result in more windows than we need in the target time range.
queryStatement.setRowLimit(interval.isGreaterThan(slidingStep) ? limitSize : 0);
queryStatement.setRowOffset(0);
}
private static TimeDuration calculateEndTimeDuration(
TimeDuration slidingStep, TimeDuration interval, long limitSize, long offsetSize) {
long length = offsetSize + limitSize - 1;
// startTime + offsetSize * step + (limitSize - 1) * step + interval
int monthDuration = (int) (length * slidingStep.monthDuration + interval.monthDuration);
long nonMonthDuration = length * slidingStep.nonMonthDuration + interval.nonMonthDuration;
return new TimeDuration(monthDuration, nonMonthDuration);
}
public static void pushDownLimitOffsetToTimeParameter(QueryStatement queryStatement) {
GroupByTimeComponent groupByTimeComponent = queryStatement.getGroupByTimeComponent();
// if group by time contains month, we use another push down limit/offset
if (groupByTimeComponent.getInterval().containsMonth()
|| groupByTimeComponent.getSlidingStep().containsMonth()) {
pushDownLimitOffsetToTimeParameterContainingMonth(queryStatement);
return;
}
long startTime = groupByTimeComponent.getStartTime();
long endTime = groupByTimeComponent.getEndTime();
long step = groupByTimeComponent.getSlidingStep().nonMonthDuration;
long interval = groupByTimeComponent.getInterval().nonMonthDuration;
long limitSize = queryStatement.getRowLimit();
long offsetSize = queryStatement.getRowOffset();
long size = (endTime - startTime + step - 1) / step;
if (size > offsetSize) {
if (queryStatement.getResultTimeOrder() == Ordering.ASC) {
startTime = startTime + offsetSize * step;
} else {
startTime = startTime + (size - offsetSize - limitSize) * step;
}
endTime =
limitSize == 0
? endTime
: Math.min(endTime, startTime + (limitSize - 1) * step + interval);
groupByTimeComponent.setEndTime(endTime);
groupByTimeComponent.setStartTime(startTime);
} else {
// finish the query, resultSet is empty
queryStatement.setResultSetEmpty(true);
}
// If windows overlap, we need to keep LIMIT because the window size can be less than interval
// which may result in more windows than we need in the target time range.
queryStatement.setRowLimit(interval > step ? limitSize : 0);
queryStatement.setRowOffset(0);
}
// 2. push down limit/offset to group by time in align by device
public static boolean canPushDownLimitOffsetInGroupByTimeForDevice(
QueryStatement queryStatement) {
if (!hasLimitOffset(queryStatement)) {
return false;
}
if (queryStatement.isGroupByTime()
&& queryStatement.isAlignByDevice()
&& !queryStatement.hasHaving()
&& !queryStatement.hasFill()) {
return !queryStatement.hasOrderBy() || queryStatement.isOrderByBasedOnDevice();
}
return false;
}
public static List<PartialPath> pushDownLimitOffsetInGroupByTimeForDevice(
List<PartialPath> deviceNames, QueryStatement queryStatement) {
GroupByTimeComponent groupByTimeComponent = queryStatement.getGroupByTimeComponent();
if (groupByTimeComponent.getInterval().containsMonth()
|| groupByTimeComponent.getSlidingStep().containsMonth()) {
return deviceNames;
}
long startTime = groupByTimeComponent.getStartTime();
long endTime = groupByTimeComponent.getEndTime();
long slidingStep = groupByTimeComponent.getSlidingStep().nonMonthDuration;
long size = (endTime - startTime + slidingStep - 1) / slidingStep;
if (size == 0 || size * deviceNames.size() <= queryStatement.getRowOffset()) {
// resultSet is empty
queryStatement.setResultSetEmpty(true);
return deviceNames;
}
long limitSize = queryStatement.getRowLimit();
long offsetSize = queryStatement.getRowOffset();
List<PartialPath> optimizedDeviceNames = new ArrayList<>();
int startDeviceIndex = (int) (offsetSize / size);
int endDeviceIndex =
limitSize == 0
? deviceNames.size() - 1
: (int)
((limitSize - ((startDeviceIndex + 1) * size - offsetSize) + size - 1) / size
+ startDeviceIndex);
int index = 0;
while (index < startDeviceIndex) {
index++;
}
queryStatement.setRowOffset(offsetSize - startDeviceIndex * size);
// if only refer to one device, optimize the time parameter
if (startDeviceIndex == endDeviceIndex) {
optimizedDeviceNames.add(deviceNames.get(startDeviceIndex));
if (hasLimitOffset(queryStatement) && queryStatement.isOrderByTimeInDevices()) {
pushDownLimitOffsetToTimeParameter(queryStatement);
}
} else {
while (index <= endDeviceIndex && index < deviceNames.size()) {
optimizedDeviceNames.add(deviceNames.get(index));
index++;
}
}
return optimizedDeviceNames;
}
private static boolean hasLimitOffset(QueryStatement queryStatement) {
return queryStatement.hasLimit() || queryStatement.hasOffset();
}
}