blob: 9680f06f75812afe51ffdd94ed51543127d492bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PoolFactoryImpl;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* We have 2 servers and one client, which registers some keys with durable interest and some
* without it. We maintain queues on only one server as the redundancy level is one. The following
* two tests cover the two test case scenarios.
*
* The first test does the following: // Step 1: Starting the servers // Step 2:
* Bring Up the Client // Step 3: Client registers Interests // Step 4: Update Values on the Server
* for Keys // Step 5: Verify Updates on the Client // Step 6: Close Cache of the DurableClient //
* Step 7: Update the Values // Step 8: Re-start the Client // Step 9: Verify Updates on the Client
* // Step 10: Stop all VMs
*
* For the second test the steps are as follows: // Step 1: Starting the servers // Step 2: Bring
* Up the Client // Step 3: Client registers Interests // Step 4: Update Values on the Server for
* Keys // Step 5: Verify Updates on the Client // Step 6: Close Cache of the DurableClient //
* Step 7: Update the Values // Step 8: Re-start the Client // Step 9: Send Client Ready Message //
* Step 10: Register all Keys (K1, K2 as Non-Durable. K3, K4 as Durable) // Step 11: Unregister
* Some Keys (Here K1, K3) // Step 12: Modify values on the server for all the Keys // Step 13:
* Check the values of the keys that were not unregistered; the values of the unregistered keys
* should be null
*/
@Category({ClientSubscriptionTest.class})
public class DurableRegistrationDUnitTest extends JUnit4DistributedTestCase {
// DUnit VMs used by the tests; assigned in postSetUp() from host VM indexes 0, 1 and 2.
private VM server1VM;
private VM server2VM;
private VM durableClientVM;
// Name of the test region; set in postSetUp() to "<this class name>_region".
private String regionName;
// Ports of cache server 1 and cache server 2; assigned at the start of each test.
private int PORT1;
private int PORT2;
// Keys the tests register interest in, put values for and read back.
private static final String K1 = "KEY_STONE1";
private static final String K2 = "KEY_STONE2";
private static final String K3 = "KEY_STONE3";
private static final String K4 = "KEY_STONE4";
public DurableRegistrationDUnitTest() {
super();
}
/**
 * Picks the three DUnit VMs used by every test (VM 0 and 1 as servers, VM 2 as the durable
 * client), derives the region name from this class's name and disables endpoint shuffling through
 * {@link CacheServerTestUtil}.
 */
@Override
public final void postSetUp() throws Exception {
  final Host dunitHost = Host.getHost(0);
  server1VM = dunitHost.getVM(0);
  server2VM = dunitHost.getVM(1);
  durableClientVM = dunitHost.getVM(2);
  regionName = DurableRegistrationDUnitTest.class.getName() + "_region";
  CacheServerTestUtil.disableShufflingOfEndpoints();
}
/**
 * Starts two servers and one durable client, registers K1/K2 as non-durable and K3/K4 as durable
 * interest, and checks that the initial updates reach the client. The client cache is then closed
 * and restarted; after K1 is re-registered (non-durable) and all keys are updated on the server,
 * the test expects the client to hold the new values for K1, K3 and K4 and no entry for K2.
 */
@Test
public void testSimpleDurableClient() {
  // Step 1: Starting the servers
  PORT1 = ((Integer) this.server1VM
      .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)))
          .intValue();
  PORT2 = ((Integer) this.server2VM
      .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)))
          .intValue();

  // Step 2: Bring Up the Client
  // Start a durable client with a durable timeout of 600 seconds
  final String durableClientId = getName() + "_client";
  final int durableClientTimeout = 600;
  this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
      getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2, true,
          0),
      regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
      Boolean.TRUE));

  // Send clientReady message
  this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
    @Override
    public void run2() throws CacheException {
      CacheServerTestUtil.getCache().readyForEvents();
    }
  });

  // Step 3: Client registers Interests
  // KEY_STONE1, KEY_STONE2 are registered as non-durable keys & KEY_STONE3,
  // KEY_STONE4 as durable keys
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, false));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, false));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, true));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, true));

  // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
  // KEY_STONE3, KEY_STONE4
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "Value1"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "Value2"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "Value3"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "Value4"));
  Wait.pause(1000);

  // Step 5: Verify Updates on the servers and on the Client
  assertEquals("Value1", this.server2VM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
  assertEquals("Value1", this.server1VM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
  assertEquals("Value1",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
  assertEquals("Value2",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));
  assertEquals("Value3",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K3)));
  assertEquals("Value4",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K4)));

  // Step 6: Close Cache of the DurableClient
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.closeCache());

  // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
  // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "PingPong1"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "PingPong2"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "PingPong3"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "PingPong4"));

  // Step 8: Re-start the Client (this time with the default durable client timeout)
  this.durableClientVM
      .invoke(() -> CacheServerTestUtil.createCacheClient(
          getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2,
              true, 0),
          regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));

  // Send clientReady message
  this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
    @Override
    public void run2() throws CacheException {
      CacheServerTestUtil.getCache().readyForEvents();
    }
  });
  Wait.pause(5000);

  // The non-durable keys must not have an entry on the restarted client
  assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
  assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));

  // Re-registering K1 (with InterestResultPolicy.NONE, see registerKey) must not create an entry
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, false));
  Wait.pause(5000);
  assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
  assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));

  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "PingPong_updated_1"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "PingPong_updated_2"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "PingPong_updated_3"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "PingPong_updated_4"));
  Wait.pause(5000);

  // Step 9: Verify Updates on the Client
  assertEquals("PingPong_updated_1",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
  assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));
  assertEquals("PingPong_updated_3",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K3)));
  assertEquals("PingPong_updated_4",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K4)));

  // Step 10 : Stop all VMs
  // Stop the durable client
  this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
  // Stop server 2
  this.server2VM.invoke(() -> CacheServerTestUtil.closeCache());
  // Stop server 1
  this.server1VM.invoke(() -> CacheServerTestUtil.closeCache());
}
/**
 * Starts two servers and one durable client, registers K1/K2 as non-durable and K3/K4 as durable
 * interest, and checks that the initial updates reach the client. After the client is closed and
 * restarted it registers all four keys again, unregisters K1 and K3, and the servers update every
 * key. The test expects the client to hold the new values for K2 and K4 and no entry for K1 and
 * K3.
 */
