blob: f8fbe1542af8b451f831f1f39df4323f773ebd21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTask;
import org.apache.iotdb.db.engine.compaction.log.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.TriggerExecutionException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class DataRegionTest {
// Shared IoTDB configuration instance; several tests override settings on it temporarily.
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final Logger logger = LoggerFactory.getLogger(DataRegionTest.class);
// Storage group name passed to the DummyDataRegion that setUp() creates.
private String storageGroup = "root.vehicle.d0";
// System directory passed to the DummyDataRegion: "info" appended to the test output data dir.
private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
// Device and measurement that most tests write to and query.
private String deviceId = "root.vehicle.d0";
private String measurementId = "s0";
// Region under test; re-created by setUp() before every test.
private DataRegion dataRegion;
// Query context handed to dataRegion.query(...) in the tests below.
private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
/**
 * Builds the per-test fixture: initialises the metadata helper and the test environment, creates
 * the {@code DummyDataRegion} stored in {@link #dataRegion}, and starts the compaction task
 * manager.
 *
 * @throws Exception if any of the initialisation steps fails
 */
@Before
public void setUp() throws Exception {
MetadataManagerHelper.initMetadata();
EnvironmentUtils.envSetUp();
dataRegion = new DummyDataRegion(systemDir, storageGroup);
CompactionTaskManager.getInstance().start();
}
/**
 * Releases the per-test fixture: deletes the data files of the region under test, cleans the test
 * environment and the output data directory, and stops the compaction task manager.
 *
 * <p>JUnit runs {@code @After} methods even when {@code @Before} threw, so {@link #dataRegion}
 * may still be {@code null} here; the guard keeps a failed {@code setUp()} from being masked by a
 * {@link NullPointerException} raised during clean-up.
 *
 * @throws Exception if any clean-up step fails
 */
@After
public void tearDown() throws Exception {
  if (dataRegion != null) {
    dataRegion.syncDeleteDataFiles();
  }
  EnvironmentUtils.cleanEnv();
  EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
  CompactionTaskManager.getInstance().stop();
  // The environment is cleaned a second time after the compaction task manager has stopped.
  EnvironmentUtils.cleanEnv();
}
/**
 * Builds an {@link InsertRowNode} carrying the same device, timestamp and data points as the
 * given {@link TSRecord}.
 *
 * <p>Every data point contributes its measurement id, data type and value; the schema attached
 * for each measurement uses {@code PLAIN} encoding and no compression.
 *
 * @param record the record to convert
 * @return an insert node for {@code record.deviceId} at {@code record.time}
 * @throws IllegalPathException if {@code record.deviceId} cannot be turned into a path
 */
public static InsertRowNode buildInsertRowNodeByTSRecord(TSRecord record)
    throws IllegalPathException {
  final int pointCount = record.dataPointList.size();
  String[] names = new String[pointCount];
  MeasurementSchema[] schemas = new MeasurementSchema[pointCount];
  TSDataType[] types = new TSDataType[pointCount];
  Object[] pointValues = new Object[pointCount];

  int index = 0;
  for (DataPoint point : record.dataPointList) {
    String name = point.getMeasurementId();
    TSDataType type = point.getType();
    names[index] = name;
    schemas[index] =
        new MeasurementSchema(name, type, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED);
    types[index] = type;
    pointValues[index] = point.getValue();
    index++;
  }

  QueryId writeQueryId = new QueryId("test_write");
  InsertRowNode node =
      new InsertRowNode(
          writeQueryId.genPlanNodeId(),
          new PartialPath(record.deviceId),
          false,
          names,
          types,
          record.time,
          pointValues,
          false);
  node.setMeasurementSchemas(schemas);
  return node;
}
/**
 * Deletes a time range from an unsealed unsequence file and checks what remains in memory.
 *
 * <p>Timestamp 10000 is written and closed first, then timestamps 1..10 are written and flushed
 * and 11..20 are written without flushing. After deleting the range [0, 15], the unsequence
 * processors must yield exactly one resource whose in-memory chunks start at timestamp 16 and
 * increase by one per point.
 */
@Test
public void testUnseqUnsealedDelete()
    throws WriteProcessException, IOException, MetadataException, TriggerExecutionException {
  // One closed file at a large timestamp; the later writes use smaller timestamps.
  TSRecord firstRecord = new TSRecord(10000, deviceId);
  firstRecord.addTuple(
      DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
  dataRegion.insert(buildInsertRowNodeByTSRecord(firstRecord));
  dataRegion.syncCloseAllWorkingTsFileProcessors();

  // First batch of smaller timestamps, flushed from memory.
  for (int ts = 1; ts <= 10; ts++) {
    TSRecord flushedRecord = new TSRecord(ts, deviceId);
    flushedRecord.addTuple(
        DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(ts)));
    dataRegion.insert(buildInsertRowNodeByTSRecord(flushedRecord));
  }
  for (TsFileProcessor processor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
    processor.syncFlush();
  }

  // Second batch, not flushed.
  for (int ts = 11; ts <= 20; ts++) {
    TSRecord pendingRecord = new TSRecord(ts, deviceId);
    pendingRecord.addTuple(
        DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(ts)));
    dataRegion.insert(buildInsertRowNodeByTSRecord(pendingRecord));
  }

  MeasurementSchema querySchema =
      new MeasurementSchema(
          measurementId,
          TSDataType.INT32,
          TSEncoding.RLE,
          CompressionType.UNCOMPRESSED,
          Collections.emptyMap());
  PartialPath seriesPath = new MeasurementPath(deviceId, measurementId, querySchema);

  dataRegion.delete(new PartialPath(deviceId, measurementId), 0, 15L, -1, null);

  List<TsFileResource> queried = new ArrayList<>();
  for (TsFileProcessor processor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
    processor.query(
        Collections.singletonList(seriesPath), EnvironmentUtils.TEST_QUERY_CONTEXT, queried);
  }
  Assert.assertEquals(1, queried.size());

  // Points that survive the delete must be consecutive, starting right after the deleted range.
  long expectedTime = 16;
  List<ReadOnlyMemChunk> chunks = queried.get(0).getReadOnlyMemChunk(seriesPath);
  for (ReadOnlyMemChunk chunk : chunks) {
    IPointReader reader = chunk.getPointReader();
    while (reader.hasNextTimeValuePair()) {
      TimeValuePair pair = reader.nextTimeValuePair();
      Assert.assertEquals(expectedTime, pair.getTimestamp());
      expectedTime++;
    }
  }
}
/**
 * Writes timestamps 1..10, requesting an asynchronous close after every row, and expects the
 * query to report ten sequence resources that are all closed.
 */
