blob: 6cd98bee060c9dd6a47d5d1e8859456de9002458 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.misc;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.cache30.MyGatewayTransportFilter1;
import org.apache.geode.cache30.MyGatewayTransportFilter2;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.SenderIdMonitor;
import org.apache.geode.internal.cache.wan.Filter70;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.MyGatewayTransportFilter3;
import org.apache.geode.internal.cache.wan.MyGatewayTransportFilter4;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.WanTest;
@Category({WanTest.class})
public class WanValidationsDUnitTest extends WANTestBase {
public WanValidationsDUnitTest() {
super();
}
/**
 * Verifies that the serial gateway sender ids configured on a replicated (distributed) region
 * must be the same on every member hosting the region.
 *
 * <p>vm4 creates the region with "ln1,ln2" and vm5 with "ln2,ln3". The test passes only if an
 * exception is thrown whose cause is an {@link IllegalStateException} with a message containing
 * "Cannot create Region".
 *
 * <p>TODO: Should this validation still hold true? Discuss. If a DR is defined on two members but
 * the sender is defined on only one of them, how can the sender instance be added to the region
 * on the member that has no sender? The existing validation can be bypassed for a DR with a
 * SerialGatewaySender, but for a PR with a SerialGatewaySender the adjunct message must be sent.
 * Find a way to send the adjunct message to the member on which the SerialGatewaySender is
 * available.
 */
@Test
public void testSameSerialGatewaySenderIdAcrossSameDistributedRegion() throws Exception {
  IgnoredException.addIgnoredException("another cache has the same region defined");
  try {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    // The remote locator is still started; its returned port is not used by this test.
    vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
    createCacheInVMs(lnPort, vm4, vm5);
    vm4.invoke(
        () -> WANTestBase.createSender("ln1", 2, false, 10, 100, false, false, null, true));
    vm4.invoke(
        () -> WANTestBase.createSender("ln2", 2, false, 10, 100, false, false, null, true));
    vm5.invoke(
        () -> WANTestBase.createSender("ln2", 2, false, 10, 100, false, false, null, true));
    vm5.invoke(
        () -> WANTestBase.createSender("ln3", 2, false, 10, 100, false, false, null, true));
    vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln1,ln2",
        isOffHeap()));
    vm5.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln2,ln3",
        isOffHeap()));
    fail("Expected IllegalStateException with incompatible gateway sender ids message");
  } catch (Exception e) {
    Throwable cause = e.getCause();
    // A cause with a null message is treated as "not the expected failure" rather than
    // triggering a NullPointerException that would hide the original exception.
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage() != null
        && cause.getMessage().contains("Cannot create Region");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException with incompatible gateway sender ids message",
          e);
    }
  }
}
/**
 * Validates that a ParallelGatewaySender can be added to a distributed (replicated) region.
 *
 * <p>This test is disabled intentionally: a replicated region with a parallel async event queue
 * is not supported. A test for that case exists as
 * ReplicatedRegion_ParallelWANPropagationDUnitTest#test_DR_PGS_1Nodes_Put_Receiver.
 *
 * <p>Support for this configuration is planned for an upcoming release.
 */
@Ignore("Bug51491")
@Test
public void testParallelGatewaySenderForDistributedRegion() throws Exception {
  try {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    // The remote locator is still started; its returned port is not used by this test.
    vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
    createCacheInVMs(lnPort, vm4, vm5);
    vm4.invoke(
        () -> WANTestBase.createSender("ln1", 2, true, 10, 100, false, false, null, false));
    vm5.invoke(
        () -> WANTestBase.createSender("ln2", 2, true, 10, 100, false, false, null, false));
    // NOTE(review): vm5 defines sender "ln2" but attaches "ln1" to the region below; confirm
    // this is intended before re-enabling the test.
    vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln1",
        isOffHeap()));
    vm5.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln1",
        isOffHeap()));
  } catch (Exception e) {
    // Any exception means the configuration was rejected, which this test treats as a failure.
    Assert.fail("Caught Exception", e);
  }
}
/**
 * Verifies that the serial gateway sender ids attached to a partitioned region must be identical
 * on every member hosting that region.
 *
 * <p>vm4 creates the region with "ln1,ln2" and vm5 with "ln2,ln3". The test passes only if an
 * exception is thrown whose cause is an {@link IllegalStateException} with a message containing
 * "Cannot create Region".
 */
@Test
public void testSameSerialGatewaySenderIdAcrossSamePartitionedRegion() throws Exception {
  IgnoredException.addIgnoredException("another cache has the same region defined");
  try {
    Integer locatorPort =
        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    createCacheInVMs(locatorPort, vm4, vm5);

    // Senders: vm4 gets ln1 + ln2, vm5 gets ln2 + ln3.
    vm4.invoke(
        () -> WANTestBase.createSender("ln1", 2, false, 10, 100, false, false, null, true));
    vm4.invoke(
        () -> WANTestBase.createSender("ln2", 2, false, 10, 100, false, false, null, true));
    vm5.invoke(
        () -> WANTestBase.createSender("ln2", 2, false, 10, 100, false, false, null, true));
    vm5.invoke(
        () -> WANTestBase.createSender("ln3", 2, false, 10, 100, false, false, null, true));

    // Same region name, different sender id sets.
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR",
        "ln1,ln2", 1, 100, isOffHeap()));
    vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR",
        "ln2,ln3", 1, 100, isOffHeap()));

    fail("Expected IllegalStateException with incompatible gateway sender ids message");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains("Cannot create Region");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException with incompatible gateway sender ids message",
          ex);
    }
  }
}
/**
 * Creates the same replicated region on vm4 with async event queue id "ln1" and on vm5 with
 * "ln2". The test passes only if an exception is thrown whose cause is an
 * {@link IllegalStateException} with a message containing "Cannot create Region".
 */
@Test
public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled() {
  IgnoredException.addIgnoredException("another cache has the same region defined");
  try {
    Integer locatorPort =
        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    createCacheInVMs(locatorPort, vm4, vm5);

    vm4.invoke(() -> WANTestBase
        .createReplicatedRegionWithAsyncEventQueue(getTestMethodName() + "_RR", "ln1",
            isOffHeap()));
    vm5.invoke(() -> WANTestBase
        .createReplicatedRegionWithAsyncEventQueue(getTestMethodName() + "_RR", "ln2",
            isOffHeap()));

    fail("Expected IllegalStateException with incompatible gateway sender ids message");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains("Cannot create Region");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException with incompatible gateway sender ids message",
          ex);
    }
  }
}
/**
 * Verifies that the parallel gateway sender ids attached to a partitioned region must be
 * identical on every member hosting that region.
 *
 * <p>vm4 creates the region with "ln1,ln2" and vm5 with "ln2,ln3". The test passes only if an
 * exception is thrown whose cause is an {@link IllegalStateException} with a message containing
 * "Cannot create Region".
 */
