blob: d8e06b48db2d311ede7b584886df6a4663c27a68 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertFalse;
import static org.apache.geode.test.dunit.Assert.assertNotNull;
import static org.apache.geode.test.dunit.Assert.assertNull;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.DataSerializable;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqClosedException;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryInvalidException;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache.util.CacheWriterAdapter;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.cache30.ClientServerTestCase;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.ClientServerTest;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* Tests putAll for c/s. Also tests removeAll
*
* @since GemFire 5.0.23
*/
@Category({ClientServerTest.class, ClientSubscriptionTest.class})
@SuppressWarnings("serial")
public class PutAllCSDUnitTest extends ClientServerTestCase {
final int numberOfEntries = 100;
final int testEndPointSwitchNumber = 200;
final int thousandEntries = 1000;
final int TOTAL_BUCKETS = 10;
static Object lockObject = new Object();
static Object lockObject2 = new Object();
static Object lockObject3 = new Object();
static Object lockObject4 = new Object();
final String expectedExceptions = PutAllPartialResultException.class.getName() + "||"
+ ServerConnectivityException.class.getName() + "||"
+ RegionDestroyedException.class.getName() + "||java.net.ConnectException";
List<VersionTag> client1Versions = null;
List<VersionTag> client2Versions = null;
List<VersionTag> client1RAVersions = null;
List<VersionTag> client2RAVersions = null;
List<String> expectedVersions = null;
List<String> actualVersions = null;
List<String> expectedRAVersions = null;
List<String> actualRAVersions = null;
private static void checkRegionSize(final Region region, final int expectedSize) {
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
return region.size() == expectedSize;
}
@Override
public String description() {
return null;
}
};
Wait.waitForCriterion(ev, 10 * 1000, 1000, true);
assertEquals(expectedSize, region.size());
}
/**
* Tests putAll to one server.
*/
@Test
public void testOneServer() throws CacheException, InterruptedException {
final String title = "testOneServer:";
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server.getHost());
// set <false, true> means <PR=false, notifyBySubscription=true> to enable registerInterest and
// CQ
final int serverPort = createBridgeServer(server, regionName, 0, false, 0, null);
createClient(client1, regionName, serverHost, new int[] {serverPort}, -1, -1, false, true,
true);
createClient(client2, regionName, serverHost, new int[] {serverPort}, -1, -1, false, true,
true);
server.invoke(new CacheSerializableRunnable(title + "server add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
}
});
client2
.invoke(new CacheSerializableRunnable(title + "client2 registerInterest and add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
// registerInterest for ALL_KEYS
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
client1.invoke(
new CacheSerializableRunnable(title + "client1 create local region and run putAll") {
@Override
public void run2() throws CacheException {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.LOCAL);
factory2.addCacheListener(new MyListener(false));
createRegion("localsave", factory2.create());
Region region = doPutAll(regionName, "key-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
}
});
AsyncInvocation async1 =
client1.invokeAsync(new CacheSerializableRunnable(title + "client1 create CQ") {
@Override
public void run2() throws CacheException {
// create a CQ for key 10-20
Region localregion = getRootRegion().getSubregion("localsave");
CqAttributesFactory cqf1 = new CqAttributesFactory();
EOCQEventListener EOCQListener = new EOCQEventListener(localregion);
cqf1.addCqListener(EOCQListener);
CqAttributes cqa1 = cqf1.create();
String cqName1 = "EOInfoTracker";
String queryStr1 = "SELECT ALL * FROM /root/" + regionName
+ " ii WHERE ii.getTicker() >= '10' and ii.getTicker() < '20'";
LogWriterUtils.getLogWriter().info("Query String: " + queryStr1);
try {
QueryService cqService = getCache().getQueryService();
CqQuery EOTracker = cqService.newCq(cqName1, queryStr1, cqa1);
SelectResults rs1 = EOTracker.executeWithInitialResults();
List list1 = rs1.asList();
for (int i = 0; i < list1.size(); i++) {
Struct s = (Struct) list1.get(i);
TestObject o = (TestObject) s.get("value");
LogWriterUtils.getLogWriter().info("InitialResult:" + i + ":" + o);
localregion.put("key-" + i, o);
}
if (localregion.size() > 0) {
LogWriterUtils.getLogWriter().info("CQ is ready");
synchronized (lockObject) {
lockObject.notify();
}
}
waitTillNotify(lockObject2, 20000,
(EOCQListener.num_creates == 5 && EOCQListener.num_updates == 5));
EOTracker.close();
} catch (CqClosedException e) {
Assert.fail("CQ", e);
} catch (RegionNotFoundException e) {
Assert.fail("CQ", e);
} catch (QueryInvalidException e) {
Assert.fail("CQ", e);
} catch (CqExistsException e) {
Assert.fail("CQ", e);
} catch (CqException e) {
Assert.fail("CQ", e);
}
}
});
server.invoke(new CacheSerializableRunnable(title + "verify cache server") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
// verify CQ is ready
client1.invoke(new CacheSerializableRunnable(title + "verify CQ is ready") {
@Override
public void run2() throws CacheException {
Region localregion = getRootRegion().getSubregion("localsave");
waitTillNotify(lockObject, 10000, (localregion.size() > 0));
assertTrue(localregion.size() > 0);
}
});
// verify registerInterest result at client2
client2.invoke(new CacheSerializableRunnable(title + "verify client2") {
@Override
public void run2() throws CacheException {
final Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, numberOfEntries);
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(i, obj.getPrice());
}
// then do update for key 10-20 to trigger CQ at server2
// destroy key 10-14 to simulate create/update mix case
region.removeAll(Arrays.asList("key-10", "key-11", "key-12", "key-13", "key-14"),
"removeAllCallback");
assertEquals(null, region.get("key-10"));
assertEquals(null, region.get("key-11"));
assertEquals(null, region.get("key-12"));
assertEquals(null, region.get("key-13"));
assertEquals(null, region.get("key-14"));
}
});
// verify CQ result at client1
client1.invoke(new CacheSerializableRunnable(title + "Verify client1") {
@Override
public void run2() throws CacheException {
Region localregion = getRootRegion().getSubregion("localsave");
for (int i = 10; i < 15; i++) {
TestObject obj = null;
int cnt = 0;
while (cnt < 100) {
obj = (TestObject) localregion.get("key-" + i);
if (obj != null) {
// wait for the key to be destroyed
Wait.pause(100);
if (LogWriterUtils.getLogWriter().fineEnabled()) {
LogWriterUtils.getLogWriter()
.info("Waiting 100ms(" + cnt + ") for key-" + i + " to be destroyed");
}
cnt++;
} else {
break;
}
}
}
}
});
client2.invoke(new CacheSerializableRunnable(title + "verify client2") {
@Override
public void run2() throws CacheException {
final Region region = getRootRegion().getSubregion(regionName);
LinkedHashMap map = new LinkedHashMap();
for (int i = 10; i < 20; i++) {
map.put("key-" + i, new TestObject(i * 10));
}
region.putAll(map, "putAllCallback");
}
});
// verify CQ result at client1
client1.invoke(new CacheSerializableRunnable(title + "Verify client1") {
@Override
public void run2() throws CacheException {
Region localregion = getRootRegion().getSubregion("localsave");
for (int i = 10; i < 20; i++) {
TestObject obj = null;
int cnt = 0;
while (cnt < 100) {
obj = (TestObject) localregion.get("key-" + i);
if (obj == null || obj.getPrice() != i * 10) {
Wait.pause(100);
LogWriterUtils.getLogWriter()
.info("Waiting 100ms(" + cnt + ") for obj.getPrice() == i*10 at entry " + i);
cnt++;
} else {
break;
}
}
assertEquals(i * 10, obj.getPrice());
}
synchronized (lockObject2) {
lockObject2.notify();
}
}
});
ThreadUtils.join(async1, 30 * 1000);
// Test Exception handling
// verify CQ is ready
client1.invoke(new CacheSerializableRunnable(title + "test exception handling") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
Map m = null;
boolean NPEthrowed = false;
try {
region.putAll(m, "putAllCallback");
fail("Should have thrown NullPointerException");
} catch (NullPointerException ex) {
NPEthrowed = true;
}
assertTrue(NPEthrowed);
region.localDestroyRegion();
boolean RDEthrowed = false;
try {
m = new HashMap();
for (int i = 1; i < 21; i++) {
m.put(new Integer(i), Integer.toString(i));
}
region.putAll(m, "putAllCallback");
fail("Should have thrown RegionDestroyedException");
} catch (RegionDestroyedException ex) {
RDEthrowed = true;
}
assertTrue(RDEthrowed);
try {
region.removeAll(Arrays.asList("key-10", "key-11"), "removeAllCallback");
fail("Should have thrown RegionDestroyedException");
} catch (RegionDestroyedException expected) {
}
}
});
// Stop server
stopBridgeServers(getCache());
}
/**
* Tests putAll afterUpdate event contained oldValue.
*/
@Test
public void testOldValueInEvent() throws CacheException, InterruptedException {
final String title = "testOldValueInEvent:";
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=false to test local-invalidates
final int serverPort1 = createBridgeServer(server1, regionName, 0, false, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, false, 0, null);
createClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, -1, false, true,
true);
createClient(client2, regionName, serverHost, new int[] {serverPort2}, -1, -1, false, true,
true);
client2
.invoke(new CacheSerializableRunnable(title + "client2 registerInterest and add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
// registerInterest for ALL_KEYS
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
client1.invoke(
new CacheSerializableRunnable(title + "client1 create local region and run putAll") {
@Override
public void run2() throws CacheException {
// create keys
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
doPutAll(regionName, title, numberOfEntries);
assertEquals(numberOfEntries, region.size());
// update keys
doPutAll(regionName, title, numberOfEntries);
assertEquals(numberOfEntries, region.size());
}
});
// verify 41890, the local PUTALL_UPDATE event should contain old value
client1.invoke(new CacheSerializableRunnable(title + "verify after update events") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
waitTillNotify(lockObject, 20000, (region.size() == numberOfEntries));
}
});
client2.invoke(new CacheSerializableRunnable(title + "verify after update events") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
waitTillNotify(lockObject, 20000, (region.size() == numberOfEntries));
}
});
// Stop server
stopBridgeServers(getCache());
}
/**
* Create PR without redundancy on 2 servers with lucene index. Feed some key s. From a client, do
* removeAll on keys in server1. During the removeAll, restart server1 and trigger the removeAll
* to retry. The retried removeAll should return the version tag of tombstones. Do removeAll again
* on the same key, it should get the version tag again.
*/
@Test
public void shouldReturnVersionTagOfTombstoneVersionWhenRemoveAllRetried()
throws CacheException, InterruptedException {
final String title = "test51871:";
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=false to test local-invalidates
final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 0, "ds1");
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, -1, true);
client1.invoke(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
doPutAll(regionName, "key-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries, region.size());
}
});
@SuppressWarnings("unchecked")
VersionedObjectList versions =
(VersionedObjectList) client1.invoke(new SerializableCallable(title + "client1 removeAll") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
VersionedObjectList versions = doRemoveAll(regionName, "key-", numberOfEntries);
assertEquals(0, region.size());
return versions;
}
});
@SuppressWarnings("unchecked")
VersionedObjectList versionsAfterRetry = (VersionedObjectList) client1
.invoke(new SerializableCallable(title + "client1 removeAll again") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
VersionedObjectList versions = doRemoveAll(regionName, "key-", numberOfEntries);
assertEquals(0, region.size());
return versions;
}
});
LogWriterUtils.getLogWriter().info("Version tags are:" + versions.getVersionTags() + ":"
+ versionsAfterRetry.getVersionTags());
assertEquals(versionsAfterRetry.getVersionTags(), versions.getVersionTags());
// clean up
// Stop serverß
stopBridgeServers(getCache());
}
/**
* Tests putAll and removeAll to 2 servers. Use Case: 1) putAll from a single-threaded client to a
* replicated region 2) putAll from a multi-threaded client to a replicated region 3)
*/
@Test
public void test2Server() throws CacheException, InterruptedException {
final String title = "test2Server:";
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=false to test local-invalidates
final int serverPort1 = createBridgeServer(server1, regionName, 0, false, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, false, 0, null);
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, -1, true);
createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2}, -1, -1, true);
client2.invoke(new CacheSerializableRunnable(title + "client2 add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
doPutAll(regionName, "key-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().setCacheWriter(new MyWriter("key-"));
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
// verify cache server 2, because its data are from distribution
server2.invoke(new CacheSerializableRunnable(title + "verify cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().setCacheWriter(new MyWriter("key-"));
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, numberOfEntries);
for (int i = 0; i < numberOfEntries; i++) {
Region.Entry re = region.getEntry("key-" + i);
assertNotNull(re);
assertEquals(null, re.getValue());
}
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doRemoveAll(regionName, "key-", numberOfEntries);
assertEquals(0, region.size());
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify removeAll cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
MyWriter mywriter = (MyWriter) region.getAttributes().getCacheWriter();
LogWriterUtils.getLogWriter()
.info("server cachewriter triggered for destroy: " + mywriter.num_destroyed);
assertEquals(numberOfEntries, mywriter.num_destroyed);
}
});
// verify cache server 2, because its data are from distribution
server2.invoke(new CacheSerializableRunnable(title + "verify removeAll cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
MyWriter mywriter = (MyWriter) region.getAttributes().getCacheWriter();
LogWriterUtils.getLogWriter()
.info("server cachewriter triggered for destroy: " + mywriter.num_destroyed);
// beforeDestroys are only triggered at server1 since the removeAll is submitted from
// client1
assertEquals(0, mywriter.num_destroyed);
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, 0);
}
});
{
// Execute client putAll from multithread client
AsyncInvocation async1 =
client1.invokeAsync(new CacheSerializableRunnable(title + "async putAll1 from client1") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "async1key-", numberOfEntries);
}
});
AsyncInvocation async2 =
client1.invokeAsync(new CacheSerializableRunnable(title + "async putAll2 from client1") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "async2key-", numberOfEntries);
}
});
ThreadUtils.join(async1, 30 * 1000);
ThreadUtils.join(async2, 30 * 1000);
}
client1.invoke(new CacheSerializableRunnable(title + "verify client 1 for async keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries * 2, region.size());
long ts1 = 0, ts2 = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("async1key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts1);
ts1 = obj.getTS();
obj = (TestObject) region.getEntry("async2key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts2);
ts2 = obj.getTS();
}
}
});
// verify cache server 1 for asyn keys
server1.invoke(new CacheSerializableRunnable(title + "verify cache server 1 for async keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries * 2, region.size());
long ts1 = 0, ts2 = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("async1key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts1);
ts1 = obj.getTS();
obj = (TestObject) region.getEntry("async2key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts2);
ts2 = obj.getTS();
}
}
});
// verify cache server 2 for asyn keys
server2.invoke(new CacheSerializableRunnable(title + "verify cache server 2 for async keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries * 2, region.size());
long ts1 = 0, ts2 = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("async1key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts1);
ts1 = obj.getTS();
obj = (TestObject) region.getEntry("async2key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts2);
ts2 = obj.getTS();
}
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify async putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, numberOfEntries * 2);
for (int i = 0; i < numberOfEntries; i++) {
Region.Entry re = region.getEntry("async1key-" + i);
assertNotNull(re);
assertEquals(null, re.getValue());
}
for (int i = 0; i < numberOfEntries; i++) {
Region.Entry re = region.getEntry("async2key-" + i);
assertNotNull(re);
assertEquals(null, re.getValue());
}
}
});
{
// Execute client removeAll from multithread client
AsyncInvocation async1 =
client1.invokeAsync(new CacheSerializableRunnable(title + "async putAll1 from client1") {
@Override
public void run2() throws CacheException {
doRemoveAll(regionName, "async1key-", numberOfEntries);
}
});
AsyncInvocation async2 =
client1.invokeAsync(new CacheSerializableRunnable(title + "async putAll2 from client1") {
@Override
public void run2() throws CacheException {
doRemoveAll(regionName, "async2key-", numberOfEntries);
}
});
ThreadUtils.join(async1, 30 * 1000);
ThreadUtils.join(async2, 30 * 1000);
}
client1.invoke(new CacheSerializableRunnable(title + "client1 removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doRemoveAll(regionName, "key-", numberOfEntries);
assertEquals(0, region.size());
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify async removeAll cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
}
});
// verify cache server 2, because its data are from distribution
server2.invoke(new CacheSerializableRunnable(title + "verify async removeAll cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify async removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, 0);
}
});
// Execute p2p putAll
server1.invoke(new CacheSerializableRunnable(title + "server1 execute P2P putAll") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "p2pkey-", numberOfEntries);
Region region = getRootRegion().getSubregion(regionName);
long ts1 = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("p2pkey-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts1);
ts1 = obj.getTS();
}
}
});
// verify cache server 2 for p2p keys
server2.invoke(new CacheSerializableRunnable(title + "verify cache server 2 for p2p keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
long ts1 = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("p2pkey-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts1);
ts1 = obj.getTS();
}
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify p2p putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, numberOfEntries);
for (int i = 0; i < numberOfEntries; i++) {
Region.Entry re = region.getEntry("p2pkey-" + i);
assertNotNull(re);
assertEquals(null, re.getValue());
}
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 verify p2p putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, numberOfEntries);
for (int i = 0; i < numberOfEntries; i++) {
Region.Entry re = region.getEntry("p2pkey-" + i);
assertNotNull(re);
assertEquals(null, re.getValue());
}
}
});
// Execute p2p removeAll
server1.invoke(new CacheSerializableRunnable(title + "server1 execute P2P removeAll") {
@Override
public void run2() throws CacheException {
doRemoveAll(regionName, "p2pkey-", numberOfEntries);
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
}
});
// verify cache server 2, because its data are from distribution
server2.invoke(new CacheSerializableRunnable(title + "verify p2p removeAll cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify p2p removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, 0);
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 verify p2p removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, 0);
}
});
// putAll at client2 to trigger local-invalidates at client1
client2.invoke(new CacheSerializableRunnable(title + "execute putAll on client2 for key 0-10") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "key-", 10);
}
});
// verify client 2 for key 0-10
client1.invoke(new CacheSerializableRunnable(title + "verify client1 for local invalidate") {
@Override
public void run2() throws CacheException {
final Region region = getRootRegion().getSubregion(regionName);
for (int i = 0; i < 10; i++) {
final int ii = i;
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
Entry entry = region.getEntry("key-" + ii);
return entry != null && entry.getValue() == null;
}
@Override
public String description() {
return null;
}
};
Wait.waitForCriterion(ev, 10 * 1000, 1000, true);
// local invalidate will set the value to null
TestObject obj = null;
obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(null, obj);
}
}
});
// clean up
// Stop server
stopBridgeServers(getCache());
}
/* same as test2Server(), but all the servers are using policy normal */
private int createServerRegion(VM vm, final String regionName, final boolean CCE) {
SerializableCallable createRegion = new SerializableCallable() {
@Override
public Object call() throws Exception {
AttributesFactory af = new AttributesFactory();
af.setConcurrencyChecksEnabled(CCE);
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDataPolicy(DataPolicy.NORMAL);
createRootRegion(new AttributesFactory().create());
Region region = createRegion(regionName, af.create());
CacheServer server = getCache().addCacheServer();
int port = 0;
server.setPort(port);
server.start();
port = server.getPort();
return port;
}
};
return (Integer) vm.invoke(createRegion);
}
public void doTest2NormalServerCCE(boolean CCE) throws CacheException, InterruptedException {
final String title = "doTest2NormalServerCCE=" + CCE + ":";
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=false to test local-invalidates
int serverPort1 = createServerRegion(server1, regionName, CCE);
int serverPort2 = createServerRegion(server2, regionName, CCE);
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, 59000, true);
createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2}, -1, 59000, true);
client2.invoke(new CacheSerializableRunnable(title + "client2 add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
}
});
// test case 1: putAll and removeAll to server1
client1.invoke(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
doPutAll(regionName, "case1-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("case1-" + i).getValue();
assertEquals(i, obj.getPrice());
}
region.put(numberOfEntries, "specialvalue");
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().setCacheWriter(new MyWriter("case1-"));
region.localDestroy(numberOfEntries);
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("case1-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
server2.invoke(new CacheSerializableRunnable(title + "verify cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().setCacheWriter(new MyWriter("case1-"));
// normal policy will not distribute create events
assertEquals(0, region.size());
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 removeAll") {
@Override
public void run2() throws CacheException {
LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries + 1, region.size());
// do removeAll with some keys not exist
doRemoveAll(regionName, "case1-", numberOfEntries * 2);
assertEquals(1, region.size());
region.localDestroy(numberOfEntries); // do cleanup
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify removeAll cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
MyWriter mywriter = (MyWriter) region.getAttributes().getCacheWriter();
LogWriterUtils.getLogWriter()
.info("server cachewriter triggered for destroy: " + mywriter.num_destroyed);
assertEquals(numberOfEntries, mywriter.num_destroyed);
}
});
// verify cache server 2, because its data are from distribution
server2.invoke(new CacheSerializableRunnable(title + "verify removeAll cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
MyWriter mywriter = (MyWriter) region.getAttributes().getCacheWriter();
LogWriterUtils.getLogWriter()
.info("server cachewriter triggered for destroy: " + mywriter.num_destroyed);
// beforeDestroys are only triggered at server1 since the removeAll is submitted from
// client1
assertEquals(0, mywriter.num_destroyed);
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, 0);
}
});
// test case 2: putAll to server1, removeAll to server2
client1.invoke(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doPutAll(regionName, "case2-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("case2-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("case2-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
server2.invoke(new CacheSerializableRunnable(title + "verify cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// normal policy will not distribute create events
assertEquals(0, region.size());
}
});
client2.invoke(new CacheSerializableRunnable(title + "client1 removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doRemoveAll(regionName, "case2-", numberOfEntries);
assertEquals(0, region.size());
}
});
server1.invoke(new CacheSerializableRunnable(title + "verify removeAll cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(100, region.size());
}
});
server2.invoke(new CacheSerializableRunnable(title + "verify removeAll cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 verify removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, 100);
doRemoveAll(regionName, "case2-", numberOfEntries); // do cleanup
}
});
// test case 3: removeAll a list with duplicated keys
client1.invoke(new CacheSerializableRunnable(title + "put 3 keys then removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.put("case3-1", "case3-1");
region.put("case3-2", "case3-2");
region.put("case3-3", "case3-3");
assertEquals(3, region.size());
ArrayList keys = new ArrayList();
keys.add("case3-1");
keys.add("case3-2");
keys.add("case3-3");
keys.add("case3-1");
region.removeAll(keys, "removeAllCallback");
assertEquals(0, region.size());
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
}
});
// clean up
// Stop server
stopBridgeServers(getCache());
}
@Test
public void test2NormalServerCCE() throws CacheException, InterruptedException {
doTest2NormalServerCCE(true);
disconnectAllFromDS();
doTest2NormalServerCCE(false);
disconnectAllFromDS();
}
@Test
public void testPRServerRVDuplicatedKeys() throws CacheException, InterruptedException {
doRVDuplicatedKeys(true, 1);
disconnectAllFromDS();
doRVDuplicatedKeys(true, 0);
disconnectAllFromDS();
doRVDuplicatedKeys(false, 1);
disconnectAllFromDS();
}
public void doRVDuplicatedKeys(final boolean isPR, final int redundantCopies)
throws CacheException, InterruptedException {
final String title = "doRVDuplicatedKeys:";
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
final int serverPort1 = createBridgeServer(server1, regionName, 0, isPR, redundantCopies, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, isPR, redundantCopies, null);
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, 59000, false);
createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2}, -1, 59000, false);
client2.invoke(new CacheSerializableRunnable(title + "client2 add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
doPutAll(regionName, "key-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
// test case 3: removeAll a list with duplicated keys
client1.invoke(new CacheSerializableRunnable(title + "put 3 keys then removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.put("case3-1", "case3-1");
region.put("case3-2", "case3-2");
region.put("case3-3", "case3-3");
}
});
client2.invoke(new CacheSerializableRunnable(title + "verify cache server 2") {
@Override
public void run2() throws CacheException {
Wait.pause(5000);
Region region = getRootRegion().getSubregion(regionName);
Region.Entry re = region.getEntry("case3-1");
assertNotNull(re);
re = region.getEntry("case3-2");
assertNotNull(re);
re = region.getEntry("case3-3");
assertNotNull(re);
}
});
client1.invoke(new CacheSerializableRunnable(title + "put 3 keys then removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
ArrayList keys = new ArrayList();
keys.add("case3-1");
keys.add("case3-2");
keys.add("case3-3");
keys.add("case3-1");
region.removeAll(keys, "removeAllCallback");
Region.Entry re = region.getEntry("case3-1");
assertNull(re);
re = region.getEntry("case3-2");
assertNull(re);
re = region.getEntry("case3-3");
assertNull(re);
}
});
server1.invoke(new CacheSerializableRunnable(title + "verify cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
Region.Entry re = region.getEntry("case3-1");
assertNull(re);
re = region.getEntry("case3-2");
assertNull(re);
re = region.getEntry("case3-3");
assertNull(re);
}
});
server2.invoke(new CacheSerializableRunnable(title + "verify cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
Region.Entry re = region.getEntry("case3-1");
assertNull(re);
re = region.getEntry("case3-2");
assertNull(re);
re = region.getEntry("case3-3");
assertNull(re);
}
});
client2.invoke(new CacheSerializableRunnable(title + "verify cache server 2") {
@Override
public void run2() throws CacheException {
Wait.pause(5000);
Region region = getRootRegion().getSubregion(regionName);
Region.Entry re = region.getEntry("case3-1");
assertNull(re);
re = region.getEntry("case3-2");
assertNull(re);
re = region.getEntry("case3-3");
assertNull(re);
}
});
// clean up
// Stop server
stopBridgeServers(getCache());
}
@Test
public void testBug51725() throws CacheException, InterruptedException {
doBug51725(false);
disconnectAllFromDS();
}
@Test
public void testBug51725_singlehup() throws CacheException, InterruptedException {
doBug51725(true);
disconnectAllFromDS();
}
/**
* One failure found in bug 51725, both for putAll and removeAll: non-singlehop with redundency=1.
* Do some singlehop putAll to see if retry succeeded after PRE This is a singlehop putAll test.
*/
public void doBug51725(boolean enableSingleHop) throws CacheException, InterruptedException {
final String title = "doBug51725:";
int client1Size;
int client2Size;
int server1Size;
int server2Size;
final Host host = Host.getHost(0);
final VM server1 = host.getVM(0);
final VM server2 = host.getVM(1);
final VM client1 = host.getVM(2);
final VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 0, "ds1");
final int serverPort2 = createBridgeServer(server2, regionName, 0, true, 0, "ds1");
createClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, 59000, false, false,
enableSingleHop);
createClient(client2, regionName, serverHost, new int[] {serverPort1}, -1, 59000, false, false,
enableSingleHop);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client2.invoke(addExceptionTag1(expectedExceptions));
client2.invoke(new CacheSerializableRunnable(title + "client2 add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
client1.invoke(new CacheSerializableRunnable(title + "put 3 keys then removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
// do putAll to create all buckets
doPutAll(regionName, "key-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
}
});
closeCache(server2);
client1.invoke(new CacheSerializableRunnable(title + "putAll from client again") {
@Override
public void run2() throws CacheException {
LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
try {
doPutAll(regionName, title, numberOfEntries);
fail("Expect ServerOperationException caused by PutAllParitialResultException");
} catch (ServerOperationException soe) {
assertTrue(soe.getMessage()
.contains(
String.format("Region %s putAll at server applied partial keys due to exception.",
region.getFullPath())));
assertTrue(soe.getCause() instanceof RuntimeException);
}
assertEquals(numberOfEntries * 3 / 2, region.size());
VersionTag tag;
int cnt = 0;
for (int i = 0; i < numberOfEntries; i++) {
tag = region.getVersionTag(title + i);
if (tag != null && 1 == tag.getEntryVersion()) {
cnt++;
}
}
assertEquals(numberOfEntries / 2, cnt);
try {
doRemoveAll(regionName, title, numberOfEntries);
} catch (ServerOperationException soe) {
assertTrue(soe.getMessage()
.contains(String.format(
"Region %s removeAll at server applied partial keys due to exception.",
region.getFullPath())));
assertTrue(soe.getCause() instanceof RuntimeException);
}
Region.Entry re;
// putAll only created 50 entries, removeAll removed them. So 100 entries are all NULL
for (int i = 0; i < numberOfEntries; i++) {
re = region.getEntry(title + i);
assertNull(re);
}
}
});
client2.invoke(new CacheSerializableRunnable(title + "verify entries from client2") {
@Override
public void run2() throws CacheException {
Wait.pause(5000);
Region region = getRootRegion().getSubregion(regionName);
Region.Entry re;
for (int i = 0; i < numberOfEntries; i++) {
re = region.getEntry(title + i);
assertNull(re);
}
}
});
// Stop server
stopBridgeServers(getCache());
}
/**
* Tests putAll to 2 PR servers.
*/
@Test
public void testPRServer() throws CacheException, InterruptedException {
final String title = "testPRServer:";
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 1, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, true, 1, null);
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, 59000, false);
createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2}, -1, 59000, false);
client2.invoke(new CacheSerializableRunnable(title + "client2 add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
doPutAll(regionName, "key-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().setCacheWriter(new MyWriter("key-"));
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
// verify cache server 2, because its data are from distribution
server2.invoke(new CacheSerializableRunnable(title + "verify cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().setCacheWriter(new MyWriter("key-"));
assertEquals(numberOfEntries, region.size());
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(i, obj.getPrice());
}
}
});
client2.invoke(new CacheSerializableRunnable(title + "verify client2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, numberOfEntries);
for (int i = 0; i < numberOfEntries; i++) {
Region.Entry re = region.getEntry("key-" + i);
assertNotNull(re);
assertEquals(null, re.getValue());
}
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doRemoveAll(regionName, "key-", numberOfEntries);
assertEquals(0, region.size());
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify removeAll cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
MyWriter mywriter = (MyWriter) region.getAttributes().getCacheWriter();
LogWriterUtils.getLogWriter()
.info("server cachewriter triggered for destroy: " + mywriter.num_destroyed);
// beforeDestroys are only triggered at primary buckets. server1 and server2 each holds half
// of buckets
assertEquals(numberOfEntries / 2, mywriter.num_destroyed);
}
});
// verify cache server 2, because its data are from distribution
server2.invoke(new CacheSerializableRunnable(title + "verify removeAll cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
MyWriter mywriter = (MyWriter) region.getAttributes().getCacheWriter();
LogWriterUtils.getLogWriter()
.info("server cachewriter triggered for destroy: " + mywriter.num_destroyed);
// beforeDestroys are only triggered at primary buckets. server1 and server2 each holds half
// of buckets
assertEquals(numberOfEntries / 2, mywriter.num_destroyed);
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, 0);
}
});
// Execute client putAll from multithread client
{
AsyncInvocation async1 =
client1.invokeAsync(new CacheSerializableRunnable(title + "async putAll1 from client1") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "async1key-", numberOfEntries);
}
});
AsyncInvocation async2 =
client1.invokeAsync(new CacheSerializableRunnable(title + "async putAll2 from client1") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "async2key-", numberOfEntries);
}
});
ThreadUtils.join(async1, 30 * 1000);
ThreadUtils.join(async2, 30 * 1000);
}
client1.invoke(new CacheSerializableRunnable(title + "verify client 1 for async keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries * 2, region.size());
long ts1 = 0, ts2 = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("async1key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts1);
ts1 = obj.getTS();
obj = (TestObject) region.getEntry("async2key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts2);
ts2 = obj.getTS();
}
}
});
// verify cache server 2 for asyn keys
server2.invoke(new CacheSerializableRunnable(title + "verify cache server 2 for async keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
long ts1 = 0, ts2 = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("async1key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts1);
ts1 = obj.getTS();
obj = (TestObject) region.getEntry("async2key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts2);
ts2 = obj.getTS();
}
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify async putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, numberOfEntries * 2);
for (int i = 0; i < numberOfEntries; i++) {
Region.Entry re = region.getEntry("async1key-" + i);
assertNotNull(re);
assertEquals(null, re.getValue());
}
for (int i = 0; i < numberOfEntries; i++) {
Region.Entry re = region.getEntry("async2key-" + i);
assertNotNull(re);
assertEquals(null, re.getValue());
}
}
});
{
// Execute client removeAll from multithread client
AsyncInvocation async1 =
client1.invokeAsync(new CacheSerializableRunnable(title + "async putAll1 from client1") {
@Override
public void run2() throws CacheException {
doRemoveAll(regionName, "async1key-", numberOfEntries);
}
});
AsyncInvocation async2 =
client1.invokeAsync(new CacheSerializableRunnable(title + "async putAll2 from client1") {
@Override
public void run2() throws CacheException {
doRemoveAll(regionName, "async2key-", numberOfEntries);
}
});
ThreadUtils.join(async1, 30 * 1000);
ThreadUtils.join(async2, 30 * 1000);
}
client1.invoke(new CacheSerializableRunnable(title + "client1 removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doRemoveAll(regionName, "key-", numberOfEntries);
assertEquals(0, region.size());
}
});
// verify cache server 1, its data are from client
server1.invoke(new CacheSerializableRunnable(title + "verify async removeAll cache server 1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
}
});
// verify cache server 2, because its data are from distribution
server2.invoke(new CacheSerializableRunnable(title + "verify async removeAll cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify async removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, 0);
}
});
// Execute p2p putAll
server1.invoke(new CacheSerializableRunnable(title + "server1 execute P2P putAll") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "p2pkey-", numberOfEntries);
Region region = getRootRegion().getSubregion(regionName);
long ts1 = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("p2pkey-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts1);
ts1 = obj.getTS();
}
}
});
// verify cache server 2 for p2p keys
server2.invoke(new CacheSerializableRunnable(title + "verify cache server 2 for async keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
long ts1 = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("p2pkey-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts1);
ts1 = obj.getTS();
}
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify p2p putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, numberOfEntries);
for (int i = 0; i < numberOfEntries; i++) {
Region.Entry re = region.getEntry("p2pkey-" + i);
assertNotNull(re);
assertEquals(null, re.getValue());
}
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 verify p2p putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, numberOfEntries);
for (int i = 0; i < numberOfEntries; i++) {
Region.Entry re = region.getEntry("p2pkey-" + i);
assertNotNull(re);
assertEquals(null, re.getValue());
}
}
});
// Execute p2p removeAll
server1.invoke(new CacheSerializableRunnable(title + "server1 execute P2P removeAll") {
@Override
public void run2() throws CacheException {
doRemoveAll(regionName, "p2pkey-", numberOfEntries);
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
}
});
// verify cache server 2, because its data are from distribution
server2.invoke(new CacheSerializableRunnable(title + "verify p2p removeAll cache server 2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(0, region.size());
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 verify p2p removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, 0);
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 verify p2p removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
checkRegionSize(region, 0);
}
});
// putAll at client2 to trigger local-invalidates at client1
client2.invoke(new CacheSerializableRunnable(title + "execute putAll on client2 for key 0-10") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "key-", 10);
}
});
// verify client 2 for key 0-10
client1.invoke(new CacheSerializableRunnable(title + "verify client1 for local invalidate") {
@Override
public void run2() throws CacheException {
final Region region = getRootRegion().getSubregion(regionName);
for (int i = 0; i < 10; i++) {
final int ii = i;
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
Entry entry = region.getEntry("key-" + ii);
return entry != null && entry.getValue() == null;
}
@Override
public String description() {
return null;
}
};
Wait.waitForCriterion(ev, 10 * 1000, 1000, true);
// local invalidate will set the value to null
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
assertEquals(null, obj);
}
}
});
// Stop server
stopBridgeServers(getCache());
}
/**
* Checks to see if a client does a destroy that throws an exception from CacheWriter
* beforeDestroy that the size of the region is still correct. See bug 51583.
*/
@Test
public void testClientDestroyOfUncreatedEntry() throws CacheException, InterruptedException {
final String title = "testClientDestroyOfUncreatedEntry:";
final Host host = Host.getHost(0);
final VM server1 = host.getVM(0);
final VM client1 = host.getVM(1);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
final int serverPort1 = createBridgeServer(server1, regionName, 0, false, 0, null);
createClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, -1, false, true,
true);
server1.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
server1.invoke(new CacheSerializableRunnable(title + "server1 add cacheWriter") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// Install cacheWriter that causes the very first destroy to fail
region.getAttributesMutator().setCacheWriter(new MyWriter(0));
}
});
assertEquals(0, getRegionSize(server1, regionName));
client1.invoke(new CacheSerializableRunnable(title + "client1 destroy") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
try {
region.destroy("bogusKey");
fail("Expect ServerOperationException caused by CacheWriterException");
} catch (ServerOperationException expected) {
assertTrue(expected.getCause() instanceof CacheWriterException);
}
}
});
assertEquals(0, getRegionSize(server1, regionName));
server1.invoke(removeExceptionTag1(expectedExceptions));
client1.invoke(removeExceptionTag1(expectedExceptions));
stopBridgeServers(getCache());
}
/**
* Tests partial key putAll and removeAll to 2 servers with local region
*/
@Test
public void testPartialKeyInLocalRegion() throws CacheException, InterruptedException {
final String title = "testPartialKeyInLocalRegion:";
final Host host = Host.getHost(0);
final VM server1 = host.getVM(0);
final VM server2 = host.getVM(1);
final VM client1 = host.getVM(2);
final VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
final int serverPort1 = createBridgeServer(server1, regionName, 0, false, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, false, 0, null);
createClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, -1, false, true,
true);
createClient(client2, regionName, serverHost, new int[] {serverPort1}, -1, -1, false, true,
true);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client2.invoke(addExceptionTag1(expectedExceptions));
server1.invoke(new CacheSerializableRunnable(title + "server1 add cacheWriter") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// let the server to trigger exception after created 15 keys
region.getAttributesMutator().setCacheWriter(new MyWriter(15));
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client1 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// create keys
try {
doPutAll(regionName, title, numberOfEntries);
fail("Expect ServerOperationException caused by PutAllParitialResultException");
} catch (ServerOperationException soe) {
assertTrue(soe.getMessage()
.contains(
String.format("Region %s putAll at server applied partial keys due to exception.",
region.getFullPath())));
assertTrue(soe.getCause() instanceof RuntimeException);
assertTrue(soe.getCause().getMessage()
.contains("Triggered exception as planned, created 15 keys"));
}
}
});
{
WaitCriterion waitForSizes = new WaitCriterion() {
@Override
public String description() {
return "waiting for conditions to be met";
}
@Override
public boolean done() {
int c1Size = getRegionSize(client1, regionName);
int c2Size = getRegionSize(client2, regionName);
int s1Size = getRegionSize(server1, regionName);
int s2Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter()
.info("region sizes: " + c1Size + "," + c2Size + "," + s1Size + "," + s2Size);
if (c1Size != 15) {
LogWriterUtils.getLogWriter().info("waiting for client1 to get all updates");
return false;
}
if (c2Size != 15) {
LogWriterUtils.getLogWriter().info("waiting for client2 to get all updates");
return false;
}
if (s1Size != 15) {
LogWriterUtils.getLogWriter().info("waiting for server1 to get all updates");
return false;
}
if (s2Size != 15) {
LogWriterUtils.getLogWriter().info("waiting for server2 to get all updates");
return false;
}
return true;
}
};
Wait.waitForCriterion(waitForSizes, 10000, 1000, true);
}
int server1Size = getRegionSize(server1, regionName);
int server2Size = getRegionSize(server1, regionName);
// reset cacheWriter's count to allow another 15 keys to be created
server1.invoke(new CacheSerializableRunnable(title + "server1 add cacheWriter") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// let the server to trigger exception after created 15 keys
region.getAttributesMutator().setCacheWriter(new MyWriter(15));
}
});
// p2p putAll on DR and expect exception
server2.invoke(new CacheSerializableRunnable(title + "server2 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
// create keys
try {
doPutAll(regionName, title + "again:", numberOfEntries);
fail("Expect original RuntimeException caused by cacheWriter");
} catch (RuntimeException rte) {
assertTrue(rte.getMessage().contains("Triggered exception as planned, created 15 keys"));
}
}
});
server2Size = getRegionSize(server1, regionName);
assertEquals(server1Size + 15, server2Size);
{
WaitCriterion waitForSizes = new WaitCriterion() {
@Override
public String description() {
return "waiting for conditions to be met";
}
@Override
public boolean done() {
int c1Size = getRegionSize(client1, regionName);
int c2Size = getRegionSize(client2, regionName);
int s1Size = getRegionSize(server1, regionName);
int s2Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter()
.info("region sizes: " + c1Size + "," + c2Size + "," + s1Size + "," + s2Size);
if (c1Size != 15) { // client 1 did not register interest
LogWriterUtils.getLogWriter().info("waiting for client1 to get all updates");
return false;
}
if (c2Size != 15 * 2) {
LogWriterUtils.getLogWriter().info("waiting for client2 to get all updates");
return false;
}
if (s1Size != 15 * 2) {
LogWriterUtils.getLogWriter().info("waiting for server1 to get all updates");
return false;
}
if (s2Size != 15 * 2) {
LogWriterUtils.getLogWriter().info("waiting for server2 to get all updates");
return false;
}
return true;
}
};
Wait.waitForCriterion(waitForSizes, 10000, 1000, true);
}
// now do a removeAll that is not allowed to remove everything
server1.invoke(new CacheSerializableRunnable(title + "server1 add cacheWriter") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// server triggers exception after destroying 5 keys
region.getAttributesMutator().setCacheWriter(new MyWriter(5));
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// create keys
try {
doRemoveAll(regionName, title, numberOfEntries);
fail("Expect ServerOperationException caused by PutAllParitialResultException");
} catch (ServerOperationException soe) {
assertTrue(soe.getMessage()
.contains(String.format(
"Region %s removeAll at server applied partial keys due to exception.",
region.getFullPath())));
assertTrue(soe.getCause() instanceof RuntimeException);
assertTrue(soe.getCause().getMessage()
.contains("Triggered exception as planned, destroyed 5 keys"));
}
}
});
{
WaitCriterion waitForSizes = new WaitCriterion() {
@Override
public String description() {
return "waiting for conditions to be met";
}
@Override
public boolean done() {
int c1Size = getRegionSize(client1, regionName);
int c2Size = getRegionSize(client2, regionName);
int s1Size = getRegionSize(server1, regionName);
int s2Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter()
.info("region sizes: " + c1Size + "," + c2Size + "," + s1Size + "," + s2Size);
if (c1Size != 15 - 5) { // client 1 did not register interest
LogWriterUtils.getLogWriter().info("waiting for client1 to get all destroys");
return false;
}
if (c2Size != (15 * 2) - 5) {
LogWriterUtils.getLogWriter().info("waiting for client2 to get all destroys");
return false;
}
if (s1Size != (15 * 2) - 5) {
LogWriterUtils.getLogWriter().info("waiting for server1 to get all destroys");
return false;
}
if (s2Size != (15 * 2) - 5) {
LogWriterUtils.getLogWriter().info("waiting for server2 to get all destroys");
return false;
}
return true;
}
};
Wait.waitForCriterion(waitForSizes, 10000, 1000, true);
}
// reset cacheWriter's count to allow another 5 keys to be destroyed
server1.invoke(new CacheSerializableRunnable(title + "server1 add cacheWriter") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// server triggers exception after destroying 5 keys
region.getAttributesMutator().setCacheWriter(new MyWriter(5));
}
});
// p2p putAll on DR and expect exception
server2.invoke(new CacheSerializableRunnable(title + "server2 add listener and removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
// create keys
try {
doRemoveAll(regionName, title + "again:", numberOfEntries);
fail("Expect original RuntimeException caused by cacheWriter");
} catch (RuntimeException rte) {
assertTrue(rte.getMessage().contains("Triggered exception as planned, destroyed 5 keys"));
}
}
});
{
WaitCriterion waitForSizes = new WaitCriterion() {
@Override
public String description() {
return "waiting for conditions to be met";
}
@Override
public boolean done() {
int c1Size = getRegionSize(client1, regionName);
int c2Size = getRegionSize(client2, regionName);
int s1Size = getRegionSize(server1, regionName);
int s2Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter()
.info("region sizes: " + c1Size + "," + c2Size + "," + s1Size + "," + s2Size);
if (c1Size != 15 - 5) { // client 1 did not register interest
LogWriterUtils.getLogWriter().info("waiting for client1 to get all destroys");
return false;
}
if (c2Size != (15 * 2) - 5 - 5) {
LogWriterUtils.getLogWriter().info("waiting for client2 to get all destroys");
return false;
}
if (s1Size != (15 * 2) - 5 - 5) {
LogWriterUtils.getLogWriter().info("waiting for server1 to get all destroys");
return false;
}
if (s2Size != (15 * 2) - 5 - 5) {
LogWriterUtils.getLogWriter().info("waiting for server2 to get all destroys");
return false;
}
return true;
}
};
Wait.waitForCriterion(waitForSizes, 10000, 1000, true);
}
server1.invoke(removeExceptionTag1(expectedExceptions));
server2.invoke(removeExceptionTag1(expectedExceptions));
client1.invoke(removeExceptionTag1(expectedExceptions));
client2.invoke(removeExceptionTag1(expectedExceptions));
// Stop server
stopBridgeServers(getCache());
}
/**
* Tests partial key putAll to 2 PR servers, because putting data at server side is different
* between PR and LR. PR does it in postPutAll. It's not running in singleHop putAll
*/
@Test
public void testPartialKeyInPR() throws CacheException, InterruptedException {
final String title = "testPartialKeyInPR:";
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
final VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 0, "ds1");
final int serverPort2 = createBridgeServer(server2, regionName, 0, true, 0, "ds1");
createClient(client1, regionName, serverHost, new int[] {serverPort1, serverPort2}, -1, -1,
false, false, true);
createClient(client2, regionName, serverHost, new int[] {serverPort1, serverPort2}, -1, -1,
false, false, true);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client2.invoke(addExceptionTag1(expectedExceptions));
server1.invoke(new CacheSerializableRunnable(title + "server1 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(true));
}
});
final SharedCounter sc_server2 = new SharedCounter("server2");
server2.invoke(new CacheSerializableRunnable(title + "server2 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator()
.addCacheListener(new MyListener(server2, true, sc_server2, 10));
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
AsyncInvocation async1 = client1
.invokeAsync(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
region.registerInterest("ALL_KEYS");
// create keys
try {
doPutAll(regionName, title, numberOfEntries);
fail("Expect ServerOperationException caused by PutAllParitialResultException");
} catch (ServerOperationException soe) {
if (!(soe.getCause() instanceof PartitionOfflineException)) {
throw soe;
}
if (!soe.getMessage()
.contains(String.format(
"Region %s putAll at server applied partial keys due to exception.",
region.getFullPath()))) {
throw soe;
}
}
}
});
// server2 will closeCache after created 10 keys
ThreadUtils.join(async1, 30 * 1000);
if (async1.exceptionOccurred()) {
Assert.fail("Aync1 get exceptions:", async1.getException());
}
int client1Size = getRegionSize(client1, regionName);
// client2Size maybe more than client1Size
int client2Size = getRegionSize(client2, regionName);
int server1Size = getRegionSize(server1, regionName);
LogWriterUtils.getLogWriter()
.info("region sizes: " + client1Size + "," + client2Size + "," + server1Size);
// restart server2
createBridgeServer(server2, regionName, serverPort2, true, 0, "ds1");
server1Size = getRegionSize(server1, regionName);
int server2Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter().info("region sizes after server2 restarted: " + client1Size + ","
+ client2Size + "," + server1Size + ":" + server2Size);
assertEquals(client2Size, server1Size);
assertEquals(client2Size, server2Size);
// close a server to re-run the test
closeCache(server2);
server1Size = getRegionSize(server1, regionName);
client1.invoke(new CacheSerializableRunnable(title + "client1 does putAll again") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// create keys
try {
doPutAll(regionName, title + "again:", numberOfEntries);
fail("Expect ServerOperationException caused by PutAllParitialResultException");
} catch (ServerOperationException soe) {
assertTrue(soe.getMessage()
.contains(
String.format("Region %s putAll at server applied partial keys due to exception.",
region.getFullPath())));
assertTrue(soe.getCause() instanceof PartitionOfflineException);
}
}
});
int new_server1Size = getRegionSize(server1, regionName);
int new_client1Size = getRegionSize(client1, regionName);
int new_client2Size = getRegionSize(client2, regionName);
LogWriterUtils.getLogWriter().info("region sizes after re-run the putAll: " + new_client1Size
+ "," + new_client2Size + "," + new_server1Size);
assertEquals(server1Size + numberOfEntries / 2, new_server1Size);
assertEquals(client1Size + numberOfEntries / 2, new_client1Size);
assertEquals(client2Size + numberOfEntries / 2, new_client2Size);
// restart server2
createBridgeServer(server2, regionName, serverPort2, true, 0, "ds1");
server1Size = getRegionSize(server1, regionName);
server2Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter()
.info("region sizes after restart server2: " + server1Size + "," + server2Size);
assertEquals(server1Size, server2Size);
// add a cacheWriter for server to stop after created 15 keys
server1.invoke(new CacheSerializableRunnable(title + "server1 execute P2P putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// let the server to trigger exception after created 15 keys
region.getAttributesMutator().setCacheWriter(new MyWriter(15));
}
});
// p2p putAll on PR and expect exception
server2.invoke(new CacheSerializableRunnable(title + "server2 add listener and putAll") {
@Override
public void run2() throws CacheException {
// create keys
try {
doPutAll(regionName, title + "once again:", numberOfEntries);
fail("Expected a CacheWriterException to be thrown by test");
} catch (CacheWriterException rte) {
assertTrue(rte.getMessage().contains("Triggered exception as planned, created 15 keys"));
}
}
});
new_server1Size = getRegionSize(server1, regionName);
int new_server2Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter()
.info("region sizes after restart server2: " + new_server1Size + "," + new_server2Size);
assertEquals(server1Size + 15, new_server1Size);
assertEquals(server2Size + 15, new_server2Size);
server1.invoke(removeExceptionTag1(expectedExceptions));
server2.invoke(removeExceptionTag1(expectedExceptions));
client1.invoke(removeExceptionTag1(expectedExceptions));
client2.invoke(removeExceptionTag1(expectedExceptions));
// Stop server
stopBridgeServers(getCache());
}
/**
* Tests partial key putAll to 2 PR servers, because putting data at server side is different
* between PR and LR. PR does it in postPutAll. This is a singlehop putAll test.
*/
@Test
public void testPartialKeyInPRSingleHop() throws CacheException, InterruptedException {
final String title = "testPartialKeyInPRSingleHop_";
final int cacheWriterAllowedKeyNum = 16;
final Host host = Host.getHost(0);
final VM server1 = host.getVM(0);
final VM server2 = host.getVM(1);
final VM client1 = host.getVM(2);
final VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 0, "ds1");
final int serverPort2 = createBridgeServer(server2, regionName, 0, true, 0, "ds1");
createClient(client1, regionName, serverHost, new int[] {serverPort1, serverPort2}, -1, -1,
false, false, true, false);
createClient(client2, regionName, serverHost, new int[] {serverPort1, serverPort2}, -1, -1,
false, false, true);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client2.invoke(addExceptionTag1(expectedExceptions));
try {
client2.invoke(new CacheSerializableRunnable(title + "client2 add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
client1.invoke(new CacheSerializableRunnable(
title + "do some putAll to get ClientMetaData for future putAll") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "key-", numberOfEntries);
}
});
WaitCriterion waitForSizes = new WaitCriterion() {
@Override
public String description() {
return "waiting for conditions to be met";
}
@Override
public boolean done() {
int c1Size = getRegionSize(client1, regionName);
int c2Size = getRegionSize(client2, regionName);
int s1Size = getRegionSize(server1, regionName);
int s2Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter()
.info("region sizes: " + c1Size + "," + c2Size + "," + s1Size + "," + s2Size);
if (c1Size != numberOfEntries) {
LogWriterUtils.getLogWriter().info("waiting for client1 to get all updates");
return false;
}
if (c2Size != numberOfEntries) {
LogWriterUtils.getLogWriter().info("waiting for client2 to get all updates");
return false;
}
if (s1Size != numberOfEntries) {
LogWriterUtils.getLogWriter().info("waiting for server1 to get all updates");
return false;
}
if (s2Size != numberOfEntries) {
LogWriterUtils.getLogWriter().info("waiting for server2 to get all updates");
return false;
}
return true;
}
};
Wait.waitForCriterion(waitForSizes, 10000, 1000, true);
server1.invoke(new CacheSerializableRunnable(title + "server1 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(true));
}
});
// add a listener that will close the cache at the 10th update
final SharedCounter sc_server2 = new SharedCounter("server2");
server2.invoke(new CacheSerializableRunnable(title + "server2 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator()
.addCacheListener(new MyListener(server2, true, sc_server2, 10));
}
});
AsyncInvocation async1 = client1
.invokeAsync(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
// create keys
try {
doPutAll(regionName, title, numberOfEntries);
fail("Expect ServerOperationException caused by PutAllParitialResultException");
} catch (ServerOperationException soe) {
assertTrue(soe.getMessage()
.contains(String.format(
"Region %s putAll at server applied partial keys due to exception.",
region.getFullPath())));
}
}
});
// server2 will closeCache after creating 10 keys
ThreadUtils.join(async1, 30 * 1000);
if (async1.exceptionOccurred()) {
Assert.fail("putAll client threw an exception", async1.getException());
}
// restart server2
System.out.println("restarting server 2");
createBridgeServer(server2, regionName, serverPort2, true, 0, "ds1");
// Test Case1: Trigger singleHop putAll. Stop server2 in middle.
// numberOfEntries/2 + X keys will be created at servers. i.e. X keys at server2,
// numberOfEntries/2 keys at server1.
// The client should receive a PartialResultException due to PartitionOffline
// close a server to re-run the test
closeCache(server2);
client1.invoke(new CacheSerializableRunnable(title + "client1 does putAll again") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// create keys
try {
doPutAll(regionName, title + "again:", numberOfEntries);
fail("Expect ServerOperationException caused by PutAllParitialResultException");
} catch (ServerOperationException soe) {
assertTrue(soe.getMessage()
.contains(String.format(
"Region %s putAll at server applied partial keys due to exception.",
region.getFullPath())));
}
}
});
// Test Case 2: based on case 1, but this time, there should be no X keys
// created on server2.
// restart server2
createBridgeServer(server2, regionName, serverPort2, true, 0, "ds1");
// add a cacheWriter for server to fail putAll after it created cacheWriterAllowedKeyNum keys
server1.invoke(new CacheSerializableRunnable(
title + "server1 add cachewriter to throw exception after created some keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().setCacheWriter(new MyWriter(cacheWriterAllowedKeyNum));
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 does putAll once more") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// create keys
try {
doPutAll(regionName, title + "once more:", numberOfEntries);
fail("Expect ServerOperationException caused by PutAllParitialResultException");
} catch (ServerOperationException soe) {
assertTrue(soe.getMessage()
.contains(String.format(
"Region %s putAll at server applied partial keys due to exception.",
region.getFullPath())));
}
}
});
} finally {
server1.invoke(removeExceptionTag1(expectedExceptions));
server2.invoke(removeExceptionTag1(expectedExceptions));
client1.invoke(removeExceptionTag1(expectedExceptions));
client2.invoke(removeExceptionTag1(expectedExceptions));
// Stop server
stopBridgeServers(getCache());
}
}
/**
* Set redundancy=1 to see if retry succeeded after PRE This is a singlehop putAll test.
*/
@Test
public void testPartialKeyInPRSingleHopWithRedundency()
throws CacheException, InterruptedException {
final String title = "testPartialKeyInPRSingleHopWithRedundency_";
int client1Size;
int client2Size;
int server1Size;
int server2Size;
final Host host = Host.getHost(0);
final VM server1 = host.getVM(0);
final VM server2 = host.getVM(1);
final VM client1 = host.getVM(2);
final VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 1, "ds1");
final int serverPort2 = createBridgeServer(server2, regionName, 0, true, 1, "ds1");
createClient(client1, regionName, serverHost, new int[] {serverPort1, serverPort2}, -1, -1,
false, false, true, false);
createClient(client2, regionName, serverHost, new int[] {serverPort1, serverPort2}, -1, -1,
false, false, true);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client2.invoke(addExceptionTag1(expectedExceptions));
client2.invoke(new CacheSerializableRunnable(title + "client2 add listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
client1.invoke(new CacheSerializableRunnable(
title + "do some putAll to get ClientMetaData for future putAll") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "key-", numberOfEntries);
}
});
WaitCriterion waitForSizes = new WaitCriterion() {
@Override
public String description() {
return "waiting for conditions to be met";
}
@Override
public boolean done() {
int c1Size = getRegionSize(client1, regionName);
int c2Size = getRegionSize(client2, regionName);
int s1Size = getRegionSize(server1, regionName);
int s2Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter()
.info("region sizes: " + c1Size + "," + c2Size + "," + s1Size + "," + s2Size);
if (c1Size != numberOfEntries) {
LogWriterUtils.getLogWriter().info("waiting for client1 to get all updates");
return false;
}
if (c2Size != numberOfEntries) {
LogWriterUtils.getLogWriter().info("waiting for client2 to get all updates");
return false;
}
if (s1Size != numberOfEntries) {
LogWriterUtils.getLogWriter().info("waiting for server1 to get all updates");
return false;
}
if (s2Size != numberOfEntries) {
LogWriterUtils.getLogWriter().info("waiting for server2 to get all updates");
return false;
}
return true;
}
};
Wait.waitForCriterion(waitForSizes, 10000, 1000, true);
client1Size = getRegionSize(client1, regionName);
client2Size = getRegionSize(client2, regionName);
server1Size = getRegionSize(server1, regionName);
server2Size = getRegionSize(server2, regionName);
server1.invoke(new CacheSerializableRunnable(title + "server1 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(true));
}
});
final SharedCounter sc_server2 = new SharedCounter("server2");
server2.invoke(new CacheSerializableRunnable(title + "server2 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator()
.addCacheListener(new MyListener(server2, true, sc_server2, 10));
}
});
AsyncInvocation async1 = client1
.invokeAsync(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
region.registerInterest("ALL_KEYS");
// create keys
doPutAll(regionName, title, numberOfEntries);
}
});
// server2 will closeCache after created 10 keys
ThreadUtils.join(async1, 30 * 1000);
if (async1.exceptionOccurred()) {
Assert.fail("Aync1 get exceptions:", async1.getException());
}
client1Size = getRegionSize(client1, regionName);
// client2Size maybe more than client1Size
client2Size = getRegionSize(client2, regionName);
server1Size = getRegionSize(server1, regionName);
// putAll should succeed after retry
LogWriterUtils.getLogWriter()
.info("region sizes: " + client1Size + "," + client2Size + "," + server1Size);
assertEquals(server1Size, client1Size);
assertEquals(server1Size, client2Size);
// restart server2
createBridgeServer(server2, regionName, serverPort2, true, 1, "ds1");
server1Size = getRegionSize(server1, regionName);
server2Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter().info("region sizes after server2 restarted: " + client1Size + ","
+ client2Size + "," + server1Size);
assertEquals(client2Size, server1Size);
assertEquals(client2Size, server2Size);
// close a server to re-run the test
closeCache(server2);
server1Size = getRegionSize(server1, regionName);
client1.invoke(new CacheSerializableRunnable(title + "client1 does putAll again") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, title + "again:", numberOfEntries);
}
});
int new_server1Size = getRegionSize(server1, regionName);
int new_client1Size = getRegionSize(client1, regionName);
int new_client2Size = getRegionSize(client2, regionName);
// putAll should succeed, all the numbers should match
LogWriterUtils.getLogWriter().info("region sizes after re-run the putAll: " + new_client1Size
+ "," + new_client2Size + "," + new_server1Size);
assertEquals(new_server1Size, new_client1Size);
assertEquals(new_server1Size, new_client2Size);
// Stop server
stopBridgeServers(getCache());
}
/**
* Tests bug 41403: let 2 sub maps both failed with partial key applied. This is a singlehop
* putAll test.
*/
@Test
public void testEventIdMisorderInPRSingleHop() throws CacheException, InterruptedException {
final String title = "testEventIdMisorderInPRSingleHop_";
final Host host = Host.getHost(0);
final VM server1 = host.getVM(0);
final VM server2 = host.getVM(1);
final VM server3 = host.getVM(2);
final VM client1 = host.getVM(3);
final String regionName = getUniqueName();
final int[] serverPorts = new int[3];
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
final SharedCounter sc_server1 = new SharedCounter("server1");
final SharedCounter sc_server2 = new SharedCounter("server2");
final SharedCounter sc_server3 = new SharedCounter("server3");
final SharedCounter sc_client2 = new SharedCounter("client2");
// set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
serverPorts[0] = createBridgeServer(server1, regionName, 0, true, 0, null);
serverPorts[1] = createBridgeServer(server2, regionName, 0, true, 0, null);
serverPorts[2] = createBridgeServer(server3, regionName, 0, true, 0, null);
createClient(client1, regionName, serverHost, serverPorts, -1, -1, false, true, true, false);
{
// Create local region
Properties config = new Properties();
config.setProperty(MCAST_PORT, "0");
config.setProperty(LOCATORS, "");
getSystem(config);
// Create Region
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
try {
getCache();
ClientServerTestCase.configureConnectionPool(factory, serverHost, serverPorts, true, -1, -1,
null);
createRegion(regionName, factory.create());
assertNotNull(getRootRegion().getSubregion(regionName));
} catch (CacheException ex) {
Assert.fail("While creating Region on Edge", ex);
}
}
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
server3.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
addExceptionTag1(expectedExceptions);
client1.invoke(new CacheSerializableRunnable(
title + "do some putAll to get ClientMetaData for future putAll") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "key-", numberOfEntries);
}
});
// register interest and add listener
MyListener myListener = new MyListener(false, sc_client2);
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(myListener);
region.registerInterest("ALL_KEYS");
server1.invoke(new CacheSerializableRunnable(title + "server1 add slow listener") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion().getSubregion(regionName);
r.getAttributesMutator().addCacheListener(new MyListener(server1, true, sc_server1, 10));
}
});
server2.invoke(new CacheSerializableRunnable(title + "server2 add slow listener") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion().getSubregion(regionName);
r.getAttributesMutator().addCacheListener(new MyListener(server2, true, sc_server2, 10));
}
});
server3.invoke(new CacheSerializableRunnable(title + "server3 add slow listener") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion().getSubregion(regionName);
r.getAttributesMutator().addCacheListener(new MyListener(true, sc_server3));
}
});
int client1Size = getRegionSize(client1, regionName);
int server1Size = getRegionSize(server1, regionName);
int server2Size = getRegionSize(server2, regionName);
int server3Size = getRegionSize(server2, regionName);
LogWriterUtils.getLogWriter().info(
"region sizes: " + client1Size + "," + server1Size + "," + server2Size + "," + server3Size);
AsyncInvocation async1 = client1
.invokeAsync(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion().getSubregion(regionName);
r.getAttributesMutator().addCacheListener(new MyListener(false));
doPutAll(regionName, title, numberOfEntries);
}
});
// server1 and server2 will closeCache after created 10 keys
ThreadUtils.join(async1, 30 * 1000);
if (async1.exceptionOccurred()) {
Assert.fail("Aync1 get exceptions:", async1.getException());
}
server3.invoke(new CacheSerializableRunnable(title + "server3 print counter") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion().getSubregion(regionName);
MyListener l = (MyListener) r.getAttributes().getCacheListeners()[0];
LogWriterUtils.getLogWriter().info("event counters : " + l.sc);
assertEquals(numberOfEntries, l.sc.num_create_event);
assertEquals(0, l.sc.num_update_event);
}
});
LogWriterUtils.getLogWriter().info("event counters before wait : " + myListener.sc);
await()
.untilAsserted(() -> assertEquals(numberOfEntries, myListener.sc.num_create_event));
LogWriterUtils.getLogWriter().info("event counters after wait : " + myListener.sc);
assertEquals(0, myListener.sc.num_update_event);
server1.invoke(removeExceptionTag1(expectedExceptions));
server2.invoke(removeExceptionTag1(expectedExceptions));
server3.invoke(removeExceptionTag1(expectedExceptions));
client1.invoke(removeExceptionTag1(expectedExceptions));
removeExceptionTag1(expectedExceptions);
// Stop server
stopBridgeServers(getCache());
}
/**
* Tests while putAll to 2 distributed servers, one server failed over Add a listener to slow down
* the processing of putAll
*/
@Test
public void test2FailOverDistributedServer() throws CacheException, InterruptedException {
IgnoredException.addIgnoredException("Broken pipe");
IgnoredException.addIgnoredException("Connection reset");
IgnoredException.addIgnoredException("Unexpected IOException");
final String title = "test2FailOverDistributedServer:";
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=true to test register interest
final int serverPort1 = createBridgeServer(server1, regionName, 0, false, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, false, 0, null);
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1, serverPort2}, -1,
-1, true);
createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2, serverPort1}, -1,
-1, true);
server1.invoke(new CacheSerializableRunnable(title + "server1 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(true));
}
});
server2.invoke(new CacheSerializableRunnable(title + "server2 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(true));
}
});
client1.invoke(new CacheSerializableRunnable(title + "client1 registerInterest") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
// registerInterest for ALL_KEYS
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client1 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 registerInterest") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
// registerInterest for ALL_KEYS
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
// Execute client putAll from multithread client
AsyncInvocation async1 =
client1.invokeAsync(new CacheSerializableRunnable(title + "async putAll1 from client1") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, "async1key-", numberOfEntries);
}
});
Wait.pause(2000);
server1.invoke(new CacheSerializableRunnable(title + "stop cache server 1") {
@Override
public void run2() throws CacheException {
stopOneBridgeServer(serverPort1);
}
});
ThreadUtils.join(async1, 30 * 1000);
// verify cache server 2 for asyn keys
server2.invoke(new CacheSerializableRunnable(title + "verify cache server 2 for async keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
long ts1 = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("async1key-" + i).getValue();
assertEquals(i, obj.getPrice());
assertTrue(obj.getTS() >= ts1);
ts1 = obj.getTS();
}
}
});
// Stop server
stopBridgeServers(getCache());
}
/**
* Tests while putAll timeout's exception
*/
@Test
public void testClientTimeOut() throws CacheException, InterruptedException {
final String title = "testClientTimeOut:";
disconnectAllFromDS();
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=true to test register interest
final int serverPort1 = createBridgeServer(server1, regionName, 0, false, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, false, 0, null);
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1, serverPort2}, -1,
-1, true);
createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2, serverPort1}, -1,
-1, true);
server1.invoke(new CacheSerializableRunnable(title + "server1 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(true));
}
});
server2.invoke(new CacheSerializableRunnable(title + "server2 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(true));
}
});
// Execute client putAll
client1.invoke(new CacheSerializableRunnable(title + "client1 execute putAll") {
@Override
public void run2() throws CacheException {
boolean exceptionTriggered = false;
try {
doPutAll(regionName, "key-", thousandEntries);
} catch (Exception e) {
LogWriterUtils.getLogWriter().info(title + "Expected SocketTimeOut:" + e.getMessage());
exceptionTriggered = true;
}
assertTrue(exceptionTriggered);
}
});
// clean up
// Stop server
stopBridgeServers(getCache());
}
/**
* Tests while putAll timeout at endpoint1 and switch to endpoint2
*/
@Test
public void testEndPointSwitch() throws CacheException, InterruptedException {
IgnoredException.addIgnoredException("Broken pipe");
IgnoredException.addIgnoredException("Connection reset");
IgnoredException.addIgnoredException("Unexpected IOException");
final String title = "testEndPointSwitch:";
disconnectAllFromDS();
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=true to test register interest
final int serverPort1 = createBridgeServer(server1, regionName, 0, false, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, false, 0, null);
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1, serverPort2}, 1, -1,
true);
createClient(client2, regionName, serverHost, new int[] {serverPort2, serverPort1}, 1, -1,
false, true, true);
// only add slow listener to server1, because we wish it to succeed
server1.invoke(new CacheSerializableRunnable(title + "server1 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(true));
}
});
// only register interest on client2
client2.invoke(new CacheSerializableRunnable(title + "client2 registerInterest") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
// Execute client1 putAll
client1.invoke(new CacheSerializableRunnable(title + "putAll from client1") {
@Override
public void run2() throws CacheException {
try {
doPutAll(regionName, title, testEndPointSwitchNumber);
} catch (Exception e) {
LogWriterUtils.getLogWriter().info(title + "Expected SocketTimeOut" + e.getMessage());
}
}
});
// verify client 2 for all keys
client2.invoke(
new CacheSerializableRunnable(title + "verify Bridge client2 for keys arrived finally") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
waitTillNotify(lockObject3, 100000, (region.size() == testEndPointSwitchNumber));
assertEquals(testEndPointSwitchNumber, region.size());
}
});
// clean up
// Stop server
stopBridgeServers(getCache());
}
/**
* Tests while putAll to 2 distributed servers, one server failed over Add a listener to slow down
* the processing of putAll
*/
@Test
public void testHADRFailOver() throws CacheException, InterruptedException {
final String title = "testHADRFailOver:";
disconnectAllFromDS();
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=true to test register interest
final int serverPort1 = createBridgeServer(server1, regionName, 0, false, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, false, 0, null);
// set queueRedundency=1
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1, serverPort2}, 1, -1,
true);
createClient(client2, regionName, serverHost, new int[] {serverPort2, serverPort1}, 1, -1,
false, true, true);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(new CacheSerializableRunnable(title + "client1 registerInterest") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client1 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 registerInterest") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(false));
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
// Server2 do a putAll to use HARegionQueues
AsyncInvocation async1 =
server2.invokeAsync(new CacheSerializableRunnable(title + "async putAll1 from server2") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, title + "1:", thousandEntries);
}
});
// client1 do a putAll to start HARegionQueues
AsyncInvocation async2 =
client1.invokeAsync(new CacheSerializableRunnable(title + "async putAll1 from client1") {
@Override
public void run2() throws CacheException {
doPutAll(regionName, title + "2:", thousandEntries);
}
});
Wait.pause(2000);
server1.invoke(new CacheSerializableRunnable(title + "stop cache server 1") {
@Override
public void run2() throws CacheException {
stopOneBridgeServer(serverPort1);
}
});
ThreadUtils.join(async1, 30 * 1000);
ThreadUtils.join(async2, 30 * 1000);
// verify client 2 for asyn keys
client2
.invokeAsync(new CacheSerializableRunnable(title + "verify Bridge client2 for async keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
waitTillNotify(lockObject4, 100000, (region.size() == thousandEntries * 2));
assertEquals(thousandEntries * 2, region.size());
}
});
server1.invoke(removeExceptionTag1(expectedExceptions));
server2.invoke(removeExceptionTag1(expectedExceptions));
client1.invoke(removeExceptionTag1(expectedExceptions));
client2.invoke(removeExceptionTag1(expectedExceptions));
// clean up
// Stop server
stopBridgeServers(getCache());
}
/**
* Test TX for putAll. There's no TX for c/s. We only test P2P This is disabled because putAll in
* TX is disabled.
*/
@Ignore("TODO: test is disabled")
@Test
public void testTX() throws CacheException, InterruptedException {
final String title = "testTX:";
disconnectAllFromDS();
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
final String regionName = getUniqueName();
final int serverPort1 = createBridgeServer(server1, regionName, 0, false, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, false, 0, null);
// final String serverHost = getServerHostName(server1.getHost());
// set notifyBySubscription=true to test register interest
// add slow listener
server1.invoke(new CacheSerializableRunnable(title + "server1 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(true));
}
});
server2.invoke(new CacheSerializableRunnable(title + "server2 add slow listener") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator().addCacheListener(new MyListener(true));
}
});
// TX1: server1 do a putAll
AsyncInvocation async1 = server1
.invokeAsync(new CacheSerializableRunnable(title + "TX1: async putAll from server1") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// Get JNDI(Java Naming and Directory interface) context
// CacheTransactionManager tx = getCache().getCacheTransactionManager();
LinkedHashMap map = new LinkedHashMap();
// tx.begin();
for (int i = 0; i < numberOfEntries; i++) {
// region.put("key-"+i, new TestObject(i));
map.put("key-" + i, new TestObject(i));
}
region.putAll(map, "putAllCallback");
try {
LogWriterUtils.getLogWriter().info("before commit TX1");
// tx.commit();
LogWriterUtils.getLogWriter().info("TX1 committed");
} catch (CommitConflictException e) {
LogWriterUtils.getLogWriter().info("TX1 rollbacked");
}
}
});
// we have to pause a while to let TX1 finish earlier
Wait.pause(500);
// TX2: server2 do a putAll
AsyncInvocation async2 = server2
.invokeAsync(new CacheSerializableRunnable(title + "TX2: async putAll from server2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// Get JNDI(Java Naming and Directory interface) context
// CacheTransactionManager tx = getCache().getCacheTransactionManager();
LinkedHashMap map = new LinkedHashMap();
// tx.begin();
for (int i = 0; i < numberOfEntries; i++) {
// region.put("key-"+i, new TestObject(i + numberOfEntries));
map.put("key-" + i, new TestObject(i + numberOfEntries));
}
region.putAll(map, "putAllCallback");
try {
LogWriterUtils.getLogWriter().info("before commit TX2");
// tx.commit();
LogWriterUtils.getLogWriter().info("TX2 committed");
} catch (CommitConflictException e) {
LogWriterUtils.getLogWriter().info("TX2 rollbacked");
}
}
});
// TX3: server2 do a putAll in another thread
AsyncInvocation async3 = server2
.invokeAsync(new CacheSerializableRunnable(title + "TX3: async putAll from server2") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
// Get JNDI(Java Naming and Directory interface) context
// CacheTransactionManager tx = getCache().getCacheTransactionManager();
LinkedHashMap map = new LinkedHashMap();
// Can't do tx with putall anymore
// tx.begin();
for (int i = 0; i < numberOfEntries; i++) {
// region.put("key-"+i, new TestObject(i+numberOfEntries*2));
map.put("key-" + i, new TestObject(i + numberOfEntries * 2));
}
region.putAll(map, "putAllCallback");
try {
LogWriterUtils.getLogWriter().info("before commit TX3");
// tx.commit();
LogWriterUtils.getLogWriter().info("TX3 committed");
} catch (CommitConflictException e) {
LogWriterUtils.getLogWriter().info("TX3 rollbacked");
}
}
});
ThreadUtils.join(async1, 30 * 1000);
ThreadUtils.join(async2, 30 * 1000);
ThreadUtils.join(async3, 30 * 1000);
// verify server 2 for asyn keys
server2.invoke(new CacheSerializableRunnable(title + "verify cache server2 for keys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries, region.size());
int tx_no = 0;
for (int i = 0; i < numberOfEntries; i++) {
TestObject obj = (TestObject) region.getEntry("key-" + i).getValue();
if (tx_no == 0) {
// only check which tx took control once
if (obj.getPrice() == i) {
tx_no = 1;
} else if (obj.getPrice() == i + numberOfEntries) {
tx_no = 2;
} else if (obj.getPrice() == i + numberOfEntries * 2) {
tx_no = 3;
}
LogWriterUtils.getLogWriter().info("Verifying TX:" + tx_no);
}
if (tx_no == 1) {
assertEquals(i, obj.getPrice());
} else if (tx_no == 2) {
assertEquals(i + numberOfEntries, obj.getPrice());
} else {
assertEquals(i + numberOfEntries * 2, obj.getPrice());
}
}
}
});
// clean up
// Stop server
stopBridgeServers(getCache());
}
@Test
public void testVersionsOnClientsWithNotificationsOnly() {
final String title = "testVersionsInClients";
disconnectAllFromDS();
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=true to test register interest
final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, true, 0, null);
// set queueRedundency=1
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, 0, 59000, true);
createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2}, 0, 59000, true);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(new CacheSerializableRunnable(title + "client1 putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doPutAll(regionName, "key-", numberOfEntries * 2);
assertEquals(numberOfEntries * 2, region.size());
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 versions collection") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
client1Versions = (List<VersionTag>) client1
.invoke(new SerializableCallable(title + "client1 versions collection") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries * 2, region.size());
List<VersionTag> versions = new ArrayList<VersionTag>(numberOfEntries * 2);
RegionMap entries = ((LocalRegion) region).entries;
for (Object key : entries.keySet()) {
RegionEntry internalRegionEntry = entries.getEntry(key);
VersionTag tag = internalRegionEntry.getVersionStamp().asVersionTag();
LogWriterUtils.getLogWriter()
.info("Entry version tag on client for " + key + ": " + tag);
versions.add(tag);
}
return versions;
}
});
client2Versions = (List<VersionTag>) client2
.invoke(new SerializableCallable(title + "client2 versions collection") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries * 2, region.size());
List<VersionTag> versions = new ArrayList<VersionTag>(numberOfEntries * 2);
RegionMap entries = ((LocalRegion) region).entries;
for (Object key : entries.keySet()) {
RegionEntry internalRegionEntry = entries.getEntry(key);
VersionTag tag = internalRegionEntry.getVersionStamp().asVersionTag();
LogWriterUtils.getLogWriter()
.info("Entry version tag on client for " + key + ": " + tag);
versions.add(tag);
}
return versions;
}
});
assertEquals(numberOfEntries * 2, client1Versions.size());
LogWriterUtils.getLogWriter().info(Arrays.toString(client1Versions.toArray()));
LogWriterUtils.getLogWriter().info(Arrays.toString(client2Versions.toArray()));
for (VersionTag tag : client1Versions) {
if (!client2Versions.contains(tag)) {
fail("client 2 does not have the tag contained in client 1" + tag);
}
}
}
/**
* basically same test as testVersionsOnClientsWithNotificationsOnly but also do a removeAll
*/
@Test
public void testRAVersionsOnClientsWithNotificationsOnly() {
final String title = "testRAVersionsInClients";
disconnectAllFromDS();
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=true to test register interest
final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, true, 0, null);
// set queueRedundency=1
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, 0, 59000, true);
createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2}, 0, 59000, true);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(new CacheSerializableRunnable(title + "client1 putAll+removeAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doPutAll(regionName, "key-", numberOfEntries * 2);
assertEquals(numberOfEntries * 2, region.size());
doRemoveAll(regionName, "key-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
}
});
client2.invoke(new CacheSerializableRunnable(title + "client2 versions collection") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.registerInterest("ALL_KEYS");
assertEquals(numberOfEntries, region.size());
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
client1RAVersions = (List<VersionTag>) client1
.invoke(new SerializableCallable(title + "client1 versions collection") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries, region.size());
List<VersionTag> versions = new ArrayList<VersionTag>(numberOfEntries * 2);
RegionMap entries = ((LocalRegion) region).entries;
assertEquals(numberOfEntries * 2, entries.size());
for (Object key : entries.keySet()) {
RegionEntry internalRegionEntry = entries.getEntry(key);
VersionTag tag = internalRegionEntry.getVersionStamp().asVersionTag();
LogWriterUtils.getLogWriter()
.info("Entry version tag on client for " + key + ": " + tag);
versions.add(tag);
}
return versions;
}
});
client2RAVersions = (List<VersionTag>) client2
.invoke(new SerializableCallable(title + "client2 versions collection") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries, region.size());
List<VersionTag> versions = new ArrayList<VersionTag>(numberOfEntries * 2);
RegionMap entries = ((LocalRegion) region).entries;
assertEquals(numberOfEntries * 2, entries.size());
for (Object key : entries.keySet()) {
RegionEntry internalRegionEntry = entries.getEntry(key);
VersionTag tag = internalRegionEntry.getVersionStamp().asVersionTag();
LogWriterUtils.getLogWriter()
.info("Entry version tag on client for " + key + ": " + tag);
versions.add(tag);
}
return versions;
}
});
assertEquals(numberOfEntries * 2, client1RAVersions.size());
LogWriterUtils.getLogWriter().info(Arrays.toString(client1RAVersions.toArray()));
LogWriterUtils.getLogWriter().info(Arrays.toString(client2RAVersions.toArray()));
for (VersionTag tag : client1RAVersions) {
if (!client2RAVersions.contains(tag)) {
fail("client 2 does not have the tag contained in client 1" + tag);
}
}
}
@Test
public void testVersionsOnServersWithNotificationsOnly() {
final String title = "testVersionsOnServers";
disconnectAllFromDS();
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM server3 = host.getVM(2);
VM client1 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=true to test register interest
final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 1, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, true, 1, null);
final int serverPort3 = createBridgeServer(server3, regionName, 0, true, 1, null);
// set queueRedundency=1
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort3}, 0, 59000, true);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
server3.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(new CacheSerializableRunnable(title + "client2 versions collection") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
server1.invoke(new CacheSerializableRunnable(title + "client1 putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doPutAll(regionName, "key-", numberOfEntries * 2);
assertEquals(numberOfEntries * 2, region.size());
}
});
expectedVersions = (List<String>) server1
.invoke(new SerializableCallable(title + "server1 versions collection") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries * 2, region.size());
List<String> versions = new ArrayList<String>(numberOfEntries * 2);
Set<BucketRegion> buckets =
((PartitionedRegion) region).dataStore.getAllLocalPrimaryBucketRegions();
for (BucketRegion br : buckets) {
RegionMap entries = br.entries;
for (Object key : entries.keySet()) {
RegionEntry internalRegionEntry = entries.getEntry(key);
VersionTag tag = internalRegionEntry.getVersionStamp().asVersionTag();
LogWriterUtils.getLogWriter().info("Entry version tag on server1:" + tag);
versions.add(key + " " + tag);
}
}
return versions;
}
});
// Let client be updated with all keys.
Wait.pause(1000);
actualVersions = (List<String>) client1
.invoke(new SerializableCallable(title + "client2 versions collection") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries * 2, region.size());
List<String> versions = new ArrayList<String>(numberOfEntries * 2);
RegionMap entries = ((LocalRegion) region).entries;
for (Object key : entries.keySet()) {
RegionEntry internalRegionEntry = entries.getEntry(key);
VersionTag tag = internalRegionEntry.getVersionStamp().asVersionTag();
tag.setMemberID(null);
versions.add(key + " " + tag);
}
return versions;
}
});
LogWriterUtils.getLogWriter().info(Arrays.toString(expectedVersions.toArray()));
assertEquals(numberOfEntries * 2, actualVersions.size());
LogWriterUtils.getLogWriter().info(Arrays.toString(actualVersions.toArray()));
for (String keyTag : expectedVersions) {
if (!actualVersions.contains(keyTag)) {
fail("client 2 does not have the tag contained in client 1" + keyTag);
}
}
}
/**
* Same test as testVersionsOnServersWithNotificationsOnly but also does a removeAll
*/
@Test
public void testRAVersionsOnServersWithNotificationsOnly() {
final String title = "testRAVersionsOnServers";
disconnectAllFromDS();
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM server3 = host.getVM(2);
VM client1 = host.getVM(3);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=true to test register interest
final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 1, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, true, 1, null);
final int serverPort3 = createBridgeServer(server3, regionName, 0, true, 1, null);
// set queueRedundency=1
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort3}, 0, 59000, true);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
server3.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(new CacheSerializableRunnable(title + "client2 versions collection") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
region.registerInterest("ALL_KEYS");
LogWriterUtils.getLogWriter()
.info("client2 registerInterest ALL_KEYS at " + region.getFullPath());
}
});
server1.invoke(new CacheSerializableRunnable(title + "client1 putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doPutAll(regionName, "key-", numberOfEntries * 2);
assertEquals(numberOfEntries * 2, region.size());
doRemoveAll(regionName, "key-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
}
});
expectedRAVersions = (List<String>) server1
.invoke(new SerializableCallable(title + "server1 versions collection") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries, region.size());
List<String> versions = new ArrayList<String>(numberOfEntries * 2);
Set<BucketRegion> buckets =
((PartitionedRegion) region).dataStore.getAllLocalPrimaryBucketRegions();
for (BucketRegion br : buckets) {
RegionMap entries = br.entries;
for (Object key : entries.keySet()) {
RegionEntry internalRegionEntry = entries.getEntry(key);
VersionTag tag = internalRegionEntry.getVersionStamp().asVersionTag();
LogWriterUtils.getLogWriter().info("Entry version tag on server1:" + tag);
versions.add(key + " " + tag);
}
}
return versions;
}
});
// Let client be updated with all keys.
Wait.pause(1000);
actualRAVersions = (List<String>) client1
.invoke(new SerializableCallable(title + "client2 versions collection") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries, region.size());
List<String> versions = new ArrayList<String>(numberOfEntries * 2);
RegionMap entries = ((LocalRegion) region).entries;
assertEquals(numberOfEntries * 2, entries.size());
for (Object key : entries.keySet()) {
RegionEntry internalRegionEntry = entries.getEntry(key);
VersionTag tag = internalRegionEntry.getVersionStamp().asVersionTag();
tag.setMemberID(null);
versions.add(key + " " + tag);
}
return versions;
}
});
LogWriterUtils.getLogWriter().info(Arrays.toString(expectedRAVersions.toArray()));
assertEquals(numberOfEntries * 2, actualRAVersions.size());
LogWriterUtils.getLogWriter().info(Arrays.toString(actualRAVersions.toArray()));
for (String keyTag : expectedRAVersions) {
if (!actualRAVersions.contains(keyTag)) {
fail("client 2 does not have the tag contained in client 1" + keyTag);
}
}
}
@Test
public void testVersionsOnReplicasAfterPutAllAndRemoveAll() {
final String title = "testVersionsOnReplicas";
client1Versions = null;
client2Versions = null;
disconnectAllFromDS();
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
final String regionName = getUniqueName();
final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
// set notifyBySubscription=true to test register interest
final int serverPort1 = createBridgeServer(server1, regionName, 0, false, 0, null);
final int serverPort2 = createBridgeServer(server2, regionName, 0, false, 0, null);
// set queueRedundency=1
createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, 0, 59000, true);
server1.invoke(addExceptionTag1(expectedExceptions));
server2.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(addExceptionTag1(expectedExceptions));
client1.invoke(new CacheSerializableRunnable(title + "client1 putAll") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
doPutAll(regionName, "key-", numberOfEntries * 2);
assertEquals(numberOfEntries * 2, region.size());
doRemoveAll(regionName, "key-", numberOfEntries);
assertEquals(numberOfEntries, region.size());
}
});
client1Versions = (List<VersionTag>) server1
.invoke(new SerializableCallable(title + "client1 versions collection") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries, region.size());
List<VersionTag> versions = new ArrayList<VersionTag>(numberOfEntries * 2);
RegionMap entries = ((LocalRegion) region).entries;
assertEquals(numberOfEntries * 2, entries.size());
for (Object key : entries.keySet()) {
RegionEntry internalRegionEntry = entries.getEntry(key);
VersionTag tag = internalRegionEntry.getVersionStamp().asVersionTag();
LogWriterUtils.getLogWriter().info("Entry version tag on client:" + tag);
versions.add(tag);
}
return versions;
}
});
client2Versions = (List<VersionTag>) server2
.invoke(new SerializableCallable(title + "client2 versions collection") {
@Override
public Object call() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
assertEquals(numberOfEntries, region.size());
List<VersionTag> versions = new ArrayList<VersionTag>(numberOfEntries * 2);
RegionMap entries = ((LocalRegion) region).entries;
assertEquals(numberOfEntries * 2, entries.size());
for (Object key : entries.keySet()) {
RegionEntry internalRegionEntry = entries.getEntry(key);
versions.add(internalRegionEntry.getVersionStamp().asVersionTag());
}
return versions;
}
});
assertEquals(numberOfEntries * 2, client1Versions.size());
LogWriterUtils.getLogWriter().info(Arrays.toString(client1Versions.toArray()));
LogWriterUtils.getLogWriter().info(Arrays.toString(client2Versions.toArray()));
for (VersionTag tag : client2Versions) {
tag.setMemberID(null);
if (!client1Versions.contains(tag)) {
fail("client 2 have the tag NOT contained in client 1" + tag);
}
}
}
private int createBridgeServer(VM server, final String regionName, final int serverPort,
final boolean createPR, final int redundantCopies, final String diskStoreName) {
return (Integer) server.invoke(new SerializableCallable("Create server") {
@Override
@SuppressWarnings("synthetic-access")
public Object call() throws Exception {
// Create DS
Properties config = new Properties();
config.setProperty(LOCATORS,
"localhost[" + DistributedTestUtils.getDUnitLocatorPort() + "]");
getSystem(config);
// Create Region
AttributesFactory factory = new AttributesFactory();
Cache cache = getCache();
// enable concurrency checks (not for disk now - disk doesn't support versions yet)
if (diskStoreName == null) {
factory.setConcurrencyChecksEnabled(true);
}
// create diskStore if required
if (diskStoreName != null) {
DiskStore ds = cache.findDiskStore(diskStoreName);
if (ds == null) {
ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
}
}
/*
* In this test, no cacheLoader should be defined, otherwise, it will create a value for
* destroyed key factory.setCacheLoader(new CacheServerCacheLoader());
*/
if (createPR) {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(redundantCopies);
paf.setTotalNumBuckets(TOTAL_BUCKETS);
factory.setPartitionAttributes(paf.create());
if (diskStoreName != null) {
factory.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
factory.setDiskStoreName(diskStoreName);
} else {
factory.setDataPolicy(DataPolicy.PARTITION);
}
} else {
factory.setScope(Scope.DISTRIBUTED_ACK);
if (diskStoreName != null) {
factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
factory.setDiskStoreName(diskStoreName);
} else {
factory.setDataPolicy(DataPolicy.REPLICATE);
}
}
createRootRegion(new AttributesFactory().create());
Region region = createRegion(regionName, factory.create());
if (createPR) {
assertTrue(region instanceof PartitionedRegion);
} else {
assertTrue(region instanceof DistributedRegion);
}
int retPort = startBridgeServer(serverPort);
LogWriterUtils.getLogWriter().info("Cache Server Started:" + retPort + ":" + serverPort);
return retPort;
}
});
}
private void createClient(VM client, String regionName, String serverHost, int[] serverPorts,
int redundancy, int readTimeOut, boolean receiveInvalidates, boolean concurrencyChecks,
boolean enableSingleHop) {
createClient(client, regionName, serverHost, serverPorts, redundancy, readTimeOut,
receiveInvalidates, concurrencyChecks, enableSingleHop, true /* subscriptionEnabled */);
}
private void createClient(VM client, final String regionName, final String serverHost,
final int[] serverPorts, final int redundency, final int readTimeOut,
final boolean receiveInvalidates, final boolean concurrencyChecks,
final boolean enableSingleHop, final boolean subscriptionEnabled) {
client.invoke(new CacheSerializableRunnable("Create client") {
@Override
public void run2() throws CacheException {
// Create DS
Properties config = new Properties();
config.setProperty(MCAST_PORT, "0");
config.setProperty(LOCATORS, "");
getSystem(config);
// Create Region
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
if (concurrencyChecks) {
factory.setConcurrencyChecksEnabled(true);
}
try {
getCache();
IgnoredException
.addIgnoredException("java.net.ConnectException||java.net.SocketException");
if (readTimeOut > 0) {
PoolFactory pf = PoolManager.createFactory();
for (int i = 0; i < serverPorts.length; i++) {
pf.addServer(serverHost, serverPorts[i]);
}
pf.setReadTimeout(readTimeOut).setSubscriptionEnabled(subscriptionEnabled)
.setPRSingleHopEnabled(enableSingleHop).create("myPool");
factory.setPoolName("myPool");
} else {
ClientServerTestCase.configureConnectionPool(factory, serverHost, serverPorts, true,
redundency, -1, null);
}
Region r = createRegion(regionName, factory.create());
if (receiveInvalidates) {
r.registerInterestRegex(".*", false, false);
}
assertNotNull(getRootRegion().getSubregion(regionName));
} catch (CacheException ex) {
Assert.fail("While creating Region on Edge", ex);
}
}
});
}
private void createBridgeClient(VM client, final String regionName, final String serverHost,
final int[] serverPorts, final int redundency, final int readTimeOut,
boolean enableSingleHop) {
createClient(client, regionName, serverHost, serverPorts, redundency, readTimeOut, true, true,
enableSingleHop);
}
protected Region doPutAll(String regionName, String keyStub, int numEntries) {
Region region = getRootRegion().getSubregion(regionName);
LinkedHashMap map = new LinkedHashMap();
for (int i = 0; i < numEntries; i++) {
map.put(keyStub + i, new TestObject(i));
}
region.putAll(map, "putAllCallback");
return region;
}
protected VersionedObjectList doRemoveAll(String regionName, String keyStub, int numEntries) {
Region region = getRootRegion().getSubregion(regionName);
ArrayList<String> keys = new ArrayList<String>();
for (int i = 0; i < numEntries; i++) {
keys.add(keyStub + i);
}
// region.removeAll(keys, "removeAllCallback");
LocalRegion lr = (LocalRegion) region;
final EntryEventImpl event = EntryEventImpl.create(lr, Operation.REMOVEALL_DESTROY, null, null,
"removeAllCallback", false, lr.getMyId());
event.disallowOffHeapValues();
DistributedRemoveAllOperation removeAllOp =
new DistributedRemoveAllOperation(event, keys.size(), false);
VersionedObjectList versions = lr.basicRemoveAll((Collection) keys, removeAllOp, null);
return versions;
}
public static void waitTillNotify(Object lock_object, int waitTime, boolean ready) {
synchronized (lock_object) {
if (!ready) {
try {
long then = System.currentTimeMillis();
lock_object.wait(waitTime);
long now = System.currentTimeMillis();
if (now - then > waitTime) {
fail("Did not receive expected events");
}
} catch (InterruptedException e) {
fail("interrupted", e);
}
}
}
}
/**
* Stops the cache server specified by port
*/
public void stopOneBridgeServer(int port) {
CacheServer bridge = null;
boolean foundServer = false;
for (Iterator bsI = getCache().getCacheServers().iterator(); bsI.hasNext();) {
bridge = (CacheServer) bsI.next();
if (bridge.getPort() == port) {
bridge.stop();
assertFalse(bridge.isRunning());
foundServer = true;
break;
}
}
assertTrue(foundServer);
}
protected void closeCache(VM vm0) {
SerializableRunnable close = new SerializableRunnable() {
@Override
public void run() {
Cache cache = getCache();
cache.close();
}
};
vm0.invoke(close);
}
protected void closeCacheAsync(VM vm0) {
SerializableRunnable close = new SerializableRunnable() {
@Override
public void run() {
Cache cache = getCache();
cache.close();
}
};
vm0.invokeAsync(close);
}
protected int getRegionSize(VM vm, final String regionName) {
SerializableCallable getRegionSize = new SerializableCallable("get region size") {
@Override
public Object call() throws Exception {
Region region = getRootRegion().getSubregion(regionName);
return new Integer(region.size());
}
};
return (Integer) (vm.invoke(getRegionSize));
}
public static class TestObject implements DataSerializable {
protected String _ticker;
protected int _price;
protected long _ts = System.currentTimeMillis();
public TestObject() {}
public TestObject(int price) {
this._price = price;
this._ticker = Integer.toString(price);
}
public TestObject(String ticker, int price) {
this._ticker = ticker;
this._price = price;
}
public String getTicker() {
return this._ticker;
}
public int getPrice() {
return this._price;
}
public long getTS() {
return this._ts;
}
@Override
public void toData(DataOutput out) throws IOException {
// System.out.println("Is serializing in WAN: " +
// GatewayEventImpl.isSerializingValue());
DataSerializer.writeString(this._ticker, out);
out.writeInt(this._price);
out.writeLong(this._ts);
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
// System.out.println("Is deserializing in WAN: " +
// GatewayEventImpl.isDeserializingValue());
this._ticker = DataSerializer.readString(in);
this._price = in.readInt();
this._ts = in.readLong();
}
public boolean equals(TestObject o) {
if (this._price == o._price && this._ticker.equals(o._ticker))
return true;
else
return false;
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("Price=" + this._price);
return sb.toString();
}
}
static class EOCQEventListener implements CqListener {
Region localregion;
public int num_creates;
public int num_updates;
public EOCQEventListener(Region region) {
localregion = region;
}
@Override
public void onError(CqEvent cqEvent) {}
@Override
public void onEvent(CqEvent cqEvent) {
if (cqEvent.getQueryOperation() == Operation.DESTROY)
return;
Object key = cqEvent.getKey();
Object newValue = cqEvent.getNewValue();
if (newValue == null) {
localregion.create(key, newValue);
num_creates++;
} else {
localregion.put(key, newValue);
num_updates++;
}
LogWriterUtils.getLogWriter().info("CQListener:TestObject:" + key + ":" + newValue);
}
@Override
public void close() {}
}
static class SharedCounter implements Serializable {
public String owner;
public int num_create_event;
public int num_update_event;
public int num_invalidate_event;
public int num_destroy_event;
public SharedCounter(String owner) {
this.owner = owner;
num_create_event = 0;
num_update_event = 0;
num_invalidate_event = 0;
num_destroy_event = 0;
}
@Override
public String toString() {
String str = "Owner=" + owner + ",create=" + num_create_event + ",update=" + num_update_event
+ ",invalidate=" + num_invalidate_event + ",destroy=" + num_destroy_event;
return str;
}
}
class MyListener extends CacheListenerAdapter implements Declarable {
boolean delay = false;
public int num_testEndPointSwitch;
public int num_testHADRFailOver;
public int num_oldValueInAfterUpdate;
public SharedCounter sc;
public VM vm;
public int closeCacheAtItem = -1;
public MyListener(boolean delay) {
this.delay = delay;
this.sc = new SharedCounter("dummy");
}
public MyListener(boolean delay, SharedCounter sc) {
this.delay = delay;
this.sc = sc;
}
public MyListener(VM vm, boolean delay, SharedCounter sc, int closeCacheAtItem) {
this.vm = vm;
this.delay = delay;
this.sc = sc;
this.closeCacheAtItem = closeCacheAtItem;
}
@Override
public void init(Properties props) {}
@Override
public void afterCreate(EntryEvent event) {
sc.num_create_event++;
if (closeCacheAtItem != -1 && sc.num_create_event >= closeCacheAtItem) {
closeCacheAsync(vm);
}
LogWriterUtils.getLogWriter()
.fine("MyListener:afterCreate " + event.getKey() + ":" + event.getNewValue()
+ ":num_create_event=" + sc.num_create_event + ":eventID="
+ ((EntryEventImpl) event).getEventId());
if (event.getOperation().isPutAll()) {
assertEquals("putAllCallback", event.getCallbackArgument());
}
if (delay) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// this can happen in this test because there are asynchronous
// operations being performed during shutdown
Thread.currentThread().interrupt();
}
return;
}
if (event.getKey().toString().startsWith("testEndPointSwitch")) {
num_testEndPointSwitch++;
if (num_testEndPointSwitch == testEndPointSwitchNumber) {
LogWriterUtils.getLogWriter().info("testEndPointSwitch received expected events");
synchronized (lockObject3) {
lockObject3.notify();
}
}
}
if (event.getKey().toString().startsWith("testHADRFailOver")) {
num_testHADRFailOver++;
if (num_testHADRFailOver == thousandEntries * 2) {
LogWriterUtils.getLogWriter().info("testHADRFailOver received expected events");
synchronized (lockObject4) {
lockObject4.notify();
}
}
}
}
@Override
public void afterUpdate(EntryEvent event) {
sc.num_update_event++;
LogWriterUtils.getLogWriter()
.fine("MyListener:afterUpdate " + event.getKey() + ":" + event.getNewValue() + ":"
+ event.getOldValue() + ":num_update_event=" + sc.num_update_event + ":eventID="
+ ((EntryEventImpl) event).getEventId());
if (event.getOperation().isPutAll()) {
assertEquals("putAllCallback", event.getCallbackArgument());
}
if (delay) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// this can happen in this test because there are asynchronous
// operations being performed during shutdown
Thread.currentThread().interrupt();
}
return;
}
if (event.getKey().toString().contains("OldValue")) {
if (event.getOldValue() != null) {
num_oldValueInAfterUpdate++;
if (num_oldValueInAfterUpdate == numberOfEntries) {
LogWriterUtils.getLogWriter().info("received expected OldValue events");
synchronized (lockObject) {
lockObject.notify();
}
}
}
}
}
@Override
public void afterInvalidate(EntryEvent event) {
sc.num_invalidate_event++;
LogWriterUtils.getLogWriter().info("local invalidate is triggered for " + event.getKey()
+ ":num_invalidate_event=" + sc.num_invalidate_event);
}
@Override
public void afterDestroy(EntryEvent event) {
sc.num_destroy_event++;
if (event.getOperation().isRemoveAll()) {
assertEquals("removeAllCallback", event.getCallbackArgument());
}
LogWriterUtils.getLogWriter().info("local destroy is triggered for " + event.getKey()
+ ":num_destroy_event=" + sc.num_destroy_event);
}
}
/**
* we need cacheWriter for to slow down P2P operations, listener only works for c/s in this case
*/
static class MyWriter extends CacheWriterAdapter implements Declarable {
int exceptionAtItem = -1;
public int num_created;
public int num_destroyed;
public String keystub = null;
public MyWriter(int exceptionAtItem) {
this.exceptionAtItem = exceptionAtItem;
}
public MyWriter(String keystub) {
this.keystub = keystub;
}
@Override
public void init(Properties props) {}
@Override
public synchronized void beforeCreate(EntryEvent event) {
if (exceptionAtItem != -1 && num_created >= exceptionAtItem) {
throw new CacheWriterException(
"Triggered exception as planned, created " + num_created + " keys.");
}
LogWriterUtils.getLogWriter().info("MyWriter:beforeCreate " + event.getKey() + ":"
+ event.getNewValue() + "num_created=" + num_created);
if (event.getOperation().isPutAll()) {
assertEquals("putAllCallback", event.getCallbackArgument());
}
try {
if (keystub == null) {
Thread.sleep(50);
}
} catch (InterruptedException e) {
// this can happen in this test because there are asynchronous
// operations being performed during shutdown
Thread.currentThread().interrupt();
}
num_created++;
}
@Override
public void beforeUpdate(EntryEvent event) {
LogWriterUtils.getLogWriter()
.info("MyWriter:beforeUpdate " + event.getKey() + ":" + event.getNewValue());
if (event.getOperation().isPutAll()) {
assertEquals("putAllCallback", event.getCallbackArgument());
}
try {
if (keystub == null) {
Thread.sleep(50);
}
} catch (InterruptedException e) {
// this can happen in this test because there are asynchronous
// operations being performed during shutdown
Thread.currentThread().interrupt();
}
}
@Override
public void beforeDestroy(EntryEvent event) {
if (exceptionAtItem != -1 && num_destroyed >= exceptionAtItem) {
throw new CacheWriterException(
"Triggered exception as planned, destroyed " + num_destroyed + " keys.");
}
LogWriterUtils.getLogWriter().info(
"MyWriter:beforeDestroy " + event.getKey() + ":" + "num_destroyed=" + num_destroyed);
if (event.getOperation().isRemoveAll()) {
assertEquals("removeAllCallback", event.getCallbackArgument());
}
try {
if (keystub == null) {
Thread.sleep(50);
}
} catch (InterruptedException e) {
// this can happen in this test because there are asynchronous
// operations being performed during shutdown
Thread.currentThread().interrupt();
}
num_destroyed++;
}
}
}