[pulsar-io-jdbc] not set action as insert (#4862)

### Motivation

jdbc sink treat all record as INSERT before #4358 , now it requires an indispensable action property which seems to be a break change, and we can deal records without any action property as INSERT.

### Modifications

treat action not set as INSERT action like before.
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index f0ccec3..fe22a30 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -182,18 +182,29 @@
                 // bind each record value
                 for (Record<T> record : swapList) {
                     String action = record.getProperties().get(ACTION);
-                    if (action != null && action.equals(DELETE)) {
-                        bindValue(deleteStatment, record, action);
-                        count += 1;
-                        deleteStatment.execute();
-                    } else if (action != null && action.equals(UPDATE)) {
-                        bindValue(updateStatment, record, action);
-                        count += 1;
-                        updateStatment.execute();
-                    } else if (action != null && action.equals(INSERT)){
-                        bindValue(insertStatement, record, action);
-                        count += 1;
-                        insertStatement.execute();
+                    if (action == null) {
+                        action = INSERT;
+                    }
+                    switch (action) {
+                        case DELETE:
+                            bindValue(deleteStatment, record, action);
+                            count += 1;
+                            deleteStatment.execute();
+                            break;
+                        case UPDATE:
+                            bindValue(updateStatment, record, action);
+                            count += 1;
+                            updateStatment.execute();
+                            break;
+                        case INSERT:
+                            bindValue(insertStatement, record, action);
+                            count += 1;
+                            insertStatement.execute();
+                            break;
+                        default:
+                            String msg = String.format("Unsupported action %s, can be one of %s, or not set which indicate %s",
+                                                       action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT);
+                            throw new IllegalArgumentException(msg);
                     }
                 }
                 connection.commit();
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index ce4aae4..84c4406 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -19,9 +19,13 @@
 
 package org.apache.pulsar.io.jdbc;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -40,6 +44,7 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -96,7 +101,6 @@
         conf.put("batchSize", 1);
 
         jdbcSink = new JdbcAutoSchemaSink();
-        jdbcSink = new JdbcAutoSchemaSink();
 
         // open should success
         jdbcSink.open(conf, null);
@@ -109,8 +113,7 @@
         jdbcSink.close();
     }
 
-    @Test
-    public void TestOpenAndWriteSink() throws Exception {
+    private void testOpenAndWriteSink(Map<String, String> actionProperties) throws Exception {
         Message<GenericRecord> insertMessage = mock(MessageImpl.class);
         GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a foo Record
@@ -121,19 +124,16 @@
         AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
         byte[] insertBytes = schema.encode(insertObj);
-
+        CompletableFuture<Void> future = new CompletableFuture<>();
         Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
             .message(insertMessage)
             .topicName("fake_topic_name")
-            .ackFunction(() -> {})
+            .ackFunction(() -> future.complete(null))
             .build();
 
         genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
-
-        Map<String, String> insertProperties = Maps.newHashMap();
-        insertProperties.put("ACTION", "INSERT");
         when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
-        when(insertMessage.getProperties()).thenReturn(insertProperties);
+        when(insertMessage.getProperties()).thenReturn(actionProperties);
         log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
                 insertObj.toString(),
                 insertMessage.getValue().toString(),
@@ -143,7 +143,7 @@
         jdbcSink.write(insertRecord);
         log.info("executed write");
         // sleep to wait backend flush complete
-        Thread.sleep(1000);
+        future.get(1, TimeUnit.SECONDS);
 
         // value has been written to db, read it out and verify.
         String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
@@ -156,6 +156,25 @@
 
     }
 
+    @Test
+    public void TestInsertAction() throws Exception {
+        testOpenAndWriteSink(ImmutableMap.of("ACTION", "INSERT"));
+    }
+
+    @Test
+    public void TestNoAction() throws Exception {
+        testOpenAndWriteSink(ImmutableMap.of());
+    }
+
+    @Test
+    public void TestUnknownAction() throws Exception {
+        Record<GenericRecord> recordRecord = mock(Record.class);
+        when(recordRecord.getProperties()).thenReturn(ImmutableMap.of("ACTION", "UNKNOWN"));
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doAnswer(a -> future.complete(null)).when(recordRecord).fail();
+        jdbcSink.write(recordRecord);
+        future.get(1, TimeUnit.SECONDS);
+    }
 
     @Test
     public void TestUpdateAction() throws Exception {
@@ -169,10 +188,11 @@
 
         byte[] updateBytes = schema.encode(updateObj);
         Message<GenericRecord> updateMessage = mock(MessageImpl.class);
+        CompletableFuture<Void> future = new CompletableFuture<>();
         Record<GenericRecord> updateRecord = PulsarRecord.<GenericRecord>builder()
                 .message(updateMessage)
                 .topicName("fake_topic_name")
-                .ackFunction(() -> {})
+                .ackFunction(() -> future.complete(null))
                 .build();
 
         GenericSchema<GenericRecord> updateGenericAvroSchema;
@@ -188,8 +208,7 @@
                 updateRecord.getValue().toString());
 
         jdbcSink.write(updateRecord);
-
-        Thread.sleep(1000);
+        future.get(1, TimeUnit.SECONDS);
 
         // value has been written to db, read it out and verify.
         String updateQuerySql = "SELECT * FROM " + tableName + " WHERE field3=4";
@@ -210,10 +229,11 @@
 
         byte[] deleteBytes = schema.encode(deleteObj);
         Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
+        CompletableFuture<Void> future = new CompletableFuture<>();
         Record<GenericRecord> deleteRecord = PulsarRecord.<GenericRecord>builder()
                 .message(deleteMessage)
                 .topicName("fake_topic_name")
-                .ackFunction(() -> {})
+                .ackFunction(() -> future.complete(null))
                 .build();
 
         GenericSchema<GenericRecord> deleteGenericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
@@ -228,8 +248,7 @@
                 deleteRecord.getValue().toString());
 
         jdbcSink.write(deleteRecord);
-
-        Thread.sleep(1000);
+        future.get(1, TimeUnit.SECONDS);
 
         // value has been written to db, read it out and verify.
         String deleteQuerySql = "SELECT * FROM " + tableName + " WHERE field3=5";