| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include "ConsumerImpl.h" |
| |
| #include <pulsar/DeadLetterPolicyBuilder.h> |
| #include <pulsar/MessageIdBuilder.h> |
| |
| #include <algorithm> |
| |
| #include "AckGroupingTracker.h" |
| #include "AckGroupingTrackerDisabled.h" |
| #include "AckGroupingTrackerEnabled.h" |
| #include "BatchMessageAcker.h" |
| #include "BatchedMessageIdImpl.h" |
| #include "BitSet.h" |
| #include "ChunkMessageIdImpl.h" |
| #include "ClientConnection.h" |
| #include "ClientImpl.h" |
| #include "Commands.h" |
| #include "ExecutorService.h" |
| #include "GetLastMessageIdResponse.h" |
| #include "LogUtils.h" |
| #include "MessageCrypto.h" |
| #include "MessageIdUtil.h" |
| #include "MessageImpl.h" |
| #include "MessagesImpl.h" |
| #include "ProducerConfigurationImpl.h" |
| #include "PulsarApi.pb.h" |
| #include "TimeUtils.h" |
| #include "TopicName.h" |
| #include "UnAckedMessageTrackerDisabled.h" |
| #include "UnAckedMessageTrackerEnabled.h" |
| #include "Utils.h" |
| #include "stats/ConsumerStatsDisabled.h" |
| #include "stats/ConsumerStatsImpl.h" |
| |
| namespace pulsar { |
| |
| DECLARE_LOG_OBJECT() |
| |
| ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, |
| const std::string& subscriptionName, const ConsumerConfiguration& conf, |
| bool isPersistent, const ConsumerInterceptorsPtr& interceptors, |
| const ExecutorServicePtr listenerExecutor /* = NULL by default */, |
| bool hasParent /* = false by default */, |
| const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */, |
| Commands::SubscriptionMode subscriptionMode, |
| boost::optional<MessageId> startMessageId) |
| : ConsumerImplBase( |
| client, topic, |
| Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()), |
| milliseconds(client->getClientConfig().getMaxBackoffIntervalMs()), milliseconds(0)), |
| conf, listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()), |
| waitingForZeroQueueSizeMessage(false), |
| config_(conf), |
| subscription_(subscriptionName), |
| originalSubscriptionName_(subscriptionName), |
| isPersistent_(isPersistent), |
| messageListener_(config_.getMessageListener()), |
| eventListener_(config_.getConsumerEventListener()), |
| hasParent_(hasParent), |
| consumerTopicType_(consumerTopicType), |
| subscriptionMode_(subscriptionMode), |
| // This is the initial capacity of the queue |
| incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)), |
| availablePermits_(0), |
| receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2), |
| consumerId_(client->newConsumerId()), |
| consumerName_(config_.getConsumerName()), |
| messageListenerRunning_(true), |
| negativeAcksTracker_(client, *this, conf), |
| readCompacted_(conf.isReadCompacted()), |
| startMessageId_(startMessageId), |
| maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), |
| autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), |
| expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()), |
| interceptors_(interceptors) { |
| std::stringstream consumerStrStream; |
| consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] "; |
| consumerStr_ = consumerStrStream.str(); |
| |
// Initialize the un-ACKed message timeout tracker.
| if (conf.getUnAckedMessagesTimeoutMs() != 0) { |
| if (conf.getTickDurationInMs() > 0) { |
| unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( |
| conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); |
| } else { |
| unAckedMessageTrackerPtr_.reset( |
| new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); |
| } |
| } else { |
| unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); |
| } |
| |
// Set up the stats reporter.
| unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds(); |
| if (statsIntervalInSeconds) { |
| consumerStatsBasePtr_ = std::make_shared<ConsumerStatsImpl>( |
| consumerStr_, client->getIOExecutorProvider()->get(), statsIntervalInSeconds); |
| } else { |
| consumerStatsBasePtr_ = std::make_shared<ConsumerStatsDisabled>(); |
| } |
| consumerStatsBasePtr_->start(); |
| |
| // Create msgCrypto |
| if (conf.isEncryptionEnabled()) { |
| msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false); |
| } |
| |
// Configure the dead letter queue (DLQ) policy
| auto deadLetterPolicy = conf.getDeadLetterPolicy(); |
| if (deadLetterPolicy.getMaxRedeliverCount() > 0) { |
| auto deadLetterPolicyBuilder = |
| DeadLetterPolicyBuilder() |
| .maxRedeliverCount(deadLetterPolicy.getMaxRedeliverCount()) |
| .initialSubscriptionName(deadLetterPolicy.getInitialSubscriptionName()); |
| if (deadLetterPolicy.getDeadLetterTopic().empty()) { |
| deadLetterPolicyBuilder.deadLetterTopic(topic + "-" + subscriptionName + DLQ_GROUP_TOPIC_SUFFIX); |
| } else { |
| deadLetterPolicyBuilder.deadLetterTopic(deadLetterPolicy.getDeadLetterTopic()); |
| } |
| deadLetterPolicy_ = deadLetterPolicyBuilder.build(); |
| } |
| |
| checkExpiredChunkedTimer_ = executor_->createDeadlineTimer(); |
| } |
| |
| ConsumerImpl::~ConsumerImpl() { |
| LOG_DEBUG(getName() << "~ConsumerImpl"); |
| if (state_ == Ready) { |
// This can happen, for example, when a seek triggered a reconnection and the consumer was
// closed before the new connection became ready. In that case the consumer never sends
// CloseConsumer to the broker, which leaks the consumer on the broker side.
| LOG_WARN(getName() << "Destroyed consumer which was not properly closed"); |
| |
| ClientConnectionPtr cnx = getCnx().lock(); |
| ClientImplPtr client = client_.lock(); |
| if (client && cnx) { |
| int requestId = client->newRequestId(); |
| cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); |
| cnx->removeConsumer(consumerId_); |
| LOG_INFO(getName() << "Closed consumer for race condition: " << consumerId_); |
| } else { |
| LOG_WARN(getName() << "Client is destroyed and cannot send the CloseConsumer command"); |
| } |
| } |
| shutdown(); |
| } |
| |
| void ConsumerImpl::setPartitionIndex(int partitionIndex) { partitionIndex_ = partitionIndex; } |
| |
| int ConsumerImpl::getPartitionIndex() { return partitionIndex_; } |
| |
| uint64_t ConsumerImpl::getConsumerId() { return consumerId_; } |
| |
| Future<Result, ConsumerImplBaseWeakPtr> ConsumerImpl::getConsumerCreatedFuture() { |
| return consumerCreatedPromise_.getFuture(); |
| } |
| |
| const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; } |
| |
| const std::string& ConsumerImpl::getTopic() const { return *topic_; } |
| |
| void ConsumerImpl::start() { |
| HandlerBase::start(); |
| |
| std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()}; |
| auto connectionSupplier = [weakSelf]() -> ClientConnectionPtr { |
| auto self = weakSelf.lock(); |
| if (!self) { |
| return nullptr; |
| } |
| return self->getCnx().lock(); |
| }; |
| |
// NOTE: start() is always called from a ClientImpl method, so client_.lock() never returns null here
| const auto requestIdGenerator = client_.lock()->getRequestIdGenerator(); |
| const auto requestIdSupplier = [requestIdGenerator] { return (*requestIdGenerator)++; }; |
| |
// Initialize ackGroupingTrackerPtr_ here because get_shared_this_ptr() is not available until
// the constructor has completed.
| if (TopicName::get(*topic_)->isPersistent()) { |
| if (config_.getAckGroupingTimeMs() > 0) { |
| ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled( |
| connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled(), |
| config_.getAckGroupingTimeMs(), config_.getAckGroupingMaxSize(), |
| client_.lock()->getIOExecutorProvider()->get())); |
| } else { |
| ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled( |
| connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled())); |
| } |
| } else { |
| LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic."); |
| ackGroupingTrackerPtr_.reset(new AckGroupingTracker(connectionSupplier, requestIdSupplier, |
| consumerId_, config_.isAckReceiptEnabled())); |
| } |
| ackGroupingTrackerPtr_->start(); |
| } |
| |
| void ConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { cnx.removeConsumer(consumerId_); } |
| |
| void ConsumerImpl::onNegativeAcksSend(const std::set<MessageId>& messageIds) { |
| interceptors_->onNegativeAcksSend(Consumer(shared_from_this()), messageIds); |
| } |
| |
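// Invoked when a connection to the broker becomes available (initial connect or reconnect).
// Registers this consumer on the connection, recomputes the start message id from the pending
// receiver queue, and sends the SUBSCRIBE command to the broker.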
| void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { |
| if (state_ == Closed) { |
| LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed"); |
| return; |
| } |
| |
// Register the consumer so that we can handle other incoming commands (e.g. ACTIVE_CONSUMER_CHANGE)
// after sending the subscribe request.
| cnx->registerConsumer(consumerId_, get_shared_this_ptr()); |
| |
| if (duringSeek_) { |
| ackGroupingTrackerPtr_->flushAndClean(); |
| } |
| |
| Lock lockForMessageId(mutexForMessageId_); |
| // Update startMessageId so that we can discard messages after delivery restarts |
| const auto startMessageId = clearReceiveQueue(); |
| const auto subscribeMessageId = |
| (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId : boost::none; |
| startMessageId_ = startMessageId; |
| lockForMessageId.unlock(); |
| |
| unAckedMessageTrackerPtr_->clear(); |
| |
| ClientImplPtr client = client_.lock(); |
| uint64_t requestId = client->newRequestId(); |
| SharedBuffer cmd = Commands::newSubscribe( |
| *topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_, |
| subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(), |
| config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(), |
| config_.getKeySharedPolicy(), config_.getPriorityLevel()); |
| cnx->sendRequestWithId(cmd, requestId) |
| .addListener(std::bind(&ConsumerImpl::handleCreateConsumer, get_shared_this_ptr(), cnx, |
| std::placeholders::_1)); |
| } |
| |
| void ConsumerImpl::connectionFailed(Result result) { |
| // Keep a reference to ensure object is kept alive |
| auto ptr = get_shared_this_ptr(); |
| |
| if (consumerCreatedPromise_.setFailed(result)) { |
| state_ = Failed; |
| } |
| } |
| |
| void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages) { |
| if (cnx && numMessages > 0) { |
| LOG_DEBUG(getName() << "Send more permits: " << numMessages); |
| SharedBuffer cmd = Commands::newFlow(consumerId_, static_cast<unsigned int>(numMessages)); |
| cnx->sendCommand(cmd); |
| } |
| } |
| |
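// Completion callback of the SUBSCRIBE request. On success the consumer becomes Ready and the
// initial flow permits are sent; on failure the consumer either schedules a reconnection or fails
// the creation promise, depending on whether it had been created before and whether the error is
// retriable.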
| void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) { |
| static bool firstTime = true; |
| if (result == ResultOk) { |
| if (firstTime) { |
| firstTime = false; |
| } |
| LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); |
| { |
| Lock lock(mutex_); |
| setCnx(cnx); |
| incomingMessages_.clear(); |
| possibleSendToDeadLetterTopicMessages_.clear(); |
| state_ = Ready; |
| backoff_.reset(); |
// Complicated logic since we don't have an isLocked() function for the mutex
| if (waitingForZeroQueueSizeMessage) { |
| sendFlowPermitsToBroker(cnx, 1); |
| } |
| availablePermits_ = 0; |
| } |
| |
| LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize()); |
| if (consumerTopicType_ == NonPartitioned || !firstTime) { |
| if (config_.getReceiverQueueSize() != 0) { |
| sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize()); |
| } else if (messageListener_) { |
| sendFlowPermitsToBroker(cnx, 1); |
| } |
| } |
| consumerCreatedPromise_.setValue(get_shared_this_ptr()); |
| } else { |
| if (result == ResultTimeout) { |
// Creating the consumer has timed out. We need to ensure the broker closes the consumer
// in case it was actually created; otherwise it might prevent a new subscribe operation,
// since we are not closing the connection
| int requestId = client_.lock()->newRequestId(); |
| cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); |
| } |
| |
| if (consumerCreatedPromise_.isComplete()) { |
// The consumer had already been created initially, so we need to retry connecting in any case
| LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result)); |
| scheduleReconnection(get_shared_this_ptr()); |
| } else { |
// The consumer was not yet created; retry connecting to the broker if possible
| if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) { |
| LOG_WARN(getName() << "Temporary error in creating consumer : " << strResult(result)); |
| scheduleReconnection(get_shared_this_ptr()); |
| } else { |
| LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result)); |
| consumerCreatedPromise_.setFailed(result); |
| state_ = Failed; |
| } |
| } |
| } |
| } |
| |
| void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) { |
| LOG_INFO(getName() << "Unsubscribing"); |
| |
| auto callback = [this, originalCallback](Result result) { |
| if (result == ResultOk) { |
| shutdown(); |
| LOG_INFO(getName() << "Unsubscribed successfully"); |
| } else { |
| state_ = Ready; |
| LOG_WARN(getName() << "Failed to unsubscribe: " << result); |
| } |
| if (originalCallback) { |
| originalCallback(result); |
| } |
| }; |
| |
| if (state_ != Ready) { |
| callback(ResultAlreadyClosed); |
| return; |
| } |
| |
| Lock lock(mutex_); |
| |
| ClientConnectionPtr cnx = getCnx().lock(); |
| if (cnx) { |
| LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_); |
| ClientImplPtr client = client_.lock(); |
| lock.unlock(); |
| int requestId = client->newRequestId(); |
| SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId); |
| auto self = get_shared_this_ptr(); |
| cnx->sendRequestWithId(cmd, requestId) |
| .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); |
| } else { |
| Result result = ResultNotConnected; |
| lock.unlock(); |
| LOG_WARN(getName() << "Failed to unsubscribe: " << strResult(result)); |
| callback(result); |
| } |
| } |
| |
| void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck) { |
| if (autoAck) { |
| acknowledgeAsync(messageId, [uuid, messageId](Result result) { |
| if (result != ResultOk) { |
| LOG_WARN("Failed to acknowledge discarded chunk, uuid: " << uuid |
| << ", messageId: " << messageId); |
| } |
| }); |
| } else { |
| trackMessage(messageId); |
| } |
| } |
| |
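// (Re-)arms the timer that periodically evicts incomplete chunked messages that have been pending
// longer than expireTimeOfIncompleteChunkedMessageMs_, acknowledging their received chunks so that
// they are not kept in the cache forever.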
| void ConsumerImpl::triggerCheckExpiredChunkedTimer() { |
| checkExpiredChunkedTimer_->expires_from_now( |
| boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); |
| std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()}; |
| checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void { |
| auto self = weakSelf.lock(); |
| if (!self) { |
| return; |
| } |
| if (ec) { |
| LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec |
| << "]."); |
| return; |
| } |
| Lock lock(chunkProcessMutex_); |
| long currentTimeMs = TimeUtils::currentTimeMillis(); |
| chunkedMessageCache_.removeOldestValuesIf( |
| [this, currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool { |
| bool expired = |
| currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_; |
| if (!expired) { |
| return false; |
| } |
| for (const MessageId& msgId : ctx.getChunkedMessageIds()) { |
| LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId); |
| discardChunkMessages(uuid, msgId, true); |
| } |
| return true; |
| }); |
| triggerCheckExpiredChunkedTimer(); |
| return; |
| }); |
| } |
| |
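// Accumulates one chunk of a chunked message. Returns the reassembled (and uncompressed) payload
// once the last chunk has arrived, or boost::none while chunks are still missing or when the
// chunk is discarded (unknown uuid or invalid chunk id).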
| boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload, |
| const proto::MessageMetadata& metadata, |
| const proto::MessageIdData& messageIdData, |
| const ClientConnectionPtr& cnx, |
| MessageId& messageId) { |
| const auto chunkId = metadata.chunk_id(); |
| const auto uuid = metadata.uuid(); |
| LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid |
| << ", messageId: " << messageId << ") of " |
| << payload.readableBytes() << " bytes"); |
| |
| Lock lock(chunkProcessMutex_); |
| |
// Lazily schedule the task that expires incomplete chunked messages
| bool expected = false; |
| if (expireTimeOfIncompleteChunkedMessageMs_ > 0 && |
| expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) { |
| triggerCheckExpiredChunkedTimer(); |
| } |
| |
| auto it = chunkedMessageCache_.find(uuid); |
| |
| if (chunkId == 0 && it == chunkedMessageCache_.end()) { |
| if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) { |
| chunkedMessageCache_.removeOldestValues( |
| chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1, |
| [this](const std::string& uuid, const ChunkedMessageCtx& ctx) { |
| for (const MessageId& msgId : ctx.getChunkedMessageIds()) { |
| discardChunkMessages(uuid, msgId, autoAckOldestChunkedMessageOnQueueFull_); |
| } |
| }); |
| } |
| it = chunkedMessageCache_.putIfAbsent( |
| uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()}); |
| } |
| |
| auto& chunkedMsgCtx = it->second; |
| if (it == chunkedMessageCache_.end() || !chunkedMsgCtx.validateChunkId(chunkId)) { |
| if (it == chunkedMessageCache_.end()) { |
| LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId |
| << ", messageId: " << messageId << ")"); |
| } else { |
| LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: " |
| << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")"); |
| chunkedMessageCache_.remove(uuid); |
| } |
| lock.unlock(); |
| increaseAvailablePermits(cnx); |
| trackMessage(messageId); |
| return boost::none; |
| } |
| |
| chunkedMsgCtx.appendChunk(messageId, payload); |
| if (!chunkedMsgCtx.isCompleted()) { |
| lock.unlock(); |
| increaseAvailablePermits(cnx); |
| return boost::none; |
| } |
| |
| ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(); |
| chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front()); |
| chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back()); |
| messageId = chunkMsgId->build(); |
| |
| LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx |
| << ", sequenceId: " << metadata.sequence_id()); |
| |
| auto wholePayload = chunkedMsgCtx.getBuffer(); |
| chunkedMessageCache_.remove(uuid); |
| if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) { |
| return wholePayload; |
| } else { |
| return boost::none; |
| } |
| } |
| |
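// Entry point for a MESSAGE command received from the broker: decrypts and decompresses the
// payload if needed, reassembles chunked messages, filters duplicates and entries before the
// start message id, and finally dispatches the message to a pending receive, the message
// listener, or the incoming queue.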
| void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg, |
| bool& isChecksumValid, proto::MessageMetadata& metadata, |
| SharedBuffer& payload) { |
| LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes()); |
| |
| if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) { |
| // Message was discarded or not consumed due to decryption failure |
| return; |
| } |
| |
| if (!isChecksumValid) { |
| // Message discarded for checksum error |
| discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_ChecksumMismatch); |
| return; |
| } |
| |
| auto redeliveryCount = msg.redelivery_count(); |
| const bool isMessageUndecryptable = |
| metadata.encryption_keys_size() > 0 && !config_.getCryptoKeyReader().get() && |
| config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME; |
| |
| const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1; |
| if (!isMessageUndecryptable && !isChunkedMessage) { |
| if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) { |
| // Message was discarded on decompression error |
| return; |
| } |
| } |
| |
| const auto& messageIdData = msg.message_id(); |
| auto messageId = MessageIdBuilder::from(messageIdData).batchIndex(-1).build(); |
| |
// Only a non-batched message can be a chunk
| if (!metadata.has_num_messages_in_batch() && isChunkedMessage) { |
| auto optionalPayload = processMessageChunk(payload, metadata, messageIdData, cnx, messageId); |
| if (optionalPayload) { |
| payload = optionalPayload.value(); |
| } else { |
| return; |
| } |
| } |
| |
| Message m(messageId, metadata, payload); |
| m.impl_->cnx_ = cnx.get(); |
| m.impl_->setTopicName(topic_); |
| m.impl_->setRedeliveryCount(msg.redelivery_count()); |
| |
| if (metadata.has_schema_version()) { |
| m.impl_->setSchemaVersion(metadata.schema_version()); |
| } |
| |
| LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch()); |
| LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = " |
| << metadata.has_num_messages_in_batch()); |
| |
| uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch(); |
| if (this->ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) { |
| LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by same consumer."); |
| increaseAvailablePermits(cnx, numOfMessageReceived); |
| return; |
| } |
| |
| if (metadata.has_num_messages_in_batch()) { |
| BitSet::Data words(msg.ack_set_size()); |
| for (int i = 0; i < words.size(); i++) { |
| words[i] = msg.ack_set(i); |
| } |
| BitSet ackSet{std::move(words)}; |
| Lock lock(mutex_); |
| numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count()); |
| } else { |
// Try to convert the payload to key/value data.
| m.impl_->convertPayloadToKeyValue(config_.getSchema()); |
| |
| const auto startMessageId = startMessageId_.get(); |
| if (isPersistent_ && startMessageId && |
| m.getMessageId().ledgerId() == startMessageId.value().ledgerId() && |
| m.getMessageId().entryId() == startMessageId.value().entryId() && |
| isPriorEntryIndex(m.getMessageId().entryId())) { |
| LOG_DEBUG(getName() << " Ignoring message from before the startMessageId: " |
| << startMessageId.value()); |
| return; |
| } |
| if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) { |
| possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m}); |
| if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) { |
| redeliverUnacknowledgedMessages({m.getMessageId()}); |
| increaseAvailablePermits(cnx); |
| return; |
| } |
| } |
| executeNotifyCallback(m); |
| } |
| |
| if (messageListener_) { |
| if (!messageListenerRunning_) { |
| return; |
| } |
| // Trigger message listener callback in a separate thread |
| while (numOfMessageReceived--) { |
| listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr())); |
| } |
| } |
| } |
| |
| void ConsumerImpl::activeConsumerChanged(bool isActive) { |
| if (eventListener_) { |
| listenerExecutor_->postWork( |
| std::bind(&ConsumerImpl::internalConsumerChangeListener, get_shared_this_ptr(), isActive)); |
| } |
| } |
| |
| void ConsumerImpl::internalConsumerChangeListener(bool isActive) { |
| try { |
| if (isActive) { |
| eventListener_->becameActive(Consumer(get_shared_this_ptr()), partitionIndex_); |
| } else { |
| eventListener_->becameInactive(Consumer(get_shared_this_ptr()), partitionIndex_); |
| } |
| } catch (const std::exception& e) { |
| LOG_ERROR(getName() << "Exception thrown from event listener " << e.what()); |
| } |
| } |
| |
| void ConsumerImpl::failPendingReceiveCallback() { |
| Message msg; |
| Lock lock(pendingReceiveMutex_); |
| while (!pendingReceives_.empty()) { |
| ReceiveCallback callback = pendingReceives_.front(); |
| pendingReceives_.pop(); |
| listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback, |
| get_shared_this_ptr(), ResultAlreadyClosed, msg, callback)); |
| } |
| lock.unlock(); |
| } |
| |
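// Delivers a newly received message: if an asynchronous receive is already waiting, invoke its
// callback directly; otherwise enqueue the message into incomingMessages_ and, if enough messages
// are buffered, trigger a pending batch receive.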
| void ConsumerImpl::executeNotifyCallback(Message& msg) { |
| Lock lock(pendingReceiveMutex_); |
// If an asyncReceive is waiting, notify its callback without adding to the incomingMessages queue
| bool asyncReceivedWaiting = !pendingReceives_.empty(); |
| ReceiveCallback callback; |
| if (asyncReceivedWaiting) { |
| callback = pendingReceives_.front(); |
| pendingReceives_.pop(); |
| } |
| lock.unlock(); |
| |
// There is a pending receive: invoke the callback directly.
| if (asyncReceivedWaiting) { |
| listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback, |
| get_shared_this_ptr(), ResultOk, msg, callback)); |
| return; |
| } |
| |
// Try to add the message to the incoming queue when a listener is set, the receiver queue size
// is non-zero, or we are waiting for a zero-queue-size message
| if (messageListener_ || config_.getReceiverQueueSize() != 0 || waitingForZeroQueueSizeMessage) { |
| incomingMessages_.push(msg); |
| incomingMessagesSize_.fetch_add(msg.getLength()); |
| } |
| |
// Try to trigger a pending batch receive
| Lock batchOptionLock(batchReceiveOptionMutex_); |
| if (hasEnoughMessagesForBatchReceive()) { |
| ConsumerImplBase::notifyBatchPendingReceivedCallback(); |
| } |
| } |
| |
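// Drains as many queued messages as the batch receive policy allows and completes the pending
// batch receive callback on the listener executor.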
| void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) { |
| auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(), |
| batchReceivePolicy_.getMaxNumBytes()); |
| Message msg; |
| while (incomingMessages_.popIf( |
| msg, [&messages](const Message& peekMsg) { return messages->canAdd(peekMsg); })) { |
| messageProcessed(msg); |
| Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); |
| messages->add(interceptMsg); |
| } |
| auto self = get_shared_this_ptr(); |
| listenerExecutor_->postWork( |
| [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); }); |
| } |
| |
| void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg, |
| const ReceiveCallback& callback) { |
| if (result == ResultOk && config_.getReceiverQueueSize() != 0) { |
| messageProcessed(msg); |
| msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); |
| unAckedMessageTrackerPtr_->add(msg.getMessageId()); |
| } |
| callback(result, msg); |
| } |
| |
| // Zero Queue size is not supported with Batch Messages |
| uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, |
| Message& batchedMessage, const BitSet& ackSet, |
| int redeliveryCount) { |
| auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch(); |
| LOG_DEBUG("Received Batch messages of size - " << batchSize |
| << " -- msgId: " << batchedMessage.getMessageId()); |
| const auto startMessageId = startMessageId_.get(); |
| |
| int skippedMessages = 0; |
| |
| auto acker = BatchMessageAckerImpl::create(batchSize); |
| std::vector<Message> possibleToDeadLetter; |
| for (int i = 0; i < batchSize; i++) { |
| // This is a cheap copy since message contains only one shared pointer (impl_) |
| Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker); |
| msg.impl_->setRedeliveryCount(redeliveryCount); |
| msg.impl_->setTopicName(batchedMessage.impl_->topicName_); |
| msg.impl_->convertPayloadToKeyValue(config_.getSchema()); |
| |
| if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) { |
| possibleToDeadLetter.emplace_back(msg); |
| if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) { |
| skippedMessages++; |
| continue; |
| } |
| } |
| |
| if (startMessageId) { |
| const MessageId& msgId = msg.getMessageId(); |
| |
| // If we are receiving a batch message, we need to discard messages that were prior |
| // to the startMessageId |
| if (isPersistent_ && msgId.ledgerId() == startMessageId.value().ledgerId() && |
| msgId.entryId() == startMessageId.value().entryId() && |
| isPriorBatchIndex(msgId.batchIndex())) { |
| LOG_DEBUG(getName() << "Ignoring message from before the startMessageId" |
| << msg.getMessageId()); |
| ++skippedMessages; |
| continue; |
| } |
| } |
| |
| if (!ackSet.isEmpty() && !ackSet.get(i)) { |
| LOG_DEBUG(getName() << "Ignoring message from " << i |
| << "th message, which has been acknowledged"); |
| ++skippedMessages; |
| continue; |
| } |
| |
| executeNotifyCallback(msg); |
| } |
| |
| if (!possibleToDeadLetter.empty()) { |
| possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter); |
| if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) { |
| redeliverUnacknowledgedMessages({batchedMessage.getMessageId()}); |
| } |
| } |
| |
| if (skippedMessages > 0) { |
| increaseAvailablePermits(cnx, skippedMessages); |
| } |
| |
| return batchSize - skippedMessages; |
| } |
| |
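// Returns true when the payload can be processed further: the message is not encrypted,
// decryption succeeded, or the CryptoFailureAction is CONSUME. Returns false when the message is
// discarded (DISCARD) or left to the un-ACKed message tracker for redelivery (FAIL).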
| bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg, |
| const proto::MessageMetadata& metadata, SharedBuffer& payload) { |
| if (!metadata.encryption_keys_size()) { |
| return true; |
| } |
| |
// If a CryptoKeyReader is not configured, handle the message according to the configured CryptoFailureAction
| if (!config_.isEncryptionEnabled()) { |
| if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) { |
| LOG_WARN(getName() << "CryptoKeyReader is not implemented. Consuming encrypted message."); |
| return true; |
| } else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) { |
| LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader is not implemented and config " |
| "is set to discard"); |
| discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError); |
| } else { |
| LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to " |
| "consume encrypted message"); |
| auto messageId = MessageIdBuilder::from(msg.message_id()).build(); |
| unAckedMessageTrackerPtr_->add(messageId); |
| } |
| return false; |
| } |
| |
| SharedBuffer decryptedPayload; |
| if (msgCrypto_->decrypt(metadata, payload, config_.getCryptoKeyReader(), decryptedPayload)) { |
| payload = decryptedPayload; |
| return true; |
| } |
| |
| if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) { |
// Note: a batch message will fail to be consumed even if the config is set to consume
| LOG_WARN( |
| getName() << "Decryption failed. Consuming encrypted message since config is set to consume."); |
| return true; |
| } else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) { |
| LOG_WARN(getName() << "Discarding message since decryption failed and config is set to discard"); |
| discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError); |
| } else { |
| LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message"); |
| auto messageId = MessageIdBuilder::from(msg.message_id()).build(); |
| unAckedMessageTrackerPtr_->add(messageId); |
| } |
| return false; |
| } |
| |
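// Decompresses the payload in place according to the compression type in the metadata. Oversized
// or undecodable payloads are discarded as corrupted and false is returned.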
| bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, |
| const proto::MessageIdData& messageIdData, |
| const proto::MessageMetadata& metadata, SharedBuffer& payload, |
| bool checkMaxMessageSize) { |
| if (!metadata.has_compression()) { |
| return true; |
| } |
| |
| CompressionType compressionType = static_cast<CompressionType>(metadata.compression()); |
| |
| uint32_t uncompressedSize = metadata.uncompressed_size(); |
| uint32_t payloadSize = payload.readableBytes(); |
| if (cnx) { |
| if (checkMaxMessageSize && payloadSize > ClientConnection::getMaxMessageSize()) { |
// The payload size exceeds the broker's max message size, which should never happen: treat the
// message as corrupted
| LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize // |
| << " at " << messageIdData.ledgerid() << ":" << messageIdData.entryid()); |
| discardCorruptedMessage(cnx, messageIdData, |
| CommandAck_ValidationError_UncompressedSizeCorruption); |
| return false; |
| } |
| } else { |
| LOG_ERROR("Connection not ready for Consumer - " << getConsumerId()); |
| return false; |
| } |
| |
| if (!CompressionCodecProvider::getCodec(compressionType).decode(payload, uncompressedSize, payload)) { |
| LOG_ERROR(getName() << "Failed to decompress message with " << uncompressedSize // |
| << " at " << messageIdData.ledgerid() << ":" << messageIdData.entryid()); |
| discardCorruptedMessage(cnx, messageIdData, CommandAck_ValidationError_DecompressionError); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| void ConsumerImpl::discardCorruptedMessage(const ClientConnectionPtr& cnx, |
| const proto::MessageIdData& messageId, |
| CommandAck_ValidationError validationError) { |
| LOG_ERROR(getName() << "Discarding corrupted message at " << messageId.ledgerid() << ":" |
| << messageId.entryid()); |
| |
| SharedBuffer cmd = Commands::newAck(consumerId_, messageId.ledgerid(), messageId.entryid(), {}, |
| CommandAck_AckType_Individual, validationError); |
| |
| cnx->sendCommand(cmd); |
| increaseAvailablePermits(cnx); |
| } |
| |
| void ConsumerImpl::internalListener() { |
| if (!messageListenerRunning_) { |
| return; |
| } |
| Message msg; |
| if (!incomingMessages_.pop(msg, std::chrono::milliseconds(0))) { |
| // This will only happen when the connection got reset and we cleared the queue |
| return; |
| } |
| trackMessage(msg.getMessageId()); |
| try { |
| consumerStatsBasePtr_->receivedMessage(msg, ResultOk); |
| lastDequedMessageId_ = msg.getMessageId(); |
| Consumer consumer{get_shared_this_ptr()}; |
| Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); |
| messageListener_(consumer, interceptMsg); |
| } catch (const std::exception& e) { |
| LOG_ERROR(getName() << "Exception thrown from listener" << e.what()); |
| } |
| messageProcessed(msg, false); |
| } |
| |
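// Synchronous receive path for a zero-sized receiver queue: request a single permit from the
// broker and block until a message that belongs to the current connection is popped from the
// queue.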
| Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) { |
| if (config_.getReceiverQueueSize() != 0) { |
| LOG_ERROR(getName() << " Can't use receiveForZeroQueueSize if the queue size is not 0"); |
| return ResultInvalidConfiguration; |
| } |
| |
| // Using RAII for locking |
| ClientConnectionPtr currentCnx = getCnx().lock(); |
| Lock lock(mutexForReceiveWithZeroQueueSize); |
| |
| // Just being cautious |
| if (incomingMessages_.size() != 0) { |
LOG_ERROR(getName()
          << "The incoming message queue should never be non-empty when the receiver queue size is 0");
| incomingMessages_.clear(); |
| } |
| waitingForZeroQueueSizeMessage = true; |
| |
| sendFlowPermitsToBroker(currentCnx, 1); |
| |
| while (true) { |
| if (!incomingMessages_.pop(msg)) { |
| return ResultInterrupted; |
| } |
| |
| { |
// Lock needed to prevent a race between connectionOpened and the check
// msg.impl_->cnx_ == currentCnx.get()
| Lock localLock(mutex_); |
// If the message was received due to an old flow command, discard it and wait for the message
// from the latest flow command
| if (msg.impl_->cnx_ == currentCnx.get()) { |
| waitingForZeroQueueSizeMessage = false; |
| // Can't use break here else it may trigger a race with connection opened. |
| |
| localLock.unlock(); |
| msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); |
| return ResultOk; |
| } |
| } |
| } |
| } |
| |
| Result ConsumerImpl::receive(Message& msg) { |
| Result res = receiveHelper(msg); |
| consumerStatsBasePtr_->receivedMessage(msg, res); |
| return res; |
| } |
| |
| void ConsumerImpl::receiveAsync(ReceiveCallback callback) { |
| Message msg; |
| |
| // fail the callback if consumer is closing or closed |
| if (state_ != Ready) { |
| callback(ResultAlreadyClosed, msg); |
| return; |
| } |
| |
| Lock lock(pendingReceiveMutex_); |
| if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) { |
| lock.unlock(); |
| messageProcessed(msg); |
| msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); |
| callback(ResultOk, msg); |
| } else { |
| pendingReceives_.push(callback); |
| lock.unlock(); |
| |
| if (config_.getReceiverQueueSize() == 0) { |
| sendFlowPermitsToBroker(getCnx().lock(), 1); |
| } |
| } |
| } |
| |
| Result ConsumerImpl::receiveHelper(Message& msg) { |
| if (state_ != Ready) { |
| return ResultAlreadyClosed; |
| } |
| |
| if (messageListener_) { |
| LOG_ERROR(getName() << "Can not receive when a listener has been set"); |
| return ResultInvalidConfiguration; |
| } |
| |
| if (config_.getReceiverQueueSize() == 0) { |
| return fetchSingleMessageFromBroker(msg); |
| } |
| |
| if (!incomingMessages_.pop(msg)) { |
| return ResultInterrupted; |
| } |
| |
| messageProcessed(msg); |
| msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); |
| return ResultOk; |
| } |
| |
| Result ConsumerImpl::receive(Message& msg, int timeout) { |
| Result res = receiveHelper(msg, timeout); |
| consumerStatsBasePtr_->receivedMessage(msg, res); |
| return res; |
| } |
| |
| Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { |
| if (config_.getReceiverQueueSize() == 0) { |
| LOG_WARN(getName() << "Can't use this function if the queue size is 0"); |
| return ResultInvalidConfiguration; |
| } |
| |
| if (state_ != Ready) { |
| return ResultAlreadyClosed; |
| } |
| |
| if (messageListener_) { |
| LOG_ERROR(getName() << "Can not receive when a listener has been set"); |
| return ResultInvalidConfiguration; |
| } |
| |
| if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) { |
| messageProcessed(msg); |
| msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); |
| return ResultOk; |
| } else { |
| if (state_ != Ready) { |
| return ResultAlreadyClosed; |
| } |
| return ResultTimeout; |
| } |
| } |
| |
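// Book-keeping after a message has been handed to the application: record the last dequeued
// message id, decrement the buffered size, replenish flow permits (only if the message came from
// the current connection) and optionally start tracking it for ack timeout.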
| void ConsumerImpl::messageProcessed(Message& msg, bool track) { |
| Lock lock(mutexForMessageId_); |
| lastDequedMessageId_ = msg.getMessageId(); |
| lock.unlock(); |
| |
| incomingMessagesSize_.fetch_sub(msg.getLength()); |
| |
| ClientConnectionPtr currentCnx = getCnx().lock(); |
| if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) { |
| LOG_DEBUG(getName() << "Not adding permit since connection is different."); |
| return; |
| } |
| |
| increaseAvailablePermits(currentCnx); |
| if (track) { |
| trackMessage(msg.getMessageId()); |
| } |
| } |
| |
| /** |
| * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that |
| * was |
| * not seen by the application |
| */ |
| boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() { |
| bool expectedDuringSeek = true; |
| if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) { |
| return seekMessageId_.get(); |
| } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) { |
| return startMessageId_.get(); |
| } |
| Message nextMessageInQueue; |
| if (incomingMessages_.peekAndClear(nextMessageInQueue)) { |
| // There was at least one message pending in the queue |
| const MessageId& nextMessageId = nextMessageInQueue.getMessageId(); |
| auto previousMessageId = (nextMessageId.batchIndex() >= 0) |
| ? MessageIdBuilder() |
| .ledgerId(nextMessageId.ledgerId()) |
| .entryId(nextMessageId.entryId()) |
| .batchIndex(nextMessageId.batchIndex() - 1) |
| .batchSize(nextMessageId.batchSize()) |
| .build() |
| : MessageIdBuilder() |
| .ledgerId(nextMessageId.ledgerId()) |
| .entryId(nextMessageId.entryId() - 1) |
| .build(); |
| return previousMessageId; |
| } else if (lastDequedMessageId_ != MessageId::earliest()) { |
// If the queue was empty, we need to restart from the message just after the last one that has
// been dequeued in the past
| return lastDequedMessageId_; |
| } else { |
| // No message was received or dequeued by this consumer. Next message would still be the |
| // startMessageId |
| return startMessageId_.get(); |
| } |
| } |
| |
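// Accumulates flow permits locally and sends a FLOW command to the broker once the number of
// available permits reaches the refill threshold (half of the receiver queue size), as long as
// the message listener is not paused.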
| void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta) { |
| int newAvailablePermits = availablePermits_.fetch_add(delta) + delta; |
| |
| while (newAvailablePermits >= receiverQueueRefillThreshold_ && messageListenerRunning_) { |
| if (availablePermits_.compare_exchange_weak(newAvailablePermits, 0)) { |
| sendFlowPermitsToBroker(currentCnx, newAvailablePermits); |
| break; |
| } |
| } |
| } |
| |
| inline CommandSubscribe_SubType ConsumerImpl::getSubType() { |
| ConsumerType type = config_.getConsumerType(); |
| switch (type) { |
| case ConsumerExclusive: |
| return CommandSubscribe_SubType_Exclusive; |
| |
| case ConsumerShared: |
| return CommandSubscribe_SubType_Shared; |
| |
| case ConsumerFailover: |
| return CommandSubscribe_SubType_Failover; |
| |
| case ConsumerKeyShared: |
| return CommandSubscribe_SubType_Key_Shared; |
| } |
| BOOST_THROW_EXCEPTION(std::logic_error("Invalid ConsumerType enumeration value")); |
| } |
| |
| inline CommandSubscribe_InitialPosition ConsumerImpl::getInitialPosition() { |
| InitialPosition initialPosition = config_.getSubscriptionInitialPosition(); |
| switch (initialPosition) { |
| case InitialPositionLatest: |
| return CommandSubscribe_InitialPosition_Latest; |
| |
| case InitialPositionEarliest: |
| return CommandSubscribe_InitialPosition_Earliest; |
| } |
| BOOST_THROW_EXCEPTION(std::logic_error("Invalid InitialPosition enumeration value")); |
| } |
| |
| void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) { |
| auto pair = prepareIndividualAck(msgId); |
| const auto& msgIdToAck = pair.first; |
| const bool readyToAck = pair.second; |
| if (readyToAck) { |
| ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck, callback); |
| } else { |
| if (callback) { |
| callback(ResultOk); |
| } |
| } |
| interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk, msgId); |
| } |
| |
| void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) { |
| MessageIdList messageIdListToAck; |
// TODO: Need to check if the consumer is ready. The same applies to all other public methods
| for (auto&& msgId : messageIdList) { |
| auto pair = prepareIndividualAck(msgId); |
| const auto& msgIdToAck = pair.first; |
| const bool readyToAck = pair.second; |
| if (readyToAck) { |
| messageIdListToAck.emplace_back(msgIdToAck); |
| } |
// Invoke `onAcknowledge` for all message ids, regardless of whether they are ready to ack. This
// is consistent with the Java client.
| interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk, msgId); |
| } |
| this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdListToAck, callback); |
| } |
| |
| void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) { |
| if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) { |
| interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()), |
| ResultCumulativeAcknowledgementNotAllowedError, msgId); |
| if (callback) { |
| callback(ResultCumulativeAcknowledgementNotAllowedError); |
| } |
| return; |
| } |
| auto pair = prepareCumulativeAck(msgId); |
| const auto& msgIdToAck = pair.first; |
| const auto& readyToAck = pair.second; |
| if (readyToAck) { |
| consumerStatsBasePtr_->messageAcknowledged(ResultOk, CommandAck_AckType_Cumulative, 1); |
| unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck); |
| ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck, callback); |
| } else if (callback) { |
| callback(ResultOk); |
| } |
| interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()), ResultOk, msgId); |
| } |
| |
| bool ConsumerImpl::isCumulativeAcknowledgementAllowed(ConsumerType consumerType) { |
| return consumerType != ConsumerKeyShared && consumerType != ConsumerShared; |
| } |
| |
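// Decides how an individual ack for the given message id should be sent: returns the message id
// to ack and whether it is ready to be sent now. For a batched message id, the ack is only ready
// once all messages of the batch have been acked, unless batch index level acknowledgment is
// enabled.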
| std::pair<MessageId, bool> ConsumerImpl::prepareIndividualAck(const MessageId& messageId) { |
| auto messageIdImpl = Commands::getMessageIdImpl(messageId); |
| auto batchedMessageIdImpl = std::dynamic_pointer_cast<BatchedMessageIdImpl>(messageIdImpl); |
| |
| auto batchSize = messageId.batchSize(); |
| if (!batchedMessageIdImpl || batchedMessageIdImpl->ackIndividual(messageId.batchIndex())) { |
| consumerStatsBasePtr_->messageAcknowledged(ResultOk, CommandAck_AckType_Individual, |
| (batchSize > 0) ? batchSize : 1); |
| unAckedMessageTrackerPtr_->remove(messageId); |
| possibleSendToDeadLetterTopicMessages_.remove(messageId); |
| return std::make_pair(discardBatch(messageId), true); |
| } else if (config_.isBatchIndexAckEnabled()) { |
| return std::make_pair(messageId, true); |
| } else { |
| return std::make_pair(MessageId{}, false); |
| } |
| } |
| |
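// Cumulative counterpart of prepareIndividualAck: for a batched message id whose batch is not yet
// fully covered, either fall back to batch index level acknowledgment, ack the previous message
// id, or report that nothing can be acked yet.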
| std::pair<MessageId, bool> ConsumerImpl::prepareCumulativeAck(const MessageId& messageId) { |
| auto messageIdImpl = Commands::getMessageIdImpl(messageId); |
| auto batchedMessageIdImpl = std::dynamic_pointer_cast<BatchedMessageIdImpl>(messageIdImpl); |
| |
| if (!batchedMessageIdImpl || batchedMessageIdImpl->ackCumulative(messageId.batchIndex())) { |
| return std::make_pair(discardBatch(messageId), true); |
| } else if (config_.isBatchIndexAckEnabled()) { |
| return std::make_pair(messageId, true); |
| } else { |
| if (batchedMessageIdImpl->shouldAckPreviousMessageId()) { |
| return std::make_pair(batchedMessageIdImpl->getPreviousMessageId(), true); |
| } else { |
| return std::make_pair(MessageId{}, false); |
| } |
| } |
| } |
| |
| void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) { |
| unAckedMessageTrackerPtr_->remove(messageId); |
| negativeAcksTracker_.add(messageId); |
| } |
| |
| void ConsumerImpl::disconnectConsumer() { |
| LOG_INFO("Broker notification of Closed consumer: " << consumerId_); |
| resetCnx(); |
| scheduleReconnection(get_shared_this_ptr()); |
| } |
| |
| void ConsumerImpl::closeAsync(ResultCallback originalCallback) { |
| auto callback = [this, originalCallback](Result result) { |
| shutdown(); |
| if (result == ResultOk) { |
| LOG_INFO(getName() << "Closed consumer " << consumerId_); |
| } else { |
| LOG_WARN(getName() << "Failed to close consumer: " << result); |
| } |
| if (originalCallback) { |
| originalCallback(result); |
| } |
| }; |
| |
| if (state_ != Ready) { |
| callback(ResultAlreadyClosed); |
| return; |
| } |
| |
| LOG_INFO(getName() << "Closing consumer for topic " << topic_); |
| state_ = Closing; |
| incomingMessages_.close(); |
| |
| // Flush pending grouped ACK requests. |
| if (ackGroupingTrackerPtr_) { |
| ackGroupingTrackerPtr_->close(); |
| } |
| negativeAcksTracker_.close(); |
| |
| ClientConnectionPtr cnx = getCnx().lock(); |
| if (!cnx) { |
// If the connection is gone, the consumer is also closed on the broker side
| callback(ResultOk); |
| return; |
| } |
| |
| ClientImplPtr client = client_.lock(); |
| if (!client) { |
| // Client was already destroyed |
| callback(ResultOk); |
| return; |
| } |
| |
| cancelTimers(); |
| |
| int requestId = client->newRequestId(); |
| auto self = get_shared_this_ptr(); |
| cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) |
| .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); |
| } |
| |
| const std::string& ConsumerImpl::getName() const { return consumerStr_; } |
| |
| void ConsumerImpl::shutdown() { |
| if (ackGroupingTrackerPtr_) { |
| ackGroupingTrackerPtr_->close(); |
| } |
| incomingMessages_.clear(); |
| possibleSendToDeadLetterTopicMessages_.clear(); |
| resetCnx(); |
| interceptors_->close(); |
| auto client = client_.lock(); |
| if (client) { |
| client->cleanupConsumer(this); |
| } |
| negativeAcksTracker_.close(); |
| cancelTimers(); |
| consumerCreatedPromise_.setFailed(ResultAlreadyClosed); |
| failPendingReceiveCallback(); |
| failPendingBatchReceiveCallback(); |
| state_ = Closed; |
| } |
| |
| bool ConsumerImpl::isClosed() { return state_ == Closed; } |
| |
| bool ConsumerImpl::isOpen() { return state_ == Ready; } |
| |
| Result ConsumerImpl::pauseMessageListener() { |
| if (!messageListener_) { |
| return ResultInvalidConfiguration; |
| } |
| messageListenerRunning_ = false; |
| return ResultOk; |
| } |
| |
| Result ConsumerImpl::resumeMessageListener() { |
| if (!messageListener_) { |
| return ResultInvalidConfiguration; |
| } |
| |
| if (messageListenerRunning_) { |
| // Not paused |
| return ResultOk; |
| } |
| messageListenerRunning_ = true; |
| const size_t count = incomingMessages_.size(); |
| |
| for (size_t i = 0; i < count; i++) { |
| // Trigger message listener callback in a separate thread |
| listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr())); |
| } |
| // Check current permits and determine whether to send FLOW command |
| this->increaseAvailablePermits(getCnx().lock(), 0); |
| return ResultOk; |
| } |
| |
| void ConsumerImpl::redeliverUnacknowledgedMessages() { |
| static std::set<MessageId> emptySet; |
| redeliverMessages(emptySet); |
| unAckedMessageTrackerPtr_->clear(); |
| } |
| |
| void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) { |
| if (messageIds.empty()) { |
| return; |
| } |
| if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) { |
| redeliverUnacknowledgedMessages(); |
| return; |
| } |
| |
| ClientConnectionPtr cnx = getCnx().lock(); |
| if (cnx) { |
| if (cnx->getServerProtocolVersion() >= proto::v2) { |
| auto needRedeliverMsgs = std::make_shared<std::set<MessageId>>(); |
| auto needCallBack = std::make_shared<std::atomic<int>>(messageIds.size()); |
| auto self = get_shared_this_ptr(); |
// TODO: Support MAX_REDELIVER_UNACKNOWLEDGED to avoid redelivering too many messages
| for (const auto& msgId : messageIds) { |
| processPossibleToDLQ(msgId, |
| [self, needRedeliverMsgs, &msgId, needCallBack](bool processSuccess) { |
| if (!processSuccess) { |
| needRedeliverMsgs->emplace(msgId); |
| } |
| if (--(*needCallBack) == 0 && !needRedeliverMsgs->empty()) { |
| self->redeliverMessages(*needRedeliverMsgs); |
| } |
| }); |
| } |
| } |
| } else { |
| LOG_WARN("Connection not ready for Consumer - " << getConsumerId()); |
| } |
| } |
| |
| void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) { |
| ClientConnectionPtr cnx = getCnx().lock(); |
| if (cnx) { |
| if (cnx->getServerProtocolVersion() >= proto::v2) { |
| cnx->sendCommand(Commands::newRedeliverUnacknowledgedMessages(consumerId_, messageIds)); |
| LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for Consumer - " << getConsumerId()); |
| } |
| } else { |
| LOG_DEBUG("Connection not ready for Consumer - " << getConsumerId()); |
| } |
| } |
| |
| int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } |
| |
| void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { |
| if (state_ != Ready) { |
| LOG_ERROR(getName() << "Client connection is not open, please try again later.") |
| callback(ResultConsumerNotInitialized, BrokerConsumerStats()); |
| return; |
| } |
| |
| Lock lock(mutex_); |
| if (brokerConsumerStats_.isValid()) { |
| LOG_DEBUG(getName() << "Serving data from cache"); |
| BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_; |
| lock.unlock(); |
| callback(ResultOk, |
| BrokerConsumerStats(std::make_shared<BrokerConsumerStatsImpl>(brokerConsumerStats_))); |
| return; |
| } |
| lock.unlock(); |
| |
| ClientConnectionPtr cnx = getCnx().lock(); |
| if (cnx) { |
| if (cnx->getServerProtocolVersion() >= proto::v8) { |
| ClientImplPtr client = client_.lock(); |
| uint64_t requestId = client->newRequestId(); |
| LOG_DEBUG(getName() << " Sending ConsumerStats Command for Consumer - " << getConsumerId() |
| << ", requestId - " << requestId); |
| |
| cnx->newConsumerStats(consumerId_, requestId) |
| .addListener(std::bind(&ConsumerImpl::brokerConsumerStatsListener, get_shared_this_ptr(), |
| std::placeholders::_1, std::placeholders::_2, callback)); |
| return; |
| } else { |
| LOG_ERROR(getName() << " Operation not supported since server protobuf version " |
| << cnx->getServerProtocolVersion() << " is older than proto::v7"); |
| callback(ResultUnsupportedVersionError, BrokerConsumerStats()); |
| return; |
| } |
| } |
| LOG_ERROR(getName() << " Client Connection not ready for Consumer"); |
| callback(ResultNotConnected, BrokerConsumerStats()); |
| } |
| |
| void ConsumerImpl::brokerConsumerStatsListener(Result res, BrokerConsumerStatsImpl brokerConsumerStats, |
| BrokerConsumerStatsCallback callback) { |
| if (res == ResultOk) { |
| Lock lock(mutex_); |
| brokerConsumerStats.setCacheTime(config_.getBrokerConsumerStatsCacheTimeInMs()); |
| brokerConsumerStats_ = brokerConsumerStats; |
| } |
| |
| if (callback) { |
| callback(res, BrokerConsumerStats(std::make_shared<BrokerConsumerStatsImpl>(brokerConsumerStats))); |
| } |
| } |
| |
| void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { |
| const auto state = state_.load(); |
| if (state == Closed || state == Closing) { |
| LOG_ERROR(getName() << "Client connection already closed."); |
| if (callback) { |
| callback(ResultAlreadyClosed); |
| } |
| return; |
| } |
| |
| ClientImplPtr client = client_.lock(); |
| if (!client) { |
| LOG_ERROR(getName() << "Client is expired when seekAsync " << msgId); |
| return; |
| } |
| const auto requestId = client->newRequestId(); |
| seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), msgId, 0L, callback); |
| } |
| |
| void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { |
| const auto state = state_.load(); |
| if (state == Closed || state == Closing) { |
| LOG_ERROR(getName() << "Client connection already closed."); |
| if (callback) { |
| callback(ResultAlreadyClosed); |
| } |
| return; |
| } |
| |
| ClientImplPtr client = client_.lock(); |
| if (!client) { |
| LOG_ERROR(getName() << "Client is expired when seekAsync " << timestamp); |
| return; |
| } |
| const auto requestId = client->newRequestId(); |
| seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), MessageId::earliest(), |
| timestamp, callback); |
| } |
| |
| bool ConsumerImpl::isReadCompacted() { return readCompacted_; } |
| |
| inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const MessageId& messageId) { |
| return lastMessageIdInBroker > messageId && lastMessageIdInBroker.entryId() != -1; |
| } |
| |
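// Determines whether there is at least one message that can still be received: compares the last
// dequeued (or start) message id with the broker's last message id, querying the broker when the
// cached value is not sufficient.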
| void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) { |
| const auto startMessageId = startMessageId_.get(); |
| Lock lock(mutexForMessageId_); |
| const auto messageId = |
| (lastDequedMessageId_ == MessageId::earliest()) ? startMessageId.value() : lastDequedMessageId_; |
| |
| if (messageId == MessageId::latest()) { |
| lock.unlock(); |
| getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) { |
| if (result != ResultOk) { |
| callback(result, {}); |
| return; |
| } |
| if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { |
| // We only care about comparing ledger ids and entry ids as mark delete position doesn't have |
| // other ids such as batch index |
| callback(ResultOk, compareLedgerAndEntryId(response.getMarkDeletePosition(), |
| response.getLastMessageId()) < 0); |
| } else { |
| callback(ResultOk, false); |
| } |
| }); |
| } else { |
| if (hasMoreMessages(lastMessageIdInBroker_, messageId)) { |
| lock.unlock(); |
| callback(ResultOk, true); |
| return; |
| } |
| lock.unlock(); |
| |
| getLastMessageIdAsync([callback, messageId](Result result, const GetLastMessageIdResponse& response) { |
| callback(result, (result == ResultOk) && hasMoreMessages(response.getLastMessageId(), messageId)); |
| }); |
| } |
| } |
| |
| void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) { |
| const auto state = state_.load(); |
| if (state == Closed || state == Closing) { |
| LOG_ERROR(getName() << "Client connection already closed."); |
| if (callback) { |
| callback(ResultAlreadyClosed, GetLastMessageIdResponse()); |
| } |
| return; |
| } |
| |
| TimeDuration operationTimeout = seconds(client_.lock()->conf().getOperationTimeoutSeconds()); |
| BackoffPtr backoff = std::make_shared<Backoff>(milliseconds(100), operationTimeout * 2, milliseconds(0)); |
| DeadlineTimerPtr timer = executor_->createDeadlineTimer(); |
| |
| internalGetLastMessageIdAsync(backoff, operationTimeout, timer, callback); |
| } |
| |
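| // Sends the GetLastMessageId command on the current connection (requires broker protocol v12 or |
| // newer). If no connection is available yet, re-schedules itself on `timer` until the remaining |
| // time budget is exhausted. |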
| void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime, |
| const DeadlineTimerPtr& timer, |
| BrokerGetLastMessageIdCallback callback) { |
| ClientConnectionPtr cnx = getCnx().lock(); |
| if (cnx) { |
| if (cnx->getServerProtocolVersion() >= proto::v12) { |
| ClientImplPtr client = client_.lock(); |
| uint64_t requestId = client->newRequestId(); |
| LOG_DEBUG(getName() << " Sending getLastMessageId Command for Consumer - " << getConsumerId() |
| << ", requestId - " << requestId); |
| |
| auto self = get_shared_this_ptr(); |
| cnx->newGetLastMessageId(consumerId_, requestId) |
| .addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) { |
| if (result == ResultOk) { |
| LOG_DEBUG(getName() << "getLastMessageId: " << response); |
| Lock lock(mutexForMessageId_); |
| lastMessageIdInBroker_ = response.getLastMessageId(); |
| lock.unlock(); |
| } else { |
| LOG_ERROR(getName() << "Failed to getLastMessageId: " << result); |
| } |
| callback(result, response); |
| }); |
| } else { |
| LOG_ERROR(getName() << " Operation not supported since server protocol version " |
| << cnx->getServerProtocolVersion() << " is older than proto::v12"); |
| callback(ResultUnsupportedVersionError, MessageId()); |
| } |
| } else { |
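| // No connection yet: wait for the next backoff interval and try again, unless the time budget |
| // has run out. |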
| TimeDuration next = std::min(remainTime, backoff->next()); |
| if (next.total_milliseconds() <= 0) { |
| LOG_ERROR(getName() << " Client Connection not ready for Consumer"); |
| callback(ResultNotConnected, MessageId()); |
| return; |
| } |
| remainTime -= next; |
| |
| timer->expires_from_now(next); |
| |
| auto self = shared_from_this(); |
| timer->async_wait([this, backoff, remainTime, timer, next, callback, |
| self](const boost::system::error_code& ec) -> void { |
| if (ec == boost::asio::error::operation_aborted) { |
| LOG_DEBUG(getName() << " Get last message id operation was cancelled, code[" << ec << "]."); |
| return; |
| } |
| if (ec) { |
| LOG_ERROR(getName() << " Failed to get last message id, code[" << ec << "]."); |
| return; |
| } |
| LOG_WARN(getName() << " Could not get connection while getLastMessageId -- Will try again in " |
| << next.total_milliseconds() << " ms"); |
| this->internalGetLastMessageIdAsync(backoff, remainTime, timer, callback); |
| }); |
| } |
| } |
| |
| void ConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) { |
| negativeAcksTracker_.setEnabledForTesting(enabled); |
| } |
| |
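| // When this consumer is owned by a parent (partitioned or multi-topics consumer), the parent |
| // tracks unacknowledged messages, so only make sure no stale entry remains locally; otherwise |
| // track the message here so it can be redelivered after the ack timeout. |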
| void ConsumerImpl::trackMessage(const MessageId& messageId) { |
| if (hasParent_) { |
| unAckedMessageTrackerPtr_->remove(messageId); |
| } else { |
| unAckedMessageTrackerPtr_->add(messageId); |
| } |
| } |
| |
| bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } |
| |
| uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } |
| |
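| // Sends the prepared seek command to the broker. The previous seek position is remembered so it |
| // can be restored if the broker rejects the request; on success any pending grouped |
| // acknowledgments are flushed and the receiver queue is cleared, since the prefetched messages |
| // belong to the pre-seek position. |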
| void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, |
| long timestamp, ResultCallback callback) { |
| ClientConnectionPtr cnx = getCnx().lock(); |
| if (!cnx) { |
| LOG_ERROR(getName() << " Client Connection not ready for Consumer"); |
| callback(ResultNotConnected); |
| return; |
| } |
| |
| const auto originalSeekMessageId = seekMessageId_.get(); |
| seekMessageId_ = seekId; |
| duringSeek_ = true; |
| if (timestamp > 0) { |
| LOG_INFO(getName() << " Seeking subscription to " << timestamp); |
| } else { |
| LOG_INFO(getName() << " Seeking subscription to " << seekId); |
| } |
| |
| std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()}; |
| |
| cnx->sendRequestWithId(seek, requestId) |
| .addListener([this, weakSelf, callback, originalSeekMessageId](Result result, |
| const ResponseData& responseData) { |
| auto self = weakSelf.lock(); |
| if (!self) { |
| callback(result); |
| return; |
| } |
| if (result == ResultOk) { |
| LOG_INFO(getName() << "Seek completed successfully"); |
| ackGroupingTrackerPtr_->flushAndClean(); |
| incomingMessages_.clear(); |
| Lock lock(mutexForMessageId_); |
| lastDequedMessageId_ = MessageId::earliest(); |
| lock.unlock(); |
| } else { |
| LOG_ERROR(getName() << "Failed to seek: " << result); |
| seekMessageId_ = originalSeekMessageId; |
| duringSeek_ = false; |
| } |
| callback(result); |
| }); |
| } |
| |
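| // The two helpers below decide whether a batch index / entry id lies before the configured start |
| // message id and therefore has to be filtered out. The boundary itself is kept only when |
| // startMessageIdInclusive is set. |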
| bool ConsumerImpl::isPriorBatchIndex(int32_t idx) { |
| return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().batchIndex() |
| : idx <= startMessageId_.get().value().batchIndex(); |
| } |
| |
| bool ConsumerImpl::isPriorEntryIndex(int64_t idx) { |
| return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().entryId() |
| : idx <= startMessageId_.get().value().entryId(); |
| } |
| |
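| // A pending batch receive can complete early once either the message-count or the byte-size |
| // limit of the BatchReceivePolicy is reached; a non-positive limit is simply not checked. |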
| bool ConsumerImpl::hasEnoughMessagesForBatchReceive() const { |
| if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) { |
| return false; |
| } |
| return (batchReceivePolicy_.getMaxNumMessages() > 0 && |
| incomingMessages_.size() >= batchReceivePolicy_.getMaxNumMessages()) || |
| (batchReceivePolicy_.getMaxNumBytes() > 0 && |
| incomingMessagesSize_ >= batchReceivePolicy_.getMaxNumBytes()); |
| } |
| |
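| // shared_from_this() is declared on the base class, so the result has to be downcast to |
| // ConsumerImpl before it can be captured in callbacks that need the derived type. |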
| std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() { |
| return std::dynamic_pointer_cast<ConsumerImpl>(shared_from_this()); |
| } |
| |
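| // Use the error_code overloads of cancel() so that no exception can escape, matching the |
| // noexcept specification. |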
| void ConsumerImpl::cancelTimers() noexcept { |
| boost::system::error_code ec; |
| batchReceiveTimer_->cancel(ec); |
| checkExpiredChunkedTimer_->cancel(ec); |
| } |
| |
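| // Republishes the messages recorded for `messageId` to the dead letter topic. The DLQ producer |
| // is created lazily (guarded by double-checked locking); each copy keeps the original payload |
| // and properties plus the origin message id and the real topic name, and the original message is |
| // acknowledged only after its copy has been successfully published. The callback is invoked with |
| // the outcome of each republish attempt. |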
| void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) { |
| auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId); |
| if (!messages) { |
| cb(false); |
| return; |
| } |
| |
| // Initialize deadLetterProducer_ |
| if (!deadLetterProducer_) { |
| std::lock_guard<std::mutex> createLock(createProducerLock_); |
| if (!deadLetterProducer_) { |
| deadLetterProducer_ = std::make_shared<Promise<Result, Producer>>(); |
| ProducerConfiguration producerConfiguration; |
| producerConfiguration.setSchema(config_.getSchema()); |
| producerConfiguration.setBlockIfQueueFull(false); |
| producerConfiguration.impl_->initialSubscriptionName = |
| deadLetterPolicy_.getInitialSubscriptionName(); |
| ClientImplPtr client = client_.lock(); |
| if (client) { |
| auto self = get_shared_this_ptr(); |
| client->createProducerAsync( |
| deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration, |
| [self](Result res, Producer producer) { |
| if (res == ResultOk) { |
| self->deadLetterProducer_->setValue(producer); |
| } else { |
| LOG_ERROR("Dead letter producer create exception with topic: " |
| << self->deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res); |
| self->deadLetterProducer_.reset(); |
| } |
| }); |
| } else { |
| LOG_WARN(getName() << "Client was already destroyed; cannot create the dead letter producer."); |
| deadLetterProducer_.reset(); |
| cb(false); |
| return; |
| } |
| } |
| } |
| |
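| // Republish every recorded message once the DLQ producer future completes. |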
| for (const auto& message : messages.value()) { |
| std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()}; |
| deadLetterProducer_->getFuture().addListener([weakSelf, message, messageId, cb](Result res, |
| Producer producer) { |
| auto self = weakSelf.lock(); |
| if (!self) { |
| return; |
| } |
| auto originMessageId = message.getMessageId(); |
| std::stringstream originMessageIdStr; |
| originMessageIdStr << originMessageId; |
| MessageBuilder msgBuilder; |
| msgBuilder.setAllocatedContent(const_cast<void*>(message.getData()), message.getLength()) |
| .setProperties(message.getProperties()) |
| .setProperty(PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr.str()) |
| .setProperty(SYSTEM_PROPERTY_REAL_TOPIC, message.getTopicName()); |
| if (message.hasPartitionKey()) { |
| msgBuilder.setPartitionKey(message.getPartitionKey()); |
| } |
| if (message.hasOrderingKey()) { |
| msgBuilder.setOrderingKey(message.getOrderingKey()); |
| } |
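| // Acknowledge the original message only after the copy has been accepted by the DLQ producer. |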
| producer.sendAsync(msgBuilder.build(), [weakSelf, originMessageId, messageId, cb]( |
| Result res, const MessageId& messageIdInDLQ) { |
| auto self = weakSelf.lock(); |
| if (!self) { |
| return; |
| } |
| if (res == ResultOk) { |
| if (self->state_ != Ready) { |
| LOG_WARN( |
| "Send to the DLQ successfully, but consumer is not ready. ignore acknowledge : " |
| << self->state_); |
| cb(false); |
| return; |
| } |
| self->possibleSendToDeadLetterTopicMessages_.remove(messageId); |
| self->acknowledgeAsync(originMessageId, [weakSelf, originMessageId, cb](Result result) { |
| auto self = weakSelf.lock(); |
| if (!self) { |
| return; |
| } |
| if (result != ResultOk) { |
| LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {" |
| << self->consumerName_ << "} Failed to acknowledge the message {" |
| << originMessageId |
| << "} of the original topic but send to the DLQ successfully : " |
| << result); |
| cb(false); |
| } else { |
| LOG_DEBUG("Send msg:" << originMessageId |
| << "to DLQ success and acknowledge success."); |
| cb(true); |
| } |
| }); |
| } else { |
| LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {" |
| << self->consumerName_ << "} Failed to send DLQ message to {" |
| << self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id " |
| << "{" << originMessageId << "} : " << res); |
| cb(false); |
| } |
| }); |
| }); |
| } |
| } |
| |
| } /* namespace pulsar */ |