blob: 5bd12b8d0d1ea79c6626eeafb5d07f0d4ba9822a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.constant.CompactionPriority;
import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class CompactionSchedulerTest {
private static final Logger logger = LoggerFactory.getLogger(CompactionSchedulerTest.class);
// Common prefix of the storage group names; each test appends its own name (e.g. "test1").
static final String COMPACTION_TEST_SG = "root.compactionSchedulerTest_";
// Compaction switches as they were when this class was initialized; tearDown() writes them back.
private static final boolean oldEnableInnerSeqCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
private static final boolean oldEnableInnerUnseqCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
private static final boolean oldEnableCrossCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
// Upper bound for each polling loop in the tests. The loops accumulate the values they pass to
// Thread.sleep and compare the sum against this constant, so the unit is milliseconds.
static final long MAX_WAITING_TIME = 60_000;
// Interval (same accumulated-sleep counter, milliseconds) after which test2 schedules again.
static final long SCHEDULE_AGAIN_TIME = 30_000;
// Path suffixes appended to a storage group name to build the full timeseries paths:
// ten sensors under device0 and five under device1.
static final String[] fullPaths =
new String[] {
".device0.sensor0",
".device0.sensor1",
".device0.sensor2",
".device0.sensor3",
".device0.sensor4",
".device0.sensor5",
".device0.sensor6",
".device0.sensor7",
".device0.sensor8",
".device0.sensor9",
".device1.sensor0",
".device1.sensor1",
".device1.sensor2",
".device1.sensor3",
".device1.sensor4",
};
/**
 * Prepares a clean environment for every test: removes leftover compaction files and
 * directories, initializes the config manager, makes sure the base output directory exists,
 * enables sequence, unsequence and cross-space compaction with INNER_CROSS priority, starts the
 * compaction task manager and waits until it reports no executing task.
 *
 * @throws MetadataException declared for the environment initialization calls
 * @throws IOException declared for the file clean-up calls
 */
@Before
public void setUp() throws MetadataException, IOException {
  CompactionClearUtils.clearAllCompactionFiles();
  EnvironmentUtils.cleanAllDir();
  IoTDB.configManager.init();
  File basicOutputDir = new File(TestConstant.BASE_OUTPUT_PATH);
  IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.INNER_CROSS);
  if (!basicOutputDir.exists()) {
    assertTrue(basicOutputDir.mkdirs());
  }
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
  IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
  CompactionTaskManager.getInstance().start();
  while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // Do not swallow the interrupt: keep the flag set for callers and abort the set-up
      // instead of silently continuing to poll.
      Thread.currentThread().interrupt();
      fail("Interrupted while waiting for executing compaction tasks to finish");
    }
  }
}
/**
 * Undoes the global state touched by a test: restores the compaction configuration, clears the
 * chunk and timeseries-metadata caches and the config manager, deletes compaction files and
 * test directories, stops the compaction task manager, and finally writes back the three
 * compaction switches captured in the {@code oldEnable*} fields.
 */
@After
public void tearDown() throws IOException, StorageEngineException {
new CompactionConfigRestorer().restoreCompactionConfig();
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
IoTDB.configManager.clear();
CompactionClearUtils.clearAllCompactionFiles();
EnvironmentUtils.cleanAllDir();
// The tests create their TsFileManager under "target"; remove directories left empty there.
CompactionClearUtils.deleteEmptyDir(new File("target"));
CompactionTaskManager.getInstance().stop();
// Restore the switches that setUp() and the individual tests force to fixed values.
IoTDBDescriptor.getInstance()
.getConfig()
.setEnableSeqSpaceCompaction(oldEnableInnerSeqCompaction);
IoTDBDescriptor.getInstance()
.getConfig()
.setEnableUnseqSpaceCompaction(oldEnableInnerUnseqCompaction);
IoTDBDescriptor.getInstance()
.getConfig()
.setEnableCrossSpaceCompaction(oldEnableCrossCompaction);
}
/**
 * enable_seq_space_compaction=true enable_unseq_space_compaction=true
 * compaction_concurrent_thread=50 max_compaction_candidate_file_num=100
 *
 * <p>Writes 100 sequence and 100 unsequence files and keeps scheduling compaction until at most
 * one sequence file is left and the unsequence list is empty. Every waiting phase fails the test
 * once more than {@code MAX_WAITING_TIME} milliseconds of sleep have accumulated. An interrupt
 * while waiting is propagated to the test runner instead of being printed and ignored.
 */
