[BEAM-12802] Define a prefetchable iterator and iterable and utility functions that work over them.
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterable.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterable.java
new file mode 100644
index 0000000..5700f6c
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+/** An {@link Iterable} whose {@link #iterator()} always returns a {@link PrefetchableIterator}. */
+public interface PrefetchableIterable<T> extends Iterable<T> {
+ /** Returns a {@link PrefetchableIterator} over the elements of this iterable. */
+ @Override
+ PrefetchableIterator<T> iterator();
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterables.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterables.java
new file mode 100644
index 0000000..489c727
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterables.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import static org.apache.beam.sdk.fn.stream.PrefetchableIterators.emptyIterator;
+
+import java.util.NoSuchElementException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+
+/**
+ * This class contains static utility functions that operate on or return objects of type {@link
+ * PrefetchableIterable}.
+ */
+public class PrefetchableIterables {
+
+ // One stateless instance serves every element type; its iterator is the shared empty iterator.
+ private static final PrefetchableIterable<Object> EMPTY_ITERABLE = () -> emptyIterator();
+
+ /**
+  * Returns a {@link PrefetchableIterable} without any elements.
+  *
+  * <p>Every call returns the same shared instance, whatever element type is asked for. That is
+  * safe because the instance never produces an element.
+  */
+ public static <T> PrefetchableIterable<T> emptyIterable() {
+   return (PrefetchableIterable<T>) EMPTY_ITERABLE;
+ }
+
+ /**
+  * Returns a {@link PrefetchableIterable} over the specified values, in array order.
+  *
+  * <p>For its iterators {@link PrefetchableIterator#prefetch()} is a no-op and {@link
+  * PrefetchableIterator#isReady()} always returns true. The array is captured rather than
+  * copied, and each call to {@code iterator()} starts again from the first value.
+  *
+  * @param values the elements to iterate over; when empty, {@link #emptyIterable()} is returned
+  */
+ @SafeVarargs // values is only ever read, never written to.
+ public static <T> PrefetchableIterable<T> fromArray(T... values) {
+   if (values.length == 0) {
+     return emptyIterable();
+   }
+   // Each iterator() call builds a fresh iterator over the captured array.
+   return () -> PrefetchableIterators.fromArray(values);
+ }
+
+ /**
+  * Converts the {@link Iterable} into a {@link PrefetchableIterable}.
+  *
+  * <p>A {@link PrefetchableIterable} is returned as is. Anything else is wrapped so that each of
+  * its iterators goes through {@link PrefetchableIterators#maybePrefetchable}, which gives
+  * non-prefetchable iterators a no-op {@code prefetch()} and an always-true {@code isReady()}.
+  *
+  * @throws IllegalArgumentException if {@code iterable} is null
+  */
+ private static <T> PrefetchableIterable<T> maybePrefetchable(Iterable<T> iterable) {
+   if (iterable == null) { // Fail fast instead of on the first iterator() call.
+     throw new IllegalArgumentException("Expected non-null iterable.");
+   }
+   if (iterable instanceof PrefetchableIterable) {
+     return (PrefetchableIterable<T>) iterable;
+   }
+   return () -> PrefetchableIterators.maybePrefetchable(iterable.iterator());
+ }
+
+ /**
+  * Concatenates the {@link Iterable}s in the order given; a null {@link Iterable} is rejected
+  * with an {@link IllegalArgumentException}.
+  *
+  * <p>See {@link PrefetchableIterators#concat} for additional details.
+  */
+ @SafeVarargs // iterables is only ever read, never written to.
+ public static <T> PrefetchableIterable<T> concat(Iterable<T>... iterables) {
+   for (int i = 0; i < iterables.length; ++i) {
+     if (iterables[i] == null) {
+       throw new IllegalArgumentException("Iterable at position " + i + " was null.");
+     }
+   }
+   if (iterables.length <= 1) {
+     return iterables.length == 0 ? emptyIterable() : maybePrefetchable(iterables[0]);
+   }
+   return new PrefetchableIterable<T>() {
+     @SuppressWarnings("methodref.receiver.invalid")
+     @Override
+     public PrefetchableIterator<T> iterator() {
+       return PrefetchableIterators.concatIterators(
+           FluentIterable.from(iterables).transform(Iterable::iterator).iterator());
+     }
+   };
+ }
+
+ /**
+  * Limits the {@link Iterable} to at most the specified number of elements.
+  *
+  * <p>Once an iterator has returned {@code limit} elements it reports itself as ready and ignores
+  * {@code prefetch()}, because nothing more will be read from the underlying iterator.
+  *
+  * @param iterable the source of the elements
+  * @param limit the maximum number of elements to return; zero or a negative value yields no
+  *     elements
+  */
+ public static <T> PrefetchableIterable<T> limit(Iterable<T> iterable, int limit) {
+   PrefetchableIterable<T> source = maybePrefetchable(iterable);
+   return () -> new PrefetchableIterator<T>() {
+     private final PrefetchableIterator<T> delegate = source.iterator();
+     private int returned; // Number of elements handed out so far.
+
+     @Override
+     public boolean isReady() {
+       // At the limit nothing more is read from the delegate, so we are ready.
+       return returned >= limit || delegate.isReady();
+     }
+
+     @Override
+     public void prefetch() {
+       if (!isReady()) {
+         delegate.prefetch();
+       }
+     }
+
+     @Override
+     public boolean hasNext() {
+       // Use '<' rather than '!=' so that a negative limit means empty instead of unbounded.
+       return returned < limit && delegate.hasNext();
+     }
+
+     @Override
+     public T next() {
+       if (!hasNext()) {
+         throw new NoSuchElementException();
+       }
+       returned += 1;
+       return delegate.next();
+     }
+   };
+ }
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterator.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterator.java
new file mode 100644
index 0000000..0a32260
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import java.util.Iterator;
+
+/** An {@link Iterator} that supports prefetching the next set of records before they are read. */
+public interface PrefetchableIterator<T> extends Iterator<T> {
+
+ /**
+ * Returns {@code true} if and only if calling {@link #hasNext} and {@link #next} will not
+ * require an expensive operation.
+ */
+ boolean isReady();
+
+ /**
+ * If not {@link #isReady}, schedules the next expensive operation such that {@link #isReady}
+ * will return {@code true} at some point in time in the future.
+ */
+ void prefetch();
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterators.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterators.java
new file mode 100644
index 0000000..b8ea509
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterators.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class PrefetchableIterators {
+
+ private static final PrefetchableIterator<Object> EMPTY_ITERATOR =
+     new PrefetchableIterator<Object>() {
+       @Override
+       public boolean hasNext() {
+         return false;
+       }
+
+       @Override
+       public Object next() {
+         throw new NoSuchElementException();
+       }
+
+       @Override
+       public boolean isReady() {
+         return true; // There is never anything to wait for.
+       }
+
+       @Override
+       public void prefetch() {}
+     };
+
+ /** Returns the shared, always ready {@link PrefetchableIterator} that has no elements. */
+ public static <T> PrefetchableIterator<T> emptyIterator() {
+   return (PrefetchableIterator<T>) EMPTY_ITERATOR;
+ }
+
+ /**
+  * Returns a {@link PrefetchableIterator} over the specified values, in array order.
+  *
+  * <p>The values are already at hand, so {@link PrefetchableIterator#prefetch()} is a no-op and
+  * {@link PrefetchableIterator#isReady()} always returns true. The array is not copied.
+  */
+ @SafeVarargs // values is only ever read, never written to.
+ public static <T> PrefetchableIterator<T> fromArray(T... values) {
+   if (values.length == 0) {
+     return emptyIterator();
+   }
+   return new PrefetchableIterator<T>() {
+     private int nextIndex; // Position in values of the element next() returns.
+
+     @Override
+     public boolean hasNext() {
+       return nextIndex < values.length;
+     }
+
+     @Override
+     public T next() {
+       if (!hasNext()) {
+         throw new NoSuchElementException();
+       }
+       return values[nextIndex++];
+     }
+
+     @Override
+     public boolean isReady() {
+       return true;
+     }
+
+     @Override
+     public void prefetch() {}
+   };
+ }
+
+ /**
+  * Adapts the {@link Iterator} to a {@link PrefetchableIterator}: one that already is
+  * prefetchable is returned as is, anything else is wrapped so that {@code prefetch()} is a no-op
+  * and {@code isReady()} always returns true. A null {@link Iterator} is rejected.
+  */
+ // package private for PrefetchableIterables.
+ static <T> PrefetchableIterator<T> maybePrefetchable(Iterator<T> iterator) {
+   if (iterator == null) {
+     throw new IllegalArgumentException("Expected non-null iterator.");
+   }
+   if (iterator instanceof PrefetchableIterator) {
+     return (PrefetchableIterator<T>) iterator;
+   }
+   return new PrefetchableIterator<T>() {
+     @Override
+     public boolean hasNext() {
+       return iterator.hasNext();
+     }
+
+     @Override
+     public T next() {
+       return iterator.next();
+     }
+
+     @Override
+     public boolean isReady() {
+       return true;
+     }
+
+     @Override
+     public void prefetch() {}
+   };
+ }
+
+ /**
+  * Concatenates the {@link Iterator}s produced by {@code iterators}, in the order produced.
+  *
+  * <p>If {@code iterators} is empty, {@link #emptyIterator()} is returned. Otherwise the first
+  * {@link Iterator} is requested immediately and every later one only once all earlier ones are
+  * exhausted. Each is adapted via {@link #maybePrefetchable} before use.
+  *
+  * <p>{@link PrefetchableIterator#isReady} moves past exhausted {@link Iterator}s for as long as
+  * the current one is ready, and {@link PrefetchableIterator#prefetch} is forwarded to the {@link
+  * Iterator} at which {@code isReady} stopped.
+  */
+ public static <T> PrefetchableIterator<T> concatIterators(Iterator<Iterator<T>> iterators) {
+   if (!iterators.hasNext()) {
+     return emptyIterator();
+   }
+   return new PrefetchableIterator<T>() {
+     // The iterator currently being drained; replaced once it is known to be exhausted.
+     private PrefetchableIterator<T> delegate = maybePrefetchable(iterators.next());
+
+     @Override
+     public boolean isReady() {
+       // Skip exhausted iterators, but only while that needs no expensive operation.
+       while (delegate.isReady()) {
+         // Ready with an element to return, or nothing left at all: either way we are ready.
+         if (delegate.hasNext() || !iterators.hasNext()) {
+           return true;
+         }
+         delegate = maybePrefetchable(iterators.next());
+       }
+       return false;
+     }
+
+     @Override
+     public void prefetch() {
+       // isReady() leaves delegate at the iterator that is not ready, if there is one.
+       if (!isReady()) {
+         delegate.prefetch();
+       }
+     }
+
+     @Override
+     public boolean hasNext() {
+       // Unlike isReady(), this advances even when the delegate is not ready.
+       while (!delegate.hasNext()) {
+         if (!iterators.hasNext()) {
+           return false;
+         }
+         delegate = maybePrefetchable(iterators.next());
+       }
+       return true;
+     }
+
+     @Override
+     public T next() {
+       if (!hasNext()) {
+         throw new NoSuchElementException();
+       }
+       return delegate.next();
+     }
+   };
+ }
+
+ /**
+  * Concatenates the {@link Iterator}s in the order given.
+  *
+  * <p>{@link Iterator}s that are not {@link PrefetchableIterator}s are first adapted via {@link
+  * #maybePrefetchable}. Without arguments {@link #emptyIterator()} is returned, and a single
+  * argument is returned without an additional concatenating wrapper.
+  *
+  * <p>For two or more arguments, {@link PrefetchableIterator#isReady} on the result advances past
+  * exhausted {@link Iterator}s until it reaches one that has an element or is not ready. {@link
+  * PrefetchableIterator#prefetch} advances the same way and then prefetches a not ready one.
+  *
+  * @throws IllegalArgumentException if any of the {@code iterators} is null
+  */
+ @SafeVarargs // iterators is only ever read, never written to.
+ public static <T> PrefetchableIterator<T> concat(Iterator<T>... iterators) {
+   for (int i = 0; i < iterators.length; ++i) {
+     if (iterators[i] == null) {
+       throw new IllegalArgumentException("Iterator at position " + i + " was null.");
+     }
+   }
+   if (iterators.length <= 1) {
+     return iterators.length == 0 ? emptyIterator() : maybePrefetchable(iterators[0]);
+   }
+   return concatIterators(Arrays.asList(iterators).iterator());
+ }
+}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIterablesTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIterablesTest.java
new file mode 100644
index 0000000..e4d1b0d
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIterablesTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import static org.junit.Assert.assertSame;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PrefetchableIterables}. */
+@RunWith(JUnit4.class)
+public class PrefetchableIterablesTest {
+ @Test
+ public void testEmptyIterable() {
+ verifyIterable(PrefetchableIterables.emptyIterable()); // No expected values: must be empty.
+ }
+
+ @Test
+ public void testFromArray() {
+   verifyIterable(PrefetchableIterables.fromArray()); // No values at all.
+   verifyIterable(PrefetchableIterables.fromArray("A", "B", "C"), "A", "B", "C");
+ }
+
+ @Test
+ public void testLimit() {
+   // Each verifyIterable call obtains fresh iterators, so the sources can be shared.
+   PrefetchableIterable<String> none = PrefetchableIterables.fromArray();
+   PrefetchableIterable<String> abc = PrefetchableIterables.fromArray("A", "B", "C");
+
+   // An empty source stays empty whatever the limit.
+   verifyIterable(PrefetchableIterables.limit(none, 0));
+   verifyIterable(PrefetchableIterables.limit(none, 1));
+
+   // A limit of zero hides every element.
+   verifyIterable(PrefetchableIterables.limit(abc, 0));
+
+   // Limits below, equal to and above the number of available elements.
+   verifyIterable(PrefetchableIterables.limit(abc, 2), "A", "B");
+   verifyIterable(PrefetchableIterables.limit(abc, 3), "A", "B", "C");
+   verifyIterable(PrefetchableIterables.limit(abc, 4), "A", "B", "C");
+ }
+
+ @Test
+ public void testConcat() {
+   // Concatenating nothing gives an empty iterable.
+   verifyIterable(PrefetchableIterables.concat());
+
+   // A lone prefetchable iterable is handed back as is instead of being wrapped.
+   PrefetchableIterable<String> single = PrefetchableIterables.fromArray("A", "B");
+   assertSame(PrefetchableIterables.concat(single), single);
+
+   // All three are empty.
+   verifyIterable(
+       PrefetchableIterables.concat(
+           PrefetchableIterables.fromArray(),
+           PrefetchableIterables.fromArray(),
+           PrefetchableIterables.fromArray()));
+
+   // Only the first has elements.
+   verifyIterable(
+       PrefetchableIterables.concat(
+           PrefetchableIterables.fromArray("A", "B"),
+           PrefetchableIterables.fromArray(),
+           PrefetchableIterables.fromArray()),
+       "A", "B");
+
+   // Only the middle one has elements.
+   verifyIterable(
+       PrefetchableIterables.concat(
+           PrefetchableIterables.fromArray(),
+           PrefetchableIterables.fromArray("C", "D"),
+           PrefetchableIterables.fromArray()),
+       "C", "D");
+
+   // Only the last has elements.
+   verifyIterable(
+       PrefetchableIterables.concat(
+           PrefetchableIterables.fromArray(),
+           PrefetchableIterables.fromArray(),
+           PrefetchableIterables.fromArray("E", "F")),
+       "E", "F");
+
+   // Only the first is empty.
+   verifyIterable(
+       PrefetchableIterables.concat(
+           PrefetchableIterables.fromArray(),
+           PrefetchableIterables.fromArray("C", "D"),
+           PrefetchableIterables.fromArray("E", "F")),
+       "C", "D", "E", "F");
+
+   // Only the middle one is empty.
+   verifyIterable(
+       PrefetchableIterables.concat(
+           PrefetchableIterables.fromArray("A", "B"),
+           PrefetchableIterables.fromArray(),
+           PrefetchableIterables.fromArray("E", "F")),
+       "A", "B", "E", "F");
+
+   // Only the last is empty.
+   verifyIterable(
+       PrefetchableIterables.concat(
+           PrefetchableIterables.fromArray("A", "B"),
+           PrefetchableIterables.fromArray("C", "D"),
+           PrefetchableIterables.fromArray()),
+       "A", "B", "C", "D");
+
+   // All three have elements.
+   verifyIterable(
+       PrefetchableIterables.concat(
+           PrefetchableIterables.fromArray("A", "B"),
+           PrefetchableIterables.fromArray("C", "D"),
+           PrefetchableIterables.fromArray("E", "F")),
+       "A", "B", "C", "D", "E", "F");
+ }
+
+ public static <T> void verifyIterable(Iterable<T> iterable, T... expected) {
+   // Run the check several times: the iterable has to hand out a usable iterator on every call.
+   for (int pass = 1; pass <= 3; ++pass) {
+     PrefetchableIteratorsTest.verifyIterator(iterable.iterator(), expected);
+   }
+ }
+}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIteratorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIteratorsTest.java
new file mode 100644
index 0000000..6131634
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIteratorsTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PrefetchableIterators}. */
+@RunWith(JUnit4.class)
+public class PrefetchableIteratorsTest {
+
+ @Test
+ public void testEmpty() {
+   verifyIsAlwaysReady(PrefetchableIterators.emptyIterator()); // Nothing to wait for...
+   verifyIterator(PrefetchableIterators.emptyIterator()); // ...and nothing to return.
+ }
+
+ @Test
+ public void testFromArray() {
+   verifyIterator(PrefetchableIterators.fromArray()); // No values at all.
+   verifyIsAlwaysReady(PrefetchableIterators.fromArray());
+   verifyIterator(PrefetchableIterators.fromArray("A", "B", "C"), "A", "B", "C");
+   verifyIsAlwaysReady(PrefetchableIterators.fromArray("A", "B", "C"));
+ }
+
+ @Test
+ public void testConcat() {
+   // Concatenating nothing gives an empty iterator.
+   verifyIterator(PrefetchableIterators.concat());
+
+   // A lone prefetchable iterator is handed back as is instead of being wrapped.
+   PrefetchableIterator<String> single = PrefetchableIterators.fromArray("A", "B");
+   assertSame(PrefetchableIterators.concat(single), single);
+
+   // All three are empty.
+   verifyIterator(
+       PrefetchableIterators.concat(
+           PrefetchableIterators.fromArray(),
+           PrefetchableIterators.fromArray(),
+           PrefetchableIterators.fromArray()));
+
+   // Only the first has elements.
+   verifyIterator(
+       PrefetchableIterators.concat(
+           PrefetchableIterators.fromArray("A", "B"),
+           PrefetchableIterators.fromArray(),
+           PrefetchableIterators.fromArray()),
+       "A", "B");
+
+   // Only the middle one has elements.
+   verifyIterator(
+       PrefetchableIterators.concat(
+           PrefetchableIterators.fromArray(),
+           PrefetchableIterators.fromArray("C", "D"),
+           PrefetchableIterators.fromArray()),
+       "C", "D");
+
+   // Only the last has elements.
+   verifyIterator(
+       PrefetchableIterators.concat(
+           PrefetchableIterators.fromArray(),
+           PrefetchableIterators.fromArray(),
+           PrefetchableIterators.fromArray("E", "F")),
+       "E", "F");
+
+   // Only the first is empty.
+   verifyIterator(
+       PrefetchableIterators.concat(
+           PrefetchableIterators.fromArray(),
+           PrefetchableIterators.fromArray("C", "D"),
+           PrefetchableIterators.fromArray("E", "F")),
+       "C", "D", "E", "F");
+
+   // Only the middle one is empty.
+   verifyIterator(
+       PrefetchableIterators.concat(
+           PrefetchableIterators.fromArray("A", "B"),
+           PrefetchableIterators.fromArray(),
+           PrefetchableIterators.fromArray("E", "F")),
+       "A", "B", "E", "F");
+
+   // Only the last is empty.
+   verifyIterator(
+       PrefetchableIterators.concat(
+           PrefetchableIterators.fromArray("A", "B"),
+           PrefetchableIterators.fromArray("C", "D"),
+           PrefetchableIterators.fromArray()),
+       "A", "B", "C", "D");
+
+   // All three have elements.
+   verifyIterator(
+       PrefetchableIterators.concat(
+           PrefetchableIterators.fromArray("A", "B"),
+           PrefetchableIterators.fromArray("C", "D"),
+           PrefetchableIterators.fromArray("E", "F")),
+       "A", "B", "C", "D", "E", "F");
+ }
+
+ /** Yields "A" then "B", counts the calls to {@link #prefetch}, and never reports being ready. */
+ private static class NeverReady implements PrefetchableIterator<String> {
+   PrefetchableIterator<String> delegate = PrefetchableIterators.fromArray("A", "B");
+   int prefetchCalled;
+   @Override
+   public boolean hasNext() {
+     return delegate.hasNext();
+   }
+
+   @Override
+   public String next() {
+     return delegate.next();
+   }
+
+   @Override
+   public boolean isReady() {
+     return false;
+   }
+
+   @Override
+   public void prefetch() {
+     prefetchCalled++;
+   }
+ }
+
+ private static class ReadyAfterPrefetch extends NeverReady {
+   @Override
+   public boolean isReady() {
+     return prefetchCalled >= 1; // Ready as soon as prefetch() has been called once.
+   }
+ }
+
+ @Test
+ public void testConcatIsReadyAdvancesToNextIteratorWhenAble() {
+   NeverReady first = new NeverReady();
+   ReadyAfterPrefetch second = new ReadyAfterPrefetch();
+   ReadyAfterPrefetch third = new ReadyAfterPrefetch();
+
+   PrefetchableIterator<String> iterator = PrefetchableIterators.concat(first, second, third);
+
+   // Building the concatenation must not prefetch anything by itself.
+   assertEquals(0, first.prefetchCalled);
+   assertEquals(0, second.prefetchCalled);
+   assertEquals(0, third.prefetchCalled);
+
+   // The first iterator is not ready, so the prefetch is forwarded to it.
+   iterator.prefetch();
+   assertEquals(1, first.prefetchCalled);
+   assertEquals(0, second.prefetchCalled);
+   assertEquals(0, third.prefetchCalled);
+   iterator.next();
+
+   // It never becomes ready, so the next prefetch goes to it again.
+   iterator.prefetch();
+   assertEquals(2, first.prefetchCalled);
+   assertEquals(0, second.prefetchCalled);
+   assertEquals(0, third.prefetchCalled);
+   iterator.next();
+
+   // The first iterator is exhausted now, but as it still is not ready isReady() cannot move on
+   // to the second one and this prefetch is forwarded to the first iterator once more.
+   iterator.prefetch();
+   assertEquals(3, first.prefetchCalled);
+   assertEquals(0, second.prefetchCalled);
+   assertEquals(0, third.prefetchCalled);
+   iterator.next();
+
+   // The next() call above moved on to the second iterator, which is not ready until it has been
+   // prefetched once.
+   iterator.prefetch();
+   assertEquals(3, first.prefetchCalled);
+   assertEquals(1, second.prefetchCalled);
+   assertEquals(0, third.prefetchCalled);
+   iterator.next();
+
+   // The second iterator is ready and exhausted, so isReady() advances to the third one and that
+   // gets prefetched before any of its elements are read.
+   iterator.prefetch();
+   assertEquals(3, first.prefetchCalled);
+   assertEquals(1, second.prefetchCalled);
+   assertEquals(1, third.prefetchCalled);
+   iterator.next();
+
+   // The third iterator is ready already, so no further prefetch is issued.
+   iterator.prefetch();
+   assertEquals(3, first.prefetchCalled);
+   assertEquals(1, second.prefetchCalled);
+   assertEquals(1, third.prefetchCalled);
+   iterator.next();
+ }
+
+ public static <T> void verifyIsAlwaysReady(PrefetchableIterator<T> iterator) {
+   // Readiness has to hold before each element is consumed...
+   for (; iterator.hasNext(); iterator.next()) {
+     assertTrue(iterator.isReady());
+   }
+   assertTrue(iterator.isReady()); // ...and still once the iterator is exhausted.
+ }
+
+ public static <T> void verifyIterator(Iterator<T> iterator, T... expected) {
+   for (T element : expected) {
+     assertTrue(iterator.hasNext());
+     assertEquals(element, iterator.next());
+   }
+   // Exhaustion has to be reported the same way no matter how often it is probed.
+   for (int attempt = 0; attempt < 2; ++attempt) {
+     assertFalse(iterator.hasNext());
+     assertThrows(NoSuchElementException.class, iterator::next);
+   }
+ }
+}