/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal.membership.gms.fd;


import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.logging.log4j.Logger;
import org.jgroups.util.UUID;

import org.apache.geode.CancelException;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.SystemConnectException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.gms.GMSMember;
import org.apache.geode.distributed.internal.membership.gms.ServiceConfig;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.messages.FinalCheckPassedMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatRequestMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.SuspectMembersMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.SuspectRequest;
import org.apache.geode.internal.ConnectionWatcher;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.logging.LoggingExecutors;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;

/**
 * Failure Detection
 * <p>
 * This class makes sure that each member is alive and communicating with this member. To do so
 * it arranges the members of the current view in a ring. On this ring, each member makes sure
 * that the next member in the ring is communicating with it by recording the timestamp of the last
 * message received from that member. If the next member has not communicated within the last
 * period (member-timeout), we check whether it is still alive and, based on the result, inform
 * the probable coordinators so that they can remove that member from the view.
 * <p>
 * It has the {@link #suspect(InternalDistributedMember, String)} API, which can be used to initiate
 * suspect processing for any member. First it checks whether the member is responding or not. Then
 * it informs probable coordinators to remove that member from view.
 * <p>
 * It has the {@link HealthMonitor#checkIfAvailable(InternalDistributedMember, String, boolean)} API
 * to see if that member is alive. Then, based on the removal flag, it initiates suspect processing
 * for that member.
 */
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "NullableProblems"})
public class GMSHealthMonitor implements HealthMonitor {

  /** membership services; assigned in {@link #init(Services)} */
  private Services services;
  /** the most recently installed membership view; replaced in {@link #installView(NetView)} */
  private volatile NetView currentView;
  /** the member currently being watched, or null when there is no one to watch */
  private volatile InternalDistributedMember nextNeighbor;

  /** member-timeout in milliseconds, read from the service configuration in init() */
  long memberTimeout;
  /** set to true by stopServices(); long-running loops and tasks poll it in order to exit */
  private volatile boolean isStopping = false;
  /** source of request ids for heartbeat request messages */
  private final AtomicInteger requestId = new AtomicInteger();

  /**
   * membership logger
   */
  private static final Logger logger = Services.getLogger();

  /**
   * The number of recipients of periodic heartbeats. The recipients will be selected from the
   * members that are likely to be monitoring this member.
   */
  private static final int NUM_HEARTBEATS = Integer.getInteger("geode.heartbeat-recipients", 2);

  /**
   * Member activity will be recorded per interval/period. Timer task will set interval's starting
   * time. Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL may be configured
   * via a system property with a default of 2. At least 1 interval is needed.
   */
  public static final int LOGICAL_INTERVAL =
      Integer.getInteger("geode.logical-message-received-interval", 2);

  /**
   * stall time to wait for members leaving concurrently
   */
  public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL =
      Long.getLong("geode.suspect-member-collection-interval", 200);

  /**
   * Start of the current activity-recording interval (milliseconds since the epoch). Written by
   * the Monitor task and used as the timestamp in contactedBy(InternalDistributedMember).
   */
  private volatile long currentTimeStamp;

  /**
   * this member's ID
   */
  private InternalDistributedMember localAddress;

  /**
   * Timestamp at which we last had contact from a member
   */
  final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps =
      new ConcurrentHashMap<>();

  /**
   * Members currently being suspected and the view they were suspected in
   */
  private final ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberIds =
      new ConcurrentHashMap<>();

  /**
   * Members undergoing final checks
   */
  private final List<InternalDistributedMember> membersInFinalCheck =
      Collections.synchronizedList(new ArrayList<>(30));

  /**
   * Replies to messages, keyed by the request id of the heartbeat request that is awaiting them
   */
  private final Map<Integer, Response> requestIdVsResponse = new ConcurrentHashMap<>();

  /**
   * Members suspected in a particular view. This is a plain HashMap; installView() synchronizes
   * on the map itself before clearing it.
   */
  private final Map<NetView, Set<SuspectRequest>> suspectRequestsInView = new HashMap<>();

  /** runs the periodic Monitor task; created in start() */
  private ScheduledExecutorService scheduler;

  /** runs member checks and the heartbeat sender; created in start() */
  private ExecutorService checkExecutor;

  /**
   * to stop check scheduler
   */
  private ScheduledFuture<?> monitorFuture;

  /**
   * test hook
   */
  private volatile boolean playingDead = false;

  /**
   * test hook
   */
  private volatile boolean beingSick = false;

  // For TCP check
  /** runs the TCP accept loop and the per-connection handlers; created in start() */
  private ExecutorService serverSocketExecutor;
  /** reply byte written by ClientSocketHandler when the request identifies this member */
  static final int OK = 0x7B;
  /** reply byte written by ClientSocketHandler when the request does not match this member */
  static final int ERROR = 0x00;
  /** local port of the failure-detection server socket; set in createServerSocket() */
  private volatile int socketPort;
  /** the failure-detection server socket; assigned in started() and closed in stopServer() */
  private volatile ServerSocket serverSocket;

  /**
   * Statistics about health monitor
   */
  private DMStats stats;

  /**
   * Interval to run the Monitor task (memberTimeout / LOGICAL_INTERVAL, in milliseconds)
   */
  private long monitorInterval;

  /**
   * Mutable holder for a timestamp. An existing instance is updated in place rather than being
   * replaced, which avoids creating garbage each time member activity is recorded.
   */
  private static class TimeStamp {

    private volatile long timeStamp;

    TimeStamp(long initialTime) {
      timeStamp = initialTime;
    }

    /** @return the most recently stored time */
    public long getTime() {
      return this.timeStamp;
    }

    /** Overwrites the stored time with {@code newTime}. */
    public void setTime(long newTime) {
      timeStamp = newTime;
    }
  }

  /**
   * Periodic task that marks the start of each activity-recording interval (the value used by
   * {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)}) and initiates a check of the
   * next neighbour when no activity has been recorded for that member recently.
   * <p>
   * If the task notices that it is running much later than scheduled (for example after a JVM
   * pause) it credits every member of the current view with a heartbeat instead of checking
   * anyone, because the absence of recorded activity says nothing about the other members.
   */
  private class Monitor implements Runnable {
    /**
     * Here we use the same threshold for detecting JVM pauses as the StatSampler
     */
    private final long MONITOR_DELAY_THRESHOLD =
        Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "statSamplerDelayThreshold", 3000);

    /** member-timeout, in milliseconds, used to decide when the neighbour must be checked */
    final long memberTimeoutInMillis;

    public Monitor(long memberTimeout) {
      memberTimeoutInMillis = memberTimeout;
    }

