blob: 0a3a44ee629aba1fc00ac29ad62c6ca79eff842e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.it;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.it.utils.TsFileGenerator;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.jdbc.IoTDBSQLException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.it.utils.TestUtils.assertNonQueryTestFail;
import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQuery;
import static org.apache.iotdb.db.it.utils.TestUtils.grantUserSeriesPrivilege;
import static org.apache.iotdb.db.it.utils.TestUtils.grantUserSystemPrivileges;
/**
 * Integration tests for the {@code load} statement: each test generates TsFiles into a temporary
 * directory, loads them through a JDBC connection and verifies the result with queries.
 *
 * <p>The class is tagged with both the {@link LocalStandaloneIT} and {@link ClusterIT} categories.
 */
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IOTDBLoadTsFileIT {
private static final Logger LOGGER = LoggerFactory.getLogger(IOTDBLoadTsFileIT.class);
// Time partition interval handed to the common config in setUp(); also used by the tests to
// derive the time gap between generated points (presumably in milliseconds - TODO confirm).
private static final long PARTITION_INTERVAL = 10 * 1000L;
// DataNode connection timeout handed to the config in setUp(): 300 seconds in milliseconds.
private static final int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(300);
// Value handed to setLoadTsFileAnalyzeSchemaMemorySizeInBytes in setUp(): 10 * 1024 bytes.
private static final long loadTsFileAnalyzeSchemaMemorySizeInBytes = 10 * 1024L;
// Temporary directory holding the generated TsFiles; created in setUp(), removed in tearDown().
private File tmpDir;
/**
 * Prepares one test run: creates the temporary TsFile directory, pushes the partition interval,
 * connection timeout and load-analysis memory size into the environment config, and finally
 * initializes the cluster environment.
 *
 * @throws Exception if the temporary directory cannot be created or the environment fails to
 *     start
 */
@Before
public void setUp() throws Exception {
  final java.nio.file.Path createdDir = Files.createTempDirectory("load");
  tmpDir = new File(createdDir.toUri());

  // common (cluster-wide) settings
  EnvFactory.getEnv()
      .getConfig()
      .getCommonConfig()
      .setTimePartitionInterval(PARTITION_INTERVAL);

  // DataNode settings
  EnvFactory.getEnv()
      .getConfig()
      .getDataNodeConfig()
      .setConnectionTimeoutInMS(connectionTimeoutInMS)
      .setLoadTsFileAnalyzeSchemaMemorySizeInBytes(loadTsFileAnalyzeSchemaMemorySizeInBytes);

  EnvFactory.getEnv().initClusterEnvironment();
}
/**
 * Cleans up after one test run: drops the test databases, cleans the cluster environment and
 * removes the temporary TsFile directory.
 *
 * <p>The steps are chained with {@code finally} so that a failure while dropping the databases
 * (for example when no connection can be obtained) does not leave the cluster environment or the
 * temporary files behind for the next test.
 *
 * @throws Exception if dropping the databases or cleaning the environment fails
 */
@After
public void tearDown() throws Exception {
  try {
    deleteSG();
  } finally {
    try {
      EnvFactory.getEnv().cleanClusterEnvironment();
    } finally {
      if (!deleteDir()) {
        LOGGER.error("Can not delete tmp dir for loading tsfile.");
      }
    }
  }
}
/**
 * Creates both test databases and registers every timeseries described by {@code SchemaConfig}:
 * devices 0, 2 and 3 as regular timeseries, devices 1 and 4 as aligned timeseries.
 *
 * @throws SQLException if the connection cannot be opened or a statement fails
 */
private void registerSchema() throws SQLException {
  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    for (String database :
        Arrays.asList(SchemaConfig.STORAGE_GROUP_0, SchemaConfig.STORAGE_GROUP_1)) {
      statement.execute("CREATE DATABASE " + database);
    }

    // device 0: four independent timeseries, created one by one
    final List<MeasurementSchema> device0Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03);
    for (MeasurementSchema measurement : device0Schemas) {
      statement.execute(convert2SQL(SchemaConfig.DEVICE_0, measurement));
    }

    // device 1: one aligned statement covering all four measurements
    final List<MeasurementSchema> device1Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13);
    statement.execute(convert2AlignedSQL(SchemaConfig.DEVICE_1, device1Schemas));

    // devices 2 and 3: a single regular timeseries each
    statement.execute(convert2SQL(SchemaConfig.DEVICE_2, SchemaConfig.MEASUREMENT_20));
    statement.execute(convert2SQL(SchemaConfig.DEVICE_3, SchemaConfig.MEASUREMENT_30));

    // device 4: aligned with a single measurement
    final List<MeasurementSchema> device4Schemas = Arrays.asList(SchemaConfig.MEASUREMENT_40);
    statement.execute(convert2AlignedSQL(SchemaConfig.DEVICE_4, device4Schemas));
  }
}
/**
 * Builds (and logs) the {@code create timeseries} statement for one measurement of a device.
 *
 * @param device full path of the device
 * @param schema measurement whose id and data type are used in the statement
 * @return the SQL text {@code create timeseries <full path> <data type>}
 */
