| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.concurrent.TimeUnit.MINUTES; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID; |
| import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.fail; |
| import static org.hamcrest.Matchers.notNullValue; |
| import static org.hamcrest.Matchers.nullValue; |
| import static org.junit.Assert.assertNotNull; |
| |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.CountDownLatch; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.awaitility.Duration; |
| |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.InterestResultPolicy; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.PoolFactory; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.query.CqAttributes; |
| import org.apache.geode.cache.query.CqAttributesFactory; |
| import org.apache.geode.cache.query.CqException; |
| import org.apache.geode.cache.query.CqExistsException; |
| import org.apache.geode.cache.query.CqListener; |
| import org.apache.geode.cache.query.CqQuery; |
| import org.apache.geode.cache.query.QueryService; |
| import org.apache.geode.cache.query.RegionNotFoundException; |
| import org.apache.geode.cache.query.cq.internal.CqQueryImpl; |
| import org.apache.geode.cache.query.data.Portfolio; |
| import org.apache.geode.cache.query.internal.cq.CqService; |
| import org.apache.geode.cache30.CacheSerializableRunnable; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.internal.cache.CacheServerImpl; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.PoolFactoryImpl; |
| import org.apache.geode.internal.cache.ha.HARegionQueue; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.SerializableRunnableIF; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; |
| |
| public class DurableClientTestBase extends JUnit4DistributedTestCase { |
| |
| protected static final Logger logger = LogService.getLogger(); |
| private static final Duration VERY_LONG_DURABLE_CLIENT_TIMEOUT = new Duration(10, MINUTES); |
| static final int VERY_LONG_DURABLE_TIMEOUT_SECONDS = |
| (int) VERY_LONG_DURABLE_CLIENT_TIMEOUT.getValueInMS() / 1000; |
| static final int HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER = 10; |
| |
| VM server1VM; |
| VM server2VM; |
| VM durableClientVM; |
| VM publisherClientVM; |
| protected String regionName; |
| int server1Port; |
| String durableClientId; |
| |
| |
| @Override |
| public final void postSetUp() throws Exception { |
| this.server1VM = VM.getVM(0); |
| this.server2VM = VM.getVM(1); |
| this.durableClientVM = VM.getVM(2); |
| this.publisherClientVM = VM.getVM(3); |
| this.regionName = getName() + "_region"; |
| // Clients see this when the servers disconnect |
| IgnoredException.addIgnoredException("Could not find any server"); |
| System.out.println("\n\n[setup] START TEST " + getClass().getSimpleName() + "." |
| + getTestMethodName() + "\n\n"); |
| postSetUpDurableClientTestBase(); |
| } |
| |
| protected void postSetUpDurableClientTestBase() {} |
| |
| @Override |
| public final void preTearDown() { |
| preTearDownDurableClientTestBase(); |
| |
| this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| this.server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } |
| |
| protected void preTearDownDurableClientTestBase() {} |
| |
| |
| void startupDurableClientAndServer(final int durableClientTimeout) { |
| |
| server1Port = this.server1VM |
| .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); |
| |
| durableClientId = getName() + "_client"; |
| startupDurableClient(durableClientTimeout, Boolean.TRUE); |
| verifyDurableClientPresent(durableClientTimeout, durableClientId, server1VM); |
| |
| } |
| |
| // This exists so child classes can override the behavior and mock out network failures |
| public void restartDurableClient(int durableClientTimeout, Pool clientPool, |
| Boolean addControlListener) { |
| startupDurableClient(durableClientTimeout, clientPool, addControlListener); |
| } |
| |
| // This exists so child classes can override the behavior and mock out network failures |
| public void restartDurableClient(int durableClientTimeout, Boolean addControlListener) { |
| startupDurableClient(durableClientTimeout, addControlListener); |
| } |
| |
| |
| void startupDurableClient(int durableClientTimeout, Pool clientPool, |
| Boolean addControlListener) { |
| this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient( |
| clientPool, |
| regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout), |
| addControlListener)); |
| |
| this.durableClientVM.invoke(() -> { |
| await().atMost(1 * HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES) |
| .pollInterval(100, MILLISECONDS) |
| .until(CacheServerTestUtil::getCache, notNullValue()); |
| }); |
| |
| // Send clientReady message |
| sendClientReady(durableClientVM); |
| } |
| |
| private void startupDurableClient(int durableClientTimeout, Boolean addControlListener) { |
| startupDurableClient(durableClientTimeout, |
| getClientPool(NetworkUtils.getServerHostName(), server1Port, true), addControlListener); |
| } |
| |
| void verifySimpleDurableClient() { |
| verifyDurableClientNotPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, |
| durableClientId, durableClientVM); |
| } |
| |
| void verifyDurableClientPresent(int durableClientTimeout, String durableClientId, |
| final VM serverVM) { |
| verifyDurableClientPresence(durableClientTimeout, durableClientId, serverVM, 1); |
| } |
| |
| void verifyDurableClientNotPresent(int durableClientTimeout, String durableClientId, |
| final VM serverVM) { |
| verifyDurableClientPresence(durableClientTimeout, durableClientId, serverVM, 0); |
| } |
| |
| void verifyDurableClientPresence(int durableClientTimeout, String durableClientId, |
| VM serverVM, final int count) { |
| serverVM.invoke(() -> { |
| checkNumberOfClientProxies(count); |
| |
| if (count > 0) { |
| CacheClientProxy proxy = getClientProxy(); |
| |
| assertThat(proxy).isNotNull(); |
| // checkProxyIsAlive(proxy); |
| |
| // Verify that it is durable and its properties are correct |
| assertThat(proxy.isDurable()).isTrue(); |
| assertThat(durableClientId).isEqualTo(proxy.getDurableId()); |
| assertThat(durableClientTimeout).isEqualTo(proxy.getDurableTimeout()); |
| } |
| }); |
| } |
| |
| public void closeDurableClient() { |
| this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } |
| |
| public void disconnectDurableClient(boolean keepAlive) { |
| printClientProxyState("Before"); |
| this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(keepAlive)); |
| await() |
| .until(CacheServerTestUtil::getCache, nullValue()); |
| printClientProxyState("after"); |
| } |
| |
| private void printClientProxyState(String st) { |
| CacheSerializableRunnable s = |
| new CacheSerializableRunnable("Logging CCCP and ServerConnection state") { |
| @Override |
| public void run2() throws CacheException { |
| // TODO Auto-generated method stub |
| CacheServerTestUtil.getCache().getLogger() |
| .info(st + " CCP states: " + getAllClientProxyState()); |
| CacheServerTestUtil.getCache().getLogger().info(st + " CHM states: " |
| + printMap( |
| ClientHealthMonitor.getInstance().getConnectedClients(null))); |
| } |
| }; |
| server1VM.invoke(s); |
| } |
| |
| private static String printMap(Map<String, Object[]> m) { |
| Iterator<Map.Entry<String, Object[]>> itr = m.entrySet().iterator(); |
| StringBuffer sb = new StringBuffer(); |
| sb.append("size = ").append(m.size()).append(" "); |
| while (itr.hasNext()) { |
| sb.append("{"); |
| Map.Entry<String, Object[]> entry = itr.next(); |
| sb.append(entry.getKey()); |
| sb.append(", "); |
| printMapValue(entry.getValue(), sb); |
| sb.append("}"); |
| } |
| return sb.toString(); |
| } |
| |
| private static void printMapValue(Object value, StringBuffer sb) { |
| if (value.getClass().isArray()) { |
| |
| sb.append("{"); |
| sb.append(Arrays.toString((Object[]) value)); |
| sb.append("}"); |
| } else { |
| sb.append(value); |
| } |
| } |
| |
| static void waitForCacheClientProxyPaused() { |
| final CacheClientProxy proxy = getClientProxy(); |
| assertThat(proxy).isNotNull(); |
| |
| await() |
| .until(proxy::isPaused); |
| |
| assertThat(proxy.isPaused()).isTrue(); |
| } |
| |
| /* |
| * Due to the way removal from ha region queue is implemented a dummy cq or interest needs to be |
| * created and a dummy value used so that none of the actual cqs will be triggered and yet an |
| * event will flush the queue |
| */ |
| void flushEntries(VM server, VM client, final String regionName) { |
| // This wait is to make sure that all acks have been responded to... |
| // We can add a stat later on the cache client proxy stats that checks |
| // ack counts |
| try { |
| Thread.sleep(2000); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| |
| registerInterest(client, regionName, false, InterestResultPolicy.NONE); |
| server.invoke(new CacheSerializableRunnable("flush entries") { |
| @Override |
| public void run2() throws CacheException { |
| Region<String, String> region = CacheServerTestUtil.getCache().getRegion(regionName); |
| assertNotNull(region); |
| region.put("LAST", "ENTRY"); |
| } |
| }); |
| } |
| |
| // First we will have the client wait before trying to reconnect |
| // Then the drain will lock and begins to drain |
| // The client will then be able to continue, and get rejected |
| // Then we proceed to drain and release all locks |
| // The client will then reconnect |
| public class RejectClientReconnectTestHook implements CacheClientProxy.TestHook { |
| final CountDownLatch reconnectLatch = new CountDownLatch(1); |
| final CountDownLatch continueDrain = new CountDownLatch(1); |
| volatile boolean clientWasRejected = false; |
| |
| @Override |
| public void doTestHook(String spot) { |
| try { |
| switch (spot) { |
| case "CLIENT_PRE_RECONNECT": |
| if (!reconnectLatch.await(60, SECONDS)) { |
| fail("reconnect latch was never released."); |
| } |
| break; |
| case "DRAIN_IN_PROGRESS_BEFORE_DRAIN_LOCK_CHECK": |
| // let client try to reconnect |
| reconnectLatch.countDown(); |
| // we wait until the client is rejected |
| if (!continueDrain.await(120, SECONDS)) { |
| fail("Latch was never released."); |
| } |
| break; |
| case "CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED": |
| clientWasRejected = true; |
| continueDrain.countDown(); |
| break; |
| default: |
| break; |
| } |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| boolean wasClientRejected() { |
| return clientWasRejected; |
| } |
| } |
| |
| /* |
| * This hook will cause the close cq to throw an exception due to a client in the middle of |
| * activating sequence - server will pause before draining client will begin to reconnect and then |
| * wait to continue server will be unblocked, and rejected client will the be unlocked after |
| * server is rejected and continue |
| */ |
| public class CqExceptionDueToActivatingClientTestHook implements CacheClientProxy.TestHook { |
| final CountDownLatch unblockDrain = new CountDownLatch(1); |
| final CountDownLatch unblockClient = new CountDownLatch(1); |
| final CountDownLatch finish = new CountDownLatch(1); |
| |
| @Override |
| public void doTestHook(String spot) { |
| if (spot.equals("PRE_DRAIN_IN_PROGRESS")) { |
| try { |
| // Unblock any client waiting to reconnect |
| unblockClient.countDown(); |
| // Wait until client is reconnecting |
| assertThat(unblockDrain.await(120, SECONDS)) |
| .describedAs("client never got far enough reconnected to unlatch lock.").isTrue(); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (spot.equals("PRE_RELEASE_DRAIN_LOCK")) { |
| // Client is reconnecting but still holds the drain lock |
| // let the test continue to try to close a cq |
| unblockDrain.countDown(); |
| // wait until the server has finished attempting to close the cq |
| try { |
| assertThat(finish.await(30, SECONDS)) |
| .describedAs("Test did not complete, server never finished attempting to close cq") |
| .isTrue(); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (spot.equals("DRAIN_COMPLETE")) { |
| finish.countDown(); |
| } |
| } |
| } |
| |
| private CqQuery createCq(String cqName, String cqQuery, boolean durable) |
| throws CqException, CqExistsException { |
| QueryService qs = CacheServerTestUtil.getCache().getQueryService(); |
| CqAttributesFactory cqf = new CqAttributesFactory(); |
| CqListener[] cqListeners = {new CacheServerTestUtil.ControlCqListener()}; |
| cqf.initCqListeners(cqListeners); |
| CqAttributes cqa = cqf.create(); |
| return qs.newCq(cqName, cqQuery, cqa, durable); |
| |
| } |
| |
| Pool getClientPool(String host, int serverPort, boolean establishCallbackConnection) { |
| PoolFactory pf = PoolManager.createFactory(); |
| pf.addServer(host, serverPort).setSubscriptionEnabled(establishCallbackConnection) |
| .setSubscriptionAckInterval(1); |
| return ((PoolFactoryImpl) pf).getPoolAttributes(); |
| } |
| |
| Pool getClientPool(String host, int server1Port, int server2Port, |
| boolean establishCallbackConnection) { |
| return getClientPool(host, server1Port, server2Port, establishCallbackConnection, 1); |
| } |
| |
| Properties getClientDistributedSystemProperties(String durableClientId) { |
| return getClientDistributedSystemProperties(durableClientId, |
| DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT); |
| } |
| |
| Properties getClientDistributedSystemProperties(String durableClientId, |
| int durableClientTimeout) { |
| Properties properties = new Properties(); |
| properties.setProperty(MCAST_PORT, "0"); |
| properties.setProperty(LOCATORS, ""); |
| properties.setProperty(DURABLE_CLIENT_ID, durableClientId); |
| properties.setProperty(DURABLE_CLIENT_TIMEOUT, String.valueOf(durableClientTimeout)); |
| return properties; |
| } |
| |
| static CacheClientProxy getClientProxy() { |
| // Get the CacheClientNotifier |
| CacheClientNotifier notifier = getBridgeServer().getAcceptor().getCacheClientNotifier(); |
| |
| // Get the CacheClientProxy or not (if proxy set is empty) |
| CacheClientProxy proxy = null; |
| Iterator<CacheClientProxy> i = notifier.getClientProxies().iterator(); |
| if (i.hasNext()) { |
| proxy = i.next(); |
| } |
| return proxy; |
| } |
| |
| private static String getAllClientProxyState() { |
| // Get the CacheClientNotifier |
| CacheClientNotifier notifier = getBridgeServer().getAcceptor().getCacheClientNotifier(); |
| |
| // Get the CacheClientProxy or not (if proxy set is empty) |
| Iterator<CacheClientProxy> i = notifier.getClientProxies().iterator(); |
| StringBuilder sb = new StringBuilder(); |
| while (i.hasNext()) { |
| sb.append(" ["); |
| sb.append(i.next().getState()); |
| sb.append(" ]"); |
| } |
| return sb.toString(); |
| } |
| |
| static void checkNumberOfClientProxies(final int expected) { |
| await() |
| .until(() -> { |
| return expected == getNumberOfClientProxies(); |
| }); |
| } |
| |
| static void checkProxyIsAlive(final CacheClientProxy proxy) { |
| await() |
| .until(proxy::isAlive); |
| } |
| |
| private static int getNumberOfClientProxies() { |
| return getBridgeServer().getAcceptor().getCacheClientNotifier().getClientProxies().size(); |
| } |
| |
| static CacheServerImpl getBridgeServer() { |
| CacheServerImpl bridgeServer = |
| (CacheServerImpl) CacheServerTestUtil.getCache().getCacheServers().iterator().next(); |
| assertThat(bridgeServer).isNotNull(); |
| return bridgeServer; |
| } |
| |
| |
| Pool getClientPool(String host, int server1Port, int server2Port, |
| boolean establishCallbackConnection, int redundancyLevel) { |
| PoolFactory pf = PoolManager.createFactory(); |
| pf.addServer(host, server1Port).addServer(host, server2Port) |
| .setSubscriptionEnabled(establishCallbackConnection) |
| .setSubscriptionRedundancy(redundancyLevel).setSubscriptionAckInterval(1); |
| return ((PoolFactoryImpl) pf).getPoolAttributes(); |
| } |
| |
| /** |
| * Returns the durable client proxy's HARegionQueue region name. This method is accessed via |
| * reflection on a server VM. |
| * |
| * @return the durable client proxy's HARegionQueue region name |
| */ |
| static String getHARegionQueueName() { |
| checkNumberOfClientProxies(1); |
| CacheClientProxy proxy = getClientProxy(); |
| assertThat(proxy).isNotNull(); |
| return proxy.getHARegionName(); |
| } |
| |
| static void verifyReceivedMarkerAck() { |
| await().atMost(3 * HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES) |
| .pollInterval(200, MILLISECONDS) |
| .until(HARegionQueue::isTestMarkerMessageReceived); |
| } |
| |
| static void setTestFlagToVerifyActForMarker(Boolean flag) { |
| HARegionQueue.setUsedByTest(flag); |
| } |
| |
| void sendClientReady(VM vm) { |
| // Send clientReady message |
| vm.invoke(new CacheSerializableRunnable("Send clientReady") { |
| @Override |
| public void run2() throws CacheException { |
| CacheServerTestUtil.getClientCache().readyForEvents(); |
| } |
| }); |
| } |
| |
| protected void registerInterest(VM vm, final String regionName, final boolean durable, |
| final InterestResultPolicy interestResultPolicy) { |
| vm.invoke(new CacheSerializableRunnable("Register interest on region : " + regionName) { |
| @Override |
| public void run2() throws CacheException { |
| |
| Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName); |
| assertThat(region).isNotNull(); |
| |
| // Register interest in all keys |
| region.registerInterestRegex(".*", interestResultPolicy, durable); |
| } |
| }); |
| |
| // This seems to be necessary for the queue to start up. Ideally should be replaced with |
| // Awaitility if possible. |
| try { |
| java.lang.Thread.sleep(5000); |
| } catch (java.lang.InterruptedException ex) { |
| fail("interrupted"); |
| } |
| } |
| |
| void createCq(VM vm, final String cqName, final String cqQuery, final boolean durable) { |
| vm.invoke(new CacheSerializableRunnable("Register cq " + cqName) { |
| @Override |
| public void run2() throws CacheException { |
| |
| try { |
| createCq(cqName, cqQuery, durable).execute(); |
| } catch (CqExistsException | CqException | RegionNotFoundException e) { |
| throw new CacheException(e) {}; |
| } |
| |
| } |
| }); |
| } |
| |
| // Publishes strings |
| void publishEntries(int startingValue, final int count) { |
| this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") { |
| @Override |
| public void run2() throws CacheException { |
| Region<String, String> region = CacheServerTestUtil.getCache().getRegion( |
| regionName); |
| assertThat(region).isNotNull(); |
| |
| // Publish some entries |
| for (int i = startingValue; i < startingValue + count; i++) { |
| String keyAndValue = String.valueOf(i); |
| region.put(keyAndValue, keyAndValue); |
| } |
| |
| assertThat(region.get(String.valueOf(startingValue))).isNotNull(); |
| } |
| }); |
| } |
| |
| // Publishes portfolios |
| void publishEntries(final String regionName, final int numEntries) { |
| publisherClientVM.invoke(new CacheSerializableRunnable("publish " + numEntries + " entries") { |
| @Override |
| public void run2() throws CacheException { |
| // Get the region |
| Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName); |
| assertThat(region).isNotNull(); |
| |
| // Publish some entries |
| for (int i = 0; i < numEntries; i++) { |
| String keyAndValue = String.valueOf(i); |
| region.put(keyAndValue, new Portfolio(i)); |
| } |
| |
| assertThat(region.get(String.valueOf(0))).isNotNull(); |
| } |
| }); |
| } |
| |
| public void verifyListenerUpdatesDisconnected(int numberOfEntries) { |
| // ARB: do nothing. |
| } |
| |
| void checkCqStatOnServer(VM server, final String durableClientId, final String cqName, |
| final int expectedNumber) { |
| server.invoke(new CacheSerializableRunnable( |
| "Check ha queued cq stats for durable client " + durableClientId + " cq: " + cqName) { |
| @Override |
| public void run2() throws CacheException { |
| |
| final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance(); |
| final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId); |
| ClientProxyMembershipID proxyId = clientProxy.getProxyID(); |
| CqService cqService = ((InternalCache) CacheServerTestUtil.getCache()).getCqService(); |
| cqService.start(); |
| final CqQueryImpl cqQuery = (CqQueryImpl) cqService.getClientCqFromServer(proxyId, cqName); |
| |
| // Wait until we get the expected number of events or until 10 seconds are up |
| await() |
| .until(() -> cqQuery.getVsdStats().getNumHAQueuedEvents() == expectedNumber); |
| |
| assertThat(expectedNumber).isEqualTo(cqQuery.getVsdStats().getNumHAQueuedEvents()); |
| } |
| }); |
| } |
| |
| /* |
| * Remaining is the number of events that could still be in the queue due to timing issues with |
| * acks and receiving them after remove from ha queue region has been called. |
| */ |
| void checkHAQueueSize(VM server, final String durableClientId, final int expectedNumber, |
| final int remaining) { |
| server.invoke(new CacheSerializableRunnable( |
| "Check ha queued size for durable client " + durableClientId) { |
| @Override |
| public void run2() throws CacheException { |
| |
| final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance(); |
| final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId); |
| |
| // Wait until we get the expected number of events or until 10 seconds are up |
| await() |
| .until(() -> clientProxy.getQueueSizeStat() == expectedNumber |
| || clientProxy.getQueueSizeStat() == remaining); |
| |
| assertThat(clientProxy.getQueueSizeStat() == expectedNumber |
| || clientProxy.getQueueSizeStat() == remaining).isTrue(); |
| } |
| }); |
| } |
| |
| void checkNumDurableCqs(VM server, final String durableClientId, |
| final int expectedNumber) { |
| server.invoke(new CacheSerializableRunnable( |
| "check number of durable cqs on server for durable client: " + durableClientId) { |
| @Override |
| public void run2() throws CacheException { |
| try { |
| final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance(); |
| final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId); |
| ClientProxyMembershipID proxyId = clientProxy.getProxyID(); |
| CqService cqService = ((InternalCache) CacheServerTestUtil.getCache()).getCqService(); |
| cqService.start(); |
| List<String> cqNames = cqService.getAllDurableClientCqs(proxyId); |
| assertThat(expectedNumber).isEqualTo(cqNames.size()); |
| } catch (Exception e) { |
| throw new CacheException(e) {}; |
| } |
| } |
| }); |
| } |
| |
| /* |
| * @param numEventsToWaitFor most times will be the same as numEvents, but there are times where |
| * we want to wait for an event we know is not coming just to be sure an event actually isn't |
| * received |
| * |
| */ |
| void checkCqListenerEvents(VM vm, final String cqName, final int numEvents, |
| final int secondsToWait) { |
| vm.invoke(() -> { |
| QueryService qs = CacheServerTestUtil.getCache().getQueryService(); |
| CqQuery cq = qs.getCq(cqName); |
| // Get the listener and wait for the appropriate number of events |
| CacheServerTestUtil.ControlCqListener listener = |
| (CacheServerTestUtil.ControlCqListener) cq.getCqAttributes().getCqListener(); |
| listener.waitWhileNotEnoughEvents(secondsToWait * 1000, numEvents); |
| assertThat(numEvents).isEqualTo(listener.events.size()); |
| }); |
| } |
| |
| void checkListenerEvents(int numberOfEntries, final int sleepMinutes, final int eventType, |
| final VM vm) { |
| vm.invoke(() -> { |
| // Get the region |
| Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName); |
| assertThat(region).isNotNull(); |
| |
| // Get the listener and wait for the appropriate number of events |
| CacheServerTestUtil.ControlListener controlListener = |
| (CacheServerTestUtil.ControlListener) region.getAttributes().getCacheListeners()[0]; |
| |
| controlListener.waitWhileNotEnoughEvents(sleepMinutes * 60 * 1000, numberOfEntries, |
| controlListener.getEvents(eventType)); |
| }); |
| } |
| |
| void startDurableClient(VM vm, String durableClientId, int serverPort1, |
| String regionName, int durableTimeoutInSeconds) { |
| vm.invoke(() -> CacheServerTestUtil.createCacheClient( |
| getClientPool(NetworkUtils.getServerHostName(), serverPort1, true), |
| regionName, getClientDistributedSystemProperties(durableClientId, durableTimeoutInSeconds), |
| Boolean.TRUE)); |
| } |
| |
| void startDurableClient(VM vm, String durableClientId, int serverPort1, |
| String regionName) { |
| vm.invoke(() -> { |
| CacheServerTestUtil.createCacheClient( |
| getClientPool(NetworkUtils.getServerHostName(), serverPort1, true), |
| regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE); |
| assertThat(CacheServerTestUtil.getClientCache()).isNotNull(); |
| }); |
| } |
| |
| void startDurableClient(VM vm, String durableClientId, int serverPort1, int serverPort2, |
| String regionName) { |
| vm.invoke(() -> CacheServerTestUtil.createCacheClient( |
| getClientPool(NetworkUtils.getServerHostName(), serverPort1, serverPort2, true), |
| regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE)); |
| } |
| |
| void startClient(VM vm, int serverPort1, String regionName) { |
| vm.invoke(() -> { |
| CacheServerTestUtil.createCacheClient( |
| getClientPool(NetworkUtils.getServerHostName(), serverPort1, false), |
| regionName); |
| assertThat(CacheServerTestUtil.getClientCache()).isNotNull(); |
| }); |
| } |
| |
| void checkPrimaryUpdater(VM vm) { |
| vm.invoke(new CacheSerializableRunnable("Verify durable client") { |
| @Override |
| public void run2() throws CacheException { |
| |
| await() |
| .until(() -> CacheServerTestUtil.getPool().isPrimaryUpdaterAlive()); |
| |
| assertThat(CacheServerTestUtil.getPool().isPrimaryUpdaterAlive()).isTrue(); |
| } |
| }); |
| } |
| |
| protected void closeCache(VM vm) { |
| vm.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } |
| |
| |
| } |