blob: 5dd63847866c4ccfbf0caa02efb0fce3ae27a4f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.resource.tsfile;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Keeps track of the files that pipes have pinned: every tracked file is registered in a map
 * (keyed by the path of the hardlink or copied file) together with a {@link PipeTsFileResource}
 * that carries its reference count.
 *
 * <p>Every public method, as well as the periodical TTL check, runs while holding the same {@link
 * ReentrantLock}; the lock is reentrant, so public methods may call each other.
 */
public class PipeTsFileResourceManager {

  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileResourceManager.class);

  /** Path of the hardlink or copied file -> the resource tracking that file. */
  private final Map<String, PipeTsFileResource> hardlinkOrCopiedFileToPipeTsFileResourceMap =
      new ConcurrentHashMap<>();

  /** Guards every read and write of the map above. */
  private final ReentrantLock lock = new ReentrantLock();

  /**
   * Creates the manager and registers {@link #tryTtlCheck()} as a periodical job of the pipe
   * runtime agent. The interval passed is {@code TSFILE_MIN_TIME_TO_LIVE_IN_MS / 1000}, but never
   * less than 1 (presumably seconds, confirm against {@code registerPeriodicalJob}).
   */
  public PipeTsFileResourceManager() {
    PipeAgent.runtime()
        .registerPeriodicalJob(
            "PipeTsFileResourceManager#ttlCheck()",
            this::tryTtlCheck,
            Math.max(PipeTsFileResource.TSFILE_MIN_TIME_TO_LIVE_IN_MS / 1000, 1));
  }

  /**
   * Runs {@link #ttlCheck()} if the lock can be acquired within half of the periodical executor's
   * minimal interval; otherwise only logs a warning and skips this round.
   */
  private void tryTtlCheck() {
    try {
      final long timeout = PipePeriodicalJobExecutor.getMinIntervalSeconds() >> 1;
      if (lock.tryLock(timeout, TimeUnit.SECONDS)) {
        try {
          ttlCheck();
        } finally {
          lock.unlock();
        }
      } else {
        LOGGER.warn("failed to try lock when checking TTL because of timeout ({}s)", timeout);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so that the executor thread can observe the interruption.
      Thread.currentThread().interrupt();
      LOGGER.warn("failed to try lock when checking TTL because of interruption", e);
    }
  }

  /**
   * Walks over all tracked resources and removes those for which {@code
   * closeIfOutOfTimeToLive()} returns {@code true}. Resources that stay are optionally logged with
   * their reference count, using the rate-limited logger handed out by {@link
   * PipeResourceManager#log()}. Must be called with the lock held.
   */
  private void ttlCheck() {
    final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
        hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
    final Optional<Logger> logger =
        PipeResourceManager.log()
            .schedule(
                PipeTsFileResourceManager.class,
                PipeConfig.getInstance().getPipeTsFilePinMaxLogNumPerRound(),
                PipeConfig.getInstance().getPipeTsFilePinMaxLogIntervalRounds(),
                hardlinkOrCopiedFileToPipeTsFileResourceMap.size());

    while (iterator.hasNext()) {
      final Map.Entry<String, PipeTsFileResource> entry = iterator.next();

      try {
        if (entry.getValue().closeIfOutOfTimeToLive()) {
          iterator.remove();
        } else {
          logger.ifPresent(
              l ->
                  l.info(
                      "Pipe file (file name: {}) is still referenced {} times",
                      entry.getKey(),
                      entry.getValue().getReferenceCount()));
        }
      } catch (IOException e) {
        // A failure on one resource must not stop the check of the remaining ones.
        LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", e);
      }
    }
  }

  /**
   * Given a file, create a hardlink or copy it to pipe dir, maintain a reference count for the
   * hardlink or copied file, and return the hardlink or copied file.
   *
   * <p>If the given file is already a hardlink or copied file, increase its reference count and
   * return it.
   *
   * <p>If the given file is a tsfile, create a hardlink in pipe dir, increase the reference count
   * of the hardlink and return it.
   *
   * <p>Otherwise, copy the file (.mod or .resource) to pipe dir, increase the reference count of
   * the copied file and return it.
   *
   * @param file tsfile, resource file or mod file. can be original file or hardlink/copy of
   *     original file
   * @param isTsFile {@code true} to create hardlink, {@code false} to copy file
   * @return the hardlink or copied file
   * @throws IOException when create hardlink or copy file failed
   */
  public File increaseFileReference(File file, boolean isTsFile) throws IOException {
    lock.lock();
    try {
      // If the file is already a hardlink or copied file,
      // just increase reference count and return it
      if (increaseReferenceIfExists(file.getPath())) {
        return file;
      }

      // If the file is not a hardlink or copied file, check if there is a related hardlink or
      // copied file in pipe dir. if so, increase reference count and return it
      final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
      final PipeTsFileResource existingResource =
          hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedFile.getPath());
      if (existingResource != null) {
        existingResource.increaseAndGetReference();
        return existingResource.getFile();
      }

      // If the file is a tsfile, create a hardlink in pipe dir and will return it.
      // otherwise, copy the file (.mod or .resource) to pipe dir and will return it.
      final File resultFile =
          isTsFile
              ? FileUtils.createHardLink(file, hardlinkOrCopiedFile)
              : FileUtils.copyFile(file, hardlinkOrCopiedFile);

      // Start tracking the freshly created hardlink or copy under its own path.
      hardlinkOrCopiedFileToPipeTsFileResourceMap.put(
          resultFile.getPath(), new PipeTsFileResource(resultFile, isTsFile));
      return resultFile;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Increases the reference count of the resource registered under the given path.
   *
   * @param path the map key to look up
   * @return {@code true} if a resource was registered under the path (and its count was
   *     increased), {@code false} otherwise
   */
  private boolean increaseReferenceIfExists(String path) {
    final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path);
    if (resource != null) {
      resource.increaseAndGetReference();
      return true;
    }
    return false;
  }

  /**
   * Computes where the hardlink or copy of the given file lives in the pipe dir: the directory
   * from {@link #getPipeTsFileDirPath(File)} combined with the flattened name from {@link
   * #getRelativeFilePath(File)}.
   *
   * @param file the original file
   * @return the (not necessarily existing) hardlink or copied file in the pipe dir
   * @throws IOException if the location can not be derived from the path of the file
   */
  private static File getHardlinkOrCopiedFileInPipeDir(File file) throws IOException {
    try {
      return new File(getPipeTsFileDirPath(file), getRelativeFilePath(file));
    } catch (IOException | RuntimeException e) {
      // Any failure while deriving the location is reported uniformly as an IOException.
      throw new IOException(
          String.format(
              "failed to get hardlink or copied file in pipe dir "
                  + "for file %s, it is not a tsfile, mod file or resource file",
              file.getPath()),
          e);
    }
  }

  /** Tells whether the given file is named like the sequence or the unsequence folder. */
  private static boolean isSequenceOrUnsequenceDir(File file) {
    final String name = file.getName();
    return name.equals(IoTDBConstant.SEQUENCE_FOLDER_NAME)
        || name.equals(IoTDBConstant.UNSEQUENCE_FOLDER_NAME);
  }

  /**
   * Returns the pipe tsfile directory for the given file: the canonical path of the parent of the
   * closest sequence / unsequence ancestor, followed by the configured pipe hardlink base dir name
   * and the configured pipe hardlink tsfile dir name.
   *
   * @param file a file located under a sequence or unsequence folder
   * @return the path of the pipe tsfile directory
   * @throws IOException if no sequence / unsequence ancestor (or no parent of it) exists, or if
   *     the canonical path can not be resolved
   */
  private static String getPipeTsFileDirPath(File file) throws IOException {
    File dir = file;
    while (!isSequenceOrUnsequenceDir(dir)) {
      dir = dir.getParentFile();
      // getParentFile() returns null once the path has no parent left: the file is not located
      // under a sequence / unsequence folder. Fail explicitly instead of with an NPE.
      if (dir == null) {
        throw new IOException(
            String.format(
                "file %s is not located under a %s or %s folder",
                file.getPath(),
                IoTDBConstant.SEQUENCE_FOLDER_NAME,
                IoTDBConstant.UNSEQUENCE_FOLDER_NAME));
      }
    }

    final File dataDir = dir.getParentFile();
    if (dataDir == null) {
      throw new IOException(
          String.format("folder %s of file %s has no parent", dir.getPath(), file.getPath()));
    }

    return dataDir.getCanonicalPath()
        + File.separator
        + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
        + File.separator
        + PipeConfig.getInstance().getPipeHardlinkTsFileDirName();
  }

  /**
   * Flattens the path of the given file, from its closest sequence / unsequence ancestor down to
   * the file itself, into a single name whose segments are joined with {@link
   * IoTDBConstant#FILE_NAME_SEPARATOR}.
   *
   * @param file a file located under a sequence or unsequence folder
   * @return the flattened name, starting with the sequence / unsequence folder name
   * @throws IllegalArgumentException if the file has no sequence / unsequence ancestor
   */
  private static String getRelativeFilePath(File file) {
    final StringBuilder builder = new StringBuilder(file.getName());
    File current = file;
    while (!isSequenceOrUnsequenceDir(current)) {
      current = current.getParentFile();
      if (current == null) {
        throw new IllegalArgumentException(
            String.format(
                "file %s is not located under a %s or %s folder",
                file.getPath(),
                IoTDBConstant.SEQUENCE_FOLDER_NAME,
                IoTDBConstant.UNSEQUENCE_FOLDER_NAME));
      }
      // Prepend "<ancestor name><separator>" to what has been collected so far.
      builder.insert(0, IoTDBConstant.FILE_NAME_SEPARATOR).insert(0, current.getName());
    }
    return builder.toString();
  }

  /**
   * Given a hardlink or copied file, decrease its reference count. If the given file is not a
   * tracked hardlink or copied file, do nothing.
   *
   * <p>This method never removes the entry from this manager; entries are removed by the
   * periodical TTL check once {@code closeIfOutOfTimeToLive()} returns {@code true}. Any deletion
   * of the file itself is up to {@link PipeTsFileResource} (NOTE(review): confirm there when
   * exactly the file is deleted).
   *
   * @param hardlinkOrCopiedFile the copied or hardlinked file
   */
  public void decreaseFileReference(File hardlinkOrCopiedFile) {
    lock.lock();
    try {
      final String filePath = hardlinkOrCopiedFile.getPath();
      final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
      if (resource != null) {
        resource.decreaseAndGetReference();
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Get the reference count of the file.
   *
   * @param hardlinkOrCopiedFile the copied or hardlinked file
   * @return the reference count of the file, or 0 if the file is not tracked
   */
  public int getFileReferenceCount(File hardlinkOrCopiedFile) {
    lock.lock();
    try {
      final String filePath = hardlinkOrCopiedFile.getPath();
      final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
      return resource != null ? resource.getReferenceCount() : 0;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Cache maps of the TsFile for further use.
   *
   * @param hardlinkOrCopiedTsFile the hardlinked tsfile whose maps should be cached
   * @return {@code true} if the maps are successfully put into cache or already cached. {@code
   *     false} if they can not be cached, which includes the case that the file is not tracked.
   * @throws IOException if thrown by the resource while caching
   */
  public boolean cacheObjectsIfAbsent(File hardlinkOrCopiedTsFile) throws IOException {
    lock.lock();
    try {
      final PipeTsFileResource resource =
          hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
      return resource != null && resource.cacheObjectsIfAbsent();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Asks the resource tracking the given file for its device -> measurements map.
   *
   * @param hardlinkOrCopiedTsFile the hardlinked tsfile
   * @return the result of {@code tryGetDeviceMeasurementsMap()}, or {@code null} if the file is
   *     not tracked
   * @throws IOException if thrown by the resource
   */
  public Map<IDeviceID, List<String>> getDeviceMeasurementsMapFromCache(File hardlinkOrCopiedTsFile)
      throws IOException {
    lock.lock();
    try {
      final PipeTsFileResource resource =
          hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
      return resource == null ? null : resource.tryGetDeviceMeasurementsMap();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Asks the resource tracking the given file for its device -> is-aligned map.
   *
   * @param hardlinkOrCopiedTsFile the hardlinked tsfile
   * @return the result of {@code tryGetDeviceIsAlignedMap()}, or {@code null} if the file is not
   *     tracked
   * @throws IOException if thrown by the resource
   */
  public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(File hardlinkOrCopiedTsFile)
      throws IOException {
    lock.lock();
    try {
      final PipeTsFileResource resource =
          hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
      return resource == null ? null : resource.tryGetDeviceIsAlignedMap();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Asks the resource tracking the given file for its measurement -> data type map.
   *
   * @param hardlinkOrCopiedTsFile the hardlinked tsfile
   * @return the result of {@code tryGetMeasurementDataTypeMap()}, or {@code null} if the file is
   *     not tracked
   * @throws IOException if thrown by the resource
   */
  public Map<String, TSDataType> getMeasurementDataTypeMapFromCache(File hardlinkOrCopiedTsFile)
      throws IOException {
    lock.lock();
    try {
      final PipeTsFileResource resource =
          hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
      return resource == null ? null : resource.tryGetMeasurementDataTypeMap();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Takes a reference on the tsfile of the given resource (as hardlink) and, when {@code withMods}
   * is set and the mod file exists, on its mod file (as copy).
   *
   * <p>NOTE(review): if handling the mod file throws after the tsfile reference has been taken,
   * that reference is not rolled back here; confirm that callers unpin on failure.
   *
   * @param resource the tsfile resource to pin
   * @param withMods whether the mod file should be pinned as well
   * @throws IOException if creating the hardlink or the copy failed
   */
  public void pinTsFileResource(TsFileResource resource, boolean withMods) throws IOException {
    lock.lock();
    try {
      increaseFileReference(resource.getTsFile(), true);
      if (withMods && resource.getModFile().exists()) {
        increaseFileReference(new File(resource.getModFile().getFilePath()), false);
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Releases one reference on the pinned tsfile of the given resource and, if a file with the
   * pinned tsfile path plus {@link ModificationFile#FILE_SUFFIX} exists, one reference on that mod
   * file as well.
   *
   * @param resource the tsfile resource to unpin
   * @throws IOException if the pinned file location can not be derived from the tsfile path
   */
  public void unpinTsFileResource(TsFileResource resource) throws IOException {
    lock.lock();
    try {
      final File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
      decreaseFileReference(pinnedFile);

      final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
      if (modFile.exists()) {
        decreaseFileReference(modFile);
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns the number of entries currently tracked by this manager. Every tracked file counts,
   * i.e. hardlinked tsfiles as well as copied files.
   *
   * @return the size of the tracking map
   */
  public int getLinkedTsfileCount() {
    lock.lock();
    try {
      return hardlinkOrCopiedFileToPipeTsFileResourceMap.size();
    } finally {
      lock.unlock();
    }
  }
}