blob: 2ddb9e8c8a35a5a89e0aadc7dfdefa6bbf5238a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
 * Integration tests for producer message deduplication while the BookKeeper
 * storage layer is stopped and restarted underneath a running broker.
 *
 * <p>Each test method gets its own local BookKeeper ensemble, broker, admin
 * client and Pulsar client; they are created in {@code setup} and released in
 * {@code shutdown}.
 */
@Slf4j
@Test(groups = "quarantine")
public class ClientDeduplicationFailureTest {
// Local BookKeeper ensemble the broker stores data in; tests stop/start its bookies.
LocalBookkeeperEnsemble bkEnsemble;
// Broker configuration (a Mockito spy) populated in setup().
ServiceConfiguration config;
// Broker web service address parsed as a URL; used as the cluster's service URL.
URL url;
// Broker under test.
PulsarService pulsar;
// Admin client pointed at the broker's web service address.
PulsarAdmin admin;
// Client used by the tests to create producers, consumers and readers.
PulsarClient pulsarClient;
// Broker stats handle obtained from the admin client in setup().
BrokerStats brokerStatsClient;
// Tenant created in setup(); all test namespaces live under it.
final String tenant = "external-repl-prop";
// Broker web service address as a string, assigned in setup().
String primaryHost;
/**
 * Starts a fresh local BookKeeper ensemble and broker for the test method,
 * then creates the admin client, the Pulsar client, the cluster metadata and
 * the test tenant.
 *
 * @param method the test method about to run (only used for logging)
 * @throws Exception if any component fails to start or any admin call fails
 */
@BeforeMethod(timeOut = 300000, alwaysRun = true)
void setup(Method method) throws Exception {
    log.info("--- Setting up method {} ---", method.getName());

    // Storage layer: local bookkeeper ensemble with 3 bookies.
    bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
    bkEnsemble.start();

    // Broker configuration: ephemeral ports, load balancing features switched off.
    config = spy(ServiceConfiguration.class);
    config.setClusterName("use");
    config.setWebServicePort(Optional.of(0));
    config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
    config.setBrokerShutdownTimeoutMs(0L);
    config.setBrokerServicePort(Optional.of(0));
    config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
    config.setTlsAllowInsecureConnection(true);
    config.setAdvertisedAddress("localhost");
    config.setLoadBalancerSheddingEnabled(false);
    config.setLoadBalancerAutoBundleSplitEnabled(false);
    config.setLoadBalancerEnabled(false);
    config.setLoadBalancerAutoUnloadSplitBundlesEnabled(false);
    config.setAllowAutoTopicCreationType("non-partitioned");

    // Broker.
    pulsar = new PulsarService(config);
    pulsar.start();

    // Admin-side handles, all derived from the broker's web service address.
    final String webServiceAddress = pulsar.getWebServiceAddress();
    url = new URL(webServiceAddress);
    admin = PulsarAdmin.builder().serviceHttpUrl(webServiceAddress).build();
    brokerStatsClient = admin.brokerStats();
    primaryHost = pulsar.getWebServiceAddress();

    // Register the cluster metadata.
    admin.clusters().createCluster(
            config.getClusterName(),
            ClusterData.builder().serviceUrl(url.toString()).build());

    // Replace any client left over from a previous method.
    if (pulsarClient != null) {
        pulsarClient.shutdown();
    }
    pulsarClient = PulsarClient.builder()
            .serviceUrl(pulsar.getBrokerServiceUrl())
            .maxBackoffInterval(1, TimeUnit.SECONDS)
            .build();

    // Tenant that owns the namespaces created by the tests.
    admin.tenants().createTenant(
            tenant,
            TenantInfo.builder().allowedClusters(Collections.singleton("use")).build());
}
/**
 * Releases everything created by {@code setup}: the Pulsar client, the admin
 * client, the broker and the local BookKeeper ensemble, in that order.
 *
 * <p>Because this runs with {@code alwaysRun = true} it is also invoked when
 * {@code setup} failed partway, so every field may still be {@code null}.
 * Each resource is released in its own {@code finally} step so that a failure
 * closing one does not leak the ones after it. Fields are cleared once
 * released so a later invocation does not close them twice.
 *
 * @throws Exception the failure raised while closing a resource, if any
 */
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
    log.info("--- Shutting down ---");
    try {
        if (pulsarClient != null) {
            pulsarClient.close();
            pulsarClient = null;
        }
    } finally {
        try {
            if (admin != null) {
                admin.close();
                admin = null;
            }
        } finally {
            try {
                if (pulsar != null) {
                    pulsar.close();
                    pulsar = null;
                }
            } finally {
                if (bkEnsemble != null) {
                    bkEnsemble.stop();
                    bkEnsemble = null;
                }
            }
        }
    }
}
/**
 * Background publisher that sends {@code "foo-<n>"} messages with explicit,
 * monotonically increasing sequence ids (starting at 1) until stopped.
 *
 * <p>The send loop runs on its own thread; {@link #start()}, {@link #stop()}
 * and {@link #getLastSeqId()} are meant to be called from the test thread.
 */
