blob: 6b1ddb33c3b04c639180d83d0e63324812f04833 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TriggerExecutionException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.Planner;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.SchemaTestUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TTLTest {
private String sg1 = "root.TTL_SG1";
private String sg2 = "root.TTL_SG2";
private long ttl = 12345;
private DataRegion dataRegion;
private String s1 = "s1";
private String g1s1 = sg1 + IoTDBConstant.PATH_SEPARATOR + s1;
private long prevPartitionInterval;
@Before
public void setUp() throws MetadataException, DataRegionException {
prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(86400);
EnvironmentUtils.envSetUp();
createSchemas();
}
@After
public void tearDown() throws IOException, StorageEngineException {
dataRegion.syncCloseAllWorkingTsFileProcessors();
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
}
private void createSchemas() throws MetadataException, DataRegionException {
IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sg1));
IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sg2));
dataRegion =
new DataRegion(
IoTDBDescriptor.getInstance().getConfig().getSystemDir(),
sg1,
new DirectFlushPolicy(),
sg1);
IoTDB.schemaProcessor.createTimeseries(
new PartialPath(g1s1),
TSDataType.INT64,
TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED,
Collections.emptyMap());
}
@Test
public void testSetMetaTTL() throws IOException, MetadataException {
// exception is expected when setting ttl to a non-exist storage group
boolean caught = false;
try {
IoTDB.schemaProcessor.setTTL(new PartialPath(sg1 + ".notExist"), ttl);
} catch (MetadataException e) {
caught = true;
}
assertTrue(caught);
// normally set ttl
IoTDB.schemaProcessor.setTTL(new PartialPath(sg1), ttl);
IStorageGroupMNode mNode =
IoTDB.schemaProcessor.getStorageGroupNodeByPath(new PartialPath(sg1));
assertEquals(ttl, mNode.getDataTTL());
// default ttl
mNode = IoTDB.schemaProcessor.getStorageGroupNodeByPath(new PartialPath(sg2));
assertEquals(Long.MAX_VALUE, mNode.getDataTTL());
}
@Test
public void testTTLWrite()
throws WriteProcessException, QueryProcessException, IllegalPathException,
TriggerExecutionException {
InsertRowPlan plan = new InsertRowPlan();
plan.setDevicePath(new PartialPath(sg1));
plan.setTime(System.currentTimeMillis());
plan.setMeasurements(new String[] {"s1"});
plan.setDataTypes(new TSDataType[] {TSDataType.INT64});
plan.setValues(new Object[] {1L});
plan.setMeasurementMNodes(
new IMeasurementMNode[] {
MeasurementMNode.getMeasurementMNode(
null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null)
});
plan.transferType();
// ok without ttl
dataRegion.insert(plan);
dataRegion.setDataTTL(1000);
// with ttl
plan.setTime(System.currentTimeMillis() - 1001);
boolean caught = false;
try {
dataRegion.insert(plan);
} catch (OutOfTTLException e) {
caught = true;
}
assertTrue(caught);
plan.setTime(System.currentTimeMillis() - 900);
dataRegion.insert(plan);
}
private void prepareData()
throws WriteProcessException, QueryProcessException, IllegalPathException,
TriggerExecutionException {
InsertRowPlan plan = new InsertRowPlan();
plan.setDevicePath(new PartialPath(sg1));
plan.setTime(System.currentTimeMillis());
plan.setMeasurements(new String[] {"s1"});
plan.setDataTypes(new TSDataType[] {TSDataType.INT64});
plan.setValues(new Object[] {1L});
plan.setMeasurementMNodes(
new IMeasurementMNode[] {
MeasurementMNode.getMeasurementMNode(
null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null)
});
plan.transferType();
long initTime = System.currentTimeMillis();
// sequence data
for (int i = 1000; i < 2000; i++) {
plan.setTime(initTime - 2000 + i);
dataRegion.insert(plan);
if ((i + 1) % 300 == 0) {
dataRegion.syncCloseAllWorkingTsFileProcessors();
}
}
// unsequence data
for (int i = 0; i < 1000; i++) {
plan.setTime(initTime - 2000 + i);
dataRegion.insert(plan);
if ((i + 1) % 300 == 0) {
dataRegion.syncCloseAllWorkingTsFileProcessors();
}
}
}
@Test
public void testTTLRead()
throws IOException, WriteProcessException, StorageEngineException, QueryProcessException,
MetadataException {
prepareData();
// files before ttl
QueryDataSource dataSource =
dataRegion.query(
Collections.singletonList(
SchemaTestUtils.getMeasurementPath(sg1 + TsFileConstant.PATH_SEPARATOR + s1)),
sg1,
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
null);
List<TsFileResource> seqResource = dataSource.getSeqResources();
List<TsFileResource> unseqResource = dataSource.getUnseqResources();
assertEquals(4, seqResource.size());
assertEquals(4, unseqResource.size());
dataRegion.setDataTTL(500);
// files after ttl
dataSource =
dataRegion.query(
Collections.singletonList(
SchemaTestUtils.getMeasurementPath(sg1 + TsFileConstant.PATH_SEPARATOR + s1)),
sg1,
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
null);
seqResource = dataSource.getSeqResources();
unseqResource = dataSource.getUnseqResources();
assertTrue(seqResource.size() < 4);
assertEquals(0, unseqResource.size());
MeasurementPath path =
SchemaTestUtils.getMeasurementPath(sg1 + TsFileConstant.PATH_SEPARATOR + s1);
Set<String> allSensors = new HashSet<>();
allSensors.add(s1);
IBatchReader reader =
new SeriesRawDataBatchReader(
path,
TSDataType.INT64,
EnvironmentUtils.TEST_QUERY_CONTEXT,
seqResource,
unseqResource,
null,
null,
true);
int cnt = 0;
while (reader.hasNextBatch()) {
BatchData batchData = reader.nextBatch();
while (batchData.hasCurrent()) {
batchData.next();
cnt++;
}
}
reader.close();
// we cannot offer the exact number since when exactly ttl will be checked is unknown
assertTrue(cnt <= 1000);
dataRegion.setDataTTL(0);
dataSource =
dataRegion.query(
Collections.singletonList(
SchemaTestUtils.getMeasurementPath(sg1 + TsFileConstant.PATH_SEPARATOR + s1)),
sg1,
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
null);
seqResource = dataSource.getSeqResources();
unseqResource = dataSource.getUnseqResources();
assertEquals(0, seqResource.size());
assertEquals(0, unseqResource.size());
}
@Test
public void testTTLRemoval()
throws StorageEngineException, WriteProcessException, QueryProcessException,
IllegalPathException {
prepareData();
dataRegion.syncCloseAllWorkingTsFileProcessors();
// files before ttl
File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
File unseqDir = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), sg1);
List<File> seqFiles = new ArrayList<>();
for (File directory : seqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
if (file.isDirectory()) {
for (File tsfile : file.listFiles()) {
if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
seqFiles.add(file);
}
}
}
}
}
}
List<File> unseqFiles = new ArrayList<>();
for (File directory : unseqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
if (file.isDirectory()) {
for (File tsfile : file.listFiles()) {
if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
unseqFiles.add(file);
}
}
}
}
}
}
assertEquals(4, seqFiles.size());
assertEquals(4, unseqFiles.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
dataRegion.setDataTTL(500);
dataRegion.checkFilesTTL();
// files after ttl
seqFiles = new ArrayList<>();
for (File directory : seqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
if (file.isDirectory()) {
for (File tsfile : file.listFiles()) {
if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
seqFiles.add(file);
}
}
}
}
}
}
unseqFiles = new ArrayList<>();
for (File directory : unseqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
if (file.isDirectory()) {
for (File tsfile : file.listFiles()) {
if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
unseqFiles.add(file);
}
}
}
}
}
}
assertTrue(seqFiles.size() <= 2);
assertEquals(0, unseqFiles.size());
}
@Test
public void testParseSetTTL() throws QueryProcessException {
Planner planner = new Planner();
SetTTLPlan plan = (SetTTLPlan) planner.parseSQLToPhysicalPlan("SET TTL TO " + sg1 + " 10000");
assertEquals(sg1, plan.getStorageGroup().getFullPath());
assertEquals(10000, plan.getDataTTL());
plan = (SetTTLPlan) planner.parseSQLToPhysicalPlan("UNSET TTL TO " + sg2);
assertEquals(sg2, plan.getStorageGroup().getFullPath());
assertEquals(Long.MAX_VALUE, plan.getDataTTL());
}
@Test
public void testParseShowTTL() throws QueryProcessException {
Planner planner = new Planner();
ShowTTLPlan plan = (ShowTTLPlan) planner.parseSQLToPhysicalPlan("SHOW ALL TTL");
assertTrue(plan.getStorageGroups().isEmpty());
List<String> sgs = new ArrayList<>();
sgs.add("root.sg1");
sgs.add("root.sg2");
sgs.add("root.sg3");
plan = (ShowTTLPlan) planner.parseSQLToPhysicalPlan("SHOW TTL ON root.sg1,root.sg2,root.sg3");
assertEquals(
sgs,
plan.getStorageGroups().stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toList()));
}
@Test
public void testShowTTL()
throws IOException, QueryProcessException, QueryFilterOptimizationException,
StorageEngineException, MetadataException, InterruptedException {
IoTDB.schemaProcessor.setTTL(new PartialPath(sg1), ttl);
ShowTTLPlan plan = new ShowTTLPlan(Collections.emptyList());
PlanExecutor executor = new PlanExecutor();
QueryDataSet queryDataSet = executor.processQuery(plan, EnvironmentUtils.TEST_QUERY_CONTEXT);
while (queryDataSet.hasNext()) {
RowRecord rowRecord = queryDataSet.next();
String sg = rowRecord.getFields().get(0).getStringValue();
if (sg.equals(sg1)) {
assertEquals(ttl, rowRecord.getFields().get(1).getLongV());
} else if (sg.equals(sg2)) {
assertNull(rowRecord.getFields().get(1));
} else {
fail();
}
}
}
@Test
public void testTTLCleanFile()
throws WriteProcessException, QueryProcessException, IllegalPathException,
TriggerExecutionException {
prepareData();
dataRegion.syncCloseAllWorkingTsFileProcessors();
assertEquals(4, dataRegion.getSequenceFileList().size());
assertEquals(4, dataRegion.getUnSequenceFileList().size());
dataRegion.setDataTTL(0);
dataRegion.checkFilesTTL();
assertEquals(0, dataRegion.getSequenceFileList().size());
assertEquals(0, dataRegion.getUnSequenceFileList().size());
}
}