Move duplicate code to abstract parent class (#10061)
* Move duplicate code to abstract parent class
* code style
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 02305e4..d57fc7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -19,20 +19,26 @@
package org.apache.pulsar.broker.service;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
@@ -148,6 +154,39 @@
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
}
+ /**
+ * Determine whether the number of consumers on the subscription reaches the threshold.
+ * @return
+ */
+ protected abstract boolean isConsumersExceededOnSubscription();
+
+ protected boolean isConsumersExceededOnSubscription(BrokerService brokerService,
+ String topic, int consumerSize) {
+ Policies policies = null;
+ Integer maxConsumersPerSubscription = null;
+ try {
+ maxConsumersPerSubscription = Optional.ofNullable(brokerService
+ .getTopicPolicies(TopicName.get(topic)))
+ .map(TopicPolicies::getMaxConsumersPerSubscription)
+ .orElse(null);
+ if (maxConsumersPerSubscription == null) {
+ // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
+ policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
+ }
+ } catch (Exception e) {
+ log.debug("Get topic or namespace policies fail", e);
+ }
+
+ if (maxConsumersPerSubscription == null) {
+ maxConsumersPerSubscription = policies != null && policies.max_consumers_per_subscription > 0
+ ? policies.max_consumers_per_subscription :
+ brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
+ }
+
+ return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
+ }
+
private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
// Remove the protobuf headers
Commands.skipMessageMetadata(headersAndPayload);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 9eb5a2e..b142c51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -70,8 +70,6 @@
protected abstract void cancelPendingRead();
- protected abstract boolean isConsumersExceededOnSubscription();
-
protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
if (null != activeConsumer && subscriptionType == SubType.Failover) {
consumers.forEach(consumer ->
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 00584cf..b6f4ca7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -86,7 +86,8 @@
consumerSet.add(consumer);
}
- private boolean isConsumersExceededOnSubscription() {
+ @Override
+ protected boolean isConsumersExceededOnSubscription() {
final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
return true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index cecdbaf..a720767 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -18,13 +18,10 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import java.util.List;
-import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
@@ -33,9 +30,6 @@
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
@@ -79,37 +73,9 @@
}
}
+ @Override
protected boolean isConsumersExceededOnSubscription() {
- Policies policies = null;
- Integer maxConsumersPerSubscription = null;
- try {
- maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
- .getTopicPolicies(TopicName.get(topicName)))
- .map(TopicPolicies::getMaxConsumersPerSubscription)
- .orElse(null);
- if (maxConsumersPerSubscription == null) {
- // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
- policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-
- if (policies == null) {
- policies = new Policies();
- }
- }
- } catch (Exception e) {
- policies = new Policies();
- }
-
- if (maxConsumersPerSubscription == null) {
- maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
- ? policies.max_consumers_per_subscription :
- serviceConfig.getMaxConsumersPerSubscription();
- }
-
- if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
- return true;
- }
- return false;
+ return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index eec0f93..2c05644 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
@@ -40,7 +39,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -59,10 +57,8 @@
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
import org.apache.pulsar.common.util.collections.LongPairSet;
@@ -158,37 +154,9 @@
consumerSet.add(consumer);
}
- private boolean isConsumersExceededOnSubscription() {
- Policies policies = null;
- Integer maxConsumersPerSubscription = null;
- try {
- maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
- .getTopicPolicies(TopicName.get(topic.getName())))
- .map(TopicPolicies::getMaxConsumersPerSubscription)
- .orElse(null);
-
- if (maxConsumersPerSubscription == null) {
- // Use getDataIfPresent from zk cache to make the call non-blocking and
- // prevent deadlocks in addConsumer
- policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
- if (policies == null) {
- policies = new Policies();
- }
- }
- } catch (Exception e) {
- policies = new Policies();
- }
-
- if (maxConsumersPerSubscription == null) {
- maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
- ? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription();
- }
-
- if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
- return true;
- }
- return false;
+ @Override
+ protected boolean isConsumersExceededOnSubscription() {
+ return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 6e28a3c..93c2d25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import java.util.Iterator;
import java.util.List;
@@ -36,7 +35,6 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
@@ -50,10 +48,8 @@
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,36 +128,9 @@
}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
}
+ @Override
protected boolean isConsumersExceededOnSubscription() {
- Policies policies = null;
- Integer maxConsumersPerSubscription = null;
- try {
- maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
- .getTopicPolicies(TopicName.get(topicName)))
- .map(TopicPolicies::getMaxConsumersPerSubscription)
- .orElse(null);
- if (maxConsumersPerSubscription == null) {
- // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
- policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-
- if (policies == null) {
- policies = new Policies();
- }
- }
- } catch (Exception e) {
- policies = new Policies();
- }
-
- if (maxConsumersPerSubscription == null) {
- maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
- ? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription();
- }
-
- if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
- return true;
- }
- return false;
+ return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
}
@Override