@Test
public void testSequenceSyncClose()
    throws WriteProcessException, QueryProcessException, IllegalPathException,
        TriggerExecutionException {
  for (int j = 1; j <= 10; j++) {
    TSRecord record = new TSRecord(j, deviceId);
    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
    dataRegion.asyncCloseAllWorkingTsFileProcessors();
  }

  // Grace period for the asynchronous close requests before the synchronous close below.
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    // Keep the interrupt visible to the code that follows instead of silently dropping it.
    Thread.currentThread().interrupt();
    logger.warn("Interrupted while waiting for asynchronous close tasks", e);
  }
  dataRegion.syncCloseAllWorkingTsFileProcessors();

  QueryDataSource queryDataSource =
      dataRegion.query(
          Collections.singletonList(new PartialPath(deviceId, measurementId)),
          deviceId,
          context,
          null,
          null);
  Assert.assertEquals(10, queryDataSource.getSeqResources().size());
  for (TsFileResource resource : queryDataSource.getSeqResources()) {
    Assert.assertTrue(resource.isClosed());
  }
}
/**
 * Writes timestamps 0..9 (one asynchronous close per row), removes every partition, writes the
 * same timestamps again and expects the query to report no unsequence resources.
 */
@Test
public void testInsertDataAndRemovePartitionAndInsert()
    throws WriteProcessException, QueryProcessException, IllegalPathException,
        TriggerExecutionException {
  // Round 0 writes the data that is removed afterwards; round 1 writes the same range again.
  for (int round = 0; round < 2; round++) {
    for (int ts = 0; ts < 10; ts++) {
      TSRecord row = new TSRecord(ts, deviceId);
      row.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(ts)));
      dataRegion.insert(buildInsertRowNodeByTSRecord(row));
      dataRegion.asyncCloseAllWorkingTsFileProcessors();
    }
    dataRegion.syncCloseAllWorkingTsFileProcessors();
    if (round == 0) {
      // The filter accepts every (storage group, partition) pair.
      dataRegion.removePartitions((storageGroupName, timePartitionId) -> true);
    }
  }

  PartialPath seriesPath = new PartialPath(deviceId, measurementId);
  QueryDataSource dataSource =
      dataRegion.query(Collections.singletonList(seriesPath), deviceId, context, null, null);
  Assert.assertEquals(0, dataSource.getUnseqResources().size());
}
/**
 * Writes two overlapping tablets for {@code root.vehicle.d0} (measurements s0/INT32 and
 * s1/INT64) and checks the resulting resources.
 *
 * <p>The first tablet holds timestamps 0..99, the second 50..149; each write is followed by an
 * asynchronous close. The query must report two sequence resources, all closed, and one
 * unsequence resource.
 */
@Test
public void testIoTDBTabletWriteAndSyncClose()
    throws QueryProcessException, IllegalPathException, TriggerExecutionException,
        WriteProcessException {
  String[] measurements = new String[] {"s0", "s1"};
  TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
  MeasurementSchema[] measurementSchemas =
      new MeasurementSchema[] {
        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN),
        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)
      };

  long[] times = new long[100];
  int[] intColumn = new int[100];
  long[] longColumn = new long[100];
  Object[] columns = new Object[] {intColumn, longColumn};

  // First tablet: timestamps 0..99.
  for (int r = 0; r < 100; r++) {
    times[r] = r;
    intColumn[r] = 1;
    longColumn[r] = 1;
  }
  InsertTabletNode insertTabletNode1 =
      new InsertTabletNode(
          new QueryId("test_write").genPlanNodeId(),
          new PartialPath("root.vehicle.d0"),
          false,
          measurements,
          dataTypes,
          times,
          null,
          columns,
          times.length);
  insertTabletNode1.setMeasurementSchemas(measurementSchemas);
  dataRegion.insertTablet(insertTabletNode1);
  dataRegion.asyncCloseAllWorkingTsFileProcessors();

  // Second tablet: timestamps 50..149. The bound must be 150 so that all 100 slots are
  // rewritten; stopping at 149 left the last slot holding the stale timestamp 99 from the
  // first tablet, which made the time column unsorted.
  for (int r = 50; r < 150; r++) {
    times[r - 50] = r;
    intColumn[r - 50] = 1;
    longColumn[r - 50] = 1;
  }
  InsertTabletNode insertTabletNode2 =
      new InsertTabletNode(
          new QueryId("test_write").genPlanNodeId(),
          new PartialPath("root.vehicle.d0"),
          false,
          measurements,
          dataTypes,
          times,
          null,
          columns,
          times.length);
  insertTabletNode2.setMeasurementSchemas(measurementSchemas);
  dataRegion.insertTablet(insertTabletNode2);
  dataRegion.asyncCloseAllWorkingTsFileProcessors();
  dataRegion.syncCloseAllWorkingTsFileProcessors();

  QueryDataSource queryDataSource =
      dataRegion.query(
          Collections.singletonList(new PartialPath(deviceId, measurementId)),
          deviceId,
          context,
          null,
          null);
  Assert.assertEquals(2, queryDataSource.getSeqResources().size());
  Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
  for (TsFileResource resource : queryDataSource.getSeqResources()) {
    Assert.assertTrue(resource.isClosed());
  }
}
/**
 * Writes timestamps 21..30 in ascending order and then 10..1 in descending order, requesting an
 * asynchronous close after every row, and expects ten sequence and ten unsequence resources,
 * all of them closed.
 */
