| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.storageengine.dataregion.compaction.repair; |
| |
| import org.apache.iotdb.commons.exception.IllegalPathException; |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.exception.StartupException; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.exception.StorageEngineException; |
| import org.apache.iotdb.db.storageengine.dataregion.DataRegion; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.RepairUnsortedFileCompactionTask; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleSummary; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; |
| import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; |
| import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; |
| import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; |
| import org.apache.iotdb.tsfile.exception.write.WriteProcessException; |
| import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID; |
| import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.read.TsFileSequenceReader; |
| import org.apache.iotdb.tsfile.read.common.BatchData; |
| import org.apache.iotdb.tsfile.read.common.Chunk; |
| import org.apache.iotdb.tsfile.read.common.TimeRange; |
| import org.apache.iotdb.tsfile.read.reader.IPointReader; |
| import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| |
| public class RepairUnsortedFileCompactionTest extends AbstractRepairDataTest { |
| |
| /** Seq-space compaction switch as configured when this test instance was created; tearDown() writes it back. */ |
| private boolean enableSeqSpaceCompaction = |
| IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction(); |
| /** Unseq-space compaction switch as configured when this test instance was created; tearDown() writes it back. */ |
| private boolean enableUnSeqSpaceCompaction = |
| IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction(); |
| /** Cross-space compaction switch as configured when this test instance was created; tearDown() writes it back. */ |
| private boolean enableCrossSpaceCompaction = |
| IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction(); |
| |
| /** |
| * Prepares a test run: executes the parent set-up, renames the current thread to |
| * {@code pool-1-IoTDB-Compaction-Worker-1}, switches on seq, unseq and cross space compaction |
| * and starts the {@link CompactionScheduleTaskManager}. |
| * |
| * <p>NOTE(review): the thread is presumably renamed so that code which recognises compaction |
| * worker threads by name accepts the test thread - confirm. |
| */ |
| @Before |
| public void setUp() |
| throws IOException, WriteProcessException, MetadataException, InterruptedException { |
| super.setUp(); |
| |
| final Thread testThread = Thread.currentThread(); |
| testThread.setName("pool-1-IoTDB-Compaction-Worker-1"); |
| |
| // Force every compaction kind on; tearDown() puts the previous values back. |
| IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true); |
| IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true); |
| IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true); |
| |
| try { |
| CompactionScheduleTaskManager.getInstance().start(); |
| } catch (StartupException startupFailure) { |
| // StartupException is not in the throws clause, so rethrow it unchecked with its cause. |
| throw new RuntimeException(startupFailure); |
| } |
| } |
| |
| /** |
| * Restores the three compaction switches to the values captured in this instance's fields and |
| * then delegates to the parent tear-down. |
| */ |
| @After |
| public void tearDown() throws IOException, StorageEngineException { |
| // The three switches are restored independently of each other before the parent cleans up. |
| IoTDBDescriptor.getInstance() |
| .getConfig() |
| .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction); |
| IoTDBDescriptor.getInstance() |
| .getConfig() |
| .setEnableUnseqSpaceCompaction(enableUnSeqSpaceCompaction); |
| IoTDBDescriptor.getInstance() |
| .getConfig() |
| .setEnableSeqSpaceCompaction(enableSeqSpaceCompaction); |
| super.tearDown(); |
| } |
| |
| @Test |
| public void testRepairUnsortedDataBetweenPageWithNonAlignedSeries() throws IOException { |
| TsFileResource resource = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleNonAlignedSeriesToCurrentDevice( |
| "s1", |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(5, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); |
| RepairUnsortedFileCompactionTask task = |
| new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); |
| task.start(); |
| Assert.assertEquals(0, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileDataCorrectness( |
| tsFileManager.getTsFileList(false).get(0))); |
| } |
| |
| /** |
| * Repairs a sequence file whose aligned series {@code s1} and {@code s2} of device {@code d1} |
| * are written with the time ranges [10, 20] and then [5, 30], i.e. out of order between pages |
| * (per the test name). |
| * |
| * <p>The file must fail data validation before the repair. Afterwards the repair task must |
| * report success, the sequence list must be empty and the single unsequence file must pass |
| * data validation. |
| */ |
| @Test |
| public void testRepairUnsortedDataBetweenPageWithAlignedSeries() throws IOException { |
| TsFileResource resource = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(5, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); |
| RepairUnsortedFileCompactionTask task = |
| new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); |
| // Check the task's own verdict as well, as testRepairOverlapBetweenFile does. |
| Assert.assertTrue(task.start()); |
| Assert.assertEquals(0, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileDataCorrectness( |
| tsFileManager.getTsFileList(false).get(0))); |
| } |
| |
| /** |
| * Repairs a sequence file whose non-aligned series {@code s1} of device {@code d1} is written |
| * with the time ranges [10, 20], [29, 30] and [21, 25] in that order, i.e. unsorted inside a |
| * single page (per the test name). |
| * |
| * <p>The file must fail data validation before the repair. Afterwards the repair task must |
| * report success, the sequence list must be empty and the single unsequence file must pass |
| * data validation. |
| */ |
| @Test |
| public void testRepairUnsortedDataInOnePageWithNonAlignedSeries() throws IOException { |
| TsFileResource resource = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleNonAlignedSeriesToCurrentDevice( |
| "s1", |
| new TimeRange[][][] { |
| new TimeRange[][] { |
| new TimeRange[] {new TimeRange(10, 20), new TimeRange(29, 30), new TimeRange(21, 25)} |
| } |
| }, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); |
| RepairUnsortedFileCompactionTask task = |
| new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); |
| // Check the task's own verdict as well, as testRepairOverlapBetweenFile does. |
| Assert.assertTrue(task.start()); |
| Assert.assertEquals(0, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileDataCorrectness( |
| tsFileManager.getTsFileList(false).get(0))); |
| } |
| |
| @Test |
| public void testRepairUnsortedDataInOnePageWithMultiNonAlignedSeries() throws IOException { |
| TsFileResource resource = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { |
| writer.startChunkGroup("d1"); |
| for (int i = 0; i < 1000; i++) { |
| writer.generateSimpleNonAlignedSeriesToCurrentDevice( |
| "s" + i, |
| new TimeRange[][][] { |
| new TimeRange[][] { |
| new TimeRange[] { |
| new TimeRange(10, 20), new TimeRange(29, 30), new TimeRange(21, 25) |
| } |
| } |
| }, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| } |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); |
| RepairUnsortedFileCompactionTask task = |
| new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); |
| task.start(); |
| Assert.assertEquals(0, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileDataCorrectness( |
| tsFileManager.getTsFileList(false).get(0))); |
| } |
| |
| /** |
| * Repairs an unsequence file whose non-aligned series {@code s1} of device {@code d1} is |
| * written with the time ranges [10, 20], [29, 30] and [21, 25] in that order, i.e. unsorted |
| * inside a single page (per the test name). |
| * |
| * <p>The file must fail data validation before the repair. Afterwards the repair task must |
| * report success, the sequence list must be empty and the single unsequence file must pass |
| * data validation. |
| */ |
| @Test |
| public void testRepairUnsortedDataInOnePageWithUnseqFile() throws IOException { |
| TsFileResource resource = createEmptyFileAndResource(false); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleNonAlignedSeriesToCurrentDevice( |
| "s1", |
| new TimeRange[][][] { |
| new TimeRange[][] { |
| new TimeRange[] {new TimeRange(10, 20), new TimeRange(29, 30), new TimeRange(21, 25)} |
| } |
| }, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); |
| RepairUnsortedFileCompactionTask task = |
| new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); |
| // Check the task's own verdict as well, as testRepairOverlapBetweenFile does. |
| Assert.assertTrue(task.start()); |
| Assert.assertEquals(0, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileDataCorrectness( |
| tsFileManager.getTsFileList(false).get(0))); |
| } |
| |
| /** |
| * Repairs a sequence file whose aligned series {@code s1} and {@code s2} of device {@code d1} |
| * are written with the time ranges [10, 20], [29, 30] and [21, 25] in that order, i.e. |
| * unsorted inside a single page (per the test name). |
| * |
| * <p>The file must fail data validation before the repair. Afterwards the repair task must |
| * report success, the sequence list must be empty, the single unsequence file must pass data |
| * validation and the sequence files of time partition 0 must not overlap. |
| */ |
| @Test |
| public void testRepairUnsortedDataInOnePageWithAlignedSeries() throws IOException { |
| TsFileResource resource = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][][] { |
| new TimeRange[][] { |
| new TimeRange[] {new TimeRange(10, 20), new TimeRange(29, 30), new TimeRange(21, 25)} |
| } |
| }, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); |
| RepairUnsortedFileCompactionTask task = |
| new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); |
| // Check the task's own verdict as well, as testRepairOverlapBetweenFile does. |
| Assert.assertTrue(task.start()); |
| Assert.assertEquals(0, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileDataCorrectness( |
| tsFileManager.getTsFileList(false).get(0))); |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileResourcesHasNoOverlap( |
| tsFileManager.getOrCreateSequenceListByTimePartition(0))); |
| } |
| |
| /** |
| * Checks that a failed inner sequence-space compaction marks its source files for repair and |
| * that the next scheduling round repairs them. |
| * |
| * <p>Two sequence files with unsorted aligned data are compacted with |
| * {@link ReadChunkCompactionPerformer}; the task must fail. Every sequence file in the manager |
| * must then carry {@link TsFileRepairStatus#NEED_TO_REPAIR}. A scheduling round must submit two |
| * sequence inner-space tasks; the test waits up to 20 seconds for two more tasks to finish. |
| * After that no sequence file may remain, two unsequence files must exist, each of them must |
| * pass data validation, and the sequence files of time partition 0 must not overlap. |
| * |
| * @throws InterruptedException if the thread is interrupted while waiting for the repair tasks |
| */ |
| @Test |
| public void testMarkFileAndRepairWithInnerSeqSpaceCompactionTask() |
| throws IOException, InterruptedException { |
| TsFileResource seqResource1 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource1)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][][] { |
| new TimeRange[][] { |
| new TimeRange[] {new TimeRange(10, 20), new TimeRange(29, 30), new TimeRange(21, 25)} |
| } |
| }, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| TsFileResource seqResource2 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource2)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(5, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| InnerSpaceCompactionTask task = |
| new InnerSpaceCompactionTask( |
| 0, |
| tsFileManager, |
| Arrays.asList(seqResource1, seqResource2), |
| true, |
| new ReadChunkCompactionPerformer(), |
| 0); |
| Assert.assertFalse(task.start()); |
| |
| for (TsFileResource resource : tsFileManager.getTsFileList(true)) { |
| // JUnit expects (expected, actual); the original order produced misleading failure text. |
| Assert.assertEquals(TsFileRepairStatus.NEED_TO_REPAIR, resource.getTsFileRepairStatus()); |
| } |
| |
| long initialFinishedCompactionTaskNum = |
| CompactionTaskManager.getInstance().getFinishedTaskNum(); |
| CompactionScheduleSummary summary = new CompactionScheduleSummary(); |
| CompactionScheduler.scheduleCompaction(tsFileManager, 0, summary); |
| Assert.assertEquals(2, summary.getSubmitSeqInnerSpaceCompactionTaskNum()); |
| |
| // Poll once per second, at most 20 times, until both submitted tasks have finished. |
| int waitSecond = 20; |
| while (CompactionTaskManager.getInstance().getFinishedTaskNum() |
| - initialFinishedCompactionTaskNum |
| < 2) { |
| if (waitSecond == 0) { |
| Assert.fail("Exceed the max time to wait repair compaction"); |
| } |
| // InterruptedException is declared by this method, so it propagates unwrapped. |
| TimeUnit.SECONDS.sleep(1); |
| waitSecond--; |
| } |
| Assert.assertEquals(0, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(2, tsFileManager.getTsFileList(false).size()); |
| for (TsFileResource resource : tsFileManager.getTsFileList(false)) { |
| // The validation result was previously discarded, so this loop verified nothing. |
| Assert.assertTrue(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); |
| } |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileResourcesHasNoOverlap( |
| tsFileManager.getOrCreateSequenceListByTimePartition(0))); |
| } |
| |
| /** |
| * Checks that a failed inner unsequence-space compaction marks its source files for repair and |
| * that the next scheduling round repairs them. |
| * |
| * <p>Two unsequence files with unsorted aligned data are compacted with |
| * {@link FastCompactionPerformer}; the task must fail. Every unsequence file in the manager |
| * must then carry {@link TsFileRepairStatus#NEED_TO_REPAIR}. A scheduling round must submit two |
| * unsequence inner-space tasks; the test waits up to 20 seconds for two more tasks to finish. |
| * After that no sequence file may exist, two unsequence files must exist, each of them must |
| * pass data validation, and the sequence files of time partition 0 must not overlap. |
| * |
| * @throws InterruptedException if the thread is interrupted while waiting for the repair tasks |
| */ |
| @Test |
| public void testMarkFileAndRepairWithInnerUnSeqSpaceCompactionTask() |
| throws IOException, InterruptedException { |
| TsFileResource unSeqResource1 = createEmptyFileAndResource(false); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(unSeqResource1)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][][] { |
| new TimeRange[][] { |
| new TimeRange[] {new TimeRange(10, 20), new TimeRange(29, 30), new TimeRange(21, 25)} |
| } |
| }, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| TsFileResource unSeqResource2 = createEmptyFileAndResource(false); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(unSeqResource2)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(5, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| InnerSpaceCompactionTask task = |
| new InnerSpaceCompactionTask( |
| 0, |
| tsFileManager, |
| Arrays.asList(unSeqResource1, unSeqResource2), |
| false, |
| new FastCompactionPerformer(false), |
| 0); |
| Assert.assertFalse(task.start()); |
| |
| // The source files are unsequence files, so the unsequence list is the one to inspect; the |
| // original iterated the sequence list, which holds none of this test's inputs. |
| for (TsFileResource resource : tsFileManager.getTsFileList(false)) { |
| // JUnit expects (expected, actual); the original order produced misleading failure text. |
| Assert.assertEquals(TsFileRepairStatus.NEED_TO_REPAIR, resource.getTsFileRepairStatus()); |
| } |
| |
| long initialFinishedCompactionTaskNum = |
| CompactionTaskManager.getInstance().getFinishedTaskNum(); |
| CompactionScheduleSummary summary = new CompactionScheduleSummary(); |
| CompactionScheduler.scheduleCompaction(tsFileManager, 0, summary); |
| Assert.assertEquals(2, summary.getSubmitUnseqInnerSpaceCompactionTaskNum()); |
| |
| // Poll once per second, at most 20 times, until both submitted tasks have finished. |
| int waitSecond = 20; |
| while (CompactionTaskManager.getInstance().getFinishedTaskNum() |
| - initialFinishedCompactionTaskNum |
| < 2) { |
| if (waitSecond == 0) { |
| Assert.fail("Exceed the max time to wait repair compaction"); |
| } |
| // InterruptedException is declared by this method, so it propagates unwrapped. |
| TimeUnit.SECONDS.sleep(1); |
| waitSecond--; |
| } |
| Assert.assertEquals(0, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(2, tsFileManager.getTsFileList(false).size()); |
| for (TsFileResource resource : tsFileManager.getTsFileList(false)) { |
| // The validation result was previously discarded, so this loop verified nothing. |
| Assert.assertTrue(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); |
| } |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileResourcesHasNoOverlap( |
| tsFileManager.getOrCreateSequenceListByTimePartition(0))); |
| } |
| |
| /** |
| * Checks that a failed cross-space compaction marks its source files for repair and that the |
| * next scheduling round repairs them. |
| * |
| * <p>One sequence and one unsequence file with unsorted aligned data are compacted with |
| * {@link FastCompactionPerformer}; the task must fail. Every sequence file in the manager must |
| * then carry {@link TsFileRepairStatus#NEED_TO_REPAIR}. A scheduling round must submit one |
| * sequence and one unsequence inner-space task; the test waits up to 20 seconds for two more |
| * tasks to finish. After that no sequence file may remain, two unsequence files must exist and |
| * each of them must pass data validation. |
| * |
| * @throws InterruptedException if the thread is interrupted while waiting for the repair tasks |
| */ |
| @Test |
| public void testMarkFileAndRepairWithCrossSpaceCompactionTask() |
| throws IOException, InterruptedException { |
| TsFileResource seqResource1 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource1)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][][] { |
| new TimeRange[][] { |
| new TimeRange[] {new TimeRange(10, 20), new TimeRange(29, 30), new TimeRange(21, 25)} |
| } |
| }, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| TsFileResource unSeqResource1 = createEmptyFileAndResource(false); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(unSeqResource1)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(5, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| CrossSpaceCompactionTask task = |
| new CrossSpaceCompactionTask( |
| 0, |
| tsFileManager, |
| Collections.singletonList(seqResource1), |
| Collections.singletonList(unSeqResource1), |
| new FastCompactionPerformer(true), |
| 0, |
| 0); |
| Assert.assertFalse(task.start()); |
| |
| for (TsFileResource resource : tsFileManager.getTsFileList(true)) { |
| // JUnit expects (expected, actual); the original order produced misleading failure text. |
| Assert.assertEquals(TsFileRepairStatus.NEED_TO_REPAIR, resource.getTsFileRepairStatus()); |
| } |
| |
| long initialFinishedCompactionTaskNum = |
| CompactionTaskManager.getInstance().getFinishedTaskNum(); |
| CompactionScheduleSummary summary = new CompactionScheduleSummary(); |
| CompactionScheduler.scheduleCompaction(tsFileManager, 0, summary); |
| Assert.assertEquals(1, summary.getSubmitSeqInnerSpaceCompactionTaskNum()); |
| Assert.assertEquals(1, summary.getSubmitUnseqInnerSpaceCompactionTaskNum()); |
| |
| // Poll once per second, at most 20 times, until both submitted tasks have finished. |
| int waitSecond = 20; |
| while (CompactionTaskManager.getInstance().getFinishedTaskNum() |
| - initialFinishedCompactionTaskNum |
| < 2) { |
| if (waitSecond == 0) { |
| Assert.fail("Exceed the max time to wait repair compaction"); |
| } |
| // InterruptedException is declared by this method, so it propagates unwrapped. |
| TimeUnit.SECONDS.sleep(1); |
| waitSecond--; |
| } |
| Assert.assertEquals(0, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(2, tsFileManager.getTsFileList(false).size()); |
| for (TsFileResource resource : tsFileManager.getTsFileList(false)) { |
| // The validation result was previously discarded, so this loop verified nothing. |
| Assert.assertTrue(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); |
| } |
| } |
| |
| @Test |
| public void testRepairOverlapBetweenFile() throws IOException { |
| TsFileResource seqResource1 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource1)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(25, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| TsFileResource seqResource2 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource2)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(20, 30), new TimeRange(35, 40)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| seqResources.add(seqResource1); |
| seqResources.add(seqResource2); |
| |
| tsFileManager.addAll(seqResources, true); |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(seqResources)); |
| |
| RepairUnsortedFileCompactionTask task = |
| new RepairUnsortedFileCompactionTask(0, tsFileManager, seqResource2, true, false, 0); |
| Assert.assertTrue(task.start()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileDataCorrectness( |
| tsFileManager.getTsFileList(false).get(0))); |
| Assert.assertTrue( |
| TsFileResourceUtils.validateTsFileResourceCorrectness( |
| tsFileManager.getTsFileList(false).get(0))); |
| } |
| |
| @Test |
| public void testRepairOverlapBetweenFileWithModFile() throws IOException, IllegalPathException { |
| TsFileResource seqResource1 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource1)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(25, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| TsFileResource seqResource2 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource2)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(20, 30), new TimeRange(35, 40)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| ModificationFile modFile = seqResource2.getModFile(); |
| Deletion writedModification = |
| new Deletion(new PartialPath("root.testsg.d1.s1"), Long.MAX_VALUE, 15); |
| modFile.write(writedModification); |
| modFile.close(); |
| |
| seqResources.add(seqResource1); |
| seqResources.add(seqResource2); |
| |
| tsFileManager.addAll(seqResources, true); |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(seqResources)); |
| |
| RepairUnsortedFileCompactionTask task = |
| new RepairUnsortedFileCompactionTask(0, tsFileManager, seqResource2, true, false, 0); |
| Assert.assertTrue(task.start()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); |
| TsFileResource targetResource = tsFileManager.getTsFileList(false).get(0); |
| Assert.assertTrue(TsFileResourceUtils.validateTsFileDataCorrectness(targetResource)); |
| Assert.assertTrue(TsFileResourceUtils.validateTsFileResourceCorrectness(targetResource)); |
| Assert.assertTrue(targetResource.modFileExists()); |
| Assert.assertEquals(1, targetResource.getModFile().getModifications().size()); |
| Deletion modification = |
| (Deletion) targetResource.getModFile().getModifications().iterator().next(); |
| Assert.assertEquals(writedModification.getFileOffset(), modification.getFileOffset()); |
| Assert.assertEquals(writedModification.getEndTime(), modification.getEndTime()); |
| } |
| |
| @Test |
| public void testScheduleRepairInternalUnsortedFile() throws IOException { |
| DataRegion mockDataRegion = Mockito.mock(DataRegion.class); |
| Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager); |
| Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg"); |
| Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0"); |
| Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); |
| |
| TsFileResource seqResource1 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource1)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(15, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| TsFileResource seqResource2 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource2)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(40, 50), new TimeRange(55, 60)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| seqResources.add(seqResource1); |
| seqResources.add(seqResource2); |
| tsFileManager.addAll(seqResources, true); |
| Assert.assertTrue(TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(seqResources)); |
| |
| File tempDir = getEmptyRepairDataLogDir(); |
| |
| CompactionScheduleTaskManager.getRepairTaskManagerInstance().markRepairTaskStart(); |
| UnsortedFileRepairTaskScheduler scheduler = |
| new UnsortedFileRepairTaskScheduler( |
| Collections.singletonList(mockDataRegion), false, tempDir); |
| scheduler.run(); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); |
| } |
| |
| @Test |
| public void testScheduleRepairOverlapFile() throws IOException { |
| DataRegion mockDataRegion = Mockito.mock(DataRegion.class); |
| Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager); |
| Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg"); |
| Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0"); |
| Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); |
| |
| TsFileResource seqResource1 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource1)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(25, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| TsFileResource seqResource2 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource2)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 50), new TimeRange(55, 60)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| seqResources.add(seqResource1); |
| seqResources.add(seqResource2); |
| tsFileManager.addAll(seqResources, true); |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(seqResources)); |
| |
| File tempDir = getEmptyRepairDataLogDir(); |
| |
| CompactionScheduleTaskManager.getRepairTaskManagerInstance().markRepairTaskStart(); |
| UnsortedFileRepairTaskScheduler scheduler = |
| new UnsortedFileRepairTaskScheduler( |
| Collections.singletonList(mockDataRegion), false, tempDir); |
| scheduler.run(); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); |
| } |
| |
| /** |
| * Runs {@link UnsortedFileRepairTaskScheduler} over a mocked data region with three sequence |
| * files: one internally unsorted (ranges [10, 20] then [15, 30]), one holding [10, 50] and |
| * [55, 60], and one holding [40, 80]. |
| * |
| * <p>The overlap check must fail before the run. After the run the manager must hold one |
| * sequence file and two unsequence files. |
| */ |
| @Test |
| public void testScheduleRepairOverlapFileAndInternalUnsortedFile() throws IOException { |
| // Mocked region exposing the test's file manager and a single time partition (0). |
| DataRegion dataRegion = Mockito.mock(DataRegion.class); |
| Mockito.when(dataRegion.getTsFileManager()).thenReturn(tsFileManager); |
| Mockito.when(dataRegion.getDatabaseName()).thenReturn("root.testsg"); |
| Mockito.when(dataRegion.getDataRegionId()).thenReturn("0"); |
| Mockito.when(dataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); |
| |
| TimeRange[][] unsortedRanges = {{new TimeRange(10, 20), new TimeRange(15, 30)}}; |
| TimeRange[][] wideRanges = {{new TimeRange(10, 50), new TimeRange(55, 60)}}; |
| TimeRange[][] tailRanges = {{new TimeRange(40, 80)}}; |
| |
| TsFileResource unsortedFile = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter fileWriter = new CompactionTestFileWriter(unsortedFile)) { |
| fileWriter.startChunkGroup("d1"); |
| fileWriter.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), unsortedRanges, TSEncoding.PLAIN, CompressionType.LZ4); |
| fileWriter.endChunkGroup(); |
| fileWriter.endFile(); |
| } |
| |
| TsFileResource wideFile = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter fileWriter = new CompactionTestFileWriter(wideFile)) { |
| fileWriter.startChunkGroup("d1"); |
| fileWriter.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), wideRanges, TSEncoding.PLAIN, CompressionType.LZ4); |
| fileWriter.endChunkGroup(); |
| fileWriter.endFile(); |
| } |
| |
| TsFileResource tailFile = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter fileWriter = new CompactionTestFileWriter(tailFile)) { |
| fileWriter.startChunkGroup("d1"); |
| fileWriter.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), tailRanges, TSEncoding.PLAIN, CompressionType.LZ4); |
| fileWriter.endChunkGroup(); |
| fileWriter.endFile(); |
| } |
| |
| seqResources.addAll(Arrays.asList(unsortedFile, wideFile, tailFile)); |
| tsFileManager.addAll(seqResources, true); |
| boolean overlapFree = TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(seqResources); |
| Assert.assertFalse(overlapFree); |
| |
| File repairLogDir = getEmptyRepairDataLogDir(); |
| |
| CompactionScheduleTaskManager.getRepairTaskManagerInstance().markRepairTaskStart(); |
| UnsortedFileRepairTaskScheduler repairScheduler = |
| new UnsortedFileRepairTaskScheduler( |
| Collections.singletonList(dataRegion), false, repairLogDir); |
| repairScheduler.run(); |
| |
| Assert.assertEquals(1, tsFileManager.getTsFileList(true).size()); |
| Assert.assertEquals(2, tsFileManager.getTsFileList(false).size()); |
| } |
| |
| @Test |
| public void testRecoverRepairScheduleSkipRepairedTimePartitionAndMarkFile() throws IOException { |
| DataRegion mockDataRegion = Mockito.mock(DataRegion.class); |
| Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager); |
| Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg"); |
| Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0"); |
| Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); |
| |
| TsFileResource seqResource1 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource1)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(15, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| TsFileResource seqResource2 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource2)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 50), new TimeRange(55, 60)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| TsFileResource seqResource3 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource3)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(40, 80)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| seqResource3.setTsFileRepairStatus(TsFileRepairStatus.CAN_NOT_REPAIR); |
| seqResources.add(seqResource1); |
| seqResources.add(seqResource2); |
| seqResources.add(seqResource3); |
| tsFileManager.addAll(seqResources, true); |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(seqResources)); |
| |
| File tempDir = getEmptyRepairDataLogDir(); |
| try (RepairLogger logger = new RepairLogger(tempDir, false)) { |
| logger.recordRepairTaskStartTimeIfLogFileEmpty(System.currentTimeMillis()); |
| RepairTimePartition timePartition = |
| new RepairTimePartition(mockDataRegion, 0, System.currentTimeMillis()); |
| // record seqResource3 as cannot recover |
| logger.recordRepairedTimePartition(timePartition); |
| } |
| // reset the repair status |
| seqResource3.setTsFileRepairStatus(TsFileRepairStatus.NORMAL); |
| |
| CompactionScheduleTaskManager.getRepairTaskManagerInstance().markRepairTaskStart(); |
| UnsortedFileRepairTaskScheduler scheduler = |
| new UnsortedFileRepairTaskScheduler( |
| Collections.singletonList(mockDataRegion), true, tempDir); |
| scheduler.run(); |
| Assert.assertEquals(3, tsFileManager.getTsFileList(true).size()); |
| // check whether the repair status is marked correctly |
| Assert.assertEquals(TsFileRepairStatus.NEED_TO_REPAIR, seqResource3.getTsFileRepairStatus()); |
| } |
| |
| @Test |
| public void testRecoverRepairScheduleSkipRepairedTimePartitionWithDeletedFile() |
| throws IOException { |
| DataRegion mockDataRegion = Mockito.mock(DataRegion.class); |
| Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager); |
| Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg"); |
| Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0"); |
| Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); |
| |
| TsFileResource seqResource1 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource1)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(15, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| |
| TsFileResource seqResource2 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource2)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 50), new TimeRange(55, 60)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| TsFileResource seqResource3 = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource3)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDevice( |
| Arrays.asList("s1", "s2"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(40, 80)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| seqResource3.setTsFileRepairStatus(TsFileRepairStatus.CAN_NOT_REPAIR); |
| seqResources.add(seqResource1); |
| seqResources.add(seqResource2); |
| seqResources.add(seqResource3); |
| tsFileManager.addAll(seqResources, true); |
| Assert.assertFalse(TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(seqResources)); |
| |
| File tempDir = getEmptyRepairDataLogDir(); |
| try (RepairLogger logger = new RepairLogger(tempDir, false)) { |
| logger.recordRepairTaskStartTimeIfLogFileEmpty(System.currentTimeMillis()); |
| RepairTimePartition timePartition = |
| new RepairTimePartition(mockDataRegion, 0, System.currentTimeMillis()); |
| // record seqResource3 as cannot recover |
| logger.recordRepairedTimePartition(timePartition); |
| } |
| // reset the repair status |
| seqResource3.setTsFileRepairStatus(TsFileRepairStatus.NORMAL); |
| // resource3 is deleted |
| tsFileManager.replace( |
| Collections.singletonList(seqResource3), |
| Collections.emptyList(), |
| Collections.emptyList(), |
| 0); |
| |
| CompactionScheduleTaskManager.getRepairTaskManagerInstance().markRepairTaskStart(); |
| UnsortedFileRepairTaskScheduler scheduler = |
| new UnsortedFileRepairTaskScheduler( |
| Collections.singletonList(mockDataRegion), true, tempDir); |
| scheduler.run(); |
| Assert.assertEquals(2, tsFileManager.getTsFileList(true).size()); |
| } |
| |
| @Test |
| public void testTimePartitionFilterFiles() { |
| DataRegion mockDataRegion = Mockito.mock(DataRegion.class); |
| Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager); |
| Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); |
| |
| TsFileResource seqResource1 = createEmptyFileAndResourceWithName("100-1-0-0.tsfile", 0, true); |
| TsFileResource seqResource2 = createEmptyFileAndResourceWithName("200-3-0-0.tsfile", 0, true); |
| TsFileResource seqResource3 = createEmptyFileAndResourceWithName("300-5-0-0.tsfile", 0, true); |
| TsFileResource unseqResource1 = |
| createEmptyFileAndResourceWithName("101-2-0-0.tsfile", 0, false); |
| TsFileResource unseqResource2 = |
| createEmptyFileAndResourceWithName("201-4-0-0.tsfile", 0, false); |
| TsFileResource unseqResource3 = |
| createEmptyFileAndResourceWithName("301-6-0-0.tsfile", 0, false); |
| seqResources.add(seqResource1); |
| seqResources.add(seqResource2); |
| seqResources.add(seqResource3); |
| unseqResources.add(unseqResource1); |
| unseqResources.add(unseqResource2); |
| unseqResources.add(unseqResource3); |
| tsFileManager.addAll(seqResources, true); |
| tsFileManager.addAll(unseqResources, false); |
| |
| RepairTimePartition timePartition = new RepairTimePartition(mockDataRegion, 0, 250); |
| Assert.assertEquals(4, timePartition.getAllFileSnapshot().size()); |
| Assert.assertEquals(2, timePartition.getSeqFileSnapshot().size()); |
| Assert.assertEquals(2, timePartition.getUnSeqFileSnapshot().size()); |
| } |
| |
| @Test |
| public void testEstimateRepairCompactionMemory() throws IOException { |
| TsFileResource resource = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleNonAlignedSeriesToCurrentDevice( |
| "s1", |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(5, 30)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| RepairUnsortedFileCompactionTask task = |
| new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, true, 0); |
| Assert.assertTrue(task.getEstimatedMemoryCost() > 0); |
| task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, false, 0); |
| Assert.assertEquals(0, task.getEstimatedMemoryCost()); |
| } |
| |
| @Test |
| public void testMergeAlignedSeriesPointWithSameTimestamp() throws IOException { |
| TsFileResource resource = createEmptyFileAndResource(true); |
| try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { |
| writer.startChunkGroup("d1"); |
| writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( |
| Arrays.asList("s1", "s2", "s3"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4, |
| Arrays.asList(true, false, false)); |
| writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( |
| Arrays.asList("s1", "s2", "s3"), |
| new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20)}}, |
| TSEncoding.PLAIN, |
| CompressionType.LZ4, |
| Arrays.asList(false, true, true)); |
| writer.endChunkGroup(); |
| writer.endFile(); |
| } |
| RepairUnsortedFileCompactionTask task = |
| new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, true, 0); |
| Assert.assertTrue(task.start()); |
| TsFileResource target = tsFileManager.getTsFileList(false).get(0); |
| try (TsFileSequenceReader reader = new TsFileSequenceReader(target.getTsFilePath())) { |
| List<AlignedChunkMetadata> chunkMetadataList = |
| reader.getAlignedChunkMetadata(new PlainDeviceID("root.testsg.d1")); |
| for (AlignedChunkMetadata alignedChunkMetadata : chunkMetadataList) { |
| ChunkMetadata timeChunkMetadata = |
| (ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata(); |
| Chunk timeChunk = reader.readMemChunk(timeChunkMetadata); |
| List<Chunk> valueChunks = new ArrayList<>(); |
| for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { |
| Chunk valueChunk = reader.readMemChunk((ChunkMetadata) chunkMetadata); |
| valueChunks.add(valueChunk); |
| } |
| AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks, null); |
| while (chunkReader.hasNextSatisfiedPage()) { |
| BatchData batchData = chunkReader.nextPageData(); |
| IPointReader pointReader = batchData.getBatchDataIterator(); |
| while (pointReader.hasNextTimeValuePair()) { |
| TimeValuePair timeValuePair = pointReader.nextTimeValuePair(); |
| for (Object value : timeValuePair.getValues()) { |
| if (value == null) { |
| Assert.fail(); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| } |