merge from replaceDB
diff --git a/pom.xml b/pom.xml
index a865740..0423a7c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,7 +99,6 @@
         <module>rocketmq-streams-channel-http</module>
         <module>rocketmq-streams-state</module>
         <module>rocketmq-streams-examples</module>
-        <module>rocketmq-streams-checkpoint</module>
         <module>rocketmq-streams-connectors</module>
         <module>rocketmq-streams-channel-syslog</module>
         <module>rocketmq-streams-channel-es</module>
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java
deleted file mode 100644
index 6650744..0000000
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.streams;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.client.consumer.store.OffsetStore;
-import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.streams.common.channel.source.AbstractSupportShuffleSource;
-import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.debug.DebugWriter;
-import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
-
-public class RocketMQOffset implements OffsetStore {
-    protected OffsetStore offsetStore;
-    protected AbstractSupportShuffleSource source;
-    private AtomicBoolean starting;
-
-    public RocketMQOffset(OffsetStore offsetStore, AbstractSupportShuffleSource source) {
-        this.offsetStore = offsetStore;
-        this.source = source;
-        this.starting = new AtomicBoolean(true);
-    }
-
-    @Override
-    public void load() throws MQClientException {
-        offsetStore.load();
-    }
-
-    @Override
-    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
-        offsetStore.updateOffset(mq, offset, increaseOnly);
-    }
-
-    @Override
-    public long readOffset(MessageQueue mq, ReadOffsetType type) {
-        return offsetStore.readOffset(mq, type);
-    }
-
-    @Override
-    public void persistAll(Set<MessageQueue> mqs) {
-        offsetStore.persistAll(mqs);
-    }
-
-    @Override
-    public void persist(MessageQueue mq) {
-        offsetStore.persist(mq);
-    }
-
-    @Override
-    public void removeOffset(MessageQueue mq) {
-        //todo 启动时第一次做rebalance时source中也没有原有消费mq,不做移除,做了会有副作用
-        //后续整个checkpoint机制都会调整成异步,整块代码都不会保留,目前为了整体跑通,不做修改。
-        if (starting.get()) {
-            starting.set(false);
-        } else {
-            Set<String> splitIds = new HashSet<>();
-            splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
-            source.removeSplit(splitIds);
-        }
-
-        offsetStore.removeOffset(mq);
-    }
-
-    @Override
-    public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
-        return offsetStore.cloneOffsetTable(topic);
-    }
-
-    @Override
-    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        source.sendCheckpoint(new RocketMQMessageQueue(mq).getQueueId());
-        if (DebugWriter.isOpenDebug()) {
-            ConcurrentMap<MessageQueue, AtomicLong> offsetTable = ReflectUtil.getDeclaredField(this.offsetStore, "offsetTable");
-            DebugWriter.getInstance(source.getTopic()).writeSaveOffset(mq, offsetTable.get(mq));
-        }
-        offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
-    }
-}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/MessageListenerDelegator.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/MessageListenerDelegator.java
new file mode 100644
index 0000000..08c047c
--- /dev/null
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/MessageListenerDelegator.java
@@ -0,0 +1,62 @@
+package org.apache.rocketmq.streams.source;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Wraps a {@link MessageQueueListener} and tracks which queues this instance lost during rebalancing. */
+public class MessageListenerDelegator implements MessageQueueListener {
+    private final MessageQueueListener delegator;
+    /** Copy of the queues passed to the latest callback; null until the first callback. Volatile: read via the getter. */
+    private volatile Set<MessageQueue> lastDivided = null;
+    /** Lost queues not yet handled by the consumer; a concurrent set, so readers may iterate and clear it. */
+    private final Set<MessageQueue> removingQueue = java.util.concurrent.ConcurrentHashMap.newKeySet();
+
+    public MessageListenerDelegator(MessageQueueListener delegator) {
+        this.delegator = delegator;
+    }
+
+    /** Queues assigned last time but missing from {@code mqDivided} are queued for state removal, then the call is forwarded. */
+    @Override
+    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+        Set<MessageQueue> previous = this.lastDivided;
+        Set<MessageQueue> lost = new HashSet<>(previous == null ? Collections.<MessageQueue>emptySet() : previous);
+        lost.removeAll(mqDivided);
+        removingQueue.removeAll(mqDivided); // a queue that was assigned again must keep its state
+        removingQueue.addAll(lost);
+
+        delegator.messageQueueChanged(topic, mqAll, mqDivided);
+
+        lastDivided = new HashSet<>(mqDivided);
+    }
+
+    /** Returns a read-only view of the current assignment; empty, never null, before the first rebalance. */
+    public Set<MessageQueue> getLastDivided() {
+        Set<MessageQueue> current = this.lastDivided;
+        return current == null ? Collections.<MessageQueue>emptySet() : Collections.unmodifiableSet(current);
+    }
+
+    /** Returns the live, never-null set of lost queues; the caller clears it once their state is released. */
+    public Set<MessageQueue> getRemovingQueue() {
+        return this.removingQueue;
+    }
+}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 5d60532..b9f89e1 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -18,7 +18,9 @@
 package org.apache.rocketmq.streams.source;
 
 import com.alibaba.fastjson.JSONObject;
