GEODE-8783: Publish batchesWithIncompleteTransactions in GatewaySende… (#5845)
* GEODE-8783: Publish batchesWithIncompleteTransactions in GatewaySenderMXBean
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayMBeanBridgeJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayMBeanBridgeJUnitTest.java
index cd1a55d..a46d3c2 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayMBeanBridgeJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayMBeanBridgeJUnitTest.java
@@ -51,6 +51,10 @@
@Test
public void testSenderStats() throws InterruptedException {
+ senderStats.incBatchesWithIncompleteTransactions();
+ senderStats.incBatchesWithIncompleteTransactions();
+ senderStats.incBatchesWithIncompleteTransactions();
+ senderStats.incBatchesRedistributed();
senderStats.incBatchesRedistributed();
senderStats.incEventsReceived();
Mockito.when(sender.getEventQueueSize()).thenReturn(10);
@@ -61,7 +65,9 @@
sample();
- assertEquals(1, getTotalBatchesRedistributed());
+ assertEquals(1, getTotalBatchesDistributed());
+ assertEquals(2, getTotalBatchesRedistributed());
+ assertEquals(3, getTotalBatchesWithIncompleteTransactions());
assertEquals(1, getTotalEventsConflated());
assertEquals(10, getEventQueueSize());
assertTrue(getEventsQueuedRate() > 0);
@@ -71,10 +77,18 @@
assertTrue(getEventsExceedingAlertThreshold() > 0);
}
+ private int getTotalBatchesDistributed() {
+ return bridge.getTotalBatchesDistributed();
+ }
+
private int getTotalBatchesRedistributed() {
return bridge.getTotalBatchesRedistributed();
}
+ private int getTotalBatchesWithIncompleteTransactions() {
+ return bridge.getTotalBatchesWithIncompleteTransactions();
+ }
+
private int getTotalEventsConflated() {
return bridge.getTotalEventsConflated();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index cf9cde4..73da797 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -63,6 +63,9 @@
protected static final String BATCHES_DISTRIBUTED = "batchesDistributed";
/** Name of the batches redistributed statistic */
protected static final String BATCHES_REDISTRIBUTED = "batchesRedistributed";
+ /** Name of the batches redistributed statistic */
+ protected static final String BATCHES_WITH_INCOMPLETE_TRANSACTIONS =
+ "batchesWithIncompleteTransactions";
/** Name of the batches resized statistic */
protected static final String BATCHES_RESIZED = "batchesResized";
/** Name of the unprocessed events added by primary statistic */
@@ -125,6 +128,8 @@
private static final int batchesDistributedId;
/** Id of the batches redistributed statistic */
private static final int batchesRedistributedId;
+ /** Id of the batches with incomplete transactions statistic */
+ private static final int batchesWithIncompleteTransactionsId;
/** Id of the batches resized statistic */
private static final int batchesResizedId;
/** Id of the unprocessed events added by primary statistic */
@@ -184,6 +189,7 @@
batchDistributionTimeId = type.nameToId(BATCH_DISTRIBUTION_TIME);
batchesDistributedId = type.nameToId(BATCHES_DISTRIBUTED);
batchesRedistributedId = type.nameToId(BATCHES_REDISTRIBUTED);
+ batchesWithIncompleteTransactionsId = type.nameToId(BATCHES_WITH_INCOMPLETE_TRANSACTIONS);
batchesResizedId = type.nameToId(BATCHES_RESIZED);
unprocessedTokensAddedByPrimaryId = type.nameToId(UNPROCESSED_TOKENS_ADDED_BY_PRIMARY);
unprocessedEventsAddedBySecondaryId = type.nameToId(UNPROCESSED_EVENTS_ADDED_BY_SECONDARY);
@@ -240,6 +246,9 @@
f.createIntCounter(BATCHES_REDISTRIBUTED,
"Number of batches of events removed from the event queue and resent.",
"operations", false),
+ f.createLongCounter(BATCHES_WITH_INCOMPLETE_TRANSACTIONS,
+ "Number of batches of events sent with incomplete transactions.",
+ "operations", false),
f.createIntCounter(BATCHES_RESIZED,
"Number of batches that were resized because they were too large", "operations",
false),
@@ -461,27 +470,36 @@
}
/**
- * Returns the current value of the batchesDistributed" stat.
+ * Returns the current value of the "batchesDistributed" stat.
*
- * @return the current value of the batchesDistributed" stat
+ * @return the current value of the "batchesDistributed" stat
*/
public int getBatchesDistributed() {
return this.stats.getInt(batchesDistributedId);
}
/**
- * Returns the current value of the batchesRedistributed" stat.
+ * Returns the current value of the "batchesRedistributed" stat.
*
- * @return the current value of the batchesRedistributed" stat
+ * @return the current value of the "batchesRedistributed" stat
*/
public int getBatchesRedistributed() {
return this.stats.getInt(batchesRedistributedId);
}
/**
- * Returns the current value of the batchesResized" stat.
+ * Returns the current value of the "batchesWithIncompleteTransactions" stat.
*
- * @return the current value of the batchesResized" stat
+ * @return the current value of the "batchesWithIncompleteTransactions" stat
+ */
+ public long getBatchesWithIncompleteTransactions() {
+ return this.stats.getLong(batchesWithIncompleteTransactionsId);
+ }
+
+ /**
+ * Returns the current value of the "batchesResized" stat.
+ *
+ * @return the current value of the "batchesResized" stat
*/
public int getBatchesResized() {
return this.stats.getInt(batchesResizedId);
@@ -495,6 +513,13 @@
}
/**
+ * Increments the value of the "batchesWithIncompleteTransactions" stat by 1.
+ */
+ public void incBatchesWithIncompleteTransactions() {
+ this.stats.incLong(batchesWithIncompleteTransactionsId, 1);
+ }
+
+ /**
* Increments the value of the "batchesRedistributed" stat by 1.
*/
public void incBatchesResized() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 2100d54..b7e2922 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1311,6 +1311,7 @@
peekEventsFromIncompleteTransactions(batch, prQ);
}
+
if (isDebugEnabled) {
logger.debug("{}: Peeked a batch of {} entries. The size of the queue is {}. localSize is {}",
this, batch.size(), size(), localSize());
@@ -1354,6 +1355,7 @@
return;
}
+ boolean batchHasIncompleteTransactions = false;
for (Map.Entry<TransactionId, Integer> pendingTransaction : incompleteTransactionIdsInBatch
.entrySet()) {
TransactionId transactionId = pendingTransaction.getKey();
@@ -1377,10 +1379,14 @@
}
}
if (!areAllEventsForTransactionInBatch) {
+ batchHasIncompleteTransactions = true;
logger.warn("Not able to retrieve all events for transaction {} after {} tries",
transactionId, retries);
}
}
+ if (batchHasIncompleteTransactions) {
+ stats.incBatchesWithIncompleteTransactions();
+ }
}
private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index ce119a2..45f4f84 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -469,6 +469,7 @@
return;
}
+ boolean batchHasIncompleteTransactions = true;
for (TransactionId transactionId : incompleteTransactionIdsInBatch) {
boolean areAllEventsForTransactionInBatch = false;
int retries = 0;
@@ -492,10 +493,14 @@
lastKeyForTransaction = eventsAndKey.lastKey;
}
if (!areAllEventsForTransactionInBatch) {
+ batchHasIncompleteTransactions = true;
logger.warn("Not able to retrieve all events for transaction {} after {} tries",
transactionId, retries);
}
}
+ if (batchHasIncompleteTransactions) {
+ stats.incBatchesWithIncompleteTransactions();
+ }
}
protected boolean mustGroupTransactionEvents() {
diff --git a/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java b/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java
index a889dd5..adc9202 100644
--- a/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java
@@ -174,9 +174,20 @@
/**
* Returns the total number of batches of events that were resent.
*/
+ int getTotalBatchesDistributed();
+
+ /**
+ * Returns the total number of batches of events that were resent.
+ */
int getTotalBatchesRedistributed();
/**
+ * Returns the total number of batches sent with incomplete transactions.
+ * Only relevant if group-transaction-events is enabled.
+ */
+ int getTotalBatchesWithIncompleteTransactions();
+
+ /**
* Returns the total number of bytes in heap occupied by the event queue.
*/
long getTotalQueueSizeBytesInUse();
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
index 10a2f75..c7ea0d4 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
@@ -1275,6 +1275,13 @@
}
/**
+ * @return total batches distributed
+ */
+ public int getGatewaySenderTotalBatchesDistributed() {
+ return senderMonitor.getGatewaySenderTotalBatchesDistributed();
+ }
+
+ /**
* @return total batches redistributed
*/
public int getGatewaySenderTotalBatchesRedistributed() {
@@ -1282,6 +1289,13 @@
}
/**
+ * @return total batches with incomplete transactions
+ */
+ public int getGatewaySenderTotalBatchesWithIncompleteTransactions() {
+ return senderMonitor.getGatewaySenderTotalBatchesWithIncompleteTransactions();
+ }
+
+ /**
* @return total number of events conflated
*/
public int getGatewaySenderTotalEventsConflated() {
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java
index 1f422ff..e83dde4 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java
@@ -124,11 +124,21 @@
}
@Override
+ public int getTotalBatchesDistributed() {
+ return bridge.getTotalBatchesDistributed();
+ }
+
+ @Override
public int getTotalBatchesRedistributed() {
return bridge.getTotalBatchesRedistributed();
}
@Override
+ public int getTotalBatchesWithIncompleteTransactions() {
+ return bridge.getTotalBatchesWithIncompleteTransactions();
+ }
+
+ @Override
public int getTotalEventsConflated() {
return bridge.getTotalEventsConflated();
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
index c07802b..0d38622 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
@@ -248,11 +248,19 @@
/** Statistics Related Attributes **/
+ public int getTotalBatchesDistributed() {
+ return getStatistic(StatsKey.GATEWAYSENDER_BATCHES_DISTRIBUTED).intValue();
+ }
public int getTotalBatchesRedistributed() {
return getStatistic(StatsKey.GATEWAYSENDER_TOTAL_BATCHES_REDISTRIBUTED).intValue();
}
+ public int getTotalBatchesWithIncompleteTransactions() {
+ return getStatistic(StatsKey.GATEWAYSENDER_TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS)
+ .intValue();
+ }
+
public int getTotalEventsConflated() {
return getStatistic(StatsKey.GATEWAYSENDER_EVENTS_QUEUED_CONFLATED).intValue();
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderClusterStatsMonitor.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderClusterStatsMonitor.java
index 37f9c7d..6186cf5 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderClusterStatsMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderClusterStatsMonitor.java
@@ -30,8 +30,13 @@
private static final String EVENTS_QUEUED_RATE = "EventsQueuedRate";
+ private static final String TOTAL_BATCHES_DISTRIBUTED = "TotalBatchesDistributed";
+
private static final String TOTAL_BATCHES_REDISTRIBUTED = "TotalBatchesRedistributed";
+ private static final String TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS =
+ "TotalBatchesWithIncompleteTransactions";
+
private static final String TOTAL_EVENTS_CONFLATED = "TotalEventsConflated";
@@ -54,7 +59,9 @@
typeMap.put(BATCHES_DISPATCHED_RATE, Float.TYPE);
typeMap.put(EVENT_QUEUE_SIZE, Integer.TYPE);
typeMap.put(EVENTS_QUEUED_RATE, Float.TYPE);
+ typeMap.put(TOTAL_BATCHES_DISTRIBUTED, Integer.TYPE);
typeMap.put(TOTAL_BATCHES_REDISTRIBUTED, Integer.TYPE);
+ typeMap.put(TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS, Integer.TYPE);
typeMap.put(TOTAL_EVENTS_CONFLATED, Integer.TYPE);
}
@@ -75,10 +82,18 @@
return aggregator.getFloatValue(EVENTS_QUEUED_RATE);
}
+ public int getGatewaySenderTotalBatchesDistributed() {
+ return aggregator.getIntValue(TOTAL_BATCHES_DISTRIBUTED);
+ }
+
public int getGatewaySenderTotalBatchesRedistributed() {
return aggregator.getIntValue(TOTAL_BATCHES_REDISTRIBUTED);
}
+ public int getGatewaySenderTotalBatchesWithIncompleteTransactions() {
+ return aggregator.getIntValue(TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS);
+ }
+
public int getGatewaySenderTotalEventsConflated() {
return aggregator.getIntValue(TOTAL_EVENTS_CONFLATED);
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java
index be65916..56d856a 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java
@@ -302,6 +302,8 @@
public static final String GATEWAYSENDER_BATCHES_DISTRIBUTED = "batchesDistributed";
public static final String GATEWAYSENDER_BATCHES_DISTRIBUTE_TIME = "batchDistributionTime";
public static final String GATEWAYSENDER_TOTAL_BATCHES_REDISTRIBUTED = "batchesRedistributed";
+ public static final String GATEWAYSENDER_TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS =
+ "batchesWithIncompleteTransactions";
public static final String GATEWAYSENDER_EVENTS_QUEUED_CONFLATED = "eventsNotQueuedConflated";
public static final String GATEWAYSENDER_EVENTS_EXCEEDING_ALERT_THRESHOLD =
"eventsExceedingAlertThreshold";
diff --git a/geode-docs/reference/topics/cache_xml.html.md.erb b/geode-docs/reference/topics/cache_xml.html.md.erb
index 0c83a17..9ab1ba7 100644
--- a/geode-docs/reference/topics/cache_xml.html.md.erb
+++ b/geode-docs/reference/topics/cache_xml.html.md.erb
@@ -299,7 +299,8 @@
<td>group-transaction-events</td>
<td>Boolean value to ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must be set to false.</p>
-<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p></td>
+<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p>
+<p><b>Note:</b> If the above condition is not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.</p></td>
<td>false</td>
</tr>
</tbody>
diff --git a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
index 53c8f79..30b4da1 100644
--- a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
@@ -636,7 +636,8 @@
<td><span class="keyword parmname">\-\-group-transaction-events</span></td>
<td>Boolean value to ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must be set to false.</p>
-<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p></td>
+<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p>
+<p><b>Note:</b> If the above condition is not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.</p></td>
</td>
<td>false</td>
</tr>
diff --git a/geode-docs/topologies_and_comm/topology_concepts/multisite_overview.html.md.erb b/geode-docs/topologies_and_comm/topology_concepts/multisite_overview.html.md.erb
index 20d063b..6bd91b7 100644
--- a/geode-docs/topologies_and_comm/topology_concepts/multisite_overview.html.md.erb
+++ b/geode-docs/topologies_and_comm/topology_concepts/multisite_overview.html.md.erb
@@ -43,6 +43,9 @@
- The regions to which the transaction events belong must be replicated by the same set of gateway senders that also have this setting enabled.
- This setting cannot be enabled if `enable-batch-conflation` is in effect.
+**Note:**
+If the above conditions are not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.
+
By default, potential WAN conflicts are resolved using a timestamp mechanism. You can optionally install a custom conflict resolver to apply custom logic when determining whether to apply a potentially conflicting update received over a WAN.
[Consistency for Region Updates](../../developing/distributed_regions/region_entry_versions.html#topic_CF2798D3E12647F182C2CEC4A46E2045) describes how Geode ensures consistency within a cluster, in client caches, and when applying updates over a WAN. [Resolving Conflicting Events](../../developing/events/resolving_multisite_conflicts.html#topic_E97BB68748F14987916CD1A50E4B4542) provides more details about implementing a custom conflict resolver for WAN updates.
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 9292bae..413dcad 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1251,6 +1251,7 @@
stats.add(statistics.getSecondaryEventQueueSize());
stats.add(statistics.getEventsProcessedByPQRM());
stats.add(statistics.getEventsExceedingAlertThreshold());
+ stats.add((int) statistics.getBatchesWithIncompleteTransactions());
return stats;
}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index d41dc6a..2138342 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -330,23 +330,23 @@
createSenders(lnPort, false);
- createReceiverCustomerOrderShipmentPR(vm2, 0);
+ createReceiverCustomerOrderShipmentPR(vm2);
- createSenderCustomerOrderShipmentPRs(vm4, 0);
- createSenderCustomerOrderShipmentPRs(vm5, 0);
- createSenderCustomerOrderShipmentPRs(vm6, 0);
- createSenderCustomerOrderShipmentPRs(vm7, 0);
+ createSenderCustomerOrderShipmentPRs(vm4);
+ createSenderCustomerOrderShipmentPRs(vm5);
+ createSenderCustomerOrderShipmentPRs(vm6);
+ createSenderCustomerOrderShipmentPRs(vm7);
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
- final Map custKeyValue = new HashMap();
+ final Map<Object, Object> custKeyValue = new HashMap<>();
int intCustId = 1;
CustId custId = new CustId(intCustId);
custKeyValue.put(custId, new Customer());
vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
int transactions = 3;
- final Map keyValues = new HashMap();
+ final Map<Object, Object> keyValues = new HashMap<>();
for (int i = 0; i < transactions; i++) {
OrderId orderId = new OrderId(i, custId);
ShipmentId shipmentId1 = new ShipmentId(i, orderId);
@@ -400,23 +400,23 @@
createSenders(lnPort, true);
- createReceiverCustomerOrderShipmentPR(vm2, 0);
+ createReceiverCustomerOrderShipmentPR(vm2);
- createSenderCustomerOrderShipmentPRs(vm4, 0);
- createSenderCustomerOrderShipmentPRs(vm5, 0);
- createSenderCustomerOrderShipmentPRs(vm6, 0);
- createSenderCustomerOrderShipmentPRs(vm7, 0);
+ createSenderCustomerOrderShipmentPRs(vm4);
+ createSenderCustomerOrderShipmentPRs(vm5);
+ createSenderCustomerOrderShipmentPRs(vm6);
+ createSenderCustomerOrderShipmentPRs(vm7);
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
- final Map custKeyValue = new HashMap();
+ final Map<Object, Object> custKeyValue = new HashMap<>();
int intCustId = 1;
CustId custId = new CustId(intCustId);
custKeyValue.put(custId, new Customer());
vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
int transactions = 3;
- final Map keyValues = new HashMap();
+ final Map<Object, Object> keyValues = new HashMap<>();
for (int i = 0; i < transactions; i++) {
OrderId orderId = new OrderId(i, custId);
ShipmentId shipmentId1 = new ShipmentId(i, orderId);
@@ -467,9 +467,76 @@
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
// events not queued conflated:
assertEquals(0, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7));
+ // batches with incomplete transactions
+ assertEquals(0, (int) v4List.get(13));
+
}
@Test
+ public void testPRParallelPropagationWithGroupTransactionEventsWithIncompleteTransactions() {
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+
+ int dispThreads = 2;
+ createSenderInVm(lnPort, vm4, dispThreads);
+
+ createReceiverPR(vm2, 0);
+
+ createSenderPRInVM(0, vm4);
+
+ startSenderInVMs("ln", vm4);
+
+ // Adding events in transactions
+ // Transactions will contain objects assigned to different buckets but given that there is only
+ // one server, there will be no TransactionDataNotCollocatedException.
+ // With this and by using more than one dispatcher thread, we will provoke that
+ // it will be impossible for the batches to have complete transactions as some
+ // events for a transaction will be handled by one dispatcher thread and some other events by
+ // another thread.
+ final Map<Object, Object> keyValue = new HashMap<>();
+ int entries = 30;
+ for (int i = 0; i < entries; i++) {
+ keyValue.put(i, i);
+ }
+
+ int entriesPerTransaction = 3;
+ vm4.invoke(
+ () -> WANTestBase.doPutsInsideTransactions(testName, keyValue, entriesPerTransaction));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(testName, entries));
+
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // The number of batches will be 4 because each
+ // dispatcher thread (there are 2) will send half the number of entries,
+ // each on 2 batches.
+ int batches = 4;
+ // queue size:
+ assertEquals(0, (int) v4List.get(0));
+ // eventsReceived:
+ assertEquals(entries, (int) v4List.get(1));
+ // events queued:
+ assertEquals(entries, (int) v4List.get(2));
+ // events distributed:
+ assertEquals(entries, (int) v4List.get(3));
+ // batches distributed:
+ assertEquals(batches, (int) v4List.get(4));
+ // batches redistributed:
+ assertEquals(0, (int) v4List.get(5));
+ // events not queued conflated:
+ assertEquals(0, (int) v4List.get(7));
+ // batches with incomplete transactions
+ assertEquals(batches, (int) v4List.get(13));
+
+ vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(batches, entries, entries));
+ }
+
+
+ @Test
public void testPRParallelPropagationWithBatchRedistWithoutGroupTransactionEventsSendsBatchesWithIncompleteTransactions() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
@@ -478,18 +545,18 @@
createSenders(lnPort, false);
- createSenderCustomerOrderShipmentPRs(vm4, 0);
+ createSenderCustomerOrderShipmentPRs(vm4);
startSenderInVMs("ln", vm4);
- final Map custKeyValue = new HashMap();
+ final Map<Object, Object> custKeyValue = new HashMap<>();
int intCustId = 1;
CustId custId = new CustId(intCustId);
custKeyValue.put(custId, new Customer());
vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
int transactions = 6;
- final Map keyValues = new HashMap();
+ final Map<Object, Object> keyValues = new HashMap<>();
for (int i = 0; i < transactions; i++) {
OrderId orderId = new OrderId(i, custId);
ShipmentId shipmentId1 = new ShipmentId(i, orderId);
@@ -506,7 +573,7 @@
int entries = (transactions * eventsPerTransaction) + 1;
- createReceiverCustomerOrderShipmentPR(vm2, 0);
+ createReceiverCustomerOrderShipmentPR(vm2);
vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions));
@@ -544,21 +611,21 @@
createSenders(lnPort, true);
- createReceiverCustomerOrderShipmentPR(vm2, 0);
+ createReceiverCustomerOrderShipmentPR(vm2);
- createSenderCustomerOrderShipmentPRs(vm4, 0);
+ createSenderCustomerOrderShipmentPRs(vm4);
startSenderInVMs("ln", vm4);
- final Map custKeyValue = new HashMap();
+ final Map<Object, Object> custKeyValue = new HashMap<>();
int intCustId = 1;
CustId custId = new CustId(intCustId);
custKeyValue.put(custId, new Customer());
vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
int transactions = 6;
- final Map keyValues = new HashMap();
+ final Map<Object, Object> keyValues = new HashMap<>();
for (int i = 0; i < transactions; i++) {
OrderId orderId = new OrderId(i, custId);
ShipmentId shipmentId1 = new ShipmentId(i, orderId);
@@ -697,14 +764,14 @@
assertEquals(NUM_PUTS, v4Sender1List.get(1).intValue()); // eventsReceived
assertEquals(NUM_PUTS, v4Sender1List.get(2).intValue()); // events queued
assertEquals(NUM_PUTS, v4Sender1List.get(3).intValue()); // events distributed
- assertTrue(v4Sender1List.get(4).intValue() >= 10); // batches distributed
+ assertTrue(v4Sender1List.get(4) >= 10); // batches distributed
assertEquals(0, v4Sender1List.get(5).intValue()); // batches redistributed
assertEquals(0, v4Sender2List.get(0).intValue()); // queue size
assertEquals(NUM_PUTS, v4Sender2List.get(1).intValue()); // eventsReceived
assertEquals(NUM_PUTS, v4Sender2List.get(2).intValue()); // events queued
assertEquals(NUM_PUTS, v4Sender2List.get(3).intValue()); // events distributed
- assertTrue(v4Sender2List.get(4).intValue() >= 10); // batches distributed
+ assertTrue(v4Sender2List.get(4) >= 10); // batches distributed
assertEquals(0, v4Sender2List.get(5).intValue()); // batches redistributed
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
@@ -734,13 +801,13 @@
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
- AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts(testName, 1000));
+ AsyncInvocation<Void> inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts(testName, 1000));
vm2.invoke(() -> await()
- .untilAsserted(() -> assertEquals("Waiting for first batch to be received", true,
+ .untilAsserted(() -> assertTrue("Waiting for first batch to be received",
getRegionSize(testName) > 10)));
- AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
- inv1.join();
- inv2.join();
+ AsyncInvocation<Void> inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+ inv1.await();
+ inv2.await();
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 1000));
@@ -801,15 +868,15 @@
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
- AsyncInvocation inv1 =
+ AsyncInvocation<Void> inv1 =
vm5.invokeAsync(() -> WANTestBase.doTxPutsWithRetryIfError(testName, 2, 1000, 0));
vm2.invoke(() -> await()
- .untilAsserted(() -> assertEquals("Waiting for some batches to be received", true,
+ .untilAsserted(() -> assertTrue("Waiting for some batches to be received",
getRegionSize(testName) > 40)));
- AsyncInvocation inv3 = vm4.invokeAsync(() -> WANTestBase.killSender());
- inv1.join();
- inv3.join();
+ AsyncInvocation<Void> inv3 = vm4.invokeAsync(() -> WANTestBase.killSender());
+ inv1.await();
+ inv3.await();
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 2000));
@@ -955,12 +1022,12 @@
createReceiverPR(vm2, 1);
- Map keyValues = putKeyValues();
+ Map<Object, Object> keyValues = putKeyValues();
// Verify the conflation indexes map is empty
verifyConflationIndexesSize("ln", 0, vm4, vm5, vm6, vm7);
- final Map updateKeyValues = new HashMap();
+ final Map<Object, Object> updateKeyValues = new HashMap<>();
for (int i = 0; i < 50; i++) {
updateKeyValues.put(i, i + "_updated");
}
@@ -1130,8 +1197,8 @@
.setPoolIdleTimeout(-1)
.setPoolPingInterval(5000)
.create();
- Region clientRegion =
- clientCache.<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
+ Region<Long, String> clientRegion =
+ clientCache.<Long, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(testName);
for (long i = 0; i < 2; i++) {
clientRegion.put(i, "Value_" + i);
@@ -1149,8 +1216,8 @@
assertEquals(0, ((PoolImpl) clientCache.getDefaultPool()).getStats().getDisConnects());
}
- protected Map putKeyValues() {
- final Map keyValues = new HashMap();
+ protected Map<Object, Object> putKeyValues() {
+ final Map<Object, Object> keyValues = new HashMap<>();
for (int i = 0; i < NUM_PUTS; i++) {
keyValues.put(i, i);
}
@@ -1167,15 +1234,15 @@
() -> WANTestBase.createPartitionedRegion(testName, null, redundancy, 10, isOffHeap()));
}
- protected void createReceiverCustomerOrderShipmentPR(VM vm, int redundancy) {
+ protected void createReceiverCustomerOrderShipmentPR(VM vm) {
vm.invoke(
- () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, redundancy, 10,
+ () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, 0, 10,
isOffHeap()));
}
- protected void createSenderCustomerOrderShipmentPRs(VM vm, int redundancy) {
+ protected void createSenderCustomerOrderShipmentPRs(VM vm) {
vm.invoke(
- () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", redundancy, 10,
+ () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 0, 10,
isOffHeap()));
}
@@ -1213,6 +1280,14 @@
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
}
+ protected void createSenderInVm(Integer lnPort, VM vm,
+ int dispatcherThreads) {
+ createCacheInVMs(lnPort, vm);
+ vm.invoke(() -> WANTestBase.setNumDispatcherThreadsForTheRun(dispatcherThreads));
+ vm.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true,
+ true));
+ }
+
protected void createSenderInVm(Integer lnPort, VM vm) {
createCacheInVMs(lnPort, vm);
vm.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
@@ -1246,7 +1321,7 @@
private void putSameEntry(String regionName, int numIterations) {
// This does one create and numInterations-1 updates
- Region region = cache.getRegion(regionName);
+ Region<Object, Object> region = cache.getRegion(regionName);
for (int i = 0; i < numIterations; i++) {
region.put(0, i);
}