fix BatchMapperIterator stopped when fetched none in the middle batch (#64)

Change-Id: I658d8e95a68c8a9494efa98e88f350cf6d65b021
diff --git a/src/main/java/com/baidu/hugegraph/iterator/BatchMapperIterator.java b/src/main/java/com/baidu/hugegraph/iterator/BatchMapperIterator.java
index 6b2a913..283fc45 100644
--- a/src/main/java/com/baidu/hugegraph/iterator/BatchMapperIterator.java
+++ b/src/main/java/com/baidu/hugegraph/iterator/BatchMapperIterator.java
@@ -55,14 +55,16 @@
             return true;
         }
 
-        List<T> list = this.nextBatch();
-        if (!list.isEmpty()) {
-            assert this.batchIterator == null;
+        List<T> batch = this.nextBatch();
+        assert this.batchIterator == null;
+        while (!batch.isEmpty()) {
             // Do fetch
-            this.batchIterator = this.mapperCallback.apply(list);
+            this.batchIterator = this.mapperCallback.apply(batch);
             if (this.batchIterator != null && this.fetchFromBatch()) {
                 return true;
             }
+            // Try next batch
+            batch = this.nextBatch();
         }
         return false;
     }
diff --git a/src/main/java/com/baidu/hugegraph/iterator/WrappedIterator.java b/src/main/java/com/baidu/hugegraph/iterator/WrappedIterator.java
index c6e9ae7..21c4e9e 100644
--- a/src/main/java/com/baidu/hugegraph/iterator/WrappedIterator.java
+++ b/src/main/java/com/baidu/hugegraph/iterator/WrappedIterator.java
@@ -44,9 +44,9 @@
     public R next() {
         if (this.current == none()) {
             this.fetch();
-        }
-        if (this.current == none()) {
-            throw new NoSuchElementException();
+            if (this.current == none()) {
+                throw new NoSuchElementException();
+            }
         }
         R current = this.current;
         this.current = none();
diff --git a/src/test/java/com/baidu/hugegraph/unit/iterator/BatchMapperIteratorTest.java b/src/test/java/com/baidu/hugegraph/unit/iterator/BatchMapperIteratorTest.java
index e08ff69..a5d33d7 100644
--- a/src/test/java/com/baidu/hugegraph/unit/iterator/BatchMapperIteratorTest.java
+++ b/src/test/java/com/baidu/hugegraph/unit/iterator/BatchMapperIteratorTest.java
@@ -24,6 +24,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import org.junit.Test;
@@ -235,6 +236,42 @@
         });
         Assert.assertFalse(results.hasNext());
         Assert.assertFalse(results.hasNext());
+
+        AtomicInteger count1 = new AtomicInteger(0);
+        results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+            if (count1.incrementAndGet() == 1) {
+                return null;
+            }
+            return batch.iterator();
+        });
+        Assert.assertTrue(results.hasNext());
+        Assert.assertEquals(5, results.next());
+        Assert.assertEquals(6, results.next());
+        Assert.assertFalse(results.hasNext());
+
+        AtomicInteger count2 = new AtomicInteger(0);
+        results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+            if (count2.incrementAndGet() == 2) {
+                return null;
+            }
+            return batch.iterator();
+        });
+        Assert.assertTrue(results.hasNext());
+        Assert.assertEquals(4, results.next());
+        Assert.assertEquals(6, results.next());
+        Assert.assertFalse(results.hasNext());
+
+        AtomicInteger count3 = new AtomicInteger(0);
+        results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+            if (count3.incrementAndGet() == 3) {
+                return null;
+            }
+            return batch.iterator();
+        });
+        Assert.assertTrue(results.hasNext());
+        Assert.assertEquals(4, results.next());
+        Assert.assertEquals(5, results.next());
+        Assert.assertFalse(results.hasNext());
     }
 
     @Test
@@ -253,6 +290,53 @@
     }
 
     @Test
+    public void testMapperReturnEmptyThenHasNext() {
+        Iterator<Integer> results;
+
+        results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+            return Collections.emptyIterator();
+        });
+        Assert.assertFalse(results.hasNext());
+        Assert.assertFalse(results.hasNext());
+
+        AtomicInteger count1 = new AtomicInteger(0);
+        results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+            if (count1.incrementAndGet() == 1) {
+                return Collections.emptyIterator();
+            }
+            return batch.iterator();
+        });
+        Assert.assertTrue(results.hasNext());
+        Assert.assertEquals(5, results.next());
+        Assert.assertEquals(6, results.next());
+        Assert.assertFalse(results.hasNext());
+
+        AtomicInteger count2 = new AtomicInteger(0);
+        results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+            if (count2.incrementAndGet() == 2) {
+                return Collections.emptyIterator();
+            }
+            return batch.iterator();
+        });
+        Assert.assertTrue(results.hasNext());
+        Assert.assertEquals(4, results.next());
+        Assert.assertEquals(6, results.next());
+        Assert.assertFalse(results.hasNext());
+
+        AtomicInteger count3 = new AtomicInteger(0);
+        results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+            if (count3.incrementAndGet() == 3) {
+                return Collections.emptyIterator();
+            }
+            return batch.iterator();
+        });
+        Assert.assertTrue(results.hasNext());
+        Assert.assertEquals(4, results.next());
+        Assert.assertEquals(5, results.next());
+        Assert.assertFalse(results.hasNext());
+    }
+
+    @Test
     public void testClose() throws Exception {
         CloseableItor<Integer> vals = new CloseableItor<>(DATA1.iterator());