[BEAM-12258] Re-throw exception from forked thread in PubsubTableProvideIT.testSQLLimit (#14734)
* Re-throw exception from forked thread in testSQLLimit
* move assertionError throw
* Make get call return immediately, add comments
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
index 11d4feea..b4c4a72 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
@@ -40,10 +40,12 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
@@ -451,7 +453,7 @@
// Because Pubsub only allow new subscription receives message after the subscription is
// created, eventsTopic.publish(messages) can only be called after statement.executeQuery.
// However, because statement.executeQuery is a blocking call, it has to be put into a
- // seperate thread to execute.
+ // separate thread to execute.
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<List<String>> queryResult =
pool.submit(
@@ -466,9 +468,26 @@
return result.build();
});
- eventsTopic.assertSubscriptionEventuallyCreated(
- pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
+ try {
+ eventsTopic.assertSubscriptionEventuallyCreated(
+ pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
+ } catch (AssertionError assertionError) {
+ // If we're here we timed out waiting for a subscription to get created.
+ // Check if the forked thread had an exception.
+ try {
+ queryResult.get(0, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ // Nothing went wrong on the forked thread, but a subscription still wasn't created.
+ } catch (ExecutionException e) {
+ // get() throws an ExecutionException if there was an exception in the thread. Bubble it
+ // up to the user.
+ throw new AssertionError("Exception occurred in statement.executeQuery thread", e);
+ }
+ // Nothing went wrong in executeQuery thread, but still no subscription was created!
+ // Just re-throw the timeout assertion.
+ throw assertionError;
+ }
eventsTopic.publish(messages);
assertThat(queryResult.get(2, TimeUnit.MINUTES).size(), equalTo(3));