@Test
public void testSameParallelGatewaySenderIdAcrossSamePartitionedRegion() throws Exception {
  IgnoredException.addIgnoredException("another cache has the same region defined");
  try {
    Integer locatorPort =
        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    createCacheInVMs(locatorPort, vm4, vm5);

    // Senders: vm4 gets ln1 + ln2, vm5 gets ln2 + ln3.
    vm4.invoke(
        () -> WANTestBase.createSender("ln1", 2, true, 10, 100, false, false, null, true));
    vm4.invoke(
        () -> WANTestBase.createSender("ln2", 2, true, 10, 100, false, false, null, true));
    vm5.invoke(
        () -> WANTestBase.createSender("ln2", 2, true, 10, 100, false, false, null, true));
    vm5.invoke(
        () -> WANTestBase.createSender("ln3", 2, true, 10, 100, false, false, null, true));

    // Same region name, different sender id sets.
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR",
        "ln1,ln2", 1, 100, isOffHeap()));
    vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR",
        "ln2,ln3", 1, 100, isOffHeap()));

    fail("Expected IllegalStateException with incompatible gateway sender ids message");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains("Cannot create Region");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException with incompatible gateway sender ids message",
          ex);
    }
  }
}
/**
 * Creates two different partitioned regions on vm1 that both use the parallel gateway sender
 * ids "ln1_Parallel,ln2_Parallel". Currently ignored.
 *
 * <p>The test passes if both regions are created. It also passes if an exception is thrown
 * whose cause is an {@link IllegalStateException} with a message containing "cannot have the
 * same parallel gateway sender id"; any other exception fails the test.
 */
@Ignore
@Test
public void testSameParallelGatewaySenderIdAcrossDifferentPartitionedRegion() throws Exception {
  try {
    Integer locatorPort =
        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    createCacheInVMs(locatorPort, vm1);

    vm1.invoke(() -> WANTestBase.createSender("ln1_Parallel", 2, true, 10, 100, false, false,
        null, true));
    vm1.invoke(() -> WANTestBase.createSender("ln2_Parallel", 2, true, 10, 100, false, false,
        null, true));

    vm1.invoke(() -> WANTestBase.createPartitionedRegionWithSerialParallelSenderIds(
        getTestMethodName() + "_PR1", null, "ln1_Parallel,ln2_Parallel", null, isOffHeap()));
    vm1.invoke(() -> WANTestBase.createPartitionedRegionWithSerialParallelSenderIds(
        getTestMethodName() + "_PR2", null, "ln1_Parallel,ln2_Parallel", null, isOffHeap()));
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean toleratedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains("cannot have the same parallel gateway sender id");
    if (!toleratedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Creates a partitioned region "_PR1" with parallel sender "ln1_Parallel" and a second region
 * "_PR2", colocated with "_PR1", that uses "ln1_Parallel,ln2_Parallel".
 *
 * <p>This configuration is now supported, so the test no longer requires a failure. If an
 * exception is thrown anyway, it is tolerated only when its cause is an
 * {@link IllegalStateException} with a message containing "should have same parallel gateway
 * sender ids".
 */
@Test
public void testSameParallelGatewaySenderIdAcrossColocatedPartitionedRegion() throws Exception {
  IgnoredException.addIgnoredException("another cache has the same region defined");
  try {
    Integer locatorPort =
        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    createCacheInVMs(locatorPort, vm1);

    vm1.invoke(() -> WANTestBase.createSender("ln1_Parallel", 2, true, 10, 100, false, false,
        null, true));
    vm1.invoke(() -> WANTestBase.createSender("ln2_Parallel", 2, true, 10, 100, false, false,
        null, true));

    vm1.invoke(() -> WANTestBase.createPartitionedRegionWithSerialParallelSenderIds(
        getTestMethodName() + "_PR1", null, "ln1_Parallel", null, isOffHeap()));
    vm1.invoke(() -> WANTestBase.createPartitionedRegionWithSerialParallelSenderIds(
        getTestMethodName() + "_PR2", null, "ln1_Parallel,ln2_Parallel",
        getTestMethodName() + "_PR1", isOffHeap()));
    // Previously a fail(...) followed here; it was removed once this setup became supported.
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean toleratedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains("should have same parallel gateway sender ids");
    if (!toleratedFailure) {
      Assert.fail(
          "Expected IllegalStateException with incompatible gateway sender ids in colocated regions",
          ex);
    }
  }
}
/**
 * Validates that a colocated partitioned region may be created without any parallel gateway
 * sender even though its parent region has one. Any exception fails the test.
 */
@Test
public void testSameParallelGatewaySenderIdAcrossColocatedPartitionedRegion2() throws Exception {
  try {
    Integer locatorPort =
        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    createCacheInVMs(locatorPort, vm1);

    vm1.invoke(() -> WANTestBase.createSender("ln1_Parallel", 2, true, 10, 100, false, false,
        null, true));
    vm1.invoke(() -> WANTestBase.createSender("ln2_Parallel", 2, true, 10, 100, false, false,
        null, true));

    // Parent region has a parallel sender; the colocated child has none.
    vm1.invoke(() -> WANTestBase.createPartitionedRegionWithSerialParallelSenderIds(
        getTestMethodName() + "_PR1", null, "ln1_Parallel", null, isOffHeap()));
    vm1.invoke(() -> WANTestBase.createPartitionedRegionWithSerialParallelSenderIds(
        getTestMethodName() + "_PR2", null, null, getTestMethodName() + "_PR1", isOffHeap()));
  } catch (Exception ex) {
    Assert.fail("The tests caught Exception.", ex);
  }
}
/**
 * Validates that a colocated partitioned region may use a subset of its parent's parallel
 * gateway senders. Any exception fails the test.
 */
@Test
public void testSameParallelGatewaySenderIdAcrossColocatedPartitionedRegion3() throws Exception {
  try {
    Integer locatorPort =
        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    createCacheInVMs(locatorPort, vm1);

    vm1.invoke(() -> WANTestBase.createSender("ln1_Parallel", 2, true, 10, 100, false, false,
        null, true));
    vm1.invoke(() -> WANTestBase.createSender("ln2_Parallel", 2, true, 10, 100, false, false,
        null, true));

    // Parent region uses both senders; the colocated child uses only the first.
    vm1.invoke(() -> WANTestBase.createPartitionedRegionWithSerialParallelSenderIds(
        getTestMethodName() + "_PR1", null, "ln1_Parallel,ln2_Parallel", null, isOffHeap()));
    vm1.invoke(() -> WANTestBase.createPartitionedRegionWithSerialParallelSenderIds(
        getTestMethodName() + "_PR2", null, "ln1_Parallel", getTestMethodName() + "_PR1",
        isOffHeap()));
  } catch (Exception ex) {
    Assert.fail("The tests caught Exception.", ex);
  }
}
/**
 * Exercises a colocated partitioned region that uses a superset of its parent's parallel
 * gateway senders.
 *
 * <p>This configuration is now supported, so the test no longer requires a failure. If an
 * exception is thrown anyway, it is tolerated only when its cause is an
 * {@link IllegalStateException} with a message containing "should have same parallel gateway
 * sender ids".
 */
@Test
public void testSameParallelGatewaySenderIdAcrossColocatedPartitionedRegion4() throws Exception {
  try {
    Integer locatorPort =
        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    createCacheInVMs(locatorPort, vm1);

    vm1.invoke(() -> WANTestBase.createSender("ln1_Parallel", 2, true, 10, 100, false, false,
        null, true));
    vm1.invoke(() -> WANTestBase.createSender("ln2_Parallel", 2, true, 10, 100, false, false,
        null, true));
    vm1.invoke(() -> WANTestBase.createSender("ln3_Parallel", 2, true, 10, 100, false, false,
        null, true));

    // Parent region uses two senders; the colocated child adds a third.
    vm1.invoke(() -> WANTestBase.createPartitionedRegionWithSerialParallelSenderIds(
        getTestMethodName() + "_PR1", null, "ln1_Parallel,ln2_Parallel", null, isOffHeap()));
    vm1.invoke(() -> WANTestBase.createPartitionedRegionWithSerialParallelSenderIds(
        getTestMethodName() + "_PR2", null, "ln1_Parallel,ln2_Parallel,ln3_Parallel",
        getTestMethodName() + "_PR1", isOffHeap()));
    // Previously a fail(...) followed here; it was removed once this setup became supported.
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean toleratedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains("should have same parallel gateway sender ids");
    if (!toleratedFailure) {
      Assert.fail(
          "Expected IllegalStateException with incompatible gateway sender ids in colocated regions",
          ex);
    }
  }
}
/**
 * Verifies that a serial and a parallel gateway sender with the same id cannot both be created
 * in one cache: the second creation on vm1 must fail with an exception whose cause is an
 * {@link IllegalStateException} with a message containing "is already defined in this cache".
 */
@Test
public void testSerialGatewaySenderAndParallelGatewaySenderWithSameName() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1);

  // First sender "ln" is serial.
  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
      null, null, true, false));
  try {
    // Second sender "ln" in the same cache is parallel.
    vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, true, 100, false, false,
        null, null, true, false));
    fail("Expected IllegalStateException : Sender names should be different.");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains("is already defined in this cache");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Verifies that a sender with the same id must use the same remote distributed system id on
 * every member: vm1 creates "ln" with remote ds id 2, vm2 with 3, and the second creation must
 * fail.
 */
