blob: 0a59e5e80e43c8adc28208a3cd58cfa67b5f2712 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.read.reader.series;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
import org.apache.iotdb.tsfile.write.page.TimePageWriter;
import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFileGeneratorUtils.createChunkWriter;
import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFileGeneratorUtils.writeAlignedChunk;
import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFileGeneratorUtils.writeAlignedPoint;
import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFileGeneratorUtils.writeNullPoint;
/**
 * Shared fixture for aligned series scan tests.
 *
 * <p>{@link #setUp()} writes four TsFiles that are registered in {@link #seqResources} and one
 * TsFile that is registered in {@link #unSeqResources}. All of them contain a single device,
 * {@link #TEST_DEVICE}, with three INT32 measurements (s1, s2, s3) that are PLAIN encoded and
 * uncompressed. {@link #tearDown()} removes the files and clears the caches again.
 */
public abstract class AbstractAlignedSeriesScanTest {

  protected static final String TEST_DATABASE = "root.sg_pd";
  protected static final IDeviceID TEST_DEVICE = new PlainDeviceID(TEST_DATABASE + ".d1_aligned");

  /**
   * The data distribution is as follows.
   *
   * <p>Labels read {@code f<file>-c<chunk group>-p<part>}; the {@code c} index matches the {@code
   * startChunkGroup}/{@code endChunkGroup} pairs issued in {@link #setUp()}. "align" parts carry
   * the points produced by {@code writeAlignedChunk}; "non-align" parts are written by {@link
   * #writeNonAlignedChunk}, where each measurement has values for only a sub-range of the
   * timestamps and nulls elsewhere. File 5 is the only unsequence file and overlaps file 4 on
   * [70, 79].
   *
   * <pre>
   * time    root.sg_pd.d1_aligned(s1, s2, s3)
   *      ┌──────────┐
   *  0   │ f1-c1-p1 │ (s1, s2, s3 align on time column)
   *      ╞══════════╡
   * 10   │ f2-c1-p1 │ (s1, s2, s3 non-align on time column)
   *      ╞══════════╡
   * 20   │ f3-c1-p1 │ (s1, s2, s3 align on time column)
   *      │==========│
   * 30   │ f3-c2-p1 │ (s1, s2, s3 non-align on time column)
   *      ╞══════════╡
   * 40   │ f4-c1-p1 │ (s1, s2, s3 align on time column)
   *      │----------│
   * 50   │ f4-c1-p2 │ (s1, s2, s3 non-align on time column)
   *      │----------│
   * 60   │ f4-c1-p3 │ (s1, s2, s3 align on time column)
   *      │==========│──────────┐
   * 70   │ f4-c2-p1 │ f5-c1-p1 │
   *      └──────────│----------│
   * 80              │ f5-c1-p2 │
   *                 └──────────┘
   * </pre>
   */
  protected static final List<TsFileResource> seqResources = new ArrayList<>();

  protected static final List<TsFileResource> unSeqResources = new ArrayList<>();

