/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.load;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link
* LoadCommand}. This class turns the content of each piece of a loading TsFile into a new
* TsFile. When the DataNode finishes transferring all pieces, this class flushes all TsFiles and
* loads them into IoTDB, or deletes them all.
*/
public class LoadTsFileManager {
private static final Logger logger = LoggerFactory.getLogger(LoadTsFileManager.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final File loadDir;
private final Map<String, TsFileWriterManager> uuid2WriterManager;
private final Map<String, Integer> dataPartition2NextTsFileIndex;
private final ReentrantLock lock;
public LoadTsFileManager() {
this.loadDir = SystemFileFactory.INSTANCE.getFile(config.getLoadTsFileDir());
this.uuid2WriterManager = new ConcurrentHashMap<>();
this.dataPartition2NextTsFileIndex = new HashMap<>();
this.lock = new ReentrantLock();
if (loadDir.delete()) {
logger.info(String.format("Delete origin load TsFile dir %s.", loadDir.getPath()));
}
}
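/**
* Writes all chunk data of the given piece node into the per-uuid {@link TsFileWriterManager},
* grouping the data by (DataRegion, time partition) so that each partition gets its own TsFile.
*/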
public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid)
throws PageException, IOException {
TsFileWriterManager writerManager =
uuid2WriterManager.computeIfAbsent(
uuid, o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
for (ChunkData chunkData : pieceNode.getAllChunkData()) {
writerManager.write(
new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
}
}
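/**
* Seals every TsFile written for this uuid and loads them into their target DataRegions. Returns
* false if the uuid is unknown to this manager.
*/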
public boolean loadAll(String uuid) throws IOException, LoadFileException {
if (!uuid2WriterManager.containsKey(uuid)) {
return false;
}
uuid2WriterManager.get(uuid).loadAll();
uuid2WriterManager.get(uuid).close();
uuid2WriterManager.remove(uuid);
clean();
return true;
}
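/**
* Aborts the load for this uuid: closes all writers and removes the temporary files. Returns
* false if the uuid is unknown to this manager.
*/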
public boolean deleteAll(String uuid) throws IOException {
if (!uuid2WriterManager.containsKey(uuid)) {
return false;
}
uuid2WriterManager.get(uuid).close();
uuid2WriterManager.remove(uuid);
clean();
return true;
}
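/** Generates a unique TsFile name for the given data partition by incrementing a per-partition index. */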
private String getNewTsFileName(String dataPartition) {
lock.lock();
try {
int nextIndex = dataPartition2NextTsFileIndex.getOrDefault(dataPartition, 0) + 1;
dataPartition2NextTsFileIndex.put(dataPartition, nextIndex);
return dataPartition
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ nextIndex
+ TsFileConstant.TSFILE_SUFFIX;
} finally {
lock.unlock();
}
}
private void clean() {
if (loadDir.delete()) { // File#delete only removes the dir if it is empty, i.e. no sub-dirs are left.
logger.info(String.format("Delete load dir %s.", loadDir.getPath()));
}
}
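/**
* Manages the TsFileIOWriters of a single load task (one uuid): one writer per
* (DataRegion, time partition), all placed under the task's temporary directory.
*/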
private class TsFileWriterManager {
private final File taskDir;
private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
private Map<DataPartitionInfo, String> dataPartition2LastDevice;
private TsFileWriterManager(File taskDir) {
this.taskDir = taskDir;
this.dataPartition2Writer = new HashMap<>();
this.dataPartition2LastDevice = new HashMap<>();
clearDir(taskDir);
}
private void clearDir(File dir) {
if (dir.delete()) {
logger.info(String.format("Delete origin load TsFile dir %s.", dir.getPath()));
}
if (!dir.mkdirs()) {
logger.warn(String.format("load TsFile dir %s can not be created.", dir.getPath()));
}
}
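/**
* Appends a chunk to the TsFile of its data partition, creating the file and writer on first
* use and starting a new chunk group whenever the device changes.
*/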
private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws IOException {
if (!dataPartition2Writer.containsKey(partitionInfo)) {
File newTsFile =
SystemFileFactory.INSTANCE.getFile(taskDir, getNewTsFileName(partitionInfo.toString()));
if (!newTsFile.createNewFile()) {
logger.error(String.format("Can not create TsFile %s for writing.", newTsFile.getPath()));
return;
}
dataPartition2Writer.put(partitionInfo, new TsFileIOWriter(newTsFile));
}
TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
if (!chunkData.getDevice().equals(dataPartition2LastDevice.getOrDefault(partitionInfo, ""))) {
if (dataPartition2LastDevice.containsKey(partitionInfo)) {
writer.endChunkGroup();
}
writer.startChunkGroup(chunkData.getDevice());
dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());
}
chunkData.writeToFileWriter(writer);
}
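/** Ends all open chunk groups, seals each TsFile and hands it to its DataRegion for loading. */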
private void loadAll() throws IOException, LoadFileException {
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
TsFileIOWriter writer = entry.getValue();
if (writer.isWritingChunkGroup()) {
writer.endChunkGroup();
}
writer.endFile();
entry.getKey().getDataRegion().loadNewTsFile(generateResource(writer), true);
}
}
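/**
* Builds the {@link TsFileResource} for a sealed writer by collecting the start/end time of
* every device from the written TimeseriesMetadata, then serializes the resource file.
*/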
private TsFileResource generateResource(TsFileIOWriter writer) throws IOException {
TsFileResource tsFileResource = new TsFileResource(writer.getFile());
Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
writer.getDeviceTimeseriesMetadataMap();
for (Map.Entry<String, List<TimeseriesMetadata>> entry :
deviceTimeseriesMetadataMap.entrySet()) {
String device = entry.getKey();
for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
tsFileResource.updateStartTime(device, timeseriesMetaData.getStatistics().getStartTime());
tsFileResource.updateEndTime(device, timeseriesMetaData.getStatistics().getEndTime());
}
}
tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
tsFileResource.serialize();
return tsFileResource;
}
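/** Closes any writer that is still writable and deletes the task directory. */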
private void close() throws IOException {
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
TsFileIOWriter writer = entry.getValue();
if (writer.canWrite()) {
entry.getValue().close();
}
}
if (!taskDir.delete()) {
logger.warn(String.format("Can not delete load uuid dir %s.", taskDir.getPath()));
}
dataPartition2Writer = null;
dataPartition2LastDevice = null;
}
}
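/**
* Key identifying the target of one TsFile: a DataRegion plus a time partition slot. Equality is
* based on the DataRegion and the partition's start time.
*/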
private class DataPartitionInfo {
private final DataRegion dataRegion;
private final TTimePartitionSlot timePartitionSlot;
private DataPartitionInfo(DataRegion dataRegion, TTimePartitionSlot timePartitionSlot) {
this.dataRegion = dataRegion;
this.timePartitionSlot = timePartitionSlot;
}
public DataRegion getDataRegion() {
return dataRegion;
}
public TTimePartitionSlot getTimePartitionSlot() {
return timePartitionSlot;
}
@Override
public String toString() {
return String.join(
IoTDBConstant.FILE_NAME_SEPARATOR,
dataRegion.getStorageGroupName(),
dataRegion.getDataRegionId(),
Long.toString(timePartitionSlot.getStartTime()));
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DataPartitionInfo that = (DataPartitionInfo) o;
return Objects.equals(dataRegion, that.dataRegion)
&& timePartitionSlot.getStartTime() == that.timePartitionSlot.getStartTime();
}
@Override
public int hashCode() {
return Objects.hash(dataRegion, timePartitionSlot.getStartTime());
}
}
}