[FLINK-33018][Tests] Fix flaky test when cancelling source
diff --git a/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java b/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
index c6049d8..abcb15e 100644
--- a/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
+++ b/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
@@ -108,19 +108,19 @@
Thread thread = createSourceThread(pubSubSource, lock, results);
try {
thread.start();
- awaitRecordCount(results, 2);
-
- // we do not emit the end of stream record
- assertThat(new ArrayList<>(results)).isEqualTo(Arrays.asList("A", "B"));
- pubSubSource.snapshotState(0, 0);
- pubSubSource.notifyCheckpointComplete(0);
- // we acknowledge also the end of the stream record
- assertThat(testPubSubSubscriber.getAcknowledgedIds())
- .isEqualTo(Arrays.asList("1", "2", "3"));
+ // The source thread will finish automatically, without waiting for records or
+ // explicitly cancelling the source.
} finally {
- pubSubSource.cancel();
thread.join();
}
+
+ // we do not emit the end of stream record
+ assertThat(new ArrayList<>(results)).isEqualTo(Arrays.asList("A", "B"));
+ pubSubSource.snapshotState(0, 0);
+ pubSubSource.notifyCheckpointComplete(0);
+ // we acknowledge also the end of the stream record
+ assertThat(testPubSubSubscriber.getAcknowledgedIds())
+ .isEqualTo(Arrays.asList("1", "2", "3"));
}
@Test