/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal.membership;

import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_TCP;
import static org.apache.geode.distributed.ConfigurationProperties.GROUPS;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.net.InetAddress;
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang3.tuple.Pair;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.geode.GemFireConfigException;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.SerialAckedMessage;
import org.apache.geode.distributed.internal.membership.adapter.GMSMembershipManager;
import org.apache.geode.distributed.internal.membership.adapter.ServiceConfig;
import org.apache.geode.distributed.internal.membership.adapter.auth.GMSAuthenticator;
import org.apache.geode.distributed.internal.membership.gms.GMSMemberData;
import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifierFactory;
import org.apache.geode.distributed.internal.membership.gms.api.MembershipBuilder;
import org.apache.geode.distributed.internal.membership.gms.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.gms.api.MembershipListener;
import org.apache.geode.distributed.internal.membership.gms.api.MessageListener;
import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.admin.remote.RemoteTransportConfig;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.security.SecurityServiceFactory;
import org.apache.geode.internal.serialization.DSFIDSerializer;

@Category({MembershipJUnitTest.class})
public class MembershipJUnitTest {

  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  /**
   * This test creates a locator with a colocated membership manager and then creates a second
   * manager that joins the system of the first.
   *
   * It then makes assertions about the state of the membership view, closes one of the managers and
   * makes more assertions. It also ensures that a cache message can be sent from one manager to the
   * other.
   */
  @Test
  public void testMultipleManagersInSameProcess() throws Exception {
    doTestMultipleManagersInSameProcessWithGroups("red, blue");
  }

  /**
   * Ensure that a large membership group doesn't cause communication issues
   */
  @Test
  public void testManagersWithLargeGroups() throws Exception {
    StringBuilder stringBuilder = new StringBuilder(80000);
    boolean first = true;
    // create 8000 10-byte group names
    for (int thousands = 1; thousands < 9; thousands++) {
      for (int group = 0; group < 1000; group++) {
        if (!first) {
          stringBuilder.append(',');
        }
        first = false;
        stringBuilder.append(String.format("%1$02d%2$08d", thousands, group));
      }
    }
    List<String> result = doTestMultipleManagersInSameProcessWithGroups(stringBuilder.toString());
    assertEquals(8000, result.size());
    for (String group : result) {
      assertEquals(10, group.length());
    }
  }

