/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.test.junit.rules;

import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_BIND_ADDRESS;
import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.NAME;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_MANAGER;
import static org.apache.geode.management.internal.ManagementConstants.OBJECTNAME__CLIENTSERVICE_MXBEAN;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;

import java.io.File;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.management.ObjectName;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.assertj.core.api.Assertions;
import org.awaitility.core.ConditionTimeoutException;

import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.ssl.CertStores;
import org.apache.geode.cache.ssl.CertificateBuilder;
import org.apache.geode.cache.ssl.CertificateMaterial;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
import org.apache.geode.internal.UniquePortSupplier;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.management.CacheServerMXBean;
import org.apache.geode.management.DistributedRegionMXBean;
import org.apache.geode.management.DistributedSystemMXBean;
import org.apache.geode.management.ManagementService;
import org.apache.geode.management.internal.MBeanJMXAdapter;
import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.management.internal.cli.CliUtils;
import org.apache.geode.pdx.internal.TypeRegistry;
import org.apache.geode.security.SecurityManager;
import org.apache.geode.security.templates.UserPasswordAuthInit;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.junit.rules.accessible.AccessibleRestoreSystemProperties;
import org.apache.geode.test.junit.rules.serializable.SerializableExternalResource;

/**
 * the abstract class that's used by LocatorStarterRule and ServerStarterRule to avoid code
 * duplication.
 *
 * The rule will try to clean up the working dir as best as it can. Any first level children
 * created in the test will be cleaned up after the test.
 */
public abstract class MemberStarterRule<T> extends SerializableExternalResource implements Member {
  private final int availableJmxPort;
  private final int availableHttpPort;

  protected int memberPort;
  protected int jmxPort = -1; // -1 means not start on servere, 0 means use default, >0 means preset
  protected int httpPort = -1; // -1 means do not start, 0 means use default, >0 means pre-assigned

  protected String name;
  protected boolean logFile = false;
  protected Properties properties = new Properties();
  protected Properties systemProperties = new Properties();

  protected boolean autoStart = false;

  private List<File> firstLevelChildrenFile = new ArrayList<>();
  private boolean cleanWorkingDir = true;

  public static void setWaitUntilTimeout(int waitUntilTimeout) {
    WAIT_UNTIL_TIMEOUT = waitUntilTimeout;
  }

  private static int WAIT_UNTIL_TIMEOUT = 30;

  private final AccessibleRestoreSystemProperties restore = new AccessibleRestoreSystemProperties();

  public MemberStarterRule() {
    this(new UniquePortSupplier());
  }

  public MemberStarterRule(UniquePortSupplier portSupplier) {
    availableJmxPort = portSupplier.getAvailablePort();
    availableHttpPort = portSupplier.getAvailablePort();
    memberPort = portSupplier.getAvailablePort();

    // initial values
    properties.setProperty(MCAST_PORT, "0");
    properties.setProperty(LOCATORS, "");
    properties.setProperty(MAX_WAIT_TIME_RECONNECT, "5000");
  }

  @Override
  public void before() {
    try {
      restore.before();
    } catch (Throwable throwable) {
      throw new RuntimeException(throwable.getMessage(), throwable);
    }
    normalizeProperties();
    if (httpPort < 0) {
      // at this point, httpPort is not being configured by api, we assume they do not
      // want to start the http service.
      // use putIfAbsent if it was configured using withProperty
      properties.putIfAbsent(HTTP_SERVICE_PORT, "0");
    }
    firstLevelChildrenFile = Arrays.asList(getWorkingDir().listFiles());

    for (String key : systemProperties.stringPropertyNames()) {
      System.setProperty(key, systemProperties.getProperty(key));
    }
  }

  @Override
  public void after() {
    restore.after();
    // invoke stop() first and then ds.disconnect
    stopMember();

    disconnectDSIfAny();
    // this will clean up the SocketCreators created in this VM so that it won't contaminate
    // future tests
    SocketCreatorFactory.close();

    // This is required if PDX is in use and tests are run repeatedly.
    TypeRegistry.init();

    // delete the first-level children files that are created in the tests
    if (cleanWorkingDir) {
      Arrays.stream(getWorkingDir().listFiles())
          // do not delete the pre-existing files
          .filter(f -> !firstLevelChildrenFile.contains(f))
          // do not delete the dunit folder that might have been created by dunit launcher
          .filter(f -> !(f.isDirectory() && f.getName().equals("dunit")))
          .forEach(FileUtils::deleteQuietly);
    }
  }