  /**
   * Writes the five TsFiles described on {@link #seqResources} and registers their resources.
   *
   * <p>Each resource gets its device start/end time set explicitly and is marked {@link
   * TsFileResourceStatus#NORMAL} once its writer has been closed.
   *
   * @throws IOException if writing one of the TsFiles fails
   * @throws WriteProcessException declared for the underlying write helpers
   * @throws IllegalPathException if one of the aligned paths cannot be constructed
   */
  @BeforeClass
  public static void setUp() throws IOException, WriteProcessException, IllegalPathException {
    List<PartialPath> writtenPaths =
        Arrays.asList(
            new AlignedPath(((PlainDeviceID) TEST_DEVICE).toStringID(), "s1"),
            new AlignedPath(((PlainDeviceID) TEST_DEVICE).toStringID(), "s2"),
            new AlignedPath(((PlainDeviceID) TEST_DEVICE).toStringID(), "s3"));
    List<TSDataType> dataTypes =
        Arrays.asList(TSDataType.INT32, TSDataType.INT32, TSDataType.INT32);
    List<TSEncoding> encodings =
        Arrays.asList(TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN);
    List<CompressionType> compressionTypes =
        Arrays.asList(
            CompressionType.UNCOMPRESSED,
            CompressionType.UNCOMPRESSED,
            CompressionType.UNCOMPRESSED);

    // prepare file 1
    File seqFile1 = new File(TestConstant.getTestTsFilePath(TEST_DATABASE, 0, 0, 1));
    TsFileResource seqFileResource1 = new TsFileResource(seqFile1);
    // Only the first file creates the target directory; the later files are written without
    // repeating this check.
    if (!seqFile1.getParentFile().exists()) {
      Assert.assertTrue(seqFile1.getParentFile().mkdirs());
    }
    try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(seqFileResource1.getTsFile())) {
      // prepare f1-c1
      tsFileIOWriter.startChunkGroup(TEST_DEVICE);
      List<TimeRange> pages = new ArrayList<>();
      // prepare f1-c1-p1
      pages.add(new TimeRange(0L, 9L));
      for (IChunkWriter chunkWriter :
          createChunkWriter(writtenPaths, dataTypes, encodings, compressionTypes, true)) {
        writeAlignedChunk((AlignedChunkWriterImpl) chunkWriter, tsFileIOWriter, pages, true);
      }
      tsFileIOWriter.endChunkGroup();

      seqFileResource1.updateStartTime(TEST_DEVICE, 0);
      seqFileResource1.updateEndTime(TEST_DEVICE, 9);
      tsFileIOWriter.endFile();
    }
    seqFileResource1.setStatusForTest(TsFileResourceStatus.NORMAL);
    seqResources.add(seqFileResource1);

