GEODE-8783: Publish batchesWithIncompleteTransactions in GatewaySende… (#5845)

* GEODE-8783: Publish batchesWithIncompleteTransactions in GatewaySenderMXBean
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayMBeanBridgeJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayMBeanBridgeJUnitTest.java
index cd1a55d..a46d3c2 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayMBeanBridgeJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayMBeanBridgeJUnitTest.java
@@ -51,6 +51,10 @@
 
   @Test
   public void testSenderStats() throws InterruptedException {
+    senderStats.incBatchesWithIncompleteTransactions();
+    senderStats.incBatchesWithIncompleteTransactions();
+    senderStats.incBatchesWithIncompleteTransactions();
+    senderStats.incBatchesRedistributed();
     senderStats.incBatchesRedistributed();
     senderStats.incEventsReceived();
     Mockito.when(sender.getEventQueueSize()).thenReturn(10);
@@ -61,7 +65,9 @@
 
     sample();
 
-    assertEquals(1, getTotalBatchesRedistributed());
+    assertEquals(1, getTotalBatchesDistributed());
+    assertEquals(2, getTotalBatchesRedistributed());
+    assertEquals(3, getTotalBatchesWithIncompleteTransactions());
     assertEquals(1, getTotalEventsConflated());
     assertEquals(10, getEventQueueSize());
     assertTrue(getEventsQueuedRate() > 0);
@@ -71,10 +77,18 @@
     assertTrue(getEventsExceedingAlertThreshold() > 0);
   }
 
+  private int getTotalBatchesDistributed() {
+    return bridge.getTotalBatchesDistributed();
+  }
+
   private int getTotalBatchesRedistributed() {
     return bridge.getTotalBatchesRedistributed();
   }
 
+  private int getTotalBatchesWithIncompleteTransactions() {
+    return bridge.getTotalBatchesWithIncompleteTransactions();
+  }
+
   private int getTotalEventsConflated() {
     return bridge.getTotalEventsConflated();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index cf9cde4..73da797 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -63,6 +63,9 @@
   protected static final String BATCHES_DISTRIBUTED = "batchesDistributed";
   /** Name of the batches redistributed statistic */
   protected static final String BATCHES_REDISTRIBUTED = "batchesRedistributed";
+  /** Name of the batches redistributed statistic */
+  protected static final String BATCHES_WITH_INCOMPLETE_TRANSACTIONS =
+      "batchesWithIncompleteTransactions";
   /** Name of the batches resized statistic */
   protected static final String BATCHES_RESIZED = "batchesResized";
   /** Name of the unprocessed events added by primary statistic */
@@ -125,6 +128,8 @@
   private static final int batchesDistributedId;
   /** Id of the batches redistributed statistic */
   private static final int batchesRedistributedId;
+  /** Id of the batches with incomplete transactions statistic */
+  private static final int batchesWithIncompleteTransactionsId;
   /** Id of the batches resized statistic */
   private static final int batchesResizedId;
   /** Id of the unprocessed events added by primary statistic */
@@ -184,6 +189,7 @@
     batchDistributionTimeId = type.nameToId(BATCH_DISTRIBUTION_TIME);
     batchesDistributedId = type.nameToId(BATCHES_DISTRIBUTED);
     batchesRedistributedId = type.nameToId(BATCHES_REDISTRIBUTED);
+    batchesWithIncompleteTransactionsId = type.nameToId(BATCHES_WITH_INCOMPLETE_TRANSACTIONS);
     batchesResizedId = type.nameToId(BATCHES_RESIZED);
     unprocessedTokensAddedByPrimaryId = type.nameToId(UNPROCESSED_TOKENS_ADDED_BY_PRIMARY);
     unprocessedEventsAddedBySecondaryId = type.nameToId(UNPROCESSED_EVENTS_ADDED_BY_SECONDARY);
@@ -240,6 +246,9 @@
             f.createIntCounter(BATCHES_REDISTRIBUTED,
                 "Number of batches of events removed from the event queue and resent.",
                 "operations", false),
+            f.createLongCounter(BATCHES_WITH_INCOMPLETE_TRANSACTIONS,
+                "Number of batches of events sent with incomplete transactions.",
+                "operations", false),
             f.createIntCounter(BATCHES_RESIZED,
                 "Number of batches that were resized because they were too large", "operations",
                 false),
@@ -461,27 +470,36 @@
   }
 
   /**
-   * Returns the current value of the batchesDistributed" stat.
+   * Returns the current value of the "batchesDistributed" stat.
    *
-   * @return the current value of the batchesDistributed" stat
+   * @return the current value of the "batchesDistributed" stat
    */
   public int getBatchesDistributed() {
     return this.stats.getInt(batchesDistributedId);
   }
 
   /**
-   * Returns the current value of the batchesRedistributed" stat.
+   * Returns the current value of the "batchesRedistributed" stat.
    *
-   * @return the current value of the batchesRedistributed" stat
+   * @return the current value of the "batchesRedistributed" stat
    */
   public int getBatchesRedistributed() {
     return this.stats.getInt(batchesRedistributedId);
   }
 
   /**
-   * Returns the current value of the batchesResized" stat.
+   * Returns the current value of the "batchesWithIncompleteTransactions" stat.
    *
-   * @return the current value of the batchesResized" stat
+   * @return the current value of the "batchesWithIncompleteTransactions" stat
+   */
+  public long getBatchesWithIncompleteTransactions() {
+    return this.stats.getLong(batchesWithIncompleteTransactionsId);
+  }
+
+  /**
+   * Returns the current value of the "batchesResized" stat.
+   *
+   * @return the current value of the "batchesResized" stat
    */
   public int getBatchesResized() {
     return this.stats.getInt(batchesResizedId);
@@ -495,6 +513,13 @@
   }
 
   /**
+   * Increments the value of the "batchesWithIncompleteTransactions" stat by 1.
+   */
+  public void incBatchesWithIncompleteTransactions() {
+    this.stats.incLong(batchesWithIncompleteTransactionsId, 1);
+  }
+
+  /**
    * Increments the value of the "batchesRedistributed" stat by 1.
    */
   public void incBatchesResized() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 2100d54..b7e2922 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1311,6 +1311,7 @@
       peekEventsFromIncompleteTransactions(batch, prQ);
     }
 
