blob: 82ce99c2bc2087d6e7f73d7e9b48370942f33fb8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.common.schematree;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaEntityNode;
import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaInternalNode;
import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaMeasurementNode;
import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode;
import org.apache.iotdb.db.queryengine.common.schematree.visitor.SchemaTreeDeviceUsingTemplateVisitor;
import org.apache.iotdb.db.queryengine.common.schematree.visitor.SchemaTreeDeviceVisitor;
import org.apache.iotdb.db.queryengine.common.schematree.visitor.SchemaTreeVisitorFactory;
import org.apache.iotdb.db.queryengine.common.schematree.visitor.SchemaTreeVisitorWithLimitOffsetWrapper;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputation;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_PATTERN;
import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
import static org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode.SCHEMA_ENTITY_NODE;
import static org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode.SCHEMA_MEASUREMENT_NODE;
public class ClusterSchemaTree implements ISchemaTree {
/** Shared template manager; {@code deserialize} uses it to look up templates by id. */
private static final ClusterTemplateManager templateManager =
ClusterTemplateManager.getInstance();
/**
 * Database names consulted by {@code getBelongedDatabase}. There is no initializer, so this stays
 * {@code null} until {@code setDatabases} is called.
 */
private Set<String> databases;
/** Root node of the schema tree; every path lookup starts from its children. */
private final SchemaNode root;
/** Flag recording whether this schema tree contains at least one logical view measurement. */
private boolean hasLogicalMeasurementPath = false;
/**
 * Used to judge the schema tree type; set to true when a measurement is appended or
 * deserialized.
 */
private boolean hasNormalTimeSeries = false;
/** Templates referenced by the template devices of this tree, keyed by template id. */
private Map<Integer, Template> templateMap = new HashMap<>();
/**
 * Scope handed to the measurement visitor in {@code searchMeasurementPaths}. NOTE(review):
 * presumably limits results to paths the caller is authorized to see; no assignment is visible
 * in this class - confirm where it is set.
 */
private PathPatternTree authorityScope;
/** Creates an empty schema tree whose root is an internal node named {@code PATH_ROOT}. */
public ClusterSchemaTree() {
  this.root = new SchemaInternalNode(PATH_ROOT);
}
/**
 * Creates a schema tree on top of an existing node structure.
 *
 * @param root node used as the root of this tree; the reference is stored as-is, not copied
 */
public ClusterSchemaTree(SchemaNode root) {
this.root = root;
}
/**
 * Replaces the template lookup table of this tree.
 *
 * @param templateMap templates keyed by template id; the reference is stored as-is, so later
 *     changes made through this tree (e.g. in {@code appendTemplateDevice}) are visible to the
 *     caller's map
 */
public void setTemplateMap(Map<Integer, Template> templateMap) {
this.templateMap = templateMap;
}
/**
 * Collects the measurement paths matching a path pattern, with the series limit and offset
 * applied by the visitor.
 *
 * @param pathPattern a path pattern or a full time series path
 * @param slimit series limit handed to the visitor
 * @param soffset series offset handed to the visitor
 * @param isPrefixMatch if true, the path pattern is used to match prefix paths
 * @return left: the matched measurement paths; right: the next offset reported by the visitor
 */
@Override
public Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(
    PartialPath pathPattern, int slimit, int soffset, boolean isPrefixMatch) {
  try (SchemaTreeVisitorWithLimitOffsetWrapper<MeasurementPath> measurementVisitor =
      SchemaTreeVisitorFactory.createSchemaTreeMeasurementVisitor(
          root, pathPattern, isPrefixMatch, slimit, soffset, authorityScope)) {
    // Template devices keep their measurements in the template, so the visitor needs the map.
    measurementVisitor.setTemplateMap(templateMap);
    List<MeasurementPath> matchedPaths = measurementVisitor.getAllResult();
    return new Pair<>(matchedPaths, measurementVisitor.getNextOffset());
  }
}
/**
 * Collects the measurement paths matching a path pattern, passing 0 for both the series limit
 * and the series offset and disabling prefix matching.
 *
 * @param pathPattern a path pattern or a full time series path
 * @return left: the matched measurement paths; right: the next offset reported by the visitor
 */
@Override
public Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(PartialPath pathPattern) {
  final int slimit = 0;
  final int soffset = 0;
  return searchMeasurementPaths(pathPattern, slimit, soffset, false);
}
/**
 * Returns the devices of this tree that match {@code ALL_MATCH_PATTERN}.
 *
 * @return schema info of the matched devices
 */
public List<DeviceSchemaInfo> getAllDevices() {
  final PartialPath matchEverything = ALL_MATCH_PATTERN;
  return getMatchedDevices(matchEverything);
}
/**
 * Gets all devices matching the path pattern, without prefix matching.
 *
 * @param pathPattern the pattern of the target devices
 * @return a list holding the schema info of every device matching the given path pattern
 */
@Override
public List<DeviceSchemaInfo> getMatchedDevices(PartialPath pathPattern) {
  final boolean isPrefixMatch = false;
  return getMatchedDevices(pathPattern, isPrefixMatch);
}
/**
 * Runs a device visitor over this tree and returns everything it finds.
 *
 * @param pathPattern the pattern of the target devices
 * @param isPrefixMatch forwarded to the device visitor together with the pattern
 * @return schema info of the matched devices
 */
private List<DeviceSchemaInfo> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch) {
  try (SchemaTreeDeviceVisitor deviceVisitor =
      SchemaTreeVisitorFactory.createSchemaTreeDeviceVisitor(root, pathPattern, isPrefixMatch)) {
    List<DeviceSchemaInfo> matchedDevices = deviceVisitor.getAllResult();
    return matchedDevices;
  }
}
/**
 * Looks up the schema info of one measurement of a device. A measurement node stored directly
 * under the entity node wins; otherwise the template registered for the entity's template id is
 * consulted.
 *
 * @param entityNode entity node of the device
 * @param measurement measurement name
 * @return the measurement schema info, or {@code null} if neither source knows the measurement
 */
