blob: 7b78c33eba36eb48e368dfd280903ccd4976de08 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTask;
import org.apache.iotdb.db.engine.compaction.log.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.TriggerExecutionException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class StorageGroupProcessorTest {
/** Shared IoTDB configuration; several tests temporarily override settings on it. */
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessorTest.class);

/** Storage group name handed to the region under test. */
private final String storageGroup = "root.vehicle.d0";

/** System info directory handed to the region under test. */
private final String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");

/** Device that every test writes to and queries. */
private final String deviceId = "root.vehicle.d0";

/** Measurement that every test writes to and queries. */
private final String measurementId = "s0";

/** Region under test; re-created in {@link #setUp()} before each test. */
private DataRegion processor;

/** Query context used for all queries issued by the tests. */
private final QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
/**
 * Builds the fixture before each test: initializes metadata and the test environment, creates the
 * {@link DummySGP} region under test from {@code systemDir} and {@code storageGroup}, and starts
 * the compaction task manager.
 */
@Before
public void setUp() throws Exception {
MetadataManagerHelper.initMetadata();
EnvironmentUtils.envSetUp();
processor = new DummySGP(systemDir, storageGroup);
CompactionTaskManager.getInstance().start();
}
/**
 * Tears the fixture down after each test: deletes the region's data files, cleans the test
 * environment and the output data directory, and stops the compaction task manager.
 */
@After
public void tearDown() throws Exception {
processor.syncDeleteDataFiles();
EnvironmentUtils.cleanEnv();
EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
CompactionTaskManager.getInstance().stop();
// NOTE(review): cleanEnv() is called a second time here, after the compaction manager has been
// stopped. Presumably one of the two calls is redundant; confirm before removing either.
EnvironmentUtils.cleanEnv();
}
/** Wraps {@code record} in an {@link InsertRowPlan} and inserts it into the region under test. */
private void insertToStorageGroupProcessor(TSRecord record)
    throws WriteProcessException, IllegalPathException, TriggerExecutionException {
  processor.insert(new InsertRowPlan(record));
}
/**
 * Writes timestamps 1..20 after a point at 10000 has been closed, deletes the range [0, 15], and
 * checks that the in-memory chunks returned by the working unsequence processors start at
 * timestamp 16 and increase by one.
 */
@Test
public void testUnseqUnsealedDelete()
    throws WriteProcessException, IOException, MetadataException, TriggerExecutionException {
  // One point at t=10000, then close every working file.
  TSRecord sealedPoint = new TSRecord(10000, deviceId);
  sealedPoint.addTuple(
      DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
  processor.insert(new InsertRowPlan(sealedPoint));
  processor.syncCloseAllWorkingTsFileProcessors();

  // First batch (1..10) is flushed through the working unsequence processors.
  for (int ts = 1; ts <= 10; ts++) {
    TSRecord point = new TSRecord(ts, deviceId);
    point.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, Integer.toString(ts)));
    processor.insert(new InsertRowPlan(point));
  }
  for (TsFileProcessor unseqProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
    unseqProcessor.syncFlush();
  }

  // Second batch (11..20) is written but not flushed.
  for (int ts = 11; ts <= 20; ts++) {
    TSRecord point = new TSRecord(ts, deviceId);
    point.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, Integer.toString(ts)));
    processor.insert(new InsertRowPlan(point));
  }

  MeasurementSchema schema =
      new MeasurementSchema(
          measurementId,
          TSDataType.INT32,
          TSEncoding.RLE,
          CompressionType.UNCOMPRESSED,
          Collections.emptyMap());
  PartialPath seriesPath = new MeasurementPath(deviceId, measurementId, schema);

  processor.delete(new PartialPath(deviceId, measurementId), 0, 15L, -1, null);

  List<PartialPath> queryPaths = Collections.singletonList(seriesPath);
  List<TsFileResource> queriedResources = new ArrayList<>();
  for (TsFileProcessor unseqProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
    unseqProcessor.query(queryPaths, EnvironmentUtils.TEST_QUERY_CONTEXT, queriedResources);
  }
  Assert.assertEquals(1, queriedResources.size());

  // Every point still readable from memory must continue the sequence 16, 17, ...
  long expectedTimestamp = 16;
  for (ReadOnlyMemChunk chunk : queriedResources.get(0).getReadOnlyMemChunk(seriesPath)) {
    IPointReader reader = chunk.getPointReader();
    while (reader.hasNextTimeValuePair()) {
      TimeValuePair pair = reader.nextTimeValuePair();
      Assert.assertEquals(expectedTimestamp, pair.getTimestamp());
      expectedTimestamp++;
    }
  }
}
/**
 * Inserts timestamps 1..10, requesting an asynchronous close after every insert, then closes
 * synchronously and expects ten sequence resources, all of them closed.
 */
