blob: c07616a4ce5bc6c3d1acbba66bb954397f5c9a74 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.BooleanDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Inner-space compaction tests for sequence files in which the same measurement of one device was
 * written with different data types in different TsFiles.
 *
 * <p>Every test builds two sequence files (one point at timestamp 1, one point at timestamp 2, with
 * conflicting types for the same measurement names), compacts them with one of the compaction
 * performers, and then checks that the task reports success and that the device's start time in the
 * first remaining sequence file is 2.
 */
public class CompactionDataTypeNotMatchTest extends AbstractCompactionTest {
  /** Name of the runner thread captured at construction so it can be restored after each test. */
  private final String oldThreadName = Thread.currentThread().getName();

  /** The single device every generated file writes to. */
  private final IDeviceID device = new PlainDeviceID(COMPACTION_TEST_SG + ".d1");

  /**
   * Runs the shared compaction set-up and then renames the current thread to a compaction worker
   * name.
   *
   * <p>NOTE(review): the rename is presumably required because compaction code inspects the thread
   * name; confirm against the compaction implementation.
   */
  @Before
  public void setUp()
      throws IOException, WriteProcessException, MetadataException, InterruptedException {
    super.setUp();
    Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
  }

  /**
   * Runs the shared compaction tear-down and restores the original thread name.
   *
   * <p>The restore happens in a {@code finally} block so that a failing {@code super.tearDown()}
   * cannot leave the runner thread carrying the fake compaction-worker name into later tests.
   */
  @After
  public void tearDown() throws IOException, StorageEngineException {
    try {
      super.tearDown();
    } finally {
      Thread.currentThread().setName(oldThreadName);
    }
  }

  /** Non-aligned series, compacted with {@link ReadChunkCompactionPerformer}. */
  @Test
  public void testCompactNonAlignedSeriesWithReadChunkCompactionPerformer()
      throws IOException, WriteProcessException {
    generateDataTypeNotMatchFilesWithNonAlignedSeries();
    compactAndVerify(
        new InnerSpaceCompactionTask(
            0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0));
  }

  /** Non-aligned series, compacted with {@link FastCompactionPerformer}. */
  @Test
  public void testCompactNonAlignedSeriesWithFastCompactionPerformer()
      throws IOException, WriteProcessException {
    generateDataTypeNotMatchFilesWithNonAlignedSeries();
    compactAndVerify(
        new InnerSpaceCompactionTask(
            0, tsFileManager, seqResources, true, new FastCompactionPerformer(false), 0));
  }

  /** Non-aligned series, compacted with {@link ReadPointCompactionPerformer}. */
  @Test
  public void testCompactNonAlignedSeriesWithReadPointCompactionPerformer()
      throws IOException, WriteProcessException {
    generateDataTypeNotMatchFilesWithNonAlignedSeries();
    compactAndVerify(
        new InnerSpaceCompactionTask(
            0, tsFileManager, seqResources, true, new ReadPointCompactionPerformer(), 0));
  }

  /** Aligned series, compacted with {@link ReadChunkCompactionPerformer}. */
  @Test
  public void testCompactAlignedSeriesWithReadChunkCompactionPerformer()
      throws IOException, WriteProcessException {
    generateDataTypeNotMatchFilesWithAlignedSeries();
    compactAndVerify(
        new InnerSpaceCompactionTask(
            0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0));
  }

  /** Aligned series, compacted with {@link FastCompactionPerformer}. */
  @Test
  public void testCompactAlignedSeriesWithFastCompactionPerformer()
      throws IOException, WriteProcessException {
    generateDataTypeNotMatchFilesWithAlignedSeries();
    compactAndVerify(
        new InnerSpaceCompactionTask(
            0, tsFileManager, seqResources, true, new FastCompactionPerformer(false), 0));
  }

  /** Aligned series, compacted with {@link ReadPointCompactionPerformer}. */
  @Test
  public void testCompactAlignedSeriesWithReadPointCompactionPerformer()
      throws IOException, WriteProcessException {
    generateDataTypeNotMatchFilesWithAlignedSeries();
    compactAndVerify(
        new InnerSpaceCompactionTask(
            0, tsFileManager, seqResources, true, new ReadPointCompactionPerformer(), 0));
  }

  /**
   * Starts the given task and applies the checks shared by every test in this class: the task must
   * report success, the first sequence file is passed through the data-correctness validation, and
   * the device's start time in that file must be 2 (the timestamp written to the second source
   * file).
   *
   * @param task the fully configured compaction task to run
   */
  private void compactAndVerify(InnerSpaceCompactionTask task)
      throws IOException, WriteProcessException {
    Assert.assertTrue(task.start());
    TsFileResource compacted = tsFileManager.getTsFileList(true).get(0);
    // NOTE(review): any result returned by the validation call is not asserted here, matching the
    // original test; confirm whether it should be checked.
    TsFileResourceUtils.validateTsFileDataCorrectness(compacted);
    Assert.assertEquals(2, compacted.getStartTime(device));
  }

