blob: 17ec7d51e3939dc13518700cdfce23df3aafa712 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.exporter.service.client;
import com.google.common.base.Throwables;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.exporter.util.JsonUtil;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.joor.Reflect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
@Service
public class MQAdminExtImpl implements MQAdminExt {
private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
public MQAdminExtImpl() {
}
public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {
return MQAdminInstance.threadLocalMQPullConsumer().pull(mq, "*", offset, 1);
}
@Override
public void updateBrokerConfig(String brokerAddr, Properties properties)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
UnsupportedEncodingException, InterruptedException, MQBrokerException {
MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties);
}
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
}
@Override
public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr, config);
}
@Override
public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = null;
try {
response = remotingClient.invokeSync(addr, request, 3000);
}
catch (Exception err) {
throw Throwables.propagate(err);
}
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
SubscriptionGroupWrapper subscriptionGroupWrapper = decode(response.getBody(), SubscriptionGroupWrapper.class);
return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group);
}
default:
throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark()));
}
}
@Override
public TopicConfig examineTopicConfig(String addr, String topic) {
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = null;
try {
response = remotingClient.invokeSync(addr, request, 3000);
}
catch (Exception err) {
throw Throwables.propagate(err);
}
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class);
return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
}
default:
throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark()));
}
}
@Override
public TopicStatsTable examineTopicStats(String topic)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic);
}
@Override
public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
TopicList topicList = MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList();
logger.debug("op=look={}", JsonUtil.obj2String(topicList.getTopicList()));
return topicList;
}
@Override
public KVTable fetchBrokerRuntimeStats(String brokerAddr)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr);
}
@Override
public ConsumeStats examineConsumeStats(String consumerGroup)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup);
}
@Override
public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, topic);
}
@Override
public ClusterInfo examineBrokerClusterInfo()
throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException {
return MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterInfo();
}
@Override
public TopicRouteData examineTopicRouteInfo(String topic)
throws RemotingException, MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().examineTopicRouteInfo(topic);
}
@Override
public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException, RemotingException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup);
}
@Override
public ProducerConnection examineProducerConnectionInfo(String producerGroup, String topic)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().examineProducerConnectionInfo(producerGroup, topic);
}
@Override
public List<String> getNameServerAddressList() {
return MQAdminInstance.threadLocalMQAdminExt().getNameServerAddressList();
}
@Override
public int wipeWritePermOfBroker(String namesrvAddr, String brokerName)
throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().wipeWritePermOfBroker(namesrvAddr, brokerName);
}
@Override
public void putKVConfig(String namespace, String key, String value) {
MQAdminInstance.threadLocalMQAdminExt().putKVConfig(namespace, key, value);
}
@Override
public String getKVConfig(String namespace, String key)
throws RemotingException, MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().getKVConfig(namespace, key);
}
@Override
public KVTable getKVListByNamespace(String namespace)
throws RemotingException, MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().getKVListByNamespace(namespace);
}
@Override
public void deleteTopicInBroker(Set<String> addrs, String topic)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
logger.info("addrs={} topic={}", JsonUtil.obj2String(addrs), topic);
MQAdminInstance.threadLocalMQAdminExt().deleteTopicInBroker(addrs, topic);
}
@Override
public void deleteTopicInNameServer(Set<String> addrs, String topic)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().deleteTopicInNameServer(addrs, topic);
}
@Override
public void deleteSubscriptionGroup(String addr, String groupName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, groupName);
}
@Override
public void createAndUpdateKvConfig(String namespace, String key, String value)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateKvConfig(namespace, key, value);
}
@Override
public void deleteKvConfig(String namespace, String key)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().deleteKvConfig(namespace, key);
}
@Override
public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
}
@Override
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp,
boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, timestamp, isForce);
}
@Override
public void resetOffsetNew(String consumerGroup, String topic, long timestamp)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().resetOffsetNew(consumerGroup, topic, timestamp);
}
@Override
public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().getConsumeStatus(topic, group, clientAddr);
}
@Override
public void createOrUpdateOrderConf(String key, String value, boolean isCluster)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createOrUpdateOrderConf(key, value, isCluster);
}
@Override
public GroupList queryTopicConsumeByWho(String topic)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException, RemotingException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().queryTopicConsumeByWho(topic);
}
@Override
public boolean cleanExpiredConsumerQueue(String cluster)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueue(cluster);
}
@Override
public boolean cleanExpiredConsumerQueueByAddr(String addr)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueueByAddr(addr);
}
@Override
public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack)
throws RemotingException, MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, clientId, jstack);
}
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId,
String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, msgId);
}
@Override
public List<MessageTrack> messageTrackDetail(MessageExt msg)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().messageTrackDetail(msg);
}
@Override
public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
MQAdminInstance.threadLocalMQAdminExt().cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
}
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum);
}
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
throws MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag);
}
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().searchOffset(mq, timestamp);
}
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().maxOffset(mq);
}
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().minOffset(mq);
}
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq);
}
@Override
public MessageExt viewMessage(String msgId)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId);
}
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().queryMessage(topic, key, maxNum, begin, end);
}
@Override
@Deprecated
public void start() throws MQClientException {
throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this");
}
@Override
@Deprecated
public void shutdown() {
throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this");
}
// below is 3.2.6->3.5.8 updated
@Override
public List<QueueTimeSpan> queryConsumeTimeSpan(String topic,
String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group);
}
//MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a day
//next version we will remove it
//https://issues.apache.org/jira/browse/ROCKETMQ-111
//https://github.com/apache/incubator-rocketmq/pull/69
@Override
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
try {
return viewMessage(msgId);
}
catch (Exception e) {
}
MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32,
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get();
if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
return qr.getMessageList().get(0);
}
else {
return null;
}
}
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String topic,
String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
}
@Override
public Properties getBrokerConfig(
String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().getBrokerConfig(brokerAddr);
}
@Override
public TopicList fetchTopicsByCLuster(
String clusterName) throws RemotingException, MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().fetchTopicsByCLuster(clusterName);
}
@Override
public boolean cleanUnusedTopic(
String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopic(cluster);
}
@Override
public boolean cleanUnusedTopicByAddr(
String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopicByAddr(addr);
}
@Override
public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().viewBrokerStatsData(brokerAddr, statsName, statsKey);
}
@Override
public Set<String> getClusterList(
String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().getClusterList(topic);
}
@Override
public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder,
long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
return MQAdminInstance.threadLocalMQAdminExt().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
}
@Override
public Set<String> getTopicClusterList(
String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
return MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
}
@Override
public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
}
@Override
public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().getAllTopicGroup(brokerAddr, timeoutMillis);
}
@Override
public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
long offset) throws RemotingException, InterruptedException, MQBrokerException {
MQAdminInstance.threadLocalMQAdminExt().updateConsumeOffset(brokerAddr, consumeGroup, mq, offset);
}
// 4.0.0 added
@Override public void updateNameServerConfig(Properties properties,
List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
}
@Override public Map<String, Properties> getNameServerConfig(
List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
return null;
}
@Override public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic,
int queueId, long index, int count,
String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
return null;
}
}