/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal.membership.gms;

import static org.apache.geode.distributed.internal.membership.gms.util.MemberIdentifierUtil.createMemberID;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.HIGH_PRIORITY_ACKED_MESSAGE;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.stream.Collectors;

import org.jgroups.util.UUID;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import org.apache.geode.distributed.internal.membership.api.Authenticator;
import org.apache.geode.distributed.internal.membership.api.LifecycleListener;
import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.MemberShunnedException;
import org.apache.geode.distributed.internal.membership.api.MemberStartupException;
import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipListener;
import org.apache.geode.distributed.internal.membership.api.MembershipView;
import org.apache.geode.distributed.internal.membership.api.Message;
import org.apache.geode.distributed.internal.membership.api.MessageListener;
import org.apache.geode.distributed.internal.membership.gms.GMSMembership.StartupEvent;
import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.Versioning;
import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
import org.apache.geode.test.junit.categories.MembershipTest;

@Category({MembershipTest.class})
public class GMSMembershipJUnitTest {

  private static final Version OLDER_THAN_CURRENT_VERSION =
      Versioning.getVersion((short) (KnownVersion.CURRENT_ORDINAL - 1));
  private static final Version NEWER_THAN_CURRENT_VERSION =
      Versioning.getVersion((short) (KnownVersion.CURRENT_ORDINAL + 1));;
  private static final int DEFAULT_PORT = 8888;

  private Services services;
  private MembershipConfig mockConfig;
  private Authenticator authenticator;
  private HealthMonitor healthMonitor;
  private MemberIdentifier myMemberId;
  private MemberIdentifier[] mockMembers;
  private Messenger messenger;
  private JoinLeave joinLeave;
  private Stopper stopper;
  private MembershipListener listener;
  private GMSMembership<MemberIdentifier> manager;
  private List<MemberIdentifier> members;
  private MessageListener messageListener;
  private LifecycleListener directChannelCallback;

  @Before
  public void initMocks() throws Exception {
    mockConfig = new MembershipConfig() {
      @Override
      public long getMemberTimeout() {
        return 2000;
      }

      @Override
      public String getLocators() {
        return "localhost[10344]";
      }

      @Override
      public boolean getDisableTcp() {
        return true;
      }

      @Override
      public long getAckWaitThreshold() {
        return 1;
      }

      @Override
      public long getAckSevereAlertThreshold() {
        return 10;
      }
    };

    authenticator = mock(Authenticator.class);
    myMemberId = createMemberID(8887);
    UUID uuid = new UUID(12345, 12345);
    myMemberId.setUUID(uuid);

    messenger = mock(Messenger.class);
    when(messenger.getMemberID()).thenReturn(myMemberId);

    stopper = mock(Stopper.class);
    when(stopper.isCancelInProgress()).thenReturn(false);

    healthMonitor = mock(HealthMonitor.class);
    when(healthMonitor.getFailureDetectionPort()).thenReturn(Integer.valueOf(-1));

    joinLeave = mock(JoinLeave.class);

    services = mock(Services.class);
    when(services.getAuthenticator()).thenReturn(authenticator);
    when(services.getConfig()).thenReturn(mockConfig);
    when(services.getMessenger()).thenReturn(messenger);
    when(services.getCancelCriterion()).thenReturn(stopper);
    when(services.getHealthMonitor()).thenReturn(healthMonitor);
    when(services.getJoinLeave()).thenReturn(joinLeave);

    Timer t = new Timer(true);
    when(services.getTimer()).thenReturn(t);

    Random r = new Random();
    mockMembers = new MemberIdentifier[5];
    for (int i = 0; i < mockMembers.length; i++) {
      mockMembers[i] = createMemberID(DEFAULT_PORT + i);
      uuid = new UUID(r.nextLong(), r.nextLong());
      mockMembers[i].setUUID(uuid);
    }
    members = new ArrayList<>(Arrays.asList(mockMembers));

    listener = mock(MembershipListener.class);
    messageListener = mock(MessageListener.class);
    directChannelCallback = mock(LifecycleListener.class);
    manager = new GMSMembership(listener, messageListener, directChannelCallback);
    manager.getGMSManager().init(services);
    when(services.getManager()).thenReturn(manager.getGMSManager());

    DSFIDSerializer serializer = new DSFIDSerializerImpl();
    when(services.getSerializer()).thenReturn(serializer);
    Services.registerSerializables(serializer);
  }

  @After
  public void tearDown() throws Exception {
    if (manager != null) {
      manager.getGMSManager().stop();
      manager.getGMSManager().stopped();
    }
  }