  /**
   * Creates two sequence files with a non-aligned series {@code s1} on the test device: the first
   * holds a BOOLEAN point at timestamp 1, the second an INT32 point at timestamp 2. Both resources
   * are appended to {@code seqResources}.
   */
  private void generateDataTypeNotMatchFilesWithNonAlignedSeries()
      throws IOException, WriteProcessException {
    MeasurementSchema booleanSchema = new MeasurementSchema("s1", TSDataType.BOOLEAN);
    TsFileResource firstResource = createCompactingSeqResource();
    try (TsFileWriter writer = new TsFileWriter(firstResource.getTsFile())) {
      writer.registerTimeseries(new Path(device), booleanSchema);
      TSRecord record = new TSRecord(1, device);
      record.addTuple(new BooleanDataPoint("s1", true));
      writer.write(record);
      writer.flushAllChunkGroups();
    }
    sealAndRegister(firstResource, 1);

    MeasurementSchema intSchema = new MeasurementSchema("s1", TSDataType.INT32);
    TsFileResource secondResource = createCompactingSeqResource();
    try (TsFileWriter writer = new TsFileWriter(secondResource.getTsFile())) {
      writer.registerTimeseries(new Path(device), intSchema);
      TSRecord record = new TSRecord(2, device);
      record.addTuple(new IntDataPoint("s1", 10));
      writer.write(record);
      writer.flushAllChunkGroups();
    }
    sealAndRegister(secondResource, 2);
  }

  /**
   * Creates two sequence files with aligned series {@code s1} and {@code s2} on the test device:
   * the first holds INT32 points at timestamp 1, the second BOOLEAN points at timestamp 2. Both
   * resources are appended to {@code seqResources}.
   */
  private void generateDataTypeNotMatchFilesWithAlignedSeries()
      throws IOException, WriteProcessException {
    List<MeasurementSchema> intSchemas = new ArrayList<>();
    intSchemas.add(new MeasurementSchema("s1", TSDataType.INT32));
    intSchemas.add(new MeasurementSchema("s2", TSDataType.INT32));
    TsFileResource firstResource = createCompactingSeqResource();
    try (TsFileWriter writer = new TsFileWriter(firstResource.getTsFile())) {
      writer.registerAlignedTimeseries(new Path(device), intSchemas);
      TSRecord record = new TSRecord(1, device);
      record.addTuple(new IntDataPoint("s1", 0));
      record.addTuple(new IntDataPoint("s2", 1));
      writer.writeAligned(record);
      writer.flushAllChunkGroups();
    }
    sealAndRegister(firstResource, 1);

    List<MeasurementSchema> booleanSchemas = new ArrayList<>();
    booleanSchemas.add(new MeasurementSchema("s1", TSDataType.BOOLEAN));
    booleanSchemas.add(new MeasurementSchema("s2", TSDataType.BOOLEAN));
    TsFileResource secondResource = createCompactingSeqResource();
    try (TsFileWriter writer = new TsFileWriter(secondResource.getTsFile())) {
      writer.registerAlignedTimeseries(new Path(device), booleanSchemas);
      TSRecord record = new TSRecord(2, device);
      record.addTuple(new BooleanDataPoint("s1", true));
      record.addTuple(new BooleanDataPoint("s2", true));
      writer.writeAligned(record);
      writer.flushAllChunkGroups();
    }
    sealAndRegister(secondResource, 2);
  }

  /** Creates an empty sequence file resource and marks it with the COMPACTING status. */
  private TsFileResource createCompactingSeqResource() throws IOException, WriteProcessException {
    TsFileResource resource = createEmptyFileAndResource(true);
    resource.setStatusForTest(TsFileResourceStatus.COMPACTING);
    return resource;
  }

  /**
   * Records {@code timestamp} as both start and end time of the test device in {@code resource},
   * serializes the resource, and appends it to {@code seqResources}.
   *
   * @param resource the resource whose TsFile has already been written and closed
   * @param timestamp the only timestamp contained in the file
   */
  private void sealAndRegister(TsFileResource resource, long timestamp)
      throws IOException, WriteProcessException {
    resource.updateStartTime(device, timestamp);
    resource.updateEndTime(device, timestamp);
    resource.serialize();
    seqResources.add(resource);
  }
}