@Test
public void testSimpleDurableClientWithRegistration() {
  // Step 1: Starting the servers
  PORT1 = ((Integer) this.server1VM
      .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)))
          .intValue();
  PORT2 = ((Integer) this.server2VM
      .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)))
          .intValue();

  // Step 2: Bring Up the Client
  // Start a durable client with a durable timeout of 600 seconds
  final String durableClientId = getName() + "_client";
  final int durableClientTimeout = 600;
  this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
      getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2, true,
          0),
      regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout)));

  // Send clientReady message
  this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
    @Override
    public void run2() throws CacheException {
      CacheServerTestUtil.getCache().readyForEvents();
    }
  });

  // Step 3: Client registers Interests
  // KEY_STONE1, KEY_STONE2 are registered as non-durable keys & KEY_STONE3,
  // KEY_STONE4 as durable keys
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, false));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, false));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, true));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, true));

  // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
  // KEY_STONE3, KEY_STONE4
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "Value1"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "Value2"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "Value3"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "Value4"));
  Wait.pause(1000);

  // Step 5: Verify Updates on the servers and on the Client
  assertEquals("Value1", this.server2VM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
  assertEquals("Value1", this.server1VM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
  assertEquals("Value1",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
  assertEquals("Value2",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));
  assertEquals("Value3",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K3)));
  assertEquals("Value4",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K4)));

  // Step 6: Close Cache of the DurableClient
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.closeCache());

  // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
  // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "PingPong1"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "PingPong2"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "PingPong3"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "PingPong4"));

  // Step 8: Re-start the Client (this time with the default durable client timeout)
  this.durableClientVM
      .invoke(() -> CacheServerTestUtil.createCacheClient(
          getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2,
              true, 0),
          regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));

  // Step 9: Send clientReady message
  this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
    @Override
    public void run2() throws CacheException {
      CacheServerTestUtil.getCache().readyForEvents();
    }
  });

  // Step 10: Register all Keys (K3, K4 durable; K1, K2 non-durable)
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, true));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, true));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, false));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, false));

  // Step 11: Unregister Some Keys (Here K1, K3)
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.unregisterKey(K1));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.unregisterKey(K3));
  Wait.pause(5000);

  // Step 12: Modify values on the server for all the Keys
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "PingPong_updated_1"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "PingPong_updated_2"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "PingPong_updated_3"));
  this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "PingPong_updated_4"));
  Wait.pause(5000);

  // Step 13: Check the values for the ones not unregistered and the
  // Unregistered Keys' Values should be null.
  // The key name is carried in the assertion message; an exception thrown by the remote call
  // propagates unchanged so its cause is not lost.
  assertEquals("Prob in KEY_STONE2", "PingPong_updated_2",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));
  assertEquals("Prob in KEY_STONE4", "PingPong_updated_4",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K4)));
  assertNull("Prob in KEY_STONE1",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
  assertNull("Prob in KEY_STONE3",
      this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K3)));

  // Stop the durable client
  this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
  // Stop server 2
  this.server2VM.invoke(() -> CacheServerTestUtil.closeCache());
  // Stop server 1
  this.server1VM.invoke(() -> CacheServerTestUtil.closeCache());
}
/**
 * Starts server 1 and a durable client (subscription redundancy 1) that registers K1/K2 as
 * non-durable and K3/K4 as durable interest, then starts server 2 on a pre-selected port and
 * verifies on server 2 that the client's proxy has two durable and two non-durable keys of
 * interest in the region's filter profile.
 */
