/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
| package org.apache.iotdb.db.queryengine.plan.analyze.schema; |
| |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.commons.path.PathPatternTree; |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.queryengine.common.MPPQueryContext; |
| import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; |
| import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo; |
| import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; |
| import org.apache.iotdb.db.queryengine.plan.Coordinator; |
| import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; |
| import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager; |
| import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType; |
| import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; |
| import org.apache.iotdb.db.schemaengine.template.ITemplateManager; |
| import org.apache.iotdb.db.schemaengine.template.Template; |
| |
| import org.apache.tsfile.enums.TSDataType; |
| import org.apache.tsfile.file.metadata.enums.CompressionType; |
| import org.apache.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.tsfile.utils.Pair; |
| import org.apache.tsfile.write.schema.MeasurementSchema; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
/**
 * Cluster-mode schema fetcher. Answers schema lookups by first consulting the local
 * {@link DataNodeSchemaCache}; on a miss it delegates to {@link ClusterSchemaFetchExecutor} for a
 * remote fetch and, when auto-create is enabled in the config, to {@link AutoCreateSchemaExecutor}
 * to create the still-missing timeseries.
 *
 * <p>Thread-safety: every method that reads the cache and may fetch remotely holds the schema
 * cache's read lock for the whole read+fetch sequence, so that cache invalidation performed by
 * timeseries deletion stays effective (see the comments at each lock site below).
 */
public class ClusterSchemaFetcher implements ISchemaFetcher {

  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

  private final Coordinator coordinator = Coordinator.getInstance();

  // DataNodeSchemaCache's rwlock is used to block deletion when we insert the same timeseries
  // and will be released after coordinator's execute().
  private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
  private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();

  // Executors for the two slow paths: auto-creating missing schema, and fetching schema from
  // remote schema regions (the latter writes fetched entries back into the cache via
  // schemaCache::put).
  private final AutoCreateSchemaExecutor autoCreateSchemaExecutor =
      new AutoCreateSchemaExecutor(coordinator, templateManager, this);
  private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor =
      new ClusterSchemaFetchExecutor(coordinator, templateManager, this, schemaCache::put);

  // Per-kind fetchers used by fetchAndComputeSchemaWithAutoCreate: one for plain timeseries,
  // one for timeseries on template-set devices.
  private final NormalSchemaFetcher normalSchemaFetcher =
      new NormalSchemaFetcher(schemaCache, autoCreateSchemaExecutor, clusterSchemaFetchExecutor);
  private final TemplateSchemaFetcher templateSchemaFetcher =
      new TemplateSchemaFetcher(schemaCache, autoCreateSchemaExecutor, clusterSchemaFetchExecutor);

  // Initialization-on-demand holder idiom: the singleton is created lazily and thread-safely by
  // class loading.
  private static final class ClusterSchemaFetcherHolder {
    private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();

    private ClusterSchemaFetcherHolder() {}
  }

  /** Returns the process-wide singleton instance. */
  public static ClusterSchemaFetcher getInstance() {
    return ClusterSchemaFetcherHolder.INSTANCE;
  }

  private ClusterSchemaFetcher() {}

