Revert "[fix][broker] Fix entry filter feature for the non-persistent topic (#20141)"
This reverts commit e27abe9e128fb71b65ffe06417574c9a7f3facbd.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 33258b0..317b8df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -26,7 +26,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -199,8 +199,7 @@
// entry internally retains data so, duplicateBuffer should be release here
duplicateBuffer.release();
if (subscription.getDispatcher() != null) {
- // Dispatcher needs to call the set method to support entry filter feature.
- subscription.getDispatcher().sendMessages(Arrays.asList(entry));
+ subscription.getDispatcher().sendMessages(Collections.singletonList(entry));
} else {
// it happens when subscription is created but dispatcher is not created as consumer is not added
// yet
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index b868858..4b9d91f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -51,7 +51,6 @@
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
-import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -287,16 +286,10 @@
}
- @DataProvider(name = "topicProvider")
- public Object[][] topicProvider() {
- return new Object[][]{
- {"persistent://prop/ns-abc/topic" + UUID.randomUUID()},
- {"non-persistent://prop/ns-abc/topic" + UUID.randomUUID()},
- };
- }
- @Test(dataProvider = "topicProvider")
- public void testFilteredMsgCount(String topic) throws Throwable {
+ @Test
+ public void testFilteredMsgCount() throws Throwable {
+ String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
String subName = "sub";
try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -305,7 +298,7 @@
.subscriptionName(subName).subscribe()) {
// mock entry filters
- Subscription subscription = pulsar.getBrokerService()
+ PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Dispatcher dispatcher = subscription.getDispatcher();
Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index d5e0066..bf9c1d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -211,22 +211,21 @@
hasFilterField.set(dispatcher, true);
}
- int rejectedCount = 100;
- int acceptCount = 100;
- int scheduleCount = 100;
- for (int i = 0; i < rejectedCount; i++) {
- producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
- }
- for (int i = 0; i < acceptCount; i++) {
+ for (int i = 0; i < 100; i++) {
producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
}
- for (int i = 0; i < scheduleCount; i++) {
+ for (int i = 0; i < 100; i++) {
+ producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
+ }
+ for (int i = 0; i < 100; i++) {
producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send();
}
- for (int i = 0; i < acceptCount; i++) {
- Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
- Assert.assertNotNull(message);
+ for (;;) {
+ Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
consumer.acknowledge(message);
}
@@ -264,12 +263,12 @@
.mapToDouble(m-> m.value).sum();
if (setFilter) {
- Assert.assertEquals(filterAccepted, acceptCount);
- Assert.assertEquals(filterRejected, rejectedCount);
- // Only works on the test, if there are some markers,
- // the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
- Assert.assertEquals(throughFilter,
- filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+ Assert.assertEquals(filterAccepted, 100);
+ if (isPersistent) {
+ Assert.assertEquals(filterRejected, 100);
+ // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
+ Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+ }
} else {
Assert.assertEquals(throughFilter, 0D);
Assert.assertEquals(filterAccepted, 0D);
@@ -283,20 +282,19 @@
Assert.assertEquals(rescheduledMetrics.size(), 0);
}
- testSubscriptionStatsAdminApi(topic, subName, setFilter, acceptCount, rejectedCount);
+ testSubscriptionStatsAdminApi(topic, subName, setFilter);
}
- private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter,
- int acceptCount, int rejectedCount) throws Exception {
+ private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter) throws Exception {
boolean persistent = TopicName.get(topic).isPersistent();
TopicStats topicStats = admin.topics().getStats(topic);
SubscriptionStats stats = topicStats.getSubscriptions().get(subName);
Assert.assertNotNull(stats);
if (setFilter) {
- Assert.assertEquals(stats.getFilterAcceptedMsgCount(), acceptCount);
+ Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
if (persistent) {
- Assert.assertEquals(stats.getFilterRejectedMsgCount(), rejectedCount);
+ Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
// Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
Assert.assertEquals(stats.getFilterProcessedMsgCount(),
stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()