blob: 91b8cbedbe8df62ff24758e2bbfb90dcc4d18ac7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferHandshakeV1Req;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferHandshakeV2Req;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.commons.io.FileUtils;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* {@link IoTDBFileReceiver} is the parent class of receiver on both configNode and DataNode,
* handling all the logic of parallel file receiving.
*/
public abstract class IoTDBFileReceiver implements IoTDBReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBFileReceiver.class);
protected final AtomicReference<File> receiverFileDirWithIdSuffix = new AtomicReference<>();
// Used to generate transfer id, which is used to identify a receiver thread.
private static final AtomicLong RECEIVER_ID_GENERATOR = new AtomicLong(0);
protected final AtomicLong receiverId = new AtomicLong(0);
private File writingFile;
private RandomAccessFile writingFileWriter;
@Override
public IoTDBConnectorRequestVersion getVersion() {
return IoTDBConnectorRequestVersion.VERSION_1;
}
protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshakeV1Req req) {
if (!CommonDescriptor.getInstance()
.getConfig()
.getTimestampPrecision()
.equals(req.getTimestampPrecision())) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
String.format(
"IoTDB receiver's timestamp precision %s, "
+ "connector's timestamp precision %s. Validation fails.",
CommonDescriptor.getInstance().getConfig().getTimestampPrecision(),
req.getTimestampPrecision()));
LOGGER.warn("Handshake failed, response status = {}.", status);
return new TPipeTransferResp(status);
}
receiverId.set(RECEIVER_ID_GENERATOR.incrementAndGet());
// Clear the original receiver file dir if exists
if (receiverFileDirWithIdSuffix.get() != null) {
if (receiverFileDirWithIdSuffix.get().exists()) {
try {
FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
LOGGER.info(
"Receiver id = {}: Original receiver file dir {} was deleted.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
} catch (IOException e) {
LOGGER.warn(
"Receiver id = {}: Failed to delete original receiver file dir {}, because {}.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath(),
e.getMessage(),
e);
}
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Receiver id = {}: Original receiver file dir {} is not existed. No need to delete.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
}
}
receiverFileDirWithIdSuffix.set(null);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Receiver id = {}: Current receiver file dir is null. No need to delete.",
receiverId.get());
}
}
final String receiverFileBaseDir;
try {
receiverFileBaseDir = getReceiverFileBaseDir();
if (Objects.isNull(receiverFileBaseDir)) {
LOGGER.warn(
"Receiver id = {}: Failed to init pipe receiver file folder manager because all disks of folders are full.",
receiverId.get());
return new TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
}
} catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to create pipe receiver file folder because all disks of folders are full.",
receiverId.get(),
e);
return new TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
}
// Create a new receiver file dir
final File newReceiverDir = new File(receiverFileBaseDir, Long.toString(receiverId.get()));
if (!newReceiverDir.exists() && !newReceiverDir.mkdirs()) {
LOGGER.warn(
"Receiver id = {}: Failed to create receiver file dir {}.",
receiverId.get(),
newReceiverDir.getPath());
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
String.format("Failed to create receiver file dir %s.", newReceiverDir.getPath())));
}
receiverFileDirWithIdSuffix.set(newReceiverDir);
LOGGER.info(
"Receiver id = {}: Handshake successfully, receiver file dir = {}.",
receiverId.get(),
newReceiverDir.getPath());
return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
}
protected abstract String getReceiverFileBaseDir() throws Exception;
protected TPipeTransferResp handleTransferHandshakeV2(final PipeTransferHandshakeV2Req req)
throws IOException {
// Reject to handshake if the receiver can not take clusterId from config node.
final String clusterIdFromConfigNode = getClusterId();
if (clusterIdFromConfigNode == null) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
"Receiver can not get clusterId from config node.");
LOGGER.warn(
"Receiver id = {}: Handshake failed, response status = {}.", receiverId.get(), status);
return new TPipeTransferResp(status);
}
// Reject to handshake if the request does not contain sender's clusterId.
final String clusterIdFromHandshakeRequest =
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID);
if (clusterIdFromHandshakeRequest == null) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR, "Handshake request does not contain clusterId.");
LOGGER.warn(
"Receiver id = {}: Handshake failed, response status = {}.", receiverId.get(), status);
return new TPipeTransferResp(status);
}
// Reject to handshake if the receiver and sender are from the same cluster.
if (Objects.equals(clusterIdFromConfigNode, clusterIdFromHandshakeRequest)) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
String.format(
"Receiver and sender are from the same cluster %s.",
clusterIdFromHandshakeRequest));
LOGGER.warn(
"Receiver id = {}: Handshake failed, response status = {}.", receiverId.get(), status);
return new TPipeTransferResp(status);
}
// Reject to handshake if the request does not contain timestampPrecision.
final String timestampPrecision =
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION);
if (timestampPrecision == null) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
"Handshake request does not contain timestampPrecision.");
LOGGER.warn(
"Receiver id = {}: Handshake failed, response status = {}.", receiverId.get(), status);
return new TPipeTransferResp(status);
}
// Handle the handshake request as a v1 request.
// Here we construct a fake "dataNode" request to valid from v1 validation logic, though
// it may not require the actual type of the v1 request.
return handleTransferHandshakeV1(
new PipeTransferHandshakeV1Req() {
@Override
protected PipeRequestType getPlanType() {
return PipeRequestType.HANDSHAKE_DATANODE_V1;
}
}.convertToTPipeTransferReq(timestampPrecision));
}
protected abstract String getClusterId();
protected final TPipeTransferResp handleTransferFilePiece(
final PipeTransferFilePieceReq req,
final boolean isRequestThroughAirGap,
final boolean isSingleFile)
throws IOException {
try {
updateWritingFileIfNeeded(req.getFileName(), isSingleFile);
if (!isWritingFileOffsetCorrect(req.getStartWritingOffset())) {
if (isRequestThroughAirGap
|| !writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
// 1. If the request is through air gap, the sender will resend the file piece from the
// beginning of the file. So the receiver should reset the offset of the writing file to
// the beginning of the file.
// 2. If the file is a tsFile, then the content will not be changed for a specific
// filename. However, for other files (mod, snapshot, etc.) the content varies for the
// same name in different times, then we must rewrite the file to apply the newest
// version.
writingFileWriter.setLength(0);
}
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET,
String.format(
"Request sender to reset file reader's offset from %s to %s.",
req.getStartWritingOffset(), writingFileWriter.length()));
LOGGER.warn(
"Receiver id = {}: File offset reset requested by receiver, response status = {}.",
receiverId.get(),
status);
return PipeTransferFilePieceResp.toTPipeTransferResp(status, writingFileWriter.length());
}
writingFileWriter.write(req.getFilePiece());
writingFileWriter.getFD().sync();
return PipeTransferFilePieceResp.toTPipeTransferResp(
RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
} catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to write file piece from req {}.", receiverId.get(), req, e);
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format("Failed to write file piece, because %s", e.getMessage()));
try {
return PipeTransferFilePieceResp.toTPipeTransferResp(
status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
} catch (IOException ex) {
return PipeTransferFilePieceResp.toTPipeTransferResp(status);
}
}
}
protected final void updateWritingFileIfNeeded(final String fileName, final boolean isSingleFile)
throws IOException {
if (isFileExistedAndNameCorrect(fileName)) {
return;
}
LOGGER.info(
"Receiver id = {}: Writing file {} is not existed or name is not correct, try to create it. "
+ "Current writing file is {}.",
receiverId.get(),
fileName,
writingFile == null ? "null" : writingFile.getPath());
closeCurrentWritingFileWriter();
// If there are multiple files we can not delete the current file
// instead they will be deleted after seal request
if (writingFile != null && isSingleFile) {
deleteCurrentWritingFile();
}
// Make sure receiver file dir exists
// This may be useless, because receiver file dir is created when handshake. just in case.
if (!receiverFileDirWithIdSuffix.get().exists()) {
if (receiverFileDirWithIdSuffix.get().mkdirs()) {
LOGGER.info(
"Receiver id = {}: Receiver file dir {} was created.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
} else {
LOGGER.error(
"Receiver id = {}: Failed to create receiver file dir {}.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
}
}
writingFile = new File(receiverFileDirWithIdSuffix.get(), fileName);
writingFileWriter = new RandomAccessFile(writingFile, "rw");
LOGGER.info(
"Receiver id = {}: Writing file {} was created. Ready to write file pieces.",
receiverId.get(),
writingFile.getPath());
}
private boolean isFileExistedAndNameCorrect(String fileName) {
return writingFile != null && writingFile.getName().equals(fileName);
}
private void closeCurrentWritingFileWriter() {
if (writingFileWriter != null) {
try {
writingFileWriter.close();
LOGGER.info(
"Receiver id = {}: Current writing file writer {} was closed.",
receiverId.get(),
writingFile == null ? "null" : writingFile.getPath());
} catch (IOException e) {
LOGGER.warn(
"Receiver id = {}: Failed to close current writing file writer {}, because {}.",
receiverId.get(),
writingFile == null ? "null" : writingFile.getPath(),
e.getMessage(),
e);
}
writingFileWriter = null;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Receiver id = {}: Current writing file writer is null. No need to close.",
receiverId.get());
}
}
}
private void deleteCurrentWritingFile() {
if (writingFile != null) {
deleteFile(writingFile);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Receiver id = {}: Current writing file is null. No need to delete.", receiverId.get());
}
}
}
private void deleteFile(File file) {
if (file.exists()) {
try {
FileUtils.delete(file);
LOGGER.info(
"Receiver id = {}: Original writing file {} was deleted.",
receiverId.get(),
file.getPath());
} catch (IOException e) {
LOGGER.warn(
"Receiver id = {}: Failed to delete original writing file {}, because {}.",
receiverId.get(),
file.getPath(),
e.getMessage(),
e);
}
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Receiver id = {}: Original file {} is not existed. No need to delete.",
receiverId.get(),
file.getPath());
}
}
}
private boolean isWritingFileOffsetCorrect(final long offset) throws IOException {
final boolean offsetCorrect = writingFileWriter.length() == offset;
if (!offsetCorrect) {
LOGGER.warn(
"Receiver id = {}: Writing file {}'s offset is {}, but request sender's offset is {}.",
receiverId.get(),
writingFile.getPath(),
writingFileWriter.length(),
offset);
}
return offsetCorrect;
}
protected final TPipeTransferResp handleTransferFileSealV1(final PipeTransferFileSealReqV1 req) {
try {
if (!isWritingFileAvailable()) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format(
"Failed to seal file, because writing file %s is not available.", writingFile));
LOGGER.warn(status.getMessage());
return new TPipeTransferResp(status);
}
final TPipeTransferResp resp = checkFinalFileSeal(req.getFileName(), req.getFileLength());
if (Objects.nonNull(resp)) {
return resp;
}
final String fileAbsolutePath = writingFile.getAbsolutePath();
// 1. The writing file writer must be closed, otherwise it may cause concurrent errors during
// the process of loading tsfile when parsing tsfile.
//
// 2. The writing file must be set to null, otherwise if the next passed tsfile has the same
// name as the current tsfile, it will bypass the judgment logic of
// updateWritingFileIfNeeded#isFileExistedAndNameCorrect, and continue to write to the already
// loaded file. Since the writing file writer has already been closed, it will throw a Stream
// Close exception.
writingFileWriter.getFD().sync();
writingFileWriter.close();
writingFileWriter = null;
// writingFile will be deleted after load if no exception occurs
writingFile = null;
final TSStatus status = loadFileV1(req, fileAbsolutePath);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
"Receiver id = {}: Seal file {} successfully.", receiverId.get(), fileAbsolutePath);
} else {
LOGGER.warn(
"Receiver id = {}: Failed to seal file {}, because {}.",
receiverId.get(),
fileAbsolutePath,
status.getMessage());
}
return new TPipeTransferResp(status);
} catch (IOException e) {
LOGGER.warn(
"Receiver id = {}: Failed to seal file {} from req {}.",
receiverId.get(),
writingFile,
req,
e);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format("Failed to seal file %s because %s", writingFile, e.getMessage())));
} finally {
// If the writing file is not sealed successfully, the writing file will be deleted.
// All pieces of the writing file and its mod (if exists) should be retransmitted by the
// sender.
closeCurrentWritingFileWriter();
deleteCurrentWritingFile();
}
}
protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFileSealReqV2 req) {
final List<File> files =
req.getFileNames().stream()
.map(fileName -> new File(receiverFileDirWithIdSuffix.get(), fileName))
.collect(Collectors.toList());
try {
if (!isWritingFileAvailable()) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format(
"Failed to seal file %s, because writing file %s is not available.",
req.getFileNames(), writingFile));
LOGGER.warn(status.getMessage());
return new TPipeTransferResp(status);
}
// Any of the transferred files cannot be empty, or else the receiver
// will not sense this file because no pieces are sent
for (int i = 0; i < req.getFileNames().size(); ++i) {
final TPipeTransferResp resp =
i == req.getFileNames().size() - 1
? checkFinalFileSeal(req.getFileNames().get(i), req.getFileLengths().get(i))
: checkNonFinalFileSeal(
files.get(i), req.getFileNames().get(i), req.getFileLengths().get(i));
if (Objects.nonNull(resp)) {
return resp;
}
}
// 1. The writing file writer must be closed, otherwise it may cause concurrent errors during
// the process of loading tsfile when parsing tsfile.
//
// 2. The writing file must be set to null, otherwise if the next passed tsfile has the same
// name as the current tsfile, it will bypass the judgment logic of
// updateWritingFileIfNeeded#isFileExistedAndNameCorrect, and continue to write to the already
// loaded file. Since the writing file writer has already been closed, it will throw a Stream
// Close exception.
writingFileWriter.getFD().sync();
writingFileWriter.close();
writingFileWriter = null;
// WritingFile will be deleted after load if no exception occurs
writingFile = null;
final List<String> fileAbsolutePaths =
files.stream().map(File::getAbsolutePath).collect(Collectors.toList());
final TSStatus status = loadFileV2(req, fileAbsolutePaths);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
"Receiver id = {}: Seal file {} successfully.", receiverId.get(), fileAbsolutePaths);
} else {
LOGGER.warn(
"Receiver id = {}: Failed to seal file {}, status is {}.",
receiverId.get(),
fileAbsolutePaths,
status);
}
return new TPipeTransferResp(status);
} catch (IOException | IllegalPathException e) {
LOGGER.warn(
"Receiver id = {}: Failed to seal file {} from req {}.", receiverId.get(), files, req, e);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format("Failed to seal file %s because %s", writingFile, e.getMessage())));
} finally {
// If the writing file is not sealed successfully, the writing file will be deleted.
// All pieces of the writing file and its mod(if exists) should be retransmitted by the
// sender.
closeCurrentWritingFileWriter();
// Clear the directory instead of only deleting the referenced files in seal request
// to avoid previously undeleted file being redundant when transferring multi files
IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get());
}
}
private TPipeTransferResp checkNonFinalFileSeal(
final File file, final String fileName, final long fileLength) throws IOException {
if (!file.exists()) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format("Failed to seal file %s, the file does not exist.", fileName));
LOGGER.warn(
"Receiver id = {}: Failed to seal file {}, because the file does not exist.",
receiverId.get(),
fileName);
return new TPipeTransferResp(status);
}
if (fileLength != file.length()) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format(
"Failed to seal file %s, because the length of file is not correct. "
+ "The original file has length %s, but receiver file has length %s.",
fileName, fileLength, writingFileWriter.length()));
LOGGER.warn(
"Receiver id = {}: Failed to seal file {}, because the length of file is not correct. "
+ "The original file has length {}, but receiver file has length {}.",
receiverId.get(),
fileName,
fileLength,
writingFileWriter.length());
return new TPipeTransferResp(status);
}
return null;
}
private TPipeTransferResp checkFinalFileSeal(final String fileName, final long fileLength)
throws IOException {
if (!isFileExistedAndNameCorrect(fileName)) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format(
"Failed to seal file %s, because writing file is %s.", fileName, writingFile));
LOGGER.warn(
"Receiver id = {}: Failed to seal file {}, because writing file is {}.",
receiverId.get(),
fileName,
writingFile);
return new TPipeTransferResp(status);
}
if (!isWritingFileOffsetCorrect(fileLength)) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format(
"Failed to seal file %s, because the length of file is not correct. "
+ "The original file has length %s, but receiver file has length %s.",
fileName, fileLength, writingFileWriter.length()));
LOGGER.warn(
"Receiver id = {}: Failed to seal file {}, because the length of file is not correct. "
+ "The original file has length {}, but receiver file has length {}.",
receiverId.get(),
fileName,
fileLength,
writingFileWriter.length());
return new TPipeTransferResp(status);
}
return null;
}
private boolean isWritingFileAvailable() {
final boolean isWritingFileAvailable =
writingFile != null && writingFile.exists() && writingFileWriter != null;
if (!isWritingFileAvailable) {
LOGGER.info(
"Receiver id = {}: Writing file {} is not available. "
+ "Writing file is null: {}, writing file exists: {}, writing file writer is null: {}.",
receiverId.get(),
writingFile,
writingFile == null,
writingFile != null && writingFile.exists(),
writingFileWriter == null);
}
return isWritingFileAvailable;
}
protected abstract TSStatus loadFileV1(
final PipeTransferFileSealReqV1 req, final String fileAbsolutePath) throws IOException;
protected abstract TSStatus loadFileV2(
final PipeTransferFileSealReqV2 req, final List<String> fileAbsolutePaths)
throws IOException, IllegalPathException;
@Override
public synchronized void handleExit() {
if (writingFileWriter != null) {
try {
writingFileWriter.close();
LOGGER.info(
"Receiver id = {}: Handling exit: Writing file writer was closed.", receiverId.get());
} catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Handling exit: Close writing file writer error.",
receiverId.get(),
e);
}
writingFileWriter = null;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Receiver id = {}: Handling exit: Writing file writer is null. No need to close.",
receiverId.get());
}
}
if (writingFile != null) {
try {
FileUtils.delete(writingFile);
LOGGER.info(
"Receiver id = {}: Handling exit: Writing file {} was deleted.",
receiverId.get(),
writingFile.getPath());
} catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Handling exit: Delete writing file {} error.",
receiverId.get(),
writingFile.getPath(),
e);
}
writingFile = null;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Receiver id = {}: Handling exit: Writing file is null. No need to delete.",
receiverId.get());
}
}
// Clear the original receiver file dir if exists
if (receiverFileDirWithIdSuffix.get() != null) {
if (receiverFileDirWithIdSuffix.get().exists()) {
try {
FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
LOGGER.info(
"Receiver id = {}: Handling exit: Original receiver file dir {} was deleted.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
} catch (IOException e) {
LOGGER.warn(
"Receiver id = {}: Handling exit: Delete original receiver file dir {} error.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath(),
e);
}
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Receiver id = {}: Handling exit: Original receiver file dir {} does not exist. No need to delete.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
}
}
receiverFileDirWithIdSuffix.set(null);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Receiver id = {}: Handling exit: Original receiver file dir is null. No need to delete.",
receiverId.get());
}
}
LOGGER.info("Receiver id = {}: Handling exit: Receiver exited.", receiverId.get());
}
}