private static class ProducerThread implements Runnable {
    /** How long {@link #stop()} waits for the loop to exit and for the last send to complete. */
    private static final long STOP_TIMEOUT_SECONDS = 60;

    private volatile boolean isRunning = false;
    private final Thread thread;
    private final Producer<String> producer;
    // Next sequence id to send; only touched by the producing thread.
    private long i = 1;
    // Number of sends that completed successfully.
    private final AtomicLong atomicLong = new AtomicLong(0);
    // Written by the producing thread, read by stop() on the caller's thread.
    private volatile CompletableFuture<MessageId> lastMessageFuture;

    /**
     * @param producer producer used for every send; it is not closed by this class
     */
    public ProducerThread(Producer<String> producer) {
        this.producer = producer;
        this.thread = new Thread(this);
    }

    /** Send loop: publishes asynchronously until {@link #stop()} clears the running flag. */
    @Override
    public void run() {
        while (isRunning) {
            CompletableFuture<MessageId> sendFuture =
                    producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync();
            lastMessageFuture = sendFuture;
            sendFuture.thenAccept(messageId -> {
                atomicLong.incrementAndGet();
            }).exceptionally(ex -> {
                log.info("publish exception:", ex);
                return null;
            });
            i++;
        }
        // i has already been advanced past the last id that was actually sent.
        log.info("done Producing! Last send: {}", i - 1);
    }

    /** Starts the send loop on the background thread. */
    public void start() {
        this.isRunning = true;
        this.thread.start();
    }

