Support query message by page (#688)
diff --git a/rocketmq-console/pom.xml b/rocketmq-console/pom.xml
index d454166..5cad525 100644
--- a/rocketmq-console/pom.xml
+++ b/rocketmq-console/pom.xml
@@ -58,7 +58,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
- <guava.version>16.0.1</guava.version>
+ <guava.version>29.0-jre</guava.version>
<commons-digester.version>2.1</commons-digester.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-io.version>2.4</commons-io.version>
@@ -83,6 +83,11 @@
<version>${spring.boot.version}</version>
</dependency>
<dependency>
+ <groupId>org.springframework.data</groupId>
+ <artifactId>spring-data-commons</artifactId>
+ <version>${spring.boot.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring.boot.version}</version>
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/MessageController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/MessageController.java
index dd3cdb8..fa0fe00 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/MessageController.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/MessageController.java
@@ -16,24 +16,28 @@
*/
package org.apache.rocketmq.console.controller;
+import com.google.common.collect.Maps;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.apache.rocketmq.console.model.MessagePage;
import org.apache.rocketmq.console.model.MessageView;
+import org.apache.rocketmq.console.model.request.MessageQuery;
import org.apache.rocketmq.console.service.MessageService;
import org.apache.rocketmq.console.util.JsonUtil;
-import com.google.common.collect.Maps;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
-import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping("/message")
@@ -52,6 +56,12 @@
return messageViewMap;
}
+ @PostMapping("/queryMessagePageByTopic.query")
+ @ResponseBody
+ public MessagePage queryMessagePageByTopic(@RequestBody MessageQuery query) {
+ return messageService.queryMessageByPage(query);
+ }
+
@RequestMapping(value = "/queryMessageByTopicAndKey.query", method = RequestMethod.GET)
@ResponseBody
public Object queryMessageByTopicAndKey(@RequestParam String topic, @RequestParam String key) {
@@ -61,15 +71,15 @@
@RequestMapping(value = "/queryMessageByTopic.query", method = RequestMethod.GET)
@ResponseBody
public Object queryMessageByTopic(@RequestParam String topic, @RequestParam long begin,
- @RequestParam long end) {
+ @RequestParam long end) {
return messageService.queryMessageByTopic(topic, begin, end);
}
@RequestMapping(value = "/consumeMessageDirectly.do", method = RequestMethod.POST)
@ResponseBody
public Object consumeMessageDirectly(@RequestParam String topic, @RequestParam String consumerGroup,
- @RequestParam String msgId,
- @RequestParam(required = false) String clientId) {
+ @RequestParam String msgId,
+ @RequestParam(required = false) String clientId) {
logger.info("msgId={} consumerGroup={} clientId={}", msgId, consumerGroup, clientId);
ConsumeMessageDirectlyResult consumeMessageDirectlyResult = messageService.consumeMessageDirectly(topic, msgId, consumerGroup, clientId);
logger.info("consumeMessageDirectlyResult={}", JsonUtil.obj2String(consumeMessageDirectlyResult));
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePage.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePage.java
new file mode 100644
index 0000000..2a803d8
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+
+import org.springframework.data.domain.Page;
+
+public class MessagePage {
+ private Page<MessageView> page;
+ private String taskId;
+
+ public MessagePage(Page<MessageView> page, String taskId) {
+ this.page = page;
+ this.taskId = taskId;
+ }
+
+ public Page<MessageView> getPage() {
+ return page;
+ }
+
+ public void setPage(Page<MessageView> page) {
+ this.page = page;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+
+ @Override
+ public String toString() {
+ return "MessagePage{" +
+ "page=" + page +
+ ", taskId='" + taskId + '\'' +
+ '}';
+ }
+}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePageTask.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePageTask.java
new file mode 100644
index 0000000..31cc18a
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessagePageTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.springframework.data.domain.Page;
+
+import java.util.List;
+
+public class MessagePageTask {
+ private Page<MessageView> page;
+ private List<QueueOffsetInfo> queueOffsetInfos;
+
+ public MessagePageTask(Page<MessageView> page, List<QueueOffsetInfo> queueOffsetInfos) {
+ this.page = page;
+ this.queueOffsetInfos = queueOffsetInfos;
+ }
+
+ public Page<MessageView> getPage() {
+ return page;
+ }
+
+ public void setPage(Page<MessageView> page) {
+ this.page = page;
+ }
+
+ public List<QueueOffsetInfo> getQueueOffsetInfos() {
+ return queueOffsetInfos;
+ }
+
+ public void setQueueOffsetInfos(List<QueueOffsetInfo> queueOffsetInfos) {
+ this.queueOffsetInfos = queueOffsetInfos;
+ }
+
+ @Override
+ public String toString() {
+ return "MessagePageTask{" +
+ "page=" + page +
+ ", queueOffsetInfos=" + queueOffsetInfos +
+ '}';
+ }
+}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageQueryByPage.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageQueryByPage.java
new file mode 100644
index 0000000..b3370c4
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageQueryByPage.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+
+import org.springframework.data.domain.PageRequest;
+
+public class MessageQueryByPage {
+ public static final int DEFAULT_PAGE = 0;
+
+ public static final int MIN_PAGE_SIZE = 10;
+
+ public static final int MAX_PAGE_SIZE = 100;
+
+ /**
+ * current page num
+ */
+ private int pageNum;
+
+ private int pageSize;
+
+ private String topic;
+ private long begin;
+ private long end;
+
+ public MessageQueryByPage(int pageNum, int pageSize, String topic, long begin, long end) {
+ this.pageNum = pageNum;
+ this.pageSize = pageSize;
+ this.topic = topic;
+ this.begin = begin;
+ this.end = end;
+ }
+
+ public void setPageNum(int pageNum) {
+ this.pageNum = pageNum;
+ }
+
+ public void setPageSize(int pageSize) {
+ this.pageSize = pageSize;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public long getBegin() {
+ return begin;
+ }
+
+ public void setBegin(long begin) {
+ this.begin = begin;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public void setEnd(long end) {
+ this.end = end;
+ }
+
+ public int getPageNum() {
+ return pageNum <= 0 ? DEFAULT_PAGE : pageNum - 1;
+ }
+
+ public int getPageSize() {
+ if (pageSize <= 1) {
+ return MIN_PAGE_SIZE;
+ } else if (pageSize > MAX_PAGE_SIZE) {
+ return MAX_PAGE_SIZE;
+ }
+ return this.pageSize;
+ }
+
+ public PageRequest page() {
+ return PageRequest.of(this.getPageNum(), this.getPageSize());
+ }
+
+ @Override
+ public String toString() {
+ return "MessageQueryByPage{" +
+ "pageNum=" + pageNum +
+ ", pageSize=" + pageSize +
+ ", topic='" + topic + '\'' +
+ ", begin=" + begin +
+ ", end=" + end +
+ '}';
+ }
+}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueOffsetInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueOffsetInfo.java
new file mode 100644
index 0000000..0940712
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueOffsetInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class QueueOffsetInfo {
+ private Integer idx;
+
+ private Long start;
+ private Long end;
+
+ private Long startOffset;
+ private Long endOffset;
+ private MessageQueue messageQueues;
+
+ public QueueOffsetInfo() {
+ }
+
+ public QueueOffsetInfo(Integer idx, Long start, Long end, Long startOffset, Long endOffset, MessageQueue messageQueues) {
+ this.idx = idx;
+ this.start = start;
+ this.end = end;
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ this.messageQueues = messageQueues;
+ }
+
+ public Integer getIdx() {
+ return idx;
+ }
+
+ public void setIdx(Integer idx) {
+ this.idx = idx;
+ }
+
+ public Long getStart() {
+ return start;
+ }
+
+ public void setStart(Long start) {
+ this.start = start;
+ }
+
+ public Long getEnd() {
+ return end;
+ }
+
+ public void setEnd(Long end) {
+ this.end = end;
+ }
+
+ public Long getStartOffset() {
+ return startOffset;
+ }
+
+ public void setStartOffset(Long startOffset) {
+ this.startOffset = startOffset;
+ }
+
+ public Long getEndOffset() {
+ return endOffset;
+ }
+
+ public void setEndOffset(Long endOffset) {
+ this.endOffset = endOffset;
+ }
+
+ public MessageQueue getMessageQueues() {
+ return messageQueues;
+ }
+
+ public void setMessageQueues(MessageQueue messageQueues) {
+ this.messageQueues = messageQueues;
+ }
+
+ public void incStartOffset() {
+ this.startOffset++;
+ this.endOffset++;
+ }
+
+ public void incEndOffset() {
+ this.endOffset++;
+ }
+
+ public void incStartOffset(long size) {
+ this.startOffset += size;
+ this.endOffset += size;
+ }
+}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/MessageQuery.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/MessageQuery.java
new file mode 100644
index 0000000..f78fe09
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/MessageQuery.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+public class MessageQuery {
+ /**
+ * current page num
+ */
+ private int pageNum;
+
+ private int pageSize;
+
+ private String topic;
+
+ private String taskId;
+
+ private long begin;
+
+ private long end;
+
+ public int getPageNum() {
+ return pageNum;
+ }
+
+ public void setPageNum(int pageNum) {
+ this.pageNum = pageNum;
+ }
+
+ public int getPageSize() {
+ return pageSize;
+ }
+
+ public void setPageSize(int pageSize) {
+ this.pageSize = pageSize;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+
+ public long getBegin() {
+ return begin;
+ }
+
+ public void setBegin(long begin) {
+ this.begin = begin;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public void setEnd(long end) {
+ this.end = end;
+ }
+}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
index e56b4d8..af8d6fb 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
@@ -20,6 +20,8 @@
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.console.model.MessagePage;
+import org.apache.rocketmq.console.model.request.MessageQuery;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.console.model.MessageView;
@@ -48,4 +50,10 @@
ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
String clientId);
+
+ MessagePage queryMessageByPage(MessageQuery query);
+
+
+
+
}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
index 34d3994..a7961b4 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
@@ -21,21 +21,19 @@
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
-import java.util.Set;
-import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection;
@@ -43,7 +41,12 @@
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.exception.ServiceException;
+import org.apache.rocketmq.console.model.QueueOffsetInfo;
import org.apache.rocketmq.console.model.MessageView;
+import org.apache.rocketmq.console.model.MessagePage;
+import org.apache.rocketmq.console.model.MessagePageTask;
+import org.apache.rocketmq.console.model.MessageQueryByPage;
+import org.apache.rocketmq.console.model.request.MessageQuery;
import org.apache.rocketmq.console.service.MessageService;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.MQAdminExt;
@@ -51,13 +54,31 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.PageImpl;
import org.springframework.stereotype.Service;
+import javax.annotation.Resource;
+import java.util.Collections;
+import java.util.List;
+import java.util.Comparator;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
@Service
public class MessageServiceImpl implements MessageService {
private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
+ private static final Cache<String, List<QueueOffsetInfo>> CACHE = CacheBuilder.newBuilder()
+ .maximumSize(10000)
+ .expireAfterWrite(60, TimeUnit.MINUTES)
+ .build();
+
@Autowired
private RMQConfigure configure;
/**
@@ -75,8 +96,7 @@
MessageExt messageExt = mqAdminExt.viewMessage(subject, msgId);
List<MessageTrack> messageTrackList = messageTrackDetail(messageExt);
return new Pair<>(MessageView.fromMessageExt(messageExt), messageTrackList);
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new ServiceException(-1, String.format("Failed to query message by Id: %s", msgId));
}
}
@@ -90,8 +110,7 @@
return MessageView.fromMessageExt(messageExt);
}
});
- }
- catch (Exception err) {
+ } catch (Exception err) {
throw Throwables.propagate(err);
}
}
@@ -101,9 +120,9 @@
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
RPCHook rpcHook = null;
if (isEnableAcl) {
- rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(),configure.getSecretKey()));
+ rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
- DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP,rpcHook);
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
List<MessageView> messageViewList = Lists.newArrayList();
try {
String subExpression = "*";
@@ -146,8 +165,7 @@
case OFFSET_ILLEGAL:
break READQ;
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
break;
}
}
@@ -162,11 +180,9 @@
}
});
return messageViewList;
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw Throwables.propagate(e);
- }
- finally {
+ } finally {
consumer.shutdown();
}
}
@@ -175,8 +191,7 @@
public List<MessageTrack> messageTrackDetail(MessageExt msg) {
try {
return mqAdminExt.messageTrackDetail(msg);
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.error("op=messageTrackDetailError", e);
return Collections.emptyList();
}
@@ -185,12 +200,11 @@
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
- String clientId) {
+ String clientId) {
if (StringUtils.isNotBlank(clientId)) {
try {
return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw Throwables.propagate(e);
}
}
@@ -204,12 +218,322 @@
logger.info("clientId={}", connection.getClientId());
return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw Throwables.propagate(e);
}
throw new IllegalStateException("NO CONSUMER");
}
+ @Override
+ public MessagePage queryMessageByPage(MessageQuery query) {
+ MessageQueryByPage queryByPage = new MessageQueryByPage(
+ query.getPageNum(),
+ query.getPageSize(),
+ query.getTopic(),
+ query.getBegin(),
+ query.getEnd());
+
+ List<QueueOffsetInfo> queueOffsetInfos = CACHE.getIfPresent(query.getTaskId());
+
+ if (queueOffsetInfos == null) {
+ query.setPageNum(1);
+ MessagePageTask task = this.queryFirstMessagePage(queryByPage);
+ String taskId = MessageClientIDSetter.createUniqID();
+ CACHE.put(taskId, task.getQueueOffsetInfos());
+
+ return new MessagePage(task.getPage(), taskId);
+ }
+ Page<MessageView> messageViews = queryMessageByTaskPage(queryByPage, queueOffsetInfos);
+ return new MessagePage(messageViews, query.getTaskId());
+
+ }
+
+ private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) {
+ boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
+ RPCHook rpcHook = null;
+ if (isEnableAcl) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
+ }
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
+
+ long total = 0;
+ List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
+
+ List<MessageView> messageViews = new ArrayList<>();
+
+ try {
+ consumer.start();
+ Collection<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic());
+ int idx = 0;
+ for (MessageQueue messageQueue : messageQueues) {
+ Long minOffset = consumer.searchOffset(messageQueue, query.getBegin());
+ Long maxOffset = consumer.searchOffset(messageQueue, query.getEnd()) + 1;
+ queueOffsetInfos.add(new QueueOffsetInfo(idx++, minOffset, maxOffset, minOffset, minOffset, messageQueue));
+ }
+
+ // check first offset has message
+ // filter the begin time
+ for (QueueOffsetInfo queueOffset : queueOffsetInfos) {
+ Long start = queueOffset.getStart();
+ boolean hasData = false;
+ boolean hasIllegalOffset = true;
+ while (hasIllegalOffset) {
+ PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", start, 32);
+ if (pullResult.getPullStatus() == PullStatus.FOUND) {
+ hasData = true;
+ List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
+ for (MessageExt messageExt : msgFoundList) {
+ if (messageExt.getStoreTimestamp() < query.getBegin()) {
+ start++;
+ } else {
+ hasIllegalOffset = false;
+ break;
+ }
+ }
+ } else {
+ hasIllegalOffset = false;
+ }
+ }
+ if (!hasData) {
+ queueOffset.setEnd(queueOffset.getStart());
+ }
+ queueOffset.setStart(start);
+ queueOffset.setStartOffset(start);
+ queueOffset.setEndOffset(start);
+ }
+
+ // filter the end time
+ for (QueueOffsetInfo queueOffset : queueOffsetInfos) {
+ if (queueOffset.getStart().equals(queueOffset.getEnd())) {
+ continue;
+ }
+ long end = queueOffset.getEnd();
+ long pullOffset = end;
+ int pullSize = 32;
+ boolean hasIllegalOffset = true;
+ while (hasIllegalOffset) {
+
+ if (pullOffset - pullSize > queueOffset.getStart()) {
+ pullOffset = pullOffset - pullSize;
+ } else {
+ pullOffset = queueOffset.getStartOffset();
+ pullSize = (int) (end - pullOffset);
+ }
+ PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", pullOffset, pullSize);
+ if (pullResult.getPullStatus() == PullStatus.FOUND) {
+ List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
+ for (int i = msgFoundList.size() - 1; i >= 0; i--) {
+ MessageExt messageExt = msgFoundList.get(i);
+ if (messageExt.getStoreTimestamp() < query.getBegin()) {
+ end--;
+ } else {
+ hasIllegalOffset = false;
+ break;
+ }
+ }
+ } else {
+ hasIllegalOffset = false;
+ }
+ if (pullOffset == queueOffset.getStartOffset()) {
+ break;
+ }
+ }
+ queueOffset.setEnd(end);
+ total += queueOffset.getEnd() - queueOffset.getStart();
+ }
+
+ long pageSize = total > query.getPageSize() ? query.getPageSize() : total;
+
+
+ // move startOffset
+ int next = moveStartOffset(queueOffsetInfos, query);
+ moveEndOffset(queueOffsetInfos, query, next);
+
+ // find the first page of message
+ for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
+ Long start = queueOffsetInfo.getStartOffset();
+ Long end = queueOffsetInfo.getEndOffset();
+ long size = Math.min(end - start, pageSize);
+ if (size == 0) {
+ continue;
+ }
+
+ while (size > 0) {
+ PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, 32);
+ if (pullResult.getPullStatus() == PullStatus.FOUND) {
+ List<MessageExt> poll = pullResult.getMsgFoundList();
+ if (poll.size() == 0) {
+ break;
+ }
+ List<MessageView> collect = poll.stream()
+ .map(MessageView::fromMessageExt).collect(Collectors.toList());
+
+ for (MessageView view : collect) {
+ if (size > 0) {
+ messageViews.add(view);
+ size--;
+ }
+ }
+ } else {
+ break;
+ }
+
+ }
+ }
+ PageImpl<MessageView> page = new PageImpl<>(messageViews, query.page(), total);
+ return new MessagePageTask(page, queueOffsetInfos);
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ } finally {
+ consumer.shutdown();
+ }
+ }
+
+ private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage query, List<QueueOffsetInfo> queueOffsetInfos) {
+ boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
+ RPCHook rpcHook = null;
+ if (isEnableAcl) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
+ }
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
+ List<MessageView> messageViews = new ArrayList<>();
+
+ long offset = query.getPageNum() * query.getPageSize();
+
+ long total = 0;
+ try {
+ consumer.start();
+ for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
+ long start = queueOffsetInfo.getStart();
+ long end = queueOffsetInfo.getEnd();
+ queueOffsetInfo.setStartOffset(start);
+ queueOffsetInfo.setEndOffset(start);
+ total += end - start;
+ }
+ if (total <= offset) {
+ return Page.empty();
+ }
+ long pageSize = total - offset > query.getPageSize() ? query.getPageSize() : total - offset;
+
+ int next = moveStartOffset(queueOffsetInfos, query);
+ moveEndOffset(queueOffsetInfos, query, next);
+
+ for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
+ Long start = queueOffsetInfo.getStartOffset();
+ Long end = queueOffsetInfo.getEndOffset();
+ long size = Math.min(end - start, pageSize);
+ if (size == 0) {
+ continue;
+ }
+
+ while (size > 0) {
+ PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, 32);
+ if (pullResult.getPullStatus() == PullStatus.FOUND) {
+ List<MessageExt> poll = pullResult.getMsgFoundList();
+ if (poll.size() == 0) {
+ break;
+ }
+ List<MessageView> collect = poll.stream()
+ .map(MessageView::fromMessageExt).collect(Collectors.toList());
+
+ for (MessageView view : collect) {
+ if (size > 0) {
+ messageViews.add(view);
+ size--;
+ }
+ }
+ } else {
+ break;
+ }
+
+ }
+ }
+ return new PageImpl<>(messageViews, query.page(), total);
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ } finally {
+ consumer.shutdown();
+ }
+ }
+
+ private int moveStartOffset(List<QueueOffsetInfo> queueOffsets, MessageQueryByPage query) {
+ int size = queueOffsets.size();
+ int next = 0;
+ long offset = query.getPageNum() * query.getPageSize();
+ if (offset == 0) {
+ return next;
+ }
+ // sort by queueOffset size
+ List<QueueOffsetInfo> orderQueue = queueOffsets
+ .stream()
+ .sorted((o1, o2) -> {
+ long size1 = o1.getEnd() - o1.getStart();
+ long size2 = o2.getEnd() - o2.getStart();
+ if (size1 < size2) {
+ return -1;
+ } else if (size1 > size2) {
+ return 1;
+ }
+ return 0;
+ }).collect(Collectors.toList());
+
+ // Take the smallest one each time
+ for (int i = 0; i < size && offset >= (size - i); i++) {
+ long minSize = orderQueue.get(i).getEnd() - orderQueue.get(i).getStartOffset();
+ if (minSize == 0) {
+ continue;
+ }
+ long reduce = minSize * (size - i);
+ if (reduce <= offset) {
+ offset -= reduce;
+ for (int j = i; j < size; j++) {
+ orderQueue.get(j).incStartOffset(minSize);
+ }
+ } else {
+ long addOffset = offset / (size - i);
+ offset -= addOffset * (size - i);
+ if (addOffset != 0) {
+ for (int j = i; j < size; j++) {
+ orderQueue.get(j).incStartOffset(addOffset);
+ }
+ }
+ }
+ }
+ for (QueueOffsetInfo info : orderQueue) {
+ QueueOffsetInfo queueOffsetInfo = queueOffsets.get(info.getIdx());
+ queueOffsetInfo.setStartOffset(info.getStartOffset());
+ queueOffsetInfo.setEndOffset(info.getEndOffset());
+ }
+
+ for (QueueOffsetInfo info : queueOffsets) {
+ if (offset == 0) {
+ break;
+ }
+ next = (next + 1) % size;
+ if (info.getStartOffset() < info.getEnd()) {
+ info.incStartOffset();
+ --offset;
+ }
+ }
+ return next;
+ }
+
+ private void moveEndOffset(List<QueueOffsetInfo> queueOffsets, MessageQueryByPage query, int next) {
+ int size = queueOffsets.size();
+ for (int j = 0; j < query.getPageSize(); j++) {
+ QueueOffsetInfo nextQueueOffset = queueOffsets.get(next);
+ next = (next + 1) % size;
+ int start = next;
+ while (nextQueueOffset.getEndOffset() >= nextQueueOffset.getEnd()) {
+ nextQueueOffset = queueOffsets.get(next);
+ next = (next + 1) % size;
+ if (start == next) {
+ return;
+ }
+ }
+ nextQueueOffset.incEndOffset();
+ }
+ }
+
}
diff --git a/rocketmq-console/src/main/resources/static/src/message.js b/rocketmq-console/src/main/resources/static/src/message.js
index 0c0aefb..d2de6d9 100644
--- a/rocketmq-console/src/main/resources/static/src/message.js
+++ b/rocketmq-console/src/main/resources/static/src/message.js
@@ -43,6 +43,8 @@
$scope.timepickerEnd = moment().add(1,'hour').format('YYYY-MM-DD HH:mm');
$scope.timepickerOptions ={format: 'YYYY-MM-DD HH:mm', showClear: true};
+ $scope.taskId = "";
+
$scope.paginationConf = {
currentPage: 1,
totalItems: 0,
@@ -51,10 +53,44 @@
perPageOptions: [10],
rememberPerPage: 'perPageItems',
onChange: function () {
- $scope.changeShowMessageList(this.currentPage,this.totalItems);
+ $scope.queryMessagePageByTopic()
}
};
+ $scope.queryMessagePageByTopic = function () {
+ if ($scope.timepickerEnd < $scope.timepickerBegin) {
+ Notification.error({message: "endTime is later than beginTime!", delay: 2000});
+ return
+ }
+ if( $scope.selectedTopic === [] || (typeof $scope.selectedTopic) == "object"){
+ return
+ }
+ $http({
+ method: "POST",
+ url: "message/queryMessagePageByTopic.query",
+ data: {
+ topic: $scope.selectedTopic,
+ begin: $scope.timepickerBegin.valueOf(),
+ end: $scope.timepickerEnd.valueOf(),
+ pageNum: $scope.paginationConf.currentPage,
+ pageSize: $scope.paginationConf.itemsPerPage,
+ taskId: $scope.taskId
+ }
+ }).success(function (resp) {
+ if (resp.status === 0) {
+ console.log(resp);
+ $scope.messageShowList = resp.data.page.content;
+ if(resp.data.page.first){
+ $scope.paginationConf.currentPage = 1;
+ }
+ $scope.paginationConf.currentPage = resp.data.page.number + 1;
+ $scope.paginationConf.totalItems = resp.data.page.totalElements;
+ $scope.taskId = resp.data.taskId
+ }else {
+ Notification.error({message: resp.errMsg, delay: 2000});
+ }
+ });
+ }
$scope.queryMessageByTopic = function () {
console.log($scope.selectedTopic);
@@ -153,7 +189,6 @@
});
};
-
$scope.changeShowMessageList = function (currentPage,totalItem) {
var perPage = $scope.paginationConf.itemsPerPage;
var from = (currentPage - 1) * perPage;
@@ -161,6 +196,13 @@
$scope.messageShowList = $scope.queryMessageByTopicResult.slice(from, to);
$scope.paginationConf.totalItems = totalItem ;
};
+
+ $scope.onChangeQueryCondition = function (){
+ console.log("change")
+ $scope.taskId = "";
+ $scope.paginationConf.currentPage = 1;
+ $scope.paginationConf.totalItems = 0;
+ }
}]);
module.controller('messageDetailViewDialogController',['$scope', 'ngDialog', '$http','Notification', function ($scope, ngDialog, $http,Notification) {
diff --git a/rocketmq-console/src/main/resources/static/view/pages/message.html b/rocketmq-console/src/main/resources/static/view/pages/message.html
index f8bf3c8..8cac242 100644
--- a/rocketmq-console/src/main/resources/static/view/pages/message.html
+++ b/rocketmq-console/src/main/resources/static/view/pages/message.html
@@ -21,7 +21,7 @@
<md-tabs md-dynamic-height="" md-border-bottom="">
<md-tab label="Topic">
<md-content class="md-padding" style="min-height:600px">
- <h5 class="md-display-5">Only Return 2000 Messages</h5>
+ <h5 class="md-display-5">Total {{paginationConf.totalItems}} Messages</h5>
<div class="row">
<form class="form-inline pull-left col-sm-12">
<div class="form-group">
@@ -32,6 +32,7 @@
<select name="mySelectTopic" chosen
ng-model="selectedTopic"
ng-options="item for item in allTopicList"
+ ng-change="onChangeQueryCondition()"
required>
<option value=""></option>
</select>
@@ -40,7 +41,7 @@
<div class="form-group ">
<label>{{'BEGIN' | translate}}:</label>
<div class="input-group">
- <input class="form-control" datetimepicker ng-model="timepickerBegin"
+ <input class="form-control" datetimepicker ng-change="onChangeQueryCondition()" ng-model="timepickerBegin"
options="timepickerOptions"/>
<span class="input-group-addon"><span
class="glyphicon glyphicon-calendar"></span></span>
@@ -49,7 +50,7 @@
<div class="form-group">
<label>{{'END' | translate}}:</label>
<div class="input-group">
- <input class="form-control" datetimepicker ng-model="timepickerEnd"
+ <input class="form-control" datetimepicker ng-change="onChangeQueryCondition()" ng-model="timepickerEnd"
options="timepickerOptions"/>
<span class="input-group-addon"><span
class="glyphicon glyphicon-calendar"></span></span>
@@ -58,7 +59,7 @@
<button id="searchAppsButton" type="button"
class="btn btn-raised btn-sm btn-primary"
data-toggle="modal"
- ng-click="queryMessageByTopic()">
+ ng-click="queryMessagePageByTopic()">
<span class="glyphicon glyphicon-search"></span>{{ 'SEARCH' | translate}}
</button>
</form>