/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.internal.cache.tier.sockets;

import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.logging.log4j.Logger;

import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.FilterRoutingInfo;
import org.apache.geode.internal.cache.InternalCacheEvent;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.internal.logging.LogService;

/**
 * This class is responsible for queueing events for clients while they are registering. Once the
 * client is completely registered (filter info retrieved and GII complete), we will drain the
 * client's queued events and deliver them to the cache client proxy if necessary.
 */
public class ClientRegistrationEventQueueManager {
  private static final Logger logger = LogService.getLogger();
  private final Set<ClientRegistrationEventQueue> registeringProxyEventQueues =
      new ConcurrentHashSet<>();

  void add(final InternalCacheEvent event,
      final Conflatable conflatable,
      final Set<ClientProxyMembershipID> originalFilterClientIDs,
      final CacheClientNotifier cacheClientNotifier) {
    if (registeringProxyEventQueues.isEmpty())
      return;

    ClientRegistrationEvent clientRegistrationEvent =
        new ClientRegistrationEvent(event, conflatable);

    for (final ClientRegistrationEventQueue registrationEventQueue : registeringProxyEventQueues) {
      registrationEventQueue.lockForPutting();
      try {
        // If this is an HAEventWrapper we need to increment the PutInProgress counter so
        // that the contents of the contents of the HAEventWrapper are preserved when the
        // event is drained and processed. See incrementPutInProgressCounter() and
        // decrementPutInProgressCounter() for more details.
        if (conflatable instanceof HAEventWrapper) {
          ((HAEventWrapper) conflatable).incrementPutInProgressCounter("client registration");
        }

        ClientProxyMembershipID clientProxyMembershipID =
            registrationEventQueue.getClientProxyMembershipID();

        // After taking out the lock, we need to determine if the client is still actually
        // registering since there is a small race where it may have finished registering
        // after we pulled the queue out of the registeringProxyEventQueues collection
        if (registeringProxyEventQueues.contains(registrationEventQueue)) {
          // If the event value is off-heap, copy it to heap so we are guaranteed the value
          // is available when we drain the registration queue
          copyOffHeapToHeapForRegistrationQueue(event);

          registrationEventQueue.add(clientRegistrationEvent);

          // Because this event will be processed and sent when it is drained out of the temporary
          // client registration queue, we do not need to send it to the client at this time.
          // We can prevent the event from being sent to the registering client at this time
          // by removing its client proxy membership ID from the filter clients collection,
          // if it exists.
          originalFilterClientIDs.remove(clientProxyMembershipID);
        } else {
          // The client is no longer registering, but we want to reprocess the filter info on
          // this event and potentially deliver the conflatable to handle the edge case where
          // the original client filter IDs generated in the "normal" put processing path did
          // not include the registering client because the filter info was not yet available.
          CacheClientProxy cacheClientProxy =
              cacheClientNotifier.getClientProxy(clientProxyMembershipID);

          processEventAndDeliverConflatable(cacheClientProxy, cacheClientNotifier, event,
              conflatable, originalFilterClientIDs);
        }
      } finally {
        registrationEventQueue.unlockForPutting();
      }
    }
  }

  void drain(final ClientRegistrationEventQueue clientRegistrationEventQueue,
      final CacheClientNotifier cacheClientNotifier) {
    try {
      ClientProxyMembershipID clientProxyMembershipID =
          clientRegistrationEventQueue.getClientProxyMembershipID();

      if (logger.isDebugEnabled()) {
        logger.debug("Draining events from registration queue for client proxy "
            + clientProxyMembershipID
            + " without synchronization");
      }

      CacheClientProxy cacheClientProxy = cacheClientNotifier
          .getClientProxy(clientProxyMembershipID);

      drainEventsReceivedWhileRegisteringClient(
          cacheClientProxy,
          clientRegistrationEventQueue,
          cacheClientNotifier);

      // Prevents additional events from being added to the queue while we process and remove it
      clientRegistrationEventQueue.lockForDraining();

      if (logger.isDebugEnabled()) {
        logger.debug("Draining remaining events from registration queue for client proxy "
            + clientProxyMembershipID
            + " with synchronization");
      }

      drainEventsReceivedWhileRegisteringClient(
          cacheClientProxy,
          clientRegistrationEventQueue,
          cacheClientNotifier);
    } finally {
      // The queue must be removed before attempting to release the drain lock
      // so that no additional events can be added from add threads.
      registeringProxyEventQueues.remove(clientRegistrationEventQueue);

      if (clientRegistrationEventQueue.isLockForDrainingHeld()) {
        clientRegistrationEventQueue.unlockForDraining();
      }
    }
  }

