| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.client.api; |
| |
| import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; |
| import static org.mockito.Mockito.spy; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| |
| import com.google.common.base.Function; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import java.lang.reflect.Method; |
| import java.net.URL; |
| import java.util.Collections; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicLong; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.pulsar.broker.PulsarService; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; |
| import org.apache.pulsar.client.admin.BrokerStats; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.impl.BatchMessageIdImpl; |
| import org.apache.pulsar.client.impl.MessageIdImpl; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.RetentionPolicies; |
| import org.apache.pulsar.common.policies.data.TenantInfo; |
| import org.apache.pulsar.common.policies.data.TopicStats; |
| import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| @Slf4j |
| @Test(groups = "quarantine") |
| public class ClientDeduplicationFailureTest { |
| LocalBookkeeperEnsemble bkEnsemble; |
| |
| ServiceConfiguration config; |
| URL url; |
| PulsarService pulsar; |
| PulsarAdmin admin; |
| PulsarClient pulsarClient; |
| BrokerStats brokerStatsClient; |
| final String tenant = "external-repl-prop"; |
| String primaryHost; |
| |
| @BeforeMethod(timeOut = 300000, alwaysRun = true) |
| void setup(Method method) throws Exception { |
| log.info("--- Setting up method {} ---", method.getName()); |
| |
| // Start local bookkeeper ensemble |
| bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); |
| bkEnsemble.start(); |
| |
| config = spy(ServiceConfiguration.class); |
| config.setClusterName("use"); |
| config.setWebServicePort(Optional.of(0)); |
| config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); |
| config.setBrokerShutdownTimeoutMs(0L); |
| config.setBrokerServicePort(Optional.of(0)); |
| config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); |
| config.setTlsAllowInsecureConnection(true); |
| config.setAdvertisedAddress("localhost"); |
| config.setLoadBalancerSheddingEnabled(false); |
| config.setLoadBalancerAutoBundleSplitEnabled(false); |
| config.setLoadBalancerEnabled(false); |
| config.setLoadBalancerAutoUnloadSplitBundlesEnabled(false); |
| |
| config.setAllowAutoTopicCreationType("non-partitioned"); |
| |
| |
| pulsar = new PulsarService(config); |
| pulsar.start(); |
| |
| String brokerServiceUrl = pulsar.getWebServiceAddress(); |
| url = new URL(brokerServiceUrl); |
| |
| admin = PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).build(); |
| |
| brokerStatsClient = admin.brokerStats(); |
| primaryHost = pulsar.getWebServiceAddress(); |
| |
| // update cluster metadata |
| ClusterData clusterData = ClusterData.builder().serviceUrl(url.toString()).build(); |
| admin.clusters().createCluster(config.getClusterName(), clusterData); |
| |
| if (pulsarClient != null) { |
| pulsarClient.shutdown(); |
| } |
| ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).maxBackoffInterval(1, TimeUnit.SECONDS); |
| pulsarClient = clientBuilder.build(); |
| |
| TenantInfo tenantInfo = TenantInfo.builder() |
| .allowedClusters(Collections.singleton("use")) |
| .build(); |
| admin.tenants().createTenant(tenant, tenantInfo); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| void shutdown() throws Exception { |
| log.info("--- Shutting down ---"); |
| pulsarClient.close(); |
| admin.close(); |
| pulsar.close(); |
| bkEnsemble.stop(); |
| } |
| |
| private static class ProducerThread implements Runnable { |
| |
| private volatile boolean isRunning = false; |
| private Thread thread; |
| private Producer<String> producer; |
| private long i = 1; |
| private final AtomicLong atomicLong = new AtomicLong(0); |
| private CompletableFuture<MessageId> lastMessageFuture; |
| |
| public ProducerThread(Producer<String> producer) { |
| this.thread = new Thread(this); |
| this.producer = producer; |
| } |
| |
| @Override |
| public void run() { |
| while(isRunning) { |
| lastMessageFuture = producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync(); |
| lastMessageFuture.thenAccept(messageId -> { |
| atomicLong.incrementAndGet(); |
| |
| }).exceptionally(ex -> { |
| log.info("publish exception:", ex); |
| return null; |
| }); |
| i++; |
| } |
| log.info("done Producing! Last send: {}", i); |
| } |
| |
| public void start() { |
| this.isRunning = true; |
| this.thread.start(); |
| } |
| |
| public void stop() { |
| this.isRunning = false; |
| try { |
| log.info("Waiting for last message to complete"); |
| try { |
| this.lastMessageFuture.get(60, TimeUnit.SECONDS); |
| } catch (TimeoutException e) { |
| throw new RuntimeException("Last message hasn't completed within timeout!"); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| log.info("Producer Thread stopped!"); |
| } |
| |
| public long getLastSeqId() { |
| return this.atomicLong.get(); |
| } |
| } |
| |
| @Test(timeOut = 300000, groups = "quarantine") |
| public void testClientDeduplicationCorrectnessWithFailure() throws Exception { |
| final String namespacePortion = "dedup"; |
| final String replNamespace = tenant + "/" + namespacePortion; |
| final String sourceTopic = "persistent://" + replNamespace + "/my-topic1"; |
| admin.namespaces().createNamespace(replNamespace); |
| Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); |
| admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); |
| admin.namespaces().setDeduplicationStatus(replNamespace, true); |
| admin.namespaces().setRetention(replNamespace, new RetentionPolicies(-1, -1)); |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| .blockIfQueueFull(true).sendTimeout(0, TimeUnit.SECONDS) |
| .topic(sourceTopic) |
| .producerName("test-producer-1") |
| .create(); |
| |
| |
| ProducerThread producerThread = new ProducerThread(producer); |
| producerThread.start(); |
| |
| retryStrategically((test) -> { |
| try { |
| TopicStats topicStats = admin.topics().getStats(sourceTopic); |
| return topicStats.getPublishers().size() == 1 && topicStats.getPublishers().get(0).getProducerName().equals("test-producer-1") && topicStats.getStorageSize() > 0; |
| } catch (PulsarAdminException e) { |
| return false; |
| } |
| }, 5, 200); |
| |
| TopicStats topicStats = admin.topics().getStats(sourceTopic); |
| assertEquals(topicStats.getPublishers().size(), 1); |
| assertEquals(topicStats.getPublishers().get(0).getProducerName(), "test-producer-1"); |
| assertTrue(topicStats.getStorageSize() > 0); |
| |
| for (int i = 0; i < 5; i++) { |
| log.info("Stopping BK..."); |
| bkEnsemble.stopBK(); |
| |
| Thread.sleep(1000 + new Random().nextInt(500)); |
| |
| log.info("Starting BK..."); |
| bkEnsemble.startBK(); |
| } |
| |
| producerThread.stop(); |
| |
| // send last message |
| producer.newMessage().sequenceId(producerThread.getLastSeqId() + 1).value("end").send(); |
| producer.close(); |
| |
| Reader<String> reader = pulsarClient.newReader(Schema.STRING).startMessageId(MessageId.earliest) |
| .topic(sourceTopic).create(); |
| Message<String> prevMessage = null; |
| Message<String> message = null; |
| int count = 0; |
| while(true) { |
| message = reader.readNext(5, TimeUnit.SECONDS); |
| if (message == null) { |
| break; |
| } |
| |
| if (message.getValue().equals("end")) { |
| log.info("Last seq Id received: {}", prevMessage.getSequenceId()); |
| break; |
| } |
| if (prevMessage == null) { |
| assertEquals(message.getSequenceId(), 1); |
| } else { |
| assertEquals(message.getSequenceId(), prevMessage.getSequenceId() + 1); |
| } |
| prevMessage = message; |
| count++; |
| } |
| |
| log.info("# of messages read: {}", count); |
| |
| assertNotNull(prevMessage); |
| assertEquals(prevMessage.getSequenceId(), producerThread.getLastSeqId()); |
| } |
| |
| @Test(timeOut = 300000) |
| public void testClientDeduplicationWithBkFailure() throws Exception { |
| final String namespacePortion = "dedup"; |
| final String replNamespace = tenant + "/" + namespacePortion; |
| final String sourceTopic = "persistent://" + replNamespace + "/my-topic1"; |
| final String subscriptionName1 = "sub1"; |
| final String subscriptionName2 = "sub2"; |
| final String consumerName1 = "test-consumer-1"; |
| final String consumerName2 = "test-consumer-2"; |
| final List<Message<String>> msgRecvd = new LinkedList<>(); |
| admin.namespaces().createNamespace(replNamespace); |
| Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); |
| admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); |
| admin.namespaces().setDeduplicationStatus(replNamespace, true); |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic) |
| .producerName("test-producer-1").create(); |
| Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(sourceTopic) |
| .consumerName(consumerName1).subscriptionName(subscriptionName1).subscribe(); |
| Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(sourceTopic) |
| .consumerName(consumerName2).subscriptionName(subscriptionName2).subscribe(); |
| |
| Thread thread = new Thread(() -> { |
| while(true) { |
| try { |
| Message<String> msg = consumer2.receive(); |
| msgRecvd.add(msg); |
| consumer2.acknowledge(msg); |
| } catch (PulsarClientException e) { |
| log.error("Failed to consume message: {}", e, e); |
| break; |
| } |
| } |
| }); |
| thread.start(); |
| |
| retryStrategically((test) -> { |
| try { |
| TopicStats topicStats = admin.topics().getStats(sourceTopic); |
| boolean c1 = topicStats!= null |
| && topicStats.getSubscriptions().get(subscriptionName1) != null |
| && topicStats.getSubscriptions().get(subscriptionName1).getConsumers().size() == 1 |
| && topicStats.getSubscriptions().get(subscriptionName1).getConsumers().get(0).getConsumerName().equals(consumerName1); |
| |
| boolean c2 = topicStats!= null |
| && topicStats.getSubscriptions().get(subscriptionName2) != null |
| && topicStats.getSubscriptions().get(subscriptionName2).getConsumers().size() == 1 |
| && topicStats.getSubscriptions().get(subscriptionName2).getConsumers().get(0).getConsumerName().equals(consumerName2); |
| return c1 && c2; |
| } catch (PulsarAdminException e) { |
| return false; |
| } |
| }, 5, 200); |
| |
| TopicStats topicStats1 = admin.topics().getStats(sourceTopic); |
| assertNotNull(topicStats1); |
| assertNotNull(topicStats1.getSubscriptions().get(subscriptionName1)); |
| assertEquals(topicStats1.getSubscriptions().get(subscriptionName1).getConsumers().size(), 1); |
| assertEquals(topicStats1.getSubscriptions().get(subscriptionName1).getConsumers().get(0).getConsumerName(), consumerName1); |
| TopicStats topicStats2 = admin.topics().getStats(sourceTopic); |
| assertNotNull(topicStats2); |
| assertNotNull(topicStats2.getSubscriptions().get(subscriptionName2)); |
| assertEquals(topicStats2.getSubscriptions().get(subscriptionName2).getConsumers().size(), 1); |
| assertEquals(topicStats2.getSubscriptions().get(subscriptionName2).getConsumers().get(0).getConsumerName(), consumerName2); |
| |
| for (int i=0; i<10; i++) { |
| producer.newMessage().sequenceId(i).value("foo-" + i).send(); |
| } |
| |
| for (int i=0; i<10; i++) { |
| Message<String> msg = consumer1.receive(); |
| consumer1.acknowledge(msg); |
| assertEquals(msg.getValue(), "foo-" + i); |
| assertEquals(msg.getSequenceId(), i); |
| } |
| |
| log.info("Stopping BK..."); |
| bkEnsemble.stopBK(); |
| |
| List<CompletableFuture<MessageId>> futures = new LinkedList<>(); |
| for (int i=10; i<20; i++) { |
| CompletableFuture<MessageId> future = producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync(); |
| int finalI = i; |
| future.thenRun(() -> log.error("message: {} successful", finalI)).exceptionally((Function<Throwable, Void>) throwable -> { |
| log.info("message: {} failed: {}", finalI, throwable, throwable); |
| return null; |
| }); |
| futures.add(future); |
| } |
| |
| for (int i = 0; i < futures.size(); i++) { |
| try { |
| // message should not be produced successfully |
| futures.get(i).join(); |
| fail(); |
| } catch (CompletionException ex) { |
| |
| } catch (Exception e) { |
| fail(); |
| } |
| } |
| |
| try { |
| producer.newMessage().sequenceId(10).value("foo-10").send(); |
| fail(); |
| } catch (PulsarClientException ex) { |
| |
| } |
| |
| try { |
| producer.newMessage().sequenceId(10).value("foo-10").send(); |
| fail(); |
| } catch (PulsarClientException ex) { |
| |
| } |
| |
| log.info("Starting BK..."); |
| bkEnsemble.startBK(); |
| |
| for (int i=20; i<30; i++) { |
| producer.newMessage().sequenceId(i).value("foo-" + i).send(); |
| } |
| |
| MessageId lastMessageId = null; |
| for (int i=20; i<30; i++) { |
| Message<String> msg = consumer1.receive(); |
| lastMessageId = msg.getMessageId(); |
| consumer1.acknowledge(msg); |
| assertEquals(msg.getValue(), "foo-" + i); |
| assertEquals(msg.getSequenceId(), i); |
| } |
| |
| // check all messages |
| retryStrategically((test) -> msgRecvd.size() >= 20, 5, 200); |
| |
| assertEquals(msgRecvd.size(), 20); |
| for (int i = 0; i < 10; i++) { |
| assertEquals(msgRecvd.get(i).getValue(), "foo-" + i); |
| assertEquals(msgRecvd.get(i).getSequenceId(), i); |
| } |
| for (int i = 10; i <20; i++) { |
| assertEquals(msgRecvd.get(i).getValue(), "foo-" + (i + 10)); |
| assertEquals(msgRecvd.get(i).getSequenceId(), i + 10); |
| } |
| |
| BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId; |
| MessageIdImpl messageId = (MessageIdImpl) consumer1.getLastMessageId(); |
| |
| assertEquals(messageId.getLedgerId(), batchMessageId.getLedgerId()); |
| assertEquals(messageId.getEntryId(), batchMessageId.getEntryId()); |
| thread.interrupt(); |
| } |
| } |