  public T withPort(int memberPort) {
    this.memberPort = memberPort;
    return (T) this;
  }

  /*
   * All the logs are written in the logfile instead on the console. this is usually used with
   * withWorkingDir so that logs are accessible and will be cleaned up afterwards.
   */
  public T withLogFile() {
    logFile = true;
    return (T) this;
  }

  public static void disconnectDSIfAny() {
    DistributedSystem ds = InternalDistributedSystem.getConnectedInstance();
    if (ds != null) {
      ds.disconnect();
    }
  }

  public T withSystemProperty(String key, String value) {
    systemProperties.put(key, value);
    return (T) this;
  }

  public T withProperty(String key, String value) {
    properties.setProperty(key, value);
    return (T) this;
  }

  public T withProperties(Properties props) {
    if (props != null) {
      properties.putAll(props);
    }
    return (T) this;
  }

  public T withSSL(String components, boolean requireAuth,
      boolean endPointIdentification) {
    Properties sslProps = getSSLProperties(components, requireAuth, endPointIdentification);
    properties.putAll(sslProps);
    return (T) this;
  }

  public static Properties getSSLProperties(String components, boolean requireAuth,
      boolean endPointIdentification) {
    CertificateMaterial ca = new CertificateBuilder()
        .commonName("Test CA")
        .isCA()
        .generate();

    CertificateMaterial memberMaterial = new CertificateBuilder()
        .commonName("member")
        .issuedBy(ca)
        .generate();

    CertStores memberStore = new CertStores("member");
    memberStore.withCertificate("member", memberMaterial);
    memberStore.trust("ca", ca);

    try {
      return memberStore.propertiesWith(components, requireAuth, endPointIdentification);
    } catch (Exception e) {
      throw new RuntimeException(e.getMessage(), e);
    }
  }

  public T withSecurityManager(Class<? extends SecurityManager> securityManager) {
    properties.setProperty(SECURITY_MANAGER, securityManager.getName());
    return (T) this;
  }

  public T withCredential(String username, String password) {
    properties.setProperty(UserPasswordAuthInit.USER_NAME, username);
    properties.setProperty(UserPasswordAuthInit.PASSWORD, password);
    return (T) this;
  }

  public T withAutoStart() {
    autoStart = true;
    return (T) this;
  }

  public T withName(String name) {
    // only if name is not defined yet
    if (!properties.containsKey(NAME)) {
      this.name = name;
      properties.putIfAbsent(NAME, name);
    }
    return (T) this;
  }

  public T withConnectionToLocator(int... locatorPorts) {
    if (locatorPorts.length == 0) {
      return (T) this;
    }
    String locators = Arrays.stream(locatorPorts).mapToObj(i -> "localhost[" + i + "]")
        .collect(Collectors.joining(","));
    properties.setProperty(LOCATORS, locators);
    return (T) this;
  }

  /*
   * be able to start JMX manager and admin rest on default ports
   */
  public T withJMXManager(boolean useProductDefaultPorts) {
    if (!useProductDefaultPorts) {
      // do no override these properties if already exists
      properties.putIfAbsent(JMX_MANAGER_PORT, availableJmxPort + "");
      jmxPort = Integer.parseInt(properties.getProperty(JMX_MANAGER_PORT));
    } else {
      // the real port numbers will be set after we started the server/locator.
      jmxPort = 0;
    }
    properties.putIfAbsent(JMX_MANAGER, "true");
    properties.putIfAbsent(JMX_MANAGER_START, "true");
    return (T) this;
  }

  public T withHttpService(boolean useDefaultPort) {
    properties.setProperty(HTTP_SERVICE_BIND_ADDRESS, "localhost");
    if (!useDefaultPort) {
      properties.put(HTTP_SERVICE_PORT, availableHttpPort + "");
      httpPort = Integer.parseInt(properties.getProperty(HTTP_SERVICE_PORT));
    } else {
      // indicate start http service but with default port
      // (different from Gemfire properties, 0 means do not start http service)
      httpPort = 0;
    }
    return (T) this;
  }

