blob: efb246480c204588575bf60a7ab7c2e2c5560fa7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.junit.Test;
import org.apache.geode.DataSerializable;
import org.apache.geode.Delta;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.InterestPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.SubscriptionAttributes;
import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.internal.cache.execute.CustomerIDPartitionResolver;
import org.apache.geode.internal.cache.execute.data.CustId;
import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
public class TransactionsWithDeltaDUnitTest extends JUnit4CacheTestCase {
private static final String D_REFERENCE = "ref";
private static final String CUSTOMER = "Customer";
private static final String ORDER = "Order";
public TransactionsWithDeltaDUnitTest() {
super();
}
private Integer createRegionOnServer(VM vm, final boolean startServer, final boolean accessor) {
return (Integer) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
createRegion(accessor, 0, null);
if (startServer) {
int port = getRandomAvailableTCPPort();
CacheServer s = getCache().addCacheServer();
s.setPort(port);
s.start();
return port;
}
return 0;
}
});
}
private void createRegion(boolean accessor, int redundantCopies, InterestPolicy interestPolicy) {
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDataPolicy(DataPolicy.REPLICATE);
af.setCloningEnabled(true);
getCache().createRegion(D_REFERENCE, af.create());
af = new AttributesFactory();
af.setCloningEnabled(true);
if (interestPolicy != null) {
af.setSubscriptionAttributes(new SubscriptionAttributes(interestPolicy));
}
af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
.setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1)
.setPartitionResolver(new CustomerIDPartitionResolver("resolver1"))
.setRedundantCopies(redundantCopies).create());
getCache().createRegion(CUSTOMER, af.create());
af.setPartitionAttributes(new PartitionAttributesFactory<OrderId, Order>().setTotalNumBuckets(4)
.setLocalMaxMemory(accessor ? 0 : 1)
.setPartitionResolver(new CustomerIDPartitionResolver("resolver2"))
.setRedundantCopies(redundantCopies).setColocatedWith(CUSTOMER).create());
getCache().createRegion(ORDER, af.create());
}
private void createClientRegion(VM vm, final int port, final boolean isEmpty, final boolean ri) {
vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
ClientCacheFactory ccf = new ClientCacheFactory();
ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port);
ccf.setPoolSubscriptionEnabled(false);
ccf.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
ClientCache cCache = getClientCache(ccf);
ClientRegionFactory<Integer, String> crf = cCache.createClientRegionFactory(
isEmpty ? ClientRegionShortcut.PROXY : ClientRegionShortcut.CACHING_PROXY);
Region<Integer, String> r = crf.create(D_REFERENCE);
Region<Integer, String> customer = crf.create(CUSTOMER);
Region<Integer, String> order = crf.create(ORDER);
if (ri) {
r.registerInterestRegex(".*");
customer.registerInterestRegex(".*");
order.registerInterestRegex(".*");
}
return null;
}
});
}
static class Customer implements Delta, DataSerializable {
private int id;
private String name;
private boolean idChanged;
private boolean nameChanged;
private boolean fromDeltaCalled;
private boolean toDeltaCalled;
public Customer() {}
public Customer(int id, String name) {
this.id = id;
this.name = name;
}
public void setId(int id) {
this.idChanged = true;
this.id = id;
}
public void setName(String name) {
this.nameChanged = true;
this.name = name;
}
public int getId() {
return id;
}
public String getName() {
return name;
}
@Override
public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {
if (in.readBoolean()) {
id = in.readInt();
}
if (in.readBoolean()) {
name = in.readUTF();
}
fromDeltaCalled = true;
}
@Override
public boolean hasDelta() {
return this.idChanged || this.nameChanged;
}
@Override
public void toDelta(DataOutput out) throws IOException {
out.writeBoolean(idChanged);
if (idChanged) {
out.writeInt(id);
}
out.writeBoolean(nameChanged);
if (nameChanged) {
out.writeUTF(name);
}
toDeltaCalled = true;
}
@Override
public String toString() {
return " id:" + id + " name:" + name;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj instanceof Customer) {
Customer other = (Customer) obj;
return this.id == other.id && this.name.equals(other.name);
}
return false;
}
@Override
public int hashCode() {
return this.id + this.name.hashCode();
}
public boolean isFromDeltaCalled() {
boolean retVal = this.fromDeltaCalled;
this.fromDeltaCalled = false;
return retVal;
}
public boolean isToDeltaCalled() {
boolean retVal = this.toDeltaCalled;
this.toDeltaCalled = false;
return retVal;
}
@Override
public void toData(DataOutput out) throws IOException {
out.writeInt(id);
out.writeUTF(name);
out.writeBoolean(idChanged);
out.writeBoolean(nameChanged);
out.writeBoolean(fromDeltaCalled);
out.writeBoolean(toDeltaCalled);
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
id = in.readInt();
name = in.readUTF();
idChanged = in.readBoolean();
nameChanged = in.readBoolean();
fromDeltaCalled = in.readBoolean();
toDeltaCalled = in.readBoolean();
}
}
@Test
public void testTxWithCloning() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(1);
final String regionName = getUniqueName();
SerializableCallable createRegion = new SerializableCallable() {
@Override
public Object call() throws Exception {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
af.setCloningEnabled(true);
getCache().createRegion(regionName, af.create());
return null;
}
};
vm1.invoke(createRegion);
vm2.invoke(createRegion);
final String key = "cust1";
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
Region r = getCache().getRegion(regionName);
Customer cust = new Customer(1, "cust1");
r.put(key, cust);
mgr.begin();
cust.setName("");
r.put(key, cust);
return null;
}
});
vm2.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region<String, Customer> r = getCache().getRegion(regionName);
Customer c = r.get(key);
c.setName("cust1updated");
r.put(key, c);
return null;
}
});
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
Region r = getCache().getRegion(regionName);
assertNotNull(mgr.getTXState());
try {
mgr.commit();
fail("expected CommitConflict not thrown");
} catch (CommitConflictException e) {
// expected
}
return null;
}
});
}
@Test
public void testExceptionThrown() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(1);
final String regionName = getUniqueName();
SerializableCallable createRegion = new SerializableCallable() {
@Override
public Object call() throws Exception {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
final RegionAttributes attr = af.create();
getCache().createRegion(regionName, attr);
return null;
}
};
vm1.invoke(createRegion);
vm2.invoke(createRegion);
final String key = "cust1";
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
Region r = getCache().getRegion(regionName);
Customer cust = new Customer(1, "cust1");
r.put(key, cust);
mgr.begin();
cust.setName("");
try {
r.put(key, cust);
fail("exception not thrown");
} catch (UnsupportedOperationInTransactionException expected) {
}
mgr.rollback();
return null;
}
});
}
@Test
public void testClientServerDelta() {
Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
int port = createRegionOnServer(server, true, false);
createClientRegion(client, port, false, false);
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region<CustId, Customer> pr = getCache().getRegion(CUSTOMER);
CustId cust1 = new CustId(1);
pr.put(cust1, new Customer(1, "name1"));
Iterator<CustId> it = pr.keySet().iterator();
while (it.hasNext()) {
LogWriterUtils.getLogWriter().info("SWAP:iterator1:" + pr.get(it.next()));
}
Customer c = pr.get(cust1);
assertNotNull(c);
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region<CustId, Customer> pr = getCache().getRegion(CUSTOMER);
TXManagerImpl mgr = getGemfireCache().getTxManager();
CustId cust1 = new CustId(1);
// pr.put(cust1, new Customer(1, "name1"));
// pr.create(cust1, new Customer(1, "name1"));
mgr.begin();
Customer c = pr.get(cust1);
c.setName("updatedName");
LogWriterUtils.getLogWriter().info("SWAP:doingPut");
pr.put(cust1, c);
LogWriterUtils.getLogWriter().info("SWAP:getfromtx:" + pr.get(cust1));
LogWriterUtils.getLogWriter().info("SWAP:doingCommit");
assertEquals("updatedName", pr.get(cust1).getName());
TXStateProxy tx = mgr.pauseTransaction();
assertEquals("name1", pr.get(cust1).getName());
mgr.unpauseTransaction(tx);
mgr.commit();
assertTrue(c.isToDeltaCalled());
assertEquals(c, pr.get(cust1));
return null;
}
});
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region<CustId, Customer> pr = getCache().getRegion(CUSTOMER);
CustId cust1 = new CustId(1);
Customer c = pr.get(cust1);
assertTrue(c.isFromDeltaCalled());
return null;
}
});
}
}