[fix][broker][branch-3.2] Fix broker not starting when both transactions and the Extensible Load Manager are enabled (#22193)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index e5d90bf..3bedf49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -58,6 +58,7 @@
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
@@ -157,7 +158,8 @@
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
- && !isEventSystemTopic(TopicName.get(topicName))) {
+ && !isEventSystemTopic(TopicName.get(topicName))
+ && !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
this.pendingAckHandle = new PendingAckHandleImpl(this);
} else {
this.pendingAckHandle = new PendingAckHandleDisabled();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bb0796b..9c93407 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -322,7 +322,8 @@
TopicName topicName = TopicName.get(topic);
if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(topicName)
- && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+ && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())
+ && !ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
new file mode 100644
index 0000000..d9c6f78
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Shared fixture for {@link ExtensibleLoadManagerImpl} tests: starts two brokers that use the extensible
+ * load manager, replaces both load managers with Mockito spies and creates the namespaces the tests
+ * rely on.
+ */
+public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServiceBaseTest {
+
+    protected PulsarService pulsar1;
+    protected PulsarService pulsar2;
+
+    protected PulsarTestContext additionalPulsarTestContext;
+
+    protected ExtensibleLoadManagerImpl primaryLoadManager;
+
+    protected ExtensibleLoadManagerImpl secondaryLoadManager;
+
+    protected ServiceUnitStateChannelImpl channel1;
+    protected ServiceUnitStateChannelImpl channel2;
+
+    protected final String defaultTestNamespace;
+
+    protected LookupService lookupService;
+
+    /**
+     * @param defaultTestNamespace namespace that {@link #setup()} creates with 128 bundles and that is
+     *                             unloaded before every test method
+     */
+    protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {
+        this.defaultTestNamespace = defaultTestNamespace;
+    }
+
+    /**
+     * Applies the broker settings shared by both test brokers and returns the same configuration object.
+     * Subclasses may override this to enable additional features.
+     */
+    protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
+        conf.setForceDeleteNamespaceAllowed(true);
+        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+        conf.setAllowAutoTopicCreation(true);
+        conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+        conf.setLoadBalancerSheddingEnabled(false);
+        conf.setLoadBalancerDebugModeEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        return conf;
+    }
+
+    /**
+     * Starts the primary and the additional broker, swaps in the load manager spies and creates the
+     * cluster, tenant and namespaces the tests rely on.
+     */
+    @Override
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        initConfig(conf);
+        super.internalSetup(conf);
+        pulsar1 = pulsar;
+        var conf2 = initConfig(getDefaultConf());
+        additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2);
+        pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+        setPrimaryLoadManager();
+        setSecondaryLoadManager();
+
+        admin.clusters().createCluster(this.conf.getClusterName(),
+                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+                        Sets.newHashSet(this.conf.getClusterName())));
+        admin.namespaces().createNamespace("public/default");
+        admin.namespaces().setNamespaceReplicationClusters("public/default",
+                Sets.newHashSet(this.conf.getClusterName()));
+
+        admin.namespaces().createNamespace(defaultTestNamespace, 128);
+        admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
+                Sets.newHashSet(this.conf.getClusterName()));
+        // Remember the client's lookup service so initializeState() can put it back before each test.
+        lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
+    }
+
+    /**
+     * Shuts down both brokers. The additional broker is absent when {@link #setup()} failed part-way,
+     * and the primary broker is still cleaned up if closing the additional one throws.
+     */
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        try {
+            if (additionalPulsarTestContext != null) {
+                additionalPulsarTestContext.close();
+            }
+        } finally {
+            super.internalCleanup();
+        }
+    }
+
+    /**
+     * Unloads the test namespace, resets both load manager spies and restores the client's original
+     * lookup service.
+     */
+    @BeforeMethod(alwaysRun = true)
+    protected void initializeState() throws PulsarAdminException, IllegalAccessException {
+        admin.namespaces().unload(defaultTestNamespace);
+        reset(primaryLoadManager, secondaryLoadManager);
+        FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
+    }
+
+    /** Replaces the load manager of {@code pulsar1} with a spy and captures its channel. */
+    protected void setPrimaryLoadManager() throws IllegalAccessException {
+        primaryLoadManager = installLoadManagerSpy(pulsar1);
+        channel1 = (ServiceUnitStateChannelImpl)
+                FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
+    }
+
+    /** Replaces the load manager of {@code pulsar2} with a spy and captures its channel. */
+    private void setSecondaryLoadManager() throws IllegalAccessException {
+        secondaryLoadManager = installLoadManagerSpy(pulsar2);
+        channel2 = (ServiceUnitStateChannelImpl)
+                FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
+    }
+
+    /** Wraps the broker's {@link ExtensibleLoadManagerImpl} in a Mockito spy and writes it back. */
+    private static ExtensibleLoadManagerImpl installLoadManagerSpy(PulsarService pulsarService)
+            throws IllegalAccessException {
+        ExtensibleLoadManagerWrapper wrapper =
+                (ExtensibleLoadManagerWrapper) pulsarService.getLoadManager().get();
+        ExtensibleLoadManagerImpl spied = spy((ExtensibleLoadManagerImpl)
+                FieldUtils.readField(wrapper, "loadManager", true));
+        FieldUtils.writeField(wrapper, "loadManager", spied, true);
+        return spied;
+    }
+
+    /** Looks up the bundle that {@code topic} maps to on the given broker. */
+    protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
+        return pulsar.getNamespaceService().getBundleAsync(topic);
+    }
+
+    /**
+     * Finds a topic in the test namespace whose bundle differs from the bundle of the namespace's
+     * change-events system topic, trying {@code <prefix>-0}, {@code <prefix>-1}, ... in turn.
+     *
+     * @param topicNamePrefix prefix of the candidate topic names
+     * @return the first matching topic together with its bundle
+     */
+    protected Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
+            throws Exception {
+        TopicName changeEventsTopicName =
+                TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
+        for (int i = 0; ; i++) {
+            TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
+            NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+            if (!bundle.equals(changeEventsBundle)) {
+                return Pair.of(topicName, bundle);
+            }
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index b6073e4..3949aa6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -83,7 +83,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
@@ -109,7 +108,6 @@
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -123,25 +121,18 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
-import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
-import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -151,79 +142,10 @@
@Slf4j
@Test(groups = "broker")
@SuppressWarnings("unchecked")
-public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
+public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBaseTest {
- private PulsarService pulsar1;
- private PulsarService pulsar2;
-
- private PulsarTestContext additionalPulsarTestContext;
-
- private ExtensibleLoadManagerImpl primaryLoadManager;
-
- private ExtensibleLoadManagerImpl secondaryLoadManager;
-
- private ServiceUnitStateChannelImpl channel1;
- private ServiceUnitStateChannelImpl channel2;
-
- private final String defaultTestNamespace = "public/test";
-
- private LookupService lookupService;
-
- private static void initConfig(ServiceConfiguration conf){
- conf.setForceDeleteNamespaceAllowed(true);
- conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
- conf.setAllowAutoTopicCreation(true);
- conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
- conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- conf.setLoadBalancerSheddingEnabled(false);
- conf.setLoadBalancerDebugModeEnabled(true);
- conf.setTopicLevelPoliciesEnabled(true);
- }
-
- @BeforeClass
- @Override
- public void setup() throws Exception {
- // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid
- // stuck when doing unload.
- initConfig(conf);
- super.internalSetup(conf);
- pulsar1 = pulsar;
- ServiceConfiguration defaultConf = getDefaultConf();
- initConfig(defaultConf);
- additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
- pulsar2 = additionalPulsarTestContext.getPulsarService();
-
- setPrimaryLoadManager();
-
- setSecondaryLoadManager();
-
- admin.clusters().createCluster(this.conf.getClusterName(),
- ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
- admin.tenants().createTenant("public",
- new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
- Sets.newHashSet(this.conf.getClusterName())));
- admin.namespaces().createNamespace("public/default");
- admin.namespaces().setNamespaceReplicationClusters("public/default",
- Sets.newHashSet(this.conf.getClusterName()));
-
- admin.namespaces().createNamespace(defaultTestNamespace, 128);
- admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
- Sets.newHashSet(this.conf.getClusterName()));
- lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
- }
-
- @Override
- @AfterClass(alwaysRun = true)
- protected void cleanup() throws Exception {
- this.additionalPulsarTestContext.close();
- super.internalCleanup();
- }
-
- @BeforeMethod(alwaysRun = true)
- protected void initializeState() throws PulsarAdminException, IllegalAccessException {
- admin.namespaces().unload(defaultTestNamespace);
- reset(primaryLoadManager, secondaryLoadManager);
- FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
+ public ExtensibleLoadManagerImplTest() {
+ super("public/test");
}
@Test
@@ -1593,43 +1515,4 @@
}
- private void setPrimaryLoadManager() throws IllegalAccessException {
- ExtensibleLoadManagerWrapper wrapper =
- (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
- primaryLoadManager = spy((ExtensibleLoadManagerImpl)
- FieldUtils.readField(wrapper, "loadManager", true));
- FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true);
- channel1 = (ServiceUnitStateChannelImpl)
- FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
- }
-
- private void setSecondaryLoadManager() throws IllegalAccessException {
- ExtensibleLoadManagerWrapper wrapper =
- (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
- secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
- FieldUtils.readField(wrapper, "loadManager", true));
- FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true);
- channel2 = (ServiceUnitStateChannelImpl)
- FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
- }
-
- private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
- return pulsar.getNamespaceService().getBundleAsync(topic);
- }
-
- private Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
- throws Exception {
- TopicName changeEventsTopicName =
- TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
- NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
- int i = 0;
- while(true) {
- TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
- NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
- if (!bundle.equals(changeEventsBundle)) {
- return Pair.of(topicName, bundle);
- }
- i++;
- }
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
new file mode 100644
index 0000000..0c95dd8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import static org.testng.Assert.assertEquals;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the two-broker {@link ExtensibleLoadManagerImpl} fixture with the transaction coordinator enabled.
+ */
+@Test(groups = "broker")
+public class ExtensibleLoadManagerImplWithTransactionCoordinatorTest extends ExtensibleLoadManagerImplBaseTest {
+
+    public ExtensibleLoadManagerImplWithTransactionCoordinatorTest() {
+        super("public/test-elb-with-tx");
+    }
+
+    /** Adds {@code transactionCoordinatorEnabled=true} on top of the base broker settings. */
+    @Override
+    protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
+        ServiceConfiguration txConf = super.initConfig(conf);
+        txConf.setTransactionCoordinatorEnabled(true);
+        return txConf;
+    }
+
+    /**
+     * Unloads a bundle to the broker that does not currently serve the topic and waits until lookups
+     * resolve to that broker.
+     */
+    @Test(timeOut = 30 * 1000)
+    public void testUnloadAdminAPI() throws Exception {
+        var candidate = getBundleIsNotOwnByChangeEventTopic("test-unload");
+        var topic = candidate.getLeft();
+        var bundle = candidate.getRight();
+
+        var currentOwnerUrl = admin.lookups().lookupTopic(topic.toString());
+        var target = currentOwnerUrl.equals(pulsar1.getBrokerServiceUrl()) ? pulsar2 : pulsar1;
+        var targetBrokerId = target.getBrokerId();
+        var targetServiceUrl = target.getBrokerServiceUrl();
+
+        admin.namespaces().unloadNamespaceBundle(topic.getNamespace(), bundle.getBundleRange(), targetBrokerId);
+        Awaitility.await().untilAsserted(
+                () -> assertEquals(admin.lookups().lookupTopic(topic.toString()), targetServiceUrl));
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
new file mode 100644
index 0000000..618053a
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.testng.ITest;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+/**
+ * Messaging smoke tests that run once per broker configuration produced by {@link #messagingTests()}.
+ */
+public class MessagingSmokeTest extends TopicMessagingBase implements ITest {
+
+    private final String name;
+
+    /**
+     * @param name       display name returned by {@link #getTestName()}
+     * @param brokerEnvs entries copied into the inherited {@code brokerEnvs} map
+     */
+    public MessagingSmokeTest(String name, Map<String, String> brokerEnvs) {
+        this.brokerEnvs.putAll(brokerEnvs);
+        this.name = name;
+    }
+
+    /** Creates one test instance per broker configuration under test. */
+    @Factory
+    public static Object[] messagingTests() {
+        String loadManager = ExtensibleLoadManagerImpl.class.getName();
+        String shedder = TransferShedder.class.getName();
+
+        MessagingSmokeTest elbOnly = new MessagingSmokeTest(
+                "Extensible Load Manager",
+                Map.of("loadManagerClassName", loadManager,
+                        "loadBalancerLoadSheddingStrategy", shedder));
+        MessagingSmokeTest elbWithTx = new MessagingSmokeTest(
+                "Extensible Load Manager with TX Coordinator",
+                Map.of("loadManagerClassName", loadManager,
+                        "loadBalancerLoadSheddingStrategy", shedder,
+                        "transactionCoordinatorEnabled", "true"));
+        return List.of(elbOnly, elbWithTx).toArray();
+    }
+
+    @Override
+    public String getTestName() {
+        return name;
+    }
+
+    /** Tells whether the data provider supplied the persistent topic domain. */
+    private static boolean isPersistent(TopicDomain topicDomain) {
+        return topicDomain == TopicDomain.persistent;
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testNonPartitionedTopicMessagingWithExclusive(Supplier<String> serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), isPersistent(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testPartitionedTopicMessagingWithExclusive(Supplier<String> serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), isPersistent(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testNonPartitionedTopicMessagingWithFailover(Supplier<String> serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), isPersistent(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testPartitionedTopicMessagingWithFailover(Supplier<String> serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), isPersistent(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testNonPartitionedTopicMessagingWithShared(Supplier<String> serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), isPersistent(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testPartitionedTopicMessagingWithShared(Supplier<String> serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), isPersistent(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testNonPartitionedTopicMessagingWithKeyShared(Supplier<String> serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), isPersistent(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testPartitionedTopicMessagingWithKeyShared(Supplier<String> serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), isPersistent(topicDomain));
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 77f21df..79cd55d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -118,7 +118,7 @@
this.brokerContainers = Maps.newTreeMap();
this.workerContainers = Maps.newTreeMap();
- this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, spec.enableTls)
+ this.proxyContainer = new ProxyContainer(clusterName, appendClusterName(ProxyContainer.NAME), spec.enableTls)
.withNetwork(network)
.withNetworkAliases(appendClusterName("pulsar-proxy"))
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
index ae9e44f..93e2221a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
@@ -25,6 +25,7 @@
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.testng.annotations.DataProvider;
import java.util.stream.Stream;
@@ -86,6 +87,17 @@
};
}
+    /**
+     * Pairs a supplier of the cluster's plain-text service URL with each topic domain, persistent first.
+     */
+    @DataProvider
+    public Object[][] serviceUrlAndTopicDomain() {
+        return new Object[][] {
+                {stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()), TopicDomain.persistent},
+                {stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()), TopicDomain.non_persistent},
+        };
+    }
+
protected PulsarAdmin pulsarAdmin;
protected PulsarCluster pulsarCluster;
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index cfbdb22..c6cd900 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -28,6 +28,7 @@
<class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
<class name="org.apache.pulsar.tests.integration.messaging.ReaderMessagingTest" />
<class name="org.apache.pulsar.tests.integration.messaging.NonDurableConsumerMessagingTest" />
+ <class name="org.apache.pulsar.tests.integration.messaging.MessagingSmokeTest" />
<class name="org.apache.pulsar.tests.integration.admin.AdminTest" />
</classes>
</test>