blob: 15bb0451fe0d056cb6bf66010516432fdbdb05e0 [file] [log] [blame]
package com.gemstone.gemfire.cache.query.dunit;
import java.util.Iterator;
import java.util.Properties;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.query.CqAttributes;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqEvent;
import com.gemstone.gemfire.cache.query.CqException;
import com.gemstone.gemfire.cache.query.CqExistsException;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.IndexExistsException;
import com.gemstone.gemfire.cache.query.IndexNameConflictException;
import com.gemstone.gemfire.cache.query.QueryTestUtils;
import com.gemstone.gemfire.cache.query.RegionNotFoundException;
import com.gemstone.gemfire.cache.query.internal.index.CompactRangeIndex;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import dunit.AsyncInvocation;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
public class HelperTestCase extends CacheTestCase {
public HelperTestCase(String name) {
super(name);
}
protected void createPartitionRegion(VM vm, final String regionName, final Class valueConstraint) {
vm.invoke(new SerializableCallable() {
public Object call() throws Exception {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
RegionFactory factory = getCache().createRegionFactory(RegionShortcut.PARTITION)
.setPartitionAttributes(paf.create());
if (valueConstraint != null) {
factory.setValueConstraint(valueConstraint);
}
factory.create(regionName);
return true;
}
});
}
protected void createReplicatedRegion(VM vm, final String regionName, final Class valueConstraint) {
vm.invoke(new SerializableCallable() {
public Object call() throws Exception {
RegionFactory factory = getCache().createRegionFactory(
RegionShortcut.REPLICATE);
if (valueConstraint != null) {
factory.setValueConstraint(valueConstraint);
}
factory.create(regionName);
return true;
}
});
}
protected void createCachingProxyRegion(VM vm, final String regionName, final Class valueConstraint) {
vm.invoke(new SerializableCallable() {
public Object call() throws Exception {
ClientRegionFactory factory = ((ClientCache) getCache())
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
if (valueConstraint != null) {
factory.setValueConstraint(valueConstraint);
}
factory.create(regionName);
return true;
}
});
}
protected void createCQ(VM vm, final String cqName, final String query, final CqAttributes cqAttr) {
vm.invoke(new SerializableCallable() {
public Object call() throws CqException, RegionNotFoundException, CqExistsException {
CqAttributes attrs = cqAttr;
if (attrs == null) {
attrs = createDummyCqAttributes();
}
getCache().getQueryService().newCq(cqName, query, attrs);
return true;
}
});
}
protected void registerInterest(VM vm, final String regionName) {
vm.invoke(new SerializableCallable() {
public Object call() throws CqException, RegionNotFoundException, CqExistsException {
getCache().getRegion("/" + regionName).registerInterestRegex(".*");
return true;
}
});
}
protected void createIndex(VM vm, final String indexName, final String indexedExpression, final String regionPath) {
vm.invoke(new SerializableCallable() {
public Object call() throws RegionNotFoundException, CqExistsException, IndexExistsException, IndexNameConflictException {
getCache().getQueryService().createIndex(indexName, indexedExpression, regionPath);
return true;
}
});
}
protected void printIndexMapping(VM vm, final String regionName, final String indexName) {
vm.invoke(new SerializableCallable() {
public Object call() throws RegionNotFoundException, CqExistsException, IndexExistsException, IndexNameConflictException {
Region region = getCache().getRegion("/" + regionName);
CompactRangeIndex index = (CompactRangeIndex)getCache().getQueryService().getIndex(region, indexName);
System.out.println(index.dump());
return true;
}
});
}
protected void put(VM vm, final String regionName, final Object key, final Object value) {
vm.invoke(new SerializableCallable() {
public Object call() throws RegionNotFoundException, CqExistsException, IndexExistsException, IndexNameConflictException {
getCache().getRegion("/" + regionName).put(key, value);
return true;
}
});
}
private CqAttributes createDummyCqAttributes() {
// Create CQ Attributes.
CqAttributesFactory cqAf = new CqAttributesFactory();
// Initialize and set CqListener.
CqListener[] cqListeners = { new CqListener() {
@Override
public void close() {
}
@Override
public void onEvent(CqEvent aCqEvent) {
}
@Override
public void onError(CqEvent aCqEvent) {
}
}};
cqAf.initCqListeners(cqListeners);
CqAttributes cqa = cqAf.create();
return cqa;
}
protected AsyncInvocation executeCQ(VM vm, final String cqName) {
return vm.invokeAsync(new SerializableCallable() {
public Object call() throws CqException, RegionNotFoundException {
getCache().getQueryService().getCq(cqName).executeWithInitialResults();
return true;
}
});
}
public void stopCacheServer(VM server) {
server.invoke(new SerializableRunnable("Close CacheServer") {
public void run() {
CacheServer cs = getCache().getCacheServers().iterator().next();
cs.stop();
assertFalse(cs.isRunning());
}
});
}
protected void startCacheServer(VM server, final int port, final Properties properties) throws Exception {
createCacheServer(server, port, properties);
startCacheServers(server);
}
/*
* creates but does not start the cache server.
* the start() method needs to be invoked explicitly;
*/
protected void createCacheServer(VM server, final int port, final Properties properties) throws Exception {
server.invoke(new SerializableCallable() {
public Object call() throws Exception {
getSystem(properties);
GemFireCacheImpl cache = (GemFireCacheImpl)getCache();
cache.setCopyOnRead(true);
CacheServer cacheServer = getCache().addCacheServer();
cacheServer.setPort(port);
QueryTestUtils.setCache(cache);
return true;
}
});
}
protected void startCacheServers(VM server) {
server.invoke(new SerializableCallable() {
public Object call() throws Exception {
Iterator<CacheServer> iterator = getCache().getCacheServers().iterator();
while (iterator.hasNext()) {
iterator.next().start();
}
return true;
}
});
}
protected void startClient(VM client, final VM[] servers, final int[] ports, final int redundancyLevel, final Properties properties) {
client.invoke(new CacheSerializableRunnable("Start client") {
public void run2() throws CacheException {
getSystem(properties);
final ClientCacheFactory ccf = new ClientCacheFactory(properties);
for (int i = 0; i < servers.length; i++) {
ccf.addPoolServer(getServerHostName(servers[i].getHost()), ports[i]);
}
ccf.setPoolSubscriptionEnabled(true);
ccf.setPoolSubscriptionRedundancy(redundancyLevel);
ClientCache cache = (ClientCache)getClientCache(ccf);
}
});
}
protected Properties getClientProperties() {
Properties p = new Properties();
p.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
p.setProperty(DistributionConfig.LOCATORS_NAME, "");
return p;
}
protected Properties getServerProperties(int mcastPort) {
Properties p = new Properties();
p.setProperty(DistributionConfig.MCAST_PORT_NAME, mcastPort+"");
p.setProperty(DistributionConfig.LOCATORS_NAME, "");
return p;
}
}