Core: Remove reflection from TestParallelIterable (#10857)

This is a unit test, so can leverage package-private access.
diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index 6486bd7..16fa6f3 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -35,6 +35,7 @@
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.io.Closer;
@@ -77,7 +78,8 @@
     return iter;
   }
 
-  private static class ParallelIterator<T> implements CloseableIterator<T> {
+  @VisibleForTesting
+  static class ParallelIterator<T> implements CloseableIterator<T> {
     private final Iterator<Task<T>> tasks;
     private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>();
     private final ExecutorService workerPool;
@@ -229,6 +231,11 @@
       }
       return queue.poll();
     }
+
+    @VisibleForTesting
+    int queueSize() {
+      return queue.size();
+    }
   }
 
   private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
index 4910732..c259bbd 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -21,12 +21,9 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -40,6 +37,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Multiset;
+import org.apache.iceberg.util.ParallelIterable.ParallelIterator;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
@@ -64,20 +62,17 @@
                 });
 
     ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(transform, executor);
-    CloseableIterator<Integer> iterator = parallelIterable.iterator();
-    Field queueField = iterator.getClass().getDeclaredField("queue");
-    queueField.setAccessible(true);
-    ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) queueField.get(iterator);
+    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
 
     assertThat(iterator.hasNext()).isTrue();
     assertThat(iterator.next()).isNotNull();
     Awaitility.await("Queue is populated")
         .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> queueHasElements(iterator, queue));
+        .untilAsserted(() -> queueHasElements(iterator));
     iterator.close();
     Awaitility.await("Queue is cleared")
         .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> assertThat(queue).isEmpty());
+        .untilAsserted(() -> assertThat(iterator.queueSize()).isEqualTo(0));
   }
 
   @Test
@@ -124,20 +119,21 @@
                 });
 
     ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(transform, executor);
-    CloseableIterator<Integer> iterator = parallelIterable.iterator();
-    Field queueField = iterator.getClass().getDeclaredField("queue");
-    queueField.setAccessible(true);
-    ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) queueField.get(iterator);
+    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
 
     assertThat(iterator.hasNext()).isTrue();
     assertThat(iterator.next()).isNotNull();
     Awaitility.await("Queue is populated")
         .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> queueHasElements(iterator, queue));
+        .untilAsserted(() -> queueHasElements(iterator));
     iterator.close();
     Awaitility.await("Queue is cleared")
         .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty());
+        .untilAsserted(
+            () ->
+                assertThat(iterator.queueSize())
+                    .as("Queue is not empty after cleaning")
+                    .isEqualTo(0));
   }
 
   @Test
@@ -159,17 +155,14 @@
     ExecutorService executor = Executors.newCachedThreadPool();
     ParallelIterable<Integer> parallelIterable =
         new ParallelIterable<>(iterables, executor, maxQueueSize);
-    CloseableIterator<Integer> iterator = parallelIterable.iterator();
-    Field queueField = iterator.getClass().getDeclaredField("queue");
-    queueField.setAccessible(true);
-    ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) queueField.get(iterator);
+    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
 
     Multiset<Integer> actualValues = HashMultiset.create();
 
     while (iterator.hasNext()) {
-      assertThat(queue)
-          .as("iterator internal queue")
-          .hasSizeLessThanOrEqualTo(maxQueueSize + iterables.size());
+      assertThat(iterator.queueSize())
+          .as("iterator internal queue size")
+          .isLessThanOrEqualTo(maxQueueSize + iterables.size());
       actualValues.add(iterator.next());
     }
 
@@ -181,9 +174,9 @@
     executor.shutdownNow();
   }
 
-  private void queueHasElements(CloseableIterator<Integer> iterator, Queue queue) {
+  private void queueHasElements(ParallelIterator<Integer> iterator) {
     assertThat(iterator.hasNext()).isTrue();
     assertThat(iterator.next()).isNotNull();
-    assertThat(queue).isNotEmpty();
+    assertThat(iterator.queueSize()).as("queue size").isGreaterThan(0);
   }
 }