blob: 1eb6d34be5c3d541c5c9f1f6460f41952c7a76df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache30;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.getInstance;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.AttributesMutator;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.ExpirationAction;
import org.apache.geode.cache.ExpirationAttributes;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.locks.DistributedLockStats;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.CachePerfStats;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.DistributedTombstoneOperation.TombstoneMessage;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.KeyInfo;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionMap;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.internal.cache.partitioned.PRTombstoneMessage;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientServerTest;
/**
* concurrency-control tests for client/server
*
*
*/
@Category({ClientServerTest.class})
public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
public static LocalRegion TestRegion;
@Before
public void setup() {
// for bug #50683 we need a short queue-removal-message processing interval
HARegionQueue.setMessageSyncInterval(5);
IgnoredException.addIgnoredException("java.net.ConnectException");
}
@Override
public final void preTearDownCacheTestCase() {
disconnectAllFromDS();
HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
TestRegion = null;
}
/**
* GEODE-3519 servers are not locking on remove or invalidate ops initiated by clients
* <p>
* This test sets up two servers each with a client attached. The clients perform operations on
* the same key in a region which, in the servers, has Scope.GLOBAL. There should be no conflation
* and each operation should obtain a lock.
*
*/
@Test
public void testClientEventsAreNotConflatedByGlobalRegionOnServer() throws Exception {
VM[] serverVMs = new VM[] {Host.getHost(0).getVM(0), Host.getHost(0).getVM(1)};
VM[] clientVMs = new VM[] {Host.getHost(0).getVM(2), Host.getHost(0).getVM(3)};
final String name = getUniqueName() + "Region";
int[] serverPorts = new int[] {createServerRegion(serverVMs[0], name, true, Scope.GLOBAL),
createServerRegion(serverVMs[1], name, true, Scope.GLOBAL)};
for (int i = 0; i < clientVMs.length; i++) {
createClientRegion(clientVMs[i], name, serverPorts[i], false,
ClientRegionShortcut.CACHING_PROXY, false);
}
getBlackboard().initBlackboard();
final int numIterations = 500;
AsyncInvocation<?>[] asyncInvocations = new AsyncInvocation[clientVMs.length];
for (int i = 0; i < clientVMs.length; i++) {
final String clientGateName = "client" + i + "Ready";
asyncInvocations[i] = clientVMs[i].invokeAsync("doOps Thread", () -> {
doOps(name, numIterations, clientGateName);
});
getBlackboard().waitForGate(clientGateName, 30, SECONDS);
}
getBlackboard().signalGate("proceed");
for (final AsyncInvocation<?> asyncInvocation : asyncInvocations) {
asyncInvocation.join();
}
for (final VM serverVM : serverVMs) {
serverVM.invoke("verify thread", () -> {
verifyServerState(name, numIterations);
});
}
}
private void verifyServerState(String name, int numIterations) {
Cache cache = getCache();
DistributedRegion region = (DistributedRegion) cache.getRegion(name);
CachePerfStats stats = region.getCachePerfStats();
assertEquals(0, stats.getConflatedEventsCount());
DLockService dLockService = (DLockService) region.getLockService();
DistributedLockStats distributedLockStats = dLockService.getStats();
assertEquals(numIterations, distributedLockStats.getLockReleasesCompleted());
}
private void doOps(String name, int numIterations, String clientGateName) {
ClientCache cache = (ClientCache) getCache();
Region region = cache.getRegion(name);
getBlackboard().signalGate(clientGateName);
try {
getBlackboard().waitForGate("proceed", 30, SECONDS);
} catch (TimeoutException | InterruptedException e) {
throw new RuntimeException("failed to start", e);
}
String key = "lockingKey";
String value = "lockingValue";
for (int j = 0; j < numIterations; j++) {
int operation = j % 5;
switch (operation) {
case 0:
region.remove(key);
break;
case 1:
region.putIfAbsent(key, value);
break;
case 2:
region.invalidate(key);
break;
case 3:
region.replace(key, value);
break;
case 4:
region.replace(key, value, value);
break;
// case 5:
// remove(k,v) can't be included in this test as it checks the old value
// against what is in the local cache before sending the operation to the server
// region.remove(key, value);
// break;
default:
throw new RuntimeException("" + j + " % 5 == " + operation + "?");
}
}
}
@Test
public void testClientDoesNotExpireEntryPrematurely() throws Exception {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final String name = getUniqueName() + "Region";
final String key = "testKey";
int port = createServerRegion(vm0, name, true);
vm0.invoke(new SerializableCallable("create old entry") {
@Override
public Object call() throws Exception {
LocalRegion r = (LocalRegion) basicGetCache().getRegion(name);
r.put(key, "value");
AbstractRegionEntry entry = (AbstractRegionEntry) r.basicGetEntry(key);
// set an old timestamp in the entry - thirty minutes ago
entry.getVersionStamp().setVersionTimeStamp(System.currentTimeMillis() - 1800000L);
return null;
}
});
createClientRegion(vm1, name, port, true, ClientRegionShortcut.CACHING_PROXY, false);
vm1.invoke(new SerializableCallable("fetch entry and validate") {
@Override
public Object call() throws Exception {
final Long[] expirationTimeMillis = new Long[1];
int expirationSeconds = 1;
LocalRegion r = (LocalRegion) basicGetCache().getRegion(name);
AttributesMutator mutator = r.getAttributesMutator();
mutator.setEntryIdleTimeout(
new ExpirationAttributes(expirationSeconds, ExpirationAction.LOCAL_DESTROY));
mutator.addCacheListener(new CacheListenerAdapter() {
@Override
public void afterDestroy(EntryEvent event) {
expirationTimeMillis[0] = System.currentTimeMillis();
}
});
final long expirationTime = System.currentTimeMillis() + (expirationSeconds * 1000);
// Set the expiration time on the client entry.
r.get(key);
await("waiting for object to expire").until(() -> {
return expirationTimeMillis[0] != null;
});
disconnectFromDS();
assertTrue(
"entry expired " + (expirationTime - expirationTimeMillis[0]) + " milliseconds early",
expirationTimeMillis[0] >= expirationTime);
return null;
}
});
vm0.invoke(new SerializableRunnable() {
@Override
public void run() {
disconnectFromDS();
}
});
}
public ClientServerCCEDUnitTest() {
super();
}
@Test
public void testClientServerRRTombstoneGC() {
clientServerTombstoneGCTest(getUniqueName(), true);
}
@Test
public void testClientServerPRTombstoneGC() {
clientServerTombstoneGCTest(getUniqueName(), false);
}
@Test
public void testPutAllInNonCCEClient() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final String name = getUniqueName() + "Region";
int port = createServerRegion(vm0, name, true);
createClientRegion(vm1, name, port, false, ClientRegionShortcut.CACHING_PROXY);
doPutAllInClient(vm1);
}
/**
* test that distributed GC messages are sent to clients and properly processed
*
* @param replicatedRegion whether to use a RR or PR in the servers
*/
private void clientServerTombstoneGCTest(String uniqueName, boolean replicatedRegion) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final String name = uniqueName + "Region";
createServerRegion(vm0, name, replicatedRegion);
int port = createServerRegion(vm1, name, replicatedRegion);
createClientRegion(vm2, name, port, true, ClientRegionShortcut.CACHING_PROXY);
createClientRegion(vm3, name, port, true, ClientRegionShortcut.CACHING_PROXY);
createEntries(vm2);
destroyEntries(vm3);
unregisterInterest(vm3);
forceGC(vm0);
if (!replicatedRegion) {
// other bucket might be in vm1
forceGC(vm1);
}
checkClientReceivedGC(vm2);
checkClientDoesNotReceiveGC(vm3);
}
@Test
public void testTombstoneMessageSentToReplicatesAreNotProcessedInLine() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final String name = "Region";
createServerRegion(vm0, name, true);
createEntries(vm0);
createServerRegion(vm1, name, true);
try {
vm1.invoke(() -> {
DistributionMessageObserver.setInstance(new PRTombstoneMessageObserver());
});
destroyEntries(vm0);
forceGC(vm0);
vm1.invoke(() -> {
PRTombstoneMessageObserver mo =
(PRTombstoneMessageObserver) DistributionMessageObserver.getInstance();
await().until(() -> {
return mo.tsMessageProcessed >= 1;
});
assertTrue("Tombstone GC message is not expected.", mo.thName.contains(
"Pooled Message Processor "));
});
} finally {
vm1.invoke(() -> {
DistributionMessageObserver.setInstance(null);
});
}
}
@Test
public void testTombstoneGcMessagesAreOnlySentToPRNodesWithInterestRegistration() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final String name = "Region";
createServerRegion(vm0, name, false);
// Create all the buckets on this vm.
createEntries(vm0);
createServerRegion(vm1, name, false);
int port = createServerRegion(vm2, name, false);
// Create client and register interest on one server.
createClientRegion(vm3, name, port, true, ClientRegionShortcut.CACHING_PROXY);
try {
vm1.invoke(() -> {
DistributionMessageObserver.setInstance(new PRTombstoneMessageObserver());
});
vm2.invoke(() -> {
DistributionMessageObserver.setInstance(new PRTombstoneMessageObserver());
});
destroyEntries(vm0);
forceGC(vm0);
// vm2 should receive tombstone GC messages
vm2.invoke(() -> {
PRTombstoneMessageObserver mo =
(PRTombstoneMessageObserver) DistributionMessageObserver.getInstance();
// Should receive tombstone message for each bucket.
await().until(() -> {
return mo.prTsMessageProcessed >= 2;
});
assertEquals("Tombstone GC message is expected.", 2, mo.prTsMessageProcessed);
});
// Since there is no interest registered, vm1 should not receive any tombstone GC messages
vm1.invoke(() -> {
PRTombstoneMessageObserver mo =
(PRTombstoneMessageObserver) DistributionMessageObserver.getInstance();
assertEquals("Tombstone GC message is not expected.", 0, mo.prTsMessageProcessed);
});
} finally {
vm1.invoke(() -> {
DistributionMessageObserver.setInstance(null);
});
vm2.invoke(() -> {
DistributionMessageObserver.setInstance(null);
});
}
}
private class PRTombstoneMessageObserver extends DistributionMessageObserver {
public int tsMessageProcessed = 0;
public int prTsMessageProcessed = 0;
public String thName;
@Override
public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
thName = Thread.currentThread().getName();
if (message instanceof TombstoneMessage) {
tsMessageProcessed++;
}
if (message instanceof PRTombstoneMessage) {
prTsMessageProcessed++;
}
}
}
/**
* for bug #40791 we pull tombstones into clients on get(), getAll() and registerInterest() to
* protect the client cache from stray putAll events sitting in backup queues on the server
*/
@Test
public void testClientRIGetsTombstonesRR() throws Exception {
clientRIGetsTombstoneTest(getUniqueName(), true);
}
@Test
public void testClientRIGetsTombstonesPR() throws Exception {
clientRIGetsTombstoneTest(getUniqueName(), false);
}
/**
* test that clients receive tombstones in register-interest results
*/
private void clientRIGetsTombstoneTest(String uniqueName, boolean replicatedRegion) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final String name = uniqueName + "Region";
createServerRegion(vm0, name, replicatedRegion);
int port = createServerRegion(vm1, name, replicatedRegion);
createEntries(vm0);
destroyEntries(vm0);
LogWriterUtils.getLogWriter().info("***************** register interest on all keys");
createClientRegion(vm2, name, port, true, ClientRegionShortcut.CACHING_PROXY);
registerInterest(vm2);
ensureAllTombstonesPresent(vm2);
LogWriterUtils.getLogWriter()
.info("***************** clear cache and register interest on one key, Object0");
clearLocalCache(vm2);
registerInterestOneKey(vm2, "Object0");
List<String> keys = new ArrayList(1);
keys.add("Object0");
ensureAllTombstonesPresent(vm2, keys);
LogWriterUtils.getLogWriter()
.info("***************** clear cache and register interest on four keys");
clearLocalCache(vm2);
keys = new ArrayList(4);
for (int i = 0; i < 4; i++) {
keys.add("Object" + i);
}
registerInterest(vm2, keys);
ensureAllTombstonesPresent(vm2, keys);
LogWriterUtils.getLogWriter()
.info("***************** clear cache and register interest with regex on four keys");
clearLocalCache(vm2);
registerInterestRegex(vm2, "Object[0-3]");
ensureAllTombstonesPresent(vm2, keys);
LogWriterUtils.getLogWriter().info("***************** fetch entries with getAll()");
clearLocalCache(vm2);
getAll(vm2);
ensureAllTombstonesPresent(vm2);
}
@Test
public void testClientRIGetsInvalidEntriesRR() throws Exception {
clientRIGetsInvalidEntriesTest(getUniqueName(), true);
}
@Test
public void testClientRIGetsInvalidEntriesPR() throws Exception {
clientRIGetsInvalidEntriesTest(getUniqueName(), false);
}
private void clientRIGetsInvalidEntriesTest(String uniqueName, boolean replicatedRegion) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final String name = uniqueName + "Region";
createServerRegion(vm0, name, replicatedRegion);
int port = createServerRegion(vm1, name, replicatedRegion);
createEntries(vm0);
invalidateEntries(vm0);
LogWriterUtils.getLogWriter().info("***************** register interest on all keys");
createClientRegion(vm2, name, port, true, ClientRegionShortcut.CACHING_PROXY);
registerInterest(vm2);
ensureAllInvalidsPresent(vm2);
LogWriterUtils.getLogWriter()
.info("***************** clear cache and register interest on one key, Object0");
clearLocalCache(vm2);
registerInterestOneKey(vm2, "Object0");
List<String> keys = new ArrayList(1);
keys.add("Object0");
ensureAllInvalidsPresent(vm2, keys);
LogWriterUtils.getLogWriter()
.info("***************** clear cache and register interest on four keys");
clearLocalCache(vm2);
keys = new ArrayList(4);
for (int i = 0; i < 4; i++) {
keys.add("Object" + i);
}
registerInterest(vm2, keys);
ensureAllInvalidsPresent(vm2, keys);
LogWriterUtils.getLogWriter()
.info("***************** clear cache and register interest with regex on four keys");
clearLocalCache(vm2);
registerInterestRegex(vm2, "Object[0-3]");
ensureAllInvalidsPresent(vm2, keys);
LogWriterUtils.getLogWriter().info("***************** fetch entries with getAll()");
clearLocalCache(vm2);
getAll(vm2);
ensureAllInvalidsPresent(vm2);
}
@Test
public void testClientInvalidateAfterDestroyLeavesInvalidEntryRR() throws Exception {
clientInvalidateAfterDestroyLeavesInvalidEntryTest(getUniqueName(), true);
}
@Test
public void testClientInvalidateAfterDestroyLeavesInvalidEntryPR() throws Exception {
clientInvalidateAfterDestroyLeavesInvalidEntryTest(getUniqueName(), false);
}
private void clientInvalidateAfterDestroyLeavesInvalidEntryTest(String uniqueName,
boolean useReplicateRegion) {
Host host = Host.getHost(0);
VM serverVM = host.getVM(0);
VM clientVM = host.getVM(1);
final String name = uniqueName + "Region";
int port = createServerRegion(serverVM, name, useReplicateRegion);
createClientRegion(clientVM, name, port, true, ClientRegionShortcut.CACHING_PROXY, false);
final String key = "Object0";
// use the client cache to create and destroy an entry
clientVM.invoke(() -> {
TestRegion.put(key, "some value"); // v1
TestRegion.destroy(key); // v2
RegionMap map = TestRegion.getRegionMap();
AbstractRegionEntry regionEntry = (AbstractRegionEntry) map.getEntry(key);
assertEquals(Token.TOMBSTONE, regionEntry.getValueAsToken());
});
// use the server cache to recreate the entry, but don't let the client cache know about it
serverVM.invoke(() -> {
TestRegion.put(key, "new value"); // v3 - not known by client cache
});
// now invalidate the entry in the client cache and show that it holds an INVALID entry
clientVM.invoke(() -> {
RegionMap map = TestRegion.getRegionMap();
AbstractRegionEntry regionEntry = (AbstractRegionEntry) map.getEntry(key);
EntryEventImpl invalidateEvent = new EntryEventImpl();
invalidateEvent.setRegion(TestRegion);
invalidateEvent.setKeyInfo(new KeyInfo(key, Token.INVALID, null));
invalidateEvent.setOperation(Operation.INVALIDATE);
invalidateEvent.setEventId(new EventID(TestRegion.getCache().getDistributedSystem()));
// invoke invalidate() with forceNewEntry=true to have it create an INVALID entry
map.invalidate(invalidateEvent, true, true, false);
assertEquals(Token.INVALID, regionEntry.getValueAsToken());
System.out.println("entry=" + regionEntry);
assertEquals(4, regionEntry.getVersionStamp().getEntryVersion());
});
serverVM.invoke(() -> {
assertTrue(TestRegion.containsKey(key));
assertNull(TestRegion.get(key));
});
}
@Test
public void testClientCacheListenerDoesNotSeeTombstones() throws Exception {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final String name = getUniqueName() + "Region";
createServerRegion(vm0, name, true);
int port = createServerRegion(vm1, name, true);
createEntries(vm0);
destroyEntries(vm0);
LogWriterUtils.getLogWriter().info("***************** register interest on all keys");
createClientRegion(vm2, name, port, true, ClientRegionShortcut.PROXY);
vm2.invoke(
() -> TestRegion.getAttributesMutator().addCacheListener(new RecordingCacheListener()));
getAll(vm2);
vm2.invoke(() -> {
RecordingCacheListener listener = (RecordingCacheListener) TestRegion.getCacheListener();
assertEquals(Collections.emptyList(), listener.events);
});
}
private void registerInterest(VM vm) {
vm.invoke(new SerializableRunnable("register interest in all keys") {
@Override
public void run() {
TestRegion.registerInterestRegex(".*");
}
});
}
private void unregisterInterest(VM vm) {
vm.invoke(new SerializableRunnable("unregister interest in all keys") {
@Override
public void run() {
TestRegion.unregisterInterestRegex(".*");
}
});
}
private void registerInterest(VM vm, final List keys) {
vm.invoke(new SerializableRunnable("register interest in key list") {
@Override
public void run() {
TestRegion.registerInterest(keys);
}
});
}
private void registerInterestOneKey(VM vm, final String key) {
vm.invoke(new SerializableRunnable("register interest in " + key) {
@Override
public void run() {
TestRegion.registerInterest(key);
}
});
}
private void registerInterestRegex(VM vm, final String pattern) {
vm.invoke(new SerializableRunnable("register interest in key list") {
@Override
public void run() {
TestRegion.registerInterestRegex(pattern);
}
});
}
private void ensureAllTombstonesPresent(VM vm) {
vm.invoke(new SerializableCallable("check all are tombstones") {
@Override
public Object call() {
for (int i = 0; i < 10; i++) {
assertTrue("expected a tombstone for Object" + i,
TestRegion.containsTombstone("Object" + i));
}
return null;
}
});
}
private void ensureAllTombstonesPresent(VM vm, final List keys) {
vm.invoke(new SerializableCallable("check tombstones in list") {
@Override
public Object call() {
for (Object key : keys) {
assertTrue("expected to find a tombstone for " + key, TestRegion.containsTombstone(key));
}
return null;
}
});
}
private void ensureAllInvalidsPresent(VM vm) {
vm.invoke(new SerializableCallable("check all are tombstones") {
@Override
public Object call() {
for (int i = 0; i < 10; i++) {
assertTrue("expected to find an entry for Object" + i,
TestRegion.containsKey("Object" + i));
assertTrue("expected to find entry invalid for Object" + i,
!TestRegion.containsValue("Object" + i));
}
return null;
}
});
}
private void ensureAllInvalidsPresent(VM vm, final List keys) {
vm.invoke(new SerializableCallable("check tombstones in list") {
@Override
public Object call() {
for (Object key : keys) {
assertTrue("expected to find an entry for " + key, TestRegion.containsKey(key));
assertTrue("expected to find entry invalid for " + key, !TestRegion.containsValue(key));
}
return null;
}
});
}
/* do a getAll of all keys */
private void getAll(VM vm) {
vm.invoke(new SerializableRunnable("getAll for all keys") {
@Override
public void run() {
Set<String> keys = new HashSet();
for (int i = 0; i < 10; i++) {
keys.add("Object" + i);
}
Map result = TestRegion.getAll(keys);
for (int i = 0; i < 10; i++) {
assertNull("expected no result for Object" + i, result.get("Object" + i));
}
}
});
}
/* this should remove all entries from the region, including tombstones */
private void clearLocalCache(VM vm) {
vm.invoke(new SerializableRunnable("clear local cache") {
@Override
public void run() {
TestRegion.localClear();
}
});
}
// private void closeCache(VM vm) {
@Test
public void testClientServerRRQueueCleanup() { // see bug #50879 if this fails
clientServerTombstoneMessageTest(true);
}
@Test
public void testClientServerPRQueueCleanup() { // see bug #50879 if this fails
clientServerTombstoneMessageTest(false);
}
/**
* test that distributed GC messages are properly cleaned out of durable client HA queues
*/
private void clientServerTombstoneMessageTest(boolean replicatedRegion) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final String name = getUniqueName() + "Region";
int port1 = createServerRegion(vm0, name, replicatedRegion);
int port2 = createServerRegion(vm1, name, replicatedRegion);
createDurableClientRegion(vm2, name, port1, port2, true);
createDurableClientRegion(vm3, name, port1, port2, true);
createEntries(vm2);
destroyEntries(vm3);
forceGC(vm0);
if (!replicatedRegion) {
// other bucket might be in vm1
forceGC(vm1);
}
Wait.pause(5000); // better chance that WaitCriteria will succeed 1st time if we pause a bit
checkClientReceivedGC(vm2);
checkClientReceivedGC(vm3);
checkServerQueuesEmpty(vm0);
checkServerQueuesEmpty(vm1);
}
// private void closeCache(VM vm) {
// vm.invoke(new SerializableCallable() {
// public Object call() throws Exception {
// closeCache();
// return null;
// }
// });
// }
private void createEntries(VM vm) {
vm.invoke(new SerializableCallable("create entries") {
@Override
public Object call() {
for (int i = 0; i < 10; i++) {
TestRegion.create("Object" + i, i);
}
return null;
}
});
}
private void destroyEntries(VM vm) {
vm.invoke(new SerializableCallable("destroy entries") {
@Override
public Object call() {
for (int i = 0; i < 10; i++) {
TestRegion.destroy("Object" + i, i);
}
assertEquals(0, TestRegion.size());
if (TestRegion.getDataPolicy().isReplicate()) {
assertEquals(10, TestRegion.getTombstoneCount());
}
return null;
}
});
}
private void doPutAllInClient(VM vm) {
vm.invoke(new SerializableRunnable("do putAll") {
@Override
public void run() {
Map map = new HashMap();
for (int i = 1000; i < 1100; i++) {
map.put("object_" + i, i);
}
try {
TestRegion.putAll(map);
for (int i = 1000; i < 1100; i++) {
assertTrue("expected key object_" + i + " to be in the cache but it isn't",
TestRegion.containsKey("object_" + i));
}
} catch (NullPointerException e) {
Assert.fail("caught NPE", e);
}
}
});
}
private void invalidateEntries(VM vm) {
vm.invoke(new SerializableCallable("invalidate entries") {
@Override
public Object call() {
for (int i = 0; i < 10; i++) {
TestRegion.invalidate("Object" + i, i);
}
assertEquals(10, TestRegion.size());
return null;
}
});
}
private void forceGC(VM vm) {
vm.invoke(new SerializableCallable("force GC") {
@Override
public Object call() throws Exception {
TestRegion.getCache().getTombstoneService().forceBatchExpirationForTests(10);
return null;
}
});
}
private void checkClientReceivedGC(VM vm) {
vm.invoke(new SerializableCallable("check that GC happened") {
@Override
public Object call() throws Exception {
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
getLogWriter()
.info("tombstone count = " + TestRegion.getTombstoneCount());
getLogWriter().info("region size = " + TestRegion.size());
return TestRegion.getTombstoneCount() == 0 && TestRegion.size() == 0;
}
@Override
public String description() {
return "waiting for garbage collection to occur";
}
};
GeodeAwaitility.await().untilAsserted(wc);
return null;
}
});
}
private void checkServerQueuesEmpty(VM vm) {
vm.invoke(new SerializableCallable(
"check that client queues are properly cleared of old ClientTombstone messages") {
@Override
public Object call() throws Exception {
WaitCriterion wc = new WaitCriterion() {
// boolean firstTime = true;
@Override
public boolean done() {
CacheClientNotifier singleton = getInstance();
Collection<CacheClientProxy> proxies = singleton.getClientProxies();
// boolean first = firstTime;
// firstTime = false;
for (CacheClientProxy proxy : proxies) {
if (!proxy.isPrimary()) { // bug #50683 only applies to backup queues
int size = proxy.getQueueSize();
if (size > 0) {
// if (first) {
// ((LocalRegion)proxy.getHARegion()).dumpBackingMap();
// }
getLogWriter()
.info("queue size (" + size + ") is still > 0 for " + proxy.getProxyID());
return false;
}
}
}
// also ensure that server regions have been cleaned up
int regionEntryCount = TestRegion.getRegionMap().size();
if (regionEntryCount > 0) {
getLogWriter()
.info("TestRegion has unexpected entries - all should have been GC'd but we have "
+ regionEntryCount);
TestRegion.dumpBackingMap();
return false;
}
return true;
}
@Override
public String description() {
return "waiting for queue removal messages to clear client queues";
}
};
GeodeAwaitility.await().untilAsserted(wc);
return null;
}
});
}
private void checkClientDoesNotReceiveGC(VM vm) {
vm.invoke(new SerializableCallable("check that GC did not happen") {
@Override
public Object call() throws Exception {
if (TestRegion.getTombstoneCount() == 0) {
LogWriterUtils.getLogWriter().warning("region has no tombstones");
// TestRegion.dumpBackingMap();
throw new AssertionError("expected to find tombstones but region is empty");
}
return null;
}
});
}
private int createServerRegion(VM vm, final String regionName, final boolean replicatedRegion) {
return createServerRegion(vm, regionName, replicatedRegion, Scope.DISTRIBUTED_ACK);
}
private int createServerRegion(VM vm, final String regionName, final boolean replicatedRegion,
Scope regionScope) {
SerializableCallable createRegion = new SerializableCallable() {
@Override
public Object call() throws Exception {
// TombstoneService.VERBOSE = true;
AttributesFactory af = new AttributesFactory();
if (replicatedRegion) {
af.setScope(regionScope);
af.setDataPolicy(DataPolicy.REPLICATE);
} else {
af.setDataPolicy(DataPolicy.PARTITION);
af.setPartitionAttributes(
(new PartitionAttributesFactory()).setTotalNumBuckets(2).create());
}
af.setConcurrencyChecksEnabled(true);
TestRegion = (LocalRegion) createRootRegion(regionName, af.create());
CacheServer server = getCache().addCacheServer();
int port = AvailablePortHelper.getRandomAvailableTCPPort();
server.setPort(port);
server.start();
return port;
}
};
return (Integer) vm.invoke(createRegion);
}
private void createClientRegion(final VM vm, final String regionName, final int port,
final boolean ccEnabled, final ClientRegionShortcut clientRegionShortcut) {
createClientRegion(vm, regionName, port, ccEnabled, clientRegionShortcut, true);
}
private void createClientRegion(final VM vm, final String regionName, final int port,
final boolean ccEnabled, final ClientRegionShortcut clientRegionShortcut,
final boolean registerInterest) {
SerializableCallable createRegion = new SerializableCallable() {
@Override
public Object call() throws Exception {
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port);
if (registerInterest) {
cf.setPoolSubscriptionEnabled(true);
}
cf.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
ClientCache cache = getClientCache(cf);
ClientRegionFactory crf = cache.createClientRegionFactory(clientRegionShortcut);
crf.setConcurrencyChecksEnabled(ccEnabled);
crf.setStatisticsEnabled(true);
TestRegion = (LocalRegion) crf.create(regionName);
if (registerInterest) {
TestRegion.registerInterestRegex(".*", InterestResultPolicy.KEYS_VALUES, false, true);
}
return null;
}
};
vm.invoke(createRegion);
}
// For durable client QRM testing we need a backup queue (redundancy=1) and
// durable attributes. We also need to invoke readyForEvents()
private void createDurableClientRegion(final VM vm, final String regionName, final int port1,
final int port2, final boolean ccEnabled) {
SerializableCallable createRegion = new SerializableCallable() {
@Override
public Object call() throws Exception {
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port1);
cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port2);
cf.setPoolSubscriptionEnabled(true);
cf.setPoolSubscriptionRedundancy(1);
// bug #50683 - secondary durable queue retains all GC messages
cf.set(DURABLE_CLIENT_ID, "" + vm.getId());
cf.set(DURABLE_CLIENT_TIMEOUT, "" + 200);
cf.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
ClientCache cache = getClientCache(cf);
ClientRegionFactory crf =
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
crf.setConcurrencyChecksEnabled(ccEnabled);
TestRegion = (LocalRegion) crf.create(regionName);
TestRegion.registerInterestRegex(".*", InterestResultPolicy.KEYS_VALUES, true, true);
cache.readyForEvents();
return null;
}
};
vm.invoke(createRegion);
}
private static class RecordingCacheListener extends CacheListenerAdapter {
List<EntryEvent> events = new ArrayList<>();
@Override
public void afterCreate(final EntryEvent event) {
events.add(event);
}
@Override
public void afterDestroy(final EntryEvent event) {
events.add(event);
}
@Override
public void afterInvalidate(final EntryEvent event) {
events.add(event);
}
@Override
public void afterUpdate(final EntryEvent event) {
events.add(event);
}
}
}