[ISSUE #375]Refactoring cluster discovery module, removed the message exchange dependency (#387)

* Merge master branch

* Refactoring cluster discovery module, removed the message dependency

* Fix code style

* Remove useless code
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
index cf6b38a..f7ca487 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
@@ -55,7 +55,7 @@
 
         try {
             controller.start();
-            String tip = "The worker [" + controller.getWorker().getWorkerId() + "] boot success.";
+            String tip = "The worker [" + controller.getClusterManagementService().getCurrentWorker() + "] boot success.";
             log.info(tip);
             System.out.printf("%s%n", tip);
         } catch (Throwable e) {
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
index 0720c44..88e00e4 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
@@ -18,7 +18,6 @@
 package org.apache.rocketmq.connect.runtime.config;
 
 import java.io.File;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 /**
  * Configurations for runtime.
@@ -26,11 +25,6 @@
 public class ConnectConfig {
 
     /**
-     * Worker id to distinguish with other workers. Should be unique in a cluster.
-     */
-    private String workerId = "DEFAULT_WORKER_1";
-
-    /**
      * Storage directory for file store.
      */
     private String storePathRootDir = System.getProperty("user.home") + File.separator + "connectorStore";
@@ -41,8 +35,6 @@
 
     private int maxMessageSize;
 
-    private LanguageCode language;
-
     private int operationTimeout = 3000;
 
     private String rmqConsumerGroup = "connector-consumer-group";
@@ -60,7 +52,6 @@
      */
     private String clusterStoreTopic = "connector-cluster-topic";
 
-
     /**
      * Default topic to send/consume config change message.
      */
@@ -98,6 +89,7 @@
 
     private String pluginPaths;
 
+    private String connectClusterId = "DefaultConnectCluster";
 
     public String getNamesrvAddr() {
         return namesrvAddr;
@@ -123,14 +115,6 @@
         this.maxMessageSize = maxMessageSize;
     }
 
-    public LanguageCode getLanguage() {
-        return language;
-    }
-
-    public void setLanguage(LanguageCode language) {
-        this.language = language;
-    }
-
     public int getOperationTimeout() {
         return operationTimeout;
     }
@@ -179,14 +163,6 @@
         this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums;
     }
 
-    public String getWorkerId() {
-        return workerId;
-    }
-
-    public void setWorkerId(String workerId) {
-        this.workerId = workerId;
-    }
-
     public String getStorePathRootDir() {
         return storePathRootDir;
     }
@@ -266,4 +242,13 @@
     public void setOffsetStoreTopic(String offsetStoreTopic) {
         this.offsetStoreTopic = offsetStoreTopic;
     }
+
+    public String getConnectClusterId() {
+        return connectClusterId;
+    }
+
+    public void setConnectClusterId(String connectClusterId) {
+        this.connectClusterId = connectClusterId;
+    }
+
 }
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
index 5ea6e77..089b49c 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
@@ -52,7 +52,7 @@
 
     public static final String RMQ_PRODUCER_GROUP = "rmq-producer-group";
 