@Test
public void testSeqAndUnSeqSyncClose()
    throws WriteProcessException, QueryProcessException, IllegalPathException,
        TriggerExecutionException {
  // Ascending batch.
  int ts = 21;
  while (ts <= 30) {
    TSRecord row = new TSRecord(ts, deviceId);
    row.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(ts)));
    dataRegion.insert(buildInsertRowNodeByTSRecord(row));
    dataRegion.asyncCloseAllWorkingTsFileProcessors();
    ts++;
  }
  dataRegion.syncCloseAllWorkingTsFileProcessors();

  // Descending batch with timestamps below everything written so far.
  ts = 10;
  while (ts >= 1) {
    TSRecord row = new TSRecord(ts, deviceId);
    row.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(ts)));
    dataRegion.insert(buildInsertRowNodeByTSRecord(row));
    dataRegion.asyncCloseAllWorkingTsFileProcessors();
    ts--;
  }
  dataRegion.syncCloseAllWorkingTsFileProcessors();

  PartialPath seriesPath = new PartialPath(deviceId, measurementId);
  QueryDataSource dataSource =
      dataRegion.query(Collections.singletonList(seriesPath), deviceId, context, null, null);

  Assert.assertEquals(10, dataSource.getSeqResources().size());
  Assert.assertEquals(10, dataSource.getUnseqResources().size());
  for (TsFileResource seqResource : dataSource.getSeqResources()) {
    Assert.assertTrue(seqResource.isClosed());
  }
  for (TsFileResource unseqResource : dataSource.getUnseqResources()) {
    Assert.assertTrue(unseqResource.isClosed());
  }
}
/**
 * With {@code enableDiscardOutOfOrderData} switched on, writes timestamps 21..30 and then 10..1
 * (one asynchronous close per row) and expects ten closed sequence resources and no unsequence
 * resource.
 *
 * <p>The configuration flag is restored in a {@code finally} block so that a failing assertion
 * cannot leak the override into other tests.
 */