@Test
public void testSameRemoteDSAcrossSameSender() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
      null, null, true, false));
  try {
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 3, false, 100, false, false,
        null, null, true, false));
    fail("Expected IllegalStateException : Remote Ds Ids should match");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with remote ds id");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * A sender id must be either serial or parallel across members, not both: vm1 creates "ln" as a
 * serial sender, so creating "ln" as a parallel sender on vm2 must fail.
 */
@Test
public void testSerialSenderOnBothCache() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
      null, null, true, false));
  try {
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, true, 100, false, false,
        null, null, true, false));
    fail("Expected IllegalStateException : is not serial Gateway Sender");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage()
            .contains("because another cache has the same sender as serial gateway sender");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * A sender id must be either serial or parallel across members, not both: vm1 creates "ln" as a
 * parallel sender, so creating "ln" as a serial sender on vm2 must fail.
 */
@Test
public void testParallelSenderOnBothCache() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, true, 100, false, false,
      null, null, true, false));
  try {
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
        null, null, true, false));
    fail("Expected IllegalStateException : is not parallel Gateway Sender");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage()
            .contains("because another cache has the same sender as parallel gateway sender");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Verifies that isBatchConflation must match across members for the same sender id: vm1 creates
 * "ln" with it disabled, so creating "ln" with it enabled on vm2 must fail.
 */
@Test
public void testBatchConflation() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
      null, null, true, false));
  try {
    // Only the batch conflation flag differs from the sender on vm1.
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, true, false,
        null, null, true, false));
    fail("Expected IllegalStateException : isBatchConflation Should match");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains(
            "another cache has the same Gateway Sender defined with isBatchConflationEnabled");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Verifies that isPersistentEnabled must match across members for the same sender id: vm1
 * creates "ln" with it disabled, so creating "ln" with it enabled on vm2 must fail.
 */
@Test
public void testisPersistentEnabled() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
      null, null, true, false));
  try {
    // Only the persistence flag differs from the sender on vm1.
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, true,
        null, null, true, false));
    fail("Expected IllegalStateException : isPersistentEnabled Should match");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with isPersistentEnabled");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Verifies that the alert threshold must match across members for the same sender id: vm1
 * creates "ln" with 100, so creating "ln" with 50 on vm2 must fail.
 */
@Test
public void testAlertThreshold() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
      null, null, true, false));
  try {
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 50, false, false,
        null, null, true, false));
    fail("Expected IllegalStateException : alertThreshold Should match");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with alertThreshold");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Verifies that the manual-start setting must match across members for the same sender id: vm1
 * creates "ln" with it set to true, so creating "ln" with false on vm2 must fail.
 */
@Test
public void testManualStart() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
      null, null, true, false));
  try {
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
        null, null, false, false));
    fail("Expected IllegalStateException : manualStart Should match");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with manual start");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Verifies that the gateway event filters must match across members for the same sender id:
 * vm1 creates "ln" with a {@code MyGatewayEventFilter}, so creating "ln" with a {@link Filter70}
 * on vm2 must fail.
 */
@Test
public void testGatewayEventFilters() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  ArrayList<GatewayEventFilter> filters = new ArrayList<>();
  filters.add(new MyGatewayEventFilter());
  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
      filters, null, true, false));
  try {
    // Reuse the same list with different content for the second member.
    filters.clear();
    filters.add(new Filter70());
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
        filters, null, true, false));
    fail("Expected IllegalStateException : GatewayEventFilters Should match");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with GatewayEventFilters");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Verifies that the gateway event filters must match across members even when one member's
 * filters are a superset of the other's: vm1 creates "ln" with a {@code MyGatewayEventFilter},
 * vm2 with a {@code MyGatewayEventFilter} plus a {@link Filter70}, and the second creation must
 * fail.
 */
@Test
public void testGatewayEventFilters2() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  ArrayList<GatewayEventFilter> filters = new ArrayList<>();
  filters.add(new MyGatewayEventFilter());
  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
      filters, null, true, false));
  try {
    // Reuse the same list with different content for the second member.
    filters.clear();
    filters.add(new MyGatewayEventFilter());
    filters.add(new Filter70());
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
        filters, null, true, false));
    fail("Expected IllegalStateException : GatewayEventFilters Should match");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with GatewayEventFilters");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Verifies that the gateway transport filters must match across members for the same sender id:
 * vm1 creates "ln" with transport filters 1 and 2, so creating "ln" with transport filters 3 and
 * 4 on vm2 must fail with an exception whose cause is an {@link IllegalStateException}
 * mentioning GatewayTransportFilters.
 */
