[bug] Fix abort failure bug (#359)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 5e2a697..d049a2b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -154,20 +154,20 @@
}
/**
- * try to discard pending transactions with labels beginning with labelSuffix.
+ * try to discard pending transactions with labels beginning with labelPrefix.
*
- * @param labelSuffix the suffix of the stream load.
+ * @param labelPrefix the prefix of the stream load.
* @param chkID checkpoint id of task.
* @throws Exception
*/
- public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
+ public void abortPreCommit(String labelPrefix, long chkID) throws Exception {
long startChkID = chkID;
- LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
+ LOG.info("abort for labelPrefix {}. start chkId {}.", labelPrefix, chkID);
while (true) {
try {
- // TODO: According to label abort txn. Currently, it can only be aborted based on
- // txnid,
- // so we must first request a streamload based on the label to get the txnid.
+ // TODO: According to label abort txn.
+ // Currently, it can only be aborted based on txnid, so we must
+ // first request a streamload based on the label to get the txnid.
String label = labelGenerator.generateTableLabel(startChkID);
HttpPutBuilder builder = new HttpPutBuilder();
builder.setUrl(loadUrlStr)
@@ -214,7 +214,7 @@
throw e;
}
}
- LOG.info("abort for labelSuffix {} finished", labelSuffix);
+ LOG.info("abort for labelPrefix {} finished", labelPrefix);
}
/**
@@ -315,14 +315,18 @@
ObjectMapper mapper = new ObjectMapper();
String loadResult = EntityUtils.toString(response.getEntity());
+ LOG.info("abort Result {}", loadResult);
Map<String, String> res =
mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {});
if (!SUCCESS.equals(res.get("status"))) {
- if (ResponseUtil.isCommitted(res.get("msg"))) {
+ String msg = res.get("msg");
+ if (msg != null && ResponseUtil.isCommitted(msg)) {
throw new DorisException(
"try abort committed transaction, " + "do you recover from old savepoint?");
}
- LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnID, res.get("msg"));
+
+ LOG.error("Fail to abort transaction. txnId: {}, error: {}", txnID, msg);
+ throw new DorisException("Fail to abort transaction, " + loadResult);
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index 5ead3e8..1352a26 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -20,6 +20,7 @@
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.sink.HttpTestUtil;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -88,7 +89,7 @@
dorisStreamLoad.abortTransaction(anyLong());
}
- @Test
+ @Test(expected = DorisException.class)
public void testAbortTransactionFailed() throws Exception {
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
CloseableHttpResponse abortFailedResponse =