+
     if (isDebugEnabled) {
       logger.debug("{}: Peeked a batch of {} entries. The size of the queue is {}. localSize is {}",
           this, batch.size(), size(), localSize());
@@ -1354,6 +1355,7 @@
       return;
     }
 
+    boolean batchHasIncompleteTransactions = false;
     for (Map.Entry<TransactionId, Integer> pendingTransaction : incompleteTransactionIdsInBatch
         .entrySet()) {
       TransactionId transactionId = pendingTransaction.getKey();
@@ -1377,10 +1379,14 @@
         }
       }
       if (!areAllEventsForTransactionInBatch) {
+        batchHasIncompleteTransactions = true;
         logger.warn("Not able to retrieve all events for transaction {} after {} tries",
             transactionId, retries);
       }
     }
+    if (batchHasIncompleteTransactions) {
+      stats.incBatchesWithIncompleteTransactions();
+    }
   }
 
   private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index ce119a2..45f4f84 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -469,6 +469,7 @@
       return;
     }
 
+    boolean batchHasIncompleteTransactions = true;
     for (TransactionId transactionId : incompleteTransactionIdsInBatch) {
       boolean areAllEventsForTransactionInBatch = false;
       int retries = 0;
@@ -492,10 +493,14 @@
         lastKeyForTransaction = eventsAndKey.lastKey;
       }
       if (!areAllEventsForTransactionInBatch) {
+        batchHasIncompleteTransactions = true;
         logger.warn("Not able to retrieve all events for transaction {} after {} tries",
             transactionId, retries);
       }
     }