-    public static final String RMQ_CONSUMNER_GROUP = "rmq-consumner-group";
+    public static final String RMQ_CONSUMNER_GROUP = "rmq-consumer-group";
 
     public static final String OPERATION_TIMEOUT = "operation-timeout";
 
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index ba7a204..8ef8a78 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -23,7 +23,6 @@
 import io.openmessaging.connector.api.data.Converter;
 import io.openmessaging.connector.api.sink.SinkTask;
 import io.openmessaging.connector.api.source.SourceTask;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -33,32 +32,29 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.connect.runtime.ConnectController;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.service.TaskPositionCommitService;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A worker to schedule all connectors and tasks in a process.
  */
 public class Worker {
-
-    /**
-     * Current worker id.
-     */
-    private final String workerId;
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
     /**
      * Current running connectors.
@@ -103,7 +99,6 @@
                   PositionManagementService positionManagementService, PositionManagementService offsetManagementService,
                   Plugin plugin) {
         this.connectConfig = connectConfig;
-        this.workerId = connectConfig.getWorkerId();
         this.taskExecutor = Executors.newCachedThreadPool();
         this.positionManagementService = positionManagementService;
         this.offsetManagementService = offsetManagementService;
@@ -273,7 +268,6 @@
 
                 if (task instanceof SourceTask) {
                     checkRmqProducerState();
-
                     WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
                             (SourceTask) task, keyValue,
                             new PositionStorageReaderImpl(positionManagementService),
@@ -287,7 +281,6 @@
                     consumer.setConsumerGroup(ConnectUtil.createGroupName(connectConfig.getRmqConsumerGroup()));
                     consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
                     consumer.setConsumerPullTimeoutMillis((long) connectConfig.getRmqMessageConsumeTimeout());
-                    consumer.setLanguage(LanguageCode.JAVA);
                     consumer.start();
 
                     WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
@@ -319,21 +312,16 @@
     }
 
     private void checkRmqProducerState() {
-        if (!this.producerStarted && this.producer.getDefaultMQProducerImpl().getServiceState() != ServiceState.RUNNING) {
+        if (!this.producerStarted) {
             try {
                 this.producer.start();
                 this.producerStarted = true;
             } catch (MQClientException e) {
-                //print log
-                e.printStackTrace();
+                log.error("Start producer failed!", e);
             }
         }
     }
 
-    public String getWorkerId() {
-        return workerId;
-    }
-
     public void stop() {
         if (this.producerStarted && this.producer != null) {
             this.producer.shutdown();
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 8e84c86..46078fa 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -167,7 +167,6 @@
                 producer.send(sourceMessage, new SendCallback() {
                     @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
                         try {
-                            // send ok
                             if (null != partition && null != position) {
                                 positionData.put(partition, position);
                             }
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
index faae8ca..67c3e2f 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
@@ -17,7 +17,7 @@
 
 package org.apache.rocketmq.connect.runtime.service;
 
-import java.util.Map;
+import java.util.List;
 
 /**
  * Interface for cluster management.
@@ -41,7 +41,7 @@
      *
      * @return
      */
-    Map<String, Long> getAllAliveWorkers();
+    List<String> getAllAliveWorkers();
 
     /**
      * Register a worker status listener to listen the change of alive workers.
@@ -50,6 +50,8 @@
      */
     void registerListener(WorkerStatusListener listener);
 
+    String getCurrentWorker();
+
     interface WorkerStatusListener {
 
         /**
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
index 436b914..31dd61d 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
@@ -17,20 +17,20 @@
 
 package org.apache.rocketmq.connect.runtime.service;
 
-import java.util.HashMap;
+import io.netty.channel.ChannelHandlerContext;
+import io.openmessaging.connector.api.exception.ConnectException;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
-import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
-import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
-import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,249 +38,99 @@
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
-    /**
-     * Record all alive workers in memory.
-     */
-    private Map<String, Long> aliveWorker = new HashMap<>();
-
-    /**
-     * Data synchronizer to synchronize data with other workers.
-     */
-    private DataSynchronizer<String, Map> dataSynchronizer;
-
-    /**
-     * Listeners to trigger while worker change.
-     */
-    private Set<WorkerStatusListener> workerStatusListener;
-
-    /**
-     * Thread pool for scheduled tasks.
-     */
-    private final ScheduledExecutorService scheduledExecutorService;
+    private Set<WorkerStatusListener> workerStatusListeners;
 
     /**
      * Configs of current worker.
      */
     private final ConnectConfig connectConfig;
 
+    /**
+     * Used for worker discovery
+     */
+    private DefaultMQPullConsumer defaultMQPullConsumer;
+
     public ClusterManagementServiceImpl(ConnectConfig connectConfig) {
         this.connectConfig = connectConfig;
-        this.dataSynchronizer = new BrokerBasedLog<>(connectConfig,
-            connectConfig.getClusterStoreTopic(),
-            connectConfig.getWorkerId() + System.currentTimeMillis(),
-            new ClusterChangeCallback(),
-            new JsonConverter(),
-            new JsonConverter());
-        this.workerStatusListener = new HashSet<>();
-        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((r) ->
-            new Thread(r, "HeartBeatScheduledThread"));
+        this.workerStatusListeners = new HashSet<>();
+        this.defaultMQPullConsumer = new DefaultMQPullConsumer(connectConfig.getConnectClusterId());
+        this.defaultMQPullConsumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
     }
 
     @Override
     public void start() {
+        try {
+            this.defaultMQPullConsumer.start();
+        } catch (MQClientException ex) {
+            log.error("Start RocketMQ consumer for cluster management service error");
+            throw new ConnectException(-1, "Start RocketMQ consumer for cluster management service error");
+        }
+        WorkerChangeListener workerChangeListener = new WorkerChangeListener();
 
-        dataSynchronizer.start();
-
-        // On worker online
-        sendOnlineHeartBeat();
-
-        // check whether a machine is offline
-        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
-
-            try {
-
-                boolean changed = false;
-                for (String workerId : aliveWorker.keySet()) {
-                    if ((aliveWorker.get(workerId) + ClusterManagementService.WORKER_TIME_OUT) < System.currentTimeMillis()) {
-                        changed = true;
-                        aliveWorker.remove(workerId);
-                    }
-                }
-                if (!changed) {
-                    return;
-                }
-                for (WorkerStatusListener listener : ClusterManagementServiceImpl.this.workerStatusListener) {
-                    listener.onWorkerChange();
-                }
-            } catch (Exception e) {
-                log.error("schedule cluster alive workers error.", e);
-            }
-        }, 1000, 20 * 1000, TimeUnit.MILLISECONDS);
-
-        // Send heart beat periodically.
-        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
-            try {
-                sendAliveHeartBeat();
-            } catch (Exception e) {
-                log.error("schedule alive heart beat error.", e);
-            }
-        }, 1000, 10 * 1000, TimeUnit.MILLISECONDS);
+        this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl()
+            .getRebalanceImpl()
+            .getmQClientFactory()
+            .getMQClientAPIImpl()
+            .getRemotingClient()
+            .registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, workerChangeListener,
+                null);
     }
 
     @Override
     public void stop() {
-
-        sendOffLineHeartBeat();
-        this.scheduledExecutorService.shutdown();
-        try {
-            this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            log.error("shutdown scheduledExecutorService error.", e);
-        }
-        dataSynchronizer.stop();
-    }
-
-    public void sendAliveHeartBeat() {
-
-        aliveWorker.put(connectConfig.getWorkerId(), System.currentTimeMillis());
-        dataSynchronizer.send(HeartBeatEnum.ALIVE.name(), aliveWorker);
-    }
-
-    public void sendOnlineHeartBeat() {
-
-        aliveWorker.put(connectConfig.getWorkerId(), System.currentTimeMillis());
-        dataSynchronizer.send(HeartBeatEnum.ONLINE_BEGIN.name(), aliveWorker);
-    }
-
-    public void sendOnlineFinishHeartBeat() {
-
-        aliveWorker.put(connectConfig.getWorkerId(), System.currentTimeMillis());
-        dataSynchronizer.send(HeartBeatEnum.ONLINE_FINISH.name(), aliveWorker);
-    }
-
-    public void sendOffLineHeartBeat() {
-
-        Map<String, Long> offlineMap = new HashMap<>();
-        offlineMap.put(connectConfig.getWorkerId(), System.currentTimeMillis());
-        dataSynchronizer.send(HeartBeatEnum.OFFLINE.name(), offlineMap);
+        this.defaultMQPullConsumer.shutdown();
     }
 
     @Override
