blob: 3c4b627b15f57048137cf1bc192a8636266fcc23 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertNotNull;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.logging.log4j.Logger;
import org.awaitility.Duration;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.cq.internal.CqQueryImpl;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PoolFactoryImpl;
import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
public class DurableClientTestBase extends JUnit4DistributedTestCase {
// Logger obtained from LogService; protected so subclasses can use it.
protected static final Logger logger = LogService.getLogger();
// Ten-minute duration; within this class it is only used to derive the constant below.
private static final Duration VERY_LONG_DURABLE_CLIENT_TIMEOUT = new Duration(10, MINUTES);
// The duration above converted from milliseconds to whole seconds.
static final int VERY_LONG_DURABLE_TIMEOUT_SECONDS =
(int) VERY_LONG_DURABLE_CLIENT_TIMEOUT.getValueInMS() / 1000;
// Multiplier applied to the explicit minute-based await timeouts used in this class.
static final int HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER = 10;
// DUnit VMs; postSetUp assigns them from VM.getVM(0) through VM.getVM(3), in this order.
VM server1VM;
VM server2VM;
VM durableClientVM;
VM publisherClientVM;
// Set in postSetUp to getName() + "_region".
protected String regionName;
// Port returned by CacheServerTestUtil.createCacheServer in startupDurableClientAndServer.
int server1Port;
// Set in startupDurableClientAndServer to getName() + "_client".
String durableClientId;
/**
 * Assigns the four DUnit VMs, derives the region name from the test name, registers the
 * "Could not find any server" ignored exception, prints a start banner and finally calls the
 * subclass hook {@link #postSetUpDurableClientTestBase()}.
 */
@Override
public final void postSetUp() throws Exception {
  server1VM = VM.getVM(0);
  server2VM = VM.getVM(1);
  durableClientVM = VM.getVM(2);
  publisherClientVM = VM.getVM(3);
  regionName = getName() + "_region";

  // Clients log this message when the servers disconnect
  IgnoredException.addIgnoredException("Could not find any server");

  final String banner = "\n\n[setup] START TEST " + getClass().getSimpleName() + "."
      + getTestMethodName() + "\n\n";
  System.out.println(banner);

  postSetUpDurableClientTestBase();
}
/**
 * Extension hook invoked as the last step of {@link #postSetUp()}. The default implementation
 * does nothing; subclasses may override it to add their own set-up.
 */
protected void postSetUpDurableClientTestBase() {}
/**
 * Runs the subclass hook {@link #preTearDownDurableClientTestBase()} and then closes the cache
 * in the durable client, publisher client, server 1 and server 2 VMs, in that order.
 */
@Override
public final void preTearDown() {
  preTearDownDurableClientTestBase();

  final VM[] closeOrder = {durableClientVM, publisherClientVM, server1VM, server2VM};
  for (VM member : closeOrder) {
    member.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  }
}
/**
 * Extension hook invoked as the first step of {@link #preTearDown()}, before any cache is
 * closed. The default implementation does nothing.
 */
protected void preTearDownDurableClientTestBase() {}
/**
 * Creates a cache server in server 1, starts the durable client against it and verifies that
 * server 1 holds a durable proxy for that client.
 *
 * @param durableClientTimeout durable client timeout configured on the client and expected on
 *        the server-side proxy
 */
void startupDurableClientAndServer(final int durableClientTimeout) {
  final int port =
      server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
  this.server1Port = port;
  this.durableClientId = getName() + "_client";

  startupDurableClient(durableClientTimeout, Boolean.TRUE);
  verifyDurableClientPresent(durableClientTimeout, this.durableClientId, this.server1VM);
}
/**
 * Restarts the durable client using the given pool by delegating to
 * {@code startupDurableClient(int, Pool, Boolean)}.
 *
 * This exists so child classes can override the behavior and mock out network failures.
 *
 * @param durableClientTimeout durable client timeout passed to the client's properties
 * @param clientPool pool attributes used to create the client cache
 * @param addControlListener forwarded unchanged to the client creation call
 */
public void restartDurableClient(int durableClientTimeout, Pool clientPool,
Boolean addControlListener) {
startupDurableClient(durableClientTimeout, clientPool, addControlListener);
}
/**
 * Restarts the durable client by delegating to {@code startupDurableClient(int, Boolean)},
 * which builds a pool pointing at server 1.
 *
 * This exists so child classes can override the behavior and mock out network failures.
 *
 * @param durableClientTimeout durable client timeout passed to the client's properties
 * @param addControlListener forwarded unchanged to the client creation call
 */
public void restartDurableClient(int durableClientTimeout, Boolean addControlListener) {
startupDurableClient(durableClientTimeout, addControlListener);
}
/**
 * Creates the durable client cache in the durable client VM, waits there until the cache
 * reference is non-null and then sends the client-ready message.
 *
 * @param durableClientTimeout durable client timeout written into the client's properties
 * @param clientPool pool attributes used to create the client cache
 * @param addControlListener forwarded unchanged to {@code createCacheClient}
 */
void startupDurableClient(int durableClientTimeout, Pool clientPool,
    Boolean addControlListener) {
  // The properties are built inside the lambda, i.e. in the durable client VM.
  durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(clientPool, regionName,
      getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
      addControlListener));

