blob: 93ecbbd9dd2b56b1019b28ab8ba2002b8ef8f284 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.controller.processor;
import com.google.common.base.Stopwatch;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.ControllerManager;
import org.apache.rocketmq.controller.metrics.ControllerMetricsConstant;
import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_REQUEST_HANDLE_STATUS;
import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_REQUEST_TYPE;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_APPLY_BROKER_ID;
import static org.apache.rocketmq.remoting.protocol.RequestCode.BROKER_HEARTBEAT;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CLEAN_BROKER_DATA;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_ELECT_MASTER;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_SYNC_STATE_DATA;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_REGISTER_BROKER;
import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONTROLLER_CONFIG;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_NEXT_BROKER_ID;
import static org.apache.rocketmq.remoting.protocol.RequestCode.UPDATE_CONTROLLER_CONFIG;
/**
* Processor for controller request
*/
public class ControllerRequestProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
private static final int WAIT_TIMEOUT_OUT = 5;
private final ControllerManager controllerManager;
private final BrokerHeartbeatManager heartbeatManager;
public ControllerRequestProcessor(final ControllerManager controllerManager) {
this.controllerManager = controllerManager;
this.heartbeatManager = controllerManager.getHeartbeatManager();
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
if (ctx != null) {
log.debug("Receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
Stopwatch stopwatch = Stopwatch.createStarted();
try {
RemotingCommand resp = handleRequest(ctx, request);
Attributes attributes = ControllerMetricsManager.newAttributesBuilder()
.put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
.put(LABEL_REQUEST_HANDLE_STATUS, ControllerMetricsConstant.RequestHandleStatus.SUCCESS.getLowerCaseName())
.build();
ControllerMetricsManager.requestTotal.add(1, attributes);
attributes = ControllerMetricsManager.newAttributesBuilder()
.put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
.build();
ControllerMetricsManager.requestLatency.record(stopwatch.elapsed(TimeUnit.MICROSECONDS), attributes);
return resp;
} catch (Exception e) {
log.error("process request: {} error, ", request, e);
Attributes attributes;
if (e instanceof TimeoutException) {
attributes = ControllerMetricsManager.newAttributesBuilder()
.put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
.put(LABEL_REQUEST_HANDLE_STATUS, ControllerMetricsConstant.RequestHandleStatus.TIMEOUT.getLowerCaseName())
.build();
} else {
attributes = ControllerMetricsManager.newAttributesBuilder()
.put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
.put(LABEL_REQUEST_HANDLE_STATUS, ControllerMetricsConstant.RequestHandleStatus.FAILED.getLowerCaseName())
.build();
}
ControllerMetricsManager.requestTotal.add(1, attributes);
throw e;
}
}
private RemotingCommand handleRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
switch (request.getCode()) {
case CONTROLLER_ALTER_SYNC_STATE_SET:
return this.handleAlterSyncStateSet(ctx, request);
case CONTROLLER_ELECT_MASTER:
return this.handleControllerElectMaster(ctx, request);
case CONTROLLER_GET_REPLICA_INFO:
return this.handleControllerGetReplicaInfo(ctx, request);
case CONTROLLER_GET_METADATA_INFO:
return this.handleControllerGetMetadataInfo(ctx, request);
case BROKER_HEARTBEAT:
return this.handleBrokerHeartbeat(ctx, request);
case CONTROLLER_GET_SYNC_STATE_DATA:
return this.handleControllerGetSyncStateData(ctx, request);
case UPDATE_CONTROLLER_CONFIG:
return this.handleUpdateControllerConfig(ctx, request);
case GET_CONTROLLER_CONFIG:
return this.handleGetControllerConfig(ctx, request);
case CLEAN_BROKER_DATA:
return this.handleCleanBrokerData(ctx, request);
case CONTROLLER_GET_NEXT_BROKER_ID:
return this.handleGetNextBrokerId(ctx, request);
case CONTROLLER_APPLY_BROKER_ID:
return this.handleApplyBrokerId(ctx, request);
case CONTROLLER_REGISTER_BROKER:
return this.handleRegisterBroker(ctx, request);
default: {
final String error = " request type " + request.getCode() + " not supported";
return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}
}
}
private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
return RemotingCommand.createResponseCommand(null);
}
private RemotingCommand handleControllerElectMaster(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
if (future != null) {
final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
if (response.getCode() == ResponseCode.SUCCESS) {
if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(response));
}
}
return response;
}
return RemotingCommand.createResponseCommand(null);
}
private RemotingCommand handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
return RemotingCommand.createResponseCommand(null);
}
private RemotingCommand handleControllerGetMetadataInfo(ChannelHandlerContext ctx, RemotingCommand request) {
return this.controllerManager.getController().getControllerMetadata();
}
private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
if (requestHeader.getBrokerId() == null) {
return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId");
}
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
}
private RemotingCommand handleControllerGetSyncStateData(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
if (request.getBody() != null) {
final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
if (brokerNames != null && brokerNames.size() > 0) {
final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
}
}
return RemotingCommand.createResponseCommand(null);
}
private RemotingCommand handleCleanBrokerData(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
if (null != future) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
return RemotingCommand.createResponseCommand(null);
}
private RemotingCommand handleGetNextBrokerId(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
final GetNextBrokerIdRequestHeader requestHeader = (GetNextBrokerIdRequestHeader) request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class);
CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getNextBrokerId(requestHeader);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
return RemotingCommand.createResponseCommand(null);
}
private RemotingCommand handleApplyBrokerId(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
final ApplyBrokerIdRequestHeader requestHeader = (ApplyBrokerIdRequestHeader) request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class);
CompletableFuture<RemotingCommand> future = this.controllerManager.getController().applyBrokerId(requestHeader);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
return RemotingCommand.createResponseCommand(null);
}
private RemotingCommand handleRegisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
RegisterBrokerToControllerRequestHeader requestHeader = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(requestHeader);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
return RemotingCommand.createResponseCommand(null);
}
private RemotingCommand handleUpdateControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
if (ctx != null) {
log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
byte[] body = request.getBody();
if (body != null) {
String bodyStr;
try {
bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
} catch (UnsupportedEncodingException e) {
log.error("updateConfig byte array to string error: ", e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UnsupportedEncodingException " + e);
return response;
}
Properties properties = MixAll.string2Properties(bodyStr);
if (properties == null) {
log.error("updateConfig MixAll.string2Properties error {}", bodyStr);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("string2Properties error");
return response;
}
if (properties.containsKey("configStorePath")) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("Can not update config path");
return response;
}
this.controllerManager.getConfiguration().update(properties);
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
private RemotingCommand handleGetControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
String content = this.controllerManager.getConfiguration().getAllConfigsFormatString();
if (content != null && content.length() > 0) {
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
log.error("getConfig error, ", e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UnsupportedEncodingException " + e);
return response;
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
@Override
public boolean rejectRequest() {
return false;
}
}