[BEAM-10300] Improve JdbcIOTest.testFluentBackOffConfiguration stability (#12517)

* Move commitThread control out of try

* Commit connection on main thread after pipeline has finished.
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index de03549..aaed524 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -53,6 +53,7 @@
 import java.util.TimeZone;
 import java.util.logging.LogRecord;
 import javax.sql.DataSource;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -840,7 +841,6 @@
     insertStatement.setString(2, "TEST");
     insertStatement.execute();
 
-    // try to write to this table
     pipeline
         .apply(Create.of(Collections.singletonList(KV.of(1, "TEST"))))
         .apply(
@@ -861,50 +861,41 @@
                       statement.setString(2, element.getValue());
                     }));
 
-    // starting a thread to perform the commit later, while the pipeline is running into the
-
-    Thread commitThread =
-        new Thread(
+    PipelineExecutionException exception =
+        assertThrows(
+            PipelineExecutionException.class,
             () -> {
-              try {
-                Thread.sleep(10000);
-                connection.commit();
-              } catch (Exception e) {
-                // nothing to do
-              }
+              pipeline.run().waitUntilFinish();
             });
 
-    try {
-      commitThread.start();
-      pipeline.run();
-      commitThread.join();
-    } catch (Exception e) {
+    // Finally commit the original connection, now that the pipeline has failed due to deadlock.
+    connection.commit();
 
-      expectedLogs.verifyLogRecords(
-          new TypeSafeMatcher<Iterable<LogRecord>>() {
-            @Override
-            public void describeTo(Description description) {}
+    assertThat(
+        exception.getMessage(),
+        containsString(
+            "java.sql.BatchUpdateException: A lock could not be obtained within the time requested"));
 
-            @Override
-            protected boolean matchesSafely(Iterable<LogRecord> logRecords) {
-              int count = 0;
-              for (LogRecord logRecord : logRecords) {
-                if (logRecord.getMessage().contains("Deadlock detected, retrying")) {
-                  count += 1;
-                }
+    // Verify that pipeline retried the write twice, but encountered a deadlock every time.
+    expectedLogs.verifyLogRecords(
+        new TypeSafeMatcher<Iterable<LogRecord>>() {
+          @Override
+          public void describeTo(Description description) {}
+
+          @Override
+          protected boolean matchesSafely(Iterable<LogRecord> logRecords) {
+            int count = 0;
+            for (LogRecord logRecord : logRecords) {
+              if (logRecord.getMessage().contains("Deadlock detected, retrying")) {
+                count += 1;
               }
-              // Max retries will be 2 + the original deadlock error.
-              return count == 3;
             }
-          });
+            // Max retries will be 2 + the original deadlock error.
+            return count == 3;
+          }
+        });
 
-      assertThat(
-          e.getMessage(),
-          containsString(
-              "java.sql.BatchUpdateException: A lock could not be obtained within the time requested"));
-    }
-
-    // since, we got an error we will only have one row and the second one wouldn't go through.
+    // Since the pipeline was unable to write, only the row from insertStatement was written.
     assertRowCount(tableName, 1);
   }
 }