  /**
   * this runs the test with a given set of member groups. Returns the groups of the member that was
   * not the coordinator for verification that they were correctly transmitted
   */
  private List<String> doTestMultipleManagersInSameProcessWithGroups(String groups)
      throws Exception {

    MembershipManager m1 = null, m2 = null;
    Locator l = null;
    // int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort();

    try {

      // boot up a locator
      int port = AvailablePortHelper.getRandomAvailableTCPPort();
      InetAddress localHost = SocketCreator.getLocalHost();

      // this locator will hook itself up with the first MembershipManager
      // to be created
      l = InternalLocator.startLocator(port, new File(""), null, null, localHost, false,
          new Properties(), null, temporaryFolder.getRoot().toPath());

      // create configuration objects
      Properties nonDefault = new Properties();
      nonDefault.put(DISABLE_TCP, "true");
      nonDefault.put(MCAST_PORT, "0");
      nonDefault.put(LOG_FILE, "");
      nonDefault.put(LOG_LEVEL, "fine");
      nonDefault.put(GROUPS, groups);
      nonDefault.put(MEMBER_TIMEOUT, "2000");
      nonDefault.put(LOCATORS, localHost.getHostName() + '[' + port + ']');
      DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
      RemoteTransportConfig transport =
          new RemoteTransportConfig(config, ClusterDistributionManager.NORMAL_DM_TYPE);

      // start the first membership manager
      try {
        System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
        m1 = createMembershipManager(config, transport).getLeft();
      } finally {
        System.getProperties().remove(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY);
      }

      // start the second membership manager
      final Pair<MembershipManager, MessageListener> pair =
          createMembershipManager(config, transport);
      m2 = pair.getLeft();
      final MessageListener listener2 = pair.getRight();

      // we have to check the views with JoinLeave because the membership
      // manager queues new views for processing through the DM listener,
      // which is a mock object in this test
      System.out.println("waiting for views to stabilize");
      JoinLeave jl1 = ((GMSMembershipManager) m1).getServices().getJoinLeave();
      JoinLeave jl2 = ((GMSMembershipManager) m2).getServices().getJoinLeave();
      long giveUp = System.currentTimeMillis() + 15000;
      for (;;) {
        try {
          assertTrue("view = " + jl2.getView(), jl2.getView().size() == 2);
          assertTrue("view = " + jl1.getView(), jl1.getView().size() == 2);
          assertTrue(jl1.getView().getCreator().equals(jl2.getView().getCreator()));
          assertTrue(jl1.getView().getViewId() == jl2.getView().getViewId());
          break;
        } catch (AssertionError e) {
          if (System.currentTimeMillis() > giveUp) {
            throw e;
          }
        }
      }

      GMSMembershipView view = jl1.getView();
      MemberIdentifier notCreator;
      if (view.getCreator().equals(jl1.getMemberID())) {
        notCreator = view.getMembers().get(1);
      } else {
        notCreator = view.getMembers().get(0);
      }
      List<String> result = notCreator.getGroups();

      System.out.println("sending SerialAckedMessage from m1 to m2");
      SerialAckedMessage msg = new SerialAckedMessage();
      msg.setRecipient(m2.getLocalMember());
      msg.setMulticast(false);
      m1.send(new InternalDistributedMember[] {m2.getLocalMember()}, msg);
      giveUp = System.currentTimeMillis() + 15000;
      boolean verified = false;
      Throwable problem = null;
      while (giveUp > System.currentTimeMillis()) {
        try {
          verify(listener2).messageReceived(isA(SerialAckedMessage.class));
          verified = true;
          break;
        } catch (Error e) {
          problem = e;
          Thread.sleep(500);
        }
      }
      if (!verified) {
        AssertionError error = new AssertionError("Expected a message to be received");
        if (problem != null) {
          error.initCause(problem);
        }
        throw error;
      }

      // let the managers idle for a while and get used to each other
      // Thread.sleep(4000l);

      m2.disconnect(false);
      assertTrue(!m2.isConnected());

      assertTrue(m1.getView().size() == 1);

      return result;
    } finally {

      if (m2 != null) {
        m2.shutdown();
      }
      if (m1 != null) {
        m1.shutdown();
      }
      if (l != null) {
        l.stop();
      }
    }
  }

  private Pair<MembershipManager, MessageListener> createMembershipManager(
      final DistributionConfigImpl config,
      final RemoteTransportConfig transport) {
    final MembershipListener listener = mock(MembershipListener.class);
    final MessageListener messageListener = mock(MessageListener.class);
    final DMStats stats1 = mock(DMStats.class);
    final InternalDistributedSystem mockSystem = mock(InternalDistributedSystem.class);
    final SecurityService securityService = SecurityServiceFactory.create();
    DSFIDSerializer serializer = InternalDataSerializer.getDSFIDSerializer();
    final MemberIdentifierFactory memberFactory = mock(MemberIdentifierFactory.class);
    when(memberFactory.create(isA(GMSMemberData.class))).thenAnswer(new Answer<MemberIdentifier>() {
      @Override
      public MemberIdentifier answer(InvocationOnMock invocation) throws Throwable {
        return new InternalDistributedMember((GMSMemberData) invocation.getArgument(0));
      }
    });
    final MembershipManager m1 =
        MembershipBuilder.newMembershipBuilder(null)
            .setAuthenticator(new GMSAuthenticator(config.getSecurityProps(), securityService,
                mockSystem.getSecurityLogWriter(), mockSystem.getInternalLogWriter()))
            .setStatistics(stats1)
            .setMessageListener(messageListener)
            .setMembershipListener(listener)
            .setConfig(new ServiceConfig(transport, config))
            .setSerializer(serializer)
            .create();
    m1.startEventProcessing();
    return Pair.of(m1, messageListener);
  }