+    if (batchHasIncompleteTransactions) {
+      stats.incBatchesWithIncompleteTransactions();
+    }
   }
 
   protected boolean mustGroupTransactionEvents() {
diff --git a/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java b/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java
index a889dd5..adc9202 100644
--- a/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java
@@ -174,9 +174,20 @@
   /**
    * Returns the total number of batches of events that were resent.
    */
+  int getTotalBatchesDistributed();
+
+  /**
+   * Returns the total number of batches of events that were resent.
+   */
   int getTotalBatchesRedistributed();
 
   /**
+   * Returns the total number of batches sent with incomplete transactions.
+   * Only relevant if group-transaction-events is enabled.
+   */
+  int getTotalBatchesWithIncompleteTransactions();
+
+  /**
    * Returns the total number of bytes in heap occupied by the event queue.
    */
   long getTotalQueueSizeBytesInUse();
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
index 10a2f75..c7ea0d4 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
@@ -1275,6 +1275,13 @@
   }
 
   /**
+   * @return total batches distributed
+   */
+  public int getGatewaySenderTotalBatchesDistributed() {
+    return senderMonitor.getGatewaySenderTotalBatchesDistributed();
+  }
+
+  /**
    * @return total batches redistributed
    */
   public int getGatewaySenderTotalBatchesRedistributed() {
@@ -1282,6 +1289,13 @@
   }
 
   /**
+   * @return total batches with incomplete transactions
+   */
+  public int getGatewaySenderTotalBatchesWithIncompleteTransactions() {
+    return senderMonitor.getGatewaySenderTotalBatchesWithIncompleteTransactions();
+  }
+
+  /**
    * @return total number of events conflated
    */
   public int getGatewaySenderTotalEventsConflated() {
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java
index 1f422ff..e83dde4 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java
@@ -124,11 +124,21 @@
   }
 
   @Override
+  public int getTotalBatchesDistributed() {
+    return bridge.getTotalBatchesDistributed();
+  }
+
+  @Override
   public int getTotalBatchesRedistributed() {
     return bridge.getTotalBatchesRedistributed();
   }
 
   @Override
+  public int getTotalBatchesWithIncompleteTransactions() {
+    return bridge.getTotalBatchesWithIncompleteTransactions();
+  }
+
+  @Override
   public int getTotalEventsConflated() {
     return bridge.getTotalEventsConflated();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
index c07802b..0d38622 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
@@ -248,11 +248,19 @@
 
   /** Statistics Related Attributes **/
 
+  public int getTotalBatchesDistributed() {
+    return getStatistic(StatsKey.GATEWAYSENDER_BATCHES_DISTRIBUTED).intValue();
+  }
 
   public int getTotalBatchesRedistributed() {
     return getStatistic(StatsKey.GATEWAYSENDER_TOTAL_BATCHES_REDISTRIBUTED).intValue();
   }
 
+  public int getTotalBatchesWithIncompleteTransactions() {
+    return getStatistic(StatsKey.GATEWAYSENDER_TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS)
+        .intValue();
+  }
+
   public int getTotalEventsConflated() {
     return getStatistic(StatsKey.GATEWAYSENDER_EVENTS_QUEUED_CONFLATED).intValue();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderClusterStatsMonitor.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderClusterStatsMonitor.java
index 37f9c7d..6186cf5 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderClusterStatsMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderClusterStatsMonitor.java
@@ -30,8 +30,13 @@
 
   private static final String EVENTS_QUEUED_RATE = "EventsQueuedRate";
 
+  private static final String TOTAL_BATCHES_DISTRIBUTED = "TotalBatchesDistributed";
+
   private static final String TOTAL_BATCHES_REDISTRIBUTED = "TotalBatchesRedistributed";
 
+  private static final String TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS =
+      "TotalBatchesWithIncompleteTransactions";
+
   private static final String TOTAL_EVENTS_CONFLATED = "TotalEventsConflated";
 
 
@@ -54,7 +59,9 @@
     typeMap.put(BATCHES_DISPATCHED_RATE, Float.TYPE);
     typeMap.put(EVENT_QUEUE_SIZE, Integer.TYPE);
     typeMap.put(EVENTS_QUEUED_RATE, Float.TYPE);
+    typeMap.put(TOTAL_BATCHES_DISTRIBUTED, Integer.TYPE);
     typeMap.put(TOTAL_BATCHES_REDISTRIBUTED, Integer.TYPE);
+    typeMap.put(TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS, Integer.TYPE);
     typeMap.put(TOTAL_EVENTS_CONFLATED, Integer.TYPE);
 
   }
@@ -75,10 +82,18 @@
     return aggregator.getFloatValue(EVENTS_QUEUED_RATE);
   }
 