@Test
public void testDurableClientWithRegistrationHA() {
  // Step 1: Start server1 (the port for server2 is only reserved here)
  PORT2 = getRandomAvailableTCPPort();
  PORT1 = ((Integer) this.server1VM
      .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)))
          .intValue();

  // Step 2: Bring Up the Client
  final String durableClientId = getName() + "_client";
  // durable client timeout of 600 seconds
  final int durableClientTimeout = 600;
  this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
      getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2, true,
          1),
      regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout)));

  // Send clientReady message
  this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
    @Override
    public void run2() throws CacheException {
      CacheServerTestUtil.getCache().readyForEvents();
    }
  });

  // Step 3: Client registers Interests (K1, K2 non-durable; K3, K4 durable)
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, false));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, false));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, true));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, true));

  // Step 4: Bring up the server2
  this.server2VM
      .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE, PORT2));
  Wait.pause(3000);

  // Check server2 got all the interests registered by the durable client.
  server2VM.invoke(new CacheSerializableRunnable("Verify Interests.") {
    @Override
    public void run2() throws CacheException {
      LogWriterUtils.getLogWriter()
          .info("### Verifying interests registered by DurableClient. ###");
      CacheClientNotifier ccn = CacheClientNotifier.getInstance();
      CacheClientProxy p = null;
      // Get proxy for the client: poll up to 60 times, pausing one second after each miss.
      for (int i = 0; i < 60; i++) {
        Iterator<?> ps = ccn.getClientProxies().iterator();
        if (ps.hasNext()) {
          p = (CacheClientProxy) ps.next();
          break;
        }
        Wait.pause(1000);
      }
      if (p == null) {
        fail("Proxy initialization taking long time. Increase the wait time.");
      }

      // Fail with a clear message instead of a NoSuchElementException when no region is listed.
      Iterator<?> rs = p.getInterestRegisteredRegions().iterator();
      if (!rs.hasNext()) {
        fail("No interest-registered region found for the proxy.");
      }
      String rName = (String) rs.next();
      assertNotNull("Null region Name found.", rName);
      LocalRegion r = (LocalRegion) GemFireCacheImpl.getInstance().getRegion(rName);
      assertNotNull("Null region found.", r);
      FilterProfile pf = r.getFilterProfile();

      // Keys looked up by the proxy's durable id: expected K3 and K4.
      Set<?> interestKeys = pf.getKeysOfInterest(p.getProxyID().getDurableId());
      assertNotNull("durable Interests not found for the proxy", interestKeys);
      assertEquals("The number of durable keys registered during HARegion GII doesn't match.",
          2, interestKeys.size());

      // Keys looked up by the proxy id itself: expected K1 and K2.
      interestKeys = pf.getKeysOfInterest(p.getProxyID());
      assertNotNull("non-durable Interests not found for the proxy", interestKeys);
      assertEquals("The number of non-durable keys registered during HARegion GII doesn't match.",
          2, interestKeys.size());
    }
  });

  // Stop the durable client
  this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
  // Stop server 2
  this.server2VM.invoke(() -> CacheServerTestUtil.closeCache());
  // Stop server 1
  this.server1VM.invoke(() -> CacheServerTestUtil.closeCache());
}
/**
 * Starts server 1 and a durable client (subscription redundancy 1) that registers K1/K2 as
 * non-durable and K3/K4 as durable interest, closes and restarts the client, then starts server 2
 * on a pre-selected port. On server 2 the test expects the client's proxy to have two durable
 * keys of interest and no non-durable keys of interest in the region's filter profile.
 */
