blob: 7ef75fe54d73c249dbbe721fd5439b46582b21f2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.junit.Test;
import org.apache.geode.cache.Operation;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.FilterRoutingInfo;
import org.apache.geode.internal.cache.InternalCacheEvent;
import org.apache.geode.internal.cache.LocalRegion;
public class ClientRegistrationEventQueueManagerTest {
@Test
public void messageDeliveredAfterRegisteringOnDrainIfNewFilterIDsIncludesClient() {
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
LocalRegion localRegion = mock(LocalRegion.class);
FilterProfile filterProfile = mock(FilterProfile.class);
FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(
filterInfo);
when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent, true))
.thenReturn(filterRoutingInfo);
when(localRegion.getFilterProfile()).thenReturn(filterProfile);
when(internalCacheEvent.getRegion()).thenReturn(localRegion);
when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
Set<ClientProxyMembershipID> recalculatedFilterClientIDs = new HashSet<>();
recalculatedFilterClientIDs.add(clientProxyMembershipID);
when(cacheClientNotifier.getFilterClientIDs(internalCacheEvent, filterProfile, filterInfo,
clientUpdateMessage))
.thenReturn(recalculatedFilterClientIDs);
CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
// Create empty filter client IDs produced by the "normal" put processing path, so we can test
// that the event is still delivered if the client finished registering and needs the event.
Set<ClientProxyMembershipID> normalPutFilterClientIDs = new HashSet<>();
clientRegistrationEventQueueManager
.add(internalCacheEvent, clientUpdateMessage, normalPutFilterClientIDs,
cacheClientNotifier);
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
// The client update message should still be delivered because it is now part of the
// filter clients interested in this event, despite having not been included in the original
// filter info in the "normal" put processing path.
verify(cacheClientProxy, times(1)).deliverMessage(clientUpdateMessage);
}
@Test
public void clientRemovedFromFilterClientsListIfEventAddedToRegistrationQueue() {
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
Conflatable conflatable = mock(Conflatable.class);
// Add the registering client to the filter clients. This can happen if the filter info is
// received but the client is not completely registered yet (queue GII has not been completed).
// In that case, we want to remove the client from the filter IDs set and add the event
// to the client's registration queue.
Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
filterClientIDs.add(clientProxyMembershipID);
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
cacheClientNotifier);
// The client should no longer be in the filter clients since the event was queued in the
// client's registration queue.
assertThat(filterClientIDs.isEmpty()).isTrue();
}
@Test
public void putInProgressCounterIncrementedOnAddAndDecrementedOnRemoveForAllEvents() {
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
new ConcurrentLinkedQueue<>(),
new ReentrantReadWriteLock());
List<HAEventWrapper> haEventWrappers = new ArrayList<>();
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
for (int i = 0; i < 5; ++i) {
HAEventWrapper haEventWrapper = mock(HAEventWrapper.class);
haEventWrappers.add(haEventWrapper);
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
clientRegistrationEventQueueManager.add(internalCacheEvent,
haEventWrapper, new HashSet<>(), cacheClientNotifier);
verify(haEventWrapper, times(1)).incrementPutInProgressCounter(anyString());
}
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
for (HAEventWrapper haEventWrapper : haEventWrappers) {
verify(haEventWrapper, times(1)).decrementPutInProgressCounter();
}
}
@Test
public void addAndDrainQueueContentionTest() throws ExecutionException, InterruptedException {
ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
ReentrantReadWriteLock mockPutDrainLock = mock(ReentrantReadWriteLock.class);
ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
when(mockPutDrainLock.readLock())
.thenReturn(actualPutDrainLock.readLock());
when(mockPutDrainLock.writeLock())
.thenAnswer(i -> {
// Force a context switch from drain to put thread so we can ensure the event is not lost
Thread.sleep(1);
return actualPutDrainLock.writeLock();
});
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
new ConcurrentLinkedQueue<>(), mockPutDrainLock);
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
Conflatable conflatable = mock(Conflatable.class);
Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
CompletableFuture<Void> addEventsToQueueTask = CompletableFuture.runAsync(() -> {
for (int numAdds = 0; numAdds < 100000; ++numAdds) {
// In thread one, we add events to the queue
clientRegistrationEventQueueManager
.add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
}
});
CompletableFuture<Void> drainEventsFromQueueTask = CompletableFuture.runAsync(() -> {
// In thread two, we drain events from the queue
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
});
CompletableFuture.allOf(addEventsToQueueTask, drainEventsFromQueueTask).get();
assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
}
@Test
public void addEventWithOffheapValueCopiedToHeap() {
EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
Operation mockOperation = mock(Operation.class);
when(mockOperation.isEntry()).thenReturn(true);
when(internalCacheEvent.getOperation()).thenReturn(mockOperation);
Conflatable conflatable = mock(Conflatable.class);
Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
clientRegistrationEventQueueManager
.add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
verify(internalCacheEvent, times(1)).copyOffHeapToHeap();
}
@Test
public void clientWasNeverRegisteredDrainQueueStillRemoved() {
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
new ConcurrentLinkedQueue<>(),
new ReentrantReadWriteLock());
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
Conflatable conflatable = mock(Conflatable.class);
Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
// Pass a new event to the ClientRegistrationEventQueueManager. This event should not be added
// to the test client's registration queue, because it should already be removed. We can
// validate that by asserting that the client's registration queue is empty after the add.
clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
cacheClientNotifier);
assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
}
@Test
public void drainThrowsExceptionQueueStillRemoved() {
CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
new ConcurrentLinkedQueue<>(),
new ReentrantReadWriteLock());
Conflatable conflatable = mock(Conflatable.class);
Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
RuntimeException testException = new RuntimeException();
when(internalCacheEvent.getRegion()).thenThrow(testException);
Operation mockOperation = mock(Operation.class);
when(mockOperation.isEntry()).thenReturn(true);
when(internalCacheEvent.getOperation()).thenReturn(mockOperation);
clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
cacheClientNotifier);
assertThatThrownBy(() -> clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
cacheClientNotifier))
.isEqualTo(testException);
// Pass a new event to the ClientRegistrationEventQueueManager. This event should not be added
// to the test client's registration queue, because it should already be removed. We can
// validate that by asserting that the client's registration queue is empty after the add.
clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
cacheClientNotifier);
assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
}
@Test
public void addEventInOriginalFilterIDsButQueueWasRemovedDueToSuccessfulRegistrationSoEventNotRedelivered() {
ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
Set<ClientProxyMembershipID> originalFilterIDs = new HashSet<>();
originalFilterIDs.add(clientProxyMembershipID);
ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
LocalRegion localRegion = mock(LocalRegion.class);
FilterProfile filterProfile = mock(FilterProfile.class);
FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(
filterInfo);
when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent))
.thenReturn(filterRoutingInfo);
when(localRegion.getFilterProfile()).thenReturn(filterProfile);
when(internalCacheEvent.getRegion()).thenReturn(localRegion);
when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
Set<ClientProxyMembershipID> recalculatedFilterClientIDs = new HashSet<>();
recalculatedFilterClientIDs.add(clientProxyMembershipID);
when(cacheClientNotifier.getFilterClientIDs(internalCacheEvent, filterProfile, filterInfo,
clientUpdateMessage))
.thenReturn(recalculatedFilterClientIDs);
when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
ReentrantReadWriteLock mockReadWriteLock = mock(ReentrantReadWriteLock.class);
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
new ConcurrentLinkedQueue<>(),
mockReadWriteLock);
ReentrantReadWriteLock.ReadLock mockReadLock = mock(ReentrantReadWriteLock.ReadLock.class);
when(mockReadWriteLock.readLock()).thenReturn(mockReadLock);
ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
when(mockReadWriteLock.writeLock()).thenReturn(actualPutDrainLock.writeLock());
doAnswer(i -> {
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
actualPutDrainLock.readLock();
return null;
}).when(mockReadLock).lock();
clientRegistrationEventQueueManager.add(internalCacheEvent, clientUpdateMessage,
originalFilterIDs, cacheClientNotifier);
verify(cacheClientProxy, times(0)).deliverMessage(clientUpdateMessage);
}
}