blob: a58aafcd5fe00b580feae9995517d258e7680779 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.VerifyMetadataException;
import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException;
import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkTypeStatement;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES;
import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.COLUMN_DEVICE;
/** This visitor is used to analyze each type of Statement and returns the {@link Analysis}. */
public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> {
private static final Logger logger = LoggerFactory.getLogger(Analyzer.class);
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
private final MPPQueryContext context;
public AnalyzeVisitor(
IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher, MPPQueryContext context) {
this.context = context;
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
}
@Override
public Analysis visitNode(StatementNode node, MPPQueryContext context) {
throw new UnsupportedOperationException(
"Unsupported statement type: " + node.getClass().getName());
}
@Override
public Analysis visitExplain(ExplainStatement explainStatement, MPPQueryContext context) {
Analysis analysis = visitQuery(explainStatement.getQueryStatement(), context);
analysis.setStatement(explainStatement);
analysis.setFinishQueryAfterAnalyze(true);
return analysis;
}
  /**
   * Analyzes a query statement end-to-end and fills an {@link Analysis}: semantic check and
   * path concatenation, schema fetch, global time / value filter extraction, SELECT / WHERE /
   * HAVING / GROUP BY expression analysis (with a separate branch for ALIGN BY DEVICE),
   * result-set header construction, and data partition fetching for all source devices.
   */
  @Override
  public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
    Analysis analysis = new Analysis();
    try {
      // check for semantic errors
      queryStatement.semanticCheck();

      // concat path and construct path pattern tree
      PathPatternTree patternTree = new PathPatternTree();
      queryStatement =
          (QueryStatement) new ConcatPathRewriter().rewrite(queryStatement, patternTree);
      analysis.setStatement(queryStatement);

      // request schema fetch API
      logger.info("[StartFetchSchema]");
      ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
      logger.info("[EndFetchSchema]");

      // If there is no leaf node in the schema tree, the query should be completed immediately
      if (schemaTree.isEmpty()) {
        if (queryStatement.isLastQuery()) {
          analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader());
        }
        analysis.setFinishQueryAfterAnalyze(true);
        return analysis;
      }

      // extract global time filter from query filter and determine if there is a value filter
      Pair<Filter, Boolean> resultPair = analyzeGlobalTimeFilter(queryStatement);
      Filter globalTimeFilter = resultPair.left;
      boolean hasValueFilter = resultPair.right;
      analysis.setGlobalTimeFilter(globalTimeFilter);
      analysis.setHasValueFilter(hasValueFilter);

      // LAST queries take a short-circuit analysis path and return here
      if (queryStatement.isLastQuery()) {
        if (hasValueFilter) {
          throw new SemanticException("Only time filters are supported in LAST query");
        }
        analysis.setMergeOrderParameter(analyzeOrderBy(queryStatement));
        return analyzeLast(analysis, schemaTree.getAllMeasurement(), schemaTree);
      }

      // Example 1: select s1, s1 + s2 as t, udf(udf(s1)) from root.sg.d1
      //   outputExpressions: [<root.sg.d1.s1,null>, <root.sg.d1.s1 + root.sg.d1.s2,t>,
      //                       <udf(udf(root.sg.d1.s1)),null>]
      //   transformExpressions: [root.sg.d1.s1, root.sg.d1.s1 + root.sg.d1.s2,
      //                          udf(udf(root.sg.d1.s1))]
      //   sourceExpressions: {root.sg.d1 -> [root.sg.d1.s1, root.sg.d1.s2]}
      //
      // Example 2: select s1, s2, s3 as t from root.sg.* align by device
      //   outputExpressions: [<s1,null>, <s2,null>, <s3,t>]
      //   transformExpressions: [root.sg.d1.s1, root.sg.d1.s2, root.sg.d1.s3,
      //                          root.sg.d2.s1, root.sg.d2.s2]
      //   sourceExpressions: {root.sg.d1 -> [root.sg.d1.s1, root.sg.d1.s2, root.sg.d1.s3],
      //                       root.sg.d2 -> [root.sg.d2.s1, root.sg.d2.s2]}
      //
      // Example 3: select sum(s1) + 1 as t, count(s2) from root.sg.d1
      //   outputExpressions: [<sum(root.sg.d1.s1) + 1,t>, <count(root.sg.d1.s2),null>]
      //   transformExpressions: [sum(root.sg.d1.s1) + 1, count(root.sg.d1.s2)]
      //   aggregationExpressions: {root.sg.d1 -> [sum(root.sg.d1.s1), count(root.sg.d1.s2)]}
      //   sourceExpressions: {root.sg.d1 -> [sum(root.sg.d1.s1), count(root.sg.d1.s2)]}
      //
      // Example 4: select sum(s1) + 1 as t, count(s2) from root.sg.d1 where s1 > 1
      //   outputExpressions: [<sum(root.sg.d1.s1) + 1,t>, <count(root.sg.d1.s2),null>]
      //   transformExpressions: [sum(root.sg.d1.s1) + 1, count(root.sg.d1.s2)]
      //   aggregationExpressions: {root.sg.d1 -> [sum(root.sg.d1.s1), count(root.sg.d1.s2)]}
      //   sourceExpressions: {root.sg.d1 -> [root.sg.d1.s1, root.sg.d1.s2]}
      List<Pair<Expression, String>> outputExpressions;
      if (queryStatement.isAlignByDevice()) {
        Map<String, Set<Expression>> deviceToTransformExpressions = new HashMap<>();

        // the synthetic "Device" column emitted by the device view
        Expression deviceExpression =
            new TimeSeriesOperand(new MeasurementPath(COLUMN_DEVICE, TSDataType.TEXT));
        Set<Expression> transformInput = new LinkedHashSet<>();
        transformInput.add(deviceExpression);
        Set<Expression> transformOutput = new LinkedHashSet<>();
        transformOutput.add(deviceExpression);

        // all selected device
        Set<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);

        Map<String, Set<String>> deviceToMeasurementsMap = new HashMap<>();
        outputExpressions =
            analyzeSelect(
                analysis,
                schemaTree,
                deviceList,
                deviceToTransformExpressions,
                deviceToMeasurementsMap,
                transformInput);
        transformOutput.addAll(
            outputExpressions.stream().map(Pair::getLeft).collect(Collectors.toList()));

        if (queryStatement.hasHaving()) {
          Expression havingExpression =
              analyzeHaving(
                  analysis,
                  schemaTree,
                  deviceList,
                  deviceToTransformExpressions,
                  deviceToMeasurementsMap,
                  transformInput);
          analyzeExpression(analysis, havingExpression);
          // used for planFilter after planDeviceView
          analysis.setHavingExpression(havingExpression);
        }

        // per device, map each measurement to its column index in the device-view output
        List<String> allMeasurements =
            transformInput.stream()
                .map(Expression::getExpressionString)
                .collect(Collectors.toList());
        Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
        for (String deviceName : deviceToMeasurementsMap.keySet()) {
          List<String> measurementsUnderDevice =
              new ArrayList<>(deviceToMeasurementsMap.get(deviceName));
          List<Integer> indexes = new ArrayList<>();
          for (String measurement : measurementsUnderDevice) {
            indexes.add(allMeasurements.indexOf(measurement));
          }
          deviceToMeasurementIndexesMap.put(deviceName, indexes);
        }
        analysis.setDeviceToMeasurementIndexesMap(deviceToMeasurementIndexesMap);

        Map<String, Set<Expression>> deviceToSourceExpressions = new HashMap<>();
        boolean isValueFilterAggregation = queryStatement.isAggregationQuery() && hasValueFilter;

        Map<String, Boolean> deviceToIsRawDataSource = new HashMap<>();

        Map<String, Set<Expression>> deviceToAggregationExpressions = new HashMap<>();
        Map<String, Set<Expression>> deviceToAggregationTransformExpressions = new HashMap<>();
        for (String deviceName : deviceToTransformExpressions.keySet()) {
          Set<Expression> transformExpressions = deviceToTransformExpressions.get(deviceName);
          Set<Expression> aggregationExpressions = new LinkedHashSet<>();
          Set<Expression> aggregationTransformExpressions = new LinkedHashSet<>();

          boolean isHasRawDataInputAggregation = false;
          if (queryStatement.isAggregationQuery()) {
            // true if nested expressions and UDFs exist in aggregation function
            // i.e. select sum(s1 + 1) from root.sg.d1 align by device
            isHasRawDataInputAggregation =
                analyzeAggregation(
                    transformExpressions, aggregationExpressions, aggregationTransformExpressions);
            deviceToAggregationExpressions.put(deviceName, aggregationExpressions);
            deviceToAggregationTransformExpressions.put(
                deviceName, aggregationTransformExpressions);
          }

          // raw data is needed unless the query is a pure aggregation without value filter
          boolean isRawDataSource =
              !queryStatement.isAggregationQuery()
                  || isValueFilterAggregation
                  || isHasRawDataInputAggregation;

          for (Expression expression : transformExpressions) {
            updateSource(
                expression,
                deviceToSourceExpressions.computeIfAbsent(deviceName, key -> new LinkedHashSet<>()),
                isRawDataSource);
          }
          deviceToIsRawDataSource.put(deviceName, isRawDataSource);
        }
        analysis.setDeviceToAggregationExpressions(deviceToAggregationExpressions);
        analysis.setDeviceToAggregationTransformExpressions(
            deviceToAggregationTransformExpressions);
        analysis.setDeviceToIsRawDataSource(deviceToIsRawDataSource);

        if (queryStatement.getWhereCondition() != null) {
          Map<String, Expression> deviceToQueryFilter = new HashMap<>();
          Iterator<PartialPath> deviceIterator = deviceList.iterator();
          while (deviceIterator.hasNext()) {
            PartialPath devicePath = deviceIterator.next();
            Expression queryFilter = null;
            try {
              queryFilter = analyzeWhereSplitByDevice(queryStatement, devicePath, schemaTree);
            } catch (SemanticException e) {
              // NOTE(review): MeasurementNotExistException is filtered by instanceof inside a
              // broader catch — a device missing a filtered measurement is silently dropped
              // from the ALIGN BY DEVICE result rather than failing the query.
              if (e instanceof MeasurementNotExistException) {
                logger.warn(e.getMessage());
                deviceIterator.remove();
                deviceToSourceExpressions.remove(devicePath.getFullPath());
                continue;
              }
              throw e;
            }
            deviceToQueryFilter.put(devicePath.getFullPath(), queryFilter);
            analyzeExpression(analysis, queryFilter);
            updateSource(
                queryFilter,
                deviceToSourceExpressions.computeIfAbsent(
                    devicePath.getFullPath(), key -> new LinkedHashSet<>()),
                true);
          }
          analysis.setDeviceToQueryFilter(deviceToQueryFilter);
        }
        analysis.setTransformInput(transformInput);
        analysis.setTransformOutput(transformOutput);
        analysis.setDeviceToSourceExpressions(deviceToSourceExpressions);
        analysis.setDeviceToTransformExpressions(deviceToTransformExpressions);
      } else {
        outputExpressions = analyzeSelect(analysis, schemaTree);

        Set<Expression> transformExpressions =
            outputExpressions.stream()
                .map(Pair::getLeft)
                .collect(Collectors.toCollection(LinkedHashSet::new));

        // cast functionName to lowercase in havingExpression
        Expression havingExpression =
            queryStatement.hasHaving()
                ? ExpressionAnalyzer.removeAliasFromExpression(
                    queryStatement.getHavingCondition().getPredicate())
                : null;

        // get removeWildcard Expressions in Having
        // used to analyzeAggregation in Having expression and updateSource
        Set<Expression> transformExpressionsInHaving =
            queryStatement.hasHaving()
                ? new HashSet<>(
                    ExpressionAnalyzer.removeWildcardInFilter(
                        havingExpression,
                        queryStatement.getFromComponent().getPrefixPaths(),
                        schemaTree,
                        false))
                : null;

        if (queryStatement.isGroupByLevel()) {
          // map from grouped expression to set of input expressions
          Map<Expression, Expression> rawPathToGroupedPathMap = new HashMap<>();
          Map<Expression, Set<Expression>> groupByLevelExpressions =
              analyzeGroupByLevel(
                  analysis, outputExpressions, transformExpressions, rawPathToGroupedPathMap);
          analysis.setGroupByLevelExpressions(groupByLevelExpressions);
          analysis.setRawPathToGroupedPathMap(rawPathToGroupedPathMap);
        }

        // true if nested expressions and UDFs exist in aggregation function
        // i.e. select sum(s1 + 1) from root.sg.d1
        boolean isHasRawDataInputAggregation = false;
        if (queryStatement.isAggregationQuery()) {
          Set<Expression> aggregationExpressions = new HashSet<>();
          Set<Expression> aggregationTransformExpressions = new HashSet<>();
          List<Expression> aggregationExpressionsInHaving =
              new ArrayList<>(); // as input of GroupByLevelController
          isHasRawDataInputAggregation =
              analyzeAggregation(
                  transformExpressions, aggregationExpressions, aggregationTransformExpressions);
          if (queryStatement.hasHaving()) {
            isHasRawDataInputAggregation |=
                analyzeAggregationInHaving(
                    transformExpressionsInHaving,
                    aggregationExpressionsInHaving,
                    aggregationExpressions,
                    aggregationTransformExpressions);

            havingExpression = // construct Filter from Having
                analyzeHaving(
                    analysis,
                    analysis.getGroupByLevelExpressions(),
                    transformExpressionsInHaving,
                    aggregationExpressionsInHaving);
            analyzeExpression(analysis, havingExpression);
            analysis.setHavingExpression(havingExpression);
          }
          analysis.setAggregationExpressions(aggregationExpressions);
          analysis.setAggregationTransformExpressions(aggregationTransformExpressions);
        }

        // generate sourceExpression according to transformExpressions
        Set<Expression> sourceExpressions = new HashSet<>();
        boolean isValueFilterAggregation = queryStatement.isAggregationQuery() && hasValueFilter;
        boolean isRawDataSource =
            !queryStatement.isAggregationQuery()
                || isValueFilterAggregation
                || isHasRawDataInputAggregation;
        for (Expression expression : transformExpressions) {
          updateSource(expression, sourceExpressions, isRawDataSource);
        }

        if (queryStatement.hasHaving()) {
          for (Expression expression : transformExpressionsInHaving) {
            analyzeExpression(analysis, expression);
            updateSource(expression, sourceExpressions, isRawDataSource);
          }
        }

        if (queryStatement.getWhereCondition() != null) {
          Expression queryFilter = analyzeWhere(queryStatement, schemaTree);

          // update sourceExpression according to queryFilter
          analyzeExpression(analysis, queryFilter);
          updateSource(queryFilter, sourceExpressions, isRawDataSource);
          analysis.setQueryFilter(queryFilter);
        }
        analysis.setRawDataSource(isRawDataSource);
        analysis.setSourceExpressions(sourceExpressions);
        analysis.setTransformExpressions(transformExpressions);
      }

      if (queryStatement.isGroupByTime()) {
        GroupByTimeComponent groupByTimeComponent = queryStatement.getGroupByTimeComponent();
        if ((groupByTimeComponent.isIntervalByMonth()
                || groupByTimeComponent.isSlidingStepByMonth())
            && queryStatement.getResultTimeOrder() == Ordering.DESC) {
          throw new SemanticException("Group by month doesn't support order by time desc now.");
        }
        analysis.setGroupByTimeParameter(new GroupByTimeParameter(groupByTimeComponent));
      }

      if (queryStatement.getFillComponent() != null) {
        FillComponent fillComponent = queryStatement.getFillComponent();
        analysis.setFillDescriptor(
            new FillDescriptor(fillComponent.getFillPolicy(), fillComponent.getFillValue()));
      }

      // generate result set header according to output expressions
      DatasetHeader datasetHeader = analyzeOutput(analysis, outputExpressions);
      analysis.setOutputExpressions(outputExpressions);
      analysis.setRespDatasetHeader(datasetHeader);

      // fetch partition information
      Set<String> deviceSet = new HashSet<>();
      if (queryStatement.isAlignByDevice()) {
        deviceSet = analysis.getDeviceToSourceExpressions().keySet();
      } else {
        for (Expression expression : analysis.getSourceExpressions()) {
          deviceSet.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression));
        }
      }
      DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet, schemaTree);
      analysis.setDataPartitionInfo(dataPartition);
    } catch (StatementAnalyzeException | IllegalPathException e) {
      logger.error("Meet error when analyzing the query statement: ", e);
      // NOTE(review): the rethrow keeps only e.getMessage() — the original cause/stack trace
      // is dropped from the propagated exception (it is only preserved in the log above).
      throw new StatementAnalyzeException(
          "Meet error when analyzing the query statement: " + e.getMessage());
    }
    return analysis;
  }
  /**
   * Analyzes the SELECT clause of a non-ALIGN-BY-DEVICE query.
   *
   * <p>Each result column is expanded by removing wildcards against the fetched schema tree,
   * SLIMIT/SOFFSET column pagination is applied, and the resulting expressions are returned
   * paired with their alias ({@code null} when the column carries no alias).
   *
   * @throws SemanticException if an alias would attach to more than one expanded time series
   */
  private List<Pair<Expression, String>> analyzeSelect(Analysis analysis, ISchemaTree schemaTree) {
    QueryStatement queryStatement = (QueryStatement) analysis.getStatement();
    List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
    boolean isGroupByLevel = queryStatement.isGroupByLevel();
    // NOTE(review): the third constructor argument appears to alter pagination for LAST /
    // GROUP BY LEVEL queries — confirm against ColumnPaginationController.
    ColumnPaginationController paginationController =
        new ColumnPaginationController(
            queryStatement.getSeriesLimit(),
            queryStatement.getSeriesOffset(),
            queryStatement.isLastQuery() || isGroupByLevel);

    for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) {
      boolean hasAlias = resultColumn.hasAlias();
      List<Expression> resultExpressions =
          ExpressionAnalyzer.removeWildcardInExpression(resultColumn.getExpression(), schemaTree);
      // an alias may only name a single series (except under GROUP BY LEVEL)
      if (hasAlias && !queryStatement.isGroupByLevel() && resultExpressions.size() > 1) {
        throw new SemanticException(
            String.format(
                "alias '%s' can only be matched with one time series", resultColumn.getAlias()));
      }
      for (Expression expression : resultExpressions) {
        // columns consumed by SOFFSET are skipped
        if (paginationController.hasCurOffset()) {
          paginationController.consumeOffset();
          continue;
        }
        if (paginationController.hasCurLimit()) {
          if (isGroupByLevel) {
            analyzeExpression(analysis, expression);
            outputExpressions.add(new Pair<>(expression, resultColumn.getAlias()));
            if (resultColumn.getExpression() instanceof FunctionExpression) {
              queryStatement
                  .getGroupByLevelComponent()
                  .updateIsCountStar((FunctionExpression) resultColumn.getExpression());
            }
          } else {
            // strip aliases nested inside the expression; if stripping changed it, the
            // original (aliased) expression string becomes the output alias — unless the
            // result column carries an explicit alias, which wins
            Expression expressionWithoutAlias =
                ExpressionAnalyzer.removeAliasFromExpression(expression);
            String alias =
                !Objects.equals(expressionWithoutAlias, expression)
                    ? expression.getExpressionString()
                    : null;
            alias = hasAlias ? resultColumn.getAlias() : alias;
            analyzeExpression(analysis, expressionWithoutAlias);
            outputExpressions.add(new Pair<>(expressionWithoutAlias, alias));
          }
          paginationController.consumeLimit();
        } else {
          // SLIMIT exhausted: stop expanding this result column
          break;
        }
      }
    }
    return outputExpressions;
  }
