blob: ad66da65b50e4258b546c11342b02c4293c21cb9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertFalse;
import static org.apache.geode.test.dunit.Assert.assertNotNull;
import static org.apache.geode.test.dunit.Assert.assertNull;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.geode.DataSerializable;
import org.apache.geode.Delta;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.internal.DestroyOp;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.MembershipManager;
import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
/**
* tests for the concurrentMapOperations. there are more tests in ClientServerMiscDUnitTest
*/
public class ConcurrentMapOpsDUnitTest extends JUnit4CacheTestCase {
private static final String REP_REG_NAME = "repRegion";
private static final String PR_REG_NAME = "prRegion";
private static final int MAX_ENTRIES = 113;
enum OP {
PUTIFABSENT, REPLACE, REMOVE
}
@Override
public Properties getDistributedSystemProperties() {
Properties result = super.getDistributedSystemProperties();
result.put(ENABLE_NETWORK_PARTITION_DETECTION, "false");
return result;
}
private void createRegions(VM vm) {
vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
createReplicateRegion();
createPartitionedRegion();
return null;
}
});
}
private void createRedundantRegions(VM vm) {
vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
getCache().createRegionFactory(RegionShortcut.REPLICATE).setConcurrencyChecksEnabled(true)
.create(REP_REG_NAME);
getCache().createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
.setConcurrencyChecksEnabled(true).create(PR_REG_NAME);
return null;
}
});
}
private Region createReplicateRegion() {
return getCache().createRegionFactory(RegionShortcut.REPLICATE)
.setConcurrencyChecksEnabled(true).create(REP_REG_NAME);
}
private Region createPartitionedRegion() {
return getCache().createRegionFactory(RegionShortcut.PARTITION)
.setConcurrencyChecksEnabled(true).create(PR_REG_NAME);
}
private Integer createRegionsAndStartServer(VM vm) {
return createRegionsAndStartServer(vm, false);
}
private Integer createRegionsAndStartServer(VM vm, final boolean withRedundancy) {
return (Integer) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
getCache().createRegionFactory(RegionShortcut.REPLICATE).create(REP_REG_NAME);
if (withRedundancy) {
getCache().createRegionFactory(RegionShortcut.PARTITION_REDUNDANT).create(PR_REG_NAME);
} else {
getCache().createRegionFactory(RegionShortcut.PARTITION).create(PR_REG_NAME);
}
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer s = getCache().addCacheServer();
s.setPort(port);
s.start();
return port;
}
});
}
private void createEmptyRegion(VM vm) {
vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
getCache().createRegionFactory(RegionShortcut.REPLICATE_PROXY)
.setConcurrencyChecksEnabled(true).create(REP_REG_NAME);
getCache().createRegionFactory(RegionShortcut.PARTITION_PROXY)
.setConcurrencyChecksEnabled(true).create(PR_REG_NAME);
return null;
}
});
}
private void createClientRegionWithRI(VM vm, final int port, final boolean isEmpty) {
createClientRegion(vm, port, isEmpty, true, -1);
}
private void createClientRegion(VM vm, final int port1, final boolean isEmpty, int port2) {
createClientRegion(vm, port1, isEmpty, false, port2);
}
private void createClientRegion(VM vm, final int port1, final boolean isEmpty, final boolean ri,
final int port2) {
vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
ClientCacheFactory ccf = new ClientCacheFactory();
ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port1);
if (port2 > 0) {
ccf.addPoolServer("localhost", port2);
}
ccf.setPoolSubscriptionEnabled(true);
ccf.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
ClientCache cCache = getClientCache(ccf);
ClientRegionFactory<Integer, String> crf = cCache.createClientRegionFactory(
isEmpty ? ClientRegionShortcut.PROXY : ClientRegionShortcut.CACHING_PROXY);
Region<Integer, String> r = crf.create(REP_REG_NAME);
Region<Integer, String> pr = crf.create(PR_REG_NAME);
if (ri) {
r.registerInterestRegex(".*");
pr.registerInterestRegex(".*");
}
return null;
}
});
}
private abstract static class AbstractConcMapOpsListener
implements CacheListener<Integer, String> {
@Override
public void afterCreate(EntryEvent<Integer, String> event) {
validate(event);
}
@Override
public void afterDestroy(EntryEvent<Integer, String> event) {
validate(event);
}
@Override
public void afterInvalidate(EntryEvent<Integer, String> event) {
validate(event);
}
@Override
public void afterRegionClear(RegionEvent<Integer, String> event) {}
@Override
public void afterRegionCreate(RegionEvent<Integer, String> event) {}
@Override
public void afterRegionDestroy(RegionEvent<Integer, String> event) {}
@Override
public void afterRegionInvalidate(RegionEvent<Integer, String> event) {}
@Override
public void afterRegionLive(RegionEvent<Integer, String> event) {}
@Override
public void afterUpdate(EntryEvent<Integer, String> event) {
validate(event);
}
@Override
public void close() {}
abstract void validate(EntryEvent event);
}
private static class NotInvokedListener extends AbstractConcMapOpsListener {
@Override
void validate(EntryEvent event) {
fail("should not be called. Event=" + event);
}
}
private static class InitialCreatesListener extends AbstractConcMapOpsListener {
AtomicInteger numCreates = new AtomicInteger();
@Override
void validate(EntryEvent event) {
if (!event.getOperation().isCreate()) {
fail("expected only create events");
}
numCreates.incrementAndGet();
}
}
// test for bug #42164
@Test
public void testListenerNotInvokedForRejectedOperation() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
int port1 = createRegionsAndStartServer(vm1);
int port2 = createRegionsAndStartServer(vm2);
createClientRegionWithRI(client1, port1, true);
createClientRegionWithRI(client2, port2, true);
SerializableCallable addListenerToClientForInitialCreates = new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(REP_REG_NAME);
r.getAttributesMutator().addCacheListener(new InitialCreatesListener());
Region pr = getCache().getRegion(PR_REG_NAME);
pr.getAttributesMutator().addCacheListener(new InitialCreatesListener());
return null;
}
};
client1.invoke(addListenerToClientForInitialCreates);
client2.invoke(addListenerToClientForInitialCreates);
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region<Integer, String> r = getGemfireCache().getRegion(REP_REG_NAME);
Region<Integer, String> pr = getGemfireCache().getRegion(PR_REG_NAME);
for (int i = 0; i < MAX_ENTRIES; i++) {
r.put(i, "value" + i);
pr.put(i, "value" + i);
}
return null;
}
});
SerializableCallable waitForInitialCreates = new SerializableCallable() {
@Override
public Object call() throws Exception {
Region<Integer, String> r = getGemfireCache().getRegion(REP_REG_NAME);
Region<Integer, String> pr = getGemfireCache().getRegion(PR_REG_NAME);
waitForCreates(r);
waitForCreates(pr);
return null;
}
private void waitForCreates(Region region) {
CacheListener[] listeners = region.getAttributes().getCacheListeners();
boolean listenerFound = false;
for (CacheListener listener : listeners) {
if (listener instanceof InitialCreatesListener) {
listenerFound = true;
final InitialCreatesListener initialCreatesListener = (InitialCreatesListener) listener;
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return initialCreatesListener.numCreates.get() == MAX_ENTRIES;
}
@Override
public String description() {
return "Client expected to get " + MAX_ENTRIES + " creates, but got "
+ initialCreatesListener.numCreates.get();
}
};
GeodeAwaitility.await().untilAsserted(wc);
}
}
if (!listenerFound) {
fail("Client listener should have been found");
}
}
};
client1.invoke(waitForInitialCreates);
client2.invoke(waitForInitialCreates);
SerializableCallable addListener = new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(REP_REG_NAME);
r.getAttributesMutator().addCacheListener(new NotInvokedListener());
Region pr = getCache().getRegion(PR_REG_NAME);
pr.getAttributesMutator().addCacheListener(new NotInvokedListener());
return null;
}
};
vm1.invoke(addListener);
vm2.invoke(addListener);
SerializableCallable addListenerToClient = new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(REP_REG_NAME);
r.getAttributesMutator().addCacheListener(new NotInvokedListener());
Region pr = getCache().getRegion(PR_REG_NAME);
pr.getAttributesMutator().addCacheListener(new NotInvokedListener());
return null;
}
};
client1.invoke(addListenerToClient);
client2.invoke(addListenerToClient);
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(REP_REG_NAME);
Region pr = getCache().getRegion(PR_REG_NAME);
for (int i = 0; i < MAX_ENTRIES; i++) {
assertEquals("value" + i, r.putIfAbsent(i, "piavalue"));
assertEquals("value" + i, pr.putIfAbsent(i, "piavalue"));
}
for (int i = 0; i < MAX_ENTRIES; i++) {
assertFalse(r.replace(i, "value", "replace1Value"));
assertFalse(pr.replace(i, "value", "replace1Value"));
}
for (int i = MAX_ENTRIES + 1; i < MAX_ENTRIES * 2; i++) {
assertNull(r.replace(i, "replace2value" + i));
assertNull(pr.replace(i, "replace2value" + i));
}
for (int i = MAX_ENTRIES + 1; i < MAX_ENTRIES * 2; i++) {
assertFalse(r.remove(i, "removeValue" + i));
assertFalse(pr.remove(i, "removeValue" + i));
}
return null;
}
});
}
@Test
public void testBug42162() {
dotestConcOps(false);
}
@Test
public void testBug42162EmptyClient() {
dotestConcOps(true);
}
private void dotestConcOps(final boolean emptyClient) {
Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(2);
int port1 = createRegionsAndStartServer(server);
createClientRegion(client, port1, emptyClient, -1);
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
r.registerInterestRegex(".*");
pr.registerInterestRegex(".*");
return null;
}
});
server.invoke(new SerializableCallable() {
@Override
public Object call() {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
r.put("key0", "value");
pr.put("key0", "value");
assertNull(r.putIfAbsent("keyForNull", null));
assertNull(pr.putIfAbsent("keyForNull", null));
assertEquals("value", r.putIfAbsent("key0", null));
assertEquals("value", pr.putIfAbsent("key0", null));
assertTrue(r.containsKey("keyForNull"));
assertTrue(pr.containsKey("keyForNull"));
assertFalse(r.containsValueForKey("keyForNull"));
assertFalse(pr.containsValueForKey("keyForNull"));
r.put("key0", "value");
pr.put("key0", "value");
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
WaitCriterion wc = new WaitCriterion() {
AssertionError e = null;
@Override
public boolean done() {
try {
if (!emptyClient) {
assertTrue(r.containsKey("key0"));
assertTrue(pr.containsKey("key0"));
assertTrue(r.containsKey("keyForNull"));
assertTrue(pr.containsKey("keyForNull"));
assertFalse(r.containsValueForKey("keyForNull"));
assertFalse(pr.containsValueForKey("keyForNull"));
}
assertEquals("value", r.putIfAbsent("key0", null));
assertEquals("value", pr.putIfAbsent("key0", null));
assertNull(r.putIfAbsent("keyForNull", null));
assertNull(pr.putIfAbsent("keyForNull", null));
assertNull(r.putIfAbsent("clientNullKey", null));
assertNull(pr.putIfAbsent("clientNullKey", null));
} catch (AssertionError ex) {
r.getCache().getLogger().fine("SWAP:caught ", ex);
e = ex;
return false;
}
return true;
}
@Override
public String description() {
return "timeout " + e;
}
};
GeodeAwaitility.await().untilAsserted(wc);
return null;
}
});
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertTrue(r.containsKey("clientNullKey"));
assertTrue(pr.containsKey("clientNullKey"));
assertFalse(r.containsValueForKey("clientNullKey"));
assertFalse(pr.containsValueForKey("clientNullKey"));
assertNotNull(r.replace("key0", "value2"));
assertNotNull(pr.replace("key0", "value2"));
assertTrue(r.replace("keyForNull", null, "newValue"));
assertTrue(pr.replace("keyForNull", null, "newValue"));
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
WaitCriterion wc = new WaitCriterion() {
AssertionError e = null;
@Override
public boolean done() {
try {
assertEquals("value2", r.putIfAbsent("key0", null));
assertEquals("value2", pr.putIfAbsent("key0", null));
assertEquals("newValue", r.putIfAbsent("keyForNull", null));
assertEquals("newValue", pr.putIfAbsent("keyForNull", null));
// replace from client
assertEquals("value2", r.replace("key0", "value"));
assertEquals("value2", pr.replace("key0", "value"));
assertNull(r.replace("NoKeyOnServer", "value"));
assertNull(r.replace("NoKeyOnServer", "value"));
assertTrue(r.replace("clientNullKey", null, "newValue"));
assertTrue(pr.replace("clientNullKey", null, "newValue"));
} catch (AssertionError ex) {
e = ex;
return false;
}
return true;
}
@Override
public String description() {
return "timeout " + e.getMessage();
}
};
GeodeAwaitility.await().untilAsserted(wc);
return null;
}
});
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertEquals("newValue", r.get("clientNullKey"));
assertEquals("newValue", pr.get("clientNullKey"));
return null;
}
});
}
@Test
public void testNullValueFromNonEmptyClients() {
Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(2);
int port1 = createRegionsAndStartServer(server);
createClientRegion(client, port1, true, -1);
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
r.create("createKey", null);
pr.create("createKey", null);
assertNull(r.putIfAbsent("putAbsentKey", null));
assertNull(pr.putIfAbsent("putAbsentKey", null));
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertEquals(r.get("createKey"), r.get("putAbsentKey"));
assertEquals(pr.get("createKey"), pr.get("putAbsentKey"));
assertFalse(r.containsKey("createKey"));
assertFalse(pr.containsKey("createKey"));
assertEquals(r.containsKey("createKey"), r.containsKey("putAbsentKey"));
assertEquals(pr.containsKey("createKey"), pr.containsKey("putAbsentKey"));
return null;
}
});
}
@Test
public void testPutIfAbsent() {
doPutIfAbsentWork(false);
}
@Test
public void testPutIfAbsentCS() {
doPutIfAbsentWork(true);
}
private void doPutIfAbsentWork(final boolean cs) {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(2);
if (cs) {
int port1 = createRegionsAndStartServer(vm1);
createClientRegion(vm2, port1, false, -1);
} else {
createRegions(vm1);
createRegions(vm2);
}
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertNull(r.putIfAbsent("key0", "value"));
assertNull(pr.putIfAbsent("key0", "value"));
assertNull(r.putIfAbsent("keyForClient", "value"));
assertNull(pr.putIfAbsent("keyForClient", "value"));
assertEquals("value", r.putIfAbsent("key0", "value2"));
assertEquals("value", pr.putIfAbsent("key0", "value2"));
return null;
}
});
vm2.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertEquals("value", r.putIfAbsent("key0", "value2"));
assertEquals("value", pr.putIfAbsent("key0", "value2"));
if (cs) {
r.get("key0");
pr.get("key0");
}
assertTrue(r.containsKey("key0"));
assertTrue(pr.containsKey("key0"));
assertTrue(r.containsValueForKey("key0"));
assertTrue(pr.containsValueForKey("key0"));
return null;
}
});
}
@Test
public void testRemove() {
doRemoveWork(false);
}
@Test
public void testRemoveCS() {
doRemoveWork(true);
}
private void doRemoveWork(final boolean cs) {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(2);
if (cs) {
int port1 = createRegionsAndStartServer(vm1);
createClientRegion(vm2, port1, true, -1);
} else {
createRegions(vm1);
createRegions(vm2);
}
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertNull(r.putIfAbsent("key0", "value"));
assertNull(pr.putIfAbsent("key0", "value"));
assertNull(r.putIfAbsent("keyForClient", "value"));
assertNull(pr.putIfAbsent("keyForClient", "value"));
assertFalse(r.remove("nonExistentkey", "value"));
assertFalse(pr.remove("nonExistentkey", "value"));
assertFalse(r.remove("key0", "newValue"));
assertFalse(pr.remove("key0", "newValue"));
assertTrue(r.remove("key0", "value"));
assertTrue(pr.remove("key0", "value"));
assertFalse(r.containsKey("key0"));
assertFalse(pr.containsKey("key0"));
return null;
}
});
vm2.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertFalse(r.remove("nonExistentkey", "value"));
assertFalse(pr.remove("nonExistentkey", "value"));
assertFalse(r.remove("keyForClient", "newValue"));
assertFalse(pr.remove("keyForClient", "newValue"));
assertTrue(r.remove("keyForClient", "value"));
assertTrue(pr.remove("keyForClient", "value"));
return null;
}
});
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertFalse(r.containsKey("keyForClient"));
assertFalse(pr.containsKey("keyForClient"));
return null;
}
});
}
@Test
public void testReplaceCS() {
doReplaceWork(true);
}
@Test
public void testReplace() {
doReplaceWork(false);
}
private void doReplaceWork(final boolean cs) {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(2);
if (cs) {
int port1 = createRegionsAndStartServer(vm1);
createClientRegion(vm2, port1, true, -1);
} else {
createRegions(vm1);
createRegions(vm2);
}
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertNull(r.putIfAbsent("key0", "value"));
assertNull(pr.putIfAbsent("key0", "value"));
assertNull(r.putIfAbsent("keyForClient", "value"));
assertNull(pr.putIfAbsent("keyForClient", "value"));
assertNull(r.replace("nonExistentkey", "value"));
assertNull(pr.replace("nonExistentkey", "value"));
assertEquals("value", r.replace("key0", "value2"));
assertEquals("value", pr.replace("key0", "value2"));
assertFalse(r.replace("key0", "value", "newValue"));
assertFalse(pr.replace("key0", "value", "newValue"));
assertTrue(r.replace("key0", "value2", "newValue"));
assertTrue(pr.replace("key0", "value2", "newValue"));
return null;
}
});
vm2.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertNull(r.replace("nonExistentkey", "value"));
assertNull(pr.replace("nonExistentkey", "value"));
assertEquals("value", r.replace("keyForClient", "value2"));
assertEquals("value", pr.replace("keyForClient", "value2"));
assertFalse(r.replace("keyForClient", "value", "newValue"));
assertFalse(pr.replace("keyForClient", "value", "newValue"));
assertTrue(r.replace("keyForClient", "value2", "newValue"));
assertTrue(pr.replace("keyForClient", "value2", "newValue"));
return null;
}
});
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(REP_REG_NAME);
final Region pr = getCache().getRegion(PR_REG_NAME);
assertFalse(r.containsKey("nonExistentkey"));
assertFalse(pr.containsKey("nonExistentkey"));
return null;
}
});
}
@Test
public void testBug42167() {
do42167Work(false, REP_REG_NAME);
}
@Test
public void testBug42167PR() {
do42167Work(false, PR_REG_NAME);
}
@Test
public void testBug42167Empty() {
do42167Work(true, REP_REG_NAME);
}
@Test
public void testBug42167EmptyPR() {
do42167Work(true, PR_REG_NAME);
}
private void do42167Work(final boolean emptyClient, final String regionName) {
Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(2);
int port1 = createRegionsAndStartServer(server);
createClientRegion(client, port1, emptyClient, -1);
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
r.put("key0", "value");
r.put("key2", "value2");
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
assertEquals("value", r.get("key0"));
if (!emptyClient) {
r.localDestroy("key0");
assertFalse(r.containsKey("key0"));
}
getCache().getLogger().fine("SWAP:doingRemove");
assertTrue(r.remove("key0", "value"));
DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND = false;
assertTrue(r.remove("key0") == null);
assertTrue(DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND);
DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND = false;
assertFalse(r.remove("key0", "value"));
assertTrue(DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND);
DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND = false;
assertTrue(r.destroy("key0") == null);
assertTrue(DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND);
DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND = false;
assertTrue(r.remove("nonExistentKey1") == null);
assertTrue(DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND);
DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND = false;
assertFalse(r.remove("nonExistentKey2", "value"));
assertTrue(DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND);
DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND = false;
assertTrue(r.destroy("nonExistentKey3") == null);
assertTrue(DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND);
getCache().getLogger().fine("SWAP:doingReplace");
assertEquals("value2", r.replace("key2", "newValue2"));
getCache().getLogger().fine("SWAP:doingReplace2");
assertEquals(null, r.replace("key0", "newValue"));
assertNull(r.putIfAbsent("key4", "value4"));
return null;
}
});
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
assertFalse(r.containsKey("key0"));
assertFalse(r.containsValueForKey("key0"));
assertTrue(r.containsKey("key2"));
assertEquals("newValue2", r.get("key2"));
r.getCache().getLogger().fine("SWAP:doingGet");
assertEquals("value4", r.get("key4"));
return null;
}
});
}
@Test
public void testBug42189() {
doBug42189Work(REP_REG_NAME);
}
@Test
public void testBug42189PR() {
doBug42189Work(PR_REG_NAME);
}
private void doBug42189Work(final String regionName) {
Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(2);
int port1 = createRegionsAndStartServer(server);
createClientRegion(client, port1, false, -1);
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(regionName);
r.create("key0", null);
assertNull(r.putIfAbsent("key0", "value"));
assertTrue(r.containsKey("key0"));
Object v = r.get("key0");
assertNull("expected null but was " + v, v);
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(regionName);
assertNull(r.putIfAbsent("key0", "value"));
assertTrue(r.containsKeyOnServer("key0"));
Object v = r.get("key0");
assertNull("expected null but was " + v, v);
return null;
}
});
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(regionName);
assertTrue(r.containsKey("key0"));
assertFalse(r.containsValueForKey("key0"));
return null;
}
});
}
/**
* Replicate Region test for bug #42195: putIfAbsent from client does not put old value in local
* cache
*/
@Ignore("TODO")
@Test
public void testBug42195() {
doPutIfAbsentPutsKeyInLocalClientCacheWork(REP_REG_NAME);
}
/**
* Partitioned Region test for bug #42195: putIfAbsent from client does not put old value in local
* cache
*/
@Ignore("TODO")
@Test
public void testBug42195PR() {
doPutIfAbsentPutsKeyInLocalClientCacheWork(PR_REG_NAME);
}
private void doPutIfAbsentPutsKeyInLocalClientCacheWork(final String regionName) {
Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(2);
int port1 = createRegionsAndStartServer(server);
createClientRegion(client, port1, false, -1);
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(regionName);
assertNull(r.putIfAbsent("key0", "value"));
assertTrue(r.containsKey("key0"));
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(regionName);
assertEquals("value", r.putIfAbsent("key0", "newValue"));
assertTrue(r.containsKeyOnServer("key0"));
assertTrue(r.containsKey("key0"));
assertTrue(r.containsValueForKey("key0"));
return null;
}
});
}
@Test
public void testReplacePutsKeyInLocalClientCache() {
doReplacePutsKeyInLocalClientCacheWork(REP_REG_NAME);
}
@Test
public void testReplacePutsKeyInLocalClientCachePR() {
doReplacePutsKeyInLocalClientCacheWork(PR_REG_NAME);
}
private void doReplacePutsKeyInLocalClientCacheWork(final String regionName) {
Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(2);
int port1 = createRegionsAndStartServer(server);
createClientRegion(client, port1, false, -1);
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(regionName);
assertNull(r.putIfAbsent("key0", "value"));
assertTrue(r.containsKey("key0"));
assertNull(r.putIfAbsent("key2", "value2"));
assertTrue(r.containsKey("key2"));
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(regionName);
assertEquals("value", r.replace("key0", "newValue"));
assertTrue(r.containsKeyOnServer("key0"));
assertTrue(r.containsKey("key0"));
assertTrue(r.containsValueForKey("key0"));
assertFalse(r.replace("key2", "DontReplace", "newValue"));
assertTrue(r.replace("key2", "value2", "newValu2"));
assertTrue(r.containsKeyOnServer("key2"));
assertTrue(r.containsKey("key2"));
assertTrue(r.containsValueForKey("key2"));
return null;
}
});
// bug #42221 - replace does not put entry on client when server has invalid value
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
final String key = "bug42221";
r.putIfAbsent(key, null);
assertTrue(r.containsKey(key));
Object result = r.replace(key, "not null");
assertEquals(null, result);
assertTrue(r.containsKey(key));
assertEquals(r.get(key), "not null");
r.remove(key); // cleanup
return null;
}
});
// bug #42242 - remove(K,null) doesn't work
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
final String key = "bug42242";
r.putIfAbsent(key, null);
assertTrue(r.containsKey(key));
assertTrue(r.containsKeyOnServer(key));
boolean result = r.remove(key, null);
assertTrue(result);
assertFalse(r.containsKey(key));
assertFalse(r.containsKeyOnServer(key));
return null;
}
});
// bug #42242b - second scenario with a replace(K,V,V) that didn't work
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
final String key = "bug42242b";
r.putIfAbsent(key, null);
assertTrue(r.containsKey(key));
assertTrue(r.containsKeyOnServer(key));
boolean result = r.replace(key, null, "new value");
assertTrue(result);
result = r.remove(key, "new value");
assertTrue(result);
assertFalse(r.containsKey(key));
assertFalse(r.containsKeyOnServer(key));
return null;
}
});
// bug #42242c - remove does not work for entry that's on the server but not on the client
final String key = "bug42242c";
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
r.registerInterest("ALL_KEYS");
return null;
}
});
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
r.putIfAbsent(key, null);
assertTrue(r.containsKey(key));
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
final Region r = getCache().getRegion(regionName);
WaitCriterion w = new WaitCriterion() {
@Override
public String description() {
return "waiting for server operation to reach client";
}
@Override
public boolean done() {
return r.containsKey(key);
}
};
GeodeAwaitility.await().untilAsserted(w);
assertTrue(r.containsKeyOnServer(key));
boolean result = r.remove(key, null);
// if (!result) {
// ((LocalRegion)r).dumpBackingMap();
// }
assertTrue(result);
assertFalse(r.containsKey(key));
assertFalse(r.containsKeyOnServer(key));
return null;
}
});
}
@Test
public void testWithDelta() {
doTestWithDeltaWork(false, REP_REG_NAME);
}
@Test
public void testWithDeltaPR() {
doTestWithDeltaWork(false, PR_REG_NAME);
}
@Test
public void testWithDeltaCS() {
doTestWithDeltaWork(true, REP_REG_NAME);
}
@Test
public void testWithDeltaPRCS() {
doTestWithDeltaWork(true, PR_REG_NAME);
}
private void doTestWithDeltaWork(final boolean clientServer, final String regName) {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(1);
if (clientServer) {
int port = createRegionsAndStartServer(vm1);
createClientRegion(vm2, port, false, -1);
} else {
createRegions(vm1);
createRegions(vm2);
}
vm2.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regName);
CustomerDelta c = new CustomerDelta("cust1", "addr1");
assertNull(r.putIfAbsent("k1", c));
CustomerDelta newc = new CustomerDelta(c);
newc.setAddress("updatedAddress");
assertEquals(c, r.putIfAbsent("k1", c));
assertEquals(c, r.replace("k1", newc));
assertFalse(r.replace("k1", c, newc));
assertTrue(r.replace("k1", newc, c));
assertFalse(r.remove("k1", newc));
assertTrue(r.remove("k1", c));
return null;
}
});
}
/** test putIfAbsent with failover & retry. This is bugs 42559 and 43640 */
@Test
public void testRetriedPutIfAbsent() throws Exception {
doRetriedOperation(Operation.PUT_IF_ABSENT, false);
}
@Test
public void testRetriedReplace() throws Exception {
doRetriedOperation(Operation.REPLACE, false);
}
@Test
public void testRetriedRemove() throws Exception {
doRetriedOperation(Operation.REMOVE, false);
}
@Test
public void testRetriedPutIfAbsentPR() throws Exception {
doRetriedOperation(Operation.PUT_IF_ABSENT, false);
}
@Test
public void testRetriedReplacePR() throws Exception {
doRetriedOperation(Operation.REPLACE, false);
}
@Test
public void testRetriedRemovePR() throws Exception {
doRetriedOperation(Operation.REMOVE, false);
}
private void doRetriedOperation(final Operation op, boolean usePR) {
Host host = Host.getHost(0);
final VM server1 = host.getVM(0);
final VM server2 = host.getVM(1);
final VM client = host.getVM(2);
final int port1 = createRegionsAndStartServer(server1, true);
final int port2 = createRegionsAndStartServer(server2, true);
final String regionName = usePR ? PR_REG_NAME : REP_REG_NAME;
IgnoredException.addIgnoredException("java.net.SocketException");
createClientRegion(client, port1, false, port2);
SerializableCallable getID = new SerializableCallable("get DM ID") {
@Override
public Object call() {
return getSystem().getDistributedMember();
}
};
final DistributedMember server1ID = (DistributedMember) server1.invoke(getID);
final DistributedMember server2ID = (DistributedMember) server2.invoke(getID);
Set<IgnoredException> exceptions = new HashSet<IgnoredException>();
exceptions.add(IgnoredException.addIgnoredException("Membership: requesting removal", server1));
exceptions.add(IgnoredException.addIgnoredException("Membership: requesting removal", server2));
exceptions.add(IgnoredException.addIgnoredException("ForcedDisconnect", server1));
exceptions.add(IgnoredException.addIgnoredException("ForcedDisconnect", server2));
try {
server1.invoke(new SerializableCallable("install crasher in server1") {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
r.put("key0", "value");
if (op == Operation.PUT_IF_ABSENT) {
r.destroy("key0");
}
// force client to use server1 for now
// getCache().getCacheServers().get(0).stop();
r.getAttributesMutator().addCacheListener(new CacheListenerAdapter() {
private void killSender(EntryEvent event) {
if (event.isOriginRemote()) {
MembershipManager mgr =
MembershipManagerHelper.getMembership(getSystem());
mgr.requestMemberRemoval(server2ID, "removing for test");
try {
mgr.waitForDeparture(server2ID);
} catch (Exception e) {
fail("failed to stop the other server for this test:" + e.getMessage());
}
}
}
@Override
public void afterCreate(EntryEvent event) {
getCache().getLogger().info("afterCreate invoked with " + event);
killSender(event);
}
@Override
public void afterUpdate(EntryEvent event) {
getCache().getLogger().info("afterUpdate invoked with " + event);
killSender(event);
}
@Override
public void afterDestroy(EntryEvent event) {
getCache().getLogger().info("afterDestroy invoked with " + event);
killSender(event);
}
});
return null;
}
});
server2.invoke(new SerializableCallable("install crasher in server2") {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
// force client to use server1 for now
// getCache().getCacheServers().get(0).stop();
r.getAttributesMutator().addCacheListener(new CacheListenerAdapter() {
private void killSender(EntryEvent event) {
if (event.isOriginRemote()) {
MembershipManager mgr =
MembershipManagerHelper.getMembership(getSystem());
mgr.requestMemberRemoval(server1ID, "removing for test");
try {
mgr.waitForDeparture(server1ID);
} catch (Exception e) {
fail("failed to stop the other server for this test:" + e.getMessage());
}
}
}
@Override
public void afterCreate(EntryEvent event) {
getCache().getLogger().info("afterCreate invoked with " + event);
killSender(event);
}
@Override
public void afterUpdate(EntryEvent event) {
getCache().getLogger().info("afterUpdate invoked with " + event);
killSender(event);
}
@Override
public void afterDestroy(EntryEvent event) {
getCache().getLogger().info("afterDestroy invoked with " + event);
killSender(event);
}
});
return null;
}
});
client.invoke(new SerializableRunnable() {
@Override
public void run() {
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
Region r = cache.getRegion(regionName);
if (op == Operation.PUT_IF_ABSENT) {
assertTrue("expected putIfAbsent to succeed and return null",
r.putIfAbsent("key0", "newvalue") == null);
} else if (op == Operation.REMOVE) {
assertTrue("expected remove operation to succeed and return true",
r.remove("key0", "value"));
} else if (op == Operation.REPLACE) {
assertTrue("expected replace operation to succeed and return true",
r.replace("key0", "value", "newvalue"));
}
}
});
} finally {
disconnectAllFromDS();
for (IgnoredException ex : exceptions) {
ex.remove();
}
}
}
private static class CustomerDelta implements DataSerializable, Delta {
private String name;
private String address;
private boolean nameChanged;
private boolean addressChanged;
public CustomerDelta() {}
public CustomerDelta(CustomerDelta o) {
this.address = o.address;
this.name = o.name;
}
public CustomerDelta(String name, String address) {
this.name = name;
this.address = address;
}
@Override
public void toData(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeUTF(address);
out.writeBoolean(nameChanged);
out.writeBoolean(addressChanged);
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
name = in.readUTF();
address = in.readUTF();
nameChanged = in.readBoolean();
addressChanged = in.readBoolean();
}
@Override
public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {
boolean nameC = in.readBoolean();
if (nameC) {
this.name = in.readUTF();
}
boolean addressC = in.readBoolean();
if (addressC) {
this.address = in.readUTF();
}
}
@Override
public boolean hasDelta() {
return nameChanged || addressChanged;
}
@Override
public void toDelta(DataOutput out) throws IOException {
if (this.nameChanged) {
out.writeBoolean(nameChanged);
out.writeUTF(name);
}
if (this.addressChanged) {
out.writeBoolean(addressChanged);
out.writeUTF(address);
}
}
public void setName(String name) {
this.nameChanged = true;
this.name = name;
}
public String getName() {
return name;
}
public void setAddress(String address) {
this.addressChanged = true;
this.address = address;
}
public String getAddress() {
return address;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof CustomerDelta))
return false;
CustomerDelta other = (CustomerDelta) obj;
return this.name.equals(other.name) && this.address.equals(other.address);
}
@Override
public int hashCode() {
return this.address.hashCode() + this.name.hashCode();
}
}
}