/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.client.internal;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.GatewayConfigurationException;
import org.apache.geode.cache.client.ServerRefusedConnectionException;
import org.apache.geode.cache.client.internal.ServerDenyList.FailureTracker;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.tier.sockets.CacheClientUpdater;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.security.GemFireSecurityException;

/**
 * Creates connections, using a connection source to determine which server to connect to.
 *
 * @since GemFire 5.7
 *
 */
public class ConnectionFactoryImpl implements ConnectionFactory {

  private static final Logger logger = LogService.getLogger();

  // TODO - GEODE-1746, the handshake holds state. It seems like the code depends
  // on all of the handshake operations happening in a single thread. I don't think we
  // want that, need to refactor.
  private final ServerDenyList denyList;
  private ConnectionSource source;
  private PoolImpl pool;
  private final CancelCriterion cancelCriterion;
  private final ConnectionConnector connectionConnector;


  /**
   * Test hook for client version support
   *
   * @since GemFire 5.7
   */

  @MutableForTesting
  public static boolean testFailedConnectionToServer = false;

  ConnectionFactoryImpl(ConnectionSource source, EndpointManager endpointManager,
      InternalDistributedSystem sys, int socketBufferSize, int handshakeTimeout, int readTimeout,
      ClientProxyMembershipID proxyId, CancelCriterion cancelCriterion, boolean usedByGateway,
      GatewaySender sender, long pingInterval, boolean multiuserSecureMode, PoolImpl pool) {
    this(
        new ConnectionConnector(endpointManager, sys, socketBufferSize, handshakeTimeout,
            readTimeout, usedByGateway, sender,
            (usedByGateway || sender != null) ? SocketCreatorFactory
                .getSocketCreatorForComponent(SecurableCommunicationChannel.GATEWAY)
                : SocketCreatorFactory
                    .getSocketCreatorForComponent(SecurableCommunicationChannel.SERVER),
            new ClientSideHandshakeImpl(proxyId, sys, sys.getSecurityService(),
                multiuserSecureMode)),
        source, pingInterval, pool, cancelCriterion);
  }

  public ConnectionFactoryImpl(ConnectionConnector connectionConnector, ConnectionSource source,
      long pingInterval, PoolImpl pool, CancelCriterion cancelCriterion) {
    this.connectionConnector = connectionConnector;
    this.source = source;
    this.pool = pool;
    this.cancelCriterion = cancelCriterion;

    denyList = new ServerDenyList(pingInterval);
  }

  public void start(ScheduledExecutorService background) {
    denyList.start(background);
  }

  @Override
  public ServerDenyList getDenyList() {
    return denyList;
  }

  @Override
  public Connection createClientToServerConnection(ServerLocation location, boolean forQueue)
      throws GemFireSecurityException {
    FailureTracker failureTracker = denyList.getFailureTracker(location);

    Connection connection = null;
    try {
      connection = connectionConnector.connectClientToServer(location, forQueue);
      failureTracker.reset();
      authenticateIfRequired(connection);
    } catch (GemFireConfigException | CancelException | GemFireSecurityException
        | GatewayConfigurationException e) {
      throw e;
    } catch (ServerRefusedConnectionException src) {
      // propagate this up, don't retry
      logger.warn("Could not create a new connection to server: {}",
          src.getMessage());
      testFailedConnectionToServer = true;
      throw src;
    } catch (Exception e) {
      if (e.getMessage() != null && (e.getMessage().equals("Connection refused")
          || e.getMessage().equals("Connection reset"))) {
        // this is the most common case, so don't print an exception
        if (logger.isDebugEnabled()) {
          logger.debug("Unable to connect to {}: connection refused", location);
        }
      } else {
        logger.warn("Could not connect to: " + location, e);
      }
      testFailedConnectionToServer = true;
    }

    return connection;
  }