+
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -26,36 +28,27 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueByConfig;
-import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
-import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.body.Connection;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSupportShuffleSource;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.debug.DebugWriter;
 import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 
@@ -64,24 +57,19 @@
     protected static final Log LOG = LogFactory.getLog(RocketMQSource.class);
 
     private static final String STRATEGY_AVERAGE = "average";
-    private static final String STRATEGY_MACHINE = "machine";
-    private static final String STRATEGY_CONFIG = "config";
 
     @ENVDependence
-    protected String tags = "*";
+    private String tags = SubscriptionData.SUB_ALL;
 
-    /**
-     * 消息队列命名空间接入点
-     */
-
-
-    protected Long pullIntervalMs;
-
-    protected String strategyName;
-
-    protected transient DefaultMQPushConsumer consumer;
-    protected transient ConsumeFromWhere consumeFromWhere;//默认从哪里消费,不会被持久化。不设置默认从尾部消费
-    protected transient String consumerOffset;//从哪里开始消费
+    private int userPullThreadNum = 1;
+    private long pullTimeout;
+    private long commitInternalMs = 1000;
+    private String strategyName;
+    private transient ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;// Where consumption starts by default; will not be persisted. NOTE(review): the original note said an unset value means consuming from the tail, but the initializer here is CONSUME_FROM_FIRST_OFFSET - confirm which is intended.
+    private RPCHook rpcHook;
+    private transient DefaultLitePullConsumer pullConsumer;
+    private transient ExecutorService executorService;
+    private transient PullTask[] pullTasks;
 
     public RocketMQSource() {
     }
@@ -107,109 +95,55 @@
     protected boolean startSource() {
         try {
             destroyConsumer();
-            consumer = startConsumer();
+
+            this.pullConsumer = buildPullConsumer(topic, groupName, namesrvAddr, tags, rpcHook, consumeFromWhere);
+            this.pullConsumer.start();
+
+            if (this.executorService == null) {
+                this.executorService = new ThreadPoolExecutor(userPullThreadNum, userPullThreadNum, 0, TimeUnit.MILLISECONDS,
+                        new ArrayBlockingQueue<>(1000), r -> new Thread(r, "RStream-poll-thread"));
+            }
+
+            pullTasks = new PullTask[userPullThreadNum];
+            for (int i = 0; i < userPullThreadNum; i++) {
+                pullTasks[i] = new PullTask(this.pullConsumer, pullTimeout, commitInternalMs);
+                this.executorService.execute(pullTasks[i]);
+            }
+
             return true;
-        } catch (Exception e) {
+        } catch (MQClientException e) {
             setInitSuccess(false);
-            e.printStackTrace();
             throw new RuntimeException("start rocketmq channel error " + topic, e);
         }
     }
 
