blob: c9f7ba67f45ca5395010fcfaff00f0869c220edd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.spark.tsfile.qp;
import org.apache.iotdb.spark.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.spark.tsfile.qp.common.SQLConstant;
import org.apache.iotdb.spark.tsfile.qp.common.SingleQuery;
import org.apache.iotdb.spark.tsfile.qp.common.TSQueryPlan;
import org.apache.iotdb.spark.tsfile.qp.exception.QueryOperatorException;
import org.apache.iotdb.spark.tsfile.qp.exception.QueryProcessorException;
import org.apache.iotdb.spark.tsfile.qp.optimizer.DNFFilterOptimizer;
import org.apache.iotdb.spark.tsfile.qp.optimizer.MergeSingleFilterOptimizer;
import org.apache.iotdb.spark.tsfile.qp.optimizer.PhysicalOptimizer;
import org.apache.iotdb.spark.tsfile.qp.optimizer.RemoveNotOptimizer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This class is used to convert information given by sparkSQL to construct TSFile's query plans.
* For TSFile's schema differ from SparkSQL's table schema e.g. TSFile's SQL: select s1,s2 from
* root.car.d1 where s1 = 10 SparkSQL's SQL: select s1,s2 from XXX where delta_object = d1
*/
public class QueryProcessor {
// construct logical query plans first, then convert them to physical ones
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public List<TSQueryPlan> generatePlans(
FilterOperator filter,
List<String> paths,
List<String> columnNames,
TsFileSequenceReader in,
Long start,
Long end)
throws QueryProcessorException, IOException {
List<TSQueryPlan> queryPlans = new ArrayList<>();
if (filter != null) {
RemoveNotOptimizer removeNot = new RemoveNotOptimizer();
filter = removeNot.optimize(filter);
DNFFilterOptimizer dnf = new DNFFilterOptimizer();
filter = dnf.optimize(filter);
// merge different query path
// e.g. or (sensor_1 > 20, sensor_1 <10, sensor_2 > 10)
// => or (or (sensor_1 > 20, sensor_1 < 10), sensor_2 > 10)
MergeSingleFilterOptimizer merge = new MergeSingleFilterOptimizer();
filter = merge.optimize(filter);
List<FilterOperator> filterOperators = splitFilter(filter);
for (FilterOperator filterOperator : filterOperators) {
SingleQuery singleQuery = constructSelectPlan(filterOperator, columnNames);
if (singleQuery != null) {
queryPlans.addAll(
new PhysicalOptimizer(columnNames).optimize(singleQuery, paths, in, start, end));
}
}
} else {
queryPlans.addAll(new PhysicalOptimizer(columnNames).optimize(null, paths, in, start, end));
}
// merge query plan
Map<List<String>, List<TSQueryPlan>> pathMap = new HashMap<>();
for (TSQueryPlan tsQueryPlan : queryPlans) {
if (pathMap.containsKey(tsQueryPlan.getPaths())) {
pathMap.get(tsQueryPlan.getPaths()).add(tsQueryPlan);
} else {
List<TSQueryPlan> plans = new ArrayList<>();
plans.add(tsQueryPlan);
pathMap.put(tsQueryPlan.getPaths(), plans);
}
}
queryPlans.clear();
for (List<TSQueryPlan> plans : pathMap.values()) {
TSQueryPlan mergePlan = null;
for (TSQueryPlan plan : plans) {
if (mergePlan == null) {
mergePlan = plan;
} else {
FilterOperator timeFilterOperator = new FilterOperator(SQLConstant.KW_OR);
List<FilterOperator> timeFilterChildren = new ArrayList<>();
timeFilterChildren.add(mergePlan.getTimeFilterOperator());
timeFilterChildren.add(plan.getTimeFilterOperator());
timeFilterOperator.setChildrenList(timeFilterChildren);
mergePlan.setTimeFilterOperator(timeFilterOperator);
FilterOperator valueFilterOperator = new FilterOperator(SQLConstant.KW_OR);
List<FilterOperator> valueFilterChildren = new ArrayList<>();
valueFilterChildren.add(mergePlan.getValueFilterOperator());
valueFilterChildren.add(plan.getValueFilterOperator());
valueFilterOperator.setChildrenList(valueFilterChildren);
mergePlan.setValueFilterOperator(valueFilterOperator);
}
}
queryPlans.add(mergePlan);
}
return queryPlans;
}
private List<FilterOperator> splitFilter(FilterOperator filterOperator) {
if (filterOperator.isSingle() || filterOperator.getTokenIntType() != SQLConstant.KW_OR) {
List<FilterOperator> ret = new ArrayList<>();
ret.add(filterOperator);
return ret;
}
// a list of conjunctions linked by or
return filterOperator.getChildOperators();
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private SingleQuery constructSelectPlan(FilterOperator filterOperator, List<String> columnNames)
throws QueryOperatorException {
FilterOperator timeFilter = null;
FilterOperator valueFilter = null;
List<FilterOperator> columnFilterOperators = new ArrayList<>();
List<FilterOperator> singleFilterList = null;
if (filterOperator.isSingle()) {
singleFilterList = new ArrayList<>();
singleFilterList.add(filterOperator);
} else if (filterOperator.getTokenIntType() == SQLConstant.KW_AND) {
// original query plan has been dealt with merge optimizer, thus all nodes with same
// path have been merged to one node
singleFilterList = filterOperator.getChildren();
}
if (singleFilterList == null) {
return null;
}
List<FilterOperator> valueList = new ArrayList<>();
for (FilterOperator child : singleFilterList) {
if (!child.isSingle()) {
valueList.add(child);
} else {
String singlePath = child.getSinglePath();
if (columnNames.contains(singlePath)) {
if (!columnFilterOperators.contains(child)) {
columnFilterOperators.add(child);
} else {
throw new QueryOperatorException(
"The same key filter has been specified more than once: " + singlePath);
}
} else {
if (SQLConstant.RESERVED_TIME.equals(child.getSinglePath())) {
if (timeFilter != null) {
throw new QueryOperatorException("time filter has been specified more than once");
}
timeFilter = child;
} else {
valueList.add(child);
}
}
}
}
if (valueList.size() == 1) {
valueFilter = valueList.get(0);
} else if (valueList.size() > 1) {
valueFilter = new FilterOperator(SQLConstant.KW_AND, false);
valueFilter.setChildOperators(valueList);
}
return new SingleQuery(columnFilterOperators, timeFilter, valueFilter);
}
}