[ISSUE #617] retrieve transactionid from property first (#620)
diff --git a/primitive/message.go b/primitive/message.go
index fd7e9c6..b330dc1 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -59,6 +59,7 @@
PropertyTranscationCheckTimes = "TRANSACTION_CHECK_TIMES"
PropertyCheckImmunityTimeInSeconds = "CHECK_IMMUNITY_TIME_IN_SECONDS"
PropertyShardingKey = "SHARDING_KEY"
+ PropertyTransactionID = "__transactionId__"
)
type Message struct {
diff --git a/producer/producer.go b/producer/producer.go
index 65e39c2..910bb23 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -462,13 +462,20 @@
if uniqueKey == "" {
uniqueKey = callback.Msg.MsgId
}
+ transactionId := callback.Msg.GetProperty(primitive.PropertyTransactionID)
+ if transactionId == "" {
+ transactionId = callback.Header.TransactionId
+ }
+ if transactionId == "" {
+ transactionId = callback.Msg.TransactionId
+ }
header := &internal.EndTransactionRequestHeader{
CommitLogOffset: callback.Header.CommitLogOffset,
ProducerGroup: tp.producer.group,
TranStateTableOffset: callback.Header.TranStateTableOffset,
FromTransactionCheck: true,
MsgID: uniqueKey,
- TransactionId: callback.Header.TransactionId,
+ TransactionId: transactionId,
CommitOrRollback: tp.transactionState(localTransactionState),
}