fix: code style issues to make compile pass with checkstyle plugin enabled (#160)

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
index ed6a91c..33b59fa 100644
--- a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
+++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
@@ -48,19 +48,19 @@
             return;
         }
 
-        // tps: 每秒文件打印的行数
+        // tps: rows to print for each second
         final long tps = currentRowCount - previousRowCount.get();
         previousRowCount.set(currentRowCount);
         writeCount.add(currentRowCount);
         costTime.add(1000);
-        // delayTime(条/ms)=接收的数量/花费的时间
+        // delayTime(record/ms)= receiving-amount / time
         final double delayTime = writeCount.longValue() / costTime.longValue();
         // String delayTimeStr = twoDecimal(delayTime);
 
         String info = String.format("Current Time: %s  |  TPS: %d  ",
                 UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), tps);
 
-        System.out.println(info);
+        System.out.printf("%s%n", info);
     }
 
 
diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
index e9acb87..8f91eb8 100644
--- a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
+++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
@@ -16,18 +16,20 @@
  */
 package org.apache.rocketmq.eventbridge.adapter.benchmark;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.LineNumberReader;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.*;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * 整条链路
+ * End-to-End use case
  */
 public class EventTPSCommon extends AbstractEventCommon {
     public static void main(String[] args) {
@@ -35,12 +37,10 @@
         if (args.length > 0) {
             filePath = args[0];
         }
-        EventTPSCommon tpsCommon = null;
+        EventTPSCommon tpsCommon;
         try {
             tpsCommon = new EventTPSCommon(filePath);
             tpsCommon.start();
-        } catch (FileNotFoundException e) {
-            e.printStackTrace();
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -56,7 +56,7 @@
         previousRowCount = new AtomicReference<>();
         previousRowCount.set(0);
         executorService = new ScheduledThreadPoolExecutor(1,
-                new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
+            new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
     }
 
     @Override
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
index 6718b8d..8418830 100644
--- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
@@ -75,7 +75,7 @@
     private Long validationTimeoutMs;
 
     @Bean("dataSource")
-    public DataSource getMasterDataSource(){
+    public DataSource getMasterDataSource() {
         HikariConfig hikariConfig = new HikariConfig();
         hikariConfig.setJdbcUrl(baseUrl);
         hikariConfig.setDriverClassName(baseDriverClassName);
@@ -102,7 +102,8 @@
     }
 
     @Bean("sqlSessionTemplate")
-    public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory){
+    public SqlSessionTemplate masterSqlSessionTemplate(
+        @Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
         return new SqlSessionTemplate(sqlSessionFactory);
     }
 
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
index 97bfb88..adab7ec 100644
--- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
@@ -22,7 +22,6 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.converter.EventTargetRunnerConverter;
 import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.dataobject.EventTargetRunnerDO;
 import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.mapper.EventTargetRunnerMapper;
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
index 77c6bc9..a0ebc6e 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
@@ -52,19 +52,21 @@
         targetRunnerConfig.setName(name);
         List<Map<String, String>> components = Lists.newArrayList();
         targetRunnerConfig.setComponents(components);
-        Map<String, String> sourceComponent = new Gson().fromJson(new Gson().toJson(source
-            .getConfig()), new TypeToken<Map<String, String>>() {
-        }.getType());
-        Map<String, String> filterComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern)
-            .getConfig()), new TypeToken<Map<String, String>>() {
-        }.getType());
 
-        Map<String, String> transformComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform)
-            .getConfig()), new TypeToken<Map<String, String>>() {
-        }.getType());
-        Map<String, String> targetComponent = new Gson().fromJson(new Gson().toJson(target
-            .getConfig()), new TypeToken<Map<String, String>>() {
-        }.getType());
+        Map<String, String> sourceComponent = new Gson().fromJson(
+            new Gson().toJson(source.getConfig()),
+            new TypeToken<Map<String, String>>() {}.getType());
+
+        Map<String, String> filterComponent = new Gson().fromJson(
+            new Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern).getConfig()),
+            new TypeToken<Map<String, String>>() {}.getType());
+
+        Map<String, String> transformComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform).getConfig()),
+            new TypeToken<Map<String, String>>() {}.getType());
+
+        Map<String, String> targetComponent = new Gson().fromJson(new Gson().toJson(target.getConfig()),
+            new TypeToken<Map<String, String>>() {}.getType());
+
         components.add(sourceComponent);
         components.add(filterComponent);
         components.add(transformComponent);
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
index af8bbbd..ba3ed1f 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
@@ -34,19 +34,16 @@
 import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.StartAndShutdown;
 import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown;
 
-
 import javax.annotation.PostConstruct;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * event bridge runtime
- *
- * @author artisan
  */
 @Component
 public class Runtime {
 
-    private static final Logger logger = LoggerFactory.getLogger(Runtime.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(Runtime.class);
 
     private AtomicReference<RuntimeState> runtimerState;
 
@@ -65,7 +62,7 @@
 
     @PostConstruct
     public void initAndStart() throws Exception {
-        logger.info("Start init runtime.");
+        LOGGER.info("Start init runtime.");
         circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig());
         runnerConfigObserver.registerListener(circulatorContext);
         runnerConfigObserver.registerListener(eventSubscriber);
@@ -80,11 +77,11 @@
         RUNTIME_START_AND_SHUTDOWN.start();
 
         java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            logger.info("try to shutdown server");
+            LOGGER.info("try to shutdown server");
             try {
                 RUNTIME_START_AND_SHUTDOWN.shutdown();
             } catch (Exception e) {
-                logger.error("err when shutdown runtime ", e);
+                LOGGER.error("err when shutdown runtime ", e);
             }
         }));
 
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
index 48af27f..b14b551 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
@@ -30,12 +30,10 @@
 
 /**
  * listen the event and offer to queue
- *
- * @author artisan
  */
 public class EventBusListener extends ServiceThread {
 
-    private static final Logger logger = LoggerFactory.getLogger(EventBusListener.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventBusListener.class);
 
     private final CirculatorContext circulatorContext;
     private final EventSubscriber eventSubscriber;
@@ -60,7 +58,7 @@
                 }
                 circulatorContext.offerEventRecords(pullRecordList);
             } catch (Exception exception) {
-                logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
+                LOGGER.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
                 pullRecordList.forEach(pullRecord -> errorHandler.handle(pullRecord, exception));
             }
         }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index f24ed1b..bc671db 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -41,7 +41,7 @@
  */
 public class EventRuleTransfer extends ServiceThread {
 
-    private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventRuleTransfer.class);
 
     private volatile Integer batchSize = 100;
 
