blob: e4d2cbb9e22845fb060c83d025cc5eb7fe613287 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.concurrent;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.junit.categories.AEQTest;
/**
 * Distributed tests for async event queues created through
 * {@code AsyncEventQueueTestBase.createConcurrentAsyncEventQueue}, exercising the
 * {@link OrderPolicy#THREAD}, {@link OrderPolicy#KEY} and {@link OrderPolicy#PARTITION} order
 * policies.
 *
 * <p>
 * Every test follows the same opening steps: {@code createFirstLocatorWithDSId(1)} is invoked in
 * {@code vm0}, and the port it returns is handed to {@code createCache} in each member VM that
 * takes part in the test. All queues in this class use the id {@code "ln"}.
 */
@Category({AEQTest.class})
public class ConcurrentAsyncEventQueueDUnitTest extends AsyncEventQueueTestBase {

  // NOTE(review): presumably required because the lambdas handed to VM.invoke reference instance
  // methods (getTestMethodName(), isOffHeap()) and therefore capture this test instance -- confirm
  // against the DUnit base class.
  private static final long serialVersionUID = 1L;

  public ConcurrentAsyncEventQueueDUnitTest() {
    super();
  }

  /**
   * Creates a queue in vm1 with {@link OrderPolicy#THREAD} and then checks, in the same VM, that
   * {@code validateConcurrentAsyncEventQueueAttributes} accepts the configured values.
   *
   * <p>
   * The second argument of {@code createConcurrentAsyncEventQueue} is {@code false} here and
   * {@code true} in the "Parallel" tests, so it presumably selects serial vs. parallel mode --
   * confirm against {@code AsyncEventQueueTestBase}.
   */
  @Test
  public void testConcurrentSerialAsyncEventQueueAttributes() {
    Integer lnPort =
        vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));

    vm1.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));

    vm1.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", false, 100, 150,
        true, true, "testDS", true, 5, OrderPolicy.THREAD));

    vm1.invoke(() -> AsyncEventQueueTestBase.validateConcurrentAsyncEventQueueAttributes("ln", 100,
        150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5,
        OrderPolicy.THREAD));
  }

  /**
   * Same attribute round-trip as {@link #testConcurrentSerialAsyncEventQueueAttributes()}, but the
   * queue is created with the second argument set to {@code true} and with
   * {@link OrderPolicy#KEY}.
   */
  @Test
  public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyKey() {
    Integer lnPort =
        vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));

    vm1.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));

    vm1.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", true, 100, 150,
        true, true, "testDS", true, 5, OrderPolicy.KEY));

    vm1.invoke(() -> AsyncEventQueueTestBase.validateConcurrentAsyncEventQueueAttributes("ln", 100,
        150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5,
        OrderPolicy.KEY));
  }

  /**
   * Same attribute round-trip as
   * {@link #testConcurrentParallelAsyncEventQueueAttributesOrderPolicyKey()}, using
   * {@link OrderPolicy#PARTITION} instead of {@link OrderPolicy#KEY}.
   */
  @Test
  public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyPartition() {
    Integer lnPort =
        vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));

    vm1.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));

    vm1.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", true, 100, 150,
        true, true, "testDS", true, 5, OrderPolicy.PARTITION));

    vm1.invoke(() -> AsyncEventQueueTestBase.validateConcurrentAsyncEventQueueAttributes("ln", 100,
        150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5,
        OrderPolicy.PARTITION));
  }

  /**
   * Test configuration:
   *
   * Region: Replicated WAN: Serial Dispatcher threads: more than 1 Order policy: key based ordering
   *
   * <p>
   * The queue and a replicated region named {@code <testMethodName>_RR} are created in vm1-vm4,
   * {@code doPuts(region, 100)} runs in vm1, and after every VM's queue has drained the listener
   * in vm1 is validated against 100 while the listeners in vm2-vm4 are validated against 0.
   */
  @Test
  public void testReplicatedSerialAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyKey() {
    Integer lnPort =
        vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));

    vm1.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm2.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm3.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm4.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));

    vm1.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", false, 100, 10,
        true, false, null, false, 3, OrderPolicy.KEY));
    vm2.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", false, 100, 10,
        true, false, null, false, 3, OrderPolicy.KEY));
    vm3.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", false, 100, 10,
        true, false, null, false, 3, OrderPolicy.KEY));
    vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", false, 100, 10,
        true, false, null, false, 3, OrderPolicy.KEY));

    vm1.invoke(() -> AsyncEventQueueTestBase
        .createReplicatedRegionWithAsyncEventQueue(getTestMethodName() + "_RR", "ln", isOffHeap()));
    vm2.invoke(() -> AsyncEventQueueTestBase
        .createReplicatedRegionWithAsyncEventQueue(getTestMethodName() + "_RR", "ln", isOffHeap()));
    vm3.invoke(() -> AsyncEventQueueTestBase
        .createReplicatedRegionWithAsyncEventQueue(getTestMethodName() + "_RR", "ln", isOffHeap()));
    vm4.invoke(() -> AsyncEventQueueTestBase
        .createReplicatedRegionWithAsyncEventQueue(getTestMethodName() + "_RR", "ln", isOffHeap()));

    vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_RR", 100));

    // Let every member drain its queue before inspecting the listeners.
    vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm2.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm3.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));

    vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 100));// primary
                                                                                    // sender
    vm2.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0));// secondary
    vm3.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0));// secondary
    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0));// secondary
  }

  /**
   * Test configuration:
   *
   * Region: Replicated WAN: Serial Dispatcher threads: more than 1 Order policy: Thread ordering
   *
   * <p>
   * Three asynchronous invocations are started in vm1 ({@code doPuts(region, 50)},
   * {@code doNextPuts(region, 50, 100)} and {@code doNextPuts(region, 100, 150)}) and joined
   * before the queues are drained. The listener in vm1 is then validated against 150 and the
   * listeners in vm2-vm4 against 0.
   */
  @Test
  public void testReplicatedSerialAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyThread() {
    Integer lnPort =
        vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));

    vm1.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm2.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm3.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm4.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));

    vm1.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", false, 100, 10,
        true, false, null, false, 3, OrderPolicy.THREAD));
    vm2.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", false, 100, 10,
        true, false, null, false, 3, OrderPolicy.THREAD));
    vm3.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", false, 100, 10,
        true, false, null, false, 3, OrderPolicy.THREAD));
    vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", false, 100, 10,
        true, false, null, false, 3, OrderPolicy.THREAD));

    vm1.invoke(() -> AsyncEventQueueTestBase
        .createReplicatedRegionWithAsyncEventQueue(getTestMethodName() + "_RR", "ln", isOffHeap()));
    vm2.invoke(() -> AsyncEventQueueTestBase
        .createReplicatedRegionWithAsyncEventQueue(getTestMethodName() + "_RR", "ln", isOffHeap()));
    vm3.invoke(() -> AsyncEventQueueTestBase
        .createReplicatedRegionWithAsyncEventQueue(getTestMethodName() + "_RR", "ln", isOffHeap()));
    vm4.invoke(() -> AsyncEventQueueTestBase
        .createReplicatedRegionWithAsyncEventQueue(getTestMethodName() + "_RR", "ln", isOffHeap()));

    // All three put tasks target vm1 and run concurrently there.
    AsyncInvocation<Void> inv1 =
        vm1.invokeAsync(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_RR", 50));
    AsyncInvocation<Void> inv2 = vm1.invokeAsync(
        () -> AsyncEventQueueTestBase.doNextPuts(getTestMethodName() + "_RR", 50, 100));
    AsyncInvocation<Void> inv3 = vm1.invokeAsync(
        () -> AsyncEventQueueTestBase.doNextPuts(getTestMethodName() + "_RR", 100, 150));
    try {
      inv1.join();
      inv2.join();
      inv3.join();
    } catch (InterruptedException ie) {
      Assert.fail("Caught interrupted exception while waiting for the task to complete.", ie);
    }

    // Let every member drain its queue before inspecting the listeners.
    vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm2.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm3.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));

    vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 150));// primary
                                                                                    // sender
    vm2.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0));// secondary
    vm3.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0));// secondary
    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0));// secondary
  }

  /**
   * Test configuration:
   *
   * Region: PartitionedRegion WAN: Parallel Dispatcher threads: more than 1 Order policy: key based
   * ordering
   *
   * <p>
   * The queue and a partitioned region named {@code <testMethodName>_PR} are created in vm1-vm4,
   * {@code doPuts(region, 100)} runs in vm1, and after every VM's queue has drained the listener
   * map sizes reported by the four VMs must add up to 100.
   *
   * <p>
   * NOTE: an earlier comment stated that this test was disabled for bug #48323. The method is
   * annotated with {@code @Test} and no {@code @Ignore} is present in this class, so it does run.
   */
  @Test
  public void testPartitionedParallelAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyKey() {
    Integer lnPort =
        vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));

    vm1.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm2.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm3.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm4.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));

    vm1.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", true, 100, 10,
        true, false, null, false, 3, OrderPolicy.KEY));
    vm2.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", true, 100, 10,
        true, false, null, false, 3, OrderPolicy.KEY));
    vm3.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", true, 100, 10,
        true, false, null, false, 3, OrderPolicy.KEY));
    vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", true, 100, 10,
        true, false, null, false, 3, OrderPolicy.KEY));

    vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
        getTestMethodName() + "_PR", "ln", isOffHeap()));
    vm2.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
        getTestMethodName() + "_PR", "ln", isOffHeap()));
    vm3.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
        getTestMethodName() + "_PR", "ln", isOffHeap()));
    vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
        getTestMethodName() + "_PR", "ln", isOffHeap()));

    vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_PR", 100));

    // Let every member drain its queue before reading the listener map sizes.
    vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm2.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm3.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));

    int vm1size =
        vm1.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize("ln"));
    int vm2size =
        vm2.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize("ln"));
    int vm3size =
        vm3.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize("ln"));
    int vm4size =
        vm4.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize("ln"));

    // Expected value goes first so that a failure reports "expected:<100> but was:<sum>".
    assertEquals(100, vm1size + vm2size + vm3size + vm4size);
  }

  /**
   * Test configuration:
   *
   * Region: PartitionedRegion WAN: Parallel Dispatcher threads: more than 1 Order policy: PARTITION
   * based ordering
   *
   * <p>
   * The queue and a partitioned region named {@code <testMethodName>_PR} are created in vm1-vm4,
   * {@code doPuts(region, 100)} runs in vm1, and after every VM's queue has drained the listener
   * map sizes reported by the four VMs must add up to 100.
   *
   * <p>
   * NOTE: an earlier comment stated that this test was disabled for bug #48323. The method is
   * annotated with {@code @Test} and no {@code @Ignore} is present in this class, so it does run.
   */
  @Test
  public void testPartitionedParallelAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyPartition() {
    Integer lnPort =
        vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));

    vm1.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm2.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm3.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));
    vm4.invoke(() -> AsyncEventQueueTestBase.createCache(lnPort));

    vm1.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", true, 100, 10,
        true, false, null, false, 3, OrderPolicy.PARTITION));
    vm2.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", true, 100, 10,
        true, false, null, false, 3, OrderPolicy.PARTITION));
    vm3.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", true, 100, 10,
        true, false, null, false, 3, OrderPolicy.PARTITION));
    vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue("ln", true, 100, 10,
        true, false, null, false, 3, OrderPolicy.PARTITION));

    vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
        getTestMethodName() + "_PR", "ln", isOffHeap()));
    vm2.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
        getTestMethodName() + "_PR", "ln", isOffHeap()));
    vm3.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
        getTestMethodName() + "_PR", "ln", isOffHeap()));
    vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
        getTestMethodName() + "_PR", "ln", isOffHeap()));

    vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_PR", 100));

    // Let every member drain its queue before reading the listener map sizes.
    vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm2.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm3.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));

    int vm1size =
        vm1.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize("ln"));
    int vm2size =
        vm2.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize("ln"));
    int vm3size =
        vm3.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize("ln"));
    int vm4size =
        vm4.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize("ln"));

    assertEquals(100, vm1size + vm2size + vm3size + vm4size);
  }
}