[improve]Unique type table not support 2pc (#51)

* [improve]Unique type table not support 2pc

* fix
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 4262edd..c041ed8 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -50,7 +50,8 @@
     private final boolean enableCustomJMX;
     private final int taskId;
     private final boolean enableDelete;
-    private final boolean enable2PC;
+    private boolean enable2PC = true;
+    private boolean force2PC;
     private boolean autoRedirect = true;
     private int requestReadTimeoutMs;
     private int requestConnectTimeoutMs;
@@ -91,7 +92,14 @@
                 ConfigCheckUtils.parseTopicToTableMap(
                         config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
 
-        this.enable2PC = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
+        if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) {
+            if (Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC))) {
+                this.enable2PC = true;
+                this.force2PC = true;
+            } else {
+                this.enable2PC = false;
+            }
+        }
         this.enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
         this.enableDelete =
                 Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_DELETE));
@@ -116,8 +124,7 @@
                     Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS));
         }
         this.streamLoadProp = getStreamLoadPropFromConfig(config);
-        this.enableGroupCommit =
-                ConfigCheckUtils.validateGroupCommitMode(getStreamLoadProp(), enable2PC());
+        this.enableGroupCommit = ConfigCheckUtils.validateGroupCommitMode(this);
     }
 
     private Properties getStreamLoadPropFromConfig(Map<String, String> config) {
@@ -190,6 +197,14 @@
         return enable2PC;
     }
 
+    public boolean force2PC() {
+        return force2PC;
+    }
+
+    public void setEnable2PC(boolean enable2PC) {
+        this.enable2PC = enable2PC;
+    }
+
     public boolean enableGroupCommit() {
         return enableGroupCommit;
     }
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index a204947..e154db5 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -114,7 +114,6 @@
         setFieldToDefaultValues(config, CONVERTER_MODE, CONVERT_MODE_DEFAULT);
         setFieldToDefaultValues(
                 config, DEBEZIUM_SCHEMA_EVOLUTION, DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT);
-        setFieldToDefaultValues(config, ENABLE_2PC, String.valueOf(ENABLE_2PC_DEFAULT));
         setFieldToDefaultValues(config, JMX_OPT, String.valueOf(JMX_OPT_DEFAULT));
     }
 
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
index e7e4a4f..7662aaa 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
@@ -63,6 +63,7 @@
     private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
     private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
 
     /**
      * get Doris BE nodes to request.
@@ -111,6 +112,18 @@
         return nodeList;
     }
 
+    public static boolean isUniqueKeyType(
+            DorisOptions dorisOptions, String tableName, Logger logger) {
+        try {
+            return UNIQUE_KEYS_TYPE.equals(
+                    getSchema(dorisOptions, dorisOptions.getDatabase(), tableName, logger)
+                            .getKeysType());
+        } catch (Exception e) {
+            logger.error("Failed to match table unique key types", e);
+            throw new DorisException(e);
+        }
+    }
+
     /**
      * send request to Doris FE and get response json string.
      *
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index e370253..c6611bc 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -21,10 +21,12 @@
 
 import static org.apache.doris.kafka.connector.writer.LoadConstants.PARTIAL_COLUMNS;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.converter.ConverterMode;
 import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
@@ -301,7 +303,11 @@
         return false;
     }
 
-    public static boolean validateGroupCommitMode(Properties streamLoadProp, boolean enable2PC) {
+    @VisibleForTesting
+    public static boolean validateGroupCommitMode(DorisOptions dorisOptions) {
+        Properties streamLoadProp = dorisOptions.getStreamLoadProp();
+        boolean enable2PC = dorisOptions.enable2PC();
+        boolean force2PC = dorisOptions.force2PC();
         if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) {
             return false;
         }
@@ -312,13 +318,18 @@
             throw new DorisException(
                     "The value of group commit mode is an illegal parameter, illegal value="
                             + value);
-        } else if (enable2PC) {
+        } else if (enable2PC && force2PC) {
             throw new DorisException(
                     "When group commit is enabled, you should disable two phase commit! Please  set 'enable.2pc':'false'");
         } else if (streamLoadProp.containsKey(PARTIAL_COLUMNS)
                 && streamLoadProp.get(PARTIAL_COLUMNS).equals("true")) {
             throw new DorisException(
                     "When group commit is enabled,you can not load data with partial column update.");
+        } else if (enable2PC) {
+            // The default enable2PC is true, in the scenario of group commit, it needs to be closed
+            LOG.info(
+                    "The Group Commit mode is on, the two phase commit default value should be disabled.");
+            dorisOptions.setEnable2PC(false);
         }
         return true;
     }
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
index 1e5fd43..fd8355c 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
@@ -34,6 +34,7 @@
 import org.apache.doris.kafka.connector.exception.StreamLoadException;
 import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
 import org.apache.doris.kafka.connector.model.KafkaRespContent;
+import org.apache.doris.kafka.connector.service.RestService;
 import org.apache.doris.kafka.connector.utils.BackendUtils;
 import org.apache.doris.kafka.connector.utils.FileNameUtils;
 import org.apache.doris.kafka.connector.writer.commit.DorisCommittable;
@@ -66,6 +67,20 @@
         BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions, LOG);
         this.dorisCommitter = new DorisCommitter(dorisOptions, backendUtils);
         this.dorisStreamLoad = new DorisStreamLoad(backendUtils, dorisOptions, topic);
+        checkDorisTableKey(tableName);
+    }
+
+    /** The uniq model has 2pc close by default unless 2pc is forced open. */
+    @VisibleForTesting
+    public void checkDorisTableKey(String tableName) {
+        if (dorisOptions.enable2PC()
+                && !dorisOptions.force2PC()
+                && RestService.isUniqueKeyType(dorisOptions, tableName, LOG)) {
+            LOG.info(
+                    "The {} table type is unique model, the two phase commit default value should be disabled.",
+                    tableName);
+            dorisOptions.setEnable2PC(false);
+        }
     }
 
     public void fetchOffset() {
diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index 8f44707..9e46cc1 100644
--- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -19,6 +19,8 @@
 
 package org.apache.doris.kafka.connector.cfg;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -292,22 +294,50 @@
         }
     }
 
