| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.storageengine.dataregion; |
| |
| import org.apache.iotdb.commons.conf.CommonConfig; |
| import org.apache.iotdb.commons.conf.CommonDescriptor; |
| import org.apache.iotdb.commons.consensus.DataRegionId; |
| import org.apache.iotdb.commons.exception.IllegalPathException; |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.exception.ShutdownException; |
| import org.apache.iotdb.commons.path.MeasurementPath; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.exception.DataRegionException; |
| import org.apache.iotdb.db.exception.WriteProcessException; |
| import org.apache.iotdb.db.exception.WriteProcessRejectException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.queryengine.common.QueryId; |
| import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; |
| import org.apache.iotdb.db.storageengine.StorageEngine; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionConfigRestorer; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; |
| import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; |
| import org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager; |
| import org.apache.iotdb.db.utils.EnvironmentUtils; |
| import org.apache.iotdb.db.utils.constant.TestConstant; |
| import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID; |
| import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.read.reader.IPointReader; |
| import org.apache.iotdb.tsfile.utils.BitMap; |
| import org.apache.iotdb.tsfile.write.record.TSRecord; |
| import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| |
| public class DataRegionTest { |
| private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); |
| private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); |
| private static final Logger logger = LoggerFactory.getLogger(DataRegionTest.class); |
| |
| private String storageGroup = "root.vehicle.d0"; |
| private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info"); |
| private String deviceId = "root.vehicle.d0"; |
| private String measurementId = "s0"; |
| private DataRegion dataRegion; |
| private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT; |
| |
| @Before |
| public void setUp() throws Exception { |
| EnvironmentUtils.envSetUp(); |
| dataRegion = new DummyDataRegion(systemDir, storageGroup); |
| StorageEngine.getInstance().setDataRegion(new DataRegionId(0), dataRegion); |
| CompactionTaskManager.getInstance().start(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| if (dataRegion != null) { |
| dataRegion.syncDeleteDataFiles(); |
| StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0)); |
| } |
| EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR); |
| CompactionTaskManager.getInstance().stop(); |
| EnvironmentUtils.cleanEnv(); |
| } |
| |
| public static InsertRowNode buildInsertRowNodeByTSRecord(TSRecord record) |
| throws IllegalPathException { |
| String[] measurements = new String[record.dataPointList.size()]; |
| MeasurementSchema[] measurementSchemas = new MeasurementSchema[record.dataPointList.size()]; |
| TSDataType[] dataTypes = new TSDataType[record.dataPointList.size()]; |
| Object[] values = new Object[record.dataPointList.size()]; |
| for (int i = 0; i < record.dataPointList.size(); i++) { |
| measurements[i] = record.dataPointList.get(i).getMeasurementId(); |
| measurementSchemas[i] = |
| new MeasurementSchema( |
| measurements[i], |
| record.dataPointList.get(i).getType(), |
| TSEncoding.PLAIN, |
| CompressionType.UNCOMPRESSED); |
| dataTypes[i] = record.dataPointList.get(i).getType(); |
| values[i] = record.dataPointList.get(i).getValue(); |
| } |
| QueryId queryId = new QueryId("test_write"); |
| InsertRowNode insertRowNode = |
| new InsertRowNode( |
| queryId.genPlanNodeId(), |
| new PartialPath(record.deviceId), |
| false, |
| measurements, |
| dataTypes, |
| record.time, |
| values, |
| false); |
| insertRowNode.setMeasurementSchemas(measurementSchemas); |
| return insertRowNode; |
| } |
| |
| @Test |
| public void testUnseqUnsealedDelete() |
| throws WriteProcessException, IOException, MetadataException { |
| TSRecord record = new TSRecord(10000, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (int j = 1; j <= 10; j++) { |
| record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| |
| for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| for (int j = 11; j <= 20; j++) { |
| record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| |
| PartialPath fullPath = |
| new MeasurementPath( |
| deviceId, |
| measurementId, |
| new MeasurementSchema( |
| measurementId, |
| TSDataType.INT32, |
| TSEncoding.RLE, |
| CompressionType.UNCOMPRESSED, |
| Collections.emptyMap())); |
| |
| dataRegion.deleteByDevice(new PartialPath(deviceId, measurementId), 0, 15L, -1); |
| |
| List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); |
| for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.query( |
| Collections.singletonList(fullPath), |
| EnvironmentUtils.TEST_QUERY_CONTEXT, |
| tsfileResourcesForQuery); |
| } |
| |
| Assert.assertEquals(1, tsfileResourcesForQuery.size()); |
| List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath); |
| long time = 16; |
| for (ReadOnlyMemChunk memChunk : memChunks) { |
| IPointReader iterator = memChunk.getPointReader(); |
| while (iterator.hasNextTimeValuePair()) { |
| TimeValuePair timeValuePair = iterator.nextTimeValuePair(); |
| Assert.assertEquals(time++, timeValuePair.getTimestamp()); |
| } |
| } |
| } |
| |
| @Test |
| public void testSequenceSyncClose() |
| throws WriteProcessException, QueryProcessException, IllegalPathException { |
| for (int j = 1; j <= 10; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| Assert.assertEquals(10, queryDataSource.getSeqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testIoTDBTabletWriteAndSyncClose() |
| throws QueryProcessException, IllegalPathException, WriteProcessException { |
| String[] measurements = new String[2]; |
| measurements[0] = "s0"; |
| measurements[1] = "s1"; |
| TSDataType[] dataTypes = new TSDataType[2]; |
| dataTypes[0] = TSDataType.INT32; |
| dataTypes[1] = TSDataType.INT64; |
| |
| MeasurementSchema[] measurementSchemas = new MeasurementSchema[2]; |
| measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN); |
| measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN); |
| |
| long[] times = new long[100]; |
| Object[] columns = new Object[2]; |
| columns[0] = new int[100]; |
| columns[1] = new long[100]; |
| |
| for (int r = 0; r < 100; r++) { |
| times[r] = r; |
| ((int[]) columns[0])[r] = 1; |
| ((long[]) columns[1])[r] = 1; |
| } |
| |
| InsertTabletNode insertTabletNode1 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| null, |
| columns, |
| times.length); |
| |
| dataRegion.insertTablet(insertTabletNode1); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| |
| for (int r = 50; r < 149; r++) { |
| times[r - 50] = r; |
| ((int[]) columns[0])[r - 50] = 1; |
| ((long[]) columns[1])[r - 50] = 1; |
| } |
| |
| InsertTabletNode insertTabletNode2 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| null, |
| columns, |
| times.length); |
| |
| dataRegion.insertTablet(insertTabletNode2); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| |
| Assert.assertEquals(2, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(1, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testEmptyTabletWriteAndSyncClose() |
| throws QueryProcessException, IllegalPathException, WriteProcessException { |
| String[] measurements = new String[2]; |
| measurements[0] = "s0"; |
| measurements[1] = "s1"; |
| TSDataType[] dataTypes = new TSDataType[2]; |
| dataTypes[0] = TSDataType.INT32; |
| dataTypes[1] = TSDataType.INT64; |
| |
| MeasurementSchema[] measurementSchemas = new MeasurementSchema[2]; |
| measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN); |
| measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN); |
| |
| long[] times = new long[100]; |
| Object[] columns = new Object[2]; |
| columns[0] = new int[100]; |
| columns[1] = new long[100]; |
| BitMap[] bitMaps = new BitMap[2]; |
| bitMaps[0] = new BitMap(100); |
| bitMaps[1] = new BitMap(100); |
| |
| for (int r = 0; r < 100; r++) { |
| times[r] = r; |
| bitMaps[0].mark(r); |
| bitMaps[1].mark(r); |
| } |
| |
| InsertTabletNode insertTabletNode1 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| bitMaps, |
| columns, |
| times.length); |
| |
| dataRegion.insertTablet(insertTabletNode1); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| |
| for (int r = 50; r < 149; r++) { |
| times[r - 50] = r; |
| bitMaps[0].mark(r - 50); |
| bitMaps[1].mark(r - 50); |
| } |
| |
| InsertTabletNode insertTabletNode2 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| bitMaps, |
| columns, |
| times.length); |
| |
| dataRegion.insertTablet(insertTabletNode2); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| |
| Assert.assertEquals(0, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testAllMeasurementsFailedTabletWriteAndSyncClose() |
| throws QueryProcessException, IllegalPathException, WriteProcessException { |
| String[] measurements = new String[2]; |
| measurements[0] = "s0"; |
| measurements[1] = "s1"; |
| TSDataType[] dataTypes = new TSDataType[2]; |
| dataTypes[0] = TSDataType.INT32; |
| dataTypes[1] = TSDataType.INT64; |
| |
| MeasurementSchema[] measurementSchemas = new MeasurementSchema[2]; |
| measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN); |
| measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN); |
| |
| long[] times = new long[100]; |
| Object[] columns = new Object[2]; |
| columns[0] = new int[100]; |
| columns[1] = new long[100]; |
| |
| for (int r = 0; r < 100; r++) { |
| times[r] = r; |
| ((int[]) columns[0])[r] = 1; |
| ((long[]) columns[1])[r] = 1; |
| } |
| |
| InsertTabletNode insertTabletNode1 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| null, |
| columns, |
| times.length); |
| insertTabletNode1.setFailedMeasurementNumber(2); |
| |
| dataRegion.insertTablet(insertTabletNode1); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| |
| for (int r = 50; r < 149; r++) { |
| times[r - 50] = r; |
| ((int[]) columns[0])[r - 50] = 1; |
| ((long[]) columns[1])[r - 50] = 1; |
| } |
| |
| InsertTabletNode insertTabletNode2 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| null, |
| columns, |
| times.length); |
| insertTabletNode2.setFailedMeasurementNumber(2); |
| |
| dataRegion.insertTablet(insertTabletNode2); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| |
| Assert.assertEquals(0, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testSeqAndUnSeqSyncClose() |
| throws WriteProcessException, QueryProcessException, IllegalPathException { |
| for (int j = 21; j <= 30; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (int j = 10; j >= 1; j--) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| Assert.assertEquals(10, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(10, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| for (TsFileResource resource : queryDataSource.getUnseqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testAllMeasurementsFailedRecordSeqAndUnSeqSyncClose() |
| throws WriteProcessException, QueryProcessException, IllegalPathException { |
| for (int j = 21; j <= 30; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record); |
| rowNode.setFailedMeasurementNumber(1); |
| dataRegion.insert(rowNode); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (int j = 10; j >= 1; j--) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record); |
| rowNode.setFailedMeasurementNumber(1); |
| dataRegion.insert(rowNode); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| Assert.assertEquals(0, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| for (TsFileResource resource : queryDataSource.getUnseqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testDisableSeparateDataForInsertRowPlan() |
| throws WriteProcessException, QueryProcessException, IllegalPathException, IOException { |
| boolean defaultValue = config.isEnableSeparateData(); |
| config.setEnableSeparateData(false); |
| |
| for (int j = 21; j <= 30; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (int j = 10; j >= 1; j--) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| Assert.assertEquals(0, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(20, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| for (TsFileResource resource : queryDataSource.getUnseqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| |
| config.setEnableSeparateData(defaultValue); |
| } |
| |
| @Test |
| public void testDisableSeparateDataForInsertTablet1() |
| throws QueryProcessException, IllegalPathException, IOException, WriteProcessException { |
| boolean defaultEnableDiscard = config.isEnableSeparateData(); |
| long defaultTimePartition = COMMON_CONFIG.getTimePartitionInterval(); |
| config.setEnableSeparateData(false); |
| COMMON_CONFIG.setTimePartitionInterval(100000); |
| |
| String[] measurements = new String[2]; |
| measurements[0] = "s0"; |
| measurements[1] = "s1"; |
| TSDataType[] dataTypes = new TSDataType[2]; |
| dataTypes[0] = TSDataType.INT32; |
| dataTypes[1] = TSDataType.INT64; |
| |
| MeasurementSchema[] measurementSchemas = new MeasurementSchema[2]; |
| measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN); |
| measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN); |
| |
| long[] times = new long[100]; |
| Object[] columns = new Object[2]; |
| columns[0] = new int[100]; |
| columns[1] = new long[100]; |
| |
| for (int r = 0; r < 100; r++) { |
| times[r] = r; |
| ((int[]) columns[0])[r] = 1; |
| ((long[]) columns[1])[r] = 1; |
| } |
| InsertTabletNode insertTabletNode1 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| null, |
| columns, |
| times.length); |
| |
| dataRegion.insertTablet(insertTabletNode1); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| |
| for (int r = 149; r >= 50; r--) { |
| times[r - 50] = r; |
| ((int[]) columns[0])[r - 50] = 1; |
| ((long[]) columns[1])[r - 50] = 1; |
| } |
| InsertTabletNode insertTabletNode2 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| null, |
| columns, |
| times.length); |
| |
| dataRegion.insertTablet(insertTabletNode2); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| |
| Assert.assertEquals(0, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(2, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| |
| config.setEnableSeparateData(defaultEnableDiscard); |
| COMMON_CONFIG.setTimePartitionInterval(defaultTimePartition); |
| } |
| |
| @Test |
| public void testDisableSeparateDataForInsertTablet2() |
| throws QueryProcessException, IllegalPathException, IOException, WriteProcessException { |
| boolean defaultEnableDiscard = config.isEnableSeparateData(); |
| long defaultTimePartition = COMMON_CONFIG.getTimePartitionInterval(); |
| config.setEnableSeparateData(false); |
| COMMON_CONFIG.setTimePartitionInterval(1200000); |
| |
| String[] measurements = new String[2]; |
| measurements[0] = "s0"; |
| measurements[1] = "s1"; |
| TSDataType[] dataTypes = new TSDataType[2]; |
| dataTypes[0] = TSDataType.INT32; |
| dataTypes[1] = TSDataType.INT64; |
| |
| MeasurementSchema[] measurementSchemas = new MeasurementSchema[2]; |
| measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN); |
| measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN); |
| |
| long[] times = new long[1200]; |
| Object[] columns = new Object[2]; |
| columns[0] = new int[1200]; |
| columns[1] = new long[1200]; |
| |
| for (int r = 0; r < 1200; r++) { |
| times[r] = r; |
| ((int[]) columns[0])[r] = 1; |
| ((long[]) columns[1])[r] = 1; |
| } |
| InsertTabletNode insertTabletNode1 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| null, |
| columns, |
| times.length); |
| |
| dataRegion.insertTablet(insertTabletNode1); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| |
| for (int r = 1249; r >= 50; r--) { |
| times[r - 50] = r; |
| ((int[]) columns[0])[r - 50] = 1; |
| ((long[]) columns[1])[r - 50] = 1; |
| } |
| InsertTabletNode insertTabletNode2 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| null, |
| columns, |
| times.length); |
| |
| dataRegion.insertTablet(insertTabletNode2); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| |
| Assert.assertEquals(0, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(2, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| |
| config.setEnableSeparateData(defaultEnableDiscard); |
| COMMON_CONFIG.setTimePartitionInterval(defaultTimePartition); |
| } |
| |
| @Test |
| public void testDisableSeparateDataForInsertTablet3() |
| throws QueryProcessException, IllegalPathException, IOException, WriteProcessException { |
| boolean defaultEnableDiscard = config.isEnableSeparateData(); |
| long defaultTimePartition = COMMON_CONFIG.getTimePartitionInterval(); |
| config.setEnableSeparateData(false); |
| COMMON_CONFIG.setTimePartitionInterval(1000000); |
| |
| String[] measurements = new String[2]; |
| measurements[0] = "s0"; |
| measurements[1] = "s1"; |
| TSDataType[] dataTypes = new TSDataType[2]; |
| dataTypes[0] = TSDataType.INT32; |
| dataTypes[1] = TSDataType.INT64; |
| |
| MeasurementSchema[] measurementSchemas = new MeasurementSchema[2]; |
| measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN); |
| measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN); |
| |
| long[] times = new long[1200]; |
| Object[] columns = new Object[2]; |
| columns[0] = new int[1200]; |
| columns[1] = new long[1200]; |
| |
| for (int r = 0; r < 1200; r++) { |
| times[r] = r; |
| ((int[]) columns[0])[r] = 1; |
| ((long[]) columns[1])[r] = 1; |
| } |
| InsertTabletNode insertTabletNode1 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| null, |
| columns, |
| times.length); |
| |
| dataRegion.insertTablet(insertTabletNode1); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| |
| for (int r = 1249; r >= 50; r--) { |
| times[r - 50] = r; |
| ((int[]) columns[0])[r - 50] = 1; |
| ((long[]) columns[1])[r - 50] = 1; |
| } |
| InsertTabletNode insertTabletNode2 = |
| new InsertTabletNode( |
| new QueryId("test_write").genPlanNodeId(), |
| new PartialPath("root.vehicle.d0"), |
| false, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| times, |
| null, |
| columns, |
| times.length); |
| |
| dataRegion.insertTablet(insertTabletNode2); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| |
| Assert.assertEquals(0, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(2, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| |
| config.setEnableSeparateData(defaultEnableDiscard); |
| COMMON_CONFIG.setTimePartitionInterval(defaultTimePartition); |
| } |
| |
| @Test |
| public void testInsertUnSequenceRows() |
| throws IllegalPathException, WriteProcessRejectException, QueryProcessException, |
| DataRegionException { |
| int defaultAvgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold(); |
| config.setAvgSeriesPointNumberThreshold(2); |
| DataRegion dataRegion1 = new DummyDataRegion(systemDir, "root.Rows"); |
| long[] time = new long[] {3, 4, 1, 2}; |
| List<Integer> indexList = new ArrayList<>(); |
| List<InsertRowNode> nodes = new ArrayList<>(); |
| for (int i = 0; i < 4; i++) { |
| TSRecord record = new TSRecord(time[i], "root.Rows"); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(i))); |
| nodes.add(buildInsertRowNodeByTSRecord(record)); |
| indexList.add(i); |
| } |
| InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""), indexList, nodes); |
| dataRegion1.insert(insertRowsNode); |
| dataRegion1.syncCloseAllWorkingTsFileProcessors(); |
| QueryDataSource queryDataSource = |
| dataRegion1.query( |
| Collections.singletonList(new PartialPath("root.Rows", measurementId)), |
| "root.Rows", |
| context, |
| null, |
| null); |
| Assert.assertEquals(1, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| dataRegion1.syncDeleteDataFiles(); |
| config.setAvgSeriesPointNumberThreshold(defaultAvgSeriesPointNumberThreshold); |
| } |
| |
| @Test |
| public void testSmallReportProportionInsertRow() |
| throws WriteProcessException, QueryProcessException, IllegalPathException, IOException, |
| DataRegionException { |
| double defaultValue = config.getWriteMemoryVariationReportProportion(); |
| config.setWriteMemoryVariationReportProportion(0); |
| DataRegion dataRegion1 = new DummyDataRegion(systemDir, "root.ln22"); |
| |
| for (int j = 21; j <= 30; j++) { |
| TSRecord record = new TSRecord(j, "root.ln22"); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion1.insert(buildInsertRowNodeByTSRecord(record)); |
| dataRegion1.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| dataRegion1.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (TsFileProcessor tsfileProcessor : dataRegion1.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| QueryDataSource queryDataSource = |
| dataRegion1.query( |
| Collections.singletonList(new PartialPath("root.ln22", measurementId)), |
| "root.ln22", |
| context, |
| null, |
| null); |
| Assert.assertEquals(10, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| for (TsFileResource resource : queryDataSource.getUnseqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| |
| dataRegion1.syncDeleteDataFiles(); |
| config.setWriteMemoryVariationReportProportion(defaultValue); |
| } |
| |
| @Test |
| public void testMerge() |
| throws WriteProcessException, QueryProcessException, IllegalPathException { |
| int originCandidateFileNum = |
| IoTDBDescriptor.getInstance().getConfig().getFileLimitPerInnerTask(); |
| IoTDBDescriptor.getInstance().getConfig().setFileLimitPerInnerTask(9); |
| boolean originEnableSeqSpaceCompaction = |
| IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction(); |
| boolean originEnableUnseqSpaceCompaction = |
| IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction(); |
| boolean originEnableCrossSpaceCompaction = |
| IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction(); |
| IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true); |
| IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true); |
| IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false); |
| long finishedCompactionTaskNumWhenTestStart = |
| CompactionTaskManager.getInstance().getFinishedTaskNum(); |
| for (int j = 21; j <= 30; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (int j = 10; j >= 1; j--) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| dataRegion.compact(); |
| long totalWaitingTime = 0; |
| do { |
| // wait |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| totalWaitingTime += 100; |
| if (totalWaitingTime % 1000 == 0) { |
| logger.warn("has waited for {} seconds", totalWaitingTime / 1000); |
| } |
| if (totalWaitingTime > 120_000) { |
| Assert.fail(); |
| break; |
| } |
| } while (CompactionTaskManager.getInstance().getFinishedTaskNum() |
| <= finishedCompactionTaskNumWhenTestStart + 1); |
| |
| QueryDataSource queryDataSource = |
| dataRegion.query( |
| Collections.singletonList(new PartialPath(deviceId, measurementId)), |
| deviceId, |
| context, |
| null, |
| null); |
| Assert.assertEquals(2, queryDataSource.getSeqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| for (TsFileResource resource : queryDataSource.getUnseqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| IoTDBDescriptor.getInstance().getConfig().setFileLimitPerInnerTask(originCandidateFileNum); |
| IoTDBDescriptor.getInstance() |
| .getConfig() |
| .setEnableSeqSpaceCompaction(originEnableSeqSpaceCompaction); |
| IoTDBDescriptor.getInstance() |
| .getConfig() |
| .setEnableCrossSpaceCompaction(originEnableCrossSpaceCompaction); |
| IoTDBDescriptor.getInstance() |
| .getConfig() |
| .setEnableUnseqSpaceCompaction(originEnableUnseqSpaceCompaction); |
| } |
| |
| @Ignore |
| @Test |
| public void testDeleteStorageGroupWhenCompacting() throws Exception { |
| IoTDBDescriptor.getInstance().getConfig().setFileLimitPerInnerTask(10); |
| try { |
| for (int j = 0; j < 10; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| dataRegion.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| ICompactionPerformer performer = new FastCompactionPerformer(false); |
| performer.setSourceFiles(dataRegion.getSequenceFileList()); |
| InnerSpaceCompactionTask task = |
| new InnerSpaceCompactionTask( |
| 0, |
| dataRegion.getTsFileManager(), |
| dataRegion.getSequenceFileList(), |
| true, |
| performer, |
| 0); |
| CompactionTaskManager.getInstance().addTaskToWaitingQueue(task); |
| Thread.sleep(20); |
| List<DataRegion> dataRegions = StorageEngine.getInstance().getAllDataRegions(); |
| List<DataRegion> regionsToBeDeleted = new ArrayList<>(); |
| for (DataRegion region : dataRegions) { |
| if (region.getDatabaseName().equals(storageGroup)) { |
| regionsToBeDeleted.add(region); |
| } |
| } |
| for (DataRegion region : regionsToBeDeleted) { |
| StorageEngine.getInstance() |
| .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId()))); |
| } |
| Thread.sleep(500); |
| |
| for (TsFileResource resource : dataRegion.getSequenceFileList()) { |
| Assert.assertFalse(resource.getTsFile().exists()); |
| } |
| TsFileResource targetTsFileResource = |
| TsFileNameGenerator.getInnerCompactionTargetFileResource( |
| dataRegion.getSequenceFileList(), true); |
| Assert.assertFalse(targetTsFileResource.getTsFile().exists()); |
| String dataDirectory = targetTsFileResource.getTsFile().getParent(); |
| File logFile = |
| new File( |
| dataDirectory |
| + File.separator |
| + targetTsFileResource.getTsFile().getName() |
| + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX); |
| Assert.assertFalse(logFile.exists()); |
| Assert.assertFalse(CommonDescriptor.getInstance().getConfig().isReadOnly()); |
| Assert.assertTrue(dataRegion.getTsFileManager().isAllowCompaction()); |
| } finally { |
| new CompactionConfigRestorer().restoreCompactionConfig(); |
| } |
| } |
| |
| @Test |
| public void testTimedFlushSeqMemTable() |
| throws IllegalPathException, InterruptedException, WriteProcessException, ShutdownException { |
| // create one sequence memtable |
| TSRecord record = new TSRecord(10000, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber()); |
| |
| // change config & reboot timed service |
| boolean prevEnableTimedFlushSeqMemtable = config.isEnableTimedFlushSeqMemtable(); |
| long preFLushInterval = config.getSeqMemtableFlushInterval(); |
| config.setEnableTimedFlushSeqMemtable(true); |
| config.setSeqMemtableFlushInterval(5); |
| StorageEngine.getInstance().rebootTimedService(); |
| |
| Thread.sleep(500); |
| |
| Assert.assertEquals(1, dataRegion.getWorkSequenceTsFileProcessors().size()); |
| TsFileProcessor tsFileProcessor = |
| dataRegion.getWorkSequenceTsFileProcessors().iterator().next(); |
| FlushManager flushManager = FlushManager.getInstance(); |
| |
| // flush the sequence memtable |
| tsFileProcessor.getWorkMemTable().getUpdateTime(); |
| Thread.sleep(500); |
| dataRegion.timedFlushSeqMemTable(); |
| |
| // wait until memtable flush task is done |
| int waitCnt = 0; |
| while (tsFileProcessor.getFlushingMemTableSize() != 0 |
| || tsFileProcessor.isManagedByFlushManager() |
| || flushManager.getNumberOfPendingTasks() != 0 |
| || flushManager.getNumberOfPendingSubTasks() != 0 |
| || flushManager.getNumberOfWorkingTasks() != 0 |
| || flushManager.getNumberOfWorkingSubTasks() != 0) { |
| Thread.sleep(500); |
| ++waitCnt; |
| if (waitCnt % 10 == 0) { |
| logger.info("already wait {} s", waitCnt / 2); |
| } |
| } |
| |
| Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber()); |
| |
| config.setEnableTimedFlushSeqMemtable(prevEnableTimedFlushSeqMemtable); |
| config.setSeqMemtableFlushInterval(preFLushInterval); |
| } |
| |
| @Test |
| public void testTimedFlushUnseqMemTable() |
| throws IllegalPathException, InterruptedException, WriteProcessException, ShutdownException { |
| // create one sequence memtable & close |
| TSRecord record = new TSRecord(10000, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber()); |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber()); |
| |
| // create one unsequence memtable |
| record = new TSRecord(1, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber()); |
| |
| // change config & reboot timed service |
| boolean prevEnableTimedFlushUnseqMemtable = config.isEnableTimedFlushUnseqMemtable(); |
| long preFLushInterval = config.getUnseqMemtableFlushInterval(); |
| config.setEnableTimedFlushUnseqMemtable(true); |
| config.setUnseqMemtableFlushInterval(5); |
| StorageEngine.getInstance().rebootTimedService(); |
| |
| Thread.sleep(500); |
| |
| Assert.assertEquals(1, dataRegion.getWorkUnsequenceTsFileProcessors().size()); |
| TsFileProcessor tsFileProcessor = |
| dataRegion.getWorkUnsequenceTsFileProcessors().iterator().next(); |
| FlushManager flushManager = FlushManager.getInstance(); |
| |
| // flush the unsequence memtable |
| tsFileProcessor.getWorkMemTable().getUpdateTime(); |
| Thread.sleep(500); |
| dataRegion.timedFlushUnseqMemTable(); |
| |
| // wait until memtable flush task is done |
| int waitCnt = 0; |
| while (tsFileProcessor.getFlushingMemTableSize() != 0 |
| || tsFileProcessor.isManagedByFlushManager() |
| || flushManager.getNumberOfPendingTasks() != 0 |
| || flushManager.getNumberOfPendingSubTasks() != 0 |
| || flushManager.getNumberOfWorkingTasks() != 0 |
| || flushManager.getNumberOfWorkingSubTasks() != 0) { |
| Thread.sleep(500); |
| ++waitCnt; |
| if (waitCnt % 10 == 0) { |
| logger.info("already wait {} s", waitCnt / 2); |
| } |
| } |
| |
| Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber()); |
| |
| config.setEnableTimedFlushUnseqMemtable(prevEnableTimedFlushUnseqMemtable); |
| config.setUnseqMemtableFlushInterval(preFLushInterval); |
| } |
| |
| /** |
| * Totally 5 tsfiles<br> |
| * file 0, file 2 and file 4 has d0 ~ d1, time range is 0 ~ 99, 200 ~ 299, 400 ~ 499<br> |
| * file 1, file 3 has d0 ~ d2, time range is 100 ~ 199, 300 ~ 399<br> |
| * delete d2 in time range 50 ~ 150 and 150 ~ 450. Therefore, only file 1 and file 3 has mods. |
| */ |
| @Test |
| public void testDeleteDataNotInFile() |
| throws IllegalPathException, WriteProcessException, InterruptedException, IOException { |
| for (int i = 0; i < 5; i++) { |
| if (i % 2 == 0) { |
| for (int d = 0; d < 2; d++) { |
| for (int count = i * 100; count < i * 100 + 100; count++) { |
| TSRecord record = new TSRecord(count, "root.vehicle.d" + d); |
| record.addTuple( |
| DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(count))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| } |
| } else { |
| for (int d = 0; d < 3; d++) { |
| for (int count = i * 100; count < i * 100 + 100; count++) { |
| TSRecord record = new TSRecord(count, "root.vehicle.d" + d); |
| record.addTuple( |
| DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(count))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| } |
| } |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| } |
| |
| // delete root.vehicle.d2.s0 data in the second file |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d2.s0"), 50, 150, 0); |
| |
| // delete root.vehicle.d2.s0 data in the third file |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d2.s0"), 150, 450, 0); |
| |
| for (int i = 0; i < dataRegion.getSequenceFileList().size(); i++) { |
| TsFileResource resource = dataRegion.getSequenceFileList().get(i); |
| if (i == 1) { |
| Assert.assertTrue(resource.getModFile().exists()); |
| Assert.assertEquals(2, resource.getModFile().getModifications().size()); |
| } else if (i == 3) { |
| Assert.assertTrue(resource.getModFile().exists()); |
| Assert.assertEquals(1, resource.getModFile().getModifications().size()); |
| } else { |
| Assert.assertFalse(resource.getModFile().exists()); |
| } |
| } |
| |
| List<DataRegion> dataRegions = StorageEngine.getInstance().getAllDataRegions(); |
| List<DataRegion> regionsToBeDeleted = new ArrayList<>(); |
| for (DataRegion region : dataRegions) { |
| if (region.getDatabaseName().equals(storageGroup)) { |
| regionsToBeDeleted.add(region); |
| } |
| } |
| for (DataRegion region : regionsToBeDeleted) { |
| StorageEngine.getInstance() |
| .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId()))); |
| } |
| Thread.sleep(500); |
| |
| for (TsFileResource resource : dataRegion.getSequenceFileList()) { |
| Assert.assertFalse(resource.getTsFile().exists()); |
| } |
| } |
| |
| @Test |
| public void testDeleteDataNotInFlushingMemtable() |
| throws IllegalPathException, WriteProcessException, IOException { |
| for (int j = 0; j < 100; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| TsFileResource tsFileResource = dataRegion.getTsFileManager().getTsFileList(true).get(0); |
| TsFileProcessor tsFileProcessor = tsFileResource.getProcessor(); |
| tsFileProcessor.getFlushingMemTable().addLast(tsFileProcessor.getWorkMemTable()); |
| |
| // delete data which is in memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d2.s0"), 50, 70, 0); |
| |
| // delete data which is not in memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0); |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| Assert.assertFalse(tsFileResource.getModFile().exists()); |
| } |
| |
| @Test |
| public void testDeleteDataInSeqFlushingMemtable() |
| throws IllegalPathException, WriteProcessException, IOException { |
| for (int j = 100; j < 200; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| TsFileResource tsFileResource = dataRegion.getTsFileManager().getTsFileList(true).get(0); |
| TsFileProcessor tsFileProcessor = tsFileResource.getProcessor(); |
| tsFileProcessor.getFlushingMemTable().addLast(tsFileProcessor.getWorkMemTable()); |
| |
| // delete data which is not in flushing memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 99, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0); |
| |
| // delete data which is in flushing memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 190, 0); |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| Assert.assertTrue(tsFileResource.getModFile().exists()); |
| Assert.assertEquals(3, tsFileResource.getModFile().getModifications().size()); |
| } |
| |
| @Test |
| public void testDeleteDataInUnSeqFlushingMemtable() |
| throws IllegalPathException, WriteProcessException, IOException { |
| for (int j = 100; j < 200; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| TsFileResource tsFileResource = dataRegion.getTsFileManager().getTsFileList(true).get(0); |
| |
| // delete data which is not in work memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 99, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0); |
| |
| // delete data which is in work memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 190, 0); |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| Assert.assertFalse(tsFileResource.getModFile().exists()); |
| |
| // insert unseq data points |
| for (int j = 50; j < 100; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| // delete data which is not in work memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 200, 299, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0); |
| |
| // delete data which is in work memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 80, 85, 0); |
| |
| Assert.assertFalse(tsFileResource.getModFile().exists()); |
| |
| tsFileResource = dataRegion.getTsFileManager().getTsFileList(false).get(0); |
| TsFileProcessor tsFileProcessor = tsFileResource.getProcessor(); |
| tsFileProcessor.getFlushingMemTable().addLast(tsFileProcessor.getWorkMemTable()); |
| |
| // delete data which is not in flushing memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 0, 49, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 200, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0); |
| |
| // delete data which is in flushing memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 25, 50, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 80, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 99, 150, 0); |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| Assert.assertTrue(tsFileResource.getModFile().exists()); |
| Assert.assertEquals(3, tsFileResource.getModFile().getModifications().size()); |
| } |
| |
| @Test |
| public void testDeleteDataInSeqWorkingMemtable() |
| throws IllegalPathException, WriteProcessException, IOException { |
| for (int j = 100; j < 200; j++) { |
| TSRecord record = new TSRecord(j, "root.vehicle.d0"); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| for (int j = 100; j < 200; j++) { |
| TSRecord record = new TSRecord(j, "root.vehicle.d199"); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| TsFileResource tsFileResource = dataRegion.getTsFileManager().getTsFileList(true).get(0); |
| |
| // delete data which is not in working memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 99, 0); |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0); |
| |
| // delete data which is in working memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d199.*"), 50, 500, 0); |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| Assert.assertFalse(tsFileResource.getModFile().exists()); |
| Assert.assertFalse( |
| tsFileResource.getDevices().contains(new PlainDeviceID("root.vehicle.d199"))); |
| } |
| |
| @Test |
| public void testFlushingEmptyMemtable() |
| throws IllegalPathException, WriteProcessException, IOException { |
| for (int j = 100; j < 200; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| TsFileResource tsFileResource = dataRegion.getTsFileManager().getTsFileList(true).get(0); |
| |
| // delete all data which is in flushing memtable |
| dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 200, 0); |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| Assert.assertFalse(tsFileResource.getTsFile().exists()); |
| Assert.assertFalse(tsFileResource.getModFile().exists()); |
| Assert.assertFalse(dataRegion.getTsFileManager().contains(tsFileResource, true)); |
| Assert.assertFalse( |
| dataRegion.getWorkSequenceTsFileProcessors().contains(tsFileResource.getProcessor())); |
| } |
| |
| public static class DummyDataRegion extends DataRegion { |
| |
| public DummyDataRegion(String systemInfoDir, String storageGroupName) |
| throws DataRegionException { |
| super(systemInfoDir, "0", new TsFileFlushPolicy.DirectFlushPolicy(), storageGroupName); |
| } |
| } |
| |
| // -- test for deleting data directly |
| // -- delete data and file only when: |
| // 1. tsfile is closed |
| // 2. tsfile is not compating |
| // 3. tsfile's start time and end time must be a subinterval |
| // of the given time range. |
| |
| @Test |
| public void testDeleteDataDirectlySeqWriteModsOrDeleteFiles() |
| throws IllegalPathException, WriteProcessException, IOException { |
| for (int j = 100; j < 200; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| |
| TsFileResource tsFileResource = dataRegion.getTsFileManager().getTsFileList(true).get(0); |
| // delete data in work mem, no mods. |
| dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 50, 100, 0); |
| Assert.assertTrue(tsFileResource.getTsFile().exists()); |
| Assert.assertFalse(tsFileResource.getModFile().exists()); |
| |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| // delete data in closed file, but time not match |
| dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 100, 120, 0); |
| Assert.assertTrue(tsFileResource.getTsFile().exists()); |
| Assert.assertTrue(tsFileResource.getModFile().exists()); |
| |
| // delete data in closed file, and time all match |
| dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 100, 199, 0); |
| Assert.assertFalse(tsFileResource.getTsFile().exists()); |
| Assert.assertFalse(tsFileResource.getModFile().exists()); |
| } |
| |
| @Test |
| public void testDeleteDataDirectlyUnseqWriteModsOrDeleteFiles() |
| throws IllegalPathException, WriteProcessException, IOException { |
| for (int j = 100; j < 200; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| TsFileResource tsFileResourceSeq = dataRegion.getTsFileManager().getTsFileList(true).get(0); |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| for (int j = 30; j < 100; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| dataRegion.insert(buildInsertRowNodeByTSRecord(record)); |
| } |
| |
| for (TsFileProcessor processor : dataRegion.getWorkSequenceTsFileProcessors()) { |
| processor.syncFlush(); |
| } |
| TsFileResource tsFileResourceUnSeq = dataRegion.getTsFileManager().getTsFileList(false).get(0); |
| |
| Assert.assertTrue(tsFileResourceSeq.getTsFile().exists()); |
| Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists()); |
| |
| // already closed, will have a mods file. |
| dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 40, 60, 0); |
| // not close yet, just delete in memory. |
| dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 140, 160, 0); |
| |
| // delete data in mem table, there is no mods |
| Assert.assertTrue(tsFileResourceSeq.getTsFile().exists()); |
| Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists()); |
| Assert.assertTrue(tsFileResourceSeq.getModFile().exists()); |
| Assert.assertFalse(tsFileResourceUnSeq.getModFile().exists()); |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| |
| dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 40, 80, 0); |
| Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists()); |
| Assert.assertTrue(tsFileResourceUnSeq.getModFile().exists()); |
| |
| // seq file and unseq file have data file and mod file now, |
| // this deletion will remove data file and mod file. |
| dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 30, 100, 0); |
| dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 100, 199, 0); |
| |
| Assert.assertFalse(tsFileResourceSeq.getTsFile().exists()); |
| Assert.assertFalse(tsFileResourceUnSeq.getTsFile().exists()); |
| Assert.assertFalse(tsFileResourceSeq.getModFile().exists()); |
| Assert.assertFalse(tsFileResourceUnSeq.getModFile().exists()); |
| } |
| } |