| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.management; |
| |
| import static com.jayway.jsonpath.matchers.JsonPathMatchers.hasJsonPath; |
| import static com.jayway.jsonpath.matchers.JsonPathMatchers.isJson; |
| import static com.jayway.jsonpath.matchers.JsonPathMatchers.withJsonPath; |
| import static org.apache.geode.cache.FixedPartitionAttributes.createFixedPartition; |
| import static org.apache.geode.cache.Region.SEPARATOR; |
| import static org.apache.geode.cache.query.Utils.createPortfoliosAndPositions; |
| import static org.apache.geode.management.internal.ManagementConstants.DEFAULT_QUERY_LIMIT; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.hamcrest.Matchers.anything; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.junit.Assert.assertThat; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Calendar; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Set; |
| |
| import javax.management.ObjectName; |
| |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EntryOperation; |
| import org.apache.geode.cache.FixedPartitionAttributes; |
| import org.apache.geode.cache.PartitionAttributesFactory; |
| import org.apache.geode.cache.PartitionResolver; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.cache.query.data.Portfolio; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.internal.cache.BucketRegion; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.PartitionedRegionHelper; |
| import org.apache.geode.internal.cache.partitioned.fixed.SingleHopQuarterPartitionResolver; |
| import org.apache.geode.management.internal.SystemManagementService; |
| import org.apache.geode.management.internal.beans.BeanUtilFuncs; |
| import org.apache.geode.management.internal.beans.QueryDataFunction; |
| import org.apache.geode.pdx.PdxInstance; |
| import org.apache.geode.pdx.PdxInstanceFactory; |
| import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.rules.DistributedUseJacksonForJsonPathRule; |
| import org.apache.geode.test.junit.rules.serializable.SerializableTestName; |
| |
| /** |
| * Distributed tests for DistributedSystemMXBean#queryData(String, String, int). |
| * </p> |
| * |
| * <pre> |
| * Test Basic Json Strings for Partitioned Regions |
| * Test Basic Json Strings for Replicated Regions |
| * Test for all Region Types |
| * Test for primitive types |
| * Test for Nested Objects |
| * Test for Enums |
| * Test for collections |
| * Test for huge collection |
| * Test PDX types |
| * Test different projects type e.g. SelectResult, normal bean etc.. |
| * Test Colocated Regions |
| * Test for Limit ( both row count and Depth) |
| * ORDER by orders |
| * Test all attributes are covered in an complex type |
| * </pre> |
| */ |
| |
| @SuppressWarnings({"serial", "unused"}) |
| public class QueryDataDUnitTest implements Serializable { |
| |
| private static final int NUM_OF_BUCKETS = 20; |
| |
| // PARTITIONED_REGION_NAME5 is co-located with PARTITIONED_REGION_NAME4 |
| private static final String PARTITIONED_REGION_NAME1 = "PARTITIONED_REGION_NAME1"; |
| private static final String PARTITIONED_REGION_NAME2 = "PARTITIONED_REGION_NAME2"; |
| private static final String PARTITIONED_REGION_NAME3 = "PARTITIONED_REGION_NAME3"; |
| private static final String PARTITIONED_REGION_NAME4 = "PARTITIONED_REGION_NAME4"; |
| private static final String PARTITIONED_REGION_NAME5 = "PARTITIONED_REGION_NAME5"; |
| |
| private static final String REPLICATE_REGION_NAME1 = "REPLICATE_REGION_NAME1"; |
| private static final String REPLICATE_REGION_NAME2 = "REPLICATE_REGION_NAME2"; |
| private static final String REPLICATE_REGION_NAME3 = "REPLICATE_REGION_NAME3"; |
| private static final String REPLICATE_REGION_NAME4 = "REPLICATE_REGION_NAME4"; |
| |
| private static final String LOCAL_REGION_NAME = "LOCAL_REGION_NAME"; |
| |
| private static final String BIG_COLLECTION_ELEMENT_ = "BIG_COLLECTION_ELEMENT_"; |
| private static final String BIG_COLLECTION_ = "BIG_COLLECTION_"; |
| |
| private static final String[] QUERIES = |
| new String[] {"SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME1 + " WHERE ID >= 0", |
| "SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME1 + " r1, " + SEPARATOR |
| + PARTITIONED_REGION_NAME2 |
| + " r2 WHERE r1.ID = r2.ID", |
| "SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME1 + " r1, " + SEPARATOR |
| + PARTITIONED_REGION_NAME2 |
| + " r2 WHERE r1.ID = r2.ID AND r1.status = r2.status", |
| "SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME1 + " r1, " + SEPARATOR |
| + PARTITIONED_REGION_NAME2 |
| + " r2, " + SEPARATOR + PARTITIONED_REGION_NAME3 |
| + " r3 WHERE r1.ID = r2.ID AND r2.ID = r3.ID", |
| "SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME1 + " r1, " + SEPARATOR |
| + PARTITIONED_REGION_NAME2 |
| + " r2, " + SEPARATOR + PARTITIONED_REGION_NAME3 + " r3, " + SEPARATOR |
| + REPLICATE_REGION_NAME1 |
| + " r4 WHERE r1.ID = r2.ID AND r2.ID = r3.ID AND r3.ID = r4.ID", |
| "SELECT * FROM " + SEPARATOR + PARTITIONED_REGION_NAME4 + " r4, " + SEPARATOR |
| + PARTITIONED_REGION_NAME5 |
| + " r5 WHERE r4.ID = r5.ID"}; |
| |
| private static final String[] QUERIES_FOR_REPLICATED = |
| new String[] { |
| "<TRACE> SELECT * FROM " + SEPARATOR + REPLICATE_REGION_NAME1 + " WHERE ID >= 0", |
| "SELECT * FROM " + SEPARATOR + REPLICATE_REGION_NAME1 + " r1, " + SEPARATOR |
| + REPLICATE_REGION_NAME2 |
| + " r2 WHERE r1.ID = r2.ID", |
| "SELECT * FROM " + SEPARATOR + REPLICATE_REGION_NAME3 + " WHERE ID >= 0"}; |
| |
| private static final String[] QUERIES_FOR_LIMIT = |
| new String[] {"SELECT * FROM " + SEPARATOR + REPLICATE_REGION_NAME4}; |
| |
| private DistributedMember member1; |
| private DistributedMember member2; |
| private DistributedMember member3; |
| |
| @Member |
| private VM[] memberVMs; |
| |
| @Manager |
| private VM managerVM; |
| |
| @Rule |
| public DistributedUseJacksonForJsonPathRule useJacksonForJsonPathRule = |
| new DistributedUseJacksonForJsonPathRule(); |
| |
| @Rule |
| public ManagementTestRule managementTestRule = |
| ManagementTestRule.builder().defineManagersFirst(false).start(true).build(); |
| |
| @Rule |
| public SerializableTestName testName = new SerializableTestName(); |
| |
| @Before |
| public void before() throws Exception { |
| member1 = managementTestRule.getDistributedMember(memberVMs[0]); |
| member2 = managementTestRule.getDistributedMember(memberVMs[1]); |
| member3 = managementTestRule.getDistributedMember(memberVMs[2]); |
| |
| createRegionsInNodes(); |
| generateValuesInRegions(); |
| } |
| |
| @Test |
| public void testQueryOnPartitionedRegion() { |
| managerVM.invoke(testName.getMethodName(), () -> { |
| DistributedSystemMXBean distributedSystemMXBean = |
| managementTestRule.getSystemManagementService().getDistributedSystemMXBean(); |
| |
| String jsonString = distributedSystemMXBean.queryData(QUERIES[0], null, 10); |
| assertThat(jsonString).contains("result").doesNotContain("No Data Found"); |
| |
| for (int i = 0; i < QUERIES.length; i++) { |
| jsonString = distributedSystemMXBean.queryData(QUERIES[i], member1.getId(), 10); |
| assertThat(jsonString).contains("result"); |
| assertThat(jsonString).contains("member"); |
| assertThat("QUERIES[" + i + "]", jsonString, isJson(withJsonPath("$..result", anything()))); |
| |
| // TODO: create better assertions |
| // assertThat("QUERIES[" + i + "]", result, |
| // isJson(withJsonPath("$..member", |
| // equalTo(JsonPath.compile(result))))); |
| // //equalTo(new JSONObject().put(String.class.getName(), member1.getId()))))); |
| // System.out.println(JsonPath.read(jsonString, "$.result.*")); |
| // System.out.println(JsonPath.read(jsonString, "$['result']['member']")); |
| |
| verifyJsonIsValid(jsonString); |
| } |
| }); |
| } |
| |
| @Test |
| public void testQueryOnReplicatedRegion() { |
| managerVM.invoke(testName.getMethodName(), () -> { |
| DistributedSystemMXBean distributedSystemMXBean = |
| managementTestRule.getSystemManagementService().getDistributedSystemMXBean(); |
| |
| String jsonString = distributedSystemMXBean.queryData(QUERIES_FOR_REPLICATED[0], null, 10); |
| assertThat(jsonString).contains("result").doesNotContain("No Data Found"); |
| |
| for (int i = 0; i < QUERIES_FOR_REPLICATED.length; i++) { |
| assertThat(jsonString).contains("result"); |
| verifyJsonIsValid(jsonString); |
| } |
| }); |
| } |
| |
| @Test |
| public void testMemberWise() { |
| managerVM.invoke(testName.getMethodName(), () -> { |
| DistributedSystemMXBean distributedSystemMXBean = |
| managementTestRule.getSystemManagementService().getDistributedSystemMXBean(); |
| |
| byte[] bytes = distributedSystemMXBean.queryDataForCompressedResult(QUERIES_FOR_REPLICATED[0], |
| member1.getId() + "," + member2.getId(), 2); |
| String jsonString = BeanUtilFuncs.decompress(bytes); |
| |
| verifyJsonIsValid(jsonString); |
| }); |
| } |
| |
| @Test |
| public void testLimitForQuery() { |
| memberVMs[0].invoke("putBigInstances", () -> putBigInstances(REPLICATE_REGION_NAME4)); |
| |
| managerVM.invoke(testName.getMethodName(), () -> { |
| DistributedSystemMXBean distributedSystemMXBean = |
| managementTestRule.getSystemManagementService().getDistributedSystemMXBean(); |
| |
| // Query With Default values |
| assertThat(distributedSystemMXBean.getQueryCollectionsDepth()) |
| .isEqualTo(QueryDataFunction.DEFAULT_COLLECTION_ELEMENT_LIMIT); |
| assertThat(distributedSystemMXBean.getQueryResultSetLimit()).isEqualTo(DEFAULT_QUERY_LIMIT); |
| |
| String jsonString = distributedSystemMXBean.queryData(QUERIES_FOR_LIMIT[0], null, 0); |
| |
| verifyJsonIsValid(jsonString); |
| assertThat(jsonString).contains("result").doesNotContain("No Data Found"); |
| assertThat(jsonString).contains(BIG_COLLECTION_ELEMENT_); |
| |
| JsonNode jsonObject = new ObjectMapper().readTree(jsonString); |
| JsonNode jsonArray = jsonObject.get("result"); |
| assertThat(jsonArray.size()).isEqualTo(DEFAULT_QUERY_LIMIT); |
| |
| // Get the ObjectValue |
| JsonNode collectionObject = jsonArray.get(0).get(1); |
| assertThat(collectionObject.size()).isEqualTo(100); |
| |
| // Query With Override Values |
| int newQueryCollectionDepth = 150; |
| int newQueryResultSetLimit = 500; |
| |
| distributedSystemMXBean.setQueryCollectionsDepth(newQueryCollectionDepth); |
| distributedSystemMXBean.setQueryResultSetLimit(newQueryResultSetLimit); |
| |
| assertThat(distributedSystemMXBean.getQueryCollectionsDepth()) |
| .isEqualTo(newQueryCollectionDepth); |
| assertThat(distributedSystemMXBean.getQueryResultSetLimit()) |
| .isEqualTo(newQueryResultSetLimit); |
| |
| jsonString = distributedSystemMXBean.queryData(QUERIES_FOR_LIMIT[0], null, 0); |
| |
| verifyJsonIsValid(jsonString); |
| assertThat(jsonString).contains("result").doesNotContain("No Data Found"); |
| |
| jsonObject = new ObjectMapper().readTree(jsonString); |
| assertThat(jsonString).contains(BIG_COLLECTION_ELEMENT_); |
| |
| jsonArray = jsonObject.get("result"); |
| assertThat(jsonArray.size()).isEqualTo(newQueryResultSetLimit); |
| |
| // Get the ObjectValue |
| collectionObject = jsonArray.get(0).get(1); |
| assertThat(collectionObject.size()).isEqualTo(newQueryCollectionDepth); |
| }); |
| } |
| |
| @Test |
| public void testErrors() { |
| managerVM.invoke(testName.getMethodName(), () -> { |
| DistributedSystemMXBean distributedSystemMXBean = |
| managementTestRule.getSystemManagementService().getDistributedSystemMXBean(); |
| |
| String invalidQuery = "SELECT * FROM " + PARTITIONED_REGION_NAME1; |
| String invalidQueryResult = distributedSystemMXBean.queryData(invalidQuery, null, 2); |
| assertThat(invalidQueryResult, |
| isJson( |
| withJsonPath("$.message", equalTo(String.format("Query is invalid due to error : %s", |
| "Region mentioned in query probably missing " + SEPARATOR))))); |
| |
| String nonexistentRegionName = testName.getMethodName() + "_NONEXISTENT_REGION"; |
| String regionsNotFoundQuery = "SELECT * FROM " + SEPARATOR + nonexistentRegionName |
| + " r1, PARTITIONED_REGION_NAME2 r2 WHERE r1.ID = r2.ID"; |
| String regionsNotFoundResult = |
| distributedSystemMXBean.queryData(regionsNotFoundQuery, null, 2); |
| assertThat(regionsNotFoundResult, isJson(withJsonPath("$.message", |
| equalTo(String.format("Cannot find regions %s in any of the members", |
| SEPARATOR + nonexistentRegionName))))); |
| |
| String regionName = testName.getMethodName() + "_REGION"; |
| String regionsNotFoundOnMembersQuery = "SELECT * FROM " + SEPARATOR + regionName; |
| |
| RegionFactory regionFactory = |
| managementTestRule.getCache().createRegionFactory(RegionShortcut.REPLICATE); |
| regionFactory.create(regionName); |
| |
| String regionsNotFoundOnMembersResult = |
| distributedSystemMXBean.queryData(regionsNotFoundOnMembersQuery, member1.getId(), 2); |
| assertThat(regionsNotFoundOnMembersResult, isJson(withJsonPath("$.message", |
| equalTo( |
| String.format("Cannot find regions %s in specified members", |
| SEPARATOR + regionName))))); |
| |
| String joinMissingMembersQuery = QUERIES[1]; |
| String joinMissingMembersResult = |
| distributedSystemMXBean.queryData(joinMissingMembersQuery, null, 2); |
| assertThat(joinMissingMembersResult, |
| isJson(withJsonPath("$.message", equalTo( |
| "Join operation can only be executed on targeted members, please give member input")))); |
| }); |
| } |
| |
| @Test |
| public void testNormalRegions() { |
| managerVM.invoke(testName.getMethodName(), () -> { |
| DistributedSystemMXBean distributedSystemMXBean = |
| managementTestRule.getSystemManagementService().getDistributedSystemMXBean(); |
| |
| String normalRegionName1 = testName.getMethodName() + "_NORMAL_REGION_1"; |
| String tempRegionName1 = testName.getMethodName() + "_TEMP_REGION_1"; |
| |
| // to Reverse order of regions while getting Random region in QueryDataFunction [?] |
| String normalRegionName2 = testName.getMethodName() + "_NORMAL_REGION_2"; |
| String tempRegionName2 = testName.getMethodName() + "_TEMP_REGION_2"; |
| |
| Cache cache = managementTestRule.getCache(); |
| |
| RegionFactory regionFactory = cache.createRegionFactory(RegionShortcut.LOCAL_HEAP_LRU); |
| regionFactory.create(normalRegionName1); |
| regionFactory.create(normalRegionName2); |
| |
| Region region = cache.getRegion(SEPARATOR + normalRegionName1); |
| assertThat(region.getAttributes().getDataPolicy()).isEqualTo(DataPolicy.NORMAL); |
| |
| RegionFactory regionFactory1 = cache.createRegionFactory(RegionShortcut.REPLICATE); |
| regionFactory1.create(tempRegionName1); |
| regionFactory1.create(tempRegionName2); |
| |
| String query1 = |
| "SELECT * FROM " + SEPARATOR + tempRegionName1 + " r1, " + SEPARATOR + normalRegionName1 |
| + " r2 WHERE r1.ID = r2.ID"; |
| String query2 = |
| "SELECT * FROM " + SEPARATOR + normalRegionName2 + " r1, " + SEPARATOR + tempRegionName2 |
| + " r2 WHERE r1.ID = r2.ID"; |
| String query3 = "SELECT * FROM " + SEPARATOR + normalRegionName2; |
| |
| distributedSystemMXBean.queryDataForCompressedResult(query1, null, 2); |
| distributedSystemMXBean.queryDataForCompressedResult(query2, null, 2); |
| distributedSystemMXBean.queryDataForCompressedResult(query3, null, 2); |
| |
| // TODO: assert results of queryDataForCompressedResult? |
| }); |
| } |
| |
| @Test |
| public void testRegionsLocalDataSet() { |
| String partitionedRegionName = testName.getMethodName() + "_PARTITIONED_REGION"; |
| |
| String[] values1 = new String[] {"val1", "val2", "val3"}; |
| String[] values2 = new String[] {"val4", "val5", "val6"}; |
| |
| memberVMs[0].invoke(testName.getMethodName() + " Create Region", () -> { |
| PartitionAttributesFactory<Date, Object> partitionAttributesFactory = |
| new PartitionAttributesFactory<>(); |
| partitionAttributesFactory.setRedundantCopies(2).setTotalNumBuckets(12); |
| |
| List<FixedPartitionAttributes> fixedPartitionAttributesList = createFixedPartitionList(1); |
| for (FixedPartitionAttributes fixedPartitionAttributes : fixedPartitionAttributesList) { |
| partitionAttributesFactory.addFixedPartitionAttributes(fixedPartitionAttributes); |
| } |
| partitionAttributesFactory.setPartitionResolver(new SingleHopQuarterPartitionResolver()); |
| |
| RegionFactory<Date, Object> regionFactory = |
| managementTestRule.getCache().<Date, Object>createRegionFactory(RegionShortcut.PARTITION) |
| .setPartitionAttributes(partitionAttributesFactory.create()); |
| Region<Date, Object> region = regionFactory.create(partitionedRegionName); |
| |
| for (int i = 0; i < values1.length; i++) { |
| region.put(getDate(2013, 1, i + 5), values1[i]); |
| } |
| }); |
| |
| memberVMs[1].invoke(testName.getMethodName() + " Create Region", () -> { |
| PartitionAttributesFactory<Date, Object> partitionAttributesFactory = |
| new PartitionAttributesFactory<>(); |
| partitionAttributesFactory.setRedundantCopies(2).setTotalNumBuckets(12); |
| |
| List<FixedPartitionAttributes> fixedPartitionAttributesList = createFixedPartitionList(2); |
| for (FixedPartitionAttributes fixedPartitionAttributes : fixedPartitionAttributesList) { |
| partitionAttributesFactory.addFixedPartitionAttributes(fixedPartitionAttributes); |
| } |
| partitionAttributesFactory.setPartitionResolver(new SingleHopQuarterPartitionResolver()); |
| |
| RegionFactory<Date, Object> regionFactory = |
| managementTestRule.getCache().<Date, Object>createRegionFactory(RegionShortcut.PARTITION) |
| .setPartitionAttributes(partitionAttributesFactory.create()); |
| Region<Date, Object> region = regionFactory.create(partitionedRegionName); |
| |
| for (int i = 0; i < values2.length; i++) { |
| region.put(getDate(2013, 5, i + 5), values2[i]); |
| } |
| }); |
| |
| memberVMs[2].invoke(testName.getMethodName() + " Create Region", () -> { |
| PartitionAttributesFactory<Date, Object> partitionAttributesFactory = |
| new PartitionAttributesFactory<>(); |
| partitionAttributesFactory.setRedundantCopies(2).setTotalNumBuckets(12); |
| |
| List<FixedPartitionAttributes> fixedPartitionAttributesList = createFixedPartitionList(3); |
| fixedPartitionAttributesList.forEach(partitionAttributesFactory::addFixedPartitionAttributes); |
| partitionAttributesFactory.setPartitionResolver(new SingleHopQuarterPartitionResolver()); |
| |
| RegionFactory<Date, Object> regionFactory = |
| managementTestRule.getCache().<Date, Object>createRegionFactory(RegionShortcut.PARTITION) |
| .setPartitionAttributes(partitionAttributesFactory.create()); |
| regionFactory.create(partitionedRegionName); |
| }); |
| |
| List<String> member1RealData = |
| memberVMs[0].invoke(() -> getLocalDataSet(partitionedRegionName)); |
| List<String> member2RealData = |
| memberVMs[1].invoke(() -> getLocalDataSet(partitionedRegionName)); |
| List<String> member3RealData = |
| memberVMs[2].invoke(() -> getLocalDataSet(partitionedRegionName)); |
| |
| managerVM.invoke(testName.getMethodName(), () -> { |
| DistributedSystemMXBean distributedSystemMXBean = |
| managementTestRule.getSystemManagementService().getDistributedSystemMXBean(); |
| DistributedRegionMXBean distributedRegionMXBean = |
| awaitDistributedRegionMXBean(SEPARATOR + partitionedRegionName, 3); |
| |
| String alias = "Waiting for all entries to get reflected at managing node"; |
| int expectedEntryCount = values1.length + values2.length; |
| await(alias) |
| .untilAsserted(() -> assertThat(distributedRegionMXBean.getSystemRegionEntryCount()) |
| .isEqualTo(expectedEntryCount)); |
| |
| String query = "Select * from " + SEPARATOR + partitionedRegionName; |
| |
| String member1Result = distributedSystemMXBean.queryData(query, member1.getId(), 0); |
| verifyJsonIsValid(member1Result); |
| |
| String member2Result = distributedSystemMXBean.queryData(query, member2.getId(), 0); |
| verifyJsonIsValid(member2Result); |
| |
| String member3Result = distributedSystemMXBean.queryData(query, member3.getId(), 0); |
| verifyJsonIsValid(member3Result); |
| |
| for (String val : member1RealData) { |
| assertThat(member1Result).contains(val); |
| } |
| |
| for (String val : member2RealData) { |
| assertThat(member2Result).contains(val); |
| } |
| |
| assertThat(member3Result).contains("No Data Found"); |
| }); |
| } |
| |
| private Date getDate(final int year, final int month, final int date) { |
| Calendar calendar = Calendar.getInstance(); |
| calendar.set(year, month, date); |
| return calendar.getTime(); |
| } |
| |
| private void verifyJsonIsValid(final String jsonString) { |
| assertThat(jsonString, isJson()); |
| assertThat(jsonString, hasJsonPath("$.result")); |
| } |
| |
| private void putDataInRegion(final String regionName, final Object[] portfolio, final int from, |
| final int to) { |
| Region<Integer, Object> region = managementTestRule.getCache().getRegion(regionName); |
| for (int i = from; i < to; i++) { |
| region.put(i, portfolio[i]); |
| } |
| } |
| |
| private void generateValuesInRegions() { |
| int COUNT_DESTINATION = 30; |
| int COUNT_FROM = 0; |
| |
| // Create common Portfolios and NewPortfolios |
| Portfolio[] portfolio = createPortfoliosAndPositions(COUNT_DESTINATION); |
| |
| // Fill local region |
| memberVMs[0] |
| .invoke(() -> putDataInRegion(LOCAL_REGION_NAME, portfolio, COUNT_FROM, COUNT_DESTINATION)); |
| |
| // Fill replicated region |
| memberVMs[0].invoke( |
| () -> putDataInRegion(REPLICATE_REGION_NAME1, portfolio, COUNT_FROM, COUNT_DESTINATION)); |
| memberVMs[1].invoke( |
| () -> putDataInRegion(REPLICATE_REGION_NAME2, portfolio, COUNT_FROM, COUNT_DESTINATION)); |
| |
| // Fill Partition Region |
| memberVMs[0].invoke( |
| () -> putDataInRegion(PARTITIONED_REGION_NAME1, portfolio, COUNT_FROM, COUNT_DESTINATION)); |
| memberVMs[0].invoke( |
| () -> putDataInRegion(PARTITIONED_REGION_NAME2, portfolio, COUNT_FROM, COUNT_DESTINATION)); |
| memberVMs[0].invoke( |
| () -> putDataInRegion(PARTITIONED_REGION_NAME3, portfolio, COUNT_FROM, COUNT_DESTINATION)); |
| memberVMs[0].invoke( |
| () -> putDataInRegion(PARTITIONED_REGION_NAME4, portfolio, COUNT_FROM, COUNT_DESTINATION)); |
| memberVMs[0].invoke( |
| () -> putDataInRegion(PARTITIONED_REGION_NAME5, portfolio, COUNT_FROM, COUNT_DESTINATION)); |
| |
| memberVMs[0].invoke(() -> putPdxInstances(REPLICATE_REGION_NAME3)); |
| } |
| |
| private void putPdxInstances(final String regionName) throws CacheException { |
| InternalCache cache = managementTestRule.getCache(); |
| Region<String, PdxInstance> region = cache.getRegion(regionName); |
| |
| PdxInstanceFactory pdxInstanceFactory = |
| PdxInstanceFactoryImpl.newCreator("Portfolio", false, cache); |
| pdxInstanceFactory.writeInt("ID", 111); |
| pdxInstanceFactory.writeString("status", "active"); |
| pdxInstanceFactory.writeString("secId", "IBM"); |
| PdxInstance pdxInstance = pdxInstanceFactory.create(); |
| region.put("IBM", pdxInstance); |
| |
| pdxInstanceFactory = PdxInstanceFactoryImpl.newCreator("Portfolio", false, cache); |
| pdxInstanceFactory.writeInt("ID", 222); |
| pdxInstanceFactory.writeString("status", "inactive"); |
| pdxInstanceFactory.writeString("secId", "YHOO"); |
| pdxInstance = pdxInstanceFactory.create(); |
| region.put("YHOO", pdxInstance); |
| |
| pdxInstanceFactory = PdxInstanceFactoryImpl.newCreator("Portfolio", false, cache); |
| pdxInstanceFactory.writeInt("ID", 333); |
| pdxInstanceFactory.writeString("status", "active"); |
| pdxInstanceFactory.writeString("secId", "GOOGL"); |
| pdxInstance = pdxInstanceFactory.create(); |
| region.put("GOOGL", pdxInstance); |
| |
| pdxInstanceFactory = PdxInstanceFactoryImpl.newCreator("Portfolio", false, cache); |
| pdxInstanceFactory.writeInt("ID", 111); |
| pdxInstanceFactory.writeString("status", "inactive"); |
| pdxInstanceFactory.writeString("secId", "VMW"); |
| pdxInstance = pdxInstanceFactory.create(); |
| region.put("VMW", pdxInstance); |
| } |
| |
| private void putBigInstances(final String regionName) { |
| Region<String, List<String>> region = managementTestRule.getCache().getRegion(regionName); |
| |
| for (int i = 0; i < 1200; i++) { |
| List<String> bigCollection = new ArrayList<>(); |
| for (int j = 0; j < 200; j++) { |
| bigCollection.add(BIG_COLLECTION_ELEMENT_ + j); |
| } |
| region.put(BIG_COLLECTION_ + i, bigCollection); |
| } |
| } |
| |
| private void createLocalRegion() { |
| managementTestRule.getCache().createRegionFactory(RegionShortcut.LOCAL) |
| .create(LOCAL_REGION_NAME); |
| } |
| |
| private void createReplicatedRegion() { |
| managementTestRule.getCache().createRegionFactory(RegionShortcut.REPLICATE) |
| .create(REPLICATE_REGION_NAME1); |
| } |
| |
| private void createColocatedPR() { |
| PartitionResolver<Integer, Object> testKeyBasedResolver = new TestPartitionResolver(); |
| managementTestRule.getCache().createRegionFactory(RegionShortcut.PARTITION) |
| .setPartitionAttributes( |
| new PartitionAttributesFactory<Integer, Object>().setTotalNumBuckets(NUM_OF_BUCKETS) |
| .setPartitionResolver(testKeyBasedResolver).create()) |
| .create(PARTITIONED_REGION_NAME1); |
| managementTestRule.getCache().createRegionFactory(RegionShortcut.PARTITION) |
| .setPartitionAttributes( |
| new PartitionAttributesFactory<Integer, Object>().setTotalNumBuckets(NUM_OF_BUCKETS) |
| .setPartitionResolver(testKeyBasedResolver) |
| .setColocatedWith(PARTITIONED_REGION_NAME1) |
| .create()) |
| .create(PARTITIONED_REGION_NAME2); |
| managementTestRule.getCache().createRegionFactory(RegionShortcut.PARTITION) |
| .setPartitionAttributes( |
| new PartitionAttributesFactory<Integer, Object>().setTotalNumBuckets(NUM_OF_BUCKETS) |
| .setPartitionResolver(testKeyBasedResolver) |
| .setColocatedWith(PARTITIONED_REGION_NAME2) |
| .create()) |
| .create(PARTITIONED_REGION_NAME3); |
| managementTestRule.getCache().createRegionFactory(RegionShortcut.PARTITION) |
| .setPartitionAttributes( |
| new PartitionAttributesFactory<Integer, Object>().setTotalNumBuckets(NUM_OF_BUCKETS) |
| .setPartitionResolver(testKeyBasedResolver).create()) |
| .create(PARTITIONED_REGION_NAME4); // not collocated |
| managementTestRule.getCache().createRegionFactory(RegionShortcut.PARTITION) |
| .setPartitionAttributes( |
| new PartitionAttributesFactory<Integer, Object>().setTotalNumBuckets(NUM_OF_BUCKETS) |
| .setPartitionResolver(testKeyBasedResolver) |
| .setColocatedWith(PARTITIONED_REGION_NAME4) |
| .create()) |
| .create(PARTITIONED_REGION_NAME5); // collocated with 4 |
| } |
| |
| private void createDistributedRegion(final String regionName) { |
| managementTestRule.getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName); |
| } |
| |
| private void createRegionsInNodes() { |
| // Create local Region on servers |
| memberVMs[0].invoke(this::createLocalRegion); |
| |
| // Create ReplicatedRegion on servers |
| memberVMs[0].invoke(this::createReplicatedRegion); |
| memberVMs[1].invoke(this::createReplicatedRegion); |
| memberVMs[2].invoke(this::createReplicatedRegion); |
| |
| memberVMs[1].invoke(() -> createDistributedRegion(REPLICATE_REGION_NAME2)); |
| memberVMs[0].invoke(() -> createDistributedRegion(REPLICATE_REGION_NAME3)); |
| memberVMs[0].invoke(() -> createDistributedRegion(REPLICATE_REGION_NAME4)); |
| |
| // Create two co-located PartitionedRegions On Servers. |
| memberVMs[0].invoke(this::createColocatedPR); |
| memberVMs[1].invoke(this::createColocatedPR); |
| memberVMs[2].invoke(this::createColocatedPR); |
| |
| managerVM.invoke("Wait for all Region Proxies to get replicated", () -> { |
| awaitDistributedRegionMXBean(SEPARATOR + PARTITIONED_REGION_NAME1, 3); |
| awaitDistributedRegionMXBean(SEPARATOR + PARTITIONED_REGION_NAME2, 3); |
| awaitDistributedRegionMXBean(SEPARATOR + PARTITIONED_REGION_NAME3, 3); |
| awaitDistributedRegionMXBean(SEPARATOR + PARTITIONED_REGION_NAME4, 3); |
| awaitDistributedRegionMXBean(SEPARATOR + PARTITIONED_REGION_NAME5, 3); |
| awaitDistributedRegionMXBean(SEPARATOR + REPLICATE_REGION_NAME1, 3); |
| awaitDistributedRegionMXBean(SEPARATOR + REPLICATE_REGION_NAME2, 1); |
| awaitDistributedRegionMXBean(SEPARATOR + REPLICATE_REGION_NAME3, 1); |
| awaitDistributedRegionMXBean(SEPARATOR + REPLICATE_REGION_NAME4, 1); |
| }); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private List<String> getLocalDataSet(final String region) { |
| PartitionedRegion partitionedRegion = |
| PartitionedRegionHelper.getPartitionedRegion(region, managementTestRule.getCache()); |
| assertThat(partitionedRegion).isNotNull(); |
| |
| Set<BucketRegion> localPrimaryBucketRegions = |
| partitionedRegion.getDataStore().getAllLocalPrimaryBucketRegions(); |
| |
| List<String> allPrimaryValues = new ArrayList<>(); |
| |
| for (BucketRegion bucketRegion : localPrimaryBucketRegions) { |
| for (Object value : bucketRegion.values()) { |
| allPrimaryValues.add((String) value); |
| } |
| } |
| |
| return allPrimaryValues; |
| } |
| |
| private List<FixedPartitionAttributes> createFixedPartitionList(final int primaryIndex) { |
| List<FixedPartitionAttributes> fixedPartitionAttributesList = new ArrayList<>(); |
| if (primaryIndex == 1) { |
| fixedPartitionAttributesList.add(createFixedPartition("Q1", true, 3)); |
| fixedPartitionAttributesList.add(createFixedPartition("Q2", 3)); |
| fixedPartitionAttributesList.add(createFixedPartition("Q3", 3)); |
| } |
| if (primaryIndex == 2) { |
| fixedPartitionAttributesList.add(createFixedPartition("Q1", 3)); |
| fixedPartitionAttributesList.add(createFixedPartition("Q2", true, 3)); |
| fixedPartitionAttributesList.add(createFixedPartition("Q3", 3)); |
| } |
| if (primaryIndex == 3) { |
| fixedPartitionAttributesList.add(createFixedPartition("Q1", 3)); |
| fixedPartitionAttributesList.add(createFixedPartition("Q2", 3)); |
| fixedPartitionAttributesList.add(createFixedPartition("Q3", true, 3)); |
| } |
| return fixedPartitionAttributesList; |
| } |
| |
| private MemberMXBean awaitMemberMXBeanProxy(final DistributedMember member) { |
| SystemManagementService service = managementTestRule.getSystemManagementService(); |
| ObjectName objectName = service.getMemberMBeanName(member); |
| String alias = "awaiting MemberMXBean proxy for " + member; |
| |
| await(alias) |
| .untilAsserted( |
| () -> assertThat(service.getMBeanProxy(objectName, MemberMXBean.class)).isNotNull()); |
| |
| return service.getMBeanProxy(objectName, MemberMXBean.class); |
| } |
| |
| private DistributedSystemMXBean awaitDistributedSystemMXBean() { |
| SystemManagementService service = managementTestRule.getSystemManagementService(); |
| |
| await().untilAsserted(() -> assertThat(service.getDistributedSystemMXBean()).isNotNull()); |
| |
| return service.getDistributedSystemMXBean(); |
| } |
| |
| private DistributedRegionMXBean awaitDistributedRegionMXBean(final String name) { |
| SystemManagementService service = managementTestRule.getSystemManagementService(); |
| |
| await().untilAsserted(() -> assertThat(service.getDistributedRegionMXBean(name)).isNotNull()); |
| |
| return service.getDistributedRegionMXBean(name); |
| } |
| |
| private DistributedRegionMXBean awaitDistributedRegionMXBean(final String name, |
| final int memberCount) { |
| SystemManagementService service = managementTestRule.getSystemManagementService(); |
| |
| await().untilAsserted(() -> assertThat(service.getDistributedRegionMXBean(name)).isNotNull()); |
| await() |
| .untilAsserted(() -> assertThat(service.getDistributedRegionMXBean(name).getMemberCount()) |
| .isEqualTo(memberCount)); |
| |
| return service.getDistributedRegionMXBean(name); |
| } |
| |
| private static class TestPartitionResolver implements PartitionResolver<Integer, Object> { |
| |
| @Override |
| public void close() {} |
| |
| @Override |
| public Serializable getRoutingObject(EntryOperation opDetails) { |
| return ((Integer) opDetails.getKey() % NUM_OF_BUCKETS); |
| } |
| |
| @Override |
| public String getName() { |
| return getClass().getName(); |
| } |
| } |
| } |