| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| /** |
| * |
| */ |
| package com.gemstone.gemfire.internal.cache; |
| |
| import com.gemstone.gemfire.Delta; |
| import com.gemstone.gemfire.InvalidDeltaException; |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.CacheTransactionManager; |
| import com.gemstone.gemfire.cache.CommitConflictException; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.InterestPolicy; |
| import com.gemstone.gemfire.cache.PartitionAttributesFactory; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.SubscriptionAttributes; |
| import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException; |
| import com.gemstone.gemfire.cache.client.ClientCache; |
| import com.gemstone.gemfire.cache.client.ClientCacheFactory; |
| import com.gemstone.gemfire.cache.client.ClientRegionFactory; |
| import com.gemstone.gemfire.cache.client.ClientRegionShortcut; |
| import com.gemstone.gemfire.cache.server.CacheServer; |
| import com.gemstone.gemfire.cache30.CacheTestCase; |
| import com.gemstone.gemfire.internal.AvailablePort; |
| import com.gemstone.gemfire.internal.cache.execute.CustomerIDPartitionResolver; |
| import com.gemstone.gemfire.internal.cache.execute.data.CustId; |
| import com.gemstone.gemfire.internal.cache.execute.data.Order; |
| import com.gemstone.gemfire.internal.cache.execute.data.OrderId; |
| |
| import dunit.Host; |
| import dunit.SerializableCallable; |
| import dunit.VM; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.Iterator; |
| |
| /** |
| * @author sbawaska |
| * |
| */ |
| public class TransactionsWithDeltaDUnitTest extends CacheTestCase { |
| |
| private static final String D_REFERENCE = "ref"; |
| private static final String CUSTOMER = "Customer"; |
| private static final String ORDER = "Order"; |
| |
| /** |
| * @param name |
| */ |
| public TransactionsWithDeltaDUnitTest(String name) { |
| super(name); |
| } |
| |
| private Integer createRegionOnServer(VM vm, final boolean startServer, final boolean accessor) { |
| return (Integer)vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| createRegion(accessor, 0, null); |
| if (startServer) { |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| CacheServer s = getCache().addCacheServer(); |
| s.setPort(port); |
| s.start(); |
| return port; |
| } |
| return 0; |
| } |
| }); |
| } |
| |
| private void createRegion(boolean accessor, int redundantCopies, InterestPolicy interestPolicy) { |
| AttributesFactory af = new AttributesFactory(); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setCloningEnabled(true); |
| getCache().createRegion(D_REFERENCE,af.create()); |
| af = new AttributesFactory(); |
| af.setCloningEnabled(true); |
| if (interestPolicy != null) { |
| af.setSubscriptionAttributes(new SubscriptionAttributes(interestPolicy)); |
| } |
| af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>() |
| .setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1) |
| .setPartitionResolver(new CustomerIDPartitionResolver("resolver1")) |
| .setRedundantCopies(redundantCopies).create()); |
| getCache().createRegion(CUSTOMER, af.create()); |
| af.setPartitionAttributes(new PartitionAttributesFactory<OrderId, Order>() |
| .setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1) |
| .setPartitionResolver(new CustomerIDPartitionResolver("resolver2")) |
| .setRedundantCopies(redundantCopies).setColocatedWith(CUSTOMER).create()); |
| getCache().createRegion(ORDER, af.create()); |
| } |
| private void createClientRegion(VM vm, final int port, final boolean isEmpty, final boolean ri) { |
| vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| ClientCacheFactory ccf = new ClientCacheFactory(); |
| ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port); |
| ccf.setPoolSubscriptionEnabled(false); |
| ccf.set("log-level", getDUnitLogLevel()); |
| ClientCache cCache = getClientCache(ccf); |
| ClientRegionFactory<Integer, String> crf = cCache |
| .createClientRegionFactory(isEmpty ? ClientRegionShortcut.PROXY |
| : ClientRegionShortcut.CACHING_PROXY); |
| Region<Integer, String> r = crf.create(D_REFERENCE); |
| Region<Integer, String> customer = crf.create(CUSTOMER); |
| Region<Integer, String> order = crf.create(ORDER); |
| if (ri) { |
| r.registerInterestRegex(".*"); |
| customer.registerInterestRegex(".*"); |
| order.registerInterestRegex(".*"); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| static class Customer implements Delta, Serializable { |
| private int id; |
| private String name; |
| private boolean idChanged; |
| private boolean nameChanged; |
| private boolean fromDeltaCalled; |
| private boolean toDeltaCalled; |
| |
| public Customer(int id, String name) { |
| this.id = id; |
| this.name = name; |
| } |
| public void setId(int id) { |
| this.idChanged = true; |
| this.id = id; |
| } |
| public void setName(String name) { |
| this.nameChanged = true; |
| this.name = name; |
| } |
| public int getId() { |
| return id; |
| } |
| public String getName() { |
| return name; |
| } |
| public void fromDelta(DataInput in) throws IOException, |
| InvalidDeltaException { |
| if (in.readBoolean()) { |
| id = in.readInt(); |
| } |
| if (in.readBoolean()) { |
| name = in.readUTF(); |
| } |
| fromDeltaCalled = true; |
| } |
| public boolean hasDelta() { |
| return this.idChanged || this.nameChanged; |
| } |
| |
| public void toDelta(DataOutput out) throws IOException { |
| out.writeBoolean(idChanged); |
| if (idChanged) { |
| out.writeInt(id); |
| } |
| out.writeBoolean(nameChanged); |
| if (nameChanged) { |
| out.writeUTF(name); |
| } |
| toDeltaCalled = true; |
| } |
| @Override |
| public String toString() { |
| return " id:"+id+" name:"+name; |
| } |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) return true; |
| if (obj instanceof Customer) { |
| Customer other = (Customer) obj; |
| return this.id == other.id && this.name.equals(other.name); |
| } |
| return false; |
| } |
| @Override |
| public int hashCode() { |
| return this.id + this.name.hashCode(); |
| } |
| public boolean isFromDeltaCalled() { |
| boolean retVal = this.fromDeltaCalled; |
| this.fromDeltaCalled = false; |
| return retVal; |
| } |
| public boolean isToDeltaCalled() { |
| boolean retVal = this.toDeltaCalled; |
| this.toDeltaCalled = false; |
| return retVal; |
| } |
| } |
| |
| public void testTxWithCloning() { |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setCloningEnabled(true); |
| basicTest(af.create()); |
| } |
| |
| public void testExceptionThrown() { |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| final RegionAttributes attr = af.create(); |
| Host host = Host.getHost(0); |
| VM vm1 = host.getVM(0); |
| VM vm2 = host.getVM(1); |
| final String regionName = getUniqueName(); |
| |
| SerializableCallable createRegion = new SerializableCallable() { |
| public Object call() throws Exception { |
| getCache().createRegion(regionName, attr); |
| return null; |
| } |
| }; |
| |
| vm1.invoke(createRegion); |
| vm2.invoke(createRegion); |
| final String key = "cust1"; |
| |
| vm1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| Region r = getCache().getRegion(regionName); |
| Customer cust = new Customer(1, "cust1"); |
| r.put(key, cust); |
| mgr.begin(); |
| cust.setName(""); |
| try { |
| r.put(key, cust); |
| fail("exception not thrown"); |
| } catch (UnsupportedOperationInTransactionException expected) { |
| } |
| mgr.rollback(); |
| return null; |
| } |
| }); |
| } |
| |
| private void basicTest(final RegionAttributes regionAttr) { |
| Host host = Host.getHost(0); |
| VM vm1 = host.getVM(0); |
| VM vm2 = host.getVM(1); |
| final String regionName = getUniqueName(); |
| |
| SerializableCallable createRegion = new SerializableCallable() { |
| public Object call() throws Exception { |
| getCache().createRegion(regionName, regionAttr); |
| return null; |
| } |
| }; |
| |
| vm1.invoke(createRegion); |
| vm2.invoke(createRegion); |
| final String key = "cust1"; |
| |
| vm1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| Region r = getCache().getRegion(regionName); |
| Customer cust = new Customer(1, "cust1"); |
| r.put(key, cust); |
| mgr.begin(); |
| cust.setName(""); |
| r.put(key, cust); |
| return null; |
| } |
| }); |
| |
| vm2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<String, Customer> r = getCache().getRegion(regionName); |
| Customer c = r.get(key); |
| c.setName("cust1updated"); |
| r.put(key, c); |
| return null; |
| } |
| }); |
| |
| vm1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| Region r = getCache().getRegion(regionName); |
| assertNotNull(mgr.getTXState()); |
| try { |
| mgr.commit(); |
| fail("expected CommitConflict not thrown"); |
| } catch (CommitConflictException e) { |
| //expected |
| } |
| return null; |
| } |
| }); |
| } |
| |
| public void testClientServerDelta() { |
| Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client = host.getVM(1); |
| int port = createRegionOnServer(server, true, false); |
| createClientRegion(client, port, false, false); |
| |
| server.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> pr = getCache().getRegion(CUSTOMER); |
| CustId cust1 = new CustId(1); |
| pr.put(cust1, new Customer(1, "name1")); |
| Iterator<CustId> it = pr.keySet().iterator(); |
| while (it.hasNext()) { |
| getLogWriter().info("SWAP:iterator1:"+pr.get(it.next())); |
| } |
| Customer c = pr.get(cust1); |
| assertNotNull(c); |
| return null; |
| } |
| }); |
| |
| client.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> pr = getCache().getRegion(CUSTOMER); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| CustId cust1 = new CustId(1); |
| //pr.put(cust1, new Customer(1, "name1")); |
| // pr.create(cust1, new Customer(1, "name1")); |
| mgr.begin(); |
| Customer c = pr.get(cust1); |
| c.setName("updatedName"); |
| getLogWriter().info("SWAP:doingPut"); |
| pr.put(cust1, c); |
| getLogWriter().info("SWAP:getfromtx:"+pr.get(cust1)); |
| getLogWriter().info("SWAP:doingCommit"); |
| assertEquals("updatedName", pr.get(cust1).getName()); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertEquals("name1", pr.get(cust1).getName()); |
| mgr.resume(tx); |
| mgr.commit(); |
| assertTrue(c.isToDeltaCalled()); |
| assertEquals(c, pr.get(cust1)); |
| return null; |
| } |
| }); |
| server.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> pr = getCache().getRegion(CUSTOMER); |
| CustId cust1 = new CustId(1); |
| Customer c = pr.get(cust1); |
| assertTrue(c.isFromDeltaCalled()); |
| return null; |
| } |
| }); |
| } |
| } |