@@ -68,18 +68,18 @@
 
     @Override
     public void run() {
-        List<ConnectRecord> afterTransformConnect = new CopyOnWriteArrayList<>();;
+        List<ConnectRecord> afterTransformConnect = new CopyOnWriteArrayList<>();
         while (!stopped) {
             try {
                 Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
                 if (MapUtils.isEmpty(eventRecordMap)) {
-                    logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
+                    LOGGER.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
                     this.waitForRunning(1000);
                     continue;
                 }
                 Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
                 if (MapUtils.isEmpty(latestTransformMap)) {
-                    logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
+                    LOGGER.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
                     this.waitForRunning(3000);
                     continue;
                 }
@@ -92,7 +92,7 @@
                     curEventRecords.forEach(pullRecord -> {
                         CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
                             .exceptionally((exception) -> {
-                                logger.error("transfer do transform event record failed,stackTrace-", exception);
+                                LOGGER.error("transfer do transform event record failed, stackTrace-", exception);
                                 errorHandler.handle(pullRecord, exception);
                                 return null;
                             })
@@ -108,9 +108,9 @@
                 }
                 CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get();
                 circulatorContext.offerTargetTaskQueue(afterTransformConnect);
-                logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
+                LOGGER.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
             } catch (Exception exception) {
-                logger.error("transfer event record failed, stackTrace-", exception);
+                LOGGER.error("transfer event record failed, stackTrace-", exception);
                 afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception));
             }
 
@@ -127,7 +127,7 @@
         try {
             circulatorContext.releaseTaskTransform();
         } catch (Exception e) {
-            logger.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
+            LOGGER.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
         }
     }
 
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index 85dae3b..b9d175b 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -36,12 +36,10 @@
 
 /**
  * event target push to sink task
- *
- * @author artisan
  */
 public class EventTargetTrigger extends ServiceThread {
 
-    private static final Logger logger = LoggerFactory.getLogger(EventTargetTrigger.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventTargetTrigger.class);
 
     private final CirculatorContext circulatorContext;
     private final OffsetManager offsetManager;
@@ -49,7 +47,7 @@
     private volatile Integer batchSize = 100;
 
     public EventTargetTrigger(CirculatorContext circulatorContext, OffsetManager offsetManager,
-                              ErrorHandler errorHandler) {
+        ErrorHandler errorHandler) {
         this.circulatorContext = circulatorContext;
         this.offsetManager = offsetManager;
         this.errorHandler = errorHandler;
@@ -60,15 +58,15 @@
         while (!stopped) {
             Map<String, List<ConnectRecord>> targetRecordMap = circulatorContext.takeTargetRecords(batchSize);
             if (MapUtils.isEmpty(targetRecordMap)) {
-                logger.trace("current target pusher is empty");
+                LOGGER.trace("current target pusher is empty");
                 this.waitForRunning(1000);
                 continue;
             }
-            if (logger.isDebugEnabled()) {
-                logger.debug("start push content by pusher - {}", JSON.toJSONString(targetRecordMap));
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("start push content by pusher - {}", JSON.toJSONString(targetRecordMap));
             }
 
-            for(String runnerName: targetRecordMap.keySet()){
+            for (String runnerName : targetRecordMap.keySet()) {
                 ExecutorService executorService = circulatorContext.getExecutorService(runnerName);
                 executorService.execute(() -> {
                     SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName);
@@ -77,7 +75,7 @@
                         sinkTask.put(triggerRecords);
                         offsetManager.commit(triggerRecords);
                     } catch (Exception exception) {
-                        logger.error(getServiceName() + " push target exception, stackTrace-", exception);
+                        LOGGER.error(getServiceName() + " push target exception, stackTrace-", exception);
                         triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception));
                     }
                 });
@@ -101,7 +99,7 @@
             circulatorContext.releaseExecutorService();
             circulatorContext.releaseTriggerTask();
         } catch (Exception e) {
-            logger.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
+            LOGGER.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
         }
     }
 }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
index 2763be3..3ea21fa 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
@@ -21,6 +21,12 @@
 import com.google.common.collect.Maps;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtime.boot.trigger.TriggerTaskContext;
@@ -41,40 +47,40 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.*;
 
 /**
- * event circulator context for listener, transfer and trigger
+ * Event circulatory context for listener, transfer and trigger
  */
 @Component
 public class CirculatorContext implements TargetRunnerListener {
 
-    private final static Logger logger = LoggerFactory.getLogger(LoggerName.EventBus_Listener);
+    private final static Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_BUS_LISTENER);
 
     @Autowired
     private Plugin plugin;
 
-    private static Integer QUEUE_CAPACITY = 50000;
+    private static final Integer QUEUE_CAPACITY = 50000;
 
-    private BlockingQueue<ConnectRecord> eventQueue = new LinkedBlockingQueue<>(50000);
+    private final BlockingQueue<ConnectRecord> eventQueue = new LinkedBlockingQueue<>(50000);
 
-    private BlockingQueue<ConnectRecord> targetQueue = new LinkedBlockingQueue<>(50000);
+    private final BlockingQueue<ConnectRecord> targetQueue = new LinkedBlockingQueue<>(50000);
 
-    private Map<String/*RunnerName*/, TargetRunnerConfig> runnerConfigMap = new ConcurrentHashMap<>(30);
+    private final Map<String/*RunnerName*/, TargetRunnerConfig> runnerConfigMap = new ConcurrentHashMap<>(30);
 
-    private Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> eventQueueMap = new ConcurrentHashMap<>(30);
+    private final Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> eventQueueMap = new ConcurrentHashMap<>(30);
 
-    private Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> targetQueueMap = new ConcurrentHashMap<>(30);
+    private final Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> targetQueueMap = new ConcurrentHashMap<>(30);
 
-    private Map<String/*RunnerName*/, TransformEngine<ConnectRecord>> taskTransformMap = new ConcurrentHashMap<>(20);
+    private final Map<String/*RunnerName*/, TransformEngine<ConnectRecord>> taskTransformMap = new ConcurrentHashMap<>(20);
 
-    private Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new ConcurrentHashMap<>(20);
+    private final Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new ConcurrentHashMap<>(20);
 