@Test
public void testDurableClientDisConnectWithRegistrationHA() {
  // Step 1: Start server1 (the port for server2 is only reserved here)
  PORT2 = getRandomAvailableTCPPort();
  PORT1 = ((Integer) this.server1VM
      .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)))
          .intValue();

  // Step 2: Bring Up the Client
  final String durableClientId = getName() + "_client";
  // durable client timeout of 600 seconds
  final int durableClientTimeout = 600;
  this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
      getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2, true,
          1),
      regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout)));

  // Send clientReady message
  this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
    @Override
    public void run2() throws CacheException {
      CacheServerTestUtil.getCache().readyForEvents();
    }
  });

  // Step 3: Client registers Interests (K1, K2 non-durable; K3, K4 durable)
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, false));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, false));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, true));
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, true));

  // Close Cache of the DurableClient
  this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.closeCache());
  Wait.pause(2000);

  // Re-start the Client (this time with the default durable client timeout)
  this.durableClientVM
      .invoke(() -> CacheServerTestUtil.createCacheClient(
          getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2,
              true, 1),
          regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));

  // Send clientReady message
  this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
    @Override
    public void run2() throws CacheException {
      CacheServerTestUtil.getCache().readyForEvents();
    }
  });

  // Step 4: Bring up the server2
  this.server2VM
      .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE, PORT2));
  Wait.pause(3000);

  // Check server2 got all the interests registered by the durable client.
  server2VM.invoke(new CacheSerializableRunnable("Verify Interests.") {
    @Override
    public void run2() throws CacheException {
      LogWriterUtils.getLogWriter()
          .info("### Verifying interests registered by DurableClient. ###");
      CacheClientNotifier ccn = CacheClientNotifier.getInstance();
      CacheClientProxy p = null;
      // Get proxy for the client: poll up to 60 times, pausing one second after each miss.
      for (int i = 0; i < 60; i++) {
        Iterator<?> ps = ccn.getClientProxies().iterator();
        if (ps.hasNext()) {
          p = (CacheClientProxy) ps.next();
          break;
        }
        Wait.pause(1000);
      }
      if (p == null) {
        fail("Proxy initialization taking long time. Increase the wait time.");
      }

      // Fail with a clear message instead of a NoSuchElementException when no region is listed.
      Iterator<?> rs = p.getInterestRegisteredRegions().iterator();
      if (!rs.hasNext()) {
        fail("No interest-registered region found for the proxy.");
      }
      String rName = (String) rs.next();
      assertNotNull("Null region Name found.", rName);
      LocalRegion r = (LocalRegion) GemFireCacheImpl.getInstance().getRegion(rName);
      assertNotNull("Null region found.", r);
      FilterProfile pf = r.getFilterProfile();

      // Keys looked up by the proxy's durable id: expected K3 and K4.
      Set<?> interestKeys = pf.getKeysOfInterest(p.getProxyID().getDurableId());
      assertNotNull("durable Interests not found for the proxy", interestKeys);
      assertEquals("The number of durable keys registered during HARegion GII doesn't match.",
          2, interestKeys.size());

      // Keys looked up by the proxy id itself: none are expected after the client restart.
      interestKeys = pf.getKeysOfInterest(p.getProxyID());
      assertNull("non-durable Interests found for the proxy", interestKeys);
    }
  });

  // Stop the durable client
  this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
  // Stop server 2
  this.server2VM.invoke(() -> CacheServerTestUtil.closeCache());
  // Stop server 1
  this.server1VM.invoke(() -> CacheServerTestUtil.closeCache());
}
/**
 * Unregisters interest in K1, K2, K3 and K4 (in that order) on the test region of the cache held
 * by {@link CacheServerTestUtil}. Fails the assertion if the region does not exist.
 */
private static void unregisterAllKeys() {
  final String testRegionName = DurableRegistrationDUnitTest.class.getName() + "_region";
  final Region testRegion = CacheServerTestUtil.getCache().getRegion(testRegionName);
  assertNotNull(testRegion);
  for (String key : new String[] {K1, K2, K3, K4}) {
    testRegion.unregisterInterest(key);
  }
}
/**
 * Registers interest with {@code InterestResultPolicy.KEYS_VALUES} on the test region: K1 and K2
 * with the durable flag {@code false}, then K3 and K4 with the durable flag {@code true}, and
 * finally asserts that the region reports a non-null interest list.
 *
 * @throws Exception declared for callers; any exception raised inside is converted into a test
 *         failure through the dunit {@code Assert.fail}
 */
