fix BatchMapperIterator stopped when fetched none in the middle batch (#64)
Change-Id: I658d8e95a68c8a9494efa98e88f350cf6d65b021
diff --git a/src/main/java/com/baidu/hugegraph/iterator/BatchMapperIterator.java b/src/main/java/com/baidu/hugegraph/iterator/BatchMapperIterator.java
index 6b2a913..283fc45 100644
--- a/src/main/java/com/baidu/hugegraph/iterator/BatchMapperIterator.java
+++ b/src/main/java/com/baidu/hugegraph/iterator/BatchMapperIterator.java
@@ -55,14 +55,16 @@
return true;
}
- List<T> list = this.nextBatch();
- if (!list.isEmpty()) {
- assert this.batchIterator == null;
+ List<T> batch = this.nextBatch();
+ assert this.batchIterator == null;
+ while (!batch.isEmpty()) {
// Do fetch
- this.batchIterator = this.mapperCallback.apply(list);
+ this.batchIterator = this.mapperCallback.apply(batch);
if (this.batchIterator != null && this.fetchFromBatch()) {
return true;
}
+ // Try next batch
+ batch = this.nextBatch();
}
return false;
}
diff --git a/src/main/java/com/baidu/hugegraph/iterator/WrappedIterator.java b/src/main/java/com/baidu/hugegraph/iterator/WrappedIterator.java
index c6e9ae7..21c4e9e 100644
--- a/src/main/java/com/baidu/hugegraph/iterator/WrappedIterator.java
+++ b/src/main/java/com/baidu/hugegraph/iterator/WrappedIterator.java
@@ -44,9 +44,9 @@
public R next() {
if (this.current == none()) {
this.fetch();
- }
- if (this.current == none()) {
- throw new NoSuchElementException();
+ if (this.current == none()) {
+ throw new NoSuchElementException();
+ }
}
R current = this.current;
this.current = none();
diff --git a/src/test/java/com/baidu/hugegraph/unit/iterator/BatchMapperIteratorTest.java b/src/test/java/com/baidu/hugegraph/unit/iterator/BatchMapperIteratorTest.java
index e08ff69..a5d33d7 100644
--- a/src/test/java/com/baidu/hugegraph/unit/iterator/BatchMapperIteratorTest.java
+++ b/src/test/java/com/baidu/hugegraph/unit/iterator/BatchMapperIteratorTest.java
@@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.Test;
@@ -235,6 +236,42 @@
});
Assert.assertFalse(results.hasNext());
Assert.assertFalse(results.hasNext());
+
+ AtomicInteger count1 = new AtomicInteger(0);
+ results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+ if (count1.incrementAndGet() == 1) {
+ return null;
+ }
+ return batch.iterator();
+ });
+ Assert.assertTrue(results.hasNext());
+ Assert.assertEquals(5, results.next());
+ Assert.assertEquals(6, results.next());
+ Assert.assertFalse(results.hasNext());
+
+ AtomicInteger count2 = new AtomicInteger(0);
+ results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+ if (count2.incrementAndGet() == 2) {
+ return null;
+ }
+ return batch.iterator();
+ });
+ Assert.assertTrue(results.hasNext());
+ Assert.assertEquals(4, results.next());
+ Assert.assertEquals(6, results.next());
+ Assert.assertFalse(results.hasNext());
+
+ AtomicInteger count3 = new AtomicInteger(0);
+ results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+ if (count3.incrementAndGet() == 3) {
+ return null;
+ }
+ return batch.iterator();
+ });
+ Assert.assertTrue(results.hasNext());
+ Assert.assertEquals(4, results.next());
+ Assert.assertEquals(5, results.next());
+ Assert.assertFalse(results.hasNext());
}
@Test
@@ -253,6 +290,53 @@
}
@Test
+ public void testMapperReturnEmptyThenHasNext() {
+ Iterator<Integer> results;
+
+ results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+ return Collections.emptyIterator();
+ });
+ Assert.assertFalse(results.hasNext());
+ Assert.assertFalse(results.hasNext());
+
+ AtomicInteger count1 = new AtomicInteger(0);
+ results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+ if (count1.incrementAndGet() == 1) {
+ return Collections.emptyIterator();
+ }
+ return batch.iterator();
+ });
+ Assert.assertTrue(results.hasNext());
+ Assert.assertEquals(5, results.next());
+ Assert.assertEquals(6, results.next());
+ Assert.assertFalse(results.hasNext());
+
+ AtomicInteger count2 = new AtomicInteger(0);
+ results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+ if (count2.incrementAndGet() == 2) {
+ return Collections.emptyIterator();
+ }
+ return batch.iterator();
+ });
+ Assert.assertTrue(results.hasNext());
+ Assert.assertEquals(4, results.next());
+ Assert.assertEquals(6, results.next());
+ Assert.assertFalse(results.hasNext());
+
+ AtomicInteger count3 = new AtomicInteger(0);
+ results = new BatchMapperIterator<>(1, DATA3.iterator(), batch -> {
+ if (count3.incrementAndGet() == 3) {
+ return Collections.emptyIterator();
+ }
+ return batch.iterator();
+ });
+ Assert.assertTrue(results.hasNext());
+ Assert.assertEquals(4, results.next());
+ Assert.assertEquals(5, results.next());
+ Assert.assertFalse(results.hasNext());
+ }
+
+ @Test
public void testClose() throws Exception {
CloseableItor<Integer> vals = new CloseableItor<>(DATA1.iterator());