blob: f4060e7c4ed05934f55f8b37a5b201e7632369fc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.mpp.common.schematree.DeviceGroupSchemaTree;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
/**
 * {@link ISchemaFetcher} implementation that answers schema requests in-process: storage groups
 * and schema region ids are looked up through {@link LocalConfigNode}, and the schema itself is
 * read from the {@link ISchemaRegion} that {@link SchemaEngine} returns for that id.
 *
 * <p>The class keeps no mutable state of its own; it only holds references obtained from its
 * collaborators' {@code getInstance()} accessors. One shared instance is therefore handed out by
 * {@link #getInstance()}. NOTE(review): this assumes those accessors always return the same
 * objects for the lifetime of the process - confirm if any of them can be re-created.
 */
public class StandaloneSchemaFetcher implements ISchemaFetcher {

  // Currently not read by any method of this class; kept so construction behaves as before.
  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
  private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
  private final SchemaEngine schemaEngine = SchemaEngine.getInstance();

  private StandaloneSchemaFetcher() {}

  /**
   * Lazy holder for the shared instance. The JVM initializes this nested class on first access
   * from {@link #getInstance()}, so the fetcher (and its collaborator lookups) is still created
   * on first use rather than when the outer class is loaded.
   */
  private static final class InstanceHolder {
    private static final StandaloneSchemaFetcher INSTANCE = new StandaloneSchemaFetcher();

    private InstanceHolder() {}
  }

  /**
   * Returns the shared fetcher.
   *
   * <p>Earlier revisions allocated a new object on every call; all instances were
   * interchangeable, so callers are unaffected by receiving the same one each time.
   *
   * @return the single {@code StandaloneSchemaFetcher} instance
   */
  public static StandaloneSchemaFetcher getInstance() {
    return InstanceHolder.INSTANCE;
  }

