SOLR-16866: Reorder nested directory deletions to avoid logging spurious NoSuchFileException (#2349)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c30d05b..1ce3bd0 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -189,6 +189,8 @@
 
 * SOLR-11535: Fix race condition in singleton-per-collection StateWatcher creation (Michael Gibney)
 
+* SOLR-16866: Reorder nested directory deletions to avoid logging spurious NoSuchFileException (Michael Gibney)
+
 Dependency Upgrades
 ---------------------
 
diff --git a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
index 8a5766f..945642f 100644
--- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
@@ -30,6 +30,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.LockFactory;
@@ -70,13 +71,9 @@
     public boolean closeCacheValueCalled = false;
     public boolean doneWithDir = false;
     private boolean deleteAfterCoreClose = false;
-    public Set<CacheValue> removeEntries = new HashSet<>();
     public Set<CacheValue> closeEntries = new HashSet<>();
 
     public void setDeleteOnClose(boolean deleteOnClose, boolean deleteAfterCoreClose) {
-      if (deleteOnClose) {
-        removeEntries.add(this);
-      }
       this.deleteOnClose = deleteOnClose;
       this.deleteAfterCoreClose = deleteAfterCoreClose;
     }
@@ -210,7 +207,7 @@
         }
       }
 
-      for (CacheValue val : removeEntries) {
+      for (CacheValue val : sorted(removeEntries)) {
         log.debug("Removing directory after core close: {}", val.path);
         try {
           removeDirectory(val);
@@ -246,47 +243,42 @@
       }
     }
     cacheValue.closeCacheValueCalled = true;
-    if (cacheValue.deleteOnClose) {
-      // see if we are a subpath
-      Collection<CacheValue> values = byPathCache.values();
-
-      Collection<CacheValue> cacheValues = new ArrayList<>(values);
-      cacheValues.remove(cacheValue);
-      for (CacheValue otherCacheValue : cacheValues) {
-        // if we are a parent path and a sub path is not already closed, get a sub path to close us
-        // later
-        if (isSubPath(cacheValue, otherCacheValue) && !otherCacheValue.closeCacheValueCalled) {
-          // we let the sub dir remove and close us
-          if (!otherCacheValue.deleteAfterCoreClose && cacheValue.deleteAfterCoreClose) {
-            otherCacheValue.deleteAfterCoreClose = true;
-          }
-          otherCacheValue.removeEntries.addAll(cacheValue.removeEntries);
-          otherCacheValue.closeEntries.addAll(cacheValue.closeEntries);
-          cacheValue.closeEntries.clear();
-          cacheValue.removeEntries.clear();
-          return false;
-        }
-      }
+    if (cacheValue.deleteOnClose && maybeDeferClose(cacheValue)) {
+      // we will be closed by a child path
+      return false;
     }
 
     boolean cl = false;
-    for (CacheValue val : cacheValue.closeEntries) {
-      close(val);
+    for (CacheValue val : sorted(cacheValue.closeEntries)) {
+      if (!val.deleteOnClose) {
+        // just a simple close, do it unconditionally
+        close(val);
+      } else {
+        if (maybeDeferClose(val)) {
+          // parent path must have been arbitrarily associated with one of multiple
+          // subpaths; so, assuming this one of the subpaths, we now delegate to
+          // another live subpath.
+          assert val != cacheValue; // else would already have been deferred
+          continue;
+        }
+        close(val);
+        if (!val.deleteAfterCoreClose) {
+          log.debug("Removing directory before core close: {}", val.path);
+          try {
+            removeDirectory(val);
+          } catch (Exception e) {
+            log.error("Error removing directory {} before core close", val.path, e);
+          }
+        } else {
+          removeEntries.add(val);
+        }
+      }
       if (val == cacheValue) {
         cl = true;
-      }
-    }
-
-    for (CacheValue val : cacheValue.removeEntries) {
-      if (!val.deleteAfterCoreClose) {
-        log.debug("Removing directory before core close: {}", val.path);
-        try {
-          removeDirectory(val);
-        } catch (Exception e) {
-          log.error("Error removing directory {} before core close", val.path, e);
-        }
       } else {
-        removeEntries.add(val);
+        // this was a deferred close, so it's our responsibility to remove it from cache
+        assert val.closeEntries.isEmpty();
+        removeFromCache(val);
       }
     }
 
@@ -302,6 +294,51 @@
     return cl;
   }
 
+  private static Iterable<CacheValue> sorted(Set<CacheValue> vals) {
+    if (vals.size() < 2) {
+      return vals;
+    }
+    // here we reverse-sort entries by path, in order to trivially ensure that
+    // subpaths are removed before parent paths.
+    return vals.stream().sorted((a, b) -> b.path.compareTo(a.path)).collect(Collectors.toList());
+  }
+
+  private boolean maybeDeferClose(CacheValue maybeDefer) {
+    assert maybeDefer.deleteOnClose;
+    for (CacheValue maybeChildPath : byPathCache.values()) {
+      // if we are a parent path and a sub path is not already closed, get a sub path to close us
+      // later
+      if (maybeDefer != maybeChildPath
+          && isSubPath(maybeDefer, maybeChildPath)
+          && !maybeChildPath.closeCacheValueCalled) {
+        // we let the sub dir remove and close us
+        if (maybeChildPath.deleteAfterCoreClose && !maybeDefer.deleteAfterCoreClose) {
+          // if we need to hold onto the child path until after core close, then don't allow
+          // the parent path to be deleted before!
+          maybeDefer.deleteAfterCoreClose = true;
+        }
+        if (maybeDefer.closeEntries.isEmpty()) {
+          // we've already been deferred
+          maybeChildPath.closeEntries.add(maybeDefer);
+        } else {
+          maybeChildPath.closeEntries.addAll(maybeDefer.closeEntries);
+          maybeDefer.closeEntries.clear();
+        }
+        return true;
+      }
+    }
+    if (!maybeDefer.deleteAfterCoreClose) {
+      // check whether we need to order ourselves after potential subpath `deleteAfterCoreClose`
+      for (CacheValue maybeChildPath : removeEntries) {
+        if (isSubPath(maybeDefer, maybeChildPath)) {
+          maybeDefer.deleteAfterCoreClose = true;
+          break;
+        }
+      }
+    }
+    return false;
+  }
+
   private void close(CacheValue val) {
     if (log.isDebugEnabled()) {
       log.debug(
@@ -324,7 +361,7 @@
     }
   }
 
-  private boolean isSubPath(CacheValue cacheValue, CacheValue otherCacheValue) {
+  private static boolean isSubPath(CacheValue cacheValue, CacheValue otherCacheValue) {
     int one = cacheValue.path.lastIndexOf('/');
     int two = otherCacheValue.path.lastIndexOf('/');
 
diff --git a/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java b/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java
index 7ce94dc..404a798 100644
--- a/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java
+++ b/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java
@@ -16,18 +16,29 @@
  */
 package org.apache.solr.core;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import org.apache.commons.io.file.PathUtils;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.CollectionUtil;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -47,6 +58,89 @@
   }
 
   @Test
+  public void reorderingTest() throws Exception {
+    Path tmpDir = LuceneTestCase.createTempDir();
+    Random r = random();
+    try (MMapDirectoryFactory df = new MMapDirectoryFactory()) {
+      df.init(new NamedList<>());
+      Path pathA = tmpDir.resolve("a");
+      Directory a = df.get(pathA.toString(), DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_SINGLE);
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Map.Entry<String, Directory>[] subdirs = new Map.Entry[26];
+      BooleanSupplier removeAfter;
+      boolean alwaysBefore = false;
+      switch (r.nextInt(3)) {
+        case 0:
+          removeAfter = () -> true;
+          break;
+        case 1:
+          removeAfter = () -> false;
+          alwaysBefore = true;
+          break;
+        case 2:
+          removeAfter = r::nextBoolean;
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+      int i = 0;
+      for (char c = 'a'; c <= 'z'; c++) {
+        String subpath = pathA.resolve(Character.toString(c)).toString();
+        Directory d = df.get(subpath, DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_SINGLE);
+        subdirs[i++] = new AbstractMap.SimpleImmutableEntry<>(subpath, d);
+      }
+      Set<String> deleteAfter = CollectionUtil.newHashSet(subdirs.length + 1);
+      String pathAString = pathA.toString();
+      df.remove(pathAString, addIfTrue(deleteAfter, pathAString, removeAfter.getAsBoolean()));
+      df.doneWithDirectory(a);
+      df.release(a);
+      assertTrue(pathA.toFile().exists()); // we know there are subdirs that should prevent removal
+      Collections.shuffle(Arrays.asList(subdirs), r);
+      for (Map.Entry<String, Directory> e : subdirs) {
+        boolean after = removeAfter.getAsBoolean();
+        String pathString = e.getKey();
+        Directory d = e.getValue();
+        df.remove(pathString, addIfTrue(deleteAfter, pathString, after));
+        df.doneWithDirectory(d);
+        df.release(d);
+        boolean exists = Path.of(pathString).toFile().exists();
+        if (after) {
+          assertTrue(exists);
+        } else {
+          assertFalse(exists);
+        }
+      }
+      if (alwaysBefore) {
+        assertTrue(deleteAfter.isEmpty());
+      }
+      if (deleteAfter.isEmpty()) {
+        assertTrue(PathUtils.isEmpty(tmpDir));
+      } else {
+        assertTrue(pathA.toFile().exists()); // parent must still be present
+        for (Map.Entry<String, Directory> e : subdirs) {
+          String pathString = e.getKey();
+          boolean exists = new File(pathString).exists();
+          if (deleteAfter.contains(pathString)) {
+            assertTrue(exists);
+          } else {
+            assertFalse(exists);
+          }
+        }
+      }
+    }
+    assertTrue(PathUtils.isEmpty(tmpDir));
+  }
+
+  private static boolean addIfTrue(Set<String> deleteAfter, String path, boolean after) {
+    if (after) {
+      deleteAfter.add(path);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Test
   public void stressTest() throws Exception {
     doStressTest(new RAMDirectoryFactory());
     doStressTest(new ByteBuffersDirectoryFactory());