/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.compaction.inner;

import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.TimeRange;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
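
/**
 * Tests {@link ReadChunkCompactionPerformer} on aligned series. The cases cover the three ways a
 * compaction can proceed (flushing whole chunks, flushing pages, and rewriting individual points)
 * as well as null, deleted, and missing value columns.
 */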
public class NewReadChunkCompactionPerformerWithAlignedSeriesTest extends AbstractCompactionTest {
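  // Original configuration values, saved in setUp() and restored in tearDown().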
long originTargetChunkSize;
long originTargetChunkPointNum;
int originTargetPageSize;
int originTargetPagePointNum;

  @Before
public void setUp()
throws IOException, WriteProcessException, MetadataException, InterruptedException {
super.setUp();
originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
originTargetChunkPointNum = IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
originTargetPageSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
originTargetPagePointNum =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1048576);
IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(100000);
TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10000);
}

  @After
public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originTargetPageSize);
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(originTargetPagePointNum);
}
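
  /**
   * Two seq files, each holding two large chunks of the aligned device d0 (time plus s0, s1, s2).
   * All 16 source chunks meet the target chunk size, so every chunk is flushed to the target file
   * directly and nothing is deserialized.
   */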
@Test
public void testSimpleCompactionByFlushChunk()
throws IOException, StorageEngineException, InterruptedException, MetadataException,
PageException {
TsFileResource seqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(100000, 200000), new TimeRange(300000, 500000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource1);
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(600000, 700000), new TimeRange(800000, 900000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource2);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
Assert.assertEquals(16, summary.getDirectlyFlushChunkNum());
Assert.assertEquals(0, summary.getDeserializeChunkCount());
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
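
  /**
   * Like {@link #testSimpleCompactionByFlushChunk()}, but s2 is entirely null in the first file,
   * so two fewer chunks take the direct flush path (14 instead of 16).
   */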
@Test
public void testSimpleCompactionWithNullColumnByFlushChunk()
throws IOException, StorageEngineException, InterruptedException, MetadataException,
PageException {
TsFileResource seqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(100000, 200000), new TimeRange(300000, 500000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, true),
true);
seqResources.add(seqResource1);
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(600000, 700000), new TimeRange(800000, 900000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource2);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
Assert.assertEquals(14, summary.getDirectlyFlushChunkNum());
Assert.assertEquals(0, summary.getDeserializeChunkCount());
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
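
  /**
   * A deletion spanning all timestamps removes the s2 column of the first file completely, so its
   * two chunks are dropped and only 14 chunks are flushed directly.
   */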
@Test
public void testSimpleCompactionWithAllDeletedColumnByFlushChunk()
throws IOException, StorageEngineException, InterruptedException, MetadataException,
PageException {
TsFileResource seqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(100000, 200000), new TimeRange(300000, 500000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource1);
seqResource1
.getModFile()
.write(
new Deletion(new PartialPath("root.testsg.d0", "s2"), Long.MAX_VALUE, Long.MAX_VALUE));
seqResource1.getModFile().close();
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(600000, 700000), new TimeRange(800000, 900000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource2);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
Assert.assertEquals(14, summary.getDirectlyFlushChunkNum());
Assert.assertEquals(0, summary.getDeserializeChunkCount());
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
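
  /**
   * The first file lacks the s2 column entirely (its mod file even carries a deletion for it);
   * the 14 existing chunks are flushed directly.
   */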
@Test
public void testSimpleCompactionWithNotExistColumnByFlushChunk()
throws IOException, StorageEngineException, InterruptedException, MetadataException,
PageException {
TsFileResource seqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1"),
new TimeRange[] {new TimeRange(100000, 200000), new TimeRange(300000, 500000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource1);
seqResource1
.getModFile()
.write(
new Deletion(new PartialPath("root.testsg.d0", "s2"), Long.MAX_VALUE, Long.MAX_VALUE));
seqResource1.getModFile().close();
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(600000, 700000), new TimeRange(800000, 900000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource2);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
Assert.assertEquals(14, summary.getDirectlyFlushChunkNum());
Assert.assertEquals(0, summary.getDeserializeChunkCount());
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
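
  /**
   * The s2 column is null in the first chunk of the first file and absent from the second file;
   * this case asserts no flush counters, only data correctness and read equality.
   */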
@Test
public void testSimpleCompactionWithNullColumn()
throws IOException, StorageEngineException, InterruptedException, MetadataException,
PageException {
TsFileResource seqResource1 = createEmptyFileAndResource(true);
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource1)) {
writer.startChunkGroup("d0");
writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(100000, 200000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, true));
writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(300000, 500000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false));
writer.endChunkGroup();
writer.endFile();
}
seqResources.add(seqResource1);
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1"),
new TimeRange[] {new TimeRange(600000, 700000), new TimeRange(800000, 900000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false),
true);
seqResources.add(seqResource2);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
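
  /**
   * A deletion up to timestamp 250000 fully covers the first chunk of s2 in the first file but
   * not its second chunk (300000-500000), so 15 of the 16 chunks are still flushed directly.
   */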
@Test
public void testSimpleCompactionWithPartialDeletedColumnByFlushChunk()
throws IOException, StorageEngineException, InterruptedException, MetadataException,
PageException {
TsFileResource seqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(100000, 200000), new TimeRange(300000, 500000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource1);
seqResource1
.getModFile()
.write(new Deletion(new PartialPath("root.testsg.d0", "s2"), Long.MAX_VALUE, 250000));
seqResource1.getModFile().close();
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(600000, 700000), new TimeRange(800000, 900000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource2);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
Assert.assertEquals(15, summary.getDirectlyFlushChunkNum());
Assert.assertEquals(0, summary.getDeserializeChunkCount());
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
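
  /**
   * Each source chunk falls below the target chunk point number, so all 8 chunks are
   * deserialized; a deletion up to timestamp 25000 wipes out the first page of s2 in the first
   * file, and the 15 surviving pages are flushed without being rewritten point by point.
   */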
@Test
public void testSimpleCompactionWithAllDeletedPageByFlushPage() throws Exception {
TsFileResource seqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[][] {
new TimeRange[] {new TimeRange(10000, 20000), new TimeRange(30000, 50000)}
},
TSEncoding.RLE,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource1);
seqResource1
.getModFile()
.write(new Deletion(new PartialPath("root.testsg.d0", "s2"), Long.MAX_VALUE, 25000));
seqResource1.getModFile().close();
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[][] {
new TimeRange[] {new TimeRange(60000, 70000), new TimeRange(80000, 90000)}
},
TSEncoding.RLE,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource2);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
Assert.assertEquals(0, summary.getDirectlyFlushChunkNum());
Assert.assertEquals(8, summary.getDeserializeChunkCount());
Assert.assertEquals(15, summary.getDirectlyFlushPageCount());
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
    // CompactionCheckerUtils.readFiles may throw an NPE when a file contains an empty page, so
    // the data equality check is skipped for this case.
    // Assert.assertEquals(
    //     CompactionCheckerUtils.readFiles(seqResources),
    //     CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
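
  /**
   * A deletion up to timestamp 15000 only partially overlaps the first page of s2 (10000-20000),
   * so at least that page must be deserialized and its surviving points rewritten.
   */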
@Test
public void testSimpleCompactionWithPartialDeletedPageByWritePoint()
throws IOException, MetadataException, StorageEngineException, InterruptedException,
PageException {
TsFileResource seqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[][] {
new TimeRange[] {new TimeRange(10000, 20000), new TimeRange(30000, 50000)}
},
TSEncoding.RLE,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource1);
seqResource1
.getModFile()
.write(new Deletion(new PartialPath("root.testsg.d0", "s2"), Long.MAX_VALUE, 15000));
seqResource1.getModFile().close();
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[][] {
new TimeRange[] {new TimeRange(60000, 70000), new TimeRange(80000, 90000)}
},
TSEncoding.RLE,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource2);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
Assert.assertTrue(summary.getDeserializePageCount() > 0);
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
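
  /**
   * Chunks of roughly 10000 points fall below the target chunk point number, so all 16 chunks
   * are deserialized and their 16 pages are flushed to the target file directly.
   */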
@Test
public void testSimpleCompactionByFlushPage()
throws IOException, StorageEngineException, InterruptedException, MetadataException,
PageException {
TsFileResource seqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(10000, 20000), new TimeRange(30000, 40000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource1);
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(60000, 70000), new TimeRange(80000, 90000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource2);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
Assert.assertEquals(16, summary.getDeserializeChunkCount());
Assert.assertEquals(16, summary.getDirectlyFlushPageCount());
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
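
  /**
   * With chunks of only about 1000 points, both the chunk and page thresholds are missed: all 16
   * chunks and their 16 pages are deserialized and the data is rewritten point by point.
   */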
@Test
public void testSimpleCompactionByWritePoint()
throws IOException, StorageEngineException, InterruptedException, MetadataException,
PageException {
TsFileResource seqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(1000, 2000), new TimeRange(3000, 4000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource1);
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(6000, 7000), new TimeRange(8000, 9000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource2);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
Assert.assertEquals(16, summary.getDeserializeChunkCount());
Assert.assertEquals(16, summary.getDeserializePageCount());
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
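
  /**
   * Three source files mix LZ4 and SNAPPY compression. 16 of the 24 source chunks are still
   * flushed directly and no pages are flushed as-is, so the remaining chunks are presumably
   * rewritten point by point.
   */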
@Test
public void testCompactionWithDifferentCompressionTypeOrEncoding()
throws IOException, StorageEngineException, InterruptedException, MetadataException,
PageException {
TsFileResource seqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(100000, 200000), new TimeRange(300000, 500000)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource1);
TsFileResource seqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(600000, 700000), new TimeRange(800000, 900000)},
TSEncoding.PLAIN,
CompressionType.SNAPPY,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource2);
TsFileResource seqResource3 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(1600000, 1700000), new TimeRange(1800000, 1900000)},
TSEncoding.PLAIN,
CompressionType.SNAPPY,
Arrays.asList(false, false, false),
true);
seqResources.add(seqResource3);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
Assert.assertEquals(16, summary.getDirectlyFlushChunkNum());
Assert.assertEquals(0, summary.getDirectlyFlushPageCount());
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
}
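
  /**
   * Compacts 1000 devices so that the target file writer has to spill chunk metadata to a
   * temporary file, then verifies that every device is present in the target's device time index.
   */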
@Test
public void testFlushChunkMetadataToTempFile()
throws IOException, StorageEngineException, InterruptedException, MetadataException,
PageException {
List<String> devices = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
devices.add("d" + i);
}
TsFileResource seqResource1 = createEmptyFileAndResource(true);
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource1)) {
for (String device : devices) {
writer.startChunkGroup(device);
writer.generateSimpleAlignedSeriesToCurrentDevice(
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(1000, 2000), new TimeRange(3000, 5000)},
TSEncoding.PLAIN,
CompressionType.LZ4);
writer.endChunkGroup();
}
writer.endFile();
}
seqResources.add(seqResource1);
TsFileResource seqResource2 = createEmptyFileAndResource(true);
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource2)) {
for (String device : devices) {
writer.startChunkGroup(device);
writer.generateSimpleAlignedSeriesToCurrentDevice(
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(6000, 7000), new TimeRange(8000, 9000)},
TSEncoding.PLAIN,
CompressionType.LZ4);
writer.endChunkGroup();
}
writer.endFile();
}
seqResources.add(seqResource2);
TsFileResource seqResource3 = createEmptyFileAndResource(true);
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(seqResource3)) {
for (String device : devices) {
writer.startChunkGroup(device);
writer.generateSimpleAlignedSeriesToCurrentDevice(
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(16000, 17000), new TimeRange(18000, 19000)},
TSEncoding.PLAIN,
CompressionType.LZ4);
writer.endChunkGroup();
}
writer.endFile();
}
seqResources.add(seqResource3);
tsFileManager.addAll(seqResources, true);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true);
ReadChunkCompactionPerformer performer = new ReadChunkCompactionPerformer();
CompactionTaskSummary summary = new CompactionTaskSummary();
performer.setSummary(summary);
performer.setSourceFiles(seqResources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(
CompactionCheckerUtils.readFiles(seqResources),
CompactionCheckerUtils.readFiles(Collections.singletonList(targetResource)));
Assert.assertEquals(devices.size(), targetResource.buildDeviceTimeIndex().getDevices().size());
}
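
  /**
   * Writes one file containing a single aligned device; each entry of {@code chunkTimeRanges}
   * becomes one chunk, and {@code nullValues} marks the columns to be written entirely as null.
   */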
private TsFileResource generateSingleAlignedSeriesFile(
String device,
List<String> measurement,
TimeRange[] chunkTimeRanges,
TSEncoding encoding,
CompressionType compressionType,
List<Boolean> nullValues,
boolean isSeq)
throws IOException {
    TsFileResource resource = createEmptyFileAndResource(isSeq);
    try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
      writer.startChunkGroup(device);
      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
          measurement, chunkTimeRanges, encoding, compressionType, nullValues);
      writer.endChunkGroup();
      writer.endFile();
    }
    return resource;
}
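
  /**
   * Variant taking {@code TimeRange[][]}: the outer dimension enumerates chunks, the inner one
   * the pages inside each chunk.
   */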
private TsFileResource generateSingleAlignedSeriesFile(
String device,
List<String> measurement,
TimeRange[][] chunkTimeRanges,
TSEncoding encoding,
CompressionType compressionType,
List<Boolean> nullValues,
boolean isSeq)
throws IOException {
    TsFileResource resource = createEmptyFileAndResource(isSeq);
    try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
      writer.startChunkGroup(device);
      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
          measurement, chunkTimeRanges, encoding, compressionType, nullValues);
      writer.endChunkGroup();
      writer.endFile();
    }
    return resource;
}
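
  /**
   * Variant taking {@code TimeRange[][][]}: chunks, then pages, then the time ranges written
   * inside each page.
   */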
private TsFileResource generateSingleAlignedSeriesFile(
String device,
List<String> measurement,
TimeRange[][][] chunkTimeRanges,
TSEncoding encoding,
CompressionType compressionType,
List<Boolean> nullValues,
boolean isSeq)
throws IOException {
    TsFileResource resource = createEmptyFileAndResource(isSeq);
    try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
      writer.startChunkGroup(device);
      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
          measurement, chunkTimeRanges, encoding, compressionType, nullValues);
      writer.endChunkGroup();
      writer.endFile();
    }
    return resource;
}
}