// blob: f3785011f2094187b27c78da8da97e91324c1615 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <lib/LogUtils.h>
#include <pulsar/Client.h>
#include <climits>
#include "NoOpsCryptoKeyReader.h"
DECLARE_LOG_OBJECT()
#include <pulsar/DeadLetterPolicyBuilder.h>
#include "../lib/Future.h"
#include "../lib/Utils.h"
using namespace pulsar;
class DummyEventListener : public ConsumerEventListener {
public:
virtual void becameActive(Consumer consumer, int partitionId) override {}
virtual void becameInactive(Consumer consumer, int partitionId) override {}
};
/**
 * Checks the value reported by every getter of a default-constructed
 * ConsumerConfiguration.
 */
TEST(ConsumerConfigurationTest, testDefaultConfig) {
    ConsumerConfiguration conf;

    // Schema, subscription type and listeners
    ASSERT_EQ(conf.getSchema().getSchemaType(), SchemaType::BYTES);
    ASSERT_EQ(conf.getConsumerType(), ConsumerExclusive);
    ASSERT_FALSE(conf.hasMessageListener());
    ASSERT_FALSE(conf.hasConsumerEventListener());

    // Queue sizes and consumer name
    ASSERT_EQ(conf.getReceiverQueueSize(), 1000);
    ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 50000);
    ASSERT_EQ(conf.getConsumerName(), "");

    // Timeouts, acknowledgment grouping and stats caching
    ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 0);
    ASSERT_EQ(conf.getTickDurationInMs(), 1000);
    ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 60000);
    ASSERT_EQ(conf.getAckGroupingTimeMs(), 100);
    ASSERT_EQ(conf.getAckGroupingMaxSize(), 1000);
    ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 30000);

    // Compaction, topic discovery and initial position
    ASSERT_FALSE(conf.isReadCompacted());
    ASSERT_EQ(conf.getPatternAutoDiscoveryPeriod(), 60);
    ASSERT_EQ(conf.getSubscriptionInitialPosition(), InitialPositionLatest);

    // Encryption
    ASSERT_EQ(conf.getCryptoKeyReader(), CryptoKeyReaderPtr{});
    ASSERT_EQ(conf.getCryptoFailureAction(), ConsumerCryptoFailureAction::FAIL);
    ASSERT_FALSE(conf.isEncryptionEnabled());

    // Replication, properties and priority
    ASSERT_FALSE(conf.isReplicateSubscriptionStateEnabled());
    ASSERT_TRUE(conf.getProperties().empty());
    ASSERT_EQ(conf.getPriorityLevel(), 0);

    // Chunked messages
    ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 10);
    ASSERT_FALSE(conf.isAutoAckOldestChunkedMessageOnQueueFull());

    // Batch receive policy
    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumMessages(), -1);
    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10 * 1024 * 1024);
    ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100);

    // Acknowledgment options
    ASSERT_FALSE(conf.isBatchIndexAckEnabled());
    ASSERT_FALSE(conf.isAckReceiptEnabled());
}
/**
 * Calls each ConsumerConfiguration setter with a custom value and checks that
 * the matching getter reports that value.
 */
