blob: 3ce8cec00738e5a2ac813c42c0c4e4916e0f6451 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.naming.Context;
import javax.transaction.UserTransaction;
import org.junit.Ignore;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.CacheLoader;
import com.gemstone.gemfire.cache.CacheLoaderException;
import com.gemstone.gemfire.cache.CacheTransactionManager;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.CommitConflictException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.LoaderHelper;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.Region.Entry;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
import com.gemstone.gemfire.cache.TransactionEvent;
import com.gemstone.gemfire.cache.TransactionException;
import com.gemstone.gemfire.cache.TransactionId;
import com.gemstone.gemfire.cache.TransactionInDoubtException;
import com.gemstone.gemfire.cache.TransactionWriter;
import com.gemstone.gemfire.cache.TransactionWriterException;
import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.client.PoolFactory;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.execute.Execution;
import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache.util.CacheWriterAdapter;
import com.gemstone.gemfire.cache.util.TransactionListenerAdapter;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.execute.TransactionFunctionService;
import com.gemstone.gemfire.internal.cache.execute.data.CustId;
import com.gemstone.gemfire.internal.cache.execute.data.Customer;
import com.gemstone.gemfire.internal.cache.execute.data.Order;
import com.gemstone.gemfire.internal.cache.execute.data.OrderId;
import com.gemstone.gemfire.internal.cache.execute.util.CommitFunction;
import com.gemstone.gemfire.internal.cache.execute.util.RollbackFunction;
import com.gemstone.gemfire.internal.cache.tx.ClientTXStateStub;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* Tests the basic client-server transaction functionality
* @author sbawaska
*/
public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest {
protected static final int MAX_ENTRIES = 10;
protected static final String OTHER_REGION = "OtherRegion";
/**
 * Creates the test case.
 *
 * @param name the test name, handed unchanged to the superclass constructor
 */
public ClientServerTransactionDUnitTest(String name) {
super(name);
}
/**
 * Runs the superclass set-up and then passes {@code "java.net.SocketException"}
 * to {@code addExpectedException}.
 *
 * @throws Exception if the superclass set-up fails
 */
@Override
public void setUp() throws Exception {
  super.setUp();
  addExpectedException("java.net.SocketException");
}
/**
 * Creates the test regions in {@code vm} and asks for a cache server to be
 * started there.
 *
 * @param vm the VM in which to create the regions
 * @param accessor forwarded unchanged to
 *        {@link #createRegionOnServer(VM, boolean, boolean)}
 * @return whatever the delegate returns (the server port, because a server is requested)
 */
private Integer createRegionsAndStartServer(VM vm, boolean accessor) {
  final boolean startServer = true;
  return createRegionOnServer(vm, startServer, accessor);
}
/**
 * Creates the test regions in {@code vm} without starting a cache server and
 * with the accessor flag off.
 *
 * @param vm the VM in which to create the regions
 */
private void createRegionOnServer(VM vm) {
  final boolean startServer = false;
  final boolean accessor = false;
  createRegionOnServer(vm, startServer, accessor);
}
/**
 * Creates the test regions in {@code vm} with zero redundant copies.
 *
 * @param vm the VM in which to create the regions
 * @param startServer whether a cache server should also be started
 * @param accessor forwarded unchanged to the four-argument overload
 * @return the value returned by the four-argument overload
 */
private Integer createRegionOnServer(VM vm, final boolean startServer,
    final boolean accessor) {
  final int redundantCopies = 0;
  return createRegionOnServer(vm, startServer, accessor, redundantCopies);
}
/**
 * Runs {@code createRegion(accessor, redundantCopies, null)} inside {@code vm}
 * and, when requested, starts a cache server on a randomly chosen free port.
 *
 * @param vm the VM in which to do the work
 * @param startServer whether to add and start a cache server
 * @param accessor first argument handed to {@code createRegion}
 * @param redundantCopies second argument handed to {@code createRegion}
 * @return the port the server was started on, or {@code 0} when no server was started
 */
private Integer createRegionOnServer(VM vm, final boolean startServer, final boolean accessor, final int redundantCopies) {
  SerializableCallable createTask = new SerializableCallable() {
    public Object call() throws Exception {
      createRegion(accessor, redundantCopies, null);
      if (!startServer) {
        return 0;
      }
      int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
      CacheServer cacheServer = getCache().addCacheServer();
      cacheServer.setPort(serverPort);
      // transaction time-to-live is fixed at 10 for servers created by this test
      ((BridgeServerImpl)cacheServer).setTransactionTimeToLive(10);
      cacheServer.start();
      return serverPort;
    }
  };
  return (Integer)vm.invoke(createTask);
}
/**
 * Connects {@code vm} to a distributed system of its own (random mcast port,
 * {@code locators} property removed), creates a replicated DISTRIBUTED_ACK
 * region named {@link #OTHER_REGION} there and optionally starts a cache server.
 *
 * @param vm the VM in which to do the work
 * @param startServer whether to add and start a cache server
 * @return the port the server was started on, or {@code 0} when no server was started
 */
private Integer createRegionOnDisconnectedServer(VM vm, final boolean startServer) {
  SerializableCallable createTask = new SerializableCallable() {
    public Object call() throws Exception {
      AttributesFactory regionAttrs = new AttributesFactory();
      regionAttrs.setScope(Scope.DISTRIBUTED_ACK);
      regionAttrs.setDataPolicy(DataPolicy.REPLICATE);

      // separate multicast port and no locators for this member
      Properties dsProps = getDistributedSystemProperties();
      int mcastPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS);
      dsProps.put("mcast-port", ""+mcastPort);
      dsProps.remove("locators");
      system = (InternalDistributedSystem)DistributedSystem.connect(dsProps);

      Cache serverCache = CacheFactory.create(system);
      serverCache.createRegion(OTHER_REGION, regionAttrs.create());
      if (!startServer) {
        return 0;
      }
      int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
      CacheServer cacheServer = serverCache.addCacheServer();
      cacheServer.setPort(serverPort);
      ((BridgeServerImpl)cacheServer).setTransactionTimeToLive(10);
      cacheServer.start();
      return serverPort;
    }
  };
  return (Integer)vm.invoke(createTask);
}
// private void createClientRegionWithRI(VM vm, final int port, final boolean isEmpty) {
// createClientRegion(vm, port, isEmpty, true);
// }
/**
 * Creates the client cache and client regions in {@code vm} without
 * registering interest.
 *
 * @param vm the client VM
 * @param port the server port the client pool connects to
 * @param isEmpty forwarded unchanged to
 *        {@link #createClientRegion2(VM, int, boolean, boolean)}
 */
private void createClientRegion(VM vm, final int port, final boolean isEmpty) {
  final boolean registerInterest = false;
  createClientRegion2(vm, port, isEmpty, registerInterest);
}
/**
 * Creates the client regions in {@code vm} and then calls
 * {@code populateData()} inside that VM.
 *
 * @param vm the client VM
 * @param port the server port the client pool connects to
 * @param isEmpty forwarded unchanged to {@link #createClientRegion(VM, int, boolean)}
 */
private void createClientRegionAndPopulateData(VM vm, final int port, final boolean isEmpty) {
  createClientRegion(vm, port, isEmpty);
  SerializableCallable populate = new SerializableCallable() {
    public Object call() throws Exception {
      populateData();
      return null;
    }
  };
  vm.invoke(populate);
}
/**
 * Creates a client cache in {@code vm} whose pool points at
 * {@code localhost:port}, then creates the D_REFERENCE, CUSTOMER and ORDER
 * client regions.
 *
 * <p>Pool subscription is enabled only when {@code ri} is {@code true}: interest
 * registration needs a subscription-enabled pool, so hard-coding it to
 * {@code false} would make the {@code ri} branch fail. With {@code ri == false}
 * the pool is configured exactly as before.
 *
 * @param vm the client VM
 * @param port the server port the client pool connects to
 * @param isEmpty {@code true} to create PROXY regions, {@code false} for CACHING_PROXY
 * @param ri whether to register interest in all keys ({@code ".*"}) on each region
 */
private void createClientRegion2(VM vm, final int port, final boolean isEmpty, final boolean ri) {
  vm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      ClientCacheFactory ccf = new ClientCacheFactory();
      ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port);
      // registerInterestRegex below requires a subscription-enabled pool
      ccf.setPoolSubscriptionEnabled(ri);
      ccf.set("log-level", getDUnitLogLevel());
      // these settings were used to manually check that tx operation stats were being updated
      //ccf.set(DistributionConfig.STATISTIC_SAMPLING_ENABLED_NAME, "true");
      //ccf.set(DistributionConfig.STATISTIC_ARCHIVE_FILE_NAME, "clientStats.gfs");
      ClientCache cCache = getClientCache(ccf);
      ClientRegionFactory<Integer, String> crf = cCache
          .createClientRegionFactory(isEmpty ? ClientRegionShortcut.PROXY
              : ClientRegionShortcut.CACHING_PROXY);
      crf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
      Region<Integer, String> r = crf.create(D_REFERENCE);
      Region<Integer, String> customer = crf.create(CUSTOMER);
      Region<Integer, String> order = crf.create(ORDER);
      if (ri) {
        r.registerInterestRegex(".*");
        customer.registerInterestRegex(".*");
        order.registerInterestRegex(".*");
      }
      return null;
    }
  });
}
/* (non-Javadoc)
 * @see com.gemstone.gemfire.internal.cache.RemoteTransactionDUnitTest#getVMForTransactions(dunit.VM, dunit.VM)
 */
@Override
public VM getVMForTransactions(VM accessor, VM datastore) {
  // Start a cache server inside the accessor VM, then hand back a client VM
  // that is connected to it.
  SerializableCallable startCacheServer = new SerializableCallable("create cache server") {
    public Object call() throws Exception {
      int freePort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
      CacheServer cacheServer = getCache().addCacheServer();
      cacheServer.setPort(freePort);
      ((BridgeServerImpl)cacheServer).setTransactionTimeToLive(10);
      cacheServer.start();
      return freePort;
    }
  };
  final int serverPort = (Integer)accessor.invoke(startCacheServer);

  // superclass always uses 0 and 1, so VM 2 becomes the client
  Host host = Host.getHost(0);
  VM clientVM = host.getVM(2);
  createClientRegion(clientVM, serverPort, false, false);
  return clientVM;
}
/**
 * Verifies that one client transaction cannot touch regions served by two
 * different pools: after transactional puts through the default pool, a put on
 * a region bound to the second pool ("otherServer") must throw
 * {@link TransactionException}.
 *
 * <p>The client cache and both server VMs are cleaned up in {@code finally}
 * blocks, so a failure part-way through does not leave a client cache or
 * connected servers behind for later tests.
 */
public void testTwoPoolsNotAllowed() {
  Host host = Host.getHost(0);
  VM datastore1 = host.getVM(0);
  VM datastore2 = host.getVM(1);
  final boolean cachingProxy = false;

  disconnectAllFromDS(); // some other VMs seem to be hanging around and have the region this tests uses

  final int port1 = createRegionsAndStartServer(datastore1, false);

  System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
  ClientCacheFactory ccf = new ClientCacheFactory();
  ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
  ccf.setPoolSubscriptionEnabled(false);
  ccf.set("log-level", getDUnitLogLevel());
  ClientCache cCache = getClientCache(ccf);

  // Null-safe so that cleanup cannot fail on a VM that never got connected.
  SerializableCallable disconnect = new SerializableCallable("disconnect") {
    public Object call() throws Exception {
      InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance();
      if (ds != null) {
        ds.disconnect();
      }
      return null;
    }
  };

  try {
    ClientRegionFactory<CustId, Customer> custrf = cCache
        .createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
    ClientRegionFactory<Integer, String> refrf = cCache
        .createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
    Region<Integer, String> r = refrf.create(D_REFERENCE);
    Region<CustId, Customer> pr = custrf.create(CUSTOMER);

    // set up a second pool for the other distributed system's region
    final int port2 = createRegionOnDisconnectedServer(datastore2, true);
    PoolFactory pf = PoolManager.createFactory();
    pf.addServer("localhost"/*getServerHostName(Host.getHost(0))*/, port2);
    pf.create("otherServer");
    ClientRegionFactory<Object, Object> otherrf = cCache
        .createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
    otherrf.setPoolName("otherServer");
    Region<Object, Object> otherRegion = otherrf.create(OTHER_REGION);

    TXManagerImpl mgr = getGemfireCache().getTxManager();
    mgr.begin();
    for (int i=0; i<5; i++) {
      CustId custId = new CustId(i);
      Customer cust = new Customer("name"+i, "address"+i);
      getGemfireCache().getLogger().info("putting:"+custId);
      pr.put(custId, cust);
      r.put(i, "value"+i);
    }

    try {
      otherRegion.put("tx", "not allowed");
      fail("expected TransactionException to be thrown since two pools were used");
    } catch (TransactionException expected) {
      // this is the outcome the test is looking for
    }
  } finally {
    // Nested finally blocks: each cleanup step runs even if an earlier one throws.
    try {
      cCache.close();
    } finally {
      try {
        datastore1.invoke(disconnect);
      } finally {
        datastore2.invoke(disconnect);
      }
    }
  }
}
/**
 * Starts a transaction from a client created in this JVM, checks that both
 * server VMs report exactly one transaction for this client, closes the client
 * cache without finishing the transaction, and then waits (up to 15000 ms,
 * polling every 500 ms) for both servers to report no transactions for it.
 */
