fix some log and potential memory leak. (#7414)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 5803b9e..c9c71aa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -228,22 +228,24 @@
public void transitSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException {
File snapshotDir = new File(storageDir, latestSnapshotId);
List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
- System.out.println(snapshotPaths);
+ logger.info("transit snapshots: {}", snapshotPaths);
try (SyncMultiLeaderServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
for (Path path : snapshotPaths) {
SnapshotFragmentReader reader = new SnapshotFragmentReader(latestSnapshotId, path);
- while (reader.hasNext()) {
- TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
- req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
- // receiveSnapshotFragment(latestSnapshotId, req.filePath, req.fileChunk);
- TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
- if (!isSuccess(res.getStatus())) {
- throw new ConsensusGroupAddPeerException(
- String.format("error when sending snapshot fragment to %s", targetPeer));
+ try {
+ while (reader.hasNext()) {
+ TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
+ req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
+ TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
+ if (!isSuccess(res.getStatus())) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("error when sending snapshot fragment to %s", targetPeer));
+ }
}
+ } finally {
+ reader.close();
}
- reader.close();
}
} catch (IOException | TException e) {
throw new ConsensusGroupAddPeerException(
@@ -427,15 +429,19 @@
logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
// step 2, update configuration
configuration.add(targetPeer);
+ // step 3, persist configuration
logger.info("[MultiLeaderConsensus] persist new configuration: {}", configuration);
persistConfigurationUpdate();
}
public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
try {
+ // step 1, remove sync channel in LogDispatcher
logDispatcher.removeLogDispatcherThread(targetPeer);
logger.info("[MultiLeaderConsensus] log dispatcher to {} removed and cleanup", targetPeer);
+ // step 2, update configuration
configuration.remove(targetPeer);
+ // step 3, persist configuration
persistConfigurationUpdate();
logger.info("[MultiLeaderConsensus] configuration updated to {}", this.configuration);
} catch (IOException e) {