blob: 3557646ab82d9039aa8958332300d34e5854fdf4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.buffer;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.TimeSeriesMetadataCacheMetrics;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.READ_TIMESERIES_METADATA_CACHE;
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.READ_TIMESERIES_METADATA_FILE;
import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.sizeOfCharArray;
/**
* This class is used to cache <code>TimeSeriesMetadata</code> in IoTDB. The caching strategy is
* LRU.
*/
@SuppressWarnings("squid:S6548")
public class TimeSeriesMetadataCache {
private static final Logger logger = LoggerFactory.getLogger(TimeSeriesMetadataCache.class);
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE =
config.getAllocateMemoryForTimeSeriesMetaDataCache();
private static final boolean CACHE_ENABLE = config.isMetaDataCacheEnable();
private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET =
SeriesScanCostMetricSet.getInstance();
private final Cache<TimeSeriesMetadataCacheKey, TimeseriesMetadata> lruCache;
private final AtomicLong entryAverageSize = new AtomicLong(0);
private final Map<String, WeakReference<String>> devices =
Collections.synchronizedMap(new WeakHashMap<>());
private static final String SEPARATOR = "$";
private TimeSeriesMetadataCache() {
if (CACHE_ENABLE) {
logger.info(
"TimeSeriesMetadataCache size = {}", MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE);
}
lruCache =
Caffeine.newBuilder()
.maximumWeight(MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE)
.weigher(
(Weigher<TimeSeriesMetadataCacheKey, TimeseriesMetadata>)
(key, value) ->
(int) (key.getRetainedSizeInBytes() + value.getRetainedSizeInBytes()))
.recordStats()
.build();
// add metrics
MetricService.getInstance().addMetricSet(new TimeSeriesMetadataCacheMetrics(this));
}
public static TimeSeriesMetadataCache getInstance() {
return TimeSeriesMetadataCache.TimeSeriesMetadataCacheHolder.INSTANCE;
}
@SuppressWarnings({"squid:S1860", "squid:S6541", "squid:S3776"}) // Suppress synchronize warning
public TimeseriesMetadata get(
String filePath,
TimeSeriesMetadataCacheKey key,
Set<String> allSensors,
boolean ignoreNotExists,
boolean debug)
throws IOException {
long startTime = System.nanoTime();
boolean cacheHit = true;
try {
if (!CACHE_ENABLE) {
cacheHit = false;
// bloom filter part
TsFileSequenceReader reader = FileReaderManager.getInstance().get(filePath, true);
BloomFilter bloomFilter = reader.readBloomFilter();
if (bloomFilter != null
&& !bloomFilter.contains(
((PlainDeviceID) key.device).toStringID()
+ IoTDBConstant.PATH_SEPARATOR
+ key.measurement)) {
return null;
}
TimeseriesMetadata timeseriesMetadata =
reader.readTimeseriesMetadata(key.device, key.measurement, ignoreNotExists);
return (timeseriesMetadata == null || timeseriesMetadata.getStatistics().getCount() == 0)
? null
: timeseriesMetadata;
}
TimeseriesMetadata timeseriesMetadata = lruCache.getIfPresent(key);
if (timeseriesMetadata == null) {
if (debug) {
DEBUG_LOGGER.info("Cache miss: {}.{} in file: {}", key.device, key.measurement, filePath);
DEBUG_LOGGER.info("Device: {}, all sensors: {}", key.device, allSensors);
}
// allow for the parallelism of different devices
synchronized (
devices.computeIfAbsent(
((PlainDeviceID) key.device).toStringID() + SEPARATOR + filePath,
WeakReference::new)) {
// double check
timeseriesMetadata = lruCache.getIfPresent(key);
if (timeseriesMetadata == null) {
cacheHit = false;
// bloom filter part
BloomFilter bloomFilter =
BloomFilterCache.getInstance()
.get(
new BloomFilterCache.BloomFilterCacheKey(
filePath,
key.regionId,
key.timePartitionId,
key.tsFileVersion,
key.compactionVersion),
debug);
if (bloomFilter != null
&& !bloomFilter.contains(
((PlainDeviceID) key.device).toStringID()
+ TsFileConstant.PATH_SEPARATOR
+ key.measurement)) {
if (debug) {
DEBUG_LOGGER.info("TimeSeries meta data {} is filter by bloomFilter!", key);
}
return null;
}
TsFileSequenceReader reader = FileReaderManager.getInstance().get(filePath, true);
List<TimeseriesMetadata> timeSeriesMetadataList =
reader.readTimeseriesMetadata(key.device, key.measurement, allSensors);
// put TimeSeriesMetadata of all sensors used in this read into cache
for (TimeseriesMetadata metadata : timeSeriesMetadataList) {
TimeSeriesMetadataCacheKey k =
new TimeSeriesMetadataCacheKey(
key.regionId,
key.timePartitionId,
key.tsFileVersion,
key.compactionVersion,
key.device,
metadata.getMeasurementId());
if (metadata.getStatistics().getCount() != 0) {
lruCache.put(k, metadata);
}
if (metadata.getMeasurementId().equals(key.measurement)) {
timeseriesMetadata = metadata.getStatistics().getCount() == 0 ? null : metadata;
}
}
}
}
}
if (timeseriesMetadata == null) {
if (debug) {
DEBUG_LOGGER.info("The file doesn't have this time series {}.", key);
}
return null;
} else {
if (debug) {
DEBUG_LOGGER.info(
"Get timeseries: {}.{} metadata in file: {} from cache: {}.",
key.device,
key.measurement,
filePath,
timeseriesMetadata);
}
return new TimeseriesMetadata(timeseriesMetadata);
}
} finally {
SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
cacheHit ? READ_TIMESERIES_METADATA_CACHE : READ_TIMESERIES_METADATA_FILE,
System.nanoTime() - startTime);
}
}
public double calculateTimeSeriesMetadataHitRatio() {
return lruCache.stats().hitRate();
}
public long getEvictionCount() {
return lruCache.stats().evictionCount();
}
public long getMaxMemory() {
return MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE;
}
public double getAverageLoadPenalty() {
return lruCache.stats().averageLoadPenalty();
}
public long getAverageSize() {
return entryAverageSize.get();
}
public double calculateBloomFilterHitRatio() {
return BloomFilterCache.getInstance().calculateBloomFilterHitRatio();
}
/** clear LRUCache. */
public void clear() {
lruCache.invalidateAll();
lruCache.cleanUp();
}
public void remove(TimeSeriesMetadataCacheKey key) {
lruCache.invalidate(key);
}
@TestOnly
public boolean isEmpty() {
return lruCache.asMap().isEmpty();
}
public static class TimeSeriesMetadataCacheKey {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TimeSeriesMetadataCacheKey.class)
+ RamUsageEstimator.shallowSizeOfInstance(String.class);
private final int regionId;
private final long timePartitionId;
private final long tsFileVersion;
// high 32 bit is compaction level, low 32 bit is merge count
private final long compactionVersion;
private final IDeviceID device;
private final String measurement;
public TimeSeriesMetadataCacheKey(TsFileID tsFileID, IDeviceID device, String measurement) {
this.regionId = tsFileID.regionId;
this.timePartitionId = tsFileID.timePartitionId;
this.tsFileVersion = tsFileID.fileVersion;
this.compactionVersion = tsFileID.compactionVersion;
this.device = device;
this.measurement = measurement;
}
public TimeSeriesMetadataCacheKey(
int regionId,
long timePartitionId,
long tsFileVersion,
long compactionVersion,
IDeviceID device,
String measurement) {
this.regionId = regionId;
this.timePartitionId = timePartitionId;
this.tsFileVersion = tsFileVersion;
this.compactionVersion = compactionVersion;
this.device = device;
this.measurement = measurement;
}
public long getRetainedSizeInBytes() {
return INSTANCE_SIZE + device.ramBytesUsed() + sizeOfCharArray(measurement.length());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TimeSeriesMetadataCacheKey that = (TimeSeriesMetadataCacheKey) o;
return regionId == that.regionId
&& timePartitionId == that.timePartitionId
&& tsFileVersion == that.tsFileVersion
&& compactionVersion == that.compactionVersion
&& Objects.equals(device, that.device)
&& Objects.equals(measurement, that.measurement);
}
@Override
public int hashCode() {
return Objects.hash(
regionId, timePartitionId, tsFileVersion, compactionVersion, device, measurement);
}
@Override
public String toString() {
return "TimeSeriesMetadataCacheKey{"
+ "regionId="
+ regionId
+ ", timePartitionId="
+ timePartitionId
+ ", tsFileVersion="
+ tsFileVersion
+ ", compactionVersion="
+ compactionVersion
+ ", device='"
+ ((PlainDeviceID) device).toStringID()
+ '\''
+ ", measurement='"
+ measurement
+ '\''
+ '}';
}
}
/** singleton pattern. */
private static class TimeSeriesMetadataCacheHolder {
private static final TimeSeriesMetadataCache INSTANCE = new TimeSeriesMetadataCache();
}
}