+  public int getGatewaySenderTotalBatchesDistributed() {
+    return aggregator.getIntValue(TOTAL_BATCHES_DISTRIBUTED);
+  }
+
   public int getGatewaySenderTotalBatchesRedistributed() {
     return aggregator.getIntValue(TOTAL_BATCHES_REDISTRIBUTED);
   }
 
+  public int getGatewaySenderTotalBatchesWithIncompleteTransactions() {
+    return aggregator.getIntValue(TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS);
+  }
+
   public int getGatewaySenderTotalEventsConflated() {
     return aggregator.getIntValue(TOTAL_EVENTS_CONFLATED);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java
index be65916..56d856a 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java
@@ -302,6 +302,8 @@
   public static final String GATEWAYSENDER_BATCHES_DISTRIBUTED = "batchesDistributed";
   public static final String GATEWAYSENDER_BATCHES_DISTRIBUTE_TIME = "batchDistributionTime";
   public static final String GATEWAYSENDER_TOTAL_BATCHES_REDISTRIBUTED = "batchesRedistributed";
+  public static final String GATEWAYSENDER_TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS =
+      "batchesWithIncompleteTransactions";
   public static final String GATEWAYSENDER_EVENTS_QUEUED_CONFLATED = "eventsNotQueuedConflated";
   public static final String GATEWAYSENDER_EVENTS_EXCEEDING_ALERT_THRESHOLD =
       "eventsExceedingAlertThreshold";
diff --git a/geode-docs/reference/topics/cache_xml.html.md.erb b/geode-docs/reference/topics/cache_xml.html.md.erb
index 0c83a17..9ab1ba7 100644
--- a/geode-docs/reference/topics/cache_xml.html.md.erb
+++ b/geode-docs/reference/topics/cache_xml.html.md.erb
@@ -299,7 +299,8 @@
 <td>group-transaction-events</td>
 <td>Boolean value to ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.
 <p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the  <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must be set to false.</p>
-<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p></td>
+<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p>
+<p><b>Note:</b> If the above condition is not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.</p></td>
 <td>false</td>
 </tr>
 </tbody>
diff --git a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
index 53c8f79..30b4da1 100644
--- a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
@@ -636,7 +636,8 @@
 <td><span class="keyword parmname">\-\-group-transaction-events</span></td>
 <td>Boolean value to ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.
 <p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the  <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must be set to false.</p>
-<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p></td>
+<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p>
+<p><b>Note:</b> If the above condition is not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.</p></td>
 </td>
 <td>false</td>
 </tr>
diff --git a/geode-docs/topologies_and_comm/topology_concepts/multisite_overview.html.md.erb b/geode-docs/topologies_and_comm/topology_concepts/multisite_overview.html.md.erb
index 20d063b..6bd91b7 100644
--- a/geode-docs/topologies_and_comm/topology_concepts/multisite_overview.html.md.erb
+++ b/geode-docs/topologies_and_comm/topology_concepts/multisite_overview.html.md.erb
@@ -43,6 +43,9 @@
 - The regions to which the transaction events belong must be replicated by the same set of gateway senders that also have this setting enabled.
 - This setting cannot be enabled if `enable-batch-conflation` is in effect.
 
+**Note:**
+If the above conditions are not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.
+
 By default, potential WAN conflicts are resolved using a timestamp mechanism. You can optionally install a custom conflict resolver to apply custom logic when determining whether to apply a potentially conflicting update received over a WAN.
 
 [Consistency for Region Updates](../../developing/distributed_regions/region_entry_versions.html#topic_CF2798D3E12647F182C2CEC4A46E2045) describes how Geode ensures consistency within a cluster, in client caches, and when applying updates over a WAN. [Resolving Conflicting Events](../../developing/events/resolving_multisite_conflicts.html#topic_E97BB68748F14987916CD1A50E4B4542) provides more details about implementing a custom conflict resolver for WAN updates.
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 9292bae..413dcad 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1251,6 +1251,7 @@
     stats.add(statistics.getSecondaryEventQueueSize());
     stats.add(statistics.getEventsProcessedByPQRM());
     stats.add(statistics.getEventsExceedingAlertThreshold());
+    stats.add((int) statistics.getBatchesWithIncompleteTransactions());
     return stats;
   }
 
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index d41dc6a..2138342 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -330,23 +330,23 @@
 
     createSenders(lnPort, false);
 
-    createReceiverCustomerOrderShipmentPR(vm2, 0);
+    createReceiverCustomerOrderShipmentPR(vm2);
 
-    createSenderCustomerOrderShipmentPRs(vm4, 0);
-    createSenderCustomerOrderShipmentPRs(vm5, 0);
-    createSenderCustomerOrderShipmentPRs(vm6, 0);
-    createSenderCustomerOrderShipmentPRs(vm7, 0);
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+    createSenderCustomerOrderShipmentPRs(vm6);
+    createSenderCustomerOrderShipmentPRs(vm7);
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
-    final Map custKeyValue = new HashMap();
+    final Map<Object, Object> custKeyValue = new HashMap<>();
     int intCustId = 1;
     CustId custId = new CustId(intCustId);
     custKeyValue.put(custId, new Customer());
     vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
 
     int transactions = 3;
-    final Map keyValues = new HashMap();
+    final Map<Object, Object> keyValues = new HashMap<>();
     for (int i = 0; i < transactions; i++) {
       OrderId orderId = new OrderId(i, custId);
       ShipmentId shipmentId1 = new ShipmentId(i, orderId);
@@ -400,23 +400,23 @@
 
     createSenders(lnPort, true);
 
-    createReceiverCustomerOrderShipmentPR(vm2, 0);
+    createReceiverCustomerOrderShipmentPR(vm2);
 
-    createSenderCustomerOrderShipmentPRs(vm4, 0);
-    createSenderCustomerOrderShipmentPRs(vm5, 0);
-    createSenderCustomerOrderShipmentPRs(vm6, 0);
-    createSenderCustomerOrderShipmentPRs(vm7, 0);
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+    createSenderCustomerOrderShipmentPRs(vm6);
+    createSenderCustomerOrderShipmentPRs(vm7);
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
-    final Map custKeyValue = new HashMap();
+    final Map<Object, Object> custKeyValue = new HashMap<>();
     int intCustId = 1;
     CustId custId = new CustId(intCustId);
     custKeyValue.put(custId, new Customer());
     vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
 
     int transactions = 3;
-    final Map keyValues = new HashMap();
+    final Map<Object, Object> keyValues = new HashMap<>();
     for (int i = 0; i < transactions; i++) {
       OrderId orderId = new OrderId(i, custId);
       ShipmentId shipmentId1 = new ShipmentId(i, orderId);
@@ -467,9 +467,76 @@
     assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
     // events not queued conflated:
     assertEquals(0, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7));
+    // batches with incomplete transactions
+    assertEquals(0, (int) v4List.get(13));
+
   }
 
   @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsWithIncompleteTransactions() {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    int dispThreads = 2;
+    createSenderInVm(lnPort, vm4, dispThreads);
+
+    createReceiverPR(vm2, 0);
+
+    createSenderPRInVM(0, vm4);
+
+    startSenderInVMs("ln", vm4);
+
+    // Adding events in transactions
+    // Transactions will contain objects assigned to different buckets but given that there is only
+    // one server, there will be no TransactionDataNotCollocatedException.
+    // With this and by using more than one dispatcher thread, we will provoke that
+    // it will be impossible for the batches to have complete transactions as some
+    // events for a transaction will be handled by one dispatcher thread and some other events by
+    // another thread.
+    final Map<Object, Object> keyValue = new HashMap<>();
+    int entries = 30;
+    for (int i = 0; i < entries; i++) {
+      keyValue.put(i, i);
+    }
+
+    int entriesPerTransaction = 3;
+    vm4.invoke(
+        () -> WANTestBase.doPutsInsideTransactions(testName, keyValue, entriesPerTransaction));
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(testName, entries));
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    // The number of batches will be 4 because each
+    // dispatcher thread (there are 2) will send half the number of entries,
+    // each on 2 batches.
+    int batches = 4;
+    // queue size:
+    assertEquals(0, (int) v4List.get(0));
+    // eventsReceived:
+    assertEquals(entries, (int) v4List.get(1));
+    // events queued:
+    assertEquals(entries, (int) v4List.get(2));
+    // events distributed:
+    assertEquals(entries, (int) v4List.get(3));
+    // batches distributed:
+    assertEquals(batches, (int) v4List.get(4));
+    // batches redistributed:
+    assertEquals(0, (int) v4List.get(5));
+    // events not queued conflated:
+    assertEquals(0, (int) v4List.get(7));
+    // batches with incomplete transactions
+    assertEquals(batches, (int) v4List.get(13));
+
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(batches, entries, entries));
+  }
+
+
+  @Test
   public void testPRParallelPropagationWithBatchRedistWithoutGroupTransactionEventsSendsBatchesWithIncompleteTransactions() {
     Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
@@ -478,18 +545,18 @@
 
     createSenders(lnPort, false);
 
-    createSenderCustomerOrderShipmentPRs(vm4, 0);
+    createSenderCustomerOrderShipmentPRs(vm4);
 
     startSenderInVMs("ln", vm4);
 
-    final Map custKeyValue = new HashMap();
+    final Map<Object, Object> custKeyValue = new HashMap<>();
     int intCustId = 1;
     CustId custId = new CustId(intCustId);
     custKeyValue.put(custId, new Customer());
     vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
 
     int transactions = 6;
-    final Map keyValues = new HashMap();
+    final Map<Object, Object> keyValues = new HashMap<>();
     for (int i = 0; i < transactions; i++) {
       OrderId orderId = new OrderId(i, custId);
       ShipmentId shipmentId1 = new ShipmentId(i, orderId);
@@ -506,7 +573,7 @@
 
     int entries = (transactions * eventsPerTransaction) + 1;
 
-    createReceiverCustomerOrderShipmentPR(vm2, 0);
+    createReceiverCustomerOrderShipmentPR(vm2);
 
     vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
     vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions));