@Test
public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
    throws WriteProcessException, QueryProcessException, IllegalPathException, IOException,
        TriggerExecutionException {
  boolean defaultValue = config.isEnableDiscardOutOfOrderData();
  config.setEnableDiscardOutOfOrderData(true);
  try {
    for (int j = 21; j <= 30; j++) {
      TSRecord record = new TSRecord(j, deviceId);
      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
      dataRegion.asyncCloseAllWorkingTsFileProcessors();
    }
    dataRegion.syncCloseAllWorkingTsFileProcessors();

    for (int j = 10; j >= 1; j--) {
      TSRecord record = new TSRecord(j, deviceId);
      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
      dataRegion.asyncCloseAllWorkingTsFileProcessors();
    }
    dataRegion.syncCloseAllWorkingTsFileProcessors();

    for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
      tsfileProcessor.syncFlush();
    }

    QueryDataSource queryDataSource =
        dataRegion.query(
            Collections.singletonList(new PartialPath(deviceId, measurementId)),
            deviceId,
            context,
            null,
            null);
    Assert.assertEquals(10, queryDataSource.getSeqResources().size());
    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
    for (TsFileResource resource : queryDataSource.getSeqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
    for (TsFileResource resource : queryDataSource.getUnseqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
  } finally {
    config.setEnableDiscardOutOfOrderData(defaultValue);
  }
}
/**
 * Tablet variant of the discard-out-of-order test with partitioning enabled and a partition
 * interval of 100.
 *
 * <p>The first tablet holds timestamps 0..99 and the second 50..149, both for
 * {@code root.vehicle.d0} (s0/INT32, s1/INT64). The query must report two closed sequence
 * resources and no unsequence resource.
 *
 * <p>All three configuration overrides are restored in a {@code finally} block so that a failing
 * assertion cannot leak them into other tests.
 */
@Test
public void testEnableDiscardOutOfOrderDataForInsertTablet1()
    throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException,
        WriteProcessException {
  boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
  long defaultTimePartition = config.getPartitionInterval();
  boolean defaultEnablePartition = config.isEnablePartition();
  config.setEnableDiscardOutOfOrderData(true);
  config.setEnablePartition(true);
  config.setPartitionInterval(100);
  try {
    String[] measurements = new String[] {"s0", "s1"};
    TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
    MeasurementSchema[] measurementSchemas =
        new MeasurementSchema[] {
          new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN),
          new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)
        };

    long[] times = new long[100];
    int[] intColumn = new int[100];
    long[] longColumn = new long[100];
    Object[] columns = new Object[] {intColumn, longColumn};

    // First tablet: timestamps 0..99.
    for (int r = 0; r < 100; r++) {
      times[r] = r;
      intColumn[r] = 1;
      longColumn[r] = 1;
    }
    InsertTabletNode insertTabletNode1 =
        new InsertTabletNode(
            new QueryId("test_write").genPlanNodeId(),
            new PartialPath("root.vehicle.d0"),
            false,
            measurements,
            dataTypes,
            times,
            null,
            columns,
            times.length);
    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
    dataRegion.insertTablet(insertTabletNode1);
    dataRegion.asyncCloseAllWorkingTsFileProcessors();

    // Second tablet: timestamps 50..149, written into the same arrays.
    for (int r = 149; r >= 50; r--) {
      times[r - 50] = r;
      intColumn[r - 50] = 1;
      longColumn[r - 50] = 1;
    }
    InsertTabletNode insertTabletNode2 =
        new InsertTabletNode(
            new QueryId("test_write").genPlanNodeId(),
            new PartialPath("root.vehicle.d0"),
            false,
            measurements,
            dataTypes,
            times,
            null,
            columns,
            times.length);
    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
    dataRegion.insertTablet(insertTabletNode2);
    dataRegion.asyncCloseAllWorkingTsFileProcessors();
    dataRegion.syncCloseAllWorkingTsFileProcessors();

    for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
      tsfileProcessor.syncFlush();
    }

    QueryDataSource queryDataSource =
        dataRegion.query(
            Collections.singletonList(new PartialPath(deviceId, measurementId)),
            deviceId,
            context,
            null,
            null);
    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
    for (TsFileResource resource : queryDataSource.getSeqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
  } finally {
    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
    config.setPartitionInterval(defaultTimePartition);
    config.setEnablePartition(defaultEnablePartition);
  }
}
/**
 * Tablet variant of the discard-out-of-order test with partitioning enabled and a partition
 * interval of 1200.
 *
 * <p>The first tablet holds timestamps 0..1199 and the second 50..1249, both for
 * {@code root.vehicle.d0} (s0/INT32, s1/INT64). The query must report two closed sequence
 * resources and no unsequence resource.
 *
 * <p>All three configuration overrides are restored in a {@code finally} block so that a failing
 * assertion cannot leak them into other tests.
 */
@Test
public void testEnableDiscardOutOfOrderDataForInsertTablet2()
    throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException,
        WriteProcessException {
  boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
  long defaultTimePartition = config.getPartitionInterval();
  boolean defaultEnablePartition = config.isEnablePartition();
  config.setEnableDiscardOutOfOrderData(true);
  config.setEnablePartition(true);
  config.setPartitionInterval(1200);
  try {
    String[] measurements = new String[] {"s0", "s1"};
    TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
    MeasurementSchema[] measurementSchemas =
        new MeasurementSchema[] {
          new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN),
          new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)
        };

    long[] times = new long[1200];
    int[] intColumn = new int[1200];
    long[] longColumn = new long[1200];
    Object[] columns = new Object[] {intColumn, longColumn};

    // First tablet: timestamps 0..1199.
    for (int r = 0; r < 1200; r++) {
      times[r] = r;
      intColumn[r] = 1;
      longColumn[r] = 1;
    }
    InsertTabletNode insertTabletNode1 =
        new InsertTabletNode(
            new QueryId("test_write").genPlanNodeId(),
            new PartialPath("root.vehicle.d0"),
            false,
            measurements,
            dataTypes,
            times,
            null,
            columns,
            times.length);
    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
    dataRegion.insertTablet(insertTabletNode1);
    dataRegion.asyncCloseAllWorkingTsFileProcessors();

    // Second tablet: timestamps 50..1249, written into the same arrays.
    for (int r = 1249; r >= 50; r--) {
      times[r - 50] = r;
      intColumn[r - 50] = 1;
      longColumn[r - 50] = 1;
    }
    InsertTabletNode insertTabletNode2 =
        new InsertTabletNode(
            new QueryId("test_write").genPlanNodeId(),
            new PartialPath("root.vehicle.d0"),
            false,
            measurements,
            dataTypes,
            times,
            null,
            columns,
            times.length);
    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
    dataRegion.insertTablet(insertTabletNode2);
    dataRegion.asyncCloseAllWorkingTsFileProcessors();
    dataRegion.syncCloseAllWorkingTsFileProcessors();

    for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
      tsfileProcessor.syncFlush();
    }

    QueryDataSource queryDataSource =
        dataRegion.query(
            Collections.singletonList(new PartialPath(deviceId, measurementId)),
            deviceId,
            context,
            null,
            null);
    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
    for (TsFileResource resource : queryDataSource.getSeqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
  } finally {
    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
    config.setPartitionInterval(defaultTimePartition);
    config.setEnablePartition(defaultEnablePartition);
  }
}
/**
 * Tablet variant of the discard-out-of-order test with partitioning enabled and a partition
 * interval of 1000, which is smaller than the 1200 rows of each tablet.
 *
 * <p>The first tablet holds timestamps 0..1199 and the second 50..1249, both for
 * {@code root.vehicle.d0} (s0/INT32, s1/INT64). The query must report two closed sequence
 * resources and no unsequence resource.
 *
 * <p>All three configuration overrides are restored in a {@code finally} block so that a failing
 * assertion cannot leak them into other tests.
 */
