blob: 43f82037453d5fd2d2ba7a8d709a5a118cb1ac3a [file] [log] [blame]
#include "PullConsumerImpl.h"
#include "ClientManagerFactory.h"
#include "InvocationContext.h"
#include "Signature.h"
#include "apache/rocketmq/v1/definition.pb.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/MQClientException.h"
#include "rocketmq/MessageModel.h"
#include "rocketmq/PullResult.h"
#include <exception>
#include <system_error>
ROCKETMQ_NAMESPACE_BEGIN
void PullConsumerImpl::start() {
ClientImpl::start();
if (State::STARTED != state_.load(std::memory_order_relaxed)) {
SPDLOG_WARN("Unexpected state: {}", state_.load(std::memory_order_relaxed));
return;
}
client_manager_->addClientObserver(shared_from_this());
}
void PullConsumerImpl::shutdown() {
// Shutdown services started by current tier
notifyClientTermination();
// Shutdown services that are started by the parent
ClientImpl::shutdown();
State expected = State::STOPPING;
if (state_.compare_exchange_strong(expected, State::STOPPED)) {
SPDLOG_INFO("DefaultMQPullConsumerImpl stopped");
}
}
std::future<std::vector<MQMessageQueue>> PullConsumerImpl::queuesFor(const std::string& topic) {
auto promise = std::make_shared<std::promise<std::vector<MQMessageQueue>>>();
{
absl::MutexLock lk(&topic_route_table_mtx_);
if (topic_route_table_.contains(topic)) {
TopicRouteDataPtr topic_route = topic_route_table_.at(topic);
auto partitions = topic_route->partitions();
std::vector<MQMessageQueue> message_queues;
message_queues.reserve(partitions.size());
for (const auto& partition : partitions) {
message_queues.emplace_back(partition.asMessageQueue());
}
promise->set_value(std::move(message_queues));
return promise->get_future();
}
}
auto callback = [promise](const std::error_code& ec, const TopicRouteDataPtr& route) {
if (ec) {
MQClientException e(ec.message(), ec.value(), __FILE__, __LINE__);
promise->set_exception(std::make_exception_ptr(e));
return;
}
std::vector<MQMessageQueue> message_queues;
for (const auto& partition : route->partitions()) {
message_queues.emplace_back(partition.asMessageQueue());
}
promise->set_value(message_queues);
};
getRouteFor(topic, callback);
return promise->get_future();
}
std::future<int64_t> PullConsumerImpl::queryOffset(const OffsetQuery& query) {
QueryOffsetRequest request;
switch (query.policy) {
case QueryOffsetPolicy::BEGINNING:
request.set_policy(rmq::QueryOffsetPolicy::BEGINNING);
break;
case QueryOffsetPolicy::END:
request.set_policy(rmq::QueryOffsetPolicy::END);
break;
case QueryOffsetPolicy::TIME_POINT:
request.set_policy(rmq::QueryOffsetPolicy::TIME_POINT);
auto duration = absl::FromChrono(query.time_point.time_since_epoch());
int64_t seconds = absl::ToInt64Seconds(duration);
request.mutable_time_point()->set_seconds(seconds);
request.mutable_time_point()->set_nanos(absl::ToInt64Nanoseconds(duration - absl::Seconds(seconds)));
break;
}
request.mutable_partition()->mutable_topic()->set_name(query.message_queue.getTopic());
request.mutable_partition()->mutable_topic()->set_resource_namespace(resource_namespace_);
request.mutable_partition()->set_id(query.message_queue.getQueueId());
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(this, metadata);
// TODO: Use std::unique_ptr if C++14 is adopted.
auto promise_ptr = std::make_shared<std::promise<int64_t>>();
auto callback = [promise_ptr](const std::error_code& ec, const QueryOffsetResponse& response) {
if (ec) {
MQClientException e(ec.message(), ec.value(), __FILE__, __LINE__);
promise_ptr->set_exception(std::make_exception_ptr(e));
return;
}
promise_ptr->set_value(response.offset());
};
client_manager_->queryOffset(query.message_queue.serviceAddress(), metadata, request,
absl::ToChronoMilliseconds(io_timeout_), callback);
return promise_ptr->get_future();
}
void PullConsumerImpl::pull(const PullMessageQuery& query, PullCallback* cb) {
PullMessageRequest request;
request.set_offset(query.offset);
auto duration = absl::FromChrono(query.await_time);
int64_t seconds = absl::ToInt64Seconds(duration);
request.mutable_await_time()->set_seconds(seconds);
request.mutable_await_time()->set_nanos(absl::ToInt64Nanoseconds(duration - absl::Seconds(seconds)));
request.mutable_group()->set_name(group_name_);
request.mutable_group()->set_resource_namespace(resource_namespace_);
request.mutable_partition()->mutable_topic()->set_name(query.message_queue.getTopic());
request.mutable_partition()->mutable_topic()->set_resource_namespace(resource_namespace_);
request.mutable_partition()->set_id(query.message_queue.getQueueId());
request.set_client_id(clientId());
std::string target_host = query.message_queue.serviceAddress();
assert(!target_host.empty());
auto callback = [target_host, cb](const std::error_code& ec, const ReceiveMessageResult& result) {
if (ec) {
cb->onFailure(ec);
return;
}
PullResult pull_result(result.min_offset, result.max_offset, result.next_offset, result.messages);
cb->onSuccess(pull_result);
};
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(this, metadata);
client_manager_->pullMessage(target_host, metadata, request, absl::ToChronoMilliseconds(long_polling_timeout_),
callback);
}
void PullConsumerImpl::prepareHeartbeatData(HeartbeatRequest& request) {
request.set_client_id(clientId());
auto consumer_data = request.mutable_consumer_data();
consumer_data->mutable_group()->set_resource_namespace(resource_namespace_);
consumer_data->mutable_group()->set_name(group_name_);
switch (message_model_) {
case MessageModel::BROADCASTING:
consumer_data->set_consume_model(rmq::ConsumeModel::BROADCASTING);
break;
case MessageModel::CLUSTERING:
consumer_data->set_consume_model(rmq::ConsumeModel::CLUSTERING);
break;
}
}
void PullConsumerImpl::notifyClientTermination() {
NotifyClientTerminationRequest request;
request.mutable_consumer_group()->set_resource_namespace(resource_namespace_);
request.mutable_consumer_group()->set_name(group_name_);
request.set_client_id(clientId());
ClientImpl::notifyClientTermination(request);
}
ROCKETMQ_NAMESPACE_END