@Test
public void testGatewayTransportFilters() {
  Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(lnPort, vm1, vm2);
  ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>();
  transportFilters.add(new MyGatewayTransportFilter1());
  transportFilters.add(new MyGatewayTransportFilter2());
  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false, null,
      transportFilters, true, false));
  try {
    // Reuse the same list with different content for the second member.
    transportFilters.clear();
    transportFilters.add(new MyGatewayTransportFilter3());
    transportFilters.add(new MyGatewayTransportFilter4());
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
        null, transportFilters, true, false));
    // The message names transport filters, which is what this test actually checks.
    fail("Expected IllegalStateException : GatewayTransportFilters Should match");
  } catch (Exception e) {
    Throwable cause = e.getCause();
    // A cause with a null message is treated as "not the expected failure" rather than
    // triggering a NullPointerException that would hide the original exception.
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage() != null
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with GatewayTransportFilters");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", e);
    }
  }
}
/**
 * Verifies that the order of gateway transport filters must match across members for the same
 * sender id: vm1 creates "ln" with filters in the order 1, 2, so creating "ln" with the order
 * 2, 1 on vm2 must fail with an exception whose cause is an {@link IllegalStateException}
 * mentioning GatewayTransportFilters.
 */
@Test
public void testGatewayTransportFiltersOrder() {
  Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(lnPort, vm1, vm2);
  ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>();
  transportFilters.add(new MyGatewayTransportFilter1());
  transportFilters.add(new MyGatewayTransportFilter2());
  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false, null,
      transportFilters, true, false));
  try {
    // Same two filters as on vm1, but in reversed order.
    transportFilters.clear();
    transportFilters.add(new MyGatewayTransportFilter2());
    transportFilters.add(new MyGatewayTransportFilter1());
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
        null, transportFilters, true, false));
    // The message names transport filters, which is what this test actually checks.
    fail("Expected IllegalStateException : GatewayTransportFilters Should match");
  } catch (Exception e) {
    Throwable cause = e.getCause();
    // A cause with a null message is treated as "not the expected failure" rather than
    // triggering a NullPointerException that would hide the original exception.
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage() != null
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with GatewayTransportFilters");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", e);
    }
  }
}
/**
 * Verifies that isDiskSynchronous must match across members for the same sender id: vm1 creates
 * "ln" with it set to false, so creating "ln" with true on vm2 must fail.
 */
@Test
public void testIsDiskSynchronous() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  vm1.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
      null, null, true, false));
  try {
    vm2.invoke(() -> WANTestBase.createSenderForValidations("ln", 2, false, 100, false, false,
        null, null, true, true));
    fail("Expected IllegalStateException : isDiskSynchronous Should match");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with isDiskSynchronous");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Regression test for defect #44372. A single VM (vm4) hosts both a cache server and a
 * receiver; Cache.getCacheServers is expected to report only the cache server, not the
 * receiver.
 */
@Test
public void test_GetCacheServersDoesNotReturnReceivers() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm4);
  vm4.invoke(() -> WANTestBase.createReceiver());
  vm4.invoke(() -> WANTestBase.createCacheServer());

  Map serverCounts = (Map) vm4.invoke(() -> WANTestBase.getCacheServers());
  Object bridgeServerCount = serverCounts.get("BridgeServer");
  Object receiverServerCount = serverCounts.get("ReceiverServer");

  assertEquals("Cache.getCacheServers returned incorrect BridgeServers: ", 1,
      bridgeServerCount);
  assertEquals("Cache.getCacheServers returned incorrect ReceiverServers: ", 0,
      receiverServerCount);
}
/**
 * Regression test for defect #44372. Two VMs are part of the distributed system: vm4 hosts a
 * receiver and vm5 hosts a cache server. Cache.getCacheServers is expected to report only the
 * bridge server (on vm5) and never the receiver.
 */
@Test
public void test_GetCacheServersDoesNotReturnReceivers_Scenario2() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm4);
  vm4.invoke(() -> WANTestBase.createReceiver());
  createCacheInVMs(locatorPort, vm5);
  vm5.invoke(() -> WANTestBase.createCacheServer());

  Map countsOnVm4 = (Map) vm4.invoke(() -> WANTestBase.getCacheServers());
  Map countsOnVm5 = (Map) vm5.invoke(() -> WANTestBase.getCacheServers());

  // vm4 hosts only the receiver, so both counts are expected to be zero there.
  assertEquals("Cache.getCacheServers on vm4 returned incorrect BridgeServers: ", 0,
      countsOnVm4.get("BridgeServer"));
  assertEquals("Cache.getCacheServers on vm4 returned incorrect ReceiverServers: ", 0,
      countsOnVm4.get("ReceiverServer"));

  // vm5 hosts the cache server.
  assertEquals("Cache.getCacheServers on vm5 returned incorrect BridgeServers: ", 1,
      countsOnVm5.get("BridgeServer"));
  assertEquals("Cache.getCacheServers on vm5 returned incorrect ReceiverServers: ", 0,
      countsOnVm5.get("ReceiverServer"));
}
/**
 * Checks that the number of dispatcher threads must be the same on all members for a parallel
 * gateway sender: vm1 creates "ln" with 5 dispatcher threads and vm2 with 4.
 *
 * <p>Ignored because the number of dispatcher threads for a parallel sender is now allowed to
 * differ between machines.
 */
@Ignore
@Test
public void testDispatcherThreadsForParallelGatewaySender() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  vm1.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, true, 100, 10, false, false,
      null, true, 5, OrderPolicy.KEY));
  try {
    // Only the dispatcher thread count differs from the sender on vm1.
    vm2.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, true, 100, 10, false, false,
        null, true, 4, OrderPolicy.KEY));
    fail("Expected IllegalStateException : dispatcher threads Should match");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with dispatcherThread");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Checks that the order policy must be the same on all members for a parallel gateway sender:
 * vm1 creates "ln" with {@code OrderPolicy.KEY} and vm2 with {@code OrderPolicy.PARTITION}.
 *
 * <p>Ignored: for a parallel sender the thread policy is not supported, which is checked when
 * the sender is created, and the KEY and PARTITION policies are the same for a parallel gateway
 * sender.
 */
@Ignore
@Test
public void testOrderPolicyForParallelGatewaySender() {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm1, vm2);

  vm1.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, true, 100, 10, false, false,
      null, true, 5, OrderPolicy.KEY));
  try {
    // Only the order policy differs from the sender on vm1.
    vm2.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, true, 100, 10, false, false,
        null, true, 5, OrderPolicy.PARTITION));
    fail("Expected IllegalStateException : order policy Should match");
  } catch (Exception ex) {
    Throwable cause = ex.getCause();
    boolean expectedFailure = cause instanceof IllegalStateException
        && cause.getMessage().contains(
            "because another cache has the same Gateway Sender defined with orderPolicy");
    if (!expectedFailure) {
      Assert.fail("Expected IllegalStateException", ex);
    }
  }
}
/**
 * Checks the gateway-sender-id consistency warning flag for a replicated region.
 *
 * <p>The sender "ln" is first added to the region on vm4 and vm5 only. No warning is expected
 * before any puts, a warning is expected on vm4-vm7 after puts from vm4, and no warning is
 * expected once the sender has also been added on vm6 and vm7 and more puts were done.
 */
