HADOOP-18340. deleteOnExit does not work with S3AFileSystem (#4608)
Contributed by Huaxiang Sun
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c49c368..1144cad 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -32,12 +32,14 @@
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Objects;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -386,6 +388,12 @@
*/
private ArnResource accessPoint;
+ /**
+ * A cache of files that should be deleted when the FileSystem is closed
+ * or the JVM is exited.
+ */
+ private final Set<Path> deleteOnExit = new TreeSet<>();
+
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@@ -3064,6 +3072,24 @@
@AuditEntryPoint
public boolean delete(Path f, boolean recursive) throws IOException {
checkNotClosed();
+ return deleteWithoutCloseCheck(f, recursive);
+ }
+
+ /**
+ * Same as delete(), except that it does not check if fs is closed.
+ *
+ * @param f the path to delete.
+ * @param recursive if path is a directory and set to
+ * true, the directory is deleted else throws an exception. In
+ * case of a file the recursive can be set to either true or false.
+ * @return true if the path existed and then was deleted; false if there
+ * was no path in the first place, or the corner cases of root path deletion
+ * have surfaced.
+ * @throws IOException due to inability to delete a directory or file.
+ */
+
+ @VisibleForTesting
+ protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
final Path path = qualify(f);
// span covers delete, getFileStatus, fake directory operations.
try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
@@ -3806,6 +3832,61 @@
}
/**
+ * This override bypasses checking for existence.
+ *
+ * @param f the path to delete; this may be unqualified.
+ * @return true, always. * @param f the path to delete.
+ * @return true if deleteOnExit is successful, otherwise false.
+ * @throws IOException IO failure
+ */
+ @Override
+ public boolean deleteOnExit(Path f) throws IOException {
+ Path qualifedPath = makeQualified(f);
+ synchronized (deleteOnExit) {
+ deleteOnExit.add(qualifedPath);
+ }
+ return true;
+ }
+
+ /**
+ * Cancel the scheduled deletion of the path when the FileSystem is closed.
+ * @param f the path to cancel deletion
+ * @return true if the path was found in the delete-on-exit list.
+ */
+ @Override
+ public boolean cancelDeleteOnExit(Path f) {
+ Path qualifedPath = makeQualified(f);
+ synchronized (deleteOnExit) {
+ return deleteOnExit.remove(qualifedPath);
+ }
+ }
+
+ /**
+ * Delete all paths that were marked as delete-on-exit. This recursively
+ * deletes all files and directories in the specified paths. It does not
+ * check if file exists and filesystem is closed.
+ *
+ * The time to process this operation is {@code O(paths)}, with the actual
+ * time dependent on the time for existence and deletion operations to
+ * complete, successfully or not.
+ */
+ @Override
+ protected void processDeleteOnExit() {
+ synchronized (deleteOnExit) {
+ for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
+ Path path = iter.next();
+ try {
+ deleteWithoutCloseCheck(path, true);
+ } catch (IOException e) {
+ LOG.info("Ignoring failure to deleteOnExit for path {}", path);
+ LOG.debug("The exception for deleteOnExit is {}", e);
+ }
+ iter.remove();
+ }
+ }
+ }
+
+ /**
* Close the filesystem. This shuts down all transfers.
* @throws IOException IO problem
*/
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADeleteOnExit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADeleteOnExit.java
new file mode 100644
index 0000000..31c58de
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADeleteOnExit.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Test deleteOnExit for S3A.
+ * The following cases for deleteOnExit are tested:
+ * 1. A nonexist file, which is added to deleteOnExit set.
+ * 2. An existing file
+ * 3. A file is added to deleteOnExist set first, then created.
+ * 4. A directory with some files under it.
+ */
+public class ITestS3ADeleteOnExit extends AbstractS3ATestBase {
+
+ private static final String PARENT_DIR_PATH_STR = "testDeleteOnExitDir";
+ private static final String NON_EXIST_FILE_PATH_STR =
+ PARENT_DIR_PATH_STR + "/nonExistFile";
+ private static final String INORDER_FILE_PATH_STR =
+ PARENT_DIR_PATH_STR + "/inOrderFile";
+ private static final String OUT_OF_ORDER_FILE_PATH_STR =
+ PARENT_DIR_PATH_STR + "/outOfOrderFile";
+ private static final String SUBDIR_PATH_STR =
+ PARENT_DIR_PATH_STR + "/subDir";
+ private static final String FILE_UNDER_SUBDIR_PATH_STR =
+ SUBDIR_PATH_STR + "/subDirFile";
+
+ @Test
+ public void testDeleteOnExit() throws Exception {
+ FileSystem fs = getFileSystem();
+
+ // Get a new filesystem object which is same as fs.
+ FileSystem s3aFs = new S3AFileSystem();
+ s3aFs.initialize(fs.getUri(), fs.getConf());
+ Path nonExistFilePath = path(NON_EXIST_FILE_PATH_STR);
+ Path inOrderFilePath = path(INORDER_FILE_PATH_STR);
+ Path outOfOrderFilePath = path(OUT_OF_ORDER_FILE_PATH_STR);
+ Path subDirPath = path(SUBDIR_PATH_STR);
+ Path fileUnderSubDirPath = path(FILE_UNDER_SUBDIR_PATH_STR);
+
+ // 1. set up the test directory.
+ Path dir = path("testDeleteOnExitDir");
+ s3aFs.mkdirs(dir);
+
+ // 2. Add a nonexisting file to DeleteOnExit set.
+ s3aFs.deleteOnExit(nonExistFilePath);
+ assertPathDoesNotExist("File " + NON_EXIST_FILE_PATH_STR + " should not exist",
+ nonExistFilePath);
+
+ // 3. create a file and then add it to DeleteOnExit set.
+ byte[] data = dataset(16, 'a', 26);
+ createFile(s3aFs, inOrderFilePath, true, data);
+ assertPathExists("File " + INORDER_FILE_PATH_STR + " should exist", inOrderFilePath);
+ s3aFs.deleteOnExit(inOrderFilePath);
+
+ // 4. add a path to DeleteOnExit set first, then create it.
+ s3aFs.deleteOnExit(outOfOrderFilePath);
+ createFile(s3aFs, outOfOrderFilePath, true, data);
+ assertPathExists("File " + OUT_OF_ORDER_FILE_PATH_STR + " should exist",
+ outOfOrderFilePath);
+
+ // 5. create a subdirectory, a file under it, and add subdirectory DeleteOnExit set.
+ s3aFs.mkdirs(subDirPath);
+ s3aFs.deleteOnExit(subDirPath);
+ createFile(s3aFs, fileUnderSubDirPath, true, data);
+ assertPathExists("Directory " + SUBDIR_PATH_STR + " should exist", subDirPath);
+ assertPathExists("File " + FILE_UNDER_SUBDIR_PATH_STR + " should exist",
+ fileUnderSubDirPath);
+
+ s3aFs.close();
+
+ // After s3aFs is closed, make sure that all files/directories in deleteOnExit
+ // set are deleted.
+ assertPathDoesNotExist("File " + NON_EXIST_FILE_PATH_STR + " should not exist",
+ nonExistFilePath);
+ assertPathDoesNotExist("File " + INORDER_FILE_PATH_STR + " should not exist",
+ inOrderFilePath);
+ assertPathDoesNotExist("File " + OUT_OF_ORDER_FILE_PATH_STR + " should not exist",
+ outOfOrderFilePath);
+ assertPathDoesNotExist("Directory " + SUBDIR_PATH_STR + " should not exist",
+ subDirPath);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
new file mode 100644
index 0000000..62a99d7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+/**
+ * deleteOnExit test for S3A.
+ */
+public class TestS3ADeleteOnExit extends AbstractS3AMockTest {
+
+ static class TestS3AFileSystem extends S3AFileSystem {
+ private int deleteOnDnExitCount;
+
+ public int getDeleteOnDnExitCount() {
+ return deleteOnDnExitCount;
+ }
+
+ @Override
+ public boolean deleteOnExit(Path f) throws IOException {
+ deleteOnDnExitCount++;
+ return super.deleteOnExit(f);
+ }
+
+ // This is specifically designed for deleteOnExit processing.
+ // In this specific case, deleteWithoutCloseCheck() will only be called in the path of
+ // processDeleteOnExit.
+ @Override
+ protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
+ boolean result = super.deleteWithoutCloseCheck(f, recursive);
+ deleteOnDnExitCount--;
+ return result;
+ }
+ }
+
+ @Test
+ public void testDeleteOnExit() throws Exception {
+ Configuration conf = createConfiguration();
+ TestS3AFileSystem testFs = new TestS3AFileSystem();
+ URI uri = URI.create(FS_S3A + "://" + BUCKET);
+ // unset S3CSE property from config to avoid pathIOE.
+ conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
+ testFs.initialize(uri, conf);
+ AmazonS3 testS3 = testFs.getAmazonS3ClientForTesting("mocking");
+
+ Path path = new Path("/file");
+ String key = path.toUri().getPath().substring(1);
+ ObjectMetadata meta = new ObjectMetadata();
+ meta.setContentLength(1L);
+ meta.setLastModified(new Date(2L));
+ when(testS3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+ .thenReturn(meta);
+
+ testFs.deleteOnExit(path);
+ testFs.close();
+ assertEquals(0, testFs.getDeleteOnDnExitCount());
+ }
+
+ private ArgumentMatcher<GetObjectMetadataRequest> correctGetMetadataRequest(
+ String bucket, String key) {
+ return request -> request != null
+ && request.getBucketName().equals(bucket)
+ && request.getKey().equals(key);
+ }
+}