public void testCleanupAfterClientFailure() {
  final Host host = Host.getHost(0);
  final VM accessor = host.getVM(0);
  final VM datastore = host.getVM(1);
  final boolean cachingProxy = false;

  disconnectAllFromDS(); // some other VMs seem to be hanging around and have the region this tests uses

  final int serverPort = createRegionsAndStartServer(accessor, true);
  createRegionOnServer(datastore, false, false);

  System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
  ClientCacheFactory clientFactory = new ClientCacheFactory();
  clientFactory.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, serverPort);
  clientFactory.setPoolSubscriptionEnabled(false);
  clientFactory.set("log-level", getDUnitLogLevel());
  ClientCache clientCache = getClientCache(clientFactory);

  final ClientRegionShortcut shortcut =
      cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY;
  ClientRegionFactory<CustId, Customer> customerFactory = clientCache.createClientRegionFactory(shortcut);
  ClientRegionFactory<Integer, String> referenceFactory = clientCache.createClientRegionFactory(shortcut);
  Region<Integer, String> referenceRegion = referenceFactory.create(D_REFERENCE);
  Region<CustId, Customer> customerRegion = customerFactory.create(CUSTOMER);

  // begin a transaction and leave it open
  TXManagerImpl txManager = getGemfireCache().getTxManager();
  txManager.begin();
  for (int i = 0; i < 5; i++) {
    CustId custId = new CustId(i);
    Customer cust = new Customer("name"+i, "address"+i);
    getGemfireCache().getLogger().info("putting:"+custId);
    customerRegion.put(custId, cust);
    referenceRegion.put(i, "value"+i);
  }

  final DistributedMember myId = clientCache.getDistributedSystem().getDistributedMember();

  SerializableCallable verifyExists = new SerializableCallable("verify txstate for client exists") {
    public Object call() throws Exception {
      TXManagerImpl serverTxManager = getGemfireCache().getTxManager();
      Set clientStates = serverTxManager.getTransactionsForClient((InternalDistributedMember)myId);
      assertEquals(1, clientStates.size()); // only one in-progress transaction
      return null;
    }
  };
  accessor.invoke(verifyExists);
  datastore.invoke(verifyExists);

  // drop the client while its transaction is still open
  clientCache.close();

  SerializableCallable verifyExpired = new SerializableCallable("verify txstate is expired") {
    public Object call() throws Exception {
      final TXManagerImpl serverTxManager = getGemfireCache().getTxManager();
      WaitCriterion noStatesLeft = new WaitCriterion() {
        public boolean done() {
          Set clientStates = serverTxManager.getTransactionsForClient((InternalDistributedMember)myId);
          getLogWriter().info("found " + clientStates.size() + " tx states for " + myId);
          return clientStates.isEmpty();
        }
        public String description() {
          return "Waiting for transaction state to expire";
        }
      };
      try {
        waitForCriterion(noStatesLeft, 15000, 500, true);
        return null;
      } finally {
        getGemfireCache().getDistributedSystem().disconnect();
      }
    }
  };
  try {
    accessor.invoke(verifyExpired);
    datastore.invoke(verifyExpired);
  } finally {
    clientCache.close();
  }
}
/** Basic commit without pre-populated data, using the cache transaction manager. */
public void testBasicCommitOnEmpty() {
  final boolean prePopulateData = false;
  final boolean useJTA = false;
  doBasicCommit(prePopulateData, useJTA);
}
/** Basic commit without pre-populated data, driven through the JTA UserTransaction. */
public void testBasicCommitOnEmptyUsingJTA() {
  final boolean prePopulateData = false;
  final boolean useJTA = true;
  doBasicCommit(prePopulateData, useJTA);
}
/** Basic commit over pre-populated data, using the cache transaction manager. */
public void testBasicCommit() {
  final boolean prePopulateData = true;
  final boolean useJTA = false;
  doBasicCommit(prePopulateData, useJTA);
}
/** Basic commit over pre-populated data, driven through the JTA UserTransaction. */
public void testBasicCommitUsingJTA() {
  final boolean prePopulateData = true;
  final boolean useJTA = true;
  doBasicCommit(prePopulateData, useJTA);
}
/**
 * Shared body of the basic-commit tests. A client puts {@link #MAX_ENTRIES}
 * customers into the D_REFERENCE and CUSTOMER regions inside a transaction,
 * checks that the transaction sees them, checks (non-JTA only) that a suspended
 * view does not, commits, and finally the server verifies sizes and values.
 *
 * @param prePopulateData whether the client populates data before the
 *        transaction; when set, the transactional values carry an "Updated" suffix
 * @param useJTA {@code true} to begin/commit through the JNDI-bound
 *        {@code UserTransaction}, {@code false} to use the cache transaction manager
 */
private void doBasicCommit(final boolean prePopulateData, final boolean useJTA) {
  Host host = Host.getHost(0);
  VM server = host.getVM(0);
  VM client = host.getVM(1);
  int port1 = createRegionsAndStartServer(server, false);
  if (prePopulateData) {
    createClientRegionAndPopulateData(client, port1, false);
  } else {
    createClientRegion(client, port1, false);
  }

  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      String suffix = prePopulateData ? "Updated" : "";
      Region<CustId, Customer> r = getGemfireCache().getRegion(D_REFERENCE);
      Region<CustId, Customer> pr = getGemfireCache().getRegion(CUSTOMER);
      int initSize = prePopulateData ? 5 : 0;
      assertEquals(initSize, pr.size());
      assertEquals(initSize, r.size());
      getLogWriter().info("Looking up transaction manager");
      TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
      Context ctx = getCache().getJNDIContext();
      UserTransaction utx = (UserTransaction)ctx.lookup("java:/UserTransaction");
      getLogWriter().info("starting transaction");
      if (useJTA) {
        utx.begin();
      } else {
        mgr.begin();
      }
      getLogWriter().info("done starting transaction");

      // transactional writes
      for (int i = 0; i < MAX_ENTRIES; i++) {
        CustId custId = new CustId(i);
        Customer cust = new Customer("name"+suffix+i, "address"+suffix+i);
        r.put(custId, cust);
        pr.put(custId, cust);
      }
      // the transaction must read its own writes
      for (int i=0; i<MAX_ENTRIES; i++) {
        CustId custId = new CustId(i);
        Customer cust = new Customer("name"+suffix+i, "address"+suffix+i);
        assertEquals(cust, r.get(custId));
        assertEquals(cust, pr.get(custId));
        getLogWriter().info("SWAP:get:"+r.get(custId));
        getLogWriter().info("SWAP:get:"+pr.get(custId));
      }

      getLogWriter().info("suspending transaction");
      if (!useJTA) {
        // With the transaction suspended only the pre-transaction state is visible.
        TXStateProxy tx = mgr.internalSuspend();
        if (prePopulateData) {
          for (int i=0; i<5; i++) {
            CustId custId = new CustId(i);
            Customer cust = new Customer("customer"+i, "address"+i);
            assertEquals(cust, r.get(custId));
            assertEquals(cust, pr.get(custId));
          }
        }
        for (int i=5; i<MAX_ENTRIES; i++) {
          assertNull(r.get(new CustId(i)));
          assertNull(pr.get(new CustId(i)));
        }
        getLogWriter().info("resuming transaction");
        mgr.resume(tx);
      }

      assertEquals(
          "r sized should be " + MAX_ENTRIES + " but it is:" + r.size(),
          MAX_ENTRIES, r.size());
      assertEquals(
          "pr sized should be " + MAX_ENTRIES + " but it is:" + pr.size(),
          MAX_ENTRIES, pr.size());
      getLogWriter().info("committing transaction");
      if (useJTA) {
        utx.commit();
      } else {
        getCache().getCacheTransactionManager().commit();
      }
      getLogWriter().info("done committing transaction");
      assertEquals(
          "r sized should be " + MAX_ENTRIES + " but it is:" + r.size(),
          MAX_ENTRIES, r.size());
      assertEquals(
          "pr sized should be " + MAX_ENTRIES + " but it is:" + pr.size(),
          MAX_ENTRIES, pr.size());
      return null;
    }
  });

  // server-side size check
  server.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      // These regions hold CustId -> Customer, so they are typed accordingly.
      Region<CustId, Customer> r = getGemfireCache().getRegion(D_REFERENCE);
      Region<CustId, Customer> pr = getGemfireCache().getRegion(CUSTOMER);
      assertEquals(
          "r sized should be " + MAX_ENTRIES + " but it is:" + r.size(),
          MAX_ENTRIES, r.size());
      assertEquals(
          "pr sized should be " + MAX_ENTRIES + " but it is:" + pr.size(),
          MAX_ENTRIES, pr.size());
      return null;
    }
  });

  // server-side value check
  server.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      String suffix = prePopulateData ? "Updated" : "";
      Region<CustId, Customer> r = getGemfireCache().getRegion(D_REFERENCE);
      Region<CustId, Customer> pr = getGemfireCache().getRegion(CUSTOMER);
      for (int i = 0; i < MAX_ENTRIES; i++) {
        CustId custId = new CustId(i);
        Customer cust = new Customer("name"+suffix+i, "address"+suffix+i);
        assertEquals(cust, r.get(custId));
        assertEquals(cust, pr.get(custId));
      }
      return null;
    }
  });

  verifyVersionTags(client, server, null, null);
}
/** Runs {@link #doBasicChecks(boolean)} with the transaction being committed. */
public void testTXCreationAndCleanupAtCommit() throws Exception {
  final boolean commit = true;
  doBasicChecks(commit);
}
/** Runs {@link #doBasicChecks(boolean)} with the transaction being rolled back. */
public void testTXCreationAndCleanupAtRollback() throws Exception {
  final boolean commit = false;
  doBasicChecks(commit);
}
/**
 * Shared body of the TX creation/cleanup tests: the client runs
 * {@code DoOpsInTX(OP.PUT)}, the server must report the returned transaction id
 * as hosted and in progress, the client then commits or rolls back, and the
 * server is polled (10000 ms limit, 200 ms interval) until the hosted
 * transaction is gone. Finally the client runs the matching verification.
 *
 * @param commit {@code true} to commit the transaction, {@code false} to roll it back
 * @throws Exception if anything in the remote invocations fails
 */
private void doBasicChecks(final boolean commit) throws Exception {
  final Host host = Host.getHost(0);
  final VM server = host.getVM(0);
  final VM client = host.getVM(1);
  final int serverPort = createRegionsAndStartServer(server, false);
  createClientRegionAndPopulateData(client, serverPort, false);

  final TXId txId = (TXId)client.invoke(new DoOpsInTX(OP.PUT));

  SerializableCallable verifyHosted = new SerializableCallable("verify tx") {
    public Object call() throws Exception {
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      assertTrue(txManager.isHostedTxInProgress(txId));
      return null;
    }
  };
  server.invoke(verifyHosted);

  SerializableCallable finishTx = new SerializableCallable() {
    public Object call() throws Exception {
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      // suspend/resume round trip proves a transaction is attached to this thread
      TXStateProxy suspended = txManager.internalSuspend();
      assertNotNull(suspended);
      txManager.resume(suspended);
      if (!commit) {
        txManager.rollback();
      } else {
        txManager.commit();
      }
      return null;
    }
  };
  client.invoke(finishTx);

  SerializableCallable awaitCleanup = new SerializableCallable() {
    public Object call() throws Exception {
      final TXManagerImpl txManager = getGemfireCache().getTxManager();
      WaitCriterion txGone = new WaitCriterion() {
        public boolean done() {
          return !txManager.isHostedTxInProgress(txId);
        }
        public String description() {
          return "waiting for hosted tx in progress to terminate";
        }
      };
      waitForCriterion(txGone, 10000, 200, true);
      return null;
    }
  };
  server.invoke(awaitCleanup);

  SerializableCallable verifyOutcome;
  if (commit) {
    verifyOutcome = new SerializableCallable() {
      public Object call() throws Exception {
        verifyAfterCommit(OP.PUT);
        System.out.println("expected verification to fail for this VM");
        return null;
      }
    };
  } else {
    verifyOutcome = new SerializableCallable() {
      public Object call() throws Exception {
        verifyAfterRollback(OP.PUT);
        return null;
      }
    };
  }
  client.invoke(verifyOutcome);
}
/**
 * On the server: begins a transaction, does a {@code putAll} of one order into
 * the ORDER region, rolls back, and checks the order is not there.
 */
public void testPutallRollbackInServer() throws Exception {
  final Host host = Host.getHost(0);
  final VM server = host.getVM(0);
  final VM client = host.getVM(1);
  final int serverPort = createRegionsAndStartServer(server, false);
  createClientRegionAndPopulateData(client, serverPort, false);

  SerializableCallable putAllThenRollback = new SerializableCallable("verify tx") {
    public Object call() throws Exception {
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      Region<OrderId, Order> orders = getCache().getRegion(ORDER);
      txManager.begin();
      OrderId orderId = new OrderId(1000, new CustId(1));
      Map<OrderId, Order> batch = new HashMap<OrderId, Order>();
      batch.put(orderId, new Order("fooOrder"));
      orders.putAll(batch);
      txManager.rollback();
      assertNull(orders.get(orderId));
      return null;
    }
  };
  server.invoke(putAllThenRollback);
}
/**
 * On the client: begins a transaction, does a {@code putAll} of one order into
 * the ORDER region, rolls back, and checks the order is not visible.
 *
 * <p>The transaction runs in a client VM connected to the server. Running it in
 * the server VM would only repeat {@code testPutallRollbackInServer} and leave
 * the client path untested.
 */
public void testPutallRollbackInClient() throws Exception {
  Host host = Host.getHost(0);
  VM server = host.getVM(0);
  VM client = host.getVM(1);
  int port1 = createRegionsAndStartServer(server, false);
  createClientRegionAndPopulateData(client, port1, false);

  client.invoke(new SerializableCallable("verify tx") {
    public Object call() throws Exception {
      TXManagerImpl mgr = getGemfireCache().getTxManager();
      Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
      mgr.begin();
      CustId custId = new CustId(1);
      OrderId orderId = new OrderId(1000, custId);
      Order expectedOrder = new Order("fooOrder");
      Map<OrderId, Order> map = new HashMap<OrderId, Order>();
      map.put(orderId, expectedOrder);
      orderRegion.putAll(map);
      mgr.rollback();
      // the rolled-back putAll must not be visible
      assertNull(orderRegion.get(orderId));
      return null;
    }
  });
}
/**
 * Disabled test (the name does not start with {@code test}). On the server:
 * installs a cache loader on the ORDER region, calls {@code getAll} for one key
 * inside a transaction, rolls back and expects no entry for that key.
 */
public void disabled_testGetAllRollbackInServer() throws Exception {
  final Host host = Host.getHost(0);
  final VM server = host.getVM(0);
  createRegionsAndStartServer(server, false);

  SerializableCallable getAllThenRollback = new SerializableCallable("verify getAll tx") {
    public Object call() throws Exception {
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      Region<OrderId, Order> orders = getCache().getRegion(ORDER);
      // loader returns an Order built from the requested key
      CacheLoader orderLoader = new CacheLoader() {
        public void close() {
        }
        public Object load(LoaderHelper helper) throws CacheLoaderException {
          return new Order(helper.getKey()+"_loaded");
        }
      };
      orders.getAttributesMutator().setCacheLoader(orderLoader);

      txManager.begin();
      OrderId orderId = new OrderId(1000, new CustId(1));
      Set<OrderId> wanted = new HashSet<OrderId>();
      wanted.add(orderId);
      Order loaded = (orders.getAll(wanted)).get(orderId);
      assertNotNull(loaded);
      txManager.rollback();
      assertNull(orders.getEntry(orderId));
      return null;
    }
  };
  server.invoke(getAllThenRollback);
}
/**
 * Disabled test (the name does not start with {@code test}). The server
 * installs a cache loader on the ORDER region; the client calls {@code getAll}
 * for one key inside a transaction, rolls back and expects no entry for that key.
 */
public void disabled_testGetAllRollbackInClient() throws Exception {
  final Host host = Host.getHost(0);
  final VM server = host.getVM(0);
  final VM client = host.getVM(1);
  final int serverPort = createRegionsAndStartServer(server, false);
  createClientRegionAndPopulateData(client, serverPort, false);

  SerializableCallable installLoader = new SerializableCallable("add cache loader") {
    public Object call() throws Exception {
      Region<OrderId, Order> orders = getCache().getRegion(ORDER);
      // loader returns an Order built from the requested key
      CacheLoader orderLoader = new CacheLoader() {
        public void close() {
        }
        public Object load(LoaderHelper helper) throws CacheLoaderException {
          return new Order(helper.getKey()+"_loaded");
        }
      };
      orders.getAttributesMutator().setCacheLoader(orderLoader);
      return null;
    }
  };
  server.invoke(installLoader);

  SerializableCallable getAllThenRollback = new SerializableCallable("verify getAll uses tx") {
    public Object call() throws Exception {
      Region<OrderId, Order> orders = getCache().getRegion(ORDER);
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      txManager.begin();
      OrderId orderId = new OrderId(1000, new CustId(1));
      Set<OrderId> wanted = new HashSet<OrderId>();
      wanted.add(orderId);
      Order loaded = (orders.getAll(wanted)).get(orderId);
      assertNotNull(loaded);
      txManager.rollback();
      getLogWriter().info("entry for " + orderId + " = " + orders.getEntry(orderId));
      assertNull(orders.getEntry(orderId));
      return null;
    }
  };
  client.invoke(getAllThenRollback);
}
/**
 * A client commits a transactional put on the CUSTOMER region and the
 * {@code ServerListener} registered on the datastore must be invoked.
 *
 * <p>The datastore polls the listener's {@code invoked} flag (10000 ms limit,
 * 200 ms interval) instead of sleeping for a fixed ten seconds, so the test
 * finishes as soon as the event arrives and still fails on timeout.
 */