private Set<PartialPath> analyzeFrom(QueryStatement queryStatement, ISchemaTree schemaTree) {
// device path patterns in FROM clause
List<PartialPath> devicePatternList = queryStatement.getFromComponent().getPrefixPaths();
Set<PartialPath> deviceList = new LinkedHashSet<>();
for (PartialPath devicePattern : devicePatternList) {
// get all matched devices
deviceList.addAll(
schemaTree.getMatchedDevices(devicePattern).stream()
.map(DeviceSchemaInfo::getDevicePath)
.collect(Collectors.toList()));
}
return deviceList;
}
  /**
   * Analyzes the SELECT clause of an ALIGN BY DEVICE query.
   *
   * <p>For every result column, the select expression is concatenated with each device and
   * expanded by removing wildcards. Expressions are keyed by their device-free "measurement
   * expression" so that the same measurement across devices maps to one output column. Applies
   * SLIMIT/SOFFSET pagination per measurement column and fills the per-device side maps that
   * later analysis steps consume.
   *
   * @param deviceToTransformExpressions out-param: device name -> expressions evaluated for it
   * @param deviceToMeasurementsMap out-param: device name -> measurement column strings
   * @param transformInput out-param: inputs of the Transform node placed after the DeviceView
   * @return output expressions (measurement expression without alias, alias or {@code null})
   * @throws SemanticException if an alias matches more than one measurement, or if the same
   *     measurement name has inconsistent data types across devices
   */
  private List<Pair<Expression, String>> analyzeSelect(
      Analysis analysis,
      ISchemaTree schemaTree,
      Set<PartialPath> deviceList,
      Map<String, Set<Expression>> deviceToTransformExpressions,
      Map<String, Set<String>> deviceToMeasurementsMap,
      Set<Expression> transformInput) {
    QueryStatement queryStatement = (QueryStatement) analysis.getStatement();
    List<Pair<Expression, String>> outputExpressions = new ArrayList<>();

    ColumnPaginationController paginationController =
        new ColumnPaginationController(
            queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset(), false);

    for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) {
      Expression selectExpression = resultColumn.getExpression();
      boolean hasAlias = resultColumn.hasAlias();

      // measurement expression after removing wildcard
      // use LinkedHashMap for order-preserving
      Map<Expression, Map<String, Expression>> measurementToDeviceTransformExpressions =
          new LinkedHashMap<>();
      for (PartialPath device : deviceList) {
        List<Expression> transformExpressions =
            ExpressionAnalyzer.concatDeviceAndRemoveWildcard(selectExpression, device, schemaTree);
        if (queryStatement.isAggregationQuery()) {
          // extract aggregation in transformExpressions as input of Transform node after DeviceView
          // node
          Set<Expression> aggregationExpressions = analyzeAggregation(transformExpressions);
          for (Expression aggregationExpression : aggregationExpressions) {
            Expression measurementExpression =
                ExpressionAnalyzer.getMeasurementExpression(aggregationExpression);
            transformInput.add(measurementExpression);
            analyzeExpression(analysis, measurementExpression);
          }
        }
        // group the expanded expressions by their device-free measurement form
        for (Expression transformExpression : transformExpressions) {
          Expression measurementExpression =
              ExpressionAnalyzer.getMeasurementExpression(transformExpression);
          measurementToDeviceTransformExpressions
              .computeIfAbsent(measurementExpression, key -> new LinkedHashMap<>())
              .put(
                  device.getFullPath(),
                  ExpressionAnalyzer.removeAliasFromExpression(transformExpression));
        }
      }

      if (hasAlias && measurementToDeviceTransformExpressions.keySet().size() > 1) {
        throw new SemanticException(
            String.format(
                "alias '%s' can only be matched with one time series", resultColumn.getAlias()));
      }

      for (Expression measurementExpression : measurementToDeviceTransformExpressions.keySet()) {
        // columns consumed by SOFFSET are skipped
        if (paginationController.hasCurOffset()) {
          paginationController.consumeOffset();
          continue;
        }
        if (paginationController.hasCurLimit()) {
          Map<String, Expression> deviceToTransformExpressionOfOneMeasurement =
              measurementToDeviceTransformExpressions.get(measurementExpression);
          deviceToTransformExpressionOfOneMeasurement
              .values()
              .forEach(expression -> analyzeExpression(analysis, expression));
          // check whether the datatype of paths which has the same measurement name are
          // consistent
          // if not, throw a SemanticException
          checkDataTypeConsistencyInAlignByDevice(
              analysis, new ArrayList<>(deviceToTransformExpressionOfOneMeasurement.values()));

          // add outputExpressions
          Expression measurementExpressionWithoutAlias =
              ExpressionAnalyzer.removeAliasFromExpression(measurementExpression);
          // if stripping nested aliases changed the expression, its original string becomes
          // the alias — unless the result column carries an explicit alias, which wins
          String alias =
              !Objects.equals(measurementExpressionWithoutAlias, measurementExpression)
                  ? measurementExpression.getExpressionString()
                  : null;
          alias = hasAlias ? resultColumn.getAlias() : alias;
          analyzeExpression(analysis, measurementExpressionWithoutAlias);
          outputExpressions.add(new Pair<>(measurementExpressionWithoutAlias, alias));

          // add deviceToTransformExpressions
          for (String deviceName : deviceToTransformExpressionOfOneMeasurement.keySet()) {
            Expression transformExpression =
                deviceToTransformExpressionOfOneMeasurement.get(deviceName);
            Expression transformExpressionWithoutAlias =
                ExpressionAnalyzer.removeAliasFromExpression(transformExpression);
            analyzeExpression(analysis, transformExpressionWithoutAlias);
            deviceToTransformExpressions
                .computeIfAbsent(deviceName, key -> new LinkedHashSet<>())
                .add(transformExpressionWithoutAlias);
            if (queryStatement.isAggregationQuery()) {
              // measurement columns of an aggregation query are the aggregation calls
              deviceToMeasurementsMap
                  .computeIfAbsent(deviceName, key -> new LinkedHashSet<>())
                  .addAll(
                      ExpressionAnalyzer.searchAggregationExpressions(
                              measurementExpressionWithoutAlias)
                          .stream()
                          .map(Expression::getExpressionString)
                          .collect(Collectors.toList()));
            } else {
              // raw query: the source (leaf) expressions are both the Transform node inputs
              // and the measurement columns of this device
              List<Expression> sourceExpressions =
                  ExpressionAnalyzer.searchSourceExpressions(
                      measurementExpressionWithoutAlias, true);
              transformInput.addAll(sourceExpressions);
              deviceToMeasurementsMap
                  .computeIfAbsent(deviceName, key -> new LinkedHashSet<>())
                  .addAll(
                      sourceExpressions.stream()
                          .map(Expression::getExpressionString)
                          .collect(Collectors.toList()));
            }
          }
          paginationController.consumeLimit();
        } else {
          // SLIMIT exhausted: stop expanding this result column
          break;
        }
      }
    }
    return outputExpressions;
  }
  /**
   * Analyzes the HAVING predicate of an ALIGN BY DEVICE query.
   *
   * <p>The predicate is concatenated with each device and expanded; the aggregations it contains
   * are registered (per device) into the same side maps used by the SELECT analysis, so the
   * device view also outputs the columns the HAVING filter needs.
   *
   * @param deviceToTransformExpressions out-param: device name -> expressions evaluated for it
   * @param deviceToMeasurementsMap out-param: device name -> measurement column strings
   * @param transformInput out-param: inputs of the Transform node placed after the DeviceView
   * @return the filter expression constructed from the device-free HAVING measurements
   * @throws SemanticException if the same measurement name has inconsistent data types across
   *     devices
   */
  private Expression analyzeHaving(
      Analysis analysis,
      ISchemaTree schemaTree,
      Set<PartialPath> deviceList,
      Map<String, Set<Expression>> deviceToTransformExpressions,
      Map<String, Set<String>> deviceToMeasurementsMap,
      Set<Expression> transformInput) {
    QueryStatement queryStatement = (QueryStatement) analysis.getStatement();
    Expression havingExpression =
        ExpressionAnalyzer.removeAliasFromExpression(
            queryStatement.getHavingCondition().getPredicate());
    Set<Expression> measurementsInHaving = new HashSet<>();

    // measurement expression after removing wildcard
    // use LinkedHashMap for order-preserving
    Map<Expression, Map<String, Expression>> measurementToDeviceTransformExpressions =
        new LinkedHashMap<>();
    for (PartialPath device : deviceList) {
      List<Expression> transformExpressionsInHaving =
          ExpressionAnalyzer.concatDeviceAndRemoveWildcard(havingExpression, device, schemaTree);

      measurementsInHaving.addAll(
          transformExpressionsInHaving.stream()
              .map(
                  transformExpression ->
                      ExpressionAnalyzer.getMeasurementExpression(transformExpression))
              .collect(Collectors.toList()));

      // group the aggregations of the HAVING clause by their device-free measurement form
      Set<Expression> aggregationExpressionsInHaving =
          analyzeAggregation(transformExpressionsInHaving);

      for (Expression aggregationExpressionInHaving : aggregationExpressionsInHaving) {
        Expression measurementExpression =
            ExpressionAnalyzer.getMeasurementExpression(aggregationExpressionInHaving);
        transformInput.add(measurementExpression);
        analyzeExpression(analysis, measurementExpression);
        measurementToDeviceTransformExpressions
            .computeIfAbsent(measurementExpression, key -> new LinkedHashMap<>())
            .put(device.getFullPath(), aggregationExpressionInHaving);
      }
    }

    for (Expression measurementExpression : measurementToDeviceTransformExpressions.keySet()) {
      Map<String, Expression> deviceToTransformExpressionOfOneMeasurement =
          measurementToDeviceTransformExpressions.get(measurementExpression);
      deviceToTransformExpressionOfOneMeasurement
          .values()
          .forEach(expression -> analyzeExpression(analysis, expression));
      // check whether the datatype of paths which has the same measurement name are
      // consistent
      // if not, throw a SemanticException
      checkDataTypeConsistencyInAlignByDevice(
          analysis, new ArrayList<>(deviceToTransformExpressionOfOneMeasurement.values()));
      analyzeExpression(analysis, measurementExpression);

      // add deviceToTransformExpressions
      for (String deviceName : deviceToTransformExpressionOfOneMeasurement.keySet()) {
        Expression transformExpression =
            deviceToTransformExpressionOfOneMeasurement.get(deviceName);
        deviceToTransformExpressions
            .computeIfAbsent(deviceName, key -> new LinkedHashSet<>())
            .add(transformExpression);
        deviceToMeasurementsMap
            .computeIfAbsent(deviceName, key -> new LinkedHashSet<>())
            .add(measurementExpression.getExpressionString());
      }
    }
    return ExpressionUtils.constructQueryFilter(
        measurementsInHaving.stream().distinct().collect(Collectors.toList()));
  }
  /**
   * Derives the global time filter for the query.
   *
   * <p>The time filter is extracted from the WHERE predicate (if any) and combined (AND) with the
   * filter implied by GROUP BY TIME (if any). The WHERE condition itself may be simplified as a
   * side effect: a predicate that evaluates to the constant {@code true} removes the WHERE
   * condition entirely.
   *
   * @param queryStatement the statement to analyze; its WHERE condition may be mutated
   * @return pair of (global time filter, whether a value filter remains); the filter may be null
   *     when there is neither a WHERE time constraint nor GROUP BY TIME
   */
  private Pair<Filter, Boolean> analyzeGlobalTimeFilter(QueryStatement queryStatement) {
    Filter globalTimeFilter = null;
    boolean hasValueFilter = false;
    if (queryStatement.getWhereCondition() != null) {
      Expression predicate = queryStatement.getWhereCondition().getPredicate();
      WhereCondition whereCondition = queryStatement.getWhereCondition();
      // NOTE: the time filter must be extracted from the ORIGINAL predicate, before
      // constant-folding below rewrites it.
      Pair<Filter, Boolean> resultPair =
          ExpressionAnalyzer.transformToGlobalTimeFilter(predicate, true, true);
      predicate = ExpressionAnalyzer.evaluatePredicate(predicate);
      // set where condition to null if predicate is true
      if (predicate.getExpressionType().equals(ExpressionType.CONSTANT)
          && Boolean.parseBoolean(predicate.getExpressionString())) {
        queryStatement.setWhereCondition(null);
      } else {
        whereCondition.setPredicate(predicate);
      }
      globalTimeFilter = resultPair.left;
      hasValueFilter = resultPair.right;
    }
    if (queryStatement.isGroupByTime()) {
      // GROUP BY TIME restricts the time range as well; AND it into the global filter.
      GroupByTimeComponent groupByTimeComponent = queryStatement.getGroupByTimeComponent();
      Filter groupByFilter = initGroupByFilter(groupByTimeComponent);
      if (globalTimeFilter == null) {
        globalTimeFilter = groupByFilter;
      } else {
        globalTimeFilter = FilterFactory.and(globalTimeFilter, groupByFilter);
      }
    }
    return new Pair<>(globalTimeFilter, hasValueFilter);
  }