private String convert2SQL(String device, MeasurementSchema schema) {
  final String seriesPath = new Path(device, schema.getMeasurementId(), true).getFullPath();
  final String dataType = schema.getType().name();
  final String createSql = String.format("create timeseries %s %s", seriesPath, dataType);
  LOGGER.info("schema execute: {}", createSql);
  return createSql;
}
/**
 * Builds (and logs) the {@code create aligned timeseries} statement for a device.
 *
 * <p>The measurements are rendered as a comma separated list of {@code <id> <data type>} pairs
 * inside parentheses. The closing parenthesis is always emitted, so an empty schema list yields
 * {@code device()} rather than an unterminated statement.
 *
 * @param device full path of the aligned device
 * @param schemas measurements of the device, in the order they should appear
 * @return the SQL text of the statement
 */
private String convert2AlignedSQL(String device, List<MeasurementSchema> schemas) {
  final StringBuilder builder =
      new StringBuilder("create aligned timeseries ").append(device).append('(');
  String separator = "";
  for (MeasurementSchema schema : schemas) {
    builder
        .append(separator)
        .append(schema.getMeasurementId())
        .append(' ')
        .append(schema.getType().name());
    // every measurement after the first one is preceded by a comma
    separator = ",";
  }
  builder.append(')');

  final String sql = builder.toString();
  LOGGER.info("schema execute: {}.", sql);
  return sql;
}
/**
 * Drops both test databases on a best-effort basis.
 *
 * <p>Each database is dropped independently: a test that only created one of them must not
 * prevent the other from being removed. {@link IoTDBSQLException}s are logged and otherwise
 * ignored, because a missing database is an expected situation during teardown.
 *
 * @throws SQLException if a non-IoTDB SQL error occurs (for example while opening the connection)
 */
private void deleteSG() throws SQLException {
  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    for (String database :
        Arrays.asList(SchemaConfig.STORAGE_GROUP_0, SchemaConfig.STORAGE_GROUP_1)) {
      try {
        statement.execute(String.format("delete database %s", database));
      } catch (IoTDBSQLException e) {
        LOGGER.info("Skip deleting database {}: {}", database, e.getMessage());
      }
    }
  } catch (IoTDBSQLException e) {
    // Kept from the original best-effort contract: connection-level IoTDB errors are not fatal.
    LOGGER.info("Failed to delete test databases: {}", e.getMessage());
  }
}
/**
 * Deletes the files directly inside the temporary directory and then the directory itself.
 *
 * <p>The method is called from teardown, which also runs when set-up failed, so it tolerates a
 * directory that was never created ({@code tmpDir == null}) and a {@code null} result from
 * {@link File#listFiles()} (returned when the path is not a directory or an I/O error occurs).
 *
 * @return {@code true} if there was nothing to delete or everything was deleted, {@code false}
 *     as soon as one deletion fails
 */
private boolean deleteDir() {
  if (tmpDir == null) {
    return true;
  }
  final File[] children = tmpDir.listFiles();
  if (children != null) {
    for (File child : children) {
      if (!child.delete()) {
        return false;
      }
    }
  }
  return tmpDir.delete();
}
/**
 * Registers the schema, loads two generated TsFiles (one per database) from the temporary
 * directory and checks that the per-database point counts match what was written. Afterwards a
 * timeseries is deleted to make sure deletion after loading does not dead-lock.
 */
