/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.asyncqueue.internal;

import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId;

import org.apache.logging.log4j.Logger;

import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.xmlcache.AsyncEventQueueCreation;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.internal.cache.xmlcache.ParallelAsyncEventQueueCreation;
import org.apache.geode.internal.cache.xmlcache.SerialAsyncEventQueueCreation;
import org.apache.geode.internal.logging.LogService;

/**
 * {@link AsyncEventQueueFactory} implementation that collects queue settings in a
 * {@link GatewaySenderAttributes} instance and uses them to build the gateway sender that backs
 * each {@link AsyncEventQueue} it creates.
 *
 * <p>
 * When the supplied cache is a {@link CacheCreation} the factory only records an
 * {@link AsyncEventQueueCreation} on it; otherwise it builds a sender, wraps it in an
 * {@link AsyncEventQueueImpl} and registers both with the cache.
 */
public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {

  private static final Logger logger = LogService.getLogger();

  /**
   * The default batchTimeInterval for AsyncEventQueue in milliseconds.
   */
  public static final int DEFAULT_BATCH_TIME_INTERVAL = 5;

  private final InternalCache cache;

  /** Set by {@link #pauseEventDispatching()}; consulted when a queue is created. */
  private boolean pauseEventsDispatching = false;

  /**
   * Used internally to pass the attributes from this factory to the real GatewaySender it is
   * creating.
   */
  private final GatewaySenderAttributes gatewaySenderAttributes;

  /**
   * Creates a factory with fresh attributes and the AsyncEventQueue-specific default batch time
   * interval ({@link #DEFAULT_BATCH_TIME_INTERVAL}).
   *
   * @param cache the cache that created queues are registered with
   */
  public AsyncEventQueueFactoryImpl(InternalCache cache) {
    this(cache, new GatewaySenderAttributes(), DEFAULT_BATCH_TIME_INTERVAL);
  }

  /**
   * Creates a factory around the given attributes object.
   *
   * @param cache the cache that created queues are registered with
   * @param gatewaySenderAttributes the attributes object this factory mutates
   * @param batchTimeInterval the initial batch time interval written into the attributes
   */
  AsyncEventQueueFactoryImpl(InternalCache cache, GatewaySenderAttributes gatewaySenderAttributes,
      int batchTimeInterval) {
    this.cache = cache;
    this.gatewaySenderAttributes = gatewaySenderAttributes;
    // set a different default for batchTimeInterval for AsyncEventQueue
    this.gatewaySenderAttributes.batchTimeInterval = batchTimeInterval;
  }

  /** Stores the batch size in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setBatchSize(int size) {
    gatewaySenderAttributes.batchSize = size;
    return this;
  }

  /** Stores the persistence flag in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setPersistent(boolean isPersistent) {
    gatewaySenderAttributes.isPersistenceEnabled = isPersistent;
    return this;
  }

  /** Stores the disk store name in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setDiskStoreName(String name) {
    gatewaySenderAttributes.diskStoreName = name;
    return this;
  }

  /** Stores the maximum queue memory in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setMaximumQueueMemory(int memory) {
    gatewaySenderAttributes.maximumQueueMemory = memory;
    return this;
  }

  /** Stores the disk-synchronous flag in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setDiskSynchronous(boolean isSynchronous) {
    gatewaySenderAttributes.isDiskSynchronous = isSynchronous;
    return this;
  }

  /** Stores the batch time interval in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setBatchTimeInterval(int batchTimeInterval) {
    gatewaySenderAttributes.batchTimeInterval = batchTimeInterval;
    return this;
  }

  /** Stores the batch conflation flag in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setBatchConflationEnabled(boolean isConflation) {
    gatewaySenderAttributes.isBatchConflationEnabled = isConflation;
    return this;
  }

  /**
   * Stores the dispatcher thread count in the sender attributes. The value is validated when a
   * sender is built, not here.
   */
  @Override
  public AsyncEventQueueFactory setDispatcherThreads(int numThreads) {
    gatewaySenderAttributes.dispatcherThreads = numThreads;
    return this;
  }

  /** Stores the order policy in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setOrderPolicy(OrderPolicy policy) {
    gatewaySenderAttributes.policy = policy;
    return this;
  }

  /** Adds an event filter to the sender attributes. */
  @Override
  public AsyncEventQueueFactory addGatewayEventFilter(GatewayEventFilter filter) {
    gatewaySenderAttributes.addGatewayEventFilter(filter);
    return this;
  }

  /** Removes an event filter from the sender attributes' filter collection. */
  @Override
  public AsyncEventQueueFactory removeGatewayEventFilter(GatewayEventFilter filter) {
    gatewaySenderAttributes.eventFilters.remove(filter);
    return this;
  }

  /** Stores the event substitution filter in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setGatewayEventSubstitutionListener(
      GatewayEventSubstitutionFilter filter) {
    gatewaySenderAttributes.eventSubstitutionFilter = filter;
    return this;
  }

  /**
   * Currently a no-op: the argument is ignored and the factory is returned unchanged.
   *
   * <p>
   * NOTE(review): the name suggests this should clear the configured substitution filter; confirm
   * whether the no-op is intentional before relying on it.
   *
   * @param provider ignored
   * @return this factory
   */
  public AsyncEventQueueFactory removeGatewayEventAlternateValueProvider(
      GatewayEventSubstitutionFilter provider) {
    return this;
  }

  /** Adds an async event listener to the sender attributes. */
  public AsyncEventQueueFactory addAsyncEventListener(AsyncEventListener listener) {
    gatewaySenderAttributes.addAsyncEventListener(listener);
    return this;
  }

  /**
   * Creates an {@link AsyncEventQueue} with the settings collected so far and registers it with
   * the cache.
   *
   * <p>
   * For a {@link CacheCreation} cache only an {@link AsyncEventQueueCreation} is recorded. For any
   * other cache the backing sender is built and registered, and it is started unless the
   * attributes request manual start.
   *
   * @param asyncQueueId id of the queue to create
   * @param listener the listener for the queue; must not be {@code null}
   * @return the created queue
   * @throws IllegalArgumentException if {@code listener} is {@code null}
   * @throws AsyncEventQueueConfigurationException if the collected settings are invalid for the
   *         sender being built (see the private {@code create(String)} helper)
   */
  @Override
  public AsyncEventQueue create(String asyncQueueId, AsyncEventListener listener) {
    if (listener == null) {
      throw new IllegalArgumentException(
          "AsyncEventListener cannot be null");
    }

    AsyncEventQueue asyncEventQueue;

    if (cache instanceof CacheCreation) {
      AsyncEventQueueCreation creation =
          new AsyncEventQueueCreation(asyncQueueId, gatewaySenderAttributes, listener);
      if (pauseEventsDispatching) {
        creation.setPauseEventDispatching(true);
      }
      asyncEventQueue = creation;
      ((CacheCreation) cache).addAsyncEventQueue(asyncEventQueue);
    } else {
      if (logger.isDebugEnabled()) {
        logger.debug("Creating GatewaySender that underlies the AsyncEventQueue");
      }

      // NOTE(review): the listener is added to the shared attributes on every call, so reusing
      // one factory for several queues may accumulate listeners - confirm against
      // GatewaySenderAttributes.
      addAsyncEventListener(listener);
      InternalGatewaySender sender =
          (InternalGatewaySender) create(getSenderIdFromAsyncEventQueueId(asyncQueueId));
      AsyncEventQueueImpl asyncEventQueueImpl = new AsyncEventQueueImpl(sender, listener);
      asyncEventQueue = asyncEventQueueImpl;
      cache.addAsyncEventQueue(asyncEventQueueImpl);
      // The paused-state request is made before the sender is started below.
      if (pauseEventsDispatching) {
        sender.setStartEventProcessorInPausedState();
      }
      if (!gatewaySenderAttributes.isManualStart()) {
        sender.start();
      }
    }

    if (logger.isDebugEnabled()) {
      // Parameterized so the queue is separated from the message text.
      logger.debug("Returning AsyncEventQueue {}", asyncEventQueue);
    }
    return asyncEventQueue;
  }

  /**
   * Builds the gateway sender for a queue from the current attributes and registers it with the
   * cache.
   *
   * @param id the sender id, written into the attributes before validation
   * @return the newly created sender
   * @throws AsyncEventQueueConfigurationException if dispatcher threads is less than 1, or if the
   *         queue is parallel and the order policy is {@link OrderPolicy#THREAD}
   */
  private GatewaySender create(String id) {
    gatewaySenderAttributes.id = id;

    if (gatewaySenderAttributes.getDispatcherThreads() <= 0) {
      throw new AsyncEventQueueConfigurationException(
          String.format("AsyncEventQueue %s can not be created with dispatcher threads less than 1",
              id));
    }

    final boolean isCacheCreation = cache instanceof CacheCreation;
    GatewaySender sender;
    if (gatewaySenderAttributes.isParallel()) {
      // Enum identity comparison is null-safe, so no separate null check is needed.
      if (gatewaySenderAttributes.getOrderPolicy() == OrderPolicy.THREAD) {
        throw new AsyncEventQueueConfigurationException(
            String.format(
                "AsyncEventQueue %s can not be created with OrderPolicy %s when it is set parallel",
                id, gatewaySenderAttributes.getOrderPolicy()));
      }

      if (isCacheCreation) {
        sender = new ParallelAsyncEventQueueCreation(cache, gatewaySenderAttributes);
      } else {
        sender = new ParallelAsyncEventQueueImpl(cache, gatewaySenderAttributes);
      }
    } else {
      // A serial queue with several dispatcher threads and no explicit policy gets the default.
      if (gatewaySenderAttributes.getOrderPolicy() == null
          && gatewaySenderAttributes.getDispatcherThreads() > 1) {
        gatewaySenderAttributes.policy = GatewaySender.DEFAULT_ORDER_POLICY;
      }

      if (isCacheCreation) {
        sender = new SerialAsyncEventQueueCreation(cache, gatewaySenderAttributes);
      } else {
        sender = new SerialAsyncEventQueueImpl(cache, gatewaySenderAttributes);
      }
    }
    cache.addGatewaySender(sender);
    return sender;
  }

  /**
   * Copies the settings of an existing queue description into this factory's attributes and marks
   * the attributes as being for internal use.
   *
   * <p>
   * The bucket-sorted flag is only copied when the argument is an
   * {@link AsyncEventQueueCreation}; for any other implementation the current value is left
   * untouched instead of failing with a {@link ClassCastException} part-way through the copy.
   *
   * @param asyncQueueCreation the queue whose settings are copied
   */
  public void configureAsyncEventQueue(AsyncEventQueue asyncQueueCreation) {
    gatewaySenderAttributes.batchSize = asyncQueueCreation.getBatchSize();
    gatewaySenderAttributes.batchTimeInterval = asyncQueueCreation.getBatchTimeInterval();
    gatewaySenderAttributes.isBatchConflationEnabled =
        asyncQueueCreation.isBatchConflationEnabled();
    gatewaySenderAttributes.isPersistenceEnabled = asyncQueueCreation.isPersistent();
    gatewaySenderAttributes.diskStoreName = asyncQueueCreation.getDiskStoreName();
    gatewaySenderAttributes.isDiskSynchronous = asyncQueueCreation.isDiskSynchronous();
    gatewaySenderAttributes.maximumQueueMemory = asyncQueueCreation.getMaximumQueueMemory();
    gatewaySenderAttributes.isParallel = asyncQueueCreation.isParallel();
    if (asyncQueueCreation instanceof AsyncEventQueueCreation) {
      gatewaySenderAttributes.isBucketSorted =
          ((AsyncEventQueueCreation) asyncQueueCreation).isBucketSorted();
    }
    gatewaySenderAttributes.dispatcherThreads = asyncQueueCreation.getDispatcherThreads();
    gatewaySenderAttributes.policy = asyncQueueCreation.getOrderPolicy();
    gatewaySenderAttributes.eventFilters = asyncQueueCreation.getGatewayEventFilters();
    gatewaySenderAttributes.eventSubstitutionFilter =
        asyncQueueCreation.getGatewayEventSubstitutionFilter();
    gatewaySenderAttributes.isForInternalUse = true;
    gatewaySenderAttributes.forwardExpirationDestroy =
        asyncQueueCreation.isForwardExpirationDestroy();
  }

  /** Stores the parallel flag in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setParallel(boolean isParallel) {
    gatewaySenderAttributes.isParallel = isParallel;
    return this;
  }

  /** Stores the bucket-sorted flag in the sender attributes. */
  public AsyncEventQueueFactory setBucketSorted(boolean isbucketSorted) {
    gatewaySenderAttributes.isBucketSorted = isbucketSorted;
    return this;
  }

  /** Stores the meta-queue flag in the sender attributes. */
  public AsyncEventQueueFactory setIsMetaQueue(boolean isMetaQueue) {
    gatewaySenderAttributes.isMetaQueue = isMetaQueue;
    return this;
  }

  /** Stores the forward-expiration-destroy flag in the sender attributes. */
  @Override
  public AsyncEventQueueFactory setForwardExpirationDestroy(boolean forward) {
    gatewaySenderAttributes.forwardExpirationDestroy = forward;
    return this;
  }

  /**
   * Requests that queues created by this factory begin with event dispatching paused. The flag is
   * never reset by this class once set.
   */
  @Override
  public AsyncEventQueueFactory pauseEventDispatching() {
    pauseEventsDispatching = true;
    return this;
  }

  /** Returns whether {@link #pauseEventDispatching()} has been called on this factory. */
  @VisibleForTesting
  protected boolean isPauseEventsDispatching() {
    return pauseEventsDispatching;
  }
}