@Test
public void test_RR_Serial_warnAboutGatewaySenderIdsConsistency() throws Exception {
  Integer localLocatorPort =
      (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  Integer remoteLocatorPort =
      (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));

  // Remote site: one member with the region and a receiver.
  createCacheInVMs(remoteLocatorPort, vm2);
  vm2.invoke(createReceiverReplicatedRegion());
  vm2.invoke(() -> WANTestBase.createReceiver());

  // Local site: four members; only vm4 defines and starts the sender.
  createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
  vm4.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
  vm4.invoke(() -> WANTestBase.startSender("ln"));
  for (VM member : new VM[] {vm4, vm5, vm6, vm7}) {
    member.invoke(createReceiverReplicatedRegion());
  }

  String regionName = getTestMethodName() + "_RR";

  // Attach the sender id on only two of the four members.
  vm4.invoke(() -> WANTestBase.addSenderThroughAttributesMutator(regionName, "ln"));
  vm5.invoke(() -> WANTestBase.addSenderThroughAttributesMutator(regionName, "ln"));
  verifyNoGatewaySenderIdWarning(regionName);

  vm4.invoke(() -> WANTestBase.doPuts(regionName, 10));
  verifyGatewaySenderIdWarning(regionName);

  // Attach the sender id on the remaining members so all four are consistent.
  vm6.invoke(() -> WANTestBase.addSenderThroughAttributesMutator(regionName, "ln"));
  vm7.invoke(() -> WANTestBase.addSenderThroughAttributesMutator(regionName, "ln"));
  vm4.invoke(() -> WANTestBase.doPuts(regionName, 10));
  verifyNoGatewaySenderIdWarning(regionName);
}
/** Asserts on vm4-vm7 that the gateway-sender-ids-differ warning flag is set for the region. */
private void verifyGatewaySenderIdWarning(String regionName) {
  final boolean warningExpected = true;
  final boolean checkGatewaySenderIds = true;
  verifyIdConsistencyWarningOnVms(regionName, warningExpected, checkGatewaySenderIds);
}
/** Asserts on vm4-vm7 that the async-queue-ids-differ warning flag is not set for the region. */
private void verifyNoAsyncEventQueueIdWarning(String regionName) {
  final boolean warningExpected = false;
  final boolean checkGatewaySenderIds = false;
  verifyIdConsistencyWarningOnVms(regionName, warningExpected, checkGatewaySenderIds);
}
/** Asserts on vm4-vm7 that the async-queue-ids-differ warning flag is set for the region. */
private void verifyAsyncEventQueueIdWarning(String regionName) {
  final boolean warningExpected = true;
  final boolean checkGatewaySenderIds = false;
  verifyIdConsistencyWarningOnVms(regionName, warningExpected, checkGatewaySenderIds);
}
/** Asserts on vm4..vm7 that the gateway sender ids warning flag is clear for the given region. */
private void verifyNoGatewaySenderIdWarning(String regionName) {
  final boolean warningExpected = false;
  final boolean checkGatewaySenderIds = true;
  verifyIdConsistencyWarningOnVms(regionName, warningExpected, checkGatewaySenderIds);
}
/**
 * Runs {@link #verifyIdConsistencyWarning(String, boolean, boolean)} in vm4, vm5, vm6 and vm7,
 * in that order.
 *
 * @param regionName name of the region (without leading separator) whose monitor is inspected
 * @param expected the flag value every member must report
 * @param gatewaySenderId {@code true} to check the gateway sender flag, {@code false} to check
 *        the async queue flag
 */
private void verifyIdConsistencyWarningOnVms(String regionName, boolean expected,
    boolean gatewaySenderId) {
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(
      member -> member.invoke(
          () -> verifyIdConsistencyWarning(regionName, expected, gatewaySenderId)));
}
/**
 * Looks up the region in the local cache and asserts the value of one of the two "ids differ"
 * flags held by its {@link SenderIdMonitor}.
 *
 * @param regionName name of the region, without the leading separator
 * @param expected the value the selected flag must have
 * @param gatewaySenderId {@code true} selects the gateway sender flag, {@code false} selects the
 *        async queue flag
 */
private void verifyIdConsistencyWarning(String regionName, boolean expected,
    boolean gatewaySenderId) {
  Region region = cache.getRegion(Region.SEPARATOR + regionName);
  SenderIdMonitor monitor = getSenderIdMonitor(region);
  if (!gatewaySenderId) {
    assertThat(monitor.getAsyncQueueIdsDifferWarningMessage()).isEqualTo(expected);
    return;
  }
  assertThat(monitor.getGatewaySenderIdsDifferWarningMessage()).isEqualTo(expected);
}
/**
 * Returns the {@link SenderIdMonitor} of a distributed or partitioned region.
 *
 * @param r the region to inspect
 * @return the monitor obtained from the region
 * @throws IllegalStateException if {@code r} is {@code null} or is neither a
 *         {@link DistributedRegion} nor a {@link PartitionedRegion}
 */
private SenderIdMonitor getSenderIdMonitor(Region r) {
  // A missing region used to fall through to r.getClass() and surface as a bare
  // NullPointerException; report it explicitly instead.
  if (r == null) {
    throw new IllegalStateException(
        "expected region to be distributed or partitioned but it was null (region not found)");
  }
  if (r instanceof DistributedRegion) {
    return ((DistributedRegion) r).getSenderIdMonitor();
  }
  if (r instanceof PartitionedRegion) {
    return ((PartitionedRegion) r).getSenderIdMonitor();
  }
  throw new IllegalStateException(
      "expected region to be distributed or partitioned but it was: " + r.getClass());
}
/**
 * Exercises the async event queue id consistency flag for a replicated region.
 *
 * <p>
 * Queue "ln" is created on vm4..vm7 but attached to the region on vm4 and vm5 only. The flag is
 * asserted to be clear before any puts, set after puts from vm4, and clear again once vm6 and
 * vm7 attach the queue and more puts are done.
 */