@Test
public void testLoad() throws Exception {
  registerSchema();

  final long timeGap = PARTITION_INTERVAL / 10_000;
  final File sg0File = new File(tmpDir, "1-0-0-0.tsfile");
  final File sg1File = new File(tmpDir, "2-0-0-0.tsfile");

  // first file: device 0 (non-aligned) and device 1 (aligned), both in sg 0
  long sg0Points = 0;
  try (TsFileGenerator writer = new TsFileGenerator(sg0File)) {
    final List<MeasurementSchema> device0Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03);
    final List<MeasurementSchema> device1Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13);
    writer.registerTimeseries(SchemaConfig.DEVICE_0, device0Schemas);
    writer.registerAlignedTimeseries(SchemaConfig.DEVICE_1, device1Schemas);
    writer.generateData(SchemaConfig.DEVICE_0, 100000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_1, 100000, timeGap, true);
    sg0Points = writer.getTotalNumber();
  }

  // second file: devices 2 and 3 (non-aligned) and device 4 (aligned), all in sg 1
  long sg1Points = 0;
  try (TsFileGenerator writer = new TsFileGenerator(sg1File)) {
    writer.registerTimeseries(SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
    writer.registerTimeseries(SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
    writer.registerAlignedTimeseries(
        SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
    writer.generateData(SchemaConfig.DEVICE_2, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_3, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_4, 10000, timeGap, true);
    sg1Points = writer.getTotalNumber();
  }

  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    final String loadSql = String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath());
    statement.execute(loadSql);

    try (ResultSet counts =
        statement.executeQuery("select count(*) from root.** group by level=1,2")) {
      Assert.assertTrue("This ResultSet is empty.", counts.next());
      Assert.assertEquals(sg0Points, counts.getLong("count(root.sg.test_0.*.*)"));
      Assert.assertEquals(sg1Points, counts.getLong("count(root.sg.test_1.*.*)"));
    }
  }

  // try delete after loading. Expect no deadlock
  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    final String deleteSql =
        String.format(
            "delete timeseries %s.%s",
            SchemaConfig.DEVICE_0, SchemaConfig.MEASUREMENT_00.getMeasurementId());
    statement.execute(deleteSql);
  }
}
/**
 * Loads a TsFile for a device that has a device template set on it and verifies that
 *
 * <ul>
 *   <li>the number of loaded points in sg 0 equals the number of written points, and
 *   <li>template {@code t1} afterwards lists its original nodes ({@code lat}, {@code lon}) plus
 *       the four measurements of device 0.
 * </ul>
 */
@Test
public void testLoadWithExtendTemplate() throws Exception {
  final long timeGap = PARTITION_INTERVAL / 10_000;

  long writtenPoint1 = 0;
  // device 0, device 1, sg 0
  try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
    generator.registerTimeseries(
        SchemaConfig.DEVICE_0,
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03));
    generator.registerAlignedTimeseries(
        SchemaConfig.DEVICE_1,
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13));
    generator.generateData(SchemaConfig.DEVICE_0, 10000, timeGap, false);
    generator.generateData(SchemaConfig.DEVICE_1, 10000, timeGap, true);
    writtenPoint1 = generator.getTotalNumber();
  }

  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    statement.execute("create database root.sg.test_0");
    statement.execute(
        "create device template t1 (lat FLOAT encoding=Gorilla, lon FLOAT encoding=Gorilla)");
    statement.execute(" set device template t1 to root.sg.test_0.d_0");
    statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));

    try (ResultSet resultSet =
        statement.executeQuery("select count(*) from root.** group by level=1,2")) {
      Assert.assertTrue("This ResultSet is empty.", resultSet.next());
      Assert.assertEquals(writtenPoint1, resultSet.getLong("count(root.sg.test_0.*.*)"));
    }

    // Every node reported for the template must be expected exactly once.
    final Set<String> expectedNodes =
        new HashSet<>(
            Arrays.asList(
                "lat",
                "lon",
                SchemaConfig.MEASUREMENT_00.getMeasurementId(),
                SchemaConfig.MEASUREMENT_01.getMeasurementId(),
                SchemaConfig.MEASUREMENT_02.getMeasurementId(),
                SchemaConfig.MEASUREMENT_03.getMeasurementId()));
    try (ResultSet resultSet = statement.executeQuery("show nodes in schema template t1")) {
      while (resultSet.next()) {
        final String node = resultSet.getString(ColumnHeaderConstant.CHILD_NODES);
        Assert.assertTrue("Unexpected template node: " + node, expectedNodes.remove(node));
      }
      Assert.assertTrue("Missing template nodes: " + expectedNodes, expectedNodes.isEmpty());
    } catch (Exception e) {
      // Log through the test logger and keep the cause visible in the failure message
      // instead of printing the stack trace to stderr.
      LOGGER.error("Parse result set error.", e);
      Assert.fail("Parse result set error: " + e);
    }
  }
}
/**
 * Loads two TsFiles without registering any schema beforehand and verifies the per-database
 * point counts as well as the {@code IS_ALIGNED} flag that {@code show devices} reports for each
 * of the five devices.
 */