+    private DorisOptions initDorisOptions(Map<String, String> customConfig) {
+        Properties loadProps = new Properties();
+        InputStream stream =
+                this.getClass()
+                        .getClassLoader()
+                        .getResourceAsStream("doris-connector-sink.properties");
+        try {
+            loadProps.load(stream);
+            DorisSinkConnectorConfig.setDefaultValues((Map) loadProps);
+            loadProps.put("task_id", "1");
+        } catch (IOException e) {
+            throw new DorisException(e);
+        }
+        Map<String, String> config = (Map) loadProps;
+        config.putAll(customConfig);
+        return new DorisOptions(config);
+    }
+
     @Test(expected = DorisException.class)
     public void testGroupCommitWithIllegalParams() {
         Map<String, String> config = getConfig();
         config.put("sink.properties.group_commit", "sync_modes");
-        Properties streamLoadProp = getStreamLoadPropFromConfig(config);
         config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
-        ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
+        DorisOptions dorisOptions = initDorisOptions(config);
+        ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
     }
 
     @Test(expected = DorisException.class)
     public void testGroupCommitModeWithEnable2pc() {
         Map<String, String> config = getConfig();
         config.put("sink.properties.group_commit", "sync_mode");
-        Properties streamLoadProp = getStreamLoadPropFromConfig(config);
-        boolean enable2pc = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
-        ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, enable2pc);
+        config.put(DorisSinkConnectorConfig.ENABLE_2PC, "true");
+        DorisOptions dorisOptions = initDorisOptions(config);
+        ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
+    }
+
+    @Test
+    public void testGroupCommitModeWithDisable2pc() {
+        Map<String, String> config = getConfig();
+        config.put("sink.properties.group_commit", "sync_mode");
+        config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
+        DorisOptions dorisOptions = initDorisOptions(config);
+        ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
+        Assert.assertFalse(dorisOptions.enable2PC());
     }
 
     @Test(expected = DorisException.class)
@@ -315,31 +345,16 @@
         Map<String, String> config = getConfig();
         config.put("sink.properties.group_commit", "sync_mode");
         config.put("sink.properties.partial_columns", "true");
