/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.management;

import static com.jayway.jsonpath.matchers.JsonPathMatchers.hasJsonPath;
import static com.jayway.jsonpath.matchers.JsonPathMatchers.isJson;
import static com.jayway.jsonpath.matchers.JsonPathMatchers.withJsonPath;
import static org.apache.geode.cache.FixedPartitionAttributes.createFixedPartition;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.cache.query.Utils.createPortfoliosAndPositions;
import static org.apache.geode.management.internal.ManagementConstants.DEFAULT_QUERY_LIMIT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.anything;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Set;

import javax.management.ObjectName;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.partitioned.fixed.SingleHopQuarterPartitionResolver;
import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.management.internal.beans.BeanUtilFuncs;
import org.apache.geode.management.internal.beans.QueryDataFunction;
import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.pdx.PdxInstanceFactory;
import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.DistributedUseJacksonForJsonPathRule;
import org.apache.geode.test.junit.rules.serializable.SerializableTestName;

/**
 * Distributed tests for DistributedSystemMXBean#queryData(String, String, int).
 * <p>
 *
 * <pre>
 * Test Basic Json Strings for Partitioned Regions
 * Test Basic Json Strings for Replicated Regions
 * Test for all Region Types
 * Test for primitive types
 * Test for Nested Objects
 * Test for Enums
 * Test for collections
 * Test for huge collection
 * Test PDX types
 * Test different projects type e.g. SelectResult, normal bean etc..
 * Test Colocated Regions
 * Test for Limit ( both row count and Depth)
 * ORDER by orders
 * Test all attributes are covered in an complex type
 * </pre>
 */

@SuppressWarnings({"serial", "unused"})
public class QueryDataDUnitTest implements Serializable {

  private static final int NUM_OF_BUCKETS = 20;

  // PARTITIONED_REGION_NAME5 is co-located with PARTITIONED_REGION_NAME4
  private static final String PARTITIONED_REGION_NAME1 = "PARTITIONED_REGION_NAME1";
  private static final String PARTITIONED_REGION_NAME2 = "PARTITIONED_REGION_NAME2";
  private static final String PARTITIONED_REGION_NAME3 = "PARTITIONED_REGION_NAME3";
  private static final String PARTITIONED_REGION_NAME4 = "PARTITIONED_REGION_NAME4";
  private static final String PARTITIONED_REGION_NAME5 = "PARTITIONED_REGION_NAME5";

  private static final String REPLICATE_REGION_NAME1 = "REPLICATE_REGION_NAME1";
  private static final String REPLICATE_REGION_NAME2 = "REPLICATE_REGION_NAME2";
  private static final String REPLICATE_REGION_NAME3 = "REPLICATE_REGION_NAME3";
  private static final String REPLICATE_REGION_NAME4 = "REPLICATE_REGION_NAME4";

  private static final String LOCAL_REGION_NAME = "LOCAL_REGION_NAME";

  private static final String BIG_COLLECTION_ELEMENT_ = "BIG_COLLECTION_ELEMENT_";
  private static final String BIG_COLLECTION_ = "BIG_COLLECTION_";

  private static final String[] QUERIES =
      new String[] {"SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME1 + " WHERE ID >= 0",
          "SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME1 + " r1, " + SEPARATOR
              + PARTITIONED_REGION_NAME2
              + " r2 WHERE r1.ID = r2.ID",
          "SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME1 + " r1, " + SEPARATOR
              + PARTITIONED_REGION_NAME2
              + " r2 WHERE r1.ID = r2.ID AND r1.status = r2.status",
          "SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME1 + " r1, " + SEPARATOR
              + PARTITIONED_REGION_NAME2
              + " r2, " + SEPARATOR + PARTITIONED_REGION_NAME3
              + " r3 WHERE r1.ID = r2.ID AND r2.ID = r3.ID",
          "SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME1 + " r1, " + SEPARATOR
              + PARTITIONED_REGION_NAME2
              + " r2, " + SEPARATOR + PARTITIONED_REGION_NAME3 + " r3, " + SEPARATOR
              + REPLICATE_REGION_NAME1
              + " r4 WHERE r1.ID = r2.ID AND r2.ID = r3.ID AND r3.ID = r4.ID",
          "SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME4 + " r4, " + SEPARATOR
              + PARTITIONED_REGION_NAME5
              + " r5 WHERE r4.ID = r5.ID"};