@Test
public void testLoadWithAutoRegister() throws Exception {
  final long timeGap = PARTITION_INTERVAL / 10_000;

  long writtenPoint1 = 0;
  // device 0, device 1, sg 0
  try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
    generator.registerTimeseries(
        SchemaConfig.DEVICE_0,
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03));
    generator.registerAlignedTimeseries(
        SchemaConfig.DEVICE_1,
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13));
    generator.generateData(SchemaConfig.DEVICE_0, 10000, timeGap, false);
    generator.generateData(SchemaConfig.DEVICE_1, 10000, timeGap, true);
    writtenPoint1 = generator.getTotalNumber();
  }

  long writtenPoint2 = 0;
  // device 2, device 3, device4, sg 1
  try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
    generator.registerTimeseries(
        SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
    generator.registerTimeseries(
        SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
    generator.registerAlignedTimeseries(
        SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
    generator.generateData(SchemaConfig.DEVICE_2, 10000, timeGap, false);
    generator.generateData(SchemaConfig.DEVICE_3, 10000, timeGap, false);
    generator.generateData(SchemaConfig.DEVICE_4, 10000, timeGap, true);
    writtenPoint2 = generator.getTotalNumber();
  }

  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));

    try (ResultSet resultSet =
        statement.executeQuery("select count(*) from root.** group by level=1,2")) {
      Assert.assertTrue("This ResultSet is empty.", resultSet.next());
      Assert.assertEquals(writtenPoint1, resultSet.getLong("count(root.sg.test_0.*.*)"));
      Assert.assertEquals(writtenPoint2, resultSet.getLong("count(root.sg.test_1.*.*)"));
    }

    // Expected IS_ALIGNED column value per device, as a string.
    final Map<String, String> isAligned = new HashMap<>();
    isAligned.put(SchemaConfig.DEVICE_0, "false");
    isAligned.put(SchemaConfig.DEVICE_1, "true");
    isAligned.put(SchemaConfig.DEVICE_2, "false");
    isAligned.put(SchemaConfig.DEVICE_3, "false");
    isAligned.put(SchemaConfig.DEVICE_4, "true");

    try (ResultSet resultSet = statement.executeQuery("show devices")) {
      int size = 0;
      while (resultSet.next()) {
        size += 1;
        final String device = resultSet.getString(ColumnHeaderConstant.DEVICE);
        Assert.assertEquals(
            "Unexpected alignment for device " + device,
            isAligned.get(device),
            resultSet.getString(ColumnHeaderConstant.IS_ALIGNED));
      }
      Assert.assertEquals(isAligned.size(), size);
    } catch (Exception e) {
      // Log through the test logger and keep the cause visible in the failure message
      // instead of printing the stack trace to stderr.
      LOGGER.error("Parse result set error.", e);
      Assert.fail("Parse result set error: " + e);
    }
  }
}
/**
 * Verifies the privileges a user needs for {@code load}: the statement is attempted as user
 * {@code test} after each grant and is expected to fail with the corresponding message until
 * WRITE_DATA, MANAGE_DATABASE and WRITE_SCHEMA have all been granted.
 */
