blob: 57b86ee2118248939cae559df2601334aad8869a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.dunit;
import static org.apache.geode.internal.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Map;
import java.util.Properties;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.query.CacheUtils;
import org.apache.geode.cache.query.IndexType;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.data.PortfolioPdx;
import org.apache.geode.cache.query.data.PositionPdx;
import org.apache.geode.cache.query.internal.Undefined;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.cache30.ClientServerTestCase;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.pdx.FieldType;
import org.apache.geode.pdx.JSONFormatter;
import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.pdx.PdxInstanceFactory;
import org.apache.geode.pdx.internal.ClientTypeRegistration;
import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
import org.apache.geode.pdx.internal.TypeRegistration;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.DistributedTestCase;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.OQLQueryTest;
@Category({OQLQueryTest.class})
public class PdxQueryDUnitTest extends PDXQueryTestBase {
/** Logger used by this test class, including the runnables it invokes in other dunit VMs. */
public static final Logger logger = LogService.getLogger();
public PdxQueryDUnitTest() {
super();
}
/**
 * Tests querying PDX values directly on a server; no client is involved. Server1 (vm0) loads
 * {@code TestObject}s into the region, then server2 (vm1) runs every entry of
 * {@code queryString} locally, checks the element type returned by the first three queries and
 * finally checks how many {@code TestObject} instances were created in server2's VM.
 */
@Test
public void testServerQuery() throws CacheException {
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  final int numberOfEntries = 5;

  // Start server1 and load the data.
  vm0.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      Region region = getRootRegion().getSubregion(regionName);
      for (int i = 0; i < numberOfEntries; i++) {
        region.put("key-" + i, new TestObject(i, "vmware"));
      }
    }
  });

  // Start server2 and run the queries there.
  vm1.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      // Region lookup only; this server does not load any data itself.
      getRootRegion().getSubregion(regionName);
      // No TestObject has been instantiated in this VM before the queries run.
      assertEquals(0, TestObject.numInstance);

      // Execute query with different type of Results.
      QueryService qs = getCache().getQueryService();
      for (int i = 0; i < queryString.length; i++) {
        SelectResults sr = null;
        try {
          Query query = qs.newQuery(queryString[i]);
          sr = (SelectResults) query.execute();
        } catch (Exception ex) {
          // Pass the exception along as the cause so the real stack trace is reported.
          Assert.fail("Failed to execute query " + queryString[i], ex);
        }
        // Only the first three queries have a fixed expected element type.
        for (Object o : sr.asSet()) {
          if (i == 0 && !(o instanceof Integer)) {
            fail("Expected type Integer, not found in result set. Found type :" + o.getClass());
          } else if (i == 1 && !(o instanceof TestObject)) {
            fail(
                "Expected type TestObject, not found in result set. Found type :" + o.getClass());
          } else if (i == 2 && !(o instanceof String)) {
            fail("Expected type String, not found in result set. Found type :" + o.getClass());
          }
        }
      }
      // Pdx objects for local queries now get deserialized when results are iterated.
      // So the deserialized objects are no longer cached in VMCachedDeserializable.
      assertEquals(numberOfEntries * 2, TestObject.numInstance);
    }
  });

  this.closeClient(vm1);
  this.closeClient(vm0);
}
/**
 * Client-server projection query over PDX values. Server1 (vm0) stores instances of a local
 * {@code TestObject} subclass; a client in vm3, connected to server2 (vm1), executes
 * {@code queryString[0]}. The result size must equal the number of entries, and server2 must
 * report zero {@code TestObject} instances both before and after the query.
 */
@Test
public void testClientServerQueryWithProjections() throws CacheException {
  final Host dunitHost = Host.getHost(0);
  final VM serverOne = dunitHost.getVM(0);
  final VM serverTwo = dunitHost.getVM(1);
  final VM clientOne = dunitHost.getVM(2);
  final VM clientTwo = dunitHost.getVM(3);
  final int entryCount = 10;

  // Server 1: start it and store the objects.
  serverOne.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      // Subclass declared locally; instances of it are what gets stored in the region.
      class PdxObject extends TestObject {
        PdxObject() {}

        PdxObject(int id, String ticker) {
          super(id, ticker);
        }
      }

      configAndStartBridgeServer();
      final Region data = getRootRegion().getSubregion(regionName);
      int next = 0;
      while (next < entryCount) {
        data.put("key-" + next, new PdxObject(next, "vmware"));
        next++;
      }
    }
  });

  // Server 2: start it and confirm no TestObject has been instantiated there.
  serverTwo.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      final Region data = getRootRegion().getSubregion(regionName);
      System.out.println("##### Region size is: " + data.size());
      assertEquals(0, TestObject.numInstance);
    }
  });

  // Connection details for the client pools.
  final int serverOnePort = serverOne.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final int serverTwoPort = serverTwo.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String serverHostName = NetworkUtils.getServerHostName(serverOne.getHost());

  // One pool per client VM, each pointing at a different server.
  final String poolName = "testClientServerQueriesWithParamsPool";
  createPool(clientOne, poolName, new String[] {serverHostName}, new int[] {serverOnePort},
      true);
  createPool(clientTwo, poolName, new String[] {serverHostName}, new int[] {serverTwoPort},
      true);

  // Run the projection query from the client connected to server 2.
  clientTwo.invoke(new CacheSerializableRunnable("Execute queries") {
    @Override
    public void run2() throws CacheException {
      QueryService poolQueryService = null;
      try {
        poolQueryService = PoolManager.find(poolName).getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }

      SelectResults projected = null;
      try {
        logger.info("### Executing Query :" + queryString[0]);
        projected = (SelectResults) poolQueryService.newQuery(queryString[0]).execute();
      } catch (Exception e) {
        Assert.fail("Failed executing " + queryString[0], e);
      }
      assertEquals(entryCount, projected.size());
    }
  });

  // Server 2 must still not have created any TestObject instance.
  serverTwo.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      assertEquals(0, TestObject.numInstance);
    }
  });

  closeClient(clientOne);
  closeClient(clientTwo);
  closeClient(serverTwo);
  closeClient(serverOne);
}
/**
 * Tests client-server query on compressed PdxInstance. The client receives uncompressed value.
 *
 * <p>
 * Server1 (vm0) is started with the test's {@code compressor} and stores {@code TestObject}s
 * whose second constructor argument is a long string. A client in vm3, connected to server2
 * (vm1), runs {@code queryString[2]} and expects every result to equal that string. Server2 must
 * report zero {@code TestObject} instances before and after the query.
 */
@Test
public void testClientServerQueryWithCompression() throws CacheException {
  final String randomString =
      "asddfjkhaskkfdjhzjc0943509328kvnhfjkldsg09q3485ibjafdp9q8y43p9u7hgavpiuaha48uy9afliasdnuaiuqa498qa4"
          + "asddfjkhaskkfdjhzjc0943509328kvnhfjkldsg09q3485ibjafdp9q8y43p9u7hgavpiuaha48uy9afliasdnuaiuqa498qa4"
          + "asddfjkhaskkfdjhzjc0943509328kvnhfjkldsg09q3485ibjafdp9q8y43p9u7hgavpiuaha48uy9afliasdnuaiuqa498qa4"
          + "asddfjkhaskkfdjhzjc0943509328kvnhfjkldsg09q3485ibjafdp9q8y43p9u7hgavpiuaha48uy9afliasdnuaiuqa498qa4";
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  VM vm3 = host.getVM(3);
  final int numberOfEntries = 10;

  // Start server1
  vm0.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer(false, false, false, compressor);
      Region region = getRootRegion().getSubregion(regionName);
      // Use a JUnit assertion rather than the `assert` keyword, which is skipped unless the JVM
      // runs with -ea; the precondition must always be enforced.
      assertTrue("Region should be configured with a compressor",
          region.getAttributes().getCompressor() != null);
      for (int i = 0; i < numberOfEntries; i++) {
        region.put("key-" + i, new TestObject(i, randomString));
      }
    }
  });

  // Start server2
  vm1.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      Region region = getRootRegion().getSubregion(regionName);
      System.out.println("##### Region size is: " + region.size());
      assertEquals(0, TestObject.numInstance);
    }
  });

  // Create client region
  final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  // Create client pool.
  final String poolName = "testClientServerQueriesWithParamsPool";
  createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
  createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

  // Execute client queries
  SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
    @Override
    @SuppressWarnings("unchecked")
    public void run2() throws CacheException {
      SelectResults<String> results = null;
      QueryService qService = null;
      try {
        qService = (PoolManager.find(poolName)).getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      try {
        logger.info("### Executing Query :" + queryString[2]);
        Query query = qService.newQuery(queryString[2]);
        results = (SelectResults<String>) query.execute();
      } catch (Exception e) {
        Assert.fail("Failed executing " + queryString[2], e);
      }
      assertEquals(numberOfEntries, results.size());
      // Every projected value must come back as the original, uncompressed string.
      for (String result : results) {
        assertEquals(randomString, result);
      }
    }
  };
  vm3.invoke(executeQueries);

  // Check for TestObject instances on Server2.
  // It should be 0
  vm1.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      assertEquals(0, TestObject.numInstance);
    }
  });

  this.closeClient(vm2);
  this.closeClient(vm3);
  this.closeClient(vm1);
  this.closeClient(vm0);
}
/**
 * Client-server query over values stored as {@code PdxInstance}s. Server1 (vm0) builds each
 * value with a {@code PdxInstanceFactory} for the type name "PdxTestObject" and stores it; a
 * client in vm3, connected to server2 (vm1), executes {@code queryString[0]} and expects one
 * result per entry. Server2 must report zero {@code TestObject} instances before and after.
 */
@Test
public void testVersionedClass() throws CacheException {
  final Host dunitHost = Host.getHost(0);
  final VM serverOne = dunitHost.getVM(0);
  final VM serverTwo = dunitHost.getVM(1);
  final VM clientOne = dunitHost.getVM(2);
  final VM clientTwo = dunitHost.getVM(3);
  final int entryCount = 10;

  // Server 1: start it and store factory-built PdxInstances.
  serverOne.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      final Region data = getRootRegion().getSubregion(regionName);
      try {
        int id = 0;
        while (id < entryCount) {
          final PdxInstanceFactory instanceFactory =
              PdxInstanceFactoryImpl.newCreator("PdxTestObject", false, getCache());
          instanceFactory.writeInt("id", id);
          instanceFactory.writeString("ticker", "vmware");
          instanceFactory.writeString("idTickers", id + "vmware");
          data.put("key-" + id, instanceFactory.create());
          id++;
        }
      } catch (Exception ex) {
        Assert.fail("Failed to load the class.", ex);
      }
    }
  });

  // Server 2: start it and confirm no TestObject has been instantiated there.
  serverTwo.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      final Region data = getRootRegion().getSubregion(regionName);
      System.out.println("##### Region size is: " + data.size());
      assertEquals(0, TestObject.numInstance);
    }
  });

  // Connection details for the client pools.
  final int serverOnePort = serverOne.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final int serverTwoPort = serverTwo.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String serverHostName = NetworkUtils.getServerHostName(serverOne.getHost());

  // One pool per client VM, each pointing at a different server.
  final String poolName = "testClientServerQueriesWithParamsPool";
  createPool(clientOne, poolName, new String[] {serverHostName}, new int[] {serverOnePort},
      true);
  createPool(clientTwo, poolName, new String[] {serverHostName}, new int[] {serverTwoPort},
      true);

  // Run the query from the client connected to server 2.
  clientTwo.invoke(new CacheSerializableRunnable("Execute queries") {
    @Override
    public void run2() throws CacheException {
      QueryService poolQueryService = null;
      try {
        poolQueryService = PoolManager.find(poolName).getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }

      SelectResults queried = null;
      try {
        logger.info("### Executing Query :" + queryString[0]);
        queried = (SelectResults) poolQueryService.newQuery(queryString[0]).execute();
      } catch (Exception e) {
        Assert.fail("Failed executing " + queryString[0], e);
      }
      assertEquals(entryCount, queried.size());
    }
  });

  // Server 2 must still not have created any TestObject instance.
  serverTwo.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      assertEquals(0, TestObject.numInstance);
    }
  });

  closeClient(clientOne);
  closeClient(clientTwo);
  closeClient(serverTwo);
  closeClient(serverOne);
}
/**
 * Client-server query over PDX values, comparing remote and local execution. Server1 (vm0) and
 * the client in vm3 both store {@code TestObject}s. The client first runs the first three
 * {@code queryString} entries locally, then runs entries 1 and 2 both through the pool (against
 * server2) and locally, and compares the two result sets. Finally the {@code TestObject}
 * instance counters on the client and on server2 are checked.
 */
