/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.geode.redis;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.control.RebalanceFactory;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.internal.cache.BucketAdvisor;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.rules.GfshCommandRule;
/**
 * Distributed test that starts one locator and two servers, creates a redundant partitioned
 * region, and runs {@code CheckPrimaryBucketFunction} for a single key while the primary for that
 * key's bucket is being moved to the other server.
 *
 * <p>
 * Each test case asserts that the function reports {@code true}. What the function checks
 * internally is defined in {@code CheckPrimaryBucketFunction}, not in this class.
 */
public class EnsurePrimaryStaysPutDUnitTest {

  @Rule
  public ClusterStartupRule cluster = new ClusterStartupRule();

  @Rule
  public GfshCommandRule gfsh = new GfshCommandRule();

  private MemberVM locator;
  private MemberVM server1;
  private MemberVM server2;

  private static final String KEY = "foo";
  private static final String VALUE = "bar";
  private static final String REGION = "TEST";

  // Member name reported for the server started in VM 1.
  // NOTE(review): assumes ClusterStartupRule names servers "server-<vm index>" - confirm.
  private static final String SERVER_1_NAME = "server-1";

  /**
   * Starts the locator and both servers, creates the {@code PARTITION_REDUNDANT} region through
   * gfsh and registers a {@code CheckPrimaryBucketFunction} instance on each server.
   */
  @Before
  public void setup() throws Exception {
    locator = cluster.startLocatorVM(0);
    int locatorPort = locator.getPort();
    server1 = cluster.startServerVM(1, cf -> cf.withConnectionToLocator(locatorPort));
    server2 = cluster.startServerVM(2, cf -> cf.withConnectionToLocator(locatorPort));

    gfsh.connectAndVerify(locator);
    gfsh.executeAndAssertThat(
        String.format("create region --name=%s --type=PARTITION_REDUNDANT", REGION))
        .statusIsSuccess();

    server1.invoke(() -> FunctionService.registerFunction(new CheckPrimaryBucketFunction()));
    server2.invoke(() -> FunctionService.registerFunction(new CheckPrimaryBucketFunction()));
  }

  @Test
  public void primaryRemainsWhileLocalFunctionExecutes() throws InterruptedException {
    primaryRemainsWhileFunctionExecutes(true, false);
  }

  @Test
  public void primaryRemainsWhileRemoteFunctionExecutes() throws InterruptedException {
    primaryRemainsWhileFunctionExecutes(false, false);
  }

  @Test
  public void localFunctionRetriesIfNotOnPrimary() throws InterruptedException {
    primaryRemainsWhileFunctionExecutes(true, true);
  }

  @Test
  public void remoteFunctionRetriesIfNotOnPrimary() throws InterruptedException {
    primaryRemainsWhileFunctionExecutes(false, true);
  }

