blob: 9ea26f39878d994d0f2c3f20e2cc4bab8486a366 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.execute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolFactory;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.execute.Execution;
import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.VM;
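/**
* Distributed tests for function execution targeted at member or server groups, using
* FunctionService.onMember/onMembers (peer-to-peer) and
* InternalFunctionService.onServer/onServers (client/server).
*
* @author sbawaska
*/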
public class OnGroupsFunctionExecutionDUnitTest extends DistributedTestCase {
/**
 * Creates the test case.
 *
 * @param name the test name, handed to the superclass constructor
 */
public OnGroupsFunctionExecutionDUnitTest(String name) {
super(name);
}
/**
 * Best-effort cleanup: in every dunit VM, closes the cache returned by
 * {@code CacheFactory.getAnyInstance()}; a {@link CacheClosedException} is ignored.
 */
@Override
public void tearDown2() throws Exception {
  invokeInEveryVM(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      try {
        Cache openCache = CacheFactory.getAnyInstance();
        if (openCache != null) {
          openCache.close();
        }
      } catch (CacheClosedException ignored) {
        // deliberately ignored: cleanup is best effort
      }
      return null;
    }
  });
}
/**
 * Test function that bumps the static {@link #invocationCount} each time it runs. When it is
 * given a list of group names as argument, it asserts that the executing member belongs to at
 * least one of them. It sends {@code Boolean.TRUE} as last result only if {@code hasResult()}
 * is true.
 */
static class OnGroupsFunction extends FunctionAdapter {
  private static final long serialVersionUID = -1032915440862585532L;
  public static final String Id = "OnGroupsFunction";
  /** Number of executions seen in this JVM since the counter was last zeroed. */
  public static int invocationCount;

  @Override
  public void execute(FunctionContext context) {
    getLogWriter().fine("SWAP:1:executing OnGroupsFunction:"+invocationCount);
    InternalDistributedSystem system = InternalDistributedSystem.getConnectedInstance();
    invocationCount++;
    ArrayList<String> requestedGroups = (ArrayList<String>) context.getArguments();
    if (requestedGroups != null) {
      // this member must share at least one group with the ones the caller targeted
      boolean noCommonGroup =
          Collections.disjoint(requestedGroups, system.getDistributedMember().getGroups());
      assertFalse(noCommonGroup);
    }
    if (!hasResult()) {
      return;
    }
    context.getResultSender().lastResult(Boolean.TRUE);
  }

  @Override
  public String getId() {
    return Id;
  }
}
/**
 * Connects the given VM to the distributed system with the supplied {@code groups} property
 * and, if a region name is given, creates a cache with a PARTITION region of that name.
 *
 * @param vm the VM to initialize
 * @param groups value for the "groups" distributed-system property
 * @param regionName name of the partitioned region to create; {@code null} means only
 *        {@code getSystem} is called and no cache is created
 * @param startServer when true (and a region name is given) a cache server is started on a
 *        random available TCP port
 */
private void initVM(VM vm, final String groups, final String regionName, final boolean startServer) {
  vm.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      Properties dsProps = new Properties();
      dsProps.put("groups", groups);
      if (regionName == null) {
        getSystem(dsProps);
        return null;
      }
      try {
        // close a cache that may already exist before creating a new one
        CacheFactory.getInstance(getSystem(dsProps)).close();
      } catch (CacheClosedException ignored) {
      }
      Cache cache = CacheFactory.create(getSystem(dsProps));
      cache.createRegionFactory(RegionShortcut.PARTITION).create(regionName);
      if (startServer) {
        CacheServer server = cache.addCacheServer();
        server.setPort(AvailablePortHelper.getRandomAvailableTCPPort());
        server.start();
      }
      return null;
    }
  });
}
/**
 * Registers a new {@link OnGroupsFunction} with the FunctionService of the given VM.
 *
 * @param vm the VM in which to register the function
 */
private void registerFunction(VM vm) {
  vm.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      OnGroupsFunction function = new OnGroupsFunction();
      FunctionService.registerFunction(function);
      return null;
    }
  });
}
/**
 * Asserts that {@link OnGroupsFunction#invocationCount} in the given VM equals
 * {@code count} and, if so, sets it back to zero.
 *
 * @param vm the VM whose counter is checked
 * @param count the expected number of invocations
 */
private void verifyAndResetInvocationCount(VM vm, final int count) {
  vm.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      OnGroupsFunction registered =
          (OnGroupsFunction) FunctionService.getFunction(OnGroupsFunction.Id);
      int observed = registered.invocationCount;
      assertEquals(count, observed);
      // only reached when the assertion held
      registered.invocationCount = 0;
      return null;
    }
  });
}
/**
 * Fetches {@link OnGroupsFunction#invocationCount} from the given VM and zeroes it there.
 *
 * @param vm the VM to query
 * @return the counter value read before it was reset
 */
private int getAndResetInvocationCount(VM vm) {
  Object previous = vm.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      OnGroupsFunction registered =
          (OnGroupsFunction) FunctionService.getFunction(OnGroupsFunction.Id);
      int seen = registered.invocationCount;
      registered.invocationCount = 0;
      return seen;
    }
  });
  return (Integer) previous;
}
/**
 * Reads {@link OnGroupsFunction#invocationCount} from the given VM without modifying it.
 *
 * <p>This helper is polled repeatedly from wait criteria that sum the counts of several
 * VMs. It must therefore be a pure read: zeroing the counter here would discard
 * invocations observed in one poll before another VM has executed, so the summed total
 * could never be reached. Callers that want the counter cleared use
 * {@code resetInvocationCount} or {@code getAndResetInvocationCount}.
 *
 * @param vm the VM to query
 * @return the current invocation count in that VM
 */
private int getInvocationCount(VM vm) {
  return (Integer) vm.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      OnGroupsFunction f = (OnGroupsFunction) FunctionService.getFunction(OnGroupsFunction.Id);
      // read-only on purpose: do not reset the counter here
      return f.invocationCount;
    }
  });
}
/**
 * Sets {@link OnGroupsFunction#invocationCount} to zero in the given VM.
 *
 * @param vm the VM whose counter is cleared
 */
private void resetInvocationCount(VM vm) {
  vm.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      OnGroupsFunction registered =
          (OnGroupsFunction) FunctionService.getFunction(OnGroupsFunction.Id);
      registered.invocationCount = 0;
      return null;
    }
  });
}
/** Runs the cache-less peer-to-peer scenario without registering the function first. */
public void testBasicP2PFunctionNoCache() {
  final boolean registerFirst = false;
  doBasicP2PFunctionNoCache(registerFirst);
}
/** Runs the cache-less peer-to-peer scenario with the function registered in every member. */
public void testBasicP2pRegisteredFunctionNoCache() {
  final boolean registerFirst = true;
  doBasicP2PFunctionNoCache(registerFirst);
}
/**
 * Starts three members without a cache (groups "g0,gm", "g1" and "g0,g1"), then from the
 * first member executes {@link OnGroupsFunction} on groups gm, g0, g1 and g0+g1, verifying
 * the per-member invocation counts after each execution.
 *
 * @param registerFunction when true the function is registered in all three members and
 *        executed by id; otherwise a new function instance is passed to {@code execute}
 */
private void doBasicP2PFunctionNoCache(final boolean registerFunction) {
  Host dunitHost = Host.getHost(0);
  VM member0 = dunitHost.getVM(0);
  VM member1 = dunitHost.getVM(1);
  VM member2 = dunitHost.getVM(2);

  initVM(member0, "g0,gm", null, false);
  initVM(member1, "g1", null, false);
  initVM(member2, "g0,g1", null, false);

  if (registerFunction) {
    registerFunction(member0);
    registerFunction(member1);
    registerFunction(member2);
  }

  // group "gm": only member0
  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getLogWriter().fine("SWAP:invoking on gm");
      getSystem();
      try {
        FunctionService.onMember("no such group");
        fail("expected exception not thrown");
      } catch (FunctionException expected) {
      }
      Execution execution = FunctionService.onMembers("gm");
      ArrayList<String> groupArgs = new ArrayList<String>(Collections.singletonList("gm"));
      execution = execution.withArgs(groupArgs);
      if (!registerFunction) {
        execution.execute(new OnGroupsFunction()).getResult();
      } else {
        execution.execute(OnGroupsFunction.Id).getResult();
      }
      return null;
    }
  });
  verifyAndResetInvocationCount(member0, 1);
  verifyAndResetInvocationCount(member1, 0);
  verifyAndResetInvocationCount(member2, 0);

  // group "g0": member0 and member2
  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      getLogWriter().fine("SWAP:invoking on g0");
      Execution execution = FunctionService.onMembers("g0");
      ArrayList<String> groupArgs = new ArrayList<String>(Collections.singletonList("g0"));
      execution = execution.withArgs(groupArgs);
      if (!registerFunction) {
        execution.execute(new OnGroupsFunction()).getResult();
      } else {
        execution.execute(OnGroupsFunction.Id).getResult();
      }
      return null;
    }
  });
  verifyAndResetInvocationCount(member0, 1);
  verifyAndResetInvocationCount(member1, 0);
  verifyAndResetInvocationCount(member2, 1);

  // group "g1": member1 and member2
  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      Execution execution = FunctionService.onMembers("g1");
      ArrayList<String> groupArgs = new ArrayList<String>(Collections.singletonList("g1"));
      execution = execution.withArgs(groupArgs);
      if (!registerFunction) {
        execution.execute(new OnGroupsFunction()).getResult();
      } else {
        execution.execute(OnGroupsFunction.Id).getResult();
      }
      return null;
    }
  });
  verifyAndResetInvocationCount(member0, 0);
  verifyAndResetInvocationCount(member1, 1);
  verifyAndResetInvocationCount(member2, 1);

  // groups "g0" and "g1" together: all three members, once each
  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getLogWriter().fine("SWAP:invoking on g0 g1");
      InternalDistributedSystem.getConnectedInstance();
      Execution execution = FunctionService.onMembers("g0", "g1");
      ArrayList<String> groupArgs = new ArrayList<String>();
      groupArgs.add("g0");
      groupArgs.add("g1");
      execution = execution.withArgs(groupArgs);
      if (!registerFunction) {
        execution.execute(new OnGroupsFunction()).getResult();
      } else {
        execution.execute(OnGroupsFunction.Id).getResult();
      }
      return null;
    }
  });
  verifyAndResetInvocationCount(member0, 1);
  verifyAndResetInvocationCount(member1, 1);
  verifyAndResetInvocationCount(member2, 1);
}
/**
 * Checks {@code FunctionService.onMember}: an unknown group and an empty group list are
 * expected to raise a FunctionException, a call on "g1" runs the function exactly once
 * across the three members, and a call on "g0" from a member of g0 runs it on that member.
 */
public void testonMember() {
  Host dunitHost = Host.getHost(0);
  VM member0 = dunitHost.getVM(0);
  VM member1 = dunitHost.getVM(1);
  VM member2 = dunitHost.getVM(2);

  initVM(member0, "g0,gm", null, false);
  initVM(member1, "g1", null, false);
  initVM(member2, "g0,g1", null, false);

  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      try {
        FunctionService.onMember("no such group");
        fail("expected exception not thrown");
      } catch (FunctionException expected) {
      }
      try {
        FunctionService.onMember();
        fail("expected exception not thrown");
      } catch (FunctionException expected) {
      }
      FunctionService.onMember("g1").execute(new OnGroupsFunction()).getResult();
      return null;
    }
  });

  // exactly one member of g1 must have run the function; which one is not asserted
  int totalInvocations = getAndResetInvocationCount(member0);
  totalInvocations += getAndResetInvocationCount(member1);
  totalInvocations += getAndResetInvocationCount(member2);
  assertEquals(1, totalInvocations);

  // test that function is invoked locally when this member belongs to group
  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      FunctionService.onMember("g0").execute(new OnGroupsFunction()).getResult();
      return null;
    }
  });
  verifyAndResetInvocationCount(member0, 1);
  verifyAndResetInvocationCount(member1, 0);
  verifyAndResetInvocationCount(member2, 0);
}
/**
 * Test function that streams five results, each the integer 1: four through
 * {@code sendResult} and the fifth through {@code lastResult}.
 */
static class OnGroupMultiResultFunction extends FunctionAdapter {
  private static final long serialVersionUID = 8190290175486881994L;
  public static final String Id = "OnGroupMultiResultFunction";

  @Override
  public void execute(FunctionContext context) {
    // four intermediate results followed by the terminating one
    int pending = 4;
    while (pending > 0) {
      context.getResultSender().sendResult(1);
      pending--;
    }
    context.getResultSender().lastResult(1);
  }

  @Override
  public String getId() {
    return Id;
  }
}
/**
 * Executes {@link OnGroupMultiResultFunction} peer-to-peer on groups mg, g0 and g0+g1 and
 * checks the sum of the collected results (5 per executing member: 5, 10 and 15).
 */
public void testBasicP2PFunction() {
  Host dunitHost = Host.getHost(0);
  VM member0 = dunitHost.getVM(0);
  VM member1 = dunitHost.getVM(1);
  VM member2 = dunitHost.getVM(2);
  final String regionName = getName();

  initVM(member0, "g0,mg", regionName, false);
  initVM(member1, "g1", regionName, false);
  initVM(member2, "g0,g1", regionName, false);

  // "mg": one member
  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      Execution execution = FunctionService.onMembers("mg");
      ArrayList<Integer> results =
          (ArrayList<Integer>) execution.execute(new OnGroupMultiResultFunction()).getResult();
      int total = 0;
      for (Integer value : results) {
        total += value;
      }
      assertEquals(5, total);
      return null;
    }
  });

  // "g0": two members
  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      Execution execution = FunctionService.onMembers("g0");
      ArrayList<Integer> results =
          (ArrayList<Integer>) execution.execute(new OnGroupMultiResultFunction()).getResult();
      int total = 0;
      for (Integer value : results) {
        total += value;
      }
      assertEquals(10, total);
      return null;
    }
  });

  // "g0" and "g1": all three members
  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      Execution execution = FunctionService.onMembers("g0", "g1");
      ArrayList<Integer> results =
          (ArrayList<Integer>) execution.execute(new OnGroupMultiResultFunction()).getResult();
      int total = 0;
      for (Integer value : results) {
        total += value;
      }
      assertEquals(15, total);
      return null;
    }
  });
}
/**
 * Asks the given VM for the port of the locator returned by {@code Locator.getLocator()}.
 *
 * @param locator the VM hosting the locator
 * @return the locator's port
 */
private int getLocatorPort(VM locator) {
  Object port = locator.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      Locator running = Locator.getLocator();
      return running.getPort();
    }
  });
  return (Integer) port;
}
/**
 * Test function that fails on purpose. Its argument is a list of strings:
 * <ul>
 * <li>element 0 equal to "runtime" makes it throw a {@link NullPointerException}; any other
 * value makes it disconnect the executing member's distributed system;</li>
 * <li>an optional element 1 names a group, restricting the failure to members that belong
 * to that group.</li>
 * </ul>
 * If no exception was thrown it sends {@code Boolean.TRUE} as last result.
 */
static class OnGroupsExceptionFunction extends FunctionAdapter {
  private static final long serialVersionUID = 6488843931404616442L;
  public static final String Id = "OnGroupsExceptionFunction";

  @Override
  public void execute(FunctionContext context) {
    ArrayList<String> request = (ArrayList<String>) context.getArguments();
    boolean throwRuntime = request.get(0).equals("runtime");
    boolean groupGiven = request.size() > 1;
    if (throwRuntime) {
      // without a target group every member fails; otherwise only members of that group
      if (!groupGiven
          || InternalDistributedSystem.getConnectedInstance().getDistributedMember()
              .getGroups().contains(request.get(1))) {
        throw new NullPointerException();
      }
    } else {
      InternalDistributedSystem system = InternalDistributedSystem.getConnectedInstance();
      if (!groupGiven
          || system.getDistributedMember().getGroups().contains(request.get(1))) {
        system.disconnect();
      }
    }
    context.getResultSender().lastResult(Boolean.TRUE);
  }

  @Override
  public String getId() {
    return Id;
  }
}
/**
 * Verifies that a NullPointerException thrown by {@link OnGroupsExceptionFunction} during a
 * peer-to-peer group execution reaches the caller as the cause of a FunctionException, both
 * when every targeted member throws and when only the member in group g2 throws.
 */
public void testP2PException() {
  Host dunitHost = Host.getHost(0);
  VM member0 = dunitHost.getVM(0);
  VM member1 = dunitHost.getVM(1);
  VM member2 = dunitHost.getVM(2);
  final String regionName = getName();

  //The test function deliberately throws a null pointer exception.
  //which is logged.
  addExpectedException(NullPointerException.class.getSimpleName());

  initVM(member0, "g0,mg", regionName, false);
  initVM(member1, "g1", regionName, false);
  initVM(member2, "g0,g1,g2", regionName, false);

  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      ArrayList<String> failureArgs = new ArrayList<String>();
      failureArgs.add("runtime");

      Execution onMg = FunctionService.onMembers("mg").withArgs(failureArgs);
      try {
        onMg.execute(new OnGroupsExceptionFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException thrown) {
        assertTrue(thrown.getCause() instanceof NullPointerException);
      }

      Execution onG1 = FunctionService.onMembers("g1").withArgs(failureArgs);
      try {
        onG1.execute(new OnGroupsExceptionFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException thrown) {
        assertTrue(thrown.getCause() instanceof NullPointerException);
      }

      // fail on only one member
      failureArgs.add("g2");
      Execution onG1OnlyG2Fails = FunctionService.onMembers("g1").withArgs(failureArgs);
      try {
        onG1OnlyG2Fails.execute(new OnGroupsExceptionFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException thrown) {
        assertTrue(thrown.getCause() instanceof NullPointerException);
      }
      return null;
    }
  });
}
/**
 * Executes {@link OnGroupsExceptionFunction} on group g1 with the single argument
 * "shutdown", which makes each executing member disconnect, and expects a FunctionException
 * caused by a FunctionInvocationTargetException.
 */
public void testP2PMemberFailure() {
  Host dunitHost = Host.getHost(0);
  VM member0 = dunitHost.getVM(0);
  VM member1 = dunitHost.getVM(1);
  VM member2 = dunitHost.getVM(2);
  final String regionName = getName();

  initVM(member0, "g0,mg", regionName, false);
  initVM(member1, "g1", regionName, false);
  initVM(member2, "g0,g1,g2", regionName, false);

  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      ArrayList<String> failureArgs =
          new ArrayList<String>(Collections.singletonList("shutdown"));
      Execution onG1 = FunctionService.onMembers("g1").withArgs(failureArgs);
      try {
        onG1.execute(new OnGroupsExceptionFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException thrown) {
        assertTrue(thrown.getCause() instanceof FunctionInvocationTargetException);
      }
      return null;
    }
  });
}
/**
 * Like {@code testP2PMemberFailure}, but the arguments ("shutdown", "g2") restrict the
 * disconnect to the member that belongs to group g2; a FunctionException caused by a
 * FunctionInvocationTargetException is still expected.
 */
public void testP2POneMemberFailure() {
  Host dunitHost = Host.getHost(0);
  VM member0 = dunitHost.getVM(0);
  VM member1 = dunitHost.getVM(1);
  VM member2 = dunitHost.getVM(2);
  final String regionName = getName();

  initVM(member0, "g0,mg", regionName, false);
  initVM(member1, "g1", regionName, false);
  initVM(member2, "g0,g1,g2", regionName, false);

  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      ArrayList<String> failureArgs = new ArrayList<String>();
      failureArgs.add("shutdown");
      failureArgs.add("g2");
      Execution onG1 = FunctionService.onMembers("g1").withArgs(failureArgs);
      try {
        onG1.execute(new OnGroupsExceptionFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException thrown) {
        assertTrue(thrown.getCause() instanceof FunctionInvocationTargetException);
      }
      return null;
    }
  });
}
/**
 * With {@code setIgnoreDepartedMembers(true)}, executes {@link OnGroupsExceptionFunction}
 * on group g1 so that only the member in g2 disconnects. The result list must hold two
 * entries, in either order: a Boolean true and a FunctionInvocationTargetException.
 */
public void testP2PIgnoreMemberFailure() {
  Host dunitHost = Host.getHost(0);
  VM member0 = dunitHost.getVM(0);
  VM member1 = dunitHost.getVM(1);
  VM member2 = dunitHost.getVM(2);
  final String regionName = getName();

  initVM(member0, "g0,mg", regionName, false);
  initVM(member1, "g1", regionName, false);
  initVM(member2, "g0,g1,g2", regionName, false);

  member0.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      getSystem();
      ArrayList<String> failureArgs = new ArrayList<String>();
      failureArgs.add("shutdown");
      failureArgs.add("g2");
      Execution onG1 = FunctionService.onMembers("g1").withArgs(failureArgs);
      ((AbstractExecution) onG1).setIgnoreDepartedMembers(true);

      ArrayList results =
          (ArrayList) onG1.execute(new OnGroupsExceptionFunction()).getResult();
      assertEquals(2, results.size());
      Object first = results.get(0);
      Object second = results.get(1);
      if (first instanceof FunctionInvocationTargetException) {
        assertTrue((Boolean) second);
      } else if (first instanceof Boolean) {
        assertTrue(second instanceof FunctionInvocationTargetException);
      } else {
        fail("expected to find a Boolean or throwable at index 0");
      }
      return null;
    }
  });
}
/** Client/server scenario: function passed as an instance, executed with arguments. */
public void testBasicClientServerFunction() {
  final boolean registerOnServers = false;
  final boolean passArgs = true;
  dotestBasicClientServerFunction(registerOnServers, passArgs);
}
/** Client/server scenario: function registered on the servers, executed with arguments. */
public void testBasicClientServerRegisteredFunction() {
  final boolean registerOnServers = true;
  final boolean passArgs = true;
  dotestBasicClientServerFunction(registerOnServers, passArgs);
}
/** Client/server scenario: function passed as an instance, executed without arguments. */
public void testBasicClientServerFunctionNoArgs() {
  final boolean registerOnServers = false;
  final boolean passArgs = false;
  dotestBasicClientServerFunction(registerOnServers, passArgs);
}
/** Client/server scenario: function registered on the servers, executed without arguments. */
public void testBasicClientServerRegisteredFunctionNoArgs() {
  final boolean registerOnServers = true;
  final boolean passArgs = false;
  dotestBasicClientServerFunction(registerOnServers, passArgs);
}
/**
 * Starts three cache servers (groups "mg,g0", "g1" and "g0,g1") and a client whose pool uses
 * server group "mg". From the client, executes {@link OnGroupsFunction} through
 * {@code InternalFunctionService.onServers} on groups g0, mg and g0+g1, verifying the
 * per-server invocation counts after each execution.
 *
 * @param register when true the function is registered on all servers and executed by id;
 *        otherwise a new function instance is passed to {@code execute}
 * @param withArgs when true the targeted group names are passed as the function argument
 */
private void dotestBasicClientServerFunction(final boolean register, final boolean withArgs) {
  Host dunitHost = Host.getHost(0);
  VM server0 = dunitHost.getVM(0);
  VM server1 = dunitHost.getVM(1);
  VM server2 = dunitHost.getVM(2);
  VM client = dunitHost.getVM(3);
  VM locator = Host.getLocator();
  final String regionName = getName();

  initVM(server0, "mg,g0", regionName, true);
  initVM(server1, "g1", regionName, true);
  initVM(server2, "g0,g1", regionName, true);

  if (register) {
    registerFunction(server0);
    registerFunction(server1);
    registerFunction(server2);
  }

  final int locatorPort = getLocatorPort(locator);
  final String hostName = dunitHost.getHostName();

  // group "g0": server0 and server2
  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      try {
        CacheFactory.getAnyInstance().close();
      } catch (CacheClosedException ignored) {
      }
      disconnectFromDS();
      getLogWriter().fine("SWAP:creating client cache");
      ClientCacheFactory factory = new ClientCacheFactory();
      factory.addPoolLocator(hostName, locatorPort);
      factory.setPoolServerGroup("mg");
      factory.set("log-level", getDUnitLogLevel());
      ClientCache clientCache = factory.create();

      clientCache.getLogger().info("SWAP:invoking function from client on g0");
      Execution execution = InternalFunctionService.onServers(clientCache, "g0");
      if (withArgs) {
        ArrayList<String> groupArgs = new ArrayList<String>(Collections.singletonList("g0"));
        execution = execution.withArgs(groupArgs);
      }
      if (!register) {
        execution.execute(new OnGroupsFunction()).getResult();
      } else {
        execution.execute(OnGroupsFunction.Id).getResult();
      }
      return null;
    }
  });
  verifyAndResetInvocationCount(server0, 1);
  verifyAndResetInvocationCount(server1, 0);
  verifyAndResetInvocationCount(server2, 1);

  // group "mg": server0 only
  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      ClientCache clientCache = ClientCacheFactory.getAnyInstance();
      clientCache.getLogger().fine("SWAP:invoking function from client on mg");
      Execution execution = InternalFunctionService.onServers(clientCache, "mg");
      if (withArgs) {
        ArrayList<String> groupArgs = new ArrayList<String>(Collections.singletonList("mg"));
        execution = execution.withArgs(groupArgs);
      }
      if (!register) {
        execution.execute(new OnGroupsFunction()).getResult();
      } else {
        execution.execute(OnGroupsFunction.Id).getResult();
      }
      return null;
    }
  });
  verifyAndResetInvocationCount(server0, 1);
  verifyAndResetInvocationCount(server1, 0);
  verifyAndResetInvocationCount(server2, 0);

  // groups "g0" and "g1": every server, once each
  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      ClientCache clientCache = ClientCacheFactory.getAnyInstance();
      clientCache.getLogger().fine("SWAP:invoking function from client on g0 g1");
      Execution execution = InternalFunctionService.onServers(clientCache, "g0", "g1");
      if (withArgs) {
        ArrayList<String> groupArgs = new ArrayList<String>();
        groupArgs.add("g0");
        groupArgs.add("g1");
        execution = execution.withArgs(groupArgs);
      }
      if (!register) {
        execution.execute(new OnGroupsFunction()).getResult();
      } else {
        execution.execute(OnGroupsFunction.Id).getResult();
      }
      return null;
    }
  });
  verifyAndResetInvocationCount(server0, 1);
  verifyAndResetInvocationCount(server1, 1);
  verifyAndResetInvocationCount(server2, 1);
}
/**
 * From a client, executes {@link OnGroupMultiResultFunction} on server groups g0, mg and
 * g0+g1 and checks the sum of the streamed results (5 per executing server: 10, 5 and 15).
 */
public void testStreamingClientServerFunction() {
  Host dunitHost = Host.getHost(0);
  VM server0 = dunitHost.getVM(0);
  VM server1 = dunitHost.getVM(1);
  VM server2 = dunitHost.getVM(2);
  VM client = dunitHost.getVM(3);
  VM locator = Host.getLocator();
  final String regionName = getName();

  initVM(server0, "mg,g0", regionName, true);
  initVM(server1, "g1", regionName, true);
  initVM(server2, "g0,g1", regionName, true);

  final int locatorPort = getLocatorPort(locator);
  final String hostName = dunitHost.getHostName();

  // "g0": two servers
  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      try {
        CacheFactory.getAnyInstance().close();
      } catch (CacheClosedException ignored) {
      }
      disconnectFromDS();
      getLogWriter().fine("SWAP:creating client cache");
      ClientCacheFactory factory = new ClientCacheFactory();
      factory.addPoolLocator(hostName, locatorPort);
      factory.setPoolServerGroup("mg");
      factory.set("log-level", getDUnitLogLevel());
      ClientCache clientCache = factory.create();

      clientCache.getLogger().info("SWAP:invoking function from client on g0");
      Execution execution = InternalFunctionService.onServers(clientCache, "g0");
      ArrayList<Integer> results =
          (ArrayList<Integer>) execution.execute(new OnGroupMultiResultFunction()).getResult();
      int total = 0;
      for (Integer value : results) {
        total += value;
      }
      assertEquals(10, total);
      return null;
    }
  });

  // "mg": one server
  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      ClientCache clientCache = ClientCacheFactory.getAnyInstance();
      clientCache.getLogger().fine("SWAP:invoking function from client on mg");
      Execution execution = InternalFunctionService.onServers(clientCache, "mg");
      ArrayList<Integer> results =
          (ArrayList<Integer>) execution.execute(new OnGroupMultiResultFunction()).getResult();
      int total = 0;
      for (Integer value : results) {
        total += value;
      }
      assertEquals(5, total);
      return null;
    }
  });

  // "g0" and "g1": all three servers
  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      ClientCache clientCache = ClientCacheFactory.getAnyInstance();
      clientCache.getLogger().fine("SWAP:invoking function from client on g0 g1");
      Execution execution = InternalFunctionService.onServers(clientCache, "g0", "g1");
      ArrayList<Integer> results =
          (ArrayList<Integer>) execution.execute(new OnGroupMultiResultFunction()).getResult();
      int total = 0;
      for (Integer value : results) {
        total += value;
      }
      assertEquals(15, total);
      return null;
    }
  });
}
/**
 * Checks {@code InternalFunctionService.onServer} from a client: an unknown group is
 * expected to raise a FunctionException; "g1" yields one invocation in total; "g0" is
 * asserted to run on server0 only; and "mg","g1" yields two invocations in total.
 */
public void testOnServer() {
  Host dunitHost = Host.getHost(0);
  VM server0 = dunitHost.getVM(0);
  VM server1 = dunitHost.getVM(1);
  VM server2 = dunitHost.getVM(2);
  VM client = dunitHost.getVM(3);
  VM locator = Host.getLocator();
  final String regionName = getName();

  initVM(server0, "mg,g0", regionName, true);
  initVM(server1, "g1", regionName, true);
  initVM(server2, "g0,g1,g2", regionName, true);

  final int locatorPort = getLocatorPort(locator);
  final String hostName = dunitHost.getHostName();

  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      try {
        CacheFactory.getAnyInstance().close();
      } catch (CacheClosedException ignored) {
      }
      disconnectFromDS();
      getLogWriter().fine("SWAP:creating client cache");
      ClientCacheFactory factory = new ClientCacheFactory();
      factory.addPoolLocator(hostName, locatorPort);
      factory.setPoolServerGroup("mg");
      factory.set("log-level", getDUnitLogLevel());
      ClientCache clientCache = factory.create();

      ExpectedException noMemberFound = addExpectedException("No member found");
      try {
        InternalFunctionService.onServer(clientCache, "no such group")
            .execute(new OnGroupsFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException expected) {
      } finally {
        noMemberFound.remove();
      }

      InternalFunctionService.onServer(clientCache, "g1")
          .execute(new OnGroupsFunction()).getResult();
      return null;
    }
  });
  // one server of g1 ran the function; which one is not asserted
  int invokedOnG1 = getAndResetInvocationCount(server0);
  invokedOnG1 += getAndResetInvocationCount(server1);
  invokedOnG1 += getAndResetInvocationCount(server2);
  assertEquals(1, invokedOnG1);

  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      ClientCache clientCache = ClientCacheFactory.getAnyInstance();
      InternalFunctionService.onServer(clientCache, "g0")
          .execute(new OnGroupsFunction()).getResult();
      return null;
    }
  });
  verifyAndResetInvocationCount(server0, 1);
  verifyAndResetInvocationCount(server1, 0);
  verifyAndResetInvocationCount(server2, 0);

  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      ClientCache clientCache = ClientCacheFactory.getAnyInstance();
      InternalFunctionService.onServer(clientCache, "mg", "g1")
          .execute(new OnGroupsFunction()).getResult();
      return null;
    }
  });
  int invokedOnMgAndG1 = getAndResetInvocationCount(server0);
  invokedOnMgAndG1 += getAndResetInvocationCount(server1);
  invokedOnMgAndG1 += getAndResetInvocationCount(server2);
  assertEquals(2, invokedOnMgAndG1);
}
/**
 * Client/server counterpart of {@code testP2PException}: an unknown server group is expected
 * to raise a FunctionException, and a NullPointerException thrown by
 * {@link OnGroupsExceptionFunction} on the servers must arrive at the client as the cause
 * of a FunctionException.
 */
public void testClientServerException() {
  Host dunitHost = Host.getHost(0);
  VM server0 = dunitHost.getVM(0);
  VM server1 = dunitHost.getVM(1);
  VM server2 = dunitHost.getVM(2);
  VM client = dunitHost.getVM(3);
  VM locator = Host.getLocator();
  final String regionName = getName();

  initVM(server0, "mg,g0", regionName, true);
  initVM(server1, "g1", regionName, true);
  initVM(server2, "g0,g1,g2", regionName, true);

  final int locatorPort = getLocatorPort(locator);
  final String hostName = dunitHost.getHostName();

  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      try {
        CacheFactory.getAnyInstance().close();
      } catch (CacheClosedException ignored) {
      }
      disconnectFromDS();
      getLogWriter().fine("SWAP:creating client cache");
      ClientCacheFactory factory = new ClientCacheFactory();
      factory.addPoolLocator(hostName, locatorPort);
      factory.setPoolServerGroup("mg");
      factory.set("log-level", getDUnitLogLevel());
      ClientCache clientCache = factory.create();

      ExpectedException noMemberFound = addExpectedException("No member found");
      try {
        InternalFunctionService.onServers(clientCache, "no such group")
            .execute(new OnGroupsFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException expected) {
      } finally {
        noMemberFound.remove();
      }

      addExpectedException("NullPointerException");
      ArrayList<String> failureArgs = new ArrayList<String>();
      failureArgs.add("runtime");

      Execution onMg = InternalFunctionService.onServers(clientCache, "mg").withArgs(failureArgs);
      try {
        onMg.execute(new OnGroupsExceptionFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException thrown) {
        assertTrue(thrown.getCause() instanceof NullPointerException);
      }

      Execution onG1 = InternalFunctionService.onServers(clientCache, "g1").withArgs(failureArgs);
      try {
        onG1.execute(new OnGroupsExceptionFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException thrown) {
        assertTrue(thrown.getCause() instanceof NullPointerException);
      }

      // only one member
      failureArgs.add("g2");
      Execution onG1OnlyG2Fails =
          InternalFunctionService.onServers(clientCache, "g1").withArgs(failureArgs);
      try {
        onG1OnlyG2Fails.execute(new OnGroupsExceptionFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException thrown) {
        assertTrue(thrown.getCause() instanceof NullPointerException);
      }
      return null;
    }
  });
}
/**
 * From a client, executes {@link OnGroupsExceptionFunction} on server group g1 with the
 * single argument "disconnect", which makes each executing server disconnect, and expects a
 * FunctionException caused by a FunctionInvocationTargetException.
 */
public void testClientServerMemberFailure() {
  Host dunitHost = Host.getHost(0);
  VM server0 = dunitHost.getVM(0);
  VM server1 = dunitHost.getVM(1);
  VM server2 = dunitHost.getVM(2);
  VM client = dunitHost.getVM(3);
  VM locator = Host.getLocator();
  final String regionName = getName();

  initVM(server0, "mg,g0", regionName, true);
  initVM(server1, "g1", regionName, true);
  initVM(server2, "g0,g1,g2", regionName, true);

  final int locatorPort = getLocatorPort(locator);
  final String hostName = dunitHost.getHostName();

  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      try {
        CacheFactory.getAnyInstance().close();
      } catch (CacheClosedException ignored) {
      }
      disconnectFromDS();
      getLogWriter().fine("SWAP:creating client cache");
      ClientCacheFactory factory = new ClientCacheFactory();
      factory.addPoolLocator(hostName, locatorPort);
      factory.setPoolServerGroup("mg");
      factory.set("log-level", getDUnitLogLevel());
      ClientCache clientCache = factory.create();

      Execution onG1 = InternalFunctionService.onServers(clientCache, "g1");
      ArrayList<String> failureArgs =
          new ArrayList<String>(Collections.singletonList("disconnect"));
      onG1 = onG1.withArgs(failureArgs);
      addExpectedException("FunctionInvocationTargetException");
      try {
        onG1.execute(new OnGroupsExceptionFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException thrown) {
        assertTrue(thrown.getCause() instanceof FunctionInvocationTargetException);
      }
      return null;
    }
  });
}
/**
 * Like {@code testClientServerMemberFailure}, but the arguments ("disconnect", "g2")
 * restrict the disconnect to the server that belongs to group g2; a FunctionException
 * caused by a FunctionInvocationTargetException is still expected.
 */
public void testClientServerOneMemberFailure() {
  Host dunitHost = Host.getHost(0);
  VM server0 = dunitHost.getVM(0);
  VM server1 = dunitHost.getVM(1);
  VM server2 = dunitHost.getVM(2);
  VM client = dunitHost.getVM(3);
  VM locator = Host.getLocator();
  final String regionName = getName();

  initVM(server0, "mg,g0", regionName, true);
  initVM(server1, "g1", regionName, true);
  initVM(server2, "g0,g1,g2", regionName, true);

  final int locatorPort = getLocatorPort(locator);
  final String hostName = dunitHost.getHostName();

  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      try {
        CacheFactory.getAnyInstance().close();
      } catch (CacheClosedException ignored) {
      }
      disconnectFromDS();
      getLogWriter().fine("SWAP:creating client cache");
      ClientCacheFactory factory = new ClientCacheFactory();
      factory.addPoolLocator(hostName, locatorPort);
      factory.setPoolServerGroup("mg");
      factory.set("log-level", getDUnitLogLevel());
      ClientCache clientCache = factory.create();

      Execution onG1 = InternalFunctionService.onServers(clientCache, "g1");
      ArrayList<String> failureArgs = new ArrayList<String>();
      failureArgs.add("disconnect");
      failureArgs.add("g2");
      onG1 = onG1.withArgs(failureArgs);
      addExpectedException("FunctionInvocationTargetException");
      try {
        onG1.execute(new OnGroupsExceptionFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException thrown) {
        assertTrue(thrown.getCause() instanceof FunctionInvocationTargetException);
      }
      return null;
    }
  });
}
/**
 * With {@code setIgnoreDepartedMembers(true)}, executes {@link OnGroupsExceptionFunction}
 * from a client on server group g1 so that only the server in g2 disconnects. The result
 * list must hold two entries, in either order: a Boolean true and a Throwable.
 */
public void testClientServerIgnoreMemberFailure() {
  Host dunitHost = Host.getHost(0);
  VM server0 = dunitHost.getVM(0);
  VM server1 = dunitHost.getVM(1);
  VM server2 = dunitHost.getVM(2);
  VM client = dunitHost.getVM(3);
  VM locator = Host.getLocator();
  final String regionName = getName();

  initVM(server0, "mg,g0", regionName, true);
  initVM(server1, "g1", regionName, true);
  initVM(server2, "g0,g1,g2", regionName, true);

  final int locatorPort = getLocatorPort(locator);
  final String hostName = dunitHost.getHostName();

  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      try {
        CacheFactory.getAnyInstance().close();
      } catch (CacheClosedException ignored) {
      }
      disconnectFromDS();
      getLogWriter().fine("SWAP:creating client cache");
      ClientCacheFactory factory = new ClientCacheFactory();
      factory.addPoolLocator(hostName, locatorPort);
      factory.setPoolServerGroup("mg");
      factory.set("log-level", getDUnitLogLevel());
      ClientCache clientCache = factory.create();

      Execution onG1 = InternalFunctionService.onServers(clientCache, "g1");
      ArrayList<String> failureArgs = new ArrayList<String>();
      failureArgs.add("disconnect");
      failureArgs.add("g2");
      onG1 = onG1.withArgs(failureArgs);
      ((AbstractExecution) onG1).setIgnoreDepartedMembers(true);

      ArrayList results =
          (ArrayList) onG1.execute(new OnGroupsExceptionFunction()).getResult();
      getLogWriter().info("SWAP:result:"+results);
      assertEquals(2, results.size());
      Object first = results.get(0);
      Object second = results.get(1);
      if (first instanceof Throwable) {
        assertTrue((Boolean) second);
      } else if (first instanceof Boolean) {
        assertTrue(second instanceof Throwable);
      } else {
        fail("expected to find a Boolean or throwable at index 0");
      }
      return null;
    }
  });
}
/**
 * Variant of {@link OnGroupsFunction} that reports no result ({@code hasResult()} is false)
 * and is not HA ({@code isHA()} is false).
 */
static class OnGroupsNoAckFunction extends OnGroupsFunction {
  @Override
  public boolean isHA() {
    return false;
  }

  @Override
  public boolean hasResult() {
    return false;
  }
}
/**
 * Executes the result-less OnGroupsNoAckFunction from a client, first with onServers on
 * group g0 and then with onServer on group g1. Since no result is collected, the test
 * polls the per-server invocation counts until their sum is 2 for the first execution and
 * 1 for the second.
 */
public void testNoAckGroupsFunction() {
//Workaround for #52005. This is a product bug
//that should be fixed
addExpectedException("Cannot return any result");
Host host = Host.getHost(0);
final VM server0 = host.getVM(0);
final VM server1 = host.getVM(1);
final VM server2 = host.getVM(2);
VM client = host.getVM(3);
VM locator = Host.getLocator();
final String regionName = getName();
// servers: server0 = mg,g0; server1 = g1; server2 = g0,g1
initVM(server0, "mg,g0", regionName, true);
initVM(server1, "g1", regionName, true);
initVM(server2, "g0,g1", regionName, true);
final int locatorPort = getLocatorPort(locator);
final String hostName = host.getHostName();
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
// best effort: close any cache left in the client VM before creating the client cache
try {
Cache c = CacheFactory.getAnyInstance();
c.close();
} catch (CacheClosedException cce) {
}
disconnectFromDS();
getLogWriter().fine("SWAP:creating client cache");
ClientCacheFactory ccf = new ClientCacheFactory();
ccf.addPoolLocator(hostName, locatorPort);
ccf.setPoolServerGroup("mg");
ccf.set("log-level", getDUnitLogLevel());
ClientCache c = ccf.create();
c.getLogger().info("SWAP:invoking function from client on g0");
Execution e = InternalFunctionService.onServers(c, "g0");
// getResult() is not called: the function declares hasResult() == false
e.execute(new OnGroupsNoAckFunction());
return null;
}
});
// two servers are in g0, so the counts summed over all servers must reach 2
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
int c0 = getInvocationCount(server0);
int c1 = getInvocationCount(server1);
int c2 = getInvocationCount(server2);
return (c0 + c1 + c2) == 2;
}
@Override
public String description() {
return "OnGroupsNoAck invocation count mismatch";
}
};
DistributedTestCase.waitForCriterion(wc, 30000, 1000, true);
// start the second execution from zeroed counters
resetInvocationCount(server0);
resetInvocationCount(server1);
resetInvocationCount(server2);
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
ClientCache c = ClientCacheFactory.getAnyInstance();
Execution e = InternalFunctionService.onServer(c, "g1");
e.execute(new OnGroupsNoAckFunction());
return null;
}
});
// pause here to verify that we do not get more than 1 invocation
DistributedTestCase.pause(5000);
WaitCriterion wc2 = new WaitCriterion() {
@Override
public boolean done() {
int c0 = getInvocationCount(server0);
int c1 = getInvocationCount(server1);
int c2 = getInvocationCount(server2);
return (c0 + c1 + c2) == 1;
}
@Override
public String description() {
return "OnGroupsNoAck invocation count mismatch";
}
};
DistributedTestCase.waitForCriterion(wc2, 30000, 1000, true);
}
}