HDFS-15689. allow/disallowSnapshot on EZ roots shouldn't fail due to trash provisioning/emptiness check (#2472)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index d9f1b46..861b6a9 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -3158,6 +3158,17 @@
     return getKeyProviderUri() != null;
   }
 
+  /**
+   * @return true if p is an encryption zone root
+   */
+  boolean isEZRoot(Path p) throws IOException {
+    EncryptionZone ez = getEZForPath(p.toUri().getPath());
+    if (ez == null) {
+      return false;
+    }
+    return ez.getPath().equals(p.toString());
+  }
+
   boolean isSnapshotTrashRootEnabled() throws IOException {
     return getServerDefaults().getSnapshotTrashRootEnabled();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 076dbdd..fe2d077 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -2122,6 +2122,12 @@
    * @param p Path to a directory.
    */
   private void checkTrashRootAndRemoveIfEmpty(final Path p) throws IOException {
+    // If p is EZ root, skip the check
+    if (dfs.isHDFSEncryptionEnabled() && dfs.isEZRoot(p)) {
+      DFSClient.LOG.debug("{} is an encryption zone root. "
+          + "Skipping empty trash root check.", p);
+      return;
+    }
     Path trashRoot = new Path(p, FileSystem.TRASH_PREFIX);
     try {
       // listStatus has 4 possible outcomes here:
@@ -2139,9 +2145,10 @@
       } else {
         if (fileStatuses.length == 1
             && !fileStatuses[0].isDirectory()
-            && !fileStatuses[0].getPath().equals(p)) {
+            && fileStatuses[0].getPath().toUri().getPath().equals(
+                trashRoot.toString())) {
           // Ignore the trash path because it is not a directory.
-          DFSClient.LOG.warn("{} is not a directory.", trashRoot);
+          DFSClient.LOG.warn("{} is not a directory. Ignored.", trashRoot);
         } else {
           throw new IOException("Found non-empty trash root at " +
               trashRoot + ". Rename or delete it, then try again.");
@@ -3002,19 +3009,24 @@
     Path trashPath = new Path(path, FileSystem.TRASH_PREFIX);
     try {
       FileStatus trashFileStatus = getFileStatus(trashPath);
+      boolean throwException = false;
       String errMessage = "Can't provision trash for snapshottable directory " +
           pathStr + " because trash path " + trashPath.toString() +
           " already exists.";
       if (!trashFileStatus.isDirectory()) {
+        throwException = true;
         errMessage += "\r\n" +
             "WARNING: " + trashPath.toString() + " is not a directory.";
       }
       if (!trashFileStatus.getPermission().equals(trashPermission)) {
+        throwException = true;
         errMessage += "\r\n" +
             "WARNING: Permission of " + trashPath.toString() +
             " differs from provided permission " + trashPermission;
       }
-      throw new FileAlreadyExistsException(errMessage);
+      if (throwException) {
+        throw new FileAlreadyExistsException(errMessage);
+      }
     } catch (FileNotFoundException ignored) {
       // Trash path doesn't exist. Continue
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
index 716ec3a..3f086dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
@@ -67,7 +67,6 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class HdfsAdmin {
-
   final private DistributedFileSystem dfs;
   public static final FsPermission TRASH_PERMISSION = new FsPermission(
       FsAction.ALL, FsAction.ALL, FsAction.ALL, true);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 7721cd6..278db06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -93,6 +93,7 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -926,6 +927,57 @@
   }
 
   @Test
+  public void testAllowSnapshotWhenTrashExists() throws Exception {
+    final Path dirPath = new Path("/ssdir3");
+    final Path trashRoot = new Path(dirPath, ".Trash");
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+
+    // Case 1: trash directory exists and permission matches
+    dfs.mkdirs(trashRoot);
+    dfs.setPermission(trashRoot, TRASH_PERMISSION);
+    // allowSnapshot should still succeed even when trash exists
+    assertEquals(0, ToolRunner.run(dfsAdmin,
+        new String[]{"-allowSnapshot", dirPath.toString()}));
+    // Clean up. disallowSnapshot should remove the empty trash
+    assertEquals(0, ToolRunner.run(dfsAdmin,
+        new String[]{"-disallowSnapshot", dirPath.toString()}));
+    assertFalse(dfs.exists(trashRoot));
+
+    // Case 2: trash directory exists and but permission doesn't match
+    dfs.mkdirs(trashRoot);
+    dfs.setPermission(trashRoot, new FsPermission((short)0755));
+    // allowSnapshot should fail here
+    assertEquals(-1, ToolRunner.run(dfsAdmin,
+        new String[]{"-allowSnapshot", dirPath.toString()}));
+    // Correct trash permission and retry
+    dfs.setPermission(trashRoot, TRASH_PERMISSION);
+    assertEquals(0, ToolRunner.run(dfsAdmin,
+        new String[]{"-allowSnapshot", dirPath.toString()}));
+    // Clean up
+    assertEquals(0, ToolRunner.run(dfsAdmin,
+        new String[]{"-disallowSnapshot", dirPath.toString()}));
+    assertFalse(dfs.exists(trashRoot));
+
+    // Case 3: trash directory path is taken by a file
+    dfs.create(trashRoot).close();
+    // allowSnapshot should fail here
+    assertEquals(-1, ToolRunner.run(dfsAdmin,
+        new String[]{"-allowSnapshot", dirPath.toString()}));
+    // Remove the file and retry
+    dfs.delete(trashRoot, false);
+    assertEquals(0, ToolRunner.run(dfsAdmin,
+        new String[]{"-allowSnapshot", dirPath.toString()}));
+    // Clean up
+    assertEquals(0, ToolRunner.run(dfsAdmin,
+        new String[]{"-disallowSnapshot", dirPath.toString()}));
+    assertFalse(dfs.exists(trashRoot));
+
+    // Cleanup
+    dfs.delete(dirPath, true);
+  }
+
+  @Test
   public void testAllowDisallowSnapshot() throws Exception {
     final Path dirPath = new Path("/ssdir1");
     final Path trashRoot = new Path(dirPath, ".Trash");