/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.control;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.cache.EvictionAction.OVERFLOW_TO_DISK;
import static org.apache.geode.cache.EvictionAttributes.createLRUEntryAttributes;
import static org.apache.geode.cache.RegionShortcut.PARTITION;
import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
import static org.apache.geode.cache.RegionShortcut.REPLICATE;
import static org.apache.geode.distributed.ConfigurationProperties.ENFORCE_UNIQUE_HOST;
import static org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
import static org.apache.geode.test.dunit.VM.getAllVMs;
import static org.apache.geode.test.dunit.VM.getController;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.dunit.VM.toArray;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.assertj.core.api.Assertions.within;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import junitparams.naming.TestCaseName;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheLoaderException;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.control.RebalanceOperation;
import org.apache.geode.cache.control.RebalanceResults;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.cache.partition.PartitionMemberInfo;
import org.apache.geode.cache.partition.PartitionRebalanceInfo;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.partition.PartitionRegionInfo;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.ColocationHelper;
import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PRHARedundancyProvider;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.CacheTestCase;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;

/**
 * TODO test colocated regions where buckets aren't created for all subregions
 *
 * <p>
 * TODO test colocated regions where members aren't consistent in which regions they have
 */
@RunWith(JUnitParamsRunner.class)
@SuppressWarnings("serial")
public class RebalanceOperationDistributedTest extends CacheTestCase {

  /**
   * Timeout value obtained from {@link GeodeAwaitility#getTimeout()}. It is passed together with
   * {@code SECONDS} when waiting for rebalance results, so the value is presumably expressed in
   * seconds -- TODO confirm against GeodeAwaitility.
   */
  private static final long TIMEOUT_SECONDS = GeodeAwaitility.getTimeout().getValue();

  // NOTE(review): presumably restores system properties (e.g. the LOG_REBALANCE flag that one of
  // the tests sets in every VM) once a test finishes -- confirm against the rule implementation.
  @Rule
  public DistributedRestoreSystemProperties restoreSystemProperties =
      new DistributedRestoreSystemProperties();

  // Executor rule available to the tests of this class; its users are outside this section.
  @Rule
  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();

  /**
   * Resets the {@link InternalResourceManager} resource observer to {@code null} in every VM
   * returned by {@code getAllVMs()} as well as in the controller VM.
   */
  @After
  public void tearDown() {
    VM[] everyVm = toArray(getAllVMs(), getController());
    for (VM target : everyVm) {
      target.invoke(() -> InternalResourceManager.setResourceObserver(null));
    }
  }

