[improve]Unique type table does not support 2pc (#51)
* [improve]Unique type table does not support 2pc
* fix
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 4262edd..c041ed8 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -50,7 +50,8 @@
private final boolean enableCustomJMX;
private final int taskId;
private final boolean enableDelete;
- private final boolean enable2PC;
+ private boolean enable2PC = true;
+ private boolean force2PC;
private boolean autoRedirect = true;
private int requestReadTimeoutMs;
private int requestConnectTimeoutMs;
@@ -91,7 +92,14 @@
ConfigCheckUtils.parseTopicToTableMap(
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
- this.enable2PC = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
+ if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) {
+ if (Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC))) {
+ this.enable2PC = true;
+ this.force2PC = true;
+ } else {
+ this.enable2PC = false;
+ }
+ }
this.enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
this.enableDelete =
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_DELETE));
@@ -116,8 +124,7 @@
Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS));
}
this.streamLoadProp = getStreamLoadPropFromConfig(config);
- this.enableGroupCommit =
- ConfigCheckUtils.validateGroupCommitMode(getStreamLoadProp(), enable2PC());
+ this.enableGroupCommit = ConfigCheckUtils.validateGroupCommitMode(this);
}
private Properties getStreamLoadPropFromConfig(Map<String, String> config) {
@@ -190,6 +197,14 @@
return enable2PC;
}
+ /**
+  * Reports whether two-phase commit was explicitly switched on by the user.
+  *
+  * @return {@code true} only when the connector configuration itself contained the 2PC option
+  *     with a value that parses to {@code true}; {@code false} otherwise
+  */
+ public boolean force2PC() {
+     return this.force2PC;
+ }
+
+ /**
+  * Overrides the effective two-phase commit switch after construction.
+  *
+  * <p>Only the {@code enable2PC} field is written; the forced flag is left untouched.
+  *
+  * @param enable2PC the new value of the two-phase commit switch
+  */
+ public void setEnable2PC(final boolean enable2PC) {
+     this.enable2PC = enable2PC;
+ }
+
public boolean enableGroupCommit() {
return enableGroupCommit;
}
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index a204947..e154db5 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -114,7 +114,6 @@
setFieldToDefaultValues(config, CONVERTER_MODE, CONVERT_MODE_DEFAULT);
setFieldToDefaultValues(
config, DEBEZIUM_SCHEMA_EVOLUTION, DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT);
- setFieldToDefaultValues(config, ENABLE_2PC, String.valueOf(ENABLE_2PC_DEFAULT));
setFieldToDefaultValues(config, JMX_OPT, String.valueOf(JMX_OPT_DEFAULT));
}
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
index e7e4a4f..7662aaa 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
@@ -63,6 +63,7 @@
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
/**
* get Doris BE nodes to request.
@@ -111,6 +112,18 @@
return nodeList;
}
+ /**
+  * Checks whether the given table uses the unique-key model.
+  *
+  * <p>The table schema is requested through {@code getSchema} for the database configured in
+  * {@code dorisOptions}, and its keys type is compared with {@code UNIQUE_KEYS}.
+  *
+  * @param dorisOptions connector options; also supplies the database name
+  * @param tableName table whose keys type is inspected
+  * @param logger logger passed on to the schema request and used to report a failure
+  * @return {@code true} if the reported keys type equals {@code UNIQUE_KEYS}
+  * @throws DorisException wrapping any exception raised while fetching or reading the schema
+  */
+ public static boolean isUniqueKeyType(
+         DorisOptions dorisOptions, String tableName, Logger logger) {
+     final boolean uniqueModel;
+     try {
+         uniqueModel =
+                 UNIQUE_KEYS_TYPE.equals(
+                         getSchema(dorisOptions, dorisOptions.getDatabase(), tableName, logger)
+                                 .getKeysType());
+     } catch (Exception e) {
+         logger.error("Failed to match table unique key types", e);
+         throw new DorisException(e);
+     }
+     return uniqueModel;
+ }
+
/**
* send request to Doris FE and get response json string.
*
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index e370253..c6611bc 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -21,10 +21,12 @@
import static org.apache.doris.kafka.connector.writer.LoadConstants.PARTIAL_COLUMNS;
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.converter.ConverterMode;
import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
@@ -301,7 +303,11 @@
return false;
}
- public static boolean validateGroupCommitMode(Properties streamLoadProp, boolean enable2PC) {
+ @VisibleForTesting
+ public static boolean validateGroupCommitMode(DorisOptions dorisOptions) {
+ Properties streamLoadProp = dorisOptions.getStreamLoadProp();
+ boolean enable2PC = dorisOptions.enable2PC();
+ boolean force2PC = dorisOptions.force2PC();
if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) {
return false;
}
@@ -312,13 +318,18 @@
throw new DorisException(
"The value of group commit mode is an illegal parameter, illegal value="
+ value);
- } else if (enable2PC) {
+ } else if (enable2PC && force2PC) {
throw new DorisException(
"When group commit is enabled, you should disable two phase commit! Please set 'enable.2pc':'false'");
} else if (streamLoadProp.containsKey(PARTIAL_COLUMNS)
&& streamLoadProp.get(PARTIAL_COLUMNS).equals("true")) {
throw new DorisException(
"When group commit is enabled,you can not load data with partial column update.");
+ } else if (enable2PC) {
+ // enable2PC defaults to true, so it must be switched off when group commit is in use.
+ LOG.info(
+ "The Group Commit mode is on, the two phase commit default value should be disabled.");
+ dorisOptions.setEnable2PC(false);
}
return true;
}
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
index 1e5fd43..fd8355c 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
@@ -34,6 +34,7 @@
import org.apache.doris.kafka.connector.exception.StreamLoadException;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.model.KafkaRespContent;
+import org.apache.doris.kafka.connector.service.RestService;
import org.apache.doris.kafka.connector.utils.BackendUtils;
import org.apache.doris.kafka.connector.utils.FileNameUtils;
import org.apache.doris.kafka.connector.writer.commit.DorisCommittable;
@@ -66,6 +67,20 @@
BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions, LOG);
this.dorisCommitter = new DorisCommitter(dorisOptions, backendUtils);
this.dorisStreamLoad = new DorisStreamLoad(backendUtils, dorisOptions, topic);
+ checkDorisTableKey(tableName);
+ }
+
+ /** The uniq model has 2pc close by default unless 2pc is forced open. */
+ @VisibleForTesting
+ public void checkDorisTableKey(String tableName) {
+ if (dorisOptions.enable2PC()
+ && !dorisOptions.force2PC()
+ && RestService.isUniqueKeyType(dorisOptions, tableName, LOG)) {
+ LOG.info(
+ "The {} table type is unique model, the two phase commit default value should be disabled.",
+ tableName);
+ dorisOptions.setEnable2PC(false);
+ }
}
public void fetchOffset() {
diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index 8f44707..9e46cc1 100644
--- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -19,6 +19,8 @@
package org.apache.doris.kafka.connector.cfg;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -292,22 +294,50 @@
}
}
+ /**
+  * Builds a {@link DorisOptions} from the bundled {@code doris-connector-sink.properties}
+  * test resource.
+  *
+  * <p>The merge order is: entries loaded from the resource, then whatever
+  * {@code DorisSinkConnectorConfig.setDefaultValues} fills in, then a fixed {@code task_id},
+  * and finally {@code customConfig}, which overrides everything before it.
+  *
+  * @param customConfig entries that take precedence over the loaded configuration
+  * @return options constructed from the merged configuration
+  * @throws IllegalStateException if the properties resource is not on the classpath
+  * @throws DorisException if the resource cannot be read, or if option validation fails
+  */
+ private DorisOptions initDorisOptions(Map<String, String> customConfig) {
+     Properties loadProps = new Properties();
+     // try-with-resources: the classpath stream is released even when loading fails.
+     try (InputStream stream =
+             this.getClass()
+                     .getClassLoader()
+                     .getResourceAsStream("doris-connector-sink.properties")) {
+         if (stream == null) {
+             // Deliberately not a DorisException, so tests that expect a DorisException
+             // cannot pass merely because the fixture file is missing.
+             throw new IllegalStateException(
+                     "doris-connector-sink.properties was not found on the test classpath");
+         }
+         loadProps.load(stream);
+     } catch (IOException e) {
+         throw new DorisException(e);
+     }
+     // Copy into a typed map instead of raw-casting the Properties object.
+     Map<String, String> config = new HashMap<>();
+     for (String key : loadProps.stringPropertyNames()) {
+         config.put(key, loadProps.getProperty(key));
+     }
+     DorisSinkConnectorConfig.setDefaultValues(config);
+     config.put("task_id", "1");
+     config.putAll(customConfig);
+     return new DorisOptions(config);
+ }
+
@Test(expected = DorisException.class)
public void testGroupCommitWithIllegalParams() {
Map<String, String> config = getConfig();
config.put("sink.properties.group_commit", "sync_modes");
- Properties streamLoadProp = getStreamLoadPropFromConfig(config);
config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
- ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
+ DorisOptions dorisOptions = initDorisOptions(config);
+ ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
}
@Test(expected = DorisException.class)
public void testGroupCommitModeWithEnable2pc() {
Map<String, String> config = getConfig();
config.put("sink.properties.group_commit", "sync_mode");
- Properties streamLoadProp = getStreamLoadPropFromConfig(config);
- boolean enable2pc = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
- ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, enable2pc);
+ config.put(DorisSinkConnectorConfig.ENABLE_2PC, "true");
+ DorisOptions dorisOptions = initDorisOptions(config);
+ ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
+ }
+
+ @Test
+ public void testGroupCommitModeWithDisable2pc() {
+ Map<String, String> config = getConfig();
+ config.put("sink.properties.group_commit", "sync_mode");
+ config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
+ DorisOptions dorisOptions = initDorisOptions(config);
+ ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
+ Assert.assertFalse(dorisOptions.enable2PC());
}
@Test(expected = DorisException.class)
@@ -315,31 +345,16 @@
Map<String, String> config = getConfig();
config.put("sink.properties.group_commit", "sync_mode");
config.put("sink.properties.partial_columns", "true");
- config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
- Properties streamLoadProp = getStreamLoadPropFromConfig(config);
- ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
+ DorisOptions dorisOptions = initDorisOptions(config);
+ ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
}
@Test
public void testGroupCommitWithAsyncMode() {
Map<String, String> config = getConfig();
config.put("sink.properties.group_commit", "async_mode");
- Properties streamLoadProp = getStreamLoadPropFromConfig(config);
- config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
- ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
- }
-
- private Properties getStreamLoadPropFromConfig(Map<String, String> config) {
- Properties streamLoadProp = new Properties();
- for (Map.Entry<String, String> entry : config.entrySet()) {
- if (entry.getKey().startsWith(DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX)) {
- String subKey =
- entry.getKey()
- .substring(
- DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX.length());
- streamLoadProp.put(subKey, entry.getValue());
- }
- }
- return streamLoadProp;
+ DorisOptions dorisOptions = initDorisOptions(config);
+ ConfigCheckUtils.validateGroupCommitMode(dorisOptions);
+ Assert.assertFalse(dorisOptions.enable2PC());
}
}
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
index ea54211..252d83a 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
@@ -24,6 +24,7 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import java.io.IOException;
@@ -37,12 +38,15 @@
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
+import org.apache.doris.kafka.connector.service.RestService;
import org.apache.doris.kafka.connector.writer.commit.DorisCommittable;
import org.apache.doris.kafka.connector.writer.load.DorisStreamLoad;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.MockedStatic;
public class TestStreamLoadWriter {
@@ -50,6 +54,7 @@
private DorisOptions dorisOptions;
private final Map<String, String> label2Status = new HashMap<>();
+ private MockedStatic<RestService> mockRestService;
@Before
public void init() throws IOException {
@@ -63,6 +68,7 @@
props.put("task_id", "1");
props.put("name", "sink-connector-test");
dorisOptions = new DorisOptions((Map) props);
+ mockRestService = mockStatic(RestService.class);
fillLabel2Status();
}
@@ -99,6 +105,9 @@
new JdbcConnectionProvider(dorisOptions),
dorisConnectMonitor));
+ mockRestService
+ .when(() -> RestService.isUniqueKeyType(any(), any(), any()))
+ .thenReturn(true);
doReturn(label2Status).when(streamLoadWriter).fetchLabel2Status();
return streamLoadWriter;
}
@@ -128,6 +137,9 @@
dorisConnectMonitor);
streamLoadWriter.setDorisStreamLoad(streamLoad);
+ mockRestService
+ .when(() -> RestService.isUniqueKeyType(any(), any(), any()))
+ .thenReturn(true);
dorisWriter = streamLoadWriter;
dorisWriter.insert(TestRecordBuffer.newSinkRecord("doris-1", 1));
dorisWriter.insert(TestRecordBuffer.newSinkRecord("doris-2", 2));
@@ -148,8 +160,40 @@
dorisOptions,
new JdbcConnectionProvider(dorisOptions),
dorisConnectMonitor);
+
+ mockRestService
+ .when(() -> RestService.isUniqueKeyType(any(), any(), any()))
+ .thenReturn(false);
SinkRecord record = TestRecordBuffer.newSinkRecord("doris-1", 2);
dorisWriter.putBuffer(record);
Assert.assertEquals(2, dorisWriter.getBuffer().getLastOffset());
}
+
+ /** A unique-key table must end up with two-phase commit switched off. */
+ @Test
+ public void test2PCInUnique() {
+     StreamLoadWriter writer = (StreamLoadWriter) mockStreamLoadWriter(new HashMap<>());
+     // Report the table as unique-key model before re-running the check.
+     mockRestService
+             .when(() -> RestService.isUniqueKeyType(any(), any(), any()))
+             .thenReturn(true);
+     writer.checkDorisTableKey("unique_table");
+     Assert.assertFalse(dorisOptions.enable2PC());
+ }
+
+ /** A table that is not unique-key model must keep two-phase commit switched on. */
+ @Test
+ public void test2PCNotUnique() {
+     StreamLoadWriter writer = (StreamLoadWriter) mockStreamLoadWriter(new HashMap<>());
+     // Report the table as a non-unique model before re-running the check.
+     mockRestService
+             .when(() -> RestService.isUniqueKeyType(any(), any(), any()))
+             .thenReturn(false);
+     writer.checkDorisTableKey("not_unique_table");
+     Assert.assertTrue(dorisOptions.enable2PC());
+ }
+
+ /**
+  * Releases the static {@link RestService} mock created in {@code init()} so that the next
+  * test can register a fresh one.
+  */
+ @After
+ public void close() {
+     // JUnit runs @After even when @Before throws; init() can fail before the mock exists,
+     // and an NPE here would only pile a second failure on top of the real one.
+     if (mockRestService != null) {
+         mockRestService.close();
+     }
+ }
}