/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal.membership.gms.messenger;

import static org.apache.geode.distributed.ConfigurationProperties.ACK_WAIT_THRESHOLD;
import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_TCP;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_TTL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.commons.lang3.SerializationException;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.util.Digest;
import org.jgroups.util.UUID;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.SerialAckedMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.gms.GMSMember;
import org.apache.geode.distributed.internal.membership.gms.ServiceConfig;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
import org.apache.geode.distributed.internal.membership.gms.messages.InstallViewMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.JoinRequestMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.JoinResponseMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.LeaveRequestMessage;
import org.apache.geode.distributed.internal.membership.gms.messenger.JGroupsMessenger.JGroupsReceiver;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.admin.remote.RemoteTransportConfig;
import org.apache.geode.internal.alerting.AlertingAction;
import org.apache.geode.internal.cache.DistributedCacheOperation;
import org.apache.geode.internal.statistics.StatisticsRegistry;
import org.apache.geode.test.junit.categories.MembershipTest;

@Category({MembershipTest.class})
public class JGroupsMessengerJUnitTest {

  private static final String AES_128 = "AES:128";

  private Services services;
  private JGroupsMessenger messenger;
  private JoinLeave joinLeave;
  private Manager manager;
  private Stopper stopper;
  private HealthMonitor healthMonitor;
  private InterceptUDP interceptor;
  private long statsId = 123;

  private void initMocks(boolean enableMcast) throws Exception {
    initMocks(enableMcast, new Properties());
  }

  /**
   * Create stub and mock objects
   */
  private void initMocks(boolean enableMcast, Properties addProp) throws Exception {
    if (messenger != null) {
      messenger.stop();
      messenger = null;
    }
    Properties nonDefault = new Properties();
    nonDefault.put(DISABLE_TCP, "true");
    nonDefault.put(MCAST_PORT,
        enableMcast ? "" + AvailablePortHelper.getRandomAvailableUDPPort() : "0");
    nonDefault.put(MCAST_TTL, "0");
    nonDefault.put(LOG_FILE, "");
    nonDefault.put(LOG_LEVEL, "fine");
    nonDefault.put(LOCATORS, "localhost[10344]");
    nonDefault.put(ACK_WAIT_THRESHOLD, "1");
    nonDefault.putAll(addProp);
    DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
    RemoteTransportConfig tconfig =
        new RemoteTransportConfig(config, ClusterDistributionManager.NORMAL_DM_TYPE);

    stopper = mock(Stopper.class);
    when(stopper.isCancelInProgress()).thenReturn(false);

    manager = mock(Manager.class);
    when(manager.isMulticastAllowed()).thenReturn(enableMcast);

    healthMonitor = mock(HealthMonitor.class);

    joinLeave = mock(JoinLeave.class);

    ServiceConfig serviceConfig = new ServiceConfig(tconfig, config);

    services = mock(Services.class);
    when(services.getConfig()).thenReturn(serviceConfig);
    when(services.getCancelCriterion()).thenReturn(stopper);
    when(services.getHealthMonitor()).thenReturn(healthMonitor);
    when(services.getManager()).thenReturn(manager);
    when(services.getJoinLeave()).thenReturn(joinLeave);

    DistributionManager dm = mock(DistributionManager.class);
    InternalDistributedSystem system =
        new InternalDistributedSystem.BuilderForTesting(nonDefault)
            .setDistributionManager(dm)
            .setStatisticsManagerFactory(
                (name, startTime, statsDisabled) -> new StatisticsRegistry(name, startTime))
            .build();
    when(services.getStatistics()).thenReturn(new DistributionStats(system, statsId));

    messenger = new JGroupsMessenger();
    messenger.init(services);

    // if I do this earlier then test this return messenger as null
    when(services.getMessenger()).thenReturn(messenger);

    String jgroupsConfig = messenger.jgStackConfig;
    int startIdx = jgroupsConfig.indexOf("<org");
    int insertIdx = jgroupsConfig.indexOf('>', startIdx + 4) + 1;
    jgroupsConfig = jgroupsConfig.substring(0, insertIdx) + "<" + InterceptUDP.class.getName()
        + "/>" + jgroupsConfig.substring(insertIdx);
    messenger.jgStackConfig = jgroupsConfig;
    // System.out.println("jgroups config: " + jgroupsConfig);

    messenger.start();
    messenger.started();

    interceptor =
        (InterceptUDP) messenger.myChannel.getProtocolStack().getTransport().getUpProtocol();

  }

  @After
  public void stopMessenger() {
    if (messenger != null && messenger.myChannel != null) {
      messenger.stop();
    }
  }

  @Test
  public void ioExceptionInitiatesSuspectProcessing() throws Exception {
    // see GEODE-634
    initMocks(false);
    NetView v = createView();
    when(joinLeave.getView()).thenReturn(v);
    messenger.installView(v);
    messenger.handleJGroupsIOException(new IOException("je m'en fiche"),
        new JGAddress(v.getMembers().get(1)));
    verify(healthMonitor).suspect(isA(InternalDistributedMember.class), isA(String.class));
  }

