| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache.query.dunit; |
| |
| import static org.apache.geode.test.dunit.Assert.assertEquals; |
| import static org.apache.geode.test.dunit.Assert.assertFalse; |
| import static org.apache.geode.test.dunit.Assert.assertTrue; |
| import static org.apache.geode.test.dunit.Assert.fail; |
| import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM; |
| import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter; |
| import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName; |
| |
| import java.io.Serializable; |
| |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.cache.client.ClientCache; |
| import org.apache.geode.cache.client.ClientCacheFactory; |
| import org.apache.geode.cache.client.ClientRegionShortcut; |
| import org.apache.geode.cache.query.CacheUtils; |
| import org.apache.geode.cache.query.QueryService; |
| import org.apache.geode.cache.query.SelectResults; |
| import org.apache.geode.cache.query.data.PortfolioPdx; |
| import org.apache.geode.cache.query.data.PositionPdx; |
| import org.apache.geode.cache.query.internal.CompiledValue; |
| import org.apache.geode.cache.query.internal.QueryObserver; |
| import org.apache.geode.cache.query.internal.QueryObserverAdapter; |
| import org.apache.geode.cache.query.internal.QueryObserverHolder; |
| import org.apache.geode.cache.query.internal.StructImpl; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.internal.AvailablePortHelper; |
| import org.apache.geode.internal.cache.GemFireCacheImpl; |
| import org.apache.geode.internal.cache.VMCachedDeserializable; |
| import org.apache.geode.pdx.PdxInstance; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.SerializableCallable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| import org.apache.geode.test.junit.categories.OQLQueryTest; |
| |
| /** |
| * Test for #44807 to eliminate unnecessary serialization/deserialization in select * queries |
| */ |
| @Category({OQLQueryTest.class}) |
| public class SelectStarQueryDUnitTest extends JUnit4CacheTestCase { |
| |
| /** Used for saving & restoring oldObserver without serialization */ |
| private static volatile QueryObserver oldObserver; |
| |
| private final String regName = "exampleRegion"; |
| private final String regName2 = "exampleRegion2"; |
| |
| private final String[] queries = {"SELECT * FROM /" + regName, // 0 |
| "SELECT * FROM /" + regName + " limit 5", // 1 |
| "SELECT p from /" + regName + " p", // 2 |
| "SELECT count(*) FROM /" + regName, // 3 |
| "SELECT ALL * from /" + regName, // 4 |
| "SELECT * from /" + regName + ".values", // 5 |
| "SELECT distinct * FROM /" + regName, // 6 |
| "SELECT distinct * FROM /" + regName + " p order by p.ID", // 7 |
| "SELECT * from /" + regName + " r, positions.values pos"// 8 |
| }; |
| private final int[] resultSize = {20, 5, 20, 1, 20, 20, 20, 20, 40}; |
| |
| private final String[] multipleRegionQueries = {" SELECT * FROM /" + regName + ", /" + regName2, |
| "SELECT * FROM /" + regName + ", /" + regName2 + " limit 5", |
| "SELECT distinct * FROM /" + regName + ", /" + regName2, |
| "SELECT distinct * FROM /" + regName + " p1, /" + regName2 + " p2 order by p1.ID", |
| "SELECT count(*) FROM /" + regName + ", /" + regName2, |
| "SELECT p, q from /" + regName + " p, /" + regName2 + " q", |
| "SELECT ALL * from /" + regName + " p, /" + regName2 + " q", |
| "SELECT * from /" + regName + ".values" + ", /" + regName2 + ".values", |
| "SELECT * from /" + regName + " p, p.positions.values pos" + ", /" + regName2 |
| + " q, q.positions.values pos",}; |
| private final int[] resultSize2 = {400, 5, 400, 400, 1, 400, 400, 400, 1600}; |
| |
| @Override |
| public final void preTearDownCacheTestCase() throws Exception { |
| invokeInEveryVM(() -> oldObserver = null); |
| } |
| |
| @Test |
| public void functionWithStructTypeInInnerQueryShouldNotThrowException() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM client = host.getVM(3); |
| PortfolioPdx[] portfolios = new PortfolioPdx[10]; |
| for (int i = 0; i < portfolios.length; i++) { |
| portfolios[i] = new PortfolioPdx(i); |
| } |
| |
| // create servers and regions |
| final int port1 = startPartitionedCacheServer(server1, portfolios); |
| |
| // create client |
| client.invoke(new SerializableCallable("Create client") { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory cf = new ClientCacheFactory(); |
| cf.addPoolServer(getServerHostName(server1.getHost()), port1); |
| ClientCache cache = getClientCache(cf); |
| cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regName); |
| return null; |
| } |
| }); |
| |
| // put serialized PortfolioPdx objects |
| client.invoke(new SerializableCallable("Put objects") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getRootRegion(regName); |
| for (int i = 10; i < 100; i++) { |
| r1.put("key-" + i, new PortfolioPdx(i)); |
| } |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService remoteQS = null; |
| try { |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| SelectResults sr = (SelectResults) remoteQS |
| .newQuery("select distinct oP.ID, oP.status, oP.getType from /" + regName |
| + " oP where element(select distinct p.ID, p.status, p.getType from /" + regName |
| + " p where p.ID = oP.ID).status = 'inactive'") |
| .execute(); |
| assertEquals(50, sr.size()); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| |
| return null; |
| } |
| }); |
| |
| closeCache(client); |
| closeCache(server1); |
| } |
| |
| @Test |
| public void functionWithStructTypeInInnerQueryShouldNotThrowExceptionWhenRunOnMultipleNodes() |
| throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM server2 = host.getVM(1); |
| final VM server3 = host.getVM(2); |
| final VM client = host.getVM(3); |
| PortfolioPdx[] portfolios = new PortfolioPdx[10]; |
| for (int i = 0; i < portfolios.length; i++) { |
| portfolios[i] = new PortfolioPdx(i); |
| } |
| |
| // create servers and regions |
| final int port1 = startPartitionedCacheServer(server1, portfolios); |
| final int port2 = startPartitionedCacheServer(server2, portfolios); |
| final int port3 = startPartitionedCacheServer(server3, portfolios); |
| |
| // create client |
| client.invoke(new SerializableCallable("Create client") { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory cf = new ClientCacheFactory(); |
| cf.addPoolServer(getServerHostName(server1.getHost()), port1); |
| cf.addPoolServer(getServerHostName(server2.getHost()), port2); |
| cf.addPoolServer(getServerHostName(server3.getHost()), port3); |
| ClientCache cache = getClientCache(cf); |
| cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regName); |
| return null; |
| } |
| }); |
| |
| // put serialized PortfolioPdx objects |
| client.invoke(new SerializableCallable("Put objects") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getRootRegion(regName); |
| for (int i = 10; i < 100; i++) { |
| r1.put("key-" + i, new PortfolioPdx(i)); |
| } |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService remoteQS = null; |
| try { |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| SelectResults sr = (SelectResults) remoteQS |
| .newQuery("select distinct oP.ID, oP.status, oP.getType from /" + regName |
| + " oP where element(select distinct p.ID, p.status, p.getType from /" + regName |
| + " p where p.ID = oP.ID).status = 'inactive'") |
| .execute(); |
| assertEquals(50, sr.size()); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| return null; |
| } |
| }); |
| |
| |
| closeCache(client); |
| closeCache(server1); |
| closeCache(server2); |
| closeCache(server3); |
| } |
| |
| @Test |
| public void testSelectStarQueryForPartitionedRegion() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM server2 = host.getVM(1); |
| final VM server3 = host.getVM(2); |
| final VM client = host.getVM(3); |
| PortfolioPdx[] portfolios = new PortfolioPdx[10]; |
| for (int i = 0; i < portfolios.length; i++) { |
| portfolios[i] = new PortfolioPdx(i); |
| } |
| |
| // create servers and regions |
| final int port1 = startPartitionedCacheServer(server1, portfolios); |
| final int port2 = startPartitionedCacheServer(server2, portfolios); |
| final int port3 = startPartitionedCacheServer(server3, portfolios); |
| |
| server1.invoke(new SerializableCallable("Set observer") { |
| @Override |
| public Object call() throws Exception { |
| oldObserver = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| return null; |
| } |
| }); |
| |
| // create client |
| client.invoke(new SerializableCallable("Create client") { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory cf = new ClientCacheFactory(); |
| cf.addPoolServer(getServerHostName(server1.getHost()), port1); |
| cf.addPoolServer(getServerHostName(server2.getHost()), port2); |
| cf.addPoolServer(getServerHostName(server3.getHost()), port3); |
| ClientCache cache = getClientCache(cf); |
| cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regName); |
| return null; |
| } |
| }); |
| |
| // put serialized PortfolioPdx objects |
| client.invoke(new SerializableCallable("Put objects") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getRootRegion(regName); |
| for (int i = 10; i < 20; i++) { |
| r1.put("key-" + i, new PortfolioPdx(i)); |
| } |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService localQS = null; |
| QueryService remoteQS = null; |
| try { |
| localQS = ((ClientCache) getCache()).getLocalQueryService(); |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults[][] sr = new SelectResults[1][2]; |
| SelectResults res = null; |
| for (int i = 0; i < queries.length; i++) { |
| try { |
| res = (SelectResults) localQS.newQuery(queries[i]).execute(); |
| sr[0][0] = res; |
| res = (SelectResults) remoteQS.newQuery(queries[i]).execute(); |
| sr[0][1] = res; |
| CacheUtils.compareResultsOfWithAndWithoutIndex(sr); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| |
| // verify if objects iterated by query are serialized |
| server1.invoke(new SerializableCallable("Get observer") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertTrue(resultObserver.isObjectSerialized()); |
| return null; |
| } |
| }); |
| |
| // verify if objects returned by local server query are not serialized |
| server1.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| QueryService qs = null; |
| try { |
| qs = getCache().getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| for (int i = 0; i < queries.length; i++) { |
| try { |
| res = (SelectResults) qs.newQuery(queries[i]).execute(); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| } |
| } |
| observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertFalse(resultObserver.isObjectSerialized()); |
| // reset observer |
| QueryObserverHolder.setInstance(oldObserver); |
| return null; |
| } |
| }); |
| |
| closeCache(client); |
| closeCache(server1); |
| closeCache(server2); |
| closeCache(server3); |
| } |
| |
| @Test |
| public void testSelectStarQueryForReplicatedRegion() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(1); |
| final VM client = host.getVM(3); |
| // create servers and regions |
| final int port1 = startReplicatedCacheServer(server1); |
| |
| server1.invoke(new SerializableCallable("Set observer") { |
| @Override |
| public Object call() throws Exception { |
| oldObserver = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| return null; |
| } |
| }); |
| |
| // create client |
| client.invoke(new SerializableCallable("Create client") { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory cf = new ClientCacheFactory(); |
| cf.addPoolServer(getServerHostName(server1.getHost()), port1); |
| ClientCache cache = getClientCache(cf); |
| cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regName); |
| cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regName2); |
| return null; |
| } |
| }); |
| |
| // put serialized PortfolioPdx objects |
| client.invoke(new SerializableCallable("Put objects") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getRootRegion(regName); |
| Region r2 = getRootRegion(regName2); |
| for (int i = 10; i < 20; i++) { |
| r1.put("key-" + i, new PortfolioPdx(i)); |
| r2.put("key-" + i, new PortfolioPdx(i)); |
| } |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService localQS = null; |
| QueryService remoteQS = null; |
| try { |
| localQS = ((ClientCache) getCache()).getLocalQueryService(); |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| SelectResults[][] sr = new SelectResults[1][2]; |
| |
| for (int i = 0; i < multipleRegionQueries.length; i++) { |
| try { |
| res = (SelectResults) localQS.newQuery(multipleRegionQueries[i]).execute(); |
| sr[0][0] = res; |
| res = (SelectResults) remoteQS.newQuery(multipleRegionQueries[i]).execute(); |
| sr[0][1] = res; |
| CacheUtils.compareResultsOfWithAndWithoutIndex(sr); |
| } catch (Exception e) { |
| fail("Error executing query: " + multipleRegionQueries[i], e); |
| } |
| assertEquals(resultSize2[i], res.size()); |
| if (i == 4) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(400, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + multipleRegionQueries[i] |
| + " should be instance of PortfolioPdx and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + multipleRegionQueries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| } |
| |
| } |
| return null; |
| } |
| }); |
| |
| // verify if objects iterated by query are serialized |
| server1.invoke(new SerializableCallable("Get observer") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertTrue(resultObserver.isObjectSerialized()); |
| return null; |
| } |
| }); |
| |
| // verify if objects returned by local server query are not serialized |
| server1.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| QueryService qs = null; |
| try { |
| qs = getCache().getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| for (int i = 0; i < multipleRegionQueries.length; i++) { |
| try { |
| res = (SelectResults) qs.newQuery(multipleRegionQueries[i]).execute(); |
| } catch (Exception e) { |
| fail("Error executing query: " + multipleRegionQueries[i], e); |
| } |
| assertEquals(resultSize2[i], res.size()); |
| if (i == 4) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(400, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + multipleRegionQueries[i] |
| + " should be instance of PortfolioPdx and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + multipleRegionQueries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| |
| } |
| } |
| observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertFalse(resultObserver.isObjectSerialized()); |
| // reset observer |
| QueryObserverHolder.setInstance(oldObserver); |
| return null; |
| } |
| }); |
| |
| closeCache(client); |
| closeCache(server1); |
| } |
| |
| @Test |
| public void testByteArrayReplicatedRegion() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM client = host.getVM(3); |
| final byte[] ba = new byte[] {1, 2, 3, 4, 5}; |
| |
| // create servers and regions |
| final int port = (Integer) server1.invoke(new SerializableCallable("Create Server1") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regName); |
| // put domain objects |
| for (int i = 0; i < 10; i++) { |
| r1.put("key-" + i, ba); |
| } |
| CacheServer server = getCache().addCacheServer(); |
| int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); |
| server.setPort(port); |
| server.start(); |
| return port; |
| } |
| }); |
| |
| server1.invoke(new SerializableCallable("Set observer") { |
| @Override |
| public Object call() throws Exception { |
| oldObserver = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| return null; |
| } |
| }); |
| |
| // create client |
| client.invoke(new SerializableCallable("Create client") { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory cf = new ClientCacheFactory(); |
| cf.addPoolServer(getServerHostName(server1.getHost()), port); |
| ClientCache cache = getClientCache(cf); |
| cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regName); |
| return null; |
| } |
| }); |
| |
| // put serialized PortfolioPdx objects |
| client.invoke(new SerializableCallable("Put objects") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getRootRegion(regName); |
| for (int i = 10; i < 20; i++) { |
| r1.put("key-" + i, ba); |
| } |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService localQS = null; |
| QueryService remoteQS = null; |
| try { |
| localQS = ((ClientCache) getCache()).getLocalQueryService(); |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| SelectResults[][] sr = new SelectResults[1][2]; |
| |
| for (int i = 0; i < 6; i++) { |
| try { |
| res = (SelectResults) localQS.newQuery(queries[i]).execute(); |
| sr[0][0] = res; |
| res = (SelectResults) remoteQS.newQuery(queries[i]).execute(); |
| sr[0][1] = res; |
| CacheUtils.compareResultsOfWithAndWithoutIndex(sr); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object o : res) { |
| if (o instanceof byte[]) { |
| int j = 0; |
| for (byte b : ((byte[]) o)) { |
| if (b != ba[j++]) { |
| fail("Bytes in byte array are different when queried from client"); |
| } |
| } |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + o.getClass()); |
| } |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| |
| // verify if objects iterated by query are serialized |
| server1.invoke(new SerializableCallable("Get observer") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertFalse(resultObserver.isObjectSerialized()); |
| return null; |
| } |
| }); |
| |
| // verify if objects returned by local server query are not serialized |
| server1.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| QueryService qs = null; |
| try { |
| qs = getCache().getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| for (int i = 0; i < 6; i++) { |
| try { |
| res = (SelectResults) qs.newQuery(queries[i]).execute(); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object o : res) { |
| if (o instanceof byte[]) { |
| int j = 0; |
| for (byte b : ((byte[]) o)) { |
| if (b != ba[j++]) { |
| fail("Bytes in byte array are different when queried locally on server"); |
| } |
| } |
| } else { |
| fail("Result objects for server local query: " + queries[i] |
| + " should be instance of byte array and not " + o.getClass()); |
| } |
| } |
| } |
| } |
| observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertFalse(resultObserver.isObjectSerialized()); |
| // reset observer |
| QueryObserverHolder.setInstance(oldObserver); |
| return null; |
| } |
| }); |
| |
| closeCache(client); |
| closeCache(server1); |
| } |
| |
| @Test |
| public void testByteArrayPartitionedRegion() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM client = host.getVM(3); |
| final VM server2 = host.getVM(1); |
| final VM server3 = host.getVM(2); |
| final byte[][] objs = new byte[10][5]; |
| |
| for (int i = 0; i < objs.length; i++) { |
| objs[i] = new byte[] {1, 2, 3, 4, 5}; |
| } |
| |
| // create servers and regions |
| final int port1 = startPartitionedCacheServer(server1, objs); |
| final int port2 = startPartitionedCacheServer(server2, objs); |
| final int port3 = startPartitionedCacheServer(server3, objs); |
| |
| server1.invoke(new SerializableCallable("Set observer") { |
| @Override |
| public Object call() throws Exception { |
| oldObserver = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| return null; |
| } |
| }); |
| |
| // create client |
| client.invoke(new SerializableCallable("Create client") { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory cf = new ClientCacheFactory(); |
| cf.addPoolServer(getServerHostName(server1.getHost()), port1); |
| cf.addPoolServer(getServerHostName(server2.getHost()), port2); |
| cf.addPoolServer(getServerHostName(server3.getHost()), port3); |
| ClientCache cache = getClientCache(cf); |
| cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regName); |
| return null; |
| } |
| }); |
| |
| // put serialized PortfolioPdx objects |
| client.invoke(new SerializableCallable("Put objects") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getRootRegion(regName); |
| for (int i = 10; i < 20; i++) { |
| r1.put("key-" + i, new byte[] {1, 2, 3, 4, 5}); |
| } |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService localQS = null; |
| QueryService remoteQS = null; |
| try { |
| localQS = ((ClientCache) getCache()).getLocalQueryService(); |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| SelectResults[][] sr = new SelectResults[1][2]; |
| |
| for (int i = 0; i < 6; i++) { |
| try { |
| res = (SelectResults) localQS.newQuery(queries[i]).execute(); |
| sr[0][0] = res; |
| res = (SelectResults) remoteQS.newQuery(queries[i]).execute(); |
| sr[0][1] = res; |
| CacheUtils.compareResultsOfWithAndWithoutIndex(sr); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object o : res) { |
| if (o instanceof byte[]) { |
| int j = 0; |
| for (byte b : ((byte[]) o)) { |
| if (b != objs[0][j++]) { |
| fail("Bytes in byte array are different when queried from client"); |
| } |
| } |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + o.getClass()); |
| } |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| |
| // verify if objects iterated by query are serialized |
| server1.invoke(new SerializableCallable("Get observer") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertFalse(resultObserver.isObjectSerialized()); |
| return null; |
| } |
| }); |
| |
| // verify if objects returned by local server query are not serialized |
| server1.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| QueryService qs = null; |
| try { |
| qs = getCache().getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| for (int i = 0; i < 6; i++) { |
| try { |
| res = (SelectResults) qs.newQuery(queries[i]).execute(); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object o : res) { |
| if (o instanceof byte[]) { |
| int j = 0; |
| for (byte b : ((byte[]) o)) { |
| if (b != objs[0][j++]) { |
| fail("Bytes in byte array are different when queried locally on server"); |
| } |
| } |
| } else { |
| fail("Result objects for server local query: " + queries[i] |
| + " should be instance of byte array and not " + o.getClass()); |
| } |
| } |
| } |
| |
| } |
| observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertFalse(resultObserver.isObjectSerialized()); |
| // reset observer |
| QueryObserverHolder.setInstance(oldObserver); |
| return null; |
| } |
| }); |
| |
| closeCache(client); |
| closeCache(server1); |
| |
| } |
| |
| @Test |
| public void testSelectStarQueryForIndexes() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM client = host.getVM(3); |
| // create servers and regions |
| final int port1 = startReplicatedCacheServer(server1); |
| // create client |
| client.invoke(new SerializableCallable("Create client") { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory cf = new ClientCacheFactory(); |
| cf.addPoolServer(getServerHostName(server1.getHost()), port1); |
| ClientCache cache = getClientCache(cf); |
| cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regName); |
| cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regName2); |
| return null; |
| } |
| }); |
| |
| // put serialized PortfolioPdx objects |
| client.invoke(new SerializableCallable("Put objects") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getRootRegion(regName); |
| Region r2 = getRootRegion(regName2); |
| for (int i = 10; i < 20; i++) { |
| r1.put("key-" + i, new PortfolioPdx(i)); |
| r2.put("key-" + i, new PortfolioPdx(i)); |
| } |
| return null; |
| } |
| }); |
| |
| // create index on exampleRegion1 |
| server1.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| QueryService qs = null; |
| try { |
| qs = getCache().getQueryService(); |
| qs.createIndex("status", "status", "/" + regName); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService localQS = null; |
| QueryService remoteQS = null; |
| try { |
| localQS = ((ClientCache) getCache()).getLocalQueryService(); |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| SelectResults[][] sr = new SelectResults[1][2]; |
| |
| for (int i = 0; i < multipleRegionQueries.length; i++) { |
| try { |
| res = (SelectResults) localQS.newQuery(multipleRegionQueries[i]).execute(); |
| sr[0][0] = res; |
| res = (SelectResults) remoteQS.newQuery(multipleRegionQueries[i]).execute(); |
| sr[0][1] = res; |
| CacheUtils.compareResultsOfWithAndWithoutIndex(sr); |
| } catch (Exception e) { |
| fail("Error executing query: " + multipleRegionQueries[i], e); |
| } |
| assertEquals(resultSize2[i], res.size()); |
| if (i == 4) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(400, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + multipleRegionQueries[i] |
| + " should be instance of PortfolioPdx and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + multipleRegionQueries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| } |
| |
| } |
| return null; |
| } |
| }); |
| |
| // create index on exampleRegion2 |
| server1.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| QueryService qs = null; |
| try { |
| qs = getCache().getQueryService(); |
| qs.createIndex("status", "status", "/" + regName2); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService localQS = null; |
| QueryService remoteQS = null; |
| try { |
| localQS = ((ClientCache) getCache()).getLocalQueryService(); |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| SelectResults[][] sr = new SelectResults[1][2]; |
| |
| for (int i = 0; i < multipleRegionQueries.length; i++) { |
| try { |
| res = (SelectResults) localQS.newQuery(multipleRegionQueries[i]).execute(); |
| sr[0][0] = res; |
| res = (SelectResults) remoteQS.newQuery(multipleRegionQueries[i]).execute(); |
| sr[0][1] = res; |
| CacheUtils.compareResultsOfWithAndWithoutIndex(sr); |
| } catch (Exception e) { |
| fail("Error executing query: " + multipleRegionQueries[i], e); |
| } |
| assertEquals(resultSize2[i], res.size()); |
| if (i == 4) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(400, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + multipleRegionQueries[i] |
| + " should be instance of PortfolioPdx and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + multipleRegionQueries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| } |
| |
| } |
| return null; |
| } |
| }); |
| |
| closeCache(client); |
| closeCache(server1); |
| } |
| |
| @Test |
| public void testSelectStarQueryForPdxObjects() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM client = host.getVM(3); |
| // create servers and regions |
| final int port1 = startReplicatedCacheServer(server1); |
| |
| server1.invoke(new SerializableCallable("Set observer") { |
| @Override |
| public Object call() throws Exception { |
| oldObserver = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| return null; |
| } |
| }); |
| |
| // create client |
| client.invoke(new SerializableCallable("Create client") { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory cf = new ClientCacheFactory(); |
| cf.addPoolServer(getServerHostName(server1.getHost()), port1); |
| ClientCache cache = getClientCache(cf); |
| cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regName); |
| return null; |
| } |
| }); |
| |
| // Update with serialized PortfolioPdx objects |
| client.invoke(new SerializableCallable("Put objects") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getRootRegion(regName); |
| for (int i = 0; i < 20; i++) { |
| r1.put("key-" + i, new PortfolioPdx(i)); |
| } |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService localQS = null; |
| QueryService remoteQS = null; |
| try { |
| localQS = ((ClientCache) getCache()).getLocalQueryService(); |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| SelectResults[][] sr = new SelectResults[1][2]; |
| |
| for (int i = 0; i < queries.length; i++) { |
| try { |
| res = (SelectResults) localQS.newQuery(queries[i]).execute(); |
| sr[0][0] = res; |
| res = (SelectResults) remoteQS.newQuery(queries[i]).execute(); |
| sr[0][1] = res; |
| CacheUtils.compareResultsOfWithAndWithoutIndex(sr); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| |
| // verify if objects iterated by query are serialized |
| server1.invoke(new SerializableCallable("Get observer") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertTrue(resultObserver.isObjectSerialized()); |
| return null; |
| } |
| }); |
| |
| // verify if objects returned by local server query are not serialized |
| server1.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| QueryService qs = null; |
| try { |
| qs = getCache().getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| for (int i = 0; i < queries.length; i++) { |
| try { |
| res = (SelectResults) qs.newQuery(queries[i]).execute(); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| } |
| } |
| observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertFalse(resultObserver.isObjectSerialized()); |
| QueryObserverHolder.setInstance(oldObserver); |
| return null; |
| } |
| }); |
| |
| // verify if Pdx instances are returned by local server query |
| // if read-serialized is set true |
| server1.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| GemFireCacheImpl cache = (GemFireCacheImpl) getCache(); |
| cache.setReadSerializedForTest(true); |
| QueryService qs = null; |
| try { |
| qs = getCache().getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| for (int i = 0; i < queries.length; i++) { |
| try { |
| res = (SelectResults) qs.newQuery(queries[i]).execute(); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PdxInstance) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PdxInstance and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PdxInstance) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PdxInstance and not " + rs.getClass()); |
| } |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| |
| closeCache(client); |
| closeCache(server1); |
| } |
| |
| @Test |
| public void testSelectStarQueryForPdxAndNonPdxObjects() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM client = host.getVM(3); |
| // create servers and regions |
| // put domain objects |
| final int port1 = startReplicatedCacheServer(server1); |
| |
| server1.invoke(new SerializableCallable("Set observer") { |
| @Override |
| public Object call() throws Exception { |
| oldObserver = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| return null; |
| } |
| }); |
| |
| // create client |
| client.invoke(new SerializableCallable("Create client") { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory cf = new ClientCacheFactory(); |
| cf.addPoolServer(getServerHostName(server1.getHost()), port1); |
| ClientCache cache = getClientCache(cf); |
| cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regName); |
| return null; |
| } |
| }); |
| |
| // Add some serialized PortfolioPdx objects |
| client.invoke(new SerializableCallable("Put objects") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getRootRegion(regName); |
| for (int i = 10; i < 20; i++) { |
| r1.put("key-" + i, new PortfolioPdx(i)); |
| } |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService localQS = null; |
| QueryService remoteQS = null; |
| try { |
| localQS = ((ClientCache) getCache()).getLocalQueryService(); |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| SelectResults[][] sr = new SelectResults[1][2]; |
| |
| for (int i = 0; i < queries.length; i++) { |
| try { |
| res = (SelectResults) localQS.newQuery(queries[i]).execute(); |
| sr[0][0] = res; |
| res = (SelectResults) remoteQS.newQuery(queries[i]).execute(); |
| sr[0][1] = res; |
| CacheUtils.compareResultsOfWithAndWithoutIndex(sr); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx or PortfolioPdx and not " |
| + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| |
| // verify if objects iterated by query are serialized |
| server1.invoke(new SerializableCallable("Get observer") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertTrue(resultObserver.isObjectSerialized()); |
| return null; |
| } |
| }); |
| |
| // verify if objects returned by local server query are not serialized |
| server1.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| QueryObserver observer = QueryObserverHolder.setInstance(new QueryResultTrackingObserver()); |
| QueryService qs = null; |
| try { |
| qs = getCache().getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| for (int i = 0; i < queries.length; i++) { |
| try { |
| res = (SelectResults) qs.newQuery(queries[i]).execute(); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx or PortfolioPdx and not " |
| + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| } |
| } |
| observer = QueryObserverHolder.getInstance(); |
| assertTrue(QueryObserverHolder.hasObserver()); |
| assertTrue(observer instanceof QueryResultTrackingObserver); |
| QueryResultTrackingObserver resultObserver = (QueryResultTrackingObserver) observer; |
| assertFalse(resultObserver.isObjectSerialized()); |
| QueryObserverHolder.setInstance(oldObserver); |
| return null; |
| } |
| }); |
| |
| server1.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| GemFireCacheImpl cache = (GemFireCacheImpl) getCache(); |
| cache.setReadSerializedForTest(true); |
| QueryService qs = null; |
| try { |
| qs = getCache().getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| for (int i = 0; i < queries.length; i++) { |
| try { |
| res = (SelectResults) qs.newQuery(queries[i]).execute(); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else if (obj instanceof PdxInstance) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PdxInstance and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PdxInstance || rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PdxInstance and not " + rs.getClass()); |
| } |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| |
| closeCache(client); |
| closeCache(server1); |
| } |
| |
| @Test |
| public void testSelectStarQueryForPdxObjectsReadSerializedTrue() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM client = host.getVM(3); |
| // create servers and regions |
| final int port = (Integer) server1.invoke(new SerializableCallable("Create Server1") { |
| @Override |
| public Object call() throws Exception { |
| ((GemFireCacheImpl) getCache()).setReadSerializedForTest(true); |
| Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regName); |
| CacheServer server = getCache().addCacheServer(); |
| int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); |
| server.setPort(port); |
| server.start(); |
| return port; |
| } |
| }); |
| |
| // create client |
| client.invoke(new SerializableCallable("Create client") { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory cf = new ClientCacheFactory(); |
| cf.addPoolServer(getServerHostName(server1.getHost()), port); |
| ClientCache cache = getClientCache(cf); |
| cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regName); |
| Region r1 = getRootRegion(regName); |
| for (int i = 0; i < 20; i++) { |
| r1.put("key-" + i, new PortfolioPdx(i)); |
| } |
| return null; |
| } |
| }); |
| |
| // query remotely from client |
| client.invoke(new SerializableCallable("Query") { |
| @Override |
| public Object call() throws Exception { |
| getLogWriter().info("Querying remotely from client"); |
| QueryService localQS = null; |
| QueryService remoteQS = null; |
| try { |
| localQS = ((ClientCache) getCache()).getLocalQueryService(); |
| remoteQS = ((ClientCache) getCache()).getQueryService(); |
| } catch (Exception e) { |
| fail("Exception getting query service ", e); |
| } |
| SelectResults res = null; |
| SelectResults[][] sr = new SelectResults[1][2]; |
| |
| for (int i = 0; i < queries.length; i++) { |
| try { |
| res = (SelectResults) localQS.newQuery(queries[i]).execute(); |
| sr[0][0] = res; |
| res = (SelectResults) remoteQS.newQuery(queries[i]).execute(); |
| sr[0][1] = res; |
| CacheUtils.compareResultsOfWithAndWithoutIndex(sr); |
| } catch (Exception e) { |
| fail("Error executing query: " + queries[i], e); |
| } |
| assertEquals(resultSize[i], res.size()); |
| if (i == 3) { |
| int cnt = ((Integer) res.iterator().next()); |
| assertEquals(20, cnt); |
| |
| } else { |
| for (Object rs : res) { |
| if (rs instanceof StructImpl) { |
| for (Object obj : ((StructImpl) rs).getFieldValues()) { |
| if (obj instanceof PortfolioPdx || obj instanceof PositionPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + obj.getClass()); |
| } |
| } |
| } else if (rs instanceof PortfolioPdx) { |
| } else { |
| fail("Result objects for remote client query: " + queries[i] |
| + " should be instance of PortfolioPdx and not " + rs.getClass()); |
| } |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| |
| closeCache(client); |
| closeCache(server1); |
| } |
| |
| private int startPartitionedCacheServer(VM vm, final Object[] objs) { |
| final int port = (Integer) vm.invoke(new SerializableCallable("Create Server1") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getCache().createRegionFactory(RegionShortcut.PARTITION).create(regName); |
| // put domain objects |
| for (int i = 0; i < objs.length; i++) { |
| r1.put("key-" + i, objs[i]); |
| } |
| |
| CacheServer server = getCache().addCacheServer(); |
| int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); |
| server.setPort(port); |
| server.start(); |
| return port; |
| } |
| }); |
| return port; |
| } |
| |
| private int startReplicatedCacheServer(VM vm) { |
| final int port = (Integer) vm.invoke(new SerializableCallable("Create Server1") { |
| @Override |
| public Object call() throws Exception { |
| Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regName); |
| Region r2 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regName2); |
| // put domain objects |
| for (int i = 0; i < 10; i++) { |
| r1.put("key-" + i, new PortfolioPdx(i)); |
| r2.put("key-" + i, new PortfolioPdx(i)); |
| } |
| |
| CacheServer server = getCache().addCacheServer(); |
| int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); |
| server.setPort(port); |
| server.start(); |
| return port; |
| } |
| }); |
| return port; |
| } |
| |
| private void closeCache(VM vm) { |
| vm.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| closeCache(); |
| return null; |
| } |
| }); |
| } |
| |
| public class QueryResultTrackingObserver extends QueryObserverAdapter implements Serializable { |
| |
| private boolean isObjectSerialized = false; |
| |
| @Override |
| public void beforeIterationEvaluation(CompiledValue executer, Object currentObject) { |
| if (currentObject instanceof VMCachedDeserializable) { |
| getLogWriter().fine("currentObject is serialized object"); |
| isObjectSerialized = true; |
| } else { |
| getLogWriter().fine("currentObject is deserialized object"); |
| } |
| } |
| |
| public boolean isObjectSerialized() { |
| return isObjectSerialized; |
| } |
| |
| public void setObjectSerialized(boolean isObjectSerialized) { |
| this.isObjectSerialized = isObjectSerialized; |
| } |
| |
| } |
| |
| } |