blob: bd732ab747e450a2a6a2c2e49c92f2f08c122edd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.admin;
import java.io.UnsupportedEncodingException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateGroupForbiddenRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.subscription.GroupForbidden;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.admin.api.TrackType;
import org.apache.rocketmq.tools.admin.common.AdminToolHandler;
import org.apache.rocketmq.tools.admin.common.AdminToolResult;
import org.apache.rocketmq.tools.admin.common.AdminToolsResultCodeEnum;
import org.apache.rocketmq.tools.command.CommandUtil;
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<String>();
static {
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.MONITOR_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CLIENT_INNER_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.ONS_HTTP_PROXY_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PERMISSION_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_OWNER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PULL_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS);
}
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQAdminExt defaultMQAdminExt;
private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mqClientInstance;
private RPCHook rpcHook;
private long timeoutMillis = 20000;
private Random random = new Random();
protected final List<String> kvNamespaceToDeleteList = Arrays.asList(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
protected ThreadPoolExecutor threadPoolExecutor;
public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) {
this(defaultMQAdminExt, null, timeoutMillis);
}
public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, RPCHook rpcHook, long timeoutMillis) {
this.defaultMQAdminExt = defaultMQAdminExt;
this.rpcHook = rpcHook;
this.timeoutMillis = timeoutMillis;
}
@Override public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
this.mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup() + "] has created already, specifed another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mqClientInstance.start();
log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.RUNNING;
int theadPoolCoreSize = Integer.parseInt(System.getProperty("rocketmq.admin.threadpool.coresize", "20"));
this.threadPoolExecutor = new ThreadPoolExecutor(theadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_"));
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
@Override public void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.mqClientInstance.unregisterAdminExt(this.defaultMQAdminExt.getAdminExtGroup());
this.mqClientInstance.shutdown();
log.info("the adminExt [{}] shutdown OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
this.threadPoolExecutor.shutdown();
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
@Override public void addBrokerToContainer(String brokerContainerAddr,
String brokerConfig) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
this.mqClientInstance.getMQClientAPIImpl().addBroker(brokerContainerAddr, brokerConfig, 20000);
}
@Override public void removeBrokerFromContainer(String brokerContainerAddr, String clusterName, String brokerName,
long brokerId) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
this.mqClientInstance.getMQClientAPIImpl().removeBroker(brokerContainerAddr, clusterName, brokerName, brokerId, 20000);
}
public AdminToolResult adminToolExecute(AdminToolHandler handler) {
try {
return handler.doExecute();
} catch (RemotingException e) {
log.error("", e);
return AdminToolResult.failure(AdminToolsResultCodeEnum.REMOTING_ERROR, e.getMessage());
} catch (MQClientException e) {
if (ResponseCode.TOPIC_NOT_EXIST == e.getResponseCode()) {
return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, e.getErrorMessage());
}
return AdminToolResult.failure(AdminToolsResultCodeEnum.MQ_CLIENT_ERROR, e.getMessage());
} catch (InterruptedException e) {
return AdminToolResult.failure(AdminToolsResultCodeEnum.INTERRUPT_ERROR, e.getMessage());
} catch (Exception e) {
return AdminToolResult.failure(AdminToolsResultCodeEnum.MQ_BROKER_ERROR, e.getMessage());
}
}
@Override public void updateBrokerConfig(String brokerAddr,
Properties properties) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
this.mqClientInstance.getMQClientAPIImpl().updateBrokerConfig(brokerAddr, properties, timeoutMillis);
}
@Override public Properties getBrokerConfig(
final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerConfig(brokerAddr, timeoutMillis);
}
@Override public void createAndUpdateTopicConfig(String addr,
TopicConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
@Override public void createAndUpdatePlainAccessConfig(String addr,
PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createPlainAccessConfig(addr, config, timeoutMillis);
}
@Override public void deletePlainAccessConfig(String addr,
String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().deleteAccessConfig(addr, accessKey, timeoutMillis);
}
@Override public void updateGlobalWhiteAddrConfig(String addr,
String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().updateGlobalWhiteAddrsConfig(addr, globalWhiteAddrs, timeoutMillis);
}
@Override public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterAclInfo(addr, timeoutMillis);
}
@Override public AclConfig examineBrokerClusterAclConfig(
String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterConfig(addr, timeoutMillis);
}
@Override public void createAndUpdateSubscriptionGroupConfig(String addr,
SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createSubscriptionGroup(addr, config, timeoutMillis);
}
@Override public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr,
String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
SubscriptionGroupWrapper wrapper = this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(addr, timeoutMillis);
return wrapper.getSubscriptionGroupTable().get(group);
}
@Override public TopicConfig examineTopicConfig(String addr,
String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
}
@Override public TopicStatsTable examineTopicStats(
String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
TopicStatsTable topicStatsTable = new TopicStatsTable();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
TopicStatsTable tst = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
}
}
//Get the static stats
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(topic, topicRouteData, defaultMQAdminExt);
MQAdminUtils.convertPhysicalTopicStats(topic, brokerConfigMap, topicStatsTable);
if (topicStatsTable.getOffsetTable().isEmpty()) {
throw new MQClientException("Not found the topic stats info", null);
}
return topicStatsTable;
}
@Override public AdminToolResult<TopicStatsTable> examineTopicStatsConcurrent(final String topic) {
return adminToolExecute(new AdminToolHandler() {
@Override public AdminToolResult doExecute() throws Exception {
final TopicStatsTable topicStatsTable = new TopicStatsTable();
TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
return AdminToolResult.success(topicStatsTable);
}
final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
for (final BrokerData bd : topicRouteData.getBrokerDatas()) {
threadPoolExecutor.submit(new Runnable() {
@Override public void run() {
try {
String addr = bd.selectBrokerAddr();
if (addr != null) {
TopicStatsTable tst = mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
}
} catch (Exception e) {
log.error("getTopicStatsInfo error. topic=" + topic, e);
} finally {
latch.countDown();
}
}
});
}
latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return AdminToolResult.success(topicStatsTable);
}
});
}
@Override public TopicStatsTable examineTopicStats(String brokerAddr,
String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis);
}
@Override public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
}
@Override public TopicList fetchTopicsByCLuster(
String clusterName) throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicsByCluster(clusterName, timeoutMillis);
}
@Override public KVTable fetchBrokerRuntimeStats(
final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerRuntimeInfo(brokerAddr, timeoutMillis);
}
@Override public ConsumeStats examineConsumeStats(
String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return examineConsumeStats(consumerGroup, null);
}
@Override public ConsumeStats examineConsumeStats(String consumerGroup,
String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
TopicRouteData topicRouteData = null;
List<String> routeTopics = new ArrayList<>();
routeTopics.add(MixAll.getRetryTopic(consumerGroup));
if (topic != null) {
routeTopics.add(topic);
routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, consumerGroup));
}
for (int i = 0; i < routeTopics.size(); i++) {
try {
topicRouteData = this.examineTopicRouteInfo(routeTopics.get(i));
if (topicRouteData != null) {
break;
}
} catch (Throwable e) {
if (i == routeTopics.size() - 1) {
throw e;
}
}
}
ConsumeStats result = new ConsumeStats();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
result.getOffsetTable().putAll(consumeStats.getOffsetTable());
double value = result.getConsumeTps() + consumeStats.getConsumeTps();
result.setConsumeTps(value);
}
}
Set<String> topics = new HashSet<>();
for (MessageQueue messageQueue : result.getOffsetTable().keySet()) {
topics.add(messageQueue.getTopic());
}
ConsumeStats staticResult = new ConsumeStats();
staticResult.setConsumeTps(result.getConsumeTps());
// for topic, we put the physical stats, how about group?
// staticResult.getOffsetTable().putAll(result.getOffsetTable());
for (String currentTopic : topics) {
TopicRouteData currentRoute = this.examineTopicRouteInfo(currentTopic);
if (currentRoute.getTopicQueueMappingByBroker() == null
|| currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
//normal topic
for (Map.Entry<MessageQueue, OffsetWrapper> entry: result.getOffsetTable().entrySet()) {
if (entry.getKey().getTopic().equals(currentTopic)) {
staticResult.getOffsetTable().put(entry.getKey(), entry.getValue());
}
}
}
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, defaultMQAdminExt);
ConsumeStats consumeStats = MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result);
staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable());
}
if (staticResult.getOffsetTable().isEmpty()) {
throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
}
return staticResult;
}
@Override
public AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(final String consumerGroup, final String topic) {
return adminToolExecute(new AdminToolHandler() {
@Override public AdminToolResult doExecute() throws Exception {
TopicRouteData topicRouteData = null;
List<String> routeTopics = new ArrayList<>();
routeTopics.add(MixAll.getRetryTopic(consumerGroup));
if (topic != null) {
routeTopics.add(topic);
routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, consumerGroup));
}
for (int i = 0; i < routeTopics.size(); i++) {
try {
topicRouteData = examineTopicRouteInfo(routeTopics.get(i));
if (topicRouteData != null) {
break;
}
} catch (Throwable e) {
continue;
}
}
if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "topic router info not found");
}
final ConsumeStats result = new ConsumeStats();
final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
final Map<String, Double> consumerTpsMap = new ConcurrentHashMap<>(topicRouteData.getBrokerDatas().size());
for (final BrokerData bd : topicRouteData.getBrokerDatas()) {
threadPoolExecutor.submit(new Runnable() {
@Override public void run() {
try {
String addr = bd.selectBrokerAddr();
if (addr != null) {
ConsumeStats consumeStats = mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis);
result.getOffsetTable().putAll(consumeStats.getOffsetTable());
consumerTpsMap.put(addr, consumeStats.getConsumeTps());
}
} catch (Exception e) {
log.error("getTopicStatsInfo error. topic=" + topic, e);
} finally {
latch.countDown();
}
}
});
}
latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
for (Double tps : consumerTpsMap.values()) {
result.setConsumeTps(result.getConsumeTps() + tps);
}
if (result.getOffsetTable().isEmpty()) {
AdminToolResult.failure(AdminToolsResultCodeEnum.CONSUMER_NOT_ONLINE, "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
}
return AdminToolResult.success(result);
}
});
}
@Override
public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis);
}
@Override public TopicRouteData examineTopicRouteInfo(
String topic) throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
}
@Override public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
MessageDecoder.decodeMessageId(msgId);
return this.viewMessage(msgId);
} catch (Exception e) {
log.warn("the msgId maybe created by new client. msgId={}", msgId, e);
}
return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId);
}
@Override public MessageExt queryMessage(String clusterName, String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
MessageDecoder.decodeMessageId(msgId);
return this.viewMessage(msgId);
} catch (Exception e) {
log.warn("the msgId maybe created by new client. msgId={}", msgId, e);
}
return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(clusterName, topic, msgId);
}
@Override public ConsumerConnection examineConsumerConnectionInfo(
String consumerGroup) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException {
ConsumerConnection result = new ConsumerConnection();
String topic = MixAll.getRetryTopic(consumerGroup);
List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
BrokerData brokerData = brokers.get(random.nextInt(brokers.size()));
String addr = null;
if (brokerData != null) {
addr = brokerData.selectBrokerAddr();
if (StringUtils.isNotBlank(addr)) {
result = this.mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(addr, consumerGroup, timeoutMillis);
}
}
if (result.getConnectionSet().isEmpty()) {
log.warn("the consumer group not online. brokerAddr={}, group={}", addr, consumerGroup);
throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group connection");
}
return result;
}
@Override
public ConsumerConnection examineConsumerConnectionInfo(
String consumerGroup, String brokerAddr) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException {
ConsumerConnection result =
this.mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(brokerAddr, consumerGroup, timeoutMillis);
if (result.getConnectionSet().isEmpty()) {
log.warn("the consumer group not online. brokerAddr={}, group={}", brokerAddr, consumerGroup);
throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group connection");
}
return result;
}
@Override public ProducerConnection examineProducerConnectionInfo(String producerGroup,
final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
ProducerConnection result = new ProducerConnection();
List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
BrokerData brokerData = brokers.get(random.nextInt(brokers.size()));
String addr = null;
if (brokerData != null) {
addr = brokerData.selectBrokerAddr();
if (StringUtils.isNotBlank(addr)) {
result = this.mqClientInstance.getMQClientAPIImpl().getProducerConnectionList(addr, producerGroup, timeoutMillis);
}
}
if (result.getConnectionSet().isEmpty()) {
log.warn("the producer group not online. brokerAddr={}, group={}", addr, producerGroup);
throw new MQClientException("Not found the producer group connection", null);
}
return result;
}
@Override public List<String> getNameServerAddressList() {
return this.mqClientInstance.getMQClientAPIImpl().getNameServerAddressList();
}
@Override public int wipeWritePermOfBroker(final String namesrvAddr,
String brokerName) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
return this.mqClientInstance.getMQClientAPIImpl().wipeWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis);
}
@Override
public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
return this.mqClientInstance.getMQClientAPIImpl().addWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis);
}
@Override public void putKVConfig(String namespace, String key, String value) {
}
@Override public String getKVConfig(String namespace,
String key) throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(namespace, key, timeoutMillis);
}
@Override public KVTable getKVListByNamespace(
String namespace) throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getKVListByNamespace(namespace, timeoutMillis);
}
@Override public void deleteTopic(String topicName,
String clusterName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
Set<String> brokerAddressSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(this.defaultMQAdminExt, clusterName);
this.deleteTopicInBroker(brokerAddressSet, topicName);
List<String> nameServerList = this.getNameServerAddressList();
Set<String> nameServerSet = new HashSet<String>(nameServerList);
this.deleteTopicInNameServer(nameServerSet, topicName);
for (String namespace : this.kvNamespaceToDeleteList) {
this.deleteKvConfig(namespace, topicName);
}
}
@Override public void deleteTopicInBroker(Set<String> addrs,
String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
for (String addr : addrs) {
this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);
}
}
@Override public AdminToolResult<BrokerOperatorResult> deleteTopicInBrokerConcurrent(final Set<String> addrs,
final String topic) {
final List<String> successList = new CopyOnWriteArrayList<>();
final List<String> failureList = new CopyOnWriteArrayList<>();
final CountDownLatch latch = new CountDownLatch(addrs.size());
for (final String addr : addrs) {
threadPoolExecutor.submit(new Runnable() {
@Override public void run() {
try {
mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);
successList.add(addr);
} catch (Exception e) {
log.error("deleteTopicInBrokerConcurrent error. topic=" + topic + ", host=" + addr, e);
failureList.add(addr);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
}
BrokerOperatorResult result = new BrokerOperatorResult();
result.setSuccessList(successList);
result.setFailureList(failureList);
return AdminToolResult.success(result);
}
@Override public void deleteTopicInNameServer(Set<String> addrs,
String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
if (addrs == null) {
String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
addrs = new HashSet(Arrays.asList(ns.split(";")));
}
for (String addr : addrs) {
this.mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(addr, topic, timeoutMillis);
}
}
@Override public void deleteSubscriptionGroup(String addr,
String groupName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, false, timeoutMillis);
}
@Override public void deleteSubscriptionGroup(String addr, String groupName,
boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, removeOffset, timeoutMillis);
}
@Override public void createAndUpdateKvConfig(String namespace, String key,
String value) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(namespace, key, value, timeoutMillis);
}
@Override public void deleteKvConfig(String namespace,
String key) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().deleteKVConfigValue(namespace, key, timeoutMillis);
}
@Override public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>();
Map<String, QueueData> topicRouteMap = new HashMap<String, QueueData>();
for (QueueData queueData : topicRouteData.getQueueDatas()) {
topicRouteMap.put(queueData.getBrokerName(), queueData);
}
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
rollbackStatsList.addAll(resetOffsetByTimestampOld(addr, topicRouteMap.get(bd.getBrokerName()), consumerGroup, topic, timestamp, force));
}
}
return rollbackStatsList;
}
private List<RollbackStats> resetOffsetByTimestampOld(String brokerAddr, QueueData queueData, String consumerGroup,
String topic, long timestamp,
boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>();
ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(brokerAddr, consumerGroup, timeoutMillis);
boolean hasConsumed = false;
for (Map.Entry<MessageQueue, OffsetWrapper> entry : consumeStats.getOffsetTable().entrySet()) {
MessageQueue queue = entry.getKey();
OffsetWrapper offsetWrapper = entry.getValue();
if (topic.equals(queue.getTopic())) {
hasConsumed = true;
RollbackStats rollbackStats = resetOffsetConsumeOffset(brokerAddr, consumerGroup, queue, offsetWrapper, timestamp, force);
rollbackStatsList.add(rollbackStats);
}
}
if (!hasConsumed) {
HashMap<MessageQueue, TopicOffset> topicStatus = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis).getOffsetTable();
for (int i = 0; i < queueData.getReadQueueNums(); i++) {
MessageQueue queue = new MessageQueue(topic, queueData.getBrokerName(), i);
OffsetWrapper offsetWrapper = new OffsetWrapper();
offsetWrapper.setBrokerOffset(topicStatus.get(queue).getMaxOffset());
offsetWrapper.setConsumerOffset(topicStatus.get(queue).getMinOffset());
RollbackStats rollbackStats = resetOffsetConsumeOffset(brokerAddr, consumerGroup, queue, offsetWrapper, timestamp, force);
rollbackStatsList.add(rollbackStats);
}
}
return rollbackStatsList;
}
@Override public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp,
boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
}
@Override public void resetOffsetNew(String consumerGroup, String topic,
long timestamp) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
this.resetOffsetByTimestamp(topic, consumerGroup, timestamp, true);
} catch (MQClientException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
this.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, true);
return;
}
throw e;
}
}
@Override
public AdminToolResult<BrokerOperatorResult> resetOffsetNewConcurrent(final String group, final String topic,
final long timestamp) {
return adminToolExecute(new AdminToolHandler() {
@Override public AdminToolResult doExecute() throws Exception {
TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "topic router info not found");
}
final Map<String, QueueData> topicRouteMap = new HashMap<String, QueueData>();
for (QueueData queueData : topicRouteData.getQueueDatas()) {
topicRouteMap.put(queueData.getBrokerName(), queueData);
}
final CopyOnWriteArrayList successList = new CopyOnWriteArrayList();
final CopyOnWriteArrayList failureList = new CopyOnWriteArrayList();
final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
for (final BrokerData bd : topicRouteData.getBrokerDatas()) {
threadPoolExecutor.submit(new Runnable() {
@Override public void run() {
String addr = bd.selectBrokerAddr();
try {
if (addr != null) {
Map<MessageQueue, Long> offsetTable = mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, true, timeoutMillis, false);
if (offsetTable != null) {
successList.add(addr);
} else {
failureList.add(addr);
}
}
} catch (MQClientException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
try {
resetOffsetByTimestampOld(addr, topicRouteMap.get(bd.getBrokerName()), group, topic, timestamp, true);
successList.add(addr);
} catch (Exception e2) {
log.error(MessageFormat.format("resetOffsetByTimestampOld error. addr={0}, topic={1}, group={2},timestamp={3}", addr, topic, group, timestamp), e);
failureList.add(addr);
}
} else if (ResponseCode.SYSTEM_ERROR == e.getResponseCode()) {
// CODE: 1 DESC: THe consumer group <GID_newggghh> not exist, never online
successList.add(addr);
} else {
failureList.add(addr);
log.error(MessageFormat.format("resetOffsetNewConcurrent error. addr={0}, topic={1}, group={2},timestamp={3}", addr, topic, group, timestamp), e);
}
} catch (Exception e) {
failureList.add(addr);
log.error(MessageFormat.format("resetOffsetNewConcurrent error. addr={0}, topic={1}, group={2},timestamp={3}", addr, topic, group, timestamp), e);
} finally {
latch.countDown();
}
}
});
}
latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
BrokerOperatorResult result = new BrokerOperatorResult();
result.setSuccessList(successList);
result.setFailureList(failureList);
if (successList.size() == topicRouteData.getBrokerDatas().size()) {
return AdminToolResult.success(result);
} else {
return AdminToolResult.failure(AdminToolsResultCodeEnum.MQ_BROKER_ERROR, "operator failure", result);
}
}
});
}
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
if (brokerDatas != null) {
for (BrokerData brokerData : brokerDatas) {
String addr = brokerData.selectBrokerAddr();
if (addr != null) {
Map<MessageQueue, Long> offsetTable = this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
if (offsetTable != null) {
allOffsetTable.putAll(offsetTable);
}
}
}
}
return allOffsetTable;
}
private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue queue,
OffsetWrapper offsetWrapper, long timestamp,
boolean force) throws RemotingException, InterruptedException, MQBrokerException {
long resetOffset;
if (timestamp == -1) {
resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis);
} else {
resetOffset = this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, timeoutMillis);
}
RollbackStats rollbackStats = new RollbackStats();
rollbackStats.setBrokerName(queue.getBrokerName());
rollbackStats.setQueueId(queue.getQueueId());
rollbackStats.setBrokerOffset(offsetWrapper.getBrokerOffset());
rollbackStats.setConsumerOffset(offsetWrapper.getConsumerOffset());
rollbackStats.setTimestampOffset(resetOffset);
rollbackStats.setRollbackOffset(offsetWrapper.getConsumerOffset());
if (force || resetOffset <= offsetWrapper.getConsumerOffset()) {
rollbackStats.setRollbackOffset(resetOffset);
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setConsumerGroup(consumeGroup);
requestHeader.setTopic(queue.getTopic());
requestHeader.setQueueId(queue.getQueueId());
requestHeader.setCommitOffset(resetOffset);
this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
}
return rollbackStats;
}
@Override public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
if (brokerDatas != null && brokerDatas.size() > 0) {
String addr = brokerDatas.get(0).selectBrokerAddr();
if (addr != null) {
return this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToGetConsumerStatus(addr, topic, group, clientAddr, timeoutMillis);
}
}
return Collections.EMPTY_MAP;
}
@Override public void createOrUpdateOrderConf(String key, String value,
boolean isCluster) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
if (isCluster) {
this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, value, timeoutMillis);
} else {
String oldOrderConfs = null;
try {
oldOrderConfs = this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, timeoutMillis);
} catch (Exception e) {
e.printStackTrace();
}
Map<String, String> orderConfMap = new HashMap<String, String>();
if (!UtilAll.isBlank(oldOrderConfs)) {
String[] oldOrderConfArr = oldOrderConfs.split(";");
for (String oldOrderConf : oldOrderConfArr) {
String[] items = oldOrderConf.split(":");
orderConfMap.put(items[0], oldOrderConf);
}
}
String[] items = value.split(":");
orderConfMap.put(items[0], value);
StringBuilder newOrderConf = new StringBuilder();
String splitor = "";
for (Map.Entry<String, String> entry : orderConfMap.entrySet()) {
newOrderConf.append(splitor).append(entry.getValue());
splitor = ";";
}
this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, newOrderConf.toString(), timeoutMillis);
}
}
@Override public GroupList queryTopicConsumeByWho(
String topic) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
return this.mqClientInstance.getMQClientAPIImpl().queryTopicConsumeByWho(addr, topic, timeoutMillis);
}
}
return null;
}
@Override public SubscriptionData querySubscription(String group,
String topic) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
return this.mqClientInstance.getMQClientAPIImpl().querySubscriptionByConsumer(addr, group, topic, timeoutMillis);
}
break;
}
return null;
}
@Override public TopicList queryTopicsByConsumer(
String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
String retryTopic = MixAll.getRetryTopic(group);
TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
TopicList result = new TopicList();
//Query all brokers
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().queryTopicsByConsumer(addr, group, timeoutMillis);
result.getTopicList().addAll(topicList.getTopicList());
}
}
return result;
}
@Override public AdminToolResult<TopicList> queryTopicsByConsumerConcurrent(final String group) {
return adminToolExecute(new AdminToolHandler() {
@Override public AdminToolResult doExecute() throws Exception {
String retryTopic = MixAll.getRetryTopic(group);
TopicRouteData topicRouteData = examineTopicRouteInfo(retryTopic);
if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "router info not found.");
}
final TopicList result = new TopicList();
final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
for (final BrokerData bd : topicRouteData.getBrokerDatas()) {
threadPoolExecutor.submit(new Runnable() {
@Override public void run() {
try {
String addr = bd.selectBrokerAddr();
if (addr != null) {
TopicList topicList = mqClientInstance.getMQClientAPIImpl().queryTopicsByConsumer(addr, group, timeoutMillis);
result.getTopicList().addAll(topicList.getTopicList());
}
} catch (Exception e) {
log.error("getTopicStatsInfo error. groupId=" + group, e);
} finally {
latch.countDown();
}
}
});
}
latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return AdminToolResult.success(result);
}
});
}
@Override public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
final String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
List<QueueTimeSpan> spanSet = new ArrayList<QueueTimeSpan>();
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
spanSet.addAll(this.mqClientInstance.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, group, timeoutMillis));
}
}
return spanSet;
}
@Override
public AdminToolResult<List<QueueTimeSpan>> queryConsumeTimeSpanConcurrent(final String topic, final String group) {
return adminToolExecute(new AdminToolHandler() {
@Override public AdminToolResult doExecute() throws Exception {
final List<QueueTimeSpan> spanSet = new ArrayList<QueueTimeSpan>();
TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
return AdminToolResult.success(spanSet);
}
final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
for (final BrokerData bd : topicRouteData.getBrokerDatas()) {
threadPoolExecutor.submit(new Runnable() {
@Override public void run() {
try {
String addr = bd.selectBrokerAddr();
if (addr != null) {
spanSet.addAll(mqClientInstance.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, group, timeoutMillis));
}
} catch (Exception e) {
log.error("queryConsumeTimeSpan error. topic=" + topic, e);
} finally {
latch.countDown();
}
}
});
}
latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return AdminToolResult.success(spanSet);
}
});
}
@Override public boolean cleanExpiredConsumerQueue(
String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = false;
try {
ClusterInfo clusterInfo = examineBrokerClusterInfo();
if (null == cluster || "".equals(cluster)) {
for (String targetCluster : clusterInfo.retrieveAllClusterNames()) {
result = cleanExpiredConsumerQueueByCluster(clusterInfo, targetCluster);
}
} else {
result = cleanExpiredConsumerQueueByCluster(clusterInfo, cluster);
}
} catch (MQBrokerException e) {
log.error("cleanExpiredConsumerQueue error.", e);
}
return result;
}
public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo,
String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = false;
String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
for (String addr : addrs) {
result = cleanExpiredConsumerQueueByAddr(addr);
}
return result;
}
@Override public boolean cleanExpiredConsumerQueueByAddr(
String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(addr, timeoutMillis);
log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
return result;
}
@Override public boolean cleanUnusedTopic(
String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = false;
try {
ClusterInfo clusterInfo = examineBrokerClusterInfo();
if (null == cluster || "".equals(cluster)) {
for (String targetCluster : clusterInfo.retrieveAllClusterNames()) {
result = cleanUnusedTopicByCluster(clusterInfo, targetCluster);
}
} else {
result = cleanUnusedTopicByCluster(clusterInfo, cluster);
}
} catch (MQBrokerException e) {
log.error("cleanExpiredConsumerQueue error.", e);
}
return result;
}
public boolean cleanUnusedTopicByCluster(ClusterInfo clusterInfo,
String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = false;
String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
for (String addr : addrs) {
result = cleanUnusedTopicByAddr(addr);
}
return result;
}
@Override public boolean cleanUnusedTopicByAddr(
String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = mqClientInstance.getMQClientAPIImpl().cleanUnusedTopicByAddr(addr, timeoutMillis);
log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
return result;
}
@Override public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId,
boolean jstack) throws RemotingException, MQClientException, InterruptedException {
return this.getConsumerRunningInfo(consumerGroup, clientId, jstack, false);
}
@Override public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack,
boolean metrics) throws RemotingException, MQClientException, InterruptedException {
String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
if (brokerDatas != null) {
for (BrokerData brokerData : brokerDatas) {
String addr = brokerData.selectBrokerAddr();
if (addr != null) {
return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack, timeoutMillis);
}
}
}
return null;
}
@Override public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId,
String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
MessageExt msg = this.viewMessage(msgId);
return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, msg.getTopic(), msgId, timeoutMillis);
}
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId,
final String topic,
final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
MessageExt msg = this.viewMessage(topic, msgId);
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, topic, msgId, timeoutMillis);
} else {
MessageClientExt msgClient = (MessageClientExt) msg;
return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, topic, msgClient.getOffsetMsgId(), timeoutMillis);
}
}
@Override public List<MessageTrack> messageTrackDetail(
MessageExt msg) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
List<MessageTrack> result = new ArrayList<MessageTrack>();
GroupList groupList = this.queryTopicConsumeByWho(msg.getTopic());
for (String group : groupList.getGroupList()) {
MessageTrack mt = new MessageTrack();
mt.setConsumerGroup(group);
mt.setTrackType(TrackType.UNKNOWN);
ConsumerConnection cc = null;
try {
cc = this.examineConsumerConnectionInfo(group);
} catch (MQBrokerException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
mt.setTrackType(TrackType.NOT_ONLINE);
}
mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
result.add(mt);
continue;
} catch (Exception e) {
mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
result.add(mt);
continue;
}
switch (cc.getConsumeType()) {
case CONSUME_ACTIVELY:
mt.setTrackType(TrackType.PULL);
break;
case CONSUME_PASSIVELY:
boolean ifConsumed = false;
try {
ifConsumed = this.consumed(msg, group);
} catch (MQClientException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
mt.setTrackType(TrackType.NOT_ONLINE);
}
mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
result.add(mt);
continue;
} catch (MQBrokerException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
mt.setTrackType(TrackType.NOT_ONLINE);
}
mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
result.add(mt);
continue;
} catch (Exception e) {
mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
result.add(mt);
continue;
}
if (ifConsumed) {
mt.setTrackType(TrackType.CONSUMED);
Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator();
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
if (next.getKey().equals(msg.getTopic())) {
if (next.getValue().getTagsSet().contains(msg.getTags()) || next.getValue().getTagsSet().contains("*") || next.getValue().getTagsSet().isEmpty()) {
} else {
mt.setTrackType(TrackType.CONSUMED_BUT_FILTERED);
}
}
}
} else {
mt.setTrackType(TrackType.NOT_CONSUME_YET);
}
break;
default:
break;
}
result.add(mt);
}
return result;
}
@Override public List<MessageTrack> messageTrackDetailConcurrent(
final MessageExt msg) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
final List<MessageTrack> result = new ArrayList<MessageTrack>();
GroupList groupList = this.queryTopicConsumeByWho(msg.getTopic());
final CountDownLatch countDownLatch = new CountDownLatch(groupList.getGroupList().size());
for (final String group : groupList.getGroupList()) {
threadPoolExecutor.submit(new Runnable() {
@Override public void run() {
MessageTrack mt = new MessageTrack();
mt.setConsumerGroup(group);
mt.setTrackType(TrackType.UNKNOWN);
ConsumerConnection cc = null;
try {
cc = DefaultMQAdminExtImpl.this.examineConsumerConnectionInfo(group);
} catch (MQBrokerException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
mt.setTrackType(TrackType.NOT_ONLINE);
}
mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
result.add(mt);
countDownLatch.countDown();
return;
} catch (Exception e) {
mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
result.add(mt);
countDownLatch.countDown();
return;
}
switch (cc.getConsumeType()) {
case CONSUME_ACTIVELY:
mt.setTrackType(TrackType.PULL);
break;
case CONSUME_PASSIVELY:
boolean ifConsumed = false;
try {
ifConsumed = DefaultMQAdminExtImpl.this.consumed(msg, group);
} catch (MQClientException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
mt.setTrackType(TrackType.NOT_ONLINE);
}
mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
result.add(mt);
countDownLatch.countDown();
return;
} catch (MQBrokerException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
mt.setTrackType(TrackType.NOT_ONLINE);
}
mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
result.add(mt);
countDownLatch.countDown();
return;
} catch (Exception e) {
mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
result.add(mt);
countDownLatch.countDown();
return;
}
if (ifConsumed) {
mt.setTrackType(TrackType.CONSUMED);
Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator();
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
if (next.getKey().equals(msg.getTopic())) {
if (next.getValue().getTagsSet().contains(msg.getTags()) || next.getValue().getTagsSet().contains("*") || next.getValue().getTagsSet().isEmpty()) {
} else {
mt.setTrackType(TrackType.CONSUMED_BUT_FILTERED);
}
}
}
} else {
mt.setTrackType(TrackType.NOT_CONSUME_YET);
}
break;
default:
break;
}
result.add(mt);
countDownLatch.countDown();
return;
}
});
}
countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return result;
}
public static void main(String[] args) {
Arrays.asList(null);
}
public boolean consumed(final MessageExt msg,
final String group) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
ConsumeStats cstats = this.examineConsumeStats(group);
ClusterInfo ci = this.examineBrokerClusterInfo();
Iterator<Entry<MessageQueue, OffsetWrapper>> it = cstats.getOffsetTable().entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, OffsetWrapper> next = it.next();
MessageQueue mq = next.getKey();
if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
if (brokerData != null) {
String addr = RemotingUtil.convert2IpString(brokerData.getBrokerAddrs().get(MixAll.MASTER_ID));
if (RemotingUtil.socketAddress2String(msg.getStoreHost()).equals(addr)) {
if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
return true;
}
}
}
}
}
return false;
}
public boolean consumedConcurrent(final MessageExt msg,
final String group) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
AdminToolResult<ConsumeStats> cstats = this.examineConsumeStatsConcurrent(group, null);
if (!cstats.isSuccess()) {
throw new MQClientException(cstats.getCode(), cstats.getErrorMsg());
}
ClusterInfo ci = this.examineBrokerClusterInfo();
if (cstats.isSuccess()) {
for (Entry<MessageQueue, OffsetWrapper> next : cstats.getData().getOffsetTable().entrySet()) {
MessageQueue mq = next.getKey();
if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
if (brokerData != null) {
String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
return true;
}
}
}
}
}
}
return false;
}
@Override public void cloneGroupOffset(String srcGroup, String destGroup, String topic,
boolean isOffline) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
String retryTopic = MixAll.getRetryTopic(srcGroup);
TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
this.mqClientInstance.getMQClientAPIImpl().cloneGroupOffset(addr, srcGroup, destGroup, topic, isOffline, timeoutMillis);
}
}
}
@Override public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().viewBrokerStatsData(brokerAddr, statsName, statsKey, timeoutMillis);
}
@Override public Set<String> getClusterList(
String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getClusterList(topic, timeoutMillis);
}
@Override public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder,
long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
}
@Override public Set<String> getTopicClusterList(
final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
Set<String> clusterSet = new HashSet<String>();
ClusterInfo clusterInfo = examineBrokerClusterInfo();
TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
BrokerData brokerData = topicRouteData.getBrokerDatas().get(0);
String brokerName = brokerData.getBrokerName();
Iterator<Map.Entry<String, Set<String>>> it = clusterInfo.getClusterAddrTable().entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Set<String>> next = it.next();
if (next.getValue().contains(brokerName)) {
clusterSet.add(next.getKey());
}
}
return clusterSet;
}
@Override public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
}
@Override public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
SubscriptionGroupWrapper subscriptionGroupWrapper = this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
Iterator<Entry<String, SubscriptionGroupConfig>> iterator = subscriptionGroupWrapper.getSubscriptionGroupTable().entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
if (MixAll.isSysConsumerGroup(configEntry.getKey()) || SYSTEM_GROUP_SET.contains(configEntry.getKey())) {
iterator.remove();
}
}
return subscriptionGroupWrapper;
}
@Override public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis);
}
@Override public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = this.getAllTopicConfig(brokerAddr, timeoutMillis);
TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().getSystemTopicListFromBroker(brokerAddr, timeoutMillis);
Iterator<Entry<String, TopicConfig>> iterator = topicConfigSerializeWrapper.getTopicConfigTable().entrySet().iterator();
while (iterator.hasNext()) {
String topic = iterator.next().getKey();
if (topicList.getTopicList().contains(topic) || (!specialTopic && (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)))) {
iterator.remove();
}
}
return topicConfigSerializeWrapper;
}
@Override public void createTopic(String key, String newTopic, int queueNum,
Map<String, String> attributes) throws MQClientException {
createTopic(key, newTopic, queueNum, 0, null);
}
@Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag,
Map<String, String> attributes) throws MQClientException {
this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, attributes);
}
@Override public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
final TopicQueueMappingDetail mappingDetail,
final boolean force) throws RemotingException, InterruptedException, MQBrokerException {
this.mqClientInstance.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
}
@Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
}
@Override public long maxOffset(MessageQueue mq) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
}
@Override public long minOffset(MessageQueue mq) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().minOffset(mq);
}
@Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().earliestMsgStoreTime(mq);
}
@Override public MessageExt viewMessage(
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.mqClientInstance.getMQAdminImpl().viewMessage(msgId);
}
@Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
long end) throws MQClientException, InterruptedException {
return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
}
@Override public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
long offset) throws RemotingException, InterruptedException, MQBrokerException {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setConsumerGroup(consumeGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
}
@Override public void updateNameServerConfig(final Properties properties,
final List<String> nameServers) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
this.mqClientInstance.getMQClientAPIImpl().updateNameServerConfig(properties, nameServers, timeoutMillis);
}
@Override public Map<String, Properties> getNameServerConfig(
final List<String> nameServers) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
return this.mqClientInstance.getMQClientAPIImpl().getNameServerConfig(nameServers, timeoutMillis);
}
@Override
public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index,
int count,
String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis);
}
@Override public boolean resumeCheckHalfMessage(
String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
MessageExt msg = this.viewMessage(msgId);
return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis);
}
@Override public boolean resumeCheckHalfMessage(final String topic,
final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
MessageExt msg = this.viewMessage(topic, msgId);
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis);
} else {
MessageClientExt msgClient = (MessageClientExt) msg;
return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgClient.getOffsetMsgId(), timeoutMillis);
}
}
@Override public void setMessageRequestMode(final String brokerAddr, final String topic, final String consumerGroup,
final MessageRequestMode mode, final int popShareQueueNum,
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().setMessageRequestMode(brokerAddr, topic, consumerGroup, mode, popShareQueueNum, timeoutMillis);
}
@Override
public long searchOffset(final String brokerAddr, final String topicName, final int queueId, final long timestamp,
final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, topicName, queueId, timestamp, timeoutMillis);
}
public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin,
long end) throws MQClientException, InterruptedException {
return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, key, maxNum, begin, end);
}
@Override
public void resetOffsetByQueueId(final String brokerAddr, final String consumeGroup, final String topicName,
final int queueId, final long resetOffset) throws RemotingException, InterruptedException, MQBrokerException {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setConsumerGroup(consumeGroup);
requestHeader.setTopic(topicName);
requestHeader.setQueueId(queueId);
requestHeader.setCommitOffset(resetOffset);
this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
}
@Override public HARuntimeInfo getBrokerHAStatus(
String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerHAStatus(brokerAddr, timeoutMillis);
}
@Override public void resetMasterFlushOffset(String brokerAddr,
long masterFlushOffset) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
this.mqClientInstance.getMQClientAPIImpl().resetMasterFlushOffset(brokerAddr, masterFlushOffset);
}
@Override
public GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, String groupName, String topicName,
Boolean readable) throws RemotingException, InterruptedException, MQBrokerException {
UpdateGroupForbiddenRequestHeader requestHeader = new UpdateGroupForbiddenRequestHeader();
requestHeader.setGroup(groupName);
requestHeader.setTopic(topicName);
requestHeader.setReadable(readable);
return this.mqClientInstance.getMQClientAPIImpl().updateAndGetGroupForbidden(brokerAddr, requestHeader, timeoutMillis);
}
@Override public void deleteTopicInNameServer(Set<String> addrs, String clusterName,
String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
if (addrs == null) {
String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
addrs = new HashSet(Arrays.asList(ns.split(";")));
}
for (String addr : addrs) {
this.mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(addr, clusterName, topic, timeoutMillis);
}
}
public MQClientInstance getMqClientInstance() {
return mqClientInstance;
}
}