  /**
   * This test ensures that secure communications are enabled.
   *
   * This test creates a locator with a colocated membership manager and then creates a second
   * manager that joins the system of the first.
   *
   * It then makes assertions about the state of the membership view, closes one of the managers and
   * makes more assertions.
   */
  @Test
  public void testLocatorAndTwoServersJoinUsingDiffeHellman() throws Exception {

    MembershipManager m1 = null, m2 = null;
    Locator l = null;
    int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort();

    try {

      // boot up a locator
      int port = AvailablePortHelper.getRandomAvailableTCPPort();
      InetAddress localHost = SocketCreator.getLocalHost();
      Properties p = new Properties();
      p.setProperty(ConfigurationProperties.SECURITY_UDP_DHALGO, "AES:128");
      // this locator will hook itself up with the first MembershipManager
      // to be created
      l = InternalLocator.startLocator(port, new File(""), null, null, localHost, false, p, null,
          temporaryFolder.getRoot().toPath());

      // create configuration objects
      Properties nonDefault = new Properties();
      nonDefault.put(DistributionConfig.DISABLE_TCP_NAME, "true");
      nonDefault.put(DistributionConfig.MCAST_PORT_NAME, String.valueOf(mcastPort));
      nonDefault.put(DistributionConfig.LOG_FILE_NAME, "");
      nonDefault.put(DistributionConfig.LOG_LEVEL_NAME, "fine");
      nonDefault.put(DistributionConfig.GROUPS_NAME, "red, blue");
      nonDefault.put(DistributionConfig.MEMBER_TIMEOUT_NAME, "2000");
      nonDefault.put(DistributionConfig.LOCATORS_NAME, localHost.getHostName() + '[' + port + ']');
      nonDefault.put(ConfigurationProperties.SECURITY_UDP_DHALGO, "AES:128");
      DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
      RemoteTransportConfig transport =
          new RemoteTransportConfig(config, ClusterDistributionManager.NORMAL_DM_TYPE);

      // start the first membership manager
      try {
        System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
        m1 = createMembershipManager(config, transport).getLeft();
      } finally {
        System.getProperties().remove(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY);
      }

      // start the second membership manager
      final Pair<MembershipManager, MessageListener> pair =
          createMembershipManager(config, transport);
      m2 = pair.getLeft();
      final MessageListener listener2 = pair.getRight();

      // we have to check the views with JoinLeave because the membership
      // manager queues new views for processing through the DM listener,
      // which is a mock object in this test
      System.out.println("waiting for views to stabilize");
      JoinLeave jl1 = ((GMSMembershipManager) m1).getServices().getJoinLeave();
      JoinLeave jl2 = ((GMSMembershipManager) m2).getServices().getJoinLeave();
      long giveUp = System.currentTimeMillis() + 15000;
      for (;;) {
        try {
          assertTrue("view = " + jl2.getView(), jl2.getView().size() == 2);
          assertTrue("view = " + jl1.getView(), jl1.getView().size() == 2);
          assertTrue(jl1.getView().getCreator().equals(jl2.getView().getCreator()));
          assertTrue(jl1.getView().getViewId() == jl2.getView().getViewId());
          break;
        } catch (AssertionError e) {
          if (System.currentTimeMillis() > giveUp) {
            throw e;
          }
        }
      }

      System.out.println("testing multicast availability");
      assertTrue(m1.testMulticast());

      System.out.println("multicasting SerialAckedMessage from m1 to m2");
      SerialAckedMessage msg = new SerialAckedMessage();
      msg.setRecipient(m2.getLocalMember());
      msg.setMulticast(true);
      m1.send(new InternalDistributedMember[] {m2.getLocalMember()}, msg);
      giveUp = System.currentTimeMillis() + 5000;
      boolean verified = false;
      Throwable problem = null;
      while (giveUp > System.currentTimeMillis()) {
        try {
          verify(listener2).messageReceived(isA(SerialAckedMessage.class));
          verified = true;
          break;
        } catch (Error e) {
          problem = e;
          Thread.sleep(500);
        }
      }
      if (!verified) {
        if (problem != null) {
          problem.printStackTrace();
        }
        fail("Expected a multicast message to be received");
      }

      // let the managers idle for a while and get used to each other
      Thread.sleep(4000l);

      m2.disconnect(false);
      assertTrue(!m2.isConnected());

      assertTrue(m1.getView().size() == 1);

    } finally {

      if (m2 != null) {
        m2.disconnect(false);
      }
      if (m1 != null) {
        m1.disconnect(false);
      }
      if (l != null) {
        l.stop();
      }
    }
  }