@Test
public void testSequenceSyncClose()
    throws WriteProcessException, QueryProcessException, IllegalPathException,
        TriggerExecutionException {
  for (int j = 1; j <= 10; j++) {
    TSRecord record = new TSRecord(j, deviceId);
    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
    processor.insert(new InsertRowPlan(record));
    processor.asyncCloseAllWorkingTsFileProcessors();
  }

  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    // Do not swallow the interrupt: restore the flag and abort, because the assertions below
    // would otherwise run without the intended pause.
    Thread.currentThread().interrupt();
    Assert.fail("Interrupted while waiting for the asynchronous close requests");
  }
  processor.syncCloseAllWorkingTsFileProcessors();

  QueryDataSource queryDataSource =
      processor.query(
          Collections.singletonList(new PartialPath(deviceId, measurementId)),
          deviceId,
          context,
          null,
          null);
  Assert.assertEquals(10, queryDataSource.getSeqResources().size());
  for (TsFileResource resource : queryDataSource.getSeqResources()) {
    Assert.assertTrue(resource.isClosed());
  }
}
/**
 * Writes timestamps 0..9 (one asynchronous close per insert), removes every partition, writes
 * the same timestamps again, and expects the query to report no unsequence resources.
 */
@Test
public void testInsertDataAndRemovePartitionAndInsert()
    throws WriteProcessException, QueryProcessException, IllegalPathException,
        TriggerExecutionException {
  // First round of writes.
  for (int ts = 0; ts < 10; ts++) {
    TSRecord point = new TSRecord(ts, deviceId);
    point.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, Integer.toString(ts)));
    insertToStorageGroupProcessor(point);
    processor.asyncCloseAllWorkingTsFileProcessors();
  }
  processor.syncCloseAllWorkingTsFileProcessors();

  // The filter accepts every (storage group, partition) pair.
  processor.removePartitions((sgName, partitionId) -> true);

  // Second round re-inserts the very same timestamps.
  for (int ts = 0; ts < 10; ts++) {
    TSRecord point = new TSRecord(ts, deviceId);
    point.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, Integer.toString(ts)));
    insertToStorageGroupProcessor(point);
    processor.asyncCloseAllWorkingTsFileProcessors();
  }
  processor.syncCloseAllWorkingTsFileProcessors();

  PartialPath seriesPath = new PartialPath(deviceId, measurementId);
  QueryDataSource dataSource =
      processor.query(Collections.singletonList(seriesPath), deviceId, context, null, null);
  Assert.assertEquals(0, dataSource.getUnseqResources().size());
}
/**
 * Inserts a tablet covering timestamps 0..99, requests a close, then inserts a second tablet
 * covering timestamps 50..149 and closes everything. Expects two sequence resources (all
 * closed) and one unsequence resource.
 */