  /**
   * Populates "region1" on a single member, adds a second member, and checks that a rebalance
   * (real or simulated) reports six bucket creates and three primary transfers.
   *
   * @param simulate when {@code true} the rebalance is only simulated and the region must still
   *        report low redundancy afterwards; when {@code false} redundancy must be restored
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testRecoverRedundancy(boolean simulate) {
    VM firstVm = getVM(0);
    VM secondVm = getVM(1);

    // Host the region on the first member only
    firstVm.invoke(() -> createPartitionedRegion("region1", 1));

    // Put keys 1..6 so that some buckets get created
    firstVm.invoke(() -> {
      Region<Number, String> region1 = getCache().getRegion("region1");
      for (int key = 1; key <= 6; key++) {
        region1.put(key, "A");
      }
    });

    // The buckets must be reported as having low redundancy
    firstVm.invoke(() -> validateRedundancy("region1", 6, 0, 6));

    // Hosting the region on the second member should have no effect by itself ...
    secondVm.invoke(() -> createPartitionedRegion("region1", 1));

    // ... so redundancy must still be low
    firstVm.invoke(() -> validateRedundancy("region1", 6, 0, 6));

    // Run (or simulate) the rebalance and inspect what it reports
    firstVm.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();
      RebalanceResults rebalance = doRebalance(simulate, resourceManager);

      // Totals across all regions
      assertThat(rebalance.getTotalBucketCreatesCompleted()).isEqualTo(6);
      assertThat(rebalance.getTotalPrimaryTransfersCompleted()).isEqualTo(3);
      assertThat(rebalance.getTotalBucketTransferBytes()).isEqualTo(0);
      assertThat(rebalance.getTotalBucketTransfersCompleted()).isEqualTo(0);

      // Only one region took part, so its details must match the totals
      Set<PartitionRebalanceInfo> perRegion = rebalance.getPartitionRebalanceDetails();
      assertThat(perRegion).hasSize(1);

      PartitionRebalanceInfo regionDetails = perRegion.iterator().next();
      assertThat(regionDetails.getBucketCreatesCompleted()).isEqualTo(6);
      assertThat(regionDetails.getPrimaryTransfersCompleted()).isEqualTo(3);
      assertThat(regionDetails.getBucketTransferBytes()).isEqualTo(0);
      assertThat(regionDetails.getBucketTransfersCompleted()).isEqualTo(0);

      // Both members are expected to end up with every bucket and half of the primaries
      Set<PartitionMemberInfo> membersAfter = regionDetails.getPartitionMemberDetailsAfter();
      assertThat(membersAfter).hasSize(2);

      for (PartitionMemberInfo memberInfo : membersAfter) {
        assertThat(memberInfo.getBucketCount()).isEqualTo(6);
        assertThat(memberInfo.getPrimaryCount()).isEqualTo(3);
      }

      if (!simulate) {
        validateStatistics(resourceManager, rebalance);
      }
    });

    if (simulate) {
      // Make sure the simulation didn't do anything
      firstVm.invoke(() -> validateRedundancy("region1", 6, 0, 6));
      return;
    }

    // After a real rebalance both members must see full redundancy
    for (VM member : toArray(firstVm, secondVm)) {
      member.invoke(() -> validateRedundancy("region1", 6, 1, 0));
    }
  }

  /**
   * Checks rebalancing when both members start their cache with {@code ENFORCE_UNIQUE_HOST} set
   * to "true". The test expects no bucket creates and no primary transfers, three bucket
   * transfers, and a region that still reports low redundancy afterwards.
   *
   * @param simulate whether the rebalance is simulated instead of executed
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testEnforceIP(boolean simulate) {
    VM firstVm = getVM(0);
    VM secondVm = getVM(1);
    VM[] bothVms = toArray(firstVm, secondVm);

    // Start the cache on both members with the unique-host enforcement switched on
    for (VM member : bothVms) {
      member.invoke(() -> {
        Properties uniqueHostConfig = new Properties();
        uniqueHostConfig.setProperty(ENFORCE_UNIQUE_HOST, "true");
        getCache(uniqueHostConfig);
      });
    }

    // Host the region on the first member only
    firstVm.invoke(() -> createPartitionedRegion("region1", 1));

    // Put keys 1..6 so that some buckets get created
    firstVm.invoke(() -> {
      Region<Number, String> region1 = getCache().getRegion("region1");
      for (int key = 1; key <= 6; key++) {
        region1.put(key, "A");
      }
    });

    // The buckets must be reported as having low redundancy
    firstVm.invoke(() -> validateRedundancy("region1", 6, 0, 6));

    // Hosting the region on the second member should have no effect by itself ...
    secondVm.invoke(() -> createPartitionedRegion("region1", 1));

    // ... so redundancy must still be low
    firstVm.invoke(() -> validateRedundancy("region1", 6, 0, 6));

    // Run (or simulate) the rebalance and inspect what it reports
    firstVm.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();
      RebalanceResults rebalance = doRebalance(simulate, resourceManager);

      assertThat(rebalance.getTotalBucketCreatesCompleted()).isEqualTo(0);
      assertThat(rebalance.getTotalPrimaryTransfersCompleted()).isEqualTo(0);

      // Buckets *are* transferred, because that improves the balance
      assertThat(rebalance.getTotalBucketTransfersCompleted()).isEqualTo(3);

      // Only one region took part, so its details must match the totals
      Set<PartitionRebalanceInfo> perRegion = rebalance.getPartitionRebalanceDetails();
      assertThat(perRegion).hasSize(1);

      PartitionRebalanceInfo regionDetails = perRegion.iterator().next();
      assertThat(regionDetails.getBucketCreatesCompleted()).isEqualTo(0);
      assertThat(regionDetails.getPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(regionDetails.getBucketTransfersCompleted()).isEqualTo(3);

      if (!simulate) {
        validateStatistics(resourceManager, rebalance);
      }
    });

    // Redundancy must still be low when seen from either member
    for (VM member : bothVms) {
      member.invoke(() -> validateRedundancy("region1", 6, 0, 6));
    }
  }

  /**
   * Test that we correctly use the redundancy-zone property to determine where to place redundant
   * copies of buckets.
   *
   * <p>
   * Two members are configured with zone "A" and one member with zone "B". The buckets are
   * created on one zone A member; the rebalance is then expected to create all redundant copies
   * on the zone B member.
   *
   * @param simulate whether the rebalance is simulated instead of executed; the final bucket
   *        counts are only verified for a real rebalance
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testEnforceZone(boolean simulate) {
    VM vmA1 = getVM(0);
    VM vmA2 = getVM(1);
    VM vmB = getVM(2);

    for (VM zoneAVm : toArray(vmA1, vmA2)) {
      zoneAVm.invoke(() -> setRedundancyZone("A"));
    }

    DistributedMember zoneBMember = vmB.invoke(() -> {
      setRedundancyZone("B");
      return getCache().getDistributedSystem().getDistributedMember();
    });

    // Host the region on the first zone A member only
    vmA1.invoke(() -> createPartitionedRegion("region1", 1));

    // Put keys 1..6 so that some buckets get created
    vmA1.invoke(() -> {
      Region<Number, String> region1 = getCache().getRegion("region1");
      for (int key = 1; key <= 6; key++) {
        region1.put(key, "A");
      }
    });

    // The buckets must be reported as having low redundancy
    vmA1.invoke(() -> validateRedundancy("region1", 6, 0, 6));

    // Hosting the region on the other members should have no effect by itself ...
    for (VM lateJoiner : toArray(vmA2, vmB)) {
      lateJoiner.invoke(() -> createPartitionedRegion("region1", 1));
    }

    // ... so redundancy must still be low
    vmA1.invoke(() -> validateRedundancy("region1", 6, 0, 6));

    // Run (or simulate) the rebalance and inspect what it reports
    vmA1.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();
      RebalanceResults rebalance = doRebalance(simulate, resourceManager);

      // We expect to satisfy redundancy with the zone B member
      assertThat(rebalance.getTotalBucketCreatesCompleted()).isEqualTo(6);

      // 2 primaries will go to vm2, leaving vm0 and vm1 with 2 primaries each
      assertThat(rebalance.getTotalPrimaryTransfersCompleted()).isEqualTo(2);

      // We actually *will* transfer 3 buckets to the other member in zone A, because that
      // improves the balance
      assertThat(rebalance.getTotalBucketTransfersCompleted()).isEqualTo(3);

      // Only one region took part, so its details must match the totals
      Set<PartitionRebalanceInfo> perRegion = rebalance.getPartitionRebalanceDetails();
      assertThat(perRegion).hasSize(1);

      PartitionRebalanceInfo regionDetails = perRegion.iterator().next();
      assertThat(regionDetails.getBucketCreatesCompleted()).isEqualTo(6);
      assertThat(regionDetails.getPrimaryTransfersCompleted()).isEqualTo(2);
      assertThat(regionDetails.getBucketTransfersCompleted()).isEqualTo(3);

      // The zone B member holds a copy of every bucket; the zone A members split theirs
      for (PartitionMemberInfo memberInfo : regionDetails.getPartitionMemberDetailsAfter()) {
        int expectedBuckets = memberInfo.getDistributedMember().equals(zoneBMember) ? 6 : 3;
        assertThat(memberInfo.getBucketCount()).isEqualTo(expectedBuckets);
        assertThat(memberInfo.getPrimaryCount()).isEqualTo(2);
      }

      if (!simulate) {
        validateStatistics(resourceManager, rebalance);
      }
    });

    if (simulate) {
      return;
    }

    // A real rebalance must have produced the bucket layout reported above
    vmA1.invoke(() -> validateBucketCount("region1", 3));
    vmA2.invoke(() -> validateBucketCount("region1", 3));
    vmB.invoke(() -> validateBucketCount("region1", 6));
  }

  /**
   * Same zone layout as the single-region zone test (two members in zone "A", one in zone "B"),
   * but with two regions, "region1" and "region2", and a {@code ParallelRecoveryObserver}
   * installed in every member. A real rebalance is executed; the expected totals are twice the
   * per-region numbers, and the observer on the first member must report that it was called.
   */
  @Test
  public void testEnforceZoneWithMultipleRegions() {
    VM vmA1 = getVM(0);
    VM vmA2 = getVM(1);
    VM vmB = getVM(2);

    for (VM zoneAVm : toArray(vmA1, vmA2)) {
      zoneAVm.invoke(() -> setRedundancyZone("A"));
    }

    DistributedMember zoneBMember = vmB.invoke(() -> {
      setRedundancyZone("B");
      return getCache().getDistributedSystem().getDistributedMember();
    });

    vmA1.invoke(
        () -> InternalResourceManager.setResourceObserver(new ParallelRecoveryObserver(2)));

    // Host both regions on the first zone A member only
    vmA1.invoke(() -> createTwoRegionsWithParallelRecoveryObserver());

    // Populate both regions so that buckets get created
    vmA1.invoke(() -> {
      doPuts("region1");
      doPuts("region2");
    });

    // The buckets of both regions must be reported as having low redundancy
    vmA1.invoke(() -> validateRedundancyOfTwoRegions("region1", "region2", 6, 0, 6));

    // Hosting the regions on the other members should have no effect by itself ...
    for (VM lateJoiner : toArray(vmA2, vmB)) {
      lateJoiner.invoke(() -> {
        InternalResourceManager.setResourceObserver(new ParallelRecoveryObserver(2));
        createTwoRegionsWithParallelRecoveryObserver();
      });
    }

    // ... so redundancy must still be low
    vmA1.invoke(() -> validateRedundancyOfTwoRegions("region1", "region2", 6, 0, 6));

    // Execute a real rebalance and inspect what it reports
    vmA1.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();
      RebalanceResults rebalance = doRebalance(false, resourceManager);

      // Redundancy is satisfied with the zone B member: 6 creates per region, 12 in total
      assertThat(rebalance.getTotalBucketCreatesCompleted()).isEqualTo(12);

      // 2 primary transfers per region, 4 in total
      assertThat(rebalance.getTotalPrimaryTransfersCompleted()).isEqualTo(4);

      // 3 buckets per region are transferred to the other zone A member because that improves
      // the balance, 6 in total
      assertThat(rebalance.getTotalBucketTransfersCompleted()).isEqualTo(6);

      Set<PartitionRebalanceInfo> perRegion = rebalance.getPartitionRebalanceDetails();
      assertThat(perRegion).hasSize(2);

      for (PartitionRebalanceInfo regionDetails : perRegion) {
        assertThat(regionDetails.getBucketCreatesCompleted()).isEqualTo(6);
        assertThat(regionDetails.getPrimaryTransfersCompleted()).isEqualTo(2);
        assertThat(regionDetails.getBucketTransfersCompleted()).isEqualTo(3);

        // The zone B member holds a copy of every bucket; the zone A members split theirs
        for (PartitionMemberInfo memberInfo : regionDetails.getPartitionMemberDetailsAfter()) {
          int expectedBuckets = memberInfo.getDistributedMember().equals(zoneBMember) ? 6 : 3;
          assertThat(memberInfo.getBucketCount()).isEqualTo(expectedBuckets);
          assertThat(memberInfo.getPrimaryCount()).isEqualTo(2);
        }
      }

      validateStatistics(resourceManager, rebalance);
    });

    // The observer installed on the first member must have been notified
    vmA1.invoke(() -> {
      ParallelRecoveryObserver observer =
          (ParallelRecoveryObserver) InternalResourceManager.getResourceObserver();
      assertThat(observer.isObserverCalled()).isTrue();
    });

    // Both regions must show the same bucket layout on every member
    for (String regionName : new String[] {"region1", "region2"}) {
      vmA1.invoke(() -> validateBucketCount(regionName, 3));
      vmA2.invoke(() -> validateBucketCount(regionName, 3));
      vmB.invoke(() -> validateBucketCount(regionName, 6));
    }
  }

  /**
   * Populates "region1" with twelve keys on a member created with a size argument of 200, then
   * adds two members created with a size argument of 100. The rebalance is expected to give the
   * larger member twice as many buckets and primaries as each of the smaller members.
   *
   * @param simulate whether the rebalance is simulated instead of executed; restored redundancy
   *        is only verified for a real rebalance
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testRecoverRedundancyBalancing(boolean simulate) {
    VM largeVm = getVM(0);
    VM smallVm1 = getVM(1);
    VM smallVm2 = getVM(2);

    DistributedMember largeMember = largeVm.invoke(() -> {
      createPartitionedRegion("region1", 1, 200);
      return getCache().getDistributedSystem().getDistributedMember();
    });

    // Put keys 0..11 so that some buckets get created
    largeVm.invoke(() -> {
      Region<Number, String> region1 = getCache().getRegion("region1");
      int key = 0;
      while (key < 12) {
        region1.put(key, "A");
        key++;
      }
    });

    largeVm.invoke(() -> validateRedundancy("region1", 12, 0, 12));

    // Now create the region in 2 more VMs with half the localMaxMemory
    for (VM smallVm : toArray(smallVm1, smallVm2)) {
      smallVm.invoke(() -> createPartitionedRegion("region1", 1, 100));
    }

    largeVm.invoke(() -> validateRedundancy("region1", 12, 0, 12));

    // Run (or simulate) the rebalance and inspect what it reports
    largeVm.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();
      RebalanceResults rebalance = doRebalance(simulate, resourceManager);

      // Totals across all regions
      assertThat(rebalance.getTotalBucketCreatesCompleted()).isEqualTo(12);
      assertThat(rebalance.getTotalPrimaryTransfersCompleted()).isEqualTo(6);
      assertThat(rebalance.getTotalBucketTransferBytes()).isEqualTo(0);
      assertThat(rebalance.getTotalBucketTransfersCompleted()).isEqualTo(0);

      // Only one region took part, so its details must match the totals
      Set<PartitionRebalanceInfo> perRegion = rebalance.getPartitionRebalanceDetails();
      assertThat(perRegion).hasSize(1);

      PartitionRebalanceInfo regionDetails = perRegion.iterator().next();
      assertThat(regionDetails.getBucketCreatesCompleted()).isEqualTo(12);
      assertThat(regionDetails.getPrimaryTransfersCompleted()).isEqualTo(6);
      assertThat(regionDetails.getBucketTransferBytes()).isEqualTo(0);
      assertThat(regionDetails.getBucketTransfersCompleted()).isEqualTo(0);

      Set<PartitionMemberInfo> membersAfter = regionDetails.getPartitionMemberDetailsAfter();
      assertThat(membersAfter).hasSize(3);

      for (PartitionMemberInfo memberInfo : membersAfter) {
        // We have 1 member with a size of 200 and two members with size 100
        boolean isLargeMember = memberInfo.getDistributedMember().equals(largeMember);
        assertThat(memberInfo.getBucketCount()).isEqualTo(isLargeMember ? 12 : 6);
        assertThat(memberInfo.getPrimaryCount()).isEqualTo(isLargeMember ? 6 : 3);
      }

      if (!simulate) {
        validateStatistics(resourceManager, rebalance);
      }
    });

    if (simulate) {
      return;
    }

    // After a real rebalance every member must see full redundancy
    for (VM member : toArray(largeVm, smallVm1, smallVm2)) {
      member.invoke(() -> validateRedundancy("region1", 12, 1, 0));
    }
  }

  /**
   * Verifies that a rebalance still restores redundancy when creating the backup bucket on one
   * member fails.
   *
   * <p>
   * A single bucket is created on vm0. Two more members then host the region, and a spied
   * {@link PRHARedundancyProvider} is injected on vm0 whose {@code createBackupBucketOnMember}
   * returns {@code false} for member2. The rebalance is expected to report exactly one bucket
   * create and one failed bucket create, and to place the redundant copy on member3.
   */
  @Test
  public void testRecoverRedundancyBalancingIfCreateBucketFails() {
    VM vm0 = getVM(0);
    VM vm1 = getVM(1);
    VM vm2 = getVM(2);

    DistributedMember member1 = vm0.invoke(() -> {
      createPartitionedRegion("region1", 1, 100);
      return getCache().getDistributedSystem().getDistributedMember();
    });

    // Put a single entry (the loop runs exactly once) so that only one bucket exists
    vm0.invoke(() -> {
      Region<Number, String> region = getCache().getRegion("region1");
      for (int i = 0; i < 1; i++) {
        region.put(i, "A");
      }
    });

    // One bucket exists and it has low redundancy
    vm0.invoke(() -> validateRedundancy("region1", 1, 0, 1));

    // Now create the region in 2 more VMs
    // Let localMaxMemory(VM1) > localMaxMemory(VM2)
    // so that redundant bucket will always be attempted on VM1
    DistributedMember member2 = vm1.invoke(() -> {
      createPartitionedRegion("region1", 1, 100);
      return getCache().getDistributedSystem().getDistributedMember();
    });
    DistributedMember member3 = vm2.invoke(() -> {
      createPartitionedRegion("region1", 1, 90);
      return getCache().getDistributedSystem().getDistributedMember();
    });

    // Adding members alone must not change the redundancy
    vm0.invoke(() -> validateRedundancy("region1", 1, 0, 1));

    // Inject mock PRHARedundancyProvider to simulate createBucketFailures
    vm0.invoke(() -> {
      // Spy on the cache so that getPartitionedRegions() can be stubbed below
      InternalCache cache = spy(getCache());

      PartitionedRegion origRegion = (PartitionedRegion) cache.getRegion("region1");
      PartitionedRegion spyRegion = spy(origRegion);
      PRHARedundancyProvider redundancyProvider =
          spy(new PRHARedundancyProvider(spyRegion, mock(InternalResourceManager.class)));

      // return the spied region when ever getPartitionedRegions() is invoked
      // (the set is fetched before the stub is installed, then the original region is swapped
      // for its spy)
      Set<PartitionedRegion> parRegions = cache.getPartitionedRegions();
      parRegions.remove(origRegion);
      parRegions.add(spyRegion);

      doReturn(parRegions).when(cache).getPartitionedRegions();
      doReturn(redundancyProvider).when(spyRegion).getRedundancyProvider();

      // simulate create bucket fails on member2 and test if it creates on member3
      doReturn(false).when(redundancyProvider).createBackupBucketOnMember(anyInt(),
          eq((InternalDistributedMember) member2), anyBoolean(), anyBoolean(), any(),
          anyBoolean());

      // Now simulate a rebalance
      // Create operationImpl and not factory as we need spied cache to be passed to operationImpl
      // (the second constructor argument is false here; presumably the "simulate" flag, i.e. a
      // real rebalance is executed -- TODO confirm against RebalanceOperationImpl)
      RegionFilter filter = new FilterByPath(null, null);
      RebalanceOperationImpl operation = new RebalanceOperationImpl(cache, false, filter);
      operation.start();

      // Wait for the rebalance to finish, bounded by the shared test timeout
      RebalanceResults results = operation.getResults(TIMEOUT_SECONDS, SECONDS);

      // Exactly one redundant copy is created; nothing is moved
      assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(1);
      assertThat(results.getTotalPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(results.getTotalBucketTransferBytes()).isEqualTo(0);
      assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(0);

      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertThat(detailSet).hasSize(1);

      PartitionRebalanceInfo details = detailSet.iterator().next();
      assertThat(details.getBucketCreatesCompleted()).isEqualTo(1);
      assertThat(details.getPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(details.getBucketTransferBytes()).isEqualTo(0);
      assertThat(details.getBucketTransfersCompleted()).isEqualTo(0);

      // member1 keeps the primary, member2 (where the create fails) stays empty, and member3
      // receives the redundant copy. A member matching none of the three is not checked.
      Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
      assertThat(afterDetails).hasSize(3);
      for (PartitionMemberInfo memberDetails : afterDetails) {
        if (memberDetails.getDistributedMember().equals(member1)) {
          assertThat(memberDetails.getBucketCount()).isEqualTo(1);
          assertThat(memberDetails.getPrimaryCount()).isEqualTo(1);
        } else if (memberDetails.getDistributedMember().equals(member2)) {
          assertThat(memberDetails.getBucketCount()).isEqualTo(0);
          assertThat(memberDetails.getPrimaryCount()).isEqualTo(0);
        } else if (memberDetails.getDistributedMember().equals(member3)) {
          assertThat(memberDetails.getBucketCount()).isEqualTo(1);
          assertThat(memberDetails.getPrimaryCount()).isEqualTo(0);
        }
      }

      ResourceManagerStats stats = cache.getInternalResourceManager().getStats();

      // The statistics must agree with the results, including the one failed create
      assertThat(stats.getRebalancesInProgress()).isEqualTo(0);
      assertThat(stats.getRebalancesCompleted()).isEqualTo(1);
      assertThat(stats.getRebalanceBucketCreatesInProgress()).isEqualTo(0);
      assertThat(stats.getRebalanceBucketCreatesCompleted())
          .isEqualTo(results.getTotalBucketCreatesCompleted());
      assertThat(stats.getRebalanceBucketCreatesFailed()).isEqualTo(1);
    });

    // Every member must now see the single bucket with full redundancy
    for (VM vm : toArray(vm0, vm1, vm2)) {
      vm.invoke(() -> validateRedundancy("region1", 1, 1, 0));
    }
  }

  /**
   * Rebalances two regions, "region1" and "region2", where "region2" is created with "region1"
   * passed as its last argument (presumably the region it is colocated with -- see the test
   * name). The first member is created with a size argument of 200 and the two later members
   * with 100; the rebalance is expected to give the first member twice as many buckets and
   * primaries per region as each of the others.
   *
   * @param simulate whether the rebalance is simulated instead of executed; the resulting
   *        bucket layout is only verified for a real rebalance
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testRecoverRedundancyColocatedRegions(boolean simulate) {
    VM largeVm = getVM(0);
    VM smallVm1 = getVM(1);
    VM smallVm2 = getVM(2);

    DistributedMember largeMember = largeVm.invoke(() -> {
      createPartitionedRegion("region1", 1, 200);
      return getCache().getDistributedSystem().getDistributedMember();
    });

    largeVm.invoke(() -> createPartitionedRegion("region2", 200, "region1"));

    // Put keys 0..11 into both regions so that some buckets get created
    largeVm.invoke(() -> {
      Region<Number, String> first = getCache().getRegion("region1");
      Region<Number, String> second = getCache().getRegion("region2");
      int key = 0;
      while (key < 12) {
        first.put(key, "A");
        second.put(key, "A");
        key++;
      }
    });

    // check to make sure our redundancy is impaired
    largeVm.invoke(() -> validateRedundancyOfTwoRegions("region1", "region2", 12, 0, 12));

    // Now create the regions in 2 more vms, each of which has local max memory of 1/2 that of
    // the original VM. "region1" is created on both before "region2" is created on either.
    for (VM smallVm : toArray(smallVm1, smallVm2)) {
      smallVm.invoke(() -> createPartitionedRegion("region1", 1, 100));
    }
    for (VM smallVm : toArray(smallVm1, smallVm2)) {
      smallVm.invoke(() -> createPartitionedRegion("region2", 100, "region1"));
    }

    largeVm.invoke(() -> validateRedundancyOfTwoRegions("region1", "region2", 12, 0, 12));

    // Run (or simulate) the rebalance and inspect what it reports
    largeVm.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();
      RebalanceResults rebalance = doRebalance(simulate, resourceManager);

      // Totals across both regions
      assertThat(rebalance.getTotalBucketCreatesCompleted()).isEqualTo(24);
      assertThat(rebalance.getTotalPrimaryTransfersCompleted()).isEqualTo(12);
      assertThat(rebalance.getTotalBucketTransferBytes()).isEqualTo(0);
      assertThat(rebalance.getTotalBucketTransfersCompleted()).isEqualTo(0);

      Set<PartitionRebalanceInfo> perRegion = rebalance.getPartitionRebalanceDetails();
      assertThat(perRegion).hasSize(2);

      for (PartitionRebalanceInfo regionDetails : perRegion) {
        // Each region accounts for half of the totals
        assertThat(regionDetails.getBucketCreatesCompleted()).isEqualTo(12);
        assertThat(regionDetails.getPrimaryTransfersCompleted()).isEqualTo(6);
        assertThat(regionDetails.getBucketTransferBytes()).isEqualTo(0);
        assertThat(regionDetails.getBucketTransfersCompleted()).isEqualTo(0);

        Set<PartitionMemberInfo> membersAfter = regionDetails.getPartitionMemberDetailsAfter();
        assertThat(membersAfter).hasSize(3);

        for (PartitionMemberInfo memberInfo : membersAfter) {
          boolean isLargeMember = memberInfo.getDistributedMember().equals(largeMember);
          assertThat(memberInfo.getBucketCount()).isEqualTo(isLargeMember ? 12 : 6);
          assertThat(memberInfo.getPrimaryCount()).isEqualTo(isLargeMember ? 6 : 3);
        }

        // NOTE(review): this call does not use the loop variable, so it runs once per region
        // with the same arguments; it could probably be hoisted out of the loop -- confirm.
        if (!simulate) {
          validateStatistics(resourceManager, rebalance);
        }
      }
    });

    if (simulate) {
      return;
    }

    // After a real rebalance every member must see both regions fully redundant, and the two
    // regions must host the same buckets and the same primaries locally
    for (VM member : toArray(largeVm, smallVm1, smallVm2)) {
      member.invoke(() -> {
        PartitionedRegion region1Impl = (PartitionedRegion) getCache().getRegion("region1");
        PartitionedRegion region2Impl = (PartitionedRegion) getCache().getRegion("region2");

        for (String regionName : new String[] {"region1", "region2"}) {
          PartitionRegionInfo info =
              PartitionRegionHelper.getPartitionRegionInfo(getCache().getRegion(regionName));
          assertThat(info.getCreatedBucketCount()).isEqualTo(12);
          assertThat(info.getActualRedundantCopies()).isEqualTo(1);
          assertThat(info.getLowRedundancyBucketCount()).isEqualTo(0);
        }

        assertThat(region2Impl.getLocalPrimaryBucketsListTestOnly())
            .isEqualTo(region1Impl.getLocalPrimaryBucketsListTestOnly());

        assertThat(region2Impl.getLocalBucketsListTestOnly())
            .isEqualTo(region1Impl.getLocalBucketsListTestOnly());
      });
    }
  }

  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testRecoverRedundancyParallelAsyncEventQueue(boolean simulate)
      throws SecurityException {
    if (simulate) {
      invokeInEveryVM(
          () -> System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "LOG_REBALANCE", "true"));
    }

    VM vm0 = getVM(0);
    VM vm1 = getVM(1);
    VM vm2 = getVM(2);

    DistributedMember member1 = vm0.invoke(() -> {
      createPRRegionWithAsyncQueue(200);
      return getCache().getDistributedSystem().getDistributedMember();
    });

    // Create some buckets. Put enough data to cause the queue to overflow (more than 1 MB)
    vm0.invoke(() -> {
      Region<Number, String> region = ((Cache) getCache()).getRegion("region1");
      for (int i = 0; i < 12; i++) {
        region.put(i, "A", new byte[1024 * 512]);
      }

      // The async event queue uses asynchronous writes.
      // Flush the default disk store to make sure all values have overflowed
      getCache().findDiskStore(null).flush();
    });

    // check to make sure our redundancy is impaired

    vm0.invoke(() -> {
      Region region1 = getCache().getRegion("region1");
      PartitionedRegion region2 =
          ColocationHelper.getColocatedChildRegions((PartitionedRegion) region1).get(0);

      validateRedundancyOfTwoRegions(region1, region2, 12, 0, 12);

      assertThat(getCache().getAsyncEventQueue("parallelQueue").size()).isEqualTo(12);
    });

    // Create the region on two more members, each with 1/2 of the memory
    vm1.invoke(() -> createPRRegionWithAsyncQueue(100));
    vm2.invoke(() -> createPRRegionWithAsyncQueue(100));

    vm0.invoke(() -> {
      Region region1 = getCache().getRegion("region1");
      PartitionedRegion region2 =
          ColocationHelper.getColocatedChildRegions((PartitionedRegion) region1).get(0);

      validateRedundancyOfTwoRegions(region1, region2, 12, 0, 12);

      assertThat(getCache().getAsyncEventQueue("parallelQueue").size()).isEqualTo(12);
    });

    // Now simulate a rebalance
    vm0.invoke(() -> {
      InternalResourceManager manager = getCache().getInternalResourceManager();
      RebalanceResults results = doRebalance(simulate, manager);

      assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(24);
      assertThat(results.getTotalPrimaryTransfersCompleted()).isEqualTo(12);
      assertThat(results.getTotalBucketTransferBytes()).isEqualTo(0);
      assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(0);

      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertThat(detailSet).hasSize(2);

      for (PartitionRebalanceInfo details : detailSet) {
        assertThat(details.getBucketCreatesCompleted()).isEqualTo(12);
        assertThat(details.getPrimaryTransfersCompleted()).isEqualTo(6);
        assertThat(details.getBucketTransferBytes()).isEqualTo(0);
        assertThat(details.getBucketTransfersCompleted()).isEqualTo(0);

        Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
        assertThat(afterDetails).hasSize(3);

        for (PartitionMemberInfo memberDetails : afterDetails) {
          if (memberDetails.getDistributedMember().equals(member1)) {
            assertThat(memberDetails.getBucketCount()).isEqualTo(12);
            assertThat(memberDetails.getPrimaryCount()).isEqualTo(6);
          } else {
            assertThat(memberDetails.getBucketCount()).isEqualTo(6);
            assertThat(memberDetails.getPrimaryCount()).isEqualTo(3);
          }
        }

        if (!simulate) {
          validateStatistics(manager, results);
        }
      }
    });

    if (!simulate) {
      for (VM vm : toArray(vm0, vm1, vm2)) {
        vm.invoke(() -> {
          PartitionedRegion region1 = (PartitionedRegion) getCache().getRegion("region1");
          // Get the async event queue region (It's a colocated region)
          PartitionedRegion region2 = ColocationHelper.getColocatedChildRegions(region1).get(0);

          validateRedundancyOfTwoRegions(region1, region2, 12, 1, 0);

          assertThat(region2.getLocalPrimaryBucketsListTestOnly())
              .isEqualTo(region1.getLocalPrimaryBucketsListTestOnly());

          assertThat(region2.getLocalBucketsListTestOnly())
              .isEqualTo(region1.getLocalBucketsListTestOnly());
        });
      }
    }
  }

  /**
   * Verifies that a rebalance operation can be cancelled while it is still running.
   *
   * <p>
   * A resource observer parks the operation inside {@code rebalancingOrRecoveryStarted} until the
   * test has called {@code cancel()}. The test then checks that the operation reports itself as
   * cancelled, that {@code getResults} throws {@link CancellationException}, that the manager no
   * longer lists the operation, and that redundancy of "region1" is still validated with the same
   * arguments as before the rebalance.
   */
  @Test
  public void testCancelOperation() {
    VM vm0 = getVM(0);
    VM vm1 = getVM(1);

    // Create the region in only 1 VM
    vm0.invoke(() -> createPartitionedRegion("region1", 1));

    // Create some buckets
    vm0.invoke(() -> {
      Region<Number, String> region = ((Cache) getCache()).getRegion("region1");
      region.put(1, "A");
    });

    // make sure we can tell that the buckets have low redundancy
    vm0.invoke(() -> validateRedundancy("region1", 1, 0, 1));

    // Create the region in the other VM (should have no effect)
    vm1.invoke(() -> createPartitionedRegion("region1", 1));

    // Make sure we still have low redundancy
    vm0.invoke(() -> validateRedundancy("region1", 1, 0, 1));

    // Now do a rebalance, but cancel it in the middle
    vm0.invoke(() -> {
      // Released by the test once cancel() has been called
      CountDownLatch rebalancingCancelled = new CountDownLatch(1);
      // Released by the observer when it is told the rebalance has finished
      CountDownLatch rebalancingFinished = new CountDownLatch(1);

      InternalResourceManager manager = getCache().getInternalResourceManager();

      InternalResourceManager.setResourceObserver(new ResourceObserverAdapter() {

        @Override
        public void rebalancingOrRecoveryStarted(Region region) {
          // Hold the operation here so the test can cancel it before it progresses
          try {
            rebalancingCancelled.await();
          } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing it
            Thread.currentThread().interrupt();
          }
        }

        @Override
        public void rebalancingOrRecoveryFinished(Region region) {
          rebalancingFinished.countDown();
        }
      });

      RebalanceOperation op = manager.createRebalanceFactory().start();
      assertThat(op.isCancelled()).isFalse();
      assertThat(op.isDone()).isFalse();
      assertThat(manager.getRebalanceOperations()).isEqualTo(Collections.singleton(op));

      // The observer is still blocking the operation, so asking for results must time out
      Throwable thrown = catchThrowable(() -> op.getResults(5, SECONDS));
      assertThat(thrown).isInstanceOf(TimeoutException.class);

      boolean result = op.cancel();
      assertThat(result).isTrue();

      // Let the blocked observer callback return now that the operation has been cancelled
      rebalancingCancelled.countDown();

      assertThat(op.isCancelled()).isTrue();
      assertThat(op.isDone()).isTrue();

      // Wait for the observer's finished callback before checking the final state
      rebalancingFinished.await();

      thrown = catchThrowable(() -> op.getResults(60, SECONDS));
      assertThat(thrown).isInstanceOf(CancellationException.class);

      assertThat(manager.getRebalanceOperations()).isEqualTo(Collections.emptySet());
    });

    // We should still have low redundancy, because the rebalancing was cancelled
    vm0.invoke(() -> validateRedundancy("region1", 1, 0, 1));
  }

  /**
   * Verifies that an in-flight rebalance reacts to a member joining while buckets are being moved.
   *
   * <p>
   * The rebalance starts with two members hosting "region1". When the first bucket move is
   * reported, a third member creates the region. The results must report two members before,
   * three members after, and exactly one membership change in the resource manager statistics.
   */
  @Test
  public void testMembershipChange() {
    VM firstMember = getVM(0);
    VM secondMember = getVM(1);
    VM lateMember = getVM(2);

    // Host the region on a single member and populate keys 1..6 there
    firstMember.invoke(() -> createPartitionedRegion("region1", 0));
    firstMember.invoke(() -> {
      Region<Number, String> region = getCache().getRegion("region1");
      for (int key = 1; key <= 6; key++) {
        region.put(key, "A");
      }
    });

    // A second member creates the region before the rebalance (should have no effect by itself)
    secondMember.invoke(() -> createPartitionedRegion("region1", 0));

    // Rebalance, bringing the third member in as soon as the first bucket starts moving
    firstMember.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();

      InternalResourceManager.setResourceObserver(new ResourceObserverAdapter() {
        private volatile boolean joinPending = true;

        @Override
        public void movingBucket(Region region, int bucketId, DistributedMember source,
            DistributedMember target) {
          if (!joinPending) {
            return;
          }
          joinPending = false;
          // NOTE: do not replace createPartitionedRegionRunnable with lambda
          // because ResourceObserverAdapter is NOT serializable
          lateMember.invoke(createPartitionedRegionRunnable("region1", 0));
        }
      });

      RebalanceResults rebalanceResults = doRebalance(false, resourceManager);

      // Totals across all regions
      assertThat(rebalanceResults.getTotalBucketCreatesCompleted()).isEqualTo(0);
      assertThat(rebalanceResults.getTotalPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(rebalanceResults.getTotalBucketTransfersCompleted()).isEqualTo(4);
      assertThat(rebalanceResults.getTotalBucketTransferBytes()).isGreaterThan(0);

      // Only one region took part, so there is a single per-region entry
      Set<PartitionRebalanceInfo> perRegion = rebalanceResults.getPartitionRebalanceDetails();
      assertThat(perRegion).hasSize(1);

      PartitionRebalanceInfo regionInfo = perRegion.iterator().next();
      assertThat(regionInfo.getBucketCreatesCompleted()).isEqualTo(0);
      assertThat(regionInfo.getPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(regionInfo.getBucketTransferBytes()).isGreaterThan(0);
      assertThat(regionInfo.getBucketTransfersCompleted()).isEqualTo(4);

      // there should have only been 2 members when the rebalancing started.
      Set<PartitionMemberInfo> membersBefore = regionInfo.getPartitionMemberDetailsBefore();
      assertThat(membersBefore).hasSize(2);

      // if it was done, there should now be 3 members.
      Set<PartitionMemberInfo> membersAfter = regionInfo.getPartitionMemberDetailsAfter();
      assertThat(membersAfter).hasSize(3);

      membersAfter.forEach(member -> {
        assertThat(member.getBucketCount()).isEqualTo(2);
        assertThat(member.getPrimaryCount()).isEqualTo(2);
      });

      validateStatistics(resourceManager, rebalanceResults);

      ResourceManagerStats managerStats = resourceManager.getStats();
      assertThat(managerStats.getRebalanceMembershipChanges()).isEqualTo(1);
    });
  }

  /**
   * Verifies that a rebalance evens out buckets between two members when the region keeps no
   * redundant copies, and that moving buckets does not close the region's cache loader.
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testMoveBucketsNoRedundancy(boolean simulate) {
    VM firstMember = getVM(0);
    VM secondMember = getVM(1);

    // Only the first member hosts the region while the entries are put
    firstMember.invoke(() -> createPartitionedRegion("region1", new NullReturningLoader()));

    firstMember.invoke(() -> {
      Region<Number, String> region = getCache().getRegion("region1");
      for (int key = 1; key <= 6; key++) {
        region.put(key, "A");
      }
    });

    // Creating the region on the second member should have no effect by itself
    secondMember.invoke(() -> createPartitionedRegion("region1", new NullReturningLoader()));

    // Run (or simulate) the rebalance and inspect what it reports
    firstMember.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();
      RebalanceResults rebalanceResults = doRebalance(simulate, resourceManager);

      assertThat(rebalanceResults.getTotalBucketCreatesCompleted()).isEqualTo(0);
      assertThat(rebalanceResults.getTotalPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(rebalanceResults.getTotalBucketTransfersCompleted()).isEqualTo(3);
      assertThat(rebalanceResults.getTotalBucketTransferBytes()).isGreaterThan(0);

      Set<PartitionRebalanceInfo> perRegion = rebalanceResults.getPartitionRebalanceDetails();
      assertThat(perRegion).hasSize(1);

      PartitionRebalanceInfo regionInfo = perRegion.iterator().next();
      assertThat(regionInfo.getBucketCreatesCompleted()).isEqualTo(0);
      assertThat(regionInfo.getPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(regionInfo.getBucketTransferBytes()).isGreaterThan(0);
      assertThat(regionInfo.getBucketTransfersCompleted()).isEqualTo(3);

      Set<PartitionMemberInfo> membersAfter = regionInfo.getPartitionMemberDetailsAfter();
      assertThat(membersAfter).hasSize(2);

      membersAfter.forEach(member -> {
        assertThat(member.getBucketCount()).isEqualTo(3);
        assertThat(member.getPrimaryCount()).isEqualTo(3);
      });

      if (!simulate) {
        validateStatistics(resourceManager, rebalanceResults);
      }
    });

    // A simulation changes nothing on the members, so there is nothing further to check
    if (simulate) {
      return;
    }

    // First pass: every member must see the balanced layout and an open cache loader
    for (VM member : toArray(firstMember, secondMember)) {
      member.invoke(() -> {
        PartitionRegionInfo regionInfo =
            PartitionRegionHelper.getPartitionRegionInfo(getCache().getRegion("region1"));
        assertThat(regionInfo.getCreatedBucketCount()).isEqualTo(6);
        assertThat(regionInfo.getActualRedundantCopies()).isEqualTo(0);
        assertThat(regionInfo.getLowRedundancyBucketCount()).isEqualTo(0);

        Set<PartitionMemberInfo> hostingMembers = regionInfo.getPartitionMemberInfo();
        assertThat(hostingMembers).hasSize(2);

        for (PartitionMemberInfo hostingMember : hostingMembers) {
          assertThat(hostingMember.getBucketCount()).isEqualTo(3);
          assertThat(hostingMember.getPrimaryCount()).isEqualTo(3);
        }

        // check to make sure that moving buckets didn't close the cache loader
        NullReturningLoader cacheLoader =
            (NullReturningLoader) getCache().getRegion("region1").getAttributes()
                .getCacheLoader();
        assertThat(cacheLoader.isClosed()).isFalse();
      });
    }

    // Second pass, kept separate so both members are validated before either region is closed
    for (VM member : toArray(firstMember, secondMember)) {
      member.invoke(() -> {
        NullReturningLoader cacheLoader =
            (NullReturningLoader) getCache().getRegion("region1").getAttributes()
                .getCacheLoader();
        assertThat(cacheLoader.isClosed()).isFalse();

        // check to make sure that closing the PR closes the cache loader
        getCache().getRegion("region1").close();
        assertThat(cacheLoader.isClosed()).isTrue();
      });
    }
  }

  /**
   * Verifies that the include/exclude region filters passed to a rebalance restrict which regions
   * are rebalanced.
   *
   * <p>
   * Four regions ("region0" to "region3") are created. The rebalance is run with "region0" and
   * "region1" included and "region0" and "region3" excluded; the test expects exactly "/region0"
   * and "/region1" to be reported as rebalanced, and "region2" to keep all of its buckets on one
   * member.
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testFilterRegions(boolean simulate) {
    VM firstMember = getVM(0);
    VM secondMember = getVM(1);

    Set<String> included = new HashSet<>();
    Collections.addAll(included, "region0", "region1");

    Set<String> excluded = new HashSet<>();
    Collections.addAll(excluded, "region0", "region3");

    Set<String> expectedRebalanced = new HashSet<>();
    Collections.addAll(expectedRebalanced, "/region0", "/region1");

    int regionCount = 4;

    // Create every region on the first member only
    firstMember.invoke(() -> {
      for (int index = 0; index < regionCount; index++) {
        createPartitionedRegion("region" + index, 0);
      }
    });

    // Put keys 0..5 into each region to create some buckets
    firstMember.invoke(() -> {
      for (int index = 0; index < regionCount; index++) {
        Region<Number, String> region = getCache().getRegion("region" + index);
        for (int key = 0; key < 6; key++) {
          region.put(key, "A");
        }
      }
    });

    // Create the regions on the second member (should have no effect by itself)
    secondMember.invoke(() -> {
      for (int index = 0; index < regionCount; index++) {
        createPartitionedRegion("region" + index, 0);
      }
    });

    // Run (or simulate) the filtered rebalance
    firstMember.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();

      RebalanceResults rebalanceResults =
          doRebalance(simulate, resourceManager, included, excluded);
      Set<String> rebalancedPaths = new HashSet<>();

      for (PartitionRebalanceInfo regionInfo : rebalanceResults.getPartitionRebalanceDetails()) {
        assertThat(regionInfo.getBucketCreatesCompleted()).isEqualTo(0);
        assertThat(regionInfo.getPrimaryTransfersCompleted()).isEqualTo(0);
        assertThat(regionInfo.getBucketTransferBytes()).isGreaterThan(0);
        assertThat(regionInfo.getBucketTransfersCompleted()).isEqualTo(3);

        rebalancedPaths.add(regionInfo.getRegionPath());

        Set<PartitionMemberInfo> membersAfter = regionInfo.getPartitionMemberDetailsAfter();
        assertThat(membersAfter).hasSize(2);

        membersAfter.forEach(member -> {
          assertThat(member.getBucketCount()).isEqualTo(3);
          assertThat(member.getPrimaryCount()).isEqualTo(3);
        });
      }

      assertThat(rebalancedPaths).isEqualTo(expectedRebalanced);

      assertThat(rebalanceResults.getTotalBucketCreatesCompleted()).isEqualTo(0);
      assertThat(rebalanceResults.getTotalPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(rebalanceResults.getTotalBucketTransfersCompleted()).isEqualTo(6);
      assertThat(rebalanceResults.getTotalBucketTransferBytes()).isGreaterThan(0);

      if (!simulate) {
        validateStatistics(resourceManager, rebalanceResults);
      }
    });

    // A simulation changes nothing on the members, so there is nothing further to check
    if (simulate) {
      return;
    }

    for (VM member : toArray(firstMember, secondMember)) {
      member.invoke(() -> {
        // Regions that were rebalanced must be split evenly between the two members
        for (String regionPath : expectedRebalanced) {
          PartitionRegionInfo balancedInfo =
              PartitionRegionHelper.getPartitionRegionInfo(getCache().getRegion(regionPath));
          assertThat(balancedInfo.getCreatedBucketCount()).isEqualTo(6);
          assertThat(balancedInfo.getActualRedundantCopies()).isEqualTo(0);
          assertThat(balancedInfo.getLowRedundancyBucketCount()).isEqualTo(0);
          assertThat(balancedInfo.getPartitionMemberInfo()).hasSize(2);

          for (PartitionMemberInfo hostingMember : balancedInfo.getPartitionMemberInfo()) {
            assertThat(hostingMember.getBucketCount()).isEqualTo(3);
            assertThat(hostingMember.getPrimaryCount()).isEqualTo(3);
          }
        }

        // region2 was not rebalanced: one member holds all six buckets, the other holds none
        PartitionRegionInfo untouchedInfo =
            PartitionRegionHelper.getPartitionRegionInfo(getCache().getRegion("region2"));
        assertThat(untouchedInfo.getCreatedBucketCount()).isEqualTo(6);
        assertThat(untouchedInfo.getActualRedundantCopies()).isEqualTo(0);
        assertThat(untouchedInfo.getLowRedundancyBucketCount()).isEqualTo(0);
        assertThat(untouchedInfo.getPartitionMemberInfo()).hasSize(2);

        for (PartitionMemberInfo hostingMember : untouchedInfo.getPartitionMemberInfo()) {
          int bucketCount = hostingMember.getBucketCount();
          int primaryCount = hostingMember.getPrimaryCount();
          boolean holdsEverything = bucketCount == 6 && primaryCount == 6;
          boolean holdsNothing = bucketCount == 0 && primaryCount == 0;
          assertThat(holdsEverything || holdsNothing)
              .as("Wrong number of buckets on non rebalanced region buckets=" + bucketCount
                  + " primaries=" + primaryCount)
              .isTrue();
        }
      });
    }
  }

  /**
   * Verifies that a rebalance spreads buckets evenly over three members when "region1" keeps one
   * redundant copy, and that the total size reported for the members is the same before and
   * after the rebalance.
   *
   * @param simulate whether the rebalance is only simulated; member state is verified on each VM
   *        only when it is not
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testMoveBucketsWithRedundancy(boolean simulate) {
    VM vm0 = getVM(0);
    VM vm1 = getVM(1);
    VM vm2 = getVM(2);

    // Create the region in two VMs
    vm0.invoke(() -> createPartitionedRegion("region1", 1));
    vm1.invoke(() -> createPartitionedRegion("region1", 1));

    // Create some buckets
    vm0.invoke(() -> {
      Region<Number, String> region = getCache().getRegion("region1");
      for (int i = 0; i < 12; i++) {
        region.put(i, "A");
      }
    });

    // Create the region in one more VM.
    vm2.invoke(() -> createPartitionedRegion("region1", 1));

    // Now run (or simulate) a rebalance
    long totalSize = vm0.invoke(() -> {
      InternalResourceManager manager = getCache().getInternalResourceManager();
      RebalanceResults results = doRebalance(simulate, manager);

      assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(0);

      // We don't know how many primaries will move, it depends on
      // if the move bucket code moves the primary or a redundant bucket,
      // so the total primary transfer count is deliberately not asserted.
      assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(8);
      assertThat(results.getTotalBucketTransferBytes()).isGreaterThan(0);

      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertThat(detailSet).hasSize(1);

      PartitionRebalanceInfo details = detailSet.iterator().next();
      assertThat(details.getBucketCreatesCompleted()).isEqualTo(0);
      assertThat(details.getBucketTransferBytes()).isGreaterThan(0);
      assertThat(details.getBucketTransfersCompleted()).isEqualTo(8);

      // Sum the sizes reported for the members as they were BEFORE the rebalance, so that the
      // comparison with the after-rebalance total below is a real check
      long beforeSize = 0;
      Set<PartitionMemberInfo> beforeDetails = details.getPartitionMemberDetailsBefore();
      for (PartitionMemberInfo memberDetails : beforeDetails) {
        beforeSize += memberDetails.getSize();
      }

      long afterSize = 0;
      Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
      assertThat(afterDetails).hasSize(3);

      for (PartitionMemberInfo memberDetails : afterDetails) {
        assertThat(memberDetails.getBucketCount()).isEqualTo(8);
        assertThat(memberDetails.getPrimaryCount()).isEqualTo(4);
        afterSize += memberDetails.getSize();
      }

      // Moving buckets must not change the total amount of data hosted
      assertThat(afterSize).isEqualTo(beforeSize);

      if (!simulate) {
        validateStatistics(manager, results);
      }

      return beforeSize;
    });

    if (!simulate) {
      for (VM vm : toArray(vm0, vm1, vm2)) {
        vm.invoke(() -> {
          Region region = getCache().getRegion("region1");

          PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
          assertThat(details.getCreatedBucketCount()).isEqualTo(12);
          assertThat(details.getActualRedundantCopies()).isEqualTo(1);
          assertThat(details.getLowRedundancyBucketCount()).isEqualTo(0);

          long afterSize = 0;
          for (PartitionMemberInfo memberDetails : details.getPartitionMemberInfo()) {
            assertThat(memberDetails.getBucketCount()).isEqualTo(8);
            assertThat(memberDetails.getPrimaryCount()).isEqualTo(4);
            afterSize += memberDetails.getSize();
          }

          // The size seen on each member must match the total reported by the rebalance
          assertThat(afterSize).isEqualTo(totalSize);
        });
      }
    }
  }

  /**
   * Verifies when overflowing entries to disk that the stats are correct and we still rebalance
   * correctly.
   *
   * <p>
   * The region is created with LRU entry eviction that overflows to disk. Two rebalances are run:
   * one after a third member joins and one after a fourth member joins. Each rebalance must
   * report the expected number of bucket transfers and the same total member size before and
   * after.
   */
  @Test
  public void testMoveBucketsOverflowToDisk() {
    VM vm0 = getVM(0);
    VM vm1 = getVM(1);
    VM vm2 = getVM(2);
    VM vm3 = getVM(3);

    // Create the region in two VMs
    vm0.invoke(
        () -> createPartitionedRegion("region1", createLRUEntryAttributes(1, OVERFLOW_TO_DISK)));
    vm1.invoke(
        () -> createPartitionedRegion("region1", createLRUEntryAttributes(1, OVERFLOW_TO_DISK)));

    // Create some buckets
    vm0.invoke(() -> {
      Region<Number, String> region = ((Cache) getCache()).getRegion("region1");
      for (int bucket = 0; bucket < 12; bucket++) {
        Map<Number, String> map = new HashMap<>();
        for (int key = 0; key < 200; key++) {
          map.put(bucket + 113 * key, "A");
        }
        region.putAll(map);
      }
    });

    // Do some puts and gets, to trigger eviction
    vm0.invoke(() -> {
      Region<Number, String> region = getCache().getRegion("region1");

      Random random = new Random();

      for (int count = 0; count < 5000; count++) {
        int bucket = count % 12;
        int key = random.nextInt(20);
        region.put(bucket + 113 * key, "B");
      }

      for (int count = 0; count < 500; count++) {
        int bucket = count % 12;
        int key = random.nextInt(20);
        region.get(bucket + 113 * key);
      }
    });

    // Create the region in one more VM.
    vm2.invoke(
        () -> createPartitionedRegion("region1", createLRUEntryAttributes(1, OVERFLOW_TO_DISK)));

    // Now do a rebalance
    vm0.invoke(() -> {
      InternalResourceManager manager = getCache().getInternalResourceManager();
      RebalanceResults results = doRebalance(false, manager);

      assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(0);

      // We don't know how many primaries will move, it depends on
      // if the move bucket code moves the primary or a redundant bucket,
      // so the total primary transfer count is deliberately not asserted.
      assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(8);
      assertThat(results.getTotalBucketTransferBytes()).isGreaterThan(0);

      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertThat(detailSet).hasSize(1);

      PartitionRebalanceInfo details = detailSet.iterator().next();
      assertThat(details.getBucketCreatesCompleted()).isEqualTo(0);
      assertThat(details.getBucketTransferBytes()).isGreaterThan(0);
      assertThat(details.getBucketTransfersCompleted()).isEqualTo(8);

      // Sum the sizes reported for the members as they were BEFORE the rebalance, so that the
      // comparison with the after-rebalance total below is a real check
      long beforeSize = 0;
      Set<PartitionMemberInfo> beforeDetails = details.getPartitionMemberDetailsBefore();
      for (PartitionMemberInfo memberDetails : beforeDetails) {
        beforeSize += memberDetails.getSize();
      }

      Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
      assertThat(afterDetails).hasSize(3);

      long afterSize = 0;
      for (PartitionMemberInfo memberDetails : afterDetails) {
        assertThat(memberDetails.getBucketCount()).isEqualTo(8);
        assertThat(memberDetails.getPrimaryCount()).isEqualTo(4);
        afterSize += memberDetails.getSize();
      }
      assertThat(afterSize).isEqualTo(beforeSize);

      validateStatistics(manager, results);
    });

    for (VM vm : toArray(vm0, vm1, vm2)) {
      vm.invoke(() -> {
        Region region = getCache().getRegion("region1");

        PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
        assertThat(details.getCreatedBucketCount()).isEqualTo(12);
        assertThat(details.getActualRedundantCopies()).isEqualTo(1);
        assertThat(details.getLowRedundancyBucketCount()).isEqualTo(0);

        for (PartitionMemberInfo memberDetails : details.getPartitionMemberInfo()) {
          assertThat(memberDetails.getBucketCount()).isEqualTo(8);
          assertThat(memberDetails.getPrimaryCount()).isEqualTo(4);
        }
      });
    }

    // Create the region in one more VM.
    vm3.invoke(
        () -> createPartitionedRegion("region1", createLRUEntryAttributes(1, OVERFLOW_TO_DISK)));

    // Do another rebalance
    vm0.invoke(() -> {
      ResourceManager manager = getCache().getResourceManager();
      RebalanceResults results = doRebalance(false, manager);

      assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(0);

      // We don't know how many primaries will move, it depends on
      // if the move bucket code moves the primary or a redundant bucket,
      // so the total primary transfer count is deliberately not asserted.
      assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(6);
      assertThat(results.getTotalBucketTransferBytes()).isGreaterThan(0);

      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertThat(detailSet).hasSize(1);

      PartitionRebalanceInfo details = detailSet.iterator().next();
      assertThat(details.getBucketCreatesCompleted()).isEqualTo(0);
      assertThat(details.getBucketTransferBytes()).isGreaterThan(0);
      assertThat(details.getBucketTransfersCompleted()).isEqualTo(6);

      // Again compare the genuine before-rebalance total with the after-rebalance total
      long beforeSize = 0;
      Set<PartitionMemberInfo> beforeDetails = details.getPartitionMemberDetailsBefore();
      for (PartitionMemberInfo memberDetails : beforeDetails) {
        beforeSize += memberDetails.getSize();
      }

      Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
      assertThat(afterDetails).hasSize(4);

      long afterSize = 0;
      for (PartitionMemberInfo memberDetails : afterDetails) {
        assertThat(memberDetails.getBucketCount()).isEqualTo(6);

        afterSize += memberDetails.getSize();
      }
      assertThat(afterSize).isEqualTo(beforeSize);
    });

    for (VM vm : toArray(vm0, vm1, vm2)) {
      vm.invoke(() -> {
        Region region = getCache().getRegion("region1");

        PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
        assertThat(details.getCreatedBucketCount()).isEqualTo(12);
        assertThat(details.getActualRedundantCopies()).isEqualTo(1);
        assertThat(details.getLowRedundancyBucketCount()).isEqualTo(0);

        for (PartitionMemberInfo memberDetails : details.getPartitionMemberInfo()) {
          assertThat(memberDetails.getBucketCount()).isEqualTo(6);
        }
      });
    }
  }

  /**
   * Test to make sure we balance buckets between three hosts with redundancy when the partitioned
   * region is a subregion ("parent/region1") of a replicated parent region.
   */
  @Test
  public void testMoveBucketsNestedPR() {
    VM vm0 = getVM(0);
    VM vm1 = getVM(1);
    VM vm2 = getVM(2);

    // Create the region in two VMs
    for (VM vm : toArray(vm0, vm1)) {
      vm.invoke(() -> {
        Region parent = getCache().createRegionFactory(REPLICATE).create("parent");
        createPartitionedRegionAsSubregion(parent, "region1");
      });
    }

    // Create some buckets
    vm0.invoke(() -> {
      Region<Number, String> region = getCache().getRegion("parent/region1");
      for (int i = 0; i < 12; i++) {
        region.put(i, "A");
      }
    });

    // Create the region in one more VM.
    vm2.invoke(() -> {
      Region parent = getCache().createRegionFactory(REPLICATE).create("parent");
      createPartitionedRegionAsSubregion(parent, "region1");
    });

    // Now do a rebalance (a real one, not a simulation)
    long totalSize = vm0.invoke(() -> {
      InternalResourceManager manager = getCache().getInternalResourceManager();
      RebalanceResults results = doRebalance(false, manager);

      assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(0);

      // We don't know how many primaries will move, it depends on
      // if the move bucket code moves the primary or a redundant bucket,
      // so the total primary transfer count is deliberately not asserted.
      assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(8);
      assertThat(results.getTotalBucketTransferBytes()).isGreaterThan(0);

      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertThat(detailSet).hasSize(1);

      PartitionRebalanceInfo details = detailSet.iterator().next();
      assertThat(details.getBucketCreatesCompleted()).isEqualTo(0);
      assertThat(details.getBucketTransferBytes()).isGreaterThan(0);
      assertThat(details.getBucketTransfersCompleted()).isEqualTo(8);

      // Sum the sizes reported for the members as they were BEFORE the rebalance, so that the
      // comparison with the after-rebalance total below is a real check
      long beforeSize = 0;
      Set<PartitionMemberInfo> beforeDetails = details.getPartitionMemberDetailsBefore();
      for (PartitionMemberInfo memberDetails : beforeDetails) {
        beforeSize += memberDetails.getSize();
      }

      Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
      assertThat(afterDetails).hasSize(3);

      long afterSize = 0;
      for (PartitionMemberInfo memberDetails : afterDetails) {
        assertThat(memberDetails.getBucketCount()).isEqualTo(8);
        assertThat(memberDetails.getPrimaryCount()).isEqualTo(4);
        afterSize += memberDetails.getSize();
      }
      assertThat(afterSize).isEqualTo(beforeSize);
      validateStatistics(manager, results);

      return beforeSize;
    });

    for (VM vm : toArray(vm0, vm1, vm2)) {
      vm.invoke(() -> {
        Region region = getCache().getRegion("parent/region1");

        PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
        assertThat(details.getCreatedBucketCount()).isEqualTo(12);
        assertThat(details.getActualRedundantCopies()).isEqualTo(1);
        assertThat(details.getLowRedundancyBucketCount()).isEqualTo(0);

        long afterSize = 0;
        for (PartitionMemberInfo memberDetails : details.getPartitionMemberInfo()) {
          assertThat(memberDetails.getBucketCount()).isEqualTo(8);
          assertThat(memberDetails.getPrimaryCount()).isEqualTo(4);
          afterSize += memberDetails.getSize();
        }
        // The size seen on each member must match the total reported by the rebalance
        assertThat(afterSize).isEqualTo(totalSize);
      });
    }
  }

  /**
   * Test to make sure that we move buckets to balance between three hosts with a redundancy of 1
   * and two colocated regions. Makes sure that the buckets stay colocated when we move them.
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testMoveBucketsColocatedRegions(boolean simulate) {
    VM vm0 = getVM(0);
    VM vm1 = getVM(1);
    VM vm2 = getVM(2);

    vm0.invoke(() -> createPartitionedRegion("region1", 1, 200));
    vm0.invoke(() -> createPartitionedRegion("region2", 200, "region1"));
    vm1.invoke(() -> createPartitionedRegion("region1", 1, 200));
    vm1.invoke(() -> createPartitionedRegion("region2", 200, "region1"));

    // Create some buckets.
    vm0.invoke(() -> {
      Region<Number, String> region1 = getCache().getRegion("region1");
      Region<Number, String> region2 = getCache().getRegion("region2");
      for (int key = 0; key < 12; key++) {
        region1.put(key, "A");
        region2.put(key, "A");
      }
    });

    // create the leader region, but not the colocated region in one of the VMs.
    vm2.invoke(() -> createPartitionedRegion("region1", 1, 200));

    // Simulate a rebalance, and make sure we don't
    // move any buckets yet, because we don't have
    // the colocated region in the new VMs.
    vm0.invoke(() -> {
      ResourceManager manager = getCache().getResourceManager();
      RebalanceResults results = doRebalance(simulate, manager);

      assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(0);
      assertThat(results.getTotalPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(results.getTotalBucketTransferBytes()).isEqualTo(0);
      assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(0);

      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertThat(detailSet).hasSize(2);

      for (PartitionRebalanceInfo details : detailSet) {
        assertThat(details.getBucketCreatesCompleted()).isEqualTo(0);
        assertThat(details.getPrimaryTransfersCompleted()).isEqualTo(0);
        assertThat(details.getBucketTransferBytes()).isEqualTo(0);
        assertThat(details.getBucketTransfersCompleted()).isEqualTo(0);
      }
    });

    // now create the colocated region in the last VM.
    vm2.invoke(() -> createPartitionedRegion("region2", 200, "region1"));

    vm0.invoke(() -> {
      ResourceManager manager = getCache().getResourceManager();
      RebalanceResults results = doRebalance(simulate, manager);

      assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(0);
      assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(16);
      assertThat(results.getTotalBucketTransferBytes()).isGreaterThan(0);

      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertThat(detailSet).hasSize(2);

      for (PartitionRebalanceInfo details : detailSet) {
        assertThat(details.getBucketCreatesCompleted()).isEqualTo(0);
        assertThat(details.getBucketTransferBytes()).isGreaterThan(0);
        assertThat(details.getBucketTransfersCompleted()).isEqualTo(8);

        Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
        assertThat(afterDetails).hasSize(3);

        for (PartitionMemberInfo memberDetails : afterDetails) {
          assertThat(memberDetails.getBucketCount()).isEqualTo(8);
          assertThat(memberDetails.getPrimaryCount()).isEqualTo(4);
        }
      }
    });

    if (!simulate) {
      for (VM vm : toArray(vm0, vm1, vm2)) {
        vm.invoke(() -> {
          PartitionedRegion region1 = (PartitionedRegion) getCache().getRegion("region1");
          PartitionedRegion region2 = (PartitionedRegion) getCache().getRegion("region2");

          PartitionRegionInfo details =
              PartitionRegionHelper.getPartitionRegionInfo(getCache().getRegion("region1"));
          assertThat(details).isNotNull();
          assertThat(details.getCreatedBucketCount()).isEqualTo(12);
          assertThat(details.getActualRedundantCopies()).isEqualTo(1);
          assertThat(details.getLowRedundancyBucketCount()).isEqualTo(0);

          details = PartitionRegionHelper.getPartitionRegionInfo(getCache().getRegion("region2"));
          assertThat(details).isNotNull();
          assertThat(details.getCreatedBucketCount()).isEqualTo(12);
          assertThat(details.getActualRedundantCopies()).isEqualTo(1);
          assertThat(details.getLowRedundancyBucketCount()).isEqualTo(0);

          assertThat(region2.getLocalPrimaryBucketsListTestOnly())
              .isEqualTo(region1.getLocalPrimaryBucketsListTestOnly());

          assertThat(region2.getLocalBucketsListTestOnly())
              .isEqualTo(region1.getLocalBucketsListTestOnly());
        });
      }
    }
  }

  /**
   * Test to ensure that we wait for in progress write operations before moving a primary.
   *
   * <p>
   * Two operations (one per bucket) are started and left blocked inside a
   * {@code BlockingCacheListener}. While they are blocked, a rebalance is expected to time out;
   * once the listener is released the rebalance is expected to finish and restore redundancy.
   *
   * @param operation the kind of write operation that is left in progress during the rebalance
   */
  @Test
  @Parameters({"PUT", "INVALIDATE", "DESTROY", "CACHE_LOADER"})
  @TestCaseName("{method}(operation={0})")
  public void runTestWaitForOperation(OperationEnum operation) throws Exception {
    VM vm1 = getVM(1);

    // Create a region in this VM with a cache loader; the blocking cache listener
    // is attached below, after the buckets exist.
    createPartitionedRegion("region1", 1, 100, (CacheLoader<Number, String>) helper1 -> "anobject");
    Region<Number, String> region = getCache().getRegion("region1");

    // create some buckets
    region.put(1, "A");
    region.put(2, "A");

    BlockingCacheListener<Number, String> blockingListener = new BlockingCacheListener<>(2);
    region.getAttributesMutator().addCacheListener(blockingListener);

    // start two threads doing operations, one on each bucket
    // the threads will block on the cache listener. The rebalance operation
    // will try to move one of these buckets, but it shouldn't
    // be able to because of the in progress operation.
    executorServiceRule.submit(() -> operation.execute(region, 1));
    executorServiceRule.submit(() -> operation.execute(region, 2));

    blockingListener.waitForOperationsStarted();

    // make sure we can tell that the buckets have low redundancy
    validateRedundancy("region1", 2, 0, 2);

    // Create the region in the other VM (should have no effect)
    vm1.invoke(() -> createPartitionedRegion("region1", 1, 100,
        (CacheLoader<Number, String>) helper1 -> "anobject"));

    // Make sure we still have low redundancy
    validateRedundancy("region1", 2, 0, 2);

    ResourceManager manager = getCache().getResourceManager();
    RebalanceOperation rebalance = manager.createRebalanceFactory().start();

    // The rebalance must not complete while the operations are still blocked
    Throwable thrown = catchThrowable(() -> rebalance.getResults(5, SECONDS));
    assertThat(thrown).isInstanceOf(TimeoutException.class);

    blockingListener.release();

    RebalanceResults results = rebalance.getResults(TIMEOUT_SECONDS, SECONDS);
    assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(2);
    assertThat(results.getTotalPrimaryTransfersCompleted()).isEqualTo(1);
    assertThat(results.getTotalBucketTransferBytes()).isEqualTo(0);
    assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(0);

    Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
    assertThat(detailSet).hasSize(1);

    PartitionRebalanceInfo details = detailSet.iterator().next();
    assertThat(details.getBucketCreatesCompleted()).isEqualTo(2);
    assertThat(details.getPrimaryTransfersCompleted()).isEqualTo(1);
    assertThat(details.getBucketTransferBytes()).isEqualTo(0);
    assertThat(details.getBucketTransfersCompleted()).isEqualTo(0);

    Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
    assertThat(afterDetails).hasSize(2);

    for (PartitionMemberInfo memberDetails : afterDetails) {
      assertThat(memberDetails.getBucketCount()).isEqualTo(2);
      assertThat(memberDetails.getPrimaryCount()).isEqualTo(1);
    }

    // The redundancy check does not depend on the member being inspected, so it is
    // performed once per VM instead of once per loop iteration.
    validateRedundancy("region1", 2, 1, 0);
    vm1.invoke(() -> validateRedundancy("region1", 2, 1, 0));
  }

  /**
   * Exercises redundancy recovery for a persistent partitioned region when one of the members
   * that hosted buckets (vm1) is offline, then restarts the members and checks which buckets
   * each of them ends up hosting.
   *
   * <p>
   * NOTE(review): the test-case name below spells the second parameter "userAccessor"; this
   * looks like a typo for "useAccessor" - confirm before changing, since it alters reported
   * test names.
   *
   * @param simulate when true the first rebalance is only simulated, and the second rebalance
   *        is skipped
   * @param useAccessor when true the rebalance is driven from an accessor in vm3 instead of vm0
   */
  @Test
  @Parameters({"true,false", "false,false", "true,true", "false,true"})
  @TestCaseName("{method}(simulate={0}, userAccessor={1})")
  public void testRecoverRedundancyWithOfflinePersistence(boolean simulate, boolean useAccessor)
      throws Exception {
    VM vm0 = getVM(0);
    VM vm1 = getVM(1);
    VM vm2 = getVM(2);
    VM vm3 = getVM(3);

    // Create the region in only 2 VMs
    for (VM vm : toArray(vm0, vm1)) {
      vm.invoke(() -> createPersistentPartitionedRegion("region1", getUniqueName(), getDiskDirs()));
    }

    // The rebalance is started from the accessor (vm3) when one is requested, otherwise from vm0
    VM rebalanceVM = useAccessor ? vm3 : vm0;
    if (useAccessor) {
      // Create an accessor
      rebalanceVM.invoke(() -> createAccessor("region1", "ds1"));
    }

    // Create some buckets
    vm0.invoke(() -> {
      Region<Number, String> region = getCache().getRegion("region1");
      region.put(1, "A");
      region.put(2, "A");
      region.put(3, "A");
      region.put(4, "A");
      region.put(5, "A");
      region.put(6, "A");
    });

    // Remember which buckets vm1 hosts, then close the region in vm1
    Set<Integer> bucketsOnVM1 = vm1.invoke(() -> getBucketList("region1"));
    vm1.invoke(() -> getCache().getRegion("region1").close());

    // make sure we can tell that the buckets have low redundancy
    vm0.invoke(() -> validateRedundancy("region1", 6, 0, 6));

    // Now create the region in another member
    vm2.invoke(() -> createPersistentPartitionedRegion("region1", getUniqueName(), getDiskDirs()));

    // Make sure we still have low redundancy
    vm0.invoke(() -> validateRedundancy("region1", 6, 0, 6));

    /*
     * Simulates a rebalance if simulation flag is set. Otherwise, performs a rebalance.
     *
     * A rebalance will replace offline buckets, so this should restore redundancy
     */
    rebalanceVM.invoke(() -> {
      InternalResourceManager manager = getCache().getInternalResourceManager();
      RebalanceResults results = doRebalance(simulate, manager);

      assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(6);
      assertThat(results.getTotalPrimaryTransfersCompleted()).isEqualTo(3);
      assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(0);

      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertThat(detailSet).hasSize(1);

      PartitionRebalanceInfo details = detailSet.iterator().next();
      assertThat(details.getBucketCreatesCompleted()).isEqualTo(6);
      assertThat(details.getPrimaryTransfersCompleted()).isEqualTo(3);
      assertThat(details.getBucketTransfersCompleted()).isEqualTo(0);

      Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
      assertThat(afterDetails).hasSize(2);

      for (PartitionMemberInfo memberDetails : afterDetails) {
        assertThat(memberDetails.getBucketCount()).isEqualTo(6);
        assertThat(memberDetails.getPrimaryCount()).isEqualTo(3);
      }

      // Statistics are only validated for a real rebalance, not for a simulation
      if (!simulate) {
        validateStatistics(manager, results);
      }
    });

    // Snapshot the bucket placement so it can be compared after the restart below
    Set<Integer> bucketsOnVM0 = vm0.invoke(() -> getBucketList("region1"));
    Set<Integer> bucketsOnVM2 = vm2.invoke(() -> getBucketList("region1"));

    if (simulate) {
      // Only a simulation was run, so we should still have broken redundancy at this point
      vm0.invoke(() -> validateRedundancy("region1", 6, 0, 6));
    } else {
      // Make sure redundancy is repaired if not simulated
      vm0.invoke(() -> validateRedundancy("region1", 6, 1, 0));
    }

    // Close the region everywhere it is still open
    vm2.invoke(() -> getCache().getRegion("region1").close());
    vm0.invoke(() -> getCache().getRegion("region1").close());

    if (useAccessor) {
      vm3.invoke(() -> getCache().getRegion("region1").close());
    }

    // We need to restart both VMs at the same time, because
    // they will wait for each other before allowing operations.
    AsyncInvocation createRegionOnVM0 = vm0.invokeAsync(
        () -> createPersistentPartitionedRegion("region1", getUniqueName(), getDiskDirs()));
    AsyncInvocation createRegionOnVM2 = vm2.invokeAsync(
        () -> createPersistentPartitionedRegion("region1", getUniqueName(), getDiskDirs()));

    createRegionOnVM0.await();
    createRegionOnVM2.await();

    if (useAccessor) {
      vm3.invoke(() -> createAccessor("region1", "ds1"));
    }

    // NOTE(review): the original comment here described a pause for the async bucket recovery
    // threads to finish their work (otherwise the rebalance op may think that the other member
    // doesn't have buckets, ask it to create them and get a negative reply because it actually
    // does have the buckets, causing the test to fail). No such pause is performed at this
    // point - confirm whether one is still needed.

    // Try to rebalance again. This shouldn't do anything, because
    // we already recovered redundancy earlier.
    // Note that we don't check the primary transfer counts. This rebalance
    // has a race with the redundancy recovery thread, which is also trying
    // to rebalance primaries. So this thread might end up rebalancing primaries,
    // or it might not.
    if (!simulate) {
      rebalanceVM.invoke(() -> {
        ResourceManager manager = getCache().getResourceManager();

        // simulate is always false inside this branch, so this performs a real rebalance
        RebalanceResults results = doRebalance(simulate, manager);

        assertThat(results.getTotalBucketCreatesCompleted()).isEqualTo(0);
        assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(0);

        Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
        assertThat(detailSet).hasSize(1);

        PartitionRebalanceInfo details = detailSet.iterator().next();
        assertThat(details.getBucketCreatesCompleted()).isEqualTo(0);
        assertThat(details.getBucketTransfersCompleted()).isEqualTo(0);

        Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
        assertThat(afterDetails).hasSize(2);

        for (PartitionMemberInfo memberDetails : afterDetails) {
          assertThat(memberDetails.getBucketCount()).isEqualTo(6);
          assertThat(memberDetails.getPrimaryCount()).isEqualTo(3);
        }
      });

      // Redundancy should be repaired.
      vm0.invoke(() -> validateRedundancy("region1", 6, 1, 0));
    }

    // Bring the originally closed member (vm1) back
    vm1.invoke(() -> createPersistentPartitionedRegion("region1", getUniqueName(), getDiskDirs()));

    // Look at vm0 buckets.
    assertThat(vm0.invoke(() -> getBucketList("region1"))).isEqualTo(bucketsOnVM0);

    /*
     * Look at vm1 buckets.
     */
    if (simulate) {
      /*
       * No rebalancing above because the simulation flag is on. Therefore, vm1 will have recovered
       * its buckets. We need to wait for the buckets because they might still be in the middle of
       * creation in the background
       */
      vm1.invoke(() -> waitForBucketList("region1", bucketsOnVM1));
    } else {
      /*
       * vm1 should have no buckets because offline buckets were recovered when vm0 and vm2 were
       * rebalanced above.
       */
      assertThat(vm1.invoke(() -> getBucketList("region1"))).isEmpty();
    }

    // look at vm2 buckets
    assertThat(vm2.invoke(() -> getBucketList("region1"))).isEqualTo(bucketsOnVM2);
  }

  /**
   * Checks that buckets can be moved by a rebalance after the hosting member was restarted with
   * value recovery switched off, i.e. while the bucket entries are reported as being on disk
   * rather than in memory. Also checks that moving buckets leaves the cache loader open while
   * closing the region closes it.
   *
   * @param simulate when true the rebalance is only simulated and the post-rebalance checks on
   *        the members are skipped
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testMoveBucketsWithUnrecoveredValues(boolean simulate) {
    VM vm0 = getVM(0);
    VM vm1 = getVM(1);

    // Every member creates (or recovers) the persistent region in exactly the same way
    SerializableRunnableIF createRegion = () -> createPersistentPartitionedRegion("region1",
        "store", getDiskDirs(), new NullReturningLoader<>());

    // Start with the region in only 1 VM
    vm0.invoke(createRegion);

    // Populate six buckets with one entry each
    vm0.invoke(() -> {
      Region<Number, String> region = getCache().getRegion("region1");
      for (int key = 1; key <= 6; key++) {
        region.put(key, "A");
      }
    });

    // Before the restart every entry is held in memory; afterwards close the cache
    vm0.invoke(() -> {
      PartitionedRegionDataStore localStore =
          ((PartitionedRegion) getCache().getRegion("region1")).getDataStore();

      for (int bucketId = 1; bucketId <= 6; bucketId++) {
        BucketRegion bucketRegion = localStore.getLocalBucketById(bucketId);

        assertThat(bucketRegion.getNumOverflowBytesOnDisk()).isEqualTo(0);
        assertThat(bucketRegion.getNumOverflowOnDisk()).isEqualTo(0);
        assertThat(bucketRegion.getNumEntriesInVM()).isEqualTo(1);
      }
      getCache().close();
    });

    // Recover the region from disk
    vm0.invoke(createRegion);

    // After recovery every entry is accounted for on disk instead of in memory
    vm0.invoke(() -> {
      PartitionedRegionDataStore localStore =
          ((PartitionedRegion) getCache().getRegion("region1")).getDataStore();

      for (int bucketId = 1; bucketId <= 6; bucketId++) {
        BucketRegion bucketRegion = localStore.getLocalBucketById(bucketId);

        assertThat(bucketRegion.getNumOverflowOnDisk()).isEqualTo(1);
        assertThat(bucketRegion.getNumEntriesInVM()).isEqualTo(0);

        // the size recorded on disk differs from the in-memory size, so only a lower bound
        // is asserted
        assertThat(bucketRegion.getNumOverflowBytesOnDisk())
            .as("Bucket size was " + bucketRegion.getNumOverflowBytesOnDisk())
            .isGreaterThan(1);
        assertThat(bucketRegion.getTotalBytes())
            .isEqualTo(bucketRegion.getNumOverflowBytesOnDisk());
      }
    });

    // Adding the second member should not move anything by itself
    vm1.invoke(createRegion);

    // Rebalance (or simulate one) and expect half of the buckets to move
    vm0.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();
      RebalanceResults rebalanceResults = doRebalance(simulate, resourceManager);

      assertThat(rebalanceResults.getTotalBucketCreatesCompleted()).isEqualTo(0);
      assertThat(rebalanceResults.getTotalPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(rebalanceResults.getTotalBucketTransfersCompleted()).isEqualTo(3);
      assertThat(rebalanceResults.getTotalBucketTransferBytes())
          .as("Transferred Bytes = " + rebalanceResults.getTotalBucketTransferBytes())
          .isGreaterThan(0);

      Set<PartitionRebalanceInfo> regionDetails = rebalanceResults.getPartitionRebalanceDetails();
      assertThat(regionDetails).hasSize(1);

      PartitionRebalanceInfo regionInfo = regionDetails.iterator().next();
      assertThat(regionInfo.getBucketCreatesCompleted()).isEqualTo(0);
      assertThat(regionInfo.getPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(regionInfo.getBucketTransferBytes()).isGreaterThan(0);
      assertThat(regionInfo.getBucketTransfersCompleted()).isEqualTo(3);

      Set<PartitionMemberInfo> membersAfter = regionInfo.getPartitionMemberDetailsAfter();
      assertThat(membersAfter).hasSize(2);

      for (PartitionMemberInfo member : membersAfter) {
        assertThat(member.getBucketCount()).isEqualTo(3);
        assertThat(member.getPrimaryCount()).isEqualTo(3);
      }

      if (!simulate) {
        validateStatistics(resourceManager, rebalanceResults);
      }
    });

    // A simulation changes nothing on the members, so there is nothing further to verify
    if (simulate) {
      return;
    }

    for (VM member : toArray(vm0, vm1)) {
      member.invoke(() -> {
        PartitionRegionInfo regionInfo =
            PartitionRegionHelper.getPartitionRegionInfo(getCache().getRegion("region1"));
        assertThat(regionInfo.getCreatedBucketCount()).isEqualTo(6);
        assertThat(regionInfo.getActualRedundantCopies()).isEqualTo(0);
        assertThat(regionInfo.getLowRedundancyBucketCount()).isEqualTo(0);
        assertThat(regionInfo.getPartitionMemberInfo()).hasSize(2);

        for (PartitionMemberInfo memberInfo : regionInfo.getPartitionMemberInfo()) {
          assertThat(memberInfo.getBucketCount()).isEqualTo(3);
          assertThat(memberInfo.getPrimaryCount()).isEqualTo(3);
        }

        // moving buckets must not have closed the cache loader
        NullReturningLoader cacheLoader =
            (NullReturningLoader) getCache().getRegion("region1").getAttributes()
                .getCacheLoader();
        assertThat(cacheLoader.isClosed()).isFalse();
      });
    }

    // Done in a second pass so that both members are fully verified before any region is closed
    for (VM member : toArray(vm0, vm1)) {
      member.invoke(() -> {
        NullReturningLoader cacheLoader =
            (NullReturningLoader) getCache().getRegion("region1").getAttributes()
                .getCacheLoader();
        assertThat(cacheLoader.isClosed()).isFalse();

        // closing the PR is expected to close the cache loader
        getCache().getRegion("region1").close();
        assertThat(cacheLoader.isClosed()).isTrue();
      });
    }
  }

  /**
   * Checks that, with a {@code BucketCountLoadProbe} installed in vm0, a rebalance evens out the
   * number of buckets per member even though one bucket is far larger than the others.
   *
   * @param simulate when true the rebalance is only simulated and the post-rebalance checks on
   *        the members are skipped
   */
  @Test
  @Parameters({"true", "false"})
  @TestCaseName("{method}(simulate={0})")
  public void testBalanceBucketsByCount(boolean simulate) {
    VM vm0 = getVM(0);
    VM vm1 = getVM(1);

    // Both members create the region in the same way
    SerializableRunnableIF createRegion =
        () -> createPartitionedRegion("region1", 0, new NullReturningLoader());

    // Cache is closed so loadProbeToRestore does not need to be restored
    vm0.invoke(
        () -> getCache().getInternalResourceManager().setLoadProbe(new BucketCountLoadProbe()));

    // Start with the region in only 1 VM
    vm0.invoke(createRegion);

    // Create buckets with very uneven sizes: one large value, five tiny ones
    vm0.invoke(() -> {
      Region<Number, Object> region = getCache().getRegion("region1");
      region.put(1, new byte[1024 * 1024]);
      for (int key = 2; key <= 6; key++) {
        region.put(key, "A");
      }
    });

    // Adding the second member should not move anything by itself
    vm1.invoke(createRegion);

    // Rebalance (or simulate one) and expect half of the buckets to move
    vm0.invoke(() -> {
      InternalResourceManager resourceManager = getCache().getInternalResourceManager();
      RebalanceResults rebalanceResults = doRebalance(simulate, resourceManager);

      assertThat(rebalanceResults.getTotalBucketCreatesCompleted()).isEqualTo(0);
      assertThat(rebalanceResults.getTotalPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(rebalanceResults.getTotalBucketTransfersCompleted()).isEqualTo(3);
      assertThat(rebalanceResults.getTotalBucketTransferBytes()).isGreaterThan(0);

      Set<PartitionRebalanceInfo> regionDetails = rebalanceResults.getPartitionRebalanceDetails();
      assertThat(regionDetails).hasSize(1);

      PartitionRebalanceInfo regionInfo = regionDetails.iterator().next();
      assertThat(regionInfo.getBucketCreatesCompleted()).isEqualTo(0);
      assertThat(regionInfo.getPrimaryTransfersCompleted()).isEqualTo(0);
      assertThat(regionInfo.getBucketTransferBytes()).isGreaterThan(0);
      assertThat(regionInfo.getBucketTransfersCompleted()).isEqualTo(3);

      Set<PartitionMemberInfo> membersAfter = regionInfo.getPartitionMemberDetailsAfter();
      assertThat(membersAfter).hasSize(2);

      for (PartitionMemberInfo member : membersAfter) {
        assertThat(member.getBucketCount()).isEqualTo(3);
        assertThat(member.getPrimaryCount()).isEqualTo(3);
      }

      if (!simulate) {
        validateStatistics(resourceManager, rebalanceResults);
      }
    });

    // A simulation changes nothing on the members, so there is nothing further to verify
    if (simulate) {
      return;
    }

    for (VM member : toArray(vm0, vm1)) {
      member.invoke(() -> {
        PartitionRegionInfo regionInfo =
            PartitionRegionHelper.getPartitionRegionInfo(getCache().getRegion("region1"));
        assertThat(regionInfo.getCreatedBucketCount()).isEqualTo(6);
        assertThat(regionInfo.getActualRedundantCopies()).isEqualTo(0);
        assertThat(regionInfo.getLowRedundancyBucketCount()).isEqualTo(0);
        assertThat(regionInfo.getPartitionMemberInfo()).hasSize(2);

        for (PartitionMemberInfo memberInfo : regionInfo.getPartitionMemberInfo()) {
          assertThat(memberInfo.getBucketCount()).isEqualTo(3);
          assertThat(memberInfo.getPrimaryCount()).isEqualTo(3);
        }

        // moving buckets must not have closed the cache loader
        NullReturningLoader cacheLoader =
            (NullReturningLoader) getCache().getRegion("region1").getAttributes()
                .getCacheLoader();
        assertThat(cacheLoader.isClosed()).isFalse();
      });
    }
  }

  /**
   * Creates a {@code PARTITION} region with one redundant copy, both recovery delays set to -1,
   * and the supplied eviction attributes.
   *
   * @param regionName name of the region to create
   * @param evictionAttributes eviction configuration applied to the region
   */
  private void createPartitionedRegion(String regionName, EvictionAttributes evictionAttributes) {
    PartitionAttributesFactory attributes = new PartitionAttributesFactory()
        .setRedundantCopies(1)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1);

    RegionFactory factory = getCache().createRegionFactory(PARTITION);
    factory.setEvictionAttributes(evictionAttributes)
        .setPartitionAttributes(attributes.create())
        .create(regionName);
  }

  /**
   * Creates a {@code PARTITION} region without redundant copies, with both recovery delays set
   * to -1, and with the supplied cache loader.
   *
   * @param regionName name of the region to create
   * @param cacheLoader loader installed on the region
   */
  private <K, V> void createPartitionedRegion(String regionName, CacheLoader<K, V> cacheLoader) {
    PartitionAttributesFactory<K, V> attributes = new PartitionAttributesFactory<K, V>()
        .setRedundantCopies(0)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1);

    RegionFactory<K, V> factory = getCache().createRegionFactory(PARTITION);
    factory.setPartitionAttributes(attributes.create())
        .setCacheLoader(cacheLoader)
        .create(regionName);
  }

  /**
   * Creates a {@code PARTITION} region with the requested number of redundant copies and both
   * recovery delays set to -1.
   *
   * @param regionName name of the region to create
   * @param redundantCopies number of redundant copies configured for the region
   */
  private void createPartitionedRegion(String regionName, int redundantCopies) {
    PartitionAttributesFactory attributes = new PartitionAttributesFactory()
        .setRedundantCopies(redundantCopies)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1);

    RegionFactory factory = getCache().createRegionFactory(PARTITION);
    factory.setPartitionAttributes(attributes.create()).create(regionName);
  }

  /**
   * Wraps {@link #createPartitionedRegion(String, int)} in a {@code SerializableRunnable} so it
   * can be handed to another VM.
   *
   * <p>
   * NOTE: this must stay an anonymous class; do not replace it with a lambda because
   * ResourceObserverAdapter is NOT serializable.
   *
   * @param regionName name of the region the runnable creates
   * @param redundantCopies number of redundant copies configured for that region
   */
  private SerializableRunnableIF createPartitionedRegionRunnable(String regionName,
      int redundantCopies) {
    SerializableRunnable createRegion = new SerializableRunnable() {
      @Override
      public void run() {
        createPartitionedRegion(regionName, redundantCopies);
      }
    };
    return createRegion;
  }

  /**
   * Creates a {@code PARTITION} region with the given local max memory and number of redundant
   * copies, and with both recovery delays set to -1.
   *
   * @param region name of the region to create
   * @param redundantCopies number of redundant copies configured for the region
   * @param localMaxMemory value passed to {@code setLocalMaxMemory}
   */
  private void createPartitionedRegion(String region, int redundantCopies, int localMaxMemory) {
    PartitionAttributesFactory attributes = new PartitionAttributesFactory()
        .setLocalMaxMemory(localMaxMemory)
        .setRedundantCopies(redundantCopies)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1);

    RegionFactory factory = getCache().createRegionFactory(PARTITION);
    factory.setPartitionAttributes(attributes.create()).create(region);
  }

  /**
   * Creates a {@code PARTITION} region with the requested number of redundant copies, both
   * recovery delays set to -1, and the supplied cache loader.
   *
   * @param regionName name of the region to create
   * @param redundantCopies number of redundant copies configured for the region
   * @param cacheLoader loader installed on the region
   */
  private <K, V> void createPartitionedRegion(String regionName, int redundantCopies,
      CacheLoader<K, V> cacheLoader) {
    PartitionAttributesFactory<K, V> attributes = new PartitionAttributesFactory<K, V>()
        .setRedundantCopies(redundantCopies)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1);

    RegionFactory<K, V> factory = getCache().createRegionFactory(PARTITION);
    factory.setCacheLoader(cacheLoader)
        .setPartitionAttributes(attributes.create())
        .create(regionName);
  }

  /**
   * Creates a {@code PARTITION} region with the requested number of redundant copies and local
   * max memory, both recovery delays set to -1, and the supplied cache loader.
   *
   * @param regionName name of the region to create
   * @param redundantCopies number of redundant copies configured for the region
   * @param localMaxMemory value passed to {@code setLocalMaxMemory}
   * @param cacheLoader loader installed on the region
   */
  private <K, V> void createPartitionedRegion(String regionName, int redundantCopies,
      int localMaxMemory, CacheLoader<K, V> cacheLoader) {
    PartitionAttributesFactory<K, V> attributes = new PartitionAttributesFactory<K, V>()
        .setRedundantCopies(redundantCopies)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1)
        .setLocalMaxMemory(localMaxMemory);

    RegionFactory<K, V> factory = getCache().createRegionFactory(PARTITION);
    factory.setCacheLoader(cacheLoader)
        .setPartitionAttributes(attributes.create())
        .create(regionName);
  }

  /**
   * Creates a {@code PARTITION} region that is colocated with another region. The region has one
   * redundant copy, the given local max memory, and both recovery delays set to -1.
   *
   * @param region name of the region to create
   * @param localMaxMemory value passed to {@code setLocalMaxMemory}
   * @param colocatedWith name of the region this one is colocated with
   */
  private void createPartitionedRegion(String region, int localMaxMemory, String colocatedWith) {
    PartitionAttributesFactory attributes = new PartitionAttributesFactory()
        .setLocalMaxMemory(localMaxMemory)
        .setRedundantCopies(1)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1)
        .setColocatedWith(colocatedWith);

    RegionFactory factory = getCache().createRegionFactory(PARTITION);
    factory.setPartitionAttributes(attributes.create()).create(region);
  }

  /**
   * Creates a disk store over the given directories and then a {@code PARTITION_PERSISTENT}
   * region that uses it with synchronous disk writes. The region has one redundant copy and both
   * recovery delays set to -1.
   *
   * @param regionName name of the region to create
   * @param diskStoreName name of the disk store to create and attach to the region
   * @param diskDirs directories backing the disk store
   */
  private void createPersistentPartitionedRegion(String regionName, String diskStoreName,
      File[] diskDirs) {
    // the disk store has to exist before a region can refer to it by name
    getCache().createDiskStoreFactory().setDiskDirs(diskDirs).create(diskStoreName);

    PartitionAttributesFactory attributes = new PartitionAttributesFactory()
        .setRedundantCopies(1)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1);

    RegionFactory factory = getCache().createRegionFactory(PARTITION_PERSISTENT);
    factory.setDiskSynchronous(true)
        .setDiskStoreName(diskStoreName)
        .setPartitionAttributes(attributes.create())
        .create(regionName);
  }

  /**
   * Creates a disk store (max oplog size 1) over the given directories and then a
   * {@code PARTITION_PERSISTENT} region that uses it. The region has no redundant copies, both
   * recovery delays set to -1, and the supplied cache loader.
   *
   * <p>
   * While the disk store and region are being created, the system property
   * {@code DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME} is set to {@code "false"}. Afterwards the
   * property is put back to whatever it was before the call (or cleared if it was not set), so
   * this helper does not leak configuration into later code running in the same JVM.
   *
   * @param regionName name of the region to create
   * @param diskStoreName name of the disk store to create and attach to the region
   * @param diskDirs directories backing the disk store
   * @param cacheLoader loader installed on the region
   */
  private <K, V> void createPersistentPartitionedRegion(String regionName, String diskStoreName,
      File[] diskDirs, CacheLoader<K, V> cacheLoader) {
    // remember the caller's setting so it can be restored exactly
    String previousRecoverValues = System.getProperty(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME);
    System.setProperty(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, "false");
    try {
      DiskStoreFactory diskStoreFactory = getCache().createDiskStoreFactory();
      diskStoreFactory.setDiskDirs(diskDirs).setMaxOplogSize(1).create(diskStoreName);

      PartitionAttributesFactory<K, V> partitionAttributesFactory =
          new PartitionAttributesFactory<>();
      partitionAttributesFactory.setRedundantCopies(0);
      partitionAttributesFactory.setRecoveryDelay(-1);
      partitionAttributesFactory.setStartupRecoveryDelay(-1);

      RegionFactory<K, V> regionFactory = getCache().createRegionFactory(PARTITION_PERSISTENT);
      regionFactory.setCacheLoader(cacheLoader);
      regionFactory.setDiskStoreName(diskStoreName);
      regionFactory.setPartitionAttributes(partitionAttributesFactory.create());

      regionFactory.create(regionName);
    } finally {
      // NOTE(review): the previous code forced the property to "true" here; this assumes an
      // unset property behaves like "true" - confirm against DiskStoreImpl
      if (previousRecoverValues == null) {
        System.clearProperty(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME);
      } else {
        System.setProperty(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, previousRecoverValues);
      }
    }
  }

  /**
   * Creates a disk store over {@code getDiskDirs()} and then a partitioned region whose local
   * max memory is 0. The region has one redundant copy and both recovery delays set to -1.
   *
   * <p>
   * NOTE(review): the disk store is created but the region attributes built here never refer to
   * it by name - confirm whether that is intentional.
   *
   * @param regionName name of the region to create
   * @param diskStoreName name of the disk store to create
   */
  private void createAccessor(String regionName, String diskStoreName) {
    getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);

    PartitionAttributesFactory partitionAttributes = new PartitionAttributesFactory()
        .setRedundantCopies(1)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1)
        .setLocalMaxMemory(0);

    AttributesFactory regionAttributes = new AttributesFactory();
    regionAttributes.setPartitionAttributes(partitionAttributes.create());

    getCache().createRegion(regionName, regionAttributes.create());
  }

  /**
   * Creates a {@code PARTITION} region as a subregion of {@code parent}. The region has one
   * redundant copy and both recovery delays set to -1.
   *
   * @param parent region under which the new region is created
   * @param regionName name of the subregion to create
   */
  private void createPartitionedRegionAsSubregion(Region parent, String regionName) {
    PartitionAttributesFactory attributes = new PartitionAttributesFactory()
        .setRedundantCopies(1)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1);

    RegionFactory factory = getCache().createRegionFactory(PARTITION);
    factory.setPartitionAttributes(attributes.create()).createSubregion(parent, regionName);
  }

  /**
   * Registers "region1" and "region2" with the currently installed resource observer (which
   * must be a {@code ParallelRecoveryObserver}) and then creates both regions with one
   * redundant copy each.
   */
  private void createTwoRegionsWithParallelRecoveryObserver() {
    String[] regionNames = {"region1", "region2"};

    ParallelRecoveryObserver observer =
        (ParallelRecoveryObserver) InternalResourceManager.getResourceObserver();

    // both regions are registered with the observer before either one is created
    for (String regionName : regionNames) {
      observer.observeRegion(regionName);
    }

    for (String regionName : regionNames) {
      createPartitionedRegion(regionName, 1);
    }
  }

  /**
   * Creates a parallel async event queue named "parallelQueue" whose listener always returns
   * {@code false}, then creates the {@code PARTITION} region "region1" attached to that queue.
   * The region has one redundant copy, both recovery delays set to -1, and the given local max
   * memory.
   *
   * @param localMaxMemory value passed to {@code setLocalMaxMemory}
   */
  private void createPRRegionWithAsyncQueue(int localMaxMemory) {
    // An async event listener that doesn't dispatch anything.
    //
    // NOTE: "new AsyncEventListener" cannot be converted to a lambda -- only one class is
    // allowed per cluster and the lambda may create a different class name in different VMs.
    AsyncEventListener nonDispatchingListener = new AsyncEventListener() {
      @Override
      public boolean processEvents(List<AsyncEvent> events) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException ignored) {
          // NOTE(review): the interrupt status is not restored here; confirm that is intended
        }
        // the result is false whether or not the sleep was interrupted
        return false;
      }
    };

    getCache().createAsyncEventQueueFactory()
        .setMaximumQueueMemory(1)
        .setParallel(true)
        .create("parallelQueue", nonDispatchingListener);

    PartitionAttributesFactory attributes = new PartitionAttributesFactory()
        .setRedundantCopies(1)
        .setRecoveryDelay(-1)
        .setStartupRecoveryDelay(-1)
        .setLocalMaxMemory(localMaxMemory);

    RegionFactory factory = getCache().createRegionFactory(PARTITION);
    factory.addAsyncEventQueueId("parallelQueue")
        .setPartitionAttributes(attributes.create())
        .create("region1");
  }

  /**
   * Puts the value "A" under the integer keys 1 through 6 into the named region.
   *
   * @param regionName name of the region to write to
   */
  private void doPuts(String regionName) {
    Region<Number, String> target = getCache().getRegion(regionName);
    for (int key = 1; key <= 6; key++) {
      target.put(key, "A");
    }
  }

  /**
   * Sets the "resource.manager.threads" system property (with the GemFire prefix) to 2 and then
   * obtains the cache with the given redundancy zone configured.
   *
   * @param zone value for the {@code REDUNDANCY_ZONE} property
   */
  private void setRedundancyZone(String zone) {
    Properties zoneConfig = new Properties();
    zoneConfig.setProperty(REDUNDANCY_ZONE, zone);

    // the system property is set before the cache is requested
    String threadsKey = DistributionConfig.GEMFIRE_PREFIX + "resource.manager.threads";
    System.setProperty(threadsKey, "2");

    getCache(zoneConfig);
  }

  /**
   * Runs (or simulates) a rebalance without passing any include or exclude region sets.
   *
   * @param simulate when true the rebalance is simulated instead of started
   * @param manager resource manager used to create the rebalance
   * @return the results of the rebalance
   */
  private RebalanceResults doRebalance(boolean simulate, ResourceManager manager)
      throws TimeoutException, InterruptedException {
    RebalanceResults results = doRebalance(simulate, manager, null, null);
    return results;
  }

  /**
   * Simulates or starts a rebalance restricted by the given include/exclude sets, waits up to
   * {@code TIMEOUT_SECONDS} seconds for its results, and then asserts that the manager no longer
   * reports any rebalance operations.
   *
   * @param simulate {@code true} to call {@code simulate()}, {@code false} to call
   *        {@code start()}
   * @param manager resource manager used to create the rebalance factory
   * @param includes passed to {@code includeRegions}
   * @param excludes passed to {@code excludeRegions}
   * @return the results reported by the rebalance operation
   */
  private RebalanceResults doRebalance(boolean simulate, ResourceManager manager,
      Set<String> includes, Set<String> excludes) throws TimeoutException, InterruptedException {
    RebalanceResults outcome = simulate
        ? manager.createRebalanceFactory().includeRegions(includes).excludeRegions(excludes)
            .simulate().getResults(TIMEOUT_SECONDS, SECONDS)
        : manager.createRebalanceFactory().includeRegions(includes).excludeRegions(excludes)
            .start().getResults(TIMEOUT_SECONDS, SECONDS);

    assertThat(manager.getRebalanceOperations()).isEmpty();
    return outcome;
  }

  /**
   * Returns a sorted copy of the bucket ids reported by the local data store of the named
   * partitioned region.
   *
   * @param regionName name of a partitioned region in this VM's cache
   * @return a new {@code TreeSet} holding the local bucket ids
   */
  private Set<Integer> getBucketList(String regionName) {
    PartitionedRegion partitionedRegion = (PartitionedRegion) getCache().getRegion(regionName);
    Set<Integer> sortedBucketIds =
        new TreeSet<>(partitionedRegion.getDataStore().getAllLocalBucketIds());
    return sortedBucketIds;
  }

  /**
   * Waits until the local data store of the named partitioned region reports exactly the
   * expected bucket ids.
   *
   * @param regionName name of a partitioned region in this VM's cache
   * @param expected bucket ids expected to be hosted locally; any collection type is accepted
   *        and it is compared as a set
   */
  private void waitForBucketList(String regionName, Collection<Integer> expected) {
    PartitionedRegion region = (PartitionedRegion) getCache().getRegion(regionName);

    // Set.equals only ever matches another Set, so normalize the expectation up front.
    // Comparing against a List (or other non-Set collection) could never succeed and the wait
    // would always run into its timeout.
    Set<Integer> expectedBuckets = new TreeSet<>(expected);

    await().untilAsserted(new WaitCriterion() {

      @Override
      public boolean done() {
        return getBuckets().equals(expectedBuckets);
      }

      // Re-read on every call so that each check sees the current local buckets.
      private Set<Integer> getBuckets() {
        return new TreeSet<>(region.getDataStore().getAllLocalBucketIds());
      }

      @Override
      public String description() {
        return "Timeout waiting for buckets to match. Expected " + expectedBuckets + " but got "
            + getBuckets();
      }
    });
  }

  /**
   * Asserts that the partition region info of the named region is present and reports the
   * expected created bucket count, actual redundant copies and low redundancy bucket count.
   *
   * @param regionName name of the region to look up in this VM's cache
   * @param expectedCreatedBucketCount expected {@code getCreatedBucketCount()}
   * @param expectedRedundantCopies expected {@code getActualRedundantCopies()}
   * @param expectedLowRedundancyBucketCount expected {@code getLowRedundancyBucketCount()}
   */
  private void validateRedundancy(String regionName, int expectedCreatedBucketCount,
      int expectedRedundantCopies, int expectedLowRedundancyBucketCount) {
    Region<?, ?> target = getCache().getRegion(regionName);
    PartitionRegionInfo info = PartitionRegionHelper.getPartitionRegionInfo(target);

    assertThat(info).isNotNull();

    assertThat(info.getCreatedBucketCount())
        .isEqualTo(expectedCreatedBucketCount);
    assertThat(info.getActualRedundantCopies())
        .isEqualTo(expectedRedundantCopies);
    assertThat(info.getLowRedundancyBucketCount())
        .isEqualTo(expectedLowRedundancyBucketCount);
  }

  /**
   * Looks up both named regions in this VM's cache and delegates to the region-based overload
   * to validate each of them against the same expectations.
   *
   * @param regionName1 name of the first region to validate
   * @param regionName2 name of the second region to validate
   * @param expectedCreatedBucketCount expected created bucket count of each region
   * @param expectedRedundantCopies expected actual redundant copies of each region
   * @param expectedLowRedundancyBucketCount expected low redundancy bucket count of each region
   */
  private void validateRedundancyOfTwoRegions(String regionName1, String regionName2,
      int expectedCreatedBucketCount, int expectedRedundantCopies,
      int expectedLowRedundancyBucketCount) {
    Region<?, ?> first = getCache().getRegion(regionName1);
    Region<?, ?> second = getCache().getRegion(regionName2);

    validateRedundancyOfTwoRegions(
        first,
        second,
        expectedCreatedBucketCount,
        expectedRedundantCopies,
        expectedLowRedundancyBucketCount);
  }

  /**
   * Asserts, first for {@code region1} and then for {@code region2}, that the partition region
   * info is present and reports the expected created bucket count, actual redundant copies and
   * low redundancy bucket count.
   *
   * @param region1 first region to validate
   * @param region2 second region to validate
   * @param expectedCreatedBucketCount expected {@code getCreatedBucketCount()} of each region
   * @param expectedRedundantCopies expected {@code getActualRedundantCopies()} of each region
   * @param expectedLowRedundancyBucketCount expected {@code getLowRedundancyBucketCount()} of
   *        each region
   */
  private void validateRedundancyOfTwoRegions(Region region1, Region region2,
      int expectedCreatedBucketCount, int expectedRedundantCopies,
      int expectedLowRedundancyBucketCount) {
    // Both regions are held to identical expectations, so run the same checks over each in turn.
    for (Region candidate : new Region[] {region1, region2}) {
      PartitionRegionInfo info = PartitionRegionHelper.getPartitionRegionInfo(candidate);
      assertThat(info).isNotNull();
      assertThat(info.getCreatedBucketCount()).isEqualTo(expectedCreatedBucketCount);
      assertThat(info.getActualRedundantCopies()).isEqualTo(expectedRedundantCopies);
      assertThat(info.getLowRedundancyBucketCount()).isEqualTo(expectedLowRedundancyBucketCount);
    }
  }

  /**
   * Asserts that the named partitioned region reports the given number of entries from
   * {@code getLocalBucketsListTestOnly()}.
   *
   * @param regionName name of a partitioned region in this VM's cache
   * @param numLocalBuckets expected size of the local buckets list
   */
  private void validateBucketCount(String regionName, int numLocalBuckets) {
    PartitionedRegion partitionedRegion = (PartitionedRegion) getCache().getRegion(regionName);
    assertThat(partitionedRegion.getLocalBucketsListTestOnly())
        .hasSize(numLocalBuckets);
  }

  /**
   * Asserts that the resource manager statistics agree with the results of a single completed
   * rebalance: nothing in progress, nothing failed, one rebalance completed, and
   * completed/byte/time totals matching the given results.
   *
   * @param manager resource manager whose statistics are checked
   * @param results rebalance results the statistics are compared against
   */
  private void validateStatistics(InternalResourceManager manager, RebalanceResults results) {
    // The time stats may not be exactly the same, because they are not
    // incremented at exactly the same time.
    final long timeToleranceMillis = 2000L;

    ResourceManagerStats stats = manager.getStats();

    // overall rebalance counters
    assertThat(stats.getRebalancesInProgress()).isEqualTo(0);
    assertThat(stats.getRebalancesCompleted()).isEqualTo(1);

    // bucket creates
    assertThat(stats.getRebalanceBucketCreatesInProgress()).isEqualTo(0);
    assertThat(stats.getRebalanceBucketCreatesCompleted())
        .isEqualTo(results.getTotalBucketCreatesCompleted());
    assertThat(stats.getRebalanceBucketCreatesFailed()).isEqualTo(0);
    assertThat(TimeUnit.NANOSECONDS.toMillis(stats.getRebalanceBucketCreateTime()))
        .isCloseTo(results.getTotalBucketCreateTime(), within(timeToleranceMillis));
    assertThat(stats.getRebalanceBucketCreateBytes())
        .isEqualTo(results.getTotalBucketCreateBytes());

    // bucket transfers
    assertThat(stats.getRebalanceBucketTransfersInProgress()).isEqualTo(0);
    assertThat(stats.getRebalanceBucketTransfersCompleted())
        .isEqualTo(results.getTotalBucketTransfersCompleted());
    assertThat(stats.getRebalanceBucketTransfersFailed()).isEqualTo(0);
    assertThat(TimeUnit.NANOSECONDS.toMillis(stats.getRebalanceBucketTransfersTime()))
        .isCloseTo(results.getTotalBucketTransferTime(), within(timeToleranceMillis));
    assertThat(stats.getRebalanceBucketTransfersBytes())
        .isEqualTo(results.getTotalBucketTransferBytes());

    // primary transfers
    assertThat(stats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0);
    assertThat(stats.getRebalancePrimaryTransfersCompleted())
        .isEqualTo(results.getTotalPrimaryTransfersCompleted());
    assertThat(stats.getRebalancePrimaryTransfersFailed()).isEqualTo(0);
    assertThat(TimeUnit.NANOSECONDS.toMillis(stats.getRebalancePrimaryTransferTime()))
        .isCloseTo(results.getTotalPrimaryTransferTime(), within(timeToleranceMillis));
  }

  /**
   * A single action performed against a region for one key.
   */
  @FunctionalInterface
  private interface Operation {
    /**
     * Performs the action.
     *
     * @param region region to operate on
     * @param key key the action is applied to
     */
    void execute(Region region, Integer key);
  }

  /**
   * Named region operations. Each constant wraps the {@link Operation} it forwards to when
   * executed.
   */
  private enum OperationEnum implements Operation {
    /** Puts the value "B" under the key. */
    PUT((targetRegion, targetKey) -> targetRegion.put(targetKey, "B")),
    /** Invalidates the key. */
    INVALIDATE((targetRegion, targetKey) -> targetRegion.invalidate(targetKey)),
    /** Destroys the key. */
    DESTROY((targetRegion, targetKey) -> targetRegion.destroy(targetKey)),
    /** Performs a get on the key offset by the region's total number of buckets. */
    CACHE_LOADER((targetRegion, targetKey) -> {
      // get a key that doesn't exist, but is in the same bucket as the given key
      int totalNumBuckets =
          ((PartitionedRegion) targetRegion).getPartitionAttributes().getTotalNumBuckets();
      targetRegion.get(targetKey + totalNumBuckets);
    });

    private final Operation delegate;

    OperationEnum(Operation operation) {
      delegate = operation;
    }

    /**
     * Forwards to the operation this constant was created with.
     */
    @Override
    public void execute(Region region, Integer key) {
      delegate.execute(region, key);
    }
  }

  /**
   * Resource observer that makes every recovery/rebalance start callback for a registered region
   * wait on a shared {@link CyclicBarrier} sized to the given number of regions, and records
   * once a callback has passed the barrier.
   */
  private static class ParallelRecoveryObserver extends ResourceObserverAdapter {

    private final Set<String> regions = new HashSet<>();
    private final CyclicBarrier barrier;
    private volatile boolean observerCalled;

    private ParallelRecoveryObserver(int numRegions) {
      barrier = new CyclicBarrier(numRegions);
    }

    /** Registers a region name whose callbacks are allowed to wait on the barrier. */
    private void observeRegion(String region) {
      regions.add(region);
    }

    /** Returns whether at least one callback has made it past the barrier. */
    private boolean isObserverCalled() {
      return observerCalled;
    }

    @Override
    public void rebalancingStarted(Region region) {
      super.rebalancingStarted(region);
      checkAllRegionRecoveryOrRebalanceStarted(region.getName());
    }

    @Override
    public void recoveryStarted(Region region) {
      super.recoveryStarted(region);
      checkAllRegionRecoveryOrRebalanceStarted(region.getName());
    }

    /**
     * Waits on the barrier for up to {@code TIMEOUT_SECONDS} seconds on behalf of the named
     * region and then flags the observer as called.
     *
     * @throws RuntimeException if the region was never registered or the barrier wait fails
     */
    private void checkAllRegionRecoveryOrRebalanceStarted(String regionName) {
      if (!regions.contains(regionName)) {
        throw new RuntimeException("region not registered " + regionName);
      }

      try {
        barrier.await(TIMEOUT_SECONDS, SECONDS);
      } catch (BrokenBarrierException | InterruptedException | TimeoutException e) {
        throw new RuntimeException("failed waiting for barrier", e);
      }
      observerCalled = true;
    }
  }

  /**
   * Cache listener whose create/update/destroy/invalidate callbacks count down a "started" latch
   * and then block until {@link #release()} is called or {@code TIMEOUT_SECONDS} seconds have
   * elapsed.
   */
  private static class BlockingCacheListener<K, V> extends CacheListenerAdapter<K, V> {

    // Counted down once per callback invocation; sized by the constructor argument.
    private final CountDownLatch operationStartedLatch;
    // Opened by release() to let blocked callbacks continue.
    private final CountDownLatch resumeOperationLatch = new CountDownLatch(1);

    /**
     * @param threads number of callback invocations that must begin before
     *        {@link #waitForOperationsStarted()} returns
     */
    private BlockingCacheListener(int threads) {
      operationStartedLatch = new CountDownLatch(threads);
    }

    /**
     * Blocks until the expected number of callbacks have started.
     *
     * @throws AssertionError if they have not all started within {@code TIMEOUT_SECONDS}
     *         seconds; previously a timeout was silently treated as success
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void waitForOperationsStarted() throws InterruptedException {
      boolean allStarted = operationStartedLatch.await(TIMEOUT_SECONDS, SECONDS);
      if (!allStarted) {
        throw new AssertionError("Timed out after " + TIMEOUT_SECONDS
            + " seconds waiting for operations to start; still waiting for "
            + operationStartedLatch.getCount());
      }
    }

    /**
     * Signals that a callback has started, then parks it until released or until the timeout
     * elapses. The result of the timed wait is intentionally not checked here, so a callback that
     * is never released simply continues after the timeout.
     */
    private void doWait() {
      operationStartedLatch.countDown();
      try {
        resumeOperationLatch.await(TIMEOUT_SECONDS, SECONDS);
      } catch (InterruptedException e) {
        throw new CacheWriterException(e);
      }
    }

    @Override
    public void afterUpdate(EntryEvent event) throws CacheWriterException {
      doWait();
    }

    @Override
    public void afterCreate(EntryEvent event) {
      doWait();
    }

    @Override
    public void afterDestroy(EntryEvent event) {
      doWait();
    }

    @Override
    public void afterInvalidate(EntryEvent event) {
      doWait();
    }

    /** Lets every callback blocked in this listener continue. */
    public void release() {
      resumeOperationLatch.countDown();
    }
  }

  /**
   * Cache loader that always loads {@code null} and remembers whether {@link #close()} has been
   * called.
   */
  private static class NullReturningLoader<K, V> implements CacheLoader<K, V> {

    // volatile: written by close() and read through isClosed()
    private volatile boolean closed;

    /** Always returns {@code null}; the helper is not consulted. */
    @Override
    public V load(LoaderHelper helper) throws CacheLoaderException {
      return null;
    }

    /** Marks this loader as closed. */
    @Override
    public void close() {
      closed = true;
    }

    /** Returns {@code true} once {@link #close()} has been invoked. */
    public boolean isClosed() {
      return closed;
    }
  }
}
