[#1680] improvement(server): Remove only the HDFS files written by the server itself for expired apps (#1681)
### What changes were proposed in this pull request?
When an application is purged because it expired, remove only the HDFS files that were written by this server itself.
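As a quick illustration of the idea (a minimal sketch, not the shipped class: the class name is made up, and the real logic lives in `HadoopShuffleDeleteHandler` in the diff below): each server writes files whose names start with its own server id, e.g. `127.0.0.1:19999_1.data`, so on expiration a server can delete just the files it wrote and leave the other replica's files in place.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch of the prefix-scoped delete: remove only the files
// written by this server, then drop directories that end up empty.
public class PrefixDeleteSketch {
  static void deleteOwnFiles(FileSystem fs, Path dir, String serverId) throws IOException {
    for (FileStatus status : fs.listStatus(dir)) {
      if (status.isDirectory()) {
        deleteOwnFiles(fs, status.getPath(), serverId);
      } else if (status.getPath().getName().startsWith(serverId)) {
        fs.delete(status.getPath(), false);
      }
    }
    // Once no files remain under this directory (from any server),
    // remove the directory itself so no empty folders are left behind.
    if (fs.getContentSummary(dir).getFileCount() == 0) {
      fs.delete(dir, true);
    }
  }
}
```

The two unit tests below exercise exactly this behavior: with two replicas the partition directory survives because the other server's file is still present; with one replica the directory becomes empty and is removed.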
### Why are the changes needed?
Fix: #1680
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT: `HadoopStorageManagerTest#testRemoveExpiredResourcesWithTwoReplicas` and `HadoopStorageManagerTest#testRemoveExpiredResourcesWithOneReplica` (see the diff below).
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 50ea004..8cf3e3e 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -774,7 +774,10 @@
shuffleFlushManager.removeResources(appId);
storageManager.removeResources(
new AppPurgeEvent(
- appId, shuffleTaskInfo.getUser(), new ArrayList<>(shuffleTaskInfo.getShuffleIds())));
+ appId,
+ shuffleTaskInfo.getUser(),
+ new ArrayList<>(shuffleTaskInfo.getShuffleIds()),
+ checkAppExpired));
if (shuffleTaskInfo.hasHugePartition()) {
ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
ShuffleServerMetrics.gaugeHugePartitionNum.dec();
diff --git a/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java b/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
index 804d384..cb510d5 100644
--- a/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
@@ -20,12 +20,22 @@
import java.util.List;
public class AppPurgeEvent extends PurgeEvent {
+ private final boolean appExpired;
+
+ public AppPurgeEvent(String appId, String user, List<Integer> shuffleIds, boolean appExpired) {
+ super(appId, user, shuffleIds);
+ this.appExpired = appExpired;
+ }
public AppPurgeEvent(String appId, String user, List<Integer> shuffleIds) {
- super(appId, user, shuffleIds);
+ this(appId, user, shuffleIds, false);
}
public AppPurgeEvent(String appId, String user) {
- super(appId, user, null);
+ this(appId, user, null);
+ }
+
+ public boolean isAppExpired() {
+ return appExpired;
}
}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 496c485..7ab9ef6 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -57,12 +57,14 @@
private static final Logger LOG = LoggerFactory.getLogger(HadoopStorageManager.class);
private final Configuration hadoopConf;
+ private final String shuffleServerId;
private Map<String, HadoopStorage> appIdToStorages = JavaUtils.newConcurrentMap();
private Map<String, HadoopStorage> pathToStorages = JavaUtils.newConcurrentMap();
HadoopStorageManager(ShuffleServerConf conf) {
super(conf);
hadoopConf = conf.getHadoopConf();
+ shuffleServerId = conf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID, "shuffleServerId");
}
@Override
@@ -94,15 +96,19 @@
String appId = event.getAppId();
HadoopStorage storage = getStorageByAppId(appId);
if (storage != null) {
+ boolean purgeForExpired = false;
if (event instanceof AppPurgeEvent) {
storage.removeHandlers(appId);
appIdToStorages.remove(appId);
+ purgeForExpired = ((AppPurgeEvent) event).isAppExpired();
}
ShuffleDeleteHandler deleteHandler =
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(
- StorageType.HDFS.name(), storage.getConf()));
+ StorageType.HDFS.name(),
+ storage.getConf(),
+ purgeForExpired ? shuffleServerId : null));
String basicPath =
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java
index c9fee74..663a7bf 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.server.storage;
+import java.io.File;
import java.util.Arrays;
import java.util.Map;
@@ -26,6 +27,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -33,11 +35,14 @@
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.storage.common.HadoopStorage;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class HadoopStorageManagerTest {
@@ -112,4 +117,86 @@
assertNull(hs3.getConf().get("k2"));
assertNull(hs3.getConf().get("k3"));
}
+
+ @Test
+ public void testRemoveExpiredResourcesWithTwoReplicas(@TempDir File remoteBasePath)
+ throws Exception {
+ ShuffleServerConf conf = new ShuffleServerConf();
+ conf.setString(
+ ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
+ String shuffleServerId = "127.0.0.1:19999";
+ conf.setString(ShuffleServerConf.SHUFFLE_SERVER_ID, shuffleServerId);
+ HadoopStorageManager hadoopStorageManager = new HadoopStorageManager(conf);
+ final String remoteStoragePath1 = new File(remoteBasePath, "path1").getAbsolutePath();
+ String appId = "testRemoveExpiredResources";
+ hadoopStorageManager.registerRemoteStorage(
+ appId, new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", "k2", "v2")));
+ Map<String, HadoopStorage> appStorageMap = hadoopStorageManager.getAppIdToStorages();
+
+ HadoopStorage storage = appStorageMap.get(appId);
+ String appPath = ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
+ File appPathFile = new File(appPath);
+ File partitionDir = new File(appPathFile, "1/1-1/");
+ partitionDir.mkdirs();
+ // Simulate the case where two shuffle servers have written data.
+ File dataFile = new File(partitionDir, shuffleServerId + "_1.data");
+ dataFile.createNewFile();
+ File dataFile2 = new File(partitionDir, "shuffleserver2_1.data");
+ dataFile2.createNewFile();
+ assertTrue(partitionDir.exists());
+ // Purge triggered by app expiration
+ assertEquals(1, appStorageMap.size());
+ AppPurgeEvent expiredPurgeEvent = new AppPurgeEvent(appId, "", null, true);
+ hadoopStorageManager.removeResources(expiredPurgeEvent);
+ assertEquals(0, appStorageMap.size());
+ // The partition directory should not have been deleted, because it still contained the other server's file.
+ assertTrue(partitionDir.exists());
+ assertFalse(dataFile.exists());
+ assertTrue(dataFile2.exists());
+
+ // Purge triggered by app unregistration
+ AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
+ hadoopStorageManager.removeResources(appPurgeEvent);
+ assertEquals(0, appStorageMap.size());
+ assertFalse(appPathFile.exists());
+ }
+
+ @Test
+ public void testRemoveExpiredResourcesWithOneReplica(@TempDir File remoteBasePath)
+ throws Exception {
+ ShuffleServerConf conf = new ShuffleServerConf();
+ conf.setString(
+ ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
+ String shuffleServerId = "127.0.0.1:19999";
+ conf.setString(ShuffleServerConf.SHUFFLE_SERVER_ID, shuffleServerId);
+ HadoopStorageManager hadoopStorageManager = new HadoopStorageManager(conf);
+ final String remoteStoragePath1 = new File(remoteBasePath, "path1").getAbsolutePath();
+ String appId = "testRemoveExpiredResources2";
+ hadoopStorageManager.registerRemoteStorage(
+ appId, new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", "k2", "v2")));
+ Map<String, HadoopStorage> appStorageMap = hadoopStorageManager.getAppIdToStorages();
+
+ HadoopStorage storage = appStorageMap.get(appId);
+ String appPath = ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
+ File appPathFile = new File(appPath);
+ File partitionDir = new File(appPathFile, "1/1-1/");
+ partitionDir.mkdirs();
+ // Simulate the case where only one shuffle server writes data.
+ File dataFile = new File(partitionDir, shuffleServerId + "_1.data");
+ dataFile.createNewFile();
+ assertTrue(partitionDir.exists());
+ // Purge triggered by app expiration
+ assertEquals(1, appStorageMap.size());
+ AppPurgeEvent expiredPurgeEvent = new AppPurgeEvent(appId, "", null, true);
+ hadoopStorageManager.removeResources(expiredPurgeEvent);
+ assertEquals(0, appStorageMap.size());
+ // The application directory (and the partition directory under it) should have been deleted, because no files remained.
+ assertFalse(partitionDir.exists());
+
+ // Purge triggered by app unregistration
+ AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
+ hadoopStorageManager.removeResources(appPurgeEvent);
+ assertEquals(0, appStorageMap.size());
+ assertFalse(appPathFile.exists());
+ }
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index 819c26e..eac9584 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -185,7 +185,7 @@
public ShuffleDeleteHandler createShuffleDeleteHandler(
CreateShuffleDeleteHandlerRequest request) {
if (StorageType.HDFS.name().equals(request.getStorageType())) {
- return new HadoopShuffleDeleteHandler(request.getConf());
+ return new HadoopShuffleDeleteHandler(request.getConf(), request.getShuffleServerId());
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType())) {
return new LocalFileDeleteHandler();
} else {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
index e3daa8e..d48d724 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
@@ -17,7 +17,11 @@
package org.apache.uniffle.storage.handler.impl;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -29,11 +33,13 @@
public class HadoopShuffleDeleteHandler implements ShuffleDeleteHandler {
private static final Logger LOG = LoggerFactory.getLogger(HadoopShuffleDeleteHandler.class);
+ private final String shuffleServerId;
private Configuration hadoopConf;
- public HadoopShuffleDeleteHandler(Configuration hadoopConf) {
+ public HadoopShuffleDeleteHandler(Configuration hadoopConf, String shuffleServerId) {
this.hadoopConf = hadoopConf;
+ this.shuffleServerId = shuffleServerId;
}
@Override
@@ -52,7 +58,7 @@
while (!isSuccess && times < retryMax) {
try {
FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
- fileSystem.delete(path, true);
+ delete(fileSystem, path, shuffleServerId);
isSuccess = true;
} catch (Exception e) {
times++;
@@ -86,4 +92,25 @@
}
}
}
+
+ private void delete(FileSystem fileSystem, Path path, String filePrefix) throws IOException {
+ if (filePrefix == null) {
+ fileSystem.delete(path, true);
+ return;
+ }
+ FileStatus[] fileStatuses = fileSystem.listStatus(path);
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isDirectory()) {
+ delete(fileSystem, fileStatus.getPath(), filePrefix);
+ } else {
+ if (fileStatus.getPath().getName().startsWith(filePrefix)) {
+ fileSystem.delete(fileStatus.getPath(), true);
+ }
+ }
+ }
+ ContentSummary contentSummary = fileSystem.getContentSummary(path);
+ if (contentSummary.getFileCount() == 0) {
+ fileSystem.delete(path, true);
+ }
+ }
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
index 6df61e5..b8eda28 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
@@ -23,10 +23,17 @@
private String storageType;
private Configuration conf;
+ private String shuffleServerId;
public CreateShuffleDeleteHandlerRequest(String storageType, Configuration conf) {
+ this(storageType, conf, null);
+ }
+
+ public CreateShuffleDeleteHandlerRequest(
+ String storageType, Configuration conf, String shuffleServerId) {
this.storageType = storageType;
this.conf = conf;
+ this.shuffleServerId = shuffleServerId;
}
public String getStorageType() {
@@ -36,4 +43,8 @@
public Configuration getConf() {
return conf;
}
+
+ public String getShuffleServerId() {
+ return shuffleServerId;
+ }
}
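For context, a hedged usage sketch of how the two purge paths would obtain a delete handler (assuming `ShuffleDeleteHandler` is the handler interface under `org.apache.uniffle.storage.handler.api`; the server id here is illustrative). Passing the server id scopes deletion to this server's files, while the two-argument request leaves it null and keeps the original full recursive delete:

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
import org.apache.uniffle.storage.util.StorageType;

public class DeleteHandlerUsageSketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();

    // Expiration purge: scope deletion to the files this server wrote.
    ShuffleDeleteHandler expiredPurge =
        ShuffleHandlerFactory.getInstance()
            .createShuffleDeleteHandler(
                new CreateShuffleDeleteHandlerRequest(
                    StorageType.HDFS.name(), hadoopConf, "127.0.0.1:19999"));

    // Unregister purge: shuffleServerId stays null, so the handler falls back
    // to the old behavior of deleting the whole application directory.
    ShuffleDeleteHandler unregisterPurge =
        ShuffleHandlerFactory.getInstance()
            .createShuffleDeleteHandler(
                new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(), hadoopConf));
  }
}
```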