private void updateSource(
Expression selectExpr, Set<Expression> sourceExpressions, boolean isRawDataSource) {
sourceExpressions.addAll(
ExpressionAnalyzer.searchSourceExpressions(selectExpr, isRawDataSource));
}
private boolean analyzeAggregation(
Set<Expression> transformExpressions,
Set<Expression> aggregationExpressions,
Set<Expression> aggregationTransformExpressions) {
// true if nested expressions and UDFs exist in aggregation function
// i.e. select sum(s1 + 1) from root.sg.d1 align by device
boolean isHasRawDataInputAggregation = false;
for (Expression expression : transformExpressions) {
for (Expression aggregationExpression :
ExpressionAnalyzer.searchAggregationExpressions(expression)) {
aggregationExpressions.add(aggregationExpression);
aggregationTransformExpressions.addAll(aggregationExpression.getExpressions());
}
}
for (Expression aggregationTransformExpression : aggregationTransformExpressions) {
if (ExpressionAnalyzer.checkIsNeedTransform(aggregationTransformExpression)) {
isHasRawDataInputAggregation = true;
break;
}
}
return isHasRawDataInputAggregation;
}
private boolean analyzeAggregationInHaving(
Set<Expression> expressions,
List<Expression> aggregationExpressionsInHaving,
Set<Expression> aggregationExpressions,
Set<Expression> aggregationTransformExpressions) {
// true if nested expressions and UDFs exist in aggregation function
// i.e. select sum(s1 + 1) from root.sg.d1 align by device
boolean isHasRawDataInputAggregation = false;
for (Expression expression : expressions) {
for (Expression aggregationExpression :
ExpressionAnalyzer.searchAggregationExpressions(expression)) {
aggregationExpressionsInHaving.add(aggregationExpression);
aggregationExpressions.add(aggregationExpression);
aggregationTransformExpressions.addAll(aggregationExpression.getExpressions());
}
}
for (Expression aggregationTransformExpression : aggregationTransformExpressions) {
if (ExpressionAnalyzer.checkIsNeedTransform(aggregationTransformExpression)) {
isHasRawDataInputAggregation = true;
break;
}
}
return isHasRawDataInputAggregation;
}
private Set<Expression> analyzeAggregation(List<Expression> inputExpressions) {
Set<Expression> aggregationExpressions = new HashSet<>();
for (Expression inputExpression : inputExpressions) {
for (Expression aggregationExpression :
ExpressionAnalyzer.searchAggregationExpressions(inputExpression)) {
aggregationExpressions.add(aggregationExpression);
}
}
return aggregationExpressions;
}
private Expression analyzeWhere(QueryStatement queryStatement, ISchemaTree schemaTree) {
List<Expression> rewrittenPredicates =
ExpressionAnalyzer.removeWildcardInFilter(
queryStatement.getWhereCondition().getPredicate(),
queryStatement.getFromComponent().getPrefixPaths(),
schemaTree,
true);
return ExpressionUtils.constructQueryFilter(
rewrittenPredicates.stream().distinct().collect(Collectors.toList()));
}
private Expression analyzeWhereSplitByDevice(
QueryStatement queryStatement, PartialPath devicePath, ISchemaTree schemaTree) {
List<Expression> rewrittenPredicates =
ExpressionAnalyzer.removeWildcardInFilterByDevice(
queryStatement.getWhereCondition().getPredicate(), devicePath, schemaTree, true);
return ExpressionUtils.constructQueryFilter(
rewrittenPredicates.stream().distinct().collect(Collectors.toList()));
}
  /**
   * Builds the HAVING filter expression.
   *
   * <p>For GROUP BY LEVEL queries, raw paths inside the HAVING predicates are first replaced by
   * their grouped (level-mapped) counterparts; otherwise the transform expressions are used as-is.
   *
   * @param groupByLevelExpressions out-parameter, filled for GROUP BY LEVEL queries
   * @return the combined HAVING query filter
   */
  private Expression analyzeHaving(
      Analysis analysis,
      Map<Expression, Set<Expression>> groupByLevelExpressions,
      Set<Expression> transformExpressionsInHaving,
      List<Expression> aggregationExpressionsInHaving) {
    QueryStatement queryStatement = (QueryStatement) analysis.getStatement();
    if (queryStatement.isGroupByLevel()) {
      Map<Expression, Expression> rawPathToGroupedPathMapInHaving =
          analyzeGroupByLevelInHaving(
              analysis, aggregationExpressionsInHaving, groupByLevelExpressions);
      List<Expression> convertedPredicates = new ArrayList<>();
      for (Expression expression : transformExpressionsInHaving) {
        Expression convertedPredicate =
            ExpressionAnalyzer.replaceRawPathWithGroupedPath(
                expression, rawPathToGroupedPathMapInHaving);
        convertedPredicates.add(convertedPredicate);
        // NOTE(review): the type analysis is run on the RAW expression, not the converted one —
        // presumably the converted predicate shares types with the raw path; confirm.
        analyzeExpression(analysis, expression);
      }
      return ExpressionUtils.constructQueryFilter(
          convertedPredicates.stream().distinct().collect(Collectors.toList()));
    }
    return ExpressionUtils.constructQueryFilter(
        transformExpressionsInHaving.stream().distinct().collect(Collectors.toList()));
  }