  /**
   * Builds a schema tree holding the measurement paths matched by the given pattern tree.
   *
   * <p>{@code patternTree.constructTree()} is invoked first. Then, for every path pattern and
   * every storage group the local config node reports for that pattern, the measurement paths
   * returned by the storage group's schema region are appended to the result. The distinct
   * storage group full paths seen along the way are recorded on the tree.
   *
   * @param patternTree patterns to resolve; it is modified by the {@code constructTree()} call
   * @return the populated schema tree
   * @throws RuntimeException wrapping any {@link MetadataException} raised during the lookups
   */
  @Override
  public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
    patternTree.constructTree();
    Set<String> storageGroupSet = new HashSet<>();
    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
    List<PartialPath> pathPatterns = patternTree.getAllPathPatterns();
    try {
      for (PartialPath pathPattern : pathPatterns) {
        List<PartialPath> storageGroups = localConfigNode.getBelongedStorageGroups(pathPattern);
        for (PartialPath storageGroupPath : storageGroups) {
          storageGroupSet.add(storageGroupPath.getFullPath());
          SchemaRegionId schemaRegionId =
              localConfigNode.getBelongedSchemaRegionId(storageGroupPath);
          ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
          // NOTE(review): the meaning of the 'false' flag is defined by ISchemaRegion
          // (presumably "not a prefix match") - confirm against that interface.
          schemaTree.appendMeasurementPaths(schemaRegion.getMeasurementPaths(pathPattern, false));
        }
      }
    } catch (MetadataException e) {
      throw new RuntimeException(e);
    }
    schemaTree.setStorageGroups(new ArrayList<>(storageGroupSet));
    return schemaTree;
  }

  /**
   * Fetches the schema of a single device via the schema region's
   * {@code getDeviceSchemaInfoWithAutoCreate}.
   *
   * <p>No encodings or compression types are supplied by this overload: arrays of the right
   * length whose elements are all {@code null} are passed down instead.
   *
   * @param devicePath device whose schema is requested
   * @param measurements measurement names under the device
   * @param getDataType maps an index into {@code measurements} to that measurement's data type
   * @param aligned alignment flag forwarded to the schema region
   * @return a {@link DeviceGroupSchemaTree} containing the single device's schema info
   * @throws RuntimeException wrapping any {@link MetadataException} raised during the lookup
   */
  @Override
  public ISchemaTree fetchSchemaWithAutoCreate(
      PartialPath devicePath,
      String[] measurements,
      Function<Integer, TSDataType> getDataType,
      boolean aligned) {
    DeviceSchemaInfo deviceSchemaInfo =
        getDeviceSchemaInfoWithAutoCreate(
            devicePath,
            measurements,
            getDataType,
            new TSEncoding[measurements.length],
            new CompressionType[measurements.length],
            aligned);
    DeviceGroupSchemaTree schemaTree = new DeviceGroupSchemaTree();
    schemaTree.addDeviceInfo(deviceSchemaInfo);
    return schemaTree;
  }

  /**
   * Locates the schema region the device belongs to and delegates to its
   * {@code getDeviceSchemaInfoWithAutoCreate}, converting the checked {@link MetadataException}
   * into a {@link RuntimeException} with the original exception as cause.
   */
  private DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
      PartialPath devicePath,
      String[] measurements,
      Function<Integer, TSDataType> getDataType,
      TSEncoding[] encodings,
      CompressionType[] compressionTypes,
      boolean aligned) {
    try {
      SchemaRegionId schemaRegionId = localConfigNode.getBelongedSchemaRegionId(devicePath);
      ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
      return schemaRegion.getDeviceSchemaInfoWithAutoCreate(
          devicePath, measurements, getDataType, encodings, compressionTypes, aligned);
    } catch (MetadataException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Same as the six-argument overload with no encodings and no compression types ({@code null}
   * is passed for both lists).
   */
  @Override
  public ISchemaTree fetchSchemaListWithAutoCreate(
      List<PartialPath> devicePathList,
      List<String[]> measurementsList,
      List<TSDataType[]> tsDataTypesList,
      List<Boolean> isAlignedList) {
    return fetchSchemaListWithAutoCreate(
        devicePathList, measurementsList, tsDataTypesList, null, null, isAlignedList);
  }

  /**
   * Fetches schema for a batch of rows that may mention the same device more than once.
   *
   * <p>The lists are parallel: element {@code i} of each list describes row {@code i}. Rows are
   * grouped by device path; for each device the per-row arrays are concatenated (in row order)
   * into one array per kind, and a single schema lookup is issued for the device.
   *
   * @param devicePathList device of each row
   * @param measurementsList measurement names of each row
   * @param tsDataTypesList data types of each row, parallel to the row's measurements
   * @param encodingsList encodings of each row, or {@code null} to leave all encodings unset
   * @param compressionTypesList compression types of each row, or {@code null} to leave them unset
   * @param isAlignedList alignment flag of each row
   * @return a {@link DeviceGroupSchemaTree} with one schema info entry per distinct device
   * @throws StatementAnalyzeException if rows of the same device disagree on alignment
   * @throws RuntimeException wrapping any {@link MetadataException} raised during a lookup
   */
  @Override
  public ISchemaTree fetchSchemaListWithAutoCreate(
      List<PartialPath> devicePathList,
      List<String[]> measurementsList,
      List<TSDataType[]> tsDataTypesList,
      List<TSEncoding[]> encodingsList,
      List<CompressionType[]> compressionTypesList,
      List<Boolean> isAlignedList) {
    // Device path -> indexes of the rows that target it, in input order.
    Map<PartialPath, List<Integer>> deviceMap = new HashMap<>();
    for (int i = 0, size = devicePathList.size(); i < size; i++) {
      deviceMap.computeIfAbsent(devicePathList.get(i), k -> new ArrayList<>()).add(i);
    }

    DeviceGroupSchemaTree schemaTree = new DeviceGroupSchemaTree();
    for (Map.Entry<PartialPath, List<Integer>> entry : deviceMap.entrySet()) {
      // First pass: check alignment consistency and size the merged arrays.
      int totalSize = 0;
      boolean isAligned = isAlignedList.get(entry.getValue().get(0));
      for (int index : entry.getValue()) {
        if (isAlignedList.get(index) != isAligned) {
          throw new StatementAnalyzeException(
              String.format("Inconsistent device alignment of %s in insert plan.", entry.getKey()));
        }
        totalSize += measurementsList.get(index).length;
      }

      String[] measurements = new String[totalSize];
      TSDataType[] tsDataTypes = new TSDataType[totalSize];
      // These two stay all-null when the corresponding list argument is null.
      TSEncoding[] encodings = new TSEncoding[totalSize];
      CompressionType[] compressionTypes = new CompressionType[totalSize];

      // Second pass: concatenate each row's arrays at the running offset.
      int curPos = 0;
      for (int index : entry.getValue()) {
        System.arraycopy(
            measurementsList.get(index),
            0,
            measurements,
            curPos,
            measurementsList.get(index).length);
        System.arraycopy(
            tsDataTypesList.get(index), 0, tsDataTypes, curPos, tsDataTypesList.get(index).length);
        if (encodingsList != null) {
          System.arraycopy(
              encodingsList.get(index), 0, encodings, curPos, encodingsList.get(index).length);
        }
        if (compressionTypesList != null) {
          System.arraycopy(
              compressionTypesList.get(index),
              0,
              compressionTypes,
              curPos,
              compressionTypesList.get(index).length);
        }
        // The offset advances by the row's measurement count, whatever the other lengths are.
        curPos += measurementsList.get(index).length;
      }

      schemaTree.addDeviceInfo(
          getDeviceSchemaInfoWithAutoCreate(
              entry.getKey(),
              measurements,
              index -> tsDataTypes[index],
              encodings,
              compressionTypes,
              isAligned));
    }
    return schemaTree;
  }

  /**
   * Not implemented here: always returns {@code null}.
   *
   * @return {@code null}
   */
  @Override
  public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
    return null;
  }

  /**
   * Not implemented here: always returns {@code null}.
   *
   * @return {@code null}
   */
  @Override
  public Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern) {
    return null;
  }

  /**
   * Not implemented here: always returns {@code null}.
   *
   * @return {@code null}
   */
  @Override
  public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
    return null;
  }

  /** Does nothing: this class declares no cache field of its own. */
  @Override
  public void invalidAllCache() {}
}