/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.concurrent.TimeoutException;
import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.Statistics;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.control.RebalanceOperation;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
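
/**
 * Distributed tests for {@link PartitionedRegionStats}: operation counters
 * (puts/gets/invalidates/etc.), data store entry counts in the presence of tombstones
 * and tombstone GC, total bucket counts, and redundant-copy statistics.
 */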
@SuppressWarnings("serial")
public class PartitionedRegionStatsDistributedTest implements Serializable {
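  // Number of keys each bulk operation touches, followed by integer codes that select
  // which Region operation doOpsOnRegion() performs.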
private static final int CNT = 10;
private static final int PUT = 1;
private static final int GET = 2;
private static final int CONTAINS_KEY = 3;
private static final int CONTAINS_VALUE_FOR_KEY = 4;
private static final int INVALIDATE = 5;
private static final int DESTROY = 6;
private static final int CREATE = 7;
private static final int GET_ENTRY = 8;
private static final int DEFAULT_NUM_BUCKETS = 113;
public static final String STAT_CONFIGURED_REDUNDANT_COPIES = "configuredRedundantCopies";
public static final String STAT_ACTUAL_REDUNDANT_COPIES = "actualRedundantCopies";
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
private final String regionName = "region";
@Test
public void whenOperationsAreDoneOnEntriesThenStatsMustBeCorrect() {
MemberVM locator1 = clusterStartupRule.startLocatorVM(0);
int locator1Port = locator1.getPort();
MemberVM server1 =
clusterStartupRule.startServerVM(2,
s -> s.withConnectionToLocator(locator1Port));
MemberVM server2 =
clusterStartupRule.startServerVM(3,
s -> s.withConnectionToLocator(locator1Port));
MemberVM server3 =
clusterStartupRule.startServerVM(4,
s -> s.withConnectionToLocator(locator1Port));
MemberVM server4 =
clusterStartupRule.startServerVM(5,
s -> s.withConnectionToLocator(locator1Port));
    // Create PRs on 3 VMs and an accessor (localMaxMemory = 0) on the 4th VM
server1.invoke(() -> createPartitionedRegion(regionName, 200, 1, 5));
server2.invoke(() -> createPartitionedRegion(regionName, 200, 1, 5));
server3.invoke(() -> createPartitionedRegion(regionName, 200, 1, 5));
server4.invoke(() -> createPartitionedRegion(regionName, 0, 1, 5));
// Do Region operations.
server1.invoke(() -> doOpsOnRegion(regionName, PUT));
server1.invoke(() -> doOpsOnRegion(regionName, GET));
server1.invoke(() -> doOpsOnRegion(regionName, GET_ENTRY));
server1.invoke(() -> doOpsOnRegion(regionName, CONTAINS_KEY));
server1.invoke(() -> doOpsOnRegion(regionName, CONTAINS_VALUE_FOR_KEY));
server1.invoke(() -> doOpsOnRegion(regionName, INVALIDATE));
server1.invoke(() -> doOpsOnRegion(regionName, DESTROY));
server1.invoke(() -> validatePartitionedRegionOpsStats(regionName));
server1.invoke(() -> validateRedundantCopiesStats(regionName));
}
  /**
   * Test to ensure that tombstone entries are not counted as part of the region entry count
   */
@Test
public void whenEntryIsDestroyedThenTombstoneShouldNotBePartOfEntries() {
MemberVM locator = clusterStartupRule.startLocatorVM(0);
int locatorPort = locator.getPort();
MemberVM server1 =
clusterStartupRule.startServerVM(1, s -> s.withConnectionToLocator(locatorPort));
MemberVM server2 =
clusterStartupRule.startServerVM(2, s -> s.withConnectionToLocator(locatorPort));
server1.invoke(() -> createPartitionedRegionWithRebalance(regionName, 0));
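    // Put four entries, then destroy two of them. With the default 113 buckets, keys 113
    // and 114 presumably share buckets with keys 0 and 1 (Integer.hashCode() modulo the
    // bucket count), so the destroys leave tombstones alongside the surviving entries.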
server1.invoke(() -> {
Cache cache = ClusterStartupRule.getCache();
Region<Integer, Integer> region = cache.getRegion(regionName);
region.put(0, 0);
region.put(1, 1);
region.put(113, 113);
region.put(114, 114);
region.destroy(0);
region.destroy(1);
});
server2.invoke(() -> createPartitionedRegionWithRebalance(regionName, 0));
server1.invoke(() -> validateEntryCount(regionName, 1));
server2.invoke(() -> validateEntryCount(regionName, 1));
}
@Test
public void whenRegionPutsAreDoneAndRebalanceCompletedThenStatsShouldBeCorrect() {
MemberVM locator = clusterStartupRule.startLocatorVM(0);
int locatorPort = locator.getPort();
MemberVM server1 =
clusterStartupRule.startServerVM(1, s -> s.withConnectionToLocator(locatorPort));
MemberVM server2 =
clusterStartupRule.startServerVM(2, s -> s.withConnectionToLocator(locatorPort));
server1.invoke(() -> createPartitionedRegionWithRebalance(regionName, 0));
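    // Put six entries (keys 0..5). After server2 joins and the rebalance completes, the
    // buckets should be split evenly, leaving three entries on each server.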
server1.invoke(() -> {
Cache cache = ClusterStartupRule.getCache();
Region<Integer, Integer> region = cache.getRegion(regionName);
for (int i = 0; i <= 5; i++) {
region.put(i, i);
}
});
server2.invoke(() -> createPartitionedRegionWithRebalance(regionName, 0));
server1.invoke(() -> validateEntryCount(regionName, 3));
server2.invoke(() -> validateEntryCount(regionName, 3));
}
  /**
   * Test to make sure the data store entry count is accurate.
   * Region entry stats are validated before and after entry destroys to ensure
   * that tombstones and tombstone GC do not affect the region entry count.
   */
@Test
public void tombstoneAndGCShouldNotAffectTheEntryCount() {
MemberVM locator = clusterStartupRule.startLocatorVM(0);
int locatorPort = locator.getPort();
MemberVM server0 =
clusterStartupRule.startServerVM(1, s -> s.withConnectionToLocator(locatorPort));
MemberVM server1 =
clusterStartupRule.startServerVM(2, s -> s.withConnectionToLocator(locatorPort));
MemberVM server2 =
clusterStartupRule.startServerVM(3, s -> s.withConnectionToLocator(locatorPort));
server0.invoke(() -> createPartitionedRegionWithRedundantCopies(regionName));
server1.invoke(() -> createPartitionedRegionWithRedundantCopies(regionName));
server0.invoke(() -> putDataInRegion(regionName));
server0.invoke(() -> validateEntryCount(regionName, 4));
server1.invoke(() -> validateEntryCount(regionName, 4));
// Do a destroy
server0.invoke(() -> {
Cache cache = ClusterStartupRule.getCache();
Region<Integer, Integer> region = cache.getRegion(regionName);
region.destroy(0);
});
// We expect the tombstone won't be recorded as part of the entry count
server0.invoke(() -> validateEntryCount(regionName, 3));
server1.invoke(() -> validateEntryCount(regionName, 3));
    // Destroy an entry (leaving a tombstone), then put the same key so the tombstone
    // becomes a live entry again
server0.invoke(() -> {
Cache cache = ClusterStartupRule.getCache();
Region<Integer, Integer> region = cache.getRegion(regionName);
region.destroy(113);
region.put(113, 113);
});
server0.invoke(() -> validateEntryCount(regionName, 3));
server1.invoke(() -> validateEntryCount(regionName, 3));
    // After GII (which might include the tombstone), a new member should still see only 3
    // live entries.
server2.invoke(() -> createPartitionedRegionWithRedundantCopies(regionName));
    // Wait for redundancy to be restored. Once it is, the entry count should be 3
server2.invoke(() -> {
Cache cache = ClusterStartupRule.getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
PartitionedRegionStats prStats = region.getPrStats();
await()
.untilAsserted(() -> assertThat(prStats.getLowRedundancyBucketCount()).isEqualTo(0));
});
server2.invoke(() -> validateEntryCount(regionName, 3));
// A tombstone GC shouldn't affect the count.
server0.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
TombstoneService tombstoneService = cache.getTombstoneService();
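      // Force a batch of tombstones (at least one) to expire immediately; the entry
      // count assertions below should be unaffected.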
tombstoneService.forceBatchExpirationForTests(1);
});
server0.invoke(() -> validateEntryCount(regionName, 3));
server1.invoke(() -> validateEntryCount(regionName, 3));
server2.invoke(() -> validateEntryCount(regionName, 3));
}
@Test
public void testThatServersHaveTheCorrectNumberOfBucketsAfterPuts() {
MemberVM locator = clusterStartupRule.startLocatorVM(0);
int locatorPort = locator.getPort();
MemberVM server0 =
clusterStartupRule.startServerVM(1, s -> s.withConnectionToLocator(locatorPort));
MemberVM server1 =
clusterStartupRule.startServerVM(2, s -> s.withConnectionToLocator(locatorPort));
server0.invoke(() -> createPartitionedRegionWithRedundantCopies(regionName));
server1.invoke(() -> createPartitionedRegionWithRedundantCopies(regionName));
server0.invoke(() -> putDataInRegion(regionName));
server1.invoke(() -> putDataInRegion(regionName));
server0.invoke(() -> validateTotalNumBucketsCount(regionName, DEFAULT_NUM_BUCKETS));
server1.invoke(() -> validateTotalNumBucketsCount(regionName, DEFAULT_NUM_BUCKETS));
}
private void putDataInRegion(final String regionName) {
Cache cache = ClusterStartupRule.getCache();
Region<Integer, Integer> region = cache.getRegion(regionName);
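    // With the default 113 buckets, keys 0, 113, and 226 presumably map to the same
    // bucket (Integer.hashCode() modulo the bucket count), so one bucket holds several
    // entries while key 1 gets a bucket of its own.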
region.put(0, 0);
region.put(1, 1);
region.put(113, 113);
region.put(226, 114);
}
private void createPartitionedRegionWithRedundantCopies(final String regionName) {
PartitionAttributesFactory<Integer, Integer> paf = new PartitionAttributesFactory<>();
paf.setRedundantCopies(2);
RegionFactory<Integer, Integer> regionFactory =
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
regionFactory.setPartitionAttributes(paf.create());
regionFactory.create(regionName);
}
private void createPartitionedRegion(final String regionName, final int localMaxMemory,
final int redundancy, int totalNumBuckets) {
PartitionAttributesFactory<Integer, Integer> paf = new PartitionAttributesFactory<>();
paf.setLocalMaxMemory(localMaxMemory);
paf.setRedundantCopies(redundancy);
paf.setTotalNumBuckets(totalNumBuckets);
RegionFactory<Integer, Integer> regionFactory =
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
regionFactory.setPartitionAttributes(paf.create());
regionFactory.create(regionName);
}
private void createPartitionedRegionWithRebalance(final String regionName, int redundancy)
throws InterruptedException, TimeoutException {
PartitionAttributesFactory<Integer, Integer> paf = new PartitionAttributesFactory<>();
paf.setRedundantCopies(redundancy);
RegionFactory<Integer, Integer> regionFactory =
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
regionFactory.setPartitionAttributes(paf.create());
regionFactory.create(regionName);
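    // Start a rebalance and block until it finishes (up to 2 minutes) so bucket
    // placement is settled before callers assert on per-member entry counts.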
RebalanceOperation op =
ClusterStartupRule.getCache().getResourceManager().createRebalanceFactory().start();
op.getResults(2, MINUTES);
}
private void validateEntryCount(final String regionName, final int expectedCount) {
Cache cache = ClusterStartupRule.getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
PartitionedRegionStats stats = region.getPrStats();
CachePerfStats cachePerfStats = region.getCachePerfStats();
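    // The PR's data store entry count and the region's CachePerfStats entryCount stat
    // should both report the expected number of live (non-tombstone) entries.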
assertThat(stats.getDataStoreEntryCount()).isEqualTo(expectedCount);
long actualCount = cachePerfStats.stats.getLong(CachePerfStats.entryCountId);
assertThat(actualCount).isEqualTo(expectedCount);
}
private void validateTotalNumBucketsCount(final String regionName, final int expectedCount) {
Cache cache = ClusterStartupRule.getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
PartitionedRegionStats stats = region.getPrStats();
assertThat(stats.getTotalNumBuckets()).isEqualTo(expectedCount);
}
private void doOpsOnRegion(final String regionName, final int opType) {
doOpsOnRegion(ClusterStartupRule.getCache().getRegion(regionName), opType);
}
private void doOpsOnRegion(final Region<Integer, Integer> region, final int opType) {
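    // Apply the selected operation to keys 0..CNT-1.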
switch (opType) {
case PUT:
for (int k = 0; k < CNT; k++) {
region.put(k, k);
}
break;
case GET:
for (int k = 0; k < CNT; k++) {
region.get(k);
}
break;
case CONTAINS_KEY:
for (int k = 0; k < CNT; k++) {
region.containsKey(k);
}
break;
case CONTAINS_VALUE_FOR_KEY:
for (int k = 0; k < CNT; k++) {
region.containsValueForKey(k);
}
break;
case INVALIDATE:
for (int k = 0; k < CNT; k++) {
region.invalidate(k);
}
break;
case DESTROY:
for (int k = 0; k < CNT; k++) {
region.destroy(k);
}
break;
case CREATE:
for (int k = 0; k < CNT; k++) {
region.create(k, k);
}
break;
      case GET_ENTRY:
        for (int k = 0; k < CNT; k++) {
          region.getEntry(k);
        }
        break;
    }
}
private void validatePartitionedRegionOpsStats(final String regionName) {
Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(regionName);
assertThat(region).isNotNull();
Statistics stats = ((PartitionedRegion) region).getPrStats().getStats();
int putsCompleted = stats.get("putsCompleted").intValue();
int getsCompleted = stats.get("getsCompleted").intValue();
int getEntryCompleted = stats.get("getEntryCompleted").intValue();
int createsCompleted = stats.get("createsCompleted").intValue();
int containsKeyCompleted = stats.get("containsKeyCompleted").intValue();
int containsValueForKeyCompleted = stats.get("containsValueForKeyCompleted").intValue();
int invalidatesCompleted = stats.get("invalidatesCompleted").intValue();
int destroysCompleted = stats.get("destroysCompleted").intValue();
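    // Each operation type was run CNT times by doOpsOnRegion(), except CREATE, which the
    // test never invokes, so createsCompleted should remain zero.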
assertThat(putsCompleted).isEqualTo(CNT);
assertThat(getsCompleted).isEqualTo(CNT);
assertThat(getEntryCompleted).isEqualTo(CNT);
assertThat(createsCompleted).isEqualTo(0);
assertThat(containsKeyCompleted).isEqualTo(CNT);
assertThat(containsValueForKeyCompleted).isEqualTo(CNT);
assertThat(invalidatesCompleted).isEqualTo(CNT);
assertThat(destroysCompleted).isEqualTo(CNT);
// TODO: Need to validate that bucket stats add up....
// partitionedRegion.getDataStore().getCachePerfStats().getGets(); // etc...
}
private void validateRedundantCopiesStats(final String regionName) {
Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(regionName);
assertThat(region).isNotNull();
Statistics stats = ((PartitionedRegion) region).getPrStats().getStats();
int configuredRedundantCopies = stats.get(STAT_CONFIGURED_REDUNDANT_COPIES).intValue();
int actualRedundantCopies = stats.get(STAT_ACTUAL_REDUNDANT_COPIES).intValue();
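    // The region was created with redundantCopies = 1 and enough data stores are
    // running, so the actual redundancy should match the configured value.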
assertThat(configuredRedundantCopies).isEqualTo(1);
assertThat(actualRedundantCopies).isEqualTo(1);
}
}