Core: Remove reflection from TestParallelIterable (#10857)
This is a unit test, so can leverage package-private access.
diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index 6486bd7..16fa6f3 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -35,6 +35,7 @@
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.io.Closer;
@@ -77,7 +78,8 @@
return iter;
}
- private static class ParallelIterator<T> implements CloseableIterator<T> {
+ @VisibleForTesting
+ static class ParallelIterator<T> implements CloseableIterator<T> {
private final Iterator<Task<T>> tasks;
private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>();
private final ExecutorService workerPool;
@@ -229,6 +231,11 @@
}
return queue.poll();
}
+
+ @VisibleForTesting
+ int queueSize() {
+ return queue.size();
+ }
}
private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
index 4910732..c259bbd 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -21,12 +21,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -40,6 +37,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Multiset;
+import org.apache.iceberg.util.ParallelIterable.ParallelIterator;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
@@ -64,20 +62,17 @@
});
ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(transform, executor);
- CloseableIterator<Integer> iterator = parallelIterable.iterator();
- Field queueField = iterator.getClass().getDeclaredField("queue");
- queueField.setAccessible(true);
- ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) queueField.get(iterator);
+ ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
assertThat(iterator.hasNext()).isTrue();
assertThat(iterator.next()).isNotNull();
Awaitility.await("Queue is populated")
.atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> queueHasElements(iterator, queue));
+ .untilAsserted(() -> queueHasElements(iterator));
iterator.close();
Awaitility.await("Queue is cleared")
.atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> assertThat(queue).isEmpty());
+ .untilAsserted(() -> assertThat(iterator.queueSize()).isEqualTo(0));
}
@Test
@@ -124,20 +119,21 @@
});
ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(transform, executor);
- CloseableIterator<Integer> iterator = parallelIterable.iterator();
- Field queueField = iterator.getClass().getDeclaredField("queue");
- queueField.setAccessible(true);
- ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) queueField.get(iterator);
+ ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
assertThat(iterator.hasNext()).isTrue();
assertThat(iterator.next()).isNotNull();
Awaitility.await("Queue is populated")
.atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> queueHasElements(iterator, queue));
+ .untilAsserted(() -> queueHasElements(iterator));
iterator.close();
Awaitility.await("Queue is cleared")
.atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty());
+ .untilAsserted(
+ () ->
+ assertThat(iterator.queueSize())
+ .as("Queue is not empty after cleaning")
+ .isEqualTo(0));
}
@Test
@@ -159,17 +155,14 @@
ExecutorService executor = Executors.newCachedThreadPool();
ParallelIterable<Integer> parallelIterable =
new ParallelIterable<>(iterables, executor, maxQueueSize);
- CloseableIterator<Integer> iterator = parallelIterable.iterator();
- Field queueField = iterator.getClass().getDeclaredField("queue");
- queueField.setAccessible(true);
- ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) queueField.get(iterator);
+ ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
Multiset<Integer> actualValues = HashMultiset.create();
while (iterator.hasNext()) {
- assertThat(queue)
- .as("iterator internal queue")
- .hasSizeLessThanOrEqualTo(maxQueueSize + iterables.size());
+ assertThat(iterator.queueSize())
+ .as("iterator internal queue size")
+ .isLessThanOrEqualTo(maxQueueSize + iterables.size());
actualValues.add(iterator.next());
}
@@ -181,9 +174,9 @@
executor.shutdownNow();
}
- private void queueHasElements(CloseableIterator<Integer> iterator, Queue queue) {
+ private void queueHasElements(ParallelIterator<Integer> iterator) {
assertThat(iterator.hasNext()).isTrue();
assertThat(iterator.next()).isNotNull();
- assertThat(queue).isNotEmpty();
+ assertThat(iterator.queueSize()).as("queue size").isGreaterThan(0);
}
}