[IoTDB-4328] Complete add/remove replica for MultiLeaderConsensus (#7390)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index d6b3b2b..d93ca83 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -558,8 +558,15 @@
       return Optional.empty();
     }
 
+    List<TDataNodeLocation> aliveDataNodes =
+        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+            .map(TDataNodeConfiguration::getLocation)
+            .collect(Collectors.toList());
+
     // TODO replace findAny() by select the low load node.
-    return regionReplicaNodes.stream().filter(e -> !e.equals(filterLocation)).findAny();
+    return regionReplicaNodes.stream()
+        .filter(e -> aliveDataNodes.contains(e) && !e.equals(filterLocation))
+        .findAny();
   }
 
   private String getIdWithRpcEndpoint(TDataNodeLocation location) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
new file mode 100644
index 0000000..0260ba0
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
+public class ConsensusGroupAddPeerException extends Exception {
+  public ConsensusGroupAddPeerException(String message) {
+    super(message);
+  }
+
+  public ConsensusGroupAddPeerException(Throwable cause) {
+    super(cause);
+  }
+
+  public ConsensusGroupAddPeerException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index d091772..2a9da3a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -36,14 +36,19 @@
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
 import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
 import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
+import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.SyncMultiLeaderServiceClientPoolFactory;
+import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
 import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService;
 import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -73,6 +78,7 @@
   private final RegisterManager registerManager = new RegisterManager();
   private final MultiLeaderConfig config;
   private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
+  private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
 
   public MultiLeaderConsensus(ConsensusConfig config, Registry registry) {
     this.thisNode = config.getThisNode();
@@ -84,6 +90,10 @@
         new IClientManager.Factory<TEndPoint, AsyncMultiLeaderServiceClient>()
             .createClientManager(
                 new AsyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
+    this.syncClientManager =
+        new IClientManager.Factory<TEndPoint, SyncMultiLeaderServiceClient>()
+            .createClientManager(
+                new SyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
   }
 
   @Override
@@ -116,6 +126,7 @@
                   new ArrayList<>(),
                   registry.apply(consensusGroupId),
                   clientManager,
+                  syncClientManager,
                   config);
           stateMachineMap.put(consensusGroupId, consensus);
           consensus.start();
@@ -144,6 +155,10 @@
     if (impl.isReadOnly()) {
       status = new TSStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode());
       status.setMessage("Fail to do non-query operations because system is read-only.");
+    } else if (!impl.isActive()) {
+      // TODO: (xingtanzjr) whether we need to define a new status to indicate the inactive status ?
+      status = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
+      status.setMessage("peer is inactive and not ready to receive sync log request.");
     } else {
       status = impl.write(request);
     }
@@ -191,6 +206,7 @@
                   peers,
                   registry.apply(groupId),
                   clientManager,
+                  syncClientManager,
                   config);
           impl.start();
           return impl;