@Test
public void testClientServerQuery() throws CacheException {
  final Host dunitHost = Host.getHost(0);
  final VM serverOne = dunitHost.getVM(0);
  final VM serverTwo = dunitHost.getVM(1);
  final VM clientOne = dunitHost.getVM(2);
  final VM clientTwo = dunitHost.getVM(3);
  final int entryCount = 10;

  // Server 1: start it and load the data.
  serverOne.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      final Region data = getRootRegion().getSubregion(regionName);
      int next = 0;
      while (next < entryCount) {
        data.put("key-" + next, new TestObject(next, "vmware"));
        next++;
      }
    }
  });

  // Server 2: start it without loading anything.
  serverTwo.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      // Region lookup only; the result is not needed here.
      getRootRegion().getSubregion(regionName);
    }
  });

  // Connection details for the client pools.
  final int serverOnePort = serverOne.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final int serverTwoPort = serverTwo.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String serverHostName = NetworkUtils.getServerHostName(serverOne.getHost());

  // One pool per client VM, each pointing at a different server.
  final String poolName = "testClientServerQueryPool";
  createPool(clientOne, poolName, new String[] {serverHostName}, new int[] {serverOnePort},
      true);
  createPool(clientTwo, poolName, new String[] {serverHostName}, new int[] {serverTwoPort},
      true);

  // Client region: create it, load it and query it locally.
  clientTwo.invoke(new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      final AttributesFactory attributes = new AttributesFactory();
      attributes.setScope(Scope.LOCAL);
      ClientServerTestCase.configureConnectionPool(attributes, serverHostName, serverTwoPort, -1,
          true, -1, -1, null);
      final Region clientRegion = createRegion(regionName, rootRegionName, attributes.create());
      int next = 0;
      while (next < entryCount) {
        clientRegion.put("key-" + next, new TestObject(next, "vmware"));
        next++;
      }

      // Execute query locally.
      QueryService cacheQueryService = null;
      try {
        cacheQueryService = getCache().getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      for (int q = 0; q < 3; q++) {
        try {
          final SelectResults localResults =
              (SelectResults) cacheQueryService.newQuery(queryString[q]).execute();
          assertEquals(entryCount, localResults.size());
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryString[q], e);
        }
      }
    }
  });

  // Run queries 1 and 2 remotely and locally from the same client and compare them.
  clientTwo.invoke(new CacheSerializableRunnable("Execute queries") {
    @Override
    public void run2() throws CacheException {
      QueryService poolQueryService = null;
      QueryService cacheQueryService = null;
      // Slot [0][0] holds the remote result, slot [0][1] the local one.
      final SelectResults[][] remoteAndLocal = new SelectResults[1][2];
      try {
        poolQueryService = PoolManager.find(poolName).getQueryService();
        cacheQueryService = getCache().getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }

      for (int q = 1; q < 3; q++) {
        try {
          logger.info("### Executing Query on server:" + queryString[q]);
          remoteAndLocal[0][0] =
              (SelectResults) poolQueryService.newQuery(queryString[q]).execute();
          assertEquals(entryCount, remoteAndLocal[0][0].size());

          logger.info("### Executing Query locally:" + queryString[q]);
          remoteAndLocal[0][1] =
              (SelectResults) cacheQueryService.newQuery(queryString[q]).execute();
          assertEquals(entryCount, remoteAndLocal[0][1].size());

          // Compare local and remote query results.
          final boolean sameResults =
              CacheUtils.compareResultsOfWithAndWithoutIndex(remoteAndLocal);
          if (!sameResults) {
            fail("Local and Remote Query Results are not matching for query :" + queryString[q]);
          }
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryString[q], e);
        }
      }
      assertEquals(2 * entryCount, TestObject.numInstance);
    }
  });

  // Server 2 must not have created any TestObject instance.
  serverTwo.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      assertEquals(0, TestObject.numInstance);
    }
  });

  closeClient(clientOne);
  closeClient(clientTwo);
  closeClient(serverTwo);
  closeClient(serverOne);
}
/**
 * Tests querying PDX values on a server whose region has range indexes. Server1 (vm0) creates
 * four indexes, a client in vm3 (connected to server2) stores {@code PortfolioPdx} values, and
 * server1 then runs every query in {@code qs} twice: first asserting that no result (or struct
 * field) is a {@code PdxInstance}, then, with read-serialized switched on for the test,
 * asserting that every result (or struct field) is a {@code PdxInstance}.
 */
@Test
public void testClientServerQueryWithRangeIndex() throws CacheException {
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  VM vm3 = host.getVM(3);
  final String[] qs = new String[] {"SELECT * FROM " + regName + " p WHERE p.ID > 0",
      "SELECT p FROM " + regName + " p WHERE p.ID > 0",
      "SELECT * FROM " + regName + " p WHERE p.ID = 1",
      "SELECT * FROM " + regName + " p WHERE p.ID < 10",
      "SELECT * FROM " + regName + " p WHERE p.ID != 10",
      "SELECT * FROM " + regName + " p, p.positions.values pos WHERE p.ID > 0",
      "SELECT * FROM " + regName + " p, p.positions.values pos WHERE p.ID = 10",
      "SELECT p, pos FROM " + regName + " p, p.positions.values pos WHERE p.ID > 0",
      "SELECT p, pos FROM " + regName + " p, p.positions.values pos WHERE p.ID = 10",
      "SELECT pos FROM " + regName + " p, p.positions.values pos WHERE p.ID > 0",
      "SELECT p, pos FROM " + regName + " p, p.positions.values pos WHERE pos.secId != 'XXX'",
      "SELECT pos FROM " + regName + " p, p.positions.values pos WHERE pos.secId != 'XXX'",
      "SELECT pos FROM " + regName + " p, p.positions.values pos WHERE pos.secId = 'SUN'",
      "SELECT p, pos FROM " + regName + " p, p.positions.values pos WHERE pos.secId = 'SUN'",
      "SELECT p, pos FROM " + regName + " p, p.positions.values pos WHERE pos.secId = 'DELL'",
      "SELECT * FROM " + regName + " p, p.positions.values pos WHERE pos.secId = 'SUN'",
      "SELECT * FROM " + regName + " p, p.positions.values pos WHERE pos.secId = 'DELL'",
      "SELECT p, p.position1 FROM " + regName + " p where p.position1.secId != 'XXX'",
      "SELECT p, p.position1 FROM " + regName + " p where p.position1.secId = 'SUN'",
      "SELECT p.position1 FROM " + regName + " p WHERE p.ID > 0",
      "SELECT * FROM " + regName + " p WHERE p.status = 'active'",
      "SELECT p FROM " + regName + " p WHERE p.status != 'active'",};

  // Start server1
  vm0.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer(false, false, true, null); // Async index
      // Create Range index.
      // Named queryService so it does not shadow the outer query array `qs`.
      QueryService queryService = getCache().getQueryService();
      try {
        queryService.createIndex("idIndex", "p.ID", regName + " p");
        queryService.createIndex("statusIndex", "p.status", regName + " p");
        queryService.createIndex("secIdIndex", "pos.secId",
            regName + " p, p.positions.values pos");
        queryService.createIndex("pSecIdIdIndex", "p.position1.secId", regName + " p");
      } catch (Exception ex) {
        // Pass the exception along as the cause so the real stack trace is reported.
        Assert.fail("Failed to create index.", ex);
      }
    }
  });

  // Start server2
  vm1.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer(false, false, true, null); // Async index
    }
  });

  // Client pool.
  final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  // Create client pool.
  final String poolName = "testClientServerQueryPool";
  createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
  createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

  // Create client region
  vm3.invoke(new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
      Region region = createRegion(regionName, rootRegionName, factory.create());
      // j is passed for both constructor arguments and then incremented; it is reset whenever
      // i is a multiple of 24, so the same values are stored under several keys.
      int j = 0;
      for (int i = 0; i < 100; i++) {
        region.put("key-" + i, new PortfolioPdx(j, j++));
        // To add duplicate:
        if (i % 24 == 0) {
          j = 0; // reset
        }
      }
    }
  });

  // Execute query and make sure there is no PdxInstance in the results.
  vm0.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      // Execute query locally.
      QueryService queryService = getCache().getQueryService();
      for (int i = 0; i < qs.length; i++) {
        try {
          Query query = queryService.newQuery(qs[i]);
          SelectResults results = (SelectResults) query.execute();
          for (Object o : results.asList()) {
            if (o instanceof Struct) {
              // Multi-column results: check every field of the struct.
              Object[] values = ((Struct) o).getFieldValues();
              for (int c = 0; c < values.length; c++) {
                if (values[c] instanceof PdxInstance) {
                  fail("Found unexpected PdxInstance in the query results. At struct field [" + c
                      + "] query :" + qs[i] + " Object is: " + values[c]);
                }
              }
            } else {
              if (o instanceof PdxInstance) {
                fail("Found unexpected PdxInstance in the query results. " + qs[i]);
              }
            }
          }
        } catch (Exception e) {
          Assert.fail("Failed executing " + qs[i], e);
        }
      }
    }
  });

  // Re-execute query to fetch PdxInstance in the results.
  vm0.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      // Execute query locally.
      GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
      cache.setReadSerializedForTest(true);
      try {
        QueryService queryService = getCache().getQueryService();
        for (int i = 0; i < qs.length; i++) {
          try {
            Query query = queryService.newQuery(qs[i]);
            SelectResults results = (SelectResults) query.execute();
            for (Object o : results.asList()) {
              if (o instanceof Struct) {
                // Multi-column results: check every field of the struct.
                Object[] values = ((Struct) o).getFieldValues();
                for (int c = 0; c < values.length; c++) {
                  if (!(values[c] instanceof PdxInstance)) {
                    fail(
                        "Didn't found expected PdxInstance in the query results. At struct field ["
                            + c + "] query :" + qs[i] + " Object is: " + values[c]);
                  }
                }
              } else {
                if (!(o instanceof PdxInstance)) {
                  fail("Didn't found expected PdxInstance in the query results. " + qs[i]
                      + " Object is: " + o);
                }
              }
            }
          } catch (Exception e) {
            Assert.fail("Failed executing " + qs[i], e);
          }
        }
      } finally {
        // Always restore the flag, even when an assertion above fails.
        cache.setReadSerializedForTest(false);
      }
    }
  });

  this.closeClient(vm2);
  this.closeClient(vm3);
  this.closeClient(vm1);
  this.closeClient(vm0);
}
/**
 * Client-server {@code COUNT(*)} query over PDX values. Server1 (vm0) and the client in vm3 both
 * store {@code TestObject}s. The client runs the count query locally, then runs it through the
 * pool (against server2) and locally again, expecting the entry count each time and matching
 * remote and local results. Finally the {@code TestObject} instance counters on the client and
 * on server2 are checked.
 */
@Test
public void testClientServerCountQuery() throws CacheException {
  final Host dunitHost = Host.getHost(0);
  final VM serverOne = dunitHost.getVM(0);
  final VM serverTwo = dunitHost.getVM(1);
  final VM clientOne = dunitHost.getVM(2);
  final VM clientTwo = dunitHost.getVM(3);
  final int entryCount = 10;
  final String countQuery = "SELECT COUNT(*) FROM " + regName + " WHERE id >= 0";

  // Server 1: start it and load the data.
  serverOne.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      final Region data = getRootRegion().getSubregion(regionName);
      int next = 0;
      while (next < entryCount) {
        data.put("key-" + next, new TestObject(next, "vmware"));
        next++;
      }
    }
  });

  // Server 2: start it without loading anything.
  serverTwo.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      // Region lookup only; the result is not needed here.
      getRootRegion().getSubregion(regionName);
    }
  });

  // Connection details for the client pools.
  final int serverOnePort = serverOne.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final int serverTwoPort = serverTwo.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String serverHostName = NetworkUtils.getServerHostName(serverOne.getHost());

  // One pool per client VM, each pointing at a different server.
  final String poolName = "testClientServerQueryPool";
  createPool(clientOne, poolName, new String[] {serverHostName}, new int[] {serverOnePort},
      true);
  createPool(clientTwo, poolName, new String[] {serverHostName}, new int[] {serverTwoPort},
      true);

  // Client region: create it, load it and run the count query locally.
  clientTwo.invoke(new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      final AttributesFactory attributes = new AttributesFactory();
      attributes.setScope(Scope.LOCAL);
      ClientServerTestCase.configureConnectionPool(attributes, serverHostName, serverTwoPort, -1,
          true, -1, -1, null);
      final Region clientRegion = createRegion(regionName, rootRegionName, attributes.create());
      int next = 0;
      while (next < entryCount) {
        clientRegion.put("key-" + next, new TestObject(next, "vmware"));
        next++;
      }

      // Execute query locally.
      QueryService cacheQueryService = null;
      try {
        cacheQueryService = getCache().getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      try {
        final SelectResults counted =
            (SelectResults) cacheQueryService.newQuery(countQuery).execute();
        final Integer localCount = (Integer) counted.asList().get(0);
        assertEquals(entryCount, localCount.intValue());
      } catch (Exception e) {
        Assert.fail("Failed executing " + countQuery, e);
      }
    }
  });

  // Run the count query remotely and locally from the same client and compare.
  clientTwo.invoke(new CacheSerializableRunnable("Execute queries") {
    @Override
    public void run2() throws CacheException {
      QueryService poolQueryService = null;
      QueryService cacheQueryService = null;
      // Slot [0][0] holds the remote result, slot [0][1] the local one.
      final SelectResults[][] remoteAndLocal = new SelectResults[1][2];
      try {
        poolQueryService = PoolManager.find(poolName).getQueryService();
        cacheQueryService = getCache().getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }

      try {
        logger.info("### Executing Query on server:" + countQuery);
        remoteAndLocal[0][0] = (SelectResults) poolQueryService.newQuery(countQuery).execute();
        final Integer remoteCount = (Integer) remoteAndLocal[0][0].asList().get(0);
        assertEquals(entryCount, remoteCount.intValue());

        logger.info("### Executing Query locally:" + countQuery);
        remoteAndLocal[0][1] = (SelectResults) cacheQueryService.newQuery(countQuery).execute();
        final Integer localCount = (Integer) remoteAndLocal[0][1].asList().get(0);
        assertEquals(entryCount, localCount.intValue());

        // Compare local and remote query results.
        final boolean sameResults =
            CacheUtils.compareResultsOfWithAndWithoutIndex(remoteAndLocal);
        if (!sameResults) {
          fail("Local and Remote Query Results are not matching for query :" + countQuery);
        }
      } catch (Exception e) {
        Assert.fail("Failed executing " + countQuery, e);
      }
      assertEquals(entryCount, TestObject.numInstance);
    }
  });

  // Server 2 must not have created any TestObject instance.
  serverTwo.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      assertEquals(0, TestObject.numInstance);
    }
  });

  closeClient(clientOne);
  closeClient(clientTwo);
  closeClient(serverTwo);
  closeClient(serverOne);
}
/**
 * Tests client-server queries over values stored as {@code PdxInstance}s. A client in vm3,
 * connected to server2 (vm1), stores instances built by {@code PortfolioPdxVersion} through a
 * {@code PdxInstanceFactory} for the type name "PortfolioPdxVersion". It then runs every query
 * in {@code queryStr} both through the pool and locally and requires the two result sets to
 * match.
 */
