[improve] commit message compatible (#220)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java
index 1e9a9c0..8839a1a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java
@@ -24,13 +24,11 @@
*/
public class ResponseUtil {
public static final Pattern LABEL_EXIST_PATTERN =
- Pattern.compile("errCode = 2, detailMessage = Label \\[(.*)\\] " +
- "has already been used, relate to txn \\[(\\d+)\\]");
+ Pattern.compile("Label \\[(.*)\\] has already been used, relate to txn \\[(\\d+)\\]");
public static final Pattern COMMITTED_PATTERN =
- Pattern.compile("errCode = 2, detailMessage = transaction \\[(\\d+)\\] " +
- "is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
+ Pattern.compile("transaction \\[(\\d+)\\] is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
public static boolean isCommitted(String msg) {
- return COMMITTED_PATTERN.matcher(msg).matches();
+ return COMMITTED_PATTERN.matcher(msg).find();
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 295a0be..b61d174 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -33,7 +33,6 @@
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
@@ -171,7 +170,7 @@
return Collections.emptyList();
}
long txnId = respContent.getTxnId();
- return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
+ return Collections.singletonList(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
}
@Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 978fa27..bccb8b7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -111,6 +111,7 @@
options.add(USERNAME);
options.add(PASSWORD);
options.add(JDBC_URL);
+ options.add(AUTO_REDIRECT);
options.add(DORIS_READ_FIELD);
options.add(DORIS_FILTER_QUERY);