@Test
public void test_RR_SerialAsyncEventQueue_warnAboutAsyncEventQueueIdsConsistency()
    throws Exception {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm4, vm5, vm6, vm7);

  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.createAsyncEventQueue("ln", false, 100, 100, false, false, null, false)));
  Arrays.asList(vm4, vm5, vm6, vm7)
      .forEach(member -> member.invoke(createReceiverReplicatedRegion()));

  String regionName = getTestMethodName() + "_RR";

  // inconsistent configuration: only vm4 and vm5 attach the queue to the region
  Arrays.asList(vm4, vm5).forEach(member -> member
      .invoke(() -> WANTestBase.addAsyncEventQueueThroughAttributesMutator(regionName, "ln")));
  verifyNoAsyncEventQueueIdWarning(regionName);
  vm4.invoke(() -> WANTestBase.doPuts(regionName, 1000));
  verifyAsyncEventQueueIdWarning(regionName);

  // attach the queue on the remaining members; new puts must clear the flag
  Arrays.asList(vm6, vm7).forEach(member -> member
      .invoke(() -> WANTestBase.addAsyncEventQueueThroughAttributesMutator(regionName, "ln")));
  vm4.invoke(() -> WANTestBase.doPuts(regionName, 1000));
  verifyNoAsyncEventQueueIdWarning(regionName);
}
/**
 * Builds a runnable that creates the replicated region named {@code <testMethodName>_RR} with no
 * sender ids. The region name and the off-heap setting are evaluated when the runnable executes.
 *
 * @return the runnable to pass to {@code VM.invoke}
 */
protected SerializableRunnableIF createReceiverReplicatedRegion() {
  return () -> {
    String replicatedRegionName = getTestMethodName() + "_RR";
    WANTestBase.createReplicatedRegion(replicatedRegionName, null, isOffHeap());
  };
}
/**
 * Attaches serial sender "ln" to a replicated region on vm4..vm7 through the attributes mutator
 * and checks that 10 puts done on vm4 arrive in the region on the remote member vm2.
 */
@Test
public void testBug50434_RR_Serial_Pass() throws Exception {
  Integer localLocatorPort =
      (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  Integer remoteLocatorPort =
      (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));

  // remote site
  createCacheInVMs(remoteLocatorPort, vm2);
  vm2.invoke(createReceiverReplicatedRegion());
  vm2.invoke(() -> WANTestBase.createReceiver());

  // local site: the sender itself exists on vm4 only
  createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
  vm4.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
  vm4.invoke(() -> WANTestBase.startSender("ln"));
  Arrays.asList(vm4, vm5, vm6, vm7)
      .forEach(member -> member.invoke(createReceiverReplicatedRegion()));

  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.addSenderThroughAttributesMutator(getTestMethodName() + "_RR", "ln")));

  vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
  vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10));
  vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 10));
}
/**
 * Attaches async event queue "ln" to a replicated region on vm4..vm7 through the attributes
 * mutator, does 1000 puts on vm4, and validates the listener with 1000 on vm4 and 0 on
 * vm5..vm7.
 */
@Test
public void testBug50434_RR_SerialAsyncEventQueue_Pass() throws Exception {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm4, vm5, vm6, vm7);

  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.createAsyncEventQueue("ln", false, 100, 100, false, false, null, false)));
  Arrays.asList(vm4, vm5, vm6, vm7)
      .forEach(member -> member.invoke(createReceiverReplicatedRegion()));
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(() -> WANTestBase
      .addAsyncEventQueueThroughAttributesMutator(getTestMethodName() + "_RR", "ln")));

  vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));

  // vm4 is the primary sender; the others are secondaries
  vm4.invoke(() -> WANTestBase.validateAsyncEventListener("ln", 1000));
  Arrays.asList(vm5, vm6, vm7).forEach(
      secondary -> secondary.invoke(() -> WANTestBase.validateAsyncEventListener("ln", 0)));
}
/**
 * Exercises the gateway sender id consistency flag for a partitioned region and a serial sender.
 *
 * <p>
 * Sender "ln" is created and started on vm4..vm7 but attached to the region on vm4 and vm5 only.
 * The flag is asserted to be clear before any puts, set on vm4..vm7 after puts from vm4, and
 * clear again once vm6 and vm7 attach the sender id and more puts are done.
 */
@Test
public void test_PR_Serial_warnAboutGatewaySenderIdsConsistency() throws Exception {
  Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
  String regionName = getTestMethodName() + "_PR";

  createCacheInVMs(nyPort, vm2);
  vm2.invoke(() -> WANTestBase.createReceiver());
  // The receiving site must host the region under the same name as the sending site; it was
  // previously created with an "_RR" suffix, leaving replicated events without a target region.
  vm2.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 100, isOffHeap()));

  createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
  vm4.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
  vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
  vm6.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
  vm7.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
  startSenderInVMs("ln", vm4, vm5, vm6, vm7);

  vm4.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 3, 100, isOffHeap()));
  vm5.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 3, 100, isOffHeap()));
  vm6.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 3, 100, isOffHeap()));
  vm7.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 3, 100, isOffHeap()));

  // inconsistent configuration: only vm4 and vm5 attach the sender id to the region
  vm4.invoke(() -> WANTestBase.addSenderThroughAttributesMutator(regionName, "ln"));
  vm5.invoke(() -> WANTestBase.addSenderThroughAttributesMutator(regionName, "ln"));
  vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
  verifyNoGatewaySenderIdWarning(regionName);
  vm4.invoke(() -> WANTestBase.doPuts(regionName, 10));
  verifyGatewaySenderIdWarning(regionName);

  // now add the sender on the other vms so they will be consistent
  vm6.invoke(() -> WANTestBase.addSenderThroughAttributesMutator(regionName, "ln"));
  vm7.invoke(() -> WANTestBase.addSenderThroughAttributesMutator(regionName, "ln"));
  vm4.invoke(() -> WANTestBase.doPuts(regionName, 10));
  verifyNoGatewaySenderIdWarning(regionName);
}
/**
 * Exercises the async event queue id consistency flag for a partitioned region and a serial
 * queue.
 *
 * <p>
 * Queue "ln" is created on vm4..vm7 but attached to the region on vm4 and vm5 only. The flag is
 * asserted to be clear before any puts, set after puts from vm4, and clear again once vm6 and
 * vm7 attach the queue and more puts are done.
 */
@Test
public void test_PR_SerialAsyncEventQueue_warnAboutAsyncEventQueueIdsConsistency()
    throws Exception {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm4, vm5, vm6, vm7);

  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.createAsyncEventQueue("ln", false, 100, 100, false, false, null, false)));

  String regionName = getTestMethodName() + "_PR";
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.createPartitionedRegion(regionName, null, 3, 100, isOffHeap())));

  // inconsistent configuration: only vm4 and vm5 attach the queue to the region
  Arrays.asList(vm4, vm5).forEach(member -> member
      .invoke(() -> WANTestBase.addAsyncEventQueueThroughAttributesMutator(regionName, "ln")));
  verifyNoAsyncEventQueueIdWarning(regionName);
  vm4.invoke(() -> WANTestBase.doPuts(regionName, 1000));
  verifyAsyncEventQueueIdWarning(regionName);

  // attach the queue on the remaining members; new puts must clear the flag
  Arrays.asList(vm6, vm7).forEach(member -> member
      .invoke(() -> WANTestBase.addAsyncEventQueueThroughAttributesMutator(regionName, "ln")));
  vm4.invoke(() -> WANTestBase.doPuts(regionName, 1000));
  verifyNoAsyncEventQueueIdWarning(regionName);
}
/**
 * Attaches serial sender "ln" to a partitioned region on vm4..vm7 through the attributes mutator
 * and checks that, after 10 puts on vm4, the sender queue on vm4 validates with 0 entries and
 * the region on the remote member vm2 holds 10 entries.
 */
