[IOTDB-4488] Implement ClusterSyncInfo snapshot by SyncLogWritter and SyncLogReader (#7452)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
index 5838fb2..c42dd0d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
@@ -101,17 +101,20 @@
return resp;
}
+ // endregion
+
+ // ======================================================
+ // region Implement of Snapshot
+ // ======================================================
+
@Override
public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
- // TODO: merge snapshot logic into SyncLogWritter and SyncLogReader
- // TODO: add ClusterSyncInfoTest later
- return true;
+ return syncMetadata.processTakeSnapshot(snapshotDir);
}
@Override
public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
- // TODO: merge snapshot logic into SyncLogWritter and SyncLogReader
- // TODO: add ClusterSyncInfoTest later
+ syncMetadata.processLoadSnapshot(snapshotDir);
}
// endregion
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java
new file mode 100644
index 0000000..dd9abc5
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class ClusterSyncInfoTest {
+
+ private ClusterSyncInfo clusterSyncInfo;
+ private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
+
+ @Before
+ public void setup() throws IOException {
+ clusterSyncInfo = new ClusterSyncInfo();
+ if (!snapshotDir.exists()) {
+ snapshotDir.mkdirs();
+ }
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ if (snapshotDir.exists()) {
+ FileUtils.deleteDirectory(snapshotDir);
+ }
+ }
+
+ private void prepareClusterSyncInfo() {
+ Map<String, String> attributes1 = new HashMap<>();
+ attributes1.put("ip", "192.168.11.11");
+ attributes1.put("port", "7766");
+ TPipeSinkInfo pipeSinkInfo1 =
+ new TPipeSinkInfo()
+ .setPipeSinkName("demo1")
+ .setPipeSinkType("IoTDB")
+ .setAttributes(attributes1);
+ Map<String, String> attributes2 = new HashMap<>();
+ attributes2.put("ip", "192.168.22.2");
+ attributes2.put("port", "7777");
+ TPipeSinkInfo pipeSinkInfo2 =
+ new TPipeSinkInfo()
+ .setPipeSinkName("demo2")
+ .setPipeSinkType("IoTDB")
+ .setAttributes(attributes2);
+
+ clusterSyncInfo.addPipeSink(new CreatePipeSinkPlan(pipeSinkInfo1));
+ clusterSyncInfo.addPipeSink(new CreatePipeSinkPlan(pipeSinkInfo2));
+ }
+
+ @Test
+ public void testSnapshot() throws Exception {
+ prepareClusterSyncInfo();
+
+ clusterSyncInfo.processTakeSnapshot(snapshotDir);
+ ClusterSyncInfo clusterSyncInfo2 = new ClusterSyncInfo();
+ clusterSyncInfo2.processLoadSnapshot(snapshotDir);
+
+ List<PipeSink> expectedPipeSink =
+ clusterSyncInfo.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
+ List<PipeSink> actualPipeSink =
+ clusterSyncInfo2.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
+ Assert.assertEquals(expectedPipeSink, actualPipeSink);
+ }
+
+ @Test
+ public void testPipeSinkOperation() {
+ prepareClusterSyncInfo();
+ try {
+ clusterSyncInfo.checkAddPipeSink("demo1");
+ clusterSyncInfo.checkDropPipeSink("demo3");
+ Assert.fail("checkOperatePipeSink ignore failure.");
+ } catch (PipeSinkException e) {
+ // nothing
+ }
+ try {
+ clusterSyncInfo.checkAddPipeSink("demo3");
+ clusterSyncInfo.checkDropPipeSink("demo1");
+ } catch (PipeSinkException e) {
+ Assert.fail("checkOperatePipeSink should not throw exception.");
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
index fee6963..5704381 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
@@ -20,18 +20,31 @@
import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
+import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncConstant;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-public class SyncMetadata {
+public class SyncMetadata implements SnapshotProcessor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyncMetadata.class);
// <PipeSinkName, PipeSink>
private Map<String, PipeSink> pipeSinks;
@@ -207,6 +220,56 @@
}
}
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
+ File snapshotFile = new File(snapshotDir, SyncConstant.SYNC_LOG_NAME);
+ if (snapshotFile.exists() && snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to take snapshot, because snapshot file [{}] is already exist.",
+ snapshotFile.getAbsolutePath());
+ return false;
+ }
+ File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
+ try (SyncLogWriter writer = new SyncLogWriter(snapshotDir, tmpFile.getName())) {
+ for (PipeSink pipeSink : pipeSinks.values()) {
+ writer.addPipeSink(pipeSink);
+ }
+ for (Map<Long, PipeInfo> map : pipes.values()) {
+ for (PipeInfo pipeInfo : map.values()) {
+ writer.addPipe(pipeInfo);
+ switch (pipeInfo.getStatus()) {
+ case RUNNING:
+ writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.START_PIPE);
+ break;
+ case STOP:
+ writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.STOP_PIPE);
+ break;
+ case DROP:
+ writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.DROP_PIPE);
+ break;
+ }
+ }
+ }
+ }
+ return tmpFile.renameTo(snapshotFile);
+ }
+
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
+ File snapshotFile = new File(snapshotDir, SyncConstant.SYNC_LOG_NAME);
+ if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to load snapshot,snapshot file [{}] is not exist.",
+ snapshotFile.getAbsolutePath());
+ return;
+ }
+ SyncLogReader reader = new SyncLogReader(snapshotDir);
+ reader.recover();
+ setPipes(reader.getAllPipeInfos());
+ setPipeSinks(reader.getAllPipeSinks());
+ setRunningPipe(reader.getRunningPipeInfo());
+ }
+
// endregion
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
new file mode 100644
index 0000000..2f402ca
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.sync.persistence;
+
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncConstant;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SyncLogReader {
+ private static final Logger logger = LoggerFactory.getLogger(SyncLogReader.class);
+ // <pipeSinkName, PipeSink>
+ private final Map<String, PipeSink> pipeSinks = new ConcurrentHashMap<>();
+ private final Map<String, Map<Long, PipeInfo>> pipes = new ConcurrentHashMap<>();
+ private PipeInfo runningPipe;
+ private final File dir;
+ private final String fileName;
+
+ public SyncLogReader(File dir) {
+ this.dir = dir;
+ this.fileName = SyncConstant.SYNC_LOG_NAME;
+ }
+
+ public SyncLogReader(File dir, String fileName) {
+ this.dir = dir;
+ this.fileName = fileName;
+ }
+
+ public void recover() throws IOException {
+ logger.info("Start to recover all sync state for sync.");
+ File serviceLogFile = new File(dir, fileName);
+ if (!serviceLogFile.exists()) {
+ logger.warn("Sync service log file not found");
+ } else {
+ try (InputStream inputStream = new FileInputStream(serviceLogFile)) {
+ recoverPipe(inputStream);
+ }
+ }
+ }
+
+ public Map<String, PipeSink> getAllPipeSinks() {
+ return pipeSinks;
+ }
+
+ public Map<String, Map<Long, PipeInfo>> getAllPipeInfos() {
+ return pipes;
+ }
+
+ public PipeInfo getRunningPipeInfo() {
+ return runningPipe;
+ }
+
+ private void recoverPipe(InputStream inputStream) throws IOException {
+ byte nextByte;
+ while ((nextByte = ReadWriteIOUtils.readByte(inputStream)) != -1) {
+ SyncOperation operationType = SyncOperation.values()[nextByte];
+ switch (operationType) {
+ case CREATE_PIPESINK:
+ PipeSink pipeSink = PipeSink.deserializePipeSink(inputStream);
+ pipeSinks.put(pipeSink.getPipeSinkName(), pipeSink);
+ break;
+ case DROP_PIPESINK:
+ pipeSinks.remove(ReadWriteIOUtils.readString(inputStream));
+ break;
+ case CREATE_PIPE:
+ runningPipe = PipeInfo.deserializePipeInfo(inputStream);
+ pipes
+ .computeIfAbsent(runningPipe.getPipeName(), i -> new ConcurrentHashMap<>())
+ .computeIfAbsent(runningPipe.getCreateTime(), i -> runningPipe);
+ break;
+ case STOP_PIPE:
+ // TODO: support multiple pipe
+ ReadWriteIOUtils.readString(inputStream);
+ runningPipe.stop();
+ break;
+ case START_PIPE:
+ // TODO: support multiple pipe
+ ReadWriteIOUtils.readString(inputStream);
+ runningPipe.start();
+ break;
+ case DROP_PIPE:
+ // TODO: support multiple pipe
+ ReadWriteIOUtils.readString(inputStream);
+ runningPipe.drop();
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Can not recognize SyncOperation %s.", operationType.name()));
+ }
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java
new file mode 100644
index 0000000..84fe42f
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.sync.persistence;
+
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncConstant;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * SyncLogger is used to manage the persistent information in the sync module. Persistent
+ * information can be recovered on reboot via {@linkplain SyncLogReader}.
+ */
+public class SyncLogWriter implements AutoCloseable {
+ // record pipe meta info
+ private OutputStream outputStream;
+ private final File dir;
+ private final String fileName;
+
+ public SyncLogWriter(File dir) {
+ this.dir = dir;
+ this.fileName = SyncConstant.SYNC_LOG_NAME;
+ }
+
+ public SyncLogWriter(File dir, String fileName) {
+ this.dir = dir;
+ this.fileName = fileName;
+ }
+
+ public void getOutputStream() throws IOException {
+ if (outputStream == null) {
+ // File logFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
+ File logFile = new File(dir, fileName);
+ if (!logFile.getParentFile().exists()) {
+ logFile.getParentFile().mkdirs();
+ }
+ outputStream = new FileOutputStream(logFile, true);
+ }
+ }
+
+ public synchronized void addPipeSink(PipeSink pipeSink) throws IOException {
+ getOutputStream();
+ ReadWriteIOUtils.write((byte) SyncOperation.CREATE_PIPESINK.ordinal(), outputStream);
+ pipeSink.serialize(outputStream);
+ }
+
+ public synchronized void dropPipeSink(String pipeSinkName) throws IOException {
+ getOutputStream();
+ ReadWriteIOUtils.write((byte) SyncOperation.DROP_PIPESINK.ordinal(), outputStream);
+ ReadWriteIOUtils.write(pipeSinkName, outputStream);
+ }
+
+ public synchronized void addPipe(PipeInfo pipeInfo) throws IOException {
+ getOutputStream();
+ ReadWriteIOUtils.write((byte) SyncOperation.CREATE_PIPE.ordinal(), outputStream);
+ pipeInfo.serialize(outputStream);
+ }
+
+ public synchronized void operatePipe(String pipeName, SyncOperation syncOperation)
+ throws IOException {
+ getOutputStream();
+ ReadWriteIOUtils.write((byte) syncOperation.ordinal(), outputStream);
+ ReadWriteIOUtils.write(pipeName, outputStream);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (outputStream != null) {
+ outputStream.close();
+ outputStream = null;
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
index 2067145..b93da6a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
@@ -18,13 +18,23 @@
*/
package org.apache.iotdb.commons.sync.pipe;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
public abstract class PipeInfo {
+
protected String pipeName;
protected String pipeSinkName;
protected PipeStatus status;
protected long createTime;
protected PipeMessage.PipeMessageType messageType;
+ // only used for serialization
+ protected PipeInfo() {}
+
public PipeInfo(String pipeName, String pipeSinkName, long createTime) {
this.pipeName = pipeName;
this.pipeSinkName = pipeSinkName;
@@ -41,6 +51,8 @@
this.messageType = PipeMessage.PipeMessageType.NORMAL;
}
+ abstract PipeType getType();
+
public String getPipeName() {
return pipeName;
}
@@ -92,4 +104,42 @@
public void setCreateTime(long createTime) {
this.createTime = createTime;
}
+
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write((byte) getType().ordinal(), outputStream);
+ ReadWriteIOUtils.write(pipeName, outputStream);
+ ReadWriteIOUtils.write(pipeSinkName, outputStream);
+ ReadWriteIOUtils.write((byte) status.ordinal(), outputStream);
+ ReadWriteIOUtils.write(createTime, outputStream);
+ ReadWriteIOUtils.write((byte) messageType.ordinal(), outputStream);
+ }
+
+ protected void deserialize(InputStream inputStream) throws IOException {
+ pipeName = ReadWriteIOUtils.readString(inputStream);
+ pipeSinkName = ReadWriteIOUtils.readString(inputStream);
+ status = PipeStatus.values()[ReadWriteIOUtils.readByte(inputStream)];
+ createTime = ReadWriteIOUtils.readLong(inputStream);
+ messageType = PipeMessage.PipeMessageType.values()[ReadWriteIOUtils.readByte(inputStream)];
+ }
+
+ public static PipeInfo deserializePipeInfo(InputStream inputStream) throws IOException {
+ PipeType pipeType = PipeType.values()[ReadWriteIOUtils.readByte(inputStream)];
+ PipeInfo pipeInfo;
+ switch (pipeType) {
+ case TsFilePipe:
+ pipeInfo = new TsFilePipeInfo();
+ pipeInfo.deserialize(inputStream);
+ break;
+ case WALPipe:
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Can not recognize PipeType %s.", pipeType.name()));
+ }
+ return pipeInfo;
+ }
+
+ enum PipeType {
+ TsFilePipe,
+ WALPipe
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
index 618e0eb..d2bfdf7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.commons.sync.pipe;
public enum SyncOperation {
+ CREATE_PIPESINK,
+ DROP_PIPESINK,
+ CREATE_PIPE,
START_PIPE,
STOP_PIPE,
DROP_PIPE
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
index 81feed0..391d4c6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
@@ -18,12 +18,20 @@
*/
package org.apache.iotdb.commons.sync.pipe;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Objects;
public class TsFilePipeInfo extends PipeInfo {
private boolean syncDelOp;
private long dataStartTimestamp;
+ // only used for serialization
+ protected TsFilePipeInfo() {}
+
public TsFilePipeInfo(
String pipeName,
String pipeSinkName,
@@ -64,6 +72,25 @@
}
@Override
+ PipeType getType() {
+ return PipeType.TsFilePipe;
+ }
+
+ @Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ super.serialize(outputStream);
+ ReadWriteIOUtils.write(syncDelOp, outputStream);
+ ReadWriteIOUtils.write(dataStartTimestamp, outputStream);
+ }
+
+ @Override
+ protected void deserialize(InputStream inputStream) throws IOException {
+ super.deserialize(inputStream);
+ syncDelOp = ReadWriteIOUtils.readBool(inputStream);
+ dataStartTimestamp = ReadWriteIOUtils.readLong(inputStream);
+ }
+
+ @Override
public String toString() {
return "TsFilePipeInfo{"
+ "pipeName='"
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
index 56736a4..b0cbf2a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
@@ -24,14 +24,18 @@
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class IoTDBPipeSink implements PipeSink {
- private final PipeSinkType pipeSinkType;
+ private final PipeSinkType pipeSinkType = PipeSinkType.IoTDB;
private String name;
private String ip;
@@ -40,11 +44,13 @@
private static final String ATTRIBUTE_IP_KEY = "ip";
private static final String ATTRIBUTE_PORT_KEY = "port";
+ public IoTDBPipeSink() {}
+
public IoTDBPipeSink(String name) {
- ip = SyncConstant.DEFAULT_PIPE_SINK_IP;
- port = SyncConstant.DEFAULT_PIPE_SINK_PORT;
+ this();
+ this.ip = SyncConstant.DEFAULT_PIPE_SINK_IP;
+ this.port = SyncConstant.DEFAULT_PIPE_SINK_PORT;
this.name = name;
- pipeSinkType = PipeSinkType.IoTDB;
}
@Override
@@ -123,6 +129,21 @@
}
@Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write((byte) pipeSinkType.ordinal(), outputStream);
+ ReadWriteIOUtils.write(name, outputStream);
+ ReadWriteIOUtils.write(ip, outputStream);
+ ReadWriteIOUtils.write(port, outputStream);
+ }
+
+ @Override
+ public void deserialize(InputStream inputStream) throws IOException {
+ name = ReadWriteIOUtils.readString(inputStream);
+ ip = ReadWriteIOUtils.readString(inputStream);
+ port = ReadWriteIOUtils.readInt(inputStream);
+ }
+
+ @Override
public String toString() {
return "IoTDBPipeSink{"
+ "pipeSinkType="
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
index d68a35d..588618d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
@@ -22,7 +22,11 @@
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.List;
import java.util.Map;
@@ -41,6 +45,27 @@
TPipeSinkInfo getTPipeSinkInfo();
+ void serialize(OutputStream outputStream) throws IOException;
+
+ void deserialize(InputStream inputStream) throws IOException;
+
+ static PipeSink deserializePipeSink(InputStream inputStream) throws IOException {
+ PipeSinkType pipeSinkType = PipeSinkType.values()[ReadWriteIOUtils.readByte(inputStream)];
+ PipeSink pipeSink;
+ switch (pipeSinkType) {
+ case IoTDB:
+ pipeSink = new IoTDBPipeSink();
+ pipeSink.deserialize(inputStream);
+ break;
+ case ExternalPipe:
+ // TODO(ext-pipe): deserialize external pipesink here
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Can not recognize PipeSinkType %s.", pipeSinkType.name()));
+ }
+ return pipeSink;
+ }
+
enum PipeSinkType {
IoTDB,
ExternalPipe
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
index 6f44f50..a90a67b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
@@ -18,25 +18,26 @@
*/
package org.apache.iotdb.db.sync.common;
-import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.sync.metadata.SyncMetadata;
+import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
+import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -49,15 +50,15 @@
private final SyncMetadata syncMetadata;
public LocalSyncInfo() {
- syncLogWriter = SyncLogWriter.getInstance();
+ syncLogWriter = new SyncLogWriter(new File(SyncPathUtil.getSysDir()));
syncMetadata = new SyncMetadata();
- SyncLogReader logReader = new SyncLogReader();
+ SyncLogReader logReader = new SyncLogReader(new File(SyncPathUtil.getSysDir()));
try {
logReader.recover();
syncMetadata.setPipes(logReader.getAllPipeInfos());
syncMetadata.setPipeSinks(logReader.getAllPipeSinks());
syncMetadata.setRunningPipe(logReader.getRunningPipeInfo());
- } catch (StartupException e) {
+ } catch (IOException e) {
LOGGER.error(
"Cannot recover ReceiverInfo because {}. Use default info values.", e.getMessage());
}
@@ -75,7 +76,7 @@
PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkPlan(plan);
// should guarantee the adding pipesink is not exist.
syncMetadata.addPipeSink(pipeSink);
- syncLogWriter.addPipeSink(plan);
+ syncLogWriter.addPipeSink(pipeSink);
}
public void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement)
@@ -84,7 +85,7 @@
PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkStatement(createPipeSinkStatement);
// should guarantee the adding pipesink is not exist.
syncMetadata.addPipeSink(pipeSink);
- syncLogWriter.addPipeSink(createPipeSinkStatement);
+ syncLogWriter.addPipeSink(pipeSink);
}
public void dropPipeSink(String name) throws PipeSinkException, IOException {
@@ -112,7 +113,7 @@
PipeSink pipeSink = getPipeSink(plan.getPipeSinkName());
PipeInfo pipeInfo = SyncPipeUtil.parseCreatePipePlanAsPipeInfo(plan, pipeSink, createTime);
syncMetadata.addPipe(pipeInfo, pipeSink);
- syncLogWriter.addPipe(plan, createTime);
+ syncLogWriter.addPipe(pipeInfo);
}
public void addPipe(CreatePipeStatement createPipeStatement, long createTime)
@@ -125,7 +126,7 @@
PipeInfo pipeInfo =
SyncPipeUtil.parseCreatePipePlanAsPipeInfo(createPipeStatement, pipeSink, createTime);
syncMetadata.addPipe(pipeInfo, pipeSink);
- syncLogWriter.addPipe(createPipeStatement, createTime);
+ syncLogWriter.addPipe(pipeInfo);
}
public void operatePipe(String pipeName, SyncOperation syncOperation)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index 46f23e3..0a6700f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -185,4 +185,9 @@
public void reset() {
localSyncInfo = new LocalSyncInfo();
}
+
+ @TestOnly
+ public void close() throws IOException {
+ localSyncInfo.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
deleted file mode 100644
index eb5de29..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.common.persistence;
-
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
-import org.apache.iotdb.db.mpp.plan.constant.StatementType;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class SyncLogReader {
- private static final Logger logger = LoggerFactory.getLogger(SyncLogReader.class);
- // <pipeSinkName, PipeSink>
- private Map<String, PipeSink> pipeSinks = new ConcurrentHashMap<>();
- private Map<String, Map<Long, PipeInfo>> pipes = new ConcurrentHashMap<>();
- private PipeInfo runningPipe;
-
- public void recover() throws StartupException {
- logger.info("Start to recover all sync state for sync.");
- File serviceLogFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
- try (BufferedReader br = new BufferedReader(new FileReader(serviceLogFile))) {
- recoverPipe(br);
- } catch (IOException e) {
- logger.warn("Sync service log file not found");
- }
- }
-
- public Map<String, PipeSink> getAllPipeSinks() {
- return pipeSinks;
- }
-
- public Map<String, Map<Long, PipeInfo>> getAllPipeInfos() {
- return pipes;
- }
-
- public PipeInfo getRunningPipeInfo() {
- return runningPipe;
- }
-
- private void recoverPipe(BufferedReader br) throws IOException {
- int lineNumber =
- 0; // line index shown in sender log starts from 1, so lineNumber starts from 0.
- String readLine = "";
- String[] parseStrings;
-
- try {
- while ((readLine = br.readLine()) != null) {
- lineNumber += 1;
- parseStrings = readLine.split(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
-
- StatementType type = StatementType.valueOf(parseStrings[0]);
-
- switch (type) {
- case CREATE_PIPESINK:
- readLine = br.readLine();
- lineNumber += 1;
- CreatePipeSinkStatement createPipeSinkStatement =
- CreatePipeSinkStatement.parseString(readLine);
- pipeSinks.put(
- createPipeSinkStatement.getPipeSinkName(),
- SyncPipeUtil.parseCreatePipeSinkStatement(createPipeSinkStatement));
- break;
- case DROP_PIPESINK:
- pipeSinks.remove(parseStrings[1]);
- break;
- case CREATE_PIPE:
- readLine = br.readLine();
- lineNumber += 1;
- CreatePipeStatement createPipeStatement = CreatePipeStatement.parseString(readLine);
- runningPipe =
- SyncPipeUtil.parseCreatePipePlanAsPipeInfo(
- createPipeStatement,
- pipeSinks.get(createPipeStatement.getPipeSinkName()),
- Long.parseLong(parseStrings[1]));
- pipes
- .computeIfAbsent(runningPipe.getPipeName(), i -> new ConcurrentHashMap<>())
- .computeIfAbsent(runningPipe.getCreateTime(), i -> runningPipe);
- break;
- case STOP_PIPE:
- runningPipe.stop();
- break;
- case START_PIPE:
- runningPipe.start();
- break;
- case DROP_PIPE:
- runningPipe.drop();
- break;
- default:
- throw new UnsupportedOperationException(
- String.format("Can not recognize type %s.", type.name()));
- }
- }
- } catch (Exception e) {
- throw new IOException(
- String.format("Recover error in line %d : %s, because %s", lineNumber, readLine, e));
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
deleted file mode 100644
index e6920e9..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.common.persistence;
-
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-
-/**
- * SyncLogger is used to manage the persistent information in the sync module. Persistent
- * information can be recovered on reboot via {@linkplain SyncLogReader}.
- */
-public class SyncLogWriter {
- // record pipe meta info
- private BufferedWriter pipeInfoWriter;
-
- private SyncLogWriter() {}
-
- public void getBufferedWriter() throws IOException {
- if (pipeInfoWriter == null) {
- File logFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
- if (!logFile.getParentFile().exists()) {
- logFile.getParentFile().mkdirs();
- }
- pipeInfoWriter = new BufferedWriter(new FileWriter(logFile, true));
- }
- }
-
- // TODO(sync): delete this in new-standalone version
- public synchronized void addPipeSink(CreatePipeSinkPlan plan) throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPESINK.name());
- pipeInfoWriter.newLine();
- pipeInfoWriter.write(plan.toString());
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public synchronized void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement)
- throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(createPipeSinkStatement.getType().name());
- pipeInfoWriter.newLine();
- pipeInfoWriter.write(createPipeSinkStatement.toString());
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public synchronized void dropPipeSink(String pipeSinkName) throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(Operator.OperatorType.DROP_PIPESINK.name());
- pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
- pipeInfoWriter.write(pipeSinkName);
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- // TODO(sync): delete this in new-standalone version
- public synchronized void addPipe(CreatePipePlan plan, long pipeCreateTime) throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPE.name());
- pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
- pipeInfoWriter.write(String.valueOf(pipeCreateTime));
- pipeInfoWriter.newLine();
- pipeInfoWriter.write(plan.toString());
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public synchronized void addPipe(CreatePipeStatement createPipeStatement, long pipeCreateTime)
- throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(createPipeStatement.getType().name());
- pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
- pipeInfoWriter.write(String.valueOf(pipeCreateTime));
- pipeInfoWriter.newLine();
- pipeInfoWriter.write(createPipeStatement.toString());
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public synchronized void operatePipe(String pipeName, SyncOperation syncOperation)
- throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(syncOperation.name());
- pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
- pipeInfoWriter.write(pipeName);
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public void close() throws IOException {
- if (pipeInfoWriter != null) {
- pipeInfoWriter.close();
- pipeInfoWriter = null;
- }
- }
-
- private static class SyncLoggerHolder {
- private static final SyncLogWriter INSTANCE = new SyncLogWriter();
-
- private SyncLoggerHolder() {
- // empty constructor
- }
- }
-
- public static SyncLogWriter getInstance() {
- return SyncLogWriter.SyncLoggerHolder.INSTANCE;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
index b7c4a1c..b926ac1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
@@ -24,10 +24,14 @@
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -38,11 +42,13 @@
private final PipeSinkType pipeSinkType = PipeSinkType.ExternalPipe;
- private final String pipeSinkName;
- private final String extPipeSinkTypeName;
+ private String pipeSinkName;
+ private String extPipeSinkTypeName;
private Map<String, String> sinkParams;
+ public ExternalPipeSink() {}
+
public ExternalPipeSink(String pipeSinkName, String extPipeSinkTypeName) {
this.pipeSinkName = pipeSinkName;
this.extPipeSinkTypeName = extPipeSinkTypeName;
@@ -111,6 +117,21 @@
return new TPipeSinkInfo(this.pipeSinkName, this.pipeSinkType.name()).setAttributes(sinkParams);
}
+ @Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write((byte) pipeSinkType.ordinal(), outputStream);
+ ReadWriteIOUtils.write(pipeSinkName, outputStream);
+ ReadWriteIOUtils.write(extPipeSinkTypeName, outputStream);
+ ReadWriteIOUtils.write(sinkParams, outputStream);
+ }
+
+ @Override
+ public void deserialize(InputStream inputStream) throws IOException {
+ pipeSinkName = ReadWriteIOUtils.readString(inputStream);
+ extPipeSinkTypeName = ReadWriteIOUtils.readString(inputStream);
+ sinkParams = ReadWriteIOUtils.readMap(inputStream);
+ }
+
public Map<String, String> getSinkParams() {
return sinkParams;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
index b907dd0..23fc443 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
@@ -18,17 +18,18 @@
*/
package org.apache.iotdb.db.sync.receiver.recovery;
+import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
+import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
+import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.SyncTestUtils;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.junit.After;
@@ -36,7 +37,9 @@
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
/** This test is for ReceiverLog and ReceiverLogAnalyzer */
@@ -62,19 +65,23 @@
@Test
public void testServiceLog() {
try {
- SyncLogWriter log = SyncLogWriter.getInstance();
- CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan("demo", "iotdb");
- createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
- createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
- log.addPipeSink(createPipeSinkPlan);
- log.addPipe(new CreatePipePlan(pipe1, "demo"), createdTime1);
+ SyncLogWriter log = new SyncLogWriter(new File(SyncPathUtil.getSysDir()));
+ PipeSink pipeSink = new IoTDBPipeSink("demo");
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("ip", "192.168.11.11");
+ attributes.put("port", "7766");
+ pipeSink.setAttribute(attributes);
+ log.addPipeSink(pipeSink);
+ PipeInfo pipeInfo1 = new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true);
+ PipeInfo pipeInfo2 = new TsFilePipeInfo(pipe2, "demo", createdTime2, 99, false);
+ log.addPipe(pipeInfo1);
log.operatePipe(pipe1, SyncOperation.DROP_PIPE);
- log.addPipe(new CreatePipePlan(pipe2, "demo"), createdTime2);
- log.operatePipe(pipe1, SyncOperation.STOP_PIPE);
- log.operatePipe(pipe1, SyncOperation.START_PIPE);
+ log.addPipe(pipeInfo2);
+ log.operatePipe(pipe2, SyncOperation.STOP_PIPE);
+ log.operatePipe(pipe2, SyncOperation.START_PIPE);
log.close();
- SyncLogReader syncLogReader = new SyncLogReader();
+ SyncLogReader syncLogReader = new SyncLogReader(new File(SyncPathUtil.getSysDir()));
syncLogReader.recover();
@@ -92,25 +99,25 @@
createdTime2,
PipeMessage.PipeMessageType.NORMAL);
Map<String, Map<Long, PipeInfo>> pipes = syncLogReader.getAllPipeInfos();
- PipeInfo pipeInfo1 = pipes.get(pipe1).get(createdTime1);
+ PipeInfo pipeInfoRecover1 = pipes.get(pipe1).get(createdTime1);
SyncTestUtils.checkPipeInfo(
- pipeInfo1,
+ pipeInfoRecover1,
pipe1,
"demo",
PipeStatus.DROP,
createdTime1,
PipeMessage.PipeMessageType.NORMAL);
- PipeInfo pipeInfo2 = pipes.get(pipe2).get(createdTime2);
+ PipeInfo pipeInfoRecover2 = pipes.get(pipe2).get(createdTime2);
SyncTestUtils.checkPipeInfo(
- pipeInfo2,
+ pipeInfoRecover2,
pipe2,
"demo",
PipeStatus.RUNNING,
createdTime2,
PipeMessage.PipeMessageType.NORMAL);
} catch (Exception e) {
- Assert.fail();
e.printStackTrace();
+ Assert.fail();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 0b17944..9298697 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -50,7 +50,7 @@
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
+import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.recover.WALRecoverManager;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -193,7 +193,7 @@
LastQueryExecutor.clear();
// clear SyncLogger
- SyncLogWriter.getInstance().close();
+ LocalSyncInfoFetcher.getInstance().close();
// delete all directory
cleanAllDir();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 1ad3b5f..5959ba0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -692,6 +692,22 @@
return map;
}
+ public static Map<String, String> readMap(InputStream inputStream) throws IOException {
+ int length = readInt(inputStream);
+ if (length == NO_BYTE_TO_READ) {
+ return null;
+ }
+ Map<String, String> map = new HashMap<>(length);
+ for (int i = 0; i < length; i++) {
+ // key
+ String key = readString(inputStream);
+ // value
+ String value = readString(inputStream);
+ map.put(key, value);
+ }
+ return map;
+ }
+
public static LinkedHashMap<String, String> readLinkedHashMap(ByteBuffer buffer) {
int length = readInt(buffer);
if (length == NO_BYTE_TO_READ) {