@Test
public void testIoTDBTabletWriteAndSyncClose()
    throws QueryProcessException, IllegalPathException, TriggerExecutionException {
  String[] measurements = new String[2];
  measurements[0] = "s0";
  measurements[1] = "s1";
  List<Integer> dataTypes = new ArrayList<>();
  dataTypes.add(TSDataType.INT32.ordinal());
  dataTypes.add(TSDataType.INT64.ordinal());

  IMeasurementMNode[] measurementMNodes = new IMeasurementMNode[2];
  measurementMNodes[0] =
      MeasurementMNode.getMeasurementMNode(
          null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
  measurementMNodes[1] =
      MeasurementMNode.getMeasurementMNode(
          null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);

  InsertTabletPlan insertTabletPlan1 =
      new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
  insertTabletPlan1.setMeasurementMNodes(measurementMNodes);

  long[] times = new long[100];
  Object[] columns = new Object[2];
  columns[0] = new int[100];
  columns[1] = new long[100];

  for (int r = 0; r < 100; r++) {
    times[r] = r;
    ((int[]) columns[0])[r] = 1;
    ((long[]) columns[1])[r] = 1;
  }
  insertTabletPlan1.setTimes(times);
  insertTabletPlan1.setColumns(columns);
  insertTabletPlan1.setRowCount(times.length);

  processor.insertTablet(insertTabletPlan1);
  processor.asyncCloseAllWorkingTsFileProcessors();

  InsertTabletPlan insertTabletPlan2 =
      new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
  insertTabletPlan2.setMeasurementMNodes(measurementMNodes);

  // The arrays are reused, so all 100 slots must be overwritten (timestamps 50..149). Stopping
  // one short would leave the stale timestamp 99 in the last slot and make the tablet unsorted.
  for (int r = 50; r < 150; r++) {
    times[r - 50] = r;
    ((int[]) columns[0])[r - 50] = 1;
    ((long[]) columns[1])[r - 50] = 1;
  }
  insertTabletPlan2.setTimes(times);
  insertTabletPlan2.setColumns(columns);
  insertTabletPlan2.setRowCount(times.length);

  processor.insertTablet(insertTabletPlan2);
  processor.asyncCloseAllWorkingTsFileProcessors();
  processor.syncCloseAllWorkingTsFileProcessors();

  QueryDataSource queryDataSource =
      processor.query(
          Collections.singletonList(new PartialPath(deviceId, measurementId)),
          deviceId,
          context,
          null,
          null);
  Assert.assertEquals(2, queryDataSource.getSeqResources().size());
  Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
  for (TsFileResource resource : queryDataSource.getSeqResources()) {
    Assert.assertTrue(resource.isClosed());
  }
}
/**
 * Writes timestamps 21..30 and then 10..1, requesting an asynchronous close after every insert,
 * and expects ten sequence plus ten unsequence resources, every one of them closed.
 */
@Test
public void testSeqAndUnSeqSyncClose()
    throws WriteProcessException, QueryProcessException, IllegalPathException,
        TriggerExecutionException {
  // Ascending batch: 21..30.
  for (int ts = 21; ts <= 30; ts++) {
    TSRecord point = new TSRecord(ts, deviceId);
    point.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, Integer.toString(ts)));
    insertToStorageGroupProcessor(point);
    processor.asyncCloseAllWorkingTsFileProcessors();
  }
  processor.syncCloseAllWorkingTsFileProcessors();

  // Descending batch below the first one: 10..1.
  for (int ts = 10; ts >= 1; ts--) {
    TSRecord point = new TSRecord(ts, deviceId);
    point.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, Integer.toString(ts)));
    insertToStorageGroupProcessor(point);
    processor.asyncCloseAllWorkingTsFileProcessors();
  }
  processor.syncCloseAllWorkingTsFileProcessors();

  PartialPath seriesPath = new PartialPath(deviceId, measurementId);
  QueryDataSource dataSource =
      processor.query(Collections.singletonList(seriesPath), deviceId, context, null, null);

  Assert.assertEquals(10, dataSource.getSeqResources().size());
  Assert.assertEquals(10, dataSource.getUnseqResources().size());
  for (TsFileResource seqResource : dataSource.getSeqResources()) {
    Assert.assertTrue(seqResource.isClosed());
  }
  for (TsFileResource unseqResource : dataSource.getUnseqResources()) {
    Assert.assertTrue(unseqResource.isClosed());
  }
}
/**
 * With {@code enableDiscardOutOfOrderData} switched on, writes timestamps 21..30 and then 10..1
 * row by row and expects ten sequence resources and no unsequence resources.
 *
 * <p>The shared configuration is restored in a {@code finally} block so that a failing assertion
 * cannot leak the overridden setting into other tests.
 */
