Implement trace for transaction message (#385)
diff --git a/example/rocketmq/ExampleTransactionProducer.cpp b/example/rocketmq/ExampleTransactionProducer.cpp
index 1fe5b87..1041b92 100644
--- a/example/rocketmq/ExampleTransactionProducer.cpp
+++ b/example/rocketmq/ExampleTransactionProducer.cpp
@@ -44,6 +44,8 @@
transaction->commit();
+ std::this_thread::sleep_for(std::chrono::minutes(30));
+
producer.shutdown();
return EXIT_SUCCESS;
diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index e919dac..0bf273a 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -75,8 +75,6 @@
const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME = "service.name";
const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME = "rocketmq-client";
-const char* MixAll::SPAN_NAME_END_TRANSACTION = "EndTransaction";
-
// Span attributes follows to the opentelemetry specification, refers to:
// https://github.com/open-telemetry/opentelemetry-specification
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 5a45cd1..ae272dc 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -70,8 +70,6 @@
static const char* TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME;
static const char* TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME;
- static const char* SPAN_NAME_END_TRANSACTION;
-
// RocketMQ span attribute name list
static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION;
static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE;
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index ea54b75..38bc99e 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -373,6 +373,8 @@
auto retry_callback =
new RetrySendCallback(shared_from_this(), message, max_attempt_times, callback, std::move(list));
sendImpl(retry_callback);
+ const_cast<MQMessage&>(message).traceContext(
+ opencensus::trace::propagation::ToTraceParentHeader(retry_callback->span().context()));
}
bool ProducerImpl::endTransaction0(const std::string& target, const MQMessage& message,
@@ -403,25 +405,18 @@
opencensus::trace::SpanContext span_context =
opencensus::trace::propagation::FromTraceParentHeader(message.traceContext());
auto span = opencensus::trace::Span::BlankSpan();
+ std::string trace_operation_name = TransactionState::COMMIT == resolution
+ ? MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION
+ : MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
+ std::string span_name = resourceNamespace() + "/" + message.getTopic() + " " + trace_operation_name;
if (span_context.IsValid()) {
- span = opencensus::trace::Span::StartSpanWithRemoteParent(MixAll::SPAN_NAME_END_TRANSACTION, span_context,
- {&Samplers::always()});
+ span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {&Samplers::always()});
} else {
- span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_END_TRANSACTION, nullptr, {&Samplers::always()});
+ span = opencensus::trace::Span::StartSpan(span_name, nullptr, {&Samplers::always()});
}
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
- credentialsProvider()->getCredentials().accessKey());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
- switch (resolution) {
- case TransactionState::COMMIT:
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "commit");
- break;
- case TransactionState::ROLLBACK:
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "rollback");
- break;
- }
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name);
+ TracingUtility::addUniversalSpanAttributes(message, *this, span);
absl::Mutex mtx;
absl::CondVar cv;