  // Poll every 100 ms, for at most HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER minutes.
  durableClientVM.invoke(() -> {
    await()
        .atMost(HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES)
        .pollInterval(100, MILLISECONDS)
        .until(CacheServerTestUtil::getCache, notNullValue());
  });

  // Send clientReady message
  sendClientReady(durableClientVM);
}
/**
 * Starts the durable client with a subscription-enabled pool that points at server 1 on the
 * host returned by {@code NetworkUtils.getServerHostName()}.
 *
 * @param durableClientTimeout durable client timeout written into the client's properties
 * @param addControlListener forwarded unchanged to the client creation call
 */
private void startupDurableClient(int durableClientTimeout, Boolean addControlListener) {
  final String serverHost = NetworkUtils.getServerHostName();
  final Pool singleServerPool = getClientPool(serverHost, server1Port, true);
  startupDurableClient(durableClientTimeout, singleServerPool, addControlListener);
}
/**
 * Checks, using the default durable client timeout, that zero client proxies are present.
 *
 * NOTE(review): the VM passed as the "server" VM is durableClientVM, so the proxy count is
 * evaluated in that VM rather than in server1VM - confirm this is intended.
 */
void verifySimpleDurableClient() {
verifyDurableClientNotPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT,
durableClientId, durableClientVM);
}
/**
 * Verifies that the given VM holds exactly one client proxy and that it is durable with the
 * given id and timeout. Delegates to {@code verifyDurableClientPresence} with a count of 1.
 *
 * @param durableClientTimeout expected durable timeout of the proxy
 * @param durableClientId expected durable id of the proxy
 * @param serverVM VM in which the check is executed
 */
void verifyDurableClientPresent(int durableClientTimeout, String durableClientId,
final VM serverVM) {
verifyDurableClientPresence(durableClientTimeout, durableClientId, serverVM, 1);
}
/**
 * Verifies that the given VM holds no client proxies. Delegates to
 * {@code verifyDurableClientPresence} with a count of 0, in which case the id and timeout
 * arguments are not compared against anything.
 *
 * @param durableClientTimeout forwarded, unused when the expected count is 0
 * @param durableClientId forwarded, unused when the expected count is 0
 * @param serverVM VM in which the check is executed
 */
void verifyDurableClientNotPresent(int durableClientTimeout, String durableClientId,
final VM serverVM) {
verifyDurableClientPresence(durableClientTimeout, durableClientId, serverVM, 0);
}
/**
 * Waits, inside {@code serverVM}, until the number of client proxies equals {@code count}. When
 * {@code count} is positive it additionally asserts that the first proxy is durable and carries
 * the expected durable id and timeout. Proxy liveness is not checked.
 *
 * @param durableClientTimeout expected durable timeout of the proxy
 * @param durableClientId expected durable id of the proxy
 * @param serverVM VM in which the check is executed
 * @param count expected number of client proxies
 */
void verifyDurableClientPresence(int durableClientTimeout, String durableClientId,
    VM serverVM, final int count) {
  serverVM.invoke(() -> {
    checkNumberOfClientProxies(count);
    if (count <= 0) {
      // Nothing to inspect when no proxy is expected.
      return;
    }

    final CacheClientProxy clientProxy = getClientProxy();
    assertThat(clientProxy).isNotNull();

    // Verify that it is durable and its properties are correct
    assertThat(clientProxy.isDurable()).isTrue();
    assertThat(durableClientId).isEqualTo(clientProxy.getDurableId());
    assertThat(durableClientTimeout).isEqualTo(clientProxy.getDurableTimeout());
  });
}
/**
 * Closes the cache in the durable client VM via the no-argument
 * {@code CacheServerTestUtil.closeCache}.
 */
public void closeDurableClient() {
this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Logs the proxy state on server 1, closes the durable client's cache with the given
 * keep-alive flag, waits until {@code CacheServerTestUtil.getCache()} is null and logs the proxy
 * state again.
 *
 * NOTE(review): the await below executes in the VM that calls this method, not inside
 * durableClientVM.invoke - confirm that this is the intended place to observe the cache
 * reference.
 *
 * @param keepAlive passed to {@code CacheServerTestUtil.closeCache(boolean)}
 */
public void disconnectDurableClient(boolean keepAlive) {
printClientProxyState("Before");
this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(keepAlive));
await()
.until(CacheServerTestUtil::getCache, nullValue());
printClientProxyState("after");
}
/**
 * Logs, in server 1, the state of every client proxy and the connected-clients map reported by
 * the ClientHealthMonitor. Both log lines are prefixed with {@code st}.
 *
 * @param st label prepended to the two log messages
 */