  private static final String[] QUERIES_FOR_REPLICATED =
      new String[] {
          "<TRACE> SELECT * FROM " + SEPARATOR + REPLICATE_REGION_NAME1 + " WHERE ID >= 0",
          "SELECT * FROM " + SEPARATOR + REPLICATE_REGION_NAME1 + " r1, " + SEPARATOR
              + REPLICATE_REGION_NAME2
              + " r2 WHERE r1.ID = r2.ID",
          "SELECT * FROM " + SEPARATOR + REPLICATE_REGION_NAME3 + " WHERE ID >= 0"};

  private static final String[] QUERIES_FOR_LIMIT =
      new String[] {"SELECT * FROM " + SEPARATOR + REPLICATE_REGION_NAME4};

  private DistributedMember member1;
  private DistributedMember member2;
  private DistributedMember member3;

  @Member
  private VM[] memberVMs;

  @Manager
  private VM managerVM;

  @Rule
  public DistributedUseJacksonForJsonPathRule useJacksonForJsonPathRule =
      new DistributedUseJacksonForJsonPathRule();

  @Rule
  public ManagementTestRule managementTestRule =
      ManagementTestRule.builder().defineManagersFirst(false).start(true).build();

  @Rule
  public SerializableTestName testName = new SerializableTestName();

  @Before
  public void before() throws Exception {
    member1 = managementTestRule.getDistributedMember(memberVMs[0]);
    member2 = managementTestRule.getDistributedMember(memberVMs[1]);
    member3 = managementTestRule.getDistributedMember(memberVMs[2]);

    createRegionsInNodes();
    generateValuesInRegions();
  }

  @Test
  public void testQueryOnPartitionedRegion() {
    managerVM.invoke(testName.getMethodName(), () -> {
      DistributedSystemMXBean distributedSystemMXBean =
          managementTestRule.getSystemManagementService().getDistributedSystemMXBean();

      String jsonString = distributedSystemMXBean.queryData(QUERIES[0], null, 10);
      assertThat(jsonString).contains("result").doesNotContain("No Data Found");

      for (int i = 0; i < QUERIES.length; i++) {
        jsonString = distributedSystemMXBean.queryData(QUERIES[i], member1.getId(), 10);
        assertThat(jsonString).contains("result");
        assertThat(jsonString).contains("member");
        assertThat("QUERIES[" + i + "]", jsonString, isJson(withJsonPath("$..result", anything())));

        // TODO: create better assertions
        // assertThat("QUERIES[" + i + "]", result,
        // isJson(withJsonPath("$..member",
        // equalTo(JsonPath.compile(result)))));
        // //equalTo(new JSONObject().put(String.class.getName(), member1.getId())))));
        // System.out.println(JsonPath.read(jsonString, "$.result.*"));
        // System.out.println(JsonPath.read(jsonString, "$['result']['member']"));

        verifyJsonIsValid(jsonString);
      }
    });
  }

  @Test
  public void testQueryOnReplicatedRegion() {
    managerVM.invoke(testName.getMethodName(), () -> {
      DistributedSystemMXBean distributedSystemMXBean =
          managementTestRule.getSystemManagementService().getDistributedSystemMXBean();

      String jsonString = distributedSystemMXBean.queryData(QUERIES_FOR_REPLICATED[0], null, 10);
      assertThat(jsonString).contains("result").doesNotContain("No Data Found");

      for (int i = 0; i < QUERIES_FOR_REPLICATED.length; i++) {
        assertThat(jsonString).contains("result");
        verifyJsonIsValid(jsonString);
      }
    });
  }

  @Test
  public void testMemberWise() {
    managerVM.invoke(testName.getMethodName(), () -> {
      DistributedSystemMXBean distributedSystemMXBean =
          managementTestRule.getSystemManagementService().getDistributedSystemMXBean();

      byte[] bytes = distributedSystemMXBean.queryDataForCompressedResult(QUERIES_FOR_REPLICATED[0],
          member1.getId() + "," + member2.getId(), 2);
      String jsonString = BeanUtilFuncs.decompress(bytes);

      verifyJsonIsValid(jsonString);
    });
  }

