blob: d9d92034889547a9d7f8b73a35f7c2493aad212b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.cross;
import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.IDataBlockReader;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.SeriesDataBlockReader;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.iotdb.commons.conf.IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
import static org.junit.Assert.assertEquals;
public class RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCompactionTest {
private final String oldThreadName = Thread.currentThread().getName();
@Before
public void setUp()
throws IOException, WriteProcessException, MetadataException, InterruptedException {
super.setUp();
WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(1));
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1024);
Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
}
@After
public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
Thread.currentThread().setName(oldThreadName);
FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
}
/**
* Total 4 seq files and 5 unseq files, each file has different aligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d0.s0, d0.s1, d2.s4 and d3.s4 is deleted in each file.
*/
@Test
public void testAlignedCrossSpaceCompactionWithAllDataDeletedInTimeseries() throws Exception {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s0");
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s1");
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)
+ PATH_SEPARATOR
+ "s4");
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)
+ PATH_SEPARATOR
+ "s4");
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE, false);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE, false);
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE, true);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE, true);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsFilesReader =
new SeriesDataBlockReader(
path,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
TsBlock batchData = tsFilesReader.nextBatch();
for (int readIndex = 0, size = batchData.getPositionCount();
readIndex < size;
readIndex++) {
long currentTime = batchData.getTimeByIndex(readIndex);
long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= currentTime && currentTime < 550)
|| (550 <= currentTime && currentTime < 650))) {
assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= currentTime && currentTime < 220)
|| (250 <= currentTime && currentTime < 450)
|| (480 <= currentTime && currentTime < 680))) {
assertEquals(currentTime + 10000, currentValue);
} else {
assertEquals(currentTime, currentValue);
}
count++;
}
}
tsFilesReader.close();
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j == 4)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
assertEquals(800, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
TsFileManager tsFileManager =
new TsFileManager(COMPACTION_TEST_SG, "0", STORAGE_GROUP_DIR.getPath());
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
tsFileManager,
seqResources,
unseqResources,
new FastCompactionPerformer(true),
0,
0);
task.start();
for (TsFileResource resource : seqResources) {
resource.resetModFile();
Assert.assertFalse(resource.getModFile().exists());
}
for (TsFileResource resource : unseqResources) {
resource.resetModFile();
Assert.assertFalse(resource.getModFile().exists());
}
for (TsFileResource resource : targetResources) {
resource.setFile(
new File(
resource
.getTsFilePath()
.replace(CROSS_COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX)));
resource.resetModFile();
Assert.assertTrue(resource.getModFile().exists());
Assert.assertEquals(4, resource.getModFile().getModifications().size());
}
FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsFilesReader =
new SeriesDataBlockReader(
path,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
tsFileManager.getTsFileList(true),
new ArrayList<>(),
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
TsBlock batchData = tsFilesReader.nextBatch();
for (int readIndex = 0, size = batchData.getPositionCount();
readIndex < size;
readIndex++) {
long currentTime = batchData.getTimeByIndex(readIndex);
long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= currentTime && currentTime < 550)
|| (550 <= currentTime && currentTime < 650))) {
assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= currentTime && currentTime < 220)
|| (250 <= currentTime && currentTime < 450)
|| (480 <= currentTime && currentTime < 680))) {
assertEquals(currentTime + 10000, currentValue);
} else {
assertEquals(currentTime, currentValue);
}
count++;
}
}
tsFilesReader.close();
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j == 4)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
assertEquals(800, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/**
* Total 4 seq files and 5 unseq files, each file has different aligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d0, d1 and d2 is deleted in each file. The first target file is empty.
*/
@Test
public void testAlignedCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() throws Exception {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
for (int i = 0; i < 5; i++) {
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s"
+ i);
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
+ PATH_SEPARATOR
+ "s"
+ i);
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)
+ PATH_SEPARATOR
+ "s"
+ i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2" + PATH_SEPARATOR + "s" + i);
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE, false);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE, false);
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE, true);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE, true);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsFilesReader =
new SeriesDataBlockReader(
path,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
TsBlock batchData = tsFilesReader.nextBatch();
for (int readIndex = 0, size = batchData.getPositionCount();
readIndex < size;
readIndex++) {
long currentTime = batchData.getTimeByIndex(readIndex);
long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= currentTime && currentTime < 550)
|| (550 <= currentTime && currentTime < 650))) {
assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= currentTime && currentTime < 220)
|| (250 <= currentTime && currentTime < 450)
|| (480 <= currentTime && currentTime < 680))) {
assertEquals(currentTime + 10000, currentValue);
} else {
assertEquals(currentTime, currentValue);
}
count++;
}
}
tsFilesReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
}
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset())
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
TsFileManager tsFileManager =
new TsFileManager(COMPACTION_TEST_SG, "0", STORAGE_GROUP_DIR.getPath());
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
for (TsFileResource resource : seqResources) {
Assert.assertTrue(resource.getModFile().exists());
}
for (TsFileResource resource : unseqResources) {
Assert.assertTrue(resource.getModFile().exists());
}
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
tsFileManager,
seqResources,
unseqResources,
new FastCompactionPerformer(true),
0,
0);
task.start();
for (TsFileResource resource : targetResources) {
resource.setFile(
new File(
resource
.getTsFilePath()
.replace(CROSS_COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX)));
if (!resource.getTsFile().exists()) {
continue;
}
Assert.assertTrue(resource.getModFile().exists());
Assert.assertEquals(30, resource.getModFile().getModifications().size());
}
FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsFilesReader =
new SeriesDataBlockReader(
path,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
tsFileManager.getTsFileList(true),
new ArrayList<>(),
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
TsBlock batchData = tsFilesReader.nextBatch();
for (int readIndex = 0, size = batchData.getPositionCount();
readIndex < size;
readIndex++) {
long currentTime = batchData.getTimeByIndex(readIndex);
long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= currentTime && currentTime < 550)
|| (550 <= currentTime && currentTime < 650))) {
assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= currentTime && currentTime < 220)
|| (250 <= currentTime && currentTime < 450)
|| (480 <= currentTime && currentTime < 680))) {
assertEquals(currentTime + 10000, currentValue);
} else {
assertEquals(currentTime, currentValue);
}
count++;
}
}
tsFilesReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
}
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset())
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/**
* Total 4 seq files and 5 unseq files, each file has different aligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d3.s0 is deleted. Test when there is a deletion to the file before compaction,
* then comes to a deletion during compaction.
*/
@Test
public void testOneDeletionDuringCompaction() throws Exception {
DataRegion vsgp =
new DataRegion(
STORAGE_GROUP_DIR.getPath(),
"0",
new TsFileFlushPolicy.DirectFlushPolicy(),
COMPACTION_TEST_SG);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
vsgp.getTsFileResourceManager().addAll(seqResources, true);
vsgp.getTsFileResourceManager().addAll(unseqResources, false);
vsgp.deleteByDevice(
new PartialPath(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)
+ PATH_SEPARATOR
+ "s0"),
0,
1000,
0);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
vsgp.getTsFileResourceManager(),
seqResources,
unseqResources,
new FastCompactionPerformer(true),
0,
0);
task.setSourceFilesToCompactionCandidate();
seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
// delete data in source file during compaction
vsgp.deleteByDevice(
new PartialPath(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)
+ PATH_SEPARATOR
+ "s0"),
0,
1200,
0);
for (int i = 0; i < seqResources.size(); i++) {
TsFileResource resource = seqResources.get(i);
resource.resetModFile();
if (i < 2) {
Assert.assertFalse(resource.getCompactionModFile().exists());
Assert.assertFalse(resource.getModFile().exists());
} else if (i == 2) {
Assert.assertTrue(resource.getCompactionModFile().exists());
Assert.assertTrue(resource.getModFile().exists());
Assert.assertEquals(2, resource.getModFile().getModifications().size());
Assert.assertEquals(1, resource.getCompactionModFile().getModifications().size());
} else {
Assert.assertTrue(resource.getCompactionModFile().exists());
Assert.assertTrue(resource.getModFile().exists());
Assert.assertEquals(1, resource.getModFile().getModifications().size());
Assert.assertEquals(1, resource.getCompactionModFile().getModifications().size());
}
}
for (TsFileResource resource : unseqResources) {
resource.resetModFile();
Assert.assertFalse(resource.getCompactionModFile().exists());
Assert.assertFalse(resource.getModFile().exists());
}
task.start();
for (TsFileResource resource : seqResources) {
Assert.assertFalse(resource.getTsFile().exists());
Assert.assertFalse(resource.getModFile().exists());
Assert.assertFalse(resource.getCompactionModFile().exists());
}
for (TsFileResource resource : unseqResources) {
Assert.assertFalse(resource.getTsFile().exists());
Assert.assertFalse(resource.getModFile().exists());
Assert.assertFalse(resource.getCompactionModFile().exists());
}
for (int i = 0; i < seqResources.size(); i++) {
TsFileResource seqResource = seqResources.get(i);
TsFileResource resource =
new TsFileResource(
TsFileNameGenerator.increaseCrossCompactionCnt(seqResource.getTsFile()));
if (i < 2) {
Assert.assertFalse(resource.getCompactionModFile().exists());
Assert.assertFalse(resource.getModFile().exists());
} else {
Assert.assertFalse(resource.getCompactionModFile().exists());
Assert.assertTrue(resource.getModFile().exists());
Assert.assertEquals(1, resource.getModFile().getModifications().size());
}
}
}
/**
* Total 4 seq files and 5 unseq files, each file has different aligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d3.s0 is deleted. Test when there is a deletion to the file before compaction,
* then comes to serveral deletions during compaction.
*/
@Test
public void testSeveralDeletionsDuringCompaction() throws Exception {
DataRegion vsgp =
new DataRegion(
STORAGE_GROUP_DIR.getPath(),
"0",
new TsFileFlushPolicy.DirectFlushPolicy(),
COMPACTION_TEST_SG);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
vsgp.getTsFileResourceManager().addAll(seqResources, true);
vsgp.getTsFileResourceManager().addAll(unseqResources, false);
vsgp.deleteByDevice(
new PartialPath(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)
+ PATH_SEPARATOR
+ "s0"),
0,
1000,
0);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
vsgp.getTsFileResourceManager(),
seqResources,
unseqResources,
new FastCompactionPerformer(true),
0,
0);
task.setSourceFilesToCompactionCandidate();
seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
// delete data in source file during compaction
vsgp.deleteByDevice(
new PartialPath(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)
+ PATH_SEPARATOR
+ "s0"),
0,
1200,
0);
vsgp.deleteByDevice(
new PartialPath(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)
+ PATH_SEPARATOR
+ "s0"),
0,
1800,
0);
for (int i = 0; i < seqResources.size(); i++) {
TsFileResource resource = seqResources.get(i);
resource.resetModFile();
if (i < 2) {
Assert.assertFalse(resource.getCompactionModFile().exists());
Assert.assertFalse(resource.getModFile().exists());
} else if (i == 2) {
Assert.assertTrue(resource.getCompactionModFile().exists());
Assert.assertTrue(resource.getModFile().exists());
Assert.assertEquals(3, resource.getModFile().getModifications().size());
Assert.assertEquals(2, resource.getCompactionModFile().getModifications().size());
} else {
Assert.assertTrue(resource.getCompactionModFile().exists());
Assert.assertTrue(resource.getModFile().exists());
Assert.assertEquals(2, resource.getModFile().getModifications().size());
Assert.assertEquals(2, resource.getCompactionModFile().getModifications().size());
}
}
for (TsFileResource resource : unseqResources) {
resource.resetModFile();
Assert.assertFalse(resource.getCompactionModFile().exists());
Assert.assertFalse(resource.getModFile().exists());
}
task.start();
for (TsFileResource resource : seqResources) {
Assert.assertFalse(resource.getTsFile().exists());
Assert.assertFalse(resource.getModFile().exists());
Assert.assertFalse(resource.getCompactionModFile().exists());
}
for (TsFileResource resource : unseqResources) {
Assert.assertFalse(resource.getTsFile().exists());
Assert.assertFalse(resource.getModFile().exists());
Assert.assertFalse(resource.getCompactionModFile().exists());
}
for (int i = 0; i < seqResources.size(); i++) {
TsFileResource seqResource = seqResources.get(i);
TsFileResource resource =
new TsFileResource(
TsFileNameGenerator.increaseCrossCompactionCnt(seqResource.getTsFile()));
if (i < 2) {
Assert.assertFalse(resource.getCompactionModFile().exists());
Assert.assertFalse(resource.getModFile().exists());
} else {
Assert.assertFalse(resource.getCompactionModFile().exists());
Assert.assertTrue(resource.getModFile().exists());
Assert.assertEquals(2, resource.getModFile().getModifications().size());
}
}
}
private void generateModsFile(
List<String> seriesPaths,
List<TsFileResource> resources,
long startValue,
long endValue,
boolean isCompactionMods)
throws IllegalPathException, IOException {
for (TsFileResource resource : resources) {
Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
for (String path : seriesPaths) {
deleteMap.put(path, new Pair<>(startValue, endValue));
}
CompactionFileGeneratorUtils.generateMods(deleteMap, resource, isCompactionMods);
}
}
}