  private void authenticateIfRequired(Connection conn) {
    cancelCriterion.checkCancelInProgress(null);
    if (!pool.isUsedByGateway() && !pool.getMultiuserAuthentication()) {
      ServerLocation server = conn.getServer();
      if (server.getRequiresCredentials()) {
        if (server.getUserId() == -1) {
          Long uniqueID = (Long) AuthenticateUserOp.executeOn(conn, pool);
          server.setUserId(uniqueID);
          if (logger.isDebugEnabled()) {
            logger.debug("CFI.authenticateIfRequired() Completed authentication on {}", conn);
          }
        }
      }
    }
  }

  @Override
  public ServerLocation findBestServer(ServerLocation currentServer,
      Set<ServerLocation> excludedServers) {
    if (currentServer != null && source.isBalanced()) {
      return currentServer;
    }
    final Set<ServerLocation> origExcludedServers = excludedServers;
    excludedServers = new HashSet<>(excludedServers);
    Set<ServerLocation> denyListedServers = denyList.getBadServers();
    excludedServers.addAll(denyListedServers);
    ServerLocation server = source.findReplacementServer(currentServer, excludedServers);
    if (server == null) {
      // Nothing worked! Let's try without the denylist.
      if (excludedServers.size() > origExcludedServers.size()) {
        // We had some servers denylisted so lets give this another whirl.
        server = source.findReplacementServer(currentServer, origExcludedServers);
      }
    }
    if (server == null && logger.isDebugEnabled()) {
      logger.debug("Source was unable to findForReplacement any servers");
    }
    return server;
  }

  @Override
  public Connection createClientToServerConnection(Set<ServerLocation> excludedServers)
      throws GemFireSecurityException {
    final Set<ServerLocation> origExcludedServers = excludedServers;
    excludedServers = new HashSet<>(excludedServers);
    Set<ServerLocation> denyListedServers = denyList.getBadServers();
    excludedServers.addAll(denyListedServers);
    Connection conn = null;
    RuntimeException fatalException = null;
    boolean tryDenyList = true;

    do {
      ServerLocation server = source.findServer(excludedServers);
      if (server == null) {

        if (tryDenyList) {
          // Nothing worked! Let's try without the denylist.
          tryDenyList = false;
          int size = excludedServers.size();
          excludedServers.removeAll(denyListedServers);
          // make sure we didn't remove any of the ones that the caller set not to use
          excludedServers.addAll(origExcludedServers);
          if (excludedServers.size() < size) {
            // We are able to remove some exclusions, so lets give this another whirl.
            continue;
          }
        }
        if (logger.isDebugEnabled()) {
          logger.debug("Source was unable to locate any servers");
        }
        if (fatalException != null) {
          throw fatalException;
        }
        return null;
      }

      try {
        conn = createClientToServerConnection(server, false);
      } catch (CancelException | GemFireSecurityException | GatewayConfigurationException e) {
        throw e;
      } catch (ServerRefusedConnectionException srce) {
        fatalException = srce;
        if (logger.isDebugEnabled()) {
          logger.debug("ServerRefusedConnectionException attempting to connect to {}", server,
              srce);
        }
      } catch (Exception e) {
        logger.warn(String.format("Could not connect to: %s", server), e);
      }

      excludedServers.add(server);
    } while (conn == null);

    return conn;
  }

  @Override
  public ClientUpdater createServerToClientConnection(Endpoint endpoint, QueueManager qManager,
      boolean isPrimary, ClientUpdater failedUpdater) {
    String clientUpdateName = CacheClientUpdater.CLIENT_UPDATER_THREAD_NAME + " on "
        + endpoint.getMemberId() + " port " + endpoint.getLocation().getPort();
    if (logger.isDebugEnabled()) {
      logger.debug("Establishing: {}", clientUpdateName);
    }

    return connectionConnector.connectServerToClient(endpoint, qManager, isPrimary, failedUpdater,
        clientUpdateName);
  }
}
