// blob: a383687c2590e6e9cf173f18a50b20b3098da1a8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.consensus.simple;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.IStateMachine.Registry;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
import org.apache.iotdb.consensus.iot.IoTConsensus;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * A simple consensus implementation, which can be used when replicaNum is 1.
 *
 * <p>Each consensus group is served by exactly one local {@link SimpleConsensusServerImpl}; reads
 * and writes are handed straight to it. Membership changes, leader transfer, snapshot triggering
 * and peer-list resets are all rejected with a {@link ConsensusException}.
 *
 * <p>Notice: The stateMachine needs to implement WAL itself to ensure recovery after a restart
 */
class SimpleConsensus implements IConsensus {

  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsensus.class);
  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
      PerformanceOverviewMetrics.getInstance();

  private final TEndPoint thisNode;
  private final int thisNodeId;
  private final File storageDir;
  private final IStateMachine.Registry registry;
  /** Servers of all consensus groups currently registered on this node, keyed by group id. */
  private final Map<ConsensusGroupId, SimpleConsensusServerImpl> stateMachineMap =
      new ConcurrentHashMap<>();

  /**
   * Creates a consensus layer bound to the local node described by {@code config}.
   *
   * @param config supplies this node's endpoint, node id and the consensus storage directory
   * @param registry produces the state machine for a given consensus group id
   */
  public SimpleConsensus(ConsensusConfig config, Registry registry) {
    this.thisNode = config.getThisNodeEndPoint();
    this.thisNodeId = config.getThisNodeId();
    this.storageDir = new File(config.getStorageDir());
    this.registry = registry;
  }

  /**
   * Creates the storage directory if it is missing, otherwise recovers and starts every consensus
   * group found in it.
   *
   * @throws IOException if the storage directory cannot be created or listed
   */
  @Override
  public synchronized void start() throws IOException {
    initAndRecover();
  }

  /**
   * Ensures the storage directory exists; when it already does, rebuilds one server per group
   * directory (named {@code <type>_<id>}), registers it and starts it.
   *
   * @throws IOException if the storage directory cannot be created or listed
   */
  private void initAndRecover() throws IOException {
    if (!storageDir.exists()) {
      if (!storageDir.mkdirs()) {
        throw new IOException(String.format("Unable to create consensus dir at %s", storageDir));
      }
      // A freshly created directory has nothing to recover.
      return;
    }
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
      for (Path path : stream) {
        Optional<ConsensusGroupId> parsedId = parseGroupId(path);
        if (!parsedId.isPresent()) {
          continue;
        }
        ConsensusGroupId consensusGroupId = parsedId.get();
        SimpleConsensusServerImpl consensus =
            new SimpleConsensusServerImpl(
                new Peer(consensusGroupId, thisNodeId, thisNode),
                registry.apply(consensusGroupId));
        stateMachineMap.put(consensusGroupId, consensus);
        consensus.start();
      }
    }
  }

  /**
   * Parses a {@code <type>_<id>} entry name into a consensus group id.
   *
   * @param path an entry of the storage directory
   * @return the group id, or empty when the entry name does not start with two integer fields
   *     separated by {@code _} (such entries are logged and skipped instead of aborting startup)
   */
  private Optional<ConsensusGroupId> parseGroupId(Path path) {
    String entryName = path.getFileName().toString();
    String[] items = entryName.split("_");
    int type;
    int id;
    try {
      type = Integer.parseInt(items[0]);
      id = Integer.parseInt(items[1]);
    } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
      LOGGER.warn(
          "Skip {} under consensus dir {} because it is not a consensus group directory",
          entryName,
          storageDir);
      return Optional.empty();
    }
    return Optional.of(ConsensusGroupId.Factory.create(type, id));
  }

  /** Stops the server of every registered consensus group. */
  @Override
  public synchronized void stop() throws IOException {
    stateMachineMap.values().parallelStream().forEach(SimpleConsensusServerImpl::stop);
  }

  /**
   * Applies a write request to the given group's server.
   *
   * @param groupId the target consensus group
   * @param request the write request
   * @return the server's write status, or a {@code SYSTEM_READ_ONLY} status when the server
   *     reports itself read-only
   * @throws ConsensusException ({@link ConsensusGroupNotExistException}) if the group is not
   *     registered on this node
   */
  @Override
  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
      throws ConsensusException {
    SimpleConsensusServerImpl impl = getServerOrThrow(groupId);
    if (impl.isReadOnly()) {
      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
    }
    if (!(groupId instanceof DataRegionId)) {
      return impl.write(request);
    }
    // only record time cost for data region in Performance Overview Dashboard
    long startWriteTime = System.nanoTime();
    TSStatus status = impl.write(request);
    PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(System.nanoTime() - startWriteTime);
    return status;
  }

  /**
   * Serves a read request from the given group's server.
   *
   * @param groupId the target consensus group
   * @param request the read request
   * @return whatever the group's server returns for {@code request}
   * @throws ConsensusException ({@link ConsensusGroupNotExistException}) if the group is not
   *     registered on this node
   */
  @Override
  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
      throws ConsensusException {
    return getServerOrThrow(groupId).read(request);
  }

  /**
   * Looks up the server registered for {@code groupId}.
   *
   * @throws ConsensusException ({@link ConsensusGroupNotExistException}) if no server is
   *     registered for the group
   */
  private SimpleConsensusServerImpl getServerOrThrow(ConsensusGroupId groupId)
      throws ConsensusException {
    return Optional.ofNullable(stateMachineMap.get(groupId))
        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
  }

  /**
   * Creates, registers and starts the local server for a new single-replica consensus group.
   *
   * @param groupId the group to create
   * @param peers the group's members; must contain exactly the local peer
   * @throws ConsensusException in the following cases: {@link IllegalPeerNumException} when
   *     {@code peers} does not contain exactly one peer; {@link IllegalPeerEndpointException} when
   *     {@code peers} does not contain the local peer; {@link ConsensusGroupAlreadyExistException}
   *     when the group is already registered (the existing server is left untouched); a plain
   *     {@link ConsensusException} when the group's directory cannot be created
   */
  @Override
  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
      throws ConsensusException {
    int consensusGroupSize = peers.size();
    if (consensusGroupSize != 1) {
      throw new IllegalPeerNumException(consensusGroupSize);
    }
    if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
      throw new IllegalPeerEndpointException(thisNode, peers);
    }

    // Set only when the mapping function below actually builds a new server, so that an
    // already-registered group can be told apart from a freshly created one.
    AtomicBoolean created = new AtomicBoolean(false);
    SimpleConsensusServerImpl impl =
        stateMachineMap.computeIfAbsent(
            groupId,
            k -> {
              String path = buildPeerDir(groupId);
              if (!new File(path).mkdirs()) {
                LOGGER.warn("Unable to create consensus dir for group {} at {}", groupId, path);
                // Returning null leaves the map without an entry for this group.
                return null;
              }
              created.set(true);
              return new SimpleConsensusServerImpl(peers.get(0), registry.apply(groupId));
            });

    if (impl == null) {
      throw new ConsensusException(
          String.format("Unable to create consensus dir for group %s", groupId));
    }
    if (!created.get()) {
      // The group was registered before this call: do not start its running server again.
      throw new ConsensusGroupAlreadyExistException(groupId);
    }
    impl.start();
  }

  /**
   * Stops the group's server, deletes the group's directory and unregisters the group.
   *
   * @param groupId the group to delete
   * @throws ConsensusException ({@link ConsensusGroupNotExistException}) if the group is not
   *     registered on this node
   */
  @Override
  public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException {
    AtomicBoolean exist = new AtomicBoolean(false);
    stateMachineMap.computeIfPresent(
        groupId,
        (k, v) -> {
          exist.set(true);
          v.stop();
          FileUtils.deleteFileOrDirectory(new File(buildPeerDir(groupId)));
          // Returning null removes the mapping.
          return null;
        });
    if (!exist.get()) {
      throw new ConsensusGroupNotExistException(groupId);
    }
  }

  /** Not supported; always throws. */
  @Override
  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
    throw new ConsensusException("SimpleConsensus does not support membership changes");
  }

  /** Not supported; always throws. */
  @Override
  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
    throw new ConsensusException("SimpleConsensus does not support membership changes");
  }

  /** Not supported; always throws. */
  @Override
  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException {
    throw new ConsensusException("SimpleConsensus does not support leader transfer");
  }

  /** Not supported; always throws. */
  @Override
  public void triggerSnapshot(ConsensusGroupId groupId, boolean force) throws ConsensusException {
    throw new ConsensusException("SimpleConsensus does not support snapshot trigger currently");
  }

  /** Always {@code true}, regardless of {@code groupId}. */
  @Override
  public boolean isLeader(ConsensusGroupId groupId) {
    return true;
  }

  /** Always {@code true}, regardless of {@code groupId}. */
  @Override
  public boolean isLeaderReady(ConsensusGroupId groupId) {
    return true;
  }

  /** Always {@code 0}, regardless of {@code groupId}. */
  @Override
  public long getLogicalClock(ConsensusGroupId groupId) {
    return 0;
  }

  /**
   * Returns the local peer as leader of the given group.
   *
   * @return a peer built from this node's id and endpoint, or {@code null} when the group is not
   *     registered on this node
   */
  @Override
  public Peer getLeader(ConsensusGroupId groupId) {
    if (!stateMachineMap.containsKey(groupId)) {
      return null;
    }
    return new Peer(groupId, thisNodeId, thisNode);
  }

  /** Returns a snapshot copy of the ids of all currently registered consensus groups. */
  @Override
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
    return new ArrayList<>(stateMachineMap.keySet());
  }

  /**
   * Returns the group ids found in the storage directory, delegating to {@link
   * IoTConsensus#getConsensusGroupIdsFromDir}; does not consult the in-memory registry.
   */
  @Override
  public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
    return IoTConsensus.getConsensusGroupIdsFromDir(storageDir, LOGGER);
  }

  /** Returns the directory path used for the given group, whether or not it exists. */
  @Override
  public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
    return buildPeerDir(groupId);
  }

  @Override
  public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
    // do not support reload consensus config for now
  }

  /** Not supported; always throws. */
  @Override
  public void resetPeerList(ConsensusGroupId groupId, List<Peer> peers) throws ConsensusException {
    throw new ConsensusException("SimpleConsensus does not support reset peer list");
  }

  /** Builds {@code <storageDir>/<groupType>_<groupId>}, the directory of a consensus group. */
  private String buildPeerDir(ConsensusGroupId groupId) {
    return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
  }
}