blob: 25166016439576bf5fc7eeb043c688db47c408bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.cq.dunit;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertFalse;
import static org.apache.geode.test.dunit.Assert.assertNotNull;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.MirrorType;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqAttributesMutator;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.IndexType;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.internal.CqStateImpl;
import org.apache.geode.cache.query.internal.DefaultQueryService;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.cache30.CertifiableTestCacheListener;
import org.apache.geode.cache30.ClientServerTestCase;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.RMIException;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* This class tests the ContinuousQuery mechanism in GemFire. It does so by creating a cache server
* with a cache and a pre-defined region and a data loader. The client creates the same region and
* attaches the connection pool.
*/
@Category({ClientSubscriptionTest.class})
public class CqQueryUsingPoolDUnitTest extends JUnit4CacheTestCase {
/** The port on which the cache server was started in this VM */
private static int bridgeServerPort;
protected static int port = 0;
protected static int port2 = 0;
public static int noTest = -1;
public final String[] regions = new String[] {"regionA", "regionB"};
private static final int CREATE = 0;
private static final int UPDATE = 1;
private static final int DESTROY = 2;
private static final int INVALIDATE = 3;
private static final int CLOSE = 4;
private static final int REGION_CLEAR = 5;
private static final int REGION_INVALIDATE = 6;
public static final String KEY = "key-";
private static final String WAIT_PROPERTY = "CqQueryTest.maxWaitTime";
private static final int WAIT_DEFAULT = (20 * 1000);
public static final long MAX_TIME = Integer.getInteger(WAIT_PROPERTY, WAIT_DEFAULT).intValue();
public final String[] cqs = new String[] {
// 0 - Test for ">"
"SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0",
// 1 - Test for "=" and "and".
"SELECT ALL * FROM /root/" + regions[0] + " p where p.ID = 2 and p.status='active'",
// 2 - Test for "<" and "and".
"SELECT ALL * FROM /root/" + regions[1] + " p where p.ID < 5 and p.status='active'",
// FOLLOWING CQS ARE NOT TESTED WITH VALUES; THEY ARE USED TO TEST PARSING LOGIC WITHIN CQ.
// 3
"SELECT * FROM /root/" + regions[0] + " ;",
// 4
"SELECT ALL * FROM /root/" + regions[0],
// 5
"import org.apache.geode.cache.\"query\".data.Portfolio; " + "SELECT ALL * FROM /root/"
+ regions[0] + " TYPE Portfolio",
// 6
"import org.apache.geode.cache.\"query\".data.Portfolio; " + "SELECT ALL * FROM /root/"
+ regions[0] + " p TYPE Portfolio",
// 7
"SELECT ALL * FROM /root/" + regions[1] + " p where p.ID < 5 and p.status='active';",
// 8
"SELECT ALL * FROM /root/" + regions[0] + " ;",
// 9
"SELECT ALL * FROM /root/" + regions[0] + " p where p.description = NULL",
// 10
"SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0 and p.status='active'",
// 11 - Test for "No Alias"
"SELECT ALL * FROM /root/" + regions[0] + " where ID > 0",};
private String[] invalidCQs = new String[] {
// Test for ">"
"SELECT ALL * FROM /root/invalidRegion p where p.ID > 0"};
@Override
public final void postSetUp() throws Exception {
// We're seeing this on the server when the client
// disconnects.
IgnoredException.addIgnoredException("Connection reset");
IgnoredException.addIgnoredException("SocketTimeoutException");
IgnoredException.addIgnoredException("ServerConnectivityException");
IgnoredException.addIgnoredException("Socket Closed");
IgnoredException.addIgnoredException("SocketException");
// avoid IllegalStateException from HandShake by connecting all vms tor
// system before creating connection pools
getSystem();
Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
@Override
public void run() {
getSystem();
}
});
postSetUpCqQueryUsingPoolDUnitTest();
}
protected void postSetUpCqQueryUsingPoolDUnitTest() throws Exception {}
/* Returns Cache Server Port */
static int getCacheServerPort() {
return bridgeServerPort;
}
/* Create Cache Server */
public void createServer(VM server) {
createServer(server, 0);
}
public void createServer(VM server, final int p) {
createServer(server, p, false);
}
public void createServer(VM server, final int thePort, final boolean eviction) {
MirrorType mirrorType = MirrorType.KEYS_VALUES;
createServer(server, thePort, eviction, mirrorType);
}
public void createServer(VM server, final int thePort, final boolean eviction,
final MirrorType mirrorType) {
SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setMirrorType(mirrorType);
// setting the eviction attributes.
if (eviction) {
EvictionAttributes evictAttrs =
EvictionAttributes.createLRUEntryAttributes(100000, EvictionAction.OVERFLOW_TO_DISK);
factory.setEvictionAttributes(evictAttrs);
}
for (int i = 0; i < regions.length; i++) {
createRegion(regions[i], factory.createRegionAttributes());
}
try {
startBridgeServer(thePort, true);
}
catch (Exception ex) {
Assert.fail("While starting CacheServer", ex);
}
}
};
server.invoke(createServer);
}
/**
* Create a cache server with partitioned region.
*
* @param server VM where to create the cache server.
* @param port cache server port.
* @param isAccessor if true the under lying partitioned region will not host data on this vm.
* @param redundantCopies number of redundant copies for the primary bucket.
*/
public void createServerWithPR(VM server, final int port, final boolean isAccessor,
final int redundantCopies) {
SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
if (isAccessor) {
paf.setLocalMaxMemory(0);
}
PartitionAttributes prAttr =
paf.setTotalNumBuckets(197).setRedundantCopies(redundantCopies).create();
attr.setPartitionAttributes(prAttr);
assertFalse(getSystem().isLoner());
for (int i = 0; i < regions.length; i++) {
Region r = createRegion(regions[i], attr.create());
LogWriterUtils.getLogWriter().info("Server created the region: " + r);
}
try {
startBridgeServer(port, true);
} catch (Exception ex) {
Assert.fail("While starting CacheServer", ex);
}
}
};
server.invoke(createServer);
}
/* Close Cache Server */
public void closeServer(VM server) {
server.invoke(new SerializableRunnable("Close CacheServer") {
@Override
public void run() {
LogWriterUtils.getLogWriter().info("### Close CacheServer. ###");
stopBridgeServer(getCache());
}
});
}
/* Create Client */
public void createClient(VM client, final int serverPort, final String serverHost) {
int[] serverPorts = new int[] {serverPort};
createClient(client, serverPorts, serverHost, null, null);
}
/* Create Client */
public void createClient(VM client, final int[] serverPorts, final String serverHost,
final String redundancyLevel, final String poolName) {
SerializableRunnable createQService = new CacheSerializableRunnable("Create Client") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Create Client. ###");
// Region region1 = null;
// Initialize CQ Service.
try {
getCache().getQueryService();
IgnoredException.addIgnoredException("java.net.ConnectException");
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
AttributesFactory regionFactory = new AttributesFactory();
regionFactory.setScope(Scope.LOCAL);
if (poolName != null) {
regionFactory.setPoolName(poolName);
} else {
if (redundancyLevel != null) {
ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts,
true, Integer.parseInt(redundancyLevel), -1, null);
} else {
ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts,
true, -1, -1, null);
}
}
for (int i = 0; i < regions.length; i++) {
createRegion(regions[i], regionFactory.createRegionAttributes());
LogWriterUtils.getLogWriter()
.info("### Successfully Created Region on Client :" + regions[i]);
// region1.getAttributesMutator().setCacheListener(new CqListener());
}
}
};
client.invoke(createQService);
}
/* Close Client */
public void closeClient(VM client) {
SerializableRunnable closeCQService = new CacheSerializableRunnable("Close Client") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Close Client. ###");
try {
((DefaultQueryService) getCache().getQueryService()).closeCqService();
} catch (Exception ex) {
LogWriterUtils.getLogWriter()
.info("### Failed to get CqService during ClientClose() ###");
// TODO: fix eaten exception
}
}
};
client.invoke(closeCQService);
Wait.pause(1000);
}
public void createFunctionalIndex(VM vm, final String indexName, final String indexedExpression,
final String fromClause) {
vm.invoke(new CacheSerializableRunnable("Create Functional Index") {
@Override
public void run2() throws CacheException {
QueryService qs = null;
try {
qs = getCache().getQueryService();
} catch (Exception ex) {
LogWriterUtils.getLogWriter()
.info("### Failed to get CqService during ClientClose() ###");
// TODO: fix eaten exception
}
try {
qs.createIndex(indexName, IndexType.FUNCTIONAL, indexedExpression, fromClause);
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("### Failed to create Index :" + indexName);
// TODO: fix eaten exception
}
}
});
}
/* Create/Init values */
public void createValues(VM vm, final String regionName, final int size) {
vm.invoke(new CacheSerializableRunnable("Create values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regionName);
for (int i = 1; i <= size; i++) {
region1.put(KEY + i, new Portfolio(i, i));
}
LogWriterUtils.getLogWriter()
.info("### Number of Entries in Region :" + region1.keySet().size());
}
});
}
/* Create/Init values */
public void createValuesWithTime(VM vm, final String regionName, final int size) {
vm.invoke(new CacheSerializableRunnable("Create values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regionName);
for (int i = 1; i <= size; i++) {
Portfolio portfolio = new Portfolio(i);
portfolio.createTime = System.currentTimeMillis();
region1.put(KEY + i, portfolio);
}
LogWriterUtils.getLogWriter()
.info("### Number of Entries in Region :" + region1.keySet().size());
}
});
}
/* delete values */
public void deleteValues(VM vm, final String regionName, final int size) {
vm.invoke(new CacheSerializableRunnable("Create values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regionName);
for (int i = 1; i <= size; i++) {
region1.destroy(KEY + i);
}
LogWriterUtils.getLogWriter()
.info("### Number of Entries In Region after Delete :" + region1.keySet().size());
}
});
}
/**
* support for invalidating values.
*/
public void invalidateValues(VM vm, final String regionName, final int size) {
vm.invoke(new CacheSerializableRunnable("Create values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regionName);
for (int i = 1; i <= size; i++) {
region1.invalidate(KEY + i);
}
LogWriterUtils.getLogWriter()
.info("### Number of Entries In Region after Delete :" + region1.keySet().size());
}
});
}
public void createPool(VM vm, String poolName, String server, int port) {
createPool(vm, poolName, new String[] {server}, new int[] {port});
}
public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports) {
createPool(vm, poolName, servers, ports, null);
}
public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports,
final String redundancyLevel) {
vm.invoke(new CacheSerializableRunnable("createPool :" + poolName) {
@Override
public void run2() throws CacheException {
// Create Cache.
getCache();
IgnoredException.addIgnoredException("java.net.ConnectException");
PoolFactory cpf = PoolManager.createFactory();
cpf.setSubscriptionEnabled(true);
if (redundancyLevel != null) {
int redundancy = Integer.parseInt(redundancyLevel);
cpf.setSubscriptionRedundancy(redundancy);
}
for (int i = 0; i < servers.length; i++) {
LogWriterUtils.getLogWriter()
.info("### Adding to Pool. ### Server : " + servers[i] + " Port : " + ports[i]);
cpf.addServer(servers[i], ports[i]);
}
cpf.create(poolName);
}
});
}
/* Register CQs */
public void createCQ(VM vm, final String poolName, final String cqName, final String queryStr) {
vm.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName);
// Get CQ Service.
QueryService qService = null;
try {
qService = (PoolManager.find(poolName)).getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
// Create CQ Attributes.
CqAttributesFactory cqf = new CqAttributesFactory();
CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
((CqQueryTestListener) cqListeners[0]).cqName = cqName;
cqf.initCqListeners(cqListeners);
CqAttributes cqa = cqf.create();
// Create CQ.
try {
CqQuery cq1 = qService.newCq(cqName, queryStr, cqa);
assertTrue("newCq() state mismatch", cq1.getState().isStopped());
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("QueryService is :" + qService, ex);
Assert.fail("Failed to create CQ " + cqName + " . ", ex);
}
}
});
}
// REMOVE..........
public void createCQ(VM vm, final String cqName, final String queryStr) {
vm.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
@Override
public void run2() throws CacheException {
// pause(60 * 1000);
// getLogWriter().info("### DEBUG CREATE CQ START ####");
// pause(20 * 1000);
LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName);
// Get CQ Service.
QueryService qService = null;
try {
qService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
// Create CQ Attributes.
CqAttributesFactory cqf = new CqAttributesFactory();
CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
((CqQueryTestListener) cqListeners[0]).cqName = cqName;
cqf.initCqListeners(cqListeners);
CqAttributes cqa = cqf.create();
// Create CQ.
try {
CqQuery cq1 = qService.newCq(cqName, queryStr, cqa);
assertTrue("newCq() state mismatch", cq1.getState().isStopped());
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("QueryService is :" + qService, ex);
Assert.fail("Failed to create CQ " + cqName + " . ", ex);
}
}
});
}
/* Register CQs with no name, execute, and close */
public void createAndExecCQNoName(VM vm, final String poolName, final String queryStr) {
vm.invoke(new CacheSerializableRunnable("Create CQ with no name:") {
@Override
public void run2() throws CacheException {
// pause(60 * 1000);
LogWriterUtils.getLogWriter().info("### DEBUG CREATE CQ START ####");
// pause(20 * 1000);
LogWriterUtils.getLogWriter().info("### Create CQ with no name. ###");
// Get CQ Service.
QueryService qService = null;
CqQuery cq1 = null;
String cqName = null;
try {
qService = (PoolManager.find(poolName)).getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
SelectResults cqResults = null;
for (int i = 0; i < 20; ++i) {
// Create CQ Attributes.
CqAttributesFactory cqf = new CqAttributesFactory();
CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
cqf.initCqListeners(cqListeners);
CqAttributes cqa = cqf.create();
// Create CQ with no name and execute with initial results.
try {
cq1 = qService.newCq(queryStr, cqa);
((CqQueryTestListener) cqListeners[0]).cqName = cq1.getName();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("CQService is :" + qService);
Assert.fail("Failed to create CQ with no name" + " . ", ex);
}
if (cq1 == null) {
LogWriterUtils.getLogWriter().info("Failed to get CqQuery object for CQ with no name.");
} else {
cqName = cq1.getName();
LogWriterUtils.getLogWriter().info("Created CQ with no name, generated CQ name: "
+ cqName + " CQ state:" + cq1.getState());
assertTrue("Create CQ with no name illegal state", cq1.getState().isStopped());
}
if (i % 2 == 0) {
try {
cqResults = cq1.executeWithInitialResults();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("CqService is :" + qService);
Assert.fail("Failed to execute CQ with initial results, cq name: " + cqName + " . ",
ex);
}
LogWriterUtils.getLogWriter().info("initial result size = " + cqResults.size());
LogWriterUtils.getLogWriter()
.info("CQ state after execute with initial results = " + cq1.getState());
assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning());
} else {
try {
cq1.execute();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("CQService is :" + qService);
Assert.fail("Failed to execute CQ " + cqName + " . ", ex);
}
LogWriterUtils.getLogWriter().info("CQ state after execute = " + cq1.getState());
assertTrue("execute() state mismatch", cq1.getState().isRunning());
}
// Close the CQ
try {
cq1.close();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("CqService is :" + qService);
Assert.fail("Failed to close CQ " + cqName + " . ", ex);
}
assertTrue("closeCq() state mismatch", cq1.getState().isClosed());
}
}
});
}
public void executeCQ(VM vm, final String cqName, final boolean initialResults,
String expectedErr) {
executeCQ(vm, cqName, initialResults, noTest, null, expectedErr);
}
/**
* Execute/register CQ as running.
*
* @param initialResults true if initialResults are requested
* @param expectedResultsSize if >= 0, validate results against this size
* @param expectedErr if not null, an error we expect
*/
public void executeCQ(VM vm, final String cqName, final boolean initialResults,
final int expectedResultsSize, final String[] expectedKeys, final String expectedErr) {
vm.invoke(new CacheSerializableRunnable("Execute CQ :" + cqName) {
private void work() throws CacheException {
LogWriterUtils.getLogWriter().info("### DEBUG EXECUTE CQ START ####");
// Get CQ Service.
QueryService cqService = null;
CqQuery cq1 = null;
cqService = getCache().getQueryService();
// Get CqQuery object.
try {
cq1 = cqService.getCq(cqName);
if (cq1 == null) {
LogWriterUtils.getLogWriter()
.info("Failed to get CqQuery object for CQ name: " + cqName);
fail("Failed to get CQ " + cqName);
} else {
LogWriterUtils.getLogWriter().info("Obtained CQ, CQ name: " + cq1.getName());
assertTrue("newCq() state mismatch", cq1.getState().isStopped());
}
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("CqService is :" + cqService);
LogWriterUtils.getLogWriter().error(ex);
Assert.fail("Failed to execute CQ " + cqName, ex);
}
if (initialResults) {
SelectResults cqResults = null;
try {
cqResults = cq1.executeWithInitialResults();
} catch (Exception ex) {
fail("Failed to execute CQ " + cqName, ex);
}
LogWriterUtils.getLogWriter().info("initial result size = " + cqResults.size());
assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning());
if (expectedResultsSize >= 0) {
assertEquals(
"Unexpected results size for CQ: " + cqName + " CQ Query :" + cq1.getQueryString(),
expectedResultsSize, cqResults.size());
}
if (expectedKeys != null) {
HashSet resultKeys = new HashSet();
for (Object o : cqResults.asList()) {
Struct s = (Struct) o;
resultKeys.add(s.get("key"));
}
for (int i = 0; i < expectedKeys.length; i++) {
assertTrue(
"Expected key :" + expectedKeys[i] + " Not found in CqResults for CQ: " + cqName
+ " CQ Query :" + cq1.getQueryString() + " Keys in CqResults :" + resultKeys,
resultKeys.contains(expectedKeys[i]));
}
}
} else {
try {
cq1.execute();
} catch (Exception ex) {
if (expectedErr == null) {
LogWriterUtils.getLogWriter().info("CqService is :" + cqService, ex);
}
Assert.fail("Failed to execute CQ " + cqName, ex);
}
assertTrue("execute() state mismatch", cq1.getState().isRunning());
}
}
@Override
public void run2() throws CacheException {
if (expectedErr != null) {
getCache().getLogger()
.info("<ExpectedException action=add>" + expectedErr + "</ExpectedException>");
}
try {
work();
} finally {
if (expectedErr != null) {
getCache().getLogger()
.info("<ExpectedException action=remove>" + expectedErr + "</ExpectedException>");
}
}
}
});
}
/* Stop/pause CQ */
public void stopCQ(VM vm, final String cqName) throws Exception {
vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Stop CQ. ###" + cqName);
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
// Stop CQ.
CqQuery cq1 = null;
try {
cq1 = cqService.getCq(cqName);
cq1.stop();
} catch (Exception ex) {
Assert.fail("Failed to stop CQ " + cqName + " . ", ex);
}
assertTrue("Stop CQ state mismatch", cq1.getState().isStopped());
}
});
}
// Stop and execute CQ repeatedly
/* Stop/pause CQ */
private void stopExecCQ(VM vm, final String cqName, final int count) throws Exception {
vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
@Override
public void run2() throws CacheException {
CqQuery cq1 = null;
LogWriterUtils.getLogWriter().info("### Stop and Exec CQ. ###" + cqName);
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCqService.", cqe);
}
// Get CQ.
try {
cq1 = cqService.getCq(cqName);
} catch (Exception ex) {
Assert.fail("Failed to get CQ " + cqName + " . ", ex);
}
for (int i = 0; i < count; ++i) {
// Stop CQ.
try {
cq1.stop();
} catch (Exception ex) {
Assert.fail("Count = " + i + "Failed to stop CQ " + cqName + " . ", ex);
}
assertTrue("Stop CQ state mismatch, count = " + i, cq1.getState().isStopped());
LogWriterUtils.getLogWriter()
.info("After stop in Stop and Execute loop, ran successfully, loop count: " + i);
LogWriterUtils.getLogWriter().info("CQ state: " + cq1.getState());
// Re-execute CQ
try {
cq1.execute();
} catch (Exception ex) {
Assert.fail("Count = " + i + "Failed to execute CQ " + cqName + " . ", ex);
}
assertTrue("Execute CQ state mismatch, count = " + i, cq1.getState().isRunning());
LogWriterUtils.getLogWriter()
.info("After execute in Stop and Execute loop, ran successfully, loop count: " + i);
LogWriterUtils.getLogWriter().info("CQ state: " + cq1.getState());
}
}
});
}
/* UnRegister CQs */
public void closeCQ(VM vm, final String cqName) throws Exception {
vm.invoke(new CacheSerializableRunnable("Close CQ :" + cqName) {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Close CQ. ###" + cqName);
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCqService.", cqe);
}
// Close CQ.
CqQuery cq1 = null;
try {
cq1 = cqService.getCq(cqName);
cq1.close();
} catch (Exception ex) {
Assert.fail("Failed to close CQ " + cqName + " . ", ex);
}
assertTrue("Close CQ state mismatch", cq1.getState().isClosed());
}
});
}
/* Register CQs */
public void registerInterestListCQ(VM vm, final String regionName, final int keySize,
final boolean all) {
vm.invoke(new CacheSerializableRunnable("Register InterestList and CQ") {
@Override
public void run2() throws CacheException {
// Get CQ Service.
Region region = null;
try {
region = getRootRegion().getSubregion(regionName);
region.getAttributesMutator()
.addCacheListener(new CertifiableTestCacheListener(LogWriterUtils.getLogWriter()));
} catch (Exception cqe) {
fail("Failed to get Region.", cqe);
}
try {
if (all) {
region.registerInterest("ALL_KEYS");
} else {
List list = new ArrayList();
for (int i = 1; i <= keySize; i++) {
list.add(KEY + i);
}
region.registerInterest(list);
}
} catch (Exception ex) {
fail("Failed to Register InterestList", ex);
}
}
});
}
/* Validate CQ Count */
public void validateCQCount(VM vm, final int cqCnt) throws Exception {
vm.invoke(new CacheSerializableRunnable("validate cq count") {
@Override
public void run2() throws CacheException {
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
int numCqs = 0;
try {
numCqs = cqService.getCqs().length;
} catch (Exception ex) {
Assert.fail("Failed to get the CQ Count.", ex);
}
assertEquals("Number of cqs mismatch.", cqCnt, numCqs);
}
});
}
/**
* Throws AssertionError if the CQ can be found or if any other error occurs
*/
private void failIfCQExists(VM vm, final String cqName) {
vm.invoke(new CacheSerializableRunnable("Fail if CQ exists") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Fail if CQ Exists. ### " + cqName);
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery != null) {
fail("Unexpectedly found CqQuery for CQ : " + cqName);
}
}
});
}
private void validateCQError(VM vm, final String cqName, final int numError) {
vm.invoke(new CacheSerializableRunnable("Validate CQs") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName);
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery == null) {
fail("Failed to get CqQuery for CQ : " + cqName);
}
CqAttributes cqAttr = cQuery.getCqAttributes();
CqListener cqListener = cqAttr.getCqListener();
CqQueryTestListener listener = (CqQueryTestListener) cqListener;
listener.printInfo(false);
// Check for totalEvents count.
if (numError != noTest) {
// Result size validation.
listener.printInfo(true);
assertEquals("Total Event Count mismatch", numError, listener.getErrorEventCount());
}
}
});
}
public void validateCQ(VM vm, final String cqName, final int resultSize, final int creates,
final int updates, final int deletes) {
validateCQ(vm, cqName, resultSize, creates, updates, deletes, noTest, noTest, noTest, noTest);
}
public void validateCQ(VM vm, final String cqName, final int resultSize, final int creates,
final int updates, final int deletes, final int queryInserts, final int queryUpdates,
final int queryDeletes, final int totalEvents) {
vm.invoke(new CacheSerializableRunnable("Validate CQs") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName);
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery == null) {
fail("Failed to get CqQuery for CQ : " + cqName);
}
CqAttributes cqAttr = cQuery.getCqAttributes();
CqListener cqListeners[] = cqAttr.getCqListeners();
CqQueryTestListener listener = (CqQueryTestListener) cqListeners[0];
listener.printInfo(false);
// Wait for expected events to arrive at Listener.
listener.waitForEvents(creates, updates, deletes, queryInserts, queryUpdates, queryDeletes,
totalEvents);
// Check for totalEvents count.
if (totalEvents != noTest) {
// Result size validation.
// listener.printInfo(true);
assertEquals("Total Event Count mismatch", totalEvents, listener.getTotalEventCount());
}
if (resultSize != noTest) {
// SelectResults results = cQuery.getCqResults();
// getLogWriter().info("### CQ Result Size is :" + results.size());
// Result size validation.
// Since ResultSet is not maintained for this release.
// Instead of resultSize its been validated with total number of events.
fail("test for event counts instead of results size");
}
// Check for create count.
if (creates != noTest) {
// Result size validation.
// listener.printInfo(true);
assertEquals("Create Event mismatch", creates, listener.getCreateEventCount());
}
// Check for update count.
if (updates != noTest) {
// Result size validation.
// listener.printInfo(true);
assertEquals("Update Event mismatch", updates, listener.getUpdateEventCount());
}
// Check for delete count.
if (deletes != noTest) {
// Result size validation.
// listener.printInfo(true);
assertEquals("Delete Event mismatch", deletes, listener.getDeleteEventCount());
}
// Check for queryInsert count.
if (queryInserts != noTest) {
// Result size validation.
// listener.printInfo(true);
assertEquals("Query Insert Event mismatch", queryInserts,
listener.getQueryInsertEventCount());
}
// Check for queryUpdate count.
if (queryUpdates != noTest) {
// Result size validation.
// listener.printInfo(true);
assertEquals("Query Update Event mismatch", queryUpdates,
listener.getQueryUpdateEventCount());
}
// Check for queryDelete count.
if (queryDeletes != noTest) {
// Result size validation.
// listener.printInfo(true);
assertEquals("Query Delete Event mismatch", queryDeletes,
listener.getQueryDeleteEventCount());
}
}
});
}
public void waitForCreated(VM vm, final String cqName, final String key) {
waitForEvent(vm, 0, cqName, key);
}
public void waitForUpdated(VM vm, final String cqName, final String key) {
waitForEvent(vm, 1, cqName, key);
}
public void waitForDestroyed(VM vm, final String cqName, final String key) {
waitForEvent(vm, 2, cqName, key);
}
public void waitForInvalidated(VM vm, final String cqName, final String key) {
waitForEvent(vm, 3, cqName, key);
}
public void waitForClose(VM vm, final String cqName) {
waitForEvent(vm, 4, cqName, null);
}
public void waitForRegionClear(VM vm, final String cqName) {
waitForEvent(vm, 5, cqName, null);
}
public void waitForRegionInvalidate(VM vm, final String cqName) {
waitForEvent(vm, 6, cqName, null);
}
private void waitForEvent(VM vm, final int event, final String cqName, final String key) {
vm.invoke(new CacheSerializableRunnable(
"wait for event (" + event + ") in cq " + cqName + "; key=" + key) {
@Override
public void run2() throws CacheException {
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery == null) {
fail("Failed to get CqQuery for CQ : " + cqName);
}
CqAttributes cqAttr = cQuery.getCqAttributes();
CqListener[] cqListener = cqAttr.getCqListeners();
CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
switch (event) {
case CREATE:
listener.waitForCreated(key);
break;
case UPDATE:
listener.waitForUpdated(key);
break;
case DESTROY:
listener.waitForDestroyed(key);
break;
case INVALIDATE:
listener.waitForInvalidated(key);
break;
case CLOSE:
listener.waitForClose();
break;
case REGION_CLEAR:
listener.waitForRegionClear();
break;
case REGION_INVALIDATE:
listener.waitForRegionInvalidate();
break;
}
}
});
}
/**
* Waits till the CQ state is same as the expected. Waits for max time, if the CQ state is not
* same as expected throws exception.
*/
public void waitForCqState(VM vm, final String cqName, final int state) {
vm.invoke(new CacheSerializableRunnable("Wait For cq State") {
@Override
public void run2() throws CacheException {
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery == null) {
fail("Failed to get CqQuery for CQ : " + cqName);
}
// Get CQ State.
CqStateImpl cqState = (CqStateImpl) cQuery.getState();
// Wait max time, till the CQ state is as expected.
final long start = System.currentTimeMillis();
while (cqState.getState() != state) {
assertTrue(
"Waited over " + MAX_TIME + "ms for Cq State to be changed to " + state
+ "; consider raising " + WAIT_PROPERTY,
(System.currentTimeMillis() - start) < MAX_TIME);
Wait.pause(100);
}
}
});
}
public void clearCQListenerEvents(VM vm, final String cqName) {
vm.invoke(new CacheSerializableRunnable("validate cq count") {
@Override
public void run2() throws CacheException {
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery == null) {
fail("Failed to get CqQuery for CQ : " + cqName);
}
CqAttributes cqAttr = cQuery.getCqAttributes();
CqListener cqListener = cqAttr.getCqListener();
CqQueryTestListener listener = (CqQueryTestListener) cqListener;
listener.getEventHistory();
}
});
}
public void validateQuery(VM vm, final String query, final int resultSize) {
vm.invoke(new CacheSerializableRunnable("Validate Query") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Validating Query. ###");
QueryService qs = getCache().getQueryService();
Query q = qs.newQuery(query);
try {
Object r = q.execute();
if (r instanceof Collection) {
int rSize = ((Collection) r).size();
LogWriterUtils.getLogWriter().info("### Result Size is :" + rSize);
assertEquals(rSize, resultSize);
}
} catch (Exception e) {
Assert.fail("Failed to execute the query.", e);
}
}
});
}
private Properties getConnectionProps(String[] hosts, int[] ports, Properties newProps) {
Properties props = new Properties();
String endPoints = "";
String host = hosts[0];
for (int i = 0; i < ports.length; i++) {
if (hosts.length > 1) {
host = hosts[i];
}
endPoints = endPoints + "server" + i + "=" + host + ":" + ports[i];
if (ports.length > (i + 1)) {
endPoints = endPoints + ",";
}
}
props.setProperty("endpoints", endPoints);
props.setProperty("retryAttempts", "1");
// Add other property elements.
if (newProps != null) {
Enumeration e = newProps.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
props.setProperty(key, newProps.getProperty(key));
}
}
return props;
}
// Exercise CQ attributes mutator functions
private void mutateCQAttributes(VM vm, final String cqName, final int mutator_function)
throws Exception {
vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
@Override
public void run2() throws CacheException {
CqQuery cq1 = null;
LogWriterUtils.getLogWriter().info("### CQ attributes mutator for ###" + cqName);
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
// Get CQ.
try {
cq1 = cqService.getCq(cqName);
} catch (Exception ex) {
Assert.fail("Failed to get CQ " + cqName + " . ", ex);
}
CqAttributesMutator cqAttrMutator = cq1.getCqAttributesMutator();
CqAttributes cqAttr = cq1.getCqAttributes();
CqListener cqListeners[];
switch (mutator_function) {
case CREATE:
// Reinitialize with 2 CQ Listeners
CqListener cqListenersArray[] = {new CqQueryTestListener(getCache().getLogger()),
new CqQueryTestListener(getCache().getLogger())};
cqAttrMutator.initCqListeners(cqListenersArray);
cqListeners = cqAttr.getCqListeners();
assertEquals("CqListener count mismatch", cqListeners.length, 2);
break;
case UPDATE:
// Add 2 new CQ Listeners
CqListener newListener1 = new CqQueryTestListener(getCache().getLogger());
CqListener newListener2 = new CqQueryTestListener(getCache().getLogger());
cqAttrMutator.addCqListener(newListener1);
cqAttrMutator.addCqListener(newListener2);
cqListeners = cqAttr.getCqListeners();
assertEquals("CqListener count mismatch", cqListeners.length, 3);
break;
case DESTROY:
cqListeners = cqAttr.getCqListeners();
cqAttrMutator.removeCqListener(cqListeners[0]);
cqListeners = cqAttr.getCqListeners();
assertEquals("CqListener count mismatch", cqListeners.length, 2);
// Remove a listener and validate
cqAttrMutator.removeCqListener(cqListeners[0]);
cqListeners = cqAttr.getCqListeners();
assertEquals("CqListener count mismatch", cqListeners.length, 1);
break;
}
}
});
}
/**
* Test for InterestList and CQ registered from same clients.
*/
@Test
public void testInterestListAndCQs() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
/* Init Server and Client */
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testInterestListAndCQs";
createPool(client, poolName, host0, thePort);
createClient(client, thePort, host0);
/* Create CQs. */
createCQ(client, poolName, "testInterestListAndCQs_0", cqs[0]);
validateCQCount(client, 1);
/* Init values at server. */
final int size = 10;
executeCQ(client, "testInterestListAndCQs_0", false, null);
registerInterestListCQ(client, regions[0], size, false);
createValues(server, regions[0], size);
// Wait for client to Synch.
for (int i = 1; i <= 10; i++) {
waitForCreated(client, "testInterestListAndCQs_0", KEY + i);
}
Wait.pause(5 * 1000);
// validate CQs.
validateCQ(client, "testInterestListAndCQs_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ noTest, /* deletes; */ noTest, /* queryInserts: */ size,
/* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);
// Validate InterestList.
// CREATE
client.invoke(new CacheSerializableRunnable("validate updates") {
@Override
public void run2() throws CacheException {
final Region region = getRootRegion().getSubregion(regions[0]);
assertNotNull(region);
// TODO does this WaitCriterion actually help?
WaitCriterion wc = new WaitCriterion() {
String excuse;
@Override
public boolean done() {
int sz = region.entrySet().size();
if (sz == size) {
return true;
}
excuse = "Mismatch, number of keys (" + sz
+ ") in local region is not equal to the interest list size (" + size + ")";
return false;
}
@Override
public String description() {
return excuse;
}
};
GeodeAwaitility.await().untilAsserted(wc);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 1; i <= 10; i++) {
ctl.waitForCreated(KEY + i);
assertNotNull(region.getEntry(KEY + i));
}
}
});
// UPDATE
createValues(server, regions[0], size);
// Wait for client to Synch.
for (int i = 1; i <= 10; i++) {
waitForUpdated(client, "testInterestListAndCQs_0", KEY + i);
}
client.invoke(new CacheSerializableRunnable("validate updates") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regions[0]);
assertNotNull(region);
Set keys = region.entrySet();
assertEquals(
"Mismatch, number of keys in local region is not equal to the interest list size", size,
keys.size());
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 1; i <= 10; i++) {
ctl.waitForUpdated(KEY + i);
assertNotNull(region.getEntry(KEY + i));
}
}
});
// INVALIDATE
server.invoke(new CacheSerializableRunnable("Invalidate values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
for (int i = 1; i <= size; i++) {
region1.invalidate(KEY + i);
}
}
});
waitForInvalidated(client, "testInterestListAndCQs_0", KEY + 10);
client.invoke(new CacheSerializableRunnable("validate invalidates") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regions[0]);
assertNotNull(region);
Set keys = region.entrySet();
assertEquals(
"Mismatch, number of keys in local region is not equal to the interest list size", size,
keys.size());
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 1; i <= 10; i++) {
ctl.waitForInvalidated(KEY + i);
assertNotNull(region.getEntry(KEY + i));
}
}
});
validateCQ(client, "testInterestListAndCQs_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ size, /* deletes; */ noTest, /* queryInserts: */ size,
/* queryUpdates: */ size, /* queryDeletes: */ size, /* totalEvents: */ size * 3);
// DESTROY - this should not have any effect on CQ, as the events are
// already destroyed from invalidate events.
server.invoke(new CacheSerializableRunnable("Invalidate values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
for (int i = 1; i <= size; i++) {
region1.destroy(KEY + i);
}
}
});
// Wait for destroyed.
client.invoke(new CacheSerializableRunnable("validate destroys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regions[0]);
assertNotNull(region);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 1; i <= 10; i++) {
ctl.waitForDestroyed(KEY + i);
}
}
});
validateCQ(client, "testInterestListAndCQs_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ size, /* deletes; */ noTest, /* queryInserts: */ size,
/* queryUpdates: */ size, /* queryDeletes: */ size, /* totalEvents: */ size * 3);
closeClient(client);
closeServer(server);
}
/**
* Test for CQ register and UnRegister.
*/
@Test
public void testCQStopExecute() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
/* Init Server and Client */
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCQStopExecute";
createPool(client, poolName, host0, thePort);
/* Create CQs. */
createCQ(client, poolName, "testCQStopExecute_0", cqs[0]);
validateCQCount(client, 1);
executeCQ(client, "testCQStopExecute_0", false, null);
/* Init values at server. */
int size = 10;
createValues(server, regions[0], size);
// Wait for client to Synch.
waitForCreated(client, "testCQStopExecute_0", KEY + size);
// Check if Client and Server in sync.
validateQuery(server, cqs[0], 10);
// validate CQs.
validateCQ(client, "testCQStopExecute_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0,
/* queryDeletes: */ 0, /* totalEvents: */ size);
// Test CQ stop
stopCQ(client, "testCQStopExecute_0");
// Test CQ re-enable
executeCQ(client, "testCQStopExecute_0", false, null);
/* Init values at server. */
createValues(server, regions[0], 20);
// Wait for client to Synch.
waitForCreated(client, "testCQStopExecute_0", KEY + 20);
size = 30;
// Check if Client and Server in sync.
validateQuery(server, cqs[0], 20);
// validate CQs.
validateCQ(client, "testCQStopExecute_0", /* resultSize: */ noTest, /* creates: */ 20,
/* updates: */ 10, /* deletes; */ 0, /* queryInserts: */ 20, /* queryUpdates: */ 10,
/* queryDeletes: */ 0, /* totalEvents: */ size);
// Stop and execute CQ 20 times
stopExecCQ(client, "testCQStopExecute_0", 20);
// Test CQ Close
closeCQ(client, "testCQStopExecute_0");
// Close.
closeClient(client);
closeServer(server);
}
/**
* Test for CQ Attributes Mutator functions
*/
@Test
public void testCQAttributesMutator() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
/* Init Server and Client */
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCQAttributesMutator";
createPool(client, poolName, host0, thePort);
/* Create CQs. */
String cqName = new String("testCQAttributesMutator_0");
createCQ(client, poolName, cqName, cqs[0]);
validateCQCount(client, 1);
executeCQ(client, cqName, false, null);
/* Init values at server. */
int size = 10;
createValues(server, regions[0], size);
// Wait for client to Synch.
waitForCreated(client, cqName, KEY + size);
// validate CQs.
validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ size, /* updates: */ 0,
/* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0, /* queryDeletes: */ 0,
/* totalEvents: */ size);
// Add 2 new CQ Listeners
mutateCQAttributes(client, cqName, UPDATE);
/* Init values at server. */
createValues(server, regions[0], size * 2);
waitForCreated(client, cqName, KEY + (size * 2));
validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ 20, /* updates: */ 10,
/* deletes; */ 0, /* queryInserts: */ 20, /* queryUpdates: */ 10, /* queryDeletes: */ 0,
/* totalEvents: */ 30);
// Remove 2 listeners and validate
mutateCQAttributes(client, cqName, DESTROY);
validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ 10, /* updates: */ 10,
/* deletes; */ 0, /* queryInserts: */ 10, /* queryUpdates: */ 10, /* queryDeletes: */ 0,
/* totalEvents: */ 20);
// Reinitialize with 2 CQ Listeners
mutateCQAttributes(client, cqName, CREATE);
/* Delete values at server. */
deleteValues(server, regions[0], 20);
// Wait for client to Synch.
waitForDestroyed(client, cqName, KEY + (size * 2));
validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ 0, /* updates: */ 0,
/* deletes; */ 20, /* queryInserts: */ 0, /* queryUpdates: */ 0, /* queryDeletes: */ 20,
/* totalEvents: */ 20);
// Close CQ
closeCQ(client, cqName);
// Close.
closeClient(client);
closeServer(server);
}
/**
* Test for CQ register and UnRegister.
*/
@Test
public void testCQCreateClose() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
/* Init Server and Client */
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCQCreateClose";
System.out.println("##### Pool Name :" + poolName + " host :" + host0 + " port :" + thePort);
createPool(client, poolName, host0, thePort);
/* Create CQs. */
createCQ(client, poolName, "testCQCreateClose_0", cqs[0]);
validateCQCount(client, 1);
executeCQ(client, "testCQCreateClose_0", false, null);
/* Init values at server. */
int size = 10;
createValues(server, regions[0], size);
// Wait for client to Synch.
waitForCreated(client, "testCQCreateClose_0", KEY + size);
// Check if Client and Server in sync.
// validateServerClientRegionEntries(server, client, regions[0]);
validateQuery(server, cqs[0], 10);
// validate CQs.
validateCQ(client, "testCQCreateClose_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0,
/* queryDeletes: */ 0, /* totalEvents: */ size);
// Test CQ stop
stopCQ(client, "testCQCreateClose_0");
// Test CQ re-enable
executeCQ(client, "testCQCreateClose_0", false, null);
// Test CQ Close
closeCQ(client, "testCQCreateClose_0");
// Create CQs with no name, execute, and close.
// UNCOMMENT....
createAndExecCQNoName(client, poolName, cqs[0]);
// Accessing the closed CQ.
failIfCQExists(client, "testCQCreateClose_0");
// re-Create the cq which is closed.
createCQ(client, poolName, "testCQCreateClose_0", cqs[0]);
/* Test CQ Count */
validateCQCount(client, 1);
// Registering CQ with same name from same client.
try {
createCQ(client, poolName, "testCQCreateClose_0", cqs[0]);
fail("Trying to create CQ with same name. Should have thrown CQExistsException");
} catch (RMIException rmiExc) {
Throwable cause = rmiExc.getCause();
assertTrue("unexpected cause: " + cause.getClass().getName(),
cause instanceof AssertionError);
Throwable causeCause = cause.getCause(); // should be a CQExistsException
assertTrue("Got wrong exception: " + causeCause.getClass().getName(),
causeCause instanceof CqExistsException);
}
// Getting values from non-existent CQ.
failIfCQExists(client, "testCQCreateClose_NO");
// Server Registering CQ.
try {
createCQ(server, "testCQCreateClose_1", cqs[0]);
fail("Trying to create CQ on Cache Server. Should have thrown Exception.");
} catch (RMIException rmiExc) {
Throwable cause = rmiExc.getCause();
assertTrue("unexpected cause: " + cause.getClass().getName(),
cause instanceof AssertionError);
Throwable causeCause = cause.getCause(); // should be a IllegalStateException
assertTrue("Got wrong exception: " + causeCause.getClass().getName(),
causeCause instanceof IllegalStateException);
}
validateCQCount(client, 1);
createCQ(client, poolName, "testCQCreateClose_3", cqs[2]);
validateCQCount(client, 2);
/* Test for closeAllCQs() */
client.invoke(new CacheSerializableRunnable("CloseAll CQ :") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Close All CQ. ###");
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
LogWriterUtils.getLogWriter().info("Failed to getCQService.", cqe);
Assert.fail("Failed to getCQService.", cqe);
}
// Close CQ.
try {
cqService.closeCqs();
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("Failed to close All CQ.", ex);
Assert.fail("Failed to close All CQ. ", ex);
}
}
});
validateCQCount(client, 0);
// Initialize.
createCQ(client, poolName, "testCQCreateClose_2", cqs[1]);
createCQ(client, poolName, "testCQCreateClose_4", cqs[1]);
createCQ(client, poolName, "testCQCreateClose_5", cqs[1]);
// Execute few of the initialized cqs
executeCQ(client, "testCQCreateClose_4", false, null);
executeCQ(client, "testCQCreateClose_5", false, null);
// Call close all CQ.
client.invoke(new CacheSerializableRunnable("CloseAll CQ 2 :") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Close All CQ 2. ###");
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
// Close CQ.
try {
cqService.closeCqs();
} catch (Exception ex) {
Assert.fail("Failed to close All CQ . ", ex);
}
}
});
// Close.
closeClient(client);
closeServer(server);
}
/**
* This will test the events after region destory. The CQs on the destroy region needs to be
* closed.
*/
@Test
public void testRegionDestroy() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
/* Init Server and Client */
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testRegionDestroy";
createPool(client, poolName, host0, thePort);
createClient(client, thePort, host0);
/* Create CQs. */
createCQ(client, poolName, "testRegionDestroy_0", cqs[0]);
createCQ(client, poolName, "testRegionDestroy_1", cqs[0]);
createCQ(client, poolName, "testRegionDestroy_2", cqs[0]);
executeCQ(client, "testRegionDestroy_0", false, null);
executeCQ(client, "testRegionDestroy_1", false, null);
executeCQ(client, "testRegionDestroy_2", false, null);
/* Init values at server. */
final int size = 10;
registerInterestListCQ(client, regions[0], size, false);
createValues(server, regions[0], size);
// Wait for client to Synch.
waitForCreated(client, "testRegionDestroy_0", KEY + 10);
// validate CQs.
validateCQ(client, "testRegionDestroy_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ noTest, /* deletes; */ noTest, /* queryInserts: */ size,
/* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);
// Validate InterestList.
// CREATE
client.invoke(new CacheSerializableRunnable("validate updates") {
@Override
public void run2() throws CacheException {
// Wait for the region to become the correct size
WaitCriterion wc = new WaitCriterion() {
String excuse;
@Override
public boolean done() {
Region region = getRootRegion().getSubregion(regions[0]);
if (region == null) {
excuse = "Can't find region";
return false;
}
int sz = region.entrySet().size();
if (sz != size) {
excuse = "Region is of size " + sz + ", expected " + size;
return false;
}
return true;
}
@Override
public String description() {
return excuse;
}
};
GeodeAwaitility.await().untilAsserted(wc);
Region region = getRootRegion().getSubregion(regions[0]);
assertNotNull(region);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 1; i <= 10; i++) {
ctl.waitForCreated(KEY + i);
assertNotNull(region.getEntry(KEY + i));
}
}
});
// Destroy Region.
server.invoke(new CacheSerializableRunnable("Destroy Region") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
region1.destroyRegion();
}
});
Wait.pause(2 * 1000);
validateCQCount(client, 0);
closeClient(client);
closeServer(server);
}
/**
* Test for CQ with multiple clients.
*/
@Test
public void testCQWithMultipleClients() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client1 = host.getVM(1);
VM client2 = host.getVM(2);
VM client3 = host.getVM(3);
/* Create Server and Client */
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName1 = "testCQWithMultipleClients1";
String poolName2 = "testCQWithMultipleClients2";
String poolName3 = "testCQWithMultipleClients3";
createPool(client1, poolName1, host0, thePort);
createPool(client2, poolName2, host0, thePort);
/* Create CQs. and initialize the region */
createCQ(client1, poolName1, "testCQWithMultipleClients_0", cqs[0]);
executeCQ(client1, "testCQWithMultipleClients_0", false, null);
createCQ(client2, poolName2, "testCQWithMultipleClients_0", cqs[0]);
executeCQ(client2, "testCQWithMultipleClients_0", false, null);
int size = 10;
// Create Values on Server.
createValues(server, regions[0], size);
waitForCreated(client1, "testCQWithMultipleClients_0", KEY + 10);
/* Validate the CQs */
validateCQ(client1, "testCQWithMultipleClients_0", /* resultSize: */ noTest,
/* creates: */ size, /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size,
/* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);
waitForCreated(client2, "testCQWithMultipleClients_0", KEY + 10);
validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest,
/* creates: */ size, /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size,
/* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);
/* Close test */
closeCQ(client1, "testCQWithMultipleClients_0");
validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest,
/* creates: */ size, /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size,
/* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);
/* Init new client and create cq */
createPool(client3, poolName3, host0, thePort);
createCQ(client3, poolName3, "testCQWithMultipleClients_0", cqs[0]);
createCQ(client3, poolName3, "testCQWithMultipleClients_1", cqs[1]);
executeCQ(client3, "testCQWithMultipleClients_0", false, null);
executeCQ(client3, "testCQWithMultipleClients_1", false, null);
// Update values on Server. This will be updated on new Client CQs.
createValues(server, regions[0], size);
waitForUpdated(client3, "testCQWithMultipleClients_0", KEY + 10);
validateCQ(client3, "testCQWithMultipleClients_0", /* resultSize: */ noTest, /* creates: */ 0,
/* updates: */ size, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ size,
/* queryDeletes: */ 0, /* totalEvents: */ size);
validateCQ(client3, "testCQWithMultipleClients_1", /* resultSize: */ noTest, /* creates: */ 0,
/* updates: */ 1, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ 1,
/* queryDeletes: */ 0, /* totalEvents: */ 1);
/* Validate the CQ count */
validateCQCount(client1, 0);
validateCQCount(client2, 1);
validateCQCount(client3, 2);
/* Close Client Test */
closeClient(client1);
clearCQListenerEvents(client2, "testCQWithMultipleClients_0");
clearCQListenerEvents(client3, "testCQWithMultipleClients_1");
// Update values on server, update again.
createValues(server, regions[0], size);
waitForUpdated(client2, "testCQWithMultipleClients_0", KEY + 10);
validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest,
/* creates: */ size, /* updates: */ size * 2, /* deletes; */ 0, /* queryInserts: */ size,
/* queryUpdates: */ size * 2, /* queryDeletes: */ 0, /* totalEvents: */ size * 3);
waitForUpdated(client3, "testCQWithMultipleClients_1", KEY + 2);
validateCQ(client3, "testCQWithMultipleClients_1", /* resultSize: */ noTest, /* creates: */ 0,
/* updates: */ 2, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ 2,
/* queryDeletes: */ 0, /* totalEvents: */ 2);
/* Close Server and Client */
closeClient(client2);
closeClient(client3);
closeServer(server);
}
/**
* Test for CQ ResultSet.
*/
@Test
public void testCQResultSet() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCQResultSet";
createPool(client, poolName, host0, thePort);
/* CQ Test with initial Values. */
int size = 10;
createValues(server, regions[0], size);
Wait.pause(1 * 500);
// Create CQs.
createCQ(client, poolName, "testCQResultSet_0", cqs[0]);
// Check resultSet Size.
executeCQ(client, "testCQResultSet_0", true, 10, null, null);
/* CQ Test with no Values on Region */
createCQ(client, poolName, "testCQResultSet_1", cqs[2]);
// Check resultSet Size.
executeCQ(client, "testCQResultSet_1", true, 0, null, null);
stopCQ(client, "testCQResultSet_1");
// Init values.
createValues(server, regions[1], 5);
validateQuery(server, cqs[2], 2);
executeCQ(client, "testCQResultSet_1", true, 2, null, null);
// Close.
closeClient(client);
closeServer(server);
}
/**
* Test for CQ Listener events.
*/
@Test
public void testCQEvents() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCQEvents";
createPool(client, poolName, host0, thePort);
// Create client.
// createClient(client, thePort, host0);
// Create CQs.
createCQ(client, poolName, "testCQEvents_0", cqs[0]);
executeCQ(client, "testCQEvents_0", false, null);
// Init values at server.
int size = 10;
createValues(server, regions[0], size);
waitForCreated(client, "testCQEvents_0", KEY + size);
// validate Create events.
validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0,
/* queryDeletes: */ 0, /* totalEvents: */ size);
// Update values.
createValues(server, regions[0], 5);
createValues(server, regions[0], 10);
waitForUpdated(client, "testCQEvents_0", KEY + size);
// validate Update events.
validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ 15, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 15,
/* queryDeletes: */ 0, /* totalEvents: */ size + 15);
// Validate delete events.
deleteValues(server, regions[0], 5);
waitForDestroyed(client, "testCQEvents_0", KEY + 5);
validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ 15, /* deletes; */5, /* queryInserts: */ size, /* queryUpdates: */ 15,
/* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5);
// Insert invalid Events.
server.invoke(new CacheSerializableRunnable("Create values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
for (int i = -1; i >= -5; i--) {
// change : new Portfolio(i) rdubey ( for Suspect strings problem).
// region1.put(KEY+i, new Portfolio(i) );
region1.put(KEY + i, KEY + i);
}
}
});
Wait.pause(1 * 1000);
// cqs should not get any creates, deletes or updates. rdubey.
validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ 15, /* deletes; */5, /* queryInserts: */ size, /* queryUpdates: */ 15,
/* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5);
// Close.
closeClient(client);
closeServer(server);
}
/**
* Test query execution multiple times on server without ALIAS.
*/
@Test
public void testCqEventsWithoutAlias() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testCQEvents";
createPool(client, poolName, host0, thePort);
// Create CQs.
createCQ(client, poolName, "testCQEvents_0", cqs[11]);
executeCQ(client, "testCQEvents_0", false, null);
// Init values at server.
int size = 10;
createValues(server, regions[0], size);
waitForCreated(client, "testCQEvents_0", KEY + size);
// validate Create events.
validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0,
/* queryDeletes: */ 0, /* totalEvents: */ size);
// Update values.
createValues(server, regions[0], 5);
createValues(server, regions[0], 10);
waitForUpdated(client, "testCQEvents_0", KEY + size);
// validate Update events.
validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ 15, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 15,
/* queryDeletes: */ 0, /* totalEvents: */ size + 15);
// Validate delete events.
deleteValues(server, regions[0], 5);
waitForDestroyed(client, "testCQEvents_0", KEY + 5);
validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ 15, /* deletes; */5, /* queryInserts: */ size, /* queryUpdates: */ 15,
/* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5);
// Insert invalid Events.
server.invoke(new CacheSerializableRunnable("Create values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
for (int i = -1; i >= -5; i--) {
// change : new Portfolio(i) rdubey ( for Suspect strings problem).
// region1.put(KEY+i, new Portfolio(i) );
region1.put(KEY + i, KEY + i);
}
}
});
Wait.pause(1 * 1000);
// cqs should not get any creates, deletes or updates. rdubey.
validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
/* updates: */ 15, /* deletes; */5, /* queryInserts: */ size, /* queryUpdates: */ 15,
/* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5);
// Close.
closeClient(client);
closeServer(server);
}
/**
* Test for stopping and restarting CQs.
*/
@Test
public void testEnableDisableCQ() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testEnableDisableCQ";
createPool(client, poolName, host0, thePort);
// Create client.
// createClient(client, thePort, host0);
// Create CQs.
createCQ(client, poolName, "testEnableDisable_0", cqs[0]);
executeCQ(client, "testEnableDisable_0", false, null);
/* Test for disableCQ */
client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
@Override
public void run2() throws CacheException {
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
cqService.stopCqs();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
}
});
Wait.pause(1 * 1000);
// Init values at server.
int size = 10;
createValues(server, regions[0], size);
Wait.pause(1 * 500);
// There should not be any creates.
validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, /* creates: */ 0,
/* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ 0,
/* queryDeletes: */ 0, /* totalEvents: */ 0);
/* Test for enable CQ */
client.invoke(new CacheSerializableRunnable("Client enableCQs()") {
@Override
public void run2() throws CacheException {
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
cqService.executeCqs();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
}
});
Wait.pause(1 * 1000);
createValues(server, regions[0], size);
waitForUpdated(client, "testEnableDisable_0", KEY + size);
// It gets created on the CQs
validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, /* creates: */ 0,
/* updates: */ size, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ size,
/* queryDeletes: */ 0, /* totalEvents: */ size);
/* Test for disableCQ on Region */
client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
@Override
public void run2() throws CacheException {
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
cqService.stopCqs("/root/" + regions[0]);
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
}
});
Wait.pause(2 * 1000);
deleteValues(server, regions[0], size / 2);
Wait.pause(1 * 500);
// There should not be any deletes.
validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, /* creates: */ 0,
/* updates: */ size, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ size,
/* queryDeletes: */ 0, /* totalEvents: */ size);
/* Test for enable CQ on region */
client.invoke(new CacheSerializableRunnable("Client enableCQs()") {
@Override
public void run2() throws CacheException {
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
cqService.executeCqs("/root/" + regions[0]);
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
}
});
Wait.pause(1 * 1000);
createValues(server, regions[0], size / 2);
waitForCreated(client, "testEnableDisable_0", KEY + (size / 2));
// Gets updated on the CQ.
validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, /* creates: */ size / 2,
/* updates: */ size, /* deletes; */ 0, /* queryInserts: */ size / 2,
/* queryUpdates: */ size, /* queryDeletes: */ 0, /* totalEvents: */ size * 3 / 2);
// Close.
closeClient(client);
closeServer(server);
}
/**
* Test for Complex queries.
*/
@Test
public void testQuery() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testQuery";
createPool(client, poolName, host0, thePort);
// Create client.
createClient(client, thePort, host0);
// Create CQs.
createCQ(client, poolName, "testQuery_3", cqs[3]);
executeCQ(client, "testQuery_3", true, null);
createCQ(client, poolName, "testQuery_4", cqs[4]);
executeCQ(client, "testQuery_4", true, null);
createCQ(client, poolName, "testQuery_5", cqs[5]);
executeCQ(client, "testQuery_5", true, null);
createCQ(client, poolName, "testQuery_6", cqs[6]);
executeCQ(client, "testQuery_6", true, null);
createCQ(client, poolName, "testQuery_7", cqs[7]);
executeCQ(client, "testQuery_7", true, null);
createCQ(client, poolName, "testQuery_8", cqs[8]);
executeCQ(client, "testQuery_8", true, null);
// Close.
closeClient(client);
closeServer(server);
}
/**
* Test for CQ Fail over.
*/
@Test
public void testCQFailOver() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
createServer(server1);
final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
// createClient(client, new int[] {port1, ports[0]}, host0, "-1");
String poolName = "testCQFailOver";
createPool(client, poolName, new String[] {host0, host0}, new int[] {port1, ports[0]});
int numCQs = 1;
for (int i = 0; i < numCQs; i++) {
// Create CQs.
createCQ(client, poolName, "testCQFailOver_" + i, cqs[i]);
executeCQ(client, "testCQFailOver_" + i, false, null);
}
Wait.pause(1 * 1000);
// CREATE.
createValues(server1, regions[0], 10);
createValues(server1, regions[1], 10);
waitForCreated(client, "testCQFailOver_0", KEY + 10);
Wait.pause(1 * 1000);
createServer(server2, ports[0]);
final int thePort2 = server2.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
System.out
.println("### Port on which server1 running : " + port1 + " Server2 running : " + thePort2);
Wait.pause(3 * 1000);
// Extra pause - added after downmerging trunk r17050
Wait.pause(5 * 1000);
// UPDATE - 1.
createValues(server1, regions[0], 10);
createValues(server1, regions[1], 10);
waitForUpdated(client, "testCQFailOver_0", KEY + 10);
int[] resultsCnt = new int[] {10, 1, 2};
for (int i = 0; i < numCQs; i++) {
validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
}
// Close server1.
closeServer(server1);
// Fail over should happen.
Wait.pause(3 * 1000);
for (int i = 0; i < numCQs; i++) {
validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
}
// UPDATE - 2
this.clearCQListenerEvents(client, "testCQFailOver_0");
createValues(server2, regions[0], 10);
createValues(server2, regions[1], 10);
for (int i = 1; i <= 10; i++) {
waitForUpdated(client, "testCQFailOver_0", KEY + i);
}
for (int i = 0; i < numCQs; i++) {
validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i] * 2, noTest);
}
// Close.
closeClient(client);
closeServer(server2);
}
/**
* Test for CQ Fail over/HA with redundancy level set.
*/
@Test
public void testCQHA() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM server3 = host.getVM(2);
VM client = host.getVM(3);
// Killing servers can cause this message on the client side.
IgnoredException.addIgnoredException("Could not find any server");
createServer(server1);
final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
createServer(server2, ports[0]);
final int thePort2 = server2.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
createServer(server3, ports[1]);
final int port3 = server3.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
System.out.println("### Port on which server1 running : " + port1 + " server2 running : "
+ thePort2 + " Server3 running : " + port3);
// Create client - With 3 server endpoints and redundancy level set to 2.
String poolName = "testCQStopExecute";
createPool(client, poolName, new String[] {host0, host0, host0},
new int[] {port1, thePort2, port3});
// Create client with redundancyLevel 1
// createClient(client, new int[] {port1, thePort2, port3}, host0, "1");
// Create CQs.
int numCQs = 1;
for (int i = 0; i < numCQs; i++) {
// Create CQs.
createCQ(client, poolName, "testCQHA_" + i, cqs[i]);
executeCQ(client, "testCQHA_" + i, false, null);
}
Wait.pause(1 * 1000);
// CREATE.
createValues(server1, regions[0], 10);
createValues(server1, regions[1], 10);
waitForCreated(client, "testCQHA_0", KEY + 10);
// Clients expected initial result.
int[] resultsCnt = new int[] {10, 1, 2};
// Close server1.
// To maintain the redundancy; it will make connection to endpoint-3.
closeServer(server1);
Wait.pause(3 * 1000);
// UPDATE-1.
createValues(server2, regions[0], 10);
createValues(server2, regions[1], 10);
waitForUpdated(client, "testCQHA_0", KEY + 10);
// Validate CQ.
for (int i = 0; i < numCQs; i++) {
validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
}
// Close server-2
closeServer(server2);
Wait.pause(2 * 1000);
// UPDATE - 2.
clearCQListenerEvents(client, "testCQHA_0");
createValues(server3, regions[0], 10);
createValues(server3, regions[1], 10);
// Wait for events at client.
waitForUpdated(client, "testCQHA_0", KEY + 10);
for (int i = 0; i < numCQs; i++) {
validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i] * 2, noTest);
}
// Close.
closeClient(client);
closeServer(server3);
}
/**
* Test Filter registration during GII. Bug fix 39014
*/
@Test
public void testFilterRegistrationDuringGII() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
Wait.pause(3 * 1000);
createServer(server1);
final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
String poolName = "testFilterRegistrationDuringGII";
createPool(client1, poolName, new String[] {host0, host0}, new int[] {port1, ports[0]}, "-1");
createPool(client2, poolName, new String[] {host0, host0}, new int[] {port1, ports[0]}, "-1");
createClient(client1, new int[] {port1, ports[0]}, host0, "-1", poolName);
createClient(client2, new int[] {port1, ports[0]}, host0, "-1", poolName);
// Create CQs.
final int numCQs = 2;
for (int i = 0; i < numCQs; i++) {
// Create CQs.
createCQ(client1, poolName, "client1_" + i, cqs[i]);
executeCQ(client1, "client1_" + i, false, null);
createCQ(client2, poolName, "client2_" + i, cqs[i]);
executeCQ(client2, "client2_" + i, false, null);
}
final int interestSize = 20;
registerInterestListCQ(client1, regions[0], interestSize, false);
registerInterestListCQ(client2, regions[0], 0, true);
Wait.pause(1 * 1000);
// CREATE.
createValues(server1, regions[0], 100);
createValues(server1, regions[1], 10);
waitForCreated(client1, "client1_0", KEY + 10);
// Create server2.
server2.invoke(new CacheSerializableRunnable("Create Cache Server") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setMirrorType(MirrorType.KEYS_VALUES);
for (int i = 0; i < regions.length; i++) {
createRegion(regions[i], factory.createRegionAttributes());
}
InitialImageOperation.slowImageProcessing = 100;
try {
try {
startBridgeServer(ports[0], true);
} catch (Exception ex) {
Assert.fail("While starting CacheServer", ex);
}
while (true) {
if (InitialImageOperation.slowImageSleeps > 0) {
// Create events while GII for HARegion is in progress.
LocalRegion region1 = (LocalRegion) getRootRegion().getSubregion(regions[0]);
for (int i = 90; i <= 120; i++) {
region1.put(KEY + i, new Portfolio(i, i));
}
break;
}
Wait.pause(20);
}
} finally {
InitialImageOperation.slowImageProcessing = 0;
}
}
});
Wait.pause(3 * 1000);
// Check if CQs are registered as part of GII.
server2.invoke(new CacheSerializableRunnable("Create values") {
@Override
public void run2() throws CacheException {
DefaultQueryService qs = (DefaultQueryService) getCache().getQueryService();
Collection<CacheClientProxy> proxies = CacheClientNotifier.getInstance().getClientProxies();
Iterator iter = proxies.iterator();
try {
for (CacheClientProxy p : proxies) {
ClientProxyMembershipID clientId = p.getProxyID();
List cqs = qs.getCqService().getAllClientCqs(clientId);
getCache().getLogger()
.fine("Number of CQs found for client :" + clientId + " are :" + cqs.size());
if (cqs.size() != numCQs) {
fail("Number of CQs registerted by the client is :" + cqs.size()
+ " less than expected : " + numCQs);
}
CqQuery cq = (CqQuery) cqs.get(0);
LocalRegion region1 = (LocalRegion) getRootRegion().getSubregion(regions[0]);
if (cq.getName().startsWith("client1_")) {
if (region1.getFilterProfile().getKeysOfInterest(clientId) == null || region1
.getFilterProfile().getKeysOfInterest(clientId).size() != interestSize) {
fail("Interest registartion during Secondary HARegion creation has failed.");
}
} else {
if (!region1.getFilterProfile().isInterestedInAllKeys(clientId)) {
fail("Interest registartion during Secondary HARegion creation has failed.");
}
}
}
} catch (Exception ex) {
fail("Exception while validating filter count. ", ex);
}
}
});
// Close.
closeClient(client1);
closeClient(client2);
closeServer(server1);
closeServer(server2);
}
/**
* Test without CQs. This was added after an exception encountered with CQService, when there was
* no CQService initiated.
*/
@Test
public void testWithoutCQs() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
createServer(server1);
createServer(server2);
final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
final int thePort2 = server2.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
SerializableRunnable createConnectionPool = new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
getCache();
IgnoredException.addIgnoredException("java.net.ConnectException||java.net.SocketException");
AttributesFactory regionFactory = new AttributesFactory();
regionFactory.setScope(Scope.LOCAL);
ClientServerTestCase.configureConnectionPool(regionFactory, host0, port1, thePort2, true,
-1, -1, null);
createRegion(regions[0], regionFactory.createRegionAttributes());
}
};
// Create client.
client.invoke(createConnectionPool);
server1.invoke(new CacheSerializableRunnable("Create values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
for (int i = 0; i < 20; i++) {
region1.put("key-string-" + i, "value-" + i);
}
}
});
// Put some values on the client.
client.invoke(new CacheSerializableRunnable("Put values client") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
for (int i = 0; i < 10; i++) {
region1.put("key-string-" + i, "client-value-" + i);
}
}
});
Wait.pause(2 * 1000);
closeServer(server1);
closeServer(server2);
}
/**
* Test getCQs for a regions
*/
@Test
public void testGetCQsForARegionName() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testGetCQsForARegionName";
createPool(client, poolName, host0, thePort);
// Create CQs.
createCQ(client, poolName, "testQuery_3", cqs[3]);
executeCQ(client, "testQuery_3", true, null);
createCQ(client, poolName, "testQuery_4", cqs[4]);
executeCQ(client, "testQuery_4", true, null);
createCQ(client, poolName, "testQuery_5", cqs[5]);
executeCQ(client, "testQuery_5", true, null);
createCQ(client, poolName, "testQuery_6", cqs[6]);
executeCQ(client, "testQuery_6", true, null);
// with regions[1]
createCQ(client, poolName, "testQuery_7", cqs[7]);
executeCQ(client, "testQuery_7", true, null);
createCQ(client, poolName, "testQuery_8", cqs[8]);
executeCQ(client, "testQuery_8", true, null);
client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
@Override
public void run2() throws CacheException {
// Get CQ Service.
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
CqQuery[] cq = cqService.getCqs("/root/" + regions[0]);
assertNotNull(
"CQservice should not return null for cqs on this region : /root/" + regions[0], cq);
getCache().getLogger().info("cqs for region: /root/" + regions[0] + " : " + cq.length);
// closing on of the cqs.
cq[0].close();
cq = cqService.getCqs("/root/" + regions[0]);
assertNotNull(
"CQservice should not return null for cqs on this region : /root/" + regions[0], cq);
getCache().getLogger().info("cqs for region: /root/" + regions[0]
+ " after closeing one of the cqs : " + cq.length);
cq = cqService.getCqs("/root/" + regions[1]);
getCache().getLogger().info("cqs for region: /root/" + regions[1] + " : " + cq.length);
assertNotNull(
"CQservice should not return null for cqs on this region : /root/" + regions[1], cq);
} catch (Exception cqe) {
Assert.fail("Failed to getCQService", cqe);
}
}
});
// Close.
closeClient(client);
closeServer(server);
}
/**
* Tests execution of queries with NULL in where clause like where ID = NULL etc.
*/
@Test
public void testQueryWithNULLInWhereClause() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
VM producer = host.getVM(2);
createServer(server);
final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
String poolName = "testQueryWithNULLInWhereClause";
createPool(client, poolName, host0, thePort);
// Create client.
// createClient(client, thePort, host0);
// producer is not doing any thing.
createClient(producer, thePort, host0);
final int size = 50;
createValues(producer, regions[0], size);
createCQ(client, poolName, "testQuery_9", cqs[9]);
executeCQ(client, "testQuery_9", true, null);
createValues(producer, regions[0], (2 * size));
for (int i = 1; i <= size; i++) {
if (i % 2 == 0)
waitForUpdated(client, "testQuery_9", KEY + i);
}
for (int i = (size + 1); i <= 2 * size; i++) {
if (i % 2 == 0)
waitForCreated(client, "testQuery_9", KEY + i);
}
validateCQ(client, "testQuery_9", noTest, 25, 25, noTest);
// Close.
closeClient(client);
closeServer(server);
}
/**
* Tests execution of queries with NULL in where clause like where ID = NULL etc.
*/
@Test
public void testForSupportedRegionAttributes() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
// Create server with Global scope.
SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
// Create region with Global scope
AttributesFactory factory1 = new AttributesFactory();
factory1.setScope(Scope.GLOBAL);
factory1.setMirrorType(MirrorType.KEYS_VALUES);
createRegion(regions[0], factory1.createRegionAttributes());
// Create region with non Global, distributed_ack scope
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_NO_ACK);
factory2.setMirrorType(MirrorType.KEYS_VALUES);
createRegion(regions[1], factory2.createRegionAttributes());
Wait.pause(2000);
try {
startBridgeServer(port, true);
}
catch (Exception ex) {
Assert.fail("While starting CacheServer", ex);
}
Wait.pause(2000);
}
};
server1.invoke(createServer);
server2.invoke(createServer);
final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
final int thePort2 = server2.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
String poolName = "testForSupportedRegionAttributes";
createPool(client, poolName, new String[] {host0, host0}, new int[] {port1, thePort2});
// Create CQ on region with GLOBAL SCOPE.
createCQ(client, poolName, "testForSupportedRegionAttributes_0", cqs[0]);
executeCQ(client, "testForSupportedRegionAttributes_0", false, null);
int size = 5;
createValues(server1, regions[0], size);
for (int i = 1; i <= size; i++) {
waitForCreated(client, "testForSupportedRegionAttributes_0", KEY + i);
}
// Create CQ on region with non GLOBAL, DISTRIBUTED_ACK SCOPE.
createCQ(client, poolName, "testForSupportedRegionAttributes_1", cqs[2]);
String errMsg =
"The replicated region " + " specified in CQ creation does not have scope supported by CQ."
+ " The CQ supported scopes are DISTRIBUTED_ACK and GLOBAL.";
final String expectedErr = "Cq not registered on primary";
client.invoke(new CacheSerializableRunnable("Set expect") {
@Override
public void run2() {
getCache().getLogger()
.info("<ExpectedException action=add>" + expectedErr + "</ExpectedException>");
}
});
try {
executeCQ(client, "testForSupportedRegionAttributes_1", false, "CqException");
fail("The test should have failed with exception, " + errMsg);
} catch (Exception expected) {
// Expected.
} finally {
client.invoke(new CacheSerializableRunnable("Remove expect") {
@Override
public void run2() {
getCache().getLogger()
.info("<ExpectedException action=remove>" + expectedErr + "</ExpectedException>");
}
});
}
// Close.
closeClient(client);
closeServer(server1);
closeServer(server2);
}
/** For debug purpose - Compares entries in the region */
private void validateServerClientRegionEntries(VM server, VM client, final String regionName) {
server.invoke(new CacheSerializableRunnable("Server Region Entries") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
LogWriterUtils.getLogWriter().info("### Entries in Server :" + region.keySet().size());
}
});
client.invoke(new CacheSerializableRunnable("Client Region Entries") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regionName);
LogWriterUtils.getLogWriter().info("### Entries in Client :" + region.keySet().size());
}
});
}
/**
* Starts a cache server on the given port to serve up the given region.
*
* @since GemFire 4.0
*/
public void startBridgeServer(int port) throws IOException {
startBridgeServer(port, CacheServer.DEFAULT_NOTIFY_BY_SUBSCRIPTION);
}
/**
* Starts a cache server on the given port, using the given deserializeValues and
* notifyBySubscription to serve up the given region.
*
* @since GemFire 4.0
*/
public void startBridgeServer(int port, boolean notifyBySubscription) throws IOException {
Cache cache = getCache();
CacheServer bridge = cache.addCacheServer();
bridge.setPort(port);
bridge.setNotifyBySubscription(notifyBySubscription);
bridge.start();
bridgeServerPort = bridge.getPort();
}
/**
* Stops the cache server that serves up the given cache.
*
* @since GemFire 4.0
*/
protected void stopBridgeServer(Cache cache) {
CacheServer bridge = (CacheServer) cache.getCacheServers().iterator().next();
bridge.stop();
assertFalse(bridge.isRunning());
}
private void stopBridgeServers(Cache cache) {
CacheServer bridge = null;
for (Iterator bsI = cache.getCacheServers().iterator(); bsI.hasNext();) {
bridge = (CacheServer) bsI.next();
bridge.stop();
assertFalse(bridge.isRunning());
}
}
private void restartBridgeServers(Cache cache) throws IOException {
CacheServer bridge = null;
for (Iterator bsI = cache.getCacheServers().iterator(); bsI.hasNext();) {
bridge = (CacheServer) bsI.next();
bridge.start();
assertTrue(bridge.isRunning());
}
}
private InternalDistributedSystem createLonerDS() {
disconnectFromDS();
Properties lonerProps = new Properties();
lonerProps.setProperty(MCAST_PORT, "0");
lonerProps.setProperty(LOCATORS, "");
InternalDistributedSystem ds = getSystem(lonerProps);
assertEquals(0, ds.getDistributionManager().getOtherDistributionManagerIds().size());
return ds;
}
/**
* Returns region attributes for a <code>LOCAL</code> region
*/
protected RegionAttributes getRegionAttributes() {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
return factory.createRegionAttributes();
}
}