blob: 4182a1cb290c9821b2738a928fe892d736ec0b24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.common.schematree;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaEntityNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
import org.apache.iotdb.db.mpp.common.schematree.visitor.SchemaTreeDeviceVisitor;
import org.apache.iotdb.db.mpp.common.schematree.visitor.SchemaTreeMeasurementVisitor;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_MATCH_PATTERN;
import static org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode.SCHEMA_ENTITY_NODE;
import static org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode.SCHEMA_MEASUREMENT_NODE;
public class ClusterSchemaTree implements ISchemaTree {
private List<String> storageGroups;
private final SchemaNode root;
public ClusterSchemaTree() {
root = new SchemaInternalNode(PATH_ROOT);
}
public ClusterSchemaTree(SchemaNode root) {
this.root = root;
}
/**
* Return all measurement paths for given path pattern and filter the result by slimit and offset.
*
* @param pathPattern can be a pattern or a full path of timeseries.
* @param isPrefixMatch if true, the path pattern is used to match prefix path
* @return Left: all measurement paths; Right: remaining series offset
*/
@Override
public Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(
PartialPath pathPattern, int slimit, int soffset, boolean isPrefixMatch) {
SchemaTreeMeasurementVisitor visitor =
new SchemaTreeMeasurementVisitor(root, pathPattern, slimit, soffset, isPrefixMatch);
return new Pair<>(visitor.getAllResult(), visitor.getNextOffset());
}
@Override
public Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(PartialPath pathPattern) {
SchemaTreeMeasurementVisitor visitor =
new SchemaTreeMeasurementVisitor(
root,
pathPattern,
IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1,
0,
false);
return new Pair<>(visitor.getAllResult(), visitor.getNextOffset());
}
@Override
public List<MeasurementPath> getAllMeasurement() {
return searchMeasurementPaths(ALL_MATCH_PATTERN, 0, 0, false).left;
}
/**
* Get all device matching the path pattern.
*
* @param pathPattern the pattern of the target devices.
* @return A HashSet instance which stores info of the devices matching the given path pattern.
*/
@Override
public List<DeviceSchemaInfo> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch) {
SchemaTreeDeviceVisitor visitor = new SchemaTreeDeviceVisitor(root, pathPattern, isPrefixMatch);
return visitor.getAllResult();
}
@Override
public List<DeviceSchemaInfo> getMatchedDevices(PartialPath pathPattern) {
SchemaTreeDeviceVisitor visitor = new SchemaTreeDeviceVisitor(root, pathPattern, false);
return visitor.getAllResult();
}
@Override
public DeviceSchemaInfo searchDeviceSchemaInfo(
PartialPath devicePath, List<String> measurements) {
String[] nodes = devicePath.getNodes();
SchemaNode cur = root;
for (int i = 1; i < nodes.length; i++) {
if (cur == null) {
return null;
}
cur = cur.getChild(nodes[i]);
}
if (cur == null) {
return null;
}
List<MeasurementSchemaInfo> measurementSchemaInfoList = new ArrayList<>();
SchemaNode node;
SchemaMeasurementNode measurementNode;
for (String measurement : measurements) {
node = cur.getChild(measurement);
if (node == null) {
measurementSchemaInfoList.add(null);
} else {
measurementNode = node.getAsMeasurementNode();
measurementSchemaInfoList.add(
new MeasurementSchemaInfo(
measurementNode.getName(),
measurementNode.getSchema(),
measurementNode.getAlias()));
}
}
return new DeviceSchemaInfo(
devicePath, cur.getAsEntityNode().isAligned(), measurementSchemaInfoList);
}
public void appendMeasurementPaths(List<MeasurementPath> measurementPathList) {
for (MeasurementPath measurementPath : measurementPathList) {
appendSingleMeasurementPath(measurementPath);
}
}
private void appendSingleMeasurementPath(MeasurementPath measurementPath) {
appendSingleMeasurement(
measurementPath,
(MeasurementSchema) measurementPath.getMeasurementSchema(),
measurementPath.isMeasurementAliasExists() ? measurementPath.getMeasurementAlias() : null,
measurementPath.isUnderAlignedEntity());
}
public void appendSingleMeasurement(
PartialPath path, MeasurementSchema schema, String alias, boolean isAligned) {
String[] nodes = path.getNodes();
SchemaNode cur = root;
SchemaNode child;
for (int i = 1; i < nodes.length; i++) {
child = cur.getChild(nodes[i]);
if (child == null) {
if (i == nodes.length - 1) {
SchemaMeasurementNode measurementNode = new SchemaMeasurementNode(nodes[i], schema);
if (alias != null) {
measurementNode.setAlias(alias);
cur.getAsEntityNode().addAliasChild(alias, measurementNode);
}
child = measurementNode;
} else if (i == nodes.length - 2) {
SchemaEntityNode entityNode = new SchemaEntityNode(nodes[i]);
entityNode.setAligned(isAligned);
child = entityNode;
} else {
child = new SchemaInternalNode(nodes[i]);
}
cur.addChild(nodes[i], child);
} else if (i == nodes.length - 2 && !child.isEntity()) {
SchemaEntityNode entityNode = new SchemaEntityNode(nodes[i]);
cur.replaceChild(nodes[i], entityNode);
child = entityNode;
}
cur = child;
}
}
public void mergeSchemaTree(ClusterSchemaTree schemaTree) {
traverseAndMerge(this.root, null, schemaTree.root);
}
private void traverseAndMerge(SchemaNode thisNode, SchemaNode thisParent, SchemaNode thatNode) {
SchemaNode thisChild;
for (SchemaNode thatChild : thatNode.getChildren().values()) {
thisChild = thisNode.getChild(thatChild.getName());
if (thisChild == null) {
thisNode.addChild(thatChild.getName(), thatChild);
if (thatChild.isMeasurement()) {
SchemaEntityNode entityNode;
if (thisNode.isEntity()) {
entityNode = thisNode.getAsEntityNode();
} else {
entityNode = new SchemaEntityNode(thisNode.getName());
thisParent.replaceChild(thisNode.getName(), entityNode);
thisNode = entityNode;
}
if (!entityNode.isAligned()) {
entityNode.setAligned(thatNode.getAsEntityNode().isAligned());
}
SchemaMeasurementNode measurementNode = thatChild.getAsMeasurementNode();
if (measurementNode.getAlias() != null) {
entityNode.addAliasChild(measurementNode.getAlias(), measurementNode);
}
}
} else {
traverseAndMerge(thisChild, thisNode, thatChild);
}
}
}
public void serialize(OutputStream outputStream) throws IOException {
root.serialize(outputStream);
}
public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException {
byte nodeType;
int childNum;
Deque<SchemaNode> stack = new ArrayDeque<>();
SchemaNode child;
while (inputStream.available() > 0) {
nodeType = ReadWriteIOUtils.readByte(inputStream);
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
stack.push(measurementNode);
} else {
SchemaInternalNode internalNode;
if (nodeType == SCHEMA_ENTITY_NODE) {
internalNode = SchemaEntityNode.deserialize(inputStream);
} else {
internalNode = SchemaInternalNode.deserialize(inputStream);
}
childNum = ReadWriteIOUtils.readInt(inputStream);
while (childNum > 0) {
child = stack.pop();
internalNode.addChild(child.getName(), child);
if (child.isMeasurement()) {
SchemaMeasurementNode measurementNode = child.getAsMeasurementNode();
if (measurementNode.getAlias() != null) {
internalNode
.getAsEntityNode()
.addAliasChild(measurementNode.getAlias(), measurementNode);
}
}
childNum--;
}
stack.push(internalNode);
}
}
return new ClusterSchemaTree(stack.poll());
}
/**
* Get storage group name by path
*
* <p>e.g., root.sg1 is a storage group and path = root.sg1.d1, return root.sg1
*
* @param pathName only full path, cannot be path pattern
* @return storage group in the given path
*/
@Override
public String getBelongedStorageGroup(String pathName) {
for (String storageGroup : storageGroups) {
if (PathUtils.isStartWith(pathName, storageGroup)) {
return storageGroup;
}
}
throw new RuntimeException("No matched storage group. Please check the path " + pathName);
}
@Override
public String getBelongedStorageGroup(PartialPath path) {
return getBelongedStorageGroup(path.getFullPath());
}
@Override
public List<String> getStorageGroups() {
return storageGroups;
}
public void setStorageGroups(List<String> storageGroups) {
this.storageGroups = storageGroups;
}
@TestOnly
SchemaNode getRoot() {
return root;
}
@Override
public boolean isEmpty() {
return root.getChildren() == null || root.getChildren().size() == 0;
}
}