@Test
public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
    throws WriteProcessException, QueryProcessException, IllegalPathException, IOException,
        TriggerExecutionException {
  boolean defaultValue = config.isEnableDiscardOutOfOrderData();
  try {
    config.setEnableDiscardOutOfOrderData(true);

    for (int j = 21; j <= 30; j++) {
      TSRecord record = new TSRecord(j, deviceId);
      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
      insertToStorageGroupProcessor(record);
      processor.asyncCloseAllWorkingTsFileProcessors();
    }
    processor.syncCloseAllWorkingTsFileProcessors();

    for (int j = 10; j >= 1; j--) {
      TSRecord record = new TSRecord(j, deviceId);
      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
      insertToStorageGroupProcessor(record);
      processor.asyncCloseAllWorkingTsFileProcessors();
    }
    processor.syncCloseAllWorkingTsFileProcessors();

    for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
      tsfileProcessor.syncFlush();
    }

    QueryDataSource queryDataSource =
        processor.query(
            Collections.singletonList(new PartialPath(deviceId, measurementId)),
            deviceId,
            context,
            null,
            null);
    Assert.assertEquals(10, queryDataSource.getSeqResources().size());
    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
    for (TsFileResource resource : queryDataSource.getSeqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
    for (TsFileResource resource : queryDataSource.getUnseqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
  } finally {
    // Always put the shared setting back, even when an assertion above fails.
    config.setEnableDiscardOutOfOrderData(defaultValue);
  }
}
/**
 * With discarding of out-of-order data enabled, partitioning enabled and a partition interval of
 * 100, inserts a tablet with timestamps 0..99 followed by a tablet with timestamps 50..149 and
 * expects two sequence resources (all closed) and no unsequence resources.
 *
 * <p>The shared configuration is restored in a {@code finally} block so that a failing assertion
 * cannot leak the overridden settings into other tests.
 */
@Test
public void testEnableDiscardOutOfOrderDataForInsertTablet1()
    throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException {
  boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
  long defaultTimePartition = config.getPartitionInterval();
  boolean defaultEnablePartition = config.isEnablePartition();
  try {
    config.setEnableDiscardOutOfOrderData(true);
    config.setEnablePartition(true);
    config.setPartitionInterval(100);

    String[] measurements = new String[2];
    measurements[0] = "s0";
    measurements[1] = "s1";
    List<Integer> dataTypes = new ArrayList<>();
    dataTypes.add(TSDataType.INT32.ordinal());
    dataTypes.add(TSDataType.INT64.ordinal());

    IMeasurementMNode[] measurementMNodes = new IMeasurementMNode[2];
    measurementMNodes[0] =
        MeasurementMNode.getMeasurementMNode(
            null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
    measurementMNodes[1] =
        MeasurementMNode.getMeasurementMNode(
            null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);

    InsertTabletPlan insertTabletPlan1 =
        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);

    long[] times = new long[100];
    Object[] columns = new Object[2];
    columns[0] = new int[100];
    columns[1] = new long[100];

    for (int r = 0; r < 100; r++) {
      times[r] = r;
      ((int[]) columns[0])[r] = 1;
      ((long[]) columns[1])[r] = 1;
    }
    insertTabletPlan1.setTimes(times);
    insertTabletPlan1.setColumns(columns);
    insertTabletPlan1.setRowCount(times.length);
    insertTabletPlan1.setMeasurementMNodes(measurementMNodes);

    processor.insertTablet(insertTabletPlan1);
    processor.asyncCloseAllWorkingTsFileProcessors();

    InsertTabletPlan insertTabletPlan2 =
        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);

    // The loop runs backwards, but slot r - 50 receives r, so the array ends up ascending.
    for (int r = 149; r >= 50; r--) {
      times[r - 50] = r;
      ((int[]) columns[0])[r - 50] = 1;
      ((long[]) columns[1])[r - 50] = 1;
    }
    insertTabletPlan2.setTimes(times);
    insertTabletPlan2.setColumns(columns);
    insertTabletPlan2.setRowCount(times.length);
    insertTabletPlan2.setMeasurementMNodes(measurementMNodes);

    processor.insertTablet(insertTabletPlan2);
    processor.asyncCloseAllWorkingTsFileProcessors();
    processor.syncCloseAllWorkingTsFileProcessors();

    for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
      tsfileProcessor.syncFlush();
    }

    QueryDataSource queryDataSource =
        processor.query(
            Collections.singletonList(new PartialPath(deviceId, measurementId)),
            deviceId,
            context,
            null,
            null);
    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
    for (TsFileResource resource : queryDataSource.getSeqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
  } finally {
    // Always put the shared settings back, even when an assertion above fails.
    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
    config.setPartitionInterval(defaultTimePartition);
    config.setEnablePartition(defaultEnablePartition);
  }
}
/**
 * With discarding of out-of-order data enabled, partitioning enabled and a partition interval of
 * 1200, inserts a tablet with timestamps 0..1199 followed by a tablet with timestamps 50..1249
 * and expects two sequence resources (all closed) and no unsequence resources.
 *
 * <p>The shared configuration is restored in a {@code finally} block so that a failing assertion
 * cannot leak the overridden settings into other tests.
 */
