[ISSUE #688] Add information about transaction Source on endTransaction Request (#689)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 450a68d..a17d195 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.client.java.impl.producer;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.EndTransactionRequest;
@@ -29,6 +27,8 @@
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.Status;
+import apache.rocketmq.v2.TransactionSource;
+import com.google.common.base.Preconditions;
import com.google.common.math.IntMath;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -163,7 +163,7 @@
}
final GeneralMessage generalMessage = new GeneralMessageImpl(messageView);
endTransaction(endpoints, generalMessage, messageView.getMessageId(),
- transactionId, resolution);
+ transactionId, resolution, TransactionSource.SOURCE_SERVER_CHECK);
} catch (Throwable t) {
log.error("Exception raised while ending the transaction, messageId={}, transactionId={}, "
+ "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
@@ -241,7 +241,7 @@
*/
@Override
public Transaction beginTransaction() {
- checkNotNull(checker, "Transaction checker should not be null");
+ Preconditions.checkNotNull(checker, "Transaction checker should not be null");
if (!this.isRunning()) {
log.error("Unable to begin a transaction because producer is not running, state={}, clientId={}",
this.state(), clientId);
@@ -256,9 +256,11 @@
}
public void endTransaction(Endpoints endpoints, GeneralMessage generalMessage, MessageId messageId,
- String transactionId, final TransactionResolution resolution) throws ClientException {
+ String transactionId, final TransactionResolution resolution, final TransactionSource transactionSource)
+ throws ClientException {
final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder()
.setMessageId(messageId.toString()).setTransactionId(transactionId)
+ .setSource(transactionSource)
.setTopic(apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(generalMessage.getTopic())
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
index 51ca92c..9bc97c2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.java.impl.producer;
+import apache.rocketmq.v2.TransactionSource;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.IOException;
import java.util.HashSet;
@@ -94,8 +95,14 @@
for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
final PublishingMessageImpl publishingMessage = entry.getKey();
final SendReceiptImpl sendReceipt = entry.getValue();
- producerImpl.endTransaction(sendReceipt.getEndpoints(), new GeneralMessageImpl(publishingMessage),
- sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT);
+ producerImpl.endTransaction(
+ sendReceipt.getEndpoints(),
+ new GeneralMessageImpl(publishingMessage),
+ sendReceipt.getMessageId(),
+ sendReceipt.getTransactionId(),
+ TransactionResolution.COMMIT,
+ TransactionSource.SOURCE_CLIENT
+ );
}
}
@@ -107,8 +114,14 @@
for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
final PublishingMessageImpl publishingMessage = entry.getKey();
final SendReceiptImpl sendReceipt = entry.getValue();
- producerImpl.endTransaction(sendReceipt.getEndpoints(), new GeneralMessageImpl(publishingMessage),
- sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK);
+ producerImpl.endTransaction(
+ sendReceipt.getEndpoints(),
+ new GeneralMessageImpl(publishingMessage),
+ sendReceipt.getMessageId(),
+ sendReceipt.getTransactionId(),
+ TransactionResolution.ROLLBACK,
+ TransactionSource.SOURCE_CLIENT
+ );
}
}
}
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index 6cca321..68d0507 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -20,6 +20,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import apache.rocketmq.v2.TransactionSource;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
@@ -115,7 +116,7 @@
final SendReceiptImpl sendReceipt = fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
transaction.tryAddReceipt(publishingMessage, sendReceipt);
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), any(GeneralMessage.class),
- any(MessageId.class), anyString(), any(TransactionResolution.class));
+ any(MessageId.class), anyString(), any(TransactionResolution.class), any(TransactionSource.class));
transaction.commit();
}
@@ -130,7 +131,7 @@
final SendReceiptImpl sendReceipt = fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
transaction.tryAddReceipt(publishingMessage, sendReceipt);
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), any(GeneralMessage.class),
- any(MessageId.class), anyString(), any(TransactionResolution.class));
+ any(MessageId.class), anyString(), any(TransactionResolution.class), any(TransactionSource.class));
transaction.rollback();
}
}
\ No newline at end of file