@Test
public void test1() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test1");
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
  IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100);
  IoTDBDescriptor.getInstance()
      .getConfig()
      .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
  String sgName = COMPACTION_TEST_SG + "test1";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 sequence files, one chunk with a single 100-point page each.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 unsequence files with the same layout but a different offset argument.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }
    // Phase 1: wait until the sequence files have been compacted into at most one file.
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(true).size() > 1) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("More than one sequence file left after waiting " + totalWaitingTime + " ms");
      }
    }
    // Phase 2: wait until at most one unsequence file is left.
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 1) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("More than one unsequence file left after waiting " + totalWaitingTime + " ms");
      }
    }
    // Phase 3: wait until the unsequence list is empty.
    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 0) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("Unsequence files still present after waiting " + totalWaitingTime + " ms");
      }
    }
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=false enable_unseq_space_compaction=true
 * compaction_concurrent_thread=50 max_compaction_candidate_file_num=100
 *
 * <p>Also raises the memory budget for compaction to 2 GiB multiplied by the configured number
 * of compaction threads for the duration of the test and restores the previous value afterwards.
 * Writes 100 sequence and 100 unsequence files and waits until the unsequence list is empty.
 * Every waiting phase fails the test once more than {@code MAX_WAITING_TIME} milliseconds of
 * sleep have accumulated. An interrupt while waiting is propagated to the test runner instead
 * of being printed and ignored.
 */