-    private Map<String/*RunnerName*/, ExecutorService> pusherExecutorMap = new ConcurrentHashMap<>(10);
+    private final Map<String/*RunnerName*/, ExecutorService> pusherExecutorMap = new ConcurrentHashMap<>(10);
 
     /**
      * initial targetRunnerMap, taskTransformMap, pusherTaskMap
-     * @param targetRunnerConfigs
+     *
+     * @param targetRunnerConfigs Configurations for the target runner
      */
     public void initCirculatorContext(Set<TargetRunnerConfig> targetRunnerConfigs) {
         if (CollectionUtils.isEmpty(targetRunnerConfigs)) {
@@ -102,10 +108,11 @@
 
     /**
      * get target runner config by runner name
+     *
      * @param runnerName
      * @return
      */
-    public TargetRunnerConfig getRunnerConfig(String runnerName){
+    public TargetRunnerConfig getRunnerConfig(String runnerName) {
         return runnerConfigMap.get(runnerName);
     }
 
@@ -122,21 +129,23 @@
 
     /**
      * update record queue map
+     *
      * @param recordMap
      * @param eventQueueMap
      */
-    private boolean updateRecordQueueMap(Map<String, List<ConnectRecord>> recordMap, Map<String, BlockingQueue<ConnectRecord>> eventQueueMap) {
-        try{
-            for(String runnerName : recordMap.keySet()){
+    private boolean updateRecordQueueMap(Map<String, List<ConnectRecord>> recordMap,
+        Map<String, BlockingQueue<ConnectRecord>> eventQueueMap) {
+        try {
+            for (String runnerName : recordMap.keySet()) {
                 BlockingQueue<ConnectRecord> recordQueue = eventQueueMap.get(runnerName);
-                if(CollectionUtils.isEmpty(recordQueue)){
+                if (CollectionUtils.isEmpty(recordQueue)) {
                     recordQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
                 }
                 recordQueue.addAll(recordMap.get(runnerName));
                 eventQueueMap.put(runnerName, recordQueue);
             }
             return true;
-        }catch (Exception exception){
+        } catch (Exception exception) {
             return false;
         }
     }
@@ -147,7 +156,7 @@
      * @return
      */
     public Map<String, List<ConnectRecord>> takeEventRecords(int batchSize) {
-        if(eventQueue.isEmpty()){
+        if (eventQueue.isEmpty()) {
             return null;
         }
         List<ConnectRecord> eventRecords = Lists.newArrayList();
@@ -171,11 +180,12 @@
 
     /**
      * take batch target records
+     *
      * @param batchSize
      * @return
      */
     public Map<String, List<ConnectRecord>> takeTargetRecords(Integer batchSize) {
-        if(targetQueue.isEmpty()){
+        if (targetQueue.isEmpty()) {
             return null;
         }
         List<ConnectRecord> targetRecords = Lists.newArrayList();
@@ -185,6 +195,7 @@
 
     /**
      * user runner-name as key
+     *
      * @param eventRecords
      * @return
      */
@@ -193,7 +204,7 @@
         for (ConnectRecord connectRecord : eventRecords) {
             String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
             List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
-            if(CollectionUtils.isEmpty(curEventRecords)){
+            if (CollectionUtils.isEmpty(curEventRecords)) {
                 curEventRecords = Lists.newArrayList();
             }
             curEventRecords.add(connectRecord);
@@ -204,15 +215,17 @@
 
     /**
      * get specific thread pool by push name
+     *
      * @param runnerName
      * @return
      */
-    public ExecutorService getExecutorService(String runnerName){
+    public ExecutorService getExecutorService(String runnerName) {
         return pusherExecutorMap.get(runnerName);
     }
 
     /**
      * refresh target runner where config changed
+     *
      * @param targetRunnerConfig
      * @param refreshTypeEnum
      */
@@ -225,7 +238,7 @@
                 TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(targetRunnerConfig.getComponents(), plugin);
                 taskTransformMap.put(runnerName, transformChain);
 
-                int endIndex = targetRunnerConfig.getComponents().size() -1;
+                int endIndex = targetRunnerConfig.getComponents().size() - 1;
                 TargetKeyValue targetKeyValue = new TargetKeyValue(targetRunnerConfig.getComponents().get(endIndex));
                 SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
                 pusherTaskMap.put(runnerName, sinkTask);
@@ -234,16 +247,16 @@
                     pusherExecutorMap.put(runnerName, initDefaultThreadPoolExecutor(runnerName));
                 }
 
-                if(logger.isInfoEnabled()){
-                    logger.info("runnerName -{}- refresh context by refresh type -{}- succeed", runnerName, refreshTypeEnum.name());
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("runnerName -{}- refresh context by refresh type -{}- succeed", runnerName, refreshTypeEnum.name());
                 }
                 break;
             case DELETE:
                 runnerConfigMap.remove(runnerName);
                 taskTransformMap.remove(runnerName);
                 pusherTaskMap.remove(runnerName);
-                if(logger.isInfoEnabled()){
-                    logger.info("runnerName -{}- remove context succeed", runnerName);
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("runnerName -{}- remove context succeed", runnerName);
                 }
                 break;
             default:
@@ -253,16 +266,18 @@
 
     /**
      * init default thread poll param, support auto config
+     *
      * @param threadPollName
      * @return
      */
     private ExecutorService initDefaultThreadPoolExecutor(String threadPollName) {
         return new ThreadPoolExecutor(200, 300, 1, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(300), ThreadUtils.newThreadFactory(threadPollName, false));
+            new LinkedBlockingQueue<>(300), ThreadUtils.newThreadFactory(threadPollName, false));
     }
 
     /**
      * init target sink task
+     *
      * @param targetKeyValue
      * @return
      */
@@ -286,13 +301,12 @@
                 Plugin.compareAndSwapLoaders(loader);
             }
             return sinkTask;
-        }catch (Exception exception) {
-            logger.error("task class -" + taskClass + "- init its sinkTask failed, ex- ", exception);
+        } catch (Exception exception) {
+            LOGGER.error("task class -" + taskClass + "- init its sinkTask failed, ex- ", exception);
         }
         return null;
     }
 
-
     public void releaseTaskTransform() throws Exception {
         for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : taskTransformMap.entrySet()) {
             String runnerName = taskTransform.getKey();
@@ -303,7 +317,7 @@
     }
 
     public void releaseTriggerTask() {
-        for (Map.Entry<String, SinkTask> triggerTask: pusherTaskMap.entrySet()) {
+        for (Map.Entry<String, SinkTask> triggerTask : pusherTaskMap.entrySet()) {
             SinkTask sinkTask = triggerTask.getValue();
             String runnerName = triggerTask.getKey();
             sinkTask.stop();
@@ -312,7 +326,7 @@
     }
 
     public void releaseExecutorService() throws Exception {
-        for (Map.Entry<String, ExecutorService> pusherExecutor: pusherExecutorMap.entrySet()) {
+        for (Map.Entry<String, ExecutorService> pusherExecutor : pusherExecutorMap.entrySet()) {
             ExecutorService pusher = pusherExecutor.getValue();
             ShutdownUtils.shutdownThreadPool(pusher);
         }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
index be4db9b..dc18ab7 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
@@ -29,6 +29,7 @@
 
     /**
      * Refresh subscriber inner data when runner keys changed
+     *
      * @param subscribeRunnerKeys
      * @param refreshTypeEnum
      */
@@ -42,7 +43,7 @@
     public abstract List<ConnectRecord> pull();
 
     /**
-     * Commit the connect records.
+     * Commit connect records.
      *
      * @param connectRecordList
      */
@@ -54,12 +55,13 @@
     public abstract void close();
 
     /**
-     * Put the connect record to the eventbus.
+     * Put connect record to the eventbus.
+     *
      * @param eventBusName
      * @param connectRecord
      * @param delaySec
      */
-    public  boolean put(String eventBusName, ConnectRecord connectRecord, int delaySec){
+    public boolean put(String eventBusName, ConnectRecord connectRecord, int delaySec) {
         // convert the eventBusName to Topic ?
         return true;
     }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
index f9e879a..5737892 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
@@ -37,7 +37,7 @@
 
 public class TransformEngine<R extends ConnectRecord> implements AutoCloseable {
 
-    private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventRule_Transfer);
+    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_RULE_TRANSFER);
 
     private final List<Transform> transformList;
 
@@ -76,18 +76,19 @@
                 transform.init(transformConfig);
                 this.transformList.add(transform);
             } catch (Exception e) {
-                logger.error("transform new instance error", e);
+                LOGGER.error("transform new instance error", e);
             }
         }
     }
 
     /**
      * format listener and pusher key
+     *
      * @param components
      * @return
      */
     private TargetKeyValue formatTargetKey(List<Map<String, String>> components) {
-        if(CollectionUtils.isEmpty(components)){
+        if (CollectionUtils.isEmpty(components)) {
             return null;
         }
         int startIndex = 0;
@@ -101,6 +102,7 @@
 
     /**
      * transform event record for target record
+     *
      * @param connectRecord
      * @return
      */
@@ -120,10 +122,11 @@
 
     /**
      * get task config value by key
+     *
      * @param configKey
      * @return
      */
-    public String getConnectConfig(String configKey){
+    public String getConnectConfig(String configKey) {
         return config.getString(configKey);
     }
 
@@ -149,8 +152,10 @@
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
         TransformEngine<?> that = (TransformEngine<?>) o;
         return transformList.equals(that.transformList) && config.equals(that.config);
     }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
index 6af6382..6edb06d 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
@@ -43,7 +43,7 @@
      */
     private final TargetKeyValue taskConfig;
 
-    private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventTarget_Trigger);
+    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_TARGET_TRIGGER);
 
     private final Map<MessageQueue, Long> messageQueuesOffsetMap = new ConcurrentHashMap<>(64);
 
@@ -61,20 +61,20 @@
     @Override
     public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
         if (null == recordPartition || null == recordPartition.getPartition() || null == recordOffset || null == recordOffset.getOffset()) {
-            logger.warn("recordPartition {} info is null or recordOffset {} info is null", recordPartition, recordOffset);
+            LOGGER.warn("recordPartition {} info is null or recordOffset {} info is null", recordPartition, recordOffset);
             return;
         }
         String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
         String topic = (String) recordPartition.getPartition().get(TOPIC);
         Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
         if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
-            logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+            LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
             return;
         }
         MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
         Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
         if (null == offset) {
-            logger.warn("resetOffset, offset is null");
+            LOGGER.warn("resetOffset, offset is null");
             return;
         }
         messageQueuesOffsetMap.put(messageQueue, offset);
@@ -83,12 +83,12 @@
     @Override
     public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
         if (MapUtils.isEmpty(offsets)) {
-            logger.warn("resetOffset, offsets {} is null", offsets);
+            LOGGER.warn("resetOffset, offsets {} is null", offsets);
             return;
         }
         for (Map.Entry<RecordPartition, RecordOffset> entry : offsets.entrySet()) {
             if (null == entry || null == entry.getKey() || null == entry.getKey().getPartition() || null == entry.getValue() || null == entry.getValue().getOffset()) {
-                logger.warn("recordPartition {} info is null or recordOffset {} info is null, entry {}", entry);
+                LOGGER.warn("recordPartition {} info is null or recordOffset {} info is null, entry {}", entry);
                 continue;
             }
             RecordPartition recordPartition = entry.getKey();
@@ -96,14 +96,14 @@
             String topic = (String) recordPartition.getPartition().get(TOPIC);
             Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
             if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
-                logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+                LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
                 continue;
             }
             MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
             RecordOffset recordOffset = entry.getValue();
             Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
             if (null == offset) {
-                logger.warn("resetOffset, offset is null");
+                LOGGER.warn("resetOffset, offset is null");
                 continue;
             }
             messageQueuesOffsetMap.put(messageQueue, offset);
@@ -113,24 +113,24 @@
     @Override
     public void pause(List<RecordPartition> recordPartitions) {
         if (recordPartitions == null || recordPartitions.size() == 0) {
-            logger.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+            LOGGER.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
             return;
         }
         for (RecordPartition recordPartition : recordPartitions) {
             if (null == recordPartition || null == recordPartition.getPartition()) {
-                logger.warn("recordPartition {} info is null", recordPartition);
+                LOGGER.warn("recordPartition {} info is null", recordPartition);
                 continue;
             }
             String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
             String topic = (String) recordPartition.getPartition().get(TOPIC);
             Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
             if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
-                logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+                LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
                 continue;
             }
             MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
             if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
-                logger.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+                LOGGER.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
                 continue;
             }
             messageQueuesStateMap.put(messageQueue, QueueState.PAUSE);
@@ -140,24 +140,24 @@
     @Override
     public void resume(List<RecordPartition> recordPartitions) {
         if (recordPartitions == null || recordPartitions.size() == 0) {
-            logger.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+            LOGGER.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
             return;
         }
         for (RecordPartition recordPartition : recordPartitions) {
             if (null == recordPartition || null == recordPartition.getPartition()) {
-                logger.warn("recordPartition {} info is null", recordPartition);
+                LOGGER.warn("recordPartition {} info is null", recordPartition);
                 continue;
             }
             String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
             String topic = (String) recordPartition.getPartition().get(TOPIC);
             Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
             if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
-                logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+                LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
                 continue;
             }
             MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
             if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
-                logger.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+                LOGGER.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
                 continue;
             }
             messageQueuesStateMap.remove(messageQueue);
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
index 39734c4..2174060 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
@@ -21,8 +21,8 @@
  * Define all the logger name of the runtime.
  */
 public class LoggerName {
-    public static final String EventBridge_RUNTIMER = "EventBridgeRuntimer";
-    public static final String EventBus_Listener = "EventBusListener";
-    public static final String EventRule_Transfer = "EventRuleTransfer";
-    public static final String EventTarget_Trigger = "EventTargetTrigger";
+    public static final String EVENT_BRIDGE_RUNTIMER = "EventBridgeRuntimer";
+    public static final String EVENT_BUS_LISTENER = "EventBusListener";
+    public static final String EVENT_RULE_TRANSFER = "EventRuleTransfer";
+    public static final String EVENT_TARGET_TRIGGER = "EventTargetTrigger";
 }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
index 400cf47..027925e 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
@@ -27,7 +27,7 @@
 
 public abstract class ServiceThread extends AbstractStartAndShutdown implements Runnable {
 
-    private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
 
     private static final long JOIN_TIME = 90 * 1000;
 
@@ -45,14 +45,14 @@
     public abstract String getServiceName();
 
     public void start() {
-        logger.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread);
+        LOGGER.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread);
         if (!hasNotified.compareAndSet(false, true)) {
             return;
         }
         stopped = false;
         this.thread.setDaemon(isDaemon);
         this.thread.start();
-        logger.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread);
+        LOGGER.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread);
     }
 
     public void shutdown() {
@@ -61,7 +61,7 @@
 
     public void shutdown(final boolean interrupt) {
         this.stopped = true;
-        logger.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
+        LOGGER.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
 
         if (hasNotified.compareAndSet(false, true)) {
             waitPoint.countDown(); // notify
@@ -77,10 +77,10 @@
                 this.thread.join(this.getJointime());
             }
             long eclipseTime = System.currentTimeMillis() - beginTime;
-            logger.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+            LOGGER.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
                 + this.getJointime());
         } catch (InterruptedException e) {
-            logger.error("Interrupted", e);
+            LOGGER.error("Interrupted", e);
         }
     }
 
@@ -94,7 +94,7 @@
 
     public void stop(final boolean interrupt) {
         this.stopped = true;
-        logger.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
+        LOGGER.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
 
         if (hasNotified.compareAndSet(false, true)) {
             waitPoint.countDown(); // notify
@@ -107,7 +107,7 @@
 
     public void makeStop() {
         this.stopped = true;
-        logger.info("makestop thread " + this.getServiceName());
+        LOGGER.info("makestop thread " + this.getServiceName());
     }
 
     public void wakeup() {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
index aa696e8..58d8e2b 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
@@ -139,15 +139,17 @@
         this.properties = properties;
     }
 
-    public KeyValue putAll(Map<String,String> configProps){
+    public KeyValue putAll(Map<String, String> configProps) {
         this.properties.putAll(configProps);
         return this;
     }
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
         TargetKeyValue that = (TargetKeyValue) o;
         return Objects.equals(targetKeyId, that.targetKeyId) && Objects.equals(properties, that.properties);
     }
@@ -160,9 +162,9 @@
     @Override
     public String toString() {
         return "TargetKeyValue{" +
-                "targetKeyId='" + targetKeyId + '\'' +
-                ", properties=" + properties +
-                '}';
+            "targetKeyId='" + targetKeyId + '\'' +
+            ", properties=" + properties +
+            '}';
     }
 
     public String getTargetKeyId() {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
index 6cbc2f7..d3ab201 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
@@ -71,19 +71,11 @@
 
     private boolean isEqualsComponents(List<Map<String, String>> source, List<Map<String, String>> target) {
         if (source == null || target == null) {
-            if (source != target) {
-                return false;
-            } else {
-                return true;
-            }
+            return source == target;
         }
 
         if (source.isEmpty() || target.isEmpty()) {
-            if (source.isEmpty() && target.isEmpty()) {
-                return true;
-            } else {
-                return false;
-            }
+            return source.isEmpty() && target.isEmpty();
         }
 
         if (source.size() != target.size()) {
@@ -99,10 +91,8 @@
                 String element = targetComponent.get(entry.getKey());
                 if (element == null && entry.getValue() == null) {
                     return true;
-                } else if (element.equals(entry.getValue())) {
-                    return true;
                 } else {
-                    return false;
+                    return element.equals(entry.getValue());
                 }
             }
         }
@@ -117,7 +107,7 @@
         return components.get(0).get(ACCOUNT_ID);
     }
 
-    public SubscribeRunnerKeys getSubscribeRunnerKeys(){
+    public SubscribeRunnerKeys getSubscribeRunnerKeys() {
         SubscribeRunnerKeys subscribeRunnerKeys = new SubscribeRunnerKeys();
         subscribeRunnerKeys.setRunnerName(this.getName());
         subscribeRunnerKeys.setAccountId(this.getAccountId());
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
index 26475f2..cf14e66 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
@@ -19,6 +19,7 @@
 import io.openmessaging.connector.api.component.Transform;
 import io.openmessaging.connector.api.component.connector.Connector;
 import io.openmessaging.connector.api.component.task.Task;
+import java.util.ArrayList;
 import org.apache.commons.lang3.StringUtils;
 import org.reflections.Configuration;
 import org.reflections.Reflections;
@@ -39,7 +40,12 @@
 import java.nio.file.Paths;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.util.*;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collection;
+import java.util.Set;
+import java.util.Arrays;
 
 @Component
 public class Plugin extends URLClassLoader {
@@ -48,7 +54,7 @@
     @Value("${runtime.pluginpath:}")
     private String pluginPath;
 
-    private Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
+    private final Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
 
     public Plugin() {
         super(new URL[0], Plugin.class.getClassLoader());
@@ -62,7 +68,7 @@
         }
     }
 
-    private List<String> initPluginPath(String plugin){
+    private List<String> initPluginPath(String plugin) {
         List<String> pluginPaths = new ArrayList<>();
         if (StringUtils.isNotEmpty(plugin)) {
             String[] strArr = plugin.split(",");
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
index 2c11e1a..0cae63d 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
@@ -16,6 +16,16 @@
  */
 package org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.TreeSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,7 +33,6 @@
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.*;
 import java.util.regex.Pattern;
 
 public class PluginUtils {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
index e16d4ec..04aa07b 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
@@ -37,7 +37,7 @@
  */
 public class FileBaseKeyValueStore<K, V> extends MemoryBasedKeyValueStore<K, V> {
 
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
 
     private String configFilePath;
     private Converter keyConverter;
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
index ea5fd11..cf0cc79 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
@@ -29,25 +29,24 @@
  */
 public class RuntimeConfigProps {
 
-    private final static Logger logger = LoggerFactory.getLogger(RuntimeConfigProps.class);
+    private final static Logger LOGGER = LoggerFactory.getLogger(RuntimeConfigProps.class);
 
     private Properties properties;
 
-    private RuntimeConfigProps(){
+    private RuntimeConfigProps() {
         try {
-             properties = PropertiesLoaderUtils.loadAllProperties("runtime.properties");
+            properties = PropertiesLoaderUtils.loadAllProperties("runtime.properties");
         } catch (IOException exception) {
-            logger.error("runtime load properties failed, stackTrace-", exception);
+            LOGGER.error("runtime load properties failed, stackTrace-", exception);
         }
     }
 
-    private static class  RuntimerConfigPropsHolder{
-        private static final RuntimeConfigProps instance = new RuntimeConfigProps();
+    private static class RuntimerConfigPropsHolder {
+        private static final RuntimeConfigProps INSTANCE = new RuntimeConfigProps();
     }
 
-    public static RuntimeConfigProps build(){
-        return RuntimerConfigPropsHolder.instance;
+    public static RuntimeConfigProps build() {
+        return RuntimerConfigPropsHolder.INSTANCE;
     }
 
-
 }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
index 56fa123..00b3c4a 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
@@ -26,7 +26,6 @@
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.DependsOn;
 
 @Configuration
 public class RuntimeConfiguration {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
index dfad654..34800ba 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
@@ -30,7 +30,7 @@
  */
 public class JsonConverter implements Converter {
 
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
 
     private Class clazz;
 
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
index b704c7b..2c31046 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
@@ -32,7 +32,7 @@
  */
 public class ListConverter implements Converter<List> {
 
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
 
     private Class clazz;
 
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
index 4a6d9ef..5b3af12 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
@@ -31,7 +31,7 @@
  */
 public class RecordOffsetConverter implements Converter<RecordOffset> {
 
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
 
     @Override
     public byte[] objectToByte(RecordOffset recordOffset) {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
index b7cee55..a988981 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
@@ -91,7 +91,7 @@
                     return -1;
                 }
                 int pow = (int) Math.pow(2, 3 + retryTimes);
-                return (pow > 512 ? 512 : pow);
+                return pow > 512 ? 512 : pow;
             default:
                 return -1;
         }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
index 0615e35..589238f 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
@@ -46,7 +46,7 @@
 
     @Override
     public Set<SubscribeRunnerKeys> getSubscribeRunnerKeys() {
-        if(CollectionUtils.isEmpty(targetRunnerConfigs)){
+        if (CollectionUtils.isEmpty(targetRunnerConfigs)) {
             return null;
         }
         return targetRunnerConfigs.stream().map(item -> {
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
index cbd01e0..85547f2 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
@@ -38,7 +38,6 @@
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
-import org.springframework.stereotype.Component;
 
 @Slf4j
 //@Component
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
index e6a9d1d..30bb31e 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
@@ -25,7 +25,7 @@
 
 public class ExceptionUtil {
 
-    private static final Logger logger = LoggerFactory.getLogger(ExceptionUtil.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionUtil.class);
 
     public static String getErrorMessage(Throwable e) {
         if (null == e) {
@@ -40,7 +40,7 @@
             StringBuffer buffer = stringWriter.getBuffer();
             return buffer.toString();
         } catch (Throwable ex) {
-            logger.error("", ex);
+            LOGGER.error("", ex);
         }
         return null;
     }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
index cf8679d..2405a8f 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
@@ -20,14 +20,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class ShutdownUtils {
 
-    private static final Logger logger = LoggerFactory.getLogger(ShutdownUtils.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownUtils.class);
 
     public static void shutdownThreadPool(ExecutorService executor) {
         if (executor != null) {
@@ -35,7 +33,7 @@
             try {
                 executor.awaitTermination(60, TimeUnit.SECONDS);
             } catch (Exception e) {
-                logger.error("Shutdown threadPool failed", e);
+                LOGGER.error("Shutdown threadPool failed", e);
             }
             if (!executor.isTerminated()) {
                 executor.shutdownNow();
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index 4d32dad..58dda61 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -27,6 +27,13 @@
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -57,7 +64,6 @@
 import javax.annotation.PostConstruct;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -72,7 +78,7 @@
 @DependsOn("flyway")
 public class RocketMQEventSubscriber extends EventSubscriber {
 
-    private static final Logger logger = LoggerFactory.getLogger(RocketMQEventSubscriber.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQEventSubscriber.class);
 
     @Autowired
     private EventDataRepository eventDataRepository;
@@ -98,7 +104,7 @@
     public static final String MSG_ID = "msgId";
 
     @PostConstruct
-    public void initRocketMQEventSubscriber(){
+    public void initRocketMQEventSubscriber() {
         this.initMqProperties();
         this.initConsumeWorkers();
     }
@@ -123,18 +129,18 @@
         ArrayList<MessageExt> messages = new ArrayList<>();
         messageBuffer.drainTo(messages, pullBatchSize);
         if (CollectionUtils.isEmpty(messages)) {
-            logger.trace("consumer poll message empty.");
+            LOGGER.trace("consumer poll message empty.");
             return null;
         }
         List<ConnectRecord> connectRecords = new CopyOnWriteArrayList<>();
         List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
-        messages.forEach(item->{
-            CompletableFuture<Void> recordCompletableFuture = CompletableFuture.supplyAsync(()-> convertToSinkRecord(item))
-                    .exceptionally((exception) -> {
-                        logger.error("execute completable job failed,stackTrace-", exception);
-                        return null;
-                    })
-                    .thenAccept(connectRecords::add);
+        messages.forEach(item -> {
+            CompletableFuture<Void> recordCompletableFuture = CompletableFuture.supplyAsync(() -> convertToSinkRecord(item))
+                .exceptionally((exception) -> {
+                    LOGGER.error("execute completable job failed", exception);
+                    return null;
+                })
+                .thenAccept(connectRecords::add);
             completableFutures.add(recordCompletableFuture);
         });
 
@@ -145,24 +151,25 @@
 
     /**
      * group by runner name batch commit
+     *
      * @param connectRecordList
      */
     @Override
     public void commit(List<ConnectRecord> connectRecordList) {
-        if(CollectionUtils.isEmpty(connectRecordList)){
-            logger.warn("commit event record data empty!");
+        if (CollectionUtils.isEmpty(connectRecordList)) {
+            LOGGER.warn("commit event record data empty!");
             return;
         }
         String runnerName = connectRecordList.iterator().next().getExtension(RuntimeConfigDefine.RUNNER_NAME);
         List<String> msgIds = connectRecordList.stream().map(item -> item.getPosition()
-                .getPartition().getPartition().get(MSG_ID).toString()).collect(Collectors.toList());
+            .getPartition().getPartition().get(MSG_ID).toString()).collect(Collectors.toList());
         consumeWorkerMap.get(runnerName).commit(msgIds);
     }
 
     @Override
     public void close() {
         for (Map.Entry<String, ConsumeWorker> item : consumeWorkerMap.entrySet()) {
-            ConsumeWorker consumeWorker =  item.getValue();
+            ConsumeWorker consumeWorker = item.getValue();
             consumeWorker.shutdown();
         }
     }
@@ -187,7 +194,7 @@
 
             clientConfig.setNameSrvAddr(namesrvAddr);
             clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ?
-                    AccessChannel.CLOUD : AccessChannel.LOCAL);
+                AccessChannel.CLOUD : AccessChannel.LOCAL);
             clientConfig.setNamespace(namespace);
             this.clientConfig = clientConfig;
 
@@ -196,7 +203,7 @@
             }
 
             if (StringUtils.isNotBlank(socks5UserName) && StringUtils.isNotBlank(socks5Password)
-                    && StringUtils.isNotBlank(socks5Endpoint)) {
+                && StringUtils.isNotBlank(socks5Endpoint)) {
                 SocksProxyConfig proxyConfig = new SocksProxyConfig();
                 proxyConfig.setUsername(socks5UserName);
                 proxyConfig.setPassword(socks5Password);
@@ -206,8 +213,8 @@
                 this.socksProxy = new Gson().toJson(proxyConfigMap);
             }
 
-        }catch (Exception exception){
-            logger.error("init rocket mq property exception, stack trace-", exception);
+        } catch (Exception exception) {
+            LOGGER.error("init rocket mq property exception, stack trace-", exception);
         }
     }
 
@@ -215,8 +222,8 @@
      * init rocket mq pull consumer
      */
     private void initConsumeWorkers() {
-        Set<SubscribeRunnerKeys> subscribeRunnerKeysSet =  runnerConfigObserver.getSubscribeRunnerKeys();
-        if(subscribeRunnerKeysSet == null || subscribeRunnerKeysSet.isEmpty()){
+        Set<SubscribeRunnerKeys> subscribeRunnerKeysSet = runnerConfigObserver.getSubscribeRunnerKeys();
+        if (subscribeRunnerKeysSet == null || subscribeRunnerKeysSet.isEmpty()) {
             return;
         }
         for (SubscribeRunnerKeys subscribeRunnerKeys : subscribeRunnerKeysSet) {
@@ -229,6 +236,7 @@
 
     /**
      * first init default rocketmq pull consumer
+     *
      * @return
      */
     public LitePullConsumer initLitePullConsumer(SubscribeRunnerKeys subscribeRunnerKeys) {
@@ -245,7 +253,7 @@
             pullConsumer.attachTopic(topic, "*");
             pullConsumer.startup();
         } catch (Exception exception) {
-            logger.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception);
+            LOGGER.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception);
             throw new EventBridgeException(" init rocketmq consumer failed");
         }
         return pullConsumer;
@@ -265,6 +273,7 @@
 
     /**
      * MessageExt convert to connect record
+     *
      * @param messageExt
      * @return
      */
@@ -311,7 +320,7 @@
 
     private void putConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) {
         ConsumeWorker consumeWorker = consumeWorkerMap.get(subscribeRunnerKeys.getRunnerName());
-        if (!Objects.isNull(consumeWorker)){
+        if (!Objects.isNull(consumeWorker)) {
             consumeWorker.shutdown();
         }
         LitePullConsumer litePullConsumer = initLitePullConsumer(subscribeRunnerKeys);
@@ -322,7 +331,7 @@
 
     private void removeConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) {
         ConsumeWorker consumeWorker = consumeWorkerMap.remove(subscribeRunnerKeys.getRunnerName());
-        if (!Objects.isNull(consumeWorker)){
+        if (!Objects.isNull(consumeWorker)) {
             consumeWorker.shutdown();
         }
     }
@@ -352,12 +361,12 @@
                         messageBuffer.put(message);
                     }
                 } catch (Exception exception) {
-                    logger.error(getServiceName() + " - RocketMQEventSubscriber pull record exception, stackTrace - ", exception);
+                    LOGGER.error(getServiceName() + " - RocketMQEventSubscriber pull record exception, stackTrace - ", exception);
                 }
             }
         }
 
-        public void commit(List<String> messageIds){
+        public void commit(List<String> messageIds) {
             this.pullConsumer.commit(messageIds);
         }
 
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
index b37b851..f42cf9b 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
@@ -20,10 +20,6 @@
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:08 上午
- */
 public class ClientConfig {
     private int rmqPullMessageCacheCapacity = 1000;
     private int rmqPullMessageBatchNums = 20;
@@ -59,7 +55,7 @@
     }
 
     public void setConsumeFromWhere(
-            final ConsumeFromWhere consumeFromWhere) {
+        final ConsumeFromWhere consumeFromWhere) {
         this.consumeFromWhere = consumeFromWhere;
     }
 
@@ -119,7 +115,6 @@
         this.accessChannel = accessChannel;
     }
 
-
     public static ClientConfig cloneConfig(ClientConfig clientConfig) {
         ClientConfig newConfig = new ClientConfig();
         newConfig.setRmqPullMessageBatchNums(clientConfig.getRmqPullMessageBatchNums());
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
index 9923eab..1755490 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
@@ -21,10 +21,6 @@
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:07 上午
- */
 public class ConsumeRequest {
     private final MessageExt messageExt;
     private final MessageQueue messageQueue;
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
index 667f17a..834b7c1 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
@@ -23,10 +23,6 @@
 import java.time.Duration;
 import java.util.List;
 
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:09 上午
- */
 public interface LitePullConsumer {
     void startup() throws MQClientException;
 
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
index a76012e..888d36d 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
@@ -48,12 +48,8 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:10 上午
- */
 public class LitePullConsumerImpl implements LitePullConsumer {
-    private static final Logger log = LoggerFactory.getLogger(LitePullConsumerImpl.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(LitePullConsumerImpl.class);
     private final DefaultMQPullConsumer rocketmqPullConsumer;
     private final LocalMessageCache localMessageCache;
     private final ClientConfig clientConfig;
@@ -83,7 +79,7 @@
     @Override
     public void startup() throws MQClientException {
         rocketmqPullConsumer.start();
-        log.info("RocketmqPullConsumer start.");
+        LOGGER.info("RocketmqPullConsumer start.");
     }
 
     @Override
@@ -98,7 +94,7 @@
             try {
                 executor.awaitTermination(60, TimeUnit.SECONDS);
             } catch (Exception e) {
-                log.error("Shutdown threadPool failed", e);
+                LOGGER.error("Shutdown threadPool failed", e);
             }
             if (!executor.isTerminated()) {
                 executor.shutdownNow();
@@ -114,7 +110,7 @@
             public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
                 submitPullTask(topic, tag, mqDivided);
                 localMessageCache.shrinkPullOffsetTable(mqDivided);
-                log.info("Load balance result of topic {} changed, mqAll {}, mqDivided {}.", topic, mqAll, mqDivided);
+                LOGGER.info("Load balance result of topic {} changed, mqAll {}, mqDivided {}.", topic, mqAll, mqDivided);
             }
         });
     }
@@ -156,7 +152,7 @@
             }
         }
         if (CollectionUtils.isEmpty(assignedQueues)) {
-            log.warn("Not found any messageQueue, topic:{}", topic);
+            LOGGER.warn("Not found any messageQueue, topic:{}", topic);
             return;
         }
 
@@ -167,10 +163,10 @@
                 try {
                     PullTask pullTask = new PullTask(messageQueue, tag);
                     pullImmediately(pullTask);
-                    log.info("Submit pullTask:{}", messageQueue);
+                    LOGGER.info("Submit pullTask:{}", messageQueue);
                 } catch (Exception e) {
-                    log.error("Failed submit pullTask:{}, {}, wait next balancing", topic, messageQueue, e);
-                    // 添加pull失败,等待下次 rebalance
+                    LOGGER.error("Failed submit pullTask:{}, {}, wait next balancing", topic, messageQueue, e);
+                    // Failed to add pull task, waiting for the next round of re-balance
                     processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
                             .getProcessQueueTable().remove(messageQueue);
                     if (processQueue != null) {
@@ -215,13 +211,13 @@
         public void run() {
             try {
                 if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) {
-                    log.warn("RocketmqPullConsumer not running, pullTask exit.");
+                    LOGGER.warn("RocketmqPullConsumer not running, pullTask exit.");
                     return;
                 }
                 ProcessQueue processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
                         .getProcessQueueTable().get(messageQueue);
                 if (processQueue == null || processQueue.isDropped()) {
-                    log.info("ProcessQueue {} dropped, pullTask exit", messageQueue);
+                    LOGGER.info("ProcessQueue {} dropped, pullTask exit", messageQueue);
                     return;
                 }
                 long offset = localMessageCache.nextPullOffset(messageQueue);
@@ -231,7 +227,7 @@
                     public void onSuccess(PullResult pullResult) {
                         try {
                             if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) {
-                                log.warn("rocketmqPullConsumer not running, pullTask exit.");
+                                LOGGER.warn("rocketmqPullConsumer not running, pullTask exit.");
                                 return;
                             }
 
@@ -248,11 +244,11 @@
                                         pullImmediately(PullTask.this);
                                     } else {
                                         localMessageCache.removePullOffset(messageQueue);
-                                        log.info("ProcessQueue {} dropped, discard the pulled message.", messageQueue);
+                                        LOGGER.info("ProcessQueue {} dropped, discard the pulled message.", messageQueue);
                                     }
                                     break;
                                 case OFFSET_ILLEGAL:
-                                    log.warn("The pull request offset is illegal, offset is {}, message queue is {}, " +
+                                    LOGGER.warn("The pull request offset is illegal, offset is {}, message queue is {}, " +
                                                     "pull result is {}, delay {} ms for next pull",
                                             offset, messageQueue, pullResult, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                                     localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
@@ -260,16 +256,16 @@
                                     break;
                                 case NO_NEW_MSG:
                                 case NO_MATCHED_MSG:
-                                    log.info("No NEW_MSG or MATCHED_MSG for mq:{}, pull again.", messageQueue);
+                                    LOGGER.info("No NEW_MSG or MATCHED_MSG for mq:{}, pull again.", messageQueue);
                                     localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                                     pullImmediately(PullTask.this);
                                     break;
                                 default:
-                                    log.warn("Failed to process pullResult, mq:{} {}", messageQueue, pullResult);
+                                    LOGGER.warn("Failed to process pullResult, mq:{} {}", messageQueue, pullResult);
                                     break;
                             }
                         } catch (Throwable t) {
-                            log.error("Exception occurs when process pullResult", t);
+                            LOGGER.error("Exception occurs when process pullResult", t);
                             pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
                         }
                     }
@@ -282,13 +278,13 @@
                         } else {
                             delayTimeMillis = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
                         }
-                        log.error("Exception happens when pull message process, delay {} ms for message queue {}",
+                        LOGGER.error("Exception happens when pull message process, delay {} ms for message queue {}",
                                 delayTimeMillis, messageQueue, e);
                         pullLater(PullTask.this, delayTimeMillis, TimeUnit.MILLISECONDS);
                     }
                 });
             } catch (Throwable t) {
-                log.error("Error occurs when pull message process, delay {} ms for message queue {}",
+                LOGGER.error("Error occurs when pull message process, delay {} ms for message queue {}",
                         PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, messageQueue, t);
                 pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
             }
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
index f3e3617..131e6fc 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
@@ -37,10 +37,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:06 上午
- */
 public class LocalMessageCache {
     private static final Logger log = LoggerFactory.getLogger(LocalMessageCache.class);
     private final BlockingQueue<ConsumeRequest> consumeRequestCache;
diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
index 61000fd..aae17c2 100644
--- a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
@@ -24,8 +24,9 @@
      * 3 times: every 10s~20s
      */
     BACKOFF_RETRY(1, 3),
+
     /**
-     * 176 times: 1,2,4,8,16,32,64,128,256,512,512...512秒 ... 512s(176)
+     * 176 times: 1, 2, 4, 8, 16, 36, 64, 128, 256, 512, 512...512s(176)
      */
     EXPONENTIAL_DECAY_RETRY(2, 176);
 
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
index 739ecbe..e9e578d 100644
--- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
@@ -19,9 +19,8 @@
 
 
 /**
- * EventMeshTraceService
- * SPI可扩展
- * 基于OpenTelemetry实现封装不同追踪器
+ * Offers extension capability via SPI, allowing different tracing/metrics observation implementations: OpenTelemetry,
+ * Jaeger, Zipkin, etc.
  */
 public interface TraceStrategy {
 
diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
index e6f81c2..0ed31ba 100644
--- a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
+++ b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
@@ -43,27 +43,27 @@
 
     private List<AuthValidation> validations = new CopyOnWriteArrayList<>();
 
-    @Value(value="${auth.validation:default}")
+    @Value(value = "${auth.validation:default}")
     private String validationName;
 
     @PostConstruct
     public void init() {
         List<String> validationNames = Arrays.stream(validationName.split(",")).collect(Collectors.toList());
-        boolean match = Arrays.stream(validationName.split(",")).allMatch(validationName-> validationName.equals("default"));
+        boolean match = Arrays.stream(validationName.split(",")).allMatch(validationName -> validationName.equals("default"));
         if (!match) {
             validationNames.add(0, "default");
         }
-        validationNames.forEach(action->validations.add(ValidationServiceFactory.getInstance(action)));
+        validationNames.forEach(action -> validations.add(ValidationServiceFactory.getInstance(action)));
     }
 
     @Override
     public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
         ServerHttpRequest request = exchange.getRequest();
         return chain.filter(exchange)
-                .subscriberContext(ctx -> {
-                    AtomicReference<Context> result = new AtomicReference<Context>();
-                    validations.forEach(validation-> result.set(validation.validate(request, ctx)));
-                    return result.get();
-                });
+            .subscriberContext(ctx -> {
+                AtomicReference<Context> result = new AtomicReference<Context>();
+                validations.forEach(validation -> result.set(validation.validate(request, ctx)));
+                return result.get();
+            });
     }
 }