blob: fa91f288e2d99742575ce5f75e3269ded3c00560 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.controller.impl;
import com.google.common.base.Stopwatch;
import io.openmessaging.storage.dledger.AppendFuture;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerLeaderElector;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
import io.opentelemetry.api.common.AttributesBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.elect.ElectPolicy;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.controller.impl.event.EventSerializer;
import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
import org.apache.rocketmq.controller.metrics.ControllerMetricsConstant;
import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_BROKER_SET;
import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_CLUSTER_NAME;
import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_DLEDGER_OPERATION;
import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_DLEDGER_OPERATION_STATUS;
import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_ELECTION_RESULT;
/**
* The implementation of controller, based on DLedger (raft).
*/
public class DLedgerController implements Controller {
private static final Logger log = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
private final DLedgerServer dLedgerServer;
private final ControllerConfig controllerConfig;
private final DLedgerConfig dLedgerConfig;
private final ReplicasInfoManager replicasInfoManager;
private final EventScheduler scheduler;
private final EventSerializer eventSerializer;
private final RoleChangeHandler roleHandler;
private final DLedgerControllerStateMachine statemachine;
private final ScheduledExecutorService scanInactiveMasterService;
private ScheduledFuture scanInactiveMasterFuture;
private List<BrokerLifecycleListener> brokerLifecycleListeners;
// Usr for checking whether the broker is alive
private BrokerValidPredicate brokerAlivePredicate;
// use for elect a master
private ElectPolicy electPolicy;
private AtomicBoolean isScheduling = new AtomicBoolean(false);
public DLedgerController(final ControllerConfig config, final BrokerValidPredicate brokerAlivePredicate) {
this(config, brokerAlivePredicate, null, null, null, null);
}
public DLedgerController(final ControllerConfig controllerConfig,
final BrokerValidPredicate brokerAlivePredicate, final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig, final ChannelEventListener channelEventListener,
final ElectPolicy electPolicy) {
this.controllerConfig = controllerConfig;
this.eventSerializer = new EventSerializer();
this.scheduler = new EventScheduler();
this.brokerAlivePredicate = brokerAlivePredicate;
this.electPolicy = electPolicy == null ? new DefaultElectPolicy() : electPolicy;
this.dLedgerConfig = new DLedgerConfig();
this.dLedgerConfig.setGroup(controllerConfig.getControllerDLegerGroup());
this.dLedgerConfig.setPeers(controllerConfig.getControllerDLegerPeers());
this.dLedgerConfig.setSelfId(controllerConfig.getControllerDLegerSelfId());
this.dLedgerConfig.setStoreBaseDir(controllerConfig.getControllerStorePath());
this.dLedgerConfig.setMappedFileSizeForEntryData(controllerConfig.getMappedFileSize());
this.roleHandler = new RoleChangeHandler(dLedgerConfig.getSelfId());
this.replicasInfoManager = new ReplicasInfoManager(controllerConfig);
this.statemachine = new DLedgerControllerStateMachine(replicasInfoManager, this.eventSerializer, dLedgerConfig.getGroup(), dLedgerConfig.getSelfId());
// Register statemachine and role handler.
this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener);
this.dLedgerServer.registerStateMachine(this.statemachine);
this.dLedgerServer.getDLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
this.scanInactiveMasterService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
this.brokerLifecycleListeners = new ArrayList<>();
}
@Override
public void startup() {
this.dLedgerServer.startup();
}
@Override
public void shutdown() {
this.cancelScanInactiveFuture();
this.dLedgerServer.shutdown();
}
@Override
public void startScheduling() {
if (this.isScheduling.compareAndSet(false, true)) {
log.info("Start scheduling controller events");
this.scheduler.start();
}
}
@Override
public void stopScheduling() {
if (this.isScheduling.compareAndSet(true, false)) {
log.info("Stop scheduling controller events");
this.scheduler.shutdown(true);
}
}
@Override
public boolean isLeaderState() {
return this.roleHandler.isLeaderState();
}
public ControllerConfig getControllerConfig() {
return controllerConfig;
}
@Override
public CompletableFuture<RemotingCommand> alterSyncStateSet(AlterSyncStateSetRequestHeader request,
final SyncStateSet syncStateSet) {
return this.scheduler.appendEvent("alterSyncStateSet",
() -> this.replicasInfoManager.alterSyncStateSet(request, syncStateSet, this.brokerAlivePredicate), true);
}
@Override
public CompletableFuture<RemotingCommand> electMaster(final ElectMasterRequestHeader request) {
return this.scheduler.appendEvent("electMaster",
() -> {
ControllerResult<ElectMasterResponseHeader> electResult = this.replicasInfoManager.electMaster(request, this.electPolicy);
AttributesBuilder attributesBuilder = ControllerMetricsManager.newAttributesBuilder()
.put(LABEL_CLUSTER_NAME, request.getClusterName())
.put(LABEL_BROKER_SET, request.getBrokerName());
switch (electResult.getResponseCode()) {
case ResponseCode.SUCCESS:
ControllerMetricsManager.electionTotal.add(1,
attributesBuilder.put(LABEL_ELECTION_RESULT, ControllerMetricsConstant.ElectionResult.NEW_MASTER_ELECTED.getLowerCaseName()).build());
break;
case ResponseCode.CONTROLLER_MASTER_STILL_EXIST:
ControllerMetricsManager.electionTotal.add(1,
attributesBuilder.put(LABEL_ELECTION_RESULT, ControllerMetricsConstant.ElectionResult.KEEP_CURRENT_MASTER.getLowerCaseName()).build());
break;
case ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE:
case ResponseCode.CONTROLLER_ELECT_MASTER_FAILED:
ControllerMetricsManager.electionTotal.add(1,
attributesBuilder.put(LABEL_ELECTION_RESULT, ControllerMetricsConstant.ElectionResult.NO_MASTER_ELECTED.getLowerCaseName()).build());
break;
default:
break;
}
return electResult;
}, true);
}
@Override
public CompletableFuture<RemotingCommand> getNextBrokerId(GetNextBrokerIdRequestHeader request) {
return this.scheduler.appendEvent("getNextBrokerId", () -> this.replicasInfoManager.getNextBrokerId(request), false);
}
@Override
public CompletableFuture<RemotingCommand> applyBrokerId(ApplyBrokerIdRequestHeader request) {
return this.scheduler.appendEvent("applyBrokerId", () -> this.replicasInfoManager.applyBrokerId(request), true);
}
@Override
public CompletableFuture<RemotingCommand> registerBroker(RegisterBrokerToControllerRequestHeader request) {
return this.scheduler.appendEvent("registerSuccess", () -> this.replicasInfoManager.registerBroker(request, brokerAlivePredicate), true);
}
@Override
public CompletableFuture<RemotingCommand> getReplicaInfo(final GetReplicaInfoRequestHeader request) {
return this.scheduler.appendEvent("getReplicaInfo",
() -> this.replicasInfoManager.getReplicaInfo(request), false);
}
@Override
public CompletableFuture<RemotingCommand> getSyncStateData(List<String> brokerNames) {
return this.scheduler.appendEvent("getSyncStateData",
() -> this.replicasInfoManager.getSyncStateData(brokerNames, brokerAlivePredicate), false);
}
@Override
public void registerBrokerLifecycleListener(BrokerLifecycleListener listener) {
this.brokerLifecycleListeners.add(listener);
}
@Override
public RemotingCommand getControllerMetadata() {
final MemberState state = getMemberState();
final Map<String, String> peers = state.getPeerMap();
final StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : peers.entrySet()) {
final String peer = entry.getKey() + ":" + entry.getValue();
sb.append(peer).append(";");
}
return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, new GetMetaDataResponseHeader(
state.getGroup(), state.getLeaderId(), state.getLeaderAddr(), state.isLeader(), sb.toString()));
}
@Override
public RemotingServer getRemotingServer() {
return this.dLedgerServer.getRemotingServer();
}
@Override
public CompletableFuture<RemotingCommand> cleanBrokerData(
final CleanControllerBrokerDataRequestHeader requestHeader) {
return this.scheduler.appendEvent("cleanBrokerData",
() -> this.replicasInfoManager.cleanBrokerData(requestHeader, this.brokerAlivePredicate), true);
}
/**
* Scan all broker-set in statemachine, find that the broker-set which
* its master has been timeout but still has at least one broker keep alive with controller,
* and we trigger an election to update its state.
*/
private void scanInactiveMasterAndTriggerReelect() {
if (!this.roleHandler.isLeaderState()) {
cancelScanInactiveFuture();
return;
}
List<String> brokerSets = this.replicasInfoManager.scanNeedReelectBrokerSets(this.brokerAlivePredicate);
for (String brokerName : brokerSets) {
// Notify ControllerManager
this.brokerLifecycleListeners.forEach(listener -> listener.onBrokerInactive(null, brokerName, null));
}
}
/**
* Append the request to DLedger, and wait for DLedger to commit the request.
*/
private boolean appendToDLedgerAndWait(final AppendEntryRequest request) {
if (request != null) {
request.setGroup(this.dLedgerConfig.getGroup());
request.setRemoteId(this.dLedgerConfig.getSelfId());
Stopwatch stopwatch = Stopwatch.createStarted();
AttributesBuilder attributesBuilder = ControllerMetricsManager.newAttributesBuilder()
.put(LABEL_DLEDGER_OPERATION, ControllerMetricsConstant.DLedgerOperation.APPEND.getLowerCaseName());
try {
final AppendFuture<AppendEntryResponse> dLedgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dLedgerFuture.getPos() == -1) {
ControllerMetricsManager.dLedgerOpTotal.add(1,
attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.FAILED.getLowerCaseName()).build());
return false;
}
dLedgerFuture.get(5, TimeUnit.SECONDS);
ControllerMetricsManager.dLedgerOpTotal.add(1,
attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.SUCCESS.getLowerCaseName()).build());
ControllerMetricsManager.dLedgerOpLatency.record(stopwatch.elapsed(TimeUnit.MICROSECONDS),
attributesBuilder.build());
} catch (Exception e) {
log.error("Failed to append entry to DLedger", e);
if (e instanceof TimeoutException) {
ControllerMetricsManager.dLedgerOpTotal.add(1,
attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.TIMEOUT.getLowerCaseName()).build());
} else {
ControllerMetricsManager.dLedgerOpTotal.add(1,
attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.FAILED.getLowerCaseName()).build());
}
return false;
}
return true;
}
return false;
}
// Only for test
public MemberState getMemberState() {
return this.dLedgerServer.getMemberState();
}
public void setBrokerAlivePredicate(BrokerValidPredicate brokerAlivePredicate) {
this.brokerAlivePredicate = brokerAlivePredicate;
}
public void setElectPolicy(ElectPolicy electPolicy) {
this.electPolicy = electPolicy;
}
private void cancelScanInactiveFuture() {
if (this.scanInactiveMasterFuture != null) {
this.scanInactiveMasterFuture.cancel(true);
this.scanInactiveMasterFuture = null;
}
}
/**
* Event handler that handle event
*/
interface EventHandler<T> {
/**
* Run the controller event
*/
void run() throws Throwable;
/**
* Return the completableFuture
*/
CompletableFuture<RemotingCommand> future();
/**
* Handle Exception.
*/
void handleException(final Throwable t);
}
/**
* Event scheduler, schedule event handler from event queue
*/
class EventScheduler extends ServiceThread {
private final BlockingQueue<EventHandler> eventQueue;
public EventScheduler() {
this.eventQueue = new LinkedBlockingQueue<>(1024);
}
@Override
public String getServiceName() {
return EventScheduler.class.getName();
}
@Override
public void run() {
log.info("Start event scheduler.");
while (!isStopped()) {
EventHandler handler;
try {
handler = this.eventQueue.poll(5, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
continue;
}
try {
if (handler != null) {
handler.run();
}
} catch (final Throwable e) {
handler.handleException(e);
}
}
}
public <T> CompletableFuture<RemotingCommand> appendEvent(final String name,
final Supplier<ControllerResult<T>> supplier, boolean isWriteEvent) {
if (isStopped() || !DLedgerController.this.roleHandler.isLeaderState()) {
final RemotingCommand command = RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_NOT_LEADER, "The controller is not in leader state");
final CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
future.complete(command);
return future;
}
final EventHandler<T> event = new ControllerEventHandler<>(name, supplier, isWriteEvent);
int tryTimes = 0;
while (true) {
try {
if (!this.eventQueue.offer(event, 5, TimeUnit.SECONDS)) {
continue;
}
return event.future();
} catch (final InterruptedException e) {
log.error("Error happen in EventScheduler when append event", e);
tryTimes++;
if (tryTimes > 3) {
return null;
}
}
}
}
}
/**
* Event handler, get events from supplier, and append events to DLedger
*/
class ControllerEventHandler<T> implements EventHandler<T> {
private final String name;
private final Supplier<ControllerResult<T>> supplier;
private final CompletableFuture<RemotingCommand> future;
private final boolean isWriteEvent;
ControllerEventHandler(final String name, final Supplier<ControllerResult<T>> supplier,
final boolean isWriteEvent) {
this.name = name;
this.supplier = supplier;
this.future = new CompletableFuture<>();
this.isWriteEvent = isWriteEvent;
}
@Override
public void run() throws Throwable {
final ControllerResult<T> result = this.supplier.get();
log.info("Event queue run event {}, get the result {}", this.name, result);
boolean appendSuccess = true;
if (!this.isWriteEvent || result.getEvents() == null || result.getEvents().isEmpty()) {
// read event, or write event with empty events in response which also equals to read event
if (DLedgerController.this.controllerConfig.isProcessReadEvent()) {
// Now the DLedger don't have the function of Read-Index or Lease-Read,
// So we still need to propose an empty request to DLedger.
final AppendEntryRequest request = new AppendEntryRequest();
request.setBody(new byte[0]);
appendSuccess = appendToDLedgerAndWait(request);
}
} else {
// write event
final List<EventMessage> events = result.getEvents();
final List<byte[]> eventBytes = new ArrayList<>(events.size());
for (final EventMessage event : events) {
if (event != null) {
final byte[] data = DLedgerController.this.eventSerializer.serialize(event);
if (data != null && data.length > 0) {
eventBytes.add(data);
}
}
}
// Append events to DLedger
if (!eventBytes.isEmpty()) {
// batch append events
final BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setBatchMsgs(eventBytes);
appendSuccess = appendToDLedgerAndWait(request);
}
}
if (appendSuccess) {
final RemotingCommand response = RemotingCommand.createResponseCommandWithHeader(result.getResponseCode(), (CommandCustomHeader) result.getResponse());
if (result.getBody() != null) {
response.setBody(result.getBody());
}
if (result.getRemark() != null) {
response.setRemark(result.getRemark());
}
this.future.complete(response);
} else {
log.error("Failed to append event to DLedger, the response is {}, try cancel the future", result.getResponse());
this.future.cancel(true);
}
}
@Override
public CompletableFuture<RemotingCommand> future() {
return this.future;
}
@Override
public void handleException(final Throwable t) {
log.error("Error happen when handle event {}", this.name, t);
this.future.completeExceptionally(t);
}
}
/**
* Role change handler, trigger the startScheduling() and stopScheduling() when role change.
*/
class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
private final String selfId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER;
public RoleChangeHandler(final String selfId) {
this.selfId = selfId;
}
@Override
public void handle(long term, MemberState.Role role) {
Runnable runnable = () -> {
switch (role) {
case CANDIDATE:
ControllerMetricsManager.recordRole(role, this.currentRole);
this.currentRole = MemberState.Role.CANDIDATE;
log.info("Controller {} change role to candidate", this.selfId);
DLedgerController.this.stopScheduling();
DLedgerController.this.cancelScanInactiveFuture();
break;
case FOLLOWER:
ControllerMetricsManager.recordRole(role, this.currentRole);
this.currentRole = MemberState.Role.FOLLOWER;
log.info("Controller {} change role to Follower, leaderId:{}", this.selfId, getMemberState().getLeaderId());
DLedgerController.this.stopScheduling();
DLedgerController.this.cancelScanInactiveFuture();
break;
case LEADER: {
log.info("Controller {} change role to leader, try process a initial proposal", this.selfId);
// Because the role becomes to leader, but the memory statemachine of the controller is still in the old point,
// some committed logs have not been applied. Therefore, we must first process an empty request to DLedger,
// and after the request is committed, the controller can provide services(startScheduling).
int tryTimes = 0;
while (true) {
final AppendEntryRequest request = new AppendEntryRequest();
request.setBody(new byte[0]);
try {
if (appendToDLedgerAndWait(request)) {
ControllerMetricsManager.recordRole(role, this.currentRole);
this.currentRole = MemberState.Role.LEADER;
DLedgerController.this.startScheduling();
if (DLedgerController.this.scanInactiveMasterFuture == null) {
long scanInactiveMasterInterval = DLedgerController.this.controllerConfig.getScanInactiveMasterInterval();
DLedgerController.this.scanInactiveMasterFuture =
DLedgerController.this.scanInactiveMasterService.scheduleAtFixedRate(DLedgerController.this::scanInactiveMasterAndTriggerReelect,
scanInactiveMasterInterval, scanInactiveMasterInterval, TimeUnit.MILLISECONDS);
}
break;
}
} catch (final Throwable e) {
log.error("Error happen when controller leader append initial request to DLedger", e);
}
if (!DLedgerController.this.getMemberState().isLeader()) {
// now is not a leader
log.error("Append a initial log failed because current state is not leader");
break;
}
tryTimes++;
log.error(String.format("Controller leader append initial log failed, try %d times", tryTimes));
if (tryTimes % 3 == 0) {
log.warn("Controller leader append initial log failed too many times, please wait a while");
}
}
break;
}
}
};
this.executorService.submit(runnable);
}
@Override
public void startup() {
}
@Override
public void shutdown() {
if (this.currentRole == MemberState.Role.LEADER) {
DLedgerController.this.stopScheduling();
}
this.executorService.shutdown();
}
public boolean isLeaderState() {
return this.currentRole == MemberState.Role.LEADER;
}
}
}