public void testClientCommitAndDataStoreGetsEvent() throws Exception {
  Host host = Host.getHost(0);
  VM accessor = host.getVM(0);
  VM datastore = host.getVM(1);
  VM client = host.getVM(2);
  initAccessorAndDataStore(accessor, datastore, 0);
  int port = startServer(accessor);
  createClientRegion(client, port, false, true);

  datastore.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region custRegion = getCache().getRegion(CUSTOMER);
      custRegion.getAttributesMutator().addCacheListener(new ServerListener());
      return null;
    }
  });

  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
      CustId custId = new CustId(1);
      getCache().getCacheTransactionManager().begin();
      custRegion.put(custId, new Customer("foo", "bar"));
      getCache().getCacheTransactionManager().commit();
      return null;
    }
  });

  datastore.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
      final ServerListener l = (ServerListener) custRegion.getAttributes().getCacheListeners()[0];
      // bounded wait for the commit event to reach the datastore listener
      waitForCriterion(new WaitCriterion() {
        public boolean done() {
          return l.invoked;
        }
        public String description() {
          return "waiting for the datastore listener to be invoked";
        }
      }, 10000, 200, true);
      getCache().getLogger().info("SWAP:CLIENTinvoked:"+l.invoked);
      assertTrue(l.invoked);
      return null;
    }
  });
}
/**
 * A client creates an entry and invalidates it twice inside one transaction,
 * then commits. The test passes if none of those calls throws.
 */
public void testClientCreateAndTwoInvalidates() throws Exception {
  final Host host = Host.getHost(0);
  final VM accessor = host.getVM(0);
  final VM datastore = host.getVM(1);
  final VM client = host.getVM(2);
  initAccessorAndDataStore(accessor, datastore, 0);
  final int serverPort = startServer(accessor);
  createClientRegion(client, serverPort, false, true);

  SerializableCallable addListener = new SerializableCallable() {
    public Object call() throws Exception {
      Region customers = getCache().getRegion(CUSTOMER);
      customers.getAttributesMutator().addCacheListener(new ClientListener());
      return null;
    }
  };
  datastore.invoke(addListener);

  SerializableCallable createAndInvalidateTwice = new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      CustId key = new CustId(1938493204);
      CacheTransactionManager txManager = getCache().getCacheTransactionManager();
      txManager.begin();
      customers.create(key, new Customer("foo", "bar"));
      customers.invalidate(key);
      customers.invalidate(key);
      txManager.commit();
      return null;
    }
  };
  client.invoke(createAndInvalidateTwice);
}
/**
 * A client begins a transaction, performs only a {@code get}, and commits.
 * The test passes if none of those calls throws.
 */
public void testClientCommitsAndJustGets() throws Exception {
  final Host host = Host.getHost(0);
  final VM accessor = host.getVM(0);
  final VM datastore = host.getVM(1);
  final VM client = host.getVM(2);
  initAccessorAndDataStore(accessor, datastore, 0);
  final int serverPort = startServer(accessor);
  createClientRegion(client, serverPort, false, true);

  SerializableCallable addListener = new SerializableCallable() {
    public Object call() throws Exception {
      Region customers = getCache().getRegion(CUSTOMER);
      customers.getAttributesMutator().addCacheListener(new ClientListener());
      return null;
    }
  };
  datastore.invoke(addListener);

  SerializableCallable readOnlyTx = new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      CustId key = new CustId(1);
      CacheTransactionManager txManager = getCache().getCacheTransactionManager();
      txManager.begin();
      customers.get(key);
      txManager.commit();
      return null;
    }
  };
  client.invoke(readOnlyTx);
}
/**
 * Inside a client transaction both {@code localDestroy} and
 * {@code localInvalidate} must throw
 * {@link UnsupportedOperationInTransactionException}; the transaction is then
 * committed.
 */
public void testClientDoesUnsupportedLocalOps() throws Exception {
  final Host host = Host.getHost(0);
  final VM accessor = host.getVM(0);
  final VM datastore = host.getVM(1);
  final VM client = host.getVM(2);
  initAccessorAndDataStore(accessor, datastore, 0);
  final int serverPort = startServer(accessor);
  createClientRegion(client, serverPort, false, true);

  SerializableCallable addListener = new SerializableCallable() {
    public Object call() throws Exception {
      Region customers = getCache().getRegion(CUSTOMER);
      customers.getAttributesMutator().addCacheListener(new ClientListener());
      return null;
    }
  };
  datastore.invoke(addListener);

  SerializableCallable tryLocalOps = new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      CustId key = new CustId(1);
      // seed the entry outside of any transaction
      customers.put(key, new Customer("foo", "bar"));

      CacheTransactionManager txManager = getCache().getCacheTransactionManager();
      txManager.begin();
      try {
        customers.localDestroy(key);
        fail("Should have thrown UOE");
      } catch (UnsupportedOperationInTransactionException expected) {
        // expected: local destroy is rejected inside a transaction
      }
      try {
        customers.localInvalidate(key);
        fail("Should have thrown UOE");
      } catch (UnsupportedOperationInTransactionException expected) {
        // expected: local invalidate is rejected inside a transaction
      }
      txManager.commit();
      return null;
    }
  };
  client.invoke(tryLocalOps);
}
/**
 * A client commits a single transactional put; after a ten second settle
 * period the first cache listener on the client's CUSTOMER region (cast to
 * {@code ClientListener}) must have been invoked exactly once.
 */
public void testClientCommitsWithRIAndOnlyGetsOneEvent() throws Exception {
  final Host host = Host.getHost(0);
  final VM accessor = host.getVM(0);
  final VM datastore = host.getVM(1);
  final VM client = host.getVM(2);
  initAccessorAndDataStore(accessor, datastore, 0);
  final int serverPort = startServer(accessor);
  createClientRegion(client, serverPort, false, true);

  SerializableCallable addListener = new SerializableCallable() {
    public Object call() throws Exception {
      Region customers = getCache().getRegion(CUSTOMER);
      customers.getAttributesMutator().addCacheListener(new ClientListener());
      return null;
    }
  };
  datastore.invoke(addListener);

  SerializableCallable commitOnePut = new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      CustId key = new CustId(1);
      CacheTransactionManager txManager = getCache().getCacheTransactionManager();
      txManager.begin();
      customers.put(key, new Customer("foo", "bar"));
      txManager.commit();
      return null;
    }
  };
  client.invoke(commitOnePut);

  // give any duplicate event time to show up before counting
  Thread.sleep(10000);

  SerializableCallable verifySingleEvent = new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      CacheListener[] listeners = customers.getAttributes().getCacheListeners();
      ClientListener listener = (ClientListener) listeners[0];
      getCache().getLogger().info("SWAP:CLIENTinvoked:"+listener.invoked);
      assertTrue(listener.invoked);
      assertEquals("it should be 1 but its:"+listener.invokeCount,1,listener.invokeCount);
      return null;
    }
  };
  client.invoke(verifySingleEvent);
}
/**
 * The datastore commits a transactional {@code putAll} of one customer; after
 * a ten second settle period the first cache listener on the client's CUSTOMER
 * region (cast to {@code ClientListener}) must have been invoked exactly once.
 */
public void testDatastoreCommitsWithPutAllAndRI() throws Exception {
  final Host host = Host.getHost(0);
  final VM accessor = host.getVM(0);
  final VM datastore = host.getVM(1);
  final VM client = host.getVM(2);
  initAccessorAndDataStore(accessor, datastore, 0);
  final int serverPort = startServer(accessor);
  createClientRegion(client, serverPort, false, true);

  SerializableCallable addListener = new SerializableCallable() {
    public Object call() throws Exception {
      Region customers = getCache().getRegion(CUSTOMER);
      customers.getAttributesMutator().addCacheListener(new ClientListener());
      return null;
    }
  };
  datastore.invoke(addListener);

  SerializableCallable commitPutAll = new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      CustId key = new CustId(1);
      CacheTransactionManager txManager = getCache().getCacheTransactionManager();
      txManager.begin();
      Map<CustId, Customer> batch = new HashMap<CustId, Customer>();
      batch.put(key, new Customer("foo", "bar"));
      customers.putAll(batch);
      txManager.commit();
      return null;
    }
  };
  datastore.invoke(commitPutAll);

  // give any duplicate event time to show up before counting
  Thread.sleep(10000);

  SerializableCallable verifySingleEvent = new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      CacheListener[] listeners = customers.getAttributes().getCacheListeners();
      ClientListener listener = (ClientListener) listeners[0];
      getCache().getLogger().info("SWAP:CLIENTinvoked:"+listener.invoked);
      assertTrue(listener.invoked);
      assertEquals("it should be 1 but its:"+listener.invokeCount,1,listener.invokeCount);
      return null;
    }
  };
  client.invoke(verifySingleEvent);
}
/**
 * Commits a transactional single-entry putAll from the client and then
 * checks that the client's own listener saw exactly one putAll event that
 * was not flagged as remote in origin.
 */
public void testClientCommitsWithPutAllAndRI() throws Exception {
  final Host dunitHost = Host.getHost(0);
  final VM accessorVm = dunitHost.getVM(0);
  final VM datastoreVm = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);

  initAccessorAndDataStore(accessorVm, datastoreVm, 0);
  final int serverPort = startServer(accessorVm);
  createClientRegion(clientVm, serverPort, false, true);

  // Attach a listener to the customer region hosted by the datastore.
  datastoreVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region customers = getCache().getRegion(CUSTOMER);
      customers.getAttributesMutator().addCacheListener(new ClientListener());
      return null;
    }
  });

  // The client runs the transaction: begin, putAll, commit.
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      CustId key = new CustId(1);
      getCache().getCacheTransactionManager().begin();
      Map<CustId, Customer> batch = new HashMap<CustId, Customer>();
      batch.put(key, new Customer("foo", "bar"));
      customers.putAll(batch);
      getCache().getCacheTransactionManager().commit();
      return null;
    }
  });

  // Fixed pause before verifying the listener state on the client.
  Thread.sleep(10000);

  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      ClientListener listener =
          (ClientListener) customers.getAttributes().getCacheListeners()[0];
      getCache().getLogger().info("SWAP:CLIENTinvoked:"+listener.invoked);
      assertTrue(listener.invoked);
      assertTrue(listener.putAllOp);
      assertFalse(listener.isOriginRemote);
      assertEquals("it should be 1 but its:"+listener.invokeCount, 1, listener.invokeCount);
      return null;
    }
  });
}
/**
 * Performs a transactional putAll from the client, rolls the transaction
 * back, and checks that the client's listener recorded no events at all.
 */
public void testClientRollsbackWithPutAllAndRI() throws Exception {
  final Host dunitHost = Host.getHost(0);
  final VM accessorVm = dunitHost.getVM(0);
  final VM datastoreVm = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);

  initAccessorAndDataStore(accessorVm, datastoreVm, 0);
  final int serverPort = startServer(accessorVm);
  createClientRegion(clientVm, serverPort, false, true);

  // Attach a listener to the customer region hosted by the datastore.
  datastoreVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region customers = getCache().getRegion(CUSTOMER);
      customers.getAttributesMutator().addCacheListener(new ClientListener());
      return null;
    }
  });

  // The client runs the transaction but abandons it: begin, putAll, rollback.
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      CustId key = new CustId(1);
      getCache().getCacheTransactionManager().begin();
      Map<CustId, Customer> batch = new HashMap<CustId, Customer>();
      batch.put(key, new Customer("foo", "bar"));
      customers.putAll(batch);
      getCache().getCacheTransactionManager().rollback();
      return null;
    }
  });

  // Fixed pause before verifying that nothing was delivered.
  Thread.sleep(10000);

  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      ClientListener listener =
          (ClientListener) customers.getAttributes().getCacheListeners()[0];
      getCache().getLogger().info("SWAP:CLIENTinvoked:"+listener.invoked);
      assertFalse(listener.invoked);
      assertFalse(listener.putAllOp);
      assertEquals("it should be 0 but its:"+listener.invokeCount, 0, listener.invokeCount);
      return null;
    }
  });
}
/**
 * Exercises invalidates issued by a client inside transactions.
 *
 * Phase 1: one transaction puts and then invalidates the same new key; both
 * client and datastore listeners are expected to report a single put event
 * and the key is expected to exist without a value.
 *
 * Phase 2: a put and an invalidate are committed in two separate
 * transactions; both sides are expected to report one put and one invalidate.
 */
public void testClientInitiatedInvalidates() throws Exception {
  final Host dunitHost = Host.getHost(0);
  final VM accessorVm = dunitHost.getVM(0);
  final VM datastoreVm = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);

  initAccessorAndDataStore(accessorVm, datastoreVm, 0);
  final int serverPort = startServer(accessorVm);
  createClientRegion(clientVm, serverPort, false, true);

  // Attach a listener to the customer region hosted by the datastore.
  datastoreVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region customers = getCache().getRegion(CUSTOMER);
      customers.getAttributesMutator().addCacheListener(new ClientListener());
      return null;
    }
  });

  /*
   * Phase 1: put followed by invalidate of the same key within one transaction.
   */
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      CustId key = new CustId(1777777777);
      getCache().getCacheTransactionManager().begin();
      customers.put(key, new Customer("foo", "bar"));
      customers.invalidate(key);
      getCache().getCacheTransactionManager().commit();
      return null;
    }
  });

  Thread.sleep(10000);

  /*
   * Client side: exactly one event, counted as a put and not as an
   * invalidate; the key is present but holds no value.
   */
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      ClientListener listener =
          (ClientListener) customers.getAttributes().getCacheListeners()[0];
      getCache().getLogger().info("SWAP:CLIENTinvoked:"+listener.invoked);
      assertTrue(listener.invoked);
      assertEquals(1, listener.putCount);
      assertEquals(1, listener.invokeCount);
      assertEquals(0, listener.invalidateCount);
      CustId key = new CustId(1777777777);
      assertTrue(customers.containsKey(key));
      assertFalse(customers.containsValueForKey(key));
      listener.reset();
      return null;
    }
  });

  // Datastore side: same expectations, minus the invalidate counter.
  datastoreVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      ClientListener listener =
          (ClientListener) customers.getAttributes().getCacheListeners()[0];
      getCache().getLogger().info("SWAP:CLIENTinvoked:"+listener.invoked);
      assertTrue(listener.invoked);
      assertEquals(1, listener.putCount);
      assertEquals(1, listener.invokeCount);
      CustId key = new CustId(1777777777);
      assertTrue(customers.containsKey(key));
      assertFalse(customers.containsValueForKey(key));
      listener.reset();
      return null;
    }
  });

  /*
   * Phase 2: a put in one transaction, then an invalidate in another; the
   * invalidate must show up on both the client and the server.
   */
  clientVm.invoke(doAPutInTx);
  clientVm.invoke(doAnInvalidateInTx);
  Thread.sleep(10000);

  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      ClientListener listener =
          (ClientListener) customers.getAttributes().getCacheListeners()[0];
      getCache().getLogger().info("SWAP:CLIENTinvoked:"+listener.invoked);
      assertTrue(listener.invoked);
      assertEquals("totalEvents should be 2 but its:"+listener.invokeCount, 2, listener.invokeCount);
      assertEquals("it should be 1 but its:"+listener.invalidateCount, 1, listener.invalidateCount);
      assertEquals("it should be 1 but its:"+listener.putCount, 1, listener.putCount);
      CustId key = new CustId(1);
      assertTrue(customers.containsKey(key));
      assertFalse(customers.containsValueForKey(key));
      return null;
    }
  });

  datastoreVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      ClientListener listener =
          (ClientListener) customers.getAttributes().getCacheListeners()[0];
      getCache().getLogger().info("SWAP:CLIENTinvoked:"+listener.invoked);
      assertTrue(listener.invoked);
      assertEquals("totalEvents should be 2 but its:"+listener.invokeCount, 2, listener.invokeCount);
      assertEquals("it should be 1 but its:"+listener.invalidateCount, 1, listener.invalidateCount);
      assertEquals("it should be 1 but its:"+listener.putCount, 1, listener.putCount);
      return null;
    }
  });
}
/**
 * Remote check that the first cache listener on the customer region has not
 * recorded any event: it must not be flagged as invoked and its invocation
 * count must be zero.
 *
 * The original asserted that the listener HAD been invoked while also
 * requiring a count of zero, which contradicts the callable's name and the
 * rollback test in this class, where "no events" is checked as not invoked
 * plus a zero count.
 * NOTE(review): assumes ClientListener sets {@code invoked} whenever it
 * increments {@code invokeCount} -- confirm against the listener class.
 */
