// blob: 053f80bd962c6aa4e048e396a7be66bf356ec0ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.logging.log4j.Logger;
import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.partitioned.RemoveBucketMessage;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.rules.ClientVM;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.DistributedBlackboard;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
public class RebalanceWhileCreatingRegionDistributedTest implements Serializable {
// Launches the locator, server and client VMs that make up each test's cluster.
@Rule
public ClusterStartupRule cluster = new ClusterStartupRule();
// Supplies the running test method's name, which the tests use as the region name.
@Rule
public SerializableTestName testName = new SerializableTestName();
// Provides the named gates used to order steps between the message observer and the accessor.
@Rule
public DistributedBlackboard blackboard = new DistributedBlackboard();
private static final Logger logger = LogService.getLogger();
// Gate signalled by the message observer when a RemoveBucketMessage is about to be processed.
public static final String BEFORE_REMOVE_BUCKET_MESSAGE = "Before_RemoveBucketMessage";
// Gate signalled by the accessor once it has created its proxy region.
public static final String AFTER_CREATE_PROXY_REGION = "After_CreateProxyRegion";
/**
 * Runs a rebalance while the accessor is still creating its proxy region and verifies that the
 * accessor can afterwards serve puts and gets for every key.
 *
 * <p>
 * Sequence: two data stores host a three-bucket region and receive three entries; a third data
 * store joins and a rebalance is started from server1; server3's cache is then closed. The
 * accessor creates its proxy region only after the observer has signalled the
 * {@code Before_RemoveBucketMessage} gate. Finally a client connected to the accessor and the
 * accessor itself write and read the region.
 *
 * @throws Exception if any VM invocation or the asynchronous region creation fails
 */
@Test
public void testRebalanceDuringRegionCreation() throws Exception {
  // Init Blackboard
  blackboard.initBlackboard();

  // Start Locator
  MemberVM locator = cluster.startLocatorVM(0);

  // Start servers
  int locatorPort = locator.getPort();
  MemberVM server1 = cluster.startServerVM(1, locatorPort);
  MemberVM server2 = cluster.startServerVM(2, locatorPort);
  MemberVM accessor = cluster.startServerVM(4, locatorPort);

  // Add DistributionMessageObserver
  String regionName = testName.getMethodName();
  Stream.of(server1, server2, accessor)
      .forEach(server -> server.invoke(() -> addDistributionMessageObserver(regionName)));

  // Create regions in each server
  server1.invoke(() -> createRegion(regionName, RegionShortcut.PARTITION));
  server2.invoke(() -> createRegion(regionName, RegionShortcut.PARTITION));

  // Asynchronously wait to create the proxy region in the accessor
  AsyncInvocation<Void> asyncInvocation =
      accessor.invokeAsync(() -> waitToCreateProxyRegion(regionName));

  // Connect client1
  ClientVM client1 =
      cluster.startClientVM(5, c -> c.withServerConnection(server1.getPort(), server2.getPort()));

  // Do puts
  client1.invoke(() -> {
    Region<Integer, Integer> region =
        ClusterStartupRule.clientCacheRule.createProxyRegion(regionName);
    IntStream.range(0, 3).forEach(i -> region.put(i, i));
  });

  // Start server3
  MemberVM server3 = cluster.startServerVM(3, locatorPort);

  // Create region in server3
  server3.invoke(() -> createRegion(regionName, RegionShortcut.PARTITION));

  // Add DistributionMessageObserver to server3
  server3.invoke(() -> addDistributionMessageObserver(regionName));

  // Rebalance
  server1.invoke(() -> ClusterStartupRule.getCache().getResourceManager().createRebalanceFactory()
      .start().getResults());

  // Stop server3
  server3.invoke(() -> ClusterStartupRule.getCache().close());

  // Connect client to accessor
  ClientVM client2 =
      cluster.startClientVM(6, c -> c.withServerConnection(accessor.getPort())
          .withCacheSetup(cf -> cf.setPoolReadTimeout(20000)));

  // Do puts
  client2.invoke(() -> {
    Region<Integer, Integer> region =
        ClusterStartupRule.clientCacheRule.createProxyRegion(regionName);
    IntStream.range(0, 3).forEach(i -> region.put(i, i));
  });

  // The accessor must have finished creating its proxy region before it is used below
  asyncInvocation.get();

  accessor.invoke(() -> {
    // Parameterized (not raw) so the put/get calls below are type-checked
    Region<Integer, Integer> region = ClusterStartupRule.getCache().getRegion(regionName);
    IntStream.range(3, 6).forEach(i -> region.put(i, i));
    assertThat(region.size()).isEqualTo(6);
    IntStream.range(0, 6).forEach(i -> assertThat(region.get(i)).isEqualTo(i));
  });
}
/**
 * Moves the only bucket of a single-bucket region from server1 to server2 while the accessor is
 * creating its proxy region, closes server2's cache, and then checks the bucket profiles the
 * accessor holds for both members.
 *
 * @throws Exception if any VM invocation or the asynchronous region creation fails
 */
@Test
public void testMoveSingleBucketDuringRegionCreation() throws Exception {
  blackboard.initBlackboard();

  // Locator first, then the two data stores and the member that will become the accessor
  MemberVM locator = cluster.startLocatorVM(0);
  int locatorPort = locator.getPort();
  MemberVM server1 = cluster.startServerVM(1, locatorPort);
  MemberVM server2 = cluster.startServerVM(2, locatorPort);
  MemberVM accessor = cluster.startServerVM(3, locatorPort);

  // Install the gate-driving observer on every member
  String regionName = testName.getMethodName();
  Stream.of(server1, server2, accessor)
      .forEach(member -> member.invoke(() -> addDistributionMessageObserver(regionName)));

  // server1 creates the region and, by doing the first put, ends up hosting the primary bucket
  InternalDistributedMember source = server1.invoke(() -> {
    createSingleBucketRegion(regionName, RegionShortcut.PARTITION);
    Region<Integer, Integer> created = ClusterStartupRule.getCache().getRegion(regionName);
    created.put(123, 123);
    PartitionedRegionDataStore dataStore = ((PartitionedRegion) created).getDataStore();
    assertThat(dataStore).isNotNull();
    assertThat(dataStore.getNumberOfPrimaryBucketsManaged()).isEqualTo(1);
    return InternalDistributedSystem.getAnyInstance().getDistributedMember();
  });

  // server2 creates the region but must not host the primary bucket yet
  InternalDistributedMember destination = server2.invoke(() -> {
    createSingleBucketRegion(regionName, RegionShortcut.PARTITION);
    PartitionedRegion created =
        (PartitionedRegion) ClusterStartupRule.getCache().getRegion(regionName);
    PartitionedRegionDataStore dataStore = created.getDataStore();
    assertThat(dataStore).isNotNull();
    assertThat(dataStore.getNumberOfPrimaryBucketsManaged()).isEqualTo(0);
    return InternalDistributedSystem.getAnyInstance().getDistributedMember();
  });

  // The accessor creates its proxy region in the background once the gate opens
  AsyncInvocation<Void> proxyCreation =
      accessor.invokeAsync(() -> waitToCreateSingleBucketProxyRegion(regionName));

  // Simulate a rebalance by moving bucket 0 from server1 to server2, then close server2's cache
  server2.invoke(() -> {
    PartitionedRegion pr =
        (PartitionedRegion) ClusterStartupRule.getCache().getRegion(regionName);
    pr.getDataStore().moveBucket(0, source, true);
    ClusterStartupRule.getCache().close();
  });

  proxyCreation.get();

  // After the move and the close of server2's cache, the accessor's advisor for bucket 0 is
  // expected to hold no profile for either the source or the destination member
  accessor.invoke(() -> {
    PartitionedRegion pr =
        (PartitionedRegion) ClusterStartupRule.getCache().getRegion(regionName);
    Stream.of(source, destination).forEach(
        member -> assertThat(
            pr.getRegionAdvisor().getBucket(0).getBucketAdvisor().getProfile(member))
                .isNull());
  });
}
/**
 * Creates a partitioned region with three buckets and no redundant copies in this member's cache.
 *
 * @param regionName name of the region to create
 * @param shortcut region shortcut to build the region from; proxy shortcuts get a local max
 *        memory of zero
 */
private void createRegion(String regionName, RegionShortcut shortcut) {
  createPartitionedRegion(regionName, shortcut, 3);
}

/**
 * Creates a partitioned region with a single bucket and no redundant copies in this member's
 * cache.
 *
 * @param regionName name of the region to create
 * @param shortcut region shortcut to build the region from; proxy shortcuts get a local max
 *        memory of zero
 */
private void createSingleBucketRegion(String regionName, RegionShortcut shortcut) {
  createPartitionedRegion(regionName, shortcut, 1);
}

/**
 * Shared implementation behind the two region-creation methods above; they differ only in the
 * total number of buckets.
 *
 * @param regionName name of the region to create
 * @param shortcut region shortcut to build the region from
 * @param totalNumBuckets total number of buckets configured on the region
 */
private void createPartitionedRegion(String regionName, RegionShortcut shortcut,
    int totalNumBuckets) {
  PartitionAttributesFactory<Integer, Integer> paf = new PartitionAttributesFactory<>();
  paf.setRedundantCopies(0);
  paf.setTotalNumBuckets(totalNumBuckets);
  if (shortcut.isProxy()) {
    // A proxy member hosts no data locally
    paf.setLocalMaxMemory(0);
  }
  RegionFactory<Integer, Integer> rf =
      ClusterStartupRule.getCache().createRegionFactory(shortcut);
  rf.setPartitionAttributes(paf.create());
  rf.create(regionName);
}
/**
 * Waits for the {@code Before_RemoveBucketMessage} gate, creates the three-bucket proxy region,
 * and then signals the {@code After_CreateProxyRegion} gate.
 *
 * @param regionName name of the proxy region to create
 * @throws Exception if waiting on or signalling a blackboard gate fails
 */
private void waitToCreateProxyRegion(String regionName) throws Exception {
  createProxyRegionBetweenGates(() -> createRegion(regionName, RegionShortcut.PARTITION_PROXY));
}

/**
 * Waits for the {@code Before_RemoveBucketMessage} gate, creates the single-bucket proxy region,
 * and then signals the {@code After_CreateProxyRegion} gate.
 *
 * @param regionName name of the proxy region to create
 * @throws Exception if waiting on or signalling a blackboard gate fails
 */
private void waitToCreateSingleBucketProxyRegion(String regionName) throws Exception {
  createProxyRegionBetweenGates(
      () -> createSingleBucketRegion(regionName, RegionShortcut.PARTITION_PROXY));
}

/**
 * Shared gate choreography behind the two methods above; they differ only in how the proxy
 * region is created.
 *
 * @param proxyRegionCreator step that creates the proxy region; run locally between the two gates
 * @throws Exception if waiting on or signalling a blackboard gate fails
 */
private void createProxyRegionBetweenGates(Runnable proxyRegionCreator) throws Exception {
  logger.info(
      "RebalanceWhileCreatingRegionDistributedTest.waitToCreateRegion about to wait for Before_RemoveBucketMessage gate");
  // Wait after RemoveBucketMessage is sent due to rebalance or moveBucket()
  blackboard.waitForGate(BEFORE_REMOVE_BUCKET_MESSAGE);
  logger.info(
      "RebalanceWhileCreatingRegionDistributedTest.waitToCreateRegion done wait for Before_RemoveBucketMessage gate");
  proxyRegionCreator.run();
  logger.info(
      "RebalanceWhileCreatingRegionDistributedTest.waitToCreateRegion about to signal After_CreateProxyRegion gate");
  blackboard.signalGate(AFTER_CREATE_PROXY_REGION);
  logger.info(
      "RebalanceWhileCreatingRegionDistributedTest.waitToCreateRegion done signal After_CreateProxyRegion gate");
}
/**
 * Installs a {@link TestDistributionMessageObserver} for the given region as the current
 * {@link DistributionMessageObserver} instance of the VM this runs in.
 *
 * @param regionName region whose destroy messages the observer should hold back
 */
private void addDistributionMessageObserver(String regionName) {
  DistributionMessageObserver observer = new TestDistributionMessageObserver(regionName);
  DistributionMessageObserver.setInstance(observer);
}
/**
 * Message observer that uses the blackboard gates to interleave bucket removal with the
 * accessor's region creation.
 *
 * <p>
 * When processing RemoveBucketMessage, a DestroyRegionMessage is created. At that time the
 * partitioned region has not been created on the accessor, so the DistributionAdvisor does not
 * have a PartitionProfile from the accessor. If the recipients of the DestroyRegionMessage are
 * calculated based on the DistributionAdvisor, the accessor will miss the DestroyRegionMessage.
 */
class TestDistributionMessageObserver extends DistributionMessageObserver {
  private final String regionName;

