blob: fc65a63af05b657b0c70d109e07baf6efd7974b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.internal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.internal.types.CollectionTypeImpl;
import org.apache.geode.cache.query.internal.types.StructTypeImpl;
import org.apache.geode.cache.query.internal.utils.LimitIterator;
import org.apache.geode.cache.query.internal.utils.PDXUtils;
import org.apache.geode.cache.query.types.CollectionType;
import org.apache.geode.cache.query.types.ObjectType;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.HeapDataOutputStream.LongUpdater;
import org.apache.geode.internal.Version;
/**
* This is used as a wrapper over all the results of PR which are of non distinct type
*
*
*/
public class CumulativeNonDistinctResults<E> implements SelectResults<E>, DataSerializableFixedID {
private CollectionType collectionType;
private Collection<E> data;
public CumulativeNonDistinctResults() {}
public CumulativeNonDistinctResults(Collection<? extends Collection<E>> results, int limit,
ObjectType elementType, List<Metadata> collectionsMetadata) {
this.collectionType = new CollectionTypeImpl(CumulativeNonDistinctResults.class, elementType);
this.data = new CumulativeNonDistinctResultsCollection(results, limit, collectionsMetadata);
}
@Override
public int size() {
return this.data.size();
}
@Override
public boolean isEmpty() {
return this.data.isEmpty();
}
@Override
public boolean contains(Object o) {
return this.data.contains(o);
}
@Override
public Iterator<E> iterator() {
return this.data.iterator();
}
@Override
public Object[] toArray() {
return this.data.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return this.data.toArray(a);
}
@Override
public boolean add(E e) {
throw new UnsupportedOperationException("Addition to collection not supported");
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException("Removal from collection not supported");
}
@Override
public boolean containsAll(Collection<?> c) {
return this.data.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends E> c) {
throw new UnsupportedOperationException("Addition to collection not supported");
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException("Removal from collection not supported");
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("Removal from collection not supported");
}
@Override
public void clear() {
throw new UnsupportedOperationException("Removal from collection not supported");
}
@Override
public boolean isModifiable() {
return false;
}
@Override
public int occurrences(E element) {
// expensive!!
int count = 0;
for (Iterator<E> itr = this.iterator()/* this.base.iterator() */; itr.hasNext();) {
E v = itr.next();
if (element == null ? v == null : element.equals(v)) {
count++;
}
}
return count;
}
@Override
public Set<E> asSet() {
return new HashSet<E>(this);
}
@Override
public List<E> asList() {
return new ArrayList<E>(this);
}
@Override
public CollectionType getCollectionType() {
return this.collectionType;
}
@Override
public void setElementType(ObjectType elementType) {
throw new UnsupportedOperationException(" not supported");
}
private class CumulativeNonDistinctResultsCollection extends AbstractCollection<E> {
private final Collection<? extends Collection<E>> results;
private final List<Metadata> collectionsMetdata;
private final int limit;
public CumulativeNonDistinctResultsCollection(Collection<? extends Collection<E>> results,
int limit, List<Metadata> collectionsMetadata) {
this.results = results;
this.limit = limit;
this.collectionsMetdata = collectionsMetadata;
}
@Override
public int size() {
int totalSize = 0;
for (Collection<E> result : this.results) {
totalSize += result.size();
}
if (this.limit >= 0) {
return totalSize > this.limit ? this.limit : totalSize;
} else {
return totalSize;
}
}
@Override
public Iterator<E> iterator() {
Iterator<E> iter = new CumulativeCollectionIterator();
if (this.limit > -1) {
iter = new LimitIterator<E>(iter, this.limit);
}
return iter;
}
private class CumulativeCollectionIterator implements Iterator<E> {
protected final Iterator<E>[] iterators;
protected int currentIterator = 0;
private Boolean cachedHasNext = null;
private final boolean isStruct;
private final boolean[] objectChangedMarker = new boolean[1];
protected CumulativeCollectionIterator() {
this.iterators = new Iterator[results.size()];
Iterator<? extends Collection<E>> listIter = results.iterator();
int index = 0;
while (listIter.hasNext()) {
Iterator<E> temp = (Iterator<E>) listIter.next().iterator();
this.iterators[index++] = temp;
}
this.isStruct = collectionType.getElementType().isStructType();
}
@Override
public boolean hasNext() {
if (this.cachedHasNext != null) {
return this.cachedHasNext.booleanValue();
}
boolean hasNext = false;
for (int i = currentIterator; i < this.iterators.length; ++i) {
if (this.iterators[i].hasNext()) {
hasNext = true;
this.currentIterator = i;
break;
}
}
this.cachedHasNext = Boolean.valueOf(hasNext);
return hasNext;
}
@SuppressWarnings("unchecked")
@Override
public E next() {
if (this.cachedHasNext == null) {
this.hasNext();
}
this.cachedHasNext = null;
Metadata metadata = collectionsMetdata.get(this.currentIterator);
E original = this.iterators[this.currentIterator].next();
Object e = PDXUtils.convertPDX(original, isStruct, metadata.getDomainObjectForPdx,
metadata.getDeserializedObject, metadata.localResults, objectChangedMarker, false);
if (isStruct) {
if (objectChangedMarker[0]) {
return (E) new StructImpl((StructTypeImpl) collectionType.getElementType(),
(Object[]) e);
} else {
return original;
}
} else {
return (E) e;
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove not supported");
}
}
}
@Override
public Version[] getSerializationVersions() {
return null;
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
ObjectType elementType = (ObjectType) DataSerializer.readObject(in);
this.collectionType = new CollectionTypeImpl(CumulativeNonDistinctResults.class, elementType);
boolean isStruct = elementType.isStructType();
long size = in.readLong();
this.data = new ArrayList<E>((int) size);
long numLeft = size;
while (numLeft > 0) {
if (isStruct) {
Object[] fields = DataSerializer.readObjectArray(in);
this.data.add((E) new StructImpl((StructTypeImpl) elementType, fields));
} else {
E element = DataSerializer.readObject(in);
this.data.add(element);
}
--numLeft;
}
}
@Override
public int getDSFID() {
return CUMULATIVE_RESULTS;
}
// TODO : optimize for struct elements , by directly writing the fields
// instead
// of struct
@Override
public void toData(DataOutput out) throws IOException {
boolean isStruct = this.collectionType.getElementType().isStructType();
DataSerializer.writeObject(this.collectionType.getElementType(), out);
HeapDataOutputStream hdos = new HeapDataOutputStream(1024, null);
LongUpdater lu = hdos.reserveLong();
Iterator<E> iter = this.iterator();
int numElements = 0;
while (iter.hasNext()) {
E data = iter.next();
if (isStruct) {
Object[] fields = ((Struct) data).getFieldValues();
DataSerializer.writeObjectArray(fields, out);
} else {
DataSerializer.writeObject(data, hdos);
}
++numElements;
}
lu.update(numElements);
hdos.sendTo(out);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder("CumulativeNonDistinctResults::");
builder.append('[');
Iterator<E> iter = this.iterator();
while (iter.hasNext()) {
builder.append(iter.next()).append(',');
}
builder.deleteCharAt(builder.length() - 1);
builder.append(']');
return builder.toString();
}
public static class Metadata {
final boolean getDomainObjectForPdx;
final boolean getDeserializedObject;
final boolean localResults;
private Metadata(boolean getDomainObjectForPdx, boolean getDeserializedObject,
boolean localResults) {
this.getDomainObjectForPdx = getDomainObjectForPdx;
this.getDeserializedObject = getDeserializedObject;
this.localResults = localResults;
}
}
public static Metadata getCollectionMetadata(boolean getDomainObjectForPdx,
boolean getDeserializedObject, boolean localResults) {
return new Metadata(getDomainObjectForPdx, getDeserializedObject, localResults);
}
}