blob: 34975e71a6ac5afeac5626e079af2d6ba7f35550 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ProducerImpl.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <system_error>
#include <utility>
#include "apache/rocketmq/v2/definition.pb.h"
#include "MixAll.h"
#include "Protocol.h"
#include "PublishInfoCallback.h"
#include "SendContext.h"
#include "Signature.h"
#include "TracingUtility.h"
#include "TransactionImpl.h"
#include "UtilAll.h"
#include "opencensus/trace/propagation/trace_context.h"
#include "opencensus/trace/span.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/Message.h"
#include "rocketmq/SendReceipt.h"
#include "rocketmq/Transaction.h"
#include "rocketmq/TransactionChecker.h"
ROCKETMQ_NAMESPACE_BEGIN
ProducerImpl::ProducerImpl() : ClientImpl(""), compress_body_threshold_(MixAll::DEFAULT_COMPRESS_BODY_THRESHOLD_) {
// TODO: initialize client_config_ and fault_strategy_
}
ProducerImpl::~ProducerImpl() {
SPDLOG_INFO("Producer instance is destructed");
shutdown();
}
void ProducerImpl::start() {
ClientImpl::start();
State expecting = State::STARTING;
if (!state_.compare_exchange_strong(expecting, State::STARTED)) {
SPDLOG_ERROR("Start with unexpected state. Expecting: {}, Actual: {}", State::STARTING,
state_.load(std::memory_order_relaxed));
return;
}
client_manager_->addClientObserver(shared_from_this());
}
void ProducerImpl::shutdown() {
State expected = State::STARTED;
if (!state_.compare_exchange_strong(expected, State::STOPPING)) {
SPDLOG_ERROR("Shutdown with unexpected state. Expecting: {}, Actual: {}", State::STOPPING,
state_.load(std::memory_order_relaxed));
return;
}
notifyClientTermination();
ClientImpl::shutdown();
assert(State::STOPPED == state_.load());
SPDLOG_INFO("Producer instance stopped");
}
void ProducerImpl::notifyClientTermination() {
}
bool ProducerImpl::isRunning() const {
return State::STARTED == state_.load(std::memory_order_relaxed);
}
void ProducerImpl::ensureRunning(std::error_code& ec) const noexcept {
if (!isRunning()) {
ec = ErrorCode::IllegalState;
}
}
void ProducerImpl::validate(const Message& message, std::error_code& ec) {
MixAll::validate(message, ec);
if (!ec) {
if (message.body().length() > client_config_.publisher.max_body_size) {
SPDLOG_WARN("Body of the message to send is too large");
ec = ErrorCode::PayloadTooLarge;
}
}
}
void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageRequest& request,
const rmq::MessageQueue& message_queue) {
auto msg = new rmq::Message();
msg->mutable_topic()->set_resource_namespace(resourceNamespace());
msg->mutable_topic()->set_name(message.topic());
auto system_properties = msg->mutable_system_properties();
// Handle Tag
if (!message.tag().empty()) {
system_properties->set_tag(message.tag());
}
// Handle Key
const auto& keys = message.keys();
if (!keys.empty()) {
system_properties->mutable_keys()->Add(keys.begin(), keys.end());
}
// TraceContext
if (!message.traceContext().empty()) {
const auto& trace_context = message.traceContext();
if (!trace_context.empty()) {
system_properties->set_trace_context(trace_context);
}
}
// Delivery Timestamp
if (message.deliveryTimestamp().time_since_epoch().count()) {
auto delivery_timestamp = message.deliveryTimestamp();
if (delivery_timestamp.time_since_epoch().count()) {
auto duration = delivery_timestamp.time_since_epoch();
system_properties->set_delivery_attempt(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
auto mutable_delivery_timestamp = system_properties->mutable_delivery_timestamp();
mutable_delivery_timestamp->set_seconds(std::chrono::duration_cast<std::chrono::seconds>(duration).count());
mutable_delivery_timestamp->set_nanos(std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count() % 1000000000);
}
}
// Born-time
auto duration = absl::Now() - absl::UnixEpoch();
int64_t seconds = absl::ToInt64Seconds(duration);
system_properties->mutable_born_timestamp()->set_seconds(seconds);
system_properties->mutable_born_timestamp()->set_nanos(absl::ToInt64Nanoseconds(duration - absl::Seconds(seconds)));
system_properties->set_born_host(UtilAll::hostname());
if (message.deliveryTimestamp().time_since_epoch().count()) {
system_properties->set_message_type(rmq::MessageType::DELAY);
} else if (!message.group().empty()) {
system_properties->set_message_type(rmq::MessageType::FIFO);
} else if (message.extension().transactional) {
system_properties->set_message_type(rmq::MessageType::TRANSACTION);
} else {
system_properties->set_message_type(rmq::MessageType::NORMAL);
}
if (message.body().size() >= compress_body_threshold_) {
std::string compressed_body;
UtilAll::compress(message.body(), compressed_body);
msg->set_body(compressed_body);
system_properties->set_body_encoding(rmq::Encoding::GZIP);
} else {
msg->set_body(message.body());
system_properties->set_body_encoding(rmq::Encoding::IDENTITY);
}
if (!message.group().empty()) {
system_properties->set_message_group(message.group());
}
system_properties->set_message_id(message.id());
system_properties->set_queue_id(message_queue.id());
// Forward user-defined-properties
for (auto& item : message.properties()) {
msg->mutable_user_properties()->insert({item.first, item.second});
}
// Add into request
request.mutable_messages()->AddAllocated(msg);
SPDLOG_TRACE("SendMessageRequest: {}", request.DebugString());
}
SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noexcept {
ensureRunning(ec);
if (ec) {
SPDLOG_WARN("Producer is not running");
SendReceipt send_receipt{};
send_receipt.message = std::move(message);
return send_receipt;
}
auto topic_publish_info = getPublishInfo(message->topic());
if (!topic_publish_info) {
SPDLOG_WARN("Route of topic[{}] is not found", message->topic());
ec = ErrorCode::NotFound;
SendReceipt send_receipt{};
send_receipt.message = std::move(message);
return send_receipt;
}
std::vector<rmq::MessageQueue> message_queue_list;
if (!topic_publish_info->selectMessageQueues(absl::make_optional<std::string>(), message_queue_list)) {
SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]", message->topic());
ec = ErrorCode::NotFound;
SendReceipt send_receipt{};
send_receipt.message = std::move(message);
return send_receipt;
}
auto mtx = std::make_shared<absl::Mutex>();
auto cv = std::make_shared<absl::CondVar>();
bool completed = false;
SendReceipt send_receipt;
// Define callback
auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) mutable {
ec = code;
SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
send_receipt.message = std::move(receipt_mut.message);
{
absl::MutexLock lk(mtx.get());
completed = true;
}
cv->Signal();
};
send0(std::move(message), callback, message_queue_list);
{
absl::MutexLock lk(mtx.get());
if (!completed) {
cv->Wait(mtx.get());
}
}
return send_receipt;
}
void ProducerImpl::send(MessageConstPtr message, SendCallback cb) {
std::error_code ec;
ensureRunning(ec);
if (ec) {
SendReceipt send_receipt;
send_receipt.message = std::move(message);
cb(ec, send_receipt);
}
std::string topic = message->topic();
std::weak_ptr<ProducerImpl> client(shared_from_this());
// Walk-around: Use capture by move when C++14 is possible
const Message* msg = message.release();
auto callback = [client, cb, msg](const std::error_code& ec, const TopicPublishInfoPtr& publish_info) {
MessageConstPtr ptr(msg);
// No route entries of the given topic is available
if (ec) {
SendReceipt send_receipt;
send_receipt.message = std::move(ptr);
cb(ec, send_receipt);
return;
}
if (!publish_info) {
std::error_code ec = ErrorCode::NotFound;
SendReceipt send_receipt;
send_receipt.message = std::move(ptr);
cb(ec, send_receipt);
return;
}
auto publisher = client.lock();
std::vector<rmq::MessageQueue> message_queue_list;
if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) {
std::error_code ec = ErrorCode::NotFound;
SendReceipt send_receipt;
send_receipt.message = std::move(ptr);
cb(ec, send_receipt);
return;
}
publisher->send0(std::move(ptr), cb, message_queue_list);
};
getPublishInfoAsync(topic, callback);
}
void ProducerImpl::setTransactionChecker(TransactionChecker checker) {
transaction_checker_ = std::move(checker);
}
void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) {
const std::string& target = urlOf(context->messageQueue());
if (target.empty()) {
SPDLOG_WARN("Failed to resolve broker address from MessageQueue");
std::error_code ec = ErrorCode::BadGateway;
context->onFailure(ec);
return;
}
{
// Trace Send RPC
if (!context->message_->traceContext().empty() && client_config_.sampler_) {
auto span_context = opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext());
auto span = opencensus::trace::Span::BlankSpan();
std::string span_name = resourceNamespace() + "/" + context->message_->topic() + " " +
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION;
if (span_context.IsValid()) {
span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context,
{client_config_.sampler_.get()});
} else {
span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()});
}
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION);
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION);
TracingUtility::addUniversalSpanAttributes(*context->message_, config(), span);
// Note: attempt-time is 0-based
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + context->attempt_times_);
if (context->message_->deliveryTimestamp().time_since_epoch().count()) {
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP,
absl::FormatTime(absl::FromChrono(context->message_->deliveryTimestamp())));
}
auto ptr = const_cast<Message*>(context->message_.get());
ptr->traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context()));
context->span_ = span;
}
}
SendMessageRequest request;
wrapSendMessageRequest(*context->message_, request, context->messageQueue());
Metadata metadata;
Signature::sign(client_config_, metadata);
auto callback = [context](const SendResult& send_result) {
if (send_result.ec) {
context->onFailure(send_result.ec);
return;
}
context->onSuccess(send_result);
};
client_manager_->send(target, metadata, request, callback);
}
void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::vector<rmq::MessageQueue> list) {
SendReceipt send_receipt;
std::error_code ec;
validate(*message, ec);
if (ec) {
send_receipt.message = std::move(message);
callback(ec, send_receipt);
return;
}
if (list.empty()) {
ec = ErrorCode::NotFound;
send_receipt.message = std::move(message);
callback(ec, send_receipt);
return;
}
auto context = std::make_shared<SendContext>(shared_from_this(), std::move(message), callback, std::move(list));
sendImpl(context);
}
bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, TransactionState resolution) {
EndTransactionRequest request;
const std::string& topic = transaction.topic;
request.mutable_topic()->set_name(topic);
request.mutable_topic()->set_resource_namespace(resourceNamespace());
request.set_message_id(transaction.message_id);
request.set_transaction_id(transaction.transaction_id);
std::string action;
switch (resolution) {
case TransactionState::COMMIT:
request.set_resolution(rmq::COMMIT);
action = "commit";
break;
case TransactionState::ROLLBACK:
request.set_resolution(rmq::ROLLBACK);
action = "rollback";
break;
}
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(config(), metadata);
bool completed = false;
bool success = false;
auto span = opencensus::trace::Span::BlankSpan();
if (!transaction.trace_context.empty() && client_config_.sampler_) {
// Trace transactional message
opencensus::trace::SpanContext span_context =
opencensus::trace::propagation::FromTraceParentHeader(transaction.trace_context);
std::string trace_operation_name = TransactionState::COMMIT == resolution
? MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION
: MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
std::string span_name = resourceNamespace() + "/" + transaction.topic + " " + trace_operation_name;
if (span_context.IsValid()) {
span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()});
} else {
span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()});
}
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name);
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name);
// TracingUtility::addUniversalSpanAttributes(message, config(), span);
}
auto mtx = std::make_shared<absl::Mutex>();
auto cv = std::make_shared<absl::CondVar>();
const auto& endpoint = transaction.target;
std::weak_ptr<ProducerImpl> publisher(shared_from_this());
auto cb = [&, span, endpoint, mtx, cv, topic](const std::error_code& ec, const EndTransactionResponse& response) {
if (ec) {
{
span.SetStatus(opencensus::trace::StatusCode::ABORTED);
span.AddAnnotation(ec.message());
span.End();
}
SPDLOG_WARN("Failed to send {} transaction request to {}. Cause: ", action, endpoint, ec.message());
success = false;
} else {
{
span.SetStatus(opencensus::trace::StatusCode::OK);
span.End();
}
success = true;
}
{
absl::MutexLock lk(mtx.get());
completed = true;
cv->SignalAll();
}
};
client_manager_->endTransaction(transaction.target, metadata, request, absl::ToChronoMilliseconds(requestTimeout()),
cb);
{
absl::MutexLock lk(mtx.get());
cv->Wait(mtx.get());
}
return success;
}
void ProducerImpl::isolatedEndpoints(absl::flat_hash_set<std::string>& endpoints) {
absl::MutexLock lk(&isolated_endpoints_mtx_);
endpoints.insert(isolated_endpoints_.begin(), isolated_endpoints_.end());
}
bool ProducerImpl::isEndpointIsolated(const std::string& target) {
absl::MutexLock lk(&isolated_endpoints_mtx_);
return isolated_endpoints_.contains(target);
}
void ProducerImpl::isolateEndpoint(const std::string& target) {
absl::MutexLock lk(&isolated_endpoints_mtx_);
isolated_endpoints_.insert(target);
}
void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
MiniTransaction mini = {};
mini.topic = message->topic();
mini.trace_context = message->traceContext();
if (!message->group().empty()) {
ec = ErrorCode::MessagePropertyConflictWithType;
SPDLOG_WARN("FIFO message may not be transactional");
return;
}
if (message->deliveryTimestamp().time_since_epoch().count()) {
ec = ErrorCode::MessagePropertyConflictWithType;
SPDLOG_WARN("Timed message may not be transactional");
return;
}
Message* msg = const_cast<Message*>(message.get());
msg->mutableExtension().transactional = true;
SendReceipt send_receipt = send(std::move(message), ec);
mini.message_id = send_receipt.message_id;
mini.transaction_id = send_receipt.transaction_id;
mini.target = send_receipt.target;
auto& impl = dynamic_cast<TransactionImpl&>(transaction);
impl.appendMiniTransaction(mini);
}
void ProducerImpl::getPublishInfoAsync(const std::string& topic, const PublishInfoCallback& cb) {
TopicPublishInfoPtr ptr;
{
absl::MutexLock lock(&topic_publish_info_mtx_);
if (topic_publish_info_table_.contains(topic)) {
ptr = topic_publish_info_table_.at(topic);
}
}
std::error_code ec;
if (ptr) {
// Serve with the cached one
cb(ec, ptr);
return;
}
std::weak_ptr<ProducerImpl> producer(shared_from_this());
auto callback = [topic, cb, producer](const std::error_code& ec, const TopicRouteDataPtr& route) {
if (ec) {
cb(ec, nullptr);
return;
}
auto publish_info = std::make_shared<TopicPublishInfo>(producer, topic, route);
auto impl = producer.lock();
if (impl) {
impl->cachePublishInfo(topic, publish_info);
}
cb(ec, publish_info);
};
getRouteFor(topic, callback);
}
void ProducerImpl::cachePublishInfo(const std::string& topic, TopicPublishInfoPtr info) {
absl::MutexLock lk(&topic_publish_info_mtx_);
topic_publish_info_table_.insert_or_assign(topic, info);
}
TopicPublishInfoPtr ProducerImpl::getPublishInfo(const std::string& topic) {
bool complete = false;
auto mtx = std::make_shared<absl::Mutex>();
auto cv = std::make_shared<absl::CondVar>();
TopicPublishInfoPtr topic_publish_info;
std::error_code error_code;
auto cb = [&, mtx, cv](const std::error_code& ec, const TopicPublishInfoPtr& ptr) {
absl::MutexLock lk(mtx.get());
topic_publish_info = ptr;
error_code = ec;
complete = true;
cv->SignalAll();
};
getPublishInfoAsync(topic, cb);
// Wait till acquiring topic publish info completes
while (!complete) {
absl::MutexLock lk(mtx.get());
cv->Wait(mtx.get());
}
// TODO: propagate error_code to caller
return topic_publish_info;
}
void ProducerImpl::prepareHeartbeatData(HeartbeatRequest& request) {
request.set_client_type(rmq::ClientType::PRODUCER);
}
void ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message) {
if (transaction_checker_) {
std::weak_ptr<ProducerImpl> producer(shared_from_this());
MiniTransaction transaction = {};
transaction.topic = message->topic();
transaction.message_id = message->id();
transaction.transaction_id = message->extension().transaction_id;
transaction.trace_context = message->traceContext();
transaction.target = message->extension().target_endpoint;
TransactionState state = transaction_checker_(*message);
endTransaction0(transaction, state);
} else {
SPDLOG_WARN("LocalTransactionStateChecker is unexpectedly nullptr");
}
}
void ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) {
absl::MutexLock lk(&topics_mtx_);
for (auto& topic : topics_) {
if (std::find(topics.begin(), topics.end(), topic) == topics.end()) {
topics.push_back(topic);
}
}
}
void ProducerImpl::withTopics(const std::vector<std::string> &topics) {
absl::MutexLock lk(&topics_mtx_);
for (auto &topic: topics) {
if (std::find(topics_.begin(), topics_.end(), topic) == topics_.end()) {
topics_.push_back(topic);
}
}
}
void ProducerImpl::buildClientSettings(rmq::Settings& settings) {
settings.set_client_type(rmq::ClientType::PRODUCER);
settings.mutable_publishing()->set_max_body_size(MixAll::MAX_MESSAGE_BODY_SIZE);
settings.mutable_publishing()->set_validate_message_type(false);
auto topics = settings.mutable_publishing()->mutable_topics();
{
absl::MutexLock lk(&topics_mtx_);
for (const auto& item : topics_) {
auto topic = new rmq::Resource;
topic->set_resource_namespace(resourceNamespace());
topic->set_name(item);
topics->AddAllocated(topic);
}
}
}
ROCKETMQ_NAMESPACE_END