[Tiered Storage] Prevent Class Loader Leak; Restore Offloader Directory Override (#9878)

### Motivation

In Pulsar 2.7.0, there is a class loader leak. It looks like https://github.com/apache/pulsar/pull/8739 fixed the leak by only loading the offloader classes for the directory configured in `broker.conf`. However, the solution in https://github.com/apache/pulsar/pull/8739 ignores the fact that an offload policy can override the the offloaded directory. As such, there could be a regression in 2.7.1 if users are providing multiple offload directories.

This PR returns the functionality without reintroducing the class loader leak.

### Modifications

Update the `PulsarService` and the `PulsarConnectorCache` classes to use a map from directory strings to `Offloaders`.

### Alternative Approaches

The new `Map` has keys of type `String`, but we could use keys of type `Path` and then normalize the paths to ensure that `./offloaders` and `offloaders` result in a single class loader. However, it looks like the `normalize` method in the path class has a warning about symbolic links. As such, I went with the basic `String` approach, which might lead to some duplication of loaded classes. Below is the javadoc for `normalize`, in case that helps for a design decision.

```java
  /**
     * Returns a path that is this path with redundant name elements eliminated.
     *
     * <p> The precise definition of this method is implementation dependent but
     * in general it derives from this path, a path that does not contain
     * <em>redundant</em> name elements. In many file systems, the "{@code .}"
     * and "{@code ..}" are special names used to indicate the current directory
     * and parent directory. In such file systems all occurrences of "{@code .}"
     * are considered redundant. If a "{@code ..}" is preceded by a
     * non-"{@code ..}" name then both names are considered redundant (the
     * process to identify such names is repeated until it is no longer
     * applicable).
     *
     * <p> This method does not access the file system; the path may not locate
     * a file that exists. Eliminating "{@code ..}" and a preceding name from a
     * path may result in the path that locates a different file than the original
     * path. This can arise when the preceding name is a symbolic link.
     *
     * @return  the resulting path or this path if it does not contain
     *          redundant name elements; an empty path is returned if this path
     *          does have a root component and all name elements are redundant
     *
     * @see #getParent
     * @see #toRealPath
     */
    Path normalize();
```

### Verifying this change

This change is a code cleanup without any test coverage that should be covered by other tests. If required, I can create some tests.
diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index c665659..344b789 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -60,8 +60,10 @@
                                                 **/PrimitiveSchemaTest.java,
                                                 BlobStoreManagedLedgerOffloaderTest.java'
 
-  $MVN_TEST_COMMAND -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java' \
-                                       -DtestForkCount=1
+  $MVN_TEST_COMMAND -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java,
+                                                  **/OffloadersCacheTest.java' \
+                                       -DtestForkCount=1 \
+                                       -DtestReuseFork=true
 
   $MVN_TEST_COMMAND -pl pulsar-sql/presto-pulsar-plugin -Dinclude='**/TestPulsarKeyValueSchemaHandler.java' \
                                                         -DtestForkCount=1
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
index 5243691..bc747d7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
@@ -114,8 +114,8 @@
         }
     }
 
