blob: c40149c05ddb8bb96dc0b807cd4ed5eabeec5e30 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.db.exception.WriteLockFailedException;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
public class TsFileManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TsFileManager.class);
private String storageGroupName;
private String virtualStorageGroup;
private String storageGroupDir;
/** Serialize queries, delete resource files, compaction cleanup files */
private final ReadWriteLock resourceListLock = new ReentrantReadWriteLock();
private String writeLockHolder;
// time partition -> double linked list of tsfiles
private Map<Long, TsFileResourceList> sequenceFiles = new TreeMap<>();
private Map<Long, TsFileResourceList> unsequenceFiles = new TreeMap<>();
private List<TsFileResource> sequenceRecoverTsFileResources = new ArrayList<>();
private List<TsFileResource> unsequenceRecoverTsFileResources = new ArrayList<>();
private boolean allowCompaction = true;
public TsFileManager(
String storageGroupName, String virtualStorageGroup, String storageGroupDir) {
this.storageGroupName = storageGroupName;
this.storageGroupDir = storageGroupDir;
this.virtualStorageGroup = virtualStorageGroup;
}
public List<TsFileResource> getTsFileList(boolean sequence) {
// the iteration of ConcurrentSkipListMap is not concurrent secure
// so we must add read lock here
readLock();
try {
List<TsFileResource> allResources = new ArrayList<>();
Map<Long, TsFileResourceList> chosenMap = sequence ? sequenceFiles : unsequenceFiles;
for (Map.Entry<Long, TsFileResourceList> entry : chosenMap.entrySet()) {
allResources.addAll(entry.getValue().getArrayList());
}
return allResources;
} finally {
readUnlock();
}
}
public TsFileResourceList getSequenceListByTimePartition(long timePartition) {
readLock();
try {
return sequenceFiles.computeIfAbsent(timePartition, l -> new TsFileResourceList());
} finally {
readUnlock();
}
}
public TsFileResourceList getUnsequenceListByTimePartition(long timePartition) {
readLock();
try {
return unsequenceFiles.computeIfAbsent(timePartition, l -> new TsFileResourceList());
} finally {
readUnlock();
}
}
public Iterator<TsFileResource> getIterator(boolean sequence) {
readLock();
try {
return getTsFileList(sequence).iterator();
} finally {
readUnlock();
}
}
public void remove(TsFileResource tsFileResource, boolean sequence) {
writeLock("remove");
try {
Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles;
for (Map.Entry<Long, TsFileResourceList> entry : selectedMap.entrySet()) {
if (entry.getValue().contains(tsFileResource)) {
entry.getValue().remove(tsFileResource);
TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource);
break;
}
}
} finally {
writeUnlock();
}
}
public void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
writeLock("removeAll");
try {
for (TsFileResource resource : tsFileResourceList) {
remove(resource, sequence);
TsFileResourceManager.getInstance().removeTsFileResource(resource);
}
} finally {
writeLock("removeAll");
}
}
/**
* insert tsFileResource to a target pos(targetPos = insertPos) e.g. if insertPos = 0, then to the
* first, if insert Pos = 1, then to the second.
*/
public void insertToPartitionFileList(
TsFileResource tsFileResource, boolean sequence, int insertPos) {
writeLock("add");
try {
Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles;
TsFileResourceList tsFileResources =
selectedMap.computeIfAbsent(
tsFileResource.getTimePartition(), o -> new TsFileResourceList());
tsFileResources.set(insertPos, tsFileResource);
} finally {
writeUnlock();
}
}
public void add(TsFileResource tsFileResource, boolean sequence) {
writeLock("add");
try {
Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles;
selectedMap
.computeIfAbsent(tsFileResource.getTimePartition(), o -> new TsFileResourceList())
.add(tsFileResource);
} finally {
writeUnlock();
}
}
public void keepOrderInsert(TsFileResource tsFileResource, boolean sequence) throws IOException {
writeLock("keepOrderInsert");
try {
Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles;
selectedMap
.computeIfAbsent(tsFileResource.getTimePartition(), o -> new TsFileResourceList())
.keepOrderInsert(tsFileResource);
} finally {
writeUnlock();
}
}
public void addForRecover(TsFileResource tsFileResource, boolean sequence) {
if (sequence) {
sequenceRecoverTsFileResources.add(tsFileResource);
} else {
unsequenceRecoverTsFileResources.add(tsFileResource);
}
}
public void addAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
writeLock("add");
try {
for (TsFileResource resource : tsFileResourceList) {
add(resource, sequence);
}
} finally {
writeUnlock();
}
}
/** This method is called after compaction to update memory. */
public void replace(
List<TsFileResource> seqFileResources,
List<TsFileResource> unseqFileResources,
List<TsFileResource> targetFileResources,
long timePartition,
boolean isTargetSequence)
throws IOException {
writeLock("replace");
try {
for (TsFileResource tsFileResource : seqFileResources) {
if (sequenceFiles.get(timePartition).remove(tsFileResource)) {
TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource);
}
}
for (TsFileResource tsFileResource : unseqFileResources) {
if (unsequenceFiles.get(timePartition).remove(tsFileResource)) {
TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource);
}
}
if (isTargetSequence) {
// seq inner space compaction or cross space compaction
for (TsFileResource resource : targetFileResources) {
TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
sequenceFiles.get(timePartition).keepOrderInsert(resource);
}
} else {
// unseq inner space compaction
for (TsFileResource resource : targetFileResources) {
TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
unsequenceFiles.get(timePartition).keepOrderInsert(resource);
}
}
} finally {
writeUnlock();
}
}
public boolean contains(TsFileResource tsFileResource, boolean sequence) {
readLock();
try {
Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles;
TsFileResourceList list = selectedMap.getOrDefault(tsFileResource.getTimePartition(), null);
return list != null && list.contains(tsFileResource);
} finally {
readUnlock();
}
}
public void clear() {
writeLock("clear");
try {
sequenceFiles.clear();
unsequenceFiles.clear();
} finally {
writeUnlock();
}
}
public boolean isEmpty(boolean sequence) {
readLock();
try {
Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles;
for (Map.Entry<Long, TsFileResourceList> entry : selectedMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
return false;
}
}
return true;
} finally {
readUnlock();
}
}
public int size(boolean sequence) {
readLock();
try {
int totalSize = 0;
Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles;
for (Map.Entry<Long, TsFileResourceList> entry : selectedMap.entrySet()) {
totalSize += entry.getValue().size();
}
return totalSize;
} finally {
readUnlock();
}
}
public void readLock() {
resourceListLock.readLock().lock();
}
public void readUnlock() {
resourceListLock.readLock().unlock();
}
public void writeLock(String holder) {
resourceListLock.writeLock().lock();
writeLockHolder = holder;
}
/**
* Acquire write lock with timeout, {@link WriteLockFailedException} will be thrown after timeout.
* The unit of timeout is ms.
*/
public void writeLockWithTimeout(String holder, long timeout) throws WriteLockFailedException {
try {
if (resourceListLock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
writeLockHolder = holder;
} else {
throw new WriteLockFailedException(
String.format("cannot get write lock in %d ms", timeout));
}
} catch (InterruptedException e) {
LOGGER.warn(e.getMessage(), e);
Thread.interrupted();
throw new WriteLockFailedException("thread is interrupted");
}
}
public void writeUnlock() {
resourceListLock.writeLock().unlock();
writeLockHolder = "";
}
public String getStorageGroupName() {
return storageGroupName;
}
public String getStorageGroupDir() {
return storageGroupDir;
}
public void setStorageGroupDir(String storageGroupDir) {
this.storageGroupDir = storageGroupDir;
}
public Set<Long> getTimePartitions() {
readLock();
try {
Set<Long> timePartitions = new HashSet<>(sequenceFiles.keySet());
return timePartitions;
} finally {
readUnlock();
}
}
public boolean isAllowCompaction() {
return allowCompaction;
}
public void setAllowCompaction(boolean allowCompaction) {
this.allowCompaction = allowCompaction;
}
public String getVirtualStorageGroup() {
return virtualStorageGroup;
}
public void setVirtualStorageGroup(String virtualStorageGroup) {
this.virtualStorageGroup = virtualStorageGroup;
}
public List<TsFileResource> getSequenceRecoverTsFileResources() {
return sequenceRecoverTsFileResources;
}
public List<TsFileResource> getUnsequenceRecoverTsFileResources() {
return unsequenceRecoverTsFileResources;
}
// ({systemTime}-{versionNum}-{innerCompactionNum}-{crossCompactionNum}.tsfile)
public static int compareFileName(File o1, File o2) {
String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
long ver1 = Long.parseLong(items1[0]);
long ver2 = Long.parseLong(items2[0]);
int cmp = Long.compare(ver1, ver2);
if (cmp == 0) {
int cmpVersion = Long.compare(Long.parseLong(items1[1]), Long.parseLong(items2[1]));
if (cmpVersion == 0) {
return Long.compare(Long.parseLong(items1[2]), Long.parseLong(items2[2]));
}
return cmpVersion;
} else {
return cmp;
}
}
}