/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.client.internal.locator.wan;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;

import org.apache.logging.log4j.Logger;

import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.admin.remote.DistributionLocatorId;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadFactory;

/**
 * An implementation of
 * {@link org.apache.geode.cache.client.internal.locator.wan.LocatorMembershipListener}
 *
 *
 */
public class LocatorMembershipListenerImpl implements LocatorMembershipListener {
  private static final Logger logger = LogService.getLogger();
  static final int LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS = 3;
  private static final String LOCATORS_DISTRIBUTOR_THREAD_NAME = "LocatorsDistributorThread";
  private static final String LISTENER_FAILURE_MESSAGE =
      "Locator Membership listener could not exchange locator information {}:{} with {}:{} after {} retry attempts";
  private static final String LISTENER_FINAL_FAILURE_MESSAGE =
      "Locator Membership listener permanently failed to exchange locator information {}:{} with {}:{} after {} retry attempts";

  private int port;
  private DistributionConfig config;
  private final TcpClient tcpClient;
  private final ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo =
      new ConcurrentHashMap<>();
  private final ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo =
      new ConcurrentHashMap<>();

  LocatorMembershipListenerImpl() {
    this.tcpClient = new TcpClient();
  }

  LocatorMembershipListenerImpl(TcpClient tcpClient) {
    this.tcpClient = tcpClient;
  }

  @Override
  public void setPort(int port) {
    this.port = port;
  }

  @Override
  public void setConfig(DistributionConfig config) {
    this.config = config;
  }

  Thread buildLocatorsDistributorThread(DistributionLocatorId localLocatorId,
      Map<Integer, Set<DistributionLocatorId>> remoteLocators, DistributionLocatorId joiningLocator,
      int joiningLocatorDistributedSystemId) {
    Runnable distributeLocatorsRunnable =
        new DistributeLocatorsRunnable(config.getMemberTimeout(), tcpClient, localLocatorId,
            remoteLocators, joiningLocator, joiningLocatorDistributedSystemId);
    ThreadFactory threadFactory = new LoggingThreadFactory(LOCATORS_DISTRIBUTOR_THREAD_NAME, true);

    return threadFactory.newThread(distributeLocatorsRunnable);
  }

  /**
   * When the new locator is added to remote locator metadata, inform all other locators in remote
   * locator metadata about the new locator so that they can update their remote locator metadata.
   *
   * @param distributedSystemId Id of the joining locator.
   * @param locator Id of the joining locator.
   * @param sourceLocator Id of the locator that notified this locator about the new one.
   */
  @Override
  public void locatorJoined(final int distributedSystemId, final DistributionLocatorId locator,
      final DistributionLocatorId sourceLocator) {
    // DistributionLocatorId for local locator.
    DistributionLocatorId localLocatorId;
    String localLocator = config.getStartLocator();
    if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
      localLocatorId = new DistributionLocatorId(port, config.getBindAddress());
    } else {
      localLocatorId = new DistributionLocatorId(localLocator);
    }