  /**
   * @param regionName name that a DestroyRegionMessage's region path must contain for the
   *        message to be held back
   */
  public TestDistributionMessageObserver(String regionName) {
    this.regionName = regionName;
  }

  /**
   * Signals the {@code Before_RemoveBucketMessage} gate when a RemoveBucketMessage is about to
   * be processed; all other messages are ignored.
   */
  public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
    if (!(message instanceof RemoveBucketMessage)) {
      return;
    }
    logger.info(
        "TestDistributionMessageObserver.beforeProcessMessage about to signal Before_RemoveBucketMessage gate");
    // Lets the accessor start creating its proxy region (see the class comment for why the
    // timing matters).
    blackboard.signalGate(BEFORE_REMOVE_BUCKET_MESSAGE);
    logger.info(
        "TestDistributionMessageObserver.beforeProcessMessage done signal Before_RemoveBucketMessage gate");
  }

  /**
   * Blocks the sending of a DestroyRegionMessage for the observed region until the
   * {@code After_CreateProxyRegion} gate has been signalled; all other messages are ignored.
   *
   * @throws RuntimeException wrapping any failure of the gate wait
   */
  public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) {
    if (!(message instanceof DestroyRegionOperation.DestroyRegionMessage)) {
      return;
    }
    DestroyRegionOperation.DestroyRegionMessage drm =
        (DestroyRegionOperation.DestroyRegionMessage) message;
    if (!drm.regionPath.contains(regionName)) {
      return;
    }
    logger.info(
        "TestDistributionMessageObserver.beforeSendMessage about to wait for After_CreateProxyRegion gate regionName={}",
        drm.regionPath);
    try {
      // We don't want to send DestroyRegionMessage too early, before the accessor has actually
      // started creating the partitioned region. Otherwise, the accessor will not have the
      // bucket profile to be removed.
      blackboard.waitForGate(AFTER_CREATE_PROXY_REGION);
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Keep the interrupt visible to callers further up the stack
        Thread.currentThread().interrupt();
      }
      throw new RuntimeException(e);
    }
    logger.info(
        "TestDistributionMessageObserver.beforeSendMessage done wait for After_CreateProxyRegion gate regionName={}",
        drm.regionPath);
  }
}
}