@@ -225,12 +241,65 @@
 
   @Override
   public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    try {
+      // step 1: inactive new Peer to prepare for following steps
+      logger.info("[MultiLeaderConsensus] inactivate new peer: {}", peer);
+      impl.inactivePeer(peer);
+
+      // step 2: notify all the other Peers to build the sync connection to newPeer
+      logger.info("[MultiLeaderConsensus] notify current peers to build sync log...");
+      impl.notifyPeersToBuildSyncLogChannel(peer);
+
+      // step 3: take snapshot
+      logger.info("[MultiLeaderConsensus] start to take snapshot...");
+      impl.takeSnapshot();
+
+      // step 4: transit snapshot
+      logger.info("[MultiLeaderConsensus] start to transit snapshot...");
+      impl.transitSnapshot(peer);
+
+      // step 5: let the new peer load snapshot
+      logger.info("[MultiLeaderConsensus] trigger new peer to load snapshot...");
+      impl.triggerSnapshotLoad(peer);
+
+      // step 6: active new Peer
+      logger.info("[MultiLeaderConsensus] activate new peer...");
+      impl.activePeer(peer);
+
+    } catch (ConsensusGroupAddPeerException e) {
+      logger.error("cannot execute addPeer() for {}", peer, e);
+      return ConsensusGenericResponse.newBuilder()
+          .setSuccess(false)
+          .setException(new ConsensusException(e.getMessage()))
+          .build();
+    }
+
+    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
   @Override
   public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    try {
+      impl.notifyPeersToRemoveSyncLogChannel(peer);
+    } catch (ConsensusGroupAddPeerException e) {
+      return ConsensusGenericResponse.newBuilder()
+          .setSuccess(false)
+          .setException(new ConsensusException(e.getMessage()))
+          .build();
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 1b45b3c..5803b9e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -28,14 +28,31 @@
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
 import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
+import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
+import org.apache.iotdb.consensus.multileader.snapshot.SnapshotFragmentReader;
+import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
+import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
+import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelReq;
+import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelRes;
+import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
+import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentRes;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +61,10 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -56,6 +76,8 @@
 public class MultiLeaderServerImpl {
 
   private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
+  private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp";
+  private static final String SNAPSHOT_DIR_NAME = "snapshot";
 
   private final Logger logger = LoggerFactory.getLogger(MultiLeaderServerImpl.class);
 
@@ -69,6 +91,9 @@
   private final LogDispatcher logDispatcher;
   private final MultiLeaderConfig config;
   private final ConsensusReqReader reader;
+  private boolean active;
+  private String latestSnapshotId;
+  private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
 
   public MultiLeaderServerImpl(
       String storageDir,
@@ -76,10 +101,13 @@
       List<Peer> configuration,
       IStateMachine stateMachine,
       IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager,
+      IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager,
       MultiLeaderConfig config) {
+    this.active = true;
     this.storageDir = storageDir;
     this.thisNode = thisNode;
     this.stateMachine = stateMachine;
+    this.syncClientManager = syncClientManager;
     this.configuration = configuration;
     if (configuration.isEmpty()) {
       recoverConfiguration();
@@ -175,36 +203,302 @@
     return stateMachine.read(request);
   }
 
-  public boolean takeSnapshot(File snapshotDir) {
-    return stateMachine.takeSnapshot(snapshotDir);
+  public void takeSnapshot() throws ConsensusGroupAddPeerException {
+    try {
+      latestSnapshotId =
+          String.format(
+              "%s_%s_%d",
+              SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), System.currentTimeMillis());
+      File snapshotDir = new File(storageDir, latestSnapshotId);
+      if (snapshotDir.exists()) {
+        FileUtils.deleteDirectory(snapshotDir);
+      }
+      if (!snapshotDir.mkdirs()) {
+        throw new ConsensusGroupAddPeerException(
+            String.format("%s: cannot mkdir for snapshot", thisNode.getGroupId()));
+      }
+      if (!stateMachine.takeSnapshot(snapshotDir)) {
+        throw new ConsensusGroupAddPeerException("unknown error when taking snapshot");
+      }
+    } catch (IOException e) {
+      throw new ConsensusGroupAddPeerException("error when taking snapshot", e);
+    }
   }
 
-  public void loadSnapshot(File latestSnapshotRootDir) {
-    stateMachine.loadSnapshot(latestSnapshotRootDir);
+  public void transitSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException {
+    File snapshotDir = new File(storageDir, latestSnapshotId);
+    List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
+    System.out.println(snapshotPaths);
+    try (SyncMultiLeaderServiceClient client =
+        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+      for (Path path : snapshotPaths) {
+        SnapshotFragmentReader reader = new SnapshotFragmentReader(latestSnapshotId, path);
+        while (reader.hasNext()) {
+          TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
+          req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
+          //          receiveSnapshotFragment(latestSnapshotId, req.filePath, req.fileChunk);
+          TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
+          if (!isSuccess(res.getStatus())) {
+            throw new ConsensusGroupAddPeerException(
+                String.format("error when sending snapshot fragment to %s", targetPeer));
+          }
+        }
+        reader.close();
+      }
+    } catch (IOException | TException e) {
+      throw new ConsensusGroupAddPeerException(
+          String.format("error when send snapshot file to %s", targetPeer), e);
+    }
+  }
+
+  public void receiveSnapshotFragment(
+      String snapshotId, String originalFilePath, ByteBuffer fileChunk)
+      throws ConsensusGroupAddPeerException {
+    try {
+      String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath);
+      File targetFile = new File(storageDir, targetFilePath);
+      Path parentDir = Paths.get(targetFile.getParent());
+      if (!Files.exists(parentDir)) {
+        Files.createDirectories(parentDir);
+      }
+      Files.write(
+          Paths.get(targetFile.getAbsolutePath()),
+          fileChunk.array(),
+          StandardOpenOption.CREATE,
+          StandardOpenOption.APPEND);
+    } catch (IOException e) {
+      throw new ConsensusGroupAddPeerException(
+          String.format("error when receiving snapshot %s", snapshotId), e);
+    }
+  }
+
+  private String calculateSnapshotPath(String snapshotId, String originalFilePath)
+      throws ConsensusGroupAddPeerException {
+    if (!originalFilePath.contains(snapshotId)) {
+      throw new ConsensusGroupAddPeerException(
+          String.format(
+              "invalid snapshot file. snapshotId: %s, filePath: %s", snapshotId, originalFilePath));
+    }
+    return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
+  }
+
+  public void loadSnapshot(String snapshotId) {
+    // TODO: (xingtanzjr) throw exception if the snapshot load failed
+    stateMachine.loadSnapshot(new File(storageDir, snapshotId));
+  }
+
+  public void inactivePeer(Peer peer) throws ConsensusGroupAddPeerException {
+    try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
+      TInactivatePeerRes res =
+          client.inactivatePeer(
+              new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
+      if (!isSuccess(res.status)) {
+        throw new ConsensusGroupAddPeerException(
+            String.format("error when inactivating %s. %s", peer, res.getStatus()));
+      }
+    } catch (IOException | TException e) {
+      throw new ConsensusGroupAddPeerException(
+          String.format("error when inactivating %s", peer), e);
+    }
+  }
+
+  public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupAddPeerException {
+    try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
+      TTriggerSnapshotLoadRes res =
+          client.triggerSnapshotLoad(
+              new TTriggerSnapshotLoadReq(
+                  thisNode.getGroupId().convertToTConsensusGroupId(), latestSnapshotId));
+      if (!isSuccess(res.status)) {
+        throw new ConsensusGroupAddPeerException(
+            String.format("error when triggering snapshot load %s. %s", peer, res.getStatus()));
+      }
+    } catch (IOException | TException e) {
+      throw new ConsensusGroupAddPeerException(String.format("error when activating %s", peer), e);
+    }
+  }
+
+  public void activePeer(Peer peer) throws ConsensusGroupAddPeerException {
+    try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
+      TActivatePeerRes res =
+          client.activatePeer(new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
+      if (!isSuccess(res.status)) {
+        throw new ConsensusGroupAddPeerException(
+            String.format("error when activating %s. %s", peer, res.getStatus()));
+      }
+    } catch (IOException | TException e) {
+      throw new ConsensusGroupAddPeerException(String.format("error when activating %s", peer), e);
+    }
+  }
+
+  public void notifyPeersToBuildSyncLogChannel(Peer targetPeer)
+      throws ConsensusGroupAddPeerException {
+    // The configuration will be modified during iterating because we will add the targetPeer to
+    // configuration
+    List<Peer> currentMembers = new ArrayList<>(this.configuration);
+    logger.info(
+        "[MultiLeaderConsensus] notify current peers to build sync log. group member: {}, target: {}",
+        currentMembers,
+        targetPeer);
+    for (Peer peer : currentMembers) {
+      logger.info("[MultiLeaderConsensus] build sync log channel from {}", peer);
+      if (peer.equals(thisNode)) {
+        // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
+        // snapshot produced by thisNode
+        buildSyncLogChannel(targetPeer, index.get());
+      } else {
+        // use RPC to tell other peers to build sync log channel to target peer
+        try (SyncMultiLeaderServiceClient client =
+            syncClientManager.borrowClient(peer.getEndpoint())) {
+          TBuildSyncLogChannelRes res =
+              client.buildSyncLogChannel(
+                  new TBuildSyncLogChannelReq(
+                      targetPeer.getGroupId().convertToTConsensusGroupId(),
+                      targetPeer.getEndpoint()));
+          if (!isSuccess(res.status)) {
+            throw new ConsensusGroupAddPeerException(
+                String.format("build sync log channel failed from %s to %s", peer, targetPeer));
+          }
+        } catch (IOException | TException e) {
+          // We use a simple way to deal with the connection issue when notifying other nodes to
+          // build sync log. If the un-responsible peer is the peer which will be removed, we cannot
+          // suspend the operation and need to skip it. In order to keep the mechanism works fine,
+          // we will skip the peer which cannot be reached.
+          // If following error message appears, the un-responsible peer should be removed manually
+          // after current operation
+          // TODO: (xingtanzjr) design more reliable way for MultiLeaderConsensus
+          logger.error(
+              "cannot notify {} to build sync log channel. Please check the status of this node manually",
+              peer,
+              e);
+        }
+      }
+    }
+  }
+
+  public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer)
+      throws ConsensusGroupAddPeerException {
+    // The configuration will be modified during iterating because we will add the targetPeer to
+    // configuration
+    List<Peer> currentMembers = new ArrayList<>(this.configuration);
+    for (Peer peer : currentMembers) {
+      if (peer.equals(targetPeer)) {
+        // if the targetPeer is the same as current peer, skip it because removing itself is illegal
+        continue;
+      }
+      if (peer.equals(thisNode)) {
+        removeSyncLogChannel(targetPeer);
+      } else {
+        // use RPC to tell other peers to build sync log channel to target peer
+        try (SyncMultiLeaderServiceClient client =
+            syncClientManager.borrowClient(peer.getEndpoint())) {
+          TRemoveSyncLogChannelRes res =
+              client.removeSyncLogChannel(
+                  new TRemoveSyncLogChannelReq(
+                      targetPeer.getGroupId().convertToTConsensusGroupId(),
+                      targetPeer.getEndpoint()));
+          if (!isSuccess(res.status)) {
+            throw new ConsensusGroupAddPeerException(
+                String.format("remove sync log channel failed from %s to %s", peer, targetPeer));
+          }
+        } catch (IOException | TException e) {
+          throw new ConsensusGroupAddPeerException(
+              String.format("error when removing sync log channel to %s", peer), e);
+        }
+      }
+    }
+  }
+
+  private boolean isSuccess(TSStatus status) {
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+  }
+
+  /** build SyncLog channel with safeIndex as the default initial sync index */
+  public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
+    buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex());
+  }
+
+  public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
+      throws ConsensusGroupAddPeerException {
+    // step 1, build sync channel in LogDispatcher
+    logger.info(
+        "[MultiLeaderConsensus] build sync log channel to {} with initialSyncIndex {}",
+        targetPeer,
+        initialSyncIndex);
+    logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
+    // step 2, update configuration
+    configuration.add(targetPeer);
+    logger.info("[MultiLeaderConsensus] persist new configuration: {}", configuration);
+    persistConfigurationUpdate();
+  }
+
+  public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
+    try {
+      logDispatcher.removeLogDispatcherThread(targetPeer);
+      logger.info("[MultiLeaderConsensus] log dispatcher to {} removed and cleanup", targetPeer);
+      configuration.remove(targetPeer);
+      persistConfigurationUpdate();
+      logger.info("[MultiLeaderConsensus] configuration updated to {}", this.configuration);
+    } catch (IOException e) {
+      throw new ConsensusGroupAddPeerException("error when remove LogDispatcherThread", e);
+    }
   }
 
   public void persistConfiguration() {
     try (PublicBAOS publicBAOS = new PublicBAOS();
         DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
-      outputStream.writeInt(configuration.size());
-      for (Peer peer : configuration) {
-        peer.serialize(outputStream);
-      }
+      serializeConfigurationTo(outputStream);
       Files.write(
           Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()),
           publicBAOS.getBuf());
     } catch (IOException e) {
+      // TODO: (xingtanzjr) need to handle the IOException because the MultiLeaderConsensus won't
+      // work expectedly
+      //  if the exception occurs
       logger.error("Unexpected error occurs when persisting configuration", e);
     }
   }
 