@Test
public void testEnableDiscardOutOfOrderDataForInsertTablet2()
    throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException {
  boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
  long defaultTimePartition = config.getPartitionInterval();
  boolean defaultEnablePartition = config.isEnablePartition();
  try {
    config.setEnableDiscardOutOfOrderData(true);
    config.setEnablePartition(true);
    config.setPartitionInterval(1200);

    String[] measurements = new String[2];
    measurements[0] = "s0";
    measurements[1] = "s1";
    List<Integer> dataTypes = new ArrayList<>();
    dataTypes.add(TSDataType.INT32.ordinal());
    dataTypes.add(TSDataType.INT64.ordinal());

    IMeasurementMNode[] measurementMNodes = new IMeasurementMNode[2];
    measurementMNodes[0] =
        MeasurementMNode.getMeasurementMNode(
            null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
    measurementMNodes[1] =
        MeasurementMNode.getMeasurementMNode(
            null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);

    InsertTabletPlan insertTabletPlan1 =
        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);

    long[] times = new long[1200];
    Object[] columns = new Object[2];
    columns[0] = new int[1200];
    columns[1] = new long[1200];

    for (int r = 0; r < 1200; r++) {
      times[r] = r;
      ((int[]) columns[0])[r] = 1;
      ((long[]) columns[1])[r] = 1;
    }
    insertTabletPlan1.setTimes(times);
    insertTabletPlan1.setColumns(columns);
    insertTabletPlan1.setRowCount(times.length);
    insertTabletPlan1.setMeasurementMNodes(measurementMNodes);

    processor.insertTablet(insertTabletPlan1);
    processor.asyncCloseAllWorkingTsFileProcessors();

    InsertTabletPlan insertTabletPlan2 =
        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);

    // The loop runs backwards, but slot r - 50 receives r, so the array ends up ascending.
    for (int r = 1249; r >= 50; r--) {
      times[r - 50] = r;
      ((int[]) columns[0])[r - 50] = 1;
      ((long[]) columns[1])[r - 50] = 1;
    }
    insertTabletPlan2.setTimes(times);
    insertTabletPlan2.setColumns(columns);
    insertTabletPlan2.setRowCount(times.length);
    insertTabletPlan2.setMeasurementMNodes(measurementMNodes);

    processor.insertTablet(insertTabletPlan2);
    processor.asyncCloseAllWorkingTsFileProcessors();
    processor.syncCloseAllWorkingTsFileProcessors();

    for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
      tsfileProcessor.syncFlush();
    }

    QueryDataSource queryDataSource =
        processor.query(
            Collections.singletonList(new PartialPath(deviceId, measurementId)),
            deviceId,
            context,
            null,
            null);
    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
    for (TsFileResource resource : queryDataSource.getSeqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
  } finally {
    // Always put the shared settings back, even when an assertion above fails.
    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
    config.setPartitionInterval(defaultTimePartition);
    config.setEnablePartition(defaultEnablePartition);
  }
}
/**
 * With discarding of out-of-order data enabled, partitioning enabled and a partition interval of
 * 1000, inserts a tablet with timestamps 0..1199 followed by a tablet with timestamps 50..1249
 * and expects two sequence resources (all closed) and no unsequence resources.
 *
 * <p>The shared configuration is restored in a {@code finally} block so that a failing assertion
 * cannot leak the overridden settings into other tests.
 */
