[pulsar-io-jdbc] not set action as insert (#4862)
### Motivation
jdbc sink treat all record as INSERT before #4358 , now it requires an indispensable action property which seems to be a break change, and we can deal records without any action property as INSERT.
### Modifications
treat action not set as INSERT action like before.
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index f0ccec3..fe22a30 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -182,18 +182,29 @@
// bind each record value
for (Record<T> record : swapList) {
String action = record.getProperties().get(ACTION);
- if (action != null && action.equals(DELETE)) {
- bindValue(deleteStatment, record, action);
- count += 1;
- deleteStatment.execute();
- } else if (action != null && action.equals(UPDATE)) {
- bindValue(updateStatment, record, action);
- count += 1;
- updateStatment.execute();
- } else if (action != null && action.equals(INSERT)){
- bindValue(insertStatement, record, action);
- count += 1;
- insertStatement.execute();
+ if (action == null) {
+ action = INSERT;
+ }
+ switch (action) {
+ case DELETE:
+ bindValue(deleteStatment, record, action);
+ count += 1;
+ deleteStatment.execute();
+ break;
+ case UPDATE:
+ bindValue(updateStatment, record, action);
+ count += 1;
+ updateStatment.execute();
+ break;
+ case INSERT:
+ bindValue(insertStatement, record, action);
+ count += 1;
+ insertStatement.execute();
+ break;
+ default:
+ String msg = String.format("Unsupported action %s, can be one of %s, or not set which indicate %s",
+ action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT);
+ throw new IllegalArgumentException(msg);
}
}
connection.commit();
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index ce4aae4..84c4406 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -19,9 +19,13 @@
package org.apache.pulsar.io.jdbc;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -40,6 +44,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -96,7 +101,6 @@
conf.put("batchSize", 1);
jdbcSink = new JdbcAutoSchemaSink();
- jdbcSink = new JdbcAutoSchemaSink();
// open should success
jdbcSink.open(conf, null);
@@ -109,8 +113,7 @@
jdbcSink.close();
}
- @Test
- public void TestOpenAndWriteSink() throws Exception {
+ private void testOpenAndWriteSink(Map<String, String> actionProperties) throws Exception {
Message<GenericRecord> insertMessage = mock(MessageImpl.class);
GenericSchema<GenericRecord> genericAvroSchema;
// prepare a foo Record
@@ -121,19 +124,16 @@
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
byte[] insertBytes = schema.encode(insertObj);
-
+ CompletableFuture<Void> future = new CompletableFuture<>();
Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
.message(insertMessage)
.topicName("fake_topic_name")
- .ackFunction(() -> {})
+ .ackFunction(() -> future.complete(null))
.build();
genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
-
- Map<String, String> insertProperties = Maps.newHashMap();
- insertProperties.put("ACTION", "INSERT");
when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
- when(insertMessage.getProperties()).thenReturn(insertProperties);
+ when(insertMessage.getProperties()).thenReturn(actionProperties);
log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
insertObj.toString(),
insertMessage.getValue().toString(),
@@ -143,7 +143,7 @@
jdbcSink.write(insertRecord);
log.info("executed write");
// sleep to wait backend flush complete
- Thread.sleep(1000);
+ future.get(1, TimeUnit.SECONDS);
// value has been written to db, read it out and verify.
String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
@@ -156,6 +156,25 @@
}
+ @Test
+ public void TestInsertAction() throws Exception {
+ testOpenAndWriteSink(ImmutableMap.of("ACTION", "INSERT"));
+ }
+
+ @Test
+ public void TestNoAction() throws Exception {
+ testOpenAndWriteSink(ImmutableMap.of());
+ }
+
+ @Test
+ public void TestUnknownAction() throws Exception {
+ Record<GenericRecord> recordRecord = mock(Record.class);
+ when(recordRecord.getProperties()).thenReturn(ImmutableMap.of("ACTION", "UNKNOWN"));
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ doAnswer(a -> future.complete(null)).when(recordRecord).fail();
+ jdbcSink.write(recordRecord);
+ future.get(1, TimeUnit.SECONDS);
+ }
@Test
public void TestUpdateAction() throws Exception {
@@ -169,10 +188,11 @@
byte[] updateBytes = schema.encode(updateObj);
Message<GenericRecord> updateMessage = mock(MessageImpl.class);
+ CompletableFuture<Void> future = new CompletableFuture<>();
Record<GenericRecord> updateRecord = PulsarRecord.<GenericRecord>builder()
.message(updateMessage)
.topicName("fake_topic_name")
- .ackFunction(() -> {})
+ .ackFunction(() -> future.complete(null))
.build();
GenericSchema<GenericRecord> updateGenericAvroSchema;
@@ -188,8 +208,7 @@
updateRecord.getValue().toString());
jdbcSink.write(updateRecord);
-
- Thread.sleep(1000);
+ future.get(1, TimeUnit.SECONDS);
// value has been written to db, read it out and verify.
String updateQuerySql = "SELECT * FROM " + tableName + " WHERE field3=4";
@@ -210,10 +229,11 @@
byte[] deleteBytes = schema.encode(deleteObj);
Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
+ CompletableFuture<Void> future = new CompletableFuture<>();
Record<GenericRecord> deleteRecord = PulsarRecord.<GenericRecord>builder()
.message(deleteMessage)
.topicName("fake_topic_name")
- .ackFunction(() -> {})
+ .ackFunction(() -> future.complete(null))
.build();
GenericSchema<GenericRecord> deleteGenericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
@@ -228,8 +248,7 @@
deleteRecord.getValue().toString());
jdbcSink.write(deleteRecord);
-
- Thread.sleep(1000);
+ future.get(1, TimeUnit.SECONDS);
// value has been written to db, read it out and verify.
String deleteQuerySql = "SELECT * FROM " + tableName + " WHERE field3=5";