blob: 46f390ac875badcf6e57c7f6f501be9b144e1b5a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.serial;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.junit.Before;
import org.junit.Test;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegionFactory;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.statistics.DummyStatisticsRegistry;
import org.apache.geode.metrics.internal.NoopMeterRegistry;
public class SerialGatewaySenderQueueJUnitTest {
private static final String QUEUE_REGION = "queueRegion";
private SerialGatewaySenderQueue.MetaRegionFactory metaRegionFactory;
private AbstractGatewaySender sender;
Region region;
InternalRegionFactory regionFactory;
@Before
public void setup() {
InternalDistributedSystem mockInternalDistributedSystem = mock(InternalDistributedSystem.class);
when(mockInternalDistributedSystem.getStatisticsManager())
.thenReturn(new DummyStatisticsRegistry("", 0));
GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
when(cache.getInternalDistributedSystem()).thenReturn(mockInternalDistributedSystem);
when(cache.getMeterRegistry()).thenReturn(new NoopMeterRegistry());
region = createLocalRegionMock();
regionFactory = mock(InternalRegionFactory.class, RETURNS_DEEP_STUBS);
when(regionFactory.setInternalMetaRegion(any())
.setDestroyLockFlag(anyBoolean())
.setSnapshotInputStream(any())
.setImageTarget(any())
.setIsUsedForSerialGatewaySenderQueue(anyBoolean())
.setInternalRegion(anyBoolean())
.setSerialGatewaySender(any())).thenReturn(regionFactory);
when(regionFactory.create(QUEUE_REGION)).thenReturn(region);
when(cache.createInternalRegionFactory(any())).thenReturn(regionFactory);
CancelCriterion cancelCriterion = mock(CancelCriterion.class);
when(cache.getCancelCriterion()).thenReturn(cancelCriterion);
sender = mock(AbstractGatewaySender.class);
when(sender.getCancelCriterion()).thenReturn(cancelCriterion);
when(sender.getCache()).thenReturn(cache);
when(sender.getMaximumQueueMemory()).thenReturn(100);
when(sender.getLifeCycleLock()).thenReturn(new ReentrantReadWriteLock());
when(sender.getId()).thenReturn("");
when(sender.getStatistics()).thenReturn(mock(GatewaySenderStats.class));
metaRegionFactory = mock(SerialGatewaySenderQueue.MetaRegionFactory.class);
SerialGatewaySenderQueue.SerialGatewaySenderQueueMetaRegion mockMetaRegion =
mock(SerialGatewaySenderQueue.SerialGatewaySenderQueueMetaRegion.class);
when(metaRegionFactory.newMetaRegion(any(), any(), any(), any())).thenReturn(mockMetaRegion);
}
@Test
public void peekGetsExtraEventsWhenMustGroupTransactionEventsAndNotAllEventsForTransactionsInMaxSizeBatch() {
TestableSerialGatewaySenderQueue queue = new TestableSerialGatewaySenderQueue(sender,
QUEUE_REGION, metaRegionFactory);
queue.setGroupTransactionEvents(true);
List<AsyncEvent<?, ?>> peeked = queue.peek(3, 100);
assertThat(peeked.size()).isEqualTo(4);
List<AsyncEvent<?, ?>> peekedAfter = queue.peek(3, 100);
assertThat(peekedAfter.size()).isEqualTo(3);
}
@Test
public void peekGetsExtraEventsWhenMustGroupTransactionEventsAndNotAllEventsForTransactionsInBatchByTime() {
GatewaySenderEventImpl event1 = createMockGatewaySenderEventImpl(1, false, region);
GatewaySenderEventImpl event2 = createMockGatewaySenderEventImpl(2, false, region);
GatewaySenderEventImpl event3 = createMockGatewaySenderEventImpl(1, true, region);
GatewaySenderEventImpl event4 = createMockGatewaySenderEventImpl(2, true, region);
SerialGatewaySenderQueue.KeyAndEventPair eventPair1 =
new SerialGatewaySenderQueue.KeyAndEventPair(0L, event1);
SerialGatewaySenderQueue.KeyAndEventPair eventPair2 =
new SerialGatewaySenderQueue.KeyAndEventPair(1L, event2);
SerialGatewaySenderQueue.KeyAndEventPair eventPair3 =
new SerialGatewaySenderQueue.KeyAndEventPair(2L, event3);
TestableSerialGatewaySenderQueue realQueue = new TestableSerialGatewaySenderQueue(sender,
QUEUE_REGION, metaRegionFactory);
TestableSerialGatewaySenderQueue queue = spy(realQueue);
queue.setGroupTransactionEvents(true);
doAnswer(invocation -> eventPair1)
.doAnswer(invocation -> eventPair2)
.doAnswer(invocation -> eventPair3)
.doAnswer(invocation -> null)
.when(queue).peekAhead();
doAnswer(invocation -> Collections
.singletonList(new SerialGatewaySenderQueue.KeyAndEventPair(1L, event4)))
.when(queue).getElementsMatching(any(), any(), anyLong());
List<AsyncEvent<?, ?>> peeked = queue.peek(-1, 1);
assertThat(peeked.size()).isEqualTo(4);
}
@Test
public void peekDoesNotGetExtraEventsWhenNotMustGroupTransactionEventsAndNotAllEventsForTransactionsInBatchMaxSize() {
TestableSerialGatewaySenderQueue queue = new TestableSerialGatewaySenderQueue(sender,
QUEUE_REGION, metaRegionFactory);
List<AsyncEvent<?, ?>> peeked = queue.peek(3, 100);
assertThat(peeked.size()).isEqualTo(3);
List<AsyncEvent<?, ?>> peekedAfter = queue.peek(3, 100);
assertThat(peekedAfter.size()).isEqualTo(3);
peekedAfter = queue.peek(1, 100);
assertThat(peekedAfter.size()).isEqualTo(1);
}
@Test
public void peekDoesNotGetExtraEventsWhenNotMustGroupTransactionEventsAndNotAllEventsForTransactionsInBatchByTime() {
GatewaySenderEventImpl event1 = createMockGatewaySenderEventImpl(1, false, region);
GatewaySenderEventImpl event2 = createMockGatewaySenderEventImpl(2, false, region);
GatewaySenderEventImpl event3 = createMockGatewaySenderEventImpl(1, true, region);
GatewaySenderEventImpl event4 = createMockGatewaySenderEventImpl(2, true, region);
SerialGatewaySenderQueue.KeyAndEventPair eventPair1 =
new SerialGatewaySenderQueue.KeyAndEventPair(0L, event1);
SerialGatewaySenderQueue.KeyAndEventPair eventPair2 =
new SerialGatewaySenderQueue.KeyAndEventPair(1L, event2);
SerialGatewaySenderQueue.KeyAndEventPair eventPair3 =
new SerialGatewaySenderQueue.KeyAndEventPair(2L, event3);
TestableSerialGatewaySenderQueue realQueue = new TestableSerialGatewaySenderQueue(sender,
QUEUE_REGION, metaRegionFactory);
TestableSerialGatewaySenderQueue queue = spy(realQueue);
queue.setGroupTransactionEvents(false);
doAnswer(invocation -> eventPair1)
.doAnswer(invocation -> eventPair2)
.doAnswer(invocation -> eventPair3)
.doAnswer(invocation -> null)
.when(queue).peekAhead();
doAnswer(invocation -> Collections
.singletonList(new SerialGatewaySenderQueue.KeyAndEventPair(2L, event4)))
.when(queue).getElementsMatching(any(), any(), anyLong());
List<AsyncEvent<?, ?>> peeked = queue.peek(-1, 1);
assertThat(peeked.size()).isEqualTo(3);
}
@Test
public void peekEventsFromIncompleteTransactionsDoesNotThrowConcurrentModificationExceptionWhenCompletingTwoTransactions() {
GatewaySenderEventImpl event1 = createMockGatewaySenderEventImpl(1, false, region);
GatewaySenderEventImpl event2 = createMockGatewaySenderEventImpl(2, false, region);
TestableSerialGatewaySenderQueue queue = new TestableSerialGatewaySenderQueue(sender,
QUEUE_REGION, metaRegionFactory);
queue.setGroupTransactionEvents(true);
@SuppressWarnings("unchecked")
List<AsyncEvent<?, ?>> batch = new ArrayList(Arrays.asList(event1, event2));
queue.peekEventsFromIncompleteTransactions(batch, 0);
}
@Test
public void removeExtraPeekedEventDoesNotRemoveFromExtraPeekedIdsUntilPreviousEventIsRemoved() {
TestableSerialGatewaySenderQueue queue = new TestableSerialGatewaySenderQueue(sender,
QUEUE_REGION, metaRegionFactory);
queue.setGroupTransactionEvents(true);
List<AsyncEvent<?, ?>> peeked = queue.peek(3, -1);
assertThat(peeked.size()).isEqualTo(4);
assertThat(queue.getLastPeekedId()).isEqualTo(2);
assertThat(queue.getExtraPeekedIds()).contains(5L);
for (Object ignored : peeked) {
queue.remove();
}
assertThat(queue.getExtraPeekedIds()).contains(5L);
peeked = queue.peek(3, -1);
assertThat(peeked.size()).isEqualTo(3);
assertThat(queue.getExtraPeekedIds()).contains(5L);
for (Object ignored : peeked) {
queue.remove();
}
assertThat(queue.getExtraPeekedIds()).doesNotContain(5L);
}
private GatewaySenderEventImpl createMockGatewaySenderEventImpl(int transactionId,
boolean isLastEventInTransaction, Region region) {
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
when(event.getTransactionId()).thenReturn(new TXId(null, transactionId));
when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
when(event.isLastEventInTransaction()).thenReturn(isLastEventInTransaction);
when(event.getRegion()).thenReturn(region);
return event;
}
private LocalRegion createLocalRegionMock() {
GatewaySenderEventImpl event1 = createMockGatewaySenderEventImpl(1, false, region);
GatewaySenderEventImpl event2 = createMockGatewaySenderEventImpl(2, false, region);
GatewaySenderEventImpl event3 = createMockGatewaySenderEventImpl(1, true, region);
GatewaySenderEventImpl event4 = createMockGatewaySenderEventImpl(3, true, region);
GatewaySenderEventImpl event5 = createMockGatewaySenderEventImpl(4, true, region);
GatewaySenderEventImpl event6 = createMockGatewaySenderEventImpl(2, true, region);
GatewaySenderEventImpl event7 = createMockGatewaySenderEventImpl(5, false, region);
LocalRegion region = mock(LocalRegion.class);
when(region.getValueInVMOrDiskWithoutFaultIn(0L)).thenReturn(event1);
when(region.getValueInVMOrDiskWithoutFaultIn(1L)).thenReturn(event2);
when(region.getValueInVMOrDiskWithoutFaultIn(2L)).thenReturn(event3);
when(region.getValueInVMOrDiskWithoutFaultIn(3L)).thenReturn(event4);
when(region.getValueInVMOrDiskWithoutFaultIn(4L)).thenReturn(event5);
when(region.getValueInVMOrDiskWithoutFaultIn(5L)).thenReturn(event6);
when(region.getValueInVMOrDiskWithoutFaultIn(6L)).thenReturn(event7);
Map<Long, AsyncEvent<?, ?>> map = new HashMap<>();
map.put(0L, event1);
map.put(1L, event2);
map.put(2L, event3);
map.put(3L, event4);
map.put(4L, event5);
map.put(5L, event6);
map.put(6L, event7);
when(region.keySet()).thenReturn(map.keySet());
return region;
}
private static class TestableSerialGatewaySenderQueue extends SerialGatewaySenderQueue {
private boolean groupTransactionEvents = false;
public TestableSerialGatewaySenderQueue(final AbstractGatewaySender sender,
String regionName, final MetaRegionFactory metaRegionFactory) {
super(sender, regionName, null, false, metaRegionFactory);
}
public void setGroupTransactionEvents(boolean groupTransactionEvents) {
this.groupTransactionEvents = groupTransactionEvents;
}
@Override
public boolean mustGroupTransactionEvents() {
return groupTransactionEvents;
}
@Override
protected void addOverflowStatisticsToMBean(Cache cache, AbstractGatewaySender sender) {}
}
}