/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal.membership.gms.messenger;

import static org.apache.geode.distributed.internal.membership.gms.GMSUtil.replaceStrings;
import static org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage.ALL_RECIPIENTS;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.FIND_COORDINATOR_REQ;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.FIND_COORDINATOR_RESP;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.JOIN_REQUEST;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.JOIN_RESPONSE;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.logging.log4j.Logger;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.Event;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Message.Flag;
import org.jgroups.Message.TransientFlag;
import org.jgroups.Receiver;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.ViewId;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.UDP;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.NakAckHeader2;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Digest;
import org.jgroups.util.UUID;

import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.SystemConnectException;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.gms.GMSMemberData;
import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
import org.apache.geode.distributed.internal.membership.gms.GMSUtil;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.api.MemberData;
import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.gms.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.gms.api.MembershipStatistics;
import org.apache.geode.distributed.internal.membership.gms.interfaces.GMSMessage;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
import org.apache.geode.distributed.internal.membership.gms.messages.JoinRequestMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.JoinResponseMessage;
import org.apache.geode.internal.ClassPathLoader;
import org.apache.geode.internal.OSProcess;
import org.apache.geode.internal.cache.DistributedCacheOperation;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.serialization.BufferDataOutputStream;
import org.apache.geode.internal.serialization.StaticSerialization;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.VersionedDataInputStream;
import org.apache.geode.internal.tcp.MemberShunnedException;

@SuppressWarnings("StatementWithEmptyBody")
public class JGroupsMessenger implements Messenger {

  private static final Logger logger = Services.getLogger();

  /**
   * The location (in the product) of the non-mcast Jgroups config file.
   */
  private static final String DEFAULT_JGROUPS_TCP_CONFIG =
      "org/apache/geode/distributed/internal/membership/gms/messenger/jgroups-config.xml";

  /**
   * The location (in the product) of the mcast Jgroups config file.
   */
  private static final String JGROUPS_MCAST_CONFIG_FILE_NAME =
      "org/apache/geode/distributed/internal/membership/gms/messenger/jgroups-mcast.xml";

  /** JG magic numbers for types added to the JG ClassConfigurator */
  private static final short JGROUPS_TYPE_JGADDRESS = 2000;
  private static final short JGROUPS_PROTOCOL_TRANSPORT = 1000;

  @MutableForTesting
  public static boolean THROW_EXCEPTION_ON_START_HOOK;

  protected String jgStackConfig;

  JChannel myChannel;
  MemberIdentifier localAddress;
  JGAddress jgAddress;
  private Services services;

  /** handlers that receive certain classes of messages instead of the Manager */
  private final Map<Class, MessageHandler> handlers = new ConcurrentHashMap<>();

  private volatile GMSMembershipView view;

  protected final GMSPingPonger pingPonger = new GMSPingPonger();

  protected final AtomicLong pongsReceived = new AtomicLong(0);

  /** tracks multicast messages that have been scheduled for processing */
  protected final Map<MemberIdentifier, MessageTracker> scheduledMcastSeqnos = new HashMap<>();

  protected short nackack2HeaderId;

  /**
   * A set that contains addresses that we have logged JGroups IOExceptions for in the current
   * membership view and possibly initiated suspect processing. This reduces the amount of suspect
   * processing initiated by IOExceptions and the amount of exceptions logged
   */
  private final Set<Address> addressesWithIoExceptionsProcessed =
      Collections.synchronizedSet(new HashSet<Address>());

  static {
    // register classes that we've added to jgroups that are put on the wire
    // or need a header ID
    ClassConfigurator.add(JGROUPS_TYPE_JGADDRESS, JGAddress.class);
    ClassConfigurator.addProtocol(JGROUPS_PROTOCOL_TRANSPORT, Transport.class);
  }

  private GMSEncrypt encrypt;

  /**
   * Member identifiers already used, either in this JGroupsMessenger instance
   * or in a past one & retained through an auto-reconnect.
   */
  private Set<MemberIdentifier> usedMemberIdentifiers = new HashSet<>();

  /**
   * During reconnect a QuorumChecker holds the JGroups channel and responds to Ping
   * and Pong messages but also queues any messages it doesn't recognize. These need
   * to be delivered to handlers after membership services have been rebuilt.
   */
  private Queue<Message> queuedMessagesFromReconnect;

  /**
   * The JGroupsReceiver is handed messages by the JGroups Channel. It is responsible
   * for deserializating and dispatching those messages to the appropriate handler
   */
  private JGroupsReceiver jgroupsReceiver;

