| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.wan.parallel; |
| |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.client.ClientCache; |
| import org.apache.geode.cache.client.ClientCacheFactory; |
| import org.apache.geode.cache.client.ClientRegionShortcut; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.internal.cache.execute.data.CustId; |
| import org.apache.geode.internal.cache.execute.data.Customer; |
| import org.apache.geode.internal.cache.execute.data.Order; |
| import org.apache.geode.internal.cache.execute.data.OrderId; |
| import org.apache.geode.internal.cache.execute.data.Shipment; |
| import org.apache.geode.internal.cache.execute.data.ShipmentId; |
| import org.apache.geode.internal.cache.wan.WANTestBase; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.junit.categories.WanTest; |
| |
| @Category({WanTest.class}) |
| public class ParallelWANStatsDUnitTest extends WANTestBase { |
| |
| private static final int NUM_PUTS = 100; |
| private static final long serialVersionUID = 1L; |
| |
| private String testName; |
| |
| public ParallelWANStatsDUnitTest() { |
| super(); |
| } |
| |
| @Override |
| protected final void postSetUpWANTestBase() { |
| this.testName = getTestMethodName(); |
| } |
| |
| @Test |
| public void testConnectionStatsAreCreated() { |
| // 1. Create locators |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| // 2. Create cache & receiver in vm2 |
| createCacheInVMs(nyPort, vm2); |
| createReceiverInVMs(vm2); |
| |
| // 3. Create cache & sender in vm4 |
| createSenderInVm(lnPort, vm4); |
| |
| // 4. Create Region in vm4 (sender) |
| createSenderPRInVM(1, vm4); |
| |
| // 5. Create region in vm2 (receiver) |
| createReceiverPR(vm2, 1); |
| |
| // 6. Start sender in vm4 |
| startSenderInVMs("ln", vm4); |
| |
| vm4.invoke(() -> WANTestBase.checkConnectionStats("ln")); |
| } |
| |
| @Test |
| public void testQueueSizeInSecondaryBucketRegionQueuesWithMemberRestart() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| createReceiverInVMs(vm2); |
| |
| createSendersWithConflation(lnPort); |
| |
| createSenderPRs(1); |
| |
| startPausedSenders(); |
| |
| createReceiverPR(vm2, 1); |
| putKeyValues(); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| |
| assertEquals(NUM_PUTS, |
| v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size |
| assertEquals(NUM_PUTS * 2, |
| v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived |
| assertEquals(NUM_PUTS * 2, |
| v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events queued |
| assertEquals(0, |
| v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events distributed |
| assertEquals(NUM_PUTS, |
| v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary queue size |
| |
| // stop vm7 to trigger rebalance and move some primary buckets |
| System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) |
| + ":" + v6List.get(10) + ":" + v7List.get(10)); |
| vm7.invoke(() -> WANTestBase.closeCache()); |
| await().untilAsserted(() -> { |
| int v4secondarySize = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); |
| int v5secondarySize = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); |
| int v6secondarySize = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); |
| assertEquals(NUM_PUTS, v4secondarySize + v5secondarySize + v6secondarySize); |
| }); |
| System.out.println("New secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + ":" |
| + v6List.get(10)); |
| |
| vm7.invoke(() -> WANTestBase.createCache(lnPort)); |
| vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); |
| vm7.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln", 1, 10, isOffHeap())); |
| startSenderInVMs("ln", vm7); |
| vm7.invoke(() -> pauseSender("ln")); |
| |
| v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| assertEquals(NUM_PUTS, |
| v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary |
| // queue |
| // size |
| System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":" |
| + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10)); |
| |
| vm4.invoke(() -> WANTestBase.resumeSender("ln")); |
| vm5.invoke(() -> WANTestBase.resumeSender("ln")); |
| vm6.invoke(() -> WANTestBase.resumeSender("ln")); |
| vm7.invoke(() -> WANTestBase.resumeSender("ln")); |
| |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); |
| vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS)); |
| |
| vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0)); |
| |
| v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| // events distributed: |
| assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); |
| // secondary queue size: |
| assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); |
| } |
| |
| // TODO: add a test without redundancy for primary switch |
| @Test |
| public void testQueueSizeInSecondaryWithPrimarySwitch() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| createReceiverInVMs(vm2); |
| |
| createSendersWithConflation(lnPort); |
| |
| createSenderPRs(1); |
| |
| startPausedSenders(); |
| |
| createReceiverPR(vm2, 1); |
| |
| putKeyValues(); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| |
| // queue size: |
| assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); |
| // events received: |
| assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); |
| // events queued: |
| assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); |
| // events distributed: |
| assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); |
| // secondary queue size: |
| assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); |
| |
| vm4.invoke(() -> WANTestBase.resumeSender("ln")); |
| vm5.invoke(() -> WANTestBase.resumeSender("ln")); |
| vm6.invoke(() -> WANTestBase.resumeSender("ln")); |
| vm7.invoke(() -> WANTestBase.resumeSender("ln")); |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); |
| vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS)); |
| |
| vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0)); |
| |
| v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| // events distributed: |
| assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); |
| // secondary queue size: |
| assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); |
| } |
| |
| @Test |
| public void testPartitionedRegionParallelPropagation_BeforeDispatch() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2, vm3); |
| createReceiverInVMs(vm2, vm3); |
| |
| createSendersWithConflation(lnPort); |
| |
| createSenderPRs(0); |
| |
| startPausedSenders(); |
| |
| createReceiverPR(vm2, 1); |
| createReceiverPR(vm3, 1); |
| |
| putKeyValues(); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); |
| |
| // queue size: |
| assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); |
| // events received: |
| assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); |
| // events queued: |
| assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); |
| // events distributed |
| assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); |
| // batches distributed: |
| assertEquals(0, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)); |
| // batches redistributed |
| assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); |
| } |
| |
| @Test |
| public void testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundancy() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| createReceiverInVMs(vm2); |
| |
| createSenders(lnPort); |
| |
| createReceiverPR(vm2, 0); |
| |
| createSenderPRs(0); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS)); |
| |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| // queue size: |
| assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); |
| // eventsReceived: |
| assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); |
| // events queued: |
| assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); |
| // events distributed: |
| assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); |
| // batches distributed: |
| assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); |
| // batches redistributed: |
| assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); |
| |
| vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS)); |
| } |
| |
| @Test |
| public void testPRParallelPropagationWithoutGroupTransactionEventsSendsBatchesWithIncompleteTransactions() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| createReceiverInVMs(vm2); |
| |
| createSenders(lnPort, false); |
| |
| createReceiverCustomerOrderShipmentPR(vm2, 0); |
| |
| createSenderCustomerOrderShipmentPRs(vm4, 0); |
| createSenderCustomerOrderShipmentPRs(vm5, 0); |
| createSenderCustomerOrderShipmentPRs(vm6, 0); |
| createSenderCustomerOrderShipmentPRs(vm7, 0); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| final Map custKeyValue = new HashMap(); |
| int intCustId = 1; |
| CustId custId = new CustId(intCustId); |
| custKeyValue.put(custId, new Customer()); |
| vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue)); |
| |
| int transactions = 3; |
| final Map keyValues = new HashMap(); |
| for (int i = 0; i < transactions; i++) { |
| OrderId orderId = new OrderId(i, custId); |
| ShipmentId shipmentId1 = new ShipmentId(i, orderId); |
| ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); |
| ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); |
| keyValues.put(orderId, new Order()); |
| keyValues.put(shipmentId1, new Shipment()); |
| keyValues.put(shipmentId2, new Shipment()); |
| keyValues.put(shipmentId3, new Shipment()); |
| } |
| int eventsPerTransaction = 4; |
| vm4.invoke(() -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues, |
| eventsPerTransaction)); |
| |
| int entries = (transactions * eventsPerTransaction) + 1; |
| |
| vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1)); |
| vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions)); |
| vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3)); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| // queue size: |
| assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); |
| // eventsReceived: |
| assertEquals(entries, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); |
| // events queued: |
| assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); |
| // events distributed: |
| assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); |
| // batches distributed: |
| assertEquals(2, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)); |
| // batches redistributed: |
| assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); |
| } |
| |
| @Test |
| public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithCompleteTransactions() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| createReceiverInVMs(vm2); |
| |
| createSenders(lnPort, true); |
| |
| createReceiverCustomerOrderShipmentPR(vm2, 0); |
| |
| createSenderCustomerOrderShipmentPRs(vm4, 0); |
| createSenderCustomerOrderShipmentPRs(vm5, 0); |
| createSenderCustomerOrderShipmentPRs(vm6, 0); |
| createSenderCustomerOrderShipmentPRs(vm7, 0); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| final Map custKeyValue = new HashMap(); |
| int intCustId = 1; |
| CustId custId = new CustId(intCustId); |
| custKeyValue.put(custId, new Customer()); |
| vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue)); |
| |
| int transactions = 3; |
| final Map keyValues = new HashMap(); |
| for (int i = 0; i < transactions; i++) { |
| OrderId orderId = new OrderId(i, custId); |
| ShipmentId shipmentId1 = new ShipmentId(i, orderId); |
| ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); |
| ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); |
| keyValues.put(orderId, new Order()); |
| keyValues.put(shipmentId1, new Shipment()); |
| keyValues.put(shipmentId2, new Shipment()); |
| keyValues.put(shipmentId3, new Shipment()); |
| } |
| |
| // 3 transactions of 4 events each are sent so that the batch would |
| // initially contain the first 2 transactions complete and the first |
| // 2 events of the last transaction (10 entries). |
| // As --group-transaction-events is configured in the senders, the remaining |
| // 2 events of the last transaction are added to the batch which makes |
| // that only one batch of 12 events is sent. |
| int eventsPerTransaction = 4; |
| vm4.invoke(() -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues, |
| eventsPerTransaction)); |
| |
| int entries = (transactions * eventsPerTransaction) + 1; |
| |
| vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1)); |
| vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions)); |
| vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3)); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| // queue size: |
| assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); |
| // eventsReceived: |
| assertEquals(entries, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); |
| // events queued: |
| assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); |
| // events distributed: |
| assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); |
| // batches distributed: |
| assertEquals(1, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)); |
| // batches redistributed: |
| assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); |
| // events not queued conflated: |
| assertEquals(0, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); |
| } |
| |
| @Test |
| public void testPRParallelPropagationWithBatchRedistWithoutGroupTransactionEventsSendsBatchesWithIncompleteTransactions() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| |
| createSenders(lnPort, false); |
| |
| createSenderCustomerOrderShipmentPRs(vm4, 0); |
| |
| startSenderInVMs("ln", vm4); |
| |
| final Map custKeyValue = new HashMap(); |
| int intCustId = 1; |
| CustId custId = new CustId(intCustId); |
| custKeyValue.put(custId, new Customer()); |
| vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue)); |
| |
| int transactions = 6; |
| final Map keyValues = new HashMap(); |
| for (int i = 0; i < transactions; i++) { |
| OrderId orderId = new OrderId(i, custId); |
| ShipmentId shipmentId1 = new ShipmentId(i, orderId); |
| ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); |
| ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); |
| keyValues.put(orderId, new Order()); |
| keyValues.put(shipmentId1, new Shipment()); |
| keyValues.put(shipmentId2, new Shipment()); |
| keyValues.put(shipmentId3, new Shipment()); |
| } |
| int eventsPerTransaction = 4; |
| vm4.invoke(() -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues, |
| eventsPerTransaction)); |
| |
| int entries = (transactions * eventsPerTransaction) + 1; |
| |
| createReceiverCustomerOrderShipmentPR(vm2, 0); |
| |
| vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1)); |
| vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions)); |
| vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3)); |
| |
| // wait for batches to be redistributed and then start the receiver |
| vm4.invoke(() -> await() |
| .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0)); |
| |
| createReceiverInVMs(vm2); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| // queue size: |
| assertEquals(0, (int) v4List.get(0)); |
| // events received: |
| assertEquals(entries, (int) v4List.get(1)); |
| // events queued: |
| assertEquals(entries, (int) v4List.get(2)); |
| // events distributed: |
| assertEquals(entries, (int) v4List.get(3)); |
| // batches distributed: |
| assertEquals(3, (int) v4List.get(4)); |
| // batches redistributed: |
| assertTrue("Batch was not redistributed", (v4List.get(5)) > 0); |
| } |
| |
| @Test |
| public void testPRParallelPropagationWithBatchRedistWithGroupTransactionEventsSendsBatchesWithCompleteTransactions() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| |
| createSenders(lnPort, true); |
| |
| createReceiverCustomerOrderShipmentPR(vm2, 0); |
| |
| createSenderCustomerOrderShipmentPRs(vm4, 0); |
| |
| startSenderInVMs("ln", vm4); |
| |
| |
| final Map custKeyValue = new HashMap(); |
| int intCustId = 1; |
| CustId custId = new CustId(intCustId); |
| custKeyValue.put(custId, new Customer()); |
| vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue)); |
| |
| int transactions = 6; |
| final Map keyValues = new HashMap(); |
| for (int i = 0; i < transactions; i++) { |
| OrderId orderId = new OrderId(i, custId); |
| ShipmentId shipmentId1 = new ShipmentId(i, orderId); |
| ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); |
| ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); |
| keyValues.put(orderId, new Order()); |
| keyValues.put(shipmentId1, new Shipment()); |
| keyValues.put(shipmentId2, new Shipment()); |
| keyValues.put(shipmentId3, new Shipment()); |
| } |
| |
| // 6 transactions of 4 events each are sent so that the first batch |
| // would initially contain the first 2 transactions complete and the first |
| // 2 events of the next transaction (10 entries). |
| // As --group-transaction-events is configured in the senders, the remaining |
| // 2 events of the second transaction are added to the batch which makes |
| // that the first batch is sent with 12 events. The same happens with the |
| // second batch which will contain 12 events too. |
| int eventsPerTransaction = 4; |
| vm4.invoke(() -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues, |
| eventsPerTransaction)); |
| |
| int entries = (transactions * eventsPerTransaction) + 1; |
| |
| vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1)); |
| vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions)); |
| vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3)); |
| |
| // wait for batches to be redistributed and then start the receiver |
| vm4.invoke(() -> await() |
| .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0)); |
| |
| createReceiverInVMs(vm2); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| // queue size: |
| assertEquals(0, (int) v4List.get(0)); |
| // events received: |
| assertEquals(entries, (int) v4List.get(1)); |
| // events queued: |
| assertEquals(entries, (int) v4List.get(2)); |
| // events distributed: |
| assertEquals(entries, (int) v4List.get(3)); |
| // batches distributed: |
| assertEquals(2, (int) v4List.get(4)); |
| // batches redistributed: |
| assertTrue("Batch was not redistributed", (v4List.get(5)) > 0); |
| // events not queued conflated: |
| assertEquals(0, (int) v4List.get(7)); |
| } |
| |
| @Test |
| public void testPartitionedRegionParallelPropagation_AfterDispatch_Redundancy_3() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| createReceiverInVMs(vm2); |
| |
| createSenders(lnPort); |
| |
| createReceiverPR(vm2, 0); |
| |
| createSenderPRs(3); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS)); |
| |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| // queue size: |
| assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); |
| // events received: |
| assertEquals(400, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); |
| // events queued: |
| assertEquals(400, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); |
| // events distributed |
| assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); |
| // batches distributed: |
| assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); |
| // batches redistributed: |
| assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); |
| |
| vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS)); |
| } |
| |
| @Test |
| public void testWANStatsTwoWanSites_Bug44331() { |
| Integer lnPort = createFirstLocatorWithDSId(1); |
| Integer nyPort = vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| Integer tkPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| createCacheInVMs(tkPort, vm3); |
| createReceiverInVMs(vm2); |
| createReceiverInVMs(vm3); |
| |
| vm4.invoke(() -> WANTestBase.createCache(lnPort)); |
| |
| vm4.invoke(() -> WANTestBase.createSender("ln1", 2, true, 100, 10, false, false, null, true)); |
| |
| vm4.invoke(() -> WANTestBase.createSender("ln2", 3, true, 100, 10, false, false, null, true)); |
| |
| createReceiverPR(vm2, 0); |
| vm3.invoke(() -> WANTestBase.createPartitionedRegion(testName, null, 0, 10, isOffHeap())); |
| |
| vm4.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln1,ln2", 0, 10, isOffHeap())); |
| |
| vm4.invoke(() -> WANTestBase.startSender("ln1")); |
| |
| vm4.invoke(() -> WANTestBase.startSender("ln2")); |
| |
| vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS)); |
| |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); |
| vm3.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); |
| |
| ArrayList<Integer> v4Sender1List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln1", 0)); |
| ArrayList<Integer> v4Sender2List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln2", 0)); |
| |
| assertEquals(0, v4Sender1List.get(0).intValue()); // queue size |
| assertEquals(NUM_PUTS, v4Sender1List.get(1).intValue()); // eventsReceived |
| assertEquals(NUM_PUTS, v4Sender1List.get(2).intValue()); // events queued |
| assertEquals(NUM_PUTS, v4Sender1List.get(3).intValue()); // events distributed |
| assertTrue(v4Sender1List.get(4).intValue() >= 10); // batches distributed |
| assertEquals(0, v4Sender1List.get(5).intValue()); // batches redistributed |
| |
| assertEquals(0, v4Sender2List.get(0).intValue()); // queue size |
| assertEquals(NUM_PUTS, v4Sender2List.get(1).intValue()); // eventsReceived |
| assertEquals(NUM_PUTS, v4Sender2List.get(2).intValue()); // events queued |
| assertEquals(NUM_PUTS, v4Sender2List.get(3).intValue()); // events distributed |
| assertTrue(v4Sender2List.get(4).intValue() >= 10); // batches distributed |
| assertEquals(0, v4Sender2List.get(5).intValue()); // batches redistributed |
| |
| vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS)); |
| vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS)); |
| } |
| |
| @Category({WanTest.class}) |
| @Test |
| public void testParallelPropagationHA() throws Exception { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| |
| createReceiverPR(vm2, 0); |
| |
| createReceiverInVMs(vm2); |
| |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| |
| createSenderPRs(3); |
| |
| vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); |
| vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); |
| vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); |
| vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts(testName, 1000)); |
| vm2.invoke(() -> await() |
| .untilAsserted(() -> assertEquals("Waiting for first batch to be received", true, |
| getRegionSize(testName) > 10))); |
| AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); |
| inv1.join(); |
| inv2.join(); |
| |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 1000)); |
| |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| assertEquals(0, v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size |
| int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1); |
| // We may see a single retried event on all members due to the kill |
| assertTrue("Received " + receivedEvents, |
| 3000 <= receivedEvents && 3003 >= receivedEvents); // eventsReceived |
| int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2); |
| assertTrue("Queued " + queuedEvents, |
| 3000 <= queuedEvents && 3003 >= queuedEvents); // eventsQueued |
| // assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed : |
| // its quite possible that vm4 has distributed some of the events |
| // assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : its |
| // quite possible that vm4 has distributed some of the batches. |
| assertEquals(0, v5List.get(5) + v6List.get(5) + v7List.get(5)); // batches redistributed |
| |
| vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(NUM_PUTS, 1000, 1000)); |
| } |
| |
| @Category({WanTest.class}) |
| @Test |
| public void testParallelPropagationHAWithGroupTransactionEvents() throws Exception { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| |
| createReceiverPR(vm2, 0); |
| |
| createReceiverInVMs(vm2); |
| |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| |
| createSenderPRs(3); |
| |
| int batchSize = 9; |
| boolean groupTransactionEvents = true; |
| vm4.invoke( |
| () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true, |
| groupTransactionEvents)); |
| vm5.invoke( |
| () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true, |
| groupTransactionEvents)); |
| vm6.invoke( |
| () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true, |
| groupTransactionEvents)); |
| vm7.invoke( |
| () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true, |
| groupTransactionEvents)); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| AsyncInvocation inv1 = |
| vm5.invokeAsync(() -> WANTestBase.doTxPutsWithRetryIfError(testName, 2, 1000, 0)); |
| |
| vm2.invoke(() -> await() |
| .untilAsserted(() -> assertEquals("Waiting for some batches to be received", true, |
| getRegionSize(testName) > 40))); |
| AsyncInvocation inv3 = vm4.invokeAsync(() -> WANTestBase.killSender()); |
| inv1.join(); |
| inv3.join(); |
| |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 2000)); |
| |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| assertEquals(0, v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size |
| int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1); |
| // We may see two retried events (as transactions are made of 2 events) on all members due to |
| // the kill |
| assertTrue("Received " + receivedEvents, |
| 6000 <= receivedEvents && 6006 >= receivedEvents); // eventsReceived |
| int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2); |
| assertTrue("Queued " + queuedEvents, |
| 6000 <= queuedEvents && 6006 >= queuedEvents); // eventsQueued |
| assertEquals(0, v5List.get(5) + v6List.get(5) + v7List.get(5)); // batches redistributed |
| |
| // batchesReceived is equal to numberOfEntries/(batchSize+1) |
| // As transactions are 2 events long, for each batch it will always be necessary to |
| // add one more entry to the 9 events batch in order to have complete transactions in the batch. |
| int batchesReceived = (1000 + 1000) / (batchSize + 1); |
| vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(batchesReceived, 2000, 2000)); |
| } |
| |
| |
| /** |
| * 1 region and sender configured on local site and 1 region and a receiver configured on remote |
| * site. Puts to the local region are in progress. Remote region is destroyed in the middle. |
| */ |
| @Test |
| public void testParallelPropagationWithRemoteRegionDestroy() { |
| addIgnoredException("RegionDestroyedException"); |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| createReceiverPR(vm2, 0); |
| createReceiverInVMs(vm2); |
| |
| createSenders(lnPort); |
| |
| vm2.invoke(() -> WANTestBase.addCacheListenerAndDestroyRegion(testName)); |
| |
| createSenderPRs(0); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| // start puts in RR_1 in another thread |
| vm4.invoke(() -> WANTestBase.doPuts(testName, 2000)); |
| |
| // verify that all is well in local site. All the events should be present in local region |
| vm4.invoke(() -> WANTestBase.validateRegionSize(testName, 2000)); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1)); |
| |
| // batches distributed: it's quite possible that vm4 has distributed some of the batches. |
| assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1); |
| // batches redistributed: |
| assertTrue(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1); |
| } |
| |
| @Test |
| public void testParallelPropagationWithFilter() { |
| |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| |
| createReceiverPR(vm2, 1); |
| |
| createReceiverInVMs(vm2); |
| |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| |
| createSenderPRs(0); |
| |
| vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, |
| new MyGatewayEventFilter(), true)); |
| vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, |
| new MyGatewayEventFilter(), true)); |
| vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, |
| new MyGatewayEventFilter(), true)); |
| vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, |
| new MyGatewayEventFilter(), true)); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> WANTestBase.doPuts(testName, 1000)); |
| |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 800)); |
| |
| ArrayList<Integer> v4List = |
| (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v5List = |
| (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v6List = |
| (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| ArrayList<Integer> v7List = |
| (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| // queue size: |
| assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); |
| // events received: |
| assertEquals(1000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); |
| // events queued: |
| assertEquals(900, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); |
| // events distributed: |
| assertEquals(800, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); |
| // batches distributed: |
| assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80); |
| // batches redistributed: |
| assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); |
| // events filtered: |
| assertEquals(200, v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)); |
| vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(80, 800, 800)); |
| } |
| |
| @Test |
| public void testParallelPropagationConflation() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| createReceiverInVMs(vm2); |
| |
| createSendersWithConflation(lnPort); |
| |
| createSenderPRs(0); |
| |
| startPausedSenders(); |
| |
| createReceiverPR(vm2, 1); |
| |
| Map keyValues = putKeyValues(); |
| |
| // Verify the conflation indexes map is empty |
| verifyConflationIndexesSize("ln", 0, vm4, vm5, vm6, vm7); |
| |
| final Map updateKeyValues = new HashMap(); |
| for (int i = 0; i < 50; i++) { |
| updateKeyValues.put(i, i + "_updated"); |
| } |
| |
| vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, updateKeyValues)); |
| |
| // Verify the conflation indexes map equals the number of updates |
| verifyConflationIndexesSize("ln", 50, vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> WANTestBase.checkQueueSize("ln", |
| keyValues.size() + updateKeyValues.size() /* creates aren't conflated */)); |
| |
| // Do the puts again. Since these are updates, the previous updates will be conflated. |
| vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, updateKeyValues)); |
| |
| // Verify the conflation indexes map still equals the number of updates |
| verifyConflationIndexesSize("ln", 50, vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> WANTestBase.checkQueueSize("ln", |
| keyValues.size() + updateKeyValues.size() /* creates aren't conflated */)); |
| |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 0)); |
| |
| vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0)); |
| |
| vm4.invoke(() -> WANTestBase.resumeSender("ln")); |
| vm5.invoke(() -> WANTestBase.resumeSender("ln")); |
| vm6.invoke(() -> WANTestBase.resumeSender("ln")); |
| vm7.invoke(() -> WANTestBase.resumeSender("ln")); |
| |
| keyValues.putAll(updateKeyValues); |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, keyValues.size())); |
| |
| vm2.invoke(() -> WANTestBase.validateRegionContents(testName, keyValues)); |
| |
| vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 150, NUM_PUTS)); |
| |
| vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0)); |
| |
| List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); |
| |
| // Verify final stats |
| // 0 -> eventQueueSize |
| // 1 -> eventsReceived |
| // 2 -> eventsQueued |
| // 3 -> eventsDistributed |
| // 4 -> batchesDistributed |
| // 5 -> batchesRedistributed |
| // 7 -> eventsNotQueuedConflated |
| // 9 -> conflationIndexesMapSize |
| assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); |
| assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); |
| assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); |
| assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); |
| assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); |
| assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); |
| assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); |
| assertEquals(0, v4List.get(9) + v5List.get(9) + v6List.get(9) + v7List.get(9)); |
| } |
| |
| @Test |
| public void testConflationWithSameEntryPuts() { |
| // Start locators |
| Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm2.invoke(() -> createFirstRemoteLocator(2, lnPort)); |
| |
| // Configure sending site member |
| String senderId = "ny"; |
| String regionName = this.testName + "_PR"; |
| vm1.invoke(() -> createCache(lnPort)); |
| vm1.invoke(() -> createSender(senderId, 2, true, 100, 10, true, true, null, false)); |
| vm1.invoke(() -> createPartitionedRegion(regionName, senderId, 0, 10, isOffHeap())); |
| |
| // Do puts of the same key |
| int numIterations = 100; |
| vm1.invoke(() -> putSameEntry(regionName, numIterations)); |
| |
| // Wait for appropriate queue size |
| vm1.invoke(() -> checkQueueSize(senderId, 2)); |
| |
| // Verify the conflation indexes size stat |
| verifyConflationIndexesSize(senderId, 1, vm1); |
| |
| // Configure receiving site member |
| vm3.invoke(() -> createCache(nyPort)); |
| vm3.invoke(() -> createReceiver()); |
| vm3.invoke(() -> createPartitionedRegion(regionName, null, 0, 10, isOffHeap())); |
| |
| // Wait for queue to drain |
| vm1.invoke(() -> checkQueueSize(senderId, 0)); |
| |
| // Verify the conflation indexes size stat |
| verifyConflationIndexesSize(senderId, 0, vm1); |
| } |
| |
| |
| @Test |
| public void testPartitionedRegionParallelPropagation_RestartSenders_NoRedundancy() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2); |
| createReceiverInVMs(vm2); |
| |
| createSenders(lnPort); |
| |
| createReceiverPR(vm2, 0); |
| |
| createSenderPRs(0); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| // pause the senders |
| vm4.invoke(() -> WANTestBase.pauseSender("ln")); |
| vm5.invoke(() -> WANTestBase.pauseSender("ln")); |
| vm6.invoke(() -> WANTestBase.pauseSender("ln")); |
| vm7.invoke(() -> WANTestBase.pauseSender("ln")); |
| |
| vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS)); |
| |
| vm4.invoke(() -> WANTestBase.stopSender("ln")); |
| vm5.invoke(() -> WANTestBase.stopSender("ln")); |
| vm6.invoke(() -> WANTestBase.stopSender("ln")); |
| vm7.invoke(() -> WANTestBase.stopSender("ln")); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0)); |
| vm5.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0)); |
| vm6.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0)); |
| vm7.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0)); |
| |
| vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); |
| |
| } |
| |
| @Test |
| public void testMaximumTimeBetweenPingsInGatewayReceiverIsHonored() { |
| Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| vm2.invoke(() -> createServer(nyPort)); |
| |
| // Set a maximum time between pings lower than the time between pings sent at the sender (5000) |
| // so that the receiver will not receive the ping on time and will close the connection. |
| int maximumTimeBetweenPingsInGatewayReceiver = 4000; |
| createReceiverInVMs(maximumTimeBetweenPingsInGatewayReceiver, vm2); |
| createReceiverPR(vm2, 0); |
| |
| int maximumTimeBetweenPingsInServer = 60000; |
| vm4.invoke(() -> createServer(lnPort, maximumTimeBetweenPingsInServer)); |
| vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| createSenderPRInVM(0, vm4); |
| String senderId = "ln"; |
| startSenderInVMs(senderId, vm4); |
| |
| // Send some puts to start the connections from the sender to the receiver |
| vm4.invoke(() -> WANTestBase.doPuts(testName, 2)); |
| |
| // Create client to check if connections are later closed. |
| ClientCache clientCache = new ClientCacheFactory() |
| .addPoolLocator("localhost", lnPort) |
| .setPoolLoadConditioningInterval(-1) |
| .setPoolIdleTimeout(-1) |
| .setPoolPingInterval(5000) |
| .create(); |
| Region clientRegion = |
| clientCache.<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY) |
| .create(testName); |
| for (long i = 0; i < 2; i++) { |
| clientRegion.put(i, "Value_" + i); |
| } |
| |
| // Wait more than maximum-time-between-pings in the gateway receiver so that connections are |
| // closed. |
| try { |
| Thread.sleep(maximumTimeBetweenPingsInGatewayReceiver + 2000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| |
| assertNotEquals(0, (int) vm4.invoke(() -> getGatewaySenderPoolDisconnects(senderId))); |
| assertEquals(0, ((PoolImpl) clientCache.getDefaultPool()).getStats().getDisConnects()); |
| } |
| |
| protected Map putKeyValues() { |
| final Map keyValues = new HashMap(); |
| for (int i = 0; i < NUM_PUTS; i++) { |
| keyValues.put(i, i); |
| } |
| |
| vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, keyValues)); |
| |
| vm4.invoke(() -> WANTestBase.checkQueueSize("ln", keyValues.size())); |
| |
| return keyValues; |
| } |
| |
| protected void createReceiverPR(VM vm, int redundancy) { |
| vm.invoke( |
| () -> WANTestBase.createPartitionedRegion(testName, null, redundancy, 10, isOffHeap())); |
| } |
| |
| protected void createReceiverCustomerOrderShipmentPR(VM vm, int redundancy) { |
| vm.invoke( |
| () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, redundancy, 10, |
| isOffHeap())); |
| } |
| |
| protected void createSenderCustomerOrderShipmentPRs(VM vm, int redundancy) { |
| vm.invoke( |
| () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", redundancy, 10, |
| isOffHeap())); |
| } |
| |
| protected void createSenderPRs(int redundancy) { |
| vm4.invoke( |
| () -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap())); |
| vm5.invoke( |
| () -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap())); |
| vm6.invoke( |
| () -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap())); |
| vm7.invoke( |
| () -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap())); |
| } |
| |
| protected void createSenderPRInVM(int redundancy, VM vm) { |
| vm.invoke( |
| () -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap())); |
| } |
| |
| protected void startPausedSenders() { |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> pauseSender("ln")); |
| vm5.invoke(() -> pauseSender("ln")); |
| vm6.invoke(() -> pauseSender("ln")); |
| vm7.invoke(() -> pauseSender("ln")); |
| } |
| |
| protected void createSendersWithConflation(Integer lnPort) { |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); |
| vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); |
| vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); |
| vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); |
| } |
| |
| protected void createSenderInVm(Integer lnPort, VM vm) { |
| createCacheInVMs(lnPort, vm); |
| vm.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| } |
| |
| protected void createSenders(Integer lnPort, boolean groupTransactionEvents) { |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, |
| groupTransactionEvents)); |
| vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, |
| groupTransactionEvents)); |
| vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, |
| groupTransactionEvents)); |
| vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, |
| groupTransactionEvents)); |
| } |
| |
| protected void createSenders(Integer lnPort) { |
| createSenders(lnPort, false); |
| } |
| |
| private void verifyConflationIndexesSize(String senderId, int expectedSize, VM... vms) { |
| int actualSize = 0; |
| for (VM vm : vms) { |
| List<Integer> stats = vm.invoke(() -> WANTestBase.getSenderStats(senderId, -1)); |
| actualSize += stats.get(9); |
| } |
| assertEquals(expectedSize, actualSize); |
| } |
| |
| private void putSameEntry(String regionName, int numIterations) { |
| // This does one create and numInterations-1 updates |
| Region region = cache.getRegion(regionName); |
| for (int i = 0; i < numIterations; i++) { |
| region.put(0, i); |
| } |
| } |
| } |