  @Test
  public void testLimitForQuery() {
    memberVMs[0].invoke("putBigInstances", () -> putBigInstances(REPLICATE_REGION_NAME4));

    managerVM.invoke(testName.getMethodName(), () -> {
      DistributedSystemMXBean distributedSystemMXBean =
          managementTestRule.getSystemManagementService().getDistributedSystemMXBean();

      // Query With Default values
      assertThat(distributedSystemMXBean.getQueryCollectionsDepth())
          .isEqualTo(QueryDataFunction.DEFAULT_COLLECTION_ELEMENT_LIMIT);
      assertThat(distributedSystemMXBean.getQueryResultSetLimit()).isEqualTo(DEFAULT_QUERY_LIMIT);

      String jsonString = distributedSystemMXBean.queryData(QUERIES_FOR_LIMIT[0], null, 0);

      verifyJsonIsValid(jsonString);
      assertThat(jsonString).contains("result").doesNotContain("No Data Found");
      assertThat(jsonString).contains(BIG_COLLECTION_ELEMENT_);

      JsonNode jsonObject = new ObjectMapper().readTree(jsonString);
      JsonNode jsonArray = jsonObject.get("result");
      assertThat(jsonArray.size()).isEqualTo(DEFAULT_QUERY_LIMIT);

      // Get the ObjectValue
      JsonNode collectionObject = jsonArray.get(0).get(1);
      assertThat(collectionObject.size()).isEqualTo(100);

      // Query With Override Values
      int newQueryCollectionDepth = 150;
      int newQueryResultSetLimit = 500;

      distributedSystemMXBean.setQueryCollectionsDepth(newQueryCollectionDepth);
      distributedSystemMXBean.setQueryResultSetLimit(newQueryResultSetLimit);

      assertThat(distributedSystemMXBean.getQueryCollectionsDepth())
          .isEqualTo(newQueryCollectionDepth);
      assertThat(distributedSystemMXBean.getQueryResultSetLimit())
          .isEqualTo(newQueryResultSetLimit);

      jsonString = distributedSystemMXBean.queryData(QUERIES_FOR_LIMIT[0], null, 0);

      verifyJsonIsValid(jsonString);
      assertThat(jsonString).contains("result").doesNotContain("No Data Found");

      jsonObject = new ObjectMapper().readTree(jsonString);
      assertThat(jsonString).contains(BIG_COLLECTION_ELEMENT_);

      jsonArray = jsonObject.get("result");
      assertThat(jsonArray.size()).isEqualTo(newQueryResultSetLimit);

      // Get the ObjectValue
      collectionObject = jsonArray.get(0).get(1);
      assertThat(collectionObject.size()).isEqualTo(newQueryCollectionDepth);
    });
  }

  @Test
  public void testErrors() {
    managerVM.invoke(testName.getMethodName(), () -> {
      DistributedSystemMXBean distributedSystemMXBean =
          managementTestRule.getSystemManagementService().getDistributedSystemMXBean();

      String invalidQuery = "SELECT * FROM " + PARTITIONED_REGION_NAME1;
      String invalidQueryResult = distributedSystemMXBean.queryData(invalidQuery, null, 2);
      assertThat(invalidQueryResult,
          isJson(
              withJsonPath("$.message", equalTo(String.format("Query is invalid due to error : %s",
                  "Region mentioned in query probably missing " + SEPARATOR)))));

      String nonexistentRegionName = testName.getMethodName() + "_NONEXISTENT_REGION";
      String regionsNotFoundQuery = "SELECT * FROM " + SEPARATOR + nonexistentRegionName
          + " r1, PARTITIONED_REGION_NAME2 r2 WHERE r1.ID = r2.ID";
      String regionsNotFoundResult =
          distributedSystemMXBean.queryData(regionsNotFoundQuery, null, 2);
      assertThat(regionsNotFoundResult, isJson(withJsonPath("$.message",
          equalTo(String.format("Cannot find regions %s in any of the members",
              SEPARATOR + nonexistentRegionName)))));

      String regionName = testName.getMethodName() + "_REGION";
      String regionsNotFoundOnMembersQuery = "SELECT * FROM " + SEPARATOR + regionName;

      RegionFactory regionFactory =
          managementTestRule.getCache().createRegionFactory(RegionShortcut.REPLICATE);
      regionFactory.create(regionName);

      String regionsNotFoundOnMembersResult =
          distributedSystemMXBean.queryData(regionsNotFoundOnMembersQuery, member1.getId(), 2);
      assertThat(regionsNotFoundOnMembersResult, isJson(withJsonPath("$.message",
          equalTo(
              String.format("Cannot find regions %s in specified members",
                  SEPARATOR + regionName)))));

      String joinMissingMembersQuery = QUERIES[1];
      String joinMissingMembersResult =
          distributedSystemMXBean.queryData(joinMissingMembersQuery, null, 2);
      assertThat(joinMissingMembersResult,
          isJson(withJsonPath("$.message", equalTo(
              "Join operation can only be executed on targeted members, please give member input"))));
    });
  }

