blob: 853da874784e7e12835614d8bffd2d7839579061 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.asyncqueue;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.ExpirationAction;
import org.apache.geode.cache.ExpirationAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.test.junit.categories.AEQTest;
@Category({AEQTest.class})
public class AsyncEventQueueEvictionAndExpirationJUnitTest {
// Queue under test; each test assigns it from the cache's AsyncEventQueueFactory.
private AsyncEventQueue aeq;
// Cache used by every helper in this class; obtained in setup() and closed in tearDown().
private Cache cache;
// Region attached to the queue; assigned by createRegionAeqAndPopulate().
private Region region;
// Id of the queue under test; setup() sets it to the running test method's name.
String aeqId;
// Events captured by the listener built in createAsyncListener(). The listener appends while
// holding the list's monitor, but the tests poll size() without any lock (presumably from a
// different thread than the one invoking the listener). Wrapping the list makes those
// single-call reads safe; the wrapper locks on itself, which is the monitor the listener
// already synchronizes on.
List<AsyncEvent> events = Collections.synchronizedList(new ArrayList<AsyncEvent>());
// Supplies the name of the running test method.
@Rule
public TestName name = new TestName();
/**
 * Prepares a test: empties the captured-event list, obtains a cache and records the running
 * test method's name as the queue id.
 *
 * <p>An existing cache is reused when {@code CacheFactory.getAnyInstance()} returns one;
 * otherwise a new cache is created with {@code MCAST_PORT} set to "0".
 */
@Before
public void setup() {
  events.clear();
  try {
    cache = CacheFactory.getAnyInstance();
  } catch (Exception ignored) {
    // The lookup failed; fall through to the null check below.
  }
  if (cache == null) {
    CacheFactory factory = new CacheFactory().set(MCAST_PORT, "0");
    cache = (GemFireCacheImpl) factory.create();
  }
  aeqId = name.getMethodName();
}
/** Closes the cache if it is still open and then drops the reference to it. */
@After
public void tearDown() {
  if (cache == null || cache.isClosed()) {
    // Nothing to close; the field is deliberately left as it is.
    return;
  }
  cache.close();
  cache = null;
}
/**
 * A queue created without calling {@code setForwardExpirationDestroy} must report that
 * expiration destroys are not forwarded.
 */
@Test
public void isForwardExpirationDestroyAttributeFalseByDefault() {
  AsyncEventListener listener = mock(AsyncEventListener.class);
  aeq = cache.createAsyncEventQueueFactory().create("aeq", listener);

  boolean forwarding = aeq.isForwardExpirationDestroy();

  assertFalse(forwarding);
}
/**
 * A queue created after {@code setForwardExpirationDestroy(true)} must report that expiration
 * destroys are forwarded.
 */
@Test
public void canSetTrueForForwardExpirationDestroy() {
  AsyncEventListener listener = mock(AsyncEventListener.class);
  aeq = cache.createAsyncEventQueueFactory()
      .setForwardExpirationDestroy(true)
      .create("aeq", listener);

  boolean forwarding = aeq.isForwardExpirationDestroy();

  assertTrue(forwarding);
}
/**
 * Replicated region with eviction-destroy, forwarding switched off: the listener is expected
 * to receive two events (the creates) and nothing for the eviction.
 */
@Test
public void evictionDestroyOpEventsNotPropogatedByDefault() {
  final boolean isPR = false;
  final boolean forwardExpirationDestroy = false;
  final boolean eviction = true;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = false;
  final boolean expirationInvalidate = false;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);

  // Eviction must leave a single entry in the region.
  await().untilAsserted(() -> assertEquals(1, region.size()));

  // The sender must count one event as not queued, which shows the evicted entry's event
  // reached the sender and was dropped there.
  await().untilAsserted(() -> assertEquals(1, getEventsNotQueuedSize(aeqId)));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Partitioned region with eviction-destroy, forwarding switched off: the listener is expected
 * to receive two events (the creates) and nothing for the eviction.
 */
@Test
public void evictionDestroyOpEventsNotPropogatedByDefaultForPR() {
  final boolean isPR = true;
  final boolean forwardExpirationDestroy = false;
  final boolean eviction = true;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = false;
  final boolean expirationInvalidate = false;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);