@Test
public void testVersionedClientServerQuery() throws CacheException {
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  VM vm3 = host.getVM(3);
  final int numberOfEntries = 10;
  // NOTE(review): the query ending in "positions1.secId = positions1.secId " compares a field
  // with itself; "positions2" may have been intended - confirm before changing the query text.
  final String[] queryStr = new String[] {"SELECT DISTINCT ID FROM " + regName, // 0
      "SELECT * FROM " + regName, // 1
      "SELECT pkid FROM " + regName, // 2
      "SELECT * FROM " + regName + " WHERE ID > 5", // 3
      "SELECT p FROM " + regName + " p, p.positions pos WHERE p.pkid != 'vmware'", // 4
      "SELECT entry.value FROM " + this.regName + ".entries entry WHERE entry.value.ID > 0",
      "SELECT entry.value FROM " + this.regName + ".entries entry WHERE entry.key = 'key-1'",
      "SELECT e.value FROM " + this.regName + ".entrySet e where e.value.pkid >= '0'",
      "SELECT * FROM " + this.regName + ".values p WHERE p.pkid in SET('1', '2','3')",
      "SELECT * FROM " + this.regName + " pf where pf.position1.secId > '2'",
      "SELECT * FROM " + this.regName + " p where p.position3[1].portfolioId = 2",
      "SELECT * FROM " + this.regName + " p, p.positions.values AS pos WHERE pos.secId != '1'",
      "SELECT key, positions FROM " + this.regName + ".entrySet, value.positions.values "
          + "positions WHERE positions.mktValue >= 25.00",
      "SELECT * FROM " + this.regName + " portfolio1, " + this.regName + " portfolio2 WHERE "
          + "portfolio1.status = portfolio2.status",
      "SELECT portfolio1.ID, portfolio2.status FROM " + this.regName + " portfolio1, "
          + this.regName + " portfolio2 WHERE portfolio1.status = portfolio2.status",
      "SELECT * FROM " + this.regName + " portfolio1, portfolio1.positions.values positions1, "
          + this.regName + " portfolio2, portfolio2.positions.values positions2 WHERE "
          + "positions1.secId = positions1.secId ",
      "SELECT * FROM " + this.regName + " portfolio, portfolio.positions.values positions WHERE "
          + "portfolio.Pk IN SET ('1', '2') AND positions.secId = '1'",
      "SELECT DISTINCT pf1, pf2 FROM " + this.regName
          + " pf1, pf1.collectionHolderMap.values coll1," + " pf1.positions.values posit1, "
          + this.regName + " pf2, pf2.collectionHolderMap.values "
          + " coll2, pf2.positions.values posit2 WHERE pf1.ID = pf2.ID",};

  // Start server1
  vm0.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
    }
  });

  // Start server2
  vm1.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
    }
  });

  // Client pool.
  final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  // Create client pool.
  final String poolName = "testClientServerQueryPool";
  createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
  createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

  // Create client region
  vm3.invoke(new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      // Load client/server region.
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
      Region region = createRegion(regionName, rootRegionName, factory.create());
      try {
        // Store each portfolio as a PdxInstance built through the factory.
        for (int i = 0; i < numberOfEntries; i++) {
          PortfolioPdxVersion portfolioPdxVersion =
              new PortfolioPdxVersion(Integer.valueOf(i), Integer.valueOf(i));
          PdxInstanceFactory pdxFactory =
              PdxInstanceFactoryImpl.newCreator("PortfolioPdxVersion", false, getCache());
          PdxInstance pdxInstance = portfolioPdxVersion.createPdxInstance(pdxFactory);
          region.put("key-" + i, pdxInstance);
        }
      } catch (Exception ex) {
        // Pass the exception along as the cause; it was previously discarded.
        Assert.fail("Failed to load the class.", ex);
      }

      // Execute query:
      QueryService remoteQueryService = null;
      QueryService localQueryService = null;
      // Slot [0][0] holds the remote result, slot [0][1] the local one.
      SelectResults[][] rs = new SelectResults[1][2];
      try {
        remoteQueryService = (PoolManager.find(poolName)).getQueryService();
        localQueryService = getCache().getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      for (int i = 0; i < queryStr.length; i++) {
        try {
          logger.info("### Executing Query on server:" + queryStr[i]);
          Query query = remoteQueryService.newQuery(queryStr[i]);
          rs[0][0] = (SelectResults) query.execute();
          logger.info("### Executing Query locally:" + queryStr[i]);
          query = localQueryService.newQuery(queryStr[i]);
          rs[0][1] = (SelectResults) query.execute();
          logger.info("### Remote Query rs size: " + (rs[0][0]).size() + "Local Query rs size: "
              + (rs[0][1]).size());
          // Compare local and remote query results.
          if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
            fail("Local and Remote Query Results are not matching for query :" + queryStr[i]);
          }
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryStr[i], e);
        }
      }
    }
  });

  this.closeClient(vm2);
  this.closeClient(vm3);
  this.closeClient(vm1);
  this.closeClient(vm0);
}
/**
* Tests client-server query on PdxInstance with mixed types.
* <p>
* The region is loaded with {@code numberOfEntries} {@code TestObject} values (ticker "vmware")
* followed by 10 {@code TestObject2} values. Every query projects {@code ticker}: the unfiltered
* query is expected to return one row per entry, while the {@code IS_DEFINED} and equality queries
* are expected to return only {@code numberOfEntries} rows. The expectations are checked through
* the client's local query service and through the query service of the client pool.
*/
@Test
public void testClientServerQueryMixedTypes() throws CacheException {
// Index 0 is the unfiltered projection; indexes 1 and 2 restrict the result to defined tickers.
final String[] testQueries = new String[] {"select ticker from /root/" + regionName,
"select ticker from /root/" + regionName + " p where IS_DEFINED(p.ticker)",
"select ticker from /root/" + regionName + " where ticker = 'vmware'",};
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final int numberOfEntries = 10;
// Start server1 and load both value types into its region.
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
for (int i = 0; i < numberOfEntries; i++) {
region.put("key-" + i, new TestObject(i, "vmware"));
}
for (int i = numberOfEntries; i < (numberOfEntries + 10); i++) {
region.put("key-" + i, new TestObject2(i));
}
}
});
// Start server2 (no data is put from this VM).
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer();
// The region reference is looked up but not used further in this runnable.
Region region = getRootRegion().getSubregion(regionName);
}
});
// Client pool.
final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
// Create client pool.
// vm2 gets a pool pointing at server1, but only vm3 (pointing at server2) runs queries below.
final String poolName = "testClientServerQueryPool";
createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);
// Create client region, load it with the same keys, and verify the local query results.
vm3.invoke(new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
QueryService localQueryService = null;
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
Region region = createRegion(regionName, rootRegionName, factory.create());
for (int i = 0; i < numberOfEntries; i++) {
region.put("key-" + i, new TestObject(i, "vmware"));
}
for (int i = numberOfEntries; i < (numberOfEntries + 10); i++) {
region.put("key-" + i, new TestObject2(i));
}
// Execute query locally.
try {
localQueryService = getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
for (int i = 0; i < 3; i++) {
try {
Query query = localQueryService.newQuery(testQueries[i]);
SelectResults results = (SelectResults) query.execute();
if (i == 0) {
// Unfiltered projection: one row for every entry of either type.
assertEquals(numberOfEntries + 10, results.size());
} else {
// Filtered projections: only the TestObject entries are expected.
assertEquals(numberOfEntries, results.size());
}
} catch (Exception e) {
Assert.fail("Failed executing " + testQueries[i], e);
}
}
}
});
// Execute client queries
SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
@Override
public void run2() throws CacheException {
QueryService remoteQueryService = null;
QueryService localQueryService = null;
// rs[0][0] holds the pool (remote) result, rs[0][1] the local result for the current query.
SelectResults[][] rs = new SelectResults[1][2];
try {
remoteQueryService = (PoolManager.find(poolName)).getQueryService();
localQueryService = getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
for (int i = 0; i < 3; i++) {
try {
logger.info("### Executing Query on server:" + testQueries[i]);
Query query = remoteQueryService.newQuery(testQueries[i]);
rs[0][0] = (SelectResults) query.execute();
if (i == 0) {
// defined and undefined values returned
assertEquals(numberOfEntries + 10, rs[0][0].size());
} else {
// only defined values of ticker
assertEquals(numberOfEntries, rs[0][0].size());
}
logger.info("### Executing Query locally:" + testQueries[i]);
query = localQueryService.newQuery(testQueries[i]);
rs[0][1] = (SelectResults) query.execute();
if (i == 0) {
assertEquals(numberOfEntries + 10, rs[0][1].size());
} else {
assertEquals(numberOfEntries, rs[0][1].size());
}
// Remote and local executions must agree on the number of rows.
assertEquals(rs[0][0].size(), rs[0][1].size());
} catch (Exception e) {
Assert.fail("Failed executing " + testQueries[i], e);
}
}
// NOTE(review): the expected total is 2 * (numberOfEntries + 5), i.e. 30 for 10 entries;
// presumably this is the number of TestObject/TestObject2 instances created in this client
// VM by the puts above - confirm against how the numInstance counters are maintained.
assertEquals(2 * (numberOfEntries + 5), (TestObject.numInstance + TestObject2.numInstance));
}
};
vm3.invoke(executeQueries);
// Check for TestObject instances on Server2.
// It should be 0
// (The runnable name below says "Create cache server", but it only performs the assertion.)
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(0, TestObject.numInstance);
}
});
this.closeClient(vm2);
this.closeClient(vm3);
this.closeClient(vm1);
this.closeClient(vm0);
}
/**
* Tests client-server and server-local queries against servers started with
* {@code configAndStartBridgeServer(true, ...)} (presumably the partitioned-region configuration -
* confirm against that helper).
* <p>
* vm0 loads {@code numberOfEntries} {@code TestObject} values. A client in vm3 then runs
* {@code queryString[1]} and {@code queryString[2]} through a pool connected to vm1's server port,
* and the test asserts the {@code TestObject.numInstance} counter in each VM. Finally every entry
* of {@code queryString} is executed directly on vm1 and the element types of the results are
* checked.
*/
@Test
public void testQueryOnPR() throws CacheException {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final int numberOfEntries = 100;
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, true);
}
});
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, false);
}
});
vm2.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, false);
}
});
// Load region.
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
for (int i = 0; i < numberOfEntries; i++) {
region.put("key-" + i, new TestObject(i, "vmware"));
}
}
});
// Client pool.
final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
// Create client pool.
final String poolName = "testClientServerQueryPool";
createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);
// Execute client queries
vm3.invoke(new CacheSerializableRunnable("Execute queries") {
@Override
public void run2() throws CacheException {
QueryService remoteQueryService = null;
try {
remoteQueryService = (PoolManager.find(poolName)).getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
// Only queryString[1] and queryString[2] are executed from the client.
for (int i = 1; i < 3; i++) {
try {
logger.info("### Executing Query on server:" + queryString[i]);
Query query = remoteQueryService.newQuery(queryString[i]);
SelectResults rs = (SelectResults) query.execute();
assertEquals(numberOfEntries, rs.size());
} catch (Exception e) {
Assert.fail("Failed executing " + queryString[i], e);
}
}
assertEquals(numberOfEntries, TestObject.numInstance);
}
});
// Check for TestObject instances.
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(numberOfEntries, TestObject.numInstance);
}
});
// Check for TestObject instances.
// It should be 0
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(0, TestObject.numInstance);
}
});
// Check for TestObject instances.
// It should be 0
vm2.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(0, TestObject.numInstance);
}
});
// Execute Query on Server2.
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
QueryService qs = getCache().getQueryService();
Query query = null;
SelectResults sr = null;
for (int i = 0; i < queryString.length; i++) {
try {
query = qs.newQuery(queryString[i]);
sr = (SelectResults) query.execute();
} catch (Exception ex) {
// Keep the original exception as the cause so the failing stack trace is not lost.
Assert.fail("Failed to execute query, " + ex.getMessage(), ex);
}
// The first three queries are expected to yield Integer, TestObject and String elements
// respectively; results of any further queries are not type-checked.
for (Object o : sr.asSet()) {
if (i == 0 && !(o instanceof Integer)) {
fail("Expected type Integer, not found in result set. Found type :" + o.getClass());
} else if (i == 1 && !(o instanceof TestObject)) {
fail(
"Expected type TestObject, not found in result set. Found type :" + o.getClass());
} else if (i == 2 && !(o instanceof String)) {
fail("Expected type String, not found in result set. Found type :" + o.getClass());
}
}
}
// At least one TestObject must have been instantiated in this VM by the queries above.
if (TestObject.numInstance <= 0) {
fail("Expected TestObject instance to be > 0.");
}
}
});
// Check for TestObject instances.
// It should be 0
vm2.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(0, TestObject.numInstance);
}
});
this.closeClient(vm2);
this.closeClient(vm3);
this.closeClient(vm1);
this.closeClient(vm0);
}
/**
* Tests a server-local query after loading PdxInstances into the region.
* <p>
* Four servers are started with {@code configAndStartBridgeServer(true, ...)} (presumably the
* partitioned-region configuration - confirm against that helper). vm0 then puts
* {@code numberOfEntries} PdxInstances built from {@code PortfolioPdxVersion} and, in the same
* runnable, executes {@code queryString[1]} and {@code queryString[2]} through the cache's query
* service, expecting {@code numberOfEntries} results for each.
*/
@Test
public void testLocalPRQuery() throws CacheException {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final int numberOfEntries = 100;
// Start a server in each of the four VMs; only vm0 passes true as the second argument.
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, true);
}
});
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, false);
}
});
vm2.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, false);
}
});
vm3.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, false);
}
});
// Load region using class loader and execute query on the same thread.
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
try {
// Load PdxInstances created from PortfolioPdxVersion objects.
for (int i = 0; i < numberOfEntries; i++) {
PdxInstanceFactory pdxInstanceFactory =
PdxInstanceFactoryImpl.newCreator("PortfolioPdxVersion", false, getCache());
PortfolioPdxVersion portfolioPdxVersion =
new PortfolioPdxVersion(new Integer(i), new Integer(i));
PdxInstance pdxInstance = portfolioPdxVersion.createPdxInstance(pdxInstanceFactory);
region.put("key-" + i, pdxInstance);
}
} catch (Exception ex) {
Assert.fail("Failed to load the class.", ex);
}
QueryService localQueryService = null;
try {
localQueryService = region.getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
// Only queryString[1] and queryString[2] are executed; each must return every entry.
for (int i = 1; i < 3; i++) {
try {
logger.info("### Executing Query on server:" + queryString[i]);
Query query = localQueryService.newQuery(queryString[i]);
SelectResults rs = (SelectResults) query.execute();
assertEquals(numberOfEntries, rs.size());
} catch (Exception e) {
Assert.fail("Failed executing " + queryString[i], e);
}
}
}
});
this.closeClient(vm2);
this.closeClient(vm3);
this.closeClient(vm1);
this.closeClient(vm0);
}
/**
* Tests that a server-local query returns PdxInstances when read-serialized is enabled on the
* cache.
* <p>
* Four servers are started with {@code configAndStartBridgeServer(true, ...)} (presumably the
* partitioned-region configuration - confirm against that helper). vm0 puts
* {@code numberOfEntries} PdxInstances built from {@code PortfolioPdxVersion} and verifies
* {@code queryString[1]} and {@code queryString[2]} locally. vm1 then enables read-serialized via
* {@code setReadSerializedForTest(true)}, runs its own queries, asserts that every result element
* is a {@code PdxInstance}, and resets the flag in a {@code finally} block.
*/
@Test
public void testPdxReadSerializedForPRQuery() throws CacheException {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final int numberOfEntries = 100;
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, true);
}
});
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, false);
}
});
vm2.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, false);
}
});
vm3.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, false);
}
});
// Load region using class loader and execute query on the same thread.
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
try {
// Load PdxInstances created from PortfolioPdxVersion objects.
for (int i = 0; i < numberOfEntries; i++) {
PortfolioPdxVersion portfolioPdxVersion =
new PortfolioPdxVersion(new Integer(i), new Integer(i));
PdxInstanceFactory pdxInstanceFactory =
PdxInstanceFactoryImpl.newCreator("PortfolioPdxVersion", false, getCache());
PdxInstance pdxInstance = portfolioPdxVersion.createPdxInstance(pdxInstanceFactory);
region.put("key-" + i, pdxInstance);
}
} catch (Exception ex) {
// Keep the original exception as the cause, matching the same step in testLocalPRQuery.
Assert.fail("Failed to load the class.", ex);
}
QueryService localQueryService = null;
try {
localQueryService = region.getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
// Only queryString[1] and queryString[2] are executed; each must return every entry.
for (int i = 1; i < 3; i++) {
try {
logger.info("### Executing Query on server:" + queryString[i]);
Query query = localQueryService.newQuery(queryString[i]);
SelectResults rs = (SelectResults) query.execute();
assertEquals(numberOfEntries, rs.size());
} catch (Exception e) {
Assert.fail("Failed executing " + queryString[i], e);
}
}
}
});
final String[] qs = new String[] {"SELECT * FROM " + regName,
"SELECT * FROM " + regName + " WHERE ID > 5", "SELECT p FROM " + regName
+ " p, p.positions.values pos WHERE p.ID > 2 or pos.secId = 'vmware'",};
// Execute query on node without class and with pdxReadSerialized.
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
GemFireCacheImpl c = (GemFireCacheImpl) region.getCache();
try {
// Set read serialized.
c.setReadSerializedForTest(true);
QueryService localQueryService = null;
try {
localQueryService = region.getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
// This should not throw class not found exception.
// NOTE(review): the loop starts at index 1, so qs[0] (the plain "SELECT *") is never
// executed here - confirm whether that is intentional.
for (int i = 1; i < qs.length; i++) {
try {
logger.info("### Executing Query on server:" + qs[i]);
Query query = localQueryService.newQuery(qs[i]);
SelectResults rs = (SelectResults) query.execute();
for (Object o : rs.asSet()) {
if (!(o instanceof PdxInstance)) {
fail("Expected type PdxInstance, not found in result set. Found type :"
+ o.getClass());
}
}
} catch (Exception e) {
Assert.fail("Failed executing " + qs[i], e);
}
}
} finally {
// Always restore the flag so it does not stay enabled on this cache after the check.
c.setReadSerializedForTest(false);
}
}
});
this.closeClient(vm2);
this.closeClient(vm3);
this.closeClient(vm1);
this.closeClient(vm0);
}
/**
* Starts a server in each of two VMs with read-serialized enabled on the cache, puts
* {@code TestObject} values from the first VM, and verifies there that {@code queryString[1]}
* returns one {@code PdxInstance} per put.
*/
@Test
public void testPdxReadSerializedForPRSelectAllQuery() throws CacheException {
final VM serverVM0 = VM.getVM(0);
final VM serverVM1 = VM.getVM(1);
final int numPuts = 10;
// Both servers are set up identically: start the server, then turn on read-serialized.
final SerializableRunnable startServer = new CacheSerializableRunnable("Create Bridge Server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer(true, false);
((GemFireCacheImpl) getCache()).setReadSerializedForTest(true);
}
};
serverVM0.invoke(startServer);
serverVM1.invoke(startServer);
// Populate the region from the first server.
serverVM0.invoke(() -> {
Region testRegion = getRootRegion().getSubregion(regionName);
for (int index = 0; index < numPuts; index++) {
testRegion.put("key-" + index, new TestObject(index, "val-" + index));
}
});
// Query on the same server and check both the row count and the element type.
serverVM0.invoke(() -> {
QueryService queryService = getCache().getQueryService();
SelectResults results = (SelectResults) queryService.newQuery(queryString[1]).execute();
assertThat(results.size()).isEqualTo(numPuts);
for (Object row : results) {
assertThat(row).isInstanceOf(PdxInstance.class);
}
});
}
/**
* Tests index on PdxInstance.
* <p>
* Both servers create the same three functional indexes ({@code idIndex}, {@code tickerIndex},
* {@code tickerIdTickerMapIndex}); server1 additionally loads {@code numberOfEntries}
* {@code TestObject} values first. Two clients then put {@code numberOfEntries * 2} values each
* and compare remote (pool) and local query results for every entry of {@code queryString}. The
* {@code TestObject.numInstance} counter is asserted in the servers before and after the client
* activity: it stays at {@code numberOfEntries} on server1 and at 0 on server2.
*/
@Test
public void testIndex() throws CacheException {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final int numberOfEntries = 10;
// Start server1
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
// Even ids share the ticker "vmware"; odd ids get a unique ticker.
for (int i = 0; i < numberOfEntries; i++) {
if (i % 2 == 0) {
region.put("key-" + i, new TestObject(i, "vmware"));
} else {
region.put("key-" + i, new TestObject(i, "vmware" + i));
}
}
try {
QueryService qs = getCache().getQueryService();
qs.createIndex("idIndex", IndexType.FUNCTIONAL, "id", regName);
qs.createIndex("tickerIndex", IndexType.FUNCTIONAL, "p.ticker", regName + " p");
qs.createIndex("tickerIdTickerMapIndex", IndexType.FUNCTIONAL, "p.ticker",
regName + " p, p.idTickers idTickers");
} catch (Exception ex) {
// Keep the original exception as the cause so the failing stack trace is not lost.
Assert.fail("Unable to create index. " + ex.getMessage(), ex);
}
assertEquals(numberOfEntries, TestObject.numInstance);
}
});
// Start server2
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
try {
QueryService qs = getCache().getQueryService();
qs.createIndex("idIndex", IndexType.FUNCTIONAL, "id", regName);
qs.createIndex("tickerIndex", IndexType.FUNCTIONAL, "p.ticker", regName + " p");
qs.createIndex("tickerIdTickerMapIndex", IndexType.FUNCTIONAL, "p.ticker",
regName + " p, p.idTickers idTickers");
} catch (Exception ex) {
// Keep the original exception as the cause so the failing stack trace is not lost.
Assert.fail("Unable to create index. " + ex.getMessage(), ex);
}
assertEquals(0, TestObject.numInstance);
}
});
// Client pool.
final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
// Create client pool.
final String poolName = "testClientServerQueryPool";
createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);
// Create client region
SerializableRunnable createClientRegions = new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
Region region = createRegion(regionName, rootRegionName, factory.create());
for (int i = 0; i < numberOfEntries * 2; i++) {
if (i % 2 == 0) {
region.put("key-" + i, new TestObject(i, "vmware"));
} else {
region.put("key-" + i, new TestObject(i, "vmware" + i));
}
}
}
};
vm2.invoke(createClientRegions);
vm3.invoke(createClientRegions);
// The client puts must not change the instance counters on either server.
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(numberOfEntries, TestObject.numInstance);
}
});
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(0, TestObject.numInstance);
}
});
// Execute client queries
SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
@Override
public void run2() throws CacheException {
QueryService remoteQueryService = null;
QueryService localQueryService = null;
// rs[0][0] holds the pool (remote) result, rs[0][1] the local result for the current query.
SelectResults[][] rs = new SelectResults[1][2];
try {
remoteQueryService = (PoolManager.find(poolName)).getQueryService();
localQueryService = getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
// The first three queries must return every entry, both remotely and locally.
for (int i = 0; i < 3; i++) {
try {
logger.info("### Executing Query on server:" + queryString[i]);
Query query = remoteQueryService.newQuery(queryString[i]);
rs[0][0] = (SelectResults) query.execute();
assertEquals(numberOfEntries * 2, rs[0][0].size());
logger.info("### Executing Query locally:" + queryString[i]);
query = localQueryService.newQuery(queryString[i]);
rs[0][1] = (SelectResults) query.execute();
assertEquals(numberOfEntries * 2, rs[0][1].size());
// Compare local and remote query results.
if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
fail("Local and Remote Query Results are not matching for query :" + queryString[i]);
}
} catch (Exception e) {
Assert.fail("Failed executing " + queryString[i], e);
}
}
assertEquals(4 * numberOfEntries, TestObject.numInstance);
// The remaining queries are only compared remote-versus-local; their sizes are not asserted.
for (int i = 3; i < queryString.length; i++) {
try {
logger.info("### Executing Query on server:" + queryString[i]);
Query query = remoteQueryService.newQuery(queryString[i]);
rs[0][0] = (SelectResults) query.execute();
logger.info("### Executing Query locally:" + queryString[i]);
query = localQueryService.newQuery(queryString[i]);
rs[0][1] = (SelectResults) query.execute();
// Compare local and remote query results.
if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
fail("Local and Remote Query Results are not matching for query :" + queryString[i]);
}
} catch (Exception e) {
Assert.fail("Failed executing " + queryString[i], e);
}
}
}
};
vm2.invoke(executeQueries);
vm3.invoke(executeQueries);
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(numberOfEntries, TestObject.numInstance);
}
});
// Check for TestObject instances on Server2.
// It should be 0
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(0, TestObject.numInstance);
}
});
this.closeClient(vm2);
this.closeClient(vm3);
this.closeClient(vm1);
this.closeClient(vm0);
}
/**
* Tests client-server query with region iterators.
* <p>
* The queries iterate over {@code .entries}, {@code .entrySet} and {@code .values} of the region.
* A client in vm3 compares remote (pool) and local results for each query, once before and once
* after server2 creates indexes on {@code entry.value.id} and {@code p.ticker}. After each round
* the test asserts that {@code TestObject.numInstance} is still 0 on server2.
*/
@Test
public void testRegionIterators() throws CacheException {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final int numberOfEntries = 10;
final String[] queries = new String[] {
"SELECT entry.value FROM " + this.regName + ".entries entry WHERE entry.value.id > 0",
"SELECT entry.value FROM " + this.regName + ".entries entry WHERE entry.key = 'key-1'",
"SELECT e.value FROM " + this.regName + ".entrySet e where e.value.id >= 0",
"SELECT * FROM " + this.regName + ".values p WHERE p.ticker = 'vmware'",};
// Start server1
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
for (int i = 0; i < numberOfEntries; i++) {
region.put("key-" + i, new TestObject(i, "vmware"));
}
}
});
// Start server2
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
}
});
// Client pool.
final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
// Create client pool.
final String poolName = "testClientServerQueryPool";
createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);
// Create client region
vm3.invoke(new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
Region region = createRegion(regionName, rootRegionName, factory.create());
for (int i = 0; i < numberOfEntries; i++) {
region.put("key-" + i, new TestObject(i, "vmware"));
}
}
});
// Execute client queries
SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
@Override
public void run2() throws CacheException {
QueryService remoteQueryService = null;
QueryService localQueryService = null;
// rs[0][0] holds the pool (remote) result, rs[0][1] the local result for the current query.
SelectResults[][] rs = new SelectResults[1][2];
try {
remoteQueryService = (PoolManager.find(poolName)).getQueryService();
localQueryService = getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
for (int i = 0; i < queries.length; i++) {
try {
logger.info("### Executing Query on server:" + queries[i]);
Query query = remoteQueryService.newQuery(queries[i]);
rs[0][0] = (SelectResults) query.execute();
logger.info("### Executing Query locally:" + queries[i]);
query = localQueryService.newQuery(queries[i]);
rs[0][1] = (SelectResults) query.execute();
// Compare local and remote query results.
if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
fail("Local and Remote Query Results are not matching for query :" + queries[i]);
}
} catch (Exception e) {
Assert.fail("Failed executing " + queries[i], e);
}
}
}
};
// First round: no indexes exist on server2 yet.
vm3.invoke(executeQueries);
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(0, TestObject.numInstance);
}
});
// Create index
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
QueryService qs = getCache().getQueryService();
try {
qs.createIndex("idIndex", IndexType.FUNCTIONAL, "entry.value.id",
regName + ".entries entry");
qs.createIndex("tickerIndex", IndexType.FUNCTIONAL, "p.ticker", regName + ".values p");
} catch (Exception ex) {
// Keep the original exception as the cause so the failing stack trace is not lost.
Assert.fail("Unable to create index. " + ex.getMessage(), ex);
}
}
});
// Second round: same queries, now with the indexes in place on server2.
vm3.invoke(executeQueries);
// Check for TestObject instances.
// It should be 0
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(0, TestObject.numInstance);
}
});
this.closeClient(vm2);
this.closeClient(vm3);
this.closeClient(vm1);
this.closeClient(vm0);
}
/**
* Tests client-server query with nested and collection of Pdx.
*/
@Test
public void testNestedAndCollectionPdx() throws CacheException {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final int numberOfEntries = 50;
final String[] queries = new String[] {
"SELECT * FROM " + this.regName + " pf where pf.position1.secId > '2'",
"SELECT * FROM " + this.regName + " p where p.position3[1].portfolioId = 2",
"SELECT * FROM " + this.regName + " p, p.positions.values AS pos WHERE pos.secId != '1'",
"SELECT key, positions FROM " + this.regName + ".entrySet, value.positions.values "
+ "positions WHERE positions.mktValue >= 25.00",
"SELECT * FROM " + this.regName + " portfolio1, " + this.regName2 + " portfolio2 WHERE "
+ "portfolio1.status = portfolio2.status",
"SELECT portfolio1.ID, portfolio2.status FROM " + this.regName + " portfolio1, "
+ this.regName + " portfolio2 WHERE portfolio1.status = portfolio2.status",
"SELECT * FROM " + this.regName + " portfolio1, portfolio1.positions.values positions1, "
+ this.regName + " portfolio2, portfolio2.positions.values positions2 WHERE "
+ "positions1.secId = positions2.secId ",
"SELECT * FROM " + this.regName + " portfolio, portfolio.positions.values positions WHERE "
+ "portfolio.Pk IN SET ('1', '2') AND positions.secId = '1'",
"SELECT DISTINCT * FROM " + this.regName + " pf1, pf1.collectionHolderMap.values coll1,"
+ " pf1.positions.values posit1, " + this.regName2
+ " pf2, pf2.collectionHolderMap.values "
+ " coll2, pf2.positions.values posit2 WHERE posit1.secId='IBM' AND posit2.secId='IBM'",};
// Start server1
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer();
Region region1 = getRootRegion().getSubregion(regionName);
Region region2 = getRootRegion().getSubregion(regionName2);
for (int i = 0; i < numberOfEntries; i++) {
region1.put("key-" + i, new PortfolioPdx(i, i));
region2.put("key-" + i, new PortfolioPdx(i, i));
}
}
});
// Start server2
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
Region region2 = getRootRegion().getSubregion(regionName2);
}
});
// Client pool.
final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
// Create client pool.
final String poolName = "testClientServerQueryPool";
createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);
// Create client region
vm3.invoke(new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
Region region1 = createRegion(regionName, rootRegionName, factory.create());
Region region2 = createRegion(regionName2, rootRegionName, factory.create());
for (int i = 0; i < numberOfEntries; i++) {
region1.put("key-" + i, new PortfolioPdx(i, i));
region2.put("key-" + i, new PortfolioPdx(i, i));
}
}
});
// Execute client queries
SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
@Override
public void run2() throws CacheException {
QueryService remoteQueryService = null;
QueryService localQueryService = null;
SelectResults[][] rs = new SelectResults[1][2];
try {
remoteQueryService = (PoolManager.find(poolName)).getQueryService();
localQueryService = getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
for (int i = 0; i < queries.length; i++) {
try {
logger.info("### Executing Query on server:" + queries[i]);
Query query = remoteQueryService.newQuery(queries[i]);
rs[0][0] = (SelectResults) query.execute();
logger.info("### Executing Query locally:" + queries[i]);
query = localQueryService.newQuery(queries[i]);
rs[0][1] = (SelectResults) query.execute();
// Compare local and remote query results.
if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
fail("Local and Remote Query Results are not matching for query :" + queries[i]);
}
} catch (Exception e) {
Assert.fail("Failed executing " + queries[i], e);
}
}
}
};
vm3.invoke(executeQueries);
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(0, PortfolioPdx.numInstance);
assertEquals(0, PositionPdx.numInstance);
}
});
// Create index
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
QueryService qs = getCache().getQueryService();
try {
qs.createIndex("pkIndex", IndexType.FUNCTIONAL, "portfolio.Pk", regName + " portfolio");
qs.createIndex("secIdIndex", IndexType.FUNCTIONAL, "pos.secId",
regName + " p, p.positions.values AS pos");
qs.createIndex("tickerIndex", IndexType.FUNCTIONAL, "pf.position1.secId",
regName + " pf");
qs.createIndex("secIdIndexPf1", IndexType.FUNCTIONAL, "pos11.secId",
regName + " pf1, pf1.collectionHolderMap.values coll1, pf1.positions.values pos11");
qs.createIndex("secIdIndexPf2", IndexType.FUNCTIONAL, "pos22.secId",
regName2 + " pf2, pf2.collectionHolderMap.values coll2, pf2.positions.values pos22");
} catch (Exception ex) {
fail("Unable to create index. " + ex.getMessage());
}
}
});
vm3.invoke(executeQueries);
// The index is created on portfolio.Pk, a field that does not exist in the
// PortfolioPdx object, but there is a getPk() method; so, per #44436, the
// objects are deserialized in vm1 to obtain the value.
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
assertEquals(numberOfEntries, PortfolioPdx.numInstance);
assertEquals(325, PositionPdx.numInstance); // 50 PorforlioPdx objects
// create (50*3)+50+50+50+25
// = 325 PositionPdx objects
// when deserialized
}
});
this.closeClient(vm2);
this.closeClient(vm3);
this.closeClient(vm1);
this.closeClient(vm0);
}
/**
 * Tests client-server queries against nested PDX objects and collections of PDX objects, with
 * three servers started through {@code configAndStartBridgeServer(false, true)} (presumably
 * partitioned regions, going by the test name).
 *
 * <p>
 * The client in vm3 loads {@code PortfolioPdx} values into both regions and runs every query
 * remotely and locally, comparing the two result sets. The queries are run once before and once
 * after indexes are created on vm1. The test also checks the {@code numInstance} counters of
 * {@code PortfolioPdx} and {@code PositionPdx} on the servers: they must stay at zero, except on
 * vm1 after index creation, where evaluating the indexed {@code portfolio.Pk} expression
 * deserializes the values (see #44436).
 */