  public void setCleanWorkingDir(boolean cleanWorkingDir) {
    this.cleanWorkingDir = cleanWorkingDir;
  }

  /*
   * start the jmx manager and admin rest on a random ports
   */
  public T withJMXManager() {
    return withJMXManager(false);
  }

  public T withHttpService() {
    return withHttpService(false);
  }


  protected void normalizeProperties() {
    // if name is set via property, not with API
    if (name == null) {
      if (properties.containsKey(NAME)) {
        name = properties.getProperty(NAME);
      } else {
        if (this instanceof ServerStarterRule) {
          name = "server";
        } else {
          name = "locator";
        }
      }
      withName(name);
    }

    // if jmxPort is set via property, not with API
    if (jmxPort < 0 && properties.containsKey(JMX_MANAGER_PORT)) {
      // this will make sure we have all the missing properties, but it won't override
      // the existing properties
      withJMXManager(false);
    }

    // if caller wants the logs being put into a file instead of in console output
    // do it here since only here, we can guarantee the name is present
    if (logFile) {
      properties.putIfAbsent(LOG_FILE, new File(name + ".log").getAbsolutePath());
    }
  }

  public DistributedRegionMXBean getRegionMBean(String regionName) {
    return getManagementService().getDistributedRegionMXBean(regionName);
  }

  public ManagementService getManagementService() {
    ManagementService managementService =
        ManagementService.getExistingManagementService(getCache());
    if (managementService == null) {
      throw new IllegalStateException("Management service is not available on this member");
    }
    return managementService;
  }

  public abstract InternalCache getCache();

  public void waitUntilRegionIsReadyOnExactlyThisManyServers(String regionName,
      int exactServerCount) throws Exception {
    if (exactServerCount == 0) {
      waitUntilEqual(
          () -> getRegionMBean(regionName),
          Objects::isNull,
          true,
          String.format("Expecting to not find an mbean for region '%s'", regionName),
          WAIT_UNTIL_TIMEOUT, TimeUnit.SECONDS);
      return;
    }
    // First wait until the region mbean is not null...
    waitUntilEqual(
        () -> getRegionMBean(regionName),
        Objects::nonNull,
        true,
        String.format("Expecting to find an mbean for region '%s'", regionName),
        WAIT_UNTIL_TIMEOUT, TimeUnit.SECONDS);

    // Now actually wait for the members to receive the region
    String assertionConditionDescription = String.format(
        "Expecting region '%s' to be found on exactly %d servers", regionName, exactServerCount);
    waitUntilSatisfied(
        () -> Arrays.asList(getRegionMBean(regionName).getMembers()),
        Function.identity(),
        members -> Assertions.assertThat(members).isNotNull().hasSize(exactServerCount),
        assertionConditionDescription,
        WAIT_UNTIL_TIMEOUT, TimeUnit.SECONDS);
  }

  public void waitTillClientsAreReadyOnServer(String serverName, int serverPort, int clientCount) {
    waitTillCacheServerIsReady(serverName, serverPort);
    CacheServerMXBean bean = getCacheServerMXBean(serverName, serverPort);
    await().until(() -> bean.getClientIds().length == clientCount);
  }

  /*
   * Invoked in serverVM
   */

  /**
   * convenience method to create a region with customized regionFactory
   *
   * @param <K> the type of keys in the region
   * @param <V> the ype of values in the region
   * @param type the {@link RegionShortcut} to use to create the region
   * @param name the name of the region to create
   * @param regionFactoryConsumer a lamda that allows you to customize the regionFactory
   * @return the newly created region
   */
  public <K, V> Region<K, V> createRegion(RegionShortcut type, String name,
      Consumer<RegionFactory<K, V>> regionFactoryConsumer) {
    RegionFactory<K, V> regionFactory = getCache().createRegionFactory(type);
    regionFactoryConsumer.accept(regionFactory);
    return regionFactory.create(name);
  }