private IMeasurementSchemaInfo getMeasurementSchemaInfo(
    SchemaEntityNode entityNode, String measurement) {
  SchemaNode directChild = entityNode.getChild(measurement);
  if (directChild != null && directChild.isMeasurement()) {
    return directChild.getAsMeasurementNode();
  }

  Template deviceTemplate = templateMap.get(entityNode.getTemplateId());
  if (deviceTemplate == null || !deviceTemplate.getSchemaMap().containsKey(measurement)) {
    return null;
  }
  // Template measurements carry neither alias nor tags here, hence the two nulls.
  return new MeasurementSchemaInfo(
      measurement, deviceTemplate.getSchema(measurement), null, null);
}
/**
 * Resolves a device path and collects the schema info of the requested measurements.
 *
 * @param devicePath full path of the device; the first node is skipped during traversal
 * @param measurements measurement names to resolve under the device
 * @return the device schema info, or {@code null} if the path does not lead to an entity node.
 *     The contained list is parallel to {@code measurements} and holds {@code null} for every
 *     measurement that could not be found.
 */
@Override
public DeviceSchemaInfo searchDeviceSchemaInfo(
    PartialPath devicePath, List<String> measurements) {
  String[] pathNodes = devicePath.getNodes();
  SchemaNode current = root;
  // Walk down level by level; stop early as soon as a level is missing.
  for (int depth = 1; depth < pathNodes.length && current != null; depth++) {
    current = current.getChild(pathNodes[depth]);
  }
  if (current == null || !current.isEntity()) {
    return null;
  }

  SchemaEntityNode deviceNode = current.getAsEntityNode();
  List<IMeasurementSchemaInfo> schemaInfos = new ArrayList<>();
  for (String measurementName : measurements) {
    schemaInfos.add(getMeasurementSchemaInfo(deviceNode, measurementName));
  }
  return new DeviceSchemaInfo(
      devicePath, deviceNode.isAligned(), deviceNode.getTemplateId(), schemaInfos);
}
/**
 * Resolves the target measurements of the computation's device against this tree and reports
 * each hit back to the computation.
 *
 * <p>{@code computeDevice} is invoked once, on the first resolved measurement that is not a
 * logical view; {@code computeMeasurement} is invoked for every resolved measurement.
 *
 * @param schemaComputation supplies the device path and measurement names and receives the
 *     resolved schema info
 * @param indexOfTargetMeasurements indexes into the computation's measurement array to resolve
 * @return the indexes whose measurement was not found; the given list itself if the device path
 *     does not lead to an entity node
 */