  ClientRegistrationEventQueue create(
      final ClientProxyMembershipID clientProxyMembershipID,
      final Queue<ClientRegistrationEvent> eventQueue,
      final ReentrantReadWriteLock eventAddDrainLock) {
    final ClientRegistrationEventQueue clientRegistrationEventQueue =
        new ClientRegistrationEventQueue(clientProxyMembershipID, eventQueue,
            eventAddDrainLock);
    registeringProxyEventQueues.add(clientRegistrationEventQueue);
    return clientRegistrationEventQueue;
  }

  private void processEventAndDeliverConflatable(final CacheClientProxy cacheClientProxy,
      final CacheClientNotifier cacheClientNotifier,
      final InternalCacheEvent internalCacheEvent,
      final Conflatable conflatable,
      final Set<ClientProxyMembershipID> originalFilterClientIDs) {
    try {
      // If the cache client proxy is null, the registration was not successful and the proxy
      // was never added to the initialized proxy collection managed by the cache client notifier.
      // If that is the case, we can just decrement the put in progress counter on the conflatable
      // if it is an HAEventWrapper.
      if (cacheClientProxy != null) {
        // The first step is to repopulate the filter info for the event to determine if
        // the client which was registering has a matching CQ or has registered interest
        // in the key for this event. We need to get the filter profile, filter routing info,
        // and local filter info in order to do so. If any of these are null, then there is
        // no need to proceed as the client is not interested.
        FilterProfile filterProfile =
            ((LocalRegion) internalCacheEvent.getRegion()).getFilterProfile();

        if (filterProfile != null) {
          FilterRoutingInfo filterRoutingInfo =
              filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent);

          if (filterRoutingInfo != null) {
            FilterRoutingInfo.FilterInfo filterInfo = filterRoutingInfo.getLocalFilterInfo();

            if (filterInfo != null) {
              ClientUpdateMessageImpl clientUpdateMessage = conflatable instanceof HAEventWrapper
                  ? (ClientUpdateMessageImpl) ((HAEventWrapper) conflatable)
                      .getClientUpdateMessage()
                  : (ClientUpdateMessageImpl) conflatable;

              internalCacheEvent.setLocalFilterInfo(filterInfo);

              Set<ClientProxyMembershipID> newFilterClientIDs =
                  cacheClientNotifier.getFilterClientIDs(internalCacheEvent, filterProfile,
                      filterInfo,
                      clientUpdateMessage);

              ClientProxyMembershipID proxyID = cacheClientProxy.getProxyID();
              if (eventNotInOriginalFilterClientIDs(proxyID, newFilterClientIDs,
                  originalFilterClientIDs) && newFilterClientIDs.contains(proxyID)) {
                cacheClientProxy.deliverMessage(conflatable);
              }
            }
          }
        }
      }
    } finally {
      // Once we have processed the conflatable, if it is an HAEventWrapper we can
      // decrement the PutInProgress counter, allowing the ClientUpdateMessage to be
      // set to null. See decrementPutInProgressCounter() for more details.
      if (conflatable instanceof HAEventWrapper) {
        ((HAEventWrapper) conflatable).decrementPutInProgressCounter();
      }
    }
  }

  /*
   * This is to handle the edge case where the original filter client IDs
   * calculated by "normal" put processing did not include the registering client
   * because the filter info had not been received yet, but we now found that the client
   * is interested in the event so we should deliver it.
   */
  private boolean eventNotInOriginalFilterClientIDs(final ClientProxyMembershipID proxyID,
      final Set<ClientProxyMembershipID> newFilterClientIDs,
      final Set<ClientProxyMembershipID> originalFilterClientIDs) {
    return originalFilterClientIDs == null
        || (!originalFilterClientIDs.contains(proxyID) && newFilterClientIDs.contains(proxyID));
  }

  /**
   * For simplicity, we will copy off-heap registration queue values to heap to avoid
   * complicated off-heap reference counting. Since the registration queue is only a
   * temporary construct during client registration, the overhead should not be significant.
   *
   * @param event The InternalCacheEvent whose value will be copied to the heap if need be
   */
  private void copyOffHeapToHeapForRegistrationQueue(final InternalCacheEvent event) {
    if (event.getOperation().isEntry()) {
      EntryEventImpl entryEvent = ((EntryEventImpl) event);
      entryEvent.copyOffHeapToHeap();
    }
  }

  private void drainEventsReceivedWhileRegisteringClient(
      final CacheClientProxy cacheClientProxy,
      final ClientRegistrationEventQueue registrationEventQueue,
      final CacheClientNotifier cacheClientNotifier) {
    ClientRegistrationEvent queuedEvent;
    while ((queuedEvent = registrationEventQueue.poll()) != null) {
      InternalCacheEvent internalCacheEvent = queuedEvent.internalCacheEvent;
      Conflatable conflatable = queuedEvent.conflatable;
      processEventAndDeliverConflatable(cacheClientProxy,
          cacheClientNotifier, internalCacheEvent, conflatable, null);
    }
  }

  /**
   * Represents a conflatable and event processed while a client was registering.
   * This needs to be queued and processed after registration is complete. The conflatable
   * is what we will actually be delivering to the MessageDispatcher (and thereby adding
   * to the HARegionQueue). The internal cache event is required to rehydrate the filter
   * info and determine if the client which was registering does have a CQ that matches or
   * has registered interest in the key.
   */
  private class ClientRegistrationEvent {
    private final Conflatable conflatable;
    private final InternalCacheEvent internalCacheEvent;

    ClientRegistrationEvent(final InternalCacheEvent internalCacheEvent,
        final Conflatable conflatable) {
      this.conflatable = conflatable;
      this.internalCacheEvent = internalCacheEvent;
    }
  }

  class ClientRegistrationEventQueue {
    private final ClientProxyMembershipID clientProxyMembershipID;
    private final Queue<ClientRegistrationEvent> eventQueue;
    private final ReentrantReadWriteLock eventAddDrainLock;

    ClientRegistrationEventQueue(
        final ClientProxyMembershipID clientProxyMembershipID,
        final Queue<ClientRegistrationEvent> eventQueue,
        final ReentrantReadWriteLock eventAddDrainLock) {
      this.clientProxyMembershipID = clientProxyMembershipID;
      this.eventQueue = eventQueue;
      this.eventAddDrainLock = eventAddDrainLock;
    }

    public ClientProxyMembershipID getClientProxyMembershipID() {
      return clientProxyMembershipID;
    }

    boolean isEmpty() {
      return eventQueue.isEmpty();
    }

    private void add(final ClientRegistrationEvent clientRegistrationEvent) {
      eventQueue.add(clientRegistrationEvent);
    }

    public ClientRegistrationEvent poll() {
      return eventQueue.poll();
    }

    private void lockForDraining() {
      eventAddDrainLock.writeLock().lock();
    }

    private void unlockForDraining() {
      eventAddDrainLock.writeLock().unlock();
    }

    private boolean isLockForDrainingHeld() {
      return eventAddDrainLock.writeLock().isHeldByCurrentThread();
    }

    private void lockForPutting() {
      eventAddDrainLock.readLock().lock();
    }

    private void unlockForPutting() {
      eventAddDrainLock.readLock().unlock();
    }
  }
}