@Test
public void testNestedAndCollectionPdxWithPR() throws CacheException {
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  VM vm3 = host.getVM(3);
  final int numberOfEntries = 50;
  final String[] queries = new String[] {
      "SELECT * FROM " + this.regName + " pf where pf.position1.secId > '2'",
      "SELECT * FROM " + this.regName + " p where p.position3[1].portfolioId = 2",
      "SELECT * FROM " + this.regName + " p, p.positions.values AS pos WHERE pos.secId != '1'",
      "SELECT key, positions FROM " + this.regName + ".entrySet, value.positions.values "
          + "positions WHERE positions.mktValue >= 25.00",
      "SELECT * FROM " + this.regName + " portfolio1, " + this.regName2 + " portfolio2 WHERE "
          + "portfolio1.status = portfolio2.status",
      "SELECT portfolio1.ID, portfolio2.status FROM " + this.regName + " portfolio1, "
          + this.regName + " portfolio2 WHERE portfolio1.status = portfolio2.status",
      "SELECT * FROM " + this.regName + " portfolio1, portfolio1.positions.values positions1, "
          + this.regName + " portfolio2, portfolio2.positions.values positions2 WHERE "
          + "positions1.secId = positions2.secId ",
      "SELECT * FROM " + this.regName + " portfolio, portfolio.positions.values positions WHERE "
          + "portfolio.Pk IN SET ('1', '2') AND positions.secId = '1'",
      "SELECT DISTINCT * FROM " + this.regName + " pf1, pf1.collectionHolderMap.values coll1,"
          + " pf1.positions.values posit1, " + this.regName2
          + " pf2, pf2.collectionHolderMap.values "
          + " coll2, pf2.positions.values posit2 WHERE posit1.secId='IBM' AND posit2.secId='IBM'",};

  // Start server1, server2 and server3; all three are configured identically.
  SerializableRunnable startServer = new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer(false, true);
    }
  };
  vm0.invoke(startServer);
  vm1.invoke(startServer);
  vm2.invoke(startServer);

  // Client pool.
  final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  // Create client pool.
  final String poolName = "testClientServerQueryPool";
  createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
  createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

  // Create the client regions and load the same PortfolioPdx values into both of them.
  vm3.invoke(new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
      Region region1 = createRegion(regionName, rootRegionName, factory.create());
      Region region2 = createRegion(regionName2, rootRegionName, factory.create());
      for (int i = 0; i < numberOfEntries; i++) {
        region1.put("key-" + i, new PortfolioPdx(i, i));
        region2.put("key-" + i, new PortfolioPdx(i, i));
      }
    }
  });

  // Runs every query on the server and locally on the client, and compares the two results.
  SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
    @Override
    public void run2() throws CacheException {
      QueryService remoteQueryService = null;
      QueryService localQueryService = null;
      SelectResults[][] rs = new SelectResults[1][2];
      try {
        remoteQueryService = (PoolManager.find(poolName)).getQueryService();
        localQueryService = getCache().getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      for (String queryString : queries) {
        try {
          logger.info("### Executing Query on server:" + queryString);
          Query query = remoteQueryService.newQuery(queryString);
          rs[0][0] = (SelectResults) query.execute();
          logger.info("### Executing Query locally:" + queryString);
          query = localQueryService.newQuery(queryString);
          rs[0][1] = (SelectResults) query.execute();
          // Compare local and remote query results.
          if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
            fail("Local and Remote Query Results are not matching for query :" + queryString);
          }
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryString, e);
        }
      }
    }
  };

  // Asserts that no PortfolioPdx or PositionPdx instance has been created in the invoked VM.
  SerializableRunnable verifyNoPdxInstances =
      new CacheSerializableRunnable("Verify no PDX instances") {
        @Override
        public void run2() throws CacheException {
          assertEquals(0, PortfolioPdx.numInstance);
          assertEquals(0, PositionPdx.numInstance);
        }
      };

  // Without indexes, the queries must not create any domain object instance on server2.
  vm3.invoke(executeQueries);
  vm1.invoke(verifyNoPdxInstances);

  // Create the indexes on server2.
  vm1.invoke(new CacheSerializableRunnable("Create indexes") {
    @Override
    public void run2() throws CacheException {
      QueryService qs = getCache().getQueryService();
      try {
        qs.createIndex("pkIndex", IndexType.FUNCTIONAL, "portfolio.Pk", regName + " portfolio");
        qs.createIndex("idIndex", IndexType.FUNCTIONAL, "pos.secId",
            regName + " p, p.positions.values AS pos");
        qs.createIndex("tickerIndex", IndexType.FUNCTIONAL, "pf.position1.secId",
            regName + " pf");
        qs.createIndex("secIdIndexPf1", IndexType.FUNCTIONAL, "pos11.secId",
            regName + " pf1, pf1.collectionHolderMap.values coll1, pf1.positions.values pos11");
        qs.createIndex("secIdIndexPf2", IndexType.FUNCTIONAL, "pos22.secId",
            regName2 + " pf2, pf2.collectionHolderMap.values coll2, pf2.positions.values pos22");
      } catch (Exception ex) {
        // Keep the original exception as the cause so the failure can be diagnosed.
        Assert.fail("Unable to create index. ", ex);
      }
    }
  });
  vm3.invoke(executeQueries);

  // server1 created no index, so it must not have created any domain object instance.
  vm0.invoke(verifyNoPdxInstances);

  // An index is created on portfolio.Pk, a field that does not exist in the PortfolioPdx
  // object, but there is a getPk() method; so, per #44436, the objects are deserialized in vm1
  // to obtain the value.
  vm1.invoke(new CacheSerializableRunnable("Verify PDX instances created for index") {
    @Override
    public void run2() throws CacheException {
      assertEquals(numberOfEntries, PortfolioPdx.numInstance);
      // 50 PortfolioPdx objects create (50*3)+50+50+50+25 = 325 PositionPdx
      // objects when deserialized
      assertEquals(325, PositionPdx.numInstance);
    }
  });

  // server3 created no index either.
  vm2.invoke(verifyNoPdxInstances);

  this.closeClient(vm2);
  this.closeClient(vm3);
  this.closeClient(vm1);
  this.closeClient(vm0);
}
/**
 * Tests identity of Pdx objects in a DISTINCT query.
 *
 * <p>
 * The client in vm3 puts {@code numberOfEntries * 2} entries in which every {@code PortfolioPdx}
 * argument value is used for two consecutive keys, so the region holds duplicate values. The
 * DISTINCT query is then executed on the server and locally on the client; both executions must
 * return 7 results and the two result sets must match. Finally the test asserts that
 * {@code PortfolioPdx.numInstance} is still 0 on server2.
 */
