/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.consensus.iot;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.consensus.iot.log.GetConsensusReqReaderPlan;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
import org.apache.iotdb.consensus.iot.snapshot.IoTConsensusRateLimiter;
import org.apache.iotdb.consensus.iot.snapshot.SnapshotFragmentReader;
import org.apache.iotdb.consensus.iot.thrift.TActivatePeerReq;
import org.apache.iotdb.consensus.iot.thrift.TActivatePeerRes;
import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelReq;
import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelRes;
import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotReq;
import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotRes;
import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerReq;
import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerRes;
import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelReq;
import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelRes;
import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentReq;
import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentRes;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
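/**
* Replica-local implementation of an IoTConsensus group member. It applies writes to the local
* state machine, assigns them monotonically increasing search indexes, and hands them to the
* LogDispatcher for asynchronous replication to the other peers. It also implements snapshot
* taking and transmission, peer (in)activation, and persistence of the group configuration used
* during membership changes.
*/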
public class IoTConsensusServerImpl {
private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp";
public static final String SNAPSHOT_DIR_NAME = "snapshot";
private static final Pattern SNAPSHOT_INDEX_PATTERN = Pattern.compile(".*[^\\d](?=(\\d+))");
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
PerformanceOverviewMetrics.getInstance();
private final Logger logger = LoggerFactory.getLogger(IoTConsensusServerImpl.class);
private final Peer thisNode;
private final IStateMachine stateMachine;
private final ConcurrentHashMap<Integer, SyncLogCacheQueue> cacheQueueMap;
private final Lock stateMachineLock = new ReentrantLock();
private final Condition stateMachineCondition = stateMachineLock.newCondition();
private final String storageDir;
private final List<Peer> configuration;
private final AtomicLong searchIndex;
private final LogDispatcher logDispatcher;
private final IoTConsensusConfig config;
private final ConsensusReqReader consensusReqReader;
private volatile boolean active;
private String newSnapshotDirName;
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
private final String consensusGroupId;
private final ScheduledExecutorService backgroundTaskService;
private final IoTConsensusRateLimiter ioTConsensusRateLimiter =
IoTConsensusRateLimiter.getInstance();
public IoTConsensusServerImpl(
String storageDir,
Peer thisNode,
List<Peer> configuration,
IStateMachine stateMachine,
ScheduledExecutorService backgroundTaskService,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager,
IoTConsensusConfig config) {
this.active = true;
this.storageDir = storageDir;
this.thisNode = thisNode;
this.stateMachine = stateMachine;
this.cacheQueueMap = new ConcurrentHashMap<>();
this.syncClientManager = syncClientManager;
this.configuration = configuration;
if (configuration.isEmpty()) {
recoverConfiguration();
} else {
persistConfiguration();
}
this.backgroundTaskService = backgroundTaskService;
this.config = config;
this.consensusGroupId = thisNode.getGroupId().toString();
consensusReqReader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
this.searchIndex = new AtomicLong(consensusReqReader.getCurrentSearchIndex());
this.ioTConsensusServerMetrics = new IoTConsensusServerMetrics(this);
this.logDispatcher = new LogDispatcher(this, clientManager);
// Since the underlying wal does not persist safelyDeletedSearchIndex, IoTConsensus needs to
// update the wal with the syncIndex recovered from the consensus layer when initializing.
// This prevents the wal from piling up when safelyDeletedSearchIndex is not updated after a
// restart followed by a leader migration
checkAndUpdateSafeDeletedSearchIndex();
// see the warning logged in checkAndUpdateSearchIndex for details
checkAndUpdateSearchIndex();
}
public IStateMachine getStateMachine() {
return stateMachine;
}
public void start() {
MetricService.getInstance().addMetricSet(this.ioTConsensusServerMetrics);
stateMachine.start();
logDispatcher.start();
}
public void stop() {
logDispatcher.stop();
stateMachine.stop();
MetricService.getInstance().removeMetricSet(this.ioTConsensusServerMetrics);
}
/**
* Records the search index of the log entry, writes it to the local state machine, and then
* hands it to the LogDispatcher for asynchronous replication to the other peers.
*/
public TSStatus write(IConsensusRequest request) {
long consensusWriteStartTime = System.nanoTime();
stateMachineLock.lock();
try {
long getStateMachineLockTime = System.nanoTime();
// record the time spent acquiring the stateMachine lock
ioTConsensusServerMetrics.recordGetStateMachineLockTime(
getStateMachineLockTime - consensusWriteStartTime);
if (needBlockWrite()) {
logger.info("[Throttle Down] index:{}, safeIndex:{}", getSearchIndex(), getMinSyncIndex());
try {
boolean timeout =
!stateMachineCondition.await(
config.getReplication().getThrottleTimeOutMs(), TimeUnit.MILLISECONDS);
if (timeout) {
return RpcUtils.getStatus(
TSStatusCode.WRITE_PROCESS_REJECT,
String.format(
"The write is rejected because the wal directory size has reached the "
+ "threshold %d bytes. You may need to adjust the flush policy of the "
+ "storage storageengine or the IoTConsensus synchronization parameter",
config.getReplication().getWalThrottleThreshold()));
}
} catch (InterruptedException e) {
logger.error("Failed to throttle down because ", e);
Thread.currentThread().interrupt();
}
}
long writeToStateMachineStartTime = System.nanoTime();
// record the time spent checking whether the write should be blocked
ioTConsensusServerMetrics.recordCheckingBeforeWriteTime(
writeToStateMachineStartTime - getStateMachineLockTime);
IndexedConsensusRequest indexedConsensusRequest =
buildIndexedConsensusRequestForLocalRequest(request);
if (indexedConsensusRequest.getSearchIndex() % 100000 == 0) {
logger.info(
"DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
thisNode.getGroupId(),
getMinSyncIndex(),
indexedConsensusRequest.getSearchIndex());
}
IConsensusRequest planNode = stateMachine.deserializeRequest(indexedConsensusRequest);
long startWriteTime = System.nanoTime();
TSStatus result = stateMachine.write(planNode);
PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(System.nanoTime() - startWriteTime);
long writeToStateMachineEndTime = System.nanoTime();
// record the time spent writing the request into the stateMachine
ioTConsensusServerMetrics.recordWriteStateMachineTime(
writeToStateMachineEndTime - writeToStateMachineStartTime);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// The index is used when constructing batches in LogDispatcher. If its value increases but
// the corresponding request does not exist or is not put into the queue, the dispatcher will
// try to find the request in the WAL. This behavior is not expected and slows down batch
// preparation. So we use the lock to ensure that `offer()` and `incrementAndGet()` happen
// atomically, as one transaction.
synchronized (searchIndex) {
logDispatcher.offer(indexedConsensusRequest);
searchIndex.incrementAndGet();
}
// record the time spent offering the request into the queue
ioTConsensusServerMetrics.recordOfferRequestToQueueTime(
System.nanoTime() - writeToStateMachineEndTime);
} else {
logger.debug(
"{}: write operation failed. searchIndex: {}. Code: {}",
thisNode.getGroupId(),
indexedConsensusRequest.getSearchIndex(),
result.getCode());
}
// record the time of the total write process
ioTConsensusServerMetrics.recordConsensusWriteTime(
System.nanoTime() - consensusWriteStartTime);
return result;
} finally {
stateMachineLock.unlock();
}
}
public DataSet read(IConsensusRequest request) {
return stateMachine.read(request);
}
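/**
* Takes a new snapshot into a fresh directory named "snapshot_&lt;groupId&gt;_&lt;index&gt;" under the
* storage dir, where index is the latest existing snapshot index plus one. The actual snapshot
* creation is delegated to the state machine, and all older snapshot directories are removed
* afterwards.
*/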
public void takeSnapshot() throws ConsensusGroupModifyPeerException {
try {
long newSnapshotIndex = getLatestSnapshotIndex() + 1;
newSnapshotDirName =
String.format(
"%s_%s_%d", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), newSnapshotIndex);
File snapshotDir = new File(storageDir, newSnapshotDirName);
if (snapshotDir.exists()) {
FileUtils.deleteDirectory(snapshotDir);
}
if (!snapshotDir.mkdirs()) {
throw new ConsensusGroupModifyPeerException(
String.format("%s: cannot mkdir for snapshot", thisNode.getGroupId()));
}
if (!stateMachine.takeSnapshot(snapshotDir)) {
throw new ConsensusGroupModifyPeerException("unknown error when taking snapshot");
}
clearOldSnapshot();
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException("error when taking snapshot", e);
}
}
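/**
* Streams every file of the most recently taken snapshot to the target peer as a sequence of
* fragments over the sync RPC client. Each fragment passes through the IoTConsensus rate limiter
* before being sent, and per-file progress is logged along the way.
*/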
public void transmitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
File snapshotDir = new File(storageDir, newSnapshotDirName);
List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
AtomicLong snapshotSizeSumAtomic = new AtomicLong();
snapshotPaths.forEach(
snapshotPath -> {
try {
snapshotSizeSumAtomic.addAndGet(Files.size(snapshotPath));
} catch (IOException e) {
logger.error(
"[SNAPSHOT TRANSMISSION] Calculate snapshot file's size fail: {}", snapshotPath, e);
}
});
final long snapshotSizeSum = snapshotSizeSumAtomic.get();
long transitedSnapshotSizeSum = 0;
long transitedFilesNum = 0;
long startTime = System.nanoTime();
logger.info(
"[SNAPSHOT TRANSMISSION] Start to transmit snapshots ({} files, total size {}) from dir {}",
snapshotPaths.size(),
FileUtils.byteCountToDisplaySize(snapshotSizeSum),
snapshotDir);
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
for (Path path : snapshotPaths) {
SnapshotFragmentReader reader = new SnapshotFragmentReader(newSnapshotDirName, path);
try {
while (reader.hasNext()) {
// TODO: zero copy ?
TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
ioTConsensusRateLimiter.acquireTransitDataSizeWithRateLimiter(req.getChunkLength());
TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
if (!isSuccess(res.getStatus())) {
throw new ConsensusGroupModifyPeerException(
String.format(
"[SNAPSHOT TRANSMISSION] Error when transmitting snapshot fragment to %s",
targetPeer));
}
}
transitedSnapshotSizeSum += reader.getTotalReadSize();
transitedFilesNum++;
logger.info(
"[SNAPSHOT TRANSMISSION] The overall progress for dir {}: files {}/{} done, size {}/{} done, time {} passed",
newSnapshotDirName,
transitedFilesNum,
snapshotPaths.size(),
FileUtils.byteCountToDisplaySize(transitedSnapshotSizeSum),
FileUtils.byteCountToDisplaySize(snapshotSizeSum),
CommonDateTimeUtils.convertMillisecondToDurationStr(
(System.nanoTime() - startTime) / 1_000_000));
} finally {
reader.close();
}
}
} catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("[SNAPSHOT TRANSMISSION] Error when send snapshot file to %s", targetPeer),
e);
}
logger.info(
"[SNAPSHOT TRANSMISSION] After {}, successfully transmit all snapshots from dir {}",
CommonDateTimeUtils.convertMillisecondToDurationStr(
(System.nanoTime() - startTime) / 1_000_000),
snapshotDir);
}
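/**
* Appends one received snapshot fragment to its target file under the local storage dir. The
* relative target path is derived from the original file path by keeping everything from the
* snapshotId onwards, and missing parent directories are created on demand.
*/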
public void receiveSnapshotFragment(
String snapshotId, String originalFilePath, ByteBuffer fileChunk)
throws ConsensusGroupModifyPeerException {
try {
String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath);
File targetFile = new File(storageDir, targetFilePath);
Path parentDir = Paths.get(targetFile.getParent());
if (!Files.exists(parentDir)) {
Files.createDirectories(parentDir);
}
try (FileOutputStream fos = new FileOutputStream(targetFile.getAbsolutePath(), true);
FileChannel channel = fos.getChannel()) {
channel.write(fileChunk.slice());
}
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when receiving snapshot %s", snapshotId), e);
}
}
private String calculateSnapshotPath(String snapshotId, String originalFilePath)
throws ConsensusGroupModifyPeerException {
if (!originalFilePath.contains(snapshotId)) {
throw new ConsensusGroupModifyPeerException(
String.format(
"invalid snapshot file. snapshotId: %s, filePath: %s", snapshotId, originalFilePath));
}
return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
}
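// Snapshot dirs are named "snapshot_<groupId>_<index>" (see takeSnapshot). The regex strips
// everything up to and including the last non-digit character, e.g. a dir named "snapshot_1_7"
// yields "7", so the maximum trailing number across all snapshot dirs is the latest index.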
private long getLatestSnapshotIndex() {
long snapShotIndex = 0;
File directory = new File(storageDir);
File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
if (versionFiles == null || versionFiles.length == 0) {
return snapShotIndex;
}
for (File file : versionFiles) {
snapShotIndex =
Math.max(
snapShotIndex,
Long.parseLong(SNAPSHOT_INDEX_PATTERN.matcher(file.getName()).replaceAll("")));
}
return snapShotIndex;
}
private void clearOldSnapshot() {
File directory = new File(storageDir);
File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
if (versionFiles == null || versionFiles.length == 0) {
logger.error(
"Can not find any snapshot dir after build a new snapshot for group {}",
thisNode.getGroupId());
return;
}
for (File file : versionFiles) {
if (!file.getName().equals(newSnapshotDirName)) {
try {
FileUtils.deleteDirectory(file);
} catch (IOException e) {
logger.error("Delete old snapshot dir {} failed", file.getAbsolutePath(), e);
}
}
}
}
public void loadSnapshot(String snapshotId) {
// TODO: (xingtanzjr) throw exception if the snapshot load failed
stateMachine.loadSnapshot(new File(storageDir, snapshotId));
}
@FunctionalInterface
public interface ThrowableFunction<T, R> {
R apply(T t) throws Exception;
}
public void inactivePeer(Peer peer, boolean forDeletionPurpose)
throws ConsensusGroupModifyPeerException {
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
try {
TInactivatePeerRes res =
client.inactivatePeer(
new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId())
.setForDeletionPurpose(forDeletionPurpose));
if (!isSuccess(res.status)) {
throw new ConsensusGroupModifyPeerException(
String.format("error when inactivating %s. %s", peer, res.getStatus()));
}
} catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when inactivating %s", peer), e);
}
} catch (ClientManagerException e) {
throw new ConsensusGroupModifyPeerException(e);
}
}
public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupModifyPeerException {
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
TTriggerSnapshotLoadRes res =
client.triggerSnapshotLoad(
new TTriggerSnapshotLoadReq(
thisNode.getGroupId().convertToTConsensusGroupId(), newSnapshotDirName));
if (!isSuccess(res.status)) {
throw new ConsensusGroupModifyPeerException(
String.format("error when triggering snapshot load %s. %s", peer, res.getStatus()));
}
} catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when activating %s", peer), e);
}
}
public void activePeer(Peer peer) throws ConsensusGroupModifyPeerException {
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
TActivatePeerRes res =
client.activatePeer(new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
if (!isSuccess(res.status)) {
throw new ConsensusGroupModifyPeerException(
String.format("error when activating %s. %s", peer, res.getStatus()));
}
} catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when activating %s", peer), e);
}
}
public void notifyPeersToBuildSyncLogChannel(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
// Iterate over a copy because the configuration will be modified while iterating: the
// targetPeer is added to it inside buildSyncLogChannel
List<Peer> currentMembers = new ArrayList<>(this.configuration);
logger.info(
"[IoTConsensus] notify current peers to build sync log. group member: {}, target: {}",
currentMembers,
targetPeer);
for (Peer peer : currentMembers) {
logger.info("[IoTConsensus] build sync log channel from {}", peer);
if (peer.equals(thisNode)) {
// use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
// snapshot produced by thisNode
buildSyncLogChannel(targetPeer, searchIndex.get());
} else {
// use RPC to tell other peers to build sync log channel to target peer
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
TBuildSyncLogChannelRes res =
client.buildSyncLogChannel(
new TBuildSyncLogChannelReq(
targetPeer.getGroupId().convertToTConsensusGroupId(),
targetPeer.getEndpoint(),
targetPeer.getNodeId()));
if (!isSuccess(res.status)) {
throw new ConsensusGroupModifyPeerException(
String.format("build sync log channel failed from %s to %s", peer, targetPeer));
}
} catch (Exception e) {
// We use a simple way to deal with connection issues when notifying other nodes to build the
// sync log channel. If the unresponsive peer is the one that will be removed, we cannot
// suspend the operation and need to skip it. To keep the mechanism working, we skip any peer
// that cannot be reached.
// If the following error message appears, the unresponsive peer should be removed manually
// after the current operation completes
// TODO: (xingtanzjr) design more reliable way for IoTConsensus
logger.error(
"cannot notify {} to build sync log channel. "
+ "Please check the status of this node manually",
peer,
e);
}
}
}
}
public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
// Iterate over an immutable copy because the configuration will be modified while iterating:
// the targetPeer is removed from it inside removeSyncLogChannel
ImmutableList<Peer> currentMembers = ImmutableList.copyOf(this.configuration);
for (Peer peer : currentMembers) {
if (peer.equals(targetPeer)) {
// if the peer is the targetPeer itself, skip it because removing itself is illegal
continue;
}
if (peer.equals(thisNode)) {
removeSyncLogChannel(targetPeer);
} else {
// use RPC to tell other peers to remove the sync log channel to the target peer
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
TRemoveSyncLogChannelRes res =
client.removeSyncLogChannel(
new TRemoveSyncLogChannelReq(
targetPeer.getGroupId().convertToTConsensusGroupId(),
targetPeer.getEndpoint(),
targetPeer.getNodeId()));
if (!isSuccess(res.status)) {
throw new ConsensusGroupModifyPeerException(
String.format("remove sync log channel failed from %s to %s", peer, targetPeer));
}
} catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when removing sync log channel to %s", peer), e);
}
}
}
}
public void waitTargetPeerUntilSyncLogCompleted(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
long checkIntervalInMs = 10_000L;
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
while (true) {
TWaitSyncLogCompleteRes res =
client.waitSyncLogComplete(
new TWaitSyncLogCompleteReq(targetPeer.getGroupId().convertToTConsensusGroupId()));
if (res.complete) {
logger.info(
"[WAIT LOG SYNC] {} SyncLog is completed. TargetIndex: {}, CurrentSyncIndex: {}",
targetPeer,
res.searchIndex,
res.safeIndex);
return;
}
logger.info(
"[WAIT LOG SYNC] {} SyncLog is still in progress. TargetIndex: {}, CurrentSyncIndex: {}",
targetPeer,
res.searchIndex,
res.safeIndex);
Thread.sleep(checkIntervalInMs);
}
} catch (ClientManagerException | TException e) {
throw new ConsensusGroupModifyPeerException(
String.format(
"error when waiting %s to complete SyncLog. %s", targetPeer, e.getMessage()),
e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ConsensusGroupModifyPeerException(
String.format(
"thread interrupted when waiting %s to complete SyncLog. %s",
targetPeer, e.getMessage()),
e);
}
}
private boolean isSuccess(TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
/**
* Builds a SyncLog channel using the minimum sync index (safeIndex) as the default initial sync
* index.
*
* @throws ConsensusGroupModifyPeerException if building the sync log channel fails
*/
public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
buildSyncLogChannel(targetPeer, getMinSyncIndex());
}
public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
throws ConsensusGroupModifyPeerException {
KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
// step 1, build sync channel in LogDispatcher
logger.info(
"[IoTConsensus] build sync log channel to {} with initialSyncIndex {}",
targetPeer,
initialSyncIndex);
logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
// step 2, update configuration
configuration.add(targetPeer);
// step 3, persist configuration
persistConfiguration();
logger.info("[IoTConsensus] persist new configuration: {}", configuration);
}
public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
try {
// step 1, remove sync channel in LogDispatcher
logDispatcher.removeLogDispatcherThread(targetPeer);
logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup", targetPeer);
// step 2, update configuration
configuration.remove(targetPeer);
checkAndUpdateSafeDeletedSearchIndex();
// step 3, persist configuration
persistConfiguration();
logger.info("[IoTConsensus] configuration updated to {}", this.configuration);
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException("error when remove LogDispatcherThread", e);
}
}
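/**
* Persists the current configuration with a write-to-tmp-then-rename sequence: leftover
* "*configuration.dat.tmp" files from a previously interrupted run are promoted first, each peer
* is then serialized and fsynced to its own "&lt;nodeId&gt;_configuration.dat.tmp" file, the old
* "&lt;nodeId&gt;_configuration.dat" files are deleted, and finally the tmp files are renamed to drop
* the ".tmp" suffix. An interrupted run is also repaired in recoverConfiguration().
*/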
public void persistConfiguration() {
try {
removeDuplicateConfiguration();
renameTmpConfigurationFileToRemoveSuffix();
serializeConfigurationAndFsyncToDisk();
deleteConfiguration();
renameTmpConfigurationFileToRemoveSuffix();
} catch (IOException e) {
// TODO: (xingtanzjr) need to handle the IOException because IoTConsensus won't work as
// expected if the exception occurs
logger.error("Unexpected error occurs when persisting configuration", e);
}
}
public void recoverConfiguration() {
try {
Path tmpConfigurationPath =
Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
Path configurationPath =
Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
// If the tmpConfigurationPath exists, it means persisting the configuration was interrupted
// unexpectedly, so we need to substitute the configuration file with the tmp configuration
// file
if (Files.exists(tmpConfigurationPath)) {
Files.deleteIfExists(configurationPath);
Files.move(tmpConfigurationPath, configurationPath);
}
if (Files.exists(configurationPath)) {
recoverFromOldConfigurationFile(configurationPath);
} else {
// recover from the per-peer split configuration files
Path dirPath = Paths.get(storageDir);
List<Peer> tmpPeerList = getConfiguration(dirPath, CONFIGURATION_TMP_FILE_NAME);
configuration.addAll(tmpPeerList);
List<Peer> peerList = getConfiguration(dirPath, CONFIGURATION_FILE_NAME);
for (Peer peer : peerList) {
if (!configuration.contains(peer)) {
configuration.add(peer);
}
}
persistConfiguration();
}
logger.info("Recover IoTConsensus server Impl, configuration: {}", configuration);
} catch (IOException e) {
logger.error("Unexpected error occurs when recovering configuration", e);
}
}
// @Compatibility
private void recoverFromOldConfigurationFile(Path oldConfigurationPath) throws IOException {
// recover from old configuration file
ByteBuffer buffer = ByteBuffer.wrap(Files.readAllBytes(oldConfigurationPath));
int size = buffer.getInt();
for (int i = 0; i < size; i++) {
configuration.add(Peer.deserialize(buffer));
}
persistConfiguration();
}
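// Builds the per-peer configuration file name; e.g. with an illustrative nodeId of 3 and
// CONFIGURATION_FILE_NAME it would return "3_configuration.dat".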
public static String generateConfigurationDatFileName(int nodeId, String suffix) {
return nodeId + "_" + suffix;
}
private List<Peer> getConfiguration(Path dirPath, String configurationFileName)
throws IOException {
ByteBuffer buffer;
List<Peer> tmpConfiguration = new ArrayList<>();
Path[] files;
// close the stream returned by Files.walk to avoid leaking directory handles
try (Stream<Path> pathStream = Files.walk(dirPath)) {
files =
pathStream
.filter(Files::isRegularFile)
.filter(filePath -> filePath.getFileName().toString().contains(configurationFileName))
.toArray(Path[]::new);
}
for (Path file : files) {
buffer = ByteBuffer.wrap(Files.readAllBytes(file));
tmpConfiguration.add(Peer.deserialize(buffer));
}
return tmpConfiguration;
}
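// Wraps a local request with the next search index (current searchIndex + 1). The counter
// itself is only incremented later in write(), after the state machine has accepted the request.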
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
if (request instanceof ComparableConsensusRequest) {
final IoTProgressIndex iotProgressIndex =
new IoTProgressIndex(thisNode.getNodeId(), searchIndex.get() + 1);
((ComparableConsensusRequest) request).setProgressIndex(iotProgressIndex);
}
return new IndexedConsensusRequest(searchIndex.get() + 1, Collections.singletonList(request));
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
long syncIndex, List<IConsensusRequest> requests) {
return new IndexedConsensusRequest(
ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests);
}
/**
* In the case of multiple replicas, the minimum synchronization index across all peers is
* returned. In the case of a single replica, the current search index is returned.
*/
public long getMinSyncIndex() {
return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get);
}
public long getMinFlushedSyncIndex() {
return logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get);
}
public String getStorageDir() {
return storageDir;
}
public Peer getThisNode() {
return thisNode;
}
public List<Peer> getConfiguration() {
return configuration;
}
public long getSearchIndex() {
return searchIndex.get();
}
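/**
* Returns how far the slowest replica lags behind this node, i.e. the difference between the
* local searchIndex and the minimum sync index across the log dispatcher threads.
*/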
public long getSyncLag() {
long minSyncIndex = getMinSyncIndex();
return getSearchIndex() - minSyncIndex;
}
public IoTConsensusConfig getConfig() {
return config;
}
public long getLogEntriesFromWAL() {
return logDispatcher.getLogEntriesFromWAL();
}
public long getLogEntriesFromQueue() {
return logDispatcher.getLogEntriesFromQueue();
}
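// Write throttling: writes are blocked while the size reported by the consensus request reader
// (the accumulated wal) exceeds the configured walThrottleThreshold, and may resume once it
// drops below the threshold again (see the waiting logic in write() and signal()).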
public boolean needBlockWrite() {
return consensusReqReader.getTotalSize() > config.getReplication().getWalThrottleThreshold();
}
public boolean unblockWrite() {
return consensusReqReader.getTotalSize() < config.getReplication().getWalThrottleThreshold();
}
public void signal() {
stateMachineLock.lock();
try {
stateMachineCondition.signalAll();
} finally {
stateMachineLock.unlock();
}
}
public AtomicLong getIndexObject() {
return searchIndex;
}
public ScheduledExecutorService getBackgroundTaskService() {
return backgroundTaskService;
}
public LogDispatcher getLogDispatcher() {
return logDispatcher;
}
public IoTConsensusServerMetrics getIoTConsensusServerMetrics() {
return this.ioTConsensusServerMetrics;
}
public boolean isReadOnly() {
return stateMachine.isReadOnly();
}
public boolean isActive() {
return active;
}
public void setActive(boolean active) {
logger.info("set {} active status to {}", this.thisNode, active);
this.active = active;
}
public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
TCleanupTransferredSnapshotReq req =
new TCleanupTransferredSnapshotReq(
targetPeer.getGroupId().convertToTConsensusGroupId(), newSnapshotDirName);
TCleanupTransferredSnapshotRes res = client.cleanupTransferredSnapshot(req);
if (!isSuccess(res.getStatus())) {
throw new ConsensusGroupModifyPeerException(
String.format(
"cleanup remote snapshot failed of %s ,status is %s", targetPeer, res.getStatus()));
}
} catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("cleanup remote snapshot failed of %s", targetPeer), e);
}
}
public void cleanupTransferredSnapshot(String snapshotId)
throws ConsensusGroupModifyPeerException {
File snapshotDir = new File(storageDir, snapshotId);
if (snapshotDir.exists()) {
try {
FileUtils.deleteDirectory(snapshotDir);
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException(e);
}
}
}
/**
* We should set safelyDeletedSearchIndex to searchIndex before addPeer to avoid potential data
* loss.
*/
public void checkAndLockSafeDeletedSearchIndex() {
if (configuration.size() == 1) {
consensusReqReader.setSafelyDeletedSearchIndex(searchIndex.get());
}
}
/**
* If there is only one replica, set it to Long.MAX_VALUE. If there are multiple replicas, fetch
* the latest SafelyDeletedSearchIndex again. This enables the wal to be deleted in a timely
* manner.
*/
public void checkAndUpdateSafeDeletedSearchIndex() {
if (configuration.size() == 1) {
consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
} else {
consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
}
}
public void checkAndUpdateSearchIndex() {
long currentSearchIndex = searchIndex.get();
long safelyDeletedSearchIndex = getMinFlushedSyncIndex();
if (currentSearchIndex < safelyDeletedSearchIndex) {
logger.warn(
"The searchIndex for this region({}) is smaller than the safelyDeletedSearchIndex when "
+ "the node is restarted, which means that the data of the current region is not flushed "
+ "by the wal, but has been synchronized to other nodes. At this point, "
+ "different replicas have been inconsistent and cannot be automatically recovered. "
+ "To prevent subsequent logs from marking smaller searchIndex and exacerbating the "
+ "inconsistency, we manually set the searchIndex({}) to safelyDeletedSearchIndex({}) "
+ "here to reduce the impact of this problem in the future",
consensusGroupId,
currentSearchIndex,
safelyDeletedSearchIndex);
searchIndex.set(safelyDeletedSearchIndex);
}
}
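/**
* Entry point for log batches replicated from the leader. Each source peer gets its own
* SyncLogCacheQueue, which re-orders incoming batches by their sync index before applying them
* to the state machine.
*/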
public TSStatus syncLog(int sourcePeerId, IConsensusRequest request) {
return cacheQueueMap
.computeIfAbsent(sourcePeerId, SyncLogCacheQueue::new)
.cacheAndInsertLatestNode((DeserializedBatchIndexedConsensusRequest) request);
}
public String getConsensusGroupId() {
return consensusGroupId;
}
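// Serializes every peer to its own "<nodeId>_configuration.dat.tmp" file and fsyncs it so the
// bytes are durable before the tmp files are later promoted by
// renameTmpConfigurationFileToRemoveSuffix().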
private void serializeConfigurationAndFsyncToDisk() throws IOException {
for (Peer peer : configuration) {
String peerConfigurationFileName =
generateConfigurationDatFileName(peer.getNodeId(), CONFIGURATION_TMP_FILE_NAME);
try (FileOutputStream fileOutputStream =
new FileOutputStream(new File(storageDir, peerConfigurationFileName));
DataOutputStream outputStream = new DataOutputStream(fileOutputStream)) {
peer.serialize(outputStream);
// flush and fsync while the streams are still open so the configuration really reaches disk;
// a finally block would only run after try-with-resources has already closed the streams
outputStream.flush();
try {
fileOutputStream.getFD().sync();
} catch (IOException ignore) {
// ignore sync exception
}
}
}
}
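// Promotes "*configuration.dat.tmp" files to "*configuration.dat", deleting any existing target
// file first so the rename cannot fail because the destination already exists.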
private void renameTmpConfigurationFileToRemoveSuffix() throws IOException {
try (Stream<Path> stream = Files.walk(Paths.get(storageDir))) {
List<Path> paths =
stream
.filter(Files::isRegularFile)
.filter(
filePath ->
filePath.getFileName().toString().endsWith(CONFIGURATION_TMP_FILE_NAME))
.collect(Collectors.toList());
for (Path filePath : paths) {
String targetPath =
filePath.toString().replace(CONFIGURATION_TMP_FILE_NAME, CONFIGURATION_FILE_NAME);
File targetFile = new File(targetPath);
if (targetFile.exists()) {
try {
Files.delete(targetFile.toPath());
} catch (IOException e) {
logger.error("Unexpected error occurs when delete file: {}", targetPath);
}
}
if (!filePath.toFile().renameTo(targetFile)) {
logger.error("Unexpected error occurs when rename file: {} -> {}", filePath, targetPath);
}
}
}
}
private void deleteConfiguration() throws IOException {
try (Stream<Path> stream = Files.walk(Paths.get(storageDir))) {
stream
.filter(Files::isRegularFile)
.filter(filePath -> filePath.getFileName().toString().endsWith(CONFIGURATION_FILE_NAME))
.forEach(
filePath -> {
try {
Files.delete(filePath);
} catch (IOException e) {
logger.error(
"Unexpected error occurs when deleting old configuration file {}",
filePath,
e);
}
});
}
}
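// Removes duplicate peers from the in-memory configuration in place, keeping the first
// occurrence of each peer and preserving the original order.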
public void removeDuplicateConfiguration() {
Set<Peer> seen = new HashSet<>();
Iterator<Peer> it = configuration.iterator();
while (it.hasNext()) {
Peer peer = it.next();
if (!seen.add(peer)) {
it.remove();
}
}
}
/**
* This class is used for the SyncLog write path of IoTConsensus. It keeps the write order on the
* follower the same as on the leader and, besides guaranteeing ordering, it allows the
* deserialization of PlanNodes to be performed concurrently.
*/
private class SyncLogCacheQueue {
private final int sourcePeerId;
private final Lock queueLock = new ReentrantLock();
private final Condition queueSortCondition = queueLock.newCondition();
private final PriorityQueue<DeserializedBatchIndexedConsensusRequest> requestCache;
private long nextSyncIndex = -1;
public SyncLogCacheQueue(int sourcePeerId) {
this.sourcePeerId = sourcePeerId;
this.requestCache = new PriorityQueue<>();
}
/**
* This method is used for the SyncLog write path of IoTConsensus. It keeps the write order on
* the follower the same as on the leader and, besides guaranteeing ordering, it allows the
* deserialization of PlanNodes to be performed concurrently.
*/
private TSStatus cacheAndInsertLatestNode(DeserializedBatchIndexedConsensusRequest request) {
logger.debug(
"cacheAndInsert start: source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
sourcePeerId,
consensusGroupId,
requestCache.size(),
request.getStartSyncIndex(),
request.getEndSyncIndex());
queueLock.lock();
try {
long insertStartTime = System.nanoTime();
requestCache.add(request);
// If the peek is not held by the current thread and the queue is full, notify the thread that
// holds the peek so it can process it
if (requestCache.size() == config.getReplication().getMaxPendingBatchesNum()
&& requestCache.peek() != null
&& requestCache.peek().getStartSyncIndex() != request.getStartSyncIndex()) {
queueSortCondition.signalAll();
}
while (true) {
// If current InsertNode is the next target InsertNode, write it
if (request.getStartSyncIndex() == nextSyncIndex) {
requestCache.remove(request);
nextSyncIndex = request.getEndSyncIndex() + 1;
break;
}
// If no write thread hits nextSyncIndex and the heap is full, write the peek request. This
// keeps the whole write path correct when nextSyncIndex is not set. We don't persist
// nextSyncIndex, to reduce complexity. nextSyncIndex may be unset in some cases:
// 1. when the system has just started
// 2. when some exception occurred during SyncLog
if (requestCache.size() == config.getReplication().getMaxPendingBatchesNum()
&& requestCache.peek() != null
&& requestCache.peek().getStartSyncIndex() == request.getStartSyncIndex()) {
requestCache.remove();
nextSyncIndex = request.getEndSyncIndex() + 1;
break;
}
try {
boolean timeout =
!queueSortCondition.await(
config.getReplication().getMaxWaitingTimeForWaitBatchInMs(),
TimeUnit.MILLISECONDS);
// Even though the timeout has been triggered, the current thread cannot write its request
// unless it holds the peek request; in that case some other thread must hold the peek. In
// this scenario the current thread goes back to await and waits until its request becomes
// the peek request
if (timeout
&& requestCache.peek() != null
&& requestCache.peek().getStartSyncIndex() == request.getStartSyncIndex()) {
// the current thread holds the peek request, so it can write it immediately.
logger.info(
"waiting target request timeout. current index: {}, target index: {}",
request.getStartSyncIndex(),
nextSyncIndex);
requestCache.remove(request);
nextSyncIndex = Math.max(nextSyncIndex, request.getEndSyncIndex() + 1);
break;
}
} catch (InterruptedException e) {
logger.warn(
"current waiting is interrupted. SyncIndex: {}. Exception: ",
request.getStartSyncIndex(),
e);
Thread.currentThread().interrupt();
}
}
long sortTime = System.nanoTime();
ioTConsensusServerMetrics.recordSortCost(sortTime - insertStartTime);
List<TSStatus> subStatus = new LinkedList<>();
for (IConsensusRequest insertNode : request.getInsertNodes()) {
subStatus.add(stateMachine.write(insertNode));
}
long applyTime = System.nanoTime();
ioTConsensusServerMetrics.recordApplyCost(applyTime - sortTime);
queueSortCondition.signalAll();
logger.debug(
"cacheAndInsert end: source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}, sortTime = {}ms, applyTime = {}ms",
sourcePeerId,
consensusGroupId,
requestCache.size(),
request.getStartSyncIndex(),
request.getEndSyncIndex(),
TimeUnit.NANOSECONDS.toMillis(sortTime - insertStartTime),
TimeUnit.NANOSECONDS.toMillis(applyTime - sortTime));
return new TSStatus().setSubStatus(subStatus);
} finally {
queueLock.unlock();
}
}
}
}