private Map<Expression, Expression> analyzeGroupByLevelInHaving(
Analysis analysis,
List<Expression> inputExpressions,
Map<Expression, Set<Expression>> groupByLevelExpressions) {
QueryStatement queryStatement = (QueryStatement) analysis.getStatement();
GroupByLevelController groupByLevelController =
new GroupByLevelController(queryStatement.getGroupByLevelComponent().getLevels());
for (Expression inputExpression : inputExpressions) {
groupByLevelController.control(false, inputExpression, null);
}
Map<Expression, Set<Expression>> groupedPathMap = groupByLevelController.getGroupedPathMap();
groupedPathMap.keySet().forEach(expression -> analyzeExpression(analysis, expression));
groupByLevelExpressions.putAll(groupedPathMap);
return groupByLevelController.getRawPathToGroupedPathMap();
}
  /**
   * Analyzes GROUP BY LEVEL: groups the output expressions by level, applies SLIMIT/SOFFSET over
   * the grouped columns, and strips aliases.
   *
   * <p>Side effects: {@code outputExpressions} is cleared and refilled with the paginated,
   * alias-stripped grouped expressions; {@code transformExpressions} is cleared and refilled with
   * the raw expressions that survive pagination; {@code rawPathToGroupedPathMap} receives the
   * raw-to-grouped path mapping.
   *
   * @return grouped expression -> set of raw expressions (alias-free), insertion-ordered
   */
  private Map<Expression, Set<Expression>> analyzeGroupByLevel(
      Analysis analysis,
      List<Pair<Expression, String>> outputExpressions,
      Set<Expression> transformExpressions,
      Map<Expression, Expression> rawPathToGroupedPathMap) {
    QueryStatement queryStatement = (QueryStatement) analysis.getStatement();
    GroupByLevelController groupByLevelController =
        new GroupByLevelController(queryStatement.getGroupByLevelComponent().getLevels());
    // Feed every output column into the controller; COUNT(*) columns are flagged per index.
    for (int i = 0; i < outputExpressions.size(); i++) {
      Pair<Expression, String> expressionAliasPair = outputExpressions.get(i);
      boolean isCountStar = queryStatement.getGroupByLevelComponent().isCountStar(i);
      groupByLevelController.control(
          isCountStar, expressionAliasPair.left, expressionAliasPair.right);
    }
    Map<Expression, Set<Expression>> rawGroupByLevelExpressions =
        groupByLevelController.getGroupedPathMap();
    rawPathToGroupedPathMap.putAll(groupByLevelController.getRawPathToGroupedPathMap());
    Map<Expression, Set<Expression>> groupByLevelExpressions = new LinkedHashMap<>();
    // The grouped columns replace the original output columns below.
    outputExpressions.clear();
    ColumnPaginationController paginationController =
        new ColumnPaginationController(
            queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset(), false);
    for (Expression groupedExpression : rawGroupByLevelExpressions.keySet()) {
      // SOFFSET: skip the first N grouped columns.
      if (paginationController.hasCurOffset()) {
        paginationController.consumeOffset();
        continue;
      }
      // SLIMIT: keep at most M grouped columns, then stop.
      if (paginationController.hasCurLimit()) {
        Pair<Expression, String> outputExpression =
            removeAliasFromExpression(
                groupedExpression,
                groupByLevelController.getAlias(groupedExpression.getExpressionString()));
        Expression groupedExpressionWithoutAlias = outputExpression.left;
        Set<Expression> rawExpressions = rawGroupByLevelExpressions.get(groupedExpression);
        rawExpressions.forEach(expression -> analyzeExpression(analysis, expression));
        Set<Expression> rawExpressionsWithoutAlias =
            rawExpressions.stream()
                .map(ExpressionAnalyzer::removeAliasFromExpression)
                .collect(Collectors.toSet());
        rawExpressionsWithoutAlias.forEach(expression -> analyzeExpression(analysis, expression));
        groupByLevelExpressions.put(groupedExpressionWithoutAlias, rawExpressionsWithoutAlias);
        analyzeExpression(analysis, groupedExpressionWithoutAlias);
        outputExpressions.add(outputExpression);
        paginationController.consumeLimit();
      } else {
        break;
      }
    }
    // reset transformExpressions after applying SLIMIT/SOFFSET
    transformExpressions.clear();
    transformExpressions.addAll(
        groupByLevelExpressions.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
    return groupByLevelExpressions;
  }
private Pair<Expression, String> removeAliasFromExpression(
Expression rawExpression, String rawAlias) {
Expression expressionWithoutAlias = ExpressionAnalyzer.removeAliasFromExpression(rawExpression);
String alias =
!Objects.equals(expressionWithoutAlias, rawExpression)
? rawExpression.getExpressionString()
: null;
alias = rawAlias == null ? alias : rawAlias;
return new Pair<>(expressionWithoutAlias, alias);
}
private DatasetHeader analyzeOutput(
Analysis analysis, List<Pair<Expression, String>> outputExpressions) {
QueryStatement queryStatement = (QueryStatement) analysis.getStatement();
boolean isIgnoreTimestamp =
queryStatement.isAggregationQuery() && !queryStatement.isGroupByTime();
List<ColumnHeader> columnHeaders = new ArrayList<>();
if (queryStatement.isAlignByDevice()) {
columnHeaders.add(new ColumnHeader(COLUMN_DEVICE, TSDataType.TEXT, null));
}
columnHeaders.addAll(
outputExpressions.stream()
.map(
expressionAliasPair -> {
String columnName = expressionAliasPair.left.getExpressionString();
String alias = expressionAliasPair.right;
return new ColumnHeader(
columnName, analysis.getType(expressionAliasPair.left), alias);
})
.collect(Collectors.toList()));
return new DatasetHeader(columnHeaders, isIgnoreTimestamp);
}
private Analysis analyzeLast(
Analysis analysis, List<MeasurementPath> allSelectedPath, ISchemaTree schemaTree) {
Set<Expression> sourceExpressions;
List<SortItem> sortItemList = analysis.getMergeOrderParameter().getSortItemList();
if (sortItemList.size() > 0) {
checkState(
sortItemList.size() == 1 && sortItemList.get(0).getSortKey() == SortKey.TIMESERIES,
"Last queries only support sorting by timeseries now.");
boolean isAscending = sortItemList.get(0).getOrdering() == Ordering.ASC;
sourceExpressions =
allSelectedPath.stream()
.map(TimeSeriesOperand::new)
.sorted(
(o1, o2) ->
isAscending
? o1.getExpressionString().compareTo(o2.getExpressionString())
: o2.getExpressionString().compareTo(o1.getExpressionString()))
.collect(Collectors.toCollection(LinkedHashSet::new));
} else {
sourceExpressions =
allSelectedPath.stream()
.map(TimeSeriesOperand::new)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
sourceExpressions.forEach(expression -> analyzeExpression(analysis, expression));
analysis.setSourceExpressions(sourceExpressions);
analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader());
Set<String> deviceSet =
allSelectedPath.stream().map(MeasurementPath::getDevice).collect(Collectors.toSet());
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
for (String devicePath : deviceSet) {
DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
queryParam.setDevicePath(devicePath);
sgNameToQueryParamsMap
.computeIfAbsent(schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
.add(queryParam);
}
DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
analysis.setDataPartitionInfo(dataPartition);
return analysis;
}
private DataPartition fetchDataPartitionByDevices(Set<String> deviceSet, ISchemaTree schemaTree) {
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
for (String devicePath : deviceSet) {
DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
queryParam.setDevicePath(devicePath);
sgNameToQueryParamsMap
.computeIfAbsent(schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
.add(queryParam);
}
return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
}
private OrderByParameter analyzeOrderBy(QueryStatement queryStatement) {
return new OrderByParameter(queryStatement.getSortItemList());
}
  /** Runs type analysis on the expression, recording its resolved type in the analysis. */
  private void analyzeExpression(Analysis analysis, Expression expression) {
    ExpressionTypeAnalyzer.analyzeExpression(analysis, expression);
  }
/**
* Check datatype consistency in ALIGN BY DEVICE.
*
* <p>an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device, return
* false while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
*/
private void checkDataTypeConsistencyInAlignByDevice(
Analysis analysis, List<Expression> expressions) {
TSDataType checkedDataType = analysis.getType(expressions.get(0));
for (Expression expression : expressions) {
if (analysis.getType(expression) != checkedDataType) {
throw new SemanticException(
"ALIGN BY DEVICE: the data types of the same measurement column should be the same across devices.");
}
}
}
@Override
public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
insertStatement.semanticCheck();
long[] timeArray = insertStatement.getTimes();
PartialPath devicePath = insertStatement.getDevice();
String[] measurementList = insertStatement.getMeasurementList();
if (timeArray.length == 1) {
// construct insert row statement
InsertRowStatement insertRowStatement = new InsertRowStatement();
insertRowStatement.setDevicePath(devicePath);
insertRowStatement.setTime(timeArray[0]);
insertRowStatement.setMeasurements(measurementList);
insertRowStatement.setDataTypes(new TSDataType[measurementList.length]);
Object[] values = new Object[measurementList.length];
System.arraycopy(insertStatement.getValuesList().get(0), 0, values, 0, values.length);
insertRowStatement.setValues(values);
insertRowStatement.setNeedInferType(true);
insertRowStatement.setAligned(insertStatement.isAligned());
return insertRowStatement.accept(this, context);
} else {
// construct insert rows statement
// construct insert statement
InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
for (int i = 0; i < timeArray.length; i++) {
InsertRowStatement statement = new InsertRowStatement();
statement.setDevicePath(devicePath);
String[] measurements = new String[measurementList.length];
System.arraycopy(measurementList, 0, measurements, 0, measurements.length);
statement.setMeasurements(measurements);
statement.setTime(timeArray[i]);
statement.setDataTypes(new TSDataType[measurementList.length]);
Object[] values = new Object[measurementList.length];
System.arraycopy(insertStatement.getValuesList().get(i), 0, values, 0, values.length);
statement.setValues(values);
statement.setAligned(insertStatement.isAligned());
statement.setNeedInferType(true);
insertRowStatementList.add(statement);
}
insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
return insertRowsStatement.accept(this, context);
}
}
@Override
public Analysis visitCreateTimeseries(
CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
if (createTimeSeriesStatement.getTags() != null
&& !createTimeSeriesStatement.getTags().isEmpty()
&& createTimeSeriesStatement.getAttributes() != null
&& !createTimeSeriesStatement.getAttributes().isEmpty()) {
for (String tagKey : createTimeSeriesStatement.getTags().keySet()) {
if (createTimeSeriesStatement.getAttributes().containsKey(tagKey)) {
throw new SemanticException(
String.format("Tag and attribute shouldn't have the same property key [%s]", tagKey));
}
}
}
Analysis analysis = new Analysis();
analysis.setStatement(createTimeSeriesStatement);
checkIsTemplateCompatible(
createTimeSeriesStatement.getPath(), createTimeSeriesStatement.getAlias());
PathPatternTree patternTree = new PathPatternTree();
patternTree.appendFullPath(createTimeSeriesStatement.getPath());
SchemaPartition schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition(patternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
return analysis;
}
private void checkIsTemplateCompatible(PartialPath timeseriesPath, String alias) {
Pair<Template, PartialPath> templateInfo = schemaFetcher.checkTemplateSetInfo(timeseriesPath);
if (templateInfo != null) {
if (templateInfo.left.hasSchema(timeseriesPath.getMeasurement())) {
throw new RuntimeException(
new TemplateImcompatibeException(
timeseriesPath.getFullPath(),
templateInfo.left.getName(),
timeseriesPath.getMeasurement()));
}
if (alias != null && templateInfo.left.hasSchema(alias)) {
throw new RuntimeException(
new TemplateImcompatibeException(
timeseriesPath.getDevicePath().concatNode(alias).getFullPath(),
templateInfo.left.getName(),
alias));
}
}
}
private void checkIsTemplateCompatible(
PartialPath devicePath, List<String> measurements, List<String> aliasList) {
Pair<Template, PartialPath> templateInfo = schemaFetcher.checkTemplateSetInfo(devicePath);
if (templateInfo != null) {
Template template = templateInfo.left;
for (String measurement : measurements) {
if (template.hasSchema(measurement)) {
throw new RuntimeException(
new TemplateImcompatibeException(
devicePath.concatNode(measurement).getFullPath(),
templateInfo.left.getName(),
measurement));
}
}
if (aliasList == null) {
return;
}
for (String alias : aliasList) {
if (template.hasSchema(alias)) {
throw new RuntimeException(
new TemplateImcompatibeException(
devicePath.concatNode(alias).getFullPath(), templateInfo.left.getName(), alias));
}
}
}
}
@Override
public Analysis visitCreateAlignedTimeseries(
CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
List<String> measurements = createAlignedTimeSeriesStatement.getMeasurements();
Set<String> measurementsSet = new HashSet<>(measurements);
if (measurementsSet.size() < measurements.size()) {
throw new SemanticException(
"Measurement under an aligned device is not allowed to have the same measurement name");
}
Analysis analysis = new Analysis();
analysis.setStatement(createAlignedTimeSeriesStatement);
checkIsTemplateCompatible(
createAlignedTimeSeriesStatement.getDevicePath(),
createAlignedTimeSeriesStatement.getMeasurements(),
createAlignedTimeSeriesStatement.getAliasList());
PathPatternTree pathPatternTree = new PathPatternTree();
for (String measurement : createAlignedTimeSeriesStatement.getMeasurements()) {
pathPatternTree.appendFullPath(createAlignedTimeSeriesStatement.getDevicePath(), measurement);
}
SchemaPartition schemaPartitionInfo;
schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition(pathPatternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
return analysis;
}
@Override
public Analysis visitInternalCreateTimeseries(
InternalCreateTimeSeriesStatement internalCreateTimeSeriesStatement,
MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(internalCreateTimeSeriesStatement);
checkIsTemplateCompatible(
internalCreateTimeSeriesStatement.getDevicePath(),
internalCreateTimeSeriesStatement.getMeasurements(),
null);
PathPatternTree pathPatternTree = new PathPatternTree();
for (String measurement : internalCreateTimeSeriesStatement.getMeasurements()) {
pathPatternTree.appendFullPath(
internalCreateTimeSeriesStatement.getDevicePath(), measurement);
}
SchemaPartition schemaPartitionInfo;
schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition(pathPatternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
return analysis;
}
@Override
public Analysis visitCreateMultiTimeseries(
CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(createMultiTimeSeriesStatement);
List<PartialPath> timeseriesPathList = createMultiTimeSeriesStatement.getPaths();
List<String> aliasList = createMultiTimeSeriesStatement.getAliasList();
for (int i = 0; i < timeseriesPathList.size(); i++) {
checkIsTemplateCompatible(
timeseriesPathList.get(i), aliasList == null ? null : aliasList.get(i));
}
PathPatternTree patternTree = new PathPatternTree();
for (PartialPath path : createMultiTimeSeriesStatement.getPaths()) {
patternTree.appendFullPath(path);
}
SchemaPartition schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition(patternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
return analysis;
}
@Override
public Analysis visitAlterTimeseries(
AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(alterTimeSeriesStatement);
if (alterTimeSeriesStatement.getAlias() != null) {
checkIsTemplateCompatible(
alterTimeSeriesStatement.getPath(), alterTimeSeriesStatement.getAlias());
}
PathPatternTree patternTree = new PathPatternTree();
patternTree.appendFullPath(alterTimeSeriesStatement.getPath());
SchemaPartition schemaPartitionInfo;
schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
return analysis;
}
@Override
public Analysis visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(insertTabletStatement.getTimePartitionSlots());
DataPartition dataPartition =
partitionFetcher.getOrCreateDataPartition(
Collections.singletonList(dataPartitionQueryParam));
Analysis analysis = new Analysis();
analysis.setStatement(insertTabletStatement);
analysis.setDataPartitionInfo(dataPartition);
return analysis;
}
@Override
public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
DataPartition dataPartition =
partitionFetcher.getOrCreateDataPartition(
Collections.singletonList(dataPartitionQueryParam));
Analysis analysis = new Analysis();
analysis.setStatement(insertRowStatement);
analysis.setDataPartitionInfo(dataPartition);
return analysis;
}
@Override
public Analysis visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
dataPartitionQueryParams.add(dataPartitionQueryParam);
}
DataPartition dataPartition =
partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams);
Analysis analysis = new Analysis();
analysis.setStatement(insertRowsStatement);
analysis.setDataPartitionInfo(dataPartition);
return analysis;
}
@Override
public Analysis visitInsertMultiTablets(
InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
for (InsertTabletStatement insertTabletStatement :
insertMultiTabletsStatement.getInsertTabletStatementList()) {
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(
insertTabletStatement.getTimePartitionSlots());
dataPartitionQueryParams.add(dataPartitionQueryParam);
}
DataPartition dataPartition =
partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams);
Analysis analysis = new Analysis();
analysis.setStatement(insertMultiTabletsStatement);
analysis.setDataPartitionInfo(dataPartition);
return analysis;
}
@Override
public Analysis visitInsertRowsOfOneDevice(
InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(
insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(
insertRowsOfOneDeviceStatement.getTimePartitionSlots());
DataPartition dataPartition =
partitionFetcher.getOrCreateDataPartition(
Collections.singletonList(dataPartitionQueryParam));
Analysis analysis = new Analysis();
analysis.setStatement(insertRowsOfOneDeviceStatement);
analysis.setDataPartitionInfo(dataPartition);
return analysis;
}
  /**
   * Analyzes LOAD TSFILE in three phases: (1) parse every TsFile's metadata into per-device time
   * ranges, schemas and aligned flags; (2) optionally auto-create storage groups / schemas and
   * verify them; (3) compute the time partition slots spanned by each device and fetch (or
   * create) the matching data partition.
   */
  @Override
  public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
    context.setQueryType(QueryType.WRITE);
    Map<String, Long> device2MinTime = new HashMap<>();
    Map<String, Long> device2MaxTime = new HashMap<>();
    Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>();
    Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>();
    // analyze tsfile metadata
    for (File tsFile : loadTsFileStatement.getTsFiles()) {
      try {
        TsFileResource resource =
            analyzeTsFile(
                loadTsFileStatement,
                tsFile,
                device2MinTime,
                device2MaxTime,
                device2Schemas,
                device2IsAligned);
        loadTsFileStatement.addTsFileResource(resource);
      } catch (Exception e) {
        // Any parse failure aborts the whole LOAD with a SemanticException; details are logged.
        logger.error(String.format("Parse file %s to resource error.", tsFile.getPath()), e);
        throw new SemanticException(
            String.format("Parse file %s to resource error", tsFile.getPath()));
      }
    }
    // auto create and verify schema
    if (loadTsFileStatement.isAutoCreateSchema()) {
      try {
        if (loadTsFileStatement.isVerifySchema()) {
          verifyLoadingMeasurements(device2Schemas);
        }
        autoCreateSg(loadTsFileStatement.getSgLevel(), device2Schemas);
        ISchemaTree schemaTree = autoCreateSchema(device2Schemas, device2IsAligned);
        if (loadTsFileStatement.isVerifySchema()) {
          verifySchema(schemaTree, device2Schemas, device2IsAligned);
        }
      } catch (Exception e) {
        logger.error("Auto create or verify schema error.", e);
        throw new SemanticException(
            String.format(
                "Auto create or verify schema error when executing statement %s.",
                loadTsFileStatement));
      }
    }
    // construct partition info
    List<DataPartitionQueryParam> params = new ArrayList<>();
    for (Map.Entry<String, Long> entry : device2MinTime.entrySet()) {
      List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
      String device = entry.getKey();
      long endTime = device2MaxTime.get(device);
      long interval =
          IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting();
      // Align the device's min time down to its partition boundary, then enumerate every
      // partition slot up to and including the one containing endTime.
      long time = (entry.getValue() / interval) * interval;
      for (; time <= endTime; time += interval) {
        timePartitionSlots.add(TimePartitionUtils.getTimePartitionForRouting(time));
      }
      DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
      dataPartitionQueryParam.setDevicePath(device);
      dataPartitionQueryParam.setTimePartitionSlotList(timePartitionSlots);
      params.add(dataPartitionQueryParam);
    }
    DataPartition dataPartition = partitionFetcher.getOrCreateDataPartition(params);
    Analysis analysis = new Analysis();
    analysis.setStatement(loadTsFileStatement);
    analysis.setDataPartitionInfo(dataPartition);
    return analysis;
  }
  /**
   * Reads one TsFile's metadata and accumulates per-device information into the given maps.
   *
   * <p>Fills {@code device2Schemas} with the measurement schemas found (non-aligned series only),
   * {@code device2IsAligned} with each device's aligned flag, and {@code device2MinTime} /
   * {@code device2MaxTime} with the device time ranges, merging with values from previously
   * analyzed files.
   *
   * @return a closed {@link TsFileResource} describing the file
   * @throws VerifyMetadataException if a device's aligned flag disagrees with another TsFile
   * @throws IOException on any read error
   */
  private TsFileResource analyzeTsFile(
      LoadTsFileStatement statement,
      File tsFile,
      Map<String, Long> device2MinTime,
      Map<String, Long> device2MaxTime,
      Map<String, Map<MeasurementSchema, File>> device2Schemas,
      Map<String, Pair<Boolean, File>> device2IsAligned)
      throws IOException, VerifyMetadataException {
    try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
      Map<String, List<TimeseriesMetadata>> device2Metadata = reader.getAllTimeseriesMetadata(true);
      if (statement.isAutoCreateSchema() || statement.isVerifySchema()) {
        // construct schema
        for (Map.Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
          String device = entry.getKey();
          List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
          boolean isAligned = false;
          for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
            TSDataType dataType = timeseriesMetadata.getTSDataType();
            if (!dataType.equals(TSDataType.VECTOR)) {
              // Non-aligned series: reconstruct the measurement schema from the first chunk's
              // header (encoding and compression are only recorded there).
              ChunkHeader chunkHeader =
                  getChunkHeaderByTimeseriesMetadata(reader, timeseriesMetadata);
              MeasurementSchema measurementSchema =
                  new MeasurementSchema(
                      timeseriesMetadata.getMeasurementId(),
                      dataType,
                      chunkHeader.getEncodingType(),
                      chunkHeader.getCompressionType());
              device2Schemas
                  .computeIfAbsent(device, o -> new HashMap<>())
                  .put(measurementSchema, tsFile);
            } else {
              // A VECTOR series marks the whole device as aligned.
              isAligned = true;
            }
          }
          boolean finalIsAligned = isAligned;
          // First file to mention the device records its aligned flag; later files must agree.
          if (!device2IsAligned
              .computeIfAbsent(device, o -> new Pair<>(finalIsAligned, tsFile))
              .left
              .equals(isAligned)) {
            throw new VerifyMetadataException(
                String.format(
                    "Device %s has different aligned definition in tsFile %s and other TsFile.",
                    device, tsFile.getParentFile()));
          }
        }
      }
      // construct TsFileResource
      TsFileResource resource = new TsFileResource(tsFile);
      FileLoaderUtils.updateTsFileResource(device2Metadata, resource);
      resource.updatePlanIndexes(reader.getMinPlanIndex());
      resource.updatePlanIndexes(reader.getMaxPlanIndex());
      // construct device time range, merging with ranges seen in previously analyzed files
      for (String device : resource.getDevices()) {
        device2MinTime.put(
            device,
            Math.min(
                device2MinTime.getOrDefault(device, Long.MAX_VALUE),
                resource.getStartTime(device)));
        device2MaxTime.put(
            device,
            Math.max(
                device2MaxTime.getOrDefault(device, Long.MIN_VALUE), resource.getEndTime(device)));
      }
      resource.setStatus(TsFileResourceStatus.CLOSED);
      return resource;
    }
  }