public List<Integer> compute(
    ISchemaComputation schemaComputation, List<Integer> indexOfTargetMeasurements) {
  PartialPath devicePath = schemaComputation.getDevicePath();
  String[] measurements = schemaComputation.getMeasurements();
  String[] pathNodes = devicePath.getNodes();

  SchemaNode current = root;
  for (int depth = 1; depth < pathNodes.length && current != null; depth++) {
    current = current.getChild(pathNodes[depth]);
  }
  if (current == null || !current.isEntity()) {
    // Unknown device: every requested measurement is missing.
    return indexOfTargetMeasurements;
  }

  SchemaEntityNode deviceNode = current.getAsEntityNode();
  List<Integer> missingIndexes = new ArrayList<>();
  boolean devicePending = true;
  for (int targetIndex : indexOfTargetMeasurements) {
    IMeasurementSchemaInfo schemaInfo =
        getMeasurementSchemaInfo(deviceNode, measurements[targetIndex]);
    if (schemaInfo == null) {
      missingIndexes.add(targetIndex);
      continue;
    }
    if (devicePending && !schemaInfo.isLogicalView()) {
      schemaComputation.computeDevice(deviceNode.isAligned());
      devicePending = false;
    }
    schemaComputation.computeMeasurement(targetIndex, schemaInfo);
  }
  return missingIndexes;
}
/**
 * Resolves the source series of the selected logical views and reports each one back to the
 * computation. Nothing is returned; a view whose source cannot be resolved to exactly one series
 * aborts the whole call with an exception.
 *
 * @param schemaComputation the statement holding the logical views
 * @param indexOfTargetLogicalView indexes into the computation's logical view schema list that
 *     should be checked
 * @throws SemanticException if a source path does not exist or matches more than one series
 */
public void computeSourceOfLogicalView(
    ISchemaComputation schemaComputation, List<Integer> indexOfTargetLogicalView)
    throws SemanticException {
  if (!schemaComputation.hasLogicalViewNeedProcess()) {
    return;
  }
  List<LogicalViewSchema> viewSchemas = schemaComputation.getLogicalViewSchemaList();
  for (Integer viewIndex : indexOfTargetLogicalView) {
    LogicalViewSchema viewSchema = viewSchemas.get(viewIndex);
    PartialPath sourcePath = viewSchema.getSourcePathIfWritable();
    List<MeasurementPath> matchedSources = this.searchMeasurementPaths(sourcePath).left;

    if (matchedSources.isEmpty()) {
      throw new SemanticException(
          new PathNotExistException(
              sourcePath.getFullPath(),
              schemaComputation
                  .getDevicePath()
                  .concatNode(viewSchema.getMeasurementId())
                  .getFullPath()));
    }
    if (matchedSources.size() > 1) {
      throw new SemanticException(
          String.format(
              "The source paths [%s] of view [%s] are multiple.",
              sourcePath.getFullPath(),
              schemaComputation.getDevicePath().concatNode(viewSchema.getMeasurementId())));
    }

    // Exactly one source: translate the view index into the index the computation expects.
    Integer realIndex = schemaComputation.getIndexListOfLogicalViewPaths().get(viewIndex);
    MeasurementPath source = matchedSources.get(0);
    MeasurementSchemaInfo sourceInfo =
        new MeasurementSchemaInfo(
            source.getMeasurement(), source.getMeasurementSchema(), null, source.getTagMap());
    schemaComputation.computeMeasurementOfView(
        realIndex, sourceInfo, source.isUnderAlignedEntity());
  }
}
/**
 * Appends a template device to the schema tree, creating missing internal nodes on the way. An
 * existing entity node is updated in place; an existing non-entity node is replaced by a new
 * entity node.
 *
 * @param devicePath device path
 * @param isAligned whether the device is aligned
 * @param templateId template id
 * @param template template instance, used to search measurement schema, it can be null if
 *     write-only
 */
