| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.storageengine.dataregion.tsfile; |
| |
| import org.apache.iotdb.commons.utils.TimePartitionUtils; |
| import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager; |
| import org.apache.iotdb.tsfile.read.filter.basic.Filter; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| public class TsFileManager { |
| private final String storageGroupName; |
| private String dataRegionId; |
| private final String storageGroupDir; |
| |
| /** Serialize queries, delete resource files, compaction cleanup files */ |
| private final ReadWriteLock resourceListLock = new ReentrantReadWriteLock(); |
| |
| private String writeLockHolder; |
| // time partition -> double linked list of tsfiles |
| private final TreeMap<Long, TsFileResourceList> sequenceFiles = new TreeMap<>(); |
| private final TreeMap<Long, TsFileResourceList> unsequenceFiles = new TreeMap<>(); |
| |
| private volatile boolean allowCompaction = true; |
| private final AtomicLong currentCompactionTaskSerialId = new AtomicLong(0); |
| |
| public TsFileManager(String storageGroupName, String dataRegionId, String storageGroupDir) { |
| this.storageGroupName = storageGroupName; |
| this.storageGroupDir = storageGroupDir; |
| this.dataRegionId = dataRegionId; |
| } |
| |
| public List<TsFileResource> getTsFileList(boolean sequence) { |
| return getTsFileList(sequence, null, null); |
| } |
| |
| /** |
| * @param sequence {@code true} for sequence, {@code false} for unsequence |
| * @param timePartitions {@code null} for all time partitions, empty for zero time partitions |
| */ |
| public List<TsFileResource> getTsFileList( |
| boolean sequence, List<Long> timePartitions, Filter timeFilter) { |
| // the iteration of ConcurrentSkipListMap is not concurrent secure |
| // so we must add read lock here |
| readLock(); |
| try { |
| List<TsFileResource> allResources = new ArrayList<>(); |
| Map<Long, TsFileResourceList> chosenMap = sequence ? sequenceFiles : unsequenceFiles; |
| if (timePartitions == null) { |
| for (Map.Entry<Long, TsFileResourceList> entry : chosenMap.entrySet()) { |
| if (TimePartitionUtils.satisfyTimePartition(timeFilter, entry.getKey())) { |
| allResources.addAll(entry.getValue().getArrayList()); |
| } |
| } |
| } else { |
| for (Long timePartitionId : timePartitions) { |
| TsFileResourceList tsFileResources = chosenMap.get(timePartitionId); |
| if (tsFileResources != null) { |
| allResources.addAll(tsFileResources.getArrayList()); |
| } |
| } |
| } |
| return allResources; |
| } finally { |
| readUnlock(); |
| } |
| } |
| |
| public List<TsFileResource> getTsFileListSnapshot(long timePartition, boolean sequence) { |
| readLock(); |
| try { |
| Map<Long, TsFileResourceList> chosenMap = sequence ? sequenceFiles : unsequenceFiles; |
| return new ArrayList<>(chosenMap.getOrDefault(timePartition, new TsFileResourceList())); |
| } finally { |
| readUnlock(); |
| } |
| } |
| |
| public List<TsFileResource> getTsFileList(boolean sequence, long startTime, long endTime) { |
| // the iteration of ConcurrentSkipListMap is not concurrent secure |
| // so we must add read lock here |
| readLock(); |
| try { |
| List<TsFileResource> allResources = new ArrayList<>(); |
| Map<Long, TsFileResourceList> chosenMap = sequence ? sequenceFiles : unsequenceFiles; |
| for (Map.Entry<Long, TsFileResourceList> entry : chosenMap.entrySet()) { |
| if (TimePartitionUtils.satisfyPartitionId(startTime, endTime, entry.getKey())) { |
| allResources.addAll(entry.getValue().getArrayList()); |
| } |
| } |
| return allResources; |
| } finally { |
| readUnlock(); |
| } |
| } |
| |
| public TsFileResourceList getOrCreateSequenceListByTimePartition(long timePartition) { |
| writeLock("getOrCreateSequenceListByTimePartition"); |
| try { |
| return sequenceFiles.computeIfAbsent(timePartition, l -> new TsFileResourceList()); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| public TsFileResourceList getOrCreateUnsequenceListByTimePartition(long timePartition) { |
| writeLock("getOrCreateUnsequenceListByTimePartition"); |
| try { |
| return unsequenceFiles.computeIfAbsent(timePartition, l -> new TsFileResourceList()); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| public Iterator<TsFileResource> getIterator(boolean sequence) { |
| readLock(); |
| try { |
| return getTsFileList(sequence).iterator(); |
| } finally { |
| readUnlock(); |
| } |
| } |
| |
| public void remove(TsFileResource tsFileResource, boolean sequence) { |
| writeLock("remove"); |
| try { |
| Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles; |
| for (Map.Entry<Long, TsFileResourceList> entry : selectedMap.entrySet()) { |
| if (entry.getValue().contains(tsFileResource)) { |
| entry.getValue().remove(tsFileResource); |
| TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource); |
| break; |
| } |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| public void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence) { |
| writeLock("removeAll"); |
| try { |
| for (TsFileResource resource : tsFileResourceList) { |
| remove(resource, sequence); |
| TsFileResourceManager.getInstance().removeTsFileResource(resource); |
| } |
| } finally { |
| writeLock("removeAll"); |
| } |
| } |
| |
| /** |
| * Insert tsFileResource to a target pos(targetPos = insertPos) e.g. if insertPos = 0, then to the |
| * first, if insert Pos = 1, then to the second. |
| */ |
| public void insertToPartitionFileList( |
| TsFileResource tsFileResource, long timePartition, boolean sequence, int insertPos) { |
| writeLock("add"); |
| try { |
| Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles; |
| TsFileResourceList tsFileResources = |
| selectedMap.computeIfAbsent(timePartition, o -> new TsFileResourceList()); |
| tsFileResources.set(insertPos, tsFileResource); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| public void add(TsFileResource tsFileResource, boolean sequence) { |
| writeLock("add"); |
| try { |
| Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles; |
| selectedMap |
| .computeIfAbsent(tsFileResource.getTimePartition(), o -> new TsFileResourceList()) |
| .add(tsFileResource); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| public void keepOrderInsert(TsFileResource tsFileResource, boolean sequence) throws IOException { |
| writeLock("keepOrderInsert"); |
| try { |
| Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles; |
| selectedMap |
| .computeIfAbsent(tsFileResource.getTimePartition(), o -> new TsFileResourceList()) |
| .keepOrderInsert(tsFileResource); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| public void addAll(List<TsFileResource> tsFileResourceList, boolean sequence) { |
| writeLock("add"); |
| try { |
| for (TsFileResource resource : tsFileResourceList) { |
| add(resource, sequence); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** This method is called after compaction to update memory. */ |
| public void replace( |
| List<TsFileResource> seqFileResources, |
| List<TsFileResource> unseqFileResources, |
| List<TsFileResource> targetFileResources, |
| long timePartition) |
| throws IOException { |
| writeLock("replace"); |
| try { |
| for (TsFileResource tsFileResource : seqFileResources) { |
| if (sequenceFiles.get(timePartition).remove(tsFileResource)) { |
| TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource); |
| } |
| } |
| for (TsFileResource tsFileResource : unseqFileResources) { |
| if (unsequenceFiles.get(timePartition).remove(tsFileResource)) { |
| TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource); |
| } |
| } |
| for (TsFileResource resource : targetFileResources) { |
| if (!resource.isDeleted()) { |
| TsFileResourceManager.getInstance().registerSealedTsFileResource(resource); |
| if (resource.isSeq()) { |
| sequenceFiles |
| .computeIfAbsent(timePartition, t -> new TsFileResourceList()) |
| .keepOrderInsert(resource); |
| } else { |
| unsequenceFiles |
| .computeIfAbsent(timePartition, t -> new TsFileResourceList()) |
| .keepOrderInsert(resource); |
| } |
| } |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| public boolean contains(TsFileResource tsFileResource, boolean sequence) { |
| readLock(); |
| try { |
| Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles; |
| TsFileResourceList list = selectedMap.getOrDefault(tsFileResource.getTimePartition(), null); |
| return list != null && list.contains(tsFileResource); |
| } finally { |
| readUnlock(); |
| } |
| } |
| |
| public void clear() { |
| writeLock("clear"); |
| try { |
| sequenceFiles.clear(); |
| unsequenceFiles.clear(); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| public boolean isEmpty(boolean sequence) { |
| readLock(); |
| try { |
| Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles; |
| for (Map.Entry<Long, TsFileResourceList> entry : selectedMap.entrySet()) { |
| if (!entry.getValue().isEmpty()) { |
| return false; |
| } |
| } |
| return true; |
| } finally { |
| readUnlock(); |
| } |
| } |
| |
| public int size(boolean sequence) { |
| readLock(); |
| try { |
| int totalSize = 0; |
| Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles; |
| for (Map.Entry<Long, TsFileResourceList> entry : selectedMap.entrySet()) { |
| totalSize += entry.getValue().size(); |
| } |
| return totalSize; |
| } finally { |
| readUnlock(); |
| } |
| } |
| |
| public void readLock() { |
| resourceListLock.readLock().lock(); |
| } |
| |
| public void readUnlock() { |
| resourceListLock.readLock().unlock(); |
| } |
| |
| public void writeLock(String holder) { |
| resourceListLock.writeLock().lock(); |
| writeLockHolder = holder; |
| } |
| |
| public void writeUnlock() { |
| resourceListLock.writeLock().unlock(); |
| writeLockHolder = ""; |
| } |
| |
| public String getStorageGroupName() { |
| return storageGroupName; |
| } |
| |
| public String getStorageGroupDir() { |
| return storageGroupDir; |
| } |
| |
| public Set<Long> getTimePartitions() { |
| readLock(); |
| try { |
| Set<Long> timePartitions = new HashSet<>(sequenceFiles.keySet()); |
| timePartitions.addAll(unsequenceFiles.keySet()); |
| return timePartitions; |
| } finally { |
| readUnlock(); |
| } |
| } |
| |
| public boolean isAllowCompaction() { |
| return allowCompaction; |
| } |
| |
| public void setAllowCompaction(boolean allowCompaction) { |
| this.allowCompaction = allowCompaction; |
| } |
| |
| public String getDataRegionId() { |
| return dataRegionId; |
| } |
| |
| public void setDataRegionId(String dataRegionId) { |
| this.dataRegionId = dataRegionId; |
| } |
| |
| public long getNextCompactionTaskId() { |
| return currentCompactionTaskSerialId.getAndIncrement(); |
| } |
| |
| public boolean hasNextTimePartition(long timePartition, boolean sequence) { |
| try { |
| return sequence |
| ? sequenceFiles.higherKey(timePartition) != null |
| : unsequenceFiles.higherKey(timePartition) != null; |
| } catch (NullPointerException e) { |
| return false; |
| } |
| } |
| |
| // determine whether time partition is the latest(largest) or not |
| public boolean isLatestTimePartition(long timePartitionId) { |
| return (sequenceFiles.higherKey(timePartitionId) == null |
| && unsequenceFiles.higherKey(timePartitionId) == null); |
| } |
| } |