blob: 5a3892f451ddc6707e3141b91c173262efbc357b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer;
import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.reader.IDataBlockReader;
import org.apache.iotdb.db.engine.compaction.reader.SeriesDataBlockReader;
import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
import static org.junit.Assert.assertEquals;
public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
private final String oldThreadName = Thread.currentThread().getName();
@Before
public void setUp() throws IOException, WriteProcessException, MetadataException {
super.setUp();
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1024);
Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
}
@After
public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
Thread.currentThread().setName(oldThreadName);
for (TsFileResource tsFileResource : seqResources) {
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
}
for (TsFileResource tsFileResource : unseqResources) {
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
}
}
/* Total 5 seq files, each file has the same 6 nonAligned timeseries, each timeseries has the same 100 data point.*/
@Test
public void testSeqInnerSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, false);
createFiles(5, 2, 3, 100, 0, 0, 50, 50, false, true);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d1",
"s1",
new MeasurementSchema("s1", TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
assertEquals(iterator.currentTime(), iterator.currentValue());
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
assertEquals(iterator.currentTime(), iterator.currentValue());
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
}
/*
Total 6 seq files, each file has different nonAligned timeseries.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99 and 150 ~ 249, value range is 0 ~ 99 and 150 ~ 249.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 250 ~ 299 and 350 ~ 399, value range is 250 ~ 299 and 350 ~ 399.
Fifth and Sixth file: d0 ~ d4 and s0 ~ s5, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
Timeseries d[0-4].s5 are deleted before compaction.
*/
@Test
public void testSeqInnerSpaceCompactionWithDifferentTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
registerTimeseriesInMManger(5, 5, false);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
createFiles(2, 5, 6, 50, 600, 800, 50, 50, false, true);
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() >= 600) {
assertEquals(iterator.currentTime() + 200, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(400, count);
} else if (i < 3) {
assertEquals(200, count);
} else {
assertEquals(100, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
assertEquals(
0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
assertEquals(
0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
assertEquals(
250, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
assertEquals(
600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
assertEquals(
600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
for (int i = 0; i < 5; i++) {
assertEquals(
749, targetResources.get(0).getEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i));
}
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() >= 600) {
assertEquals(iterator.currentTime() + 200, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(400, count);
} else if (i < 3 && j < 5) {
assertEquals(200, count);
} else if (i < 5 && j < 5) {
assertEquals(100, count);
}
}
}
}
/* Total 5 unseq files, each file has the same 6 nonAligned timeseries, each timeseries has the same 100 data point.*/
@Test
public void testUnSeqInnerSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, false);
createFiles(5, 2, 3, 100, 0, 0, 50, 50, false, false);
for (int i = 0; i < 2; i++) {
for (int j = 0; j < 3; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s1",
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
assertEquals(iterator.currentTime(), iterator.currentValue());
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = 0; i < 2; i++) {
for (int j = 0; j < 3; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s1",
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
assertEquals(iterator.currentTime(), iterator.currentValue());
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
}
}
}
/*
Total 10 seq files, each file has different nonAligned timeseries.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99 and 150 ~ 249, value range is 0 ~ 99 and 150 ~ 249.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 150 ~ 199 and 250 ~ 299, value range is 150 ~ 199 and 250 ~ 299.
Fifth and Sixth file: d0 ~ d4 and s0 ~ s4, time range is 100 ~ 149 and 250 ~ 299, value range is 100 ~ 149 and 250 ~ 299.
Seventh and Eighth file: d0 ~ d6 and s0 ~ s6, time range is 200 ~ 269 and 370 ~ 439, value range is 300 ~ 369 and 470 ~ 539.
Ninth and Tenth file: d0 ~ d8 and s0 ~ s8, time range is 100 ~ 169 and 270 ~ 339, value range is 300 ~ 369 and 470 ~ 539.
*/
@Test
public void testUnSeqInnerSpaceCompactionWithDifferentTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
registerTimeseriesInMManger(9, 9, false);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, false);
createFiles(2, 3, 5, 50, 150, 150, 50, 50, false, false);
createFiles(2, 5, 5, 50, 100, 100, 100, 100, false, false);
createFiles(2, 7, 7, 70, 200, 300, 100, 100, false, false);
createFiles(2, 9, 9, 70, 100, 300, 100, 100, false, false);
for (int i = 0; i < 9; i++) {
for (int j = 0; j < 9; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if ((100 <= iterator.currentTime() && iterator.currentTime() < 170)
|| (270 <= iterator.currentTime() && iterator.currentTime() < 340)) {
assertEquals(iterator.currentTime() + 200, iterator.currentValue());
} else if ((200 <= iterator.currentTime() && iterator.currentTime() < 270)
|| (370 <= iterator.currentTime() && iterator.currentTime() < 440)) {
assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(410, count);
} else if (i < 3 && j < 5) {
assertEquals(310, count);
} else if (i < 5 && j < 5) {
assertEquals(280, count);
} else if (i < 7 && j < 7) {
assertEquals(280, count);
} else {
assertEquals(140, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = 0; i < 9; i++) {
for (int j = 0; j < 9; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if ((100 <= iterator.currentTime() && iterator.currentTime() < 170)
|| (270 <= iterator.currentTime() && iterator.currentTime() < 340)) {
assertEquals(iterator.currentTime() + 200, iterator.currentValue());
} else if ((200 <= iterator.currentTime() && iterator.currentTime() < 270)
|| (370 <= iterator.currentTime() && iterator.currentTime() < 440)) {
assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(410, count);
} else if (i < 3 && j < 5) {
assertEquals(310, count);
} else if (i < 5 && j < 5) {
assertEquals(280, count);
} else if (i < 7 && j < 7) {
assertEquals(280, count);
} else {
assertEquals(140, count);
}
}
}
}
/*
Total 6 unseq files, each file has different nonAligned timeseries.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 300 ~ 599 , value range is 0 ~ 299 and 300 ~ 599.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 200 ~ 499 and 550 ~ 849, value range is 300 ~ 599 and 650 ~ 949.
Fifth and Sixth file: d0 ~ d4 and s0 ~ s6, time range is 900 ~ 1199 and 1250 ~ 1549, value range is 1100 ~ 1399 and 1450 ~ 1749.
The data of d0.s0, d0.s1, d2.s4 and d3.s5 is deleted in each file.
*/
@Test
public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, false);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false);
createFiles(2, 3, 5, 300, 200, 300, 50, 50, false, false);
createFiles(2, 5, 7, 300, 900, 1100, 50, 50, false, false);
// generate mods file
for (int i = 0; i < unseqResources.size(); i++) {
Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
deleteMap.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + 0 + PATH_SEPARATOR + "s0",
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
deleteMap.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + 0 + PATH_SEPARATOR + "s1",
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
deleteMap.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + 2 + PATH_SEPARATOR + "s4",
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
deleteMap.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + 3 + PATH_SEPARATOR + "s5",
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(i), false);
}
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 7; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 200
|| (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(iterator.currentTime(), iterator.currentValue());
} else if (iterator.currentTime() < 850) {
assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
assertEquals(iterator.currentTime() + 200, iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 5)) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
assertEquals(1450, count);
} else if (i < 3 && j < 5) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 7; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 200
|| (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(iterator.currentTime(), iterator.currentValue());
} else if (iterator.currentTime() < 850) {
assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
assertEquals(iterator.currentTime() + 200, iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 5)) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
assertEquals(1450, count);
} else if (i < 3 && j < 5) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/*
Total 6 unseq files, each file has different nonAligned timeseries.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 300 ~ 599 , value range is 0 ~ 299 and 300 ~ 599.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 200 ~ 499 and 550 ~ 849, value range is 300 ~ 599 and 650 ~ 949.
Fifth and Sixth file: d0 ~ d4 and s0 ~ s6, time range is 900 ~ 1199 and 1250 ~ 1549, value range is 1100 ~ 1399 and 1450 ~ 1749.
The data of device d0 is deleted in each file.
*/
@Test
public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, false);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false);
createFiles(2, 3, 5, 300, 200, 300, 50, 50, false, false);
createFiles(2, 5, 7, 300, 900, 1100, 50, 50, false, false);
// generate mods file
for (int i = 0; i < unseqResources.size(); i++) {
Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
for (int j = 0; j < 7; j++) {
deleteMap.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + 0 + PATH_SEPARATOR + "s" + j,
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
}
CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(i), false);
}
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 7; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 200
|| (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(iterator.currentTime(), iterator.currentValue());
} else if (iterator.currentTime() < 850) {
assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
assertEquals(iterator.currentTime() + 200, iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
assertEquals(1450, count);
} else if (i < 3 && j < 5) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 7; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 200
|| (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(iterator.currentTime(), iterator.currentValue());
} else if (iterator.currentTime() < 850) {
assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
assertEquals(iterator.currentTime() + 200, iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
assertEquals(1450, count);
} else if (i < 3 && j < 5) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/*
Total 6 unseq files, each file has different nonAligned timeseries.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 300 ~ 599 , value range is 0 ~ 299 and 300 ~ 599.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 200 ~ 499 and 550 ~ 849, value range is 300 ~ 599 and 650 ~ 949.
Fifth and Sixth file: d0 ~ d4 and s0 ~ s6, time range is 900 ~ 1199 and 1250 ~ 1549, value range is 1100 ~ 1399 and 1450 ~ 1749.
The data of device d0 ~ d4 is deleted in each file.
*/
@Test
public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTargetFile()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, false);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false);
createFiles(2, 3, 5, 300, 200, 300, 50, 50, false, false);
createFiles(2, 5, 7, 300, 900, 1100, 50, 50, false, false);
// generate mods file
for (int i = 0; i < unseqResources.size(); i++) {
Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
for (int d = 0; d < 5; d++) {
for (int j = 0; j < 7; j++) {
deleteMap.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + d + PATH_SEPARATOR + "s" + j,
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
}
}
CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(i), false);
}
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 7; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(0, count);
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 7; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(0, count);
}
}
}
/* Total 5 seq files, each file has the same 6 aligned timeseries, each timeseries has the same 100 data point.*/
@Test
public void testAlignedSeqInnerSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, true);
createFiles(5, 2, 3, 100, 0, 0, 50, 50, true, true);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2;
i++) {
for (int j = 0; j < 3; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2;
i++) {
for (int j = 0; j < 3; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
}
}
}
/*
Total 6 seq files, each file has different aligned timeseries, which cause empty page.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99 and 150 ~ 249, value range is 0 ~ 99 and 150 ~ 249.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 250 ~ 299 and 350 ~ 399, value range is 250 ~ 299 and 350 ~ 399.
Fifth and Sixth file: d0 ~ d4 and s0 ~ s7, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
Timeseries d[0-4].s7 are deleted before compaction.
*/
@Test
public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyPage()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(50);
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, true, true);
createFiles(2, 3, 5, 50, 250, 250, 50, 50, true, true);
createFiles(2, 5, 8, 50, 600, 800, 50, 50, true, true);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
i++) {
for (int j = 0; j < 7; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() >= 600) {
assertEquals(
iterator.currentTime() + 200,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(400, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
assertEquals(200, count);
} else {
assertEquals(100, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
i++) {
for (int j = 0; j < 7; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() >= 600) {
assertEquals(
iterator.currentTime() + 200,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(400, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
assertEquals(200, count);
} else {
assertEquals(100, count);
}
}
}
}
/*
Total 6 seq files, each file has different aligned timeseries, which cause empty value chunk.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99 and 150 ~ 249, value range is 0 ~ 99 and 150 ~ 249.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 250 ~ 299 and 350 ~ 399, value range is 250 ~ 299 and 350 ~ 399.
Fifth and Sixth file: d0 ~ d4 and s0 ~ s6, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
*/
@Test
public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyChunk()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, true, true);
createFiles(2, 3, 5, 50, 250, 250, 50, 50, true, true);
createFiles(2, 5, 7, 50, 600, 800, 50, 50, true, true);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
i++) {
for (int j = 0; j < 7; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() >= 600) {
assertEquals(
iterator.currentTime() + 200,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(400, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
assertEquals(200, count);
} else {
assertEquals(100, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
i++) {
for (int j = 0; j < 7; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
false);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() >= 600) {
assertEquals(
iterator.currentTime() + 200,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(400, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
assertEquals(200, count);
} else {
assertEquals(100, count);
}
}
}
}
/*
Total 6 unseq files, each file has different aligned timeseries, which cause empty page and chunk.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 300 ~ 599 , value range is 0 ~ 299 and 300 ~ 599.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 200 ~ 499 and 550 ~ 849, value range is 300 ~ 599 and 650 ~ 949.
Fifth and Sixth file: d0 ~ d4 and s0 ~ s6, time range is 900 ~ 1199 and 1250 ~ 1549, value range is 1100 ~ 1399 and 1450 ~ 1749.
*/
@Test
public void testAlignedUnSeqInnerSpaceCompactionWithEmptyChunkAndEmptyPage()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false);
createFiles(2, 3, 5, 300, 200, 300, 50, 50, true, false);
createFiles(2, 5, 7, 300, 900, 1100, 50, 50, true, false);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
i++) {
for (int j = 0; j < 7; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 200
|| (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if (iterator.currentTime() < 850) {
assertEquals(
iterator.currentTime() + 100,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime() + 200,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1450, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
i++) {
for (int j = 0; j < 7; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 200
|| (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if (iterator.currentTime() < 850) {
assertEquals(
iterator.currentTime() + 100,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime() + 200,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1450, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/*
Total 6 unseq files, each file has different aligned timeseries, which cause empty page and chunk.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 300 ~ 599 , value range is 0 ~ 299 and 300 ~ 599.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 200 ~ 499 and 550 ~ 849, value range is 300 ~ 599 and 650 ~ 949.
Fifth and Sixth file: d0 ~ d4 and s0 ~ s6, time range is 900 ~ 1199 and 1250 ~ 1549, value range is 1100 ~ 1399 and 1450 ~ 1749.
The data of d0.s0, d0.s1, d2.s4 and d3.s5 is deleted in each file.
*/
@Test
public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false);
createFiles(2, 3, 5, 300, 200, 300, 50, 50, true, false);
createFiles(2, 5, 7, 300, 900, 1100, 50, 50, true, false);
// generate mods file
for (int i = 0; i < unseqResources.size(); i++) {
Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
deleteMap.put(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s0",
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
deleteMap.put(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s1",
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
deleteMap.put(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)
+ PATH_SEPARATOR
+ "s4",
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
deleteMap.put(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)
+ PATH_SEPARATOR
+ "s5",
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(i), false);
}
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
i++) {
for (int j = 0; j < 7; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 200
|| (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if (iterator.currentTime() < 850) {
assertEquals(
iterator.currentTime() + 100,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime() + 200,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j == 5)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1450, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
i++) {
for (int j = 0; j < 7; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 200
|| (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if (iterator.currentTime() < 850) {
assertEquals(
iterator.currentTime() + 100,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime() + 200,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j == 5)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1450, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/*
Total 6 unseq files, each file has different aligned timeseries, which cause empty page and chunk.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 300 ~ 599 , value range is 0 ~ 299 and 300 ~ 599.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 200 ~ 499 and 550 ~ 849, value range is 300 ~ 599 and 650 ~ 949.
Fifth and Sixth file: d0 ~ d4 and s0 ~ s6, time range is 900 ~ 1199 and 1250 ~ 1549, value range is 1100 ~ 1399 and 1450 ~ 1749.
The data of device d0 is deleted in each file.
*/
@Test
public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false);
createFiles(2, 3, 5, 300, 200, 300, 50, 50, true, false);
createFiles(2, 5, 7, 300, 900, 1100, 50, 50, true, false);
// generate mods file
for (int i = 0; i < unseqResources.size(); i++) {
Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
for (int j = 0; j < 7; j++) {
deleteMap.put(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s"
+ j,
new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
}
CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(i), false);
}
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
i++) {
for (int j = 0; j < 7; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 200
|| (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if (iterator.currentTime() < 850) {
assertEquals(
iterator.currentTime() + 100,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime() + 200,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1450, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
i++) {
for (int j = 0; j < 7; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 200
|| (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if (iterator.currentTime() < 850) {
assertEquals(
iterator.currentTime() + 100,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime() + 200,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1450, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/* Total 5 files, each file has the same 6 aligned timeseries, each timeseries has the same 100 data point.*/
@Test
public void testAlignedUnSeqInnerSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, true);
createFiles(5, 2, 3, 100, 0, 0, 50, 50, true, false);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2;
i++) {
for (int j = 0; j < 3; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2;
i++) {
for (int j = 0; j < 3; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
}
}
}
/**
* Total 5 seq files and 5 unseq files, each file has the same nonAligned timeseries
*
* <p>Seq files has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99, 100 ~ 199, 200 ~ 299, 300 ~ 399 and
* 400 ~ 499, value range is 0 ~ 99, 100 ~ 199, 200 ~ 299, 300 ~ 399 and 400 ~ 499.
*
* <p>UnSeq files has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 49, 100 ~ 149, 200 ~ 249, 300 ~ 349
* and 400 ~ 449, value range is 10000 ~ 10049, 10100 ~ 10149, 10200 ~ 10249, 10300 ~ 10349 and
* 10400 ~ 10449.
*/
@Test
public void testCrossSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, false);
createFiles(5, 2, 3, 100, 0, 0, 0, 0, false, true);
createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d1",
"s1",
new MeasurementSchema("s1", TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() % 100 < 50) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() % 100 < 50) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
}
/**
* Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*/
@Test
public void testCrossSpaceCompactionWithDifferentTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (i == 0
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(1280, count);
} else if (i < 1 && j < 4) {
assertEquals(1230, count);
} else if (i == 0) {
assertEquals(800, count);
} else if ((i == 1 && j == 4)) {
assertEquals(600, count);
} else if (i < 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
for (int i = 0; i < 2; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 2; i < 4; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
Long.MIN_VALUE);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
iterator.currentTime());
if (i == 0
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(1280, count);
} else if (i < 1 && j < 4) {
assertEquals(1230, count);
} else if (i == 0) {
assertEquals(800, count);
} else if ((i == 1 && j == 4)) {
assertEquals(600, count);
} else if (i < 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/**
* Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d0.s0, d0.s1, d2.s4 and d3.s4 is deleted in each file.
*/
@Test
public void testCrossSpaceCompactionWithAllDataDeletedInTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s0");
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s1");
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2" + PATH_SEPARATOR + "s4");
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3" + PATH_SEPARATOR + "s4");
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
deleteTimeseriesInMManager(seriesPaths);
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (i == 0
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 4)) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
assertEquals(1280, count);
} else if (i < 1 && j < 4) {
assertEquals(1230, count);
} else if (i == 0) {
assertEquals(800, count);
} else if ((i == 1 && j == 4)) {
assertEquals(600, count);
} else if (i < 3) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
for (int i = 0; i < 2; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 2; i < 4; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
Long.MIN_VALUE);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
iterator.currentTime());
if (i == 0
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 4)) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
assertEquals(1280, count);
} else if (i < 1 && j < 4) {
assertEquals(1230, count);
} else if (i == 0) {
assertEquals(800, count);
} else if ((i == 1 && j == 4)) {
assertEquals(600, count);
} else if (i < 3) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/**
* Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d0 and d2 is deleted in each file.
*/
@Test
public void testCrossSpaceCompactionWithAllDataDeletedInDevice()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
for (int i = 0; i < 5; i++) {
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2" + PATH_SEPARATOR + "s" + i);
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
deleteTimeseriesInMManager(seriesPaths);
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (i == 0
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0 || i == 2) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
assertEquals(1280, count);
} else if ((i == 1 && j == 4)) {
assertEquals(600, count);
} else if (i < 3) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
for (int i = 0; i < 2; i++) {
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 2; i < 4; i++) {
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
Long.MIN_VALUE);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
iterator.currentTime());
if (i == 0
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0 || i == 2) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
assertEquals(1280, count);
} else if ((i == 1 && j == 4)) {
assertEquals(600, count);
} else if (i < 3) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/**
* Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d0, d1 and d2 is deleted in each file. Data in the first and second target file
* is all deleted.
*/
@Test
public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
for (int i = 0; i < 5; i++) {
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2" + PATH_SEPARATOR + "s" + i);
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
deleteTimeseriesInMManager(seriesPaths);
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (i == 0
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(2, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 0; i < 2; i++) {
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
Long.MIN_VALUE);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
iterator.currentTime());
if (i == 0
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
} else {
assertEquals(600, count);
}
}
}
}
/**
* Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d0, d1 and d2 is deleted in each seq file.
*/
@Test
public void testCrossSpaceCompactionWithAllDataDeletedInDeviceInSeqFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
for (int i = 0; i < 5; i++) {
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2" + PATH_SEPARATOR + "s" + i);
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (i == 0
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 1) {
if (j < 4) {
assertEquals(630, count);
} else {
assertEquals(200, count);
}
} else if (i < 3) {
if (j < 4) {
assertEquals(600, count);
} else {
assertEquals(0, count);
}
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(4, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
for (int i = 0; i < 2; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 2; i < 3; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
deviceIdList.clear();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 3; i < 4; i++) {
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
Long.MIN_VALUE);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
iterator.currentTime());
if (i == 0
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 1) {
if (j < 4) {
assertEquals(630, count);
} else {
assertEquals(200, count);
}
} else if (i < 3) {
if (j < 4) {
assertEquals(600, count);
} else {
assertEquals(0, count);
}
} else {
assertEquals(600, count);
}
}
}
}
/**
* Total 5 seq files and 5 unseq files, each file has the same aligned timeseries
*
* <p>Seq files has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99, 100 ~ 199, 200 ~ 299, 300 ~ 399 and
* 400 ~ 499, value range is 0 ~ 99, 100 ~ 199, 200 ~ 299, 300 ~ 399 and 400 ~ 499.
*
* <p>UnSeq files has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 49, 100 ~ 149, 200 ~ 249, 300 ~ 349
* and 400 ~ 449, value range is 10000 ~ 10049, 10100 ~ 10149, 10200 ~ 10249, 10300 ~ 10349 and
* 10400 ~ 10449.
*/
@Test
public void testAlignedCrossSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, true);
createFiles(5, 2, 3, 100, 0, 0, 0, 0, true, true);
createFiles(5, 2, 3, 50, 0, 10000, 50, 50, true, false);
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s1", TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000",
Collections.singletonList("s1"),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() % 100 < 50) {
assertEquals(
iterator.currentTime() + 10000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() % 100 < 50) {
assertEquals(
iterator.currentTime() + 10000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
assertEquals(500, count);
}
/**
* Total 4 seq files and 4 unseq files, each file has different aligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*/
@Test
public void testAlignedCrossSpaceCompactionWithDifferentTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
iterator.currentTime() + 20000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
iterator.currentTime() + 10000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
assertEquals(800, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
iterator.currentTime() + 20000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
iterator.currentTime() + 10000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
assertEquals(800, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/**
* Total 4 seq files and 5 unseq files, each file has different aligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d0.s0, d0.s1, d2.s4 and d3.s4 is deleted in each file.
*/
@Test
public void testAlignedCrossSpaceCompactionWithAllDataDeletedInTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s0");
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s1");
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)
+ PATH_SEPARATOR
+ "s4");
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)
+ PATH_SEPARATOR
+ "s4");
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
deleteTimeseriesInMManager(seriesPaths);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
iterator.currentTime() + 20000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
iterator.currentTime() + 10000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j == 4)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
assertEquals(800, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(4, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001");
for (int i = 0; i < 2; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003"));
check(targetResources.get(i), deviceIdList);
}
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003");
for (int i = 2; i < 4; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003"));
check(targetResources.get(i), deviceIdList);
}
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
iterator.currentTime() + 20000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
iterator.currentTime() + 10000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j == 4)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
assertEquals(800, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/**
* Total 4 seq files and 5 unseq files, each file has different aligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d0, d1 and d2 is deleted in each file. The first target file is empty.
*/
@Test
public void testAlignedCrossSpaceCompactionWithAllDataDeletedInOneTargetFile()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
for (int i = 0; i < 5; i++) {
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s"
+ i);
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
+ PATH_SEPARATOR
+ "s"
+ i);
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)
+ PATH_SEPARATOR
+ "s"
+ i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2" + PATH_SEPARATOR + "s" + i);
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
deleteTimeseriesInMManager(seriesPaths);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
iterator.currentTime() + 20000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
iterator.currentTime() + 10000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
}
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset())
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
iterator.currentTime() + 20000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
iterator.currentTime() + 10000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
}
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset())
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
/**
* Different source files have timeseries with the same path, but different data types. Because
* timeseries in the former file is been deleted.
*/
@Test
public void testCrossSpaceCompactionWithSameTimeseriesInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
for (int i = 0; i < 5; i++) {
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s" + i);
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2" + PATH_SEPARATOR + "s" + i);
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
deleteTimeseriesInMManager(seriesPaths);
setDataType(TSDataType.TEXT);
registerTimeseriesInMManger(2, 7, false);
List<Integer> deviceIndex = new ArrayList<>();
for (int i = 0; i < 2; i++) {
deviceIndex.add(i);
}
List<Integer> measurementIndex = new ArrayList<>();
for (int i = 0; i < 7; i++) {
measurementIndex.add(i);
}
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0, false, false);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(2, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 0; i < 2; i++) {
if (i == 0) {
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
} else {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
}
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
for (int j = 0; j < 7; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
Long.MIN_VALUE);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, tsDataType));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
tsDataType,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
iterator.currentTime());
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 2 && j < 7) {
assertEquals(300, count);
} else if (i == 3 && j < 5) {
assertEquals(600, count);
} else {
assertEquals(0, count);
}
}
}
}
/** Each source file has different device. */
@Test
public void testCrossSpaceCompactionWithDifferentDevicesInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, false);
List<Integer> deviceIndex = new ArrayList<>();
List<Integer> measurementIndex = new ArrayList<>();
for (int i = 0; i < 7; i++) {
measurementIndex.add(i);
}
deviceIndex.add(0);
deviceIndex.add(2);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, false, true);
deviceIndex.clear();
deviceIndex.add(1);
deviceIndex.add(3);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, false, true);
deviceIndex.clear();
deviceIndex.add(2);
deviceIndex.add(4);
deviceIndex.add(0);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, false, true);
deviceIndex.clear();
deviceIndex.add(1);
deviceIndex.add(4);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, false, false);
deviceIndex.clear();
deviceIndex.add(1);
deviceIndex.add(3);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 400, 600, 0, false, false);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4");
for (int i = 0; i < 3; i++) {
if (i == 0) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
} else if (i == 1) {
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
} else {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
}
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
Long.MIN_VALUE);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.TEXT));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.TEXT,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
iterator.currentTime());
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0 || i == 2 || i == 3) {
assertEquals(600, count);
} else if (i == 1) {
assertEquals(800, count);
} else {
assertEquals(500, count);
}
}
}
}
/** Each source file has same device with different measurements. */
@Test
public void testCrossSpaceCompactionWithDifferentMeasurementsInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 5, false);
List<Integer> deviceIndex = new ArrayList<>();
List<Integer> measurementIndex = new ArrayList<>();
for (int i = 0; i < 5; i++) {
deviceIndex.add(i);
}
measurementIndex.add(0);
measurementIndex.add(2);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, false, true);
measurementIndex.clear();
measurementIndex.add(1);
measurementIndex.add(3);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, false, true);
measurementIndex.clear();
measurementIndex.add(2);
measurementIndex.add(4);
measurementIndex.add(0);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, false, true);
measurementIndex.clear();
measurementIndex.add(1);
measurementIndex.add(4);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, false, false);
measurementIndex.clear();
measurementIndex.add(0);
measurementIndex.add(2);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 400, 0, false, false);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4");
for (int i = 0; i < 3; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
Long.MIN_VALUE);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.TEXT));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.TEXT,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
iterator.currentTime());
count++;
iterator.next();
}
}
tsBlockReader.close();
if (j == 0 || j == 2) {
assertEquals(800, count);
} else if (j == 1 || j == 4) {
assertEquals(500, count);
} else {
assertEquals(300, count);
}
}
}
}
/** Each source file has different devices and different measurements. */
@Test
public void testCrossSpaceCompactionWithDifferentDevicesAndMeasurementsInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
List<Integer> deviceIndex = new ArrayList<>();
List<Integer> measurementIndex = new ArrayList<>();
deviceIndex.add(0);
deviceIndex.add(1);
measurementIndex.add(0);
measurementIndex.add(2);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, false, true);
measurementIndex.clear();
measurementIndex.add(1);
measurementIndex.add(3);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, false, true);
deviceIndex.add(2);
deviceIndex.add(3);
measurementIndex.clear();
measurementIndex.add(2);
measurementIndex.add(4);
measurementIndex.add(0);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, false, true);
deviceIndex.remove(2);
deviceIndex.remove(2);
measurementIndex.clear();
measurementIndex.add(1);
measurementIndex.add(4);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, false, false);
measurementIndex.clear();
measurementIndex.add(0);
measurementIndex.add(2);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 600, 0, false, false);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 0; i < 3; i++) {
if (i < 2) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
} else {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
}
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
Long.MIN_VALUE);
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.TEXT));
IBatchReader tsFilesReader =
new SeriesRawDataBatchReader(
path,
TSDataType.VECTOR,
EnvironmentUtils.TEST_QUERY_CONTEXT,
targetResources,
new ArrayList<>(),
null,
null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
while (batchData.hasCurrent()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
>= batchData.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
batchData.currentTime());
count++;
batchData.next();
}
}
tsFilesReader.close();
if (i < 2) {
if (j == 0 || j == 2) {
assertEquals(800, count);
} else if (j == 1 || j == 4) {
assertEquals(500, count);
} else {
assertEquals(300, count);
}
} else {
if (j == 0 || j == 2 || j == 4) {
assertEquals(300, count);
} else {
assertEquals(0, count);
}
}
}
}
}
/**
* Different source files have timeseries with the same path, but different data types. Because
* timeseries in the former file is been deleted.
*/
@Test
public void testAlignedCrossSpaceCompactionWithSameTimeseriesInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
for (int i = 0; i < 5; i++) {
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s"
+ i);
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
+ PATH_SEPARATOR
+ "s"
+ i);
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)
+ PATH_SEPARATOR
+ "s"
+ i);
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
deleteTimeseriesInMManager(seriesPaths);
setDataType(TSDataType.TEXT);
registerTimeseriesInMManger(2, 7, true);
List<Integer> deviceIndex = new ArrayList<>();
for (int i = 0; i < 2; i++) {
deviceIndex.add(i);
}
List<Integer> measurementIndex = new ArrayList<>();
for (int i = 0; i < 7; i++) {
measurementIndex.add(i);
}
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0, true, false);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(2, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + (TsFileGeneratorUtils.getAlignDeviceOffset()));
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1));
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2));
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3));
for (int i = 0; i < 2; i++) {
if (i == 0) {
Assert.assertFalse(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset())));
Assert.assertFalse(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
Assert.assertFalse(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
} else {
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset())));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
Assert.assertFalse(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
}
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
for (int j = 0; j < 7; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ PATH_SEPARATOR
+ "s"
+ j,
Long.MIN_VALUE);
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, tsDataType));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
tsDataType,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ PATH_SEPARATOR
+ "s"
+ j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ PATH_SEPARATOR
+ "s"
+ j,
iterator.currentTime());
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 2 && j < 7) {
assertEquals(300, count);
} else if (i == 3 && j < 5) {
assertEquals(600, count);
} else {
assertEquals(0, count);
}
}
}
}
/** Each source file has different device. */
@Test
public void testAlignedCrossSpaceCompactionWithDifferentDevicesInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, true);
List<Integer> deviceIndex = new ArrayList<>();
List<Integer> measurementIndex = new ArrayList<>();
for (int i = 0; i < 7; i++) {
measurementIndex.add(i);
}
deviceIndex.add(0);
deviceIndex.add(2);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, true, true);
deviceIndex.clear();
deviceIndex.add(1);
deviceIndex.add(3);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, true, true);
deviceIndex.clear();
deviceIndex.add(2);
deviceIndex.add(4);
deviceIndex.add(0);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, true, true);
deviceIndex.clear();
deviceIndex.add(1);
deviceIndex.add(4);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, true, false);
deviceIndex.clear();
deviceIndex.add(1);
deviceIndex.add(3);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 400, 600, 0, true, false);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + TsFileGeneratorUtils.getAlignDeviceOffset());
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1));
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2));
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3));
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 4));
for (int i = 0; i < 3; i++) {
if (i == 0) {
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset())));
Assert.assertFalse(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
Assert.assertFalse(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
Assert.assertFalse(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
} else if (i == 1) {
Assert.assertFalse(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset())));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
Assert.assertFalse(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
Assert.assertFalse(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
} else {
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset())));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
}
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ PATH_SEPARATOR
+ "s"
+ j,
Long.MIN_VALUE);
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.TEXT));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.TEXT,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ PATH_SEPARATOR
+ "s"
+ j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ PATH_SEPARATOR
+ "s"
+ j,
iterator.currentTime());
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0 || i == 2 || i == 3) {
assertEquals(600, count);
} else if (i == 1) {
assertEquals(800, count);
} else {
assertEquals(500, count);
}
}
}
}
@Test
public void testAlignedCrossSpaceCompactionWithDifferentMeasurementsInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 5, true);
List<Integer> deviceIndex = new ArrayList<>();
List<Integer> measurementIndex = new ArrayList<>();
for (int i = 0; i < 5; i++) {
deviceIndex.add(i);
}
measurementIndex.add(0);
measurementIndex.add(2);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, true, true);
measurementIndex.clear();
measurementIndex.add(1);
measurementIndex.add(3);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, true, true);
measurementIndex.clear();
measurementIndex.add(2);
measurementIndex.add(4);
measurementIndex.add(0);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, true, true);
measurementIndex.clear();
measurementIndex.add(1);
measurementIndex.add(4);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, true, false);
measurementIndex.clear();
measurementIndex.add(0);
measurementIndex.add(2);
createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 400, 0, true, false);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + TsFileGeneratorUtils.getAlignDeviceOffset());
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1));
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2));
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3));
deviceIdList.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 4));
for (int i = 0; i < 3; i++) {
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset())));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
Assert.assertTrue(
targetResources
.get(i)
.isDeviceIdExist(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
check(targetResources.get(i), deviceIdList);
}
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
measurementMaxTime.putIfAbsent(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ PATH_SEPARATOR
+ "s"
+ j,
Long.MIN_VALUE);
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.TEXT));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.TEXT,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ PATH_SEPARATOR
+ "s"
+ j)
>= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ PATH_SEPARATOR
+ "s"
+ j,
iterator.currentTime());
count++;
iterator.next();
}
}
tsBlockReader.close();
if (j == 0 || j == 2) {
assertEquals(800, count);
} else if (j == 1 || j == 4) {
assertEquals(500, count);
} else {
assertEquals(300, count);
}
}
}
}
/**
* Total 4 seq files and 5 unseq files, each file has different aligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
* is 0 ~ 299 and 350 ~ 649.<br>
* third and forth file has d0 ~ d3 and s0 ~ S4,time range is 700 ~ 999 and 1050 ~ 1349, value
* range is 700 ~ 999 and 1050 ~ 1349.<br>
*
* <p>UnSeq files<br>
* first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
* ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
* forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
* <p>The data of d0, d1 and d2 is deleted in each file. The first target file is empty.
*/
@Test
public void testAlignedCrossSpaceCompactionWithFileTimeIndexResource()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
// generate mods file
List<String> seriesPaths = new ArrayList<>();
for (int i = 0; i < 5; i++) {
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ TsFileGeneratorUtils.getAlignDeviceOffset()
+ PATH_SEPARATOR
+ "s"
+ i);
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
+ PATH_SEPARATOR
+ "s"
+ i);
seriesPaths.add(
COMPACTION_TEST_SG
+ PATH_SEPARATOR
+ "d"
+ (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)
+ PATH_SEPARATOR
+ "s"
+ i);
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
deleteTimeseriesInMManager(seriesPaths);
for (TsFileResource resource : seqResources) {
resource.setTimeIndexType((byte) 2);
}
for (TsFileResource resource : unseqResources) {
resource.setTimeIndexType((byte) 2);
}
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
iterator.currentTime() + 20000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
iterator.currentTime() + 10000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
}
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset())
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
for (int j = 0; j < 5; j++) {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
AlignedPath path =
new AlignedPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
&& ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
|| (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
iterator.currentTime() + 20000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
&& ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
|| (250 <= iterator.currentTime() && iterator.currentTime() < 450)
|| (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
iterator.currentTime() + 10000,
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
iterator.currentTime(),
((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
}
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset())
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2)) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
assertEquals(1230, count);
} else if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j == 4)) {
assertEquals(600, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4) {
assertEquals(1200, count);
} else {
assertEquals(600, count);
}
}
}
}
@Test
public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() throws ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
try {
registerTimeseriesInMManger(6, 6, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
createFiles(3, 6, 6, 200, 20, 10020, 30, 30, false, false);
createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(4, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
for (int i = 0; i < 2; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 2; i < 3; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d5");
for (int i = 3; i < 4; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d5"));
check(targetResources.get(i), deviceIdList);
}
} catch (MetadataException
| IOException
| WriteProcessException
| StorageEngineException
| InterruptedException e) {
e.printStackTrace();
Assert.fail();
}
}
@Test
public void testCrossSpaceCompactionWithDeviceMaxTimeLaterInUnseqFile()
throws ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
try {
registerTimeseriesInMManger(6, 6, false);
createFiles(2, 2, 3, 200, 0, 0, 0, 0, false, true);
createFiles(3, 4, 4, 300, 20, 10020, 0, 0, false, false);
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
ICompactionPerformer performer =
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(2, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
for (int i = 0; i < 1; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertFalse(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 1; i < 2; i++) {
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
Assert.assertTrue(
targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
check(targetResources.get(i), deviceIdList);
}
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
PartialPath path =
new MeasurementPath(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
IDataBlockReader tsBlockReader =
new SeriesDataBlockReader(
path,
TSDataType.INT64,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
true);
int count = 0;
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
while (iterator.hasNext()) {
if (iterator.currentTime() < 20) {
assertEquals(iterator.currentTime(), iterator.currentValue());
} else {
assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
}
count++;
iterator.next();
}
}
tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(920, count);
} else {
assertEquals(900, count);
}
}
}
} catch (MetadataException
| IOException
| WriteProcessException
| StorageEngineException
| InterruptedException e) {
e.printStackTrace();
Assert.fail();
}
}
private void generateModsFile(
List<String> seriesPaths, List<TsFileResource> resources, long startValue, long endValue)
throws IllegalPathException, IOException {
for (TsFileResource resource : resources) {
Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
for (String path : seriesPaths) {
deleteMap.put(path, new Pair<>(startValue, endValue));
}
CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false);
}
}
/**
* Check whether target file contain empty chunk group or not. Assert fail if it contains empty
* chunk group whose deviceID is not in the deviceIdList.
*/
public void check(TsFileResource targetResource, List<String> deviceIdList) throws IOException {
byte marker;
try (TsFileSequenceReader reader =
new TsFileSequenceReader(targetResource.getTsFile().getAbsolutePath())) {
reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
case MetaMarker.TIME_CHUNK_HEADER:
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
ChunkHeader header = reader.readChunkHeader(marker);
int dataSize = header.getDataSize();
reader.position(reader.position() + dataSize);
break;
case MetaMarker.CHUNK_GROUP_HEADER:
ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
String deviceID = chunkGroupHeader.getDeviceID();
if (!deviceIdList.contains(deviceID)) {
Assert.fail(
"Target file "
+ targetResource.getTsFile().getPath()
+ " contains empty chunk group "
+ deviceID);
}
break;
case MetaMarker.OPERATION_INDEX_RANGE:
reader.readPlanIndex();
break;
default:
// the disk file is corrupted, using this file may be dangerous
throw new IOException("Unexpected marker " + marker);
}
}
}
}
}