/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.execute;

import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.runners.Parameterized.UseParametersRunnerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionAdapter;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.cache.PartitionAttributesImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionTestHelper;
import org.apache.geode.internal.cache.functions.TestFunction;
import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableCallableIF;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.junit.categories.FunctionServiceTest;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;

/**
 * This is a DUnit test that verifies the Function Execution stats under various scenarios:
 * Client-Server with a Region/without a Region, P2P with a Partitioned Region/Distributed Region,
 * and member Execution.
 */
@Category({FunctionServiceTest.class})
@RunWith(Parameterized.class)
@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
public class FunctionServiceStatsDUnitTest extends PRClientServerTestBase {

  private static final Logger logger = LogService.getLogger();

  // NOTE(review): the test lambdas visible in this part of the file declare their own local
  // `ds` variables; confirm whether this static field is still used elsewhere in the class.
  static InternalDistributedSystem ds = null;

  // Expected-value counters. They are static, so each DUnit VM holds its own copy. Tests reset
  // them through initializeStats, update them as functions are executed, and then compare them
  // with the values reported by FunctionServiceStats / FunctionStats in that same VM.

  // Expected totals across all functions.
  private static int noOfExecutionCalls_Aggregate = 0;
  private static int noOfExecutionsCompleted_Aggregate = 0;
  private static int resultReceived_Aggregate = 0;

  // Expected values for TestFunction.TEST_FUNCTION1.
  private static int noOfExecutionCalls_TESTFUNCTION1 = 0;
  private static int noOfExecutionsCompleted_TESTFUNCTION1 = 0;
  private static int resultReceived_TESTFUNCTION1 = 0;

  // Expected values for TestFunction.TEST_FUNCTION2.
  private static int noOfExecutionCalls_TESTFUNCTION2 = 0;
  private static int noOfExecutionsCompleted_TESTFUNCTION2 = 0;
  private static int resultReceived_TESTFUNCTION2 = 0;

  // Expected values for TestFunction.TEST_FUNCTION3.
  private static int noOfExecutionCalls_TESTFUNCTION3 = 0;
  private static int noOfExecutionsCompleted_TESTFUNCTION3 = 0;
  private static int resultReceived_TESTFUNCTION3 = 0;

  // Expected values for TestFunction.TEST_FUNCTION5.
  private static int noOfExecutionCalls_TESTFUNCTION5 = 0;
  private static int noOfExecutionsCompleted_TESTFUNCTION5 = 0;
  private static int resultReceived_TESTFUNCTION5 = 0;

  // Expected values for inline functions; not referenced by the tests visible in this part of
  // the file other than being reset by initializeStats.
  private static int noOfExecutionCalls_Inline = 0;
  private static int noOfExecutionsCompleted_Inline = 0;
  private static int resultReceived_Inline = 0;

  /**
   * Post-set-up hook of the base test class: disconnects all VMs from the distributed system
   * before each test in this class runs.
   */
  @Override
  protected final void postSetUpPRClientServerTestBase() {
    // Make sure stats do not linger from a previous test
    disconnectAllFromDS();
  }

  /**
   * Resets every expected-value counter of this class to zero in the VM in which it is invoked.
   * Tests run it in each participating VM before executing any function.
   */
  private final transient SerializableRunnableIF initializeStats = () -> {
    // expected number of execution calls
    noOfExecutionCalls_Aggregate = 0;
    noOfExecutionCalls_TESTFUNCTION1 = 0;
    noOfExecutionCalls_TESTFUNCTION2 = 0;
    noOfExecutionCalls_TESTFUNCTION3 = 0;
    noOfExecutionCalls_TESTFUNCTION5 = 0;
    noOfExecutionCalls_Inline = 0;

    // expected number of completed executions
    noOfExecutionsCompleted_Aggregate = 0;
    noOfExecutionsCompleted_TESTFUNCTION1 = 0;
    noOfExecutionsCompleted_TESTFUNCTION2 = 0;
    noOfExecutionsCompleted_TESTFUNCTION3 = 0;
    noOfExecutionsCompleted_TESTFUNCTION5 = 0;
    noOfExecutionsCompleted_Inline = 0;

    // expected number of results received
    resultReceived_Aggregate = 0;
    resultReceived_TESTFUNCTION1 = 0;
    resultReceived_TESTFUNCTION2 = 0;
    resultReceived_TESTFUNCTION3 = 0;
    resultReceived_TESTFUNCTION5 = 0;
    resultReceived_Inline = 0;
  };

  /**
   * Closes the cache of the VM in which it is invoked, if that cache is open, and then
   * disconnects the distributed system the cache was attached to.
   */
  private final transient SerializableRunnableIF closeDistributedSystem = () -> {
    if (getCache() != null && !getCache().isClosed()) {
      // Look up the distributed system before closing the cache. Calling getCache() again after
      // close() would operate on a cache that is already closed (or on whatever getCache()
      // hands back for a closed cache) instead of the one that was just shut down.
      DistributedSystem system = getCache().getDistributedSystem();
      getCache().close();
      system.disconnect();
    }
  };

  /**
   * Waits until the given stats report no running function executions.
   *
   * <p>
   * This helper prevents race conditions when checking stats: after
   * {@code ResultCollector.getResult()} returns, the function stats may not have been updated
   * yet, so code that reads the stats right away may see stale data.
   *
   * <p>
   * The stats are polled at most 100 times with a 50 ms pause between polls. The method returns
   * silently when that budget is exhausted, leaving it to the caller's assertions to fail. If the
   * waiting thread is interrupted, the interrupt flag is restored and polling stops immediately.
   *
   * @param stats the function service stats to poll
   */
  private void waitNoFunctionsRunning(FunctionServiceStats stats) {
    int remainingPolls = 100;
    while (stats.getFunctionExecutionsRunning() > 0 && remainingPolls > 0) {
      remainingPolls--;
      try {
        Thread.sleep(50);
      } catch (InterruptedException ex) {
        // Preserve the interrupt for the caller and stop waiting; continuing to loop would only
        // make every subsequent sleep fail immediately.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /**
   * Client-server scenario with 1 client and 3 servers, using TEST_FUNCTION2 and TEST_FUNCTION3.
   * The functions are executed by id on the client's partitioned region, with a set of multiple
   * keys as the filter.
   *
   * On the client, the execution calls/completed stats must equal the number of executions the
   * client issued, and the results-received stats must be at least the number of results
   * collected. On each server, the execution calls/completed stats must be at least the number
   * of executions issued by the client.
   */
  @Test
  public void testClientServerPartitonedRegionFunctionExecutionStats() {
    createScenario();
    registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION2));
    registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION3));

