blob: b0bf9b84f27c53722ddc292a1b8357829505a531 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Set;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.dunit.rules.DistributedRule;
@SuppressWarnings("serial")
public class PRCustomPartitioningDistributedTest implements Serializable {
private static final int TOTAL_NUM_BUCKETS = 7;
private List<Date> listOfKeysInVM1;
private List<Date> listOfKeysInVM2;
private List<Date> listOfKeysInVM3;
private List<Date> listOfKeysInVM4;
private String regionName;
private VM datastoreVM0;
private VM datastoreVM1;
private VM datastoreVM2;
private VM accessorVM3;
@Rule
public DistributedRule distributedRule = new DistributedRule();
@Rule
public CacheRule cacheRule = new CacheRule();
@Before
public void setUp() {
datastoreVM0 = getVM(0);
datastoreVM1 = getVM(1);
datastoreVM2 = getVM(2);
accessorVM3 = getVM(3);
regionName = "PR1";
listOfKeysInVM1 = new ArrayList<>();
listOfKeysInVM2 = new ArrayList<>();
listOfKeysInVM3 = new ArrayList<>();
listOfKeysInVM4 = new ArrayList<>();
}
@Test
public void testPartitionedRegionOperationsCustomPartitioning() {
datastoreVM0.invoke(this::createPartitionedRegionWithPartitionResolver);
datastoreVM1.invoke(this::createPartitionedRegionWithPartitionResolver);
datastoreVM2.invoke(this::createPartitionedRegionWithPartitionResolver);
accessorVM3.invoke(this::createPartitionedRegionAccessorWithPartitionResolver);
datastoreVM0.invoke(() -> doPutOperations(regionName, 2100, listOfKeysInVM1));
datastoreVM1.invoke(() -> doPutOperations(regionName, 2200, listOfKeysInVM2));
datastoreVM2.invoke(() -> doPutOperations(regionName, 2300, listOfKeysInVM3));
accessorVM3.invoke(() -> doPutOperations(regionName, 2400, listOfKeysInVM4));
datastoreVM0.invoke(() -> verifyKeys(regionName, listOfKeysInVM1));
datastoreVM1.invoke(() -> verifyKeys(regionName, listOfKeysInVM2));
datastoreVM2.invoke(() -> verifyKeys(regionName, listOfKeysInVM3));
accessorVM3.invoke(() -> verifyKeysInAccessor(regionName, listOfKeysInVM4));
}
private void doPutOperations(final String regionName, final int century,
final List<Date> listOfKeys) {
Calendar calendar = Calendar.getInstance();
Region<Date, Integer> region = cacheRule.getCache().getRegion(regionName);
for (int month = 0; month <= 11; month++) {
int year = (int) (Math.random() * century);
int date = (int) (Math.random() * 30);
calendar.set(year, month, date);
Date key = calendar.getTime();
listOfKeys.add(key);
region.put(key, month);
}
}
private void verifyKeysInAccessor(final String regionName, final List<Date> listOfKeys) {
PartitionedRegion partitionedRegion =
(PartitionedRegion) cacheRule.getCache().getRegion(regionName);
for (Object key : listOfKeys) {
assertThat(containsKeyInSomeBucket(partitionedRegion, (Date) key)).isTrue();
}
assertThat(partitionedRegion.getDataStore()).isNull();
}
private void verifyKeys(final String regionName, final List<Date> listOfKeys) {
PartitionedRegion partitionedRegion =
(PartitionedRegion) cacheRule.getCache().getRegion(regionName);
for (Date key : listOfKeys) {
assertThat(containsKeyInSomeBucket(partitionedRegion, key)).isTrue();
}
partitionedRegion.getDataStore().visitBuckets((bucketId, region) -> {
Set<Object> bucketKeys = partitionedRegion.getBucketKeys(bucketId);
for (Object key : bucketKeys) {
EntryOperation<Date, Integer> entryOperation =
new EntryOperationImpl(partitionedRegion, null, key, null, null);
PartitionResolver<Date, Integer> partitionResolver =
partitionedRegion.getPartitionResolver();
Object routingObject = partitionResolver.getRoutingObject(entryOperation);
int routingObjectHashCode = routingObject.hashCode() % TOTAL_NUM_BUCKETS;
assertThat(routingObjectHashCode).isEqualTo(bucketId);
}
});
}
private void createPartitionedRegionWithPartitionResolver() {
PartitionAttributesFactory<Date, Integer> paf = new PartitionAttributesFactory<>();
paf.setPartitionResolver(new MonthBasedPartitionResolver());
paf.setRedundantCopies(0);
paf.setTotalNumBuckets(TOTAL_NUM_BUCKETS);
RegionFactory<Date, Integer> regionFactory =
cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION);
regionFactory.setPartitionAttributes(paf.create());
regionFactory.create(regionName);
}
private void createPartitionedRegionAccessorWithPartitionResolver() {
PartitionAttributesFactory<Date, Integer> paf = new PartitionAttributesFactory<>();
paf.setLocalMaxMemory(0);
paf.setPartitionResolver(new MonthBasedPartitionResolver());
paf.setRedundantCopies(0);
paf.setTotalNumBuckets(TOTAL_NUM_BUCKETS);
RegionFactory<Date, Integer> regionFactory =
cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION);
regionFactory.setPartitionAttributes(paf.create());
regionFactory.create(regionName);
}
/**
* Returns true if the key is found in any buckets.
*/
private boolean containsKeyInSomeBucket(PartitionedRegion partitionedRegion, Date key) {
int numberOfBuckets = partitionedRegion.getTotalNumberOfBuckets();
for (int bucketId = 0; bucketId < numberOfBuckets; bucketId++) {
if (partitionedRegion.getBucketKeys(bucketId).contains(key)) {
return true;
}
}
return false;
}
}