// blob: 6a8a3070e35e1f52a96d5d478158d24822697245 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.writelog.recover;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
 * Tests {@link LogReplayer}: plans are written to a log node, the node is closed, and the replayer
 * is then expected to rebuild the memtable contents, the modification file and the per-device time
 * range of the {@link TsFileResource} from those logged plans.
 */
public class LogReplayerTest {

  /** Sets up the test environment through {@link EnvironmentUtils#envSetUp()}. */
  @Before
  public void before() {
    EnvironmentUtils.envSetUp();
  }

  /** Cleans the test environment through {@link EnvironmentUtils#cleanEnv()}. */
  @After
  public void after() throws IOException, StorageEngineException {
    EnvironmentUtils.cleanEnv();
  }

  /**
   * Writes row inserts, one tablet insert and one deletion to a log node, replays them, and checks
   * the memtable, the modification file and the resource start/end times.
   */
  @Test
  public void test()
      throws IOException, StorageGroupProcessorException, QueryProcessException, MetadataException {
    String logNodePrefix = "testLogNode";
    File tsFile = SystemFileFactory.INSTANCE.getFile("temp", "1-1-1.tsfile");
    File modF = SystemFileFactory.INSTANCE.getFile("test.mod");
    ModificationFile modFile = new ModificationFile(modF.getPath());
    TsFileResource tsFileResource = new TsFileResource(tsFile);
    IMemTable memTable = new PrimitiveMemTable();
    IoTDB.metaManager.setStorageGroup(new PartialPath("root.sg"));

    try {
      // Register root.sg.device{0..5}.sensor{0..5}, every series typed INT64.
      for (int i = 0; i <= 5; i++) {
        for (int j = 0; j <= 5; j++) {
          IoTDB.metaManager.createTimeseries(
              new PartialPath("root.sg.device" + i + ".sensor" + j),
              TSDataType.INT64,
              TSEncoding.PLAIN,
              TSFileDescriptor.getInstance().getConfig().getCompressor(),
              Collections.emptyMap());
        }
      }

      LogReplayer replayer =
          new LogReplayer(
              logNodePrefix, tsFile.getPath(), modFile, tsFileResource, memTable, false);
      WriteLogNode node =
          MultiFileLogNodeManager.getInstance()
              .getNode(logNodePrefix + tsFile.getName(), () -> allocateWalBuffers());

      // device0: sensor0 at time 100 and sensor1 at time 2, so its time range is [2, 100].
      node.write(
          new InsertRowPlan(
              new PartialPath("root.sg.device0"),
              100,
              "sensor0",
              TSDataType.INT64,
              String.valueOf(0)));
      node.write(
          new InsertRowPlan(
              new PartialPath("root.sg.device0"),
              2,
              "sensor1",
              TSDataType.INT64,
              String.valueOf(0)));
      // device{i}.sensor{i} gets a single point (time = i, value = i) for i in 1..4.
      for (int i = 1; i < 5; i++) {
        node.write(
            new InsertRowPlan(
                new PartialPath("root.sg.device" + i),
                i,
                "sensor" + i,
                TSDataType.INT64,
                String.valueOf(i)));
      }
      node.write(insertTablePlan());
      // The deletion range [0, 200] covers the only point written to device0.sensor0.
      DeletePlan deletePlan = new DeletePlan(0, 200, new PartialPath("root.sg.device0.sensor0"));
      node.write(deletePlan);
      node.close();

      replayer.replayLogs(() -> allocateWalBuffers());

      // Row inserts: device0.sensor0 must be empty (deleted), the others hold exactly one point.
      for (int i = 0; i < 5; i++) {
        ReadOnlyMemChunk memChunk =
            memTable.query(
                "root.sg.device" + i,
                "sensor" + i,
                new MeasurementSchema(
                    "sensor" + i,
                    TSDataType.INT64,
                    TSEncoding.RLE,
                    CompressionType.UNCOMPRESSED,
                    Collections.emptyMap()),
                Long.MIN_VALUE,
                null);
        IPointReader iterator = memChunk.getPointReader();
        if (i == 0) {
          assertFalse(iterator.hasNextTimeValuePair());
        } else {
          assertTrue(iterator.hasNextTimeValuePair());
          TimeValuePair timeValuePair = iterator.nextTimeValuePair();
          assertEquals(i, timeValuePair.getTimestamp());
          assertEquals(i, timeValuePair.getValue().getLong());
          assertFalse(iterator.hasNextTimeValuePair());
        }
      }

      // The deletion must have been recorded as the single entry of the modification file.
      Modification[] mods = modFile.getModifications().toArray(new Modification[0]);
      assertEquals(1, mods.length);
      assertEquals("root.sg.device0.sensor0", mods[0].getPathString());
      assertEquals(0, mods[0].getFileOffset());
      assertEquals(200, ((Deletion) mods[0]).getEndTime());

      // Resource time ranges follow the replayed inserts.
      assertEquals(2, tsFileResource.getStartTime("root.sg.device0"));
      assertEquals(100, tsFileResource.getEndTime("root.sg.device0"));
      for (int i = 1; i < 5; i++) {
        assertEquals(i, tsFileResource.getStartTime("root.sg.device" + i));
        assertEquals(i, tsFileResource.getEndTime("root.sg.device" + i));
      }

      // test insert tablet
      for (int i = 0; i < 2; i++) {
        ReadOnlyMemChunk memChunk =
            memTable.query(
                "root.sg.device5",
                "sensor" + i,
                new MeasurementSchema(
                    "sensor" + i,
                    TSDataType.INT64,
                    TSEncoding.PLAIN,
                    CompressionType.UNCOMPRESSED,
                    Collections.emptyMap()),
                Long.MIN_VALUE,
                null);
        if (i == 0) {
          // sensor0 was logged as BOOLEAN while the series is INT64, so no chunk is expected.
          assertNull(memChunk);
        } else {
          IPointReader iterator = memChunk.getPointReader();
          for (int time = 0; time < 100; time++) {
            // Fail with a clear assertion (not an exception from the reader) if data is missing.
            assertTrue(iterator.hasNextTimeValuePair());
            TimeValuePair timeValuePair = iterator.nextTimeValuePair();
            assertEquals(time, timeValuePair.getTimestamp());
            assertEquals(time, timeValuePair.getValue().getLong());
          }
          // The tablet holds exactly 100 rows, nothing more may have been replayed.
          assertFalse(iterator.hasNextTimeValuePair());
        }
      }
    } finally {
      // Nested try/finally: a failure in one cleanup step must not skip the remaining ones.
      try {
        modFile.close();
      } finally {
        try {
          MultiFileLogNodeManager.getInstance()
              .deleteNode(
                  logNodePrefix + tsFile.getName(),
                  (ByteBuffer[] byteBuffers) -> {
                    for (ByteBuffer byteBuffer : byteBuffers) {
                      MmapUtil.clean((MappedByteBuffer) byteBuffer);
                    }
                  });
        } finally {
          modF.delete();
          tsFile.delete();
          tsFile.getParentFile().delete();
        }
      }
    }
  }

