blob: 3d05dc95f1e85149b1285678e8646a13fe9b3526 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.analyze.schema;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Pair;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
class NormalSchemaFetcher {
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final DataNodeSchemaCache schemaCache;
private final AutoCreateSchemaExecutor autoCreateSchemaExecutor;
private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor;
NormalSchemaFetcher(
DataNodeSchemaCache schemaCache,
AutoCreateSchemaExecutor autoCreateSchemaExecutor,
ClusterSchemaFetchExecutor clusterSchemaFetchExecutor) {
this.schemaCache = schemaCache;
this.autoCreateSchemaExecutor = autoCreateSchemaExecutor;
this.clusterSchemaFetchExecutor = clusterSchemaFetchExecutor;
}
/** Given full paths will be */
private void computePatternTreeNeededReFetch(
PathPatternTree patternTree, List<String> fullPaths) {
for (String fullPath : fullPaths) {
try {
patternTree.appendFullPath(new PartialPath(fullPath));
} catch (IllegalPathException e) {
throw new RuntimeException(e);
}
}
}
private PathPatternTree computePatternTreeNeededReFetch(
PartialPath devicePath,
String[] measurementList,
List<Integer> indexOfMissingMeasurements,
List<String> fullPaths) {
PathPatternTree patternTree = new PathPatternTree();
for (int index : indexOfMissingMeasurements) {
patternTree.appendFullPath(devicePath, measurementList[index]);
}
for (String fullPath : fullPaths) {
try {
patternTree.appendFullPath(new PartialPath(fullPath));
} catch (IllegalPathException e) {
throw new RuntimeException(e);
}
}
return patternTree;
}
private PathPatternTree computePatternTreeNeededReFetch(
List<PartialPath> devicePathList,
List<String[]> measurementsList,
List<Integer> indexOfTargetDevices,
List<List<Integer>> indexOfTargetMeasurementsList) {
PathPatternTree patternTree = new PathPatternTree();
int deviceIndex;
for (int i = 0, size = indexOfTargetDevices.size(); i < size; i++) {
deviceIndex = indexOfTargetDevices.get(i);
for (int measurementIndex : indexOfTargetMeasurementsList.get(i)) {
patternTree.appendFullPath(
devicePathList.get(deviceIndex), measurementsList.get(deviceIndex)[measurementIndex]);
}
}
return patternTree;
}
List<Integer> processNormalTimeSeries(
ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation,
MPPQueryContext context) {
// [Step 0] Record the input value.
boolean isAlignedPutIn = schemaComputationWithAutoCreation.isAligned();
// [Step 1] Cache 1. compute measurements and record logical views.
List<Integer> indexOfMissingMeasurements =
schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreation);
// [Step 2] Cache 2. process recorded logical views. If there is no views now, it returns empty
// lists.
Pair<List<Integer>, List<String>> missedIndexAndPathString =
schemaCache.computeSourceOfLogicalView(schemaComputationWithAutoCreation);
List<Integer> indexOfMissingLogicalView = missedIndexAndPathString.left;
List<String> missedPathStringOfLogicalView = missedIndexAndPathString.right;
// all schema can be taken from cache
if (indexOfMissingMeasurements.isEmpty() && indexOfMissingLogicalView.isEmpty()) {
return indexOfMissingMeasurements;
}
// [Step 3] Fetch 1. fetch schema from remote. Process logical view first; then process
// measurements.
// try fetch the missing raw schema from remote and cache fetched schema
ClusterSchemaTree remoteSchemaTree;
if (missedPathStringOfLogicalView.isEmpty()) {
remoteSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
schemaComputationWithAutoCreation.getDevicePath(),
schemaComputationWithAutoCreation.getMeasurements(),
indexOfMissingMeasurements,
context);
} else {
PathPatternTree patternTree =
computePatternTreeNeededReFetch(
schemaComputationWithAutoCreation.getDevicePath(),
schemaComputationWithAutoCreation.getMeasurements(),
indexOfMissingMeasurements,
missedPathStringOfLogicalView);
remoteSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaWithPatternTreeAndCache(patternTree, context);
}
// make sure all missed views are computed.
remoteSchemaTree.computeSourceOfLogicalView(
schemaComputationWithAutoCreation, indexOfMissingLogicalView);
// check and compute the fetched schema
indexOfMissingMeasurements =
remoteSchemaTree.compute(schemaComputationWithAutoCreation, indexOfMissingMeasurements);
schemaComputationWithAutoCreation.recordRangeOfLogicalViewSchemaListNow();
// [Step 4] Fetch 2. Some fetched measurements in [Step 3] are views. Process them.
missedIndexAndPathString =
schemaCache.computeSourceOfLogicalView(schemaComputationWithAutoCreation);
indexOfMissingLogicalView = missedIndexAndPathString.left;
missedPathStringOfLogicalView = missedIndexAndPathString.right;
if (!missedPathStringOfLogicalView.isEmpty()) {
ClusterSchemaTree viewSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaWithFullPaths(
missedPathStringOfLogicalView, context);
viewSchemaTree.computeSourceOfLogicalView(
schemaComputationWithAutoCreation, indexOfMissingLogicalView);
}
// all schema has been taken and processed
if (indexOfMissingMeasurements.isEmpty()) {
return indexOfMissingMeasurements;
}
// [Step 5] Auto Create and process the missing schema
if (config.isAutoCreateSchemaEnabled()) {
// Check the isAligned value. If the input value is different from the actual value of the
// existing device, throw exception.
PartialPath devicePath = schemaComputationWithAutoCreation.getDevicePath();
validateIsAlignedValueIfAutoCreate(
schemaComputationWithAutoCreation.isAligned(), isAlignedPutIn, devicePath);
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
autoCreateSchemaExecutor.autoCreateTimeSeries(
schemaTree,
devicePath,
indexOfMissingMeasurements,
schemaComputationWithAutoCreation.getMeasurements(),
schemaComputationWithAutoCreation::getDataType,
isAlignedPutIn,
context);
indexOfMissingMeasurements =
schemaTree.compute(schemaComputationWithAutoCreation, indexOfMissingMeasurements);
}
return indexOfMissingMeasurements;
}
void processNormalTimeSeries(
List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList,
MPPQueryContext context) {
// [Step 0] Record the input value.
List<Boolean> isAlignedPutInList = null;
if (config.isAutoCreateSchemaEnabled()) {
isAlignedPutInList =
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::isAligned)
.collect(Collectors.toList());
}
// [Step 1] Cache 1. compute measurements and record logical views.
List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
List<List<Integer>> indexOfMissingMeasurementsList =
new ArrayList<>(schemaComputationWithAutoCreationList.size());
List<Integer> indexOfMissingMeasurements;
for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
indexOfMissingMeasurements =
schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreationList.get(i));
if (!indexOfMissingMeasurements.isEmpty()) {
indexOfDevicesWithMissingMeasurements.add(i);
indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
}
}
// [Step 2] Cache 2. process recorded logical views.
boolean hasUnFetchedLogicalView = false;
List<Pair<List<Integer>, List<String>>> missedIndexAndPathStringOfViewList =
new ArrayList<>(schemaComputationWithAutoCreationList.size());
for (ISchemaComputationWithAutoCreation iSchemaComputationWithAutoCreation :
schemaComputationWithAutoCreationList) {
Pair<List<Integer>, List<String>> missedIndexAndPathString =
schemaCache.computeSourceOfLogicalView(iSchemaComputationWithAutoCreation);
if (!missedIndexAndPathString.left.isEmpty()) {
hasUnFetchedLogicalView = true;
}
missedIndexAndPathStringOfViewList.add(missedIndexAndPathString);
}
// all schema can be taken from cache
if (indexOfDevicesWithMissingMeasurements.isEmpty() && (!hasUnFetchedLogicalView)) {
return;
}
// [Step 3] Fetch 1.fetch schema from remote. Process logical view first; then process
// measurements.
// try fetch the missing schema from remote
ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation;
ClusterSchemaTree remoteSchemaTree;
if (!hasUnFetchedLogicalView) {
remoteSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getDevicePath)
.collect(Collectors.toList()),
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getMeasurements)
.collect(Collectors.toList()),
indexOfDevicesWithMissingMeasurements,
indexOfMissingMeasurementsList,
context);
} else {
PathPatternTree patternTree =
computePatternTreeNeededReFetch(
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getDevicePath)
.collect(Collectors.toList()),
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getMeasurements)
.collect(Collectors.toList()),
indexOfDevicesWithMissingMeasurements,
indexOfMissingMeasurementsList);
List<String> fullPathsNeedReFetch = new ArrayList<>();
for (Pair<List<Integer>, List<String>> pair : missedIndexAndPathStringOfViewList) {
fullPathsNeedReFetch.addAll(pair.right);
}
computePatternTreeNeededReFetch(patternTree, fullPathsNeedReFetch);
remoteSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaWithPatternTreeAndCache(patternTree, context);
}
// make sure all missed views are computed.
for (int i = 0; i < schemaComputationWithAutoCreationList.size(); i++) {
schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
remoteSchemaTree.computeSourceOfLogicalView(
schemaComputationWithAutoCreation, missedIndexAndPathStringOfViewList.get(i).left);
}
// check and compute the fetched schema
List<Integer> indexOfDevicesNeedAutoCreateSchema = new ArrayList<>();
List<List<Integer>> indexOfMeasurementsNeedAutoCreate = new ArrayList<>();
for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
schemaComputationWithAutoCreation =
schemaComputationWithAutoCreationList.get(indexOfDevicesWithMissingMeasurements.get(i));
indexOfMissingMeasurements =
remoteSchemaTree.compute(
schemaComputationWithAutoCreation, indexOfMissingMeasurementsList.get(i));
schemaComputationWithAutoCreation.recordRangeOfLogicalViewSchemaListNow();
if (!indexOfMissingMeasurements.isEmpty()) {
indexOfDevicesNeedAutoCreateSchema.add(indexOfDevicesWithMissingMeasurements.get(i));
indexOfMeasurementsNeedAutoCreate.add(indexOfMissingMeasurements);
}
}
// [Step 4] Fetch 2. Some fetched measurements in [Step 3] are views. Process them.
hasUnFetchedLogicalView = false;
for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
Pair<List<Integer>, List<String>> missedIndexAndPathString =
schemaCache.computeSourceOfLogicalView(schemaComputationWithAutoCreationList.get(i));
if (!missedIndexAndPathString.left.isEmpty()) {
hasUnFetchedLogicalView = true;
}
missedIndexAndPathStringOfViewList.get(i).left = missedIndexAndPathString.left;
missedIndexAndPathStringOfViewList.get(i).right = missedIndexAndPathString.right;
}
if (hasUnFetchedLogicalView) {
List<String> fullPathsNeedRefetch = new ArrayList<>();
for (Pair<List<Integer>, List<String>> pair : missedIndexAndPathStringOfViewList) {
fullPathsNeedRefetch.addAll(pair.right);
}
ClusterSchemaTree viewSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaWithFullPaths(fullPathsNeedRefetch, context);
for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
viewSchemaTree.computeSourceOfLogicalView(
schemaComputationWithAutoCreation, missedIndexAndPathStringOfViewList.get(i).left);
}
}
// all schema has been taken and processed
if (indexOfDevicesNeedAutoCreateSchema.isEmpty()) {
return;
}
// [Step 5] Auto Create and process the missing schema
if (config.isAutoCreateSchemaEnabled()) {
List<PartialPath> devicePathList =
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getDevicePath)
.collect(Collectors.toList());
List<Boolean> isAlignedRealList =
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::isAligned)
.collect(Collectors.toList());
// Check the isAligned value. If the input value is different from the actual value of the
// existing device, throw exception.
validateIsAlignedValueIfAutoCreate(isAlignedRealList, isAlignedPutInList, devicePathList);
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
autoCreateSchemaExecutor.autoCreateTimeSeries(
schemaTree,
devicePathList,
indexOfDevicesNeedAutoCreateSchema,
indexOfMeasurementsNeedAutoCreate,
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getMeasurements)
.collect(Collectors.toList()),
schemaComputationWithAutoCreationList.stream()
.map(
o -> {
TSDataType[] dataTypes = new TSDataType[o.getMeasurements().length];
for (int i = 0, length = dataTypes.length; i < length; i++) {
dataTypes[i] = o.getDataType(i);
}
return dataTypes;
})
.collect(Collectors.toList()),
isAlignedPutInList,
context);
indexOfDevicesWithMissingMeasurements = new ArrayList<>();
indexOfMissingMeasurementsList = new ArrayList<>();
for (int i = 0; i < indexOfDevicesNeedAutoCreateSchema.size(); i++) {
schemaComputationWithAutoCreation =
schemaComputationWithAutoCreationList.get(indexOfDevicesNeedAutoCreateSchema.get(i));
indexOfMissingMeasurements =
schemaTree.compute(
schemaComputationWithAutoCreation, indexOfMeasurementsNeedAutoCreate.get(i));
if (!indexOfMissingMeasurements.isEmpty()) {
indexOfDevicesWithMissingMeasurements.add(indexOfDevicesNeedAutoCreateSchema.get(i));
indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
}
}
// all schema has been taken and processed
if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
return;
}
} else {
indexOfDevicesWithMissingMeasurements = indexOfDevicesNeedAutoCreateSchema;
indexOfMissingMeasurementsList = indexOfMeasurementsNeedAutoCreate;
}
// offer null for the rest missing schema processing
for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
schemaComputationWithAutoCreation =
schemaComputationWithAutoCreationList.get(indexOfDevicesWithMissingMeasurements.get(i));
for (int index : indexOfMissingMeasurementsList.get(i)) {
schemaComputationWithAutoCreation.computeMeasurement(index, null);
}
}
}
private void validateIsAlignedValueIfAutoCreate(
List<Boolean> realValueList, List<Boolean> putInValueList, List<PartialPath> devicePathList) {
int checkLen =
Math.min(Math.min(realValueList.size(), putInValueList.size()), devicePathList.size());
for (int i = 0; i < checkLen; i++) {
validateIsAlignedValueIfAutoCreate(
realValueList.get(i), putInValueList.get(i), devicePathList.get(i));
}
}
private void validateIsAlignedValueIfAutoCreate(
boolean realValue, boolean putInValue, PartialPath devicePath) {
if (realValue != putInValue) {
String msg;
if (realValue) {
msg =
"Timeseries under this device is aligned, please use createTimeseries or change device.";
} else {
msg =
"Timeseries under this device is not aligned, please use createTimeseries or change device.";
}
throw new SemanticException(new AlignedTimeseriesException(msg, devicePath.getFullPath()));
}
}
}