| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include <gtest/gtest.h> |
| #include <pulsar/Client.h> |
| #include <boost/lexical_cast.hpp> |
| #include <lib/LogUtils.h> |
| #include <pulsar/MessageBuilder.h> |
| #include "DestinationName.h" |
| #include <lib/Commands.h> |
| #include <sstream> |
| #include "boost/date_time/posix_time/posix_time.hpp" |
| #include "CustomRoutingPolicy.h" |
| #include <boost/thread.hpp> |
| #include <boost/thread/mutex.hpp> |
| #include "PulsarFriend.h" |
| #include "HttpHelper.h" |
| #include <set> |
| #include <vector> |
| #include "lib/Future.h" |
| #include "lib/Utils.h" |
| DECLARE_LOG_OBJECT() |
| |
| using namespace pulsar; |
| |
| boost::mutex mutex_; |
| static int globalTestBatchMessagesCounter = 0; |
| static int globalCount = 0; |
| static long globalResendMessageCount = 0; |
| static std::string lookupUrl = "pulsar://localhost:8885"; |
| static std::string adminUrl = "http://localhost:8765/"; |
| |
| static void messageListenerFunction(Consumer consumer, const Message& msg) { |
| globalCount++; |
| consumer.acknowledge(msg); |
| } |
| |
| static void sendCallBack(Result r, const Message& msg, std::string prefix) { |
| ASSERT_EQ(r, ResultOk); |
| std::string messageContent = prefix + boost::lexical_cast<std::string>(globalTestBatchMessagesCounter++); |
| ASSERT_EQ(messageContent, msg.getDataAsString()); |
| LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString()); |
| } |
| |
| static void sendCallBack(Result r, const Message& msg, std::string prefix, double percentage, uint64_t delayInMicros) { |
| if ((rand() % 100) <= percentage) { |
| usleep(delayInMicros); |
| } |
| sendCallBack(r, msg, prefix); |
| } |
| |
| TEST(BasicEndToEndTest, testBatchMessages) |
| { |
| ClientConfiguration config; |
| Client client(lookupUrl); |
| std::string topicName = "persistent://property/cluster/namespace/test-batch-messages"; |
| std::string subName = "subscription-name"; |
| Producer producer; |
| |
| // Enable batching on producer side |
| int batchSize = 2; |
| int numOfMessages = 1000; |
| |
| ProducerConfiguration conf; |
| conf.setCompressionType(CompressionLZ4); |
| conf.setBatchingMaxMessages(batchSize); |
| conf.setBatchingEnabled(true); |
| conf.setBlockIfQueueFull(true); |
| |
| Promise<Result, Producer> producerPromise; |
| client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise)); |
| Future<Result, Producer> producerFuture = producerPromise.getFuture(); |
| Result result = producerFuture.get(producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer; |
| Promise<Result, Consumer> consumerPromise; |
| client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise)); |
| Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); |
| result = consumerFuture.get(consumer); |
| ASSERT_EQ(ResultOk, result); |
| |
| // handling dangling subscriptions |
| consumer.unsubscribe(); |
| client.subscribe(topicName, subName, consumer); |
| |
| std::string temp = producer.getTopic(); |
| ASSERT_EQ(temp, topicName); |
| temp = consumer.getTopic(); |
| ASSERT_EQ(temp, topicName); |
| ASSERT_EQ(consumer.getSubscriptionName(), subName); |
| |
| // Send Asynchronously |
| std::string prefix = "msg-batch-"; |
| for (int i = 0; i<numOfMessages; i++) { |
| std::string messageContent = prefix + boost::lexical_cast<std::string>(i); |
| Message msg = MessageBuilder().setContent(messageContent).setProperty("msgIndex", boost::lexical_cast<std::string>(i)).build(); |
| producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix)); |
| LOG_DEBUG("sending message " << messageContent); |
| } |
| |
| Message receivedMsg; |
| int i = 0; |
| while (consumer.receive(receivedMsg, 5000) == ResultOk) { |
| std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i); |
| LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]"); |
| ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++)); |
| ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString()); |
| ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg)); |
| } |
| // Number of messages produced |
| ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages); |
| globalTestBatchMessagesCounter = 0; |
| // Number of messages consumed |
| ASSERT_EQ(i, numOfMessages); |
| |
| } |
| |
| void resendMessage(Result r, const Message msg, Producer producer) { |
| Lock lock(mutex_); |
| if (r != ResultOk) { |
| LOG_DEBUG("globalResendMessageCount" << globalResendMessageCount); |
| if (++globalResendMessageCount >= 3) { |
| return; |
| } |
| } |
| lock.unlock(); |
| producer.sendAsync(MessageBuilder().build(), |
| boost::bind(resendMessage, _1, _2, producer)); |
| } |
| |
| TEST(BasicEndToEndTest, testProduceConsume) |
| { |
| ClientConfiguration config; |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/ns1/my-topic"; |
| std::string subName = "my-sub-name"; |
| Producer producer; |
| |
| Promise<Result, Producer> producerPromise; |
| client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise)); |
| Future<Result, Producer> producerFuture = producerPromise.getFuture(); |
| Result result = producerFuture.get(producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer; |
| Promise<Result, Consumer> consumerPromise; |
| client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise)); |
| Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); |
| result = consumerFuture.get(consumer); |
| ASSERT_EQ(ResultOk, result); |
| std::string temp = producer.getTopic(); |
| ASSERT_EQ(temp, topicName); |
| temp = consumer.getTopic(); |
| ASSERT_EQ(temp, topicName); |
| ASSERT_EQ(consumer.getSubscriptionName(), subName); |
| |
| // Send synchronously |
| std::string content = "msg-1-content"; |
| Message msg = MessageBuilder().setContent(content).build(); |
| result = producer.send(msg); |
| ASSERT_EQ(ResultOk, result); |
| |
| Message receivedMsg; |
| consumer.receive(receivedMsg); |
| ASSERT_EQ(content, receivedMsg.getDataAsString()); |
| ASSERT_EQ(ResultOk, consumer.unsubscribe()); |
| ASSERT_EQ(ResultAlreadyClosed, consumer.close()); |
| ASSERT_EQ(ResultOk, producer.close()); |
| ASSERT_EQ(ResultOk, client.close()); |
| } |
| |
| TEST(BasicEndToEndTest, testLookupThrottling) |
| { |
| std::string topicName = "persistent://prop/unit/ns1/testLookupThrottling"; |
| ClientConfiguration config; |
| config.setConcurrentLookupRequest(0); |
| Client client(lookupUrl, config); |
| |
| Producer producer; |
| Result result = client.createProducer(topicName, producer); |
| ASSERT_EQ(ResultTooManyLookupRequestException, result); |
| |
| Consumer consumer1; |
| result = client.subscribe(topicName, "my-sub-name", consumer1); |
| ASSERT_EQ(ResultTooManyLookupRequestException, result); |
| |
| } |
| |
| TEST(BasicEndToEndTest, testNonExistingTopic) |
| { |
| Client client(lookupUrl); |
| Producer producer; |
| Result result = client.createProducer("persistent://prop/unit/ns1", producer); |
| ASSERT_EQ(ResultInvalidTopicName, result); |
| |
| Consumer consumer; |
| result = client.subscribe("persistent://prop/unit/ns1", "my-sub-name", consumer); |
| ASSERT_EQ(ResultInvalidTopicName, result); |
| } |
| |
| TEST(BasicEndToEndTest, testNonPersistentTopic) |
| { |
| std::string topicName = "non-persistent://prop/unit/ns1/testNonPersistentTopic"; |
| Client client(lookupUrl); |
| Producer producer; |
| Result result = client.createProducer(topicName, producer); |
| ASSERT_EQ(ResultInvalidTopicName, result); |
| |
| Consumer consumer; |
| result = client.subscribe(topicName, "my-sub-name", consumer); |
| ASSERT_EQ(ResultInvalidTopicName, result); |
| } |
| |
| TEST(BasicEndToEndTest, testSingleClientMultipleSubscriptions) |
| { |
| std::string topicName = "persistent://prop/unit/ns1/testSingleClientMultipleSubscriptions"; |
| |
| Client client(lookupUrl); |
| |
| Producer producer; |
| Result result = client.createProducer(topicName, producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer1; |
| result = client.subscribe(topicName, "my-sub-name", consumer1); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer2; |
| result = client.subscribe(topicName, "my-sub-name", consumer2); |
| ASSERT_EQ(ResultConsumerBusy, result); |
| //at this point connection gets destroyed because this consumer creation fails |
| } |
| |
| TEST(BasicEndToEndTest, testMultipleClientsMultipleSubscriptions) |
| { |
| std::string topicName = "persistent://prop/unit/ns1/testMultipleClientsMultipleSubscriptions"; |
| Client client1(lookupUrl); |
| Client client2(lookupUrl); |
| |
| Producer producer1; |
| Result result = client1.createProducer(topicName, producer1); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer1; |
| result = client1.subscribe(topicName, "my-sub-name", consumer1); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer2; |
| result = client2.subscribe(topicName, "my-sub-name", consumer2); |
| ASSERT_EQ(ResultConsumerBusy, result); |
| |
| ASSERT_EQ(ResultOk, producer1.close()); |
| ASSERT_EQ(ResultOk, consumer1.close()); |
| ASSERT_EQ(ResultAlreadyClosed, consumer1.close()); |
| ASSERT_EQ(ResultConsumerNotInitialized, consumer2.close()); |
| ASSERT_EQ(ResultOk, client1.close()); |
| |
| // 2 seconds |
| usleep(2 * 1000 * 1000); |
| |
| ASSERT_EQ(ResultOk, client2.close()); |
| } |
| |
| TEST(BasicEndToEndTest, testProduceAndConsumeAfterClientClose) |
| { |
| std::string topicName = "persistent://prop/unit/ns1/testProduceAndConsumeAfterClientClose"; |
| Client client(lookupUrl); |
| |
| Producer producer; |
| Result result = client.createProducer(topicName, producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer; |
| result = client.subscribe(topicName, "my-sub-name", consumer); |
| |
| // Clean dangling subscription |
| consumer.unsubscribe(); |
| result = client.subscribe(topicName, "my-sub-name", consumer); |
| |
| ASSERT_EQ(ResultOk, result); |
| |
| // Send 10 messages synchronously |
| std::string msgContent = "msg-content"; |
| LOG_INFO("Publishing 10 messages synchronously"); |
| int numMsg = 0; |
| for (; numMsg < 10; numMsg++) { |
| Message msg = MessageBuilder().setContent(msgContent).setProperty( |
| "msgIndex", boost::lexical_cast<std::string>(numMsg)).build(); |
| ASSERT_EQ(ResultOk, producer.send(msg)); |
| } |
| |
| LOG_INFO("Trying to receive 10 messages"); |
| Message msgReceived; |
| for (int i = 0; i < 10; i++) { |
| consumer.receive(msgReceived, 1000); |
| LOG_DEBUG("Received message :" << msgReceived.getMessageId()); |
| ASSERT_EQ(msgContent, msgReceived.getDataAsString()); |
| ASSERT_EQ(boost::lexical_cast<std::string>(i), msgReceived.getProperty("msgIndex")); |
| ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(msgReceived)); |
| } |
| |
| LOG_INFO("Closing client"); |
| ASSERT_EQ(ResultOk, client.close()); |
| |
| LOG_INFO("Trying to publish a message after closing the client"); |
| Message msg = MessageBuilder().setContent(msgContent).setProperty( |
| "msgIndex", boost::lexical_cast<std::string>(numMsg)).build(); |
| |
| ASSERT_EQ(ResultAlreadyClosed, producer.send(msg)); |
| |
| LOG_INFO("Trying to consume a message after closing the client"); |
| ASSERT_EQ(ResultAlreadyClosed, consumer.receive(msgReceived)); |
| } |
| |
| TEST(BasicEndToEndTest, testIamSoFancyCharactersInTopicName) |
| { |
| Client client(lookupUrl); |
| Producer producer; |
| Result result = client.createProducer("persistent://prop/unit/ns1/topic@%*)(&!%$#@#$><?", producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer; |
| result = client.subscribe("persistent://prop/unit/ns1/topic@%*)(&!%$#@#$><?", "my-sub-name", consumer); |
| ASSERT_EQ(ResultOk, result); |
| } |
| |
| TEST(BasicEndToEndTest, testSubscribeCloseUnsubscribeSherpaScenario) |
| { |
| ClientConfiguration config; |
| Client client(lookupUrl, config); |
| std::string topicName = "persistent://prop/unit/ns1/::,::bf11"; |
| std::string subName = "weird-ass-characters-@%*)(&!%$#@#$><?)"; |
| Producer producer; |
| Result result = client.createProducer(topicName, producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer; |
| result = client.subscribe(topicName, subName, consumer); |
| ASSERT_EQ(ResultOk, result); |
| |
| result = consumer.close(); |
| ASSERT_EQ(ResultOk,result); |
| |
| Consumer consumer1; |
| result = client.subscribe(topicName, subName, consumer1); |
| result = consumer1.unsubscribe(); |
| ASSERT_EQ(ResultOk, result); |
| } |
| |
| TEST(BasicEndToEndTest, testInvalidUrlPassed) |
| { |
| Client client("localhost:4080"); |
| std::string topicName = "persistent://prop/unit/ns1/testInvalidUrlPassed"; |
| std::string subName = "test-sub"; |
| Producer producer; |
| Result result = client.createProducer(topicName, producer); |
| ASSERT_EQ(ResultConnectError, result); |
| |
| Client client1("test://localhost"); |
| result = client1.createProducer(topicName, producer); |
| ASSERT_EQ(ResultConnectError, result); |
| |
| Client client2("test://:4080"); |
| result = client2.createProducer(topicName, producer); |
| ASSERT_EQ(ResultConnectError, result); |
| |
| Client client3(""); |
| result = client3.createProducer(topicName, producer); |
| ASSERT_EQ(ResultConnectError, result); |
| |
| Client client4("Dream of the day when this will be a valid URL"); |
| result = client4.createProducer(topicName, producer); |
| ASSERT_EQ(ResultConnectError, result); |
| } |
| |
| TEST(BasicEndToEndTest, testPartitionedProducerConsumer) |
| { |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/ns/testPartitionedProducerConsumer"; |
| |
| // call admin api to make it partitioned |
| std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-test/partitions"; |
| int res = makePutRequest(url, "3"); |
| |
| LOG_INFO("res = "<<res); |
| ASSERT_FALSE(res != 204 && res != 409); |
| |
| Producer producer; |
| Result result = client.createProducer(topicName, producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer; |
| result = client.subscribe(topicName, "subscription-A", consumer); |
| ASSERT_EQ(ResultOk, result); |
| |
| for (int i = 0; i < 10; i++ ) { |
| boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time()); |
| long nanoSeconds = t.time_of_day().total_nanoseconds(); |
| std::stringstream ss; |
| ss << nanoSeconds; |
| Message msg = MessageBuilder().setContent(ss.str()).setPartitionKey(ss.str()).build(); |
| ASSERT_EQ(ResultOk, producer.send(msg)); |
| LOG_DEBUG("Message Timestamp is " << msg.getPublishTimestamp()); |
| LOG_DEBUG("Message is " << msg); |
| } |
| |
| ASSERT_EQ(consumer.getSubscriptionName(), "subscription-A"); |
| for (int i = 0; i < 10; i++) { |
| Message m; |
| consumer.receive(m, 10000); |
| consumer.acknowledge(m); |
| } |
| client.shutdown(); |
| } |
| |
| TEST(BasicEndToEndTest, testMessageTooBig) |
| { |
| ClientConfiguration config; |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/ns1/testMessageTooBig"; |
| Producer producer; |
| Result result = client.createProducer(topicName, producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| int size = Commands::MaxMessageSize + 1; |
| char* content = new char[size]; |
| Message msg = MessageBuilder().setAllocatedContent(content, size).build(); |
| result = producer.send(msg); |
| ASSERT_EQ(ResultMessageTooBig, result); |
| |
| // Anything up to MaxMessageSize should be allowed |
| size = Commands::MaxMessageSize; |
| msg = MessageBuilder().setAllocatedContent(content, size).build(); |
| result = producer.send(msg); |
| ASSERT_EQ(ResultOk, result); |
| |
| delete[] content; |
| } |
| |
| TEST(BasicEndToEndTest, testCompressionLZ4) |
| { |
| ClientConfiguration config; |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/namespace1/testCompressionLZ4"; |
| std::string subName = "my-sub-name"; |
| Producer producer; |
| ProducerConfiguration conf; |
| conf.setCompressionType(CompressionLZ4); |
| Result result = client.createProducer(topicName, conf, producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer; |
| client.subscribe(topicName, subName, consumer); |
| |
| // Send synchronously |
| std::string content1 = "msg-1-content"; |
| Message msg = MessageBuilder().setContent(content1).build(); |
| result = producer.send(msg); |
| ASSERT_EQ(ResultOk, result); |
| |
| std::string content2 = "msg-2-content"; |
| msg = MessageBuilder().setContent(content2).build(); |
| result = producer.send(msg); |
| ASSERT_EQ(ResultOk, result); |
| |
| Message receivedMsg; |
| consumer.receive(receivedMsg); |
| ASSERT_EQ(content1, receivedMsg.getDataAsString()); |
| |
| consumer.receive(receivedMsg); |
| ASSERT_EQ(content2, receivedMsg.getDataAsString()); |
| |
| ASSERT_EQ(ResultOk, consumer.unsubscribe()); |
| ASSERT_EQ(ResultAlreadyClosed, consumer.close()); |
| ASSERT_EQ(ResultOk, producer.close()); |
| ASSERT_EQ(ResultOk, client.close()); |
| } |
| |
| TEST(BasicEndToEndTest, testCompressionZLib) |
| { |
| ClientConfiguration config; |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/ns1/testCompressionZLib"; |
| std::string subName = "my-sub-name"; |
| Producer producer; |
| ProducerConfiguration conf; |
| conf.setCompressionType(CompressionZLib); |
| Result result = client.createProducer(topicName, conf, producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer; |
| client.subscribe(topicName, subName, consumer); |
| |
| // Send synchronously |
| std::string content1 = "msg-1-content"; |
| Message msg = MessageBuilder().setContent(content1).build(); |
| result = producer.send(msg); |
| ASSERT_EQ(ResultOk, result); |
| |
| std::string content2 = "msg-2-content"; |
| msg = MessageBuilder().setContent(content2).build(); |
| result = producer.send(msg); |
| ASSERT_EQ(ResultOk, result); |
| |
| Message receivedMsg; |
| consumer.receive(receivedMsg); |
| ASSERT_EQ(content1, receivedMsg.getDataAsString()); |
| |
| consumer.receive(receivedMsg); |
| ASSERT_EQ(content2, receivedMsg.getDataAsString()); |
| |
| ASSERT_EQ(ResultOk, consumer.unsubscribe()); |
| ASSERT_EQ(ResultAlreadyClosed, consumer.close()); |
| ASSERT_EQ(ResultOk, producer.close()); |
| ASSERT_EQ(ResultOk, client.close()); |
| } |
| |
| TEST(BasicEndToEndTest, testConfigurationFile) |
| { |
| ClientConfiguration config1; |
| config1.setOperationTimeoutSeconds(100); |
| config1.setIOThreads(10); |
| config1.setMessageListenerThreads(1); |
| config1.setLogConfFilePath("/tmp/"); |
| |
| ClientConfiguration config2 = config1; |
| AuthenticationDataPtr authData; |
| ASSERT_EQ(ResultOk, config1.getAuth().getAuthData(authData)); |
| ASSERT_EQ(100, config2.getOperationTimeoutSeconds()); |
| ASSERT_EQ(10, config2.getIOThreads()); |
| ASSERT_EQ(1, config2.getMessageListenerThreads()); |
| ASSERT_EQ(config2.getLogConfFilePath().compare("/tmp/"), 0); |
| |
| } |
| |
| TEST(BasicEndToEndTest, testSinglePartitionRoutingPolicy) |
| { |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/ns/partition-testSinglePartitionRoutingPolicy"; |
| |
| // call admin api to make it partitioned |
| std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-testSinglePartitionRoutingPolicy/partitions"; |
| int res = makePutRequest(url, "5"); |
| |
| LOG_INFO("res = "<<res); |
| ASSERT_FALSE(res != 204 && res != 409); |
| |
| Producer producer; |
| ProducerConfiguration producerConfiguration; |
| producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); |
| Result result = client.createProducer(topicName, producerConfiguration, producer); |
| |
| Consumer consumer; |
| result = client.subscribe(topicName, "subscription-A", consumer); |
| ASSERT_EQ(ResultOk, result); |
| |
| ASSERT_EQ(ResultOk, result); |
| for (int i = 0; i < 10; i++ ) { |
| boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time()); |
| long nanoSeconds = t.time_of_day().total_nanoseconds(); |
| std::stringstream ss; |
| ss << nanoSeconds; |
| Message msg = MessageBuilder().setContent(ss.str()).build(); |
| ASSERT_EQ(ResultOk, producer.send(msg)); |
| } |
| |
| for (int i = 0; i < 10; i++) { |
| Message m; |
| consumer.receive(m); |
| consumer.acknowledgeCumulative(m); |
| } |
| |
| consumer.close(); |
| producer.close(); |
| client.close(); |
| } |
| |
| TEST(BasicEndToEndTest, testNamespaceName) |
| { |
| boost::shared_ptr<NamespaceName> nameSpaceName = NamespaceName::get("property", "bf1", "nameSpace"); |
| ASSERT_STREQ(nameSpaceName->getCluster().c_str(), "bf1"); |
| ASSERT_STREQ(nameSpaceName->getLocalName().c_str(), "nameSpace"); |
| ASSERT_STREQ(nameSpaceName->getProperty().c_str(), "property"); |
| } |
| |
| TEST(BasicEndToEndTest, testConsumerClose) |
| { |
| ClientConfiguration config; |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/ns1/testConsumerClose"; |
| std::string subName = "my-sub-name"; |
| Consumer consumer; |
| ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer)); |
| ASSERT_EQ(consumer.close(), ResultOk); |
| ASSERT_EQ(consumer.close(), ResultAlreadyClosed); |
| } |
| |
| TEST(BasicEndToEndTest, testDuplicateConsumerCreationOnPartitionedTopic) |
| { |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/ns/partition-testDuplicateConsumerCreationOnPartitionedTopic"; |
| |
| // call admin api to make it partitioned |
| std::string url = adminUrl + "admin/persistent/prop/unit/ns/testDuplicateConsumerCreationOnPartitionedTopic/partitions"; |
| int res = makePutRequest(url, "5"); |
| |
| LOG_INFO("res = "<<res); |
| ASSERT_FALSE(res != 204 && res != 409); |
| |
| usleep(2 * 1000 * 1000); |
| |
| Producer producer; |
| ProducerConfiguration producerConfiguration; |
| producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::CustomPartition); |
| producerConfiguration.setMessageRouter(boost::make_shared<CustomRoutingPolicy>()); |
| |
| Result result = client.createProducer(topicName, producer); |
| ASSERT_EQ(ResultOk, result); |
| for (int i = 0; i < 10; i++ ) { |
| boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time()); |
| long nanoSeconds = t.time_of_day().total_nanoseconds(); |
| std::stringstream ss; |
| ss << nanoSeconds; |
| Message msg = MessageBuilder().setContent(ss.str()).build(); |
| ASSERT_EQ(ResultOk, producer.send(msg)); |
| } |
| |
| |
| LOG_INFO("Creating Subscriber"); |
| std::string consumerId = "CONSUMER"; |
| ConsumerConfiguration tempConsConfig; |
| tempConsConfig.setConsumerType(ConsumerExclusive); |
| ConsumerConfiguration consConfig = tempConsConfig; |
| ASSERT_EQ(consConfig.getConsumerType(), ConsumerExclusive); |
| Consumer consumer; |
| Result subscribeResult = client.subscribe(topicName, consumerId, |
| consConfig, consumer); |
| ASSERT_EQ(ResultOk, subscribeResult); |
| |
| LOG_INFO("Creating Another Subscriber"); |
| Consumer consumer2; |
| ASSERT_EQ(consumer2.getSubscriptionName(), ""); |
| subscribeResult = client.subscribe(topicName, consumerId, |
| consConfig, consumer2); |
| ASSERT_EQ(ResultConsumerBusy, subscribeResult); |
| consumer.close(); |
| producer.close(); |
| } |
| |
| TEST(BasicEndToEndTest, testRoundRobinRoutingPolicy) |
| { |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/ns/partition-testRoundRobinRoutingPolicy"; |
| // call admin api to make it partitioned |
| std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-testRoundRobinRoutingPolicy/partitions"; |
| int res = makePutRequest(url, "5"); |
| |
| LOG_INFO("res = "<<res); |
| ASSERT_FALSE(res != 204 && res != 409); |
| |
| Producer producer; |
| ProducerConfiguration tempProducerConfiguration; |
| tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); |
| ProducerConfiguration producerConfiguration = tempProducerConfiguration; |
| Result result = client.createProducer(topicName, producerConfiguration, producer); |
| ASSERT_EQ(ResultOk, result); |
| ASSERT_EQ(producer.getTopic(), topicName); |
| |
| // Topic is partitioned into 5 partitions so each partition will receive two messages |
| LOG_INFO("Creating Subscriber"); |
| std::string consumerId = "CONSUMER"; |
| ConsumerConfiguration consConfig; |
| consConfig.setConsumerType(ConsumerExclusive); |
| consConfig.setReceiverQueueSize(2); |
| ASSERT_FALSE(consConfig.hasMessageListener()); |
| Consumer consumer[5]; |
| Result subscribeResult; |
| for (int i = 0; i < 5; i++) { |
| std::stringstream partitionedTopicName; |
| partitionedTopicName << topicName << "-partition-" << i; |
| |
| std::stringstream partitionedConsumerId; |
| partitionedConsumerId << consumerId << i; |
| subscribeResult = client.subscribe(partitionedTopicName.str(), partitionedConsumerId.str(), consConfig, consumer[i]); |
| |
| ASSERT_EQ(ResultOk, subscribeResult); |
| ASSERT_EQ(consumer[i].getTopic(), partitionedTopicName.str()); |
| } |
| |
| for (int i = 0; i < 10; i++ ) { |
| boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time()); |
| long nanoSeconds = t.time_of_day().total_nanoseconds(); |
| std::stringstream ss; |
| ss << nanoSeconds; |
| Message msg = MessageBuilder().setContent(ss.str()).build(); |
| ASSERT_EQ(ResultOk, producer.send(msg)); |
| } |
| |
| Message m; |
| for (int i = 0; i < 2; i++) { |
| for (int partitionIndex = 0; partitionIndex < 5; partitionIndex++) { |
| ASSERT_EQ(ResultOk, consumer[partitionIndex].receive(m)); |
| ASSERT_EQ(ResultOk, consumer[partitionIndex].acknowledge(m)); |
| } |
| } |
| |
| for (int partitionIndex = 0; partitionIndex < 5; partitionIndex++) { |
| consumer[partitionIndex].close(); |
| } |
| producer.close(); |
| client.shutdown(); |
| |
| } |
| |
| TEST(BasicEndToEndTest, testMessageListener) |
| { |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/ns/partition-testMessageListener"; |
| // call admin api to make it partitioned |
| std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-testMessageListener/partitions"; |
| int res = makePutRequest(url, "5"); |
| |
| LOG_INFO("res = "<<res); |
| ASSERT_FALSE(res != 204 && res != 409); |
| |
| Producer producer; |
| ProducerConfiguration producerConfiguration; |
| producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); |
| Result result = client.createProducer(topicName, producerConfiguration, producer); |
| |
| // Initializing global Count |
| globalCount = 0; |
| |
| ConsumerConfiguration consumerConfig; |
| consumerConfig.setMessageListener(boost::bind(messageListenerFunction, _1, _2)); |
| Consumer consumer; |
| result = client.subscribe(topicName, "subscription-A", consumerConfig, consumer); |
| |
| ASSERT_EQ(ResultOk, result); |
| for (int i = 0; i < 10; i++ ) { |
| boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time()); |
| long nanoSeconds = t.time_of_day().total_nanoseconds(); |
| std::stringstream ss; |
| ss << nanoSeconds; |
| Message msg = MessageBuilder().setContent(ss.str()).setPartitionKey(ss.str()).build(); |
| ASSERT_EQ(ResultOk, producer.send(msg)); |
| } |
| |
| // Sleeping for 5 seconds |
| usleep(5 * 1000 * 1000); |
| ASSERT_EQ(globalCount, 10); |
| consumer.close(); |
| producer.close(); |
| client.close(); |
| } |
| |
| TEST(BasicEndToEndTest, testMessageListenerPause) |
| { |
| Client client(lookupUrl); |
| std::string topicName = "persistent://property/cluster/namespace/partition-testMessageListenerPause"; |
| |
| // call admin api to make it partitioned |
| std::string url = adminUrl + "admin/persistent/property/cluster/namespace/partition-testMessageListener-pauses/partitions"; |
| int res = makePutRequest(url, "5"); |
| |
| LOG_INFO("res = "<<res); |
| ASSERT_FALSE(res != 204 && res != 409); |
| |
| |
| Producer producer; |
| ProducerConfiguration producerConfiguration; |
| producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); |
| Result result = client.createProducer(topicName, producerConfiguration, producer); |
| |
| // Initializing global Count |
| globalCount = 0; |
| |
| ConsumerConfiguration consumerConfig; |
| consumerConfig.setMessageListener(boost::bind(messageListenerFunction, _1, _2)); |
| Consumer consumer; |
| // Removing dangling subscription from previous test failures |
| result = client.subscribe(topicName, "subscription-name", consumerConfig, consumer); |
| consumer.unsubscribe(); |
| |
| result = client.subscribe(topicName, "subscription-name", consumerConfig, consumer); |
| ASSERT_EQ(ResultOk, result); |
| int temp = 1000; |
| for (int i = 0; i < 10000; i++ ) { |
| if(i && i%1000 == 0) { |
| usleep(2 * 1000 * 1000); |
| ASSERT_EQ(globalCount, temp); |
| consumer.resumeMessageListener(); |
| usleep(2 * 1000 * 1000); |
| ASSERT_EQ(globalCount, i); |
| temp = globalCount; |
| consumer.pauseMessageListener(); |
| } |
| Message msg = MessageBuilder().build(); |
| ASSERT_EQ(ResultOk, producer.send(msg)); |
| } |
| |
| ASSERT_EQ(globalCount, temp); |
| consumer.resumeMessageListener(); |
| // Sleeping for 2 seconds |
| usleep(2 * 1000 * 1000); |
| |
| ASSERT_EQ(globalCount, 10000); |
| consumer.close(); |
| producer.close(); |
| client.close(); |
| } |
| |
| TEST(BasicEndToEndTest, testResendViaSendCallback) |
| { |
| ClientConfiguration clientConfiguration; |
| clientConfiguration.setIOThreads(1); |
| Client client(lookupUrl, clientConfiguration); |
| std::string topicName = "persistent://my-property/my-cluster/my-namespace/testResendViaListener"; |
| |
| Producer producer; |
| |
| Promise<Result, Producer> producerPromise; |
| ProducerConfiguration producerConfiguration; |
| |
| // Setting timeout of 1 ms |
| producerConfiguration.setSendTimeout(1); |
| client.createProducerAsync(topicName, producerConfiguration, WaitForCallbackValue<Producer>(producerPromise)); |
| Future<Result, Producer> producerFuture = producerPromise.getFuture(); |
| Result result = producerFuture.get(producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| // Send asynchronously for 3 seconds |
| // Expect timeouts since we have set timeout to 1 ms |
| // On receiving timeout send the message using the CMS client IO thread via cb function. |
| for (int i = 0; i<10000; i++) { |
| producer.sendAsync(MessageBuilder().build(), boost::bind(resendMessage, _1, _2, producer)); |
| } |
| // 3 seconds |
| usleep(3 * 1000 * 1000); |
| producer.close(); |
| Lock lock(mutex_); |
| ASSERT_GE(globalResendMessageCount, 3); |
| } |
| |
| TEST(BasicEndToEndTest, testStatsLatencies) |
| { |
| ClientConfiguration config; |
| config.setIOThreads(1); |
| config.setMessageListenerThreads(1); |
| config.setStatsIntervalInSeconds(5); |
| Client client(lookupUrl, config); |
| std::string topicName = "persistent://property/cluster/namespace/testStatsLatencies"; |
| std::string subName = "subscription-name"; |
| Producer producer; |
| |
| // Start Producer and Consumer |
| int numOfMessages = 1000; |
| |
| Promise<Result, Producer> producerPromise; |
| client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise)); |
| Future<Result, Producer> producerFuture = producerPromise.getFuture(); |
| Result result = producerFuture.get(producer); |
| ASSERT_EQ(ResultOk, result); |
| |
| Consumer consumer; |
| Promise<Result, Consumer> consumerPromise; |
| client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise)); |
| Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); |
| result = consumerFuture.get(consumer); |
| ASSERT_EQ(ResultOk, result); |
| |
| // handling dangling subscriptions |
| consumer.unsubscribe(); |
| client.subscribe(topicName, subName, consumer); |
| |
| std::string temp = producer.getTopic(); |
| ASSERT_EQ(temp, topicName); |
| temp = consumer.getTopic(); |
| ASSERT_EQ(temp, topicName); |
| ASSERT_EQ(consumer.getSubscriptionName(), subName); |
| |
| ProducerStatsImplPtr producerStatsImplPtr = PulsarFriend::getProducerStatsPtr(producer); |
| |
| // Send Asynchronously |
| std::string prefix = "msg-stats-"; |
| for (int i = 0; i < numOfMessages; i++) { |
| std::string messageContent = prefix + boost::lexical_cast<std::string>(i); |
| Message msg = MessageBuilder().setContent(messageContent).setProperty( |
| "msgIndex", boost::lexical_cast<std::string>(i)).build(); |
| producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, 15, 20 * 1e3)); |
| LOG_DEBUG("sending message " << messageContent); |
| } |
| |
| // Wait for all messages to be acked by broker |
| while (PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()) < numOfMessages) { |
| usleep(1000); // 1 ms |
| } |
| |
| // Get latencies |
| LatencyAccumulator totalLatencyAccumulator = |
| producerStatsImplPtr->getTotalLatencyAccumulator(); |
| boost::accumulators::detail::extractor_result<LatencyAccumulator, |
| boost::accumulators::tag::extended_p_square>::type totalLatencies = |
| boost::accumulators::extended_p_square(totalLatencyAccumulator); |
| |
| LatencyAccumulator latencyAccumulator = producerStatsImplPtr->getLatencyAccumulator(); |
| boost::accumulators::detail::extractor_result<LatencyAccumulator, |
| boost::accumulators::tag::extended_p_square>::type latencies = |
| boost::accumulators::extended_p_square(latencyAccumulator); |
| |
| // Since 15% of the messages have a delay of |
| ASSERT_EQ((uint64_t )latencies[1], (uint64_t )totalLatencies[1]); |
| ASSERT_EQ((uint64_t )latencies[2], (uint64_t )totalLatencies[2]); |
| ASSERT_EQ((uint64_t )latencies[3], (uint64_t )totalLatencies[3]); |
| |
| ASSERT_GE((uint64_t )latencies[1], 20 * 1000); |
| ASSERT_GE((uint64_t )latencies[2], 20 * 1000); |
| ASSERT_GE((uint64_t )latencies[3], 20 * 1000); |
| |
| ASSERT_GE((uint64_t )totalLatencies[1], 20 * 1000); |
| ASSERT_GE((uint64_t )totalLatencies[2], 20 * 1000); |
| ASSERT_GE((uint64_t )totalLatencies[3], 20 * 1000); |
| |
| while (producerStatsImplPtr->getNumMsgsSent() != 0) { |
| usleep(1e6); // wait till stats flush |
| } |
| |
| usleep(1 * 1e6); // 1 second |
| |
| latencyAccumulator = producerStatsImplPtr->getLatencyAccumulator(); |
| latencies = boost::accumulators::extended_p_square(latencyAccumulator); |
| |
| totalLatencyAccumulator = producerStatsImplPtr->getTotalLatencyAccumulator(); |
| totalLatencies = boost::accumulators::extended_p_square(totalLatencyAccumulator); |
| |
| ASSERT_NE((uint64_t )latencies[1], (uint64_t )totalLatencies[1]); |
| ASSERT_NE((uint64_t )latencies[2], (uint64_t )totalLatencies[2]); |
| ASSERT_NE((uint64_t )latencies[3], (uint64_t )totalLatencies[3]); |
| |
| ASSERT_EQ((uint64_t )latencies[1], 0); |
| ASSERT_EQ((uint64_t )latencies[2], 0); |
| ASSERT_EQ((uint64_t )latencies[3], 0); |
| |
| ASSERT_GE((uint64_t )totalLatencies[1], 20 * 1000); |
| ASSERT_GE((uint64_t )totalLatencies[2], 20 * 1000); |
| ASSERT_GE((uint64_t )totalLatencies[3], 20 * 1000); |
| |
| Message receivedMsg; |
| int i = 0; |
| ConsumerStatsImplPtr consumerStatsImplPtr = PulsarFriend::getConsumerStatsPtr(consumer); |
| |
| while (consumer.receive(receivedMsg, 5000) == ResultOk) { |
| std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i); |
| LOG_DEBUG( |
| "Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]"); |
| ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++)); |
| ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalReceivedMsgMap()), i); |
| ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString()); |
| ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalAckedMsgMap()), i - 1); |
| ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg)); |
| ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalAckedMsgMap()), i); |
| } |
| // Number of messages consumed |
| ASSERT_EQ(i, numOfMessages); |
| } |
| |
| TEST(BasicEndToEndTest, testProduceMessageSize) { |
| ClientConfiguration config; |
| Client client(lookupUrl); |
| std::string topicName = "persistent://prop/unit/ns1/testProduceMessageSize"; |
| std::string subName = "my-sub-name"; |
| Producer producer1; |
| Producer producer2; |
| |
| Promise<Result, Producer> producerPromise; |
| client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise)); |
| Future<Result, Producer> producerFuture = producerPromise.getFuture(); |
| Result result = producerFuture.get(producer1); |
| ASSERT_EQ(ResultOk, result); |
| |
| Promise<Result, Producer> producerPromise2; |
| ProducerConfiguration conf; |
| conf.setCompressionType(CompressionLZ4); |
| client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise2)); |
| producerFuture = producerPromise2.getFuture(); |
| result = producerFuture.get(producer2); |
| ASSERT_EQ(ResultOk, result); |
| |
| int size = Commands::MaxMessageSize + 1; |
| char* content = new char[size]; |
| Message msg = MessageBuilder().setAllocatedContent(content, size).build(); |
| result = producer1.send(msg); |
| ASSERT_EQ(ResultMessageTooBig, result); |
| |
| Consumer consumer; |
| Promise<Result, Consumer> consumerPromise; |
| client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise)); |
| Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); |
| result = consumerFuture.get(consumer); |
| ASSERT_EQ(ResultOk, result); |
| |
| msg = MessageBuilder().setAllocatedContent(content, size).build(); |
| result = producer2.send(msg); |
| ASSERT_EQ(ResultOk, result); |
| |
| Message receivedMsg; |
| consumer.receive(receivedMsg); |
| ASSERT_EQ(size, receivedMsg.getDataAsString().length()); |
| |
| producer1.closeAsync(0); |
| producer2.closeAsync(0); |
| consumer.close(); |
| client.close(); |
| |
| delete[] content; |
| } |
| |
| TEST(BasicEndToEndTest, testHandlerReconnectionLogic) { |
| Client client(adminUrl); |
| std::string topicName = "persistent://prop/unit/ns1/testHandlerReconnectionLogic"; |
| |
| Producer producer; |
| Consumer consumer; |
| |
| ASSERT_EQ(client.subscribe(topicName, "my-sub", consumer), ResultOk); |
| ASSERT_EQ(client.createProducer(topicName, producer), ResultOk); |
| |
| std::vector<ClientConnectionPtr> oldConnections; |
| |
| int numOfMessages = 10; |
| std::string propertyName = "msgIndex"; |
| for (int i = 0; i<numOfMessages; i++) { |
| std::string messageContent = "msg-" + boost::lexical_cast<std::string>(i); |
| Message msg = MessageBuilder().setContent(messageContent).setProperty( |
| propertyName, boost::lexical_cast<std::string>(i)).build(); |
| if (i % 3 == 1) { |
| ProducerImpl& pImpl = PulsarFriend::getProducerImpl(producer); |
| ClientConnectionPtr clientConnectionPtr; |
| do { |
| ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(pImpl); |
| clientConnectionPtr = clientConnectionWeakPtr.lock(); |
| usleep(1 * 1e6); |
| } while(!clientConnectionPtr); |
| oldConnections.push_back(clientConnectionPtr); |
| clientConnectionPtr->close(); |
| } |
| ASSERT_EQ(producer.send(msg), ResultOk); |
| } |
| |
| std::set<std::string> receivedMsgContent; |
| std::set<std::string> receivedMsgIndex; |
| |
| Message msg; |
| while(consumer.receive(msg, 30000) == ResultOk) { |
| receivedMsgContent.insert(msg.getDataAsString()); |
| receivedMsgIndex.insert(msg.getProperty(propertyName)); |
| } |
| |
| ConsumerImpl& cImpl = PulsarFriend::getConsumerImpl(consumer); |
| ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(cImpl); |
| ClientConnectionPtr clientConnectionPtr = clientConnectionWeakPtr.lock(); |
| oldConnections.push_back(clientConnectionPtr); |
| clientConnectionPtr->close(); |
| |
| while(consumer.receive(msg, 30000) == ResultOk) { |
| consumer.acknowledge(msg); |
| receivedMsgContent.insert(msg.getDataAsString()); |
| receivedMsgIndex.insert(msg.getProperty(propertyName)); |
| } |
| |
| ASSERT_EQ(receivedMsgContent.size(), 10); |
| ASSERT_EQ(receivedMsgIndex.size(), 10); |
| |
| for (int i = 0; i<numOfMessages; i++) { |
| ASSERT_TRUE(receivedMsgContent.find("msg-" + boost::lexical_cast<std::string>(i)) != receivedMsgContent.end()); |
| ASSERT_TRUE(receivedMsgIndex.find(boost::lexical_cast<std::string>(i)) != receivedMsgIndex.end()); |
| } |
| } |