@Test
public void testEnableDiscardOutOfOrderDataForInsertTablet3()
    throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException,
        WriteProcessException {
  boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
  long defaultTimePartition = config.getPartitionInterval();
  boolean defaultEnablePartition = config.isEnablePartition();
  config.setEnableDiscardOutOfOrderData(true);
  config.setEnablePartition(true);
  config.setPartitionInterval(1000);
  try {
    String[] measurements = new String[] {"s0", "s1"};
    TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
    MeasurementSchema[] measurementSchemas =
        new MeasurementSchema[] {
          new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN),
          new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)
        };

    long[] times = new long[1200];
    int[] intColumn = new int[1200];
    long[] longColumn = new long[1200];
    Object[] columns = new Object[] {intColumn, longColumn};

    // First tablet: timestamps 0..1199.
    for (int r = 0; r < 1200; r++) {
      times[r] = r;
      intColumn[r] = 1;
      longColumn[r] = 1;
    }
    InsertTabletNode insertTabletNode1 =
        new InsertTabletNode(
            new QueryId("test_write").genPlanNodeId(),
            new PartialPath("root.vehicle.d0"),
            false,
            measurements,
            dataTypes,
            times,
            null,
            columns,
            times.length);
    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
    dataRegion.insertTablet(insertTabletNode1);
    dataRegion.asyncCloseAllWorkingTsFileProcessors();

    // Second tablet: timestamps 50..1249, written into the same arrays.
    for (int r = 1249; r >= 50; r--) {
      times[r - 50] = r;
      intColumn[r - 50] = 1;
      longColumn[r - 50] = 1;
    }
    InsertTabletNode insertTabletNode2 =
        new InsertTabletNode(
            new QueryId("test_write").genPlanNodeId(),
            new PartialPath("root.vehicle.d0"),
            false,
            measurements,
            dataTypes,
            times,
            null,
            columns,
            times.length);
    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
    dataRegion.insertTablet(insertTabletNode2);
    dataRegion.asyncCloseAllWorkingTsFileProcessors();
    dataRegion.syncCloseAllWorkingTsFileProcessors();

    for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
      tsfileProcessor.syncFlush();
    }

    QueryDataSource queryDataSource =
        dataRegion.query(
            Collections.singletonList(new PartialPath(deviceId, measurementId)),
            deviceId,
            context,
            null,
            null);
    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
    for (TsFileResource resource : queryDataSource.getSeqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
  } finally {
    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
    config.setPartitionInterval(defaultTimePartition);
    config.setEnablePartition(defaultEnablePartition);
  }
}
/**
 * Writes ten sequence rows (21..30) and ten unsequence rows (10..1), one asynchronous close per
 * row, triggers compaction with sequence and unsequence space compaction enabled and a maximum
 * of nine inner-compaction candidate files, waits until no compaction task is executing, and
 * expects two sequence resources with every reported resource closed.
 *
 * <p>The wait is bounded to 120 seconds; exceeding it, or being interrupted while waiting, fails
 * the test. The three configuration overrides are restored in a {@code finally} block so that a
 * failure cannot leak them into other tests.
 */
@Test
public void testMerge()
    throws WriteProcessException, QueryProcessException, IllegalPathException,
        TriggerExecutionException {
  IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
  int originCandidateFileNum = ioTDBConfig.getMaxInnerCompactionCandidateFileNum();
  boolean originEnableSeqSpaceCompaction = ioTDBConfig.isEnableSeqSpaceCompaction();
  boolean originEnableUnseqSpaceCompaction = ioTDBConfig.isEnableUnseqSpaceCompaction();
  ioTDBConfig.setMaxInnerCompactionCandidateFileNum(9);
  ioTDBConfig.setEnableSeqSpaceCompaction(true);
  ioTDBConfig.setEnableUnseqSpaceCompaction(true);
  try {
    for (int j = 21; j <= 30; j++) {
      TSRecord record = new TSRecord(j, deviceId);
      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
      dataRegion.asyncCloseAllWorkingTsFileProcessors();
    }
    dataRegion.syncCloseAllWorkingTsFileProcessors();

    for (int j = 10; j >= 1; j--) {
      TSRecord record = new TSRecord(j, deviceId);
      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
      dataRegion.asyncCloseAllWorkingTsFileProcessors();
    }
    dataRegion.syncCloseAllWorkingTsFileProcessors();

    dataRegion.compact();

    // Poll every 100 ms until the compaction task manager reports no executing task.
    long totalWaitingTime = 0;
    do {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // Preserve the interrupt and stop waiting; further sleeps would fail immediately anyway.
        Thread.currentThread().interrupt();
        Assert.fail("Interrupted while waiting for compaction tasks to finish");
      }
      totalWaitingTime += 100;
      if (totalWaitingTime % 1000 == 0) {
        logger.warn("has waited for {} seconds", totalWaitingTime / 1000);
      }
      if (totalWaitingTime > 120_000) {
        // Assert.fail throws, so no explicit break is needed to leave the loop.
        Assert.fail("Compaction tasks did not finish within 120 seconds");
      }
    } while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0);

    QueryDataSource queryDataSource =
        dataRegion.query(
            Collections.singletonList(new PartialPath(deviceId, measurementId)),
            deviceId,
            context,
            null,
            null);
    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
    for (TsFileResource resource : queryDataSource.getSeqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
    for (TsFileResource resource : queryDataSource.getUnseqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
  } finally {
    ioTDBConfig.setMaxInnerCompactionCandidateFileNum(originCandidateFileNum);
    ioTDBConfig.setEnableSeqSpaceCompaction(originEnableSeqSpaceCompaction);
    ioTDBConfig.setEnableUnseqSpaceCompaction(originEnableUnseqSpaceCompaction);
  }
}
/**
 * Queues an inner-space compaction over ten sequence files, deletes the storage group shortly
 * afterwards and verifies the aftermath: no source file, compaction target file or compaction
 * log file is left on disk, the system is not read-only, and the TsFile manager still allows
 * compaction.
 *
 * <p>The compaction configuration is restored through {@code CompactionConfigRestorer} whether
 * or not the checks pass.
 */