  // Eviction must leave a single entry in the region.
  await().untilAsserted(() -> assertEquals(1, region.size()));

  // One entry is evicted, and its event must be counted by the sender as not queued.
  await().untilAsserted(() -> assertEquals(1, getEventsNotQueuedSize(aeqId)));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Replicated region with expiration-destroy, forwarding switched off: the listener is expected
 * to receive two events (the creates) and nothing for the expiration destroys.
 */
@Test
public void expirationDestroyOpEventsNotPropogatedByDefault() {
  final boolean isPR = false;
  final boolean forwardExpirationDestroy = false;
  final boolean eviction = false;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = true;
  final boolean expirationInvalidate = false;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);

  // Both entries must expire, emptying the region.
  await().untilAsserted(() -> assertEquals(0, region.size()));

  // The sender must count two events as not queued, which shows the expiration events
  // reached the sender and were dropped there.
  await().untilAsserted(() -> assertEquals(2, getEventsNotQueuedSize(aeqId)));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Partitioned region with expiration-destroy, forwarding switched off: the listener is
 * expected to receive two events (the creates) and nothing for the expiration destroys.
 */
@Test
public void expirationDestroyOpEventsNotPropogatedByDefaultForPR() {
  final boolean isPR = true;
  final boolean forwardExpirationDestroy = false;
  final boolean eviction = false;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = true;
  final boolean expirationInvalidate = false;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);

  // Both entries must expire, emptying the region.
  await().untilAsserted(() -> assertEquals(0, region.size()));

  // The sender must count two events as not queued, which shows the expiration events
  // reached the sender and were dropped there.
  await().untilAsserted(() -> assertEquals(2, getEventsNotQueuedSize(aeqId)));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Replicated region with expiration-invalidate, forwarding switched off: the listener is
 * expected to receive two events (the creates) and nothing for the invalidates.
 */
@Test
public void expirationInvalidOpEventsNotPropogatedByDefault() {
  final boolean isPR = false;
  final boolean forwardExpirationDestroy = false;
  final boolean eviction = false;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = false;
  final boolean expirationInvalidate = true;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);
  LocalRegion localRegion = (LocalRegion) region;

  // Expiration is complete once the region's statistics report two invalidates.
  await().untilAsserted(
      () -> assertEquals(2L, localRegion.getCachePerfStats().getInvalidates()));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Partitioned region with expiration-invalidate, forwarding switched off: the listener is
 * expected to receive two events (the creates) and nothing for the invalidates.
 */
@Test
public void expirationInvalidOpEventsNotPropogatedByDefaultForPR() {
  final boolean isPR = true;
  final boolean forwardExpirationDestroy = false;
  final boolean eviction = false;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = false;
  final boolean expirationInvalidate = true;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);
  LocalRegion localRegion = (LocalRegion) region;

  // Expiration is complete once the region's statistics report two invalidates.
  await().untilAsserted(
      () -> assertEquals(2L, localRegion.getCachePerfStats().getInvalidates()));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Replicated region with eviction-destroy and forwarding switched on: eviction is still not
 * forwarded, so the listener is expected to receive only two events (the creates).
 */
@Test
public void evictionNotPropogatedUsingForwardExpirationDestroyAttribute() {
  final boolean isPR = false;
  final boolean forwardExpirationDestroy = true;
  final boolean eviction = true;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = false;
  final boolean expirationInvalidate = false;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);

  // Eviction must leave a single entry in the region.
  await().untilAsserted(() -> assertEquals(1, region.size()));

  // The sender must count one event as not queued, which shows the evicted entry's event
  // reached the sender and was dropped there.
  await().untilAsserted(() -> assertEquals(1, getEventsNotQueuedSize(aeqId)));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Partitioned region with eviction-destroy and forwarding switched on: eviction is still not
 * forwarded, so the listener is expected to receive only two events (the creates).
 */
@Test
public void evictionNotPropogatedUsingForwardExpirationDestroyAttributeForPR() {
  final boolean isPR = true;
  final boolean forwardExpirationDestroy = true;
  final boolean eviction = true;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = false;
  final boolean expirationInvalidate = false;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);

  // Eviction must leave a single entry in the region.
  await().untilAsserted(() -> assertEquals(1, region.size()));

