| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.qp.logical.crud; |
| |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.path.MeasurementPath; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.exception.query.LogicalOperatorException; |
| import org.apache.iotdb.db.exception.query.LogicalOptimizeException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.index.common.IndexType; |
| import org.apache.iotdb.db.mpp.plan.expression.Expression; |
| import org.apache.iotdb.db.mpp.plan.expression.ResultColumn; |
| import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand; |
| import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression; |
| import org.apache.iotdb.db.qp.constant.SQLConstant; |
| import org.apache.iotdb.db.qp.logical.Operator; |
| import org.apache.iotdb.db.qp.physical.PhysicalPlan; |
| import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; |
| import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo; |
| import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan; |
| import org.apache.iotdb.db.qp.physical.crud.QueryPlan; |
| import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; |
| import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; |
| import org.apache.iotdb.db.service.IoTDB; |
| import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.read.expression.IExpression; |
| import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import static org.apache.iotdb.db.utils.SchemaUtils.getSeriesTypeByPath; |
| |
| public class QueryOperator extends Operator { |
| |
| protected SelectComponent selectComponent; |
| protected FromComponent fromComponent; |
| protected WhereComponent whereComponent; |
| protected SpecialClauseComponent specialClauseComponent; |
| |
| protected Map<String, Object> props; |
| protected IndexType indexType; |
| |
| protected boolean enableTracing; |
| |
| Set<String> aliasSet; |
| |
| public QueryOperator() { |
| super(SQLConstant.TOK_QUERY); |
| operatorType = Operator.OperatorType.QUERY; |
| } |
| |
| public QueryOperator(QueryOperator queryOperator) { |
| this(); |
| this.selectComponent = queryOperator.getSelectComponent(); |
| this.fromComponent = queryOperator.getFromComponent(); |
| this.whereComponent = queryOperator.getWhereComponent(); |
| this.specialClauseComponent = queryOperator.getSpecialClauseComponent(); |
| this.props = queryOperator.getProps(); |
| this.indexType = queryOperator.getIndexType(); |
| this.enableTracing = queryOperator.isEnableTracing(); |
| } |
| |
| public void setAliasSet(Set<String> aliasSet) { |
| this.aliasSet = aliasSet; |
| } |
| |
| public Set<String> getAliasSet() { |
| return aliasSet; |
| } |
| |
| public SelectComponent getSelectComponent() { |
| return selectComponent; |
| } |
| |
| public void setSelectComponent(SelectComponent selectComponent) { |
| this.selectComponent = selectComponent; |
| } |
| |
| public FromComponent getFromComponent() { |
| return fromComponent; |
| } |
| |
| public void setFromComponent(FromComponent fromComponent) { |
| this.fromComponent = fromComponent; |
| } |
| |
| public WhereComponent getWhereComponent() { |
| return whereComponent; |
| } |
| |
| public void setWhereComponent(WhereComponent whereComponent) { |
| this.whereComponent = whereComponent; |
| } |
| |
| public void setSpecialClauseComponent(SpecialClauseComponent specialClauseComponent) { |
| this.specialClauseComponent = specialClauseComponent; |
| } |
| |
| public SpecialClauseComponent getSpecialClauseComponent() { |
| return specialClauseComponent; |
| } |
| |
| public Map<String, Object> getProps() { |
| return props; |
| } |
| |
| public void setProps(Map<String, Object> props) { |
| this.props = props; |
| } |
| |
| public IndexType getIndexType() { |
| return indexType; |
| } |
| |
| public void setIndexType(IndexType indexType) { |
| this.indexType = indexType; |
| } |
| |
| public boolean hasAggregationFunction() { |
| return selectComponent.hasPlainAggregationFunction(); |
| } |
| |
| public boolean hasTimeSeriesGeneratingFunction() { |
| return selectComponent.hasTimeSeriesGeneratingFunction(); |
| } |
| |
| public boolean isAlignByDevice() { |
| return specialClauseComponent != null && specialClauseComponent.isAlignByDevice(); |
| } |
| |
| public boolean isAlignByTime() { |
| return specialClauseComponent == null || specialClauseComponent.isAlignByTime(); |
| } |
| |
| public boolean isGroupByLevel() { |
| return specialClauseComponent != null && specialClauseComponent.getLevels() != null; |
| } |
| |
| public int[] getLevels() { |
| return specialClauseComponent.getLevels(); |
| } |
| |
| public boolean hasSlimit() { |
| return specialClauseComponent != null && specialClauseComponent.hasSlimit(); |
| } |
| |
| public boolean hasSoffset() { |
| return specialClauseComponent != null && specialClauseComponent.hasSoffset(); |
| } |
| |
| /** Reset sLimit and sOffset to zero. */ |
| public void resetSLimitOffset() { |
| if (specialClauseComponent != null) { |
| specialClauseComponent.setSeriesLimit(0); |
| specialClauseComponent.setSeriesOffset(0); |
| } |
| } |
| |
| public void check() throws LogicalOperatorException { |
| if (isAlignByDevice()) { |
| if (selectComponent.hasTimeSeriesGeneratingFunction()) { |
| throw new LogicalOperatorException( |
| "ALIGN BY DEVICE clause is not supported in UDF queries."); |
| } |
| |
| for (PartialPath path : selectComponent.getPaths()) { |
| if (path.getNodes().length > 1) { |
| throw new LogicalOperatorException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator) |
| throws QueryProcessException { |
| QueryPlan queryPlan = indexType == null ? new RawDataQueryPlan() : new QueryIndexPlan(); |
| return isAlignByDevice() |
| ? this.generateAlignByDevicePlan(generator) |
| : this.generateRawDataQueryPlan(generator, queryPlan); |
| } |
| |
| protected QueryPlan generateRawDataQueryPlan(PhysicalGenerator generator, QueryPlan queryPlan) |
| throws QueryProcessException { |
| RawDataQueryPlan rawDataQueryPlan = (RawDataQueryPlan) queryPlan; |
| rawDataQueryPlan.setPaths(selectComponent.getPaths()); |
| rawDataQueryPlan.setResultColumns(selectComponent.getResultColumns()); |
| rawDataQueryPlan.setEnableTracing(enableTracing); |
| |
| if (queryPlan instanceof QueryIndexPlan) { |
| ((QueryIndexPlan) queryPlan).setIndexType(indexType); |
| ((QueryIndexPlan) queryPlan).setProps(props); |
| return queryPlan; |
| } |
| |
| try { |
| queryPlan.deduplicate(generator); |
| } catch (MetadataException e) { |
| throw new QueryProcessException(e); |
| } |
| |
| rawDataQueryPlan.convertSpecialClauseValues(specialClauseComponent); |
| |
| // transform filter operator to expression |
| IExpression expression = transformFilterOperatorToExpression(); |
| expression = optimizeExpression(expression, (RawDataQueryPlan) queryPlan); |
| if (expression != null) { |
| ((RawDataQueryPlan) queryPlan).setExpression(expression); |
| } |
| |
| return rawDataQueryPlan; |
| } |
| |
| protected IExpression transformFilterOperatorToExpression() throws QueryProcessException { |
| if (whereComponent == null) { |
| return null; |
| } |
| FilterOperator filterOperator = whereComponent.getFilterOperator(); |
| List<PartialPath> filterPaths = new ArrayList<>(filterOperator.getPathSet()); |
| HashMap<PartialPath, TSDataType> pathTSDataTypeHashMap = new HashMap<>(); |
| for (PartialPath filterPath : filterPaths) { |
| pathTSDataTypeHashMap.put( |
| filterPath, |
| SQLConstant.isReservedPath(filterPath) ? TSDataType.INT64 : filterPath.getSeriesType()); |
| } |
| return filterOperator.transformToExpression(pathTSDataTypeHashMap); |
| } |
| |
| protected IExpression optimizeExpression(IExpression expression, RawDataQueryPlan queryPlan) |
| throws QueryProcessException { |
| try { |
| return expression == null |
| ? null |
| : ExpressionOptimizer.getInstance() |
| .optimize(expression, new ArrayList<>(queryPlan.getDeduplicatedPaths())); |
| } catch (QueryFilterOptimizationException e) { |
| throw new QueryProcessException(e.getMessage()); |
| } |
| } |
| |
| protected AlignByDevicePlan generateAlignByDevicePlan(PhysicalGenerator generator) |
| throws QueryProcessException { |
| AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan(); |
| |
| // remove stars in fromPaths and get deviceId with deduplication |
| List<PartialPath> devices = removeStarsInDeviceWithUnique(fromComponent.getPrefixPaths()); |
| List<ResultColumn> resultColumns = selectComponent.getResultColumns(); |
| List<String> aggregationFuncs = selectComponent.getAggregationFunctions(); |
| // to record result measurement columns |
| List<String> measurements = new ArrayList<>(); |
| Map<String, MeasurementInfo> measurementInfoMap = new HashMap<>(); |
| List<PartialPath> paths = new ArrayList<>(); |
| List<String> aggregations = new ArrayList<>(); |
| |
| // per suffix in SELECT |
| for (int i = 0; i < resultColumns.size(); i++) { |
| ResultColumn resultColumn = resultColumns.get(i); |
| PartialPath suffixPath = getSuffixPathFromExpression(resultColumn.getExpression()); |
| String aggregation = aggregationFuncs != null ? aggregationFuncs.get(i) : null; |
| // to record measurements in the loop of a suffix path |
| Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>(); |
| |
| // concat suffix with per device |
| for (PartialPath device : devices) { |
| PartialPath fullPath = device.concatPath(suffixPath); |
| try { |
| // remove stars in SELECT to get actual paths |
| List<MeasurementPath> actualPaths = getMatchedTimeseries(fullPath); |
| if (resultColumn.hasAlias() && actualPaths.size() >= 2) { |
| throw new QueryProcessException( |
| String.format(AlignByDevicePlan.ALIAS_ERROR_MESSAGE, resultColumn.getAlias())); |
| } |
| for (MeasurementPath path : actualPaths) { |
| MeasurementInfo measurementInfo = |
| new MeasurementInfo(getMeasurementName(path, aggregation)); |
| TSDataType columnDataType = getSeriesTypeByPath(path, aggregation); |
| if (aggregation != null) { |
| aggregations.add(aggregation); |
| } |
| checkDataTypeConsistency( |
| columnDataType, measurementInfoMap.get(measurementInfo.getMeasurement())); |
| |
| if (!measurementInfoMap.containsKey(measurementInfo.getMeasurement())) { |
| measurementInfo.setMeasurementAlias( |
| resultColumn.hasAlias() ? resultColumn.getAlias() : null); |
| measurementInfo.setColumnDataType(columnDataType); |
| measurementInfoMap.put(measurementInfo.getMeasurement(), measurementInfo); |
| } |
| measurementSetOfGivenSuffix.add(measurementInfo.getMeasurement()); |
| paths.add(path); |
| } |
| } catch (MetadataException | QueryProcessException e) { |
| throw new QueryProcessException(e.getMessage()); |
| } |
| } |
| |
| if (measurementSetOfGivenSuffix.isEmpty()) { |
| measurements.add(suffixPath.toString()); |
| } else { |
| // Note that in the loop of a suffix path, set is used. |
| // And across the loops of suffix paths, list is used. |
| // e.g. select *,s1 from root.sg.d0, root.sg.d1 |
| // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3} |
| // for suffix s1, measurementSetOfGivenSuffix = {s1} |
| // therefore the final measurements is [s1,s2,s3,s1]. |
| measurements.addAll(measurementSetOfGivenSuffix); |
| } |
| |
| if (specialClauseComponent.hasSlimit() |
| && measurements.size() |
| >= specialClauseComponent.getSeriesLimit() |
| + specialClauseComponent.getSeriesOffset()) { |
| break; |
| } |
| } |
| |
| // assigns to alignByDevicePlan |
| alignByDevicePlan.setMeasurements(convertSpecialClauseValues(alignByDevicePlan, measurements)); |
| alignByDevicePlan.setPaths(paths); |
| alignByDevicePlan.setAggregations(aggregations); |
| alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap); |
| alignByDevicePlan.setEnableTracing(enableTracing); |
| |
| alignByDevicePlan.deduplicate(generator); |
| if (specialClauseComponent != null) { |
| alignByDevicePlan.calcWithoutNullColumnIndex(specialClauseComponent.withoutNullColumns); |
| } |
| |
| if (whereComponent != null) { |
| alignByDevicePlan.setDeviceToFilterMap( |
| concatFilterByDevice(alignByDevicePlan, devices, whereComponent.getFilterOperator())); |
| } |
| |
| return alignByDevicePlan; |
| } |
| |
| private void checkDataTypeConsistency(TSDataType checkedDataType, MeasurementInfo measurementInfo) |
| throws QueryProcessException { |
| // check datatype consistency |
| // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device |
| // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT. |
| if (measurementInfo != null && !checkedDataType.equals(measurementInfo.getColumnDataType())) { |
| throw new QueryProcessException(AlignByDevicePlan.DATATYPE_ERROR_MESSAGE); |
| } |
| } |
| |
| private List<String> convertSpecialClauseValues(QueryPlan queryPlan, List<String> measurements) |
| throws QueryProcessException { |
| queryPlan.convertSpecialClauseValues(specialClauseComponent); |
| |
| // sLimit trim on the measurementColumnList |
| if (specialClauseComponent.hasSlimit()) { |
| int seriesSLimit = specialClauseComponent.getSeriesLimit(); |
| int seriesOffset = specialClauseComponent.getSeriesOffset(); |
| return slimitTrimColumn(measurements, seriesSLimit, seriesOffset); |
| } |
| return measurements; |
| } |
| |
| private List<PartialPath> removeStarsInDeviceWithUnique(List<PartialPath> paths) |
| throws LogicalOptimizeException { |
| List<PartialPath> retDevices; |
| Set<PartialPath> deviceSet = new LinkedHashSet<>(); |
| try { |
| for (PartialPath path : paths) { |
| Set<PartialPath> tempDS = getMatchedDevices(path); |
| deviceSet.addAll(tempDS); |
| } |
| retDevices = new ArrayList<>(deviceSet); |
| } catch (MetadataException e) { |
| throw new LogicalOptimizeException("error when remove star: " + e.getMessage()); |
| } |
| return retDevices; |
| } |
| |
| private PartialPath getSuffixPathFromExpression(Expression expression) { |
| return expression instanceof TimeSeriesOperand |
| ? ((TimeSeriesOperand) expression).getPath() |
| : (((FunctionExpression) expression).getPaths().get(0)); |
| } |
| |
| /** |
| * If path is a vectorPartialPath, we return its measurementId + subMeasurement as the final |
| * measurement. e.g. path: root.sg.d1.vector1[s1], return "vector1.s1". |
| */ |
| private String getMeasurementName(PartialPath path, String aggregation) { |
| String initialMeasurement = path.getMeasurement(); |
| if (aggregation != null) { |
| initialMeasurement = aggregation + "(" + initialMeasurement + ")"; |
| } |
| return initialMeasurement; |
| } |
| |
| private List<String> slimitTrimColumn( |
| List<String> measurements, int seriesLimit, int seriesOffset) throws QueryProcessException { |
| int size = measurements.size(); |
| |
| // check parameter range |
| if (seriesOffset >= size) { |
| String errorMessage = |
| "The value of SOFFSET (%d) is equal to or exceeds the number of sequences (%d) that can actually be returned."; |
| throw new QueryProcessException(String.format(errorMessage, seriesOffset, size)); |
| } |
| int endPosition = seriesOffset + seriesLimit; |
| if (endPosition > size) { |
| endPosition = size; |
| } |
| |
| // trim seriesPath list |
| return new ArrayList<>(measurements.subList(seriesOffset, endPosition)); |
| } |
| |
| // e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to |
| // [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10, |
| // root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)] |
| private Map<String, IExpression> concatFilterByDevice( |
| AlignByDevicePlan alignByDevicePlan, List<PartialPath> devices, FilterOperator operator) |
| throws QueryProcessException { |
| Map<String, IExpression> deviceToFilterMap = new HashMap<>(); |
| Set<PartialPath> filterPaths = new HashSet<>(); |
| Iterator<PartialPath> deviceIterator = devices.iterator(); |
| while (deviceIterator.hasNext()) { |
| PartialPath device = deviceIterator.next(); |
| FilterOperator newOperator; |
| try { |
| newOperator = operator.copy(); |
| concatFilterPath(device, newOperator, filterPaths); |
| } catch (LogicalOptimizeException | MetadataException e) { |
| deviceIterator.remove(); |
| alignByDevicePlan.removeDevice(device.getFullPath()); |
| continue; |
| } |
| // transform to a list so it can be indexed |
| List<PartialPath> filterPathList = new ArrayList<>(filterPaths); |
| Map<PartialPath, TSDataType> pathTSDataTypeHashMap = new HashMap<>(); |
| for (PartialPath filterPath : filterPathList) { |
| pathTSDataTypeHashMap.put( |
| filterPath, |
| SQLConstant.isReservedPath(filterPath) ? TSDataType.INT64 : filterPath.getSeriesType()); |
| } |
| deviceToFilterMap.put( |
| device.getFullPath(), newOperator.transformToExpression(pathTSDataTypeHashMap)); |
| filterPaths.clear(); |
| } |
| |
| return deviceToFilterMap; |
| } |
| |
| private void concatFilterPath( |
| PartialPath prefix, FilterOperator operator, Set<PartialPath> filterPaths) |
| throws LogicalOptimizeException, MetadataException { |
| if (!operator.isLeaf()) { |
| for (FilterOperator child : operator.getChildren()) { |
| concatFilterPath(prefix, child, filterPaths); |
| } |
| return; |
| } |
| FunctionOperator basicOperator; |
| if (operator instanceof InOperator) { |
| basicOperator = (InOperator) operator; |
| } else if (operator instanceof LikeOperator) { |
| basicOperator = (LikeOperator) operator; |
| } else if (operator instanceof RegexpOperator) { |
| basicOperator = (RegexpOperator) operator; |
| } else { |
| basicOperator = (BasicFunctionOperator) operator; |
| } |
| PartialPath filterPath = basicOperator.getSinglePath(); |
| |
| // do nothing in the cases of "where time > 5" or "where root.d1.s1 > 5" |
| if (SQLConstant.isReservedPath(filterPath) |
| || filterPath.getFirstNode().startsWith(SQLConstant.ROOT)) { |
| filterPaths.add(filterPath); |
| return; |
| } |
| |
| PartialPath concatPath = prefix.concatPath(filterPath); |
| List<MeasurementPath> concatMeasurementPaths = getMatchedTimeseries(concatPath); |
| if (concatMeasurementPaths.isEmpty()) { |
| throw new LogicalOptimizeException( |
| String.format("Unknown time series %s in `where clause`", concatPath)); |
| } |
| filterPaths.add(concatMeasurementPaths.get(0)); |
| basicOperator.setSinglePath(concatMeasurementPaths.get(0)); |
| } |
| |
| protected Set<PartialPath> getMatchedDevices(PartialPath path) throws MetadataException { |
| return IoTDB.schemaProcessor.getMatchedDevices(path, isPrefixMatchPath); |
| } |
| |
| protected List<MeasurementPath> getMatchedTimeseries(PartialPath path) throws MetadataException { |
| return IoTDB.schemaProcessor.getMeasurementPaths(path, isPrefixMatchPath); |
| } |
| |
| public boolean isEnableTracing() { |
| return enableTracing; |
| } |
| |
| public void setEnableTracing(boolean enableTracing) { |
| this.enableTracing = enableTracing; |
| } |
| } |