@Test
public void testAuth() throws Exception {
  final String user = "test";
  final String password = "test123";
  createUser(user, password);

  final long timeGap = PARTITION_INTERVAL / 10_000;

  // device 0, device 1, sg 0
  final File sg0File = new File(tmpDir, "test1-0-0-0.tsfile");
  try (TsFileGenerator writer = new TsFileGenerator(sg0File)) {
    final List<MeasurementSchema> device0Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03);
    final List<MeasurementSchema> device1Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13);
    writer.registerTimeseries(SchemaConfig.DEVICE_0, device0Schemas);
    writer.registerAlignedTimeseries(SchemaConfig.DEVICE_1, device1Schemas);
    writer.generateData(SchemaConfig.DEVICE_0, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_1, 10000, timeGap, true);
  }

  // device 2, device 3, device4, sg 1
  final File sg1File = new File(tmpDir, "test2-0-0-0.tsfile");
  try (TsFileGenerator writer = new TsFileGenerator(sg1File)) {
    writer.registerTimeseries(SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
    writer.registerTimeseries(SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
    writer.registerAlignedTimeseries(
        SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
    writer.generateData(SchemaConfig.DEVICE_2, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_3, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_4, 10000, timeGap, true);
  }

  // the same statement is retried after every grant
  final String loadSql = String.format("load \"%s\" sgLevel=2", tmpDir.getAbsolutePath());

  // no privilege at all
  assertNonQueryTestFail(
      loadSql,
      "No permissions for this operation, please add privilege WRITE_DATA",
      user,
      password);

  // data privilege only
  grantUserSeriesPrivilege(user, PrivilegeType.WRITE_DATA, "root.**");
  assertNonQueryTestFail(
      loadSql,
      "No permissions for this operation, please add privilege MANAGE_DATABASE",
      user,
      password);

  // data + database privileges
  grantUserSystemPrivileges(user, PrivilegeType.MANAGE_DATABASE);
  assertNonQueryTestFail(
      loadSql,
      "Auto create or verify schema error when executing statement LoadTsFileStatement",
      user,
      password);

  // all three privileges: the load is executed without a failure assertion
  grantUserSystemPrivileges(user, PrivilegeType.WRITE_SCHEMA);
  executeNonQuery(loadSql, user, password);
}
/**
 * Checks the {@code onSuccess} option of {@code load}: with {@code none} the source file must
 * still exist after loading, with {@code delete} it must be gone. Point counts are verified after
 * each load.
 */
@Test
public void testLoadWithOnSuccess() throws Exception {
  final long timeGap = PARTITION_INTERVAL / 10_000;
  final String countSql = "select count(*) from root.** group by level=1,2";
  final File keptFile = new File(tmpDir, "1-0-0-0.tsfile");
  final File removedFile = new File(tmpDir, "2-0-0-0.tsfile");

  // device 0, device 1, sg 0
  long sg0Points = 0;
  try (TsFileGenerator writer = new TsFileGenerator(keptFile)) {
    final List<MeasurementSchema> device0Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03);
    final List<MeasurementSchema> device1Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13);
    writer.registerTimeseries(SchemaConfig.DEVICE_0, device0Schemas);
    writer.registerAlignedTimeseries(SchemaConfig.DEVICE_1, device1Schemas);
    writer.generateData(SchemaConfig.DEVICE_0, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_1, 10000, timeGap, true);
    sg0Points = writer.getTotalNumber();
  }

  // device 2, device 3, device4, sg 1
  long sg1Points = 0;
  try (TsFileGenerator writer = new TsFileGenerator(removedFile)) {
    writer.registerTimeseries(SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
    writer.registerTimeseries(SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
    writer.registerAlignedTimeseries(
        SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
    writer.generateData(SchemaConfig.DEVICE_2, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_3, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_4, 10000, timeGap, true);
    sg1Points = writer.getTotalNumber();
  }

  // onSuccess=none: only the first file is loaded and it stays on disk
  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    statement.execute(
        String.format("load \"%s\" sglevel=2 onSuccess=none", keptFile.getAbsolutePath()));
    try (ResultSet counts = statement.executeQuery(countSql)) {
      Assert.assertTrue("This ResultSet is empty.", counts.next());
      Assert.assertEquals(sg0Points, counts.getLong("count(root.sg.test_0.*.*)"));
    }
    Assert.assertTrue(keptFile.exists());
  }

  // onSuccess=delete: the second file is loaded and removed from disk
  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    statement.execute(
        String.format("load \"%s\" sglevel=2 onSuccess=delete", removedFile.getAbsolutePath()));
    try (ResultSet counts = statement.executeQuery(countSql)) {
      Assert.assertTrue("This ResultSet is empty.", counts.next());
      Assert.assertEquals(sg0Points, counts.getLong("count(root.sg.test_0.*.*)"));
      Assert.assertEquals(sg1Points, counts.getLong("count(root.sg.test_1.*.*)"));
    }
    Assert.assertFalse(removedFile.exists());
  }
}
/**
 * Inserts one point, confirms the {@code last} query returns it, then loads generated TsFiles and
 * checks that the {@code last} query for the same series now reports a time equal to
 * {@code PARTITION_INTERVAL}.
 */
@Test
public void testLoadWithLastCache() throws Exception {
  registerSchema();

  final String device = SchemaConfig.DEVICE_0;
  final String measurement = SchemaConfig.MEASUREMENT_00.getMeasurementId();
  final String lastSql = String.format("select last %s from %s", measurement, device);

  // write a single point through SQL and read it back with a last query
  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    statement.execute(
        String.format("insert into %s(timestamp, %s) values(100, 100)", device, measurement));
    try (ResultSet lastRow = statement.executeQuery(lastSql)) {
      Assert.assertTrue("This ResultSet is empty.", lastRow.next());
      Assert.assertEquals("100", lastRow.getString(ColumnHeaderConstant.VALUE));
    }
  }

  final long timeGap = PARTITION_INTERVAL / 10_000;
  final File sg0File = new File(tmpDir, "1-0-0-0.tsfile");
  final File sg1File = new File(tmpDir, "2-0-0-0.tsfile");

  // device 0, device 1, sg 0
  try (TsFileGenerator writer = new TsFileGenerator(sg0File)) {
    final List<MeasurementSchema> device0Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03);
    final List<MeasurementSchema> device1Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13);
    writer.registerTimeseries(SchemaConfig.DEVICE_0, device0Schemas);
    writer.registerAlignedTimeseries(SchemaConfig.DEVICE_1, device1Schemas);
    writer.generateData(SchemaConfig.DEVICE_0, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_1, 10000, timeGap, true);
  }

  // device 2, device 3, device4, sg 1
  try (TsFileGenerator writer = new TsFileGenerator(sg1File)) {
    writer.registerTimeseries(SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
    writer.registerTimeseries(SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
    writer.registerAlignedTimeseries(
        SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
    writer.generateData(SchemaConfig.DEVICE_2, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_3, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_4, 10000, timeGap, true);
  }

  // after loading, the last query must reflect the loaded data
  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));
    try (ResultSet lastRow = statement.executeQuery(lastSql)) {
      Assert.assertTrue("This ResultSet is empty.", lastRow.next());
      Assert.assertEquals(
          String.valueOf(PARTITION_INTERVAL), lastRow.getString(ColumnHeaderConstant.TIME));
    }
  }
}
/**
 * Loads a directory that contains one TsFile with the usual {@code 1-0-0-0.tsfile} name and one
 * named {@code a-1.tsfile}, and verifies the point counts of both databases.
 */
