blob: 5a7fe53dcf38b1379f0703558c06556ca12a733a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.cross;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.RewriteCrossSpaceCompactionSelector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossCompactionTaskResource;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossSpaceCompactionCandidate;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionClearUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionConfigRestorer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTimeseriesType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.fail;
public class CrossSpaceCompactionWithFastPerformerTest {
/** Name of the thread running the test, captured at construction so tearDown can restore it. */
private final String oldThreadName = Thread.currentThread().getName();

/** Counter incremented once per scenario in testOneSeqFileAndSixUnseqFile. */
int index = 0;

/** Storage group under which all generated test files are created. */
static final String COMPACTION_TEST_SG = "root.compactionTest";

/** The fifteen series used by the tests: device0.sensor0-9 followed by device1.sensor0-4. */
static final String[] fullPaths = {
  COMPACTION_TEST_SG + ".device0.sensor0",
  COMPACTION_TEST_SG + ".device0.sensor1",
  COMPACTION_TEST_SG + ".device0.sensor2",
  COMPACTION_TEST_SG + ".device0.sensor3",
  COMPACTION_TEST_SG + ".device0.sensor4",
  COMPACTION_TEST_SG + ".device0.sensor5",
  COMPACTION_TEST_SG + ".device0.sensor6",
  COMPACTION_TEST_SG + ".device0.sensor7",
  COMPACTION_TEST_SG + ".device0.sensor8",
  COMPACTION_TEST_SG + ".device0.sensor9",
  COMPACTION_TEST_SG + ".device1.sensor0",
  COMPACTION_TEST_SG + ".device1.sensor1",
  COMPACTION_TEST_SG + ".device1.sensor2",
  COMPACTION_TEST_SG + ".device1.sensor3",
  COMPACTION_TEST_SG + ".device1.sensor4",
};

/** Series layouts every test iterates over: all, part, or none of the series shared by files. */
static final CompactionTimeseriesType[] compactionTimeseriesTypes = {
  CompactionTimeseriesType.ALL_SAME,
  CompactionTimeseriesType.PART_SAME,
  CompactionTimeseriesType.NO_SAME
};

/** Whether deletions are written to the source files before the expected data is read. */
static final boolean[] compactionBeforeHasMods = {true, false};

/** Whether additional deletions are written after the expected data has been read. */
static final boolean[] compactionHasMods = {true, false};
/**
 * Prepares the environment for each test: sets the minimum unseq file level for cross
 * compaction to 0, starts the compaction task manager and renames the current thread.
 */
@Before
public void setUp() throws MetadataException {
  IoTDBDescriptor.getInstance().getConfig().setMinCrossCompactionUnseqFileLevel(0);

  CompactionTaskManager taskManager = CompactionTaskManager.getInstance();
  taskManager.start();

  // NOTE(review): presumably the compaction code inspects the thread name, so the test thread
  // poses as a compaction worker - confirm. The original name is restored in tearDown.
  Thread testThread = Thread.currentThread();
  testThread.setName("pool-1-IoTDB-Compaction-Worker-1");
}
/**
 * Cleans up after each test: removes compaction files and empty directories under "target",
 * clears the chunk / timeseries-metadata / bloom-filter caches, stops the compaction task
 * manager and wipes the test directories.
 *
 * <p>The thread name and the compaction configuration changed in setUp are restored in a
 * {@code finally} block, so a failing cleanup step cannot leak them into later tests.
 *
 * @throws IOException declared for the cleanup utilities
 * @throws StorageEngineException declared for the cleanup utilities
 */
@After
public void tearDown() throws IOException, StorageEngineException {
  try {
    CompactionClearUtils.clearAllCompactionFiles();
    CompactionClearUtils.deleteEmptyDir(new File("target"));
    ChunkCache.getInstance().clear();
    TimeSeriesMetadataCache.getInstance().clear();
    BloomFilterCache.getInstance().clear();
    CompactionTaskManager.getInstance().stop();
    EnvironmentUtils.cleanAllDir();
  } finally {
    // Always undo what setUp changed, even when one of the cleanup steps above throws.
    Thread.currentThread().setName(oldThreadName);
    new CompactionConfigRestorer().restoreCompactionConfig();
  }
}
// Test one seq file overlapping with six unseq files, covering six types of relation (Contains,
// In, Prefix-Overlap, Suffix-Overlap, Prefix-Non-Overlap, Suffix-Non-Overlap). Some timestamps
// of timeseries in the unseq files may be later than those in the seq file.
/**
 * Runs a cross-space compaction of one seq file and six unseq files for every combination of
 * series layout, "mods before compaction" and "mods after reading the expected data", and
 * verifies the target file with {@code CompactionCheckerUtils.checkDataAndResource}.
 *
 * <p>Unseq file {@code i} (1-based) is written with version {@code i}; when pre-compaction mods
 * are enabled, it receives one deletion on {@code fullPaths[i]}. The scenario fails if the
 * selector returns no task.
 *
 * @throws Exception if file generation, compaction or verification fails
 */
@Test
public void testOneSeqFileAndSixUnseqFile() throws Exception {
  // One row per unseq file: {page points, file start time, deletion start, deletion end}.
  final long[][] unseqSpecs = {
    {100L, 3000L, 3000L, 3100L}, // Contains
    {2000L, 1000L, 1000L, 1100L}, // In
    {1000L, 1000L, 1100L, 1200L}, // Prefix-Overlap
    {1000L, 3000L, 3100L, 3200L}, // Suffix-Overlap
    {100L, 0L, 0L, 100L}, // Prefix-Non-Overlap
    {100L, 5000L, 5000L, 5100L}, // Suffix-Non-Overlap
  };
  for (CompactionTimeseriesType compactionTimeseriesType : compactionTimeseriesTypes) {
    for (boolean compactionBeforeHasMod : compactionBeforeHasMods) {
      for (boolean compactionHasMod : compactionHasMods) {
        // generate seq file
        List<TsFileResource> seqResources = new ArrayList<>();
        TsFileResource seqFile =
            writeSourceFile(
                true, 1, selectSeriesPaths(compactionTimeseriesType, 0, true), 1000L, 2000L);
        if (compactionBeforeHasMod) {
          deleteTimeRange(seqFile, fullPaths[0], 2000L, 2500L, false);
        }
        seqResources.add(seqFile);

        // generate the six unseq files
        List<TsFileResource> unseqResources = new ArrayList<>();
        for (int i = 1; i <= unseqSpecs.length; i++) {
          long[] spec = unseqSpecs[i - 1];
          TsFileResource unseqFile =
              writeSourceFile(
                  false,
                  i,
                  selectSeriesPaths(compactionTimeseriesType, i, false),
                  spec[0],
                  spec[1]);
          if (compactionBeforeHasMod) {
            // Each file gets its own deletion on its own first series, so the last
            // (Suffix-Non-Overlap) file is covered as well.
            deleteTimeRange(unseqFile, fullPaths[i], spec[2], spec[3], false);
          }
          unseqResources.add(unseqFile);
        }

        // expected data is read after the pre-compaction mods have been written
        List<TsFileResource> sourceResources = new ArrayList<>(seqResources);
        sourceResources.addAll(unseqResources);
        Map<String, List<TimeValuePair>> sourceData =
            CompactionCheckerUtils.readFiles(sourceResources);
        if (compactionHasMod) {
          // seq mods
          deleteTimeRange(seqResources.get(0), fullPaths[1], 250L, 300L, true);
          // unseq mods
          deleteTimeRange(unseqResources.get(5), fullPaths[1], 0L, 100L, true);
          // remove the same ranges from the expected data
          List<TimeValuePair> timeValuePairs = sourceData.get(fullPaths[1]);
          timeValuePairs.removeIf(
              timeValuePair ->
                  timeValuePair.getTimestamp() >= 250L && timeValuePair.getTimestamp() <= 300L);
          timeValuePairs.removeIf(
              timeValuePair ->
                  timeValuePair.getTimestamp() >= 0L && timeValuePair.getTimestamp() <= 100L);
        }

        TsFileResourceList seqTsFileResourceList = new TsFileResourceList();
        seqTsFileResourceList.addAll(seqResources);
        TsFileResourceList unseqTsFileResourceList = new TsFileResourceList();
        unseqTsFileResourceList.addAll(unseqResources);
        long timeLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
        CrossSpaceCompactionCandidate mergeResource =
            new CrossSpaceCompactionCandidate(
                seqTsFileResourceList, unseqTsFileResourceList, timeLowerBound);
        RewriteCrossSpaceCompactionSelector selector =
            new RewriteCrossSpaceCompactionSelector("", "", 0, null);
        List<CrossCompactionTaskResource> selected =
            selector.selectCrossSpaceTask(seqTsFileResourceList, unseqTsFileResourceList);
        index++;
        if (selected.isEmpty()) {
          // the selector must pick a task for these overlapping files
          fail();
        }

        AbstractCompactionTask compactionTask =
            new CrossSpaceCompactionTask(
                0,
                getTsFileManager(),
                mergeResource.getSeqFiles(),
                mergeResource.getUnseqFiles(),
                new FastCompactionPerformer(true),
                0,
                0);
        Assert.assertTrue(compactionTask.start());

        // one target file per seq source file, named by increasing the cross compaction count
        List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
        for (TsFileResource seqResource : seqResources) {
          TsFileResource targetResource =
              new TsFileResource(
                  TsFileNameGenerator.increaseCrossCompactionCnt(seqResource).getTsFile());
          targetResource.deserialize();
          targetResource.setStatusForTest(TsFileResourceStatus.NORMAL);
          targetTsfileResourceList.add(targetResource);
        }
        CompactionCheckerUtils.checkDataAndResource(sourceData, targetTsfileResourceList);
        CompactionClearUtils.clearAllCompactionFiles();
      }
    }
  }
}

/**
 * Chooses the series written to one source file: every series for ALL_SAME, the eight series
 * starting at {@code first} for PART_SAME, otherwise only {@code fullPaths[first]}. When
 * {@code withLastSeries} is set, the last entry of {@code fullPaths} is added to the two
 * partial layouts.
 */
private static Set<String> selectSeriesPaths(
    CompactionTimeseriesType type, int first, boolean withLastSeries) {
  if (type == CompactionTimeseriesType.ALL_SAME) {
    return new HashSet<>(Arrays.asList(fullPaths));
  }
  int count = type == CompactionTimeseriesType.PART_SAME ? 8 : 1;
  Set<String> paths = new HashSet<>();
  for (int i = first; i < first + count; i++) {
    paths.add(fullPaths[i]);
  }
  if (withLastSeries) {
    paths.add(fullPaths[fullPaths.length - 1]);
  }
  return paths;
}

/** Builds the page layout {@code [[p], [p, p / 2]]}: two chunks, the second with two pages. */
private static List<List<Long>> twoChunkLayout(long pagePoints) {
  List<List<Long>> chunkPagePointsNum = new ArrayList<>();
  List<Long> firstChunk = new ArrayList<>();
  firstChunk.add(pagePoints);
  chunkPagePointsNum.add(firstChunk);
  List<Long> secondChunk = new ArrayList<>();
  secondChunk.add(pagePoints);
  secondChunk.add(pagePoints / 2);
  chunkPagePointsNum.add(secondChunk);
  return chunkPagePointsNum;
}

/** Generates a resource in the test storage group and writes the given series into it. */
private static TsFileResource writeSourceFile(
    boolean isSeq, int version, Set<String> paths, long pagePoints, long startTime)
    throws Exception {
  TsFileResource resource =
      CompactionFileGeneratorUtils.generateTsFileResource(isSeq, version, COMPACTION_TEST_SG);
  CompactionFileGeneratorUtils.writeTsFile(
      paths, twoChunkLayout(pagePoints), startTime, resource);
  return resource;
}

/**
 * Writes one deletion of {@code [start, end]} on {@code path} for {@code resource}.
 *
 * <p>NOTE(review): {@code compactionMods} is forwarded as the third argument of
 * {@code generateMods}; presumably it selects the compaction-time mods file - confirm.
 */
private static void deleteTimeRange(
    TsFileResource resource, String path, long start, long end, boolean compactionMods)
    throws Exception {
  Map<String, Pair<Long, Long>> toDeleteTimeseriesAndTime = new HashMap<>();
  toDeleteTimeseriesAndTime.put(path, new Pair<>(start, end));
  CompactionFileGeneratorUtils.generateMods(toDeleteTimeseriesAndTime, resource, compactionMods);
}
// Test five seq files overlapping with one unseq file, covering five types of relation
// (Contains, Prefix-Overlap, Suffix-Overlap, Prefix-Non-Overlap, Suffix-Non-Overlap). Seq files
// do not have device d1.
/**
 * Runs a cross-space compaction of five seq files and one unseq file for every combination of
 * series layout, "mods before compaction" and "mods after reading the expected data".
 *
 * <p>Only the seq files at positions 1 to 3 contribute to the expected data and are looked up
 * as targets. The scenario fails if the selector returns no task.
 *
 * @throws Exception if file generation, compaction or verification fails
 */
@Test
public void testFiveSeqFileAndOneUnseqFileWithSomeDeviceNotInSeqFiles() throws Exception {
  final List<String> allPaths = Arrays.asList(fullPaths);
  for (CompactionTimeseriesType seriesType : compactionTimeseriesTypes) {
    final boolean allSame = seriesType == CompactionTimeseriesType.ALL_SAME;
    final boolean partSame = seriesType == CompactionTimeseriesType.PART_SAME;
    for (boolean modsBefore : compactionBeforeHasMods) {
      for (boolean modsDuring : compactionHasMods) {
        List<TsFileResource> unseqResources = new ArrayList<>();
        List<TsFileResource> seqResources = new ArrayList<>();
        Set<String> paths;
        List<List<Long>> layout;
        Map<String, Pair<Long, Long>> deletion;

        // unseq file
        paths =
            new HashSet<String>(
                allSame ? allPaths : partSame ? allPaths.subList(0, 8) : allPaths.subList(1, 2));
        layout = new ArrayList<>();
        layout.add(new ArrayList<>(Arrays.asList(1000L)));
        layout.add(new ArrayList<>(Arrays.asList(1000L, 500L)));
        TsFileResource unseqFile =
            CompactionFileGeneratorUtils.generateTsFileResource(false, 1, COMPACTION_TEST_SG);
        CompactionFileGeneratorUtils.writeTsFile(paths, layout, 2000L, unseqFile);
        if (modsBefore) {
          deletion = new HashMap<>();
          deletion.put(fullPaths[0], new Pair<>(2000L, 2500L));
          CompactionFileGeneratorUtils.generateMods(deletion, unseqFile, false);
        }
        unseqResources.add(unseqFile);

        // seq file with Prefix-Non-Overlap relation
        paths =
            new HashSet<String>(
                allSame ? allPaths : partSame ? allPaths.subList(1, 9) : allPaths.subList(2, 3));
        layout = new ArrayList<>();
        layout.add(new ArrayList<>(Arrays.asList(100L)));
        layout.add(new ArrayList<>(Arrays.asList(100L, 50L)));
        TsFileResource seqFile1 =
            CompactionFileGeneratorUtils.generateTsFileResource(true, 1, COMPACTION_TEST_SG);
        CompactionFileGeneratorUtils.writeTsFile(paths, layout, 0L, seqFile1);
        if (modsBefore) {
          deletion = new HashMap<>();
          deletion.put(fullPaths[0], new Pair<>(0L, 25L));
          CompactionFileGeneratorUtils.generateMods(deletion, seqFile1, false);
        }
        seqResources.add(seqFile1);

        // seq file with Prefix-Overlap relation
        paths =
            new HashSet<String>(
                allSame ? allPaths : partSame ? allPaths.subList(2, 10) : allPaths.subList(0, 1));
        if (!allSame) {
          // both partial layouts additionally carry the last series of device1
          paths.add(fullPaths[14]);
        }
        layout = new ArrayList<>();
        layout.add(new ArrayList<>(Arrays.asList(1000L)));
        layout.add(new ArrayList<>(Arrays.asList(1000L, 500L)));
        TsFileResource seqFile2 =
            CompactionFileGeneratorUtils.generateTsFileResource(true, 2, COMPACTION_TEST_SG);
        CompactionFileGeneratorUtils.writeTsFile(paths, layout, 1000L, seqFile2);
        if (modsBefore) {
          deletion = new HashMap<>();
          deletion.put(fullPaths[1], new Pair<>(3000L, 3100L));
          CompactionFileGeneratorUtils.generateMods(deletion, seqFile2, false);
        }
        seqResources.add(seqFile2);

        // seq file with Contains relation
        paths =
            new HashSet<String>(
                allSame ? allPaths : partSame ? allPaths.subList(3, 11) : allPaths.subList(3, 4));
        layout = new ArrayList<>();
        layout.add(new ArrayList<>(Arrays.asList(100L)));
        layout.add(new ArrayList<>(Arrays.asList(100L, 50L)));
        TsFileResource seqFile3 =
            CompactionFileGeneratorUtils.generateTsFileResource(true, 3, COMPACTION_TEST_SG);
        CompactionFileGeneratorUtils.writeTsFile(paths, layout, 3500L, seqFile3);
        if (modsBefore) {
          deletion = new HashMap<>();
          deletion.put(fullPaths[3], new Pair<>(3500L, 3525L));
          CompactionFileGeneratorUtils.generateMods(deletion, seqFile3, false);
        }
        seqResources.add(seqFile3);

        // seq file with Suffix-Overlap relation
        paths =
            new HashSet<String>(
                allSame ? allPaths : partSame ? allPaths.subList(4, 12) : allPaths.subList(4, 5));
        layout = new ArrayList<>();
        layout.add(new ArrayList<>(Arrays.asList(1000L)));
        layout.add(new ArrayList<>(Arrays.asList(1000L, 500L)));
        TsFileResource seqFile4 =
            CompactionFileGeneratorUtils.generateTsFileResource(true, 4, COMPACTION_TEST_SG);
        CompactionFileGeneratorUtils.writeTsFile(paths, layout, 3750L, seqFile4);
        if (modsBefore) {
          deletion = new HashMap<>();
          deletion.put(fullPaths[4], new Pair<>(3800L, 3900L));
          CompactionFileGeneratorUtils.generateMods(deletion, seqFile4, false);
        }
        seqResources.add(seqFile4);

        // seq file with Suffix-Non-Overlap relation
        paths =
            new HashSet<String>(
                allSame ? allPaths : partSame ? allPaths.subList(5, 13) : allPaths.subList(5, 6));
        layout = new ArrayList<>();
        layout.add(new ArrayList<>(Arrays.asList(1000L)));
        layout.add(new ArrayList<>(Arrays.asList(1000L, 500L)));
        TsFileResource seqFile5 =
            CompactionFileGeneratorUtils.generateTsFileResource(true, 5, COMPACTION_TEST_SG);
        CompactionFileGeneratorUtils.writeTsFile(paths, layout, 6250L, seqFile5);
        if (modsBefore) {
          deletion = new HashMap<>();
          deletion.put(fullPaths[5], new Pair<>(6300L, 6400L));
          CompactionFileGeneratorUtils.generateMods(deletion, seqFile5, false);
        }
        seqResources.add(seqFile5);

        // expected data: the unseq file plus the seq files at positions 1..3
        List<TsFileResource> sourceResources = new ArrayList<>(unseqResources);
        sourceResources.addAll(seqResources.subList(1, 4));
        Map<String, List<TimeValuePair>> sourceData =
            CompactionCheckerUtils.readFiles(sourceResources);
        if (modsDuring) {
          // unseq mods
          deletion = new HashMap<>();
          deletion.put(fullPaths[1], new Pair<>(2500L, 2600L));
          CompactionFileGeneratorUtils.generateMods(deletion, unseqResources.get(0), true);
          // seq mods
          deletion = new HashMap<>();
          deletion.put(fullPaths[1], new Pair<>(0L, 100L));
          CompactionFileGeneratorUtils.generateMods(deletion, seqResources.get(0), true);
          // drop the same ranges from the expected data
          List<TimeValuePair> expectedPoints = sourceData.get(fullPaths[1]);
          expectedPoints.removeIf(
              point -> point.getTimestamp() >= 2500L && point.getTimestamp() <= 2600L);
          expectedPoints.removeIf(
              point -> point.getTimestamp() >= 0L && point.getTimestamp() <= 100L);
        }

        TsFileResourceList seqTsFileResourceList = new TsFileResourceList();
        seqTsFileResourceList.addAll(seqResources);
        TsFileResourceList unseqTsFileResourceList = new TsFileResourceList();
        unseqTsFileResourceList.addAll(unseqResources);
        long timeLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
        CrossSpaceCompactionCandidate mergeResource =
            new CrossSpaceCompactionCandidate(
                seqTsFileResourceList, unseqTsFileResourceList, timeLowerBound);
        RewriteCrossSpaceCompactionSelector selector =
            new RewriteCrossSpaceCompactionSelector("", "", 0, null);
        List<CrossCompactionTaskResource> selected =
            selector.selectCrossSpaceTask(seqTsFileResourceList, unseqTsFileResourceList);
        if (selected.isEmpty()) {
          // the selector must pick a task for these files
          fail();
        }

        AbstractCompactionTask compactionTask =
            new CrossSpaceCompactionTask(
                0,
                getTsFileManager(),
                mergeResource.getSeqFiles(),
                mergeResource.getUnseqFiles(),
                new FastCompactionPerformer(true),
                0,
                0);
        Assert.assertTrue(compactionTask.start());

        // targets are looked up only for the seq files at positions 1..3
        List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
        for (TsFileResource seqResource : seqResources.subList(1, 4)) {
          TsFileResource targetResource =
              new TsFileResource(
                  TsFileNameGenerator.increaseCrossCompactionCnt(seqResource).getTsFile());
          targetResource.deserialize();
          targetResource.setStatusForTest(TsFileResourceStatus.NORMAL);
          targetTsfileResourceList.add(targetResource);
        }
        CompactionCheckerUtils.checkDataAndResource(sourceData, targetTsfileResourceList);
        CompactionClearUtils.clearAllCompactionFiles();
      }
    }
  }
}
// Test five seq files overlapping with one unseq file, covering five types of relation
// (Contains, Prefix-Overlap, Suffix-Overlap, Prefix-Non-Overlap, Suffix-Non-Overlap).
@Test
public void testFiveSeqFileAndOneUnseqFile() throws Exception {
for (CompactionTimeseriesType compactionTimeseriesType : compactionTimeseriesTypes) {
for (boolean compactionBeforeHasMod : compactionBeforeHasMods) {
for (boolean compactionHasMod : compactionHasMods) {
// unseq file
List<TsFileResource> unseqResources = new ArrayList<>();
Set<String> fullPath;
if (compactionTimeseriesType == CompactionTimeseriesType.ALL_SAME) {
fullPath = new HashSet<>(Arrays.asList(fullPaths));
} else if (compactionTimeseriesType == CompactionTimeseriesType.PART_SAME) {
fullPath = new HashSet<>();
fullPath.add(fullPaths[0]);
fullPath.add(fullPaths[1]);
fullPath.add(fullPaths[2]);
fullPath.add(fullPaths[3]);
fullPath.add(fullPaths[4]);
fullPath.add(fullPaths[5]);
fullPath.add(fullPaths[6]);
fullPath.add(fullPaths[7]);
} else {
fullPath = new HashSet<>();
fullPath.add(fullPaths[1]);
}
List<List<Long>> chunkPagePointsNum = new ArrayList<>();
List<Long> pagePointsNum = new ArrayList<>();
pagePointsNum.add(1000L);
chunkPagePointsNum.add(pagePointsNum);
pagePointsNum = new ArrayList<>();
pagePointsNum.add(1000L);
pagePointsNum.add(500L);
chunkPagePointsNum.add(pagePointsNum);
TsFileResource tsFileResource =
CompactionFileGeneratorUtils.generateTsFileResource(false, 1, COMPACTION_TEST_SG);
CompactionFileGeneratorUtils.writeTsFile(
fullPath, chunkPagePointsNum, 2000L, tsFileResource);
// has mods files before compaction
if (compactionBeforeHasMod) {
Map<String, Pair<Long, Long>> toDeleteTimeseriesAndTime = new HashMap<>();
toDeleteTimeseriesAndTime.put(fullPaths[0], new Pair<>(2000L, 2500L));
CompactionFileGeneratorUtils.generateMods(
toDeleteTimeseriesAndTime, tsFileResource, false);
}
unseqResources.add(tsFileResource);
// seq file with Prefix-Non-Overlap relation
List<TsFileResource> seqResources = new ArrayList<>();
if (compactionTimeseriesType == CompactionTimeseriesType.ALL_SAME) {
fullPath = new HashSet<>(Arrays.asList(fullPaths));
} else if (compactionTimeseriesType == CompactionTimeseriesType.PART_SAME) {
fullPath = new HashSet<>();
fullPath.add(fullPaths[1]);
fullPath.add(fullPaths[2]);
fullPath.add(fullPaths[3]);
fullPath.add(fullPaths[4]);
fullPath.add(fullPaths[5]);
fullPath.add(fullPaths[6]);
fullPath.add(fullPaths[7]);
fullPath.add(fullPaths[8]);
fullPath.add(fullPaths[14]);
} else {
fullPath = new HashSet<>();
fullPath.add(fullPaths[0]);
fullPath.add(fullPaths[14]);
}
chunkPagePointsNum = new ArrayList<>();
pagePointsNum = new ArrayList<>();
pagePointsNum.add(100L);
chunkPagePointsNum.add(pagePointsNum);
pagePointsNum = new ArrayList<>();
pagePointsNum.add(100L);
pagePointsNum.add(50L);
chunkPagePointsNum.add(pagePointsNum);
TsFileResource seqFile1 =
CompactionFileGeneratorUtils.generateTsFileResource(true, 1, COMPACTION_TEST_SG);
CompactionFileGeneratorUtils.writeTsFile(fullPath, chunkPagePointsNum, 0L, seqFile1);
// has mods files before compaction
if (compactionBeforeHasMod) {
Map<String, Pair<Long, Long>> toDeleteTimeseriesAndTime = new HashMap<>();
toDeleteTimeseriesAndTime.put(fullPaths[0], new Pair<>(0L, 25L));
CompactionFileGeneratorUtils.generateMods(toDeleteTimeseriesAndTime, seqFile1, false);
}
seqResources.add(seqFile1);
// seq file with Prefix-Overlap relation
if (compactionTimeseriesType == CompactionTimeseriesType.ALL_SAME) {
fullPath = new HashSet<>(Arrays.asList(fullPaths));
} else if (compactionTimeseriesType == CompactionTimeseriesType.PART_SAME) {
fullPath = new HashSet<>();
fullPath.add(fullPaths[2]);
fullPath.add(fullPaths[3]);
fullPath.add(fullPaths[4]);
fullPath.add(fullPaths[5]);
fullPath.add(fullPaths[6]);
fullPath.add(fullPaths[7]);
fullPath.add(fullPaths[8]);
fullPath.add(fullPaths[9]);
} else {
fullPath = new HashSet<>();
fullPath.add(fullPaths[1]);
}
chunkPagePointsNum = new ArrayList<>();
pagePointsNum = new ArrayList<>();
pagePointsNum.add(1000L);
chunkPagePointsNum.add(pagePointsNum);
pagePointsNum = new ArrayList<>();
pagePointsNum.add(1000L);
pagePointsNum.add(500L);
chunkPagePointsNum.add(pagePointsNum);
TsFileResource seqFile2 =
CompactionFileGeneratorUtils.generateTsFileResource(true, 2, COMPACTION_TEST_SG);
CompactionFileGeneratorUtils.writeTsFile(fullPath, chunkPagePointsNum, 1000L, seqFile2);
// has mods files before compaction
if (compactionBeforeHasMod) {
Map<String, Pair<Long, Long>> toDeleteTimeseriesAndTime = new HashMap<>();
toDeleteTimeseriesAndTime.put(fullPaths[1], new Pair<>(3000L, 3100L));
CompactionFileGeneratorUtils.generateMods(toDeleteTimeseriesAndTime, seqFile2, false);
}
seqResources.add(seqFile2);
// seq file with Contains relation
if (compactionTimeseriesType == CompactionTimeseriesType.ALL_SAME) {
fullPath = new HashSet<>(Arrays.asList(fullPaths));
} else if (compactionTimeseriesType == CompactionTimeseriesType.PART_SAME) {
fullPath = new HashSet<>();
fullPath.add(fullPaths[3]);
fullPath.add(fullPaths[4]);
fullPath.add(fullPaths[5]);
fullPath.add(fullPaths[6]);
fullPath.add(fullPaths[7]);
fullPath.add(fullPaths[8]);
fullPath.add(fullPaths[9]);
fullPath.add(fullPaths[10]);
} else {
fullPath = new HashSet<>();
fullPath.add(fullPaths[3]);
}
chunkPagePointsNum = new ArrayList<>();
pagePointsNum = new ArrayList<>();
pagePointsNum.add(100L);
chunkPagePointsNum.add(pagePointsNum);
pagePointsNum = new ArrayList<>();
pagePointsNum.add(100L);
pagePointsNum.add(50L);
chunkPagePointsNum.add(pagePointsNum);
TsFileResource seqFile3 =
CompactionFileGeneratorUtils.generateTsFileResource(true, 3, COMPACTION_TEST_SG);
CompactionFileGeneratorUtils.writeTsFile(fullPath, chunkPagePointsNum, 3500L, seqFile3);
// has mods files before compaction
if (compactionBeforeHasMod) {
Map<String, Pair<Long, Long>> toDeleteTimeseriesAndTime = new HashMap<>();
toDeleteTimeseriesAndTime.put(fullPaths[3], new Pair<>(3500L, 3525L));
CompactionFileGeneratorUtils.generateMods(toDeleteTimeseriesAndTime, seqFile3, false);
}
seqResources.add(seqFile3);
// seq file with Suffix-Overlap relation
if (compactionTimeseriesType == CompactionTimeseriesType.ALL_SAME) {
fullPath = new HashSet<>(Arrays.asList(fullPaths));
} else if (compactionTimeseriesType == CompactionTimeseriesType.PART_SAME) {
fullPath = new HashSet<>();
fullPath.add(fullPaths[4]);
fullPath.add(fullPaths[5]);
fullPath.add(fullPaths[6]);
fullPath.add(fullPaths[7]);
fullPath.add(fullPaths[8]);
fullPath.add(fullPaths[9]);
fullPath.add(fullPaths[10]);
fullPath.add(fullPaths[11]);
} else {
fullPath = new HashSet<>();
fullPath.add(fullPaths[4]);
}
chunkPagePointsNum = new ArrayList<>();
pagePointsNum = new ArrayList<>();
pagePointsNum.add(1000L);
chunkPagePointsNum.add(pagePointsNum);
pagePointsNum = new ArrayList<>();
pagePointsNum.add(1000L);
pagePointsNum.add(500L);
chunkPagePointsNum.add(pagePointsNum);
TsFileResource seqFile4 =
CompactionFileGeneratorUtils.generateTsFileResource(true, 4, COMPACTION_TEST_SG);
CompactionFileGeneratorUtils.writeTsFile(fullPath, chunkPagePointsNum, 3750L, seqFile4);
// has mods files before compaction
if (compactionBeforeHasMod) {
Map<String, Pair<Long, Long>> toDeleteTimeseriesAndTime = new HashMap<>();
toDeleteTimeseriesAndTime.put(fullPaths[4], new Pair<>(3800L, 3900L));
CompactionFileGeneratorUtils.generateMods(toDeleteTimeseriesAndTime, seqFile4, false);
}
seqResources.add(seqFile4);
// seq file with Suffix-Non-Overlap relation
if (compactionTimeseriesType == CompactionTimeseriesType.ALL_SAME) {
fullPath = new HashSet<>(Arrays.asList(fullPaths));
} else if (compactionTimeseriesType == CompactionTimeseriesType.PART_SAME) {
fullPath = new HashSet<>();
fullPath.add(fullPaths[5]);
fullPath.add(fullPaths[6]);
fullPath.add(fullPaths[7]);
fullPath.add(fullPaths[8]);
fullPath.add(fullPaths[9]);
fullPath.add(fullPaths[10]);
fullPath.add(fullPaths[11]);
fullPath.add(fullPaths[12]);
} else {
fullPath = new HashSet<>();
fullPath.add(fullPaths[5]);
}
chunkPagePointsNum = new ArrayList<>();
pagePointsNum = new ArrayList<>();
pagePointsNum.add(1000L);
chunkPagePointsNum.add(pagePointsNum);
pagePointsNum = new ArrayList<>();
pagePointsNum.add(1000L);
pagePointsNum.add(500L);
chunkPagePointsNum.add(pagePointsNum);
TsFileResource seqFile5 =
CompactionFileGeneratorUtils.generateTsFileResource(true, 5, COMPACTION_TEST_SG);
CompactionFileGeneratorUtils.writeTsFile(fullPath, chunkPagePointsNum, 6250L, seqFile5);
// has mods files before compaction
if (compactionBeforeHasMod) {
Map<String, Pair<Long, Long>> toDeleteTimeseriesAndTime = new HashMap<>();
toDeleteTimeseriesAndTime.put(fullPaths[5], new Pair<>(6300L, 6400L));
CompactionFileGeneratorUtils.generateMods(toDeleteTimeseriesAndTime, seqFile5, false);
}
seqResources.add(seqFile5);
List<TsFileResource> sourceResources = new ArrayList<>(unseqResources);
for (int i = 1; i < 4; i++) {
sourceResources.add(seqResources.get(i));
}
Map<String, List<TimeValuePair>> sourceData =
CompactionCheckerUtils.readFiles(sourceResources);
if (compactionHasMod) {
// unseq mods
Map<String, Pair<Long, Long>> toDeleteTimeseriesAndTime = new HashMap<>();
toDeleteTimeseriesAndTime.put(fullPaths[1], new Pair<>(2500L, 2600L));
CompactionFileGeneratorUtils.generateMods(
toDeleteTimeseriesAndTime, unseqResources.get(0), true);
// seq mods
toDeleteTimeseriesAndTime = new HashMap<>();
toDeleteTimeseriesAndTime.put(fullPaths[1], new Pair<>(0L, 100L));
CompactionFileGeneratorUtils.generateMods(
toDeleteTimeseriesAndTime, seqResources.get(0), true);
// remove data in source data list
List<TimeValuePair> timeValuePairs = sourceData.get(fullPaths[1]);
timeValuePairs.removeIf(
timeValuePair ->
timeValuePair.getTimestamp() >= 2500L && timeValuePair.getTimestamp() <= 2600L);
timeValuePairs.removeIf(
timeValuePair ->
timeValuePair.getTimestamp() >= 0L && timeValuePair.getTimestamp() <= 100L);
}
TsFileResourceList seqTsFileResourceList = new TsFileResourceList();
seqTsFileResourceList.addAll(seqResources);
TsFileResourceList unseqTsFileResourceList = new TsFileResourceList();
unseqTsFileResourceList.addAll(unseqResources);
long timeLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
CrossSpaceCompactionCandidate mergeResource =
new CrossSpaceCompactionCandidate(
seqTsFileResourceList, unseqTsFileResourceList, timeLowerBound);
RewriteCrossSpaceCompactionSelector selector =
new RewriteCrossSpaceCompactionSelector("", "", 0, null);
List<CrossCompactionTaskResource> selected =
selector.selectCrossSpaceTask(seqTsFileResourceList, unseqTsFileResourceList);
if (selected.size() > 0) {
AbstractCompactionTask compactionTask =
new CrossSpaceCompactionTask(
0,
getTsFileManager(),
mergeResource.getSeqFiles(),
mergeResource.getUnseqFiles(),
new FastCompactionPerformer(true),
0,
0);
Assert.assertTrue(compactionTask.start());
List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
for (TsFileResource seqResource : seqResources.subList(1, 4)) {
TsFileResource targetResource =
new TsFileResource(
TsFileNameGenerator.increaseCrossCompactionCnt(seqResource).getTsFile());
targetResource.deserialize();
targetResource.setStatusForTest(TsFileResourceStatus.NORMAL);
targetTsfileResourceList.add(targetResource);
}
CompactionCheckerUtils.checkDataAndResource(sourceData, targetTsfileResourceList);
CompactionClearUtils.clearAllCompactionFiles();
} else {
fail();
}
}
}
}
}
/**
 * Creates a fresh {@link TsFileManager} for the {@code root.compactionTest} fixture and asks it
 * for the unsequence and sequence lists of time partition 0 before handing it back.
 *
 * @return the newly constructed manager
 */
private TsFileManager getTsFileManager() {
  // Both list lookups below target the same time partition.
  final int timePartition = 0;
  final TsFileManager manager =
      new TsFileManager(
          "root.compactionTest",
          "0",
          "target\\data\\sequence\\test\\root.compactionTest\\0\\0\\");
  manager.getOrCreateUnsequenceListByTimePartition(timePartition);
  manager.getOrCreateSequenceListByTimePartition(timePartition);
  return manager;
}
}