[IoTDB-4328] Complete add/remove replica for MultiLeaderConsensus (#7390)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index d6b3b2b..d93ca83 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -558,8 +558,15 @@
return Optional.empty();
}
+ List<TDataNodeLocation> aliveDataNodes =
+ configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .collect(Collectors.toList());
+
// TODO replace findAny() by select the low load node.
- return regionReplicaNodes.stream().filter(e -> !e.equals(filterLocation)).findAny();
+ return regionReplicaNodes.stream()
+ .filter(e -> aliveDataNodes.contains(e) && !e.equals(filterLocation))
+ .findAny();
}
private String getIdWithRpcEndpoint(TDataNodeLocation location) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
new file mode 100644
index 0000000..0260ba0
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
+public class ConsensusGroupAddPeerException extends Exception {
+ public ConsensusGroupAddPeerException(String message) {
+ super(message);
+ }
+
+ public ConsensusGroupAddPeerException(Throwable cause) {
+ super(cause);
+ }
+
+ public ConsensusGroupAddPeerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index d091772..2a9da3a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -36,14 +36,19 @@
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
+import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.SyncMultiLeaderServiceClientPoolFactory;
+import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService;
import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -73,6 +78,7 @@
private final RegisterManager registerManager = new RegisterManager();
private final MultiLeaderConfig config;
private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
+ private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
public MultiLeaderConsensus(ConsensusConfig config, Registry registry) {
this.thisNode = config.getThisNode();
@@ -84,6 +90,10 @@
new IClientManager.Factory<TEndPoint, AsyncMultiLeaderServiceClient>()
.createClientManager(
new AsyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
+ this.syncClientManager =
+ new IClientManager.Factory<TEndPoint, SyncMultiLeaderServiceClient>()
+ .createClientManager(
+ new SyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
}
@Override
@@ -116,6 +126,7 @@
new ArrayList<>(),
registry.apply(consensusGroupId),
clientManager,
+ syncClientManager,
config);
stateMachineMap.put(consensusGroupId, consensus);
consensus.start();
@@ -144,6 +155,10 @@
if (impl.isReadOnly()) {
status = new TSStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode());
status.setMessage("Fail to do non-query operations because system is read-only.");
+ } else if (!impl.isActive()) {
+ // TODO: (xingtanzjr) whether we need to define a new status to indicate the inactive status ?
+ status = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
+ status.setMessage("peer is inactive and not ready to receive sync log request.");
} else {
status = impl.write(request);
}
@@ -191,6 +206,7 @@
peers,
registry.apply(groupId),
clientManager,
+ syncClientManager,
config);
impl.start();
return impl;
@@ -225,12 +241,65 @@
@Override
public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
- return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+ if (impl == null) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ try {
+ // step 1: inactive new Peer to prepare for following steps
+ logger.info("[MultiLeaderConsensus] inactivate new peer: {}", peer);
+ impl.inactivePeer(peer);
+
+ // step 2: notify all the other Peers to build the sync connection to newPeer
+ logger.info("[MultiLeaderConsensus] notify current peers to build sync log...");
+ impl.notifyPeersToBuildSyncLogChannel(peer);
+
+ // step 3: take snapshot
+ logger.info("[MultiLeaderConsensus] start to take snapshot...");
+ impl.takeSnapshot();
+
+ // step 4: transit snapshot
+ logger.info("[MultiLeaderConsensus] start to transit snapshot...");
+ impl.transitSnapshot(peer);
+
+ // step 5: let the new peer load snapshot
+ logger.info("[MultiLeaderConsensus] trigger new peer to load snapshot...");
+ impl.triggerSnapshotLoad(peer);
+
+ // step 6: active new Peer
+ logger.info("[MultiLeaderConsensus] activate new peer...");
+ impl.activePeer(peer);
+
+ } catch (ConsensusGroupAddPeerException e) {
+ logger.error("cannot execute addPeer() for {}", peer, e);
+ return ConsensusGenericResponse.newBuilder()
+ .setSuccess(false)
+ .setException(new ConsensusException(e.getMessage()))
+ .build();
+ }
+
+ return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
}
@Override
public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
- return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+ if (impl == null) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ try {
+ impl.notifyPeersToRemoveSyncLogChannel(peer);
+ } catch (ConsensusGroupAddPeerException e) {
+ return ConsensusGenericResponse.newBuilder()
+ .setSuccess(false)
+ .setException(new ConsensusException(e.getMessage()))
+ .build();
+ }
+ return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 1b45b3c..5803b9e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -28,14 +28,31 @@
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
+import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
+import org.apache.iotdb.consensus.multileader.snapshot.SnapshotFragmentReader;
+import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
+import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
+import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelReq;
+import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelRes;
+import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
+import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentRes;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +61,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -56,6 +76,8 @@
public class MultiLeaderServerImpl {
private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
+ private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp";
+ private static final String SNAPSHOT_DIR_NAME = "snapshot";
private final Logger logger = LoggerFactory.getLogger(MultiLeaderServerImpl.class);
@@ -69,6 +91,9 @@
private final LogDispatcher logDispatcher;
private final MultiLeaderConfig config;
private final ConsensusReqReader reader;
+ private boolean active;
+ private String latestSnapshotId;
+ private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
public MultiLeaderServerImpl(
String storageDir,
@@ -76,10 +101,13 @@
List<Peer> configuration,
IStateMachine stateMachine,
IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager,
+ IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager,
MultiLeaderConfig config) {
+ this.active = true;
this.storageDir = storageDir;
this.thisNode = thisNode;
this.stateMachine = stateMachine;
+ this.syncClientManager = syncClientManager;
this.configuration = configuration;
if (configuration.isEmpty()) {
recoverConfiguration();
@@ -175,36 +203,302 @@
return stateMachine.read(request);
}
- public boolean takeSnapshot(File snapshotDir) {
- return stateMachine.takeSnapshot(snapshotDir);
+ public void takeSnapshot() throws ConsensusGroupAddPeerException {
+ try {
+ latestSnapshotId =
+ String.format(
+ "%s_%s_%d",
+ SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), System.currentTimeMillis());
+ File snapshotDir = new File(storageDir, latestSnapshotId);
+ if (snapshotDir.exists()) {
+ FileUtils.deleteDirectory(snapshotDir);
+ }
+ if (!snapshotDir.mkdirs()) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("%s: cannot mkdir for snapshot", thisNode.getGroupId()));
+ }
+ if (!stateMachine.takeSnapshot(snapshotDir)) {
+ throw new ConsensusGroupAddPeerException("unknown error when taking snapshot");
+ }
+ } catch (IOException e) {
+ throw new ConsensusGroupAddPeerException("error when taking snapshot", e);
+ }
}
- public void loadSnapshot(File latestSnapshotRootDir) {
- stateMachine.loadSnapshot(latestSnapshotRootDir);
+ public void transitSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException {
+ File snapshotDir = new File(storageDir, latestSnapshotId);
+ List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
+ System.out.println(snapshotPaths);
+ try (SyncMultiLeaderServiceClient client =
+ syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+ for (Path path : snapshotPaths) {
+ SnapshotFragmentReader reader = new SnapshotFragmentReader(latestSnapshotId, path);
+ while (reader.hasNext()) {
+ TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
+ req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
+ // receiveSnapshotFragment(latestSnapshotId, req.filePath, req.fileChunk);
+ TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
+ if (!isSuccess(res.getStatus())) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("error when sending snapshot fragment to %s", targetPeer));
+ }
+ }
+ reader.close();
+ }
+ } catch (IOException | TException e) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("error when send snapshot file to %s", targetPeer), e);
+ }
+ }
+
+ public void receiveSnapshotFragment(
+ String snapshotId, String originalFilePath, ByteBuffer fileChunk)
+ throws ConsensusGroupAddPeerException {
+ try {
+ String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath);
+ File targetFile = new File(storageDir, targetFilePath);
+ Path parentDir = Paths.get(targetFile.getParent());
+ if (!Files.exists(parentDir)) {
+ Files.createDirectories(parentDir);
+ }
+ Files.write(
+ Paths.get(targetFile.getAbsolutePath()),
+ fileChunk.array(),
+ StandardOpenOption.CREATE,
+ StandardOpenOption.APPEND);
+ } catch (IOException e) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("error when receiving snapshot %s", snapshotId), e);
+ }
+ }
+
+ private String calculateSnapshotPath(String snapshotId, String originalFilePath)
+ throws ConsensusGroupAddPeerException {
+ if (!originalFilePath.contains(snapshotId)) {
+ throw new ConsensusGroupAddPeerException(
+ String.format(
+ "invalid snapshot file. snapshotId: %s, filePath: %s", snapshotId, originalFilePath));
+ }
+ return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
+ }
+
+ public void loadSnapshot(String snapshotId) {
+ // TODO: (xingtanzjr) throw exception if the snapshot load failed
+ stateMachine.loadSnapshot(new File(storageDir, snapshotId));
+ }
+
+ public void inactivePeer(Peer peer) throws ConsensusGroupAddPeerException {
+ try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
+ TInactivatePeerRes res =
+ client.inactivatePeer(
+ new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
+ if (!isSuccess(res.status)) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("error when inactivating %s. %s", peer, res.getStatus()));
+ }
+ } catch (IOException | TException e) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("error when inactivating %s", peer), e);
+ }
+ }
+
+ public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupAddPeerException {
+ try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
+ TTriggerSnapshotLoadRes res =
+ client.triggerSnapshotLoad(
+ new TTriggerSnapshotLoadReq(
+ thisNode.getGroupId().convertToTConsensusGroupId(), latestSnapshotId));
+ if (!isSuccess(res.status)) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("error when triggering snapshot load %s. %s", peer, res.getStatus()));
+ }
+ } catch (IOException | TException e) {
+ throw new ConsensusGroupAddPeerException(String.format("error when activating %s", peer), e);
+ }
+ }
+
+ public void activePeer(Peer peer) throws ConsensusGroupAddPeerException {
+ try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
+ TActivatePeerRes res =
+ client.activatePeer(new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
+ if (!isSuccess(res.status)) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("error when activating %s. %s", peer, res.getStatus()));
+ }
+ } catch (IOException | TException e) {
+ throw new ConsensusGroupAddPeerException(String.format("error when activating %s", peer), e);
+ }
+ }
+
+ public void notifyPeersToBuildSyncLogChannel(Peer targetPeer)
+ throws ConsensusGroupAddPeerException {
+ // The configuration will be modified during iterating because we will add the targetPeer to
+ // configuration
+ List<Peer> currentMembers = new ArrayList<>(this.configuration);
+ logger.info(
+ "[MultiLeaderConsensus] notify current peers to build sync log. group member: {}, target: {}",
+ currentMembers,
+ targetPeer);
+ for (Peer peer : currentMembers) {
+ logger.info("[MultiLeaderConsensus] build sync log channel from {}", peer);
+ if (peer.equals(thisNode)) {
+ // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
+ // snapshot produced by thisNode
+ buildSyncLogChannel(targetPeer, index.get());
+ } else {
+ // use RPC to tell other peers to build sync log channel to target peer
+ try (SyncMultiLeaderServiceClient client =
+ syncClientManager.borrowClient(peer.getEndpoint())) {
+ TBuildSyncLogChannelRes res =
+ client.buildSyncLogChannel(
+ new TBuildSyncLogChannelReq(
+ targetPeer.getGroupId().convertToTConsensusGroupId(),
+ targetPeer.getEndpoint()));
+ if (!isSuccess(res.status)) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("build sync log channel failed from %s to %s", peer, targetPeer));
+ }
+ } catch (IOException | TException e) {
+ // We use a simple way to deal with the connection issue when notifying other nodes to
+ // build sync log. If the un-responsible peer is the peer which will be removed, we cannot
+ // suspend the operation and need to skip it. In order to keep the mechanism works fine,
+ // we will skip the peer which cannot be reached.
+ // If following error message appears, the un-responsible peer should be removed manually
+ // after current operation
+ // TODO: (xingtanzjr) design more reliable way for MultiLeaderConsensus
+ logger.error(
+ "cannot notify {} to build sync log channel. Please check the status of this node manually",
+ peer,
+ e);
+ }
+ }
+ }
+ }
+
+ public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer)
+ throws ConsensusGroupAddPeerException {
+ // The configuration will be modified during iterating because we will add the targetPeer to
+ // configuration
+ List<Peer> currentMembers = new ArrayList<>(this.configuration);
+ for (Peer peer : currentMembers) {
+ if (peer.equals(targetPeer)) {
+ // if the targetPeer is the same as current peer, skip it because removing itself is illegal
+ continue;
+ }
+ if (peer.equals(thisNode)) {
+ removeSyncLogChannel(targetPeer);
+ } else {
+ // use RPC to tell other peers to build sync log channel to target peer
+ try (SyncMultiLeaderServiceClient client =
+ syncClientManager.borrowClient(peer.getEndpoint())) {
+ TRemoveSyncLogChannelRes res =
+ client.removeSyncLogChannel(
+ new TRemoveSyncLogChannelReq(
+ targetPeer.getGroupId().convertToTConsensusGroupId(),
+ targetPeer.getEndpoint()));
+ if (!isSuccess(res.status)) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("remove sync log channel failed from %s to %s", peer, targetPeer));
+ }
+ } catch (IOException | TException e) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("error when removing sync log channel to %s", peer), e);
+ }
+ }
+ }
+ }
+
+ private boolean isSuccess(TSStatus status) {
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ }
+
+ /** build SyncLog channel with safeIndex as the default initial sync index */
+ public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
+ buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex());
+ }
+
+ public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
+ throws ConsensusGroupAddPeerException {
+ // step 1, build sync channel in LogDispatcher
+ logger.info(
+ "[MultiLeaderConsensus] build sync log channel to {} with initialSyncIndex {}",
+ targetPeer,
+ initialSyncIndex);
+ logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
+ // step 2, update configuration
+ configuration.add(targetPeer);
+ logger.info("[MultiLeaderConsensus] persist new configuration: {}", configuration);
+ persistConfigurationUpdate();
+ }
+
+ public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
+ try {
+ logDispatcher.removeLogDispatcherThread(targetPeer);
+ logger.info("[MultiLeaderConsensus] log dispatcher to {} removed and cleanup", targetPeer);
+ configuration.remove(targetPeer);
+ persistConfigurationUpdate();
+ logger.info("[MultiLeaderConsensus] configuration updated to {}", this.configuration);
+ } catch (IOException e) {
+ throw new ConsensusGroupAddPeerException("error when remove LogDispatcherThread", e);
+ }
}
public void persistConfiguration() {
try (PublicBAOS publicBAOS = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
- outputStream.writeInt(configuration.size());
- for (Peer peer : configuration) {
- peer.serialize(outputStream);
- }
+ serializeConfigurationTo(outputStream);
Files.write(
Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()),
publicBAOS.getBuf());
} catch (IOException e) {
+ // TODO: (xingtanzjr) need to handle the IOException because the MultiLeaderConsensus won't
+ // work expectedly
+ // if the exception occurs
logger.error("Unexpected error occurs when persisting configuration", e);
}
}
+ public void persistConfigurationUpdate() throws ConsensusGroupAddPeerException {
+ try (PublicBAOS publicBAOS = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
+ serializeConfigurationTo(outputStream);
+ Path tmpConfigurationPath =
+ Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
+ Path configurationPath =
+ Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
+ Files.write(tmpConfigurationPath, publicBAOS.getBuf());
+ Files.delete(configurationPath);
+ Files.move(tmpConfigurationPath, configurationPath);
+ } catch (IOException e) {
+ throw new ConsensusGroupAddPeerException(
+ "Unexpected error occurs when update configuration", e);
+ }
+ }
+
+ private void serializeConfigurationTo(DataOutputStream outputStream) throws IOException {
+ outputStream.writeInt(configuration.size());
+ for (Peer peer : configuration) {
+ peer.serialize(outputStream);
+ }
+ }
+
public void recoverConfiguration() {
ByteBuffer buffer;
try {
- buffer =
- ByteBuffer.wrap(
- Files.readAllBytes(
- Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath())));
+ Path tmpConfigurationPath =
+ Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
+ Path configurationPath =
+ Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
+ // If the tmpConfigurationPath exists, it means the `persistConfigurationUpdate` is
+ // interrupted
+ // unexpectedly, we need substitute configuration with tmpConfiguration file
+ if (Files.exists(tmpConfigurationPath)) {
+ if (Files.exists(configurationPath)) {
+ Files.delete(configurationPath);
+ }
+ Files.move(tmpConfigurationPath, configurationPath);
+ }
+ buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
int size = buffer.getInt();
for (int i = 0; i < size; i++) {
configuration.add(Peer.deserialize(buffer));
@@ -278,4 +572,13 @@
public boolean isReadOnly() {
return stateMachine.isReadOnly();
}
+
+ public boolean isActive() {
+ return active;
+ }
+
+ public void setActive(boolean active) {
+ logger.info("set {} active status to {}", this.thisNode, active);
+ this.active = active;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index b4cefed..958ccd1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -45,9 +45,7 @@
@Override
public void onComplete(TSyncLogRes response) {
- if (response.getStatus().size() == 1
- && response.getStatus().get(0).getCode()
- == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
+ if (response.getStatus().size() == 1 && needRetry(response.getStatus().get(0).getCode())) {
logger.warn(
"Can not send {} to peer {} for {} times because {}",
batch,
@@ -62,6 +60,12 @@
}
}
+ private boolean needRetry(int statusCode) {
+ return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
+ || statusCode == TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode()
+ || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
+ }
+
@Override
public void onError(Exception exception) {
logger.warn(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
index 44d1002..8e2d23a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
@@ -33,6 +33,30 @@
private MultiLeaderConsensusClientPool() {}
+ public static class SyncMultiLeaderServiceClientPoolFactory
+ implements IClientPoolFactory<TEndPoint, SyncMultiLeaderServiceClient> {
+ private final MultiLeaderConfig config;
+
+ public SyncMultiLeaderServiceClientPoolFactory(MultiLeaderConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public KeyedObjectPool<TEndPoint, SyncMultiLeaderServiceClient> createClientPool(
+ ClientManager<TEndPoint, SyncMultiLeaderServiceClient> manager) {
+ return new GenericKeyedObjectPool<>(
+ new SyncMultiLeaderServiceClient.Factory(
+ manager,
+ new ClientFactoryProperty.Builder()
+ .setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
+ .setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
+ .setSelectorNumOfAsyncClientManager(
+ config.getRpc().getSelectorNumOfClientManager())
+ .build()),
+ new ClientPoolProperty.Builder<SyncMultiLeaderServiceClient>().build().getConfig());
+ }
+ }
+
public static class AsyncMultiLeaderServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncMultiLeaderServiceClient> {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/SyncMultiLeaderServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/SyncMultiLeaderServiceClient.java
new file mode 100644
index 0000000..3cbb3ae
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/SyncMultiLeaderServiceClient.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.client;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientFactoryProperty;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.sync.SyncThriftClient;
+import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.lang.reflect.Constructor;
+import java.net.SocketException;
+
+public class SyncMultiLeaderServiceClient extends MultiLeaderConsensusIService.Client
+ implements SyncThriftClient, AutoCloseable {
+
+ private final TEndPoint endPoint;
+ private final ClientManager<TEndPoint, SyncMultiLeaderServiceClient> clientManager;
+
+ public SyncMultiLeaderServiceClient(
+ TProtocolFactory protocolFactory,
+ int connectionTimeout,
+ TEndPoint endPoint,
+ ClientManager<TEndPoint, SyncMultiLeaderServiceClient> clientManager)
+ throws TTransportException {
+ super(
+ protocolFactory.getProtocol(
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ endPoint.getIp(),
+ endPoint.getPort(),
+ connectionTimeout))));
+ this.endPoint = endPoint;
+ this.clientManager = clientManager;
+ getInputProtocol().getTransport().open();
+ }
+
+ @TestOnly
+ public TEndPoint getTEndpoint() {
+ return endPoint;
+ }
+
+ @TestOnly
+ public ClientManager<TEndPoint, SyncMultiLeaderServiceClient> getClientManager() {
+ return clientManager;
+ }
+
+ public void close() {
+ if (clientManager != null) {
+ clientManager.returnClient(endPoint, this);
+ }
+ }
+
+ public void setTimeout(int timeout) {
+ // the same transport is used in both input and output
+ ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+ }
+
+ public void invalidate() {
+ getInputProtocol().getTransport().close();
+ }
+
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endPoint);
+ }
+
+ public int getTimeout() throws SocketException {
+ return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("SyncMultiLeaderServiceClient{%s}", endPoint);
+ }
+
+ public static class Factory extends BaseClientFactory<TEndPoint, SyncMultiLeaderServiceClient> {
+
+ public Factory(
+ ClientManager<TEndPoint, SyncMultiLeaderServiceClient> clientManager,
+ ClientFactoryProperty clientFactoryProperty) {
+ super(clientManager, clientFactoryProperty);
+ }
+
+ @Override
+ public void destroyObject(
+ TEndPoint endpoint, PooledObject<SyncMultiLeaderServiceClient> pooledObject) {
+ pooledObject.getObject().invalidate();
+ }
+
+ @Override
+ public PooledObject<SyncMultiLeaderServiceClient> makeObject(TEndPoint endpoint)
+ throws Exception {
+ Constructor<SyncMultiLeaderServiceClient> constructor =
+ SyncMultiLeaderServiceClient.class.getConstructor(
+ TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
+ return new DefaultPooledObject<>(
+ SyncThriftClientWithErrorHandler.newErrorHandler(
+ SyncMultiLeaderServiceClient.class,
+ constructor,
+ clientFactoryProperty.getProtocolFactory(),
+ clientFactoryProperty.getConnectionTimeoutMs(),
+ endpoint,
+ clientManager));
+ }
+
+ @Override
+ public boolean validateObject(
+ TEndPoint endpoint, PooledObject<SyncMultiLeaderServiceClient> pooledObject) {
+ return pooledObject.getObject() != null
+ && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+ }
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
index 027bf6a..f43f0a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
@@ -45,13 +45,15 @@
private final String storageDir;
private final String prefix;
+ private final long initialIndex;
private final long checkpointGap;
- public IndexController(String storageDir, String prefix, long checkpointGap) {
+ public IndexController(String storageDir, String prefix, long initialIndex, long checkpointGap) {
this.storageDir = storageDir;
this.prefix = prefix + '-';
this.checkpointGap = checkpointGap;
+ this.initialIndex = initialIndex;
restore();
}
@@ -100,11 +102,17 @@
try {
if (oldFile.exists()) {
FileUtils.moveFile(oldFile, newFile);
+ logger.info(
+ "version file updated, previous: {}, current: {}",
+ oldFile.getAbsolutePath(),
+ newFile.getAbsolutePath());
+ } else {
+ // In the normal state, this branch should not be triggered.
+ logger.error(
+ "failed to flush sync index. cannot find previous version file. previous: {}",
+ lastFlushedIndex);
}
- logger.info(
- "Version file updated, previous: {}, current: {}",
- oldFile.getAbsolutePath(),
- newFile.getAbsolutePath());
+
lastFlushedIndex = flushIndex;
} catch (IOException e) {
logger.error("Error occurred when flushing next version", e);
@@ -138,12 +146,26 @@
}
currentIndex = lastFlushedIndex;
} else {
- versionFile = new File(directory, prefix + "0");
+ currentIndex = initialIndex;
+ versionFile = new File(directory, prefix + initialIndex);
try {
Files.createFile(versionFile.toPath());
+ lastFlushedIndex = initialIndex;
} catch (IOException e) {
+ // TODO: (xingtanzjr) we need to handle the situation that file creation failed.
+ // Or the dispatcher won't run correctly
logger.error("Error occurred when creating new file {}", versionFile.getAbsolutePath(), e);
}
}
}
+
+ public void cleanupVersionFiles() throws IOException {
+ File directory = new File(storageDir);
+ File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix));
+ if (versionFiles != null && versionFiles.length > 0) {
+ for (File versionFile : versionFiles) {
+ Files.delete(versionFile.toPath());
+ }
+ }
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 0574d12..ccf11bf 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -55,15 +55,16 @@
/** Manage all asynchronous replication threads and corresponding async clients */
public class LogDispatcher {
-
private final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
-
+ private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
private final MultiLeaderServerImpl impl;
private final List<LogDispatcherThread> threads;
private final String selfPeerId;
private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
private ExecutorService executorService;
+ private boolean stopped = false;
+
public LogDispatcher(
MultiLeaderServerImpl impl,
IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager) {
@@ -73,22 +74,26 @@
this.threads =
impl.getConfiguration().stream()
.filter(x -> !Objects.equals(x, impl.getThisNode()))
- .map(x -> new LogDispatcherThread(x, impl.getConfig()))
+ .map(x -> new LogDispatcherThread(x, impl.getConfig(), DEFAULT_INITIAL_SYNC_INDEX))
.collect(Collectors.toList());
if (!threads.isEmpty()) {
+ // We use cached thread pool here because each LogDispatcherThread will occupy one thread.
+ // And every LogDispatcherThread won't release its thread in this pool because it won't stop
+ // unless LogDispatcher stop.
+ // Thus, the size of this threadPool will be the same as the count of LogDispatcherThread.
this.executorService =
- IoTDBThreadPoolFactory.newFixedThreadPool(
- threads.size(), "LogDispatcher-" + impl.getThisNode().getGroupId());
+ IoTDBThreadPoolFactory.newCachedThreadPool(
+ "LogDispatcher-" + impl.getThisNode().getGroupId());
}
}
- public void start() {
+ public synchronized void start() {
if (!threads.isEmpty()) {
threads.forEach(executorService::submit);
}
}
- public void stop() {
+ public synchronized void stop() {
if (!threads.isEmpty()) {
threads.forEach(LogDispatcherThread::stop);
executorService.shutdownNow();
@@ -102,31 +107,65 @@
logger.error("Unexpected Interruption when closing LogDispatcher service ");
}
}
+ stopped = true;
}
- public OptionalLong getMinSyncIndex() {
+ public synchronized void addLogDispatcherThread(Peer peer, long initialSyncIndex) {
+ if (stopped) {
+ return;
+ }
+ //
+ LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex);
+ threads.add(thread);
+ executorService.submit(thread);
+ }
+
+ public synchronized void removeLogDispatcherThread(Peer peer) throws IOException {
+ if (stopped) {
+ return;
+ }
+ int threadIndex = -1;
+ for (int i = 0; i < threads.size(); i++) {
+ if (threads.get(i).peer.equals(peer)) {
+ threadIndex = i;
+ break;
+ }
+ }
+ if (threadIndex == -1) {
+ return;
+ }
+ threads.get(threadIndex).stop();
+ threads.get(threadIndex).cleanup();
+ threads.remove(threadIndex);
+ }
+
+ public synchronized OptionalLong getMinSyncIndex() {
return threads.stream().mapToLong(LogDispatcherThread::getCurrentSyncIndex).min();
}
public void offer(IndexedConsensusRequest request) {
List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
- threads.forEach(
- thread -> {
- logger.debug(
- "{}->{}: Push a log to the queue, where the queue length is {}",
- impl.getThisNode().getGroupId(),
- thread.getPeer().getEndpoint().getIp(),
- thread.getPendingRequest().size());
- if (!thread
- .getPendingRequest()
- .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
+ // we put the serialization step outside the synchronized block because it is stateless and
+ // time-consuming
+ synchronized (this) {
+ threads.forEach(
+ thread -> {
logger.debug(
- "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+ "{}->{}: Push a log to the queue, where the queue length is {}",
impl.getThisNode().getGroupId(),
- thread.getPeer(),
- request.getSearchIndex());
- }
- });
+ thread.getPeer().getEndpoint().getIp(),
+ thread.getPendingRequest().size());
+ if (!thread
+ .getPendingRequest()
+ .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
+ logger.debug(
+ "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+ impl.getThisNode().getGroupId(),
+ thread.getPeer(),
+ request.getSearchIndex());
+ }
+ });
+ }
}
public class LogDispatcherThread implements Runnable {
@@ -148,7 +187,7 @@
private ConsensusReqReader.ReqIterator walEntryiterator;
- public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
+ public LogDispatcherThread(Peer peer, MultiLeaderConfig config, long initialSyncIndex) {
this.peer = peer;
this.config = config;
this.pendingRequest =
@@ -157,6 +196,7 @@
new IndexController(
impl.getStorageDir(),
Utils.fromTEndPointToString(peer.getEndpoint()),
+ initialSyncIndex,
config.getReplication().getCheckpointGap());
this.syncStatus = new SyncStatus(controller, config);
this.walEntryiterator = reader.getReqIterator(START_INDEX);
@@ -186,6 +226,10 @@
stopped = true;
}
+ public void cleanup() throws IOException {
+ this.controller.cleanupVersionFiles();
+ }
+
public boolean isStopped() {
return stopped;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 86c3b0f..2544451 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -21,19 +21,33 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
+import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
+import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelReq;
+import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelRes;
+import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
+import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentRes;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +75,7 @@
if (impl == null) {
String message =
String.format(
- "Unexpected consensusGroupId %s for TSyncLogReq which size is %s",
+ "unexpected consensusGroupId %s for TSyncLogReq which size is %s",
groupId, req.getBatches().size());
logger.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -70,10 +84,17 @@
return;
}
if (impl.isReadOnly()) {
- String message = "Fail to sync log because system is read-only.";
+ String message = "fail to sync log because system is read-only.";
logger.error(message);
- resultHandler.onError(
- new IoTDBException(message, TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode()));
+ TSStatus status = new TSStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode());
+ status.setMessage(message);
+ resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
+ return;
+ }
+ if (!impl.isActive()) {
+ TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
+ status.setMessage("peer is inactive and not ready to receive sync log request");
+ resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
return;
}
BatchIndexedConsensusRequest requestsInThisBatch =
@@ -106,12 +127,155 @@
}
TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
logger.debug(
- "Execute TSyncLogReq for {} with result {}", req.consensusGroupId, writeStatus.subStatus);
+ "execute TSyncLogReq for {} with result {}", req.consensusGroupId, writeStatus.subStatus);
resultHandler.onComplete(new TSyncLogRes(writeStatus.subStatus));
} catch (Exception e) {
resultHandler.onError(e);
}
}
+ @Override
+ public void inactivatePeer(
+ TInactivatePeerReq req, AsyncMethodCallback<TInactivatePeerRes> resultHandler)
+ throws TException {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format("unexpected consensusGroupId %s for inactivatePeer request", groupId);
+ logger.error(message);
+ TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ resultHandler.onComplete(new TInactivatePeerRes(status));
+ return;
+ }
+ impl.setActive(false);
+ resultHandler.onComplete(
+ new TInactivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ }
+
+ @Override
+ public void activatePeer(
+ TActivatePeerReq req, AsyncMethodCallback<TActivatePeerRes> resultHandler) throws TException {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format("unexpected consensusGroupId %s for inactivatePeer request", groupId);
+ logger.error(message);
+ TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ resultHandler.onComplete(new TActivatePeerRes(status));
+ return;
+ }
+ impl.setActive(true);
+ resultHandler.onComplete(
+ new TActivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ }
+
+ @Override
+ public void buildSyncLogChannel(
+ TBuildSyncLogChannelReq req, AsyncMethodCallback<TBuildSyncLogChannelRes> resultHandler)
+ throws TException {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
+ logger.error(message);
+ TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ resultHandler.onComplete(new TBuildSyncLogChannelRes(status));
+ return;
+ }
+ TSStatus responseStatus;
+ try {
+ impl.buildSyncLogChannel(new Peer(groupId, req.endPoint));
+ responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (ConsensusGroupAddPeerException e) {
+ responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ responseStatus.setMessage(e.getMessage());
+ }
+ resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus));
+ }
+
+ @Override
+ public void removeSyncLogChannel(
+ TRemoveSyncLogChannelReq req, AsyncMethodCallback<TRemoveSyncLogChannelRes> resultHandler)
+ throws TException {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
+ logger.error(message);
+ TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ resultHandler.onComplete(new TRemoveSyncLogChannelRes(status));
+ return;
+ }
+ TSStatus responseStatus;
+ try {
+ impl.removeSyncLogChannel(new Peer(groupId, req.endPoint));
+ responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (ConsensusGroupAddPeerException e) {
+ responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ responseStatus.setMessage(e.getMessage());
+ }
+ resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus));
+ }
+
+ @Override
+ public void sendSnapshotFragment(
+ TSendSnapshotFragmentReq req, AsyncMethodCallback<TSendSnapshotFragmentRes> resultHandler)
+ throws TException {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
+ logger.error(message);
+ TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ resultHandler.onComplete(new TSendSnapshotFragmentRes(status));
+ return;
+ }
+ TSStatus responseStatus;
+ try {
+ impl.receiveSnapshotFragment(req.snapshotId, req.filePath, req.fileChunk);
+ responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (ConsensusGroupAddPeerException e) {
+ responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ responseStatus.setMessage(e.getMessage());
+ }
+ resultHandler.onComplete(new TSendSnapshotFragmentRes(responseStatus));
+ }
+
+ @Override
+ public void triggerSnapshotLoad(
+ TTriggerSnapshotLoadReq req, AsyncMethodCallback<TTriggerSnapshotLoadRes> resultHandler)
+ throws TException {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
+ logger.error(message);
+ TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ resultHandler.onComplete(new TTriggerSnapshotLoadRes(status));
+ return;
+ }
+ impl.loadSnapshot(req.snapshotId);
+ resultHandler.onComplete(
+ new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ }
+
public void handleClientExit() {}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragment.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragment.java
new file mode 100644
index 0000000..ea04296
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragment.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.snapshot;
+
+import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
+
+import java.nio.ByteBuffer;
+
+public class SnapshotFragment {
+ private final String snapshotId;
+ private final String filePath;
+ private final long totalSize;
+ private final long startOffset;
+ private final long fragmentSize;
+ private final ByteBuffer fileChunk;
+
+ public SnapshotFragment(
+ String snapshotId,
+ String filePath,
+ long totalSize,
+ long startOffset,
+ long fragmentSize,
+ ByteBuffer fileChunk) {
+ this.snapshotId = snapshotId;
+ this.filePath = filePath;
+ this.totalSize = totalSize;
+ this.startOffset = startOffset;
+ this.fragmentSize = fragmentSize;
+ this.fileChunk = fileChunk;
+ }
+
+ public TSendSnapshotFragmentReq toTSendSnapshotFragmentReq() {
+ TSendSnapshotFragmentReq req = new TSendSnapshotFragmentReq();
+ req.setSnapshotId(snapshotId);
+ req.setFilePath(filePath);
+ req.setChunkLength(fragmentSize);
+ req.setFileChunk(fileChunk);
+ return req;
+ }
+
+ public String getSnapshotId() {
+ return snapshotId;
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public long getTotalSize() {
+ return totalSize;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public long getFragmentSize() {
+ return fragmentSize;
+ }
+
+ public ByteBuffer getFileChunk() {
+ return fileChunk;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragmentReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragmentReader.java
new file mode 100644
index 0000000..1509995
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragmentReader.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class SnapshotFragmentReader {
+
+ private static final int DEFAULT_FILE_FRAGMENT_SIZE = 10 * 1024 * 1024;
+ private final String snapshotId;
+ private final String filePath;
+ private final SeekableByteChannel fileChannel;
+ private final long fileSize;
+ private final ByteBuffer buf;
+ private long totalReadSize;
+ private SnapshotFragment cachedSnapshotFragment;
+
+ public SnapshotFragmentReader(String snapshotId, Path path) throws IOException {
+ this.snapshotId = snapshotId;
+ this.filePath = path.toAbsolutePath().toString();
+ this.fileSize = Files.size(path);
+ this.fileChannel = Files.newByteChannel(path);
+ this.buf = ByteBuffer.allocate(DEFAULT_FILE_FRAGMENT_SIZE);
+ }
+
+ public boolean hasNext() throws IOException {
+ buf.clear();
+ int actualReadSize = fileChannel.read(buf);
+ buf.flip();
+ if (actualReadSize > 0) {
+ cachedSnapshotFragment =
+ new SnapshotFragment(snapshotId, filePath, fileSize, totalReadSize, actualReadSize, buf);
+ totalReadSize += actualReadSize;
+ return true;
+ }
+ return false;
+ }
+
+ public SnapshotFragment next() {
+ return cachedSnapshotFragment;
+ }
+
+ public void close() throws IOException {
+ if (fileChannel != null) {
+ fileChannel.close();
+ }
+ }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
index 2675470..761ace4 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
@@ -49,7 +49,7 @@
@Test
public void testIncrementIntervalAfterRestart() {
IndexController controller =
- new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+ new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -58,7 +58,7 @@
Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -66,7 +66,7 @@
Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
@@ -74,7 +74,7 @@
Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index f7c9072..a888904 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -56,7 +56,7 @@
@Test
public void sequenceTest() throws InterruptedException {
IndexController controller =
- new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+ new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
SyncStatus status = new SyncStatus(controller, config);
@@ -82,7 +82,7 @@
@Test
public void reverseTest() throws InterruptedException {
IndexController controller =
- new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+ new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -115,7 +115,7 @@
@Test
public void mixedTest() throws InterruptedException {
IndexController controller =
- new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+ new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -160,7 +160,7 @@
@Test
public void waitTest() throws InterruptedException, ExecutionException {
IndexController controller =
- new IndexController(storageDir.getAbsolutePath(), prefix, CHECK_POINT_GAP);
+ new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
SyncStatus status = new SyncStatus(controller, config);
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 75b18d5..0d90993 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -31,6 +31,9 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.cache.BloomFilterCache;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.snapshot.SnapshotLoader;
import org.apache.iotdb.db.engine.snapshot.SnapshotTaker;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
@@ -124,6 +127,9 @@
try {
StorageEngineV2.getInstance()
.setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region);
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
+ BloomFilterCache.getInstance().clear();
} catch (Exception e) {
logger.error("Exception occurs when replacing data region in storage engine.", e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
index e3fdf37..97771dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
@@ -122,6 +122,12 @@
return null;
}
LOGGER.info("Moving snapshot file to data dirs");
+ try {
+ deleteAllFilesInDataDirs();
+ LOGGER.info("Remove all data files in original data dir");
+ } catch (IOException e) {
+ return null;
+ }
createLinksFromSnapshotDirToDataDirWithoutLog(new File(snapshotPath));
return loadSnapshot();
} catch (IOException | DiskSpaceInsufficientException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
index 67b3df9..d90eb08 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
@@ -149,6 +149,9 @@
continue;
}
File tsFile = resource.getTsFile();
+ if (!resource.isClosed()) {
+ continue;
+ }
File snapshotTsFile = getSnapshotFilePathForTsFile(tsFile, snapshotId);
// create hard link for tsfile, resource, mods
createHardLink(snapshotTsFile, tsFile);
@@ -169,7 +172,14 @@
}
private void createHardLink(File target, File source) throws IOException {
- Files.createLink(target.toPath(), source.toPath());
+ if (!target.getParentFile().exists()) {
+ LOGGER.error("Hard link target dir {} doesn't exist", target.getParentFile());
+ }
+ if (!source.exists()) {
+ LOGGER.error("Hard link source file {} doesn't exist", source);
+ }
+
+ Files.createLink(target.getAbsoluteFile().toPath(), source.getAbsoluteFile().toPath());
snapshotLogger.logFile(source.getAbsolutePath(), target.getAbsolutePath());
}
@@ -216,6 +226,7 @@
}
private void cleanUpWhenFail(String snapshotId) {
+ LOGGER.info("Cleaning up snapshot dir for {}", snapshotId);
for (String dataDir : IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
File dataDirForThisSnapshot =
new File(dataDir + File.separator + "snapshot" + File.separator + snapshotId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
new file mode 100644
index 0000000..4a5b416
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool;
+import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
+import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
+import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestRPCClient {
+ private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ INTERNAL_SERVICE_CLIENT_MANAGER =
+ new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+
+ private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
+
+ public TestRPCClient() {
+ syncClientManager =
+ new IClientManager.Factory<TEndPoint, SyncMultiLeaderServiceClient>()
+ .createClientManager(
+ new MultiLeaderConsensusClientPool.SyncMultiLeaderServiceClientPoolFactory(
+ new MultiLeaderConfig.Builder().build()));
+ }
+
+ public static void main(String args[]) {
+ TestRPCClient client = new TestRPCClient();
+ // client.removeRegionPeer();
+ client.addPeer();
+ // client.loadSnapshot();
+ }
+
+ private void loadSnapshot() {
+ try (SyncMultiLeaderServiceClient client =
+ syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40011))) {
+ TTriggerSnapshotLoadRes res =
+ client.triggerSnapshotLoad(
+ new TTriggerSnapshotLoadReq(
+ new DataRegionId(1).convertToTConsensusGroupId(), "snapshot_1_1662370255552"));
+ System.out.println(res.status);
+ } catch (IOException | TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void testAddPeer() {
+ try (SyncMultiLeaderServiceClient client =
+ syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40012))) {
+ TInactivatePeerRes res =
+ client.inactivatePeer(
+ new TInactivatePeerReq(new DataRegionId(1).convertToTConsensusGroupId()));
+ System.out.println(res.status);
+ } catch (IOException | TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void removeRegionPeer() {
+ try (SyncDataNodeInternalServiceClient client =
+ INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
+ client.removeRegionPeer(
+ new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
+ } catch (IOException | TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void addPeer() {
+ try (SyncDataNodeInternalServiceClient client =
+ INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
+ client.addRegionPeer(
+ new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
+ } catch (IOException | TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private TDataNodeLocation getLocation3(int dataNodeId) {
+ return new TDataNodeLocation(
+ dataNodeId,
+ new TEndPoint("127.0.0.1", 6669),
+ new TEndPoint("127.0.0.1", 9005),
+ new TEndPoint("127.0.0.1", 8779),
+ new TEndPoint("127.0.0.1", 40012),
+ new TEndPoint("127.0.0.1", 50012));
+ }
+
+ private TDataNodeLocation getLocation2(int dataNodeId) {
+ return new TDataNodeLocation(
+ dataNodeId,
+ new TEndPoint("127.0.0.1", 6668),
+ new TEndPoint("127.0.0.1", 9004),
+ new TEndPoint("127.0.0.1", 8778),
+ new TEndPoint("127.0.0.1", 40011),
+ new TEndPoint("127.0.0.1", 50011));
+ }
+
+ private void createDataRegion() {
+ try (SyncDataNodeInternalServiceClient client =
+ INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9005))) {
+ TCreateDataRegionReq req = new TCreateDataRegionReq();
+ req.setStorageGroup("root.test.g_0");
+ TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+ regionReplicaSet.setRegionId(new DataRegionId(1).convertToTConsensusGroupId());
+ List<TDataNodeLocation> locationList = new ArrayList<>();
+ locationList.add(
+ new TDataNodeLocation(
+ 3,
+ new TEndPoint("127.0.0.1", 6667),
+ new TEndPoint("127.0.0.1", 9003),
+ new TEndPoint("127.0.0.1", 8777),
+ new TEndPoint("127.0.0.1", 40010),
+ new TEndPoint("127.0.0.1", 50010)));
+ locationList.add(
+ new TDataNodeLocation(
+ 4,
+ new TEndPoint("127.0.0.1", 6668),
+ new TEndPoint("127.0.0.1", 9004),
+ new TEndPoint("127.0.0.1", 8778),
+ new TEndPoint("127.0.0.1", 40011),
+ new TEndPoint("127.0.0.1", 50011)));
+ locationList.add(
+ new TDataNodeLocation(
+ 4,
+ new TEndPoint("127.0.0.1", 6669),
+ new TEndPoint("127.0.0.1", 9005),
+ new TEndPoint("127.0.0.1", 8779),
+ new TEndPoint("127.0.0.1", 40012),
+ new TEndPoint("127.0.0.1", 50012)));
+ regionReplicaSet.setDataNodeLocations(locationList);
+ req.setRegionReplicaSet(regionReplicaSet);
+ TSStatus res = client.createDataRegion(req);
+ System.out.println(res.code + " " + res.message);
+
+ } catch (IOException | TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index e532877..f83730b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -169,7 +169,7 @@
private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) {
for (TEndPoint endPoint : queryContext.getEndPointBlackList()) {
- if (endPoint.getIp().equals(dataNodeLocation.internalEndPoint.getIp())) {
+ if (endPoint.equals(dataNodeLocation.internalEndPoint)) {
return false;
}
}
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 7d315d2..380f30a 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -33,10 +33,71 @@
3: required list<TLogBatch> batches
}
+struct TInactivatePeerReq {
+ 1: required common.TConsensusGroupId consensusGroupId
+}
+
+struct TInactivatePeerRes {
+ 1: required common.TSStatus status
+}
+
+struct TActivatePeerReq {
+ 1: required common.TConsensusGroupId consensusGroupId
+}
+
+struct TActivatePeerRes {
+ 1: required common.TSStatus status
+}
+
struct TSyncLogRes {
1: required list<common.TSStatus> status
}
+struct TBuildSyncLogChannelReq {
+ 1: required common.TConsensusGroupId consensusGroupId
+ 2: required common.TEndPoint endPoint
+}
+
+struct TBuildSyncLogChannelRes {
+ 1: required common.TSStatus status
+}
+
+struct TRemoveSyncLogChannelReq {
+ 1: required common.TConsensusGroupId consensusGroupId
+ 2: required common.TEndPoint endPoint
+}
+
+struct TRemoveSyncLogChannelRes {
+ 1: required common.TSStatus status
+}
+
+struct TSendSnapshotFragmentReq {
+ 1: required common.TConsensusGroupId consensusGroupId
+ 2: required string snapshotId
+ 3: required string filePath
+ 4: required i64 chunkLength
+ 5: required binary fileChunk
+}
+
+struct TSendSnapshotFragmentRes {
+ 1: required common.TSStatus status
+}
+
+struct TTriggerSnapshotLoadReq {
+ 1: required common.TConsensusGroupId consensusGroupId
+ 2: required string snapshotId
+}
+
+struct TTriggerSnapshotLoadRes {
+ 1: required common.TSStatus status
+}
+
service MultiLeaderConsensusIService {
TSyncLogRes syncLog(TSyncLogReq req)
+ TInactivatePeerRes inactivatePeer(TInactivatePeerReq req)
+ TActivatePeerRes activatePeer(TActivatePeerReq req)
+ TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
+ TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req)
+ TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req)
+ TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
}
\ No newline at end of file