| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets; |
| |
| |
| import static org.apache.geode.cache.Region.SEPARATOR; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.fail; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.InterestResultPolicy; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.client.internal.ConnectionStats; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.query.CqAttributes; |
| import org.apache.geode.cache.query.CqAttributesFactory; |
| import org.apache.geode.cache.query.CqException; |
| import org.apache.geode.cache.query.CqExistsException; |
| import org.apache.geode.cache.query.CqListener; |
| import org.apache.geode.cache.query.CqQuery; |
| import org.apache.geode.cache.query.QueryService; |
| import org.apache.geode.cache.query.RegionNotFoundException; |
| import org.apache.geode.cache30.CacheSerializableRunnable; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.ServerLocation; |
| import org.apache.geode.distributed.internal.ServerLocationAndMemberId; |
| import org.apache.geode.internal.cache.ClientServerObserverAdapter; |
| import org.apache.geode.internal.cache.ClientServerObserverHolder; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.SerializableRunnableIF; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.junit.categories.ClientSubscriptionTest; |
| |
| @Category({ClientSubscriptionTest.class}) |
| public class DurableClientCQDUnitTest extends DurableClientTestBase { |
| |
| /** |
| * Closes one durable CQ ("All") on the server while its durable client is disconnected and |
| * checks that, once the client is back, no events are delivered for the closed CQ while the |
| * events queued for the two other CQs still arrive. |
| */ |
| @Test |
| public void testCloseCqAndDrainEvents() { |
| final String selectById = "select * from " + SEPARATOR + regionName + " p where p.ID "; |
| final String idAbove5 = selectById + "> 5"; |
| final String anyId = selectById + "> -1"; |
| final String idBelow5 = selectById + "< 5"; |
| |
| // Bring up the cache server |
| server1Port = |
| server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); |
| |
| // Connect a durable client (the server keeps it alive when it stops normally), |
| // register the three durable cqs and signal that the client is ready for events |
| durableClientId = getName() + "_client"; |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| createCq(durableClientVM, "GreaterThan5", idAbove5, true); |
| createCq(durableClientVM, "All", anyId, true); |
| createCq(durableClientVM, "LessThan5", idBelow5, true); |
| sendClientReady(durableClientVM); |
| |
| // The server must now know the durable client |
| verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId, |
| server1VM); |
| |
| // Take the durable client down |
| disconnectDurableClient(true); |
| |
| // A plain (non durable) client publishes ten entries while the durable client is away |
| startClient(publisherClientVM, server1Port, regionName); |
| publishEntries(regionName, 10); |
| |
| // Server side: close only the "All" cq of the disconnected durable client |
| server1VM.invoke(new CacheSerializableRunnable("Close cq for durable client") { |
| @Override |
| public void run2() throws CacheException { |
| try { |
| CacheClientNotifier.getInstance().closeClientCq(durableClientId, "All"); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| } |
| }); |
| |
| // Bring the durable client back with the very same cqs |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| createCq(durableClientVM, "GreaterThan5", idAbove5, true); |
| createCq(durableClientVM, "All", anyId, true); |
| createCq(durableClientVM, "LessThan5", idBelow5, true); |
| sendClientReady(durableClientVM); |
| |
| // The closed cq must stay silent, the two others still get their queued events |
| checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "All", 0 /* numEventsExpected */, |
| 5 /* secondsToWait */); |
| |
| // Shut everything down: durable client, publisher client, then the server |
| durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } |
| |
| /** |
| * Test that a durable CQ is correctly re-registered on the new server after a failover and |
| * that the durable client functionality works as expected. |
| * Steps: |
| * 1. Start two servers |
| * 2. Start a durable client without HA and register durable CQs |
| * 3. Shut down the server that is hosting the CQs subscription queue (primary server) |
| * 4. Wait for the durable client to fail over to the other server |
| * 5. Shut down the durable client with the keepAlive flag set to true |
| * 6. Publish data on the remaining server that fulfils the CQ conditions and fills the queue |
| * 7. Start the durable client again and check that it receives the correct events from the queue |
| * |
| * @throws Exception if creating or executing a CQ fails |
| */ |
| @Test |
| public void testDurableCQServerFailoverWithoutHAConfigured() |
| throws Exception { |
| String greaterThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID > 5"; |
| String allQuery = "select * from " + SEPARATOR + regionName + " p where p.ID > -1"; |
| String lessThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID < 5"; |
| |
| // Start server 1 |
| server1Port = this.server1VM |
| .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); |
| |
| // Start server 2 (same type-checked lambda form as server 1 instead of the reflective, |
| // string-named invoke overload) |
| server2Port = this.server2VM |
| .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); |
| |
| // Start a durable client that is kept alive on the server when it stops normally. |
| // The client cache is created in this (controller) JVM, not in a dunit VM. |
| durableClientId = getName() + "_client"; |
| CacheServerTestUtil.createCacheClient( |
| getClientPool(NetworkUtils.getServerHostName(), server1Port, server2Port, true, 0), |
| regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE); |
| |
| // register non durable cq |
| createCq("GreaterThan5", greaterThan5Query, false).execute(); |
| |
| // register durable cqs |
| createCq("All", allQuery, true).execute(); |
| createCq("LessThan5", lessThan5Query, true).execute(); |
| |
| // send client ready |
| CacheServerTestUtil.getClientCache().readyForEvents(); |
| |
| // Remember which server currently hosts the subscription queue |
| int oldPrimaryPort = getPrimaryServerPort(); |
| VM primary = getPrimaryServerVM(); |
| // Verify durable client on server |
| verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId, |
| primary); |
| |
| // Close the server that is hosting the subscription queue |
| primary.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Wait until the failover to the other server is successfully performed |
| waitForFailoverToPerform(oldPrimaryPort); |
| primary = getPrimaryServerVM(); |
| waitForDurableClientPresence(durableClientId, primary, 1); |
| int primaryPort = getPrimaryServerPort(); |
| |
| // Stop the durable client (keepAlive = true) |
| CacheServerTestUtil.closeCache(true); |
| |
| // Start normal publisher client |
| startClient(publisherClientVM, primaryPort, regionName); |
| |
| // Publish some entries |
| publishEntries(regionName, 10); |
| |
| // Restart the durable client, this time pointing only at the surviving server |
| CacheServerTestUtil.createCacheClient( |
| getClientPool(NetworkUtils.getServerHostName(), primaryPort, true), |
| regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE); |
| assertThat(CacheServerTestUtil.getClientCache()).isNotNull(); |
| |
| // Re-register non durable cq |
| createCq("GreaterThan5", greaterThan5Query, false).execute(); |
| |
| // Re-register durable cqs |
| createCq("All", allQuery, true).execute(); |
| createCq("LessThan5", lessThan5Query, true).execute(); |
| |
| // send client ready |
| CacheServerTestUtil.getClientCache().readyForEvents(); |
| |
| // verify cq events for all 3 cqs; "GreaterThan5" was registered as non durable, so none of |
| // the events published while the client was away are expected for it |
| checkCqListenerEvents("GreaterThan5", 0 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| checkCqListenerEvents("LessThan5", 5 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| checkCqListenerEvents("All", 10 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| |
| primary = getPrimaryServerVM(); |
| // Stop the durable client |
| CacheServerTestUtil.closeCache(false); |
| // Stop the publisher client |
| this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| // Stop the remaining server |
| primary.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } |
| |
| |
| /** |
| * Test functionality to close the cq and drain all events from the ha queue from the server This |
| * draining should not affect events that still have register interest |
| */ |
| @Test |
| public void testCloseAllCqsAndDrainEvents() { |
| String greaterThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID > 5"; |
| String allQuery = "select * from " + SEPARATOR + regionName + " p where p.ID > -1"; |
| String lessThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID < 5"; |
| |
| // Start server 1 |
| server1Port = this.server1VM.invoke( |
| () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); |
| |
| final String durableClientId = getName() + "_client"; |
| |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| // register durable cqs |
| registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE); |
| createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); |
| createCq(durableClientVM, "All", allQuery, true); |
| createCq(durableClientVM, "LessThan5", lessThan5Query, true); |
| // send client ready |
| sendClientReady(durableClientVM); |
| |
| // Verify durable client on server |
| verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId, |
| server1VM); |
| |
| // Stop the durable client |
| this.disconnectDurableClient(true); |
| |
| // Start normal publisher client |
| startClient(publisherClientVM, server1Port, regionName); |
| |
| // Publish some entries |
| publishEntries(regionName, 10); |
| |
| closeCQsforDurableClient(durableClientId); |
| |
| // Restart the durable client |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| |
| // Reregister durable cqs |
| registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE); |
| createCq(durableClientVM, "GreaterThan5", |
| "select * from " + SEPARATOR + regionName + " p where p.ID > 5", |
| true); |
| createCq(durableClientVM, "All", |
| "select * from " + SEPARATOR + regionName + " p where p.ID > -1", true); |
| createCq(durableClientVM, "LessThan5", |
| "select * from " + SEPARATOR + regionName + " p where p.ID < 5", |
| true); |
| // send client ready |
| sendClientReady(durableClientVM); |
| |
| checkCqListenerEvents(durableClientVM, "GreaterThan5", 0 /* numEventsExpected */, |
| /* numEventsToWaitFor */ 5/* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "LessThan5", 0 /* numEventsExpected */, |
| /* numEventsToWaitFor */ 5/* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "All", 0 /* numEventsExpected */, |
| /* numEventsToWaitFor */ 5/* secondsToWait */); |
| checkListenerEvents(10, 1, -1, this.durableClientVM); |
| |
| // Stop the durable client |
| this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Stop the publisher client |
| this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Stop the server |
| this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } |
| |
| /** |
| * Closes all three durable CQs on the server while the durable client is disconnected. As no |
| * interest is registered, draining should remove all events from the HA queue. The test then |
| * keeps publishing to verify that new events are delivered and that the queue stats are |
| * correct. |
| */ |
| @Test |
| public void testCloseAllCqsAndDrainEventsNoInterestRegistered() { |
| final String selectById = "select * from " + SEPARATOR + regionName + " p where p.ID "; |
| final String idAbove5 = selectById + "> 5"; |
| final String anyId = selectById + "> -1"; |
| final String idBelow5 = selectById + "< 5"; |
| |
| // Bring up server 1 |
| server1Port = |
| server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); |
| |
| durableClientId = getName() + "_client"; |
| |
| // Connect the durable client, register the three durable cqs, signal ready |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| createCq(durableClientVM, "GreaterThan5", idAbove5, true); |
| createCq(durableClientVM, "All", anyId, true); |
| createCq(durableClientVM, "LessThan5", idBelow5, true); |
| sendClientReady(durableClientVM); |
| |
| // The server must now know the durable client |
| verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId, |
| server1VM); |
| |
| // Take the durable client down |
| disconnectDurableClient(true); |
| |
| // A plain publisher client writes ten entries while the durable client is away |
| startClient(publisherClientVM, server1Port, regionName); |
| publishEntries(regionName, 10); |
| |
| // Server side: close every cq of the disconnected durable client |
| closeCQsforDurableClient(durableClientId); |
| |
| // Bring the durable client back with the same cqs |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| createCq(durableClientVM, "GreaterThan5", idAbove5, true); |
| createCq(durableClientVM, "All", anyId, true); |
| createCq(durableClientVM, "LessThan5", idBelow5, true); |
| sendClientReady(durableClientVM); |
| |
| // None of the events published during the absence may show up |
| checkCqListenerEvents(durableClientVM, "GreaterThan5", 0 /* numEventsExpected */, |
| 5 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "LessThan5", 0 /* numEventsExpected */, |
| 5 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "All", 0 /* numEventsExpected */, |
| 5 /* secondsToWait */); |
| |
| // DurableHARegionQueue calls remove only after a dispatch, so a "later" ack can leave |
| // events lingering in the queue until the next dispatch happens. Sending one more message |
| // triggers that dispatch, which calls remove once more and clears any remaining acks |
| // (with or without the ack of this final message). |
| flushEntries(server1VM, durableClientVM, regionName); |
| |
| // The flush message itself may still be in the queue, so verify the queue stats are as |
| // close to correct as possible |
| checkHAQueueSize(server1VM, durableClientId, 0, 1); |
| |
| // Keep publishing and make sure these events are delivered |
| publishEntries(regionName, 10); |
| checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */, |
| 10 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */, |
| 10 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, |
| 10 /* secondsToWait */); |
| |
| // Same reasoning as above: force one more dispatch so lingering events get removed |
| flushEntries(server1VM, durableClientVM, regionName); |
| |
| // The flush message itself may still be in the queue, so verify the queue stats are as |
| // close to correct as possible |
| checkHAQueueSize(server1VM, durableClientId, 0, 1); |
| |
| // Shut everything down: durable client, publisher client, then the server |
| durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } |
| |
| /** |
| * Runs on server 1 and closes the cqs "All", "GreaterThan5" and "LessThan5" (in that order) |
| * of the given durable client through the {@link CacheClientNotifier}. A {@link CqException} |
| * raised by any of the close calls fails the test. |
| * |
| * @param durableClientId durable id of the client whose cqs are closed |
| */ |
| private void closeCQsforDurableClient(String durableClientId) { |
| server1VM.invoke(new CacheSerializableRunnable("Close cq for durable client") { |
| @Override |
| public void run2() throws CacheException { |
| final CacheClientNotifier notifier = CacheClientNotifier.getInstance(); |
| final String[] cqNames = {"All", "GreaterThan5", "LessThan5"}; |
| |
| try { |
| for (String cqName : cqNames) { |
| notifier.closeClientCq(durableClientId, cqName); |
| } |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Uses two durable clients that register the same three durable CQs. While both are |
| * disconnected, the "All" cq of the first client is closed on the server. After reconnecting, |
| * the first client must receive no "All" events, whereas the second client must be unaffected |
| * and receive all of its events. |
| */ |
| @Test |
| public void testCloseCqAndDrainEvents2Client() { |
| final String selectById = "select * from " + SEPARATOR + regionName + " p where p.ID "; |
| final String idAbove5 = selectById + "> 5"; |
| final String anyId = selectById + "> -1"; |
| final String idBelow5 = selectById + "< 5"; |
| |
| // Bring up server 1 |
| server1Port = |
| server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); |
| |
| final String durableClientId = getName() + "_client"; |
| final String durableClientId2 = getName() + "_client2"; |
| |
| // First durable client: connect, register the three durable cqs, signal ready |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| createCq(durableClientVM, "GreaterThan5", idAbove5, true); |
| createCq(durableClientVM, "All", anyId, true); |
| createCq(durableClientVM, "LessThan5", idBelow5, true); |
| sendClientReady(durableClientVM); |
| |
| // The server must now know the first durable client |
| verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId, |
| server1VM); |
| |
| // Take the first durable client down |
| disconnectDurableClient(true); |
| |
| // Second durable client (same VM, different durable id): connect, register, signal ready |
| startDurableClient(durableClientVM, durableClientId2, server1Port, regionName); |
| createCq(durableClientVM, "GreaterThan5", idAbove5, true); |
| createCq(durableClientVM, "All", anyId, true); |
| createCq(durableClientVM, "LessThan5", idBelow5, true); |
| sendClientReady(durableClientVM); |
| |
| // The server must now hold a proxy for each of the two durable clients |
| server1VM.invoke(new CacheSerializableRunnable("Verify 2nd durable client") { |
| @Override |
| public void run2() throws CacheException { |
| checkNumberOfClientProxies(2); |
| } |
| }); |
| |
| // Take the second durable client down as well |
| disconnectDurableClient(true); |
| |
| // A plain publisher client writes ten entries while both durable clients are away |
| startClient(publisherClientVM, server1Port, regionName); |
| publishEntries(regionName, 10); |
| |
| // Server side: close the "All" cq of the first durable client only |
| server1VM.invoke(new CacheSerializableRunnable("Close cq for durable client 1") { |
| @Override |
| public void run2() throws CacheException { |
| try { |
| CacheClientNotifier.getInstance().closeClientCq(durableClientId, "All"); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| } |
| }); |
| |
| // Bring the first durable client back with the same cqs |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| createCq(durableClientVM, "GreaterThan5", idAbove5, true); |
| createCq(durableClientVM, "All", anyId, true); |
| createCq(durableClientVM, "LessThan5", idBelow5, true); |
| sendClientReady(durableClientVM); |
| |
| // First client: the closed "All" cq must have 0 events, the others keep theirs |
| checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "All", 0 /* numEventsExpected */, |
| 5 /* secondsToWait */); |
| |
| disconnectDurableClient(false); |
| |
| // Bring the second durable client back with the same cqs |
| startDurableClient(durableClientVM, durableClientId2, server1Port, regionName); |
| createCq(durableClientVM, "GreaterThan5", idAbove5, true); |
| createCq(durableClientVM, "All", anyId, true); |
| createCq(durableClientVM, "LessThan5", idBelow5, true); |
| sendClientReady(durableClientVM); |
| |
| // Second client: untouched, so "All" must have all 10 events |
| checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| |
| // Shut everything down: durable client, publisher client, then the server |
| durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } |
| |
| /** |
| * Tests situation where a client is trying to reconnect while a cq is being drained. The client |
| * should be rejected until no cqs are currently being drained |
| */ |
| @Test |
| public void testRejectClientWhenDrainingCq() { |
| try { |
| IgnoredException.addIgnoredException( |
| "CacheClientNotifier: Connection refused due to cq queue being drained from admin command, please wait..."); |
| IgnoredException.addIgnoredException( |
| "Could not initialize a primary queue on startup. No queue servers available."); |
| |
| String greaterThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID > 5"; |
| String allQuery = "select * from " + SEPARATOR + regionName + " p where p.ID > -1"; |
| String lessThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID < 5"; |
| |
| // Start server 1 |
| server1Port = this.server1VM.invoke( |
| () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); |
| |
| final String durableClientId = getName() + "_client"; |
| this.durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints); |
| |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| |
| // register durable cqs |
| createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); |
| createCq(durableClientVM, "All", allQuery, true); |
| createCq(durableClientVM, "LessThan5", lessThan5Query, true); |
| // send client ready |
| sendClientReady(durableClientVM); |
| |
| verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId, |
| server1VM); |
| |
| // Stop the durable client |
| this.disconnectDurableClient(true); |
| |
| // Start normal publisher client |
| startClient(publisherClientVM, server1Port, regionName); |
| |
| // Publish some entries |
| publishEntries(regionName, 10); |
| |
| this.server1VM.invoke(new CacheSerializableRunnable("Set test hook") { |
| @Override |
| public void run2() throws CacheException { |
| // Set the Test Hook! |
| // This test hook will pause during the drain process |
| CacheClientProxy.testHook = new RejectClientReconnectTestHook(); |
| } |
| }); |
| |
| this.server1VM.invokeAsync(new CacheSerializableRunnable("Close cq for durable client") { |
| @Override |
| public void run2() throws CacheException { |
| |
| final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance(); |
| |
| try { |
| ccnInstance.closeClientCq(durableClientId, "All"); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| } |
| }); |
| |
| // Restart the durable client |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| |
| this.server1VM.invoke(new CacheSerializableRunnable("verify was rejected at least once") { |
| @Override |
| public void run2() throws CacheException { |
| await() |
| .until(() -> CacheClientProxy.testHook != null |
| && (((RejectClientReconnectTestHook) CacheClientProxy.testHook) |
| .wasClientRejected())); |
| assertTrue( |
| ((RejectClientReconnectTestHook) CacheClientProxy.testHook).wasClientRejected()); |
| } |
| }); |
| |
| checkPrimaryUpdater(durableClientVM); |
| |
| // After rejection, the client will retry and eventually connect |
| // Verify durable client on server2 |
| verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId, |
| server1VM); |
| |
| createCq(durableClientVM, "GreaterThan5", |
| "select * from " + SEPARATOR + regionName + " p where p.ID > 5", true); |
| createCq(durableClientVM, "All", |
| "select * from " + SEPARATOR + regionName + " p where p.ID > -1", true); |
| createCq(durableClientVM, "LessThan5", |
| "select * from " + SEPARATOR + regionName + " p where p.ID < 5", |
| true); |
| // send client ready |
| sendClientReady(durableClientVM); |
| |
| checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */, |
| /* numEventsToWaitFor */ 15/* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */, |
| /* numEventsToWaitFor */ 15/* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "All", 0 /* numEventsExpected */, |
| /* numEventsToWaitFor */ 5/* secondsToWait */); |
| |
| // Stop the durable client |
| this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Stop the publisher client |
| this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Stop the server |
| this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } finally { |
| this.server1VM.invoke(new CacheSerializableRunnable("unset test hook") { |
| @Override |
| public void run2() throws CacheException { |
| CacheClientProxy.testHook = null; |
| } |
| }); |
| } |
| } |
| |
| |
| /** |
| * Tests the scenario where closing a cq throws a {@link CqException} because the durable |
| * client is being reactivated while the cq is drained. Since the close fails, the client is |
| * expected to receive the events of all three cqs, including "All". |
| * |
| * @throws Exception if waiting for the asynchronous close invocation is interrupted |
| */ |
| @Test |
| public void testCqCloseExceptionDueToActivatingClient() throws Exception { |
| try { |
| String greaterThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID > 5"; |
| String allQuery = "select * from " + SEPARATOR + regionName + " p where p.ID > -1"; |
| String lessThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID < 5"; |
| |
| // Start server 1 |
| server1Port = this.server1VM.invoke( |
| () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); |
| |
| durableClientId = getName() + "_client"; |
| |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| // register durable cqs |
| createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); |
| createCq(durableClientVM, "All", allQuery, true); |
| createCq(durableClientVM, "LessThan5", lessThan5Query, true); |
| // send client ready |
| sendClientReady(durableClientVM); |
| |
| // Verify durable client on server |
| verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId, |
| server1VM); |
| |
| // Stop the durable client |
| this.disconnectDurableClient(true); |
| |
| // Start normal publisher client |
| startClient(publisherClientVM, server1Port, regionName); |
| |
| // Publish some entries |
| publishEntries(regionName, 10); |
| |
| // Close the "All" cq asynchronously so that the client can be restarted while the close |
| // is still in progress |
| AsyncInvocation<?> closeCqInvocation = |
| this.server1VM.invokeAsync(new CacheSerializableRunnable("Close cq for durable client") { |
| @Override |
| public void run2() throws CacheException { |
| |
| // Set the Test Hook! |
| // This test hook will pause during the drain process |
| CacheClientProxy.testHook = new CqExceptionDueToActivatingClientTestHook(); |
| |
| final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance(); |
| final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId); |
| ClientProxyMembershipID proxyId = clientProxy.getProxyID(); |
| |
| try { |
| ccnInstance.closeClientCq(durableClientId, "All"); |
| fail("Should have thrown an exception due to activating client"); |
| } catch (CqException e) { |
| String expected = |
| String.format( |
| "CacheClientProxy: Could not drain cq %s due to client proxy id %s reconnecting.", |
| "All", proxyId.getDurableId()); |
| // Compare from the non-null side so that an exception without a message is reported |
| // as a mismatch instead of surfacing as a NullPointerException |
| if (!expected.equals(e.getMessage())) { |
| fail("Not the expected exception, was expecting " |
| + expected |
| + " instead of exception: " + e.getMessage()); |
| } |
| } |
| } |
| }); |
| |
| // Restart the durable client |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| |
| // Reregister durable cqs |
| createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); |
| createCq(durableClientVM, "All", allQuery, true); |
| createCq(durableClientVM, "LessThan5", lessThan5Query, true); |
| // send client ready |
| sendClientReady(durableClientVM); |
| |
| // Wait for the asynchronous close to finish and make sure it did not fail |
| closeCqInvocation.join(); |
| assertFalse( |
| closeCqInvocation.getException() != null ? closeCqInvocation.getException().toString() |
| : "No error", |
| closeCqInvocation.exceptionOccurred()); |
| |
| // verify cq listener events; the close of "All" failed, so its events are expected too |
| checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, |
| 15 /* secondsToWait */); |
| |
| // Stop the durable client |
| this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Stop the publisher client |
| this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Stop the server |
| this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } finally { |
| // Always remove the static test hook from the server again |
| this.server1VM.invoke(new CacheSerializableRunnable("unset test hook") { |
| @Override |
| public void run2() throws CacheException { |
| CacheClientProxy.testHook = null; |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Tests situation where a client is trying to reconnect while a cq is being drained |
| */ |
| @Test |
| public void testCqCloseExceptionDueToActiveConnection() { |
| String greaterThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID > 5"; |
| String allQuery = "select * from " + SEPARATOR + regionName + " p where p.ID > -1"; |
| String lessThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID < 5"; |
| |
| // Start a server |
| server1Port = this.server1VM |
| .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); |
| |
| // Start a durable client that is kept alive on the server when it stops |
| // normally |
| durableClientId = getName() + "_client"; |
| startDurableClient(durableClientVM, durableClientId, server1Port, regionName); |
| sendClientReady(durableClientVM); |
| |
| // register durable cqs |
| createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); |
| createCq(durableClientVM, "All", allQuery, true); |
| createCq(durableClientVM, "LessThan5", lessThan5Query, true); |
| |
| verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId, |
| server1VM); |
| |
| // Start normal publisher client |
| startClient(publisherClientVM, server1Port, regionName); |
| |
| // Publish some entries |
| publishEntries(regionName, 10); |
| |
| // Attempt to close a cq even though the client is running |
| this.server1VM.invoke(new CacheSerializableRunnable("Close cq for durable client") { |
| @Override |
| public void run2() throws CacheException { |
| |
| final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance(); |
| final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId); |
| ClientProxyMembershipID proxyId = clientProxy.getProxyID(); |
| |
| try { |
| ccnInstance.closeClientCq(durableClientId, "All"); |
| fail( |
| "expected a cq exception. We have an active client proxy, the close cq command should have failed"); |
| } catch (CqException e) { |
| // expected exception; |
| String expected = |
| String.format( |
| "CacheClientProxy: Could not drain cq %s because client proxy id %s is connected.", |
| "All", proxyId.getDurableId()); |
| if (!e.getMessage().equals(expected)) { |
| fail("Not the expected exception, was expecting " |
| + expected + " instead of exception: " |
| + e.getMessage(), |
| e); |
| } |
| } |
| } |
| }); |
| |
| // verify cq events for all 3 cqs |
| checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */, |
| /* numEventsToWaitFor */ 15 * 4/* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */, |
| /* numEventsToWaitFor */ 15/* secondsToWait */); |
| checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, |
| /* numEventsToWaitFor */ 15/* secondsToWait */); |
| |
| // Stop the durable client |
| this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Stop the publisher client |
| this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Stop the server |
| this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } |
| |
| @Test |
| public void testGetAllDurableCqsFromServer() { |
| |
| |
| // Start server 1 |
| server1Port = this.server1VM.invoke(CacheServerTestUtil.class, |
| "createCacheServer", new Object[] {regionName, Boolean.TRUE}); |
| |
| // Start server 2 |
| final int server2Port = this.server2VM.invoke(CacheServerTestUtil.class, |
| "createCacheServer", new Object[] {regionName, Boolean.TRUE}); |
| |
| // Start a durable client |
| durableClientId = getName() + "_client"; |
| this.durableClientVM.invoke(() -> { |
| CacheServerTestUtil |
| .createCacheClient(getClientPool(getServerHostName(), server1Port, server2Port, |
| true, 0), |
| regionName, getClientDistributedSystemProperties(durableClientId, 60), Boolean.TRUE); |
| |
| }); |
| |
| // Send clientReady message |
| sendClientReady(this.durableClientVM); |
| |
| // Register durable CQ |
| String cqName = getName() + "_cq"; |
| registerDurableCq(cqName); |
| |
| // Execute getAllDurableCqsFromServer on the client |
| List<String> durableCqNames = |
| this.durableClientVM.invoke(DurableClientCQDUnitTest::getAllDurableCqsFromServer); |
| |
| this.durableClientVM.invoke(() -> verifyDurableCqs(durableCqNames, cqName)); |
| |
| // Stop the durable client |
| this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Stop the servers |
| this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| this.server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } |
| |
| private static List<String> getAllDurableCqsFromServer() throws CqException { |
| QueryService queryService = CacheServerTestUtil.getPool().getQueryService(); |
| return queryService.getAllDurableCqsFromServer(); |
| } |
| |
| private static void verifyDurableCqs(final List<String> durableCqNames, |
| final String registeredCqName) { |
| // Verify the number of durable CQ names is one, and it matches the registered name |
| assertEquals(1, durableCqNames.size()); |
| String returnedCqName = durableCqNames.get(0); |
| assertEquals(registeredCqName, returnedCqName); |
| |
| // Get client's primary server |
| PoolImpl pool = CacheServerTestUtil.getPool(); |
| ServerLocation primaryServerLocation = pool.getPrimary(); |
| |
| // Verify the primary server was used and no other server was used |
| Map<ServerLocationAndMemberId, ConnectionStats> statistics = |
| pool.getEndpointManager().getAllStats(); |
| for (Map.Entry<ServerLocationAndMemberId, ConnectionStats> entry : statistics.entrySet()) { |
| int expectedGetDurableCqInvocations = |
| entry.getKey().getServerLocation().equals(primaryServerLocation) ? 1 : 0; |
| assertEquals(expectedGetDurableCqInvocations, entry.getValue().getGetDurableCqs()); |
| } |
| } |
| |
| /** |
| * This test method is disabled because it is failing periodically and causing cruise control |
| * failures See bug #47060 (test seems to be enabled now!) |
| */ |
| @Test |
| public void testReadyForEventsNotCalledImplicitlyWithCacheXML() throws InterruptedException { |
| try { |
| setPeriodicACKObserver(durableClientVM); |
| final String cqName = "cqTest"; |
| // Start a server |
| server1Port = |
| this.server1VM.invoke(() -> CacheServerTestUtil.createCacheServerFromXml( |
| DurableClientTestBase.class.getResource("durablecq-server-cache.xml"))); |
| |
| // Start a durable client that is not kept alive on the server when it |
| // stops normally |
| final String durableClientId = getName() + "_client"; |
| |
| // create client cache from xml |
| this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClientFromXml( |
| DurableClientTestBase.class.getResource("durablecq-client-cache.xml"), "client", |
| durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS, Boolean.FALSE)); |
| |
| // verify that readyForEvents has not yet been called on all the client's pools |
| this.durableClientVM.invoke(new CacheSerializableRunnable("check readyForEvents not called") { |
| @Override |
| public void run2() throws CacheException { |
| for (Pool p : PoolManager.getAll().values()) { |
| assertFalse(((PoolImpl) p).getReadyForEventsCalled()); |
| } |
| } |
| }); |
| |
| // Send clientReady message |
| sendClientReady(durableClientVM); |
| registerDurableCq(cqName); |
| |
| // Verify durable client on server1 |
| verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM); |
| |
| // Start normal publisher client |
| this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient( |
| getClientPool(getServerHostName(), server1Port, false), |
| regionName)); |
| |
| // Publish some entries |
| final int numberOfEntries = 10; |
| publishEntries(0, numberOfEntries); |
| |
| // Verify the durable client received the updates |
| checkCqListenerEvents(this.durableClientVM, cqName, numberOfEntries, |
| VERY_LONG_DURABLE_TIMEOUT_SECONDS); |
| |
| Thread.sleep(10000); |
| |
| // Stop the durable client |
| this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(Boolean.TRUE)); |
| |
| // Verify the durable client still exists on the server |
| verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, |
| server1VM); |
| |
| // Publish some more entries |
| publishEntries(10, numberOfEntries); |
| |
| this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Re-start the durable client |
| this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClientFromXml( |
| DurableClientTestBase.class.getResource("durablecq-client-cache.xml"), "client", |
| durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS, Boolean.FALSE)); |
| |
| // Durable client registers durable cq on server |
| registerDurableCq(cqName); |
| |
| // Send clientReady message |
| sendClientReady(this.durableClientVM); |
| |
| // Verify durable client on server |
| verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM); |
| |
| // Verify the durable client received the updates held for it on the server |
| checkCqListenerEvents(this.durableClientVM, cqName, 10, VERY_LONG_DURABLE_TIMEOUT_SECONDS); |
| |
| // Stop the durable client |
| this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| |
| // Stop the server |
| this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); |
| } finally { |
| unsetPeriodicACKObserver(durableClientVM); |
| } |
| } |
| |
| private void setPeriodicACKObserver(VM vm) { |
| CacheSerializableRunnable cacheSerializableRunnable = |
| new CacheSerializableRunnable("Set ClientServerObserver") { |
| @Override |
| public void run2() throws CacheException { |
| PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG = true; |
| ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() { |
| @Override |
| public void beforeSendingClientAck() { |
| // logger.info("beforeSendingClientAck invoked"); |
| |
| } |
| }); |
| |
| } |
| }; |
| vm.invoke(cacheSerializableRunnable); |
| } |
| |
| private void unsetPeriodicACKObserver(VM vm) { |
| CacheSerializableRunnable cacheSerializableRunnable = |
| new CacheSerializableRunnable("Unset ClientServerObserver") { |
| @Override |
| public void run2() throws CacheException { |
| PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG = false; |
| } |
| }; |
| vm.invoke(cacheSerializableRunnable); |
| } |
| |
| public VM getPrimaryServerVM() { |
| if (this.server1Port == getPrimaryServerPort()) { |
| return server1VM; |
| } else { |
| return server2VM; |
| } |
| } |
| |
| public int getPrimaryServerPort() { |
| PoolImpl pool = CacheServerTestUtil.getPool(); |
| ServerLocation primaryServerLocation = pool.getPrimary(); |
| return primaryServerLocation.getPort(); |
| } |
| |
| public void waitForFailoverToPerform(int oldPrimaryPort) { |
| final PoolImpl pool = CacheServerTestUtil.getPool(); |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return pool.getPrimary() != null && pool.getPrimary().getPort() != oldPrimaryPort; |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertNotNull(pool.getPrimary()); |
| } |
| |
| void registerDurableCq(final String cqName) { |
| // Durable client registers durable cq on server |
| this.durableClientVM.invoke(new CacheSerializableRunnable("Register Cq") { |
| @Override |
| public void run2() throws CacheException { |
| // Get the region |
| Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName); |
| assertNotNull(region); |
| |
| // Create CQ Attributes. |
| CqAttributesFactory cqAf = new CqAttributesFactory(); |
| |
| // Initialize and set CqListener. |
| CqListener[] cqListeners = {new CacheServerTestUtil.ControlCqListener()}; |
| cqAf.initCqListeners(cqListeners); |
| CqAttributes cqa = cqAf.create(); |
| |
| // Create cq's |
| // Get the query service for the Pool |
| QueryService queryService = CacheServerTestUtil.getPool().getQueryService(); |
| |
| try { |
| CqQuery query = |
| queryService.newCq(cqName, "Select * from " + SEPARATOR + regionName, cqa, true); |
| query.execute(); |
| } catch (CqExistsException | CqException e) { |
| fail("Failed due to ", e); |
| } catch (RegionNotFoundException e) { |
| fail("Could not find specified region:" + regionName + ":", e); |
| } |
| } |
| }); |
| } |
| |
| } |