  @Test
  public void testJoinTimeoutSetting() throws Exception {
    long timeout = 30000;
    Properties nonDefault = new Properties();
    nonDefault.put(MEMBER_TIMEOUT, "" + timeout);
    DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
    RemoteTransportConfig transport =
        new RemoteTransportConfig(config, ClusterDistributionManager.NORMAL_DM_TYPE);
    MembershipConfig sc = new ServiceConfig(transport, config);
    assertEquals(2 * timeout + MembershipConfig.MEMBER_REQUEST_COLLECTION_INTERVAL,
        sc.getJoinTimeout());

    nonDefault.clear();
    config = new DistributionConfigImpl(nonDefault);
    transport = new RemoteTransportConfig(config, ClusterDistributionManager.NORMAL_DM_TYPE);
    sc = new ServiceConfig(transport, config);
    assertEquals(24000, sc.getJoinTimeout());

    nonDefault.clear();
    nonDefault.put(LOCATORS, SocketCreator.getLocalHost().getHostAddress() + "[" + 12345 + "]");
    config = new DistributionConfigImpl(nonDefault);
    transport = new RemoteTransportConfig(config, ClusterDistributionManager.NORMAL_DM_TYPE);
    sc = new ServiceConfig(transport, config);
    assertEquals(60000, sc.getJoinTimeout());

    timeout = 2000;
    System.setProperty("p2p.joinTimeout", "" + timeout);
    try {
      config = new DistributionConfigImpl(nonDefault);
      transport = new RemoteTransportConfig(config, ClusterDistributionManager.NORMAL_DM_TYPE);
      sc = new ServiceConfig(transport, config);
      assertEquals(timeout, sc.getJoinTimeout());
    } finally {
      System.getProperties().remove("p2p.joinTimeout");
    }

  }

  @Test
  public void testMulticastDiscoveryNotAllowed() {
    Properties nonDefault = new Properties();
    nonDefault.put(DISABLE_TCP, "true");
    nonDefault.put(MCAST_PORT, "12345");
    nonDefault.put(LOG_FILE, "");
    nonDefault.put(LOG_LEVEL, "fine");
    nonDefault.put(LOCATORS, "");
    DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
    RemoteTransportConfig transport =
        new RemoteTransportConfig(config, ClusterDistributionManager.NORMAL_DM_TYPE);

    MembershipConfig membershipConfig = new ServiceConfig(transport, config);

    Services services = mock(Services.class);
    when(services.getConfig()).thenReturn(membershipConfig);

    GMSJoinLeave joinLeave = new GMSJoinLeave();
    try {
      joinLeave.init(services);
      throw new Error(
          "expected a GemFireConfigException to be thrown because no locators are configured");
    } catch (GemFireConfigException e) {
      // expected
    }
  }
}
