[feat][broker][PIP-278] Support pluggable topic compaction service - part2 (#20718)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 40c5a2d..4ffb5b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -147,9 +147,11 @@
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.common.util.ThreadDumpUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
-import org.apache.pulsar.compaction.TwoPhaseCompactor;
+import org.apache.pulsar.compaction.TopicCompactionService;
 import org.apache.pulsar.functions.worker.ErrorNotifier;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -198,7 +200,7 @@
     private WebSocketService webSocketService = null;
     private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED;
     private BookKeeperClientFactory bkClientFactory;
-    private Compactor compactor;
+    protected CompactionServiceFactory compactionServiceFactory;
     private StrategicTwoPhaseCompactor strategicCompactor;
     private ResourceUsageTransportManager resourceUsageTransportManager;
     private ResourceGroupService resourceGroupServiceManager;
@@ -452,6 +454,15 @@
 
             resetMetricsServlet();
 
+            if (this.compactionServiceFactory != null) {
+                try {
+                    this.compactionServiceFactory.close();
+                } catch (Exception e) {
+                    LOG.warn("CompactionServiceFactory closing failed {}", e.getMessage());
+                }
+                this.compactionServiceFactory = null;
+            }
+
             if (this.webSocketService != null) {
                 this.webSocketService.close();
             }
@@ -813,6 +824,9 @@
             this.brokerServiceUrl = brokerUrl(config);
             this.brokerServiceUrlTls = brokerUrlTls(config);
 
+            if (this.compactionServiceFactory == null) {
+                this.compactionServiceFactory = loadCompactionServiceFactory();
+            }
 
             if (null != this.webSocketService) {
                 ClusterDataImpl clusterData = ClusterDataImpl.builder()
@@ -1475,25 +1489,16 @@
         return this.compactorExecutor;
     }
 
-    // only public so mockito can mock it
-    public Compactor newCompactor() throws PulsarServerException {
-        return new TwoPhaseCompactor(this.getConfiguration(),
-                getClient(), getBookKeeperClient(),
-                getCompactorExecutor());
-    }
-
-    public synchronized Compactor getCompactor() throws PulsarServerException {
-        if (this.compactor == null) {
-            this.compactor = newCompactor();
-        }
-        return this.compactor;
-    }
-
     // This method is used for metrics, which is allowed to as null
     // Because it's no operation on the compactor, so let's remove the  synchronized on this method
     // to avoid unnecessary lock competition.
+    // Only Pulsar's own compaction service provides the compaction stats here. A compaction service plugin
+    // has to expose its compaction metrics by itself.
     public Compactor getNullableCompactor() {
-        return this.compactor;
+        if (this.compactionServiceFactory instanceof PulsarCompactionServiceFactory pulsarCompactedServiceFactory) {
+            return pulsarCompactedServiceFactory.getNullableCompactor();
+        }
+        return null;
     }
 
     public StrategicTwoPhaseCompactor newStrategicCompactor() throws PulsarServerException {
@@ -1911,4 +1916,22 @@
     protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
         return new BrokerService(pulsar, ioEventLoopGroup);
     }
+
+    // Instantiates the configured CompactionServiceFactory class and waits for its initialization to finish.
+    private CompactionServiceFactory loadCompactionServiceFactory() {
+        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+        CompactionServiceFactory factory = Reflections.createInstance(
+                config.getCompactionServiceFactoryClassName(), CompactionServiceFactory.class, contextLoader);
+        factory.initialize(this).join();
+        return factory;
+    }
+
+    /** Asks the compaction service factory for a topic service; a synchronous throw becomes a failed future. */
+    public CompletableFuture<TopicCompactionService> newTopicCompactionService(String topic) {
+        try {
+            return this.getCompactionServiceFactory().newTopicCompactionService(topic);
+        } catch (Throwable error) {
+            return CompletableFuture.failedFuture(error);
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f91793c..bf55dda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2077,64 +2077,73 @@
 
         // If it's not pointing to a valid entry, respond messageId of the current position.
         // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
-        Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
-        if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
-                        && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {
-            handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
-                    markDeletePosition);
-            return;
-        }
+        CompletableFuture<Position> compactionHorizonFuture =
+                persistentTopic.getTopicCompactionService().getLastCompactedPosition();
 
-        // For a valid position, we read the entry out and parse the batch size from its metadata.
-        CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
-        ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
-            @Override
-            public void readEntryComplete(Entry entry, Object ctx) {
-                entryFuture.complete(entry);
+        compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> {
+            if (ex != null) {
+                log.error("Failed to get compactionHorizon.", ex);
+                writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, ex.getMessage()));
+                return;
             }
 
-            @Override
-            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                entryFuture.completeExceptionally(exception);
+            if (lastPosition.getEntryId() == -1 || (compactionHorizon != null
+                    && lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0)) {
+                handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
+                        markDeletePosition);
+                return;
             }
-        }, null);
 