private ChunkHeader getChunkHeaderByTimeseriesMetadata(
TsFileSequenceReader reader, TimeseriesMetadata timeseriesMetadata) throws IOException {
IChunkMetadata chunkMetadata = timeseriesMetadata.getChunkMetadataList().get(0);
reader.position(chunkMetadata.getOffsetOfChunkHeader());
return reader.readChunkHeader(reader.readMarker());
}
/**
 * Creates the storage group for every device being loaded, derived from the first
 * {@code sgLevel} path nodes of each device id. Duplicate storage group paths are
 * de-duplicated via a set before any create statement is issued.
 *
 * @param sgLevel storage group level, counted below root (e.g. "root.sg" is level 1)
 * @param device2Schemas device id -> (measurement schema -> source TsFile)
 * @throws VerifyMetadataException if a device path is shorter than the storage group level
 * @throws LoadFileException if executing a set-storage-group statement fails
 * @throws IllegalPathException if a device id is not a legal path
 */
private void autoCreateSg(int sgLevel, Map<String, Map<MeasurementSchema, File>> device2Schemas)
    throws VerifyMetadataException, LoadFileException, IllegalPathException {
  sgLevel += 1; // e.g. "root.sg" means sgLevel = 1, "root.sg.test" means sgLevel=2
  Set<PartialPath> sgSet = new HashSet<>();
  for (String device : device2Schemas.keySet()) {
    PartialPath devicePath = new PartialPath(device);
    String[] nodes = devicePath.getNodes();
    if (nodes.length < sgLevel) {
      throw new VerifyMetadataException(
          String.format("Sg level %d is longer than device %s.", sgLevel, device));
    }
    // The storage group path is the leading sgLevel nodes of the device path.
    sgSet.add(new PartialPath(Arrays.copyOf(nodes, sgLevel)));
  }
  for (PartialPath sgPath : sgSet) {
    SetStorageGroupStatement statement = new SetStorageGroupStatement();
    statement.setStorageGroupPath(sgPath);
    executeSetStorageGroupStatement(statement);
  }
}
/**
 * Executes a set-storage-group statement through the coordinator.
 *
 * <p>A {@code STORAGE_GROUP_ALREADY_EXISTS} status is tolerated so that loading multiple
 * TsFiles into the same storage group stays idempotent.
 *
 * @param statement the set-storage-group statement to run
 * @throws LoadFileException if the statement fails with any other status code
 */
