blob: 1ea59da13554c659ddfa50624438214824810952 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputation;
import org.apache.iotdb.db.schemaengine.template.ITemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class DeviceUsingTemplateSchemaCache {
private static final Logger logger =
LoggerFactory.getLogger(DeviceUsingTemplateSchemaCache.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final Cache<PartialPath, DeviceCacheEntry> cache;
private final ITemplateManager templateManager;
DeviceUsingTemplateSchemaCache(ITemplateManager templateManager) {
this.templateManager = templateManager;
// TODO proprietary config parameter expected
cache =
Caffeine.newBuilder()
.maximumWeight(config.getAllocateMemoryForSchemaCache())
.weigher(
(Weigher<PartialPath, DeviceCacheEntry>)
(key, val) -> (PartialPath.estimateSize(key) + 32))
.recordStats()
.build();
}
public long getHitCount() {
return cache.stats().hitCount();
}
public long getRequestCount() {
return cache.stats().requestCount();
}
public ClusterSchemaTree get(PartialPath fullPath) {
DeviceCacheEntry deviceCacheEntry = cache.getIfPresent(fullPath.getDevicePath());
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
if (deviceCacheEntry != null) {
Template template = templateManager.getTemplate(deviceCacheEntry.getTemplateId());
if (template.getSchema(fullPath.getMeasurement()) != null) {
schemaTree.appendTemplateDevice(
fullPath.getDevicePath(), template.isDirectAligned(), template.getId(), template);
schemaTree.setDatabases(Collections.singleton(deviceCacheEntry.getDatabase()));
}
}
return schemaTree;
}
public ClusterSchemaTree getMatchedSchemaWithTemplate(PartialPath devicePath) {
DeviceCacheEntry deviceCacheEntry = cache.getIfPresent(devicePath);
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
if (deviceCacheEntry != null) {
Template template = templateManager.getTemplate(deviceCacheEntry.getTemplateId());
schemaTree.appendTemplateDevice(
devicePath, template.isDirectAligned(), template.getId(), template);
schemaTree.setDatabases(Collections.singleton(deviceCacheEntry.getDatabase()));
}
return schemaTree;
}
/**
* CONFORM indicates that the provided devicePath had been cached as a template activated path,
* ensuring that the alignment of the device, as well as the name and schema of every measurement
* are consistent with the cache.
*
* @param computation
* @return true if conform to template cache, which means no need to fetch or create anymore
*/
public List<Integer> compute(ISchemaComputation computation) {
List<Integer> indexOfMissingMeasurements = new ArrayList<>();
PartialPath devicePath = computation.getDevicePath();
String[] measurements = computation.getMeasurements();
DeviceCacheEntry deviceCacheEntry = cache.getIfPresent(devicePath);
if (deviceCacheEntry == null) {
for (int i = 0; i < measurements.length; i++) {
indexOfMissingMeasurements.add(i);
}
return indexOfMissingMeasurements;
}
computation.computeDevice(
templateManager.getTemplate(deviceCacheEntry.getTemplateId()).isDirectAligned());
Map<String, IMeasurementSchema> templateSchema =
templateManager.getTemplate(deviceCacheEntry.getTemplateId()).getSchemaMap();
for (int i = 0; i < measurements.length; i++) {
if (!templateSchema.containsKey(measurements[i])) {
indexOfMissingMeasurements.add(i);
continue;
}
IMeasurementSchema schema = templateSchema.get(measurements[i]);
computation.computeMeasurement(
i,
new IMeasurementSchemaInfo() {
@Override
public String getName() {
return schema.getMeasurementId();
}
@Override
public IMeasurementSchema getSchema() {
if (isLogicalView()) {
return new LogicalViewSchema(
schema.getMeasurementId(), ((LogicalViewSchema) schema).getExpression());
} else {
return this.getSchemaAsMeasurementSchema();
}
}
@Override
public MeasurementSchema getSchemaAsMeasurementSchema() {
return new MeasurementSchema(
schema.getMeasurementId(),
schema.getType(),
schema.getEncodingType(),
schema.getCompressor());
}
@Override
public LogicalViewSchema getSchemaAsLogicalViewSchema() {
throw new RuntimeException(
new UnsupportedOperationException(
"Function getSchemaAsLogicalViewSchema is not supported in DeviceUsingTemplateSchemaCache."));
}
@Override
public Map<String, String> getTagMap() {
return null;
}
@Override
public String getAlias() {
return null;
}
@Override
public boolean isLogicalView() {
return schema.isLogicalView();
}
});
}
return indexOfMissingMeasurements;
}
public void put(PartialPath path, String database, Integer id) {
cache.put(path, new DeviceCacheEntry(database, id));
}
public void invalidateCache() {
cache.invalidateAll();
}
public void invalidateCache(String database) {
for (PartialPath key : cache.asMap().keySet()) {
if (key.startsWith(database)) {
cache.invalidate(key);
}
}
}
public void invalidateCache(List<PartialPath> partialPathList) {
for (PartialPath path : partialPathList) {
for (PartialPath key : cache.asMap().keySet()) {
if (key.startsWith(path.getDevice())) {
cache.invalidate(key);
}
}
}
}
private static class DeviceCacheEntry {
private final String database;
private final int templateId;
private DeviceCacheEntry(String database, int templateId) {
this.database = database.intern();
this.templateId = templateId;
}
public int getTemplateId() {
return templateId;
}
public String getDatabase() {
return database;
}
}
}