    @Override
    public void run() {

      if (GMSHealthMonitor.this.isStopping) {
        return;
      }

      InternalDistributedMember neighbour = nextNeighbor;

      // Capture the start of the previous interval BEFORE overwriting it. Reading it after the
      // assignment below would always yield "now", so the pause detection could never trigger.
      long oldTimeStamp = GMSHealthMonitor.this.currentTimeStamp;

      long currentTime = System.currentTimeMillis();
      // this is the start of interval to record member activity
      GMSHealthMonitor.this.currentTimeStamp = currentTime;

      NetView myView = GMSHealthMonitor.this.currentView;
      if (myView == null) {
        return;
      }

      // oldTimeStamp is zero on the very first run; there is no previous run to compare with
      if (oldTimeStamp > 0
          && currentTime - oldTimeStamp > monitorInterval + MONITOR_DELAY_THRESHOLD) {
        // delay in running this task - don't suspect anyone for a while
        logger.info(
            "Failure detector has noticed a JVM pause and is giving all members a heartbeat in view {}",
            myView);
        for (InternalDistributedMember member : myView.getMembers()) {
          contactedBy(member);
        }
        return;
      }

      if (neighbour != null) {
        TimeStamp nextNeighborTS;
        synchronized (GMSHealthMonitor.this) {
          nextNeighborTS = GMSHealthMonitor.this.memberTimeStamps.get(neighbour);
        }

        if (nextNeighborTS == null) {
          // nothing recorded yet for this member: start its clock now and check on a later run
          TimeStamp customTS = new TimeStamp(currentTime);
          memberTimeStamps.put(neighbour, customTS);
          return;
        }

        long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
        long lastTS = currentTime - nextNeighborTS.getTime();
        // check the member if waiting one more interval would exceed member-timeout
        if (lastTS + interval >= memberTimeoutInMillis) {
          logger.debug("Checking member {} ", neighbour);
          // now do check request for this member;
          checkMember(neighbour);
        }
      }
    }
  }

  /**
   * Holder for the reply to a heartbeat request. The checking thread registers an instance in
   * requestIdVsResponse under the request id and waits on it; the instance is also the monitor
   * that is notified to wake that thread up.
   */
  private class Response {

    private DistributionMessage responseMsg;

    /** @return the reply stored so far, or null if none has been stored */
    public DistributionMessage getResponseMsg() {
      return this.responseMsg;
    }

    /** Stores the reply message. */
    public void setResponseMsg(DistributionMessage reply) {
      responseMsg = reply;
    }

  }

  class ClientSocketHandler implements Runnable {

    private final Socket socket;

    public ClientSocketHandler(Socket socket) {
      this.socket = socket;
    }

    @Override
    public void run() {
      try {
        socket.setTcpNoDelay(true);
        DataInputStream in = new DataInputStream(socket.getInputStream());
        OutputStream out = socket.getOutputStream();
        @SuppressWarnings("UnusedAssignment")
        short version = in.readShort();
        int vmViewId = in.readInt();
        long uuidLSBs = in.readLong();
        long uuidMSBs = in.readLong();
        GMSHealthMonitor.this.stats.incFinalCheckRequestsReceived();
        GMSHealthMonitor.this.stats.incTcpFinalCheckRequestsReceived();
        GMSMember gmbr = (GMSMember) GMSHealthMonitor.this.localAddress.getNetMember();
        UUID myUUID = gmbr.getUUID();
        // during reconnect or rapid restart we will have a zero viewId but there may still
        // be an old ID in the membership view that we do not want to respond to
        int myVmViewId = gmbr.getVmViewId();
        if (playingDead) {
          logger.debug("HealthMonitor: simulating sick member in health check");
        } else if (uuidLSBs == myUUID.getLeastSignificantBits()
            && uuidMSBs == myUUID.getMostSignificantBits()
            && (vmViewId == myVmViewId || myVmViewId < 0)) {
          logger.debug("HealthMonitor: sending OK reply");
          out.write(OK);
          out.flush();
          socket.shutdownOutput();
          GMSHealthMonitor.this.stats.incFinalCheckResponsesSent();
          GMSHealthMonitor.this.stats.incTcpFinalCheckResponsesSent();
          logger.debug("HealthMonitor: server replied OK.");
        } else {
          if (logger.isDebugEnabled()) {
            logger.debug(
                "HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}.  My viewID is {} received is {}",
                Long.toHexString(myUUID.getMostSignificantBits()),
                Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs),
                Long.toHexString(uuidLSBs), myVmViewId, vmViewId);
          }
          out.write(ERROR);
          out.flush();
          socket.shutdownOutput();
          GMSHealthMonitor.this.stats.incFinalCheckResponsesSent();
          GMSHealthMonitor.this.stats.incTcpFinalCheckResponsesSent();
          logger.debug("HealthMonitor: server replied ERROR.");
        }
      } catch (IOException e) {
        // this is expected if it is a connection-timeout or other failure
        // to connect
      } catch (RuntimeException e) {
        logger.debug("Unexpected runtime exception", e);
        throw e;
      } catch (Error e) {
        logger.debug("Unexpected error", e);
        throw e;
      } finally {
        if (socket != null) {
          try {
            socket.close();
          } catch (IOException e) {
            // expected if the socket is already closed
          }
        }
      }
    }
  }

  /**
   * Creates an unconfigured health monitor. The constructor does nothing; services, the
   * member-timeout and statistics are assigned later in {@link #init(Services)}.
   */
  public GMSHealthMonitor() {

  }

  /**
   * Intentionally empty: this implementation has nothing to do here.
   */
  @SuppressWarnings("EmptyMethod")
  public static void loadEmergencyClasses() {}

  /*
   * Record activity from the given member, stamped with the start of the current time interval
   * (the value most recently published by the Monitor task).
   */
  @Override
  public void contactedBy(InternalDistributedMember sender) {
    final long intervalStart = currentTimeStamp;
    contactedBy(sender, intervalStart);
  }


  /**
   * Record member activity at a specified time. The stored timestamp only ever moves forward: an
   * older {@code timeStamp} than the one already recorded is ignored. If the sender is currently
   * a suspect it is un-suspected and the neighbour to watch is recomputed.
   *
   * @param sender the member we heard from
   * @param timeStamp time of the contact, in milliseconds
   */
  private void contactedBy(InternalDistributedMember sender, long timeStamp) {
    // Look the holder up first so that the common case (member already known) allocates
    // nothing; a new TimeStamp is only created when the member has no entry yet.
    TimeStamp cTS = memberTimeStamps.get(sender);
    if (cTS == null) {
      // putIfAbsent returns null when our new holder was installed, or the holder another
      // thread installed concurrently
      cTS = memberTimeStamps.putIfAbsent(sender, new TimeStamp(timeStamp));
    }
    if (cTS != null && cTS.getTime() < timeStamp) {
      cTS.setTime(timeStamp);
    }
    if (suspectedMemberIds.containsKey(sender)) {
      memberUnsuspected(sender);
      setNextNeighbor(currentView, null);
    }
  }


  /**
   * Builds a heartbeat request addressed to {@code mbr}, tagged with the next request id.
   */
  private HeartbeatRequestMessage constructHeartbeatRequestMessage(
      final InternalDistributedMember mbr) {
    final HeartbeatRequestMessage request =
        new HeartbeatRequestMessage(mbr, requestId.getAndIncrement());
    request.setRecipient(mbr);
    return request;
  }

  /**
   * Asynchronously checks {@code mbr}. The watch is moved past {@code mbr} straight away since
   * the check may take time. Once the check has finished the member is either suspected (no
   * response) or un-suspected (response seen), and the neighbour is recomputed in both cases.
   */
  private void checkMember(final InternalDistributedMember mbr) {
    // as check may take time
    setNextNeighbor(GMSHealthMonitor.this.currentView, mbr);

    // we need to check this member
    checkExecutor.execute(() -> {
      boolean responded;
      try {
        responded = doCheckMember(mbr, true);
      } catch (CancelException e) {
        return;
      }

      if (responded) {
        logger.trace("Setting next neighbor as member {} has responded.", mbr);
        memberUnsuspected(mbr);
      } else {
        String reason = "Member isn't responding to heartbeat requests";
        memberSuspected(localAddress, mbr, reason);
        initiateSuspicion(mbr, reason);
      }
      // back to previous one
      setNextNeighbor(currentView, null);
    });

  }

  /**
   * Sends a suspect request for {@code mbr} unless the member is already leaving.
   */
  private void initiateSuspicion(InternalDistributedMember mbr, String reason) {
    if (!services.getJoinLeave().isMemberLeaving(mbr)) {
      sendSuspectRequest(Collections.singletonList(new SuspectRequest(mbr, reason)));
    }
  }

  /**
   * Sends a heartbeat request to the given member and, when asked to, waits up to member-timeout
   * for a response.
   *
   * @param member the member to check
   * @param waitForResponse if true, register a Response for the request and wait for it; if
   *        false, the request id is cleared and the method does not wait
   * @return true if this member is playing dead/being sick, if activity from {@code member} newer
   *         than the start of this check has been recorded, if a response arrived, or if the
   *         monitor is stopping while waiting; false otherwise (including when the messenger
   *         reports the member did not receive the request, when {@code waitForResponse} is
   *         false, and when the waiting thread is interrupted)
   */
  private boolean doCheckMember(InternalDistributedMember member, boolean waitForResponse) {
    if (playingDead || beingSick) {
      // a member playingDead should not be sending messages to other
      // members, so we avoid sending heartbeat requests or suspect
      // messages by returning true.
      return true;
    }
    long startTime = System.currentTimeMillis();
    logger.debug("Requesting heartbeat from {}", member);
    final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
    Response pingResp = null;
    if (waitForResponse) {
      // register before sending so that a reply can be matched to this request by its id
      pingResp = new Response();
      requestIdVsResponse.put(hrm.getRequestId(), pingResp);
    } else {
      hrm.clearRequestId();
    }
    try {
      Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(hrm);
      this.stats.incHeartbeatRequestsSent();
      if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) {
        // member is not part of current view.
        logger.trace("Member {} is not part of current view.", member);
      } else if (waitForResponse) {
        synchronized (pingResp) {
          // NOTE(review): the wait is not in a loop, so a wakeup without a response (spurious,
          // or the notify issued by stopServices) falls through to the checks below
          if (pingResp.getResponseMsg() == null) {
            pingResp.wait(memberTimeout);
          }
          // any activity recorded after this check started counts as proof of life
          TimeStamp ts = memberTimeStamps.get(member);
          if (ts != null && ts.getTime() > startTime) {
            return true;
          }
          if (pingResp.getResponseMsg() == null) {
            // do not report a failure while shutting down
            if (isStopping) {
              return true;
            }
            logger.debug("no heartbeat response received from {} and no recent activity", member);
            return false;
          } else {
            logger.trace("received heartbeat from {}", member);
            this.stats.incHeartbeatsReceived();
            if (ts != null) {
              ts.setTime(System.currentTimeMillis());
            }
            return true;
          }
        }
      }
    } catch (InterruptedException e) {
      // NOTE(review): the interrupt status is not restored here and the method goes on to
      // return false - confirm that callers rely on this
      logger.debug(
          "GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .",
          member);
    } finally {
      // always unregister so that the map does not accumulate finished requests
      if (waitForResponse) {
        requestIdVsResponse.remove(hrm.getRequestId());
      }
    }
    return false;
  }

  /**
   * During final check, establish TCP connection between current member and suspect member. And
   * exchange PING/PONG message to see if the suspect member is still alive.
   * <p>
   * When {@code retryIfConnectFails} is set the attempt is repeated, with a 100ms pause between
   * attempts, until it passes, the monitor is shut down or member-timeout has elapsed.
   *
   * @param suspectMember member that does not respond to HeartbeatRequestMessage
   * @param port the port to connect to on the suspect member's address
   * @param retryIfConnectFails whether to keep trying for up to member-timeout
   * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false (also
   *         when the thread is interrupted while pausing between attempts)
   */
  boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port,
      boolean retryIfConnectFails) {
    // make sure we try to check on the member for the contracted memberTimeout period
    // in case a timed socket.connect() returns immediately
    long giveupTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(
        services.getConfig().getMemberTimeout(), TimeUnit.MILLISECONDS);
    boolean passed = false;
    int iteration = 0;
    do {
      iteration++;
      if (iteration > 1) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
      // Scoped to one attempt: if connect() fails on a retry, the cleanup below must not touch
      // the socket of an earlier attempt, which has already been closed.
      Socket clientSocket = null;
      try {
        logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember,
            suspectMember.getInetAddress(), port);
        clientSocket =
            SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
                .connect(suspectMember.getInetAddress(), port, (int) memberTimeout,
                    new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
        clientSocket.setTcpNoDelay(true);
        passed = doTCPCheckMember(suspectMember, clientSocket);
      } catch (IOException e) {
        // this is expected if it is a connection-timeout or other failure
        // to connect
      } catch (IllegalStateException | GemFireConfigException e) {
        if (!isStopping) {
          logger.trace("Unexpected exception", e);
        }
      } finally {
        try {
          if (clientSocket != null) {
            clientSocket.setSoLinger(true, 0); // abort the connection
            clientSocket.close();
          }
        } catch (IOException e) {
          // expected
        }
      }
    } while (retryIfConnectFails && !passed && !this.isShutdown()
        && System.nanoTime() < giveupTime);
    return passed;
  }

  /**
   * Performs the availability exchange on an already created socket: writes the suspect's
   * identity, reads a single reply byte and treats {@link #OK} as success. On success the
   * member's recorded timestamp, if there is one, is refreshed.
   * <p>
   * Package protected for testing purposes.
   *
   * @return true only if the reply byte was {@link #OK}
   */
  boolean doTCPCheckMember(InternalDistributedMember suspectMember, Socket clientSocket) {
    try {
      if (!clientSocket.isConnected()) {
        // cannot establish TCP connection with suspect member
        return false;
      }

      clientSocket.setSoTimeout((int) services.getConfig().getMemberTimeout());
      InputStream in = clientSocket.getInputStream();
      DataOutputStream out = new DataOutputStream(clientSocket.getOutputStream());
      GMSMember gmbr = (GMSMember) suspectMember.getNetMember();
      writeMemberToStream(gmbr, out);
      this.stats.incFinalCheckRequestsSent();
      this.stats.incTcpFinalCheckRequestsSent();
      logger.debug("Connected to suspect member - reading response");

      int reply = in.read();
      if (logger.isDebugEnabled()) {
        String description;
        if (reply == OK) {
          description = "OK";
        } else if (reply == ERROR) {
          description = "ERROR";
        } else {
          description = "unknown response: " + reply;
        }
        logger.debug("Received {}", description);
      }
      // a negative value means end of stream, i.e. no reply byte was received
      if (reply >= 0) {
        this.stats.incFinalCheckResponsesReceived();
        this.stats.incTcpFinalCheckResponsesReceived();
      }
      if (reply != OK) {
        // received ERROR
        return false;
      }

      TimeStamp ts = memberTimeStamps.get(suspectMember);
      if (ts != null) {
        ts.setTime(System.currentTimeMillis());
      }
      return true;
    } catch (SocketTimeoutException e) {
      logger.debug("Availability check TCP/IP connection timed out for suspect member {}",
          suspectMember);
      return false;
    } catch (IOException e) {
      logger.trace("Unexpected exception", e);
      return false;
    }
  }

  /**
   * Writes the availability-check request for {@code gmbr} and flushes the stream. The field
   * order (version ordinal, view id, UUID least-significant bits, UUID most-significant bits)
   * is the order in which ClientSocketHandler reads them.
   */
  void writeMemberToStream(GMSMember gmbr, DataOutputStream out) throws IOException {
    // header
    out.writeShort(Version.CURRENT_ORDINAL);

    // identity of the member being checked
    out.writeInt(gmbr.getVmViewId());
    out.writeLong(gmbr.getUuidLSBs());
    out.writeLong(gmbr.getUuidMSBs());

    out.flush();
  }

  /**
   * Starts suspect processing for {@code mbr}, giving {@code reason} as the cause.
   */
  @Override
  public void suspect(InternalDistributedMember mbr, String reason) {
    this.initiateSuspicion(mbr, reason);
  }

  /**
   * Checks {@code mbr} inline on behalf of the local member, using the current view.
   */
  @Override
  public boolean checkIfAvailable(InternalDistributedMember mbr, String reason,
      boolean initiateRemoval) {
    final boolean available =
        inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, mbr, reason);
    return available;
  }

  /**
   * Creates the executors used by the health monitor and schedules the Monitor task to run every
   * {@code memberTimeout / LOGICAL_INTERVAL} milliseconds.
   */
  @Override
  public void start() {
    scheduler = LoggingExecutors.newScheduledThreadPool("Geode Failure Detection Scheduler", 1);
    checkExecutor = LoggingExecutors.newCachedThreadPool("Geode Failure Detection thread ", true);

    final Monitor monitorTask = new Monitor(memberTimeout);
    monitorInterval = memberTimeout / LOGICAL_INTERVAL;
    monitorFuture = scheduler.scheduleAtFixedRate(monitorTask, monitorInterval, monitorInterval,
        TimeUnit.MILLISECONDS);

    serverSocketExecutor =
        LoggingExecutors.newCachedThreadPool("Geode Failure Detection Server thread ", true);
  }

  /**
   * Allocates the failure-detection server socket on {@code socketAddress}, using a port taken
   * from {@code portRange}, and records the chosen port in {@code socketPort}.
   *
   * @throws GemFireConfigException if no socket could be allocated
   */
  ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
    try {
      final ServerSocket allocated = SocketCreatorFactory
          .getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
          .createServerSocketUsingPortRange(socketAddress, 50/* backlog */, true/* isBindAddress */,
              false/* useNIO */, 65536/* tcpBufferSize */, portRange, false);
      socketPort = allocated.getLocalPort();
      return allocated;
    } catch (IOException | SystemConnectException e) {
      throw new GemFireConfigException(
          "Unable to allocate a failure detection port in the membership-port range", e);
    }
  }

  /**
   * start the thread that listens for tcp/ip connections and responds to connection attempts
   * <p>
   * Each accepted connection is handed to a {@link ClientSocketHandler}, which takes over
   * responsibility for closing it. The loop ends when cancellation is in progress or the monitor
   * is stopping, and the server socket is closed on the way out.
   *
   * @param ssocket the already-bound server socket to accept connections on
   */
  private void startTcpServer(ServerSocket ssocket) {
    // allocate a socket here so there are no race conditions between knowing the FD
    // socket port and joining the system

    serverSocketExecutor.execute(() -> {
      logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(),
          socketPort);
      try {
        while (!services.getCancelCriterion().isCancelInProgress()
            && !GMSHealthMonitor.this.isStopping) {
          // Scoped to one iteration on purpose: a socket accepted earlier belongs to its
          // handler and must never be closed from here when a later accept() fails.
          Socket socket = null;
          try {
            socket = ssocket.accept();
            if (GMSHealthMonitor.this.playingDead) {
              // NOTE(review): the connection is neither answered nor closed here; presumably
              // intentional for the test hook so that the peer sees no response - confirm
              continue;
            }
            serverSocketExecutor.execute(new ClientSocketHandler(socket));
          } catch (RejectedExecutionException e) {
            // this can happen during shutdown. The handler will never run, so nobody else
            // is going to close the connection that was just accepted.
            if (socket != null) {
              try {
                socket.close();
              } catch (IOException ioe) {
                logger.trace("Unexpected exception", ioe);
              }
            }
          } catch (IOException e) {
            // only accept() can throw this, so there is no accepted socket to clean up
            if (!isStopping) {
              logger.trace("Unexpected exception", e);
            }
          }
        }
        logger.info("GMSHealthMonitor server thread exiting");
      } finally {
        // close the server socket
        if (!ssocket.isClosed()) {
          try {
            ssocket.close();
          } catch (IOException e) {
            logger.debug("Unexpected exception", e);
          }
        }
      }
    });
  }

  /**
   * start the thread that periodically sends a message to processes that might be watching this
   * process
   * <p>
   * Every {@code memberTimeout / LOGICAL_INTERVAL} milliseconds a heartbeat is sent to the view
   * coordinator (unless that is this member) and to up to {@link #NUM_HEARTBEATS} members that
   * precede this member in the view's member list, wrapping around at the start of the list.
   */
  private void startHeartbeatThread() {
    checkExecutor.execute(new Runnable() {
      @Override
      public void run() {
        Thread.currentThread().setName("Geode Heartbeat Sender");
        sendPeriodicHeartbeats();
      }

      private void sendPeriodicHeartbeats() {
        while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
          try {
            Thread.sleep(memberTimeout / LOGICAL_INTERVAL);
          } catch (InterruptedException e) {
            return;
          }
          NetView v = currentView;
          if (v != null) {
            List<InternalDistributedMember> mbrs = v.getMembers();
            int index = mbrs.indexOf(localAddress);
            // nothing to do if we are not in the view or we are its only member
            if (index < 0 || mbrs.size() < 2) {
              continue;
            }
            if (!playingDead) {
              sendHeartbeats(v, mbrs, index);
            }
          }
        }
      }

      /**
       * Sends one round of heartbeats. The coordinator is taken from the same view snapshot as
       * the member list, so a view installed concurrently cannot mix the two.
       */
      private void sendHeartbeats(NetView view, List<InternalDistributedMember> mbrs,
          int startIndex) {
        InternalDistributedMember coordinator = view.getCoordinator();
        if (coordinator != null && !coordinator.equals(localAddress)) {
          HeartbeatMessage message = new HeartbeatMessage(-1);
          message.setRecipient(coordinator);
          try {
            if (isStopping) {
              return;
            }
            services.getMessenger().sendUnreliably(message);
            GMSHealthMonitor.this.stats.incHeartbeatsSent();
          } catch (CancelException e) {
            return;
          }
        }

        // walk backwards around the ring, starting with the member just before us
        int index = startIndex;
        int numSent = 0;
        for (;;) {
          index--;
          if (index < 0) {
            index = mbrs.size() - 1;
          }
          InternalDistributedMember mbr = mbrs.get(index);
          if (mbr.equals(localAddress)) {
            // came full circle
            break;
          }
          if (mbr.equals(coordinator)) {
            // the coordinator has already been sent a heartbeat above
            continue;
          }
          if (isStopping) {
            return;
          }
          HeartbeatMessage message = new HeartbeatMessage(-1);
          message.setRecipient(mbr);
          try {
            services.getMessenger().sendUnreliably(message);
            GMSHealthMonitor.this.stats.incHeartbeatsSent();
            numSent++;
            if (numSent >= NUM_HEARTBEATS) {
              break;
            }
          } catch (CancelException e) {
            return;
          }
        } // for (;;)
      }
    });
  }

  /**
   * Installs a new membership view. Pending suspect requests are discarded, timestamps and
   * suspect entries of members that are not in {@code newView} are dropped, and the neighbour to
   * watch is recomputed for the new view.
   */
  @Override
  public synchronized void installView(NetView newView) {
    synchronized (suspectRequestsInView) {
      suspectRequestsInView.clear();
    }

    // forget everything we know about members that are no longer in the view
    memberTimeStamps.keySet().removeIf(member -> !newView.contains(member));
    suspectedMemberIds.keySet().removeIf(member -> !newView.contains(member));

    currentView = newView;
    setNextNeighbor(newView, null);
  }

  /**
   * Returns the view most recently installed in this health monitor. This method is primarily
   * for tests; the current view should be pulled from JoinLeave or the MembershipManager (which
   * includes surprise members).
   */
  public synchronized NetView getView() {
    return this.currentView;
  }

  /***
   * Selects the member to watch: the first member after {@code nextTo} in the view's member list
   * (wrapping around at the end) that is not currently a suspect.
   *
   * The watched member is set to null when every other member of the view is a suspect, or when
   * the only candidate turns out to be the local member.
   *
   * @param newView the view to pick the neighbour from; nothing happens if it is null
   * @param nextTo the member after which to start looking; null means the local member. While a
   *        member is being checked it is passed here so that the member after it is watched.
   */
  protected synchronized void setNextNeighbor(NetView newView, InternalDistributedMember nextTo) {
    if (newView == null) {
      return;
    }
    if (nextTo == null) {
      nextTo = localAddress;
    }

    List<InternalDistributedMember> allMembers = newView.getMembers();

    // The size comparison is only a cheap pre-check; the loop below confirms whether every
    // member other than ourselves really is a suspect.
    if (allMembers.size() > 1 && suspectedMemberIds.size() >= allMembers.size() - 1) {
      boolean nonSuspectFound = false;
      for (InternalDistributedMember member : allMembers) {
        if (member.equals(localAddress)) {
          continue;
        }
        if (!suspectedMemberIds.containsKey(member)) {
          nonSuspectFound = true;
          break;
        }
      }
      if (!nonSuspectFound) {
        logger.info("All other members are suspect at this point");
        nextNeighbor = null;
        return;
      }
    }

    int index = allMembers.indexOf(nextTo);
    if (index != -1) {
      // the candidate is the next member in the list, wrapping around at the end
      int nextNeighborIndex = (index + 1) % allMembers.size();
      InternalDistributedMember newNeighbor = allMembers.get(nextNeighborIndex);
      if (suspectedMemberIds.containsKey(newNeighbor)) {
        // skip suspects: continue the search from the suspect's position
        setNextNeighbor(newView, newNeighbor);
        return;
      }
      InternalDistributedMember oldNeighbor = nextNeighbor;
      // reference comparison: only log and assign when the selected object actually changes
      if (oldNeighbor != newNeighbor) {
        logger.debug("Failure detection is now watching " + newNeighbor);
        nextNeighbor = newNeighbor;
      }
    }

    // we never watch ourselves
    if (nextNeighbor != null && nextNeighbor.equals(localAddress)) {
      if (logger.isDebugEnabled()) {
        logger.debug("Health monitor is unable to find a neighbor to watch.  "
            + "Current suspects are {}", suspectedMemberIds);
      }
      nextNeighbor = null;
    }

  }

  /** test method - returns the member currently being watched, which may be null */
  public InternalDistributedMember getNextNeighbor() {
    return this.nextNeighbor;
  }

  /**
   * Binds this monitor to the given services: clears the stopping flag, reads member-timeout and
   * the statistics object, and registers this monitor as the handler for the four message types
   * it processes.
   */
  @Override
  public void init(Services s) {
    isStopping = false;
    services = s;
    memberTimeout = s.getConfig().getMemberTimeout();
    stats = s.getStatistics();

    // heartbeat traffic
    s.getMessenger().addHandler(HeartbeatRequestMessage.class, this::processMessage);
    s.getMessenger().addHandler(HeartbeatMessage.class, this::processMessage);
    // suspect processing
    s.getMessenger().addHandler(SuspectMembersMessage.class, this::processMessage);
    s.getMessenger().addHandler(FinalCheckPassedMessage.class, this::processMessage);
  }

  /**
   * Records the local member id obtained from the messenger, creates the server socket from the
   * local address and the configured membership port range, and then starts the TCP server and
   * the heartbeat thread.
   */
  @Override
  public void started() {
    setLocalAddress(services.getMessenger().getMemberID());

    final InetAddress bindAddress = localAddress.getInetAddress();
    serverSocket =
        createServerSocket(bindAddress, services.getConfig().getMembershipPortRange());

    startTcpServer(serverSocket);
    startHeartbeatThread();
  }

  /**
   * Stops this monitor by delegating to {@code stopServices()}.
   */
  @Override
  public void stop() {
    this.stopServices();
  }

  /**
   * Shuts down everything this monitor runs: sets the stopping flag, cancels the monitor task,
   * shuts down the scheduler, notifies the monitor of every pending {@code Response}, shuts down
   * the check executor and finally stops the socket server.
   */
  private void stopServices() {
    logger.debug("Stopping HealthMonitor");
    isStopping = true;

    if (monitorFuture != null) {
      monitorFuture.cancel(true);
    }
    if (scheduler != null) {
      scheduler.shutdown();
    }

    // notify on each outstanding response object so a thread waiting on it can wake up
    for (Response pending : requestIdVsResponse.values()) {
      synchronized (pending) {
        pending.notify();
      }
    }

    if (checkExecutor != null) {
      checkExecutor.shutdown();
    }

    stopServer();
  }

  /**
   * Closes the server socket (if it is open) and shuts down the executor that services it, waiting
   * up to two seconds for the executor to terminate. Does nothing when no server-socket executor
   * exists.
   */
  void stopServer() {
    if (serverSocketExecutor == null) {
      return;
    }

    final boolean socketIsOpen = serverSocket != null && !serverSocket.isClosed();
    if (socketIsOpen) {
      try {
        serverSocket.close();
      } catch (IOException e) {
        logger.trace("Unexpected exception", e);
      }
    }

    serverSocketExecutor.shutdownNow();
    try {
      serverSocketExecutor.awaitTermination(2000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // preserve the interrupt for the caller
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Test hook reporting whether the scheduler, the check executor and the server-socket executor
   * have all been shut down.
   *
   * @return {@code true} only if all three executors report {@code isShutdown()}
   */
  public boolean isShutdown() {
    if (!scheduler.isShutdown()) {
      return false;
    }
    if (!checkExecutor.isShutdown()) {
      return false;
    }
    return serverSocketExecutor.isShutdown();
  }

  /**
   * Test hook reporting whether a member is currently recorded in {@code suspectedMemberIds}.
   *
   * @param m the member to look up
   * @return {@code true} if {@code m} is a key of {@code suspectedMemberIds}
   */
  public boolean isSuspectMember(InternalDistributedMember m) {
    return suspectedMemberIds.containsKey(m);
  }

  /**
   * Lifecycle callback; this implementation performs no work.
   */
  @Override
  public void stopped() {
    // intentionally empty
  }

  /**
   * Records that a member is under suspicion in the current view.
   * <p>
   * The suspect is added to {@code suspectedMemberIds} and, unless a request for the same member
   * is already present, a {@link SuspectRequest} is added to the set kept for the current view in
   * {@code suspectRequestsInView}. If no set exists yet for the view, one is created and stored in
   * the map so the request is retained and can later be found by {@code memberUnsuspected}.
   *
   * @param initiator the member that raised the suspicion (not used by this method)
   * @param suspect the member being suspected
   * @param reason the reason recorded in a newly created suspect request
   */
  @Override
  public void memberSuspected(InternalDistributedMember initiator,
      InternalDistributedMember suspect, String reason) {
    synchronized (suspectRequestsInView) {
      // read the view once so both collections are updated for the same view
      final NetView view = currentView;
      suspectedMemberIds.put(suspect, view);

      Set<SuspectRequest> requests = suspectRequestsInView.get(view);
      if (requests == null) {
        requests = new HashSet<>();
        // store the new set; without this the recorded request would be discarded on return
        suspectRequestsInView.put(view, requests);
      }

      boolean alreadyRecorded = false;
      for (SuspectRequest request : requests) {
        if (suspect.equals(request.getSuspectMember())) {
          alreadyRecorded = true;
          break;
        }
      }
      if (!alreadyRecorded) {
        requests.add(new SuspectRequest(suspect, reason));
      }
    }
  }

  /**
   * Clears suspicion of a member: removes it from {@code suspectedMemberIds} (logging if it was
   * present) and removes every suspect request naming it from the set recorded for the current
   * view, if such a set exists.
   *
   * @param mbr the member that is no longer suspected
   */
  private void memberUnsuspected(InternalDistributedMember mbr) {
    synchronized (suspectRequestsInView) {
      final boolean wasSuspected = suspectedMemberIds.remove(mbr) != null;
      if (wasSuspected) {
        logger.info("No longer suspecting {}", mbr);
      }

      final Collection<SuspectRequest> recorded = suspectRequestsInView.get(currentView);
      if (recorded == null) {
        return;
      }

      // gather the matching requests first, then drop them from the recorded collection
      final List<SuspectRequest> matching = recorded.stream()
          .filter(request -> mbr.equals(request.getSuspectMember()))
          .collect(Collectors.toList());
      recorded.removeAll(matching);
    }
  }

  /**
   * Sets the {@code beingSick} flag. While it is set, the {@code processMessage} handlers for
   * heartbeat requests, heartbeats and suspect messages return without acting.
   */
  @Override
  public void beSick() {
    beingSick = true;
  }

  /**
   * Sets the {@code playingDead} flag. While it is set, the {@code processMessage} handlers for
   * heartbeat requests, heartbeats and suspect messages return without acting.
   */
  @Override
  public void playDead() {
    playingDead = true;
  }

  /**
   * Clears both the {@code beingSick} and {@code playingDead} flags.
   */
  @Override
  public void beHealthy() {
    beingSick = false;
    playingDead = false;
  }

  /**
   * Emergency shutdown; delegates to {@code stopServices()}, the same path used by
   * {@code stop()}.
   */
  @Override
  public void emergencyClose() {
    this.stopServices();
  }

  /**
   * Stores the identifier of the local member.
   *
   * @param idm the member id to use as {@code localAddress}
   */
  @Override
  public void setLocalAddress(InternalDistributedMember idm) {
    localAddress = idm;
  }

  /**
   * Handles a heartbeat request by replying with a {@code HeartbeatMessage} carrying the same
   * request id, provided this member is the intended target.
   * <p>
   * The request is ignored while stopping or while {@code beingSick}/{@code playingDead} is set.
   *
   * @param m the heartbeat request that was received
   */
  void processMessage(HeartbeatRequestMessage m) {
    if (isStopping) {
      return;
    }
    if (beingSick || playingDead) {
      logger.debug("sick member is ignoring check request");
      return;
    }

    stats.incHeartbeatRequestsReceived();

    // isStopping is consulted a second time after the statistic has been updated
    if (isStopping) {
      return;
    }

    // reply when no local address is set yet, or when this member has a non-negative view id and
    // is the target named in the request
    final InternalDistributedMember me = localAddress;
    final boolean shouldRespond =
        me == null || (me.getVmViewId() >= 0 && m.getTarget().equals(me));
    if (!shouldRespond) {
      logger.debug("Ignoring heartbeat request intended for {}.  My ID is {}", m.getTarget(), me);
      return;
    }

    final HeartbeatMessage reply = new HeartbeatMessage(m.getRequestId());
    reply.setRecipient(m.getSender());
    final Set<InternalDistributedMember> undelivered = services.getMessenger().send(reply);
    stats.incHeartbeatsSent();
    if (undelivered != null && undelivered.contains(m.getSender())) {
      logger.debug("Unable to send heartbeat to member: {}", m.getSender());
    }
  }



  /**
   * Handles a heartbeat. If the heartbeat carries a non-negative request id and a
   * {@code Response} is registered for that id, the message is stored in the response and its
   * monitor is notified. In every case the sender's contact time is refreshed.
   * <p>
   * The heartbeat is ignored while stopping or while {@code beingSick}/{@code playingDead} is
   * set.
   *
   * @param m the heartbeat that was received
   */
  void processMessage(HeartbeatMessage m) {
    if (isStopping) {
      return;
    }
    if (beingSick || playingDead) {
      logger.debug("sick member is ignoring check response");
      return;
    }

    stats.incHeartbeatsReceived();

    if (m.getRequestId() >= 0) {
      final Response waiter = requestIdVsResponse.get(m.getRequestId());
      final String waiterState =
          waiter != null ? "Check thread still waiting" : "Check thread is not waiting";
      logger.trace("Got heartbeat from member {}. {}", m.getSender(), waiterState);

      if (waiter != null) {
        synchronized (waiter) {
          waiter.setResponseMsg(m);
          waiter.notify();
        }
      }
    }

    // a heartbeat arrived, so refresh the sender's timestamp
    contactedBy(m.getSender(), System.currentTimeMillis());
  }

  /**
   * Processes a suspect request from another member (or one that was generated locally). This may
   * cause this member to act as the new membership coordinator: it will schedule a final
   * availability check on the suspected members, which may in turn lead to a removal request.
   * <p>
   * Processing steps:
   * <ol>
   * <li>Ignore the message while stopping, while {@code beingSick}/{@code playingDead} is set, or
   * when there is no current view.</li>
   * <li>If the sender is not in the current view and its view id is not newer than the view's,
   * ask the join/leave service to remove the sender and stop.</li>
   * <li>If this member is among the suspects, send a heartbeat (request id -1) to the sender and
   * drop that entry from the request.</li>
   * <li>If this member is the view's coordinator, check the suspects in the message. Otherwise
   * record the requests and check all recorded suspects only if this member would be coordinator
   * once the recorded suspects and the leaving members are removed from the view.</li>
   * </ol>
   *
   * @param incomingRequest the suspect message to process
   */
  void processMessage(SuspectMembersMessage incomingRequest) {
    if (isStopping) {
      return;
    }
    if (beingSick || playingDead) {
      logger.debug("sick member is ignoring suspect message");
      return;
    }

    this.stats.incSuspectsReceived();

    NetView cv = currentView;

    if (cv == null) {
      return;
    }

    List<SuspectRequest> suspectRequests = incomingRequest.getMembers();

    InternalDistributedMember sender = incomingRequest.getSender();
    int viewId = sender.getVmViewId();
    if (cv.getViewId() >= viewId && !cv.contains(sender)) {
      logger.info("Membership ignoring suspect request for {} from non-member {}",
          incomingRequest, sender);
      services.getJoinLeave().remove(sender,
          "this process is initiating suspect processing but is no longer a member");
      return;
    }

    // take care of any suspicion of this member by sending a heartbeat back
    for (Iterator<SuspectRequest> it = incomingRequest.getMembers().iterator(); it.hasNext();) {
      SuspectRequest req = it.next();
      if (req.getSuspectMember().equals(localAddress)) {
        HeartbeatMessage message = new HeartbeatMessage(-1);
        message.setRecipient(sender);
        try {
          services.getMessenger().send(message);
          this.stats.incHeartbeatsSent();
          // the suspicion of this member has been answered; do not process it further
          it.remove();
        } catch (CancelException e) {
          return;
        }
      }
    }

    logger.debug(
        "Processing suspect requests {}\nproposed view is currently {}\nwith coordinator {}",
        suspectRequests, cv, cv.getCoordinator());
    if (cv.getCoordinator().equals(localAddress)) {
      // This process is the membership coordinator and should perform a final check
      logSuspectRequests(incomingRequest, sender);
      checkIfAvailable(sender, suspectRequests, cv);

    } else {
      // Another process has raised suspicion - check to see if
      // this process should become the membership coordinator if
      // all current suspects are gone
      NetView check = new NetView(cv, cv.getViewId() + 1);
      List<SuspectRequest> membersToCheck = new ArrayList<>();
      synchronized (suspectRequestsInView) {
        recordSuspectRequests(suspectRequests, cv);
        Set<SuspectRequest> suspectsInView = suspectRequestsInView.get(cv);
        logger.debug("Current suspects are {}", suspectsInView);
        for (final SuspectRequest sr : suspectsInView) {
          check.remove(sr.getSuspectMember());
          membersToCheck.add(sr);
        }
      }
      List<InternalDistributedMember> membersLeaving = new ArrayList<>();
      for (InternalDistributedMember member : cv.getMembers()) {
        if (services.getJoinLeave().isMemberLeaving(member)) {
          membersLeaving.add(member);
        }
      }
      if (!membersLeaving.isEmpty()) {
        logger.debug("Current leave requests are {}", membersLeaving);
        check.removeAll(membersLeaving);
      }
      logger.trace(
          "Proposed view with suspects & leaving members removed is {}\nwith coordinator {}\nmy address is {}",
          check,
          check.getCoordinator(), localAddress);

      InternalDistributedMember coordinator = check.getCoordinator();
      if (coordinator != null && coordinator.equals(localAddress)) {
        // new coordinator
        logSuspectRequests(incomingRequest, sender);
        checkIfAvailable(sender, membersToCheck, cv);
      }
    }

  }

  /**
   * Handles notification that a suspect passed its final check by calling {@code contactedBy}
   * for that suspect. Ignored while stopping.
   *
   * @param m the message naming the suspect that passed the check
   */
  void processMessage(FinalCheckPassedMessage m) {
    if (!isStopping) {
      contactedBy(m.getSuspect());
    }
  }



  /**
   * Logs one info-level line per suspect request contained in the message, naming the sender (or
   * "myself" when the sender is the local member), the suspected member and the reason.
   *
   * @param incomingRequest the suspect message whose requests are logged
   * @param sender the member that sent the message; must not be {@code null}
   */
  private void logSuspectRequests(SuspectMembersMessage incomingRequest,
      InternalDistributedMember sender) {
    // the sender description does not depend on the request, so compute it once
    final String who = sender.equals(localAddress) ? "myself" : sender.toString();
    for (SuspectRequest req : incomingRequest.getMembers()) {
      logger.info("received suspect message from {} for {}: {}", who, req.getSuspectMember(),
          req.getReason());
    }
  }

  /**
   * Records the given suspect requests against a view, creating the per-view set on first use.
   * This needs to happen on preferred coordinators as well, since the elder coordinator might be
   * the next member to appear in the suspect list.
   *
   * @param suspectRequests the requests to record
   * @param cv the view the requests are recorded against
   */
  private void recordSuspectRequests(List<SuspectRequest> suspectRequests, NetView cv) {
    synchronized (suspectRequestsInView) {
      suspectRequestsInView.computeIfAbsent(cv, view -> new HashSet<>())
          .addAll(suspectRequests);
    }
  }

  /**
   * Schedules a "final" health check for each suspected member on {@code checkExecutor}. The
   * check itself is carried out by {@code inlineCheckIfAvailable} with forced removal enabled.
   * <p>
   * A suspect is skipped when it is not in the given view, is already undergoing a final check,
   * or is the local member.
   *
   * @param initiator the member that raised the suspicion
   * @param sMembers the suspect requests to act on
   * @param cv the view the check is performed against
   */
  private void checkIfAvailable(final InternalDistributedMember initiator,
      List<SuspectRequest> sMembers, final NetView cv) {

    for (final SuspectRequest request : sMembers) {
      final InternalDistributedMember mbr = request.getSuspectMember();

      final boolean skip = !cv.contains(mbr)
          || membersInFinalCheck.contains(mbr)
          || mbr.equals(localAddress); // self
      if (skip) {
        continue;
      }

      final String reason = request.getReason();
      logger.debug("Scheduling availability check for member {}; reason={}", mbr, reason);

      final Runnable finalCheck = () -> {
        try {
          inlineCheckIfAvailable(initiator, cv, true, mbr, reason);
        } catch (CancelException e) {
          // shutting down
        } catch (Exception e) {
          logger.info("Unexpected exception while verifying member", e);
        }
      };
      checkExecutor.execute(finalCheck);
    }
  }

  /**
   * Performs an availability check on a suspect member in the calling thread.
   * <p>
   * If the view has no failure-detection port for the member, a heartbeat request is sent and its
   * result is used. Otherwise a heartbeat request is sent without using its result and a TCP check
   * is made against the member's failure-detection port. A failed check is overridden when the
   * member's recorded timestamp is at or after the time the check started.
   * <p>
   * When the check fails and {@code forceRemovalIfCheckFails} is set, removal of the member is
   * requested. When it is not set, this member first runs the TCP check against itself; only if
   * that succeeds does it initiate suspicion of the member and process a locally created
   * {@code SuspectMembersMessage}.
   *
   * @param initiator the member that raised the suspicion; it is sent a
   *        {@code FinalCheckPassedMessage} when the check passes, provided it is not the local
   *        member and its version is at least {@code GEODE_1_3_0}
   * @param cv the view used to look up the member's failure-detection port
   * @param forceRemovalIfCheckFails whether a failed check should request removal of the member;
   *        also passed to the TCP check as its retry flag
   * @param mbr the suspect member to check
   * @param reason the reason for the suspicion, used in logging and in the removal request
   * @return {@code false} if the member is already leaving or the check failed; {@code true}
   *         otherwise
   */
  protected boolean inlineCheckIfAvailable(final InternalDistributedMember initiator,
      final NetView cv, boolean forceRemovalIfCheckFails, final InternalDistributedMember mbr,
      final String reason) {

    // a member that is already leaving is not checked
    if (services.getJoinLeave().isMemberLeaving(mbr)) {
      return false;
    }

    // stays false if an exception escapes the try block, so the finally block below then also
    // clears the suspicion
    boolean failed = false;

    logger.info("Performing availability check for suspect member {} reason={}", mbr, reason);
    membersInFinalCheck.add(mbr);
    setNextNeighbor(currentView, mbr);

    try {
      services.memberSuspected(initiator, mbr, reason);
      long startTime = System.currentTimeMillis();
      // for some reason we used to update the timestamp for the member
      // with the startTime, but we don't want to do that because it looks
      // like a heartbeat has been received

      boolean pinged;
      int port = cv.getFailureDetectionPort(mbr);
      if (port <= 0) {
        // no usable port in the view: rely on the heartbeat request result alone
        logger.info("Unable to locate failure detection port - requesting a heartbeat");
        if (logger.isDebugEnabled()) {
          logger.debug("\ncurrent view: {}\nports: {}", cv,
              Arrays.toString(cv.getFailureDetectionPorts()));
        }
        pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
        GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
        GMSHealthMonitor.this.stats.incUdpFinalCheckRequestsSent();
        if (pinged) {
          GMSHealthMonitor.this.stats.incFinalCheckResponsesReceived();
          GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
        }
      } else {
        // this will just send heartbeat request, it will not wait for response
        // if we will get heartbeat then it will change the timestamp, which we are
        // checking below in case of tcp check failure..
        doCheckMember(mbr, false);
        // now, while waiting for a heartbeat, try connecting to the suspect's failure detection
        // port
        final boolean retryIfConnectFails = forceRemovalIfCheckFails;
        pinged = doTCPCheckMember(mbr, port, retryIfConnectFails);
      }

      if (!pinged && !isStopping) {
        failed = true;
        // a timestamp at or after startTime means the member was heard from during the check
        TimeStamp ts = memberTimeStamps.get(mbr);
        if (ts == null || ts.getTime() < startTime) {
          logger.info("Availability check failed for member {}", mbr);
          // if the final check fails & this VM is the coordinator we don't need to do another final
          // check
          if (forceRemovalIfCheckFails) {
            logger.info("Requesting removal of suspect member {}", mbr);
            services.getJoinLeave().remove(mbr, reason);
            // make sure it is still suspected
            memberSuspected(localAddress, mbr, reason);
          } else {
            // if this node can survive an availability check then initiate suspicion about
            // the node that failed the availability check
            if (doTCPCheckMember(localAddress, this.socketPort, false)) {
              // removed here so the local suspect processing below does not skip this member as
              // already being in a final check
              membersInFinalCheck.remove(mbr);
              // tell peers about this member and then perform another availability check
              memberSuspected(localAddress, mbr, reason);
              initiateSuspicion(mbr, reason);
              SuspectMembersMessage suspectMembersMessage =
                  new SuspectMembersMessage(Collections.singletonList(localAddress),
                      Collections
                          .singletonList(new SuspectRequest(mbr, "failed availability check")));
              suspectMembersMessage.setSender(localAddress);
              logger.debug("Performing local processing on suspect request");
              processMessage(suspectMembersMessage);
            } else {
              // the local member failed its own check, so the result for mbr is not trusted
              logger.info(
                  "Self-check for availability failed - will not continue to suspect {} for now",
                  mbr);
              failed = false;
            }
          }
        } else {
          logger.info(
              "Availability check failed but detected recent message traffic for suspect member "
                  + mbr);
          failed = false;
        }
      }

      if (!failed) {
        if (!isStopping && !initiator.equals(localAddress)
            && initiator.getVersionObject().compareTo(Version.GEODE_1_3_0) >= 0) {
          // let the sender know that it's okay to monitor this member again
          FinalCheckPassedMessage message = new FinalCheckPassedMessage(initiator, mbr);
          services.getMessenger().send(message);
        }

        logger.info("Availability check passed for suspect member " + mbr);
      }
    } finally {
      if (!failed) {
        // the member is not considered failed: clear the suspicion and recompute the neighbor
        memberUnsuspected(mbr);
        setNextNeighbor(currentView, null);
      }
      // always release the in-progress marker, whatever the outcome
      membersInFinalCheck.remove(mbr);
    }
    return !failed;
  }

  /**
   * Callback for a member shutting down; this implementation performs no work.
   *
   * @param mbr the member that is shutting down (unused)
   * @param reason the reason for the shutdown (unused)
   */
  @Override
  public void memberShutdown(DistributedMember mbr, String reason) {
    // intentionally empty
  }

  /**
   * Returns the port stored in {@code socketPort}.
   *
   * @return the value of {@code socketPort}
   */
  @Override
  public int getFailureDetectionPort() {
    return socketPort;
  }

  /**
   * Sends a {@code SuspectMembersMessage} carrying the given requests and then processes the same
   * message locally.
   * <p>
   * When the current view has more than {@code ServiceConfig.SMALL_CLUSTER_SIZE} members, the
   * recipients are taken from {@code getPreferredCoordinators}, passing a filter set made of the
   * already-suspected members plus the suspects named in {@code requests}. Otherwise the message
   * goes to every member of the view. If sending throws a {@code CancelException}, the method
   * returns without local processing.
   *
   * @param requests the suspect requests to distribute
   */
  private void sendSuspectRequest(final List<SuspectRequest> requests) {
    logger.debug("Sending suspect request for members {}", requests);

    final List<InternalDistributedMember> recipients;
    if (currentView.size() <= ServiceConfig.SMALL_CLUSTER_SIZE) {
      recipients = currentView.getMembers();
    } else {
      final HashSet<InternalDistributedMember> filter = new HashSet<>();
      filter.addAll(Collections.list(suspectedMemberIds.keys()));
      for (SuspectRequest request : requests) {
        filter.add(request.getSuspectMember());
      }
      recipients =
          currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(),
              ServiceConfig.SMALL_CLUSTER_SIZE + 1);
    }

    logger.trace("Sending suspect messages to {}", recipients);
    final SuspectMembersMessage smm = new SuspectMembersMessage(recipients, requests);
    smm.setSender(localAddress);

    Set<InternalDistributedMember> failedRecipients;
    try {
      failedRecipients = services.getMessenger().send(smm);
      stats.incSuspectsSent();
    } catch (CancelException e) {
      return;
    }

    if (failedRecipients != null && !failedRecipients.isEmpty()) {
      logger.trace("Unable to send suspect message to {}", failedRecipients);
    }

    logger.trace("Processing suspect message locally");
    processMessage(smm);
  }

  /**
   * A timer task that closes a socket if a connection attempt takes too long. The task schedules
   * itself on the supplied timer in {@code beforeConnect} and cancels itself in
   * {@code afterConnect}; if it runs before being cancelled it closes the socket it was given.
   */
  private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher {

    /** timer on which this task schedules itself */
    final Timer scheduler;
    /** socket recorded by {@code beforeConnect}; {@code null} until then */
    Socket socket;
    /** delay passed to {@link Timer#schedule(TimerTask, long)}, i.e. milliseconds */
    final long timeout;

    ConnectTimeoutTask(Timer scheduler, long timeout) {
      this.timeout = timeout;
      this.scheduler = scheduler;
    }

    /** Remembers the socket and arms the timeout. */
    @Override
    public void beforeConnect(Socket socket) {
      this.socket = socket;
      this.scheduler.schedule(this, this.timeout);
    }

    /** Cancels this task; the socket argument is not used. */
    @Override
    public void afterConnect(Socket socket) {
      this.cancel();
    }

    /** Closes the recorded socket, if any, ignoring I/O errors from the close. */
    @Override
    public void run() {
      if (socket == null) {
        return;
      }
      try {
        socket.close();
      } catch (IOException e) {
        // ignored - nothing useful to do here
      }
    }

  }

  /**
   * Returns the statistics object obtained from the services container in {@code init}.
   *
   * @return the value of {@code stats}
   */
  public DMStats getStats() {
    return stats;
  }
}