@Test
public void testEnableDiscardOutOfOrderDataForInsertTablet3()
    throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException {
  boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
  long defaultTimePartition = config.getPartitionInterval();
  boolean defaultEnablePartition = config.isEnablePartition();
  try {
    config.setEnableDiscardOutOfOrderData(true);
    config.setEnablePartition(true);
    config.setPartitionInterval(1000);

    String[] measurements = new String[2];
    measurements[0] = "s0";
    measurements[1] = "s1";
    List<Integer> dataTypes = new ArrayList<>();
    dataTypes.add(TSDataType.INT32.ordinal());
    dataTypes.add(TSDataType.INT64.ordinal());

    IMeasurementMNode[] measurementMNodes = new IMeasurementMNode[2];
    measurementMNodes[0] =
        MeasurementMNode.getMeasurementMNode(
            null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
    measurementMNodes[1] =
        MeasurementMNode.getMeasurementMNode(
            null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);

    InsertTabletPlan insertTabletPlan1 =
        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);

    long[] times = new long[1200];
    Object[] columns = new Object[2];
    columns[0] = new int[1200];
    columns[1] = new long[1200];

    for (int r = 0; r < 1200; r++) {
      times[r] = r;
      ((int[]) columns[0])[r] = 1;
      ((long[]) columns[1])[r] = 1;
    }
    insertTabletPlan1.setTimes(times);
    insertTabletPlan1.setColumns(columns);
    insertTabletPlan1.setRowCount(times.length);
    insertTabletPlan1.setMeasurementMNodes(measurementMNodes);

    processor.insertTablet(insertTabletPlan1);
    processor.asyncCloseAllWorkingTsFileProcessors();

    InsertTabletPlan insertTabletPlan2 =
        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);

    // The loop runs backwards, but slot r - 50 receives r, so the array ends up ascending.
    for (int r = 1249; r >= 50; r--) {
      times[r - 50] = r;
      ((int[]) columns[0])[r - 50] = 1;
      ((long[]) columns[1])[r - 50] = 1;
    }
    insertTabletPlan2.setTimes(times);
    insertTabletPlan2.setColumns(columns);
    insertTabletPlan2.setRowCount(times.length);
    insertTabletPlan2.setMeasurementMNodes(measurementMNodes);

    processor.insertTablet(insertTabletPlan2);
    processor.asyncCloseAllWorkingTsFileProcessors();
    processor.syncCloseAllWorkingTsFileProcessors();

    for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
      tsfileProcessor.syncFlush();
    }

    QueryDataSource queryDataSource =
        processor.query(
            Collections.singletonList(new PartialPath(deviceId, measurementId)),
            deviceId,
            context,
            null,
            null);
    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
    for (TsFileResource resource : queryDataSource.getSeqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
  } finally {
    // Always put the shared settings back, even when an assertion above fails.
    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
    config.setPartitionInterval(defaultTimePartition);
    config.setEnablePartition(defaultEnablePartition);
  }
}
/**
 * Writes timestamps 21..30 and then 10..1 (one asynchronous close per insert), triggers
 * {@code compact()}, waits up to 120 seconds for the compaction task manager to report no
 * executing tasks, and expects two sequence resources with every returned resource closed.
 *
 * <p>The compaction settings are overridden for the duration of the test and restored in a
 * {@code finally} block so that a failing assertion cannot leak them into other tests.
 */
@Test
public void testMerge()
    throws WriteProcessException, QueryProcessException, IllegalPathException,
        TriggerExecutionException {
  IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
  int originCandidateFileNum = ioTDBConfig.getMaxInnerCompactionCandidateFileNum();
  boolean originEnableSeqSpaceCompaction = ioTDBConfig.isEnableSeqSpaceCompaction();
  boolean originEnableUnseqSpaceCompaction = ioTDBConfig.isEnableUnseqSpaceCompaction();
  try {
    ioTDBConfig.setMaxInnerCompactionCandidateFileNum(9);
    ioTDBConfig.setEnableSeqSpaceCompaction(true);
    ioTDBConfig.setEnableUnseqSpaceCompaction(true);

    for (int j = 21; j <= 30; j++) {
      TSRecord record = new TSRecord(j, deviceId);
      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
      processor.insert(new InsertRowPlan(record));
      processor.asyncCloseAllWorkingTsFileProcessors();
    }
    processor.syncCloseAllWorkingTsFileProcessors();

    for (int j = 10; j >= 1; j--) {
      TSRecord record = new TSRecord(j, deviceId);
      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
      processor.insert(new InsertRowPlan(record));
      processor.asyncCloseAllWorkingTsFileProcessors();
    }
    processor.syncCloseAllWorkingTsFileProcessors();

    processor.compact();

    // Poll every 100 ms until no compaction task is executing, giving up after 120 seconds.
    long totalWaitingTime = 0;
    do {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // Do not swallow the interrupt: restore the flag and abort the test.
        Thread.currentThread().interrupt();
        Assert.fail("Interrupted while waiting for compaction tasks to finish");
      }
      totalWaitingTime += 100;
      if (totalWaitingTime % 1000 == 0) {
        logger.warn("has waited for {} seconds", totalWaitingTime / 1000);
      }
      if (totalWaitingTime > 120_000) {
        Assert.fail("Compaction tasks did not finish within 120 seconds");
      }
    } while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0);

    QueryDataSource queryDataSource =
        processor.query(
            Collections.singletonList(new PartialPath(deviceId, measurementId)),
            deviceId,
            context,
            null,
            null);
    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
    for (TsFileResource resource : queryDataSource.getSeqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
    for (TsFileResource resource : queryDataSource.getUnseqResources()) {
      Assert.assertTrue(resource.isClosed());
    }
  } finally {
    // Always put the shared settings back, even when an assertion above fails.
    ioTDBConfig.setMaxInnerCompactionCandidateFileNum(originCandidateFileNum);
    ioTDBConfig.setEnableSeqSpaceCompaction(originEnableSeqSpaceCompaction);
    ioTDBConfig.setEnableUnseqSpaceCompaction(originEnableUnseqSpaceCompaction);
  }
}
/**
 * Queues an inner-space compaction over ten closed sequence files, deletes the storage group
 * shortly afterwards, and checks that no source file, target file or compaction log is left on
 * disk, that the system is not read-only, and that compaction is still allowed.
 */