-        CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
-            MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
-            int batchSize = metadata.getNumMessagesInBatch();
-            entry.release();
-            return metadata.hasNumMessagesInBatch() ? batchSize : -1;
-        });
+            // For a valid position, we read the entry out and parse the batch size from its metadata.
+            CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
+            ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    entryFuture.complete(entry);
+                }
 
-        batchSizeFuture.whenComplete((batchSize, e) -> {
-            if (e != null) {
-                if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
-                    handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
-                            markDeletePosition);
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    entryFuture.completeExceptionally(exception);
+                }
+            }, null);
+
+            CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
+                MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
+                int batchSize = metadata.getNumMessagesInBatch();
+                entry.release();
+                return metadata.hasNumMessagesInBatch() ? batchSize : -1;
+            });
+
+            batchSizeFuture.whenComplete((batchSize, e) -> {
+                if (e != null) {
+                    if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
+                        handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
+                                markDeletePosition);
+                    } else {
+                        writeAndFlush(Commands.newError(
+                                requestId, ServerError.MetadataError,
+                                "Failed to get batch size for entry " + e.getMessage()));
+                    }
                 } else {
-                    writeAndFlush(Commands.newError(
-                            requestId, ServerError.MetadataError,
-                            "Failed to get batch size for entry " + e.getMessage()));
-                }
-            } else {
-                int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
+                    int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
 
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
-                            topic.getName(), subscriptionName, lastPosition, partitionIndex);
-                }
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
+                                topic.getName(), subscriptionName, lastPosition, partitionIndex);
+                    }
 
-                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
-                        lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
-                        markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
-                        markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
-            }
+                    writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
+                            lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
+                            markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+                            markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+                }
+            });
         });
     }
-
-    private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTopic, long requestId,
-            int partitionIndex, PositionImpl markDeletePosition) {
-        persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> {
+    private void handleLastMessageIdFromCompactionService(PersistentTopic persistentTopic, long requestId,
+                                                          int partitionIndex, PositionImpl markDeletePosition) {
+        persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> {
             if (entry != null) {
                 try {
                     // in this case, all the data has been compacted, so return the last position
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index d9d0f6a..6de113d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -50,9 +50,11 @@
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.compaction.CompactedTopicUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -347,8 +349,9 @@
                 }
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
-                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
-                            this, consumer);
+                    boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
+                    CompactedTopicUtils.readCompactedEntries(topic.getTopicCompactionService(), cursor, messagesToRead,
+                            readFromEarliest, this, consumer);
                 } else {
                     ReadEntriesCtx readEntriesCtx =
                             ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 09a9961..1b8b47d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -772,16 +772,26 @@
             log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset",
                     topicName, subName);
 
-            try {
-                boolean forceReset = false;
-                if (topic.getCompactedTopic() != null && topic.getCompactedTopic().getCompactionHorizon().isPresent()) {
-                    PositionImpl horizon = (PositionImpl) topic.getCompactedTopic().getCompactionHorizon().get();
+            CompletableFuture<Boolean> forceReset = new CompletableFuture<>();
+            if (topic.getTopicCompactionService() == null) {
+                forceReset.complete(false);
+            } else {
+                topic.getTopicCompactionService().getLastCompactedPosition().thenAccept(lastCompactedPosition -> {
                     PositionImpl resetTo = (PositionImpl) finalPosition;
-                    if (horizon.compareTo(resetTo) >= 0) {
-                        forceReset = true;
+                    if (lastCompactedPosition != null && resetTo.compareTo(lastCompactedPosition.getLedgerId(),
+                            lastCompactedPosition.getEntryId()) <= 0) {
+                        forceReset.complete(true);
+                    } else {
+                        forceReset.complete(false);
                     }
-                }
-                cursor.asyncResetCursor(finalPosition, forceReset, new AsyncCallbacks.ResetCursorCallback() {
+                }).exceptionally(ex -> {
+                    forceReset.completeExceptionally(ex);
+                    return null;
+                });
+            }
+
+            forceReset.thenAccept(forceResetValue -> {
+                cursor.asyncResetCursor(finalPosition, forceResetValue, new AsyncCallbacks.ResetCursorCallback() {
                     @Override
                     public void resetComplete(Object ctx) {
                         if (log.isDebugEnabled()) {
@@ -811,11 +821,12 @@
                         }
                     }
                 });
-            } catch (Exception e) {
+            }).exceptionally((e) -> {
                 log.error("[{}][{}] Error while resetting cursor", topicName, subName, e);
                 IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                 future.completeExceptionally(new BrokerServiceException(e));
-            }
+                return null;
+            });
         });
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 12691d1..1e055ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -167,11 +167,12 @@
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.compaction.PulsarTopicCompactionService;
+import org.apache.pulsar.compaction.TopicCompactionService;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
@@ -210,9 +211,9 @@
 
     protected final MessageDeduplication messageDeduplication;
 
-    private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
+    private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
     private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
-    private final CompactedTopic compactedTopic;
+    private TopicCompactionService topicCompactionService;
 
     // TODO: Create compaction strategy from topic policy when exposing strategic compaction to users.
     private static Map<String, TopicCompactionStrategy> strategicCompactionMap = Map.of(
@@ -296,29 +297,11 @@
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
         registerTopicPolicyListener();
 
-        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
-
-        for (ManagedCursor cursor : ledger.getCursors()) {
-            if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)
-                    || cursor.getName().startsWith(replicatorPrefix)) {
-                // This is not a regular subscription, we are going to
-                // ignore it for now and let the message dedup logic to take care of it
-            } else {
-                final String subscriptionName = Codec.decode(cursor.getName());
-                subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
-                        PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
-                        cursor.getCursorProperties()));
-                // subscription-cursor gets activated by default: deactivate as there is no active subscription right
-                // now
-                subscriptions.get(subscriptionName).deactivateCursor();
-            }
-        }
         this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);
         if (ledger.getProperties().containsKey(TOPIC_EPOCH_PROPERTY_NAME)) {
             topicEpoch = Optional.of(Long.parseLong(ledger.getProperties().get(TOPIC_EPOCH_PROPERTY_NAME)));
         }
 