  // The sender must count one event as not queued, which shows the evicted entry's event
  // reached the sender and was dropped there.
  await().untilAsserted(() -> assertEquals(1, getEventsNotQueuedSize(aeqId)));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Replicated region with eviction-overflow and forwarding switched on: overflow to disk
 * produces no listener event, so only two events (the creates) are expected.
 */
@Test
public void overflowNotPropogatedUsingForwardExpirationDestroyAttribute() {
  final boolean isPR = false;
  final boolean forwardExpirationDestroy = true;
  final boolean eviction = false;
  final boolean evictionOverflow = true;
  final boolean expirationDestroy = false;
  final boolean expirationInvalidate = false;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);
  LocalRegion localRegion = (LocalRegion) region;

  // Overflow has happened once the disk statistics report one entry on disk.
  await().untilAsserted(() -> assertEquals(1,
      localRegion.getDiskRegion().getStats().getNumOverflowOnDisk()));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Partitioned region with eviction-overflow and forwarding switched on: overflow to disk
 * produces no listener event, so only two events (the creates) are expected.
 */
@Test
public void overflowNotPropogatedUsingForwardExpirationDestroyAttributeForPR() {
  final boolean isPR = true;
  final boolean forwardExpirationDestroy = true;
  final boolean eviction = false;
  final boolean evictionOverflow = true;
  final boolean expirationDestroy = false;
  final boolean expirationInvalidate = false;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);
  PartitionedRegion partitionedRegion = (PartitionedRegion) region;

  // Overflow has happened once the disk statistics report one entry on disk.
  await().untilAsserted(() -> assertEquals(1,
      partitionedRegion.getDiskRegionStats().getNumOverflowOnDisk()));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Replicated region with expiration-destroy and forwarding switched on: the listener is
 * expected to receive four events, two creates and two expiration destroys.
 */
@Test
public void expirationDestroyPropogatedUsingForwardExpirationDestroyAttribute() {
  final boolean isPR = false;
  final boolean forwardExpirationDestroy = true;
  final boolean eviction = false;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = true;
  final boolean expirationInvalidate = false;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);

  // Both entries must expire, emptying the region.
  await().untilAsserted(() -> assertEquals(0, region.size()));

  // The sender must have received all four events.
  await().untilAsserted(() -> assertEquals(4, getEventsReceived(aeqId)));

  // All four events must reach the listener, and at least one of them must be a destroy.
  await().untilAsserted(() -> assertEquals(4, events.size()));
  final boolean lookForInvalidate = false;
  final boolean lookForDestroy = true;
  assertTrue("Expiration event not arrived",
      checkForOperation(events, lookForInvalidate, lookForDestroy));
}
/**
 * Partitioned region with expiration-destroy and forwarding switched on: the listener is
 * expected to receive four events, two creates and two expiration destroys.
 */
@Test
public void expirationDestroyPropogatedUsingForwardExpirationDestroyAttributeForPR() {
  final boolean isPR = true;
  final boolean forwardExpirationDestroy = true;
  final boolean eviction = false;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = true;
  final boolean expirationInvalidate = false;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);

  // Both entries must expire, emptying the region.
  await().untilAsserted(() -> assertEquals(0, region.size()));

  // The sender must have received all four events.
  await().untilAsserted(() -> assertEquals(4, getEventsReceived(aeqId)));

  // All four events must reach the listener, and at least one of them must be a destroy.
  await().untilAsserted(() -> assertEquals(4, events.size()));
  final boolean lookForInvalidate = false;
  final boolean lookForDestroy = true;
  assertTrue("Expiration event not arrived",
      checkForOperation(events, lookForInvalidate, lookForDestroy));
}
/**
 * Replicated region with expiration-invalidate and forwarding switched on: invalidates are
 * not delivered to the queue, so only two events (the creates) are expected.
 */
@Test
public void expirationInvalidateNotPropogatedUsingForwardExpirationDestroyAttribute() {
  final boolean isPR = false;
  final boolean forwardExpirationDestroy = true;
  final boolean eviction = false;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = false;
  final boolean expirationInvalidate = true;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);
  LocalRegion localRegion = (LocalRegion) region;

  // Expiration is complete once the region's statistics report two invalidates.
  await().untilAsserted(
      () -> assertEquals(2L, localRegion.getCachePerfStats().getInvalidates()));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Partitioned region with expiration-invalidate and forwarding switched on: invalidates are
 * not delivered to the queue, so only two events (the creates) are expected.
 */