public void appendTemplateDevice(
    PartialPath devicePath, boolean isAligned, int templateId, Template template) {
  String[] pathNodes = devicePath.getNodes();
  SchemaNode parent = root;
  for (int depth = 1; depth < pathNodes.length - 1; depth++) {
    SchemaNode next = parent.getChild(pathNodes[depth]);
    if (next == null) {
      next = new SchemaInternalNode(pathNodes[depth]);
      parent.addChild(pathNodes[depth], next);
    }
    parent = next;
  }

  String deviceName = pathNodes[pathNodes.length - 1];
  SchemaNode existing = parent.getChild(deviceName);
  if (existing != null && existing.isEntity()) {
    SchemaEntityNode existingEntity = existing.getAsEntityNode();
    existingEntity.setTemplateId(templateId);
    existingEntity.setAligned(isAligned);
  } else {
    SchemaEntityNode createdEntity = new SchemaEntityNode(deviceName);
    createdEntity.setAligned(isAligned);
    createdEntity.setTemplateId(templateId);
    if (existing == null) {
      parent.addChild(deviceName, createdEntity);
    } else {
      parent.replaceChild(deviceName, createdEntity);
    }
  }
  // The first template registered for an id is kept.
  templateMap.putIfAbsent(templateId, template);
}
/**
 * Appends every given measurement path to the schema tree, in list order.
 *
 * @param measurementPathList measurement paths to add
 */
public void appendMeasurementPaths(List<MeasurementPath> measurementPathList) {
  measurementPathList.forEach(this::appendSingleMeasurementPath);
}
/**
 * Appends one measurement path to the schema tree, taking schema, tags, alias and alignment from
 * the path itself. The alias is passed on only if the path reports that one exists.
 *
 * @param measurementPath measurement path to add
 */
public void appendSingleMeasurementPath(MeasurementPath measurementPath) {
  IMeasurementSchema schema = measurementPath.getMeasurementSchema();
  Map<String, String> tags = measurementPath.getTagMap();
  String alias = null;
  if (measurementPath.isMeasurementAliasExists()) {
    alias = measurementPath.getMeasurementAlias();
  }
  boolean aligned = measurementPath.isUnderAlignedEntity();
  appendSingleMeasurement(measurementPath, schema, tags, alias, aligned);
}
/**
 * Appends one measurement to the schema tree, creating every missing node along the path: the
 * last node becomes a measurement node, the second-to-last an entity node, all others internal
 * nodes. An existing non-entity node at the device level is replaced by an entity node. An
 * already existing measurement node is left untouched.
 *
 * @param path full path of the measurement; the first node is skipped during traversal
 * @param schema schema stored in a newly created measurement node
 * @param tagMap tags stored in a newly created measurement node
 * @param alias alias of the measurement, or {@code null} if it has none
 * @param isAligned alignment applied to a newly created or replaced device entity node
 */