  @Test
  public void ioExceptionDuringShutdownAvoidsSuspectProcessing() throws Exception {
    // see GEODE-634
    initMocks(false);
    NetView v = createView();
    when(joinLeave.getView()).thenReturn(v);
    when(manager.shutdownInProgress()).thenReturn(true);
    messenger.installView(v);
    messenger.handleJGroupsIOException(new IOException("fichez-moi le camp"),
        new JGAddress(v.getMembers().get(1)));
    verify(healthMonitor, never()).checkIfAvailable(isA(InternalDistributedMember.class),
        isA(String.class), isA(Boolean.class));
  }

  private NetView createView() {
    InternalDistributedMember sender = messenger.getMemberID();
    List<InternalDistributedMember> mbrs = new ArrayList<>();
    mbrs.add(sender);
    mbrs.add(createAddress(100));
    mbrs.add(createAddress(101));
    NetView v = new NetView(sender, 1, mbrs);
    return v;
  }

  @Test
  public void alertMessagesBypassFlowControl() throws Exception {
    initMocks(false);
    Message jgmsg = new Message();
    DistributionMessage dmsg = mock(DistributionMessage.class);
    when(dmsg.getProcessorType()).thenReturn(ClusterDistributionManager.SERIAL_EXECUTOR);
    messenger.setMessageFlags(dmsg, jgmsg);
    assertFalse("expected no_fc to not be set in " + jgmsg.getFlags(),
        jgmsg.isFlagSet(Message.Flag.NO_FC));
    AlertingAction.execute(() -> {
      messenger.setMessageFlags(dmsg, jgmsg);
      assertTrue("expected no_fc to be set in " + jgmsg.getFlags(),
          jgmsg.isFlagSet(Message.Flag.NO_FC));
    });
  }

  @Test
  public void testMemberWeightIsSerialized() throws Exception {
    HeapDataOutputStream out = new HeapDataOutputStream(500, Version.CURRENT);
    InternalDistributedMember mbr = createAddress(8888);
    ((GMSMember) mbr.getNetMember()).setMemberWeight((byte) 40);
    mbr.toData(out);
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
    mbr = new InternalDistributedMember();
    mbr.fromData(in);
    assertEquals(40, mbr.getNetMember().getMemberWeight());
  }

  @Test
  public void testSerializationError() throws Exception {
    for (int i = 0; i < 2; i++) {
      boolean enableMcast = (i == 1);
      initMocks(enableMcast);
      InternalDistributedMember mbr = createAddress(8888);
      DistributedCacheOperation.CacheOperationMessage msg =
          mock(DistributedCacheOperation.CacheOperationMessage.class);
      when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
      when(msg.getMulticast()).thenReturn(enableMcast);
      if (!enableMcast) {
        // for non-mcast we send a message with a reply-processor
        when(msg.getProcessorId()).thenReturn(1234);
      } else {
        // for mcast we send a direct-ack message and expect the messenger
        // to register it
        when(msg.isDirectAck()).thenReturn(true);
      }
      when(msg.getDSFID()).thenReturn((int) DataSerializableFixedID.PUT_ALL_MESSAGE);

      // for code coverage we need to test with both a SerializationException and
      // an IOException. The former is wrapped in a GemfireIOException while the
      // latter is not
      doThrow(new SerializationException()).when(msg).toData(any(DataOutput.class));
      try {
        messenger.send(msg);
        fail("expected a failure");
      } catch (GemFireIOException e) {
        // success
      }
      if (enableMcast) {
        verify(msg, atLeastOnce()).registerProcessor();
      }
      doThrow(new IOException()).when(msg).toData(any(DataOutput.class));
      try {
        messenger.send(msg);
        fail("expected a failure");
      } catch (GemFireIOException e) {
        // success
      }
    }
  }

  @Test
  public void testJChannelError() throws Exception {
    for (int i = 0; i < 2; i++) {
      boolean enableMcast = (i == 1);
      initMocks(enableMcast);
      JChannel mockChannel = mock(JChannel.class);
      when(mockChannel.isConnected()).thenReturn(true);
      doThrow(new RuntimeException()).when(mockChannel).send(any(Message.class));
      JChannel realChannel = messenger.myChannel;
      messenger.myChannel = mockChannel;
      try {
        InternalDistributedMember mbr = createAddress(8888);
        DistributedCacheOperation.CacheOperationMessage msg =
            mock(DistributedCacheOperation.CacheOperationMessage.class);
        when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
        when(msg.getMulticast()).thenReturn(enableMcast);
        when(msg.getProcessorId()).thenReturn(1234);
        when(msg.getDSFID()).thenReturn((int) DataSerializableFixedID.PUT_ALL_MESSAGE);
        try {
          messenger.send(msg);
          fail("expected a failure");
        } catch (DistributedSystemDisconnectedException e) {
          // success
        }
        verify(mockChannel).send(isA(Message.class));
      } finally {
        messenger.myChannel = realChannel;
      }
    }
  }