  public static void setChannelReceiver(JChannel channel, Receiver r) {
    try {
      // Channel.setReceiver() will issue a warning if we try to set a new receiver
      // and the channel already has one. Rather than set the receiver to null &
      // then establish a new one we use reflection to set the channel receiver. See GEODE-7220
      Field receiver = Channel.class.getDeclaredField("receiver");
      receiver.setAccessible(true);
      receiver.set(channel, r);
    } catch (NoSuchFieldException | IllegalAccessException e) {
      throw new InternalGemFireException("unable to establish a JGroups receiver", e);
    }
  }

  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
  public void init(Services s) {
    this.services = s;

    MembershipConfig config = services.getConfig();


    boolean enableNetworkPartitionDetection = config.getEnableNetworkPartitionDetection();
    System.setProperty("jgroups.resolve_dns", String.valueOf(!enableNetworkPartitionDetection));

    InputStream is;

    String r;
    if (config.isMcastEnabled()) {
      r = JGROUPS_MCAST_CONFIG_FILE_NAME;
    } else {
      r = DEFAULT_JGROUPS_TCP_CONFIG;
    }
    is = ClassPathLoader.getLatest().getResourceAsStream(getClass(), r);
    if (is == null) {
      throw new GemFireConfigException(
          String.format("Cannot find %s", r));
    }

    String properties;
    try {
      StringBuilder sb = new StringBuilder(3000);
      BufferedReader br;
      br = new BufferedReader(new InputStreamReader(is, "US-ASCII"));
      String input;
      while ((input = br.readLine()) != null) {
        sb.append(input);
      }
      br.close();
      properties = sb.toString();
    } catch (Exception ex) {
      throw new GemFireConfigException(
          "An Exception was thrown while reading JGroups config.",
          ex);
    }

    if (properties.startsWith("<!--")) {
      int commentEnd = properties.indexOf("-->");
      properties = properties.substring(commentEnd + 3);
    }


    if (config.isMcastEnabled()) {
      properties = replaceStrings(properties, "MCAST_PORT",
          String.valueOf(config.getMcastPort()));
      properties =
          replaceStrings(properties, "MCAST_ADDRESS", config.getMcastAddress().getHostAddress());
      properties = replaceStrings(properties, "MCAST_TTL", String.valueOf(config.getMcastTtl()));
      properties = replaceStrings(properties, "MCAST_SEND_BUFFER_SIZE",
          String.valueOf(config.getMcastSendBufferSize()));
      properties = replaceStrings(properties, "MCAST_RECV_BUFFER_SIZE",
          String.valueOf(config.getMcastRecvBufferSize()));
      properties = replaceStrings(properties, "MCAST_RETRANSMIT_INTERVAL", "" + Integer
          .getInteger(DistributionConfig.GEMFIRE_PREFIX + "mcast-retransmit-interval", 500));
      properties = replaceStrings(properties, "RETRANSMIT_LIMIT",
          String.valueOf(config.getUdpFragmentSize() - 256));
    }

    if (config.isMcastEnabled() || config.isTcpDisabled()
        || (config.getUdpRecvBufferSize() != DistributionConfig.DEFAULT_UDP_RECV_BUFFER_SIZE)) {
      properties =
          replaceStrings(properties, "UDP_RECV_BUFFER_SIZE", "" + config.getUdpRecvBufferSize());
    } else {
      properties = replaceStrings(properties, "UDP_RECV_BUFFER_SIZE",
          "" + DistributionConfig.DEFAULT_UDP_RECV_BUFFER_SIZE_REDUCED);
    }
    properties =
        replaceStrings(properties, "UDP_SEND_BUFFER_SIZE", "" + config.getUdpSendBufferSize());

    String str = config.getBindAddress();
    // JGroups UDP protocol requires a bind address
    if (str == null || str.length() == 0) {
      try {
        str = SocketCreator.getLocalHost().getHostAddress();
      } catch (UnknownHostException e) {
        throw new GemFireConfigException(e.getMessage(), e);
      }
    }
    properties = replaceStrings(properties, "BIND_ADDR_SETTING", "bind_addr=\"" + str + "\"");

    int port = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "jg-bind-port", 0);
    if (port != 0) {
      properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE_START", "" + port);
      properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE", "" + 0);
    } else {
      int[] ports = config.getMembershipPortRange();
      properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE_START", "" + ports[0]);
      properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE", "" + (ports[1] - ports[0]));
    }

    properties = replaceStrings(properties, "UDP_FRAGMENT_SIZE", "" + config.getUdpFragmentSize());

    properties = replaceStrings(properties, "FC_MAX_CREDITS",
        "" + config.getMcastByteAllowance());
    properties = replaceStrings(properties, "FC_THRESHOLD",
        "" + config.getMcastRechargeThreshold());
    properties = replaceStrings(properties, "FC_MAX_BLOCK",
        "" + config.getMcastRechargeBlockMs());

    this.jgStackConfig = properties;