@@ -544,21 +611,21 @@
 
     createSenders(lnPort, true);
 
-    createReceiverCustomerOrderShipmentPR(vm2, 0);
+    createReceiverCustomerOrderShipmentPR(vm2);
 
-    createSenderCustomerOrderShipmentPRs(vm4, 0);
+    createSenderCustomerOrderShipmentPRs(vm4);
 
     startSenderInVMs("ln", vm4);
 
 
-    final Map custKeyValue = new HashMap();
+    final Map<Object, Object> custKeyValue = new HashMap<>();
     int intCustId = 1;
     CustId custId = new CustId(intCustId);
     custKeyValue.put(custId, new Customer());
     vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
 
     int transactions = 6;
-    final Map keyValues = new HashMap();
+    final Map<Object, Object> keyValues = new HashMap<>();
     for (int i = 0; i < transactions; i++) {
       OrderId orderId = new OrderId(i, custId);
       ShipmentId shipmentId1 = new ShipmentId(i, orderId);
@@ -697,14 +764,14 @@
     assertEquals(NUM_PUTS, v4Sender1List.get(1).intValue()); // eventsReceived
     assertEquals(NUM_PUTS, v4Sender1List.get(2).intValue()); // events queued
     assertEquals(NUM_PUTS, v4Sender1List.get(3).intValue()); // events distributed
-    assertTrue(v4Sender1List.get(4).intValue() >= 10); // batches distributed
+    assertTrue(v4Sender1List.get(4) >= 10); // batches distributed
     assertEquals(0, v4Sender1List.get(5).intValue()); // batches redistributed
 
     assertEquals(0, v4Sender2List.get(0).intValue()); // queue size
     assertEquals(NUM_PUTS, v4Sender2List.get(1).intValue()); // eventsReceived
     assertEquals(NUM_PUTS, v4Sender2List.get(2).intValue()); // events queued
     assertEquals(NUM_PUTS, v4Sender2List.get(3).intValue()); // events distributed
-    assertTrue(v4Sender2List.get(4).intValue() >= 10); // batches distributed
+    assertTrue(v4Sender2List.get(4) >= 10); // batches distributed
     assertEquals(0, v4Sender2List.get(5).intValue()); // batches redistributed
 
     vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
@@ -734,13 +801,13 @@
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
-    AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts(testName, 1000));
+    AsyncInvocation<Void> inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts(testName, 1000));
     vm2.invoke(() -> await()
-        .untilAsserted(() -> assertEquals("Waiting for first batch to be received", true,
+        .untilAsserted(() -> assertTrue("Waiting for first batch to be received",
             getRegionSize(testName) > 10)));
-    AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
-    inv1.join();
-    inv2.join();
+    AsyncInvocation<Void> inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+    inv1.await();
+    inv2.await();
 
     vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 1000));
 
