[#1680] improvement(server): Remove only the HDFS files written by the server itself when purging expired apps (#1681)

### What changes were proposed in this pull request?

When an application is purged because it expired, remove only the HDFS files written by this server itself; files written by other servers (e.g., the second replica of a multi-replica write) are left intact.
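
At a high level, the purge path for an expired app becomes prefix-scoped: each shuffle data file is named with the ID of the server that wrote it, so the handler deletes only the files carrying this server's ID and removes a directory tree only once no files remain under it. Explicit unregistration keeps the old full-directory delete. A minimal sketch of the idea (the helper name `deleteOwnedFiles` is illustrative; the real logic lives in `HadoopShuffleDeleteHandler#delete` in the diff below):

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class PrefixScopedDelete {
  // Delete only the files written by this server (their names start with the
  // server ID), recursing into subdirectories; afterwards drop the directory
  // tree if no files from any server are left under it.
  static void deleteOwnedFiles(FileSystem fs, Path dir, String serverId) throws IOException {
    for (FileStatus status : fs.listStatus(dir)) {
      if (status.isDirectory()) {
        deleteOwnedFiles(fs, status.getPath(), serverId);
      } else if (status.getPath().getName().startsWith(serverId)) {
        fs.delete(status.getPath(), false);
      }
    }
    // getContentSummary counts files in the whole subtree, so this keeps the
    // directory alive as long as another replica's files are still present.
    if (fs.getContentSummary(dir).getFileCount() == 0) {
      fs.delete(dir, true);
    }
  }
}
```

The flag is threaded through `AppPurgeEvent`: `new AppPurgeEvent(appId, user, shuffleIds, true)` marks an expiration purge, while the existing three-argument constructor preserves the old delete-everything behavior.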

### Why are the changes needed?

Fix: #1680 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests added in `HadoopStorageManagerTest`, covering both the one-replica and two-replica layouts.

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 50ea004..8cf3e3e 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -774,7 +774,10 @@
       shuffleFlushManager.removeResources(appId);
       storageManager.removeResources(
           new AppPurgeEvent(
-              appId, shuffleTaskInfo.getUser(), new ArrayList<>(shuffleTaskInfo.getShuffleIds())));
+              appId,
+              shuffleTaskInfo.getUser(),
+              new ArrayList<>(shuffleTaskInfo.getShuffleIds()),
+              checkAppExpired));
       if (shuffleTaskInfo.hasHugePartition()) {
         ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
         ShuffleServerMetrics.gaugeHugePartitionNum.dec();
diff --git a/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java b/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
index 804d384..cb510d5 100644
--- a/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
@@ -20,12 +20,22 @@
 import java.util.List;
 
 public class AppPurgeEvent extends PurgeEvent {
+  private final boolean appExpired;
+
+  public AppPurgeEvent(String appId, String user, List<Integer> shuffleIds, boolean appExpired) {
+    super(appId, user, shuffleIds);
+    this.appExpired = appExpired;
+  }
 
   public AppPurgeEvent(String appId, String user, List<Integer> shuffleIds) {
-    super(appId, user, shuffleIds);
+    this(appId, user, shuffleIds, false);
   }
 
   public AppPurgeEvent(String appId, String user) {
-    super(appId, user, null);
+    this(appId, user, null);
+  }
+
+  public boolean isAppExpired() {
+    return appExpired;
   }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 496c485..7ab9ef6 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -57,12 +57,14 @@
   private static final Logger LOG = LoggerFactory.getLogger(HadoopStorageManager.class);
 
   private final Configuration hadoopConf;
+  private final String shuffleServerId;
   private Map<String, HadoopStorage> appIdToStorages = JavaUtils.newConcurrentMap();
   private Map<String, HadoopStorage> pathToStorages = JavaUtils.newConcurrentMap();
 
   HadoopStorageManager(ShuffleServerConf conf) {
     super(conf);
     hadoopConf = conf.getHadoopConf();
+    shuffleServerId = conf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID, "shuffleServerId");
   }
 
   @Override
@@ -94,15 +96,19 @@
     String appId = event.getAppId();
     HadoopStorage storage = getStorageByAppId(appId);
     if (storage != null) {
+      boolean purgeForExpired = false;
       if (event instanceof AppPurgeEvent) {
         storage.removeHandlers(appId);
         appIdToStorages.remove(appId);
+        purgeForExpired = ((AppPurgeEvent) event).isAppExpired();
       }
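+      // Pass the server ID only for expiration purges; a null ID keeps the
+      // old behavior of deleting the whole app directory.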
       ShuffleDeleteHandler deleteHandler =
           ShuffleHandlerFactory.getInstance()
               .createShuffleDeleteHandler(
                   new CreateShuffleDeleteHandlerRequest(
-                      StorageType.HDFS.name(), storage.getConf()));
+                      StorageType.HDFS.name(),
+                      storage.getConf(),
+                      purgeForExpired ? shuffleServerId : null));
 
       String basicPath =
           ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java
index c9fee74..663a7bf 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.server.storage;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.Map;
 
@@ -26,6 +27,7 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -33,11 +35,14 @@
 import org.apache.uniffle.server.event.AppPurgeEvent;
 import org.apache.uniffle.server.event.ShufflePurgeEvent;
 import org.apache.uniffle.storage.common.HadoopStorage;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class HadoopStorageManagerTest {
 
@@ -112,4 +117,86 @@
     assertNull(hs3.getConf().get("k2"));
     assertNull(hs3.getConf().get("k3"));
   }
+
+  @Test
+  public void testRemoveExpiredResourcesWithTwoReplicas(@TempDir File remoteBasePath)
+      throws Exception {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setString(
+        ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
+    String shuffleServerId = "127.0.0.1:19999";
+    conf.setString(ShuffleServerConf.SHUFFLE_SERVER_ID, shuffleServerId);
+    HadoopStorageManager hadoopStorageManager = new HadoopStorageManager(conf);
+    final String remoteStoragePath1 = new File(remoteBasePath, "path1").getAbsolutePath();
+    String appId = "testRemoveExpiredResources";
+    hadoopStorageManager.registerRemoteStorage(
+        appId, new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", "k2", "v2")));
+    Map<String, HadoopStorage> appStorageMap = hadoopStorageManager.getAppIdToStorages();
+
+    HadoopStorage storage = appStorageMap.get(appId);
+    String appPath = ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
+    File appPathFile = new File(appPath);
+    File partitionDir = new File(appPathFile, "1/1-1/");
+    partitionDir.mkdirs();
+    // Simulate the case where two shuffle servers wrote data for the same partition.
+    File dataFile = new File(partitionDir, shuffleServerId + "_1.data");
+    dataFile.createNewFile();
+    File dataFile2 = new File(partitionDir, "shuffleserver2_1.data");
+    dataFile2.createNewFile();
+    assertTrue(partitionDir.exists());
+    // Purge triggered by app expiration
+    assertEquals(1, appStorageMap.size());
+    AppPurgeEvent appExpiredPurgeEvent = new AppPurgeEvent(appId, "", null, true);
+    hadoopStorageManager.removeResources(appExpiredPurgeEvent);
+    assertEquals(0, appStorageMap.size());
+    // The partition directory should not have been deleted, because it still contains the other server's file.
+    assertTrue(partitionDir.exists());
+    assertFalse(dataFile.exists());
+    assertTrue(dataFile2.exists());
+
+    // Purge triggered by explicit unregistration
+    AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
+    hadoopStorageManager.removeResources(appPurgeEvent);
+    assertEquals(0, appStorageMap.size());
+    assertFalse(appPathFile.exists());
+  }
+
+  @Test
+  public void testRemoveExpiredResourcesWithOneReplica(@TempDir File remoteBasePath)
+      throws Exception {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setString(
+        ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
+    String shuffleServerId = "127.0.0.1:19999";
+    conf.setString(ShuffleServerConf.SHUFFLE_SERVER_ID, shuffleServerId);
+    HadoopStorageManager hadoopStorageManager = new HadoopStorageManager(conf);
+    final String remoteStoragePath1 = new File(remoteBasePath, "path1").getAbsolutePath();
+    String appId = "testRemoveExpiredResources2";
+    hadoopStorageManager.registerRemoteStorage(
+        appId, new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", "k2", "v2")));
+    Map<String, HadoopStorage> appStorageMap = hadoopStorageManager.getAppIdToStorages();
+
+    HadoopStorage storage = appStorageMap.get(appId);
+    String appPath = ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
+    File appPathFile = new File(appPath);
+    File partitionDir = new File(appPathFile, "1/1-1/");
+    partitionDir.mkdirs();
+    // Simulate the case where only one shuffle server wrote data.
+    File dataFile = new File(partitionDir, shuffleServerId + "_1.data");
+    dataFile.createNewFile();
+    assertTrue(partitionDir.exists());
+    // Purge triggered by app expiration
+    assertEquals(1, appStorageMap.size());
+    AppPurgeEvent appExpiredPurgeEvent = new AppPurgeEvent(appId, "", null, true);
+    hadoopStorageManager.removeResources(appExpiredPurgeEvent);
+    assertEquals(0, appStorageMap.size());
+    // The whole application directory should have been deleted, since no files remained under it.
+    assertFalse(partitionDir.exists());
+
+    // Purge triggered by explicit unregistration
+    AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
+    hadoopStorageManager.removeResources(appPurgeEvent);
+    assertEquals(0, appStorageMap.size());
+    assertFalse(appPathFile.exists());
+  }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index 819c26e..eac9584 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -185,7 +185,7 @@
   public ShuffleDeleteHandler createShuffleDeleteHandler(
       CreateShuffleDeleteHandlerRequest request) {
     if (StorageType.HDFS.name().equals(request.getStorageType())) {
-      return new HadoopShuffleDeleteHandler(request.getConf());
+      return new HadoopShuffleDeleteHandler(request.getConf(), request.getShuffleServerId());
     } else if (StorageType.LOCALFILE.name().equals(request.getStorageType())) {
       return new LocalFileDeleteHandler();
     } else {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
index e3daa8e..d48d724 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
@@ -17,7 +17,11 @@
 
 package org.apache.uniffle.storage.handler.impl;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -29,11 +33,13 @@
 public class HadoopShuffleDeleteHandler implements ShuffleDeleteHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(HadoopShuffleDeleteHandler.class);
+  private final String shuffleServerId;
 
   private Configuration hadoopConf;
 
-  public HadoopShuffleDeleteHandler(Configuration hadoopConf) {
+  public HadoopShuffleDeleteHandler(Configuration hadoopConf, String shuffleServerId) {
     this.hadoopConf = hadoopConf;
+    this.shuffleServerId = shuffleServerId;
   }
 
   @Override
@@ -52,7 +58,7 @@
       while (!isSuccess && times < retryMax) {
         try {
           FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
-          fileSystem.delete(path, true);
+          delete(fileSystem, path, shuffleServerId);
           isSuccess = true;
         } catch (Exception e) {
           times++;
@@ -86,4 +92,25 @@
       }
     }
   }
+
+  private void delete(FileSystem fileSystem, Path path, String filePrefix) throws IOException {
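+    // A null prefix means the purge is not scoped to one server: fall back to
+    // the original recursive delete of the whole path.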
+    if (filePrefix == null) {
+      fileSystem.delete(path, true);
+      return;
+    }
+    FileStatus[] fileStatuses = fileSystem.listStatus(path);
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        delete(fileSystem, fileStatus.getPath(), filePrefix);
+      } else {
+        if (fileStatus.getPath().getName().startsWith(filePrefix)) {
+          fileSystem.delete(fileStatus.getPath(), true);
+        }
+      }
+    }
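+    // getContentSummary counts files in the entire subtree, so the directory is
+    // only removed once no server's files remain anywhere under it.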
+    ContentSummary contentSummary = fileSystem.getContentSummary(path);
+    if (contentSummary.getFileCount() == 0) {
+      fileSystem.delete(path, true);
+    }
+  }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
index 6df61e5..b8eda28 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
@@ -23,10 +23,17 @@
 
   private String storageType;
   private Configuration conf;
+  private String shuffleServerId;
 
   public CreateShuffleDeleteHandlerRequest(String storageType, Configuration conf) {
+    this(storageType, conf, null);
+  }
+
+  public CreateShuffleDeleteHandlerRequest(
+      String storageType, Configuration conf, String shuffleServerId) {
     this.storageType = storageType;
     this.conf = conf;
+    this.shuffleServerId = shuffleServerId;
   }
 
   public String getStorageType() {
@@ -36,4 +43,8 @@
   public Configuration getConf() {
     return conf;
   }
+
+  public String getShuffleServerId() {
+    return shuffleServerId;
+  }
 }