@Test
public void testLoadWithOnNonStandardTsFileName() throws Exception {
  final long timeGap = PARTITION_INTERVAL / 10_000;
  final File standardNameFile = new File(tmpDir, "1-0-0-0.tsfile");
  final File oddNameFile = new File(tmpDir, "a-1.tsfile");

  // device 0, device 1, sg 0
  long sg0Points = 0;
  try (TsFileGenerator writer = new TsFileGenerator(standardNameFile)) {
    final List<MeasurementSchema> device0Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03);
    final List<MeasurementSchema> device1Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13);
    writer.registerTimeseries(SchemaConfig.DEVICE_0, device0Schemas);
    writer.registerAlignedTimeseries(SchemaConfig.DEVICE_1, device1Schemas);
    writer.generateData(SchemaConfig.DEVICE_0, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_1, 10000, timeGap, true);
    sg0Points = writer.getTotalNumber();
  }

  // device 2, device 3, device4, sg 1
  long sg1Points = 0;
  try (TsFileGenerator writer = new TsFileGenerator(oddNameFile)) {
    writer.registerTimeseries(SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
    writer.registerTimeseries(SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
    writer.registerAlignedTimeseries(
        SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
    writer.generateData(SchemaConfig.DEVICE_2, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_3, 10000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_4, 10000, timeGap, true);
    sg1Points = writer.getTotalNumber();
  }

  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    final String loadSql = String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath());
    statement.execute(loadSql);

    try (ResultSet counts =
        statement.executeQuery("select count(*) from root.** group by level=1,2")) {
      Assert.assertTrue("This ResultSet is empty.", counts.next());
      Assert.assertEquals(sg0Points, counts.getLong("count(root.sg.test_0.*.*)"));
      Assert.assertEquals(sg1Points, counts.getLong("count(root.sg.test_1.*.*)"));
    }
  }
}
/**
 * Loads TsFiles for which the generator also produced deletions and verifies that the counted
 * points per database equal the totals reported by the generator.
 */
