Support query message by page (#688)

diff --git a/rocketmq-console/pom.xml b/rocketmq-console/pom.xml
index d454166..5cad525 100644
--- a/rocketmq-console/pom.xml
+++ b/rocketmq-console/pom.xml
@@ -58,7 +58,7 @@
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
 
-        <guava.version>16.0.1</guava.version>
+        <guava.version>29.0-jre</guava.version>
         <commons-digester.version>2.1</commons-digester.version>
         <commons-lang.version>2.6</commons-lang.version>
         <commons-io.version>2.4</commons-io.version>
@@ -83,6 +83,11 @@
             <version>${spring.boot.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.springframework.data</groupId>
+            <artifactId>spring-data-commons</artifactId>
+            <version>${spring.boot.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
             <version>${spring.boot.version}</version>
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/MessageController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/MessageController.java
index dd3cdb8..fa0fe00 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/MessageController.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/MessageController.java
@@ -16,24 +16,28 @@
  */
 package org.apache.rocketmq.console.controller;
 
+import com.google.common.collect.Maps;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.apache.rocketmq.console.model.MessagePage;
 import org.apache.rocketmq.console.model.MessageView;
+import org.apache.rocketmq.console.model.request.MessageQuery;
 import org.apache.rocketmq.console.service.MessageService;
 import org.apache.rocketmq.console.util.JsonUtil;
-import com.google.common.collect.Maps;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 
 import javax.annotation.Resource;
 import java.util.List;
 import java.util.Map;
-import org.springframework.web.bind.annotation.ResponseBody;
 
 @Controller
 @RequestMapping("/message")
@@ -52,6 +56,12 @@
         return messageViewMap;
     }
 
+    @PostMapping("/queryMessagePageByTopic.query")
+    @ResponseBody
+    public MessagePage queryMessagePageByTopic(@RequestBody MessageQuery query) {
+        return messageService.queryMessageByPage(query);
+    }
+
     @RequestMapping(value = "/queryMessageByTopicAndKey.query", method = RequestMethod.GET)
     @ResponseBody
     public Object queryMessageByTopicAndKey(@RequestParam String topic, @RequestParam String key) {
@@ -61,15 +71,15 @@
     @RequestMapping(value = "/queryMessageByTopic.query", method = RequestMethod.GET)
     @ResponseBody
     public Object queryMessageByTopic(@RequestParam String topic, @RequestParam long begin,
-        @RequestParam long end) {
+                                      @RequestParam long end) {
         return messageService.queryMessageByTopic(topic, begin, end);
     }
 
     @RequestMapping(value = "/consumeMessageDirectly.do", method = RequestMethod.POST)
     @ResponseBody
     public Object consumeMessageDirectly(@RequestParam String topic, @RequestParam String consumerGroup,
-        @RequestParam String msgId,
-        @RequestParam(required = false) String clientId) {
+                                         @RequestParam String msgId,
+                                         @RequestParam(required = false) String clientId) {
         logger.info("msgId={} consumerGroup={} clientId={}", msgId, consumerGroup, clientId);
         ConsumeMessageDirectlyResult consumeMessageDirectlyResult = messageService.consumeMessageDirectly(topic, msgId, consumerGroup, clientId);
         logger.info("consumeMessageDirectlyResult={}", JsonUtil.obj2String(consumeMessageDirectlyResult));
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePage.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePage.java
new file mode 100644
index 0000000..2a803d8
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+
+import org.springframework.data.domain.Page;
+
+public class MessagePage {
+    private Page<MessageView> page;
+    private String taskId;
+
+    public MessagePage(Page<MessageView> page, String taskId) {
+        this.page = page;
+        this.taskId = taskId;
+    }
+
+    public Page<MessageView> getPage() {
+        return page;
+    }
+
+    public void setPage(Page<MessageView> page) {
+        this.page = page;
+    }
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
+    @Override
+    public String toString() {
+        return "MessagePage{" +
+                "page=" + page +
+                ", taskId='" + taskId + '\'' +
+                '}';
+    }
+}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePageTask.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePageTask.java
new file mode 100644
index 0000000..31cc18a
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePageTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.springframework.data.domain.Page;
+
+import java.util.List;
+
+public class MessagePageTask {
+    private Page<MessageView> page;
+    private List<QueueOffsetInfo> queueOffsetInfos;
+
+    public MessagePageTask(Page<MessageView> page, List<QueueOffsetInfo> queueOffsetInfos) {
+        this.page = page;
+        this.queueOffsetInfos = queueOffsetInfos;
+    }
+
+    public Page<MessageView> getPage() {
+        return page;
+    }
+
+    public void setPage(Page<MessageView> page) {
+        this.page = page;
+    }
+
+    public List<QueueOffsetInfo> getQueueOffsetInfos() {
+        return queueOffsetInfos;
+    }
+
+    public void setQueueOffsetInfos(List<QueueOffsetInfo> queueOffsetInfos) {
+        this.queueOffsetInfos = queueOffsetInfos;
+    }
+
+    @Override
+    public String toString() {
+        return "MessagePageTask{" +
+                "page=" + page +
+                ", queueOffsetInfos=" + queueOffsetInfos +
+                '}';
+    }
+}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageQueryByPage.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageQueryByPage.java
new file mode 100644
index 0000000..b3370c4
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageQueryByPage.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+
+import org.springframework.data.domain.PageRequest;
+
+public class MessageQueryByPage {
+    public static final int DEFAULT_PAGE = 0;
+
+    public static final int MIN_PAGE_SIZE = 10;
+
+    public static final int MAX_PAGE_SIZE = 100;
+
+    /**
+     * current page num
+     */
+    private int pageNum;
+
+    private int pageSize;
+
+    private String topic;
+    private long begin;
+    private long end;
+
+    public MessageQueryByPage(int pageNum, int pageSize, String topic, long begin, long end) {
+        this.pageNum = pageNum;
+        this.pageSize = pageSize;
+        this.topic = topic;
+        this.begin = begin;
+        this.end = end;
+    }
+
+    public void setPageNum(int pageNum) {
+        this.pageNum = pageNum;
+    }
+
+    public void setPageSize(int pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public long getBegin() {
+        return begin;
+    }
+
+    public void setBegin(long begin) {
+        this.begin = begin;
+    }
+
+    public long getEnd() {
+        return end;
+    }
+
+    public void setEnd(long end) {
+        this.end = end;
+    }
+
+    public int getPageNum() {
+        return pageNum <= 0 ? DEFAULT_PAGE : pageNum - 1;
+    }
+
+    public int getPageSize() {
+        if (pageSize <= 1) {
+            return MIN_PAGE_SIZE;
+        } else if (pageSize > MAX_PAGE_SIZE) {
+            return MAX_PAGE_SIZE;
+        }
+        return this.pageSize;
+    }
+
+    public PageRequest page() {
+        return PageRequest.of(this.getPageNum(), this.getPageSize());
+    }
+
+    @Override
+    public String toString() {
+        return "MessageQueryByPage{" +
+                "pageNum=" + pageNum +
+                ", pageSize=" + pageSize +
+                ", topic='" + topic + '\'' +
+                ", begin=" + begin +
+                ", end=" + end +
+                '}';
+    }
+}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueOffsetInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueOffsetInfo.java
new file mode 100644
index 0000000..0940712
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueOffsetInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class QueueOffsetInfo {
+    private Integer idx;
+
+    private Long start;
+    private Long end;
+
+    private Long startOffset;
+    private Long endOffset;
+    private MessageQueue messageQueues;
+
+    public QueueOffsetInfo() {
+    }
+
+    public QueueOffsetInfo(Integer idx, Long start, Long end, Long startOffset, Long endOffset, MessageQueue messageQueues) {
+        this.idx = idx;
+        this.start = start;
+        this.end = end;
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+        this.messageQueues = messageQueues;
+    }
+
+    public Integer getIdx() {
+        return idx;
+    }
+
+    public void setIdx(Integer idx) {
+        this.idx = idx;
+    }
+
+    public Long getStart() {
+        return start;
+    }
+
+    public void setStart(Long start) {
+        this.start = start;
+    }
+
+    public Long getEnd() {
+        return end;
+    }
+
+    public void setEnd(Long end) {
+        this.end = end;
+    }
+
+    public Long getStartOffset() {
+        return startOffset;
+    }
+
+    public void setStartOffset(Long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    public Long getEndOffset() {
+        return endOffset;
+    }
+
+    public void setEndOffset(Long endOffset) {
+        this.endOffset = endOffset;
+    }
+
+    public MessageQueue getMessageQueues() {
+        return messageQueues;
+    }
+
+    public void setMessageQueues(MessageQueue messageQueues) {
+        this.messageQueues = messageQueues;
+    }
+
+    public void incStartOffset() {
+        this.startOffset++;
+        this.endOffset++;
+    }
+
+    public void incEndOffset() {
+        this.endOffset++;
+    }
+
+    public void incStartOffset(long size) {
+        this.startOffset += size;
+        this.endOffset += size;
+    }
+}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/MessageQuery.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/MessageQuery.java
new file mode 100644
index 0000000..f78fe09
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/MessageQuery.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+public class MessageQuery {
+    /**
+     * current page num
+     */
+    private int pageNum;
+
+    private int pageSize;
+
+    private String topic;
+
+    private String taskId;
+
+    private long begin;
+
+    private long end;
+
+    public int getPageNum() {
+        return pageNum;
+    }
+
+    public void setPageNum(int pageNum) {
+        this.pageNum = pageNum;
+    }
+
+    public int getPageSize() {
+        return pageSize;
+    }
+
+    public void setPageSize(int pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
+    public long getBegin() {
+        return begin;
+    }
+
+    public void setBegin(long begin) {
+        this.begin = begin;
+    }
+
+    public long getEnd() {
+        return end;
+    }
+
+    public void setEnd(long end) {
+        this.end = end;
+    }
+}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
index e56b4d8..af8d6fb 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
@@ -20,6 +20,8 @@
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.console.model.MessagePage;
+import org.apache.rocketmq.console.model.request.MessageQuery;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.apache.rocketmq.console.model.MessageView;
 
@@ -48,4 +50,10 @@
     ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
         String clientId);
 
+
+    MessagePage queryMessageByPage(MessageQuery query);
+
+
+
+
 }
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
index 34d3994..a7961b4 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
@@ -21,21 +21,19 @@
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
-import java.util.Set;
-import javax.annotation.Resource;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.Connection;
@@ -43,7 +41,12 @@
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.console.config.RMQConfigure;
 import org.apache.rocketmq.console.exception.ServiceException;
+import org.apache.rocketmq.console.model.QueueOffsetInfo;
 import org.apache.rocketmq.console.model.MessageView;
+import org.apache.rocketmq.console.model.MessagePage;
+import org.apache.rocketmq.console.model.MessagePageTask;
+import org.apache.rocketmq.console.model.MessageQueryByPage;
+import org.apache.rocketmq.console.model.request.MessageQuery;
 import org.apache.rocketmq.console.service.MessageService;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.tools.admin.MQAdminExt;
@@ -51,13 +54,31 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.PageImpl;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
+import java.util.Collections;
+import java.util.List;
+import java.util.Comparator;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 @Service
 public class MessageServiceImpl implements MessageService {
 
     private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
 
+    private static final Cache<String, List<QueueOffsetInfo>> CACHE = CacheBuilder.newBuilder()
+            .maximumSize(10000)
+            .expireAfterWrite(60, TimeUnit.MINUTES)
+            .build();
+
     @Autowired
     private RMQConfigure configure;
     /**
@@ -75,8 +96,7 @@
             MessageExt messageExt = mqAdminExt.viewMessage(subject, msgId);
             List<MessageTrack> messageTrackList = messageTrackDetail(messageExt);
             return new Pair<>(MessageView.fromMessageExt(messageExt), messageTrackList);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             throw new ServiceException(-1, String.format("Failed to query message by Id: %s", msgId));
         }
     }
@@ -90,8 +110,7 @@
                     return MessageView.fromMessageExt(messageExt);
                 }
             });
-        }
-        catch (Exception err) {
+        } catch (Exception err) {
             throw Throwables.propagate(err);
         }
     }
@@ -101,9 +120,9 @@
         boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
         RPCHook rpcHook = null;
         if (isEnableAcl) {
-            rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(),configure.getSecretKey()));
+            rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
         }
-        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP,rpcHook);
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
         List<MessageView> messageViewList = Lists.newArrayList();
         try {
             String subExpression = "*";
@@ -146,8 +165,7 @@
                             case OFFSET_ILLEGAL:
                                 break READQ;
                         }
-                    }
-                    catch (Exception e) {
+                    } catch (Exception e) {
                         break;
                     }
                 }
@@ -162,11 +180,9 @@
                 }
             });
             return messageViewList;
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             throw Throwables.propagate(e);
-        }
-        finally {
+        } finally {
             consumer.shutdown();
         }
     }
@@ -175,8 +191,7 @@
     public List<MessageTrack> messageTrackDetail(MessageExt msg) {
         try {
             return mqAdminExt.messageTrackDetail(msg);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             logger.error("op=messageTrackDetailError", e);
             return Collections.emptyList();
         }
@@ -185,12 +200,11 @@
 
     @Override
     public ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
-        String clientId) {
+                                                               String clientId) {
         if (StringUtils.isNotBlank(clientId)) {
             try {
                 return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 throw Throwables.propagate(e);
             }
         }
@@ -204,12 +218,322 @@
                 logger.info("clientId={}", connection.getClientId());
                 return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
             }
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             throw Throwables.propagate(e);
         }
         throw new IllegalStateException("NO CONSUMER");
 
     }
 
+    @Override
+    public MessagePage queryMessageByPage(MessageQuery query) {
+        MessageQueryByPage queryByPage = new MessageQueryByPage(
+                query.getPageNum(),
+                query.getPageSize(),
+                query.getTopic(),
+                query.getBegin(),
+                query.getEnd());
+
+        List<QueueOffsetInfo> queueOffsetInfos = CACHE.getIfPresent(query.getTaskId());
+
+        if (queueOffsetInfos == null) {
+            query.setPageNum(1);
+            MessagePageTask task = this.queryFirstMessagePage(queryByPage);
+            String taskId = MessageClientIDSetter.createUniqID();
+            CACHE.put(taskId, task.getQueueOffsetInfos());
+
+            return new MessagePage(task.getPage(), taskId);
+        }
+        Page<MessageView> messageViews = queryMessageByTaskPage(queryByPage, queueOffsetInfos);
+        return new MessagePage(messageViews, query.getTaskId());
+
+    }
+
+    private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) {
+        boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
+        RPCHook rpcHook = null;
+        if (isEnableAcl) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
+        }
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
+
+        long total = 0;
+        List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
+
+        List<MessageView> messageViews = new ArrayList<>();
+
+        try {
+            consumer.start();
+            Collection<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic());
+            int idx = 0;
+            for (MessageQueue messageQueue : messageQueues) {
+                Long minOffset = consumer.searchOffset(messageQueue, query.getBegin());
+                Long maxOffset = consumer.searchOffset(messageQueue, query.getEnd()) + 1;
+                queueOffsetInfos.add(new QueueOffsetInfo(idx++, minOffset, maxOffset, minOffset, minOffset, messageQueue));
+            }
+
+            // check first offset has message
+            // filter the begin time
+            for (QueueOffsetInfo queueOffset : queueOffsetInfos) {
+                Long start = queueOffset.getStart();
+                boolean hasData = false;
+                boolean hasIllegalOffset = true;
+                while (hasIllegalOffset) {
+                    PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", start, 32);
+                    if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                        hasData = true;
+                        List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
+                        for (MessageExt messageExt : msgFoundList) {
+                            if (messageExt.getStoreTimestamp() < query.getBegin()) {
+                                start++;
+                            } else {
+                                hasIllegalOffset = false;
+                                break;
+                            }
+                        }
+                    } else {
+                        hasIllegalOffset = false;
+                    }
+                }
+                if (!hasData) {
+                    queueOffset.setEnd(queueOffset.getStart());
+                }
+                queueOffset.setStart(start);
+                queueOffset.setStartOffset(start);
+                queueOffset.setEndOffset(start);
+            }
+
+            // filter the end time
+            for (QueueOffsetInfo queueOffset : queueOffsetInfos) {
+                if (queueOffset.getStart().equals(queueOffset.getEnd())) {
+                    continue;
+                }
+                long end = queueOffset.getEnd();
+                long pullOffset = end;
+                int pullSize = 32;
+                boolean hasIllegalOffset = true;
+                while (hasIllegalOffset) {
+
+                    if (pullOffset - pullSize > queueOffset.getStart()) {
+                        pullOffset = pullOffset - pullSize;
+                    } else {
+                        pullOffset = queueOffset.getStartOffset();
+                        pullSize = (int) (end - pullOffset);
+                    }
+                    PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", pullOffset, pullSize);
+                    if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                        List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
+                        for (int i = msgFoundList.size() - 1; i >= 0; i--) {
+                            MessageExt messageExt = msgFoundList.get(i);
+                            if (messageExt.getStoreTimestamp() < query.getBegin()) {
+                                end--;
+                            } else {
+                                hasIllegalOffset = false;
+                                break;
+                            }
+                        }
+                    } else {
+                        hasIllegalOffset = false;
+                    }
+                    if (pullOffset == queueOffset.getStartOffset()) {
+                        break;
+                    }
+                }
+                queueOffset.setEnd(end);
+                total += queueOffset.getEnd() - queueOffset.getStart();
+            }
+
+            long pageSize = total > query.getPageSize() ? query.getPageSize() : total;
+
+
+            // move startOffset
+            int next = moveStartOffset(queueOffsetInfos, query);
+            moveEndOffset(queueOffsetInfos, query, next);
+
+            // find the first page of message
+            for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
+                Long start = queueOffsetInfo.getStartOffset();
+                Long end = queueOffsetInfo.getEndOffset();
+                long size = Math.min(end - start, pageSize);
+                if (size == 0) {
+                    continue;
+                }
+
+                while (size > 0) {
+                    PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, 32);
+                    if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                        List<MessageExt> poll = pullResult.getMsgFoundList();
+                        if (poll.size() == 0) {
+                            break;
+                        }
+                        List<MessageView> collect = poll.stream()
+                                .map(MessageView::fromMessageExt).collect(Collectors.toList());
+
+                        for (MessageView view : collect) {
+                            if (size > 0) {
+                                messageViews.add(view);
+                                size--;
+                            }
+                        }
+                    } else {
+                        break;
+                    }
+
+                }
+            }
+            PageImpl<MessageView> page = new PageImpl<>(messageViews, query.page(), total);
+            return new MessagePageTask(page, queueOffsetInfos);
+        } catch (Exception e) {
+            throw Throwables.propagate(e);
+        } finally {
+            consumer.shutdown();
+        }
+    }
+
+    private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage query, List<QueueOffsetInfo> queueOffsetInfos) {
+        boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
+        RPCHook rpcHook = null;
+        if (isEnableAcl) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
+        }
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
+        List<MessageView> messageViews = new ArrayList<>();
+
+        long offset = query.getPageNum() * query.getPageSize();
+
+        long total = 0;
+        try {
+            consumer.start();
+            for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
+                long start = queueOffsetInfo.getStart();
+                long end = queueOffsetInfo.getEnd();
+                queueOffsetInfo.setStartOffset(start);
+                queueOffsetInfo.setEndOffset(start);
+                total += end - start;
+            }
+            if (total <= offset) {
+                return Page.empty();
+            }
+            long pageSize = total - offset > query.getPageSize() ? query.getPageSize() : total - offset;
+
+            int next = moveStartOffset(queueOffsetInfos, query);
+            moveEndOffset(queueOffsetInfos, query, next);
+
+            for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
+                Long start = queueOffsetInfo.getStartOffset();
+                Long end = queueOffsetInfo.getEndOffset();
+                long size = Math.min(end - start, pageSize);
+                if (size == 0) {
+                    continue;
+                }
+
+                while (size > 0) {
+                    PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, 32);
+                    if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                        List<MessageExt> poll = pullResult.getMsgFoundList();
+                        if (poll.size() == 0) {
+                            break;
+                        }
+                        List<MessageView> collect = poll.stream()
+                                .map(MessageView::fromMessageExt).collect(Collectors.toList());
+
+                        for (MessageView view : collect) {
+                            if (size > 0) {
+                                messageViews.add(view);
+                                size--;
+                            }
+                        }
+                    } else {
+                        break;
+                    }
+
+                }
+            }
+            return new PageImpl<>(messageViews, query.page(), total);
+        } catch (Exception e) {
+            throw Throwables.propagate(e);
+        } finally {
+            consumer.shutdown();
+        }
+    }
+
+    private int moveStartOffset(List<QueueOffsetInfo> queueOffsets, MessageQueryByPage query) {
+        int size = queueOffsets.size();
+        int next = 0;
+        long offset = query.getPageNum() * query.getPageSize();
+        if (offset == 0) {
+            return next;
+        }
+        // sort by queueOffset size
+        List<QueueOffsetInfo> orderQueue = queueOffsets
+                .stream()
+                .sorted((o1, o2) -> {
+                    long size1 = o1.getEnd() - o1.getStart();
+                    long size2 = o2.getEnd() - o2.getStart();
+                    if (size1 < size2) {
+                        return -1;
+                    } else if (size1 > size2) {
+                        return 1;
+                    }
+                    return 0;
+                }).collect(Collectors.toList());
+
+        // Take the smallest one each time
+        for (int i = 0; i < size && offset >= (size - i); i++) {
+            long minSize = orderQueue.get(i).getEnd() - orderQueue.get(i).getStartOffset();
+            if (minSize == 0) {
+                continue;
+            }
+            long reduce = minSize * (size - i);
+            if (reduce <= offset) {
+                offset -= reduce;
+                for (int j = i; j < size; j++) {
+                    orderQueue.get(j).incStartOffset(minSize);
+                }
+            } else {
+                long addOffset = offset / (size - i);
+                offset -= addOffset * (size - i);
+                if (addOffset != 0) {
+                    for (int j = i; j < size; j++) {
+                        orderQueue.get(j).incStartOffset(addOffset);
+                    }
+                }
+            }
+        }
+        for (QueueOffsetInfo info : orderQueue) {
+            QueueOffsetInfo queueOffsetInfo = queueOffsets.get(info.getIdx());
+            queueOffsetInfo.setStartOffset(info.getStartOffset());
+            queueOffsetInfo.setEndOffset(info.getEndOffset());
+        }
+
+        for (QueueOffsetInfo info : queueOffsets) {
+            if (offset == 0) {
+                break;
+            }
+            next = (next + 1) % size;
+            if (info.getStartOffset() < info.getEnd()) {
+                info.incStartOffset();
+                --offset;
+            }
+        }
+        return next;
+    }
+
+    private void moveEndOffset(List<QueueOffsetInfo> queueOffsets, MessageQueryByPage query, int next) {
+        int size = queueOffsets.size();
+        for (int j = 0; j < query.getPageSize(); j++) {
+            QueueOffsetInfo nextQueueOffset = queueOffsets.get(next);
+            next = (next + 1) % size;
+            int start = next;
+            while (nextQueueOffset.getEndOffset() >= nextQueueOffset.getEnd()) {
+                nextQueueOffset = queueOffsets.get(next);
+                next = (next + 1) % size;
+                if (start == next) {
+                    return;
+                }
+            }
+            nextQueueOffset.incEndOffset();
+        }
+    }
+
 }
diff --git a/rocketmq-console/src/main/resources/static/src/message.js b/rocketmq-console/src/main/resources/static/src/message.js
index 0c0aefb..d2de6d9 100644
--- a/rocketmq-console/src/main/resources/static/src/message.js
+++ b/rocketmq-console/src/main/resources/static/src/message.js
@@ -43,6 +43,8 @@
     $scope.timepickerEnd = moment().add(1,'hour').format('YYYY-MM-DD HH:mm');
     $scope.timepickerOptions ={format: 'YYYY-MM-DD HH:mm', showClear: true};
 
+    $scope.taskId = "";
+
     $scope.paginationConf = {
         currentPage: 1,
         totalItems: 0,
@@ -51,10 +53,44 @@
         perPageOptions: [10],
         rememberPerPage: 'perPageItems',
         onChange: function () {
-            $scope.changeShowMessageList(this.currentPage,this.totalItems);
+            $scope.queryMessagePageByTopic()
         }
     };
 
+    $scope.queryMessagePageByTopic = function () {
+        if ($scope.timepickerEnd < $scope.timepickerBegin) {
+            Notification.error({message: "endTime is later than beginTime!", delay: 2000});
+            return
+        }
+        if( $scope.selectedTopic === [] || (typeof $scope.selectedTopic) == "object"){
+            return
+        }
+        $http({
+            method: "POST",
+            url: "message/queryMessagePageByTopic.query",
+            data: {
+                topic: $scope.selectedTopic,
+                begin: $scope.timepickerBegin.valueOf(),
+                end: $scope.timepickerEnd.valueOf(),
+                pageNum: $scope.paginationConf.currentPage,
+                pageSize: $scope.paginationConf.itemsPerPage,
+                taskId: $scope.taskId
+            }
+        }).success(function (resp) {
+            if (resp.status === 0) {
+                console.log(resp);
+                $scope.messageShowList = resp.data.page.content;
+                if(resp.data.page.first){
+                    $scope.paginationConf.currentPage = 1;
+                }
+                $scope.paginationConf.currentPage = resp.data.page.number + 1;
+                $scope.paginationConf.totalItems = resp.data.page.totalElements;
+                $scope.taskId = resp.data.taskId
+            }else {
+                Notification.error({message: resp.errMsg, delay: 2000});
+            }
+        });
+    }
 
     $scope.queryMessageByTopic = function () {
         console.log($scope.selectedTopic);
@@ -153,7 +189,6 @@
         });
     };
 
-
     $scope.changeShowMessageList = function (currentPage,totalItem) {
         var perPage = $scope.paginationConf.itemsPerPage;
         var from = (currentPage - 1) * perPage;
@@ -161,6 +196,13 @@
         $scope.messageShowList = $scope.queryMessageByTopicResult.slice(from, to);
         $scope.paginationConf.totalItems = totalItem ;
     };
+
+    $scope.onChangeQueryCondition = function (){
+        console.log("change")
+        $scope.taskId = "";
+        $scope.paginationConf.currentPage = 1;
+        $scope.paginationConf.totalItems = 0;
+    }
 }]);
 
 module.controller('messageDetailViewDialogController',['$scope', 'ngDialog', '$http','Notification', function ($scope, ngDialog, $http,Notification) {
diff --git a/rocketmq-console/src/main/resources/static/view/pages/message.html b/rocketmq-console/src/main/resources/static/view/pages/message.html
index f8bf3c8..8cac242 100644
--- a/rocketmq-console/src/main/resources/static/view/pages/message.html
+++ b/rocketmq-console/src/main/resources/static/view/pages/message.html
@@ -21,7 +21,7 @@
                 <md-tabs md-dynamic-height="" md-border-bottom="">
                     <md-tab label="Topic">
                         <md-content class="md-padding" style="min-height:600px">
-                            <h5 class="md-display-5">Only Return 2000 Messages</h5>
+                            <h5 class="md-display-5">Total {{paginationConf.totalItems}} Messages</h5>
                             <div class="row">
                                 <form class="form-inline pull-left col-sm-12">
                                     <div class="form-group">
@@ -32,6 +32,7 @@
                                             <select name="mySelectTopic" chosen
                                                     ng-model="selectedTopic"
                                                     ng-options="item for item in allTopicList"
+                                                    ng-change="onChangeQueryCondition()"
                                                     required>
                                                 <option value=""></option>
                                             </select>
@@ -40,7 +41,7 @@
                                     <div class="form-group ">
                                         <label>{{'BEGIN' | translate}}:</label>
                                         <div class="input-group">
-                                            <input class="form-control" datetimepicker ng-model="timepickerBegin"
+                                            <input class="form-control" datetimepicker ng-change="onChangeQueryCondition()" ng-model="timepickerBegin"
                                                    options="timepickerOptions"/>
                                             <span class="input-group-addon"><span
                                                     class="glyphicon glyphicon-calendar"></span></span>
@@ -49,7 +50,7 @@
                                     <div class="form-group">
                                         <label>{{'END' | translate}}:</label>
                                         <div class="input-group">
-                                            <input class="form-control" datetimepicker ng-model="timepickerEnd"
+                                            <input class="form-control" datetimepicker ng-change="onChangeQueryCondition()" ng-model="timepickerEnd"
                                                    options="timepickerOptions"/>
                                             <span class="input-group-addon"><span
                                                     class="glyphicon glyphicon-calendar"></span></span>
@@ -58,7 +59,7 @@
                                     <button id="searchAppsButton" type="button"
                                             class="btn btn-raised btn-sm  btn-primary"
                                             data-toggle="modal"
-                                            ng-click="queryMessageByTopic()">
+                                            ng-click="queryMessagePageByTopic()">
                                         <span class="glyphicon glyphicon-search"></span>{{ 'SEARCH' | translate}}
                                     </button>
                                 </form>