  /**
   * Shared scenario for all test cases: put an entry, find the member hosting its primary, start
   * the function asynchronously, move the primary to the other member while the function is
   * running, then assert that the function returned {@code true}.
   *
   * @param runLocally when {@code true} the function is invoked from the member that hosts the
   *        primary; otherwise it is invoked from the other server
   * @param releaseLatchEarly forwarded unchanged as the function argument; presumably makes the
   *        function proceed before the primary has finished moving - see
   *        {@code CheckPrimaryBucketFunction} to confirm
   * @throws InterruptedException if interrupted while waiting for the asynchronous invocation
   */
  private void primaryRemainsWhileFunctionExecutes(boolean runLocally, boolean releaseLatchEarly)
      throws InterruptedException {
    // Create the entry and return the name of the member hosting its primary.
    String memberForPrimary = server1.invoke(() -> {
      InternalCache cache = ClusterStartupRule.getCache();
      Region<String, String> region = cache.getRegion(REGION);
      region.put(KEY, VALUE);

      // Wait until exactly one redundant copy of the key exists before rebalancing.
      GeodeAwaitility.await()
          .until(() -> PartitionRegionHelper.getRedundantMembersForKey(region, KEY).size() == 1);

      rebalanceRegions(cache, region);

      return awaitForPrimary(region);
    });

    // Work out which VM is primary and which is secondary for the key.
    boolean primaryIsServer1 = SERVER_1_NAME.equals(memberForPrimary);
    MemberVM primary = primaryIsServer1 ? server1 : server2;
    MemberVM secondary = primaryIsServer1 ? server2 : server1;

    MemberVM memberToRunOn = runLocally ? primary : secondary;

    AsyncInvocation<Boolean> asyncChecking =
        memberToRunOn.invokeAsync("CheckPrimaryBucketFunction", () -> {
          InternalCache cache = ClusterStartupRule.getCache();
          Region<String, String> region = cache.getRegion(REGION);

          @SuppressWarnings("unchecked")
          ResultCollector<?, List<Boolean>> rc = FunctionService.onRegion(region)
              .withFilter(Collections.singleton(KEY))
              .setArguments(releaseLatchEarly)
              .execute(CheckPrimaryBucketFunction.class.getName());

          return rc.getResult().get(0);
        });

    // Block until the function instance registered on the primary reports that it has started.
    primary.invoke("waitForFunctionToStart", () -> {
      registeredCheckFunction().waitForFunctionToStart();
    });

    // Switch the primary to the secondary while the function is still running.
    secondary.invoke("Switching bucket", () -> {
      InternalCache cache = ClusterStartupRule.getCache();
      PartitionedRegion region = (PartitionedRegion) cache.getRegion(REGION);

      // Locate the bucket that holds KEY and ask this member's advisor to take over as primary.
      int bucketId = PartitionedRegionHelper.getHashKey(region, KEY);
      BucketAdvisor bucketAdvisor = region.getRegionAdvisor().getBucketAdvisor(bucketId);
      bucketAdvisor.becomePrimary(false);

      registeredCheckFunction().finishedMovingPrimary();
    });

    // The function is registered per member, so the original primary is notified separately.
    primary.invoke("finishedMovingPrimary", () -> {
      registeredCheckFunction().finishedMovingPrimary();
    });

    assertThat(asyncChecking.get())
        .as("CheckPrimaryBucketFunction determined that the primary has moved")
        .isTrue();
  }

  /**
   * Looks up the {@code CheckPrimaryBucketFunction} instance registered with the
   * {@link FunctionService} of the JVM this method runs in.
   *
   * @return the registered function instance
   */
  private static CheckPrimaryBucketFunction registeredCheckFunction() {
    return (CheckPrimaryBucketFunction) FunctionService.getRegisteredFunctions()
        .get(CheckPrimaryBucketFunction.ID);
  }

  /**
   * Waits until the primary member reported for {@link #KEY} stops changing between consecutive
   * polls for 10 seconds, giving up after 60 seconds.
   *
   * @param region the region holding the key
   * @return the name of the member last observed as primary
   */
  private static String awaitForPrimary(Region<String, String> region) {
    AtomicReference<String> lastPrimary =
        new AtomicReference<>(PartitionRegionHelper.getPrimaryMemberForKey(region, KEY).getName());

    GeodeAwaitility.await()
        .during(10, TimeUnit.SECONDS)
        .atMost(60, TimeUnit.SECONDS)
        .until(() -> {
          String currentPrimary =
              PartitionRegionHelper.getPrimaryMemberForKey(region, KEY).getName();
          // Record the current primary and report whether it matches the previous observation.
          return lastPrimary.getAndSet(currentPrimary).equals(currentPrimary);
        });

    return lastPrimary.get();
  }

  /**
   * Runs a rebalance restricted to the given region and waits for it to complete.
   *
   * @param cache the cache whose resource manager performs the rebalance
   * @param region the only region included in the rebalance
   * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted;
   *         the thread's interrupt flag is restored before throwing
   */
  private static void rebalanceRegions(Cache cache, Region<?, ?> region) {
    ResourceManager manager = cache.getResourceManager();

    Set<String> includeRegions = new HashSet<>();
    includeRegions.add(region.getName());

    RebalanceFactory factory = manager.createRebalanceFactory();
    factory.includeRegions(includeRegions);

    try {
      factory.start().getResults();
    } catch (InterruptedException e) {
      // Preserve the interrupt status so callers further up the stack can still observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}