[fix][broker]fix the publish latency spike issue with large number of producers (#20607)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 1371019..8270690 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -35,6 +35,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -131,7 +132,11 @@
private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes");
- protected volatile long publishRateLimitedTimes = 0;
+ protected volatile long publishRateLimitedTimes = 0L;
+
+ private static final AtomicIntegerFieldUpdater<AbstractTopic> USER_CREATED_PRODUCER_COUNTER_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, "userCreatedProducerCount");
+ private volatile int userCreatedProducerCount = 0;
protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
@@ -447,14 +452,8 @@
return false;
}
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
- if (maxProducers != null && maxProducers > 0 && maxProducers <= getUserCreatedProducersSize()) {
- return true;
- }
- return false;
- }
-
- private long getUserCreatedProducersSize() {
- return producers.values().stream().filter(p -> !p.isRemote()).count();
+ return maxProducers != null && maxProducers > 0
+ && maxProducers <= USER_CREATED_PRODUCER_COUNTER_UPDATER.get(this);
}
protected void registerTopicPolicyListener() {
@@ -988,6 +987,8 @@
Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
tryOverwriteOldProducer(existProducer, producer);
+ } else if (!producer.isRemote()) {
+ USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
}
}
@@ -1020,6 +1021,9 @@
checkArgument(producer.getTopic() == this);
if (producers.remove(producer.getProducerName(), producer)) {
+ if (!producer.isRemote()) {
+ USER_CREATED_PRODUCER_COUNTER_UPDATER.decrementAndGet(this);
+ }
handleProducerRemoved(producer);
}
}