-        config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
-        Properties streamLoadProp = getStreamLoadPropFromConfig(config);
-        ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
+        DorisOptions dorisOptions = initDorisOptions(config);
+        ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
     }
 
     @Test
     public void testGroupCommitWithAsyncMode() {
         Map<String, String> config = getConfig();
         config.put("sink.properties.group_commit", "async_mode");
-        Properties streamLoadProp = getStreamLoadPropFromConfig(config);
-        config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
-        ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
-    }
-
-    private Properties getStreamLoadPropFromConfig(Map<String, String> config) {
-        Properties streamLoadProp = new Properties();
-        for (Map.Entry<String, String> entry : config.entrySet()) {
-            if (entry.getKey().startsWith(DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX)) {
-                String subKey =
-                        entry.getKey()
-                                .substring(
-                                        DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX.length());
-                streamLoadProp.put(subKey, entry.getValue());
-            }
-        }
-        return streamLoadProp;
+        DorisOptions dorisOptions = initDorisOptions(config);
+        ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
+        Assert.assertFalse(dorisOptions.enable2PC());
     }
 }
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
index ea54211..252d83a 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
@@ -24,6 +24,7 @@
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
@@ -37,12 +38,15 @@
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
 import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
+import org.apache.doris.kafka.connector.service.RestService;
 import org.apache.doris.kafka.connector.writer.commit.DorisCommittable;
 import org.apache.doris.kafka.connector.writer.load.DorisStreamLoad;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
 
 public class TestStreamLoadWriter {
 
@@ -50,6 +54,7 @@
     private DorisOptions dorisOptions;
 
     private final Map<String, String> label2Status = new HashMap<>();
+    private MockedStatic<RestService> mockRestService;
 
     @Before
     public void init() throws IOException {
@@ -63,6 +68,7 @@
         props.put("task_id", "1");
         props.put("name", "sink-connector-test");
         dorisOptions = new DorisOptions((Map) props);
+        mockRestService = mockStatic(RestService.class);
         fillLabel2Status();
     }
 
@@ -99,6 +105,9 @@
                                 new JdbcConnectionProvider(dorisOptions),
                                 dorisConnectMonitor));
 
+        mockRestService
+                .when(() -> RestService.isUniqueKeyType(any(), any(), any()))
+                .thenReturn(true);
         doReturn(label2Status).when(streamLoadWriter).fetchLabel2Status();
         return streamLoadWriter;
     }
@@ -128,6 +137,9 @@
                         dorisConnectMonitor);
         streamLoadWriter.setDorisStreamLoad(streamLoad);
 
+        mockRestService
+                .when(() -> RestService.isUniqueKeyType(any(), any(), any()))
+                .thenReturn(true);
         dorisWriter = streamLoadWriter;
         dorisWriter.insert(TestRecordBuffer.newSinkRecord("doris-1", 1));
         dorisWriter.insert(TestRecordBuffer.newSinkRecord("doris-2", 2));
@@ -148,8 +160,40 @@
                         dorisOptions,
                         new JdbcConnectionProvider(dorisOptions),
                         dorisConnectMonitor);
+
+        mockRestService
+                .when(() -> RestService.isUniqueKeyType(any(), any(), any()))
+                .thenReturn(false);
         SinkRecord record = TestRecordBuffer.newSinkRecord("doris-1", 2);
         dorisWriter.putBuffer(record);
         Assert.assertEquals(2, dorisWriter.getBuffer().getLastOffset());
     }
+
+    @Test
+    public void test2PCInUnique() {
+
+        StreamLoadWriter dorisWriter = (StreamLoadWriter) mockStreamLoadWriter(new HashMap<>());
+        // test 2PC in unique key model scenario
+        mockRestService
+                .when(() -> RestService.isUniqueKeyType(any(), any(), any()))
+                .thenReturn(true);
+        dorisWriter.checkDorisTableKey("unique_table");
+        Assert.assertFalse(dorisOptions.enable2PC());
+    }
+
+    @Test
+    public void test2PCNotUnique() {
+        StreamLoadWriter dorisWriter = (StreamLoadWriter) mockStreamLoadWriter(new HashMap<>());
+        // test 2PC in not unique key model scenario
+        mockRestService
+                .when(() -> RestService.isUniqueKeyType(any(), any(), any()))
+                .thenReturn(false);
+        dorisWriter.checkDorisTableKey("not_unique_table");
+        Assert.assertTrue(dorisOptions.enable2PC());
+    }
+
+    @After
+    public void close() {
+        mockRestService.close();
+    }
 }