Implement trace for transaction message (#385)

diff --git a/example/rocketmq/ExampleTransactionProducer.cpp b/example/rocketmq/ExampleTransactionProducer.cpp
index 1fe5b87..1041b92 100644
--- a/example/rocketmq/ExampleTransactionProducer.cpp
+++ b/example/rocketmq/ExampleTransactionProducer.cpp
@@ -44,6 +44,8 @@
 
   transaction->commit();
 
+  std::this_thread::sleep_for(std::chrono::minutes(30));
+
   producer.shutdown();
 
   return EXIT_SUCCESS;
diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index e919dac..0bf273a 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -75,8 +75,6 @@
 const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME = "service.name";
 const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME = "rocketmq-client";
 
-const char* MixAll::SPAN_NAME_END_TRANSACTION = "EndTransaction";
-
 // Span attributes follows to the opentelemetry specification, refers to:
 // https://github.com/open-telemetry/opentelemetry-specification
 
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 5a45cd1..ae272dc 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -70,8 +70,6 @@
   static const char* TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME;
   static const char* TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME;
 
-  static const char* SPAN_NAME_END_TRANSACTION;
-
   // RocketMQ span attribute name list
   static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION;
   static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE;
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index ea54b75..38bc99e 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -373,6 +373,8 @@
   auto retry_callback =
       new RetrySendCallback(shared_from_this(), message, max_attempt_times, callback, std::move(list));
   sendImpl(retry_callback);
+  const_cast<MQMessage&>(message).traceContext(
+      opencensus::trace::propagation::ToTraceParentHeader(retry_callback->span().context()));
 }
 
 bool ProducerImpl::endTransaction0(const std::string& target, const MQMessage& message,
@@ -403,25 +405,18 @@
   opencensus::trace::SpanContext span_context =
       opencensus::trace::propagation::FromTraceParentHeader(message.traceContext());
   auto span = opencensus::trace::Span::BlankSpan();
+  std::string trace_operation_name = TransactionState::COMMIT == resolution
+                                         ? MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION
+                                         : MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
+  std::string span_name = resourceNamespace() + "/" + message.getTopic() + " " + trace_operation_name;
   if (span_context.IsValid()) {
-    span = opencensus::trace::Span::StartSpanWithRemoteParent(MixAll::SPAN_NAME_END_TRANSACTION, span_context,
-                                                              {&Samplers::always()});
+    span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {&Samplers::always()});
   } else {
-    span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_END_TRANSACTION, nullptr, {&Samplers::always()});
+    span = opencensus::trace::Span::StartSpan(span_name, nullptr, {&Samplers::always()});
   }
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
-                    credentialsProvider()->getCredentials().accessKey());
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
-  switch (resolution) {
-    case TransactionState::COMMIT:
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "commit");
-      break;
-    case TransactionState::ROLLBACK:
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "rollback");
-      break;
-  }
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name);
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name);
+  TracingUtility::addUniversalSpanAttributes(message, *this, span);
 
   absl::Mutex mtx;
   absl::CondVar cv;