Fixing JDBC sink to handle null fields. Also added new unit tests (#6848)
### Motivation
JDBC sink does not handle `null` fields. For example, the field `example` can potentially be null. The schema registered in Pulsar allows for it, and the table schema in MySQL has a column of the same name, is configured as double and also allows nulls. When messages are sent to the JDBC sink without that field, an exception like this is seen:
```
21:08:38.472 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception
java.sql.SQLException: Data truncated for column 'example' at row 1
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:127) ~[mysql-connector-java-8.0.11.jar:8.0.11]
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:95) ~[mysql-connector-java-8.0.11.jar:8.0.11]
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[mysql-connector-java-8.0.11.jar:8.0.11]
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:960) ~[mysql-connector-java-8.0.11.jar:8.0.11]
at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:388) ~[mysql-connector-java-8.0.11.jar:8.0.11]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:202) ~[pulsar-io-jdbc-2.5.0.nar-unpacked/:?]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.lambda$open$0(JdbcAbstractSink.java:108) ~[pulsar-io-jdbc-2.5.0.nar-unpacked/:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_232]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_232]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_232]
```
Looking at the code for the JDBC sink, there was no handling of the case where the field was `null`. The PR adds code to handle that case. It also adds unit tests to cover this for both binary and JSON encoding of the schema.
### Modifications
When the sink encounters a `null` field value it uses the `setColumnNull` method to properly reflect this in the database row.
2 files changed
tree: ed1be732e63653c5119e96d3ccee2005229b3234
- pulsar-io/