Revert "[fix][broker][branch-3.2] Fix broker not starting when both transactions and the Extensible Load Manager are enabled"
This reverts commit 158d5eb670c9fd7b123c204533ac6cf8cb439ccd.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 3bedf49..e5d90bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -58,7 +58,6 @@
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
@@ -158,8 +157,7 @@
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
- && !isEventSystemTopic(TopicName.get(topicName))
- && !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
+ && !isEventSystemTopic(TopicName.get(topicName))) {
this.pendingAckHandle = new PendingAckHandleImpl(this);
} else {
this.pendingAckHandle = new PendingAckHandleDisabled();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9c93407..bb0796b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -322,8 +322,7 @@
TopicName topicName = TopicName.get(topic);
if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(topicName)
- && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())
- && !ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
+ && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
deleted file mode 100644
index d9c6f78..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.loadbalance.extensions;
-
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import com.google.common.collect.Sets;
-import java.util.concurrent.CompletableFuture;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
-import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
-import org.apache.pulsar.broker.testcontext.PulsarTestContext;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.impl.LookupService;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicType;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-
-public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServiceBaseTest {
-
- protected PulsarService pulsar1;
- protected PulsarService pulsar2;
-
- protected PulsarTestContext additionalPulsarTestContext;
-
- protected ExtensibleLoadManagerImpl primaryLoadManager;
-
- protected ExtensibleLoadManagerImpl secondaryLoadManager;
-
- protected ServiceUnitStateChannelImpl channel1;
- protected ServiceUnitStateChannelImpl channel2;
-
- protected final String defaultTestNamespace;
-
- protected LookupService lookupService;
-
- protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {
- this.defaultTestNamespace = defaultTestNamespace;
- }
-
- protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
- conf.setForceDeleteNamespaceAllowed(true);
- conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
- conf.setAllowAutoTopicCreation(true);
- conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
- conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- conf.setLoadBalancerSheddingEnabled(false);
- conf.setLoadBalancerDebugModeEnabled(true);
- conf.setTopicLevelPoliciesEnabled(true);
- return conf;
- }
-
- @Override
- @BeforeClass(alwaysRun = true)
- protected void setup() throws Exception {
- initConfig(conf);
- super.internalSetup(conf);
- pulsar1 = pulsar;
- var conf2 = initConfig(getDefaultConf());
- additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2);
- pulsar2 = additionalPulsarTestContext.getPulsarService();
-
- setPrimaryLoadManager();
- setSecondaryLoadManager();
-
- admin.clusters().createCluster(this.conf.getClusterName(),
- ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
- admin.tenants().createTenant("public",
- new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
- Sets.newHashSet(this.conf.getClusterName())));
- admin.namespaces().createNamespace("public/default");
- admin.namespaces().setNamespaceReplicationClusters("public/default",
- Sets.newHashSet(this.conf.getClusterName()));
-
- admin.namespaces().createNamespace(defaultTestNamespace, 128);
- admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
- Sets.newHashSet(this.conf.getClusterName()));
- lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
- }
-
- @Override
- @AfterClass(alwaysRun = true)
- protected void cleanup() throws Exception {
- this.additionalPulsarTestContext.close();
- super.internalCleanup();
- }
-
- @BeforeMethod(alwaysRun = true)
- protected void initializeState() throws PulsarAdminException, IllegalAccessException {
- admin.namespaces().unload(defaultTestNamespace);
- reset(primaryLoadManager, secondaryLoadManager);
- FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
- }
-
- protected void setPrimaryLoadManager() throws IllegalAccessException {
- ExtensibleLoadManagerWrapper wrapper =
- (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
- primaryLoadManager = spy((ExtensibleLoadManagerImpl)
- FieldUtils.readField(wrapper, "loadManager", true));
- FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true);
- channel1 = (ServiceUnitStateChannelImpl)
- FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
- }
-
- private void setSecondaryLoadManager() throws IllegalAccessException {
- ExtensibleLoadManagerWrapper wrapper =
- (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
- secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
- FieldUtils.readField(wrapper, "loadManager", true));
- FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true);
- channel2 = (ServiceUnitStateChannelImpl)
- FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
- }
-
- protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
- return pulsar.getNamespaceService().getBundleAsync(topic);
- }
-
- protected Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
- throws Exception {
- TopicName changeEventsTopicName =
- TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
- NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
- int i = 0;
- while(true) {
- TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
- NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
- if (!bundle.equals(changeEventsBundle)) {
- return Pair.of(topicName, bundle);
- }
- i++;
- }
- }
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 3949aa6..b6073e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -83,6 +83,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
@@ -108,6 +109,7 @@
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -121,18 +123,25 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -142,10 +151,79 @@
@Slf4j
@Test(groups = "broker")
@SuppressWarnings("unchecked")
-public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBaseTest {
+public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
- public ExtensibleLoadManagerImplTest() {
- super("public/test");
+ private PulsarService pulsar1;
+ private PulsarService pulsar2;
+
+ private PulsarTestContext additionalPulsarTestContext;
+
+ private ExtensibleLoadManagerImpl primaryLoadManager;
+
+ private ExtensibleLoadManagerImpl secondaryLoadManager;
+
+ private ServiceUnitStateChannelImpl channel1;
+ private ServiceUnitStateChannelImpl channel2;
+
+ private final String defaultTestNamespace = "public/test";
+
+ private LookupService lookupService;
+
+ private static void initConfig(ServiceConfiguration conf){
+ conf.setForceDeleteNamespaceAllowed(true);
+ conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+ conf.setAllowAutoTopicCreation(true);
+ conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+ conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ conf.setLoadBalancerSheddingEnabled(false);
+ conf.setLoadBalancerDebugModeEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ }
+
+ @BeforeClass
+ @Override
+ public void setup() throws Exception {
+ // Apply the initConfig settings to both brokers: the primary one started by internalSetup
+ // and the additional one created from the default configuration.
+ initConfig(conf);
+ super.internalSetup(conf);
+ pulsar1 = pulsar;
+ ServiceConfiguration defaultConf = getDefaultConf();
+ initConfig(defaultConf);
+ additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
+ pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+ setPrimaryLoadManager();
+
+ setSecondaryLoadManager();
+
+ admin.clusters().createCluster(this.conf.getClusterName(),
+ ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("public",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+ Sets.newHashSet(this.conf.getClusterName())));
+ admin.namespaces().createNamespace("public/default");
+ admin.namespaces().setNamespaceReplicationClusters("public/default",
+ Sets.newHashSet(this.conf.getClusterName()));
+
+ admin.namespaces().createNamespace(defaultTestNamespace, 128);
+ admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
+ Sets.newHashSet(this.conf.getClusterName()));
+ lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ this.additionalPulsarTestContext.close();
+ super.internalCleanup();
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ protected void initializeState() throws PulsarAdminException, IllegalAccessException {
+ admin.namespaces().unload(defaultTestNamespace);
+ reset(primaryLoadManager, secondaryLoadManager);
+ FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
}
@Test
@@ -1515,4 +1593,43 @@
}
+ private void setPrimaryLoadManager() throws IllegalAccessException {
+ ExtensibleLoadManagerWrapper wrapper =
+ (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
+ primaryLoadManager = spy((ExtensibleLoadManagerImpl)
+ FieldUtils.readField(wrapper, "loadManager", true));
+ FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true);
+ channel1 = (ServiceUnitStateChannelImpl)
+ FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
+ }
+
+ private void setSecondaryLoadManager() throws IllegalAccessException {
+ ExtensibleLoadManagerWrapper wrapper =
+ (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
+ secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
+ FieldUtils.readField(wrapper, "loadManager", true));
+ FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true);
+ channel2 = (ServiceUnitStateChannelImpl)
+ FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
+ }
+
+ private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
+ return pulsar.getNamespaceService().getBundleAsync(topic);
+ }
+
+ private Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
+ throws Exception {
+ TopicName changeEventsTopicName =
+ TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
+ int i = 0;
+ while(true) {
+ TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ if (!bundle.equals(changeEventsBundle)) {
+ return Pair.of(topicName, bundle);
+ }
+ i++;
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
deleted file mode 100644
index 0c95dd8..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.loadbalance.extensions;
-
-import static org.testng.Assert.assertEquals;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.awaitility.Awaitility;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker")
-public class ExtensibleLoadManagerImplWithTransactionCoordinatorTest extends ExtensibleLoadManagerImplBaseTest {
-
- public ExtensibleLoadManagerImplWithTransactionCoordinatorTest() {
- super("public/test-elb-with-tx");
- }
-
- @Override
- protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
- conf = super.initConfig(conf);
- conf.setTransactionCoordinatorEnabled(true);
- return conf;
- }
-
- @Test(timeOut = 30 * 1000)
- public void testUnloadAdminAPI() throws Exception {
- var topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-unload");
- var topicName = topicAndBundle.getLeft();
- var bundle = topicAndBundle.getRight();
-
- var srcBroker = admin.lookups().lookupTopic(topicName.toString());
- var dstBroker = srcBroker.equals(pulsar1.getBrokerServiceUrl()) ? pulsar2 : pulsar1;
- var dstBrokerUrl = dstBroker.getBrokerId();
- var dstBrokerServiceUrl = dstBroker.getBrokerServiceUrl();
-
- admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), dstBrokerUrl);
- Awaitility.await().untilAsserted(
- () -> assertEquals(admin.lookups().lookupTopic(topicName.toString()), dstBrokerServiceUrl));
- }
-}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
deleted file mode 100644
index 618053a..0000000
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.messaging;
-
-import java.util.List;
-import java.util.Map;
-import java.util.function.Supplier;
-import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
-import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
-import org.apache.pulsar.common.naming.TopicDomain;
-import org.testng.ITest;
-import org.testng.annotations.Factory;
-import org.testng.annotations.Test;
-
-public class MessagingSmokeTest extends TopicMessagingBase implements ITest {
-
- @Factory
- public static Object[] messagingTests() {
- List<?> tests = List.of(
- new MessagingSmokeTest("Extensible Load Manager",
- Map.of("loadManagerClassName", ExtensibleLoadManagerImpl.class.getName(),
- "loadBalancerLoadSheddingStrategy", TransferShedder.class.getName())),
- new MessagingSmokeTest("Extensible Load Manager with TX Coordinator",
- Map.of("loadManagerClassName", ExtensibleLoadManagerImpl.class.getName(),
- "loadBalancerLoadSheddingStrategy", TransferShedder.class.getName(),
- "transactionCoordinatorEnabled", "true"))
- );
- return tests.toArray();
- }
-
- private final String name;
-
- public MessagingSmokeTest(String name, Map<String, String> brokerEnvs) {
- super();
- this.brokerEnvs.putAll(brokerEnvs);
- this.name = name;
- }
-
- @Override
- public String getTestName() {
- return name;
- }
-
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithExclusive(Supplier<String> serviceUrl, TopicDomain topicDomain)
- throws Exception {
- nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
- }
-
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithExclusive(Supplier<String> serviceUrl, TopicDomain topicDomain)
- throws Exception {
- partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
- }
-
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithFailover(Supplier<String> serviceUrl, TopicDomain topicDomain)
- throws Exception {
- nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
- }
-
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithFailover(Supplier<String> serviceUrl, TopicDomain topicDomain)
- throws Exception {
- partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
- }
-
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithShared(Supplier<String> serviceUrl, TopicDomain topicDomain)
- throws Exception {
- nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
- }
-
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithShared(Supplier<String> serviceUrl, TopicDomain topicDomain)
- throws Exception {
- partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
- }
-
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithKeyShared(Supplier<String> serviceUrl, TopicDomain topicDomain)
- throws Exception {
- nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
- }
-
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithKeyShared(Supplier<String> serviceUrl, TopicDomain topicDomain)
- throws Exception {
- partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
- }
-}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 79cd55d..77f21df 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -118,7 +118,7 @@
this.brokerContainers = Maps.newTreeMap();
this.workerContainers = Maps.newTreeMap();
- this.proxyContainer = new ProxyContainer(clusterName, appendClusterName(ProxyContainer.NAME), spec.enableTls)
+ this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, spec.enableTls)
.withNetwork(network)
.withNetworkAliases(appendClusterName("pulsar-proxy"))
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
index 93e2221a..ae9e44f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
@@ -25,7 +25,6 @@
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.common.naming.TopicDomain;
import org.testng.annotations.DataProvider;
import java.util.stream.Stream;
@@ -87,20 +86,6 @@
};
}
- @DataProvider
- public Object[][] serviceUrlAndTopicDomain() {
- return new Object[][] {
- {
- stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()),
- TopicDomain.persistent
- },
- {
- stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()),
- TopicDomain.non_persistent
- },
- };
- }
-
protected PulsarAdmin pulsarAdmin;
protected PulsarCluster pulsarCluster;
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index c6cd900..cfbdb22 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -28,7 +28,6 @@
<class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
<class name="org.apache.pulsar.tests.integration.messaging.ReaderMessagingTest" />
<class name="org.apache.pulsar.tests.integration.messaging.NonDurableConsumerMessagingTest" />
- <class name="org.apache.pulsar.tests.integration.messaging.MessagingSmokeTest" />
<class name="org.apache.pulsar.tests.integration.admin.AdminTest" />
</classes>
</test>