/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.MNodeTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor.StorageGroupFilter;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.IDTableManager;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.mnode.MNodeType;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaRegionUtils;
import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.REntityMNode;
import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeValueType;
import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMeasurementMNode;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.utils.EncodingInferenceUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import com.google.common.collect.MapMaker;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.rocksdb.Holder;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ALL_NODE_TYPE_ARRAY;
import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_NODE_VALUE;
import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_IS_ALIGNED;
import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ALIAS;
import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ENTITY;
import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_MEASUREMENT;
import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ROOT_STRING;
import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.TABLE_NAME_TAGS;
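/**
 * Schema region implementation backed by RocksDB: instead of keeping the whole MTree in memory,
 * every metadata node is persisted as a key/value pair whose key encodes the node type, the level
 * and the path. Per-level-path ReentrantLocks plus a region-wide read/write lock guard concurrent
 * mutation.
 */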
public class RSchemaRegion implements ISchemaRegion {
private static final Logger logger = LoggerFactory.getLogger(RSchemaRegion.class);
protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
// TODO: make it configurable
public static final int MAX_PATH_DEPTH = 10;
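// maximum time (in milliseconds) any tryLock call in this class waits before giving up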
private static final long MAX_LOCK_WAIT_TIME = 50;
private final RSchemaReadWriteHandler readWriteHandler;
private final ReentrantReadWriteLock deleteUpdateLock = new ReentrantReadWriteLock();
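// fine-grained locks keyed by level path; weak values let the JVM reclaim a lock object once
// no thread holds a reference to it anymore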
private final Map<String, ReentrantLock> locksPool =
new MapMaker().weakValues().initialCapacity(10000).makeMap();
private String schemaRegionDirPath;
private String storageGroupFullPath;
private SchemaRegionId schemaRegionId;
private IStorageGroupMNode storageGroupMNode;
private int storageGroupPathLevel;
public RSchemaRegion() throws MetadataException {
try {
readWriteHandler = new RSchemaReadWriteHandler();
} catch (RocksDBException e) {
logger.error("create RocksDBReadWriteHandler fail", e);
throw new MetadataException(e);
}
}
public RSchemaRegion(
PartialPath storageGroup,
SchemaRegionId schemaRegionId,
IStorageGroupMNode storageGroupMNode,
RSchemaConfLoader rSchemaConfLoader)
throws MetadataException {
this.schemaRegionId = schemaRegionId;
storageGroupFullPath = storageGroup.getFullPath();
this.storageGroupMNode = storageGroupMNode;
init();
try {
readWriteHandler = new RSchemaReadWriteHandler(schemaRegionDirPath, rSchemaConfLoader);
} catch (RocksDBException e) {
logger.error("create RocksDBReadWriteHandler fail", e);
throw new MetadataException(e);
}
}
@Override
public void init() throws MetadataException {
schemaRegionDirPath =
config.getSchemaDir()
+ File.separator
+ storageGroupFullPath
+ File.separator
+ schemaRegionId.getId();
File schemaRegionFolder = SystemFileFactory.INSTANCE.getFile(schemaRegionDirPath);
if (!schemaRegionFolder.exists()) {
if (schemaRegionFolder.mkdirs()) {
logger.info("created schema region folder {}", schemaRegionDirPath);
} else if (!schemaRegionFolder.exists()) {
// mkdirs can lose a race with another thread; only a folder that still doesn't exist
// counts as a real failure
logger.error("Failed to create schema region folder {}.", schemaRegionDirPath);
throw new SchemaDirCreationFailureException(schemaRegionDirPath);
}
}
storageGroupPathLevel = RSchemaUtils.getLevelByPartialPath(storageGroupFullPath);
}
@Override
public void forceMlog() {
// do nothing
}
@Override
public SchemaRegionId getSchemaRegionId() {
return schemaRegionId;
}
@Override
public String getStorageGroupFullPath() {
return storageGroupFullPath;
}
@Override
public void deleteSchemaRegion() throws MetadataException {
clear();
SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
}
@Override
public boolean createSnapshot(File snapshotDir) {
// todo implement this
throw new UnsupportedOperationException(
"Rocksdb mode currently doesn't support snapshot feature.");
}
@Override
public void loadSnapshot(File latestSnapshotRootDir) {
// todo implement this
throw new UnsupportedOperationException(
"Rocksdb mode currently doesn't support snapshot feature.");
}
@Override
public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
try {
if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
createTimeseries(
plan.getPath(),
new MeasurementSchema(
plan.getPath().getMeasurement(),
plan.getDataType(),
plan.getEncoding(),
plan.getCompressor(),
plan.getProps()),
plan.getAlias(),
plan.getTags(),
plan.getAttributes());
// update id table if id table log file is disabled
if (config.isEnableIDTable() && !config.isEnableIDTableLogFile()) {
IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPath().getDevicePath());
idTable.createTimeseries(plan);
}
} else {
throw new AcquireLockTimeoutException(
"Acquire lock timeout when creating timeseries: " + plan.getPath().getFullPath());
}
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
} finally {
// tryLock may have failed or been interrupted, so only unlock when this thread holds the lock
if (deleteUpdateLock.getReadHoldCount() > 0) {
deleteUpdateLock.readLock().unlock();
}
}
}
@TestOnly
protected void createTimeseries(
PartialPath path,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor,
Map<String, String> props)
throws MetadataException {
createTimeseries(path, dataType, encoding, compressor, props, null);
}
@TestOnly
protected void createTimeseries(
PartialPath path,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor,
Map<String, String> props,
String alias)
throws MetadataException {
createTimeseries(
path,
new MeasurementSchema(path.getMeasurement(), dataType, encoding, compressor, props),
alias,
null,
null);
}
protected void createTimeseries(
PartialPath path,
IMeasurementSchema schema,
String alias,
Map<String, String> tags,
Map<String, String> attributes)
throws MetadataException {
// regular check
if (path.getNodes().length > RSchemaRegion.MAX_PATH_DEPTH) {
throw new IllegalPathException(
String.format(
"path is too long, provide: %d, max: %d",
path.getNodeLength(), RSchemaRegion.MAX_PATH_DEPTH));
}
MetaFormatUtils.checkTimeseries(path);
MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), schema.getProps());
// sg check and create
String[] nodes = path.getNodes();
SchemaUtils.checkDataTypeWithEncoding(schema.getType(), schema.getEncodingType());
try {
createTimeSeriesRecursively(
nodes, nodes.length, storageGroupPathLevel, schema, alias, tags, attributes);
// TODO: load tags to memory
} catch (RocksDBException | IOException e) {
throw new MetadataException(e);
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
}
}
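/**
 * Creates the measurement node and any missing ancestors. The recursion descends from the leaf
 * (start == nodes.length) toward the storage group level (end) and creates nodes on the way back
 * up, so ancestors always exist before their children: the leaf becomes a measurement node, its
 * parent an entity node, and all other missing levels internal nodes.
 */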
private void createTimeSeriesRecursively(
String[] nodes,
int start,
int end,
IMeasurementSchema schema,
String alias,
Map<String, String> tags,
Map<String, String> attributes)
throws InterruptedException, MetadataException, RocksDBException, IOException {
if (start <= end) {
// "ROOT" node must exist and don't need to check
return;
}
String levelPath = RSchemaUtils.getLevelPath(nodes, start - 1);
Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
try {
CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath);
if (!checkResult.existAnyKey()) {
createTimeSeriesRecursively(nodes, start - 1, end, schema, alias, tags, attributes);
if (start == nodes.length) {
createTimeSeriesNode(nodes, levelPath, schema, alias, tags, attributes);
} else if (start == nodes.length - 1) {
readWriteHandler.createNode(levelPath, RMNodeType.ENTITY, DEFAULT_NODE_VALUE);
} else {
readWriteHandler.createNode(levelPath, RMNodeType.INTERNAL, DEFAULT_NODE_VALUE);
}
} else {
if (start == nodes.length) {
throw new PathAlreadyExistException(RSchemaUtils.getPathByLevelPath(levelPath));
}
if (start == nodes.length - 1) {
if (checkResult.getResult(RMNodeType.INTERNAL)) {
// convert the parent node to entity if it is internal node
readWriteHandler.convertToEntityNode(levelPath, DEFAULT_NODE_VALUE);
} else if (checkResult.getResult(RMNodeType.ENTITY)) {
if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
throw new AlignedTimeseriesException(
"Timeseries under this entity is aligned, please use createAlignedTimeseries"
+ " or change entity.",
RSchemaUtils.getPathByLevelPath(levelPath));
}
} else {
throw new MNodeTypeMismatchException(
RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.ENTITY_MNODE_TYPE);
}
}
if (checkResult.getResult(RMNodeType.MEASUREMENT)
|| checkResult.getResult(RMNodeType.ALISA)) {
throw new MNodeTypeMismatchException(
RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.INTERNAL_MNODE_TYPE);
}
}
} finally {
lock.unlock();
}
} else {
// tryLock failed, so this thread never acquired the lock and must not unlock it
throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
}
}
private void createTimeSeriesNode(
String[] nodes,
String levelPath,
IMeasurementSchema schema,
String alias,
Map<String, String> tags,
Map<String, String> attributes)
throws IOException, RocksDBException, MetadataException, InterruptedException {
// create time-series node
try (WriteBatch batch = new WriteBatch()) {
byte[] value = RSchemaUtils.buildMeasurementNodeValue(schema, alias, tags, attributes);
byte[] measurementKey = RSchemaUtils.toMeasurementNodeKey(levelPath);
batch.put(measurementKey, value);
// measurement with tags will save in a separate table at the same time
if (tags != null && !tags.isEmpty()) {
batch.put(
readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS),
measurementKey,
DEFAULT_NODE_VALUE);
}
if (StringUtils.isNotEmpty(alias)) {
String[] aliasNodes = Arrays.copyOf(nodes, nodes.length);
aliasNodes[nodes.length - 1] = alias;
String aliasLevelPath = RSchemaUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
byte[] aliasNodeKey = RSchemaUtils.toAliasNodeKey(aliasLevelPath);
Lock lock = locksPool.computeIfAbsent(aliasLevelPath, x -> new ReentrantLock());
if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
try {
if (!readWriteHandler.keyExistByAllTypes(aliasLevelPath).existAnyKey()) {
batch.put(aliasNodeKey, RSchemaUtils.buildAliasNodeValue(measurementKey));
readWriteHandler.executeBatch(batch);
} else {
throw new AliasAlreadyExistException(
RSchemaUtils.getPathByLevelPath(levelPath), alias);
}
} finally {
lock.unlock();
}
} else {
throw new AcquireLockTimeoutException("acquire lock timeout: " + aliasLevelPath);
}
} else {
readWriteHandler.executeBatch(batch);
}
}
}
private void createAlignedTimeSeries(
PartialPath prefixPath,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings)
throws MetadataException {
if (prefixPath.getNodeLength() > MAX_PATH_DEPTH - 1) {
throw new IllegalPathException(
String.format(
"Prefix path is too long, provide: %d, max: %d",
prefixPath.getNodeLength(), RSchemaRegion.MAX_PATH_DEPTH - 1));
}
MetaFormatUtils.checkTimeseries(prefixPath);
for (int i = 0; i < measurements.size(); i++) {
SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
MetaFormatUtils.checkNodeName(measurements.get(i));
}
try (WriteBatch batch = new WriteBatch()) {
createEntityRecursively(
prefixPath.getNodes(), prefixPath.getNodeLength(), storageGroupPathLevel + 1, true);
String[] locks = new String[measurements.size()];
for (int i = 0; i < measurements.size(); i++) {
String measurement = measurements.get(i);
String levelPath = RSchemaUtils.getMeasurementLevelPath(prefixPath.getNodes(), measurement);
locks[i] = levelPath;
MeasurementSchema schema =
new MeasurementSchema(measurement, dataTypes.get(i), encodings.get(i));
byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
byte[] value = RSchemaUtils.buildMeasurementNodeValue(schema, null, null, null);
batch.put(key, value);
}
for (String lockKey : locks) {
Lock lock = locksPool.computeIfAbsent(lockKey, x -> new ReentrantLock());
if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
try {
if (readWriteHandler.keyExistByAllTypes(lockKey).existAnyKey()) {
throw new PathAlreadyExistException(lockKey);
}
} finally {
lock.unlock();
}
} else {
throw new AcquireLockTimeoutException("acquire lock timeout: " + lockKey);
}
}
readWriteHandler.executeBatch(batch);
// TODO: update cache if necessary
} catch (RocksDBException | IOException e) {
throw new MetadataException(e);
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
}
}
@Override
public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
PartialPath prefixPath = plan.getPrefixPath();
List<String> measurements = plan.getMeasurements();
List<TSDataType> dataTypes = plan.getDataTypes();
List<TSEncoding> encodings = plan.getEncodings();
try {
if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings);
// update the id table if the id table log file is disabled
if (config.isEnableIDTable() && !config.isEnableIDTableLogFile()) {
IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPrefixPath());
idTable.createAlignedTimeseries(plan);
}
} else {
throw new AcquireLockTimeoutException(
"Acquire lock timeout when do createAlignedTimeSeries: " + prefixPath.getFullPath());
}
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
} finally {
if (deleteUpdateLock.getReadHoldCount() > 0) {
deleteUpdateLock.readLock().unlock();
}
}
}
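/**
 * Ensures the device (entity) node and its missing ancestors exist, mirroring
 * createTimeSeriesRecursively: the deepest node becomes an entity node (aligned or not, per the
 * flag) and missing ancestors become internal nodes.
 */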
private void createEntityRecursively(String[] nodes, int start, int end, boolean aligned)
throws RocksDBException, MetadataException, InterruptedException {
if (start <= end) {
// "ROOT" must exist
return;
}
String levelPath = RSchemaUtils.getLevelPath(nodes, start - 1);
Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
try {
CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath);
if (!checkResult.existAnyKey()) {
createEntityRecursively(nodes, start - 1, end, aligned);
if (start == nodes.length) {
byte[] nodeKey = RSchemaUtils.toEntityNodeKey(levelPath);
byte[] value = aligned ? DEFAULT_ALIGNED_ENTITY_VALUE : DEFAULT_NODE_VALUE;
readWriteHandler.createNode(nodeKey, value);
} else {
readWriteHandler.createNode(levelPath, RMNodeType.INTERNAL, DEFAULT_NODE_VALUE);
}
} else {
if (start == nodes.length) {
// make sure the storage group node and entity node are different
// e.g., if 'root.a' is a storage group path, 'root.a.b' cannot be a timeseries
if (checkResult.getResult(RMNodeType.STORAGE_GROUP)) {
throw new MetadataException("Storage Group Node and Entity Node could not be same!");
}
if (!checkResult.getResult(RMNodeType.ENTITY)) {
throw new MNodeTypeMismatchException(
RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.ENTITY_MNODE_TYPE);
}
if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) == 0) {
throw new MetadataException(
"Timeseries under this entity is not aligned, please use createTimeseries or change entity. (Path: "
+ RSchemaUtils.getPathByLevelPath(levelPath)
+ ")");
}
} else if (checkResult.getResult(RMNodeType.MEASUREMENT)
|| checkResult.getResult(RMNodeType.ALISA)) {
throw new MNodeTypeMismatchException(
RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.ENTITY_MNODE_TYPE);
}
}
} finally {
lock.unlock();
}
} else {
throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
}
}
@Override
public Pair<Integer, Set<String>> deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
throws MetadataException {
try {
if (deleteUpdateLock.writeLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
Set<String> failedNames = ConcurrentHashMap.newKeySet();
Set<IMNode> parentNeedsToCheck = ConcurrentHashMap.newKeySet();
AtomicInteger atomicInteger = new AtomicInteger(0);
traverseOutcomeBasins(
pathPattern.getNodes(),
MAX_PATH_DEPTH,
(key, value) -> {
String path = null;
RMeasurementMNode deletedNode;
try {
path = RSchemaUtils.getPathByInnerName(new String(key));
String[] nodes = PathUtils.splitPathToDetachedNodes(path);
deletedNode = new RMeasurementMNode(path, value, readWriteHandler);
atomicInteger.incrementAndGet();
try (WriteBatch batch = new WriteBatch()) {
// delete the last node of path
batch.delete(key);
if (deletedNode.getAlias() != null) {
String[] aliasNodes = Arrays.copyOf(nodes, nodes.length);
aliasNodes[nodes.length - 1] = deletedNode.getAlias();
String aliasLevelPath =
RSchemaUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
batch.delete(RSchemaUtils.toAliasNodeKey(aliasLevelPath));
}
if (deletedNode.getTags() != null && !deletedNode.getTags().isEmpty()) {
batch.delete(
readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS), key);
// TODO: tags invert index update
}
readWriteHandler.executeBatch(batch);
if (!deletedNode.getParent().isStorageGroup()) {
parentNeedsToCheck.add(deletedNode.getParent());
}
}
} catch (Exception e) {
logger.error("delete timeseries [{}] fail", path, e);
failedNames.add(path);
return false;
}
return true;
},
new Character[] {NODE_TYPE_MEASUREMENT});
// delete parent without child after timeseries deleted
while (!parentNeedsToCheck.isEmpty()) {
Set<IMNode> tempSet = ConcurrentHashMap.newKeySet();
parentNeedsToCheck
.parallelStream()
.forEach(
currentNode -> {
if (!currentNode.isStorageGroup()) {
PartialPath parentPath = currentNode.getPartialPath();
int level = parentPath.getNodeLength();
int end = parentPath.getNodeLength() - 1;
if (!readWriteHandler.existAnySiblings(
RSchemaUtils.getLevelPathPrefix(parentPath.getNodes(), end, level))) {
try {
readWriteHandler.deleteNode(
parentPath.getNodes(), RSchemaUtils.typeOfMNode(currentNode));
IMNode parentNode = currentNode.getParent();
if (!parentNode.isStorageGroup()) {
tempSet.add(currentNode.getParent());
}
} catch (Exception e) {
logger.warn("delete {} fail.", parentPath.getFullPath(), e);
}
}
}
});
parentNeedsToCheck.clear();
parentNeedsToCheck.addAll(tempSet);
}
return new Pair<>(atomicInteger.get(), failedNames);
} else {
throw new AcquireLockTimeoutException(
"acquire lock timeout when delete timeseries: " + pathPattern);
}
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
return new Pair<>(0, Collections.emptySet());
} finally {
if (deleteUpdateLock.isWriteLockedByCurrentThread()) {
deleteUpdateLock.writeLock().unlock();
}
}
}
@Override
public int constructSchemaBlackList(PathPatternTree patternTree) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public void rollbackSchemaBlackList(PathPatternTree patternTree) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public List<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public void deleteTimeseriesInBlackList(PathPatternTree patternTree) throws MetadataException {
throw new UnsupportedOperationException();
}
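/**
 * Expands every multi-level wildcard in the pattern into its single-level variants (bounded by
 * maxLevel) and traverses each variant in parallel, applying the callback to every key/value
 * pair of the requested node types.
 */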
private void traverseOutcomeBasins(
String[] nodes,
int maxLevel,
BiFunction<byte[], byte[], Boolean> function,
Character[] nodeTypeArray)
throws IllegalPathException {
List<String[]> allNodesArray = RSchemaUtils.replaceMultiWildcardToSingle(nodes, maxLevel);
allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, function, nodeTypeArray));
}
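/**
 * Scans RocksDB for keys matching a pattern containing only single-level wildcards: literal
 * segments become iterator seek prefixes and suffix filters, wildcard segments are resolved
 * level by level, and the keys surviving one round seed the prefix scans of the next.
 */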
private void traverseByPatternPath(
String[] nodes, BiFunction<byte[], byte[], Boolean> function, Character[] nodeTypeArray) {
int startIndex = 0;
List<String[]> scanKeys = new ArrayList<>();
int indexOfPrefix = indexOfFirstWildcard(nodes, startIndex);
if (indexOfPrefix >= nodes.length) {
Arrays.stream(nodeTypeArray)
.parallel()
.forEach(
x -> {
String levelPrefix =
RSchemaUtils.convertPartialPathToInnerByNodes(nodes, nodes.length - 1, x);
try {
Holder<byte[]> holder = new Holder<>();
readWriteHandler.keyExist(levelPrefix.getBytes(), holder);
if (holder.getValue() != null) {
function.apply(levelPrefix.getBytes(), holder.getValue());
}
} catch (RocksDBException e) {
logger.error(e.getMessage());
}
});
return;
}
startIndex = indexOfPrefix;
String[] seedPath = ArrayUtils.subarray(nodes, 0, indexOfPrefix);
scanKeys.add(seedPath);
while (!scanKeys.isEmpty()) {
int firstNonWildcardIndex = indexOfFirstNonWildcard(nodes, startIndex);
int nextFirstWildcardIndex = indexOfFirstWildcard(nodes, firstNonWildcardIndex);
startIndex = nextFirstWildcardIndex;
int level = nextFirstWildcardIndex - 1;
boolean lastIteration = nextFirstWildcardIndex >= nodes.length;
Character[] nodeType;
if (!lastIteration) {
nodeType = ALL_NODE_TYPE_ARRAY;
} else {
nodeType = nodeTypeArray;
}
Queue<String[]> tempNodes = new ConcurrentLinkedQueue<>();
byte[] suffixToMatch =
RSchemaUtils.getSuffixOfLevelPath(
ArrayUtils.subarray(nodes, firstNonWildcardIndex, nextFirstWildcardIndex), level);
scanKeys
.parallelStream()
.forEach(
prefixNodes -> {
String levelPrefix =
RSchemaUtils.getLevelPathPrefix(prefixNodes, prefixNodes.length - 1, level);
Arrays.stream(nodeType)
.parallel()
.forEach(
x -> {
byte[] startKey = RSchemaUtils.toRocksDBKey(levelPrefix, x);
RocksIterator iterator = readWriteHandler.iterator(null);
iterator.seek(startKey);
while (iterator.isValid()) {
if (!RSchemaUtils.prefixMatch(iterator.key(), startKey)) {
break;
}
if (RSchemaUtils.suffixMatch(iterator.key(), suffixToMatch)) {
if (lastIteration) {
function.apply(iterator.key(), iterator.value());
} else {
tempNodes.add(RSchemaUtils.toMetaNodes(iterator.key()));
}
}
iterator.next();
}
});
});
scanKeys.clear();
scanKeys.addAll(tempNodes);
tempNodes.clear();
}
}
private int indexOfFirstWildcard(String[] nodes, int start) {
int index = start;
for (; index < nodes.length; index++) {
if (IoTDBConstant.ONE_LEVEL_PATH_WILDCARD.equals(nodes[index])
|| IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD.equals(nodes[index])) {
break;
}
}
return index;
}
private int indexOfFirstNonWildcard(String[] nodes, int start) {
int index = start;
for (; index < nodes.length; index++) {
if (!IoTDBConstant.ONE_LEVEL_PATH_WILDCARD.equals(nodes[index])
&& !IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD.equals(nodes[index])) {
break;
}
}
return index;
}
protected Pair<Integer, Set<String>> deleteTimeseries(PartialPath pathPattern)
throws MetadataException {
return deleteTimeseries(pathPattern, false);
}
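/**
 * Returns the device node of the given path; when it does not exist and auto-create is enabled,
 * the entity node (and its ancestors) is created first with the given aligned flag.
 */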
private IMNode getDeviceNodeWithAutoCreate(PartialPath devicePath, boolean aligned)
throws MetadataException {
IMNode node = null;
try {
node = getDeviceNode(devicePath);
return node;
} catch (PathNotExistException e) {
if (!config.isAutoCreateSchemaEnabled()) {
throw new PathNotExistException(devicePath.getFullPath());
}
try {
createEntityRecursively(
devicePath.getNodes(), devicePath.getNodeLength(), storageGroupPathLevel, aligned);
node = getDeviceNode(devicePath);
} catch (RocksDBException ex) {
throw new MetadataException(ex);
} catch (InterruptedException ex) {
logger.warn("Acquire lock interrupted", ex);
Thread.currentThread().interrupt();
}
}
return node;
}
@Override
public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public boolean isPathExist(PartialPath path) throws MetadataException {
if (IoTDBConstant.PATH_ROOT.equals(path.getFullPath())) {
return true;
}
String innerPathName = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
try {
CheckKeyResult checkKeyResult =
readWriteHandler.keyExistByTypes(innerPathName, RMNodeType.values());
if (checkKeyResult.existAnyKey()) {
return true;
}
} catch (RocksDBException e) {
throw new MetadataException(e);
}
return false;
}
@Override
public int getAllTimeseriesCount(PartialPath pathPattern, boolean isPrefixMatch)
throws MetadataException {
return getCountByNodeType(new Character[] {NODE_TYPE_MEASUREMENT}, pathPattern.getNodes());
}
@Override
public int getAllTimeseriesCount(
PartialPath pathPattern, Map<Integer, Template> templateMap, boolean isPrefixMatch)
throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public int getAllTimeseriesCount(
PartialPath pathPattern, boolean isPrefixMatch, String key, String value, boolean isContains)
throws MetadataException {
return getMatchedMeasurementPathWithTags(pathPattern.getNodes()).size();
}
@TestOnly
public int getAllTimeseriesCount(PartialPath pathPattern) throws MetadataException {
return getAllTimeseriesCount(pathPattern, false);
}
@Override
public int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch)
throws MetadataException {
return getCountByNodeType(new Character[] {NODE_TYPE_ENTITY}, pathPattern.getNodes());
}
private int getCountByNodeType(Character[] nodetype, String[] nodes) throws IllegalPathException {
AtomicInteger atomicInteger = new AtomicInteger(0);
BiFunction<byte[], byte[], Boolean> function =
(a, b) -> {
atomicInteger.incrementAndGet();
return true;
};
traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodetype);
return atomicInteger.get();
}
@Override
public int getNodesCountInGivenLevel(PartialPath pathPattern, int level, boolean isPrefixMatch)
throws MetadataException {
// todo support wildcard
if (pathPattern.getFullPath().contains(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
throw new UnsupportedOperationException(
"Wildcards are not currently supported for this operation"
+ " [COUNT NODES pathPattern].");
}
String innerNameByLevel =
RSchemaUtils.getLevelPath(pathPattern.getNodes(), pathPattern.getNodeLength() - 1, level);
AtomicInteger atomicInteger = new AtomicInteger(0);
Function<String, Boolean> function =
s -> {
atomicInteger.incrementAndGet();
return true;
};
Arrays.stream(ALL_NODE_TYPE_ARRAY)
.parallel()
.forEach(
x -> {
String getKeyByInnerNameLevel =
x + innerNameByLevel + RSchemaConstants.PATH_SEPARATOR + level;
readWriteHandler.getKeyByPrefix(getKeyByInnerNameLevel, function);
});
return atomicInteger.get();
}
@Override
public Map<PartialPath, Integer> getMeasurementCountGroupByLevel(
PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
Map<PartialPath, Integer> result = new ConcurrentHashMap<>();
BiFunction<byte[], byte[], Boolean> function =
(a, b) -> {
String key = new String(a);
String partialName = splitToPartialNameByLevel(key, level);
if (partialName != null) {
try {
PartialPath path = new PartialPath(partialName);
// ConcurrentHashMap.merge is atomic, keeping counts correct under the parallel
// traversal and avoiding a null key when the path fails to parse
result.merge(path, 1, Integer::sum);
} catch (IllegalPathException e) {
logger.warn(e.getMessage());
}
}
return true;
};
traverseOutcomeBasins(
pathPattern.getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
return result;
}
@Override
public Map<PartialPath, Integer> getMeasurementCountGroupByLevel(
PartialPath pathPattern,
int level,
boolean isPrefixMatch,
String key,
String value,
boolean isContains)
throws MetadataException {
Map<PartialPath, Integer> result = new ConcurrentHashMap<>();
Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> measurementPathsAndTags =
getMatchedMeasurementPathWithTags(pathPattern.getNodes());
BiFunction<byte[], byte[], Boolean> function;
if (!measurementPathsAndTags.isEmpty()) {
function =
(a, b) -> {
String k = new String(a);
String partialName = splitToPartialNameByLevel(k, level);
if (partialName != null) {
try {
PartialPath path = new PartialPath(partialName);
// keySet() holds MeasurementPath objects, so contains(String) could never match;
// compare by full path instead: count only groups containing a tag-matched measurement
boolean matched =
measurementPathsAndTags.keySet().stream()
.anyMatch(mp -> mp.getFullPath().startsWith(partialName));
if (matched) {
result.merge(path, 1, Integer::sum);
} else {
result.putIfAbsent(path, 0);
}
} catch (IllegalPathException e) {
logger.warn(e.getMessage());
}
}
return true;
};
} else {
function =
(a, b) -> {
String k = new String(a);
String partialName = splitToPartialNameByLevel(k, level);
if (partialName != null) {
try {
// record the group with a zero count; a parse failure no longer inserts a null key
result.putIfAbsent(new PartialPath(partialName), 0);
} catch (IllegalPathException e) {
logger.warn(e.getMessage());
}
}
return true;
};
}
traverseOutcomeBasins(
pathPattern.getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
return result;
}
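/**
 * Rebuilds a "root"-prefixed partial path from an inner key name, keeping only the first
 * {@code level} levels and dropping the character that follows each separator (the level marker
 * in inner names); returns null when the inner name encodes fewer levels than requested.
 */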
private String splitToPartialNameByLevel(String innerName, int level) {
StringBuilder stringBuilder = new StringBuilder(ROOT_STRING);
boolean currentIsFlag;
boolean lastIsFlag = false;
int j = 0;
for (int i = 0; i < innerName.length() && j <= level; i++) {
currentIsFlag = innerName.charAt(i) == '.';
if (currentIsFlag) {
j++;
}
if (j <= 0 || lastIsFlag || (currentIsFlag && j > level)) {
lastIsFlag = false;
continue;
}
stringBuilder.append(innerName.charAt(i));
lastIsFlag = currentIsFlag;
}
if (j < level) {
return null;
}
return stringBuilder.toString();
}
@Override
public List<PartialPath> getNodesListInGivenLevel(
PartialPath pathPattern, int nodeLevel, boolean isPrefixMatch, StorageGroupFilter filter)
throws MetadataException {
if (pathPattern.getFullPath().contains(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
throw new UnsupportedOperationException(
formatNotSupportInfo(Thread.currentThread().getStackTrace()[1].getMethodName()));
}
return getNodesListInGivenLevel(pathPattern, nodeLevel);
}
private String formatNotSupportInfo(String methodName) {
return String.format("[%s] is not currently supported!", methodName);
}
private List<PartialPath> getNodesListInGivenLevel(PartialPath pathPattern, int nodeLevel) {
List<PartialPath> result = Collections.synchronizedList(new ArrayList<>());
Arrays.stream(ALL_NODE_TYPE_ARRAY)
.forEach(
x -> {
if (x == NODE_TYPE_ALIAS) {
return;
}
String innerName =
RSchemaUtils.convertPartialPathToInnerByNodes(
pathPattern.getNodes(), nodeLevel, x);
readWriteHandler
.getAllByPrefix(innerName)
.forEach(
resultByPrefix -> {
try {
result.add(
new PartialPath(RSchemaUtils.getPathByInnerName(resultByPrefix)));
} catch (IllegalPathException e) {
logger.warn(e.getMessage());
}
});
});
return result;
}
@Override
public Set<TSchemaNode> getChildNodePathInNextLevel(PartialPath pathPattern)
throws MetadataException {
// todo support wildcard
if (pathPattern.getFullPath().contains(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
throw new MetadataException(
"Wildcards are not currently supported for this operation"
+ " [SHOW CHILD PATHS pathPattern].");
}
Set<TSchemaNode> result = Collections.synchronizedSet(new HashSet<>());
String innerNameByLevel =
RSchemaUtils.getLevelPath(
pathPattern.getNodes(),
pathPattern.getNodeLength() - 1,
pathPattern.getNodeLength())
+ RSchemaConstants.PATH_SEPARATOR
+ pathPattern.getNodeLength();
Function<String, Boolean> function =
s -> {
result.add(
new TSchemaNode(
RSchemaUtils.getPathByInnerName(s), MNodeType.UNIMPLEMENT.getNodeType()));
return true;
};
Arrays.stream(ALL_NODE_TYPE_ARRAY)
.parallel()
.forEach(x -> readWriteHandler.getKeyByPrefix(x + innerNameByLevel, function));
return result;
}
@Override
public Set<String> getChildNodeNameInNextLevel(PartialPath pathPattern) throws MetadataException {
Set<String> childPath =
getChildNodePathInNextLevel(pathPattern).stream()
.map(node -> node.getNodeName())
.collect(Collectors.toSet());
Set<String> childName = new HashSet<>();
for (String str : childPath) {
childName.add(str.substring(str.lastIndexOf(RSchemaConstants.PATH_SEPARATOR) + 1));
}
return childName;
}
@Override
public Set<PartialPath> getBelongedDevices(PartialPath timeseries) throws MetadataException {
Set<PartialPath> result = Collections.synchronizedSet(new HashSet<>());
BiFunction<byte[], byte[], Boolean> function =
(a, b) -> {
String path = new String(a);
PartialPath partialPath;
try {
partialPath =
new PartialPath(path.substring(0, path.lastIndexOf(IoTDBConstant.PATH_SEPARATOR)));
} catch (IllegalPathException e) {
return false;
}
result.add(partialPath);
return true;
};
traverseOutcomeBasins(
timeseries.getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_ENTITY});
return result;
}
@Override
public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
throws MetadataException {
Set<PartialPath> allPath = new HashSet<>();
getMatchedPathByNodeType(pathPattern.getNodes(), new Character[] {NODE_TYPE_ENTITY}, allPath);
return allPath;
}
private void getMatchedPathByNodeType(
String[] nodes, Character[] nodetype, Collection<PartialPath> collection)
throws IllegalPathException {
List<String> allResult = Collections.synchronizedList(new ArrayList<>());
BiFunction<byte[], byte[], Boolean> function =
(a, b) -> {
allResult.add(RSchemaUtils.getPathByInnerName(new String(a)));
return true;
};
traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodetype);
for (String path : allResult) {
collection.add(new PartialPath(path));
}
}
@Override
public Pair<List<ShowDevicesResult>, Integer> getMatchedDevices(ShowDevicesPlan plan)
throws MetadataException {
List<ShowDevicesResult> res = Collections.synchronizedList(new ArrayList<>());
BiFunction<byte[], byte[], Boolean> function =
(a, b) -> {
String fullPath = RSchemaUtils.getPathByInnerName(new String(a));
res.add(new ShowDevicesResult(fullPath, RSchemaUtils.isAligned(b), storageGroupFullPath));
return true;
};
traverseOutcomeBasins(
plan.getPath().getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_ENTITY});
// todo Page query, record offset
return new Pair<>(res, 1);
}
@Override
public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
throws MetadataException {
List<MeasurementPath> allResult = Collections.synchronizedList(new ArrayList<>());
BiFunction<byte[], byte[], Boolean> function =
(a, b) -> {
allResult.add(
new RMeasurementMNode(
RSchemaUtils.getPathByInnerName(new String(a)), b, readWriteHandler)
.getMeasurementPath());
return true;
};
traverseOutcomeBasins(
pathPattern.getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
return allResult;
}
@Override
public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
throws MetadataException {
// todo page query
return new Pair<>(getMeasurementPaths(pathPattern, isPrefixMatch), offset + limit);
}
@Override
public List<MeasurementPath> fetchSchema(
PartialPath pathPattern, Map<Integer, Template> templateMap) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public Pair<List<ShowTimeSeriesResult>, Integer> showTimeseries(
ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
if (plan.getKey() != null && plan.getValue() != null) {
return showTimeseriesWithIndex(plan, context);
} else {
return showTimeseriesWithoutIndex(plan, context);
}
}
private Pair<List<ShowTimeSeriesResult>, Integer> showTimeseriesWithIndex(
ShowTimeSeriesPlan plan, QueryContext context) {
// temporarily unsupported
throw new UnsupportedOperationException(
formatNotSupportInfo(Thread.currentThread().getStackTrace()[1].getMethodName()));
}
private Pair<List<ShowTimeSeriesResult>, Integer> showTimeseriesWithoutIndex(
ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
List<ShowTimeSeriesResult> res = new LinkedList<>();
Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> measurementPathsAndTags =
getMatchedMeasurementPathWithTags(plan.getPath().getNodes());
for (Entry<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> entry :
measurementPathsAndTags.entrySet()) {
MeasurementPath measurementPath = entry.getKey();
res.add(
new ShowTimeSeriesResult(
measurementPath.getFullPath(),
measurementPath.getMeasurementAlias(),
storageGroupFullPath,
measurementPath.getMeasurementSchema().getType(),
measurementPath.getMeasurementSchema().getEncodingType(),
measurementPath.getMeasurementSchema().getCompressor(),
0,
entry.getValue().left,
entry.getValue().right));
}
// todo Page query, record offset
return new Pair<>(res, 1);
}
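/**
 * Collects every measurement matching the pattern together with its tags and attributes decoded
 * from the stored node value; missing tag or attribute blocks are normalized to empty maps.
 */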
private Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>>
getMatchedMeasurementPathWithTags(String[] nodes) throws IllegalPathException {
Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> allResult =
new ConcurrentHashMap<>();
BiFunction<byte[], byte[], Boolean> function =
(a, b) -> {
MeasurementPath measurementPath =
new RMeasurementMNode(
RSchemaUtils.getPathByInnerName(new String(a)), b, readWriteHandler)
.getMeasurementPath();
Object tag = RSchemaUtils.parseNodeValue(b, RMNodeValueType.TAGS);
if (!(tag instanceof Map)) {
tag = Collections.emptyMap();
}
Object attributes = RSchemaUtils.parseNodeValue(b, RMNodeValueType.ATTRIBUTES);
if (!(attributes instanceof Map)) {
attributes = Collections.emptyMap();
}
@SuppressWarnings("unchecked")
Pair<Map<String, String>, Map<String, String>> tagsAndAttributes =
new Pair<>((Map<String, String>) tag, (Map<String, String>) attributes);
allResult.put(measurementPath, tagsAndAttributes);
return true;
};
traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
return allResult;
}
@Override
public List<MeasurementPath> getAllMeasurementByDevicePath(PartialPath devicePath)
throws PathNotExistException {
List<MeasurementPath> result = new ArrayList<>();
String nextLevelPathName =
RSchemaUtils.convertPartialPathToInner(
devicePath.getFullPath(), devicePath.getNodeLength() + 1, NODE_TYPE_MEASUREMENT);
Map<byte[], byte[]> allMeasurementPath =
readWriteHandler.getKeyValueByPrefix(nextLevelPathName);
for (Map.Entry<byte[], byte[]> entry : allMeasurementPath.entrySet()) {
PartialPath pathName;
try {
pathName = new PartialPath(new String(entry.getKey()));
} catch (IllegalPathException e) {
throw new PathNotExistException(e.getMessage());
}
MeasurementSchema measurementSchema =
(MeasurementSchema) RSchemaUtils.parseNodeValue(entry.getValue(), RMNodeValueType.SCHEMA);
result.add(new MeasurementPath(pathName, measurementSchema));
}
return result;
}
@Override
public IMNode getDeviceNode(PartialPath path) throws MetadataException {
String[] nodes = path.getNodes();
String levelPath = RSchemaUtils.getLevelPath(nodes, nodes.length - 1);
Holder<byte[]> holder = new Holder<>();
try {
if (readWriteHandler.keyExistByType(levelPath, RMNodeType.ENTITY, holder)) {
return new REntityMNode(path.getFullPath(), holder.getValue(), readWriteHandler);
} else {
throw new PathNotExistException(path.getFullPath());
}
} catch (RocksDBException e) {
throw new MetadataException(e);
}
}
@Override
public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException {
String[] nodes = fullPath.getNodes();
String key = RSchemaUtils.getLevelPath(nodes, nodes.length - 1);
IMeasurementMNode node = null;
try {
Holder<byte[]> holder = new Holder<>();
if (readWriteHandler.keyExistByType(key, RMNodeType.MEASUREMENT, holder)) {
node = new RMeasurementMNode(fullPath.getFullPath(), holder.getValue(), readWriteHandler);
} else if (readWriteHandler.keyExistByType(key, RMNodeType.ALISA, holder)) {
byte[] aliasValue = holder.getValue();
if (aliasValue != null) {
ByteBuffer byteBuffer = ByteBuffer.wrap(aliasValue);
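// skip the 3-byte value header (assumed layout: data version, node type, block type) to reach
// the original-key block stored in the alias node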
ReadWriteIOUtils.readBytes(byteBuffer, 3);
byte[] oriKey = RSchemaUtils.readOriginKey(byteBuffer);
node =
new RMeasurementMNode(
fullPath.getFullPath(), readWriteHandler.get(null, oriKey), readWriteHandler);
}
}
return node;
} catch (RocksDBException e) {
throw new MetadataException(e);
}
}
@Override
public void changeAlias(PartialPath path, String alias) throws MetadataException, IOException {
upsertTagsAndAttributes(alias, null, null, path);
}
@Override
public void upsertTagsAndAttributes(
String alias,
Map<String, String> tagsMap,
Map<String, String> attributesMap,
PartialPath path)
throws MetadataException, IOException {
try {
if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
byte[] originKey = RSchemaUtils.toMeasurementNodeKey(levelPath);
try {
Lock rawKeyLock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
if (rawKeyLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
try {
boolean hasUpdate = false;
String[] nodes = path.getNodes();
RMeasurementMNode mNode = (RMeasurementMNode) getMeasurementMNode(path);
// upsert alias
if (StringUtils.isNotEmpty(alias)
&& (StringUtils.isEmpty(mNode.getAlias()) || !mNode.getAlias().equals(alias))) {
String oldAliasStr = mNode.getAlias();
mNode.setAlias(alias);
hasUpdate = true;
String[] newAlias = Arrays.copyOf(nodes, nodes.length);
newAlias[nodes.length - 1] = alias;
String newAliasLevel = RSchemaUtils.getLevelPath(newAlias, newAlias.length - 1);
byte[] newAliasKey = RSchemaUtils.toAliasNodeKey(newAliasLevel);
Lock newAliasLock =
locksPool.computeIfAbsent(newAliasLevel, x -> new ReentrantLock());
Lock oldAliasLock = null;
boolean newAliasLockHeld = false;
boolean oldAliasLockHeld = false;
try (WriteBatch batch = new WriteBatch()) {
if (newAliasLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
newAliasLockHeld = true;
if (readWriteHandler.keyExistByAllTypes(newAliasLevel).existAnyKey()) {
throw new PathAlreadyExistException("Alias node already exists: " + newAliasLevel);
}
batch.put(newAliasKey, RSchemaUtils.buildAliasNodeValue(originKey));
if (StringUtils.isNotEmpty(oldAliasStr)) {
String[] oldAliasNodes = Arrays.copyOf(nodes, nodes.length);
oldAliasNodes[nodes.length - 1] = oldAliasStr;
String oldAliasLevel =
RSchemaUtils.getLevelPath(oldAliasNodes, oldAliasNodes.length - 1);
byte[] oldAliasKey = RSchemaUtils.toAliasNodeKey(oldAliasLevel);
oldAliasLock =
locksPool.computeIfAbsent(oldAliasLevel, x -> new ReentrantLock());
if (oldAliasLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
oldAliasLockHeld = true;
if (!readWriteHandler.keyExist(oldAliasKey)) {
logger.error(
"origin node [{}] has an alias, but alias node [{}] doesn't exist",
levelPath,
oldAliasLevel);
}
batch.delete(oldAliasKey);
} else {
throw new AcquireLockTimeoutException(
"acquire lock timeout: " + oldAliasLevel);
}
}
} else {
throw new AcquireLockTimeoutException("acquire lock timeout: " + newAliasLevel);
}
// TODO: need application lock
readWriteHandler.executeBatch(batch);
} finally {
// unlock only the locks this thread actually acquired; a failed tryLock must not
// trigger an unlock
if (newAliasLockHeld) {
newAliasLock.unlock();
}
if (oldAliasLockHeld) {
oldAliasLock.unlock();
}
}
}
try (WriteBatch batch = new WriteBatch()) {
if (tagsMap != null && !tagsMap.isEmpty()) {
if (mNode.getTags() == null) {
mNode.setTags(tagsMap);
} else {
mNode.getTags().putAll(tagsMap);
}
batch.put(
readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS),
originKey,
DEFAULT_NODE_VALUE);
hasUpdate = true;
}
if (attributesMap != null && !attributesMap.isEmpty()) {
if (mNode.getAttributes() == null) {
mNode.setAttributes(attributesMap);
} else {
mNode.getAttributes().putAll(attributesMap);
}
hasUpdate = true;
}
if (hasUpdate) {
batch.put(originKey, mNode.getRocksDBValue());
readWriteHandler.executeBatch(batch);
}
}
} finally {
rawKeyLock.unlock();
}
} else {
throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
}
} catch (RocksDBException e) {
throw new MetadataException(e);
}
} else {
throw new AcquireLockTimeoutException(
"acquire lock timeout when do upsertTagsAndAttributes: " + path.getFullPath());
}
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
} finally {
if (deleteUpdateLock.getReadHoldCount() > 0) {
deleteUpdateLock.readLock().unlock();
}
}
}
@Override
public void addAttributes(Map<String, String> attributesMap, PartialPath path)
throws MetadataException, IOException {
if (attributesMap == null || attributesMap.isEmpty()) {
return;
}
try {
if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
Holder<byte[]> holder = new Holder<>();
try {
Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
try {
if (!readWriteHandler.keyExist(key, holder)) {
throw new PathNotExistException(path.getFullPath());
}
byte[] originValue = holder.getValue();
RMeasurementMNode mNode =
new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
if (mNode.getAttributes() != null) {
for (Map.Entry<String, String> entry : attributesMap.entrySet()) {
if (mNode.getAttributes().containsKey(entry.getKey())) {
throw new MetadataException(
String.format(
"TimeSeries [%s] already has the attribute [%s].",
path, new String(key)));
}
}
attributesMap.putAll(mNode.getAttributes());
}
mNode.setAttributes(attributesMap);
readWriteHandler.updateNode(key, mNode.getRocksDBValue());
} finally {
lock.unlock();
}
} else {
throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
}
} catch (RocksDBException e) {
throw new MetadataException(e);
}
} else {
throw new AcquireLockTimeoutException(
"acquire lock timeout when do addAttributes: " + path.getFullPath());
}
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
} finally {
if (deleteUpdateLock.getReadHoldCount() > 0) {
deleteUpdateLock.readLock().unlock();
}
}
}
@Override
public void addTags(Map<String, String> tagsMap, PartialPath path)
throws MetadataException, IOException {
if (tagsMap == null || tagsMap.isEmpty()) {
return;
}
try {
if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
Holder<byte[]> holder = new Holder<>();
try {
Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
try {
if (!readWriteHandler.keyExist(key, holder)) {
throw new PathNotExistException(path.getFullPath());
}
byte[] originValue = holder.getValue();
RMeasurementMNode mNode =
new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
boolean hasTags = false;
if (mNode.getTags() != null && !mNode.getTags().isEmpty()) {
hasTags = true;
for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
if (mNode.getTags().containsKey(entry.getKey())) {
throw new MetadataException(
String.format(
"TimeSeries [%s] already has the tag [%s].", path, entry.getKey()));
}
}
// merge the existing tags only after the duplicate check, outside the loop, to avoid a
// ConcurrentModificationException from growing tagsMap while iterating it
tagsMap.putAll(mNode.getTags());
}
mNode.setTags(tagsMap);
try (WriteBatch batch = new WriteBatch()) {
if (!hasTags) {
batch.put(
readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS),
key,
DEFAULT_NODE_VALUE);
}
batch.put(key, mNode.getRocksDBValue());
// TODO: need application lock
readWriteHandler.executeBatch(batch);
}
} finally {
lock.unlock();
}
} else {
throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
}
} catch (RocksDBException e) {
throw new MetadataException(e);
}
} else {
throw new AcquireLockTimeoutException(
"acquire lock timeout when do addTags: " + path.getFullPath());
}
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
} finally {
if (deleteUpdateLock.getReadHoldCount() > 0) {
deleteUpdateLock.readLock().unlock();
}
}
}
@Override
public void dropTagsOrAttributes(Set<String> keySet, PartialPath path)
throws MetadataException, IOException {
if (keySet == null || keySet.isEmpty()) {
return;
}
try {
if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
Holder<byte[]> holder = new Holder<>();
try {
Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
try {
if (!readWriteHandler.keyExist(key, holder)) {
throw new PathNotExistException(path.getFullPath());
}
byte[] originValue = holder.getValue();
RMeasurementMNode mNode =
new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
int tagLen = mNode.getTags() == null ? 0 : mNode.getTags().size();
boolean didAnyUpdate = false;
for (String toDelete : keySet) {
// track per-key removal separately so a missing last key cannot erase the fact that
// earlier keys were removed
boolean removedThisKey = false;
if (mNode.getTags() != null && mNode.getTags().containsKey(toDelete)) {
mNode.getTags().remove(toDelete);
removedThisKey = true;
}
if (mNode.getAttributes() != null && mNode.getAttributes().containsKey(toDelete)) {
mNode.getAttributes().remove(toDelete);
removedThisKey = true;
}
if (!removedThisKey) {
logger.warn("TimeSeries [{}] does not have tag/attribute [{}]", path, toDelete);
}
didAnyUpdate = didAnyUpdate || removedThisKey;
}
if (didAnyUpdate) {
try (WriteBatch batch = new WriteBatch()) {
if (tagLen > 0 && mNode.getTags().isEmpty()) {
batch.delete(
readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS), key);
}
batch.put(key, mNode.getRocksDBValue());
readWriteHandler.executeBatch(batch);
}
}
} finally {
lock.unlock();
}
} else {
throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
}
} catch (RocksDBException e) {
throw new MetadataException(e);
}
} else {
throw new AcquireLockTimeoutException(
"acquire lock timeout when do dropTagsOrAttributes: " + path.getFullPath());
}
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
} finally {
if (deleteUpdateLock.getReadHoldCount() > 0) {
deleteUpdateLock.readLock().unlock();
}
}
}
@Override
public void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath path)
throws MetadataException, IOException {
if (alterMap == null || alterMap.isEmpty()) {
return;
}
try {
if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
Holder<byte[]> holder = new Holder<>();
try {
Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
try {
if (!readWriteHandler.keyExist(key, holder)) {
throw new PathNotExistException(path.getFullPath());
}
byte[] originValue = holder.getValue();
RMeasurementMNode mNode =
new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
boolean didAnyUpdate = false;
for (Map.Entry<String, String> entry : alterMap.entrySet()) {
didAnyUpdate = false;
if (mNode.getTags() != null && mNode.getTags().containsKey(entry.getKey())) {
mNode.getTags().put(entry.getKey(), entry.getValue());
didAnyUpdate = true;
}
if (mNode.getAttributes() != null
&& mNode.getAttributes().containsKey(entry.getKey())) {
mNode.getAttributes().put(entry.getKey(), entry.getValue());
didAnyUpdate = true;
}
if (!didAnyUpdate) {
throw new MetadataException(
String.format(
"TimeSeries [%s] does not have tag/attribute [%s].",
path, entry.getKey()),
true);
}
}
if (didAnyUpdate) {
readWriteHandler.updateNode(key, mNode.getRocksDBValue());
}
} finally {
lock.unlock();
}
} else {
throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
}
} catch (RocksDBException e) {
throw new MetadataException(e);
}
} else {
throw new AcquireLockTimeoutException(
"acquire lock timeout when do setTagsOrAttributesValue: " + path.getFullPath());
}
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
} finally {
if (deleteUpdateLock.getReadHoldCount() > 0) {
deleteUpdateLock.readLock().unlock();
}
}
}
@Override
public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath path)
throws MetadataException, IOException {
if (StringUtils.isEmpty(oldKey) || StringUtils.isEmpty(newKey)) {
return;
}
try {
if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
byte[] nodeKey = RSchemaUtils.toMeasurementNodeKey(levelPath);
Holder<byte[]> holder = new Holder<>();
try {
Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
try {
if (!readWriteHandler.keyExist(nodeKey, holder)) {
throw new PathNotExistException(path.getFullPath());
}
byte[] originValue = holder.getValue();
RMeasurementMNode mNode =
new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
boolean didAnyUpdate = false;
if (mNode.getTags() != null && mNode.getTags().containsKey(oldKey)) {
String value = mNode.getTags().get(oldKey);
mNode.getTags().remove(oldKey);
mNode.getTags().put(newKey, value);
didAnyUpdate = true;
}
if (mNode.getAttributes() != null && mNode.getAttributes().containsKey(oldKey)) {
String value = mNode.getAttributes().get(oldKey);
mNode.getAttributes().remove(oldKey);
mNode.getAttributes().put(newKey, value);
didAnyUpdate = true;
}
if (didAnyUpdate) {
readWriteHandler.updateNode(nodeKey, mNode.getRocksDBValue());
} else {
throw new MetadataException(
String.format(
"TimeSeries [%s] does not have tag/attribute [%s].", path, oldKey),
true);
}
} finally {
lock.unlock();
}
} else {
// tryLock failed: do not unlock a lock this thread does not hold
throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
}
} catch (RocksDBException e) {
throw new MetadataException(e);
}
} else {
throw new AcquireLockTimeoutException(
"acquire lock timeout when do renameTagOrAttributeKey: " + path.getFullPath());
}
} catch (InterruptedException e) {
logger.warn("Acquire lock interrupted", e);
Thread.currentThread().interrupt();
} finally {
// as above, unlock only when the read lock is actually held
if (deleteUpdateLock.getReadHoldCount() > 0) {
deleteUpdateLock.readLock().unlock();
}
}
}
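/**
 * Resolve the measurement schemas referenced by an insert plan, auto-creating missing device
 * and measurement nodes when auto-create is enabled, and verify that the plan's aligned flag
 * and data types match the stored schema. Returns the device node of the plan.
 */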
@Override
public IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan)
throws MetadataException, IOException {
// devicePath is a logical path which is parent of measurement, whether in template or not
PartialPath devicePath = plan.getDevicePath();
String[] measurementList = plan.getMeasurements();
IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
IMNode deviceMNode = getDeviceNodeWithAutoCreate(devicePath, plan.isAligned());
if (deviceMNode == null) {
throw new MetadataException(
String.format("Failed to create deviceMNode,device path:[%s]", plan.getDevicePath()));
}
// check insert non-aligned InsertPlan for aligned timeseries
if (deviceMNode.isEntity()) {
if (plan.isAligned() && !deviceMNode.getAsEntityMNode().isAligned()) {
throw new MetadataException(
String.format(
"Timeseries under path [%s] is not aligned , please set"
+ " InsertPlan.isAligned() = false",
plan.getDevicePath()));
}
if (!plan.isAligned() && deviceMNode.getAsEntityMNode().isAligned()) {
throw new MetadataException(
String.format(
"Timeseries under path [%s] is aligned , please set"
+ " InsertPlan.isAligned() = true",
plan.getDevicePath()));
}
}
// get node for each measurement
Map<Integer, IMeasurementMNode> nodeMap = new HashMap<>();
Map<Integer, PartialPath> missingNodeIndex = new HashMap<>();
for (int i = 0; i < measurementList.length; i++) {
PartialPath path = new PartialPath(devicePath.getFullPath(), measurementList[i]);
IMeasurementMNode node = getMeasurementMNode(path);
if (node == null) {
if (!config.isAutoCreateSchemaEnabled()) {
throw new PathNotExistException(path.getFullPath());
}
missingNodeIndex.put(i, path);
} else {
nodeMap.put(i, node);
}
}
// create missing nodes
if (!missingNodeIndex.isEmpty()) {
if (!(plan instanceof InsertRowPlan) && !(plan instanceof InsertTabletPlan)) {
throw new MetadataException(
String.format(
"Only support insertRow and insertTablet, plan is [%s]", plan.getOperatorType()));
}
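// aligned series must be created together under the device, so collect every missing
// measurement first; encodings fall back to the default encoding for each data type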
if (plan.isAligned()) {
List<String> measurements = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
List<TSEncoding> encodings = new ArrayList<>();
for (Integer index : missingNodeIndex.keySet()) {
measurements.add(measurementList[index]);
TSDataType type = plan.getDataTypes()[index];
dataTypes.add(type);
encodings.add(EncodingInferenceUtils.getDefaultEncoding(type));
}
createAlignedTimeSeries(devicePath, measurements, dataTypes, encodings);
} else {
for (Map.Entry<Integer, PartialPath> entry : missingNodeIndex.entrySet()) {
IMeasurementSchema schema =
new MeasurementSchema(
entry.getValue().getMeasurement(), plan.getDataTypes()[entry.getKey()]);
createTimeseries(entry.getValue(), schema, null, null, null);
}
}
// get the latest node
for (Map.Entry<Integer, PartialPath> entry : missingNodeIndex.entrySet()) {
nodeMap.put(entry.getKey(), getMeasurementMNode(entry.getValue()));
}
}
// check datatype
for (int i = 0; i < measurementList.length; i++) {
try {
// check that the plan's data type matches the stored schema
if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
try {
SchemaRegionUtils.checkDataTypeMatch(plan, i, nodeMap.get(i).getSchema().getType());
} catch (DataTypeMismatchException mismatchException) {
logger.warn(mismatchException.getMessage());
if (!config.isEnablePartialInsert()) {
throw mismatchException;
} else {
// mark failed measurement
plan.markFailedMeasurementInsertion(i, mismatchException);
continue;
}
}
measurementMNodes[i] = nodeMap.get(i);
// store the real measurement name in the plan, in case it referenced the series by alias
measurementList[i] = nodeMap.get(i).getName();
}
} catch (MetadataException e) {
if (config.isClusterMode()) {
logger.debug(
"meet error when check {}.{}, message: {}",
devicePath,
measurementList[i],
e.getMessage());
} else {
logger.warn(
"meet error when check {}.{}, message: {}",
devicePath,
measurementList[i],
e.getMessage());
}
if (config.isEnablePartialInsert()) {
// mark failed measurement
plan.markFailedMeasurementInsertion(i, e);
} else {
throw e;
}
}
}
return deviceMNode;
}
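/** Batched device-level auto-creation is not implemented by this schema region. */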
@Override
public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
PartialPath devicePath,
String[] measurements,
Function<Integer, TSDataType> getDataType,
TSEncoding[] encodings,
CompressionType[] compressionTypes,
boolean aligned)
throws MetadataException {
throw new UnsupportedOperationException();
}
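/** Close the underlying RocksDB handler, retrying once after a short pause if it fails. */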
@Override
public void clear() {
try {
readWriteHandler.close();
} catch (RocksDBException e) {
logger.error("Failed to close readWriteHandler,try again.", e);
try {
Thread.sleep(5);
readWriteHandler.close();
} catch (RocksDBException e1) {
logger.error(String.format("This schemaRegion [%s] closed failed.", this), e);
} catch (InterruptedException e1) {
logger.warn("Close RocksdDB instance interrupted", e1);
Thread.currentThread().interrupt();
}
}
}
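// Schema templates and triggers are not supported by this RocksDB-based schema region;
// the operations below fail fast with UnsupportedOperationException.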
@Override
public Set<String> getPathsSetTemplate(String templateName) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public Set<String> getPathsUsingTemplate(String templateName) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public boolean isTemplateAppendable(Template template, List<String> measurements) {
throw new UnsupportedOperationException();
}
@Override
public void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public void activateSchemaTemplate(ActivateTemplateInClusterPlan plan, Template template)
throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public List<String> getPathsUsingTemplate(int templateId) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public IMNode getMNodeForTrigger(PartialPath fullPath) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public void releaseMNodeAfterDropTrigger(IMNode imNode) throws MetadataException {
throw new UnsupportedOperationException();
}
@Override
public String toString() {
return String.format("storage group:[%s]", storageGroupFullPath);
}
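/** Dump every key of this region's RocksDB instance to scanAllKeys.txt for debugging. */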
@TestOnly
public void printScanAllKeys() throws IOException {
readWriteHandler.scanAllKeys(schemaRegionDirPath + File.separator + "scanAllKeys.txt");
}
}