| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache.query.cq.dunit; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID; |
| import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.test.dunit.Assert.assertEquals; |
| import static org.apache.geode.test.dunit.Assert.assertFalse; |
| import static org.apache.geode.test.dunit.Assert.assertNotNull; |
| import static org.apache.geode.test.dunit.Assert.assertNull; |
| import static org.apache.geode.test.dunit.Assert.assertTrue; |
| import static org.apache.geode.test.dunit.Assert.fail; |
| |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.EvictionAttributes; |
| import org.apache.geode.cache.MirrorType; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.PoolFactory; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.query.CqAttributes; |
| import org.apache.geode.cache.query.CqAttributesFactory; |
| import org.apache.geode.cache.query.CqEvent; |
| import org.apache.geode.cache.query.CqException; |
| import org.apache.geode.cache.query.CqExistsException; |
| import org.apache.geode.cache.query.CqListener; |
| import org.apache.geode.cache.query.CqQuery; |
| import org.apache.geode.cache.query.QueryService; |
| import org.apache.geode.cache.query.RegionNotFoundException; |
| import org.apache.geode.cache.query.SelectResults; |
| import org.apache.geode.cache.query.Struct; |
| import org.apache.geode.cache.query.cq.internal.CqQueryImpl; |
| import org.apache.geode.cache.query.cq.internal.CqQueryImpl.TestHook; |
| import org.apache.geode.cache.query.data.Portfolio; |
| import org.apache.geode.cache30.CacheSerializableRunnable; |
| import org.apache.geode.cache30.CertifiableTestCacheListener; |
| import org.apache.geode.internal.AvailablePortHelper; |
| import org.apache.geode.internal.cache.PoolFactoryImpl; |
| import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.Assert; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.Invoke; |
| import org.apache.geode.test.dunit.LogWriterUtils; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| import org.apache.geode.test.junit.categories.ClientSubscriptionTest; |
| |
| /** |
| * This class tests the ContinuousQuery mechanism in GemFire. This includes the test with different |
| * data activities. |
| */ |
| @Category({ClientSubscriptionTest.class}) |
| public class CqDataUsingPoolDUnitTest extends JUnit4CacheTestCase { |
| |
| protected CqQueryUsingPoolDUnitTest cqDUnitTest = new CqQueryUsingPoolDUnitTest(); // TODO: don't |
| // do this! |
| private static final Logger logger = LogService.getLogger(); |
| |
| @Override |
| public final void postSetUp() throws Exception { |
| // avoid IllegalStateException from HandShake by connecting all vms tor |
| // system before creating ConnectionPools |
| getSystem(); |
| Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") { |
| @Override |
| public void run() { |
| getSystem(); |
| } |
| }); |
| postSetUpCqDataUsingPoolDUnitTest(); |
| } |
| |
| protected void postSetUpCqDataUsingPoolDUnitTest() throws Exception {} |
| |
| /** |
| * Tests with client acting as feeder/publisher and registering cq. Added wrt bug 37161. In case |
| * of InterestList the events are not sent back to the client if its the originator, this is not |
| * true for cq. |
| */ |
| @Test |
| public void testClientWithFeederAndCQ() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client = host.getVM(1); |
| |
| cqDUnitTest.createServer(server); |
| |
| final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server.getHost()); |
| |
| String poolName = "testClientWithFeederAndCQ"; |
| cqDUnitTest.createPool(client, poolName, host0, port); |
| |
| // Create client. |
| cqDUnitTest.createClient(client, port, host0); |
| |
| |
| cqDUnitTest.createCQ(client, poolName, "testClientWithFeederAndCQ_0", cqDUnitTest.cqs[0]); |
| cqDUnitTest.executeCQ(client, "testClientWithFeederAndCQ_0", false, null); |
| |
| final int size = 10; |
| cqDUnitTest.createValues(client, cqDUnitTest.regions[0], size); |
| cqDUnitTest.waitForCreated(client, "testClientWithFeederAndCQ_0", |
| CqQueryUsingPoolDUnitTest.KEY + size); |
| |
| cqDUnitTest.validateCQ(client, "testClientWithFeederAndCQ_0", |
| /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest, /* creates: */ size, /* updates: */ 0, |
| /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0, /* queryDeletes: */ 0, |
| /* totalEvents: */ size); |
| |
| // Close. |
| cqDUnitTest.closeClient(client); |
| cqDUnitTest.closeServer(server); |
| } |
| |
| /** |
| * Test for CQ Fail over/HA with redundancy level set. |
| */ |
| @Test |
| public void testCQHAWithState() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server1 = host.getVM(0); |
| VM server2 = host.getVM(1); |
| VM server3 = host.getVM(2); |
| |
| VM client = host.getVM(3); |
| |
| cqDUnitTest.createServer(server1); |
| |
| final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server1.getHost()); |
| |
| final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2); |
| |
| cqDUnitTest.createServer(server2, ports[0]); |
| final int port2 = server2.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| |
| // Create client - With 3 server endpoints and redundancy level set to 2. |
| |
| // Create client with redundancyLevel 1 |
| |
| String poolName = "testCQHAWithState"; |
| cqDUnitTest.createPool(client, poolName, new String[] {host0, host0, host0}, |
| new int[] {port1, port2, ports[1]}, "1"); |
| |
| // Create CQs. |
| int numCQs = 1; |
| for (int i = 0; i < numCQs; i++) { |
| // Create CQs. |
| cqDUnitTest.createCQ(client, poolName, "testCQHAWithState_" + i, cqDUnitTest.cqs[i]); |
| cqDUnitTest.executeCQ(client, "testCQHAWithState_" + i, false, null); |
| } |
| |
| Wait.pause(1 * 1000); |
| |
| int size = 10; |
| |
| // CREATE. |
| cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size); |
| cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size); |
| |
| for (int i = 1; i <= size; i++) { |
| cqDUnitTest.waitForCreated(client, "testCQHAWithState_0", CqQueryUsingPoolDUnitTest.KEY + i); |
| } |
| |
| // Clients expected initial result. |
| int[] resultsCnt = new int[] {10, 1, 2}; |
| |
| for (int i = 0; i < numCQs; i++) { |
| cqDUnitTest.validateCQ(client, "testCQHAWithState_" + i, CqQueryUsingPoolDUnitTest.noTest, |
| resultsCnt[i], 0, 0); |
| } |
| |
| // Close server1. |
| // To maintain the redundancy; it will make connection to endpoint-3. |
| cqDUnitTest.closeServer(server1); |
| Wait.pause(3 * 1000); |
| |
| |
| // UPDATE-1. |
| cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], 10); |
| cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], 10); |
| |
| for (int i = 1; i <= size; i++) { |
| cqDUnitTest.waitForUpdated(client, "testCQHAWithState_0", |
| CqQueryUsingPoolDUnitTest.KEY + size); |
| } |
| |
| for (int i = 0; i < numCQs; i++) { |
| cqDUnitTest.validateCQ(client, "testCQHAWithState_" + i, CqQueryUsingPoolDUnitTest.noTest, |
| resultsCnt[i], resultsCnt[i], CqQueryUsingPoolDUnitTest.noTest); |
| } |
| |
| // Stop cq. |
| cqDUnitTest.stopCQ(client, "testCQHAWithState_0"); |
| |
| Wait.pause(2 * 1000); |
| |
| // UPDATE with stop. |
| cqDUnitTest.createServer(server3, ports[1]); |
| server3.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| Wait.pause(2 * 1000); |
| |
| cqDUnitTest.clearCQListenerEvents(client, "testCQHAWithState_0"); |
| |
| cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], 10); |
| cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], 10); |
| |
| // Wait for events at client. |
| try { |
| cqDUnitTest.waitForUpdated(client, "testCQHAWithState_0", CqQueryUsingPoolDUnitTest.KEY + 1); |
| fail("Events not expected since CQ is in stop state."); |
| } catch (Exception expected) { |
| // Success. |
| } |
| |
| cqDUnitTest.executeCQ(client, "testCQHAWithState_0", false, null); |
| |
| // Update - 2 |
| cqDUnitTest.createValues(server3, cqDUnitTest.regions[0], 10); |
| cqDUnitTest.createValues(server3, cqDUnitTest.regions[1], 10); |
| |
| for (int i = 1; i <= size; i++) { |
| cqDUnitTest.waitForUpdated(client, "testCQHAWithState_0", |
| CqQueryUsingPoolDUnitTest.KEY + size); |
| } |
| |
| for (int i = 0; i < numCQs; i++) { |
| cqDUnitTest.validateCQ(client, "testCQHAWithState_" + i, CqQueryUsingPoolDUnitTest.noTest, |
| resultsCnt[i], resultsCnt[i] * 2, CqQueryUsingPoolDUnitTest.noTest); |
| } |
| |
| // Close. |
| cqDUnitTest.closeClient(client); |
| cqDUnitTest.closeServer(server2); |
| cqDUnitTest.closeServer(server3); |
| } |
| |
| /** |
| * Tests propogation of invalidates and destorys to the clients. Bug 37242. |
| */ |
| @Test |
| public void testCQWithDestroysAndInvalidates() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client = host.getVM(1); |
| VM producer = host.getVM(2); |
| cqDUnitTest.createServer(server, 0, true); |
| final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server.getHost()); |
| |
| String poolName = "testCQWithDestroysAndInvalidates"; |
| cqDUnitTest.createPool(client, poolName, host0, port); |
| |
| // Create client. |
| // cqDUnitTest.createClient(client, port, host0); |
| |
| // producer is not doing any thing. |
| cqDUnitTest.createClient(producer, port, host0); |
| |
| final int size = 10; |
| final String name = "testQuery_4"; |
| cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size); |
| |
| cqDUnitTest.createCQ(client, poolName, name, cqDUnitTest.cqs[4]); |
| cqDUnitTest.executeCQ(client, name, true, null); |
| |
| // do destroys and invalidates. |
| server.invoke(new CacheSerializableRunnable("Create values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getRootRegion().getSubregion(cqDUnitTest.regions[0]); |
| for (int i = 1; i <= 5; i++) { |
| region1.destroy(CqQueryUsingPoolDUnitTest.KEY + i); |
| } |
| } |
| }); |
| for (int i = 1; i <= 5; i++) { |
| cqDUnitTest.waitForDestroyed(client, name, CqQueryUsingPoolDUnitTest.KEY + i); |
| } |
| // recreate the key values from 1 - 5 |
| cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 5); |
| // wait for all creates to arrive. |
| for (int i = 1; i <= 5; i++) { |
| cqDUnitTest.waitForCreated(client, name, CqQueryUsingPoolDUnitTest.KEY + i); |
| } |
| |
| // do more puts to push first five key-value to disk. |
| cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 10); |
| // do invalidates on fisrt five keys. |
| server.invoke(new CacheSerializableRunnable("Create values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getRootRegion().getSubregion(cqDUnitTest.regions[0]); |
| for (int i = 1; i <= 5; i++) { |
| region1.invalidate(CqQueryUsingPoolDUnitTest.KEY + i); |
| } |
| } |
| }); |
| // wait for invalidates now. |
| for (int i = 1; i <= 5; i++) { |
| cqDUnitTest.waitForInvalidated(client, name, CqQueryUsingPoolDUnitTest.KEY + i); |
| } |
| |
| // Close. |
| cqDUnitTest.closeClient(client); |
| cqDUnitTest.closeServer(server); |
| } |
| |
| /** |
| * Tests make sure that the second client doesnt get more events then there should be. This will |
| * test the fix for bug 37295. |
| */ |
| @Test |
| public void testCQWithMultipleClients() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client1 = host.getVM(1); |
| VM client2 = host.getVM(2); |
| VM client3 = host.getVM(3); |
| |
| /* Create Server and Client */ |
| cqDUnitTest.createServer(server); |
| final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server.getHost()); |
| |
| String poolName1 = "testCQWithMultipleClients1"; |
| String poolName2 = "testCQWithMultipleClients2"; |
| |
| cqDUnitTest.createPool(client1, poolName1, host0, port); |
| cqDUnitTest.createPool(client2, poolName2, host0, port); |
| |
| /* Create CQs. and initialize the region */ |
| // this should stasify every thing since id is always greater than |
| // zero. |
| cqDUnitTest.createCQ(client1, poolName1, "testCQWithMultipleClients_0", cqDUnitTest.cqs[0]); |
| cqDUnitTest.executeCQ(client1, "testCQWithMultipleClients_0", false, null); |
| // should only satisfy one key-value pair in the region. |
| cqDUnitTest.createCQ(client2, poolName2, "testCQWithMultipleClients_0", cqDUnitTest.cqs[1]); |
| cqDUnitTest.executeCQ(client2, "testCQWithMultipleClients_0", false, null); |
| |
| int size = 10; |
| |
| // Create Values on Server. |
| cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size); |
| |
| cqDUnitTest.waitForCreated(client1, "testCQWithMultipleClients_0", |
| CqQueryUsingPoolDUnitTest.KEY + 10); |
| |
| /* Validate the CQs */ |
| cqDUnitTest.validateCQ(client1, "testCQWithMultipleClients_0", |
| /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest, /* creates: */ size, /* updates: */ 0, |
| /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0, /* queryDeletes: */ 0, |
| /* totalEvents: */ size); |
| |
| cqDUnitTest.waitForCreated(client2, "testCQWithMultipleClients_0", |
| CqQueryUsingPoolDUnitTest.KEY + 2); |
| |
| cqDUnitTest.validateCQ(client2, "testCQWithMultipleClients_0", |
| /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest, /* creates: */ 1, /* updates: */ 0, |
| /* deletes; */ 0, /* queryInserts: */ 1, /* queryUpdates: */ 0, /* queryDeletes: */ 0, |
| /* totalEvents: */ 1); |
| |
| /* Close Server and Client */ |
| cqDUnitTest.closeClient(client2); |
| cqDUnitTest.closeClient(client3); |
| cqDUnitTest.closeServer(server); |
| } |
| |
| /** |
| * Test for CQ when region is populated with net load. |
| */ |
| @Test |
| public void testCQWithLoad() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server1 = host.getVM(0); |
| VM server2 = host.getVM(1); |
| |
| VM client = host.getVM(2); |
| |
| cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES); |
| cqDUnitTest.createServer(server2, 0, false, MirrorType.KEYS); |
| |
| final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server1.getHost()); |
| |
| String poolName = "testCQWithLoad"; |
| cqDUnitTest.createPool(client, poolName, host0, port1); |
| |
| // cqDUnitTest.createClient(client, port1, host0); |
| |
| // Create CQs. |
| cqDUnitTest.createCQ(client, poolName, "testCQWithLoad_0", cqDUnitTest.cqs[0]); |
| cqDUnitTest.executeCQ(client, "testCQWithLoad_0", false, null); |
| |
| Wait.pause(1 * 1000); |
| |
| final int size = 10; |
| |
| // CREATE VALUES. |
| cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size); |
| |
| server1.invoke(new CacheSerializableRunnable("Load from second server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); |
| for (int i = 1; i <= size; i++) { |
| region1.get(CqQueryUsingPoolDUnitTest.KEY + i); |
| } |
| } |
| }); |
| |
| for (int i = 1; i <= size; i++) { |
| cqDUnitTest.waitForCreated(client, "testCQWithLoad_0", CqQueryUsingPoolDUnitTest.KEY + i); |
| } |
| |
| cqDUnitTest.validateCQ(client, "testCQWithLoad_0", CqQueryUsingPoolDUnitTest.noTest, size, 0, |
| 0); |
| |
| // Close. |
| cqDUnitTest.closeClient(client); |
| cqDUnitTest.closeServer(server1); |
| cqDUnitTest.closeServer(server2); |
| } |
| |
| /** |
| * Test for CQ when entries are evicted from region. |
| */ |
| @Test |
| public void testCQWithEviction() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server1 = host.getVM(0); |
| VM server2 = host.getVM(1); |
| |
| VM client = host.getVM(2); |
| |
| final int evictionThreshold = 5; |
| server1.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| LogWriterUtils.getLogWriter().info("### Create Cache Server. ###"); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setMirrorType(MirrorType.NONE); |
| |
| // setting the eviction attributes. |
| factory |
| .setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(evictionThreshold)); |
| for (int i = 0; i < cqDUnitTest.regions.length; i++) { |
| Region region = createRegion(cqDUnitTest.regions[i], factory.createRegionAttributes()); |
| // Set CacheListener. |
| region.getAttributesMutator() |
| .addCacheListener(new CertifiableTestCacheListener(LogWriterUtils.getLogWriter())); |
| } |
| Wait.pause(2000); |
| |
| try { |
| cqDUnitTest.startBridgeServer(0, true); |
| } catch (Exception ex) { |
| Assert.fail("While starting CacheServer", ex); |
| } |
| Wait.pause(2000); |
| } |
| }); |
| |
| cqDUnitTest.createServer(server2, 0, false, MirrorType.NONE); |
| |
| final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server1.getHost()); |
| |
| String poolName = "testCQWithEviction"; |
| cqDUnitTest.createPool(client, poolName, host0, port1); |
| |
| // cqDUnitTest.createClient(client, port1, host0); |
| |
| // Create CQs. |
| cqDUnitTest.createCQ(client, poolName, "testCQWithEviction_0", cqDUnitTest.cqs[0]); |
| |
| // This should fail as Region is not replicated. |
| // There is a bug37966 filed on this. |
| try { |
| cqDUnitTest.executeCQ(client, "testCQWithEviction_0", false, "CqException"); |
| fail("Should have thrown exception, cq not supported on Non-replicated region."); |
| } catch (Exception expected) { |
| // Ignore expected. |
| } |
| |
| Wait.pause(1 * 1000); |
| |
| final int size = 10; |
| |
| // CREATE VALUES. |
| cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size); |
| |
| server1.invoke(new CacheSerializableRunnable("Load from second server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); |
| for (int i = 1; i <= size; i++) { |
| region1.get(CqQueryUsingPoolDUnitTest.KEY + i); |
| } |
| } |
| }); |
| |
| Wait.pause(2 * 1000); |
| |
| server1.invoke(new CacheSerializableRunnable("validate destroy") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(cqDUnitTest.regions[0]); |
| assertNotNull(region); |
| |
| Set keys = region.entrySet(); |
| int keyCnt = size - evictionThreshold; |
| assertEquals("Mismatch, number of keys in local region is not equal to the expected size", |
| keyCnt, keys.size()); |
| |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| for (int i = 1; i <= keyCnt; i++) { |
| ctl.waitForDestroyed(CqQueryUsingPoolDUnitTest.KEY + i); |
| assertNull(region.getEntry(CqQueryUsingPoolDUnitTest.KEY + i)); |
| } |
| } |
| }); |
| |
| // Close. |
| cqDUnitTest.closeClient(client); |
| cqDUnitTest.closeServer(server1); |
| cqDUnitTest.closeServer(server2); |
| } |
| |
| /** |
| * Test for CQ with establishCallBackConnection. |
| */ |
| @Test |
| public void testCQWithEstablishCallBackConnection() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server1 = host.getVM(0); |
| VM client = host.getVM(1); |
| |
| cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES); |
| |
| final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String serverHost = NetworkUtils.getServerHostName(server1.getHost()); |
| |
| final String poolName = "testCQWithEstablishCallBackConnection"; |
| |
| client.invoke(new CacheSerializableRunnable("createPool :" + poolName) { |
| @Override |
| public void run2() throws CacheException { |
| // Create Cache. |
| getCache(); |
| |
| PoolFactory cpf = PoolManager.createFactory(); |
| cpf.setSubscriptionEnabled(false); |
| cpf.addServer(serverHost, port1); |
| cpf.create(poolName); |
| } |
| }); |
| |
| // Create CQs. |
| cqDUnitTest.createCQ(client, poolName, "testCQWithEstablishCallBackConnection_0", |
| cqDUnitTest.cqs[0]); |
| |
| // This should fail. |
| try { |
| cqDUnitTest.executeCQ(client, "testCQWithEstablishCallBackConnection_0", false, |
| "CqException"); |
| fail("Test should have failed with connection with establishCallBackConnection not found."); |
| } catch (Exception expected) { |
| // Expected. |
| } |
| |
| // Close. |
| cqDUnitTest.closeClient(client); |
| cqDUnitTest.closeServer(server1); |
| } |
| |
| /** |
| * Test for: Region destroy, calls close on the server. Region clear triggers cqEvent with query |
| * op region clear. Region invalidate triggers cqEvent with query op region invalidate. |
| */ |
| @Test |
| public void testRegionEvents() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client = host.getVM(1); |
| |
| cqDUnitTest.createServer(server); |
| final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server.getHost()); |
| |
| String poolName = "testRegionEvents"; |
| cqDUnitTest.createPool(client, poolName, host0, port); |
| |
| // cqDUnitTest.createClient(client, port, host0); |
| |
| // Create CQ on regionA |
| cqDUnitTest.createCQ(client, poolName, "testRegionEvents_0", cqDUnitTest.cqs[0]); |
| cqDUnitTest.executeCQ(client, "testRegionEvents_0", false, null); |
| |
| // Create CQ on regionB |
| cqDUnitTest.createCQ(client, poolName, "testRegionEvents_1", cqDUnitTest.cqs[2]); |
| cqDUnitTest.executeCQ(client, "testRegionEvents_1", false, null); |
| |
| // Test for Event on Region Clear. |
| server.invoke(new CacheSerializableRunnable("testRegionEvents") { |
| @Override |
| public void run2() throws CacheException { |
| LogWriterUtils.getLogWriter().info("### Clearing the region on the server ###"); |
| Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); |
| for (int i = 1; i <= 5; i++) { |
| region.put(CqQueryUsingPoolDUnitTest.KEY + i, new Portfolio(i)); |
| } |
| region.clear(); |
| } |
| }); |
| |
| cqDUnitTest.waitForRegionClear(client, "testRegionEvents_0"); |
| |
| // Test for Event on Region invalidate. |
| server.invoke(new CacheSerializableRunnable("testRegionEvents") { |
| @Override |
| public void run2() throws CacheException { |
| LogWriterUtils.getLogWriter().info("### Invalidate the region on the server ###"); |
| Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); |
| for (int i = 1; i <= 5; i++) { |
| region.put(CqQueryUsingPoolDUnitTest.KEY + i, new Portfolio(i)); |
| } |
| region.invalidateRegion(); |
| } |
| }); |
| |
| cqDUnitTest.waitForRegionInvalidate(client, "testRegionEvents_0"); |
| |
| // Test for Event on Region destroy. |
| server.invoke(new CacheSerializableRunnable("testRegionEvents") { |
| @Override |
| public void run2() throws CacheException { |
| LogWriterUtils.getLogWriter().info("### Destroying the region on the server ###"); |
| Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[1]); |
| for (int i = 1; i <= 5; i++) { |
| region.put(CqQueryUsingPoolDUnitTest.KEY + i, new Portfolio(i)); |
| } |
| // this should close one cq on client. |
| region.destroyRegion(); |
| } |
| }); |
| |
| Wait.pause(1000); // wait for cq to close becuse of region destroy on server. |
| // cqDUnitTest.waitForClose(client,"testRegionEvents_1"); |
| cqDUnitTest.validateCQCount(client, 1); |
| |
| // Close. |
| cqDUnitTest.closeClient(client); |
| cqDUnitTest.closeServer(server); |
| } |
| |
| /** |
| * Test for events created during the CQ query execution. When CQs are executed using |
| * executeWithInitialResults there may be possibility that the region changes during that time may |
| * not be reflected in the query result set thus making the query data and region data |
| * inconsistent. |
| */ |
| @Test |
| public void testEventsDuringQueryExecution() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| final VM client = host.getVM(1); |
| final String cqName = "testEventsDuringQueryExecution_0"; |
| cqDUnitTest.createServer(server); |
| |
| final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server.getHost()); |
| |
| String poolName = "testEventsDuringQueryExecution"; |
| cqDUnitTest.createPool(client, poolName, host0, port); |
| |
| // create CQ. |
| cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]); |
| |
| final int numObjects = 200; |
| final int totalObjects = 500; |
| |
| // initialize Region. |
| server.invoke(new CacheSerializableRunnable("Update Region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); |
| for (int i = 1; i <= numObjects; i++) { |
| Portfolio p = new Portfolio(i); |
| region.put("" + i, p); |
| } |
| } |
| }); |
| |
| // First set testhook in executeWithInitialResults so that queued events |
| // are not drained before we verify there number. |
| client.invoke(setTestHook()); |
| |
| // Execute CQ while update is in progress. |
| AsyncInvocation executeCq = |
| client.invokeAsync(new CacheSerializableRunnable("Execute CQ AsyncInvoke") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService cqService = getCache().getQueryService(); |
| // Get CqQuery object. |
| CqQuery cq1 = cqService.getCq(cqName); |
| if (cq1 == null) { |
| fail("Failed to get CQ " + cqName); |
| } |
| SelectResults cqResults = null; |
| try { |
| cqResults = cq1.executeWithInitialResults(); |
| } catch (Exception ex) { |
| fail("CQ execution failed", ex); |
| } |
| |
| // Check num of events received during executeWithInitialResults. |
| final TestHook testHook = CqQueryImpl.testHook; |
| GeodeAwaitility.await().untilAsserted(new WaitCriterion() { |
| |
| @Override |
| public boolean done() { |
| return testHook.numQueuedEvents() > 0; |
| } |
| |
| @Override |
| public String description() { |
| return "No queued events found."; |
| } |
| }); |
| |
| getCache().getLogger().fine("Queued Events Size" + testHook.numQueuedEvents()); |
| // Make sure CQEvents are queued during execute with initial results. |
| |
| CqQueryTestListener cqListener = |
| (CqQueryTestListener) cq1.getCqAttributes().getCqListener(); |
| // Wait for the last key to arrive. |
| cqListener.waitForCreated("" + totalObjects); |
| |
| // Check if the events from CqListener are in order. |
| int oldId = 0; |
| for (Object cqEvent : cqListener.events.toArray()) { |
| int newId = new Integer(cqEvent.toString()).intValue(); |
| if (oldId > newId) { |
| fail("Queued events for CQ Listener during execution with " |
| + "Initial results is not in the order in which they are created."); |
| } |
| oldId = newId; |
| } |
| |
| // Check if all the IDs are present as part of Select Results and CQ Events. |
| HashSet ids = new HashSet(cqListener.events); |
| |
| for (Object o : cqResults.asList()) { |
| Struct s = (Struct) o; |
| ids.add(s.get("key")); |
| } |
| |
| HashSet missingIds = new HashSet(); |
| String key = ""; |
| for (int i = 1; i <= totalObjects; i++) { |
| key = "" + i; |
| if (!(ids.contains(key))) { |
| missingIds.add(key); |
| } |
| } |
| |
| if (!missingIds.isEmpty()) { |
| fail("Missing Keys in either ResultSet or the Cq Event list. " |
| + " Missing keys : [size : " + missingIds.size() + "]" + missingIds |
| + " Ids in ResultSet and CQ Events :" + ids); |
| } |
| } |
| }); |
| |
| // Keep updating region (async invocation). |
| server.invoke(new CacheSerializableRunnable("Update Region") { |
| @Override |
| public void run2() throws CacheException { |
| Wait.pause(200); |
| client.invoke(new CacheSerializableRunnable("Releasing the latch") { |
| @Override |
| public void run2() throws CacheException { |
| // Now release the testhook so that CQListener can proceed. |
| final TestHook testHook = CqQueryImpl.testHook; |
| testHook.ready(); |
| } |
| }); |
| Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); |
| for (int i = numObjects + 1; i <= totalObjects; i++) { |
| Portfolio p = new Portfolio(i); |
| region.put("" + i, p); |
| } |
| } |
| }); |
| |
| // Close. |
| cqDUnitTest.closeClient(client); |
| cqDUnitTest.closeServer(server); |
| } |
| |
| @Test |
| public void testCqStatInitializationTimingIssue() { |
| disconnectAllFromDS(); |
| |
| // The async close can cause this exception on the server |
| IgnoredException.addIgnoredException("java.net.SocketException: Broken pipe"); |
| final String regionName = "testCqStatInitializationTimingIssue"; |
| final String cq1Name = "testCq1"; |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client = host.getVM(1); |
| VM client2 = host.getVM(2); |
| |
| // Start server 1 |
| final int server1Port = ((Integer) server |
| .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true)))) |
| .intValue(); |
| |
| // Start a client |
| client.invoke(() -> CacheServerTestUtil.createCacheClient( |
| getClientPool(NetworkUtils.getServerHostName(client.getHost()), server1Port), regionName)); |
| |
| // Start a pub client |
| client2.invoke(() -> CacheServerTestUtil.createCacheClient( |
| getClientPool(NetworkUtils.getServerHostName(client2.getHost()), server1Port), regionName)); |
| |
| // client has thread that invokes new and remove cq over and over |
| client.invokeAsync(new CacheSerializableRunnable("Register cq") { |
| @Override |
| public void run2() throws CacheException { |
| for (int i = 0; i < 10000; i++) { |
| CqQuery query = createCq(regionName, cq1Name); |
| if (query != null) { |
| try { |
| query.close(); |
| } catch (Exception e) { |
| fail("exception while closing cq:", e); |
| } |
| } |
| } |
| } |
| }); |
| |
| client2.invokeAsync(new CacheSerializableRunnable("pub updates") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = CacheServerTestUtil.getCache().getRegion(regionName); |
| while (true) { |
| for (int i = 0; i < 50000; i++) { |
| region.put("" + i, "" + Math.random()); |
| } |
| } |
| } |
| }); |
| |
| server.invokeAsync(new CacheSerializableRunnable("pub updates") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = CacheServerTestUtil.getCache().getRegion(regionName); |
| while (true) { |
| for (int i = 0; i < 50000; i++) { |
| region.put("" + i, "" + Math.random()); |
| } |
| } |
| } |
| }); |
| |
| // client has another thread that retrieves cq map and checks stat over and over |
| client.invoke(new CacheSerializableRunnable("Check Stats") { |
| @Override |
| public void run2() throws CacheException { |
| for (int i = 0; i < 10000; i++) { |
| checkCqStats(cq1Name); |
| } |
| } |
| }); |
| |
| client.invoke(() -> CacheServerTestUtil.closeCache()); |
| client2.invoke(() -> CacheServerTestUtil.closeCache()); |
| server.invoke(() -> CacheServerTestUtil.closeCache()); |
| } |
| |
| @Test |
| public void testGetDurableCQsFromPoolOnly() throws Exception { |
| final String regionName = "regionA"; |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client1 = host.getVM(1); |
| VM client2 = host.getVM(2); |
| |
| /* Create Server and Client */ |
| cqDUnitTest.createServer(server); |
| final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server.getHost()); |
| |
| final String poolName1 = "pool1"; |
| final String poolName2 = "pool2"; |
| |
| cqDUnitTest.createPool(client1, poolName1, host0, port); |
| cqDUnitTest.createPool(client2, poolName2, host0, port); |
| |
| client1.invoke(new CacheSerializableRunnable("Register cq for client 1") { |
| @Override |
| public void run2() throws CacheException { |
| |
| QueryService queryService = null; |
| try { |
| queryService = (PoolManager.find(poolName1)).getQueryService(); |
| } catch (Exception cqe) { |
| Assert.fail("Failed to getCQService.", cqe); |
| } |
| try { |
| CqAttributesFactory cqAf = new CqAttributesFactory(); |
| CqAttributes attributes = cqAf.create(); |
| queryService.newCq("client1DCQ1", "Select * From /root/" + regionName + " where id = 1", |
| attributes, true).execute(); |
| queryService.newCq("client1DCQ2", "Select * From /root/" + regionName + " where id = 10", |
| attributes, true).execute(); |
| queryService.newCq("client1NoDC1", "Select * From /root/" + regionName, attributes, false) |
| .execute(); |
| queryService.newCq("client1NoDC2", "Select * From /root/" + regionName + " where id = 3", |
| attributes, false).execute(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } catch (CqExistsException e) { |
| fail("failed", e); |
| } catch (RegionNotFoundException e) { |
| fail("failed", e); |
| } |
| } |
| }); |
| |
| client2.invoke(new CacheSerializableRunnable("Register cq for client 2") { |
| @Override |
| public void run2() throws CacheException { |
| |
| QueryService queryService = null; |
| try { |
| queryService = (PoolManager.find(poolName2)).getQueryService(); |
| } catch (Exception cqe) { |
| Assert.fail("Failed to getCQService.", cqe); |
| } |
| try { |
| CqAttributesFactory cqAf = new CqAttributesFactory(); |
| CqAttributes attributes = cqAf.create(); |
| queryService.newCq("client2DCQ1", "Select * From /root/" + regionName + " where id = 1", |
| attributes, true).execute(); |
| queryService.newCq("client2DCQ2", "Select * From /root/" + regionName + " where id = 10", |
| attributes, true).execute(); |
| queryService.newCq("client2DCQ3", "Select * From /root/" + regionName, attributes, true) |
| .execute(); |
| queryService.newCq("client2DCQ4", "Select * From /root/" + regionName + " where id = 3", |
| attributes, true).execute(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } catch (CqExistsException e) { |
| fail("failed", e); |
| } catch (RegionNotFoundException e) { |
| fail("failed", e); |
| } |
| } |
| }); |
| |
| client2.invoke(new CacheSerializableRunnable("test getDurableCQsFromServer for client2") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService queryService = null; |
| try { |
| queryService = (PoolManager.find(poolName2)).getQueryService(); |
| } catch (Exception cqe) { |
| Assert.fail("Failed to getCQService.", cqe); |
| } |
| List<String> list = null; |
| try { |
| list = queryService.getAllDurableCqsFromServer(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| assertEquals(4, list.size()); |
| assertTrue(list.contains("client2DCQ1")); |
| assertTrue(list.contains("client2DCQ2")); |
| assertTrue(list.contains("client2DCQ3")); |
| assertTrue(list.contains("client2DCQ4")); |
| } |
| }); |
| |
| client1.invoke(new CacheSerializableRunnable("test getDurableCQsFromServer for client1") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService queryService = null; |
| try { |
| queryService = (PoolManager.find(poolName1)).getQueryService(); |
| } catch (Exception cqe) { |
| Assert.fail("Failed to getCQService.", cqe); |
| } |
| List<String> list = null; |
| try { |
| list = queryService.getAllDurableCqsFromServer(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| assertEquals(2, list.size()); |
| assertTrue(list.contains("client1DCQ1")); |
| assertTrue(list.contains("client1DCQ2")); |
| } |
| }); |
| |
| cqDUnitTest.closeClient(client2); |
| cqDUnitTest.closeClient(client1); |
| cqDUnitTest.closeServer(server); |
| } |
| |
| @Test |
| public void testGetDurableCQsFromServerWithEmptyList() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client1 = host.getVM(1); |
| |
| /* Create Server and Client */ |
| cqDUnitTest.createServer(server); |
| final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server.getHost()); |
| |
| final String poolName1 = "pool1"; |
| |
| cqDUnitTest.createPool(client1, poolName1, host0, port); |
| |
| client1.invoke(new CacheSerializableRunnable("test getDurableCQsFromServer for client1") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService queryService = null; |
| try { |
| queryService = (PoolManager.find(poolName1)).getQueryService(); |
| } catch (Exception cqe) { |
| Assert.fail("Failed to getCQService.", cqe); |
| } |
| List<String> list = null; |
| try { |
| list = queryService.getAllDurableCqsFromServer(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| assertEquals(0, list.size()); |
| assertFalse(list.contains("client1DCQ1")); |
| assertFalse(list.contains("client1DCQ2")); |
| } |
| }); |
| |
| cqDUnitTest.closeClient(client1); |
| cqDUnitTest.closeServer(server); |
| } |
| |
| @Test |
| public void testGetDurableCqsFromServer() { |
| disconnectAllFromDS(); |
| |
| final String regionName = "testGetAllDurableCqsFromServer"; |
| final String cq1Name = "testCq1"; |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client1 = host.getVM(1); |
| VM client2 = host.getVM(2); |
| |
| // Start server 1 |
| final int server1Port = ((Integer) server |
| .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true)))) |
| .intValue(); |
| |
| // Start client 1 |
| client1.invoke(() -> CacheServerTestUtil.createClientCache( |
| getClientPool(NetworkUtils.getServerHostName(client1.getHost()), server1Port), regionName)); |
| |
| // Start client 2 |
| client2.invoke(() -> CacheServerTestUtil.createClientCache( |
| getClientPool(NetworkUtils.getServerHostName(client2.getHost()), server1Port), regionName)); |
| |
| createClient1CqsAndDurableCqs(client1, regionName); |
| createClient2CqsAndDurableCqs(client2, regionName); |
| |
| client2.invoke(new CacheSerializableRunnable("check durable cqs for client 2") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| List<String> list = null; |
| try { |
| list = queryService.getAllDurableCqsFromServer(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| assertEquals(4, list.size()); |
| assertTrue(list.contains("client2DCQ1")); |
| assertTrue(list.contains("client2DCQ2")); |
| assertTrue(list.contains("client2DCQ3")); |
| assertTrue(list.contains("client2DCQ4")); |
| } |
| }); |
| |
| client1.invoke(new CacheSerializableRunnable("check durable cqs for client 1") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| List<String> list = null; |
| try { |
| list = queryService.getAllDurableCqsFromServer(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| assertEquals(2, list.size()); |
| assertTrue(list.contains("client1DCQ1")); |
| assertTrue(list.contains("client1DCQ2")); |
| } |
| }); |
| |
| client1.invoke(() -> CacheServerTestUtil.closeCache()); |
| client2.invoke(() -> CacheServerTestUtil.closeCache()); |
| server.invoke(() -> CacheServerTestUtil.closeCache()); |
| } |
| |
| @Test |
| public void testGetDurableCqsFromServerCycleClients() { |
| disconnectAllFromDS(); |
| |
| final String regionName = "testGetAllDurableCqsFromServerCycleClients"; |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client1 = host.getVM(1); |
| VM client2 = host.getVM(2); |
| int timeout = 60000; |
| // Start server 1 |
| final int server1Port = ((Integer) server |
| .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true)))) |
| .intValue(); |
| |
| // Start client 1 |
| client1.invoke(() -> CacheServerTestUtil.createClientCache( |
| getClientPool(NetworkUtils.getServerHostName(client1.getHost()), server1Port), regionName, |
| getDurableClientProperties("client1_dc", timeout))); |
| |
| // Start client 2 |
| client2.invoke(() -> CacheServerTestUtil.createClientCache( |
| getClientPool(NetworkUtils.getServerHostName(client2.getHost()), server1Port), regionName, |
| getDurableClientProperties("client2_dc", timeout))); |
| |
| createClient1CqsAndDurableCqs(client1, regionName); |
| createClient2CqsAndDurableCqs(client2, regionName); |
| |
| cycleDurableClient(client1, "client1_dc", server1Port, regionName, timeout); |
| cycleDurableClient(client2, "client2_dc", server1Port, regionName, timeout); |
| |
| client2.invoke(new CacheSerializableRunnable("check durable cqs for client 2") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| List<String> list = null; |
| try { |
| list = queryService.getAllDurableCqsFromServer(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| assertEquals(4, list.size()); |
| assertTrue(list.contains("client2DCQ1")); |
| assertTrue(list.contains("client2DCQ2")); |
| assertTrue(list.contains("client2DCQ3")); |
| assertTrue(list.contains("client2DCQ4")); |
| } |
| }); |
| |
| client1.invoke(new CacheSerializableRunnable("check durable cqs for client 1") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| List<String> list = null; |
| try { |
| list = queryService.getAllDurableCqsFromServer(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| assertEquals(2, list.size()); |
| assertTrue(list.contains("client1DCQ1")); |
| assertTrue(list.contains("client1DCQ2")); |
| } |
| }); |
| |
| client1.invoke(() -> CacheServerTestUtil.closeCache()); |
| client2.invoke(() -> CacheServerTestUtil.closeCache()); |
| server.invoke(() -> CacheServerTestUtil.closeCache()); |
| } |
| |
| @Test |
| public void testGetDurableCqsFromServerCycleClientsAndMoreCqs() { |
| final String regionName = "testGetAllDurableCqsFromServerCycleClients"; |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client1 = host.getVM(1); |
| VM client2 = host.getVM(2); |
| int timeout = 60000; |
| // Start server 1 |
| final int server1Port = ((Integer) server |
| .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true)))) |
| .intValue(); |
| |
| // Start client 1 |
| client1.invoke(() -> CacheServerTestUtil.createClientCache( |
| getClientPool(NetworkUtils.getServerHostName(client1.getHost()), server1Port), regionName, |
| getDurableClientProperties("client1_dc", timeout))); |
| |
| // Start client 2 |
| client2.invoke(() -> CacheServerTestUtil.createClientCache( |
| getClientPool(NetworkUtils.getServerHostName(client2.getHost()), server1Port), regionName, |
| getDurableClientProperties("client2_dc", timeout))); |
| |
| // create the test cqs |
| createClient1CqsAndDurableCqs(client1, regionName); |
| createClient2CqsAndDurableCqs(client2, regionName); |
| |
| cycleDurableClient(client1, "client1_dc", server1Port, regionName, timeout); |
| cycleDurableClient(client2, "client2_dc", server1Port, regionName, timeout); |
| |
| client1.invoke(new CacheSerializableRunnable("Register more cq for client 1") { |
| @Override |
| public void run2() throws CacheException { |
| // register the cq's |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| CqAttributesFactory cqAf = new CqAttributesFactory(); |
| CqAttributes attributes = cqAf.create(); |
| try { |
| queryService.newCq("client1MoreDCQ1", "Select * From /" + regionName + " where id = 1", |
| attributes, true).execute(); |
| queryService.newCq("client1MoreDCQ2", "Select * From /" + regionName + " where id = 10", |
| attributes, true).execute(); |
| queryService.newCq("client1MoreNoDC1", "Select * From /" + regionName, attributes, false) |
| .execute(); |
| queryService.newCq("client1MoreNoDC2", "Select * From /" + regionName + " where id = 3", |
| attributes, false).execute(); |
| } catch (RegionNotFoundException e) { |
| fail("failed", e); |
| } catch (CqException e) { |
| fail("failed", e); |
| } catch (CqExistsException e) { |
| fail("failed", e); |
| } |
| } |
| }); |
| |
| client2.invoke(new CacheSerializableRunnable("Register more cq for client 2") { |
| @Override |
| public void run2() throws CacheException { |
| // register the cq's |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| CqAttributesFactory cqAf = new CqAttributesFactory(); |
| CqAttributes attributes = cqAf.create(); |
| try { |
| queryService.newCq("client2MoreDCQ1", "Select * From /" + regionName + " where id = 1", |
| attributes, true).execute(); |
| queryService.newCq("client2MoreDCQ2", "Select * From /" + regionName + " where id = 10", |
| attributes, true).execute(); |
| queryService.newCq("client2MoreDCQ3", "Select * From /" + regionName, attributes, true) |
| .execute(); |
| queryService.newCq("client2MoreDCQ4", "Select * From /" + regionName + " where id = 3", |
| attributes, true).execute(); |
| } catch (RegionNotFoundException e) { |
| fail("failed", e); |
| } catch (CqException e) { |
| fail("failed", e); |
| } catch (CqExistsException e) { |
| fail("failed", e); |
| } |
| } |
| }); |
| |
| // Cycle clients a second time |
| cycleDurableClient(client1, "client1_dc", server1Port, regionName, timeout); |
| cycleDurableClient(client2, "client2_dc", server1Port, regionName, timeout); |
| |
| client2.invoke(new CacheSerializableRunnable("check durable cqs for client 2") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| List<String> list = null; |
| try { |
| list = queryService.getAllDurableCqsFromServer(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| assertEquals(8, list.size()); |
| assertTrue(list.contains("client2DCQ1")); |
| assertTrue(list.contains("client2DCQ2")); |
| assertTrue(list.contains("client2DCQ3")); |
| assertTrue(list.contains("client2DCQ4")); |
| assertTrue(list.contains("client2MoreDCQ1")); |
| assertTrue(list.contains("client2MoreDCQ2")); |
| assertTrue(list.contains("client2MoreDCQ3")); |
| assertTrue(list.contains("client2MoreDCQ4")); |
| } |
| }); |
| |
| client1.invoke(new CacheSerializableRunnable("check durable cqs for client 1") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| List<String> list = null; |
| try { |
| list = queryService.getAllDurableCqsFromServer(); |
| } catch (CqException e) { |
| fail("failed", e); |
| } |
| assertEquals(4, list.size()); |
| assertTrue(list.contains("client1DCQ1")); |
| assertTrue(list.contains("client1DCQ2")); |
| assertTrue(list.contains("client1MoreDCQ1")); |
| assertTrue(list.contains("client1MoreDCQ2")); |
| } |
| }); |
| |
| client1.invoke(() -> CacheServerTestUtil.closeCache()); |
| client2.invoke(() -> CacheServerTestUtil.closeCache()); |
| server.invoke(() -> CacheServerTestUtil.closeCache()); |
| } |
| |
| private Properties getDurableClientProperties(String durableClientId, int durableClientTimeout) { |
| Properties properties = new Properties(); |
| properties.setProperty(MCAST_PORT, "0"); |
| properties.setProperty(LOCATORS, ""); |
| properties.setProperty(DURABLE_CLIENT_ID, durableClientId); |
| properties.setProperty(DURABLE_CLIENT_TIMEOUT, String.valueOf(durableClientTimeout)); |
| return properties; |
| } |
| |
| // helper to create durable cqs to test out getAllDurableCqs functionality |
| private void createClient1CqsAndDurableCqs(VM client, final String regionName) { |
| client.invoke(new CacheSerializableRunnable("Register cq for client 1") { |
| @Override |
| public void run2() throws CacheException { |
| // register the cq's |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| CqAttributesFactory cqAf = new CqAttributesFactory(); |
| CqAttributes attributes = cqAf.create(); |
| try { |
| queryService.newCq("client1DCQ1", "Select * From /" + regionName + " where id = 1", |
| attributes, true).execute(); |
| queryService.newCq("client1DCQ2", "Select * From /" + regionName + " where id = 10", |
| attributes, true).execute(); |
| queryService.newCq("client1NoDC1", "Select * From /" + regionName, attributes, false) |
| .execute(); |
| queryService.newCq("client1NoDC2", "Select * From /" + regionName + " where id = 3", |
| attributes, false).execute(); |
| } catch (RegionNotFoundException e) { |
| fail("failed", e); |
| } catch (CqException e) { |
| fail("failed", e); |
| } catch (CqExistsException e) { |
| fail("failed", e); |
| } |
| } |
| }); |
| } |
| |
| private void createClient2CqsAndDurableCqs(VM client, final String regionName) { |
| client.invoke(new CacheSerializableRunnable("Register cq for client 2") { |
| @Override |
| public void run2() throws CacheException { |
| // register the cq's |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| CqAttributesFactory cqAf = new CqAttributesFactory(); |
| CqAttributes attributes = cqAf.create(); |
| try { |
| queryService.newCq("client2DCQ1", "Select * From /" + regionName + " where id = 1", |
| attributes, true).execute(); |
| queryService.newCq("client2DCQ2", "Select * From /" + regionName + " where id = 10", |
| attributes, true).execute(); |
| queryService.newCq("client2DCQ3", "Select * From /" + regionName, attributes, true) |
| .execute(); |
| queryService.newCq("client2DCQ4", "Select * From /" + regionName + " where id = 3", |
| attributes, true).execute(); |
| } catch (RegionNotFoundException e) { |
| fail("failed", e); |
| } catch (CqException e) { |
| fail("failed", e); |
| } catch (CqExistsException e) { |
| fail("failed", e); |
| } |
| |
| } |
| }); |
| } |
| |
| private void cycleDurableClient(VM client, final String dcName, final int serverPort, |
| final String regionName, final int durableClientTimeout) { |
| client.invoke(new CacheSerializableRunnable("cycle client") { |
| @Override |
| public void run2() throws CacheException { |
| CacheServerTestUtil.closeCache(true); |
| } |
| }); |
| |
| client.invoke(() -> CacheServerTestUtil.createClientCache( |
| getClientPool(NetworkUtils.getServerHostName(client.getHost()), serverPort), regionName, |
| getDurableClientProperties(dcName, durableClientTimeout))); |
| } |
| |
| private CqQuery createCq(String regionName, String cqName) { |
| // Create CQ Attributes. |
| CqAttributesFactory cqAf = new CqAttributesFactory(); |
| |
| // Initialize and set CqListener. |
| CqListener[] cqListeners = {new CqListener() { |
| @Override |
| public void close() {} |
| |
| @Override |
| public void onEvent(CqEvent aCqEvent) {} |
| |
| @Override |
| public void onError(CqEvent aCqEvent) {} |
| }}; |
| cqAf.initCqListeners(cqListeners); |
| CqAttributes cqa = cqAf.create(); |
| |
| // Create cq's |
| // Get the query service for the Pool |
| QueryService queryService = CacheServerTestUtil.getCache().getQueryService(); |
| CqQuery query = null; |
| try { |
| query = queryService.newCq(cqName, "Select * from /" + regionName, cqa); |
| query.execute(); |
| } catch (CqExistsException e) { |
| fail("Could not find specified region:" + regionName + ":", e); |
| } catch (CqException e) { |
| fail("Could not find specified region:" + regionName + ":", e); |
| |
| } catch (RegionNotFoundException e) { |
| fail("Could not find specified region:" + regionName + ":", e); |
| } |
| return query; |
| } |
| |
| private void checkCqStats(String cqName) { |
| QueryService queryService = CacheServerTestUtil.getPool().getQueryService(); |
| CqQueryImpl query = null; |
| query = (CqQueryImpl) queryService.getCq(cqName); |
| if (query != null) { |
| query.getVsdStats(); |
| query.getVsdStats().getNumEvents(); |
| } |
| } |
| |
| private Pool getClientPool(String host, int serverPort) { |
| PoolFactory pf = PoolManager.createFactory(); |
| pf.addServer(host, serverPort).setSubscriptionAckInterval(1).setSubscriptionEnabled(true); |
| return ((PoolFactoryImpl) pf).getPoolAttributes(); |
| } |
| |
| public CacheSerializableRunnable setTestHook() { |
| SerializableRunnable sr = new CacheSerializableRunnable("TestHook") { |
| @Override |
| public void run2() { |
| class CqQueryTestHook implements CqQueryImpl.TestHook { |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| private int numEvents = 0; |
| |
| @Override |
| public void pauseUntilReady() { |
| try { |
| logger.debug("CqQueryTestHook: Going to wait on latch until ready is called."); |
| if (!latch.await(10, TimeUnit.SECONDS)) { |
| fail("query was never unlatched"); |
| } |
| } catch (Exception e) { |
| fail("interrupted", e); |
| } |
| } |
| |
| @Override |
| public void ready() { |
| latch.countDown(); |
| logger.debug("CqQueryTestHook: The latch has been released."); |
| } |
| |
| @Override |
| public int numQueuedEvents() { |
| return numEvents; |
| } |
| |
| @Override |
| public void setEventCount(int count) { |
| logger.debug("CqQueryTestHook: Setting numEVents to: " + count); |
| numEvents = count; |
| } |
| }; |
| CqQueryImpl.testHook = new CqQueryTestHook(); |
| } |
| }; |
| return (CacheSerializableRunnable) sr; |
| } |
| |
| } |