blob: b81307ab7abb11a2e2842898d269b4de2f9b9a82 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputation;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.ITemplateManager;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
/**
* This class takes the responsibility of metadata cache management of all DataRegions under
* StorageEngine
*/
public class DataNodeSchemaCache {
private static final Logger logger = LoggerFactory.getLogger(DataNodeSchemaCache.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
private final DeviceUsingTemplateSchemaCache deviceUsingTemplateSchemaCache;
private final TimeSeriesSchemaCache timeSeriesSchemaCache;
// cache update or clean have higher priority than cache read
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
private DataNodeSchemaCache() {
deviceUsingTemplateSchemaCache = new DeviceUsingTemplateSchemaCache(templateManager);
timeSeriesSchemaCache = new TimeSeriesSchemaCache();
MetricService.getInstance().addMetricSet(new DataNodeSchemaCacheMetrics(this));
}
public long getHitCount() {
return deviceUsingTemplateSchemaCache.getHitCount() + timeSeriesSchemaCache.getHitCount();
}
public long getRequestCount() {
return deviceUsingTemplateSchemaCache.getRequestCount()
+ timeSeriesSchemaCache.getRequestCount();
}
public static DataNodeSchemaCache getInstance() {
return DataNodeSchemaCacheHolder.INSTANCE;
}
/** singleton pattern. */
private static class DataNodeSchemaCacheHolder {
private static final DataNodeSchemaCache INSTANCE = new DataNodeSchemaCache();
}
public void takeReadLock() {
readWriteLock.readLock().lock();
}
public void releaseReadLock() {
readWriteLock.readLock().unlock();
}
public void takeWriteLock() {
readWriteLock.writeLock().lock();
}
public void releaseWriteLock() {
readWriteLock.writeLock().unlock();
}
/**
* Get SchemaEntity info without auto create schema
*
* @param devicePath should not be measurementPath or AlignedPath
* @param measurements
* @return timeseries partialPath and its SchemaEntity
*/
public ClusterSchemaTree get(PartialPath devicePath, String[] measurements) {
return timeSeriesSchemaCache.get(devicePath, measurements);
}
public ClusterSchemaTree get(PartialPath fullPath) {
ClusterSchemaTree clusterSchemaTree =
deviceUsingTemplateSchemaCache.get(fullPath.getDevicePath());
if (clusterSchemaTree == null || clusterSchemaTree.isEmpty()) {
return timeSeriesSchemaCache.get(fullPath);
} else {
return clusterSchemaTree;
}
}
/**
* Get schema info under the given device if the device path is a template activated path.
*
* @param devicePath full path of the device
* @return empty if cache miss or the device path is not a template activated path
*/
public ClusterSchemaTree getMatchedSchemaWithTemplate(PartialPath devicePath) {
return deviceUsingTemplateSchemaCache.getMatchedSchemaWithTemplate(devicePath);
}
/**
* Get schema info under the given full path that must not be a template series.
*
* @param fullPath full path
* @return empty if cache miss
*/
public ClusterSchemaTree getMatchedSchemaWithoutTemplate(PartialPath fullPath) {
return timeSeriesSchemaCache.get(fullPath);
}
public List<Integer> computeWithoutTemplate(ISchemaComputation schemaComputation) {
List<Integer> result = timeSeriesSchemaCache.computeAndRecordLogicalView(schemaComputation);
schemaComputation.recordRangeOfLogicalViewSchemaListNow();
return result;
}
/**
* This function is used to process logical view schema list in statement. It will try to find the
* source paths of those views in cache. If it found sources, measurement schemas of sources will
* be recorded in measurement schema list; else the views will be recorded as missed. The indexes
* of missed views and full paths of their source paths will be returned.
*
* @param schemaComputation the statement you want to process
* @return The indexes of missed views and full paths of their source paths will be returned.
*/
public Pair<List<Integer>, List<String>> computeSourceOfLogicalView(
ISchemaComputation schemaComputation) {
if (!schemaComputation.hasLogicalViewNeedProcess()) {
return new Pair<>(new ArrayList<>(), new ArrayList<>());
}
return timeSeriesSchemaCache.computeSourceOfLogicalView(schemaComputation);
}
public List<Integer> computeWithTemplate(ISchemaComputation schemaComputation) {
return deviceUsingTemplateSchemaCache.compute(schemaComputation);
}
/**
* Store the fetched schema in either the schemaCache or templateSchemaCache, depending on its
* associated device.
*/
public void put(ClusterSchemaTree tree) {
PartialPath devicePath;
for (DeviceSchemaInfo deviceSchemaInfo : tree.getAllDevices()) {
devicePath = deviceSchemaInfo.getDevicePath();
if (deviceSchemaInfo.getTemplateId() != NON_TEMPLATE) {
deviceUsingTemplateSchemaCache.put(
devicePath, tree.getBelongedDatabase(devicePath), deviceSchemaInfo.getTemplateId());
} else {
for (MeasurementPath measurementPath : deviceSchemaInfo.getMeasurementSchemaPathList()) {
timeSeriesSchemaCache.putSingleMeasurementPath(
tree.getBelongedDatabase(devicePath), measurementPath);
}
}
}
}
public TimeValuePair getLastCache(PartialPath seriesPath) {
return timeSeriesSchemaCache.getLastCache(seriesPath);
}
public void invalidateLastCache(PartialPath path) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
return;
}
takeReadLock();
try {
timeSeriesSchemaCache.invalidateLastCache(path);
} finally {
releaseReadLock();
}
}
public void invalidateLastCacheInDataRegion(String database) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
return;
}
takeReadLock();
try {
timeSeriesSchemaCache.invalidateDataRegionLastCache(database);
} finally {
releaseReadLock();
}
}
/** get SchemaCacheEntry and update last cache */
@TestOnly
public void updateLastCache(
PartialPath devicePath,
String measurement,
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
Long latestFlushedTime) {
timeSeriesSchemaCache.updateLastCache(
devicePath, measurement, timeValuePair, highPriorityUpdate, latestFlushedTime);
}
public void updateLastCache(
String database,
PartialPath devicePath,
String[] measurements,
MeasurementSchema[] measurementSchemas,
boolean isAligned,
IntFunction<TimeValuePair> timeValuePairProvider,
IntPredicate shouldUpdateProvider,
boolean highPriorityUpdate,
Long latestFlushedTime) {
takeReadLock();
try {
timeSeriesSchemaCache.updateLastCache(
database,
devicePath,
measurements,
measurementSchemas,
isAligned,
timeValuePairProvider,
shouldUpdateProvider,
highPriorityUpdate,
latestFlushedTime);
} finally {
releaseReadLock();
}
}
public void updateLastCacheWithoutLock(
String database,
PartialPath devicePath,
String[] measurements,
MeasurementSchema[] measurementSchemas,
boolean isAligned,
IntFunction<TimeValuePair> timeValuePairProvider,
IntPredicate shouldUpdateProvider,
boolean highPriorityUpdate,
Long latestFlushedTime) {
timeSeriesSchemaCache.updateLastCache(
database,
devicePath,
measurements,
measurementSchemas,
isAligned,
timeValuePairProvider,
shouldUpdateProvider,
highPriorityUpdate,
latestFlushedTime);
}
/**
* get or create SchemaCacheEntry and update last cache, only support non-aligned sensor or
* aligned sensor without only one sub sensor
*/
public void updateLastCache(
String storageGroup,
MeasurementPath measurementPath,
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
Long latestFlushedTime) {
takeReadLock();
try {
timeSeriesSchemaCache.updateLastCache(
storageGroup, measurementPath, timeValuePair, highPriorityUpdate, latestFlushedTime);
} finally {
releaseReadLock();
}
}
public void invalidate(String database) {
deviceUsingTemplateSchemaCache.invalidateCache(database);
timeSeriesSchemaCache.invalidate(database);
}
public void invalidate(List<PartialPath> partialPathList) {
boolean doPrecise = true;
for (PartialPath partialPath : partialPathList) {
if (partialPath.getDevicePath().hasWildcard()) {
doPrecise = false;
break;
}
}
if (doPrecise) {
deviceUsingTemplateSchemaCache.invalidateCache(partialPathList);
timeSeriesSchemaCache.invalidate(partialPathList);
} else {
invalidateAll();
}
}
public void invalidateAll() {
deviceUsingTemplateSchemaCache.invalidateCache();
timeSeriesSchemaCache.invalidateAll();
}
public void cleanUp() {
deviceUsingTemplateSchemaCache.invalidateCache();
timeSeriesSchemaCache.invalidateAll();
}
}