blob: 044518a51a8e68fd1423903edea8b8f0bea283b2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <pulsar/ConsumerConfiguration.h>
#include <stdexcept>
#include "ConsumerConfigurationImpl.h"
namespace pulsar {
const static std::string emptyString;
ConsumerConfiguration::ConsumerConfiguration() : impl_(std::make_shared<ConsumerConfigurationImpl>()) {}
ConsumerConfiguration::~ConsumerConfiguration() {}
ConsumerConfiguration::ConsumerConfiguration(const ConsumerConfiguration& x) : impl_(x.impl_) {}
ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfiguration& x) {
impl_ = x.impl_;
return *this;
}
ConsumerConfiguration ConsumerConfiguration::clone() const {
ConsumerConfiguration newConf;
newConf.impl_.reset(new ConsumerConfigurationImpl(*this->impl_));
return newConf;
}
ConsumerConfiguration& ConsumerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
impl_->schemaInfo = schemaInfo;
return *this;
}
const SchemaInfo& ConsumerConfiguration::getSchema() const { return impl_->schemaInfo; }
long ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs() const {
return impl_->brokerConsumerStatsCacheTimeInMs;
}
void ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs) {
impl_->brokerConsumerStatsCacheTimeInMs = cacheTimeInMs;
}
ConsumerConfiguration& ConsumerConfiguration::setConsumerType(ConsumerType consumerType) {
impl_->consumerType = consumerType;
return *this;
}
ConsumerType ConsumerConfiguration::getConsumerType() const { return impl_->consumerType; }
ConsumerConfiguration& ConsumerConfiguration::setMessageListener(MessageListener messageListener) {
impl_->messageListener = messageListener;
impl_->hasMessageListener = true;
return *this;
}
MessageListener ConsumerConfiguration::getMessageListener() const { return impl_->messageListener; }
bool ConsumerConfiguration::hasMessageListener() const { return impl_->hasMessageListener; }
ConsumerConfiguration& ConsumerConfiguration::setConsumerEventListener(
ConsumerEventListenerPtr eventListener) {
impl_->eventListener = eventListener;
impl_->hasConsumerEventListener = true;
return *this;
}
ConsumerEventListenerPtr ConsumerConfiguration::getConsumerEventListener() const {
return impl_->eventListener;
}
bool ConsumerConfiguration::hasConsumerEventListener() const { return impl_->hasConsumerEventListener; }
void ConsumerConfiguration::setReceiverQueueSize(int size) { impl_->receiverQueueSize = size; }
int ConsumerConfiguration::getReceiverQueueSize() const { return impl_->receiverQueueSize; }
void ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions(int size) {
impl_->maxTotalReceiverQueueSizeAcrossPartitions = size;
}
int ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions() const {
return impl_->maxTotalReceiverQueueSizeAcrossPartitions;
}
const std::string& ConsumerConfiguration::getConsumerName() const { return impl_->consumerName; }
void ConsumerConfiguration::setConsumerName(const std::string& consumerName) {
impl_->consumerName = consumerName;
}
long ConsumerConfiguration::getUnAckedMessagesTimeoutMs() const { return impl_->unAckedMessagesTimeoutMs; }
void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds) {
if (milliSeconds < 10000 && milliSeconds != 0) {
throw std::invalid_argument(
"Consumer Config Exception: Unacknowledged message timeout should be greater than 10 seconds.");
}
impl_->unAckedMessagesTimeoutMs = milliSeconds;
}
long ConsumerConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; }
void ConsumerConfiguration::setTickDurationInMs(const uint64_t milliSeconds) {
impl_->tickDurationInMs = milliSeconds;
}
void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) {
impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis;
}
long ConsumerConfiguration::getNegativeAckRedeliveryDelayMs() const {
return impl_->negativeAckRedeliveryDelayMs;
}
void ConsumerConfiguration::setAckGroupingTimeMs(long ackGroupingMillis) {
impl_->ackGroupingTimeMs = ackGroupingMillis;
}
long ConsumerConfiguration::getAckGroupingTimeMs() const { return impl_->ackGroupingTimeMs; }
void ConsumerConfiguration::setAckGroupingMaxSize(long maxGroupingSize) {
impl_->ackGroupingMaxSize = maxGroupingSize;
}
long ConsumerConfiguration::getAckGroupingMaxSize() const { return impl_->ackGroupingMaxSize; }
bool ConsumerConfiguration::isEncryptionEnabled() const { return (impl_->cryptoKeyReader != NULL); }
const CryptoKeyReaderPtr ConsumerConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; }
ConsumerConfiguration& ConsumerConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) {
impl_->cryptoKeyReader = cryptoKeyReader;
return *this;
}
ConsumerCryptoFailureAction ConsumerConfiguration::getCryptoFailureAction() const {
return impl_->cryptoFailureAction;
}
ConsumerConfiguration& ConsumerConfiguration::setCryptoFailureAction(ConsumerCryptoFailureAction action) {
impl_->cryptoFailureAction = action;
return *this;
}
bool ConsumerConfiguration::isReadCompacted() const { return impl_->readCompacted; }
void ConsumerConfiguration::setReadCompacted(bool compacted) { impl_->readCompacted = compacted; }
void ConsumerConfiguration::setSubscriptionInitialPosition(InitialPosition subscriptionInitialPosition) {
impl_->subscriptionInitialPosition = subscriptionInitialPosition;
}
InitialPosition ConsumerConfiguration::getSubscriptionInitialPosition() const {
return impl_->subscriptionInitialPosition;
}
void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) {
impl_->patternAutoDiscoveryPeriod = periodInSeconds;
}
int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return impl_->patternAutoDiscoveryPeriod; }
void ConsumerConfiguration::setReplicateSubscriptionStateEnabled(bool enabled) {
impl_->replicateSubscriptionStateEnabled = enabled;
}
bool ConsumerConfiguration::isReplicateSubscriptionStateEnabled() const {
return impl_->replicateSubscriptionStateEnabled;
}
bool ConsumerConfiguration::hasProperty(const std::string& name) const {
const std::map<std::string, std::string>& m = impl_->properties;
return m.find(name) != m.end();
}
const std::string& ConsumerConfiguration::getProperty(const std::string& name) const {
if (hasProperty(name)) {
const std::map<std::string, std::string>& m = impl_->properties;
return m.at(name);
} else {
return emptyString;
}
}
std::map<std::string, std::string>& ConsumerConfiguration::getProperties() const { return impl_->properties; }
ConsumerConfiguration& ConsumerConfiguration::setProperty(const std::string& name, const std::string& value) {
impl_->properties.insert(std::make_pair(name, value));
return *this;
}
ConsumerConfiguration& ConsumerConfiguration::setProperties(
const std::map<std::string, std::string>& properties) {
for (std::map<std::string, std::string>::const_iterator it = properties.begin(); it != properties.end();
it++) {
setProperty(it->first, it->second);
}
return *this;
}
std::map<std::string, std::string>& ConsumerConfiguration::getSubscriptionProperties() const {
return impl_->subscriptionProperties;
}
ConsumerConfiguration& ConsumerConfiguration::setSubscriptionProperties(
const std::map<std::string, std::string>& subscriptionProperties) {
for (const auto& subscriptionProperty : subscriptionProperties) {
impl_->subscriptionProperties.emplace(subscriptionProperty.first, subscriptionProperty.second);
}
return *this;
}
ConsumerConfiguration& ConsumerConfiguration::setPriorityLevel(int priorityLevel) {
if (priorityLevel < 0) {
throw std::invalid_argument("Consumer Config Exception: PriorityLevel should be nonnegative number.");
}
impl_->priorityLevel = priorityLevel;
return *this;
}
int ConsumerConfiguration::getPriorityLevel() const { return impl_->priorityLevel; }
ConsumerConfiguration& ConsumerConfiguration::setKeySharedPolicy(KeySharedPolicy keySharedPolicy) {
impl_->keySharedPolicy = keySharedPolicy.clone();
return *this;
}
KeySharedPolicy ConsumerConfiguration::getKeySharedPolicy() const { return impl_->keySharedPolicy; }
ConsumerConfiguration& ConsumerConfiguration::setMaxPendingChunkedMessage(size_t maxPendingChunkedMessage) {
impl_->maxPendingChunkedMessage = maxPendingChunkedMessage;
return *this;
}
size_t ConsumerConfiguration::getMaxPendingChunkedMessage() const { return impl_->maxPendingChunkedMessage; }
ConsumerConfiguration& ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull(
bool autoAckOldestChunkedMessageOnQueueFull) {
impl_->autoAckOldestChunkedMessageOnQueueFull = autoAckOldestChunkedMessageOnQueueFull;
return *this;
}
bool ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const {
return impl_->autoAckOldestChunkedMessageOnQueueFull;
}
ConsumerConfiguration& ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs(
long expireTimeOfIncompleteChunkedMessageMs) {
impl_->expireTimeOfIncompleteChunkedMessageMs = expireTimeOfIncompleteChunkedMessageMs;
return *this;
}
long ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs() const {
return impl_->expireTimeOfIncompleteChunkedMessageMs;
}
ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool startMessageIdInclusive) {
impl_->startMessageIdInclusive = startMessageIdInclusive;
return *this;
}
bool ConsumerConfiguration::isStartMessageIdInclusive() const { return impl_->startMessageIdInclusive; }
void ConsumerConfiguration::setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy) {
impl_->batchReceivePolicy = batchReceivePolicy;
}
const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const {
return impl_->batchReceivePolicy;
}
ConsumerConfiguration& ConsumerConfiguration::setBatchIndexAckEnabled(bool enabled) {
impl_->batchIndexAckEnabled = enabled;
return *this;
}
bool ConsumerConfiguration::isBatchIndexAckEnabled() const { return impl_->batchIndexAckEnabled; }
void ConsumerConfiguration::setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy) {
impl_->deadLetterPolicy = deadLetterPolicy;
}
const DeadLetterPolicy& ConsumerConfiguration::getDeadLetterPolicy() const { return impl_->deadLetterPolicy; }
ConsumerConfiguration& ConsumerConfiguration::intercept(
const std::vector<ConsumerInterceptorPtr>& interceptors) {
impl_->interceptors.insert(impl_->interceptors.end(), interceptors.begin(), interceptors.end());
return *this;
}
const std::vector<ConsumerInterceptorPtr>& ConsumerConfiguration::getInterceptors() const {
return impl_->interceptors;
}
ConsumerConfiguration& ConsumerConfiguration::setAckReceiptEnabled(bool ackReceiptEnabled) {
impl_->ackReceiptEnabled = ackReceiptEnabled;
return *this;
}
bool ConsumerConfiguration::isAckReceiptEnabled() const { return impl_->ackReceiptEnabled; }
ConsumerConfiguration& ConsumerConfiguration::setRegexSubscriptionMode(
RegexSubscriptionMode regexSubscriptionMode) {
impl_->regexSubscriptionMode = regexSubscriptionMode;
return *this;
}
RegexSubscriptionMode ConsumerConfiguration::getRegexSubscriptionMode() const {
return impl_->regexSubscriptionMode;
}
} // namespace pulsar