-    public static Offloaders searchForOffloaders(String connectorsDirectory, String narExtractionDirectory) throws IOException {
-        Path path = Paths.get(connectorsDirectory).toAbsolutePath();
+    public static Offloaders searchForOffloaders(String offloadersPath, String narExtractionDirectory) throws IOException {
+        Path path = Paths.get(offloadersPath).toAbsolutePath();
         log.info("Searching for offloaders in {}", path);
 
         Offloaders offloaders = new Offloaders();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
new file mode 100644
index 0000000..e80c75b
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of an Offloaders. The main purpose of this class is to
+ * ensure that an Offloaders directory is only loaded once.
+ */
+@Slf4j
+public class OffloadersCache implements AutoCloseable {
+
+    private Map<String, Offloaders> loadedOffloaders = new ConcurrentHashMap<>();
+
+    /**
+     * Method to load an Offloaders directory or to get an already loaded Offloaders directory.
+     *
+     * @param offloadersPath - the directory to search the offloaders nar files
+     * @param narExtractionDirectory - the directory to use for extraction
+     * @return the loaded offloaders class
+     * @throws IOException when fail to retrieve the pulsar offloader class
+     */
+    public Offloaders getOrLoadOffloaders(String offloadersPath, String narExtractionDirectory) {
+        return loadedOffloaders.computeIfAbsent(offloadersPath,
+                (directory) -> {
+                    try {
+                        return OffloaderUtils.searchForOffloaders(directory, narExtractionDirectory);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    @Override
+    public void close() {
+        loadedOffloaders.values().forEach(offloaders -> {
+            try {
+                offloaders.close();
+            } catch (Exception e) {
+                log.error("Error while closing offloader.", e);
+                // Even if the offloader fails to close, the graceful shutdown process continues
+            }
+        });
+        // Don't want to hold on to references to closed offloaders
+        loadedOffloaders.clear();
+    }
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java
new file mode 100644
index 0000000..1c2cd85
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.testng.Assert.assertSame;
+
+@PrepareForTest({OffloaderUtils.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.pulsar.common.nar.*"})
+public class OffloadersCacheTest {
+
+    // Necessary to make PowerMockito.mockStatic work with TestNG.
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void testLoadsOnlyOnce() throws Exception {
+        Offloaders expectedOffloaders = new Offloaders();
+
+        PowerMockito.mockStatic(OffloaderUtils.class);
+        PowerMockito.when(OffloaderUtils.searchForOffloaders(eq("./offloaders"), eq("/tmp")))
+                .thenReturn(expectedOffloaders);
+
+        OffloadersCache cache = new OffloadersCache();
+
+        // Call a first time to load the offloader
+        Offloaders offloaders1 = cache.getOrLoadOffloaders("./offloaders", "/tmp");
+
+        assertSame(offloaders1, expectedOffloaders, "The offloaders should be the mocked one.");
+
+        // Call a second time to get the stored offlaoder
+        Offloaders offloaders2 = cache.getOrLoadOffloaders("./offloaders", "/tmp");
+
+        assertSame(offloaders2, expectedOffloaders, "The offloaders should be the mocked one.");
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b32344b..aac8755 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -63,8 +63,8 @@
 import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
 import org.apache.bookkeeper.mledger.offload.Offloaders;
+import org.apache.bookkeeper.mledger.offload.OffloadersCache;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -185,7 +185,7 @@
     private final ScheduledExecutorService loadManagerExecutor;
     private ScheduledExecutorService compactorExecutor;
     private OrderedScheduler offloaderScheduler;
-    private Offloaders offloaderManager = new Offloaders();
+    private OffloadersCache offloadersCache = new OffloadersCache();
     private LedgerOffloader defaultOffloader;
     private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
     private ScheduledFuture<?> loadReportTask = null;
@@ -422,7 +422,7 @@
                 schemaRegistryService.close();
             }
 
-            offloaderManager.close();
+            offloadersCache.close();
 
             if (protocolHandlers != null) {
                 protocolHandlers.close();
@@ -578,8 +578,6 @@
             schemaRegistryService = SchemaRegistryService.create(
                     schemaStorage, config.getSchemaRegistryCompatibilityCheckers());
 
-            this.offloaderManager = OffloaderUtils.searchForOffloaders(
-                    config.getOffloadersDirectory(), config.getNarExtractionDirectory());
             this.defaultOffloader = createManagedLedgerOffloader(
                     OffloadPolicies.create(this.getConfiguration().getProperties()));
             this.brokerInterceptor = BrokerInterceptors.load(config);
@@ -1078,7 +1076,10 @@
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                     "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
+                Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
+                        offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
+
+                LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
                         offloadPolicies.getManagedLedgerOffloadDriver());
                 try {
                     return offloaderFactory.create(
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 1530c79..6dfd8c5 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -35,8 +35,8 @@
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
 import org.apache.bookkeeper.mledger.offload.Offloaders;
+import org.apache.bookkeeper.mledger.offload.OffloadersCache;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
@@ -57,7 +57,7 @@
 
     private final StatsProvider statsProvider;
     private OrderedScheduler offloaderScheduler;
-    private Offloaders offloaderManager;
+    private OffloadersCache offloadersCache = new OffloadersCache();
     private LedgerOffloader defaultOffloader;
     private Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
 
@@ -154,9 +154,9 @@
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                         "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(),
+                Offloaders offloaders = offloadersCache.getOrLoadOffloaders(offloadPolicies.getOffloadersDirectory(),
                         pulsarConnectorConfig.getNarExtractionDirectory());
-                LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
+                LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
                         offloadPolicies.getManagedLedgerOffloadDriver());
 
                 try {
@@ -194,8 +194,7 @@
                 instance.statsProvider.stop();
                 instance.managedLedgerFactory.shutdown();
                 instance.offloaderScheduler.shutdown();
-                instance.offloaderManager.close();
-                instance = null;
+                instance.offloadersCache.close();
             }
         }
     }