blob: 13228e5cd4f38f54071d052cb3b54c5e7853fae8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.UseParametersRunnerFactory;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.Locator;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.functions.TestFunction;
import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.ClientServerTest;
import org.apache.geode.test.junit.categories.FunctionServiceTest;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
@Category({ClientServerTest.class, FunctionServiceTest.class})
@RunWith(Parameterized.class)
@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
public class PRClientServerRegionFunctionExecutionFailoverDUnitTest extends PRClientServerTestBase {
private static final Logger logger = LogService.getLogger();
private static Locator locator = null;
private static Region<Integer, Object> region = null;
@Override
protected void postSetUpPRClientServerTestBase() {
IgnoredException.addIgnoredException("Connection reset");
IgnoredException.addIgnoredException("SocketTimeoutException");
IgnoredException.addIgnoredException("ServerConnectivityException");
IgnoredException.addIgnoredException("Socket Closed");
}
@Test
public void testserverMultiKeyExecution_SocektTimeOut() {
createScenario();
Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
registerFunctionAtServer(function);
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.serverMultiKeyExecutionSocketTimeOut(Boolean.TRUE));
}
/*
* Ensure that the while executing the function if the servers is down then the execution is
* failover to other available server
*/
@Test
public void testServerFailoverWithTwoServerAliveHA() {
IgnoredException.addIgnoredException("FunctionInvocationTargetException");
ArrayList commonAttributes =
createCommonServerAttributes("TestPartitionedRegion", null, 1, null);
createClientServerScenarion(commonAttributes, 20, 20, 20);
Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA);
registerFunctionAtServer(function);
server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::stopServerHA);
server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::stopServerHA);
client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::putOperation);
int AsyncInvocationArrSize = 1;
AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize];
async[0] = client
.invokeAsync(PRClientServerRegionFunctionExecutionDUnitTest::executeFunctionHA);
server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::startServerHA);
server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::startServerHA);
server1.invoke(PRClientServerRegionFunctionExecutionDUnitTest::stopServerHA);
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.verifyDeadAndLiveServers(2));
ThreadUtils.join(async[0], 6 * 60 * 1000);
if (async[0].getException() != null) {
Assert.fail("UnExpected Exception Occurred : ", async[0].getException());
}
List l = (List) async[0].getReturnValue();
assertEquals(2, l.size());
}
/*
* Ensure that the while executing the function if the servers is down then the execution is
* failover to other available server
*/
@Test
public void testServerCacheClosedFailoverWithTwoServerAliveHA() {
IgnoredException.addIgnoredException("FunctionInvocationTargetException");
ArrayList commonAttributes =
createCommonServerAttributes("TestPartitionedRegion", null, 1, null);
createClientServerScenarion(commonAttributes, 20, 20, 20);
Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA);
registerFunctionAtServer(function);
server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::stopServerHA);
server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::stopServerHA);
client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::putOperation);
int AsyncInvocationArrSize = 1;
AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize];
async[0] = client
.invokeAsync(PRClientServerRegionFunctionExecutionDUnitTest::executeFunctionHA);
server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::startServerHA);
server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::startServerHA);
server1.invoke(PRClientServerRegionFunctionExecutionDUnitTest::closeCacheHA);
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.verifyDeadAndLiveServers(2));
ThreadUtils.join(async[0], 5 * 60 * 1000);
if (async[0].getException() != null) {
Assert.fail("UnExpected Exception Occurred : ", async[0].getException());
}
List l = (List) async[0].getReturnValue();
assertEquals(2, l.size());
}
@Test
public void testBug40714() {
createScenario();
server1.invoke(
(SerializableRunnableIF) PRClientServerRegionFunctionExecutionDUnitTest::registerFunction);
server1.invoke(
(SerializableRunnableIF) PRClientServerRegionFunctionExecutionDUnitTest::registerFunction);
server1.invoke(
(SerializableRunnableIF) PRClientServerRegionFunctionExecutionDUnitTest::registerFunction);
client.invoke(
(SerializableRunnableIF) PRClientServerRegionFunctionExecutionDUnitTest::registerFunction);
client.invoke(
PRClientServerRegionFunctionExecutionDUnitTest::FunctionExecution_Inline_Bug40714);
}
@Test
public void testOnRegionFailoverWithTwoServerDownHA() {
IgnoredException.addIgnoredException("FunctionInvocationTargetException");
createScenario();
server1.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createProxyRegion);
Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA_REGION);
registerFunctionAtServer(function);
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.regionExecutionHATwoServerDown(Boolean.FALSE, function, Boolean.FALSE));
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest.verifyMetaData(2, 1));
}
// retry attempts is 2
@Test
public void testOnRegionFailoverWithOneServerDownHA() {
IgnoredException.addIgnoredException("FunctionInvocationTargetException");
createScenario();
server1
.invokeAsync(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
server2
.invokeAsync(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
server3
.invokeAsync(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createProxyRegion);
Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA_REGION);
registerFunctionAtServer(function);
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.regionExecutionHAOneServerDown(Boolean.FALSE, function, Boolean.FALSE));
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.verifyMetaData(1, 1));
}
/*
* Ensure that the while executing the function if the servers are down then the execution
* shouldn't failover to other available server
*/
@Test
public void testOnRegionFailoverNonHA() {
createScenario();
IgnoredException.addIgnoredException("FunctionInvocationTargetException");
server1.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createProxyRegion);
Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_NONHA_REGION);
registerFunctionAtServer(function);
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.regionSingleKeyExecutionNonHA(Boolean.FALSE, function, Boolean.FALSE));
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.verifyMetaData(1, 0));
}
/*
* Ensure that the while executing the function if the servers are down then the execution
* shouldn't failover to other available server
*/
@Test
public void testOnRegionFailoverNonHASingleHop() {
ArrayList commonAttributes =
createCommonServerAttributes("TestPartitionedRegion", null, 0, null);
createClientServerScenarioSingleHop(commonAttributes, 20, 20, 20);
server1.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
server2.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
server3.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createReplicatedRegion);
client.invoke(PRClientServerRegionFunctionExecutionDUnitTest::createProxyRegion);
// Make sure the buckets are created.
client.invoke(new SerializableRunnable() {
@Override
public void run() {
region = cache.getRegion(PRClientServerTestBase.PartitionedRegionName);
for (int i = 0; i < 13; i++) {
region.put(i, i);
}
}
});
// Make sure the client metadata is up to date.
client.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::fetchMetaData);
Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_NONHA_REGION);
registerFunctionAtServer(function);
final Function function2 = new TestFunction(true, TestFunction.TEST_FUNCTION_NONHA_NOP);
registerFunctionAtServer(function2);
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.regionSingleKeyExecutionNonHA(Boolean.FALSE, function, Boolean.FALSE));
// This validation doesn't work because the client may
// still be asynchronously recording the departure of the
// failed server
// System.err.println("Trying the second function");
// //Make sure the client can now execute a function
// //on the server
// client.invoke(new SerializableRunnable() {
// @Override
// public void run() {
// ResultCollector rs = FunctionService.onRegion(region).execute(function2);
// rs.getResult();
// }
// });
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.verifyMetaData(1, 0));
}
@Test
public void testServerBucketMovedException() {
IgnoredException.addIgnoredException("BucketMovedException");
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM server3 = host.getVM(2);
VM server4 = host.getVM(3);
disconnectAllFromDS();
ArrayList commonAttributes =
createCommonServerAttributes("TestPartitionedRegion", null, 1, null);
final int portLocator = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
final String hostLocator = NetworkUtils.getServerHostName(server1.getHost());
final String locator = hostLocator + "[" + portLocator + "]";
startLocatorInVM(portLocator);
try {
server1.invoke(() -> createServerWithLocator(locator, false, commonAttributes));
server2.invoke(() -> createServerWithLocator(locator, false, commonAttributes));
server4.invoke(() -> createClientWithLocator(hostLocator, portLocator));
server4.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::putIntoRegion);
server4.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::fetchMetaData);
server3.invoke(() -> createServerWithLocator(locator, false, commonAttributes));
Object result = server4
.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::executeFunction);
List l = (List) result;
assertEquals(2, l.size());
} finally {
stopLocator();
}
}
@Test
public void testServerBucketMovedException_LocalServer() {
IgnoredException.addIgnoredException("BucketMovedException");
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM server4 = host.getVM(3);
ArrayList commonAttributes =
createCommonServerAttributes("TestPartitionedRegion", null, 0, null);
final int portLocator = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
final String hostLocator = NetworkUtils.getServerHostName(server1.getHost());
final String locator = hostLocator + "[" + portLocator + "]";
startLocatorInVM(portLocator);
try {
server1.invoke(() -> createServerWithLocator(locator, false, commonAttributes));
server4.invoke(() -> createClientWithLocator(hostLocator, portLocator));
server4.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::putIntoRegion);
server4.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::fetchMetaData);
server2.invoke(() -> createServerWithLocator(locator, false, commonAttributes));
Object result = server4
.invoke(PRClientServerRegionFunctionExecutionFailoverDUnitTest::executeFunction);
List l = (List) result;
assertEquals(2, l.size());
} finally {
stopLocator();
}
}
private static void fetchMetaData() {
((GemFireCacheImpl) cache).getClientMetadataService().getClientPRMetadata((LocalRegion) region);
}
private void startLocatorInVM(final int locatorPort) {
File logFile = new File("locator-" + locatorPort + ".log");
Properties props = new Properties();
props = DistributedTestUtils.getAllDistributedSystemProperties(props);
props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
try {
locator = Locator.startLocatorAndDS(locatorPort, logFile, null, props);
} catch (IOException e) {
Assert.fail("Unable to start locator ", e);
}
}
public static void stopLocator() {
locator.stop();
}
private int createServerWithLocator(String locator, boolean isAccessor, ArrayList commonAttrs) {
Properties props = new Properties();
props.setProperty(LOCATORS, locator);
DistributedSystem ds = getSystem(props);
cache = CacheFactory.create(ds);
CacheServer server = cache.addCacheServer();
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.setPort(port);
server.setHostnameForClients("localhost");
try {
server.start();
} catch (IOException e) {
Assert.fail("Failed to start server ", e);
}
PartitionAttributesFactory paf = new PartitionAttributesFactory();
if (isAccessor) {
paf.setLocalMaxMemory(0);
}
paf.setTotalNumBuckets((Integer) commonAttrs.get(3))
.setRedundantCopies((Integer) commonAttrs.get(2));
AttributesFactory attr = new AttributesFactory();
attr.setPartitionAttributes(paf.create());
region = cache.createRegion(regionName, attr.create());
assertNotNull(region);
logger
.info("Partitioned Region " + regionName + " created Successfully :" + region.toString());
return port;
}
private void createClientWithLocator(String host, int port0) {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
DistributedSystem ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
CacheServerTestUtil.disableShufflingOfEndpoints();
Pool p;
try {
p = PoolManager.createFactory().addLocator(host, port0).setPingInterval(250)
.setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(2000)
.setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(3)
.create("Pool_" + regionName);
} finally {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
AttributesFactory factory = new AttributesFactory();
factory.setPoolName(p.getName());
factory.setDataPolicy(DataPolicy.EMPTY);
RegionAttributes attrs = factory.create();
region = cache.createRegion(regionName, attrs);
assertNotNull(region);
logger
.info("Distributed Region " + regionName + " created Successfully :" + region.toString());
}
public static void putIntoRegion() {
for (int i = 0; i < 113; i++) {
region.put(i, "KB_" + i);
}
logger
.info("Distributed Region " + regionName + " Have size :" + region.size());
}
public static Object executeFunction() {
Execution execute = FunctionService.onRegion(region);
ResultCollector rc = execute.setArguments(Boolean.TRUE)
.execute(new TestFunction(true, TestFunction.TEST_FUNCTION_LASTRESULT));
logger.info("Exeuction Result :" + rc.getResult());
List l = ((List) rc.getResult());
return l;
}
protected void createScenario() {
ArrayList commonAttributes =
createCommonServerAttributes("TestPartitionedRegion", null, 0, null);
createClientServerScenarion(commonAttributes, 20, 20, 20);
}
}