private void executeSetStorageGroupStatement(Statement statement) throws LoadFileException {
  long queryId = SessionManager.getInstance().requestQueryId(false);
  ExecutionResult result =
      Coordinator.getInstance()
          .execute(
              statement,
              queryId,
              null,
              "",
              partitionFetcher,
              schemaFetcher,
              IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
  int code = result.status.code;
  if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
      && code != TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode()) {
    // Parameterized logging avoids eagerly formatting when ERROR is disabled.
    logger.error("Set Storage group error, statement: {}.", statement);
    logger.error("Set storage group result status : {}.", result.status);
    throw new LoadFileException(
        String.format("Can not execute set storage group statement: %s", statement));
  }
}
/**
 * Validates (auto-creating where missing) the timeseries schema for every device found in
 * the TsFiles being loaded, delegating the batch to {@link SchemaValidator#validate}.
 *
 * @param device2Schemas device id -> (measurement schema -> source TsFile)
 * @param device2IsAligned device id -> (aligned flag, source TsFile)
 * @return the resulting schema tree
 * @throws IllegalPathException if a device id is not a legal path
 */
private ISchemaTree autoCreateSchema(
    Map<String, Map<MeasurementSchema, File>> device2Schemas,
    Map<String, Pair<Boolean, File>> device2IsAligned)
    throws IllegalPathException {
  List<PartialPath> devices = new ArrayList<>();
  List<String[]> measurementsPerDevice = new ArrayList<>();
  List<TSDataType[]> dataTypesPerDevice = new ArrayList<>();
  List<TSEncoding[]> encodingsPerDevice = new ArrayList<>();
  List<CompressionType[]> compressorsPerDevice = new ArrayList<>();
  List<Boolean> alignedFlags = new ArrayList<>();
  for (Map.Entry<String, Map<MeasurementSchema, File>> entry : device2Schemas.entrySet()) {
    Set<MeasurementSchema> schemas = entry.getValue().keySet();
    int size = schemas.size();
    String[] measurements = new String[size];
    TSDataType[] types = new TSDataType[size];
    TSEncoding[] encodings = new TSEncoding[size];
    CompressionType[] compressors = new CompressionType[size];
    int i = 0;
    // Flatten each device's schema set into the parallel arrays SchemaValidator expects.
    for (MeasurementSchema schema : schemas) {
      measurements[i] = schema.getMeasurementId();
      types[i] = schema.getType();
      encodings[i] = schema.getEncodingType();
      compressors[i] = schema.getCompressor();
      i++;
    }
    devices.add(new PartialPath(entry.getKey()));
    measurementsPerDevice.add(measurements);
    dataTypesPerDevice.add(types);
    encodingsPerDevice.add(encodings);
    compressorsPerDevice.add(compressors);
    alignedFlags.add(device2IsAligned.get(entry.getKey()).left);
  }
  return SchemaValidator.validate(
      devices,
      measurementsPerDevice,
      dataTypesPerDevice,
      encodingsPerDevice,
      compressorsPerDevice,
      alignedFlags);
}
/**
 * Checks that, per device, no two TsFiles declare the same measurement id with different
 * schemas. Because {@code device2Schemas} keys on the full {@link MeasurementSchema}, two
 * entries with the same measurement id necessarily differ in some other property.
 *
 * @param device2Schemas device id -> (measurement schema -> source TsFile)
 * @throws VerifyMetadataException on the first conflicting measurement found
 */