@Test
public void expirationInvalidateNotPropogatedUsingForwardExpirationDestroyAttributeForPR() {
  final boolean isPR = true;
  final boolean forwardExpirationDestroy = true;
  final boolean eviction = false;
  final boolean evictionOverflow = false;
  final boolean expirationDestroy = false;
  final boolean expirationInvalidate = true;
  createRegionAeqAndPopulate(isPR, forwardExpirationDestroy, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);
  LocalRegion localRegion = (LocalRegion) region;

  // Expiration is complete once the region's statistics report two invalidates.
  await().untilAsserted(
      () -> assertEquals(2L, localRegion.getCachePerfStats().getInvalidates()));

  // Only two events may reach the listener.
  await().untilAsserted(() -> assertEquals(2, events.size()));
}
/**
 * Creates the async event queue, creates a region attached to it, and puts two entries
 * ("Key-1" and "Key-2") into that region.
 *
 * <p>The queue is registered under the {@code aeqId} field, which {@code setup()} sets to the
 * running test method's name. The tests use the same field to look up the queue's statistics,
 * so the queue that is created and the queue that is queried always share one id.
 *
 * @param isPR true to create a partitioned region, false for a replicated one
 * @param forwardExpirationDestroy value passed to the queue factory's
 *        {@code setForwardExpirationDestroy}
 * @param eviction true to configure eviction with local destroy
 * @param evictionOverflow true to configure eviction with overflow to disk
 * @param expirationDestroy true to configure entry time-to-live with destroy
 * @param expirationInvalidate true to configure entry time-to-live with invalidate
 */
private void createRegionAeqAndPopulate(boolean isPR, boolean forwardExpirationDestroy,
    boolean eviction, boolean evictionOverflow, boolean expirationDestroy,
    boolean expirationInvalidate) {
  // The queue has to exist before a region can reference it by id.
  createAsyncEventQueue(aeqId, forwardExpirationDestroy, events);

  // The region name is fixed and is used for partitioned regions as well.
  region = createRegion("ReplicatedRegionForAEQ", isPR, aeqId, eviction, evictionOverflow,
      expirationDestroy, expirationInvalidate);

  region.put("Key-1", "Value-1");
  region.put("Key-2", "Value-2");
}
/**
 * Blocks until the sender behind the queue under test reports at least one event that was
 * not queued.
 */
private void waitForAEQEventsNotQueued() {
  await().until(() -> getEventsNotQueuedSize(aeqId) >= 1);
}
/**
 * Reports whether the given events contain at least one event of a requested kind.
 *
 * <p>The scan runs while holding the list's monitor. The listener created by
 * {@code createAsyncListener} appends under that same monitor, so an event arriving during
 * the scan cannot disturb the iteration.
 *
 * @param events the captured events to scan
 * @param invalidate true to accept events whose operation is an invalidate
 * @param destroy true to accept events whose operation is a destroy
 * @return true if a matching event was found, false otherwise (including when both flags are
 *         false)
 */
private boolean checkForOperation(List<AsyncEvent> events, boolean invalidate, boolean destroy) {
  synchronized (events) {
    for (AsyncEvent event : events) {
      if ((invalidate && event.getOperation().isInvalidate())
          || (destroy && event.getOperation().isDestroy())) {
        return true;
      }
    }
    return false;
  }
}
/**
 * Reads the "events not queued" statistic of the sender behind an async event queue.
 *
 * @param aeqId id of the queue to look up in the cache
 * @return the value of the sender's events-not-queued statistic
 */
public int getEventsNotQueuedSize(String aeqId) {
  AsyncEventQueueImpl queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId);
  AbstractGatewaySender gatewaySender = (AbstractGatewaySender) queue.getSender();
  int notQueued = gatewaySender.getStatistics().getEventsNotQueued();
  return notQueued;
}
/**
 * Reads the "events received" statistic of the sender behind an async event queue.
 *
 * @param aeqId id of the queue to look up in the cache
 * @return the value of the sender's events-received statistic
 */
