[feat][broker][PIP-278] Support pluggable topic compaction service - part2 (#20718)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 40c5a2d..4ffb5b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -147,9 +147,11 @@
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.util.ThreadDumpUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
-import org.apache.pulsar.compaction.TwoPhaseCompactor;
+import org.apache.pulsar.compaction.TopicCompactionService;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -198,7 +200,7 @@
private WebSocketService webSocketService = null;
private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED;
private BookKeeperClientFactory bkClientFactory;
- private Compactor compactor;
+ protected CompactionServiceFactory compactionServiceFactory;
private StrategicTwoPhaseCompactor strategicCompactor;
private ResourceUsageTransportManager resourceUsageTransportManager;
private ResourceGroupService resourceGroupServiceManager;
@@ -452,6 +454,15 @@
resetMetricsServlet();
+ if (this.compactionServiceFactory != null) {
+ try {
+ this.compactionServiceFactory.close();
+ } catch (Exception e) {
+ LOG.warn("CompactionServiceFactory closing failed {}", e.getMessage());
+ }
+ this.compactionServiceFactory = null;
+ }
+
if (this.webSocketService != null) {
this.webSocketService.close();
}
@@ -813,6 +824,9 @@
this.brokerServiceUrl = brokerUrl(config);
this.brokerServiceUrlTls = brokerUrlTls(config);
+ if (this.compactionServiceFactory == null) {
+ this.compactionServiceFactory = loadCompactionServiceFactory();
+ }
if (null != this.webSocketService) {
ClusterDataImpl clusterData = ClusterDataImpl.builder()
@@ -1475,25 +1489,16 @@
return this.compactorExecutor;
}
- // only public so mockito can mock it
- public Compactor newCompactor() throws PulsarServerException {
- return new TwoPhaseCompactor(this.getConfiguration(),
- getClient(), getBookKeeperClient(),
- getCompactorExecutor());
- }
-
- public synchronized Compactor getCompactor() throws PulsarServerException {
- if (this.compactor == null) {
- this.compactor = newCompactor();
- }
- return this.compactor;
- }
-
// This method is used for metrics, which is allowed to as null
// Because it's no operation on the compactor, so let's remove the synchronized on this method
// to avoid unnecessary lock competition.
+ // Only the pulsar's compaction service provides the compaction stats. The compaction service plugin,
+ // it should be done by the plugin itself to expose the compaction metrics.
public Compactor getNullableCompactor() {
- return this.compactor;
+ if (this.compactionServiceFactory instanceof PulsarCompactionServiceFactory pulsarCompactedServiceFactory) {
+ return pulsarCompactedServiceFactory.getNullableCompactor();
+ }
+ return null;
}
public StrategicTwoPhaseCompactor newStrategicCompactor() throws PulsarServerException {
@@ -1911,4 +1916,22 @@
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
}
+
+ private CompactionServiceFactory loadCompactionServiceFactory() {
+ String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName();
+ var compactionServiceFactory =
+ Reflections.createInstance(compactionServiceFactoryClassName, CompactionServiceFactory.class,
+ Thread.currentThread().getContextClassLoader());
+ compactionServiceFactory.initialize(this).join();
+ return compactionServiceFactory;
+ }
+
+ public CompletableFuture<TopicCompactionService> newTopicCompactionService(String topic) {
+ try {
+ CompactionServiceFactory compactionServiceFactory = this.getCompactionServiceFactory();
+ return compactionServiceFactory.newTopicCompactionService(topic);
+ } catch (Throwable e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f91793c..bf55dda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2077,64 +2077,73 @@
// If it's not pointing to a valid entry, respond messageId of the current position.
// If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
- Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
- if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
- && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {
- handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
- markDeletePosition);
- return;
- }
+ CompletableFuture<Position> compactionHorizonFuture =
+ persistentTopic.getTopicCompactionService().getLastCompactedPosition();
- // For a valid position, we read the entry out and parse the batch size from its metadata.
- CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
- ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- entryFuture.complete(entry);
+ compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> {
+ if (ex != null) {
+ log.error("Failed to get compactionHorizon.", ex);
+ writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, ex.getMessage()));
+ return;
}
- @Override
- public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- entryFuture.completeExceptionally(exception);
+ if (lastPosition.getEntryId() == -1 || (compactionHorizon != null
+ && lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0)) {
+ handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
+ markDeletePosition);
+ return;
}
- }, null);
- CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
- MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
- int batchSize = metadata.getNumMessagesInBatch();
- entry.release();
- return metadata.hasNumMessagesInBatch() ? batchSize : -1;
- });
+ // For a valid position, we read the entry out and parse the batch size from its metadata.
+ CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
+ ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ entryFuture.complete(entry);
+ }
- batchSizeFuture.whenComplete((batchSize, e) -> {
- if (e != null) {
- if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
- handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
- markDeletePosition);
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ entryFuture.completeExceptionally(exception);
+ }
+ }, null);
+
+ CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
+ MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
+ int batchSize = metadata.getNumMessagesInBatch();
+ entry.release();
+ return metadata.hasNumMessagesInBatch() ? batchSize : -1;
+ });
+
+ batchSizeFuture.whenComplete((batchSize, e) -> {
+ if (e != null) {
+ if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
+ handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
+ markDeletePosition);
+ } else {
+ writeAndFlush(Commands.newError(
+ requestId, ServerError.MetadataError,
+ "Failed to get batch size for entry " + e.getMessage()));
+ }
} else {
- writeAndFlush(Commands.newError(
- requestId, ServerError.MetadataError,
- "Failed to get batch size for entry " + e.getMessage()));
- }
- } else {
- int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
+ int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
- topic.getName(), subscriptionName, lastPosition, partitionIndex);
- }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
+ topic.getName(), subscriptionName, lastPosition, partitionIndex);
+ }
- writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
- lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
- markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
- markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
- }
+ writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
+ lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
+ markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+ markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+ }
+ });
});
}
-
- private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTopic, long requestId,
- int partitionIndex, PositionImpl markDeletePosition) {
- persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> {
+ private void handleLastMessageIdFromCompactionService(PersistentTopic persistentTopic, long requestId,
+ int partitionIndex, PositionImpl markDeletePosition) {
+ persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> {
if (entry != null) {
try {
// in this case, all the data has been compacted, so return the last position
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index d9d0f6a..6de113d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -50,9 +50,11 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.compaction.CompactedTopicUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -347,8 +349,9 @@
}
havePendingRead = true;
if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
- this, consumer);
+ boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
+ CompactedTopicUtils.readCompactedEntries(topic.getTopicCompactionService(), cursor, messagesToRead,
+ readFromEarliest, this, consumer);
} else {
ReadEntriesCtx readEntriesCtx =
ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 09a9961..1b8b47d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -772,16 +772,26 @@
log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset",
topicName, subName);
- try {
- boolean forceReset = false;
- if (topic.getCompactedTopic() != null && topic.getCompactedTopic().getCompactionHorizon().isPresent()) {
- PositionImpl horizon = (PositionImpl) topic.getCompactedTopic().getCompactionHorizon().get();
+ CompletableFuture<Boolean> forceReset = new CompletableFuture<>();
+ if (topic.getTopicCompactionService() == null) {
+ forceReset.complete(false);
+ } else {
+ topic.getTopicCompactionService().getLastCompactedPosition().thenAccept(lastCompactedPosition -> {
PositionImpl resetTo = (PositionImpl) finalPosition;
- if (horizon.compareTo(resetTo) >= 0) {
- forceReset = true;
+ if (lastCompactedPosition != null && resetTo.compareTo(lastCompactedPosition.getLedgerId(),
+ lastCompactedPosition.getEntryId()) <= 0) {
+ forceReset.complete(true);
+ } else {
+ forceReset.complete(false);
}
- }
- cursor.asyncResetCursor(finalPosition, forceReset, new AsyncCallbacks.ResetCursorCallback() {
+ }).exceptionally(ex -> {
+ forceReset.completeExceptionally(ex);
+ return null;
+ });
+ }
+
+ forceReset.thenAccept(forceResetValue -> {
+ cursor.asyncResetCursor(finalPosition, forceResetValue, new AsyncCallbacks.ResetCursorCallback() {
@Override
public void resetComplete(Object ctx) {
if (log.isDebugEnabled()) {
@@ -811,11 +821,12 @@
}
}
});
- } catch (Exception e) {
+ }).exceptionally((e) -> {
log.error("[{}][{}] Error while resetting cursor", topicName, subName, e);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.completeExceptionally(new BrokerServiceException(e));
- }
+ return null;
+ });
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 12691d1..1e055ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -167,11 +167,12 @@
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.compaction.PulsarTopicCompactionService;
+import org.apache.pulsar.compaction.TopicCompactionService;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
@@ -210,9 +211,9 @@
protected final MessageDeduplication messageDeduplication;
- private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
+ private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
- private final CompactedTopic compactedTopic;
+ private TopicCompactionService topicCompactionService;
// TODO: Create compaction strategy from topic policy when exposing strategic compaction to users.
private static Map<String, TopicCompactionStrategy> strategicCompactionMap = Map.of(
@@ -296,29 +297,11 @@
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
registerTopicPolicyListener();
- this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
-
- for (ManagedCursor cursor : ledger.getCursors()) {
- if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)
- || cursor.getName().startsWith(replicatorPrefix)) {
- // This is not a regular subscription, we are going to
- // ignore it for now and let the message dedup logic to take care of it
- } else {
- final String subscriptionName = Codec.decode(cursor.getName());
- subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
- PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
- cursor.getCursorProperties()));
- // subscription-cursor gets activated by default: deactivate as there is no active subscription right
- // now
- subscriptions.get(subscriptionName).deactivateCursor();
- }
- }
this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);
if (ledger.getProperties().containsKey(TOPIC_EPOCH_PROPERTY_NAME)) {
topicEpoch = Optional.of(Long.parseLong(ledger.getProperties().get(TOPIC_EPOCH_PROPERTY_NAME)));
}
- checkReplicatedSubscriptionControllerState();
TopicName topicName = TopicName.get(topic);
if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(topicName)) {
@@ -338,6 +321,11 @@
@Override
public CompletableFuture<Void> initialize() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
+ futures.add(brokerService.getPulsar().newTopicCompactionService(topic).thenAccept(service -> {
+ PersistentTopic.this.topicCompactionService = service;
+ this.createPersistentSubscriptions();
+ }));
+
for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
@@ -406,7 +394,6 @@
.expectedItems(16)
.concurrencyLevel(1)
.build();
- this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
@@ -434,6 +421,25 @@
return pendingWriteOps;
}
+ private void createPersistentSubscriptions() {
+ for (ManagedCursor cursor : ledger.getCursors()) {
+ if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)
+ || cursor.getName().startsWith(replicatorPrefix)) {
+ // This is not a regular subscription, we are going to
+ // ignore it for now and let the message dedup logic to take care of it
+ } else {
+ final String subscriptionName = Codec.decode(cursor.getName());
+ subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
+ PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
+ cursor.getCursorProperties()));
+ // subscription-cursor gets activated by default: deactivate as there is no active subscription
+ // right now
+ subscriptions.get(subscriptionName).deactivateCursor();
+ }
+ }
+ checkReplicatedSubscriptionControllerState();
+ }
+
/**
* Unload a subscriber.
* @throws SubscriptionNotFoundException If subscription not founded.
@@ -481,8 +487,10 @@
private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
- Objects.requireNonNull(compactedTopic);
- if (isCompactionSubscription(subscriptionName)) {
+ Objects.requireNonNull(topicCompactionService);
+ if (isCompactionSubscription(subscriptionName)
+ && topicCompactionService instanceof PulsarTopicCompactionService pulsarTopicCompactionService) {
+ CompactedTopicImpl compactedTopic = pulsarTopicCompactionService.getCompactedTopic();
return new PulsarCompactorSubscription(this, compactedTopic, subscriptionName, cursor);
} else {
return new PersistentSubscription(this, subscriptionName, cursor, replicated, subscriptionProperties);
@@ -1477,6 +1485,14 @@
});
}
+ if (topicCompactionService != null) {
+ try {
+ topicCompactionService.close();
+ } catch (Exception e) {
+ log.warn("Error close topicCompactionService ", e);
+ }
+ }
+
CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect
? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(futures);
@@ -2517,7 +2533,9 @@
public Optional<CompactedTopicContext> getCompactedTopicContext() {
try {
- return ((CompactedTopicImpl) compactedTopic).getCompactedTopicContext();
+ if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) {
+ return pulsarCompactedService.getCompactedTopic().getCompactedTopicContext();
+ }
} catch (ExecutionException | InterruptedException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
}
@@ -3167,7 +3185,7 @@
currentCompaction = brokerService.pulsar().getStrategicCompactor()
.compact(topic, strategicCompactionMap.get(topic));
} else {
- currentCompaction = brokerService.pulsar().getCompactor().compact(topic);
+ currentCompaction = topicCompactionService.compact().thenApply(x -> null);
}
currentCompaction.whenComplete((ignore, ex) -> {
if (ex != null){
@@ -3188,7 +3206,7 @@
return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
} else {
try {
- if (current.join() == COMPACTION_NEVER_RUN) {
+ if (Objects.equals(current.join(), COMPACTION_NEVER_RUN)) {
return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
} else {
return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
@@ -3311,8 +3329,8 @@
return replicatedSubscriptionsController;
}
- public CompactedTopic getCompactedTopic() {
- return compactedTopic;
+ public TopicCompactionService getTopicCompactionService() {
+ return this.topicCompactionService;
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index e1a10b3..99e2f8a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -29,6 +29,14 @@
public interface CompactedTopic {
CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId);
CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId);
+
+ /**
+ * Read entries from compacted topic.
+ *
+ * @deprecated Use {@link CompactedTopicUtils#readCompactedEntries(TopicCompactionService, ManagedCursor,
+ * int, boolean, ReadEntriesCallback, Consumer)} instead.
+ */
+ @Deprecated
void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
boolean isFirstRead,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 1f11733..fd20c31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -87,6 +87,7 @@
}
@Override
+ @Deprecated
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
boolean isFirstRead,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
new file mode 100644
index 0000000..4cd21cb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import com.google.common.annotations.Beta;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nullable;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
+import org.apache.pulsar.common.util.FutureUtil;
+
+public class CompactedTopicUtils {
+
+ @Beta
+ public static void readCompactedEntries(TopicCompactionService topicCompactionService, ManagedCursor cursor,
+ int numberOfEntriesToRead, boolean readFromEarliest,
+ AsyncCallbacks.ReadEntriesCallback callback, @Nullable Consumer consumer) {
+ Objects.requireNonNull(topicCompactionService);
+ Objects.requireNonNull(cursor);
+ checkArgument(numberOfEntriesToRead > 0);
+ Objects.requireNonNull(callback);
+
+ final PositionImpl readPosition;
+ if (readFromEarliest) {
+ readPosition = PositionImpl.EARLIEST;
+ } else {
+ readPosition = (PositionImpl) cursor.getReadPosition();
+ }
+
+ // TODO: redeliver epoch link https://github.com/apache/pulsar/issues/13690
+ PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx readEntriesCtx =
+ PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH);
+
+ CompletableFuture<Position> lastCompactedPositionFuture = topicCompactionService.getLastCompactedPosition();
+
+ lastCompactedPositionFuture.thenCompose(lastCompactedPosition -> {
+ if (lastCompactedPosition == null
+ || readPosition.compareTo(
+ lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) {
+ cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return topicCompactionService.readCompactedEntries(readPosition, numberOfEntriesToRead)
+ .thenAccept(entries -> {
+ if (CollectionUtils.isEmpty(entries)) {
+ Position seekToPosition = lastCompactedPosition.getNext();
+ if (readPosition.compareTo(seekToPosition.getLedgerId(), seekToPosition.getEntryId()) > 0) {
+ seekToPosition = readPosition;
+ }
+ cursor.seek(seekToPosition);
+ callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
+ }
+
+ Entry lastEntry = entries.get(entries.size() - 1);
+ cursor.seek(lastEntry.getPosition().getNext(), true);
+ callback.readEntriesComplete(entries, readEntriesCtx);
+ });
+ }).exceptionally((exception) -> {
+ exception = FutureUtil.unwrapCompletionException(exception);
+ ManagedLedgerException managedLedgerException;
+ if (exception instanceof ManagedLedgerException) {
+ managedLedgerException = (ManagedLedgerException) exception;
+ } else {
+ managedLedgerException = new ManagedLedgerException(exception);
+ }
+ callback.readEntriesFailed(managedLedgerException, readEntriesCtx);
+ return null;
+ });
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
index de1abfb..7bb3037 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
@@ -18,14 +18,14 @@
*/
package org.apache.pulsar.compaction;
-import com.google.common.annotations.Beta;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
-@Beta
@InterfaceAudience.Public
+@InterfaceStability.Evolving
public interface CompactionServiceFactory extends AutoCloseable {
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
index dd817ca..424733a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
@@ -24,11 +24,14 @@
import java.util.concurrent.CompletionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
public class PulsarCompactionServiceFactory implements CompactionServiceFactory {
+ @Getter(AccessLevel.PROTECTED)
private PulsarService pulsarService;
private volatile Compactor compactor;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
index 0a8bf9d..dd218c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
@@ -22,6 +22,7 @@
import static org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
import static org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
@@ -108,4 +109,9 @@
public CompactedTopicImpl getCompactedTopic() {
return compactedTopic;
}
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
index 6b64b9c..74df0da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
@@ -18,17 +18,17 @@
*/
package org.apache.pulsar.compaction;
-import com.google.common.annotations.Beta;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
-@Beta
@InterfaceAudience.Public
-public interface TopicCompactionService {
+@InterfaceStability.Evolving
+public interface TopicCompactionService extends AutoCloseable {
/**
* Compact the topic.
* Topic Compaction is a key-based retention mechanism. It keeps the most recent value for a given key and
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 821dd9c..7d3b586 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -22,12 +22,14 @@
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -59,7 +61,6 @@
public class TwoPhaseCompactor extends Compactor {
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
- protected static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
private final Duration phaseOneLoopReadTimeout;
public TwoPhaseCompactor(ServiceConfiguration conf,
@@ -128,8 +129,7 @@
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
try {
- for (ImmutableTriple<MessageId, String, Integer> e : RawBatchConverter
- .extractIdsAndKeysAndSize(m)) {
+ for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
if (e != null) {
if (e.getRight() > 0) {
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
@@ -238,7 +238,7 @@
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
try {
- messageToAdd = RawBatchConverter.rebatchMessage(
+ messageToAdd = rebatchMessage(
m, (key, subid) -> subid.equals(latestForKey.get(key)));
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
@@ -386,7 +386,7 @@
return bkf;
}
- private static Pair<String, Integer> extractKeyAndSize(RawMessage m) {
+ protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
ByteBuf headersAndPayload = m.getHeadersAndPayload();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
if (msgMetadata.hasPartitionKey()) {
@@ -400,6 +400,16 @@
}
}
+ protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
+ throws IOException {
+ return RawBatchConverter.extractIdsAndKeysAndSize(msg);
+ }
+
+ protected Optional<RawMessage> rebatchMessage(RawMessage msg, BiPredicate<String, MessageId> filter)
+ throws IOException {
+ return RawBatchConverter.rebatchMessage(msg, filter);
+ }
+
private static class PhaseOneResult {
final MessageId from;
final MessageId to; // last undeleted messageId
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7b04717..342a409 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -140,6 +140,7 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -3123,7 +3124,7 @@
// mock actual compaction, we don't need to really run it
CompletableFuture<Long> promise = new CompletableFuture<Long>();
- Compactor compactor = pulsar.getCompactor();
+ Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
doReturn(promise).when(compactor).compact(topicName);
admin.topics().triggerCompaction(topicName);
@@ -3159,7 +3160,7 @@
// mock actual compaction, we don't need to really run it
CompletableFuture<Long> promise = new CompletableFuture<>();
- Compactor compactor = pulsar.getCompactor();
+ Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
doReturn(promise).when(compactor).compact(topicName + "-partition-0");
CompletableFuture<Long> promise1 = new CompletableFuture<>();
@@ -3203,7 +3204,7 @@
// mock actual compaction, we don't need to really run it
CompletableFuture<Long> promise = new CompletableFuture<Long>();
- Compactor compactor = pulsar.getCompactor();
+ Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
doReturn(promise).when(compactor).compact(topicName);
admin.topics().triggerCompaction(topicName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index e720c9b..ab83c8f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -111,6 +111,7 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -2075,7 +2076,7 @@
// mock actual compaction, we don't need to really run it
CompletableFuture<Long> promise = new CompletableFuture<>();
- Compactor compactor = pulsar.getCompactor();
+ Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
doReturn(promise).when(compactor).compact(topicName);
admin.topics().triggerCompaction(topicName);
@@ -2112,7 +2113,7 @@
// mock actual compaction, we don't need to really run it
CompletableFuture<Long> promise = new CompletableFuture<>();
- Compactor compactor = pulsar.getCompactor();
+ Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
doReturn(promise).when(compactor).compact(topicName);
admin.topics().triggerCompaction(topicName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 6ce6de1..cbbb880 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -119,6 +119,7 @@
public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
// create topic
final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+ topic.initialize().join();
CommandSubscribe cmd = new CommandSubscribe()
.setConsumerId(1)
.setTopic(successTopicName)
@@ -177,6 +178,7 @@
public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception {
// create topic
final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+ topic.initialize().join();
CommandSubscribe cmd = new CommandSubscribe()
.setConsumerId(1)
.setTopic(successTopicName)
@@ -241,6 +243,7 @@
public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception {
// create topic
final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+ topic.initialize().join();
CommandSubscribe cmd = new CommandSubscribe()
.setConsumerId(1)
.setTopic(successTopicName)
@@ -299,6 +302,7 @@
public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception {
// create topic
final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+ topic.initialize().join();
CommandSubscribe cmd = new CommandSubscribe()
.setConsumerId(1)
.setTopic(successTopicName)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index c49df3e..078208f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -93,13 +93,13 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
@@ -132,6 +132,7 @@
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
@@ -630,6 +631,7 @@
@Test
public void testSubscribeFail() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
+ topic.initialize().join();
// Empty subscription name
CommandSubscribe cmd = new CommandSubscribe()
@@ -666,6 +668,7 @@
@Test
public void testSubscribeUnsubscribe() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
+ topic.initialize().join();
CommandSubscribe cmd = new CommandSubscribe()
.setConsumerId(1)
@@ -1238,6 +1241,7 @@
public void testDeleteAndUnsubscribeTopic() throws Exception {
// create topic
final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+ topic.initialize().join();
CommandSubscribe cmd = new CommandSubscribe()
.setConsumerId(1)
.setTopic(successTopicName)
@@ -1347,6 +1351,7 @@
@Test
public void testDeleteTopicRaceConditions() throws Exception {
PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+ topic.initialize().join();
// override ledger deletion callback to slow down deletion
doAnswer(invocationOnMock -> {
@@ -1537,6 +1542,7 @@
.setSubType(SubType.Failover);
// 1. Subscribe with non partition topic
+ topic1.initialize().join();
Future<Consumer> f1 = topic1.subscribe(getSubscriptionOption(cmd1));
f1.get();
@@ -1552,6 +1558,7 @@
.setRequestId(1)
.setSubType(SubType.Failover);
+ topic2.initialize();
Future<Consumer> f2 = topic2.subscribe(getSubscriptionOption(cmd2));
f2.get();
@@ -1826,7 +1833,8 @@
@Test
public void testCompactionTriggeredAfterThresholdFirstInvocation() throws Exception {
CompletableFuture<Long> compactPromise = new CompletableFuture<>();
- Compactor compactor = pulsarTestContext.getPulsarService().getCompactor();
+ Compactor compactor = ((PulsarCompactionServiceFactory) pulsarTestContext.getPulsarService()
+ .getCompactionServiceFactory()).getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
Policies policies = new Policies();
@@ -1857,7 +1865,8 @@
@Test
public void testCompactionTriggeredAfterThresholdSecondInvocation() throws Exception {
CompletableFuture<Long> compactPromise = new CompletableFuture<>();
- Compactor compactor = pulsarTestContext.getPulsarService().getCompactor();
+ Compactor compactor = ((PulsarCompactionServiceFactory) pulsarTestContext.getPulsarService()
+ .getCompactionServiceFactory()).getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
ManagedCursor subCursor = mock(ManagedCursor.class);
@@ -1891,7 +1900,8 @@
@Test
public void testCompactionDisabledWithZeroThreshold() throws Exception {
CompletableFuture<Long> compactPromise = new CompletableFuture<>();
- Compactor compactor = pulsarTestContext.getPulsarService().getCompactor();
+ Compactor compactor = ((PulsarCompactionServiceFactory) pulsarTestContext.getPulsarService()
+ .getCompactionServiceFactory()).getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
Policies policies = new Policies();
@@ -2161,6 +2171,7 @@
return null;
}).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), any());
PersistentTopic topic = new PersistentTopic(successTopicName, mockLedger, brokerService);
+ topic.initialize().join();
CommandSubscribe cmd = new CommandSubscribe()
.setConsumerId(1)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 19583a4..a665681 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -51,6 +51,7 @@
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.testng.annotations.Test;
@Slf4j
@@ -235,6 +236,7 @@
doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
doReturn(mock(PulsarResources.class)).when(pulsarService).getPulsarResources();
+ doReturn(mock(CompactionServiceFactory.class)).when(pulsarService).getCompactionServiceFactory();
ManagedLedger managedLedger = mock(ManagedLedger.class);
MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index c4e4107..31d4683 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -81,6 +81,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.zookeeper.CreateMode;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
@@ -1739,7 +1740,7 @@
.value(data)
.send();
}
- Compactor compactor = pulsar.getCompactor();
+ Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
compactor.compact(topicName).get();
statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
index 517d57d..fcea99c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -26,7 +26,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
-import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -38,11 +38,11 @@
*/
abstract class AbstractTestPulsarService extends PulsarService {
protected final SpyConfig spyConfig;
- private boolean compactorExists;
public AbstractTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config,
MetadataStoreExtended localMetadataStore,
- MetadataStoreExtended configurationMetadataStore, Compactor compactor,
+ MetadataStoreExtended configurationMetadataStore,
+ CompactionServiceFactory compactionServiceFactory,
BrokerInterceptor brokerInterceptor,
BookKeeperClientFactory bookKeeperClientFactory) {
super(config);
@@ -51,7 +51,7 @@
NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, MetadataStoreExtended.class));
setConfigurationMetadataStore(
NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore, MetadataStoreExtended.class));
- setCompactor(compactor);
+ super.setCompactionServiceFactory(compactionServiceFactory);
setBrokerInterceptor(brokerInterceptor);
setBkClientFactory(bookKeeperClientFactory);
}
@@ -77,23 +77,6 @@
}
@Override
- protected void setCompactor(Compactor compactor) {
- if (compactor != null) {
- compactorExists = true;
- }
- super.setCompactor(compactor);
- }
-
- @Override
- public Compactor newCompactor() throws PulsarServerException {
- if (compactorExists) {
- return getCompactor();
- } else {
- return spyConfig.getCompactor().spy(super.newCompactor());
- }
- }
-
- @Override
public BookKeeperClientFactory newBookKeeperClientFactory() {
return getBkClientFactory();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockPulsarCompactionServiceFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockPulsarCompactionServiceFactory.java
new file mode 100644
index 0000000..77959fe
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockPulsarCompactionServiceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
+
+public class MockPulsarCompactionServiceFactory extends PulsarCompactionServiceFactory {
+ private final Compactor compactor;
+ private final SpyConfig spyConfig;
+
+ public MockPulsarCompactionServiceFactory(SpyConfig spyConfig, Compactor compactor) {
+ this.compactor = compactor;
+ this.spyConfig = spyConfig;
+ }
+
+ @Override
+ protected Compactor newCompactor() throws PulsarServerException {
+ if (this.compactor != null) {
+ return this.compactor;
+ } else {
+ return spyConfig.getCompactor().spy(super.newCompactor());
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index af365ed..2896f33 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -45,7 +45,7 @@
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -60,13 +60,14 @@
public NonStartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config,
MetadataStoreExtended localMetadataStore,
MetadataStoreExtended configurationMetadataStore,
- Compactor compactor, BrokerInterceptor brokerInterceptor,
+ CompactionServiceFactory compactionServiceFactory,
+ BrokerInterceptor brokerInterceptor,
BookKeeperClientFactory bookKeeperClientFactory,
PulsarResources pulsarResources,
ManagedLedgerStorage managedLedgerClientFactory,
Function<BrokerService, BrokerService> brokerServiceCustomizer) {
- super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactor, brokerInterceptor,
- bookKeeperClientFactory);
+ super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
+ brokerInterceptor, bookKeeperClientFactory);
setPulsarResources(pulsarResources);
setManagedLedgerClientFactory(managedLedgerClientFactory);
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index f490ebd..40a4228 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -54,7 +54,9 @@
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -135,6 +137,8 @@
private final Compactor compactor;
+ private final CompactionServiceFactory compactionServiceFactory;
+
private final BrokerService brokerService;
@Getter(AccessLevel.NONE)
@@ -659,10 +663,19 @@
protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
BookKeeperClientFactory bookKeeperClientFactory =
new MockBookKeeperClientFactory(builder.bookKeeperClient);
+ CompactionServiceFactory compactionServiceFactory = builder.compactionServiceFactory;
+ if (builder.compactionServiceFactory == null && builder.config.getCompactionServiceFactoryClassName()
+ .equals(PulsarCompactionServiceFactory.class.getName())) {
+ compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
+ }
PulsarService pulsarService = spyConfig.getPulsarService()
.spy(StartableTestPulsarService.class, spyConfig, builder.config, builder.localMetadataStore,
- builder.configurationMetadataStore, builder.compactor, builder.brokerInterceptor,
+ builder.configurationMetadataStore, compactionServiceFactory,
+ builder.brokerInterceptor,
bookKeeperClientFactory, builder.brokerServiceCustomizer);
+ if (compactionServiceFactory != null) {
+ compactionServiceFactory.initialize(pulsarService);
+ }
registerCloseable(() -> {
pulsarService.close();
resetSpyOrMock(pulsarService);
@@ -717,11 +730,20 @@
}
BookKeeperClientFactory bookKeeperClientFactory =
new MockBookKeeperClientFactory(builder.bookKeeperClient);
+ CompactionServiceFactory compactionServiceFactory = builder.compactionServiceFactory;
+ if (builder.compactionServiceFactory == null && builder.config.getCompactionServiceFactoryClassName()
+ .equals(PulsarCompactionServiceFactory.class.getName())) {
+ compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
+ }
PulsarService pulsarService = spyConfig.getPulsarService()
.spy(NonStartableTestPulsarService.class, spyConfig, builder.config, builder.localMetadataStore,
- builder.configurationMetadataStore, builder.compactor, builder.brokerInterceptor,
+ builder.configurationMetadataStore, compactionServiceFactory,
+ builder.brokerInterceptor,
bookKeeperClientFactory, builder.pulsarResources,
builder.managedLedgerClientFactory, builder.brokerServiceCustomizer);
+ if (compactionServiceFactory != null) {
+ compactionServiceFactory.initialize(pulsarService);
+ }
registerCloseable(() -> {
pulsarService.close();
resetSpyOrMock(pulsarService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
index de51cee..8c42998 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
@@ -98,9 +98,15 @@
*/
private final SpyType bookKeeperClient;
/**
- * Spy configuration for {@link PulsarService#getCompactor()}.
+ * Spy configuration for {@link PulsarService#getCompactionServiceFactory#getCompactor()}.
*/
private final SpyType compactor;
+
+ /**
+ * Spy configuration for {@link PulsarService#getCompactionServiceFactory()}.
+ */
+
+ private final SpyType compactedServiceFactory;
/**
* Spy configuration for {@link PulsarService#getNamespaceService()}.
*/
@@ -128,6 +134,7 @@
spyConfigBuilder.brokerService(defaultSpyType);
spyConfigBuilder.bookKeeperClient(defaultSpyType);
spyConfigBuilder.compactor(defaultSpyType);
+ spyConfigBuilder.compactedServiceFactory(defaultSpyType);
spyConfigBuilder.namespaceService(defaultSpyType);
return spyConfigBuilder;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
index 8a485a0..a5964c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
@@ -28,7 +28,7 @@
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
/**
@@ -41,12 +41,12 @@
public StartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config,
MetadataStoreExtended localMetadataStore,
MetadataStoreExtended configurationMetadataStore,
- Compactor compactor,
+ CompactionServiceFactory compactionServiceFactory,
BrokerInterceptor brokerInterceptor,
BookKeeperClientFactory bookKeeperClientFactory,
Function<BrokerService, BrokerService> brokerServiceCustomizer) {
- super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactor, brokerInterceptor,
- bookKeeperClientFactory);
+ super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
+ brokerInterceptor, bookKeeperClientFactory);
this.brokerServiceCustomizer = brokerServiceCustomizer;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c4ec2ec..4e50401 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -21,8 +21,8 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME;
import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
-import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
+import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
@@ -42,12 +42,12 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
-import java.util.ArrayList;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Map;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -99,8 +99,8 @@
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
-import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
@@ -124,9 +124,9 @@
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessagesImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
@@ -138,14 +138,16 @@
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.awaitility.Awaitility;
import org.mockito.invocation.InvocationOnMock;
@@ -1571,6 +1573,9 @@
when(pulsar.getTransactionBufferSnapshotServiceFactory()).thenReturn(transactionBufferSnapshotServiceFactory);
TopicTransactionBufferProvider topicTransactionBufferProvider = new TopicTransactionBufferProvider();
when(pulsar.getTransactionBufferProvider()).thenReturn(topicTransactionBufferProvider);
+ CompactionServiceFactory compactionServiceFactory = new PulsarCompactionServiceFactory();
+ compactionServiceFactory.initialize(pulsar);
+ when(pulsar.getCompactionServiceFactory()).thenReturn(compactionServiceFactory);
// Mock BacklogQuotaManager
BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class);
// Mock brokerService.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 957671b..96adc67 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -21,10 +21,8 @@
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
@@ -36,9 +34,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
-
import lombok.Cleanup;
-
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -47,7 +43,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -665,7 +660,7 @@
producer.newMessage().key("k").value(("value").getBytes()).send();
producer.newMessage().key("k").value(null).send();
- pulsar.getCompactor().compact(topic).get();
+ ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor().compact(topic).get();
Awaitility.await()
.pollInterval(3, TimeUnit.SECONDS)