/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.execute;

import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.UseParametersRunnerFactory;

import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.Locator;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.functions.TestFunction;
import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.ClientServerTest;
import org.apache.geode.test.junit.categories.FunctionServiceTest;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;

@Category({ClientServerTest.class, FunctionServiceTest.class})
@RunWith(Parameterized.class)
@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
public class PRClientServerRegionFunctionExecutionFailoverDUnitTest extends PRClientServerTestBase {

  private static final Logger logger = LogService.getLogger();

  private static Locator locator = null;

  private static Region<Integer, Object> region = null;

  @Override
  protected void postSetUpPRClientServerTestBase() {
    IgnoredException.addIgnoredException("Connection reset");
    IgnoredException.addIgnoredException("SocketTimeoutException");
    IgnoredException.addIgnoredException("ServerConnectivityException");
    IgnoredException.addIgnoredException("Socket Closed");
  }

  @Test
  public void testserverMultiKeyExecution_SocektTimeOut() {
    createScenario();
    Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
    registerFunctionAtServer(function);
    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
        .serverMultiKeyExecutionSocketTimeOut(Boolean.TRUE));
  }

  /*
   * Ensure that the while executing the function if the servers is down then the execution is
   * failover to other available server
   */
  @Test
  public void testServerFailoverWithTwoServerAliveHA() {
    IgnoredException.addIgnoredException("FunctionInvocationTargetException");
    ArrayList commonAttributes =
        createCommonServerAttributes("TestPartitionedRegion", null, 1, null);
    createClientServerScenarion(commonAttributes, 20, 20, 20);
    Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA);
    registerFunctionAtServer(function);
    server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::stopServerHA);
    server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::stopServerHA);
    client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::putOperation);
    int AsyncInvocationArrSize = 1;
    AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize];
    async[0] = client
        .invokeAsync(PRClientServerRegionFunctionExecutionDUnitTest::executeFunctionHA);
    server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::startServerHA);
    server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::startServerHA);
    server1.invoke(PRClientServerRegionFunctionExecutionDUnitTest::stopServerHA);
    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
        .verifyDeadAndLiveServers(2));
    ThreadUtils.join(async[0], 6 * 60 * 1000);
    if (async[0].getException() != null) {
      Assert.fail("UnExpected Exception Occurred : ", async[0].getException());
    }
    List l = (List) async[0].getReturnValue();
    assertEquals(2, l.size());
  }

  /*
   * Ensure that the while executing the function if the servers is down then the execution is
   * failover to other available server
   */
  @Test
  public void testServerCacheClosedFailoverWithTwoServerAliveHA() {
    IgnoredException.addIgnoredException("FunctionInvocationTargetException");
    ArrayList commonAttributes =
        createCommonServerAttributes("TestPartitionedRegion", null, 1, null);
    createClientServerScenarion(commonAttributes, 20, 20, 20);
    Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA);
    registerFunctionAtServer(function);
    server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::stopServerHA);
    server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::stopServerHA);
    client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::putOperation);
    int AsyncInvocationArrSize = 1;
    AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize];
    async[0] = client
        .invokeAsync(PRClientServerRegionFunctionExecutionDUnitTest::executeFunctionHA);
    server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::startServerHA);
    server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::startServerHA);
    server1.invoke(PRClientServerRegionFunctionExecutionDUnitTest::closeCacheHA);
    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
        .verifyDeadAndLiveServers(2));
    ThreadUtils.join(async[0], 5 * 60 * 1000);
    if (async[0].getException() != null) {
      Assert.fail("UnExpected Exception Occurred : ", async[0].getException());
    }
    List l = (List) async[0].getReturnValue();
    assertEquals(2, l.size());
  }

  @Test
  public void testBug40714() {
    createScenario();
    server1.invoke(
        (SerializableRunnableIF) PRClientServerRegionFunctionExecutionDUnitTest::registerFunction);
    server1.invoke(
        (SerializableRunnableIF) PRClientServerRegionFunctionExecutionDUnitTest::registerFunction);
    server1.invoke(
        (SerializableRunnableIF) PRClientServerRegionFunctionExecutionDUnitTest::registerFunction);
    client.invoke(
        (SerializableRunnableIF) PRClientServerRegionFunctionExecutionDUnitTest::registerFunction);
    client.invoke(
        PRClientServerRegionFunctionExecutionDUnitTest::FunctionExecution_Inline_Bug40714);
  }

  @Test
  public void testOnRegionFailoverWithTwoServerDownHA() {
    IgnoredException.addIgnoredException("FunctionInvocationTargetException");
    createScenario();

    server1.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createProxyRegion);

    Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA_REGION);
    registerFunctionAtServer(function);

    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
        .regionExecutionHATwoServerDown(Boolean.FALSE, function, Boolean.FALSE));

    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest.verifyMetaData(2, 1));
  }

  // retry attempts is 2
  @Test
  public void testOnRegionFailoverWithOneServerDownHA() {
    IgnoredException.addIgnoredException("FunctionInvocationTargetException");
    createScenario();

    server1
        .invokeAsync(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    server2
        .invokeAsync(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    server3
        .invokeAsync(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createProxyRegion);

    Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA_REGION);
    registerFunctionAtServer(function);

    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
        .regionExecutionHAOneServerDown(Boolean.FALSE, function, Boolean.FALSE));

    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
        .verifyMetaData(1, 1));
  }

  /*
   * Ensure that the while executing the function if the servers are down then the execution
   * shouldn't failover to other available server
   */
  @Test
  public void testOnRegionFailoverNonHA() {
    createScenario();
    IgnoredException.addIgnoredException("FunctionInvocationTargetException");
    server1.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createProxyRegion);

    Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_NONHA_REGION);
    registerFunctionAtServer(function);

    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
        .regionSingleKeyExecutionNonHA(Boolean.FALSE, function, Boolean.FALSE));
    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
        .verifyMetaData(1, 0));
  }

  /*
   * Ensure that the while executing the function if the servers are down then the execution
   * shouldn't failover to other available server
   */
  @Test
  public void testOnRegionFailoverNonHASingleHop() {
    ArrayList commonAttributes =
        createCommonServerAttributes("TestPartitionedRegion", null, 0, null);
    createClientServerScenarioSingleHop(commonAttributes, 20, 20, 20);

    server1.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);

    client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createProxyRegion);

    // Make sure the buckets are created.
    client.invoke(new SerializableRunnable() {
      @Override
      public void run() {
        region = cache.getRegion(PRClientServerTestBase.PartitionedRegionName);
        for (int i = 0; i < 13; i++) {
          region.put(i, i);
        }
      }
    });

    // Make sure the client metadata is up to date.
    client.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::fetchMetaData);

    Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_NONHA_REGION);
    registerFunctionAtServer(function);
    final Function function2 = new TestFunction(true, TestFunction.TEST_FUNCTION_NONHA_NOP);
    registerFunctionAtServer(function2);

    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
        .regionSingleKeyExecutionNonHA(Boolean.FALSE, function, Boolean.FALSE));


    // This validation doesn't work because the client may
    // still be asynchronously recording the departure of the
    // failed server
    // System.err.println("Trying the second function");
    // //Make sure the client can now execute a function
    // //on the server
    // client.invoke(new SerializableRunnable() {
    // @Override
    // public void run() {
    // ResultCollector rs = FunctionService.onRegion(region).execute(function2);
    // rs.getResult();
    // }
    // });

    client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
        .verifyMetaData(1, 0));
  }

  @Test
  public void testServerBucketMovedException() {

    IgnoredException.addIgnoredException("BucketMovedException");
    final Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);
    VM server4 = host.getVM(3);

    disconnectAllFromDS();

    ArrayList commonAttributes =
        createCommonServerAttributes("TestPartitionedRegion", null, 1, null);

    final int portLocator = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
    final String hostLocator = NetworkUtils.getServerHostName(server1.getHost());
    final String locator = hostLocator + "[" + portLocator + "]";

    startLocatorInVM(portLocator);
    try {

      server1.invoke(() -> createServerWithLocator(locator, false, commonAttributes));

      server2.invoke(() -> createServerWithLocator(locator, false, commonAttributes));

      server4.invoke(() -> createClientWithLocator(hostLocator, portLocator));
      server4.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::putIntoRegion);

      server4.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::fetchMetaData);

      server3.invoke(() -> createServerWithLocator(locator, false, commonAttributes));

      Object result = server4
          .invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::executeFunction);
      List l = (List) result;
      assertEquals(2, l.size());

    } finally {
      stopLocator();
    }
  }

  @Test
  public void testServerBucketMovedException_LocalServer() {
    IgnoredException.addIgnoredException("BucketMovedException");

    final Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server4 = host.getVM(3);

    ArrayList commonAttributes =
        createCommonServerAttributes("TestPartitionedRegion", null, 0, null);

    final int portLocator = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
    final String hostLocator = NetworkUtils.getServerHostName(server1.getHost());
    final String locator = hostLocator + "[" + portLocator + "]";

    startLocatorInVM(portLocator);
    try {

      server1.invoke(() -> createServerWithLocator(locator, false, commonAttributes));

      server4.invoke(() -> createClientWithLocator(hostLocator, portLocator));
      server4.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::putIntoRegion);

      server4.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::fetchMetaData);

      server2.invoke(() -> createServerWithLocator(locator, false, commonAttributes));

      Object result = server4
          .invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::executeFunction);
      List l = (List) result;
      assertEquals(2, l.size());

    } finally {
      stopLocator();
    }
  }

  private static void fetchMetaData() {
    ((GemFireCacheImpl) cache).getClientMetadataService().getClientPRMetadata((LocalRegion) region);
  }

  private void startLocatorInVM(final int locatorPort) {

    File logFile = new File("locator-" + locatorPort + ".log");

    Properties props = new Properties();
    props = DistributedTestUtils.getAllDistributedSystemProperties(props);
    props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");

    try {
      locator = Locator.startLocatorAndDS(locatorPort, logFile, null, props);
    } catch (IOException e) {
      Assert.fail("Unable to start locator ", e);
    }
  }

  public static void stopLocator() {
    locator.stop();
  }

  private int createServerWithLocator(String locator, boolean isAccessor, ArrayList commonAttrs) {
    Properties props = new Properties();
    props.setProperty(LOCATORS, locator);
    DistributedSystem ds = getSystem(props);
    cache = CacheFactory.create(ds);

    CacheServer server = cache.addCacheServer();
    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
    server.setPort(port);
    server.setHostnameForClients("localhost");
    try {
      server.start();
    } catch (IOException e) {
      Assert.fail("Failed to start server ", e);
    }
    PartitionAttributesFactory paf = new PartitionAttributesFactory();
    if (isAccessor) {
      paf.setLocalMaxMemory(0);
    }
    paf.setTotalNumBuckets((Integer) commonAttrs.get(3))
        .setRedundantCopies((Integer) commonAttrs.get(2));


    AttributesFactory attr = new AttributesFactory();
    attr.setPartitionAttributes(paf.create());
    region = cache.createRegion(regionName, attr.create());
    assertNotNull(region);
    logger
        .info("Partitioned Region " + regionName + " created Successfully :" + region.toString());
    return port;
  }

  private void createClientWithLocator(String host, int port0) {
    Properties props = new Properties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "");
    DistributedSystem ds = getSystem(props);
    cache = CacheFactory.create(ds);
    assertNotNull(cache);
    CacheServerTestUtil.disableShufflingOfEndpoints();
    Pool p;
    try {
      p = PoolManager.createFactory().addLocator(host, port0).setPingInterval(250)
          .setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(2000)
          .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(3)
          .create("Pool_" + regionName);
    } finally {
      CacheServerTestUtil.enableShufflingOfEndpoints();
    }
    AttributesFactory factory = new AttributesFactory();
    factory.setPoolName(p.getName());
    factory.setDataPolicy(DataPolicy.EMPTY);
    RegionAttributes attrs = factory.create();
    region = cache.createRegion(regionName, attrs);
    assertNotNull(region);
    logger
        .info("Distributed Region " + regionName + " created Successfully :" + region.toString());
  }

  public static void putIntoRegion() {
    for (int i = 0; i < 113; i++) {
      region.put(i, "KB_" + i);
    }
    logger
        .info("Distributed Region " + regionName + " Have size :" + region.size());
  }

  public static Object executeFunction() {
    Execution execute = FunctionService.onRegion(region);
    ResultCollector rc = execute.setArguments(Boolean.TRUE)
        .execute(new TestFunction(true, TestFunction.TEST_FUNCTION_LASTRESULT));
    logger.info("Exeuction Result :" + rc.getResult());
    List l = ((List) rc.getResult());
    return l;
  }

  protected void createScenario() {
    ArrayList commonAttributes =
        createCommonServerAttributes("TestPartitionedRegion", null, 0, null);
    createClientServerScenarion(commonAttributes, 20, 20, 20);
  }


}