@Test
public void testPdxIdentity() throws CacheException {
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  VM vm3 = host.getVM(3);
  final int numberOfEntries = 10;
  final String queryStr =
      "SELECT DISTINCT * FROM " + this.regName + " pf where pf.ID > 2 and pf.ID < 10";

  // Start server1 and server2; both are configured identically.
  SerializableRunnable startServer = new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
    }
  };
  vm0.invoke(startServer);
  vm1.invoke(startServer);

  // Client pool.
  final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  // Create client pool.
  final String poolName = "testClientServerQueryPool";
  createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
  createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

  final int dupIndex = 2;

  // Create client region
  vm3.invoke(new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
      Region region = createRegion(regionName, rootRegionName, factory.create());
      int j = 0;
      for (int i = 0; i < numberOfEntries * 2; i++) {
        // insert duplicate values: j only advances on every dupIndex-th key.
        if (i % dupIndex == 0) {
          j++;
        }
        region.put("key-" + i, new PortfolioPdx(j));
      }
    }
  });

  // Execute client queries
  SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
    @Override
    public void run2() throws CacheException {
      QueryService remoteQueryService = null;
      QueryService localQueryService = null;
      SelectResults[][] rs = new SelectResults[1][2];
      try {
        remoteQueryService = (PoolManager.find(poolName)).getQueryService();
        localQueryService = getCache().getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      int expectedResultSize = 7;
      try {
        logger.info("### Executing Query on server:" + queryStr);
        Query query = remoteQueryService.newQuery(queryStr);
        rs[0][0] = (SelectResults) query.execute();
        assertEquals(expectedResultSize, rs[0][0].size());
        logger.info("### Executing Query locally:" + queryStr);
        query = localQueryService.newQuery(queryStr);
        rs[0][1] = (SelectResults) query.execute();
        assertEquals(expectedResultSize, rs[0][1].size());
        logger.info("### Remote Query rs size: " + (rs[0][0]).size() + " Local Query rs size: "
            + (rs[0][1]).size());
        // Compare local and remote query results.
        if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
          fail("Local and Remote Query Results are not matching for query :" + queryStr);
        }
      } catch (Exception e) {
        Assert.fail("Failed executing " + queryStr, e);
      }
    }
  };
  vm3.invoke(executeQueries);

  // Check for PortfolioPdx instances on Server2.
  // It should be 0
  vm1.invoke(new CacheSerializableRunnable("Verify no PortfolioPdx instances") {
    @Override
    public void run2() throws CacheException {
      assertEquals(0, PortfolioPdx.numInstance);
    }
  });

  this.closeClient(vm2);
  this.closeClient(vm3);
  this.closeClient(vm1);
  this.closeClient(vm0);
}
/**
 * Tests queries that invoke methods (function calls) on the queried objects.
 *
 * <p>
 * The client in vm3 loads {@code TestObject} values and runs each query through
 * {@code executeClientQueries}. After every query the {@code numInstance} counters of
 * {@code TestObject}, {@code PositionPdx} and {@code TestObject2} on server2 are compared with
 * the per-query expectations and then reset to zero.
 */
