[bug] Fix abort failure bug (#359)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 5e2a697..d049a2b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -154,20 +154,20 @@
     }
 
     /**
-     * try to discard pending transactions with labels beginning with labelSuffix.
+     * try to discard pending transactions with labels beginning with labelPrefix.
      *
-     * @param labelSuffix the suffix of the stream load.
+     * @param labelPrefix the prefix of the stream load.
      * @param chkID checkpoint id of task.
      * @throws Exception
      */
-    public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
+    public void abortPreCommit(String labelPrefix, long chkID) throws Exception {
         long startChkID = chkID;
-        LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
+        LOG.info("abort for labelPrefix {}. start chkId {}.", labelPrefix, chkID);
         while (true) {
             try {
-                // TODO: According to label abort txn. Currently, it can only be aborted based on
-                // txnid,
-                //  so we must first request a streamload based on the label to get the txnid.
+                // TODO: According to label abort txn.
+                //  Currently, it can only be aborted based on txnid, so we must
+                //  first request a streamload based on the label to get the txnid.
                 String label = labelGenerator.generateTableLabel(startChkID);
                 HttpPutBuilder builder = new HttpPutBuilder();
                 builder.setUrl(loadUrlStr)
@@ -214,7 +214,7 @@
                 throw e;
             }
         }
-        LOG.info("abort for labelSuffix {} finished", labelSuffix);
+        LOG.info("abort for labelPrefix {} finished", labelPrefix);
     }
 
     /**
@@ -315,14 +315,18 @@
 
         ObjectMapper mapper = new ObjectMapper();
         String loadResult = EntityUtils.toString(response.getEntity());
+        LOG.info("abort Result {}", loadResult);
         Map<String, String> res =
                 mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {});
         if (!SUCCESS.equals(res.get("status"))) {
-            if (ResponseUtil.isCommitted(res.get("msg"))) {
+            String msg = res.get("msg");
+            if (msg != null && ResponseUtil.isCommitted(msg)) {
                 throw new DorisException(
                         "try abort committed transaction, " + "do you recover from old savepoint?");
             }
-            LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnID, res.get("msg"));
+
+            LOG.error("Fail to abort transaction. txnId: {}, error: {}", txnID, msg);
+            throw new DorisException("Fail to abort transaction, " + loadResult);
         }
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index 5ead3e8..1352a26 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -20,6 +20,7 @@
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.sink.HttpTestUtil;
 import org.apache.doris.flink.sink.OptionUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -88,7 +89,7 @@
         dorisStreamLoad.abortTransaction(anyLong());
     }
 
-    @Test
+    @Test(expected = DorisException.class)
     public void testAbortTransactionFailed() throws Exception {
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         CloseableHttpResponse abortFailedResponse =