blob: 83d844b857037d982978684f99ee90be5c981ae3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.internal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.internal.types.CollectionTypeImpl;
import org.apache.geode.cache.query.internal.types.StructTypeImpl;
import org.apache.geode.cache.query.internal.utils.LimitIterator;
import org.apache.geode.cache.query.types.CollectionType;
import org.apache.geode.cache.query.types.ObjectType;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.serialization.BufferDataOutputStream.LongUpdater;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.Version;
/**
* The n - way merge results returns a sorted results on the cumulative sorted results for
* partitioned region based query
*
*
*/
public class NWayMergeResults<E> implements SelectResults<E>, Ordered, DataSerializableFixedID {
private CollectionType collectionType;
private Collection<E> data;
private boolean isDistinct;
public NWayMergeResults() {}
public NWayMergeResults(Collection<? extends Collection<E>> sortedResults, boolean isDistinct,
int limit, List<CompiledSortCriterion> orderByAttribs, ExecutionContext context,
ObjectType elementType) {
this.isDistinct = isDistinct;
this.collectionType = new CollectionTypeImpl(Ordered.class, elementType);
this.data = new NWayMergeResultsCollection(sortedResults, limit, orderByAttribs, context);
}
@Override
public int size() {
return this.data.size();
}
@Override
public boolean isEmpty() {
return this.data.isEmpty();
}
@Override
public boolean contains(Object o) {
return this.data.contains(o);
}
@Override
public Iterator<E> iterator() {
return this.data.iterator();
}
@Override
public Object[] toArray() {
return this.data.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return this.data.toArray(a);
}
@Override
public boolean add(E e) {
throw new UnsupportedOperationException("Addition to collection not supported");
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException("Removal from collection not supported");
}
@Override
public boolean containsAll(Collection<?> c) {
return this.data.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends E> c) {
throw new UnsupportedOperationException("Addition to collection not supported");
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException("Removal from collection not supported");
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("Removal from collection not supported");
}
@Override
public void clear() {
throw new UnsupportedOperationException("Removal from collection not supported");
}
@Override
public boolean isModifiable() {
return false;
}
@Override
public int occurrences(E element) {
if (this.isDistinct) {
return this.data.contains(element) ? 1 : 0;
}
// expensive!!
int count = 0;
for (Iterator<E> itr = this.iterator()/* this.base.iterator() */; itr.hasNext();) {
E v = itr.next();
if (element == null ? v == null : element.equals(v)) {
count++;
}
}
return count;
}
@Override
public Set<E> asSet() {
return new HashSet<E>(this);
}
@Override
public List<E> asList() {
return new ArrayList<E>(this);
}
@Override
public CollectionType getCollectionType() {
return this.collectionType;
}
@Override
public void setElementType(ObjectType elementType) {
throw new UnsupportedOperationException(" not supported");
}
private class NWayMergeResultsCollection extends AbstractCollection<E> {
private final Collection<? extends Collection<E>> sortedResults;
private final OrderByComparator comparator;
private final int limit;
public NWayMergeResultsCollection(Collection<? extends Collection<E>> sortedResults, int limit,
List<CompiledSortCriterion> orderByAttribs, ExecutionContext context) {
this.sortedResults = sortedResults;
this.limit = limit;
this.comparator =
new OrderByComparator(orderByAttribs, collectionType.getElementType(), context);
}
@Override
public int size() {
if (isDistinct) {
Iterator<E> iter = this.iterator();
int count = 0;
while (iter.hasNext()) {
++count;
iter.next();
}
return count;
} else {
int totalSize = 0;
for (Collection<E> result : this.sortedResults) {
totalSize += result.size();
}
if (this.limit >= 0) {
return totalSize > this.limit ? this.limit : totalSize;
} else {
return totalSize;
}
}
}
@Override
public Iterator<E> iterator() {
Iterator<E> iter;
if (isDistinct) {
iter = new NWayMergeDistinctIterator();
} else {
iter = new NWayMergeIterator();
}
if (this.limit > -1) {
iter = new LimitIterator<E>(iter, this.limit);
}
return iter;
}
private class NWayMergeIterator implements Iterator<E> {
protected final IteratorWrapper<E>[] iterators;
protected int lastReturnedIteratorIndex = -1;
protected NWayMergeIterator() {
this.iterators = new IteratorWrapper[sortedResults.size()];
Iterator<? extends Collection<E>> listIter = sortedResults.iterator();
int index = 0;
while (listIter.hasNext()) {
IteratorWrapper<E> temp = new IteratorWrapper<E>(listIter.next().iterator());
this.iterators[index++] = temp;
// initialize
temp.move();
}
}
@Override
public boolean hasNext() {
boolean hasNext = false;
for (int i = 0; i < this.iterators.length; ++i) {
if (i == this.lastReturnedIteratorIndex) {
hasNext = this.iterators[i].hasNext();
} else {
hasNext = !this.iterators[i].EOF;
}
if (hasNext) {
break;
}
}
return hasNext;
}
protected E basicNext() {
if (this.iterators.length == 1) {
this.lastReturnedIteratorIndex = 0;
if (iterators[0].EOF) {
throw new NoSuchElementException();
}
return iterators[0].get();
}
int iteratorIndex = -1;
E refObject = null;
for (int j = 0; j < this.iterators.length; ++j) {
if (!this.iterators[j].EOF) {
E temp = this.iterators[j].get();
iteratorIndex = j;
refObject = temp;
break;
}
}
if (iteratorIndex == -1) {
throw new NoSuchElementException();
}
E currentOptima = null;
int indexOfIteratorForOptima = -1;
currentOptima = refObject;
indexOfIteratorForOptima = iteratorIndex;
for (int j = iteratorIndex + 1; j < this.iterators.length; ++j) {
if (this.iterators[j].EOF) {
continue;
}
E temp = this.iterators[j].get();
int compareResult = compare(currentOptima, temp);
if (compareResult > 0) {
currentOptima = temp;
indexOfIteratorForOptima = j;
}
}
this.lastReturnedIteratorIndex = indexOfIteratorForOptima;
return currentOptima;
}
protected int compare(E obj1, E obj2) {
return collectionType.getElementType().isStructType() ? comparator
.compare(((StructImpl) obj1).getFieldValues(), ((StructImpl) obj2).getFieldValues())
: comparator.compare(obj1, obj2);
}
@Override
public E next() {
if (this.lastReturnedIteratorIndex != -1) {
iterators[this.lastReturnedIteratorIndex].move();
}
return this.basicNext();
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove not supported");
}
private class IteratorWrapper<T> {
private final Iterator<T> iter;
private T current = null;
private boolean EOF = false;
private IteratorWrapper(Iterator<T> iter) {
this.iter = iter;
}
T get() {
return this.current;
}
boolean hasNext() {
return this.iter.hasNext();
}
void move() {
if (this.iter.hasNext()) {
this.current = this.iter.next();
} else {
this.current = null;
this.EOF = true;
}
}
}
}
private class NWayMergeDistinctIterator extends NWayMergeIterator {
private E lastReturned = null;
private Boolean cachedHasNext = null;
private boolean uninitialized = true;
NWayMergeDistinctIterator() {}
@Override
public boolean hasNext() {
if (this.cachedHasNext != null) {
return this.cachedHasNext.booleanValue();
}
boolean hasNext = false;
for (int i = 0; i < this.iterators.length; ++i) {
if (this.uninitialized) {
hasNext = !this.iterators[i].EOF;
if (hasNext) {
break;
}
} else {
if (this.lastReturnedIteratorIndex == i) {
do {
this.iterators[i].move();
if (this.iterators[i].EOF) {
break;
} // else if (!this.lastReturned.equals(this.iterators[i].get()))
// {
else if (compare(lastReturned, this.iterators[i].get()) != 0) {
hasNext = true;
break;
}
} while (true);
} else {
do {
if (this.iterators[i].EOF) {
break;
} // else if
// (!this.iterators[i].get().equals(this.lastReturned)) {
else if (compare(this.iterators[i].get(), this.lastReturned) != 0) {
hasNext = true;
break;
} else {
this.iterators[i].move();
}
} while (true);
}
}
}
this.uninitialized = false;
this.cachedHasNext = Boolean.valueOf(hasNext);
return hasNext;
}
@Override
public E next() {
if (this.cachedHasNext == null) {
this.hasNext();
}
this.cachedHasNext = null;
this.lastReturned = this.basicNext();
return this.lastReturned;
}
@Override
public void remove() {
super.remove();
}
}
}
@Override
public Version[] getSerializationVersions() {
return null;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
ObjectType elementType = (ObjectType) context.getDeserializer().readObject(in);
this.collectionType = new CollectionTypeImpl(NWayMergeResults.class, elementType);
boolean isStruct = elementType.isStructType();
this.isDistinct = DataSerializer.readPrimitiveBoolean(in);
long size = in.readLong();
this.data = new ArrayList<E>((int) size);
long numLeft = size;
while (numLeft > 0) {
if (isStruct) {
Object[] fields = DataSerializer.readObjectArray(in);
this.data.add((E) new StructImpl((StructTypeImpl) elementType, fields));
} else {
E element = context.getDeserializer().readObject(in);
this.data.add(element);
}
--numLeft;
}
}
@Override
public int getDSFID() {
return NWAY_MERGE_RESULTS;
}
// TODO : optimize for struct elements , by directly writing the fields
// instead
// of struct
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
boolean isStruct = this.collectionType.getElementType().isStructType();
context.getSerializer().writeObject(this.collectionType.getElementType(), out);
DataSerializer.writePrimitiveBoolean(this.isDistinct, out);
HeapDataOutputStream hdos = new HeapDataOutputStream(1024, null);
LongUpdater lu = hdos.reserveLong();
Iterator<E> iter = this.iterator();
int numElements = 0;
while (iter.hasNext()) {
E data = iter.next();
if (isStruct) {
Object[] fields = ((Struct) data).getFieldValues();
DataSerializer.writeObjectArray(fields, out);
} else {
context.getSerializer().writeObject(data, hdos);
}
++numElements;
}
lu.update(numElements);
hdos.sendTo(out);
}
@Override
public String toString() {
StringBuilder builder =
new StringBuilder("NWayMergeResults:: isDistinct=" + this.isDistinct).append(":");
builder.append('[');
Iterator<E> iter = this.iterator();
while (iter.hasNext()) {
builder.append(iter.next()).append(',');
}
builder.deleteCharAt(builder.length() - 1);
builder.append(']');
return builder.toString();
}
@Override
public Comparator comparator() {
if (this.data instanceof NWayMergeResults.NWayMergeResultsCollection) {
return ((NWayMergeResultsCollection) this.data).comparator;
} else {
return null;
}
}
@Override
public boolean dataPreordered() {
if (this.data instanceof NWayMergeResults.NWayMergeResultsCollection) {
return false;
} else {
return true;
}
}
}