blob: 36d35ec10689e4d16f258f53d3ae5f775ea9407a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.DELTA_PROPAGATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheLoaderException;
import org.apache.geode.cache.ClientSession;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.InterestRegistrationEvent;
import org.apache.geode.cache.InterestRegistrationListener;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.cache.NoSubscriptionServersAvailableException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* Test Scenario :
*
* Test 1: testInterestListRegistration()
*
* one server two clients create Entries in all the vms c1 : register (k1) c2 : register (k2) c1 :
* put (k1 -> vm1-k1) AND (k2 -> vm1-k2) c2 : validate (k1 == k1) AND (k2 == vm1-k2) // as k1 is not
* registered c2 : put (k1 -> vm2-k1) c1 : validate (k1 == vm2-k1) AND (k2 == vm1-k2)// as k2 is not
* registered c1 : unregister(k1) c2 : unregister(k2) c1 : put (k1 -> vm1-k1-again) AND (k2 ->
* vm1-k2-again) c2 : validate (k1 == vm2-k1) AND (k2 == vm2-k2) // as both are not registered c2 :
* put (k1 -> vm2-k1-again) AND (k2 -> vm2-k2-again) c1 : validate (k1 == vm1-k1-again) AND (k2 ==
* vm1-k2-again)// as both are not registered
*
* Test2: testInterestListRegistration_ALL_KEYS
*
* one server two clients create Entries in all the vms register ALL_KEYS and verifies that updates
* are receving to all the keys
*
*
* Test3: testInitializationOfRegionFromInterestList
*
* one server two clients create Entries in all the vms server directly puts some values then both
* client connects to the server c1 register(k1,k2,k3) and c2 register (k4,k5) then verify that
* updates has occurred as a result of interest registration.
*/
@Category({ClientSubscriptionTest.class})
public class InterestListDUnitTest extends JUnit4DistributedTestCase {
private static final String REGION_NAME = "InterestListDUnitTest_region";
// using a Integer instead of String to make sure ALL_KEYS works on non-String keys
private static final Integer key1 = new Integer(1);
private static final Integer key2 = new Integer(2);
private static final String key1_originalValue = "key-1-orig-value";
private static final String key2_originalValue = "key-2-orig-value";
private static Cache cache = null;
/** some tests use this to hold the server for invoke() access */
private static CacheServer server;
/** interestListener listens in cache server vms */
private static InterestListener interestListener;
private VM vm0 = null;
private VM vm1 = null;
private VM vm2 = null;
/** the server cache's port number */
private int PORT1;
@Override
public final void postSetUp() throws Exception {
disconnectAllFromDS();
final Host host = Host.getHost(0);
vm0 = host.getVM(0);
vm1 = host.getVM(1);
vm2 = host.getVM(2);
// start servers first
PORT1 = vm0.invoke(() -> InterestListDUnitTest.createServerCache());
}
@Override
public final void preTearDown() throws Exception {
// close the clients first
vm1.invoke(() -> InterestListDUnitTest.closeCache());
vm2.invoke(() -> InterestListDUnitTest.closeCache());
// then close the servers
vm0.invoke(() -> InterestListDUnitTest.closeCache());
cache = null;
server = null;
interestListener = null;
Invoke.invokeInEveryVM(new SerializableRunnable() {
@Override
public void run() {
cache = null;
server = null;
interestListener = null;
}
});
}
/**
* one server two clients create Entries in all the vms c1 : register (k1) c2 : register (k2) c1 :
* put (k1 -> vm1-k1) AND (k2 -> vm1-k2) c2 : validate (k1 == k1) AND (k2 == vm1-k2) // as k1 is
* not registered c2 : put (k1 -> vm2-k1) c1 : validate (k1 == vm2-k1) AND (k2 == vm1-k2)// as k2
* is not registered c1 : unregister(k1) c2 : unregister(k2) c1 : put (k1 -> vm1-k1-again) AND (k2
* -> vm1-k2-again) c2 : validate (k1 == vm2-k1) AND (k2 == vm2-k2) // as both are not registered
* c2 : put (k1 -> vm2-k1-again) AND (k2 -> vm2-k2-again) c1 : validate (k1 == vm1-k1-again) AND
* (k2 == vm1-k2-again)// as both are not registered
*/
@Test
public void testInterestListRegistration() throws Exception {
vm1.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), new Integer(PORT1)));
vm2.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), new Integer(PORT1)));
vm1.invoke(() -> InterestListDUnitTest.createEntriesK1andK2());
vm2.invoke(() -> InterestListDUnitTest.createEntriesK1andK2());
vm1.invoke(() -> InterestListDUnitTest.registerKey(key1));
vm2.invoke(() -> InterestListDUnitTest.registerKey(key2));
vm1.invoke(() -> InterestListDUnitTest.put("vm1"));
Wait.pause(10000);
vm2.invoke(() -> InterestListDUnitTest.validateEntriesK1andK2("vm2"));
vm2.invoke(() -> InterestListDUnitTest.put("vm2"));
Wait.pause(10000);
vm1.invoke(() -> InterestListDUnitTest.validateEntriesK1andK2("vm1"));
vm1.invoke(() -> InterestListDUnitTest.unregisterKey(key1));
vm2.invoke(() -> InterestListDUnitTest.unregisterKey(key2));
vm1.invoke(() -> InterestListDUnitTest.putAgain("vm1"));
Wait.pause(10000);
vm2.invoke(() -> InterestListDUnitTest.validateEntriesAgain("vm2"));
vm2.invoke(() -> InterestListDUnitTest.putAgain("vm2"));
Wait.pause(10000);
vm1.invoke(() -> InterestListDUnitTest.validateEntriesAgain("vm1"));
}
/**
* one server two clients create Entries in all the vms
*
* STEP 1: c2: put (k2 -> vm-k2) c1: validate k2 == k2 (not updated because no interest)
*
* STEP 2 c1: register k2 c1 : validate k2 == vm-k2 (updated because of registerInterest) c1:
* validate k1 == k1 (other key not updated because still no interest)
*
* STEP 3: c1: put (k1 -> vm-k1) c2: validate k1 == k1 (not updated because no interest) c2:
* register k1 c2: validate k1 == vm-k1 (updated because of registerInterest)
*
* STEP 4: c2: unregister k1 c1: put k1->k1 (old value) c2: validate k1 == vm-k1 (no interest, so
* missing update)
*/
@Test
public void testValueRefresh() throws Exception {
// Initialization
Host host = Host.getHost(0);
vm1.invoke(() -> InterestListDUnitTest.createClientCache(NetworkUtils.getServerHostName(host),
new Integer(PORT1)));
vm2.invoke(() -> InterestListDUnitTest.createClientCache(NetworkUtils.getServerHostName(host),
new Integer(PORT1)));
vm1.invoke(() -> InterestListDUnitTest.createEntriesK1andK2());
vm2.invoke(() -> InterestListDUnitTest.createEntriesK1andK2());
// STEP 1
vm2.invoke(() -> InterestListDUnitTest.putSingleEntry(key2, "vm2"));
Wait.pause(2000);
vm1.invoke(() -> InterestListDUnitTest.validateSingleEntry(key2, key2_originalValue));
// STEP 2
// Force key2 to synchronize with server cache
vm1.invoke(() -> InterestListDUnitTest.registerKey(key2));
// Verify that new value is present
vm1.invoke(() -> InterestListDUnitTest.validateSingleEntry(key2, "vm2")); // value now magically
// changed
// but the other key should not have changed
vm1.invoke(() -> InterestListDUnitTest.validateSingleEntry(key1, key1_originalValue)); // still
// unchanged
// STEP 3
vm1.invoke(() -> InterestListDUnitTest.putSingleEntry(key1, "vm1"));
Wait.pause(2000);
vm2.invoke(() -> InterestListDUnitTest.validateSingleEntry(key1, key1_originalValue)); // still
// unchanged
vm2.invoke(() -> InterestListDUnitTest.registerKey(key1));
// Verify that new value is present
vm2.invoke(() -> InterestListDUnitTest.validateSingleEntry(key1, "vm1")); // value now magically
// changed
// STEP 4
// unregister on one key
vm2.invoke(() -> InterestListDUnitTest.unregisterKey(key1));
vm1.invoke(() -> InterestListDUnitTest.putSingleEntry(key1, key1_originalValue));
Wait.pause(2000);
vm2.invoke(() -> InterestListDUnitTest.validateSingleEntry(key1, "vm1")); // update lost
}
/**
* one server two clients create Entries in all the vms register ALL_KEYS and verifies that
* updates are receiving to all the keys
*/
@Test
public void testInterestListRegistration_ALL_KEYS() throws Exception {
vm1.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), new Integer(PORT1)));
vm2.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), new Integer(PORT1)));
vm1.invoke(() -> InterestListDUnitTest.createEntriesK1andK2());
vm2.invoke(() -> InterestListDUnitTest.createEntriesK1andK2());
vm2.invoke(() -> InterestListDUnitTest.registerALL_KEYS());
vm1.invoke(() -> InterestListDUnitTest.put_ALL_KEYS());
Wait.pause(10000);
vm2.invoke(() -> InterestListDUnitTest.validate_ALL_KEYS());
}
/**
* one server two clients create Entries in all the vms server directly puts some values then both
* clients connect to the server c1 register(k1,k2,k3) and c2 register (k4,k5) then verify that
* updates has occurred as a result of interest registration.
*/
@Test
public void testInitializationOfRegionFromInterestList() throws Exception {
// directly put on server
vm0.invoke(() -> InterestListDUnitTest.multiple_put());
Wait.pause(1000);
// create clients to connect to that server
vm1.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), new Integer(PORT1)));
vm2.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), new Integer(PORT1)));
// register interest
vm1.invoke(() -> InterestListDUnitTest.registerKeys());
vm2.invoke(() -> InterestListDUnitTest.registerKeysAgain());
Wait.pause(10000);
// verify the values for registered keys
vm1.invoke(() -> InterestListDUnitTest.validateRegionEntriesFromInterestListInVm1());
vm2.invoke(() -> InterestListDUnitTest.validateRegionEntriesFromInterestListInVm2());
}
/**
* one server two clients create Entries in all the vms s1 : register (k1) for c1 s1 : register
* (k2) for c2 c1 : put (k1 -> vm1-k1) AND (k2 -> vm1-k2) c2 : validate (k1 == k1) AND (k2 ==
* vm1-k2) // as k1 is not registered c2 : put (k1 -> vm2-k1) c1 : validate (k1 == vm2-k1) AND (k2
* == vm1-k2)// as k2 is not registered s1 : unregister(k1) for c1 s1 : unregister(k2) for c2 c1 :
* put (k1 -> vm1-k1-again) AND (k2 -> vm1-k2-again) c2 : validate (k1 == vm2-k1) AND (k2 ==
* vm2-k2) // as both are not registered c2 : put (k1 -> vm2-k1-again) AND (k2 -> vm2-k2-again) c1
* : validate (k1 == vm1-k1-again) AND (k2 == vm1-k2-again)// as both are not registered
*/
@Test
public void testInterestListRegistrationOnServer() throws Exception {
DistributedMember c1 = (DistributedMember) vm1.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), PORT1));
DistributedMember c2 = (DistributedMember) vm2.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), PORT1));
vm1.invoke(() -> InterestListDUnitTest.createEntriesK1andK2());
vm2.invoke(() -> InterestListDUnitTest.createEntriesK1andK2());
vm0.invoke(() -> InterestListDUnitTest.registerKeyForClient(c1, key1));
vm0.invoke(() -> InterestListDUnitTest.registerKeyForClient(c2, key2));
vm0.invoke(() -> InterestListDUnitTest.flushQueues());
vm1.invoke(() -> InterestListDUnitTest.put("vm1"));
vm0.invoke(() -> InterestListDUnitTest.flushQueues());
vm2.invoke(() -> InterestListDUnitTest.validateEntriesK1andK2("vm2"));
vm2.invoke(() -> InterestListDUnitTest.put("vm2"));
vm0.invoke(() -> InterestListDUnitTest.flushQueues());
vm1.invoke(() -> InterestListDUnitTest.validateEntriesK1andK2("vm1"));
vm0.invoke(() -> InterestListDUnitTest.unregisterKeyForClient(c1, key1));
vm0.invoke(() -> InterestListDUnitTest.unregisterKeyForClient(c2, key2));
vm1.invoke(() -> InterestListDUnitTest.putAgain("vm1"));
vm0.invoke(() -> InterestListDUnitTest.flushQueues());
vm2.invoke(() -> InterestListDUnitTest.validateEntriesAgain("vm2"));
vm2.invoke(() -> InterestListDUnitTest.putAgain("vm2"));
vm0.invoke(() -> InterestListDUnitTest.flushQueues());
vm1.invoke(() -> InterestListDUnitTest.validateEntriesAgain("vm1"));
}
/**
* two servers one client create Entries in all the vms register interest in various ways and
* ensure that registration listeners are properly invoked
*/
@Test
public void testInterestRegistrationListeners() throws Exception {
int port2;
createCache();
server = addCacheServer();
port2 = server.getPort();
addRegisterInterestListener();
vm0.invoke(() -> InterestListDUnitTest.addRegisterInterestListener());
// servers are set up, now do the clients
DistributedMember c1 = (DistributedMember) vm1.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), PORT1, port2));
DistributedMember c2 = (DistributedMember) vm2.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), PORT1, port2));
vm1.invoke(() -> InterestListDUnitTest.createEntriesK1andK2());
vm2.invoke(() -> InterestListDUnitTest.createEntriesK1andK2());
// interest registration from clients should cause listeners to be invoked
// in both servers
LogWriterUtils.getLogWriter().info("test phase 1");
vm1.invoke(() -> InterestListDUnitTest.registerKey(key1));
vm2.invoke(() -> InterestListDUnitTest.registerKey(key2));
Integer zero = new Integer(0);
Integer two = new Integer(2);
interestListener.verifyCountsAndClear(2, 0);
vm0.invoke(() -> InterestListDUnitTest.verifyCountsAndClear(two, zero));
// unregistration from clients should invoke listeners on both servers
LogWriterUtils.getLogWriter().info("test phase 2");
vm1.invoke(() -> InterestListDUnitTest.unregisterKey(key1));
vm2.invoke(() -> InterestListDUnitTest.unregisterKey(key2));
interestListener.verifyCountsAndClear(0, 2);
vm0.invoke(() -> InterestListDUnitTest.verifyCountsAndClear(zero, two));
// now the primary server for eache client will register and unregister
LogWriterUtils.getLogWriter().info("test phase 3");
registerKeyForClient(c1, key1);
vm0.invoke(() -> InterestListDUnitTest.registerKeyForClient(c1, key1));
registerKeyForClient(c2, key2);
vm0.invoke(() -> InterestListDUnitTest.registerKeyForClient(c2, key2));
interestListener.verifyCountsAndClear(2, 0);
vm0.invoke(() -> InterestListDUnitTest.verifyCountsAndClear(two, zero));
LogWriterUtils.getLogWriter().info("test phase 4");
unregisterKeyForClient(c1, key1);
vm0.invoke(() -> InterestListDUnitTest.unregisterKeyForClient(c1, key1));
unregisterKeyForClient(c2, key2);
vm0.invoke(() -> InterestListDUnitTest.unregisterKeyForClient(c2, key2));
interestListener.verifyCountsAndClear(0, 2);
vm0.invoke(() -> InterestListDUnitTest.verifyCountsAndClear(zero, two));
}
/**
* This tests whether an exception is thrown in register/unregister when no server is available.
*/
@Test
public void testNoAvailableServer() throws Exception {
// Register interest in key1.
vm1.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), new Integer(PORT1)));
vm1.invoke(() -> InterestListDUnitTest.registerKey(key1));
// Stop the server.
vm0.invoke(() -> InterestListDUnitTest.closeCache());
// Try to unregister interest in key1 -- should not throw an exception.
vm1.invoke(() -> InterestListDUnitTest.unregisterKeyEx(key1));
// Now try registration of interest in key2 -- should throw an exception.
vm1.invoke(() -> InterestListDUnitTest.registerKeyEx(key2));
}
@Test
public void testRegisterInterestOnReplicatedRegionWithCacheLoader() {
runRegisterInterestWithCacheLoaderTest(true);
}
@Test
public void testRegisterInterestOnPartitionedRegionWithCacheLoader() throws Exception {
runRegisterInterestWithCacheLoaderTest(false);
}
private void runRegisterInterestWithCacheLoaderTest(boolean addReplicatedRegion) {
// Stop the server (since it was already started with a replicated region)
vm0.invoke(() -> InterestListDUnitTest.closeCache());
// Start two servers with the appropriate region
int port1 =
((Integer) vm0.invoke(() -> InterestListDUnitTest.createServerCache(addReplicatedRegion)))
.intValue();
VM server2VM = Host.getHost(0).getVM(3);
int port2 = ((Integer) server2VM
.invoke(() -> InterestListDUnitTest.createServerCache(addReplicatedRegion))).intValue();
// Add a cache loader to the region in both cache servers
vm0.invoke(() -> InterestListDUnitTest.addCacheLoader());
server2VM.invoke(() -> InterestListDUnitTest.addCacheLoader());
// Create client cache
vm1.invoke(() -> InterestListDUnitTest
.createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), port1, port2));
// Register interest in all keys
vm1.invoke(() -> InterestListDUnitTest.registerALL_KEYS());
// Add CacheListener
int numEvents = 100;
vm1.invoke(() -> InterestListDUnitTest.addCacheListener(numEvents));
// Do gets on the client
vm1.invoke(() -> InterestListDUnitTest.doGets(numEvents));
// Wait for cache listener create events
vm1.invoke(() -> InterestListDUnitTest.waitForCacheListenerCreates());
// Confirm there are no cache listener update events
vm1.invoke(() -> InterestListDUnitTest.confirmNoCacheListenerUpdates());
// Confirm there are no cache listener invalidate events
vm1.invoke(() -> InterestListDUnitTest.confirmNoCacheListenerInvalidates());
}
@Test
public void testRegisterInterestSingleKeyWithDestroyOnReplicatedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
keysToDestroy.add("0");
runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, keysToDestroy);
}
@Test
public void testRegisterInterestSingleKeyWithDestroyOnPartitionedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
keysToDestroy.add("0");
runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, keysToDestroy);
}
@Test
public void testRegisterInterestListOfKeysWithDestroyOnReplicatedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
for (int i = 0; i < 5; i++) {
keysToDestroy.add(String.valueOf(i));
}
runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, keysToDestroy);
}
@Test
public void testRegisterInterestListOfKeysWithDestroyOnPartitionedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
for (int i = 0; i < 5; i++) {
keysToDestroy.add(String.valueOf(i));
}
runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, keysToDestroy);
}
@Test
public void testRegisterInterestAllKeysWithDestroyOnReplicatedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
keysToDestroy.add("0");
runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, "ALL_KEYS");
}
@Test
public void testRegisterInterestAllKeysWithDestroyOnPartitionedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
keysToDestroy.add("0");
runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, "ALL_KEYS");
}
private void runRegisterInterestWithDestroyAndCacheLoaderTest(boolean addReplicatedRegion,
List keysToDestroy, Object keyToRegister) {
// The server was already started with a replicated region. Bounce it if necessary
int port1 = PORT1;
if (!addReplicatedRegion) {
vm0.invoke(() -> closeCache());
port1 =
((Integer) vm0.invoke(() -> InterestListDUnitTest.createServerCache(addReplicatedRegion)))
.intValue();
}
final int port = port1;
// Add a cache loader to the region
vm0.invoke(() -> addCacheLoader());
// Create client cache
vm1.invoke(() -> createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), port));
// Destroy appropriate key(s)
vm1.invoke(() -> destroyKeys(keysToDestroy));
// Register interest in appropriate keys(s)
vm1.invoke(() -> registerKey(keyToRegister));
// Verify CacheLoader was not invoked
vm0.invoke(() -> verifyNoCacheLoaderLoads());
}
private void createCache(Properties props) throws Exception {
DistributedSystem ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
private static DistributedMember createClientCache(String host, int port) throws Exception {
return createClientCache(host, port, 0);
}
private static DistributedMember createClientCache(String host, int port, int port2)
throws Exception {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
props.setProperty(DELTA_PROPAGATION, "false");
new InterestListDUnitTest().createCache(props);
PoolFactory pfactory = PoolManager.createFactory().addServer(host, port)
.setMinConnections(3).setSubscriptionEnabled(true)
.setSubscriptionRedundancy(-1).setReadTimeout(10000).setSocketBufferSize(32768);
// .setRetryInterval(10000)
// .setRetryAttempts(5)
if (port2 > 0) {
pfactory.addServer(host, port2);
}
Pool p = pfactory.create("InterestListDUnitTestPool");
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setPoolName(p.getName());
RegionAttributes attrs = factory.create();
cache.createRegion(REGION_NAME, attrs);
return cache.getDistributedSystem().getDistributedMember();
}
private static void createCache() throws Exception {
createCache(true);
}
private static void createCache(boolean addReplicatedRegion) throws Exception {
Properties props = new Properties();
props.setProperty(DELTA_PROPAGATION, "false");
new InterestListDUnitTest().createCache(props);
if (addReplicatedRegion) {
addReplicatedRegion();
} else {
addPartitionedRegion();
}
}
private static void addReplicatedRegion() {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEnableSubscriptionConflation(true);
factory.setDataPolicy(DataPolicy.REPLICATE);
RegionAttributes attrs = factory.create();
cache.createRegion(REGION_NAME, attrs);
}
private static void addPartitionedRegion() {
cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT).create(REGION_NAME);
}
private static CacheServer addCacheServer() throws Exception {
CacheServer s = cache.addCacheServer();
int port = getRandomAvailableTCPPort();
s.setPort(port);
s.start();
return s;
}
// this method is for use in vm0 where the CacheServer used by
// most of these tests resides. This server is held in the
// static variable 'server1'
private static Integer createServerCache() throws Exception {
return createServerCache(true);
}
private static Integer createServerCache(boolean addReplicatedRegion) throws Exception {
createCache(addReplicatedRegion);
server = addCacheServer();
return new Integer(server.getPort());
}
/** wait for queues to drain in the server */
private static void flushQueues() throws Exception {
CacheServerImpl impl = (CacheServerImpl) server;
for (CacheClientProxy proxy : (Set<CacheClientProxy>) impl.getAllClientSessions()) {
final CacheClientProxy fproxy = proxy;
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
return fproxy.getHARegionQueue().size() == 0;
}
@Override
public String description() {
return "waiting for queues to drain for " + fproxy.getProxyID();
}
};
GeodeAwaitility.await().untilAsserted(ev);
}
}
private static void addRegisterInterestListener() {
interestListener = new InterestListener();
List<CacheServer> servers = cache.getCacheServers();
for (CacheServer s : servers) {
s.registerInterestRegistrationListener(interestListener);
}
}
private static void addCacheLoader() {
// Add a cache loader
Region region = cache.getRegion(REGION_NAME);
region.getAttributesMutator().setCacheLoader(new ReturnKeyCacheLoader());
}
private static void addCacheListener(int expectedCreates) {
// Add a cache listener to count the number of create and update events
Region region = cache.getRegion(REGION_NAME);
region.getAttributesMutator().addCacheListener(new EventCountingCacheListener(expectedCreates));
}
private static void doGets(int numGets) {
// Do gets.
// These gets cause the CacheLoader to be invoked on the server
Region region = cache.getRegion(REGION_NAME);
for (int i = 0; i < numGets; i++) {
region.get(i);
}
}
private static void waitForCacheListenerCreates() throws Exception {
// Wait for the EventCountingCacheListener to receive all of its create events
Region region = cache.getRegion(REGION_NAME);
final EventCountingCacheListener fCacheListener =
(EventCountingCacheListener) region.getAttributes().getCacheListener();
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
return fCacheListener.hasReceivedAllCreateEvents();
}
@Override
public String description() {
return "waiting for " + fCacheListener.getExpectedCreates() + " create events";
}
};
GeodeAwaitility.await().untilAsserted(ev);
}
private static void confirmNoCacheListenerUpdates() throws Exception {
// Confirm there are no EventCountingCacheListener update events.
// These would be coming from the subscription channel.
Region region = cache.getRegion(REGION_NAME);
EventCountingCacheListener cacheListener =
(EventCountingCacheListener) region.getAttributes().getCacheListener();
assertEquals(0/* expected */, cacheListener.getUpdates()/* actual */);
}
private static void confirmNoCacheListenerInvalidates() throws Exception {
// Confirm there are no EventCountingCacheListener invalidate events.
// These would be coming from the subscription channel.
Region region = cache.getRegion(REGION_NAME);
EventCountingCacheListener cacheListener =
(EventCountingCacheListener) region.getAttributes().getCacheListener();
assertEquals(0/* expected */, cacheListener.getInvalidates()/* actual */);
}
private static void verifyCountsAndClear(int count1, int count2) {
interestListener.verifyCountsAndClear(count1, count2);
}
private static void createEntriesK1andK2() {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
if (!r.containsKey(key1)) {
r.create(key1, key1_originalValue);
}
if (!r.containsKey(key2)) {
r.create(key2, key2_originalValue);
}
// Verify that no invalidates occurred to this region
assertEquals(r.getEntry(key1).getValue(), key1_originalValue);
assertEquals(r.getEntry(key2).getValue(), key2_originalValue);
} catch (Exception ex) {
Assert.fail("failed while createEntries()", ex);
}
}
private static void registerKeyOnly(Object key) {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
r.registerInterest(key);
}
private static void registerKey(Object key) {
try {
registerKeyOnly(key);
} catch (Exception ex) {
Assert.fail("failed while registering key(" + key + ")", ex);
}
}
/**
* exercises the ClientSession interface to register interest in a server on behalf of a client
*
* @param clientId the DM of the client
* @param key the key that the client is interested in
*/
private static void registerKeyForClient(DistributedMember clientId, Object key) {
try {
ClientSession cs = server.getClientSession(clientId);
if (cs.isPrimary()) {
cs.registerInterest(SEPARATOR + REGION_NAME, key, InterestResultPolicy.KEYS_VALUES,
false);
}
} catch (Exception ex) {
Assert.fail("failed while registering key(" + key + ")", ex);
}
}
private static void registerKeyEx(Object key) {
try {
registerKeyOnly(key);
fail("Expected an exception during register interest with no available servers.");
} catch (NoSubscriptionServersAvailableException ex) {
// expected an exception
LogWriterUtils.getLogWriter().info("Got expected exception in registerKey: ");
}
}
private static void registerALL_KEYS() {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
r.registerInterest("ALL_KEYS");
} catch (Exception ex) {
Assert.fail("failed while registering ALL_KEYS", ex);
}
}
private static void put_ALL_KEYS() {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
r.put(key1, "vm1-key-1");
r.put(key2, "vm1-key-2");
// Verify that no invalidates occurred to this region
assertEquals(r.getEntry(key1).getValue(), "vm1-key-1");
assertEquals(r.getEntry(key2).getValue(), "vm1-key-2");
} catch (Exception ex) {
Assert.fail("failed while put_ALL_KEY()", ex);
}
}
private static void validate_ALL_KEYS() {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
assertEquals(r.getEntry(key1).getValue(), "vm1-key-1");
assertEquals(r.getEntry(key2).getValue(), "vm1-key-2");
} catch (Exception ex) {
Assert.fail("failed while validate_ALL_KEY", ex);
}
}
private static void registerKeys() {
List list = new ArrayList();
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
list.add("k1");
list.add("k2");
r.registerInterest(list);
} catch (Exception ex) {
Assert.fail("failed while registering keys" + list + "", ex);
}
}
private static void registerKeysAgain() {
List list = new ArrayList();
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
list.add("k3");
list.add("k4");
list.add("k5");
r.registerInterest(list);
} catch (Exception ex) {
Assert.fail("failed while registering keys" + list + "", ex);
}
}
private static void unregisterKeyOnly(Object key) {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
r.unregisterInterest(key);
}
private static void unregisterKey(Object key) {
try {
unregisterKeyOnly(key);
} catch (Exception ex) {
Assert.fail("failed while un-registering key(" + key + ")", ex);
}
}
/**
* unregister a key for a particular client in the server
*
* @param clientId the client's ID
* @param key the key it's no longer interest in
*/
private static void unregisterKeyForClient(DistributedMember clientId, Object key) {
try {
ClientSession cs = server.getClientSession(clientId);
if (cs.isPrimary()) {
cs.unregisterInterest(SEPARATOR + REGION_NAME, key, false);
}
} catch (Exception ex) {
Assert.fail("failed while un-registering key(" + key + ") for client " + clientId, ex);
}
}
private static void unregisterKeyEx(Object key) {
unregisterKeyOnly(key);
}
private static void validateRegionEntriesFromInterestListInVm1() {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
Region.Entry k1, k2;
k1 = r.getEntry("k1");
k2 = r.getEntry("k2");
assertNotNull(k1);
assertNotNull(k2);
assertEquals(k1.getValue(), "server1");
assertEquals(k2.getValue(), "server2");
}
private static void validateRegionEntriesFromInterestListInVm2() {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
assertEquals(r.getEntry("k3").getValue(), "server3");
assertEquals(r.getEntry("k4").getValue(), "server4");
assertEquals(r.getEntry("k5").getValue(), "server5");
}
private static void putSingleEntry(Object key, String value) {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
r.put(key, value);
// Verify that no invalidates occurred to this region
assertEquals(r.getEntry(key).getValue(), value);
} catch (Exception ex) {
Assert.fail("failed while r.put()", ex);
}
}
private static void put(String vm) {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
if (vm.equals("vm1")) {
r.put(key1, "vm1-key-1");
r.put(key2, "vm1-key-2");
// Verify that no invalidates occurred to this region
assertEquals("vm1-key-1", r.getEntry(key1).getValue());
assertEquals("vm1-key-2", r.getEntry(key2).getValue());
} else {
r.put(key1, "vm2-key-1");
r.put(key2, "vm2-key-2");
// Verify that no invalidates occurred to this region
assertEquals("vm2-key-1", r.getEntry(key1).getValue());
assertEquals("vm2-key-2", r.getEntry(key2).getValue());
}
} catch (Exception ex) {
Assert.fail("failed while r.put()", ex);
}
}
private static void multiple_put() {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
r.put("k1", "server1");
r.put("k2", "server2");
r.put("k3", "server3");
r.put("k4", "server4");
r.put("k5", "server5");
} catch (Exception ex) {
Assert.fail("failed while r.put()", ex);
}
}
private static void putAgain(String vm) {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
if (vm.equals("vm1")) {
r.put(key1, "vm1-key-1-again");
r.put(key2, "vm1-key-2-again");
// Verify that no invalidates occurred to this region
assertEquals(r.getEntry(key1).getValue(), "vm1-key-1-again");
assertEquals(r.getEntry(key2).getValue(), "vm1-key-2-again");
} else {
r.put(key1, "vm2-key-1-again");
r.put(key2, "vm2-key-2-again");
// Verify that no invalidates occurred to this region
assertEquals(r.getEntry(key1).getValue(), "vm2-key-1-again");
assertEquals(r.getEntry(key2).getValue(), "vm2-key-2-again");
}
} catch (Exception ex) {
Assert.fail("failed while r.putAgain()", ex);
}
}
private static void destroyKeys(List keys) {
Region r = cache.getRegion(REGION_NAME);
for (Object key : keys) {
r.destroy(key);
}
}
private static void verifyNoCacheLoaderLoads() throws Exception {
Region region = cache.getRegion(REGION_NAME);
ReturnKeyCacheLoader cacheLoader =
(ReturnKeyCacheLoader) region.getAttributes().getCacheLoader();
assertEquals(0/* expected */, cacheLoader.getLoads()/* actual */);
}
private static void validateEntriesK1andK2(final String vm) {
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
if (vm.equals("vm1")) {
// Verify that 'key-1' was updated, but 'key-2' was not and contains the
// original value
assertEquals("vm2-key-1", r.getEntry(key1).getValue());
assertEquals("vm1-key-2", r.getEntry(key2).getValue());
} else {
// Verify that 'key-2' was updated, but 'key-1' was not and contains the
// original value
assertEquals(key1_originalValue, r.getEntry(key1).getValue());
assertEquals("vm1-key-2", r.getEntry(key2).getValue());
}
return true;
} catch (AssertionError ex) {
return false;
}
}
@Override
public String description() {
return "waiting for client to apply events from server";
}
};
GeodeAwaitility.await().untilAsserted(ev);
}
private static void validateSingleEntry(Object key, String value) {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertEquals(value, r.getEntry(key).getValue());
} catch (Exception ex) {
Assert.fail("failed while validateSingleEntry()", ex);
}
}
private static void validateEntriesAgain(String vm) {
try {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
if (vm.equals("vm1")) {
// Verify that neither 'key-1' 'key-2' was updated and contain the
// original value
assertEquals(r.getEntry(key1).getValue(), "vm1-key-1-again");
assertEquals(r.getEntry(key2).getValue(), "vm1-key-2-again");
} else {
// Verify that no updates occurred to this region
assertEquals(r.getEntry(key1).getValue(), "vm2-key-1");
assertEquals(r.getEntry(key2).getValue(), "vm2-key-2");
}
} catch (Exception ex) {
Assert.fail("failed while r.put()", ex);
}
}
private static void closeCache() {
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
private static class InterestListener implements InterestRegistrationListener {
private int registrationCount;
private int unregistrationCount;
@Override
public void afterRegisterInterest(InterestRegistrationEvent event) {
LogWriterUtils.getLogWriter()
.info("InterestListener.afterRegisterInterest invoked with this event: " + event);
registrationCount++;
}
@Override
public void afterUnregisterInterest(InterestRegistrationEvent event) {
LogWriterUtils.getLogWriter()
.info("InterestListener.afterUnregisterInterest invoked with this event: " + event);
unregistrationCount++;
}
/**
* invoke this method after a predetermined number of operations have been performed
*
* @param expectedRegistrations expected count of interest registrations
* @param expectedUnregistrations expected count of unregistrations
*/
public void verifyCountsAndClear(int expectedRegistrations, int expectedUnregistrations) {
assertEquals(expectedRegistrations, registrationCount);
assertEquals(expectedUnregistrations, unregistrationCount);
registrationCount = 0;
unregistrationCount = 0;
}
@Override
public void close() {}
public InterestListener() {}
}
private static class EventCountingCacheListener extends CacheListenerAdapter {
private AtomicInteger creates = new AtomicInteger();
private AtomicInteger updates = new AtomicInteger();
private AtomicInteger invalidates = new AtomicInteger();
private int expectedCreates;
public EventCountingCacheListener(int expectedCreates) {
this.expectedCreates = expectedCreates;
}
public int getExpectedCreates() {
return this.expectedCreates;
}
@Override
public void afterCreate(EntryEvent event) {
incrementCreates();
}
@Override
public void afterUpdate(EntryEvent event) {
incrementUpdates();
event.getRegion().getCache().getLogger()
.warning("Received update event " + getUpdates() + " for " + event.getKey());
}
@Override
public void afterInvalidate(EntryEvent event) {
incrementInvalidates();
event.getRegion().getCache().getLogger()
.warning("Received invalidate event " + getInvalidates() + " for " + event.getKey());
}
private void incrementCreates() {
this.creates.incrementAndGet();
}
private int getCreates() {
return this.creates.get();
}
private void incrementUpdates() {
this.updates.incrementAndGet();
}
private int getUpdates() {
return this.updates.get();
}
private void incrementInvalidates() {
this.invalidates.incrementAndGet();
}
private int getInvalidates() {
return this.invalidates.get();
}
public boolean hasReceivedAllCreateEvents() {
return this.expectedCreates == getCreates();
}
}
private static class ReturnKeyCacheLoader implements CacheLoader {
private AtomicInteger loads = new AtomicInteger();
@Override
public void close() {
// Do nothing
}
@Override
public Object load(LoaderHelper helper) throws CacheLoaderException {
incrementLoads();
return helper.getKey();
}
private void incrementLoads() {
this.loads.incrementAndGet();
}
private int getLoads() {
return this.loads.get();
}
}
}