[FLINK-17544][jdbc] Fix NPE and resource leak problem in JdbcOutputFormat
This closes #12712
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
index af41ed1..c7afc31 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -209,8 +209,6 @@
if (!closed) {
closed = true;
- checkFlushException();
-
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
@@ -220,7 +218,7 @@
try {
flush();
} catch (Exception e) {
- throw new RuntimeException("Writing records to JDBC failed.", e);
+ LOG.warn("Writing records to JDBC failed.", e);
}
}
@@ -233,6 +231,7 @@
}
}
super.close();
+ checkFlushException();
}
public static Builder builder() {
@@ -348,5 +347,4 @@
static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) {
return (st, record) -> setRecordToStatement(st, types, record);
}
-
}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
index 1865ebc..4afea82 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -83,7 +83,9 @@
super.close();
} finally {
try {
- deleteExecutor.closeStatements();
+ if (deleteExecutor != null){
+ deleteExecutor.closeStatements();
+ }
} catch (SQLException e) {
LOG.warn("unable to close delete statement runner", e);
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
index f2aec19..15e6425 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
@@ -28,6 +28,7 @@
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
@@ -43,6 +44,7 @@
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.function.Function;
@@ -57,8 +59,10 @@
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
/**
* Tests using both {@link JdbcInputFormat} and {@link JdbcBatchingOutputFormat}.
@@ -104,6 +108,49 @@
}
}
+ @Test
+ public void testJdbcBatchingOutputFormatCloseDuringRuntime() throws Exception{
+ JdbcOptions options = JdbcOptions.builder()
+ .setDBUrl(getDbMetadata().getUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+ // use scheduledThreadPool
+ JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
+ .withBatchIntervalMs(1000_000L)
+ .withBatchSize(2)
+ .withMaxRetries(1)
+ .build();
+ ExecutionConfig executionConfig = new ExecutionConfig();
+
+ RuntimeContext context = Mockito.mock(RuntimeContext.class);
+ JdbcBatchStatementExecutor executor = Mockito.mock(JdbcBatchStatementExecutor.class);
+
+ doReturn(executionConfig).when(context).getExecutionConfig();
+ // always throw Exception to trigger close() method
+ doThrow(SQLException.class).when(executor).executeBatch();
+
+ JdbcBatchingOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> format =
+ new JdbcBatchingOutputFormat<>(
+ new SimpleJdbcConnectionProvider(options),
+ jdbcExecutionOptions,
+ (ctx) -> executor,
+ (tuple2) -> tuple2.f1);
+
+ format.setRuntimeContext(context);
+ format.open(0, 1);
+
+ try {
+ for (JdbcTestFixture.TestEntry entry : TEST_DATA) {
+ format.writeRecord(Tuple2.of(true, toRow(entry)));
+ }
+ } catch (Exception e) {
+ // artifact failure
+ format.close();
+ } finally {
+ assertNull(format.getConnection());
+ }
+ }
+
private void runTest(boolean exploitParallelism) throws Exception {
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
JdbcInputFormat.JdbcInputFormatBuilder inputBuilder = JdbcInputFormat.buildJdbcInputFormat()
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
index 2d67a4d..677db91 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
@@ -65,6 +65,20 @@
}
@Test
+ public void testUpsertFormatCloseBeforeOpen() throws Exception{
+ JdbcOptions options = JdbcOptions.builder()
+ .setDBUrl(getDbMetadata().getUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+ JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
+ .withTableName(options.getTableName()).withDialect(options.getDialect())
+ .withFieldNames(fieldNames).withKeyFields(keyFields).build();
+ format = new TableJdbcUpsertOutputFormat(new SimpleJdbcConnectionProvider(options), dmlOptions, JdbcExecutionOptions.defaults());
+ // FLINK-17544: There should be no NPE thrown from this method
+ format.close();
+ }
+
+ @Test
public void testJdbcOutputFormat() throws Exception {
JdbcOptions options = JdbcOptions.builder()
.setDBUrl(getDbMetadata().getUrl())