SerializableCallable validateNoEvents = new SerializableCallable() {
  public Object call() throws Exception {
    Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
    ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0];
    getCache().getLogger().info("SWAP:CLIENTinvoked:"+cl.invoked);
    assertFalse(cl.invoked);
    assertEquals("it should be 0 but its:"+cl.invokeCount,0,cl.invokeCount);
    return null;
  }
};
/**
 * Remote task: within one transaction, puts a customer under key 1 into the
 * customer region and commits.
 */
SerializableCallable doAPutInTx = new SerializableCallable() {
  public Object call() throws Exception {
    Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
    CustId key = new CustId(1);
    getCache().getCacheTransactionManager().begin();
    customers.put(key, new Customer("foo", "bar"));
    getCache().getCacheTransactionManager().commit();
    return null;
  }
};
/**
 * Remote task: within one transaction, invalidates key 1 in the customer
 * region and commits.
 */
SerializableCallable doAnInvalidateInTx = new SerializableCallable() {
  public Object call() throws Exception {
    Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
    CustId key = new CustId(1);
    getCache().getCacheTransactionManager().begin();
    customers.invalidate(key);
    getCache().getCacheTransactionManager().commit();
    return null;
  }
};
/**
 * A client connects to an accessor and completes a transaction. While the
 * transaction is suspended its writes must not be visible to the client;
 * after the commit they must be readable on the first server.
 */
public void testServerDelegate() {
  final Host dunitHost = Host.getHost(0);
  final VM firstServer = dunitHost.getVM(0);
  final VM secondServer = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);

  final int serverPort = createRegionsAndStartServer(firstServer, true);
  createRegionOnServer(secondServer);
  createClientRegion(clientVm, serverPort, false);

  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      Region<Integer, String> reference = getGemfireCache().getRegion(D_REFERENCE);
      Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      CustId key = new CustId(10);

      txManager.begin();
      customers.put(key, new Customer("name10", "address10"));
      reference.put(10, "value10");

      // Step outside the transaction: uncommitted writes must be invisible.
      TXStateProxy suspended = txManager.internalSuspend();
      assertNull(customers.get(key));
      assertNull(reference.get(10));

      txManager.resume(suspended);
      txManager.commit();
      return null;
    }
  });

  // After the commit, both entries are readable on the first server.
  firstServer.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<Integer, String> reference = getGemfireCache().getRegion(D_REFERENCE);
      Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      assertEquals(new Customer("name10", "address10"), customers.get(new CustId(10)));
      assertEquals("value10", reference.get(10));
      return null;
    }
  });
}
/** Runs the PR-accessor transaction scenario, ending it with a commit. */
public void testCommitWithPRAccessor() {
  final boolean commitAtEnd = true;
  doTxWithPRAccessor(commitAtEnd);
}
/** Runs the PR-accessor transaction scenario, ending it with a rollback. */
public void testRollbackWithPRAccessor() {
  final boolean commitAtEnd = false;
  doTxWithPRAccessor(commitAtEnd);
}
/**
 * Shared scenario: a client whose pool points at server1 starts a
 * transaction and writes ten entries into both regions. While it is open,
 * the two servers together must host two transactions; once the client
 * commits or rolls back, none may remain.
 *
 * @param commit {@code true} to commit and expect the entries to be
 *        readable, {@code false} to roll back and expect them to be absent
 */
private void doTxWithPRAccessor(final boolean commit) {
  final Host dunitHost = Host.getHost(0);
  final VM firstServer = dunitHost.getVM(0);
  final VM secondServer = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);

  final int port1 = createRegionsAndStartServer(firstServer, true);
  createRegionOnServer(secondServer);

  // Build the client cache with proxy regions and leave a transaction open.
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      ClientCacheFactory factory = new ClientCacheFactory();
      factory.addPoolServer("localhost", port1);
      factory.setPoolSubscriptionEnabled(false);
      factory.set("log-level", getDUnitLogLevel());
      ClientCache clientCache = getClientCache(factory);

      ClientRegionFactory<CustId, Customer> customerFactory =
          clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY);
      ClientRegionFactory<Integer, String> referenceFactory =
          clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY);
      Region<Integer, String> reference = referenceFactory.create(D_REFERENCE);
      Region<CustId, Customer> customers = customerFactory.create(CUSTOMER);

      TXManagerImpl txManager = getGemfireCache().getTxManager();
      txManager.begin();
      for (int n = 0; n < 10; n++) {
        CustId key = new CustId(n);
        Customer value = new Customer("name"+n, "address"+n);
        customers.put(key, value);
        reference.put(n, "value"+n);
        Thread.sleep(100);
      }
      return null;
    }
  });

  // Reports how many transactions the invoked member currently hosts.
  SerializableCallable countActiveTx = new SerializableCallable() {
    public Object call() throws Exception {
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      return txManager.hostedTransactionsInProgressForTest();
    }
  };

  int hostedOnFirst = (Integer) firstServer.invoke(countActiveTx);
  int hostedOnSecond = (Integer) secondServer.invoke(countActiveTx);
  assertEquals(2, hostedOnFirst + hostedOnSecond);

  // Finish the transaction and verify what the client can read afterwards.
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      Region<Integer, String> reference = getGemfireCache().getRegion(D_REFERENCE);
      Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);

      if (commit) {
        txManager.commit();
      } else {
        txManager.rollback();
      }
      for (int n = 0; n < 10; n++) {
        if (commit) {
          assertEquals(new Customer("name"+n, "address"+n), customers.get(new CustId(n)));
          assertEquals("value"+n, reference.get(n));
        } else {
          assertNull(customers.get(new CustId(n)));
          assertNull(reference.get(n));
        }
      }
      return null;
    }
  });

  hostedOnFirst = (Integer) firstServer.invoke(countActiveTx);
  hostedOnSecond = (Integer) secondServer.invoke(countActiveTx);
  assertEquals(0, hostedOnFirst + hostedOnSecond);
}
/**
 * There is one txState and zero or more txProxyStates.
 *
 * A client with both servers in its pool runs a transaction; the test then
 * asks each server what it hosts for that transaction id and finally commits.
 *
 * @throws Exception if a remote invocation or the client work fails
 */
public void testConnectionAffinity() throws Exception {
  Host host = Host.getHost(0);
  VM server1 = host.getVM(0);
  VM server2 = host.getVM(1);
  VM client = host.getVM(2);
  addExpectedException("java.net.SocketException");
  final int port1 = createRegionsAndStartServer(server1, true);
  final int port2 = createRegionsAndStartServer(server2, false);

  // Reports how many transactions the invoked member currently hosts.
  SerializableCallable hostedSize = new SerializableCallable() {
    public Object call() throws Exception {
      TXManagerImpl mgr = getGemfireCache().getTxManager();
      return mgr.hostedTransactionsInProgressForTest();
    }
  };
  int txcount = (Integer) server1.invoke(hostedSize) + (Integer) server2.invoke(hostedSize);
  // The old message concatenated the count straight onto the text
  // ("expected count to be 03"); assertEquals reports expected/actual itself.
  assertEquals("expected no hosted transactions before the client starts", 0, txcount);

  final TXId txid = (TXId) client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      ClientCacheFactory ccf = new ClientCacheFactory();
      ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
      ccf.addPoolServer("localhost", port2);
      ccf.setPoolLoadConditioningInterval(1);
      ccf.setPoolSubscriptionEnabled(false);
      ccf.set("log-level", getDUnitLogLevel());
      ClientCache cCache = getClientCache(ccf);
      ClientRegionFactory<CustId, Customer> custrf = cCache
          .createClientRegionFactory(ClientRegionShortcut.PROXY);
      ClientRegionFactory<Integer, String> refrf = cCache
          .createClientRegionFactory(ClientRegionShortcut.PROXY);
      Region<Integer, String> r = refrf.create(D_REFERENCE);
      Region<CustId, Customer> pr = custrf.create(CUSTOMER);
      TXManagerImpl mgr = getGemfireCache().getTxManager();
      mgr.begin();
      // NOTE(review): i is never incremented, so all ten iterations write
      // the same key 0 -- confirm whether that is intentional before
      // switching the loop to use j.
      int i=0;
      for (int j=0; j<10; j++) {
        CustId custId = new CustId(i);
        Customer cust = new Customer("name"+i, "address"+i);
        getGemfireCache().getLogger().info("SWAP:putting:"+custId);
        pr.put(custId, cust);
        r.put(i, "value"+i);
        Thread.sleep(100);
      }
      return mgr.getTransactionId();
    }
  });

  SerializableCallable activeTx = new SerializableCallable() {
    public Object call() throws Exception {
      TXManagerImpl mgr = getGemfireCache().getTxManager();
      TXStateProxy tx = mgr.getHostedTXState(txid);
      mgr.getCache().getLogger().info("SWAP:activeTx:"+tx);
      // rather than returning strings representing objects
      // return different ints to represent different objects:
      // 11 when this member's hosted state resolves to a real TXState,
      // 1 in every other case (including no hosted state at all)
      if (tx != null) {
        TXStateInterface realtx = ((TXStateProxyImpl)tx).getRealDeal(null, null);
        if (realtx instanceof TXState) {
          return 11;
        }
      }
      return 1;
    }
  };
  // Each server contributes 1 or 11, so the sum is 2, 12 or 22; only 12
  // (exactly one server holding the real TXState) satisfies the range.
  int myCount = (Integer) server1.invoke(activeTx) + (Integer) server2.invoke(activeTx);
  assertTrue("expected count to be 11 or 12 but was "+myCount, myCount >= 11 && myCount <= 12);

  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      TXManagerImpl mgr = getGemfireCache().getTxManager();
      mgr.commit();
      return null;
    }
  });
}
/**
 * test has a client, an accessor and a datastore;
 * the pool connects to the accessor and the datastore.
 * we then close the server in the accessor and verify failover
 */
public void testFailover() {
  final Host dunitHost = Host.getHost(0);
  final VM accessorVm = dunitHost.getVM(0);
  final VM datastoreVm = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);
  doFailoverWork(accessorVm, null, datastoreVm, clientVm, true, false);
}
/**
 * Same topology as the plain failover test (accessor, datastore, client),
 * but the client regions are created as caching proxies.
 */
public void testFailoverAndCachingProxy() {
  final Host dunitHost = Host.getHost(0);
  final VM accessorVm = dunitHost.getVM(0);
  final VM datastoreVm = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);
  doFailoverWork(accessorVm, null, datastoreVm, clientVm, true, true);
}
/**
 * test has a client, two accessors and a datastore;
 * the pool connects to the two accessors.
 * we then close the server in the first accessor and verify failover
 */
public void testFailoverWithP2PMessaging() {
  final Host dunitHost = Host.getHost(0);
  final VM firstAccessor = dunitHost.getVM(0);
  final VM secondAccessor = dunitHost.getVM(1);
  final VM datastoreVm = dunitHost.getVM(2);
  final VM clientVm = dunitHost.getVM(3);
  doFailoverWork(firstAccessor, secondAccessor, datastoreVm, clientVm, false, false);
}
/**
 * Two-accessor failover scenario in which the client regions are created
 * as caching proxies.
 */
public void testFailoverWithP2PMessagingAndCachingProxy() {
  final Host dunitHost = Host.getHost(0);
  final VM firstAccessor = dunitHost.getVM(0);
  final VM secondAccessor = dunitHost.getVM(1);
  final VM datastoreVm = dunitHost.getVM(2);
  final VM clientVm = dunitHost.getVM(3);
  doFailoverWork(firstAccessor, secondAccessor, datastoreVm, clientVm, false, true);
}
/**
 * Shared failover scenario. The client opens a transaction and writes keys
 * 0-4, the cache servers on accessor1 are stopped, the client writes keys
 * 5-9 in the same transaction, the datastore is checked to host exactly one
 * transaction, and finally the client commits and reads all ten entries back.
 *
 * @param accessor1 member whose cache servers are stopped mid-transaction
 * @param accessor2 optional second accessor; {@code null} when unused
 * @param datastore member expected to host the transaction
 * @param client member that runs the transaction
 * @param serverOnDatastore when {@code true} the datastore is set up through
 *        {@code createRegionsAndStartServer}, otherwise through
 *        {@code createRegionOnServer}
 * @param cachingProxy when {@code true} the client regions use
 *        {@code CACHING_PROXY} instead of {@code PROXY}
 */
private void doFailoverWork(VM accessor1, VM accessor2, VM datastore, VM client, boolean serverOnDatastore, final boolean cachingProxy) {
  final int port1 = createRegionsAndStartServer(accessor1, true);
  final int port2;
  if (accessor2 == null) {
    port2 = 0;
  } else {
    port2 = createRegionsAndStartServer(accessor2, true);
  }
  final int port3;
  if (serverOnDatastore) {
    port3 = createRegionsAndStartServer(datastore, false);
  } else {
    port3 = createRegionOnServer(datastore, false, false);
  }

  // Create the client cache and write the first half inside a new transaction.
  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
      ClientCacheFactory factory = new ClientCacheFactory();
      factory.addPoolServer("localhost", port1);
      if (port2 != 0) {
        factory.addPoolServer("localhost", port2);
      }
      if (port3 != 0) {
        factory.addPoolServer("localhost", port3);
      }
      factory.setPoolSubscriptionEnabled(false);
      factory.set("log-level", getDUnitLogLevel());
      ClientCache clientCache = getClientCache(factory);

      ClientRegionShortcut customerShortcut =
          cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY;
      ClientRegionFactory<CustId, Customer> customerFactory =
          clientCache.createClientRegionFactory(customerShortcut);
      ClientRegionShortcut referenceShortcut =
          cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY;
      ClientRegionFactory<Integer, String> referenceFactory =
          clientCache.createClientRegionFactory(referenceShortcut);
      Region<Integer, String> reference = referenceFactory.create(D_REFERENCE);
      Region<CustId, Customer> customers = customerFactory.create(CUSTOMER);

      TXManagerImpl txManager = getGemfireCache().getTxManager();
      txManager.begin();
      for (int n = 0; n < 5; n++) {
        CustId key = new CustId(n);
        Customer value = new Customer("name"+n, "address"+n);
        getGemfireCache().getLogger().info("SWAP:putting:"+key);
        customers.put(key, value);
        reference.put(n, "value"+n);
      }
      return txManager.getTransactionId();
    }
  });

  // Stop every cache server on the first accessor.
  accessor1.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      for (CacheServer server : getCache().getCacheServers()) {
        getCache().getLogger().info("SWAP:Stopping "+server);
        server.stop();
      }
      return null;
    }
  });

  // Write the second half in the transaction that is still open.
  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      getGemfireCache().getTxManager();
      Region<Integer, String> reference = getGemfireCache().getRegion(D_REFERENCE);
      Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      for (int n = 5; n < 10; n++) {
        CustId key = new CustId(n);
        Customer value = new Customer("name"+n, "address"+n);
        getGemfireCache().getLogger().info("SWAP:AfterFailover:putting:"+key);
        customers.put(key, value);
        reference.put(n, "value"+n);
      }
      return null;
    }
  });

  // Exactly one transaction must be hosted on the datastore.
  datastore.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      assertEquals(1, txManager.hostedTransactionsInProgressForTest());
      return null;
    }
  });

  // Commit and read everything back from the client.
  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      Region<Integer, String> reference = getGemfireCache().getRegion(D_REFERENCE);
      Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      txManager.commit();
      for (int n = 0; n < 10; n++) {
        if (cachingProxy) {
          // Only checked for caching-proxy regions.
          assertTrue(customers.containsKey(new CustId(n)));
          assertTrue(reference.containsKey(n));
        }
        assertEquals(new Customer("name"+n, "address"+n), customers.get(new CustId(n)));
        assertEquals("value"+n, reference.get(n));
      }
      return null;
    }
  });
}
/**
 * Checks {@code getEntry} from a client inside a transaction for three
 * cases: a key with a value, a key that was invalidated on the datastore,
 * and a key that does not exist. The datastore's loader throws if called.
 */
