// blob: b8273e5587e90c9b6fdbd9290c7f9f5408d86b3c [file] [log] [blame]
package com.gemstone.gemfire.cache.query.internal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.eclipse.jetty.webapp.MetaData;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.query.QueryException;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.cache.query.Struct;
import com.gemstone.gemfire.cache.query.internal.types.CollectionTypeImpl;
import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
import com.gemstone.gemfire.cache.query.internal.utils.LimitIterator;
import com.gemstone.gemfire.cache.query.internal.utils.PDXUtils;
import com.gemstone.gemfire.cache.query.types.CollectionType;
import com.gemstone.gemfire.cache.query.types.ObjectType;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.HeapDataOutputStream.LongUpdater;
import com.gemstone.gemfire.internal.Version;
/**
* This is used as a wrapper over all the results of PR which are of non
* distinct type
*
* @author asif
*
*/
public class CumulativeNonDistinctResults<E> implements SelectResults<E>,
DataSerializableFixedID {
private CollectionType collectionType;
private Collection<E> data;
public CumulativeNonDistinctResults() {
}
public CumulativeNonDistinctResults(
Collection<? extends Collection<E>> results, int limit,
ObjectType elementType, List<Metadata> collectionsMetadata) {
this.collectionType = new CollectionTypeImpl(
CumulativeNonDistinctResults.class, elementType);
this.data = new CumulativeNonDistinctResultsCollection(results, limit,
collectionsMetadata);
}
@Override
public int size() {
return this.data.size();
}
@Override
public boolean isEmpty() {
return this.data.isEmpty();
}
@Override
public boolean contains(Object o) {
return this.data.contains(o);
}
@Override
public Iterator<E> iterator() {
return this.data.iterator();
}
@Override
public Object[] toArray() {
return this.data.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return this.data.toArray(a);
}
@Override
public boolean add(E e) {
throw new UnsupportedOperationException(
"Addition to collection not supported");
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException(
"Removal from collection not supported");
}
@Override
public boolean containsAll(Collection<?> c) {
return this.data.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends E> c) {
throw new UnsupportedOperationException(
"Addition to collection not supported");
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException(
"Removal from collection not supported");
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException(
"Removal from collection not supported");
}
@Override
public void clear() {
throw new UnsupportedOperationException(
"Removal from collection not supported");
}
@Override
public boolean isModifiable() {
return false;
}
@Override
public int occurrences(E element) {
// expensive!!
int count = 0;
for (Iterator<E> itr = this.iterator()/* this.base.iterator() */; itr
.hasNext();) {
E v = itr.next();
if (element == null ? v == null : element.equals(v)) {
count++;
}
}
return count;
}
@Override
public Set<E> asSet() {
return new HashSet<E>(this);
}
@Override
public List<E> asList() {
return new ArrayList<E>(this);
}
@Override
public CollectionType getCollectionType() {
return this.collectionType;
}
@Override
public void setElementType(ObjectType elementType) {
throw new UnsupportedOperationException(" not supported");
}
private class CumulativeNonDistinctResultsCollection extends
AbstractCollection<E> {
private final Collection<? extends Collection<E>> results;
private final List<Metadata> collectionsMetdata;
private final int limit;
public CumulativeNonDistinctResultsCollection(
Collection<? extends Collection<E>> results, int limit,
List<Metadata> collectionsMetadata) {
this.results = results;
this.limit = limit;
this.collectionsMetdata = collectionsMetadata;
}
@Override
public int size() {
int totalSize = 0;
for (Collection<E> result : this.results) {
totalSize += result.size();
}
if (this.limit >= 0) {
return totalSize > this.limit ? this.limit : totalSize;
} else {
return totalSize;
}
}
/*
* @Override public boolean isEmpty() { boolean isEmpty = true; for
* (SelectResults<E> result : this.sortedResults) { isEmpty =
* result.isEmpty(); if (!isEmpty) { break; } } return isEmpty; }
*/
@Override
public Iterator<E> iterator() {
Iterator<E> iter = new CumulativeCollectionIterator();
if (this.limit > -1) {
iter = new LimitIterator<E>(iter, this.limit);
}
return iter;
}
private class CumulativeCollectionIterator implements Iterator<E> {
protected final Iterator<E>[] iterators;
protected int currentIterator = 0;
private Boolean cachedHasNext = null;
final private boolean isStruct;
private final boolean[] objectChangedMarker = new boolean[1];
protected CumulativeCollectionIterator() {
this.iterators = new Iterator[results.size()];
Iterator<? extends Collection<E>> listIter = results.iterator();
int index = 0;
while (listIter.hasNext()) {
Iterator<E> temp = (Iterator<E>) listIter.next().iterator();
this.iterators[index++] = temp;
}
this.isStruct = collectionType.getElementType().isStructType();
}
@Override
public boolean hasNext() {
if (this.cachedHasNext != null) {
return this.cachedHasNext.booleanValue();
}
boolean hasNext = false;
for (int i = currentIterator; i < this.iterators.length; ++i) {
if (this.iterators[i].hasNext()) {
hasNext = true;
this.currentIterator = i;
break;
}
}
this.cachedHasNext = Boolean.valueOf(hasNext);
return hasNext;
}
@SuppressWarnings("unchecked")
@Override
public E next() {
if (this.cachedHasNext == null) {
this.hasNext();
}
this.cachedHasNext = null;
Metadata metadata = collectionsMetdata.get(this.currentIterator);
E original = this.iterators[this.currentIterator].next();
Object e = PDXUtils.convertPDX(original, isStruct,
metadata.getDomainObjectForPdx, metadata.getDeserializedObject,
metadata.localResults, objectChangedMarker, false);
if(isStruct) {
if(objectChangedMarker[0]) {
return (E)new StructImpl((StructTypeImpl)collectionType.getElementType(), (Object[])e);
}else {
return original;
}
}else {
return (E)e;
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove not supported");
}
}
}
@Override
public Version[] getSerializationVersions() {
return null;
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
ObjectType elementType = (ObjectType) DataSerializer.readObject(in);
this.collectionType = new CollectionTypeImpl(
CumulativeNonDistinctResults.class, elementType);
boolean isStruct = elementType.isStructType();
long size = in.readLong();
this.data = new ArrayList<E>((int) size);
long numLeft = size;
while (numLeft > 0) {
if (isStruct) {
Object[] fields = DataSerializer.readObjectArray(in);
this.data.add((E) new StructImpl((StructTypeImpl) elementType, fields));
} else {
E element = DataSerializer.readObject(in);
this.data.add(element);
}
--numLeft;
}
}
public int getDSFID() {
return CUMULATIVE_RESULTS;
}
// TODO : optimize for struct elements , by directly writing the fields
// instead
// of struct
@Override
public void toData(DataOutput out) throws IOException {
boolean isStruct = this.collectionType.getElementType().isStructType();
DataSerializer.writeObject(this.collectionType.getElementType(), out);
HeapDataOutputStream hdos = new HeapDataOutputStream(1024, null);
LongUpdater lu = hdos.reserveLong();
Iterator<E> iter = this.iterator();
int numElements = 0;
while (iter.hasNext()) {
E data = iter.next();
if (isStruct) {
Object[] fields = ((Struct) data).getFieldValues();
DataSerializer.writeObjectArray(fields, out);
} else {
DataSerializer.writeObject(data, hdos);
}
++numElements;
}
lu.update(numElements);
hdos.sendTo(out);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder("CumulativeNonDistinctResults::");
builder.append('[');
Iterator<E> iter = this.iterator();
while (iter.hasNext()) {
builder.append(iter.next()).append(',');
}
builder.deleteCharAt(builder.length() - 1);
builder.append(']');
return builder.toString();
}
public static class Metadata {
final boolean getDomainObjectForPdx;
final boolean getDeserializedObject;
final boolean localResults;
private Metadata(boolean getDomainObjectForPdx,
boolean getDeserializedObject, boolean localResults) {
this.getDomainObjectForPdx = getDomainObjectForPdx;
this.getDeserializedObject = getDeserializedObject;
this.localResults = localResults;
}
}
public static Metadata getCollectionMetadata(boolean getDomainObjectForPdx,
boolean getDeserializedObject, boolean localResults) {
return new Metadata(getDomainObjectForPdx, getDeserializedObject,
localResults);
}
}