@Test
public void testBug50434_PR_Serial_Pass() throws Exception {
  Integer localLocatorPort =
      (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  Integer remoteLocatorPort =
      (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));

  // remote site
  createCacheInVMs(remoteLocatorPort, vm2);
  vm2.invoke(() -> WANTestBase.createReceiver());
  vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
      isOffHeap()));

  // local site
  createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)));
  startSenderInVMs("ln", vm4, vm5, vm6, vm7);

  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(() -> WANTestBase
      .createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100, isOffHeap())));
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.addSenderThroughAttributesMutator(getTestMethodName() + "_PR", "ln")));

  vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
  vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 10));
  vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 0));
  vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10));
}
/**
 * Attaches serial async event queue "ln" to a partitioned region on vm4..vm7 through the
 * attributes mutator, does 1000 puts on vm4, and validates the listener with 1000 on vm4 and 0
 * on vm5..vm7.
 */
@Test
public void testBug50434_PR_SerialAsyncEventQueue_Pass() throws Exception {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm4, vm5, vm6, vm7);

  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.createAsyncEventQueue("ln", false, 100, 100, false, false, null, false)));
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(() -> WANTestBase
      .createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100, isOffHeap())));
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(() -> WANTestBase
      .addAsyncEventQueueThroughAttributesMutator(getTestMethodName() + "_PR", "ln")));

  vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));

  // vm4 is the primary sender; the others are secondaries
  vm4.invoke(() -> WANTestBase.validateAsyncEventListener("ln", 1000));
  Arrays.asList(vm5, vm6, vm7).forEach(
      secondary -> secondary.invoke(() -> WANTestBase.validateAsyncEventListener("ln", 0)));
}
/**
 * Exercises the gateway sender id consistency flag for a partitioned region and a parallel
 * sender.
 *
 * <p>
 * Sender "ln" is created and started on vm4 and vm5 and attached to the region on those two
 * members only. The flag is asserted to be clear before any puts, set on vm4..vm7 after puts
 * from vm4, and clear again once vm6 and vm7 add the sender id and more puts are done.
 */
@Test
public void test_PR_Parallel_warnAboutGatewaySenderIdsConsistency() throws Exception {
  Integer localLocatorPort =
      (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  Integer remoteLocatorPort =
      (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));

  // remote site
  createCacheInVMs(remoteLocatorPort, vm2);
  vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 10,
      isOffHeap()));
  vm2.invoke(() -> WANTestBase.createReceiver());

  // local site: the parallel sender exists on vm4 and vm5 only
  createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
  Arrays.asList(vm4, vm5).forEach(member -> member.invoke(
      () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)));
  startSenderInVMs("ln", vm4, vm5);

  String regionName = getTestMethodName() + "_PR";
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.createPartitionedRegion(regionName, null, 3, 10, isOffHeap())));

  // inconsistent configuration: only vm4 and vm5 attach the sender id to the region
  Arrays.asList(vm4, vm5).forEach(member -> member
      .invoke(() -> WANTestBase.addSenderThroughAttributesMutator(regionName, "ln")));
  vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
  verifyNoGatewaySenderIdWarning(regionName);
  vm4.invoke(() -> WANTestBase.doPuts(regionName, 10));
  verifyGatewaySenderIdWarning(regionName);

  // add the sender id on the remaining members; new puts must clear the flag
  Arrays.asList(vm6, vm7).forEach(member -> member
      .invoke(() -> WANTestBase.addSenderThroughAttributesMutator(regionName, "ln")));
  vm4.invoke(() -> WANTestBase.doPuts(regionName, 10));
  verifyNoGatewaySenderIdWarning(regionName);
}
/**
 * Exercises the async event queue id consistency flag for a partitioned region and a parallel
 * queue.
 *
 * <p>
 * Queue "ln" is created on vm4..vm7 but attached to the region on vm4 and vm5 only. The flag is
 * asserted to be clear before any puts and set on vm4..vm7 after puts from vm4.
 */
@Test
public void test_PR_ParallelAsyncEventQueue_warnAboutAsyncEventQueueIdsConsistency()
    throws Exception {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm4, vm5, vm6, vm7);

  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.createAsyncEventQueue("ln", true, 100, 100, false, false, null, false)));

  String regionName = getTestMethodName() + "_PR";
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.createPartitionedRegion(regionName, null, 3, 10, isOffHeap())));

  // inconsistent configuration: only vm4 and vm5 attach the queue to the region
  Arrays.asList(vm4, vm5).forEach(member -> member
      .invoke(() -> WANTestBase.addAsyncEventQueueThroughAttributesMutator(regionName, "ln")));
  verifyNoAsyncEventQueueIdWarning(regionName);
  vm4.invoke(() -> WANTestBase.doPuts(regionName, 10));
  verifyAsyncEventQueueIdWarning(regionName);
}
/**
 * Creates parallel sender "ln" on vm4..vm7, attaches it to an existing partitioned region
 * through the attributes mutator after the senders are started, and checks that 10 puts done on
 * vm4 arrive in the region on the remote member vm2.
 */
@Test
public void whenSendersAreAddedUsingAttributesMutatorThenEventsMustBeSuccessfullyReceviedByRemoteSite()
    throws Exception {
  Integer localLocatorPort =
      (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  Integer remoteLocatorPort =
      (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));

  // remote site
  createCacheInVMs(remoteLocatorPort, vm2);
  vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 10,
      isOffHeap()));
  vm2.invoke(() -> WANTestBase.createReceiver());

  // local site: senders and regions are created first, senders are started afterwards
  createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)));
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(() -> WANTestBase
      .createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 10, isOffHeap())));
  startSenderInVMs("ln", vm4, vm5, vm6, vm7);

  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
      () -> WANTestBase.addSenderThroughAttributesMutator(getTestMethodName() + "_PR", "ln")));

  vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 10));
  vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10));
}
/**
 * Attaches parallel async event queue "ln" to a partitioned region on vm4..vm7 through the
 * attributes mutator, does 256 puts on vm4, waits for every member's queue to drain, and checks
 * that the listener map sizes of the four members add up to 256.
 */