-        checkReplicatedSubscriptionControllerState();
         TopicName topicName = TopicName.get(topic);
         if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
                 && !isEventSystemTopic(topicName)) {
@@ -338,6 +321,11 @@
     @Override
     public CompletableFuture<Void> initialize() {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(brokerService.getPulsar().newTopicCompactionService(topic).thenAccept(service -> {
+            PersistentTopic.this.topicCompactionService = service;
+            this.createPersistentSubscriptions();
+        }));
+
         for (ManagedCursor cursor : ledger.getCursors()) {
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
@@ -406,7 +394,6 @@
                 .expectedItems(16)
                 .concurrencyLevel(1)
                 .build();
-        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
 
@@ -434,6 +421,25 @@
         return pendingWriteOps;
     }
 
+    // Registers a subscription for every regular cursor, then checks the replicated subscription controller state.
+    private void createPersistentSubscriptions() {
+        for (ManagedCursor cursor : ledger.getCursors()) {
+            final String cursorName = cursor.getName();
+            if (cursorName.equals(DEDUPLICATION_CURSOR_NAME) || cursorName.startsWith(replicatorPrefix)) {
+                // Deduplication and replicator cursors are not regular subscriptions: they are skipped
+                // here and left to the message dedup logic.
+                continue;
+            }
+            final String subscriptionName = Codec.decode(cursorName);
+            subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
+                    PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
+                    cursor.getCursorProperties()));
+            // The cursor gets activated by default: deactivate it as there is no active subscription right now.
+            subscriptions.get(subscriptionName).deactivateCursor();
+        }
+        checkReplicatedSubscriptionControllerState();
+    }
+
     /**
      * Unload a subscriber.
      * @throws SubscriptionNotFoundException If subscription not founded.
@@ -481,8 +487,10 @@
 
     private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
             boolean replicated, Map<String, String> subscriptionProperties) {
-        Objects.requireNonNull(compactedTopic);
-        if (isCompactionSubscription(subscriptionName)) {
+        Objects.requireNonNull(topicCompactionService);
+        if (isCompactionSubscription(subscriptionName)
+                && topicCompactionService instanceof PulsarTopicCompactionService pulsarTopicCompactionService) {
+            CompactedTopicImpl compactedTopic = pulsarTopicCompactionService.getCompactedTopic();
             return new PulsarCompactorSubscription(this, compactedTopic, subscriptionName, cursor);
         } else {
             return new PersistentSubscription(this, subscriptionName, cursor, replicated, subscriptionProperties);
@@ -1477,6 +1485,14 @@
             });
         }
 
+        if (topicCompactionService != null) {
+            try {
+                topicCompactionService.close();
+            } catch (Exception e) {
+                log.warn("Error close topicCompactionService ", e);
+            }
+        }
+
         CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect
                 ? CompletableFuture.completedFuture(null)
                 : FutureUtil.waitForAll(futures);
@@ -2517,7 +2533,9 @@
 
     public Optional<CompactedTopicContext> getCompactedTopicContext() {
         try {
-            return ((CompactedTopicImpl) compactedTopic).getCompactedTopicContext();
+            if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) {
+                return pulsarCompactedService.getCompactedTopic().getCompactedTopicContext();
+            }
         } catch (ExecutionException | InterruptedException e) {
             log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
         }
@@ -3167,7 +3185,7 @@
                 currentCompaction = brokerService.pulsar().getStrategicCompactor()
                         .compact(topic, strategicCompactionMap.get(topic));
             } else {
-                currentCompaction = brokerService.pulsar().getCompactor().compact(topic);
+                currentCompaction = topicCompactionService.compact().thenApply(x -> null);
             }
             currentCompaction.whenComplete((ignore, ex) -> {
                if (ex != null){
@@ -3188,7 +3206,7 @@
             return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
         } else {
             try {
-                if (current.join() == COMPACTION_NEVER_RUN) {
+                if (Objects.equals(current.join(), COMPACTION_NEVER_RUN)) {
                     return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
                 } else {
                     return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
@@ -3311,8 +3329,8 @@
         return replicatedSubscriptionsController;
     }
 
-    public CompactedTopic getCompactedTopic() {
-        return compactedTopic;
+    public TopicCompactionService getTopicCompactionService() {
+        return this.topicCompactionService;
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index e1a10b3..99e2f8a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -29,6 +29,14 @@
 public interface CompactedTopic {
     CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId);
     CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId);
+
+    /**
+     * Read entries from compacted topic.
+     *
+     * @deprecated Use {@link CompactedTopicUtils#readCompactedEntries(TopicCompactionService, ManagedCursor,
+     * int, boolean, ReadEntriesCallback, Consumer)} instead.
+     */
+    @Deprecated
     void asyncReadEntriesOrWait(ManagedCursor cursor,
                                 int numberOfEntriesToRead,
                                 boolean isFirstRead,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 1f11733..fd20c31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -87,6 +87,7 @@
     }
 
     @Override