TEST(ConsumerConfigurationTest, testCustomConfig) {
    ConsumerConfiguration conf;

    // Schema
    const std::string avroDefinition =
        "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
        "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
    const SchemaInfo customSchema(AVRO, "Avro", avroDefinition, StringMap{{"schema-key", "schema-value"}});
    conf.setSchema(customSchema);
    ASSERT_EQ(conf.getSchema().getName(), customSchema.getName());
    ASSERT_EQ(conf.getSchema().getSchemaType(), customSchema.getSchemaType());
    ASSERT_EQ(conf.getSchema().getSchema(), customSchema.getSchema());
    ASSERT_EQ(conf.getSchema().getProperties(), customSchema.getProperties());

    // Subscription type and listeners
    conf.setConsumerType(ConsumerKeyShared);
    ASSERT_EQ(conf.getConsumerType(), ConsumerKeyShared);

    conf.setMessageListener([](Consumer, const Message&) {});
    ASSERT_TRUE(conf.hasMessageListener());

    conf.setConsumerEventListener(std::make_shared<DummyEventListener>());
    ASSERT_TRUE(conf.hasConsumerEventListener());

    // Queue sizes and consumer name
    conf.setReceiverQueueSize(2000);
    ASSERT_EQ(conf.getReceiverQueueSize(), 2000);

    conf.setMaxTotalReceiverQueueSizeAcrossPartitions(100000);
    ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 100000);

    conf.setConsumerName("consumer");
    ASSERT_EQ(conf.getConsumerName(), "consumer");

    // Timeouts, acknowledgment grouping and stats caching
    conf.setUnAckedMessagesTimeoutMs(20000);
    ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 20000);

    conf.setTickDurationInMs(2000);
    ASSERT_EQ(conf.getTickDurationInMs(), 2000);

    conf.setNegativeAckRedeliveryDelayMs(10000);
    ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 10000);

    conf.setAckGroupingTimeMs(200);
    ASSERT_EQ(conf.getAckGroupingTimeMs(), 200);

    conf.setAckGroupingMaxSize(2000);
    ASSERT_EQ(conf.getAckGroupingMaxSize(), 2000);

    conf.setBrokerConsumerStatsCacheTimeInMs(60000);
    ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 60000);

    // Compaction, topic discovery and initial position
    conf.setReadCompacted(true);
    ASSERT_TRUE(conf.isReadCompacted());

    conf.setPatternAutoDiscoveryPeriod(120);
    ASSERT_EQ(conf.getPatternAutoDiscoveryPeriod(), 120);

    conf.setSubscriptionInitialPosition(InitialPositionEarliest);
    ASSERT_EQ(conf.getSubscriptionInitialPosition(), InitialPositionEarliest);

    // Encryption
    const auto keyReader = std::make_shared<NoOpsCryptoKeyReader>();
    conf.setCryptoKeyReader(keyReader);
    ASSERT_EQ(conf.getCryptoKeyReader(), keyReader);
    // NOTE: once a CryptoKeyReader has been set, isEncryptionEnabled() returns true; this differs from
    // ProducerConfiguration.
    ASSERT_TRUE(conf.isEncryptionEnabled());

    conf.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
    ASSERT_EQ(conf.getCryptoFailureAction(), ConsumerCryptoFailureAction::CONSUME);

    // Replication, properties and priority
    conf.setReplicateSubscriptionStateEnabled(true);
    ASSERT_TRUE(conf.isReplicateSubscriptionStateEnabled());

    conf.setProperty("k1", "v1");
    ASSERT_EQ(conf.getProperties()["k1"], "v1");
    ASSERT_TRUE(conf.hasProperty("k1"));

    std::map<std::string, std::string> subscriptionProps = {{"k1", "v1"}};
    conf.setSubscriptionProperties(subscriptionProps);
    ASSERT_EQ(conf.getSubscriptionProperties()["k1"], "v1");

    conf.setPriorityLevel(1);
    ASSERT_EQ(conf.getPriorityLevel(), 1);

    // Chunked messages
    conf.setMaxPendingChunkedMessage(500);
    ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 500);

    conf.setAutoAckOldestChunkedMessageOnQueueFull(true);
    ASSERT_TRUE(conf.isAutoAckOldestChunkedMessageOnQueueFull());

    // Batch receive policy
    conf.setBatchReceivePolicy(BatchReceivePolicy(10, 10, 100));
    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumMessages(), 10);
    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10);
    ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100);

    // Acknowledgment options
    conf.setBatchIndexAckEnabled(true);
    ASSERT_TRUE(conf.isBatchIndexAckEnabled());

    conf.setAckReceiptEnabled(true);
    ASSERT_TRUE(conf.isAckReceiptEnabled());
}
/**
 * Subscribing with read-compacted enabled and an Exclusive subscription on
 * "persist-topic" is expected to succeed.
 *
 * Connects to the service URL pulsar://localhost:6650.
 */
TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {
    const std::string lookupUrl = "pulsar://localhost:6650";
    const std::string topicName = "persist-topic";
    const std::string subName = "test-persist-exclusive";

    ConsumerConfiguration config;
    config.setReadCompacted(true);
    config.setConsumerType(ConsumerExclusive);

    ClientConfiguration clientConfig;
    Client client(lookupUrl, clientConfig);

    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, config, consumer));

    // Check the teardown results instead of discarding them, so a failure to
    // release the consumer or the client is reported by this test.
    ASSERT_EQ(ResultOk, consumer.close());
    ASSERT_EQ(ResultOk, client.close());
}
/**
 * Subscribing with read-compacted enabled and a Failover subscription on
 * "persist-topic" is expected to succeed.
 *
 * Connects to the service URL pulsar://localhost:6650.
 */
TEST(ConsumerConfigurationTest, testReadCompactPersistentFailover) {
    const std::string lookupUrl = "pulsar://localhost:6650";
    const std::string topicName = "persist-topic";
    const std::string subName = "test-persist-fail-over";

    ConsumerConfiguration config;
    config.setReadCompacted(true);
    config.setConsumerType(ConsumerFailover);

    ClientConfiguration clientConfig;
    Client client(lookupUrl, clientConfig);

    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, config, consumer));

    // Check the teardown results instead of discarding them, so a failure to
    // release the consumer or the client is reported by this test.
    ASSERT_EQ(ResultOk, consumer.close());
    ASSERT_EQ(ResultOk, client.close());
}
/**
 * Subscribing with a Key_Shared subscription (read-compacted disabled) on
 * "persist-key-shared-topic" is expected to succeed.
 *
 * Connects to the service URL pulsar://localhost:6650.
 */
TEST(ConsumerConfigurationTest, testSubscribePersistentKeyShared) {
    const std::string lookupUrl = "pulsar://localhost:6650";
    const std::string topicName = "persist-key-shared-topic";
    const std::string subName = "test-persist-key-shared";

    ConsumerConfiguration config;
    // Key_Shared subscriptions currently do not support read-compacted, so it is turned off explicitly.
    config.setReadCompacted(false);
    config.setConsumerType(ConsumerKeyShared);

    ClientConfiguration clientConfig;
    Client client(lookupUrl, clientConfig);

    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, config, consumer));

    // Check the teardown results instead of discarding them, so a failure to
    // release the consumer or the client is reported by this test.
    ASSERT_EQ(ResultOk, consumer.close());
    ASSERT_EQ(ResultOk, client.close());
}
/**
 * Subscribing with read-compacted enabled and a Shared subscription on
 * "persist-topic" is expected to be rejected with ResultInvalidConfiguration.
 *
 * Connects to the service URL pulsar://localhost:6650.
 */
TEST(ConsumerConfigurationTest, testReadCompactPersistentShared) {
    const std::string serviceUrl = "pulsar://localhost:6650";
    const std::string topic = "persist-topic";
    const std::string subscription = "test-persist-shared";

    ConsumerConfiguration consumerConf;
    consumerConf.setReadCompacted(true);
    consumerConf.setConsumerType(ConsumerShared);

    ClientConfiguration clientConf;
    Client client(serviceUrl, clientConf);

    Consumer consumer;
    const Result subscribeResult = client.subscribe(topic, subscription, consumerConf, consumer);
    ASSERT_EQ(ResultInvalidConfiguration, subscribeResult);

    // The close result is deliberately not checked here.
    consumer.close();
}
/**
 * Subscribing with read-compacted enabled on a non-persistent topic is
 * expected to be rejected with ResultInvalidConfiguration, even with an
 * Exclusive subscription.
 *
 * Connects to the service URL pulsar://localhost:6650.
 */