    // Make a local copy of the current list of known locators.
    ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators = getAllLocatorsInfo();
    Map<Integer, Set<DistributionLocatorId>> localCopy = new HashMap<>();
    for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()) {
      Set<DistributionLocatorId> value = new CopyOnWriteHashSet<>(entry.getValue());
      localCopy.put(entry.getKey(), value);
    }

    // Remove locators that don't need to be notified (myself, the joining one and the one that
    // notified myself).
    List<DistributionLocatorId> ignoreList = Arrays.asList(locator, localLocatorId, sourceLocator);
    for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : localCopy.entrySet()) {
      for (DistributionLocatorId removeLocId : ignoreList) {
        entry.getValue().remove(removeLocId);
      }
    }

    // Launch Locators Distributor thread.
    Thread locatorsDistributorThread =
        buildLocatorsDistributorThread(localLocatorId, localCopy, locator, distributedSystemId);
    locatorsDistributorThread.start();
  }

  @Override
  public Object handleRequest(Object request) {
    Object response = null;

    if (request instanceof RemoteLocatorJoinRequest) {
      response = updateAllLocatorInfo((RemoteLocatorJoinRequest) request);
    } else if (request instanceof LocatorJoinMessage) {
      response = informAboutRemoteLocators((LocatorJoinMessage) request);
    } else if (request instanceof RemoteLocatorPingRequest) {
      response = getPingResponse((RemoteLocatorPingRequest) request);
    } else if (request instanceof RemoteLocatorRequest) {
      response = getRemoteLocators((RemoteLocatorRequest) request);
    }

    return response;
  }

  /**
   * A locator from the request is checked against the existing remote locator metadata. If it is
   * not available then added to existing remote locator metadata and LocatorMembershipListener is
   * invoked to inform about the this newly added locator to all other locators available in remote
   * locator metadata. As a response, remote locator metadata is sent.
   *
   */
  private synchronized Object updateAllLocatorInfo(RemoteLocatorJoinRequest request) {
    int distributedSystemId = request.getDistributedSystemId();
    DistributionLocatorId locator = request.getLocator();

    LocatorHelper.addLocator(distributedSystemId, locator, this, null);
    return new RemoteLocatorJoinResponse(this.getAllLocatorsInfo());
  }

  @SuppressWarnings("unused")
  private Object getPingResponse(RemoteLocatorPingRequest request) {
    return new RemoteLocatorPingResponse();
  }

  private Object informAboutRemoteLocators(LocatorJoinMessage request) {
    // TODO: FInd out the importance of list locatorJoinMessages. During
    // refactoring I could not understand its significance
    // synchronized (locatorJoinObject) {
    // if (locatorJoinMessages.contains(request)) {
    // return null;
    // }
    // locatorJoinMessages.add(request);
    // }
    int distributedSystemId = request.getDistributedSystemId();
    DistributionLocatorId locator = request.getLocator();
    DistributionLocatorId sourceLocatorId = request.getSourceLocator();

    LocatorHelper.addLocator(distributedSystemId, locator, this, sourceLocatorId);
    return null;
  }

  private Object getRemoteLocators(RemoteLocatorRequest request) {
    int dsId = request.getDsId();
    Set<String> locators = this.getRemoteLocatorInfo(dsId);
    return new RemoteLocatorResponse(locators);
  }

  @Override
  public Set<String> getRemoteLocatorInfo(int dsId) {
    return this.allServerLocatorsInfo.get(dsId);
  }

  @Override
  public ConcurrentMap<Integer, Set<DistributionLocatorId>> getAllLocatorsInfo() {
    return this.allLocatorsInfo;
  }

  @Override
  public ConcurrentMap<Integer, Set<String>> getAllServerLocatorsInfo() {
    return this.allServerLocatorsInfo;
  }

  @Override
  public void clearLocatorInfo() {
    allLocatorsInfo.clear();
    allServerLocatorsInfo.clear();
  }

  private static class DistributeLocatorsRunnable implements Runnable {
    private final int memberTimeout;
    private final TcpClient tcpClient;
    private final DistributionLocatorId localLocatorId;
    private final Map<Integer, Set<DistributionLocatorId>> remoteLocators;
    private final DistributionLocatorId joiningLocator;
    private final int joiningLocatorDistributedSystemId;

    DistributeLocatorsRunnable(int memberTimeout,
        TcpClient tcpClient,
        DistributionLocatorId localLocatorId,
        Map<Integer, Set<DistributionLocatorId>> remoteLocators,
        DistributionLocatorId joiningLocator,
        int joiningLocatorDistributedSystemId) {

      this.memberTimeout = memberTimeout;
      this.tcpClient = tcpClient;
      this.localLocatorId = localLocatorId;
      this.remoteLocators = remoteLocators;
      this.joiningLocator = joiningLocator;
      this.joiningLocatorDistributedSystemId = joiningLocatorDistributedSystemId;
    }

    void sendMessage(DistributionLocatorId targetLocator, LocatorJoinMessage locatorJoinMessage,
        Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages) {
      DistributionLocatorId advertisedLocator = locatorJoinMessage.getLocator();

      try {
        tcpClient.requestToServer(targetLocator.getHost(), locatorJoinMessage, memberTimeout,
            false);
      } catch (Exception exception) {
        if (logger.isDebugEnabled()) {
          logger.debug(LISTENER_FAILURE_MESSAGE,
              new Object[] {advertisedLocator.getHostName(), advertisedLocator.getPort(),
                  targetLocator.getHostName(), targetLocator.getPort(), 1, exception});
        }

        if (!failedMessages.containsKey(targetLocator)) {
          failedMessages.put(targetLocator, new HashSet<>());
        }

        failedMessages.get(targetLocator).add(locatorJoinMessage);
      }
    }

    boolean retryMessage(DistributionLocatorId targetLocator, LocatorJoinMessage locatorJoinMessage,
        int retryAttempt) {
      DistributionLocatorId advertisedLocator = locatorJoinMessage.getLocator();

      try {
        tcpClient.requestToServer(targetLocator.getHost(), locatorJoinMessage, memberTimeout,
            false);

        return true;
      } catch (Exception exception) {
        if (retryAttempt == LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS) {
          logger.warn(LISTENER_FINAL_FAILURE_MESSAGE,
              new Object[] {advertisedLocator.getHostName(), advertisedLocator.getPort(),
                  targetLocator.getHostName(), targetLocator.getPort(), retryAttempt, exception});
        } else {
          if (logger.isDebugEnabled()) {
            logger.debug(LISTENER_FAILURE_MESSAGE,
                new Object[] {advertisedLocator.getHostName(), advertisedLocator.getPort(),
                    targetLocator.getHostName(), targetLocator.getPort(), retryAttempt, exception});
          }
        }

        return false;
      }
    }

    @Override
    public void run() {
      Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages = new HashMap<>();
      for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()) {
        for (DistributionLocatorId value : entry.getValue()) {
          // Notify known remote locator about the advertised locator.
          LocatorJoinMessage advertiseNewLocatorMessage = new LocatorJoinMessage(
              joiningLocatorDistributedSystemId, joiningLocator, localLocatorId, "");
          sendMessage(value, advertiseNewLocatorMessage, failedMessages);

          // Notify the advertised locator about remote known locator.
          LocatorJoinMessage advertiseKnownLocatorMessage =
              new LocatorJoinMessage(entry.getKey(), value, localLocatorId, "");
          sendMessage(joiningLocator, advertiseKnownLocatorMessage, failedMessages);
        }
      }

      // Retry failed messages and remove those that succeed.
      if (!failedMessages.isEmpty()) {
        for (int attempt = 1; attempt <= LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS; attempt++) {

          for (Map.Entry<DistributionLocatorId, Set<LocatorJoinMessage>> entry : failedMessages
              .entrySet()) {
            DistributionLocatorId targetLocator = entry.getKey();
            Set<LocatorJoinMessage> joinMessages = entry.getValue();

            for (LocatorJoinMessage locatorJoinMessage : joinMessages) {
              if (retryMessage(targetLocator, locatorJoinMessage, attempt)) {
                joinMessages.remove(locatorJoinMessage);
              } else {
                // Sleep between retries.
                try {
                  Thread.sleep(memberTimeout);
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  logger.warn(
                      "Locator Membership listener permanently failed to exchange locator information due to interruption.");
                }
              }
            }
          }
        }
      }
    }
  }
}
