| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.query.control; |
| |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.engine.StorageEngine; |
| import org.apache.iotdb.db.engine.querycontext.QueryDataSource; |
| import org.apache.iotdb.db.engine.storagegroup.DataRegion; |
| import org.apache.iotdb.db.exception.StorageEngineException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.metadata.idtable.IDTable; |
| import org.apache.iotdb.db.query.context.QueryContext; |
| import org.apache.iotdb.db.query.externalsort.serialize.IExternalSortFileDeserializer; |
| import org.apache.iotdb.db.service.TemporaryQueryDataFileService; |
| import org.apache.iotdb.db.utils.QueryUtils; |
| import org.apache.iotdb.tsfile.read.filter.basic.Filter; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.stream.Collectors; |
| |
| /** |
| * QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to |
| * the jobs. During the life cycle of a query, the following methods must be called in strict order: |
| * |
| * <p>1. assignQueryId - get an Id for the new query. |
| * |
| * <p>2. getQueryDataSource - open files for the job or reuse existing readers. |
| * |
| * <p>3. endQueryForGivenJob - release the resource used by this job. |
| */ |
| public class QueryResourceManager { |
| |
| private final AtomicLong queryIdAtom = new AtomicLong(); |
| private final QueryFileManager filePathsManager; |
| |
| /** |
| * Record temporary files used for external sorting. |
| * |
| * <p>Key: query job id. Value: temporary file list used for external sorting. |
| */ |
| private final Map<Long, List<IExternalSortFileDeserializer>> externalSortFileMap; |
| |
| /** |
| * Record QueryDataSource used in queries |
| * |
| * <p>Key: query job id. Value: QueryDataSource corresponding to each virtual storage group. |
| */ |
| private final Map<Long, Map<String, QueryDataSource>> cachedQueryDataSourcesMap; |
| |
| private QueryResourceManager() { |
| filePathsManager = new QueryFileManager(); |
| externalSortFileMap = new ConcurrentHashMap<>(); |
| cachedQueryDataSourcesMap = new ConcurrentHashMap<>(); |
| } |
| |
| public static QueryResourceManager getInstance() { |
| return QueryTokenManagerHelper.INSTANCE; |
| } |
| |
| /** Register a new query. When a query request is created firstly, this method must be invoked. */ |
| public long assignQueryId(boolean isDataQuery) { |
| long queryId = queryIdAtom.incrementAndGet(); |
| if (isDataQuery) { |
| filePathsManager.addQueryId(queryId); |
| } |
| return queryId; |
| } |
| |
| /** |
| * Register a query id for compaction. The name of the compaction thread is |
| * 'pool-x-IoTDB-Compaction-xx', xx in which is usually an integer from 0 to |
| * MAXCOMPACTION_THREAD_NUM. We use the following rules to define query id for compaction: <br> |
| * queryId = xx + Long.MIN_VALUE |
| */ |
| public long assignCompactionQueryId() { |
| long threadNum = Long.parseLong((Thread.currentThread().getName().split("-"))[4]); |
| long queryId = Long.MIN_VALUE + threadNum; |
| filePathsManager.addQueryId(queryId); |
| return queryId; |
| } |
| |
| /** |
| * register temporary file generated by external sort for resource release. |
| * |
| * @param queryId query job id |
| * @param deserializer deserializer of temporary file in external sort. |
| */ |
| public void registerTempExternalSortFile( |
| long queryId, IExternalSortFileDeserializer deserializer) { |
| externalSortFileMap.computeIfAbsent(queryId, x -> new ArrayList<>()).add(deserializer); |
| } |
| |
| /** |
| * The method is called in mergeLock() when executing query. This method will get all the |
| * QueryDataSource needed for this query and put them in the cachedQueryDataSourcesMap. |
| * |
| * @param processorToSeriesMap Key: processor of the virtual storage group. Value: selected series |
| * under the virtual storage group |
| */ |
| public void initQueryDataSourceCache( |
| Map<DataRegion, List<PartialPath>> processorToSeriesMap, |
| QueryContext context, |
| Filter timeFilter) |
| throws QueryProcessException { |
| for (Map.Entry<DataRegion, List<PartialPath>> entry : processorToSeriesMap.entrySet()) { |
| DataRegion processor = entry.getKey(); |
| List<PartialPath> pathList = |
| entry.getValue().stream().map(IDTable::translateQueryPath).collect(Collectors.toList()); |
| |
| // when all the selected series are under the same device, the QueryDataSource will be |
| // filtered according to timeIndex |
| Set<String> selectedDeviceIdSet = |
| pathList.stream().map(PartialPath::getDevice).collect(Collectors.toSet()); |
| |
| long queryId = context.getQueryId(); |
| String storageGroupPath = processor.getStorageGroupPath(); |
| |
| QueryDataSource cachedQueryDataSource = |
| processor.query( |
| pathList, |
| selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null, |
| context, |
| filePathsManager, |
| timeFilter); |
| cachedQueryDataSourcesMap |
| .computeIfAbsent(queryId, k -> new HashMap<>()) |
| .put(storageGroupPath, cachedQueryDataSource); |
| } |
| } |
| |
| /** |
| * @param selectedPath MeasurementPath or AlignedPath, even if it contains only one sub sensor of |
| * an aligned device, it should be AlignedPath instead of MeasurementPath |
| */ |
| public QueryDataSource getQueryDataSource( |
| PartialPath selectedPath, QueryContext context, Filter timeFilter, boolean ascending) |
| throws StorageEngineException, QueryProcessException { |
| |
| long queryId = context.getQueryId(); |
| String storageGroupPath = StorageEngine.getInstance().getStorageGroupPath(selectedPath); |
| String deviceId = selectedPath.getDevice(); |
| |
| // get cached QueryDataSource |
| QueryDataSource cachedQueryDataSource; |
| if (cachedQueryDataSourcesMap.containsKey(queryId) |
| && cachedQueryDataSourcesMap.get(queryId).containsKey(storageGroupPath)) { |
| cachedQueryDataSource = cachedQueryDataSourcesMap.get(queryId).get(storageGroupPath); |
| } else { |
| // QueryDataSource is never cached in cluster mode |
| DataRegion processor = StorageEngine.getInstance().getProcessor(selectedPath.getDevicePath()); |
| PartialPath translatedPath = IDTable.translateQueryPath(selectedPath); |
| cachedQueryDataSource = |
| processor.query( |
| Collections.singletonList(translatedPath), |
| translatedPath.getDevice(), |
| context, |
| filePathsManager, |
| timeFilter); |
| } |
| |
| // construct QueryDataSource for selectedPath |
| QueryDataSource queryDataSource = |
| new QueryDataSource( |
| cachedQueryDataSource.getSeqResources(), cachedQueryDataSource.getUnseqResources()); |
| |
| queryDataSource.setDataTTL(cachedQueryDataSource.getDataTTL()); |
| |
| // calculate the read order of unseqResources |
| QueryUtils.fillOrderIndexes(queryDataSource, deviceId, ascending); |
| |
| return queryDataSource; |
| } |
| |
| /** |
| * Whenever the jdbc request is closed normally or abnormally, this method must be invoked. All |
| * query tokens created by this jdbc request must be cleared. |
| */ |
| // Suppress high Cognitive Complexity warning |
| public void endQuery(long queryId) throws StorageEngineException { |
| // close file stream of external sort files, and delete |
| if (externalSortFileMap.get(queryId) != null) { |
| for (IExternalSortFileDeserializer deserializer : externalSortFileMap.get(queryId)) { |
| try { |
| deserializer.close(); |
| } catch (IOException e) { |
| throw new StorageEngineException(e); |
| } |
| } |
| externalSortFileMap.remove(queryId); |
| } |
| |
| // remove usage of opened file paths of current thread |
| filePathsManager.removeUsedFilesForQuery(queryId); |
| |
| // close and delete UDF temp files |
| TemporaryQueryDataFileService.getInstance().deregister(queryId); |
| |
| // remove query info in QueryTimeManager |
| QueryTimeManager.getInstance().unRegisterQuery(queryId, true); |
| |
| // remove cached QueryDataSource |
| cachedQueryDataSourcesMap.remove(queryId); |
| } |
| |
| public void writeQueryFileInfo() { |
| filePathsManager.writeQueryFileInfo(); |
| } |
| |
| public QueryFileManager getQueryFileManager() { |
| return filePathsManager; |
| } |
| |
| private static class QueryTokenManagerHelper { |
| |
| private static final QueryResourceManager INSTANCE = new QueryResourceManager(); |
| |
| private QueryTokenManagerHelper() {} |
| } |
| } |