public void testGetEntry() {
  final Host dunitHost = Host.getHost(0);
  final VM datastoreVm = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);

  final int serverPort = createRegionsAndStartServer(datastoreVm, false);
  createClientRegion(clientVm, serverPort, true);

  // Install a loader that fails the test if used, then create an
  // invalidated entry for key 10.
  datastoreVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      customers.getAttributesMutator().setCacheLoader(new CacheLoader<CustId, Customer>() {
        public void close() {
        }
        public Customer load(LoaderHelper<CustId, Customer> helper)
            throws CacheLoaderException {
          throw new RuntimeException("Loader should not be called");
        }
      });
      CustId invalidatedKey = new CustId(10);
      customers.put(invalidatedKey, new Customer("name10", "address10"));
      customers.invalidate(new CustId(10));
      return null;
    }
  });

  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      TXManagerImpl txManager = getGemfireCache().getTxManager();
      Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      CustId presentKey = new CustId(0);
      Customer presentValue = new Customer("name0", "address0");
      customers.put(presentKey, presentValue);

      txManager.begin();

      // Key with a value: entry carries both key and value.
      Entry<CustId, Customer> present = customers.getEntry(presentKey);
      assertNotNull(present);
      assertEquals(presentKey, present.getKey());
      assertEquals(presentValue, present.getValue());

      // Invalidated key: entry exists but its value is null.
      Entry<CustId, Customer> invalidated = customers.getEntry(new CustId(10));
      assertNotNull(invalidated);
      assertEquals(new CustId(10), invalidated.getKey());
      assertNull(invalidated.getValue());

      // Unknown key: no entry at all.
      assertNull(customers.getEntry(new CustId(100)));

      txManager.commit();
      return null;
    }
  });
}
/**
 * Installs a {@code ServerWriter} on the datastore's customer region and
 * then performs two puts from the client outside any transaction. The test
 * makes no explicit assertions of its own.
 */
public void testBug42920() {
  final Host dunitHost = Host.getHost(0);
  final VM datastoreVm = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);

  final int serverPort = createRegionsAndStartServer(datastoreVm, false);
  createClientRegion(clientVm, serverPort, true);

  datastoreVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      customers.getAttributesMutator().setCacheWriter(new ServerWriter());
      return null;
    }
  });

  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      // The transaction manager is looked up but deliberately left unused.
      getCache().getCacheTransactionManager();
      customers.put(new CustId(0), new Customer("name0", "address0"));
      customers.put(new CustId(1), new Customer("name1", "address1"));
      return null;
    }
  });
}
// Disabled due to bug 47083
/**
 * Registers transaction listeners/writers and region listeners/writers on
 * both the datastore and the client, runs one rolled-back and one committed
 * client transaction, and checks which callbacks fired on which member.
 */
public void DISABLED_testCallbacks() {
  Host host = Host.getHost(0);
  VM datastore = host.getVM(1);
  VM client = host.getVM(2);
  int port = createRegionsAndStartServer(datastore, false);
  createClientRegion(client, port, true);

  /** Transaction listener that records which callbacks ran and checks the events. */
  class ClientTxListener extends TransactionListenerAdapter {
    private boolean afterRollbackInvoked = false;
    boolean afterCommitInvoked = false;
    @Override
    public void afterCommit(TransactionEvent event) {
      afterCommitInvoked = true;
      verifyEvent(event);
    }
    @Override
    public void afterRollback(TransactionEvent event) {
      afterRollbackInvoked = true;
      verifyEvent(event);
    }
    protected void verifyEvent(TransactionEvent event) {
      int i = 0;
      for (Object next : event.getEvents()) {
        EntryEvent ev = (EntryEvent) next;
        // First event is the invalidate, which carries no new value.
        if (i == 0) {
          assertNull(ev.getNewValue());
        }
        // Events from index 2 on are the puts of keys 2..4.
        if (i > 1) {
          assertEquals(new CustId(i), ev.getKey());
          assertEquals(new Customer("name"+i, "address"+i), ev.getNewValue());
        }
        assertTrue(ev.isOriginRemote());
        i++;
      }
      assertEquals(5, event.getEvents().size());
    }
  }

  /** Transaction writer that records its invocation and checks the events. */
  class ClientTxWriter implements TransactionWriter {
    boolean txWriterCalled = false;
    public void close() {
    }
    public void beforeCommit(TransactionEvent event)
        throws TransactionWriterException {
      txWriterCalled = true;
      int i = 0;
      for (Object next : event.getEvents()) {
        EntryEvent ev = (EntryEvent) next;
        if (i == 0) {
          assertNull(ev.getNewValue());
        }
        if (i > 1) {
          assertEquals(new CustId(i), ev.getKey());
          assertEquals(new Customer("name"+i, "address"+i), ev.getNewValue());
        }
        assertTrue(ev.isOriginRemote());
        i++;
      }
      assertEquals(5, event.getEvents().size());
    }
  }

  /** Region listener for the client; flags creates of keys greater than 1. */
  class ClientListener extends CacheListenerAdapter {
    boolean invoked = false;
    @Override
    public void afterCreate(EntryEvent event) {
      CustId c = (CustId) event.getKey();
      if (c.getCustId() > 1) {
        invoked = true;
      }
      // we document that client transaction are proxied to the server
      // and that the callback should be handled appropriately (hence true)
      assertTrue(event.isOriginRemote());
    }
    @Override
    public void afterUpdate(EntryEvent event) {
      assertFalse(event.isOriginRemote());
    }
  }

  /** Region writer for the client; only creates of keys 0 and 1 are tolerated. */
  class ClientWriter extends CacheWriterAdapter {
    @Override
    public void beforeCreate(EntryEvent event) throws CacheWriterException {
      CustId c = (CustId) event.getKey();
      if (c.getCustId() < 2) {
        return;
      }
      fail("cache writer should not be invoked");
    }
    @Override
    public void beforeUpdate(EntryEvent event) throws CacheWriterException {
      fail("cache writer should not be invoked");
    }
  }

  datastore.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      CacheTransactionManager mgr = getCache().getCacheTransactionManager();
      mgr.addListener(new ClientTxListener());
      mgr.setWriter(new ClientTxWriter());
      Region<CustId, Customer> pr = getGemfireCache().getRegion(CUSTOMER);
      pr.getAttributesMutator().addCacheListener(new ServerListener());
      pr.getAttributesMutator().setCacheWriter(new ServerWriter());
      return null;
    }
  });

  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      CacheTransactionManager mgr = getCache().getCacheTransactionManager();
      mgr.addListener(new ClientTxListener());
      try {
        mgr.setWriter(new ClientTxWriter());
        fail("expected exception not thrown");
      } catch (IllegalStateException expected) {
        // Expected: this test requires setWriter to be rejected on the client.
      }
      Region<CustId, Customer> pr = getGemfireCache().getRegion(CUSTOMER);
      pr.getAttributesMutator().addCacheListener(new ClientListener());
      pr.getAttributesMutator().setCacheWriter(new ClientWriter());
      return null;
    }
  });

  /**
   * Seeds keys 0 and 1, then in one transaction invalidates key 0, destroys
   * key 1 and puts keys 2..4, finishing with a commit or a rollback.
   */
  class doOps extends SerializableCallable {
    public doOps(boolean commit) {
      this.commit = commit;
    }
    boolean commit = false;
    public Object call() throws Exception {
      Region<CustId, Customer> pr = getGemfireCache().getRegion(CUSTOMER);
      CacheTransactionManager mgr = getCache().getCacheTransactionManager();
      pr.put(new CustId(0), new Customer("name0", "address0"));
      pr.put(new CustId(1), new Customer("name1", "address1"));
      mgr.begin();
      pr.invalidate(new CustId(0));
      pr.destroy(new CustId(1));
      for (int i=2; i<5; i++) {
        pr.put(new CustId(i), new Customer("name"+i, "address"+i));
      }
      if (commit) {
        mgr.commit();
      } else {
        mgr.rollback();
      }
      return null;
    }
  }

  // Rolled-back transaction: the datastore's tx listener must see the rollback.
  client.invoke(new doOps(false));
  datastore.invoke(new SerializableCallable() {
    @SuppressWarnings("synthetic-access")
    public Object call() throws Exception {
      CacheTransactionManager mgr = getGemfireCache().getCacheTransactionManager();
      ClientTxListener l = (ClientTxListener) mgr.getListeners()[0];
      assertTrue(l.afterRollbackInvoked);
      return null;
    }
  });

  // Committed transaction: the datastore's tx listener and tx writer must fire.
  client.invoke(new doOps(true));
  datastore.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      CacheTransactionManager mgr = getGemfireCache().getCacheTransactionManager();
      ClientTxListener l = (ClientTxListener) mgr.getListeners()[0];
      assertTrue(l.afterCommitInvoked);
      ClientTxWriter w = (ClientTxWriter) mgr.getWriter();
      assertTrue(w.txWriterCalled);
      return null;
    }
  });

  // On the client the tx listener must not see the commit, while the region
  // listener must have seen a create for a key greater than 1.
  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      CacheTransactionManager mgr = getGemfireCache().getCacheTransactionManager();
      ClientTxListener l = (ClientTxListener) mgr.getListeners()[0];
      assertFalse(l.afterCommitInvoked);
      Region<CustId, Customer> pr = getGemfireCache().getRegion(CUSTOMER);
      ClientListener cl = (ClientListener) pr.getAttributes().getCacheListeners()[0];
      assertTrue(cl.invoked);
      return null;
    }
  });
}
/**
 * Registers a counting transaction listener on two datastores, commits one
 * client transaction, and checks that the {@code afterCommit} callbacks
 * across both datastores add up to exactly one.
 */
public void testTXListenerOnRedundant() {
  final Host dunitHost = Host.getHost(0);
  final VM firstDatastore = dunitHost.getVM(0);
  final VM secondDatastore = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);

  final int serverPort = createRegionOnServer(firstDatastore, true, false, 1);
  createRegionOnServer(secondDatastore, false, false, 1);
  createClientRegion(clientVm, serverPort, true);

  /** Counts how many commits this member's transaction listener observed. */
  class RedundantListener extends TransactionListenerAdapter {
    int invoked = 0;
    @Override
    public void afterCommit(TransactionEvent event) {
      invoked++;
    }
  }

  SerializableCallable registerTxListener = new SerializableCallable() {
    public Object call() throws Exception {
      getCache().getCacheTransactionManager().addListener(new RedundantListener());
      return null;
    }
  };
  firstDatastore.invoke(registerTxListener);
  secondDatastore.invoke(registerTxListener);

  // One committed put from the client.
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      getGemfireCache().getCacheTransactionManager().begin();
      customers.put(new CustId(1), new Customer("name1", "address1"));
      getGemfireCache().getCacheTransactionManager().commit();
      return null;
    }
  });

  // Returns the commit count seen by the invoked member's listener.
  SerializableCallable listenerInvoked = new SerializableCallable() {
    public Object call() throws Exception {
      RedundantListener listener =
          (RedundantListener) getCache().getCacheTransactionManager().getListeners()[0];
      return listener.invoked;
    }
  };
  int seenOnFirst = (Integer) firstDatastore.invoke(listenerInvoked);
  int seenOnSecond = (Integer) secondDatastore.invoke(listenerInvoked);
  int listenerInvokedCount = seenOnFirst + seenOnSecond;
  assertEquals(1, listenerInvokedCount);
}
/** Runs the transactional function scenario with a datastore and a client only. */
public void testBasicFunctionExecution() {
  final Host dunitHost = Host.getHost(0);
  final VM datastoreVm = dunitHost.getVM(0);
  final VM clientVm = dunitHost.getVM(1);
  doBasicFunctionExecution(clientVm, null, datastoreVm);
}
/** Runs the transactional function scenario with an additional accessor member. */
public void testRemotedFunctionExecution() {
  final Host dunitHost = Host.getHost(0);
  final VM datastoreVm = dunitHost.getVM(0);
  final VM clientVm = dunitHost.getVM(1);
  final VM accessorVm = dunitHost.getVM(2);
  doBasicFunctionExecution(clientVm, accessorVm, datastoreVm);
}
/**
 * Shared body of the function-execution tests. A server is started on the
 * datastore (and on the accessor when one is supplied); the client is pointed
 * at the accessor's port when there is one and at the datastore's otherwise.
 * The client then runs BasicTransactionalFunction inside a transaction and
 * checks which values are visible inside the transaction, while it is
 * suspended, and after it has been committed.
 *
 * @param client VM that hosts the client cache and drives the transaction
 * @param accessor VM on which a second server is started, or null for none
 * @param datastore VM on which the first server is started
 */
private void doBasicFunctionExecution(VM client, VM accessor, VM datastore) {
  final int datastorePort = createRegionsAndStartServer(datastore, false);
  int accessorPort = 0;
  if (accessor != null) {
    accessorPort = createRegionsAndStartServer(accessor, true);
  }
  final int port;
  if (accessorPort == 0) {
    port = datastorePort;
  } else {
    port = accessorPort;
  }
  createClientRegion(client, port, true);

  // Function that writes to both the customer region and the reference region.
  class BasicTransactionalFunction extends FunctionAdapter {
    static final String ID = "BasicTransactionalFunction";

    @Override
    public String getId() {
      return ID;
    }

    @Override
    public void execute(FunctionContext context) {
      getGemfireCache().getLogger().info("SWAP:in function");
      final RegionFunctionContext regionContext = (RegionFunctionContext) context;
      final Region customers = regionContext.getDataSet();
      customers.put(new CustId(0), new Customer("name0", "address0"));
      // replace only takes effect for a key that already has a value
      customers.replace(new CustId(1), new Customer("name1", "address1"));
      customers.put(new CustId(10), new Customer("name10", "address10"));
      customers.put(new CustId(11), new Customer("name11", "address11"));
      final Region reference =
          regionContext.getDataSet().getRegionService().getRegion(D_REFERENCE);
      reference.put(new CustId(10), new Customer("name10", "address10"));
      reference.put(new CustId(11), new Customer("name11", "address11"));
      regionContext.getResultSender().lastResult(Boolean.TRUE);
    }
  }

  final SerializableCallable registerOnServer = new SerializableCallable() {
    public Object call() throws Exception {
      FunctionService.registerFunction(new BasicTransactionalFunction());
      return null;
    }
  };
  datastore.invoke(registerOnServer);
  if (accessor != null) {
    accessor.invoke(registerOnServer);
  }

  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      FunctionService.registerFunction(new BasicTransactionalFunction());
      final Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      final Region<CustId, Customer> reference = getGemfireCache().getRegion(D_REFERENCE);
      final TXManagerImpl txManager = getGemfireCache().getTxManager();

      // values that exist before the transaction starts
      customers.put(new CustId(0), new Customer("oldname0", "oldaddress0"));
      customers.put(new CustId(1), new Customer("oldname1", "oldaddress1"));

      txManager.begin();
      final Set routingKeys = new HashSet();
      routingKeys.add(new CustId(0));
      routingKeys.add(new CustId(1));
      getGemfireCache().getLogger().info("SWAP:calling execute");
      FunctionService.onRegion(customers)
          .withFilter(routingKeys)
          .execute(BasicTransactionalFunction.ID)
          .getResult();

      // inside the transaction the function's writes are visible
      assertEquals(new Customer("name0", "address0"), customers.get(new CustId(0)));
      assertEquals(new Customer("name10", "address10"), customers.get(new CustId(10)));
      assertEquals(new Customer("name10", "address10"), reference.get(new CustId(10)));

      // with the transaction suspended only the pre-transaction state is visible
      final TXStateProxy suspended = txManager.internalSuspend();
      assertEquals(new Customer("oldname0", "oldaddress0"), customers.get(new CustId(0)));
      assertEquals(new Customer("oldname1", "oldaddress1"), customers.get(new CustId(1)));
      assertNull(customers.get(new CustId(10)));
      assertNull(reference.get(new CustId(10)));

      txManager.resume(suspended);
      txManager.commit();

      // after commit the function's writes are visible to everyone
      assertEquals(new Customer("name0", "address0"), customers.get(new CustId(0)));
      assertEquals(new Customer("name1", "address1"), customers.get(new CustId(1)));
      assertEquals(new Customer("name10", "address10"), customers.get(new CustId(10)));
      assertEquals(new Customer("name10", "address10"), reference.get(new CustId(10)));
      return null;
    }
  });
}
/**
 * Begins a transaction on the client, issues only a size() call on the
 * customer region, and commits.
 */