private void verifyLoadingMeasurements(Map<String, Map<MeasurementSchema, File>> device2Schemas)
    throws VerifyMetadataException {
  for (Map.Entry<String, Map<MeasurementSchema, File>> deviceEntry : device2Schemas.entrySet()) {
    Map<String, MeasurementSchema> id2Schema = new HashMap<>();
    Map<MeasurementSchema, File> schema2TsFile = deviceEntry.getValue();
    for (Map.Entry<MeasurementSchema, File> entry : schema2TsFile.entrySet()) {
      String measurementId = entry.getKey().getMeasurementId();
      // putIfAbsent returns the previously registered schema on a duplicate id.
      MeasurementSchema conflictSchema = id2Schema.putIfAbsent(measurementId, entry.getKey());
      if (conflictSchema != null) {
        String msg =
            String.format(
                "Measurement %s Conflict, TsFile %s has measurement: %s, TsFile %s has measurement %s.",
                // Join device and measurement with the path separator for a readable full path.
                deviceEntry.getKey() + TsFileConstant.PATH_SEPARATOR + measurementId,
                entry.getValue().getPath(),
                entry.getKey(),
                schema2TsFile.get(conflictSchema).getPath(),
                conflictSchema);
        logger.error(msg);
        throw new VerifyMetadataException(msg);
      }
    }
  }
}
/**
 * Verifies that every measurement schema read from the TsFiles agrees with the schema
 * registered in IoTDB: aligned flag per device, then data type, encoding and compressor
 * per measurement.
 *
 * @param schemaTree schema fetched or auto-created for all devices being loaded
 * @param device2Schemas device id -> (measurement schema from TsFile -> source TsFile)
 * @param device2IsAligned device id -> (aligned flag from TsFile, source TsFile)
 * @throws VerifyMetadataException if any property disagrees with the registered schema
 * @throws IllegalPathException if a device id is not a legal path
 */
private void verifySchema(
    ISchemaTree schemaTree,
    Map<String, Map<MeasurementSchema, File>> device2Schemas,
    Map<String, Pair<Boolean, File>> device2IsAligned)
    throws VerifyMetadataException, IllegalPathException {
  for (Map.Entry<String, Map<MeasurementSchema, File>> entry : device2Schemas.entrySet()) {
    String device = entry.getKey();
    MeasurementSchema[] tsFileSchemas =
        entry.getValue().keySet().toArray(new MeasurementSchema[0]);
    // Look up the registered schema for exactly the measurement ids present in the TsFiles.
    DeviceSchemaInfo schemaInfo =
        schemaTree.searchDeviceSchemaInfo(
            new PartialPath(device),
            Arrays.stream(tsFileSchemas)
                .map(MeasurementSchema::getMeasurementId)
                .collect(Collectors.toList()));
    // The aligned flag must match between the TsFile and the registered device.
    if (schemaInfo.isAligned() != device2IsAligned.get(device).left) {
      throw new VerifyMetadataException(
          device,
          "Is aligned",
          device2IsAligned.get(device).left.toString(),
          device2IsAligned.get(device).right.getPath(),
          String.valueOf(schemaInfo.isAligned()));
    }
    List<MeasurementSchema> originSchemaList = schemaInfo.getMeasurementSchemaList();
    int measurementSize = originSchemaList.size();
    // NOTE(review): pairing originSchemaList.get(j) with tsFileSchemas[j] assumes
    // searchDeviceSchemaInfo returns schemas in the same order as the queried
    // measurement ids — confirm against its contract.
    for (int j = 0; j < measurementSize; j++) {
      MeasurementSchema originSchema = originSchemaList.get(j);
      MeasurementSchema tsFileSchema = tsFileSchemas[j];
      String measurementPath =
          device + TsFileConstant.PATH_SEPARATOR + originSchema.getMeasurementId();
      // Compare type, encoding and compressor; each mismatch reports the offending TsFile.
      if (!tsFileSchema.getType().equals(originSchema.getType())) {
        throw new VerifyMetadataException(
            measurementPath,
            "Datatype",
            tsFileSchema.getType().name(),
            entry.getValue().get(tsFileSchema).getPath(),
            originSchema.getType().name());
      }
      if (!tsFileSchema.getEncodingType().equals(originSchema.getEncodingType())) {
        throw new VerifyMetadataException(
            measurementPath,
            "Encoding",
            tsFileSchema.getEncodingType().name(),
            entry.getValue().get(tsFileSchema).getPath(),
            originSchema.getEncodingType().name());
      }
      if (!tsFileSchema.getCompressor().equals(originSchema.getCompressor())) {
        throw new VerifyMetadataException(
            measurementPath,
            "Compress type",
            tsFileSchema.getCompressor().name(),
            entry.getValue().get(tsFileSchema).getPath(),
            originSchema.getCompressor().name());
      }
    }
  }
}
/**
 * Analyzes SHOW TIMESERIES: routes the pattern to its schema partitions and collects
 * related templates. When ordered by heat, additionally fetches the full schema and the
 * data partitions so last-value reads can be planned.
 */
@Override
public Analysis visitShowTimeSeries(
    ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
  Analysis analysis = new Analysis();
  analysis.setStatement(showTimeSeriesStatement);
  PathPatternTree patternTree = new PathPatternTree();
  patternTree.appendPathPattern(showTimeSeriesStatement.getPathPattern());
  SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree);
  analysis.setSchemaPartitionInfo(schemaPartitionInfo);
  Map<Integer, Template> templateMap =
      schemaFetcher.checkAllRelatedTemplate(showTimeSeriesStatement.getPathPattern());
  analysis.setRelatedTemplateInfo(templateMap);
  if (showTimeSeriesStatement.isOrderByHeat()) {
    patternTree.constructTree();
    // request schema fetch API
    logger.info("[StartFetchSchema]");
    ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
    // Fixed unbalanced bracket in the end marker so it pairs with "[StartFetchSchema]".
    logger.info("[EndFetchSchema]");
    List<MeasurementPath> allSelectedPath = schemaTree.getAllMeasurement();
    Set<Expression> sourceExpressions =
        allSelectedPath.stream()
            .map(TimeSeriesOperand::new)
            .collect(Collectors.toCollection(LinkedHashSet::new));
    analysis.setSourceExpressions(sourceExpressions);
    sourceExpressions.forEach(expression -> analyzeExpression(analysis, expression));
    Set<String> deviceSet =
        allSelectedPath.stream().map(MeasurementPath::getDevice).collect(Collectors.toSet());
    Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
    for (String devicePath : deviceSet) {
      DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
      queryParam.setDevicePath(devicePath);
      sgNameToQueryParamsMap
          .computeIfAbsent(
              schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
          .add(queryParam);
    }
    DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
    analysis.setDataPartitionInfo(dataPartition);
  }
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowTimeSeriesHeader());
  return analysis;
}
/** Analyzes SHOW STORAGE GROUP: only the response header is needed, no partition routing. */
@Override
public Analysis visitShowStorageGroup(
    ShowStorageGroupStatement showStorageGroupStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowStorageGroupHeader());
  analysis.setStatement(showStorageGroupStatement);
  return analysis;
}
/** Analyzes SHOW TTL: only the response header is needed, no partition routing. */
@Override
public Analysis visitShowTTL(ShowTTLStatement showTTLStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowTTLHeader());
  analysis.setStatement(showTTLStatement);
  return analysis;
}
/**
 * Analyzes SHOW DEVICES: extends the pattern by one wildcard level (devices are the
 * parents of measurements) and routes it to its schema partitions.
 */
@Override
public Analysis visitShowDevices(
    ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setStatement(showDevicesStatement);
  final PathPatternTree patternTree = new PathPatternTree();
  final PartialPath devicePattern =
      showDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
  patternTree.appendPathPattern(devicePattern);
  analysis.setSchemaPartitionInfo(partitionFetcher.getSchemaPartition(patternTree));
  final DatasetHeader header =
      showDevicesStatement.hasSgCol()
          ? DatasetHeaderFactory.getShowDevicesWithSgHeader()
          : DatasetHeaderFactory.getShowDevicesHeader();
  analysis.setRespDatasetHeader(header);
  return analysis;
}
/** Analyzes SHOW CLUSTER: only the response header is needed, no partition routing. */
@Override
public Analysis visitShowCluster(
    ShowClusterStatement showClusterStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowClusterHeader());
  analysis.setStatement(showClusterStatement);
  return analysis;
}
/** Analyzes COUNT STORAGE GROUP: only the response header is needed, no partition routing. */
@Override
public Analysis visitCountStorageGroup(
    CountStorageGroupStatement countStorageGroupStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getCountStorageGroupHeader());
  analysis.setStatement(countStorageGroupStatement);
  return analysis;
}
/**
 * Analyzes an internal schema-fetch statement: routes its pattern tree to schema
 * partitions; an empty partition means there is nothing to fetch and the query ends here.
 */
@Override
public Analysis visitSchemaFetch(
    SchemaFetchStatement schemaFetchStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setStatement(schemaFetchStatement);
  final SchemaPartition schemaPartition =
      partitionFetcher.getSchemaPartition(schemaFetchStatement.getPatternTree());
  analysis.setSchemaPartitionInfo(schemaPartition);
  analysis.setFinishQueryAfterAnalyze(schemaPartition.isEmpty());
  return analysis;
}
/**
 * Analyzes COUNT DEVICES: extends the pattern by one wildcard level (devices are the
 * parents of measurements) and routes it to its schema partitions.
 */
@Override
public Analysis visitCountDevices(
    CountDevicesStatement countDevicesStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setStatement(countDevicesStatement);
  final PathPatternTree patternTree = new PathPatternTree();
  final PartialPath devicePattern =
      countDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
  patternTree.appendPathPattern(devicePattern);
  analysis.setSchemaPartitionInfo(partitionFetcher.getSchemaPartition(patternTree));
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getCountDevicesHeader());
  return analysis;
}
/**
 * Analyzes COUNT TIMESERIES: routes the pattern to its schema partitions and records any
 * templates the pattern may match.
 */
@Override
public Analysis visitCountTimeSeries(
    CountTimeSeriesStatement countTimeSeriesStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setStatement(countTimeSeriesStatement);
  final PathPatternTree patternTree = new PathPatternTree();
  patternTree.appendPathPattern(countTimeSeriesStatement.getPathPattern());
  analysis.setSchemaPartitionInfo(partitionFetcher.getSchemaPartition(patternTree));
  final Map<Integer, Template> relatedTemplates =
      schemaFetcher.checkAllRelatedTemplate(countTimeSeriesStatement.getPathPattern());
  analysis.setRelatedTemplateInfo(relatedTemplates);
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getCountTimeSeriesHeader());
  return analysis;
}
/** Analyzes COUNT TIMESERIES GROUP BY LEVEL: routes the pattern to its schema partitions. */
@Override
public Analysis visitCountLevelTimeSeries(
    CountLevelTimeSeriesStatement countLevelTimeSeriesStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setStatement(countLevelTimeSeriesStatement);
  final PathPatternTree patternTree = new PathPatternTree();
  patternTree.appendPathPattern(countLevelTimeSeriesStatement.getPathPattern());
  analysis.setSchemaPartitionInfo(partitionFetcher.getSchemaPartition(patternTree));
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getCountLevelTimeSeriesHeader());
  return analysis;
}
/**
 * Analyzes COUNT NODES: fetches the node-management partition at the requested level.
 * If nodes matched but no schema partition exists, the matched nodes alone answer the
 * query and execution can stop after analysis.
 */
