[BEAM-12802] Define a prefetchable iterator and iterable and utility functions that work over them.

diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterable.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterable.java
new file mode 100644
index 0000000..5700f6c
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+/** An {@link Iterable} that returns {@link PrefetchableIterator}s. */
+public interface PrefetchableIterable<T> extends Iterable<T> {
+
+  @Override
+  PrefetchableIterator<T> iterator();
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterables.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterables.java
new file mode 100644
index 0000000..489c727
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterables.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import static org.apache.beam.sdk.fn.stream.PrefetchableIterators.emptyIterator;
+
+import java.util.NoSuchElementException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+
+/**
+ * This class contains static utility functions that operate on or return objects of type {@link
+ * PrefetchableIterable}.
+ */
+public class PrefetchableIterables {
+
+  private static final PrefetchableIterable<Object> EMPTY_ITERABLE =
+      new PrefetchableIterable<Object>() {
+        @Override
+        public PrefetchableIterator<Object> iterator() {
+          return emptyIterator();
+        }
+      };
+
+  /** Returns an empty {@link PrefetchableIterable}. */
+  public static <T> PrefetchableIterable<T> emptyIterable() {
+    return (PrefetchableIterable<T>) EMPTY_ITERABLE;
+  }
+
+  /**
+   * Returns a {@link PrefetchableIterable} over the specified values.
+   *
+   * <p>{@link PrefetchableIterator#prefetch()} is a no-op and {@link
+   * PrefetchableIterator#isReady()} always returns true.
+   */
+  public static <T> PrefetchableIterable<T> fromArray(T... values) {
+    if (values.length == 0) {
+      return emptyIterable();
+    }
+    return new PrefetchableIterable<T>() {
+      @Override
+      public PrefetchableIterator<T> iterator() {
+        return PrefetchableIterators.fromArray(values);
+      }
+    };
+  }
+
+  /**
+   * Converts the {@link Iterable} into a {@link PrefetchableIterable}.
+   *
+   * <p>If the {@link Iterable#iterator} does not return {@link PrefetchableIterator}s then one is
+   * constructed that ensures that {@link PrefetchableIterator#prefetch()} is a no-op and {@link
+   * PrefetchableIterator#isReady()} always returns true.
+   */
+  private static <T> PrefetchableIterable<T> maybePrefetchable(Iterable<T> iterable) {
+    if (iterable instanceof PrefetchableIterable) {
+      return (PrefetchableIterable<T>) iterable;
+    }
+    return new PrefetchableIterable<T>() {
+      @Override
+      public PrefetchableIterator<T> iterator() {
+        return PrefetchableIterators.maybePrefetchable(iterable.iterator());
+      }
+    };
+  }
+
+  /**
+   * Concatentates the {@link Iterable}s.
+   *
+   * <p>See {@link PrefetchableIterators#concat} for additional details.
+   */
+  public static <T> PrefetchableIterable<T> concat(Iterable<T>... iterables) {
+    for (int i = 0; i < iterables.length; ++i) {
+      if (iterables[i] == null) {
+        throw new IllegalArgumentException("Iterable at position " + i + " was null.");
+      }
+    }
+    if (iterables.length == 0) {
+      return emptyIterable();
+    } else if (iterables.length == 1) {
+      return maybePrefetchable(iterables[0]);
+    }
+    return new PrefetchableIterable<T>() {
+      @SuppressWarnings("methodref.receiver.invalid")
+      @Override
+      public PrefetchableIterator<T> iterator() {
+        return PrefetchableIterators.concatIterators(
+            FluentIterable.from(iterables).transform(Iterable::iterator).iterator());
+      }
+    };
+  }
+
+  /** Limits the {@link PrefetchableIterable} to the specified number of elements. */
+  public static <T> PrefetchableIterable<T> limit(Iterable<T> iterable, int limit) {
+    PrefetchableIterable<T> prefetchableIterable = maybePrefetchable(iterable);
+    return new PrefetchableIterable<T>() {
+      @Override
+      public PrefetchableIterator<T> iterator() {
+        return new PrefetchableIterator<T>() {
+          PrefetchableIterator<T> delegate = prefetchableIterable.iterator();
+          int currentPosition;
+
+          @Override
+          public boolean isReady() {
+            if (currentPosition < limit) {
+              return delegate.isReady();
+            }
+            return true;
+          }
+
+          @Override
+          public void prefetch() {
+            if (!isReady()) {
+              delegate.prefetch();
+            }
+          }
+
+          @Override
+          public boolean hasNext() {
+            if (currentPosition != limit) {
+              return delegate.hasNext();
+            }
+            return false;
+          }
+
+          @Override
+          public T next() {
+            if (!hasNext()) {
+              throw new NoSuchElementException();
+            }
+            currentPosition += 1;
+            return delegate.next();
+          }
+        };
+      }
+    };
+  }
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterator.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterator.java
new file mode 100644
index 0000000..0a32260
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import java.util.Iterator;
+
+/** {@link Iterator} that supports prefetching the next set of records. */
+public interface PrefetchableIterator<T> extends Iterator<T> {
+
+  /**
+   * Returns {@code true} if and only if {@link #hasNext} and {@link #next} will not require an
+   * expensive operation.
+   */
+  boolean isReady();
+
+  /**
+   * If not {@link #isReady}, schedules the next expensive operation such that at some point in time
+   * in the future {@link #isReady} will return true.
+   */
+  void prefetch();
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterators.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterators.java
new file mode 100644
index 0000000..b8ea509
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterators.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class PrefetchableIterators {
+
+  private static final PrefetchableIterator<Object> EMPTY_ITERATOR =
+      new PrefetchableIterator<Object>() {
+        @Override
+        public boolean isReady() {
+          return true;
+        }
+
+        @Override
+        public void prefetch() {}
+
+        @Override
+        public boolean hasNext() {
+          return false;
+        }
+
+        @Override
+        public Object next() {
+          throw new NoSuchElementException();
+        }
+      };
+
+  /** Returns an empty {@link PrefetchableIterator}. */
+  public static <T> PrefetchableIterator<T> emptyIterator() {
+    return (PrefetchableIterator<T>) EMPTY_ITERATOR;
+  }
+
+  /**
+   * Returns a {@link PrefetchableIterator} over the specified values.
+   *
+   * <p>{@link PrefetchableIterator#prefetch()} is a no-op and {@link
+   * PrefetchableIterator#isReady()} always returns true.
+   */
+  public static <T> PrefetchableIterator<T> fromArray(T... values) {
+    if (values.length == 0) {
+      return emptyIterator();
+    }
+    return new PrefetchableIterator<T>() {
+      int currentIndex;
+
+      @Override
+      public boolean isReady() {
+        return true;
+      }
+
+      @Override
+      public void prefetch() {}
+
+      @Override
+      public boolean hasNext() {
+        return currentIndex < values.length;
+      }
+
+      @Override
+      public T next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return values[currentIndex++];
+      }
+    };
+  }
+
+  /**
+   * If the {@link Iterator} is not a {@link PrefetchableIterator} then one is constructed that
+   * ensures {@link PrefetchableIterator#prefetch} is a no-op and {@link
+   * PrefetchableIterator#isReady} always returns true.
+   */
+  // package private for PrefetchableIterables.
+  static <T> PrefetchableIterator<T> maybePrefetchable(Iterator<T> iterator) {
+    if (iterator == null) {
+      throw new IllegalArgumentException("Expected non-null iterator.");
+    }
+    if (iterator instanceof PrefetchableIterator) {
+      return (PrefetchableIterator<T>) iterator;
+    }
+    return new PrefetchableIterator<T>() {
+      @Override
+      public boolean isReady() {
+        return true;
+      }
+
+      @Override
+      public void prefetch() {}
+
+      @Override
+      public boolean hasNext() {
+        return iterator.hasNext();
+      }
+
+      @Override
+      public T next() {
+        return iterator.next();
+      }
+    };
+  }
+
+  public static <T> PrefetchableIterator<T> concatIterators(Iterator<Iterator<T>> iterators) {
+    if (!iterators.hasNext()) {
+      return emptyIterator();
+    }
+    return new PrefetchableIterator<T>() {
+      PrefetchableIterator<T> delegate = maybePrefetchable(iterators.next());
+
+      @Override
+      public boolean isReady() {
+        // Ensure that we advance from iterators that don't have the next
+        // element to an iterator that supports prefetch or does have an element
+        for (; ; ) {
+          // If the delegate isn't ready then we aren't ready.
+          // We assume that non prefetchable iterators are always ready.
+          if (!delegate.isReady()) {
+            return false;
+          }
+
+          // If the delegate has a next and is ready then we are ready
+          if (delegate.hasNext()) {
+            return true;
+          }
+
+          // Otherwise we should advance to the next index since we know this iterator is empty
+          // and re-evaluate whether we are ready
+          if (!iterators.hasNext()) {
+            return true;
+          }
+          delegate = maybePrefetchable(iterators.next());
+        }
+      }
+
+      @Override
+      public void prefetch() {
+        if (!isReady()) {
+          delegate.prefetch();
+        }
+      }
+
+      @Override
+      public boolean hasNext() {
+        for (; ; ) {
+          if (delegate.hasNext()) {
+            return true;
+          }
+          if (!iterators.hasNext()) {
+            return false;
+          }
+          delegate = maybePrefetchable(iterators.next());
+        }
+      }
+
+      @Override
+      public T next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return delegate.next();
+      }
+    };
+  }
+
+  /**
+   * Concatentates the {@link Iterator}s.
+   *
+   * <p>{@link Iterable}s are first converted into a {@link PrefetchableIterable} via {@link
+   * #maybePrefetchable}.
+   *
+   * <p>The returned {@link PrefetchableIterable} ensures that iterators which are returned
+   * guarantee that {@link PrefetchableIterator#isReady} always advances till it finds an {@link
+   * Iterable} that is not {@link PrefetchableIterator#isReady}. {@link
+   * PrefetchableIterator#prefetch} is also guaranteed to advance past empty iterators till it finds
+   * one that is not ready.
+   */
+  public static <T> PrefetchableIterator<T> concat(Iterator<T>... iterators) {
+    for (int i = 0; i < iterators.length; ++i) {
+      if (iterators[i] == null) {
+        throw new IllegalArgumentException("Iterator at position " + i + " was null.");
+      }
+    }
+    if (iterators.length == 0) {
+      return emptyIterator();
+    } else if (iterators.length == 1) {
+      return maybePrefetchable(iterators[0]);
+    }
+    return concatIterators(Arrays.asList(iterators).iterator());
+  }
+}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIterablesTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIterablesTest.java
new file mode 100644
index 0000000..e4d1b0d
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIterablesTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import static org.junit.Assert.assertSame;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PrefetchableIterators}. */
+@RunWith(JUnit4.class)
+public class PrefetchableIterablesTest {
+  @Test
+  public void testEmptyIterable() {
+    verifyIterable(PrefetchableIterables.emptyIterable());
+  }
+
+  @Test
+  public void testFromArray() {
+    verifyIterable(PrefetchableIterables.fromArray("A", "B", "C"), "A", "B", "C");
+    verifyIterable(PrefetchableIterables.fromArray());
+  }
+
+  @Test
+  public void testLimit() {
+    verifyIterable(PrefetchableIterables.limit(PrefetchableIterables.fromArray(), 0));
+    verifyIterable(PrefetchableIterables.limit(PrefetchableIterables.fromArray(), 1));
+    verifyIterable(PrefetchableIterables.limit(PrefetchableIterables.fromArray("A", "B", "C"), 0));
+    verifyIterable(
+        PrefetchableIterables.limit(PrefetchableIterables.fromArray("A", "B", "C"), 2), "A", "B");
+    verifyIterable(
+        PrefetchableIterables.limit(PrefetchableIterables.fromArray("A", "B", "C"), 3),
+        "A",
+        "B",
+        "C");
+    verifyIterable(
+        PrefetchableIterables.limit(PrefetchableIterables.fromArray("A", "B", "C"), 4),
+        "A",
+        "B",
+        "C");
+  }
+
+  @Test
+  public void testConcat() {
+    verifyIterable(PrefetchableIterables.concat());
+
+    PrefetchableIterable<String> instance = PrefetchableIterables.fromArray("A", "B");
+    assertSame(PrefetchableIterables.concat(instance), instance);
+
+    verifyIterable(
+        PrefetchableIterables.concat(
+            PrefetchableIterables.fromArray(),
+            PrefetchableIterables.fromArray(),
+            PrefetchableIterables.fromArray()));
+    verifyIterable(
+        PrefetchableIterables.concat(
+            PrefetchableIterables.fromArray("A", "B"),
+            PrefetchableIterables.fromArray(),
+            PrefetchableIterables.fromArray()),
+        "A",
+        "B");
+    verifyIterable(
+        PrefetchableIterables.concat(
+            PrefetchableIterables.fromArray(),
+            PrefetchableIterables.fromArray("C", "D"),
+            PrefetchableIterables.fromArray()),
+        "C",
+        "D");
+    verifyIterable(
+        PrefetchableIterables.concat(
+            PrefetchableIterables.fromArray(),
+            PrefetchableIterables.fromArray(),
+            PrefetchableIterables.fromArray("E", "F")),
+        "E",
+        "F");
+    verifyIterable(
+        PrefetchableIterables.concat(
+            PrefetchableIterables.fromArray(),
+            PrefetchableIterables.fromArray("C", "D"),
+            PrefetchableIterables.fromArray("E", "F")),
+        "C",
+        "D",
+        "E",
+        "F");
+    verifyIterable(
+        PrefetchableIterables.concat(
+            PrefetchableIterables.fromArray("A", "B"),
+            PrefetchableIterables.fromArray(),
+            PrefetchableIterables.fromArray("E", "F")),
+        "A",
+        "B",
+        "E",
+        "F");
+    verifyIterable(
+        PrefetchableIterables.concat(
+            PrefetchableIterables.fromArray("A", "B"),
+            PrefetchableIterables.fromArray("C", "D"),
+            PrefetchableIterables.fromArray()),
+        "A",
+        "B",
+        "C",
+        "D");
+    verifyIterable(
+        PrefetchableIterables.concat(
+            PrefetchableIterables.fromArray("A", "B"),
+            PrefetchableIterables.fromArray("C", "D"),
+            PrefetchableIterables.fromArray("E", "F")),
+        "A",
+        "B",
+        "C",
+        "D",
+        "E",
+        "F");
+  }
+
+  public static <T> void verifyIterable(Iterable<T> iterable, T... expected) {
+    // Ensure we can access the iterator multiple times
+    for (int i = 0; i < 3; i++) {
+      PrefetchableIteratorsTest.verifyIterator(iterable.iterator(), expected);
+    }
+  }
+}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIteratorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIteratorsTest.java
new file mode 100644
index 0000000..6131634
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/PrefetchableIteratorsTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PrefetchableIterators}. */
+@RunWith(JUnit4.class)
+public class PrefetchableIteratorsTest {
+
+  @Test
+  public void testEmpty() {
+    verifyIterator(PrefetchableIterators.emptyIterator());
+    verifyIsAlwaysReady(PrefetchableIterators.emptyIterator());
+  }
+
+  @Test
+  public void testFromArray() {
+    verifyIterator(PrefetchableIterators.fromArray("A", "B", "C"), "A", "B", "C");
+    verifyIsAlwaysReady(PrefetchableIterators.fromArray("A", "B", "C"));
+    verifyIterator(PrefetchableIterators.fromArray());
+    verifyIsAlwaysReady(PrefetchableIterators.fromArray());
+  }
+
+  @Test
+  public void testConcat() {
+    verifyIterator(PrefetchableIterators.concat());
+
+    PrefetchableIterator<String> instance = PrefetchableIterators.fromArray("A", "B");
+    assertSame(PrefetchableIterators.concat(instance), instance);
+
+    verifyIterator(
+        PrefetchableIterators.concat(
+            PrefetchableIterators.fromArray(),
+            PrefetchableIterators.fromArray(),
+            PrefetchableIterators.fromArray()));
+    verifyIterator(
+        PrefetchableIterators.concat(
+            PrefetchableIterators.fromArray("A", "B"),
+            PrefetchableIterators.fromArray(),
+            PrefetchableIterators.fromArray()),
+        "A",
+        "B");
+    verifyIterator(
+        PrefetchableIterators.concat(
+            PrefetchableIterators.fromArray(),
+            PrefetchableIterators.fromArray("C", "D"),
+            PrefetchableIterators.fromArray()),
+        "C",
+        "D");
+    verifyIterator(
+        PrefetchableIterators.concat(
+            PrefetchableIterators.fromArray(),
+            PrefetchableIterators.fromArray(),
+            PrefetchableIterators.fromArray("E", "F")),
+        "E",
+        "F");
+    verifyIterator(
+        PrefetchableIterators.concat(
+            PrefetchableIterators.fromArray(),
+            PrefetchableIterators.fromArray("C", "D"),
+            PrefetchableIterators.fromArray("E", "F")),
+        "C",
+        "D",
+        "E",
+        "F");
+    verifyIterator(
+        PrefetchableIterators.concat(
+            PrefetchableIterators.fromArray("A", "B"),
+            PrefetchableIterators.fromArray(),
+            PrefetchableIterators.fromArray("E", "F")),
+        "A",
+        "B",
+        "E",
+        "F");
+    verifyIterator(
+        PrefetchableIterators.concat(
+            PrefetchableIterators.fromArray("A", "B"),
+            PrefetchableIterators.fromArray("C", "D"),
+            PrefetchableIterators.fromArray()),
+        "A",
+        "B",
+        "C",
+        "D");
+    verifyIterator(
+        PrefetchableIterators.concat(
+            PrefetchableIterators.fromArray("A", "B"),
+            PrefetchableIterators.fromArray("C", "D"),
+            PrefetchableIterators.fromArray("E", "F")),
+        "A",
+        "B",
+        "C",
+        "D",
+        "E",
+        "F");
+  }
+
+  private static class NeverReady implements PrefetchableIterator<String> {
+    PrefetchableIterator<String> delegate = PrefetchableIterators.fromArray("A", "B");
+    int prefetchCalled;
+
+    @Override
+    public boolean isReady() {
+      return false;
+    }
+
+    @Override
+    public void prefetch() {
+      prefetchCalled += 1;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return delegate.hasNext();
+    }
+
+    @Override
+    public String next() {
+      return delegate.next();
+    }
+  }
+
+  private static class ReadyAfterPrefetch extends NeverReady {
+    @Override
+    public boolean isReady() {
+      return prefetchCalled > 0;
+    }
+  }
+
+  @Test
+  public void testConcatIsReadyAdvancesToNextIteratorWhenAble() {
+    NeverReady readyAfterPrefetch1 = new NeverReady();
+    ReadyAfterPrefetch readyAfterPrefetch2 = new ReadyAfterPrefetch();
+    ReadyAfterPrefetch readyAfterPrefetch3 = new ReadyAfterPrefetch();
+
+    PrefetchableIterator<String> iterator =
+        PrefetchableIterators.concat(readyAfterPrefetch1, readyAfterPrefetch2, readyAfterPrefetch3);
+
+    // Expect no prefetches yet
+    assertEquals(0, readyAfterPrefetch1.prefetchCalled);
+    assertEquals(0, readyAfterPrefetch2.prefetchCalled);
+    assertEquals(0, readyAfterPrefetch3.prefetchCalled);
+
+    // We expect to attempt to prefetch for the first time.
+    iterator.prefetch();
+    assertEquals(1, readyAfterPrefetch1.prefetchCalled);
+    assertEquals(0, readyAfterPrefetch2.prefetchCalled);
+    assertEquals(0, readyAfterPrefetch3.prefetchCalled);
+    iterator.next();
+
+    // We expect to attempt to prefetch again since we aren't ready.
+    iterator.prefetch();
+    assertEquals(2, readyAfterPrefetch1.prefetchCalled);
+    assertEquals(0, readyAfterPrefetch2.prefetchCalled);
+    assertEquals(0, readyAfterPrefetch3.prefetchCalled);
+    iterator.next();
+
+    // The current iterator is done but is never ready so we can't advance to the next one and
+    // expect another prefetch to go to the current iterator.
+    iterator.prefetch();
+    assertEquals(3, readyAfterPrefetch1.prefetchCalled);
+    assertEquals(0, readyAfterPrefetch2.prefetchCalled);
+    assertEquals(0, readyAfterPrefetch3.prefetchCalled);
+    iterator.next();
+
+    // Now that we know the last iterator is done and have advanced to the next one we expect
+    // prefetch to go through
+    iterator.prefetch();
+    assertEquals(3, readyAfterPrefetch1.prefetchCalled);
+    assertEquals(1, readyAfterPrefetch2.prefetchCalled);
+    assertEquals(0, readyAfterPrefetch3.prefetchCalled);
+    iterator.next();
+
+    // The last iterator is done so we should be able to prefetch the next one before advancing
+    iterator.prefetch();
+    assertEquals(3, readyAfterPrefetch1.prefetchCalled);
+    assertEquals(1, readyAfterPrefetch2.prefetchCalled);
+    assertEquals(1, readyAfterPrefetch3.prefetchCalled);
+    iterator.next();
+
+    // The current iterator is ready so no additional prefetch is necessary
+    iterator.prefetch();
+    assertEquals(3, readyAfterPrefetch1.prefetchCalled);
+    assertEquals(1, readyAfterPrefetch2.prefetchCalled);
+    assertEquals(1, readyAfterPrefetch3.prefetchCalled);
+    iterator.next();
+  }
+
+  public static <T> void verifyIsAlwaysReady(PrefetchableIterator<T> iterator) {
+    while (iterator.hasNext()) {
+      assertTrue(iterator.isReady());
+      iterator.next();
+    }
+    assertTrue(iterator.isReady());
+  }
+
+  public static <T> void verifyIterator(Iterator<T> iterator, T... expected) {
+    for (int i = 0; i < expected.length; ++i) {
+      assertTrue(iterator.hasNext());
+      assertEquals(expected[i], iterator.next());
+    }
+    assertFalse(iterator.hasNext());
+    assertThrows(NoSuchElementException.class, () -> iterator.next());
+    // Ensure that multiple hasNext/next after a failure are repeatable
+    assertFalse(iterator.hasNext());
+    assertThrows(NoSuchElementException.class, () -> iterator.next());
+  }
+}