public void testEmptyTX() {
  final Host dunitHost = Host.getHost(0);
  final VM serverVm = dunitHost.getVM(0);
  final VM clientVm = dunitHost.getVM(1);
  final int serverPort = createRegionsAndStartServer(serverVm, false);
  createClientRegion(clientVm, serverPort, false);
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      final Region<CustId, Customer> customers = getGemfireCache().getRegion(CUSTOMER);
      final CacheTransactionManager txManager = getCache().getCacheTransactionManager();
      txManager.begin();
      customers.size();
      txManager.commit();
      return null;
    }
  });
}
/**
 * Suspends a client transaction on one thread and resumes and commits it on
 * another. While the transaction is suspended its writes must not be visible;
 * once the second thread has committed, they must be.
 */
public void testSuspendResumeOnDifferentThreads() {
  Host host = Host.getHost(0);
  VM server1 = host.getVM(0);
  VM server2 = host.getVM(1);
  VM client = host.getVM(2);
  final int port1 = createRegionsAndStartServer(server1, false);
  final int port2 = createRegionsAndStartServer(server2, false);
  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      ClientCacheFactory ccf = new ClientCacheFactory();
      ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
      ccf.addPoolServer("localhost", port2);
      ccf.setPoolSubscriptionEnabled(false);
      ccf.setPoolLoadConditioningInterval(1);
      ccf.set("log-level", getDUnitLogLevel());
      ClientCache cCache = getClientCache(ccf);
      ClientRegionFactory<CustId, Customer> custrf = cCache
          .createClientRegionFactory(ClientRegionShortcut.PROXY);
      ClientRegionFactory<Integer, String> refrf = cCache
          .createClientRegionFactory(ClientRegionShortcut.PROXY);
      Region<Integer, String> r = refrf.create(D_REFERENCE);
      Region<CustId, Customer> pr = custrf.create(CUSTOMER);
      final TXManagerImpl mgr = getGemfireCache().getTxManager();
      CustId custId = new CustId(10);
      mgr.begin();
      pr.put(custId, new Customer("name10", "address10"));
      r.put(10, "value10");
      final TXStateProxy txState = mgr.internalSuspend();
      // nothing is visible while the transaction is suspended
      assertNull(pr.get(custId));
      assertNull(r.get(10));
      final CountDownLatch latch = new CountDownLatch(1);
      Thread t = new Thread(new Runnable() {
        public void run() {
          try {
            mgr.resume(txState);
            mgr.commit();
          } finally {
            // always release the waiting thread; if resume/commit failed the
            // assertions below report it instead of the test hanging forever
            latch.countDown();
          }
        }
      });
      t.start();
      latch.await();
      assertEquals(new Customer("name10", "address10"), pr.get(custId));
      assertEquals("value10", r.get(10));
      return null;
    }
  });
}
/////////////////////////////////////////////////////////////////////////
// The tests below are inherited from the superclass. This class adds no
// new behavior for them, so each one is overridden with an empty body and
// therefore does nothing when it is run from this class.
/////////////////////////////////////////////////////////////////////////
@Override
public void testPRTXGet() {
}
@Override
public void testPRTXGetOnRemoteWithLoader() {
}
@Override
public void testPRTXGetEntryOnRemoteSide() {
}
@Override
public void testPRTXGetOnLocalWithLoader() {
}
@Override
public void testNonColocatedTX() {
}
@Override
public void testRemoteExceptionThrown() {
}
@Override
public void testSize() {
}
@Override
public void testKeysIterator() {
}
@Override
public void testValuesIterator() {
}
@Override
public void testEntriesIterator() {
}
@Override
public void testKeysIterator1() {
}
@Override
public void testValuesIterator1() {
}
@Override
public void testEntriesIterator1() {
}
@Override
public void testKeysIteratorOnDestroy() {
}
@Override
public void testValuesIteratorOnDestroy() {
}
@Override
public void testEntriesIteratorOnDestroy() {
}
@Override
public void testKeysIterator1OnDestroy() {
}
@Override
public void testValuesIterator1OnDestroy() {
}
@Override
public void testEntriesIterator1OnDestroy() {
}
@Override
public void testKeyIterationOnRR() {
}
@Override
public void testValuesIterationOnRR() {
}
@Override
public void testEntriesIterationOnRR() {
}
@Override
public void testIllegalIteration() {
}
@Override
public void testTxFunctionOnRegion() {
}
@Override
public void testTxFunctionOnMember() {
}
@Override
public void testNestedTxFunction() {
}
@Override
public void testDRFunctionExecution() {
}
@Override
public void testTxFunctionWithOtherOps() {
}
@Override
public void testRemoteJTACommit() {
}
@Override
public void testRemoteJTARollback() {
}
@Override
public void testOriginRemoteIsTrueForRemoteReplicatedRegions() {
}
@Override
public void testRemoteCreateInReplicatedRegion() {
}
@Override
public void testRemoteTxCleanupOnCrash() {
}
@Override
public void testNonColocatedPutAll() {
}
@Override
public void testDestroyCreateConflation() {
}
@Override
public void testTXWithRI() throws Exception {
}
@Override
public void testBug43176() {
}
@Override
public void testTXWithRICommitInDatastore() throws Exception {
}
@Override
public void testListenersNotInvokedOnSecondary() {
}
@Override
public void testBug33073() {
}
@Override
public void testBug43081() throws Exception {
}
@Override
public void testBug45556() {
}
/**
 * The client starts a transaction through the accessor's server and does a
 * putIfAbsent. The datastore's cache is then closed, after which the client's
 * commit is expected to fail with TransactionInDoubtException and the key
 * must not be present in the region.
 */
public void testBug42942() {
  final Host dunitHost = Host.getHost(0);
  final VM accessorVm = dunitHost.getVM(0);
  final VM datastoreVm = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);
  final int accessorPort = createRegionsAndStartServer(accessorVm, true);
  createRegionOnServer(datastoreVm);
  createClientRegion(clientVm, accessorPort, false);
  final CustId txKey = new CustId(1);

  // step 1: open a transaction on the client and leave it uncommitted
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      final Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      assertNull(customers.get(txKey));
      getCache().getCacheTransactionManager().begin();
      customers.putIfAbsent(txKey, new Customer("name1", "address"));
      assertNotNull(customers.get(txKey));
      return null;
    }
  });

  // step 2: close the datastore's cache
  datastoreVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      getCache().close();
      return null;
    }
  });

  // step 3: the pending commit must now fail and leave no entry behind
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      try {
        getCache().getCacheTransactionManager().commit();
        fail("expected exception not thrown");
      } catch (TransactionInDoubtException expected) {
        // this is the outcome the test is looking for
      }
      final Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      assertFalse(customers.containsKey(txKey));
      return null;
    }
  });
}
/**
 * Commits a client transaction whose only operation is a get of a key that
 * was put before the transaction began.
 */
public void testOnlyGet() {
  final Host dunitHost = Host.getHost(0);
  final VM accessorVm = dunitHost.getVM(0);
  final VM datastoreVm = dunitHost.getVM(1);
  final VM clientVm = dunitHost.getVM(2);
  final int accessorPort = createRegionsAndStartServer(accessorVm, true);
  createRegionOnServer(datastoreVm, false, false);
  createClientRegion(clientVm, accessorPort, false);
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      final Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      // non-transactional put so that the get below finds a value
      customers.put(new CustId(1), new Customer("name1", "address"));
      getCache().getCacheTransactionManager().begin();
      customers.get(new CustId(1));
      getCache().getCacheTransactionManager().commit();
      return null;
    }
  });
}
/**
 * Installs cache listeners on both regions on the server and on the client,
 * then has the client do one non-transactional put per region followed by a
 * transaction that updates that key and creates a second one in each region.
 * After commit the client listeners (reset just before the transaction) must
 * have seen one create and one update each; the server listeners must have
 * seen two creates and one update each.
 */
public void testBug43237() {
  final Host dunitHost = Host.getHost(0);
  final VM serverVm = dunitHost.getVM(0);
  final VM clientVm = dunitHost.getVM(1);
  final int serverPort = createRegionsAndStartServer(serverVm, false);

  serverVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      final Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      final Region<String, String> reference = getCache().getRegion(D_REFERENCE);
      customers.getAttributesMutator().addCacheListener(new ServerListener());
      reference.getAttributesMutator().addCacheListener(new ServerListener());
      return null;
    }
  });

  createClientRegion(clientVm, serverPort, false);

  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      final Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      final Region<String, String> reference = getCache().getRegion(D_REFERENCE);
      customers.getAttributesMutator().addCacheListener(new ClientListener());
      reference.getAttributesMutator().addCacheListener(new ClientListener());
      return null;
    }
  });

  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      final Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      final Region<String, String> reference = getCache().getRegion(D_REFERENCE);

      // initial entries, created outside any transaction
      customers.put(new CustId(1), new Customer("name1", "address1"));
      reference.put("key1", "value1");

      final ClientListener customerListener =
          (ClientListener) customers.getAttributes().getCacheListeners()[0];
      assertEquals(1, customers.getAttributes().getCacheListeners().length);
      final ClientListener referenceListener =
          (ClientListener) reference.getAttributes().getCacheListeners()[0];
      assertEquals(1, reference.getAttributes().getCacheListeners().length);
      assertFalse(customerListener.equals(referenceListener));

      // only count what happens as part of the transaction
      customerListener.reset();
      referenceListener.reset();

      getCache().getCacheTransactionManager().begin();
      customers.put(new CustId(1), new Customer("newname1", "newaddress1"));
      reference.put("key1", "newvalue1");
      customers.put(new CustId(2), new Customer("name2", "address2"));
      reference.put("key2", "value2");
      getCache().getLogger().info("SWAP:issuingCommit");
      getCache().getCacheTransactionManager().commit();

      assertEquals(1, customerListener.creates);
      assertEquals(1, customerListener.updates);
      assertEquals(1, referenceListener.creates);
      assertEquals(1, referenceListener.updates);
      return null;
    }
  });

  serverVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      final Region<CustId, Customer> customers = getCache().getRegion(CUSTOMER);
      final Region<String, String> reference = getCache().getRegion(D_REFERENCE);
      final ServerListener customerListener =
          (ServerListener) customers.getAttributes().getCacheListeners()[0];
      assertEquals(1, customers.getAttributes().getCacheListeners().length);
      final ServerListener referenceListener =
          (ServerListener) reference.getAttributes().getCacheListeners()[0];
      assertEquals(1, reference.getAttributes().getCacheListeners().length);
      // server listeners were never reset, so they include the initial puts
      assertEquals(2, customerListener.creates);
      assertEquals(1, customerListener.updates);
      assertEquals(2, referenceListener.creates);
      assertEquals(1, referenceListener.updates);
      return null;
    }
  });
}
/**
* Uses three server-side VMs: the accessor hosts r1 and r2, datastore1 hosts
* r1 and datastore2 hosts r2. An after-send hook installed on the accessor's
* hosted transaction state closes the accessor's cache, i.e. the server goes
* away after distributing the commit but before replying to the client. The
* client's commit must still complete and both keys must be present.
*/
public void testFailoverAfterCommitDistribution() {
Host host = Host.getHost(0);
VM accessor = host.getVM(0);
VM datastore1 = host.getVM(1);
VM datastore2 = host.getVM(2);
VM client = host.getVM(3);
final int port1 = createRegionsAndStartServer(accessor, true);
// pick a free port inside datastore1's VM; its cache server is started on it further down
final int port2 = (Integer) datastore1.invoke(new SerializableCallable() {
public Object call() throws Exception {
return AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
}
});
// callable that creates a REPLICATE region with the given name in the VM it is invoked on
class CreateReplicateRegion extends SerializableCallable {
String regionName;
public CreateReplicateRegion(String replicateRegionName) {
this.regionName = replicateRegionName;
}
public Object call() throws Exception {
RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE);
rf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
rf.create(regionName);
return null;
}
}
accessor.invoke(new CreateReplicateRegion("r1"));
accessor.invoke(new CreateReplicateRegion("r2"));
// build a client cache whose pool lists the accessor (port1) first and datastore1 (port2) second
client.invoke(new SerializableCallable() {
public Object call() throws Exception {
disconnectFromDS();
// NOTE(review): presumably keeps the pool from reordering the two endpoints so port1 is used first - confirm
System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
ClientCacheFactory ccf = new ClientCacheFactory();
ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
ccf.addPoolServer("localhost", port2);
ccf.setPoolMinConnections(5);
ccf.setPoolLoadConditioningInterval(-1);
ccf.setPoolSubscriptionEnabled(false);
ccf.set("log-level", getDUnitLogLevel());
ClientCache cCache = getClientCache(ccf);
// the locals are unused; the create() calls are what matters here
Region r1 = cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("r1");
Region r2 = cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("r2");
return null;
}
});
// start the second cache server on the port reserved above
datastore1.invoke(new SerializableCallable() {
public Object call() throws Exception {
CacheServer s = getCache().addCacheServer();
getCache().getLogger().info("SWAP:ds1");
s.setPort(port2);
s.start();
return null;
}
});
datastore1.invoke(new CreateReplicateRegion("r1"));
datastore2.invoke(new CreateReplicateRegion("r2"));
// begin the transaction on the client, touch both regions, and hand back the transaction id
final TransactionId txId = (TransactionId) client.invoke(new SerializableCallable() {
public Object call() throws Exception {
ClientCache cCache = (ClientCache) getCache();
Region r1 = cCache.getRegion("r1");
Region r2 = cCache.getRegion("r2");
cCache.getCacheTransactionManager().begin();
cCache.getLogger().info("SWAP:beganTX");
r1.put("key1", "value1");
r2.put("key2", "value2");
return cCache.getCacheTransactionManager().getTransactionId();
}
});
// on the accessor: verify it hosts the transaction, then install the hook that kills it mid-commit
accessor.invoke(new SerializableCallable() {
public Object call() throws Exception {
getCache().getLogger().info("SWAP:accessor");
final TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
assertTrue(mgr.isHostedTxInProgress((TXId) txId));
TXStateProxyImpl txProxy = (TXStateProxyImpl) mgr.getHostedTXState((TXId) txId);
final TXState txState = (TXState) txProxy.getRealDeal(null, null);
// hook body: drop the hosted transaction state and close this member's cache
txState.setAfterSend(new Runnable() {
public void run() {
getCache().getLogger().info("SWAP:closing cache");
// the property is set only for the duration of the close and removed again in the finally block
System.setProperty("gemfire.no-flush-on-close", "true");
try {
mgr.removeHostedTXState((TXId) txState.getTransactionId());
getCache().close();
} finally {
System.getProperties().remove("gemfire.no-flush-on-close");
}
}
});
return null;
}
});
// commit from the client; this triggers the hook above, and the data must still end up in both regions
client.invoke(new SerializableCallable() {
public Object call() throws Exception {
getCache().getLogger().info("SWAP:commiting transaction");
getCache().getCacheTransactionManager().commit();
Region r1 = getCache().getRegion("r1");
Region r2 = getCache().getRegion("r2");
assertTrue(r1.containsKey("key1"));
assertTrue(r2.containsKey("key2"));
return null;
}
});
}
/**
 * Starts a transaction in a second thread, lets it obtain its local locks and
 * park inside the after-local-locks hook, then commits a conflicting
 * transaction from the calling thread and expects that commit to fail with a
 * CommitConflictException. The first transaction is then released, and the
 * value it wrote must be the one found in the region.
 */
