blob: d57ba5f999611df567908b957ce381b363eea8e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.KeyInfo;
import org.apache.geode.internal.cache.RegionQueue;
public class AbstractGatewaySenderTest {
@Test
public void getSynchronizationEventCanHandleRegionIsNullCase() {
AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
Object key = new Object();
long timestamp = 1;
GatewaySenderEventImpl gatewaySenderEvent = mock(GatewaySenderEventImpl.class);
when(gatewaySenderEvent.getKey()).thenReturn(key);
when(gatewaySenderEvent.getVersionTimeStamp()).thenReturn(timestamp);
Region region = mock(Region.class);
Collection collection = new ArrayList();
collection.add(gatewaySenderEvent);
when(region.values()).thenReturn(collection);
Set<RegionQueue> queues = new HashSet<>();
RegionQueue queue1 = mock(RegionQueue.class);
RegionQueue queue2 = mock(RegionQueue.class);
queues.add(queue2);
queues.add(queue1);
when(queue1.getRegion()).thenReturn(null);
when(queue2.getRegion()).thenReturn(region);
when(sender.getQueues()).thenReturn(queues);
when(sender.getStatistics()).thenReturn(mock(GatewaySenderStats.class));
when(sender.getSynchronizationEvent(key, timestamp)).thenCallRealMethod();
GatewayQueueEvent event = sender.getSynchronizationEvent(key, timestamp);
assertThat(event).isSameAs(gatewaySenderEvent);
}
@Test
public void distributeFinishesWorkWhenInterrupted() throws InterruptedException {
DummyGatewaySenderEventProcessor processor = new DummyGatewaySenderEventProcessor();
TestableGatewaySender gatewaySender = new TestableGatewaySender(processor);
EnumListenerEvent operationType = EnumListenerEvent.AFTER_CREATE;
EntryEventImpl event = mock(EntryEventImpl.class);
when(event.getKeyInfo()).thenReturn(mock(KeyInfo.class));
Operation operation = mock(Operation.class);
when(operation.isLocal()).thenReturn(false);
when(operation.isExpiration()).thenReturn(false);
when(event.getOperation()).thenReturn(operation);
InternalRegion region = mock(InternalRegion.class);
when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
when(event.getRegion()).thenReturn(region);
List<Integer> allRemoteDSIds = Collections.singletonList(1);
CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
CountDownLatch unlockLatch = new CountDownLatch(1);
// Get lifeCycleLock in write mode in new thread so that
// the thread calling distribute will not be able
// to acquire it
Thread thread = new Thread(() -> {
gatewaySender.getLifeCycleLock().writeLock().lock();
lockAcquiredLatch.countDown();
try {
unlockLatch.await();
} catch (InterruptedException ignore) {
}
gatewaySender.getLifeCycleLock().writeLock().unlock();
});
thread.start();
lockAcquiredLatch.await();
// Send interrupted and then call distribute
Thread.currentThread().interrupt();
gatewaySender.distribute(operationType, event, allRemoteDSIds, true);
unlockLatch.countDown();
// Check that the interrupted exception has been reset
assertThat(Thread.currentThread().isInterrupted()).isTrue();
// Check that the work was finished even if the interrupt signal was set
assertThat(processor.getTimesRegisterEventDroppedInPrimaryQueueCalled()).isEqualTo(1);
}
public static class TestableGatewaySender extends AbstractGatewaySender {
private int isRunningTimesCalled = 0;
public TestableGatewaySender(AbstractGatewaySenderEventProcessor eventProcessor) {
this.eventProcessor = eventProcessor;
enqueuedAllTempQueueEvents = true;
}
@Override
public void fillInProfile(DistributionAdvisor.Profile profile) {}
@Override
public void start() {}
@Override
public boolean isPrimary() {
return true;
}
@Override
public void startWithCleanQueue() {}
@Override
public void stop() {}
@Override
public void setModifiedEventId(EntryEventImpl clonedEvent) {}
@Override
public GatewaySenderStats getStatistics() {
return mock(GatewaySenderStats.class);
}
@Override
public GatewaySenderAdvisor getSenderAdvisor() {
return mock(GatewaySenderAdvisor.class);
}
@Override
public boolean isRunning() {
if (isRunningTimesCalled++ == 0) {
return true;
}
return false;
}
@Override
public String getId() {
return "test";
}
}
public static class DummyGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
private int timesEnqueueEventCalled = 0;
private int timesRegisterEventDroppedInPrimaryQueueCalled = 0;
public DummyGatewaySenderEventProcessor() {
super("", new DummyGatewaySender(), null);
}
@Override
public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue,
boolean isLastEventInTransaction) throws IOException, CacheException {
timesEnqueueEventCalled++;
}
public int getTimesEnqueueEventCalled() {
return timesEnqueueEventCalled;
}
@Override
protected void initializeMessageQueue(String id, boolean cleanQueues) {}
@Override
protected void rebalance() {}
public int getTimesRegisterEventDroppedInPrimaryQueueCalled() {
return timesRegisterEventDroppedInPrimaryQueueCalled;
}
@Override
protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
timesRegisterEventDroppedInPrimaryQueueCalled++;
}
@Override
public void initializeEventDispatcher() {}
@Override
protected void enqueueEvent(GatewayQueueEvent event) {}
}
public static class DummyGatewaySender extends AbstractGatewaySender {
@Override
public void fillInProfile(DistributionAdvisor.Profile profile) {}
@Override
public void start() {}
@Override
public void startWithCleanQueue() {}
@Override
public void stop() {}
@Override
public void setModifiedEventId(EntryEventImpl clonedEvent) {}
}
}