@Test
public void test2() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test2");
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
  IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100);
  long origin = SystemInfo.getInstance().getMemorySizeForCompaction();
  SystemInfo.getInstance()
      .setMemorySizeForCompaction(
          2
              * 1024
              * 1024L
              * 1024L
              * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
  try {
    String sgName = COMPACTION_TEST_SG + "test2";
    try {
      IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
    } catch (Exception e) {
      logger.error("exception occurs", e);
    }
    try {
      CompactionTaskManager.getInstance().restart();
      TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
      Set<String> fullPath = new HashSet<>();
      for (String device : fullPaths) {
        fullPath.add(sgName + device);
        PartialPath path = new PartialPath(sgName + device);
        IoTDB.schemaProcessor.createTimeseries(
            path,
            TSDataType.INT64,
            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
            TSFileDescriptor.getInstance().getConfig().getCompressor(),
            Collections.emptyMap());
      }
      // 100 sequence files, one chunk with a single 100-point page each.
      for (int i = 0; i < 100; i++) {
        List<List<Long>> chunkPagePointsNum = new ArrayList<>();
        List<Long> pagePointsNum = new ArrayList<>();
        pagePointsNum.add(100L);
        chunkPagePointsNum.add(pagePointsNum);
        TsFileResource tsFileResource =
            CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
        CompactionFileGeneratorUtils.writeTsFile(
            fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
        tsFileManager.add(tsFileResource, true);
      }
      // 100 unsequence files with the same layout but a different offset argument.
      for (int i = 0; i < 100; i++) {
        List<List<Long>> chunkPagePointsNum = new ArrayList<>();
        List<Long> pagePointsNum = new ArrayList<>();
        pagePointsNum.add(100L);
        chunkPagePointsNum.add(pagePointsNum);
        TsFileResource tsFileResource =
            CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
        CompactionFileGeneratorUtils.writeTsFile(
            fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
        tsFileManager.add(tsFileResource, false);
      }
      // Phase 1: wait until at most one unsequence file is left.
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      long totalWaitingTime = 0;
      while (tsFileManager.getTsFileList(false).size() > 1) {
        Thread.sleep(100);
        totalWaitingTime += 100;
        CompactionScheduler.scheduleCompaction(tsFileManager, 0);
        if (totalWaitingTime > MAX_WAITING_TIME) {
          fail("More than one unsequence file left after waiting " + totalWaitingTime + " ms");
        }
      }
      // Phase 2: wait until the unsequence list is empty. This phase polls every 10 ms, logs
      // the file counts every 10 s and only re-schedules every SCHEDULE_AGAIN_TIME ms.
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      totalWaitingTime = 0;
      while (tsFileManager.getTsFileList(false).size() > 0) {
        Thread.sleep(10);
        totalWaitingTime += 10;
        if (totalWaitingTime > MAX_WAITING_TIME) {
          fail("Unsequence files still present after waiting " + totalWaitingTime + " ms");
        }
        if (totalWaitingTime % 10_000 == 0) {
          logger.warn(
              "sequence file num is {}, unsequence file num is {}",
              tsFileManager.getTsFileList(true).size(),
              tsFileManager.getTsFileList(false).size());
        }
        if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
          logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
          CompactionScheduler.scheduleCompaction(tsFileManager, 0);
        }
      }
      // The number of sequence files is deliberately not asserted in this test.
      tsFileManager.setAllowCompaction(false);
      stopCompactionTaskManager();
    } finally {
      IoTDBDescriptor.getInstance()
          .getConfig()
          .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
      IoTDBDescriptor.getInstance()
          .getConfig()
          .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
      IoTDBDescriptor.getInstance()
          .getConfig()
          .setConcurrentCompactionThread(prevCompactionConcurrentThread);
      IoTDBDescriptor.getInstance()
          .getConfig()
          .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
    }
  } finally {
    SystemInfo.getInstance().setMemorySizeForCompaction(origin);
  }
}
/**
 * enable_seq_space_compaction=true enable_unseq_space_compaction=false
 * compaction_concurrent_thread=50 max_compaction_candidate_file_num=100
 *
 * <p>Writes 100 sequence and 100 unsequence files, waits until at most one sequence file is
 * left, then asserts that the unsequence list is empty and that exactly one sequence file
 * remains. Every waiting phase fails the test once more than {@code MAX_WAITING_TIME}
 * milliseconds of sleep have accumulated. An interrupt while waiting is propagated to the test
 * runner instead of being printed and ignored.
 */
@Test
public void test3() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test3");
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100);
  String sgName = COMPACTION_TEST_SG + "test3";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 sequence files, one chunk with a single 100-point page each.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 unsequence files with the same layout but a different offset argument.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }
    // Phase 1: wait until the sequence files have been compacted into at most one file.
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(true).size() > 1) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("More than one sequence file left after waiting " + totalWaitingTime + " ms");
      }
    }
    assertEquals(0, tsFileManager.getTsFileList(false).size());
    // NOTE(review): the assertion above already requires an empty unsequence list, so this
    // loop is only entered if that assertion is relaxed. It is kept from the original test but
    // now has the same timeout as every other waiting loop, so it can no longer spin forever.
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 0) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("Unsequence files still present after waiting " + totalWaitingTime + " ms");
      }
    }
    assertEquals(1, tsFileManager.getTsFileList(true).size());
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=false enable_unseq_space_compaction=false
 * compaction_concurrent_thread=50 max_compaction_candidate_file_num=100
 *
 * <p>Writes 100 sequence and 100 unsequence files, keeps scheduling compaction until the
 * unsequence list is empty, then asserts that 100 sequence files remain. The waiting phase
 * fails the test once more than {@code MAX_WAITING_TIME} milliseconds of sleep have
 * accumulated. An interrupt while waiting is propagated to the test runner instead of being
 * printed and ignored.
 */
@Test
public void test4() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test4");
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100);
  IoTDBDescriptor.getInstance()
      .getConfig()
      .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
  String sgName = COMPACTION_TEST_SG + "test4";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 sequence files, one chunk with a single 100-point page each.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 unsequence files with the same layout but a different offset argument.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }
    // Wait until the unsequence list is empty.
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 0) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("Unsequence files still present after waiting " + totalWaitingTime + " ms");
      }
    }
    assertEquals(100, tsFileManager.getTsFileList(true).size());
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=true enable_unseq_space_compaction=true
 * compaction_concurrent_thread=1 max_compaction_candidate_file_num=100
 *
 * <p>Writes 100 sequence and 100 unsequence files and keeps scheduling compaction until at most
 * one sequence file is left and the unsequence list is empty. Every waiting phase fails the test
 * once more than {@code MAX_WAITING_TIME} milliseconds of sleep have accumulated. An interrupt
 * while waiting is propagated to the test runner instead of being printed and ignored.
 */