  @Test
  public void testNormalRegions() {
    managerVM.invoke(testName.getMethodName(), () -> {
      DistributedSystemMXBean distributedSystemMXBean =
          managementTestRule.getSystemManagementService().getDistributedSystemMXBean();

      String normalRegionName1 = testName.getMethodName() + "_NORMAL_REGION_1";
      String tempRegionName1 = testName.getMethodName() + "_TEMP_REGION_1";

      // to Reverse order of regions while getting Random region in QueryDataFunction [?]
      String normalRegionName2 = testName.getMethodName() + "_NORMAL_REGION_2";
      String tempRegionName2 = testName.getMethodName() + "_TEMP_REGION_2";

      Cache cache = managementTestRule.getCache();

      RegionFactory regionFactory = cache.createRegionFactory(RegionShortcut.LOCAL_HEAP_LRU);
      regionFactory.create(normalRegionName1);
      regionFactory.create(normalRegionName2);

      Region region = cache.getRegion(SEPARATOR + normalRegionName1);
      assertThat(region.getAttributes().getDataPolicy()).isEqualTo(DataPolicy.NORMAL);

      RegionFactory regionFactory1 = cache.createRegionFactory(RegionShortcut.REPLICATE);
      regionFactory1.create(tempRegionName1);
      regionFactory1.create(tempRegionName2);

      String query1 =
          "SELECT * FROM " + SEPARATOR + tempRegionName1 + " r1, " + SEPARATOR + normalRegionName1
              + " r2 WHERE r1.ID = r2.ID";
      String query2 =
          "SELECT * FROM " + SEPARATOR + normalRegionName2 + " r1, " + SEPARATOR + tempRegionName2
              + " r2 WHERE r1.ID = r2.ID";
      String query3 = "SELECT * FROM " + SEPARATOR + normalRegionName2;

      distributedSystemMXBean.queryDataForCompressedResult(query1, null, 2);
      distributedSystemMXBean.queryDataForCompressedResult(query2, null, 2);
      distributedSystemMXBean.queryDataForCompressedResult(query3, null, 2);

      // TODO: assert results of queryDataForCompressedResult?
    });
  }