@@ -801,15 +868,15 @@
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
-    AsyncInvocation inv1 =
+    AsyncInvocation<Void> inv1 =
         vm5.invokeAsync(() -> WANTestBase.doTxPutsWithRetryIfError(testName, 2, 1000, 0));
 
     vm2.invoke(() -> await()
-        .untilAsserted(() -> assertEquals("Waiting for some batches to be received", true,
+        .untilAsserted(() -> assertTrue("Waiting for some batches to be received",
             getRegionSize(testName) > 40)));
-    AsyncInvocation inv3 = vm4.invokeAsync(() -> WANTestBase.killSender());
-    inv1.join();
-    inv3.join();
+    AsyncInvocation<Void> inv3 = vm4.invokeAsync(() -> WANTestBase.killSender());
+    inv1.await();
+    inv3.await();
 
     vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 2000));
 
@@ -955,12 +1022,12 @@
 
     createReceiverPR(vm2, 1);
 
-    Map keyValues = putKeyValues();
+    Map<Object, Object> keyValues = putKeyValues();
 
     // Verify the conflation indexes map is empty
     verifyConflationIndexesSize("ln", 0, vm4, vm5, vm6, vm7);
 
-    final Map updateKeyValues = new HashMap();
+    final Map<Object, Object> updateKeyValues = new HashMap<>();
     for (int i = 0; i < 50; i++) {
       updateKeyValues.put(i, i + "_updated");
     }
