[FLINK-33018][Tests] Fix flaky test when cancelling source
diff --git a/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java b/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
index c6049d8..abcb15e 100644
--- a/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
+++ b/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
@@ -108,19 +108,19 @@
         Thread thread = createSourceThread(pubSubSource, lock, results);
         try {
             thread.start();
-            awaitRecordCount(results, 2);
-
-            // we do not emit the end of stream record
-            assertThat(new ArrayList<>(results)).isEqualTo(Arrays.asList("A", "B"));
-            pubSubSource.snapshotState(0, 0);
-            pubSubSource.notifyCheckpointComplete(0);
-            // we acknowledge also the end of the stream record
-            assertThat(testPubSubSubscriber.getAcknowledgedIds())
-                    .isEqualTo(Arrays.asList("1", "2", "3"));
+            // The source thread will finish automatically, without waiting for records or
+            // explicitly cancelling the source.
         } finally {
-            pubSubSource.cancel();
             thread.join();
         }
+
+        // we do not emit the end of stream record
+        assertThat(new ArrayList<>(results)).isEqualTo(Arrays.asList("A", "B"));
+        pubSubSource.snapshotState(0, 0);
+        pubSubSource.notifyCheckpointComplete(0);
+        // we acknowledge also the end of the stream record
+        assertThat(testPubSubSubscriber.getAcknowledgedIds())
+                .isEqualTo(Arrays.asList("1", "2", "3"));
     }
 
     @Test