| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.mpp.plan.analyze; |
| |
| import org.apache.iotdb.commons.conf.IoTDBConstant; |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.partition.DataPartition; |
| import org.apache.iotdb.commons.partition.DataPartitionQueryParam; |
| import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition; |
| import org.apache.iotdb.commons.partition.SchemaPartition; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException; |
| import org.apache.iotdb.db.exception.sql.MeasurementNotExistException; |
| import org.apache.iotdb.db.exception.sql.SemanticException; |
| import org.apache.iotdb.db.exception.sql.StatementAnalyzeException; |
| import org.apache.iotdb.db.metadata.path.MeasurementPath; |
| import org.apache.iotdb.db.metadata.template.Template; |
| import org.apache.iotdb.db.mpp.common.MPPQueryContext; |
| import org.apache.iotdb.db.mpp.common.header.ColumnHeader; |
| import org.apache.iotdb.db.mpp.common.header.DatasetHeader; |
| import org.apache.iotdb.db.mpp.common.header.HeaderConstant; |
| import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo; |
| import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree; |
| import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree; |
| import org.apache.iotdb.db.mpp.plan.expression.Expression; |
| import org.apache.iotdb.db.mpp.plan.expression.ExpressionType; |
| import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand; |
| import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression; |
| import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor; |
| import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; |
| import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter; |
| import org.apache.iotdb.db.mpp.plan.statement.Statement; |
| import org.apache.iotdb.db.mpp.plan.statement.StatementNode; |
| import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor; |
| import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent; |
| import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent; |
| import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; |
| import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn; |
| import org.apache.iotdb.db.mpp.plan.statement.component.SortItem; |
| import org.apache.iotdb.db.mpp.plan.statement.component.SortKey; |
| import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition; |
| import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CountNodesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.read.filter.GroupByFilter; |
| import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter; |
| import org.apache.iotdb.tsfile.read.filter.basic.Filter; |
| import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory; |
| import org.apache.iotdb.tsfile.utils.Pair; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.TimeZone; |
| import java.util.stream.Collectors; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; |
| import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD; |
| import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES; |
| |
| /** This visitor is used to analyze each type of Statement and returns the {@link Analysis}. */ |
| public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> { |
| |
| private static final Logger logger = LoggerFactory.getLogger(Analyzer.class); |
| |
| private final IPartitionFetcher partitionFetcher; |
| private final ISchemaFetcher schemaFetcher; |
| private final TypeProvider typeProvider; |
| private final MPPQueryContext context; |
| |
| public AnalyzeVisitor( |
| IPartitionFetcher partitionFetcher, |
| ISchemaFetcher schemaFetcher, |
| TypeProvider typeProvider, |
| MPPQueryContext context) { |
| this.context = context; |
| this.partitionFetcher = partitionFetcher; |
| this.schemaFetcher = schemaFetcher; |
| this.typeProvider = typeProvider; |
| } |
| |
| private String getLogHeader() { |
| return String.format("Query[%s]:", context.getQueryId()); |
| } |
| |
| @Override |
| public Analysis visitNode(StatementNode node, MPPQueryContext context) { |
| throw new UnsupportedOperationException( |
| "Unsupported statement type: " + node.getClass().getName()); |
| } |
| |
| @Override |
| public Analysis visitExplain(ExplainStatement explainStatement, MPPQueryContext context) { |
| Analysis analysis = visitQuery(explainStatement.getQueryStatement(), context); |
| analysis.setStatement(explainStatement); |
| analysis.setFinishQueryAfterAnalyze(true); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| try { |
| // check for semantic errors |
| queryStatement.semanticCheck(); |
| |
| // concat path and construct path pattern tree |
| PathPatternTree patternTree = new PathPatternTree(); |
| queryStatement = |
| (QueryStatement) new ConcatPathRewriter().rewrite(queryStatement, patternTree); |
| analysis.setStatement(queryStatement); |
| |
| // request schema fetch API |
| logger.info("{} fetch query schema...", getLogHeader()); |
| ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); |
| logger.info("{} fetch schema done", getLogHeader()); |
| // If there is no leaf node in the schema tree, the query should be completed immediately |
| if (schemaTree.isEmpty()) { |
| analysis.setFinishQueryAfterAnalyze(true); |
| return analysis; |
| } |
| |
| // extract global time filter from query filter and determine if there is a value filter |
| Pair<Filter, Boolean> resultPair = analyzeGlobalTimeFilter(queryStatement); |
| Filter globalTimeFilter = resultPair.left; |
| boolean hasValueFilter = resultPair.right; |
| analysis.setGlobalTimeFilter(globalTimeFilter); |
| analysis.setHasValueFilter(hasValueFilter); |
| |
| if (queryStatement.isLastQuery()) { |
| if (hasValueFilter) { |
| throw new SemanticException("Only time filters are supported in LAST query"); |
| } |
| analysis.setMergeOrderParameter(analyzeOrderBy(queryStatement)); |
| return analyzeLast(analysis, schemaTree.getAllMeasurement(), schemaTree); |
| } |
| |
| // Example 1: select s1, s1 + s2 as t, udf(udf(s1)) from root.sg.d1 |
| // outputExpressions: [<root.sg.d1.s1,null>, <root.sg.d1.s1 + root.sg.d1.s2,t>, |
| // <udf(udf(root.sg.d1.s1)),null>] |
| // transformExpressions: [root.sg.d1.s1, root.sg.d1.s1 + root.sg.d1.s2, |
| // udf(udf(root.sg.d1.s1))] |
| // sourceExpressions: {root.sg.d1 -> [root.sg.d1.s1, root.sg.d1.s2]} |
| // |
| // Example 2: select s1, s2, s3 as t from root.sg.* align by device |
| // outputExpressions: [<s1,null>, <s2,null>, <s1,t>] |
| // transformExpressions: [root.sg.d1.s1, root.sg.d1.s2, root.sg.d1.s3, |
| // root.sg.d2.s1, root.sg.d2.s2] |
| // sourceExpressions: {root.sg.d1 -> [root.sg.d1.s1, root.sg.d1.s2, root.sg.d1.s2], |
| // root.sg.d2 -> [root.sg.d2.s1, root.sg.d2.s2]} |
| // |
| // Example 3: select sum(s1) + 1 as t, count(s2) from root.sg.d1 |
| // outputExpressions: [<sum(root.sg.d1.s1) + 1,t>, <count(root.sg.d1.s2),t>] |
| // transformExpressions: [sum(root.sg.d1.s1) + 1, count(root.sg.d1.s2)] |
| // aggregationExpressions: {root.sg.d1 -> [sum(root.sg.d1.s1), count(root.sg.d1.s2)]} |
| // sourceExpressions: {root.sg.d1 -> [sum(root.sg.d1.s1), count(root.sg.d1.s2)]} |
| // |
| // Example 4: select sum(s1) + 1 as t, count(s2) from root.sg.d1 where s1 > 1 |
| // outputExpressions: [<sum(root.sg.d1.s1) + 1,t>, <count(root.sg.d1.s2),t>] |
| // transformExpressions: [sum(root.sg.d1.s1) + 1, count(root.sg.d1.s2)] |
| // aggregationExpressions: {root.sg.d1 -> [sum(root.sg.d1.s1), count(root.sg.d1.s2)]} |
| // sourceExpressions: {root.sg.d1 -> [root.sg.d1.s1, root.sg.d1.s2]} |
| List<Pair<Expression, String>> outputExpressions; |
| if (queryStatement.isAlignByDevice()) { |
| Map<String, Set<Expression>> deviceToTransformExpressions = new HashMap<>(); |
| Map<String, Set<Expression>> deviceToTransformExpressionsInHaving = new HashMap<>(); |
| |
| // all selected device |
| Set<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree); |
| |
| Map<String, Set<String>> deviceToMeasurementsMap = new HashMap<>(); |
| outputExpressions = |
| analyzeSelect( |
| queryStatement, |
| schemaTree, |
| deviceList, |
| deviceToTransformExpressions, |
| deviceToMeasurementsMap); |
| |
| if (queryStatement.hasHaving()) { |
| List<PartialPath> measurementNotExistDevices = new ArrayList<>(); |
| for (PartialPath device : deviceList) { |
| try { |
| deviceToTransformExpressionsInHaving.put( |
| device.toString(), |
| ExpressionAnalyzer.removeWildcardInFilterByDevice( |
| queryStatement.getHavingCondition().getPredicate(), |
| device, |
| schemaTree, |
| typeProvider, |
| false) |
| .stream() |
| .collect(Collectors.toSet())); |
| } catch (SemanticException e) { |
| if (e instanceof MeasurementNotExistException) { |
| logger.warn(e.getMessage()); |
| measurementNotExistDevices.add(device); |
| continue; |
| } |
| throw e; |
| } |
| } |
| for (PartialPath measurementNotExistDevice : measurementNotExistDevices) { |
| deviceList.remove(measurementNotExistDevice); |
| deviceToTransformExpressions.remove(measurementNotExistDevice.getFullPath()); |
| } |
| } |
| |
| Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>(); |
| List<String> allMeasurements = |
| outputExpressions.stream() |
| .map(Pair::getLeft) |
| .map(Expression::getExpressionString) |
| .distinct() |
| .collect(Collectors.toList()); |
| for (String deviceName : deviceToMeasurementsMap.keySet()) { |
| List<String> measurementsUnderDevice = |
| new ArrayList<>(deviceToMeasurementsMap.get(deviceName)); |
| List<Integer> indexes = new ArrayList<>(); |
| for (String measurement : measurementsUnderDevice) { |
| indexes.add( |
| allMeasurements.indexOf(measurement) + 1); // add 1 to skip the device column |
| } |
| deviceToMeasurementIndexesMap.put(deviceName, indexes); |
| } |
| analysis.setDeviceToMeasurementIndexesMap(deviceToMeasurementIndexesMap); |
| |
| Map<String, Set<Expression>> deviceToSourceExpressions = new HashMap<>(); |
| boolean isValueFilterAggregation = queryStatement.isAggregationQuery() && hasValueFilter; |
| |
| Map<String, Boolean> deviceToIsRawDataSource = new HashMap<>(); |
| |
| Map<String, Set<Expression>> deviceToAggregationExpressions = new HashMap<>(); |
| Map<String, Set<Expression>> deviceToAggregationTransformExpressions = new HashMap<>(); |
| Map<String, Expression> deviceToHavingExpression = |
| new HashMap<>(); // store filter of every device |
| for (String deviceName : deviceToTransformExpressions.keySet()) { |
| Set<Expression> transformExpressions = deviceToTransformExpressions.get(deviceName); |
| Set<Expression> transformExpressionsInHaving = |
| deviceToTransformExpressionsInHaving.get(deviceName); |
| Set<Expression> aggregationExpressions = new LinkedHashSet<>(); |
| Set<Expression> aggregationTransformExpressions = new LinkedHashSet<>(); |
| |
| boolean isHasRawDataInputAggregation = false; |
| if (queryStatement.isAggregationQuery()) { |
| // true if nested expressions and UDFs exist in aggregation function |
| // i.e. select sum(s1 + 1) from root.sg.d1 align by device |
| isHasRawDataInputAggregation = |
| analyzeAggregation( |
| transformExpressions, aggregationExpressions, aggregationTransformExpressions); |
| |
| if (queryStatement.hasHaving() && transformExpressionsInHaving != null) { |
| List<Expression> aggregationExpressionsInHaving = new ArrayList<>(); |
| isHasRawDataInputAggregation |= |
| analyzeAggregationInHaving( |
| transformExpressionsInHaving, |
| aggregationExpressionsInHaving, |
| aggregationExpressions, |
| aggregationTransformExpressions); |
| |
| Expression havingExpression; |
| havingExpression = |
| analyzeHavingSplitByDevice( |
| transformExpressionsInHaving); // construct Filter from Having |
| |
| havingExpression.inferTypes(typeProvider); |
| deviceToHavingExpression.put(deviceName, havingExpression); |
| } |
| |
| deviceToAggregationExpressions.put(deviceName, aggregationExpressions); |
| deviceToAggregationTransformExpressions.put( |
| deviceName, aggregationTransformExpressions); |
| } |
| |
| boolean isRawDataSource = |
| !queryStatement.isAggregationQuery() |
| || isValueFilterAggregation |
| || isHasRawDataInputAggregation; |
| |
| for (Expression expression : transformExpressions) { |
| updateSource( |
| expression, |
| deviceToSourceExpressions.computeIfAbsent(deviceName, key -> new LinkedHashSet<>()), |
| isRawDataSource); |
| } |
| |
| if (queryStatement.hasHaving() && transformExpressionsInHaving != null) { |
| for (Expression expression : transformExpressionsInHaving) { |
| updateSource( |
| expression, |
| deviceToSourceExpressions.computeIfAbsent( |
| deviceName, key -> new LinkedHashSet<>()), |
| isRawDataSource); |
| } |
| } |
| deviceToIsRawDataSource.put(deviceName, isRawDataSource); |
| } |
| analysis.setDeviceToAggregationExpressions(deviceToAggregationExpressions); |
| analysis.setDeviceToAggregationTransformExpressions( |
| deviceToAggregationTransformExpressions); |
| analysis.setDeviceToIsRawDataSource(deviceToIsRawDataSource); |
| analysis.setDeviceToHavingExpression(deviceToHavingExpression); |
| |
| if (queryStatement.getWhereCondition() != null) { |
| Map<String, Expression> deviceToQueryFilter = new HashMap<>(); |
| Iterator<PartialPath> deviceIterator = deviceList.iterator(); |
| while (deviceIterator.hasNext()) { |
| PartialPath devicePath = deviceIterator.next(); |
| Expression queryFilter = null; |
| try { |
| queryFilter = analyzeWhereSplitByDevice(queryStatement, devicePath, schemaTree); |
| } catch (SemanticException e) { |
| if (e instanceof MeasurementNotExistException) { |
| logger.warn(e.getMessage()); |
| deviceIterator.remove(); |
| deviceToSourceExpressions.remove(devicePath.getFullPath()); |
| continue; |
| } |
| throw e; |
| } |
| deviceToQueryFilter.put(devicePath.getFullPath(), queryFilter); |
| queryFilter.inferTypes(typeProvider); |
| updateSource( |
| queryFilter, |
| deviceToSourceExpressions.computeIfAbsent( |
| devicePath.getFullPath(), key -> new LinkedHashSet<>()), |
| true); |
| } |
| analysis.setDeviceToQueryFilter(deviceToQueryFilter); |
| } |
| analysis.setDeviceToSourceExpressions(deviceToSourceExpressions); |
| analysis.setDeviceToTransformExpressions(deviceToTransformExpressions); |
| } else { |
| outputExpressions = analyzeSelect(queryStatement, schemaTree); |
| Set<Expression> transformExpressions = |
| outputExpressions.stream() |
| .map(Pair::getLeft) |
| .collect(Collectors.toCollection(LinkedHashSet::new)); |
| |
| // get removeWildcard Expressions in Having |
| // used to analyzeAggregation in Having expression and updateSource |
| Set<Expression> transformExpressionsInHaving = |
| queryStatement.hasHaving() |
| ? ExpressionAnalyzer.removeWildcardInFilter( |
| queryStatement.getHavingCondition().getPredicate(), |
| queryStatement.getFromComponent().getPrefixPaths(), |
| schemaTree, |
| typeProvider, |
| false) |
| .stream() |
| .collect(Collectors.toSet()) |
| : null; |
| |
| if (queryStatement.isGroupByLevel()) { |
| // map from grouped expression to set of input expressions |
| Map<Expression, Expression> rawPathToGroupedPathMap = new HashMap<>(); |
| Map<Expression, Set<Expression>> groupByLevelExpressions = |
| analyzeGroupByLevel( |
| queryStatement, outputExpressions, transformExpressions, rawPathToGroupedPathMap); |
| analysis.setGroupByLevelExpressions(groupByLevelExpressions); |
| analysis.setRawPathToGroupedPathMap(rawPathToGroupedPathMap); |
| } |
| |
| // true if nested expressions and UDFs exist in aggregation function |
| // i.e. select sum(s1 + 1) from root.sg.d1 |
| boolean isHasRawDataInputAggregation = false; |
| if (queryStatement.isAggregationQuery()) { |
| Set<Expression> aggregationExpressions = new HashSet<>(); |
| Set<Expression> aggregationTransformExpressions = new HashSet<>(); |
| List<Expression> aggregationExpressionsInHaving = |
| new ArrayList<>(); // as input of GroupByLevelController |
| isHasRawDataInputAggregation = |
| analyzeAggregation( |
| transformExpressions, aggregationExpressions, aggregationTransformExpressions); |
| if (queryStatement.hasHaving()) { |
| isHasRawDataInputAggregation |= |
| analyzeAggregationInHaving( |
| transformExpressionsInHaving, |
| aggregationExpressionsInHaving, |
| aggregationExpressions, |
| aggregationTransformExpressions); |
| |
| Expression havingExpression = // construct Filter from Having |
| analyzeHaving( |
| queryStatement, |
| analysis.getGroupByLevelExpressions(), |
| transformExpressionsInHaving, |
| aggregationExpressionsInHaving); |
| havingExpression.inferTypes(typeProvider); |
| analysis.setHavingExpression(havingExpression); |
| } |
| analysis.setAggregationExpressions(aggregationExpressions); |
| analysis.setAggregationTransformExpressions(aggregationTransformExpressions); |
| } |
| |
| // generate sourceExpression according to transformExpressions |
| Set<Expression> sourceExpressions = new HashSet<>(); |
| boolean isValueFilterAggregation = queryStatement.isAggregationQuery() && hasValueFilter; |
| boolean isRawDataSource = |
| !queryStatement.isAggregationQuery() |
| || isValueFilterAggregation |
| || isHasRawDataInputAggregation; |
| for (Expression expression : transformExpressions) { |
| updateSource(expression, sourceExpressions, isRawDataSource); |
| } |
| |
| if (queryStatement.hasHaving()) { |
| for (Expression expression : transformExpressionsInHaving) { |
| updateSource(expression, sourceExpressions, isRawDataSource); |
| } |
| } |
| |
| if (queryStatement.getWhereCondition() != null) { |
| Expression queryFilter = analyzeWhere(queryStatement, schemaTree); |
| |
| // update sourceExpression according to queryFilter |
| queryFilter.inferTypes(typeProvider); |
| updateSource(queryFilter, sourceExpressions, isRawDataSource); |
| analysis.setQueryFilter(queryFilter); |
| } |
| analysis.setRawDataSource(isRawDataSource); |
| analysis.setSourceExpressions(sourceExpressions); |
| analysis.setTransformExpressions(transformExpressions); |
| } |
| |
| if (queryStatement.isGroupByTime()) { |
| GroupByTimeComponent groupByTimeComponent = queryStatement.getGroupByTimeComponent(); |
| if ((groupByTimeComponent.isIntervalByMonth() |
| || groupByTimeComponent.isSlidingStepByMonth()) |
| && queryStatement.getResultTimeOrder() == Ordering.DESC) { |
| throw new SemanticException("Group by month doesn't support order by time desc now."); |
| } |
| analysis.setGroupByTimeParameter(new GroupByTimeParameter(groupByTimeComponent)); |
| } |
| |
| if (queryStatement.getFillComponent() != null) { |
| FillComponent fillComponent = queryStatement.getFillComponent(); |
| List<Expression> fillColumnList = |
| outputExpressions.stream().map(Pair::getLeft).distinct().collect(Collectors.toList()); |
| analysis.setFillDescriptor( |
| new FillDescriptor(fillComponent.getFillPolicy(), fillComponent.getFillValue())); |
| } |
| |
| // generate result set header according to output expressions |
| DatasetHeader datasetHeader = analyzeOutput(queryStatement, outputExpressions); |
| analysis.setRespDatasetHeader(datasetHeader); |
| analysis.setTypeProvider(typeProvider); |
| |
| // fetch partition information |
| Set<String> deviceSet = new HashSet<>(); |
| if (queryStatement.isAlignByDevice()) { |
| deviceSet = analysis.getDeviceToSourceExpressions().keySet(); |
| } else { |
| for (Expression expression : analysis.getSourceExpressions()) { |
| deviceSet.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression)); |
| } |
| } |
| DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet, schemaTree); |
| analysis.setDataPartitionInfo(dataPartition); |
| } catch (StatementAnalyzeException e) { |
| logger.error("Meet error when analyzing the query statement: ", e); |
| throw new StatementAnalyzeException( |
| "Meet error when analyzing the query statement: " + e.getMessage()); |
| } |
| return analysis; |
| } |
| |
| private List<Pair<Expression, String>> analyzeSelect( |
| QueryStatement queryStatement, ISchemaTree schemaTree) { |
| List<Pair<Expression, String>> outputExpressions = new ArrayList<>(); |
| boolean isGroupByLevel = queryStatement.isGroupByLevel(); |
| ColumnPaginationController paginationController = |
| new ColumnPaginationController( |
| queryStatement.getSeriesLimit(), |
| queryStatement.getSeriesOffset(), |
| queryStatement.isLastQuery() || isGroupByLevel); |
| |
| for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) { |
| boolean hasAlias = resultColumn.hasAlias(); |
| List<Expression> resultExpressions = |
| ExpressionAnalyzer.removeWildcardInExpression(resultColumn.getExpression(), schemaTree); |
| if (hasAlias && !queryStatement.isGroupByLevel() && resultExpressions.size() > 1) { |
| throw new SemanticException( |
| String.format( |
| "alias '%s' can only be matched with one time series", resultColumn.getAlias())); |
| } |
| for (Expression expression : resultExpressions) { |
| if (paginationController.hasCurOffset()) { |
| paginationController.consumeOffset(); |
| continue; |
| } |
| if (paginationController.hasCurLimit()) { |
| if (isGroupByLevel) { |
| ExpressionAnalyzer.updateTypeProvider(expression, typeProvider); |
| expression.inferTypes(typeProvider); |
| outputExpressions.add(new Pair<>(expression, resultColumn.getAlias())); |
| if (resultColumn.getExpression() instanceof FunctionExpression) { |
| queryStatement |
| .getGroupByLevelComponent() |
| .updateIsCountStar((FunctionExpression) resultColumn.getExpression()); |
| } |
| } else { |
| Expression expressionWithoutAlias = |
| ExpressionAnalyzer.removeAliasFromExpression(expression); |
| String alias = |
| !Objects.equals(expressionWithoutAlias, expression) |
| ? expression.getExpressionString() |
| : null; |
| alias = hasAlias ? resultColumn.getAlias() : alias; |
| |
| ExpressionAnalyzer.updateTypeProvider(expressionWithoutAlias, typeProvider); |
| expressionWithoutAlias.inferTypes(typeProvider); |
| outputExpressions.add(new Pair<>(expressionWithoutAlias, alias)); |
| } |
| paginationController.consumeLimit(); |
| } else { |
| break; |
| } |
| } |
| } |
| return outputExpressions; |
| } |
| |
| private Set<PartialPath> analyzeFrom(QueryStatement queryStatement, ISchemaTree schemaTree) { |
| // device path patterns in FROM clause |
| List<PartialPath> devicePatternList = queryStatement.getFromComponent().getPrefixPaths(); |
| |
| Set<PartialPath> deviceList = new LinkedHashSet<>(); |
| for (PartialPath devicePattern : devicePatternList) { |
| // get all matched devices |
| deviceList.addAll( |
| schemaTree.getMatchedDevices(devicePattern).stream() |
| .map(DeviceSchemaInfo::getDevicePath) |
| .collect(Collectors.toList())); |
| } |
| return deviceList; |
| } |
| |
| private List<Pair<Expression, String>> analyzeSelect( |
| QueryStatement queryStatement, |
| ISchemaTree schemaTree, |
| Set<PartialPath> deviceList, |
| Map<String, Set<Expression>> deviceToTransformExpressions, |
| Map<String, Set<String>> deviceToMeasurementsMap) { |
| List<Pair<Expression, String>> outputExpressions = new ArrayList<>(); |
| ColumnPaginationController paginationController = |
| new ColumnPaginationController( |
| queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset(), false); |
| |
| for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) { |
| Expression selectExpression = resultColumn.getExpression(); |
| boolean hasAlias = resultColumn.hasAlias(); |
| |
| // measurement expression after removing wildcard |
| // use LinkedHashMap for order-preserving |
| Map<Expression, Map<String, Expression>> measurementToDeviceTransformExpressions = |
| new LinkedHashMap<>(); |
| for (PartialPath device : deviceList) { |
| List<Expression> transformExpressions = |
| ExpressionAnalyzer.concatDeviceAndRemoveWildcard( |
| selectExpression, device, schemaTree, typeProvider); |
| for (Expression transformExpression : transformExpressions) { |
| measurementToDeviceTransformExpressions |
| .computeIfAbsent( |
| ExpressionAnalyzer.getMeasurementExpression(transformExpression), |
| key -> new LinkedHashMap<>()) |
| .put( |
| device.getFullPath(), |
| ExpressionAnalyzer.removeAliasFromExpression(transformExpression)); |
| } |
| } |
| |
| if (hasAlias && measurementToDeviceTransformExpressions.keySet().size() > 1) { |
| throw new SemanticException( |
| String.format( |
| "alias '%s' can only be matched with one time series", resultColumn.getAlias())); |
| } |
| |
| for (Expression measurementExpression : measurementToDeviceTransformExpressions.keySet()) { |
| if (paginationController.hasCurOffset()) { |
| paginationController.consumeOffset(); |
| continue; |
| } |
| if (paginationController.hasCurLimit()) { |
| Map<String, Expression> deviceToTransformExpressionOfOneMeasurement = |
| measurementToDeviceTransformExpressions.get(measurementExpression); |
| deviceToTransformExpressionOfOneMeasurement |
| .values() |
| .forEach(expression -> expression.inferTypes(typeProvider)); |
| // check whether the datatype of paths which has the same measurement name are |
| // consistent |
| // if not, throw a SemanticException |
| checkDataTypeConsistencyInAlignByDevice( |
| new ArrayList<>(deviceToTransformExpressionOfOneMeasurement.values())); |
| |
| // add outputExpressions |
| Expression measurementExpressionWithoutAlias = |
| ExpressionAnalyzer.removeAliasFromExpression(measurementExpression); |
| String alias = |
| !Objects.equals(measurementExpressionWithoutAlias, measurementExpression) |
| ? measurementExpression.getExpressionString() |
| : null; |
| alias = hasAlias ? resultColumn.getAlias() : alias; |
| ExpressionAnalyzer.updateTypeProvider(measurementExpressionWithoutAlias, typeProvider); |
| measurementExpressionWithoutAlias.inferTypes(typeProvider); |
| outputExpressions.add(new Pair<>(measurementExpressionWithoutAlias, alias)); |
| |
| // add deviceToTransformExpressions |
| for (String deviceName : deviceToTransformExpressionOfOneMeasurement.keySet()) { |
| Expression transformExpression = |
| deviceToTransformExpressionOfOneMeasurement.get(deviceName); |
| ExpressionAnalyzer.updateTypeProvider(transformExpression, typeProvider); |
| transformExpression.inferTypes(typeProvider); |
| deviceToTransformExpressions |
| .computeIfAbsent(deviceName, key -> new LinkedHashSet<>()) |
| .add(ExpressionAnalyzer.removeAliasFromExpression(transformExpression)); |
| deviceToMeasurementsMap |
| .computeIfAbsent(deviceName, key -> new LinkedHashSet<>()) |
| .add(measurementExpressionWithoutAlias.getExpressionString()); |
| } |
| paginationController.consumeLimit(); |
| } else { |
| break; |
| } |
| } |
| } |
| |
| return outputExpressions; |
| } |
| |
| private Pair<Filter, Boolean> analyzeGlobalTimeFilter(QueryStatement queryStatement) { |
| Filter globalTimeFilter = null; |
| boolean hasValueFilter = false; |
| if (queryStatement.getWhereCondition() != null) { |
| Expression predicate = queryStatement.getWhereCondition().getPredicate(); |
| WhereCondition whereCondition = queryStatement.getWhereCondition(); |
| Pair<Filter, Boolean> resultPair = |
| ExpressionAnalyzer.transformToGlobalTimeFilter(predicate, true, true); |
| predicate = ExpressionAnalyzer.evaluatePredicate(predicate); |
| |
| // set where condition to null if predicate is true |
| if (predicate.getExpressionType().equals(ExpressionType.CONSTANT) |
| && Boolean.parseBoolean(predicate.getExpressionString())) { |
| queryStatement.setWhereCondition(null); |
| } else { |
| whereCondition.setPredicate(predicate); |
| } |
| globalTimeFilter = resultPair.left; |
| hasValueFilter = resultPair.right; |
| } |
| if (queryStatement.isGroupByTime()) { |
| GroupByTimeComponent groupByTimeComponent = queryStatement.getGroupByTimeComponent(); |
| Filter groupByFilter = initGroupByFilter(groupByTimeComponent); |
| if (globalTimeFilter == null) { |
| globalTimeFilter = groupByFilter; |
| } else { |
| // TODO: optimize the filter |
| globalTimeFilter = FilterFactory.and(globalTimeFilter, groupByFilter); |
| } |
| } |
| return new Pair<>(globalTimeFilter, hasValueFilter); |
| } |
| |
| private void updateSource( |
| Expression selectExpr, Set<Expression> sourceExpressions, boolean isRawDataSource) { |
| sourceExpressions.addAll( |
| ExpressionAnalyzer.searchSourceExpressions(selectExpr, isRawDataSource)); |
| } |
| |
| private boolean analyzeAggregation( |
| Set<Expression> transformExpressions, |
| Set<Expression> aggregationExpressions, |
| Set<Expression> aggregationTransformExpressions) { |
| // true if nested expressions and UDFs exist in aggregation function |
| // i.e. select sum(s1 + 1) from root.sg.d1 align by device |
| boolean isHasRawDataInputAggregation = false; |
| for (Expression expression : transformExpressions) { |
| for (Expression aggregationExpression : |
| ExpressionAnalyzer.searchAggregationExpressions(expression)) { |
| aggregationExpressions.add(aggregationExpression); |
| aggregationTransformExpressions.addAll(aggregationExpression.getExpressions()); |
| } |
| } |
| for (Expression aggregationTransformExpression : aggregationTransformExpressions) { |
| if (ExpressionAnalyzer.checkIsNeedTransform(aggregationTransformExpression)) { |
| isHasRawDataInputAggregation = true; |
| break; |
| } |
| } |
| return isHasRawDataInputAggregation; |
| } |
| |
| private boolean analyzeAggregationInHaving( |
| Set<Expression> expressions, |
| List<Expression> aggregationExpressionsInHaving, |
| Set<Expression> aggregationExpressions, |
| Set<Expression> aggregationTransformExpressions) { |
| // true if nested expressions and UDFs exist in aggregation function |
| // i.e. select sum(s1 + 1) from root.sg.d1 align by device |
| boolean isHasRawDataInputAggregation = false; |
| for (Expression expression : expressions) { |
| for (Expression aggregationExpression : |
| ExpressionAnalyzer.searchAggregationExpressions(expression)) { |
| aggregationExpressionsInHaving.add(aggregationExpression); |
| aggregationExpressions.add(aggregationExpression); |
| aggregationTransformExpressions.addAll(aggregationExpression.getExpressions()); |
| } |
| } |
| |
| for (Expression aggregationTransformExpression : aggregationTransformExpressions) { |
| if (ExpressionAnalyzer.checkIsNeedTransform(aggregationTransformExpression)) { |
| isHasRawDataInputAggregation = true; |
| break; |
| } |
| } |
| return isHasRawDataInputAggregation; |
| } |
| |
| private Expression analyzeWhere(QueryStatement queryStatement, ISchemaTree schemaTree) { |
| List<Expression> rewrittenPredicates = |
| ExpressionAnalyzer.removeWildcardInFilter( |
| queryStatement.getWhereCondition().getPredicate(), |
| queryStatement.getFromComponent().getPrefixPaths(), |
| schemaTree, |
| typeProvider, |
| true); |
| return ExpressionUtils.constructQueryFilter( |
| rewrittenPredicates.stream().distinct().collect(Collectors.toList())); |
| } |
| |
| private Expression analyzeWhereSplitByDevice( |
| QueryStatement queryStatement, PartialPath devicePath, ISchemaTree schemaTree) { |
| List<Expression> rewrittenPredicates = |
| ExpressionAnalyzer.removeWildcardInFilterByDevice( |
| queryStatement.getWhereCondition().getPredicate(), |
| devicePath, |
| schemaTree, |
| typeProvider, |
| true); |
| return ExpressionUtils.constructQueryFilter( |
| rewrittenPredicates.stream().distinct().collect(Collectors.toList())); |
| } |
| |
| private Expression analyzeHaving( |
| QueryStatement queryStatement, |
| Map<Expression, Set<Expression>> groupByLevelExpressions, |
| Set<Expression> transformExpressionsInHaving, |
| List<Expression> aggregationExpressionsInHaving) { |
| |
| if (queryStatement.isGroupByLevel()) { |
| Map<Expression, Expression> RawPathToGroupedPathMapInHaving = |
| analyzeGroupByLevelInHaving( |
| queryStatement, aggregationExpressionsInHaving, groupByLevelExpressions); |
| List<Expression> convertedPredicates = new ArrayList<>(); |
| for (Expression expression : transformExpressionsInHaving) { |
| convertedPredicates.add( |
| ExpressionAnalyzer.replaceRawPathWithGroupedPath( |
| expression, RawPathToGroupedPathMapInHaving)); |
| } |
| return ExpressionUtils.constructQueryFilter( |
| convertedPredicates.stream().distinct().collect(Collectors.toList())); |
| } |
| |
| return ExpressionUtils.constructQueryFilter( |
| transformExpressionsInHaving.stream().distinct().collect(Collectors.toList())); |
| } |
| |
| private Expression analyzeHavingSplitByDevice(Set<Expression> transformExpressionsInHaving) { |
| return ExpressionUtils.constructQueryFilter( |
| transformExpressionsInHaving.stream().distinct().collect(Collectors.toList())); |
| } |
| |
| private Map<Expression, Expression> analyzeGroupByLevelInHaving( |
| QueryStatement queryStatement, |
| List<Expression> inputExpressions, |
| Map<Expression, Set<Expression>> groupByLevelExpressions) { |
| GroupByLevelController groupByLevelController = |
| new GroupByLevelController( |
| queryStatement.getGroupByLevelComponent().getLevels(), typeProvider); |
| for (Expression inputExpression : inputExpressions) { |
| groupByLevelController.control(false, inputExpression, null); |
| } |
| Map<Expression, Set<Expression>> groupedPathMap = groupByLevelController.getGroupedPathMap(); |
| groupByLevelExpressions.putAll(groupedPathMap); |
| return groupByLevelController.getRawPathToGroupedPathMap(); |
| } |
| |
| private Map<Expression, Set<Expression>> analyzeGroupByLevel( |
| QueryStatement queryStatement, |
| List<Pair<Expression, String>> outputExpressions, |
| Set<Expression> transformExpressions, |
| Map<Expression, Expression> rawPathToGroupedPathMap) { |
| GroupByLevelController groupByLevelController = |
| new GroupByLevelController( |
| queryStatement.getGroupByLevelComponent().getLevels(), typeProvider); |
| for (int i = 0; i < outputExpressions.size(); i++) { |
| Pair<Expression, String> measurementWithAlias = outputExpressions.get(i); |
| boolean isCountStar = queryStatement.getGroupByLevelComponent().isCountStar(i); |
| groupByLevelController.control( |
| isCountStar, measurementWithAlias.left, measurementWithAlias.right); |
| } |
| Map<Expression, Set<Expression>> rawGroupByLevelExpressions = |
| groupByLevelController.getGroupedPathMap(); |
| rawPathToGroupedPathMap.putAll(groupByLevelController.getRawPathToGroupedPathMap()); |
| |
| Map<Expression, Set<Expression>> groupByLevelExpressions = new LinkedHashMap<>(); |
| outputExpressions.clear(); |
| ColumnPaginationController paginationController = |
| new ColumnPaginationController( |
| queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset(), false); |
| for (Expression groupedExpression : rawGroupByLevelExpressions.keySet()) { |
| if (paginationController.hasCurOffset()) { |
| paginationController.consumeOffset(); |
| continue; |
| } |
| if (paginationController.hasCurLimit()) { |
| Pair<Expression, String> outputExpression = |
| removeAliasFromExpression( |
| groupedExpression, |
| groupByLevelController.getAlias(groupedExpression.getExpressionString())); |
| Expression groupedExpressionWithoutAlias = outputExpression.left; |
| |
| Set<Expression> rawExpressions = rawGroupByLevelExpressions.get(groupedExpression); |
| rawExpressions.forEach( |
| expression -> ExpressionAnalyzer.updateTypeProvider(expression, typeProvider)); |
| rawExpressions.forEach(expression -> expression.inferTypes(typeProvider)); |
| |
| Set<Expression> rawExpressionsWithoutAlias = |
| rawExpressions.stream() |
| .map(ExpressionAnalyzer::removeAliasFromExpression) |
| .collect(Collectors.toSet()); |
| rawExpressionsWithoutAlias.forEach( |
| expression -> ExpressionAnalyzer.updateTypeProvider(expression, typeProvider)); |
| rawExpressionsWithoutAlias.forEach(expression -> expression.inferTypes(typeProvider)); |
| |
| groupByLevelExpressions.put(groupedExpressionWithoutAlias, rawExpressionsWithoutAlias); |
| |
| TSDataType dataType = |
| typeProvider.getType( |
| new ArrayList<>(groupByLevelExpressions.get(groupedExpressionWithoutAlias)) |
| .get(0) |
| .getExpressionString()); |
| typeProvider.setType(groupedExpression.getExpressionString(), dataType); |
| typeProvider.setType(groupedExpressionWithoutAlias.getExpressionString(), dataType); |
| outputExpressions.add(outputExpression); |
| paginationController.consumeLimit(); |
| } else { |
| break; |
| } |
| } |
| |
| // reset transformExpressions after applying SLIMIT/SOFFSET |
| transformExpressions.clear(); |
| transformExpressions.addAll( |
| groupByLevelExpressions.values().stream().flatMap(Set::stream).collect(Collectors.toSet())); |
| return groupByLevelExpressions; |
| } |
| |
| private Pair<Expression, String> removeAliasFromExpression( |
| Expression rawExpression, String rawAlias) { |
| Expression expressionWithoutAlias = ExpressionAnalyzer.removeAliasFromExpression(rawExpression); |
| String alias = |
| !Objects.equals(expressionWithoutAlias, rawExpression) |
| ? rawExpression.getExpressionString() |
| : null; |
| alias = rawAlias == null ? alias : rawAlias; |
| return new Pair<>(expressionWithoutAlias, alias); |
| } |
| |
| private DatasetHeader analyzeOutput( |
| QueryStatement queryStatement, List<Pair<Expression, String>> outputExpressions) { |
| boolean isIgnoreTimestamp = |
| queryStatement.isAggregationQuery() && !queryStatement.isGroupByTime(); |
| List<ColumnHeader> columnHeaders = new ArrayList<>(); |
| if (queryStatement.isAlignByDevice()) { |
| columnHeaders.add(new ColumnHeader(HeaderConstant.COLUMN_DEVICE, TSDataType.TEXT, null)); |
| typeProvider.setType(HeaderConstant.COLUMN_DEVICE, TSDataType.TEXT); |
| } |
| columnHeaders.addAll( |
| outputExpressions.stream() |
| .map( |
| expressionWithAlias -> { |
| String columnName = expressionWithAlias.left.getExpressionString(); |
| String alias = expressionWithAlias.right; |
| return new ColumnHeader(columnName, typeProvider.getType(columnName), alias); |
| }) |
| .collect(Collectors.toList())); |
| return new DatasetHeader(columnHeaders, isIgnoreTimestamp); |
| } |
| |
| private Analysis analyzeLast( |
| Analysis analysis, List<MeasurementPath> allSelectedPath, ISchemaTree schemaTree) { |
| Set<Expression> sourceExpressions; |
| List<SortItem> sortItemList = analysis.getMergeOrderParameter().getSortItemList(); |
| if (sortItemList.size() > 0) { |
| checkState( |
| sortItemList.size() == 1 && sortItemList.get(0).getSortKey() == SortKey.TIMESERIES, |
| "Last queries only support sorting by timeseries now."); |
| boolean isAscending = sortItemList.get(0).getOrdering() == Ordering.ASC; |
| sourceExpressions = |
| allSelectedPath.stream() |
| .map(TimeSeriesOperand::new) |
| .sorted( |
| (o1, o2) -> |
| isAscending |
| ? o1.getExpressionString().compareTo(o2.getExpressionString()) |
| : o2.getExpressionString().compareTo(o1.getExpressionString())) |
| .collect(Collectors.toCollection(LinkedHashSet::new)); |
| } else { |
| sourceExpressions = |
| allSelectedPath.stream() |
| .map(TimeSeriesOperand::new) |
| .collect(Collectors.toCollection(LinkedHashSet::new)); |
| } |
| |
| sourceExpressions.forEach( |
| expression -> ExpressionAnalyzer.updateTypeProvider(expression, typeProvider)); |
| analysis.setSourceExpressions(sourceExpressions); |
| |
| analysis.setRespDatasetHeader(HeaderConstant.LAST_QUERY_HEADER); |
| typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES, TSDataType.TEXT); |
| typeProvider.setType(HeaderConstant.COLUMN_VALUE, TSDataType.TEXT); |
| typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES_DATATYPE, TSDataType.TEXT); |
| |
| Set<String> deviceSet = |
| allSelectedPath.stream().map(MeasurementPath::getDevice).collect(Collectors.toSet()); |
| Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>(); |
| for (String devicePath : deviceSet) { |
| DataPartitionQueryParam queryParam = new DataPartitionQueryParam(); |
| queryParam.setDevicePath(devicePath); |
| sgNameToQueryParamsMap |
| .computeIfAbsent(schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>()) |
| .add(queryParam); |
| } |
| DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap); |
| analysis.setDataPartitionInfo(dataPartition); |
| |
| return analysis; |
| } |
| |
| private DataPartition fetchDataPartitionByDevices(Set<String> deviceSet, ISchemaTree schemaTree) { |
| Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>(); |
| for (String devicePath : deviceSet) { |
| DataPartitionQueryParam queryParam = new DataPartitionQueryParam(); |
| queryParam.setDevicePath(devicePath); |
| sgNameToQueryParamsMap |
| .computeIfAbsent(schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>()) |
| .add(queryParam); |
| } |
| return partitionFetcher.getDataPartition(sgNameToQueryParamsMap); |
| } |
| |
| private OrderByParameter analyzeOrderBy(QueryStatement queryStatement) { |
| return new OrderByParameter(queryStatement.getSortItemList()); |
| } |
| |
| /** |
| * Check datatype consistency in ALIGN BY DEVICE. |
| * |
| * <p>an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device, return |
| * false while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT. |
| */ |
| private void checkDataTypeConsistencyInAlignByDevice(List<Expression> expressions) { |
| TSDataType checkedDataType = typeProvider.getType(expressions.get(0).getExpressionString()); |
| for (Expression expression : expressions) { |
| if (typeProvider.getType(expression.getExpressionString()) != checkedDataType) { |
| throw new SemanticException( |
| "ALIGN BY DEVICE: the data types of the same measurement column should be the same across devices."); |
| } |
| } |
| } |
| |
| @Override |
| public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| insertStatement.semanticCheck(); |
| long[] timeArray = insertStatement.getTimes(); |
| PartialPath devicePath = insertStatement.getDevice(); |
| String[] measurementList = insertStatement.getMeasurementList(); |
| if (timeArray.length == 1) { |
| // construct insert row statement |
| InsertRowStatement insertRowStatement = new InsertRowStatement(); |
| insertRowStatement.setDevicePath(devicePath); |
| insertRowStatement.setTime(timeArray[0]); |
| insertRowStatement.setMeasurements(measurementList); |
| insertRowStatement.setDataTypes(new TSDataType[measurementList.length]); |
| Object[] values = new Object[measurementList.length]; |
| System.arraycopy(insertStatement.getValuesList().get(0), 0, values, 0, values.length); |
| insertRowStatement.setValues(values); |
| insertRowStatement.setNeedInferType(true); |
| insertRowStatement.setAligned(insertStatement.isAligned()); |
| return insertRowStatement.accept(this, context); |
| } else { |
| // construct insert rows statement |
| // construct insert statement |
| InsertRowsStatement insertRowsStatement = new InsertRowsStatement(); |
| List<InsertRowStatement> insertRowStatementList = new ArrayList<>(); |
| for (int i = 0; i < timeArray.length; i++) { |
| InsertRowStatement statement = new InsertRowStatement(); |
| statement.setDevicePath(devicePath); |
| String[] measurements = new String[measurementList.length]; |
| System.arraycopy(measurementList, 0, measurements, 0, measurements.length); |
| statement.setMeasurements(measurements); |
| statement.setTime(timeArray[i]); |
| statement.setDataTypes(new TSDataType[measurementList.length]); |
| Object[] values = new Object[measurementList.length]; |
| System.arraycopy(insertStatement.getValuesList().get(i), 0, values, 0, values.length); |
| statement.setValues(values); |
| statement.setAligned(insertStatement.isAligned()); |
| statement.setNeedInferType(true); |
| insertRowStatementList.add(statement); |
| } |
| insertRowsStatement.setInsertRowStatementList(insertRowStatementList); |
| return insertRowsStatement.accept(this, context); |
| } |
| } |
| |
| @Override |
| public Analysis visitCreateTimeseries( |
| CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| if (createTimeSeriesStatement.getTags() != null |
| && !createTimeSeriesStatement.getTags().isEmpty() |
| && createTimeSeriesStatement.getAttributes() != null |
| && !createTimeSeriesStatement.getAttributes().isEmpty()) { |
| for (String tagKey : createTimeSeriesStatement.getTags().keySet()) { |
| if (createTimeSeriesStatement.getAttributes().containsKey(tagKey)) { |
| throw new SemanticException( |
| String.format("Tag and attribute shouldn't have the same property key [%s]", tagKey)); |
| } |
| } |
| } |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(createTimeSeriesStatement); |
| |
| checkIsTemplateCompatible( |
| createTimeSeriesStatement.getPath(), createTimeSeriesStatement.getAlias()); |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| patternTree.appendFullPath(createTimeSeriesStatement.getPath()); |
| SchemaPartition schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition(patternTree); |
| analysis.setSchemaPartitionInfo(schemaPartitionInfo); |
| return analysis; |
| } |
| |
| private void checkIsTemplateCompatible(PartialPath timeseriesPath, String alias) { |
| Pair<Template, PartialPath> templateInfo = schemaFetcher.checkTemplateSetInfo(timeseriesPath); |
| if (templateInfo != null) { |
| if (templateInfo.left.hasSchema(timeseriesPath.getMeasurement())) { |
| throw new RuntimeException( |
| new TemplateImcompatibeException( |
| timeseriesPath.getFullPath(), |
| templateInfo.left.getName(), |
| timeseriesPath.getMeasurement())); |
| } |
| |
| if (alias != null && templateInfo.left.hasSchema(alias)) { |
| throw new RuntimeException( |
| new TemplateImcompatibeException( |
| timeseriesPath.getDevicePath().concatNode(alias).getFullPath(), |
| templateInfo.left.getName(), |
| alias)); |
| } |
| } |
| } |
| |
| private void checkIsTemplateCompatible( |
| PartialPath devicePath, List<String> measurements, List<String> aliasList) { |
| Pair<Template, PartialPath> templateInfo = schemaFetcher.checkTemplateSetInfo(devicePath); |
| if (templateInfo != null) { |
| Template template = templateInfo.left; |
| for (String measurement : measurements) { |
| if (template.hasSchema(measurement)) { |
| throw new RuntimeException( |
| new TemplateImcompatibeException( |
| devicePath.concatNode(measurement).getFullPath(), |
| templateInfo.left.getName(), |
| measurement)); |
| } |
| } |
| |
| if (aliasList == null) { |
| return; |
| } |
| |
| for (String alias : aliasList) { |
| if (template.hasSchema(alias)) { |
| throw new RuntimeException( |
| new TemplateImcompatibeException( |
| devicePath.concatNode(alias).getFullPath(), templateInfo.left.getName(), alias)); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public Analysis visitCreateAlignedTimeseries( |
| CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| List<String> measurements = createAlignedTimeSeriesStatement.getMeasurements(); |
| Set<String> measurementsSet = new HashSet<>(measurements); |
| if (measurementsSet.size() < measurements.size()) { |
| throw new SemanticException( |
| "Measurement under an aligned device is not allowed to have the same measurement name"); |
| } |
| |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(createAlignedTimeSeriesStatement); |
| |
| checkIsTemplateCompatible( |
| createAlignedTimeSeriesStatement.getDevicePath(), |
| createAlignedTimeSeriesStatement.getMeasurements(), |
| createAlignedTimeSeriesStatement.getAliasList()); |
| |
| PathPatternTree pathPatternTree = new PathPatternTree(); |
| for (String measurement : createAlignedTimeSeriesStatement.getMeasurements()) { |
| pathPatternTree.appendFullPath(createAlignedTimeSeriesStatement.getDevicePath(), measurement); |
| } |
| |
| SchemaPartition schemaPartitionInfo; |
| schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition(pathPatternTree); |
| analysis.setSchemaPartitionInfo(schemaPartitionInfo); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitInternalCreateTimeseries( |
| InternalCreateTimeSeriesStatement internalCreateTimeSeriesStatement, |
| MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(internalCreateTimeSeriesStatement); |
| |
| checkIsTemplateCompatible( |
| internalCreateTimeSeriesStatement.getDevicePath(), |
| internalCreateTimeSeriesStatement.getMeasurements(), |
| null); |
| |
| PathPatternTree pathPatternTree = new PathPatternTree(); |
| for (String measurement : internalCreateTimeSeriesStatement.getMeasurements()) { |
| pathPatternTree.appendFullPath( |
| internalCreateTimeSeriesStatement.getDevicePath(), measurement); |
| } |
| |
| SchemaPartition schemaPartitionInfo; |
| schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition(pathPatternTree); |
| analysis.setSchemaPartitionInfo(schemaPartitionInfo); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitCreateMultiTimeseries( |
| CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(createMultiTimeSeriesStatement); |
| |
| List<PartialPath> timeseriesPathList = createMultiTimeSeriesStatement.getPaths(); |
| List<String> aliasList = createMultiTimeSeriesStatement.getAliasList(); |
| for (int i = 0; i < timeseriesPathList.size(); i++) { |
| checkIsTemplateCompatible( |
| timeseriesPathList.get(i), aliasList == null ? null : aliasList.get(i)); |
| } |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| for (PartialPath path : createMultiTimeSeriesStatement.getPaths()) { |
| patternTree.appendFullPath(path); |
| } |
| SchemaPartition schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition(patternTree); |
| analysis.setSchemaPartitionInfo(schemaPartitionInfo); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitAlterTimeseries( |
| AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(alterTimeSeriesStatement); |
| |
| if (alterTimeSeriesStatement.getAlias() != null) { |
| checkIsTemplateCompatible( |
| alterTimeSeriesStatement.getPath(), alterTimeSeriesStatement.getAlias()); |
| } |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| patternTree.appendFullPath(alterTimeSeriesStatement.getPath()); |
| SchemaPartition schemaPartitionInfo; |
| schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree); |
| analysis.setSchemaPartitionInfo(schemaPartitionInfo); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitInsertTablet( |
| InsertTabletStatement insertTabletStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| |
| DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); |
| dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath()); |
| dataPartitionQueryParam.setTimePartitionSlotList(insertTabletStatement.getTimePartitionSlots()); |
| |
| DataPartition dataPartition = |
| partitionFetcher.getOrCreateDataPartition( |
| Collections.singletonList(dataPartitionQueryParam)); |
| |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(insertTabletStatement); |
| analysis.setDataPartitionInfo(dataPartition); |
| |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| |
| DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); |
| dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath()); |
| dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots()); |
| |
| DataPartition dataPartition = |
| partitionFetcher.getOrCreateDataPartition( |
| Collections.singletonList(dataPartitionQueryParam)); |
| |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(insertRowStatement); |
| analysis.setDataPartitionInfo(dataPartition); |
| |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitInsertRows( |
| InsertRowsStatement insertRowsStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| |
| List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>(); |
| for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) { |
| DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); |
| dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath()); |
| dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots()); |
| dataPartitionQueryParams.add(dataPartitionQueryParam); |
| } |
| |
| DataPartition dataPartition = |
| partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams); |
| |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(insertRowsStatement); |
| analysis.setDataPartitionInfo(dataPartition); |
| |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitInsertMultiTablets( |
| InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| |
| List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>(); |
| for (InsertTabletStatement insertTabletStatement : |
| insertMultiTabletsStatement.getInsertTabletStatementList()) { |
| DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); |
| dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath()); |
| dataPartitionQueryParam.setTimePartitionSlotList( |
| insertTabletStatement.getTimePartitionSlots()); |
| dataPartitionQueryParams.add(dataPartitionQueryParam); |
| } |
| |
| DataPartition dataPartition = |
| partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams); |
| |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(insertMultiTabletsStatement); |
| analysis.setDataPartitionInfo(dataPartition); |
| |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitInsertRowsOfOneDevice( |
| InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| |
| DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); |
| dataPartitionQueryParam.setDevicePath( |
| insertRowsOfOneDeviceStatement.getDevicePath().getFullPath()); |
| dataPartitionQueryParam.setTimePartitionSlotList( |
| insertRowsOfOneDeviceStatement.getTimePartitionSlots()); |
| |
| DataPartition dataPartition = |
| partitionFetcher.getOrCreateDataPartition( |
| Collections.singletonList(dataPartitionQueryParam)); |
| |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(insertRowsOfOneDeviceStatement); |
| analysis.setDataPartitionInfo(dataPartition); |
| |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitShowTimeSeries( |
| ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(showTimeSeriesStatement); |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| patternTree.appendPathPattern(showTimeSeriesStatement.getPathPattern()); |
| SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree); |
| analysis.setSchemaPartitionInfo(schemaPartitionInfo); |
| |
| Map<Integer, Template> templateMap = |
| schemaFetcher.checkAllRelatedTemplate(showTimeSeriesStatement.getPathPattern()); |
| analysis.setRelatedTemplateInfo(templateMap); |
| |
| if (showTimeSeriesStatement.isOrderByHeat()) { |
| patternTree.constructTree(); |
| // request schema fetch API |
| logger.info("{} fetch query schema...", getLogHeader()); |
| ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); |
| logger.info("{} fetch schema done", getLogHeader()); |
| List<MeasurementPath> allSelectedPath = schemaTree.getAllMeasurement(); |
| |
| Set<Expression> sourceExpressions = |
| allSelectedPath.stream() |
| .map(TimeSeriesOperand::new) |
| .collect(Collectors.toCollection(LinkedHashSet::new)); |
| analysis.setSourceExpressions(sourceExpressions); |
| |
| Set<String> deviceSet = |
| allSelectedPath.stream().map(MeasurementPath::getDevice).collect(Collectors.toSet()); |
| Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>(); |
| for (String devicePath : deviceSet) { |
| DataPartitionQueryParam queryParam = new DataPartitionQueryParam(); |
| queryParam.setDevicePath(devicePath); |
| sgNameToQueryParamsMap |
| .computeIfAbsent( |
| schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>()) |
| .add(queryParam); |
| } |
| DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap); |
| analysis.setDataPartitionInfo(dataPartition); |
| } |
| |
| analysis.setRespDatasetHeader(HeaderConstant.showTimeSeriesHeader); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitShowStorageGroup( |
| ShowStorageGroupStatement showStorageGroupStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(showStorageGroupStatement); |
| analysis.setRespDatasetHeader(HeaderConstant.showStorageGroupHeader); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitShowTTL(ShowTTLStatement showTTLStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(showTTLStatement); |
| analysis.setRespDatasetHeader(HeaderConstant.showTTLHeader); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitShowDevices( |
| ShowDevicesStatement showDevicesStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(showDevicesStatement); |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| patternTree.appendPathPattern( |
| showDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)); |
| SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree); |
| |
| analysis.setSchemaPartitionInfo(schemaPartitionInfo); |
| analysis.setRespDatasetHeader( |
| showDevicesStatement.hasSgCol() |
| ? HeaderConstant.showDevicesWithSgHeader |
| : HeaderConstant.showDevicesHeader); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitShowCluster( |
| ShowClusterStatement showClusterStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(showClusterStatement); |
| analysis.setRespDatasetHeader(HeaderConstant.showClusterHeader); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitCountStorageGroup( |
| CountStorageGroupStatement countStorageGroupStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(countStorageGroupStatement); |
| analysis.setRespDatasetHeader(HeaderConstant.countStorageGroupHeader); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitSchemaFetch( |
| SchemaFetchStatement schemaFetchStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(schemaFetchStatement); |
| |
| SchemaPartition schemaPartition = |
| partitionFetcher.getSchemaPartition(schemaFetchStatement.getPatternTree()); |
| analysis.setSchemaPartitionInfo(schemaPartition); |
| |
| if (schemaPartition.isEmpty()) { |
| analysis.setFinishQueryAfterAnalyze(true); |
| } |
| |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitCountDevices( |
| CountDevicesStatement countDevicesStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(countDevicesStatement); |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| patternTree.appendPathPattern( |
| countDevicesStatement.getPartialPath().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)); |
| SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree); |
| |
| analysis.setSchemaPartitionInfo(schemaPartitionInfo); |
| analysis.setRespDatasetHeader(HeaderConstant.countDevicesHeader); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitCountTimeSeries( |
| CountTimeSeriesStatement countTimeSeriesStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(countTimeSeriesStatement); |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| patternTree.appendPathPattern(countTimeSeriesStatement.getPartialPath()); |
| SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree); |
| |
| analysis.setSchemaPartitionInfo(schemaPartitionInfo); |
| analysis.setRespDatasetHeader(HeaderConstant.countTimeSeriesHeader); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitCountLevelTimeSeries( |
| CountLevelTimeSeriesStatement countLevelTimeSeriesStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(countLevelTimeSeriesStatement); |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| patternTree.appendPathPattern(countLevelTimeSeriesStatement.getPartialPath()); |
| SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree); |
| |
| analysis.setSchemaPartitionInfo(schemaPartitionInfo); |
| analysis.setRespDatasetHeader(HeaderConstant.countLevelTimeSeriesHeader); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitCountNodes(CountNodesStatement countStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(countStatement); |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| patternTree.appendPathPattern(countStatement.getPartialPath()); |
| SchemaNodeManagementPartition schemaNodeManagementPartition = |
| partitionFetcher.getSchemaNodeManagementPartitionWithLevel( |
| patternTree, countStatement.getLevel()); |
| |
| if (schemaNodeManagementPartition == null) { |
| return analysis; |
| } |
| if (!schemaNodeManagementPartition.getMatchedNode().isEmpty() |
| && schemaNodeManagementPartition.getSchemaPartition().getSchemaPartitionMap().size() == 0) { |
| analysis.setFinishQueryAfterAnalyze(true); |
| } |
| analysis.setMatchedNodes(schemaNodeManagementPartition.getMatchedNode()); |
| analysis.setSchemaPartitionInfo(schemaNodeManagementPartition.getSchemaPartition()); |
| analysis.setRespDatasetHeader(HeaderConstant.countNodesHeader); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitShowChildPaths( |
| ShowChildPathsStatement showChildPathsStatement, MPPQueryContext context) { |
| return visitSchemaNodeManagementPartition( |
| showChildPathsStatement, |
| showChildPathsStatement.getPartialPath(), |
| HeaderConstant.showChildPathsHeader); |
| } |
| |
| @Override |
| public Analysis visitShowChildNodes( |
| ShowChildNodesStatement showChildNodesStatement, MPPQueryContext context) { |
| return visitSchemaNodeManagementPartition( |
| showChildNodesStatement, |
| showChildNodesStatement.getPartialPath(), |
| HeaderConstant.showChildNodesHeader); |
| } |
| |
| @Override |
| public Analysis visitShowVersion( |
| ShowVersionStatement showVersionStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(showVersionStatement); |
| analysis.setRespDatasetHeader(HeaderConstant.showVersionHeader); |
| analysis.setFinishQueryAfterAnalyze(true); |
| return analysis; |
| } |
| |
| private Analysis visitSchemaNodeManagementPartition( |
| Statement statement, PartialPath path, DatasetHeader header) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(statement); |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| patternTree.appendPathPattern(path); |
| SchemaNodeManagementPartition schemaNodeManagementPartition = |
| partitionFetcher.getSchemaNodeManagementPartition(patternTree); |
| |
| if (schemaNodeManagementPartition == null) { |
| return analysis; |
| } |
| if (!schemaNodeManagementPartition.getMatchedNode().isEmpty() |
| && schemaNodeManagementPartition.getSchemaPartition().getSchemaPartitionMap().size() == 0) { |
| analysis.setFinishQueryAfterAnalyze(true); |
| } |
| analysis.setMatchedNodes(schemaNodeManagementPartition.getMatchedNode()); |
| analysis.setSchemaPartitionInfo(schemaNodeManagementPartition.getSchemaPartition()); |
| analysis.setRespDatasetHeader(header); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitDeleteData( |
| DeleteDataStatement deleteDataStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(deleteDataStatement); |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| for (PartialPath pathPattern : deleteDataStatement.getPathList()) { |
| patternTree.appendPathPattern(pathPattern); |
| } |
| |
| ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); |
| analysis.setSchemaTree(schemaTree); |
| |
| Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>(); |
| |
| schemaTree |
| .getMatchedDevices(new PartialPath(ALL_RESULT_NODES)) |
| .forEach( |
| deviceSchemaInfo -> { |
| PartialPath devicePath = deviceSchemaInfo.getDevicePath(); |
| DataPartitionQueryParam queryParam = new DataPartitionQueryParam(); |
| queryParam.setDevicePath(devicePath.getFullPath()); |
| sgNameToQueryParamsMap |
| .computeIfAbsent( |
| schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>()) |
| .add(queryParam); |
| }); |
| |
| DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap); |
| analysis.setDataPartitionInfo(dataPartition); |
| |
| if (dataPartition.isEmpty()) { |
| analysis.setFinishQueryAfterAnalyze(true); |
| } |
| |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitCreateSchemaTemplate( |
| CreateSchemaTemplateStatement createTemplateStatement, MPPQueryContext context) { |
| |
| context.setQueryType(QueryType.WRITE); |
| List<List<String>> measurementsList = createTemplateStatement.getMeasurements(); |
| for (List measurements : measurementsList) { |
| Set<String> measurementsSet = new HashSet<>(measurements); |
| if (measurementsSet.size() < measurements.size()) { |
| throw new SemanticException( |
| "Measurement under an aligned device is not allowed to have the same measurement name"); |
| } |
| } |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(createTemplateStatement); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitShowNodesInSchemaTemplate( |
| ShowNodesInSchemaTemplateStatement showNodesInSchemaTemplateStatement, |
| MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(showNodesInSchemaTemplateStatement); |
| analysis.setRespDatasetHeader(HeaderConstant.showNodesInSchemaTemplate); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitShowSchemaTemplate( |
| ShowSchemaTemplateStatement showSchemaTemplateStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(showSchemaTemplateStatement); |
| analysis.setRespDatasetHeader(HeaderConstant.showSchemaTemplate); |
| return analysis; |
| } |
| |
| private GroupByFilter initGroupByFilter(GroupByTimeComponent groupByTimeComponent) { |
| if (groupByTimeComponent.isIntervalByMonth() || groupByTimeComponent.isSlidingStepByMonth()) { |
| return new GroupByMonthFilter( |
| groupByTimeComponent.getInterval(), |
| groupByTimeComponent.getSlidingStep(), |
| groupByTimeComponent.getStartTime(), |
| groupByTimeComponent.getEndTime(), |
| groupByTimeComponent.isSlidingStepByMonth(), |
| groupByTimeComponent.isIntervalByMonth(), |
| TimeZone.getTimeZone("+00:00")); |
| } else { |
| return new GroupByFilter( |
| groupByTimeComponent.getInterval(), |
| groupByTimeComponent.getSlidingStep(), |
| groupByTimeComponent.getStartTime(), |
| groupByTimeComponent.getEndTime()); |
| } |
| } |
| |
| @Override |
| public Analysis visitSetSchemaTemplate( |
| SetSchemaTemplateStatement setSchemaTemplateStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(setSchemaTemplateStatement); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitShowPathSetTemplate( |
| ShowPathSetTemplateStatement showPathSetTemplateStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(showPathSetTemplateStatement); |
| analysis.setRespDatasetHeader(HeaderConstant.showPathSetTemplate); |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitActivateTemplate( |
| ActivateTemplateStatement activateTemplateStatement, MPPQueryContext context) { |
| context.setQueryType(QueryType.WRITE); |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(activateTemplateStatement); |
| |
| PartialPath activatePath = activateTemplateStatement.getPath(); |
| |
| Pair<Template, PartialPath> templateSetInfo = schemaFetcher.checkTemplateSetInfo(activatePath); |
| if (templateSetInfo == null) { |
| throw new StatementAnalyzeException( |
| new MetadataException( |
| String.format( |
| "Path [%s] has not been set any template.", activatePath.getFullPath()))); |
| } |
| analysis.setTemplateSetInfo( |
| new Pair<>(templateSetInfo.left, Collections.singletonList(templateSetInfo.right))); |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| patternTree.appendPathPattern(activatePath.concatNode(ONE_LEVEL_PATH_WILDCARD)); |
| SchemaPartition partition = partitionFetcher.getOrCreateSchemaPartition(patternTree); |
| |
| analysis.setSchemaPartitionInfo(partition); |
| |
| return analysis; |
| } |
| |
| @Override |
| public Analysis visitShowPathsUsingTemplate( |
| ShowPathsUsingTemplateStatement showPathsUsingTemplateStatement, MPPQueryContext context) { |
| Analysis analysis = new Analysis(); |
| analysis.setStatement(showPathsUsingTemplateStatement); |
| analysis.setRespDatasetHeader(HeaderConstant.showPathsUsingTemplate); |
| |
| Pair<Template, List<PartialPath>> templateSetInfo = |
| schemaFetcher.getAllPathsSetTemplate(showPathsUsingTemplateStatement.getTemplateName()); |
| analysis.setTemplateSetInfo(templateSetInfo); |
| if (templateSetInfo == null |
| || templateSetInfo.right == null |
| || templateSetInfo.right.isEmpty()) { |
| analysis.setFinishQueryAfterAnalyze(true); |
| return analysis; |
| } |
| |
| PathPatternTree patternTree = new PathPatternTree(); |
| templateSetInfo.right.forEach( |
| path -> { |
| patternTree.appendPathPattern(path); |
| patternTree.appendPathPattern(path.concatNode(MULTI_LEVEL_PATH_WILDCARD)); |
| }); |
| |
| SchemaPartition partition = partitionFetcher.getOrCreateSchemaPartition(patternTree); |
| analysis.setSchemaPartitionInfo(partition); |
| if (partition.isEmpty()) { |
| analysis.setFinishQueryAfterAnalyze(true); |
| return analysis; |
| } |
| |
| return analysis; |
| } |
| } |