[ISSUE #67] Bugfix for consume blocking
Bugfix for consume blocking
Co-authored-by: 高思伟 <siwei.gao@amh-group.com>
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 29272d8..97037a1 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -353,7 +353,8 @@
}
return true;
},
- "RuntimeException"));
+ "RuntimeException",
+ runningChecker));
}
awaitTermination();
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
index e53caf1..aae2148 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.flink.legacy.common.util;
+import org.apache.rocketmq.flink.legacy.RunningChecker;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +41,11 @@
}
public static <T> T call(Callable<T> callable, String errorMsg) throws RuntimeException {
+ return call(callable, errorMsg, null);
+ }
+
+ public static <T> T call(Callable<T> callable, String errorMsg, RunningChecker runningChecker)
+ throws RuntimeException {
long backoff = INITIAL_BACKOFF;
int retries = 0;
do {
@@ -46,6 +53,9 @@
return callable.call();
} catch (Exception ex) {
if (retries >= MAX_ATTEMPTS) {
+ if (null != runningChecker) {
+ runningChecker.setRunning(false);
+ }
throw new RuntimeException(ex);
}
log.error("{}, retry {}/{}", errorMsg, retries, MAX_ATTEMPTS, ex);
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtilTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtilTest.java
new file mode 100644
index 0000000..0b7178b
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtilTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.legacy.common.util;
+
+import org.apache.rocketmq.flink.legacy.RunningChecker;
+
+import junit.framework.TestCase;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Tests for {@link RetryUtil}. */
+@Slf4j
+public class RetryUtilTest extends TestCase {
+
+ public void testCall() {
+ try {
+ User user = new User();
+ RunningChecker runningChecker = new RunningChecker();
+ runningChecker.setRunning(true);
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ executorService.execute(
+ () ->
+ RetryUtil.call(
+ () -> {
+ user.setName("test");
+ user.setAge(Integer.parseInt("12e"));
+ return true;
+ },
+ "Something is error",
+ runningChecker));
+ Thread.sleep(10000);
+ executorService.shutdown();
+ log.info("Thread has finished");
+ assertEquals(0, user.age);
+ assertEquals("test", user.name);
+ assertEquals(false, runningChecker.isRunning());
+ } catch (Exception e) {
+ log.warn("Exception has been caught");
+ }
+ }
+
+ @Data
+ public class User {
+ String name;
+ int age;
+ }
+}