+  public void persistConfigurationUpdate() throws ConsensusGroupAddPeerException {
+    try (PublicBAOS publicBAOS = new PublicBAOS();
+        DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
+      serializeConfigurationTo(outputStream);
+      Path tmpConfigurationPath =
+          Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
+      Path configurationPath =
+          Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
+      Files.write(tmpConfigurationPath, publicBAOS.getBuf());
+      Files.delete(configurationPath);
+      Files.move(tmpConfigurationPath, configurationPath);
+    } catch (IOException e) {
+      throw new ConsensusGroupAddPeerException(
+          "Unexpected error occurs when update configuration", e);
+    }
+  }
+
+  private void serializeConfigurationTo(DataOutputStream outputStream) throws IOException {
+    outputStream.writeInt(configuration.size());
+    for (Peer peer : configuration) {
+      peer.serialize(outputStream);
+    }
+  }
+
   public void recoverConfiguration() {
     ByteBuffer buffer;
     try {
-      buffer =
-          ByteBuffer.wrap(
-              Files.readAllBytes(
-                  Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath())));
+      Path tmpConfigurationPath =
+          Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
+      Path configurationPath =
+          Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
+      // If the tmpConfigurationPath exists, it means the `persistConfigurationUpdate` is
+      // interrupted
+      // unexpectedly, we need substitute configuration with tmpConfiguration file
+      if (Files.exists(tmpConfigurationPath)) {
+        if (Files.exists(configurationPath)) {
+          Files.delete(configurationPath);
+        }
+        Files.move(tmpConfigurationPath, configurationPath);
+      }
+      buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
       int size = buffer.getInt();
       for (int i = 0; i < size; i++) {
         configuration.add(Peer.deserialize(buffer));
@@ -278,4 +572,13 @@
   public boolean isReadOnly() {
     return stateMachine.isReadOnly();
   }
+
+  public boolean isActive() {
+    return active;
+  }
+
+  public void setActive(boolean active) {
+    logger.info("set {} active status to {}", this.thisNode, active);
+    this.active = active;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index b4cefed..958ccd1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -45,9 +45,7 @@
 
   @Override
   public void onComplete(TSyncLogRes response) {
-    if (response.getStatus().size() == 1
-        && response.getStatus().get(0).getCode()
-            == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
+    if (response.getStatus().size() == 1 && needRetry(response.getStatus().get(0).getCode())) {
       logger.warn(
           "Can not send {} to peer {} for {} times because {}",
           batch,
@@ -62,6 +60,12 @@
     }
   }
 
+  private boolean needRetry(int statusCode) {
+    return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
+        || statusCode == TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode()
+        || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
+  }
+
   @Override
   public void onError(Exception exception) {
     logger.warn(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
index 44d1002..8e2d23a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
@@ -33,6 +33,30 @@
 
   private MultiLeaderConsensusClientPool() {}
 
+  public static class SyncMultiLeaderServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, SyncMultiLeaderServiceClient> {
+    private final MultiLeaderConfig config;
+
+    public SyncMultiLeaderServiceClientPoolFactory(MultiLeaderConfig config) {
+      this.config = config;
+    }
+
+    @Override
+    public KeyedObjectPool<TEndPoint, SyncMultiLeaderServiceClient> createClientPool(
+        ClientManager<TEndPoint, SyncMultiLeaderServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new SyncMultiLeaderServiceClient.Factory(
+              manager,
+              new ClientFactoryProperty.Builder()
+                  .setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
+                  .setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(
+                      config.getRpc().getSelectorNumOfClientManager())
+                  .build()),
+          new ClientPoolProperty.Builder<SyncMultiLeaderServiceClient>().build().getConfig());
+    }
+  }
+
   public static class AsyncMultiLeaderServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncMultiLeaderServiceClient> {
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/SyncMultiLeaderServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/SyncMultiLeaderServiceClient.java
new file mode 100644
index 0000000..3cbb3ae
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/SyncMultiLeaderServiceClient.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.client;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientFactoryProperty;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.sync.SyncThriftClient;
+import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.lang.reflect.Constructor;
+import java.net.SocketException;
+
+public class SyncMultiLeaderServiceClient extends MultiLeaderConsensusIService.Client
+    implements SyncThriftClient, AutoCloseable {
+
+  private final TEndPoint endPoint;
+  private final ClientManager<TEndPoint, SyncMultiLeaderServiceClient> clientManager;
+
+  public SyncMultiLeaderServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      TEndPoint endPoint,
+      ClientManager<TEndPoint, SyncMultiLeaderServiceClient> clientManager)
+      throws TTransportException {
+    super(
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    endPoint.getIp(),
+                    endPoint.getPort(),
+                    connectionTimeout))));
+    this.endPoint = endPoint;
+    this.clientManager = clientManager;
+    getInputProtocol().getTransport().open();
+  }
+
+  @TestOnly
+  public TEndPoint getTEndpoint() {
+    return endPoint;
+  }
+
+  @TestOnly
+  public ClientManager<TEndPoint, SyncMultiLeaderServiceClient> getClientManager() {
+    return clientManager;
+  }
+
+  public void close() {
+    if (clientManager != null) {
+      clientManager.returnClient(endPoint, this);
+    }
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  public void invalidate() {
+    getInputProtocol().getTransport().close();
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endPoint);
+  }
+
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("SyncMultiLeaderServiceClient{%s}", endPoint);
+  }
+
+  public static class Factory extends BaseClientFactory<TEndPoint, SyncMultiLeaderServiceClient> {
+
+    public Factory(
+        ClientManager<TEndPoint, SyncMultiLeaderServiceClient> clientManager,
+        ClientFactoryProperty clientFactoryProperty) {
+      super(clientManager, clientFactoryProperty);
+    }
+
+    @Override
+    public void destroyObject(
+        TEndPoint endpoint, PooledObject<SyncMultiLeaderServiceClient> pooledObject) {
+      pooledObject.getObject().invalidate();
+    }
+
+    @Override
+    public PooledObject<SyncMultiLeaderServiceClient> makeObject(TEndPoint endpoint)
+        throws Exception {
+      Constructor<SyncMultiLeaderServiceClient> constructor =
+          SyncMultiLeaderServiceClient.class.getConstructor(
+              TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
+      return new DefaultPooledObject<>(
+          SyncThriftClientWithErrorHandler.newErrorHandler(
+              SyncMultiLeaderServiceClient.class,
+              constructor,
+              clientFactoryProperty.getProtocolFactory(),
+              clientFactoryProperty.getConnectionTimeoutMs(),
+              endpoint,
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        TEndPoint endpoint, PooledObject<SyncMultiLeaderServiceClient> pooledObject) {
+      return pooledObject.getObject() != null
+          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
index 027bf6a..f43f0a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
@@ -45,13 +45,15 @@
 
   private final String storageDir;
   private final String prefix;
+  private final long initialIndex;
 
   private final long checkpointGap;
 
-  public IndexController(String storageDir, String prefix, long checkpointGap) {
+  public IndexController(String storageDir, String prefix, long initialIndex, long checkpointGap) {
     this.storageDir = storageDir;
     this.prefix = prefix + '-';
     this.checkpointGap = checkpointGap;
+    this.initialIndex = initialIndex;
     restore();
   }
 
@@ -100,11 +102,17 @@
     try {
       if (oldFile.exists()) {
         FileUtils.moveFile(oldFile, newFile);
+        logger.info(
+            "version file updated, previous: {}, current: {}",
+            oldFile.getAbsolutePath(),
+            newFile.getAbsolutePath());
+      } else {
+        // In the normal state, this branch should not be triggered.
+        logger.error(
+            "failed to flush sync index. cannot find previous version file. previous: {}",
+            lastFlushedIndex);
       }
-      logger.info(
-          "Version file updated, previous: {}, current: {}",
-          oldFile.getAbsolutePath(),
-          newFile.getAbsolutePath());
+
       lastFlushedIndex = flushIndex;
     } catch (IOException e) {
       logger.error("Error occurred when flushing next version", e);
@@ -138,12 +146,26 @@
       }
       currentIndex = lastFlushedIndex;
     } else {
-      versionFile = new File(directory, prefix + "0");
+      currentIndex = initialIndex;
+      versionFile = new File(directory, prefix + initialIndex);
       try {
         Files.createFile(versionFile.toPath());
+        lastFlushedIndex = initialIndex;
       } catch (IOException e) {
+        // TODO: (xingtanzjr) we need to handle the situation that file creation failed.
+        //  Or the dispatcher won't run correctly
         logger.error("Error occurred when creating new file {}", versionFile.getAbsolutePath(), e);
       }
     }
   }
+
+  public void cleanupVersionFiles() throws IOException {
+    File directory = new File(storageDir);
+    File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix));
+    if (versionFiles != null && versionFiles.length > 0) {
+      for (File versionFile : versionFiles) {
+        Files.delete(versionFile.toPath());
+      }
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 0574d12..ccf11bf 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -55,15 +55,16 @@
 
 /** Manage all asynchronous replication threads and corresponding async clients */
 public class LogDispatcher {
-
   private final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
-
+  private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
   private final MultiLeaderServerImpl impl;
   private final List<LogDispatcherThread> threads;
   private final String selfPeerId;
   private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
   private ExecutorService executorService;
 
+  private boolean stopped = false;
+
   public LogDispatcher(
       MultiLeaderServerImpl impl,
       IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager) {
@@ -73,22 +74,26 @@
     this.threads =
         impl.getConfiguration().stream()
             .filter(x -> !Objects.equals(x, impl.getThisNode()))
-            .map(x -> new LogDispatcherThread(x, impl.getConfig()))
+            .map(x -> new LogDispatcherThread(x, impl.getConfig(), DEFAULT_INITIAL_SYNC_INDEX))
             .collect(Collectors.toList());
     if (!threads.isEmpty()) {
+      // We use cached thread pool here because each LogDispatcherThread will occupy one thread.
+      // And every LogDispatcherThread won't release its thread in this pool because it won't stop
+      // unless LogDispatcher stop.
+      // Thus, the size of this threadPool will be the same as the count of LogDispatcherThread.
       this.executorService =
-          IoTDBThreadPoolFactory.newFixedThreadPool(
-              threads.size(), "LogDispatcher-" + impl.getThisNode().getGroupId());
+          IoTDBThreadPoolFactory.newCachedThreadPool(
+              "LogDispatcher-" + impl.getThisNode().getGroupId());
     }
   }
 
-  public void start() {
+  public synchronized void start() {
     if (!threads.isEmpty()) {
       threads.forEach(executorService::submit);
     }
   }
 
-  public void stop() {
+  public synchronized void stop() {
     if (!threads.isEmpty()) {
       threads.forEach(LogDispatcherThread::stop);
       executorService.shutdownNow();
@@ -102,31 +107,65 @@
         logger.error("Unexpected Interruption when closing LogDispatcher service ");
       }
     }
+    stopped = true;
   }
 
-  public OptionalLong getMinSyncIndex() {
+  public synchronized void addLogDispatcherThread(Peer peer, long initialSyncIndex) {
+    if (stopped) {
+      return;
+    }
+    //
+    LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex);
+    threads.add(thread);
+    executorService.submit(thread);
+  }
+
+  public synchronized void removeLogDispatcherThread(Peer peer) throws IOException {
+    if (stopped) {
+      return;
+    }
+    int threadIndex = -1;
+    for (int i = 0; i < threads.size(); i++) {
+      if (threads.get(i).peer.equals(peer)) {
+        threadIndex = i;
+        break;
+      }
+    }
+    if (threadIndex == -1) {
+      return;
+    }
+    threads.get(threadIndex).stop();
+    threads.get(threadIndex).cleanup();
+    threads.remove(threadIndex);
+  }
+
+  public synchronized OptionalLong getMinSyncIndex() {
     return threads.stream().mapToLong(LogDispatcherThread::getCurrentSyncIndex).min();
   }
 
   public void offer(IndexedConsensusRequest request) {
     List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
-    threads.forEach(
-        thread -> {
-          logger.debug(
-              "{}->{}: Push a log to the queue, where the queue length is {}",
-              impl.getThisNode().getGroupId(),
-              thread.getPeer().getEndpoint().getIp(),
-              thread.getPendingRequest().size());
-          if (!thread
-              .getPendingRequest()
-              .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
+    // we put the serialization step outside the synchronized block because it is stateless and
+    // time-consuming
+    synchronized (this) {
+      threads.forEach(
+          thread -> {
             logger.debug(
-                "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+                "{}->{}: Push a log to the queue, where the queue length is {}",
                 impl.getThisNode().getGroupId(),
-                thread.getPeer(),
-                request.getSearchIndex());
-          }
-        });
+                thread.getPeer().getEndpoint().getIp(),
+                thread.getPendingRequest().size());
+            if (!thread
+                .getPendingRequest()
+                .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
+              logger.debug(
+                  "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+                  impl.getThisNode().getGroupId(),
+                  thread.getPeer(),
+                  request.getSearchIndex());
+            }
+          });
+    }
   }
 
   public class LogDispatcherThread implements Runnable {
@@ -148,7 +187,7 @@
 
     private ConsensusReqReader.ReqIterator walEntryiterator;
 
-    public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
+    public LogDispatcherThread(Peer peer, MultiLeaderConfig config, long initialSyncIndex) {
       this.peer = peer;
       this.config = config;
       this.pendingRequest =
@@ -157,6 +196,7 @@
           new IndexController(
               impl.getStorageDir(),
               Utils.fromTEndPointToString(peer.getEndpoint()),
+              initialSyncIndex,
               config.getReplication().getCheckpointGap());
       this.syncStatus = new SyncStatus(controller, config);
       this.walEntryiterator = reader.getReqIterator(START_INDEX);
@@ -186,6 +226,10 @@
       stopped = true;
     }
 
+    public void cleanup() throws IOException {
+      this.controller.cleanupVersionFiles();
+    }
+
     public boolean isStopped() {
       return stopped;
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 86c3b0f..2544451 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -21,19 +21,33 @@
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
 import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
+import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
+import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelReq;
+import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelRes;
+import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
+import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentRes;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +75,7 @@
       if (impl == null) {
         String message =
             String.format(
-                "Unexpected consensusGroupId %s for TSyncLogReq which size is %s",
+                "unexpected consensusGroupId %s for TSyncLogReq which size is %s",
                 groupId, req.getBatches().size());
         logger.error(message);
         TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -70,10 +84,17 @@
         return;
       }
       if (impl.isReadOnly()) {
-        String message = "Fail to sync log because system is read-only.";
+        String message = "fail to sync log because system is read-only.";
         logger.error(message);
-        resultHandler.onError(
-            new IoTDBException(message, TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode()));
+        TSStatus status = new TSStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode());
+        status.setMessage(message);
+        resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
+        return;
+      }
+      if (!impl.isActive()) {
+        TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
+        status.setMessage("peer is inactive and not ready to receive sync log request");
+        resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
         return;
       }
       BatchIndexedConsensusRequest requestsInThisBatch =
@@ -106,12 +127,155 @@
       }
       TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
       logger.debug(
-          "Execute TSyncLogReq for {} with result {}", req.consensusGroupId, writeStatus.subStatus);
+          "execute TSyncLogReq for {} with result {}", req.consensusGroupId, writeStatus.subStatus);
       resultHandler.onComplete(new TSyncLogRes(writeStatus.subStatus));
     } catch (Exception e) {
       resultHandler.onError(e);
     }
   }
 
+  @Override
+  public void inactivatePeer(
+      TInactivatePeerReq req, AsyncMethodCallback<TInactivatePeerRes> resultHandler)
+      throws TException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for inactivatePeer request", groupId);
+      logger.error(message);
+      TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      resultHandler.onComplete(new TInactivatePeerRes(status));
+      return;
+    }
+    impl.setActive(false);
+    resultHandler.onComplete(
+        new TInactivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+  }
+
+  @Override
+  public void activatePeer(
+      TActivatePeerReq req, AsyncMethodCallback<TActivatePeerRes> resultHandler) throws TException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for inactivatePeer request", groupId);
+      logger.error(message);
+      TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      resultHandler.onComplete(new TActivatePeerRes(status));
+      return;
+    }
+    impl.setActive(true);
+    resultHandler.onComplete(
+        new TActivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+  }
+
+  @Override
+  public void buildSyncLogChannel(
+      TBuildSyncLogChannelReq req, AsyncMethodCallback<TBuildSyncLogChannelRes> resultHandler)
+      throws TException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
+      logger.error(message);
+      TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      resultHandler.onComplete(new TBuildSyncLogChannelRes(status));
+      return;
+    }
+    TSStatus responseStatus;
+    try {
+      impl.buildSyncLogChannel(new Peer(groupId, req.endPoint));
+      responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (ConsensusGroupAddPeerException e) {
+      responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      responseStatus.setMessage(e.getMessage());
+    }
+    resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus));
+  }
+
+  @Override
+  public void removeSyncLogChannel(
+      TRemoveSyncLogChannelReq req, AsyncMethodCallback<TRemoveSyncLogChannelRes> resultHandler)
+      throws TException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
+      logger.error(message);
+      TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      resultHandler.onComplete(new TRemoveSyncLogChannelRes(status));
+      return;
+    }
+    TSStatus responseStatus;
+    try {
+      impl.removeSyncLogChannel(new Peer(groupId, req.endPoint));
+      responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (ConsensusGroupAddPeerException e) {
+      responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      responseStatus.setMessage(e.getMessage());
+    }
+    resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus));
+  }
+
+  @Override
+  public void sendSnapshotFragment(
+      TSendSnapshotFragmentReq req, AsyncMethodCallback<TSendSnapshotFragmentRes> resultHandler)
+      throws TException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
+      logger.error(message);
+      TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      resultHandler.onComplete(new TSendSnapshotFragmentRes(status));
+      return;
+    }
+    TSStatus responseStatus;
+    try {
+      impl.receiveSnapshotFragment(req.snapshotId, req.filePath, req.fileChunk);
+      responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (ConsensusGroupAddPeerException e) {
+      responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      responseStatus.setMessage(e.getMessage());
+    }
+    resultHandler.onComplete(new TSendSnapshotFragmentRes(responseStatus));
+  }
+
+  @Override
+  public void triggerSnapshotLoad(
+      TTriggerSnapshotLoadReq req, AsyncMethodCallback<TTriggerSnapshotLoadRes> resultHandler)
+      throws TException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
+      logger.error(message);
+      TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      resultHandler.onComplete(new TTriggerSnapshotLoadRes(status));
+      return;
+    }
+    impl.loadSnapshot(req.snapshotId);
+    resultHandler.onComplete(
+        new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+  }
+
   public void handleClientExit() {}
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragment.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragment.java
new file mode 100644
index 0000000..ea04296
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragment.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.snapshot;
+
+import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
+
+import java.nio.ByteBuffer;
+
+public class SnapshotFragment {
+  private final String snapshotId;
+  private final String filePath;
+  private final long totalSize;
+  private final long startOffset;
+  private final long fragmentSize;
+  private final ByteBuffer fileChunk;
+
+  public SnapshotFragment(
+      String snapshotId,
+      String filePath,
+      long totalSize,
+      long startOffset,
+      long fragmentSize,
+      ByteBuffer fileChunk) {
+    this.snapshotId = snapshotId;
+    this.filePath = filePath;
+    this.totalSize = totalSize;
+    this.startOffset = startOffset;
+    this.fragmentSize = fragmentSize;
+    this.fileChunk = fileChunk;
+  }
+
+  public TSendSnapshotFragmentReq toTSendSnapshotFragmentReq() {
+    TSendSnapshotFragmentReq req = new TSendSnapshotFragmentReq();
+    req.setSnapshotId(snapshotId);
+    req.setFilePath(filePath);
+    req.setChunkLength(fragmentSize);
+    req.setFileChunk(fileChunk);
+    return req;
+  }
+
+  public String getSnapshotId() {
+    return snapshotId;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public long getTotalSize() {
+    return totalSize;
+  }
+
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  public long getFragmentSize() {
+    return fragmentSize;
+  }
+
+  public ByteBuffer getFileChunk() {
+    return fileChunk;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragmentReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragmentReader.java
new file mode 100644
index 0000000..1509995
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragmentReader.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class SnapshotFragmentReader {
+
+  private static final int DEFAULT_FILE_FRAGMENT_SIZE = 10 * 1024 * 1024;
+  private final String snapshotId;
+  private final String filePath;
+  private final SeekableByteChannel fileChannel;
+  private final long fileSize;
+  private final ByteBuffer buf;
+  private long totalReadSize;
+  private SnapshotFragment cachedSnapshotFragment;
+
+  public SnapshotFragmentReader(String snapshotId, Path path) throws IOException {
+    this.snapshotId = snapshotId;
+    this.filePath = path.toAbsolutePath().toString();
+    this.fileSize = Files.size(path);
+    this.fileChannel = Files.newByteChannel(path);
+    this.buf = ByteBuffer.allocate(DEFAULT_FILE_FRAGMENT_SIZE);
+  }
+
+  public boolean hasNext() throws IOException {
+    buf.clear();
+    int actualReadSize = fileChannel.read(buf);
+    buf.flip();
+    if (actualReadSize > 0) {
+      cachedSnapshotFragment =
+          new SnapshotFragment(snapshotId, filePath, fileSize, totalReadSize, actualReadSize, buf);
+      totalReadSize += actualReadSize;
+      return true;
+    }
+    return false;
+  }
+
+  public SnapshotFragment next() {
+    return cachedSnapshotFragment;
+  }
+
+  public void close() throws IOException {
+    if (fileChannel != null) {
+      fileChannel.close();
+    }
+  }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
index 2675470..761ace4 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
@@ -49,7 +49,7 @@
   @Test
   public void testIncrementIntervalAfterRestart() {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -58,7 +58,7 @@
     Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -66,7 +66,7 @@
     Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
     Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
@@ -74,7 +74,7 @@
     Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
     Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index f7c9072..a888904 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -56,7 +56,7 @@
   @Test
   public void sequenceTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);
@@ -82,7 +82,7 @@
   @Test
   public void reverseTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -115,7 +115,7 @@
   @Test
   public void mixedTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -160,7 +160,7 @@
   @Test
   public void waitTest() throws InterruptedException, ExecutionException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 75b18d5..0d90993 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -31,6 +31,9 @@
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.cache.BloomFilterCache;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.snapshot.SnapshotLoader;
 import org.apache.iotdb.db.engine.snapshot.SnapshotTaker;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
@@ -124,6 +127,9 @@
     try {
       StorageEngineV2.getInstance()
           .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region);
+      ChunkCache.getInstance().clear();
+      TimeSeriesMetadataCache.getInstance().clear();
+      BloomFilterCache.getInstance().clear();
     } catch (Exception e) {
       logger.error("Exception occurs when replacing data region in storage engine.", e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
index e3fdf37..97771dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
@@ -122,6 +122,12 @@
         return null;
       }
       LOGGER.info("Moving snapshot file to data dirs");
+      try {
+        deleteAllFilesInDataDirs();
+        LOGGER.info("Remove all data files in original data dir");
+      } catch (IOException e) {
+        return null;
+      }
       createLinksFromSnapshotDirToDataDirWithoutLog(new File(snapshotPath));
       return loadSnapshot();
     } catch (IOException | DiskSpaceInsufficientException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
index 67b3df9..d90eb08 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
@@ -149,6 +149,9 @@
           continue;
         }
         File tsFile = resource.getTsFile();
+        if (!resource.isClosed()) {
+          continue;
+        }
         File snapshotTsFile = getSnapshotFilePathForTsFile(tsFile, snapshotId);
         // create hard link for tsfile, resource, mods
         createHardLink(snapshotTsFile, tsFile);
@@ -169,7 +172,14 @@
   }
 
   private void createHardLink(File target, File source) throws IOException {
-    Files.createLink(target.toPath(), source.toPath());
+    if (!target.getParentFile().exists()) {
+      LOGGER.error("Hard link target dir {} doesn't exist", target.getParentFile());
+    }
+    if (!source.exists()) {
+      LOGGER.error("Hard link source file {} doesn't exist", source);
+    }
+
+    Files.createLink(target.getAbsoluteFile().toPath(), source.getAbsoluteFile().toPath());
     snapshotLogger.logFile(source.getAbsolutePath(), target.getAbsolutePath());
   }
 
@@ -216,6 +226,7 @@
   }
 
   private void cleanUpWhenFail(String snapshotId) {
+    LOGGER.info("Cleaning up snapshot dir for {}", snapshotId);
     for (String dataDir : IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
       File dataDirForThisSnapshot =
           new File(dataDir + File.separator + "snapshot" + File.separator + snapshotId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
new file mode 100644
index 0000000..4a5b416
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool;
+import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestRPCClient {
+  private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      INTERNAL_SERVICE_CLIENT_MANAGER =
+          new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+              .createClientManager(
+                  new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+
+  private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
+
+  public TestRPCClient() {
+    syncClientManager =
+        new IClientManager.Factory<TEndPoint, SyncMultiLeaderServiceClient>()
+            .createClientManager(
+                new MultiLeaderConsensusClientPool.SyncMultiLeaderServiceClientPoolFactory(
+                    new MultiLeaderConfig.Builder().build()));
+  }
+
+  public static void main(String args[]) {
+    TestRPCClient client = new TestRPCClient();
+    //    client.removeRegionPeer();
+    client.addPeer();
+    //    client.loadSnapshot();
+  }
+
+  private void loadSnapshot() {
+    try (SyncMultiLeaderServiceClient client =
+        syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40011))) {
+      TTriggerSnapshotLoadRes res =
+          client.triggerSnapshotLoad(
+              new TTriggerSnapshotLoadReq(
+                  new DataRegionId(1).convertToTConsensusGroupId(), "snapshot_1_1662370255552"));
+      System.out.println(res.status);
+    } catch (IOException | TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void testAddPeer() {
+    try (SyncMultiLeaderServiceClient client =
+        syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40012))) {
+      TInactivatePeerRes res =
+          client.inactivatePeer(
+              new TInactivatePeerReq(new DataRegionId(1).convertToTConsensusGroupId()));
+      System.out.println(res.status);
+    } catch (IOException | TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void removeRegionPeer() {
+    try (SyncDataNodeInternalServiceClient client =
+        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
+      client.removeRegionPeer(
+          new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
+    } catch (IOException | TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void addPeer() {
+    try (SyncDataNodeInternalServiceClient client =
+        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
+      client.addRegionPeer(
+          new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
+    } catch (IOException | TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private TDataNodeLocation getLocation3(int dataNodeId) {
+    return new TDataNodeLocation(
+        dataNodeId,
+        new TEndPoint("127.0.0.1", 6669),
+        new TEndPoint("127.0.0.1", 9005),
+        new TEndPoint("127.0.0.1", 8779),
+        new TEndPoint("127.0.0.1", 40012),
+        new TEndPoint("127.0.0.1", 50012));
+  }
+
+  private TDataNodeLocation getLocation2(int dataNodeId) {
+    return new TDataNodeLocation(
+        dataNodeId,
+        new TEndPoint("127.0.0.1", 6668),
+        new TEndPoint("127.0.0.1", 9004),
+        new TEndPoint("127.0.0.1", 8778),
+        new TEndPoint("127.0.0.1", 40011),
+        new TEndPoint("127.0.0.1", 50011));
+  }
+
+  private void createDataRegion() {
+    try (SyncDataNodeInternalServiceClient client =
+        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9005))) {
+      TCreateDataRegionReq req = new TCreateDataRegionReq();
+      req.setStorageGroup("root.test.g_0");
+      TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+      regionReplicaSet.setRegionId(new DataRegionId(1).convertToTConsensusGroupId());
+      List<TDataNodeLocation> locationList = new ArrayList<>();
+      locationList.add(
+          new TDataNodeLocation(
+              3,
+              new TEndPoint("127.0.0.1", 6667),
+              new TEndPoint("127.0.0.1", 9003),
+              new TEndPoint("127.0.0.1", 8777),
+              new TEndPoint("127.0.0.1", 40010),
+              new TEndPoint("127.0.0.1", 50010)));
+      locationList.add(
+          new TDataNodeLocation(
+              4,
+              new TEndPoint("127.0.0.1", 6668),
+              new TEndPoint("127.0.0.1", 9004),
+              new TEndPoint("127.0.0.1", 8778),
+              new TEndPoint("127.0.0.1", 40011),
+              new TEndPoint("127.0.0.1", 50011)));
+      locationList.add(
+          new TDataNodeLocation(
+              4,
+              new TEndPoint("127.0.0.1", 6669),
+              new TEndPoint("127.0.0.1", 9005),
+              new TEndPoint("127.0.0.1", 8779),
+              new TEndPoint("127.0.0.1", 40012),
+              new TEndPoint("127.0.0.1", 50012)));
+      regionReplicaSet.setDataNodeLocations(locationList);
+      req.setRegionReplicaSet(regionReplicaSet);
+      TSStatus res = client.createDataRegion(req);
+      System.out.println(res.code + " " + res.message);
+
+    } catch (IOException | TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index e532877..f83730b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -169,7 +169,7 @@
 
   private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) {
     for (TEndPoint endPoint : queryContext.getEndPointBlackList()) {
-      if (endPoint.getIp().equals(dataNodeLocation.internalEndPoint.getIp())) {
+      if (endPoint.equals(dataNodeLocation.internalEndPoint)) {
         return false;
       }
     }
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 7d315d2..380f30a 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -33,10 +33,71 @@
   3: required list<TLogBatch> batches
 }
 
+struct TInactivatePeerReq {
+  1: required common.TConsensusGroupId consensusGroupId
+}
+
+struct TInactivatePeerRes {
+  1: required common.TSStatus status
+}
+
+struct TActivatePeerReq {
+  1: required common.TConsensusGroupId consensusGroupId
+}
+
+struct TActivatePeerRes {
+  1: required common.TSStatus status
+}
+
 struct TSyncLogRes {
   1: required list<common.TSStatus> status
 }
 
+struct TBuildSyncLogChannelReq {
+  1: required common.TConsensusGroupId consensusGroupId
+  2: required common.TEndPoint endPoint
+}
+
+struct TBuildSyncLogChannelRes {
+  1: required common.TSStatus status
+}
+
+struct TRemoveSyncLogChannelReq {
+  1: required common.TConsensusGroupId consensusGroupId
+  2: required common.TEndPoint endPoint
+}
+
+struct TRemoveSyncLogChannelRes {
+  1: required common.TSStatus status
+}
+
+struct TSendSnapshotFragmentReq {
+  1: required common.TConsensusGroupId consensusGroupId
+  2: required string snapshotId
+  3: required string filePath
+  4: required i64 chunkLength
+  5: required binary fileChunk
+}
+
+struct TSendSnapshotFragmentRes {
+  1: required common.TSStatus status
+}
+
+struct TTriggerSnapshotLoadReq {
+  1: required common.TConsensusGroupId consensusGroupId
+  2: required string snapshotId
+}
+
+struct TTriggerSnapshotLoadRes {
+  1: required common.TSStatus status
+}
+
 service MultiLeaderConsensusIService {
   TSyncLogRes syncLog(TSyncLogReq req)
+  TInactivatePeerRes inactivatePeer(TInactivatePeerReq req)
+  TActivatePeerRes activatePeer(TActivatePeerReq req)
+  TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
+  TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req)
+  TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req)
+  TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
 }
\ No newline at end of file