  public <K, V> Region<K, V> createRegion(RegionShortcut type, String name) {
    final RegionFactory<K, V> regionFactory = getCache().createRegionFactory(type);
    return regionFactory.create(name);
  }

  /**
   * convenience method to create a partition region with customized regionFactory and a customized
   * PartitionAttributeFactory
   *
   * @param name the name of the region to create
   * @param regionFactoryConsumer a lamda that allows you to customize the regionFactory
   * @param attributesFactoryConsumer a lamda that allows you to customize the
   *        partitionAttributeFactory
   * @return a Partitioned Region
   */
  public Region createPartitionRegion(String name, Consumer<RegionFactory> regionFactoryConsumer,
      Consumer<PartitionAttributesFactory> attributesFactoryConsumer) {
    return createRegion(RegionShortcut.PARTITION, name, rf -> {
      regionFactoryConsumer.accept(rf);
      PartitionAttributesFactory attributeFactory = new PartitionAttributesFactory();
      attributesFactoryConsumer.accept(attributeFactory);
      rf.setPartitionAttributes(attributeFactory.create());
    });
  }

  public void waitTillCacheClientProxyHasBeenPaused() {
    await().until(() -> {
      CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
      Collection<CacheClientProxy> clientProxies = clientNotifier.getClientProxies();

      for (CacheClientProxy clientProxy : clientProxies) {
        if (clientProxy.isPaused()) {
          return true;
        }
      }
      return false;
    });
  }

  public void waitTillCacheServerIsReady(String serverName, int serverPort) {
    await()
        .until(() -> getCacheServerMXBean(serverName, serverPort) != null);
  }

  public CacheServerMXBean getCacheServerMXBean(String serverName, int serverPort) {
    SystemManagementService managementService = (SystemManagementService) getManagementService();
    String objectName = MessageFormat.format(OBJECTNAME__CLIENTSERVICE_MXBEAN,
        String.valueOf(serverPort), serverName);
    ObjectName cacheServerMBeanName = MBeanJMXAdapter.getObjectName(objectName);
    return managementService.getMBeanProxy(cacheServerMBeanName, CacheServerMXBean.class);
  }

  public void waitUntilDiskStoreIsReadyOnExactlyThisManyServers(String diskStoreName,
      int exactServerCount) throws Exception {
    final Supplier<DistributedSystemMXBean> distributedSystemMXBeanSupplier =
        () -> getManagementService().getDistributedSystemMXBean();

    waitUntilSatisfied(distributedSystemMXBeanSupplier,
        Function.identity(),
        bean -> assertThat(bean, notNullValue()),
        "Distributed System MXBean should not be null",
        WAIT_UNTIL_TIMEOUT, TimeUnit.SECONDS);

    DistributedSystemMXBean dsMXBean = distributedSystemMXBeanSupplier.get();

    String predicateDescription = String.format(
        "Expecting exactly %d servers to present mbeans for a disk store with name %s.",
        exactServerCount, diskStoreName);
    Supplier<List<String[]>> diskStoreSupplier = () -> dsMXBean.listMemberDiskstore()
        .values().stream().filter(x1 -> ArrayUtils.contains(x1, diskStoreName))
        .collect(Collectors.toList());

    waitUntilEqual(diskStoreSupplier,
        List::size,
        exactServerCount,
        predicateDescription,
        WAIT_UNTIL_TIMEOUT, TimeUnit.SECONDS);
  }

  public void waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers(String queueId,
      int exactServerCount)
      throws Exception {
    String examinerDescription = String.format(
        "Expecting exactly %d servers to have an AEQ with id '%s'.", exactServerCount, queueId);
    waitUntilEqual(
        () -> CliUtils.getMembersWithAsyncEventQueue(getCache(), queueId),
        Set::size,
        exactServerCount,
        examinerDescription,
        WAIT_UNTIL_TIMEOUT, TimeUnit.SECONDS);
  }

