| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache.query.dunit; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.Comparator; |
| import java.util.Iterator; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.client.ClientCache; |
| import org.apache.geode.cache.client.ClientRegionShortcut; |
| import org.apache.geode.cache.client.PoolFactory; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.query.IndexType; |
| import org.apache.geode.cache.query.Query; |
| import org.apache.geode.cache.query.QueryService; |
| import org.apache.geode.cache.query.SelectResults; |
| import org.apache.geode.cache.query.Struct; |
| import org.apache.geode.cache.query.internal.DefaultQuery; |
| import org.apache.geode.cache.query.internal.QueryObserverAdapter; |
| import org.apache.geode.cache.query.internal.QueryObserverHolder; |
| import org.apache.geode.cache.query.types.ObjectType; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache30.CacheSerializableRunnable; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.test.dunit.Assert; |
| import org.apache.geode.test.dunit.DistributedTestUtils; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| import org.apache.geode.test.junit.categories.OQLQueryTest; |
| |
| /** |
| * Tests remote (client/server) query execution. |
| * |
| * @since GemFire 5.0.1 |
| */ |
| @Category({OQLQueryTest.class}) |
| public class QueryUsingPoolDUnitTest extends JUnit4CacheTestCase { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * The port on which the cache server was started in this VM |
| */ |
| private static int bridgeServerPort; |
| |
| private String rootRegionName; |
| private String regionName; |
| private String regName; |
| |
| // Used with compiled queries. |
| private String[] queryString; |
| |
| @Override |
| public final void postSetUp() throws Exception { |
| this.rootRegionName = "root"; |
| this.regionName = this.getName(); |
| this.regName = "/" + this.rootRegionName + "/" + this.regionName; |
| |
| this.queryString = |
| new String[] {"SELECT itr.value FROM " + this.regName + ".entries itr where itr.key = $1", // 0 |
| "SELECT DISTINCT * FROM " + this.regName + " WHERE id < $1 ORDER BY id", // 1 |
| "SELECT DISTINCT * FROM " + this.regName + " WHERE id < $1 ORDER BY id", // 2 |
| "(SELECT DISTINCT * FROM " + this.regName + " WHERE id < $1).size", // 3 |
| "SELECT * FROM " + this.regName + " WHERE id = $1 and Ticker = $2", // 4 |
| "SELECT * FROM " + this.regName + " WHERE id < $1 and Ticker = $2", // 5 |
| }; |
| |
| disconnectAllFromDS(); |
| IgnoredException.addIgnoredException("Connection reset"); |
| IgnoredException.addIgnoredException("Socket input is shutdown"); |
| IgnoredException.addIgnoredException("Connection refused"); |
| } |
| |
| @Override |
| public final void postTearDownCacheTestCase() throws Exception { |
| disconnectAllFromDS(); |
| } |
| |
| public void createPool(String poolName, String server, int port, boolean subscriptionEnabled) { |
| createPool(poolName, new String[] {server}, new int[] {port}, subscriptionEnabled); |
| } |
| |
| public void createPool(String poolName, String server, int port) { |
| createPool(poolName, new String[] {server}, new int[] {port}, false); |
| } |
| |
| public void createPool(final String poolName, final String[] servers, final int[] ports, |
| final boolean subscriptionEnabled) { |
| // Create Cache. |
| getCache(true); |
| |
| PoolFactory poolFactory = PoolManager.createFactory(); |
| poolFactory.setSubscriptionEnabled(subscriptionEnabled); |
| for (int i = 0; i < servers.length; i++) { |
| logger.info("### Adding to Pool. ### Server : " + servers[i] + " Port : " + ports[i]); |
| poolFactory.addServer(servers[i], ports[i]); |
| } |
| poolFactory.setMinConnections(1); |
| poolFactory.setMaxConnections(5); |
| poolFactory.setRetryAttempts(5); |
| poolFactory.create(poolName); |
| } |
| |
| public void validateCompiledQuery(final long compiledQueryCount) { |
| await().timeout(60, TimeUnit.SECONDS) |
| .until(() -> CacheClientNotifier.getInstance().getStats() |
| .getCompiledQueryCount() == compiledQueryCount); |
| } |
| |
| /** |
| * Tests remote import query execution. |
| */ |
| @Test |
| public void testRemoteImportQueries() throws CacheException { |
| |
| final String name = this.getName(); |
| final String rootRegionName = "root"; |
| |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| final int numberOfEntries = 100; |
| |
| // Start server |
| vm0.invoke("Create cache server", () -> { |
| createAndStartBridgeServer(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| createRegion(name, rootRegionName, factory.create()); |
| }); |
| |
| // Initialize server region |
| vm0.invoke("Create cache server", () -> { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < numberOfEntries; i++) { |
| region.put("key-" + i, new TestObject(i, "ibm")); |
| } |
| }); |
| |
| final int port = |
| vm0.invoke("GetCacheServerPort", () -> QueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| final String regionName = "/" + rootRegionName + "/" + name; |
| |
| // Create client pool. |
| final String poolName = "testRemoteImportQueries"; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port)); |
| |
| // Execute client queries |
| vm1.invoke("Execute queries", () -> { |
| String queryString = null; |
| SelectResults results = null; |
| |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct * from " |
| + regionName; |
| |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| |
| assertEquals(numberOfEntries, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates()); |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct * from " |
| + regionName + " where ticker = 'ibm'"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(numberOfEntries, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates()); |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct * from " |
| + regionName + " where ticker = 'IBM'"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(0, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates()); |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct * from " |
| + regionName + " where price > 49"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(numberOfEntries / 2, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates()); |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct * from " |
| + regionName + " where price = 50"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates()); |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct * from " |
| + regionName + " where ticker = 'ibm' and price = 50"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates()); |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| /** |
| * Tests remote struct query execution. |
| */ |
| @Test |
| public void testRemoteStructQueries() throws CacheException { |
| |
| final String name = this.getName(); |
| final String rootRegionName = "root"; |
| |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| final int numberOfEntries = 100; |
| |
| // Start server |
| final int port = vm0.invoke("Create cache server", () -> { |
| setupBridgeServerAndCreateData(name, numberOfEntries); |
| return getCacheServerPort(); |
| }); |
| |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| final String regionName = "/" + rootRegionName + "/" + name; |
| |
| // Create client pool. |
| final String poolName = "testRemoteStructQueries"; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port)); |
| |
| // Execute client queries |
| vm1.invoke("Execute queries", () -> { |
| String queryString = null; |
| SelectResults results = null; |
| |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct ticker, price from " |
| + regionName; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(numberOfEntries, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates() |
| && results.getCollectionType().getElementType().isStructType()); |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct ticker, price from " |
| + regionName + " where ticker = 'ibm'"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(numberOfEntries, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates() |
| && results.getCollectionType().getElementType().isStructType()); |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct ticker, price from " |
| + regionName + " where ticker = 'IBM'"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(0, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates() |
| && results.getCollectionType().getElementType().isStructType()); |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct ticker, price from " |
| + regionName + " where price > 49"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(numberOfEntries / 2, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates() |
| && results.getCollectionType().getElementType().isStructType()); |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct ticker, price from " |
| + regionName + " where price = 50"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates() |
| && results.getCollectionType().getElementType().isStructType()); |
| |
| queryString = |
| "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject; select distinct ticker, price from " |
| + regionName + " where ticker = 'ibm' and price = 50"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates() |
| && results.getCollectionType().getElementType().isStructType()); |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| private void setupBridgeServerAndCreateData(String name, int numberOfEntries) { |
| createAndStartBridgeServer(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| createRegion(this.regionName, this.rootRegionName, factory.create()); |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < numberOfEntries; i++) { |
| region.put("key-" + i, new TestObject(i, "ibm")); |
| } |
| } |
| |
| |
| /** |
| * Tests remote full region query execution. |
| */ |
| @Test |
| public void testRemoteFullRegionQueries() throws CacheException { |
| |
| final String name = this.getName(); |
| final String rootRegionName = "root"; |
| |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| final int numberOfEntries = 100; |
| |
| // Start server |
| vm0.invoke("Create cache server", () -> { |
| createAndStartBridgeServer(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| createRegion(name, factory.create()); |
| }); |
| |
| // Initialize server region |
| vm0.invoke("Create cache server", () -> { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < numberOfEntries; i++) { |
| region.put("key-" + i, new TestObject(i, "ibm")); |
| } |
| }); |
| |
| // Create client region |
| final int port = vm0.invoke(() -> QueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| final String regionName = "/" + rootRegionName + "/" + name; |
| |
| // Create client pool. |
| final String poolName = "testRemoteFullRegionQueries"; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port)); |
| |
| // Execute client queries |
| vm1.invoke("Execute queries", () -> { |
| String queryString = null; |
| SelectResults results = null; |
| Comparator comparator = null; |
| Object[] resultsArray = null; |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| |
| // value query |
| queryString = |
| "SELECT DISTINCT itr.value FROM " + regionName + ".entries itr where itr.key = 'key-1'"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates()); |
| assertTrue(results.asList().get(0) instanceof TestObject); |
| |
| // key query |
| queryString = |
| "SELECT DISTINCT itr.key FROM " + regionName + ".entries itr where itr.key = 'key-1'"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates()); |
| assertEquals("key-1", results.asList().get(0)); |
| |
| // order by value query |
| queryString = "SELECT DISTINCT * FROM " + regionName + " WHERE id < 101 ORDER BY id"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(numberOfEntries, results.size()); |
| // All order-by query results are stored in a ResultsCollectionWrapper |
| // wrapping a list, so the assertion below is not correct even though |
| // it should be. |
| // assertTrue(!results.getCollectionType().allowsDuplicates()); |
| assertTrue(results.getCollectionType().isOrdered()); |
| comparator = new IdComparator(); |
| resultsArray = results.toArray(); |
| for (int i = 0; i < resultsArray.length; i++) { |
| if (i + 1 != resultsArray.length) { |
| // The id of the current element in the result set must be less |
| // than the id of the next one to pass. |
| assertTrue( |
| "The id for " + resultsArray[i] + " should be less than the id for " |
| + resultsArray[i + 1], |
| comparator.compare(resultsArray[i], resultsArray[i + 1]) == -1); |
| } |
| } |
| |
| // order by struct query |
| queryString = |
| "SELECT DISTINCT id, ticker, price FROM " + regionName + " WHERE id < 101 ORDER BY id"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(numberOfEntries, results.size()); |
| // All order-by query results are stored in a ResultsCollectionWrapper |
| // wrapping a list, so the assertion below is not correct even though |
| // it should be. |
| // assertTrue(!results.getCollectionType().allowsDuplicates()); |
| assertTrue(results.getCollectionType().isOrdered()); |
| comparator = new StructIdComparator(); |
| resultsArray = results.toArray(); |
| for (int i = 0; i < resultsArray.length; i++) { |
| if (i + 1 != resultsArray.length) { |
| // The id of the current element in the result set must be less |
| // than the id of the next one to pass. |
| assertTrue( |
| "The id for " + resultsArray[i] + " should be less than the id for " |
| + resultsArray[i + 1], |
| comparator.compare(resultsArray[i], resultsArray[i + 1]) == -1); |
| } |
| } |
| |
| // size query |
| queryString = "(SELECT DISTINCT * FROM " + regionName + " WHERE id < 101).size"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| Object result = results.iterator().next(); |
| assertTrue(result instanceof Integer); |
| int resultInt = ((Integer) result).intValue(); |
| assertEquals(resultInt, 100); |
| |
| // query with leading/trailing spaces |
| queryString = |
| " SELECT DISTINCT itr.key FROM " + regionName + ".entries itr where itr.key = 'key-1' "; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| assertEquals("key-1", results.asList().get(0)); |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| private int createAndStartBridgeServer() { |
| Properties config = new Properties(); |
| config.setProperty(LOCATORS, "localhost[" + DistributedTestUtils.getDUnitLocatorPort() + "]"); |
| getSystem(config); |
| try { |
| return startBridgeServer(0, false); |
| } catch (Exception ex) { |
| Assert.fail("While starting CacheServer", ex); |
| return -1; |
| } |
| } |
| |
| /** |
| * Tests client-server query using parameters (compiled queries). |
| */ |
| @Test |
| public void testClientServerQueriesWithParams() throws CacheException { |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| final int numberOfEntries = 100; |
| |
| final Object[][] params = new Object[][] {{"key-1"}, // 0 |
| {101}, // 1 |
| {101}, // 2 |
| {101}, // 3 |
| {50, "ibm"}, // 4 |
| {50, "ibm"}, // 5 |
| }; |
| |
| final int[] expectedResults = new int[] {1, // 0 |
| 100, // 1 |
| 100, // 2 |
| 1, // 3 |
| 1, // 4 |
| 50, // 5 |
| }; |
| |
| assertNotNull(this.regionName); |
| |
| // Start server |
| final int port = vm0.invoke("Create cache server", () -> { |
| setupBridgeServerAndCreateData(regionName, numberOfEntries); |
| return getCacheServerPort(); |
| }); |
| |
| // Create client region |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| // Create client pool. |
| final String poolName = "testClientServerQueriesWithParams"; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port)); |
| |
| // Execute client queries |
| vm1.invoke(new CacheSerializableRunnable("Execute queries") { |
| @Override |
| public void run2() throws CacheException { |
| SelectResults results = null; |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| |
| executeQueriesForClientServerQueriesWithParams(results, qService, params, expectedResults); |
| } |
| }); |
| |
| final int useMaintainedCompiledQueries = queryString.length; |
| |
| // Execute the same compiled queries multiple time |
| vm1.invoke("Execute queries", () -> { |
| SelectResults results = null; |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| for (int x = 0; x < useMaintainedCompiledQueries; x++) { |
| executeQueriesForClientServerQueriesWithParams(results, qService, params, expectedResults); |
| } |
| }); |
| |
| // Validate maintained compiled queries. |
| // There should be only queryString.length compiled queries registered. |
| vm0.invoke("validate compiled query.", () -> { |
| long compiledQueryCount = |
| CacheClientNotifier.getInstance().getStats().getCompiledQueryCount(); |
| assertEquals(queryString.length, compiledQueryCount); |
| }); |
| |
| // Check to see if maintained compiled queries are used. |
| vm0.invoke("validate compiled query.", () -> { |
| long compiledQueryUsedCount = |
| CacheClientNotifier.getInstance().getStats().getCompiledQueryUsedCount(); |
| int numTimesUsed = (useMaintainedCompiledQueries + 1) * queryString.length; |
| assertEquals(numTimesUsed, compiledQueryUsedCount); |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| private void executeQueriesForClientServerQueriesWithParams(SelectResults results, |
| QueryService qService, Object[][] params, int[] expectedResults) { |
| for (int i = 0; i < queryString.length; i++) { |
| try { |
| logger.info("### Executing Query :" + queryString[i]); |
| Query query = qService.newQuery(queryString[i]); |
| results = (SelectResults) query.execute(params[i]); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString[i], e); |
| } |
| try { |
| assertEquals(expectedResults[i], results.size()); |
| } catch (Throwable th) { |
| fail("Result mismatch for query= " + queryString[i] + " expected = " + expectedResults[i] |
| + " actual=" + results.size()); |
| } |
| } |
| } |
| |
| /** |
| * Tests client-server query using parameters (compiled queries). |
| */ |
| @Test |
| public void testMulitipleClientServerQueriesWithParams() throws CacheException { |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| VM vm3 = host.getVM(3); |
| final int numberOfEntries = 100; |
| |
| final Object[][] params = new Object[][] {{"key-1"}, // 0 |
| {101}, // 1 |
| {101}, // 2 |
| {101}, // 3 |
| {50, "ibm"}, // 4 |
| {50, "ibm"}, // 5 |
| }; |
| |
| final int[] expectedResults = new int[] {1, // 0 |
| 100, // 1 |
| 100, // 2 |
| 1, // 3 |
| 1, // 4 |
| 50, // 5 |
| }; |
| |
| // Start server1 |
| final int port0 = vm0.invoke("Create cache server", () -> { |
| setupBridgeServerAndCreateData(regionName, numberOfEntries); |
| return getCacheServerPort(); |
| }); |
| |
| // Start server2 |
| final int port1 = vm1.invoke("Create cache server", () -> { |
| setupBridgeServerAndCreateData(regionName, numberOfEntries); |
| return getCacheServerPort(); |
| }); |
| |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| // Create client pool. |
| final String poolName = "testClientServerQueriesWithParams"; |
| vm2.invoke("createPool", |
| () -> createPool(poolName, new String[] {host0}, new int[] {port0}, true)); |
| vm3.invoke("createPool", |
| () -> createPool(poolName, new String[] {host0}, new int[] {port1}, true)); |
| |
| // Execute client queries |
| SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") { |
| @Override |
| public void run2() throws CacheException { |
| SelectResults results = null; |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| for (int j = 0; j < queryString.length; j++) { |
| executeQueriesForClientServerQueriesWithParams(results, qService, params, |
| expectedResults); |
| } |
| } |
| }; |
| |
| vm2.invoke(executeQueries); |
| vm3.invoke(executeQueries); |
| |
| // Validate maintained compiled queries. |
| // There should be only queryString.length compiled queries registered. |
| vm0.invoke("validate compiled query.", () -> { |
| long compiledQueryCount = |
| CacheClientNotifier.getInstance().getStats().getCompiledQueryCount(); |
| assertEquals(queryString.length, compiledQueryCount); |
| }); |
| |
| vm2.invoke("closeClient", () -> closeClient()); |
| vm3.invoke("closeClient", () -> closeClient()); |
| |
| // Validate maintained compiled queries. |
| // All the queries will be still present in the server. |
| // They will be cleaned up periodically1 |
| vm0.invoke("validate compiled query.", () -> { |
| long compiledQueryCount = |
| CacheClientNotifier.getInstance().getStats().getCompiledQueryCount(); |
| assertEquals(queryString.length, compiledQueryCount); |
| }); |
| |
| vm1.invoke("validate compiled query.", () -> { |
| long compiledQueryCount = |
| CacheClientNotifier.getInstance().getStats().getCompiledQueryCount(); |
| assertEquals(queryString.length, compiledQueryCount); |
| }); |
| |
| // recreate clients and execute queries. |
| vm2.invoke("createPool", |
| () -> createPool(poolName, new String[] {host0, host0}, new int[] {port1, port0}, true)); |
| vm3.invoke("createPool", |
| () -> createPool(poolName, new String[] {host0, host0}, new int[] {port0, port1}, true)); |
| |
| vm2.invoke(executeQueries); |
| vm3.invoke(executeQueries); |
| |
| // Validate maintained compiled queries. |
| // All the queries will be still present in the server. |
| vm0.invoke("validate compiled query.", () -> { |
| long compiledQueryCount = |
| CacheClientNotifier.getInstance().getStats().getCompiledQueryCount(); |
| assertEquals(queryString.length, compiledQueryCount); |
| }); |
| |
| vm1.invoke("validate compiled query.", () -> { |
| long compiledQueryCount = |
| CacheClientNotifier.getInstance().getStats().getCompiledQueryCount(); |
| assertEquals(queryString.length, compiledQueryCount); |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| vm1.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| /** |
| * Tests client-server compiled query register and cleanup. |
| */ |
| @Test |
| public void testClientServerCompiledQueryRegisterAndCleanup() throws CacheException { |
| |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| final int numberOfEntries = 100; |
| |
| final Object[][] params = new Object[][] {{"key-1"}, // 0 |
| {101}, // 1 |
| {101}, // 2 |
| {101}, // 3 |
| {50, "ibm"}, // 4 |
| {50, "ibm"}, // 5 |
| }; |
| |
| // Start server |
| final int port = vm0.invoke("Create cache server", () -> { |
| setupBridgeServerAndCreateData(name, numberOfEntries); |
| return getCacheServerPort(); |
| }); |
| |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| // Create client pool. |
| final String poolName = "testClientServerQueriesWithParams"; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port)); |
| vm2.invoke("createPool", () -> createPool(poolName, host0, port)); |
| |
| // Execute client queries |
| vm1.invoke("executeCompiledQueries", () -> executeCompiledQueries(poolName, params)); |
| |
| // Execute client queries |
| vm2.invoke("executeCompiledQueries", () -> executeCompiledQueries(poolName, params)); |
| |
| // Validate maintained compiled queries. |
| vm0.invoke("validate compiled query.", () -> { |
| long compiledQueryCount = |
| CacheClientNotifier.getInstance().getStats().getCompiledQueryCount(); |
| assertEquals(queryString.length, compiledQueryCount); |
| }); |
| |
| vm1.invoke("closeClient", () -> closeClient()); |
| |
| // Validate maintained compiled queries. |
| vm0.invoke("validate compiled query.", () -> { |
| long compiledQueryCount = |
| CacheClientNotifier.getInstance().getStats().getCompiledQueryCount(); |
| assertEquals(queryString.length, compiledQueryCount); |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| /** |
| * Tests client-server compiled query register and cleanup. |
| */ |
| @Test |
| public void testClientServerCompiledQueryTimeBasedCleanup() throws CacheException { |
| |
| final String name = this.getName(); |
| |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| final int numberOfEntries = 100; |
| |
| final Object[][] params = new Object[][] {{"key-1"}, // 0 |
| {101}, // 1 |
| {101}, // 2 |
| {101}, // 3 |
| {50, "ibm"}, // 4 |
| {50, "ibm"}, // 5 |
| }; |
| |
| // Start server |
| final int port = vm0.invoke("Create cache server", () -> { |
| setupBridgeServerAndCreateData(name, numberOfEntries); |
| QueryService queryService = getCache().getQueryService(); |
| queryService.newQuery("Select * from " + regName); |
| DefaultQuery.setTestCompiledQueryClearTime(2 * 1000); |
| return getCacheServerPort(); |
| }); |
| |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| // Create client pool. |
| final String poolName = "testClientServerQueriesWithParams"; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port)); |
| vm2.invoke("createPool", () -> createPool(poolName, host0, port)); |
| |
| // Execute client queries |
| vm1.invoke("Execute queries", () -> executeCompiledQueries(poolName, params)); |
| |
| // Execute client queries |
| vm2.invoke("Execute queries", () -> executeCompiledQueries(poolName, params)); |
| |
| // Validate maintained compiled queries. |
| vm0.invoke("validate Compiled query", () -> validateCompiledQuery(0)); |
| |
| // Recreate compiled queries. |
| // Execute client queries |
| vm1.invoke("Execute queries", () -> executeCompiledQueries(poolName, params)); |
| |
| // Execute client queries |
| // The client2 will be using the queries. |
| vm2.invokeAsync("Execute queries", () -> { |
| for (int i = 0; i < 10; i++) { |
| Wait.pause(200); |
| executeCompiledQueries(poolName, params); |
| } |
| }); |
| |
| // Validate maintained compiled queries. |
| vm0.invoke("validate Compiled query", () -> validateCompiledQuery(queryString.length)); |
| |
| // Validate maintained compiled queries. |
| vm0.invoke("validate Compiled query", () -> validateCompiledQuery(0)); |
| |
| // Close clients |
| |
| vm1.invoke("closeClient", () -> closeClient()); |
| vm2.invoke("closeClient", () -> closeClient()); |
| |
| // Validate maintained compiled queries. |
| vm0.invoke("validate Compiled query", () -> validateCompiledQuery(0)); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| /** |
| * Tests client-server compiled query register and cleanup. It creates the client connections |
| * without the subscription enabled. This doesn't create any client proxy on the server. |
| */ |
| @Test |
| public void testClientServerCompiledQueryCleanup() throws CacheException { |
| |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| final int numberOfEntries = 100; |
| |
| final Object[][] params = new Object[][] {{"key-1"}, // 0 |
| {101}, // 1 |
| {101}, // 2 |
| {101}, // 3 |
| {50, "ibm"}, // 4 |
| {50, "ibm"}, // 5 |
| }; |
| |
| // Start server |
| final int port = vm0.invoke("Create cache server", () -> { |
| setupBridgeServerAndCreateData(name, numberOfEntries); |
| QueryService queryService = getCache().getQueryService(); |
| queryService.newQuery("Select * from " + regName); |
| DefaultQuery.setTestCompiledQueryClearTime(2 * 1000); |
| return getCacheServerPort(); |
| }); |
| |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| // Create client pool. |
| final String poolName = "testClientServerQueriesWithParams"; |
| final boolean subscriptiuonEnabled = false; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port, subscriptiuonEnabled)); |
| vm2.invoke("createPool", () -> createPool(poolName, host0, port, subscriptiuonEnabled)); |
| |
| // Execute client queries |
| vm1.invoke("Execute queries", () -> executeCompiledQueries(poolName, params)); |
| |
| // Execute client queries |
| vm2.invoke("Execute queries", () -> executeCompiledQueries(poolName, params)); |
| |
| // Validate maintained compiled queries. |
| vm0.invoke("validate Compiled query", () -> validateCompiledQuery(0)); |
| |
| // Recreate compiled queries. |
| // Execute client queries |
| vm1.invoke("Execute queries", () -> executeCompiledQueries(poolName, params)); |
| |
| // Execute client queries |
| // The client2 will be using the queries. |
| vm2.invokeAsync("Execute queries", () -> { |
| for (int i = 0; i < 10; i++) { |
| Wait.pause(10); |
| executeCompiledQueries(poolName, params); |
| } |
| }); |
| |
| // Validate maintained compiled queries. |
| vm0.invoke("validate Compiled query", () -> validateCompiledQuery(queryString.length)); |
| |
| // Close clients |
| // Let the compiled queries to be idle (not used). |
| // pause(2 * 1000); |
| |
| // Validate maintained compiled queries. |
| vm0.invoke("validate Compiled query", () -> validateCompiledQuery(0)); |
| |
| // Recreate compiled queries. |
| // Execute client queries |
| vm1.invoke("Execute queries", () -> executeCompiledQueries(poolName, params)); |
| |
| // Validate maintained compiled queries. |
| vm0.invoke("validate compiled query.", () -> { |
| long compiledQueryCount = |
| CacheClientNotifier.getInstance().getStats().getCompiledQueryCount(); |
| assertEquals(queryString.length, compiledQueryCount); |
| }); |
| |
| // Close clients |
| vm2.invoke("closeClient", () -> closeClient()); |
| vm1.invoke("closeClient", () -> closeClient()); |
| |
| // Validate maintained compiled queries. |
| // since not used it should get cleaned up after sometime. |
| vm0.invoke("validate Compiled query", () -> validateCompiledQuery(0)); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| /** |
| * Tests client-server query using parameters (compiled queries). |
| */ |
| @Test |
| public void testBindParamsWithMulitipleClients() throws CacheException { |
| |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| VM vm3 = host.getVM(3); |
| final int numberOfEntries = 100; |
| |
| final Object[][] params = new Object[][] {{"key-1"}, // 0 |
| {101}, // 1 |
| {101}, // 2 |
| {101}, // 3 |
| {50, "ibm"}, // 4 |
| {50, "ibm"}, // 5 |
| }; |
| |
| final String[] querys = |
| new String[] {"SELECT itr.value FROM " + regName + ".entries itr where itr.key = 'key-1'", // 0 |
| "SELECT DISTINCT * FROM " + regName + " WHERE id < 101 ORDER BY id", // 1 |
| "SELECT DISTINCT * FROM " + regName + " WHERE id < 101 ORDER BY id", // 2 |
| "(SELECT DISTINCT * FROM " + regName + " WHERE id < 101).size", // 3 |
| "SELECT * FROM " + regName + " WHERE id = 50 and Ticker = 'ibm'", // 4 |
| "SELECT * FROM " + regName + " WHERE id < 50 and Ticker = 'ibm'", // 5 |
| }; |
| |
| // Start server1 |
| final int port0 = vm0.invoke("Create cache server", () -> { |
| setupBridgeServerAndCreateData(regionName, numberOfEntries); |
| return getCacheServerPort(); |
| }); |
| |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| // Create client pool. |
| final String poolName = "testClientServerQueriesWithParams"; |
| vm1.invoke("createPool", |
| () -> createPool(poolName, new String[] {host0}, new int[] {port0}, true)); |
| vm2.invoke("createPool", |
| () -> createPool(poolName, new String[] {host0}, new int[] {port0}, true)); |
| vm3.invoke("createPool", |
| () -> createPool(poolName, new String[] {host0}, new int[] {port0}, true)); |
| |
| // Execute client query multiple times and validate results. |
| SerializableRunnable executeSameQueries = new CacheSerializableRunnable("Execute queries") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService qService = null; |
| SelectResults[][] rs = new SelectResults[1][2]; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| // Use minimal query, so that there will be multiple |
| // clients using the same compiled query at server. |
| for (int j = 0; j < 5; j++) { |
| for (int i = 0; i < 2; i++) { |
| try { |
| logger.info("### Executing Query :" + queryString[i]); |
| Query query = qService.newQuery(queryString[i]); |
| rs[0][0] = (SelectResults) query.execute(params[i]); |
| Query query2 = qService.newQuery(querys[i]); |
| rs[0][1] = (SelectResults) query2.execute(); |
| // Compare results. |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString[i], e); |
| } |
| logger.info( |
| "### Comparing results for Query :" + ((i + 1) * (j + 1)) + " : " + queryString[i]); |
| compareQueryResultsWithoutAndWithIndexes(rs, 1); |
| logger.info("### Done Comparing results for Query :" + ((i + 1) * (j + 1)) + " : " |
| + queryString[i]); |
| } |
| } |
| } |
| }; |
| |
| vm1.invoke("executeSameQueries", () -> executeSameQueries); |
| vm2.invoke("executeSameQueries", () -> executeSameQueries); |
| vm3.invoke("executeSameQueries", () -> executeSameQueries); |
| |
| // Execute client queries and validate results. |
| SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") { |
| @Override |
| public void run2() throws CacheException { |
| QueryService qService = null; |
| SelectResults[][] rs = new SelectResults[1][2]; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| for (int j = 0; j < queryString.length; j++) { |
| for (int i = 0; i < queryString.length; i++) { |
| try { |
| logger.info("### Executing Query :" + queryString[i]); |
| Query query = qService.newQuery(queryString[i]); |
| rs[0][0] = (SelectResults) query.execute(params[i]); |
| Query query2 = qService.newQuery(querys[i]); |
| rs[0][1] = (SelectResults) query2.execute(); |
| // Compare results. |
| compareQueryResultsWithoutAndWithIndexes(rs, 1); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString[i], e); |
| } |
| } |
| } |
| } |
| }; |
| |
| vm1.invoke("executeQueries", () -> executeQueries); |
| vm2.invoke("executeQueries", () -> executeQueries); |
| vm3.invoke("executeQueries", () -> executeQueries); |
| |
| vm1.invoke("closeClient", () -> closeClient()); |
| vm2.invoke("closeClient", () -> closeClient()); |
| vm3.invoke("closeClient", () -> closeClient()); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| /** |
| * Tests remote join query execution. |
| */ |
| @Test |
| public void testRemoteJoinRegionQueries() throws CacheException { |
| |
| final String name = this.getName(); |
| final String rootRegionName = "root"; |
| |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| final int numberOfEntries = 100; |
| |
| // Start server |
| vm0.invoke("Create cache server", () -> { |
| createAndStartBridgeServer(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| createRegion(name + "1", factory.create()); |
| createRegion(name + "2", factory.create()); |
| }); |
| |
| // Initialize server region |
| vm0.invoke("Create cache server", () -> { |
| Region region1 = getRootRegion().getSubregion(name + "1"); |
| for (int i = 0; i < numberOfEntries; i++) { |
| region1.put("key-" + i, new TestObject(i, "ibm")); |
| } |
| Region region2 = getRootRegion().getSubregion(name + "2"); |
| for (int i = 0; i < numberOfEntries; i++) { |
| region2.put("key-" + i, new TestObject(i, "ibm")); |
| } |
| }); |
| |
| // Create client region |
| final int port = |
| vm0.invoke("getCacheServerPort", () -> QueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| final String regionName1 = "/" + rootRegionName + "/" + name + "1"; |
| final String regionName2 = "/" + rootRegionName + "/" + name + "2"; |
| |
| // Create client pool. |
| final String poolName = "testRemoteJoinRegionQueries"; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port)); |
| |
| // Execute client queries |
| vm1.invoke("Execute queries", () -> { |
| String queryString = null; |
| SelectResults results = null; |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| |
| queryString = "select distinct a, b.price from " + regionName1 + " a, " + regionName2 |
| + " b where a.price = b.price"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(numberOfEntries, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates() |
| && results.getCollectionType().getElementType().isStructType()); |
| |
| queryString = "select distinct a, b.price from " + regionName1 + " a, " + regionName2 |
| + " b where a.price = b.price and a.price = 50"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates() |
| && results.getCollectionType().getElementType().isStructType()); |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| /** |
| * Tests remote query execution using a BridgeClient as the CacheWriter and CacheLoader. |
| */ |
| @Test |
| public void testRemoteBridgeClientQueries() throws CacheException { |
| |
| final String name = this.getName(); |
| final String rootRegionName = "root"; |
| |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| final int numberOfEntries = 100; |
| |
| // Start server |
| vm0.invoke("Create cache server", () -> { |
| createAndStartBridgeServer(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| createRegion(name, factory.create()); |
| }); |
| |
| // Initialize server region |
| vm0.invoke("Create cache server", () -> { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < numberOfEntries; i++) { |
| region.put("key-" + i, new TestObject(i, "ibm")); |
| } |
| }); |
| |
| final int port = vm0.invoke("getCacheServerPort", () -> getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| final String regionName = "/" + rootRegionName + "/" + name; |
| |
| // Create client pool. |
| final String poolName1 = "testRemoteBridgeClientQueries1"; |
| final String poolName2 = "testRemoteBridgeClientQueries2"; |
| |
| vm1.invoke("createPool", () -> createPool(poolName1, host0, port)); |
| vm2.invoke("createPool", () -> createPool(poolName2, host0, port)); |
| |
| // Execute client queries in VM1 |
| vm1.invoke("Execute queries", () -> { |
| executeQueries(regionName, poolName1); |
| }); |
| |
| // Execute client queries in VM2 |
| vm2.invoke("Execute queries", () -> { |
| executeQueries(regionName, poolName2); |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| private void executeQueries(String regionName, String poolName1) { |
| String queryString; |
| SelectResults results = null; |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName1)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| |
| queryString = |
| "SELECT DISTINCT itr.value FROM " + regionName + ".entries itr where itr.key = 'key-1'"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates() |
| && !results.getCollectionType().getElementType().isStructType()); |
| assertTrue(results.asList().get(0) instanceof TestObject); |
| |
| queryString = |
| "SELECT DISTINCT itr.key FROM " + regionName + ".entries itr where itr.key = 'key-1'"; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(1, results.size()); |
| assertTrue(!results.getCollectionType().allowsDuplicates() |
| && !results.getCollectionType().getElementType().isStructType()); |
| assertEquals("key-1", results.asList().get(0)); |
| } |
| |
| /** |
| * This the dunit test for the bug no : 36969 |
| * |
| */ |
| |
| @Test |
| public void testBug36969() throws Exception { |
| final String name = this.getName(); |
| final String rootRegionName = "root"; |
| |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| final int numberOfEntries = 100; |
| |
| // Start server |
| vm0.invoke("Create cache server", () -> { |
| createAndStartBridgeServer(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| final Region region1 = createRegion(name, factory.createRegionAttributes()); |
| final Region region2 = createRegion(name + "_2", factory.createRegionAttributes()); |
| QueryObserverHolder.setInstance(new QueryObserverAdapter() { |
| @Override |
| public void afterQueryEvaluation(Object result) { |
| // Destroy the region in the test |
| region1.close(); |
| } |
| |
| }); |
| for (int i = 0; i < numberOfEntries; i++) { |
| region1.put("key-" + i, new TestObject(i, "ibm")); |
| region2.put("key-" + i, new TestObject(i, "ibm")); |
| } |
| }); |
| |
| final int port = |
| vm0.invoke("getCachedServerPort", () -> QueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| final String regionName1 = "/" + rootRegionName + "/" + name; |
| final String regionName2 = "/" + rootRegionName + "/" + name + "_2"; |
| |
| // Create client pool. |
| final String poolName = "testBug36969"; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port)); |
| |
| // Execute client queries in VM1 |
| vm1.invoke("Execute queries", () -> { |
| String queryString = "select distinct * from " + regionName1 + ", " + regionName2; |
| // SelectResults results = null; |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| |
| try { |
| Query query = qService.newQuery(queryString); |
| query.execute(); |
| fail("The query should have experienced RegionDestroyedException"); |
| } catch (Exception e) { |
| // OK |
| } |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> { |
| QueryObserverHolder.setInstance(new QueryObserverAdapter()); |
| stopBridgeServer(getCache()); |
| }); |
| } |
| |
| /** |
| * Tests remote full region query execution. |
| */ |
| @Test |
| public void testRemoteSortQueriesUsingIndex() throws CacheException { |
| final String name = this.getName(); |
| final String rootRegionName = "root"; |
| |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| final int numberOfEntries = 100; |
| |
| // Start server |
| vm0.invoke("Create cache server", () -> { |
| createAndStartBridgeServer(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| createRegion(name, factory.create()); |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < numberOfEntries; i++) { |
| region.put("key-" + i, new TestObject(i, "ibm")); |
| } |
| // Create index |
| try { |
| QueryService qService = region.getCache().getQueryService(); |
| qService.createIndex("idIndex", IndexType.FUNCTIONAL, "id", region.getFullPath()); |
| } catch (Exception e) { |
| Assert.fail("Failed to create index.", e); |
| } |
| }); |
| |
| // Create client region |
| final int port = |
| vm0.invoke("getCacheServerPort", () -> QueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| final String regionName = "/" + rootRegionName + "/" + name; |
| |
| // Create client pool. |
| final String poolName = "testRemoteFullRegionQueries"; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port)); |
| |
| // Execute client queries |
| vm1.invoke("Execute queries", () -> { |
| String queryString = null; |
| SelectResults results = null; |
| Comparator comparator = null; |
| Object[] resultsArray = null; |
| QueryService qService = null; |
| Integer v1 = 0; |
| Integer v2 = 0; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| |
| // order by value query |
| String[] qString = {"SELECT DISTINCT * FROM " + regionName + " WHERE id < 101 ORDER BY id", |
| "SELECT DISTINCT id FROM " + regionName + " WHERE id < 101 ORDER BY id",}; |
| |
| for (int cnt = 0; cnt < qString.length; cnt++) { |
| queryString = qString[cnt]; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(numberOfEntries, results.size()); |
| // All order-by query results are stored in a ResultsCollectionWrapper |
| // wrapping a list, so the assertion below is not correct even though |
| // it should be. |
| // assertTrue(!results.getCollectionType().allowsDuplicates()); |
| assertTrue(results.getCollectionType().isOrdered()); |
| comparator = new IdValueComparator(); |
| |
| resultsArray = results.toArray(); |
| for (int i = 0; i < resultsArray.length; i++) { |
| if (i + 1 != resultsArray.length) { |
| // The id of the current element in the result set must be less |
| // than the id of the next one to pass. |
| if (resultsArray[i] instanceof TestObject) { |
| v1 = ((TestObject) resultsArray[i]).getId(); |
| v2 = ((TestObject) resultsArray[i + 1]).getId(); |
| } else { |
| v1 = (Integer) resultsArray[i]; |
| v2 = (Integer) resultsArray[i + 1]; |
| } |
| assertTrue("The id for " + resultsArray[i] + " should be less than the id for " |
| + resultsArray[i + 1], comparator.compare(v1, v2) == -1); |
| } |
| } |
| } |
| |
| // order by struct query |
| String[] qString2 = { |
| "SELECT DISTINCT id, ticker, price FROM " + regionName + " WHERE id < 101 ORDER BY id", |
| "SELECT DISTINCT ticker, id FROM " + regionName + " WHERE id < 101 ORDER BY id", |
| "SELECT DISTINCT id, ticker FROM " + regionName + " WHERE id < 101 ORDER BY id asc",}; |
| |
| for (int cnt = 0; cnt < qString2.length; cnt++) { |
| queryString = qString2[cnt]; |
| try { |
| Query query = qService.newQuery(queryString); |
| results = (SelectResults) query.execute(); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString, e); |
| } |
| assertEquals(numberOfEntries, results.size()); |
| // All order-by query results are stored in a ResultsCollectionWrapper |
| // wrapping a list, so the assertion below is not correct even though |
| // it should be. |
| // assertTrue(!results.getCollectionType().allowsDuplicates()); |
| assertTrue(results.getCollectionType().isOrdered()); |
| comparator = new StructIdComparator(); |
| resultsArray = results.toArray(); |
| for (int i = 0; i < resultsArray.length; i++) { |
| if (i + 1 != resultsArray.length) { |
| // The id of the current element in the result set must be less |
| // than the id of the next one to pass. |
| assertTrue( |
| "The id for " + resultsArray[i] + " should be less than the id for " |
| + resultsArray[i + 1], |
| comparator.compare(resultsArray[i], resultsArray[i + 1]) == -1); |
| } |
| } |
| } |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> stopBridgeServer(getCache())); |
| } |
| |
| @Test |
| public void testUnSupportedOps() throws Exception { |
| final String name = this.getName(); |
| final String rootRegionName = "root"; |
| |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| final int numberOfEntries = 100; |
| |
| // Start server |
| vm0.invoke("Create cache server", () -> { |
| createAndStartBridgeServer(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| final Region region1 = createRegion(name, factory.createRegionAttributes()); |
| final Region region2 = createRegion(name + "_2", factory.createRegionAttributes()); |
| for (int i = 0; i < numberOfEntries; i++) { |
| region1.put("key-" + i, new TestObject(i, "ibm")); |
| region2.put("key-" + i, new TestObject(i, "ibm")); |
| } |
| }); |
| |
| final int port = |
| vm0.invoke("getCacheServerPort", () -> QueryUsingPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| final String regionName1 = "/" + rootRegionName + "/" + name; |
| |
| // Create client pool. |
| final String poolName = "testUnSupportedOps"; |
| vm1.invoke("createPool", () -> createPool(poolName, host0, port)); |
| |
| // Execute client queries in VM1 |
| vm1.invoke("Execute queries", () -> { |
| final Region region1 = ((ClientCache) getCache()) |
| .createClientRegionFactory(ClientRegionShortcut.LOCAL).create(name); |
| |
| String queryString = "select distinct * from " + regionName1 + " where ticker = $1"; |
| Object[] params = new Object[] {new String("ibm")}; |
| // SelectResults results = null; |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| |
| // Testing Remote Query with params. |
| try { |
| Query query = qService.newQuery(queryString); |
| query.execute(params); |
| } catch (UnsupportedOperationException e) { |
| // Expected behavior. |
| } catch (Exception e) { |
| Assert.fail("Failed with UnExpected Exception.", e); |
| } |
| |
| // Test with Index. |
| try { |
| qService.createIndex("test", IndexType.FUNCTIONAL, "ticker", regionName1); |
| } catch (UnsupportedOperationException e) { |
| // Expected behavior. |
| } catch (Exception e) { |
| Assert.fail("Failed with UnExpected Exception.", e); |
| } |
| |
| try { |
| String importString = "import org.apache.geode.admin.QueryUsingPoolDUnitTest.TestObject;"; |
| qService.createIndex("test", IndexType.FUNCTIONAL, "ticker", regionName1, importString); |
| } catch (UnsupportedOperationException e) { |
| // Expected behavior. |
| } catch (Exception e) { |
| Assert.fail("Failed with UnExpected Exception.", e); |
| } |
| |
| try { |
| qService.getIndex(region1, "test"); |
| } catch (UnsupportedOperationException e) { |
| // Expected behavior. |
| } |
| |
| try { |
| qService.getIndexes(region1); |
| } catch (UnsupportedOperationException e) { |
| // Expected behavior. |
| } |
| |
| try { |
| qService.getIndexes(region1, IndexType.FUNCTIONAL); |
| } catch (UnsupportedOperationException e) { |
| // Expected behavior. |
| } |
| |
| // try { |
| // qService.getIndex(Index index); |
| // } catch(UnsupportedOperationException e) { |
| // // Expected behavior. |
| // } |
| |
| try { |
| qService.removeIndexes(region1); |
| } catch (UnsupportedOperationException e) { |
| // Expected behavior. |
| } |
| |
| try { |
| qService.removeIndexes(); |
| } catch (UnsupportedOperationException e) { |
| // Expected behavior. |
| } |
| }); |
| |
| // Stop server |
| vm0.invoke("Stop CacheServer", () -> { |
| QueryObserverHolder.setInstance(new QueryObserverAdapter()); |
| stopBridgeServer(getCache()); |
| }); |
| |
| } |
| |
| /** |
| * Creates a new instance of StructSetOrResultsSet |
| */ |
| private void compareQueryResultsWithoutAndWithIndexes(Object[][] r, int len) { |
| |
| Set set1 = null; |
| Set set2 = null; |
| Iterator itert1 = null; |
| Iterator itert2 = null; |
| ObjectType type1, type2; |
| |
| for (int j = 0; j < len; j++) { |
| type1 = ((SelectResults) r[j][0]).getCollectionType().getElementType(); |
| type2 = ((SelectResults) r[j][1]).getCollectionType().getElementType(); |
| if ((type1.getClass().getName()).equals(type2.getClass().getName())) { |
| logger.info("Both SelectResults are of the same Type i.e.--> " |
| + ((SelectResults) r[j][0]).getCollectionType().getElementType()); |
| } else { |
| logger |
| .info("Classes are : " + type1.getClass().getName() + " " + type2.getClass().getName()); |
| fail("FAILED:Select result Type is different in both the cases"); |
| } |
| if (((SelectResults) r[j][0]).size() == ((SelectResults) r[j][1]).size()) { |
| logger.info( |
| "Both SelectResults are of Same Size i.e. Size= " + ((SelectResults) r[j][1]).size()); |
| } else { |
| fail("FAILED:SelectResults size is different in both the cases. Size1=" |
| + ((SelectResults) r[j][0]).size() + " Size2 = " + ((SelectResults) r[j][1]).size()); |
| } |
| set2 = (((SelectResults) r[j][1]).asSet()); |
| set1 = (((SelectResults) r[j][0]).asSet()); |
| |
| logger.info(" SIZE1 = " + set1.size() + " SIZE2 = " + set2.size()); |
| |
| // boolean pass = true; |
| itert1 = set1.iterator(); |
| while (itert1.hasNext()) { |
| Object p1 = itert1.next(); |
| itert2 = set2.iterator(); |
| |
| boolean exactMatch = false; |
| while (itert2.hasNext()) { |
| logger.info("### Comparing results.."); |
| Object p2 = itert2.next(); |
| if (p1 instanceof Struct) { |
| logger.info("ITS a Set"); |
| Object[] values1 = ((Struct) p1).getFieldValues(); |
| Object[] values2 = ((Struct) p2).getFieldValues(); |
| assertEquals(values1.length, values2.length); |
| boolean elementEqual = true; |
| for (int i = 0; i < values1.length; ++i) { |
| elementEqual = |
| elementEqual && ((values1[i] == values2[i]) || values1[i].equals(values2[i])); |
| } |
| exactMatch = elementEqual; |
| } else { |
| logger.info("Not a Set p2:" + p2 + " p1: " + p1); |
| if (p2 instanceof TestObject) { |
| logger.info("An instance of TestObject"); |
| exactMatch = p2.equals(p1); |
| } else { |
| logger.info("Not an instance of TestObject" + p2.getClass().getCanonicalName()); |
| exactMatch = p2.equals(p1); |
| } |
| } |
| if (exactMatch) { |
| logger.info("Exact MATCH"); |
| break; |
| } |
| } |
| if (!exactMatch) { |
| logger.info("NOT A MATCH"); |
| fail( |
| "Atleast one element in the pair of SelectResults supposedly identical, is not equal "); |
| } |
| } |
| logger.info("### Done Comparing results.."); |
| } |
| } |
| |
| protected void executeCompiledQueries(String poolName, Object[][] params) { |
| QueryService qService = null; |
| |
| try { |
| qService = (PoolManager.find(poolName)).getQueryService(); |
| } catch (Exception e) { |
| Assert.fail("Failed to get QueryService.", e); |
| } |
| |
| for (int i = 0; i < queryString.length; i++) { |
| try { |
| logger.info("### Executing Query :" + queryString[i]); |
| Query query = qService.newQuery(queryString[i]); |
| query.execute(params[i]); |
| } catch (Exception e) { |
| Assert.fail("Failed executing " + queryString[i], e); |
| } |
| } |
| } |
| |
| /** |
| * Starts a cache server on the given port, using the given deserializeValues and |
| * notifyBySubscription to serve up the given region. |
| */ |
| protected int startBridgeServer(int port, boolean notifyBySubscription) throws IOException { |
| Cache cache = getCache(); |
| CacheServer bridge = cache.addCacheServer(); |
| bridge.setPort(port); |
| bridge.setNotifyBySubscription(notifyBySubscription); |
| bridge.start(); |
| bridgeServerPort = bridge.getPort(); |
| return bridge.getPort(); |
| } |
| |
| /** |
| * Stops the cache server that serves up the given cache. |
| */ |
| protected void stopBridgeServer(Cache cache) { |
| CacheServer bridge = (CacheServer) cache.getCacheServers().iterator().next(); |
| bridge.stop(); |
| assertFalse(bridge.isRunning()); |
| } |
| |
| /* Close Client */ |
| public void closeClient() { |
| logger.info("### Close Client. ###"); |
| try { |
| closeCache(); |
| } catch (Exception ex) { |
| logger.info("### Failed to get close client. ###"); |
| } |
| // Wait.pause(2 * 1000); |
| } |
| |
| private static int getCacheServerPort() { |
| return bridgeServerPort; |
| } |
| |
| private static class IdComparator implements Comparator { |
| |
| @Override |
| public int compare(Object obj1, Object obj2) { |
| int obj1Id = ((TestObject) obj1).getId(); |
| int obj2Id = ((TestObject) obj2).getId(); |
| if (obj1Id > obj2Id) { |
| return 1; |
| } else if (obj1Id < obj2Id) { |
| return -1; |
| } else { |
| return 0; |
| } |
| } |
| } |
| |
| private static class IdValueComparator implements Comparator { |
| |
| @Override |
| public int compare(Object obj1, Object obj2) { |
| int obj1Id = ((Integer) obj1).intValue(); |
| int obj2Id = ((Integer) obj2).intValue(); |
| if (obj1Id > obj2Id) { |
| return 1; |
| } else if (obj1Id < obj2Id) { |
| return -1; |
| } else { |
| return 0; |
| } |
| } |
| } |
| |
| private static class StructIdComparator implements Comparator { |
| |
| @Override |
| public int compare(Object obj1, Object obj2) { |
| int obj1Id = ((Integer) ((Struct) obj1).get("id")).intValue(); |
| int obj2Id = ((Integer) ((Struct) obj2).get("id")).intValue(); |
| if (obj1Id > obj2Id) { |
| return 1; |
| } else if (obj1Id < obj2Id) { |
| return -1; |
| } else { |
| return 0; |
| } |
| } |
| } |
| } |