[IOTDB-4488] Implement ClusterSyncInfo snapshot by SyncLogWritter and SyncLogReader (#7452)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
index 5838fb2..c42dd0d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
@@ -101,17 +101,20 @@
     return resp;
   }
 
+  // endregion
+
+  // ======================================================
+  // region Implement of Snapshot
+  // ======================================================
+
   @Override
   public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
-    // TODO: merge snapshot logic into SyncLogWritter and SyncLogReader
-    // TODO: add ClusterSyncInfoTest later
-    return true;
+    return syncMetadata.processTakeSnapshot(snapshotDir);
   }
 
   @Override
   public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
-    // TODO: merge snapshot logic into SyncLogWritter and SyncLogReader
-    // TODO: add ClusterSyncInfoTest later
+    syncMetadata.processLoadSnapshot(snapshotDir);
   }
 
   // endregion
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java
new file mode 100644
index 0000000..dd9abc5
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class ClusterSyncInfoTest {
+
+  private ClusterSyncInfo clusterSyncInfo;
+  private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
+
+  @Before
+  public void setup() throws IOException {
+    clusterSyncInfo = new ClusterSyncInfo();
+    if (!snapshotDir.exists()) {
+      snapshotDir.mkdirs();
+    }
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+  }
+
+  private void prepareClusterSyncInfo() {
+    Map<String, String> attributes1 = new HashMap<>();
+    attributes1.put("ip", "192.168.11.11");
+    attributes1.put("port", "7766");
+    TPipeSinkInfo pipeSinkInfo1 =
+        new TPipeSinkInfo()
+            .setPipeSinkName("demo1")
+            .setPipeSinkType("IoTDB")
+            .setAttributes(attributes1);
+    Map<String, String> attributes2 = new HashMap<>();
+    attributes2.put("ip", "192.168.22.2");
+    attributes2.put("port", "7777");
+    TPipeSinkInfo pipeSinkInfo2 =
+        new TPipeSinkInfo()
+            .setPipeSinkName("demo2")
+            .setPipeSinkType("IoTDB")
+            .setAttributes(attributes2);
+
+    clusterSyncInfo.addPipeSink(new CreatePipeSinkPlan(pipeSinkInfo1));
+    clusterSyncInfo.addPipeSink(new CreatePipeSinkPlan(pipeSinkInfo2));
+  }
+
+  @Test
+  public void testSnapshot() throws Exception {
+    prepareClusterSyncInfo();
+
+    clusterSyncInfo.processTakeSnapshot(snapshotDir);
+    ClusterSyncInfo clusterSyncInfo2 = new ClusterSyncInfo();
+    clusterSyncInfo2.processLoadSnapshot(snapshotDir);
+
+    List<PipeSink> expectedPipeSink =
+        clusterSyncInfo.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
+    List<PipeSink> actualPipeSink =
+        clusterSyncInfo2.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
+    Assert.assertEquals(expectedPipeSink, actualPipeSink);
+  }
+
+  @Test
+  public void testPipeSinkOperation() {
+    prepareClusterSyncInfo();
+    try {
+      clusterSyncInfo.checkAddPipeSink("demo1");
+      clusterSyncInfo.checkDropPipeSink("demo3");
+      Assert.fail("checkOperatePipeSink ignore failure.");
+    } catch (PipeSinkException e) {
+      // nothing
+    }
+    try {
+      clusterSyncInfo.checkAddPipeSink("demo3");
+      clusterSyncInfo.checkDropPipeSink("demo1");
+    } catch (PipeSinkException e) {
+      Assert.fail("checkOperatePipeSink should not throw exception.");
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
index fee6963..5704381 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
@@ -20,18 +20,31 @@
 
 import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
+import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipe.PipeStatus;
 import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncConstant;
 
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class SyncMetadata {
+public class SyncMetadata implements SnapshotProcessor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncMetadata.class);
 
   // <PipeSinkName, PipeSink>
   private Map<String, PipeSink> pipeSinks;
@@ -207,6 +220,56 @@
     }
   }
 
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
+    File snapshotFile = new File(snapshotDir, SyncConstant.SYNC_LOG_NAME);
+    if (snapshotFile.exists() && snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to take snapshot, because snapshot file [{}] is already exist.",
+          snapshotFile.getAbsolutePath());
+      return false;
+    }
+    File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
+    try (SyncLogWriter writer = new SyncLogWriter(snapshotDir, tmpFile.getName())) {
+      for (PipeSink pipeSink : pipeSinks.values()) {
+        writer.addPipeSink(pipeSink);
+      }
+      for (Map<Long, PipeInfo> map : pipes.values()) {
+        for (PipeInfo pipeInfo : map.values()) {
+          writer.addPipe(pipeInfo);
+          switch (pipeInfo.getStatus()) {
+            case RUNNING:
+              writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.START_PIPE);
+              break;
+            case STOP:
+              writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.STOP_PIPE);
+              break;
+            case DROP:
+              writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.DROP_PIPE);
+              break;
+          }
+        }
+      }
+    }
+    return tmpFile.renameTo(snapshotFile);
+  }
+
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
+    File snapshotFile = new File(snapshotDir, SyncConstant.SYNC_LOG_NAME);
+    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to load snapshot,snapshot file [{}] is not exist.",
+          snapshotFile.getAbsolutePath());
+      return;
+    }
+    SyncLogReader reader = new SyncLogReader(snapshotDir);
+    reader.recover();
+    setPipes(reader.getAllPipeInfos());
+    setPipeSinks(reader.getAllPipeSinks());
+    setRunningPipe(reader.getRunningPipeInfo());
+  }
+
   // endregion
 
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
new file mode 100644
index 0000000..2f402ca
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.sync.persistence;
+
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncConstant;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SyncLogReader {
+  private static final Logger logger = LoggerFactory.getLogger(SyncLogReader.class);
+  // <pipeSinkName, PipeSink>
+  private final Map<String, PipeSink> pipeSinks = new ConcurrentHashMap<>();
+  private final Map<String, Map<Long, PipeInfo>> pipes = new ConcurrentHashMap<>();
+  private PipeInfo runningPipe;
+  private final File dir;
+  private final String fileName;
+
+  public SyncLogReader(File dir) {
+    this.dir = dir;
+    this.fileName = SyncConstant.SYNC_LOG_NAME;
+  }
+
+  public SyncLogReader(File dir, String fileName) {
+    this.dir = dir;
+    this.fileName = fileName;
+  }
+
+  public void recover() throws IOException {
+    logger.info("Start to recover all sync state for sync.");
+    File serviceLogFile = new File(dir, fileName);
+    if (!serviceLogFile.exists()) {
+      logger.warn("Sync service log file not found");
+    } else {
+      try (InputStream inputStream = new FileInputStream(serviceLogFile)) {
+        recoverPipe(inputStream);
+      }
+    }
+  }
+
+  public Map<String, PipeSink> getAllPipeSinks() {
+    return pipeSinks;
+  }
+
+  public Map<String, Map<Long, PipeInfo>> getAllPipeInfos() {
+    return pipes;
+  }
+
+  public PipeInfo getRunningPipeInfo() {
+    return runningPipe;
+  }
+
+  private void recoverPipe(InputStream inputStream) throws IOException {
+    byte nextByte;
+    while ((nextByte = ReadWriteIOUtils.readByte(inputStream)) != -1) {
+      SyncOperation operationType = SyncOperation.values()[nextByte];
+      switch (operationType) {
+        case CREATE_PIPESINK:
+          PipeSink pipeSink = PipeSink.deserializePipeSink(inputStream);
+          pipeSinks.put(pipeSink.getPipeSinkName(), pipeSink);
+          break;
+        case DROP_PIPESINK:
+          pipeSinks.remove(ReadWriteIOUtils.readString(inputStream));
+          break;
+        case CREATE_PIPE:
+          runningPipe = PipeInfo.deserializePipeInfo(inputStream);
+          pipes
+              .computeIfAbsent(runningPipe.getPipeName(), i -> new ConcurrentHashMap<>())
+              .computeIfAbsent(runningPipe.getCreateTime(), i -> runningPipe);
+          break;
+        case STOP_PIPE:
+          // TODO: support multiple pipe
+          ReadWriteIOUtils.readString(inputStream);
+          runningPipe.stop();
+          break;
+        case START_PIPE:
+          // TODO: support multiple pipe
+          ReadWriteIOUtils.readString(inputStream);
+          runningPipe.start();
+          break;
+        case DROP_PIPE:
+          // TODO: support multiple pipe
+          ReadWriteIOUtils.readString(inputStream);
+          runningPipe.drop();
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              String.format("Can not recognize SyncOperation %s.", operationType.name()));
+      }
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java
new file mode 100644
index 0000000..84fe42f
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.sync.persistence;
+
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncConstant;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * SyncLogger is used to manage the persistent information in the sync module. Persistent
+ * information can be recovered on reboot via {@linkplain SyncLogReader}.
+ */
+public class SyncLogWriter implements AutoCloseable {
+  // record pipe meta info
+  private OutputStream outputStream;
+  private final File dir;
+  private final String fileName;
+
+  public SyncLogWriter(File dir) {
+    this.dir = dir;
+    this.fileName = SyncConstant.SYNC_LOG_NAME;
+  }
+
+  public SyncLogWriter(File dir, String fileName) {
+    this.dir = dir;
+    this.fileName = fileName;
+  }
+
+  public void getOutputStream() throws IOException {
+    if (outputStream == null) {
+      //      File logFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
+      File logFile = new File(dir, fileName);
+      if (!logFile.getParentFile().exists()) {
+        logFile.getParentFile().mkdirs();
+      }
+      outputStream = new FileOutputStream(logFile, true);
+    }
+  }
+
+  public synchronized void addPipeSink(PipeSink pipeSink) throws IOException {
+    getOutputStream();
+    ReadWriteIOUtils.write((byte) SyncOperation.CREATE_PIPESINK.ordinal(), outputStream);
+    pipeSink.serialize(outputStream);
+  }
+
+  public synchronized void dropPipeSink(String pipeSinkName) throws IOException {
+    getOutputStream();
+    ReadWriteIOUtils.write((byte) SyncOperation.DROP_PIPESINK.ordinal(), outputStream);
+    ReadWriteIOUtils.write(pipeSinkName, outputStream);
+  }
+
+  public synchronized void addPipe(PipeInfo pipeInfo) throws IOException {
+    getOutputStream();
+    ReadWriteIOUtils.write((byte) SyncOperation.CREATE_PIPE.ordinal(), outputStream);
+    pipeInfo.serialize(outputStream);
+  }
+
+  public synchronized void operatePipe(String pipeName, SyncOperation syncOperation)
+      throws IOException {
+    getOutputStream();
+    ReadWriteIOUtils.write((byte) syncOperation.ordinal(), outputStream);
+    ReadWriteIOUtils.write(pipeName, outputStream);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (outputStream != null) {
+      outputStream.close();
+      outputStream = null;
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
index 2067145..b93da6a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
@@ -18,13 +18,23 @@
  */
 package org.apache.iotdb.commons.sync.pipe;
 
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 public abstract class PipeInfo {
+
   protected String pipeName;
   protected String pipeSinkName;
   protected PipeStatus status;
   protected long createTime;
   protected PipeMessage.PipeMessageType messageType;
 
+  // only used for serialization
+  protected PipeInfo() {}
+
   public PipeInfo(String pipeName, String pipeSinkName, long createTime) {
     this.pipeName = pipeName;
     this.pipeSinkName = pipeSinkName;
@@ -41,6 +51,8 @@
     this.messageType = PipeMessage.PipeMessageType.NORMAL;
   }
 
+  abstract PipeType getType();
+
   public String getPipeName() {
     return pipeName;
   }
@@ -92,4 +104,42 @@
   public void setCreateTime(long createTime) {
     this.createTime = createTime;
   }
+
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write((byte) getType().ordinal(), outputStream);
+    ReadWriteIOUtils.write(pipeName, outputStream);
+    ReadWriteIOUtils.write(pipeSinkName, outputStream);
+    ReadWriteIOUtils.write((byte) status.ordinal(), outputStream);
+    ReadWriteIOUtils.write(createTime, outputStream);
+    ReadWriteIOUtils.write((byte) messageType.ordinal(), outputStream);
+  }
+
+  protected void deserialize(InputStream inputStream) throws IOException {
+    pipeName = ReadWriteIOUtils.readString(inputStream);
+    pipeSinkName = ReadWriteIOUtils.readString(inputStream);
+    status = PipeStatus.values()[ReadWriteIOUtils.readByte(inputStream)];
+    createTime = ReadWriteIOUtils.readLong(inputStream);
+    messageType = PipeMessage.PipeMessageType.values()[ReadWriteIOUtils.readByte(inputStream)];
+  }
+
+  public static PipeInfo deserializePipeInfo(InputStream inputStream) throws IOException {
+    PipeType pipeType = PipeType.values()[ReadWriteIOUtils.readByte(inputStream)];
+    PipeInfo pipeInfo;
+    switch (pipeType) {
+      case TsFilePipe:
+        pipeInfo = new TsFilePipeInfo();
+        pipeInfo.deserialize(inputStream);
+        break;
+      case WALPipe:
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Can not recognize PipeType %s.", pipeType.name()));
+    }
+    return pipeInfo;
+  }
+
+  enum PipeType {
+    TsFilePipe,
+    WALPipe
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
index 618e0eb..d2bfdf7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
@@ -19,6 +19,9 @@
 package org.apache.iotdb.commons.sync.pipe;
 
 public enum SyncOperation {
+  CREATE_PIPESINK,
+  DROP_PIPESINK,
+  CREATE_PIPE,
   START_PIPE,
   STOP_PIPE,
   DROP_PIPE
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
index 81feed0..391d4c6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
@@ -18,12 +18,20 @@
  */
 package org.apache.iotdb.commons.sync.pipe;
 
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Objects;
 
 public class TsFilePipeInfo extends PipeInfo {
   private boolean syncDelOp;
   private long dataStartTimestamp;
 
+  // only used for serialization
+  protected TsFilePipeInfo() {}
+
   public TsFilePipeInfo(
       String pipeName,
       String pipeSinkName,
@@ -64,6 +72,25 @@
   }
 
   @Override
+  PipeType getType() {
+    return PipeType.TsFilePipe;
+  }
+
+  @Override
+  public void serialize(OutputStream outputStream) throws IOException {
+    super.serialize(outputStream);
+    ReadWriteIOUtils.write(syncDelOp, outputStream);
+    ReadWriteIOUtils.write(dataStartTimestamp, outputStream);
+  }
+
+  @Override
+  protected void deserialize(InputStream inputStream) throws IOException {
+    super.deserialize(inputStream);
+    syncDelOp = ReadWriteIOUtils.readBool(inputStream);
+    dataStartTimestamp = ReadWriteIOUtils.readLong(inputStream);
+  }
+
+  @Override
   public String toString() {
     return "TsFilePipeInfo{"
         + "pipeName='"
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
index 56736a4..b0cbf2a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
@@ -24,14 +24,18 @@
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 public class IoTDBPipeSink implements PipeSink {
-  private final PipeSinkType pipeSinkType;
+  private final PipeSinkType pipeSinkType = PipeSinkType.IoTDB;
 
   private String name;
   private String ip;
@@ -40,11 +44,13 @@
   private static final String ATTRIBUTE_IP_KEY = "ip";
   private static final String ATTRIBUTE_PORT_KEY = "port";
 
+  public IoTDBPipeSink() {}
+
   public IoTDBPipeSink(String name) {
-    ip = SyncConstant.DEFAULT_PIPE_SINK_IP;
-    port = SyncConstant.DEFAULT_PIPE_SINK_PORT;
+    this();
+    this.ip = SyncConstant.DEFAULT_PIPE_SINK_IP;
+    this.port = SyncConstant.DEFAULT_PIPE_SINK_PORT;
     this.name = name;
-    pipeSinkType = PipeSinkType.IoTDB;
   }
 
   @Override
@@ -123,6 +129,21 @@
   }
 
   @Override
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write((byte) pipeSinkType.ordinal(), outputStream);
+    ReadWriteIOUtils.write(name, outputStream);
+    ReadWriteIOUtils.write(ip, outputStream);
+    ReadWriteIOUtils.write(port, outputStream);
+  }
+
+  @Override
+  public void deserialize(InputStream inputStream) throws IOException {
+    name = ReadWriteIOUtils.readString(inputStream);
+    ip = ReadWriteIOUtils.readString(inputStream);
+    port = ReadWriteIOUtils.readInt(inputStream);
+  }
+
+  @Override
   public String toString() {
     return "IoTDBPipeSink{"
         + "pipeSinkType="
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
index d68a35d..588618d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
@@ -22,7 +22,11 @@
 import org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 
@@ -41,6 +45,27 @@
 
   TPipeSinkInfo getTPipeSinkInfo();
 
+  void serialize(OutputStream outputStream) throws IOException;
+
+  void deserialize(InputStream inputStream) throws IOException;
+
+  static PipeSink deserializePipeSink(InputStream inputStream) throws IOException {
+    PipeSinkType pipeSinkType = PipeSinkType.values()[ReadWriteIOUtils.readByte(inputStream)];
+    PipeSink pipeSink;
+    switch (pipeSinkType) {
+      case IoTDB:
+        pipeSink = new IoTDBPipeSink();
+        pipeSink.deserialize(inputStream);
+        break;
+      case ExternalPipe:
+        // TODO(ext-pipe): deserialize external pipesink here
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Can not recognize PipeSinkType %s.", pipeSinkType.name()));
+    }
+    return pipeSink;
+  }
+
   enum PipeSinkType {
     IoTDB,
     ExternalPipe
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
index 6f44f50..a90a67b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
@@ -18,25 +18,26 @@
  */
 package org.apache.iotdb.db.sync.common;
 
-import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.sync.metadata.SyncMetadata;
+import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
+import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
@@ -49,15 +50,15 @@
   private final SyncMetadata syncMetadata;
 
   public LocalSyncInfo() {
-    syncLogWriter = SyncLogWriter.getInstance();
+    syncLogWriter = new SyncLogWriter(new File(SyncPathUtil.getSysDir()));
     syncMetadata = new SyncMetadata();
-    SyncLogReader logReader = new SyncLogReader();
+    SyncLogReader logReader = new SyncLogReader(new File(SyncPathUtil.getSysDir()));
     try {
       logReader.recover();
       syncMetadata.setPipes(logReader.getAllPipeInfos());
       syncMetadata.setPipeSinks(logReader.getAllPipeSinks());
       syncMetadata.setRunningPipe(logReader.getRunningPipeInfo());
-    } catch (StartupException e) {
+    } catch (IOException e) {
       LOGGER.error(
           "Cannot recover ReceiverInfo because {}. Use default info values.", e.getMessage());
     }
@@ -75,7 +76,7 @@
     PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkPlan(plan);
     // should guarantee the adding pipesink is not exist.
     syncMetadata.addPipeSink(pipeSink);
-    syncLogWriter.addPipeSink(plan);
+    syncLogWriter.addPipeSink(pipeSink);
   }
 
   public void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement)
@@ -84,7 +85,7 @@
     PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkStatement(createPipeSinkStatement);
     // should guarantee the adding pipesink is not exist.
     syncMetadata.addPipeSink(pipeSink);
-    syncLogWriter.addPipeSink(createPipeSinkStatement);
+    syncLogWriter.addPipeSink(pipeSink);
   }
 
   public void dropPipeSink(String name) throws PipeSinkException, IOException {
@@ -112,7 +113,7 @@
     PipeSink pipeSink = getPipeSink(plan.getPipeSinkName());
     PipeInfo pipeInfo = SyncPipeUtil.parseCreatePipePlanAsPipeInfo(plan, pipeSink, createTime);
     syncMetadata.addPipe(pipeInfo, pipeSink);
-    syncLogWriter.addPipe(plan, createTime);
+    syncLogWriter.addPipe(pipeInfo);
   }
 
   public void addPipe(CreatePipeStatement createPipeStatement, long createTime)
@@ -125,7 +126,7 @@
     PipeInfo pipeInfo =
         SyncPipeUtil.parseCreatePipePlanAsPipeInfo(createPipeStatement, pipeSink, createTime);
     syncMetadata.addPipe(pipeInfo, pipeSink);
-    syncLogWriter.addPipe(createPipeStatement, createTime);
+    syncLogWriter.addPipe(pipeInfo);
   }
 
   public void operatePipe(String pipeName, SyncOperation syncOperation)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index 46f23e3..0a6700f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -185,4 +185,9 @@
   public void reset() {
     localSyncInfo = new LocalSyncInfo();
   }
+
+  @TestOnly
+  public void close() throws IOException {
+    localSyncInfo.close();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
deleted file mode 100644
index eb5de29..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.common.persistence;
-
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
-import org.apache.iotdb.db.mpp.plan.constant.StatementType;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class SyncLogReader {
-  private static final Logger logger = LoggerFactory.getLogger(SyncLogReader.class);
-  // <pipeSinkName, PipeSink>
-  private Map<String, PipeSink> pipeSinks = new ConcurrentHashMap<>();
-  private Map<String, Map<Long, PipeInfo>> pipes = new ConcurrentHashMap<>();
-  private PipeInfo runningPipe;
-
-  public void recover() throws StartupException {
-    logger.info("Start to recover all sync state for sync.");
-    File serviceLogFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
-    try (BufferedReader br = new BufferedReader(new FileReader(serviceLogFile))) {
-      recoverPipe(br);
-    } catch (IOException e) {
-      logger.warn("Sync service log file not found");
-    }
-  }
-
-  public Map<String, PipeSink> getAllPipeSinks() {
-    return pipeSinks;
-  }
-
-  public Map<String, Map<Long, PipeInfo>> getAllPipeInfos() {
-    return pipes;
-  }
-
-  public PipeInfo getRunningPipeInfo() {
-    return runningPipe;
-  }
-
-  private void recoverPipe(BufferedReader br) throws IOException {
-    int lineNumber =
-        0; // line index shown in sender log starts from 1, so lineNumber starts from 0.
-    String readLine = "";
-    String[] parseStrings;
-
-    try {
-      while ((readLine = br.readLine()) != null) {
-        lineNumber += 1;
-        parseStrings = readLine.split(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
-
-        StatementType type = StatementType.valueOf(parseStrings[0]);
-
-        switch (type) {
-          case CREATE_PIPESINK:
-            readLine = br.readLine();
-            lineNumber += 1;
-            CreatePipeSinkStatement createPipeSinkStatement =
-                CreatePipeSinkStatement.parseString(readLine);
-            pipeSinks.put(
-                createPipeSinkStatement.getPipeSinkName(),
-                SyncPipeUtil.parseCreatePipeSinkStatement(createPipeSinkStatement));
-            break;
-          case DROP_PIPESINK:
-            pipeSinks.remove(parseStrings[1]);
-            break;
-          case CREATE_PIPE:
-            readLine = br.readLine();
-            lineNumber += 1;
-            CreatePipeStatement createPipeStatement = CreatePipeStatement.parseString(readLine);
-            runningPipe =
-                SyncPipeUtil.parseCreatePipePlanAsPipeInfo(
-                    createPipeStatement,
-                    pipeSinks.get(createPipeStatement.getPipeSinkName()),
-                    Long.parseLong(parseStrings[1]));
-            pipes
-                .computeIfAbsent(runningPipe.getPipeName(), i -> new ConcurrentHashMap<>())
-                .computeIfAbsent(runningPipe.getCreateTime(), i -> runningPipe);
-            break;
-          case STOP_PIPE:
-            runningPipe.stop();
-            break;
-          case START_PIPE:
-            runningPipe.start();
-            break;
-          case DROP_PIPE:
-            runningPipe.drop();
-            break;
-          default:
-            throw new UnsupportedOperationException(
-                String.format("Can not recognize type %s.", type.name()));
-        }
-      }
-    } catch (Exception e) {
-      throw new IOException(
-          String.format("Recover error in line %d : %s, because %s", lineNumber, readLine, e));
-    }
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
deleted file mode 100644
index e6920e9..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.common.persistence;
-
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-
-/**
- * SyncLogger is used to manage the persistent information in the sync module. Persistent
- * information can be recovered on reboot via {@linkplain SyncLogReader}.
- */
-public class SyncLogWriter {
-  // record pipe meta info
-  private BufferedWriter pipeInfoWriter;
-
-  private SyncLogWriter() {}
-
-  public void getBufferedWriter() throws IOException {
-    if (pipeInfoWriter == null) {
-      File logFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
-      if (!logFile.getParentFile().exists()) {
-        logFile.getParentFile().mkdirs();
-      }
-      pipeInfoWriter = new BufferedWriter(new FileWriter(logFile, true));
-    }
-  }
-
-  // TODO(sync): delete this in new-standalone version
-  public synchronized void addPipeSink(CreatePipeSinkPlan plan) throws IOException {
-    getBufferedWriter();
-    pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPESINK.name());
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.write(plan.toString());
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.flush();
-  }
-
-  public synchronized void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement)
-      throws IOException {
-    getBufferedWriter();
-    pipeInfoWriter.write(createPipeSinkStatement.getType().name());
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.write(createPipeSinkStatement.toString());
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.flush();
-  }
-
-  public synchronized void dropPipeSink(String pipeSinkName) throws IOException {
-    getBufferedWriter();
-    pipeInfoWriter.write(Operator.OperatorType.DROP_PIPESINK.name());
-    pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
-    pipeInfoWriter.write(pipeSinkName);
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.flush();
-  }
-
-  // TODO(sync): delete this in new-standalone version
-  public synchronized void addPipe(CreatePipePlan plan, long pipeCreateTime) throws IOException {
-    getBufferedWriter();
-    pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPE.name());
-    pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
-    pipeInfoWriter.write(String.valueOf(pipeCreateTime));
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.write(plan.toString());
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.flush();
-  }
-
-  public synchronized void addPipe(CreatePipeStatement createPipeStatement, long pipeCreateTime)
-      throws IOException {
-    getBufferedWriter();
-    pipeInfoWriter.write(createPipeStatement.getType().name());
-    pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
-    pipeInfoWriter.write(String.valueOf(pipeCreateTime));
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.write(createPipeStatement.toString());
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.flush();
-  }
-
-  public synchronized void operatePipe(String pipeName, SyncOperation syncOperation)
-      throws IOException {
-    getBufferedWriter();
-    pipeInfoWriter.write(syncOperation.name());
-    pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
-    pipeInfoWriter.write(pipeName);
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.flush();
-  }
-
-  public void close() throws IOException {
-    if (pipeInfoWriter != null) {
-      pipeInfoWriter.close();
-      pipeInfoWriter = null;
-    }
-  }
-
-  private static class SyncLoggerHolder {
-    private static final SyncLogWriter INSTANCE = new SyncLogWriter();
-
-    private SyncLoggerHolder() {
-      // empty constructor
-    }
-  }
-
-  public static SyncLogWriter getInstance() {
-    return SyncLogWriter.SyncLoggerHolder.INSTANCE;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
index b7c4a1c..b926ac1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
@@ -24,10 +24,14 @@
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -38,11 +42,13 @@
 
   private final PipeSinkType pipeSinkType = PipeSinkType.ExternalPipe;
 
-  private final String pipeSinkName;
-  private final String extPipeSinkTypeName;
+  private String pipeSinkName;
+  private String extPipeSinkTypeName;
 
   private Map<String, String> sinkParams;
 
+  public ExternalPipeSink() {}
+
   public ExternalPipeSink(String pipeSinkName, String extPipeSinkTypeName) {
     this.pipeSinkName = pipeSinkName;
     this.extPipeSinkTypeName = extPipeSinkTypeName;
@@ -111,6 +117,21 @@
     return new TPipeSinkInfo(this.pipeSinkName, this.pipeSinkType.name()).setAttributes(sinkParams);
   }
 
+  @Override
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write((byte) pipeSinkType.ordinal(), outputStream);
+    ReadWriteIOUtils.write(pipeSinkName, outputStream);
+    ReadWriteIOUtils.write(extPipeSinkTypeName, outputStream);
+    ReadWriteIOUtils.write(sinkParams, outputStream);
+  }
+
+  @Override
+  public void deserialize(InputStream inputStream) throws IOException {
+    pipeSinkName = ReadWriteIOUtils.readString(inputStream);
+    extPipeSinkTypeName = ReadWriteIOUtils.readString(inputStream);
+    sinkParams = ReadWriteIOUtils.readMap(inputStream);
+  }
+
   public Map<String, String> getSinkParams() {
     return sinkParams;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
index b907dd0..23fc443 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
@@ -18,17 +18,18 @@
  */
 package org.apache.iotdb.db.sync.receiver.recovery;
 
+import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
+import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipe.PipeStatus;
 import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
+import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.SyncTestUtils;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 
 import org.junit.After;
@@ -36,7 +37,9 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
 /** This test is for ReceiverLog and ReceiverLogAnalyzer */
@@ -62,19 +65,23 @@
   @Test
   public void testServiceLog() {
     try {
-      SyncLogWriter log = SyncLogWriter.getInstance();
-      CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan("demo", "iotdb");
-      createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
-      createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
-      log.addPipeSink(createPipeSinkPlan);
-      log.addPipe(new CreatePipePlan(pipe1, "demo"), createdTime1);
+      SyncLogWriter log = new SyncLogWriter(new File(SyncPathUtil.getSysDir()));
+      PipeSink pipeSink = new IoTDBPipeSink("demo");
+      Map<String, String> attributes = new HashMap<>();
+      attributes.put("ip", "192.168.11.11");
+      attributes.put("port", "7766");
+      pipeSink.setAttribute(attributes);
+      log.addPipeSink(pipeSink);
+      PipeInfo pipeInfo1 = new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true);
+      PipeInfo pipeInfo2 = new TsFilePipeInfo(pipe2, "demo", createdTime2, 99, false);
+      log.addPipe(pipeInfo1);
       log.operatePipe(pipe1, SyncOperation.DROP_PIPE);
 
-      log.addPipe(new CreatePipePlan(pipe2, "demo"), createdTime2);
-      log.operatePipe(pipe1, SyncOperation.STOP_PIPE);
-      log.operatePipe(pipe1, SyncOperation.START_PIPE);
+      log.addPipe(pipeInfo2);
+      log.operatePipe(pipe2, SyncOperation.STOP_PIPE);
+      log.operatePipe(pipe2, SyncOperation.START_PIPE);
       log.close();
-      SyncLogReader syncLogReader = new SyncLogReader();
+      SyncLogReader syncLogReader = new SyncLogReader(new File(SyncPathUtil.getSysDir()));
 
       syncLogReader.recover();
 
@@ -92,25 +99,25 @@
           createdTime2,
           PipeMessage.PipeMessageType.NORMAL);
       Map<String, Map<Long, PipeInfo>> pipes = syncLogReader.getAllPipeInfos();
-      PipeInfo pipeInfo1 = pipes.get(pipe1).get(createdTime1);
+      PipeInfo pipeInfoRecover1 = pipes.get(pipe1).get(createdTime1);
       SyncTestUtils.checkPipeInfo(
-          pipeInfo1,
+          pipeInfoRecover1,
           pipe1,
           "demo",
           PipeStatus.DROP,
           createdTime1,
           PipeMessage.PipeMessageType.NORMAL);
-      PipeInfo pipeInfo2 = pipes.get(pipe2).get(createdTime2);
+      PipeInfo pipeInfoRecover2 = pipes.get(pipe2).get(createdTime2);
       SyncTestUtils.checkPipeInfo(
-          pipeInfo2,
+          pipeInfoRecover2,
           pipe2,
           "demo",
           PipeStatus.RUNNING,
           createdTime2,
           PipeMessage.PipeMessageType.NORMAL);
     } catch (Exception e) {
-      Assert.fail();
       e.printStackTrace();
+      Assert.fail();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 0b17944..9298697 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -50,7 +50,7 @@
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.rescon.TsFileResourceManager;
 import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
+import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
 import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.recover.WALRecoverManager;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -193,7 +193,7 @@
     LastQueryExecutor.clear();
 
     // clear SyncLogger
-    SyncLogWriter.getInstance().close();
+    LocalSyncInfoFetcher.getInstance().close();
 
     // delete all directory
     cleanAllDir();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 1ad3b5f..5959ba0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -692,6 +692,22 @@
     return map;
   }
 
+  public static Map<String, String> readMap(InputStream inputStream) throws IOException {
+    int length = readInt(inputStream);
+    if (length == NO_BYTE_TO_READ) {
+      return null;
+    }
+    Map<String, String> map = new HashMap<>(length);
+    for (int i = 0; i < length; i++) {
+      // key
+      String key = readString(inputStream);
+      // value
+      String value = readString(inputStream);
+      map.put(key, value);
+    }
+    return map;
+  }
+
   public static LinkedHashMap<String, String> readLinkedHashMap(ByteBuffer buffer) {
     int length = readInt(buffer);
     if (length == NO_BYTE_TO_READ) {