private static void registerKeys() throws Exception {
  try {
    final String testRegionName = DurableRegistrationDUnitTest.class.getName() + "_region";
    final Region testRegion = CacheServerTestUtil.getCache().getRegion(testRegionName);
    assertNotNull(testRegion);
    for (String nonDurableKey : new String[] {K1, K2}) {
      testRegion.registerInterest(nonDurableKey, InterestResultPolicy.KEYS_VALUES, false);
    }
    for (String durableKey : new String[] {K3, K4}) {
      testRegion.registerInterest(durableKey, InterestResultPolicy.KEYS_VALUES, true);
    }
    assertNotNull(testRegion.getInterestList());
  } catch (Exception ex) {
    Assert.fail("failed while registering interest in registerKey function", ex);
  }
}
/**
 * Returns the value stored under {@code key} in the test region of the cache held by
 * {@link CacheServerTestUtil}. The lookup goes through {@code Region.getEntry}, not
 * {@code Region.get}.
 *
 * @param key the key to look up
 * @return the entry's value cast to {@code String}, or {@code null} when the region has no entry
 *         for the key
 */
private static String getValue(String key) {
  final String testRegionName = DurableRegistrationDUnitTest.class.getName() + "_region";
  final Region testRegion = CacheServerTestUtil.getCache().getRegion(testRegionName);
  assertNotNull(testRegion);
  final Region.Entry entry = testRegion.getEntry(key);
  return entry == null ? null : (String) entry.getValue();
}
/**
 * Registers interest in a single key on the test region using
 * {@code InterestResultPolicy.NONE}.
 *
 * @param key the key to register interest in
 * @param isDurable passed through as the durable flag of {@code registerInterest}
 * @throws Exception declared for callers; any exception raised inside is converted into a test
 *         failure through the dunit {@code Assert.fail}
 */
private static void registerKey(String key, boolean isDurable) throws Exception {
  try {
    final String testRegionName = DurableRegistrationDUnitTest.class.getName() + "_region";
    final Region testRegion = CacheServerTestUtil.getCache().getRegion(testRegionName);
    assertNotNull(testRegion);
    testRegion.registerInterest(key, InterestResultPolicy.NONE, isDurable);
  } catch (Exception ex) {
    Assert.fail("failed while registering interest in registerKey function", ex);
  }
}
/**
 * Unregisters interest in a single key on the test region of the cache held by
 * {@link CacheServerTestUtil}.
 *
 * @param key the key to unregister interest in
 * @throws Exception declared for callers; any exception raised inside is converted into a test
 *         failure through the dunit {@code Assert.fail}
 */
private static void unregisterKey(String key) throws Exception {
  try {
    // Get the region
    Region region = CacheServerTestUtil.getCache()
        .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
    assertNotNull(region);
    region.unregisterInterest(key);
  } catch (Exception ex) {
    // The message names this method so a failure is not mistaken for one in registerKey.
    Assert.fail("failed while unregistering interest in unregisterKey function", ex);
  }
}
/**
 * Stores {@code value} under {@code key} in the test region: {@code put} when an entry already
 * exists, {@code create} otherwise. It then asserts that the region entry holds the new value.
 *
 * @param key the key to write
 * @param value the value to store
 */
private static void putValue(String key, String value) {
  try {
    Region r = CacheServerTestUtil.getCache()
        .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
    assertNotNull(r);
    if (r.getEntry(key) != null) {
      r.put(key, value);
    } else {
      r.create(key, value);
    }
    assertEquals(value, r.getEntry(key).getValue());
  } catch (Exception e) {
    // Hand the exception to the dunit Assert.fail so the failure report keeps the root cause.
    Assert.fail("Put in Server has some fight", e);
  }
}
/**
 * Builds the pool attributes for a client that knows both servers on the same host.
 *
 * @param host host name used for both server endpoints
 * @param server1Port port of the first server added to the pool
 * @param server2Port port of the second server added to the pool
 * @param establishCallbackConnection value passed to {@code setSubscriptionEnabled}
 * @param redundancyLevel value passed to {@code setSubscriptionRedundancy}
 * @return the pool attributes obtained from the factory via
 *         {@code PoolFactoryImpl.getPoolAttributes()}
 */
private Pool getClientPool(String host, int server1Port, int server2Port,
    boolean establishCallbackConnection, int redundancyLevel) {
  final PoolFactory factory = PoolManager.createFactory();
  factory.addServer(host, server1Port)
      .addServer(host, server2Port)
      .setSubscriptionEnabled(establishCallbackConnection)
      .setSubscriptionRedundancy(redundancyLevel);
  final PoolFactoryImpl factoryImpl = (PoolFactoryImpl) factory;
  return factoryImpl.getPoolAttributes();
}
/**
 * Builds the client's distributed system properties using
 * {@code DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT} as the durable client timeout.
 *
 * @param durableClientId the durable client id to set
 * @return the properties produced by the two-argument overload
 */