  @Test
  public void testRegionsLocalDataSet() {
    String partitionedRegionName = testName.getMethodName() + "_PARTITIONED_REGION";

    String[] values1 = new String[] {"val1", "val2", "val3"};
    String[] values2 = new String[] {"val4", "val5", "val6"};

    memberVMs[0].invoke(testName.getMethodName() + " Create Region", () -> {
      PartitionAttributesFactory<Date, Object> partitionAttributesFactory =
          new PartitionAttributesFactory<>();
      partitionAttributesFactory.setRedundantCopies(2).setTotalNumBuckets(12);

      List<FixedPartitionAttributes> fixedPartitionAttributesList = createFixedPartitionList(1);
      for (FixedPartitionAttributes fixedPartitionAttributes : fixedPartitionAttributesList) {
        partitionAttributesFactory.addFixedPartitionAttributes(fixedPartitionAttributes);
      }
      partitionAttributesFactory.setPartitionResolver(new SingleHopQuarterPartitionResolver());

      RegionFactory<Date, Object> regionFactory =
          managementTestRule.getCache().<Date, Object>createRegionFactory(RegionShortcut.PARTITION)
              .setPartitionAttributes(partitionAttributesFactory.create());
      Region<Date, Object> region = regionFactory.create(partitionedRegionName);

      for (int i = 0; i < values1.length; i++) {
        region.put(getDate(2013, 1, i + 5), values1[i]);
      }
    });

    memberVMs[1].invoke(testName.getMethodName() + " Create Region", () -> {
      PartitionAttributesFactory<Date, Object> partitionAttributesFactory =
          new PartitionAttributesFactory<>();
      partitionAttributesFactory.setRedundantCopies(2).setTotalNumBuckets(12);

      List<FixedPartitionAttributes> fixedPartitionAttributesList = createFixedPartitionList(2);
      for (FixedPartitionAttributes fixedPartitionAttributes : fixedPartitionAttributesList) {
        partitionAttributesFactory.addFixedPartitionAttributes(fixedPartitionAttributes);
      }
      partitionAttributesFactory.setPartitionResolver(new SingleHopQuarterPartitionResolver());

      RegionFactory<Date, Object> regionFactory =
          managementTestRule.getCache().<Date, Object>createRegionFactory(RegionShortcut.PARTITION)
              .setPartitionAttributes(partitionAttributesFactory.create());
      Region<Date, Object> region = regionFactory.create(partitionedRegionName);

      for (int i = 0; i < values2.length; i++) {
        region.put(getDate(2013, 5, i + 5), values2[i]);
      }
    });

    memberVMs[2].invoke(testName.getMethodName() + " Create Region", () -> {
      PartitionAttributesFactory<Date, Object> partitionAttributesFactory =
          new PartitionAttributesFactory<>();
      partitionAttributesFactory.setRedundantCopies(2).setTotalNumBuckets(12);

      List<FixedPartitionAttributes> fixedPartitionAttributesList = createFixedPartitionList(3);
      fixedPartitionAttributesList.forEach(partitionAttributesFactory::addFixedPartitionAttributes);
      partitionAttributesFactory.setPartitionResolver(new SingleHopQuarterPartitionResolver());

      RegionFactory<Date, Object> regionFactory =
          managementTestRule.getCache().<Date, Object>createRegionFactory(RegionShortcut.PARTITION)
              .setPartitionAttributes(partitionAttributesFactory.create());
      regionFactory.create(partitionedRegionName);
    });

    List<String> member1RealData =
        memberVMs[0].invoke(() -> getLocalDataSet(partitionedRegionName));
    List<String> member2RealData =
        memberVMs[1].invoke(() -> getLocalDataSet(partitionedRegionName));
    List<String> member3RealData =
        memberVMs[2].invoke(() -> getLocalDataSet(partitionedRegionName));

    managerVM.invoke(testName.getMethodName(), () -> {
      DistributedSystemMXBean distributedSystemMXBean =
          managementTestRule.getSystemManagementService().getDistributedSystemMXBean();
      DistributedRegionMXBean distributedRegionMXBean =
          awaitDistributedRegionMXBean(SEPARATOR + partitionedRegionName, 3);

      String alias = "Waiting for all entries to get reflected at managing node";
      int expectedEntryCount = values1.length + values2.length;
      await(alias)
          .untilAsserted(() -> assertThat(distributedRegionMXBean.getSystemRegionEntryCount())
              .isEqualTo(expectedEntryCount));

      String query = "Select * from " + SEPARATOR + partitionedRegionName;

      String member1Result = distributedSystemMXBean.queryData(query, member1.getId(), 0);
      verifyJsonIsValid(member1Result);

      String member2Result = distributedSystemMXBean.queryData(query, member2.getId(), 0);
      verifyJsonIsValid(member2Result);

      String member3Result = distributedSystemMXBean.queryData(query, member3.getId(), 0);
      verifyJsonIsValid(member3Result);

      for (String val : member1RealData) {
        assertThat(member1Result).contains(val);
      }

      for (String val : member2RealData) {
        assertThat(member2Result).contains(val);
      }

      assertThat(member3Result).contains("No Data Found");
    });
  }

  private Date getDate(final int year, final int month, final int date) {
    Calendar calendar = Calendar.getInstance();
    calendar.set(year, month, date);
    return calendar.getTime();
  }

