[To dev/1.3] Clean up tmp dirs of udf and sort while starting up (#17377)
(cherry picked from commit 32f1010942e8841e9c1aefcb89e042dba18d42f7)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 35ded21..44240e7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -682,6 +682,15 @@
}
}
+ private void cleanupSortTmpDir() {
+ String sortTmpDir = config.getSortTmpDir();
+ File tmpDir = new File(sortTmpDir);
+ if (tmpDir.exists()) {
+ FileUtils.deleteFileOrDirectory(tmpDir, true);
+ logger.info("Cleaned up stale sort temp directory: {}", sortTmpDir);
+ }
+ }
+
private void prepareResources() throws StartupException {
prepareUDFResources();
prepareTriggerResources();
@@ -736,6 +745,9 @@
registerManager.register(new JMXService());
JMXService.registerMBean(getInstance(), mbeanName);
+ // Clean up stale sort temp files left from previous runs
+ cleanupSortTmpDir();
+
// Get resources for trigger,udf,pipe...
prepareResources();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java
index 9c0eef8..4706fa0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java
@@ -33,6 +33,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -59,10 +60,9 @@
public String register(SerializationRecorder recorder) throws IOException {
String queryId = recorder.getQueryId();
- if (!recorders.containsKey(queryId)) {
- recorders.put(queryId, new ArrayList<>());
- }
- recorders.get(queryId).add(recorder);
+ recorders
+ .computeIfAbsent(queryId, k -> Collections.synchronizedList(new ArrayList<>()))
+ .add(recorder);
String dirName = getDirName(queryId);
makeDirIfNecessary(dirName);
@@ -109,6 +109,11 @@
@Override
public void start() throws StartupException {
try {
+ // Clean up stale temp directories left from previous runs (e.g., after a crash)
+ File tmpDir = SystemFileFactory.INSTANCE.getFile(TEMPORARY_FILE_DIR);
+ if (tmpDir.exists()) {
+ FileUtils.deleteDirectory(tmpDir);
+ }
makeDirIfNecessary(TEMPORARY_FILE_DIR);
} catch (IOException e) {
throw new StartupException(e);
@@ -117,8 +122,11 @@
@Override
public void stop() {
- for (Object queryId : recorders.keySet().toArray()) {
- deregister((String) queryId);
+ recorders.clear();
+ try {
+ FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(TEMPORARY_FILE_DIR));
+ } catch (IOException e) {
+ logger.warn("Failed to delete temp dir {}.", TEMPORARY_FILE_DIR, e);
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 093e094..fc739b5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -81,7 +81,6 @@
private void downloadExecutables(List<String> uris, long requestId)
throws IOException, URISyntaxException {
- // TODO: para download
try {
for (String uriString : uris) {
final URL url = new URI(uriString).toURL();
@@ -238,7 +237,8 @@
}
Files.createFile(path);
}
- // FileOutPutStream is not in append mode by default, so the file will be overridden if it
+ // FileOutputStream is not in append mode by default, so the file will be
+ // overwritten if it
// already exists.
try (FileOutputStream outputStream = new FileOutputStream(destination)) {
outputStream.getChannel().write(byteBuffer);