  @Test
  public void testSendMessage() throws Exception {
    services.getSerializer().registerDSFID(HIGH_PRIORITY_ACKED_MESSAGE, TestMessage.class);
    TestMessage m = new TestMessage();
    m.setRecipient(mockMembers[0]);
    manager.getGMSManager().start();
    manager.getGMSManager().started();
    MemberIdentifier myGMSMemberId = myMemberId;
    List<MemberIdentifier> gmsMembers =
        members.stream().map(x -> ((MemberIdentifier) x)).collect(Collectors.toList());
    manager.getGMSManager().installView(new GMSMembershipView<>(myGMSMemberId, 1, gmsMembers));
    MemberIdentifier[] destinations = new MemberIdentifier[] {mockMembers[0]};
    Set<MemberIdentifier> failures =
        manager.send(destinations, m);
    verify(messenger).send(isA(Message.class));
    if (failures != null) {
      assertEquals(0, failures.size());
    }
  }



  private GMSMembershipView createView(MemberIdentifier creator, int viewId,
      List<MemberIdentifier> members) {
    List<MemberIdentifier> gmsMembers = new ArrayList<>(members);
    return new GMSMembershipView(creator, viewId, gmsMembers);
  }

  @Test
  public void testStartupEvents() throws Exception {
    manager.getGMSManager().start();
    manager.getGMSManager().started();
    manager.isJoining = true;

    List<MemberIdentifier> viewMembers =
        Arrays.asList(new MemberIdentifier[] {mockMembers[0], myMemberId});
    manager.getGMSManager().installView(createView(myMemberId, 2, viewMembers));

    // add a surprise member that will be shunned due to it's having
    // an old view ID
    MemberIdentifier surpriseMember = mockMembers[2];
    surpriseMember.setVmViewId(1);
    manager.handleOrDeferSurpriseConnect(surpriseMember);
    assertEquals(1, manager.getStartupEvents().size());

    // add a surprise member that will be accepted
    MemberIdentifier surpriseMember2 = mockMembers[3];
    surpriseMember2.setVmViewId(3);
    manager.handleOrDeferSurpriseConnect(surpriseMember2);
    assertEquals(2, manager.getStartupEvents().size());

    // suspect a member
    MemberIdentifier suspectMember = mockMembers[1];
    manager.handleOrDeferSuspect(
        new SuspectMember<>(mockMembers[0], suspectMember, "testing"));
    // suspect messages aren't queued - they're ignored before joining the system
    assertEquals(2, manager.getStartupEvents().size());
    verify(listener, never()).memberSuspect(suspectMember, mockMembers[0], "testing");

    TestMessage m = new TestMessage();
    mockMembers[0].setVmViewId(1);
    m.setRecipient(mockMembers[0]);
    m.setSender(mockMembers[1]);
    manager.handleOrDeferMessage(m);
    assertEquals(3, manager.getStartupEvents().size());

    // this view officially adds surpriseMember2
    viewMembers = Arrays
        .asList(new MemberIdentifier[] {mockMembers[0], myMemberId, surpriseMember2});
    manager.handleOrDeferViewEvent(new MembershipView<>(myMemberId, 3, viewMembers));
    assertEquals(4, manager.getStartupEvents().size());

    // add a surprise member that will be shunned due to it's having
    // an old view ID
    MemberIdentifier surpriseMember3 = mockMembers[4];
    surpriseMember.setVmViewId(1);
    manager.handleOrDeferSurpriseConnect(surpriseMember);
    assertEquals(5, manager.getStartupEvents().size());

    // process a new view after we finish joining but before event processing has started
    manager.isJoining = false;
    mockMembers[4].setVmViewId(4);
    viewMembers = Arrays.asList(new MemberIdentifier[] {mockMembers[0], myMemberId,
        surpriseMember2, mockMembers[4]});
    manager.handleOrDeferViewEvent(new MembershipView<>(myMemberId, 4, viewMembers));
    assertEquals(6, manager.getStartupEvents().size());

    // exercise the toString methods for code coverage
    for (StartupEvent<MemberIdentifier> ev : manager.getStartupEvents()) {
      ev.toString();
    }

    manager.startEventProcessing();

    // all startup events should have been processed
    assertEquals(0, manager.getStartupEvents().size());
    // the new view should have been installed
    assertEquals(4, manager.getView().getViewId());
    // supriseMember2 should have been announced
    verify(listener).newMemberConnected(surpriseMember2);
    // supriseMember should have been rejected (old view ID)
    verify(listener, never()).newMemberConnected(surpriseMember);

    // for code coverage also install a view after we finish joining but before
    // event processing has started. This should notify the distribution manager
    // with a LocalViewMessage to process the view
    manager.handleOrDeferViewEvent(new MembershipView<>(myMemberId, 5, viewMembers));
    await().untilAsserted(() -> assertEquals(manager.getView().getViewId(), 5));

    // process a suspect now - it will be passed to the listener
    reset(listener);
    suspectMember = mockMembers[1];
    manager.handleOrDeferSuspect(
        new SuspectMember<>(mockMembers[0], suspectMember, "testing"));
    verify(listener).memberSuspect(suspectMember, mockMembers[0], "testing");
  }

