refactor shutdown
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
index e5bd487..aae2f70 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
@@ -31,6 +31,10 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.StartAndShutdown;
+import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown;
+
+
 import javax.annotation.PostConstruct;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -46,6 +50,8 @@
 
     private AtomicReference<RuntimeState> runtimerState;
 
+    private static final RuntimeStartAndShutdown RUNTIME_START_AND_SHUTDOWN = new RuntimeStartAndShutdown();
+
     @Autowired
     private CirculatorContext circulatorContext;
     @Autowired
@@ -58,17 +64,40 @@
     private ErrorHandler errorHandler;
 
     @PostConstruct
-    public void initAndStart() {
-        logger.info("Start init runtime.");
+    public void initAndStart() throws Exception {
+        logger.info("Start init runtimer.");
         circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig());
         runnerConfigObserver.registerListener(circulatorContext);
         runnerConfigObserver.registerListener(eventSubscriber);
-        new EventBusListener(circulatorContext, eventSubscriber, errorHandler).start();
-        new EventRuleTransfer(circulatorContext, offsetManager, errorHandler).start();
-        new EventTargetTrigger(circulatorContext, offsetManager, errorHandler).start();
+        EventBusListener eventBusListener = new EventBusListener(circulatorContext, eventSubscriber, errorHandler);
+        EventRuleTransfer eventRuleTransfer = new EventRuleTransfer(circulatorContext, offsetManager, errorHandler);
+        EventTargetTrigger eventTargetPusher = new EventTargetTrigger(circulatorContext, offsetManager, errorHandler);
+        RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventBusListener);
+        RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventRuleTransfer);
+        RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventTargetPusher);
+
+        // start servers one by one.
+        RUNTIME_START_AND_SHUTDOWN.start();
+
+        java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            logger.info("try to shutdown server");
+            try {
+                RUNTIME_START_AND_SHUTDOWN.shutdown();
+            } catch (Exception e) {
+                logger.error("err when shutdown rocketmq-proxy", e);
+            }
+        }));
+
         startRuntimer();
     }
 
+    private static class RuntimeStartAndShutdown extends AbstractStartAndShutdown {
+        @Override
+        protected void appendStartAndShutdown(StartAndShutdown startAndShutdown) {
+            super.appendStartAndShutdown(startAndShutdown);
+        }
+    }
+
     public void startRuntimer() {
         runtimerState = new AtomicReference<>(RuntimeState.START);
     }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
index e6f06c2..48af27f 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
@@ -70,4 +70,9 @@
     public String getServiceName() {
         return EventBusListener.class.getSimpleName();
     }
+
+    @Override
+    public void shutdown() {
+        eventSubscriber.close();
+    }
 }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index 8612733..4322187 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -32,9 +32,9 @@
 import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine;
 import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
 import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