  /**
   * Fetches the schema matched by {@code patternTree}.
   *
   * <p>Fast path: when every pattern is either a full path without wildcards, or a pattern with an
   * explicit device on a template-set device, the result may be assembled entirely from the local
   * schema cache. Any other pattern, or any cache miss, falls back to a remote fetch.
   */
  @Override
  public ClusterSchemaTree fetchSchema(
      PathPatternTree patternTree, boolean withTemplate, MPPQueryContext context) {
    patternTree.constructTree();
    List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
    List<PartialPath> explicitPathList = new ArrayList<>();
    // Device paths are de-duplicated in the Set, but the count below tracks every qualifying
    // pattern so it can be compared against pathPatternList.size().
    Set<PartialPath> explicitDevicePatternList = new HashSet<>();
    int explicitDevicePatternCount = 0;
    for (PartialPath pattern : pathPatternList) {
      if (!pattern.hasWildcard()) {
        explicitPathList.add(pattern);
      } else if (pattern.hasExplicitDevice()
          && templateManager.checkTemplateSetInfo(pattern) != null) {
        explicitDevicePatternList.add(pattern.getDevicePath());
        explicitDevicePatternCount++;
      }
    }

    // Some pattern is fuzzy (wildcard, and not an explicit template-set device): the cache cannot
    // answer it, so go straight to a remote fuzzy-match fetch.
    if (explicitPathList.size() + explicitDevicePatternCount < pathPatternList.size()) {
      return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(
          patternTree, false, withTemplate, context);
    }

    // The schema cache R/W and fetch operation must be locked together thus the cache clean
    // operation executed by delete timeseries will be effective.
    schemaCache.takeReadLock();
    try {
      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
      boolean isAllCached = true;

      ClusterSchemaTree cachedSchema;
      Set<String> storageGroupSet = new HashSet<>();
      // First try to serve the template-set device patterns from the cache; a single miss
      // abandons the cache-only path.
      if (!explicitDevicePatternList.isEmpty()) {
        for (PartialPath explicitDevicePattern : explicitDevicePatternList) {
          cachedSchema = schemaCache.getMatchedSchemaWithTemplate(explicitDevicePattern);
          if (cachedSchema.isEmpty()) {
            isAllCached = false;
            break;
          } else {
            schemaTree.mergeSchemaTree(cachedSchema);
            storageGroupSet.addAll(cachedSchema.getDatabases());
          }
        }
      }

      // Then the wildcard-free full paths, same all-or-nothing rule.
      if (isAllCached && !explicitPathList.isEmpty()) {
        for (PartialPath fullPath : explicitPathList) {
          cachedSchema = schemaCache.getMatchedSchemaWithoutTemplate(fullPath);
          if (cachedSchema.isEmpty()) {
            isAllCached = false;
            break;
          } else {
            schemaTree.mergeSchemaTree(cachedSchema);
            storageGroupSet.addAll(cachedSchema.getDatabases());
          }
        }
      }

      if (isAllCached) {
        schemaTree.setDatabases(storageGroupSet);
        return schemaTree;
      }

      // Cache could not fully answer: fetch the whole pattern list remotely (fetched entries are
      // written back into the cache by clusterSchemaFetchExecutor).
      return clusterSchemaFetchExecutor.fetchSchemaOfPreciseMatchOrPreciseDeviceUsingTemplate(
          pathPatternList, patternTree, withTemplate, context);

    } finally {
      schemaCache.releaseReadLock();
    }
  }

  /**
   * Fetches schema matched by {@code patternTree} including tag information. Tags are not cached
   * locally, so this always performs a remote fuzzy-match fetch (second argument {@code true}
   * requests tags).
   */
  @Override
  public ClusterSchemaTree fetchSchemaWithTags(
      PathPatternTree patternTree, boolean withTemplate, MPPQueryContext context) {
    patternTree.constructTree();
    return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(
        patternTree, true, withTemplate, context);
  }