private void printClientProxyState(String st) {
  server1VM.invoke(new CacheSerializableRunnable("Logging CCCP and ServerConnection state") {
    @Override
    public void run2() throws CacheException {
      // First line: state of each client proxy.
      CacheServerTestUtil.getCache().getLogger()
          .info(st + " CCP states: " + getAllClientProxyState());

      // Second line: connected clients as seen by the health monitor.
      CacheServerTestUtil.getCache().getLogger()
          .info(st + " CHM states: "
              + printMap(ClientHealthMonitor.getInstance().getConnectedClients(null)));
    }
  });
}
/**
 * Renders a map as a single line of the form {@code size = N {key, {[v1, v2]}}{key2, ...}}.
 *
 * @param m map to render; must not be null
 * @return the rendered text
 */
private static String printMap(Map<String, Object[]> m) {
  // StringBuilder is sufficient: the buffer never leaves this thread.
  StringBuilder sb = new StringBuilder();
  sb.append("size = ").append(m.size()).append(" ");
  for (Map.Entry<String, Object[]> entry : m.entrySet()) {
    sb.append("{").append(entry.getKey()).append(", ");
    printMapValue(entry.getValue(), sb);
    sb.append("}");
  }
  return sb.toString();
}

/**
 * Appends one map value to {@code sb}. Object arrays are expanded with
 * {@link Arrays#toString(Object[])} and wrapped in braces; anything else, including
 * {@code null} and primitive arrays, is appended through its string conversion instead of
 * failing with a NullPointerException or ClassCastException.
 *
 * @param value value to append, may be null
 * @param sb destination builder
 */
private static void printMapValue(Object value, StringBuilder sb) {
  if (value instanceof Object[]) {
    sb.append("{").append(Arrays.toString((Object[]) value)).append("}");
  } else {
    sb.append(value);
  }
}
/**
 * Waits until the first client proxy of this VM's cache server reports that it is paused, then
 * asserts the paused state once more. Fails if no proxy exists.
 */
static void waitForCacheClientProxyPaused() {
  final CacheClientProxy clientProxy = getClientProxy();
  assertThat(clientProxy).isNotNull();

  await().until(clientProxy::isPaused);

  assertThat(clientProxy.isPaused()).isTrue();
}
/**
 * Pushes one extra event through the client's HA queue so that pending removals are flushed.
 *
 * Due to the way removal from the HA region queue is implemented, a dummy interest has to be
 * registered and a dummy value put, so that none of the real CQs is triggered and yet an event
 * flushes the queue.
 *
 * @param server VM in which the dummy entry ("LAST" -> "ENTRY") is put
 * @param client VM in which the non-durable interest in all keys is registered
 * @param regionName name of the region used for both steps
 */
void flushEntries(VM server, VM client, final String regionName) {
  // Fixed pause so that outstanding acks have been answered. A stat on the cache client proxy
  // that exposes ack counts could replace this wait later on.
  try {
    Thread.sleep(2000);
  } catch (InterruptedException interrupted) {
    // Keep the interrupt visible to callers and carry on.
    Thread.currentThread().interrupt();
  }

  registerInterest(client, regionName, false, InterestResultPolicy.NONE);

  server.invoke(new CacheSerializableRunnable("flush entries") {
    @Override
    public void run2() throws CacheException {
      final Region<String, String> flushRegion =
          CacheServerTestUtil.getCache().getRegion(regionName);
      assertNotNull(flushRegion);
      flushRegion.put("LAST", "ENTRY");
    }
  });
}
/**
 * Test hook that coordinates a reconnecting client with a drain in progress. Intended sequence,
 * as described by the original author:
 * <ol>
 * <li>the client waits (up to 60 seconds) before trying to reconnect;</li>
 * <li>the drain reaches its lock check, releases the client and waits (up to 120 seconds);</li>
 * <li>the client continues and is rejected, which releases the drain;</li>
 * <li>the drain proceeds and releases all locks;</li>
 * <li>the client then reconnects.</li>
 * </ol>
 */
public class RejectClientReconnectTestHook implements CacheClientProxy.TestHook {
  final CountDownLatch reconnectLatch = new CountDownLatch(1);
  final CountDownLatch continueDrain = new CountDownLatch(1);
  volatile boolean clientWasRejected = false;

