fix: code style issues to make compile pass with checkstyle plugin enabled (#160)
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
index ed6a91c..33b59fa 100644
--- a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
+++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
@@ -48,19 +48,19 @@
return;
}
- // tps: 每秒文件打印的行数
+ // tps: rows to print for each second
final long tps = currentRowCount - previousRowCount.get();
previousRowCount.set(currentRowCount);
writeCount.add(currentRowCount);
costTime.add(1000);
- // delayTime(条/ms)=接收的数量/花费的时间
+ // delayTime(record/ms)= receiving-amount / time
final double delayTime = writeCount.longValue() / costTime.longValue();
// String delayTimeStr = twoDecimal(delayTime);
String info = String.format("Current Time: %s | TPS: %d ",
UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), tps);
- System.out.println(info);
+ System.out.printf("%s%n", info);
}
diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
index e9acb87..8f91eb8 100644
--- a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
+++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
@@ -16,18 +16,20 @@
*/
package org.apache.rocketmq.eventbridge.adapter.benchmark;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.LineNumberReader;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.*;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
- * 整条链路
+ * End-to-End use case
*/
public class EventTPSCommon extends AbstractEventCommon {
public static void main(String[] args) {
@@ -35,12 +37,10 @@
if (args.length > 0) {
filePath = args[0];
}
- EventTPSCommon tpsCommon = null;
+ EventTPSCommon tpsCommon;
try {
tpsCommon = new EventTPSCommon(filePath);
tpsCommon.start();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
@@ -56,7 +56,7 @@
previousRowCount = new AtomicReference<>();
previousRowCount.set(0);
executorService = new ScheduledThreadPoolExecutor(1,
- new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
+ new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
}
@Override
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
index 6718b8d..8418830 100644
--- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
@@ -75,7 +75,7 @@
private Long validationTimeoutMs;
@Bean("dataSource")
- public DataSource getMasterDataSource(){
+ public DataSource getMasterDataSource() {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(baseUrl);
hikariConfig.setDriverClassName(baseDriverClassName);
@@ -102,7 +102,8 @@
}
@Bean("sqlSessionTemplate")
- public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory){
+ public SqlSessionTemplate masterSqlSessionTemplate(
+ @Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
index 97bfb88..adab7ec 100644
--- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
@@ -22,7 +22,6 @@
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.converter.EventTargetRunnerConverter;
import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.dataobject.EventTargetRunnerDO;
import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.mapper.EventTargetRunnerMapper;
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
index 77c6bc9..a0ebc6e 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
@@ -52,19 +52,21 @@
targetRunnerConfig.setName(name);
List<Map<String, String>> components = Lists.newArrayList();
targetRunnerConfig.setComponents(components);
- Map<String, String> sourceComponent = new Gson().fromJson(new Gson().toJson(source
- .getConfig()), new TypeToken<Map<String, String>>() {
- }.getType());
- Map<String, String> filterComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern)
- .getConfig()), new TypeToken<Map<String, String>>() {
- }.getType());
- Map<String, String> transformComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform)
- .getConfig()), new TypeToken<Map<String, String>>() {
- }.getType());
- Map<String, String> targetComponent = new Gson().fromJson(new Gson().toJson(target
- .getConfig()), new TypeToken<Map<String, String>>() {
- }.getType());
+ Map<String, String> sourceComponent = new Gson().fromJson(
+ new Gson().toJson(source.getConfig()),
+ new TypeToken<Map<String, String>>() {}.getType());
+
+ Map<String, String> filterComponent = new Gson().fromJson(
+ new Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern).getConfig()),
+ new TypeToken<Map<String, String>>() {}.getType());
+
+ Map<String, String> transformComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform).getConfig()),
+ new TypeToken<Map<String, String>>() {}.getType());
+
+ Map<String, String> targetComponent = new Gson().fromJson(new Gson().toJson(target.getConfig()),
+ new TypeToken<Map<String, String>>() {}.getType());
+
components.add(sourceComponent);
components.add(filterComponent);
components.add(transformComponent);
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
index af8bbbd..ba3ed1f 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
@@ -34,19 +34,16 @@
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.StartAndShutdown;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown;
-
import javax.annotation.PostConstruct;
import java.util.concurrent.atomic.AtomicReference;
/**
* event bridge runtime
- *
- * @author artisan
*/
@Component
public class Runtime {
- private static final Logger logger = LoggerFactory.getLogger(Runtime.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Runtime.class);
private AtomicReference<RuntimeState> runtimerState;
@@ -65,7 +62,7 @@
@PostConstruct
public void initAndStart() throws Exception {
- logger.info("Start init runtime.");
+ LOGGER.info("Start init runtime.");
circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig());
runnerConfigObserver.registerListener(circulatorContext);
runnerConfigObserver.registerListener(eventSubscriber);
@@ -80,11 +77,11 @@
RUNTIME_START_AND_SHUTDOWN.start();
java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- logger.info("try to shutdown server");
+ LOGGER.info("try to shutdown server");
try {
RUNTIME_START_AND_SHUTDOWN.shutdown();
} catch (Exception e) {
- logger.error("err when shutdown runtime ", e);
+ LOGGER.error("err when shutdown runtime ", e);
}
}));
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
index 48af27f..b14b551 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
@@ -30,12 +30,10 @@
/**
* listen the event and offer to queue
- *
- * @author artisan
*/
public class EventBusListener extends ServiceThread {
- private static final Logger logger = LoggerFactory.getLogger(EventBusListener.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventBusListener.class);
private final CirculatorContext circulatorContext;
private final EventSubscriber eventSubscriber;
@@ -60,7 +58,7 @@
}
circulatorContext.offerEventRecords(pullRecordList);
} catch (Exception exception) {
- logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
+ LOGGER.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
pullRecordList.forEach(pullRecord -> errorHandler.handle(pullRecord, exception));
}
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index f24ed1b..bc671db 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -41,7 +41,7 @@
*/
public class EventRuleTransfer extends ServiceThread {
- private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventRuleTransfer.class);
private volatile Integer batchSize = 100;
@@ -68,18 +68,18 @@
@Override
public void run() {
- List<ConnectRecord> afterTransformConnect = new CopyOnWriteArrayList<>();;
+ List<ConnectRecord> afterTransformConnect = new CopyOnWriteArrayList<>();
while (!stopped) {
try {
Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
if (MapUtils.isEmpty(eventRecordMap)) {
- logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
+ LOGGER.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
this.waitForRunning(1000);
continue;
}
Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
if (MapUtils.isEmpty(latestTransformMap)) {
- logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
+ LOGGER.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
this.waitForRunning(3000);
continue;
}
@@ -92,7 +92,7 @@
curEventRecords.forEach(pullRecord -> {
CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
.exceptionally((exception) -> {
- logger.error("transfer do transform event record failed,stackTrace-", exception);
+ LOGGER.error("transfer do transform event record failed, stackTrace-", exception);
errorHandler.handle(pullRecord, exception);
return null;
})
@@ -108,9 +108,9 @@
}
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get();
circulatorContext.offerTargetTaskQueue(afterTransformConnect);
- logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
+ LOGGER.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
} catch (Exception exception) {
- logger.error("transfer event record failed, stackTrace-", exception);
+ LOGGER.error("transfer event record failed, stackTrace-", exception);
afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception));
}
@@ -127,7 +127,7 @@
try {
circulatorContext.releaseTaskTransform();
} catch (Exception e) {
- logger.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
+ LOGGER.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
}
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index 85dae3b..b9d175b 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -36,12 +36,10 @@
/**
* event target push to sink task
- *
- * @author artisan
*/
public class EventTargetTrigger extends ServiceThread {
- private static final Logger logger = LoggerFactory.getLogger(EventTargetTrigger.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventTargetTrigger.class);
private final CirculatorContext circulatorContext;
private final OffsetManager offsetManager;
@@ -49,7 +47,7 @@
private volatile Integer batchSize = 100;
public EventTargetTrigger(CirculatorContext circulatorContext, OffsetManager offsetManager,
- ErrorHandler errorHandler) {
+ ErrorHandler errorHandler) {
this.circulatorContext = circulatorContext;
this.offsetManager = offsetManager;
this.errorHandler = errorHandler;
@@ -60,15 +58,15 @@
while (!stopped) {
Map<String, List<ConnectRecord>> targetRecordMap = circulatorContext.takeTargetRecords(batchSize);
if (MapUtils.isEmpty(targetRecordMap)) {
- logger.trace("current target pusher is empty");
+ LOGGER.trace("current target pusher is empty");
this.waitForRunning(1000);
continue;
}
- if (logger.isDebugEnabled()) {
- logger.debug("start push content by pusher - {}", JSON.toJSONString(targetRecordMap));
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("start push content by pusher - {}", JSON.toJSONString(targetRecordMap));
}
- for(String runnerName: targetRecordMap.keySet()){
+ for (String runnerName : targetRecordMap.keySet()) {
ExecutorService executorService = circulatorContext.getExecutorService(runnerName);
executorService.execute(() -> {
SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName);
@@ -77,7 +75,7 @@
sinkTask.put(triggerRecords);
offsetManager.commit(triggerRecords);
} catch (Exception exception) {
- logger.error(getServiceName() + " push target exception, stackTrace-", exception);
+ LOGGER.error(getServiceName() + " push target exception, stackTrace-", exception);
triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception));
}
});
@@ -101,7 +99,7 @@
circulatorContext.releaseExecutorService();
circulatorContext.releaseTriggerTask();
} catch (Exception e) {
- logger.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
+ LOGGER.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
}
}
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
index 2763be3..3ea21fa 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
@@ -21,6 +21,12 @@
import com.google.common.collect.Maps;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.trigger.TriggerTaskContext;
@@ -41,40 +47,40 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.*;
/**
- * event circulator context for listener, transfer and trigger
+ * Event circulatory context for listener, transfer and trigger
*/
@Component
public class CirculatorContext implements TargetRunnerListener {
- private final static Logger logger = LoggerFactory.getLogger(LoggerName.EventBus_Listener);
+ private final static Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_BUS_LISTENER);
@Autowired
private Plugin plugin;
- private static Integer QUEUE_CAPACITY = 50000;
+ private static final Integer QUEUE_CAPACITY = 50000;
- private BlockingQueue<ConnectRecord> eventQueue = new LinkedBlockingQueue<>(50000);
+ private final BlockingQueue<ConnectRecord> eventQueue = new LinkedBlockingQueue<>(50000);
- private BlockingQueue<ConnectRecord> targetQueue = new LinkedBlockingQueue<>(50000);
+ private final BlockingQueue<ConnectRecord> targetQueue = new LinkedBlockingQueue<>(50000);
- private Map<String/*RunnerName*/, TargetRunnerConfig> runnerConfigMap = new ConcurrentHashMap<>(30);
+ private final Map<String/*RunnerName*/, TargetRunnerConfig> runnerConfigMap = new ConcurrentHashMap<>(30);
- private Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> eventQueueMap = new ConcurrentHashMap<>(30);
+ private final Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> eventQueueMap = new ConcurrentHashMap<>(30);
- private Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> targetQueueMap = new ConcurrentHashMap<>(30);
+ private final Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> targetQueueMap = new ConcurrentHashMap<>(30);
- private Map<String/*RunnerName*/, TransformEngine<ConnectRecord>> taskTransformMap = new ConcurrentHashMap<>(20);
+ private final Map<String/*RunnerName*/, TransformEngine<ConnectRecord>> taskTransformMap = new ConcurrentHashMap<>(20);
- private Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new ConcurrentHashMap<>(20);
+ private final Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new ConcurrentHashMap<>(20);
- private Map<String/*RunnerName*/, ExecutorService> pusherExecutorMap = new ConcurrentHashMap<>(10);
+ private final Map<String/*RunnerName*/, ExecutorService> pusherExecutorMap = new ConcurrentHashMap<>(10);
/**
* initial targetRunnerMap, taskTransformMap, pusherTaskMap
- * @param targetRunnerConfigs
+ *
+ * @param targetRunnerConfigs Configurations for the target runner
*/
public void initCirculatorContext(Set<TargetRunnerConfig> targetRunnerConfigs) {
if (CollectionUtils.isEmpty(targetRunnerConfigs)) {
@@ -102,10 +108,11 @@
/**
* get target runner config by runner name
+ *
* @param runnerName
* @return
*/
- public TargetRunnerConfig getRunnerConfig(String runnerName){
+ public TargetRunnerConfig getRunnerConfig(String runnerName) {
return runnerConfigMap.get(runnerName);
}
@@ -122,21 +129,23 @@
/**
* update record queue map
+ *
* @param recordMap
* @param eventQueueMap
*/
- private boolean updateRecordQueueMap(Map<String, List<ConnectRecord>> recordMap, Map<String, BlockingQueue<ConnectRecord>> eventQueueMap) {
- try{
- for(String runnerName : recordMap.keySet()){
+ private boolean updateRecordQueueMap(Map<String, List<ConnectRecord>> recordMap,
+ Map<String, BlockingQueue<ConnectRecord>> eventQueueMap) {
+ try {
+ for (String runnerName : recordMap.keySet()) {
BlockingQueue<ConnectRecord> recordQueue = eventQueueMap.get(runnerName);
- if(CollectionUtils.isEmpty(recordQueue)){
+ if (CollectionUtils.isEmpty(recordQueue)) {
recordQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
}
recordQueue.addAll(recordMap.get(runnerName));
eventQueueMap.put(runnerName, recordQueue);
}
return true;
- }catch (Exception exception){
+ } catch (Exception exception) {
return false;
}
}
@@ -147,7 +156,7 @@
* @return
*/
public Map<String, List<ConnectRecord>> takeEventRecords(int batchSize) {
- if(eventQueue.isEmpty()){
+ if (eventQueue.isEmpty()) {
return null;
}
List<ConnectRecord> eventRecords = Lists.newArrayList();
@@ -171,11 +180,12 @@
/**
* take batch target records
+ *
* @param batchSize
* @return
*/
public Map<String, List<ConnectRecord>> takeTargetRecords(Integer batchSize) {
- if(targetQueue.isEmpty()){
+ if (targetQueue.isEmpty()) {
return null;
}
List<ConnectRecord> targetRecords = Lists.newArrayList();
@@ -185,6 +195,7 @@
/**
* user runner-name as key
+ *
* @param eventRecords
* @return
*/
@@ -193,7 +204,7 @@
for (ConnectRecord connectRecord : eventRecords) {
String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
- if(CollectionUtils.isEmpty(curEventRecords)){
+ if (CollectionUtils.isEmpty(curEventRecords)) {
curEventRecords = Lists.newArrayList();
}
curEventRecords.add(connectRecord);
@@ -204,15 +215,17 @@
/**
* get specific thread pool by push name
+ *
* @param runnerName
* @return
*/
- public ExecutorService getExecutorService(String runnerName){
+ public ExecutorService getExecutorService(String runnerName) {
return pusherExecutorMap.get(runnerName);
}
/**
* refresh target runner where config changed
+ *
* @param targetRunnerConfig
* @param refreshTypeEnum
*/
@@ -225,7 +238,7 @@
TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(targetRunnerConfig.getComponents(), plugin);
taskTransformMap.put(runnerName, transformChain);
- int endIndex = targetRunnerConfig.getComponents().size() -1;
+ int endIndex = targetRunnerConfig.getComponents().size() - 1;
TargetKeyValue targetKeyValue = new TargetKeyValue(targetRunnerConfig.getComponents().get(endIndex));
SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
pusherTaskMap.put(runnerName, sinkTask);
@@ -234,16 +247,16 @@
pusherExecutorMap.put(runnerName, initDefaultThreadPoolExecutor(runnerName));
}
- if(logger.isInfoEnabled()){
- logger.info("runnerName -{}- refresh context by refresh type -{}- succeed", runnerName, refreshTypeEnum.name());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("runnerName -{}- refresh context by refresh type -{}- succeed", runnerName, refreshTypeEnum.name());
}
break;
case DELETE:
runnerConfigMap.remove(runnerName);
taskTransformMap.remove(runnerName);
pusherTaskMap.remove(runnerName);
- if(logger.isInfoEnabled()){
- logger.info("runnerName -{}- remove context succeed", runnerName);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("runnerName -{}- remove context succeed", runnerName);
}
break;
default:
@@ -253,16 +266,18 @@
/**
* init default thread poll param, support auto config
+ *
* @param threadPollName
* @return
*/
private ExecutorService initDefaultThreadPoolExecutor(String threadPollName) {
return new ThreadPoolExecutor(200, 300, 1, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(300), ThreadUtils.newThreadFactory(threadPollName, false));
+ new LinkedBlockingQueue<>(300), ThreadUtils.newThreadFactory(threadPollName, false));
}
/**
* init target sink task
+ *
* @param targetKeyValue
* @return
*/
@@ -286,13 +301,12 @@
Plugin.compareAndSwapLoaders(loader);
}
return sinkTask;
- }catch (Exception exception) {
- logger.error("task class -" + taskClass + "- init its sinkTask failed, ex- ", exception);
+ } catch (Exception exception) {
+ LOGGER.error("task class -" + taskClass + "- init its sinkTask failed, ex- ", exception);
}
return null;
}
-
public void releaseTaskTransform() throws Exception {
for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : taskTransformMap.entrySet()) {
String runnerName = taskTransform.getKey();
@@ -303,7 +317,7 @@
}
public void releaseTriggerTask() {
- for (Map.Entry<String, SinkTask> triggerTask: pusherTaskMap.entrySet()) {
+ for (Map.Entry<String, SinkTask> triggerTask : pusherTaskMap.entrySet()) {
SinkTask sinkTask = triggerTask.getValue();
String runnerName = triggerTask.getKey();
sinkTask.stop();
@@ -312,7 +326,7 @@
}
public void releaseExecutorService() throws Exception {
- for (Map.Entry<String, ExecutorService> pusherExecutor: pusherExecutorMap.entrySet()) {
+ for (Map.Entry<String, ExecutorService> pusherExecutor : pusherExecutorMap.entrySet()) {
ExecutorService pusher = pusherExecutor.getValue();
ShutdownUtils.shutdownThreadPool(pusher);
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
index be4db9b..dc18ab7 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
@@ -29,6 +29,7 @@
/**
* Refresh subscriber inner data when runner keys changed
+ *
* @param subscribeRunnerKeys
* @param refreshTypeEnum
*/
@@ -42,7 +43,7 @@
public abstract List<ConnectRecord> pull();
/**
- * Commit the connect records.
+ * Commit connect records.
*
* @param connectRecordList
*/
@@ -54,12 +55,13 @@
public abstract void close();
/**
- * Put the connect record to the eventbus.
+ * Put connect record to the eventbus.
+ *
* @param eventBusName
* @param connectRecord
* @param delaySec
*/
- public boolean put(String eventBusName, ConnectRecord connectRecord, int delaySec){
+ public boolean put(String eventBusName, ConnectRecord connectRecord, int delaySec) {
// convert the eventBusName to Topic ?
return true;
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
index f9e879a..5737892 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
@@ -37,7 +37,7 @@
public class TransformEngine<R extends ConnectRecord> implements AutoCloseable {
- private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventRule_Transfer);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_RULE_TRANSFER);
private final List<Transform> transformList;
@@ -76,18 +76,19 @@
transform.init(transformConfig);
this.transformList.add(transform);
} catch (Exception e) {
- logger.error("transform new instance error", e);
+ LOGGER.error("transform new instance error", e);
}
}
}
/**
* format listener and pusher key
+ *
* @param components
* @return
*/
private TargetKeyValue formatTargetKey(List<Map<String, String>> components) {
- if(CollectionUtils.isEmpty(components)){
+ if (CollectionUtils.isEmpty(components)) {
return null;
}
int startIndex = 0;
@@ -101,6 +102,7 @@
/**
* transform event record for target record
+ *
* @param connectRecord
* @return
*/
@@ -120,10 +122,11 @@
/**
* get task config value by key
+ *
* @param configKey
* @return
*/
- public String getConnectConfig(String configKey){
+ public String getConnectConfig(String configKey) {
return config.getString(configKey);
}
@@ -149,8 +152,10 @@
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
TransformEngine<?> that = (TransformEngine<?>) o;
return transformList.equals(that.transformList) && config.equals(that.config);
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
index 6af6382..6edb06d 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
@@ -43,7 +43,7 @@
*/
private final TargetKeyValue taskConfig;
- private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventTarget_Trigger);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_TARGET_TRIGGER);
private final Map<MessageQueue, Long> messageQueuesOffsetMap = new ConcurrentHashMap<>(64);
@@ -61,20 +61,20 @@
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
if (null == recordPartition || null == recordPartition.getPartition() || null == recordOffset || null == recordOffset.getOffset()) {
- logger.warn("recordPartition {} info is null or recordOffset {} info is null", recordPartition, recordOffset);
+ LOGGER.warn("recordPartition {} info is null or recordOffset {} info is null", recordPartition, recordOffset);
return;
}
String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
String topic = (String) recordPartition.getPartition().get(TOPIC);
Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
- logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+ LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
return;
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
if (null == offset) {
- logger.warn("resetOffset, offset is null");
+ LOGGER.warn("resetOffset, offset is null");
return;
}
messageQueuesOffsetMap.put(messageQueue, offset);
@@ -83,12 +83,12 @@
@Override
public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
if (MapUtils.isEmpty(offsets)) {
- logger.warn("resetOffset, offsets {} is null", offsets);
+ LOGGER.warn("resetOffset, offsets {} is null", offsets);
return;
}
for (Map.Entry<RecordPartition, RecordOffset> entry : offsets.entrySet()) {
if (null == entry || null == entry.getKey() || null == entry.getKey().getPartition() || null == entry.getValue() || null == entry.getValue().getOffset()) {
- logger.warn("recordPartition {} info is null or recordOffset {} info is null, entry {}", entry);
+ LOGGER.warn("recordPartition {} info is null or recordOffset {} info is null, entry {}", entry);
continue;
}
RecordPartition recordPartition = entry.getKey();
@@ -96,14 +96,14 @@
String topic = (String) recordPartition.getPartition().get(TOPIC);
Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
- logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+ LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
continue;
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
RecordOffset recordOffset = entry.getValue();
Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
if (null == offset) {
- logger.warn("resetOffset, offset is null");
+ LOGGER.warn("resetOffset, offset is null");
continue;
}
messageQueuesOffsetMap.put(messageQueue, offset);
@@ -113,24 +113,24 @@
@Override
public void pause(List<RecordPartition> recordPartitions) {
if (recordPartitions == null || recordPartitions.size() == 0) {
- logger.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+ LOGGER.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
return;
}
for (RecordPartition recordPartition : recordPartitions) {
if (null == recordPartition || null == recordPartition.getPartition()) {
- logger.warn("recordPartition {} info is null", recordPartition);
+ LOGGER.warn("recordPartition {} info is null", recordPartition);
continue;
}
String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
String topic = (String) recordPartition.getPartition().get(TOPIC);
Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
- logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+ LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
continue;
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
- logger.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+ LOGGER.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
continue;
}
messageQueuesStateMap.put(messageQueue, QueueState.PAUSE);
@@ -140,24 +140,24 @@
@Override
public void resume(List<RecordPartition> recordPartitions) {
if (recordPartitions == null || recordPartitions.size() == 0) {
- logger.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+ LOGGER.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
return;
}
for (RecordPartition recordPartition : recordPartitions) {
if (null == recordPartition || null == recordPartition.getPartition()) {
- logger.warn("recordPartition {} info is null", recordPartition);
+ LOGGER.warn("recordPartition {} info is null", recordPartition);
continue;
}
String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
String topic = (String) recordPartition.getPartition().get(TOPIC);
Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
- logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+ LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
continue;
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
- logger.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+ LOGGER.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
continue;
}
messageQueuesStateMap.remove(messageQueue);
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
index 39734c4..2174060 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
@@ -21,8 +21,8 @@
* Define all the logger name of the runtime.
*/
public class LoggerName {
- public static final String EventBridge_RUNTIMER = "EventBridgeRuntimer";
- public static final String EventBus_Listener = "EventBusListener";
- public static final String EventRule_Transfer = "EventRuleTransfer";
- public static final String EventTarget_Trigger = "EventTargetTrigger";
+ public static final String EVENT_BRIDGE_RUNTIMER = "EventBridgeRuntimer";
+ public static final String EVENT_BUS_LISTENER = "EventBusListener";
+ public static final String EVENT_RULE_TRANSFER = "EventRuleTransfer";
+ public static final String EVENT_TARGET_TRIGGER = "EventTargetTrigger";
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
index 400cf47..027925e 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
@@ -27,7 +27,7 @@
public abstract class ServiceThread extends AbstractStartAndShutdown implements Runnable {
- private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
private static final long JOIN_TIME = 90 * 1000;
@@ -45,14 +45,14 @@
public abstract String getServiceName();
public void start() {
- logger.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread);
+ LOGGER.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread);
if (!hasNotified.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread.setDaemon(isDaemon);
this.thread.start();
- logger.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread);
+ LOGGER.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread);
}
public void shutdown() {
@@ -61,7 +61,7 @@
public void shutdown(final boolean interrupt) {
this.stopped = true;
- logger.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
+ LOGGER.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
@@ -77,10 +77,10 @@
this.thread.join(this.getJointime());
}
long eclipseTime = System.currentTimeMillis() - beginTime;
- logger.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+ LOGGER.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+ this.getJointime());
} catch (InterruptedException e) {
- logger.error("Interrupted", e);
+ LOGGER.error("Interrupted", e);
}
}
@@ -94,7 +94,7 @@
public void stop(final boolean interrupt) {
this.stopped = true;
- logger.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
+ LOGGER.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
@@ -107,7 +107,7 @@
public void makeStop() {
this.stopped = true;
- logger.info("makestop thread " + this.getServiceName());
+ LOGGER.info("makestop thread " + this.getServiceName());
}
public void wakeup() {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
index aa696e8..58d8e2b 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
@@ -139,15 +139,17 @@
this.properties = properties;
}
- public KeyValue putAll(Map<String,String> configProps){
+ public KeyValue putAll(Map<String, String> configProps) {
this.properties.putAll(configProps);
return this;
}
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
TargetKeyValue that = (TargetKeyValue) o;
return Objects.equals(targetKeyId, that.targetKeyId) && Objects.equals(properties, that.properties);
}
@@ -160,9 +162,9 @@
@Override
public String toString() {
return "TargetKeyValue{" +
- "targetKeyId='" + targetKeyId + '\'' +
- ", properties=" + properties +
- '}';
+ "targetKeyId='" + targetKeyId + '\'' +
+ ", properties=" + properties +
+ '}';
}
public String getTargetKeyId() {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
index 6cbc2f7..d3ab201 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
@@ -71,19 +71,11 @@
private boolean isEqualsComponents(List<Map<String, String>> source, List<Map<String, String>> target) {
if (source == null || target == null) {
- if (source != target) {
- return false;
- } else {
- return true;
- }
+ return source == target;
}
if (source.isEmpty() || target.isEmpty()) {
- if (source.isEmpty() && target.isEmpty()) {
- return true;
- } else {
- return false;
- }
+ return source.isEmpty() && target.isEmpty();
}
if (source.size() != target.size()) {
@@ -99,10 +91,8 @@
String element = targetComponent.get(entry.getKey());
if (element == null && entry.getValue() == null) {
return true;
- } else if (element.equals(entry.getValue())) {
- return true;
} else {
- return false;
+ return element.equals(entry.getValue());
}
}
}
@@ -117,7 +107,7 @@
return components.get(0).get(ACCOUNT_ID);
}
- public SubscribeRunnerKeys getSubscribeRunnerKeys(){
+ public SubscribeRunnerKeys getSubscribeRunnerKeys() {
SubscribeRunnerKeys subscribeRunnerKeys = new SubscribeRunnerKeys();
subscribeRunnerKeys.setRunnerName(this.getName());
subscribeRunnerKeys.setAccountId(this.getAccountId());
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
index 26475f2..cf14e66 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
@@ -19,6 +19,7 @@
import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.component.connector.Connector;
import io.openmessaging.connector.api.component.task.Task;
+import java.util.ArrayList;
import org.apache.commons.lang3.StringUtils;
import org.reflections.Configuration;
import org.reflections.Reflections;
@@ -39,7 +40,12 @@
import java.nio.file.Paths;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.*;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collection;
+import java.util.Set;
+import java.util.Arrays;
@Component
public class Plugin extends URLClassLoader {
@@ -48,7 +54,7 @@
@Value("${runtime.pluginpath:}")
private String pluginPath;
- private Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
+ private final Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
public Plugin() {
super(new URL[0], Plugin.class.getClassLoader());
@@ -62,7 +68,7 @@
}
}
- private List<String> initPluginPath(String plugin){
+ private List<String> initPluginPath(String plugin) {
List<String> pluginPaths = new ArrayList<>();
if (StringUtils.isNotEmpty(plugin)) {
String[] strArr = plugin.split(",");
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
index 2c11e1a..0cae63d 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
@@ -16,6 +16,16 @@
*/
package org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.TreeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,7 +33,6 @@
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.*;
import java.util.regex.Pattern;
public class PluginUtils {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
index e16d4ec..04aa07b 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
@@ -37,7 +37,7 @@
*/
public class FileBaseKeyValueStore<K, V> extends MemoryBasedKeyValueStore<K, V> {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
private String configFilePath;
private Converter keyConverter;
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
index ea5fd11..cf0cc79 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
@@ -29,25 +29,24 @@
*/
public class RuntimeConfigProps {
- private final static Logger logger = LoggerFactory.getLogger(RuntimeConfigProps.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(RuntimeConfigProps.class);
private Properties properties;
- private RuntimeConfigProps(){
+ private RuntimeConfigProps() {
try {
- properties = PropertiesLoaderUtils.loadAllProperties("runtime.properties");
+ properties = PropertiesLoaderUtils.loadAllProperties("runtime.properties");
} catch (IOException exception) {
- logger.error("runtime load properties failed, stackTrace-", exception);
+ LOGGER.error("runtime load properties failed, stackTrace-", exception);
}
}
- private static class RuntimerConfigPropsHolder{
- private static final RuntimeConfigProps instance = new RuntimeConfigProps();
+ private static class RuntimerConfigPropsHolder {
+ private static final RuntimeConfigProps INSTANCE = new RuntimeConfigProps();
}
- public static RuntimeConfigProps build(){
- return RuntimerConfigPropsHolder.instance;
+ public static RuntimeConfigProps build() {
+ return RuntimerConfigPropsHolder.INSTANCE;
}
-
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
index 56fa123..00b3c4a 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
@@ -26,7 +26,6 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.DependsOn;
@Configuration
public class RuntimeConfiguration {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
index dfad654..34800ba 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
@@ -30,7 +30,7 @@
*/
public class JsonConverter implements Converter {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
private Class clazz;
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
index b704c7b..2c31046 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
@@ -32,7 +32,7 @@
*/
public class ListConverter implements Converter<List> {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
private Class clazz;
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
index 4a6d9ef..5b3af12 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
@@ -31,7 +31,7 @@
*/
public class RecordOffsetConverter implements Converter<RecordOffset> {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
@Override
public byte[] objectToByte(RecordOffset recordOffset) {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
index b7cee55..a988981 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
@@ -91,7 +91,7 @@
return -1;
}
int pow = (int) Math.pow(2, 3 + retryTimes);
- return (pow > 512 ? 512 : pow);
+ return pow > 512 ? 512 : pow;
default:
return -1;
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
index 0615e35..589238f 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
@@ -46,7 +46,7 @@
@Override
public Set<SubscribeRunnerKeys> getSubscribeRunnerKeys() {
- if(CollectionUtils.isEmpty(targetRunnerConfigs)){
+ if (CollectionUtils.isEmpty(targetRunnerConfigs)) {
return null;
}
return targetRunnerConfigs.stream().map(item -> {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
index cbd01e0..85547f2 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
@@ -38,7 +38,6 @@
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
-import org.springframework.stereotype.Component;
@Slf4j
//@Component
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
index e6a9d1d..30bb31e 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
@@ -25,7 +25,7 @@
public class ExceptionUtil {
- private static final Logger logger = LoggerFactory.getLogger(ExceptionUtil.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionUtil.class);
public static String getErrorMessage(Throwable e) {
if (null == e) {
@@ -40,7 +40,7 @@
StringBuffer buffer = stringWriter.getBuffer();
return buffer.toString();
} catch (Throwable ex) {
- logger.error("", ex);
+ LOGGER.error("", ex);
}
return null;
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
index cf8679d..2405a8f 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
@@ -20,14 +20,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class ShutdownUtils {
- private static final Logger logger = LoggerFactory.getLogger(ShutdownUtils.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownUtils.class);
public static void shutdownThreadPool(ExecutorService executor) {
if (executor != null) {
@@ -35,7 +33,7 @@
try {
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (Exception e) {
- logger.error("Shutdown threadPool failed", e);
+ LOGGER.error("Shutdown threadPool failed", e);
}
if (!executor.isTerminated()) {
executor.shutdownNow();
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index 4d32dad..58dda61 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -27,6 +27,13 @@
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -57,7 +64,6 @@
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -72,7 +78,7 @@
@DependsOn("flyway")
public class RocketMQEventSubscriber extends EventSubscriber {
- private static final Logger logger = LoggerFactory.getLogger(RocketMQEventSubscriber.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQEventSubscriber.class);
@Autowired
private EventDataRepository eventDataRepository;
@@ -98,7 +104,7 @@
public static final String MSG_ID = "msgId";
@PostConstruct
- public void initRocketMQEventSubscriber(){
+ public void initRocketMQEventSubscriber() {
this.initMqProperties();
this.initConsumeWorkers();
}
@@ -123,18 +129,18 @@
ArrayList<MessageExt> messages = new ArrayList<>();
messageBuffer.drainTo(messages, pullBatchSize);
if (CollectionUtils.isEmpty(messages)) {
- logger.trace("consumer poll message empty.");
+ LOGGER.trace("consumer poll message empty.");
return null;
}
List<ConnectRecord> connectRecords = new CopyOnWriteArrayList<>();
List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
- messages.forEach(item->{
- CompletableFuture<Void> recordCompletableFuture = CompletableFuture.supplyAsync(()-> convertToSinkRecord(item))
- .exceptionally((exception) -> {
- logger.error("execute completable job failed,stackTrace-", exception);
- return null;
- })
- .thenAccept(connectRecords::add);
+ messages.forEach(item -> {
+ CompletableFuture<Void> recordCompletableFuture = CompletableFuture.supplyAsync(() -> convertToSinkRecord(item))
+ .exceptionally((exception) -> {
+ LOGGER.error("execute completable job failed", exception);
+ return null;
+ })
+ .thenAccept(connectRecords::add);
completableFutures.add(recordCompletableFuture);
});
@@ -145,24 +151,25 @@
/**
* group by runner name batch commit
+ *
* @param connectRecordList
*/
@Override
public void commit(List<ConnectRecord> connectRecordList) {
- if(CollectionUtils.isEmpty(connectRecordList)){
- logger.warn("commit event record data empty!");
+ if (CollectionUtils.isEmpty(connectRecordList)) {
+ LOGGER.warn("commit event record data empty!");
return;
}
String runnerName = connectRecordList.iterator().next().getExtension(RuntimeConfigDefine.RUNNER_NAME);
List<String> msgIds = connectRecordList.stream().map(item -> item.getPosition()
- .getPartition().getPartition().get(MSG_ID).toString()).collect(Collectors.toList());
+ .getPartition().getPartition().get(MSG_ID).toString()).collect(Collectors.toList());
consumeWorkerMap.get(runnerName).commit(msgIds);
}
@Override
public void close() {
for (Map.Entry<String, ConsumeWorker> item : consumeWorkerMap.entrySet()) {
- ConsumeWorker consumeWorker = item.getValue();
+ ConsumeWorker consumeWorker = item.getValue();
consumeWorker.shutdown();
}
}
@@ -187,7 +194,7 @@
clientConfig.setNameSrvAddr(namesrvAddr);
clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ?
- AccessChannel.CLOUD : AccessChannel.LOCAL);
+ AccessChannel.CLOUD : AccessChannel.LOCAL);
clientConfig.setNamespace(namespace);
this.clientConfig = clientConfig;
@@ -196,7 +203,7 @@
}
if (StringUtils.isNotBlank(socks5UserName) && StringUtils.isNotBlank(socks5Password)
- && StringUtils.isNotBlank(socks5Endpoint)) {
+ && StringUtils.isNotBlank(socks5Endpoint)) {
SocksProxyConfig proxyConfig = new SocksProxyConfig();
proxyConfig.setUsername(socks5UserName);
proxyConfig.setPassword(socks5Password);
@@ -206,8 +213,8 @@
this.socksProxy = new Gson().toJson(proxyConfigMap);
}
- }catch (Exception exception){
- logger.error("init rocket mq property exception, stack trace-", exception);
+ } catch (Exception exception) {
+ LOGGER.error("init rocket mq property exception, stack trace-", exception);
}
}
@@ -215,8 +222,8 @@
* init rocket mq pull consumer
*/
private void initConsumeWorkers() {
- Set<SubscribeRunnerKeys> subscribeRunnerKeysSet = runnerConfigObserver.getSubscribeRunnerKeys();
- if(subscribeRunnerKeysSet == null || subscribeRunnerKeysSet.isEmpty()){
+ Set<SubscribeRunnerKeys> subscribeRunnerKeysSet = runnerConfigObserver.getSubscribeRunnerKeys();
+ if (subscribeRunnerKeysSet == null || subscribeRunnerKeysSet.isEmpty()) {
return;
}
for (SubscribeRunnerKeys subscribeRunnerKeys : subscribeRunnerKeysSet) {
@@ -229,6 +236,7 @@
/**
* first init default rocketmq pull consumer
+ *
* @return
*/
public LitePullConsumer initLitePullConsumer(SubscribeRunnerKeys subscribeRunnerKeys) {
@@ -245,7 +253,7 @@
pullConsumer.attachTopic(topic, "*");
pullConsumer.startup();
} catch (Exception exception) {
- logger.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception);
+ LOGGER.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception);
throw new EventBridgeException(" init rocketmq consumer failed");
}
return pullConsumer;
@@ -265,6 +273,7 @@
/**
* MessageExt convert to connect record
+ *
* @param messageExt
* @return
*/
@@ -311,7 +320,7 @@
private void putConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) {
ConsumeWorker consumeWorker = consumeWorkerMap.get(subscribeRunnerKeys.getRunnerName());
- if (!Objects.isNull(consumeWorker)){
+ if (!Objects.isNull(consumeWorker)) {
consumeWorker.shutdown();
}
LitePullConsumer litePullConsumer = initLitePullConsumer(subscribeRunnerKeys);
@@ -322,7 +331,7 @@
private void removeConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) {
ConsumeWorker consumeWorker = consumeWorkerMap.remove(subscribeRunnerKeys.getRunnerName());
- if (!Objects.isNull(consumeWorker)){
+ if (!Objects.isNull(consumeWorker)) {
consumeWorker.shutdown();
}
}
@@ -352,12 +361,12 @@
messageBuffer.put(message);
}
} catch (Exception exception) {
- logger.error(getServiceName() + " - RocketMQEventSubscriber pull record exception, stackTrace - ", exception);
+ LOGGER.error(getServiceName() + " - RocketMQEventSubscriber pull record exception, stackTrace - ", exception);
}
}
}
- public void commit(List<String> messageIds){
+ public void commit(List<String> messageIds) {
this.pullConsumer.commit(messageIds);
}
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
index b37b851..f42cf9b 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
@@ -20,10 +20,6 @@
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:08 上午
- */
public class ClientConfig {
private int rmqPullMessageCacheCapacity = 1000;
private int rmqPullMessageBatchNums = 20;
@@ -59,7 +55,7 @@
}
public void setConsumeFromWhere(
- final ConsumeFromWhere consumeFromWhere) {
+ final ConsumeFromWhere consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
}
@@ -119,7 +115,6 @@
this.accessChannel = accessChannel;
}
-
public static ClientConfig cloneConfig(ClientConfig clientConfig) {
ClientConfig newConfig = new ClientConfig();
newConfig.setRmqPullMessageBatchNums(clientConfig.getRmqPullMessageBatchNums());
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
index 9923eab..1755490 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
@@ -21,10 +21,6 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:07 上午
- */
public class ConsumeRequest {
private final MessageExt messageExt;
private final MessageQueue messageQueue;
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
index 667f17a..834b7c1 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
@@ -23,10 +23,6 @@
import java.time.Duration;
import java.util.List;
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:09 上午
- */
public interface LitePullConsumer {
void startup() throws MQClientException;
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
index a76012e..888d36d 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
@@ -48,12 +48,8 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:10 上午
- */
public class LitePullConsumerImpl implements LitePullConsumer {
- private static final Logger log = LoggerFactory.getLogger(LitePullConsumerImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LitePullConsumerImpl.class);
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
@@ -83,7 +79,7 @@
@Override
public void startup() throws MQClientException {
rocketmqPullConsumer.start();
- log.info("RocketmqPullConsumer start.");
+ LOGGER.info("RocketmqPullConsumer start.");
}
@Override
@@ -98,7 +94,7 @@
try {
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (Exception e) {
- log.error("Shutdown threadPool failed", e);
+ LOGGER.error("Shutdown threadPool failed", e);
}
if (!executor.isTerminated()) {
executor.shutdownNow();
@@ -114,7 +110,7 @@
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
submitPullTask(topic, tag, mqDivided);
localMessageCache.shrinkPullOffsetTable(mqDivided);
- log.info("Load balance result of topic {} changed, mqAll {}, mqDivided {}.", topic, mqAll, mqDivided);
+ LOGGER.info("Load balance result of topic {} changed, mqAll {}, mqDivided {}.", topic, mqAll, mqDivided);
}
});
}
@@ -156,7 +152,7 @@
}
}
if (CollectionUtils.isEmpty(assignedQueues)) {
- log.warn("Not found any messageQueue, topic:{}", topic);
+ LOGGER.warn("Not found any messageQueue, topic:{}", topic);
return;
}
@@ -167,10 +163,10 @@
try {
PullTask pullTask = new PullTask(messageQueue, tag);
pullImmediately(pullTask);
- log.info("Submit pullTask:{}", messageQueue);
+ LOGGER.info("Submit pullTask:{}", messageQueue);
} catch (Exception e) {
- log.error("Failed submit pullTask:{}, {}, wait next balancing", topic, messageQueue, e);
- // 添加pull失败,等待下次 rebalance
+ LOGGER.error("Failed submit pullTask:{}, {}, wait next balancing", topic, messageQueue, e);
+ // Failed to add pull task, waiting for the next round of re-balance
processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
.getProcessQueueTable().remove(messageQueue);
if (processQueue != null) {
@@ -215,13 +211,13 @@
public void run() {
try {
if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) {
- log.warn("RocketmqPullConsumer not running, pullTask exit.");
+ LOGGER.warn("RocketmqPullConsumer not running, pullTask exit.");
return;
}
ProcessQueue processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
.getProcessQueueTable().get(messageQueue);
if (processQueue == null || processQueue.isDropped()) {
- log.info("ProcessQueue {} dropped, pullTask exit", messageQueue);
+ LOGGER.info("ProcessQueue {} dropped, pullTask exit", messageQueue);
return;
}
long offset = localMessageCache.nextPullOffset(messageQueue);
@@ -231,7 +227,7 @@
public void onSuccess(PullResult pullResult) {
try {
if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) {
- log.warn("rocketmqPullConsumer not running, pullTask exit.");
+ LOGGER.warn("rocketmqPullConsumer not running, pullTask exit.");
return;
}
@@ -248,11 +244,11 @@
pullImmediately(PullTask.this);
} else {
localMessageCache.removePullOffset(messageQueue);
- log.info("ProcessQueue {} dropped, discard the pulled message.", messageQueue);
+ LOGGER.info("ProcessQueue {} dropped, discard the pulled message.", messageQueue);
}
break;
case OFFSET_ILLEGAL:
- log.warn("The pull request offset is illegal, offset is {}, message queue is {}, " +
+ LOGGER.warn("The pull request offset is illegal, offset is {}, message queue is {}, " +
"pull result is {}, delay {} ms for next pull",
offset, messageQueue, pullResult, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
@@ -260,16 +256,16 @@
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
- log.info("No NEW_MSG or MATCHED_MSG for mq:{}, pull again.", messageQueue);
+ LOGGER.info("No NEW_MSG or MATCHED_MSG for mq:{}, pull again.", messageQueue);
localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
pullImmediately(PullTask.this);
break;
default:
- log.warn("Failed to process pullResult, mq:{} {}", messageQueue, pullResult);
+ LOGGER.warn("Failed to process pullResult, mq:{} {}", messageQueue, pullResult);
break;
}
} catch (Throwable t) {
- log.error("Exception occurs when process pullResult", t);
+ LOGGER.error("Exception occurs when process pullResult", t);
pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
}
}
@@ -282,13 +278,13 @@
} else {
delayTimeMillis = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
}
- log.error("Exception happens when pull message process, delay {} ms for message queue {}",
+ LOGGER.error("Exception happens when pull message process, delay {} ms for message queue {}",
delayTimeMillis, messageQueue, e);
pullLater(PullTask.this, delayTimeMillis, TimeUnit.MILLISECONDS);
}
});
} catch (Throwable t) {
- log.error("Error occurs when pull message process, delay {} ms for message queue {}",
+ LOGGER.error("Error occurs when pull message process, delay {} ms for message queue {}",
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, messageQueue, t);
pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
}
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
index f3e3617..131e6fc 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
@@ -37,10 +37,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:06 上午
- */
public class LocalMessageCache {
private static final Logger log = LoggerFactory.getLogger(LocalMessageCache.class);
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
index 61000fd..aae17c2 100644
--- a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
@@ -24,8 +24,9 @@
* 3 times: every 10s~20s
*/
BACKOFF_RETRY(1, 3),
+
/**
- * 176 times: 1,2,4,8,16,32,64,128,256,512,512...512秒 ... 512s(176)
+ * 176 times: 1, 2, 4, 8, 16, 36, 64, 128, 256, 512, 512...512s(176)
*/
EXPONENTIAL_DECAY_RETRY(2, 176);
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
index 739ecbe..e9e578d 100644
--- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
@@ -19,9 +19,8 @@
/**
- * EventMeshTraceService
- * SPI可扩展
- * 基于OpenTelemetry实现封装不同追踪器
+ * Offers extension capability via SPI, allowing different tracing/metrics observation implementations: OpenTelemetry,
+ * Jaeger, Zipkin, etc.
*/
public interface TraceStrategy {
diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
index e6f81c2..0ed31ba 100644
--- a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
+++ b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
@@ -43,27 +43,27 @@
private List<AuthValidation> validations = new CopyOnWriteArrayList<>();
- @Value(value="${auth.validation:default}")
+ @Value(value = "${auth.validation:default}")
private String validationName;
@PostConstruct
public void init() {
List<String> validationNames = Arrays.stream(validationName.split(",")).collect(Collectors.toList());
- boolean match = Arrays.stream(validationName.split(",")).allMatch(validationName-> validationName.equals("default"));
+ boolean match = Arrays.stream(validationName.split(",")).allMatch(validationName -> validationName.equals("default"));
if (!match) {
validationNames.add(0, "default");
}
- validationNames.forEach(action->validations.add(ValidationServiceFactory.getInstance(action)));
+ validationNames.forEach(action -> validations.add(ValidationServiceFactory.getInstance(action)));
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
return chain.filter(exchange)
- .subscriberContext(ctx -> {
- AtomicReference<Context> result = new AtomicReference<Context>();
- validations.forEach(validation-> result.set(validation.validate(request, ctx)));
- return result.get();
- });
+ .subscriberContext(ctx -> {
+ AtomicReference<Context> result = new AtomicReference<Context>();
+ validations.forEach(validation -> result.set(validation.validate(request, ctx)));
+ return result.get();
+ });
}
}