@Test
public void test5() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test5");
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
  IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(1);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100);
  String sgName = COMPACTION_TEST_SG + "test5";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 sequence files, one chunk with a single 100-point page each.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 unsequence files with the same layout but a different offset argument.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }
    // Phase 1: wait until the sequence files have been compacted into at most one file.
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(true).size() > 1) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("More than one sequence file left after waiting " + totalWaitingTime + " ms");
      }
    }
    // Phase 2: wait until at most one unsequence file is left.
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 1) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("More than one unsequence file left after waiting " + totalWaitingTime + " ms");
      }
    }
    // Phase 3: wait until the unsequence list is empty.
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 0) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("Unsequence files still present after waiting " + totalWaitingTime + " ms");
      }
    }
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=false enable_unseq_space_compaction=true
 * compaction_concurrent_thread=1 max_compaction_candidate_file_num=100
 *
 * <p>Writes 100 sequence and 100 unsequence files and keeps scheduling compaction until the
 * unsequence list is empty, asserting before, during and after the wait that the sequence list
 * still holds 100 files. Every waiting phase fails the test once more than
 * {@code MAX_WAITING_TIME} milliseconds of sleep have accumulated. An interrupt while waiting is
 * propagated to the test runner instead of being printed and ignored.
 */
@Test
public void test6() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test6");
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
  IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(1);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100);
  String sgName = COMPACTION_TEST_SG + "test6";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 sequence files, one chunk with a single 100-point page each.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 unsequence files with the same layout but a different offset argument.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }
    assertEquals(100, tsFileManager.getTsFileList(true).size());
    // Phase 1: wait until at most one unsequence file is left; the sequence list must keep its
    // 100 files the whole time.
    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 1) {
      assertEquals(100, tsFileManager.getTsFileList(true).size());
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("More than one unsequence file left after waiting " + totalWaitingTime + " ms");
      }
    }
    // Phase 2: wait until the unsequence list is empty, with the same sequence-list check.
    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 0) {
      assertEquals(100, tsFileManager.getTsFileList(true).size());
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail("Unsequence files still present after waiting " + totalWaitingTime + " ms");
      }
    }
    assertEquals(100, tsFileManager.getTsFileList(true).size());
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=true enable_unseq_space_compaction=false
 * compaction_concurrent_thread=1 max_compaction_candidate_file_num=100
 *
 * <p>Adds 100 files with the sequence flag {@code true} and 100 with the flag {@code false} to a
 * fresh {@link TsFileManager}, then keeps re-scheduling compaction until
 * {@code getTsFileList(true)} holds at most one file and {@code getTsFileList(false)} is empty.
 * Each wait phase fails the test once it has slept longer than {@code MAX_WAITING_TIME}.
 *
 * @throws InterruptedException if the test thread is interrupted while polling
 */
@Test
public void test7() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test7");
  // Remember the current settings so the finally block can restore them.
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(1);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100);
  String sgName = COMPACTION_TEST_SG + "test7";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    // Best effort: a failure here is only logged and the test carries on.
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 files registered with the sequence flag set to true.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 files registered with the sequence flag set to false.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(true).size() > 1) {
      // An interrupt propagates to the caller (the method declares it) instead of being
      // printed and ignored.
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        // fail() throws, which also leaves the loop.
        fail();
      }
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 0) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail();
      }
    }
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=false enable_unseq_space_compaction=false
 * compaction_concurrent_thread=1 max_compaction_candidate_file_num=100
 *
 * <p>Adds 100 files with the sequence flag {@code true} and 100 with the flag {@code false}, then
 * keeps re-scheduling compaction until {@code getTsFileList(false)} is empty and finally asserts
 * that {@code getTsFileList(true)} still holds 100 files. The wait fails the test once it has
 * slept longer than {@code MAX_WAITING_TIME}.
 *
 * @throws InterruptedException if the test thread is interrupted while polling
 */
@Test
public void test8() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test8");
  // Remember the current settings so the finally block can restore them.
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(1);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100);
  String sgName = COMPACTION_TEST_SG + "test8";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    // Best effort: a failure here is only logged and the test carries on.
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 files registered with the sequence flag set to true.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 files registered with the sequence flag set to false.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 0) {
      // An interrupt propagates to the caller (the method declares it) instead of being
      // printed and ignored.
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        // fail() throws, which also leaves the loop.
        fail();
      }
    }
    assertEquals(100, tsFileManager.getTsFileList(true).size());
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=true enable_unseq_space_compaction=false
 * compaction_concurrent_thread=50 max_compaction_candidate_file_num=2
 *
 * <p>NOTE(review): the code below sets enable_unseq_space_compaction to {@code false}, which
 * makes this configuration the same as the one used by test11. If {@code true} was intended,
 * the setter call needs changing rather than this comment - confirm.
 *
 * <p>Adds 100 files with the sequence flag {@code true} and 100 with the flag {@code false}, then
 * keeps re-scheduling compaction until {@code getTsFileList(true)} has shrunk to at most 50 and
 * then to at most 25 files. Each wait phase fails the test once it has slept longer than
 * {@code MAX_WAITING_TIME}.
 *
 * @throws InterruptedException if the test thread is interrupted while polling
 */