  /**
   * Allocates the pair of direct buffers handed to the log node and to the replayer; each buffer
   * is half of the configured WAL buffer size.
   *
   * @return a two-element array of freshly allocated direct buffers
   */
  private static ByteBuffer[] allocateWalBuffers() {
    int halfWalBufferSize = IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2;
    ByteBuffer[] byteBuffers = new ByteBuffer[2];
    byteBuffers[0] = ByteBuffer.allocateDirect(halfWalBufferSize);
    byteBuffers[1] = ByteBuffer.allocateDirect(halfWalBufferSize);
    return byteBuffers;
  }

  /**
   * Builds a tablet insertion of 100 rows (timestamps 0..99) for {@code root.sg.device5}.
   *
   * <p>The test registers both series as INT64. Here {@code sensor0} is deliberately declared as
   * BOOLEAN (all values {@code false}), so the test expects no data for it after replay, while
   * {@code sensor1} is declared as INT64 with each value equal to its timestamp.
   *
   * @return the tablet plan covering rows {@code [0, 100)}
   * @throws IllegalPathException declared because the device {@link PartialPath} is constructed
   *     here
   */
  public InsertTabletPlan insertTablePlan() throws IllegalPathException {
    String[] measurements = new String[2];
    measurements[0] = "sensor0";
    measurements[1] = "sensor1";

    // The InsertTabletPlan constructor used here takes data types as ordinals.
    List<Integer> dataTypes = new ArrayList<>();
    dataTypes.add(TSDataType.BOOLEAN.ordinal());
    dataTypes.add(TSDataType.INT64.ordinal());

    String deviceId = "root.sg.device5";

    IMeasurementMNode[] mNodes = new IMeasurementMNode[2];
    mNodes[0] = new MeasurementMNode(null, "sensor0", null, null);
    mNodes[1] = new MeasurementMNode(null, "sensor1", null, null);

    InsertTabletPlan insertTabletPlan =
        new InsertTabletPlan(new PartialPath(deviceId), measurements, dataTypes);

    long[] times = new long[100];
    Object[] columns = new Object[2];
    columns[0] = new boolean[100];
    columns[1] = new long[100];

    for (long r = 0; r < 100; r++) {
      times[(int) r] = r;
      ((boolean[]) columns[0])[(int) r] = false;
      ((long[]) columns[1])[(int) r] = r;
    }
    insertTabletPlan.setTimes(times);
    insertTabletPlan.setColumns(columns);
    insertTabletPlan.setRowCount(times.length);
    insertTabletPlan.setMeasurementMNodes(mNodes);
    insertTabletPlan.setStart(0);
    insertTabletPlan.setEnd(100);
    return insertTabletPlan;
  }
}