blob: 1887bc4e5c9cd8fe6b8cdeb9b51bb6135d6e562f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.internal.ClientMetadataService;
import org.apache.geode.cache.client.internal.ClientPartitionAdvisor;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.BucketServerLocation66;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
/**
* This class tests single-hop bulk operations in client caches. Single-hop makes use
* of metadata concerning partitioned region bucket locations to find primary buckets
* on which to operate. If the metadata is incorrect it forces scheduling of a refresh.
* A total count of all refresh requests is kept in the metadata service and is used
* by this test to verify whether the cache thought the metadata was correct or not.
*/
public class SingleHopGetAllPutAllDUnitTest extends PRClientServerTestBase {
private static final long serialVersionUID = 3873751456134028508L;
public SingleHopGetAllPutAllDUnitTest() {
super();
}
@Before
public void createScenario() {
ArrayList commonAttributes =
createCommonServerAttributes("TestPartitionedRegion", null, 2, null);
createClientServerScenarioSingleHop(commonAttributes, 20, 20, 20);
}
/**
* populate the region, do a getAll, verify that metadata is fetched
* do another getAll and verify that metadata did not need to be refetched
*/
@Test
public void testGetAllInClient() {
client.invoke("testGetAllInClient", () -> {
Region<Integer, Object> region = cache.getRegion(PartitionedRegionName);
assertThat(region).isNotNull();
final List<Object> testValueList = new ArrayList<>();
final List<Integer> testKeyList = new ArrayList<>();
for (int i = (totalNumBuckets * 3); i > 0; i--) {
testValueList.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
int j = 0;
Map<Integer, Object> origVals = new HashMap<>();
for (Object o : testValueList) {
testKeyList.add(j);
Integer key = j++;
origVals.put(key, o);
region.put(key, o);
}
// check if the client meta-data is in synch
verifyMetadata();
long metadataRefreshes =
((GemFireCacheImpl) cache).getClientMetadataService()
.getTotalRefreshTaskCount_TEST_ONLY();
Map<Integer, Object> resultMap = region.getAll(testKeyList);
assertThat(resultMap).isEqualTo(origVals);
// a new refresh should not have been triggered
assertThat(((GemFireCacheImpl) cache).getClientMetadataService()
.getTotalRefreshTaskCount_TEST_ONLY())
.isEqualTo(metadataRefreshes);
});
}
/**
* perform a putAll and ensure that metadata is fetched. Then do another
* putAll and ensure that metadata did not need to be refreshed
*/
@Test
public void testPutAllInClient() {
client.invoke("testPutAllInClient", () -> {
Region<String, String> region = cache.getRegion(PartitionedRegionName);
assertThat(region).isNotNull();
Map<String, String> keysValuesMap = new HashMap<>();
List<String> testKeysList = new ArrayList<>();
for (int i = (totalNumBuckets * 3); i > 0; i--) {
testKeysList.add("putAllKey-" + i);
keysValuesMap.put("putAllKey-" + i, "values-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
region.putAll(keysValuesMap);
verifyMetadata();
long metadataRefreshes =
((GemFireCacheImpl) cache).getClientMetadataService()
.getTotalRefreshTaskCount_TEST_ONLY();
region.putAll(keysValuesMap);
// a new refresh should not have been triggered
assertThat(((GemFireCacheImpl) cache).getClientMetadataService()
.getTotalRefreshTaskCount_TEST_ONLY())
.isEqualTo(metadataRefreshes);
});
}
/**
* Do a putAll and ensure that metadata has been fetched. Then do a removeAll and
* ensure that metadata did not need to be refreshed. Finally do a getAll to ensure
* that the removeAll did its job.
*/
@Test
public void testRemoveAllInClient() {
client.invoke("testRemoveAllInClient", () -> {
Region<String, String> region = cache.getRegion(PartitionedRegionName);
assertThat(region).isNotNull();
Map<String, String> keysValuesMap = new HashMap<>();
List<String> testKeysList = new ArrayList<>();
for (int i = (totalNumBuckets * 3); i > 0; i--) {
testKeysList.add("putAllKey-" + i);
keysValuesMap.put("putAllKey-" + i, "values-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
region.putAll(keysValuesMap);
verifyMetadata();
long metadataRefreshes =
((GemFireCacheImpl) cache).getClientMetadataService()
.getTotalRefreshTaskCount_TEST_ONLY();
region.removeAll(testKeysList);
// a new refresh should not have been triggered
assertThat(((GemFireCacheImpl) cache).getClientMetadataService()
.getTotalRefreshTaskCount_TEST_ONLY())
.isEqualTo(metadataRefreshes);
HashMap<String, Object> noValueMap = new HashMap<>();
for (String key : testKeysList) {
noValueMap.put(key, null);
}
assertThat(noValueMap).isEqualTo(region.getAll(testKeysList));
assertThat(((GemFireCacheImpl) cache).getClientMetadataService()
.getTotalRefreshTaskCount_TEST_ONLY())
.isEqualTo(metadataRefreshes);
});
}
/**
* If a client doesn't know the primary location of a bucket it should perform a
* metadata refresh. This test purposefully removes all primary location knowledge
* from PR metadata in a client cache and then performs a bulk operation. This
* should trigger a refresh.
*/
@Test
public void testBulkOpInClientWithBadMetadataCausesRefresh() {
client.invoke("testBulkOpInClientWithBadMetadataCausesRefresh", () -> {
Region<Integer, Object> region = cache.getRegion(PartitionedRegionName);
assertThat(region).isNotNull();
final List<Object> testValueList = new ArrayList<>();
final List<Integer> testKeyList = new ArrayList<>();
for (int i = (totalNumBuckets * 3); i > 0; i--) {
testValueList.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
int j = 0;
Map<Integer, Object> origVals = new HashMap<>();
for (Object o : testValueList) {
testKeyList.add(j);
Integer key = j++;
origVals.put(key, o);
region.put(key, o);
}
// check if the client meta-data is in synch
verifyMetadata();
long metadataRefreshes =
((GemFireCacheImpl) cache).getClientMetadataService()
.getTotalRefreshTaskCount_TEST_ONLY();
removePrimaryMetadata();
Map<Integer, Object> resultMap = region.getAll(testKeyList);
assertThat(resultMap).isEqualTo(origVals);
// a new refresh should have been triggered
assertThat(((GemFireCacheImpl) cache).getClientMetadataService()
.getTotalRefreshTaskCount_TEST_ONLY())
.isNotEqualTo(metadataRefreshes);
});
}
private void verifyMetadata() {
Region region = cache.getRegion(PartitionedRegionName);
ClientMetadataService cms = ((GemFireCacheImpl) cache).getClientMetadataService();
cms.getClientPRMetadata((LocalRegion) region);
await().until(cms::isMetadataStable);
await().until(() -> cms.getClientPRMetadata_TEST_ONLY().size() > 0);
final Map<String, ClientPartitionAdvisor> regionMetaData = cms.getClientPRMetadata_TEST_ONLY();
assertThat(regionMetaData).containsKey(region.getFullPath());
await().until(() -> {
ClientPartitionAdvisor prMetaData = regionMetaData.get(region.getFullPath());
assertThat(prMetaData).isNotNull();
assertThat(prMetaData.adviseRandomServerLocation()).isNotNull();
return true;
});
}
private void removePrimaryMetadata() {
Region region = cache.getRegion(PartitionedRegionName);
ClientMetadataService cms = ((GemFireCacheImpl) cache).getClientMetadataService();
cms.getClientPRMetadata((LocalRegion) region);
final Map<String, ClientPartitionAdvisor> regionMetaData = cms.getClientPRMetadata_TEST_ONLY();
final ClientPartitionAdvisor prMetaData = regionMetaData.get(region.getFullPath());
Map<Integer, List<BucketServerLocation66>> bucketLocations =
prMetaData.getBucketServerLocationsMap_TEST_ONLY();
for (Map.Entry<Integer, List<BucketServerLocation66>> locationEntry : bucketLocations
.entrySet()) {
List<BucketServerLocation66> newList = new ArrayList<>(locationEntry.getValue());
for (Iterator<BucketServerLocation66> bucketIterator = newList.iterator(); bucketIterator
.hasNext();) {
BucketServerLocation66 location = bucketIterator.next();
if (location.isPrimary()) {
bucketIterator.remove();
}
bucketLocations.put(locationEntry.getKey(), newList);
}
}
}
}