blob: 2291a753ff9cdf81147557caba8696493b3aba69 [file] [log] [blame]
package com.gemstone.gemfire.mgmt.DataBrowser.controller.internal;
import java.util.ArrayList;
import java.util.List;
import com.gemstone.gemfire.mgmt.DataBrowser.connection.GemFireConnection;
import com.gemstone.gemfire.mgmt.DataBrowser.controller.CQConfiguarationPrms;
import com.gemstone.gemfire.mgmt.DataBrowser.controller.IQueryExecutedEvent;
import com.gemstone.gemfire.mgmt.DataBrowser.controller.IQueryExecutionListener;
import com.gemstone.gemfire.mgmt.DataBrowser.controller.QueryConfigurationPrms;
import com.gemstone.gemfire.mgmt.DataBrowser.model.member.GemFireMember;
import com.gemstone.gemfire.mgmt.DataBrowser.query.QueryResult;
import com.gemstone.gemfire.mgmt.DataBrowser.query.cq.CQQuery;
import com.gemstone.gemfire.mgmt.DataBrowser.utils.LogUtil;
/**
*
* Singleton Helper class to perform query fired by view also process post query
* data
*
* initialize() method has to be called after each connection to
* initialize/reset the state of field.
*
*
*
* @author mjha
*
*/
public class QueryExecutionHelper {
private static QueryExecutionHelper instance;
private GemFireConnection connection;
private List<QueryConfigurationPrms> queriesToBeExecutedQueue;
private List<PostQueryData> postQueryDataQueue;
private QueryResult lastQueryResult = null;
private Thread queryExecuter;
private Thread queryResultProcessor;
private boolean isShutDownStarted;
private QueryExecutionHelper() {
}
public static QueryExecutionHelper getInstance() {
if (instance == null)
instance = new QueryExecutionHelper();
return instance;
}
/**
* Initialize the fields , also starts the thread to execute query and process
* query result
*/
public void start(GemFireConnection conn) {
queriesToBeExecutedQueue = new ArrayList<QueryConfigurationPrms>();
postQueryDataQueue = new ArrayList<PostQueryData>();
connection = conn;
isShutDownStarted = false;
}
public void shutDown() {
this.isShutDownStarted = true;
if (isShutDownStarted) {
connection = null;
if (queryExecuter != null){
synchronized (queriesToBeExecutedQueue) {
queriesToBeExecutedQueue.clear();
queriesToBeExecutedQueue.notify();
}
}
if (queryResultProcessor != null){
synchronized (postQueryDataQueue) {
postQueryDataQueue.clear();
postQueryDataQueue.notify();
}
}
queryResultProcessor = null;
queryExecuter = null;
setLastQueryResults(null);
}
}
/**
* Append query to the queue. This will return with adding query if connection
* is no more or data browser shut down
*
* @param queryPrms
* encapsulating parameters to perform query and process post query
* result
*/
public void submitQuery(QueryConfigurationPrms queryPrms) {
if (connection == null || isShutDownStarted)
return;
synchronized (queriesToBeExecutedQueue) {
queriesToBeExecutedQueue.add(queryPrms);
if (queryExecuter == null) {
queryExecuter = new Thread(new QueryExecutor());
queryExecuter.setName("Query Executer");
queryExecuter.setDaemon(true);
queryExecuter.start();
}
queriesToBeExecutedQueue.notify();
}
}
private void processResult(PostQueryData queryData) {
if (connection == null || isShutDownStarted)
return;
synchronized (postQueryDataQueue) {
postQueryDataQueue.add(queryData);
if (queryResultProcessor == null) {
queryResultProcessor = new Thread(new QueryResultProcessor());
queryResultProcessor.setName("Query Result Processor");
queryResultProcessor.setDaemon(true);
queryResultProcessor.start();
}
postQueryDataQueue.notify();
}
}
/**
* Runnable to execute query on DS
*
* The runnable keeps on running as long as connection is establish with DS or
* Data Browser is running
*
* @author mjha
*/
private class QueryExecutor implements Runnable {
public void run() {
while (true) {
QueryConfigurationPrms prms = null;
synchronized (queriesToBeExecutedQueue) {
while (queriesToBeExecutedQueue.isEmpty()) {
if(connection != null && !isShutDownStarted){
try {
queriesToBeExecutedQueue.wait();
}
catch (InterruptedException e) {
continue;
}
}else{
return;
}
}
prms = queriesToBeExecutedQueue.remove(0);
}
String queryString = prms.getQueryString();
GemFireMember member = prms.getMember();
boolean isCQ = prms.isCQ();
Object result = null;
CQQuery cQuery = null;
if (connection != null && !isShutDownStarted) {
try {
if(!isCQ){
QueryResult executeQuery = connection.executeQuery(queryString,
member);
result = executeQuery;
}else{
CQConfiguarationPrms cqPrms= (CQConfiguarationPrms)prms;
cQuery = cqPrms.getCQuery();
cQuery.execute();
}
}
catch (Throwable e) {
result = e;
LogUtil.error("Error occured while executing the query: " + prms, e);
}
finally {
PostQueryData queryData = new PostQueryData(prms, result);
processResult(queryData);
}
}
else {
return;
}
}
}
}
public synchronized QueryResult getLastQueryResults(){
return lastQueryResult;
}
public synchronized void setLastQueryResults(QueryResult result) {
LogUtil.fine("QueryHelper.setLastQueryResults =>"+result);
lastQueryResult = result;
}
/**
* Runnable to process post query results, if query successful else it will
* process error occurred while querying
*
* The runnable keeps on running as long as connection is establish with DS or
* Data Browser is running
*
* @author mjha
*/
private class QueryResultProcessor implements Runnable {
public void run() {
while (true) {
PostQueryData data = null;
synchronized (postQueryDataQueue) {
while (postQueryDataQueue.isEmpty()) {
if(connection != null && !isShutDownStarted){
try {
postQueryDataQueue.wait();
}
catch (InterruptedException e) {
continue;
}
}else{
return;
}
}
data = postQueryDataQueue.remove(0);
}
QueryConfigurationPrms prms = data.getQueryPrms();
boolean isCQ = prms.isCQ();
IQueryExecutionListener listener = prms.getQueryExecutionListener();
String queryString = prms.getQueryString();
GemFireMember member = prms.getMember();
Object object = data.getQueryResult();
if (connection != null && !isShutDownStarted) {
if (object instanceof Throwable) {
IQueryExecutedEvent event = new QueryExecutedEvent(queryString,
null, member);
listener.queryFailed(event, (Throwable)object);
}
else {
if(!isCQ){
if (object instanceof QueryResult) {
IQueryExecutedEvent event = new QueryExecutedEvent(queryString,
(QueryResult)object, member);
setLastQueryResults((QueryResult)object);
listener.queryExecuted(event);
}
}else{
IQueryExecutedEvent event = new QueryExecutedEvent(queryString, null, member);
listener.queryExecuted(event);
}
}
}
else {
return;
}
}
}
}
/**
* represents post query data along with query information
*
* Post query data can be ResultSet or error occurred
*
* @author mjha
*/
private static class PostQueryData {
private QueryConfigurationPrms queryPrms;
private Object queryResult;
PostQueryData(QueryConfigurationPrms prms, Object resQuery) {
this.queryPrms = prms;
this.queryResult = resQuery;
}
public QueryConfigurationPrms getQueryPrms() {
return queryPrms;
}
public Object getQueryResult() {
return queryResult;
}
}
}