@Test
public void test9() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test9");
  // Remember the current settings so the finally block can restore them.
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
  String sgName = COMPACTION_TEST_SG + "test9";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    // Best effort: a failure here is only logged and the test carries on.
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 files registered with the sequence flag set to true.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 files registered with the sequence flag set to false.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(true).size() > 50) {
      // An interrupt propagates to the caller (the method declares it) instead of being
      // printed and ignored, matching the second wait phase below.
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        // fail() throws, which also leaves the loop.
        fail();
      }
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(true).size() > 25) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail();
      }
    }
    assertTrue(tsFileManager.getTsFileList(true).size() <= 25);
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=false enable_unseq_space_compaction=true
 * enable_cross_space_compaction=false compaction_concurrent_thread=50
 * max_compaction_candidate_file_num=2
 *
 * <p>Adds 100 files with the sequence flag {@code true} and 100 with the flag {@code false}, then
 * keeps re-scheduling compaction until {@code getTsFileList(false)} has shrunk to at most 50 and
 * then to at most 25 files. Between the two phases it asserts that {@code getTsFileList(true)}
 * still holds 100 files. Each wait phase fails the test once it has slept longer than
 * {@code MAX_WAITING_TIME}.
 *
 * @throws InterruptedException if the test thread is interrupted while polling
 */
@Test
public void test10() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test10");
  // Remember the current settings so the finally block can restore them.
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  boolean prevEnableCrossCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
  // Unlike the neighbouring tests, this one also switches cross-space compaction off.
  IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
  String sgName = COMPACTION_TEST_SG + "test10";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    // Best effort: a failure here is only logged and the test carries on.
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 files registered with the sequence flag set to true.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 files registered with the sequence flag set to false.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 50) {
      // An interrupt propagates to the caller (the method declares it) instead of being
      // printed and ignored.
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        // fail() throws, which also leaves the loop.
        fail();
      }
    }
    assertEquals(100, tsFileManager.getTsFileList(true).size());

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 25) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail();
      }
    }
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableCrossSpaceCompaction(prevEnableCrossCompaction);
  }
}
/**
 * enable_seq_space_compaction=true enable_unseq_space_compaction=false
 * compaction_concurrent_thread=50 max_compaction_candidate_file_num=2
 *
 * <p>Adds 100 files with the sequence flag {@code true} and 100 with the flag {@code false}, then
 * keeps re-scheduling compaction until {@code getTsFileList(true)} has shrunk to at most 50 and
 * then to at most 25 files. Each wait phase fails the test once it has slept longer than
 * {@code MAX_WAITING_TIME}.
 *
 * @throws InterruptedException if the test thread is interrupted while polling
 */
@Test
public void test11() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test11");
  // Remember the current settings so the finally block can restore them.
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
  String sgName = COMPACTION_TEST_SG + "test11";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    // Best effort: a failure here is only logged and the test carries on.
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 files registered with the sequence flag set to true.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 files registered with the sequence flag set to false.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(true).size() > 50) {
      // An interrupt propagates to the caller (the method declares it) instead of being
      // printed and ignored.
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        // fail() throws, which also leaves the loop.
        fail();
      }
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(true).size() > 25) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail();
      }
    }
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=false enable_unseq_space_compaction=false
 * compaction_concurrent_thread=50 max_compaction_candidate_file_num=2
 *
 * <p>Adds 100 files with the sequence flag {@code true} and 100 with the flag {@code false}, then
 * keeps re-scheduling compaction until {@code getTsFileList(false)} has shrunk to at most 98 and
 * then to at most 96 files. After each phase it asserts that {@code getTsFileList(true)} still
 * holds 100 files. Each wait phase fails the test once it has slept longer than
 * {@code MAX_WAITING_TIME}.
 *
 * @throws InterruptedException if the test thread is interrupted while polling
 */