  private void verifyJsonIsValid(final String jsonString) {
    assertThat(jsonString, isJson());
    assertThat(jsonString, hasJsonPath("$.result"));
  }

  private void putDataInRegion(final String regionName, final Object[] portfolio, final int from,
      final int to) {
    Region<Integer, Object> region = managementTestRule.getCache().getRegion(regionName);
    for (int i = from; i < to; i++) {
      region.put(i, portfolio[i]);
    }
  }

  private void generateValuesInRegions() {
    int COUNT_DESTINATION = 30;
    int COUNT_FROM = 0;

    // Create common Portfolios and NewPortfolios
    Portfolio[] portfolio = createPortfoliosAndPositions(COUNT_DESTINATION);

    // Fill local region
    memberVMs[0]
        .invoke(() -> putDataInRegion(LOCAL_REGION_NAME, portfolio, COUNT_FROM, COUNT_DESTINATION));

    // Fill replicated region
    memberVMs[0].invoke(
        () -> putDataInRegion(REPLICATE_REGION_NAME1, portfolio, COUNT_FROM, COUNT_DESTINATION));
    memberVMs[1].invoke(
        () -> putDataInRegion(REPLICATE_REGION_NAME2, portfolio, COUNT_FROM, COUNT_DESTINATION));

    // Fill Partition Region
    memberVMs[0].invoke(
        () -> putDataInRegion(PARTITIONED_REGION_NAME1, portfolio, COUNT_FROM, COUNT_DESTINATION));
    memberVMs[0].invoke(
        () -> putDataInRegion(PARTITIONED_REGION_NAME2, portfolio, COUNT_FROM, COUNT_DESTINATION));
    memberVMs[0].invoke(
        () -> putDataInRegion(PARTITIONED_REGION_NAME3, portfolio, COUNT_FROM, COUNT_DESTINATION));
    memberVMs[0].invoke(
        () -> putDataInRegion(PARTITIONED_REGION_NAME4, portfolio, COUNT_FROM, COUNT_DESTINATION));
    memberVMs[0].invoke(
        () -> putDataInRegion(PARTITIONED_REGION_NAME5, portfolio, COUNT_FROM, COUNT_DESTINATION));

    memberVMs[0].invoke(() -> putPdxInstances(REPLICATE_REGION_NAME3));
  }

  private void putPdxInstances(final String regionName) throws CacheException {
    InternalCache cache = managementTestRule.getCache();
    Region<String, PdxInstance> region = cache.getRegion(regionName);

    PdxInstanceFactory pdxInstanceFactory =
        PdxInstanceFactoryImpl.newCreator("Portfolio", false, cache);
    pdxInstanceFactory.writeInt("ID", 111);
    pdxInstanceFactory.writeString("status", "active");
    pdxInstanceFactory.writeString("secId", "IBM");
    PdxInstance pdxInstance = pdxInstanceFactory.create();
    region.put("IBM", pdxInstance);

    pdxInstanceFactory = PdxInstanceFactoryImpl.newCreator("Portfolio", false, cache);
    pdxInstanceFactory.writeInt("ID", 222);
    pdxInstanceFactory.writeString("status", "inactive");
    pdxInstanceFactory.writeString("secId", "YHOO");
    pdxInstance = pdxInstanceFactory.create();
    region.put("YHOO", pdxInstance);

    pdxInstanceFactory = PdxInstanceFactoryImpl.newCreator("Portfolio", false, cache);
    pdxInstanceFactory.writeInt("ID", 333);
    pdxInstanceFactory.writeString("status", "active");
    pdxInstanceFactory.writeString("secId", "GOOGL");
    pdxInstance = pdxInstanceFactory.create();
    region.put("GOOGL", pdxInstance);

    pdxInstanceFactory = PdxInstanceFactoryImpl.newCreator("Portfolio", false, cache);
    pdxInstanceFactory.writeInt("ID", 111);
    pdxInstanceFactory.writeString("status", "inactive");
    pdxInstanceFactory.writeString("secId", "VMW");
    pdxInstance = pdxInstanceFactory.create();
    region.put("VMW", pdxInstance);
  }