@Test
public void testFunctionCalls() throws CacheException {
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  VM vm3 = host.getVM(3);

  final int numberOfEntries = 10;
  final int numPositionsPerTestObject = 2;
  final int dupIndex = 2;

  // Every query starts with the same projection and first iterator.
  final String selectPf = "SELECT * FROM " + this.regName + " pf";
  final String[] queryStr = new String[] {
      // Query 0
      selectPf + " where pf.getIdValue() > 0",
      // Query 1
      selectPf + " where pf.test.getId() > 0",
      // Query 2
      selectPf + ", pf.positions.values pos where pos.getSecId() != 'VMWARE'",
      // Query 3
      selectPf + ", pf.positions.values pos where pf.getIdValue() > 0 and "
          + "pos.getSecId() != 'VMWARE'",
      // Query 4
      selectPf + ", pf.getPositions('test').values pos where pos.getSecId() != 'VMWARE'",
      // Query 5
      selectPf + ", pf.getPositions('test').values pos where pf.id > 0 and "
          + "pos.getSecId() != 'IBM'",
      // Query 6
      selectPf + ", pf.getPositions('test').values pos where pf.getIdValue() > 0 and "
          + "pos.secId != 'IBM'",};

  // Expected instance counts on server2, indexed by query number.
  final int all = numberOfEntries;
  final int positions = numberOfEntries * numPositionsPerTestObject;
  final int[] expectedTestObjects = new int[] {all, 0, 0, all, all, all, all};
  final int[] expectedTestObject2s = new int[] {all, all, 0, all, all, all, all};
  final int[] expectedPositions =
      new int[] {positions, 0, positions, positions * 2, positions, positions, positions};

  // Start server1 and server2 with the same runnable.
  CacheSerializableRunnable startServer = new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      Region serverRegion = getRootRegion().getSubregion(regionName);
    }
  };
  vm0.invoke(startServer);
  vm1.invoke(startServer);

  // Client pools: vm2 points at server1, vm3 at server2.
  final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
  final String poolName = "testClientServerQueryPool";
  createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
  createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

  // Create the client region and load it.
  vm3.invoke(new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory attributes = new AttributesFactory();
      attributes.setScope(Scope.LOCAL);
      ClientServerTestCase.configureConnectionPool(attributes, host0, port1, -1, true, -1, -1,
          null);
      Region clientRegion = createRegion(regionName, rootRegionName, attributes.create());
      for (int key = 0; key < numberOfEntries; key++) {
        // Duplicate values: each id is shared by dupIndex consecutive keys, starting at 1.
        final int id = (key / dupIndex) + 1;
        clientRegion.put("key-" + key, new TestObject(id, "vmware", numPositionsPerTestObject));
      }
    }
  });

  for (int q = 0; q < queryStr.length; q++) {
    final int wantedTestObjects = expectedTestObjects[q];
    final int wantedPositions = expectedPositions[q];
    final int wantedTestObject2s = expectedTestObject2s[q];

    executeClientQueries(vm3, poolName, queryStr[q]);

    // Check the instance counters on server2, then reset them for the next query.
    vm1.invoke(new CacheSerializableRunnable("validate") {
      @Override
      public void run2() throws CacheException {
        assertEquals(wantedTestObjects, TestObject.numInstance);
        assertEquals(wantedPositions, PositionPdx.numInstance);
        assertEquals(wantedTestObject2s, TestObject2.numInstance);
        TestObject.numInstance = 0;
        PositionPdx.numInstance = 0;
        TestObject2.numInstance = 0;
      }
    });
  }

  this.closeClient(vm2);
  this.closeClient(vm3);
  this.closeClient(vm1);
  this.closeClient(vm0);
}
/**
 * Starts three cache servers that each host the partitioned region {@code testRegion} (zero
 * redundant copies); the first server also creates an index on {@code intId} while the region is
 * still empty. A client then puts 285 {@code PortfolioPdx} values through a PROXY region and runs
 * a query. This scenario was failing randomly in a POC.
 *
 * <p>
 * The test only requires the puts and the query to complete without an exception; the query
 * results are printed, not asserted.
 */
@Test
public void testPutAllWithIndexes() {
  final String name = "testRegion";
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  VM vm3 = host.getVM(3);
  final Properties config = new Properties();
  config.setProperty("locators",
      "localhost[" + DistributedTestUtils.getDUnitLocatorPort() + "]");

  // Start server1 and create the index while the region is still empty.
  vm0.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      Cache cache = new CacheFactory(config).create();
      AttributesFactory factory = new AttributesFactory();
      PartitionAttributesFactory prfactory = new PartitionAttributesFactory();
      prfactory.setRedundantCopies(0);
      factory.setPartitionAttributes(prfactory.create());
      cache.createRegionFactory(factory.create()).create(name);
      try {
        startCacheServer(0, false);
      } catch (Exception ex) {
        Assert.fail("While starting CacheServer", ex);
      }
      // Create Index on empty region
      try {
        cache.getQueryService().createIndex("myFuncIndex", "intId", "/" + name);
      } catch (Exception e) {
        Assert.fail("index creation failed", e);
      }
    }
  });

  // Start server2 and server3; they host the same region but create no index themselves.
  CacheSerializableRunnable startServer = new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      Cache cache = new CacheFactory(config).create();
      AttributesFactory factory = new AttributesFactory();
      PartitionAttributesFactory prfactory = new PartitionAttributesFactory();
      prfactory.setRedundantCopies(0);
      factory.setPartitionAttributes(prfactory.create());
      cache.createRegionFactory(factory.create()).create(name);
      try {
        startCacheServer(0, false);
      } catch (Exception ex) {
        Assert.fail("While starting CacheServer", ex);
      }
    }
  };
  vm1.invoke(startServer);
  vm2.invoke(startServer);

  // Create client region. The pool connects to server1, so take host and port from vm0.
  final int port = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
  vm3.invoke(new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      Properties clientConfig = new Properties();
      clientConfig.setProperty("mcast-port", "0");
      ClientCache cache = new ClientCacheFactory(clientConfig).addPoolServer(host0, port)
          .setPoolPRSingleHopEnabled(true).setPoolSubscriptionEnabled(true).create();
      cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(name);
    }
  });

  // Put the values and query them from the client.
  vm3.invoke(new CacheSerializableRunnable("putAll() test") {
    @Override
    public void run2() throws CacheException {
      try {
        // NOTE(review): presumably returns the client cache created by the previous step.
        ClientCache cache = new ClientCacheFactory().create();
        Region region = cache.getRegion(name);
        QueryService queryService = cache.getQueryService();
        for (int x = 0; x < 285; x++) {
          region.put(String.valueOf(x), new PortfolioPdx(x, x));
        }
        Query q = queryService.newQuery("SELECT DISTINCT * from /" + name + " WHERE ID = 2");
        SelectResults qResult = (SelectResults) q.execute();
        for (Object o : qResult.asList()) {
          System.out.println("o = " + o);
        }
      } catch (Exception e) {
        Assert.fail("Querying failed: ", e);
      }
    }
  });

  Invoke.invokeInEveryVM(DistributedTestCase.class, "disconnectFromDS");
}
/**
 * In PeerTypeRegistration, when a PdxType is updated, a local map of class => PdxTypes is
 * populated. This map is used to search a field for a class in different versions (PdxTypes).
 * This test verifies that the map is being updated by the cacheListener.
 *
 * <p>
 * Two clients, each connected to a different server, put {@code PdxVersionedNewPortfolio}
 * instances with different field sets ({@code id}/{@code pdxStatus} versus
 * {@code id}/{@code status}). Both servers must then report exactly two PdxTypes, both for the
 * class {@code PdxVersionedNewPortfolio}.
 */
@Test
public void testLocalMapInPeerTypePdxRegistry() throws CacheException {
  final Host host = Host.getHost(0);
  final VM vm0 = host.getVM(0);
  final VM vm1 = host.getVM(1);
  final VM vm2 = host.getVM(2);
  final VM vm3 = host.getVM(3);
  final int numberOfEntries = 10;

  // Creates the replicated region, starts a cache server on a random port and returns the port.
  final SerializableCallable startServer = new SerializableCallable("Create Server") {
    @Override
    public Object call() throws Exception {
      getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
      CacheServer server = getCache().addCacheServer();
      int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
      server.setPort(port);
      server.start();
      return port;
    }
  };

  // Start server1
  final int port1 = (Integer) vm0.invoke(startServer);
  // Start server2
  final int port2 = (Integer) vm1.invoke(startServer);

  // client1 loads version 1 objects on server1
  vm2.invoke(new SerializableCallable("Create client") {
    @Override
    public Object call() throws Exception {
      ClientCacheFactory cf = new ClientCacheFactory();
      cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
      ClientCache cache = getClientCache(cf);
      Region region =
          cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
      // Load version 1 objects (fields: id, pdxStatus)
      for (int i = 0; i < numberOfEntries; i++) {
        PdxInstanceFactory pdxInstanceFactory =
            PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false, getCache());
        pdxInstanceFactory.writeInt("id", i);
        pdxInstanceFactory.writeString("pdxStatus", (i % 2 == 0 ? "active" : "inactive"));
        PdxInstance pdxInstance = pdxInstanceFactory.create();
        region.put("key-" + i, pdxInstance);
      }
      return null;
    }
  });

  // client 2 loads version 2 objects on server2
  vm3.invoke(new SerializableCallable("Create client") {
    @Override
    public Object call() throws Exception {
      ClientCacheFactory cf = new ClientCacheFactory();
      cf.addPoolServer(NetworkUtils.getServerHostName(vm1.getHost()), port2);
      ClientCache cache = getClientCache(cf);
      Region region =
          cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
      // Load version 2 objects (fields: id, status)
      for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
        PdxInstanceFactory pdxInstanceFactory =
            PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false, getCache());
        pdxInstanceFactory.writeInt("id", i);
        pdxInstanceFactory.writeString("status", (i % 2 == 0 ? "active" : "inactive"));
        PdxInstance pdxInstance = pdxInstanceFactory.create();
        region.put("key-" + i, pdxInstance);
      }
      return null;
    }
  });

  // Both versions must be registered, under the same class name, on each server.
  final SerializableRunnableIF verifyTwoPdxTypesArePresent = () -> {
    TypeRegistration registration = getCache().getPdxRegistry().getTypeRegistration();
    assertTrue(registration instanceof PeerTypeRegistration);
    final Map<Integer, PdxType> types = registration.types();
    assertEquals(2, types.size());
    for (PdxType type : types.values()) {
      assertEquals("PdxVersionedNewPortfolio", type.getClassName());
    }
  };
  vm0.invoke(verifyTwoPdxTypesArePresent);
  vm1.invoke(verifyTwoPdxTypesArePresent);
}
/**
 * Test to query a field that is not present in the Pdx object but has a get method.
 *
 * <p>
 * Three servers host a partitioned region. A client puts {@code TestObject} values and runs two
 * queries that reference {@code status}, each of which must return 5 results. The same queries
 * are repeated after an index on {@code status} is created on server1.
 */