@Test
public void test12() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test12");
  // Remember the current settings so the finally block can restore them.
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
  String sgName = COMPACTION_TEST_SG + "test12";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    // Best effort: a failure here is only logged and the test carries on.
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 files registered with the sequence flag set to true.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 files registered with the sequence flag set to false.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 98) {
      // An interrupt propagates to the caller (the method declares it) instead of being
      // printed and ignored.
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        // fail() throws, which also leaves the loop.
        fail();
      }
    }
    assertEquals(100, tsFileManager.getTsFileList(true).size());

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 96) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail();
      }
    }
    assertEquals(100, tsFileManager.getTsFileList(true).size());
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=false enable_unseq_space_compaction=true
 * compaction_concurrent_thread=1 max_compaction_candidate_file_num=2
 *
 * <p>Adds 100 files with the sequence flag {@code true} and 100 with the flag {@code false}, then
 * keeps re-scheduling compaction until {@code getTsFileList(false)} has shrunk to at most 99 and
 * then to at most 98 files. Before, during and after both phases it asserts that
 * {@code getTsFileList(true)} still holds 100 files. Each wait phase fails the test once it has
 * slept longer than {@code MAX_WAITING_TIME}.
 *
 * @throws InterruptedException if the test thread is interrupted while polling
 */
@Test
public void test14() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test14");
  // Remember the current settings so the finally block can restore them.
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(1);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
  // NOTE(review): the suffix is "test13" although this method is test14; left unchanged so the
  // storage group name stays what it was - confirm whether it should be renamed.
  String sgName = COMPACTION_TEST_SG + "test13";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    // Best effort: a failure here is only logged and the test carries on.
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 files registered with the sequence flag set to true.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 files registered with the sequence flag set to false.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }

    assertEquals(100, tsFileManager.getTsFileList(true).size());
    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    assertEquals(100, tsFileManager.getTsFileList(true).size());
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 99) {
      // An interrupt propagates to the caller (the method declares it) instead of being
      // printed and ignored.
      Thread.sleep(100);
      totalWaitingTime += 100;
      assertEquals(100, tsFileManager.getTsFileList(true).size());
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        // fail() throws, which also leaves the loop.
        fail();
      }
    }
    assertEquals(100, tsFileManager.getTsFileList(true).size());

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(false).size() > 98) {
      Thread.sleep(100);
      assertEquals(100, tsFileManager.getTsFileList(true).size());
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail();
      }
    }
    assertEquals(100, tsFileManager.getTsFileList(true).size());
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=true enable_unseq_space_compaction=false
 * compaction_concurrent_thread=1 max_compaction_candidate_file_num=2
 *
 * <p>Adds 100 files with the sequence flag {@code true} and 100 with the flag {@code false}, then
 * keeps re-scheduling compaction until {@code getTsFileList(true)} has shrunk to at most 99 and
 * then to at most 98 files, and finally until {@code getTsFileList(false)} is empty. A wait
 * phase fails the test once the accumulated sleep time exceeds {@code MAX_WAITING_TIME}.
 *
 * @throws InterruptedException if the test thread is interrupted while polling
 */
@Test
public void test15() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test15");
  // Remember the current settings so the finally block can restore them.
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(1);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
  // NOTE(review): the suffix is "test14" although this method is test15; left unchanged so the
  // storage group name stays what it was - confirm whether it should be renamed.
  String sgName = COMPACTION_TEST_SG + "test14";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    // Best effort: a failure here is only logged and the test carries on.
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // 100 files registered with the sequence flag set to true.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
      tsFileManager.add(tsFileResource, true);
    }
    // 100 files registered with the sequence flag set to false.
    for (int i = 0; i < 100; i++) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(100L);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource tsFileResource =
          CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
      CompactionFileGeneratorUtils.writeTsFile(
          fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
      tsFileManager.add(tsFileResource, false);
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    long totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(true).size() > 99) {
      // An interrupt propagates to the caller (the method declares it) instead of being
      // printed and ignored.
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        // fail() throws, which also leaves the loop.
        fail();
      }
    }

    CompactionScheduler.scheduleCompaction(tsFileManager, 0);
    totalWaitingTime = 0;
    while (tsFileManager.getTsFileList(true).size() > 98) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail();
      }
    }

    // NOTE(review): totalWaitingTime is not reset before this phase, so it shares the time
    // budget of the previous phase - confirm this is intended.
    while (tsFileManager.getTsFileList(false).size() > 0) {
      Thread.sleep(100);
      totalWaitingTime += 100;
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      if (totalWaitingTime > MAX_WAITING_TIME) {
        fail();
      }
    }
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * enable_seq_space_compaction=false enable_unseq_space_compaction=false
 * compaction_concurrent_thread=1 max_compaction_candidate_file_num=2
 *
 * <p>Writes 100 sequence and 100 unsequence files, then repeatedly schedules compaction and waits
 * for the unsequence file count to fall to at most 98 and afterwards to at most 96. After each
 * round the sequence list is expected to still hold 100 files. The configuration values changed
 * by this test are restored in the {@code finally} block.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for compaction
 */