  private void putBigInstances(final String regionName) {
    Region<String, List<String>> region = managementTestRule.getCache().getRegion(regionName);

    for (int i = 0; i < 1200; i++) {
      List<String> bigCollection = new ArrayList<>();
      for (int j = 0; j < 200; j++) {
        bigCollection.add(BIG_COLLECTION_ELEMENT_ + j);
      }
      region.put(BIG_COLLECTION_ + i, bigCollection);
    }
  }

  private void createLocalRegion() {
    managementTestRule.getCache().createRegionFactory(RegionShortcut.LOCAL)
        .create(LOCAL_REGION_NAME);
  }

  private void createReplicatedRegion() {
    managementTestRule.getCache().createRegionFactory(RegionShortcut.REPLICATE)
        .create(REPLICATE_REGION_NAME1);
  }

  private void createColocatedPR() {
    PartitionResolver<Integer, Object> testKeyBasedResolver = new TestPartitionResolver();
    managementTestRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
        .setPartitionAttributes(
            new PartitionAttributesFactory<Integer, Object>().setTotalNumBuckets(NUM_OF_BUCKETS)
                .setPartitionResolver(testKeyBasedResolver).create())
        .create(PARTITIONED_REGION_NAME1);
    managementTestRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
        .setPartitionAttributes(
            new PartitionAttributesFactory<Integer, Object>().setTotalNumBuckets(NUM_OF_BUCKETS)
                .setPartitionResolver(testKeyBasedResolver)
                .setColocatedWith(PARTITIONED_REGION_NAME1)
                .create())
        .create(PARTITIONED_REGION_NAME2);
    managementTestRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
        .setPartitionAttributes(
            new PartitionAttributesFactory<Integer, Object>().setTotalNumBuckets(NUM_OF_BUCKETS)
                .setPartitionResolver(testKeyBasedResolver)
                .setColocatedWith(PARTITIONED_REGION_NAME2)
                .create())
        .create(PARTITIONED_REGION_NAME3);
    managementTestRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
        .setPartitionAttributes(
            new PartitionAttributesFactory<Integer, Object>().setTotalNumBuckets(NUM_OF_BUCKETS)
                .setPartitionResolver(testKeyBasedResolver).create())
        .create(PARTITIONED_REGION_NAME4); // not collocated
    managementTestRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
        .setPartitionAttributes(
            new PartitionAttributesFactory<Integer, Object>().setTotalNumBuckets(NUM_OF_BUCKETS)
                .setPartitionResolver(testKeyBasedResolver)
                .setColocatedWith(PARTITIONED_REGION_NAME4)
                .create())
        .create(PARTITIONED_REGION_NAME5); // collocated with 4
  }

  private void createDistributedRegion(final String regionName) {
    managementTestRule.getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
  }

  private void createRegionsInNodes() {
    // Create local Region on servers
    memberVMs[0].invoke(this::createLocalRegion);

    // Create ReplicatedRegion on servers
    memberVMs[0].invoke(this::createReplicatedRegion);
    memberVMs[1].invoke(this::createReplicatedRegion);
    memberVMs[2].invoke(this::createReplicatedRegion);

    memberVMs[1].invoke(() -> createDistributedRegion(REPLICATE_REGION_NAME2));
    memberVMs[0].invoke(() -> createDistributedRegion(REPLICATE_REGION_NAME3));
    memberVMs[0].invoke(() -> createDistributedRegion(REPLICATE_REGION_NAME4));

    // Create two co-located PartitionedRegions On Servers.
    memberVMs[0].invoke(this::createColocatedPR);
    memberVMs[1].invoke(this::createColocatedPR);
    memberVMs[2].invoke(this::createColocatedPR);

    managerVM.invoke("Wait for all Region Proxies to get replicated", () -> {
      awaitDistributedRegionMXBean(SEPARATOR + PARTITIONED_REGION_NAME1, 3);
      awaitDistributedRegionMXBean(SEPARATOR + PARTITIONED_REGION_NAME2, 3);
      awaitDistributedRegionMXBean(SEPARATOR + PARTITIONED_REGION_NAME3, 3);
      awaitDistributedRegionMXBean(SEPARATOR + PARTITIONED_REGION_NAME4, 3);
      awaitDistributedRegionMXBean(SEPARATOR + PARTITIONED_REGION_NAME5, 3);
      awaitDistributedRegionMXBean(SEPARATOR + REPLICATE_REGION_NAME1, 3);
      awaitDistributedRegionMXBean(SEPARATOR + REPLICATE_REGION_NAME2, 1);
      awaitDistributedRegionMXBean(SEPARATOR + REPLICATE_REGION_NAME3, 1);
      awaitDistributedRegionMXBean(SEPARATOR + REPLICATE_REGION_NAME4, 1);
    });
  }