  /**
   * Reacts to the named hook point; unknown spots are ignored.
   *
   * @param spot name of the hook point reached by the code under test
   */
  @Override
  public void doTestHook(String spot) {
    try {
      if (spot.equals("CLIENT_PRE_RECONNECT")) {
        final boolean released = reconnectLatch.await(60, SECONDS);
        if (!released) {
          fail("reconnect latch was never released.");
        }
      } else if (spot.equals("DRAIN_IN_PROGRESS_BEFORE_DRAIN_LOCK_CHECK")) {
        // let client try to reconnect
        reconnectLatch.countDown();
        // we wait until the client is rejected
        final boolean rejected = continueDrain.await(120, SECONDS);
        if (!rejected) {
          fail("Latch was never released.");
        }
      } else if (spot.equals("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED")) {
        clientWasRejected = true;
        continueDrain.countDown();
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
      Thread.currentThread().interrupt();
    }
  }

  /**
   * @return true once the CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED hook point has been reached
   */
  boolean wasClientRejected() {
    return clientWasRejected;
  }
}
/**
 * Test hook intended to make closing a CQ fail because a client is in the middle of its
 * activation sequence. Behaviour per hook point:
 * <ul>
 * <li>PRE_DRAIN_IN_PROGRESS: releases {@code unblockClient}, then waits up to 120 seconds for
 * {@code unblockDrain};</li>
 * <li>PRE_RELEASE_DRAIN_LOCK: releases {@code unblockDrain}, then waits up to 30 seconds for
 * {@code finish};</li>
 * <li>DRAIN_COMPLETE: releases {@code finish}.</li>
 * </ul>
 */
public class CqExceptionDueToActivatingClientTestHook implements CacheClientProxy.TestHook {
  final CountDownLatch unblockDrain = new CountDownLatch(1);
  final CountDownLatch unblockClient = new CountDownLatch(1);
  final CountDownLatch finish = new CountDownLatch(1);

  /**
   * Dispatches on the hook point name; unknown spots are ignored.
   *
   * @param spot name of the hook point reached by the code under test
   */
  @Override
  public void doTestHook(String spot) {
    switch (spot) {
      case "PRE_DRAIN_IN_PROGRESS":
        releaseClientAndAwaitReconnect();
        break;
      case "PRE_RELEASE_DRAIN_LOCK":
        releaseDrainAndAwaitFinish();
        break;
      case "DRAIN_COMPLETE":
        finish.countDown();
        break;
      default:
        break;
    }
  }

  // Unblocks a client waiting to reconnect, then waits until that client is reconnecting.
  private void releaseClientAndAwaitReconnect() {
    try {
      unblockClient.countDown();
      assertThat(unblockDrain.await(120, SECONDS))
          .describedAs("client never got far enough reconnected to unlatch lock.").isTrue();
    } catch (InterruptedException e) {
      e.printStackTrace();
      Thread.currentThread().interrupt();
    }
  }