@Test
public void test16() throws IOException, MetadataException, InterruptedException {
  logger.warn("Running test16");
  // Remember every configuration value before overriding it so it can be restored afterwards.
  boolean prevEnableSeqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
  boolean prevEnableUnseqSpaceCompaction =
      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
  IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
  int prevCompactionConcurrentThread =
      IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
  IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(1);
  int prevMaxCompactionCandidateFileNum =
      IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
  IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
  String sgName = COMPACTION_TEST_SG + "test16";
  try {
    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
  } catch (Exception e) {
    logger.error("exception occurs", e);
  }
  try {
    CompactionTaskManager.getInstance().restart();
    TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
    Set<String> fullPath = new HashSet<>();
    for (String device : fullPaths) {
      fullPath.add(sgName + device);
      PartialPath path = new PartialPath(sgName + device);
      IoTDB.schemaProcessor.createTimeseries(
          path,
          TSDataType.INT64,
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          Collections.emptyMap());
    }
    // All 100 sequence files are generated first, then all 100 unsequence files. The two kinds
    // differ only in the sequence flag and in the offset added to 100 * i (100 vs. 50).
    for (boolean sequence : new boolean[] {true, false}) {
      int startOffset = sequence ? 100 : 50;
      for (int i = 0; i < 100; i++) {
        List<List<Long>> chunkPagePointsNum = new ArrayList<>();
        List<Long> pagePointsNum = new ArrayList<>();
        pagePointsNum.add(100L);
        chunkPagePointsNum.add(pagePointsNum);
        TsFileResource tsFileResource =
            CompactionFileGeneratorUtils.generateTsFileResource(sequence, i + 1, sgName);
        CompactionFileGeneratorUtils.writeTsFile(
            fullPath, chunkPagePointsNum, 100 * i + startOffset, tsFileResource);
        tsFileManager.add(tsFileResource, sequence);
      }
    }
    // Two rounds: wait until at most 98, then at most 96, unsequence files remain.
    for (int expectedUnseqCount : new int[] {98, 96}) {
      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
      long totalWaitingTime = 0;
      while (tsFileManager.getTsFileList(false).size() > expectedUnseqCount) {
        // An interrupt is propagated to the caller instead of being printed and ignored.
        Thread.sleep(100);
        totalWaitingTime += 100;
        CompactionScheduler.scheduleCompaction(tsFileManager, 0);
        if (totalWaitingTime > MAX_WAITING_TIME) {
          fail(
              "unsequence file count did not drop to "
                  + expectedUnseqCount
                  + " after waiting "
                  + totalWaitingTime
                  + " ms");
        }
      }
      // The sequence list must be unchanged after each round.
      assertEquals(100, tsFileManager.getTsFileList(true).size());
    }
    tsFileManager.setAllowCompaction(false);
    stopCompactionTaskManager();
  } finally {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setConcurrentCompactionThread(prevCompactionConcurrentThread);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
  }
}
/**
 * Clears the candidate queue of the {@code CompactionTaskManager} singleton and then polls every
 * 10 ms until its running compaction task list is empty.
 *
 * <p>If the calling thread is interrupted while sleeping, the interrupt flag is restored and the
 * wait is abandoned, so the interruption is not silently lost.
 */
public void stopCompactionTaskManager() {
  CompactionTaskManager.getInstance().clearCandidateQueue();
  while (CompactionTaskManager.getInstance().getRunningCompactionTaskList().size() > 0) {
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
      // Thread.sleep cleared the flag when it threw; set it again for the caller and stop waiting.
      Thread.currentThread().interrupt();
      return;
    }
  }
}
}