    // Reset the expected-value counters in every participating VM.
    client.invoke(initializeStats);
    server1.invoke(initializeStats);
    server2.invoke(initializeStats);
    server3.invoke(initializeStats);

    // Execute the functions from the client and record the expected stats as we go.
    client.invoke(() -> {
      Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
      assertNotNull(region);
      final HashSet<String> testKeysSet = new HashSet<>();
      for (int i = (totalNumBuckets * 2); i > 0; i--) {
        testKeysSet.add("execKey-" + i);
      }
      DistributedSystem.setThreadsSocketPolicy(false);
      Function function = new TestFunction(true, TestFunction.TEST_FUNCTION2);
      if (shouldRegisterFunctionsOnClient()) {
        FunctionService.registerFunction(function);
      }
      Execution dataSet = FunctionService.onRegion(region);
      try {
        // Put one entry per key so the filter keys exist in the region.
        int j = 0;
        for (String s : testKeysSet) {
          Integer val = j++;
          region.put(s, val);
        }
        // 1st execution: TEST_FUNCTION2 with a Boolean argument.
        ResultCollector rc =
            dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId());
        int resultSize = ((List) rc.getResult()).size();
        resultReceived_Aggregate += resultSize;
        resultReceived_TESTFUNCTION2 += resultSize;
        noOfExecutionCalls_Aggregate++;
        noOfExecutionCalls_TESTFUNCTION2++;
        noOfExecutionsCompleted_Aggregate++;
        noOfExecutionsCompleted_TESTFUNCTION2++;

        // 2nd execution: TEST_FUNCTION2 with the key set as argument.
        rc = dataSet.withFilter(testKeysSet).setArguments(testKeysSet).execute(function.getId());
        resultSize = ((List) rc.getResult()).size();
        resultReceived_Aggregate += resultSize;
        resultReceived_TESTFUNCTION2 += resultSize;
        noOfExecutionCalls_Aggregate++;
        noOfExecutionCalls_TESTFUNCTION2++;
        noOfExecutionsCompleted_Aggregate++;
        noOfExecutionsCompleted_TESTFUNCTION2++;

        // 3rd execution: TEST_FUNCTION3 with a Boolean argument.
        // NOTE(review): unlike TEST_FUNCTION2 above, this registration is not guarded by
        // shouldRegisterFunctionsOnClient() - confirm whether that is intentional.
        function = new TestFunction(true, TestFunction.TEST_FUNCTION3);
        FunctionService.registerFunction(function);
        rc = dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId());
        resultSize = ((List) rc.getResult()).size();
        resultReceived_Aggregate += resultSize;
        resultReceived_TESTFUNCTION3 += resultSize;
        noOfExecutionCalls_Aggregate++;
        noOfExecutionCalls_TESTFUNCTION3++;
        noOfExecutionsCompleted_Aggregate++;
        noOfExecutionsCompleted_TESTFUNCTION3++;

      } catch (Exception e) {
        logger.info("Exception : " + e.getMessage());
        e.printStackTrace();
        fail("Test failed after the put operation");
      }
    });

    // Verify the client-side stats against the counters recorded above.
    client.invoke(() -> {
      // checks for the aggregate stats
      InternalDistributedSystem iDS = (InternalDistributedSystem) cache.getDistributedSystem();
      FunctionServiceStats functionServiceStats = iDS.getFunctionServiceStats();
      waitNoFunctionsRunning(functionServiceStats);

      assertEquals(noOfExecutionCalls_Aggregate,
          functionServiceStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_Aggregate,
          functionServiceStats.getFunctionExecutionsCompleted());
      // results received is only checked as a lower bound
      assertTrue(functionServiceStats.getResultsReceived() >= resultReceived_Aggregate);

      logger.info("Calling FunctionStats for  TEST_FUNCTION2 :");
      FunctionStats functionStats =
          FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS);
      logger.info("Called FunctionStats for  TEST_FUNCTION2 :");
      assertEquals(noOfExecutionCalls_TESTFUNCTION2, functionStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION2,
          functionStats.getFunctionExecutionsCompleted());
      assertTrue(functionStats.getResultsReceived() >= resultReceived_TESTFUNCTION2);

      functionStats = FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION3, iDS);
      assertEquals(noOfExecutionCalls_TESTFUNCTION3, functionStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION3,
          functionStats.getFunctionExecutionsCompleted());
      assertTrue(functionStats.getResultsReceived() >= resultReceived_TESTFUNCTION3);
    });

    // Server-side verification. The counters were reset to zero on each server above, so the
    // increments below yield the number of executions the client issued; the stats are checked
    // as lower bounds (>=), not for equality.
    SerializableRunnableIF checkStatsOnServer = () -> {
      // checks for the aggregate stats
      InternalDistributedSystem iDS = (InternalDistributedSystem) cache.getDistributedSystem();
      FunctionServiceStats functionServiceStats = iDS.getFunctionServiceStats();
      waitNoFunctionsRunning(functionServiceStats);

      // functions are executed 3 times
      noOfExecutionCalls_Aggregate += 3;
      assertTrue(
          functionServiceStats.getFunctionExecutionCalls() >= noOfExecutionCalls_Aggregate);
      noOfExecutionsCompleted_Aggregate += 3;
      assertTrue(functionServiceStats
          .getFunctionExecutionsCompleted() >= noOfExecutionsCompleted_Aggregate);

      FunctionStats functionStats =
          FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS);
      // TEST_FUNCTION2 is executed twice
      noOfExecutionCalls_TESTFUNCTION2 += 2;
      assertTrue(functionStats.getFunctionExecutionCalls() >= noOfExecutionCalls_TESTFUNCTION2);
      noOfExecutionsCompleted_TESTFUNCTION2 += 2;
      assertTrue(functionStats
          .getFunctionExecutionsCompleted() >= noOfExecutionsCompleted_TESTFUNCTION2);

      functionStats = FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION3, iDS);
      // TEST_FUNCTION3 is executed once
      noOfExecutionCalls_TESTFUNCTION3 += 1;
      assertTrue(functionStats.getFunctionExecutionCalls() >= noOfExecutionCalls_TESTFUNCTION3);
      noOfExecutionsCompleted_TESTFUNCTION3 += 1;
      assertTrue(functionStats
          .getFunctionExecutionsCompleted() >= noOfExecutionsCompleted_TESTFUNCTION3);
    };

    server1.invoke(checkStatsOnServer);
    server2.invoke(checkStatsOnServer);
    server3.invoke(checkStatsOnServer);
  }

  /**
   * Client-server scenario with 1 client and 3 servers. Each server hosts the region with the
   * REPLICATE data policy; the client hosts it with the EMPTY data policy and a pool that points
   * at the three servers. TEST_FUNCTION2 is executed twice on the client region with a set of
   * multiple keys as the filter.
   *
   * Only the client-side stats are verified: execution calls, executions completed and results
   * received must equal the values counted by the client. No server-side stats are checked here.
   */
  @Test
  public void testClientServerDistributedRegionFunctionExecutionStats() {

    final String regionName = "FunctionServiceStatsDUnitTest";
    // Creates a cache with a populated replicated region and a started cache server; returns
    // the port the server listens on.
    SerializableCallableIF<Integer> createCahenServer = () -> {
      try {
        Properties props = new Properties();
        // Connect, disconnect, and connect again before creating the cache.
        DistributedSystem ds = getSystem(props);
        assertNotNull(ds);
        ds.disconnect();
        ds = getSystem(props);
        cache = CacheFactory.create(ds);
        logger.info("Created Cache on Server");
        assertNotNull(cache);
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        factory.setDataPolicy(DataPolicy.REPLICATE);
        assertNotNull(cache);
        Region<String, Integer> region = cache.createRegion(regionName, factory.create());
        logger.info("Region Created :" + region);
        assertNotNull(region);
        for (int i = 1; i <= 200; i++) {
          region.put("execKey-" + i, i);
        }
        CacheServer server = cache.addCacheServer();
        assertNotNull(server);
        int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
        server.setPort(port);
        try {
          server.start();
        } catch (IOException e) {
          Assert.fail("Failed to start the Server", e);
        }
        assertTrue(server.isRunning());
        return server.getPort();
      } catch (Exception e) {
        Assert.fail("FunctionServiceStatsDUnitTest#createCache() Failed while creating the cache",
            e);
        throw e;
      }
    };
    final Integer port1 = server1.invoke(createCahenServer);
    final Integer port2 = server2.invoke(createCahenServer);
    final Integer port3 = server3.invoke(createCahenServer);

    // Create the client cache, a pool connected to the three servers, and the EMPTY region.
    client.invoke(() -> {
      try {
        Properties props = new Properties();
        props.put(MCAST_PORT, "0");
        props.put(LOCATORS, "");
        DistributedSystem ds = getSystem(props);
        assertNotNull(ds);
        ds.disconnect();
        ds = getSystem(props);
        cache = CacheFactory.create(ds);
        logger.info("Created Cache on Client");
        assertNotNull(cache);


        // Endpoint shuffling is disabled only while the pool is created and re-enabled
        // afterwards, even if pool creation fails.
        CacheServerTestUtil.disableShufflingOfEndpoints();
        Pool p;
        try {
          p = PoolManager.createFactory().addServer("localhost", port1)
              .addServer("localhost", port2).addServer("localhost", port3)
              .setPingInterval(250).setSubscriptionEnabled(false).setSubscriptionRedundancy(-1)
              .setReadTimeout(2000).setSocketBufferSize(1000).setMinConnections(6)
              .setMaxConnections(10).setRetryAttempts(3)
              .create("FunctionServiceStatsDUnitTest_pool");
        } finally {
          CacheServerTestUtil.enableShufflingOfEndpoints();
        }
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.LOCAL);
        factory.setDataPolicy(DataPolicy.EMPTY);
        factory.setPoolName(p.getName());
        assertNotNull(cache);
        Region<String, Integer> region = cache.createRegion(regionName, factory.create());
        logger.info("Client Region Created :" + region);
        assertNotNull(region);
        for (int i = 1; i <= 200; i++) {
          region.put("execKey-" + i, i);
        }
      } catch (Exception e) {
        Assert.fail("FunctionServiceStatsDUnitTest#createCache() Failed while creating the cache",
            e);
        throw e;
      }
    });

    // Reset the expected-value counters in every participating VM.
    client.invoke(initializeStats);
    server1.invoke(initializeStats);
    server2.invoke(initializeStats);
    server3.invoke(initializeStats);

    registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION2));
    registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION3));

    // Execute TEST_FUNCTION2 twice from the client and record the expected stats.
    client.invoke(() -> {
      Function function2 = new TestFunction(true, TestFunction.TEST_FUNCTION2);
      // function3 is created (and possibly registered) but never executed in this test.
      Function function3 = new TestFunction(true, TestFunction.TEST_FUNCTION3);
      if (shouldRegisterFunctionsOnClient()) {
        FunctionService.registerFunction(function2);
        FunctionService.registerFunction(function3);
      }
      Region region = cache.getRegion(regionName);
      Set<String> filter = new HashSet<>();
      for (int i = 100; i < 120; i++) {
        filter.add("execKey-" + i);
      }

      try {
        // The call counters are bumped before each execution and the completed counters after
        // getResult() has returned.
        noOfExecutionCalls_Aggregate++;
        noOfExecutionCalls_TESTFUNCTION2++;
        List list = (List) FunctionService.onRegion(region).withFilter(filter)
            .execute(function2).getResult();
        noOfExecutionsCompleted_Aggregate++;
        noOfExecutionsCompleted_TESTFUNCTION2++;
        int size = list.size();
        resultReceived_Aggregate += size;
        resultReceived_TESTFUNCTION2 += size;

        noOfExecutionCalls_Aggregate++;
        noOfExecutionCalls_TESTFUNCTION2++;
        list = (List) FunctionService.onRegion(region).withFilter(filter).execute(function2)
            .getResult();
        noOfExecutionsCompleted_Aggregate++;
        noOfExecutionsCompleted_TESTFUNCTION2++;
        size = list.size();
        resultReceived_Aggregate += size;
        resultReceived_TESTFUNCTION2 += size;
      } catch (Exception e) {
        e.printStackTrace();
        Assert.fail("test failed due to", e);
        throw e;
      }

    });

    // Verify the client-side stats against the counters recorded above.
    client.invoke(() -> {
      // checks for the aggregate stats
      InternalDistributedSystem iDS = (InternalDistributedSystem) cache.getDistributedSystem();
      FunctionServiceStats functionServiceStats = iDS.getFunctionServiceStats();
      waitNoFunctionsRunning(functionServiceStats);

      assertEquals(noOfExecutionCalls_Aggregate,
          functionServiceStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_Aggregate,
          functionServiceStats.getFunctionExecutionsCompleted());
      assertEquals(resultReceived_Aggregate, functionServiceStats.getResultsReceived());

      FunctionStats functionStats =
          FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS);
      assertEquals(noOfExecutionCalls_TESTFUNCTION2, functionStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION2,
          functionStats.getFunctionExecutionsCompleted());
      assertEquals(resultReceived_TESTFUNCTION2, functionStats.getResultsReceived());
    });
  }

  /**
   * Executes TEST_FUNCTION1 and TEST_FUNCTION5 by id on the servers of the client pool, without
   * any region.
   *
   * On the client, the execution calls, executions completed and results received stats must
   * equal the values counted while executing. On each server, the execution calls and executions
   * completed stats must equal the number of executions issued by the client (two in total, one
   * per function).
   */
  @Test
  public void testClientServerwithoutRegion() {
    createClientServerScenarionWithoutRegion();
    registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION1));
    registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION5));

    // Reset the expected-value counters in every participating VM.
    client.invoke(initializeStats);
    server1.invoke(initializeStats);
    server2.invoke(initializeStats);
    server3.invoke(initializeStats);

    // Execute both functions from the client and record the expected stats.
    client.invoke(() -> {
      DistributedSystem.setThreadsSocketPolicy(false);
      Function function = new TestFunction(true, TestFunction.TEST_FUNCTION1);
      if (shouldRegisterFunctionsOnClient()) {
        FunctionService.registerFunction(function);
      }
      Execution member = FunctionService.onServers(pool);

      try {
        ResultCollector rs = member.setArguments(Boolean.TRUE).execute(function.getId());
        int size = ((List) rs.getResult()).size();
        resultReceived_Aggregate += size;
        noOfExecutionCalls_Aggregate++;
        noOfExecutionsCompleted_Aggregate++;
        resultReceived_TESTFUNCTION1 += size;
        noOfExecutionCalls_TESTFUNCTION1++;
        noOfExecutionsCompleted_TESTFUNCTION1++;
      } catch (Exception ex) {
        ex.printStackTrace();
        logger.info("Exception : ", ex);
        fail("Test failed after the execute operation nn TRUE");
      }
      function = new TestFunction(true, TestFunction.TEST_FUNCTION5);
      if (shouldRegisterFunctionsOnClient()) {
        FunctionService.registerFunction(function);
      }
      try {
        ResultCollector rs = member.setArguments("Success").execute(function.getId());
        int size = ((List) rs.getResult()).size();
        resultReceived_Aggregate += size;
        noOfExecutionCalls_Aggregate++;
        noOfExecutionsCompleted_Aggregate++;
        resultReceived_TESTFUNCTION5 += size;
        noOfExecutionCalls_TESTFUNCTION5++;
        noOfExecutionsCompleted_TESTFUNCTION5++;
      } catch (Exception ex) {
        ex.printStackTrace();
        logger.info("Exception : ", ex);
        fail("Test failed after the execute operationssssss");
      }
    });

    // Verify the client-side stats against the counters recorded above.
    client.invoke(() -> {
      // checks for the aggregate stats
      InternalDistributedSystem iDS = (InternalDistributedSystem) cache.getDistributedSystem();
      FunctionServiceStats functionServiceStats = iDS.getFunctionServiceStats();
      waitNoFunctionsRunning(functionServiceStats);

      assertEquals(noOfExecutionCalls_Aggregate,
          functionServiceStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_Aggregate,
          functionServiceStats.getFunctionExecutionsCompleted());
      assertEquals(resultReceived_Aggregate, functionServiceStats.getResultsReceived());

      FunctionStats functionStats =
          FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION1, iDS);
      assertEquals(noOfExecutionCalls_TESTFUNCTION1, functionStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION1,
          functionStats.getFunctionExecutionsCompleted());
      assertEquals(resultReceived_TESTFUNCTION1, functionStats.getResultsReceived());

      functionStats = FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION5, iDS);
      assertEquals(noOfExecutionCalls_TESTFUNCTION5, functionStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION5,
          functionStats.getFunctionExecutionsCompleted());
      assertEquals(resultReceived_TESTFUNCTION5, functionStats.getResultsReceived());

    });

    // Server-side verification. The counters were reset to zero on each server above, so the
    // increments below yield the number of executions the client issued.
    SerializableRunnableIF checkStatsOnServer = () -> {
      // checks for the aggregate stats
      InternalDistributedSystem iDS = (InternalDistributedSystem) cache.getDistributedSystem();
      FunctionServiceStats functionServiceStats = iDS.getFunctionServiceStats();
      waitNoFunctionsRunning(functionServiceStats);

      // functions are executed 2 times
      noOfExecutionCalls_Aggregate += 2;
      assertEquals(noOfExecutionCalls_Aggregate,
          functionServiceStats.getFunctionExecutionCalls());
      noOfExecutionsCompleted_Aggregate += 2;
      // this check is time sensitive, so allow it to fail a few times
      // before giving up
      for (int attempt = 0; attempt < 10; attempt++) {
        try {
          assertEquals(noOfExecutionsCompleted_Aggregate,
              functionServiceStats.getFunctionExecutionsCompleted());
          // the stat has caught up; no need to check again
          break;
        } catch (AssertionError mismatch) {
          // assertEquals signals a mismatch with AssertionError (not a RuntimeException), so
          // that is what has to be caught for the retry to take effect
          if (attempt == 9) {
            throw mismatch;
          }
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw mismatch;
          }
        }
      }

      FunctionStats functionStats =
          FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION1, iDS);
      // TEST_FUNCTION1 is executed once
      noOfExecutionCalls_TESTFUNCTION1 += 1;
      assertEquals(noOfExecutionCalls_TESTFUNCTION1, functionStats.getFunctionExecutionCalls());
      noOfExecutionsCompleted_TESTFUNCTION1 += 1;
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION1,
          functionStats.getFunctionExecutionsCompleted());

      functionStats = FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION5, iDS);
      // TEST_FUNCTION5 is executed once
      noOfExecutionCalls_TESTFUNCTION5 += 1;
      assertEquals(noOfExecutionCalls_TESTFUNCTION5, functionStats.getFunctionExecutionCalls());
      noOfExecutionsCompleted_TESTFUNCTION5 += 1;
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION5,
          functionStats.getFunctionExecutionsCompleted());

    };

    server1.invoke(checkStatsOnServer);
    server2.invoke(checkStatsOnServer);
    server3.invoke(checkStatsOnServer);
  }

  /**
   * Performs no function execution; it only closes the cache and disconnects the distributed
   * system in all four VMs, the accessor VM first and then the three datastore VMs.
   */
  @Test
  public void testP2PDummyExecutionStats() {
    final Host dunitHost = Host.getHost(0);
    final VM[] datastores = {dunitHost.getVM(0), dunitHost.getVM(1), dunitHost.getVM(2)};
    final VM accessorVm = dunitHost.getVM(3);

    accessorVm.invoke(closeDistributedSystem);
    for (VM datastore : datastores) {
      datastore.invoke(closeDistributedSystem);
    }
  }

  /**
   * Ensures that the execution happens on the partitioned region as a whole.
   *
   * Function execution does not take place on the accessor; the accessor only receives results,
   * so its execution calls/completed stats are expected to be zero while its results-received
   * stats must equal the number of results collected. On each datastore, the number of function
   * execution calls must equal the number of executions issued from the accessor.
   */
  @Test
  public void testP2PPartitionedRegionsFunctionExecutionStats() {
    final String rName = getUniqueName();
    Host host = Host.getHost(0);
    final VM datastore0 = host.getVM(0);
    final VM datastore1 = host.getVM(1);
    final VM datastore2 = host.getVM(2);
    final VM accessor = host.getVM(3);

    // Reset the expected-value counters in every participating VM.
    datastore0.invoke(initializeStats);
    datastore1.invoke(initializeStats);
    datastore2.invoke(initializeStats);
    accessor.invoke(initializeStats);

    // Create the partitioned region on the accessor.
    // NOTE(review): presumably the second argument of createRegionAttrsForPR is the local max
    // memory, with 0 making this member an accessor - confirm against
    // PartitionedRegionTestHelper.
    accessor.invoke(() -> {
      RegionAttributes ra = PartitionedRegionTestHelper.createRegionAttrsForPR(0, 0);
      AttributesFactory raf = new AttributesFactory(ra);
      PartitionAttributesImpl pa = new PartitionAttributesImpl();
      pa.setAll(ra.getPartitionAttributes());
      pa.setTotalNumBuckets(17);
      raf.setPartitionAttributes(pa);

      getCache().createRegion(rName, raf.create());
    });

    // Create the partitioned region on a datastore and register both test functions there.
    SerializableRunnableIF dataStoreCreate = () -> {
      RegionAttributes ra = PartitionedRegionTestHelper.createRegionAttrsForPR(0, 10);
      AttributesFactory raf = new AttributesFactory(ra);
      PartitionAttributesImpl pa = new PartitionAttributesImpl();
      pa.setAll(ra.getPartitionAttributes());
      pa.setTotalNumBuckets(17);
      raf.setPartitionAttributes(pa);
      getCache().createRegion(rName, raf.create());
      Function function = new TestFunction(true, TestFunction.TEST_FUNCTION2);
      FunctionService.registerFunction(function);
      function = new TestFunction(true, TestFunction.TEST_FUNCTION3);
      FunctionService.registerFunction(function);
    };
    datastore0.invoke(dataStoreCreate);
    datastore1.invoke(dataStoreCreate);
    datastore2.invoke(dataStoreCreate);

    // Populate the region and execute the functions from the accessor. Only the results-received
    // counters are updated here; the execution counters stay at zero on the accessor.
    accessor.invoke(() -> {
      PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(rName);
      DistributedSystem.setThreadsSocketPolicy(false);
      final HashSet<String> testKeys = new HashSet<>();
      for (int i = (pr.getTotalNumberOfBuckets() * 3); i > 0; i--) {
        testKeys.add("execKey-" + i);
      }
      int j = 0;
      for (Object testKey : testKeys) {
        Integer val = j++;
        pr.put(testKey, val);
      }
      // 1st execution: TEST_FUNCTION2 with a Boolean argument, no filter.
      Function function = new TestFunction(true, TestFunction.TEST_FUNCTION2);
      FunctionService.registerFunction(function);
      Execution dataSet = FunctionService.onRegion(pr);
      ResultCollector rc1 = dataSet.setArguments(Boolean.TRUE).execute(function);
      int size = ((List) rc1.getResult()).size();
      resultReceived_Aggregate += size;
      resultReceived_TESTFUNCTION2 += size;

      // 2nd execution: TEST_FUNCTION2 with the key set as argument.
      rc1 = dataSet.setArguments(testKeys).execute(function);
      size = ((List) rc1.getResult()).size();
      resultReceived_Aggregate += size;
      resultReceived_TESTFUNCTION2 += size;

      // 3rd execution: TEST_FUNCTION3 with a Boolean argument.
      function = new TestFunction(true, TestFunction.TEST_FUNCTION3);
      FunctionService.registerFunction(function);
      rc1 = dataSet.setArguments(Boolean.TRUE).execute(function);
      size = ((List) rc1.getResult()).size();
      resultReceived_Aggregate += size;
      resultReceived_TESTFUNCTION3 += size;
    });

    // Verify the accessor-side stats: zero executions, results received as counted above.
    accessor.invoke(() -> {
      InternalDistributedSystem iDS =
          ((InternalDistributedSystem) getCache().getDistributedSystem());
      FunctionServiceStats functionServiceStats = iDS.getFunctionServiceStats();
      waitNoFunctionsRunning(functionServiceStats);

      assertEquals(noOfExecutionCalls_Aggregate,
          functionServiceStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_Aggregate,
          functionServiceStats.getFunctionExecutionsCompleted());
      assertEquals(resultReceived_Aggregate, functionServiceStats.getResultsReceived());

      FunctionStats functionStats =
          FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS);
      assertEquals(noOfExecutionCalls_TESTFUNCTION2, functionStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION2,
          functionStats.getFunctionExecutionsCompleted());
      assertEquals(resultReceived_TESTFUNCTION2, functionStats.getResultsReceived());

      functionStats = FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION3, iDS);
      assertEquals(noOfExecutionCalls_TESTFUNCTION3, functionStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION3,
          functionStats.getFunctionExecutionsCompleted());
      assertEquals(resultReceived_TESTFUNCTION3, functionStats.getResultsReceived());
    });

    // Datastore-side verification. The counters were reset to zero on each datastore above, so
    // the increments below yield the number of executions issued from the accessor.
    SerializableRunnableIF checkFunctionExecutionStatsForDataStore = () -> {
      InternalDistributedSystem iDS =
          ((InternalDistributedSystem) getCache().getDistributedSystem());
      // 3 Function Executions took place
      FunctionServiceStats functionServiceStats = iDS.getFunctionServiceStats();
      waitNoFunctionsRunning(functionServiceStats);

      noOfExecutionCalls_Aggregate += 3;
      noOfExecutionsCompleted_Aggregate += 3;
      assertEquals(noOfExecutionCalls_Aggregate,
          functionServiceStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_Aggregate,
          functionServiceStats.getFunctionExecutionsCompleted());

      FunctionStats functionStats =
          FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS);
      // TEST_FUNCTION2 is executed twice
      noOfExecutionCalls_TESTFUNCTION2 += 2;
      assertEquals(noOfExecutionCalls_TESTFUNCTION2,
          functionStats.getFunctionExecutionCalls());
      noOfExecutionsCompleted_TESTFUNCTION2 += 2;
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION2,
          functionStats.getFunctionExecutionsCompleted());

      functionStats = FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION3, iDS);
      // TEST_FUNCTION3 is executed once
      noOfExecutionCalls_TESTFUNCTION3 += 1;
      assertEquals(noOfExecutionCalls_TESTFUNCTION3,
          functionStats.getFunctionExecutionCalls());
      noOfExecutionsCompleted_TESTFUNCTION3 += 1;
      assertEquals(noOfExecutionsCompleted_TESTFUNCTION3,
          functionStats.getFunctionExecutionsCompleted());
    };
    datastore0.invoke(checkFunctionExecutionStatsForDataStore);
    datastore1.invoke(checkFunctionExecutionStatsForDataStore);
    datastore2.invoke(checkFunctionExecutionStatsForDataStore);

    // Clean up: close the cache and disconnect in every VM. This is skipped if an assertion
    // above fails, because it is not in a finally block.
    accessor.invoke(closeDistributedSystem);
    datastore0.invoke(closeDistributedSystem);
    datastore1.invoke(closeDistributedSystem);
    datastore2.invoke(closeDistributedSystem);
  }

  /**
   * Tests the results-received statistics for a function executed on a distributed region in a P2P
   * topology.
   *
   * <p>
   * datastore0 creates the region with {@link DataPolicy#EMPTY}, while datastore1, datastore2 and
   * datastore3 create it with {@link DataPolicy#REPLICATE}. datastore0 then executes
   * {@code TestFunction.TEST_FUNCTION2} on the region and verifies both the aggregate and the
   * per-function results-received statistics. Execution-call statistics are not verified because
   * the replicate that runs the function cannot be predicted.
   */
  @Test
  public void testP2PDistributedRegionFunctionExecutionStats() {
    final String rName = getUniqueName();
    Host host = Host.getHost(0);
    final VM datastore0 = host.getVM(0);
    final VM datastore1 = host.getVM(1);
    final VM datastore2 = host.getVM(2);
    final VM datastore3 = host.getVM(3);

    datastore0.invoke(initializeStats);
    datastore1.invoke(initializeStats);
    datastore2.invoke(initializeStats);
    datastore3.invoke(initializeStats);

    // datastore0 hosts the region with an EMPTY data policy.
    datastore0.invoke(() -> {
      AttributesFactory<String, Integer> factory = new AttributesFactory<>();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setDataPolicy(DataPolicy.EMPTY);
      Region<String, Integer> region = getCache().createRegion(rName, factory.create());
      logger.info("Region Created :" + region);
      assertNotNull(region);
      FunctionService.registerFunction(new TestFunction(true, TestFunction.TEST_FUNCTION2));
      for (int i = 1; i <= 200; i++) {
        region.put("execKey-" + i, i);
      }
    });

    // The remaining datastores host the same region as replicates.
    SerializableRunnableIF createAndPopulateRegionWithReplicate = () -> {
      AttributesFactory<String, Integer> factory = new AttributesFactory<>();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setDataPolicy(DataPolicy.REPLICATE);
      Region<String, Integer> region = getCache().createRegion(rName, factory.create());
      logger.info("Region Created :" + region);
      assertNotNull(region);
      FunctionService.registerFunction(new TestFunction(true, TestFunction.TEST_FUNCTION2));
      for (int i = 1; i <= 200; i++) {
        region.put("execKey-" + i, i);
      }
    };

    datastore1.invoke(createAndPopulateRegionWithReplicate);
    datastore2.invoke(createAndPopulateRegionWithReplicate);
    datastore3.invoke(createAndPopulateRegionWithReplicate);

    datastore0.invoke(() -> {
      Region<String, Integer> region = getCache().getRegion(rName);
      try {
        List<?> list = (List<?>) FunctionService.onRegion(region).setArguments(Boolean.TRUE)
            .execute(TestFunction.TEST_FUNCTION2).getResult();
        InternalDistributedSystem iDS =
            (InternalDistributedSystem) getCache().getDistributedSystem();

        // This is the distributed region with the EMPTY data policy, therefore no function
        // execution takes place here; this member only receives the results.
        resultReceived_Aggregate += list.size();
        assertEquals(resultReceived_Aggregate,
            iDS.getFunctionServiceStats().getResultsReceived());

        // Check the statistics of TEST_FUNCTION2 itself rather than re-checking the aggregate.
        resultReceived_TESTFUNCTION2 += list.size();
        FunctionStats functionStats =
            FunctionStats.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS);
        assertEquals(resultReceived_TESTFUNCTION2, functionStats.getResultsReceived());
      } catch (Exception e) {
        // Assert.fail carries the cause, so no separate stack trace dump is needed.
        Assert.fail("test failed due to", e);
      }
    });

    // The region is replicated on three nodes, so the node on which the function executes cannot
    // be predicted; the execution-call statistics are therefore not checked.

    datastore0.invoke(closeDistributedSystem);
    datastore1.invoke(closeDistributedSystem);
    datastore2.invoke(closeDistributedSystem);
    datastore3.invoke(closeDistributedSystem);
  }


  /**
   * Tests the function execution statistics when a function that returns a result is executed on a
   * member of the distributed system.
   *
   * <p>
   * member1 executes an inline function through {@code FunctionService.onMember}, targeting its own
   * distributed member, so the execution calls, completed executions and results received recorded
   * on member1 must match the counters it incremented. member2, member3 and member4 execute
   * nothing in this test; their statistics are compared against their counters, which this test
   * never increments on those members.
   */
  @Test
  public void testP2PMembersFunctionExecutionStats() {
    Host host = Host.getHost(0);
    VM member1 = host.getVM(0);
    VM member2 = host.getVM(1);
    VM member3 = host.getVM(2);
    VM member4 = host.getVM(3);

    SerializableRunnableIF connectToDistributedSystem = () -> {
      Properties props = new Properties();
      try {
        ds = getSystem(props);
        assertNotNull(ds);
      } catch (Exception e) {
        Assert.fail("Failed while creating the Distribued System", e);
      }
    };
    member1.invoke(connectToDistributedSystem);
    member2.invoke(connectToDistributedSystem);
    member3.invoke(connectToDistributedSystem);
    member4.invoke(connectToDistributedSystem);

    member1.invoke(initializeStats);
    member2.invoke(initializeStats);
    member3.invoke(initializeStats);
    member4.invoke(initializeStats);

    // Returns "Success" when invoked with a String argument and "Failure" otherwise.
    final Function inlineFunction = new FunctionAdapter() {
      @Override
      public void execute(FunctionContext context) {
        @SuppressWarnings("unchecked")
        final ResultSender<String> resultSender = context.getResultSender();
        if (context.getArguments() instanceof String) {
          resultSender.lastResult("Success");
        } else {
          resultSender.lastResult("Failure");
        }
      }

      @Override
      public String getId() {
        return getClass().getName();
      }

      @Override
      public boolean hasResult() {
        return true;
      }
    };

    member1.invoke(() -> {
      assertNotNull(ds);
      DistributedMember localmember = ds.getDistributedMember();
      // setArguments returns the Execution that carries the arguments, so its result must be the
      // one that is executed; discarding it runs the function without any argument.
      Execution memberExecution = FunctionService.onMember(localmember).setArguments("Key");

      try {
        ResultCollector<?, ?> rc = memberExecution.execute(inlineFunction);
        int size = ((List<?>) rc.getResult()).size();
        resultReceived_Aggregate += size;
        noOfExecutionCalls_Aggregate++;
        noOfExecutionsCompleted_Aggregate++;
        resultReceived_Inline += size;
        noOfExecutionCalls_Inline++;
        noOfExecutionsCompleted_Inline++;
      } catch (Exception e) {
        // Assert.fail carries the cause, so no separate logging of the exception is needed.
        Assert.fail("Test failed", e);
      }
    });

    member1.invoke(() -> {
      FunctionServiceStats functionServiceStats = ds.getFunctionServiceStats();
      waitNoFunctionsRunning(functionServiceStats);

      assertEquals(noOfExecutionCalls_Aggregate,
          functionServiceStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_Aggregate,
          functionServiceStats.getFunctionExecutionsCompleted());
      assertEquals(resultReceived_Aggregate, functionServiceStats.getResultsReceived());

      FunctionStats functionStats = FunctionStats.getFunctionStats(inlineFunction.getId(), ds);
      assertEquals(noOfExecutionCalls_Inline, functionStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_Inline,
          functionStats.getFunctionExecutionsCompleted());
      assertEquals(resultReceived_Inline, functionStats.getResultsReceived());
    });

    SerializableRunnableIF checkFunctionExecutionStatsForOtherMember = () -> {
      FunctionServiceStats functionServiceStats = ds.getFunctionServiceStats();
      waitNoFunctionsRunning(functionServiceStats);

      // The function was executed only on member1's own distributed member, so the counters are
      // deliberately not incremented here before comparing them with the statistics.
      assertEquals(noOfExecutionCalls_Aggregate,
          functionServiceStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_Aggregate,
          functionServiceStats.getFunctionExecutionsCompleted());

      FunctionStats functionStats =
          FunctionStats.getFunctionStats(inlineFunction.getId(), ds);
      assertEquals(noOfExecutionCalls_Inline, functionStats.getFunctionExecutionCalls());
      assertEquals(noOfExecutionsCompleted_Inline,
          functionStats.getFunctionExecutionsCompleted());
    };
    member2.invoke(checkFunctionExecutionStatsForOtherMember);
    member3.invoke(checkFunctionExecutionStatsForOtherMember);
    member4.invoke(checkFunctionExecutionStatsForOtherMember);

    member1.invoke(closeDistributedSystem);
    member2.invoke(closeDistributedSystem);
    member3.invoke(closeDistributedSystem);
    member4.invoke(closeDistributedSystem);
  }

  /**
   * Returns the inherited distributed system properties with the serializable-object filter set to
   * {@code org.apache.geode.internal.cache.execute.MyFunctionExecutionException}.
   */
  @Override
  public Properties getDistributedSystemProperties() {
    final Properties props = super.getDistributedSystemProperties();
    props.setProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
        "org.apache.geode.internal.cache.execute.MyFunctionExecutionException");
    return props;
  }

  /**
   * Executes a function that is expected to throw on a partitioned region hosted by four
   * datastores.
   *
   * <p>
   * Every datastore creates the region with 17 buckets and registers "TestFunctionException".
   * datastore3 then populates the region, executes the function on it and expects
   * {@code getResult()} to throw; the test fails if the result is returned normally. No statistics
   * are asserted by this method.
   */
  @Test
  public void testFunctionExecutionExceptionStatsOnAllNodesPRegion() {
    final String regionName = getUniqueName();
    final Host host = Host.getHost(0);
    final VM[] datastores = {host.getVM(0), host.getVM(1), host.getVM(2), host.getVM(3)};

    for (VM datastore : datastores) {
      datastore.invoke(initializeStats);
    }

    // Creates the partitioned region and registers the failing function in the invoked VM.
    SerializableRunnableIF createDataStore = () -> {
      RegionAttributes baseAttributes = PartitionedRegionTestHelper.createRegionAttrsForPR(0, 10);
      PartitionAttributesImpl partitionAttributes = new PartitionAttributesImpl();
      partitionAttributes.setAll(baseAttributes.getPartitionAttributes());
      partitionAttributes.setTotalNumBuckets(17);
      AttributesFactory attributesFactory = new AttributesFactory(baseAttributes);
      attributesFactory.setPartitionAttributes(partitionAttributes);
      getCache().createRegion(regionName, attributesFactory.create());
      FunctionService.registerFunction(new TestFunction(true, "TestFunctionException"));
    };
    for (VM datastore : datastores) {
      datastore.invoke(createDataStore);
    }

    // datastore3 populates the region and triggers the failing function.
    datastores[3].invoke(() -> {
      PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(regionName);
      DistributedSystem.setThreadsSocketPolicy(false);

      // Three values per bucket, stored under consecutive integer keys.
      final Set<String> values = new HashSet<>();
      int remaining = pr.getTotalNumberOfBuckets() * 3;
      while (remaining > 0) {
        values.add("execKey-" + remaining);
        remaining--;
      }
      int nextKey = 0;
      for (String value : values) {
        pr.put(nextKey, value);
        nextKey++;
      }

      try {
        Function failingFunction = new TestFunction(true, "TestFunctionException");
        FunctionService.registerFunction(failingFunction);
        ResultCollector collector = FunctionService.onRegion(pr).setArguments(Boolean.TRUE)
            .execute(failingFunction.getId());
        // Pause so that the function execution happens on all nodes and every node gets the
        // FunctionException, letting the stats be incremented, before the result is fetched.
        Wait.pause(2000);
        collector.getResult();
        fail("No exception Occurred");
      } catch (Exception ignored) {
        // Expected: the function is meant to fail, so the exception is deliberately dropped.
      }
    });

    for (VM datastore : datastores) {
      datastore.invoke(closeDistributedSystem);
    }
  }

  /**
   * Builds the common server attributes for "TestPartitionedRegion" and passes them to
   * {@code createClientServerScenarion} together with the values 20, 20 and 20.
   */
  private void createScenario() {
    final ArrayList serverAttributes =
        createCommonServerAttributes("TestPartitionedRegion", null, 0, null);
    createClientServerScenarion(serverAttributes, 20, 20, 20);
  }
}
