blob: d41dc6a79339d7560395d666530705187ebf9686 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.parallel;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.internal.cache.execute.data.CustId;
import org.apache.geode.internal.cache.execute.data.Customer;
import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.internal.cache.execute.data.Shipment;
import org.apache.geode.internal.cache.execute.data.ShipmentId;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.WanTest;
@Category({WanTest.class})
public class ParallelWANStatsDUnitTest extends WANTestBase {
private static final int NUM_PUTS = 100;
private static final long serialVersionUID = 1L;
private String testName;
public ParallelWANStatsDUnitTest() {
super();
}
@Override
protected final void postSetUpWANTestBase() {
this.testName = getTestMethodName();
}
@Test
public void testConnectionStatsAreCreated() {
// 1. Create locators
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// 2. Create cache & receiver in vm2
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
// 3. Create cache & sender in vm4
createSenderInVm(lnPort, vm4);
// 4. Create Region in vm4 (sender)
createSenderPRInVM(1, vm4);
// 5. Create region in vm2 (receiver)
createReceiverPR(vm2, 1);
// 6. Start sender in vm4
startSenderInVMs("ln", vm4);
vm4.invoke(() -> WANTestBase.checkConnectionStats("ln"));
}
@Test
public void testQueueSizeInSecondaryBucketRegionQueuesWithMemberRestart() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
createSendersWithConflation(lnPort);
createSenderPRs(1);
startPausedSenders();
createReceiverPR(vm2, 1);
putKeyValues();
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
assertEquals(NUM_PUTS,
v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size
assertEquals(NUM_PUTS * 2,
v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived
assertEquals(NUM_PUTS * 2,
v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events queued
assertEquals(0,
v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events distributed
assertEquals(NUM_PUTS,
v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary queue size
// stop vm7 to trigger rebalance and move some primary buckets
System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10)
+ ":" + v6List.get(10) + ":" + v7List.get(10));
vm7.invoke(() -> WANTestBase.closeCache());
await().untilAsserted(() -> {
int v4secondarySize = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
int v5secondarySize = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
int v6secondarySize = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
assertEquals(NUM_PUTS, v4secondarySize + v5secondarySize + v6secondarySize);
});
System.out.println("New secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + ":"
+ v6List.get(10));
vm7.invoke(() -> WANTestBase.createCache(lnPort));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
vm7.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln", 1, 10, isOffHeap()));
startSenderInVMs("ln", vm7);
vm7.invoke(() -> pauseSender("ln"));
v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
assertEquals(NUM_PUTS,
v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
// queue
// size
System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":"
+ v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10));
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
vm5.invoke(() -> WANTestBase.resumeSender("ln"));
vm6.invoke(() -> WANTestBase.resumeSender("ln"));
vm7.invoke(() -> WANTestBase.resumeSender("ln"));
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS));
vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
// events distributed:
assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
// secondary queue size:
assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10));
}
// TODO: add a test without redundancy for primary switch
@Test
public void testQueueSizeInSecondaryWithPrimarySwitch() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
createSendersWithConflation(lnPort);
createSenderPRs(1);
startPausedSenders();
createReceiverPR(vm2, 1);
putKeyValues();
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
// queue size:
assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
// events received:
assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
// events queued:
assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
// events distributed:
assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
// secondary queue size:
assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10));
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
vm5.invoke(() -> WANTestBase.resumeSender("ln"));
vm6.invoke(() -> WANTestBase.resumeSender("ln"));
vm7.invoke(() -> WANTestBase.resumeSender("ln"));
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS));
vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
// events distributed:
assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
// secondary queue size:
assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10));
}
@Test
public void testPartitionedRegionParallelPropagation_BeforeDispatch() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
createSendersWithConflation(lnPort);
createSenderPRs(0);
startPausedSenders();
createReceiverPR(vm2, 1);
createReceiverPR(vm3, 1);
putKeyValues();
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
// queue size:
assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
// events received:
assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
// events queued:
assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
// events distributed
assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
// batches distributed:
assertEquals(0, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4));
// batches redistributed
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
}
@Test
public void testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundancy() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
createSenders(lnPort);
createReceiverPR(vm2, 0);
createSenderPRs(0);
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS));
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
// queue size:
assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
// eventsReceived:
assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
// events queued:
assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
// events distributed:
assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
// batches distributed:
assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10);
// batches redistributed:
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
}
@Test
public void testPRParallelPropagationWithoutGroupTransactionEventsSendsBatchesWithIncompleteTransactions() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
createSenders(lnPort, false);
createReceiverCustomerOrderShipmentPR(vm2, 0);
createSenderCustomerOrderShipmentPRs(vm4, 0);
createSenderCustomerOrderShipmentPRs(vm5, 0);
createSenderCustomerOrderShipmentPRs(vm6, 0);
createSenderCustomerOrderShipmentPRs(vm7, 0);
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
final Map custKeyValue = new HashMap();
int intCustId = 1;
CustId custId = new CustId(intCustId);
custKeyValue.put(custId, new Customer());
vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
int transactions = 3;
final Map keyValues = new HashMap();
for (int i = 0; i < transactions; i++) {
OrderId orderId = new OrderId(i, custId);
ShipmentId shipmentId1 = new ShipmentId(i, orderId);
ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
keyValues.put(orderId, new Order());
keyValues.put(shipmentId1, new Shipment());
keyValues.put(shipmentId2, new Shipment());
keyValues.put(shipmentId3, new Shipment());
}
int eventsPerTransaction = 4;
vm4.invoke(() -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
eventsPerTransaction));
int entries = (transactions * eventsPerTransaction) + 1;
vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions));
vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3));
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
// queue size:
assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
// eventsReceived:
assertEquals(entries, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
// events queued:
assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
// events distributed:
assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
// batches distributed:
assertEquals(2, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4));
// batches redistributed:
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
}
@Test
public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithCompleteTransactions() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
createSenders(lnPort, true);
createReceiverCustomerOrderShipmentPR(vm2, 0);
createSenderCustomerOrderShipmentPRs(vm4, 0);
createSenderCustomerOrderShipmentPRs(vm5, 0);
createSenderCustomerOrderShipmentPRs(vm6, 0);
createSenderCustomerOrderShipmentPRs(vm7, 0);
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
final Map custKeyValue = new HashMap();
int intCustId = 1;
CustId custId = new CustId(intCustId);
custKeyValue.put(custId, new Customer());
vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
int transactions = 3;
final Map keyValues = new HashMap();
for (int i = 0; i < transactions; i++) {
OrderId orderId = new OrderId(i, custId);
ShipmentId shipmentId1 = new ShipmentId(i, orderId);
ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
keyValues.put(orderId, new Order());
keyValues.put(shipmentId1, new Shipment());
keyValues.put(shipmentId2, new Shipment());
keyValues.put(shipmentId3, new Shipment());
}
// 3 transactions of 4 events each are sent so that the batch would
// initially contain the first 2 transactions complete and the first
// 2 events of the last transaction (10 entries).
// As --group-transaction-events is configured in the senders, the remaining
// 2 events of the last transaction are added to the batch which makes
// that only one batch of 12 events is sent.
int eventsPerTransaction = 4;
vm4.invoke(() -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
eventsPerTransaction));
int entries = (transactions * eventsPerTransaction) + 1;
vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions));
vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3));
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
// queue size:
assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
// eventsReceived:
assertEquals(entries, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
// events queued:
assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
// events distributed:
assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
// batches distributed:
assertEquals(1, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4));
// batches redistributed:
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
// events not queued conflated:
assertEquals(0, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7));
}
@Test
public void testPRParallelPropagationWithBatchRedistWithoutGroupTransactionEventsSendsBatchesWithIncompleteTransactions() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createSenders(lnPort, false);
createSenderCustomerOrderShipmentPRs(vm4, 0);
startSenderInVMs("ln", vm4);
final Map custKeyValue = new HashMap();
int intCustId = 1;
CustId custId = new CustId(intCustId);
custKeyValue.put(custId, new Customer());
vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
int transactions = 6;
final Map keyValues = new HashMap();
for (int i = 0; i < transactions; i++) {
OrderId orderId = new OrderId(i, custId);
ShipmentId shipmentId1 = new ShipmentId(i, orderId);
ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
keyValues.put(orderId, new Order());
keyValues.put(shipmentId1, new Shipment());
keyValues.put(shipmentId2, new Shipment());
keyValues.put(shipmentId3, new Shipment());
}
int eventsPerTransaction = 4;
vm4.invoke(() -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
eventsPerTransaction));
int entries = (transactions * eventsPerTransaction) + 1;
createReceiverCustomerOrderShipmentPR(vm2, 0);
vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions));
vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3));
// wait for batches to be redistributed and then start the receiver
vm4.invoke(() -> await()
.until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0));
createReceiverInVMs(vm2);
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
// queue size:
assertEquals(0, (int) v4List.get(0));
// events received:
assertEquals(entries, (int) v4List.get(1));
// events queued:
assertEquals(entries, (int) v4List.get(2));
// events distributed:
assertEquals(entries, (int) v4List.get(3));
// batches distributed:
assertEquals(3, (int) v4List.get(4));
// batches redistributed:
assertTrue("Batch was not redistributed", (v4List.get(5)) > 0);
}
@Test
public void testPRParallelPropagationWithBatchRedistWithGroupTransactionEventsSendsBatchesWithCompleteTransactions() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createSenders(lnPort, true);
createReceiverCustomerOrderShipmentPR(vm2, 0);
createSenderCustomerOrderShipmentPRs(vm4, 0);
startSenderInVMs("ln", vm4);
final Map custKeyValue = new HashMap();
int intCustId = 1;
CustId custId = new CustId(intCustId);
custKeyValue.put(custId, new Customer());
vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue));
int transactions = 6;
final Map keyValues = new HashMap();
for (int i = 0; i < transactions; i++) {
OrderId orderId = new OrderId(i, custId);
ShipmentId shipmentId1 = new ShipmentId(i, orderId);
ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
keyValues.put(orderId, new Order());
keyValues.put(shipmentId1, new Shipment());
keyValues.put(shipmentId2, new Shipment());
keyValues.put(shipmentId3, new Shipment());
}
// 6 transactions of 4 events each are sent so that the first batch
// would initially contain the first 2 transactions complete and the first
// 2 events of the next transaction (10 entries).
// As --group-transaction-events is configured in the senders, the remaining
// 2 events of the second transaction are added to the batch which makes
// that the first batch is sent with 12 events. The same happens with the
// second batch which will contain 12 events too.
int eventsPerTransaction = 4;
vm4.invoke(() -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
eventsPerTransaction));
int entries = (transactions * eventsPerTransaction) + 1;
vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions));
vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3));
// wait for batches to be redistributed and then start the receiver
vm4.invoke(() -> await()
.until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0));
createReceiverInVMs(vm2);
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
// queue size:
assertEquals(0, (int) v4List.get(0));
// events received:
assertEquals(entries, (int) v4List.get(1));
// events queued:
assertEquals(entries, (int) v4List.get(2));
// events distributed:
assertEquals(entries, (int) v4List.get(3));
// batches distributed:
assertEquals(2, (int) v4List.get(4));
// batches redistributed:
assertTrue("Batch was not redistributed", (v4List.get(5)) > 0);
// events not queued conflated:
assertEquals(0, (int) v4List.get(7));
}
@Test
public void testPartitionedRegionParallelPropagation_AfterDispatch_Redundancy_3() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
createSenders(lnPort);
createReceiverPR(vm2, 0);
createSenderPRs(3);
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS));
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
// queue size:
assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
// events received:
assertEquals(400, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
// events queued:
assertEquals(400, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
// events distributed
assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
// batches distributed:
assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10);
// batches redistributed:
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
}
@Test
public void testWANStatsTwoWanSites_Bug44331() {
Integer lnPort = createFirstLocatorWithDSId(1);
Integer nyPort = vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
Integer tkPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort));
createCacheInVMs(nyPort, vm2);
createCacheInVMs(tkPort, vm3);
createReceiverInVMs(vm2);
createReceiverInVMs(vm3);
vm4.invoke(() -> WANTestBase.createCache(lnPort));
vm4.invoke(() -> WANTestBase.createSender("ln1", 2, true, 100, 10, false, false, null, true));
vm4.invoke(() -> WANTestBase.createSender("ln2", 3, true, 100, 10, false, false, null, true));
createReceiverPR(vm2, 0);
vm3.invoke(() -> WANTestBase.createPartitionedRegion(testName, null, 0, 10, isOffHeap()));
vm4.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln1,ln2", 0, 10, isOffHeap()));
vm4.invoke(() -> WANTestBase.startSender("ln1"));
vm4.invoke(() -> WANTestBase.startSender("ln2"));
vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS));
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
vm3.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
ArrayList<Integer> v4Sender1List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln1", 0));
ArrayList<Integer> v4Sender2List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln2", 0));
assertEquals(0, v4Sender1List.get(0).intValue()); // queue size
assertEquals(NUM_PUTS, v4Sender1List.get(1).intValue()); // eventsReceived
assertEquals(NUM_PUTS, v4Sender1List.get(2).intValue()); // events queued
assertEquals(NUM_PUTS, v4Sender1List.get(3).intValue()); // events distributed
assertTrue(v4Sender1List.get(4).intValue() >= 10); // batches distributed
assertEquals(0, v4Sender1List.get(5).intValue()); // batches redistributed
assertEquals(0, v4Sender2List.get(0).intValue()); // queue size
assertEquals(NUM_PUTS, v4Sender2List.get(1).intValue()); // eventsReceived
assertEquals(NUM_PUTS, v4Sender2List.get(2).intValue()); // events queued
assertEquals(NUM_PUTS, v4Sender2List.get(3).intValue()); // events distributed
assertTrue(v4Sender2List.get(4).intValue() >= 10); // batches distributed
assertEquals(0, v4Sender2List.get(5).intValue()); // batches redistributed
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
}
@Category({WanTest.class})
@Test
public void testParallelPropagationHA() throws Exception {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverPR(vm2, 0);
createReceiverInVMs(vm2);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
createSenderPRs(3);
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts(testName, 1000));
vm2.invoke(() -> await()
.untilAsserted(() -> assertEquals("Waiting for first batch to be received", true,
getRegionSize(testName) > 10)));
AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
inv1.join();
inv2.join();
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 1000));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
assertEquals(0, v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size
int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
// We may see a single retried event on all members due to the kill
assertTrue("Received " + receivedEvents,
3000 <= receivedEvents && 3003 >= receivedEvents); // eventsReceived
int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
assertTrue("Queued " + queuedEvents,
3000 <= queuedEvents && 3003 >= queuedEvents); // eventsQueued
// assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed :
// its quite possible that vm4 has distributed some of the events
// assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : its
// quite possible that vm4 has distributed some of the batches.
assertEquals(0, v5List.get(5) + v6List.get(5) + v7List.get(5)); // batches redistributed
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(NUM_PUTS, 1000, 1000));
}
@Category({WanTest.class})
@Test
public void testParallelPropagationHAWithGroupTransactionEvents() throws Exception {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverPR(vm2, 0);
createReceiverInVMs(vm2);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
createSenderPRs(3);
int batchSize = 9;
boolean groupTransactionEvents = true;
vm4.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents));
vm5.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents));
vm6.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents));
vm7.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
AsyncInvocation inv1 =
vm5.invokeAsync(() -> WANTestBase.doTxPutsWithRetryIfError(testName, 2, 1000, 0));
vm2.invoke(() -> await()
.untilAsserted(() -> assertEquals("Waiting for some batches to be received", true,
getRegionSize(testName) > 40)));
AsyncInvocation inv3 = vm4.invokeAsync(() -> WANTestBase.killSender());
inv1.join();
inv3.join();
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 2000));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
assertEquals(0, v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size
int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
// We may see two retried events (as transactions are made of 2 events) on all members due to
// the kill
assertTrue("Received " + receivedEvents,
6000 <= receivedEvents && 6006 >= receivedEvents); // eventsReceived
int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
assertTrue("Queued " + queuedEvents,
6000 <= queuedEvents && 6006 >= queuedEvents); // eventsQueued
assertEquals(0, v5List.get(5) + v6List.get(5) + v7List.get(5)); // batches redistributed
// batchesReceived is equal to numberOfEntries/(batchSize+1)
// As transactions are 2 events long, for each batch it will always be necessary to
// add one more entry to the 9 events batch in order to have complete transactions in the batch.
int batchesReceived = (1000 + 1000) / (batchSize + 1);
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(batchesReceived, 2000, 2000));
}
/**
* 1 region and sender configured on local site and 1 region and a receiver configured on remote
* site. Puts to the local region are in progress. Remote region is destroyed in the middle.
*/
@Test
public void testParallelPropagationWithRemoteRegionDestroy() {
addIgnoredException("RegionDestroyedException");
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverPR(vm2, 0);
createReceiverInVMs(vm2);
createSenders(lnPort);
vm2.invoke(() -> WANTestBase.addCacheListenerAndDestroyRegion(testName));
createSenderPRs(0);
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// start puts in RR_1 in another thread
vm4.invoke(() -> WANTestBase.doPuts(testName, 2000));
// verify that all is well in local site. All the events should be present in local region
vm4.invoke(() -> WANTestBase.validateRegionSize(testName, 2000));
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
// batches distributed: it's quite possible that vm4 has distributed some of the batches.
assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1);
// batches redistributed:
assertTrue(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1);
}
@Test
public void testParallelPropagationWithFilter() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverPR(vm2, 1);
createReceiverInVMs(vm2);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
createSenderPRs(0);
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false,
new MyGatewayEventFilter(), true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false,
new MyGatewayEventFilter(), true));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false,
new MyGatewayEventFilter(), true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false,
new MyGatewayEventFilter(), true));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.doPuts(testName, 1000));
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 800));
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
// queue size:
assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
// events received:
assertEquals(1000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
// events queued:
assertEquals(900, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
// events distributed:
assertEquals(800, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
// batches distributed:
assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80);
// batches redistributed:
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
// events filtered:
assertEquals(200, v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6));
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(80, 800, 800));
}
@Test
public void testParallelPropagationConflation() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
createSendersWithConflation(lnPort);
createSenderPRs(0);
startPausedSenders();
createReceiverPR(vm2, 1);
Map keyValues = putKeyValues();
// Verify the conflation indexes map is empty
verifyConflationIndexesSize("ln", 0, vm4, vm5, vm6, vm7);
final Map updateKeyValues = new HashMap();
for (int i = 0; i < 50; i++) {
updateKeyValues.put(i, i + "_updated");
}
vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, updateKeyValues));
// Verify the conflation indexes map equals the number of updates
verifyConflationIndexesSize("ln", 50, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.checkQueueSize("ln",
keyValues.size() + updateKeyValues.size() /* creates aren't conflated */));
// Do the puts again. Since these are updates, the previous updates will be conflated.
vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, updateKeyValues));
// Verify the conflation indexes map still equals the number of updates
verifyConflationIndexesSize("ln", 50, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.checkQueueSize("ln",
keyValues.size() + updateKeyValues.size() /* creates aren't conflated */));
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 0));
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0));
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
vm5.invoke(() -> WANTestBase.resumeSender("ln"));
vm6.invoke(() -> WANTestBase.resumeSender("ln"));
vm7.invoke(() -> WANTestBase.resumeSender("ln"));
keyValues.putAll(updateKeyValues);
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, keyValues.size()));
vm2.invoke(() -> WANTestBase.validateRegionContents(testName, keyValues));
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 150, NUM_PUTS));
vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
// Verify final stats
// 0 -> eventQueueSize
// 1 -> eventsReceived
// 2 -> eventsQueued
// 3 -> eventsDistributed
// 4 -> batchesDistributed
// 5 -> batchesRedistributed
// 7 -> eventsNotQueuedConflated
// 9 -> conflationIndexesMapSize
assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10);
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7));
assertEquals(0, v4List.get(9) + v5List.get(9) + v6List.get(9) + v7List.get(9));
}
@Test
public void testConflationWithSameEntryPuts() {
// Start locators
Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
Integer nyPort = vm2.invoke(() -> createFirstRemoteLocator(2, lnPort));
// Configure sending site member
String senderId = "ny";
String regionName = this.testName + "_PR";
vm1.invoke(() -> createCache(lnPort));
vm1.invoke(() -> createSender(senderId, 2, true, 100, 10, true, true, null, false));
vm1.invoke(() -> createPartitionedRegion(regionName, senderId, 0, 10, isOffHeap()));
// Do puts of the same key
int numIterations = 100;
vm1.invoke(() -> putSameEntry(regionName, numIterations));
// Wait for appropriate queue size
vm1.invoke(() -> checkQueueSize(senderId, 2));
// Verify the conflation indexes size stat
verifyConflationIndexesSize(senderId, 1, vm1);
// Configure receiving site member
vm3.invoke(() -> createCache(nyPort));
vm3.invoke(() -> createReceiver());
vm3.invoke(() -> createPartitionedRegion(regionName, null, 0, 10, isOffHeap()));
// Wait for queue to drain
vm1.invoke(() -> checkQueueSize(senderId, 0));
// Verify the conflation indexes size stat
verifyConflationIndexesSize(senderId, 0, vm1);
}
@Test
public void testPartitionedRegionParallelPropagation_RestartSenders_NoRedundancy() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
createSenders(lnPort);
createReceiverPR(vm2, 0);
createSenderPRs(0);
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// pause the senders
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
vm5.invoke(() -> WANTestBase.pauseSender("ln"));
vm6.invoke(() -> WANTestBase.pauseSender("ln"));
vm7.invoke(() -> WANTestBase.pauseSender("ln"));
vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS));
vm4.invoke(() -> WANTestBase.stopSender("ln"));
vm5.invoke(() -> WANTestBase.stopSender("ln"));
vm6.invoke(() -> WANTestBase.stopSender("ln"));
vm7.invoke(() -> WANTestBase.stopSender("ln"));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0));
vm5.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0));
vm6.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0));
vm7.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0));
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
}
@Test
public void testMaximumTimeBetweenPingsInGatewayReceiverIsHonored() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
vm2.invoke(() -> createServer(nyPort));
// Set a maximum time between pings lower than the time between pings sent at the sender (5000)
// so that the receiver will not receive the ping on time and will close the connection.
int maximumTimeBetweenPingsInGatewayReceiver = 4000;
createReceiverInVMs(maximumTimeBetweenPingsInGatewayReceiver, vm2);
createReceiverPR(vm2, 0);
int maximumTimeBetweenPingsInServer = 60000;
vm4.invoke(() -> createServer(lnPort, maximumTimeBetweenPingsInServer));
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
createSenderPRInVM(0, vm4);
String senderId = "ln";
startSenderInVMs(senderId, vm4);
// Send some puts to start the connections from the sender to the receiver
vm4.invoke(() -> WANTestBase.doPuts(testName, 2));
// Create client to check if connections are later closed.
ClientCache clientCache = new ClientCacheFactory()
.addPoolLocator("localhost", lnPort)
.setPoolLoadConditioningInterval(-1)
.setPoolIdleTimeout(-1)
.setPoolPingInterval(5000)
.create();
Region clientRegion =
clientCache.<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(testName);
for (long i = 0; i < 2; i++) {
clientRegion.put(i, "Value_" + i);
}
// Wait more than maximum-time-between-pings in the gateway receiver so that connections are
// closed.
try {
Thread.sleep(maximumTimeBetweenPingsInGatewayReceiver + 2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
assertNotEquals(0, (int) vm4.invoke(() -> getGatewaySenderPoolDisconnects(senderId)));
assertEquals(0, ((PoolImpl) clientCache.getDefaultPool()).getStats().getDisConnects());
}
protected Map putKeyValues() {
final Map keyValues = new HashMap();
for (int i = 0; i < NUM_PUTS; i++) {
keyValues.put(i, i);
}
vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, keyValues));
vm4.invoke(() -> WANTestBase.checkQueueSize("ln", keyValues.size()));
return keyValues;
}
protected void createReceiverPR(VM vm, int redundancy) {
vm.invoke(
() -> WANTestBase.createPartitionedRegion(testName, null, redundancy, 10, isOffHeap()));
}
protected void createReceiverCustomerOrderShipmentPR(VM vm, int redundancy) {
vm.invoke(
() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, redundancy, 10,
isOffHeap()));
}
protected void createSenderCustomerOrderShipmentPRs(VM vm, int redundancy) {
vm.invoke(
() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", redundancy, 10,
isOffHeap()));
}
protected void createSenderPRs(int redundancy) {
vm4.invoke(
() -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap()));
vm5.invoke(
() -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap()));
vm6.invoke(
() -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap()));
vm7.invoke(
() -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap()));
}
protected void createSenderPRInVM(int redundancy, VM vm) {
vm.invoke(
() -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap()));
}
protected void startPausedSenders() {
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(() -> pauseSender("ln"));
vm5.invoke(() -> pauseSender("ln"));
vm6.invoke(() -> pauseSender("ln"));
vm7.invoke(() -> pauseSender("ln"));
}
protected void createSendersWithConflation(Integer lnPort) {
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
}
protected void createSenderInVm(Integer lnPort, VM vm) {
createCacheInVMs(lnPort, vm);
vm.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
}
protected void createSenders(Integer lnPort, boolean groupTransactionEvents) {
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true,
groupTransactionEvents));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true,
groupTransactionEvents));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true,
groupTransactionEvents));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true,
groupTransactionEvents));
}
protected void createSenders(Integer lnPort) {
createSenders(lnPort, false);
}
private void verifyConflationIndexesSize(String senderId, int expectedSize, VM... vms) {
int actualSize = 0;
for (VM vm : vms) {
List<Integer> stats = vm.invoke(() -> WANTestBase.getSenderStats(senderId, -1));
actualSize += stats.get(9);
}
assertEquals(expectedSize, actualSize);
}
private void putSameEntry(String regionName, int numIterations) {
// This does one create and numInterations-1 updates
Region region = cache.getRegion(regionName);
for (int i = 0; i < numIterations; i++) {
region.put(0, i);
}
}
}