blob: a33720ea5734ac5879cc31e0abddbf0c6f606313 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.cq.dunit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.query.cq.internal.ServerCQImpl;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.internal.DefaultQueryService;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* This class tests the ContinuousQuery mechanism in GemFire.
*
*/
@Category({ClientSubscriptionTest.class})
public class CqResultSetUsingPoolDUnitTest extends JUnit4CacheTestCase {
protected CqQueryUsingPoolDUnitTest cqDUnitTest = new CqQueryUsingPoolDUnitTest();
private final String selStr = "SELECT * FROM /root/regionA";
/** Supported queries */
public final String[] condition = new String[] {/* 0 */ " p WHERE p.ID > 3",
/* 1 */ " p WHERE p.ID < 3", /* 2 */ " WHERE ID = 3", /* 3 */ " p WHERE p.ID >= 3",
/* 4 */ " p WHERE p.ID <= 3", /* 5 */ " p WHERE p.ID > 3 AND p.status = 'active'",
/* 6 */ " WHERE status = 'active' AND ID < 3", /* 7 */ " p WHERE p.names[0] = 'aaa'",
/* 8 */ " p WHERE p.status LIKE 'active'",
/* 9 */ " p WHERE p.collectionHolderMap.get('1').arr[0] = '0'",
/* 10 */ " p WHERE p.position1.portfolioId > 3",
/* 11 */ " p where p.position3[1].portfolioId = 2",
/* 12 */ " p where NOT(SELECT DISTINCT * FROM positions.values pos "
+ " WHERE pos.secId in SET('YHOO', 'SUN', 'IBM', 'YHOO', 'GOOG', "
+ " 'MSFT', 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 'HP')).isEmpty",
/* 13 */ " p WHERE p.ID != 3 AND p.ID !=4",
/* 14 */ " p WHERE (p.ID = 2 OR p.ID = 4) AND p.status = 'active'",
};
/** For intial size of 5 */
public final int[] resultSize = new int[] {/* 0 */ 2, /* 1 */ 2, /* 2 */ 1, /* 3 */ 3, /* 4 */ 3,
/* 5 */ 1, /* 6 */ 1, /* 7 */ 5, /* 8 */ 2, /* 9 */ 5, /* 10 */ 2, /* 11 */ 5, /* 12 */ 5,
/* 13 */ 3, /* 14 */ 2,};
/** For intial size of 5 */
public final String[][] expectedKeys = new String[][] {/* 0 */ {"key-4", "key-5"},
/* 1 */ {"key-1", "key-2"}, /* 2 */ {"key-3"}, /* 3 */ {"key-3", "key-4", "key-5"},
/* 4 */ {"key-1", "key-2", "key-3"}, /* 5 */ {"key-4"}, /* 6 */ {"key-2"},
/* 7 */ {"key-1", "key-2", "key-3", "key-4", "key-5"}, /* 8 */ {"key-2", "key-4"},
/* 9 */ {"key-1", "key-2", "key-3", "key-4", "key-5"}, /* 10 */ {"key-4", "key-5"},
/* 11 */ {"key-1", "key-2", "key-3", "key-4", "key-5"},
/* 12 */ {"key-1", "key-2", "key-3", "key-4", "key-5"}, /* 13 */ {"key-1", "key-2", "key-5"},
/* 14 */ {"key-2", "key-4"},};
/** Unsupported queries */
public final String[] condition2 =
new String[] {/* 0 */ " p1, /root/regionB p2 WHERE p1.status = p2.status",
/* 1 */ " p, p.positions.values p1 WHERE p1.secId = 'IBM'",
/* 2 */ " p, p.positions.values AS pos WHERE pos.secId != '1'",
/* 3 */ " p WHERE p.ID in (SELECT p1.ID FROM /root/regionA p1 WHERE p1.ID > 3)",
/* 4 */ ".entries entry WHERE entry.key = '1'",
/* 5 */ ".entries entry WHERE entry.value.ID > '3'",
/* 6 */ ".values p WHERE p.ID > '3' and p.status = 'active'",
/* 7 */ " p, p.position3 pos where pos.portfolioId = 1",};
public CqResultSetUsingPoolDUnitTest() {
super();
}
@Override
public final void postSetUp() throws Exception {
// avoid IllegalStateException from HandShake by connecting all vms tor
// system before creating ConnectionPools
getSystem();
Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
@Override
public void run() {
getSystem();
}
});
postSetUpCqResultSetUsingPoolDUnitTest();
}
protected void postSetUpCqResultSetUsingPoolDUnitTest() throws Exception {}
/**
* Tests CQ Result Set.
*
*/
@Test
public void testCqResults() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
cqDUnitTest.createServer(server);
final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCqResults";
cqDUnitTest.createPool(client, poolName, host0, port);
// Create client.
cqDUnitTest.createClient(client, port, host0);
// Put 5 entries into the region.
cqDUnitTest.createValues(server, "regionA", 5);
// Test for supported queries.
String cqQuery = "";
for (int queryCnt = 0; queryCnt < condition.length; queryCnt++) {
cqQuery = selStr + condition[queryCnt];
cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery);
cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt],
expectedKeys[queryCnt], null);
}
// Test unsupported queries.
for (int queryCnt = 0; queryCnt < condition2.length; queryCnt++) {
cqQuery = selStr + condition2[queryCnt];
try {
cqDUnitTest.createCQ(client, poolName, "testCqResultsF_" + queryCnt, cqQuery);
// cqDUnitTest.executeCQ(client, "testCqResultsF_" + queryCnt, true, cqDUnitTest.noTest,
// null);
fail("UnSupported CQ Query, Expected to fail. " + " CQ :" + "testCqResultsF_" + queryCnt
+ " Query : " + cqQuery);
} catch (Exception ex) {
// Expected.
}
}
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server);
}
/**
* Tests CQ Result Set with Compact Range Index.
*
*/
@Test
public void testCqResultsWithCompactRangeIndex() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
cqDUnitTest.createServer(server);
final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCqResults";
cqDUnitTest.createPool(client, poolName, host0, port);
// Create client.
cqDUnitTest.createClient(client, port, host0);
// Create index.
cqDUnitTest.createFunctionalIndex(server, "IdIndex", "p.ID", "/root/regionA p");
cqDUnitTest.createFunctionalIndex(server, "statusIndex", "p.status", "/root/regionA p");
cqDUnitTest.createFunctionalIndex(server, "portfolioIdIndex", "p.position1.portfolioId",
"/root/regionA p");
// Put 5 entries into the region.
cqDUnitTest.createValues(server, "regionA", 5);
// Test for supported queries.
String cqQuery = "";
for (int queryCnt = 0; queryCnt < condition.length; queryCnt++) {
cqQuery = selStr + condition[queryCnt];
cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery);
cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt],
expectedKeys[queryCnt], null);
}
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server);
}
/**
* Tests CQ Result Set with Range Index.
*
*/
@Test
public void testCqResultsWithRangeIndex() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
cqDUnitTest.createServer(server);
final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCqResults";
cqDUnitTest.createPool(client, poolName, host0, port);
// Create client.
cqDUnitTest.createClient(client, port, host0);
// Create index.
server.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
@Override
public void run2() throws CacheException {
IndexManager.TEST_RANGEINDEX_ONLY = true;
}
});
cqDUnitTest.createFunctionalIndex(server, "IdIndex", "p.ID", "/root/regionA p");
cqDUnitTest.createFunctionalIndex(server, "statusIndex", "p.status", "/root/regionA p");
cqDUnitTest.createFunctionalIndex(server, "portfolioIdIndex", "p.position1.portfolioId",
"/root/regionA p");
// Put 5 entries into the region.
cqDUnitTest.createValues(server, "regionA", 5);
// Test for supported queries.
String cqQuery = "";
for (int queryCnt = 0; queryCnt < condition.length; queryCnt++) {
cqQuery = selStr + condition[queryCnt];
cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery);
cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt],
expectedKeys[queryCnt], null);
}
// Unset the flag.
server.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
@Override
public void run2() throws CacheException {
IndexManager.TEST_RANGEINDEX_ONLY = false;
}
});
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server);
}
/**
* Tests CQ Result Set.
*
*/
@Test
public void testCqResultsOnPR() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
cqDUnitTest.createServerWithPR(server1, 0, false, 0);
cqDUnitTest.createServerWithPR(server2, 0, false, 0);
final int port = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
String poolName = "testCqResults";
cqDUnitTest.createPool(client, poolName, host0, port);
// Create client.
cqDUnitTest.createClient(client, port, host0);
// Put 5 entries into the region.
cqDUnitTest.createValues(server1, "regionA", 5);
// Test for supported queries.
String cqQuery = "";
for (int queryCnt = 0; queryCnt < condition.length; queryCnt++) {
cqQuery = selStr + condition[queryCnt];
cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery);
cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt],
expectedKeys[queryCnt], null);
}
// Test unsupported queries.
for (int queryCnt = 0; queryCnt < condition2.length; queryCnt++) {
cqQuery = selStr + condition2[queryCnt];
try {
cqDUnitTest.createCQ(client, poolName, "testCqResultsF_" + queryCnt, cqQuery);
// cqDUnitTest.executeCQ(client, "testCqResultsF_" + queryCnt, true, cqDUnitTest.noTest,
// null);
fail("UnSupported CQ Query, Expected to fail. " + " CQ :" + "testCqResultsF_" + queryCnt
+ " Query : " + cqQuery);
} catch (Exception ex) {
// Expected.
}
}
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server1);
cqDUnitTest.closeServer(server2);
}
/**
* Tests CQ Result Set with Compact Range Index.
*
*/
@Test
public void testCqResultsWithCompactRangeIndexOnPR() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
cqDUnitTest.createServerWithPR(server1, 0, false, 0);
cqDUnitTest.createServerWithPR(server2, 0, false, 0);
final int port = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
String poolName = "testCqResults";
cqDUnitTest.createPool(client, poolName, host0, port);
// Create client.
cqDUnitTest.createClient(client, port, host0);
// Create index.
cqDUnitTest.createFunctionalIndex(server1, "IdIndex", "p.ID", "/root/regionA p");
cqDUnitTest.createFunctionalIndex(server1, "statusIndex", "p.status", "/root/regionA p");
cqDUnitTest.createFunctionalIndex(server1, "portfolioIdIndex", "p.position1.portfolioId",
"/root/regionA p");
// Put 5 entries into the region.
cqDUnitTest.createValues(server1, "regionA", 5);
// Test for supported queries.
String cqQuery = "";
for (int queryCnt = 0; queryCnt < condition.length; queryCnt++) {
cqQuery = selStr + condition[queryCnt];
cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery);
cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt],
expectedKeys[queryCnt], null);
}
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server1);
cqDUnitTest.closeServer(server2);
}
/**
* Tests CQ Result Set with Range Index.
*
*/
@Test
public void testCqResultsWithRangeIndexOnPR() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
cqDUnitTest.createServerWithPR(server1, 0, false, 0);
cqDUnitTest.createServerWithPR(server2, 0, false, 0);
final int port = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
String poolName = "testCqResults";
cqDUnitTest.createPool(client, poolName, host0, port);
// Create client.
cqDUnitTest.createClient(client, port, host0);
// Create index.
server1.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
@Override
public void run2() throws CacheException {
IndexManager.TEST_RANGEINDEX_ONLY = true;
}
});
server2.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
@Override
public void run2() throws CacheException {
IndexManager.TEST_RANGEINDEX_ONLY = true;
}
});
cqDUnitTest.createFunctionalIndex(server1, "IdIndex", "p.ID", "/root/regionA p");
cqDUnitTest.createFunctionalIndex(server1, "statusIndex", "p.status", "/root/regionA p");
cqDUnitTest.createFunctionalIndex(server1, "portfolioIdIndex", "p.position1.portfolioId",
"/root/regionA p");
// Put 5 entries into the region.
cqDUnitTest.createValues(server1, "regionA", 5);
// Test for supported queries.
String cqQuery = "";
for (int queryCnt = 0; queryCnt < condition.length; queryCnt++) {
cqQuery = selStr + condition[queryCnt];
cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery);
cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt],
expectedKeys[queryCnt], null);
}
// Create index.
server1.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
@Override
public void run2() throws CacheException {
IndexManager.TEST_RANGEINDEX_ONLY = false;
}
});
server2.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
@Override
public void run2() throws CacheException {
IndexManager.TEST_RANGEINDEX_ONLY = false;
}
});
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server1);
cqDUnitTest.closeServer(server2);
}
/**
* Tests CQ Result Set.
*
*/
@Test
public void testCqResultsCaching() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
cqDUnitTest.createServer(server);
final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCqResults";
final String cqName = "testCqResultsP_0";
cqDUnitTest.createPool(client, poolName, host0, port);
// Create client.
cqDUnitTest.createClient(client, port, host0);
// create CQ.
cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]);
final int numObjects = 300;
final int totalObjects = 500;
// initialize Region.
server.invoke(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
for (int i = 1; i <= numObjects; i++) {
Portfolio p = new Portfolio(i);
region.put("" + i, p);
}
}
});
// Keep updating region (async invocation).
server.invokeAsync(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
// Update (totalObjects - 1) entries.
for (int i = 1; i < totalObjects; i++) {
// Destroy entries.
if (i > 25 && i < 201) {
region.destroy("" + i);
continue;
}
Portfolio p = new Portfolio(i);
region.put("" + i, p);
}
// recreate destroyed entries.
for (int j = 26; j < 201; j++) {
Portfolio p = new Portfolio(j);
region.put("" + j, p);
}
// Add the last key.
Portfolio p = new Portfolio(totalObjects);
region.put("" + totalObjects, p);
}
});
// Execute CQ.
// While region operation is in progress execute CQ.
cqDUnitTest.executeCQ(client, cqName, true, null);
// Verify CQ Cache results.
server.invoke(new CacheSerializableRunnable("Verify CQ Cache results") {
@Override
public void run2() throws CacheException {
CqService cqService = null;
try {
cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
Assert.fail("Failed to get the internal CqService.", ex);
}
// Wait till all the region update is performed.
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
while (true) {
if (region.get("" + totalObjects) == null) {
try {
Thread.sleep(50);
} catch (Exception ex) {
// ignore.
}
continue;
}
break;
}
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
if (cqQuery.getName().equals(cqName)) {
int size = cqQuery.getCqResultKeysSize();
if (size != totalObjects) {
LogWriterUtils.getLogWriter().info("The number of Cached events " + size
+ " is not equal to the expected size " + totalObjects);
HashSet expectedKeys = new HashSet();
for (int i = 1; i < totalObjects; i++) {
expectedKeys.add("" + i);
}
Set cachedKeys = cqQuery.getCqResultKeyCache();
expectedKeys.removeAll(cachedKeys);
LogWriterUtils.getLogWriter().info("Missing keys from the Cache : " + expectedKeys);
}
assertEquals("The number of keys cached for cq " + cqName + " is wrong.", totalObjects,
cqQuery.getCqResultKeysSize());
}
}
}
});
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server);
}
/**
* Tests CQ Result Set.
*
*/
@Test
public void testCqResultsCachingForMultipleCQs() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client1 = host.getVM(1);
VM client2 = host.getVM(2);
cqDUnitTest.createServer(server);
final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCqResults";
final String cqName1 = "testCqResultsP_0";
final String cqName2 = "testCqResultsP_1";
cqDUnitTest.createPool(client1, poolName, host0, port);
cqDUnitTest.createPool(client2, poolName, host0, port);
// Create client.
cqDUnitTest.createClient(client1, port, host0);
cqDUnitTest.createClient(client2, port, host0);
// create CQ.
cqDUnitTest.createCQ(client1, poolName, cqName1, cqDUnitTest.cqs[0]);
cqDUnitTest.createCQ(client2, poolName, cqName2, cqDUnitTest.cqs[0]);
final int numObjects = 300;
final int totalObjects = 500;
// initialize Region.
server.invoke(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
for (int i = 1; i <= numObjects; i++) {
Portfolio p = new Portfolio(i);
region.put("" + i, p);
}
}
});
// Keep updating region (async invocation).
server.invokeAsync(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
// Update (totalObjects - 1) entries.
for (int i = 1; i < totalObjects; i++) {
// Destroy entries.
if (i > 25 && i < 201) {
region.destroy("" + i);
continue;
}
Portfolio p = new Portfolio(i);
region.put("" + i, p);
}
// recreate destroyed entries.
for (int j = 26; j < 201; j++) {
Portfolio p = new Portfolio(j);
region.put("" + j, p);
}
// Add the last key.
Portfolio p = new Portfolio(totalObjects);
region.put("" + totalObjects, p);
}
});
// Execute CQ.
// While region operation is in progress execute CQ.
cqDUnitTest.executeCQ(client1, cqName1, true, null);
cqDUnitTest.executeCQ(client2, cqName2, true, null);
// Verify CQ Cache results.
server.invoke(new CacheSerializableRunnable("Verify CQ Cache results") {
@Override
public void run2() throws CacheException {
CqService cqService = null;
try {
cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
Assert.fail("Failed to get the internal CqService.", ex);
}
// Wait till all the region update is performed.
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
while (true) {
if (region.get("" + totalObjects) == null) {
try {
Thread.sleep(50);
} catch (Exception ex) {
// ignore.
}
continue;
}
break;
}
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
int size = cqQuery.getCqResultKeysSize();
if (size != totalObjects) {
LogWriterUtils.getLogWriter().info("The number of Cached events " + size
+ " is not equal to the expected size " + totalObjects);
HashSet expectedKeys = new HashSet();
for (int i = 1; i < totalObjects; i++) {
expectedKeys.add("" + i);
}
Set cachedKeys = cqQuery.getCqResultKeyCache();
expectedKeys.removeAll(cachedKeys);
LogWriterUtils.getLogWriter().info("Missing keys from the Cache : " + expectedKeys);
}
assertEquals("The number of keys cached for cq " + cqQuery.getName() + " is wrong.",
totalObjects, cqQuery.getCqResultKeysSize());
}
}
});
// Close.
cqDUnitTest.closeClient(client1);
cqDUnitTest.closeClient(client2);
cqDUnitTest.closeServer(server);
}
/**
* Tests CQ Result Set.
*
*/
@Test
public void testCqResultsCachingForPR() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
cqDUnitTest.createServerWithPR(server1, 0, false, 0);
cqDUnitTest.createServerWithPR(server2, 0, false, 0);
final int port = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
String poolName = "testCqResults";
final String cqName = "testCqResultsP_0";
cqDUnitTest.createPool(client, poolName, host0, port);
// Create client.
cqDUnitTest.createClient(client, port, host0);
// create CQ.
cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]);
final int numObjects = 300;
final int totalObjects = 500;
// initialize Region.
server1.invoke(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
for (int i = 1; i <= numObjects; i++) {
Portfolio p = new Portfolio(i);
region.put("" + i, p);
}
}
});
cqDUnitTest.executeCQ(client, cqName, true, null);
// Keep updating region (async invocation).
server2.invoke(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
// Update (totalObjects - 1) entries.
for (int i = 1; i < totalObjects; i++) {
// Destroy entries.
if (i > 25 && i < 201) {
region.destroy("" + i);
continue;
}
Portfolio p = new Portfolio(i);
region.put("" + i, p);
}
// recreate destroyed entries.
for (int j = 26; j < 201; j++) {
Portfolio p = new Portfolio(j);
region.put("" + j, p);
}
// Add the last key.
Portfolio p = new Portfolio(totalObjects);
region.put("" + totalObjects, p);
}
});
// Verify CQ Cache results.
server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results") {
@Override
public void run2() throws CacheException {
CqService cqService = null;
try {
cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
Assert.fail("Failed to get the internal CqService.", ex);
}
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
if (cqQuery.getCqResultKeysSize() <= 0) {
fail("The Result Cache for CQ on PR is not working. CQ :" + cqName);
}
}
}
});
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server1);
cqDUnitTest.closeServer(server2);
}
/**
* Tests CQ Result Set caching for destroy events.
*
*/
@Test
public void testCqResultsCachingForDestroyEventsOnPR() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
cqDUnitTest.createServerWithPR(server1, 0, false, 0);
cqDUnitTest.createServerWithPR(server2, 0, false, 0);
final int port = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
String poolName = "testCqResults";
final String cqName = "testCqResultsCachingForDestroyEventsOnPR_0";
cqDUnitTest.createPool(client, poolName, host0, port);
// Create client.
cqDUnitTest.createClient(client, port, host0);
// create CQ.
cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]);
// Execute CQ.
cqDUnitTest.executeCQ(client, cqName, true, null);
final int numObjects = 50;
// initialize Region.
server1.invoke(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
for (int i = 1; i <= numObjects; i++) {
Portfolio p = new Portfolio(i);
region.put("" + i, p);
}
}
});
// Update from server2.
server2.invoke(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
for (int i = 1; i <= numObjects; i++) {
Portfolio p = new Portfolio(i);
region.put("" + i, p);
}
}
});
// Destroy entries.
server2.invoke(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
for (int i = 1; i <= numObjects; i++) {
Portfolio p = new Portfolio(i);
region.destroy("" + i);
}
}
});
// Wait for events to be sent.
cqDUnitTest.waitForDestroyed(client, cqName, "" + numObjects);
// Verify CQ Cache results.
server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results") {
@Override
public void run2() throws CacheException {
CqService cqService = null;
try {
cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
Assert.fail("Failed to get the internal CqService.", ex);
}
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
if (cqQuery.getCqResultKeysSize() > 0) {
fail("The CQ Result Cache on PR should have been empty for CQ :" + cqName + " keys="
+ cqQuery.getCqResultKeyCache());
}
}
}
});
server2.invoke(new CacheSerializableRunnable("Verify CQ Cache results") {
@Override
public void run2() throws CacheException {
CqService cqService = null;
try {
cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
Assert.fail("Failed to get the internal CqService.", ex);
}
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
if (cqQuery.getCqResultKeysSize() > 0) {
fail("The CQ Result Cache on PR should have been empty for CQ :" + cqName);
}
}
}
});
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server1);
cqDUnitTest.closeServer(server2);
}
/**
* Tests CQ Result Caching with CQ Failover.
*
*/
@Test
public void testCqResultsCachingWithFailOver() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
cqDUnitTest.createServer(server1);
final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
String poolName = "testCQFailOver";
final String cqName = "testCQFailOver_0";
cqDUnitTest.createPool(client, poolName, new String[] {host0, host0},
new int[] {port1, ports[0]});
// create CQ.
cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]);
final int numObjects = 300;
final int totalObjects = 500;
// initialize Region.
server1.invoke(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
for (int i = 1; i <= numObjects; i++) {
Portfolio p = new Portfolio(i);
region.put("" + i, p);
}
}
});
// Keep updating region (async invocation).
server1.invokeAsync(new CacheSerializableRunnable("Update Region") {
@Override
public void run2() throws CacheException {
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
// Update (totalObjects - 1) entries.
for (int i = 1; i < totalObjects; i++) {
// Destroy entries.
if (i > 25 && i < 201) {
region.destroy("" + i);
continue;
}
Portfolio p = new Portfolio(i);
region.put("" + i, p);
}
// recreate destroyed entries.
for (int j = 26; j < 201; j++) {
Portfolio p = new Portfolio(j);
region.put("" + j, p);
}
// Add the last key.
Portfolio p = new Portfolio(totalObjects);
region.put("" + totalObjects, p);
}
});
// Execute CQ.
// While region operation is in progress execute CQ.
cqDUnitTest.executeCQ(client, cqName, true, null);
// Verify CQ Cache results.
server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results") {
@Override
public void run2() throws CacheException {
CqService cqService = null;
try {
cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
Assert.fail("Failed to get the internal CqService.", ex);
}
// Wait till all the region update is performed.
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
while (true) {
if (region.get("" + totalObjects) == null) {
try {
Thread.sleep(50);
} catch (Exception ex) {
// ignore.
}
continue;
}
break;
}
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
if (cqQuery.getName().equals(cqName)) {
int size = cqQuery.getCqResultKeysSize();
if (size != totalObjects) {
LogWriterUtils.getLogWriter().info("The number of Cached events " + size
+ " is not equal to the expected size " + totalObjects);
HashSet expectedKeys = new HashSet();
for (int i = 1; i < totalObjects; i++) {
expectedKeys.add("" + i);
}
Set cachedKeys = cqQuery.getCqResultKeyCache();
expectedKeys.removeAll(cachedKeys);
LogWriterUtils.getLogWriter().info("Missing keys from the Cache : " + expectedKeys);
}
assertEquals("The number of keys cached for cq " + cqName + " is wrong.", totalObjects,
cqQuery.getCqResultKeysSize());
}
}
}
});
cqDUnitTest.createServer(server2, ports[0]);
final int thePort2 = server2.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
System.out
.println("### Port on which server1 running : " + port1 + " Server2 running : " + thePort2);
Wait.pause(3 * 1000);
// Close server1 for CQ fail over to server2.
cqDUnitTest.closeServer(server1);
Wait.pause(3 * 1000);
// Verify CQ Cache results.
server2.invoke(new CacheSerializableRunnable("Verify CQ Cache results") {
@Override
public void run2() throws CacheException {
CqService cqService = null;
try {
cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
Assert.fail("Failed to get the internal CqService.", ex);
}
// Wait till all the region update is performed.
Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
while (true) {
if (region.get("" + totalObjects) == null) {
try {
Thread.sleep(50);
} catch (Exception ex) {
// ignore.
}
continue;
}
break;
}
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
if (cqQuery.getName().equals(cqName)) {
int size = cqQuery.getCqResultKeysSize();
if (size != totalObjects) {
LogWriterUtils.getLogWriter().info("The number of Cached events " + size
+ " is not equal to the expected size " + totalObjects);
HashSet expectedKeys = new HashSet();
for (int i = 1; i < totalObjects; i++) {
expectedKeys.add("" + i);
}
Set cachedKeys = cqQuery.getCqResultKeyCache();
expectedKeys.removeAll(cachedKeys);
LogWriterUtils.getLogWriter().info("Missing keys from the Cache : " + expectedKeys);
}
assertEquals("The number of keys cached for cq " + cqName + " is wrong.", totalObjects,
cqQuery.getCqResultKeysSize());
}
}
}
});
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server2);
}
}