  @Test
  public void testJChannelErrorDuringDisconnect() throws Exception {
    for (int i = 0; i < 4; i++) {
      System.out.println("loop #" + i);
      boolean enableMcast = (i % 2 == 1);
      initMocks(enableMcast);
      JChannel mockChannel = mock(JChannel.class);
      when(mockChannel.isConnected()).thenReturn(true);
      Exception ex, shutdownCause;
      if (i < 2) {
        ex = new RuntimeException("");
        shutdownCause = new RuntimeException("shutdownCause");
      } else {
        shutdownCause = new ForcedDisconnectException("");
        ex = new RuntimeException("", shutdownCause);
      }
      doThrow(ex).when(mockChannel).send(any(Message.class));
      JChannel realChannel = messenger.myChannel;
      messenger.myChannel = mockChannel;

      when(services.getShutdownCause()).thenReturn(shutdownCause);

      try {
        InternalDistributedMember mbr = createAddress(8888);
        DistributedCacheOperation.CacheOperationMessage msg =
            mock(DistributedCacheOperation.CacheOperationMessage.class);
        when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
        when(msg.getMulticast()).thenReturn(enableMcast);
        when(msg.getProcessorId()).thenReturn(1234);
        when(msg.getDSFID()).thenReturn((int) DataSerializableFixedID.PUT_ALL_MESSAGE);
        try {
          messenger.send(msg);
          fail("expected a failure");
        } catch (DistributedSystemDisconnectedException e) {
          // the ultimate cause should be the shutdownCause returned
          // by Services.getShutdownCause()
          Throwable cause = e;
          while (cause.getCause() != null) {
            cause = cause.getCause();
          }
          assertTrue(cause != e);
          assertTrue(cause == shutdownCause);
        }
        verify(mockChannel).send(isA(Message.class));
      } finally {
        messenger.myChannel = realChannel;
      }
    }
  }

  @Test
  public void testSendWhenChannelIsClosed() throws Exception {
    for (int i = 0; i < 2; i++) {
      initMocks(false);
      JChannel mockChannel = mock(JChannel.class);
      when(mockChannel.isConnected()).thenReturn(false);
      doThrow(new RuntimeException()).when(mockChannel).send(any(Message.class));
      JChannel realChannel = messenger.myChannel;
      messenger.myChannel = mockChannel;
      try {
        InternalDistributedMember mbr = createAddress(8888);
        DistributedCacheOperation.CacheOperationMessage msg =
            mock(DistributedCacheOperation.CacheOperationMessage.class);
        when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
        when(msg.getMulticast()).thenReturn(false);
        when(msg.getProcessorId()).thenReturn(1234);
        try {
          messenger.send(msg);
          fail("expected a failure");
        } catch (DistributedSystemDisconnectedException e) {
          // success
        }
        verify(mockChannel, never()).send(isA(Message.class));
      } finally {
        messenger.myChannel = realChannel;
      }
    }
  }

  @Test
  public void testSendUnreliably() throws Exception {
    for (int i = 0; i < 2; i++) {
      boolean enableMcast = (i == 1);
      initMocks(enableMcast);
      InternalDistributedMember mbr = createAddress(8888);
      DistributedCacheOperation.CacheOperationMessage msg =
          mock(DistributedCacheOperation.CacheOperationMessage.class);
      when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
      when(msg.getMulticast()).thenReturn(enableMcast);
      if (!enableMcast) {
        // for non-mcast we send a message with a reply-processor
        when(msg.getProcessorId()).thenReturn(1234);
      } else {
        // for mcast we send a direct-ack message and expect the messenger
        // to register it
        when(msg.isDirectAck()).thenReturn(true);
      }
      when(msg.getDSFID()).thenReturn((int) DataSerializableFixedID.PUT_ALL_MESSAGE);
      interceptor.collectMessages = true;
      try {
        messenger.sendUnreliably(msg);
      } catch (GemFireIOException e) {
        fail("expected success");
      }
      if (enableMcast) {
        verify(msg, atLeastOnce()).registerProcessor();
      }
      verify(msg).toData(isA(DataOutput.class));
      assertTrue("expected 1 message but found " + interceptor.collectedMessages,
          interceptor.collectedMessages.size() == 1);
      assertTrue(interceptor.collectedMessages.get(0).isFlagSet(Message.Flag.NO_RELIABILITY));
    }
  }

  @Test
  public void testMessageDeliveredToHandler() throws Exception {
    doTestMessageDeliveredToHandler(false);
  }

  @Test
  public void testMessageDeliveredToHandlerMcast() throws Exception {
    doTestMessageDeliveredToHandler(true);
  }