+    @Deprecated
     public void asyncReadEntriesOrWait(ManagedCursor cursor,
                                        int numberOfEntriesToRead,
                                        boolean isFirstRead,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
new file mode 100644
index 0000000..4cd21cb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import com.google.common.annotations.Beta;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nullable;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
+import org.apache.pulsar.common.util.FutureUtil;
+
+public class CompactedTopicUtils {
+
+    /**
+     * Reads entries for a consumer that reads compacted data, serving them from the compaction service while
+     * the read position is still covered by compaction and from the cursor otherwise.
+     *
+     * <p>The read position is {@link PositionImpl#EARLIEST} when {@code readFromEarliest} is {@code true},
+     * otherwise the cursor's current read position. When the service reports no last compacted position, or
+     * the read position is after it, the read is delegated to {@code cursor.asyncReadEntriesOrWait}. Otherwise
+     * the entries come from {@code topicCompactionService.readCompactedEntries} and the cursor is moved past
+     * them before the callback is completed.
+     *
+     * <p>Failures are reported through {@code callback.readEntriesFailed} as a {@link ManagedLedgerException}.
+     *
+     * @param topicCompactionService the compaction service to query, must not be {@code null}
+     * @param cursor the cursor to read from and to reposition, must not be {@code null}
+     * @param numberOfEntriesToRead the number of entries to read, must be greater than zero
+     * @param readFromEarliest whether to start from the earliest position instead of the cursor's read position
+     * @param callback the callback notified with the entries or with the failure, must not be {@code null}
+     * @param consumer the consumer attached to the read context, may be {@code null}
+     */
+    @Beta
+    public static void readCompactedEntries(TopicCompactionService topicCompactionService, ManagedCursor cursor,
+                                            int numberOfEntriesToRead, boolean readFromEarliest,
+                                            AsyncCallbacks.ReadEntriesCallback callback, @Nullable Consumer consumer) {
+        Objects.requireNonNull(topicCompactionService);
+        Objects.requireNonNull(cursor);
+        checkArgument(numberOfEntriesToRead > 0);
+        Objects.requireNonNull(callback);
+
+        final PositionImpl readPosition;
+        if (readFromEarliest) {
+            readPosition = PositionImpl.EARLIEST;
+        } else {
+            readPosition = (PositionImpl) cursor.getReadPosition();
+        }
+
+        // TODO: redeliver epoch link https://github.com/apache/pulsar/issues/13690
+        PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx readEntriesCtx =
+                PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH);
+
+        CompletableFuture<Position> lastCompactedPositionFuture = topicCompactionService.getLastCompactedPosition();
+
+        lastCompactedPositionFuture.thenCompose(lastCompactedPosition -> {
+            // No last compacted position, or the read position is already after it: read from the cursor.
+            if (lastCompactedPosition == null
+                    || readPosition.compareTo(
+                    lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) {
+                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
+                return CompletableFuture.completedFuture(null);
+            }
+
+            return topicCompactionService.readCompactedEntries(readPosition, numberOfEntriesToRead)
+                    .thenAccept(entries -> {
+                        if (CollectionUtils.isEmpty(entries)) {
+                            // No compacted entry was returned: seek to the later of the read position and the
+                            // position after the last compacted one, then complete with an empty list.
+                            Position seekToPosition = lastCompactedPosition.getNext();
+                            if (readPosition.compareTo(seekToPosition.getLedgerId(), seekToPosition.getEntryId()) > 0) {
+                                seekToPosition = readPosition;
+                            }
+                            cursor.seek(seekToPosition);
+                            callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
+                            // Stop here: the code below reads the last element of the list, which is empty
+                            // (or null) in this branch.
+                            return;
+                        }
+
+                        Entry lastEntry = entries.get(entries.size() - 1);
+                        cursor.seek(lastEntry.getPosition().getNext(), true);
+                        callback.readEntriesComplete(entries, readEntriesCtx);
+                    });
+        }).exceptionally((exception) -> {
+            // Surface any failure to the caller as a ManagedLedgerException.
+            exception = FutureUtil.unwrapCompletionException(exception);
+            ManagedLedgerException managedLedgerException;
+            if (exception instanceof ManagedLedgerException) {
+                managedLedgerException = (ManagedLedgerException) exception;
+            } else {
+                managedLedgerException = new ManagedLedgerException(exception);
+            }
+            callback.readEntriesFailed(managedLedgerException, readEntriesCtx);
+            return null;
+        });
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
index de1abfb..7bb3037 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.pulsar.compaction;
 
-import com.google.common.annotations.Beta;
 import java.util.concurrent.CompletableFuture;
 import javax.annotation.Nonnull;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
 
-@Beta
 @InterfaceAudience.Public
+@InterfaceStability.Evolving
 public interface CompactionServiceFactory extends AutoCloseable {
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
index dd817ca..424733a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
@@ -24,11 +24,14 @@
 import java.util.concurrent.CompletionException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 
 public class PulsarCompactionServiceFactory implements CompactionServiceFactory {
 
+    @Getter(AccessLevel.PROTECTED)
     private PulsarService pulsarService;
 
     private volatile Compactor compactor;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
index 0a8bf9d..dd218c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
@@ -22,6 +22,7 @@
 import static org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
 import static org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
 import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -108,4 +109,9 @@
     public CompactedTopicImpl getCompactedTopic() {
         return compactedTopic;
     }
+
+    @Override
+    public void close() throws IOException {
+        // Intentionally empty: this implementation performs no work on close.
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
index 6b64b9c..74df0da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
@@ -18,17 +18,17 @@
  */
 package org.apache.pulsar.compaction;
 
-import com.google.common.annotations.Beta;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import javax.annotation.Nonnull;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
 
-@Beta
 @InterfaceAudience.Public
-public interface TopicCompactionService {
+@InterfaceStability.Evolving
+public interface TopicCompactionService extends AutoCloseable {
     /**
      * Compact the topic.
      * Topic Compaction is a key-based retention mechanism. It keeps the most recent value for a given key and
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 821dd9c..7d3b586 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -22,12 +22,14 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -59,7 +61,6 @@
 public class TwoPhaseCompactor extends Compactor {
     private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
     private static final int MAX_OUTSTANDING = 500;
-    protected static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
     private final Duration phaseOneLoopReadTimeout;
 
     public TwoPhaseCompactor(ServiceConfiguration conf,
@@ -128,8 +129,7 @@
                 mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
-                        for (ImmutableTriple<MessageId, String, Integer> e : RawBatchConverter
-                                .extractIdsAndKeysAndSize(m)) {
+                        for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
                             if (e != null) {
                                 if (e.getRight() > 0) {
                                     MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
@@ -238,7 +238,7 @@
                 mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
-                        messageToAdd = RawBatchConverter.rebatchMessage(
+                        messageToAdd = rebatchMessage(
                                 m, (key, subid) -> subid.equals(latestForKey.get(key)));
                     } catch (IOException ioe) {
                         log.info("Error decoding batch for message {}. Whole batch will be included in output",
@@ -386,7 +386,7 @@
         return bkf;
     }
 
-    private static Pair<String, Integer> extractKeyAndSize(RawMessage m) {
+    protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
         ByteBuf headersAndPayload = m.getHeadersAndPayload();
         MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
         if (msgMetadata.hasPartitionKey()) {
@@ -400,6 +400,16 @@
         }
     }
 
+    protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
+            throws IOException { // overridable seam: forwards the message unchanged to RawBatchConverter
+        return RawBatchConverter.extractIdsAndKeysAndSize(msg);
+    }
+
+    protected Optional<RawMessage> rebatchMessage(RawMessage msg, BiPredicate<String, MessageId> filter)
+            throws IOException { // overridable seam: forwards message and filter unchanged to RawBatchConverter
+        return RawBatchConverter.rebatchMessage(msg, filter);
+    }
+
     private static class PhaseOneResult {
         final MessageId from;
         final MessageId to; // last undeleted messageId
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7b04717..342a409 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -140,6 +140,7 @@
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -3123,7 +3124,7 @@
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<Long>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         doReturn(promise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
 
@@ -3159,7 +3160,7 @@
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         doReturn(promise).when(compactor).compact(topicName + "-partition-0");
 
         CompletableFuture<Long> promise1 = new CompletableFuture<>();
@@ -3203,7 +3204,7 @@
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<Long>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         doReturn(promise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index e720c9b..ab83c8f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -111,6 +111,7 @@
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -2075,7 +2076,7 @@
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         doReturn(promise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
 
@@ -2112,7 +2113,7 @@
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         doReturn(promise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 6ce6de1..cbbb880 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -119,6 +119,7 @@
     public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
         // create topic
         final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
                 .setTopic(successTopicName)
@@ -177,6 +178,7 @@
     public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception {
         // create topic
         final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
                 .setTopic(successTopicName)
@@ -241,6 +243,7 @@
     public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception {
         // create topic
         final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
                 .setTopic(successTopicName)
@@ -299,6 +302,7 @@
     public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception {
         // create topic
         final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
                 .setTopic(successTopicName)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index c49df3e..078208f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -93,13 +93,13 @@
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -132,6 +132,7 @@
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
@@ -630,6 +631,7 @@
     @Test
     public void testSubscribeFail() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
+        topic.initialize().join();
 
         // Empty subscription name
         CommandSubscribe cmd = new CommandSubscribe()
@@ -666,6 +668,7 @@
     @Test
     public void testSubscribeUnsubscribe() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
+        topic.initialize().join();
 
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
@@ -1238,6 +1241,7 @@
     public void testDeleteAndUnsubscribeTopic() throws Exception {
         // create topic
         final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
                 .setTopic(successTopicName)
@@ -1347,6 +1351,7 @@
     @Test
     public void testDeleteTopicRaceConditions() throws Exception {
         PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
 
         // override ledger deletion callback to slow down deletion
         doAnswer(invocationOnMock -> {
@@ -1537,6 +1542,7 @@
                 .setSubType(SubType.Failover);
 
         // 1. Subscribe with non partition topic
+        topic1.initialize().join();
         Future<Consumer> f1 = topic1.subscribe(getSubscriptionOption(cmd1));
         f1.get();
 
@@ -1552,6 +1558,7 @@
                 .setRequestId(1)
                 .setSubType(SubType.Failover);
 
+        topic2.initialize();
         Future<Consumer> f2 = topic2.subscribe(getSubscriptionOption(cmd2));
         f2.get();
 
@@ -1826,7 +1833,8 @@
     @Test
     public void testCompactionTriggeredAfterThresholdFirstInvocation() throws Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = pulsarTestContext.getPulsarService().getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory) pulsarTestContext.getPulsarService()
+                .getCompactionServiceFactory()).getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         Policies policies = new Policies();
@@ -1857,7 +1865,8 @@
     @Test
     public void testCompactionTriggeredAfterThresholdSecondInvocation() throws Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = pulsarTestContext.getPulsarService().getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory) pulsarTestContext.getPulsarService()
+                .getCompactionServiceFactory()).getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         ManagedCursor subCursor = mock(ManagedCursor.class);
@@ -1891,7 +1900,8 @@
     @Test
     public void testCompactionDisabledWithZeroThreshold() throws Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = pulsarTestContext.getPulsarService().getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory) pulsarTestContext.getPulsarService()
+                .getCompactionServiceFactory()).getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         Policies policies = new Policies();
@@ -2161,6 +2171,7 @@
             return null;
         }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), any());
         PersistentTopic topic = new PersistentTopic(successTopicName, mockLedger, brokerService);
+        topic.initialize().join();
 
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 19583a4..a665681 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -51,6 +51,7 @@
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -235,6 +236,7 @@
 
         doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
         doReturn(mock(PulsarResources.class)).when(pulsarService).getPulsarResources();
+        doReturn(mock(CompactionServiceFactory.class)).when(pulsarService).getCompactionServiceFactory();
 
         ManagedLedger managedLedger = mock(ManagedLedger.class);
         MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index c4e4107..31d4683 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -81,6 +81,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.zookeeper.CreateMode;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
@@ -1739,7 +1740,7 @@
                     .value(data)
                     .send();
         }
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         compactor.compact(topicName).get();
         statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
index 517d57d..fcea99c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -26,7 +26,7 @@
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
-import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -38,11 +38,11 @@
  */
 abstract class AbstractTestPulsarService extends PulsarService {
     protected final SpyConfig spyConfig;
-    private boolean compactorExists;
 
     public AbstractTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config,
                                      MetadataStoreExtended localMetadataStore,
-                                     MetadataStoreExtended configurationMetadataStore, Compactor compactor,
+                                     MetadataStoreExtended configurationMetadataStore,
+                                     CompactionServiceFactory compactionServiceFactory,
                                      BrokerInterceptor brokerInterceptor,
                                      BookKeeperClientFactory bookKeeperClientFactory) {
         super(config);
@@ -51,7 +51,7 @@
                 NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, MetadataStoreExtended.class));
         setConfigurationMetadataStore(
                 NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore, MetadataStoreExtended.class));
-        setCompactor(compactor);
+        super.setCompactionServiceFactory(compactionServiceFactory);
         setBrokerInterceptor(brokerInterceptor);
         setBkClientFactory(bookKeeperClientFactory);
     }
@@ -77,23 +77,6 @@
     }
 
     @Override
-    protected void setCompactor(Compactor compactor) {
-        if (compactor != null) {
-            compactorExists = true;
-        }
-        super.setCompactor(compactor);
-    }
-
-    @Override
-    public Compactor newCompactor() throws PulsarServerException {
-        if (compactorExists) {
-            return getCompactor();
-        } else {
-            return spyConfig.getCompactor().spy(super.newCompactor());
-        }
-    }
-
-    @Override
     public BookKeeperClientFactory newBookKeeperClientFactory() {
         return getBkClientFactory();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockPulsarCompactionServiceFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockPulsarCompactionServiceFactory.java
new file mode 100644
index 0000000..77959fe
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockPulsarCompactionServiceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
+
+/** Test factory: hands out the compactor it was constructed with, or else a spy-wrapped parent compactor. */
+public class MockPulsarCompactionServiceFactory extends PulsarCompactionServiceFactory {
+    private final SpyConfig spySettings;
+    private final Compactor presetCompactor;
+
+    public MockPulsarCompactionServiceFactory(SpyConfig spyConfig, Compactor compactor) {
+        this.spySettings = spyConfig;
+        this.presetCompactor = compactor;
+    }
+
+    @Override
+    protected Compactor newCompactor() throws PulsarServerException {
+        if (presetCompactor != null) {
+            return presetCompactor; // a compactor was supplied at construction time; use it as-is
+        }
+        return spySettings.getCompactor().spy(super.newCompactor());
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index af365ed..2896f33 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -45,7 +45,7 @@
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
@@ -60,13 +60,14 @@
     public NonStartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config,
                                          MetadataStoreExtended localMetadataStore,
                                          MetadataStoreExtended configurationMetadataStore,
-                                         Compactor compactor, BrokerInterceptor brokerInterceptor,
+                                         CompactionServiceFactory compactionServiceFactory,
+                                         BrokerInterceptor brokerInterceptor,
                                          BookKeeperClientFactory bookKeeperClientFactory,
                                          PulsarResources pulsarResources,
                                          ManagedLedgerStorage managedLedgerClientFactory,
                                          Function<BrokerService, BrokerService> brokerServiceCustomizer) {
-        super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactor, brokerInterceptor,
-                bookKeeperClientFactory);
+        super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
+                brokerInterceptor, bookKeeperClientFactory);
         setPulsarResources(pulsarResources);
         setManagedLedgerClientFactory(managedLedgerClientFactory);
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index f490ebd..40a4228 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -54,7 +54,9 @@
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -135,6 +137,8 @@
 
     private final Compactor compactor;
 
+    private final CompactionServiceFactory compactionServiceFactory;
+
     private final BrokerService brokerService;
 
     @Getter(AccessLevel.NONE)
@@ -659,10 +663,19 @@
         protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
             BookKeeperClientFactory bookKeeperClientFactory =
                     new MockBookKeeperClientFactory(builder.bookKeeperClient);
+            CompactionServiceFactory compactionServiceFactory = builder.compactionServiceFactory;
+            if (builder.compactionServiceFactory == null && builder.config.getCompactionServiceFactoryClassName()
+                    .equals(PulsarCompactionServiceFactory.class.getName())) {
+                compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
+            }
             PulsarService pulsarService = spyConfig.getPulsarService()
                     .spy(StartableTestPulsarService.class, spyConfig, builder.config, builder.localMetadataStore,
-                            builder.configurationMetadataStore, builder.compactor, builder.brokerInterceptor,
+                            builder.configurationMetadataStore, compactionServiceFactory,
+                            builder.brokerInterceptor,
                             bookKeeperClientFactory, builder.brokerServiceCustomizer);
+            if (compactionServiceFactory != null) {
+                compactionServiceFactory.initialize(pulsarService);
+            }
             registerCloseable(() -> {
                 pulsarService.close();
                 resetSpyOrMock(pulsarService);
@@ -717,11 +730,20 @@
             }
             BookKeeperClientFactory bookKeeperClientFactory =
                     new MockBookKeeperClientFactory(builder.bookKeeperClient);
+            CompactionServiceFactory compactionServiceFactory = builder.compactionServiceFactory;
+            if (builder.compactionServiceFactory == null && builder.config.getCompactionServiceFactoryClassName()
+                    .equals(PulsarCompactionServiceFactory.class.getName())) {
+                compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
+            }
             PulsarService pulsarService = spyConfig.getPulsarService()
                     .spy(NonStartableTestPulsarService.class, spyConfig, builder.config, builder.localMetadataStore,
-                            builder.configurationMetadataStore, builder.compactor, builder.brokerInterceptor,
+                            builder.configurationMetadataStore, compactionServiceFactory,
+                            builder.brokerInterceptor,
                             bookKeeperClientFactory, builder.pulsarResources,
                             builder.managedLedgerClientFactory, builder.brokerServiceCustomizer);
+            if (compactionServiceFactory != null) {
+                compactionServiceFactory.initialize(pulsarService);
+            }
             registerCloseable(() -> {
                 pulsarService.close();
                 resetSpyOrMock(pulsarService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
index de51cee..8c42998 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
@@ -98,9 +98,15 @@
      */
     private final SpyType bookKeeperClient;
     /**
-     * Spy configuration for {@link PulsarService#getCompactor()}.
+     * Spy configuration for the compactor exposed by {@link PulsarService#getCompactionServiceFactory()}.
      */
     private final SpyType compactor;
+
+    /**
+     * Spy configuration for {@link PulsarService#getCompactionServiceFactory()}.
+     */
+
+    private final SpyType compactedServiceFactory;
     /**
      * Spy configuration for {@link PulsarService#getNamespaceService()}.
      */
@@ -128,6 +134,7 @@
         spyConfigBuilder.brokerService(defaultSpyType);
         spyConfigBuilder.bookKeeperClient(defaultSpyType);
         spyConfigBuilder.compactor(defaultSpyType);
+        spyConfigBuilder.compactedServiceFactory(defaultSpyType);
         spyConfigBuilder.namespaceService(defaultSpyType);
         return spyConfigBuilder;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
index 8a485a0..a5964c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
@@ -28,7 +28,7 @@
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 /**
@@ -41,12 +41,12 @@
     public StartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config,
                                       MetadataStoreExtended localMetadataStore,
                                       MetadataStoreExtended configurationMetadataStore,
-                                      Compactor compactor,
+                                      CompactionServiceFactory compactionServiceFactory,
                                       BrokerInterceptor brokerInterceptor,
                                       BookKeeperClientFactory bookKeeperClientFactory,
                                       Function<BrokerService, BrokerService> brokerServiceCustomizer) {
-        super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactor, brokerInterceptor,
-                bookKeeperClientFactory);
+        super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
+                brokerInterceptor, bookKeeperClientFactory);
         this.brokerServiceCustomizer = brokerServiceCustomizer;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c4ec2ec..4e50401 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -21,8 +21,8 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME;
 import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
-import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
 import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
+import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
@@ -42,12 +42,12 @@
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
-import java.util.ArrayList;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -99,8 +99,8 @@
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
-import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
@@ -124,9 +124,9 @@
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessagesImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
@@ -138,14 +138,16 @@
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 import org.awaitility.Awaitility;
 import org.mockito.invocation.InvocationOnMock;
@@ -1571,6 +1573,9 @@
         when(pulsar.getTransactionBufferSnapshotServiceFactory()).thenReturn(transactionBufferSnapshotServiceFactory);
         TopicTransactionBufferProvider topicTransactionBufferProvider = new TopicTransactionBufferProvider();
         when(pulsar.getTransactionBufferProvider()).thenReturn(topicTransactionBufferProvider);
+        CompactionServiceFactory compactionServiceFactory = new PulsarCompactionServiceFactory();
+        compactionServiceFactory.initialize(pulsar);
+        when(pulsar.getCompactionServiceFactory()).thenReturn(compactionServiceFactory);
         // Mock BacklogQuotaManager
         BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class);
         // Mock brokerService.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 957671b..96adc67 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -21,10 +21,8 @@
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Sets;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,9 +34,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
-
 import lombok.Cleanup;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -47,7 +43,6 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -665,7 +660,7 @@
 
         producer.newMessage().key("k").value(("value").getBytes()).send();
         producer.newMessage().key("k").value(null).send();
-        pulsar.getCompactor().compact(topic).get();
+        ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor().compact(topic).get();
 
         Awaitility.await()
                 .pollInterval(3, TimeUnit.SECONDS)