HADOOP-18340. deleteOnExit does not work with S3AFileSystem (#4608)



Contributed by Huaxiang Sun
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c49c368..1144cad 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -32,12 +32,14 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.Objects;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -386,6 +388,12 @@
    */
   private ArnResource accessPoint;
 
+  /**
+   * A cache of files that should be deleted when the FileSystem is closed
+   * or the JVM is exited.
+   */
+  private final Set<Path> deleteOnExit = new TreeSet<>();
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -3064,6 +3072,24 @@
   @AuditEntryPoint
   public boolean delete(Path f, boolean recursive) throws IOException {
     checkNotClosed();
+    return deleteWithoutCloseCheck(f, recursive);
+  }
+
+  /**
+   * Same as delete(), except that it does not check if fs is closed.
+   *
+   * @param f the path to delete.
+   * @param recursive if path is a directory and set to
+   * true, the directory is deleted else throws an exception. In
+   * case of a file the recursive can be set to either true or false.
+   * @return true if the path existed and then was deleted; false if there
+   * was no path in the first place, or the corner cases of root path deletion
+   * have surfaced.
+   * @throws IOException due to inability to delete a directory or file.
+   */
+
+  @VisibleForTesting
+  protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
     final Path path = qualify(f);
     // span covers delete, getFileStatus, fake directory operations.
     try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
@@ -3806,6 +3832,61 @@
   }
 
   /**
+   * This override bypasses checking for existence.
+   *
+   * @param f the path to delete; this may be unqualified.
+   * @return true, always.   * @param f the path to delete.
+   * @return  true if deleteOnExit is successful, otherwise false.
+   * @throws IOException IO failure
+   */
+  @Override
+  public boolean deleteOnExit(Path f) throws IOException {
+    Path qualifedPath = makeQualified(f);
+    synchronized (deleteOnExit) {
+      deleteOnExit.add(qualifedPath);
+    }
+    return true;
+  }
+
+  /**
+   * Cancel the scheduled deletion of the path when the FileSystem is closed.
+   * @param f the path to cancel deletion
+   * @return true if the path was found in the delete-on-exit list.
+   */
+  @Override
+  public boolean cancelDeleteOnExit(Path f) {
+    Path qualifedPath = makeQualified(f);
+    synchronized (deleteOnExit) {
+      return deleteOnExit.remove(qualifedPath);
+    }
+  }
+
+  /**
+   * Delete all paths that were marked as delete-on-exit. This recursively
+   * deletes all files and directories in the specified paths. It does not
+   * check if file exists and filesystem is closed.
+   *
+   * The time to process this operation is {@code O(paths)}, with the actual
+   * time dependent on the time for existence and deletion operations to
+   * complete, successfully or not.
+   */
+  @Override
+  protected void processDeleteOnExit() {
+    synchronized (deleteOnExit) {
+      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
+        Path path = iter.next();
+        try {
+          deleteWithoutCloseCheck(path, true);
+        } catch (IOException e) {
+          LOG.info("Ignoring failure to deleteOnExit for path {}", path);
+          LOG.debug("The exception for deleteOnExit is {}", e);
+        }
+        iter.remove();
+      }
+    }
+  }
+
+  /**
    * Close the filesystem. This shuts down all transfers.
    * @throws IOException IO problem
    */
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADeleteOnExit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADeleteOnExit.java
new file mode 100644
index 0000000..31c58de
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADeleteOnExit.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Test deleteOnExit for S3A.
+ * The following cases for deleteOnExit are tested:
+ *  1. A nonexist file, which is added to deleteOnExit set.
+ *  2. An existing file
+ *  3. A file is added to deleteOnExist set first, then created.
+ *  4. A directory with some files under it.
+ */
+public class ITestS3ADeleteOnExit extends AbstractS3ATestBase {
+
+  private static final String PARENT_DIR_PATH_STR = "testDeleteOnExitDir";
+  private static final String NON_EXIST_FILE_PATH_STR =
+          PARENT_DIR_PATH_STR + "/nonExistFile";
+  private static final String INORDER_FILE_PATH_STR =
+          PARENT_DIR_PATH_STR + "/inOrderFile";
+  private static final String OUT_OF_ORDER_FILE_PATH_STR =
+          PARENT_DIR_PATH_STR + "/outOfOrderFile";
+  private static final String SUBDIR_PATH_STR =
+          PARENT_DIR_PATH_STR + "/subDir";
+  private static final String FILE_UNDER_SUBDIR_PATH_STR =
+          SUBDIR_PATH_STR + "/subDirFile";
+
+  @Test
+  public void testDeleteOnExit() throws Exception {
+    FileSystem fs = getFileSystem();
+
+    // Get a new filesystem object which is same as fs.
+    FileSystem s3aFs = new S3AFileSystem();
+    s3aFs.initialize(fs.getUri(), fs.getConf());
+    Path nonExistFilePath = path(NON_EXIST_FILE_PATH_STR);
+    Path inOrderFilePath = path(INORDER_FILE_PATH_STR);
+    Path outOfOrderFilePath = path(OUT_OF_ORDER_FILE_PATH_STR);
+    Path subDirPath = path(SUBDIR_PATH_STR);
+    Path fileUnderSubDirPath = path(FILE_UNDER_SUBDIR_PATH_STR);
+
+    // 1. set up the test directory.
+    Path dir = path("testDeleteOnExitDir");
+    s3aFs.mkdirs(dir);
+
+    // 2. Add a nonexisting file to DeleteOnExit set.
+    s3aFs.deleteOnExit(nonExistFilePath);
+    assertPathDoesNotExist("File " + NON_EXIST_FILE_PATH_STR + " should not exist",
+            nonExistFilePath);
+
+    // 3. create a file and then add it to DeleteOnExit set.
+    byte[] data = dataset(16, 'a', 26);
+    createFile(s3aFs, inOrderFilePath, true, data);
+    assertPathExists("File " + INORDER_FILE_PATH_STR + " should exist", inOrderFilePath);
+    s3aFs.deleteOnExit(inOrderFilePath);
+
+    // 4. add a path to DeleteOnExit set first, then create it.
+    s3aFs.deleteOnExit(outOfOrderFilePath);
+    createFile(s3aFs, outOfOrderFilePath, true, data);
+    assertPathExists("File " + OUT_OF_ORDER_FILE_PATH_STR + " should exist",
+            outOfOrderFilePath);
+
+    // 5. create a subdirectory, a file under it,  and add subdirectory DeleteOnExit set.
+    s3aFs.mkdirs(subDirPath);
+    s3aFs.deleteOnExit(subDirPath);
+    createFile(s3aFs, fileUnderSubDirPath, true, data);
+    assertPathExists("Directory " + SUBDIR_PATH_STR + " should exist", subDirPath);
+    assertPathExists("File " + FILE_UNDER_SUBDIR_PATH_STR + " should exist",
+            fileUnderSubDirPath);
+
+    s3aFs.close();
+
+    // After s3aFs is closed, make sure that all files/directories in deleteOnExit
+    // set are deleted.
+    assertPathDoesNotExist("File " + NON_EXIST_FILE_PATH_STR + " should not exist",
+            nonExistFilePath);
+    assertPathDoesNotExist("File " + INORDER_FILE_PATH_STR + " should not exist",
+            inOrderFilePath);
+    assertPathDoesNotExist("File " + OUT_OF_ORDER_FILE_PATH_STR + " should not exist",
+            outOfOrderFilePath);
+    assertPathDoesNotExist("Directory " + SUBDIR_PATH_STR + " should not exist",
+            subDirPath);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
new file mode 100644
index 0000000..62a99d7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+/**
+ * deleteOnExit test for S3A.
+ */
+public class TestS3ADeleteOnExit extends AbstractS3AMockTest {
+
+  static class TestS3AFileSystem extends S3AFileSystem {
+    private int deleteOnDnExitCount;
+
+    public int getDeleteOnDnExitCount() {
+      return deleteOnDnExitCount;
+    }
+
+    @Override
+    public boolean deleteOnExit(Path f) throws IOException {
+      deleteOnDnExitCount++;
+      return super.deleteOnExit(f);
+    }
+
+    // This is specifically designed for deleteOnExit processing.
+    // In this specific case, deleteWithoutCloseCheck() will only be called in the path of
+    // processDeleteOnExit.
+    @Override
+    protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
+      boolean result = super.deleteWithoutCloseCheck(f, recursive);
+      deleteOnDnExitCount--;
+      return result;
+    }
+  }
+
+  @Test
+  public void testDeleteOnExit() throws Exception {
+    Configuration conf = createConfiguration();
+    TestS3AFileSystem testFs  = new TestS3AFileSystem();
+    URI uri = URI.create(FS_S3A + "://" + BUCKET);
+    // unset S3CSE property from config to avoid pathIOE.
+    conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
+    testFs.initialize(uri, conf);
+    AmazonS3 testS3 = testFs.getAmazonS3ClientForTesting("mocking");
+
+    Path path = new Path("/file");
+    String key = path.toUri().getPath().substring(1);
+    ObjectMetadata meta = new ObjectMetadata();
+    meta.setContentLength(1L);
+    meta.setLastModified(new Date(2L));
+    when(testS3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+            .thenReturn(meta);
+
+    testFs.deleteOnExit(path);
+    testFs.close();
+    assertEquals(0, testFs.getDeleteOnDnExitCount());
+  }
+
+  private ArgumentMatcher<GetObjectMetadataRequest> correctGetMetadataRequest(
+          String bucket, String key) {
+    return request -> request != null
+            && request.getBucketName().equals(bucket)
+            && request.getKey().equals(key);
+  }
+}