Remove `internalUpdateOffloadPolicies` to keep the same behavior update topic policy (#17236)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 231168e..26b208e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -53,9 +53,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
@@ -855,19 +853,6 @@
topicPolicies.setOffloadPolicies(offloadPolicies);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
- }).thenCompose(__ -> {
- //The policy update is asynchronous. Cache at this step may not be updated yet.
- //So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
- PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
- if (metadata.partitions > 0) {
- List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
- for (int i = 0; i < metadata.partitions; i++) {
- futures.add(internalUpdateOffloadPolicies(offloadPolicies, topicName.getPartition(i)));
- }
- return FutureUtil.waitForAll(futures);
- } else {
- return internalUpdateOffloadPolicies(offloadPolicies, topicName);
- }
});
}
@@ -898,35 +883,6 @@
});
}
- private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
- TopicName topicName) {
- return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
- .thenAccept(optionalTopic -> {
- try {
- if (!optionalTopic.isPresent() || !topicName.isPersistent()) {
- return;
- }
- PersistentTopic persistentTopic = (PersistentTopic) optionalTopic.get();
- ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
- if (offloadPolicies == null) {
- LedgerOffloader namespaceOffloader =
- pulsar().getLedgerOffloaderMap().get(topicName.getNamespaceObject());
- LedgerOffloader topicOffloader = managedLedgerConfig.getLedgerOffloader();
- if (topicOffloader != null && topicOffloader != namespaceOffloader) {
- topicOffloader.close();
- }
- managedLedgerConfig.setLedgerOffloader(namespaceOffloader);
- } else {
- managedLedgerConfig.setLedgerOffloader(
- pulsar().createManagedLedgerOffloader(offloadPolicies));
- }
- persistentTopic.getManagedLedger().setConfig(managedLedgerConfig);
- } catch (PulsarServerException e) {
- throw new RestException(e);
- }
- });
- }
-
protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription(boolean applied,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 079f6de..0308988 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -329,12 +329,6 @@
admin.topics().removeOffloadPolicies(topicName);
Awaitility.await().untilAsserted(()
-> assertNull(admin.topics().getOffloadPolicies(topicName)));
- // topic level offloader should be closed
- if (isPartitioned) {
- verify(topicOffloader, times(partitionNum)).close();
- } else {
- verify(topicOffloader).close();
- }
if (isPartitioned) {
for (int i = 0; i < partitionNum; i++) {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()