blob: 7ddd082891040c4d0a46539f3db9b230b5e3fa7e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.messaging;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertEquals;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.testng.annotations.BeforeMethod;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class MessagingBase extends PulsarTestSuite {
protected String methodName;
@BeforeMethod
public void beforeMethod(Method m) throws Exception {
methodName = m.getName();
}
protected String getNonPartitionedTopic(String topicPrefix, boolean isPersistent) throws Exception {
String nsName = generateNamespaceName();
pulsarCluster.createNamespace(nsName);
return generateTopicName(nsName, topicPrefix, true);
}
protected String getPartitionedTopic(String topicPrefix, boolean isPersistent, int partitions) throws Exception {
assertTrue(partitions > 0, "partitions must greater than 1");
String nsName = generateNamespaceName();
pulsarCluster.createNamespace(nsName);
String topicName = generateTopicName(nsName, topicPrefix, true);
pulsarCluster.createPartitionedTopic(topicName, partitions);
return topicName;
}
protected <T extends Comparable<T>> void receiveMessagesCheckOrderAndDuplicate
(List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
Set<T> messagesReceived = Sets.newHashSet();
for (Consumer<T> consumer : consumerList) {
Message<T> currentReceived;
Map<String, Message<T>> lastReceivedMap = new HashMap<>();
while (true) {
try {
currentReceived = consumer.receive(3, TimeUnit.SECONDS);
} catch (PulsarClientException e) {
log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
break;
}
// Make sure that messages are received in order
if (currentReceived != null) {
consumer.acknowledge(currentReceived);
if (lastReceivedMap.containsKey(currentReceived.getTopicName())) {
assertTrue(currentReceived.getMessageId().compareTo(
lastReceivedMap.get(currentReceived.getTopicName()).getMessageId()) > 0,
"Received messages are not in order.");
}
} else {
break;
}
lastReceivedMap.put(currentReceived.getTopicName(), currentReceived);
// Make sure that there are no duplicates
assertTrue(messagesReceived.add(currentReceived.getValue()),
"Received duplicate message " + currentReceived.getValue());
}
}
assertEquals(messagesReceived.size(), messagesToReceive);
}
protected <T> void receiveMessagesCheckDuplicate
(List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
Set<T> messagesReceived = Sets.newHashSet();
for (Consumer<T> consumer : consumerList) {
Message<T> currentReceived = null;
while (true) {
try {
currentReceived = consumer.receive(3, TimeUnit.SECONDS);
} catch (PulsarClientException e) {
log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
break;
}
if (currentReceived != null) {
consumer.acknowledge(currentReceived);
// Make sure that there are no duplicates
assertTrue(messagesReceived.add(currentReceived.getValue()),
"Received duplicate message " + currentReceived.getValue());
} else {
break;
}
}
}
assertEquals(messagesReceived.size(), messagesToReceive);
}
protected <T> void receiveMessagesCheckStickyKeyAndDuplicate
(List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
Map<String, Set<String>> consumerKeys = Maps.newHashMap();
Set<T> messagesReceived = Sets.newHashSet();
for (Consumer<T> consumer : consumerList) {
Message<T> currentReceived;
while (true) {
try {
currentReceived = consumer.receive(3, TimeUnit.SECONDS);
} catch (PulsarClientException e) {
log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
break;
}
if (currentReceived != null) {
consumer.acknowledge(currentReceived);
assertNotNull(currentReceived.getKey());
consumerKeys.putIfAbsent(consumer.getConsumerName(), Sets.newHashSet());
consumerKeys.get(consumer.getConsumerName()).add(currentReceived.getKey());
// Make sure that there are no duplicates
assertTrue(messagesReceived.add(currentReceived.getValue()),
"Received duplicate message " + currentReceived.getValue());
} else {
break;
}
}
}
// Make sure key will not be distributed to multiple consumers
Set<String> allKeys = Sets.newHashSet();
consumerKeys.forEach((k, v) -> v.forEach(key -> {
assertTrue(allKeys.add(key),
"Key "+ key + "is distributed to multiple consumers" );
}));
assertEquals(messagesReceived.size(), messagesToReceive);
}
protected <T> void closeConsumers(List<Consumer<T>> consumerList) throws PulsarClientException {
Iterator<Consumer<T>> iterator = consumerList.iterator();
while (iterator.hasNext()) {
iterator.next().close();
iterator.remove();
}
}
}