blob: 01e05fcbea72c9e57f25b7ba33675dfb033b1ab9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.serial;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.cache30.MyGatewayEventFilter1;
import org.apache.geode.cache30.MyGatewayTransportFilter1;
import org.apache.geode.cache30.MyGatewayTransportFilter2;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.junit.categories.WanTest;
@Category({WanTest.class})
public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
@Test
public void testPrimarySecondaryQueueDrainInOrder_RR() throws Exception {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
vm2.invoke(() -> WANTestBase.createCache(nyPort));
vm3.invoke(() -> WANTestBase.createCache(nyPort));
vm2.invoke(
() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
vm3.invoke(
() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
vm2.invoke(() -> WANTestBase.createReceiver());
vm3.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.createCache(lnPort));
vm5.invoke(() -> WANTestBase.createCache(lnPort));
vm6.invoke(() -> WANTestBase.createCache(lnPort));
vm7.invoke(() -> WANTestBase.createCache(lnPort));
vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers("ln", 2, false, 100, 10, false,
false, null, true, 1, OrderPolicy.KEY));
vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers("ln", 2, false, 100, 10, false,
false, null, true, 1, OrderPolicy.KEY));
startSenderInVMs("ln", vm4, vm5);
vm4.invoke(
() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
vm5.invoke(
() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
vm6.invoke(
() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
vm7.invoke(
() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
vm4.invoke(() -> WANTestBase.addQueueListener("ln", false));
vm5.invoke(() -> WANTestBase.addQueueListener("ln", false));
vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR"));
vm3.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR"));
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
ArrayList<Integer> v5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
// secondary queue size stats in serial queue should be 0
assertEquals(0, v4List.get(10) + v5List.get(10));
HashMap primarySenderUpdates = (HashMap) vm4.invoke(() -> WANTestBase.checkQueue());
HashMap secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
assertEquals(primarySenderUpdates, secondarySenderUpdates);
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
Wait.pause(2000);
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
Wait.pause(2000);
// We should wait till primarySenderUpdates and secondarySenderUpdates become same
// If in 300000ms they don't then throw error.
primarySenderUpdates = (HashMap) vm4.invoke(() -> WANTestBase.checkQueue());
secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
checkPrimarySenderUpdatesOnVM5(primarySenderUpdates);
// assertIndexDetailsEquals(primarySenderUpdates, secondarySenderUpdates);
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
Wait.pause(5000);
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
primarySenderUpdates = (HashMap) vm4.invoke(() -> WANTestBase.checkQueue());
HashMap receiverUpdates = (HashMap) vm2.invoke(() -> WANTestBase.checkQueue());
List destroyList = (List) primarySenderUpdates.get("Destroy");
List createList = (List) receiverUpdates.get("Create");
for (int i = 0; i < 1000; i++) {
assertEquals(destroyList.get(i), createList.get(i));
}
assertEquals(primarySenderUpdates.get("Destroy"), receiverUpdates.get("Create"));
Wait.pause(5000);
// We expect that after this much time secondary would have got batch removal message
// removing all the keys.
secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
assertEquals(secondarySenderUpdates.get("Destroy"), receiverUpdates.get("Create"));
vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
}
protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) {
vm5.invoke(() -> WANTestBase.checkQueueOnSecondary(primarySenderUpdates));
}
@Test
public void testPrimarySecondaryQueueDrainInOrder_PR() throws Exception {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
isOffHeap()));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
isOffHeap()));
vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR"));
vm3.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR"));
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers("ln", 2, false, 100, 10, false,
false, null, true, 1, OrderPolicy.KEY));
vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers("ln", 2, false, 100, 10, false,
false, null, true, 1, OrderPolicy.KEY));
vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
isOffHeap()));
vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
isOffHeap()));
vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
isOffHeap()));
vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
isOffHeap()));
startSenderInVMs("ln", vm4, vm5);
vm4.invoke(() -> WANTestBase.addQueueListener("ln", false));
vm5.invoke(() -> WANTestBase.addQueueListener("ln", false));
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
Wait.pause(5000);
HashMap primarySenderUpdates = (HashMap) vm4.invoke(() -> WANTestBase.checkQueue());
HashMap secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
checkPrimarySenderUpdatesOnVM5(primarySenderUpdates);
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
Wait.pause(4000);
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
Wait.pause(15000);
primarySenderUpdates = (HashMap) vm4.invoke(() -> WANTestBase.checkQueue());
secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
assertEquals(primarySenderUpdates, secondarySenderUpdates);
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
Wait.pause(5000);
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
}
/**
* Test to validate that serial gateway sender queue diskSynchronous attribute when persistence of
* sender is enabled.
*/
@Test
public void test_ValidateSerialGatewaySenderQueueAttributes_1() {
Integer localLocPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer remoteLocPort =
(Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocPort));
WANTestBase test = new WANTestBase();
Properties props = test.getDistributedSystemProperties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "localhost[" + localLocPort + "]");
InternalDistributedSystem ds = test.getSystem(props);
cache = CacheFactory.create(ds);
File directory =
new File("TKSender" + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
directory.mkdir();
File[] dirs1 = new File[] {directory};
DiskStoreFactory dsf = cache.createDiskStoreFactory();
dsf.setDiskDirs(dirs1);
DiskStore diskStore = dsf.create("FORNY");
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
fact.setBatchConflationEnabled(true);
fact.setBatchSize(200);
fact.setBatchTimeInterval(300);
fact.setPersistenceEnabled(true);// enable the persistence
fact.setDiskSynchronous(true);
fact.setDiskStoreName("FORNY");
fact.setMaximumQueueMemory(200);
fact.setAlertThreshold(1200);
GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
fact.addGatewayEventFilter(myEventFilter1);
GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
fact.addGatewayTransportFilter(myStreamFilter1);
GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
fact.addGatewayTransportFilter(myStreamFilter2);
final IgnoredException exTKSender = IgnoredException.addIgnoredException("Could not connect");
try {
GatewaySender sender1 = fact.create("TKSender", 2);
AttributesFactory factory = new AttributesFactory();
factory.addGatewaySenderId(sender1.getId());
factory.setDataPolicy(DataPolicy.PARTITION);
Region region = cache.createRegionFactory(factory.create())
.create("test_ValidateGatewaySenderAttributes");
Set<GatewaySender> senders = cache.getGatewaySenders();
assertEquals(senders.size(), 1);
GatewaySender gatewaySender = senders.iterator().next();
Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender).getQueues();
assertEquals(regionQueues.size(), GatewaySender.DEFAULT_DISPATCHER_THREADS);
RegionQueue regionQueue = regionQueues.iterator().next();
assertEquals(true, regionQueue.getRegion().getAttributes().isDiskSynchronous());
} finally {
exTKSender.remove();
}
}
/**
* Test to validate that serial gateway sender queue diskSynchronous attribute when persistence of
* sender is not enabled.
*/
@Test
public void test_ValidateSerialGatewaySenderQueueAttributes_2() {
Integer localLocPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer remoteLocPort =
(Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocPort));
WANTestBase test = new WANTestBase();
Properties props = test.getDistributedSystemProperties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "localhost[" + localLocPort + "]");
InternalDistributedSystem ds = test.getSystem(props);
cache = CacheFactory.create(ds);
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
fact.setBatchConflationEnabled(true);
fact.setBatchSize(200);
fact.setBatchTimeInterval(300);
fact.setPersistenceEnabled(false);// set persistence to false
fact.setDiskSynchronous(true);
fact.setMaximumQueueMemory(200);
fact.setAlertThreshold(1200);
GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
fact.addGatewayEventFilter(myEventFilter1);
GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
fact.addGatewayTransportFilter(myStreamFilter1);
GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
fact.addGatewayTransportFilter(myStreamFilter2);
final IgnoredException exp = IgnoredException.addIgnoredException("Could not connect");
try {
GatewaySender sender1 = fact.create("TKSender", 2);
AttributesFactory factory = new AttributesFactory();
factory.addGatewaySenderId(sender1.getId());
factory.setDataPolicy(DataPolicy.PARTITION);
Region region = cache.createRegionFactory(factory.create())
.create("test_ValidateGatewaySenderAttributes");
Set<GatewaySender> senders = cache.getGatewaySenders();
assertEquals(senders.size(), 1);
GatewaySender gatewaySender = senders.iterator().next();
Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender).getQueues();
assertEquals(regionQueues.size(), GatewaySender.DEFAULT_DISPATCHER_THREADS);
RegionQueue regionQueue = regionQueues.iterator().next();
assertEquals(false, regionQueue.getRegion().getAttributes().isDiskSynchronous());
} finally {
exp.remove();
}
}
/**
* Test to validate that the maximum number of senders can be created and used successfully.
*/
@Test
public void testCreateMaximumSenders() {
// Create locators
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// Create receiver and region
vm2.invoke(() -> WANTestBase.createCache(nyPort));
vm2.invoke(
() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
vm2.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR"));
// Create maximum number of senders
vm4.invoke(() -> WANTestBase.createCache(lnPort));
StringBuilder builder = new StringBuilder();
long maxSenders = ThreadIdentifier.Bits.GATEWAY_ID.mask() + 1;
for (int i = 0; i < maxSenders; i++) {
String senderId = "ln-" + i;
builder.append(senderId);
if (i + 1 != maxSenders) {
builder.append(',');
}
vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers(senderId, 2, false, 100, 10,
false, false, null, false, 1, OrderPolicy.KEY, 32768));
}
// Create region with the sender ids
vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR",
builder.toString(), isOffHeap()));
// Do puts
int numPuts = 100;
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", numPuts));
// Verify receiver listener events
vm2.invoke(() -> WANTestBase.verifyListenerEvents(maxSenders * numPuts));
}
/**
* Test to validate that the maximum number of senders plus one fails to be created.
*/
@Test
public void testCreateMaximumPlusOneSenders() {
// Create locators
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// Create receiver
vm2.invoke(() -> WANTestBase.createCache(nyPort));
vm2.invoke(
() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
vm2.invoke(() -> WANTestBase.createReceiver());
// Create maximum number of senders
vm4.invoke(() -> WANTestBase.createCache(lnPort));
for (int i = 0; i < ThreadIdentifier.Bits.GATEWAY_ID.mask() + 1; i++) {
String senderId = "ln-" + i;
vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers(senderId, 2, false, 100, 10,
false, false, null, false, 1, OrderPolicy.KEY, 32768));
}
// Attempt to create one more sender
vm4.invoke(() -> SerialGatewaySenderQueueDUnitTest.attemptToCreateGatewaySenderOverLimit());
}
private static void attemptToCreateGatewaySenderOverLimit() {
IgnoredException exp =
IgnoredException.addIgnoredException(IllegalStateException.class.getName());
try {
createSenderWithMultipleDispatchers("ln-one-too-many", 2, false, 100, 10, false, false, null,
false, 1, OrderPolicy.KEY, 32768);
fail("Should not have been able to create gateway sender");
} catch (IllegalStateException e) {
/* ignore expected exception */
} finally {
exp.remove();
}
}
}