blob: 2b03b1aa585e3c56a0ac8ecd996919ad439bb8d9 [file] [log] [blame]
/*=========================================================================
* (c)Copyright 2002-2011, GemStone Systems, Inc. All Rights Reserved.
* 1260 NW Waterhouse Ave., Suite 200, Beaverton, OR 97006
* All Rights Reserved.
* =======================================================================*/
package com.gemstone.gemfire.mgmt.DataBrowser.connection.internal;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.query.CqAttributes;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.internal.GemFireVersion;
import com.gemstone.gemfire.mgmt.DataBrowser.connection.GFMemberDiscovery;
import com.gemstone.gemfire.mgmt.DataBrowser.connection.GemFireConnection;
import com.gemstone.gemfire.mgmt.DataBrowser.connection.GemFireConnectionListener;
import com.gemstone.gemfire.mgmt.DataBrowser.connection.GemFireMemberListener;
import com.gemstone.gemfire.mgmt.DataBrowser.model.member.GemFireMember;
import com.gemstone.gemfire.mgmt.DataBrowser.prefs.DataBrowserPreferences;
import com.gemstone.gemfire.mgmt.DataBrowser.query.IntrospectionRepository;
import com.gemstone.gemfire.mgmt.DataBrowser.query.QueryExecutionException;
import com.gemstone.gemfire.mgmt.DataBrowser.query.QueryResult;
import com.gemstone.gemfire.mgmt.DataBrowser.query.QueryUtil;
import com.gemstone.gemfire.mgmt.DataBrowser.query.cq.CQException;
import com.gemstone.gemfire.mgmt.DataBrowser.query.cq.CQQuery;
import com.gemstone.gemfire.mgmt.DataBrowser.query.cq.CQQueryImpl;
import com.gemstone.gemfire.mgmt.DataBrowser.query.cq.CQResult;
import com.gemstone.gemfire.mgmt.DataBrowser.query.internal.PdxHelper;
import com.gemstone.gemfire.mgmt.DataBrowser.utils.LogUtil;
public class GemFireClientConnectionImpl implements GemFireConnection {
static final String GEMFIRE_LIMIT_AVL = "6.*";
private DistributedSystem system;
private Cache cache;
private GFMemberDiscovery discovery;
private Map<String, CQQuery> cqs;
private volatile boolean closed;
private boolean limitClauseAvl = false;
public GemFireClientConnectionImpl(DistributedSystem sys,
GFMemberDiscovery dscvry) {
if (sys == null) {
throw new IllegalArgumentException(
"A valid GemFire client connection is required.");
} else if (dscvry == null) {
throw new IllegalArgumentException(
"A valid member discovery mechanism must be provided.");
}
this.system = sys;
this.cache = createCache(this.system);
this.discovery = dscvry;
this.cqs = Collections.synchronizedMap(new HashMap<String, CQQuery>());
this.closed = false;
String version = discovery.getGemFireSystemVersion();
/**
* A check is added to verify if the current version of GemFire supports LIMIT clause.
* LIMIT clause is available from GemFire 6.0 onwards.
**/
if(null == version) {
this.limitClauseAvl = false;
} else {
this.limitClauseAvl = version.matches(GEMFIRE_LIMIT_AVL);
}
String gemFireJarVersion = GemFireVersion.getGemFireVersion();
LogUtil.info("Version of GemFire jar in classpath: "+gemFireJarVersion);
LogUtil.info("The GemFire system version is :"+version);
LogUtil.info("This version of GemFire system supports LIMIT clause :"+this.limitClauseAvl);
//See comment for IntrospectionRepository.removePdxIntrospectionResults.
//Done only if Pdx is supported
IntrospectionRepository repos = IntrospectionRepository.singleton();
if (PdxHelper.getInstance().isPdxSupported()) {
if (repos.removePdxIntrospectionResults()) {
LogUtil.fine("Cleared cached PDX Type information");
}
}
}
public void close() {
if (this.system != null) {
this.closed = true;
//Stop All CQs.
List<String> currCqs = new ArrayList<String>();
currCqs.addAll(this.cqs.keySet());
for(String name : currCqs) {
try {
closeCQ(name);
}
catch (CQException e) {
LogUtil.warning("Failed to close CQ with name :" + name);
}
}
this.discovery.close();
this.cache.close();
this.system.disconnect();
this.cache = null;
this.system = null;
}
}
public QueryResult executeQuery(String query, GemFireMember member)
throws QueryExecutionException {
boolean param = limitClauseAvl && DataBrowserPreferences.getEnforceLimitToQuery();
QueryService service = getQueryService(member);
return QueryUtil.executeQuery(service, query, param);
}
public CQQuery newCQ(String name, String queryStr, GemFireMember member)
throws QueryExecutionException {
if(this.closed) {
throw new IllegalStateException("This connection is closed.");
}
if(member == null)
throw new IllegalArgumentException("Please specify a non-null member attribute");
if(name == null)
throw new IllegalArgumentException("Please specify a non-null name attribute for CQ.");
if(!member.isNotifyBySubscriptionEnabled()) {
throw new CQException("notify-by-subscription is not enabled on this server "+member.getId());
}
if(this.cqs.containsKey(name)) {
LogUtil.info("A CQ with name :"+name+" already exists. Hence closing the this CQ.");
closeCQ(name);
}
QueryService service = getQueryService(member);
CQResult result = new CQResult();
CqListener[] listeners = new CqListener[] {result};
CqAttributesFactory cqAf = new CqAttributesFactory();
cqAf.initCqListeners(listeners);
CqAttributes attrs = cqAf.create();
try {
CqQuery query = service.newCq(name, queryStr, attrs);
CQQueryImpl wrapper = new CQQueryImpl(member, query, result);
this.cqs.put(name, wrapper);
return wrapper;
}
catch (Exception e) {
throw new CQException(e);
}
}
public CQQuery getCQ(String name) {
if(name == null)
throw new IllegalArgumentException("Please specify a non-null name attribute for CQ.");
return this.cqs.get(name);
}
public void closeCQ(String name) throws CQException {
CQQuery query = getCQ(name);
if((query != null) && (!query.isClosed())) {
query.close();
LogUtil.info("A CQ with name :"+name+" is closed.");
}
//Make sure to remove the reference of CQ.
this.cqs.remove(name);
}
public void stopCQ(String name) throws CQException {
CQQuery query = getCQ(name);
if((query != null) && (query.isRunning())) {
query.stop();
}
}
public void addGemFireMemberListener(GemFireMemberListener listener) {
this.discovery.addGemFireMemberListener(listener);
}
public GemFireMember getMember(String id) {
return this.discovery.getMember(id);
}
public GemFireMember[] getMembers() {
return this.discovery.getMembers();
}
public QueryService getQueryService(GemFireMember member) {
return this.discovery.getQueryService(member);
}
public void removeGemFireMemberListener(GemFireMemberListener listener) {
this.discovery.removeGemFireMemberListener(listener);
}
public void addConnectionNotificationListener(GemFireConnectionListener listener) {
this.discovery.addConnectionNotificationListener(listener);
}
public void removeConnectionNotificationListener(GemFireConnectionListener listener) {
this.discovery.removeConnectionNotificationListener(listener);
}
public String getGemFireSystemVersion() {
return this.discovery.getGemFireSystemVersion();
}
public long getRefreshInterval() {
return this.discovery.getRefreshInterval();
}
public void setRefreshInterval(long time) {
this.discovery.setRefreshInterval(time);
}
/**
* Create cache. For version prior to 6.6 uses
* CacheFactory.create(DistributedSystem). For 6.6 & later it uses reflection
* to create the cache. Using reflection we also set configuration required to
* receive PdxInstance for PdxSerializable objects. The cache is loner as per
* the properties set in DistributedSystem.
*
* @param system
* Distributed System instance
* @return Cache instance
*/
private Cache createCache(DistributedSystem system) {
Cache cache = null;
boolean pdxSupported = PdxHelper.getInstance().isPdxSupported();
if (pdxSupported) {
// Not using ClientCacheFactory as 6.0 didn't have it.
CacheFactory factory = new CacheFactory();
Class<? extends CacheFactory> klass = factory.getClass();
Exception failure = null;
try {
Class<?>[] booleanArgs = new Class<?>[] {boolean.class};
Object[] booleanVals = new Object[] {Boolean.TRUE};
Method setPdxIgnoreUnreadFields = klass.getMethod("setPdxIgnoreUnreadFields", booleanArgs);
Method setPdxReadSerialized = klass.getMethod("setPdxReadSerialized", booleanArgs);
Method create = klass.getMethod("create", new Class<?>[0]);
//CacheFactory.setPdxIgnoreUnreadFields(boolean) - as Data Browser will only be reading data
setPdxIgnoreUnreadFields.invoke(factory, booleanVals);
//CacheFactory.setPdxReadSerialized(boolean)
setPdxReadSerialized.invoke(factory, booleanVals);
//CacheFactory.create()
cache = (Cache) create.invoke(factory, new Object[0]);
} catch (NoSuchMethodException e) {
failure = e;
} catch (IllegalAccessException e) {
failure = e;
} catch (InvocationTargetException e) {
failure = e;
} finally {
if (failure != null) {
LogUtil.error("Failed to create GemFire(" + CacheFactory.getVersion()
+ ") Cache.", failure);
throw new UnsupportedOperationException("Failed to create cache " +
"with PDX Support. Verify GemFire product directory is accessible.");
}
}
} else {
cache = CacheFactory.create(system);
}
LogUtil.info("GemFire("+CacheFactory.getVersion()+") Cache is created. " +
"PDX support is" + (pdxSupported ? "" : " NOT") + " available.");
return cache;
}
}