@Test
public void testBug50434_PR_ParallelAsyncEventQueue_Pass() throws Exception {
  Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
  vm4.invoke(
      () -> WANTestBase.createAsyncEventQueue("ln", true, 100, 100, false, false, null, false));
  vm5.invoke(
      () -> WANTestBase.createAsyncEventQueue("ln", true, 100, 100, false, false, null, false));
  vm6.invoke(
      () -> WANTestBase.createAsyncEventQueue("ln", true, 100, 100, false, false, null, false));
  vm7.invoke(
      () -> WANTestBase.createAsyncEventQueue("ln", true, 100, 100, false, false, null, false));
  vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 10,
      isOffHeap()));
  vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 10,
      isOffHeap()));
  vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 10,
      isOffHeap()));
  vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 10,
      isOffHeap()));
  vm4.invoke(() -> WANTestBase
      .addAsyncEventQueueThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
  vm5.invoke(() -> WANTestBase
      .addAsyncEventQueueThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
  vm6.invoke(() -> WANTestBase
      .addAsyncEventQueueThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
  vm7.invoke(() -> WANTestBase
      .addAsyncEventQueueThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
  vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 256));
  vm4.invoke(() -> WANTestBase.waitForAsyncQueueToGetEmpty("ln"));
  vm5.invoke(() -> WANTestBase.waitForAsyncQueueToGetEmpty("ln"));
  vm6.invoke(() -> WANTestBase.waitForAsyncQueueToGetEmpty("ln"));
  vm7.invoke(() -> WANTestBase.waitForAsyncQueueToGetEmpty("ln"));
  int vm4size = (Integer) vm4.invoke(() -> WANTestBase.getAsyncEventListenerMapSize("ln"));
  int vm5size = (Integer) vm5.invoke(() -> WANTestBase.getAsyncEventListenerMapSize("ln"));
  int vm6size = (Integer) vm6.invoke(() -> WANTestBase.getAsyncEventListenerMapSize("ln"));
  int vm7size = (Integer) vm7.invoke(() -> WANTestBase.getAsyncEventListenerMapSize("ln"));
  // assertEquals takes (expected, actual); the arguments used to be swapped, which would have
  // produced a misleading failure message.
  assertEquals(256, vm4size + vm5size + vm6size + vm7size);
}
/**
 * Starts a locator on vm0 and runs {@code WANTestBase.createReceiverWithBindAddress} on vm2
 * against it. This method makes no assertions of its own; any expectations about the bind
 * address are left to that helper.
 */
@Test
public void testBug51367_WrongBindAddressOnGatewayReceiver() throws Exception {
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));

  vm2.invoke(() -> WANTestBase.createReceiverWithBindAddress(locatorPort));
}
/**
 * Checks that sender "ln1" and a persistent partitioned region cannot be combined, whichever of
 * the two is created first.
 *
 * <p>
 * On vm4 the sender exists first and creating the persistent region attached to it must fail. On
 * vm5 the region exists first and creating the sender must fail. Both failures are expected to
 * have a {@link GatewaySenderException} as root cause.
 */
@Test
public void testBug50247_NonPersistentSenderWithPersistentRegion() {
  final String failureDescription =
      "Expected GatewaySenderException with incompatible gateway sender ids and region";
  final String expectedStackTraceFragment = "can not be attached to persistent region ";

  IgnoredException.addIgnoredException("could not get remote locator information");
  Integer locatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  createCacheInVMs(locatorPort, vm4, vm5);

  // vm4: sender first, then the persistent region
  vm4.invoke(() -> WANTestBase.createSender("ln1", 2, true, 10, 100, false, false, null, false));
  assertThatThrownBy(() -> vm4.invoke(() -> WANTestBase
      .createPartitionedRegionWithPersistence(getTestMethodName() + "_PR", "ln1", 1, 100)))
          .withFailMessage(failureDescription)
          .hasRootCauseInstanceOf(GatewaySenderException.class)
          .hasStackTraceContaining(expectedStackTraceFragment);

  // vm5: persistent region first, then the sender
  vm5.invoke(() -> WANTestBase.createPartitionedRegionWithPersistence(getTestMethodName() + "_PR",
      "ln1", 1, 100));
  assertThatThrownBy(() -> vm5
      .invoke(() -> WANTestBase.createSender("ln1", 2, true, 10, 100, false, false, null, false)))
          .withFailMessage(failureDescription)
          .hasRootCauseInstanceOf(GatewaySenderException.class)
          .hasStackTraceContaining(expectedStackTraceFragment);
}
/**
 * Test configuration:
 * <ul>
 * <li>Region: replicated</li>
 * <li>WAN: serial</li>
 * <li>Number of WAN sites: 2</li>
 * <li>Region persistence enabled: false</li>
 * <li>Async channel persistence enabled: false</li>
 * </ul>
 *
 * <p>
 * After 1000 puts on vm4, the async event listener is validated with 1000 on the first member of
 * each site (vm4, vm2) and 0 on the others, and the region on both remote members is validated
 * to hold 1000 entries.
 */
@Test
public void testReplicatedSerialAsyncEventQueueWith2WANSites() {
  Integer localLocatorPort =
      (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
  Integer remoteLocatorPort =
      (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));

  // ---- local site: caches, sender "ln" on vm4/vm5, queue "lnAsync" and region on all members
  createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
  Arrays.asList(vm4, vm5).forEach(member -> member.invoke(
      () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)));
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(() -> WANTestBase
      .createAsyncEventQueue("lnAsync", false, 100, 100, false, false, null, false)));
  startSenderInVMs("ln", vm4, vm5);
  Arrays.asList(vm4, vm5, vm6, vm7).forEach(
      member -> member.invoke(() -> WANTestBase.createReplicatedRegionWithSenderAndAsyncEventQueue(
          getTestMethodName() + "_RR", "ln", "lnAsync", isOffHeap())));

  // ---- remote site: caches, receivers, sender "ny", queue "nyAsync" and region on vm2/vm3
  createCacheInVMs(remoteLocatorPort, vm2, vm3);
  createReceiverInVMs(vm2, vm3);
  Arrays.asList(vm2, vm3).forEach(member -> member.invoke(
      () -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)));
  Arrays.asList(vm2, vm3).forEach(member -> member.invoke(() -> WANTestBase
      .createAsyncEventQueue("nyAsync", false, 100, 100, false, false, null, false)));
  startSenderInVMs("ny", vm2, vm3);
  Arrays.asList(vm2, vm3).forEach(
      member -> member.invoke(() -> WANTestBase.createReplicatedRegionWithSenderAndAsyncEventQueue(
          getTestMethodName() + "_RR", "ny", "nyAsync", isOffHeap())));

  vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));

  // local site listeners: vm4 is the primary sender, the rest are secondaries
  vm4.invoke(() -> WANTestBase.validateAsyncEventListener("lnAsync", 1000));
  Arrays.asList(vm5, vm6, vm7).forEach(
      secondary -> secondary.invoke(() -> WANTestBase.validateAsyncEventListener("lnAsync", 0)));

  // remote site region contents
  Arrays.asList(vm2, vm3).forEach(member -> member
      .invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000)));

  // remote site listeners: vm2 is the primary sender, vm3 the secondary
  vm2.invoke(() -> WANTestBase.validateAsyncEventListener("nyAsync", 1000));
  vm3.invoke(() -> WANTestBase.validateAsyncEventListener("nyAsync", 0));
}
}