  /**
   * This test ensures that the membership manager can accept an ID that does not have a UUID and
   * replace it with one that does have a UUID from the current membership view.
   */
  @Test
  public void testAddressesWithoutUUIDs() throws Exception {
    manager.getGMSManager().start();
    manager.getGMSManager().started();
    manager.isJoining = true;

    List<MemberIdentifier> viewMembers =
        Arrays.asList(new MemberIdentifier[] {mockMembers[0], mockMembers[1], myMemberId});
    GMSMembershipView view = createView(myMemberId, 2, viewMembers);
    manager.getGMSManager().installView(view);
    when(services.getJoinLeave().getView()).thenReturn(view);

    MemberIdentifier[] destinations = new MemberIdentifier[viewMembers.size()];
    for (int i = 0; i < destinations.length; i++) {
      MemberIdentifier id = viewMembers.get(i);
      destinations[i] = createMemberID(id.getMembershipPort());
    }
    manager.checkAddressesForUUIDs(destinations);
    // each destination w/o a UUID should have been replaced with the corresponding
    // ID from the membership view
    for (int i = 0; i < destinations.length; i++) {
      assertTrue(destinations[i].hasUUID());
    }
  }

  @Test
  public void noDispatchWhenSick() throws MemberShunnedException, MemberStartupException {
    final Message msg = mock(Message.class);
    when(msg.dropMessageWhenMembershipIsPlayingDead()).thenReturn(true);

    final GMSMembership spy = Mockito.spy(manager);

    spy.beSick();
    spy.getGMSManager().start();
    spy.getGMSManager().started();

    spy.handleOrDeferMessage(msg);

    verify(spy, never()).dispatchMessage(any(Message.class));
    assertThat(spy.getStartupEvents()).isEmpty();
  }

  @Test
  public void testIsMulticastAllowedWithOldVersionSurpriseMember() {
    MembershipView<MemberIdentifier> view = createMembershipView();
    manager.addSurpriseMember(createSurpriseMember(OLDER_THAN_CURRENT_VERSION));

    manager.processView(view);

    assertThat(manager.getGMSManager().isMulticastAllowed()).isFalse();
  }

  @Test
  public void testIsMulticastAllowedWithCurrentVersionSurpriseMember() {
    MembershipView<MemberIdentifier> view = createMembershipView();
    manager.addSurpriseMember(createSurpriseMember(KnownVersion.CURRENT));

    manager.processView(view);

    assertThat(manager.getGMSManager().isMulticastAllowed()).isTrue();
  }

  @Test
  public void testIsMulticastAllowedWithNewVersionSurpriseMember() {
    MembershipView<MemberIdentifier> view = createMembershipView();
    manager.addSurpriseMember(createSurpriseMember(NEWER_THAN_CURRENT_VERSION));

    manager.processView(view);

    assertThat(manager.getGMSManager().isMulticastAllowed()).isTrue();
  }

  @Test
  public void testIsMulticastAllowedWithOldVersionViewMember() {
    MembershipView<MemberIdentifier> view = createMembershipView();
    view.getMembers().get(0).setVersionForTest(OLDER_THAN_CURRENT_VERSION);

    manager.processView(view);

    assertThat(manager.getGMSManager().isMulticastAllowed()).isFalse();
  }

  @Test
  public void testMulticastAllowedWithCurrentVersionViewMember() {
    MembershipView<MemberIdentifier> view = createMembershipView();

    manager.processView(view);

    assertThat(manager.getGMSManager().isMulticastAllowed()).isTrue();
  }

  @Test
  public void testMulticastAllowedWithNewVersionViewMember() {
    MembershipView<MemberIdentifier> view = createMembershipView();
    view.getMembers().get(0).setVersionForTest(NEWER_THAN_CURRENT_VERSION);

    manager.processView(view);

    assertThat(manager.getGMSManager().isMulticastAllowed()).isTrue();
  }

  @Test
  public void membershipInvokesUpstreamListenerDuringForcedDisconnect() {
    // have an exception interrupt the shutdown process and ensure that a thread is
    // launched to inform the cache of shutdown
    IllegalStateException expectedException = new IllegalStateException();
    doThrow(expectedException).when(services).emergencyClose();
    assertThatThrownBy(() -> manager.uncleanShutdown("For testing",
        new MemberDisconnectedException("For Testing")))
            .isEqualTo(expectedException);
    verify(listener).membershipFailure(isA(String.class), isA(Throwable.class));
  }

  private MemberIdentifier createSurpriseMember(Version version) {
    MemberIdentifier surpriseMember = createMemberID(DEFAULT_PORT + 5);
    surpriseMember.setVmViewId(3);
    surpriseMember.setVersionForTest(version);
    return surpriseMember;
  }

  private MembershipView<MemberIdentifier> createMembershipView() {
    List<MemberIdentifier> viewMembers = createMemberIdentifiers();
    return new MembershipView<>(myMemberId, 2, viewMembers);
  }

  private List<MemberIdentifier> createMemberIdentifiers() {
    List<MemberIdentifier> viewMembers = new ArrayList<>();
    for (int i = 0; i < 2; ++i) {
      MemberIdentifier memberIdentifier = createMemberID(DEFAULT_PORT + 6 + i);
      viewMembers.add(memberIdentifier);
    }
    return viewMembers;
  }
}
