| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.engine.storagegroup; |
| |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.constant.TestConstant; |
| import org.apache.iotdb.db.engine.MetadataManagerHelper; |
| import org.apache.iotdb.db.engine.compaction.CompactionStrategy; |
| import org.apache.iotdb.db.engine.flush.FlushManager; |
| import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy; |
| import org.apache.iotdb.db.engine.merge.manage.MergeManager; |
| import org.apache.iotdb.db.engine.querycontext.QueryDataSource; |
| import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; |
| import org.apache.iotdb.db.exception.StorageGroupProcessorException; |
| import org.apache.iotdb.db.exception.TriggerExecutionException; |
| import org.apache.iotdb.db.exception.WriteProcessException; |
| import org.apache.iotdb.db.exception.metadata.IllegalPathException; |
| import org.apache.iotdb.db.exception.metadata.MetadataException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.metadata.PartialPath; |
| import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; |
| import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; |
| import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; |
| import org.apache.iotdb.db.query.context.QueryContext; |
| import org.apache.iotdb.db.rescon.MemTableManager; |
| import org.apache.iotdb.db.utils.EnvironmentUtils; |
| import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.read.reader.IPointReader; |
| import org.apache.iotdb.tsfile.write.record.TSRecord; |
| import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| |
| public class StorageGroupProcessorTest { |
| private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); |
| private static Logger logger = LoggerFactory.getLogger(StorageGroupProcessorTest.class); |
| |
| private String storageGroup = "root.vehicle.d0"; |
| private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info"); |
| private String deviceId = "root.vehicle.d0"; |
| private String measurementId = "s0"; |
| private StorageGroupProcessor processor; |
| private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT; |
| |
| private boolean prevEnableTimedFlushMemtable = false; |
| |
| @Before |
| public void setUp() throws Exception { |
| prevEnableTimedFlushMemtable = config.isEnableTimedFlushUnseqMemtable(); |
| config.setEnableTimedFlushUnseqMemtable(true); |
| config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); |
| MetadataManagerHelper.initMetadata(); |
| EnvironmentUtils.envSetUp(); |
| processor = new DummySGP(systemDir, storageGroup); |
| MergeManager.getINSTANCE().start(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| processor.syncDeleteDataFiles(); |
| EnvironmentUtils.cleanEnv(); |
| EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR); |
| MergeManager.getINSTANCE().stop(); |
| EnvironmentUtils.cleanEnv(); |
| config.setEnableTimedFlushUnseqMemtable(prevEnableTimedFlushMemtable); |
| config.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); |
| } |
| |
| private void insertToStorageGroupProcessor(TSRecord record) |
| throws WriteProcessException, IllegalPathException, TriggerExecutionException { |
| InsertRowPlan insertRowPlan = new InsertRowPlan(record); |
| processor.insert(insertRowPlan); |
| } |
| |
| @Test |
| public void testUnseqUnsealedDelete() |
| throws WriteProcessException, IOException, MetadataException, TriggerExecutionException { |
| TSRecord record = new TSRecord(10000, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); |
| processor.insert(new InsertRowPlan(record)); |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (int j = 1; j <= 10; j++) { |
| record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| processor.insert(new InsertRowPlan(record)); |
| } |
| |
| for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| for (int j = 11; j <= 20; j++) { |
| record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| processor.insert(new InsertRowPlan(record)); |
| } |
| |
| processor.delete(new PartialPath(deviceId, measurementId), 0, 15L, -1); |
| |
| List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); |
| for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.query( |
| deviceId, |
| measurementId, |
| new MeasurementSchema( |
| measurementId, |
| TSDataType.INT32, |
| TSEncoding.RLE, |
| CompressionType.UNCOMPRESSED, |
| Collections.emptyMap()), |
| new QueryContext(), |
| tsfileResourcesForQuery); |
| } |
| |
| Assert.assertEquals(1, tsfileResourcesForQuery.size()); |
| Assert.assertEquals(0, tsfileResourcesForQuery.get(0).getChunkMetadataList().size()); |
| List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(); |
| long time = 16; |
| for (ReadOnlyMemChunk memChunk : memChunks) { |
| IPointReader iterator = memChunk.getPointReader(); |
| while (iterator.hasNextTimeValuePair()) { |
| TimeValuePair timeValuePair = iterator.nextTimeValuePair(); |
| Assert.assertEquals(time++, timeValuePair.getTimestamp()); |
| } |
| } |
| } |
| |
| @Test |
| public void testSequenceSyncClose() |
| throws WriteProcessException, QueryProcessException, IllegalPathException, |
| TriggerExecutionException { |
| for (int j = 1; j <= 10; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| processor.insert(new InsertRowPlan(record)); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| QueryDataSource queryDataSource = |
| processor.query(new PartialPath(deviceId, measurementId), context, null, null); |
| Assert.assertEquals(10, queryDataSource.getSeqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testInsertDataAndRemovePartitionAndInsert() |
| throws WriteProcessException, QueryProcessException, IllegalPathException, |
| TriggerExecutionException { |
| for (int j = 0; j < 10; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| processor.insert(new InsertRowPlan(record)); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| processor.removePartitions((storageGroupName, timePartitionId) -> true); |
| |
| for (int j = 0; j < 10; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| processor.insert(new InsertRowPlan(record)); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| QueryDataSource queryDataSource = |
| processor.query(new PartialPath(deviceId, measurementId), context, null, null); |
| Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); |
| } |
| |
| @Test |
| public void testIoTDBTabletWriteAndSyncClose() |
| throws QueryProcessException, IllegalPathException, TriggerExecutionException { |
| String[] measurements = new String[2]; |
| measurements[0] = "s0"; |
| measurements[1] = "s1"; |
| List<Integer> dataTypes = new ArrayList<>(); |
| dataTypes.add(TSDataType.INT32.ordinal()); |
| dataTypes.add(TSDataType.INT64.ordinal()); |
| |
| IMeasurementMNode[] measurementMNodes = new IMeasurementMNode[2]; |
| measurementMNodes[0] = |
| new MeasurementMNode( |
| null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null); |
| measurementMNodes[1] = |
| new MeasurementMNode( |
| null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null); |
| |
| InsertTabletPlan insertTabletPlan1 = |
| new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes); |
| insertTabletPlan1.setMeasurementMNodes(measurementMNodes); |
| |
| long[] times = new long[100]; |
| Object[] columns = new Object[2]; |
| columns[0] = new int[100]; |
| columns[1] = new long[100]; |
| |
| for (int r = 0; r < 100; r++) { |
| times[r] = r; |
| ((int[]) columns[0])[r] = 1; |
| ((long[]) columns[1])[r] = 1; |
| } |
| insertTabletPlan1.setTimes(times); |
| insertTabletPlan1.setColumns(columns); |
| insertTabletPlan1.setRowCount(times.length); |
| |
| processor.insertTablet(insertTabletPlan1); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| |
| InsertTabletPlan insertTabletPlan2 = |
| new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes); |
| insertTabletPlan2.setMeasurementMNodes(measurementMNodes); |
| |
| for (int r = 50; r < 149; r++) { |
| times[r - 50] = r; |
| ((int[]) columns[0])[r - 50] = 1; |
| ((long[]) columns[1])[r - 50] = 1; |
| } |
| insertTabletPlan2.setTimes(times); |
| insertTabletPlan2.setColumns(columns); |
| insertTabletPlan2.setRowCount(times.length); |
| |
| processor.insertTablet(insertTabletPlan2); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| QueryDataSource queryDataSource = |
| processor.query(new PartialPath(deviceId, measurementId), context, null, null); |
| |
| Assert.assertEquals(2, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(1, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testSeqAndUnSeqSyncClose() |
| throws WriteProcessException, QueryProcessException, IllegalPathException, |
| TriggerExecutionException { |
| for (int j = 21; j <= 30; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| processor.insert(new InsertRowPlan(record)); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (int j = 10; j >= 1; j--) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| processor.insert(new InsertRowPlan(record)); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| QueryDataSource queryDataSource = |
| processor.query(new PartialPath(deviceId, measurementId), context, null, null); |
| Assert.assertEquals(10, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(10, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| for (TsFileResource resource : queryDataSource.getUnseqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testEnableDiscardOutOfOrderDataForInsertRowPlan() |
| throws WriteProcessException, QueryProcessException, IllegalPathException, IOException, |
| TriggerExecutionException { |
| boolean defaultValue = config.isEnableDiscardOutOfOrderData(); |
| config.setEnableDiscardOutOfOrderData(true); |
| |
| for (int j = 21; j <= 30; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| insertToStorageGroupProcessor(record); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (int j = 10; j >= 1; j--) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| insertToStorageGroupProcessor(record); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| QueryDataSource queryDataSource = |
| processor.query(new PartialPath(deviceId, measurementId), context, null, null); |
| Assert.assertEquals(10, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| for (TsFileResource resource : queryDataSource.getUnseqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| |
| config.setEnableDiscardOutOfOrderData(defaultValue); |
| } |
| |
| @Test |
| public void testEnableDiscardOutOfOrderDataForInsertTablet1() |
| throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException { |
| boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData(); |
| long defaultTimePartition = config.getPartitionInterval(); |
| boolean defaultEnablePartition = config.isEnablePartition(); |
| config.setEnableDiscardOutOfOrderData(true); |
| config.setEnablePartition(true); |
| config.setPartitionInterval(100); |
| |
| String[] measurements = new String[2]; |
| measurements[0] = "s0"; |
| measurements[1] = "s1"; |
| List<Integer> dataTypes = new ArrayList<>(); |
| dataTypes.add(TSDataType.INT32.ordinal()); |
| dataTypes.add(TSDataType.INT64.ordinal()); |
| |
| IMeasurementMNode[] measurementMNodes = new IMeasurementMNode[2]; |
| measurementMNodes[0] = |
| new MeasurementMNode( |
| null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null); |
| measurementMNodes[1] = |
| new MeasurementMNode( |
| null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null); |
| |
| InsertTabletPlan insertTabletPlan1 = |
| new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes); |
| |
| long[] times = new long[100]; |
| Object[] columns = new Object[2]; |
| columns[0] = new int[100]; |
| columns[1] = new long[100]; |
| |
| for (int r = 0; r < 100; r++) { |
| times[r] = r; |
| ((int[]) columns[0])[r] = 1; |
| ((long[]) columns[1])[r] = 1; |
| } |
| insertTabletPlan1.setTimes(times); |
| insertTabletPlan1.setColumns(columns); |
| insertTabletPlan1.setRowCount(times.length); |
| insertTabletPlan1.setMeasurementMNodes(measurementMNodes); |
| |
| processor.insertTablet(insertTabletPlan1); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| |
| InsertTabletPlan insertTabletPlan2 = |
| new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes); |
| |
| for (int r = 149; r >= 50; r--) { |
| times[r - 50] = r; |
| ((int[]) columns[0])[r - 50] = 1; |
| ((long[]) columns[1])[r - 50] = 1; |
| } |
| insertTabletPlan2.setTimes(times); |
| insertTabletPlan2.setColumns(columns); |
| insertTabletPlan2.setRowCount(times.length); |
| insertTabletPlan2.setMeasurementMNodes(measurementMNodes); |
| |
| processor.insertTablet(insertTabletPlan2); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| QueryDataSource queryDataSource = |
| processor.query(new PartialPath(deviceId, measurementId), context, null, null); |
| |
| Assert.assertEquals(2, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| |
| config.setEnableDiscardOutOfOrderData(defaultEnableDiscard); |
| config.setPartitionInterval(defaultTimePartition); |
| config.setEnablePartition(defaultEnablePartition); |
| } |
| |
| @Test |
| public void testEnableDiscardOutOfOrderDataForInsertTablet2() |
| throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException { |
| boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData(); |
| long defaultTimePartition = config.getPartitionInterval(); |
| boolean defaultEnablePartition = config.isEnablePartition(); |
| config.setEnableDiscardOutOfOrderData(true); |
| config.setEnablePartition(true); |
| config.setPartitionInterval(1200); |
| |
| String[] measurements = new String[2]; |
| measurements[0] = "s0"; |
| measurements[1] = "s1"; |
| List<Integer> dataTypes = new ArrayList<>(); |
| dataTypes.add(TSDataType.INT32.ordinal()); |
| dataTypes.add(TSDataType.INT64.ordinal()); |
| |
| IMeasurementMNode[] measurementMNodes = new IMeasurementMNode[2]; |
| measurementMNodes[0] = |
| new MeasurementMNode( |
| null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null); |
| measurementMNodes[1] = |
| new MeasurementMNode( |
| null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null); |
| |
| InsertTabletPlan insertTabletPlan1 = |
| new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes); |
| |
| long[] times = new long[1200]; |
| Object[] columns = new Object[2]; |
| columns[0] = new int[1200]; |
| columns[1] = new long[1200]; |
| |
| for (int r = 0; r < 1200; r++) { |
| times[r] = r; |
| ((int[]) columns[0])[r] = 1; |
| ((long[]) columns[1])[r] = 1; |
| } |
| insertTabletPlan1.setTimes(times); |
| insertTabletPlan1.setColumns(columns); |
| insertTabletPlan1.setRowCount(times.length); |
| insertTabletPlan1.setMeasurementMNodes(measurementMNodes); |
| |
| processor.insertTablet(insertTabletPlan1); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| |
| InsertTabletPlan insertTabletPlan2 = |
| new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes); |
| |
| for (int r = 1249; r >= 50; r--) { |
| times[r - 50] = r; |
| ((int[]) columns[0])[r - 50] = 1; |
| ((long[]) columns[1])[r - 50] = 1; |
| } |
| insertTabletPlan2.setTimes(times); |
| insertTabletPlan2.setColumns(columns); |
| insertTabletPlan2.setRowCount(times.length); |
| insertTabletPlan2.setMeasurementMNodes(measurementMNodes); |
| |
| processor.insertTablet(insertTabletPlan2); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| QueryDataSource queryDataSource = |
| processor.query(new PartialPath(deviceId, measurementId), context, null, null); |
| |
| Assert.assertEquals(2, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| |
| config.setEnableDiscardOutOfOrderData(defaultEnableDiscard); |
| config.setPartitionInterval(defaultTimePartition); |
| config.setEnablePartition(defaultEnablePartition); |
| } |
| |
| @Test |
| public void testEnableDiscardOutOfOrderDataForInsertTablet3() |
| throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException { |
| boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData(); |
| long defaultTimePartition = config.getPartitionInterval(); |
| boolean defaultEnablePartition = config.isEnablePartition(); |
| config.setEnableDiscardOutOfOrderData(true); |
| config.setEnablePartition(true); |
| config.setPartitionInterval(1000); |
| |
| String[] measurements = new String[2]; |
| measurements[0] = "s0"; |
| measurements[1] = "s1"; |
| List<Integer> dataTypes = new ArrayList<>(); |
| dataTypes.add(TSDataType.INT32.ordinal()); |
| dataTypes.add(TSDataType.INT64.ordinal()); |
| |
| IMeasurementMNode[] measurementMNodes = new IMeasurementMNode[2]; |
| measurementMNodes[0] = |
| new MeasurementMNode( |
| null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null); |
| measurementMNodes[1] = |
| new MeasurementMNode( |
| null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null); |
| |
| InsertTabletPlan insertTabletPlan1 = |
| new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes); |
| |
| long[] times = new long[1200]; |
| Object[] columns = new Object[2]; |
| columns[0] = new int[1200]; |
| columns[1] = new long[1200]; |
| |
| for (int r = 0; r < 1200; r++) { |
| times[r] = r; |
| ((int[]) columns[0])[r] = 1; |
| ((long[]) columns[1])[r] = 1; |
| } |
| insertTabletPlan1.setTimes(times); |
| insertTabletPlan1.setColumns(columns); |
| insertTabletPlan1.setRowCount(times.length); |
| insertTabletPlan1.setMeasurementMNodes(measurementMNodes); |
| |
| processor.insertTablet(insertTabletPlan1); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| |
| InsertTabletPlan insertTabletPlan2 = |
| new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes); |
| |
| for (int r = 1249; r >= 50; r--) { |
| times[r - 50] = r; |
| ((int[]) columns[0])[r - 50] = 1; |
| ((long[]) columns[1])[r - 50] = 1; |
| } |
| insertTabletPlan2.setTimes(times); |
| insertTabletPlan2.setColumns(columns); |
| insertTabletPlan2.setRowCount(times.length); |
| insertTabletPlan2.setMeasurementMNodes(measurementMNodes); |
| |
| processor.insertTablet(insertTabletPlan2); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) { |
| tsfileProcessor.syncFlush(); |
| } |
| |
| QueryDataSource queryDataSource = |
| processor.query(new PartialPath(deviceId, measurementId), context, null, null); |
| |
| Assert.assertEquals(2, queryDataSource.getSeqResources().size()); |
| Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| |
| config.setEnableDiscardOutOfOrderData(defaultEnableDiscard); |
| config.setPartitionInterval(defaultTimePartition); |
| config.setEnablePartition(defaultEnablePartition); |
| } |
| |
| @Test |
| public void testMerge() |
| throws WriteProcessException, QueryProcessException, IllegalPathException, |
| TriggerExecutionException { |
| for (int j = 21; j <= 30; j++) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| processor.insert(new InsertRowPlan(record)); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| |
| for (int j = 10; j >= 1; j--) { |
| TSRecord record = new TSRecord(j, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); |
| processor.insert(new InsertRowPlan(record)); |
| processor.asyncCloseAllWorkingTsFileProcessors(); |
| } |
| |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| processor.merge(config.isForceFullMerge()); |
| while (processor.getTsFileManagement().isUnseqMerging) { |
| // wait |
| } |
| |
| QueryDataSource queryDataSource = |
| processor.query(new PartialPath(deviceId, measurementId), context, null, null); |
| Assert.assertEquals(10, queryDataSource.getSeqResources().size()); |
| for (TsFileResource resource : queryDataSource.getSeqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| for (TsFileResource resource : queryDataSource.getUnseqResources()) { |
| Assert.assertTrue(resource.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testTimedFlushMemTable() |
| throws IllegalPathException, InterruptedException, WriteProcessException, |
| TriggerExecutionException { |
| // create one seq memtable & close |
| TSRecord record = new TSRecord(10000, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); |
| processor.insert(new InsertRowPlan(record)); |
| Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber()); |
| processor.syncCloseAllWorkingTsFileProcessors(); |
| Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber()); |
| |
| // create one unseq memtable |
| record = new TSRecord(1, deviceId); |
| record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); |
| processor.insert(new InsertRowPlan(record)); |
| Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber()); |
| |
| // check memtable's flush interval & flush the unsequence memtable |
| long preFLushInterval = config.getUnseqMemtableFlushInterval(); |
| config.setUnseqMemtableFlushInterval(5); |
| |
| Thread.sleep(500); |
| |
| processor.timedFlushMemTable(); |
| |
| // wait until memtable flush task is done |
| Assert.assertEquals(1, processor.getWorkUnsequenceTsFileProcessors().size()); |
| TsFileProcessor tsFileProcessor = |
| processor.getWorkUnsequenceTsFileProcessors().iterator().next(); |
| FlushManager flushManager = FlushManager.getInstance(); |
| int waitCnt = 0; |
| while (tsFileProcessor.getFlushingMemTableSize() != 0 |
| || tsFileProcessor.isManagedByFlushManager() |
| || flushManager.getNumberOfPendingTasks() != 0 |
| || flushManager.getNumberOfPendingSubTasks() != 0 |
| || flushManager.getNumberOfWorkingTasks() != 0 |
| || flushManager.getNumberOfWorkingSubTasks() != 0) { |
| Thread.sleep(500); |
| ++waitCnt; |
| if (waitCnt % 10 == 0) { |
| logger.info("already wait {} s", waitCnt / 2); |
| } |
| } |
| |
| Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber()); |
| |
| config.setUnseqMemtableFlushInterval(preFLushInterval); |
| } |
| |
| class DummySGP extends StorageGroupProcessor { |
| |
| DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException { |
| super( |
| systemInfoDir, |
| storageGroupName, |
| new TsFileFlushPolicy.DirectFlushPolicy(), |
| storageGroupName); |
| } |
| } |
| } |