@Test
public void testLoadWithMods() throws Exception {
  final long timeGap = PARTITION_INTERVAL / 10_000;
  final File sg0File = new File(tmpDir, "1-0-0-0.tsfile");
  final File sg1File = new File(tmpDir, "2-0-0-0.tsfile");

  // device 0, device 1, sg 0: data first, then one deletion per device
  long sg0Points = 0;
  try (TsFileGenerator writer = new TsFileGenerator(sg0File)) {
    final List<MeasurementSchema> device0Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03);
    final List<MeasurementSchema> device1Schemas =
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13);
    writer.registerTimeseries(SchemaConfig.DEVICE_0, device0Schemas);
    writer.registerAlignedTimeseries(SchemaConfig.DEVICE_1, device1Schemas);
    writer.generateData(SchemaConfig.DEVICE_0, 100000, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_1, 100000, timeGap, true);
    writer.generateDeletion(SchemaConfig.DEVICE_0, 10);
    writer.generateDeletion(SchemaConfig.DEVICE_1, 10);
    sg0Points = writer.getTotalNumber();
  }

  // device 2, device 3, device4, sg 1: data and deletions are interleaved in two rounds
  long sg1Points = 0;
  try (TsFileGenerator writer = new TsFileGenerator(sg1File)) {
    writer.resetRandom(1000);
    writer.registerTimeseries(SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
    writer.registerTimeseries(SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
    writer.registerAlignedTimeseries(
        SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));

    // round one
    writer.generateData(SchemaConfig.DEVICE_2, 100, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_3, 100, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_4, 100, timeGap, true);
    writer.generateDeletion(SchemaConfig.DEVICE_2, 2);
    writer.generateDeletion(SchemaConfig.DEVICE_4, 2);

    // round two (device 3 is not touched again)
    writer.generateData(SchemaConfig.DEVICE_2, 100, timeGap, false);
    writer.generateData(SchemaConfig.DEVICE_4, 100, timeGap, true);
    writer.generateDeletion(SchemaConfig.DEVICE_2, 2);
    writer.generateDeletion(SchemaConfig.DEVICE_4, 2);
    sg1Points = writer.getTotalNumber();
  }

  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    final String loadSql = String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath());
    statement.execute(loadSql);

    try (ResultSet counts =
        statement.executeQuery("select count(*) from root.** group by level=1,2")) {
      Assert.assertTrue("This ResultSet is empty.", counts.next());
      Assert.assertEquals(sg0Points, counts.getLong("count(root.sg.test_0.*.*)"));
      Assert.assertEquals(sg1Points, counts.getLong("count(root.sg.test_1.*.*)"));
    }
  }
}
/**
 * Loads a TsFile for which no timeseries was registered and no data was generated, and expects
 * {@code show timeseries} to return no rows afterwards.
 */
@Test
public void testLoadWithEmptyTsFile() throws Exception {
  final File emptyFile = new File(tmpDir, "1-0-0-0.tsfile");
  try (TsFileGenerator ignored = new TsFileGenerator(emptyFile)) {
    // Nothing is written on purpose: the generator is only opened and closed.
  }

  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    final String loadSql = String.format("load \"%s\"", tmpDir.getAbsolutePath());
    statement.execute(loadSql);

    try (ResultSet timeseries = statement.executeQuery("show timeseries")) {
      Assert.assertFalse(timeseries.next());
    }
  }
}
/**
 * Generates a TsFile whose timestamps start at a nanosecond-scale value and expects the
 * {@code load} statement to be rejected with a message mentioning that the system timestamp
 * precision is ms.
 *
 * <p>The test fails explicitly when the load succeeds; without that, a missing precision check
 * would let the test pass without asserting anything.
 */
@Test
public void testLoadTsFileWithWrongTimestampPrecision() throws Exception {
  final long timeGap = PARTITION_INTERVAL / 10_000;
  // start timestamp on a nanosecond scale
  final long nanosecondStartTime = 1694689856546000000L;

  try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
    generator.registerTimeseries(
        SchemaConfig.DEVICE_0,
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03));
    generator.registerAlignedTimeseries(
        SchemaConfig.DEVICE_1,
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13));
    // generate ns timestamp
    generator.generateData(SchemaConfig.DEVICE_0, 100000, timeGap, false, nanosecondStartTime);
    generator.generateData(SchemaConfig.DEVICE_1, 100000, timeGap, true, nanosecondStartTime);
  }

  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    statement.execute(String.format("load \"%s\"", tmpDir.getAbsolutePath()));
    // Assert.fail throws an AssertionError, which the catch below does not intercept.
    Assert.fail("Loading a TsFile with nanosecond timestamps is expected to be rejected.");
  } catch (IoTDBSQLException e) {
    Assert.assertTrue(
        "Unexpected error message: " + e.getMessage(),
        e.getMessage().contains("Current system timestamp precision is ms"));
  }
}
/**
 * Registers the schema, loads a TsFile that contains a single point of one timeseries and checks
 * that the count reported for sg 0 equals the number of written points.
 */
