blob: d6a8f1d584debae8777ac87ff702be3199535c07 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "ReaderImpl.h"
#include "ClientImpl.h"
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "GetLastMessageIdResponse.h"
#include "MultiTopicsConsumerImpl.h"
#include "TopicName.h"
namespace pulsar {
namespace test {
std::mutex readerConfigTestMutex;
std::atomic_bool readerConfigTestEnabled{false};
ConsumerConfiguration consumerConfigOfReader;
} // namespace test
static ResultCallback emptyCallback;
ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
ReaderCallback readerCreatedCallback)
: topic_(topic),
partitions_(partitions),
client_(client),
readerConf_(conf),
readerCreatedCallback_(readerCreatedCallback) {}
void ReaderImpl::start(const MessageId& startMessageId,
std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
ConsumerConfiguration consumerConf;
consumerConf.setConsumerType(ConsumerExclusive);
consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
consumerConf.setReadCompacted(readerConf_.isReadCompacted());
consumerConf.setSchema(readerConf_.getSchema());
consumerConf.setUnAckedMessagesTimeoutMs(readerConf_.getUnAckedMessagesTimeoutMs());
consumerConf.setTickDurationInMs(readerConf_.getTickDurationInMs());
consumerConf.setAckGroupingTimeMs(readerConf_.getAckGroupingTimeMs());
consumerConf.setAckGroupingMaxSize(readerConf_.getAckGroupingMaxSize());
consumerConf.setCryptoKeyReader(readerConf_.getCryptoKeyReader());
consumerConf.setCryptoFailureAction(readerConf_.getCryptoFailureAction());
consumerConf.setProperties(readerConf_.getProperties());
if (readerConf_.getReaderName().length() > 0) {
consumerConf.setConsumerName(readerConf_.getReaderName());
}
if (readerConf_.hasReaderListener()) {
// Adapt the message listener to be a reader-listener
readerListener_ = readerConf_.getReaderListener();
consumerConf.setMessageListener(std::bind(&ReaderImpl::messageListener, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));
}
std::string subscription;
if (!readerConf_.getInternalSubscriptionName().empty()) {
subscription = readerConf_.getInternalSubscriptionName();
} else {
subscription = "reader-" + generateRandomName();
if (!readerConf_.getSubscriptionRolePrefix().empty()) {
subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
}
}
// get the consumer's configuration before created
if (test::readerConfigTestEnabled) {
test::consumerConfigOfReader = consumerConf.clone();
}
if (partitions_ > 0) {
auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf,
client_.lock()->getLookup(),
std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
Commands::SubscriptionModeNonDurable, startMessageId);
consumer_ = consumerImpl;
} else {
auto consumerImpl = std::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable,
startMessageId);
consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_));
consumer_ = consumerImpl;
}
auto self = shared_from_this();
consumer_->getConsumerCreatedFuture().addListener(
[this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
if (result == ResultOk) {
callback(weakConsumerPtr);
readerCreatedCallback_(result, Reader(self));
} else {
readerCreatedCallback_(result, {});
}
});
consumer_->start();
}
const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); }
Result ReaderImpl::readNext(Message& msg) {
Result res = consumer_->receive(msg);
acknowledgeIfNecessary(res, msg);
return res;
}
Result ReaderImpl::readNext(Message& msg, int timeoutMs) {
Result res = consumer_->receive(msg, timeoutMs);
acknowledgeIfNecessary(res, msg);
return res;
}
void ReaderImpl::readNextAsync(ReceiveCallback callback) {
auto self = shared_from_this();
consumer_->receiveAsync([self, callback](Result result, const Message& message) {
self->acknowledgeIfNecessary(result, message);
callback(result, message);
});
}
void ReaderImpl::messageListener(Consumer consumer, const Message& msg) {
readerListener_(Reader(shared_from_this()), msg);
acknowledgeIfNecessary(ResultOk, msg);
}
void ReaderImpl::acknowledgeIfNecessary(Result result, const Message& msg) {
if (result != ResultOk) {
return;
}
// Only acknowledge on the first message in the batch
if (msg.getMessageId().batchIndex() <= 0) {
// Acknowledge message immediately because the reader is based on non-durable
// subscription. When it reconnects, it will specify the subscription position anyway
consumer_->acknowledgeCumulativeAsync(msg.getMessageId(), emptyCallback);
}
}
void ReaderImpl::closeAsync(ResultCallback callback) { consumer_->closeAsync(callback); }
void ReaderImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
consumer_->hasMessageAvailableAsync(callback);
}
void ReaderImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
consumer_->seekAsync(msgId, callback);
}
void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
consumer_->seekAsync(timestamp, callback);
}
void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
consumer_->getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
callback(result, response.getLastMessageId());
});
}
bool ReaderImpl::isConnected() const { return consumer_->isConnected(); }
} // namespace pulsar