  private void doTestMessageDeliveredToHandler(boolean mcast) throws Exception {
    initMocks(mcast);
    MessageHandler mh = mock(MessageHandler.class);
    messenger.addHandler(JoinRequestMessage.class, mh);

    InternalDistributedMember addr = messenger.getMemberID();
    NetView v = new NetView(addr);
    when(joinLeave.getView()).thenReturn(v);


    InternalDistributedMember sender = createAddress(8888);

    JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1, 0);

    Message jmsg = messenger.createJGMessage(msg, messenger.jgAddress, Version.CURRENT_ORDINAL);
    interceptor.up(new Event(Event.MSG, jmsg));

    verify(mh, times(1)).processMessage(any(JoinRequestMessage.class));

    LeaveRequestMessage lmsg = new LeaveRequestMessage(messenger.localAddress, sender, "testing");
    when(joinLeave.getMemberID(any())).thenReturn(sender);
    jmsg = messenger.createJGMessage(lmsg, messenger.jgAddress, Version.CURRENT_ORDINAL);
    interceptor.up(new Event(Event.MSG, jmsg));

    verify(manager).processMessage(any(LeaveRequestMessage.class));

  }


  @Test
  public void testBigMessageIsFragmented() throws Exception {
    doTestBigMessageIsFragmented(false, false);
  }

  @Test
  public void testBigMessageIsFragmentedMcast() throws Exception {
    doTestBigMessageIsFragmented(true, true);
  }

  @Test
  public void testBroadcastUDPMessage() throws Exception {
    doTestBigMessageIsFragmented(false, true);
  }

  public void doTestBigMessageIsFragmented(boolean mcastEnabled, boolean mcastMsg)
      throws Exception {
    initMocks(mcastEnabled);
    MessageHandler mh = mock(MessageHandler.class);
    messenger.addHandler(JoinRequestMessage.class, mh);

    InternalDistributedMember sender = messenger.getMemberID();
    NetView v = new NetView(sender);
    when(joinLeave.getView()).thenReturn(v);
    messenger.installView(v);
    JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1, 0);
    if (mcastMsg) {
      msg.setMulticast(true);
    }

    messenger.send(msg);
    int sentMessages = (mcastEnabled && mcastMsg) ? interceptor.mcastSentDataMessages
        : interceptor.unicastSentDataMessages;
    assertTrue("expected 1 message to be sent but found " + sentMessages, sentMessages == 1);

    // send a big message and expect fragmentation
    msg = new JoinRequestMessage(messenger.localAddress, sender,
        new byte[(int) (services.getConfig().getDistributionConfig().getUdpFragmentSize() * (1.5))],
        -1, 0);

    // configure an incoming message handler for JoinRequestMessage
    final JoinRequestMessage[] messageReceived = new JoinRequestMessage[1];
    messenger.addHandler(JoinRequestMessage.class, message -> messageReceived[0] = message);

    // configure the outgoing message interceptor
    interceptor.unicastSentDataMessages = 0;
    interceptor.collectMessages = true;
    interceptor.collectedMessages.clear();

    messenger.send(msg);

    assertTrue("expected 2 messages to be sent but found " + interceptor.unicastSentDataMessages,
        interceptor.unicastSentDataMessages == 2);

    List<Message> messages = new ArrayList<>(interceptor.collectedMessages);
    UUID fakeMember = new UUID(50, 50);
    short unicastHeaderId = ClassConfigurator.getProtocolId(UNICAST3.class);
    int seqno = 1;
    for (Message m : messages) {
      m.setSrc(fakeMember);
      UNICAST3.Header oldHeader = (UNICAST3.Header) m.getHeader(unicastHeaderId);
      if (oldHeader == null)
        continue;
      UNICAST3.Header newHeader =
          UNICAST3.Header.createDataHeader(seqno, oldHeader.connId(), seqno == 1);
      seqno += 1;
      m.putHeader(unicastHeaderId, newHeader);
      interceptor.up(new Event(Event.MSG, m));
    }
    Thread.sleep(5000);
    System.out.println("received message = " + messageReceived[0]);
  }

  @Test
  public void testSendToMultipleMembers() throws Exception {
    initMocks(false);
    InternalDistributedMember sender = messenger.getMemberID();
    InternalDistributedMember other = createAddress(8888);

    NetView v = new NetView(sender);
    v.add(other);
    when(joinLeave.getView()).thenReturn(v);
    messenger.installView(v);

    List<InternalDistributedMember> recipients = v.getMembers();
    SerialAckedMessage msg = new SerialAckedMessage();
    msg.setRecipients(recipients);

    messenger.send(msg);
    int sentMessages = interceptor.unicastSentDataMessages;
    assertTrue("expected 2 message to be sent but found " + sentMessages, sentMessages == 2);
  }

  @Test
  public void testChannelStillConnectedAfterEmergencyCloseAfterForcedDisconnectWithAutoReconnect()
      throws Exception {
    initMocks(false);
    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
    doCallRealMethod().when(services).getShutdownCause();
    doCallRealMethod().when(services).emergencyClose();
    doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
    doCallRealMethod().when(services).isAutoReconnectEnabled();
    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
    assertTrue(messenger.myChannel.isConnected());
    messenger.emergencyClose();
    assertTrue(messenger.myChannel.isConnected());
  }

  @Test
  public void testChannelStillConnectedAfterStopAfterForcedDisconnectWithAutoReconnect()
      throws Exception {
    initMocks(false);
    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
    doCallRealMethod().when(services).getShutdownCause();
    doCallRealMethod().when(services).emergencyClose();
    doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
    doCallRealMethod().when(services).isAutoReconnectEnabled();
    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
    assertTrue(messenger.myChannel.isConnected());
    messenger.stop();
    assertTrue(messenger.myChannel.isConnected());
  }

  @Test
  public void testChannelStillConnectedAfteremergencyWhileReconnectingDS() throws Exception {
    initMocks(false);
    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
    doCallRealMethod().when(services).getShutdownCause();
    doCallRealMethod().when(services).emergencyClose();
    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
    doReturn(false).when(services).isAutoReconnectEnabled();
    doReturn(true).when(manager).isReconnectingDS();
    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
    assertTrue(messenger.myChannel.isConnected());
    messenger.emergencyClose();
    assertTrue(messenger.myChannel.isConnected());
  }


  @Test
  public void testChannelStillConnectedAfterStopWhileReconnectingDS() throws Exception {
    initMocks(false);
    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
    doCallRealMethod().when(services).getShutdownCause();
    doCallRealMethod().when(services).emergencyClose();
    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
    doReturn(false).when(services).isAutoReconnectEnabled();
    doReturn(true).when(manager).isReconnectingDS();
    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
    assertTrue(messenger.myChannel.isConnected());
    messenger.stop();
    assertTrue(messenger.myChannel.isConnected());
  }

  @Test
  public void testChannelClosedOnEmergencyClose() throws Exception {
    initMocks(false);
    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
    doCallRealMethod().when(services).getShutdownCause();
    doCallRealMethod().when(services).emergencyClose();
    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
    doReturn(false).when(services).isAutoReconnectEnabled();
    doReturn(false).when(manager).isReconnectingDS();
    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
    assertTrue(messenger.myChannel.isConnected());
    messenger.emergencyClose();
    assertFalse(messenger.myChannel.isConnected());
  }

  @Test
  public void testChannelClosedOnStop() throws Exception {
    initMocks(false);
    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
    doCallRealMethod().when(services).getShutdownCause();
    doCallRealMethod().when(services).emergencyClose();
    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
    doReturn(false).when(services).isAutoReconnectEnabled();
    doReturn(false).when(manager).isReconnectingDS();
    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
    assertTrue(messenger.myChannel.isConnected());
    messenger.stop();
    assertFalse(messenger.myChannel.isConnected());
  }

  @Test
  public void testChannelClosedAfterEmergencyCloseForcedDisconnectWithoutAutoReconnect()
      throws Exception {
    initMocks(false);
    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
    doCallRealMethod().when(services).getShutdownCause();
    doCallRealMethod().when(services).emergencyClose();
    doReturn(true).when(services).isShutdownDueToForcedDisconnect();
    doReturn(false).when(services).isAutoReconnectEnabled();
    doReturn(false).when(manager).isReconnectingDS();
    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
    assertTrue(messenger.myChannel.isConnected());
    messenger.emergencyClose();
    assertFalse(messenger.myChannel.isConnected());
  }

  @Test
  public void testChannelStillConnectedStopAfterForcedDisconnectWithoutAutoReconnect()
      throws Exception {
    initMocks(false);
    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
    doCallRealMethod().when(services).getShutdownCause();
    doCallRealMethod().when(services).emergencyClose();
    doReturn(true).when(services).isShutdownDueToForcedDisconnect();
    doReturn(false).when(services).isAutoReconnectEnabled();
    doReturn(false).when(manager).isReconnectingDS();
    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
    assertTrue(messenger.myChannel.isConnected());
    messenger.stop();
    assertFalse(messenger.myChannel.isConnected());
  }

  @Test
  public void testChannelClosedAfterEmergencyCloseNotForcedDisconnectWithAutoReconnect()
      throws Exception {
    initMocks(false);
    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
    doCallRealMethod().when(services).getShutdownCause();
    doCallRealMethod().when(services).emergencyClose();
    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
    doReturn(true).when(services).isAutoReconnectEnabled();
    doReturn(false).when(manager).isReconnectingDS();
    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
    assertTrue(messenger.myChannel.isConnected());
    messenger.emergencyClose();
    assertFalse(messenger.myChannel.isConnected());
  }

  @Test
  public void testChannelStillConnectedStopNotForcedDisconnectWithAutoReconnect() throws Exception {
    initMocks(false);
    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
    doCallRealMethod().when(services).getShutdownCause();
    doCallRealMethod().when(services).emergencyClose();
    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
    doReturn(true).when(services).isAutoReconnectEnabled();
    doReturn(false).when(manager).isReconnectingDS();
    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
    assertTrue(messenger.myChannel.isConnected());
    messenger.stop();
    assertFalse(messenger.myChannel.isConnected());
  }

  @Test
  public void testMessageFiltering() throws Exception {
    initMocks(true);
    InternalDistributedMember mbr = createAddress(8888);
    NetView view = new NetView(mbr);

    // the digest should be set in an outgoing join response
    JoinResponseMessage joinResponse = new JoinResponseMessage(mbr, view, 0);
    messenger.filterOutgoingMessage(joinResponse);
    assertNotNull(joinResponse.getMessengerData());

    // save the view digest for later
    byte[] data = joinResponse.getMessengerData();

    // the digest should be used and the message bytes nulled out in an incoming join response
    messenger.filterIncomingMessage(joinResponse);
    assertNull(joinResponse.getMessengerData());

    // the digest shouldn't be set in an outgoing rejection message
    joinResponse =
        new JoinResponseMessage("you can't join my distributed system.  nyah nyah nyah!", 0);
    messenger.filterOutgoingMessage(joinResponse);
    assertNull(joinResponse.getMessengerData());

    // the digest shouldn't be installed from an incoming rejection message
    joinResponse.setMessengerData(data);
    messenger.filterIncomingMessage(joinResponse);
    assertNotNull(joinResponse.getMessengerData());
  }

  @Test
  public void testPingPong() throws Exception {
    initMocks(false);
    GMSPingPonger pinger = messenger.pingPonger;
    InternalDistributedMember mbr = createAddress(8888);
    JGAddress addr = new JGAddress(mbr);

    Message pingMessage = pinger.createPingMessage(null, addr);
    assertTrue(pinger.isPingMessage(pingMessage.getBuffer()));
    assertFalse(pinger.isPongMessage(pingMessage.getBuffer()));

    Message pongMessage = pinger.createPongMessage(null, addr);
    assertTrue(pinger.isPongMessage(pongMessage.getBuffer()));
    assertFalse(pinger.isPingMessage(pongMessage.getBuffer()));

    interceptor.collectMessages = true;
    pinger.sendPingMessage(messenger.myChannel, null, addr);
    assertEquals("expected 1 message but found " + interceptor.collectedMessages,
        interceptor.collectedMessages.size(), 1);
    pingMessage = interceptor.collectedMessages.get(0);
    assertTrue(pinger.isPingMessage(pingMessage.getBuffer()));

    interceptor.collectedMessages.clear();
    pinger.sendPongMessage(messenger.myChannel, null, addr);
    assertEquals("expected 1 message but found " + interceptor.collectedMessages,
        interceptor.collectedMessages.size(), 1);
    pongMessage = interceptor.collectedMessages.get(0);
    assertTrue(pinger.isPongMessage(pongMessage.getBuffer()));

    interceptor.collectedMessages.clear();
    JGroupsReceiver receiver = (JGroupsReceiver) messenger.myChannel.getReceiver();
    long pongsReceived = messenger.pongsReceived.longValue();
    receiver.receive(pongMessage);
    assertEquals(pongsReceived + 1, messenger.pongsReceived.longValue());
    receiver.receive(pingMessage);
    assertEquals("expected 1 message but found " + interceptor.collectedMessages,
        interceptor.collectedMessages.size(), 1);
    Message m = interceptor.collectedMessages.get(0);
    assertTrue(pinger.isPongMessage(m.getBuffer()));
  }

  @Test
  public void testJGroupsIOExceptionHandler() throws Exception {
    initMocks(false);
    InternalDistributedMember mbr = createAddress(8888);
    NetView v = new NetView(mbr);
    v.add(messenger.getMemberID());
    messenger.installView(v);

    IOException ioe = new IOException("test exception");
    messenger.handleJGroupsIOException(ioe, new JGAddress(mbr));
    messenger.handleJGroupsIOException(ioe, new JGAddress(mbr)); // should be ignored
    verify(healthMonitor).suspect(mbr, "Unable to send messages to this member via JGroups");
  }

  @Test
  public void testReceiver() throws Exception {
    try {
      DistributionStats.enableClockStats = true;
      initMocks(false);
      JGroupsReceiver receiver = (JGroupsReceiver) messenger.myChannel.getReceiver();

      // a zero-length message is ignored
      Message msg = new Message(new JGAddress(messenger.getMemberID()));
      Object result = messenger.readJGMessage(msg);
      assertNull(result);

      // for code coverage we need to pump this message through the receiver
      receiver.receive(msg);

      // for more code coverage we need to actually set a buffer in the message
      msg.setBuffer(new byte[0]);
      result = messenger.readJGMessage(msg);
      assertNull(result);
      receiver.receive(msg);

      // now create a view and a real distribution-message
      InternalDistributedMember myAddress = messenger.getMemberID();
      InternalDistributedMember other = createAddress(8888);
      NetView v = new NetView(myAddress);
      v.add(other);
      when(joinLeave.getView()).thenReturn(v);
      messenger.installView(v);

      List<InternalDistributedMember> recipients = v.getMembers();
      SerialAckedMessage dmsg = new SerialAckedMessage();
      dmsg.setRecipients(recipients);

      // a message is ignored during manager shutdown
      msg = messenger.createJGMessage(dmsg, new JGAddress(other), Version.CURRENT_ORDINAL);
      when(manager.shutdownInProgress()).thenReturn(Boolean.TRUE);
      receiver.receive(msg);
      verify(manager, never()).processMessage(isA(DistributionMessage.class));

      assertTrue("There should be UDPDispatchRequestTime stats",
          services.getStatistics().getUDPDispatchRequestTime() > 0);
    } finally {
      DistributionStats.enableClockStats = false;
    }
  }

  @Test
  public void testUseOldJChannel() throws Exception {
    initMocks(false);
    JChannel channel = messenger.myChannel;
    services.getConfig().getTransport().setOldDSMembershipInfo(new MembershipInformation(channel,
        Collections.singleton(new InternalDistributedMember("localhost", 10000)),
        new ConcurrentLinkedQueue<>()));
    JGroupsMessenger newMessenger = new JGroupsMessenger();
    newMessenger.init(services);
    newMessenger.start();
    newMessenger.started();
    newMessenger.stop();
    assertTrue(newMessenger.myChannel == messenger.myChannel);
  }

  @Test
  public void testGetMessageState() throws Exception {
    initMocks(true/* multicast */);
    messenger.testMulticast(50); // do some multicast messaging
    NAKACK2 nakack = (NAKACK2) messenger.myChannel.getProtocolStack().findProtocol("NAKACK2");
    assertNotNull(nakack);
    long seqno = nakack.getCurrentSeqno();
    Map state = new HashMap();
    messenger.getMessageState(null, state, true);
    assertEquals(1, state.size());
    Long stateLong = (Long) state.values().iterator().next();
    assertTrue(
        "expected multicast state to be at least " + seqno + " but it was " + stateLong.longValue(),
        stateLong.longValue() >= seqno);
  }

  @Test
  public void testGetMessageStateNoMulticast() throws Exception {
    initMocks(false/* multicast */);
    Map state = new HashMap();
    messenger.getMessageState(null, state, true);
    assertEquals("expected an empty map but received " + state, 0, state.size());
  }

  @Test
  public void testWaitForMessageStateSucceeds() throws Exception {
    initMocks(true/* multicast */);
    JGroupsMessenger.MessageTracker tracker = mock(JGroupsMessenger.MessageTracker.class);
    InternalDistributedMember mbr = createAddress(1234);
    messenger.scheduledMcastSeqnos.put(mbr, tracker);
    when(tracker.get()).thenReturn(0l, 2l, 49l, 50l, 80l);
    Map state = new HashMap();
    state.put("JGroups.mcastState", Long.valueOf(50));
    messenger.waitForMessageState(mbr, state);
    verify(tracker, times(4)).get();

    reset(tracker);
    when(tracker.get()).thenReturn(0l, 2l, 60l);
    messenger.waitForMessageState(mbr, state);
    verify(tracker, times(3)).get();
  }

  @Test
  public void testWaitForMessageStateThrowsExceptionIfMessagesMissing() throws Exception {
    initMocks(true/* multicast */);
    NAKACK2 nakack = mock(NAKACK2.class);
    Digest digest = mock(Digest.class);
    when(nakack.getDigest(any(Address.class))).thenReturn(digest);
    when(digest.get(any(Address.class))).thenReturn(new long[] {0, 0}, new long[] {2, 50},
        new long[] {49, 50});
    try {
      // message 50 will never arrive
      Map state = new HashMap();
      state.put("JGroups.mcastState", Long.valueOf(50));
      InternalDistributedMember mbr = createAddress(1234);
      messenger.scheduledMcastSeqnos.put(mbr, new JGroupsMessenger.MessageTracker(30));
      messenger.waitForMessageState(mbr, state);
      fail("expected a GemFireIOException to be thrown");
    } catch (GemFireIOException e) {
      // pass
    }
  }

  private NetView createView(InternalDistributedMember otherMbr) {
    InternalDistributedMember sender = messenger.getMemberID();
    List<InternalDistributedMember> mbrs = new ArrayList<>();
    mbrs.add(sender);
    mbrs.add(otherMbr);
    NetView v = new NetView(sender, 1, mbrs);
    return v;
  }

  @Test
  public void testEncryptedFindCoordinatorRequest() throws Exception {
    InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);

    Properties p = new Properties();
    final String udpDhalgo = "AES:128";
    p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, udpDhalgo);
    initMocks(false, p);

    NetView v = createView(otherMbr);
    when(joinLeave.getMemberID(messenger.getMemberID().getNetMember()))
        .thenReturn(messenger.getMemberID());
    GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, udpDhalgo);

    messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
    messenger.initClusterKey();

    FindCoordinatorRequest gfmsg = new FindCoordinatorRequest(messenger.getMemberID(),
        new ArrayList<InternalDistributedMember>(2), 1,
        messenger.getPublicKey(messenger.getMemberID()), 1, "");
    Set<InternalDistributedMember> recipients = new HashSet<>();
    recipients.add(otherMbr);
    gfmsg.setRecipients(recipients);

    short version = Version.CURRENT_ORDINAL;

    HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);

    messenger.writeEncryptedMessage(gfmsg, version, out);

    byte[] requestBytes = out.toByteArray();

    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));

    DistributionMessage distributionMessage =
        messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);

    assertEquals(gfmsg, distributionMessage);
  }

  @Test
  public void testEncryptedFindCoordinatorResponse() throws Exception {
    InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);

    Properties p = new Properties();

    p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, AES_128);
    initMocks(false, p);

    NetView v = createView(otherMbr);

    GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, AES_128);
    otherMbrEncrptor.setPublicKey(messenger.getPublicKey(messenger.getMemberID()),
        messenger.getMemberID());

    messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
    messenger.initClusterKey();

    FindCoordinatorResponse gfmsg = new FindCoordinatorResponse(messenger.getMemberID(),
        messenger.getMemberID(), messenger.getClusterSecretKey(), 1);
    Set<InternalDistributedMember> recipients = new HashSet<>();
    recipients.add(otherMbr);
    gfmsg.setRecipients(recipients);

    short version = Version.CURRENT_ORDINAL;

    HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);

    messenger.writeEncryptedMessage(gfmsg, version, out);

    byte[] requestBytes = out.toByteArray();

    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));

    messenger.addRequestId(1, messenger.getMemberID());

    DistributionMessage distributionMessage =
        messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);

    assertEquals(gfmsg, distributionMessage);
  }

  @Test
  public void testEncryptedJoinRequest() throws Exception {
    InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);

    Properties p = new Properties();
    p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, AES_128);
    initMocks(false, p);

    NetView v = createView(otherMbr);

    GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, AES_128);

    messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
    messenger.initClusterKey();

    JoinRequestMessage gfmsg =
        new JoinRequestMessage(otherMbr, messenger.getMemberID(), null, 9789, 1);

    short version = Version.CURRENT_ORDINAL;

    HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);

    messenger.writeEncryptedMessage(gfmsg, version, out);

    byte[] requestBytes = out.toByteArray();

    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));

    DistributionMessage distributionMessage =
        messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);

    assertEquals(gfmsg, distributionMessage);
  }

  @Test
  public void testEncryptedJoinResponse() throws Exception {
    InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);

    Properties p = new Properties();
    p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, AES_128);
    initMocks(false, p);

    NetView v = createView(otherMbr);

    GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, AES_128);
    otherMbrEncrptor.setPublicKey(messenger.getPublicKey(messenger.getMemberID()),
        messenger.getMemberID());

    messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
    messenger.initClusterKey();

    JoinResponseMessage gfmsg =
        new JoinResponseMessage(otherMbr, messenger.getClusterSecretKey(), 1);

    short version = Version.CURRENT_ORDINAL;

    HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);

    messenger.writeEncryptedMessage(gfmsg, version, out);

    byte[] requestBytes = out.toByteArray();

    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));

    messenger.addRequestId(1, messenger.getMemberID());

    DistributionMessage gfMessageAtOtherMbr =
        messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);

    assertEquals(gfmsg, gfMessageAtOtherMbr);

    // lets send view as well..

    InstallViewMessage installViewMessage = new InstallViewMessage(v, null, true);

    out = new HeapDataOutputStream(Version.CURRENT);

    messenger.writeEncryptedMessage(installViewMessage, version, out);

    requestBytes = out.toByteArray();

    otherMbrEncrptor.setClusterKey(((JoinResponseMessage) gfMessageAtOtherMbr).getSecretPk());

    dis = new DataInputStream(new ByteArrayInputStream(requestBytes));

    gfMessageAtOtherMbr = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);

    assertEquals(installViewMessage, gfMessageAtOtherMbr);

  }

  /**
   * creates an InternalDistributedMember address that can be used with the doctored JGroups
   * channel. This includes a logical (UUID) address and a physical (IpAddress) address.
   *
   * @param port the UDP port to use for the new address
   */
  private InternalDistributedMember createAddress(int port) {
    GMSMember gms = new GMSMember("localhost", port);
    gms.setUUID(UUID.randomUUID());
    gms.setVmKind(ClusterDistributionManager.NORMAL_DM_TYPE);
    gms.setVersionOrdinal(Version.CURRENT_ORDINAL);
    return new InternalDistributedMember(gms);
  }

}