@Test
public void testLoadLocally() throws Exception {
  registerSchema();

  final File singlePointFile = new File(tmpDir, "1-0-0-0.tsfile");
  final long timeGap = PARTITION_INTERVAL / 10_000;

  // device 0 only, one measurement, one point
  long sg0Points = 0;
  try (TsFileGenerator writer = new TsFileGenerator(singlePointFile)) {
    final List<MeasurementSchema> onlySchema =
        Collections.singletonList(SchemaConfig.MEASUREMENT_00);
    writer.registerTimeseries(SchemaConfig.DEVICE_0, onlySchema);
    writer.generateData(SchemaConfig.DEVICE_0, 1, timeGap, false);
    sg0Points = writer.getTotalNumber();
  }

  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    final String loadSql = String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath());
    statement.execute(loadSql);

    try (ResultSet counts =
        statement.executeQuery("select count(*) from root.** group by level=1,2")) {
      Assert.assertTrue("This ResultSet is empty.", counts.next());
      Assert.assertEquals(sg0Points, counts.getLong("count(root.sg.test_0.*.*)"));
    }
  }
}
/**
 * Constants describing the schema used by the tests: two databases, five devices and their
 * measurements. The class only holds constants, so it is final and cannot be instantiated.
 */
private static final class SchemaConfig {
  private static final String STORAGE_GROUP_0 = "root.sg.test_0";
  private static final String STORAGE_GROUP_1 = "root.sg.test_1";

  // device 0, nonaligned, sg 0
  private static final String DEVICE_0 = "root.sg.test_0.d_0";
  private static final MeasurementSchema MEASUREMENT_00 =
      new MeasurementSchema("sensor_00", TSDataType.INT32, TSEncoding.RLE);
  private static final MeasurementSchema MEASUREMENT_01 =
      new MeasurementSchema("sensor_01", TSDataType.INT64, TSEncoding.RLE);
  private static final MeasurementSchema MEASUREMENT_02 =
      new MeasurementSchema("sensor_02", TSDataType.DOUBLE, TSEncoding.GORILLA);
  private static final MeasurementSchema MEASUREMENT_03 =
      new MeasurementSchema("sensor_03", TSDataType.TEXT, TSEncoding.PLAIN);

  // device 1, aligned, sg 0
  private static final String DEVICE_1 = "root.sg.test_0.a_1";
  private static final MeasurementSchema MEASUREMENT_10 =
      new MeasurementSchema("sensor_10", TSDataType.INT32, TSEncoding.RLE);
  private static final MeasurementSchema MEASUREMENT_11 =
      new MeasurementSchema("sensor_11", TSDataType.INT64, TSEncoding.RLE);
  private static final MeasurementSchema MEASUREMENT_12 =
      new MeasurementSchema("sensor_12", TSDataType.DOUBLE, TSEncoding.GORILLA);
  private static final MeasurementSchema MEASUREMENT_13 =
      new MeasurementSchema("sensor_13", TSDataType.TEXT, TSEncoding.PLAIN);

  // device 2, non aligned, sg 1
  private static final String DEVICE_2 = "root.sg.test_1.d_2";
  private static final MeasurementSchema MEASUREMENT_20 =
      new MeasurementSchema("sensor_20", TSDataType.INT32, TSEncoding.RLE);

  // device 3, non aligned, sg 1
  private static final String DEVICE_3 = "root.sg.test_1.d_3";
  private static final MeasurementSchema MEASUREMENT_30 =
      new MeasurementSchema("sensor_30", TSDataType.INT32, TSEncoding.RLE);

  // device 4, aligned, sg 1
  private static final String DEVICE_4 = "root.sg.test_1.a_4";
  private static final MeasurementSchema MEASUREMENT_40 =
      new MeasurementSchema("sensor_40", TSDataType.INT32, TSEncoding.RLE);

  private SchemaConfig() {
    // constants holder, never instantiated
  }
}
}