[Tiered Storage] Prevent Class Loader Leak; Restore Offloader Directory Override (#9878)
### Motivation
In Pulsar 2.7.0, there is a class loader leak. It looks like https://github.com/apache/pulsar/pull/8739 fixed the leak by only loading the offloader classes for the directory configured in `broker.conf`. However, the solution in https://github.com/apache/pulsar/pull/8739 ignores the fact that an offload policy can override the offloaders directory. As such, there could be a regression in 2.7.1 if users are providing multiple offloaders directories.
This PR restores that functionality without reintroducing the class loader leak.
### Modifications
Update the `PulsarService` and the `PulsarConnectorCache` classes to use a map from directory strings to `Offloaders`.
### Alternative Approaches
The new `Map` has keys of type `String`, but we could use keys of type `Path` and then normalize the paths to ensure that `./offloaders` and `offloaders` result in a single class loader. However, it looks like the `normalize` method in the path class has a warning about symbolic links. As such, I went with the basic `String` approach, which might lead to some duplication of loaded classes. Below is the javadoc for `normalize`, in case that helps for a design decision.
```java
/**
* Returns a path that is this path with redundant name elements eliminated.
*
* <p> The precise definition of this method is implementation dependent but
* in general it derives from this path, a path that does not contain
* <em>redundant</em> name elements. In many file systems, the "{@code .}"
* and "{@code ..}" are special names used to indicate the current directory
* and parent directory. In such file systems all occurrences of "{@code .}"
* are considered redundant. If a "{@code ..}" is preceded by a
* non-"{@code ..}" name then both names are considered redundant (the
* process to identify such names is repeated until it is no longer
* applicable).
*
* <p> This method does not access the file system; the path may not locate
* a file that exists. Eliminating "{@code ..}" and a preceding name from a
* path may result in the path that locates a different file than the original
* path. This can arise when the preceding name is a symbolic link.
*
* @return the resulting path or this path if it does not contain
* redundant name elements; an empty path is returned if this path
* does not have a root component and all name elements are redundant
*
* @see #getParent
* @see #toRealPath
*/
Path normalize();
```
### Verifying this change
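This change is mostly a code cleanup that should be covered by existing tests. It also adds `OffloadersCacheTest`, which checks that repeated requests for the same offloaders directory return the same `Offloaders` instance. If required, I can create more tests.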
diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index c665659..344b789 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -60,8 +60,10 @@
**/PrimitiveSchemaTest.java,
BlobStoreManagedLedgerOffloaderTest.java'
- $MVN_TEST_COMMAND -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java' \
- -DtestForkCount=1
+ $MVN_TEST_COMMAND -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java,
+ **/OffloadersCacheTest.java' \
+ -DtestForkCount=1 \
+ -DtestReuseFork=true
$MVN_TEST_COMMAND -pl pulsar-sql/presto-pulsar-plugin -Dinclude='**/TestPulsarKeyValueSchemaHandler.java' \
-DtestForkCount=1
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
index 5243691..bc747d7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
@@ -114,8 +114,8 @@
}
}
- public static Offloaders searchForOffloaders(String connectorsDirectory, String narExtractionDirectory) throws IOException {
- Path path = Paths.get(connectorsDirectory).toAbsolutePath();
+ public static Offloaders searchForOffloaders(String offloadersPath, String narExtractionDirectory) throws IOException {
+ Path path = Paths.get(offloadersPath).toAbsolutePath();
log.info("Searching for offloaders in {}", path);
Offloaders offloaders = new Offloaders();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
new file mode 100644
index 0000000..e80c75b
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Cache of loaded {@link Offloaders}, keyed by the offloaders directory string. Its purpose is to
+ * ensure that each offloaders directory is loaded only once; {@link #close()} closes every cached entry.
+ */
+@Slf4j
+public class OffloadersCache implements AutoCloseable {
+
+ private Map<String, Offloaders> loadedOffloaders = new ConcurrentHashMap<>();
+
+ /**
+ * Returns the Offloaders for a directory, loading them on the first request and reusing them afterwards.
+ *
+ * @param offloadersPath - the directory to search the offloaders nar files; also used as the cache key
+ * @param narExtractionDirectory - the directory to use for extraction; only consulted when a load happens
+ * @return the cached or newly loaded offloaders
+ * @throws RuntimeException wrapping the IOException raised when the offloaders cannot be loaded
+ */
+ public Offloaders getOrLoadOffloaders(String offloadersPath, String narExtractionDirectory) {
+ return loadedOffloaders.computeIfAbsent(offloadersPath,
+ (directory) -> {
+ try {
+ return OffloaderUtils.searchForOffloaders(directory, narExtractionDirectory);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+ loadedOffloaders.values().forEach(offloaders -> {
+ try {
+ offloaders.close();
+ } catch (Exception e) {
+ log.error("Error while closing offloader.", e);
+ // Even if the offloader fails to close, the graceful shutdown process continues
+ }
+ });
+ // Don't want to hold on to references to closed offloaders
+ loadedOffloaders.clear();
+ }
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java
new file mode 100644
index 0000000..1c2cd85
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.testng.Assert.assertSame;
+
+@PrepareForTest({OffloaderUtils.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.pulsar.common.nar.*"})
+public class OffloadersCacheTest {
+
+ // Necessary to make PowerMockito.mockStatic work with TestNG.
+ @ObjectFactory
+ public IObjectFactory getObjectFactory() {
+ return new org.powermock.modules.testng.PowerMockObjectFactory();
+ }
+
+ @Test
+ public void testLoadsOnlyOnce() throws Exception {
+ Offloaders expectedOffloaders = new Offloaders();
+
+ PowerMockito.mockStatic(OffloaderUtils.class);
+ PowerMockito.when(OffloaderUtils.searchForOffloaders(eq("./offloaders"), eq("/tmp")))
+ .thenReturn(expectedOffloaders);
+
+ OffloadersCache cache = new OffloadersCache();
+
+ // Call a first time to load the offloaders through the mocked OffloaderUtils
+ Offloaders offloaders1 = cache.getOrLoadOffloaders("./offloaders", "/tmp");
+
+ assertSame(offloaders1, expectedOffloaders, "The offloaders should be the mocked one.");
+
+ // Call a second time with the same directory to get the stored offloaders
+ Offloaders offloaders2 = cache.getOrLoadOffloaders("./offloaders", "/tmp");
+
+ assertSame(offloaders2, expectedOffloaders, "The offloaders should be the mocked one.");
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b32344b..aac8755 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -63,8 +63,8 @@
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
import org.apache.bookkeeper.mledger.offload.Offloaders;
+import org.apache.bookkeeper.mledger.offload.OffloadersCache;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -185,7 +185,7 @@
private final ScheduledExecutorService loadManagerExecutor;
private ScheduledExecutorService compactorExecutor;
private OrderedScheduler offloaderScheduler;
- private Offloaders offloaderManager = new Offloaders();
+ private OffloadersCache offloadersCache = new OffloadersCache();
private LedgerOffloader defaultOffloader;
private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
private ScheduledFuture<?> loadReportTask = null;
@@ -422,7 +422,7 @@
schemaRegistryService.close();
}
- offloaderManager.close();
+ offloadersCache.close();
if (protocolHandlers != null) {
protocolHandlers.close();
@@ -578,8 +578,6 @@
schemaRegistryService = SchemaRegistryService.create(
schemaStorage, config.getSchemaRegistryCompatibilityCheckers());
- this.offloaderManager = OffloaderUtils.searchForOffloaders(
- config.getOffloadersDirectory(), config.getNarExtractionDirectory());
this.defaultOffloader = createManagedLedgerOffloader(
OffloadPolicies.create(this.getConfiguration().getProperties()));
this.brokerInterceptor = BrokerInterceptors.load(config);
@@ -1078,7 +1076,10 @@
checkNotNull(offloadPolicies.getOffloadersDirectory(),
"Offloader driver is configured to be '%s' but no offloaders directory is configured.",
offloadPolicies.getManagedLedgerOffloadDriver());
- LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
+ Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
+ offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
+
+ LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
try {
return offloaderFactory.create(
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 1530c79..6dfd8c5 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -35,8 +35,8 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
import org.apache.bookkeeper.mledger.offload.Offloaders;
+import org.apache.bookkeeper.mledger.offload.OffloadersCache;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
@@ -57,7 +57,7 @@
private final StatsProvider statsProvider;
private OrderedScheduler offloaderScheduler;
- private Offloaders offloaderManager;
+ private OffloadersCache offloadersCache = new OffloadersCache();
private LedgerOffloader defaultOffloader;
private Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
@@ -154,9 +154,9 @@
checkNotNull(offloadPolicies.getOffloadersDirectory(),
"Offloader driver is configured to be '%s' but no offloaders directory is configured.",
offloadPolicies.getManagedLedgerOffloadDriver());
- this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(),
+ Offloaders offloaders = offloadersCache.getOrLoadOffloaders(offloadPolicies.getOffloadersDirectory(),
pulsarConnectorConfig.getNarExtractionDirectory());
- LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
+ LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
try {
@@ -194,8 +194,9 @@
instance.statsProvider.stop();
instance.managedLedgerFactory.shutdown();
instance.offloaderScheduler.shutdown();
- instance.offloaderManager.close();
+ instance.offloadersCache.close();
+ // Keep clearing the singleton reference so the shut-down instance is not retained
instance = null;
}
}
}