  @SuppressWarnings("unchecked")
  private List<String> getLocalDataSet(final String region) {
    PartitionedRegion partitionedRegion =
        PartitionedRegionHelper.getPartitionedRegion(region, managementTestRule.getCache());
    assertThat(partitionedRegion).isNotNull();

    Set<BucketRegion> localPrimaryBucketRegions =
        partitionedRegion.getDataStore().getAllLocalPrimaryBucketRegions();

    List<String> allPrimaryValues = new ArrayList<>();

    for (BucketRegion bucketRegion : localPrimaryBucketRegions) {
      for (Object value : bucketRegion.values()) {
        allPrimaryValues.add((String) value);
      }
    }

    return allPrimaryValues;
  }

  private List<FixedPartitionAttributes> createFixedPartitionList(final int primaryIndex) {
    List<FixedPartitionAttributes> fixedPartitionAttributesList = new ArrayList<>();
    if (primaryIndex == 1) {
      fixedPartitionAttributesList.add(createFixedPartition("Q1", true, 3));
      fixedPartitionAttributesList.add(createFixedPartition("Q2", 3));
      fixedPartitionAttributesList.add(createFixedPartition("Q3", 3));
    }
    if (primaryIndex == 2) {
      fixedPartitionAttributesList.add(createFixedPartition("Q1", 3));
      fixedPartitionAttributesList.add(createFixedPartition("Q2", true, 3));
      fixedPartitionAttributesList.add(createFixedPartition("Q3", 3));
    }
    if (primaryIndex == 3) {
      fixedPartitionAttributesList.add(createFixedPartition("Q1", 3));
      fixedPartitionAttributesList.add(createFixedPartition("Q2", 3));
      fixedPartitionAttributesList.add(createFixedPartition("Q3", true, 3));
    }
    return fixedPartitionAttributesList;
  }

  private MemberMXBean awaitMemberMXBeanProxy(final DistributedMember member) {
    SystemManagementService service = managementTestRule.getSystemManagementService();
    ObjectName objectName = service.getMemberMBeanName(member);
    String alias = "awaiting MemberMXBean proxy for " + member;

    await(alias)
        .untilAsserted(
            () -> assertThat(service.getMBeanProxy(objectName, MemberMXBean.class)).isNotNull());

    return service.getMBeanProxy(objectName, MemberMXBean.class);
  }

  private DistributedSystemMXBean awaitDistributedSystemMXBean() {
    SystemManagementService service = managementTestRule.getSystemManagementService();

    await().untilAsserted(() -> assertThat(service.getDistributedSystemMXBean()).isNotNull());

    return service.getDistributedSystemMXBean();
  }

  private DistributedRegionMXBean awaitDistributedRegionMXBean(final String name) {
    SystemManagementService service = managementTestRule.getSystemManagementService();

    await().untilAsserted(() -> assertThat(service.getDistributedRegionMXBean(name)).isNotNull());

    return service.getDistributedRegionMXBean(name);
  }

  private DistributedRegionMXBean awaitDistributedRegionMXBean(final String name,
      final int memberCount) {
    SystemManagementService service = managementTestRule.getSystemManagementService();

    await().untilAsserted(() -> assertThat(service.getDistributedRegionMXBean(name)).isNotNull());
    await()
        .untilAsserted(() -> assertThat(service.getDistributedRegionMXBean(name).getMemberCount())
            .isEqualTo(memberCount));

    return service.getDistributedRegionMXBean(name);
  }

  private static class TestPartitionResolver implements PartitionResolver<Integer, Object> {

    @Override
    public void close() {}

    @Override
    public Serializable getRoutingObject(EntryOperation opDetails) {
      return ((Integer) opDetails.getKey() % NUM_OF_BUCKETS);
    }

    @Override
    public String getName() {
      return getClass().getName();
    }
  }
}
