[ISSUE #688] Add information about transaction Source on endTransaction Request (#689)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 450a68d..a17d195 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import apache.rocketmq.v2.ClientType;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.EndTransactionRequest;
@@ -29,6 +27,8 @@
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.Status;
+import apache.rocketmq.v2.TransactionSource;
+import com.google.common.base.Preconditions;
 import com.google.common.math.IntMath;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -163,7 +163,7 @@
                     }
                     final GeneralMessage generalMessage = new GeneralMessageImpl(messageView);
                     endTransaction(endpoints, generalMessage, messageView.getMessageId(),
-                        transactionId, resolution);
+                        transactionId, resolution, TransactionSource.SOURCE_SERVER_CHECK);
                 } catch (Throwable t) {
                     log.error("Exception raised while ending the transaction, messageId={}, transactionId={}, "
                         + "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
@@ -241,7 +241,7 @@
      */
     @Override
     public Transaction beginTransaction() {
-        checkNotNull(checker, "Transaction checker should not be null");
+        Preconditions.checkNotNull(checker, "Transaction checker should not be null");
         if (!this.isRunning()) {
             log.error("Unable to begin a transaction because producer is not running, state={}, clientId={}",
                 this.state(), clientId);
@@ -256,9 +256,11 @@
     }
 
     public void endTransaction(Endpoints endpoints, GeneralMessage generalMessage, MessageId messageId,
-        String transactionId, final TransactionResolution resolution) throws ClientException {
+        String transactionId, final TransactionResolution resolution, final TransactionSource transactionSource)
+        throws ClientException {
         final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder()
             .setMessageId(messageId.toString()).setTransactionId(transactionId)
+            .setSource(transactionSource)
             .setTopic(apache.rocketmq.v2.Resource.newBuilder()
                 .setResourceNamespace(clientConfiguration.getNamespace())
                 .setName(generalMessage.getTopic())
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
index 51ca92c..9bc97c2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import apache.rocketmq.v2.TransactionSource;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.HashSet;
@@ -94,8 +95,14 @@
         for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
             final PublishingMessageImpl publishingMessage = entry.getKey();
             final SendReceiptImpl sendReceipt = entry.getValue();
-            producerImpl.endTransaction(sendReceipt.getEndpoints(), new GeneralMessageImpl(publishingMessage),
-                sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT);
+            producerImpl.endTransaction(
+                sendReceipt.getEndpoints(),
+                new GeneralMessageImpl(publishingMessage),
+                sendReceipt.getMessageId(),
+                sendReceipt.getTransactionId(),
+                TransactionResolution.COMMIT,
+                TransactionSource.SOURCE_CLIENT
+            );
         }
     }
 
@@ -107,8 +114,14 @@
         for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
             final PublishingMessageImpl publishingMessage = entry.getKey();
             final SendReceiptImpl sendReceipt = entry.getValue();
-            producerImpl.endTransaction(sendReceipt.getEndpoints(), new GeneralMessageImpl(publishingMessage),
-                sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK);
+            producerImpl.endTransaction(
+                sendReceipt.getEndpoints(),
+                new GeneralMessageImpl(publishingMessage),
+                sendReceipt.getMessageId(),
+                sendReceipt.getTransactionId(),
+                TransactionResolution.ROLLBACK,
+                TransactionSource.SOURCE_CLIENT
+            );
         }
     }
 }
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index 6cca321..68d0507 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -20,6 +20,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 
+import apache.rocketmq.v2.TransactionSource;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
@@ -115,7 +116,7 @@
         final SendReceiptImpl sendReceipt = fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
         transaction.tryAddReceipt(publishingMessage, sendReceipt);
         Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), any(GeneralMessage.class),
-            any(MessageId.class), anyString(), any(TransactionResolution.class));
+            any(MessageId.class), anyString(), any(TransactionResolution.class), any(TransactionSource.class));
         transaction.commit();
     }
 
@@ -130,7 +131,7 @@
         final SendReceiptImpl sendReceipt = fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
         transaction.tryAddReceipt(publishingMessage, sendReceipt);
         Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), any(GeneralMessage.class),
-            any(MessageId.class), anyString(), any(TransactionResolution.class));
+            any(MessageId.class), anyString(), any(TransactionResolution.class), any(TransactionSource.class));
         transaction.rollback();
     }
 }
\ No newline at end of file