[BEAM-12258] Re-throw exception from forked thread in PubsubTableProvideIT.testSQLLimit (#14734)

* Re-throw exception from forked thread in testSQLLimit

* move assertionError throw

* Make get call return immediately, add comments
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
index 11d4feea..b4c4a72 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
@@ -40,10 +40,12 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
@@ -451,7 +453,7 @@
     // Because Pubsub only allow new subscription receives message after the subscription is
     // created, eventsTopic.publish(messages) can only be called after statement.executeQuery.
     // However, because statement.executeQuery is a blocking call, it has to be put into a
-    // seperate thread to execute.
+    // separate thread to execute.
     ExecutorService pool = Executors.newFixedThreadPool(1);
     Future<List<String>> queryResult =
         pool.submit(
@@ -466,9 +468,26 @@
                   return result.build();
                 });
 
-    eventsTopic.assertSubscriptionEventuallyCreated(
-        pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
+    try {
+      eventsTopic.assertSubscriptionEventuallyCreated(
+          pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
+    } catch (AssertionError assertionError) {
+      // If we're here we timed out waiting for a subscription to get created.
+      // Check if the forked thread had an exception.
+      try {
+        queryResult.get(0, TimeUnit.SECONDS);
+      } catch (TimeoutException e) {
+        // Nothing went wrong on the forked thread, but a subscription still wasn't created.
+      } catch (ExecutionException e) {
+        // get() throws an ExecutionException if there was an exception in the thread. Bubble it
+        // up to the user.
+        throw new AssertionError("Exception occurred in statement.executeQuery thread", e);
+      }
 
+      // Nothing went wrong in executeQuery thread, but still no subscription was created!
+      // Just re-throw the timeout assertion.
+      throw assertionError;
+    }
     eventsTopic.publish(messages);
 
     assertThat(queryResult.get(2, TimeUnit.MINUTES).size(), equalTo(3));