| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.engine.compaction; |
| |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.constant.TestConstant; |
| import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer; |
| import org.apache.iotdb.db.engine.storagegroup.TsFileResource; |
| import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; |
| import org.apache.iotdb.db.exception.StorageEngineException; |
| import org.apache.iotdb.db.query.control.FileReaderManager; |
| import org.apache.iotdb.db.service.IoTDB; |
| import org.apache.iotdb.db.utils.EnvironmentUtils; |
| import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; |
| import org.apache.iotdb.tsfile.exception.write.WriteProcessException; |
| import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; |
| import org.apache.iotdb.tsfile.utils.FilePathUtils; |
| import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.junit.Assert; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; |
| |
| public class AbstractCompactionTest { |
| protected int seqFileNum = 5; |
| protected int unseqFileNum = 0; |
| protected List<TsFileResource> seqResources = new ArrayList<>(); |
| protected List<TsFileResource> unseqResources = new ArrayList<>(); |
| private int chunkGroupSize = 0; |
| private int pageSize = 0; |
| protected String COMPACTION_TEST_SG = TsFileGeneratorUtils.testStorageGroup; |
| private TSDataType dataType; |
| |
| private static final long oldTargetChunkSize = |
| IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); |
| private static final int oldChunkGroupSize = |
| TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte(); |
| private static final int oldPagePointSize = |
| TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage(); |
| |
| private static final int oldMaxCrossCompactionFileNum = |
| IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileNum(); |
| |
| protected static File STORAGE_GROUP_DIR = |
| new File( |
| TestConstant.BASE_OUTPUT_PATH |
| + "data" |
| + File.separator |
| + "sequence" |
| + File.separator |
| + "root.compactionTest"); |
| protected static File SEQ_DIRS = |
| new File( |
| TestConstant.BASE_OUTPUT_PATH |
| + "data" |
| + File.separator |
| + "sequence" |
| + File.separator |
| + "root.compactionTest" |
| + File.separator |
| + "0" |
| + File.separator |
| + "0"); |
| protected static File UNSEQ_DIRS = |
| new File( |
| TestConstant.BASE_OUTPUT_PATH |
| + "data" |
| + File.separator |
| + "unsequence" |
| + File.separator |
| + "root.compactionTest" |
| + File.separator |
| + "0" |
| + File.separator |
| + "0"); |
| |
| private int fileVersion = 0; |
| |
| public void setUp() throws IOException, WriteProcessException, MetadataException { |
| if (!SEQ_DIRS.exists()) { |
| Assert.assertTrue(SEQ_DIRS.mkdirs()); |
| } |
| if (!UNSEQ_DIRS.exists()) { |
| Assert.assertTrue(UNSEQ_DIRS.mkdirs()); |
| } |
| dataType = TSDataType.INT64; |
| EnvironmentUtils.envSetUp(); |
| IoTDB.configManager.init(); |
| } |
| |
| /** |
| * @param fileNum the number of file |
| * @param deviceNum device number in each file |
| * @param measurementNum measurement number in each device of each file |
| * @param pointNum data point number of each timeseries in each file |
| * @param startTime start time of each timeseries |
| * @param startValue start value of each timeseries |
| * @param timeInterval time interval of each timeseries between files |
| * @param valueInterval value interval of each timeseries between files |
| * @param isAlign when it is true, it will create mix tsfile which contains aligned and nonAligned |
| * timeseries |
| * @param isSeq |
| * @throws IOException |
| * @throws WriteProcessException |
| * @throws MetadataException |
| */ |
| protected void createFiles( |
| int fileNum, |
| int deviceNum, |
| int measurementNum, |
| int pointNum, |
| int startTime, |
| int startValue, |
| int timeInterval, |
| int valueInterval, |
| boolean isAlign, |
| boolean isSeq) |
| throws IOException, WriteProcessException, MetadataException { |
| for (int i = 0; i < fileNum; i++) { |
| String fileName = |
| System.currentTimeMillis() |
| + FilePathUtils.FILE_NAME_SEPARATOR |
| + fileVersion++ |
| + "-0-0.tsfile"; |
| String filePath; |
| if (isSeq) { |
| filePath = SEQ_DIRS.getPath() + File.separator + fileName; |
| } else { |
| filePath = UNSEQ_DIRS.getPath() + File.separator + fileName; |
| } |
| File file; |
| if (isAlign) { |
| file = |
| TsFileGeneratorUtils.generateAlignedTsFile( |
| filePath, |
| deviceNum, |
| measurementNum, |
| pointNum, |
| startTime + pointNum * i + timeInterval * i, |
| startValue + pointNum * i + valueInterval * i, |
| chunkGroupSize, |
| pageSize); |
| } else { |
| file = |
| TsFileGeneratorUtils.generateNonAlignedTsFile( |
| filePath, |
| deviceNum, |
| measurementNum, |
| pointNum, |
| startTime + pointNum * i + timeInterval * i, |
| startValue + pointNum * i + valueInterval * i, |
| chunkGroupSize, |
| pageSize); |
| } |
| addResource( |
| file, |
| deviceNum, |
| startTime + pointNum * i + timeInterval * i, |
| startTime + pointNum * i + timeInterval * i + pointNum - 1, |
| isAlign, |
| isSeq); |
| } |
| // sleep a few milliseconds to avoid generating files with same timestamps |
| try { |
| Thread.sleep(10); |
| } catch (Exception e) { |
| |
| } |
| } |
| |
| /** |
| * @param fileNum the number of file |
| * @param deviceIndexes device index in each file |
| * @param measurementIndexes measurement index in each device of each file |
| * @param pointNum data point number of each timeseries in each file |
| * @param startTime start time of each timeseries |
| * @param timeInterval time interval of each timeseries between files |
| * @param isAlign when it is true, it will create mix tsfile which contains aligned and nonAligned |
| * timeseries |
| * @param isSeq |
| */ |
| protected void createFilesWithTextValue( |
| int fileNum, |
| List<Integer> deviceIndexes, |
| List<Integer> measurementIndexes, |
| int pointNum, |
| int startTime, |
| int timeInterval, |
| boolean isAlign, |
| boolean isSeq) |
| throws IOException, WriteProcessException { |
| |
| for (int i = 0; i < fileNum; i++) { |
| String fileName = |
| System.currentTimeMillis() |
| + FilePathUtils.FILE_NAME_SEPARATOR |
| + fileVersion++ |
| + "-0-0.tsfile"; |
| String filePath; |
| if (isSeq) { |
| filePath = SEQ_DIRS.getPath() + File.separator + fileName; |
| } else { |
| filePath = UNSEQ_DIRS.getPath() + File.separator + fileName; |
| } |
| File file; |
| if (isAlign) { |
| file = |
| TsFileGeneratorUtils.generateAlignedTsFileWithTextValues( |
| filePath, |
| deviceIndexes, |
| measurementIndexes, |
| pointNum, |
| startTime + pointNum * i + timeInterval * i, |
| chunkGroupSize, |
| pageSize); |
| } else { |
| file = |
| TsFileGeneratorUtils.generateNonAlignedTsFileWithTextValues( |
| filePath, |
| deviceIndexes, |
| measurementIndexes, |
| pointNum, |
| startTime + pointNum * i + timeInterval * i, |
| chunkGroupSize, |
| pageSize); |
| } |
| // add resource |
| TsFileResource resource = new TsFileResource(file); |
| int deviceStartindex = isAlign ? TsFileGeneratorUtils.getAlignDeviceOffset() : 0; |
| for (int j = 0; j < deviceIndexes.size(); j++) { |
| resource.updateStartTime( |
| COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + (deviceIndexes.get(j) + deviceStartindex), |
| startTime + pointNum * i + timeInterval * i); |
| resource.updateEndTime( |
| COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + (deviceIndexes.get(j) + deviceStartindex), |
| startTime + pointNum * i + timeInterval * i + pointNum - 1); |
| } |
| resource.updatePlanIndexes(fileVersion); |
| resource.setStatus(TsFileResourceStatus.CLOSED); |
| resource.serialize(); |
| if (isSeq) { |
| seqResources.add(resource); |
| } else { |
| unseqResources.add(resource); |
| } |
| } |
| // sleep a few milliseconds to avoid generating files with same timestamps |
| try { |
| Thread.sleep(10); |
| } catch (Exception e) { |
| |
| } |
| } |
| |
| private void addResource( |
| File file, int deviceNum, long startTime, long endTime, boolean isAlign, boolean isSeq) |
| throws IOException { |
| TsFileResource resource = new TsFileResource(file); |
| int deviceStartindex = isAlign ? TsFileGeneratorUtils.getAlignDeviceOffset() : 0; |
| |
| for (int i = deviceStartindex; i < deviceStartindex + deviceNum; i++) { |
| resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, startTime); |
| resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, endTime); |
| } |
| |
| resource.updatePlanIndexes(fileVersion); |
| resource.setStatus(TsFileResourceStatus.CLOSED); |
| // resource.setTimeIndexType((byte) 0); |
| resource.serialize(); |
| if (isSeq) { |
| seqResources.add(resource); |
| } else { |
| unseqResources.add(resource); |
| } |
| } |
| |
| protected void registerTimeseriesInMManger(int deviceNum, int measurementNum, boolean isAligned) |
| throws MetadataException { |
| for (int i = 0; i < deviceNum; i++) { |
| if (isAligned) { |
| List<String> measurements = new ArrayList<>(); |
| List<TSDataType> dataTypes = new ArrayList<>(); |
| List<TSEncoding> encodings = new ArrayList<>(); |
| List<CompressionType> compressionTypes = new ArrayList<>(); |
| for (int j = 0; j < measurementNum; j++) { |
| measurements.add("s" + j); |
| dataTypes.add(dataType); |
| encodings.add(TSEncoding.PLAIN); |
| compressionTypes.add(CompressionType.UNCOMPRESSED); |
| IoTDB.schemaProcessor.createTimeseries( |
| new PartialPath(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, "s" + j), |
| dataType, |
| TSEncoding.PLAIN, |
| CompressionType.UNCOMPRESSED, |
| Collections.emptyMap()); |
| } |
| IoTDB.schemaProcessor.createAlignedTimeSeries( |
| new PartialPath(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + (i + 10000)), |
| measurements, |
| dataTypes, |
| encodings, |
| compressionTypes); |
| } else { |
| for (int j = 0; j < measurementNum; j++) { |
| IoTDB.schemaProcessor.createTimeseries( |
| new PartialPath(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, "s" + j), |
| dataType, |
| TSEncoding.PLAIN, |
| CompressionType.UNCOMPRESSED, |
| Collections.emptyMap()); |
| } |
| } |
| } |
| } |
| |
| protected void deleteTimeseriesInMManager(List<String> timeseries) throws MetadataException { |
| for (String path : timeseries) { |
| IoTDB.schemaProcessor.deleteTimeseries(new PartialPath(path)); |
| } |
| } |
| |
| public void tearDown() throws IOException, StorageEngineException { |
| new CompactionConfigRestorer().restoreCompactionConfig(); |
| removeFiles(); |
| seqResources.clear(); |
| unseqResources.clear(); |
| IoTDB.configManager.clear(); |
| IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(oldTargetChunkSize); |
| IoTDBDescriptor.getInstance() |
| .getConfig() |
| .setMaxCrossCompactionCandidateFileNum(oldMaxCrossCompactionFileNum); |
| TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(oldChunkGroupSize); |
| TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(oldPagePointSize); |
| EnvironmentUtils.cleanEnv(); |
| if (SEQ_DIRS.exists()) { |
| FileUtils.deleteDirectory(SEQ_DIRS); |
| } |
| if (UNSEQ_DIRS.exists()) { |
| FileUtils.deleteDirectory(UNSEQ_DIRS); |
| } |
| } |
| |
| private void removeFiles() throws IOException { |
| FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders(); |
| for (TsFileResource tsFileResource : seqResources) { |
| if (tsFileResource.getTsFile().exists()) { |
| tsFileResource.remove(); |
| } |
| } |
| for (TsFileResource tsFileResource : unseqResources) { |
| if (tsFileResource.getTsFile().exists()) { |
| tsFileResource.remove(); |
| } |
| } |
| File[] files = FSFactoryProducer.getFSFactory().listFilesBySuffix("target", ".tsfile"); |
| for (File file : files) { |
| file.delete(); |
| } |
| File[] resourceFiles = |
| FSFactoryProducer.getFSFactory().listFilesBySuffix("target", ".resource"); |
| for (File resourceFile : resourceFiles) { |
| resourceFile.delete(); |
| } |
| } |
| |
| protected void setDataType(TSDataType dataType) { |
| this.dataType = dataType; |
| } |
| } |