    // prepare file 2
    File seqFile2 = new File(TestConstant.getTestTsFilePath(TEST_DATABASE, 0, 0, 2));
    TsFileResource seqFileResource2 = new TsFileResource(seqFile2);
    try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(seqFileResource2.getTsFile())) {
      // prepare f2-c1
      tsFileIOWriter.startChunkGroup(TEST_DEVICE);
      // prepare f2-c1-p1
      writeNonAlignedChunk(
          (AlignedChunkWriterImpl)
              createChunkWriter(writtenPaths, dataTypes, encodings, compressionTypes, true).get(0),
          tsFileIOWriter,
          10L);
      tsFileIOWriter.endChunkGroup();

      seqFileResource2.updateStartTime(TEST_DEVICE, 10);
      seqFileResource2.updateEndTime(TEST_DEVICE, 19);
      tsFileIOWriter.endFile();
    }
    seqFileResource2.setStatusForTest(TsFileResourceStatus.NORMAL);
    seqResources.add(seqFileResource2);

    // prepare file 3
    File seqFile3 = new File(TestConstant.getTestTsFilePath(TEST_DATABASE, 0, 0, 3));
    TsFileResource seqFileResource3 = new TsFileResource(seqFile3);
    try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(seqFileResource3.getTsFile())) {
      // prepare f3-c1
      tsFileIOWriter.startChunkGroup(TEST_DEVICE);
      // prepare f3-c1-p1
      List<TimeRange> pages = new ArrayList<>();
      pages.add(new TimeRange(20L, 29L));
      for (IChunkWriter chunkWriter :
          createChunkWriter(writtenPaths, dataTypes, encodings, compressionTypes, true)) {
        writeAlignedChunk((AlignedChunkWriterImpl) chunkWriter, tsFileIOWriter, pages, true);
      }
      tsFileIOWriter.endChunkGroup();

      // prepare f3-c2
      tsFileIOWriter.startChunkGroup(TEST_DEVICE);
      // prepare f3-c2-p1
      writeNonAlignedChunk(
          (AlignedChunkWriterImpl)
              createChunkWriter(writtenPaths, dataTypes, encodings, compressionTypes, true).get(0),
          tsFileIOWriter,
          30L);
      tsFileIOWriter.endChunkGroup();

      seqFileResource3.updateStartTime(TEST_DEVICE, 20);
      seqFileResource3.updateEndTime(TEST_DEVICE, 39);
      tsFileIOWriter.endFile();
    }
    seqFileResource3.setStatusForTest(TsFileResourceStatus.NORMAL);
    seqResources.add(seqFileResource3);

    // prepare file 4
    File seqFile4 = new File(TestConstant.getTestTsFilePath(TEST_DATABASE, 0, 0, 4));
    TsFileResource seqFileResource4 = new TsFileResource(seqFile4);
    try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(seqFileResource4.getTsFile())) {
      // prepare f4-c1
      tsFileIOWriter.startChunkGroup(TEST_DEVICE);
      // prepare f4-c1-p1
      List<TimeRange> pages = new ArrayList<>();
      pages.add(new TimeRange(40L, 49L));
      for (IChunkWriter chunkWriter :
          createChunkWriter(writtenPaths, dataTypes, encodings, compressionTypes, true)) {
        writeAlignedChunk((AlignedChunkWriterImpl) chunkWriter, tsFileIOWriter, pages, true);
      }

      // prepare f4-c1-p2
      writeNonAlignedChunk(
          (AlignedChunkWriterImpl)
              createChunkWriter(writtenPaths, dataTypes, encodings, compressionTypes, true).get(0),
          tsFileIOWriter,
          50L);

      // prepare f4-c1-p3
      pages.clear();
      pages.add(new TimeRange(60L, 69L));
      for (IChunkWriter chunkWriter :
          createChunkWriter(writtenPaths, dataTypes, encodings, compressionTypes, true)) {
        writeAlignedChunk((AlignedChunkWriterImpl) chunkWriter, tsFileIOWriter, pages, true);
      }
      tsFileIOWriter.endChunkGroup();

      // prepare f4-c2
      tsFileIOWriter.startChunkGroup(TEST_DEVICE);
      pages.clear();
      // prepare f4-c2-p1
      pages.add(new TimeRange(70L, 79L));
      for (IChunkWriter chunkWriter :
          createChunkWriter(writtenPaths, dataTypes, encodings, compressionTypes, true)) {
        writeAlignedChunk((AlignedChunkWriterImpl) chunkWriter, tsFileIOWriter, pages, true);
      }
      tsFileIOWriter.endChunkGroup();

      seqFileResource4.updateStartTime(TEST_DEVICE, 40);
      seqFileResource4.updateEndTime(TEST_DEVICE, 79);
      tsFileIOWriter.endFile();
    }
    seqFileResource4.setStatusForTest(TsFileResourceStatus.NORMAL);
    seqResources.add(seqFileResource4);

    // prepare file 5 (the only unsequence file)
    File unseqFile5 = new File(TestConstant.getTestTsFilePath(TEST_DATABASE, 0, 0, 5));
    TsFileResource unseqFileResource5 = new TsFileResource(unseqFile5);
    try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(unseqFileResource5.getTsFile())) {
      // prepare f5-c1
      tsFileIOWriter.startChunkGroup(TEST_DEVICE);
      List<TimeRange> pages = new ArrayList<>();
      // prepare f5-c1-p1
      pages.add(new TimeRange(70L, 79L));
      // prepare f5-c1-p2
      pages.add(new TimeRange(80L, 89L));
      for (IChunkWriter chunkWriter :
          createChunkWriter(writtenPaths, dataTypes, encodings, compressionTypes, true)) {
        writeAlignedChunk((AlignedChunkWriterImpl) chunkWriter, tsFileIOWriter, pages, true);
      }
      tsFileIOWriter.endChunkGroup();

      unseqFileResource5.updateStartTime(TEST_DEVICE, 70);
      unseqFileResource5.updateEndTime(TEST_DEVICE, 89);
      tsFileIOWriter.endFile();
    }
    unseqFileResource5.setStatusForTest(TsFileResourceStatus.NORMAL);
    unSeqResources.add(unseqFileResource5);
  }

  /**
   * Writes one "non-aligned" chunk covering the ten timestamps {@code [startTime, startTime + 9]}
   * and flushes it to {@code tsFileIOWriter}.
   *
   * <p>The time page holds every timestamp of the range. The three value pages hold values only
   * for the following offsets relative to {@code startTime} and nulls for the rest:
   *
   * <ul>
   *   <li>first measurement: offsets 0..5
   *   <li>second measurement: offsets 4..9
   *   <li>third measurement: offsets 2..7
   * </ul>
   *
   * @param alignedChunkWriter fresh chunk writer with (at least) three value chunk writers
   * @param tsFileIOWriter file writer the sealed chunk is written to
   * @param startTime first timestamp of the chunk
   */
  private static void writeNonAlignedChunk(
      AlignedChunkWriterImpl alignedChunkWriter, TsFileIOWriter tsFileIOWriter, long startTime)
      throws IOException, WriteProcessException, IllegalPathException {
    long endTime = startTime + 9L;

    // write time page
    TimePageWriter timePageWriter = alignedChunkWriter.getTimeChunkWriter().getPageWriter();
    for (long timestamp = startTime; timestamp <= endTime; timestamp++) {
      timePageWriter.write(timestamp);
    }
    // seal time page
    alignedChunkWriter.getTimeChunkWriter().sealCurrentPage();

    // write and seal the three value pages, in measurement order
    List<ValueChunkWriter> valueChunkWriterList = alignedChunkWriter.getValueChunkWriterList();
    writeValuePage(valueChunkWriterList.get(0), startTime, endTime, startTime, startTime + 5L);
    writeValuePage(valueChunkWriterList.get(1), startTime, endTime, startTime + 4L, endTime);
    writeValuePage(valueChunkWriterList.get(2), startTime, endTime, startTime + 2L, startTime + 7L);

    // seal time chunk and value chunks
    alignedChunkWriter.writeToFileWriter(tsFileIOWriter);
  }

  /**
   * Fills the current page of {@code valueChunkWriter} for every timestamp in {@code [pageStart,
   * pageEnd]} in ascending order and then seals it.
   *
   * <p>Timestamps inside {@code [firstValueTime, lastValueTime]} are written with {@code
   * writeAlignedPoint}; all other timestamps are written with {@code writeNullPoint}.
   *
   * @param valueChunkWriter value chunk writer whose current page is written and sealed
   * @param pageStart first timestamp of the page (inclusive)
   * @param pageEnd last timestamp of the page (inclusive)
   * @param firstValueTime first timestamp that gets a non-null point (inclusive)
   * @param lastValueTime last timestamp that gets a non-null point (inclusive)
   */
  private static void writeValuePage(
      ValueChunkWriter valueChunkWriter,
      long pageStart,
      long pageEnd,
      long firstValueTime,
      long lastValueTime)
      throws IOException, WriteProcessException, IllegalPathException {
    ValuePageWriter valuePageWriter = valueChunkWriter.getPageWriter();
    for (long timestamp = pageStart; timestamp <= pageEnd; timestamp++) {
      if (timestamp >= firstValueTime && timestamp <= lastValueTime) {
        writeAlignedPoint(valuePageWriter, timestamp, true);
      } else {
        writeNullPoint(valuePageWriter, timestamp);
      }
    }
    // seal sub value page
    valueChunkWriter.sealCurrentPage();
  }

  /**
   * Releases everything {@link #setUp()} created: closes cached readers, removes the registered
   * resources, deletes leftover {@code .tsfile}/{@code .resource} files under {@code target},
   * clears the resource lists and the chunk/metadata/bloom-filter caches, and finally cleans the
   * test directories.
   *
   * @throws IOException if closing the readers or cleaning the directories fails
   */
  @AfterClass
  public static void tearDown() throws IOException {
    // Readers must be closed before the files they point to are removed.
    FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();

    removeExistingResources(seqResources);
    removeExistingResources(unSeqResources);

    deleteBestEffort(FSFactoryProducer.getFSFactory().listFilesBySuffix("target", ".tsfile"));
    deleteBestEffort(FSFactoryProducer.getFSFactory().listFilesBySuffix("target", ".resource"));

    seqResources.clear();
    unSeqResources.clear();

    ChunkCache.getInstance().clear();
    TimeSeriesMetadataCache.getInstance().clear();
    BloomFilterCache.getInstance().clear();
    EnvironmentUtils.cleanAllDir();
  }

  /** Removes every resource in {@code resources} whose TsFile still exists on disk. */
  private static void removeExistingResources(List<TsFileResource> resources) throws IOException {
    for (TsFileResource tsFileResource : resources) {
      if (tsFileResource.getTsFile().exists()) {
        tsFileResource.remove();
      }
    }
  }

  /**
   * Deletes the given files without failing the tear-down when a deletion does not succeed.
   *
   * @param files files to delete; a {@code null} array is treated as empty (defensive guard, in
   *     case the listing yields no array)
   */
  private static void deleteBestEffort(File[] files) {
    if (files == null) {
      return;
    }
    for (File file : files) {
      // Result intentionally ignored: cleanup is best-effort, as in the original tear-down.
      file.delete();
    }
  }
}