[BEAM-10300] Improve JdbcIOTest.testFluentBackOffConfiguration stability (#12517)
* Move commitThread control out of try
* Commit connection on main thread after pipeline has finished.
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index de03549..aaed524 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -53,6 +53,7 @@
import java.util.TimeZone;
import java.util.logging.LogRecord;
import javax.sql.DataSource;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -840,7 +841,6 @@
insertStatement.setString(2, "TEST");
insertStatement.execute();
- // try to write to this table
pipeline
.apply(Create.of(Collections.singletonList(KV.of(1, "TEST"))))
.apply(
@@ -861,50 +861,41 @@
statement.setString(2, element.getValue());
}));
- // starting a thread to perform the commit later, while the pipeline is running into the
-
- Thread commitThread =
- new Thread(
+ PipelineExecutionException exception =
+ assertThrows(
+ PipelineExecutionException.class,
() -> {
- try {
- Thread.sleep(10000);
- connection.commit();
- } catch (Exception e) {
- // nothing to do
- }
+ pipeline.run().waitUntilFinish();
});
- try {
- commitThread.start();
- pipeline.run();
- commitThread.join();
- } catch (Exception e) {
+ // Finally commit the original connection, now that the pipeline has failed due to deadlock.
+ connection.commit();
- expectedLogs.verifyLogRecords(
- new TypeSafeMatcher<Iterable<LogRecord>>() {
- @Override
- public void describeTo(Description description) {}
+ assertThat(
+ exception.getMessage(),
+ containsString(
+ "java.sql.BatchUpdateException: A lock could not be obtained within the time requested"));
- @Override
- protected boolean matchesSafely(Iterable<LogRecord> logRecords) {
- int count = 0;
- for (LogRecord logRecord : logRecords) {
- if (logRecord.getMessage().contains("Deadlock detected, retrying")) {
- count += 1;
- }
+ // Verify that pipeline retried the write twice, but encountered a deadlock every time.
+ expectedLogs.verifyLogRecords(
+ new TypeSafeMatcher<Iterable<LogRecord>>() {
+ @Override
+ public void describeTo(Description description) {}
+
+ @Override
+ protected boolean matchesSafely(Iterable<LogRecord> logRecords) {
+ int count = 0;
+ for (LogRecord logRecord : logRecords) {
+ if (logRecord.getMessage().contains("Deadlock detected, retrying")) {
+ count += 1;
}
- // Max retries will be 2 + the original deadlock error.
- return count == 3;
}
- });
+ // Max retries will be 2 + the original deadlock error.
+ return count == 3;
+ }
+ });
- assertThat(
- e.getMessage(),
- containsString(
- "java.sql.BatchUpdateException: A lock could not be obtained within the time requested"));
- }
-
- // since, we got an error we will only have one row and the second one wouldn't go through.
+ // Since the pipeline was unable to write, only the row from insertStatement was written.
assertRowCount(tableName, 1);
}
}