@Test
public void testDeleteStorageGroupWhenCompacting() throws Exception {
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(10);
  try {
    // Ten inserts, each followed by an asynchronous close request.
    for (int ts = 0; ts < 10; ts++) {
      TSRecord point = new TSRecord(ts, deviceId);
      point.addTuple(
          DataPoint.getDataPoint(TSDataType.INT32, measurementId, Integer.toString(ts)));
      insertToStorageGroupProcessor(point);
      processor.asyncCloseAllWorkingTsFileProcessors();
    }
    processor.syncCloseAllWorkingTsFileProcessors();

    ReadChunkCompactionPerformer performer =
        new ReadChunkCompactionPerformer(processor.getSequenceFileList());
    InnerSpaceCompactionTask compactionTask =
        new InnerSpaceCompactionTask(
            0,
            processor.getTsFileManager(),
            processor.getSequenceFileList(),
            true,
            performer,
            new AtomicInteger(0),
            0);
    CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask);

    // Short pause before deleting, longer pause afterwards before inspecting the disk.
    Thread.sleep(20);
    StorageEngine.getInstance().deleteStorageGroup(new PartialPath(storageGroup));
    Thread.sleep(500);

    for (TsFileResource seqResource : processor.getSequenceFileList()) {
      Assert.assertFalse(seqResource.getTsFile().exists());
    }

    TsFileResource targetResource =
        TsFileNameGenerator.getInnerCompactionTargetFileResource(
            processor.getSequenceFileList(), true);
    File targetFile = targetResource.getTsFile();
    Assert.assertFalse(targetFile.exists());

    String compactionLogPath =
        targetFile.getParent()
            + File.separator
            + targetFile.getName()
            + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX;
    Assert.assertFalse(new File(compactionLogPath).exists());

    Assert.assertFalse(CommonDescriptor.getInstance().getConfig().isReadOnly());
    Assert.assertTrue(processor.getTsFileManager().isAllowCompaction());
  } finally {
    new CompactionConfigRestorer().restoreCompactionConfig();
  }
}
/**
 * Creates one sequence memtable, enables timed flushing of sequence memtables with an interval
 * of 5, triggers {@code timedFlushSeqMemTable()}, waits for all flush work to drain, and expects
 * the memtable count to drop to zero.
 *
 * <p>The timed-flush settings are restored in a {@code finally} block so that a failing
 * assertion cannot leak them into other tests.
 */