    /**
     * Stops the send loop and waits for the final send to complete.
     *
     * <p>The producing thread is joined first so that no further sends can be
     * issued after this method returns and so that the future waited on really
     * is the last one.
     *
     * @throws RuntimeException if the loop does not exit or the last send does
     *         not complete within the timeout, if the last send failed, or if
     *         the calling thread is interrupted while waiting
     */
    public void stop() {
        this.isRunning = false;
        try {
            this.thread.join(TimeUnit.SECONDS.toMillis(STOP_TIMEOUT_SECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for producer thread to exit", e);
        }
        if (this.thread.isAlive()) {
            throw new RuntimeException("Producer thread hasn't exited within timeout!");
        }

        log.info("Waiting for last message to complete");
        CompletableFuture<MessageId> last = this.lastMessageFuture;
        // null when the loop never sent anything (e.g. stop() right after start()).
        if (last != null) {
            try {
                last.get(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                throw new RuntimeException("Last message hasn't completed within timeout!", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        log.info("Producer Thread stopped!");
    }

    /**
     * Returns the number of sends that completed successfully so far. Since
     * sequence ids start at 1 and increase by one per send, this equals the
     * last published sequence id as long as no send has failed.
     */
    public long getLastSeqId() {
        return this.atomicLong.get();
    }
}
/**
 * Publishes continuously from a background thread while BookKeeper is stopped
 * and restarted five times, then reads the topic back from the earliest
 * position and verifies that sequence ids start at 1 and increase by exactly
 * one per message, i.e. nothing was duplicated or lost.
 *
 * @throws Exception on any admin, client or BookKeeper failure
 */
@Test(timeOut = 300000, groups = "quarantine")
public void testClientDeduplicationCorrectnessWithFailure() throws Exception {
    final String namespacePortion = "dedup";
    final String replNamespace = tenant + "/" + namespacePortion;
    final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";

    admin.namespaces().createNamespace(replNamespace);
    Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
    admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
    admin.namespaces().setDeduplicationStatus(replNamespace, true);
    // Retain everything so the reader below can replay the topic from the start.
    admin.namespaces().setRetention(replNamespace, new RetentionPolicies(-1, -1));

    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .blockIfQueueFull(true).sendTimeout(0, TimeUnit.SECONDS)
            .topic(sourceTopic)
            .producerName("test-producer-1")
            .create();

    ProducerThread producerThread = new ProducerThread(producer);
    producerThread.start();

    // Wait until the producer is registered and something has been stored.
    retryStrategically((test) -> {
        try {
            TopicStats topicStats = admin.topics().getStats(sourceTopic);
            return topicStats.getPublishers().size() == 1
                    && topicStats.getPublishers().get(0).getProducerName().equals("test-producer-1")
                    && topicStats.getStorageSize() > 0;
        } catch (PulsarAdminException e) {
            return false;
        }
    }, 5, 200);

    TopicStats topicStats = admin.topics().getStats(sourceTopic);
    assertEquals(topicStats.getPublishers().size(), 1);
    assertEquals(topicStats.getPublishers().get(0).getProducerName(), "test-producer-1");
    assertTrue(topicStats.getStorageSize() > 0);

    // Bounce BookKeeper repeatedly while the producer keeps publishing.
    final Random random = new Random();
    for (int i = 0; i < 5; i++) {
        log.info("Stopping BK...");
        bkEnsemble.stopBK();
        Thread.sleep(1000 + random.nextInt(500));
        log.info("Starting BK...");
        bkEnsemble.startBK();
    }

    producerThread.stop();

    // send last message
    producer.newMessage().sequenceId(producerThread.getLastSeqId() + 1).value("end").send();
    producer.close();

    Message<String> prevMessage = null;
    int count = 0;
    // try-with-resources: the reader is released even if an assertion below fails.
    try (Reader<String> reader = pulsarClient.newReader(Schema.STRING).startMessageId(MessageId.earliest)
            .topic(sourceTopic).create()) {
        while (true) {
            Message<String> message = reader.readNext(5, TimeUnit.SECONDS);
            if (message == null) {
                // Nothing more arrived within the read timeout.
                break;
            }
            if (message.getValue().equals("end")) {
                // prevMessage is null when the end marker is the very first message;
                // the assertNotNull after the loop reports that case.
                if (prevMessage != null) {
                    log.info("Last seq Id received: {}", prevMessage.getSequenceId());
                }
                break;
            }
            if (prevMessage == null) {
                assertEquals(message.getSequenceId(), 1);
            } else {
                assertEquals(message.getSequenceId(), prevMessage.getSequenceId() + 1);
            }
            prevMessage = message;
            count++;
        }
    }

    log.info("# of messages read: {}", count);
    assertNotNull(prevMessage);
    assertEquals(prevMessage.getSequenceId(), producerThread.getLastSeqId());
}
/**
 * Verifies deduplication bookkeeping across a BookKeeper outage:
 * <ol>
 *   <li>messages 0-9 are published and consumed normally;</li>
 *   <li>with BookKeeper stopped, messages 10-19 (and two retries of 10) must all fail;</li>
 *   <li>after BookKeeper is restarted, messages 20-29 are published and consumed;</li>
 *   <li>a second subscription, drained by a background thread, must have seen
 *       exactly 0-9 followed by 20-29.</li>
 * </ol>
 *
 * @throws Exception on any admin, client or BookKeeper failure
 */
@Test(timeOut = 300000)
public void testClientDeduplicationWithBkFailure() throws Exception {
    final String namespacePortion = "dedup";
    final String replNamespace = tenant + "/" + namespacePortion;
    final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
    final String subscriptionName1 = "sub1";
    final String subscriptionName2 = "sub2";
    final String consumerName1 = "test-consumer-1";
    final String consumerName2 = "test-consumer-2";
    // Appended to by the background consumer thread and read by the test thread,
    // so it has to be a thread-safe list.
    final List<Message<String>> msgRecvd = Collections.synchronizedList(new LinkedList<>());

    admin.namespaces().createNamespace(replNamespace);
    Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
    admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
    admin.namespaces().setDeduplicationStatus(replNamespace, true);

    Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic)
            .producerName("test-producer-1").create();
    Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(sourceTopic)
            .consumerName(consumerName1).subscriptionName(subscriptionName1).subscribe();
    Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(sourceTopic)
            .consumerName(consumerName2).subscriptionName(subscriptionName2).subscribe();

    // Drains the second subscription until receive() fails (e.g. on interrupt).
    Thread thread = new Thread(() -> {
        while (true) {
            try {
                Message<String> msg = consumer2.receive();
                msgRecvd.add(msg);
                consumer2.acknowledge(msg);
            } catch (PulsarClientException e) {
                log.error("Failed to consume message: {}", e, e);
                break;
            }
        }
    });
    thread.start();

    try {
        // Wait for both consumers to show up in the topic stats.
        retryStrategically((test) -> {
            try {
                TopicStats topicStats = admin.topics().getStats(sourceTopic);
                boolean c1 = topicStats != null
                        && topicStats.getSubscriptions().get(subscriptionName1) != null
                        && topicStats.getSubscriptions().get(subscriptionName1).getConsumers().size() == 1
                        && topicStats.getSubscriptions().get(subscriptionName1).getConsumers().get(0)
                                .getConsumerName().equals(consumerName1);
                boolean c2 = topicStats != null
                        && topicStats.getSubscriptions().get(subscriptionName2) != null
                        && topicStats.getSubscriptions().get(subscriptionName2).getConsumers().size() == 1
                        && topicStats.getSubscriptions().get(subscriptionName2).getConsumers().get(0)
                                .getConsumerName().equals(consumerName2);
                return c1 && c2;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200);

        TopicStats topicStats1 = admin.topics().getStats(sourceTopic);
        assertNotNull(topicStats1);
        assertNotNull(topicStats1.getSubscriptions().get(subscriptionName1));
        assertEquals(topicStats1.getSubscriptions().get(subscriptionName1).getConsumers().size(), 1);
        assertEquals(topicStats1.getSubscriptions().get(subscriptionName1).getConsumers().get(0)
                .getConsumerName(), consumerName1);

        TopicStats topicStats2 = admin.topics().getStats(sourceTopic);
        assertNotNull(topicStats2);
        assertNotNull(topicStats2.getSubscriptions().get(subscriptionName2));
        assertEquals(topicStats2.getSubscriptions().get(subscriptionName2).getConsumers().size(), 1);
        assertEquals(topicStats2.getSubscriptions().get(subscriptionName2).getConsumers().get(0)
                .getConsumerName(), consumerName2);

        // Phase 1: normal publish/consume of messages 0-9.
        for (int i = 0; i < 10; i++) {
            producer.newMessage().sequenceId(i).value("foo-" + i).send();
        }
        for (int i = 0; i < 10; i++) {
            Message<String> msg = consumer1.receive();
            consumer1.acknowledge(msg);
            assertEquals(msg.getValue(), "foo-" + i);
            assertEquals(msg.getSequenceId(), i);
        }

        // Phase 2: with BK down every publish must fail.
        log.info("Stopping BK...");
        bkEnsemble.stopBK();

        List<CompletableFuture<MessageId>> futures = new LinkedList<>();
        for (int i = 10; i < 20; i++) {
            CompletableFuture<MessageId> future =
                    producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync();
            int finalI = i;
            future.thenRun(() -> log.error("message: {} successful", finalI)).exceptionally(throwable -> {
                log.info("message: {} failed: {}", finalI, throwable, throwable);
                return null;
            });
            futures.add(future);
        }

        int pendingSeqId = 10;
        for (CompletableFuture<MessageId> future : futures) {
            try {
                // message should not be produced successfully
                future.join();
                fail("message " + pendingSeqId + " was published although BK is stopped");
            } catch (CompletionException expected) {
                // expected: the send failed because the storage layer is unavailable
            } catch (Exception e) {
                fail("unexpected exception for message " + pendingSeqId, e);
            }
            pendingSeqId++;
        }

        // Re-sending an already attempted sequence id must keep failing while BK is down.
        try {
            producer.newMessage().sequenceId(10).value("foo-10").send();
            fail("first resend of message 10 succeeded although BK is stopped");
        } catch (PulsarClientException expected) {
            // expected
        }

        try {
            producer.newMessage().sequenceId(10).value("foo-10").send();
            fail("second resend of message 10 succeeded although BK is stopped");
        } catch (PulsarClientException expected) {
            // expected
        }

        // Phase 3: BK is back, messages 20-29 must go through.
        log.info("Starting BK...");
        bkEnsemble.startBK();

        for (int i = 20; i < 30; i++) {
            producer.newMessage().sequenceId(i).value("foo-" + i).send();
        }

        MessageId lastMessageId = null;
        for (int i = 20; i < 30; i++) {
            Message<String> msg = consumer1.receive();
            lastMessageId = msg.getMessageId();
            consumer1.acknowledge(msg);
            assertEquals(msg.getValue(), "foo-" + i);
            assertEquals(msg.getSequenceId(), i);
        }

        // check all messages
        retryStrategically((test) -> msgRecvd.size() >= 20, 5, 200);

        assertEquals(msgRecvd.size(), 20);
        for (int i = 0; i < 10; i++) {
            assertEquals(msgRecvd.get(i).getValue(), "foo-" + i);
            assertEquals(msgRecvd.get(i).getSequenceId(), i);
        }
        // Entries 10-19 of the list are messages 20-29; 10-19 were never persisted.
        for (int i = 10; i < 20; i++) {
            assertEquals(msgRecvd.get(i).getValue(), "foo-" + (i + 10));
            assertEquals(msgRecvd.get(i).getSequenceId(), i + 10);
        }

        // The last message consumed must be the last message stored in the topic.
        BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId;
        MessageIdImpl messageId = (MessageIdImpl) consumer1.getLastMessageId();
        assertEquals(messageId.getLedgerId(), batchMessageId.getLedgerId());
        assertEquals(messageId.getEntryId(), batchMessageId.getEntryId());
    } finally {
        // Always stop the background consumer, including when an assertion failed.
        thread.interrupt();
    }
}
}