TEST(ConsumerConfigurationTest, testReadCompactNonPersistentExclusive) {
    const std::string serviceUrl = "pulsar://localhost:6650";
    const std::string topic = "non-persistent://public/default/testNonPersistentTopic";
    const std::string subscription = "test-non-persist-exclusive";

    ConsumerConfiguration consumerConf;
    consumerConf.setReadCompacted(true);
    consumerConf.setConsumerType(ConsumerExclusive);

    ClientConfiguration clientConf;
    Client client(serviceUrl, clientConf);

    Consumer consumer;
    const Result subscribeResult = client.subscribe(topic, subscription, consumerConf, consumer);
    ASSERT_EQ(ResultInvalidConfiguration, subscribeResult);

    // The close result is deliberately not checked here.
    consumer.close();
}
/**
 * Publishes two messages, then subscribes with InitialPositionEarliest and
 * checks that the first message received is the first one published.
 *
 * Connects to the service URL pulsar://localhost:6650.
 */
TEST(ConsumerConfigurationTest, testSubscriptionInitialPosition) {
    const std::string serviceUrl = "pulsar://localhost:6650";
    const std::string topic = "persist-topic-test-position";
    const std::string subscription = "test-subscription-initial-earliest-position";

    ClientConfiguration clientConf;
    Client client(serviceUrl, clientConf);

    LOG_INFO("create 1 producer...");
    Producer producer;
    ProducerConfiguration producerConf;
    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));

    // Publish both messages synchronously before any subscription exists.
    const std::string firstPayload = "msg-1-content-1";
    Message firstMsg = MessageBuilder().setContent(firstPayload).build();
    ASSERT_EQ(ResultOk, producer.send(firstMsg));

    const std::string secondPayload = "msg-2-content-2";
    Message secondMsg = MessageBuilder().setContent(secondPayload).build();
    ASSERT_EQ(ResultOk, producer.send(secondMsg));

    // Subscribe from the earliest position.
    ConsumerConfiguration consumerConf;
    consumerConf.setSubscriptionInitialPosition(InitialPosition::InitialPositionEarliest);
    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumerConf, consumer));

    Message received;
    ASSERT_EQ(ResultOk, consumer.receive(received, 2000));
    ASSERT_EQ(firstPayload, received.getDataAsString());

    // After unsubscribe() the consumer is expected to report that it is already closed.
    ASSERT_EQ(ResultOk, consumer.unsubscribe());
    ASSERT_EQ(ResultAlreadyClosed, consumer.close());
    ASSERT_EQ(ResultOk, producer.close());
    ASSERT_EQ(ResultOk, client.close());
}
/**
 * Checks that the unacked-messages timeout can be set to a non-zero value and
 * then reset to 0.
 */
TEST(ConsumerConfigurationTest, testResetAckTimeOut) {
    ConsumerConfiguration config;

    const uint64_t ackTimeoutMs = 50000;
    config.setUnAckedMessagesTimeoutMs(ackTimeoutMs);
    ASSERT_EQ(ackTimeoutMs, config.getUnAckedMessagesTimeoutMs());

    // Resetting the timeout to 0 must be accepted as well.
    config.setUnAckedMessagesTimeoutMs(0);
    ASSERT_EQ(0, config.getUnAckedMessagesTimeoutMs());
}
/**
 * Checks the dead letter policy of a default ConsumerConfiguration and that a
 * policy built with a custom max redeliver count is returned after being set.
 */
TEST(ConsumerConfigurationTest, testDeadLetterPolicy) {
    ConsumerConfiguration config;

    // Default policy: no topic, no initial subscription name, INT_MAX redeliveries.
    auto defaultPolicy = config.getDeadLetterPolicy();
    ASSERT_TRUE(defaultPolicy.getDeadLetterTopic().empty());
    ASSERT_EQ(defaultPolicy.getMaxRedeliverCount(), INT_MAX);
    ASSERT_TRUE(defaultPolicy.getInitialSubscriptionName().empty());

    // Custom policy: only the max redeliver count is overridden.
    auto policyToApply = DeadLetterPolicyBuilder().maxRedeliverCount(10).build();
    config.setDeadLetterPolicy(policyToApply);

    auto appliedPolicy = config.getDeadLetterPolicy();
    ASSERT_EQ(appliedPolicy.getMaxRedeliverCount(), 10);
}