-    protected DefaultMQPushConsumer startConsumer() {
-        try {
-            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
-            if (pullIntervalMs != null) {
-                consumer.setPullInterval(pullIntervalMs);
-            }
-            AllocateMessageQueueStrategy defaultStrategy = new AllocateMessageQueueAveragely();
-            if (STRATEGY_AVERAGE.equalsIgnoreCase(this.strategyName)) {
-                consumer.setAllocateMessageQueueStrategy(defaultStrategy);
-            } else if (STRATEGY_MACHINE.equalsIgnoreCase(this.strategyName)) {
-                //consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueByMachine(defaultStrategy));
-            } else if (STRATEGY_CONFIG.equalsIgnoreCase(this.strategyName)) {
-                consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueByConfig());
-            }
+    public DefaultLitePullConsumer buildPullConsumer(String topic, String groupName, String namesrv, String tags,
+                                                     RPCHook rpcHook, ConsumeFromWhere consumeFromWhere) throws MQClientException {
+        DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(groupName, rpcHook);
+        pullConsumer.setNamesrvAddr(namesrv);
+        pullConsumer.setConsumeFromWhere(consumeFromWhere);
+        pullConsumer.subscribe(topic, tags);
+        pullConsumer.setAutoCommit(false);
+        pullConsumer.setPullBatchSize(1000);
 
-            consumer.setPersistConsumerOffsetInterval((int) this.checkpointTime);
-            consumer.setConsumeMessageBatchMaxSize(maxFetchLogGroupSize);
-            consumer.setNamesrvAddr(this.namesrvAddr);
-            if (consumeFromWhere != null) {
-                consumer.setConsumeFromWhere(consumeFromWhere);
-                if (consumerOffset != null) {
-                    consumer.setConsumeTimestamp(consumerOffset);
-                }
-            }
-            Map<String, Boolean> isFirstDataForQueue = new HashMap<>();
-            //consumer.setCommitOffsetWithPullRequestEnable(false);
-            consumer.subscribe(topic, tags);
-            consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
-                try {
-                    int i = 0;
-                    for (MessageExt msg : msgs) {
+        MessageQueueListener origin = pullConsumer.getMessageQueueListener();
 
-                        JSONObject jsonObject = create(msg.getBody(), msg.getProperties());
+        MessageListenerDelegator delegator = new MessageListenerDelegator(origin);
 
-                        String queueId = RocketMQMessageQueue.getQueueId(context.getMessageQueue());
-                        String offset = msg.getQueueOffset() + "";
-                        org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, queueId, offset, false);
-                        message.getHeader().setOffsetIsLong(true);
+        pullConsumer.setMessageQueueListener(delegator);
 
-                        if (DebugWriter.isOpenDebug()) {
-                            Boolean isFirstData = isFirstDataForQueue.get(queueId);
-                            if (isFirstData == null) {
-                                synchronized (this) {
-                                    isFirstData = isFirstDataForQueue.get(queueId);
-                                    if (isFirstData == null) {
-                                        isFirstDataForQueue.put(queueId, true);
-                                    }
-                                    DebugWriter.getInstance(getTopic()).receiveFirstData(queueId, msg.getQueueOffset());
-                                }
-                            }
-                        }
-
-                        if (i == msgs.size() - 1) {
-                            message.getHeader().setNeedFlush(true);
-                        }
-                        executeMessage(message);
-                        i++;
-                    }
-                } catch (Exception e) {
-
-                    LOG.error("consume message from rocketmq error " + e, e);
-                    e.printStackTrace();
-
-                }
-
-                return ConsumeOrderlyStatus.SUCCESS;// 返回消费成功
-            });
-
-            setOffsetStore(consumer);
-            consumer.start();
-
-            return consumer;
-        } catch (Exception e) {
-            setInitSuccess(false);
-            e.printStackTrace();
-            throw new RuntimeException("start metaq channel error " + topic, e);
-        }
+        return pullConsumer;
     }
 
+
     @Override
     public List<ISplit> getAllSplits() {
         try {
             List<ISplit> messageQueues = new ArrayList<>();
-            if (messageQueues == null || messageQueues.size() == 0) {
-                Set<MessageQueue> metaqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic);
-                for (MessageQueue queue : metaqQueueSet) {
-                    RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(queue);
-                    if (isNotDataSplit(metaqMessageQueue.getQueueId())) {
-                        continue;
-                    }
-
-                    messageQueues.add(metaqMessageQueue);
-
-                }
+            Collection<MessageQueue> metaqQueueSet = this.pullConsumer.fetchMessageQueues(this.topic);
+            for (MessageQueue queue : metaqQueueSet) {
+                RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(queue);
+                messageQueues.add(metaqMessageQueue);
             }
             return messageQueues;
         } catch (MQClientException e) {
@@ -218,12 +152,13 @@
         }
     }
 
+    //todo compute the splits that are currently being worked on?
     @Override
     public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
         defaultMQAdminExt.setVipChannelEnabled(false);
         defaultMQAdminExt.setAdminExtGroup(UUID.randomUUID().toString());
-        defaultMQAdminExt.setInstanceName(this.consumer.getInstanceName());
+        defaultMQAdminExt.setInstanceName(this.pullConsumer.getInstanceName());
         try {
             defaultMQAdminExt.start();
             Map<MessageQueue, String> queue2Instances = getMessageQueueAllocationResult(defaultMQAdminExt, this.groupName);
@@ -234,11 +169,7 @@
                     continue;
                 }
                 String instanceName = queue2Instances.get(messageQueue);