@Test
public void testTimedFlushSeqMemTable()
    throws IllegalPathException, InterruptedException, WriteProcessException,
        TriggerExecutionException, ShutdownException {
  // create one sequence memtable
  TSRecord record = new TSRecord(10000, deviceId);
  record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
  processor.insert(new InsertRowPlan(record));
  Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());

  // change config & reboot timed service
  boolean prevEnableTimedFlushSeqMemtable = config.isEnableTimedFlushSeqMemtable();
  long preFLushInterval = config.getSeqMemtableFlushInterval();
  try {
    config.setEnableTimedFlushSeqMemtable(true);
    config.setSeqMemtableFlushInterval(5);
    StorageEngine.getInstance().rebootTimedService();

    Thread.sleep(500);

    Assert.assertEquals(1, processor.getWorkSequenceTsFileProcessors().size());
    TsFileProcessor tsFileProcessor =
        processor.getWorkSequenceTsFileProcessors().iterator().next();
    FlushManager flushManager = FlushManager.getInstance();

    // flush the sequence memtable
    processor.timedFlushSeqMemTable();

    // wait until memtable flush task is done
    int waitCnt = 0;
    while (tsFileProcessor.getFlushingMemTableSize() != 0
        || tsFileProcessor.isManagedByFlushManager()
        || flushManager.getNumberOfPendingTasks() != 0
        || flushManager.getNumberOfPendingSubTasks() != 0
        || flushManager.getNumberOfWorkingTasks() != 0
        || flushManager.getNumberOfWorkingSubTasks() != 0) {
      Thread.sleep(500);
      ++waitCnt;
      if (waitCnt % 10 == 0) {
        logger.info("already wait {} s", waitCnt / 2);
      }
    }

    Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
  } finally {
    // Always put the shared settings back, even when an assertion above fails.
    config.setEnableTimedFlushSeqMemtable(prevEnableTimedFlushSeqMemtable);
    config.setSeqMemtableFlushInterval(preFLushInterval);
  }
}
/**
 * Closes one sequence memtable, creates one unsequence memtable, enables timed flushing of
 * unsequence memtables with an interval of 5, triggers {@code timedFlushUnseqMemTable()}, waits
 * for all flush work to drain, and expects the memtable count to drop to zero.
 *
 * <p>The timed-flush settings are restored in a {@code finally} block so that a failing
 * assertion cannot leak them into other tests.
 */
@Test
public void testTimedFlushUnseqMemTable()
    throws IllegalPathException, InterruptedException, WriteProcessException,
        TriggerExecutionException, ShutdownException {
  // create one sequence memtable & close
  TSRecord record = new TSRecord(10000, deviceId);
  record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
  processor.insert(new InsertRowPlan(record));
  Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
  processor.syncCloseAllWorkingTsFileProcessors();
  Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());

  // create one unsequence memtable
  record = new TSRecord(1, deviceId);
  record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
  processor.insert(new InsertRowPlan(record));
  Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());

  // change config & reboot timed service
  boolean prevEnableTimedFlushUnseqMemtable = config.isEnableTimedFlushUnseqMemtable();
  long preFLushInterval = config.getUnseqMemtableFlushInterval();
  try {
    config.setEnableTimedFlushUnseqMemtable(true);
    config.setUnseqMemtableFlushInterval(5);
    StorageEngine.getInstance().rebootTimedService();

    Thread.sleep(500);

    Assert.assertEquals(1, processor.getWorkUnsequenceTsFileProcessors().size());
    TsFileProcessor tsFileProcessor =
        processor.getWorkUnsequenceTsFileProcessors().iterator().next();
    FlushManager flushManager = FlushManager.getInstance();

    // flush the unsequence memtable
    processor.timedFlushUnseqMemTable();

    // wait until memtable flush task is done
    int waitCnt = 0;
    while (tsFileProcessor.getFlushingMemTableSize() != 0
        || tsFileProcessor.isManagedByFlushManager()
        || flushManager.getNumberOfPendingTasks() != 0
        || flushManager.getNumberOfPendingSubTasks() != 0
        || flushManager.getNumberOfWorkingTasks() != 0
        || flushManager.getNumberOfWorkingSubTasks() != 0) {
      Thread.sleep(500);
      ++waitCnt;
      if (waitCnt % 10 == 0) {
        logger.info("already wait {} s", waitCnt / 2);
      }
    }

    Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
  } finally {
    // Always put the shared settings back, even when an assertion above fails.
    config.setEnableTimedFlushUnseqMemtable(prevEnableTimedFlushUnseqMemtable);
    config.setUnseqMemtableFlushInterval(preFLushInterval);
  }
}
class DummySGP extends DataRegion {
DummySGP(String systemInfoDir, String storageGroupName) throws DataRegionException {
super(systemInfoDir, "0", new TsFileFlushPolicy.DirectFlushPolicy(), storageGroupName);
}
}
}