blob: 38fb472b0cb2d05d3787aa77bf541dd5f17cf1ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.inner;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
import org.apache.iotdb.db.storageengine.dataregion.compaction.TestUtilsForAlignedSeries;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionConfigRestorer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class ReadChunkCompactionPerformerAlignedTest {
private static final String storageGroup = "root.testAlignedCompaction";
private final ICompactionPerformer performer = new ReadChunkCompactionPerformer();
private static File dataDirectory =
new File(
TestConstant.BASE_OUTPUT_PATH
+ "data".concat(File.separator)
+ "sequence".concat(File.separator)
+ storageGroup.concat(File.separator)
+ "0".concat(File.separator)
+ "0".concat(File.separator));
@Before
public void setUp() throws Exception {
if (!dataDirectory.exists()) {
Assert.assertTrue(dataDirectory.mkdirs());
}
}
@After
public void tearDown() throws Exception {
new CompactionConfigRestorer().restoreCompactionConfig();
FileUtils.forceDelete(dataDirectory);
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
EnvironmentUtils.cleanEnv();
}
@Test
public void testSimpleAlignedTsFileCompaction() throws Exception {
List<String> devices = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
devices.add(storageGroup + ".d" + i);
}
boolean[] aligned = new boolean[] {true, true, true, true, true};
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
TestUtilsForAlignedSeries.registerTimeSeries(
storageGroup,
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[] {}),
aligned);
boolean[] randomNull = new boolean[] {false, false, false, false, false};
int timeInterval = 500;
List<TsFileResource> resources = new ArrayList<>();
for (int i = 1; i < 31; i++) {
TsFileResource resource =
new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
TestUtilsForAlignedSeries.writeTsFile(
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[0]),
resource,
aligned,
timeInterval * i,
timeInterval * (i + 1),
randomNull);
resources.add(resource);
}
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(resources, true);
List<PartialPath> fullPaths = new ArrayList<>();
List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>();
List<String> measurementIds = new ArrayList<>();
schemas.forEach(
(e) -> {
measurementIds.add(e.getMeasurementId());
});
for (String device : devices) {
iMeasurementSchemas.addAll(schemas);
fullPaths.add(new AlignedPath(device, measurementIds, schemas));
}
Map<PartialPath, List<TimeValuePair>> originData =
CompactionCheckerUtils.getDataByQuery(
fullPaths, iMeasurementSchemas, resources, new ArrayList<>());
performer.setSourceFiles(resources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(Collections.singletonList(targetResource), true, storageGroup);
CompactionUtils.moveTargetFile(Collections.singletonList(targetResource), true, storageGroup);
Map<PartialPath, List<TimeValuePair>> compactedData =
CompactionCheckerUtils.getDataByQuery(
fullPaths,
iMeasurementSchemas,
Collections.singletonList(targetResource),
new ArrayList<>());
CompactionCheckerUtils.validDataByValueList(originData, compactedData);
}
@Test
public void testAlignedTsFileWithModificationCompaction() throws Exception {
List<String> devices = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
devices.add(storageGroup + ".d" + i);
}
boolean[] aligned = new boolean[] {true, true, true, true, true};
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
TestUtilsForAlignedSeries.registerTimeSeries(
storageGroup,
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[] {}),
aligned);
boolean[] randomNull = new boolean[] {false, false, false, false, false};
int timeInterval = 500;
List<TsFileResource> resources = new ArrayList<>();
for (int i = 1; i < 31; i++) {
TsFileResource resource =
new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
TestUtilsForAlignedSeries.writeTsFile(
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[0]),
resource,
aligned,
timeInterval * i,
timeInterval * (i + 1),
randomNull);
Pair<Long, Long> deleteInterval = new Pair<>(timeInterval * i + 10L, timeInterval * i + 20L);
Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
for (String device : devices) {
deletionMap.put(device + ".s0", deleteInterval);
}
CompactionFileGeneratorUtils.generateMods(deletionMap, resource, false);
resources.add(resource);
}
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(resources, true);
List<PartialPath> fullPaths = new ArrayList<>();
List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>();
List<String> measurementIds = new ArrayList<>();
schemas.forEach(
(e) -> {
measurementIds.add(e.getMeasurementId());
});
for (String device : devices) {
iMeasurementSchemas.addAll(schemas);
fullPaths.add(new AlignedPath(device, measurementIds, schemas));
}
Map<PartialPath, List<TimeValuePair>> originData =
CompactionCheckerUtils.getDataByQuery(
fullPaths, iMeasurementSchemas, resources, new ArrayList<>());
performer.setSourceFiles(resources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(Collections.singletonList(targetResource), true, storageGroup);
Map<PartialPath, List<TimeValuePair>> compactedData =
CompactionCheckerUtils.getDataByQuery(
fullPaths,
iMeasurementSchemas,
Collections.singletonList(targetResource),
new ArrayList<>());
CompactionCheckerUtils.validDataByValueList(originData, compactedData);
}
@Test
public void testAlignedTsFileWithNullValueCompaction() throws Exception {
List<String> devices = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
devices.add(storageGroup + ".d" + i);
}
boolean[] aligned = new boolean[] {true, true, true, true, true};
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
TestUtilsForAlignedSeries.registerTimeSeries(
storageGroup,
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[] {}),
aligned);
boolean[] randomNull = new boolean[] {true, false, true, false, true};
int timeInterval = 500;
List<TsFileResource> resources = new ArrayList<>();
for (int i = 1; i < 31; i++) {
TsFileResource resource =
new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
TestUtilsForAlignedSeries.writeTsFile(
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[0]),
resource,
aligned,
timeInterval * i,
timeInterval * (i + 1),
randomNull);
resources.add(resource);
}
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(resources, true);
List<PartialPath> fullPaths = new ArrayList<>();
List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>();
List<String> measurementIds = new ArrayList<>();
schemas.forEach(
(e) -> {
measurementIds.add(e.getMeasurementId());
});
for (String device : devices) {
iMeasurementSchemas.addAll(schemas);
fullPaths.add(new AlignedPath(device, measurementIds, schemas));
}
Map<PartialPath, List<TimeValuePair>> originData =
CompactionCheckerUtils.getDataByQuery(
fullPaths, iMeasurementSchemas, resources, new ArrayList<>());
performer.setSourceFiles(resources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(Collections.singletonList(targetResource), true, storageGroup);
Map<PartialPath, List<TimeValuePair>> compactedData =
CompactionCheckerUtils.getDataByQuery(
fullPaths,
iMeasurementSchemas,
Collections.singletonList(targetResource),
new ArrayList<>());
CompactionCheckerUtils.validDataByValueList(originData, compactedData);
}
@Test
public void testAlignedTsFileWithDifferentSchemaInDifferentTsFileCompaction() throws Exception {
List<String> devices = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
devices.add(storageGroup + ".d" + i);
}
boolean[] aligned = new boolean[] {true, true, true, true, true};
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
TestUtilsForAlignedSeries.registerTimeSeries(
storageGroup,
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[] {}),
aligned);
boolean[] randomNull = new boolean[] {true, false, true, false, true};
int timeInterval = 500;
Random random = new Random(1);
List<TsFileResource> resources = new ArrayList<>();
for (int i = 1; i < 31; i++) {
TsFileResource resource =
new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
TestUtilsForAlignedSeries.writeTsFile(
devices.toArray(new String[] {}),
schemas
.subList(0, random.nextInt(schemas.size() - 1) + 1)
.toArray(new IMeasurementSchema[0]),
resource,
aligned,
timeInterval * i,
timeInterval * (i + 1),
randomNull);
resources.add(resource);
}
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(resources, true);
List<PartialPath> fullPaths = new ArrayList<>();
List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>();
List<String> measurementIds = new ArrayList<>();
schemas.forEach(
(e) -> {
measurementIds.add(e.getMeasurementId());
});
for (String device : devices) {
iMeasurementSchemas.addAll(schemas);
fullPaths.add(new AlignedPath(device, measurementIds, schemas));
}
Map<PartialPath, List<TimeValuePair>> originData =
CompactionCheckerUtils.getDataByQuery(
fullPaths, iMeasurementSchemas, resources, new ArrayList<>());
performer.setSourceFiles(resources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(Collections.singletonList(targetResource), true, storageGroup);
Map<PartialPath, List<TimeValuePair>> compactedData =
CompactionCheckerUtils.getDataByQuery(
fullPaths,
iMeasurementSchemas,
Collections.singletonList(targetResource),
new ArrayList<>());
CompactionCheckerUtils.validDataByValueList(originData, compactedData);
}
@Test
public void testAlignedTsFileWithDifferentDataTypeCompaction() throws Exception {
List<String> devices = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
devices.add(storageGroup + ".d" + i);
}
boolean[] aligned = new boolean[] {false, true, false, true, false};
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
TestUtilsForAlignedSeries.registerTimeSeries(
storageGroup,
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[] {}),
aligned);
boolean[] randomNull = new boolean[] {true, false, true, false, true};
int timeInterval = 500;
Random random = new Random(1);
List<TsFileResource> resources = new ArrayList<>();
for (int i = 1; i < 31; i++) {
TsFileResource resource =
new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
TestUtilsForAlignedSeries.writeTsFile(
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[0]),
resource,
aligned,
timeInterval * i,
timeInterval * (i + 1),
randomNull);
resources.add(resource);
}
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(resources, true);
List<PartialPath> fullPaths = new ArrayList<>();
List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>();
List<String> measurementIds = new ArrayList<>();
schemas.forEach(
(e) -> {
measurementIds.add(e.getMeasurementId());
});
for (String device : devices) {
iMeasurementSchemas.addAll(schemas);
fullPaths.add(new AlignedPath(device, measurementIds, schemas));
}
Map<PartialPath, List<TimeValuePair>> originData =
CompactionCheckerUtils.getDataByQuery(
fullPaths, iMeasurementSchemas, resources, new ArrayList<>());
performer.setSourceFiles(resources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(Collections.singletonList(targetResource), true, storageGroup);
Map<PartialPath, List<TimeValuePair>> compactedData =
CompactionCheckerUtils.getDataByQuery(
fullPaths,
iMeasurementSchemas,
Collections.singletonList(targetResource),
new ArrayList<>());
CompactionCheckerUtils.validDataByValueList(originData, compactedData);
}
@Test
public void testAlignedTsFileWithDifferentDataTypeInDifferentTsFileCompaction() throws Exception {
List<String> devices = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
devices.add(storageGroup + ".d" + i);
}
boolean[] aligned = new boolean[] {false, true, false, true, false};
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
TestUtilsForAlignedSeries.registerTimeSeries(
storageGroup,
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[] {}),
aligned);
boolean[] randomNull = new boolean[] {true, false, true, false, true};
int timeInterval = 500;
Random random = new Random(5);
List<TsFileResource> resources = new ArrayList<>();
for (int i = 1; i < 31; i++) {
TsFileResource resource =
new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
TestUtilsForAlignedSeries.writeTsFile(
devices.subList(0, random.nextInt(devices.size() - 1) + 1).toArray(new String[0]),
schemas
.subList(0, random.nextInt(schemas.size() - 1) + 1)
.toArray(new IMeasurementSchema[0]),
resource,
aligned,
timeInterval * i,
timeInterval * (i + 1),
randomNull);
resources.add(resource);
}
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(resources, true);
List<PartialPath> fullPaths = new ArrayList<>();
List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>();
List<String> measurementIds = new ArrayList<>();
schemas.forEach(
(e) -> {
measurementIds.add(e.getMeasurementId());
});
for (String device : devices) {
iMeasurementSchemas.addAll(schemas);
fullPaths.add(new AlignedPath(device, measurementIds, schemas));
}
Map<PartialPath, List<TimeValuePair>> originData =
CompactionCheckerUtils.getDataByQuery(
fullPaths, iMeasurementSchemas, resources, new ArrayList<>());
performer.setSourceFiles(resources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(Collections.singletonList(targetResource), true, storageGroup);
Map<PartialPath, List<TimeValuePair>> compactedData =
CompactionCheckerUtils.getDataByQuery(
fullPaths,
iMeasurementSchemas,
Collections.singletonList(targetResource),
new ArrayList<>());
CompactionCheckerUtils.validDataByValueList(originData, compactedData);
}
@Test
public void testAlignedTsFileWithBadSchemaCompaction() throws Exception {
List<String> devices = new ArrayList<>();
devices.add(storageGroup + ".d" + 0);
for (int i = 1; i < 5; ++i) {
devices.add(devices.get(i - 1) + ".d" + i);
}
boolean[] aligned = new boolean[] {false, true, false, true, false};
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
TestUtilsForAlignedSeries.registerTimeSeries(
storageGroup,
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[] {}),
aligned);
boolean[] randomNull = new boolean[] {true, false, true, false, true};
int timeInterval = 500;
Random random = new Random(5);
List<TsFileResource> resources = new ArrayList<>();
for (int i = 1; i < 31; i++) {
TsFileResource resource =
new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
TestUtilsForAlignedSeries.writeTsFile(
devices.subList(0, random.nextInt(devices.size() - 1) + 1).toArray(new String[0]),
schemas
.subList(0, random.nextInt(schemas.size() - 1) + 1)
.toArray(new IMeasurementSchema[0]),
resource,
aligned,
timeInterval * i,
timeInterval * (i + 1),
randomNull);
resources.add(resource);
}
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(resources, true);
List<PartialPath> fullPaths = new ArrayList<>();
List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>();
List<String> measurementIds = new ArrayList<>();
schemas.forEach(
(e) -> {
measurementIds.add(e.getMeasurementId());
});
for (String device : devices) {
iMeasurementSchemas.addAll(schemas);
fullPaths.add(new AlignedPath(device, measurementIds, schemas));
}
Map<PartialPath, List<TimeValuePair>> originData =
CompactionCheckerUtils.getDataByQuery(
fullPaths, iMeasurementSchemas, resources, new ArrayList<>());
performer.setSourceFiles(resources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(Collections.singletonList(targetResource), true, storageGroup);
Map<PartialPath, List<TimeValuePair>> compactedData =
CompactionCheckerUtils.getDataByQuery(
fullPaths,
iMeasurementSchemas,
Collections.singletonList(targetResource),
new ArrayList<>());
CompactionCheckerUtils.validDataByValueList(originData, compactedData);
}
@Test
public void testAlignedTsFileWithEmptyChunkGroup() throws Exception {
List<String> devices = new ArrayList<>();
devices.add(storageGroup + ".d" + 0);
for (int i = 1; i < 5; ++i) {
devices.add(devices.get(i - 1) + ".d" + i);
}
boolean[] aligned = new boolean[] {false, true, false, true, false};
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
TestUtilsForAlignedSeries.registerTimeSeries(
storageGroup,
devices.toArray(new String[] {}),
schemas.toArray(new IMeasurementSchema[] {}),
aligned);
boolean[] randomNull = new boolean[] {true, false, true, false, true};
int timeInterval = 500;
Random random = new Random(5);
List<TsFileResource> resources = new ArrayList<>();
for (int i = 1; i < 30; i++) {
TsFileResource resource =
new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
TestUtilsForAlignedSeries.writeTsFile(
devices.toArray(new String[0]),
schemas.toArray(new IMeasurementSchema[0]),
resource,
aligned,
timeInterval * i,
timeInterval * (i + 1),
randomNull);
resources.add(resource);
}
TsFileResource resource =
new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", 30, 30)));
// the start time and end time is the same
// it will write tsfile with empty chunk group
TestUtilsForAlignedSeries.writeTsFile(
devices.toArray(new String[0]),
schemas.toArray(new IMeasurementSchema[0]),
resource,
aligned,
timeInterval * (30 + 1),
timeInterval * (30 + 1),
randomNull);
resources.add(resource);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(resources, true);
List<PartialPath> fullPaths = new ArrayList<>();
List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>();
List<String> measurementIds = new ArrayList<>();
schemas.forEach(
(e) -> {
measurementIds.add(e.getMeasurementId());
});
for (String device : devices) {
iMeasurementSchemas.addAll(schemas);
fullPaths.add(new AlignedPath(device, measurementIds, schemas));
}
Map<PartialPath, List<TimeValuePair>> originData =
CompactionCheckerUtils.getDataByQuery(
fullPaths, iMeasurementSchemas, resources, new ArrayList<>());
performer.setSourceFiles(resources);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(Collections.singletonList(targetResource), true, storageGroup);
Map<PartialPath, List<TimeValuePair>> compactedData =
CompactionCheckerUtils.getDataByQuery(
fullPaths,
iMeasurementSchemas,
Collections.singletonList(targetResource),
new ArrayList<>());
CompactionCheckerUtils.validDataByValueList(originData, compactedData);
}
@Test
public void testEmptyChunkWithModification() throws Exception {
List<IMeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
Map<PartialPath, List<TimeValuePair>> originData = new HashMap<>();
List<TsFileResource> resources = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
TsFileIOWriter writer =
new TsFileIOWriter(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(schemas);
for (int j = i * 100; j < i * 100 + 100; j++) {
TsPrimitiveType[] values = {
new TsPrimitiveType.TsDouble(0.0D),
new TsPrimitiveType.TsFloat(0.0F),
null,
null,
new TsPrimitiveType.TsBinary(new Binary("", TSFileConfig.STRING_CHARSET)),
new TsPrimitiveType.TsBoolean(false)
};
originData
.computeIfAbsent(new PartialPath("root.sg.d1.s0"), k -> new ArrayList<>())
.add(new TimeValuePair(j, values[0]));
originData
.computeIfAbsent(new PartialPath("root.sg.d1.s1"), k -> new ArrayList<>())
.add(new TimeValuePair(j, values[1]));
originData.computeIfAbsent(new PartialPath("root.sg.d1.s2"), k -> null);
originData.computeIfAbsent(new PartialPath("root.sg.d1.s3"), k -> null);
originData
.computeIfAbsent(new PartialPath("root.sg.d1.s4"), k -> new ArrayList<>())
.add(new TimeValuePair(j, values[4]));
originData
.computeIfAbsent(new PartialPath("root.sg.d1.s5"), k -> new ArrayList<>())
.add(new TimeValuePair(j, values[5]));
alignedChunkWriter.write(j, values);
}
writer.startChunkGroup(new PlainDeviceID("root.sg.d1"));
alignedChunkWriter.writeToFileWriter(writer);
writer.endChunkGroup();
writer.endFile();
TsFileResource resource = new TsFileResource(writer.getFile(), TsFileResourceStatus.NORMAL);
resource
.getModFile()
.write(new Deletion(new PartialPath("root.sg.d1.*"), i * 100, i * 100 + 20));
resource.getModFile().close();
int finalI = i;
originData.forEach(
(x, y) ->
y.removeIf(
timeValuePair ->
timeValuePair.getTimestamp() >= finalI * 100
&& timeValuePair.getTimestamp() < finalI * 100 + 20));
resources.add(resource);
}
performer.setSourceFiles(resources);
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(resources, true);
performer.setTargetFiles(Collections.singletonList(targetResource));
performer.setSummary(new CompactionTaskSummary());
performer.perform();
Assert.assertTrue(targetResource.getTsFile().exists());
RestorableTsFileIOWriter checkWriter = new RestorableTsFileIOWriter(targetResource.getTsFile());
Assert.assertFalse(checkWriter.hasCrashed());
}
}