-                List<ISplit> splits = instanceOwnerQueues.get(instanceName);
-                if (splits == null) {
-                    splits = new ArrayList<>();
-                    instanceOwnerQueues.put(instanceName, splits);
-                }
+                List<ISplit> splits = instanceOwnerQueues.computeIfAbsent(instanceName, k -> new ArrayList<>());
                 splits.add(metaqMessageQueue);
             }
             return instanceOwnerQueues;
@@ -250,9 +181,9 @@
         }
     }
 
-    protected Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
-        String groupName) {
-        HashMap results = new HashMap();
+    private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
+                                                                      String groupName) {
+        HashMap<MessageQueue, String> results = new HashMap<>();
 
         try {
             ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
@@ -276,49 +207,9 @@
         return results;
     }
 
-    /**
-     * 设置offset存储,包装原有的RemoteBrokerOffsetStore,在保存offset前发送系统消息 this method suggest to be removed, use check barrier to achieve checkpoint asynchronous.
-     *
-     * @param consumer
-     */
-    protected void setOffsetStore(DefaultMQPushConsumer consumer) {
-        DefaultMQPushConsumerImpl defaultMQPushConsumer = consumer.getDefaultMQPushConsumerImpl();
-        if (consumer.getMessageModel() == MessageModel.CLUSTERING) {
-            consumer.changeInstanceNameToPID();
-        }
-        MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(defaultMQPushConsumer.getDefaultMQPushConsumer());
-        RemoteBrokerOffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, NamespaceUtil.wrapNamespace(consumer.getNamespace(), consumer.getConsumerGroup())) {
-            Set<MessageQueue> firstComing = new HashSet<>();
-            @Override
-            public void removeOffset(MessageQueue mq) {
-                if (!firstComing.contains(mq)){
-                    firstComing.add(mq);
-                } else {
-                    Set<String> splitIds = new HashSet<>();
-                    splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
-                    removeSplit(splitIds);
-                }
-                super.removeOffset(mq);
-            }
-
-            @Override
-            public void updateConsumeOffsetToBroker(MessageQueue mq, long offset,
-                boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-                sendCheckpoint(new RocketMQMessageQueue(mq).getQueueId());
-                if (DebugWriter.isOpenDebug()) {
-                    ConcurrentMap<MessageQueue, AtomicLong> offsetTable = ReflectUtil.getDeclaredField(this, "offsetTable");
-                    DebugWriter.getInstance(getTopic()).writeSaveOffset(mq, offsetTable.get(mq));
-                }
-                //                LOG.info("the queue Id is " + new RocketMQMessageQueue(mq).getQueueId() + ",rocketmq start save offset,the save time is " + DateUtil.getCurrentTimeString());
-                super.updateConsumeOffsetToBroker(mq, offset, isOneway);
-            }
-        };
-        consumer.setOffsetStore(offsetStore);//每个一分钟运行一次
-    }
-
     @Override
     protected boolean isNotDataSplit(String queueId) {
-        return queueId.toUpperCase().startsWith("RETRY") || queueId.toUpperCase().startsWith("%RETRY%");
+        return false;
     }
 
     @Override
@@ -337,21 +228,24 @@
     }
 
     public void destroyConsumer() {
-        List<DefaultMQPushConsumer> oldConsumers = new ArrayList<>();
-        if (consumer != null) {
-            oldConsumers.add(consumer);
-        }
-        try {
-            for (DefaultMQPushConsumer consumer : oldConsumers) {
-                consumer.shutdown();
-            }
-
-        } catch (Throwable t) {
-            if (LOG.isWarnEnabled()) {
-                LOG.warn(t.getMessage(), t);
-            }
+        if (this.pullConsumer == null || this.pullTasks == null || this.pullTasks.length == 0) {
+            return;
         }
 
+        // Signal every pull task to stop pulling new data.
+        for (PullTask pullTask : pullTasks) {
+            pullTask.shutdown();
+        }
+        this.pullTasks = null; // a repeated call now returns at the guard above instead of touching the cleared pool
+        // startSource() only builds a pool when the field is null, so drop the terminated one to allow a restart.
+        this.executorService.shutdown();
+        this.executorService = null;
+        // Shut down the consumer instance.
+        this.pullConsumer.shutdown();
+    }
+
+    public void commit(Set<MessageQueue> messageQueues) {
+        this.pullConsumer.commit(messageQueues, true);
     }
 
     @Override
@@ -360,12 +254,83 @@
         destroyConsumer();
     }
 