  // The client is reconnecting but still holds the drain lock: let the test go on to close a
  // cq, then wait until the server has finished attempting that close.
  private void releaseDrainAndAwaitFinish() {
    unblockDrain.countDown();
    try {
      assertThat(finish.await(30, SECONDS))
          .describedAs("Test did not complete, server never finished attempting to close cq")
          .isTrue();
    } catch (InterruptedException e) {
      e.printStackTrace();
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Creates (but does not execute) a CQ in this VM's cache with a single
 * {@code ControlCqListener} attached.
 *
 * @param cqName name of the new CQ
 * @param cqQuery query string of the CQ
 * @param durable whether the CQ is registered as durable
 * @return the newly created CQ
 * @throws CqException propagated from the query service
 * @throws CqExistsException propagated from the query service
 */
private CqQuery createCq(String cqName, String cqQuery, boolean durable)
    throws CqException, CqExistsException {
  final QueryService queryService = CacheServerTestUtil.getCache().getQueryService();

  final CqAttributesFactory attributesFactory = new CqAttributesFactory();
  attributesFactory
      .initCqListeners(new CqListener[] {new CacheServerTestUtil.ControlCqListener()});
  final CqAttributes attributes = attributesFactory.create();

  return queryService.newCq(cqName, cqQuery, attributes, durable);
}
/**
 * Builds pool attributes for a single server with a subscription ack interval of 1.
 *
 * @param host server host name
 * @param serverPort server port
 * @param establishCallbackConnection whether subscriptions are enabled on the pool
 * @return the pool attributes held by the factory (no pool is created)
 */
Pool getClientPool(String host, int serverPort, boolean establishCallbackConnection) {
  final PoolFactory factory = PoolManager.createFactory();
  // The factory setters are fluent and return the factory itself, so they can be called
  // one by one.
  factory.addServer(host, serverPort);
  factory.setSubscriptionEnabled(establishCallbackConnection);
  factory.setSubscriptionAckInterval(1);
  return ((PoolFactoryImpl) factory).getPoolAttributes();
}
/**
 * Builds pool attributes for two servers on the same host with a subscription redundancy level
 * of 1, by delegating to the five-argument overload.
 *
 * @param host host name shared by both servers
 * @param server1Port port of the first server
 * @param server2Port port of the second server
 * @param establishCallbackConnection whether subscriptions are enabled on the pool
 * @return the pool attributes
 */
Pool getClientPool(String host, int server1Port, int server2Port,
boolean establishCallbackConnection) {
return getClientPool(host, server1Port, server2Port, establishCallbackConnection, 1);
}
/**
 * Builds client distributed-system properties for the given durable id using
 * {@code DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT} as the timeout.
 *
 * @param durableClientId durable client id to configure
 * @return the client properties
 */
Properties getClientDistributedSystemProperties(String durableClientId) {
return getClientDistributedSystemProperties(durableClientId,
DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
}
/**
 * Builds client distributed-system properties: multicast port "0", an empty locator list and
 * the given durable client id and timeout.
 *
 * @param durableClientId durable client id to configure
 * @param durableClientTimeout durable client timeout, stored as its decimal string
 * @return a new, independent {@link Properties} instance
 */
Properties getClientDistributedSystemProperties(String durableClientId,
    int durableClientTimeout) {
  final String timeoutText = String.valueOf(durableClientTimeout);

  final Properties clientProps = new Properties();
  clientProps.setProperty(MCAST_PORT, "0");
  clientProps.setProperty(LOCATORS, "");
  clientProps.setProperty(DURABLE_CLIENT_ID, durableClientId);
  clientProps.setProperty(DURABLE_CLIENT_TIMEOUT, timeoutText);
  return clientProps;
}
/**
 * Returns the first client proxy known to this VM's cache client notifier.
 *
 * @return the first proxy in iteration order, or {@code null} when there are no proxies
 */
static CacheClientProxy getClientProxy() {
  final Iterator<CacheClientProxy> proxies =
      getBridgeServer().getAcceptor().getCacheClientNotifier().getClientProxies().iterator();
  return proxies.hasNext() ? proxies.next() : null;
}
/**
 * Concatenates the state of every client proxy known to this VM's cache client notifier, each
 * rendered as {@code " [<state> ]"}.
 *
 * @return the concatenated states, or an empty string when there are no proxies
 */
private static String getAllClientProxyState() {
  final CacheClientNotifier notifier = getBridgeServer().getAcceptor().getCacheClientNotifier();
  final StringBuilder states = new StringBuilder();
  for (CacheClientProxy clientProxy : notifier.getClientProxies()) {
    states.append(" [").append(clientProxy.getState()).append(" ]");
  }
  return states.toString();
}
/**
 * Waits until the number of client proxies in this VM equals {@code expected}.
 *
 * @param expected number of proxies to wait for
 */
static void checkNumberOfClientProxies(final int expected) {
  await().until(() -> getNumberOfClientProxies() == expected);
}
/**
 * Waits until the given proxy reports that it is alive.
 *
 * @param proxy proxy to poll; must not be null
 */
static void checkProxyIsAlive(final CacheClientProxy proxy) {
await()
.until(proxy::isAlive);
}
/**
 * @return the number of client proxies currently held by this VM's cache client notifier
 */
private static int getNumberOfClientProxies() {
  final CacheClientNotifier notifier = getBridgeServer().getAcceptor().getCacheClientNotifier();
  return notifier.getClientProxies().size();
}
/**
 * Returns the first cache server of this VM's cache.
 *
 * @return the first cache server in iteration order, cast to {@code CacheServerImpl}
 */
static CacheServerImpl getBridgeServer() {
  final CacheServerImpl firstServer =
      (CacheServerImpl) CacheServerTestUtil.getCache().getCacheServers().iterator().next();
  assertThat(firstServer).isNotNull();
  return firstServer;
}
/**
 * Builds pool attributes for two servers on the same host with the given subscription
 * redundancy and a subscription ack interval of 1.
 *
 * @param host host name shared by both servers
 * @param server1Port port of the first server
 * @param server2Port port of the second server
 * @param establishCallbackConnection whether subscriptions are enabled on the pool
 * @param redundancyLevel subscription redundancy level
 * @return the pool attributes held by the factory (no pool is created)
 */
Pool getClientPool(String host, int server1Port, int server2Port,
    boolean establishCallbackConnection, int redundancyLevel) {
  final PoolFactory factory = PoolManager.createFactory();
  // The factory setters are fluent and return the factory itself, so they can be called
  // one by one.
  factory.addServer(host, server1Port);
  factory.addServer(host, server2Port);
  factory.setSubscriptionEnabled(establishCallbackConnection);
  factory.setSubscriptionRedundancy(redundancyLevel);
  factory.setSubscriptionAckInterval(1);
  return ((PoolFactoryImpl) factory).getPoolAttributes();
}
/**
 * Waits until exactly one client proxy exists in this VM and returns the name of that proxy's
 * HARegionQueue region. This method is accessed via reflection on a server VM.
 *
 * @return the durable client proxy's HARegionQueue region name
 */
static String getHARegionQueueName() {
  checkNumberOfClientProxies(1);

  final CacheClientProxy durableProxy = getClientProxy();
  assertThat(durableProxy).isNotNull();

  return durableProxy.getHARegionName();
}
/**
 * Polls every 200 ms, for at most 3 * HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER minutes, until
 * {@code HARegionQueue.isTestMarkerMessageReceived()} returns true.
 */
static void verifyReceivedMarkerAck() {
  final int timeoutMinutes = 3 * HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER;
  await()
      .atMost(timeoutMinutes, MINUTES)
      .pollInterval(200, MILLISECONDS)
      .until(HARegionQueue::isTestMarkerMessageReceived);
}
/**
 * Forwards the flag to {@code HARegionQueue.setUsedByTest} in the VM where this is called.
 *
 * @param flag value passed through unchanged
 */
static void setTestFlagToVerifyActForMarker(Boolean flag) {
HARegionQueue.setUsedByTest(flag);
}
/**
 * Calls {@code readyForEvents()} on the client cache of the given VM.
 *
 * @param vm VM hosting the client cache
 */
void sendClientReady(VM vm) {
// Send clientReady message
vm.invoke(new CacheSerializableRunnable("Send clientReady") {
@Override
public void run2() throws CacheException {
CacheServerTestUtil.getClientCache().readyForEvents();
}
});
}
/**
 * Registers interest in all keys (regex {@code ".*"}) of the named region inside the given VM
 * and then pauses for five seconds.
 *
 * @param vm VM in which interest is registered
 * @param regionName name of the region; the region must exist in that VM's cache
 * @param durable whether the interest registration is durable
 * @param interestResultPolicy result policy used for the registration
 */
protected void registerInterest(VM vm, final String regionName, final boolean durable,
    final InterestResultPolicy interestResultPolicy) {
  vm.invoke(new CacheSerializableRunnable("Register interest on region : " + regionName) {
    @Override
    public void run2() throws CacheException {
      Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName);
      assertThat(region).isNotNull();
      // Register interest in all keys
      region.registerInterestRegex(".*", interestResultPolicy, durable);
    }
  });

  // This seems to be necessary for the queue to start up. Ideally should be replaced with
  // Awaitility if possible.
  try {
    Thread.sleep(5000);
  } catch (InterruptedException ex) {
    // Restore the interrupt flag before failing, as the other helpers in this class do, and
    // keep the cause attached to the failure.
    Thread.currentThread().interrupt();
    fail("interrupted", ex);
  }
}
/**
 * Creates and executes a CQ inside the given VM. CQ-related checked exceptions are rethrown
 * wrapped in an anonymous {@code CacheException}.
 *
 * @param vm VM in which the CQ is registered
 * @param cqName name of the CQ
 * @param cqQuery query string of the CQ
 * @param durable whether the CQ is durable
 */
void createCq(VM vm, final String cqName, final String cqQuery, final boolean durable) {
  vm.invoke(new CacheSerializableRunnable("Register cq " + cqName) {
    @Override
    public void run2() throws CacheException {
      try {
        final CqQuery registeredCq = createCq(cqName, cqQuery, durable);
        registeredCq.execute();
      } catch (CqExistsException | CqException | RegionNotFoundException cause) {
        throw new CacheException(cause) {};
      }
    }
  });
}
/**
 * Publishes string entries from the publisher client VM into the test region. Keys and values
 * are both the decimal form of each number in
 * {@code [startingValue, startingValue + count)}.
 *
 * @param startingValue first number to publish
 * @param count how many consecutive numbers to publish
 */
void publishEntries(int startingValue, final int count) {
  publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
    @Override
    public void run2() throws CacheException {
      final Region<String, String> target =
          CacheServerTestUtil.getCache().getRegion(regionName);
      assertThat(target).isNotNull();

      // Publish some entries
      final int endExclusive = startingValue + count;
      for (int number = startingValue; number < endExclusive; number++) {
        final String text = String.valueOf(number);
        target.put(text, text);
      }

      // The first published key must be readable afterwards.
      assertThat(target.get(String.valueOf(startingValue))).isNotNull();
    }
  });
}
/**
 * Publishes {@code Portfolio} entries from the publisher client VM. Keys are the decimal
 * strings "0" to {@code numEntries - 1}; each value is {@code new Portfolio(i)}.
 *
 * @param regionName name of the region to publish into
 * @param numEntries number of entries to publish
 */
void publishEntries(final String regionName, final int numEntries) {
  final String description = "publish " + numEntries + " entries";
  publisherClientVM.invoke(new CacheSerializableRunnable(description) {
    @Override
    public void run2() throws CacheException {
      final Region<Object, Object> target =
          CacheServerTestUtil.getCache().getRegion(regionName);
      assertThat(target).isNotNull();

      // Publish some entries
      int id = 0;
      while (id < numEntries) {
        target.put(String.valueOf(id), new Portfolio(id));
        id++;
      }

      // Key "0" must be readable afterwards.
      assertThat(target.get(String.valueOf(0))).isNotNull();
    }
  });
}
/**
 * Intentionally does nothing in this base class; the argument is ignored.
 *
 * @param numberOfEntries unused here
 */
public void verifyListenerUpdatesDisconnected(int numberOfEntries) {
// ARB: do nothing.
}
/**
 * Inside the given server VM, waits until the named CQ of the durable client reports
 * {@code expectedNumber} HA-queued events and then asserts that value once more.
 *
 * @param server VM in which the statistic is read
 * @param durableClientId durable id used to look up the client proxy
 * @param cqName name of the CQ whose statistic is checked
 * @param expectedNumber expected number of HA-queued events
 */
void checkCqStatOnServer(VM server, final String durableClientId, final String cqName,
    final int expectedNumber) {
  final String description =
      "Check ha queued cq stats for durable client " + durableClientId + " cq: " + cqName;
  server.invoke(new CacheSerializableRunnable(description) {
    @Override
    public void run2() throws CacheException {
      final ClientProxyMembershipID proxyId =
          CacheClientNotifier.getInstance().getClientProxy(durableClientId).getProxyID();

      final CqService cqService =
          ((InternalCache) CacheServerTestUtil.getCache()).getCqService();
      cqService.start();
      final CqQueryImpl serverCq =
          (CqQueryImpl) cqService.getClientCqFromServer(proxyId, cqName);

      // Wait (with the default await timeout) until the expected number of events is queued
      await().until(() -> serverCq.getVsdStats().getNumHAQueuedEvents() == expectedNumber);

      assertThat(expectedNumber).isEqualTo(serverCq.getVsdStats().getNumHAQueuedEvents());
    }
  });
}
/**
 * Inside the given server VM, waits until the durable client's queue-size statistic equals
 * either {@code expectedNumber} or {@code remaining}, then asserts the same condition again.
 *
 * @param server VM in which the statistic is read
 * @param durableClientId durable id used to look up the client proxy
 * @param expectedNumber expected queue size
 * @param remaining alternative accepted size: the number of events that could still be in the
 *        queue due to timing issues with acks arriving after removal from the HA queue region
 *        has been called
 */
void checkHAQueueSize(VM server, final String durableClientId, final int expectedNumber,
    final int remaining) {
  final String description = "Check ha queued size for durable client " + durableClientId;
  server.invoke(new CacheSerializableRunnable(description) {
    @Override
    public void run2() throws CacheException {
      final CacheClientProxy proxy =
          CacheClientNotifier.getInstance().getClientProxy(durableClientId);

      // Wait (with the default await timeout) until one of the accepted sizes is reported
      await().until(() -> hasAcceptedQueueSize(proxy));

      assertThat(hasAcceptedQueueSize(proxy)).isTrue();
    }

    // True when the statistic matches either accepted value; the statistic is read afresh for
    // each comparison.
    private boolean hasAcceptedQueueSize(CacheClientProxy proxy) {
      return proxy.getQueueSizeStat() == expectedNumber
          || proxy.getQueueSizeStat() == remaining;
    }
  });
}
/**
 * Inside the given server VM, asserts that the durable client has exactly
 * {@code expectedNumber} durable CQs registered. Any {@code Exception} raised while looking
 * them up is rethrown wrapped in an anonymous {@code CacheException}.
 *
 * @param server VM in which the CQ service is queried
 * @param durableClientId durable id used to look up the client proxy
 * @param expectedNumber expected number of durable CQs
 */
void checkNumDurableCqs(VM server, final String durableClientId,
    final int expectedNumber) {
  final String description =
      "check number of durable cqs on server for durable client: " + durableClientId;
  server.invoke(new CacheSerializableRunnable(description) {
    @Override
    public void run2() throws CacheException {
      try {
        final ClientProxyMembershipID proxyId =
            CacheClientNotifier.getInstance().getClientProxy(durableClientId).getProxyID();

        final CqService cqService =
            ((InternalCache) CacheServerTestUtil.getCache()).getCqService();
        cqService.start();

        final List<String> durableCqNames = cqService.getAllDurableClientCqs(proxyId);
        assertThat(expectedNumber).isEqualTo(durableCqNames.size());
      } catch (Exception cause) {
        throw new CacheException(cause) {};
      }
    }
  });
}
/**
 * Inside the given VM, lets the CQ's {@code ControlCqListener} wait for events and then asserts
 * that it holds exactly {@code numEvents} events.
 *
 * @param vm VM in which the CQ is registered
 * @param cqName name of the CQ whose listener is inspected
 * @param numEvents number of events passed to the listener's wait and expected afterwards
 * @param secondsToWait wait time in seconds; converted to milliseconds for the listener
 */
void checkCqListenerEvents(VM vm, final String cqName, final int numEvents,
    final int secondsToWait) {
  vm.invoke(() -> {
    final CqQuery registeredCq =
        CacheServerTestUtil.getCache().getQueryService().getCq(cqName);

    // Get the listener and wait for the appropriate number of events
    final CacheServerTestUtil.ControlCqListener controlListener =
        (CacheServerTestUtil.ControlCqListener) registeredCq.getCqAttributes().getCqListener();
    final int waitMillis = secondsToWait * 1000;
    controlListener.waitWhileNotEnoughEvents(waitMillis, numEvents);

    assertThat(numEvents).isEqualTo(controlListener.events.size());
  });
}
/**
 * Inside the given VM, lets the first cache listener of the test region (a
 * {@code ControlListener}) wait for events of the given type.
 *
 * @param numberOfEntries number of events passed to the listener's wait
 * @param sleepMinutes wait time in minutes; converted to milliseconds for the listener
 * @param eventType event type passed to {@code ControlListener.getEvents}
 * @param vm VM in which the listener lives
 */
void checkListenerEvents(int numberOfEntries, final int sleepMinutes, final int eventType,
    final VM vm) {
  vm.invoke(() -> {
    // Get the region
    final Region<Object, Object> testRegion =
        CacheServerTestUtil.getCache().getRegion(regionName);
    assertThat(testRegion).isNotNull();

    // Get the listener and wait for the appropriate number of events
    final CacheServerTestUtil.ControlListener listener =
        (CacheServerTestUtil.ControlListener) testRegion.getAttributes()
            .getCacheListeners()[0];
    final int waitMillis = sleepMinutes * 60 * 1000;
    listener.waitWhileNotEnoughEvents(waitMillis, numberOfEntries,
        listener.getEvents(eventType));
  });
}
/**
 * Creates a durable client cache in the given VM, connected to a single server with
 * subscriptions enabled and using an explicit durable timeout. The pool and the properties are
 * built inside the lambda, i.e. in the target VM.
 *
 * @param vm VM in which the client cache is created
 * @param durableClientId durable client id to configure
 * @param serverPort1 port of the server to connect to
 * @param regionName name of the region passed to the client creation call
 * @param durableTimeoutInSeconds durable client timeout written into the client's properties
 */
void startDurableClient(VM vm, String durableClientId, int serverPort1,
String regionName, int durableTimeoutInSeconds) {
vm.invoke(() -> CacheServerTestUtil.createCacheClient(
getClientPool(NetworkUtils.getServerHostName(), serverPort1, true),
regionName, getClientDistributedSystemProperties(durableClientId, durableTimeoutInSeconds),
Boolean.TRUE));
}
/**
 * Creates a durable client cache in the given VM, connected to a single server with
 * subscriptions enabled and using the default durable timeout, then asserts that the client
 * cache reference is non-null.
 *
 * @param vm VM in which the client cache is created
 * @param durableClientId durable client id to configure
 * @param serverPort1 port of the server to connect to
 * @param regionName name of the region passed to the client creation call
 */
void startDurableClient(VM vm, String durableClientId, int serverPort1,
String regionName) {
vm.invoke(() -> {
CacheServerTestUtil.createCacheClient(
getClientPool(NetworkUtils.getServerHostName(), serverPort1, true),
regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE);
assertThat(CacheServerTestUtil.getClientCache()).isNotNull();
});
}
/**
 * Creates a durable client cache in the given VM, connected to two servers on the same host
 * with subscriptions enabled and using the default durable timeout.
 *
 * @param vm VM in which the client cache is created
 * @param durableClientId durable client id to configure
 * @param serverPort1 port of the first server
 * @param serverPort2 port of the second server
 * @param regionName name of the region passed to the client creation call
 */
void startDurableClient(VM vm, String durableClientId, int serverPort1, int serverPort2,
String regionName) {
vm.invoke(() -> CacheServerTestUtil.createCacheClient(
getClientPool(NetworkUtils.getServerHostName(), serverPort1, serverPort2, true),
regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
}
/**
 * Creates a client cache in the given VM using a single-server pool with subscriptions
 * disabled and no durable properties, then asserts that the client cache reference is
 * non-null.
 *
 * @param vm VM in which the client cache is created
 * @param serverPort1 port of the server to connect to
 * @param regionName name of the region passed to the client creation call
 */
void startClient(VM vm, int serverPort1, String regionName) {
vm.invoke(() -> {
CacheServerTestUtil.createCacheClient(
getClientPool(NetworkUtils.getServerHostName(), serverPort1, false),
regionName);
assertThat(CacheServerTestUtil.getClientCache()).isNotNull();
});
}
/**
 * Inside the given VM, waits until the pool returned by {@code CacheServerTestUtil.getPool()}
 * reports that its primary updater is alive, then asserts that state once more.
 *
 * @param vm VM whose pool is checked
 */
void checkPrimaryUpdater(VM vm) {
vm.invoke(new CacheSerializableRunnable("Verify durable client") {
@Override
public void run2() throws CacheException {
await()
.until(() -> CacheServerTestUtil.getPool().isPrimaryUpdaterAlive());
assertThat(CacheServerTestUtil.getPool().isPrimaryUpdaterAlive()).isTrue();
}
});
}
/**
 * Closes the cache in the given VM via the no-argument
 * {@code CacheServerTestUtil.closeCache}.
 *
 * @param vm VM whose cache is closed
 */
protected void closeCache(VM vm) {
vm.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
}