| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.client.api; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertNull; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import lombok.Cleanup; |
| |
| import org.apache.bookkeeper.common.concurrent.FutureUtils; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.pulsar.client.impl.BatchMessageIdImpl; |
| import org.apache.pulsar.client.impl.MessageIdImpl; |
| import org.apache.pulsar.client.impl.MessageImpl; |
| import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; |
| import org.apache.pulsar.client.impl.MultiTopicsReaderImpl; |
| import org.apache.pulsar.client.impl.ReaderImpl; |
| import org.apache.pulsar.client.impl.TopicMessageIdImpl; |
| import org.apache.pulsar.client.impl.TopicMessageImpl; |
| import org.apache.pulsar.common.policies.data.TopicStats; |
| import org.apache.pulsar.common.util.RelativeTimeUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.DataProvider; |
| import org.testng.annotations.Test; |
| |
| @Test(groups = "flaky") |
| public class TopicReaderTest extends ProducerConsumerBase { |
| private static final Logger log = LoggerFactory.getLogger(TopicReaderTest.class); |
| |
| /** |
| * Runs once before the tests of this class: performs the inherited internal setup |
| * followed by the inherited producer-base setup. |
| * |
| * @throws Exception if either inherited setup step fails |
| */ |
| @BeforeClass |
| @Override |
| protected void setup() throws Exception { |
| super.internalSetup(); |
| super.producerBaseSetup(); |
| } |
| |
| /** |
| * Runs once after the tests of this class and delegates to the inherited internal cleanup. |
| * The annotation sets {@code alwaysRun = true}. |
| * |
| * @throws Exception if the inherited cleanup fails |
| */ |
| @AfterClass(alwaysRun = true) |
| @Override |
| protected void cleanup() throws Exception { |
| super.internalCleanup(); |
| } |
| |
| /** |
|  * Builds the parameter matrix {batching, start-inclusive, num-of-messages}: every |
|  * combination of the two boolean flags, first for 10 messages and then for 100. |
|  * |
|  * @return eight rows, each holding two booleans and a message count |
|  */ |
| @DataProvider |
| public static Object[][] variationsForExpectedPos() { |
|     final int[] messageCounts = {10, 100}; |
|     final boolean[] flagValues = {true, false}; |
|     final List<Object[]> rows = new ArrayList<>(); |
|     for (int numOfMessages : messageCounts) { |
|         for (boolean batching : flagValues) { |
|             for (boolean startInclusive : flagValues) { |
|                 rows.add(new Object[] {batching, startInclusive, numOfMessages}); |
|             } |
|         } |
|     } |
|     return rows.toArray(new Object[0][]); |
| } |
| |
| /** |
|  * Builds the parameter rows {start-inclusive, num-of-messages}: one row with the |
|  * inclusive flag set and one without, both using 20 messages. |
|  * |
|  * @return two rows, each holding a boolean and a message count |
|  */ |
| @DataProvider |
| public static Object[][] variationsForResetOnLatestMsg() { |
|     final int numOfMessages = 20; |
|     final Object[] inclusiveStart = {true, numOfMessages}; |
|     final Object[] exclusiveStart = {false, numOfMessages}; |
|     return new Object[][] {inclusiveStart, exclusiveStart}; |
| } |
| |
| @DataProvider |
| public static Object[][] variationsForHasMessageAvailable() { |
| return new Object[][] { |
| // batching / start-inclusive |
| {true, true}, |
| {true, false}, |
| {false, true}, |
| {false, false}, |
| }; |
| } |
| |
| /** |
|  * Creates a reader at {@link MessageId#earliest} before anything is published, publishes |
|  * ten messages and checks each received payload against the expected one, in publish order. |
|  * |
|  * @throws Exception if creating the clients, publishing or reading fails |
|  */ |
| @Test |
| public void testSimpleReader() throws Exception { |
|     final String topic = "persistent://my-property/my-ns/testSimpleReader"; |
|     final int numMessages = 10; |
| |
|     // try-with-resources closes both clients even when an assertion below fails |
|     try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
|             .startMessageId(MessageId.earliest).create(); |
|          Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) { |
|         for (int i = 0; i < numMessages; i++) { |
|             producer.send(("my-message-" + i).getBytes()); |
|         } |
| |
|         Set<String> messageSet = Sets.newHashSet(); |
|         for (int i = 0; i < numMessages; i++) { |
|             Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS); |
|             // a timed-out readNext yields null; fail with a clear message instead of an NPE |
|             Assert.assertNotNull(msg, "Timed out waiting for message " + i); |
| |
|             String receivedMessage = new String(msg.getData()); |
|             log.debug("Received message: [{}]", receivedMessage); |
|             testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i); |
|         } |
|     } |
| } |
| |
| @Test |
| public void testSimpleMultiReader() throws Exception { |
| String topic = "persistent://my-property/my-ns/testSimpleMultiReader"; |
| admin.topics().createPartitionedTopic(topic, 3); |
| |
| Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
| .startMessageId(MessageId.earliest).create(); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .create(); |
| for (int i = 0; i < 10; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| Message<byte[]> msg = null; |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = 0; i < 10; i++) { |
| msg = reader.readNext(1, TimeUnit.SECONDS); |
| String receivedMessage = new String(msg.getData()); |
| assertTrue(messageSet.add(receivedMessage)); |
| } |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| /** |
|  * Publishes ten messages first, then creates a reader at {@link MessageId#earliest} and |
|  * checks each received payload against the expected one, in publish order. |
|  * |
|  * @throws Exception if creating the clients, publishing or reading fails |
|  */ |
| @Test |
| public void testReaderAfterMessagesWerePublished() throws Exception { |
|     final String topic = "persistent://my-property/my-ns/testReaderAfterMessagesWerePublished"; |
|     final int numMessages = 10; |
| |
|     // try-with-resources closes the clients even when an assertion below fails |
|     try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) { |
|         for (int i = 0; i < numMessages; i++) { |
|             producer.send(("my-message-" + i).getBytes()); |
|         } |
| |
|         try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
|                 .startMessageId(MessageId.earliest).create()) { |
|             Set<String> messageSet = Sets.newHashSet(); |
|             for (int i = 0; i < numMessages; i++) { |
|                 Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS); |
|                 // a timed-out readNext yields null; fail clearly instead of with an NPE |
|                 Assert.assertNotNull(msg, "Timed out waiting for message " + i); |
| |
|                 String receivedMessage = new String(msg.getData()); |
|                 log.debug("Received message: [{}]", receivedMessage); |
|                 testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i); |
|             } |
|         } |
|     } |
| } |
| |
| /** |
|  * Publishes ten messages to a 3-partition topic, then creates a reader at |
|  * {@link MessageId#earliest} and checks that ten distinct payloads are received. |
|  * |
|  * @throws Exception if topic creation, publishing or reading fails |
|  */ |
| @Test |
| public void testMultiReaderAfterMessagesWerePublished() throws Exception { |
|     final String topic = "persistent://my-property/my-ns/testMultiReaderAfterMessagesWerePublished"; |
|     final int numMessages = 10; |
|     admin.topics().createPartitionedTopic(topic, 3); |
| |
|     // try-with-resources closes the clients even when an assertion below fails |
|     try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) { |
|         for (int i = 0; i < numMessages; i++) { |
|             producer.send(("my-message-" + i).getBytes()); |
|         } |
| |
|         try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
|                 .startMessageId(MessageId.earliest).create()) { |
|             Set<String> messageSet = Sets.newHashSet(); |
|             for (int i = 0; i < numMessages; i++) { |
|                 Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS); |
|                 // a timed-out readNext yields null; fail clearly instead of with an NPE |
|                 Assert.assertNotNull(msg, "Timed out waiting for message #" + i); |
|                 // payloads are only checked for uniqueness, not for order |
|                 assertTrue(messageSet.add(new String(msg.getData()))); |
|             } |
|         } |
|     } |
| } |
| |
| @Test |
| public void testMultipleReaders() throws Exception { |
| Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultipleReaders") |
| .create(); |
| for (int i = 0; i < 10; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| Reader<byte[]> reader1 = pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders") |
| .startMessageId(MessageId.earliest).create(); |
| |
| Reader<byte[]> reader2 = pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders") |
| .startMessageId(MessageId.earliest).create(); |
| |
| Message<byte[]> msg = null; |
| Set<String> messageSet1 = Sets.newHashSet(); |
| for (int i = 0; i < 10; i++) { |
| msg = reader1.readNext(1, TimeUnit.SECONDS); |
| |
| String receivedMessage = new String(msg.getData()); |
| log.debug("Received message: [{}]", receivedMessage); |
| String expectedMessage = "my-message-" + i; |
| testMessageOrderAndDuplicates(messageSet1, receivedMessage, expectedMessage); |
| } |
| |
| Set<String> messageSet2 = Sets.newHashSet(); |
| for (int i = 0; i < 10; i++) { |
| msg = reader2.readNext(1, TimeUnit.SECONDS); |
| |
| String receivedMessage = new String(msg.getData()); |
| log.debug("Received message: [{}]", receivedMessage); |
| String expectedMessage = "my-message-" + i; |
| testMessageOrderAndDuplicates(messageSet2, receivedMessage, expectedMessage); |
| } |
| |
| reader1.close(); |
| reader2.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testMultiMultipleReaders() throws Exception { |
| final String topic = "persistent://my-property/my-ns/testMultiMultipleReaders"; |
| admin.topics().createPartitionedTopic(topic, 3); |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .create(); |
| for (int i = 0; i < 10; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| Reader<byte[]> reader1 = pulsarClient.newReader().topic(topic) |
| .startMessageId(MessageId.earliest).create(); |
| |
| Reader<byte[]> reader2 = pulsarClient.newReader().topic(topic) |
| .startMessageId(MessageId.earliest).create(); |
| |
| Message<byte[]> msg = null; |
| Set<String> messageSet1 = Sets.newHashSet(); |
| for (int i = 0; i < 10; i++) { |
| msg = reader1.readNext(1, TimeUnit.SECONDS); |
| String receivedMessage = new String(msg.getData()); |
| assertTrue(messageSet1.add(receivedMessage)); |
| } |
| |
| Set<String> messageSet2 = Sets.newHashSet(); |
| for (int i = 0; i < 10; i++) { |
| msg = reader2.readNext(1, TimeUnit.SECONDS); |
| |
| String receivedMessage = new String(msg.getData()); |
| assertTrue(messageSet2.add(receivedMessage)); |
| } |
| |
| reader1.close(); |
| reader2.close(); |
| producer.close(); |
| } |
| |
| /** |
|  * Opens two readers on one topic and checks that the number of subscriptions reported |
|  * by the topic stats goes 2, 1, 0 as the readers are closed one after the other. |
|  * |
|  * @throws Exception if creating/closing a reader or fetching the stats fails |
|  */ |
| @Test |
| public void testTopicStats() throws Exception { |
|     final String topicName = "persistent://my-property/my-ns/testTopicStats"; |
| |
|     Reader<byte[]> firstReader = pulsarClient.newReader() |
|             .topic(topicName) |
|             .startMessageId(MessageId.earliest) |
|             .create(); |
|     Reader<byte[]> secondReader = pulsarClient.newReader() |
|             .topic(topicName) |
|             .startMessageId(MessageId.earliest) |
|             .create(); |
| |
|     // both readers open |
|     assertEquals(admin.topics().getStats(topicName).getSubscriptions().size(), 2); |
| |
|     // only the second reader left |
|     firstReader.close(); |
|     assertEquals(admin.topics().getStats(topicName).getSubscriptions().size(), 1); |
| |
|     // no reader left |
|     secondReader.close(); |
|     assertEquals(admin.topics().getStats(topicName).getSubscriptions().size(), 0); |
| } |
| |
| /** |
|  * Opens two readers on a 3-partition topic and checks that the number of subscriptions |
|  * reported by the partitioned stats goes 2, 1, 0 as the readers are closed in turn. |
|  * |
|  * @throws Exception if topic creation, creating/closing a reader or fetching the stats fails |
|  */ |
| @Test |
| public void testMultiTopicStats() throws Exception { |
|     final String topicName = "persistent://my-property/my-ns/testMultiTopicStats"; |
|     admin.topics().createPartitionedTopic(topicName, 3); |
| |
|     Reader<byte[]> firstReader = pulsarClient.newReader() |
|             .topic(topicName) |
|             .startMessageId(MessageId.earliest) |
|             .create(); |
|     Reader<byte[]> secondReader = pulsarClient.newReader() |
|             .topic(topicName) |
|             .startMessageId(MessageId.earliest) |
|             .create(); |
| |
|     // both readers open |
|     assertEquals(admin.topics().getPartitionedStats(topicName, true).getSubscriptions().size(), 2); |
| |
|     // only the second reader left |
|     firstReader.close(); |
|     assertEquals(admin.topics().getPartitionedStats(topicName, true).getSubscriptions().size(), 1); |
| |
|     // no reader left |
|     secondReader.close(); |
|     assertEquals(admin.topics().getPartitionedStats(topicName, true).getSubscriptions().size(), 0); |
| } |
| |
| @Test(dataProvider = "variationsForResetOnLatestMsg") |
| public void testReaderOnLatestMessage(boolean startInclusive, int numOfMessages) throws Exception { |
| final String topicName = "persistent://my-property/my-ns/ReaderOnLatestMessage"; |
| final int halfOfMsgs = numOfMessages / 2; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName) |
| .create(); |
| |
| for (int i = 0; i < halfOfMsgs; i++) { |
| producer.send(String.format("my-message-%d", i).getBytes()); |
| } |
| |
| ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader() |
| .topic(topicName) |
| .startMessageId(MessageId.latest); |
| |
| if (startInclusive) { |
| readerBuilder.startMessageIdInclusive(); |
| } |
| |
| Reader<byte[]> reader = readerBuilder.create(); |
| |
| for (int i = halfOfMsgs; i < numOfMessages; i++) { |
| producer.send(String.format("my-message-%d", i).getBytes()); |
| } |
| |
| // Publish more messages and verify the readers only sees new messages |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = halfOfMsgs; i < numOfMessages; i++) { |
| Message<byte[]> message = reader.readNext(); |
| String receivedMessage = new String(message.getData()); |
| String expectedMessage = String.format("my-message-%d", i); |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); |
| } |
| |
| assertTrue(reader.isConnected()); |
| assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 0); |
| assertEquals(messageSet.size(), halfOfMsgs); |
| |
| // Acknowledge the consumption of all messages at once |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test(dataProvider = "variationsForResetOnLatestMsg") |
| public void testMultiReaderOnLatestMessage(boolean startInclusive, int numOfMessages) throws Exception { |
| final String topicName = "persistent://my-property/my-ns/testMultiReaderOnLatestMessage" + System.currentTimeMillis(); |
| admin.topics().createPartitionedTopic(topicName, 3); |
| final int halfOfMsgs = numOfMessages / 2; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName) |
| .create(); |
| |
| Set<byte[]> oldMessage = new HashSet<>(); |
| for (int i = 0; i < halfOfMsgs; i++) { |
| byte[] message = String.format("my-message-%d", i).getBytes(); |
| producer.send(message); |
| oldMessage.add(message); |
| } |
| |
| ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader() |
| .topic(topicName) |
| .startMessageId(MessageId.latest); |
| |
| if (startInclusive) { |
| readerBuilder.startMessageIdInclusive(); |
| } |
| |
| Reader<byte[]> reader = readerBuilder.create(); |
| |
| for (int i = halfOfMsgs; i < numOfMessages; i++) { |
| producer.send(String.format("my-message-%d", i).getBytes()); |
| } |
| |
| // Publish more messages and verify the readers only sees new messages |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = halfOfMsgs; i < numOfMessages; i++) { |
| Message<byte[]> message = reader.readNext(); |
| assertFalse(oldMessage.contains(message)); |
| String receivedMessage = new String(message.getData()); |
| assertTrue(messageSet.add(receivedMessage)); |
| } |
| |
| assertTrue(reader.isConnected()); |
| assertEquals(((MultiTopicsReaderImpl) reader).getMultiTopicsConsumer().numMessagesInQueue(), 0); |
| assertEquals(messageSet.size(), halfOfMsgs); |
| |
| producer.close(); |
| reader.close(); |
| } |
| |
| /** |
|  * Publishes ten messages, creates a reader whose start position is the id of the fifth |
|  * message (index 4) and checks that the reader receives the payloads of indexes 5 to 9 |
|  * in publish order. |
|  * |
|  * @throws Exception if creating the clients, publishing or reading fails |
|  */ |
| @Test |
| public void testReaderOnSpecificMessage() throws Exception { |
|     final String topic = "persistent://my-property/my-ns/testReaderOnSpecificMessage"; |
|     final int numMessages = 10; |
|     final int startIndex = 4; |
| |
|     // try-with-resources closes the clients even when an assertion below fails |
|     try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) { |
|         List<MessageId> messageIds = new ArrayList<>(); |
|         for (int i = 0; i < numMessages; i++) { |
|             messageIds.add(producer.send(("my-message-" + i).getBytes())); |
|         } |
| |
|         try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
|                 .startMessageId(messageIds.get(startIndex)).create()) { |
|             // Verify the reader only sees the messages that follow the start message |
|             Set<String> messageSet = Sets.newHashSet(); |
|             for (int i = startIndex + 1; i < numMessages; i++) { |
|                 Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS); |
|                 // a timed-out readNext yields null; fail clearly instead of with an NPE |
|                 Assert.assertNotNull(msg, "Timed out waiting for message " + i); |
| |
|                 String receivedMessage = new String(msg.getData()); |
|                 log.debug("Received message: [{}]", receivedMessage); |
|                 testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i); |
|             } |
|         } |
|     } |
| } |
| |
| /** |
|  * Publishes eleven messages with batching enabled, reads the first five with one reader, |
|  * then creates a second reader starting at the id of the fifth message and checks that |
|  * it receives the payloads of indexes 5 to 10 in publish order. |
|  * |
|  * @throws Exception if creating the clients, publishing or reading fails |
|  */ |
| @Test |
| public void testReaderOnSpecificMessageWithBatches() throws Exception { |
|     final String topic = "persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches"; |
| |
|     // try-with-resources closes every client (the readers were previously never closed) |
|     try (Producer<byte[]> producer = pulsarClient.newProducer() |
|             .topic(topic).enableBatching(true) |
|             .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS).create()) { |
|         for (int i = 0; i < 10; i++) { |
|             producer.sendAsync(("my-message-" + i).getBytes()); |
|         } |
| |
|         // Write one sync message to ensure everything before got persisted |
|         producer.send("my-message-10".getBytes()); |
| |
|         MessageId lastMessageId = null; |
|         try (Reader<byte[]> reader1 = pulsarClient.newReader() |
|                 .topic(topic) |
|                 .startMessageId(MessageId.earliest).create()) { |
|             for (int i = 0; i < 5; i++) { |
|                 lastMessageId = reader1.readNext().getMessageId(); |
|             } |
|         } |
| |
|         assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class); |
| |
|         log.info("CREATING READER ON MSG ID: {}", lastMessageId); |
| |
|         try (Reader<byte[]> reader2 = pulsarClient.newReader() |
|                 .topic(topic) |
|                 .startMessageId(lastMessageId).create()) { |
|             for (int i = 5; i < 11; i++) { |
|                 Message<byte[]> msg = reader2.readNext(1, TimeUnit.SECONDS); |
|                 // a timed-out readNext yields null; fail clearly instead of with an NPE |
|                 Assert.assertNotNull(msg, "Timed out waiting for message " + i); |
| |
|                 String receivedMessage = new String(msg.getData()); |
|                 log.debug("Received message: [{}]", receivedMessage); |
|                 assertEquals(receivedMessage, "my-message-" + i); |
|             } |
|         } |
|     } |
| } |
| |
| @Test |
| public void testECDSAEncryption() throws Exception { |
| log.info("-- Starting {} test --", methodName); |
| |
| class EncKeyReader implements CryptoKeyReader { |
| |
| final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); |
| |
| @Override |
| public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { |
| String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName; |
| if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { |
| try { |
| keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); |
| return keyInfo; |
| } catch (IOException e) { |
| Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); |
| } |
| } else { |
| Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); |
| } |
| return null; |
| } |
| |
| @Override |
| public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { |
| String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; |
| if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { |
| try { |
| keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); |
| return keyInfo; |
| } catch (IOException e) { |
| Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); |
| } |
| } else { |
| Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); |
| } |
| return null; |
| } |
| } |
| |
| final int totalMsg = 10; |
| |
| Set<String> messageSet = Sets.newHashSet(); |
| Reader<byte[]> reader = pulsarClient.newReader() |
| .topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1").startMessageId(MessageId.latest) |
| .cryptoKeyReader(new EncKeyReader()).create(); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1") |
| .addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new EncKeyReader()).create(); |
| for (int i = 0; i < totalMsg; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| Message<byte[]> msg = null; |
| |
| for (int i = 0; i < totalMsg; i++) { |
| msg = reader.readNext(5, TimeUnit.SECONDS); |
| String receivedMessage = new String(msg.getData()); |
| log.debug("Received message: [{}]", receivedMessage); |
| String expectedMessage = "my-message-" + i; |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); |
| } |
| producer.close(); |
| reader.close(); |
| log.info("-- Exiting {} test --", methodName); |
| } |
| |
| /** |
|  * Creates, on a 3-partition topic, a reader at {@link MessageId#latest} and a producer |
|  * that encrypts with the {@code client-ecdsa.pem} key, both using a file-based |
|  * {@link CryptoKeyReader}, then publishes ten messages and checks that ten distinct |
|  * payloads are received. |
|  * |
|  * @throws Exception if topic creation, publishing or reading fails |
|  */ |
| @Test |
| public void testMultiReaderECDSAEncryption() throws Exception { |
|     log.info("-- Starting {} test --", methodName); |
| |
|     /** Key reader that loads key files from the test certificate directory. */ |
|     class EncKeyReader implements CryptoKeyReader { |
| |
|         final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); |
| |
|         @Override |
|         public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { |
|             return loadKey("./src/test/resources/certificate/public-key." + keyName); |
|         } |
| |
|         @Override |
|         public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { |
|             return loadKey("./src/test/resources/certificate/private-key." + keyName); |
|         } |
| |
|         /** Reads the key file into {@code keyInfo}, failing the test if it cannot be read. */ |
|         private EncryptionKeyInfo loadKey(String certFilePath) { |
|             if (!Files.isReadable(Paths.get(certFilePath))) { |
|                 Assert.fail("Certificate file " + certFilePath + " is not present or not readable."); |
|             } |
|             try { |
|                 keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath))); |
|                 return keyInfo; |
|             } catch (IOException e) { |
|                 // keep the I/O error as the cause so the failure report shows why the read failed |
|                 Assert.fail("Failed to read certificate from " + certFilePath, e); |
|             } |
|             return null; |
|         } |
|     } |
| |
|     final int totalMsg = 10; |
|     final String topic = "persistent://my-property/my-ns/test-multi-reader-myecdsa-topic1"; |
|     admin.topics().createPartitionedTopic(topic, 3); |
| |
|     // try-with-resources closes both clients even when an assertion below fails |
|     try (Reader<byte[]> reader = pulsarClient.newReader() |
|             .topic(topic).startMessageId(MessageId.latest) |
|             .cryptoKeyReader(new EncKeyReader()).create(); |
|          Producer<byte[]> producer = pulsarClient.newProducer() |
|             .topic(topic) |
|             .addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new EncKeyReader()).create()) { |
|         for (int i = 0; i < totalMsg; i++) { |
|             producer.send(("my-message-" + i).getBytes()); |
|         } |
| |
|         Set<String> messageSet = Sets.newHashSet(); |
|         for (int i = 0; i < totalMsg; i++) { |
|             Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS); |
|             // a timed-out readNext yields null; fail clearly instead of with an NPE |
|             Assert.assertNotNull(msg, "Timed out waiting for message #" + i); |
| |
|             String receivedMessage = new String(msg.getData()); |
|             Assert.assertTrue(messageSet.add(receivedMessage), "Received duplicate message " + receivedMessage); |
|         } |
|     } |
| } |
| |
| /** |
|  * Exercises {@code defaultCryptoKeyReader} on readers and producers with keys supplied |
|  * as {@code file:} URIs, as {@code data:} URIs and as key-name-to-URI maps. Four readers |
|  * are created at {@link MessageId#latest}; ECDSA-encrypted messages are verified with the |
|  * two single-key readers, then RSA-encrypted messages are added and the whole sequence |
|  * is verified with the two map-based readers. |
|  * |
|  * @throws Exception if creating the clients, publishing or reading fails |
|  */ |
| @Test |
| public void testDefaultCryptoKeyReader() throws Exception { |
|     final String topic = "persistent://my-property/my-ns/test-reader-default-crypto-key-reader" |
|             + System.currentTimeMillis(); |
|     final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; |
|     final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; |
|     final String ecdsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K"; |
|     final String ecdsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K"; |
|     final String rsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-rsa.pem"; |
|     final String rsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-rsa.pem"; |
|     final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg=="; |
|     final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnS
lM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg=="; |
|     final int numMsg = 10; |
| |
|     Map<String, String> privateKeyFileMap = Maps.newHashMap(); |
|     privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile); |
|     privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile); |
|     Map<String, String> privateKeyDataMap = Maps.newHashMap(); |
|     privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData); |
|     privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData); |
| |
|     // try-with-resources closes every reader even when an assertion below fails |
|     try (Reader<byte[]> reader1 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) |
|             .defaultCryptoKeyReader(ecdsaPrivateKeyFile).create(); |
|          Reader<byte[]> reader2 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) |
|             .defaultCryptoKeyReader(ecdsaPrivateKeyData).create(); |
|          Reader<byte[]> reader3 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) |
|             .defaultCryptoKeyReader(privateKeyFileMap).create(); |
|          Reader<byte[]> reader4 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) |
|             .defaultCryptoKeyReader(privateKeyDataMap).create()) { |
| |
|         // messages 0 .. 2*numMsg-1 are encrypted with the ECDSA key |
|         try (Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic) |
|                 .addEncryptionKey("client-ecdsa.pem").defaultCryptoKeyReader(ecdsaPublicKeyFile).create(); |
|              Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topic) |
|                 .addEncryptionKey("client-ecdsa.pem").defaultCryptoKeyReader(ecdsaPublicKeyData).create()) { |
|             for (int i = 0; i < numMsg; i++) { |
|                 producer1.send(("my-message-" + i).getBytes()); |
|             } |
|             for (int i = numMsg; i < numMsg * 2; i++) { |
|                 producer2.send(("my-message-" + i).getBytes()); |
|             } |
|         } |
| |
|         for (Reader<byte[]> reader : Lists.newArrayList(reader1, reader2)) { |
|             for (int i = 0; i < numMsg * 2; i++) { |
|                 MessageImpl<byte[]> msg = (MessageImpl<byte[]>) reader.readNext(5, TimeUnit.SECONDS); |
|                 // a timed-out readNext yields null; fail clearly instead of with an NPE |
|                 Assert.assertNotNull(msg, "Timed out waiting for message " + i); |
|                 // verify that encrypted message contains encryption-context |
|                 msg.getEncryptionCtx().orElseThrow( |
|                         () -> new IllegalStateException("encryption-ctx not present for encrypted message")); |
|                 assertEquals(new String(msg.getData()), "my-message-" + i); |
|             } |
|         } |
| |
|         // messages 2*numMsg .. 4*numMsg-1 are encrypted with the RSA key |
|         try (Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topic) |
|                 .addEncryptionKey("client-rsa.pem").defaultCryptoKeyReader(rsaPublicKeyFile).create(); |
|              Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topic) |
|                 .addEncryptionKey("client-rsa.pem").defaultCryptoKeyReader(rsaPublicKeyData).create()) { |
|             for (int i = numMsg * 2; i < numMsg * 3; i++) { |
|                 producer3.send(("my-message-" + i).getBytes()); |
|             } |
|             for (int i = numMsg * 3; i < numMsg * 4; i++) { |
|                 producer4.send(("my-message-" + i).getBytes()); |
|             } |
|         } |
| |
|         // the map-based readers are configured with both private keys and read from message 0 |
|         for (Reader<byte[]> reader : Lists.newArrayList(reader3, reader4)) { |
|             for (int i = 0; i < numMsg * 4; i++) { |
|                 MessageImpl<byte[]> msg = (MessageImpl<byte[]>) reader.readNext(5, TimeUnit.SECONDS); |
|                 // a timed-out readNext yields null; fail clearly instead of with an NPE |
|                 Assert.assertNotNull(msg, "Timed out waiting for message " + i); |
|                 // verify that encrypted message contains encryption-context |
|                 msg.getEncryptionCtx().orElseThrow( |
|                         () -> new IllegalStateException("encryption-ctx not present for encrypted message")); |
|                 assertEquals(new String(msg.getData()), "my-message-" + i); |
|             } |
|         } |
|     } |
| } |
| |
| @Test |
| public void testSimpleReaderReachEndOfTopic() throws Exception { |
| Reader<byte[]> reader = pulsarClient.newReader() |
| .topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic") |
| .startMessageId(MessageId.earliest).create(); |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic") |
| .create(); |
| |
| // no data write, should return false |
| assertFalse(reader.hasMessageAvailable()); |
| |
| // produce message 0 -- 99 |
| for (int i = 0; i < 100; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| MessageImpl<byte[]> msg = null; |
| Set<String> messageSet = Sets.newHashSet(); |
| int index = 0; |
| |
| // read message till end. |
| while (reader.hasMessageAvailable()) { |
| msg = (MessageImpl<byte[]>) reader.readNext(1, TimeUnit.SECONDS); |
| String receivedMessage = new String(msg.getData()); |
| log.debug("Received message: [{}]", receivedMessage); |
| String expectedMessage = "my-message-" + (index++); |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); |
| } |
| |
| assertEquals(index, 100); |
| // readNext should return null, after reach the end of topic. |
| assertNull(reader.readNext(1, TimeUnit.SECONDS)); |
| |
| // produce message again. |
| for (int i = 100; i < 200; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| // read message till end again. |
| while (reader.hasMessageAvailable()) { |
| msg = (MessageImpl<byte[]>) reader.readNext(1, TimeUnit.SECONDS); |
| String receivedMessage = new String(msg.getData()); |
| log.debug("Received message: [{}]", receivedMessage); |
| String expectedMessage = "my-message-" + (index++); |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); |
| } |
| |
| assertEquals(index, 200); |
| // readNext should return null, after reach the end of topic. |
| assertNull(reader.readNext(1, TimeUnit.SECONDS)); |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testSimpleMultiReaderReachEndOfTopic() throws Exception { |
| String topic = "persistent://my-property/my-ns/testSimpleMultiReaderReachEndOfTopic"; |
| admin.topics().createPartitionedTopic(topic,3); |
| Reader<byte[]> reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create(); |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); |
| |
| // no data write, should return false |
| assertFalse(reader.hasMessageAvailable()); |
| |
| // produce message 0 -- 99 |
| for (int i = 0; i < 100; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| TopicMessageImpl<byte[]> msg = null; |
| Set<String> messageSet = Sets.newHashSet(); |
| int index = 0; |
| |
| // read message till end. |
| while (reader.hasMessageAvailable()) { |
| msg = (TopicMessageImpl<byte[]>) reader.readNext(1, TimeUnit.SECONDS); |
| String receivedMessage = new String(msg.getData()); |
| index++; |
| Assert.assertTrue(messageSet.add(receivedMessage), "Received duplicate message " + receivedMessage); |
| } |
| |
| assertEquals(index, 100); |
| // readNext should return null, after reach the end of topic. |
| assertNull(reader.readNext(1, TimeUnit.SECONDS)); |
| |
| // produce message again. |
| for (int i = 100; i < 200; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| // read message till end again. |
| while (reader.hasMessageAvailable()) { |
| msg = (TopicMessageImpl<byte[]>) reader.readNext(1, TimeUnit.SECONDS); |
| String receivedMessage = new String(msg.getData()); |
| index++; |
| Assert.assertTrue(messageSet.add(receivedMessage), "Received duplicate message " + receivedMessage); |
| } |
| |
| assertEquals(index, 200); |
| // readNext should return null, after reach the end of topic. |
| assertNull(reader.readNext(1, TimeUnit.SECONDS)); |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testReaderReachEndOfTopicOnMessageWithBatches() throws Exception { |
| Reader<byte[]> reader = pulsarClient.newReader() |
| .topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches") |
| .startMessageId(MessageId.earliest).create(); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches") |
| .enableBatching(true).batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS).create(); |
| |
| // no data write, should return false |
| assertFalse(reader.hasMessageAvailable()); |
| |
| for (int i = 0; i < 100; i++) { |
| String message = "my-message-" + i; |
| producer.sendAsync(message.getBytes()); |
| } |
| |
| // Write one sync message to ensure everything before got persistend |
| producer.send("my-message-10".getBytes()); |
| |
| MessageId lastMessageId = null; |
| int index = 0; |
| assertTrue(reader.hasMessageAvailable()); |
| |
| if (reader.hasMessageAvailable()) { |
| Message<byte[]> msg = reader.readNext(); |
| lastMessageId = msg.getMessageId(); |
| assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class); |
| |
| while (msg != null) { |
| index++; |
| msg = reader.readNext(100, TimeUnit.MILLISECONDS); |
| } |
| assertEquals(index, 101); |
| } |
| |
| assertFalse(reader.hasMessageAvailable()); |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testMultiReaderReachEndOfTopicOnMessageWithBatches() throws Exception { |
| String topic = "persistent://my-property/my-ns/testMultiReaderReachEndOfTopicOnMessageWithBatches"; |
| admin.topics().createPartitionedTopic(topic, 3); |
| Reader<byte[]> reader = pulsarClient.newReader() |
| .topic(topic) |
| .startMessageId(MessageId.earliest).create(); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(true).batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS).create(); |
| |
| // no data write, should return false |
| assertFalse(reader.hasMessageAvailable()); |
| |
| for (int i = 0; i < 100; i++) { |
| String message = "my-message-" + i; |
| producer.sendAsync(message.getBytes()); |
| } |
| |
| // Write one sync message to ensure everything before got persistend |
| producer.send("my-message-10".getBytes()); |
| |
| MessageId lastMessageId = null; |
| int index = 0; |
| assertTrue(reader.hasMessageAvailable()); |
| |
| if (reader.hasMessageAvailable()) { |
| Message<byte[]> msg = reader.readNext(); |
| lastMessageId = msg.getMessageId(); |
| assertEquals(lastMessageId.getClass(), TopicMessageIdImpl.class); |
| |
| while (msg != null) { |
| index++; |
| msg = reader.readNext(100, TimeUnit.MILLISECONDS); |
| } |
| assertEquals(index, 101); |
| } |
| |
| assertFalse(reader.hasMessageAvailable()); |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testMessageAvailableAfterRestart() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/testMessageAvailableAfterRestart"; |
| String content = "my-message-1"; |
| |
| // stop retention from cleaning up |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); |
| |
| try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
| .startMessageId(MessageId.earliest).create()) { |
| assertFalse(reader.hasMessageAvailable()); |
| } |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) { |
| producer.send(content.getBytes()); |
| } |
| |
| try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
| .startMessageId(MessageId.earliest).create()) { |
| assertTrue(reader.hasMessageAvailable()); |
| } |
| |
| // cause broker to drop topic. Will be loaded next time we access it |
| pulsar.getBrokerService().getTopicReference(topic).get().close(false).get(); |
| |
| try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
| .startMessageId(MessageId.earliest).create()) { |
| assertTrue(reader.hasMessageAvailable()); |
| |
| String readOut = new String(reader.readNext().getData()); |
| assertEquals(content, readOut); |
| assertFalse(reader.hasMessageAvailable()); |
| } |
| |
| } |
| |
| @Test |
| public void testMultiReaderMessageAvailableAfterRestart() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/testMessageAvailableAfterRestart2"; |
| String content = "my-message-1"; |
| admin.topics().createPartitionedTopic(topic, 3); |
| // stop retention from cleaning up |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub2").subscribe().close(); |
| |
| try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
| .startMessageId(MessageId.earliest).create()) { |
| assertFalse(reader.hasMessageAvailable()); |
| } |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) { |
| producer.send(content.getBytes()); |
| } |
| |
| try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
| .startMessageId(MessageId.earliest).create()) { |
| assertTrue(reader.hasMessageAvailable()); |
| } |
| |
| // cause broker to drop topic. Will be loaded next time we access it |
| pulsar.getBrokerService().getTopics().keys().forEach(topicName -> { |
| try { |
| pulsar.getBrokerService().getTopicReference(topicName).get().close(false).get(); |
| } catch (Exception e) { |
| fail(); |
| } |
| }); |
| |
| try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) |
| .startMessageId(MessageId.earliest).create()) { |
| assertTrue(reader.hasMessageAvailable()); |
| |
| String readOut = new String(reader.readNext().getData()); |
| assertEquals(content, readOut); |
| assertFalse(reader.hasMessageAvailable()); |
| } |
| |
| } |
| |
| @Test(dataProvider = "variationsForHasMessageAvailable") |
| public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception { |
| final String topicName = "persistent://my-property/my-ns/HasMessageAvailable"; |
| final int numOfMessage = 100; |
| |
| ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() |
| .topic(topicName); |
| |
| if (enableBatch) { |
| producerBuilder |
| .enableBatching(true) |
| .batchingMaxMessages(10); |
| } else { |
| producerBuilder |
| .enableBatching(false); |
| } |
| |
| Producer<byte[]> producer = producerBuilder.create(); |
| |
| CountDownLatch latch = new CountDownLatch(numOfMessage); |
| |
| List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>()); |
| |
| for (int i = 0; i < numOfMessage; i++) { |
| producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> { |
| if (e != null) { |
| Assert.fail(); |
| } else { |
| allIds.add(mid); |
| } |
| latch.countDown(); |
| }); |
| } |
| |
| latch.await(); |
| |
| allIds.sort(null); // make sure the largest mid appears at last. |
| |
| for (MessageId id : allIds) { |
| Reader<byte[]> reader; |
| |
| if (startInclusive) { |
| reader = pulsarClient.newReader().topic(topicName) |
| .startMessageId(id).startMessageIdInclusive().create(); |
| } else { |
| reader = pulsarClient.newReader().topic(topicName) |
| .startMessageId(id).create(); |
| } |
| if (startInclusive) { |
| assertTrue(reader.hasMessageAvailable()); |
| } else if (id != allIds.get(allIds.size() - 1)) { |
| assertTrue(reader.hasMessageAvailable()); |
| } else { |
| assertFalse(reader.hasMessageAvailable()); |
| } |
| reader.close(); |
| } |
| |
| producer.close(); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testHasMessageAvailableWithBatch() throws Exception { |
| final String topicName = "persistent://my-property/my-ns/testHasMessageAvailableWithBatch"; |
| final int numOfMessage = 10; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .enableBatching(true) |
| .batchingMaxMessages(10) |
| .batchingMaxPublishDelay(2,TimeUnit.SECONDS) |
| .topic(topicName).create(); |
| |
| //For batch-messages with single message, the type of client messageId should be the same as that of broker |
| MessageIdImpl messageId = (MessageIdImpl) producer.send("msg".getBytes()); |
| assertTrue(messageId instanceof MessageIdImpl); |
| ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName) |
| .startMessageId(messageId).startMessageIdInclusive().create(); |
| MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId(); |
| assertTrue(messageId instanceof BatchMessageIdImpl); |
| assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId()); |
| assertEquals(lastMsgId.getEntryId(), messageId.getEntryId()); |
| reader.close(); |
| |
| CountDownLatch latch = new CountDownLatch(numOfMessage); |
| List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>()); |
| for (int i = 0; i < numOfMessage; i++) { |
| producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> { |
| if (e != null) { |
| Assert.fail(); |
| } else { |
| allIds.add(mid); |
| } |
| latch.countDown(); |
| }); |
| } |
| producer.flush(); |
| latch.await(); |
| producer.close(); |
| |
| //For batch-message with multi messages, the type of client messageId should be the same as that of broker |
| for (MessageId id : allIds) { |
| reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName) |
| .startMessageId(id).startMessageIdInclusive().create(); |
| if (id instanceof BatchMessageIdImpl) { |
| MessageId lastMessageId = reader.getConsumer().getLastMessageId(); |
| assertTrue(lastMessageId instanceof BatchMessageIdImpl); |
| log.info("id {} instance of BatchMessageIdImpl",id); |
| } else { |
| assertTrue(id instanceof MessageIdImpl); |
| MessageId lastMessageId = reader.getConsumer().getLastMessageId(); |
| assertTrue(lastMessageId instanceof MessageIdImpl); |
| log.info("id {} instance of MessageIdImpl",id); |
| } |
| reader.close(); |
| } |
| //For non-batch message, the type of client messageId should be the same as that of broker |
| producer = pulsarClient.newProducer() |
| .enableBatching(false).topic(topicName).create(); |
| messageId = (MessageIdImpl) producer.send("non-batch".getBytes()); |
| assertFalse(messageId instanceof BatchMessageIdImpl); |
| assertTrue(messageId instanceof MessageIdImpl); |
| reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName) |
| .startMessageId(messageId).create(); |
| MessageId lastMessageId = reader.getConsumer().getLastMessageId(); |
| assertFalse(lastMessageId instanceof BatchMessageIdImpl); |
| assertTrue(lastMessageId instanceof MessageIdImpl); |
| assertEquals(lastMessageId, messageId); |
| producer.close(); |
| reader.close(); |
| } |
| |
| @Test |
| public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception { |
| final int numOfMessage = 10; |
| final String topicName = "persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime"; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName).create(); |
| |
| for (int i = 0; i < numOfMessage; i++) { |
| producer.send(String.format("msg num %d", i).getBytes()); |
| } |
| |
| Reader<byte[]> reader = pulsarClient.newReader().topic(topicName) |
| .startMessageId(MessageId.earliest).create(); |
| assertTrue(reader.hasMessageAvailable()); |
| |
| reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m")); |
| |
| assertTrue(reader.hasMessageAvailable()); |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testMultiReaderNonDurableIsAbleToSeekRelativeTime() throws Exception { |
| final int numOfMessage = 10; |
| final String topicName = "persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime"; |
| admin.topics().createPartitionedTopic(topicName, 3); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); |
| |
| for (int i = 0; i < numOfMessage; i++) { |
| producer.send(String.format("msg num %d", i).getBytes()); |
| } |
| |
| Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create(); |
| assertTrue(reader.hasMessageAvailable()); |
| |
| reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m")); |
| |
| assertTrue(reader.hasMessageAvailable()); |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws Exception { |
| final String topicName = "persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic"; |
| final int numOfMessage = 10; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName).create(); |
| |
| for (int i = 0; i < numOfMessage; i++) { |
| producer.send(String.format("msg num %d", i).getBytes()); |
| } |
| |
| Reader<byte[]> reader = pulsarClient.newReader().topic(topicName) |
| .startMessageId(MessageId.earliest).create(); |
| |
| assertTrue(reader.hasMessageAvailable()); |
| |
| // Read all messages the first time |
| Set<String> messageSetA = Sets.newHashSet(); |
| for (int i = 0; i < numOfMessage; i++) { |
| Message<byte[]> message = reader.readNext(); |
| String receivedMessage = new String(message.getData()); |
| String expectedMessage = String.format("msg num %d", i); |
| testMessageOrderAndDuplicates(messageSetA, receivedMessage, expectedMessage); |
| } |
| |
| assertFalse(reader.hasMessageAvailable()); |
| |
| // Perform cursor reset by time |
| reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m")); |
| |
| // Read all messages a second time after seek() |
| Set<String> messageSetB = Sets.newHashSet(); |
| for (int i = 0; i < numOfMessage; i++) { |
| Message<byte[]> message = reader.readNext(); |
| String receivedMessage = new String(message.getData()); |
| String expectedMessage = String.format("msg num %d", i); |
| testMessageOrderAndDuplicates(messageSetB, receivedMessage, expectedMessage); |
| } |
| |
| // Reader should be finished |
| assertTrue(reader.isConnected()); |
| assertFalse(reader.hasMessageAvailable()); |
| assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 0); |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testMultiReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws Exception { |
| final String topicName = "persistent://my-property/my-ns/MultiReaderSeekWithTimeOnBeginningOfTopic"; |
| final int numOfMessage = 10; |
| admin.topics().createPartitionedTopic(topicName, 3); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); |
| |
| for (int i = 0; i < numOfMessage; i++) { |
| producer.send(String.format("msg num %d", i).getBytes()); |
| } |
| |
| Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create(); |
| |
| assertTrue(reader.hasMessageAvailable()); |
| |
| // Read all messages the first time |
| Set<String> messageSetA = Sets.newHashSet(); |
| for (int i = 0; i < numOfMessage; i++) { |
| Message<byte[]> message = reader.readNext(); |
| String receivedMessage = new String(message.getData()); |
| Assert.assertTrue(messageSetA.add(receivedMessage), "Received duplicate message " + receivedMessage); |
| } |
| |
| assertFalse(reader.hasMessageAvailable()); |
| |
| // Perform cursor reset by time |
| reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m")); |
| |
| // Read all messages a second time after seek() |
| Set<String> messageSetB = Sets.newHashSet(); |
| for (int i = 0; i < numOfMessage; i++) { |
| Message<byte[]> message = reader.readNext(); |
| String receivedMessage = new String(message.getData()); |
| Assert.assertTrue(messageSetB.add(receivedMessage), "Received duplicate message " + receivedMessage); |
| } |
| |
| // Reader should be finished |
| assertTrue(reader.isConnected()); |
| assertFalse(reader.hasMessageAvailable()); |
| assertEquals(((MultiTopicsReaderImpl) reader).getMultiTopicsConsumer().numMessagesInQueue(), 0); |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws Exception { |
| final String topicName = "persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic"; |
| final int numOfMessage = 100; |
| final int halfMessages = numOfMessage / 2; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName).create(); |
| |
| for (int i = 0; i < numOfMessage; i++) { |
| producer.send(String.format("msg num %d", i).getBytes()); |
| } |
| |
| Reader<byte[]> reader = pulsarClient.newReader().topic(topicName) |
| .startMessageId(MessageId.earliest).create(); |
| |
| assertTrue(reader.hasMessageAvailable()); |
| |
| // Read all messages the first time |
| MessageId midmessageToSeek = null; |
| Set<String> messageSetA = Sets.newHashSet(); |
| for (int i = 0; i < numOfMessage; i++) { |
| Message<byte[]> message = reader.readNext(); |
| String receivedMessage = new String(message.getData()); |
| String expectedMessage = String.format("msg num %d", i); |
| testMessageOrderAndDuplicates(messageSetA, receivedMessage, expectedMessage); |
| |
| if (i == halfMessages) { |
| midmessageToSeek = message.getMessageId(); |
| } |
| } |
| |
| assertFalse(reader.hasMessageAvailable()); |
| |
| // Perform cursor reset by MessageId to half of the topic |
| reader.seek(midmessageToSeek); |
| |
| // Read all halved messages after seek() |
| Set<String> messageSetB = Sets.newHashSet(); |
| for (int i = halfMessages + 1; i < numOfMessage; i++) { |
| Message<byte[]> message = reader.readNext(); |
| String receivedMessage = new String(message.getData()); |
| String expectedMessage = String.format("msg num %d", i); |
| testMessageOrderAndDuplicates(messageSetB, receivedMessage, expectedMessage); |
| } |
| |
| // Reader should be finished |
| assertTrue(reader.isConnected()); |
| assertFalse(reader.hasMessageAvailable()); |
| assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 0); |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception { |
| final String topicName = "persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic"; |
| final int numOfMessage = 10; |
| final int halfMessages = numOfMessage / 2; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName).create(); |
| |
| long l = System.currentTimeMillis(); |
| for (int i = 0; i < numOfMessage; i++) { |
| producer.send(String.format("msg num %d", i).getBytes()); |
| Thread.sleep(100); |
| } |
| |
| Reader<byte[]> reader = pulsarClient.newReader().topic(topicName) |
| .startMessageId(MessageId.earliest).create(); |
| |
| int plusTime = (halfMessages + 1) * 100; |
| reader.seek(l + plusTime); |
| |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = halfMessages + 1; i < numOfMessage; i++) { |
| Message<byte[]> message = reader.readNext(); |
| String receivedMessage = new String(message.getData()); |
| String expectedMessage = String.format("msg num %d", i); |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); |
| } |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception { |
| final String topicName = "persistent://my-property/my-ns/testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic" + System.currentTimeMillis(); |
| final int numOfMessage = 10; |
| final int halfMessages = numOfMessage / 2; |
| admin.topics().createPartitionedTopic(topicName, 3); |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); |
| long halfTime = 0; |
| for (int i = 0; i < numOfMessage; i++) { |
| if (i == numOfMessage / 2) { |
| halfTime = System.currentTimeMillis(); |
| } |
| producer.send(String.format("msg num %d", i).getBytes()); |
| } |
| Assert.assertTrue(halfTime != 0); |
| Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create(); |
| |
| reader.seek(halfTime); |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = halfMessages + 1; i < numOfMessage; i++) { |
| Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS); |
| String receivedMessage = new String(message.getData()); |
| Assert.assertTrue(messageSet.add(receivedMessage), "Received duplicate message " + receivedMessage); |
| } |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test(dataProvider = "variationsForExpectedPos") |
| public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages) |
| throws Exception { |
| final String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos"; |
| final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset |
| final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; // First message of reset |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName) |
| .enableBatching(batching) |
| .create(); |
| |
| CountDownLatch latch = new CountDownLatch(numOfMessages); |
| |
| final AtomicReference<MessageId> resetPos = new AtomicReference<>(); |
| |
| for (int i = 0; i < numOfMessages; i++) { |
| |
| final int j = i; |
| |
| producer.sendAsync(String.format("msg num %d", i).getBytes()) |
| .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId))) |
| .whenComplete((p, e) -> { |
| if (e != null) { |
| fail("send msg failed due to " + e.getMessage()); |
| } else { |
| if (p.getLeft() == resetIndex) { |
| resetPos.set(p.getRight()); |
| } |
| } |
| latch.countDown(); |
| }); |
| } |
| |
| latch.await(); |
| |
| ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader() |
| .topic(topicName) |
| .startMessageId(resetPos.get()); |
| |
| if (startInclusive) { |
| readerBuilder.startMessageIdInclusive(); |
| } |
| |
| Reader<byte[]> reader = readerBuilder.create(); |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = firstMessage; i < numOfMessages; i++) { |
| Message<byte[]> message = reader.readNext(); |
| String receivedMessage = new String(message.getData()); |
| String expectedMessage = String.format("msg num %d", i); |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); |
| } |
| |
| assertTrue(reader.isConnected()); |
| assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 0); |
| |
| // Processed messages should be the number of messages in the range: [FirstResetMessage..TotalNumOfMessages] |
| assertEquals(messageSet.size(), numOfMessages - firstMessage); |
| |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testReaderBuilderConcurrentCreate() throws Exception { |
| String topicName = "persistent://my-property/my-ns/testReaderBuilderConcurrentCreate_"; |
| int numTopic = 30; |
| ReaderBuilder<byte[]> builder = pulsarClient.newReader().startMessageId(MessageId.earliest); |
| |
| List<CompletableFuture<Reader<byte[]>>> readers = Lists.newArrayListWithExpectedSize(numTopic); |
| List<Producer<byte[]>> producers = Lists.newArrayListWithExpectedSize(numTopic); |
| // create producer firstly |
| for (int i = 0; i < numTopic; i++) { |
| producers.add(pulsarClient.newProducer() |
| .topic(topicName + i) |
| .create()); |
| } |
| |
| // create reader concurrently |
| for (int i = 0; i < numTopic; i++) { |
| readers.add(builder.clone().topic(topicName + i).createAsync()); |
| } |
| |
| // verify readers config are different for topic name. |
| for (int i = 0; i < numTopic; i++) { |
| assertEquals(readers.get(i).get().getTopic(), topicName + i); |
| readers.get(i).get().close(); |
| producers.get(i).close(); |
| } |
| } |
| |
| @Test(timeOut = 10000) |
| public void testMultiReaderBuilderConcurrentCreate() throws Exception { |
| String topicName = "persistent://my-property/my-ns/testMultiReaderBuilderConcurrentCreate_"; |
| int numTopic = 30; |
| ReaderBuilder<byte[]> builder = pulsarClient.newReader().startMessageId(MessageId.earliest); |
| |
| List<CompletableFuture<Reader<byte[]>>> readers = Lists.newArrayListWithExpectedSize(numTopic); |
| List<Producer<byte[]>> producers = Lists.newArrayListWithExpectedSize(numTopic); |
| // create producer firstly |
| for (int i = 0; i < numTopic; i++) { |
| admin.topics().createPartitionedTopic(topicName + i, 3); |
| producers.add(pulsarClient.newProducer() |
| .topic(topicName + i) |
| .create()); |
| } |
| |
| // create reader concurrently |
| for (int i = 0; i < numTopic; i++) { |
| readers.add(builder.clone().topic(topicName + i).createAsync()); |
| } |
| |
| // verify readers config are different for topic name. |
| for (int i = 0; i < numTopic; i++) { |
| assertTrue(readers.get(i).get().getTopic().startsWith(MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX)); |
| readers.get(i).get().close(); |
| producers.get(i).close(); |
| } |
| } |
| |
| @Test |
| public void testReaderStartInMiddleOfBatch() throws Exception { |
| final String topicName = "persistent://my-property/my-ns/ReaderStartInMiddleOfBatch"; |
| final int numOfMessage = 100; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName) |
| .enableBatching(true) |
| .batchingMaxMessages(10) |
| .create(); |
| |
| CountDownLatch latch = new CountDownLatch(numOfMessage); |
| |
| List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>()); |
| |
| for (int i = 0; i < numOfMessage; i++) { |
| producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> { |
| if (e != null) { |
| fail(); |
| } else { |
| allIds.add(mid); |
| } |
| latch.countDown(); |
| }); |
| } |
| |
| latch.await(); |
| |
| for (MessageId id : allIds) { |
| Reader<byte[]> reader = pulsarClient.newReader().topic(topicName) |
| .startMessageId(id).startMessageIdInclusive().create(); |
| MessageId idGot = reader.readNext().getMessageId(); |
| assertEquals(idGot, id); |
| reader.close(); |
| } |
| |
| producer.close(); |
| } |
| |
| @Test |
| public void testHasMessageAvailableOnEmptyTopic() throws Exception { |
| String topic = newTopicName(); |
| |
| @Cleanup |
| Reader<String> r1 = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .startMessageId(MessageId.earliest) |
| .create(); |
| |
| @Cleanup |
| Reader<String> r2 = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .startMessageId(MessageId.latest) |
| .create(); |
| |
| @Cleanup |
| Reader<String> r2Inclusive = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .startMessageId(MessageId.latest) |
| .startMessageIdInclusive() |
| .create(); |
| |
| // no data write, should return false |
| assertFalse(r1.hasMessageAvailable()); |
| assertFalse(r2.hasMessageAvailable()); |
| assertFalse(r2Inclusive.hasMessageAvailable()); |
| |
| @Cleanup |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| .topic(topic) |
| .create(); |
| |
| producer.send("hello-1"); |
| assertTrue(r1.hasMessageAvailable()); |
| assertTrue(r2.hasMessageAvailable()); |
| assertTrue(r2Inclusive.hasMessageAvailable()); |
| |
| @Cleanup |
| Reader<String> r3 = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .startMessageId(MessageId.latest) |
| .create(); |
| |
| |
| assertFalse(r3.hasMessageAvailable()); |
| |
| producer.send("hello-2"); |
| |
| assertTrue(r1.hasMessageAvailable()); |
| assertTrue(r2.hasMessageAvailable()); |
| assertTrue(r2Inclusive.hasMessageAvailable()); |
| assertTrue(r3.hasMessageAvailable()); |
| } |
| } |