private Properties getClientDistributedSystemProperties(String durableClientId) {
  final int defaultTimeout = DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
  return getClientDistributedSystemProperties(durableClientId, defaultTimeout);
}
/**
 * Waits, via {@code GeodeAwaitility.await().untilAsserted}, for
 * {@link #getNumberOfClientProxies()} to equal {@code expected}.
 *
 * @param expected the number of client proxies to wait for
 */
private static void checkNumberOfClientProxies(final int expected) {
  final WaitCriterion proxyCountMatches = new WaitCriterion() {
    @Override
    public String description() {
      return null;
    }

    @Override
    public boolean done() {
      return getNumberOfClientProxies() == expected;
    }
  };
  GeodeAwaitility.await().untilAsserted(proxyCountMatches);
}
/**
 * Returns how many client proxies the cache client notifier of this VM's cache server currently
 * holds.
 *
 * @return the size of the notifier's client proxy collection
 */
protected static int getNumberOfClientProxies() {
  final CacheClientNotifier notifier =
      getBridgeServer().getAcceptor().getCacheClientNotifier();
  return notifier.getClientProxies().size();
}
/**
 * Builds the distributed system properties for a durable client: multicast port "0", an empty
 * locators list, the given durable client id and the given durable client timeout.
 *
 * @param durableClientId value for the durable client id property
 * @param durableClientTimeout value for the durable client timeout property, stored as its
 *        decimal string
 * @return a new {@link Properties} instance holding the four settings
 */
private Properties getClientDistributedSystemProperties(String durableClientId,
    int durableClientTimeout) {
  final Properties clientProps = new Properties();
  clientProps.setProperty(MCAST_PORT, "0");
  clientProps.setProperty(LOCATORS, "");
  clientProps.setProperty(DURABLE_CLIENT_ID, durableClientId);
  clientProps.setProperty(DURABLE_CLIENT_TIMEOUT, Integer.toString(durableClientTimeout));
  return clientProps;
}
/**
 * Returns the first client proxy known to the cache client notifier of this VM's cache server.
 *
 * @return the first proxy yielded by the notifier's proxy collection, or {@code null} when that
 *         collection is empty
 */
private static CacheClientProxy getClientProxy() {
  final CacheClientNotifier clientNotifier =
      getBridgeServer().getAcceptor().getCacheClientNotifier();
  final Iterator<?> proxies = clientNotifier.getClientProxies().iterator();
  return proxies.hasNext() ? (CacheClientProxy) proxies.next() : null;
}
/**
 * Returns the first cache server of the cache held by {@link CacheServerTestUtil}, cast to
 * {@link CacheServerImpl}, after asserting that it is not null.
 *
 * @return the first cache server of the current cache
 */
private static CacheServerImpl getBridgeServer() {
  final Object firstServer =
      CacheServerTestUtil.getCache().getCacheServers().iterator().next();
  final CacheServerImpl server = (CacheServerImpl) firstServer;
  assertNotNull(server);
  return server;
}
/**
 * Closes the cache held by {@link CacheServerTestUtil}, passing {@code true} to
 * {@code Cache.close(boolean)}, and then disconnects its distributed system. Does nothing when
 * there is no cache or it is already closed.
 */
public static void closeCache() {
  final Cache current = CacheServerTestUtil.getCache();
  if (current == null || current.isClosed()) {
    return;
  }
  // NOTE(review): the boolean is presumably the durable-client keep-alive flag — confirm against
  // the Cache API.
  current.close(true);
  current.getDistributedSystem().disconnect();
}
/**
 * Returns the name of the region used by this test.
 *
 * @return the current region name
 */
public String getRegionName() {
  return this.regionName;
}
/**
 * Replaces the name of the region used by this test.
 *
 * @param regionName the new region name
 */
public void setRegionName(String regionName) {
  // The parameter shadows the field, hence the explicit qualification.
  this.regionName = regionName;
}
/**
 * Resets the "disable shuffling of endpoints" flag through {@link CacheServerTestUtil}; the flag
 * is set via {@code disableShufflingOfEndpoints()} in {@link #postSetUp()}.
 */
@Override
public final void preTearDown() throws Exception {
  CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
}
}