blob: aebc8ffe90e2fea3f6c19a08d3d0770923328569 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.qp.utils;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.ResultColumn;
import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* This class is used to control the row number of group by level query. For example, selected
* series[root.sg.d1.s1, root.sg.d2.s1, root.sg2.d1.s1], level = 1; the result rows will be
* [root.sg.*.s1, root.sg2.*.s1], sLimit and sOffset will be used to control the result numbers
* rather than the selected series.
*/
public class GroupByLevelController {
public static String ALIAS_ERROR_MESSAGE1 =
"alias '%s' can only be matched with one result column";
public static String ALIAS_ERROR_MESSAGE2 = "Result column %s with more than one alias[%s, %s]";
private final int seriesLimit;
private int seriesOffset;
Set<String> limitPaths;
Set<String> offsetPaths;
private final int[] levels;
int prevSize = 0;
/** count(root.sg.d1.s1) with level = 1 -> count(root.*.d1.s1) */
private Map<String, String> groupedPathMap;
/** count(root.*.d1.s1) -> alias */
private Map<String, String> columnToAliasMap;
/**
* Only used to check whether one alisa is corresponding to only one column. i.e. Different
* columns can't have the same alias.
*/
private Map<String, String> aliasToColumnMap;
public GroupByLevelController(int seriesLimit, int[] levels) {
this.seriesLimit = seriesLimit;
this.levels = levels;
}
public GroupByLevelController(QueryOperator operator) {
this.seriesLimit = operator.getSpecialClauseComponent().getSeriesLimit();
this.seriesOffset = operator.getSpecialClauseComponent().getSeriesOffset();
this.limitPaths = seriesLimit > 0 ? new HashSet<>() : null;
this.offsetPaths = seriesOffset > 0 ? new HashSet<>() : null;
this.groupedPathMap = new LinkedHashMap<>();
this.levels = operator.getLevels();
}
public String getGroupedPath(String rawPath) {
return groupedPathMap.get(rawPath);
}
public String getAlias(String originName) {
return columnToAliasMap != null && columnToAliasMap.get(originName) != null
? columnToAliasMap.get(originName)
: null;
}
public void control(ResultColumn rawColumn, List<ResultColumn> resultColumns)
throws LogicalOptimizeException {
Set<Integer> countWildcardIterIndices = getCountStarIndices(rawColumn);
// `resultColumns` includes all result columns after removing wildcards, so we need to skip
// those we have processed
Iterator<ResultColumn> iterator = resultColumns.iterator();
for (int i = 0; i < prevSize; i++) {
iterator.next();
}
while (iterator.hasNext()) {
ResultColumn resultColumn = iterator.next();
Expression rootExpression = resultColumn.getExpression();
boolean hasAggregation = false;
int idx = 0;
for (Iterator<Expression> it = rootExpression.iterator(); it.hasNext(); ) {
Expression expression = it.next();
if (expression instanceof FunctionExpression
&& expression.isBuiltInAggregationFunctionExpression()) {
hasAggregation = true;
List<PartialPath> paths = ((FunctionExpression) expression).getPaths();
String functionName = ((FunctionExpression) expression).getFunctionName();
boolean isCountStar = countWildcardIterIndices.contains(idx++);
String groupedPath = generatePartialPathByLevel(isCountStar, paths.get(0), levels);
String rawPath = String.format("%s(%s)", functionName, paths.get(0).getFullPath());
String pathWithFunction = String.format("%s(%s)", functionName, groupedPath);
if (seriesLimit == 0 && seriesOffset == 0) {
groupedPathMap.put(rawPath, pathWithFunction);
checkAliasAndUpdateAliasMap(rawColumn, pathWithFunction);
} else {
// We cannot judge whether the path after grouping exists until we add it to set
if (seriesOffset > 0 && offsetPaths != null) {
offsetPaths.add(pathWithFunction);
if (offsetPaths.size() <= seriesOffset) {
iterator.remove();
if (offsetPaths.size() == seriesOffset) {
seriesOffset = 0;
}
}
} else if (offsetPaths == null || !offsetPaths.contains(pathWithFunction)) {
limitPaths.add(pathWithFunction);
if (seriesLimit > 0 && limitPaths.size() > seriesLimit) {
iterator.remove();
limitPaths.remove(pathWithFunction);
} else {
groupedPathMap.put(rawPath, pathWithFunction);
checkAliasAndUpdateAliasMap(rawColumn, pathWithFunction);
}
} else {
iterator.remove();
}
}
}
}
if (!hasAggregation) {
throw new LogicalOptimizeException(rootExpression + " can't be used in group by level.");
}
}
prevSize = resultColumns.size();
}
// As one expression may have many aggregation results in the tree leaf, here we should traverse
// all the successor expressions and record the count(*) indices
private Set<Integer> getCountStarIndices(ResultColumn rawColumn) {
Set<Integer> countWildcardIterIndices = new HashSet<>();
int idx = 0;
for (Iterator<Expression> it = rawColumn.getExpression().iterator(); it.hasNext(); ) {
Expression expression = it.next();
if (expression instanceof FunctionExpression
&& expression.isBuiltInAggregationFunctionExpression()
&& ((FunctionExpression) expression).isCountStar()) {
countWildcardIterIndices.add(idx);
}
idx++;
}
return countWildcardIterIndices;
}
private void checkAliasAndUpdateAliasMap(ResultColumn rawColumn, String originName)
throws LogicalOptimizeException {
if (!rawColumn.hasAlias()) {
return;
} else if (columnToAliasMap == null) {
columnToAliasMap = new HashMap<>();
aliasToColumnMap = new HashMap<>();
}
// If an alias is corresponding to more than one result column, throw an exception
if (columnToAliasMap.get(originName) == null) {
if (aliasToColumnMap.get(rawColumn.getAlias()) != null) {
throw new LogicalOptimizeException(
String.format(ALIAS_ERROR_MESSAGE1, rawColumn.getAlias()));
} else {
columnToAliasMap.put(originName, rawColumn.getAlias());
aliasToColumnMap.put(rawColumn.getAlias(), originName);
}
// If a result column is corresponding to more than one alias, throw an exception
} else if (!columnToAliasMap.get(originName).equals(rawColumn.getAlias())) {
throw new LogicalOptimizeException(
String.format(
ALIAS_ERROR_MESSAGE2,
originName,
columnToAliasMap.get(originName),
rawColumn.getAlias()));
}
}
/**
* Transform an originalPath to a partial path that satisfies given level. Path nodes don't
* satisfy the given level will be replaced by "*" except the sensor level, e.g.
* generatePartialPathByLevel("root.sg.dh.d1.s1", 2) will return "root.*.dh.*.s1".
*
* <p>Especially, if count(*), then the sensor level will be replaced by "*" too.
*
* @return result partial path
*/
public String generatePartialPathByLevel(
boolean isCountStar, PartialPath rawPath, int[] pathLevels) {
String[] nodes = rawPath.getNodes();
Set<Integer> levelSet = new HashSet<>();
for (int level : pathLevels) {
levelSet.add(level);
}
List<String> transformedNodes = new ArrayList<>(nodes.length);
transformedNodes.add(nodes[0]);
for (int k = 1; k < nodes.length - 1; k++) {
if (levelSet.contains(k)) {
transformedNodes.add(nodes[k]);
} else {
transformedNodes.add(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
}
}
if (isCountStar) {
transformedNodes.add(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
} else {
transformedNodes.add(nodes[nodes.length - 1]);
}
MeasurementPath groupedPath =
new MeasurementPath(
new PartialPath(transformedNodes.toArray(new String[0])),
((MeasurementPath) rawPath).getMeasurementSchema());
if (rawPath.isMeasurementAliasExists()) {
groupedPath.setMeasurementAlias(rawPath.getMeasurementAlias());
return groupedPath.getFullPathWithAlias();
}
return groupedPath.getFullPath();
}
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(seriesLimit, byteBuffer);
ReadWriteIOUtils.write(seriesOffset, byteBuffer);
if (limitPaths == null) {
ReadWriteIOUtils.write(-1, byteBuffer);
} else {
ReadWriteIOUtils.write(limitPaths.size(), byteBuffer);
for (String limitPath : limitPaths) {
ReadWriteIOUtils.write(limitPath, byteBuffer);
}
}
if (offsetPaths == null) {
ReadWriteIOUtils.write(-1, byteBuffer);
} else {
ReadWriteIOUtils.write(offsetPaths.size(), byteBuffer);
for (String offsetPath : offsetPaths) {
ReadWriteIOUtils.write(offsetPath, byteBuffer);
}
}
if (levels == null) {
ReadWriteIOUtils.write(-1, byteBuffer);
} else {
ReadWriteIOUtils.write(levels.length, byteBuffer);
for (int level : levels) {
ReadWriteIOUtils.write(level, byteBuffer);
}
}
ReadWriteIOUtils.write(prevSize, byteBuffer);
ReadWriteIOUtils.write(groupedPathMap, byteBuffer);
ReadWriteIOUtils.write(columnToAliasMap, byteBuffer);
ReadWriteIOUtils.write(aliasToColumnMap, byteBuffer);
}
public static GroupByLevelController deserialize(ByteBuffer byteBuffer) {
int seriesLimit = ReadWriteIOUtils.readInt(byteBuffer);
int seriesOffset = ReadWriteIOUtils.readInt(byteBuffer);
int limitPathSize = ReadWriteIOUtils.readInt(byteBuffer);
Set<String> limitPaths = null;
if (limitPathSize != -1) {
limitPaths = new HashSet<>();
for (int i = 0; i < limitPathSize; i++) {
limitPaths.add(ReadWriteIOUtils.readString(byteBuffer));
}
}
int offsetPathSize = ReadWriteIOUtils.readInt(byteBuffer);
Set<String> offsetPaths = null;
if (offsetPathSize != -1) {
offsetPaths = new HashSet<>();
for (int i = 0; i < offsetPathSize; i++) {
offsetPaths.add(ReadWriteIOUtils.readString(byteBuffer));
}
}
int levelSize = ReadWriteIOUtils.readInt(byteBuffer);
int[] levels = null;
if (levelSize != -1) {
levels = new int[levelSize];
for (int i = 0; i < levelSize; i++) {
levels[i] = ReadWriteIOUtils.readInt(byteBuffer);
}
}
int prevSize = ReadWriteIOUtils.readInt(byteBuffer);
Map<String, String> groupedPathMap = ReadWriteIOUtils.readMap(byteBuffer);
Map<String, String> columnToAliasMap = ReadWriteIOUtils.readMap(byteBuffer);
Map<String, String> aliasToColumnMap = ReadWriteIOUtils.readMap(byteBuffer);
GroupByLevelController groupByLevelController = new GroupByLevelController(seriesLimit, levels);
groupByLevelController.limitPaths = limitPaths;
groupByLevelController.aliasToColumnMap = aliasToColumnMap;
groupByLevelController.columnToAliasMap = columnToAliasMap;
groupByLevelController.groupedPathMap = groupedPathMap;
groupByLevelController.offsetPaths = offsetPaths;
groupByLevelController.prevSize = prevSize;
groupByLevelController.seriesOffset = seriesOffset;
return groupByLevelController;
}
}