Fixed union iterator duplicate issue.  Need to refactor cursor logic
diff --git a/stack/core/src/main/java/org/usergrid/persistence/query/ir/result/UnionIterator.java b/stack/core/src/main/java/org/usergrid/persistence/query/ir/result/UnionIterator.java
index 561e337..ec5d1a6 100644
--- a/stack/core/src/main/java/org/usergrid/persistence/query/ir/result/UnionIterator.java
+++ b/stack/core/src/main/java/org/usergrid/persistence/query/ir/result/UnionIterator.java
@@ -16,14 +16,17 @@
 package org.usergrid.persistence.query.ir.result;
 
 
-import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
 import org.usergrid.persistence.cassandra.CursorCache;
-
-import com.google.common.collect.Sets;
+import org.usergrid.utils.UUIDUtils;
 
 
 /**
@@ -33,10 +36,11 @@
  */
 public class UnionIterator extends MultiIterator {
 
-    /** results that were left from our previous union. These are kept and returned before advancing iterators */
-    private Set<ScanColumn> remainderResults;
+    private static final ScanColumnComparator COMP = new ScanColumnComparator();
 
-    private int currentIndex = -1;
+
+     private SortedColumnList list;
+
 
 
     /**
@@ -44,6 +48,7 @@
      */
     public UnionIterator( int pageSize ) {
         super( pageSize );
+        list = new SortedColumnList( pageSize );
     }
 
 
@@ -61,60 +66,28 @@
             return null;
         }
 
-        Set<ScanColumn> resultSet = null;
 
-        if ( remainderResults != null ) {
-            resultSet = remainderResults;
-            remainderResults = null;
-        }
-        else {
-            resultSet = new LinkedHashSet<ScanColumn>();
-        }
+        list.clear();
 
-        /**
-         * We have results from a previous merge
-         */
-
-
-        int complete = 0;
-
-        while ( resultSet.size() < pageSize && complete < size ) {
-
-            currentIndex = ( currentIndex + 1 ) % iterators.size();
-
-            ResultIterator itr = iterators.get( currentIndex );
-
-            if ( !itr.hasNext() ) {
-                complete++;
-                continue;
-            }
-
-            resultSet = Sets.union( resultSet, itr.next() );
-        }
-
-        // now check if we need to split our results if they went over the page size
-        if ( resultSet.size() > pageSize ) {
-            Set<ScanColumn> returnSet = new LinkedHashSet<ScanColumn>( pageSize );
-
-            Iterator<ScanColumn> itr = resultSet.iterator();
-
-            for ( int i = 0; i < pageSize && itr.hasNext(); i++ ) {
-                returnSet.add( itr.next() );
-            }
-
-            remainderResults = new LinkedHashSet<ScanColumn>( pageSize );
+        for ( ResultIterator itr : iterators ) {
 
             while ( itr.hasNext() ) {
-                remainderResults.add( itr.next() );
+                list.addAll( itr.next() );
             }
 
-            resultSet = returnSet;
+            itr.reset();
         }
 
-        return resultSet.size() > 0 ? resultSet: null;
+        //mark us for the next page
+        list.mark();
+
+
+        return list.asSet();
     }
 
 
+
+
     /*
      * (non-Javadoc)
      *
@@ -124,9 +97,144 @@
      */
     @Override
     public void finalizeCursor( CursorCache cache, UUID lastLoaded ) {
-        //we can create a cursor for every iterator in our union
-        for ( ResultIterator current : iterators ) {
-            current.finalizeCursor( cache, lastLoaded );
+
+        //get our scan column and put them in the cache
+       //we finalize the cursor of the min
+    }
+
+
+    /**
+     * A Sorted Set with a max size. When a new entry is added, the max is removed.  You can mark the next
+     * "min" by calling the mark method.  Values > min are accepted.  Values > min and that are over size are
+     * discarded
+     */
+    public static final class SortedColumnList {
+
+        private static final ScanColumnComparator COMP = new ScanColumnComparator();
+
+        private final int maxSize;
+
+        private final List<ScanColumn> list;
+
+
+        private ScanColumn min;
+
+
+        public SortedColumnList( int maxSize ) {
+            //we need to allocate the extra space if required
+            this.list = new ArrayList<ScanColumn>(maxSize);
+            this.maxSize = maxSize;
+        }
+
+
+        /**
+         * Add the column to this list
+         * @param col
+         */
+        public void add( ScanColumn col ) {
+            //less than our min, don't add
+            if(COMP.compare( min, col ) >= 0){
+                return;
+            }
+
+            int index = Collections.binarySearch( this.list, col, COMP );
+
+            //already present
+            if(index > -1){
+                return ;
+            }
+
+            index = (index * -1) - 1;
+
+            //outside the renage
+            if(index >= maxSize){
+                return;
+            }
+
+            this.list.add( index, col );
+
+            final int size = this.list.size();
+
+            if(size > maxSize){
+                this.list.subList( maxSize, size ).clear();
+            }
+
+        }
+
+
+        /**
+         * Add all the elements to this list
+         * @param cols
+         */
+        public void addAll( final Collection<? extends ScanColumn> cols ) {
+            for(ScanColumn col:cols){
+               add(col);
+            }
+        }
+
+
+        /**
+         * Returns a new list.  If no elements are present, returns null
+         * @return
+         */
+        public Set<ScanColumn> asSet(){
+            if(this.list.size() == 0){
+                return null;
+            }
+
+            return new LinkedHashSet<ScanColumn>(this.list);
+        }
+
+
+        /**
+         * Mark our last element in the tree as the max
+         */
+        public void mark() {
+
+            final int size = this.list.size();
+
+            //we don't have any elements in the list, and we've never set a min
+            if(size == 0){
+                return;
+            }
+
+            min = this.list.get( size -1);
+        }
+
+
+        /**
+         * Clear the list
+         */
+        public void clear(){
+            this.list.clear();
+        }
+
+
+
+
+    }
+
+
+    /**
+     * Simple comparator for comparing scan columns.  Orders them by time uuid
+     */
+    private static class ScanColumnComparator implements Comparator<ScanColumn> {
+
+        @Override
+        public int compare( final ScanColumn o1, final ScanColumn o2 ) {
+            if ( o1 == null ) {
+                if ( o2 == null ) {
+                    return 0;
+                }
+
+                return -1;
+            }
+
+            else if ( o2 == null ) {
+                return 1;
+            }
+
+            return UUIDUtils.compare( o1.getUUID(), o2.getUUID() );
         }
     }
 }
