blob: 186a2491a47509a56698b3b28727e62d4c47f7b7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* Tests replicated subscriptions (PIP-33)
*/
@Test(groups = "broker")
public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
private static final Logger log = LoggerFactory.getLogger(ReplicatorSubscriptionTest.class);
@Override
@BeforeClass(timeOut = 300000)
public void setup() throws Exception {
super.setup();
}
@Override
@AfterClass(alwaysRun = true, timeOut = 300000)
public void cleanup() throws Exception {
super.cleanup();
}
/**
* Tests replicated subscriptions across two regions
*/
@Test
public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
String topicName = "persistent://" + namespace + "/mytopic";
String subscriptionName = "cluster-subscription";
// Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054
// TODO: duplications shouldn't be allowed, change to "false" when fixing the issue
boolean allowDuplicates = true;
// this setting can be used to manually run the test with subscription replication disabled
// it shows that subscription replication has no impact in behavior for this test case
boolean replicateSubscriptionState = true;
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();
// create subscription in r1
createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);
@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();
// create subscription in r2
createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);
Set<String> sentMessages = new LinkedHashSet<>();
// send messages in r1
{
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
int numMessages = 6;
for (int i = 0; i < numMessages; i++) {
String body = "message" + i;
producer.send(body.getBytes(StandardCharsets.UTF_8));
sentMessages.add(body);
}
producer.close();
}
Set<String> receivedMessages = new LinkedHashSet<>();
// consume 3 messages in r1
try (Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(replicateSubscriptionState)
.subscribe()) {
readMessages(consumer1, receivedMessages, 3, allowDuplicates);
}
// wait for subscription to be replicated
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
// consume remaining messages in r2
try (Consumer<byte[]> consumer2 = client2.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(replicateSubscriptionState)
.subscribe()) {
readMessages(consumer2, receivedMessages, -1, allowDuplicates);
}
// assert that all messages have been received
assertEquals(new ArrayList<>(sentMessages), new ArrayList<>(receivedMessages), "Sent and received " +
"messages don't match.");
}
/**
* If there's no traffic, the snapshot creation should stop and then resume when traffic comes back
*/
@Test
public void testReplicationSnapshotStopWhenNoTraffic() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
String topicName = "persistent://" + namespace + "/mytopic";
String subscriptionName = "cluster-subscription";
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
@Cleanup
PulsarClient client1 = PulsarClient.builder()
.serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();
// create subscription in r1
createReplicatedSubscription(client1, topicName, subscriptionName, true);
// Validate that no snapshots are created before messages are published
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(topicName, false).get().get();
ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
// no snapshot should have been created before any messages are published
assertFalse(rsc1.getLastCompletedSnapshotId().isPresent());
@Cleanup
PulsarClient client2 = PulsarClient.builder()
.serviceUrl(url2.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();
Set<String> sentMessages = new LinkedHashSet<>();
// send messages in r1
{
@Cleanup
Producer<String> producer = client1.newProducer(Schema.STRING)
.topic(topicName)
.create();
for (int i = 0; i < 10; i++) {
producer.send("hello-" + i);
}
}
// Wait for last snapshots to be created
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
// In R1
Position p1 = t1.getLastPosition();
String snapshot1 = rsc1.getLastCompletedSnapshotId().get();
// In R2
PersistentTopic t2 = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(topicName, false).get().get();
ReplicatedSubscriptionsController rsc2 = t2.getReplicatedSubscriptionController().get();
Position p2 = t2.getLastPosition();
String snapshot2 = rsc2.getLastCompletedSnapshotId().get();
// There shouldn't be anymore snapshots
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
assertEquals(t1.getLastPosition(), p1);
assertEquals(rsc1.getLastCompletedSnapshotId().get(), snapshot1);
assertEquals(t2.getLastPosition(), p2);
assertEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2);
@Cleanup
Producer<String> producer2 = client2.newProducer(Schema.STRING)
.topic(topicName)
.create();
for (int i = 0; i < 10; i++) {
producer2.send("hello-" + i);
}
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
// Now we should have one or more snapshots
assertNotEquals(t1.getLastPosition(), p1);
assertNotEquals(rsc1.getLastCompletedSnapshotId().get(), snapshot1);
assertNotEquals(t2.getLastPosition(), p2);
assertNotEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2);
}
@Test(timeOut = 30000)
public void testReplicatedSubscriptionRestApi1() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
final String topicName = "persistent://" + namespace + "/topic-rest-api1";
final String subName = "sub";
// Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054
// TODO: duplications shouldn't be allowed, change to "false" when fixing the issue
final boolean allowDuplicates = true;
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
@Cleanup
final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
// Create subscription in r1
createReplicatedSubscription(client1, topicName, subName, true);
@Cleanup
final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
// Create subscription in r2
createReplicatedSubscription(client2, topicName, subName, true);
TopicStats stats = admin1.topics().getStats(topicName);
assertTrue(stats.getSubscriptions().get(subName).isReplicated());
// Disable replicated subscription in r1
admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
stats = admin1.topics().getStats(topicName);
assertFalse(stats.getSubscriptions().get(subName).isReplicated());
stats = admin2.topics().getStats(topicName);
assertTrue(stats.getSubscriptions().get(subName).isReplicated());
// Disable replicated subscription in r2
admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
stats = admin2.topics().getStats(topicName);
assertFalse(stats.getSubscriptions().get(subName).isReplicated());
// Unload topic in r1
admin1.topics().unload(topicName);
Awaitility.await().untilAsserted(() -> {
TopicStats stats2 = admin1.topics().getStats(topicName);
assertFalse(stats2.getSubscriptions().get(subName).isReplicated());
});
// Make sure the replicated subscription is actually disabled
final int numMessages = 20;
final Set<String> sentMessages = new LinkedHashSet<>();
final Set<String> receivedMessages = new LinkedHashSet<>();
Producer<byte[]> producer = client1.newProducer().topic(topicName).enableBatching(false).create();
sentMessages.clear();
publishMessages(producer, 0, numMessages, sentMessages);
producer.close();
Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
receivedMessages.clear();
readMessages(consumer1, receivedMessages, numMessages, false);
assertEquals(receivedMessages, sentMessages);
consumer1.close();
Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
receivedMessages.clear();
readMessages(consumer2, receivedMessages, numMessages, false);
assertEquals(receivedMessages, sentMessages);
consumer2.close();
// Enable replicated subscription in r1
admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
stats = admin1.topics().getStats(topicName);
assertTrue(stats.getSubscriptions().get(subName).isReplicated());
stats = admin2.topics().getStats(topicName);
assertFalse(stats.getSubscriptions().get(subName).isReplicated());
// Enable replicated subscription in r2
admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
stats = admin2.topics().getStats(topicName);
assertTrue(stats.getSubscriptions().get(subName).isReplicated());
// Make sure the replicated subscription is actually enabled
sentMessages.clear();
receivedMessages.clear();
producer = client1.newProducer().topic(topicName).enableBatching(false).create();
publishMessages(producer, 0, numMessages / 2, sentMessages);
producer.close();
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
final int numReceivedMessages1 = readMessages(consumer1, receivedMessages, numMessages / 2, allowDuplicates);
consumer1.close();
producer = client1.newProducer().topic(topicName).enableBatching(false).create();
publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages);
producer.close();
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
final int numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates);
consumer2.close();
assertEquals(receivedMessages, sentMessages);
assertTrue(numReceivedMessages1 < numMessages,
String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages));
assertTrue(numReceivedMessages2 < numMessages,
String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages));
}
@Test(timeOut = 30000)
public void testReplicatedSubscriptionRestApi2() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
final String topicName = "persistent://" + namespace + "/topic-rest-api2";
final String subName = "sub";
// Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054
// TODO: duplications shouldn't be allowed, change to "false" when fixing the issue
final boolean allowDuplicates = true;
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
admin1.topics().createPartitionedTopic(topicName, 2);
@Cleanup
final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
// Create subscription in r1
createReplicatedSubscription(client1, topicName, subName, true);
@Cleanup
final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
// Create subscription in r2
createReplicatedSubscription(client2, topicName, subName, true);
PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
for (TopicStats stats : partitionedStats.getPartitions().values()) {
assertTrue(stats.getSubscriptions().get(subName).isReplicated());
}
// Disable replicated subscription in r1
admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
for (TopicStats stats : partitionedStats.getPartitions().values()) {
assertFalse(stats.getSubscriptions().get(subName).isReplicated());
}
// Disable replicated subscription in r2
admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
partitionedStats = admin2.topics().getPartitionedStats(topicName, true);
for (TopicStats stats : partitionedStats.getPartitions().values()) {
assertFalse(stats.getSubscriptions().get(subName).isReplicated());
}
// Make sure the replicated subscription is actually disabled
final int numMessages = 20;
final Set<String> sentMessages = new LinkedHashSet<>();
final Set<String> receivedMessages = new LinkedHashSet<>();
Producer<byte[]> producer = client1.newProducer().topic(topicName).enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
sentMessages.clear();
publishMessages(producer, 0, numMessages, sentMessages);
producer.close();
Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
receivedMessages.clear();
readMessages(consumer1, receivedMessages, numMessages, false);
assertEquals(receivedMessages, sentMessages);
consumer1.close();
Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
receivedMessages.clear();
readMessages(consumer2, receivedMessages, numMessages, false);
assertEquals(receivedMessages, sentMessages);
consumer2.close();
// Enable replicated subscription in r1
admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
for (TopicStats stats : partitionedStats.getPartitions().values()) {
assertTrue(stats.getSubscriptions().get(subName).isReplicated());
}
// Enable replicated subscription in r2
admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
partitionedStats = admin2.topics().getPartitionedStats(topicName, true);
for (TopicStats stats : partitionedStats.getPartitions().values()) {
assertTrue(stats.getSubscriptions().get(subName).isReplicated());
}
// Make sure the replicated subscription is actually enabled
sentMessages.clear();
receivedMessages.clear();
producer = client1.newProducer().topic(topicName).enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
publishMessages(producer, 0, numMessages / 2, sentMessages);
producer.close();
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
final int numReceivedMessages1 = readMessages(consumer1, receivedMessages, numMessages / 2, allowDuplicates);
consumer1.close();
producer = client1.newProducer().topic(topicName).enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages);
producer.close();
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
final int numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates);
consumer2.close();
assertEquals(receivedMessages, sentMessages);
assertTrue(numReceivedMessages1 < numMessages,
String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages));
assertTrue(numReceivedMessages2 < numMessages,
String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages));
}
/**
* Tests replicated subscriptions when replicator producer is closed
*/
@Test
public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
String topicName = "persistent://" + namespace + "/when-replicator-producer-is-closed";
String subscriptionName = "sub";
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();
{
// create consumer in r1
@Cleanup
Consumer<byte[]> consumer = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();
// send one message to trigger replication
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("message".getBytes(StandardCharsets.UTF_8));
assertEquals(readMessages(consumer, new HashSet<>(), 1, false), 1);
// waiting to replicate topic/subscription to r1->r2
Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
}
// unsubscribe replicated subscription in r2
admin2.topics().deleteSubscription(topicName, subscriptionName);
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
assertNull(topic2.getSubscription(subscriptionName));
// close replicator producer in r2
final Method closeReplProducersIfNoBacklog = PersistentTopic.class.getDeclaredMethod("closeReplProducersIfNoBacklog", null);
closeReplProducersIfNoBacklog.setAccessible(true);
((CompletableFuture<Void>) closeReplProducersIfNoBacklog.invoke(topic2, null)).join();
assertFalse(topic2.getReplicators().get("r1").isConnected());
// send messages in r1
int numMessages = 6;
{
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < numMessages; i++) {
String body = "message" + i;
producer.send(body.getBytes(StandardCharsets.UTF_8));
}
}
// consume 6 messages in r1
Set<String> receivedMessages = new LinkedHashSet<>();
@Cleanup
Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();
assertEquals(readMessages(consumer1, receivedMessages, numMessages, false), numMessages);
// wait for subscription to be replicated
Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
}
void publishMessages(Producer<byte[]> producer, int startIndex, int numMessages, Set<String> sentMessages)
throws PulsarClientException {
for (int i = startIndex; i < startIndex + numMessages; i++) {
final String msg = "msg" + i;
producer.send(msg.getBytes(StandardCharsets.UTF_8));
sentMessages.add(msg);
}
}
int readMessages(Consumer<byte[]> consumer, Set<String> messages, int maxMessages, boolean allowDuplicates)
throws PulsarClientException {
int count = 0;
while (count < maxMessages || maxMessages == -1) {
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
if (message != null) {
count++;
String body = new String(message.getValue(), StandardCharsets.UTF_8);
if (!allowDuplicates) {
assertFalse(messages.contains(body), "Duplicate message '" + body + "' detected.");
}
messages.add(body);
consumer.acknowledge(message);
} else {
break;
}
}
return count;
}
void createReplicatedSubscription(PulsarClient pulsarClient, String topicName, String subscriptionName,
boolean replicateSubscriptionState)
throws PulsarClientException {
pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(replicateSubscriptionState)
.subscribe()
.close();
}
}