/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.metadata;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.logfile.MLogReader;
import org.apache.iotdb.db.metadata.logfile.MLogWriter;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.RandomDeleteCache;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.cache.CacheException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.stream.Collectors.toList;
import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
/**
 * This class takes the responsibility of serializing all the metadata info and persisting it
 * into files. It contains all the interfaces to modify the metadata. Every modification is first
 * appended to the metadata log so that it can be replayed after a downtime of the system.
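 *
 * <p>A minimal usage sketch (the storage group and series names below are illustrative only;
 * exception handling omitted):
 *
 * <pre>{@code
 * MManager manager = MManager.getInstance();
 * manager.init(); // replays the MTree snapshot and/or mlog.bin
 * manager.setStorageGroup(new PartialPath("root.sg1"));
 * manager.createTimeseries(
 *     new PartialPath("root.sg1.d1.s1"),
 *     TSDataType.DOUBLE,
 *     TSEncoding.RLE,
 *     CompressionType.SNAPPY,
 *     Collections.emptyMap());
 * }</pre>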
*/
@SuppressWarnings("java:S1135") // ignore todos
public class MManager {
public static final String TIME_SERIES_TREE_HEADER = "=== Timeseries Tree ===\n\n";
private static final String TAG_FORMAT = "tag key is %s, tag value is %s, tlog offset is %d";
private static final String DEBUG_MSG = "%s : TimeSeries %s is removed from tag inverted index, ";
private static final String DEBUG_MSG_1 =
"%s: TimeSeries %s's tag info has been removed from tag inverted index ";
private static final String PREVIOUS_CONDITION =
"before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";
private static final int UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD = 5000;
private static final Logger logger = LoggerFactory.getLogger(MManager.class);
  /**
   * A background thread checks whether the MTree has been modified recently, once per this
   * interval. Unit: second
   */
private static final long MTREE_SNAPSHOT_THREAD_CHECK_TIME = 600L;
private final int mtreeSnapshotInterval;
private final long mtreeSnapshotThresholdTime;
  // the metadata log file path
private String logFilePath;
private String mtreeSnapshotPath;
private String mtreeSnapshotTmpPath;
private MTree mtree;
private MLogWriter logWriter;
private TagLogFile tagLogFile;
private boolean isRecovering;
// device -> DeviceMNode
private RandomDeleteCache<PartialPath, MNode> mNodeCache;
// tag key -> tag value -> LeafMNode
private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new ConcurrentHashMap<>();
  // data type -> number of series of this type
private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();
  // the data-type total number last reported to the PrimitiveArrayManager
private long reportedDataTypeTotalNum;
private AtomicLong totalSeriesNumber = new AtomicLong();
private boolean initialized;
protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private File logFile;
private ScheduledExecutorService timedCreateMTreeSnapshotThread;
private ScheduledExecutorService timedForceMLogThread;
/** threshold total size of MTree */
private static final long MTREE_SIZE_THRESHOLD = config.getAllocateMemoryForSchema();
private boolean allowToCreateNewSeries = true;
private static final int ESTIMATED_SERIES_SIZE = config.getEstimatedSeriesSize();
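  // Initialization-on-demand holder idiom: the JVM initializes MManagerHolder, and thus INSTANCE,
  // lazily and thread-safely on the first call to getInstance().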
private static class MManagerHolder {
private MManagerHolder() {
      // empty constructor to prevent external instantiation
}
private static final MManager INSTANCE = new MManager();
}
protected MManager() {
mtreeSnapshotInterval = config.getMtreeSnapshotInterval();
mtreeSnapshotThresholdTime = config.getMtreeSnapshotThresholdTime() * 1000L;
String schemaDir = config.getSchemaDir();
File schemaFolder = SystemFileFactory.INSTANCE.getFile(schemaDir);
if (!schemaFolder.exists()) {
if (schemaFolder.mkdirs()) {
logger.info("create system folder {}", schemaFolder.getAbsolutePath());
} else {
logger.info("create system folder {} failed.", schemaFolder.getAbsolutePath());
}
}
logFilePath = schemaDir + File.separator + MetadataConstant.METADATA_LOG;
mtreeSnapshotPath = schemaDir + File.separator + MetadataConstant.MTREE_SNAPSHOT;
mtreeSnapshotTmpPath = schemaDir + File.separator + MetadataConstant.MTREE_SNAPSHOT_TMP;
    // do not write the log while recovering
isRecovering = true;
int cacheSize = config.getmManagerCacheSize();
mNodeCache =
new RandomDeleteCache<PartialPath, MNode>(cacheSize) {
@Override
public MNode loadObjectByKey(PartialPath key) throws CacheException {
try {
return mtree.getNodeByPathWithStorageGroupCheck(key);
} catch (MetadataException e) {
throw new CacheException(e);
}
}
};
if (config.isEnableMTreeSnapshot()) {
timedCreateMTreeSnapshotThread =
Executors.newSingleThreadScheduledExecutor(
r -> new Thread(r, "timedCreateMTreeSnapshotThread"));
timedCreateMTreeSnapshotThread.scheduleAtFixedRate(
this::checkMTreeModified,
MTREE_SNAPSHOT_THREAD_CHECK_TIME,
MTREE_SNAPSHOT_THREAD_CHECK_TIME,
TimeUnit.SECONDS);
}
}
  /** This method should only be called from the IoTDB class; do not use it elsewhere. */
public static MManager getInstance() {
return MManagerHolder.INSTANCE;
}
// Because the writer will be used later and should not be closed here.
@SuppressWarnings("squid:S2093")
public synchronized void init() {
if (initialized) {
return;
}
logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
try {
tagLogFile = new TagLogFile(config.getSchemaDir(), MetadataConstant.TAG_LOG);
isRecovering = true;
int lineNumber = initFromLog(logFile);
List<PartialPath> storageGroups = mtree.getAllStorageGroupPaths();
for (PartialPath sg : storageGroups) {
MNode node = mtree.getNodeByPath(sg);
totalSeriesNumber.addAndGet(node.getMeasurementMNodeCount());
}
logWriter = new MLogWriter(config.getSchemaDir(), MetadataConstant.METADATA_LOG);
logWriter.setLogNum(lineNumber);
isRecovering = false;
} catch (IOException | MetadataException e) {
logger.error(
"Cannot recover all MTree from file, we try to recover as possible as we can", e);
}
reportedDataTypeTotalNum = 0L;
initialized = true;
}
  /** @return the number of mlog lines applied from the logFile */
@SuppressWarnings("squid:S3776")
private int initFromLog(File logFile) throws IOException {
File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
if (tmpFile.exists()) {
logger.warn("Creating MTree snapshot not successful before crashing...");
Files.delete(tmpFile.toPath());
}
File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
long time = System.currentTimeMillis();
if (!mtreeSnapshot.exists()) {
mtree = new MTree();
} else {
mtree = MTree.deserializeFrom(mtreeSnapshot);
logger.debug(
"spend {} ms to deserialize mtree from snapshot", System.currentTimeMillis() - time);
}
time = System.currentTimeMillis();
// init the metadata from the operation log
if (logFile.exists()) {
int idx = 0;
      try (MLogReader mLogReader =
          new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG)) {
idx = applyMlog(mLogReader);
logger.debug(
"spend {} ms to deserialize mtree from mlog.bin", System.currentTimeMillis() - time);
return idx;
} catch (Exception e) {
throw new IOException("Failed to parser mlog.bin for err:" + e);
}
} else {
return 0;
}
}
private int applyMlog(MLogReader mLogReader) {
int idx = 0;
while (mLogReader.hasNext()) {
PhysicalPlan plan = null;
try {
plan = mLogReader.next();
if (plan == null) {
continue;
}
operation(plan);
idx++;
} catch (Exception e) {
logger.error(
"Can not operate cmd {} for err:", plan == null ? "" : plan.getOperatorType(), e);
}
}
return idx;
}
  /** Clear the MTree, caches, and statistics, and close the log writer and tag file. */
public void clear() {
try {
this.mtree = new MTree();
this.mNodeCache.clear();
this.tagIndex.clear();
this.totalSeriesNumber.set(0);
if (logWriter != null) {
logWriter.close();
logWriter = null;
}
if (tagLogFile != null) {
tagLogFile.close();
tagLogFile = null;
}
this.schemaDataTypeNumMap.clear();
this.reportedDataTypeTotalNum = 0L;
initialized = false;
if (config.isEnableMTreeSnapshot() && timedCreateMTreeSnapshotThread != null) {
timedCreateMTreeSnapshotThread.shutdownNow();
timedCreateMTreeSnapshotThread = null;
}
if (timedForceMLogThread != null) {
timedForceMLogThread.shutdownNow();
timedForceMLogThread = null;
}
} catch (IOException e) {
logger.error("Cannot close metadata log writer, because:", e);
}
}
public void operation(PhysicalPlan plan) throws IOException, MetadataException {
switch (plan.getOperatorType()) {
case CREATE_TIMESERIES:
CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
break;
case DELETE_TIMESERIES:
DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan;
        // a DeleteTimeSeriesPlan carries exactly one path
deleteTimeseries(deleteTimeSeriesPlan.getPaths().get(0));
break;
case SET_STORAGE_GROUP:
SetStorageGroupPlan setStorageGroupPlan = (SetStorageGroupPlan) plan;
setStorageGroup(setStorageGroupPlan.getPath());
break;
case DELETE_STORAGE_GROUP:
DeleteStorageGroupPlan deleteStorageGroupPlan = (DeleteStorageGroupPlan) plan;
deleteStorageGroups(deleteStorageGroupPlan.getPaths());
break;
case TTL:
SetTTLPlan setTTLPlan = (SetTTLPlan) plan;
setTTL(setTTLPlan.getStorageGroup(), setTTLPlan.getDataTTL());
break;
case CHANGE_ALIAS:
ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
break;
case CHANGE_TAG_OFFSET:
ChangeTagOffsetPlan changeTagOffsetPlan = (ChangeTagOffsetPlan) plan;
changeOffset(changeTagOffsetPlan.getPath(), changeTagOffsetPlan.getOffset());
break;
default:
logger.error("Unrecognizable command {}", plan.getOperatorType());
}
}
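  /**
   * Create a timeseries from a physical plan. A hedged sketch of creating a series together with
   * a tag (the path, type, and tag are illustrative; the tags/attributes/alias argument order is
   * assumed from the constructor call in this class; exception handling omitted):
   *
   * <pre>{@code
   * CreateTimeSeriesPlan plan =
   *     new CreateTimeSeriesPlan(
   *         new PartialPath("root.sg1.d1.temperature"),
   *         TSDataType.FLOAT,
   *         TSEncoding.RLE,
   *         CompressionType.SNAPPY,
   *         null, // props
   *         Collections.singletonMap("unit", "celsius"), // tags
   *         null, // attributes
   *         null); // alias
   * MManager.getInstance().createTimeseries(plan);
   * }</pre>
   */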
public void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
createTimeseries(plan, -1);
}
private void ensureStorageGroup(PartialPath path) throws MetadataException {
try {
mtree.getStorageGroupPath(path);
} catch (StorageGroupNotSetException e) {
if (!config.isAutoCreateSchemaEnabled()) {
throw e;
}
PartialPath storageGroupPath =
MetaUtils.getStorageGroupPathByLevel(path, config.getDefaultStorageGroupLevel());
setStorageGroup(storageGroupPath);
}
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
if (!allowToCreateNewSeries) {
throw new MetadataException(
"IoTDB system load is too large to create timeseries, "
+ "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
}
try {
PartialPath path = plan.getPath();
SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
ensureStorageGroup(path);
TSDataType type = plan.getDataType();
// create time series in MTree
MeasurementMNode leafMNode =
mtree.createTimeseries(
path,
type,
plan.getEncoding(),
plan.getCompressor(),
plan.getProps(),
plan.getAlias());
// update tag index
if (plan.getTags() != null) {
// tag key, tag value
for (Entry<String, String> entry : plan.getTags().entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
}
tagIndex
.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
.computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>())
.add(leafMNode);
}
}
// update statistics and schemaDataTypeNumMap
totalSeriesNumber.addAndGet(1);
if (totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE >= MTREE_SIZE_THRESHOLD) {
logger.warn("Current series number {} is too large...", totalSeriesNumber);
allowToCreateNewSeries = false;
}
updateSchemaDataTypeNumMap(type, 1);
// write log
if (!isRecovering) {
// either tags or attributes is not empty
if ((plan.getTags() != null && !plan.getTags().isEmpty())
|| (plan.getAttributes() != null && !plan.getAttributes().isEmpty())) {
offset = tagLogFile.write(plan.getTags(), plan.getAttributes());
}
plan.setTagOffset(offset);
logWriter.createTimeseries(plan);
}
leafMNode.setOffset(offset);
} catch (IOException e) {
throw new MetadataException(e);
}
}
/**
   * Add one timeseries to the metadata tree; if the timeseries already exists, throw an exception
   *
   * @param path the timeseries path
   * @param dataType the data type {@code DataType} of the timeseries
   * @param encoding the encoding function {@code Encoding} of the timeseries
   * @param compressor the compressor function {@code Compressor} of the time series
*/
public void createTimeseries(
PartialPath path,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor,
Map<String, String> props)
throws MetadataException {
try {
createTimeseries(
new CreateTimeSeriesPlan(path, dataType, encoding, compressor, props, null, null, null));
} catch (PathAlreadyExistException | AliasAlreadyExistException e) {
if (logger.isDebugEnabled()) {
logger.debug(
"Ignore PathAlreadyExistException and AliasAlreadyExistException when Concurrent inserting"
+ " a non-exist time series {}",
path);
}
}
}
/**
   * Delete all timeseries under the given path; the path may cross different storage groups
   *
   * @param prefixPath path to be deleted; could be root, a prefix path, or a full path. TODO:
   *     directly return the failed string set
   * @return null if every matched timeseries was deleted, otherwise a comma-separated string of
   *     the timeseries that failed to be deleted
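   *     <p>A minimal sketch (the path is illustrative; exception handling omitted):
   *     <pre>{@code
   * // null on full success, otherwise the names of the series that could not be deleted
   * String failed = MManager.getInstance().deleteTimeseries(new PartialPath("root.sg1.d1"));
   * }</pre>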
*/
public String deleteTimeseries(PartialPath prefixPath) throws MetadataException {
if (isStorageGroup(prefixPath)) {
mNodeCache.clear();
}
try {
List<PartialPath> allTimeseries = mtree.getAllTimeseriesPath(prefixPath);
if (allTimeseries.isEmpty()) {
throw new PathNotExistException(prefixPath.getFullPath());
}
      // timeseries under the monitor storage group are not allowed to be deleted
allTimeseries.removeIf(p -> p.startsWith(MonitorConstants.STAT_STORAGE_GROUP_ARRAY));
Set<String> failedNames = new HashSet<>();
for (PartialPath p : allTimeseries) {
deleteSingleTimeseriesInternal(p, failedNames);
}
return failedNames.isEmpty() ? null : String.join(",", failedNames);
} catch (IOException e) {
throw new MetadataException(e.getMessage());
}
}
private void deleteSingleTimeseriesInternal(PartialPath p, Set<String> failedNames)
throws MetadataException, IOException {
DeleteTimeSeriesPlan deleteTimeSeriesPlan = new DeleteTimeSeriesPlan();
try {
PartialPath emptyStorageGroup = deleteOneTimeseriesUpdateStatisticsAndDropTrigger(p);
if (!isRecovering) {
if (emptyStorageGroup != null) {
StorageEngine.getInstance().deleteAllDataFilesInOneStorageGroup(emptyStorageGroup);
StorageEngine.getInstance()
.releaseWalDirectByteBufferPoolInOneStorageGroup(emptyStorageGroup);
}
deleteTimeSeriesPlan.setDeletePathList(Collections.singletonList(p));
logWriter.deleteTimeseries(deleteTimeSeriesPlan);
}
} catch (DeleteFailedException e) {
failedNames.add(e.getName());
}
}
/** remove the node from the tag inverted index */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private void removeFromTagInvertedIndex(MeasurementMNode node) throws IOException {
if (node.getOffset() < 0) {
return;
}
Map<String, String> tagMap =
tagLogFile.readTag(config.getTagAttributeTotalSize(), node.getOffset());
if (tagMap != null) {
for (Entry<String, String> entry : tagMap.entrySet()) {
if (tagIndex.containsKey(entry.getKey())
&& tagIndex.get(entry.getKey()).containsKey(entry.getValue())) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
String.format(DEBUG_MSG, "Delete" + TAG_FORMAT, node.getFullPath()),
entry.getKey(),
entry.getValue(),
node.getOffset()));
}
tagIndex.get(entry.getKey()).get(entry.getValue()).remove(node);
if (tagIndex.get(entry.getKey()).get(entry.getValue()).isEmpty()) {
tagIndex.get(entry.getKey()).remove(entry.getValue());
if (tagIndex.get(entry.getKey()).isEmpty()) {
tagIndex.remove(entry.getKey());
}
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
String.format(DEBUG_MSG_1, "Delete" + PREVIOUS_CONDITION, node.getFullPath()),
entry.getKey(),
entry.getValue(),
node.getOffset(),
tagIndex.containsKey(entry.getKey())));
}
}
}
}
}
/**
* @param path full path from root to leaf node
   * @return the path of the storage group if it is empty after the deletion, otherwise null
*/
private PartialPath deleteOneTimeseriesUpdateStatisticsAndDropTrigger(PartialPath path)
throws MetadataException, IOException {
Pair<PartialPath, MeasurementMNode> pair =
mtree.deleteTimeseriesAndReturnEmptyStorageGroup(path);
removeFromTagInvertedIndex(pair.right);
PartialPath storageGroupPath = pair.left;
// update statistics in schemaDataTypeNumMap
updateSchemaDataTypeNumMap(pair.right.getSchema().getType(), -1);
// drop trigger with no exceptions
TriggerEngine.drop(pair.right);
// TODO: delete the path node and all its ancestors
mNodeCache.clear();
totalSeriesNumber.addAndGet(-1);
if (!allowToCreateNewSeries
&& totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
logger.info("Current series number {} come back to normal level", totalSeriesNumber);
allowToCreateNewSeries = true;
}
return storageGroupPath;
}
/**
* Set storage group of the given path to MTree.
*
* @param storageGroup root.node.(node)*
*/
public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
try {
mtree.setStorageGroup(storageGroup);
if (!config.isEnableMemControl()) {
MemTableManager.getInstance().addOrDeleteStorageGroup(1);
}
if (!isRecovering) {
logWriter.setStorageGroup(storageGroup);
}
} catch (IOException e) {
throw new MetadataException(e.getMessage());
}
}
/**
* Delete storage groups of given paths from MTree. Log format: "delete_storage_group,sg1,sg2,sg3"
*
* @param storageGroups list of paths to be deleted. Format: root.node
*/
public void deleteStorageGroups(List<PartialPath> storageGroups) throws MetadataException {
try {
for (PartialPath storageGroup : storageGroups) {
        // deleting a storage group removes all its series from the total count
        totalSeriesNumber.addAndGet(-mtree.getAllTimeseriesCount(storageGroup));
// clear cached MNode
if (!allowToCreateNewSeries
&& totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
logger.info("Current series number {} come back to normal level", totalSeriesNumber);
allowToCreateNewSeries = true;
}
mNodeCache.clear();
// try to delete storage group
List<MeasurementMNode> leafMNodes = mtree.deleteStorageGroup(storageGroup);
for (MeasurementMNode leafMNode : leafMNodes) {
removeFromTagInvertedIndex(leafMNode);
// update statistics in schemaDataTypeNumMap
updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
}
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
if (!config.isEnableMemControl()) {
MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
}
// if success
if (!isRecovering) {
logWriter.deleteStorageGroup(storageGroup);
}
}
} catch (IOException e) {
throw new MetadataException(e.getMessage());
}
}
/**
* update statistics in schemaDataTypeNumMap
*
* @param type data type
* @param num 1 for creating timeseries and -1 for deleting timeseries
*/
private synchronized void updateSchemaDataTypeNumMap(TSDataType type, int num) {
    // adjust the count for the series' data type
schemaDataTypeNumMap.put(type, schemaDataTypeNumMap.getOrDefault(type, 0) + num);
    // adjust the count for the accompanying time column (INT64)
schemaDataTypeNumMap.put(
TSDataType.INT64, schemaDataTypeNumMap.getOrDefault(TSDataType.INT64, 0) + num);
    // current total number of data-type occurrences (twice the number of time series)
// used in primitive array manager
long currentDataTypeTotalNum = totalSeriesNumber.get() * 2;
if (num > 0
&& currentDataTypeTotalNum - reportedDataTypeTotalNum
>= UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD) {
PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, currentDataTypeTotalNum);
reportedDataTypeTotalNum = currentDataTypeTotalNum;
}
}
/**
* Check if the given path is storage group or not.
*
* @param path Format: root.node.(node)*
* @apiNote :for cluster
*/
public boolean isStorageGroup(PartialPath path) {
return mtree.isStorageGroup(path);
}
/**
* Get series type for given seriesPath.
*
* @param path full path
*/
public TSDataType getSeriesType(PartialPath path) throws MetadataException {
if (path.equals(SQLConstant.TIME_PATH)) {
return TSDataType.INT64;
}
return mtree.getSchema(path).getType();
}
public MeasurementMNode[] getMNodes(PartialPath deviceId, String[] measurements)
throws MetadataException {
MNode deviceNode = getNodeByPath(deviceId);
MeasurementMNode[] mNodes = new MeasurementMNode[measurements.length];
for (int i = 0; i < mNodes.length; i++) {
mNodes[i] = ((MeasurementMNode) deviceNode.getChild(measurements[i]));
if (mNodes[i] == null && !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
throw new MetadataException(measurements[i] + " does not exist in " + deviceId);
}
}
return mNodes;
}
/**
* Get all devices under given prefixPath.
*
   * @param prefixPath a prefix of a full path. If the wildcard is not at the tail, then each
   *     wildcard can only match one level; otherwise it can match to the tail.
   * @return A HashSet instance which stores the device paths matching the given prefixPath.
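   *     <p>e.g., with series root.sg1.d1.s1 and root.sg1.d2.s1 in the tree (names illustrative),
   *     {@code getDevices(new PartialPath("root.sg1.*"))} returns [root.sg1.d1, root.sg1.d2].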
*/
public Set<PartialPath> getDevices(PartialPath prefixPath) throws MetadataException {
return mtree.getDevices(prefixPath);
}
public List<ShowDevicesResult> getDevices(ShowDevicesPlan plan) throws MetadataException {
return mtree.getDevices(plan);
}
/**
* Get all nodes from the given level
*
   * @param prefixPath can be a prefix of a full path, but can not be a full path and can not
   *     contain a wildcard. However, the level of the prefixPath can be smaller than the given
   *     level, e.g., prefixPath = root.a while the given level is 5
   * @param nodeLevel the level, which can not be smaller than the level of the prefixPath
   * @return A List instance which stores all nodes at the given level
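   *     <p>A sketch (series names illustrative): with series root.sg1.d1.s1 and root.sg1.d2.s1
   *     in the tree, the call below returns [root.sg1.d1, root.sg1.d2], counting root as level 0:
   *     <pre>{@code
   * List<PartialPath> nodes =
   *     MManager.getInstance().getNodesList(new PartialPath("root.sg1"), 2);
   * }</pre>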
*/
public List<PartialPath> getNodesList(PartialPath prefixPath, int nodeLevel)
throws MetadataException {
return getNodesList(prefixPath, nodeLevel, null);
}
public List<PartialPath> getNodesList(
PartialPath prefixPath, int nodeLevel, StorageGroupFilter filter) throws MetadataException {
return mtree.getNodesList(prefixPath, nodeLevel, filter);
}
/**
* Get storage group name by path
*
* <p>e.g., root.sg1 is a storage group and path = root.sg1.d1, return root.sg1
*
* @return storage group in the given path
*/
public PartialPath getStorageGroupPath(PartialPath path) throws StorageGroupNotSetException {
return mtree.getStorageGroupPath(path);
}
/** Get all storage group paths */
public List<PartialPath> getAllStorageGroupPaths() {
return mtree.getAllStorageGroupPaths();
}
public List<PartialPath> searchAllRelatedStorageGroups(PartialPath path)
throws MetadataException {
return mtree.searchAllRelatedStorageGroups(path);
}
/**
* Get all storage group under given prefixPath.
*
   * @param prefixPath a prefix of a full path. If the wildcard is not at the tail, then each
   *     wildcard can only match one level; otherwise it can match to the tail.
   * @return An ArrayList instance which stores the storage group paths matching the given
   *     prefixPath.
*/
public List<PartialPath> getStorageGroupPaths(PartialPath prefixPath) throws MetadataException {
return mtree.getStorageGroupPaths(prefixPath);
}
/** Get all storage group MNodes */
public List<StorageGroupMNode> getAllStorageGroupNodes() {
return mtree.getAllStorageGroupNodes();
}
/**
   * Return all paths matching the given path if the path contains wildcards, or the path itself
   * otherwise. The pattern in this method is formed by the amalgamation of seriesPath and the
   * character '*'.
   *
   * @param prefixPath can be a prefix or a full path. If the wildcard is not at the tail, then
   *     each wildcard can only match one level; otherwise it can match to the tail.
*/
public List<PartialPath> getAllTimeseriesPath(PartialPath prefixPath) throws MetadataException {
return mtree.getAllTimeseriesPath(prefixPath);
}
  /** Similar to getAllTimeseriesPath(), but each returned path also carries its alias. */
public Pair<List<PartialPath>, Integer> getAllTimeseriesPathWithAlias(
PartialPath prefixPath, int limit, int offset) throws MetadataException {
return mtree.getAllTimeseriesPathWithAlias(prefixPath, limit, offset);
}
  /** To calculate the count of timeseries for the given prefix path. */
public int getAllTimeseriesCount(PartialPath prefixPath) throws MetadataException {
return mtree.getAllTimeseriesCount(prefixPath);
}
  /** To calculate the count of devices for the given prefix path. */
public int getDevicesNum(PartialPath prefixPath) throws MetadataException {
return mtree.getDevicesNum(prefixPath);
}
  /** To calculate the count of storage groups for the given prefix path. */
public int getStorageGroupNum(PartialPath prefixPath) throws MetadataException {
return mtree.getStorageGroupNum(prefixPath);
}
/**
   * To calculate the count of nodes at the given level for the given prefix path.
   *
   * @param prefixPath a prefix path or a full path; can not contain '*'
   * @param level the level, which can not be smaller than the level of the prefixPath
*/
public int getNodesCountInGivenLevel(PartialPath prefixPath, int level) throws MetadataException {
return mtree.getNodesCountInGivenLevel(prefixPath, level);
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private List<ShowTimeSeriesResult> showTimeseriesWithIndex(
ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
if (!tagIndex.containsKey(plan.getKey())) {
throw new MetadataException("The key " + plan.getKey() + " is not a tag.", true);
}
Map<String, Set<MeasurementMNode>> value2Node = tagIndex.get(plan.getKey());
if (value2Node.isEmpty()) {
throw new MetadataException("The key " + plan.getKey() + " is not a tag.");
}
List<MeasurementMNode> allMatchedNodes = new ArrayList<>();
if (plan.isContains()) {
for (Entry<String, Set<MeasurementMNode>> entry : value2Node.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
}
String tagValue = entry.getKey();
if (tagValue.contains(plan.getValue())) {
allMatchedNodes.addAll(entry.getValue());
}
}
} else {
for (Entry<String, Set<MeasurementMNode>> entry : value2Node.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
}
String tagValue = entry.getKey();
if (plan.getValue().equals(tagValue)) {
allMatchedNodes.addAll(entry.getValue());
}
}
}
// if ordered by heat, we sort all the timeseries by the descending order of the last insert
// timestamp
if (plan.isOrderByHeat()) {
List<StorageGroupProcessor> list;
try {
list =
StorageEngine.getInstance()
.mergeLock(allMatchedNodes.stream().map(MNode::getPartialPath).collect(toList()));
try {
allMatchedNodes =
allMatchedNodes.stream()
.sorted(
Comparator.comparingLong(
(MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context))
.reversed()
.thenComparing(MNode::getFullPath))
.collect(toList());
} finally {
StorageEngine.getInstance().mergeUnLock(list);
}
} catch (StorageEngineException e) {
throw new MetadataException(e);
}
} else {
// otherwise, we just sort them by the alphabetical order
allMatchedNodes =
allMatchedNodes.stream()
.sorted(Comparator.comparing(MNode::getFullPath))
.collect(toList());
}
List<ShowTimeSeriesResult> res = new LinkedList<>();
String[] prefixNodes = plan.getPath().getNodes();
int curOffset = -1;
int count = 0;
int limit = plan.getLimit();
int offset = plan.getOffset();
for (MeasurementMNode leaf : allMatchedNodes) {
if (match(leaf.getPartialPath(), prefixNodes)) {
if (limit != 0 || offset != 0) {
curOffset++;
if (curOffset < offset || count == limit) {
continue;
}
}
try {
Pair<Map<String, String>, Map<String, String>> tagAndAttributePair =
tagLogFile.read(config.getTagAttributeTotalSize(), leaf.getOffset());
MeasurementSchema measurementSchema = leaf.getSchema();
res.add(
new ShowTimeSeriesResult(
leaf.getFullPath(),
leaf.getAlias(),
getStorageGroupPath(leaf.getPartialPath()).getFullPath(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getCompressor(),
tagAndAttributePair.left,
tagAndAttributePair.right));
if (limit != 0) {
count++;
}
} catch (IOException e) {
throw new MetadataException(
"Something went wrong while deserialize tag info of " + leaf.getFullPath(), e);
}
}
}
return res;
}
  /**
   * Check whether the full path starts with the given prefix nodes; "*" in prefixNodes matches
   * any single node. E.g., prefixNodes [root, sg1, *] match the full path root.sg1.d1.s1.
   */
private boolean match(PartialPath fullPath, String[] prefixNodes) {
String[] nodes = fullPath.getNodes();
if (nodes.length < prefixNodes.length) {
return false;
}
for (int i = 0; i < prefixNodes.length; i++) {
if (!"*".equals(prefixNodes[i]) && !prefixNodes[i].equals(nodes[i])) {
return false;
}
}
return true;
}
public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryContext context)
throws MetadataException {
// show timeseries with index
if (plan.getKey() != null && plan.getValue() != null) {
return showTimeseriesWithIndex(plan, context);
} else {
return showTimeseriesWithoutIndex(plan, context);
}
}
/**
* Get the result of ShowTimeseriesPlan
*
* @param plan show time series query plan
*/
private List<ShowTimeSeriesResult> showTimeseriesWithoutIndex(
ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
List<Pair<PartialPath, String[]>> ans;
if (plan.isOrderByHeat()) {
ans = mtree.getAllMeasurementSchemaByHeatOrder(plan, context);
} else {
ans = mtree.getAllMeasurementSchema(plan);
}
List<ShowTimeSeriesResult> res = new LinkedList<>();
for (Pair<PartialPath, String[]> ansString : ans) {
long tagFileOffset = Long.parseLong(ansString.right[5]);
try {
Pair<Map<String, String>, Map<String, String>> tagAndAttributePair =
new Pair<>(Collections.emptyMap(), Collections.emptyMap());
if (tagFileOffset >= 0) {
tagAndAttributePair = tagLogFile.read(config.getTagAttributeTotalSize(), tagFileOffset);
}
res.add(
new ShowTimeSeriesResult(
ansString.left.getFullPath(),
ansString.right[0],
ansString.right[1],
TSDataType.valueOf(ansString.right[2]),
TSEncoding.valueOf(ansString.right[3]),
CompressionType.valueOf(ansString.right[4]),
tagAndAttributePair.left,
tagAndAttributePair.right));
} catch (IOException e) {
throw new MetadataException(
"Something went wrong while deserialize tag info of " + ansString.left.getFullPath(),
e);
}
}
return res;
}
public MeasurementSchema getSeriesSchema(PartialPath device, String measurement)
throws MetadataException {
MNode node = mtree.getNodeByPath(device);
MNode leaf = node.getChild(measurement);
if (leaf != null) {
return ((MeasurementMNode) leaf).getSchema();
}
return null;
}
/**
* Get child node path in the next level of the given path.
*
* <p>e.g., MTree has [root.sg1.d1.s1, root.sg1.d1.s2, root.sg1.d2.s1] given path = root.sg1,
* return [root.sg1.d1, root.sg1.d2]
*
* @param path The given path
* @return All child nodes' seriesPath(s) of given seriesPath.
*/
public Set<String> getChildNodePathInNextLevel(PartialPath path) throws MetadataException {
return mtree.getChildNodePathInNextLevel(path);
}
/**
* Get child node in the next level of the given path.
*
   * <p>e.g., MTree has [root.sg1.d1.s1, root.sg1.d1.s2, root.sg1.d2.s1]. Given path = root.sg1,
   * return [d1, d2]; given path = root.sg1.d1, return [s1, s2]
*
* @return All child nodes of given seriesPath.
*/
public Set<String> getChildNodeInNextLevel(PartialPath path) throws MetadataException {
return mtree.getChildNodeInNextLevel(path);
}
/**
* Check whether the path exists.
*
* @param path a full path or a prefix path
*/
public boolean isPathExist(PartialPath path) {
return mtree.isPathExist(path);
}
/** Get node by path */
public MNode getNodeByPath(PartialPath path) throws MetadataException {
return mtree.getNodeByPath(path);
}
/**
   * Get storage group node by path. If the storage group is not set, a
   * StorageGroupNotSetException will be thrown. E.g., root.sg is a storage group: given [root,
   * sg], return the MNode of root.sg; given [root, sg, device], return the MNode of root.sg
*/
public StorageGroupMNode getStorageGroupNodeByStorageGroupPath(PartialPath path)
throws MetadataException {
return mtree.getStorageGroupNodeByStorageGroupPath(path);
}
  /** Get storage group node by path. The given path does not need to be a storage group path. */
public StorageGroupMNode getStorageGroupNodeByPath(PartialPath path) throws MetadataException {
return mtree.getStorageGroupNodeByPath(path);
}
/**
   * get the device node; if the storage group is not set, create it when autoCreateSchema is true
   *
   * <p>(we developed this method because we need to get the node's lock after we get the
   * lock.writeLock())
*
* @param path path
*/
public MNode getDeviceNodeWithAutoCreate(PartialPath path, boolean autoCreateSchema, int sgLevel)
throws MetadataException {
MNode node;
boolean shouldSetStorageGroup;
try {
node = mNodeCache.get(path);
return node;
} catch (CacheException e) {
if (!autoCreateSchema) {
throw new PathNotExistException(path.getFullPath());
}
shouldSetStorageGroup = e.getCause() instanceof StorageGroupNotSetException;
}
try {
if (shouldSetStorageGroup) {
PartialPath storageGroupPath = MetaUtils.getStorageGroupPathByLevel(path, sgLevel);
setStorageGroup(storageGroupPath);
}
node = mtree.getDeviceNodeWithAutoCreating(path, sgLevel);
return node;
} catch (StorageGroupAlreadySetException e) {
// ignore set storage group concurrently
node = mtree.getDeviceNodeWithAutoCreating(path, sgLevel);
return node;
}
}
  /** Attention! You must call the returned node's readUnlock() if you call this method. */
public MNode getDeviceNodeWithAutoCreate(PartialPath path) throws MetadataException {
return getDeviceNodeWithAutoCreate(
path, config.isAutoCreateSchemaEnabled(), config.getDefaultStorageGroupLevel());
}
public MNode getDeviceNode(PartialPath path) throws MetadataException {
MNode node;
try {
node = mNodeCache.get(path);
return node;
} catch (CacheException e) {
throw new PathNotExistException(path.getFullPath());
}
}
/**
   * To reduce the number of Strings in memory, use the deviceId from MManager instead of the
   * deviceId read from disk
*
* @param path read from disk
* @return deviceId
*/
public String getDeviceId(PartialPath path) {
String device = null;
try {
MNode deviceNode = getDeviceNode(path);
device = deviceNode.getFullPath();
} catch (MetadataException | NullPointerException e) {
      // cannot get the deviceId from MManager; the caller should fall back to the deviceId read
      // from disk
}
return device;
}
/** Get metadata in string */
public String getMetadataInString() {
return TIME_SERIES_TREE_HEADER + mtree;
}
public void setTTL(PartialPath storageGroup, long dataTTL) throws MetadataException, IOException {
getStorageGroupNodeByStorageGroupPath(storageGroup).setDataTTL(dataTTL);
if (!isRecovering) {
logWriter.setTTL(storageGroup, dataTTL);
}
}
/**
   * get the TTL of all storage groups
   *
   * @return key -> storageGroupPath, value -> TTL
*/
public Map<PartialPath, Long> getStorageGroupsTTL() {
Map<PartialPath, Long> storageGroupsTTL = new HashMap<>();
try {
List<PartialPath> storageGroups = this.getAllStorageGroupPaths();
for (PartialPath storageGroup : storageGroups) {
long ttl = getStorageGroupNodeByStorageGroupPath(storageGroup).getDataTTL();
storageGroupsTTL.put(storageGroup, ttl);
}
} catch (MetadataException e) {
logger.error("get storage groups ttl failed.", e);
}
return storageGroupsTTL;
}
/**
   * Change or set the new offset of a timeseries
*
* @param path timeseries
* @param offset offset in the tag file
*/
public void changeOffset(PartialPath path, long offset) throws MetadataException {
((MeasurementMNode) mtree.getNodeByPath(path)).setOffset(offset);
}
public void changeAlias(PartialPath path, String alias) throws MetadataException {
MeasurementMNode leafMNode = (MeasurementMNode) mtree.getNodeByPath(path);
if (leafMNode.getAlias() != null) {
leafMNode.getParent().deleteAliasChild(leafMNode.getAlias());
}
leafMNode.getParent().addAlias(alias, leafMNode);
leafMNode.setAlias(alias);
}
/**
   * Upsert tags and attributes key-value pairs for the timeseries; if a key already exists, just
   * use the new value to update it.
*
* @param alias newly added alias
* @param tagsMap newly added tags map
* @param attributesMap newly added attributes map
* @param fullPath timeseries
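   *     <p>A hedged sketch (names illustrative; exception handling omitted):
   *     <pre>{@code
   * MManager.getInstance()
   *     .upsertTagsAndAttributes(
   *         "temp", // alias
   *         Collections.singletonMap("unit", "celsius"), // tags
   *         Collections.singletonMap("owner", "ops"), // attributes
   *         new PartialPath("root.sg1.d1.s1"));
   * }</pre>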
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void upsertTagsAndAttributes(
String alias,
Map<String, String> tagsMap,
Map<String, String> attributesMap,
PartialPath fullPath)
throws MetadataException, IOException {
MNode mNode = mtree.getNodeByPath(fullPath);
if (!(mNode instanceof MeasurementMNode)) {
throw new PathNotExistException(fullPath.getFullPath());
}
MeasurementMNode leafMNode = (MeasurementMNode) mNode;
// upsert alias
if (alias != null && !alias.equals(leafMNode.getAlias())) {
if (!leafMNode.getParent().addAlias(alias, leafMNode)) {
throw new MetadataException("The alias already exists.");
}
if (leafMNode.getAlias() != null) {
leafMNode.getParent().deleteAliasChild(leafMNode.getAlias());
}
leafMNode.setAlias(alias);
// persist to WAL
logWriter.changeAlias(fullPath, alias);
}
if (tagsMap == null && attributesMap == null) {
return;
}
// no tag or attribute, we need to add a new record in log
if (leafMNode.getOffset() < 0) {
long offset = tagLogFile.write(tagsMap, attributesMap);
logWriter.changeOffset(fullPath, offset);
leafMNode.setOffset(offset);
// update inverted Index map
if (tagsMap != null) {
for (Entry<String, String> entry : tagsMap.entrySet()) {
tagIndex
.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
.computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>())
.add(leafMNode);
}
}
return;
}
Pair<Map<String, String>, Map<String, String>> pair =
tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset());
if (tagsMap != null) {
for (Entry<String, String> entry : tagsMap.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
String beforeValue = pair.left.get(key);
pair.left.put(key, value);
        // if the key already exists and the value is not equal to the new one,
        // we should remove the old key-value pair from the inverted index map
if (beforeValue != null && !beforeValue.equals(value)) {
if (tagIndex.containsKey(key) && tagIndex.get(key).containsKey(beforeValue)) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
String.format(DEBUG_MSG, "Upsert" + TAG_FORMAT, leafMNode.getFullPath()),
key,
beforeValue,
leafMNode.getOffset()));
}
tagIndex.get(key).get(beforeValue).remove(leafMNode);
if (tagIndex.get(key).get(beforeValue).isEmpty()) {
tagIndex.get(key).remove(beforeValue);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
String.format(
DEBUG_MSG_1, "Upsert" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
key,
beforeValue,
leafMNode.getOffset(),
tagIndex.containsKey(key)));
}
}
}
// if the key doesn't exist or the value is not equal to the new one
// we should add a new key-value to inverted index map
if (beforeValue == null || !beforeValue.equals(value)) {
tagIndex
.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
.computeIfAbsent(value, v -> new CopyOnWriteArraySet<>())
.add(leafMNode);
}
}
}
if (attributesMap != null) {
pair.right.putAll(attributesMap);
}
// persist the change to disk
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
}
/**
   * Add new attribute key-value pairs for the timeseries
*
* @param attributesMap newly added attributes map
* @param fullPath timeseries
*/
public void addAttributes(Map<String, String> attributesMap, PartialPath fullPath)
throws MetadataException, IOException {
MNode mNode = mtree.getNodeByPath(fullPath);
if (!(mNode instanceof MeasurementMNode)) {
throw new PathNotExistException(fullPath.getFullPath());
}
MeasurementMNode leafMNode = (MeasurementMNode) mNode;
// no tag or attribute, we need to add a new record in log
if (leafMNode.getOffset() < 0) {
long offset = tagLogFile.write(Collections.emptyMap(), attributesMap);
logWriter.changeOffset(fullPath, offset);
leafMNode.setOffset(offset);
return;
}
Pair<Map<String, String>, Map<String, String>> pair =
tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset());
for (Entry<String, String> entry : attributesMap.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (pair.right.containsKey(key)) {
throw new MetadataException(
String.format("TimeSeries [%s] already has the attribute [%s].", fullPath, key));
}
pair.right.put(key, value);
}
// persist the change to disk
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
}
/**
   * Add new tag key-value pairs for the timeseries
*
* @param tagsMap newly added tags map
* @param fullPath timeseries
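   *     <p>A minimal sketch (names illustrative); a MetadataException is thrown if the tag key
   *     already exists on the series:
   *     <pre>{@code
   * MManager.getInstance()
   *     .addTags(Collections.singletonMap("unit", "celsius"), new PartialPath("root.sg1.d1.s1"));
   * }</pre>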
*/
public void addTags(Map<String, String> tagsMap, PartialPath fullPath)
throws MetadataException, IOException {
MNode mNode = mtree.getNodeByPath(fullPath);
if (!(mNode instanceof MeasurementMNode)) {
throw new PathNotExistException(fullPath.getFullPath());
}
MeasurementMNode leafMNode = (MeasurementMNode) mNode;
// no tag or attribute, we need to add a new record in log
if (leafMNode.getOffset() < 0) {
long offset = tagLogFile.write(tagsMap, Collections.emptyMap());
logWriter.changeOffset(fullPath, offset);
leafMNode.setOffset(offset);
// update inverted Index map
for (Entry<String, String> entry : tagsMap.entrySet()) {
tagIndex
.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
.computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>())
.add(leafMNode);
}
return;
}
Pair<Map<String, String>, Map<String, String>> pair =
tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset());
for (Entry<String, String> entry : tagsMap.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (pair.left.containsKey(key)) {
throw new MetadataException(
String.format("TimeSeries [%s] already has the tag [%s].", fullPath, key));
}
pair.left.put(key, value);
}
// persist the change to disk
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
// update tag inverted map
tagsMap.forEach(
(key, value) ->
tagIndex
.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
.computeIfAbsent(value, v -> new CopyOnWriteArraySet<>())
.add(leafMNode));
}
/**
* drop tags or attributes of the timeseries
*
* @param keySet tags key or attributes key
* @param fullPath timeseries path
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void dropTagsOrAttributes(Set<String> keySet, PartialPath fullPath)
throws MetadataException, IOException {
MNode mNode = mtree.getNodeByPath(fullPath);
if (!(mNode instanceof MeasurementMNode)) {
throw new PathNotExistException(fullPath.getFullPath());
}
MeasurementMNode leafMNode = (MeasurementMNode) mNode;
// no tag or attribute, just do nothing.
if (leafMNode.getOffset() < 0) {
return;
}
Pair<Map<String, String>, Map<String, String>> pair =
tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset());
Map<String, String> deleteTag = new HashMap<>();
    for (String key : keySet) {
      // check the tag map first
      String removeVal = pair.left.remove(key);
      if (removeVal != null) {
        deleteTag.put(key, removeVal);
      } else {
        // then check the attribute map
        removeVal = pair.right.remove(key);
if (removeVal == null) {
logger.warn("TimeSeries [{}] does not have tag/attribute [{}]", fullPath, key);
}
}
}
// persist the change to disk
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
Map<String, Set<MeasurementMNode>> tagVal2LeafMNodeSet;
    Set<MeasurementMNode> mNodes;
for (Entry<String, String> entry : deleteTag.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
// change the tag inverted index map
tagVal2LeafMNodeSet = tagIndex.get(key);
if (tagVal2LeafMNodeSet != null) {
        mNodes = tagVal2LeafMNodeSet.get(value);
        if (mNodes != null) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
String.format(DEBUG_MSG, "Drop" + TAG_FORMAT, leafMNode.getFullPath()),
entry.getKey(),
entry.getValue(),
leafMNode.getOffset()));
}
          mNodes.remove(leafMNode);
          if (mNodes.isEmpty()) {
tagVal2LeafMNodeSet.remove(value);
if (tagVal2LeafMNodeSet.isEmpty()) {
tagIndex.remove(key);
}
}
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
String.format(DEBUG_MSG_1, "Drop" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
key,
value,
leafMNode.getOffset(),
tagIndex.containsKey(key)));
}
}
}
}
/**
* set/change the values of tags or attributes
*
* @param alterMap the new tags or attributes key-value
* @param fullPath timeseries
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath fullPath)
throws MetadataException, IOException {
MNode mNode = mtree.getNodeByPath(fullPath);
if (!(mNode instanceof MeasurementMNode)) {
throw new PathNotExistException(fullPath.getFullPath());
}
MeasurementMNode leafMNode = (MeasurementMNode) mNode;
if (leafMNode.getOffset() < 0) {
throw new MetadataException(
String.format("TimeSeries [%s] does not have any tag/attribute.", fullPath));
}
// tags, attributes
Pair<Map<String, String>, Map<String, String>> pair =
tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset());
Map<String, String> oldTagValue = new HashMap<>();
Map<String, String> newTagValue = new HashMap<>();
for (Entry<String, String> entry : alterMap.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
// check tag map
if (pair.left.containsKey(key)) {
oldTagValue.put(key, pair.left.get(key));
newTagValue.put(key, value);
pair.left.put(key, value);
} else if (pair.right.containsKey(key)) {
// check attribute map
pair.right.put(key, value);
} else {
throw new MetadataException(
String.format("TimeSeries [%s] does not have tag/attribute [%s].", fullPath, key),
true);
}
}
// persist the change to disk
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
for (Entry<String, String> entry : oldTagValue.entrySet()) {
String key = entry.getKey();
String beforeValue = entry.getValue();
String currentValue = newTagValue.get(key);
// change the tag inverted index map
if (tagIndex.containsKey(key) && tagIndex.get(key).containsKey(beforeValue)) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
String.format(DEBUG_MSG, "Set" + TAG_FORMAT, leafMNode.getFullPath()),
entry.getKey(),
beforeValue,
leafMNode.getOffset()));
}
tagIndex.get(key).get(beforeValue).remove(leafMNode);
} else {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
String.format(DEBUG_MSG_1, "Set" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
key,
beforeValue,
leafMNode.getOffset(),
tagIndex.containsKey(key)));
}
}
tagIndex
.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
.computeIfAbsent(currentValue, k -> new CopyOnWriteArraySet<>())
.add(leafMNode);
}
}
/**
* rename the tag or attribute's key of the timeseries
*
* @param oldKey old key of tag or attribute
* @param newKey new key of tag or attribute
* @param fullPath timeseries
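   *     <p>e.g. (names illustrative): {@code renameTagOrAttributeKey("unit", "uom", new
   *     PartialPath("root.sg1.d1.s1"))} renames the key in whichever map contains it, and fails
   *     if "uom" already exists as a tag or attribute key of the series.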
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath)
throws MetadataException, IOException {
MNode mNode = mtree.getNodeByPath(fullPath);
if (!(mNode instanceof MeasurementMNode)) {
throw new PathNotExistException(fullPath.getFullPath());
}
MeasurementMNode leafMNode = (MeasurementMNode) mNode;
if (leafMNode.getOffset() < 0) {
throw new MetadataException(
String.format("TimeSeries [%s] does not have [%s] tag/attribute.", fullPath, oldKey),
true);
}
// tags, attributes
Pair<Map<String, String>, Map<String, String>> pair =
tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset());
    // the new key already exists
if (pair.left.containsKey(newKey) || pair.right.containsKey(newKey)) {
throw new MetadataException(
String.format(
"TimeSeries [%s] already has a tag/attribute named [%s].", fullPath, newKey),
true);
}
// check tag map
if (pair.left.containsKey(oldKey)) {
String value = pair.left.remove(oldKey);
pair.left.put(newKey, value);
// persist the change to disk
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
// change the tag inverted index map
if (tagIndex.containsKey(oldKey) && tagIndex.get(oldKey).containsKey(value)) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
String.format(DEBUG_MSG, "Rename" + TAG_FORMAT, leafMNode.getFullPath()),
oldKey,
value,
leafMNode.getOffset()));
}
tagIndex.get(oldKey).get(value).remove(leafMNode);
} else {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
String.format(
DEBUG_MSG_1, "Rename" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
oldKey,
value,
leafMNode.getOffset(),
tagIndex.containsKey(oldKey)));
}
}
tagIndex
.computeIfAbsent(newKey, k -> new ConcurrentHashMap<>())
.computeIfAbsent(value, k -> new CopyOnWriteArraySet<>())
.add(leafMNode);
} else if (pair.right.containsKey(oldKey)) {
// check attribute map
pair.right.put(newKey, pair.right.remove(oldKey));
// persist the change to disk
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
} else {
throw new MetadataException(
String.format("TimeSeries [%s] does not have tag/attribute [%s].", fullPath, oldKey),
true);
}
}
/** Check whether the given path contains a storage group */
boolean checkStorageGroupByPath(PartialPath path) {
return mtree.checkStorageGroupByPath(path);
}
/**
* Get all storage groups under the given path
*
* @return List of String represented all storage group names
* @apiNote :for cluster
*/
List<String> getStorageGroupByPath(PartialPath path) throws MetadataException {
try {
return mtree.getStorageGroupByPath(path);
} catch (MetadataException e) {
throw new MetadataException(e);
}
}
public void collectTimeseriesSchema(
MNode startingNode, Collection<TimeseriesSchema> timeseriesSchemas) {
Deque<MNode> nodeDeque = new ArrayDeque<>();
nodeDeque.addLast(startingNode);
while (!nodeDeque.isEmpty()) {
MNode node = nodeDeque.removeFirst();
if (node instanceof MeasurementMNode) {
MeasurementSchema nodeSchema = ((MeasurementMNode) node).getSchema();
timeseriesSchemas.add(
new TimeseriesSchema(
node.getFullPath(),
nodeSchema.getType(),
nodeSchema.getEncodingType(),
nodeSchema.getCompressor()));
} else if (!node.getChildren().isEmpty()) {
nodeDeque.addAll(node.getChildren().values());
}
}
}
public void collectTimeseriesSchema(
String prefixPath, Collection<TimeseriesSchema> timeseriesSchemas) throws MetadataException {
collectTimeseriesSchema(getNodeByPath(new PartialPath(prefixPath)), timeseriesSchemas);
}
public void collectMeasurementSchema(
MNode startingNode, Collection<MeasurementSchema> measurementSchemas) {
Deque<MNode> nodeDeque = new ArrayDeque<>();
nodeDeque.addLast(startingNode);
while (!nodeDeque.isEmpty()) {
MNode node = nodeDeque.removeFirst();
if (node instanceof MeasurementMNode) {
MeasurementSchema nodeSchema = ((MeasurementMNode) node).getSchema();
measurementSchemas.add(
new MeasurementSchema(
node.getName(),
nodeSchema.getType(),
nodeSchema.getEncodingType(),
nodeSchema.getCompressor()));
} else if (!node.getChildren().isEmpty()) {
nodeDeque.addAll(node.getChildren().values());
}
}
}
/** Collect the timeseries schemas under "startingPath". */
public void collectSeries(PartialPath startingPath, List<MeasurementSchema> measurementSchemas) {
MNode mNode;
try {
mNode = getNodeByPath(startingPath);
} catch (MetadataException e) {
return;
}
collectMeasurementSchema(mNode, measurementSchemas);
}
/**
* For a path, infer all storage groups it may belong to. The path can have wildcards.
*
   * <p>Consider the path as two parts: (1) the sub path which can not contain a storage group
   * name and (2) the sub path which is a substring that begins after the storage group name.
   *
   * <p>(1) Suppose the part of the path can not contain a storage group name (e.g.,
   * "root".contains("root.sg") == false), then: If the wildcard is not at the tail, then for each
   * wildcard, only one level will be inferred and the wildcard will be removed. If the wildcard is
   * at the tail, then the inference will go on until the storage groups are found and the wildcard
   * will be kept. (2) Suppose the part of the path is a substring that begins after the storage
   * group name (e.g., for "root.*.sg1.a.*.b.*" where "root.x.sg1" is a storage group, this part
   * is "a.*.b.*"). For this part, keep it as it is.
   *
   * <p>Assuming we have three SGs: root.group1, root.group2, root.area1.group3.
   *
   * <p>Eg1: for input "root.*", returns ("root.group1", "root.group1.*"), ("root.group2",
   * "root.group2.*") and ("root.area1.group3", "root.area1.group3.*")
   *
   * <p>Eg2: for input "root.*.s1", returns ("root.group1", "root.group1.s1") and ("root.group2",
   * "root.group2.s1")
   *
   * <p>Eg3: for input "root.area1.*", returns ("root.area1.group3", "root.area1.group3.*")
*
* @param path can be a prefix or a full path.
* @return StorageGroupName-FullPath pairs
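   *     <p>A sketch matching Eg2 above (same three storage groups assumed):
   *     <pre>{@code
   * Map<String, String> sgPathMap =
   *     MManager.getInstance().determineStorageGroup(new PartialPath("root.*.s1"));
   * // {root.group1=root.group1.s1, root.group2=root.group2.s1}
   * }</pre>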
*/
public Map<String, String> determineStorageGroup(PartialPath path) throws IllegalPathException {
Map<String, String> sgPathMap = mtree.determineStorageGroup(path);
if (logger.isDebugEnabled()) {
logger.debug("The storage groups of path {} are {}", path, sgPathMap.keySet());
}
return sgPathMap;
}
/**
   * If the path is in the local MTree, nothing needs to be done (because the MTree is in memory);
   * otherwise, cache the path in mRemoteSchemaCache
*/
public void cacheMeta(PartialPath path, MeasurementMNode measurementMNode) {
// do nothing
}
public void updateLastCache(
PartialPath seriesPath,
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
Long latestFlushedTime,
MeasurementMNode node) {
if (node != null) {
node.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
} else {
try {
MeasurementMNode node1 = (MeasurementMNode) mtree.getNodeByPath(seriesPath);
node1.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
} catch (MetadataException e) {
logger.warn("failed to update last cache for the {}, err:{}", seriesPath, e.getMessage());
}
}
}
public TimeValuePair getLastCache(PartialPath seriesPath) {
try {
MeasurementMNode node = (MeasurementMNode) mtree.getNodeByPath(seriesPath);
return node.getCachedLast();
} catch (MetadataException e) {
logger.warn("failed to get last cache for the {}, err:{}", seriesPath, e.getMessage());
}
return null;
}
@TestOnly
public void flushAllMlogForTest() throws IOException {
logWriter.close();
}
private void checkMTreeModified() {
if (logWriter == null || logFile == null) {
// the logWriter is not initialized now, we skip the check once.
return;
}
if (System.currentTimeMillis() - logFile.lastModified() < mtreeSnapshotThresholdTime) {
if (logger.isDebugEnabled()) {
logger.debug(
"MTree snapshot need not be created. Time from last modification: {} ms.",
System.currentTimeMillis() - logFile.lastModified());
}
} else if (logWriter.getLogNum() < mtreeSnapshotInterval) {
if (logger.isDebugEnabled()) {
logger.debug(
"MTree snapshot need not be created. New mlog line number: {}.", logWriter.getLogNum());
}
} else {
logger.info(
"New mlog line number: {}, time from last modification: {} ms",
logWriter.getLogNum(),
System.currentTimeMillis() - logFile.lastModified());
createMTreeSnapshot();
}
}
public void createMTreeSnapshot() {
long time = System.currentTimeMillis();
logger.info("Start creating MTree snapshot to {}", mtreeSnapshotPath);
try {
mtree.serializeTo(mtreeSnapshotTmpPath);
File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
File snapshotFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
if (snapshotFile.exists()) {
Files.delete(snapshotFile.toPath());
}
if (tmpFile.renameTo(snapshotFile)) {
logger.info(
"Finish creating MTree snapshot to {}, spend {} ms.",
mtreeSnapshotPath,
System.currentTimeMillis() - time);
}
logWriter.clear();
} catch (IOException e) {
logger.warn("Failed to create MTree snapshot to {}", mtreeSnapshotPath, e);
if (SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath).exists()) {
try {
Files.delete(SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath).toPath());
} catch (IOException e1) {
logger.warn("delete file {} failed: {}", mtreeSnapshotTmpPath, e1.getMessage());
}
}
}
}
  /**
   * Get the schemas of the measurements in the plan and return the device node. Attention: only
   * InsertPlan is supported.
   */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public MNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) throws MetadataException {
PartialPath deviceId = plan.getDeviceId();
String[] measurementList = plan.getMeasurements();
MeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
// 1. get device node
MNode deviceMNode = getDeviceNodeWithAutoCreate(deviceId);
// 2. get schema of each measurement
    // if a measurement does not exist, it may be auto-created below
MeasurementMNode measurementMNode;
TSDataType dataType;
for (int i = 0; i < measurementList.length; i++) {
try {
MNode child = getMNode(deviceMNode, measurementList[i]);
if (child instanceof MeasurementMNode) {
measurementMNode = (MeasurementMNode) child;
} else if (child instanceof StorageGroupMNode) {
throw new PathAlreadyExistException(deviceId + PATH_SEPARATOR + measurementList[i]);
} else {
if (!config.isAutoCreateSchemaEnabled()) {
throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurementList[i]);
} else {
            // child is null or child is an internal MNode (not a measurement)
dataType = getTypeInLoc(plan, i);
            // create it; it may be created concurrently by multiple threads
internalCreateTimeseries(deviceId.concatNode(measurementList[i]), dataType);
measurementMNode = (MeasurementMNode) deviceMNode.getChild(measurementList[i]);
}
}
        // check whether the data type matches
TSDataType insertDataType = null;
if (plan instanceof InsertRowPlan) {
if (!((InsertRowPlan) plan).isNeedInferType()) {
            // only when the InsertRowPlan's values are Object[] do we need to check the type
insertDataType = getTypeInLoc(plan, i);
} else {
insertDataType = measurementMNode.getSchema().getType();
}
} else if (plan instanceof InsertTabletPlan) {
insertDataType = getTypeInLoc(plan, i);
}
if (measurementMNode.getSchema().getType() != insertDataType) {
logger.warn(
"DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
measurementList[i],
insertDataType,
measurementMNode.getSchema().getType());
DataTypeMismatchException mismatchException =
new DataTypeMismatchException(
measurementList[i], insertDataType, measurementMNode.getSchema().getType());
if (!config.isEnablePartialInsert()) {
throw mismatchException;
} else {
// mark failed measurement
plan.markFailedMeasurementInsertion(i, mismatchException);
continue;
}
}
measurementMNodes[i] = measurementMNode;
// set measurementName instead of alias
measurementList[i] = measurementMNode.getName();
} catch (MetadataException e) {
logger.warn(
"meet error when check {}.{}, message: {}",
deviceId,
measurementList[i],
e.getMessage());
if (config.isEnablePartialInsert()) {
// mark failed measurement
plan.markFailedMeasurementInsertion(i, e);
} else {
throw e;
}
}
}
return deviceMNode;
}
public MNode getMNode(MNode deviceMNode, String measurementName) {
return deviceMNode.getChild(measurementName);
}
  /** Create a timeseries, ignoring any PathAlreadyExistException. */
private void internalCreateTimeseries(PartialPath path, TSDataType dataType)
throws MetadataException {
createTimeseries(
path,
dataType,
getDefaultEncoding(dataType),
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
}
  /**
   * Get the data type of the measurement at position loc in the plan; only InsertRowPlan and
   * InsertTabletPlan are supported.
   */
private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException {
TSDataType dataType;
if (plan instanceof InsertRowPlan) {
InsertRowPlan tPlan = (InsertRowPlan) plan;
dataType =
TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
} else if (plan instanceof InsertTabletPlan) {
dataType = (plan).getDataTypes()[loc];
} else {
throw new MetadataException(
String.format(
"Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
}
return dataType;
}
/**
* StorageGroupFilter filters unsatisfied storage groups in metadata queries to speed up and
* deduplicate.
*/
@FunctionalInterface
public interface StorageGroupFilter {
boolean satisfy(String storageGroup);
}
}