blob: 2a9da3a1a6fff12787ad35c0697efc70791b40a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.consensus.multileader;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.IStateMachine.Registry;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.SyncMultiLeaderServiceClientPoolFactory;
import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService;
import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * {@link IConsensus} implementation in which every hosted consensus group is served by a local
 * {@link MultiLeaderServerImpl}, and {@link #isLeader(ConsensusGroupId)} always reports {@code
 * true} for this node.
 *
 * <p>Each consensus group stores its data in a sub-directory of the configured storage dir named
 * {@code <groupTypeValue>_<groupId>} (see {@link #buildPeerDir(ConsensusGroupId)}); {@link
 * #start()} scans those sub-directories to recover the groups hosted by this node.
 */
public class MultiLeaderConsensus implements IConsensus {

  private static final Logger logger = LoggerFactory.getLogger(MultiLeaderConsensus.class);

  /** Endpoint of the local node; used as the local {@link Peer} of every hosted group. */
  private final TEndPoint thisNode;
  /** Root directory containing one sub-directory per hosted consensus group. */
  private final File storageDir;
  /** Supplies the state machine for a consensus group (see {@code registry.apply(groupId)}). */
  private final IStateMachine.Registry registry;
  /** Consensus groups currently hosted by this node, keyed by group id. */
  private final Map<ConsensusGroupId, MultiLeaderServerImpl> stateMachineMap =
      new ConcurrentHashMap<>();
  private final MultiLeaderRPCService service;
  private final RegisterManager registerManager = new RegisterManager();
  private final MultiLeaderConfig config;
  private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
  private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;

  /**
   * Creates the consensus layer; nothing is started or recovered until {@link #start()}.
   *
   * @param config consensus configuration (local endpoint, storage dir, multi-leader settings)
   * @param registry supplies the state machine for each consensus group
   */
  public MultiLeaderConsensus(ConsensusConfig config, Registry registry) {
    this.thisNode = config.getThisNode();
    this.storageDir = new File(config.getStorageDir());
    this.config = config.getMultiLeaderConfig();
    this.registry = registry;
    this.service = new MultiLeaderRPCService(thisNode, config.getMultiLeaderConfig());
    this.clientManager =
        new IClientManager.Factory<TEndPoint, AsyncMultiLeaderServiceClient>()
            .createClientManager(
                new AsyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
    this.syncClientManager =
        new IClientManager.Factory<TEndPoint, SyncMultiLeaderServiceClient>()
            .createClientManager(
                new SyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
  }

  /**
   * Recovers the groups persisted under the storage dir, then registers the RPC service.
   *
   * @throws IOException if the storage dir cannot be scanned or the RPC service fails to start
   */
  @Override
  public void start() throws IOException {
    initAndRecover();
    service.initAsyncedServiceImpl(new MultiLeaderRPCServiceProcessor(this));
    try {
      registerManager.register(service);
    } catch (StartupException e) {
      throw new IOException(e);
    }
  }

  /**
   * Creates the storage dir if missing; otherwise starts one {@link MultiLeaderServerImpl} per
   * recognizable group sub-directory. Entries that are not directories named {@code <int>_<int>}
   * are skipped with a warning instead of aborting startup.
   */
  private void initAndRecover() throws IOException {
    if (!storageDir.exists()) {
      if (!storageDir.mkdirs()) {
        logger.warn("Unable to create consensus dir at {}", storageDir);
      }
      return;
    }
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
      for (Path path : stream) {
        ConsensusGroupId consensusGroupId = parseConsensusGroupId(path);
        if (consensusGroupId == null) {
          logger.warn("Skipping unrecognized entry {} in consensus dir {}", path, storageDir);
          continue;
        }
        MultiLeaderServerImpl consensus =
            new MultiLeaderServerImpl(
                path.toString(),
                new Peer(consensusGroupId, thisNode),
                new ArrayList<>(),
                registry.apply(consensusGroupId),
                clientManager,
                syncClientManager,
                config);
        stateMachineMap.put(consensusGroupId, consensus);
        consensus.start();
      }
    }
  }

  /**
   * Parses a group directory produced by {@link #buildPeerDir(ConsensusGroupId)}.
   *
   * @param path an entry of the storage dir
   * @return the group id encoded in the directory name, or {@code null} if {@code path} is not a
   *     directory or its name is not exactly {@code <int>_<int>}
   */
  private static ConsensusGroupId parseConsensusGroupId(Path path) {
    if (!Files.isDirectory(path)) {
      return null;
    }
    String[] items = path.getFileName().toString().split("_");
    // buildPeerDir always emits exactly two integer parts; anything else is not ours.
    if (items.length != 2) {
      return null;
    }
    try {
      return ConsensusGroupId.Factory.create(
          Integer.parseInt(items[0]), Integer.parseInt(items[1]));
    } catch (NumberFormatException e) {
      return null;
    }
  }

  /**
   * Releases both client pools, stops every hosted group and deregisters the RPC service.
   */
  @Override
  public void stop() {
    clientManager.close();
    // The sync pool was previously never closed, leaking its clients on shutdown.
    syncClientManager.close();
    stateMachineMap.values().parallelStream().forEach(MultiLeaderServerImpl::stop);
    registerManager.deregisterAll();
  }

  /**
   * Applies {@code request} to the local replica of {@code groupId}.
   *
   * @return a response carrying {@link ConsensusGroupNotExistException} if the group is not
   *     hosted here; otherwise a status that is READ_ONLY_SYSTEM_ERROR when the replica is
   *     read-only, WRITE_PROCESS_REJECT when it is inactive, or the result of the write
   */
  @Override
  public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
    if (impl == null) {
      return ConsensusWriteResponse.newBuilder()
          .setException(new ConsensusGroupNotExistException(groupId))
          .build();
    }
    TSStatus status;
    if (impl.isReadOnly()) {
      status = new TSStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode());
      status.setMessage("Fail to do non-query operations because system is read-only.");
    } else if (!impl.isActive()) {
      // TODO: (xingtanzjr) whether we need to define a new status to indicate the inactive status ?
      status = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
      status.setMessage("peer is inactive and not ready to receive sync log request.");
    } else {
      status = impl.write(request);
    }
    return ConsensusWriteResponse.newBuilder().setStatus(status).build();
  }

  /**
   * Executes a read against the local replica of {@code groupId}.
   *
   * @return the data set, or a response carrying {@link ConsensusGroupNotExistException} if the
   *     group is not hosted here
   */
  @Override
  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) {
    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
    if (impl == null) {
      return ConsensusReadResponse.newBuilder()
          .setException(new ConsensusGroupNotExistException(groupId))
          .build();
    }
    return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build();
  }

  /**
   * Creates and starts a local replica of {@code groupId}.
   *
   * @param peers the full membership of the group; must be non-empty and contain this node
   * @return success, or a response carrying {@link IllegalPeerNumException}, {@link
   *     IllegalPeerEndpointException} or {@link ConsensusGroupAlreadyExistException}
   */
  @Override
  public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
    int consensusGroupSize = peers.size();
    if (consensusGroupSize == 0) {
      return ConsensusGenericResponse.newBuilder()
          .setException(new IllegalPeerNumException(consensusGroupSize))
          .build();
    }
    if (!peers.contains(new Peer(groupId, thisNode))) {
      return ConsensusGenericResponse.newBuilder()
          .setException(new IllegalPeerEndpointException(thisNode, peers))
          .build();
    }
    // Flipped to false only when computeIfAbsent actually runs the mapping function.
    AtomicBoolean exist = new AtomicBoolean(true);
    stateMachineMap.computeIfAbsent(
        groupId,
        k -> {
          exist.set(false);
          String path = buildPeerDir(groupId);
          File file = new File(path);
          // mkdirs() also returns false for an existing dir; only warn on a real failure.
          if (!file.isDirectory() && !file.mkdirs()) {
            logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
          }
          MultiLeaderServerImpl impl =
              new MultiLeaderServerImpl(
                  path,
                  new Peer(groupId, thisNode),
                  peers,
                  registry.apply(groupId),
                  clientManager,
                  syncClientManager,
                  config);
          impl.start();
          return impl;
        });
    if (exist.get()) {
      return ConsensusGenericResponse.newBuilder()
          .setException(new ConsensusGroupAlreadyExistException(groupId))
          .build();
    }
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
  }

  /**
   * Stops the local replica of {@code groupId}, deletes its directory and forgets it.
   *
   * @return success, or a response carrying {@link ConsensusGroupNotExistException}
   */
  @Override
  public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) {
    AtomicBoolean exist = new AtomicBoolean(false);
    stateMachineMap.computeIfPresent(
        groupId,
        (k, v) -> {
          exist.set(true);
          v.stop();
          FileUtils.deleteDirectory(new File(buildPeerDir(groupId)));
          // Returning null removes the mapping.
          return null;
        });
    if (!exist.get()) {
      return ConsensusGenericResponse.newBuilder()
          .setException(new ConsensusGroupNotExistException(groupId))
          .build();
    }
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
  }

  /**
   * Adds {@code peer} to {@code groupId} by running the snapshot-based join steps below in order.
   *
   * @return success, or a failed response if the group is not hosted here or any step fails
   */
  @Override
  public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
    if (impl == null) {
      return ConsensusGenericResponse.newBuilder()
          .setException(new ConsensusGroupNotExistException(groupId))
          .build();
    }
    try {
      // step 1: inactive new Peer to prepare for following steps
      logger.info("[MultiLeaderConsensus] inactivate new peer: {}", peer);
      impl.inactivePeer(peer);
      // step 2: notify all the other Peers to build the sync connection to newPeer
      logger.info("[MultiLeaderConsensus] notify current peers to build sync log...");
      impl.notifyPeersToBuildSyncLogChannel(peer);
      // step 3: take snapshot
      logger.info("[MultiLeaderConsensus] start to take snapshot...");
      impl.takeSnapshot();
      // step 4: transit snapshot
      logger.info("[MultiLeaderConsensus] start to transit snapshot...");
      impl.transitSnapshot(peer);
      // step 5: let the new peer load snapshot
      logger.info("[MultiLeaderConsensus] trigger new peer to load snapshot...");
      impl.triggerSnapshotLoad(peer);
      // step 6: active new Peer
      logger.info("[MultiLeaderConsensus] activate new peer...");
      impl.activePeer(peer);
    } catch (ConsensusGroupAddPeerException e) {
      logger.error("cannot execute addPeer() for {}", peer, e);
      return ConsensusGenericResponse.newBuilder()
          .setSuccess(false)
          .setException(new ConsensusException(e.getMessage()))
          .build();
    }
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
  }

  /**
   * Asks the other peers of {@code groupId} to drop their sync-log channel to {@code peer}.
   *
   * @return success, or a failed response if the group is not hosted here or notification fails
   */
  @Override
  public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
    if (impl == null) {
      return ConsensusGenericResponse.newBuilder()
          .setException(new ConsensusGroupNotExistException(groupId))
          .build();
    }
    try {
      impl.notifyPeersToRemoveSyncLogChannel(peer);
    } catch (ConsensusGroupAddPeerException e) {
      // Log like addPeer() does; previously this failure left no trace in the logs.
      logger.error("cannot execute removePeer() for {}", peer, e);
      return ConsensusGenericResponse.newBuilder()
          .setSuccess(false)
          .setException(new ConsensusException(e.getMessage()))
          .build();
    }
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
  }

  /** Not supported by this implementation; always reports failure. */
  @Override
  public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
  }

  /** Not supported by this implementation; always reports failure. */
  @Override
  public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
  }

  /** Not supported by this implementation; always reports failure. */
  @Override
  public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
  }

  /** Always {@code true}: every replica is treated as a leader, regardless of {@code groupId}. */
  @Override
  public boolean isLeader(ConsensusGroupId groupId) {
    return true;
  }

  /**
   * @return this node as the leader of {@code groupId}, or {@code null} if the group is not hosted
   */
  @Override
  public Peer getLeader(ConsensusGroupId groupId) {
    if (!stateMachineMap.containsKey(groupId)) {
      return null;
    }
    return new Peer(groupId, thisNode);
  }

  /** @return a snapshot copy of the ids of all groups hosted by this node */
  @Override
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
    return new ArrayList<>(stateMachineMap.keySet());
  }

  /** @return the local replica of {@code groupId}, or {@code null} if it is not hosted here */
  public MultiLeaderServerImpl getImpl(ConsensusGroupId groupId) {
    return stateMachineMap.get(groupId);
  }

  /** Builds {@code <storageDir>/<groupTypeValue>_<groupId>}, the directory of one group. */
  private String buildPeerDir(ConsensusGroupId groupId) {
    return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
  }
}