public void testClientTxLocks() {
  Host host = Host.getHost(0);
  VM server = host.getVM(0);
  VM client = host.getVM(1);
  int port = createRegionsAndStartServer(server, false);
  createClientRegion(client, port, false);
  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      final Region r = getCache().getRegion(CUSTOMER);
      // outer: released by the main thread once it has seen the conflict
      final CountDownLatch outer = new CountDownLatch(1);
      // inner: released by the helper thread once it is inside the hook
      final CountDownLatch inner = new CountDownLatch(1);
      Thread t = new Thread(new Runnable() {
        public void run() {
          TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
          mgr.begin();
          r.put(new CustId(1), new Customer("name1", "address1"));
          Map<CustId, Customer> m = new HashMap<CustId, Customer>();
          m.put(new CustId(2), new Customer("name2", "address2"));
          r.putAll(m);
          // suspend only to get at the transaction state and install the hook
          TXStateProxyImpl tx = (TXStateProxyImpl) mgr.internalSuspend();
          ClientTXStateStub txStub = (ClientTXStateStub) tx.getRealDeal(null, null);
          txStub.setAfterLocalLocks(new Runnable() {
            public void run() {
              try {
                inner.countDown();
                outer.await();
              } catch (InterruptedException e) {
                // keep the interrupt visible to the code that runs after the hook
                Thread.currentThread().interrupt();
              }
            }
          });
          mgr.resume(tx);
          mgr.commit();
        }
      });
      t.start();
      inner.await();
      getCache().getCacheTransactionManager().begin();
      r.put(new CustId(1), new Customer("name2", "address2"));
      try {
        getCache().getLogger().info("SWAP:Commit expect CCE");
        getCache().getCacheTransactionManager().commit();
        fail("expected CCE not thrown");
      } catch (CommitConflictException cce) {
        getCache().getLogger().info("SWAP:Commit Caught CCE");
      }
      outer.countDown();
      t.join();
      assertTrue(r.containsKey(new CustId(1)));
      assertEquals(new Customer("name1", "address1"), r.get(new CustId(1)));
      return null;
    }
  });
}
/**
 * Function that does one put inside a transaction and leaves that transaction
 * suspended. Its argument is a List of three elements: the CustId key, the
 * Customer value, and either the id of a suspended transaction to resume or
 * null to begin a new one. The id of the (re)suspended transaction is sent
 * back as the last result.
 */
class TXFunction extends FunctionAdapter {
  @Override
  public String getId() {
    return "TXFunction";
  }

  @Override
  public void execute(FunctionContext context) {
    final List arguments = (List) context.getArguments();
    final CustId key = (CustId) arguments.get(0);
    final Customer value = (Customer) arguments.get(1);
    final TransactionId incomingTx = (TransactionId) arguments.get(2);
    assertNotNull(key);
    assertNotNull(value);
    final RegionFunctionContext regionContext = (RegionFunctionContext) context;
    final CacheTransactionManager txManager = getCache().getCacheTransactionManager();
    if (incomingTx == null) {
      txManager.begin();
    } else {
      assertTrue(txManager.isSuspended(incomingTx));
      txManager.resume(incomingTx);
    }
    regionContext.getDataSet().put(key, value);
    final TransactionId suspendedTx = txManager.suspend();
    context.getResultSender().lastResult(suspendedTx);
  }
}
/**
 * Function whose argument is the id of a suspended transaction; it resumes
 * that transaction, commits it, and sends back Boolean.TRUE.
 */
class TestCommitFunction extends FunctionAdapter {
  @Override
  public String getId() {
    return "CommitFunction";
  }

  @Override
  public void execute(FunctionContext context) {
    final CacheTransactionManager txManager = getCache().getCacheTransactionManager();
    final TransactionId suspendedTx = (TransactionId) context.getArguments();
    assertTrue(txManager.isSuspended(suspendedTx));
    txManager.resume(suspendedTx);
    txManager.commit();
    context.getResultSender().lastResult(Boolean.TRUE);
  }
}
/**
 * Drives one transaction across three function executions from the client:
 * TXFunction begins it and puts key 0, a second TXFunction call resumes it and
 * puts key 1, and TestCommitFunction commits it. Both values must then be
 * readable from the client.
 */
public void testBasicResumableTX() {
  disconnectAllFromDS();
  final Host dunitHost = Host.getHost(0);
  final VM serverVm = dunitHost.getVM(0);
  final VM clientVm = dunitHost.getVM(1);
  final int serverPort = createRegionsAndStartServer(serverVm, false);
  createClientRegion(clientVm, serverPort, false);
  clientVm.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      final Region customers = getCache().getRegion(CUSTOMER);
      assertNull(customers.get(new CustId(0)));
      assertNull(customers.get(new CustId(1)));

      // first call: no transaction id yet, so the function begins one
      ArrayList functionArgs = new ArrayList();
      functionArgs.add(new CustId(0));
      functionArgs.add(new Customer("name0", "address0"));
      functionArgs.add(null);
      List results = (List) FunctionService.onRegion(customers)
          .withArgs(functionArgs)
          .execute(new TXFunction())
          .getResult();
      final TransactionId firstTxId = (TransactionId) results.get(0);
      assertNotNull(firstTxId);

      // second call: pass the id back so the same transaction is resumed
      functionArgs = new ArrayList();
      functionArgs.add(new CustId(1));
      functionArgs.add(new Customer("name1", "address1"));
      functionArgs.add(firstTxId);
      results = (List) FunctionService.onRegion(customers)
          .withArgs(functionArgs)
          .execute(new TXFunction())
          .getResult();
      final TransactionId secondTxId = (TransactionId) results.get(0);
      assertEquals(firstTxId, secondTxId);

      // third call: commit the transaction
      results = (List) TransactionFunctionService.onTransaction(firstTxId)
          .execute(new TestCommitFunction())
          .getResult();
      final Boolean committed = (Boolean) results.get(0);
      assertEquals(Boolean.TRUE, committed);

      assertEquals(new Customer("name0", "address0"), customers.get(new CustId(0)));
      assertEquals(new Customer("name1", "address1"), customers.get(new CustId(1)));
      return null;
    }
  });
}
/**
* The client connects to server1, which is started as an accessor. It then
* performs transactional operations inside functions, and the commit is done
* by executing a commit function (doFunctionWork uses CommitFunction when
* called with true).
*/
public void testClientCommitFunction() {
doFunctionWork(true);
}
/**
* Rollback counterpart of testClientCommitFunction: runs doFunctionWork with
* false so the transaction is finished with RollbackFunction. The method name
* carries a DISABLED_ prefix and the annotation refers to bug 52331.
*/
@Ignore("Bug 52331")
public void DISABLED_testClientRollbackFunction() {
doFunctionWork(false);
}
/**
* Shared body for the client commit/rollback function tests. Two servers are
* started and a client with a pool listing both of them builds up one
* transaction through two TXFunction executions. The client then first calls
* the commit/rollback function with a wrong argument type and expects a
* FunctionException caused by a ClassCastException, and afterwards finishes
* the transaction through TransactionFunctionService.onTransaction and checks
* the region contents.
*
* @param commit true to finish the transaction with CommitFunction, false to
*        finish it with RollbackFunction
*/
private void doFunctionWork(final boolean commit) {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
final int port2 = createRegionsAndStartServer(server2, false);
final int port = createRegionsAndStartServer(server1, true);
addExpectedException("ClassCastException");
// writes "expected exception" markers for both exception names into the invoking VM's log
SerializableRunnable suspectStrings = new SerializableRunnable("suspect string") {
public void run() {
InternalDistributedSystem.getLoggerI18n().convertToLogWriter().info(
"<ExpectedException action=add>" + "ClassCastException" + "</ExpectedException>" +
"<ExpectedException action=add>" + "TransactionDataNodeHasDeparted" + "</ExpectedException>"
);
}
};
server1.invoke(suspectStrings);
server2.invoke(suspectStrings);
try {
client.invoke(new SerializableCallable() {
public Object call() throws Exception {
// NOTE(review): presumably keeps the pool from reordering the two endpoints so server1 is used first - confirm
System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
ClientCacheFactory ccf = new ClientCacheFactory();
ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port);
ccf.addPoolServer("localhost", port2);
ccf.setPoolSubscriptionEnabled(false);
ccf.set("log-level", getDUnitLogLevel());
// these settings were used to manually check that tx operation stats were being updated
//ccf.set(DistributionConfig.STATISTIC_SAMPLING_ENABLED_NAME, "true");
//ccf.set(DistributionConfig.STATISTIC_ARCHIVE_FILE_NAME, "clientStats.gfs");
ClientCache cCache = getClientCache(ccf);
ClientRegionFactory<Integer, String> crf = cCache
.createClientRegionFactory(ClientRegionShortcut.PROXY);
// the local is unused afterwards; the region is looked up again below as 'cust'
Region<Integer, String> customer = crf.create(CUSTOMER);
// same expected-exception markers as on the servers, this time in the client's log
cCache.getLogger().info(
"<ExpectedException action=add>" + "ClassCastException" + "</ExpectedException>" +
"<ExpectedException action=add>" + "TransactionDataNodeHasDeparted" + "</ExpectedException>"
);
Region cust = getCache().getRegion(CUSTOMER);
getLogWriter().fine("SWAP:doing first get from client");
assertNull(cust.get(new CustId(0)));
assertNull(cust.get(new CustId(1)));
// first TXFunction call: null transaction id, so the function begins a transaction
ArrayList args = new ArrayList();
args.add(new CustId(0));args.add(new Customer("name0", "address0"));args.add(null);
List result = (List) FunctionService.onRegion(cust).withArgs(args).execute(new TXFunction()).getResult();
TransactionId txId = (TransactionId) result.get(0);
assertNotNull(txId);
// second TXFunction call: passes the id back so the same transaction is resumed
args = new ArrayList();
args.add(new CustId(1));args.add(new Customer("name1", "address1"));args.add(txId);
result = (List) FunctionService.onRegion(cust).withArgs(args).execute(new TXFunction()).getResult();
TransactionId txId2 = (TransactionId) result.get(0);
assertEquals(txId, txId2);
// invoke the commit/rollback function with a CustId argument; this is expected to fail with a ClassCastException
try {
if (commit) {
FunctionService.onServer(getCache()).withArgs(new CustId(0)).execute(new CommitFunction()).getResult();
} else {
FunctionService.onServer(getCache()).withArgs(new CustId(0)).execute(new RollbackFunction()).getResult();
}
fail("expected exception not thrown");
} catch (FunctionException e) {
assertTrue(e.getCause() instanceof ClassCastException);
}
// now finish the transaction for real, addressed by its transaction id
List list = null;
if (commit) {
list = (List) TransactionFunctionService.onTransaction(txId).execute(new CommitFunction()).getResult();
} else {
list = (List) TransactionFunctionService.onTransaction(txId).execute(new RollbackFunction()).getResult();
}
assertEquals(Boolean.TRUE, list.get(0));
// after commit both values are present; after rollback neither is
if (commit) {
assertEquals(new Customer("name0", "address0"), cust.get(new CustId(0)));
assertEquals(new Customer("name1", "address1"), cust.get(new CustId(1)));
} else {
assertNull(cust.get(new CustId(0)));
assertNull(cust.get(new CustId(1)));
}
return null;
}
});
} finally {
// always write the matching "remove" markers in all three VMs, even if the test body failed
suspectStrings = new SerializableRunnable("suspect string") {
public void run() {
InternalDistributedSystem.getLoggerI18n().convertToLogWriter().info(
"<ExpectedException action=remove>" + "ClassCastException" + "</ExpectedException>" +
"<ExpectedException action=remove>" + "TransactionDataNodeHasDeparted" + "</ExpectedException>"
);
}
};
server1.invoke(suspectStrings);
server2.invoke(suspectStrings);
client.invoke(suspectStrings);
}
}
/**
* Runs the failure scenario in doFunctionWithFailureWork with the final call
* made through CommitFunction.
*/
public void testClientCommitFunctionWithFailure() {
doFunctionWithFailureWork(true);
}
/**
* Runs the failure scenario in doFunctionWithFailureWork with the final call
* made through RollbackFunction.
*/
public void testRollbackFunctionWithFailure() {
doFunctionWithFailureWork(false);
}
/**
 * Shared body for the commit/rollback-with-failure tests. The client builds up
 * a transaction through two TXFunction executions, server2 is then
 * disconnected from the distributed system, and the client's attempt to finish
 * the transaction must fail with a FunctionException whose cause is a
 * TransactionDataNodeHasDepartedException.
 *
 * @param commit true to make the final call with CommitFunction, false to make
 *        it with RollbackFunction
 */
