blob: 1ec734bc508e9368d26a363244068ec384778fdb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.tests.EnumValuesDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test
public class MessageIdTest extends BrokerTestBase {
private static final Logger log = LoggerFactory.getLogger(MessageIdTest.class);
@BeforeMethod
@Override
public void setup() throws Exception {
baseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
internalCleanup();
}
@Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
public void producerSendAsync(TopicType topicType) throws PulsarClientException, PulsarAdminException {
// Given
String key = "producerSendAsync-" + topicType;
final String topicName = "persistent://prop/cluster/namespace/topic-" + key;
final String subscriptionName = "my-subscription-" + key;
final String messagePrefix = "my-message-" + key + "-";
final int numberOfMessages = 30;
if (topicType == TopicType.PARTITIONED) {
int numberOfPartitions = 3;
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
}
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.subscribe();
// When
// Messages are published asynchronously
List<Future<MessageId>> futures = new ArrayList<>();
for (int i = 0; i < numberOfMessages; i++) {
String message = messagePrefix + i;
futures.add(producer.sendAsync(message.getBytes()));
}
// Then
// expect that the Message Ids of subsequently sent messages are in ascending order
Set<MessageId> messageIds = new HashSet<>();
MessageIdImpl previousMessageId = null;
for (Future<MessageId> f : futures) {
try {
MessageIdImpl currentMessageId = (MessageIdImpl) f.get();
if (previousMessageId != null) {
assertTrue(currentMessageId.compareTo(previousMessageId) > 0,
"Message Ids should be in ascending order");
}
messageIds.add(currentMessageId);
previousMessageId = currentMessageId;
} catch (Exception e) {
fail("Failed to publish message", e);
}
}
// And
// expect that there's a message id for each sent out message
// and that all messages have been received by the consumer
log.info("Message IDs = {}", messageIds);
assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully");
for (int i = 0; i < numberOfMessages; i++) {
Message<byte[]> message = consumer.receive();
assertEquals(new String(message.getData()), messagePrefix + i);
MessageId messageId = message.getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
}
assertTrue(messageIds.remove(messageId), "Failed to receive message");
}
log.info("Remaining message IDs = {}", messageIds);
assertEquals(messageIds.size(), 0, "Not all messages received successfully");
consumer.unsubscribe();
}
@Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
public void producerSend(TopicType topicType) throws PulsarClientException, PulsarAdminException {
// Given
String key = "producerSend-" + topicType;
final String topicName = "persistent://prop/cluster/namespace/topic-" + key;
final String subscriptionName = "my-subscription-" + key;
final String messagePrefix = "my-message-" + key + "-";
final int numberOfMessages = 30;
if (topicType == TopicType.PARTITIONED) {
int numberOfPartitions = 7;
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
}
Producer<byte[]> producer = pulsarClient.newProducer()
.enableBatching(false)
.topic(topicName)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.subscribe();
// When
// Messages are published
Set<MessageId> messageIds = new HashSet<>();
for (int i = 0; i < numberOfMessages; i++) {
String message = messagePrefix + i;
messageIds.add(producer.send(message.getBytes()));
}
// Then
// expect that the Message Ids of subsequently sent messages are in ascending order
log.info("Message IDs = {}", messageIds);
assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully");
for (int i = 0; i < numberOfMessages; i++) {
MessageId messageId = consumer.receive().getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
}
assertTrue(messageIds.remove(messageId), "Failed to receive Message");
}
log.info("Remaining message IDs = {}", messageIds);
assertEquals(messageIds.size(), 0, "Not all messages received successfully");
consumer.unsubscribe();
}
}