public void appendSingleMeasurement(
    PartialPath path,
    IMeasurementSchema schema,
    Map<String, String> tagMap,
    String alias,
    boolean isAligned) {
  String[] pathNodes = path.getNodes();
  final int measurementLevel = pathNodes.length - 1;
  final int deviceLevel = pathNodes.length - 2;

  SchemaNode parent = root;
  for (int level = 1; level < pathNodes.length; level++) {
    String nodeName = pathNodes[level];
    SchemaNode node = parent.getChild(nodeName);
    if (node == null) {
      if (level == measurementLevel) {
        SchemaMeasurementNode leaf = new SchemaMeasurementNode(nodeName, schema);
        if (alias != null) {
          leaf.setAlias(alias);
          // At this level the parent is the device node, so it is used as an entity.
          parent.getAsEntityNode().addAliasChild(alias, leaf);
        }
        leaf.setTagMap(tagMap);
        node = leaf;
        if (schema.isLogicalView()) {
          this.hasLogicalMeasurementPath = true;
        }
      } else if (level == deviceLevel) {
        SchemaEntityNode device = new SchemaEntityNode(nodeName);
        device.setAligned(isAligned);
        node = device;
      } else {
        node = new SchemaInternalNode(nodeName);
      }
      parent.addChild(nodeName, node);
    } else if (level == deviceLevel && !node.isEntity()) {
      // The node exists but is not a device yet: swap in an entity node under the same name.
      SchemaEntityNode promoted = new SchemaEntityNode(nodeName);
      parent.replaceChild(nodeName, promoted);
      if (!promoted.isAligned()) {
        promoted.setAligned(isAligned);
      }
      node = promoted;
    }
    parent = node;
  }
  hasNormalTimeSeries = true;
}
/**
 * Merges another schema tree into this one. Only {@code ClusterSchemaTree} instances are merged;
 * any other implementation is ignored.
 *
 * @param schemaTree the tree to merge in
 */
@Override
public void mergeSchemaTree(ISchemaTree schemaTree) {
  if (!(schemaTree instanceof ClusterSchemaTree)) {
    return;
  }
  ClusterSchemaTree other = (ClusterSchemaTree) schemaTree;
  this.mergeSchemaTree(other);
}
/** Returns the flag that is set once a measurement has been appended, merged or deserialized. */
@Override
public boolean hasNormalTimeSeries() {
  return this.hasNormalTimeSeries;
}
/**
 * Returns the templates registered in this tree.
 *
 * @return a new list holding the values of the template map; modifying it does not affect the
 *     tree
 */
@Override
public List<Template> getUsingTemplates() {
  List<Template> usedTemplates = new ArrayList<>(templateMap.values());
  return usedTemplates;
}
/**
 * Collects the paths of the devices that use the given template, searching the whole tree with
 * {@code ALL_MATCH_PATTERN}.
 *
 * @param templateId id of the template to look for
 * @return paths of the devices found by the visitor
 */
@Override
public List<PartialPath> getDeviceUsingTemplate(int templateId) {
  try (SchemaTreeDeviceUsingTemplateVisitor templateDeviceVisitor =
      SchemaTreeVisitorFactory.createSchemaTreeDeviceUsingTemplateVisitor(
          root, ALL_MATCH_PATTERN, templateId)) {
    List<PartialPath> devicePaths = templateDeviceVisitor.getAllResult();
    return devicePaths;
  }
}
/**
 * Merges another cluster schema tree into this one: nodes, template map and both type flags.
 * Template entries of the other tree overwrite entries with the same id in this tree.
 *
 * @param schemaTree the tree to merge in
 */
public void mergeSchemaTree(ClusterSchemaTree schemaTree) {
  if (!this.hasLogicalMeasurementPath && schemaTree.hasLogicalViewMeasurement()) {
    this.hasLogicalMeasurementPath = true;
  }
  // The root has no parent, hence the null.
  traverseAndMerge(this.root, null, schemaTree.root);
  this.templateMap.putAll(schemaTree.templateMap);
  if (schemaTree.hasNormalTimeSeries) {
    this.hasNormalTimeSeries = true;
  }
}
/**
 * Recursively merges the subtree rooted at {@code thatNode} (taken from another tree) into
 * {@code thisNode} (the corresponding node of this tree).
 *
 * <p>Children of {@code thatNode} that are missing under {@code thisNode} are attached by
 * reference, not copied, so both trees share those node instances afterwards. Children present
 * on both sides are merged recursively.
 *
 * @param thisNode node of this tree that corresponds to {@code thatNode}
 * @param thisParent parent of {@code thisNode}; {@code null} when called for the root. It is
 *     dereferenced only when {@code thisNode} has to be replaced by an entity node.
 * @param thatNode node of the other tree whose content is merged in
 */