-    public Map<String, Long> getAllAliveWorkers() {
+    public List<String> getAllAliveWorkers() {
+        return this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl()
+            .getRebalanceImpl()
+            .getmQClientFactory()
+            .findConsumerIdList(connectConfig.getClusterStoreTopic(), connectConfig.getConnectClusterId());
+    }
 
-        return this.aliveWorker;
+    @Override
+    public String getCurrentWorker() {
+        return this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().getClientId();
     }
 
     @Override
     public void registerListener(WorkerStatusListener listener) {
-
-        this.workerStatusListener.add(listener);
+        this.workerStatusListeners.add(listener);
     }
 
-    /**
-     * Merge new received alive worker with info stored in memory.
-     *
-     * @param newAliveWorkerInfo
-     * @return
-     */
-    private boolean mergeAliveWorker(Map<String, Long> newAliveWorkerInfo) {
-
-        removeExpiredWorker(newAliveWorkerInfo);
-        boolean changed = false;
-        for (String workerId : newAliveWorkerInfo.keySet()) {
-
-            Long lastAliveTime = aliveWorker.get(workerId);
-            if (null == lastAliveTime) {
-
-                changed = true;
-                aliveWorker.put(workerId, newAliveWorkerInfo.get(workerId));
-            } else {
-
-                if (newAliveWorkerInfo.get(workerId) > lastAliveTime) {
-                    changed = true;
-                    aliveWorker.put(workerId, newAliveWorkerInfo.get(workerId));
-                }
-            }
-        }
-
-        return removeExpiredWorker(aliveWorker) && changed;
-    }
-
-    /**
-     * Remove expired workers in {@link ClusterManagementServiceImpl#aliveWorker}.
-     *
-     * @param aliveWorker
-     * @return
-     */
-    private boolean removeExpiredWorker(Map<String, Long> aliveWorker) {
-
-        boolean changed = false;
-        Iterator<String> iterator = aliveWorker.keySet().iterator();
-        while (iterator.hasNext()) {
-            String workerId = iterator.next();
-            if (aliveWorker.get(workerId) + ClusterManagementService.WORKER_TIME_OUT < System.currentTimeMillis()) {
-                changed = true;
-                iterator.remove();
-            }
-        }
-        return changed;
-    }
-
-    /**
-     * Callback from {@link ClusterManagementServiceImpl#dataSynchronizer}.
-     */
-    private class ClusterChangeCallback implements DataSynchronizerCallback<String, Map> {
+    public class WorkerChangeListener implements NettyRequestProcessor {
 
         @Override
-        public void onCompletion(Throwable error, String heartBeatEnum, Map result) {
-
-            boolean changed = true;
-            switch (HeartBeatEnum.valueOf(heartBeatEnum)) {
-
-                case ALIVE:
-                    changed = mergeAliveWorker(result);
-                    break;
-                case ONLINE_BEGIN:
-                    mergeAliveWorker(result);
-                    changed = false;
-                    sendOnlineFinishHeartBeat();
-                    break;
-                case ONLINE_FINISH:
-                    mergeAliveWorker(result);
-                    changed = true;
-                    break;
-                case OFFLINE:
-                    for (Object key : result.keySet()) {
-                        String workerId = (String) key;
-                        Long offlineTime = (Long) result.get(workerId);
-                        Long lastOnlineTime = aliveWorker.get(workerId);
-                        if (null == lastOnlineTime || lastOnlineTime > offlineTime) {
-                            changed = false;
-                        } else {
-                            changed = true;
-                            aliveWorker.remove(workerId);
-                        }
-                    }
-                    break;
+        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+            switch (request.getCode()) {
+                case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
+                    return this.workerChanged(ctx, request);
                 default:
                     break;
             }
-            if (!changed) {
-                return;
-            }
-            for (WorkerStatusListener listener : ClusterManagementServiceImpl.this.workerStatusListener) {
-                listener.onWorkerChange();
-            }
+            return null;
         }
-    }
 
-    private enum HeartBeatEnum {
+        public RemotingCommand workerChanged(ChannelHandlerContext ctx,
+            RemotingCommand request) {
+            try {
+                final NotifyConsumerIdsChangedRequestHeader requestHeader =
+                    (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
+                log.info("Receive broker's notification[{}], the consumer group for connect: {} changed,  rebalance immediately",
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                    requestHeader.getConsumerGroup());
+                for (WorkerStatusListener workerChangeListener : workerStatusListeners) {
+                    workerChangeListener.onWorkerChange();
+                }
+            } catch (Exception e) {
+                log.error("NotifyConsumerIdsChanged for connect exception", RemotingHelper.exceptionSimpleDesc(e));
+            }
+            return null;
+        }
 
-        /**
-         * Send when first online.
-         */
-        ONLINE_BEGIN,
-        /**
-         * Send after receive online_begin.
-         */
-        ONLINE_FINISH,
-        /**
-         * Send when offline.
-         */
-        OFFLINE,
-        /**
-         * Alive heartbeat
-         */
-        ALIVE
+        @Override public boolean rejectRequest() {
+            return false;
+        }
     }
 }
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index aa453af..8ef7fe7 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -35,6 +35,7 @@
 import org.apache.rocketmq.connect.runtime.converter.ListConverter;
 import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
@@ -68,12 +69,14 @@
 
     private final Plugin plugin;
 
+    private final String configManagePrefix = "ConfigManage";
+
     public ConfigManagementServiceImpl(ConnectConfig connectConfig, Plugin plugin) {
 
         this.connectorConfigUpdateListener = new HashSet<>();
         this.dataSynchronizer = new BrokerBasedLog<>(connectConfig,
             connectConfig.getConfigStoreTopic(),
-            connectConfig.getWorkerId() + System.currentTimeMillis(),
+            ConnectUtil.createGroupName(configManagePrefix),
             new ConfigChangeCallback(),
             new JsonConverter(),
             new ConnAndTaskConfigConverter());
@@ -195,7 +198,6 @@
 
     @Override
     public Map<String, List<ConnectKeyValue>> getTaskConfigs() {
-
         Map<String, List<ConnectKeyValue>> result = new HashMap<>();
         Map<String, List<ConnectKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
         Map<String, ConnectKeyValue> filteredConnector = getConnectorConfigs();
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
index 0b564cd..44b4866 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
@@ -28,6 +28,7 @@
 import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
 import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
 import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
 import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
@@ -45,6 +46,8 @@
      */
     private DataSynchronizer<String, Map<ByteBuffer, ByteBuffer>> dataSynchronizer;
 
+    private final String offsetManagePrefix = "OffsetManage";
+
     /**
      * Listeners.
      */
@@ -57,7 +60,7 @@
             new ByteBufferConverter());
         this.dataSynchronizer = new BrokerBasedLog(connectConfig,
             connectConfig.getOffsetStoreTopic(),
-            connectConfig.getWorkerId() + System.currentTimeMillis(),
+            ConnectUtil.createGroupName(offsetManagePrefix),
             new OffsetChangeCallback(),
             new JsonConverter(),
             new ByteMapConverter());
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 8d7e4fd..01c0c3b 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -28,6 +28,7 @@
 import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
 import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
 import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
 import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
@@ -50,6 +51,8 @@
      */
     private Set<PositionUpdateListener> positionUpdateListener;
 
+    private final String positionManagePrefix = "PositionManage";
+
     public PositionManagementServiceImpl(ConnectConfig connectConfig) {
 
         this.positionStore = new FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
@@ -57,7 +60,7 @@
             new ByteBufferConverter());
         this.dataSynchronizer = new BrokerBasedLog(connectConfig,
             connectConfig.getPositionStoreTopic(),
-            connectConfig.getWorkerId() + System.currentTimeMillis(),
+            ConnectUtil.createGroupName(positionManagePrefix),
             new PositionChangeCallback(),
             new JsonConverter(),
             new ByteMapConverter());
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index 9bf7038..c5644d7 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -73,11 +73,11 @@
      */
     public void doRebalance() {
 
-        Map<String, Long> curAliveWorkers = clusterManagementService.getAllAliveWorkers();
+        List<String> curAliveWorkers = clusterManagementService.getAllAliveWorkers();
         Map<String, ConnectKeyValue> curConnectorConfigs = configManagementService.getConnectorConfigs();
         Map<String, List<ConnectKeyValue>> curTaskConfigs = configManagementService.getTaskConfigs();
 
-        ConnAndTaskConfigs allocateResult = allocateConnAndTaskStrategy.allocate(curAliveWorkers.keySet(), worker.getWorkerId(), curConnectorConfigs, curTaskConfigs);
+        ConnAndTaskConfigs allocateResult = allocateConnAndTaskStrategy.allocate(curAliveWorkers, clusterManagementService.getCurrentWorker(), curConnectorConfigs, curTaskConfigs);
         log.info("Allocated connector:{}", allocateResult.getConnectorConfigs());
         log.info("Allocated task:{}", allocateResult.getTaskConfigs());
         updateProcessConfigsInRebalance(allocateResult);
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java
index 323d1f2..49945f5 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java
@@ -76,6 +76,7 @@
          */
         @Override
         public void onWorkerChange() {
+            log.info("Wake up rebalance service");
             RebalanceService.this.wakeup();
         }
     }
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/AllocateConnAndTaskStrategy.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/AllocateConnAndTaskStrategy.java
index 523b29b..0d2f78d 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/AllocateConnAndTaskStrategy.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/AllocateConnAndTaskStrategy.java
@@ -19,7 +19,6 @@
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 
@@ -37,6 +36,6 @@
      * @param taskConfigs All task configs.
      * @return
      */
-    ConnAndTaskConfigs allocate(Set<String> allWorker, String curWorker, Map<String, ConnectKeyValue> connectorConfigs,
+    ConnAndTaskConfigs allocate(List<String> allWorker, String curWorker, Map<String, ConnectKeyValue> connectorConfigs,
         Map<String, List<ConnectKeyValue>> taskConfigs);
 }
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
index 6d91d3e..a61982d 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
@@ -21,18 +21,21 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default allocate strategy, distribute connectors and tasks averagely.
  */
 public class DefaultAllocateConnAndTaskStrategy implements AllocateConnAndTaskStrategy {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
     @Override
-    public ConnAndTaskConfigs allocate(Set<String> allWorker, String curWorker,
+    public ConnAndTaskConfigs allocate(List<String> allWorker, String curWorker,
         Map<String, ConnectKeyValue> connectorConfigs,
         Map<String, List<ConnectKeyValue>> taskConfigs) {
         ConnAndTaskConfigs allocateResult = new ConnAndTaskConfigs();
@@ -42,8 +45,11 @@
 
         List<String> sortedWorkers = new ArrayList<>(allWorker);
         Collections.sort(sortedWorkers);
+        log.debug("sortedWorkers: {}", sortedWorkers);
         Map<String, ConnectKeyValue> sortedConnectorConfigs = getSortedMap(connectorConfigs);
+        log.debug("SortedConnectorConfigs: {}", sortedConnectorConfigs);
         Map<String, List<ConnectKeyValue>> sortedTaskConfigs = getSortedMap(taskConfigs);
+        log.debug("SortedTaskConfigs: {}", sortedTaskConfigs);
         int index = 0;
         for (String connectorName : sortedConnectorConfigs.keySet()) {
             String allocatedWorker = sortedWorkers.get(index % sortedWorkers.size());
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index 5f3f1a7..0380046 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -20,11 +20,18 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
 
 public class ConnectUtil {
 
     public static String createGroupName(String prefix) {
-        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+        StringBuilder sb = new StringBuilder();
+        sb.append(prefix).append("-");
+        sb.append(RemotingUtil.getLocalAddress()).append("-");
+        sb.append(UtilAll.getPid()).append("-");
+        sb.append(System.nanoTime());
+        return sb.toString().replace(".", "-");
     }
 
     public static String createInstance(String servers) {
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
index cf6addb..2474fe5 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
@@ -30,12 +30,12 @@
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,7 +96,6 @@
         this.producer.setProducerGroup(workId);
         this.producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
         this.producer.setMaxMessageSize(MAX_MESSAGE_SIZE);
-        this.producer.setLanguage(LanguageCode.JAVA);
 
         this.consumer = new DefaultMQPushConsumer();
         this.consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
@@ -104,10 +103,8 @@
         this.consumer.setConsumerGroup(workId);
         this.consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
         this.consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout());
-        this.consumer.setConsumeThreadMax(connectConfig.getRmqMaxConsumeThreadNums());
         this.consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums());
-        this.consumer.setLanguage(LanguageCode.JAVA);
-
+        this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
     }
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf b/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf
index af23ac5..bbbbb4a 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf
@@ -13,14 +13,11 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-## Worker id, should be unique
-workerId=DEFAULT_WORKER_1
-
 ## Http prot for user to access REST API
 httpPort=8081
 
 # Rocketmq namesrvAddr
-namesrvAddr=127.0.0.1:9876
+namesrvAddr=localhost:9876
 
 # Source or sink connector jar file dir,The default value is rocketmq-connect-sample
 pluginPaths=/home/connect/rocketmq-connect/rocketmq-connect-sample/target
\ No newline at end of file
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/config/ConnectConfigTest.java b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/config/ConnectConfigTest.java
index d2cbcb2..1935604 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/config/ConnectConfigTest.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/config/ConnectConfigTest.java
@@ -27,8 +27,6 @@
     public void testConnectConfigAttribute() {
         ConnectConfig connectConfig = new ConnectConfig();
         connectConfig.setHttpPort(8081);
-        connectConfig.setWorkerId("DEFAULT_WORKER_1");
         assertThat(connectConfig.getHttpPort()).isEqualTo(8081);
-        assertThat(connectConfig.getWorkerId()).isEqualTo("DEFAULT_WORKER_1");
     }
 }
\ No newline at end of file
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 1fac6b9..58079e8 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -76,8 +76,8 @@
     public void init() {
         connectConfig = new ConnectConfig();
         connectConfig.setHttpPort(8081);
-        connectConfig.setWorkerId("DEFAULT_WORKER_1");
         connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
+        connectConfig.setNamesrvAddr("localhost:9876");
         worker = new Worker(connectConfig, positionManagementService, offsetManagementService, plugin);
 
         Set<WorkerConnector> workingConnectors = new HashSet<>();
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index 11c5c6b..3bbf183 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -114,7 +114,7 @@
 
     private HttpClient httpClient;
 
-    private Map<String, Long> aliveWorker;
+    private List<String> aliveWorker;
 
     private Map<String, ConnectKeyValue> connectorConfigs;
 
@@ -158,10 +158,10 @@
         when(configManagementService.getConnectorConfigs()).thenReturn(connectorConfigs);
         when(configManagementService.getTaskConfigs()).thenReturn(taskConfigs);
 
-        aliveWorker = new HashMap<String, Long>() {
+        aliveWorker = new ArrayList<String>() {
             {
-                put("workerId1", System.currentTimeMillis());
-                put("workerId2", System.currentTimeMillis());
+                add("workerId1");
+                add("workerId2");
             }
         };
 
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImplTest.java b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImplTest.java
deleted file mode 100644
index ec539a0..0000000
--- a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImplTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.connect.runtime.service;
-
-import io.openmessaging.Future;
-import io.openmessaging.producer.SendResult;
-import java.io.File;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
-import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-@RunWith(MockitoJUnitRunner.class)
-public class ClusterManagementServiceImplTest {
-
-    private ConnectConfig connectConfig;
-
-    @Mock
-    private DefaultMQProducer producer;
-
-    @Mock
-    private DefaultMQPushConsumer consumer;
-
-    @Mock
-    private Future<SendResult> future;
-
-    private ClusterManagementServiceImpl clusterManagementService;
-
-    @Before
-    public void init() throws RemotingException, MQClientException, InterruptedException, NoSuchFieldException, IllegalAccessException {
-        String consumerGroup = UUID.randomUUID().toString();
-        String producerGroup = UUID.randomUUID().toString();
-
-        connectConfig = new ConnectConfig();
-        connectConfig.setHttpPort(8081);
-        connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
-        connectConfig.setWorkerId("testWorkerId");
-        connectConfig.setRmqConsumerGroup(consumerGroup);
-        connectConfig.setRmqProducerGroup(producerGroup);
-        connectConfig.setNamesrvAddr("127.0.0.1:9876");
-        connectConfig.setRmqMinConsumeThreadNums(1);
-        connectConfig.setRmqMaxConsumeThreadNums(32);
-        connectConfig.setRmqMessageConsumeTimeout(3 * 1000);
-        doAnswer(new Answer() {
-            @Override
-            public Void answer(InvocationOnMock invocation) throws Exception {
-                final Message message = invocation.getArgument(0);
-                byte[] bytes = message.getBody();
-
-                final Field dataSynchronizerField = ClusterManagementServiceImpl.class.getDeclaredField("dataSynchronizer");
-                dataSynchronizerField.setAccessible(true);
-                BrokerBasedLog<String, Map> dataSynchronizer = (BrokerBasedLog<String, Map>) dataSynchronizerField.get(clusterManagementService);
-
-                final Method decodeKeyValueMethod = BrokerBasedLog.class.getDeclaredMethod("decodeKeyValue", byte[].class);
-                decodeKeyValueMethod.setAccessible(true);
-                Map<String, Map> map = (Map<String, Map>) decodeKeyValueMethod.invoke(dataSynchronizer, bytes);
-
-                final Field dataSynchronizerCallbackField = BrokerBasedLog.class.getDeclaredField("dataSynchronizerCallback");
-                dataSynchronizerCallbackField.setAccessible(true);
-                final DataSynchronizerCallback<String, Map> dataSynchronizerCallback = (DataSynchronizerCallback<String, Map>) dataSynchronizerCallbackField.get(dataSynchronizer);
-                for (String key : map.keySet()) {
-                    dataSynchronizerCallback.onCompletion(null, key, map.get(key));
-                }
-                return null;
-            }
-        }).when(producer).send(any(Message.class), any(SendCallback.class));
-
-        clusterManagementService = new ClusterManagementServiceImpl(connectConfig);
-        final Field dataSynchronizerField = ClusterManagementServiceImpl.class.getDeclaredField("dataSynchronizer");
-        dataSynchronizerField.setAccessible(true);
-
-        final Field producerField = BrokerBasedLog.class.getDeclaredField("producer");
-        producerField.setAccessible(true);
-        producerField.set((BrokerBasedLog<String, Map>) dataSynchronizerField.get(clusterManagementService), producer);
-
-        final Field consumerField = BrokerBasedLog.class.getDeclaredField("consumer");
-        consumerField.setAccessible(true);
-        consumerField.set((BrokerBasedLog<String, Map>) dataSynchronizerField.get(clusterManagementService), consumer);
-    }
-
-    @Test
-    public void testSendHeartBeat() throws Exception {
-        Map<String, Long> aliveWorker = new HashMap<String, Long>() {
-            {
-                put("testWorkerId2", System.currentTimeMillis());
-            }
-        };
-        Field aliveWorkerField = ClusterManagementServiceImpl.class.getDeclaredField("aliveWorker");
-        aliveWorkerField.setAccessible(true);
-        aliveWorkerField.set(clusterManagementService, aliveWorker);
-
-        clusterManagementService.sendAliveHeartBeat();
-
-        verify(producer, times(1)).send(any(Message.class), any(SendCallback.class));
-
-        Map<String, Long> allAliveWorkers = clusterManagementService.getAllAliveWorkers();
-        Set<String> keys = allAliveWorkers.keySet();
-        assertEquals(2, keys.size());
-        aliveWorker.remove("testWorkerId2");
-        for (String s : keys) {
-            Assert.assertTrue(s.equals(connectConfig.getWorkerId()));
-        }
-
-        aliveWorker.clear();
-        clusterManagementService.sendOnlineFinishHeartBeat();
-
-        verify(producer, times(2)).send(any(Message.class), any(SendCallback.class));
-
-        allAliveWorkers = clusterManagementService.getAllAliveWorkers();
-        keys = allAliveWorkers.keySet();
-        assertEquals(1, keys.size());
-        for (String s : keys) {
-            Assert.assertTrue(s.equals(connectConfig.getWorkerId()));
-        }
-
-        aliveWorker.clear();
-        clusterManagementService.sendOnlineHeartBeat();
-
-        verify(producer, times(4)).send(any(Message.class), any(SendCallback.class));
-
-        allAliveWorkers = clusterManagementService.getAllAliveWorkers();
-        keys = allAliveWorkers.keySet();
-        assertEquals(1, keys.size());
-        for (String s : keys) {
-            Assert.assertTrue(s.equals(connectConfig.getWorkerId()));
-        }
-
-        aliveWorker.clear();
-        aliveWorker.put("testWorkerId", System.currentTimeMillis());
-        assertEquals(1, aliveWorker.size());
-        clusterManagementService.sendOffLineHeartBeat();
-
-        verify(producer, times(5)).send(any(Message.class), any(SendCallback.class));
-
-        allAliveWorkers = clusterManagementService.getAllAliveWorkers();
-        keys = allAliveWorkers.keySet();
-        assertEquals(0, keys.size());
-    }
-
-}
\ No newline at end of file
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
index 5f248fe..4af8f61 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
@@ -84,7 +84,6 @@
         connectConfig = new ConnectConfig();
         connectConfig.setHttpPort(8081);
         connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
-        connectConfig.setWorkerId("testWorkerId");
         connectConfig.setRmqConsumerGroup("testConsumerGroup");
         connectorName = "testConnectorName";
 
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
index 16d05eb..31c70f6 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
@@ -80,8 +80,8 @@
     public void init() throws Exception {
         connectConfig = new ConnectConfig();
         connectConfig.setHttpPort(8081);
+        connectConfig.setNamesrvAddr("localhost:9876");
         connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
-        connectConfig.setWorkerId("testWorkerId");
         connectConfig.setRmqConsumerGroup("testConsumerGroup");
         doAnswer(new Answer() {
             @Override
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategyTest.java b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategyTest.java
index 95caac7..21610f7 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategyTest.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategyTest.java
@@ -36,7 +36,7 @@
     @Test
     public void testAllocate() {
         DefaultAllocateConnAndTaskStrategy defaultAllocateConnAndTaskStrategy = new DefaultAllocateConnAndTaskStrategy();
-        Set<String> allWorker = new HashSet<String>() {
+        List<String> allWorker = new ArrayList<String>() {
             {
                 add("workId1");
                 add("workId2");