blob: 1f4d81e4a297826ee2fb9bdeb5693877d73cb5f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.parallel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.BucketAdvisor;
import org.apache.geode.internal.cache.BucketRegionQueue;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegionFactory;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue.MetaRegionFactory;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue.ParallelGatewaySenderQueueMetaRegion;
public class ParallelGatewaySenderQueueJUnitTest {
private ParallelGatewaySenderQueue queue;
private MetaRegionFactory metaRegionFactory;
private GemFireCacheImpl cache;
private AbstractGatewaySender sender;
@Before
public void createParallelGatewaySenderQueue() {
cache = mock(GemFireCacheImpl.class);
sender = mock(AbstractGatewaySender.class);
CancelCriterion cancelCriterion = mock(CancelCriterion.class);
when(sender.getCancelCriterion()).thenReturn(cancelCriterion);
when(sender.getCache()).thenReturn(cache);
when(sender.getMaximumQueueMemory()).thenReturn(100);
when(sender.getLifeCycleLock()).thenReturn(new ReentrantReadWriteLock());
when(sender.getId()).thenReturn("");
metaRegionFactory = mock(MetaRegionFactory.class);
queue = new ParallelGatewaySenderQueue(sender, Collections.emptySet(), 0, 1, metaRegionFactory,
false);
}
@Test
public void whenReplicatedDataRegionNotReadyShouldNotThrowException() throws Exception {
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
when(event.getRegion()).thenReturn(null);
String regionPath = "/testRegion";
when(event.getRegionPath()).thenReturn(regionPath);
Mockito.doThrow(new IllegalStateException()).when(event).release();
Queue backingList = new LinkedList();
backingList.add(event);
queue = spy(queue);
doReturn(true).when(queue).isDREvent(any(), any());
boolean putDone = queue.put(event);
assertThat(putDone).isFalse();
}
@Test
public void whenPartitionedDataRegionNotReadyShouldNotThrowException() throws Exception {
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
when(event.getRegion()).thenReturn(null);
String regionPath = "/testRegion";
when(event.getRegionPath()).thenReturn(regionPath);
PartitionedRegion region = mock(PartitionedRegion.class);
when(region.getFullPath()).thenReturn(regionPath);
when(cache.getRegion(regionPath, true)).thenReturn(region);
PartitionAttributes pa = mock(PartitionAttributes.class);
when(region.getPartitionAttributes()).thenReturn(pa);
when(pa.getColocatedWith()).thenReturn(null);
Mockito.doThrow(new IllegalStateException()).when(event).release();
Queue backingList = new LinkedList();
backingList.add(event);
BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
Collections.emptySet(), 0, 1, metaRegionFactory);
queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
queue = spy(queue);
boolean putDone = queue.put(event);
assertThat(putDone).isFalse();
}
private void testEnqueueToBrqAfterLockFailedInitialImageReadLock(boolean isTmpQueue)
throws InterruptedException {
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
String regionPath = "/userPR";
when(event.getRegionPath()).thenReturn(regionPath);
when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
when(event.getRegion()).thenReturn(null);
when(event.getBucketId()).thenReturn(1);
when(event.getShadowKey()).thenReturn(100L);
when(sender.isPersistenceEnabled()).thenReturn(true);
PartitionedRegionDataStore prds = mock(PartitionedRegionDataStore.class);
PartitionedRegion prQ = mock(PartitionedRegion.class);
AbstractBucketRegionQueue brq = mock(AbstractBucketRegionQueue.class);
ReentrantReadWriteLock initializationLock = mock(ReentrantReadWriteLock.class);
ReentrantReadWriteLock.ReadLock readLock = mock(ReentrantReadWriteLock.ReadLock.class);
when(initializationLock.readLock()).thenReturn(readLock);
doNothing().when(readLock).lock();
doNothing().when(readLock).unlock();
doNothing().when(brq).unlockWhenRegionIsInitializing();
when(brq.getInitializationLock()).thenReturn(initializationLock);
when(brq.lockWhenRegionIsInitializing()).thenReturn(true);
when(prQ.getDataStore()).thenReturn(prds);
when(prQ.getCache()).thenReturn(cache);
when(prQ.getBucketName(1)).thenReturn("_B__PARALLEL_GATEWAY_SENDER_QUEUE_1");
when(prds.getLocalBucketById(1)).thenReturn(null);
PartitionedRegion userPR = mock(PartitionedRegion.class);
PartitionAttributes pa = mock(PartitionAttributes.class);
when(userPR.getPartitionAttributes()).thenReturn(pa);
when(pa.getColocatedWith()).thenReturn(null);
when(userPR.getDataPolicy()).thenReturn(DataPolicy.PERSISTENT_PARTITION);
when(userPR.getFullPath()).thenReturn(regionPath);
when(cache.getRegion("_PARALLEL_GATEWAY_SENDER_QUEUE")).thenReturn(prQ);
when(cache.getRegion(regionPath, true)).thenReturn(userPR);
when(prQ.getColocatedWithRegion()).thenReturn(userPR);
RegionAdvisor ra = mock(RegionAdvisor.class);
BucketAdvisor ba = mock(BucketAdvisor.class);
when(userPR.getRegionAdvisor()).thenReturn(ra);
when(ra.getBucketAdvisor(1)).thenReturn(ba);
when(ba.isShadowBucketDestroyed("/__PR/_B__PARALLEL_GATEWAY_SENDER_QUEUE_1")).thenReturn(false);
prepareBrq(brq, isTmpQueue);
Mockito.doThrow(new IllegalStateException()).when(event).release();
Queue backingList = new LinkedList();
backingList.add(event);
BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
Collections.emptySet(), 0, 1, metaRegionFactory);
queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
InOrder inOrder = inOrder(brq, readLock);
queue = spy(queue);
queue.addShadowPartitionedRegionForUserPR(userPR);
doNothing().when(queue).putIntoBucketRegionQueue(eq(brq), any(), eq(event));
boolean putDone = queue.put(event);
assertThat(putDone).isTrue();
inOrder.verify(brq).lockWhenRegionIsInitializing();
inOrder.verify(readLock).lock();
inOrder.verify(readLock).unlock();
inOrder.verify(brq).unlockWhenRegionIsInitializing();
}
private void prepareBrq(AbstractBucketRegionQueue brq, boolean isTmpQueue) {
if (isTmpQueue) {
when(cache.getInternalRegionByPath("/__PR/_B__PARALLEL_GATEWAY_SENDER_QUEUE_1"))
.thenReturn(null).thenReturn(brq);
} else {
when(cache.getInternalRegionByPath("/__PR/_B__PARALLEL_GATEWAY_SENDER_QUEUE_1"))
.thenReturn(brq);
}
}
@Test
public void enqueueToInitializingBrqShouldLockFailedInitialImageReadLock() throws Exception {
testEnqueueToBrqAfterLockFailedInitialImageReadLock(false);
}
@Test
public void enqueueToTmpQueueShouldLockFailedInitialImageReadLock() throws Exception {
testEnqueueToBrqAfterLockFailedInitialImageReadLock(true);
}
@Test
public void whenEventReleaseFromOffHeapFailsExceptionShouldNotBeThrownToAckReaderThread()
throws Exception {
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
Mockito.doThrow(new IllegalStateException()).when(event).release();
Queue backingList = new LinkedList();
backingList.add(event);
BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
Collections.emptySet(), 0, 1, metaRegionFactory);
queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
List peeked = queue.peek(1, 100);
assertEquals(1, peeked.size());
queue.remove();
}
@Test
public void whenGatewayEventUnableToResolveFromOffHeapTheStatForNotQueuedConflatedShouldBeIncremented()
throws Exception {
GatewaySenderStats stats = mockGatewaySenderStats();
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
when(event.makeHeapCopyIfOffHeap()).thenReturn(null);
GatewaySenderEventImpl eventResolvesFromOffHeap = mock(GatewaySenderEventImpl.class);
when(eventResolvesFromOffHeap.makeHeapCopyIfOffHeap()).thenReturn(eventResolvesFromOffHeap);
Queue backingList = new LinkedList();
backingList.add(event);
backingList.add(eventResolvesFromOffHeap);
BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
Collections.emptySet(), 0, 1, metaRegionFactory);
queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
List peeked = queue.peek(1, 100);
assertEquals(1, peeked.size());
verify(stats, times(1)).incEventsNotQueuedConflated();
}
private GatewaySenderStats mockGatewaySenderStats() {
GatewaySenderStats stats = mock(GatewaySenderStats.class);
when(sender.getStatistics()).thenReturn(stats);
return stats;
}
@Test
public void whenNullPeekedEventFromBucketRegionQueueTheStatForNotQueuedConflatedShouldBeIncremented()
throws Exception {
GatewaySenderStats stats = mockGatewaySenderStats();
GatewaySenderEventImpl eventResolvesFromOffHeap = mock(GatewaySenderEventImpl.class);
when(eventResolvesFromOffHeap.makeHeapCopyIfOffHeap()).thenReturn(eventResolvesFromOffHeap);
Queue backingList = new LinkedList();
backingList.add(null);
backingList.add(eventResolvesFromOffHeap);
BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
Collections.emptySet(), 0, 1, metaRegionFactory);
queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
List peeked = queue.peek(1, 100);
assertEquals(1, peeked.size());
verify(stats, times(1)).incEventsNotQueuedConflated();
}
@Test
public void testLocalSize() throws Exception {
ParallelGatewaySenderQueueMetaRegion mockMetaRegion =
mock(ParallelGatewaySenderQueueMetaRegion.class);
PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
when(mockMetaRegion.getDataStore()).thenReturn(dataStore);
when(dataStore.getSizeOfLocalPrimaryBuckets()).thenReturn(3);
when(metaRegionFactory.newMetataRegion(any(), any(), any(), any())).thenReturn(mockMetaRegion);
InternalRegionFactory regionFactory = mock(InternalRegionFactory.class);
when(regionFactory.create(any())).thenReturn(mockMetaRegion);
when(cache.createInternalRegionFactory(any())).thenReturn(regionFactory);
queue.addShadowPartitionedRegionForUserPR(mockPR("region1"));
assertEquals(3, queue.localSize());
}
@Test
public void isDREventReturnsTrueForDistributedRegionEvent() {
String regionPath = "regionPath";
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
when(event.getRegionPath()).thenReturn(regionPath);
DistributedRegion region = mock(DistributedRegion.class);
when(cache.getRegion(regionPath)).thenReturn(region);
ParallelGatewaySenderQueue queue = mock(ParallelGatewaySenderQueue.class);
when(queue.isDREvent(cache, event)).thenCallRealMethod();
assertThat(queue.isDREvent(cache, event)).isTrue();
}
@Test
public void isDREventReturnsFalseForPartitionedRegionEvent() {
String regionPath = "regionPath";
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
when(event.getRegionPath()).thenReturn(regionPath);
PartitionedRegion region = mock(PartitionedRegion.class);
when(cache.getRegion(regionPath)).thenReturn(region);
ParallelGatewaySenderQueue queue = mock(ParallelGatewaySenderQueue.class);
when(queue.isDREvent(cache, event)).thenCallRealMethod();
assertThat(queue.isDREvent(cache, event)).isFalse();
}
@Test
public void peekGetsExtraEventsWhenMustGroupTransactionEventsAndNotAllEventsForTransactionsInMaxSizeBatch()
throws Exception {
GatewaySenderEventImpl event1 = createGatewaySenderEventImpl(1, false);
GatewaySenderEventImpl event2 = createGatewaySenderEventImpl(2, false);
GatewaySenderEventImpl event3 = createGatewaySenderEventImpl(1, true);
GatewaySenderEventImpl event4 = createGatewaySenderEventImpl(2, true);
GatewaySenderEventImpl event5 = createGatewaySenderEventImpl(3, false);
GatewaySenderEventImpl event6 = createGatewaySenderEventImpl(3, true);
Queue backingList = new LinkedList();
backingList.add(event1);
backingList.add(event2);
backingList.add(event3);
backingList.add(event4);
backingList.add(event5);
backingList.add(event6);
BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
Collections.emptySet(), 0, 1, metaRegionFactory);
queue.setGroupTransactionEvents(true);
queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
List peeked = queue.peek(3, 100);
assertEquals(4, peeked.size());
List peekedAfter = queue.peek(3, 100);
assertEquals(2, peekedAfter.size());
}
@Test
public void peekGetsExtraEventsWhenMustGroupTransactionEventsAndNotAllEventsForTransactionsInBatchByTime()
throws Exception {
GatewaySenderEventImpl event1 = createGatewaySenderEventImpl(1, false);
GatewaySenderEventImpl event2 = createGatewaySenderEventImpl(2, false);
GatewaySenderEventImpl event3 = createGatewaySenderEventImpl(1, true);
GatewaySenderEventImpl event4 = createGatewaySenderEventImpl(2, true);
GatewaySenderEventImpl event5 = createGatewaySenderEventImpl(3, false);
GatewaySenderEventImpl event6 = createGatewaySenderEventImpl(3, true);
Queue backingList = new LinkedList();
backingList.add(event1);
backingList.add(event2);
backingList.add(event3);
backingList.add(null);
backingList.add(event4);
backingList.add(event5);
backingList.add(event6);
BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
Collections.emptySet(), 0, 1, metaRegionFactory);
queue.setGroupTransactionEvents(true);
queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
List peeked = queue.peek(-1, 1);
assertEquals(4, peeked.size());
List peekedAfter = queue.peek(-1, 100);
assertEquals(2, peekedAfter.size());
}
@Test
public void peekDoesNotGetExtraEventsWhenNotMustGroupTransactionEventsAndNotAllEventsForTransactionsInBatchMaxSize()
throws Exception {
GatewaySenderEventImpl event1 = createGatewaySenderEventImpl(1, false);
GatewaySenderEventImpl event2 = createGatewaySenderEventImpl(1, false);
GatewaySenderEventImpl event3 = createGatewaySenderEventImpl(2, false);
GatewaySenderEventImpl event4 = createGatewaySenderEventImpl(1, true);
GatewaySenderEventImpl event5 = createGatewaySenderEventImpl(2, true);
Queue backingList = new LinkedList();
backingList.add(event1);
backingList.add(event2);
backingList.add(event3);
backingList.add(event4);
backingList.add(event5);
BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
Collections.emptySet(), 0, 1, metaRegionFactory);
queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
List peeked = queue.peek(3, 100);
assertEquals(3, peeked.size());
List peekedAfter = queue.peek(3, 100);
assertEquals(2, peekedAfter.size());
}
@Test
public void peekDoesNotGetExtraEventsWhenMustGroupTransactionEventsAndNotAllEventsForTransactionsInBatchByTime()
throws Exception {
GatewaySenderEventImpl event1 = createGatewaySenderEventImpl(1, false);
GatewaySenderEventImpl event2 = createGatewaySenderEventImpl(2, false);
GatewaySenderEventImpl event3 = createGatewaySenderEventImpl(1, true);
GatewaySenderEventImpl event4 = createGatewaySenderEventImpl(2, true);
GatewaySenderEventImpl event5 = createGatewaySenderEventImpl(3, false);
GatewaySenderEventImpl event6 = createGatewaySenderEventImpl(3, true);
Queue backingList = new LinkedList();
backingList.add(event1);
backingList.add(event2);
backingList.add(event3);
backingList.add(null);
backingList.add(event4);
backingList.add(event5);
backingList.add(event6);
BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
Collections.emptySet(), 0, 1, metaRegionFactory);
queue.setGroupTransactionEvents(false);
queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
List peeked = queue.peek(-1, 1);
assertEquals(3, peeked.size());
List peekedAfter = queue.peek(-1, 100);
assertEquals(3, peekedAfter.size());
}
@Test
public void testCalculateTimeToSleepNegativeInputReturnsZero() {
assertEquals(0L, ParallelGatewaySenderQueue.calculateTimeToSleep(-3));
}
@Test
public void testCalculateTimeToSleepZeroInputReturnsZero() {
assertEquals(0L, ParallelGatewaySenderQueue.calculateTimeToSleep(0));
}
@Test
public void testCalculateTimeToSleepInputGreaterThanOneThousand() {
assertEquals(50L, ParallelGatewaySenderQueue.calculateTimeToSleep(1002));
}
@Test
public void testCalculateTimeToSleepInputSmallerThanOneThousand() {
assertEquals(2L, ParallelGatewaySenderQueue.calculateTimeToSleep(40));
}
private GatewaySenderEventImpl createGatewaySenderEventImpl(int transactionId,
boolean isLastEventInTransaction) {
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
when(event.getTransactionId()).thenReturn(new TXId(null, transactionId));
when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
when(event.isLastEventInTransaction()).thenReturn(isLastEventInTransaction);
return event;
}
private PartitionedRegion mockPR(String name) {
PartitionedRegion region = mock(PartitionedRegion.class);
when(region.getFullPath()).thenReturn(name);
when(region.getPartitionAttributes()).thenReturn(new PartitionAttributesFactory<>().create());
when(region.getTotalNumberOfBuckets()).thenReturn(113);
when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
return region;
}
private BucketRegionQueue mockBucketRegionQueue(final Queue backingList) {
PartitionedRegion mockBucketRegion = mockPR("bucketRegion");
// These next mocked return calls are for when peek is called. It ends up checking these on the
// mocked pr region
when(mockBucketRegion.getLocalMaxMemory()).thenReturn(100);
when(mockBucketRegion.size()).thenReturn(backingList.size());
BucketRegionQueue bucketRegionQueue = mock(BucketRegionQueue.class);
when(bucketRegionQueue.getPartitionedRegion()).thenReturn(mockBucketRegion);
when(bucketRegionQueue.peek())
.thenAnswer((Answer) invocation -> pollAndWaitIfNull(backingList));
when(bucketRegionQueue.getElementsMatching(any(), any()))
.thenAnswer((Answer) invocation -> Arrays
.asList(new Object[] {getFirstNotNull(backingList)}));
return bucketRegionQueue;
}
private Object pollAndWaitIfNull(Queue queue) {
Object object = queue.poll();
if (object == null) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return object;
}
private Object getFirstNotNull(Queue queue) {
Object object = queue.poll();
while (object == null) {
object = queue.poll();
}
return object;
}
private class TestableParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
private BucketRegionQueue mockedAbstractBucketRegionQueue;
private boolean groupTransactionEvents = false;
public TestableParallelGatewaySenderQueue(final AbstractGatewaySender sender,
final Set<Region> userRegions, final int idx, final int nDispatcher) {
super(sender, userRegions, idx, nDispatcher, false);
}
public TestableParallelGatewaySenderQueue(final AbstractGatewaySender sender,
final Set<Region> userRegions, final int idx, final int nDispatcher,
final MetaRegionFactory metaRegionFactory) {
super(sender, userRegions, idx, nDispatcher, metaRegionFactory, false);
}
public void setMockedAbstractBucketRegionQueue(BucketRegionQueue mocked) {
this.mockedAbstractBucketRegionQueue = mocked;
}
public void setGroupTransactionEvents(boolean groupTransactionEvents) {
this.groupTransactionEvents = groupTransactionEvents;
}
@Override
public boolean mustGroupTransactionEvents() {
return groupTransactionEvents;
}
public AbstractBucketRegionQueue getBucketRegion(final PartitionedRegion prQ,
final int bucketId) {
return mockedAbstractBucketRegionQueue;
}
@Override
public boolean areLocalBucketQueueRegionsPresent() {
return true;
}
@Override
protected PartitionedRegion getRandomShadowPR() {
return mockedAbstractBucketRegionQueue.getPartitionedRegion();
}
@Override
protected int getRandomPrimaryBucket(PartitionedRegion pr) {
return 0;
}
@Override
protected BucketRegionQueue getBucketRegionQueueByBucketId(PartitionedRegion prQ,
int bucketId) {
return mockedAbstractBucketRegionQueue;
}
}
}