blob: 2f4cb7b15f8e6b9ff32daf039afdda87279cbf7b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.QueryAssignmentRequestBody;
import org.apache.rocketmq.remoting.protocol.body.QueryAssignmentResponseBody;
import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
public class QueryAssignmentProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private final ConcurrentHashMap<String, AllocateMessageQueueStrategy> name2LoadStrategy = new ConcurrentHashMap<>();
private MessageRequestModeManager messageRequestModeManager;
public QueryAssignmentProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
//register strategy
//NOTE: init with broker's log instead of init with ClientLogger.getLog();
AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
name2LoadStrategy.put(allocateMessageQueueAveragely.getName(), allocateMessageQueueAveragely);
AllocateMessageQueueAveragelyByCircle allocateMessageQueueAveragelyByCircle = new AllocateMessageQueueAveragelyByCircle();
name2LoadStrategy.put(allocateMessageQueueAveragelyByCircle.getName(), allocateMessageQueueAveragelyByCircle);
this.messageRequestModeManager = new MessageRequestModeManager(brokerController);
this.messageRequestModeManager.load();
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.QUERY_ASSIGNMENT:
return this.queryAssignment(ctx, request);
case RequestCode.SET_MESSAGE_REQUEST_MODE:
return this.setMessageRequestMode(ctx, request);
default:
break;
}
return null;
}
@Override
public boolean rejectRequest() {
return false;
}
/**
*
*/
private RemotingCommand queryAssignment(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final QueryAssignmentRequestBody requestBody = QueryAssignmentRequestBody.decode(request.getBody(), QueryAssignmentRequestBody.class);
final String topic = requestBody.getTopic();
final String consumerGroup = requestBody.getConsumerGroup();
final String clientId = requestBody.getClientId();
final MessageModel messageModel = requestBody.getMessageModel();
final String strategyName = requestBody.getStrategyName();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final QueryAssignmentResponseBody responseBody = new QueryAssignmentResponseBody();
SetMessageRequestModeRequestBody setMessageRequestModeRequestBody = this.messageRequestModeManager.getMessageRequestMode(topic, consumerGroup);
if (setMessageRequestModeRequestBody == null) {
setMessageRequestModeRequestBody = new SetMessageRequestModeRequestBody();
setMessageRequestModeRequestBody.setTopic(topic);
setMessageRequestModeRequestBody.setConsumerGroup(consumerGroup);
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
// retry topic must be pull mode
setMessageRequestModeRequestBody.setMode(MessageRequestMode.PULL);
} else {
setMessageRequestModeRequestBody.setMode(brokerController.getBrokerConfig().getDefaultMessageRequestMode());
}
if (setMessageRequestModeRequestBody.getMode() == MessageRequestMode.POP) {
setMessageRequestModeRequestBody.setPopShareQueueNum(brokerController.getBrokerConfig().getDefaultPopShareQueueNum());
}
}
Set<MessageQueue> messageQueues = doLoadBalance(topic, consumerGroup, clientId, messageModel, strategyName, setMessageRequestModeRequestBody, ctx);
Set<MessageQueueAssignment> assignments = null;
if (messageQueues != null) {
assignments = new HashSet<>();
for (MessageQueue messageQueue : messageQueues) {
MessageQueueAssignment messageQueueAssignment = new MessageQueueAssignment();
messageQueueAssignment.setMessageQueue(messageQueue);
if (setMessageRequestModeRequestBody != null) {
messageQueueAssignment.setMode(setMessageRequestModeRequestBody.getMode());
}
assignments.add(messageQueueAssignment);
}
}
responseBody.setMessageQueueAssignments(assignments);
response.setBody(responseBody.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
/**
* Returns empty set means the client should clear all load assigned to it before, null means invalid result and the
* client should skip the update logic
*
* @param topic
* @param consumerGroup
* @param clientId
* @param messageModel
* @param strategyName
* @return the MessageQueues assigned to this client
*/
private Set<MessageQueue> doLoadBalance(final String topic, final String consumerGroup, final String clientId,
final MessageModel messageModel, final String strategyName,
SetMessageRequestModeRequestBody setMessageRequestModeRequestBody, final ChannelHandlerContext ctx) {
Set<MessageQueue> assignedQueueSet = null;
final TopicRouteInfoManager topicRouteInfoManager = this.brokerController.getTopicRouteInfoManager();
switch (messageModel) {
case BROADCASTING: {
assignedQueueSet = topicRouteInfoManager.getTopicSubscribeInfo(topic);
if (assignedQueueSet == null) {
log.warn("QueryLoad: no assignment for group[{}], the topic[{}] does not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet;
if (MixAll.isLmq(topic)) {
mqSet = new HashSet<>();
mqSet.add(new MessageQueue(
topic, brokerController.getBrokerConfig().getBrokerName(), (int)MixAll.LMQ_QUEUE_ID));
} else {
mqSet = topicRouteInfoManager.getTopicSubscribeInfo(topic);
}
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("QueryLoad: no assignment for group[{}], the topic[{}] does not exist.", consumerGroup, topic);
}
return null;
}
if (!brokerController.getBrokerConfig().isServerLoadBalancerEnable()) {
return mqSet;
}
List<String> cidAll = null;
ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup);
if (consumerGroupInfo != null) {
cidAll = consumerGroupInfo.getAllClientId();
}
if (null == cidAll) {
log.warn("QueryLoad: no assignment for group[{}] topic[{}], get consumer id list failed", consumerGroup, topic);
return null;
}
List<MessageQueue> mqAll = new ArrayList<>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
List<MessageQueue> allocateResult = null;
try {
AllocateMessageQueueStrategy allocateMessageQueueStrategy = name2LoadStrategy.get(strategyName);
if (null == allocateMessageQueueStrategy) {
log.warn("QueryLoad: unsupported strategy [{}], {}", strategyName, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
return null;
}
if (setMessageRequestModeRequestBody != null && setMessageRequestModeRequestBody.getMode() == MessageRequestMode.POP) {
allocateResult = allocate4Pop(allocateMessageQueueStrategy, consumerGroup, clientId, mqAll,
cidAll, setMessageRequestModeRequestBody.getPopShareQueueNum());
} else {
allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
}
} catch (Throwable e) {
log.error("QueryLoad: no assignment for group[{}] topic[{}], allocate message queue exception. strategy name: {}, ex: {}", consumerGroup, topic, strategyName, e);
return null;
}
assignedQueueSet = new HashSet<>();
if (allocateResult != null) {
assignedQueueSet.addAll(allocateResult);
}
break;
}
default:
break;
}
return assignedQueueSet;
}
public List<MessageQueue> allocate4Pop(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
final String consumerGroup, final String clientId, List<MessageQueue> mqAll, List<String> cidAll,
int popShareQueueNum) {
List<MessageQueue> allocateResult;
if (popShareQueueNum <= 0 || popShareQueueNum >= cidAll.size() - 1) {
//each client pop all messagequeue
allocateResult = new ArrayList<>(mqAll.size());
for (MessageQueue mq : mqAll) {
//must create new MessageQueue in case of change cache in AssignmentManager
MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
allocateResult.add(newMq);
}
} else {
if (cidAll.size() <= mqAll.size()) {
//consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
int index = cidAll.indexOf(clientId);
if (index >= 0) {
for (int i = 1; i <= popShareQueueNum; i++) {
index++;
index = index % cidAll.size();
List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
allocateResult.addAll(tmp);
}
}
} else {
//make sure each cid is assigned
allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
}
}
return allocateResult;
}
private List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (StringUtils.isBlank(currentCID)) {
throw new IllegalArgumentException("currentCID is empty");
}
if (CollectionUtils.isEmpty(mqAll)) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (CollectionUtils.isEmpty(cidAll)) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
result.add(mqAll.get(index % mqAll.size()));
return result;
}
private RemotingCommand setMessageRequestMode(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final SetMessageRequestModeRequestBody requestBody = SetMessageRequestModeRequestBody.decode(request.getBody(), SetMessageRequestModeRequestBody.class);
final String topic = requestBody.getTopic();
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("retry topic is not allowed to set mode");
return response;
}
final String consumerGroup = requestBody.getConsumerGroup();
this.messageRequestModeManager.setMessageRequestMode(topic, consumerGroup, requestBody);
this.messageRequestModeManager.persist();
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
public MessageRequestModeManager getMessageRequestModeManager() {
return messageRequestModeManager;
}
}