| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static org.mockito.ArgumentMatchers.eq; |
| import static org.mockito.Mockito.spy; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertNull; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| |
| import com.google.common.collect.Sets; |
| import com.scurrilous.circe.checksum.Crc32cIntChecksum; |
| |
| import io.netty.buffer.ByteBuf; |
| |
| import java.lang.reflect.Field; |
| import java.lang.reflect.Method; |
| import java.util.List; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| |
| import lombok.Cleanup; |
| |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; |
| import org.apache.bookkeeper.mledger.Entry; |
| import org.apache.bookkeeper.mledger.ManagedCursor; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; |
| import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; |
| import org.apache.pulsar.broker.BrokerTestUtil; |
| import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; |
| import org.apache.pulsar.broker.service.persistent.PersistentReplicator; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.MessageRoutingMode; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.RawMessage; |
| import org.apache.pulsar.client.api.RawReader; |
| import org.apache.pulsar.client.api.Schema; |
| import org.apache.pulsar.client.api.TypedMessageBuilder; |
| import org.apache.pulsar.client.impl.ProducerImpl; |
| import org.apache.pulsar.client.impl.PulsarClientImpl; |
| import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.BacklogQuota; |
| import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.ReplicatorStats; |
| import org.apache.pulsar.common.protocol.Commands; |
| import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; |
| import org.awaitility.Awaitility; |
| import org.mockito.Mockito; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.DataProvider; |
| import org.testng.annotations.Test; |
| import org.testng.collections.Lists; |
| |
| /** |
| * Starts 3 brokers that are in 3 different clusters |
| */ |
| @Test(groups = "flaky") |
| public class ReplicatorTest extends ReplicatorTestBase { |
| |
| protected String methodName; |
| |
| @BeforeMethod |
| public void beforeMethod(Method m) throws Exception { |
| methodName = m.getName(); |
| admin1.namespaces().removeBacklogQuota("pulsar/ns"); |
| admin1.namespaces().removeBacklogQuota("pulsar/ns1"); |
| admin1.namespaces().removeBacklogQuota("pulsar/global/ns"); |
| } |
| |
| @Override |
| @BeforeClass(timeOut = 300000) |
| public void setup() throws Exception { |
| super.setup(); |
| } |
| |
| @Override |
| @AfterClass(alwaysRun = true, timeOut = 300000) |
| public void cleanup() throws Exception { |
| super.cleanup(); |
| } |
| |
| @DataProvider(name = "partitionedTopic") |
| public Object[][] partitionedTopicProvider() { |
| return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; |
| } |
| |
| @Test |
| public void testConfigChange() throws Exception { |
| log.info("--- Starting ReplicatorTest::testConfigChange ---"); |
| // This test is to verify that the config change on global namespace is successfully applied in broker during |
| // runtime. |
| // Run a set of producer tasks to create the topics |
| List<Future<Void>> results = Lists.newArrayList(); |
| for (int i = 0; i < 10; i++) { |
| final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i)); |
| |
| results.add(executor.submit(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| |
| @Cleanup |
| MessageProducer producer = new MessageProducer(url1, dest); |
| log.info("--- Starting producer --- " + url1); |
| |
| @Cleanup |
| MessageConsumer consumer = new MessageConsumer(url1, dest); |
| log.info("--- Starting Consumer --- " + url1); |
| |
| producer.produce(2); |
| consumer.receive(2); |
| return null; |
| } |
| })); |
| } |
| |
| for (Future<Void> result : results) { |
| try { |
| result.get(); |
| } catch (Exception e) { |
| log.error("exception in getting future result ", e); |
| fail(String.format("replication test failed with %s exception", e.getMessage())); |
| } |
| } |
| |
| Thread.sleep(1000L); |
| // Make sure that the internal replicators map contains remote cluster info |
| ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients(); |
| ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients(); |
| ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients(); |
| |
| Assert.assertNotNull(replicationClients1.get("r2")); |
| Assert.assertNotNull(replicationClients1.get("r3")); |
| Assert.assertNotNull(replicationClients2.get("r1")); |
| Assert.assertNotNull(replicationClients2.get("r3")); |
| Assert.assertNotNull(replicationClients3.get("r1")); |
| Assert.assertNotNull(replicationClients3.get("r2")); |
| |
| // Case 1: Update the global namespace replication configuration to only contains the local cluster itself |
| admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1")); |
| |
| // Wait for config changes to be updated. |
| Thread.sleep(1000L); |
| |
| // Make sure that the internal replicators map still contains remote cluster info |
| Assert.assertNotNull(replicationClients1.get("r2")); |
| Assert.assertNotNull(replicationClients1.get("r3")); |
| Assert.assertNotNull(replicationClients2.get("r1")); |
| Assert.assertNotNull(replicationClients2.get("r3")); |
| Assert.assertNotNull(replicationClients3.get("r1")); |
| Assert.assertNotNull(replicationClients3.get("r2")); |
| |
| // Case 2: Update the configuration back |
| admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3")); |
| |
| // Wait for config changes to be updated. |
| Thread.sleep(1000L); |
| |
| // Make sure that the internal replicators map still contains remote cluster info |
| Assert.assertNotNull(replicationClients1.get("r2")); |
| Assert.assertNotNull(replicationClients1.get("r3")); |
| Assert.assertNotNull(replicationClients2.get("r1")); |
| Assert.assertNotNull(replicationClients2.get("r3")); |
| Assert.assertNotNull(replicationClients3.get("r1")); |
| Assert.assertNotNull(replicationClients3.get("r2")); |
| |
| // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters |
| } |
| |
| @Test(timeOut = 10000) |
| public void activeBrokerParse() throws Exception { |
| pulsar1.getConfiguration().setAuthorizationEnabled(true); |
| //init clusterData |
| ClusterData cluster2Data = new ClusterData(); |
| String cluster2ServiceUrls = String.format("%s,localhost:1234,localhost:5678", pulsar2.getWebServiceAddress()); |
| cluster2Data.setServiceUrl(cluster2ServiceUrls); |
| String cluster2 = "activeCLuster2"; |
| admin2.clusters().createCluster(cluster2, cluster2Data); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() |
| -> admin2.clusters().getCluster(cluster2) != null); |
| |
| List<String> list = admin1.brokers().getActiveBrokers(cluster2); |
| assertEquals(list.get(0), url2.toString().replace("http://", "")); |
| //restore configuration |
| pulsar1.getConfiguration().setAuthorizationEnabled(false); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Test(timeOut = 30000) |
| public void testConcurrentReplicator() throws Exception { |
| |
| log.info("--- Starting ReplicatorTest::testConcurrentReplicator ---"); |
| |
| final String namespace = BrokerTestUtil.newUniqueName("pulsar/concurrent"); |
| admin1.namespaces().createNamespace(namespace); |
| admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); |
| final TopicName topicName = TopicName |
| .get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic")); |
| |
| @Cleanup |
| PulsarClient client1 = PulsarClient.builder() |
| .serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| Producer<byte[]> producer = client1.newProducer().topic(topicName.toString()) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| producer.close(); |
| |
| PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService() |
| .getOrCreateTopic(topicName.toString()).get(); |
| |
| PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService() |
| .getReplicationClient("r3")); |
| final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class); |
| startRepl.setAccessible(true); |
| |
| Field replClientField = BrokerService.class.getDeclaredField("replicationClients"); |
| replClientField.setAccessible(true); |
| ConcurrentOpenHashMap<String, PulsarClient> replicationClients = |
| (ConcurrentOpenHashMap<String, PulsarClient>) replClientField |
| .get(pulsar1.getBrokerService()); |
| replicationClients.put("r3", pulsarClient); |
| |
| admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); |
| ExecutorService executor = Executors.newFixedThreadPool(5); |
| for (int i = 0; i < 5; i++) { |
| executor.submit(() -> { |
| try { |
| startRepl.invoke(topic, "r3"); |
| } catch (Exception e) { |
| fail("setting replicator failed", e); |
| } |
| }); |
| } |
| Thread.sleep(3000); |
| |
| Mockito.verify(pulsarClient, Mockito.times(1)) |
| .createProducerAsync( |
| Mockito.any(ProducerConfigurationData.class), |
| Mockito.any(Schema.class), eq(null)); |
| |
| executor.shutdown(); |
| } |
| |
| @DataProvider(name = "namespace") |
| public Object[][] namespaceNameProvider() { |
| return new Object[][] { { "pulsar/ns" }, { "pulsar/global/ns" } }; |
| } |
| |
| @Test(dataProvider = "namespace") |
| public void testReplication(String namespace) throws Exception { |
| log.info("--- Starting ReplicatorTest::testReplication ---"); |
| |
| // This test is to verify that the config change on global namespace is successfully applied in broker during |
| // runtime. |
| // Run a set of producer tasks to create the topics |
| final TopicName dest = TopicName |
| .get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/repltopic")); |
| |
| @Cleanup |
| MessageProducer producer1 = new MessageProducer(url1, dest); |
| log.info("--- Starting producer --- " + url1); |
| |
| @Cleanup |
| MessageProducer producer2 = new MessageProducer(url2, dest); |
| log.info("--- Starting producer --- " + url2); |
| |
| @Cleanup |
| MessageProducer producer3 = new MessageProducer(url3, dest); |
| log.info("--- Starting producer --- " + url3); |
| |
| @Cleanup |
| MessageConsumer consumer1 = new MessageConsumer(url1, dest); |
| log.info("--- Starting Consumer --- " + url1); |
| |
| @Cleanup |
| MessageConsumer consumer2 = new MessageConsumer(url2, dest); |
| log.info("--- Starting Consumer --- " + url2); |
| |
| @Cleanup |
| MessageConsumer consumer3 = new MessageConsumer(url3, dest); |
| log.info("--- Starting Consumer --- " + url3); |
| |
| // Produce from cluster1 and consume from the rest |
| producer1.produce(2); |
| |
| consumer1.receive(2); |
| |
| consumer2.receive(2); |
| |
| consumer3.receive(2); |
| |
| // Produce from cluster2 and consume from the rest |
| producer2.produce(2); |
| |
| consumer1.receive(2); |
| |
| consumer2.receive(2); |
| |
| consumer3.receive(2); |
| |
| // Produce from cluster3 and consume from the rest |
| producer3.produce(2); |
| |
| consumer1.receive(2); |
| |
| consumer2.receive(2); |
| |
| consumer3.receive(2); |
| |
| // Produce from cluster1&2 and consume from cluster3 |
| producer1.produce(1); |
| producer2.produce(1); |
| |
| consumer1.receive(1); |
| |
| consumer2.receive(1); |
| |
| consumer3.receive(1); |
| |
| consumer1.receive(1); |
| |
| consumer2.receive(1); |
| |
| consumer3.receive(1); |
| } |
| |
| @Test |
| public void testReplicationOverrides() throws Exception { |
| log.info("--- Starting ReplicatorTest::testReplicationOverrides ---"); |
| |
| // This test is to verify that the config change on global namespace is successfully applied in broker during |
| // runtime. |
| // Run a set of producer tasks to create the topics |
| for (int i = 0; i < 10; i++) { |
| final TopicName dest = TopicName |
| .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopic")); |
| |
| @Cleanup |
| MessageProducer producer1 = new MessageProducer(url1, dest); |
| log.info("--- Starting producer --- " + url1); |
| |
| @Cleanup |
| MessageProducer producer2 = new MessageProducer(url2, dest); |
| log.info("--- Starting producer --- " + url2); |
| |
| @Cleanup |
| MessageProducer producer3 = new MessageProducer(url3, dest); |
| log.info("--- Starting producer --- " + url3); |
| |
| @Cleanup |
| MessageConsumer consumer1 = new MessageConsumer(url1, dest); |
| log.info("--- Starting Consumer --- " + url1); |
| |
| @Cleanup |
| MessageConsumer consumer2 = new MessageConsumer(url2, dest); |
| log.info("--- Starting Consumer --- " + url2); |
| |
| @Cleanup |
| MessageConsumer consumer3 = new MessageConsumer(url3, dest); |
| log.info("--- Starting Consumer --- " + url3); |
| |
| // Produce a message that isn't replicated |
| producer1.produce(1, producer1.newMessage().disableReplication()); |
| |
| consumer1.receive(1); |
| assertTrue(consumer2.drained()); |
| assertTrue(consumer3.drained()); |
| |
| // Produce a message not replicated to r2 |
| producer1.produce(1, producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3"))); |
| consumer1.receive(1); |
| assertTrue(consumer2.drained()); |
| consumer3.receive(1); |
| |
| // Produce a default replicated message |
| producer1.produce(1); |
| |
| consumer1.receive(1); |
| consumer2.receive(1); |
| consumer3.receive(1); |
| |
| assertTrue(consumer1.drained()); |
| assertTrue(consumer2.drained()); |
| assertTrue(consumer3.drained()); |
| } |
| } |
| |
| @Test |
| public void testFailures() { |
| |
| log.info("--- Starting ReplicatorTest::testFailures ---"); |
| |
| try { |
| // 1. Create a consumer using the reserved consumer id prefix "pulsar.repl." |
| |
| final TopicName dest = TopicName |
| .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/res-cons-id-")); |
| |
| // Create another consumer using replication prefix as sub id |
| MessageConsumer consumer = new MessageConsumer(url2, dest, "pulsar.repl."); |
| consumer.close(); |
| |
| } catch (Exception e) { |
| // SUCCESS |
| } |
| |
| } |
| |
| @Test(timeOut = 30000) |
| public void testReplicatePeekAndSkip() throws Exception { |
| |
| final TopicName dest = TopicName.get( |
| BrokerTestUtil.newUniqueName("persistent://pulsar/ns/peekAndSeekTopic")); |
| |
| @Cleanup |
| MessageProducer producer1 = new MessageProducer(url1, dest); |
| |
| @Cleanup |
| MessageConsumer consumer1 = new MessageConsumer(url3, dest); |
| |
| // Produce from cluster1 and consume from the rest |
| producer1.produce(2); |
| PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); |
| PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators() |
| .get(topic.getReplicators().keys().get(0)); |
| replicator.skipMessages(2); |
| CompletableFuture<Entry> result = replicator.peekNthMessage(1); |
| Entry entry = result.get(50, TimeUnit.MILLISECONDS); |
| assertNull(entry); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testReplicatorClearBacklog() throws Exception { |
| |
| // This test is to verify that reset cursor fails on global topic |
| SortedSet<String> testDests = new TreeSet<>(); |
| |
| final TopicName dest = TopicName |
| .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic")); |
| testDests.add(dest.toString()); |
| |
| @Cleanup |
| MessageProducer producer1 = new MessageProducer(url1, dest); |
| |
| @Cleanup |
| MessageConsumer consumer1 = new MessageConsumer(url3, dest); |
| |
| // Produce from cluster1 and consume from the rest |
| producer1.produce(2); |
| PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); |
| PersistentReplicator replicator = (PersistentReplicator) spy( |
| topic.getReplicators().get(topic.getReplicators().keys().get(0))); |
| replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); |
| replicator.clearBacklog().get(); |
| Thread.sleep(100); |
| replicator.updateRates(); // for code-coverage |
| replicator.expireMessages(1); // for code-coverage |
| ReplicatorStats status = replicator.getStats(); |
| assertEquals(status.replicationBacklog, 0); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testResetCursorNotFail() throws Exception { |
| |
| log.info("--- Starting ReplicatorTest::testResetCursorNotFail ---"); |
| |
| // This test is to verify that reset cursor fails on global topic |
| final TopicName dest = TopicName |
| .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetrepltopic")); |
| |
| @Cleanup |
| MessageProducer producer1 = new MessageProducer(url1, dest); |
| log.info("--- Starting producer --- " + url1); |
| |
| @Cleanup |
| MessageConsumer consumer1 = new MessageConsumer(url1, dest); |
| log.info("--- Starting Consumer --- " + url1); |
| |
| // Produce from cluster1 and consume from the rest |
| producer1.produce(2); |
| |
| consumer1.receive(2); |
| |
| admin1.topics().resetCursor(dest.toString(), "sub-id", System.currentTimeMillis()); |
| } |
| |
| @Test |
| public void testReplicationForBatchMessages() throws Exception { |
| log.info("--- Starting ReplicatorTest::testReplicationForBatchMessages ---"); |
| |
| // Run a set of producer tasks to create the topics |
| final TopicName dest = TopicName |
| .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch")); |
| |
| @Cleanup |
| MessageProducer producer1 = new MessageProducer(url1, dest, true); |
| log.info("--- Starting producer --- " + url1); |
| |
| @Cleanup |
| MessageProducer producer2 = new MessageProducer(url2, dest, true); |
| log.info("--- Starting producer --- " + url2); |
| |
| @Cleanup |
| MessageProducer producer3 = new MessageProducer(url3, dest, true); |
| log.info("--- Starting producer --- " + url3); |
| |
| @Cleanup |
| MessageConsumer consumer1 = new MessageConsumer(url1, dest); |
| log.info("--- Starting Consumer --- " + url1); |
| |
| @Cleanup |
| MessageConsumer consumer2 = new MessageConsumer(url2, dest); |
| log.info("--- Starting Consumer --- " + url2); |
| |
| @Cleanup |
| MessageConsumer consumer3 = new MessageConsumer(url3, dest); |
| log.info("--- Starting Consumer --- " + url3); |
| |
| // Produce from cluster1 and consume from the rest |
| producer1.produceBatch(10); |
| |
| consumer1.receive(10); |
| |
| consumer2.receive(10); |
| |
| consumer3.receive(10); |
| |
| // Produce from cluster2 and consume from the rest |
| producer2.produceBatch(10); |
| |
| consumer1.receive(10); |
| |
| consumer2.receive(10); |
| |
| consumer3.receive(10); |
| } |
| |
| /** |
| * It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and |
| * it should have cleaned up from the list |
| * |
| * @throws Exception |
| */ |
| @Test(timeOut = 30000) |
| public void testDeleteReplicatorFailure() throws Exception { |
| log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---"); |
| final String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch"); |
| final TopicName dest = TopicName.get(topicName); |
| |
| @Cleanup |
| MessageProducer producer1 = new MessageProducer(url1, dest); |
| PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get(); |
| final String replicatorClusterName = topic.getReplicators().keys().get(0); |
| ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); |
| CountDownLatch latch = new CountDownLatch(1); |
| // delete cursor already : so next time if topic.removeReplicator will get exception but then it should |
| // remove-replicator from the list even with failure |
| ledger.asyncDeleteCursor("pulsar.repl." + replicatorClusterName, new DeleteCursorCallback() { |
| @Override |
| public void deleteCursorComplete(Object ctx) { |
| latch.countDown(); |
| } |
| |
| @Override |
| public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { |
| latch.countDown(); |
| } |
| }, null); |
| latch.await(); |
| |
| Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class); |
| removeReplicator.setAccessible(true); |
| // invoke removeReplicator : it fails as cursor is not present: but still it should remove the replicator from |
| // list without restarting it |
| @SuppressWarnings("unchecked") |
| CompletableFuture<Void> result = (CompletableFuture<Void>) removeReplicator.invoke(topic, |
| replicatorClusterName); |
| result.thenApply((v) -> { |
| assertNull(topic.getPersistentReplicator(replicatorClusterName)); |
| return null; |
| }); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Test(priority = 5, timeOut = 30000) |
| public void testReplicatorProducerClosing() throws Exception { |
| log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---"); |
| final String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch"); |
| final TopicName dest = TopicName.get(topicName); |
| |
| @Cleanup |
| MessageProducer producer1 = new MessageProducer(url1, dest); |
| PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get(); |
| final String replicatorClusterName = topic.getReplicators().keys().get(0); |
| Replicator replicator = topic.getPersistentReplicator(replicatorClusterName); |
| pulsar2.close(); |
| pulsar2 = null; |
| pulsar3.close(); |
| pulsar3 = null; |
| replicator.disconnect(false); |
| Thread.sleep(100); |
| Field field = AbstractReplicator.class.getDeclaredField("producer"); |
| field.setAccessible(true); |
| ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) field.get(replicator); |
| assertNull(producer); |
| } |
| |
| /** |
| * Issue #199 |
| * |
| * It verifies that: if the remote cluster reaches backlog quota limit, replicator temporarily stops and once the |
| * backlog drains it should resume replication. |
| * |
| * @throws Exception |
| */ |
| |
| @Test(timeOut = 60000, priority = -1) |
| public void testResumptionAfterBacklogRelaxed() throws Exception { |
| List<RetentionPolicy> policies = Lists.newArrayList(); |
| policies.add(RetentionPolicy.producer_exception); |
| policies.add(RetentionPolicy.producer_request_hold); |
| |
| for (RetentionPolicy policy : policies) { |
| // Use 1Mb quota by default |
| admin1.namespaces().setBacklogQuota("pulsar/ns1", new BacklogQuota(1 * 1024 * 1024, policy)); |
| Thread.sleep(200); |
| |
| TopicName dest = TopicName |
| .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/%s-" + policy)); |
| |
| // Producer on r1 |
| @Cleanup |
| MessageProducer producer1 = new MessageProducer(url1, dest); |
| |
| // Consumer on r2 |
| @Cleanup |
| MessageConsumer consumer2 = new MessageConsumer(url2, dest); |
| |
| // Replicator for r1 -> r2 |
| PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); |
| Replicator replicator = topic.getPersistentReplicator("r2"); |
| |
| // Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of |
| // local backlog |
| producer1.produce(1); |
| |
| Thread.sleep(500); |
| |
| // Restrict backlog quota limit to 1 byte to stop replication |
| admin1.namespaces().setBacklogQuota("pulsar/ns1", new BacklogQuota(1, policy)); |
| |
| Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
| |
| assertEquals(replicator.getStats().replicationBacklog, 0); |
| |
| // Next message will not be replicated, because r2 has reached the quota |
| producer1.produce(1); |
| |
| Thread.sleep(500); |
| |
| assertEquals(replicator.getStats().replicationBacklog, 1); |
| |
| // Consumer will now drain 1 message and the replication backlog will be cleared |
| consumer2.receive(1); |
| |
| // Wait until the 2nd message got delivered to consumer |
| consumer2.receive(1); |
| |
| int retry = 10; |
| for (int i = 0; i < retry && replicator.getStats().replicationBacklog > 0; i++) { |
| if (i != retry - 1) { |
| Thread.sleep(100); |
| } |
| } |
| |
| assertEquals(replicator.getStats().replicationBacklog, 0); |
| } |
| } |
| |
| /** |
| * It verifies that PersistentReplicator considers CursorAlreadyClosedException as non-retriable-read exception and |
| * it should closed the producer as cursor is already closed because replicator is already deleted. |
| * |
| * @throws Exception |
| */ |
| @Test(timeOut = 15000) |
| public void testCloseReplicatorStartProducer() throws Exception { |
| TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor")); |
| // Producer on r1 |
| @Cleanup |
| MessageProducer producer1 = new MessageProducer(url1, dest); |
| // Consumer on r1 |
| @Cleanup |
| MessageConsumer consumer1 = new MessageConsumer(url1, dest); |
| // Consumer on r2 |
| @Cleanup |
| MessageConsumer consumer2 = new MessageConsumer(url2, dest); |
| |
| // Replicator for r1 -> r2 |
| PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); |
| PersistentReplicator replicator = (PersistentReplicator) topic.getPersistentReplicator("r2"); |
| |
| // close the cursor |
| Field cursorField = PersistentReplicator.class.getDeclaredField("cursor"); |
| cursorField.setAccessible(true); |
| ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator); |
| cursor.close(); |
| // try to read entries |
| producer1.produce(10); |
| |
| try { |
| cursor.readEntriesOrWait(10); |
| fail("It should have failed"); |
| } catch (Exception e) { |
| assertEquals(e.getClass(), CursorAlreadyClosedException.class); |
| } |
| |
| // replicator-readException: cursorAlreadyClosed |
| replicator.readEntriesFailed(new CursorAlreadyClosedException("Cursor already closed exception"), null); |
| |
| // wait replicator producer to be closed |
| Thread.sleep(100); |
| |
| // Replicator producer must be closed |
| Field producerField = AbstractReplicator.class.getDeclaredField("producer"); |
| producerField.setAccessible(true); |
| @SuppressWarnings("unchecked") |
| ProducerImpl<byte[]> replicatorProducer = (ProducerImpl<byte[]>) producerField.get(replicator); |
| assertNull(replicatorProducer); |
| } |
| |
| @Test(timeOut = 30000) |
| public void verifyChecksumAfterReplication() throws Exception { |
| final String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/checksumAfterReplication"); |
| |
| PulsarClient c1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); |
| Producer<byte[]> p1 = c1.newProducer().topic(topicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| PulsarClient c2 = PulsarClient.builder().serviceUrl(url2.toString()).build(); |
| RawReader reader2 = RawReader.create(c2, topicName, "sub").get(); |
| |
| p1.send("Hello".getBytes()); |
| |
| RawMessage msg = reader2.readNextAsync().get(); |
| |
| ByteBuf b = msg.getHeadersAndPayload(); |
| |
| assertTrue(Commands.hasChecksum(b)); |
| int parsedChecksum = Commands.readChecksum(b); |
| int computedChecksum = Crc32cIntChecksum.computeChecksum(b); |
| |
| assertEquals(parsedChecksum, computedChecksum); |
| |
| p1.close(); |
| reader2.closeAsync().get(); |
| } |
| |
| /** |
| * It verifies that broker should not start replicator for partitioned-topic (topic without -partition postfix) |
| * |
| * @param isPartitionedTopic |
| * @throws Exception |
| */ |
| @Test(dataProvider = "partitionedTopic") |
| public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws Exception { |
| |
| log.info("--- Starting ReplicatorTest::{} --- ", methodName); |
| |
| final String namespace = BrokerTestUtil.newUniqueName("pulsar/partitionedNs-" + isPartitionedTopic); |
| final String persistentTopicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/partTopic-" + isPartitionedTopic); |
| final String nonPersistentTopicName = BrokerTestUtil.newUniqueName("non-persistent://" + namespace + "/partTopic-" + isPartitionedTopic); |
| BrokerService brokerService = pulsar1.getBrokerService(); |
| |
| admin1.namespaces().createNamespace(namespace); |
| admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); |
| |
| if (isPartitionedTopic) { |
| admin1.topics().createPartitionedTopic(persistentTopicName, 5); |
| admin1.topics().createPartitionedTopic(nonPersistentTopicName, 5); |
| } |
| |
| // load namespace with dummy topic on ns |
| PulsarClient client = PulsarClient.builder().serviceUrl(url1.toString()).build(); |
| client.newProducer().topic("persistent://" + namespace + "/dummyTopic") |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| // persistent topic test |
| try { |
| brokerService.getOrCreateTopic(persistentTopicName).get(); |
| if (isPartitionedTopic) { |
| fail("Topic creation fails with partitioned topic as replicator init fails"); |
| } |
| } catch (Exception e) { |
| if (!isPartitionedTopic) { |
| fail("Topic creation should not fail without any partitioned topic"); |
| } |
| assertTrue(e.getCause() instanceof NamingException); |
| } |
| |
| // non-persistent topic test |
| try { |
| brokerService.getOrCreateTopic(nonPersistentTopicName).get(); |
| if (isPartitionedTopic) { |
| fail("Topic creation fails with partitioned topic as replicator init fails"); |
| } |
| } catch (Exception e) { |
| if (!isPartitionedTopic) { |
| fail("Topic creation should not fail without any partitioned topic"); |
| } |
| assertTrue(e.getCause() instanceof NamingException); |
| } |
| |
| } |
| |
| @Test |
| public void testReplicatedCluster() throws Exception { |
| |
| log.info("--- Starting ReplicatorTest::testReplicatedCluster ---"); |
| |
| final String namespace = "pulsar/global/repl"; |
| final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1"); |
| admin1.namespaces().createNamespace(namespace); |
| admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); |
| admin1.topics().createPartitionedTopic(topicName, 4); |
| |
| PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| |
| Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create(); |
| org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); |
| org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); |
| byte[] value = "test".getBytes(); |
| |
| // publish message local only |
| TypedMessageBuilder<byte[]> msg = producer1.newMessage().replicationClusters(Lists.newArrayList("r1")).value(value); |
| msg.send(); |
| assertEquals(consumer1.receive().getValue(), value); |
| |
| Message<byte[]> msg2 = consumer2.receive(1, TimeUnit.SECONDS); |
| if (msg2 != null) { |
| fail("msg should have not been replicated to remote cluster"); |
| } |
| |
| consumer1.close(); |
| consumer2.close(); |
| producer1.close(); |
| |
| } |
| |
| /** |
| * This validates that broker supports update-partition api for global topics. |
| * <pre> |
| * 1. Create global topic with 4 partitions |
| * 2. Update partition with 8 partitions |
| * 3. Create producer on the partition topic which loads all new partitions |
| * 4. Check subscriptions are created on all new partitions. |
| * </pre> |
| * @throws Exception |
| */ |
| @Test |
| public void testUpdateGlobalTopicPartition() throws Exception { |
| log.info("--- Starting ReplicatorTest::testUpdateGlobalTopicPartition ---"); |
| |
| final String cluster1 = pulsar1.getConfig().getClusterName(); |
| final String cluster2 = pulsar2.getConfig().getClusterName(); |
| final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns"); |
| final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1"); |
| int startPartitions = 4; |
| int newPartitions = 8; |
| final String subscriberName = "sub1"; |
| admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2)); |
| admin1.topics().createPartitionedTopic(topicName, startPartitions); |
| |
| PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| |
| Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subscriberName) |
| .subscribe(); |
| Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subscriberName) |
| .subscribe(); |
| |
| admin1.topics().updatePartitionedTopic(topicName, newPartitions); |
| |
| assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, newPartitions); |
| |
| // create producers to load all the partition topics |
| Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create(); |
| Producer<byte[]> producer2 = client2.newProducer().topic(topicName).create(); |
| |
| for (int i = startPartitions; i < newPartitions; i++) { |
| String partitionedTopic = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX + i; |
| assertEquals(admin1.topics().getSubscriptions(partitionedTopic).size(), 1); |
| assertEquals(admin2.topics().getSubscriptions(partitionedTopic).size(), 1); |
| } |
| |
| producer1.close(); |
| producer2.close(); |
| consumer1.close(); |
| consumer2.close(); |
| |
| client1.close(); |
| client2.close(); |
| } |
| |
| @DataProvider(name = "topicPrefix") |
| public static Object[][] topicPrefix() { |
| return new Object[][] { { "persistent://" , "/persistent" }, { "non-persistent://" , "/non-persistent" } }; |
| } |
| |
| @Test(dataProvider = "topicPrefix") |
| public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topicName) throws Exception { |
| log.info("--- Starting ReplicatorTest::testTopicReplicatedAndProducerCreate ---"); |
| |
| final String cluster1 = pulsar1.getConfig().getClusterName(); |
| final String cluster2 = pulsar2.getConfig().getClusterName(); |
| final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns"); |
| final String partitionedTopicName = BrokerTestUtil.newUniqueName(topicPrefix + namespace + topicName + "-partitioned"); |
| final String nonPartitionedTopicName = BrokerTestUtil.newUniqueName(topicPrefix + namespace + topicName + "-non-partitioned"); |
| final int startPartitions = 4; |
| admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2)); |
| admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); |
| admin1.topics().createPartitionedTopic(partitionedTopicName, startPartitions); |
| admin1.topics().createNonPartitionedTopic(nonPartitionedTopicName); |
| |
| PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| |
| //persistent topic |
| Producer<byte[]> persistentProducer1 = client1.newProducer().topic(partitionedTopicName).create(); |
| Producer<byte[]> persistentProducer2 = client2.newProducer().topic(partitionedTopicName).create(); |
| assertNotNull(persistentProducer1.send("test".getBytes())); |
| assertNotNull(persistentProducer2.send("test".getBytes())); |
| //non-persistent topic |
| Producer<byte[]> nonPersistentProducer1 = client1.newProducer().topic(nonPartitionedTopicName).create(); |
| Producer<byte[]> nonPersistentProducer2 = client2.newProducer().topic(nonPartitionedTopicName).create(); |
| |
| assertNotNull(nonPersistentProducer1.send("test".getBytes())); |
| assertNotNull(nonPersistentProducer2.send("test".getBytes())); |
| |
| persistentProducer1.close(); |
| persistentProducer2.close(); |
| |
| nonPersistentProducer1.close(); |
| nonPersistentProducer2.close(); |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); |
| |
| } |