private void traverseAndMerge(SchemaNode thisNode, SchemaNode thisParent, SchemaNode thatNode) {
SchemaNode thisChild;
SchemaEntityNode entityNode;
// Merge template device: if the other side is an entity that uses a template, make sure this
// side is an entity too and take over the template id. Only the template id is transferred in
// this branch; the aligned flag is not touched here.
if (thatNode.isEntity() && thatNode.getAsEntityNode().getTemplateId() != NON_TEMPLATE) {
if (thisNode.isEntity()) {
entityNode = thisNode.getAsEntityNode();
} else {
// Swap the non-entity node for a fresh entity node under the same name.
// NOTE(review): presumably replaceChild carries the old node's children over - confirm.
entityNode = new SchemaEntityNode(thisNode.getName());
thisParent.replaceChild(thisNode.getName(), entityNode);
thisNode = entityNode;
}
entityNode.setTemplateId(thatNode.getAsEntityNode().getTemplateId());
}
// Merge normal device, internal node and measurement.
for (SchemaNode thatChild : thatNode.getChildren().values()) {
thisChild = thisNode.getChild(thatChild.getName());
if (thisChild == null) {
// Missing on this side: attach the other tree's child (and its whole subtree) as-is. This
// happens before thisNode is possibly replaced below, so the statement order matters.
thisNode.addChild(thatChild.getName(), thatChild);
if (thatChild.isMeasurement()) {
// A node with a measurement child is used as an entity on the other side.
SchemaEntityNode thatEntity = thatNode.getAsEntityNode();
if (thisNode.isEntity()) {
entityNode = thisNode.getAsEntityNode();
} else {
// This side must become an entity as well; later iterations work on the new node.
entityNode = new SchemaEntityNode(thisNode.getName());
thisParent.replaceChild(thisNode.getName(), entityNode);
thisNode = entityNode;
}
// An already aligned entity stays aligned; otherwise adopt the other side's flag.
if (!entityNode.isAligned()) {
entityNode.setAligned(thatEntity.isAligned());
}
SchemaMeasurementNode measurementNode = thatChild.getAsMeasurementNode();
// Register the measurement under its alias on the entity node, if it has one.
if (measurementNode.getAlias() != null) {
entityNode.addAliasChild(measurementNode.getAlias(), measurementNode);
}
}
} else {
// Present on both sides: descend and merge the two subtrees.
traverseAndMerge(thisChild, thisNode, thatChild);
}
}
}
/** Returns the flag recording whether this schema tree contains a logical view measurement. */
@Override
public boolean hasLogicalViewMeasurement() {
  return hasLogicalMeasurementPath;
}
/**
 * Writes this tree to the stream by serializing its root node.
 *
 * @param outputStream target stream
 * @throws IOException if the root node's serialization reports one
 */
public void serialize(OutputStream outputStream) throws IOException {
  this.root.serialize(outputStream);
}
/**
 * Rebuilds a schema tree from a stream. Nodes are expected children-first: measurement nodes are
 * pushed on a stack, and every internal or entity node is followed by its child count and pops
 * that many previously read nodes as its children. Reading continues for as long as {@code
 * inputStream.available()} reports remaining bytes.
 *
 * <p>Templates of entity nodes that use one are looked up in the template manager and stored in
 * the resulting tree's template map.
 *
 * @param inputStream stream to read the serialized nodes from
 * @return the rebuilt tree; its root is the node left on top of the stack, which is {@code
 *     null} if nothing was read
 * @throws IOException if reading from the stream fails
 */
