| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; |
| import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Iterator; |
| |
| import org.junit.Test; |
| |
| import org.apache.geode.DataSerializable; |
| import org.apache.geode.Delta; |
| import org.apache.geode.InvalidDeltaException; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.CommitConflictException; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.InterestPolicy; |
| import org.apache.geode.cache.PartitionAttributesFactory; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.SubscriptionAttributes; |
| import org.apache.geode.cache.UnsupportedOperationInTransactionException; |
| import org.apache.geode.cache.client.ClientCache; |
| import org.apache.geode.cache.client.ClientCacheFactory; |
| import org.apache.geode.cache.client.ClientRegionFactory; |
| import org.apache.geode.cache.client.ClientRegionShortcut; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.internal.cache.execute.CustomerIDPartitionResolver; |
| import org.apache.geode.internal.cache.execute.data.CustId; |
| import org.apache.geode.internal.cache.execute.data.Order; |
| import org.apache.geode.internal.cache.execute.data.OrderId; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.LogWriterUtils; |
| import org.apache.geode.test.dunit.SerializableCallable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| |
| |
| public class TransactionsWithDeltaDUnitTest extends JUnit4CacheTestCase { |
| |
| private static final String D_REFERENCE = "ref"; |
| private static final String CUSTOMER = "Customer"; |
| private static final String ORDER = "Order"; |
| |
| public TransactionsWithDeltaDUnitTest() { |
| super(); |
| } |
| |
| private Integer createRegionOnServer(VM vm, final boolean startServer, final boolean accessor) { |
| return (Integer) vm.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| createRegion(accessor, 0, null); |
| if (startServer) { |
| int port = getRandomAvailableTCPPort(); |
| CacheServer s = getCache().addCacheServer(); |
| s.setPort(port); |
| s.start(); |
| return port; |
| } |
| return 0; |
| } |
| }); |
| } |
| |
| private void createRegion(boolean accessor, int redundantCopies, InterestPolicy interestPolicy) { |
| AttributesFactory af = new AttributesFactory(); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setCloningEnabled(true); |
| getCache().createRegion(D_REFERENCE, af.create()); |
| af = new AttributesFactory(); |
| af.setCloningEnabled(true); |
| if (interestPolicy != null) { |
| af.setSubscriptionAttributes(new SubscriptionAttributes(interestPolicy)); |
| } |
| af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>() |
| .setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1) |
| .setPartitionResolver(new CustomerIDPartitionResolver("resolver1")) |
| .setRedundantCopies(redundantCopies).create()); |
| getCache().createRegion(CUSTOMER, af.create()); |
| af.setPartitionAttributes(new PartitionAttributesFactory<OrderId, Order>().setTotalNumBuckets(4) |
| .setLocalMaxMemory(accessor ? 0 : 1) |
| .setPartitionResolver(new CustomerIDPartitionResolver("resolver2")) |
| .setRedundantCopies(redundantCopies).setColocatedWith(CUSTOMER).create()); |
| getCache().createRegion(ORDER, af.create()); |
| } |
| |
| private void createClientRegion(VM vm, final int port, final boolean isEmpty, final boolean ri) { |
| vm.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory ccf = new ClientCacheFactory(); |
| ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port); |
| ccf.setPoolSubscriptionEnabled(false); |
| ccf.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); |
| ClientCache cCache = getClientCache(ccf); |
| ClientRegionFactory<Integer, String> crf = cCache.createClientRegionFactory( |
| isEmpty ? ClientRegionShortcut.PROXY : ClientRegionShortcut.CACHING_PROXY); |
| Region<Integer, String> r = crf.create(D_REFERENCE); |
| Region<Integer, String> customer = crf.create(CUSTOMER); |
| Region<Integer, String> order = crf.create(ORDER); |
| if (ri) { |
| r.registerInterestRegex(".*"); |
| customer.registerInterestRegex(".*"); |
| order.registerInterestRegex(".*"); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| static class Customer implements Delta, DataSerializable { |
| private int id; |
| private String name; |
| private boolean idChanged; |
| private boolean nameChanged; |
| private boolean fromDeltaCalled; |
| private boolean toDeltaCalled; |
| |
| public Customer() {} |
| |
| public Customer(int id, String name) { |
| this.id = id; |
| this.name = name; |
| } |
| |
| public void setId(int id) { |
| this.idChanged = true; |
| this.id = id; |
| } |
| |
| public void setName(String name) { |
| this.nameChanged = true; |
| this.name = name; |
| } |
| |
| public int getId() { |
| return id; |
| } |
| |
| public String getName() { |
| return name; |
| } |
| |
| @Override |
| public void fromDelta(DataInput in) throws IOException, InvalidDeltaException { |
| if (in.readBoolean()) { |
| id = in.readInt(); |
| } |
| if (in.readBoolean()) { |
| name = in.readUTF(); |
| } |
| fromDeltaCalled = true; |
| } |
| |
| @Override |
| public boolean hasDelta() { |
| return this.idChanged || this.nameChanged; |
| } |
| |
| @Override |
| public void toDelta(DataOutput out) throws IOException { |
| out.writeBoolean(idChanged); |
| if (idChanged) { |
| out.writeInt(id); |
| } |
| out.writeBoolean(nameChanged); |
| if (nameChanged) { |
| out.writeUTF(name); |
| } |
| toDeltaCalled = true; |
| } |
| |
| @Override |
| public String toString() { |
| return " id:" + id + " name:" + name; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) |
| return true; |
| if (obj instanceof Customer) { |
| Customer other = (Customer) obj; |
| return this.id == other.id && this.name.equals(other.name); |
| } |
| return false; |
| } |
| |
| @Override |
| public int hashCode() { |
| return this.id + this.name.hashCode(); |
| } |
| |
| public boolean isFromDeltaCalled() { |
| boolean retVal = this.fromDeltaCalled; |
| this.fromDeltaCalled = false; |
| return retVal; |
| } |
| |
| public boolean isToDeltaCalled() { |
| boolean retVal = this.toDeltaCalled; |
| this.toDeltaCalled = false; |
| return retVal; |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| out.writeInt(id); |
| out.writeUTF(name); |
| out.writeBoolean(idChanged); |
| out.writeBoolean(nameChanged); |
| out.writeBoolean(fromDeltaCalled); |
| out.writeBoolean(toDeltaCalled); |
| } |
| |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| id = in.readInt(); |
| name = in.readUTF(); |
| idChanged = in.readBoolean(); |
| nameChanged = in.readBoolean(); |
| fromDeltaCalled = in.readBoolean(); |
| toDeltaCalled = in.readBoolean(); |
| } |
| } |
| |
| @Test |
| public void testTxWithCloning() { |
| Host host = Host.getHost(0); |
| VM vm1 = host.getVM(0); |
| VM vm2 = host.getVM(1); |
| final String regionName = getUniqueName(); |
| |
| SerializableCallable createRegion = new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setCloningEnabled(true); |
| getCache().createRegion(regionName, af.create()); |
| return null; |
| } |
| }; |
| |
| vm1.invoke(createRegion); |
| vm2.invoke(createRegion); |
| final String key = "cust1"; |
| |
| vm1.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| Region r = getCache().getRegion(regionName); |
| Customer cust = new Customer(1, "cust1"); |
| r.put(key, cust); |
| mgr.begin(); |
| cust.setName(""); |
| r.put(key, cust); |
| return null; |
| } |
| }); |
| |
| vm2.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<String, Customer> r = getCache().getRegion(regionName); |
| Customer c = r.get(key); |
| c.setName("cust1updated"); |
| r.put(key, c); |
| return null; |
| } |
| }); |
| |
| vm1.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| Region r = getCache().getRegion(regionName); |
| assertNotNull(mgr.getTXState()); |
| try { |
| mgr.commit(); |
| fail("expected CommitConflict not thrown"); |
| } catch (CommitConflictException e) { |
| // expected |
| } |
| return null; |
| } |
| }); |
| } |
| |
| @Test |
| public void testExceptionThrown() { |
| Host host = Host.getHost(0); |
| VM vm1 = host.getVM(0); |
| VM vm2 = host.getVM(1); |
| final String regionName = getUniqueName(); |
| |
| SerializableCallable createRegion = new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| final RegionAttributes attr = af.create(); |
| getCache().createRegion(regionName, attr); |
| return null; |
| } |
| }; |
| |
| vm1.invoke(createRegion); |
| vm2.invoke(createRegion); |
| final String key = "cust1"; |
| |
| vm1.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| Region r = getCache().getRegion(regionName); |
| Customer cust = new Customer(1, "cust1"); |
| r.put(key, cust); |
| mgr.begin(); |
| cust.setName(""); |
| try { |
| r.put(key, cust); |
| fail("exception not thrown"); |
| } catch (UnsupportedOperationInTransactionException expected) { |
| } |
| mgr.rollback(); |
| return null; |
| } |
| }); |
| } |
| |
| @Test |
| public void testClientServerDelta() { |
| Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client = host.getVM(1); |
| int port = createRegionOnServer(server, true, false); |
| createClientRegion(client, port, false, false); |
| |
| server.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<CustId, Customer> pr = getCache().getRegion(CUSTOMER); |
| CustId cust1 = new CustId(1); |
| pr.put(cust1, new Customer(1, "name1")); |
| Iterator<CustId> it = pr.keySet().iterator(); |
| while (it.hasNext()) { |
| LogWriterUtils.getLogWriter().info("SWAP:iterator1:" + pr.get(it.next())); |
| } |
| Customer c = pr.get(cust1); |
| assertNotNull(c); |
| return null; |
| } |
| }); |
| |
| client.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<CustId, Customer> pr = getCache().getRegion(CUSTOMER); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| CustId cust1 = new CustId(1); |
| // pr.put(cust1, new Customer(1, "name1")); |
| // pr.create(cust1, new Customer(1, "name1")); |
| mgr.begin(); |
| Customer c = pr.get(cust1); |
| c.setName("updatedName"); |
| LogWriterUtils.getLogWriter().info("SWAP:doingPut"); |
| pr.put(cust1, c); |
| LogWriterUtils.getLogWriter().info("SWAP:getfromtx:" + pr.get(cust1)); |
| LogWriterUtils.getLogWriter().info("SWAP:doingCommit"); |
| assertEquals("updatedName", pr.get(cust1).getName()); |
| TXStateProxy tx = mgr.pauseTransaction(); |
| assertEquals("name1", pr.get(cust1).getName()); |
| mgr.unpauseTransaction(tx); |
| mgr.commit(); |
| assertTrue(c.isToDeltaCalled()); |
| assertEquals(c, pr.get(cust1)); |
| return null; |
| } |
| }); |
| server.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<CustId, Customer> pr = getCache().getRegion(CUSTOMER); |
| CustId cust1 = new CustId(1); |
| Customer c = pr.get(cust1); |
| assertTrue(c.isFromDeltaCalled()); |
| return null; |
| } |
| }); |
| } |
| } |