private void doFunctionWithFailureWork(final boolean commit) {
  addExpectedException("TransactionDataNodeHasDepartedException");
  addExpectedException("ClassCastException");
  Host host = Host.getHost(0);
  VM server1 = host.getVM(0);
  VM server2 = host.getVM(1);
  VM client = host.getVM(2);
  createRegionOnServer(server2);
  final int port = createRegionsAndStartServer(server1, true);
  createClientRegion(client, port, true);
  final TransactionId txId = (TransactionId) client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      Region cust = getCache().getRegion(CUSTOMER);
      assertNull(cust.get(new CustId(0)));
      assertNull(cust.get(new CustId(1)));
      // first TXFunction call: null transaction id, so the function begins a transaction
      ArrayList args = new ArrayList();
      args.add(new CustId(0));
      args.add(new Customer("name0", "address0"));
      args.add(null);
      List result = (List) FunctionService.onRegion(cust).withArgs(args).execute(new TXFunction()).getResult();
      TransactionId txId = (TransactionId) result.get(0);
      assertNotNull(txId);
      // second TXFunction call: passes the id back so the same transaction is resumed
      args = new ArrayList();
      args.add(new CustId(1));
      args.add(new Customer("name1", "address1"));
      args.add(txId);
      result = (List) FunctionService.onRegion(cust).withArgs(args).execute(new TXFunction()).getResult();
      TransactionId txId2 = (TransactionId) result.get(0);
      assertEquals(txId, txId2);
      // CommitFunction with a CustId argument is expected to fail with a ClassCastException
      try {
        FunctionService.onServer(getCache()).withArgs(new CustId(0)).execute(new CommitFunction()).getResult();
        fail("expected exception not thrown");
      } catch (FunctionException e) {
        assertTrue(e.getCause() instanceof ClassCastException);
      }
      return txId;
    }
  });
  // take server2 out of the distributed system while the transaction is still open
  server2.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      disconnectFromDS();
      return null;
    }
  });
  client.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      try {
        // only the exception matters here, so the result is not kept
        if (commit) {
          FunctionService.onServer(getCache()).withArgs(txId).execute(new CommitFunction()).getResult();
        } else {
          FunctionService.onServer(getCache()).withArgs(txId).execute(new RollbackFunction()).getResult();
        }
        fail("expected exception not thrown");
      } catch (FunctionException e) {
        assertTrue(e.getCause() instanceof TransactionDataNodeHasDepartedException);
      }
      return null;
    }
  });
}
/**
* Starts an accessor and two peers, suspends a transaction on one peer, and
* then commits that transaction from the accessor by executing
* CommitFunction (see doTestFunctionFromPeer).
*/
public void testCommitFunctionFromPeer() {
doTestFunctionFromPeer(true);
}
/**
* Same setup as testCommitFunctionFromPeer, but the accessor finishes the
* suspended transaction by executing RollbackFunction.
*/
public void testRollbackFunctionFromPeer() {
doTestFunctionFromPeer(false);
}
/**
* Shared body for the commit/rollback-from-peer tests. peer1 begins a
* transaction, does puts on the customer region and suspends the transaction;
* the accessor then finishes it through
* TransactionFunctionService.onTransaction and expects a single TRUE result.
*
* @param commit true to finish with CommitFunction, false to finish with
*        RollbackFunction
*/
private void doTestFunctionFromPeer(final boolean commit) {
Host host = Host.getHost(0);
VM accessor = host.getVM(0);
VM peer1 = host.getVM(1);
VM peer2 = host.getVM(2);
createRegionOnServer(peer1);
createRegionOnServer(peer2);
createRegionOnServer(accessor, false, true);
final TransactionId txId = (TransactionId) peer1.invoke(new SerializableCallable() {
public Object call() throws Exception {
PartitionedRegion r = (PartitionedRegion) getCache().getRegion(CUSTOMER);
CustId cust = null;
DistributedMember myId = getCache().getDistributedSystem().getDistributedMember();
List<CustId> keys = new ArrayList<CustId>();
// collect the keys among CustId(0..9) whose bucket primary is not this member
for (int i=0; i<10;i++) {
cust = new CustId(i);
int bucketId = PartitionedRegionHelper.getHashKey(r, cust);
if (!myId.equals(r.getBucketPrimary(bucketId))) {
keys.add(cust);
}
}
assertTrue(keys.size() > 2);
CacheTransactionManager mgr = getCache().getCacheTransactionManager();
mgr.begin();
// NOTE(review): the loop variable custId is never used; every iteration puts 'cust',
// which still holds CustId(9) from the loop above. This looks like a typo for custId,
// but putting several different keys in one transaction may change colocation - confirm before changing.
for (CustId custId : keys) {
r.put(cust, new Customer("newname", "newaddress"));
}
return mgr.suspend();
}
});
assertNotNull(txId);
// finish the suspended transaction from the accessor
accessor.invoke(new SerializableCallable() {
public Object call() throws Exception {
Execution exe = TransactionFunctionService.onTransaction(txId);
List list = null;
if (commit) {
list = (List) exe.execute(new CommitFunction()).getResult();
} else {
list = (List) exe.execute(new RollbackFunction()).getResult();
}
assertEquals(1, list.size());
assertTrue((Boolean) list.get(0));
return null;
}
});
}
/**
 * Puts CustId 0 through 4 inside one transaction on server1 and expects a
 * TransactionDataNotColocatedException before the loop completes; the
 * transaction is then committed.
 */
public void testBug43752() {
  disconnectAllFromDS();
  final Host dunitHost = Host.getHost(0);
  final VM firstServer = dunitHost.getVM(0);
  final VM secondServer = dunitHost.getVM(1);
  createRegionOnServer(firstServer);
  createRegionOnServer(secondServer);
  firstServer.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      final Region customers = getCache().getRegion(CUSTOMER);
      final CacheTransactionManager txManager = getCache().getCacheTransactionManager();
      txManager.begin();
      try {
        int id = 0;
        while (id < 5) {
          getCache().getLogger().info("SWAP:put:custId:" + id);
          customers.put(new CustId(id), new Customer("name" + id, "address" + id));
          id++;
        }
        fail("expected exception not thrown");
      } catch (TransactionDataNotColocatedException expected) {
        // expected
      }
      txManager.commit();
      return null;
    }
  });
}
/**
 * Sets the suspended-transaction timeout to 1 on the client, suspends a
 * transaction and sleeps for 70 seconds. Resuming must then fail with an
 * IllegalStateException, the transaction's put must not be visible, and the
 * server must no longer host any state for that transaction.
 */
public void testSuspendTimeout() throws Exception {
  final Host dunitHost = Host.getHost(0);
  final VM serverVm = dunitHost.getVM(0);
  final VM clientVm = dunitHost.getVM(1);
  final int serverPort = createRegionsAndStartServer(serverVm, false);
  createClientRegion(clientVm, serverPort, true);

  final TransactionId txId = (TransactionId) clientVm.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      final TXManagerImpl txManager = (TXManagerImpl) getCache().getCacheTransactionManager();
      txManager.setSuspendedTransactionTimeout(1);
      final Region customers = getCache().getRegion(CUSTOMER);
      assertNull(customers.get(new CustId(101)));

      txManager.begin();
      customers.put(new CustId(101), new Customer("name101", "address101"));
      final TransactionId suspendedTx = txManager.suspend();

      // wait long enough for the suspended transaction to expire
      Thread.sleep(70 * 1000);

      try {
        txManager.resume(suspendedTx);
        fail("expected exception not thrown");
      } catch (IllegalStateException expected) {
        // the transaction is gone, so it cannot be resumed
      }
      assertNull(customers.get(new CustId(101)));
      return suspendedTx;
    }
  });

  serverVm.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      final TXManagerImpl txManager = (TXManagerImpl) getCache().getCacheTransactionManager();
      assertNull(txManager.getHostedTXState((TXId) txId));
      assertEquals(0, txManager.hostedTransactionsInProgressForTest());
      return null;
    }
  });
}
/**
 * Tests that operations re-tried from the client do not result in multiple
 * operations in the transaction: the same update event is submitted three
 * times through validatedPut, and the server-side transaction state as well as
 * the client listener must count it only once.
 */
public void testEventTracker() {
  Host host = Host.getHost(0);
  VM delegate = host.getVM(0);
  VM server = host.getVM(1);
  VM client = host.getVM(2);
  final int port1 = createRegionsAndStartServer(delegate, true);
  final int port2 = createRegionsAndStartServer(server, false);
  // begin a transaction on the client, do one put per region, and hand back the transaction id
  final TXId txid = (TXId) client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
      ClientCacheFactory ccf = new ClientCacheFactory();
      ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
      if (port2 != 0) ccf.addPoolServer("localhost", port2);
      ccf.setPoolSubscriptionEnabled(false);
      ccf.set("log-level", getDUnitLogLevel());
      ClientCache cCache = getClientCache(ccf);
      ClientRegionFactory<CustId, Customer> custrf = cCache
          .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
      ClientRegionFactory<Integer, String> refrf = cCache
          .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
      custrf.addCacheListener(new ClientListener());
      Region<Integer, String> r = refrf.create(D_REFERENCE);
      Region<CustId, Customer> pr = custrf.create(CUSTOMER);
      TXManagerImpl mgr = getGemfireCache().getTxManager();
      mgr.begin();
      CustId custId = new CustId(0);
      Customer cust = new Customer("name"+0, "address"+0);
      pr.put(custId, cust);
      r.put(0, "value"+0);
      return mgr.getTransactionId();
    }
  });
  // submit one and the same event three times, as a retrying client would
  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      Region<CustId, Customer> pr = getGemfireCache().getRegion(CUSTOMER);
      LocalRegion lr = (LocalRegion) pr;
      CustId custId = new CustId(1);
      Customer cust = new Customer("name"+1, "address"+1);
      EntryEventImpl event = lr.newUpdateEntryEvent(custId, cust, null);
      assertNotNull(event);
      event.copyOffHeapToHeap();
      lr.validatedPut(event, System.currentTimeMillis());
      lr.validatedPut(event, System.currentTimeMillis());
      lr.validatedPut(event, System.currentTimeMillis());
      return null;
    }
  });
  server.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      TXManagerImpl mgr = getGemfireCache().getTxManager();
      TXStateProxyImpl txProxy = (TXStateProxyImpl) mgr.getHostedTXState(txid);
      // JUnit assertions rather than the assert keyword, so the checks also run without -ea
      assertTrue(txProxy.isRealDealLocal());
      TXState tx = (TXState) txProxy.getRealDeal(null, null);
      assertNotNull(tx);
      //2 for put, 1 for validatedPut
      assertEquals(3, tx.seenEvents.size());
      return null;
    }
  });
  // stop the cache servers on the delegate before the client commits
  delegate.invoke(new SerializableCallable() {
    public Object call() throws Exception {
      for (CacheServer s : getCache().getCacheServers()) {
        getCache().getLogger().info("SWAP:Stopping "+s);
        s.stop();
      }
      return null;
    }
  });
  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      TXManagerImpl mgr = getGemfireCache().getTxManager();
      mgr.commit();
      Region<CustId, Customer> pr = getGemfireCache().getRegion(CUSTOMER);
      CacheListener<CustId, Customer>[] clarray = pr.getAttributes().getCacheListeners();
      assertEquals(1, clarray.length);
      ClientListener cl = (ClientListener) clarray[0];
      // 1 for put 1 for validatedPut
      assertEquals(2, cl.putCount);
      return null;
    }
  });
}
/**
 * No-op: the body is empty and none of the four VM arguments is used.
 * NOTE(review): looks like a leftover stub; no caller is visible in this part
 * of the file -- confirm before relying on it or removing it.
 */
public void verifyVersionTags(VM client, VM server1, VM server2, VM server3) {
}
/**
 * Starts two servers and a client, then makes the first server throw a
 * {@link TransactionException} from a cache writer while the client has a
 * transaction in progress. Verifies that the exception does not cause the
 * client to fail over to the second server.
 * See bug 51666.
 */
public void testTransactionException() {
  disconnectAllFromDS();
  Host host = Host.getHost(0);
  VM server1 = host.getVM(0);
  VM server2 = host.getVM(1);
  VM client = host.getVM(2);
  final int port1 = createRegionsAndStartServer(server1, true);
  final int port2 = createRegionsAndStartServer(server2, true);
  final Integer troubleKey = Integer.valueOf(1234);

  // Cache writer (installed on server1 only) that rejects creates/updates of
  // troubleKey with a TransactionException; every other key passes through.
  class ExceptionWriter extends CacheWriterAdapter<Integer, String> {
    @Override
    public void beforeCreate(EntryEvent<Integer, String> event)
        throws CacheWriterException {
      throwException(event);
    }

    @Override
    public void beforeUpdate(EntryEvent<Integer, String> event)
        throws CacheWriterException {
      throwException(event);
    }

    private void throwException(EntryEvent<Integer, String> event) {
      if (event.getKey().equals(troubleKey)) {
        getCache().getLogger().info("SWAP:In cache writer throwing exception");
        throw new TransactionException("SWAP:TEST");
      }
    }
  }

  server1.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      Region<Integer, String> ref = getCache().getRegion(D_REFERENCE);
      getCache().getLogger().info("SWAP:ADDWRITER:server1");
      ref.getAttributesMutator().setCacheWriter(new ExceptionWriter());
      return null;
    }
  });

  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      // presumably keeps the pool trying server1 (added first) before
      // server2 -- confirm against the pool implementation
      System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
      ClientCacheFactory ccf = new ClientCacheFactory();
      ccf.addPoolServer("localhost", port1);
      ccf.addPoolServer("localhost", port2);
      ccf.setPoolMinConnections(0);
      ccf.setPoolSubscriptionEnabled(false);
      ccf.set("log-level", getDUnitLogLevel());
      ClientCache cCache = getClientCache(ccf);
      ClientRegionFactory<Integer, String> refrf = cCache
          .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
      Region<Integer, String> r = refrf.create(D_REFERENCE);

      TXManagerImpl mgr = getGemfireCache().getTxManager();
      mgr.begin();
      // These keys are not troubleKey, so the writer lets them through.
      for (int i = 0; i < 5; i++) {
        getGemfireCache().getLogger().info("SWAP:putting:" + i);
        r.put(i, "value" + i);
      }
      try {
        getGemfireCache().getLogger().info("SWAP:putting:" + troubleKey);
        r.put(troubleKey, "valueException");
        fail("expected TransactionException exception not thrown");
      } catch (TransactionException expected) {
        // expected: thrown by ExceptionWriter on server1
      }
      // The transaction is intentionally neither committed nor rolled back
      // here; the result is not used by the caller.
      return mgr.getTransactionId();
    }
  });

  // make sure tx has not failed over to server2
  server2.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      GemFireCacheImpl cache = getGemfireCache();
      assertEquals(0, cache.getTxManager().hostedTransactionsInProgressForTest());
      return null;
    }
  });
}
/**
 * In a server callback, enrolls one more region within a transaction (the
 * client does not have this region), then commits the transaction to make
 * sure that the client ignores this region. The test has no explicit
 * assertion: it passes when the client-side commit completes without
 * throwing.
 * See bug 51922.
 */
public void testNotAllRegionsHaveClient() {
  disconnectAllFromDS();
  Host host = Host.getHost(0);
  VM server = host.getVM(0);
  VM client = host.getVM(2);
  final String regionName = getName();
  final int port1 = createRegionsAndStartServer(server, true);

  // Cache writer that, on every create/update it sees, also puts into the
  // extra region named regionName, which only the server creates.
  class RegionEnrollingWriter extends CacheWriterAdapter<Integer, String> {
    @Override
    public void beforeCreate(EntryEvent<Integer, String> event)
        throws CacheWriterException {
      enrollRegion(event);
    }

    @Override
    public void beforeUpdate(EntryEvent<Integer, String> event)
        throws CacheWriterException {
      enrollRegion(event);
    }

    private void enrollRegion(EntryEvent<Integer, String> event) {
      Region<String, String> extra = getCache().getRegion(regionName);
      extra.put("key", "value");
    }
  }

  server.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      Region<Integer, String> ref = getCache().getRegion(D_REFERENCE);
      // Create the extra region on the server; the client below never
      // creates a region with this name.
      getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
      ref.getAttributesMutator().setCacheWriter(new RegionEnrollingWriter());
      return null;
    }
  });

  client.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
      ClientCacheFactory ccf = new ClientCacheFactory();
      ccf.addPoolServer("localhost", port1);
      ccf.setPoolMinConnections(0);
      ccf.setPoolSubscriptionEnabled(false);
      ccf.set("log-level", getDUnitLogLevel());
      ClientCache cCache = getClientCache(ccf);
      ClientRegionFactory<Integer, String> refrf = cCache
          .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
      Region<Integer, String> r = refrf.create(D_REFERENCE);

      TXManagerImpl mgr = getGemfireCache().getTxManager();
      mgr.begin();
      // Each put goes through the server-side writer, which touches the
      // extra region inside this same transaction.
      for (int i = 0; i < 5; i++) {
        getGemfireCache().getLogger().info("SWAP:putting:" + i);
        r.put(i, "value" + i);
      }
      // Must not throw even though the client has no region named regionName.
      mgr.commit();
      return null;
    }
  });
}
}