/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <apache/rocketmq/v1/definition.pb.h>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "Assignment.h"
#include "ClientManager.h"
#include "FilterExpression.h"
#include "MixAll.h"
#include "ProcessQueue.h"
#include "ReceiveMessageCallback.h"
#include "TopicAssignmentInfo.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "apache/rocketmq/v1/service.pb.h"
#include "rocketmq/ConsumeType.h"
#include "rocketmq/MQMessageExt.h"
#include "rocketmq/MQMessageQueue.h"
#include "gtest/gtest_prod.h"
ROCKETMQ_NAMESPACE_BEGIN
struct OffsetRecord {
explicit OffsetRecord(int64_t offset) : offset_(offset), released_(false) {
}
OffsetRecord(int64_t offset, bool released) : offset_(offset), released_(released) {
}
int64_t offset_;
bool released_;
};

ROCKETMQ_NAMESPACE_END

namespace std {
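// std::less is specialized so that OffsetRecord can live in an ordered std::set keyed by offset alone; the released_
// flag deliberately does not take part in the ordering. The smallest element of that set is then, presumably, what
// committedOffset() reports as the next offset safe to commit.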
template <>
struct less<ROCKETMQ_NAMESPACE::OffsetRecord> {
bool operator()(const ROCKETMQ_NAMESPACE::OffsetRecord& lhs, const ROCKETMQ_NAMESPACE::OffsetRecord& rhs) const {
return lhs.offset_ < rhs.offset_;
}
};
} // namespace std

ROCKETMQ_NAMESPACE_BEGIN

class PushConsumer;

/**
* @brief Once messages are fetched (either pulled or popped) from the remote server, they are first put into the
* cache. The dispatcher thread, after waking up, submits them to the thread pool; messages at this stage are said to
* be "in flight". Once a message has been processed by the user-provided callback, its quota is released to make room
* for future incoming messages.
*/
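//
// Rough sketch of that flow (illustrative only; `process_queue`, `body_size`, `offset` and the batch size are
// placeholders, and the PushConsumer/dispatcher wiring that drives these calls lives outside this header):
//
//   process_queue->receiveMessage();             // async fetch; results land in cacheMessages()
//   std::vector<MQMessageExt> batch;
//   bool more = process_queue->take(32, batch);  // dispatcher drains the cache in batches
//   // ... submit `batch` to the consume thread pool; repeat while `more` is true ...
//   process_queue->release(body_size, offset);   // per message, once the user callback returns
//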
class ProcessQueueImpl : virtual public ProcessQueue {
public:
ProcessQueueImpl(MQMessageQueue message_queue, FilterExpression filter_expression,
std::weak_ptr<PushConsumer> consumer, std::shared_ptr<ClientManager> client_instance);
~ProcessQueueImpl() override;
void callback(std::shared_ptr<ReceiveMessageCallback> callback) override;
MQMessageQueue getMQMessageQueue() override;
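/**
* Whether this process queue has been idle for too long. Rebalance logic presumably uses this to discard queues that
* are no longer being served; the exact idle threshold is defined in the implementation, not in this header.
*/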
bool expired() const override;
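/**
* Whether fetching further messages should pause for now, typically because the quantity or total body size of the
* cached messages has reached the consumer's configured limits. The exact thresholds live in the implementation.
*/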
bool shouldThrottle() const override LOCKS_EXCLUDED(messages_mtx_);
const FilterExpression& getFilterExpression() const override;
std::weak_ptr<PushConsumer> getConsumer() override;
std::shared_ptr<ClientManager> getClientManager() override;
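/**
* Kick off an asynchronous fetch from the broker, presumably via popMessage() or pullMessage() below depending on the
* consume model; fetched messages are delivered back through the registered ReceiveMessageCallback.
*/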
void receiveMessage() override;
const std::string& simpleName() const override {
return simple_name_;
}
std::string topic() const override {
return message_queue_.getTopic();
}
bool hasPendingMessages() const override LOCKS_EXCLUDED(messages_mtx_);
/**
* Put messages fetched from the broker into the cache.
*
* @param messages Messages to cache.
*/
void cacheMessages(const std::vector<MQMessageExt>& messages) override LOCKS_EXCLUDED(messages_mtx_, offsets_mtx_);
/**
* @return Number of messages that have not yet been dispatched to the thread pool, typically because of topic rate
* limiting.
*/
uint32_t cachedMessagesSize() const LOCKS_EXCLUDED(messages_mtx_) {
absl::MutexLock lk(&messages_mtx_);
return cached_messages_.size();
}
/**
* Dispatch messages from the cache to the thread pool in the form of consume tasks.
* @param batch_size Maximum number of messages to take in this batch.
* @param messages Output vector that receives the messages taken from the cache.
* @return true if more messages remain in the cache to consume.
*/
bool take(uint32_t batch_size, std::vector<MQMessageExt>& messages) override LOCKS_EXCLUDED(messages_mtx_);
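/**
* Refresh the idle timestamp so that expired() does not treat this queue as abandoned.
*/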
void syncIdleState() override {
idle_since_ = std::chrono::steady_clock::now();
}
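/**
* Offset from which the next pull request should start; the setter asserts a non-negative value.
*/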
void nextOffset(int64_t next_offset) override {
assert(next_offset >= 0);
next_offset_ = next_offset;
}
int64_t nextOffset() const {
return next_offset_;
}
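/**
* Report, via the out-parameter, the offset that is currently safe to commit, presumably derived from the smallest
* not-yet-released record in offsets_. The boolean result indicates whether such an offset is available.
*/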
bool committedOffset(int64_t& offset) override LOCKS_EXCLUDED(offsets_mtx_);
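/**
* Called once a message has been processed: returns its share of the cached-quantity and cached-memory quota and marks
* its offset record as released so that the committed offset can advance.
*/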
void release(uint64_t body_size, int64_t offset) override LOCKS_EXCLUDED(messages_mtx_, offsets_mtx_);
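/**
* The two methods below guard FIFO (orderly) consumption with a compare-and-swap on has_fifo_task_bound_:
* bindFifoConsumeTask() flips false -> true, unbindFifoConsumeTask() flips true -> false, and each returns whether the
* transition actually happened, ensuring at most one FIFO consume task is bound to this queue at a time.
*/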
bool unbindFifoConsumeTask() override {
bool expected = true;
return has_fifo_task_bound_.compare_exchange_strong(expected, false, std::memory_order_relaxed);
}
bool bindFifoConsumeTask() override {
bool expected = false;
return has_fifo_task_bound_.compare_exchange_strong(expected, true, std::memory_order_relaxed);
}

private:
MQMessageQueue message_queue_;
/**
* Expression used to filter messages on the server side.
*/
const FilterExpression filter_expression_;
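/**
* How long a popped message stays invisible to other consumers before the broker may redeliver it; presumably taken
* from the consumer configuration.
*/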
std::chrono::milliseconds invisible_time_;
std::chrono::steady_clock::time_point idle_since_{std::chrono::steady_clock::now()};
absl::Time create_timestamp_{absl::Now()};
std::string simple_name_;
std::weak_ptr<PushConsumer> consumer_;
std::shared_ptr<ClientManager> client_manager_;
std::shared_ptr<ReceiveMessageCallback> receive_callback_;
/**
* Messages pending submission to the thread pool.
*/
mutable std::vector<MQMessageExt> cached_messages_ GUARDED_BY(messages_mtx_);
mutable absl::Mutex messages_mtx_;
/**
* @brief Number of messages currently cached.
*/
std::atomic<uint32_t> cached_message_quantity_;
/**
* @brief Total memory footprint, in bytes, of the cached message bodies.
*/
std::atomic<uint64_t> cached_message_memory_;
int64_t next_offset_{0};
/**
* If this process queue is used in a FIFO (orderly consumption) scenario, this flag marks whether a consume task is
* currently bound in the thread pool.
*/
std::atomic_bool has_fifo_task_bound_{false};
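/**
* Offset records of in-flight messages, ordered by offset through the std::less specialization above and consulted by
* committedOffset().
*/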
std::set<OffsetRecord> offsets_ GUARDED_BY(offsets_mtx_);
absl::Mutex offsets_mtx_;
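/**
* Internal fetch helpers: popMessage() and pullMessage() issue the actual receive requests, while the wrap* methods
* populate the corresponding protobuf request and filter structures.
*/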
void popMessage();
void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata,
rmq::ReceiveMessageRequest& request);
void pullMessage();
void wrapPullMessageRequest(rmq::PullMessageRequest& request);
void wrapFilterExpression(rmq::FilterExpression* filter_expression);
FRIEND_TEST(ProcessQueueTest, testExpired);
};
ROCKETMQ_NAMESPACE_END