@Test
public void testDeleteStorageGroupWhenCompacting() throws Exception {
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(10);
  try {
    // Ten rows, each followed by an asynchronous close request.
    for (int ts = 0; ts < 10; ts++) {
      TSRecord row = new TSRecord(ts, deviceId);
      row.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(ts)));
      dataRegion.insert(buildInsertRowNodeByTSRecord(row));
      dataRegion.asyncCloseAllWorkingTsFileProcessors();
    }
    dataRegion.syncCloseAllWorkingTsFileProcessors();

    InnerSpaceCompactionTask compactionTask =
        new InnerSpaceCompactionTask(
            0,
            dataRegion.getTsFileManager(),
            dataRegion.getSequenceFileList(),
            true,
            new ReadChunkCompactionPerformer(dataRegion.getSequenceFileList()),
            new AtomicInteger(0),
            0);
    CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask);

    // Short pause after queueing the task, then delete the storage group and wait again.
    Thread.sleep(20);
    StorageEngine.getInstance().deleteStorageGroup(new PartialPath(storageGroup));
    Thread.sleep(500);

    for (TsFileResource sourceFile : dataRegion.getSequenceFileList()) {
      Assert.assertFalse(sourceFile.getTsFile().exists());
    }

    TsFileResource target =
        TsFileNameGenerator.getInnerCompactionTargetFileResource(
            dataRegion.getSequenceFileList(), true);
    Assert.assertFalse(target.getTsFile().exists());

    // The compaction log path is the target file path plus the inner compaction log suffix.
    String targetDirectory = target.getTsFile().getParent();
    String compactionLogPath =
        targetDirectory
            + File.separator
            + target.getTsFile().getName()
            + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX;
    File compactionLog = new File(compactionLogPath);
    Assert.assertFalse(compactionLog.exists());

    Assert.assertFalse(CommonDescriptor.getInstance().getConfig().isReadOnly());
    Assert.assertTrue(dataRegion.getTsFileManager().isAllowCompaction());
  } finally {
    new CompactionConfigRestorer().restoreCompactionConfig();
  }
}
/**
 * Creates one sequence memtable, enables timed flushing of sequence memtables with an interval
 * of 5, triggers {@code timedFlushSeqMemTable()} and expects the memtable count to drop to zero
 * once the processor and the flush manager report no flushing, pending or working tasks.
 *
 * <p>The two configuration overrides are restored in a {@code finally} block so that a failing
 * assertion cannot leak them into other tests.
 */
@Test
public void testTimedFlushSeqMemTable()
    throws IllegalPathException, InterruptedException, WriteProcessException,
        TriggerExecutionException, ShutdownException {
  // create one sequence memtable
  TSRecord record = new TSRecord(10000, deviceId);
  record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
  dataRegion.insert(buildInsertRowNodeByTSRecord(record));
  Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());

  // change config & reboot timed service
  boolean prevEnableTimedFlushSeqMemtable = config.isEnableTimedFlushSeqMemtable();
  long preFLushInterval = config.getSeqMemtableFlushInterval();
  config.setEnableTimedFlushSeqMemtable(true);
  config.setSeqMemtableFlushInterval(5);
  try {
    StorageEngine.getInstance().rebootTimedService();
    Thread.sleep(500);

    Assert.assertEquals(1, dataRegion.getWorkSequenceTsFileProcessors().size());
    TsFileProcessor tsFileProcessor =
        dataRegion.getWorkSequenceTsFileProcessors().iterator().next();
    FlushManager flushManager = FlushManager.getInstance();

    // flush the sequence memtable
    dataRegion.timedFlushSeqMemTable();

    // wait until memtable flush task is done; each iteration sleeps half a second
    int waitCnt = 0;
    while (tsFileProcessor.getFlushingMemTableSize() != 0
        || tsFileProcessor.isManagedByFlushManager()
        || flushManager.getNumberOfPendingTasks() != 0
        || flushManager.getNumberOfPendingSubTasks() != 0
        || flushManager.getNumberOfWorkingTasks() != 0
        || flushManager.getNumberOfWorkingSubTasks() != 0) {
      Thread.sleep(500);
      ++waitCnt;
      if (waitCnt % 10 == 0) {
        logger.info("already wait {} s", waitCnt / 2);
      }
    }

    Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
  } finally {
    config.setEnableTimedFlushSeqMemtable(prevEnableTimedFlushSeqMemtable);
    config.setSeqMemtableFlushInterval(preFLushInterval);
  }
}
/**
 * Creates and closes one sequence memtable, then creates one unsequence memtable, enables timed
 * flushing of unsequence memtables with an interval of 5, triggers
 * {@code timedFlushUnseqMemTable()} and expects the memtable count to drop to zero once the
 * processor and the flush manager report no flushing, pending or working tasks.
 *
 * <p>The two configuration overrides are restored in a {@code finally} block so that a failing
 * assertion cannot leak them into other tests.
 */
