/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.execute;

import static org.apache.geode.test.dunit.Wait.pause;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.VM;

public abstract class FunctionServiceClientAccessorPRBase extends FunctionServiceClientBase {

  public static final String REGION = "region";
  protected transient Region<Object, Object> region;

  @Before
  public void createRegions() {
    ClientCache cache = createServersAndClient(numberOfExecutions());

    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);

    for (int i = 0; i < numberOfExecutions(); i++) {
      VM vm = host.getVM(i);
      createRegion(vm);
    }

    vm0.invoke(() -> {
      Region region = getCache().getRegion(REGION);
      PartitionRegionHelper.assignBucketsToPartitions(region);
    });

    region = cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(REGION);
  }

  private void createRegion(final VM vm) {
    vm.invoke(() -> {
      getCache().createRegionFactory(RegionShortcut.PARTITION).create(REGION);
    });
  }

  @Override
  public Execution getExecution() {
    return FunctionService.onRegion(region);
  }



  /**
   * Test that a custom result collector will still receive all partial results from other members
   * when one source fails
   */
  @Test
  public void nonHAFunctionResultCollectorIsPassedPartialResultsAfterBucketMove() {
    List<InternalDistributedMember> members = getAllMembers();
    // Only run this test if there is more than two members
    Assume.assumeTrue(members.size() >= 2);

    final Iterator<InternalDistributedMember> iterator = members.iterator();
    InternalDistributedMember firstMember = iterator.next();
    InternalDistributedMember secondMember = iterator.next();

    // Execute a function which will close the cache on one source.
    try {
      ResultCollector rc = getExecution().withCollector(customCollector)
          .execute(new BucketMovingNonHAFunction(firstMember, secondMember));
      rc.getResult();
      fail("Should have thrown an exception");
    } catch (Exception expected) {
      // do nothing
    }
    assertEquals(new HashSet(members), new HashSet(customCollector.getResult()));
    assertEquals(numberOfExecutions(), customCollector.getResult().size());
  }

  /**
   * A function which will close the cache if the given source matches the source executing this
   * function
   */
  private class BucketMovingNonHAFunction implements Function {

    private final InternalDistributedMember source;
    private final InternalDistributedMember destination;

    public BucketMovingNonHAFunction(final InternalDistributedMember source,
        final InternalDistributedMember destination) {
      this.source = source;
      this.destination = destination;
    }

    @Override
    public void execute(FunctionContext context) {
      RegionFunctionContext regionFunctionContext = (RegionFunctionContext) context;
      final InternalDistributedMember myId =
          InternalDistributedSystem.getAnyInstance().getDistributedMember();
      // Move all buckets to the destination
      if (myId.equals(source)) {
        PartitionRegionHelper.moveData(regionFunctionContext.getDataSet(), source, destination,
            100);
      }
      pause(1000);
      context.getResultSender().lastResult(myId);
    }

    @Override
    public boolean isHA() {
      return false;
    }
  }
}