-    public String getStrategyName() {
-        return strategyName;
-    }
+    public class PullTask implements Runnable {
+        private final long pullTimeout;
+        private final long commitInternalMs;
+        private volatile long lastCommit = 0L;
 
-    public void setStrategyName(String strategyName) {
-        this.strategyName = strategyName;
+        private final DefaultLitePullConsumer pullConsumer;
+        private final MessageListenerDelegator delegator;
+
+        private volatile boolean isStopped = false;
+
+        public PullTask(DefaultLitePullConsumer pullConsumer, long pullTimeout, long commitInternalMs) {
+            this.pullConsumer = pullConsumer;
+            this.delegator = (MessageListenerDelegator) pullConsumer.getMessageQueueListener();
+            this.pullTimeout = pullTimeout == 0 ? pullConsumer.getPollTimeoutMillis() : pullTimeout;
+            this.commitInternalMs = commitInternalMs;
+        }
+
+        /** Pull loop: releases lost queues, polls a batch, dispatches it, and commits offsets at the commit interval. */
+        @Override
+        public void run() {
+            while (!this.isStopped) {
+                // Release the state of queues reported as removed by the listener before pulling again.
+                synchronized (this.pullConsumer) {
+                    Set<MessageQueue> removingQueue = this.delegator.getRemovingQueue();
+                    if (removingQueue != null && removingQueue.size() != 0) {
+                        try {
+                            Set<String> splitIds = new HashSet<>();
+                            for (MessageQueue mq : removingQueue) {
+                                splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
+                            }
+                            RocketMQSource.this.removeSplit(splitIds);
+                        } finally {
+                            removingQueue.clear();
+                        }
+                    }
+                }
+
+                List<MessageExt> msgs = pullConsumer.poll(pullTimeout);
+
+                try {
+                    int i = 0;
+                    for (MessageExt msg : msgs) {
+                        JSONObject jsonObject = create(msg.getBody(), msg.getProperties());
+                        String topic = msg.getTopic();
+                        int queueId = msg.getQueueId();
+                        String brokerName = msg.getBrokerName();
+                        MessageQueue queue = new MessageQueue(topic, brokerName, queueId);
+                        String unionQueueId = RocketMQMessageQueue.getQueueId(queue);
+                        String offset = msg.getQueueOffset() + "";
+                        org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, unionQueueId, offset, false);
+                        message.getHeader().setOffsetIsLong(true);
+                        // Mark the last message of the batch as needing a flush.
+                        if (i == msgs.size() - 1) {
+                            message.getHeader().setNeedFlush(true);
+                        }
+                        executeMessage(message);
+                        i++;
+                    }
+                } catch (Exception e) {
+                    // An exception escaping run() would end this pull task for good; log it and keep pulling.
+                    LOG.error("consume message from rocketmq error " + e, e);
+                }
+
+                // Once the pulled batch has been processed, decide whether to commit offsets.
+                synchronized (this.pullConsumer) {
+                    if (System.currentTimeMillis() - lastCommit >= commitInternalMs || isStopped) {
+                        lastCommit = System.currentTimeMillis();
+                        // Commit offsets to the broker. TODO: the consumer does not expose which queues are being consumed.
+                        commit(this.delegator.getLastDivided());
+                    }
+                }
+            }
+        }
+
+        public void shutdown() {
+            this.isStopped = true;
+        }
     }
 
     public String getTags() {
@@ -376,31 +341,45 @@
         this.tags = tags;
     }
 