    if (!config.getSecurityUDPDHAlgo().isEmpty()) {
      try {
        this.encrypt = new GMSEncrypt(services, config.getSecurityUDPDHAlgo());
        logger.info("Initializing GMSEncrypt ");
      } catch (Exception e) {
        throw new GemFireConfigException("problem initializing encryption protocol", e);
      }
    }
  }

  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
  public void start() {
    // create the configuration XML string for JGroups
    String properties = this.jgStackConfig;

    long start = System.currentTimeMillis();

    // start the jgroups channel and establish the membership ID
    boolean reconnecting = false;
    try {
      Object oldDSMembershipInfo = services.getConfig().getOldDSMembershipInfo();
      if (oldDSMembershipInfo != null) {
        logger.debug("Reusing JGroups channel from previous system", properties);
        MembershipInformation oldInfo = (MembershipInformation) oldDSMembershipInfo;
        myChannel = oldInfo.getChannel();
        usedMemberIdentifiers = oldInfo.getMembershipIdentifiers();
        queuedMessagesFromReconnect = oldInfo.getQueuedMessages();

        // scrub the old channel
        ViewId vid = new ViewId(new JGAddress(), 0);
        List<Address> members = new ArrayList<>();
        members.add(new UUID(0, 0));// TODO open a JGroups JIRA for GEODE-3034
        View jgv = new View(vid, members);
        this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv));
        // attempt to establish a new UUID in the jgroups channel so the member address will be
        // different
        try {
          Method setAddressMethod = JChannel.class.getDeclaredMethod("setAddress");
          setAddressMethod.setAccessible(true);
          setAddressMethod.invoke(myChannel);
        } catch (SecurityException | NoSuchMethodException e) {
          logger.warn("Unable to establish a new JGroups address.  "
              + "My address will be exactly the same as last time. Exception={}",
              e.getMessage());
        }
        reconnecting = true;
      } else {
        logger.debug("JGroups configuration: {}", properties);

        checkForIPv6();
        InputStream is = new ByteArrayInputStream(properties.getBytes("UTF-8"));
        myChannel = new JChannel(is);
      }
    } catch (Exception e) {
      throw new GemFireConfigException("unable to create jgroups channel", e);
    }

    // give the stats to the jchannel statistics recorder
    StatRecorder sr = (StatRecorder) myChannel.getProtocolStack().findProtocol(StatRecorder.class);
    if (sr != null) {
      sr.setServices(services);
    }

    Transport transport = (Transport) myChannel.getProtocolStack().getTransport();
    transport.setMessenger(this);

    nackack2HeaderId = ClassConfigurator.getProtocolId(NAKACK2.class);

    try {
      jgroupsReceiver = new JGroupsReceiver();
      setChannelReceiver(myChannel, jgroupsReceiver);
      if (!reconnecting) {
        myChannel.connect("AG"); // Apache Geode
      }
    } catch (Exception e) {
      myChannel.close();
      throw new SystemConnectException("unable to create jgroups channel", e);
    }

    if (JGroupsMessenger.THROW_EXCEPTION_ON_START_HOOK) {
      JGroupsMessenger.THROW_EXCEPTION_ON_START_HOOK = false;
      throw new SystemConnectException("failing for test");
    }

    establishLocalAddress();

    logger.info("JGroups channel {} (took {}ms)", (reconnecting ? "reinitialized" : "created"),
        System.currentTimeMillis() - start);

  }

  @Override
  public boolean isOldMembershipIdentifier(MemberIdentifier id) {
    return usedMemberIdentifiers.contains(id);
  }

  /**
   * JGroups picks an IPv6 address if preferIPv4Stack is false or not set and preferIPv6Addresses is
   * not set or is true. We want it to use an IPv4 address for a dual-IP stack so that both IPv4 and
   * IPv6 messaging work
   */
  private void checkForIPv6() throws Exception {
    boolean preferIpV6Addr = Boolean.getBoolean("java.net.preferIPv6Addresses");
    if (!preferIpV6Addr) {
      logger.debug("forcing JGroups to think IPv4 is being used so it will choose an IPv4 address");
      Field m = org.jgroups.util.Util.class.getDeclaredField("ip_stack_type");
      m.setAccessible(true);
      m.set(null, org.jgroups.util.StackType.IPv4);
    }
  }

  @Override
  public void started() {
    if (queuedMessagesFromReconnect != null && !services.getConfig().isUDPSecurityEnabled()) {
      logger.info("Delivering {} messages queued by quorum checker",
          queuedMessagesFromReconnect.size());
      for (Message message : queuedMessagesFromReconnect) {
        jgroupsReceiver.receive(message, true);
      }
      queuedMessagesFromReconnect.clear();
      queuedMessagesFromReconnect = null;
    }
  }

  @Override
  public void stop() {
    if (localAddress != null && localAddress.getVmViewId() >= 0) {
      // keep track of old addresses that were used to successfully join the cluster
      usedMemberIdentifiers.add(localAddress);
    }
    if (this.myChannel != null) {
      if ((services.isShutdownDueToForcedDisconnect() && services.isAutoReconnectEnabled())
          || services.getManager().isReconnectingDS()) {
        // leave the channel open for reconnect attempts
      } else {
        this.myChannel.close();
      }
    }
  }

  @Override
  public void stopped() {}

  @Override
  public void memberSuspected(MemberIdentifier initiator,
      MemberIdentifier suspect, String reason) {}

  @Override
  public void installView(GMSMembershipView v) {
    this.view = v;

    if (this.jgAddress.getVmViewId() < 0) {
      this.jgAddress.setVmViewId(this.localAddress.getVmViewId());
    }
    List<JGAddress> mbrs = v.getMembers().stream().map(JGAddress::new).collect(Collectors.toList());
    ViewId vid = new ViewId(new JGAddress(v.getCoordinator()), v.getViewId());
    View jgv = new View(vid, new ArrayList<>(mbrs));
    logger.trace("installing view into JGroups stack: {}", jgv);
    this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv));

    addressesWithIoExceptionsProcessed.clear();
    if (encrypt != null) {
      encrypt.installView(v);
    }
    synchronized (scheduledMcastSeqnos) {
      for (MemberIdentifier mbr : v.getCrashedMembers()) {
        scheduledMcastSeqnos.remove(mbr);
      }
      for (MemberIdentifier mbr : v.getShutdownMembers()) {
        scheduledMcastSeqnos.remove(mbr);
      }
    }
  }


  /**
   * If JGroups is unable to send a message it may mean that the network is down. If so we need to
   * initiate suspect processing on the recipient.
   * <p>
   * see Transport._send()
   */
  @SuppressWarnings("UnusedParameters")
  public void handleJGroupsIOException(IOException e, Address dest) {
    if (services.getManager().shutdownInProgress()) { // GEODE-634 - don't log IOExceptions during
                                                      // shutdown
      return;
    }
    GMSMembershipView v = this.view;
    JGAddress jgMbr = (JGAddress) dest;
    if (jgMbr != null && v != null) {
      List<MemberIdentifier> members = v.getMembers();
      MemberIdentifier recipient = null;
      for (MemberIdentifier gmsMbr : members) {
        MemberData memberData = gmsMbr.getMemberData();
        if (jgMbr.getUUIDLsbs() == memberData.getUuidLeastSignificantBits()
            && jgMbr.getUUIDMsbs() == memberData.getUuidMostSignificantBits()
            && jgMbr.getVmViewId() == memberData.getVmViewId()) {
          recipient = gmsMbr;
          break;
        }
      }
      if (recipient != null) {
        if (!addressesWithIoExceptionsProcessed.contains(dest)) {
          logger.warn("Unable to send message to " + recipient, e);
          addressesWithIoExceptionsProcessed.add(dest);
        }
        // If communications aren't working we need to resolve the issue quickly, so here
        // we initiate a final check. Prior to becoming open-source we did a similar check
        // using JGroups VERIFY_SUSPECT
        services.getHealthMonitor().checkIfAvailable(recipient,
            "Unable to send messages to this member via JGroups", true);
      }
    }
  }

  private void establishLocalAddress() {
    UUID logicalAddress = (UUID) myChannel.getAddress();
    logicalAddress = logicalAddress.copy();

    IpAddress ipaddr = (IpAddress) myChannel.down(new Event(Event.GET_PHYSICAL_ADDRESS));

    if (ipaddr != null) {
      this.jgAddress = new JGAddress(logicalAddress, ipaddr);
    } else {
      UDP udp = (UDP) myChannel.getProtocolStack().getTransport();

      try {
        Method getAddress = UDP.class.getDeclaredMethod("getPhysicalAddress");
        getAddress.setAccessible(true);
        ipaddr = (IpAddress) getAddress.invoke(udp, new Object[0]);
        this.jgAddress = new JGAddress(logicalAddress, ipaddr);
      } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
        throw new InternalGemFireError(
            "Unable to configure JGroups channel for membership communications", e);
      }
    }

    // install the address in the JGroups channel protocols
    myChannel.down(new Event(Event.SET_LOCAL_ADDRESS, this.jgAddress));

    MembershipConfig config = services.getConfig();
    boolean isLocator = (config
        .getVmKind() == MemberIdentifier.LOCATOR_DM_TYPE)
        || !config.getStartLocator().isEmpty();

    // establish the DistributedSystem's address
    String hostname =
        SocketCreator.resolve_dns ? SocketCreator.getHostName(jgAddress.getInetAddress())
            : jgAddress.getInetAddress().getHostAddress();
    GMSMemberData gmsMember = new GMSMemberData(jgAddress.getInetAddress(),
        hostname, jgAddress.getPort(),
        OSProcess.getId(), (byte) services.getConfig().getVmKind(),
        -1 /* directport */, -1 /* viewID */, config.getName(),
        GMSUtil.parseGroups(config.getRoles(), config.getGroups()), config.getDurableClientId(),
        config.getDurableClientTimeout(),
        config.getEnableNetworkPartitionDetection(), isLocator,
        Version.getCurrentVersion().ordinal(),
        jgAddress.getUUIDMsbs(), jgAddress.getUUIDLsbs(),
        (byte) (services.getConfig().getMemberWeight() & 0xff));
    localAddress = services.getMemberFactory().create(gmsMember);
    logger.info("Established local address {}", localAddress);
    services.setLocalAddress(localAddress);
  }

  @Override
  public void beSick() {}

  @Override
  public void playDead() {}

  @Override
  public void beHealthy() {}

  @Override
  public <T> void addHandler(Class<T> c, MessageHandler<T> h) {
    handlers.put(c, h);
  }

  @Override
  public boolean testMulticast(long timeout) throws InterruptedException {
    long pongsSnapshot = pongsReceived.longValue();
    JGAddress dest = null;
    try {
      // noinspection ConstantConditions
      pingPonger.sendPingMessage(myChannel, jgAddress, dest);
    } catch (Exception e) {
      logger.warn("unable to send multicast message: {}",
          (jgAddress == null ? "multicast recipients" : jgAddress), e.getMessage());
      return false;
    }
    long giveupTime = System.currentTimeMillis() + timeout;
    while (pongsReceived.longValue() == pongsSnapshot && System.currentTimeMillis() < giveupTime) {
      Thread.sleep(100);
    }
    return pongsReceived.longValue() > pongsSnapshot;
  }

  @Override
  public void getMessageState(MemberIdentifier target, Map<String, Long> state,
      boolean includeMulticast) {
    if (includeMulticast) {
      NAKACK2 nakack = (NAKACK2) myChannel.getProtocolStack().findProtocol("NAKACK2");
      if (nakack != null) {
        long seqno = nakack.getCurrentSeqno();
        state.put("JGroups.mcastState", seqno);
      }
    }
  }

  @Override
  public void waitForMessageState(MemberIdentifier sender, Map<String, Long> state)
      throws InterruptedException {
    Long seqno = state.get("JGroups.mcastState");
    if (seqno == null) {
      return;
    }
    long timeout = services.getConfig().getAckWaitThreshold() * 1000L;
    long startTime = System.currentTimeMillis();
    long warnTime = startTime + timeout;
    long quitTime = warnTime + timeout - 1000L;
    boolean warned = false;

    for (;;) {
      String received = "none";
      long highSeqno = 0;
      synchronized (scheduledMcastSeqnos) {
        MessageTracker tracker = scheduledMcastSeqnos.get(sender);
        if (tracker == null) { // no longer in the membership view
          break;
        }
        highSeqno = tracker.get();
      }

      if (logger.isDebugEnabled()) {
        logger.debug(
            "waiting for multicast messages from {}.  Current seqno={} and expected seqno={}",
            sender, highSeqno, seqno);
      }
      if (highSeqno >= seqno) {
        break;
      }
      long now = System.currentTimeMillis();
      if (!warned && now >= warnTime) {
        warned = true;
        received = String.valueOf(highSeqno);
        logger.warn(
            "{} seconds have elapsed while waiting for multicast messages from {}.  Received {} but expecting at least {}.",
            Long.toString((warnTime - startTime) / 1000L), sender, received, seqno);
      }
      if (now >= quitTime) {
        throw new GemFireIOException("Multicast operations from " + sender
            + " did not distribute within " + (now - startTime) + " milliseconds");
      }
      Thread.sleep(50);
    }
  }

  @Override
  public Set<MemberIdentifier> sendUnreliably(GMSMessage msg) {
    return send(msg, false);
  }

  @Override
  public Set<MemberIdentifier> send(GMSMessage msg) {
    return send(msg, true);
  }

  private Set<MemberIdentifier> send(GMSMessage msg, boolean reliably) {

    // perform the same jgroups messaging as in 8.2's GMSMembershipManager.send() method

    // BUT: when marshalling messages we need to include the version of the product and
    // localAddress at the beginning of the message. These should be used in the receiver
    // code to create a versioned input stream, read the sender address, then read the message
    // and set its sender address
    MembershipStatistics theStats = services.getStatistics();
    GMSMembershipView oldView = this.view;

    if (!myChannel.isConnected()) {
      logger.info("JGroupsMessenger channel is closed - messaging is not possible");
      throw new DistributedSystemDisconnectedException("Distributed System is shutting down");
    }

    filterOutgoingMessage(msg);

    List<MemberIdentifier> destinations = msg.getRecipients();
    boolean allDestinations = msg.forAll();

    boolean useMcast = false;
    if (services.getConfig().isMcastEnabled()) {
      if (msg.getMulticast() || allDestinations) {
        useMcast = services.getManager().isMulticastAllowed();
      }
    }

    if (logger.isDebugEnabled() && reliably) {
      String recips = useMcast ? "multicast" : destinations.toString();
      logger.debug("sending via JGroups: [{}] recipients: {}", msg, recips);
    }

    JGAddress local = this.jgAddress;

    if (useMcast) {

      long startSer = theStats.startMsgSerialization();
      Message jmsg =
          createJGMessage(msg, local, null, Version.getCurrentVersion().ordinal());
      theStats.endMsgSerialization(startSer);

      Exception problem;
      try {
        jmsg.setTransientFlag(TransientFlag.DONT_LOOPBACK);
        if (!reliably) {
          jmsg.setFlag(Message.Flag.NO_RELIABILITY);
        }
        theStats.incSentBytes(jmsg.getLength());
        logger.trace("Sending JGroups message: {}", jmsg);
        myChannel.send(jmsg);
      } catch (Exception e) {
        logger.debug("caught unexpected exception", e);
        Throwable cause = e.getCause();
        if (cause instanceof ForcedDisconnectException) {
          problem = (Exception) cause;
        } else {
          problem = e;
        }
        if (services.getShutdownCause() != null) {
          Throwable shutdownCause = services.getShutdownCause();
          // If ForcedDisconnectException occurred then report it as actual
          // problem.
          if (shutdownCause instanceof ForcedDisconnectException) {
            problem = (Exception) shutdownCause;
          } else {
            Throwable ne = problem;
            while (ne.getCause() != null) {
              ne = ne.getCause();
            }
            ne.initCause(services.getShutdownCause());
          }
        }
        final String channelClosed =
            "Channel closed";
        throw new DistributedSystemDisconnectedException(channelClosed, problem);
      }
    } // useMcast
    else { // ! useMcast
      int len = destinations.size();
      List<MemberIdentifier> calculatedMembers; // explicit list of members
      int calculatedLen; // == calculatedMembers.len
      if (len == 1 && destinations.get(0) == ALL_RECIPIENTS) { // send to all
        // Grab a copy of the current membership
        GMSMembershipView v = services.getJoinLeave().getView();

        // Construct the list
        calculatedLen = v.size();
        calculatedMembers = new LinkedList<MemberIdentifier>();
        for (int i = 0; i < calculatedLen; i++) {
          MemberIdentifier m = (MemberIdentifier) v.get(i);
          calculatedMembers.add((MemberIdentifier) m);
        }
      } // send to all
      else { // send to explicit list
        calculatedLen = len;
        calculatedMembers = new LinkedList<MemberIdentifier>();
        for (int i = 0; i < calculatedLen; i++) {
          calculatedMembers.add((MemberIdentifier) destinations.get(i));
        }
      } // send to explicit list
      Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>();
      long startSer = theStats.startMsgSerialization();
      boolean firstMessage = true;
      for (MemberIdentifier mbr : calculatedMembers) {
        short version = mbr.getVersionOrdinal();
        if (!messages.containsKey(version)) {
          Message jmsg = createJGMessage(msg, local, mbr, version);
          messages.put(version, jmsg);
          if (firstMessage) {
            theStats.incSentBytes(jmsg.getLength());
            firstMessage = false;
          }
        }
      }
      theStats.endMsgSerialization(startSer);
      Collections.shuffle(calculatedMembers);
      int i = 0;
      for (MemberIdentifier mbr : calculatedMembers) {
        JGAddress to = new JGAddress(mbr);
        short version = mbr.getVersionOrdinal();
        Message jmsg = messages.get(version);
        Exception problem = null;
        try {
          Message tmp = (i < (calculatedLen - 1)) ? jmsg.copy(true) : jmsg;
          if (!reliably) {
            jmsg.setFlag(Message.Flag.NO_RELIABILITY);
          }
          tmp.setDest(to);
          tmp.setSrc(this.jgAddress);
          logger.trace("Unicasting to {}", to);
          myChannel.send(tmp);
        } catch (Exception e) {
          problem = e;
        }
        if (problem != null) {
          Throwable cause = services.getShutdownCause();
          if (cause != null) {
            // If ForcedDisconnectException occurred then report it as actual
            // problem.
            if (cause instanceof ForcedDisconnectException) {
              problem = (Exception) cause;
            } else {
              Throwable ne = problem;
              while (ne.getCause() != null) {
                ne = ne.getCause();
              }
              ne.initCause(cause);
            }
          }
          final String channelClosed =
              "Channel closed";
          throw new DistributedSystemDisconnectedException(channelClosed, problem);
        }
      } // send individually
    } // !useMcast

    // The contract is that every destination enumerated in the
    // message should have received the message. If one left
    // (i.e., left the view), we signal it here.
    if (msg.forAll()) {
      return Collections.emptySet();
    }
    Set<MemberIdentifier> result = new HashSet<>();
    GMSMembershipView newView = this.view;
    if (newView != null && newView != oldView) {
      for (MemberIdentifier d : destinations) {
        if (!newView.contains(d)) {
          logger.debug("messenger: member has left the view: {}  view is now {}", d, newView);
          result.add(d);
        }
      }
    }
    return result;
  }

  /**
   * This is the constructor to use to create a JGroups message holding a GemFire
   * DistributionMessage. It sets the appropriate flags in the Message and properly serializes the
   * DistributionMessage for the recipient's product version
   *
   * @param gfmsg the DistributionMessage
   * @param src the sender address
   * @param version the version of the recipient
   * @return the new message
   */
  Message createJGMessage(GMSMessage gfmsg, JGAddress src, MemberIdentifier dst, short version) {
    gfmsg.registerProcessor();
    Message msg = new Message();
    msg.setDest(null);
    msg.setSrc(src);
    setMessageFlags(gfmsg, msg);
    try {
      long start = services.getStatistics().startMsgSerialization();
      BufferDataOutputStream out_stream =
          new BufferDataOutputStream(Version.fromOrdinalNoThrow((short) version, false));
      Version.writeOrdinal(out_stream,
          Version.getCurrentVersion().ordinal(), true);
      if (encrypt != null) {
        out_stream.writeBoolean(true);
        writeEncryptedMessage(gfmsg, dst, version, out_stream);
      } else {
        out_stream.writeBoolean(false);
        serializeMessage(gfmsg, out_stream);
      }

      msg.setBuffer(out_stream.toByteArray());
      services.getStatistics().endMsgSerialization(start);
    } catch (IOException | GemFireIOException ex) {
      logger.warn("Error serializing message", ex);
      if (ex instanceof GemFireIOException) {
        throw (GemFireIOException) ex;
      } else {
        GemFireIOException ioe = new GemFireIOException("Error serializing message");
        ioe.initCause(ex);
        throw ioe;
      }
    } catch (Exception ex) {
      logger.warn("Error serializing message", ex);
      GemFireIOException ioe = new GemFireIOException("Error serializing message");
      ioe.initCause(ex.getCause());
      throw ioe;
    }
    return msg;
  }

  void writeEncryptedMessage(GMSMessage gfmsg, MemberIdentifier recipient, short version,
      BufferDataOutputStream out)
      throws Exception {
    long start = services.getStatistics().startUDPMsgEncryption();
    try {
      services.getSerializer().writeDSFIDHeader(gfmsg.getDSFID(), out);
      byte[] pk = null;
      int requestId = 0;
      MemberIdentifier pkMbr = null;
      switch (gfmsg.getDSFID()) {
        case FIND_COORDINATOR_REQ:
        case JOIN_REQUEST:
          // need to append mine PK
          pk = encrypt.getPublicKey(localAddress);
          pkMbr = recipient;
          requestId = getRequestId(gfmsg, pkMbr, true);
          break;
        case FIND_COORDINATOR_RESP:
        case JOIN_RESPONSE:
          pkMbr = recipient;
          requestId = getRequestId(gfmsg, pkMbr, false);
        default:
          break;
      }
      if (logger.isDebugEnabled()) {
        logger.debug("writeEncryptedMessage gfmsg.getDSFID() = {} for {} with requestid {}",
            gfmsg.getDSFID(), pkMbr, requestId);
      }
      out.writeInt(requestId);
      if (pk != null) {
        StaticSerialization.writeByteArray(pk, out);
      }

      BufferDataOutputStream out_stream =
          new BufferDataOutputStream(Version.fromOrdinalNoThrow((short) version, false));
      byte[] messageBytes = serializeMessage(gfmsg, out_stream);

      if (pkMbr != null) {
        // using members private key
        messageBytes = encrypt.encryptData(messageBytes, pkMbr);
      } else {
        // using cluster secret key
        messageBytes = encrypt.encryptData(messageBytes);
      }
      StaticSerialization.writeByteArray(messageBytes, out);
    } finally {
      services.getStatistics().endUDPMsgEncryption(start);
    }
  }

  int getRequestId(GMSMessage gfmsg, MemberIdentifier destination, boolean add) {
    int requestId = 0;
    if (gfmsg instanceof FindCoordinatorRequest) {
      requestId = ((FindCoordinatorRequest) gfmsg).getRequestId();
    } else if (gfmsg instanceof JoinRequestMessage) {
      requestId = ((JoinRequestMessage) gfmsg).getRequestId();
    } else if (gfmsg instanceof FindCoordinatorResponse) {
      requestId = ((FindCoordinatorResponse) gfmsg).getRequestId();
    } else if (gfmsg instanceof JoinResponseMessage) {
      requestId = ((JoinResponseMessage) gfmsg).getRequestId();
    }

    if (add) {
      addRequestId(requestId, destination);
    }

    return requestId;
  }

  byte[] serializeMessage(GMSMessage gfmsg, BufferDataOutputStream out_stream)
      throws IOException {
    MemberIdentifier m = this.localAddress;
    m.getMemberData().writeEssentialData(out_stream,
        services.getSerializer().createSerializationContext(out_stream));
    services.getSerializer().getObjectSerializer()
        .writeObject(services.getManager().unwrapMessage(gfmsg), out_stream);

    return out_stream.toByteArray();
  }

  void setMessageFlags(GMSMessage gfmsg, Message msg) {
    // Bundling is mostly only useful if we're doing no-ack work,
    // which is fairly rare
    msg.setFlag(Flag.DONT_BUNDLE);

    if (gfmsg.isHighPriority() || AlertingAction.isThreadAlerting()) {
      msg.setFlag(Flag.OOB);
      msg.setFlag(Flag.NO_FC);
      msg.setFlag(Flag.SKIP_BARRIER);
    }

    msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
  }


  /**
   * deserialize a jgroups payload. If it's a DistributionMessage find the ID of the sender and
   * establish it as the message's sender
   */
  Object readJGMessage(Message jgmsg) {
    Object result = null;

    int messageLength = jgmsg.getLength();

    if (logger.isTraceEnabled()) {
      logger.trace("deserializing a message of length " + messageLength);
    }

    if (messageLength == 0) {
      // jgroups messages with no payload are used for protocol interchange, such
      // as STABLE_GOSSIP
      logger.trace("message length is zero - ignoring");
      return null;
    }

    Exception problem = null;
    byte[] buf = jgmsg.getRawBuffer();
    try {
      long start = services.getStatistics().startMsgDeserialization();

      DataInputStream dis =
          new DataInputStream(new ByteArrayInputStream(buf, jgmsg.getOffset(), jgmsg.getLength()));

      short ordinal = Version.readOrdinal(dis);

      if (ordinal < Version.getCurrentVersion().ordinal()) {
        dis = new VersionedDataInputStream(dis,
            Version.fromOrdinalNoThrow((short) ordinal, false));
      }

      // read
      boolean isEncrypted = dis.readBoolean();

      if (isEncrypted && encrypt == null) {
        throw new GemFireConfigException("Got remote message as encrypted");
      }

      if (isEncrypted) {
        result = readEncryptedMessage(dis, ordinal, encrypt);
      } else {
        result = deserializeMessage(dis, ordinal);
      }


      services.getStatistics().endMsgDeserialization(start);
    } catch (ClassNotFoundException | IOException | RuntimeException e) {
      problem = e;
    } catch (Exception e) {
      problem = e;
    }
    if (problem != null) {
      logger.error(String.format("Exception deserializing message payload: %s", jgmsg),
          problem);
      return null;
    }

    return result;
  }

  void setSender(GMSMessage dm, MemberIdentifier m, short ordinal) {
    MemberIdentifier sender = null;
    // JoinRequestMessages are sent with an ID that may have been
    // reused from a previous life by way of auto-reconnect,
    // so we don't want to find a canonical reference for the
    // request's sender ID
    if (dm.getDSFID() == JOIN_REQUEST) {
      sender = ((JoinRequestMessage) dm).getMemberID();
    } else {
      sender = getMemberFromView(m, ordinal);
    }
    dm.setSender(sender);
  }

  @SuppressWarnings("resource")
  GMSMessage readEncryptedMessage(DataInputStream dis, short ordinal,
      GMSEncrypt encryptLocal) throws Exception {
    int dfsid = services.getSerializer().readDSFIDHeader(dis);
    int requestId = dis.readInt();
    long start = services.getStatistics().startUDPMsgDecryption();
    try {
      if (logger.isDebugEnabled()) {
        logger.debug("readEncryptedMessage Reading Request id " + dfsid + " and requestid is "
            + requestId + " myid " + this.localAddress);
      }
      MemberIdentifier pkMbr = null;
      boolean readPK = false;
      switch (dfsid) {
        case FIND_COORDINATOR_REQ:
        case JOIN_REQUEST:
          readPK = true;
          break;
        case FIND_COORDINATOR_RESP:
        case JOIN_RESPONSE:
          // this will have requestId to know the PK
          pkMbr = getRequestedMember(requestId);
          break;
      }

      byte[] data;

      byte[] pk = null;

      if (readPK) {
        pk = StaticSerialization.readByteArray(dis);
        data = StaticSerialization.readByteArray(dis);
        // using prefixed pk from sender
        data = encryptLocal.decryptData(data, pk);
      } else {
        data = StaticSerialization.readByteArray(dis);
        // from cluster key
        if (pkMbr != null) {
          // using member public key
          data = encryptLocal.decryptData(data, pkMbr);
        } else {
          // from cluster key
          data = encryptLocal.decryptData(data);
        }
      }

      {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));

        if (ordinal < Version.getCurrentVersion().ordinal()) {
          in = new VersionedDataInputStream(in,
              Version.fromOrdinalNoThrow((short) ordinal, false));
        }

        GMSMessage result = deserializeMessage(in, ordinal);

        if (pk != null) {
          logger.info("Setting public key for " + result.getSender() + " len " + pk.length);
          setPublicKey(pk, (MemberIdentifier) result.getSender());
        }

        return result;
      }
    } catch (Exception e) {
      throw new Exception("Message id is " + dfsid, e);
    } finally {
      services.getStatistics().endUDPMsgDecryption(start);
    }

  }

  GMSMessage deserializeMessage(DataInputStream in, short ordinal)
      throws ClassNotFoundException, IOException {
    GMSMemberData info = new GMSMemberData();
    info.readEssentialData(in, services.getSerializer().createDeserializationContext(in));
    MemberIdentifier m = services.getMemberFactory().create(info);
    GMSMessage result = services.getManager()
        .wrapMessage(services.getSerializer().getObjectDeserializer().readObject(in));

    setSender(result, m, ordinal);

    return result;
  }

  /** look for certain messages that may need to be altered before being sent */
  void filterOutgoingMessage(GMSMessage m) {
    switch (m.getDSFID()) {
      case JOIN_RESPONSE:
        JoinResponseMessage jrsp = (JoinResponseMessage) m;

        if (jrsp.getRejectionMessage() == null
            && services.getConfig().isMulticastEnabled()) {
          // get the multicast message digest and pass it with the join response
          Digest digest = (Digest) this.myChannel.getProtocolStack().getTopProtocol()
              .down(Event.GET_DIGEST_EVT);
          BufferDataOutputStream hdos = new BufferDataOutputStream(500, Version.CURRENT);
          try {
            digest.writeTo(hdos);
          } catch (Exception e) {
            logger.fatal("Unable to serialize JGroups messaging digest", e);
          }
          jrsp.setMessengerData(hdos.toByteArray());
        }
        break;
      default:
        break;
    }
  }

  void filterIncomingMessage(GMSMessage m) {
    switch (m.getDSFID()) {
      case JOIN_RESPONSE:
        JoinResponseMessage jrsp = (JoinResponseMessage) m;

        if (jrsp.getRejectionMessage() == null
            && services.getConfig().isMulticastEnabled()) {
          byte[] serializedDigest = jrsp.getMessengerData();
          ByteArrayInputStream bis = new ByteArrayInputStream(serializedDigest);
          DataInputStream dis = new DataInputStream(bis);
          try {
            Digest digest = new Digest();
            digest.readFrom(dis);
            logger.trace("installing JGroups message digest {} from {}", digest, m);
            this.myChannel.getProtocolStack().getTopProtocol()
                .down(new Event(Event.MERGE_DIGEST, digest));
            jrsp.setMessengerData(null);
          } catch (Exception e) {
            logger.fatal("Unable to read JGroups messaging digest", e);
          }
        }
        break;
      default:
        break;
    }
  }

  @Override
  public MemberIdentifier getMemberID() {
    return localAddress;
  }

  /**
   * returns the member ID for the given GMSMember object
   */
  @SuppressWarnings("UnusedParameters")
  private MemberIdentifier getMemberFromView(MemberIdentifier jgId, short version) {
    return this.services.getJoinLeave().getMemberID(jgId);
  }


  @Override
  public void emergencyClose() {
    this.view = null;
    if (localAddress.getVmViewId() >= 0) {
      // keep track of old addresses that were used to successfully join the cluster
      usedMemberIdentifiers.add(localAddress);
    }
    if (this.myChannel != null) {
      if ((services.isShutdownDueToForcedDisconnect() && services.isAutoReconnectEnabled())
          || services.getManager().isReconnectingDS()) {
      } else {
        this.myChannel.disconnect();
      }
    }
  }

  @Override
  public GMSQuorumChecker getQuorumChecker() {
    GMSMembershipView view = this.view;
    if (view == null) {
      view = services.getJoinLeave().getView();
      if (view == null) {
        view = services.getJoinLeave().getPreviousView();
        if (view == null) {
          return null;
        }
      }
    }
    GMSQuorumChecker qc =
        new GMSQuorumChecker(view, services.getConfig().getLossThreshold(), this.myChannel,
            usedMemberIdentifiers);
    qc.initialize();
    return qc;
  }

  /**
   * JGroupsReceiver receives incoming JGroups messages and passes them to a handler. It may be
   * accessed through JChannel.getReceiver().
   */
  class JGroupsReceiver extends ReceiverAdapter {

    @Override
    public void receive(Message jgmsg) {
      receive(jgmsg, false);
    }

    private void receive(Message jgmsg, boolean fromQuorumChecker) {
      long startTime = services.getStatistics().startUDPDispatchRequest();
      try {
        if (services.getManager().shutdownInProgress()) {
          return;
        }

        if (logger.isTraceEnabled()) {
          logger.trace("JGroupsMessenger received {} headers: {}", jgmsg, jgmsg.getHeaders());
        }

        // Respond to ping messages sent from other systems that are in a auto reconnect state
        byte[] contents = jgmsg.getBuffer();
        if (contents == null) {
          return;
        }
        if (pingPonger.isPingMessage(contents)) {
          try {
            pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc());
          } catch (Exception e) {
            logger.info("Failed sending Pong response to " + jgmsg.getSrc());
          }
          return;
        } else if (pingPonger.isPongMessage(contents)) {
          pongsReceived.incrementAndGet();
          return;
        }

        Object o = readJGMessage(jgmsg);
        if (o == null) {
          return;
        }

        GMSMessage msg = services.getManager().wrapMessage(o);

        // admin-only VMs don't have caches, so we ignore cache operations
        // multicast to them, avoiding deserialization cost and classpath
        // problems
        if ((services.getConfig()
            .getVmKind() == MemberIdentifier.ADMIN_ONLY_DM_TYPE)
            && (msg instanceof DistributedCacheOperation.CacheOperationMessage)) {
          return;
        }

        msg.resetTimestamp();
        msg.setBytesRead(jgmsg.getLength());

        try {

          if (logger.isTraceEnabled()) {
            logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender());
          }
          filterIncomingMessage(msg);
          MessageHandler handler = getMessageHandler(msg);
          if (fromQuorumChecker && handler instanceof HealthMonitor) {
            // ignore suspect / heartbeat messages that happened during
            // auto-reconnect because they very likely have old member IDs in them
          } else {
            handler.processMessage(msg);
          }

          // record the scheduling of broadcast messages
          NakAckHeader2 header = (NakAckHeader2) jgmsg.getHeader(nackack2HeaderId);
          if (header != null && !jgmsg.isFlagSet(Flag.OOB)) {
            recordScheduledSeqno(msg.getSender(), header.getSeqno());
          }

        } catch (MemberShunnedException e) {
          // message from non-member - ignore
        }

      } finally {
        JGroupsMessenger.this.services.getStatistics().endUDPDispatchRequest(startTime);
      }
    }

    private void recordScheduledSeqno(MemberIdentifier member, long seqno) {
      synchronized (scheduledMcastSeqnos) {
        MessageTracker counter = scheduledMcastSeqnos.get(member);
        if (counter == null) {
          counter = new MessageTracker(seqno);
          scheduledMcastSeqnos.put(member, counter);
        }
        counter.record(seqno);
      }
    }

    /**
     * returns the handler that should process the given message. The default handler is the
     * membership manager
     */
    private MessageHandler getMessageHandler(GMSMessage msg) {
      Class<?> msgClazz = msg.getClass();
      MessageHandler h = handlers.get(msgClazz);
      if (h == null) {
        for (Class<?> clazz : handlers.keySet()) {
          if (clazz.isAssignableFrom(msgClazz)) {
            h = handlers.get(clazz);
            handlers.put(msg.getClass(), h);
            break;
          }
        }
      }
      if (h == null) {
        h = services.getManager();
      }
      return h;
    }
  }

  @Override
  public Set<MemberIdentifier> send(GMSMessage msg, GMSMembershipView alternateView) {
    if (this.encrypt != null) {
      this.encrypt.installView(alternateView);
    }
    return send(msg, true);
  }

  @Override
  public byte[] getPublicKey(MemberIdentifier mbr) {
    if (encrypt != null) {
      return encrypt.getPublicKey(mbr);
    }
    return null;
  }

  @Override
  public void setPublicKey(byte[] publickey, MemberIdentifier mbr) {
    if (encrypt != null) {
      logger.debug("Setting PK for member " + mbr);
      encrypt.setPublicKey(publickey, mbr);
    }
  }

  @Override
  public void setClusterSecretKey(byte[] clusterSecretKey) {
    if (encrypt != null) {
      logger.debug("Setting cluster key");
      encrypt.setClusterKey(clusterSecretKey);
    }
  }

  @Override
  public byte[] getClusterSecretKey() {
    if (encrypt != null) {
      return encrypt.getClusterSecretKey();
    }
    return null;
  }

  private AtomicInteger requestId = new AtomicInteger((new Random().nextInt()));
  private HashMap<Integer, MemberIdentifier> requestIdVsRecipients = new HashMap<>();

  MemberIdentifier getRequestedMember(int requestId) {
    return requestIdVsRecipients.remove(requestId);
  }

  void addRequestId(int requestId, MemberIdentifier mbr) {
    requestIdVsRecipients.put(requestId, mbr);
  }

  @Override
  public int getRequestId() {
    return requestId.incrementAndGet();
  }

  @Override
  public void initClusterKey() {
    if (encrypt != null) {
      try {
        logger.info("Initializing cluster key");
        encrypt.initClusterSecretKey();
      } catch (Exception e) {
        throw new RuntimeException("unable to create cluster key ", e);
      }
    }
  }

  static class MessageTracker {
    long highestSeqno;

    MessageTracker(long seqno) {
      highestSeqno = seqno;
    }

    long get() {
      return highestSeqno;
    }

    void record(long seqno) {
      if (seqno > highestSeqno) {
        highestSeqno = seqno;
      }
    }
  }
}
