/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.wan;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.logging.log4j.Logger;

import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderImpl;
import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderImpl;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation;
import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * Default {@link InternalGatewaySenderFactory} implementation. Every setter records its value in
 * a single {@link GatewaySenderAttributes} instance; the {@code create} methods validate those
 * attributes and pass that same instance to the sender they build.
 *
 * @since GemFire 7.0
 */
public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory {

  private static final Logger logger = LogService.getLogger();

  /**
   * Flipped to {@code true} by the first {@link #create(String, int)} call that looks for the
   * obsolete read-timeout system property, so the check happens at most once.
   */
  private static final AtomicBoolean GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED =
      new AtomicBoolean(false);

  /**
   * Used internally to pass the attributes from this factory to the real GatewaySender it is
   * creating.
   */
  private final GatewaySenderAttributes attrs = new GatewaySenderAttributes();

  private final InternalCache cache;

  private final StatisticsClock statisticsClock;

  /**
   * Creates a factory bound to the given cache.
   *
   * @param cache the cache that created senders are registered with
   * @param statisticsClock the clock handed to the sender implementations built for a
   *        {@link GemFireCacheImpl}
   */
  public GatewaySenderFactoryImpl(InternalCache cache, StatisticsClock statisticsClock) {
    this.cache = cache;
    this.statisticsClock = statisticsClock;
  }

  /**
   * Records whether the sender is parallel; {@code create} uses this flag to choose between the
   * parallel and the serial sender classes.
   */
  @Override
  public GatewaySenderFactory setParallel(boolean isParallel) {
    attrs.isParallel = isParallel;
    return this;
  }

  /** Records the for-internal-use flag in the sender attributes. */
  @Override
  public GatewaySenderFactory setForInternalUse(boolean isForInternalUse) {
    attrs.isForInternalUse = isForInternalUse;
    return this;
  }

  /** Adds an event filter to the sender attributes. */
  @Override
  public GatewaySenderFactory addGatewayEventFilter(GatewayEventFilter filter) {
    attrs.addGatewayEventFilter(filter);
    return this;
  }

  /** Adds a transport filter to the sender attributes. */
  @Override
  public GatewaySenderFactory addGatewayTransportFilter(GatewayTransportFilter filter) {
    attrs.addGatewayTransportFilter(filter);
    return this;
  }

  /**
   * Adds an async event listener to the sender attributes. Note that
   * {@link #create(String, int)} rejects a serial sender once any listener has been added.
   */
  @Override
  public GatewaySenderFactory addAsyncEventListener(AsyncEventListener listener) {
    attrs.addAsyncEventListener(listener);
    return this;
  }

  /** Records the socket buffer size in the sender attributes. */
  @Override
  public GatewaySenderFactory setSocketBufferSize(int socketBufferSize) {
    attrs.socketBufferSize = socketBufferSize;
    return this;
  }

  /**
   * Records the socket read timeout in the sender attributes. The value is only checked against
   * {@link GatewaySender#MINIMUM_SOCKET_READ_TIMEOUT} later, in {@link #create(String, int)}.
   */
  @Override
  public GatewaySenderFactory setSocketReadTimeout(int socketReadTimeout) {
    attrs.socketReadTimeout = socketReadTimeout;
    return this;
  }

  /** Records the disk store name in the sender attributes. */
  @Override
  public GatewaySenderFactory setDiskStoreName(String diskStoreName) {
    attrs.diskStoreName = diskStoreName;
    return this;
  }

  /** Records the maximum queue memory in the sender attributes. */
  @Override
  public GatewaySenderFactory setMaximumQueueMemory(int maximumQueueMemory) {
    attrs.maximumQueueMemory = maximumQueueMemory;
    return this;
  }

  /** Records the batch size in the sender attributes. */
  @Override
  public GatewaySenderFactory setBatchSize(int batchSize) {
    attrs.batchSize = batchSize;
    return this;
  }

  /** Records the batch time interval in the sender attributes. */
  @Override
  public GatewaySenderFactory setBatchTimeInterval(int batchTimeInterval) {
    attrs.batchTimeInterval = batchTimeInterval;
    return this;
  }

  /** Records whether batch conflation is enabled in the sender attributes. */
  @Override
  public GatewaySenderFactory setBatchConflationEnabled(boolean enableBatchConflation) {
    attrs.isBatchConflationEnabled = enableBatchConflation;
    return this;
  }

  /** Records whether persistence is enabled in the sender attributes. */
  @Override
  public GatewaySenderFactory setPersistenceEnabled(boolean enablePersistence) {
    attrs.isPersistenceEnabled = enablePersistence;
    return this;
  }

  /** Records the alert threshold in the sender attributes. */
  @Override
  public GatewaySenderFactory setAlertThreshold(int threshold) {
    attrs.alertThreshold = threshold;
    return this;
  }

  /**
   * Records the manual-start flag. When it is {@code false}, a sender created for a
   * {@link GemFireCacheImpl} is started by {@code create} itself.
   */
  @Override
  public GatewaySenderFactory setManualStart(boolean start) {
    attrs.manualStart = start;
    return this;
  }

  /** Records the locator discovery callback in the sender attributes. */
  @Override
  public GatewaySenderFactory setLocatorDiscoveryCallback(LocatorDiscoveryCallback locCallback) {
    attrs.locatorDiscoveryCallback = locCallback;
    return this;
  }

  /** Records whether disk writes are synchronous in the sender attributes. */
  @Override
  public GatewaySenderFactory setDiskSynchronous(boolean isSynchronous) {
    attrs.isDiskSynchronous = isSynchronous;
    return this;
  }

  /**
   * Records the number of dispatcher threads. When more than one thread is requested and no order
   * policy has been chosen yet, {@link GatewaySender#DEFAULT_ORDER_POLICY} is applied as well.
   */
  @Override
  public GatewaySenderFactory setDispatcherThreads(int numThreads) {
    boolean needsDefaultPolicy = numThreads > 1 && attrs.policy == null;
    if (needsDefaultPolicy) {
      attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
    }
    attrs.dispatcherThreads = numThreads;
    return this;
  }

  /**
   * Records the parallelism for replicated regions and resets the order policy to
   * {@link GatewaySender#DEFAULT_ORDER_POLICY}.
   */
  @Override
  public GatewaySenderFactory setParallelFactorForReplicatedRegion(int parallel) {
    attrs.parallelism = parallel;
    // NOTE(review): unlike setDispatcherThreads, this overwrites any order policy that was set
    // earlier through setOrderPolicy - confirm that this is intended.
    attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
    return this;
  }

  /** Records the order policy in the sender attributes. */
  @Override
  public GatewaySenderFactory setOrderPolicy(OrderPolicy policy) {
    attrs.policy = policy;
    return this;
  }

  /** Records the bucket-sorted flag in the sender attributes. */
  @Override
  public GatewaySenderFactory setBucketSorted(boolean isBucketSorted) {
    attrs.isBucketSorted = isBucketSorted;
    return this;
  }

  /**
   * Creates a gateway sender that targets a remote distributed system and registers it with the
   * cache.
   *
   * @param id the id of the new sender
   * @param remoteDSId the id of the remote distributed system
   * @return the new sender, or {@code null} when the cache is neither a {@link GemFireCacheImpl}
   *         nor a {@link CacheCreation}
   * @throws GatewaySenderException if {@code remoteDSId} equals the local distributed system id
   *         or is negative, if fewer than one dispatcher thread is configured, if a parallel
   *         sender is configured with {@link OrderPolicy#THREAD}, or if a serial sender has
   *         async event listeners
   */
  @Override
  public GatewaySender create(String id, int remoteDSId) {
    int localDsId = InternalDistributedSystem.getAnyInstance().getDistributionManager()
        .getDistributedSystemId();
    if (remoteDSId == localDsId) {
      throw new GatewaySenderException(
          String.format(
              "GatewaySender %s cannot be created with remote DS Id equal to this DS Id. ",
              id));
    }
    if (remoteDSId < 0) {
      throw new GatewaySenderException(
          String.format("GatewaySender %s cannot be created with remote DS Id less than 0. ",
              id));
    }
    attrs.id = id;
    attrs.remoteDs = remoteDSId;

    if (attrs.getDispatcherThreads() <= 0) {
      throw new GatewaySenderException(
          String.format("GatewaySender %s can not be created with dispatcher threads less than 1",
              id));
    }

    // Verify socket read timeout if a proper logger is available
    if (cache instanceof GemFireCacheImpl) {
      enforceMinimumSocketReadTimeout(id);
      warnOnceIfObsoleteReadTimeoutPropertySet();
    }

    if (attrs.isParallel()) {
      if (OrderPolicy.THREAD.equals(attrs.getOrderPolicy())) {
        throw new GatewaySenderException(
            String.format("Parallel Gateway Sender %s can not be created with OrderPolicy %s",
                id, attrs.getOrderPolicy()));
      }
    } else {
      if (!attrs.getAsyncEventListeners().isEmpty()) {
        throw new GatewaySenderException(
            String.format(
                "SerialGatewaySender %s cannot define a remote site because at least AsyncEventListener is already added. Both listeners and remote site cannot be defined for the same gateway sender.",
                id));
      }
      applyDefaultOrderPolicyForSerialSender();
    }
    return instantiateAndRegisterSender();
  }

  /**
   * Creates a sender without a remote distributed system id and registers it with the cache. The
   * error messages describe the result as an AsyncEventQueue.
   *
   * @param id the id of the new sender
   * @return the new sender, or {@code null} when the cache is neither a {@link GemFireCacheImpl}
   *         nor a {@link CacheCreation}
   * @throws AsyncEventQueueConfigurationException if fewer than one dispatcher thread is
   *         configured, or if a parallel sender is configured with {@link OrderPolicy#THREAD}
   */
  @Override
  public GatewaySender create(String id) {
    attrs.id = id;

    if (attrs.getDispatcherThreads() <= 0) {
      throw new AsyncEventQueueConfigurationException(
          String.format("AsyncEventQueue %s can not be created with dispatcher threads less than 1",
              id));
    }

    if (attrs.isParallel()) {
      if (OrderPolicy.THREAD.equals(attrs.getOrderPolicy())) {
        throw new AsyncEventQueueConfigurationException(
            String.format(
                "AsyncEventQueue %s can not be created with OrderPolicy %s when it is set parallel",
                id, attrs.getOrderPolicy()));
      }
    } else {
      applyDefaultOrderPolicyForSerialSender();
    }
    return instantiateAndRegisterSender();
  }

  /**
   * Logs a warning and raises the configured socket read timeout to
   * {@link GatewaySender#MINIMUM_SOCKET_READ_TIMEOUT} when it is non-zero but below that minimum.
   */
  private void enforceMinimumSocketReadTimeout(String id) {
    int configuredTimeout = attrs.getSocketReadTimeout();
    // If socket read timeout is less than the minimum, log a warning.
    // Ideally, this should throw a GatewaySenderException, but wan dunit tests
    // were failing, and we were running out of time to change them.
    if (configuredTimeout != 0 && configuredTimeout < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) {
      // NOTE(review): the message announces "the default", but the value applied below is the
      // minimum - confirm which one is intended.
      logger.warn(
          "{} cannot configure socket read timeout of {} milliseconds because it is less than the minimum of {} milliseconds. The default will be used instead.",
          "GatewaySender " + id, configuredTimeout, GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT);
      attrs.socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT;
    }
  }

  /**
   * Logs a warning if the obsolete read-timeout system property is set. The property is only
   * inspected by the first caller that flips the static "checked" flag.
   */
  private void warnOnceIfObsoleteReadTimeoutPropertySet() {
    if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)
        && System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) {
      logger.warn(
          "Obsolete java system property named {} was set to control {}. This property is no longer supported. Please use the GemFire API instead.",
          GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY,
          "GatewaySender socket read timeout");
    }
  }

  /**
   * Applies {@link GatewaySender#DEFAULT_ORDER_POLICY} when no order policy is set and more than
   * one dispatcher thread is configured.
   */
  private void applyDefaultOrderPolicyForSerialSender() {
    if (attrs.getOrderPolicy() == null && attrs.getDispatcherThreads() > 1) {
      attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
    }
  }

  /**
   * Builds the parallel or serial sender that matches the cache type and adds it to the cache.
   * For a {@link GemFireCacheImpl} the sender is also started unless manual start is configured.
   *
   * @return the registered sender, or {@code null} for any other kind of cache
   */
  private GatewaySender instantiateAndRegisterSender() {
    GatewaySender sender = null;
    if (cache instanceof GemFireCacheImpl) {
      if (attrs.isParallel()) {
        sender = new ParallelGatewaySenderImpl(cache, statisticsClock, attrs);
      } else {
        sender = new SerialGatewaySenderImpl(cache, statisticsClock, attrs);
      }
      cache.addGatewaySender(sender);
      if (!attrs.isManualStart()) {
        sender.start();
      }
    } else if (cache instanceof CacheCreation) {
      if (attrs.isParallel()) {
        sender = new ParallelGatewaySenderCreation(cache, attrs);
      } else {
        sender = new SerialGatewaySenderCreation(cache, attrs);
      }
      cache.addGatewaySender(sender);
    }
    return sender;
  }

  /** Removes an event filter from the sender attributes. */
  @Override
  public GatewaySenderFactory removeGatewayEventFilter(GatewayEventFilter filter) {
    attrs.eventFilters.remove(filter);
    return this;
  }

  /** Removes a transport filter from the sender attributes. */
  @Override
  public GatewaySenderFactory removeGatewayTransportFilter(GatewayTransportFilter filter) {
    attrs.transFilters.remove(filter);
    return this;
  }

  /** Records the event substitution filter in the sender attributes. */
  @Override
  public GatewaySenderFactory setGatewayEventSubstitutionFilter(
      GatewayEventSubstitutionFilter filter) {
    attrs.eventSubstitutionFilter = filter;
    return this;
  }

  /**
   * Copies the configuration of the given sender into this factory's attributes. Scalar settings
   * and the event substitution filter overwrite the current values; event and transport filters
   * are appended to the ones already present.
   *
   * @param senderCreation the sender whose configuration is copied
   */
  @Override
  public void configureGatewaySender(GatewaySender senderCreation) {
    attrs.isParallel = senderCreation.isParallel();
    attrs.manualStart = senderCreation.isManualStart();
    attrs.socketBufferSize = senderCreation.getSocketBufferSize();
    attrs.socketReadTimeout = senderCreation.getSocketReadTimeout();
    attrs.isBatchConflationEnabled = senderCreation.isBatchConflationEnabled();
    attrs.batchSize = senderCreation.getBatchSize();
    attrs.batchTimeInterval = senderCreation.getBatchTimeInterval();
    attrs.isPersistenceEnabled = senderCreation.isPersistenceEnabled();
    attrs.diskStoreName = senderCreation.getDiskStoreName();
    attrs.isDiskSynchronous = senderCreation.isDiskSynchronous();
    attrs.maximumQueueMemory = senderCreation.getMaximumQueueMemory();
    attrs.alertThreshold = senderCreation.getAlertThreshold();
    attrs.dispatcherThreads = senderCreation.getDispatcherThreads();
    attrs.policy = senderCreation.getOrderPolicy();
    for (GatewayEventFilter eventFilter : senderCreation.getGatewayEventFilters()) {
      attrs.eventFilters.add(eventFilter);
    }
    for (GatewayTransportFilter transportFilter : senderCreation.getGatewayTransportFilters()) {
      attrs.transFilters.add(transportFilter);
    }
    attrs.eventSubstitutionFilter = senderCreation.getGatewayEventSubstitutionFilter();
  }
}
