blob: 7bd82cdd840ee6688c6ebfc03d5beff87470013a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static java.lang.Thread.sleep;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "cluster-migration")
public class ClusterMigrationTest {
private static final Logger log = LoggerFactory.getLogger(ClusterMigrationTest.class);
protected String methodName;
String namespace = "pulsar/migrationNs";
String namespaceNotToMigrate = "pulsar/notToMigrateNs";
TestBroker broker1, broker2, broker3, broker4;
URL url1;
URL urlTls1;
PulsarService pulsar1;
PulsarAdmin admin1;
URL url2;
URL urlTls2;
PulsarService pulsar2;
PulsarAdmin admin2;
URL url3;
URL urlTls3;
PulsarService pulsar3;
PulsarAdmin admin3;
URL url4;
URL urlTls4;
PulsarService pulsar4;
PulsarAdmin admin4;
@DataProvider(name="NamespaceMigrationTopicSubscriptionTypes")
public Object[][] namespaceMigrationSubscriptionTypes() {
return new Object[][] {
{SubscriptionType.Shared, true, false},
{SubscriptionType.Shared, false, true},
{SubscriptionType.Shared, true, true},
};
}
@BeforeMethod(alwaysRun = true, timeOut = 300000)
public void setup() throws Exception {
log.info("--- Starting ReplicatorTestBase::setup ---");
broker1 = new TestBroker("r1");
broker2 = new TestBroker("r2");
broker3 = new TestBroker("r3");
broker4 = new TestBroker("r4");
pulsar1 = broker1.getPulsarService();
url1 = new URL(pulsar1.getWebServiceAddress());
urlTls1 = new URL(pulsar1.getWebServiceAddressTls());
admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
pulsar2 = broker2.getPulsarService();
url2 = new URL(pulsar2.getWebServiceAddress());
urlTls2 = new URL(pulsar2.getWebServiceAddressTls());
admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
pulsar3 = broker3.getPulsarService();
url3 = new URL(pulsar3.getWebServiceAddress());
urlTls3 = new URL(pulsar3.getWebServiceAddressTls());
admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();
pulsar4 = broker4.getPulsarService();
url4 = new URL(pulsar4.getWebServiceAddress());
urlTls4 = new URL(pulsar4.getWebServiceAddressTls());
admin4 = PulsarAdmin.builder().serviceHttpUrl(url4.toString()).build();
admin1.clusters().createCluster("r1",
ClusterData.builder().serviceUrl(url1.toString()).serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()).build());
admin3.clusters().createCluster("r1",
ClusterData.builder().serviceUrl(url1.toString()).serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()).build());
admin2.clusters().createCluster("r2",
ClusterData.builder().serviceUrl(url2.toString()).serviceUrlTls(urlTls2.toString())
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls()).build());
admin4.clusters().createCluster("r2",
ClusterData.builder().serviceUrl(url2.toString()).serviceUrlTls(urlTls2.toString())
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls()).build());
admin1.clusters().createCluster("r3",
ClusterData.builder().serviceUrl(url3.toString()).serviceUrlTls(urlTls3.toString())
.brokerServiceUrl(pulsar3.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls()).build());
admin3.clusters().createCluster("r3",
ClusterData.builder().serviceUrl(url3.toString()).serviceUrlTls(urlTls3.toString())
.brokerServiceUrl(pulsar3.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls()).build());
admin2.clusters().createCluster("r4",
ClusterData.builder().serviceUrl(url4.toString()).serviceUrlTls(urlTls4.toString())
.brokerServiceUrl(pulsar4.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls()).build());
admin4.clusters().createCluster("r4",
ClusterData.builder().serviceUrl(url4.toString()).serviceUrlTls(urlTls4.toString())
.brokerServiceUrl(pulsar4.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls()).build());
// Setting r3 as replication cluster for r1
admin1.tenants().createTenant("pulsar",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r3")));
admin3.tenants().createTenant("pulsar",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r3")));
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1", "r3"));
admin3.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r3"));
admin1.namespaces().createNamespace(namespaceNotToMigrate, Sets.newHashSet("r1", "r3"));
admin3.namespaces().createNamespace(namespaceNotToMigrate);
admin1.namespaces().setNamespaceReplicationClusters(namespaceNotToMigrate, Sets.newHashSet("r1", "r3"));
// Setting r4 as replication cluster for r2
admin2.tenants().createTenant("pulsar",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r2", "r4")));
admin4.tenants().createTenant("pulsar",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r2", "r4")));
admin2.namespaces().createNamespace(namespace, Sets.newHashSet("r2", "r4"));
admin4.namespaces().createNamespace(namespace);
admin2.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r2", "r4"));
admin2.namespaces().createNamespace(namespaceNotToMigrate, Sets.newHashSet("r2", "r4"));
admin4.namespaces().createNamespace(namespaceNotToMigrate);
admin2.namespaces().setNamespaceReplicationClusters(namespaceNotToMigrate, Sets.newHashSet("r2", "r4"));
assertEquals(admin1.clusters().getCluster("r1").getServiceUrl(), url1.toString());
assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
assertEquals(admin3.clusters().getCluster("r3").getServiceUrl(), url3.toString());
assertEquals(admin4.clusters().getCluster("r4").getServiceUrl(), url4.toString());
assertEquals(admin1.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
assertEquals(admin3.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
assertEquals(admin4.clusters().getCluster("r4").getBrokerServiceUrl(), pulsar4.getBrokerServiceUrl());
sleep(100);
log.info("--- ReplicatorTestBase::setup completed ---");
}
@AfterMethod(alwaysRun = true, timeOut = 300000)
protected void cleanup() throws Exception {
log.info("--- Shutting down ---");
admin1.close();
admin2.close();
admin3.close();
admin4.close();
broker1.cleanup();
broker2.cleanup();
broker3.cleanup();
broker4.cleanup();
}
@BeforeMethod(alwaysRun = true)
public void beforeMethod(Method m) throws Exception {
methodName = m.getName();
}
/**
* Test producer/consumer migration: using persistent/non-persistent topic and all types of subscriptions
* (1) Producer1 and consumer1 connect to cluster-1
* (2) Close consumer1 to build backlog and publish messages using producer1
* (3) Migrate topic to cluster-2
* (4) Validate producer-1 is connected to cluster-2
* (5) create consumer1, drain backlog and migrate and reconnect to cluster-2
* (6) Create new consumer2 with different subscription on cluster-1,
* which immediately migrate and reconnect to cluster-2
* (7) Create producer-2 directly to cluster-2
* (8) Create producer-3 on cluster-1 which should be redirected to cluster-2
* (8) Publish messages using producer1, producer2, and producer3
* (9) Consume all messages by both consumer1 and consumer2
* (10) Create Producer/consumer on non-migrated cluster and verify their connection with cluster-1
* (11) Restart Broker-1 and connect producer/consumer on cluster-1
* @throws Exception
*/
@Test
public void testClusterMigration() throws Exception {
log.info("--- Starting ReplicatorTest::testClusterMigration ---");
final String topicName = BrokerTestUtil
.newUniqueName("persistent://" + namespace + "/migrationTopic");
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// cluster-1 producer/consumer
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1").subscribe();
AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500);
assertFalse(topic1.getProducers().isEmpty());
assertFalse(topic1.getSubscriptions().isEmpty());
// build backlog
consumer1.close();
int n = 5;
for (int i = 0; i < n; i++) {
producer1.send("test1".getBytes());
}
@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// cluster-2 producer/consumer
Producer<byte[]> producer2 = client2.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
assertFalse(topic2.getProducers().isEmpty());
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl);
retryStrategically((test) -> {
try {
topic1.checkClusterMigration().get();
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);
topic1.checkClusterMigration().get();
log.info("before sending message");
sleep(1000);
producer1.sendAsync("test1".getBytes());
// producer is disconnected from cluster-1
retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
log.info("before asserting");
assertTrue(topic1.getProducers().isEmpty());
// create 3rd producer on cluster-1 which should be redirected to cluster-2
Producer<byte[]> producer3 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
// producer is connected with cluster-2
retryStrategically((test) -> topic2.getProducers().size() == 3, 10, 500);
assertTrue(topic2.getProducers().size() == 3);
// try to consume backlog messages from cluster-1
consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
for (int i = 0; i < n; i++) {
Message<byte[]> msg = consumer1.receive();
assertEquals(msg.getData(), "test1".getBytes());
consumer1.acknowledge(msg);
}
// after consuming all messages, consumer should have disconnected
// from cluster-1 and reconnect with cluster-2
retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500);
assertFalse(topic2.getSubscriptions().isEmpty());
topic1.checkClusterMigration().get();
ConcurrentOpenHashMap<String, ? extends Replicator> replicators = topic1.getReplicators();
replicators.forEach((r, replicator) -> {
assertFalse(replicator.isConnected());
});
assertTrue(topic1.getSubscriptions().isEmpty());
// not also create a new consumer which should also reconnect to cluster-2
Consumer<byte[]> consumer2 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s2").subscribe();
retryStrategically((test) -> topic2.getSubscription("s2") != null, 10, 500);
assertFalse(topic2.getSubscription("s2").getConsumers().isEmpty());
// new sub on migration topic must be redirected immediately
Consumer<byte[]> consumerM = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("sM").subscribe();
assertFalse(pulsar2.getBrokerService().getTopicReference(topicName).get().getSubscription("sM").getConsumers()
.isEmpty());
consumerM.close();
// migrate topic after creating subscription
String newTopicName = topicName + "-new";
consumerM = client1.newConsumer().topic(newTopicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("sM").subscribe();
retryStrategically((t) -> pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100);
pulsar2.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
retryStrategically((t) ->
pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent() &&
pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM")
.getConsumers().isEmpty(), 5, 100);
assertFalse(pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM").getConsumers()
.isEmpty());
consumerM.close();
// publish messages to cluster-2 and consume them
for (int i = 0; i < n; i++) {
producer1.send("test2".getBytes());
producer2.send("test2".getBytes());
producer3.send("test2".getBytes());
}
log.info("Successfully published messages by migrated producers");
for (int i = 0; i < n * 3; i++) {
assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
}
// create non-migrated topic which should connect to cluster-1
String diffTopic = BrokerTestUtil
.newUniqueName("persistent://" + namespace + "/migrationTopic");
Consumer<byte[]> consumerDiff = client1.newConsumer().topic(diffTopic).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1-d").subscribe();
Producer<byte[]> producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false)
.producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topicDiff = (AbstractTopic) pulsar2.getBrokerService().getTopic(diffTopic, false).getNow(null).get();
assertNotNull(topicDiff);
for (int i = 0; i < n; i++) {
producerDiff.send("diff".getBytes());
assertEquals(consumerDiff.receive(2, TimeUnit.SECONDS).getData(), "diff".getBytes());
}
// restart broker-1
broker1.restart();
Producer<byte[]> producer4 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-4").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer3 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s3").subscribe();
retryStrategically((test) -> topic2.getProducers().size() == 4, 10, 500);
assertTrue(topic2.getProducers().size() == 4);
retryStrategically((test) -> topic2.getSubscription("s3") != null, 10, 500);
assertFalse(topic2.getSubscription("s3").getConsumers().isEmpty());
for (int i = 0; i < n; i++) {
producer4.send("test3".getBytes());
assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
assertEquals(consumer3.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
}
client1.close();
client2.close();
log.info("Successfully consumed messages by migrated consumers");
}
@Test
public void testClusterMigrationWithReplicationBacklog() throws Exception {
log.info("--- Starting ReplicatorTest::testClusterMigrationWithReplicationBacklog ---");
final String topicName = BrokerTestUtil
.newUniqueName("persistent://" + namespace + "/migrationTopic");
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
@Cleanup
PulsarClient client3 = PulsarClient.builder().serviceUrl(url3.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// cluster-1 producer/consumer
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1").subscribe();
// cluster-3 consumer
Consumer<byte[]> consumer3 = client3.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1").subscribe();
AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500);
assertFalse(topic1.getProducers().isEmpty());
assertFalse(topic1.getSubscriptions().isEmpty());
// build backlog
consumer1.close();
retryStrategically((test) -> topic1.getReplicators().size() == 1, 10, 3000);
assertEquals(topic1.getReplicators().size(), 1);
// stop service in the replication cluster to build replication backlog
broker3.stop();
retryStrategically((test) -> broker3.getPulsarService() == null, 10, 1000);
assertNull(pulsar3.getBrokerService());
//publish messages into topic in "r1" cluster
int n = 5;
for (int i = 0; i < n; i++) {
producer1.send("test1".getBytes());
}
retryStrategically((test) -> topic1.isReplicationBacklogExist(), 10, 1000);
assertTrue(topic1.isReplicationBacklogExist());
@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// cluster-2 producer/consumer
Producer<byte[]> producer2 = client2.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
log.info("name of topic 2 - {}", topic2.getName());
assertFalse(topic2.getProducers().isEmpty());
retryStrategically((test) -> topic2.getReplicators().size() == 1, 10, 2000);
log.info("replicators should be ready");
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
log.info("update cluster migration called");
retryStrategically((test) -> {
try {
topic1.checkClusterMigration().get();
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);
topic1.checkClusterMigration().get();
producer1.sendAsync("test1".getBytes());
// producer is disconnected from cluster-1
retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
assertTrue(topic1.getProducers().isEmpty());
// verify that the disconnected producer is not redirected
// to replication cluster since there is replication backlog.
assertEquals(topic2.getProducers().size(), 1);
// Restart the service in cluster "r3".
broker3.restart();
retryStrategically((test) -> broker3.getPulsarService() != null, 10, 1000);
assertNotNull(broker3.getPulsarService());
pulsar3 = broker3.getPulsarService();
// verify that the replication backlog drains once service in cluster "r3" is restarted.
retryStrategically((test) -> !topic1.isReplicationBacklogExist(), 10, 1000);
assertFalse(topic1.isReplicationBacklogExist());
// verify that the producer1 is now connected to migrated cluster "r2" since backlog is cleared.
topic1.checkClusterMigration().get();
// verify that the producer1 is now is now connected to migrated cluster "r2" since backlog is cleared.
retryStrategically((test) -> topic2.getProducers().size()==2, 10, 500);
assertEquals(topic2.getProducers().size(), 2);
client1.close();
client2.close();
client3.close();
}
/**
* This test validates that blue cluster first creates list of subscriptions into green cluster so, green cluster
* will not lose the data if producer migrates.
*
* @throws Exception
*/
@Test
public void testClusterMigrationWithResourceCreated() throws Exception {
log.info("--- Starting testClusterMigrationWithResourceCreated ---");
String tenant = "pulsar2";
String namespace = tenant + "/migration";
String greenClusterName = pulsar2.getConfig().getClusterName();
String blueClusterName = pulsar1.getConfig().getClusterName();
admin1.clusters().createCluster(greenClusterName,
ClusterData.builder().serviceUrl(url2.toString()).serviceUrlTls(urlTls2.toString())
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls()).build());
admin2.clusters().createCluster(blueClusterName,
ClusterData.builder().serviceUrl(url1.toString()).serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()).build());
admin1.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
Sets.newHashSet("r1", greenClusterName)));
// broker should handle already tenant creation
admin2.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
Sets.newHashSet("r1", greenClusterName)));
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1", greenClusterName));
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/migrationTopic");
broker1.getPulsarService().getConfig().setClusterMigrationAutoResourceCreation(true);
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// cluster-1 producer/consumer
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
// create subscriptions
admin1.topics().createSubscription(topicName, "s1", MessageId.earliest);
admin1.topics().createSubscription(topicName, "s2", MessageId.earliest);
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
PersistentTopic topic1 = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null)
.get();
retryStrategically((test) -> {
try {
topic1.checkClusterMigration().get();
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);
assertNotNull(admin2.tenants().getTenantInfo(tenant));
assertNotNull(admin2.namespaces().getPolicies(namespace));
List<String> subLists = admin2.topics().getSubscriptions(topicName);
assertTrue(subLists.contains("s1"));
assertTrue(subLists.contains("s2"));
int n = 5;
for (int i = 0; i < n; i++) {
producer1.send("test1".getBytes());
}
Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
for (int i = 0; i < n; i++) {
assertNotNull(consumer1.receive());
}
consumer1.close();
producer1.close();
// publish to new topic which should be redirected immediately
String newTopic = topicName+"-new";
producer1 = client1.newProducer().topic(newTopic).enableBatching(false)
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
retryStrategically((test) -> {
try {
pulsar2.getBrokerService().getTopic(newTopic, false).getNow(null).get();
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);
PersistentTopic pulsar2Topic = (PersistentTopic) pulsar2.getBrokerService().getTopic(newTopic, false).getNow(null)
.get();
retryStrategically((test) -> {
try {
return !pulsar2Topic.getProducers().isEmpty();
} catch (Exception e) {
return false;
}
}, 10, 500);
assertFalse(pulsar2Topic.getProducers().isEmpty());
consumer1 = client1.newConsumer().topic(newTopic).subscriptionName("s1").subscribe();
retryStrategically((test) -> {
try {
return !pulsar2Topic.getSubscription("s1").getConsumers().isEmpty();
} catch (Exception e) {
return false;
}
}, 10, 500);
assertFalse(pulsar2Topic.getSubscription("s1").getConsumers().isEmpty());
client1.close();
}
@Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
public void testNamespaceMigration(SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception {
log.info("--- Starting Test::testNamespaceMigration ---");
// topic for the namespace1 (to be migrated)
final String topicName = BrokerTestUtil
.newUniqueName("persistent://" + namespace + "/migrationTopic");
// topic for namespace2 (not to be migrated)
final String topicName2 = BrokerTestUtil
.newUniqueName("persistent://" + namespaceNotToMigrate + "/migrationTopic");
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// blue cluster - namespace1 - producer/consumer
Producer<byte[]> blueProducerNs1_1 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("blue-producer-ns1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> blueConsumerNs1_1 = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("s1").subscribe();
AbstractTopic blueTopicNs1_1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !blueTopicNs1_1.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !blueTopicNs1_1.getSubscriptions().isEmpty(), 5, 500);
assertFalse(blueTopicNs1_1.getProducers().isEmpty());
assertFalse(blueTopicNs1_1.getSubscriptions().isEmpty());
// blue cluster - namespace2 - producer/consumer
Producer<byte[]> blueProducerNs2_1 = client1.newProducer().topic(topicName2).enableBatching(false)
.producerName("blue-producer-ns2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> blueConsumerNs2_1 = client1.newConsumer().topic(topicName2).subscriptionType(subType)
.subscriptionName("s1").subscribe();
AbstractTopic blueTopicNs2_1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName2, false).getNow(null).get();
retryStrategically((test) -> !blueTopicNs2_1.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !blueTopicNs2_1.getSubscriptions().isEmpty(), 5, 500);
assertFalse(blueTopicNs2_1.getProducers().isEmpty());
assertFalse(blueTopicNs2_1.getSubscriptions().isEmpty());
// build backlog on the blue cluster
blueConsumerNs1_1.close();
blueConsumerNs2_1.close();
int n = 5;
for (int i = 0; i < n; i++) {
blueProducerNs1_1.send("test1".getBytes());
blueProducerNs2_1.send("test1".getBytes());
}
@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// green cluster - namespace1 - producer/consumer
Producer<byte[]> greenProducerNs1_1 = client2.newProducer().topic(topicName).enableBatching(false)
.producerName("green-producer-ns1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic greenTopicNs1_1 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
assertFalse(greenTopicNs1_1.getProducers().isEmpty());
// green cluster - namespace2 - producer/consumer
Producer<byte[]> greenProducerNs2_1 = client2.newProducer().topic(topicName2).enableBatching(false)
.producerName("cluster2-nm1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic greenTopicNs2_1 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName2, false).getNow(null).get();
assertFalse(greenTopicNs2_1.getProducers().isEmpty());
// blue - green cluster migration
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", isClusterMigrate, migratedUrl);
admin1.namespaces().updateMigrationState(namespace, isNamespaceMigrate);
retryStrategically((test) -> {
try {
blueTopicNs1_1.checkClusterMigration().get();
if (isClusterMigrate) {
blueTopicNs2_1.checkClusterMigration().get();
}
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);
blueTopicNs1_1.checkClusterMigration().get();
if (isClusterMigrate) {
blueTopicNs2_1.checkClusterMigration().get();
}
log.info("before sending message");
sleep(1000);
blueProducerNs1_1.sendAsync("test1".getBytes());
blueProducerNs2_1.sendAsync("test1".getBytes());
// producer is disconnected from blue for namespace1 as cluster or namespace migration is enabled
retryStrategically((test) -> blueTopicNs1_1.getProducers().isEmpty(), 10, 500);
assertTrue(blueTopicNs1_1.getProducers().isEmpty());
if(isClusterMigrate){
// producer is disconnected from blue for namespace2 if cluster migration is enabled
retryStrategically((test) -> blueTopicNs2_1.getProducers().isEmpty(), 10, 500);
assertTrue(blueTopicNs2_1.getProducers().isEmpty());
} else {
// producer is not disconnected from blue for namespace2 if namespace migration is disabled
retryStrategically((test) -> !blueTopicNs2_1.getProducers().isEmpty(), 10, 500);
assertTrue(!blueTopicNs2_1.getProducers().isEmpty());
}
// create producer on blue which should be redirected to green
Producer<byte[]> blueProducerNs1_2 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("blue-producer-ns1-2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
// producer is connected with green
retryStrategically((test) -> greenTopicNs1_1.getProducers().size() == 3, 10, 500);
assertTrue(greenTopicNs1_1.getProducers().size() == 3);
// blueProducerNs2_1 should be migrated to green if the cluster migration is enabled
// should not be migrated if the namespace migration is disabled for namespace2
if (isClusterMigrate) {
retryStrategically((test) -> greenTopicNs2_1.getProducers().size() == 2, 10, 500);
assertTrue(greenTopicNs2_1.getProducers().size() == 2);
} else{
retryStrategically((test) -> greenTopicNs2_1.getProducers().size() == 1, 10, 500);
assertTrue(greenTopicNs2_1.getProducers().size() == 1);
}
// try to consume backlog messages from cluster-1
blueConsumerNs1_1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
blueConsumerNs2_1 = client1.newConsumer().topic(topicName2).subscriptionName("s1").subscribe();
for (int i = 0; i < n; i++) {
Message<byte[]> msg = blueConsumerNs1_1.receive();
assertEquals(msg.getData(), "test1".getBytes());
blueConsumerNs1_1.acknowledge(msg);
Message<byte[]> msg2 = blueConsumerNs2_1.receive();
assertEquals(msg2.getData(), "test1".getBytes());
blueConsumerNs2_1.acknowledge(msg2);
}
// after consuming all messages, consumer should have disconnected
// from blue and reconnect with green
retryStrategically((test) -> !greenTopicNs1_1.getSubscriptions().isEmpty(), 10, 500);
assertFalse(greenTopicNs1_1.getSubscriptions().isEmpty());
if (isClusterMigrate) {
retryStrategically((test) -> !greenTopicNs2_1.getSubscriptions().isEmpty(), 10, 500);
assertFalse(greenTopicNs2_1.getSubscriptions().isEmpty());
} else {
retryStrategically((test) -> greenTopicNs2_1.getSubscriptions().isEmpty(), 10, 500);
assertTrue(greenTopicNs2_1.getSubscriptions().isEmpty());
}
blueTopicNs1_1.checkClusterMigration().get();
if (isClusterMigrate) {
blueTopicNs2_1.checkClusterMigration().get();
}
ConcurrentOpenHashMap<String, ? extends Replicator> replicators = blueTopicNs1_1.getReplicators();
replicators.forEach((r, replicator) -> {
assertFalse(replicator.isConnected());
});
assertTrue(blueTopicNs1_1.getSubscriptions().isEmpty());
if (isClusterMigrate) {
ConcurrentOpenHashMap<String, ? extends Replicator> replicatorsNm = blueTopicNs2_1.getReplicators();
replicatorsNm.forEach((r, replicator) -> {
assertFalse(replicator.isConnected());
});
assertTrue(blueTopicNs2_1.getSubscriptions().isEmpty());
} else {
ConcurrentOpenHashMap<String, ? extends Replicator> replicatorsNm = blueTopicNs2_1.getReplicators();
replicatorsNm.forEach((r, replicator) -> {
assertTrue(replicator.isConnected());
});
assertFalse(blueTopicNs2_1.getSubscriptions().isEmpty());
}
// create a new consumer on blue which should also reconnect to green
Consumer<byte[]> blueConsumerNs1_2 = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("s2").subscribe();
Consumer<byte[]> blueConsumerNs2_2 = client1.newConsumer().topic(topicName2).subscriptionType(subType)
.subscriptionName("s2").subscribe();
retryStrategically((test) -> greenTopicNs1_1.getSubscription("s2") != null, 10, 500);
assertFalse(greenTopicNs1_1.getSubscription("s2").getConsumers().isEmpty());
if (isClusterMigrate) {
retryStrategically((test) -> greenTopicNs2_1.getSubscription("s2") != null, 10, 500);
assertFalse(greenTopicNs2_1.getSubscription("s2").getConsumers().isEmpty());
} else {
retryStrategically((test) -> greenTopicNs2_1.getSubscription("s2") == null, 10, 500);
}
// new sub on migration topic must be redirected immediately
Consumer<byte[]> consumerM = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("sM").subscribe();
assertFalse(pulsar2.getBrokerService().getTopicReference(topicName).get().getSubscription("sM").getConsumers()
.isEmpty());
consumerM.close();
// migrate topic after creating subscription
String newTopicName = topicName + "-new";
consumerM = client1.newConsumer().topic(newTopicName).subscriptionType(subType)
.subscriptionName("sM").subscribe();
retryStrategically((t) -> pulsar1.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100);
pulsar2.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
retryStrategically((t) ->
pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent() &&
pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM")
.getConsumers().isEmpty(), 5, 100);
assertFalse(pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM").getConsumers()
.isEmpty());
consumerM.close();
// publish messages to cluster-2 and consume them
for (int i = 0; i < n; i++) {
blueProducerNs1_1.send("test2".getBytes());
blueProducerNs1_2.send("test2".getBytes());
greenProducerNs1_1.send("test2".getBytes());
}
log.info("Successfully published messages by migrated producers");
for (int i = 0; i < n * 3; i++) {
assertEquals(blueConsumerNs1_1.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
assertEquals(blueConsumerNs1_2.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
}
// create non-migrated topic which should connect to blue
String diffTopic = BrokerTestUtil
.newUniqueName("persistent://" + namespace + "/migrationTopic");
Consumer<byte[]> consumerDiff = client1.newConsumer().topic(diffTopic).subscriptionType(subType)
.subscriptionName("s1-d").subscribe();
Producer<byte[]> producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false)
.producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topicDiff = (AbstractTopic) pulsar2.getBrokerService().getTopic(diffTopic, false).getNow(null).get();
assertNotNull(topicDiff);
for (int i = 0; i < n; i++) {
producerDiff.send("diff".getBytes());
assertEquals(consumerDiff.receive(2, TimeUnit.SECONDS).getData(), "diff".getBytes());
}
// restart broker-1
broker1.restart();
Producer<byte[]> producer4 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-4").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer3 = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("s3").subscribe();
retryStrategically((test) -> greenTopicNs1_1.getProducers().size() == 4, 10, 500);
assertTrue(greenTopicNs1_1.getProducers().size() == 4);
retryStrategically((test) -> greenTopicNs1_1.getSubscription("s3") != null, 10, 500);
assertFalse(greenTopicNs1_1.getSubscription("s3").getConsumers().isEmpty());
for (int i = 0; i < n; i++) {
producer4.send("test3".getBytes());
assertEquals(blueConsumerNs1_1.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
assertEquals(blueConsumerNs1_2.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
assertEquals(consumer3.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
}
log.info("Successfully consumed messages by migrated consumers");
// clean up
blueConsumerNs1_1.close();
blueConsumerNs1_2.close();
blueConsumerNs2_1.close();
blueProducerNs1_1.close();
blueProducerNs1_2.close();
blueProducerNs2_1.close();
greenProducerNs1_1.close();
greenProducerNs2_1.close();
client1.close();
client2.close();
}
@Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception {
log.info("--- Starting ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---");
// topic for namespace1 (to be migrated)
final String topicName = BrokerTestUtil
.newUniqueName("persistent://" + namespace + "/migrationTopic");
// topic for namespace2 (not to be migrated)
final String topicName2 = BrokerTestUtil
.newUniqueName("persistent://" + namespaceNotToMigrate + "/migrationTopic");
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
@Cleanup
PulsarClient client3 = PulsarClient.builder().serviceUrl(url3.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// blue cluster - namespace1 - producer/consumer
Producer<byte[]> blueProducerNs1_1 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("blue-producer-ns1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> blueConsumerNs1_1 = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("s1").subscribe();
// blue cluster - namespace2 - producer/consumer
Producer<byte[]> blueProducerNs2_1 = client1.newProducer().topic(topicName2).enableBatching(false)
.producerName("blue-producer-ns1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> blueConsumerNs2_1 = client1.newConsumer().topic(topicName2).subscriptionType(subType)
.subscriptionName("s1").subscribe();
// blue cluster replication consumer namespace1
Consumer<byte[]> blueConsumerReplicationNs1 = client3.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("s1").subscribe();
// blue cluster replication consumer namespace2
Consumer<byte[]> blueConsumerReplicationNs2 = client3.newConsumer().topic(topicName2).subscriptionType(subType)
.subscriptionName("s1").subscribe();
AbstractTopic blueTopicNs1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !blueTopicNs1.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !blueTopicNs1.getSubscriptions().isEmpty(), 5, 500);
assertFalse(blueTopicNs1.getProducers().isEmpty());
assertFalse(blueTopicNs1.getSubscriptions().isEmpty());
AbstractTopic blueTopicNs2 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName2, false).getNow(null).get();
retryStrategically((test) -> !blueTopicNs2.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !blueTopicNs2.getSubscriptions().isEmpty(), 5, 500);
assertFalse(blueTopicNs2.getProducers().isEmpty());
assertFalse(blueTopicNs2.getSubscriptions().isEmpty());
// build backlog
blueConsumerNs1_1.close();
blueConsumerNs2_1.close();
retryStrategically((test) -> blueTopicNs1.getReplicators().size() == 1, 10, 3000);
assertEquals(blueTopicNs1.getReplicators().size(), 1);
retryStrategically((test) -> blueTopicNs2.getReplicators().size() == 1, 10, 3000);
assertEquals(blueTopicNs2.getReplicators().size(), 1);
// stop service in the replication cluster to build replication backlog
broker3.stop();
retryStrategically((test) -> broker3.getPulsarService() == null, 10, 1000);
assertNull(pulsar3.getBrokerService());
//publish messages into topic in blue cluster
int n = 5;
for (int i = 0; i < n; i++) {
blueProducerNs1_1.send("test1".getBytes());
blueProducerNs2_1.send("test1".getBytes());
}
retryStrategically((test) -> blueTopicNs1.isReplicationBacklogExist(), 10, 1000);
assertTrue(blueTopicNs1.isReplicationBacklogExist());
retryStrategically((test) -> blueTopicNs2.isReplicationBacklogExist(), 10, 1000);
assertTrue(blueTopicNs2.isReplicationBacklogExist());
@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// green cluster - namespace1 - producer/consumer
Producer<byte[]> greenProducerNs1_1 = client2.newProducer().topic(topicName).enableBatching(false)
.producerName("green-producer-ns1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic greenTopicNs1 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
Producer<byte[]> greenProducerNs2_1 = client2.newProducer().topic(topicName2).enableBatching(false)
.producerName("green-producer-ns2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic greenTopicNs2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName2, false).getNow(null).get();
log.info("name of topic 2 - {}", greenTopicNs1.getName());
assertFalse(greenTopicNs1.getProducers().isEmpty());
retryStrategically((test) -> greenTopicNs1.getReplicators().size() == 1, 10, 2000);
log.info("replicators should be ready");
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", isClusterMigrate, migratedUrl);
admin1.namespaces().updateMigrationState(namespace, isNamespaceMigrate);
assertEquals(admin1.namespaces().getPolicies(namespace).migrated, isNamespaceMigrate);
log.info("update cluster migration called");
retryStrategically((test) -> {
try {
blueTopicNs1.checkClusterMigration().get();
if (isClusterMigrate) {
blueTopicNs2.checkClusterMigration().get();
}
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);
blueTopicNs1.checkClusterMigration().get();
if (isClusterMigrate) {
blueTopicNs2.checkClusterMigration().get();
}
blueProducerNs1_1.sendAsync("test1".getBytes());
blueProducerNs2_1.sendAsync("test1".getBytes());
// producer is disconnected from blue
retryStrategically((test) -> blueTopicNs1.getProducers().isEmpty(), 10, 500);
assertTrue(blueTopicNs1.getProducers().isEmpty());
if (isClusterMigrate) {
retryStrategically((test) -> blueTopicNs2.getProducers().isEmpty(), 10, 500);
assertTrue(blueTopicNs2.getProducers().isEmpty());
} else {
retryStrategically((test) -> !blueTopicNs2.getProducers().isEmpty(), 10, 500);
assertFalse(blueTopicNs2.getProducers().isEmpty());
}
// verify that the disconnected producer is not redirected
// to replication cluster since there is replication backlog.
assertEquals(greenTopicNs1.getProducers().size(), 1);
// Restart the service in cluster "r3".
broker3.restart();
retryStrategically((test) -> broker3.getPulsarService() != null, 10, 1000);
assertNotNull(broker3.getPulsarService());
pulsar3 = broker3.getPulsarService();
// verify that the replication backlog drains once service in cluster "r3" is restarted.
retryStrategically((test) -> !blueTopicNs1.isReplicationBacklogExist(), 10, 1000);
assertFalse(blueTopicNs1.isReplicationBacklogExist());
retryStrategically((test) -> !blueTopicNs2.isReplicationBacklogExist(), 10, 1000);
assertFalse(blueTopicNs2.isReplicationBacklogExist());
blueTopicNs1.checkClusterMigration().get();
blueTopicNs2.checkClusterMigration().get();
// verify that the producer1 is now is now connected to migrated cluster green since backlog is cleared.
retryStrategically((test) -> greenTopicNs1.getProducers().size()==2, 10, 500);
assertEquals(greenTopicNs1.getProducers().size(), 2);
if (isClusterMigrate) {
retryStrategically((test) -> greenTopicNs2.getProducers().size()==2, 10, 500);
assertEquals(greenTopicNs2.getProducers().size(), 2);
} else {
retryStrategically((test) -> greenTopicNs2.getProducers().size()==1, 10, 500);
assertEquals(greenTopicNs2.getProducers().size(), 1);
}
// clean up
blueProducerNs1_1.close();
blueProducerNs2_1.close();
blueConsumerNs1_1.close();
blueConsumerNs2_1.close();
greenProducerNs1_1.close();
greenProducerNs2_1.close();
client1.close();
client2.close();
client3.close();
}
static class TestBroker extends MockedPulsarServiceBaseTest {
private String clusterName;
public TestBroker(String clusterName) throws Exception {
this.clusterName = clusterName;
setup();
}
@Override
protected void setup() throws Exception {
super.setupWithClusterName(clusterName);
}
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
}
public PulsarService getPulsarService() {
return pulsar;
}
public String getClusterName() {
return configClusterName;
}
public void stop() throws Exception {
stopBroker();
}
@Override
protected void cleanup() throws Exception {
internalCleanup();
}
public void restart() throws Exception {
restartBroker();
}
}
}