-    public DefaultMQPushConsumer getConsumer() {
-        return consumer;
+    public Long getPullTimeout() {
+        return pullTimeout;
     }
 
-    public Long getPullIntervalMs() {
-        return pullIntervalMs;
+    public void setPullTimeout(Long pullTimeout) {
+        this.pullTimeout = pullTimeout;
     }
 
-    public void setPullIntervalMs(Long pullIntervalMs) {
-        this.pullIntervalMs = pullIntervalMs;
+    public String getStrategyName() {
+        return strategyName;
     }
 
-    public ConsumeFromWhere getConsumeFromWhere() {
-        return consumeFromWhere;
+    public void setStrategyName(String strategyName) {
+        this.strategyName = strategyName;
     }
 
-    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
-        this.consumeFromWhere = consumeFromWhere;
+    public RPCHook getRpcHook() {
+        return rpcHook;
     }
 
-    public String getConsumerOffset() {
-        return consumerOffset;
+    public void setRpcHook(RPCHook rpcHook) {
+        this.rpcHook = rpcHook;
     }
 
-    public void setConsumerOffset(String consumerOffset) {
-        this.consumerOffset = consumerOffset;
+    public int getUserPullThreadNum() {
+        return userPullThreadNum;
     }
+
+    public void setUserPullThreadNum(int userPullThreadNum) {
+        this.userPullThreadNum = userPullThreadNum;
+    }
+
+    public long getCommitInternalMs() {
+        return commitInternalMs;
+    }
+
+    public void setCommitInternalMs(long commitInternalMs) {
+        this.commitInternalMs = commitInternalMs;
+    }
+
+
 }
\ No newline at end of file
diff --git a/rocketmq-streams-checkpoint/pom.xml b/rocketmq-streams-checkpoint/pom.xml
deleted file mode 100644
index 6da59e3..0000000
--- a/rocketmq-streams-checkpoint/pom.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>rocketmq-streams</artifactId>
-        <groupId>org.apache.rocketmq</groupId>
-        <version>1.0.2-preview-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>rocketmq-streams-checkpoint</artifactId>
-    <name>ROCKETMQ STREAMS :: checkpoint</name>
-    <packaging>jar</packaging>
-
-    <properties>
-        <maven.compiler.source>8</maven.compiler.source>
-        <maven.compiler.target>8</maven.compiler.target>
-    </properties>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-streams-commons</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.auto.service</groupId>
-                    <artifactId>auto-service</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-streams-db-operator</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.auto.service</groupId>
-                    <artifactId>auto-service</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.auto.service</groupId>
-            <artifactId>auto-service</artifactId>
-            <optional>true</optional>
-        </dependency>
-    </dependencies>
-
-</project>
\ No newline at end of file
diff --git a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java b/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
deleted file mode 100644
index 69150ac..0000000
--- a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.checkpoint.db;
-
-import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.checkpoint.AbstractCheckPointStorage;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
-import org.apache.rocketmq.streams.common.checkpoint.SourceSnapShot;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-
-/**
- * @description
- */
-public class DBCheckPointStorage extends AbstractCheckPointStorage {
-
-    static final Log logger = LogFactory.getLog(DBCheckPointStorage.class);
-    static final String STORAGE_NAME = "DB";
-
-    public DBCheckPointStorage() {
-
-    }
-
-    @Override
-    public String getStorageName() {
-        return STORAGE_NAME;
-    }
-
-    @Override
-    public <T> void save(List<T> checkPointState) {
-        logger.info(String.format("save checkpoint size %d", checkPointState.size()));
-        ORMUtil.batchReplaceInto(checkPointState);
-    }
-
-    @Override
-    public void finish() {
-
-    }
-
-    @Override
-    //todo
-    public CheckPoint recover(ISource iSource, String queueId) {
-        String sourceName = CheckPointManager.createSourceName(iSource, null);
-        String key = CheckPointManager.createCheckPointKey(sourceName, queueId);
-        String sql = "select * from source_snap_shot where `key` = " + "'" + key + "';";
-        SourceSnapShot snapShot = ORMUtil.queryForObject(sql, null, SourceSnapShot.class);
-
-        logger.info(String.format("checkpoint recover key is %s, sql is %s, recover sourceSnapShot : %s", key, sql, snapShot == null ? "null snapShot" : snapShot.toString()));
-        return new CheckPoint().fromSnapShot(snapShot);
-    }
-}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
index 3b85e19..65ac261 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
@@ -141,7 +141,7 @@
                 value2StoreMap.put(groupValue, storeKey);
             }
 
-            List<WindowBaseValue> windowBaseValue =new ArrayList<>();
+            List<WindowBaseValue> windowBaseValue = new ArrayList<>();
 
             RocksdbIterator<WindowBaseValue> rocksdbIterator = storage.getWindowBaseValue(instance.getSplitId(),
                     instance.getWindowInstanceId(), WindowType.SESSION_WINDOW, null);
@@ -423,10 +423,7 @@
                 }
 
             }
-
-
             doFire(queueId, windowInstance, toFireValueList, currentFireTime, nextFireTime);
-
             return toFireValueList.size();
         }
 
@@ -445,8 +442,10 @@
         return false;
     }
 
+
     private void doFire(String queueId, WindowInstance instance, List<WindowValue> valueList, Long currentFireTime,
                         Long nextFireTime) {
+
         if (CollectionUtil.isEmpty(valueList)) {
             return;
         }