public int getEventsReceived(String aeqId) {
  AsyncEventQueueImpl queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId);
  AbstractGatewaySender gatewaySender = (AbstractGatewaySender) queue.getSender();
  int received = gatewaySender.getStatistics().getEventsReceived();
  return received;
}
/**
 * Creates a non-parallel async event queue with batch size 1 and batch time interval 1, and
 * stores it in the {@code aeq} field.
 *
 * @param id id under which the queue is created
 * @param forwardExpirationDestroy value passed to {@code setForwardExpirationDestroy}
 * @param storeEvents list handed to the listener for recording the events it receives
 */
private void createAsyncEventQueue(String id, boolean forwardExpirationDestroy,
    List<AsyncEvent> storeEvents) {
  AsyncEventListener listener = createAsyncListener(storeEvents);
  aeq = cache.createAsyncEventQueueFactory()
      .setParallel(false)
      .setForwardExpirationDestroy(forwardExpirationDestroy)
      .setBatchSize(1)
      .setBatchTimeInterval(1)
      .create(id, listener);
}
/**
 * Creates a region attached to the given async event queue, with optional eviction and
 * entry time-to-live settings.
 *
 * <p>The flags are applied in the order shown below, so when both eviction flags (or both
 * expiration flags) are set, the later setting replaces the earlier one.
 *
 * @param name name of the region to create
 * @param isPR true for a PARTITION region, false for a REPLICATE region
 * @param aeqId id of the async event queue to attach
 * @param evictionDestroy true for LIFO entry eviction (maximum 1) with LOCAL_DESTROY
 * @param evictionOverflow true for LIFO entry eviction (maximum 1) with OVERFLOW_TO_DISK
 * @param expirationDestroy true for an entry time-to-live of 1 with the DESTROY action
 * @param expirationInvalidate true for an entry time-to-live of 1 with the INVALIDATE action
 * @return the created region
 */
private Region createRegion(String name, boolean isPR, String aeqId, boolean evictionDestroy,
    boolean evictionOverflow, boolean expirationDestroy, boolean expirationInvalidate) {
  RegionShortcut shortcut = isPR ? RegionShortcut.PARTITION : RegionShortcut.REPLICATE;
  RegionFactory factory = cache.createRegionFactory(shortcut);

  // Attach the queue so region events are delivered to it.
  factory.addAsyncEventQueueId(aeqId);

  if (evictionDestroy) {
    EvictionAttributes destroyOnEvict =
        EvictionAttributes.createLIFOEntryAttributes(1, EvictionAction.LOCAL_DESTROY);
    factory.setEvictionAttributes(destroyOnEvict);
  }
  if (evictionOverflow) {
    EvictionAttributes overflowOnEvict =
        EvictionAttributes.createLIFOEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK);
    factory.setEvictionAttributes(overflowOnEvict);
  }
  if (expirationDestroy) {
    ExpirationAttributes destroyOnExpire =
        new ExpirationAttributes(1, ExpirationAction.DESTROY);
    factory.setEntryTimeToLive(destroyOnExpire);
  }
  if (expirationInvalidate) {
    ExpirationAttributes invalidateOnExpire =
        new ExpirationAttributes(1, ExpirationAction.INVALIDATE);
    factory.setEntryTimeToLive(invalidateOnExpire);
  }

  return factory.create(name);
}
/**
 * Builds a listener that records every event it is handed into the supplied list.
 *
 * <p>All events of a batch are recorded, not just the first one, so the list size equals the
 * number of events delivered even if a batch ever carries more than one event. Appends are
 * made while holding the list's monitor.
 *
 * @param list destination for the received events
 * @return a listener that always acknowledges the batch by returning true
 */
private AsyncEventListener createAsyncListener(List<AsyncEvent> list) {
  return new AsyncEventListener() {
    private final List<AsyncEvent> aeList = list;

    @Override
    public void close() {}

    @Override
    public boolean processEvents(List<AsyncEvent> arg0) {
      // A null batch is ignored rather than failing the caller; an empty batch adds nothing.
      if (arg0 != null) {
        synchronized (aeList) {
          aeList.addAll(arg0);
        }
      }
      return true;
    }
  };
}
}