blob: b671fd753664fee4e940d02c9c141faf4f211bdc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.ha;
import static junit.framework.TestCase.assertEquals;
import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.HARegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
@Category({ClientSubscriptionTest.class})
public class HARegionQueueTest {
private HARegionQueue haRegionQueue;
private final HARegion haRegion = mock(HARegion.class);
private final InternalCache internalCache = mock(InternalCache.class);
private final List<EventID> eventIDs = new LinkedList<>();
private final EventID id1 = mock(EventID.class);
private final EventID id2 = mock(EventID.class);
private final EventID id3 = mock(EventID.class);
private final EventID id4 = mock(EventID.class);
private final Collection<List<EventID>> chunks = new LinkedList<>();
private final List<EventID> chunk1 = new LinkedList<>();
private final List<EventID> chunk2 = new LinkedList<>();
private final InternalDistributedMember primary = mock(InternalDistributedMember.class);
private final HARegionQueue.DispatchedAndCurrentEvents wrapper =
new HARegionQueue.DispatchedAndCurrentEvents();
private final long delay = 1L;
@Before
public void setup() throws IOException, ClassNotFoundException, InterruptedException {
StoppableReentrantReadWriteLock giiLock = mock(StoppableReentrantReadWriteLock.class);
when(giiLock.readLock())
.thenReturn(mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
StoppableReentrantReadWriteLock rwLock = mock(StoppableReentrantReadWriteLock.class);
when(rwLock.writeLock())
.thenReturn(mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class));
when(rwLock.readLock())
.thenReturn(mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
HashMap map = new HashMap();
when(haRegion.put(any(), any())).then((invocationOnMock) -> {
return map.put(invocationOnMock.getArgument(0), invocationOnMock.getArgument(1));
});
when(haRegion.get(any())).then((invocationOnMock) -> {
return map.get(invocationOnMock.getArgument(0));
});
when(haRegion.getGemFireCache()).thenReturn(internalCache);
haRegionQueue = new HARegionQueue("haRegion", haRegion, internalCache,
new HAContainerMap(new ConcurrentHashMap()), null, (byte) 1, true,
mock(HARegionQueueStats.class), giiLock, rwLock, mock(CancelCriterion.class), false,
mock(StatisticsClock.class));
CacheClientNotifier.resetInstance();
}
@Test
public void whenProxyIDisNullThenItIsNotAddedToClientInterestList() {
ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
HAEventWrapper haEventWrapper = mock(HAEventWrapper.class);
HAContainerWrapper haContainerWrapper = mock(HAContainerWrapper.class);
String regionName = "mockRegion";
when(haContainerWrapper.getProxyID(any())).thenReturn(null);
haRegionQueue.addClientCQsAndInterestList(clientUpdateMessage, haEventWrapper,
haContainerWrapper, regionName);
verify(haEventWrapper, times(0)).getClientCqs();
verify(haEventWrapper, times(0)).getClientUpdateMessage();
}
@Test
public void conflateConflatableEntriesAndDoNotConflateNonConflatableEntries() throws Exception {
EventID eventId1 = new EventID(new byte[] {1}, 1, 1);
EventID eventId2 = new EventID(new byte[] {1}, 1, 2);
EventID eventId3 = new EventID(new byte[] {1}, 1, 3);
EventID eventId4 = new EventID(new byte[] {1}, 1, 4);
EventID eventId5 = new EventID(new byte[] {1}, 1, 5);
EventID eventId6 = new EventID(new byte[] {1}, 1, 6);
EventID eventId7 = new EventID(new byte[] {1}, 1, 7);
haRegionQueue.put(new ConflatableObject("key", "value1", eventId1, false, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value2", eventId2, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value3", eventId3, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value4", eventId4, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value5", eventId5, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value6", eventId6, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value7", eventId7, false, "someRegion"));
assertEquals(3, haRegionQueue.size());
}
@Test
public void queueShouldconflateConflatableEntries() throws Exception {
EventID eventId1 = new EventID(new byte[] {1}, 1, 1);
EventID eventId2 = new EventID(new byte[] {1}, 1, 2);
EventID eventId3 = new EventID(new byte[] {1}, 1, 3);
EventID eventId4 = new EventID(new byte[] {1}, 1, 4);
EventID eventId5 = new EventID(new byte[] {1}, 1, 5);
EventID eventId6 = new EventID(new byte[] {1}, 1, 6);
EventID eventId7 = new EventID(new byte[] {1}, 1, 7);
haRegionQueue.put(new ConflatableObject("key", "value1", eventId1, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value2", eventId2, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value3", eventId3, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value4", eventId4, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value5", eventId5, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value6", eventId6, true, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value7", eventId7, true, "someRegion"));
assertEquals(1, haRegionQueue.size());
}
@Test
public void queuePutElidesSequenceIdLowerThanOrEqualToLastSeenSequenceId() throws Exception {
EventID eventId1 = new EventID(new byte[] {1}, 1, 1);
EventID eventId2 = new EventID(new byte[] {1}, 1, 2);
EventID eventId3 = new EventID(new byte[] {1}, 1, 0);
EventID eventId4 = new EventID(new byte[] {1}, 1, 3);
haRegionQueue.put(new ConflatableObject("key", "value1", eventId1, false, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value2", eventId2, false, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value3", eventId3, false, "someRegion"));
haRegionQueue.put(new ConflatableObject("key", "value3", eventId4, false, "someRegion"));
assertEquals(3, haRegionQueue.size());
}
@Test
public void isQueueInitializedWithWaitDoesNotWaitIfInitialized() throws Exception {
long time = 1;
HARegionQueue spy = spy(haRegionQueue);
doReturn(true).when(spy).isQueueInitialized();
assertThat(spy.isQueueInitializedWithWait(time)).isTrue();
verify(spy, never()).waitForInitialized(time);
}
@Test
public void isQueueInitializedWithWaitWillWaitIfNotInitialized() throws Exception {
long time = 1;
HARegionQueue spy = spy(haRegionQueue);
doReturn(false).doReturn(true).when(spy).isQueueInitialized();
doNothing().when(spy).waitForInitialized(time);
assertThat(spy.isQueueInitializedWithWait(time)).isTrue();
verify(spy).waitForInitialized(time);
}
@Test
public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() throws Exception {
long time = 1;
HARegionQueue spy = spy(haRegionQueue);
doReturn(false).doReturn(false).when(spy).isQueueInitialized();
doNothing().when(spy).waitForInitialized(time);
assertThat(spy.isQueueInitializedWithWait(time)).isFalse();
verify(spy).waitForInitialized(time);
}
@Test
public void getDispatchedEventsReturnsDispatchedEvents() {
HARegionQueue spy = spy(haRegionQueue);
addEvents();
doReturn(false).when(spy).isDispatched(id1);
doReturn(true).when(spy).isDispatched(id2);
doReturn(true).when(spy).isDispatched(id3);
doReturn(false).when(spy).isDispatched(id4);
List<EventID> dispatchedEvents = spy.getDispatchedEvents(eventIDs);
assertThat(dispatchedEvents).containsExactlyInAnyOrder(id2, id3);
}
private void addEvents() {
eventIDs.add(id1);
eventIDs.add(id2);
eventIDs.add(id3);
eventIDs.add(id4);
}
@Test
public void isDispatchedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
HARegionQueue spy = spy(haRegionQueue);
doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
assertThat(spy.isDispatched(id1)).isTrue();
}
@Test
public void isDispatchedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
HARegionQueue spy = spy(haRegionQueue);
when(id1.getSequenceID()).thenReturn(100L);
wrapper.lastDispatchedSequenceId = 99L;
doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
assertThat(spy.isDispatched(id1)).isFalse();
}
@Test
public void isDispatchedReturnsTrueIfSequenceIdEqualsLastDispatched() {
HARegionQueue spy = spy(haRegionQueue);
when(id1.getSequenceID()).thenReturn(100L);
wrapper.lastDispatchedSequenceId = 100L;
doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
assertThat(spy.isDispatched(id1)).isTrue();
}
@Test
public void isDispatchedReturnsTrueIfSequenceIdLessThanLastDispatched() {
HARegionQueue spy = spy(haRegionQueue);
when(id1.getSequenceID()).thenReturn(90L);
wrapper.lastDispatchedSequenceId = 100L;
doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
assertThat(spy.isDispatched(id1)).isTrue();
}
@Test
public void doNotScheduleSynchronizationWithPrimaryIfHasDoneSynchronization() {
HARegionQueue spy = spy(haRegionQueue);
spy.hasSynchronizedWithPrimary.set(true);
doReturn(delay).when(spy).getDelay();
spy.synchronizeQueueWithPrimary(primary, internalCache);
verify(spy, never()).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
}
@Test
public void doNotScheduleSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
HARegionQueue spy = spy(haRegionQueue);
spy.scheduleSynchronizationWithPrimaryInProgress.set(true);
doReturn(delay).when(spy).getDelay();
spy.synchronizeQueueWithPrimary(primary, internalCache);
verify(spy, never()).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
}
@Test
public void doNotScheduleSynchronizationWithPrimaryIfGIINotFinished() {
HARegionQueue spy = spy(haRegionQueue);
doReturn(delay).when(spy).getDelay();
spy.synchronizeQueueWithPrimary(primary, internalCache);
verify(spy, never()).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
}
@Test
public void doNotScheduleSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version() {
HARegionQueue spy = spy(haRegionQueue);
spy.doneGIIQueueing.set(true);
when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() - 1));
doReturn(delay).when(spy).getDelay();
spy.synchronizeQueueWithPrimary(primary, internalCache);
verify(spy, never()).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
}
@Test
public void scheduleSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
HARegionQueue spy = spy(haRegionQueue);
spy.doneGIIQueueing.set(true);
when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
doReturn(delay).when(spy).getDelay();
doNothing().when(spy).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
spy.synchronizeQueueWithPrimary(primary, internalCache);
verify(spy).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
}
@Test
public void scheduleSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
HARegionQueue spy = spy(haRegionQueue);
spy.doneGIIQueueing.set(true);
when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() + 1));
doReturn(delay).when(spy).getDelay();
doNothing().when(spy).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
spy.synchronizeQueueWithPrimary(primary, internalCache);
verify(spy).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
}
@Test
public void getGIIEventsReturnsCorrectEvents() {
HARegionQueue spy = spy(haRegionQueue);
List<EventID> giiEvents;
spy.positionBeforeGII = 1;
spy.positionAfterGII = 4;
HAEventWrapper wrapper1 = mock(HAEventWrapper.class);
HAEventWrapper wrapper2 = mock(HAEventWrapper.class);
when(wrapper1.getEventId()).thenReturn(id1);
when(wrapper2.getEventId()).thenReturn(id2);
Region.Entry<Object, Object> entry1 = uncheckedCast(mock(Region.Entry.class));
Region.Entry<Object, Object> entry2 = uncheckedCast(mock(Region.Entry.class));
Region.Entry<Object, Object> entry3 = uncheckedCast(mock(Region.Entry.class));
when(entry1.getValue()).thenReturn(wrapper1);
when(entry2.getValue()).thenReturn(new Object());
when(entry3.getValue()).thenReturn(wrapper2);
when(haRegion.getEntry(1L)).thenReturn(entry1);
when(haRegion.getEntry(2L)).thenReturn(entry2);
when(haRegion.getEntry(3L)).thenReturn(null);
when(haRegion.getEntry(4L)).thenReturn(entry3);
giiEvents = spy.getGIIEvents();
assertThat(giiEvents).containsExactlyInAnyOrder(id1, id2);
}
@Test
public void doSynchronizationWithPrimaryReturnsIfNoGIIEvents() {
HARegionQueue spy = spy(haRegionQueue);
int maxChunkSize = 1000;
spy.hasSynchronizedWithPrimary.set(true);
doReturn(new LinkedList<EventID>()).when(spy).getGIIEvents();
spy.doSynchronizationWithPrimary(primary, internalCache);
verify(spy, never()).getChunks(eventIDs, maxChunkSize);
verify(spy, never()).removeDispatchedEvents(primary, internalCache, eventIDs);
assertThat(spy.hasSynchronizedWithPrimary).isTrue();
assertThat(spy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
}
@Test
public void doSynchronizationWithPrimaryRemoveDispatchedEvents() {
HARegionQueue spy = spy(haRegionQueue);
int maxChunkSize = 1000;
addEvents();
doReturn(eventIDs).when(spy).getGIIEvents();
doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, eventIDs);
spy.doSynchronizationWithPrimary(primary, internalCache);
verify(spy, never()).getChunks(eventIDs, maxChunkSize);
verify(spy).removeDispatchedEvents(primary, internalCache, eventIDs);
assertThat(spy.hasSynchronizedWithPrimary).isTrue();
assertThat(spy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
}
@Test
public void hasSynchronizedWithPrimaryNotSetIfRemoveDispatchedEventsFails() {
HARegionQueue spy = spy(haRegionQueue);
int maxChunkSize = 1000;
addEvents();
doReturn(eventIDs).when(spy).getGIIEvents();
doReturn(false).when(spy).removeDispatchedEvents(primary, internalCache, eventIDs);
spy.doSynchronizationWithPrimary(primary, internalCache);
verify(spy, never()).getChunks(eventIDs, maxChunkSize);
verify(spy).removeDispatchedEvents(primary, internalCache, eventIDs);
assertThat(spy.hasSynchronizedWithPrimary).isFalse();
assertThat(spy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
}
@Test
public void hasSynchronizedWithPrimaryRemoveChunksIfManyGIIEvents() {
HARegionQueue spy = spy(haRegionQueue);
int maxChunkSize = 1000;
for (int i = 0; i < 1100; i++) {
eventIDs.add(mock(EventID.class));
}
createChunks();
doReturn(eventIDs).when(spy).getGIIEvents();
doReturn(chunks).when(spy).getChunks(eventIDs, maxChunkSize);
doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, chunk1);
doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, chunk2);
spy.doSynchronizationWithPrimary(primary, internalCache);
verify(spy).getChunks(eventIDs, maxChunkSize);
verify(spy).removeDispatchedEvents(primary, internalCache, chunk1);
verify(spy).removeDispatchedEvents(primary, internalCache, chunk2);
assertThat(spy.hasSynchronizedWithPrimary).isTrue();
assertThat(spy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
}
private void createChunks() {
chunk1.add(id1);
chunk2.add(id2);
chunks.add(chunk1);
chunks.add(chunk2);
}
@Test
public void hasSynchronizedWithPrimaryNotSetIfRemoveChunksFails() {
HARegionQueue spy = spy(haRegionQueue);
int maxChunkSize = 1000;
for (int i = 0; i < 1100; i++) {
eventIDs.add(mock(EventID.class));
}
createChunks();
doReturn(eventIDs).when(spy).getGIIEvents();
doReturn(chunks).when(spy).getChunks(eventIDs, maxChunkSize);
doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, chunk1);
doReturn(false).when(spy).removeDispatchedEvents(primary, internalCache, chunk2);
spy.doSynchronizationWithPrimary(primary, internalCache);
verify(spy).getChunks(eventIDs, maxChunkSize);
verify(spy).removeDispatchedEvents(primary, internalCache, chunk1);
verify(spy).removeDispatchedEvents(primary, internalCache, chunk2);
assertThat(spy.hasSynchronizedWithPrimary).isFalse();
assertThat(spy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
}
@Test
public void getChunksReturnsEqualSizedChunks() {
HARegionQueue spy = spy(haRegionQueue);
addEvents();
// add more events
eventIDs.add(mock(EventID.class));
eventIDs.add(mock(EventID.class));
int maxChunkSize = 2;
Collection<List<EventID>> myChunks = spy.getChunks(eventIDs, maxChunkSize);
assertThat(myChunks.size()).isEqualTo(eventIDs.size() / maxChunkSize);
for (List<EventID> chunk : myChunks) {
assertThat(chunk.size()).isEqualTo(maxChunkSize);
}
}
@Test
public void removeDispatchedEventAfterSyncWithPrimaryRemovesEvents() throws Exception {
HARegionQueue spy = spy(haRegionQueue);
doNothing().when(spy).removeDispatchedEvents(id1);
assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isTrue();
verify(spy).removeDispatchedEvents(id1);
}
@Test
public void removeDispatchedEventReturnsTrueIfRemovalThrowsCacheException() throws Exception {
HARegionQueue spy = spy(haRegionQueue);
doThrow(new EntryNotFoundException("")).when(spy).removeDispatchedEvents(id1);
assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isTrue();
verify(spy).removeDispatchedEvents(id1);
}
@Test
public void removeDispatchedEventReturnsTrueIfRemovalThrowsRegionDestroyedException()
throws Exception {
HARegionQueue spy = spy(haRegionQueue);
doThrow(new RegionDestroyedException("", "")).when(spy).removeDispatchedEvents(id1);
assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isTrue();
verify(spy).removeDispatchedEvents(id1);
}
@Test
public void removeDispatchedEventReturnsFalseIfRemovalThrowsCancelException() throws Exception {
HARegionQueue spy = spy(haRegionQueue);
doThrow(new CacheClosedException()).when(spy).removeDispatchedEvents(id1);
assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isFalse();
verify(spy).removeDispatchedEvents(id1);
}
@Test
public void removeDispatchedEventsReturnsFalseIfNotGettingEventsFromPrimary() {
HARegionQueue spy = spy(haRegionQueue);
doReturn(null).when(spy).getDispatchedEventsFromPrimary(primary, internalCache, eventIDs);
assertThat(spy.removeDispatchedEvents(primary, internalCache, eventIDs)).isFalse();
}
@Test
public void removeDispatchedEventsReturnsTrueIfRemovedDispatchedEvents() {
HARegionQueue spy = spy(haRegionQueue);
List<EventID> dispatched = new LinkedList<>();
dispatched.add(id1);
dispatched.add(id3);
doReturn(dispatched).when(spy).getDispatchedEventsFromPrimary(primary, internalCache, eventIDs);
doReturn(true).when(spy).removeDispatchedEventAfterSyncWithPrimary(id1);
doReturn(true).when(spy).removeDispatchedEventAfterSyncWithPrimary(id3);
assertThat(spy.removeDispatchedEvents(primary, internalCache, eventIDs)).isTrue();
}
@Test
public void removeDispatchedEventsReturnsFalseIfFailedToRemoveDispatchedEvents() {
HARegionQueue spy = spy(haRegionQueue);
List<EventID> dispatched = new LinkedList<>();
dispatched.add(id1);
dispatched.add(id3);
doReturn(dispatched).when(spy).getDispatchedEventsFromPrimary(primary, internalCache, eventIDs);
doReturn(true).when(spy).removeDispatchedEventAfterSyncWithPrimary(id1);
doReturn(false).when(spy).removeDispatchedEventAfterSyncWithPrimary(id3);
assertThat(spy.removeDispatchedEvents(primary, internalCache, eventIDs)).isFalse();
}
@Test
public void getDelayReturnsTimeToWait() {
HARegionQueue spy = spy(haRegionQueue);
long waitTime = 15000L;
long currentTime = 5000L;
long doneGIIQueueingTime = 0L;
spy.doneGIIQueueingTime = doneGIIQueueingTime;
long elapsed = currentTime - doneGIIQueueingTime;
doReturn(currentTime).when(spy).getCurrentTime();
assertThat(spy.getDelay()).isEqualTo(waitTime - elapsed);
}
@Test
public void getDelayReturnsZeroIfExceedWaitTime() {
HARegionQueue spy = spy(haRegionQueue);
long waitTime = 15000L;
long doneGIIQueueingTime = 0L;
long currentTime = waitTime + doneGIIQueueingTime + 1;
spy.doneGIIQueueingTime = doneGIIQueueingTime;
doReturn(currentTime).when(spy).getCurrentTime();
assertThat(spy.getDelay()).isEqualTo(0);
}
}