blob: aa5053ad3e77703125b9a00b721e9afd8174752e [file] [log] [blame]
/*=========================================================================
* (c)Copyright 2002-2011, GemStone Systems, Inc. All Rights Reserved.
* 1260 NW Waterhouse Ave., Suite 200, Beaverton, OR 97006
* All Rights Reserved.
* =======================================================================*/
package com.gemstone.gemfire.mgmt.DataBrowser.query.cq;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.gemstone.gemfire.cache.query.CqEvent;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.Struct;
import com.gemstone.gemfire.mgmt.DataBrowser.query.ColumnNotFoundException;
import com.gemstone.gemfire.mgmt.DataBrowser.query.ColumnValueNotAvailableException;
import com.gemstone.gemfire.mgmt.DataBrowser.query.IntrospectionException;
import com.gemstone.gemfire.mgmt.DataBrowser.query.IntrospectionRepository;
import com.gemstone.gemfire.mgmt.DataBrowser.query.IntrospectionResult;
import com.gemstone.gemfire.mgmt.DataBrowser.query.QueryUtil;
import com.gemstone.gemfire.mgmt.DataBrowser.query.ResultSet;
import com.gemstone.gemfire.mgmt.DataBrowser.query.cq.event.ErrorEvent;
import com.gemstone.gemfire.mgmt.DataBrowser.query.cq.event.ICQEvent;
import com.gemstone.gemfire.mgmt.DataBrowser.query.cq.event.RowAdded;
import com.gemstone.gemfire.mgmt.DataBrowser.query.cq.event.RowDeleted;
import com.gemstone.gemfire.mgmt.DataBrowser.query.cq.event.RowUpdated;
import com.gemstone.gemfire.mgmt.DataBrowser.query.internal.PdxHelper;
public final class CQResult implements ResultSet, CqListener {
private List<CQEventListener> listeners;
private List<TypeListener> type_listeners;
private Map<Object, IntrospectionResult> types;
private Map<Object, EventDataImpl> key_to_value_map;
public CQResult() {
this.types = Collections
.synchronizedMap(new HashMap<Object, IntrospectionResult>());
this.key_to_value_map = Collections
.synchronizedMap(new HashMap<Object, EventDataImpl>());
this.listeners = Collections
.synchronizedList(new ArrayList<CQEventListener>());
this.type_listeners = Collections
.synchronizedList(new ArrayList<TypeListener>());
}
public void close() {
Collection<CQEventListener> _listeners = getCQEventListeners();
for (CQEventListener listener : _listeners) {
listener.close();
}
}
public Object getColumnValue(Object tuple, int index)
throws ColumnNotFoundException, ColumnValueNotAvailableException {
Object type = (tuple instanceof Struct) ? ((Struct) tuple).getStructType()
: tuple.getClass();
if (this.types.containsKey(type)) {
IntrospectionResult metaInfo = this.types.get(type);
return metaInfo.getColumnValue(tuple, index);
}
throw new ColumnNotFoundException("Could not identify type :" + type);
}
public IntrospectionResult[] getIntrospectionResult() {
return this.types.values().toArray(new IntrospectionResult[0]);
}
public Collection<?> getQueryResult() {
throw new UnsupportedOperationException();
}
public Collection<?> getQueryResult(IntrospectionResult metaInfo) {
throw new UnsupportedOperationException();
}
public boolean isEmpty() {
return this.key_to_value_map.isEmpty();
}
public void addCQEventListener(CQEventListener listener) {
this.listeners.add(listener);
}
public void removeCQEventListener(CQEventListener listener) {
this.listeners.remove(listener);
}
public void addTypeListener(TypeListener listener) {
this.type_listeners.add(listener);
}
public void removeTypeListener(TypeListener listener) {
this.type_listeners.remove(listener);
}
public void onEvent(CqEvent cqEvent) {
Object key = cqEvent.getKey();
// Here the entry is destroyed from the result set.
if (cqEvent.getQueryOperation().isDestroy()) {
// Clear the key_to_value map.
EventData data = this.key_to_value_map.remove(key);
if (data != null) {
RowDeleted delEvent = new RowDeleted(data);
invokeListeners(delEvent);
}
} else {
// Add/Update the entry operation.
Object value = cqEvent.getNewValue();
// The new entry value should be not-null for this.
if (value != null) {
try {
// Get the type of the object and invoke listeners if the new type is
// detected.
IntrospectionResult metaInfo = getObjectType(value);
EventDataImpl oldEvent = this.key_to_value_map.get(key);
// Old entry is available for this key.
if (oldEvent != null) {
IntrospectionResult oldMetaInfo = oldEvent.getIntrospectionResult();
// Types are same. Hence just update the record.
if (metaInfo.equals(oldMetaInfo)) {
oldEvent.setValue(metaInfo, value);
invokeListeners(new RowUpdated(oldEvent));
} else {
// Types are different. Delete the old record and add the new one.
EventDataImpl temp = (EventDataImpl) oldEvent.clone();
invokeListeners(new RowDeleted(temp));
oldEvent.setValue(metaInfo, value);
invokeListeners(new RowAdded(oldEvent));
}
} else {
// There is no old entry for this key. Add it as a new one.
EventDataImpl newValue = new EventDataImpl(key, value, metaInfo);
this.key_to_value_map.put(key, newValue);
invokeListeners(new RowAdded(newValue));
}
} catch (Exception e) {
ErrorEvent errorEvent = new ErrorEvent(e);
invokeListeners(errorEvent);
}
}
}
}
public void onError(CqEvent cqEvent) {
// Propagate the exception.
invokeListeners(new ErrorEvent(cqEvent.getThrowable()));
}
public EventData getValueForKey(Object key) {
return this.key_to_value_map.get(key);
}
public Set<Object> getKeys() {
Set<Object> keys = new HashSet<Object>();
synchronized (this.key_to_value_map) {
keys.addAll(this.key_to_value_map.keySet());
}
return keys;
}
private void invokeListeners(ICQEvent event) {
Collection<CQEventListener> _listeners = getCQEventListeners();
for (CQEventListener listener : _listeners) {
listener.onEvent(event);
}
}
private IntrospectionResult getObjectType(Object value)
throws IntrospectionException {
Object type = value.getClass();
if (QueryUtil.isPdxInstance(value)) {
//PdxInfoType is used for PdxInstance
type = PdxHelper.getInstance().getPdxInfoType(value);
}
IntrospectionResult metaInfo = this.types.get(type);
if (metaInfo == null) {
metaInfo = IntrospectionRepository.singleton().introspectTypeByObject(value);
this.types.put(type, metaInfo);
notifyTypeListeners(metaInfo);
}
return metaInfo;
}
/**
* Notifies the available TypeListeners about newly added IntrospectionResult
*
* @param metaInfo
* newly added IntrospectionResult
*/
private void notifyTypeListeners(IntrospectionResult metaInfo) {
Collection<TypeListener> typeListeners = getTypeListeners();
for (TypeListener listener : typeListeners) {
listener.onNewTypeAdded(metaInfo);
}
}
private Collection<CQEventListener> getCQEventListeners() {
List<CQEventListener> result = new ArrayList<CQEventListener>();
synchronized (this.listeners) {
result.addAll(this.listeners);
}
return result;
}
private Collection<TypeListener> getTypeListeners() {
List<TypeListener> result = new ArrayList<TypeListener>();
synchronized (this.listeners) {
result.addAll(this.type_listeners);
}
return result;
}
}