diff --git a/stack/core/src/main/java/org/usergrid/persistence/query/util/PeekingIterator.java b/stack/core/src/main/java/org/usergrid/persistence/query/util/PeekingIterator.java
deleted file mode 100644
index 3ff85ee..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/query/util/PeekingIterator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.usergrid.persistence.query.util;
-
-
-import java.util.Iterator;
-
-
-/**
- * A simple iterator that allows us to "peek" to the next value without actually popping it.
- * <p/>
- * Meant as a wrapper to an existing iterator
- */
-public class PeekingIterator<T> implements Iterable<T>, Iterator<T> {
-
-    private Iterator<T> delegate;
-    private T peeked;
-
-
-    public PeekingIterator( Iterator<T> delegate ) {
-        this.delegate = delegate;
-    }
-
-
-    @Override
-    public Iterator<T> iterator() {
-        return this;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-        return peeked != null || delegate.hasNext();
-    }
-
-
-    @Override
-    public T next() {
-        T toReturn = null;
-
-        if ( peeked != null ) {
-            toReturn = peeked;
-            peeked = null;
-        }
-        else {
-            toReturn = delegate.next();
-        }
-
-        return toReturn;
-    }
-
-
-    /** Peek ahead in the iterator.  Assumes a next is present and has been checked */
-    public T peek() {
-        if ( peeked == null && delegate.hasNext() ) {
-            peeked = delegate.next();
-        }
-        return peeked;
-    }
-
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException( "Remove is unsupported" );
-    }
-}
diff --git a/stack/core/src/test/java/org/usergrid/persistence/query/ir/result/UnionIteratorTest.java b/stack/core/src/test/java/org/usergrid/persistence/query/ir/result/UnionIteratorTest.java
index c7eb88e..79fc8af 100644
--- a/stack/core/src/test/java/org/usergrid/persistence/query/ir/result/UnionIteratorTest.java
+++ b/stack/core/src/test/java/org/usergrid/persistence/query/ir/result/UnionIteratorTest.java
@@ -17,13 +17,13 @@
 
 
 import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.Set;
 import java.util.UUID;
 
 import org.junit.Test;
 import org.usergrid.utils.UUIDUtils;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -66,21 +66,22 @@
         second.add( id10 );
 
         InOrderIterator third = new InOrderIterator( 100 );
+        third.add( id6 );
+        third.add( id7 );
         third.add( id1 );
         third.add( id3 );
         third.add( id5 );
-        third.add( id6 );
-        third.add( id7 );
         third.add( id8 );
 
         InOrderIterator fourth = new InOrderIterator( 100 );
         fourth.add( id1 );
+        fourth.add( id6 );
         fourth.add( id2 );
         fourth.add( id3 );
-        fourth.add( id6 );
         fourth.add( id8 );
         fourth.add( id9 );
 
+
         UnionIterator iter = new UnionIterator( 100 );
         iter.addIterator( first );
         iter.addIterator( second );
@@ -183,17 +184,17 @@
         int firstIntersection = 100;
         int secondIntersection = 200;
 
-        int pageSize = 10;
+        int pageSize = 20;
 
         UUID[] firstSet = new UUID[size];
         UUID[] secondSet = new UUID[size];
         UUID[] thirdSet = new UUID[size];
 
-        InOrderIterator first = new InOrderIterator( pageSize/2 );
-        InOrderIterator second = new InOrderIterator( pageSize/2 );
-        InOrderIterator third = new InOrderIterator( pageSize/2 );
+        InOrderIterator first = new InOrderIterator( pageSize / 2 );
+        InOrderIterator second = new InOrderIterator( pageSize / 2 );
+        InOrderIterator third = new InOrderIterator( pageSize / 2 );
 
-        Set<UUID> results = new HashSet<UUID>( size  );
+        Set<UUID> results = new HashSet<UUID>( size );
 
         for ( int i = 0; i < size; i++ ) {
             firstSet[i] = UUIDUtils.newTimeUUID();
@@ -242,11 +243,11 @@
             for ( ScanColumn col : resultSet ) {
                 boolean existed = results.remove( col.getUUID() );
 
-                assertTrue("Duplicate element was detected", existed);
+                assertTrue( "Duplicate element was detected", existed );
             }
         }
 
-        assertTrue( results.isEmpty() );
+        assertEquals( 0, results.size() );
         assertFalse( union.hasNext() );
     }
 
@@ -290,9 +291,7 @@
 
         //now try to get the next page
         ids = union.next();
-        assertNull(ids);
-
-
+        assertNull( ids );
     }