Fixed the union iterator duplicate issue. The cursor logic still needs to be refactored.
diff --git a/stack/core/src/main/java/org/usergrid/persistence/query/ir/result/UnionIterator.java b/stack/core/src/main/java/org/usergrid/persistence/query/ir/result/UnionIterator.java
index 561e337..ec5d1a6 100644
--- a/stack/core/src/main/java/org/usergrid/persistence/query/ir/result/UnionIterator.java
+++ b/stack/core/src/main/java/org/usergrid/persistence/query/ir/result/UnionIterator.java
@@ -16,14 +16,17 @@
package org.usergrid.persistence.query.ir.result;
-import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.usergrid.persistence.cassandra.CursorCache;
-
-import com.google.common.collect.Sets;
+import org.usergrid.utils.UUIDUtils;
/**
@@ -33,10 +36,11 @@
*/
public class UnionIterator extends MultiIterator {
- /** results that were left from our previous union. These are kept and returned before advancing iterators */
- private Set<ScanColumn> remainderResults;
+ private static final ScanColumnComparator COMP = new ScanColumnComparator();
- private int currentIndex = -1;
+
+ private SortedColumnList list;
+
/**
@@ -44,6 +48,7 @@
*/
public UnionIterator( int pageSize ) {
super( pageSize );
+ list = new SortedColumnList( pageSize );
}
@@ -61,60 +66,28 @@
return null;
}
- Set<ScanColumn> resultSet = null;
- if ( remainderResults != null ) {
- resultSet = remainderResults;
- remainderResults = null;
- }
- else {
- resultSet = new LinkedHashSet<ScanColumn>();
- }
+ list.clear();
- /**
- * We have results from a previous merge
- */
-
-
- int complete = 0;
-
- while ( resultSet.size() < pageSize && complete < size ) {
-
- currentIndex = ( currentIndex + 1 ) % iterators.size();
-
- ResultIterator itr = iterators.get( currentIndex );
-
- if ( !itr.hasNext() ) {
- complete++;
- continue;
- }
-
- resultSet = Sets.union( resultSet, itr.next() );
- }
-
- // now check if we need to split our results if they went over the page size
- if ( resultSet.size() > pageSize ) {
- Set<ScanColumn> returnSet = new LinkedHashSet<ScanColumn>( pageSize );
-
- Iterator<ScanColumn> itr = resultSet.iterator();
-
- for ( int i = 0; i < pageSize && itr.hasNext(); i++ ) {
- returnSet.add( itr.next() );
- }
-
- remainderResults = new LinkedHashSet<ScanColumn>( pageSize );
+ for ( ResultIterator itr : iterators ) {
while ( itr.hasNext() ) {
- remainderResults.add( itr.next() );
+ list.addAll( itr.next() );
}
- resultSet = returnSet;
+ itr.reset();
}
- return resultSet.size() > 0 ? resultSet: null;
+ //mark us for the next page
+ list.mark();
+
+
+ return list.asSet();
}
+
+
/*
* (non-Javadoc)
*
@@ -124,9 +97,144 @@
*/
@Override
public void finalizeCursor( CursorCache cache, UUID lastLoaded ) {
- //we can create a cursor for every iterator in our union
- for ( ResultIterator current : iterators ) {
- current.finalizeCursor( cache, lastLoaded );
+
+ //TODO: get our scan column and put it in the cache
+ //TODO: finalize the cursor of the min. Until then this method writes nothing to the cache
+ }
+
+
+ /**
+ * A sorted list with a max size. When a new entry pushes the list over the max size, the largest entry is removed.
+ * You can mark the next "min" by calling the mark method. Only values > min are accepted, and duplicates are
+ * ignored. Accepted values that would sort beyond the max size are discarded
+ */
+ public static final class SortedColumnList {
+
+ private static final ScanColumnComparator COMP = new ScanColumnComparator();
+
+ private final int maxSize;
+
+ private final List<ScanColumn> list;
+
+
+ private ScanColumn min;
+
+
+ public SortedColumnList( int maxSize ) {
+ //we need to allocate the extra space if required
+ this.list = new ArrayList<ScanColumn>(maxSize);
+ this.maxSize = maxSize;
+ }
+
+
+ /**
+ * Add the column to this list
+ * @param col
+ */
+ public void add( ScanColumn col ) {
+ //less than or equal to our min, don't add
+ if(COMP.compare( min, col ) >= 0){
+ return;
+ }
+
+ int index = Collections.binarySearch( this.list, col, COMP );
+
+ //already present
+ if(index > -1){
+ return ;
+ }
+
+ index = (index * -1) - 1;
+
+ //outside the range
+ if(index >= maxSize){
+ return;
+ }
+
+ this.list.add( index, col );
+
+ final int size = this.list.size();
+
+ if(size > maxSize){
+ this.list.subList( maxSize, size ).clear();
+ }
+
+ }
+
+
+ /**
+ * Add all the elements to this list
+ * @param cols
+ */
+ public void addAll( final Collection<? extends ScanColumn> cols ) {
+ for(ScanColumn col:cols){
+ add(col);
+ }
+ }
+
+
+ /**
+ * Returns a new set containing the elements of this list in sorted order. If no elements are present, returns null
+ * @return
+ */
+ public Set<ScanColumn> asSet(){
+ if(this.list.size() == 0){
+ return null;
+ }
+
+ return new LinkedHashSet<ScanColumn>(this.list);
+ }
+
+
+ /**
+ * Mark the last (largest) element in the list as the new min. Does nothing if the list is empty
+ */
+ public void mark() {
+
+ final int size = this.list.size();
+
+ //we don't have any elements in the list, so there is nothing to mark. Leave the current min unchanged
+ if(size == 0){
+ return;
+ }
+
+ min = this.list.get( size -1);
+ }
+
+
+ /**
+ * Clear the list
+ */
+ public void clear(){
+ this.list.clear();
+ }
+
+
+
+
+ }
+
+
+ /**
+ * Simple comparator for comparing scan columns. Orders them by time uuid, with nulls sorted before non-null columns
+ */
+ private static class ScanColumnComparator implements Comparator<ScanColumn> {
+
+ @Override
+ public int compare( final ScanColumn o1, final ScanColumn o2 ) {
+ if ( o1 == null ) {
+ if ( o2 == null ) {
+ return 0;
+ }
+
+ return -1;
+ }
+
+ else if ( o2 == null ) {
+ return 1;
+ }
+
+ return UUIDUtils.compare( o1.getUUID(), o2.getUUID() );
}
}
}
diff --git a/stack/core/src/main/java/org/usergrid/persistence/query/util/PeekingIterator.java b/stack/core/src/main/java/org/usergrid/persistence/query/util/PeekingIterator.java
deleted file mode 100644
index 3ff85ee..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/query/util/PeekingIterator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.usergrid.persistence.query.util;
-
-
-import java.util.Iterator;
-
-
-/**
- * A simple iterator that allows us to "peek" to the next value without actually popping it.
- * <p/>
- * Meant as a wrapper to an existing iterator
- */
-public class PeekingIterator<T> implements Iterable<T>, Iterator<T> {
-
- private Iterator<T> delegate;
- private T peeked;
-
-
- public PeekingIterator( Iterator<T> delegate ) {
- this.delegate = delegate;
- }
-
-
- @Override
- public Iterator<T> iterator() {
- return this;
- }
-
-
- @Override
- public boolean hasNext() {
- return peeked != null || delegate.hasNext();
- }
-
-
- @Override
- public T next() {
- T toReturn = null;
-
- if ( peeked != null ) {
- toReturn = peeked;
- peeked = null;
- }
- else {
- toReturn = delegate.next();
- }
-
- return toReturn;
- }
-
-
- /** Peek ahead in the iterator. Assumes a next is present and has been checked */
- public T peek() {
- if ( peeked == null && delegate.hasNext() ) {
- peeked = delegate.next();
- }
- return peeked;
- }
-
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException( "Remove is unsupported" );
- }
-}
diff --git a/stack/core/src/test/java/org/usergrid/persistence/query/ir/result/UnionIteratorTest.java b/stack/core/src/test/java/org/usergrid/persistence/query/ir/result/UnionIteratorTest.java
index c7eb88e..79fc8af 100644
--- a/stack/core/src/test/java/org/usergrid/persistence/query/ir/result/UnionIteratorTest.java
+++ b/stack/core/src/test/java/org/usergrid/persistence/query/ir/result/UnionIteratorTest.java
@@ -17,13 +17,13 @@
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;
import org.junit.Test;
import org.usergrid.utils.UUIDUtils;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -66,21 +66,22 @@
second.add( id10 );
InOrderIterator third = new InOrderIterator( 100 );
+ third.add( id6 );
+ third.add( id7 );
third.add( id1 );
third.add( id3 );
third.add( id5 );
- third.add( id6 );
- third.add( id7 );
third.add( id8 );
InOrderIterator fourth = new InOrderIterator( 100 );
fourth.add( id1 );
+ fourth.add( id6 );
fourth.add( id2 );
fourth.add( id3 );
- fourth.add( id6 );
fourth.add( id8 );
fourth.add( id9 );
+
UnionIterator iter = new UnionIterator( 100 );
iter.addIterator( first );
iter.addIterator( second );
@@ -183,17 +184,17 @@
int firstIntersection = 100;
int secondIntersection = 200;
- int pageSize = 10;
+ int pageSize = 20;
UUID[] firstSet = new UUID[size];
UUID[] secondSet = new UUID[size];
UUID[] thirdSet = new UUID[size];
- InOrderIterator first = new InOrderIterator( pageSize/2 );
- InOrderIterator second = new InOrderIterator( pageSize/2 );
- InOrderIterator third = new InOrderIterator( pageSize/2 );
+ InOrderIterator first = new InOrderIterator( pageSize / 2 );
+ InOrderIterator second = new InOrderIterator( pageSize / 2 );
+ InOrderIterator third = new InOrderIterator( pageSize / 2 );
- Set<UUID> results = new HashSet<UUID>( size );
+ Set<UUID> results = new HashSet<UUID>( size );
for ( int i = 0; i < size; i++ ) {
firstSet[i] = UUIDUtils.newTimeUUID();
@@ -242,11 +243,11 @@
for ( ScanColumn col : resultSet ) {
boolean existed = results.remove( col.getUUID() );
- assertTrue("Duplicate element was detected", existed);
+ assertTrue( "Duplicate element was detected", existed );
}
}
- assertTrue( results.isEmpty() );
+ assertEquals( 0, results.size() );
assertFalse( union.hasNext() );
}
@@ -290,9 +291,7 @@
//now try to get the next page
ids = union.next();
- assertNull(ids);
-
-
+ assertNull( ids );
}