+import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ShutdownUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
 /**
  * receive event and transfer the rule to pusher
@@ -48,6 +48,8 @@
     private final CirculatorContext circulatorContext;
     private final OffsetManager offsetManager;
     private final ErrorHandler errorHandler;
+    private Map<String, TransformEngine<ConnectRecord>> latestTransformMap;
+    private List<CompletableFuture<Void>> completableFutures;
 
     public EventRuleTransfer(CirculatorContext circulatorContext, OffsetManager offsetManager,
         ErrorHandler errorHandler) {
@@ -75,7 +77,7 @@
                 this.waitForRunning(1000);
                 continue;
             }
-            Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
+            latestTransformMap = circulatorContext.getTaskTransformMap();
             if (MapUtils.isEmpty(latestTransformMap)) {
                 logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
                 this.waitForRunning(3000);
@@ -83,7 +85,7 @@
             }
 
             List<ConnectRecord> afterTransformConnect = Lists.newArrayList();
-            List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
+            completableFutures = Lists.newArrayList();
             for(String runnerName: eventRecordMap.keySet()){
                 TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName);
                 List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
@@ -117,4 +119,23 @@
         }
     }
 
+    @Override
+    public void start() {
+        thread.start();
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : latestTransformMap.entrySet()) {
+                TransformEngine<ConnectRecord> transformEngine = taskTransform.getValue();
+                transformEngine.close();
+            }
+            ShutdownUtils.completedFuture(completableFutures);
+            circulatorContext.releaseTaskTransform();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
 }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index a54ee39..c4ff349 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -89,4 +89,23 @@
         return EventTargetTrigger.class.getSimpleName();
     }
 
+    @Override
+    public void start() {
+        thread.start();
+    }
+
+    @Override
+    public void shutdown() {
+        Map<String, SinkTask> sinkTaskMap =  circulatorContext.getPusherTaskMap();
+        for (Map.Entry<String, SinkTask> item : sinkTaskMap.entrySet()) {
+            SinkTask sinkTask = item.getValue();
+            sinkTask.stop();
+        }
+        try {
+            circulatorContext.releaseExecutorService();
+            circulatorContext.releaseTriggerTask();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
index 203f43a..7b467d8 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
@@ -32,6 +32,7 @@
 import org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin.Plugin;
 import org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin.PluginClassLoader;
 import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ShutdownUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -291,4 +292,28 @@
         return null;
     }
 
+
+    public void releaseTaskTransform() {
+        for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : taskTransformMap.entrySet()) {
+            String runnerName = taskTransform.getKey();
+            taskTransformMap.remove(runnerName);
+        }
+    }
+
+    public void releaseTriggerTask() {
+        for (Map.Entry<String, SinkTask> triggerTask: pusherTaskMap.entrySet()) {
+            SinkTask sinkTask = triggerTask.getValue();
+            String runnerName = triggerTask.getKey();
+            sinkTask.stop();
+            pusherTaskMap.remove(runnerName);
+        }
+    }
+
+    public void releaseExecutorService() throws Exception {
+        for (Map.Entry<String, ExecutorService> pusherExecutor: pusherExecutorMap.entrySet()) {
+            ExecutorService pusher = pusherExecutor.getValue();
+            ShutdownUtils.shutdownThreadPool(pusher);
+        }
+    }
+
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/AbstractStartAndShutdown.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/AbstractStartAndShutdown.java
similarity index 96%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/AbstractStartAndShutdown.java
rename to adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/AbstractStartAndShutdown.java
index 051fe5a..9a1f33e 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/AbstractStartAndShutdown.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/AbstractStartAndShutdown.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook;
+package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Shutdown.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Shutdown.java
similarity index 92%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Shutdown.java
rename to adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Shutdown.java
index f3ac5f3..854cd19 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Shutdown.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Shutdown.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook;
+package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook;
 
 public interface Shutdown {
     void shutdown() throws Exception;
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Start.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Start.java
similarity index 92%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Start.java
rename to adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Start.java
index b44d86a..353255f 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Start.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Start.java
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook;
+package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook;
 
 public interface Start {
     void start() throws Exception;
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/StartAndShutdown.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/StartAndShutdown.java
similarity index 86%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/StartAndShutdown.java
rename to adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/StartAndShutdown.java
index 242c1b0..cc80740 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/StartAndShutdown.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/StartAndShutdown.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook;
+package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook;
 
-public interface StartAndShutdown extends Start,Shutdown {
+public interface StartAndShutdown extends Start, Shutdown {
 }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
index 0d99fce..be4db9b 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
@@ -49,6 +49,11 @@
     public abstract void commit(List<ConnectRecord> connectRecordList);
 
     /**
+     * close resource such as consumer
+     */
+    public abstract void close();
+
+    /**
      * Put the connect record to the eventbus.
      * @param eventBusName
      * @param connectRecord
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
index 48835c1..400cf47 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtime.common;
 
+import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown;
 import org.apache.rocketmq.common.CountDownLatch2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,7 +25,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public abstract class ServiceThread implements Runnable {
+public abstract class ServiceThread extends AbstractStartAndShutdown implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
 
@@ -34,6 +35,8 @@
     protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
     protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
     protected volatile boolean stopped = false;
+    protected boolean isDaemon = false;
+
 
     public ServiceThread() {
         this.thread = new Thread(this, this.getServiceName());
@@ -42,7 +45,14 @@
     public abstract String getServiceName();
 
     public void start() {
+        logger.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread);
+        if (!hasNotified.compareAndSet(false, true)) {
+            return;
+        }
+        stopped = false;
+        this.thread.setDaemon(isDaemon);
         this.thread.start();
+        logger.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread);
     }
 
     public void shutdown() {
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/utils/ShutdownUtils.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
similarity index 96%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/utils/ShutdownUtils.java
rename to adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
index 980c242..7feb906 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/utils/ShutdownUtils.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.utils;
+package org.apache.rocketmq.eventbridge.adapter.runtime.utils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index f6b08cd..3005486 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -158,6 +158,14 @@
         consumeWorkerMap.get(runnerName).commit(msgIds);
     }
 
+    @Override
+    public void close() {
+        for (Map.Entry<String, ConsumeWorker> item : consumeWorkerMap.entrySet()) {
+            ConsumeWorker consumeWorker =  item.getValue();
+            consumeWorker.shutdown();
+        }
+    }
+
     /**
      * init rocketmq ref config
      */