@Test
public void testTimedFlushUnseqMemTable()
    throws IllegalPathException, InterruptedException, WriteProcessException,
        TriggerExecutionException, ShutdownException {
  // create one sequence memtable & close
  TSRecord record = new TSRecord(10000, deviceId);
  record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
  dataRegion.insert(buildInsertRowNodeByTSRecord(record));
  Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
  dataRegion.syncCloseAllWorkingTsFileProcessors();
  Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());

  // create one unsequence memtable
  record = new TSRecord(1, deviceId);
  record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
  dataRegion.insert(buildInsertRowNodeByTSRecord(record));
  Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());

  // change config & reboot timed service
  boolean prevEnableTimedFlushUnseqMemtable = config.isEnableTimedFlushUnseqMemtable();
  long preFLushInterval = config.getUnseqMemtableFlushInterval();
  config.setEnableTimedFlushUnseqMemtable(true);
  config.setUnseqMemtableFlushInterval(5);
  try {
    StorageEngine.getInstance().rebootTimedService();
    Thread.sleep(500);

    Assert.assertEquals(1, dataRegion.getWorkUnsequenceTsFileProcessors().size());
    TsFileProcessor tsFileProcessor =
        dataRegion.getWorkUnsequenceTsFileProcessors().iterator().next();
    FlushManager flushManager = FlushManager.getInstance();

    // flush the unsequence memtable
    dataRegion.timedFlushUnseqMemTable();

    // wait until memtable flush task is done; each iteration sleeps half a second
    int waitCnt = 0;
    while (tsFileProcessor.getFlushingMemTableSize() != 0
        || tsFileProcessor.isManagedByFlushManager()
        || flushManager.getNumberOfPendingTasks() != 0
        || flushManager.getNumberOfPendingSubTasks() != 0
        || flushManager.getNumberOfWorkingTasks() != 0
        || flushManager.getNumberOfWorkingSubTasks() != 0) {
      Thread.sleep(500);
      ++waitCnt;
      if (waitCnt % 10 == 0) {
        logger.info("already wait {} s", waitCnt / 2);
      }
    }

    Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
  } finally {
    config.setEnableTimedFlushUnseqMemtable(prevEnableTimedFlushUnseqMemtable);
    config.setUnseqMemtableFlushInterval(preFLushInterval);
  }
}
/**
 * Checks that a delete only creates modification files for TsFiles that hold the deleted series.
 *
 * <p>Five TsFiles are written, one per 100-timestamp window:<br>
 * files 0, 2 and 4 hold devices d0 ~ d1 with time ranges 0 ~ 99, 200 ~ 299 and 400 ~ 499;<br>
 * files 1 and 3 hold devices d0 ~ d2 with time ranges 100 ~ 199 and 300 ~ 399.<br>
 * {@code root.vehicle.d2.s0} is then deleted in the ranges 50 ~ 150 and 150 ~ 450. The test
 * expects two modifications on file 1, one on file 3 and no modification file elsewhere, and
 * finally that deleting the storage group removes every TsFile from disk.
 */
