blob: 735d1aa520f8d6c7520d146cdb95cad1aad1050e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.spark.tsfile.qp.optimizer;
import org.apache.iotdb.spark.tsfile.qp.common.BasicOperator;
import org.apache.iotdb.spark.tsfile.qp.common.FilterOperator;
import org.apache.iotdb.spark.tsfile.qp.common.SQLConstant;
import org.apache.iotdb.spark.tsfile.qp.common.SingleQuery;
import org.apache.iotdb.spark.tsfile.qp.common.TSQueryPlan;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class PhysicalOptimizer {

  /**
   * Whether the current query can still match data. Reset to {@code true} before column filters
   * are merged in {@link #optimize}; set to {@code false} by {@link #mergeColumn} when an AND of
   * column filters has an empty intersection (e.g. {@code device = d1 and device = d2}).
   */
  private boolean flag;

  /** Device IDs (delta objects) selected by the current {@link #optimize} call. */
  private final List<String> validDeltaObjects = new ArrayList<>();

  /**
   * Column names whose values are taken from segments of a device ID. Requested paths found in this
   * list are not treated as measurements, and the position of a column in this list selects the
   * device-ID segment it is compared against in {@link #combination}.
   */
  private final List<String> columnNames;

  public PhysicalOptimizer(List<String> columnNames) {
    this.columnNames = columnNames;
  }

  /**
   * Builds one {@link TSQueryPlan} per device that should be read from the given TsFile.
   *
   * @param singleQuery the query's time, value and column filters; may be {@code null}, in which
   *     case every device returned for the range is read
   * @param paths requested paths; entries contained in the column names or equal to the reserved
   *     time column are not treated as measurements
   * @param in reader over the TsFile
   * @param start start of the range passed to {@code getDeviceNameInRange}
   * @param end end of the range passed to {@code getDeviceNameInRange}
   * @return the query plans; empty when the filters can never match data in this file
   * @throws IOException if reading the file metadata fails
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  public List<TSQueryPlan> optimize(
      SingleQuery singleQuery, List<String> paths, TsFileSequenceReader in, Long start, Long end)
      throws IOException {
    // Start from a clean slate so that reusing this optimizer does not carry over (and duplicate)
    // devices selected by a previous call.
    validDeltaObjects.clear();

    Map<String, TSDataType> allMeasurementsInFile = in.getAllMeasurements();

    List<String> selectedSeries = new ArrayList<>();
    for (String path : paths) {
      if (!columnNames.contains(path) && !path.equals(SQLConstant.RESERVED_TIME)) {
        selectedSeries.add(path);
      }
    }

    FilterOperator timeFilter = null;
    FilterOperator valueFilter = null;
    if (singleQuery != null) {
      timeFilter = singleQuery.getTimeFilterOperator();
      valueFilter = singleQuery.getValueFilterOperator();
      if (valueFilter != null) {
        // A value filter on a measurement that does not exist in this file can never match,
        // so there is nothing to read.
        for (String filterPath : valueFilter.getAllPaths()) {
          if (!allMeasurementsInFile.containsKey(filterPath)) {
            return new ArrayList<>();
          }
        }
      }

      flag = true;
      Map<String, Set<String>> selectColumns = mergeColumns(singleQuery.getColumnFilterOperator());
      if (!flag) {
        // The column filters contradict each other (an AND with an empty intersection,
        // e.g. device = 'd1' and device = 'd2'), so there is nothing to read.
        return new ArrayList<>();
      }

      List<String> actualDeltaObjects = getDeviceIdsInRange(in, start, end);
      if (selectColumns.isEmpty()) {
        validDeltaObjects.addAll(actualDeltaObjects);
      } else {
        // Match the device IDs against every combination of selected column values.
        combination(
            actualDeltaObjects,
            selectColumns,
            selectColumns.keySet().toArray(),
            0,
            new String[selectColumns.size()]);
      }
    } else {
      validDeltaObjects.addAll(getDeviceIdsInRange(in, start, end));
    }

    if (selectedSeries.isEmpty()) {
      // No measurement was requested explicitly: query every measurement in the file.
      selectedSeries.addAll(allMeasurementsInFile.keySet());
    } else {
      // Drop requested measurements that do not exist in this file.
      selectedSeries.removeIf(path -> !allMeasurementsInFile.containsKey(path));
    }

    List<TSQueryPlan> tsFileQueries = new ArrayList<>(validDeltaObjects.size());
    for (String deltaObject : validDeltaObjects) {
      List<String> newPaths = new ArrayList<>(selectedSeries.size());
      for (String path : selectedSeries) {
        newPaths.add(deltaObject + SQLConstant.PATH_SEPARATOR + path);
      }
      FilterOperator newValueFilter = null;
      if (valueFilter != null) {
        // Each plan gets its own clone of the value filter, bound to this device via
        // addHeadDeltaObjectPath.
        newValueFilter = valueFilter.clone();
        newValueFilter.addHeadDeltaObjectPath(deltaObject);
      }
      tsFileQueries.add(new TSQueryPlan(newPaths, timeFilter, newValueFilter));
    }
    return tsFileQueries;
  }

  /**
   * Returns the string form of every device ID the reader reports for the given range.
   *
   * @param in reader over the TsFile
   * @param start start of the range passed to {@code getDeviceNameInRange}
   * @param end end of the range passed to {@code getDeviceNameInRange}
   * @return device IDs converted with {@code PlainDeviceID#toStringID}
   * @throws IOException if reading the file metadata fails
   */
  private static List<String> getDeviceIdsInRange(TsFileSequenceReader in, Long start, Long end)
      throws IOException {
    return in.getDeviceNameInRange(start, end).stream()
        .map(deviceID -> ((PlainDeviceID) deviceID).toStringID())
        .collect(Collectors.toList());
  }

  /**
   * Calculates combinations of selected column values and adds the matching device IDs to
   * {@link #validDeltaObjects}.
   *
   * <p>If the reserved delta_object column is among the selected columns, the requested device IDs
   * that exist in the file are added directly and no combinations are enumerated.
   *
   * @param actualDeltaObjects device IDs from the file
   * @param columnValues e.g. (device:{d1,d2}) (board:{c1,c2}) or (delta_object:{d1,d2})
   * @param columns column names to combine, e.g. device, board
   * @param beginIndex index into {@code columns} of the column assigned at this recursion level
   * @param values the current combination of column values, filled in by recursion
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  private void combination(
      List<String> actualDeltaObjects,
      Map<String, Set<String>> columnValues,
      Object[] columns,
      int beginIndex,
      String[] values) {
    // Filtering on the reserved delta_object column selects device IDs directly.
    if (columnValues.containsKey(SQLConstant.RESERVED_DELTA_OBJECT)) {
      Set<String> existingDeltaObjects = new HashSet<>(actualDeltaObjects);
      for (String deltaObject : columnValues.get(SQLConstant.RESERVED_DELTA_OBJECT)) {
        if (existingDeltaObjects.contains(deltaObject)) {
          validDeltaObjects.add(deltaObject);
        }
      }
      return;
    }

    if (beginIndex == columns.length) {
      // Every column has a value assigned: keep the device IDs that carry all of them.
      for (String deltaObject : actualDeltaObjects) {
        if (matchesColumnValues(deltaObject, columns, values)) {
          validDeltaObjects.add(deltaObject);
        }
      }
      return;
    }

    for (String c : columnValues.get(columns[beginIndex].toString())) {
      values[beginIndex] = c;
      combination(actualDeltaObjects, columnValues, columns, beginIndex + 1, values);
    }
  }

  /**
   * Checks whether a device ID holds {@code values[i]} at the segment belonging to
   * {@code columns[i]}, for every i.
   *
   * <p>If the device ID is root.column1_value.column2_value, its segments are [root,
   * column1_value, column2_value], and a column is compared against the segment at its position in
   * {@link #columnNames}.
   *
   * @param deltaObject device ID to test
   * @param columns selected column names
   * @param values value required for each column, aligned with {@code columns}
   * @return {@code true} if every column matches
   */
  private boolean matchesColumnValues(String deltaObject, Object[] columns, String[] values) {
    String[] actualValues = deltaObject.split(SQLConstant.REGEX_PATH_SEPARATOR);
    for (int i = 0; i < columns.length; i++) {
      int columnIndex = columnNames.indexOf(columns[i].toString());
      // An unknown column, or a device path too short to contain the column, cannot match.
      if (columnIndex < 0
          || columnIndex >= actualValues.length
          || !actualValues[columnIndex].equals(values[i])) {
        return false;
      }
    }
    return true;
  }

  /**
   * Reduces each column filter to the set of values it selects, keyed by column name.
   *
   * <p>Filters that place no usable restriction (a {@code null} merge result or an empty value set)
   * are left out of the result. May set {@link #flag} to {@code false} via {@link #mergeColumn}.
   *
   * @param columnFilterOperators column filters of the query; {@code null} is treated as none
   * @return map from column name to the values selected for it
   */
  private Map<String, Set<String>> mergeColumns(List<FilterOperator> columnFilterOperators) {
    Map<String, Set<String>> columnValuesMap = new HashMap<>();
    if (columnFilterOperators == null) {
      return columnValuesMap;
    }
    for (FilterOperator filterOperator : columnFilterOperators) {
      Pair<String, Set<String>> columnValues = mergeColumn(filterOperator);
      if (columnValues != null && !columnValues.right.isEmpty()) {
        columnValuesMap.put(columnValues.left, columnValues.right);
      }
    }
    return columnValuesMap;
  }

  /**
   * Merges one column filter operator into the set of values it selects.
   *
   * <p>Leaves contribute their single value; AND intersects the children's value sets and OR unions
   * them. An AND whose intersection becomes empty sets {@link #flag} to {@code false}.
   *
   * @param columnFilterOperator column filter
   * @return the filter's column path and selected values; {@code null} when the filter is
   *     {@code null} or cannot be reduced to a value set (a not-equal leaf anywhere in it); a pair
   *     with a {@code null} path and empty set for a non-leaf operator without children
   * @throws UnsupportedOperationException if a non-leaf operator with more than one child is
   *     neither AND nor OR
   */
  private Pair<String, Set<String>> mergeColumn(FilterOperator columnFilterOperator) {
    if (columnFilterOperator == null) {
      return null;
    }
    if (columnFilterOperator.isLeaf()) {
      // A not-equal comparison cannot be expressed as a finite set of selected values.
      if (columnFilterOperator.getTokenIntType() == SQLConstant.NOTEQUAL) {
        return null;
      }
      Set<String> ret = new HashSet<>();
      ret.add(((BasicOperator) columnFilterOperator).getSeriesValue());
      return new Pair<>(columnFilterOperator.getSinglePath(), ret);
    }

    List<FilterOperator> children = columnFilterOperator.getChildren();
    if (children == null || children.isEmpty()) {
      return new Pair<>(null, new HashSet<>());
    }
    Pair<String, Set<String>> ret = mergeColumn(children.get(0));
    if (ret == null) {
      return null;
    }
    for (int i = 1; i < children.size(); i++) {
      Pair<String, Set<String>> temp = mergeColumn(children.get(i));
      if (temp == null) {
        return null;
      }
      switch (columnFilterOperator.getTokenIntType()) {
        case SQLConstant.KW_AND:
          ret.right.retainAll(temp.right);
          // example: "where device = d1 and device = d2" should not query data
          if (ret.right.isEmpty()) {
            flag = false;
          }
          break;
        case SQLConstant.KW_OR:
          ret.right.addAll(temp.right);
          break;
        default:
          throw new UnsupportedOperationException(
              "given error token type:" + columnFilterOperator.getTokenIntType());
      }
    }
    return ret;
  }
}