  /**
   * This method wraps an {@link GeodeAwaitility#await()} call for more meaningful error
   * reporting.
   *
   * @param <K> the type of results supplied by {@code supplier}
   * @param <J> the type of results returned by {@code examiner}
   * @param supplier Method to retrieve the result to be tested, e.g.,
   *        get a list of visible region mbeans
   * @param examiner Method to evaluate the result provided by {@code provider}, e.g.,
   *        get the length of the provided list.
   *        Use {@link java.util.function.Function#identity()} if {@code assertionConsumer}
   *        directly tests the value provided by {@code supplier}.
   * @param assertionConsumer assertThat styled condition on the output of {@code examiner} against
   *        which
   *        the {@code await().untilAsserted(...)} will be called. E.g.,
   *        {@code beanCount -> assertThat(beanCount, is(5))}
   * @param assertionConsumerDescription A description of the {@code assertionConsumer} method,
   *        for additional failure information should this call time out.
   *        E.g., "Visible region mbean count should be 5"
   * @param timeout With {@code unit}, the maximum time to wait before raising an exception.
   * @param unit With {@code timeout}, the maximum time to wait before raising an exception.
   * @throws org.awaitility.core.ConditionTimeoutException The timeout has been reached
   * @throws Exception Any exception produced by {@code provider.call()}
   */
  public <K, J> void waitUntilSatisfied(Supplier<K> supplier, Function<K, J> examiner,
      Consumer<J> assertionConsumer, String assertionConsumerDescription, long timeout,
      TimeUnit unit)
      throws Exception {
    try {
      await(assertionConsumerDescription)
          .atMost(timeout, unit)
          .untilAsserted(() -> assertionConsumer.accept(examiner.apply(supplier.get())));
    } catch (ConditionTimeoutException e) {
      // There is a very slight race condition here, where the above could conceivably time out,
      // and become satisfied before the next supplier.get()
      throw new ConditionTimeoutException(
          "The observed result '" + supplier.get()
              + "' does not satisfy the provided assertionConsumer. \n" + e.getMessage());
    }
  }

  /**
   * Convenience alias for {@link #waitUntilSatisfied},
   * requiring equality rather than a generic assertion.
   *
   * @param <K> the type of results supplied by {@code provider}
   * @param <J> the type of results returned by {@code examiner}
   * @param provider Method to retrieve the result to be tested, e.g.,
   *        get a list of visible region mbeans
   * @param examiner Method to evaluate the result provided by {@code provider}, e.g.,
   *        get the length of the provided list.
   *        Use {@link java.util.function.Function#identity()} if {@code assertionConsumer}
   *        directly tests the value provided by {@code supplier}.
   * @param expectation assertThat styled condition on the output of {@code examiner} against
   *        which
   *        the {@code await().untilAsserted(...)} will be called. E.g.,
   *        {@code beanCount -> assertThat(beanCount, is(5))}
   * @param expectationDesription A description of the {@code assertionConsumer} method,
   *        for additional failure information should this call time out.
   *        E.g., "Visible region mbean count should be 5"
   * @param timeout With {@code unit}, the maximum time to wait before raising an exception.
   * @param unit With {@code timeout}, the maximum time to wait before raising an exception.
   * @throws org.awaitility.core.ConditionTimeoutException The timeout has been reached
   * @throws Exception Any exception produced by {@code provider.call()}
   */
  public <K, J> void waitUntilEqual(Supplier<K> provider,
      Function<K, J> examiner,
      J expectation,
      String expectationDesription,
      long timeout, TimeUnit unit)
      throws Exception {
    Consumer<J> assertionConsumer = examined -> assertThat(examined, is(expectation));
    waitUntilSatisfied(provider, examiner, assertionConsumer, expectationDesription, timeout, unit);
  }

  abstract void stopMember();

  public void forceDisconnectMember() {
    MembershipManagerHelper
        .crashDistributedSystem(InternalDistributedSystem.getConnectedInstance());
  }

  public abstract void waitTilFullyReconnected();

  @Override
  public File getWorkingDir() {
    return new File(System.getProperty("user.dir"));
  }

  @Override
  public int getPort() {
    return memberPort;
  }

  @Override
  public int getJmxPort() {
    return jmxPort;
  }

  @Override
  public int getHttpPort() {
    return httpPort;
  }

  @Override
  public String getName() {
    return name;
  }
}