@Test
public void testPdxInstanceWithMethodButNoField() throws CacheException {
  final Host host = Host.getHost(0);
  final VM vm0 = host.getVM(0);
  final VM vm1 = host.getVM(1);
  final VM vm2 = host.getVM(2);
  final VM vm3 = host.getVM(3);
  final int numberOfEntries = 10;
  final String name = "/" + regionName;
  final String[] qs = {"select * from " + name + " where status = 'active'",
      "select status from " + name + " where id >= 5"};

  // Creates the partitioned region, starts a cache server on a random port and returns the port.
  final SerializableCallable startServer = new SerializableCallable("Create Server") {
    @Override
    public Object call() throws Exception {
      getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);
      CacheServer server = getCache().addCacheServer();
      int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
      server.setPort(port);
      server.start();
      return port;
    }
  };

  // Start server1
  final int port1 = (Integer) vm0.invoke(startServer);
  // Start server2
  final int port2 = (Integer) vm1.invoke(startServer);
  // Start server3
  final int port3 = (Integer) vm2.invoke(startServer);

  // create client
  vm3.invoke(new SerializableCallable("Create client") {
    @Override
    public Object call() throws Exception {
      ClientCacheFactory cf = new ClientCacheFactory();
      cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
      cf.addPoolServer(NetworkUtils.getServerHostName(vm1.getHost()), port2);
      cf.addPoolServer(NetworkUtils.getServerHostName(vm2.getHost()), port3);
      ClientCache cache = getClientCache(cf);
      Region region =
          cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
      for (int i = 0; i < numberOfEntries; i++) {
        region.put("key-" + i, new TestObject(i, "vmware"));
      }
      return null;
    }
  });

  // Runs every query from the client and expects 5 results for each of them.
  final SerializableCallable executeQueries = new SerializableCallable("Execute queries") {
    @Override
    public Object call() throws Exception {
      QueryService remoteQueryService = null;
      // Execute query remotely
      try {
        remoteQueryService = getCache().getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      for (String queryString : qs) {
        try {
          SelectResults sr = (SelectResults) remoteQueryService.newQuery(queryString).execute();
          assertEquals(5, sr.size());
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryString, e);
        }
      }
      return null;
    }
  };

  // query without an index
  vm3.invoke(executeQueries);

  // create index
  vm0.invoke(new SerializableCallable("Create index") {
    @Override
    public Object call() throws Exception {
      try {
        QueryService queryService = getCache().getQueryService();
        queryService.createIndex("status", "status", name);
      } catch (Exception e) {
        Assert.fail("Failed to create index on status ", e);
      }
      return null;
    }
  });

  // query again, now with the index in place
  vm3.invoke(executeQueries);

  Invoke.invokeInEveryVM(DistributedTestCase.class, "disconnectFromDS");
}
/**
* Test to query a field that is not present in the Pdx object but is present in some other
* version of the pdx instance
*
*/
@Test
public void testPdxInstanceFieldInOtherVersion() throws CacheException {
final Host host = Host.getHost(0);
final VM vm0 = host.getVM(0);
final VM vm1 = host.getVM(1);
final VM vm2 = host.getVM(2);
final VM vm3 = host.getVM(3);
final int numberOfEntries = 10;
final String name = "/" + regionName;
final String[] qs = {"select pdxStatus from " + name + " where pdxStatus = 'active'",
"select pdxStatus from " + name + " where id > 8 and id < 14"};
// Start server1
final int port1 = (Integer) vm0.invoke(new SerializableCallable("Create Server1") {
@Override
public Object call() throws Exception {
Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
CacheServer server = getCache().addCacheServer();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
server.setPort(port);
server.start();
return port;
}
});
// Start server2
final int port2 = (Integer) vm1.invoke(new SerializableCallable("Create Server2") {
@Override
public Object call() throws Exception {
Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
CacheServer server = getCache().addCacheServer();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
server.setPort(port);
server.start();
return port;
}
});
// client1 loads version 1 objects on server1
vm2.invoke(new SerializableCallable("Create client") {
@Override
public Object call() throws Exception {
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
ClientCache cache = getClientCache(cf);
Region region =
cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
// Load version 1 objects
for (int i = 0; i < numberOfEntries; i++) {
PdxInstanceFactory pdxInstanceFactory =
PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false, getCache());
pdxInstanceFactory.writeInt("id", i);
pdxInstanceFactory.writeString("pdxStatus", (i % 2 == 0 ? "active" : "inactive"));
PdxInstance pdxInstance = pdxInstanceFactory.create();
region.put("key-" + i, pdxInstance);
}
return null;
}
});
// client 2 loads version 2 objects on server2
vm3.invoke(new SerializableCallable("Create client") {
@Override
public Object call() throws Exception {
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer(NetworkUtils.getServerHostName(vm1.getHost()), port2);
ClientCache cache = getClientCache(cf);
Region region =
cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
// Load version 2 objects
for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
PdxInstanceFactory pdxInstanceFactory =
PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false, getCache());
pdxInstanceFactory.writeInt("id", i);
pdxInstanceFactory.writeString("status", (i % 2 == 0 ? "active" : "inactive"));
PdxInstance pdxInstance = pdxInstanceFactory.create();
region.put("key-" + i, pdxInstance);
}
return null;
}
});
// query remotely from client 1 with version 1 in classpath
vm2.invoke(new SerializableCallable("Create client") {
@Override
public Object call() throws Exception {
QueryService remoteQueryService = null;
// Execute query remotely
try {
remoteQueryService = getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
for (int i = 0; i < qs.length; i++) {
try {
SelectResults sr = (SelectResults) remoteQueryService.newQuery(qs[i]).execute();
assertEquals(5, sr.size());
if (i == 1) {
for (Object o : sr) {
if (o == null) {
} else if (o instanceof String) {
} else {
fail("Result should be either null or String and not " + o.getClass());
}
}
}
} catch (Exception e) {
Assert.fail("Failed executing " + qs[i], e);
}
}
return null;
}
});
// query remotely from client 2 with version 2 in classpath
vm3.invoke(new SerializableCallable("Create client") {
@Override
public Object call() throws Exception {
QueryService remoteQueryService = null;
// Execute query remotely
try {
remoteQueryService = getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
for (int i = 0; i < qs.length; i++) {
try {
SelectResults sr = (SelectResults) remoteQueryService.newQuery(qs[i]).execute();
assertEquals(5, sr.size());
if (i == 1) {
for (Object o : sr) {
if (o == null) {
} else if (o instanceof String) {
} else {
fail("Result should be either null or String and not " + o.getClass());
}
}
}
} catch (Exception e) {
Assert.fail("Failed executing " + qs[i], e);
}
}
return null;
}
});
// query locally on server
vm0.invoke(new SerializableCallable("Create client") {
@Override
public Object call() throws Exception {
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
cache.setReadSerializedForTest(true);
QueryService queryService = null;
try {
queryService = getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
for (int i = 0; i < qs.length; i++) {
try {
SelectResults sr = (SelectResults) queryService.newQuery(qs[i]).execute();
assertEquals(5, sr.size());
if (i == 1) {
for (Object o : sr) {
if (o == null) {
} else if (o instanceof String) {
} else {
fail("Result should be either null or String and not " + o.getClass());
}
}
}
} catch (Exception e) {
Assert.fail("Failed executing " + qs[i], e);
}
}
return null;
}
});
// create index
vm0.invoke(new SerializableCallable("Query") {
@Override
public Object call() throws Exception {
QueryService qs = null;
try {
qs = getCache().getQueryService();
qs.createIndex("status", "status", name);
} catch (Exception e) {
Assert.fail("Exception getting query service ", e);
}
return null;
}
});
// query from client 1 with version 1 in classpath
vm2.invoke(new SerializableCallable("Create client") {
@Override
public Object call() throws Exception {
QueryService remoteQueryService = null;
// Execute query remotely
try {
remoteQueryService = getCache().getQueryService();
} catch (Exception e) {
Assert.fail("Failed to get QueryService.", e);
}
for (int i = 0; i < qs.length; i++) {
try {
SelectResults sr = (SelectResults) remoteQueryService.newQuery(qs[i]).execute();
assertEquals(5, sr.size());
} catch (Exception e) {
Assert.fail("Failed executing " + qs[i], e);
}
}
return null;
}
});
Invoke.invokeInEveryVM(DistributedTestCase.class, "disconnectFromDS");
}
/**
 * Two servers hosting the same REPLICATE region, and two clients.
 *
 * <p>
 * Client 2 (vm3) connects to server 1 and puts twenty "PdxPortfolio" instances of two different
 * shapes: keys 0-9 carry a {@code pdxStatus} field, keys 10-19 carry a {@code status} field.
 * Client 1 (vm2) connects to server 2 with subscriptions enabled and registers interest in all
 * keys on a CACHING_PROXY region.
 *
 * <p>
 * The test then runs both queries through client 1's <em>local</em> query service and finally
 * checks that client 1's PDX type registration holds exactly two types, both named
 * "PdxPortfolio".
 */
@Test
public void testClientForFieldInOtherVersion() throws CacheException {
  final Host host = Host.getHost(0);
  final VM vm0 = host.getVM(0);
  final VM vm1 = host.getVM(1);
  final VM vm2 = host.getVM(2);
  final VM vm3 = host.getVM(3);
  final int numberOfEntries = 10;
  final String name = "/" + regionName;
  // qs[0] filters on pdxStatus, which only the first ten entries carry.
  // qs[1] projects status for ids 9..13, a range that spans both object shapes.
  final String[] qs = {"select pdxStatus from " + name + " where pdxStatus = 'active'",
      "select status from " + name + " where id > 8 and id < 14"};

  // Start server1
  final int port1 = (Integer) vm0.invoke(new SerializableCallable("Create Server1") {
    @Override
    public Object call() throws Exception {
      getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
      CacheServer server = getCache().addCacheServer();
      int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
      server.setPort(port);
      server.start();
      return port;
    }
  });

  // Start server2
  final int port2 = (Integer) vm1.invoke(new SerializableCallable("Create Server2") {
    @Override
    public Object call() throws Exception {
      getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
      CacheServer server = getCache().addCacheServer();
      int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
      server.setPort(port);
      server.start();
      return port;
    }
  });

  // client 1 registers interest for all keys through server2
  vm2.invoke(new SerializableCallable("Create client") {
    @Override
    public Object call() throws Exception {
      ClientCacheFactory cf = new ClientCacheFactory();
      // registerInterest below requires a subscription-enabled pool
      cf.setPoolSubscriptionEnabled(true);
      cf.addPoolServer(NetworkUtils.getServerHostName(vm1.getHost()), port2);
      ClientCache cache = getClientCache(cf);
      Region region =
          cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
      region.registerInterest("ALL_KEYS");
      return null;
    }
  });

  // client2 loads both object shapes through server1
  vm3.invoke(new SerializableCallable("Create client") {
    @Override
    public Object call() throws Exception {
      ClientCacheFactory cf = new ClientCacheFactory();
      cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
      ClientCache cache = getClientCache(cf);
      Region region =
          cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
      // Load version 1 objects: fields pdxStatus + id
      for (int i = 0; i < numberOfEntries; i++) {
        PdxInstanceFactory pdxFactory = cache.createPdxInstanceFactory("PdxPortfolio");
        pdxFactory.writeString("pdxStatus", (i % 2 == 0 ? "active" : "inactive"));
        pdxFactory.writeInt("id", i);
        PdxInstance pdxInstance = pdxFactory.create();
        region.put("key-" + i, pdxInstance);
      }
      // Load version 2 objects: fields status + id
      for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
        PdxInstanceFactory pdxFactory = cache.createPdxInstanceFactory("PdxPortfolio");
        pdxFactory.writeString("status", i % 2 == 0 ? "active" : "inactive");
        pdxFactory.writeInt("id", i);
        PdxInstance pdxInstance = pdxFactory.create();
        region.put("key-" + i, pdxInstance);
      }
      return null;
    }
  });

  // query locally on client 1 which has registered interest
  vm2.invoke(new SerializableCallable("Create client") {
    @Override
    public Object call() throws Exception {
      GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
      cache.setReadSerializedForTest(true);
      QueryService localQueryService = null;
      // Obtain the client-local query service; nothing is sent to the servers here
      try {
        localQueryService = ((ClientCache) getCache()).getLocalQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      for (int i = 0; i < qs.length; i++) {
        try {
          SelectResults sr = (SelectResults) localQueryService.newQuery(qs[i]).execute();
          assertEquals(5, sr.size());
          if (i == 1) {
            // every projected status value must be either null or a String
            for (Object o : sr) {
              if (o != null && !(o instanceof String)) {
                fail("Result should be either null or String and not " + o.getClass());
              }
            }
          }
        } catch (Exception e) {
          Assert.fail("Failed executing " + qs[i], e);
        }
      }
      // check if the types registered on server are fetched by the client
      TypeRegistration registration = getCache().getPdxRegistry().getTypeRegistration();
      assertTrue(registration instanceof ClientTypeRegistration);
      Map<Integer, PdxType> m = ((ClientTypeRegistration) registration).types();
      assertEquals(2, m.size());
      for (PdxType type : m.values()) {
        assertEquals("PdxPortfolio", type.getClassName());
      }
      return null;
    }
  });

  Invoke.invokeInEveryVM("Disconnecting from the Distributed system", () -> disconnectFromDS());
}
/**
 * Queries a field ({@code pdxStatus}) that none of the stored PdxInstances contains; per the
 * original description, the corresponding implicit accessor method is absent from the class as
 * well.
 *
 * <p>
 * A client (vm3) loads ten "PdxVersionedNewPortfolio" instances carrying only {@code id} and
 * {@code status}, then runs two queries through its query service:
 * <ul>
 * <li>filtering on {@code pdxStatus} is expected to return no rows;</li>
 * <li>projecting {@code pdxStatus} for {@code id > 4} is expected to return five rows, each of
 * them {@link Undefined}.</li>
 * </ul>
 */
@Test
public void testPdxInstanceNoFieldNoMethod() throws CacheException {
  final Host host = Host.getHost(0);
  final VM vm0 = host.getVM(0);
  final VM vm3 = host.getVM(3);
  final int numberOfEntries = 10;
  final String name = "/" + regionName;
  final String[] qs = {"select * from " + name + " where pdxStatus = 'active'",
      "select pdxStatus from " + name + " where id > 4"};

  // Start server1
  final int port1 = (Integer) vm0.invoke(new SerializableCallable("Create Server1") {
    @Override
    public Object call() throws Exception {
      getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
      CacheServer server = getCache().addCacheServer();
      int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
      server.setPort(port);
      server.start();
      return port;
    }
  });

  // create client and load only version 1 objects with no pdxStatus field
  vm3.invoke(new SerializableCallable("Create client") {
    @Override
    public Object call() throws Exception {
      ClientCacheFactory cf = new ClientCacheFactory();
      cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
      ClientCache cache = getClientCache(cf);
      Region region =
          cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
      // Load version 1 objects
      for (int i = 0; i < numberOfEntries; i++) {
        PdxInstanceFactory pdxInstanceFactory =
            PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false, getCache());
        pdxInstanceFactory.writeInt("id", i);
        pdxInstanceFactory.writeString("status", (i % 2 == 0 ? "active" : "inactive"));
        PdxInstance pdxInstance = pdxInstanceFactory.create();
        region.put("key-" + i, pdxInstance);
      }
      return null;
    }
  });

  // run both queries from the same client through its query service
  vm3.invoke(new SerializableCallable("Create client") {
    @Override
    public Object call() throws Exception {
      QueryService remoteQueryService = null;
      try {
        remoteQueryService = getCache().getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      for (int i = 0; i < qs.length; i++) {
        try {
          SelectResults sr = (SelectResults) remoteQueryService.newQuery(qs[i]).execute();
          if (i == 1) {
            // projection of the missing field: one Undefined per entry with id > 4
            assertEquals(5, sr.size());
            for (Object o : sr) {
              if (!(o instanceof Undefined)) {
                // guard against null so the failure reports the real problem, not an NPE
                fail("Result should be Undefined and not " + (o == null ? "null" : o.getClass()));
              }
            }
          } else {
            // filter on the missing field: nothing can match
            assertEquals(0, sr.size());
          }
        } catch (Exception e) {
          Assert.fail("Failed executing " + qs[i], e);
        }
      }
      return null;
    }
  });

  Invoke.invokeInEveryVM(DistributedTestCase.class, "disconnectFromDS");
}
/**
 * Test query execution when default values of {@link FieldType} are used. This happens when one
 * version of Pdx object does not have a field but other version has.
 *
 * <p>
 * Two shapes of "PdxVersionedFieldType" are stored: version 1 writes {@code stringField},
 * {@code booleanField}, {@code charField} and {@code shortField}; version 2 writes
 * {@code intField}, {@code longField}, {@code floatField} and {@code doubleField}. The query
 * projects all eight fields, so every row has to fall back on defaults for the four fields its
 * version lacks.
 *
 * <p>
 * Steps: load both versions from a client, query on the server before and after creating an
 * index on each field and compare the two result sets, swap which keys hold which version (to
 * exercise index updates), then compare the client's remote and local query results.
 */
@Test
public void testDefaultValuesInPdxFieldTypes() throws Exception {
  final Host host = Host.getHost(0);
  final VM vm0 = host.getVM(0);
  final VM vm1 = host.getVM(1);
  final int numberOfEntries = 10;
  final String name = "/" + regionName;
  final String query =
      "select stringField, booleanField, charField, shortField, intField, longField, floatField, doubleField from "
          + name;

  // Start server1
  final int port1 = (Integer) vm0.invoke(new SerializableCallable("Create Server1") {
    @Override
    public Object call() throws Exception {
      getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
      CacheServer server = getCache().addCacheServer();
      int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
      server.setPort(port);
      server.start();
      return port;
    }
  });

  // client loads version1 and version2 objects on server
  vm1.invoke(new SerializableCallable("Create client") {
    @Override
    public Object call() throws Exception {
      ClientCacheFactory cf = new ClientCacheFactory();
      cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
      ClientCache cache = getClientCache(cf);
      Region region =
          cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
      // Load version 1 objects under keys 0..9
      for (int i = 0; i < numberOfEntries; i++) {
        PdxInstanceFactory pdxInstanceFactory =
            PdxInstanceFactoryImpl.newCreator("PdxVersionedFieldType", false, getCache());
        pdxInstanceFactory.writeString("stringField", "" + i);
        pdxInstanceFactory.writeBoolean("booleanField", i % 2 == 0);
        pdxInstanceFactory.writeChar("charField", ((char) i));
        pdxInstanceFactory.writeShort("shortField", (short) i);
        PdxInstance pdxInstance = pdxInstanceFactory.create();
        logger.info("Putting object: " + pdxInstance);
        region.put("key-" + i, pdxInstance);
      }
      // Load version 2 objects under keys 10..19
      for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
        PdxInstanceFactory pdxInstanceFactory =
            PdxInstanceFactoryImpl.newCreator("PdxVersionedFieldType", false, getCache());
        pdxInstanceFactory.writeInt("intField", i);
        pdxInstanceFactory.writeLong("longField", (long) (i + 1));
        pdxInstanceFactory.writeFloat("floatField", (float) (i + 2));
        pdxInstanceFactory.writeDouble("doubleField", (double) (i + 3));
        PdxInstance pdxInstance = pdxInstanceFactory.create();
        logger.info("Putting object: " + pdxInstance);
        region.put("key-" + i, pdxInstance);
      }
      return null;
    }
  });

  // query locally on server, create index, verify results with and without index
  vm0.invoke(new SerializableCallable("Create index") {
    @Override
    public Object call() throws Exception {
      GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
      cache.setReadSerializedForTest(true);
      QueryService qs = null;
      // sr[0][0] = result before index creation, sr[0][1] = result after
      SelectResults[][] sr = new SelectResults[1][2];
      // Execute query locally
      try {
        qs = getCache().getQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      try {
        sr[0][0] = (SelectResults) qs.newQuery(query).execute();
        assertEquals(20, sr[0][0].size());
      } catch (Exception e) {
        Assert.fail("Failed executing " + query, e);
      }
      // create one index per projected field
      try {
        qs.createIndex("stringIndex", "stringField", name);
        qs.createIndex("booleanIndex", "booleanField", name);
        qs.createIndex("shortIndex", "shortField", name);
        qs.createIndex("charIndex", "charField", name);
        qs.createIndex("intIndex", "intField", name);
        qs.createIndex("longIndex", "longField", name);
        qs.createIndex("floatIndex", "floatField", name);
        qs.createIndex("doubleIndex", "doubleField", name);
      } catch (Exception e) {
        Assert.fail("Exception creating index ", e);
      }
      // query after index creation
      try {
        sr[0][1] = (SelectResults) qs.newQuery(query).execute();
        assertEquals(20, sr[0][1].size());
      } catch (Exception e) {
        Assert.fail("Failed executing " + query, e);
      }
      CacheUtils.compareResultsOfWithAndWithoutIndex(sr);
      return null;
    }
  });

  // Update index: overwrite every key with an object of the other version
  vm1.invoke(new SerializableCallable("update index") {
    @Override
    public Object call() throws Exception {
      Region region = getCache().getRegion(regionName);
      // Keys 10..19 held version 2 objects; replace them with version 1 objects
      for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
        PdxInstanceFactory pdxInstanceFactory =
            PdxInstanceFactoryImpl.newCreator("PdxVersionedFieldType", false, getCache());
        pdxInstanceFactory.writeString("stringField", "" + i);
        pdxInstanceFactory.writeBoolean("booleanField", i % 2 == 0);
        pdxInstanceFactory.writeChar("charField", ((char) i));
        pdxInstanceFactory.writeShort("shortField", (short) i);
        PdxInstance pdxInstance = pdxInstanceFactory.create();
        logger.info("Putting object: " + pdxInstance);
        region.put("key-" + i, pdxInstance);
      }
      // Keys 0..9 held version 1 objects; replace them with version 2 objects
      for (int i = 0; i < numberOfEntries; i++) {
        PdxInstanceFactory pdxInstanceFactory =
            PdxInstanceFactoryImpl.newCreator("PdxVersionedFieldType", false, getCache());
        pdxInstanceFactory.writeInt("intField", i);
        pdxInstanceFactory.writeLong("longField", (long) (i + 1));
        pdxInstanceFactory.writeFloat("floatField", (float) (i + 2));
        pdxInstanceFactory.writeDouble("doubleField", (double) (i + 3));
        PdxInstance pdxInstance = pdxInstanceFactory.create();
        logger.info("Putting object: " + pdxInstance);
        region.put("key-" + i, pdxInstance);
      }
      return null;
    }
  });

  // query from the client, both remotely (via the server) and locally, and compare
  vm1.invoke(new SerializableCallable("query") {
    @Override
    public Object call() throws Exception {
      QueryService remoteQueryService = null;
      QueryService localQueryService = null;
      // sr[0][0] = remote result, sr[0][1] = client-local result
      SelectResults[][] sr = new SelectResults[1][2];
      try {
        remoteQueryService = getCache().getQueryService();
        localQueryService = ((ClientCache) getCache()).getLocalQueryService();
      } catch (Exception e) {
        Assert.fail("Failed to get QueryService.", e);
      }
      try {
        sr[0][0] = (SelectResults) remoteQueryService.newQuery(query).execute();
        assertEquals(20, sr[0][0].size());
        sr[0][1] = (SelectResults) localQueryService.newQuery(query).execute();
        assertEquals(20, sr[0][1].size());
      } catch (Exception e) {
        // pass the exception as the cause so its stack trace is preserved
        Assert.fail("Failed executing " + query, e);
      }
      CacheUtils.compareResultsOfWithAndWithoutIndex(sr);
      return null;
    }
  });

  Invoke.invokeInEveryVM(DistributedTestCase.class, "disconnectFromDS");
}
/**
 * Tests a nested-collection query over JSON documents whose elements do not all have the same
 * fields.
 *
 * <p>
 * The first server stores two documents converted with {@code JSONFormatter.fromJSON}. Both have
 * an {@code Address} array, but in the second document one address element has no
 * {@code phones} array. A second server creates the same REPLICATE region and runs a query that
 * iterates {@code Address} and then {@code phones}; exactly one document (the one containing
 * phone number '412') must be returned, i.e. the element without {@code phones} must not break
 * the query.
 */
@Test
public void testJSONWithHeterogenousObjectsDifferingFields() throws CacheException {
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);

  // Start server1 and store the two JSON documents
  vm0.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      Region region = getCache().createRegionFactory(RegionShortcut.REPLICATE).create("testJson");
      // obj1: every Address element has a phones array
      String obj1 =
          "{ \"FirstName\": \"Vinay\", \"LastName\": \"Upadhya\", \"Address\": [ { \"Line1\": \"NYC\", \"phones\": [ { \"number\": \"212\" }, { \"number\": \"313\" } ] }, { \"Line1\": \"NJ\", \"phones\": [ { \"number\": \"412\" }, { \"number\": \"513\" } ] } ] }";
      // obj2: the second Address element has no phones field at all
      String obj2 =
          "{ \"FirstName\": \"Vinay\", \"LastName\": \"Upadhya\", \"Address\": [ { \"Line1\": \"NYC\", \"phones\": [ { \"number\": \"212\" }, { \"number\": \"313\" } ] }, { \"Line1\": \"NJ\" } ] }";
      region.put("value1", JSONFormatter.fromJSON(obj1));
      region.put("value2", JSONFormatter.fromJSON(obj2));
    }
  });

  // Start server2 and query the region from there
  vm1.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      getCache().createRegionFactory(RegionShortcut.REPLICATE).create("testJson");
      QueryService qs = getCache().getQueryService();
      try {
        SelectResults results = (SelectResults) qs
            .newQuery(
                "select r from /testJson r, r.Address a, a.phones pn where pn.number = '412'")
            .execute();
        // expected value first, so a failure message reads correctly
        assertEquals(1, results.size());
      } catch (Exception e) {
        // wrap checked query exceptions; the cause carries the stack trace
        throw new CacheException(e) {};
      }
    }
  });

  this.closeClient(vm1);
  this.closeClient(vm0);
}
/**
 * Tests queries over JSON documents in which the same field name holds values of different
 * types.
 *
 * <p>
 * The first server stores two documents converted with {@code JSONFormatter.fromJSON}: in one,
 * {@code Address} is an array of objects with a {@code Line1} field; in the other,
 * {@code Address} is a plain string. A second server creates the same REPLICATE region and runs
 * two queries that iterate {@code r.Address}: one comparing {@code a.Line1}, the other comparing
 * {@code a} itself to a string. Each must return exactly one document.
 */
