package org.apache.geode.internal.cache.wan;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Set;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.DiskRegionStats;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.test.junit.categories.WanTest;
public class AsyncEventQueueOverflowMBeanAttributesDistributedTest extends AsyncEventQueueTestBase {
@Parameters({"true", "false"})
public void testParallelAsyncEventQueueOverflowMBeanAttributes(boolean createSenderFirst)
throws Exception {
// Start locator
Integer locatorPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
// Create the cache
vm1.invoke(() -> createCache(locatorPort));
String aeqId = "db";
if (createSenderFirst) {
// Create an async event queue then a region (normal xml order)
// Create a async event queue
vm1.invoke(() -> createAsyncEventQueue(aeqId, true, 1, 10, false, false, null, true,
new WaitingAsyncEventListener()));
// Create a partitioned region attached to the async event queue
vm1.invoke(() -> createPartitionedRegionWithAsyncEventQueue(getTestMethodName(), aeqId,
} else {
// Create a region then an async event queue
// Create a partitioned region attached to the async event queue
vm1.invoke(() -> createPartitionedRegionWithAsyncEventQueue(getTestMethodName(), aeqId,
// Create a async event queue
vm1.invoke(() -> createAsyncEventQueue(aeqId, true, 1, 10, false, false, null, true,
new WaitingAsyncEventListener()));
// Do some puts to cause overflow
int numPuts = 10;
vm1.invoke(() -> doHeavyPuts(getTestMethodName(), numPuts));
// Compare overflow stats to mbean attributes
vm1.invoke(() -> compareParallelOverflowStatsToMBeanAttributes(aeqId));
// Resume the listener
vm1.invoke(() -> startProcessingAsyncEvents(aeqId));
// Wait for queue to drain
vm1.invoke(() -> waitForAsyncQueueToGetEmpty(aeqId));
// Compare overflow stats to mbean attributes
vm1.invoke(() -> compareParallelOverflowStatsToMBeanAttributes(aeqId));
@Parameters({"true", "false"})
public void testSerialAsyncEventQueueOverflowMBeanAttributes(boolean createSenderFirst)
throws Exception {
// Start locator
Integer locatorPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
// Create the cache
vm1.invoke(() -> createCache(locatorPort));
String aeqId = "db";
if (createSenderFirst) {
// Create an async event queue then a region (normal xml order)
// Create a async event queue
vm1.invoke(() -> createAsyncEventQueue(aeqId, false, 10, 10, false, false, null, true, 5,
new WaitingAsyncEventListener()));
// Create a partitioned region attached to the async event queue
vm1.invoke(() -> createPartitionedRegionWithAsyncEventQueue(getTestMethodName(), aeqId,
} else {
// Create a region then an async event queue
// Create a partitioned region attached to the async event queue
vm1.invoke(() -> createPartitionedRegionWithAsyncEventQueue(getTestMethodName(), aeqId,
// Create a async event queue
vm1.invoke(() -> createAsyncEventQueue(aeqId, false, 1, 10, false, false, null, true,
new WaitingAsyncEventListener()));
// Do some puts to cause overflow
int numPuts = 15;
vm1.invoke(() -> doHeavyPuts(getTestMethodName(), numPuts));
// Compare overflow stats to mbean attributes
vm1.invoke(() -> compareSerialOverflowStatsToMBeanAttributes(aeqId));
// Resume the listener
vm1.invoke(() -> startProcessingAsyncEvents(aeqId));
// Wait for queue to drain
vm1.invoke(() -> waitForAsyncQueueToGetEmpty(aeqId));
// Compare overflow stats to mbean attributes
vm1.invoke(() -> compareSerialOverflowStatsToMBeanAttributes(aeqId));
private void compareParallelOverflowStatsToMBeanAttributes(String aeqId) throws Exception {
// Get disk region stats associated with the queue region
PartitionedRegion region = (PartitionedRegion) cache.getRegion(
AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + aeqId + ParallelGatewaySenderQueue.QSTRING);
DiskRegionStats drs = region.getDiskRegionStats();
// Get async event queue mbean
ManagementService service = ManagementService.getManagementService(cache);
AsyncEventQueueMXBean bean = service.getLocalAsyncEventQueueMXBean(aeqId);
// Wait for the sampler to take a few samples
// Verify the bean attributes match the stat values
await().untilAsserted(() -> {
private void compareSerialOverflowStatsToMBeanAttributes(String aeqId) throws Exception {
// Get the async event queue
AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId);
// Get the sender
AbstractGatewaySender sender = (AbstractGatewaySender) aeq.getSender();
// Get the sender's queue regions
Set<RegionQueue> queues = sender.getQueues();
// Get async event queue mbean
ManagementService service = ManagementService.getManagementService(cache);
AsyncEventQueueMXBean bean = service.getLocalAsyncEventQueueMXBean(aeqId);
// Wait for the sampler to take a few samples
// Verify the bean attributes match the stat values
await().untilAsserted(() -> {
// Calculate the total entries and bytes overflowed to disk
int entriesOverflowedToDisk = 0;
long bytesOverflowedToDisk = 0l;
for (RegionQueue queue : queues) {
LocalRegion lr = (LocalRegion) queue.getRegion();
DiskRegionStats drs = lr.getDiskRegion().getStats();
entriesOverflowedToDisk += drs.getNumOverflowOnDisk();
bytesOverflowedToDisk += drs.getNumOverflowBytesOnDisk();
// Verify the bean attributes match the stat values
private void waitForSamplerToSample(int numTimesToSample) throws Exception {
InternalDistributedSystem ids = (InternalDistributedSystem) cache.getDistributedSystem();
for (int i = 0; i < numTimesToSample; i++) {
private void startProcessingAsyncEvents(String aeqId) {
// Get the async event listener
WaitingAsyncEventListener listener = getWaitingAsyncEventListener(aeqId);
// Start processing waiting events
private WaitingAsyncEventListener getWaitingAsyncEventListener(String aeqId) {
// Get the async event queue
AsyncEventQueue aeq = cache.getAsyncEventQueue(aeqId);
// Get and return the async event listener
AsyncEventListener aeqListener = aeq.getAsyncEventListener();
return (WaitingAsyncEventListener) aeqListener;