  /**
   * Fetches (and, if needed, auto-creates) the schema for one device write and feeds each
   * measurement's schema into {@code schemaComputationWithAutoCreation}. Measurements still
   * missing after processing are computed with {@code null}.
   *
   * <p>Takes the VALIDATE_VS_DELETION read lock and records it on {@code context}; the lock's
   * release is therefore driven by the query context rather than this method — NOTE(review):
   * confirm against MPPQueryContext's lock-release path.
   */
  @Override
  public void fetchAndComputeSchemaWithAutoCreate(
      ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation,
      MPPQueryContext context) {
    // The schema cache R/W and fetch operation must be locked together thus the cache clean
    // operation executed by delete timeseries will be effective.
    DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.VALIDATE_VS_DELETION);
    context.addAcquiredLockNum(SchemaLockType.VALIDATE_VS_DELETION);
    schemaCache.takeReadLock();
    try {
      // Route by whether the device is under a schema template.
      Pair<Template, PartialPath> templateSetInfo =
          templateManager.checkTemplateSetInfo(schemaComputationWithAutoCreation.getDevicePath());
      List<Integer> indexOfMissingMeasurements;
      if (templateSetInfo == null) {
        // normal timeseries
        indexOfMissingMeasurements =
            normalSchemaFetcher.processNormalTimeSeries(schemaComputationWithAutoCreation, context);
      } else {
        // template timeseries
        indexOfMissingMeasurements =
            templateSchemaFetcher.processTemplateTimeSeries(
                templateSetInfo, schemaComputationWithAutoCreation, context);
      }

      // all schema has been taken and processed
      if (indexOfMissingMeasurements.isEmpty()) {
        return;
      }

      // offer null for the rest missing schema processing
      for (int index : indexOfMissingMeasurements) {
        schemaComputationWithAutoCreation.computeMeasurement(index, null);
      }
    } finally {
      schemaCache.releaseReadLock();
    }
  }

  /**
   * Batch variant of {@link #fetchAndComputeSchemaWithAutoCreate(ISchemaComputationWithAutoCreation,
   * MPPQueryContext)}: partitions the requests into normal and template-set devices and processes
   * each group through its dedicated fetcher. Same locking contract as the single-request variant.
   */
  @Override
  public void fetchAndComputeSchemaWithAutoCreate(
      List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList,
      MPPQueryContext context) {
    // The schema cache R/W and fetch operation must be locked together thus the cache clean
    // operation executed by delete timeseries will be effective.
    DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.VALIDATE_VS_DELETION);
    context.addAcquiredLockNum(SchemaLockType.VALIDATE_VS_DELETION);
    schemaCache.takeReadLock();
    try {

      List<ISchemaComputationWithAutoCreation> normalTimeSeriesRequestList = new ArrayList<>();
      List<ISchemaComputationWithAutoCreation> templateTimeSeriesRequestList = new ArrayList<>();
      // templateSetInfoList is index-aligned with templateTimeSeriesRequestList.
      List<Pair<Template, PartialPath>> templateSetInfoList = new ArrayList<>();
      Pair<Template, PartialPath> templateSetInfo;
      for (ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation :
          schemaComputationWithAutoCreationList) {
        templateSetInfo =
            templateManager.checkTemplateSetInfo(schemaComputationWithAutoCreation.getDevicePath());
        if (templateSetInfo == null) {
          normalTimeSeriesRequestList.add(schemaComputationWithAutoCreation);
        } else {
          templateTimeSeriesRequestList.add(schemaComputationWithAutoCreation);
          templateSetInfoList.add(templateSetInfo);
        }
      }

      if (!normalTimeSeriesRequestList.isEmpty()) {
        normalSchemaFetcher.processNormalTimeSeries(normalTimeSeriesRequestList, context);
      }
      if (!templateTimeSeriesRequestList.isEmpty()) {
        templateSchemaFetcher.processTemplateTimeSeries(
            templateSetInfoList, templateTimeSeriesRequestList, context);
      }
    } finally {
      schemaCache.releaseReadLock();
    }
  }

  /**
   * Fetches the schema for the given devices/measurements, resolving misses in three stages:
   * local cache, then remote fetch, then (if enabled) auto-creation. All per-device input lists
   * are index-aligned with {@code devicePathList}.
   *
   * @return a schema tree containing all schema that could be resolved
   */
  @Override
  public ISchemaTree fetchSchemaListWithAutoCreate(
      List<PartialPath> devicePathList,
      List<String[]> measurementsList,
      List<TSDataType[]> tsDataTypesList,
      List<TSEncoding[]> encodingsList,
      List<CompressionType[]> compressionTypesList,
      List<Boolean> isAlignedList,
      MPPQueryContext context) {
    // The schema cache R/W and fetch operation must be locked together thus the cache clean
    // operation executed by delete timeseries will be effective.
    DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.VALIDATE_VS_DELETION);
    context.addAcquiredLockNum(SchemaLockType.VALIDATE_VS_DELETION);
    schemaCache.takeReadLock();
    try {
      // Stage 1: consult the cache per device and record which measurements are still missing.
      // The two index lists below stay aligned: entry i of indexOfMissingMeasurementsList belongs
      // to device indexOfDevicesWithMissingMeasurements.get(i).
      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
      List<List<Integer>> indexOfMissingMeasurementsList = new ArrayList<>(devicePathList.size());
      List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
      for (int i = 0; i < devicePathList.size(); i++) {
        schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), measurementsList.get(i)));
        List<Integer> indexOfMissingMeasurements =
            checkMissingMeasurements(schemaTree, devicePathList.get(i), measurementsList.get(i));
        if (!indexOfMissingMeasurements.isEmpty()) {
          indexOfDevicesWithMissingMeasurements.add(i);
          indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
        }
      }

      // all schema can be taken from cache
      if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
        return schemaTree;
      }

      // Stage 2: try fetch the missing schema from remote and cache fetched schema
      ClusterSchemaTree remoteSchemaTree =
          clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
              devicePathList,
              measurementsList,
              indexOfDevicesWithMissingMeasurements,
              indexOfMissingMeasurementsList,
              context);
      if (!remoteSchemaTree.isEmpty()) {
        schemaTree.mergeSchemaTree(remoteSchemaTree);
      }

      // Without auto-create, whatever is still missing simply stays absent from the result.
      if (!config.isAutoCreateSchemaEnabled()) {
        return schemaTree;
      }

      // Stage 3: auto create the still missing schema and merge them into schemaTree
      List<Integer> indexOfDevicesNeedAutoCreateSchema = new ArrayList<>();
      List<List<Integer>> indexOfMeasurementsNeedAutoCreate = new ArrayList<>();
      List<Integer> indexOfMissingMeasurements;
      int deviceIndex;
      for (int i = 0, size = indexOfDevicesWithMissingMeasurements.size(); i < size; i++) {
        deviceIndex = indexOfDevicesWithMissingMeasurements.get(i);
        indexOfMissingMeasurements = indexOfMissingMeasurementsList.get(i);
        // Re-check against the merged tree: the remote fetch may have filled some of the gaps.
        indexOfMissingMeasurements =
            checkMissingMeasurementsAfterSchemaFetch(
                schemaTree,
                devicePathList.get(deviceIndex),
                indexOfMissingMeasurements,
                measurementsList.get(deviceIndex));
        if (!indexOfMissingMeasurements.isEmpty()) {
          indexOfDevicesNeedAutoCreateSchema.add(deviceIndex);
          indexOfMeasurementsNeedAutoCreate.add(indexOfMissingMeasurements);
        }
      }

      if (!indexOfDevicesNeedAutoCreateSchema.isEmpty()) {
        autoCreateSchemaExecutor.autoCreateMissingMeasurements(
            schemaTree,
            devicePathList,
            indexOfDevicesNeedAutoCreateSchema,
            indexOfMeasurementsNeedAutoCreate,
            measurementsList,
            tsDataTypesList,
            encodingsList,
            compressionTypesList,
            isAlignedList,
            context);
      }

      return schemaTree;
    } finally {
      schemaCache.releaseReadLock();
    }
  }

  /** Delegates to the template manager; see {@link ITemplateManager#checkTemplateSetInfo}. */
  @Override
  public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath devicePath) {
    return templateManager.checkTemplateSetInfo(devicePath);
  }

  /**
   * Delegates to the template manager; see
   * {@link ITemplateManager#checkTemplateSetAndPreSetInfo}.
   */
  @Override
  public Pair<Template, PartialPath> checkTemplateSetAndPreSetInfo(
      PartialPath timeSeriesPath, String alias) {
    return templateManager.checkTemplateSetAndPreSetInfo(timeSeriesPath, alias);
  }

  /** Delegates to the template manager; see {@link ITemplateManager#checkAllRelatedTemplate}. */
  @Override
  public Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern) {
    return templateManager.checkAllRelatedTemplate(pathPattern);
  }

  /** Delegates to the template manager; see {@link ITemplateManager#getAllPathsSetTemplate}. */
  @Override
  public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
    return templateManager.getAllPathsSetTemplate(templateName);
  }

  /**
   * Returns the indices (into {@code measurements}) whose schema is absent from {@code
   * schemaTree} for the given device. If the device itself is unknown, every index is returned.
   */
  private List<Integer> checkMissingMeasurements(
      ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
    DeviceSchemaInfo deviceSchemaInfo =
        schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
    if (deviceSchemaInfo == null) {
      // Device not present at all: all measurements are missing.
      return IntStream.range(0, measurements.length).boxed().collect(Collectors.toList());
    }

    // The returned schema list is positional: a null entry at i means measurements[i] is missing.
    List<Integer> indexOfMissingMeasurements = new ArrayList<>();
    List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
    for (int i = 0; i < measurements.length; i++) {
      if (schemaList.get(i) == null) {
        indexOfMissingMeasurements.add(i);
      }
    }

    return indexOfMissingMeasurements;
  }

  /**
   * Like {@link #checkMissingMeasurements} but restricted to the previously-missing subset given
   * by {@code indexOfTargetMeasurements}. Returned indices refer to the original {@code
   * measurements} array, not to the subset.
   */
  private List<Integer> checkMissingMeasurementsAfterSchemaFetch(
      ClusterSchemaTree schemaTree,
      PartialPath devicePath,
      List<Integer> indexOfTargetMeasurements,
      String[] measurements) {
    DeviceSchemaInfo deviceSchemaInfo =
        schemaTree.searchDeviceSchemaInfo(
            devicePath,
            indexOfTargetMeasurements.stream()
                .map(index -> measurements[index])
                .collect(Collectors.toList()));
    if (deviceSchemaInfo == null) {
      // Device still unknown after the fetch: the whole target subset remains missing.
      return indexOfTargetMeasurements;
    }

    List<Integer> indexOfMissingMeasurements = new ArrayList<>();
    List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
    for (int i = 0, size = schemaList.size(); i < size; i++) {
      if (schemaList.get(i) == null) {
        // Map the position within the subset back to the original measurement index.
        indexOfMissingMeasurements.add(indexOfTargetMeasurements.get(i));
      }
    }

    return indexOfMissingMeasurements;
  }
}