@@ -1130,8 +1197,8 @@
         .setPoolIdleTimeout(-1)
         .setPoolPingInterval(5000)
         .create();
-    Region clientRegion =
-        clientCache.<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
+    Region<Long, String> clientRegion =
+        clientCache.<Long, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
             .create(testName);
     for (long i = 0; i < 2; i++) {
       clientRegion.put(i, "Value_" + i);
@@ -1149,8 +1216,8 @@
     assertEquals(0, ((PoolImpl) clientCache.getDefaultPool()).getStats().getDisConnects());
   }
 
-  protected Map putKeyValues() {
-    final Map keyValues = new HashMap();
+  protected Map<Object, Object> putKeyValues() {
+    final Map<Object, Object> keyValues = new HashMap<>();
     for (int i = 0; i < NUM_PUTS; i++) {
       keyValues.put(i, i);
     }
@@ -1167,15 +1234,15 @@
         () -> WANTestBase.createPartitionedRegion(testName, null, redundancy, 10, isOffHeap()));
   }
 
-  protected void createReceiverCustomerOrderShipmentPR(VM vm, int redundancy) {
+  protected void createReceiverCustomerOrderShipmentPR(VM vm) {
     vm.invoke(
-        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, redundancy, 10,
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, 0, 10,
             isOffHeap()));
   }
 
-  protected void createSenderCustomerOrderShipmentPRs(VM vm, int redundancy) {
+  protected void createSenderCustomerOrderShipmentPRs(VM vm) {
     vm.invoke(
-        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", redundancy, 10,
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 0, 10,
             isOffHeap()));
   }
 
@@ -1213,6 +1280,14 @@
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
   }
 
+  protected void createSenderInVm(Integer lnPort, VM vm,
+      int dispatcherThreads) {
+    createCacheInVMs(lnPort, vm);
+    vm.invoke(() -> WANTestBase.setNumDispatcherThreadsForTheRun(dispatcherThreads));
+    vm.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true,
+        true));
+  }
+
   protected void createSenderInVm(Integer lnPort, VM vm) {
     createCacheInVMs(lnPort, vm);
     vm.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
@@ -1246,7 +1321,7 @@
 
   private void putSameEntry(String regionName, int numIterations) {
     // This does one create and numInterations-1 updates
-    Region region = cache.getRegion(regionName);
+    Region<Object, Object> region = cache.getRegion(regionName);
     for (int i = 0; i < numIterations; i++) {
       region.put(0, i);
     }