/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.internal.cache.tier.sockets;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.junit.Test;

import org.apache.geode.cache.Operation;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.FilterRoutingInfo;
import org.apache.geode.internal.cache.InternalCacheEvent;
import org.apache.geode.internal.cache.LocalRegion;

public class ClientRegistrationEventQueueManagerTest {
  @Test
  public void messageDeliveredAfterRegisteringOnDrainIfNewFilterIDsIncludesClient() {
    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
        new ClientRegistrationEventQueueManager();

    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);

    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
            new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());

    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
    LocalRegion localRegion = mock(LocalRegion.class);
    FilterProfile filterProfile = mock(FilterProfile.class);
    FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);

    when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(
        filterInfo);
    when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent, true))
        .thenReturn(filterRoutingInfo);
    when(localRegion.getFilterProfile()).thenReturn(filterProfile);
    when(internalCacheEvent.getRegion()).thenReturn(localRegion);
    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));

    ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);

    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
    Set<ClientProxyMembershipID> recalculatedFilterClientIDs = new HashSet<>();
    recalculatedFilterClientIDs.add(clientProxyMembershipID);
    when(cacheClientNotifier.getFilterClientIDs(internalCacheEvent, filterProfile, filterInfo,
        clientUpdateMessage))
            .thenReturn(recalculatedFilterClientIDs);
    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
    when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
    when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);

    // Create empty filter client IDs produced by the "normal" put processing path, so we can test
    // that the event is still delivered if the client finished registering and needs the event.
    Set<ClientProxyMembershipID> normalPutFilterClientIDs = new HashSet<>();

    clientRegistrationEventQueueManager
        .add(internalCacheEvent, clientUpdateMessage, normalPutFilterClientIDs,
            cacheClientNotifier);

    clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);

    // The client update message should still be delivered because it is now part of the
    // filter clients interested in this event, despite having not been included in the original
    // filter info in the "normal" put processing path.
    verify(cacheClientProxy, times(1)).deliverMessage(clientUpdateMessage);
  }

  @Test
  public void clientRemovedFromFilterClientsListIfEventAddedToRegistrationQueue() {
    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
        new ClientRegistrationEventQueueManager();

    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);

    clientRegistrationEventQueueManager.create(clientProxyMembershipID,
        new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());

    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
    when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));

    Conflatable conflatable = mock(Conflatable.class);

    // Add the registering client to the filter clients. This can happen if the filter info is
    // received but the client is not completely registered yet (queue GII has not been completed).
    // In that case, we want to remove the client from the filter IDs set and add the event
    // to the client's registration queue.
    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
    filterClientIDs.add(clientProxyMembershipID);

    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);

    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
        cacheClientNotifier);

    // The client should no longer be in the filter clients since the event was queued in the
    // client's registration queue.
    assertThat(filterClientIDs.isEmpty()).isTrue();
  }

  @Test
  public void putInProgressCounterIncrementedOnAddAndDecrementedOnRemoveForAllEvents() {
    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
        new ClientRegistrationEventQueueManager();

    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);

    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
            new ConcurrentLinkedQueue<>(),
            new ReentrantReadWriteLock());

    List<HAEventWrapper> haEventWrappers = new ArrayList<>();
    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);

    for (int i = 0; i < 5; ++i) {
      HAEventWrapper haEventWrapper = mock(HAEventWrapper.class);
      haEventWrappers.add(haEventWrapper);
      InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
      when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
      when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
      clientRegistrationEventQueueManager.add(internalCacheEvent,
          haEventWrapper, new HashSet<>(), cacheClientNotifier);
      verify(haEventWrapper, times(1)).incrementPutInProgressCounter(anyString());
    }

    clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);

    for (HAEventWrapper haEventWrapper : haEventWrappers) {
      verify(haEventWrapper, times(1)).decrementPutInProgressCounter();
    }
  }

  @Test
  public void addAndDrainQueueContentionTest() throws ExecutionException, InterruptedException {
    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
    ReentrantReadWriteLock mockPutDrainLock = mock(ReentrantReadWriteLock.class);
    ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();

    when(mockPutDrainLock.readLock())
        .thenReturn(actualPutDrainLock.readLock());

    when(mockPutDrainLock.writeLock())
        .thenAnswer(i -> {
          // Force a context switch from drain to put thread so we can ensure the event is not lost
          Thread.sleep(1);
          return actualPutDrainLock.writeLock();
        });

    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
        new ClientRegistrationEventQueueManager();

    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
            new ConcurrentLinkedQueue<>(), mockPutDrainLock);

    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
    when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));

    Conflatable conflatable = mock(Conflatable.class);
    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
    when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);

    CompletableFuture<Void> addEventsToQueueTask = CompletableFuture.runAsync(() -> {
      for (int numAdds = 0; numAdds < 100000; ++numAdds) {
        // In thread one, we add events to the queue
        clientRegistrationEventQueueManager
            .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
      }
    });

    CompletableFuture<Void> drainEventsFromQueueTask = CompletableFuture.runAsync(() -> {
      // In thread two, we drain events from the queue
      clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
    });

    CompletableFuture.allOf(addEventsToQueueTask, drainEventsFromQueueTask).get();

    assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
  }

  @Test
  public void addEventWithOffheapValueCopiedToHeap() {
    EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
    when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
    Operation mockOperation = mock(Operation.class);
    when(mockOperation.isEntry()).thenReturn(true);
    when(internalCacheEvent.getOperation()).thenReturn(mockOperation);

    Conflatable conflatable = mock(Conflatable.class);
    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);

    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
        new ClientRegistrationEventQueueManager();

    clientRegistrationEventQueueManager.create(clientProxyMembershipID,
        new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());

    clientRegistrationEventQueueManager
        .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);

    verify(internalCacheEvent, times(1)).copyOffHeapToHeap();
  }

  @Test
  public void clientWasNeverRegisteredDrainQueueStillRemoved() {
    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
        new ClientRegistrationEventQueueManager();

    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);

    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
            new ConcurrentLinkedQueue<>(),
            new ReentrantReadWriteLock());

    clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);

    EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
    Conflatable conflatable = mock(Conflatable.class);
    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();

    // Pass a new event to the ClientRegistrationEventQueueManager. This event should not be added
    // to the test client's registration queue, because it should already be removed. We can
    // validate that by asserting that the client's registration queue is empty after the add.
    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
        cacheClientNotifier);

    assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
  }

  @Test
  public void drainThrowsExceptionQueueStillRemoved() {
    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
    when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);

    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
        new ClientRegistrationEventQueueManager();

    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
            new ConcurrentLinkedQueue<>(),
            new ReentrantReadWriteLock());

    Conflatable conflatable = mock(Conflatable.class);
    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();

    EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
    RuntimeException testException = new RuntimeException();
    when(internalCacheEvent.getRegion()).thenThrow(testException);
    Operation mockOperation = mock(Operation.class);
    when(mockOperation.isEntry()).thenReturn(true);
    when(internalCacheEvent.getOperation()).thenReturn(mockOperation);

    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
        cacheClientNotifier);

    assertThatThrownBy(() -> clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
        cacheClientNotifier))
            .isEqualTo(testException);

    // Pass a new event to the ClientRegistrationEventQueueManager. This event should not be added
    // to the test client's registration queue, because it should already be removed. We can
    // validate that by asserting that the client's registration queue is empty after the add.
    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
        cacheClientNotifier);

    assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
  }

  @Test
  public void addEventInOriginalFilterIDsButQueueWasRemovedDueToSuccessfulRegistrationSoEventNotRedelivered() {
    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
    when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
    Set<ClientProxyMembershipID> originalFilterIDs = new HashSet<>();
    originalFilterIDs.add(clientProxyMembershipID);

    ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);

    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
    LocalRegion localRegion = mock(LocalRegion.class);
    FilterProfile filterProfile = mock(FilterProfile.class);
    FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);

    when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(
        filterInfo);
    when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent))
        .thenReturn(filterRoutingInfo);
    when(localRegion.getFilterProfile()).thenReturn(filterProfile);
    when(internalCacheEvent.getRegion()).thenReturn(localRegion);
    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));

    Set<ClientProxyMembershipID> recalculatedFilterClientIDs = new HashSet<>();
    recalculatedFilterClientIDs.add(clientProxyMembershipID);
    when(cacheClientNotifier.getFilterClientIDs(internalCacheEvent, filterProfile, filterInfo,
        clientUpdateMessage))
            .thenReturn(recalculatedFilterClientIDs);
    when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
    when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
    ReentrantReadWriteLock mockReadWriteLock = mock(ReentrantReadWriteLock.class);

    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
        new ClientRegistrationEventQueueManager();

    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
            new ConcurrentLinkedQueue<>(),
            mockReadWriteLock);

    ReentrantReadWriteLock.ReadLock mockReadLock = mock(ReentrantReadWriteLock.ReadLock.class);
    when(mockReadWriteLock.readLock()).thenReturn(mockReadLock);
    ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
    when(mockReadWriteLock.writeLock()).thenReturn(actualPutDrainLock.writeLock());
    doAnswer(i -> {
      clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
      actualPutDrainLock.readLock();
      return null;
    }).when(mockReadLock).lock();

    clientRegistrationEventQueueManager.add(internalCacheEvent, clientUpdateMessage,
        originalFilterIDs, cacheClientNotifier);

    verify(cacheClientProxy, times(0)).deliverMessage(clientUpdateMessage);
  }
}