@Override
public Analysis visitCountNodes(CountNodesStatement countStatement, MPPQueryContext context) {
  Analysis analysis = new Analysis();
  analysis.setStatement(countStatement);
  PathPatternTree patternTree = new PathPatternTree();
  patternTree.appendPathPattern(countStatement.getPathPattern());
  SchemaNodeManagementPartition schemaNodeManagementPartition =
      partitionFetcher.getSchemaNodeManagementPartitionWithLevel(
          patternTree, countStatement.getLevel());
  if (schemaNodeManagementPartition == null) {
    return analysis;
  }
  if (!schemaNodeManagementPartition.getMatchedNode().isEmpty()
      && schemaNodeManagementPartition.getSchemaPartition().getSchemaPartitionMap().isEmpty()) {
    analysis.setFinishQueryAfterAnalyze(true);
  }
  analysis.setMatchedNodes(schemaNodeManagementPartition.getMatchedNode());
  analysis.setSchemaPartitionInfo(schemaNodeManagementPartition.getSchemaPartition());
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getCountNodesHeader());
  return analysis;
}
/** Analyzes SHOW CHILD PATHS via the shared node-management partition routine. */
@Override
public Analysis visitShowChildPaths(
    ShowChildPathsStatement showChildPathsStatement, MPPQueryContext context) {
  final PartialPath path = showChildPathsStatement.getPartialPath();
  return visitSchemaNodeManagementPartition(
      showChildPathsStatement, path, DatasetHeaderFactory.getShowChildPathsHeader());
}
/** Analyzes SHOW CHILD NODES via the shared node-management partition routine. */
@Override
public Analysis visitShowChildNodes(
    ShowChildNodesStatement showChildNodesStatement, MPPQueryContext context) {
  final PartialPath path = showChildNodesStatement.getPartialPath();
  return visitSchemaNodeManagementPartition(
      showChildNodesStatement, path, DatasetHeaderFactory.getShowChildNodesHeader());
}
/** Analyzes SHOW VERSION: answered entirely at analysis time, so execution is skipped. */
@Override
public Analysis visitShowVersion(
    ShowVersionStatement showVersionStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setFinishQueryAfterAnalyze(true);
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowVersionHeader());
  analysis.setStatement(showVersionStatement);
  return analysis;
}
/**
 * Shared analysis routine for node-management statements (SHOW CHILD PATHS / NODES).
 * Fetches the node-management partition for {@code path}; if nodes matched but no schema
 * partition exists, the matched nodes alone answer the query and execution can stop.
 *
 * @param statement the statement being analyzed
 * @param path the path whose children are requested
 * @param header response dataset header for the specific statement type
 * @return the populated analysis
 */
private Analysis visitSchemaNodeManagementPartition(
    Statement statement, PartialPath path, DatasetHeader header) {
  Analysis analysis = new Analysis();
  analysis.setStatement(statement);
  PathPatternTree patternTree = new PathPatternTree();
  patternTree.appendPathPattern(path);
  SchemaNodeManagementPartition schemaNodeManagementPartition =
      partitionFetcher.getSchemaNodeManagementPartition(patternTree);
  if (schemaNodeManagementPartition == null) {
    return analysis;
  }
  if (!schemaNodeManagementPartition.getMatchedNode().isEmpty()
      && schemaNodeManagementPartition.getSchemaPartition().getSchemaPartitionMap().isEmpty()) {
    analysis.setFinishQueryAfterAnalyze(true);
  }
  analysis.setMatchedNodes(schemaNodeManagementPartition.getMatchedNode());
  analysis.setSchemaPartitionInfo(schemaNodeManagementPartition.getSchemaPartition());
  analysis.setRespDatasetHeader(header);
  return analysis;
}
/**
 * Analyzes DELETE: fetches the schema matching the deletion patterns, then resolves the
 * data partitions of every matched device. An empty data partition means nothing can be
 * deleted, so the query finishes at analysis time.
 */
@Override
public Analysis visitDeleteData(
    DeleteDataStatement deleteDataStatement, MPPQueryContext context) {
  context.setQueryType(QueryType.WRITE);
  final Analysis analysis = new Analysis();
  analysis.setStatement(deleteDataStatement);
  final PathPatternTree patternTree = new PathPatternTree();
  deleteDataStatement.getPathList().forEach(patternTree::appendPathPattern);
  final ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
  analysis.setSchemaTree(schemaTree);
  // Group the matched devices by their storage group to build partition query params.
  final Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
  for (DeviceSchemaInfo deviceSchemaInfo :
      schemaTree.getMatchedDevices(new PartialPath(ALL_RESULT_NODES))) {
    final PartialPath devicePath = deviceSchemaInfo.getDevicePath();
    final DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
    queryParam.setDevicePath(devicePath.getFullPath());
    sgNameToQueryParamsMap
        .computeIfAbsent(schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
        .add(queryParam);
  }
  final DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
  analysis.setDataPartitionInfo(dataPartition);
  if (dataPartition.isEmpty()) {
    analysis.setFinishQueryAfterAnalyze(true);
  }
  return analysis;
}
/**
 * Analyzes CREATE SCHEMA TEMPLATE: rejects templates where any aligned measurement group
 * contains duplicate measurement names.
 */
@Override
public Analysis visitCreateSchemaTemplate(
    CreateSchemaTemplateStatement createTemplateStatement, MPPQueryContext context) {
  context.setQueryType(QueryType.WRITE);
  List<List<String>> measurementsList = createTemplateStatement.getMeasurements();
  // Use the parameterized element type instead of a raw List to keep type safety.
  for (List<String> measurements : measurementsList) {
    Set<String> measurementsSet = new HashSet<>(measurements);
    if (measurementsSet.size() < measurements.size()) {
      throw new SemanticException(
          "Measurement under an aligned device is not allowed to have the same measurement name");
    }
  }
  Analysis analysis = new Analysis();
  analysis.setStatement(createTemplateStatement);
  return analysis;
}
/** Analyzes SHOW NODES IN SCHEMA TEMPLATE: only the response header is needed. */
@Override
public Analysis visitShowNodesInSchemaTemplate(
    ShowNodesInSchemaTemplateStatement showNodesInSchemaTemplateStatement,
    MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowNodesInSchemaTemplateHeader());
  analysis.setStatement(showNodesInSchemaTemplateStatement);
  return analysis;
}
/** Analyzes SHOW SCHEMA TEMPLATES: only the response header is needed. */
@Override
public Analysis visitShowSchemaTemplate(
    ShowSchemaTemplateStatement showSchemaTemplateStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowSchemaTemplateHeader());
  analysis.setStatement(showSchemaTemplateStatement);
  return analysis;
}
/**
 * Builds the time filter for GROUP BY TIME. Month-based intervals get the calendar-aware
 * {@link GroupByMonthFilter} (evaluated in UTC); otherwise a plain {@link GroupByFilter}
 * is built over a left-closed time range.
 *
 * @param groupByTimeComponent parsed GROUP BY TIME clause
 * @return the filter matching the clause's interval and window semantics
 */
private GroupByFilter initGroupByFilter(GroupByTimeComponent groupByTimeComponent) {
  boolean monthBased =
      groupByTimeComponent.isIntervalByMonth() || groupByTimeComponent.isSlidingStepByMonth();
  if (monthBased) {
    return new GroupByMonthFilter(
        groupByTimeComponent.getInterval(),
        groupByTimeComponent.getSlidingStep(),
        groupByTimeComponent.getStartTime(),
        groupByTimeComponent.getEndTime(),
        groupByTimeComponent.isSlidingStepByMonth(),
        groupByTimeComponent.isIntervalByMonth(),
        TimeZone.getTimeZone("+00:00"));
  }
  // A right-closed (left-open) window is normalized to left-closed by shifting both
  // endpoints one tick to the right.
  long shift = groupByTimeComponent.isLeftCRightO() ? 0 : 1;
  return new GroupByFilter(
      groupByTimeComponent.getInterval(),
      groupByTimeComponent.getSlidingStep(),
      groupByTimeComponent.getStartTime() + shift,
      groupByTimeComponent.getEndTime() + shift);
}
/** Analyzes SET SCHEMA TEMPLATE: a write statement needing no partition routing here. */
@Override
public Analysis visitSetSchemaTemplate(
    SetSchemaTemplateStatement setSchemaTemplateStatement, MPPQueryContext context) {
  context.setQueryType(QueryType.WRITE);
  final Analysis analysis = new Analysis();
  analysis.setStatement(setSchemaTemplateStatement);
  return analysis;
}
/** Analyzes SHOW PATHS SET SCHEMA TEMPLATE: only the response header is needed. */
@Override
public Analysis visitShowPathSetTemplate(
    ShowPathSetTemplateStatement showPathSetTemplateStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowPathSetTemplateHeader());
  analysis.setStatement(showPathSetTemplateStatement);
  return analysis;
}
/**
 * Analyzes ACTIVATE TEMPLATE: resolves which template is set on the target path (failing
 * if none is), then routes the path's children to (possibly newly created) schema
 * partitions.
 */
@Override
public Analysis visitActivateTemplate(
    ActivateTemplateStatement activateTemplateStatement, MPPQueryContext context) {
  context.setQueryType(QueryType.WRITE);
  final Analysis analysis = new Analysis();
  analysis.setStatement(activateTemplateStatement);
  final PartialPath activatePath = activateTemplateStatement.getPath();
  final Pair<Template, PartialPath> setInfo = schemaFetcher.checkTemplateSetInfo(activatePath);
  if (setInfo == null) {
    throw new StatementAnalyzeException(
        new MetadataException(
            String.format(
                "Path [%s] has not been set any template.", activatePath.getFullPath())));
  }
  analysis.setTemplateSetInfo(
      new Pair<>(setInfo.left, Collections.singletonList(setInfo.right)));
  final PathPatternTree patternTree = new PathPatternTree();
  patternTree.appendPathPattern(activatePath.concatNode(ONE_LEVEL_PATH_WILDCARD));
  analysis.setSchemaPartitionInfo(partitionFetcher.getOrCreateSchemaPartition(patternTree));
  return analysis;
}
/**
 * Analyzes SHOW PATHS USING SCHEMA TEMPLATE: finds every path the template is set on and
 * routes those subtrees to schema partitions. With no set paths or no partitions, the
 * result is empty and execution stops after analysis.
 */
@Override
public Analysis visitShowPathsUsingTemplate(
    ShowPathsUsingTemplateStatement showPathsUsingTemplateStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setStatement(showPathsUsingTemplateStatement);
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowPathsUsingTemplateHeader());
  final Pair<Template, List<PartialPath>> templateSetInfo =
      schemaFetcher.getAllPathsSetTemplate(showPathsUsingTemplateStatement.getTemplateName());
  analysis.setTemplateSetInfo(templateSetInfo);
  if (templateSetInfo == null
      || templateSetInfo.right == null
      || templateSetInfo.right.isEmpty()) {
    analysis.setFinishQueryAfterAnalyze(true);
    return analysis;
  }
  // Match each set path itself plus its entire subtree.
  final PathPatternTree patternTree = new PathPatternTree();
  for (PartialPath path : templateSetInfo.right) {
    patternTree.appendPathPattern(path);
    patternTree.appendPathPattern(path.concatNode(MULTI_LEVEL_PATH_WILDCARD));
  }
  final SchemaPartition partition = partitionFetcher.getOrCreateSchemaPartition(patternTree);
  analysis.setSchemaPartitionInfo(partition);
  if (partition.isEmpty()) {
    analysis.setFinishQueryAfterAnalyze(true);
  }
  return analysis;
}
/** Analyzes SHOW PIPESINKTYPE: answered entirely at analysis time, so execution is skipped. */
@Override
public Analysis visitShowPipeSinkType(
    ShowPipeSinkTypeStatement showPipeSinkTypeStatement, MPPQueryContext context) {
  final Analysis analysis = new Analysis();
  analysis.setFinishQueryAfterAnalyze(true);
  analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowPipeSinkTypeHeader());
  analysis.setStatement(showPipeSinkTypeStatement);
  return analysis;
}
}