@Test
public void testDeleteDataNotInFile()
    throws IllegalPathException, WriteProcessException, TriggerExecutionException,
        InterruptedException, IOException {
  for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
    // Even-numbered files get two devices, odd-numbered files get three.
    int deviceCount = (fileIndex % 2 == 0) ? 2 : 3;
    int startTime = fileIndex * 100;
    int endTime = startTime + 100;
    for (int device = 0; device < deviceCount; device++) {
      for (int ts = startTime; ts < endTime; ts++) {
        TSRecord row = new TSRecord(ts, "root.vehicle.d" + device);
        row.addTuple(
            DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(ts)));
        dataRegion.insert(buildInsertRowNodeByTSRecord(row));
      }
    }
    // Closing here ends the current file, so each window lands in its own TsFile.
    dataRegion.syncCloseAllWorkingTsFileProcessors();
  }

  // Range 50 ~ 150 overlaps d2 data only in file 1.
  dataRegion.delete(new PartialPath("root.vehicle.d2.s0"), 50, 150, 0, null);
  // Range 150 ~ 450 overlaps d2 data in file 1 and in file 3.
  dataRegion.delete(new PartialPath("root.vehicle.d2.s0"), 150, 450, 0, null);

  List<TsFileResource> sequenceFiles = dataRegion.getSequenceFileList();
  for (int fileIndex = 0; fileIndex < sequenceFiles.size(); fileIndex++) {
    TsFileResource tsFile = sequenceFiles.get(fileIndex);
    int expectedMods = (fileIndex == 1) ? 2 : ((fileIndex == 3) ? 1 : 0);
    if (expectedMods == 0) {
      Assert.assertFalse(tsFile.getModFile().exists());
    } else {
      Assert.assertTrue(tsFile.getModFile().exists());
      Assert.assertEquals(expectedMods, tsFile.getModFile().getModifications().size());
    }
  }

  StorageEngine.getInstance().deleteStorageGroup(new PartialPath(storageGroup));
  Thread.sleep(500);

  for (TsFileResource tsFile : dataRegion.getSequenceFileList()) {
    Assert.assertFalse(tsFile.getTsFile().exists());
  }
}
@Test
public void testDeleteDataNotInFlushingMemtable()
    throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
  // write 100 points with timestamps 0..99 for deviceId
  for (int time = 0; time < 100; time++) {
    DataPoint point =
        DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(time));
    TSRecord record = new TSRecord(time, deviceId);
    record.addTuple(point);
    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
  }

  TsFileResource resource = dataRegion.getTsFileManager().getTsFileList(true).get(0);
  TsFileProcessor processor = resource.getProcessor();
  // append the working memtable to the processor's flushing memtables
  processor.getFlushingMemTable().addLast(processor.getWorkMemTable());

  // Both deletions address devices d2 and d200. Judging by the final assertion, neither is
  // expected to match data held in the flushing memtable (deviceId is defined outside this
  // method - TODO confirm it is neither of these devices).
  dataRegion.delete(new PartialPath("root.vehicle.d2.s0"), 50, 70, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);

  dataRegion.syncCloseAllWorkingTsFileProcessors();
  // no deletion matched, so no modification file may have been created
  Assert.assertFalse(resource.getModFile().exists());
}
@Test
public void testDeleteDataInSeqFlushingMemtable()
    throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
  // write 100 points with timestamps 100..199 for deviceId
  for (int time = 100; time < 200; time++) {
    DataPoint point =
        DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(time));
    TSRecord record = new TSRecord(time, deviceId);
    record.addTuple(point);
    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
  }

  TsFileResource resource = dataRegion.getTsFileManager().getTsFileList(true).get(0);
  TsFileProcessor processor = resource.getProcessor();
  // append the working memtable to the processor's flushing memtables
  processor.getFlushingMemTable().addLast(processor.getWorkMemTable());

  // Deletions that must NOT be recorded: [50, 99] ends before the first written timestamp,
  // and d200 is a different device from the ones used above (presumably never written).
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 50, 99, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);

  // Deletions that must be recorded: each range intersects the written span 100..199.
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 100, 300, 0, null);

  dataRegion.syncCloseAllWorkingTsFileProcessors();
  // exactly the three intersecting deletions end up in the modification file
  Assert.assertTrue(resource.getModFile().exists());
  Assert.assertEquals(3, resource.getModFile().getModifications().size());
}
@Test
public void testDeleteDataInUnSeqFlushingMemtable()
    throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
  // Phase 1: write timestamps 100..199 for deviceId, then delete while the data is still in
  // the working memtable.
  for (int time = 100; time < 200; time++) {
    DataPoint point =
        DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(time));
    TSRecord record = new TSRecord(time, deviceId);
    record.addTuple(point);
    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
  }
  TsFileResource firstResource = dataRegion.getTsFileManager().getTsFileList(true).get(0);

  // ranges / devices that do not touch the working memtable
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 50, 99, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
  // ranges that intersect the written span 100..199
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 100, 300, 0, null);
  dataRegion.syncCloseAllWorkingTsFileProcessors();
  // none of the deletions above is expected to produce a modification file
  Assert.assertFalse(firstResource.getModFile().exists());

  // Phase 2: write timestamps 50..99, which precede the data written in phase 1
  // (unsequence data, per the original comment).
  for (int time = 50; time < 100; time++) {
    DataPoint point =
        DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(time));
    TSRecord record = new TSRecord(time, deviceId);
    record.addTuple(point);
    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
  }

  // ranges / devices that do not touch the new working memtable
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 200, 299, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
  // range inside the newly written span 50..99
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 80, 85, 0, null);
  // the resource from phase 1 must still be free of modifications
  Assert.assertFalse(firstResource.getModFile().exists());

  // Phase 3: append the working memtable of the first resource returned by
  // getTsFileList(false) to its flushing memtables, then delete again.
  TsFileResource secondResource = dataRegion.getTsFileManager().getTsFileList(false).get(0);
  TsFileProcessor processor = secondResource.getProcessor();
  processor.getFlushingMemTable().addLast(processor.getWorkMemTable());

  // ranges / devices outside the flushing memtable's span 50..99
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 0, 49, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 100, 200, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
  // ranges that intersect the flushing memtable's span 50..99
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 25, 50, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 50, 80, 0, null);
  dataRegion.delete(new PartialPath("root.vehicle.d0.s0"), 99, 150, 0, null);

  dataRegion.syncCloseAllWorkingTsFileProcessors();
  // exactly the three intersecting deletions end up in the modification file
  Assert.assertTrue(secondResource.getModFile().exists());
  Assert.assertEquals(3, secondResource.getModFile().getModifications().size());
}
/**
 * Minimal {@link DataRegion} subclass for tests. It forwards the given system info directory and
 * storage group name to the parent constructor, always passing {@code "0"} as the second
 * argument and a fresh {@link TsFileFlushPolicy.DirectFlushPolicy} as the flush policy.
 */
static class DummyDataRegion extends DataRegion {

  DummyDataRegion(final String systemInfoDir, final String storageGroupName)
      throws DataRegionException {
    super(
        systemInfoDir,
        "0",
        new TsFileFlushPolicy.DirectFlushPolicy(),
        storageGroupName);
  }
}
}