public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException {
  Deque<SchemaNode> pendingNodes = new ArrayDeque<>();
  Map<Integer, Template> usedTemplates = new HashMap<>();
  boolean sawLogicalView = false;
  boolean sawMeasurement = false;

  while (inputStream.available() > 0) {
    byte nodeType = ReadWriteIOUtils.readByte(inputStream);

    if (nodeType == SCHEMA_MEASUREMENT_NODE) {
      SchemaMeasurementNode leaf = SchemaMeasurementNode.deserialize(inputStream);
      pendingNodes.push(leaf);
      sawLogicalView |= leaf.isLogicalView();
      sawMeasurement = true;
      continue;
    }

    SchemaInternalNode parent;
    if (nodeType == SCHEMA_ENTITY_NODE) {
      parent = SchemaEntityNode.deserialize(inputStream);
      int templateId = parent.getAsEntityNode().getTemplateId();
      if (templateId != NON_TEMPLATE) {
        usedTemplates.putIfAbsent(templateId, templateManager.getTemplate(templateId));
      }
    } else {
      parent = SchemaInternalNode.deserialize(inputStream);
    }

    // The children of this node were read earlier and are waiting on the stack.
    for (int remaining = ReadWriteIOUtils.readInt(inputStream); remaining > 0; remaining--) {
      SchemaNode child = pendingNodes.pop();
      parent.addChild(child.getName(), child);
      if (child.isMeasurement()) {
        SchemaMeasurementNode measurementChild = child.getAsMeasurementNode();
        if (measurementChild.getAlias() != null) {
          parent.getAsEntityNode().addAliasChild(measurementChild.getAlias(), measurementChild);
        }
      }
    }
    pendingNodes.push(parent);
  }

  ClusterSchemaTree tree = new ClusterSchemaTree(pendingNodes.poll());
  tree.templateMap = usedTemplates;
  tree.hasLogicalMeasurementPath = sawLogicalView;
  tree.hasNormalTimeSeries = sawMeasurement;
  return tree;
}
/**
 * Get database name by path
 *
 * <p>e.g., root.sg1 is a database and path = root.sg1.d1, return root.sg1
 *
 * <p>If no databases have been set on this tree (the set is still {@code null}), this behaves
 * exactly as if the set were empty and reports that no database matched.
 *
 * @param pathName only full path, cannot be path pattern
 * @return database in the given path
 * @throws SemanticException no matched database
 */
@Override
public String getBelongedDatabase(String pathName) {
  // databases has no initializer and stays null until setDatabases is called (e.g. on a tree
  // built by deserialize). Guard it so callers get the documented SemanticException instead of
  // a NullPointerException from the for-each loop.
  if (databases != null) {
    for (String database : databases) {
      if (PathUtils.isStartWith(pathName, database)) {
        return database;
      }
    }
  }
  throw new SemanticException("No matched database. Please check the path " + pathName);
}
/**
 * Gets the database a path belongs to, based on the path's full path string.
 *
 * @param path only full path, cannot be path pattern
 * @return database in the given path
 */
@Override
public String getBelongedDatabase(PartialPath path) {
  String fullPath = path.getFullPath();
  return getBelongedDatabase(fullPath);
}
/**
 * Returns the database set of this tree.
 *
 * @return the stored set itself (not a copy); {@code null} if {@code setDatabases} has not been
 *     called
 */
@Override
public Set<String> getDatabases() {
  return this.databases;
}
/**
 * Sets the database names that {@code getBelongedDatabase} searches.
 *
 * @param databases database names; the reference is stored as-is, not copied
 */
@Override
public void setDatabases(Set<String> databases) {
this.databases = databases;
}
/** Exposes the root node of this tree to tests. */
@TestOnly
SchemaNode getRoot() {
  return this.root;
}
/**
 * Tells whether this tree holds anything below its root.
 *
 * @return true if the root's child map is {@code null} or empty
 */
@Override
public boolean isEmpty() {
  if (root.getChildren() == null) {
    return true;
  }
  return root.getChildren().isEmpty();
}
}