[ISSUE #67] Bugfix for consume blocking

Bugfix for consume blocking

Co-authored-by: 高思伟 <siwei.gao@amh-group.com>
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 29272d8..97037a1 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -353,7 +353,8 @@
                                         }
                                         return true;
                                     },
-                                    "RuntimeException"));
+                                    "RuntimeException",
+                                    runningChecker));
         }
 
         awaitTermination();
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
index e53caf1..aae2148 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.flink.legacy.common.util;
 
+import org.apache.rocketmq.flink.legacy.RunningChecker;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,11 @@
     }
 
     public static <T> T call(Callable<T> callable, String errorMsg) throws RuntimeException {
+        return call(callable, errorMsg, null);
+    }
+
+    public static <T> T call(Callable<T> callable, String errorMsg, RunningChecker runningChecker)
+            throws RuntimeException {
         long backoff = INITIAL_BACKOFF;
         int retries = 0;
         do {
@@ -46,6 +53,9 @@
                 return callable.call();
             } catch (Exception ex) {
                 if (retries >= MAX_ATTEMPTS) {
+                    if (null != runningChecker) {
+                        runningChecker.setRunning(false);
+                    }
                     throw new RuntimeException(ex);
                 }
                 log.error("{}, retry {}/{}", errorMsg, retries, MAX_ATTEMPTS, ex);
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtilTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtilTest.java
new file mode 100644
index 0000000..0b7178b
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtilTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.legacy.common.util;
+
+import org.apache.rocketmq.flink.legacy.RunningChecker;
+
+import junit.framework.TestCase;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Tests for {@link RetryUtil}. */
+@Slf4j
+public class RetryUtilTest extends TestCase {
+
+    public void testCall() {
+        try {
+            User user = new User();
+            RunningChecker runningChecker = new RunningChecker();
+            runningChecker.setRunning(true);
+            ExecutorService executorService = Executors.newCachedThreadPool();
+            executorService.execute(
+                    () ->
+                            RetryUtil.call(
+                                    () -> {
+                                        user.setName("test");
+                                        user.setAge(Integer.parseInt("12e"));
+                                        return true;
+                                    },
+                                    "Something is error",
+                                    runningChecker));
+            Thread.sleep(10000);
+            executorService.shutdown();
+            log.info("Thread has finished");
+            assertEquals(0, user.age);
+            assertEquals("test", user.name);
+            assertEquals(false, runningChecker.isRunning());
+        } catch (Exception e) {
+            log.warn("Exception has been caught");
+        }
+    }
+
+    @Data
+    public class User {
+        String name;
+        int age;
+    }
+}