@Test
public void testJSONWithHeterogenousObjectsDifferingFieldTypes() throws CacheException {
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);

  // Start server1 and store the two JSON documents
  vm0.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      Region region = getCache().createRegionFactory(RegionShortcut.REPLICATE).create("testJson");
      // obj1: Address is an array of objects
      String obj1 =
          "{\"FirstName\": \"Vinay\", \"LastName\": \"Upadhya\", \"Address\": [ { \"Line1\": \"NYC\" }, { \"Line1\": \"NJ\" } ] }";
      // obj2: Address is a plain string
      String obj2 =
          "{ \"FirstName\": \"Vinay\", \"LastName\": \"Upadhya\", \"Address\": \"my address\" }";
      // the string-typed document is deliberately stored first
      region.put("value1", JSONFormatter.fromJSON(obj2));
      region.put("value2", JSONFormatter.fromJSON(obj1));
    }
  });

  // Start server2 and query the region from there
  vm1.invoke(new CacheSerializableRunnable("Create cache server") {
    @Override
    public void run2() throws CacheException {
      configAndStartBridgeServer();
      getCache().createRegionFactory(RegionShortcut.REPLICATE).create("testJson");
      QueryService qs = getCache().getQueryService();
      try {
        // matches only the document whose Address is an array of objects
        SelectResults result = (SelectResults) qs
            .newQuery("select r from /testJson r, r.Address a where a.Line1 = 'NYC'").execute();
        assertEquals(1, result.size());
        // matches only the document whose Address is a string
        result = (SelectResults) qs
            .newQuery("select r from /testJson r, r.Address a where a = 'my address'").execute();
        assertEquals(1, result.size());
      } catch (Exception e) {
        // wrap checked query exceptions; the cause carries the stack trace
        throw new CacheException(e) {};
      }
    }
  });

  this.closeClient(vm1);
  this.closeClient(vm0);
}
/**
 * Runs SUM aggregate queries through a client pool (original description: aggregate queries on
 * Pdx instances; whether TestObject is PDX-serialized is not visible here, so confirm).
 *
 * <p>
 * The server on vm0 puts ten {@code TestObject} entries; a second server is started on vm1 and
 * the client pool on vm2 is pointed at vm1's port. The first query ({@code id > 0}) must yield a
 * single row equal to 45; the second ({@code id > 9}) must yield an empty result.
 */
@Test
public void testAggregateQueries() {
  VM dataServerVm = VM.getVM(0);
  VM pooledServerVm = VM.getVM(1);
  VM clientVm = VM.getVM(2);

  // Port of the vm0 server; not referenced below because the pool targets vm1 only.
  final int dataServerPort = dataServerVm.invoke(() -> {
    configAndStartBridgeServer();
    Region region = getRootRegion().getSubregion(regionName);
    for (int id = 0; id < 10; id++) {
      region.put("key-" + id, new TestObject(id, "val-" + id));
    }
    return getCacheServerPort();
  });

  final int pooledServerPort = pooledServerVm.invoke(() -> {
    configAndStartBridgeServer();
    return getCacheServerPort();
  });

  final String serverHost = NetworkUtils.getServerHostName();
  final String poolName = "testClientServerQueryPool";
  createPool(clientVm, poolName, new String[] {serverHost}, new int[] {pooledServerPort}, true);

  // SUM over the entries with id > 0: one row holding the total
  clientVm.invoke(() -> {
    String sumOverMatches = "select SUM(price) from " + regName + " where id > 0";
    QueryService poolQueryService = PoolManager.find(poolName).getQueryService();
    Query sumQuery = poolQueryService.newQuery(sumOverMatches);
    SelectResults<Object> rows = (SelectResults) sumQuery.execute();
    assertEquals(1, rows.size());
    Object total = rows.asList().get(0);
    assertEquals(45, total);
  });

  // SUM over a predicate nothing satisfies: no rows at all
  clientVm.invoke(() -> {
    String sumOverNothing = "select SUM(price) from " + regName + " where id > 9";
    QueryService poolQueryService = PoolManager.find(poolName).getQueryService();
    Query sumQuery = poolQueryService.newQuery(sumOverNothing);
    SelectResults<Long> rows = (SelectResults) sumQuery.execute();
    assertEquals(0, rows.size());
    assertEquals(0, rows.asList().size());
  });

  this.closeClient(dataServerVm);
  this.closeClient(pooledServerVm);
  this.closeClient(clientVm);
}
}