[IOTDB-3656]mpp load with auto create schema (#7293)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
index 3334155..d72e58e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
@@ -26,6 +26,7 @@
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.TsFileWriter;
@@ -72,8 +73,6 @@
originPartitionInterval = ConfigFactory.getConfig().getPartitionInterval();
ConfigFactory.getConfig().setPartitionInterval(PARTITION_INTERVAL);
EnvFactory.getEnv().initBeforeTest();
-
- registerSchema();
}
@After
@@ -159,6 +158,8 @@
@Test
public void testLoad() throws Exception {
+ registerSchema();
+
long writtenPoint1 = 0;
// device 0, device 1, sg 0
try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
@@ -199,7 +200,7 @@
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- statement.execute(String.format("load \"%s\"", tmpDir.getAbsolutePath()));
+ statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));
try (ResultSet resultSet =
statement.executeQuery("select count(*) from root.** group by level=1,2")) {
@@ -215,6 +216,84 @@
}
}
+ @Test
+ public void testLoadWithAutoRegister() throws Exception {
+ long writtenPoint1 = 0;
+ // device 0, device 1, sg 0
+ try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
+ generator.registerTimeseries(
+ new Path(SchemaConfig.DEVICE_0),
+ Arrays.asList(
+ SchemaConfig.MEASUREMENT_00,
+ SchemaConfig.MEASUREMENT_01,
+ SchemaConfig.MEASUREMENT_02,
+ SchemaConfig.MEASUREMENT_03));
+ generator.registerAlignedTimeseries(
+ new Path(SchemaConfig.DEVICE_1),
+ Arrays.asList(
+ SchemaConfig.MEASUREMENT_10,
+ SchemaConfig.MEASUREMENT_11,
+ SchemaConfig.MEASUREMENT_12,
+ SchemaConfig.MEASUREMENT_13));
+ generator.generateData(new Path(SchemaConfig.DEVICE_0), 10000, false);
+ generator.generateData(new Path(SchemaConfig.DEVICE_1), 10000, true);
+ writtenPoint1 = generator.getTotalNumber();
+ }
+
+ long writtenPoint2 = 0;
+ // device 2, device 3, device4, sg 1
+ try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
+ generator.registerTimeseries(
+ new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20));
+ generator.registerTimeseries(
+ new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30));
+ generator.registerAlignedTimeseries(
+ new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40));
+ generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false);
+ generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false);
+ generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true);
+ writtenPoint2 = generator.getTotalNumber();
+ }
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ statement.execute(
+ String.format("load \"%s\" sglevel=2,autoregister=true", tmpDir.getAbsolutePath()));
+
+ try (ResultSet resultSet =
+ statement.executeQuery("select count(*) from root.** group by level=1,2")) {
+ if (resultSet.next()) {
+ long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
+ Assert.assertEquals(writtenPoint1, sg1Count);
+ long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)");
+ Assert.assertEquals(writtenPoint2, sg2Count);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ }
+
+ Map<String, String> isAligned = new HashMap<>();
+ isAligned.put(SchemaConfig.DEVICE_0, "false");
+ isAligned.put(SchemaConfig.DEVICE_1, "true");
+ isAligned.put(SchemaConfig.DEVICE_2, "false");
+ isAligned.put(SchemaConfig.DEVICE_3, "false");
+ isAligned.put(SchemaConfig.DEVICE_4, "true");
+ try (ResultSet resultSet = statement.executeQuery("show devices")) {
+ int size = 0;
+ while (resultSet.next()) {
+ size += 1;
+ String device = resultSet.getString("devices");
+ Assert.assertEquals(isAligned.get(device), resultSet.getString("isAligned"));
+ }
+ Assert.assertEquals(isAligned.size(), size);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Parse result set error.");
+ }
+ }
+ }
+
private static class SchemaConfig {
private static final String STORAGE_GROUP_0 = "root.sg.test_0";
private static final String STORAGE_GROUP_1 = "root.sg.test_1";
@@ -222,39 +301,39 @@
// device 0, nonaligned, sg 0
private static final String DEVICE_0 = "root.sg.test_0.d_0";
private static final MeasurementSchema MEASUREMENT_00 =
- new MeasurementSchema("sensor_00", TSDataType.INT32);
+ new MeasurementSchema("sensor_00", TSDataType.INT32, TSEncoding.RLE);
private static final MeasurementSchema MEASUREMENT_01 =
- new MeasurementSchema("sensor_01", TSDataType.INT64);
+ new MeasurementSchema("sensor_01", TSDataType.INT64, TSEncoding.RLE);
private static final MeasurementSchema MEASUREMENT_02 =
- new MeasurementSchema("sensor_02", TSDataType.DOUBLE);
+ new MeasurementSchema("sensor_02", TSDataType.DOUBLE, TSEncoding.GORILLA);
private static final MeasurementSchema MEASUREMENT_03 =
- new MeasurementSchema("sensor_03", TSDataType.TEXT);
+ new MeasurementSchema("sensor_03", TSDataType.TEXT, TSEncoding.PLAIN);
// device 1, aligned, sg 0
private static final String DEVICE_1 = "root.sg.test_0.a_1";
private static final MeasurementSchema MEASUREMENT_10 =
- new MeasurementSchema("sensor_10", TSDataType.INT32);
+ new MeasurementSchema("sensor_10", TSDataType.INT32, TSEncoding.RLE);
private static final MeasurementSchema MEASUREMENT_11 =
- new MeasurementSchema("sensor_11", TSDataType.INT64);
+ new MeasurementSchema("sensor_11", TSDataType.INT64, TSEncoding.RLE);
private static final MeasurementSchema MEASUREMENT_12 =
- new MeasurementSchema("sensor_12", TSDataType.DOUBLE);
+ new MeasurementSchema("sensor_12", TSDataType.DOUBLE, TSEncoding.GORILLA);
private static final MeasurementSchema MEASUREMENT_13 =
- new MeasurementSchema("sensor_13", TSDataType.TEXT);
+ new MeasurementSchema("sensor_13", TSDataType.TEXT, TSEncoding.PLAIN);
// device 2, non aligned, sg 1
private static final String DEVICE_2 = "root.sg.test_1.d_2";
private static final MeasurementSchema MEASUREMENT_20 =
- new MeasurementSchema("sensor_20", TSDataType.INT32);
+ new MeasurementSchema("sensor_20", TSDataType.INT32, TSEncoding.RLE);
// device 3, non aligned, sg 1
private static final String DEVICE_3 = "root.sg.test_1.d_3";
private static final MeasurementSchema MEASUREMENT_30 =
- new MeasurementSchema("sensor_30", TSDataType.INT32);
+ new MeasurementSchema("sensor_30", TSDataType.INT32, TSEncoding.RLE);
// device 4, aligned, sg 1
private static final String DEVICE_4 = "root.sg.test_1.a_4";
private static final MeasurementSchema MEASUREMENT_40 =
- new MeasurementSchema("sensor_40", TSDataType.INT32);
+ new MeasurementSchema("sensor_40", TSDataType.INT32, TSEncoding.RLE);
}
public class TsFileGenerator implements AutoCloseable {
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 087d329..45214bb 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -1955,6 +1955,8 @@
PartialPath devicePath,
String[] measurements,
Function<Integer, TSDataType> getDataType,
+ TSEncoding[] encodings,
+ CompressionType[] compressionTypes,
boolean aligned)
throws MetadataException {
throw new UnsupportedOperationException();
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java b/server/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
new file mode 100644
index 0000000..1603c98
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class VerifyMetadataException extends IoTDBException {
+ public VerifyMetadataException(
+ String path, String compareInfo, String tsFileInfo, String tsFilePath, String IoTDBInfo) {
+ super(
+ String.format(
+ "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.",
+ path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo),
+ TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
+ }
+
+ public VerifyMetadataException(String message) {
+ super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 55a5ecb..dec6efc 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -44,7 +44,9 @@
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
import java.io.File;
@@ -393,6 +395,8 @@
PartialPath devicePath,
String[] measurements,
Function<Integer, TSDataType> getDataType,
+ TSEncoding[] encodings,
+ CompressionType[] compressionTypes,
boolean aligned)
throws MetadataException;
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 28cc5fe..97a77bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -1869,6 +1869,30 @@
Collections.emptyMap());
}
+ /** create timeseries ignoring PathAlreadyExistException */
+ private void internalCreateTimeseries(
+ PartialPath path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)
+ throws MetadataException {
+ if (encoding == null) {
+ encoding = getDefaultEncoding(dataType);
+ }
+ if (compressor == null) {
+ compressor = TSFileDescriptor.getInstance().getConfig().getCompressor();
+ }
+ createTimeseries(path, dataType, encoding, compressor, Collections.emptyMap());
+ }
+
+ /** create aligned timeseries ignoring PathAlreadyExistException */
+ private void internalAlignedCreateTimeseries(
+ PartialPath prefixPath,
+ List<String> measurements,
+ List<TSDataType> dataTypes,
+ List<TSEncoding> encodings,
+ List<CompressionType> compressors)
+ throws MetadataException {
+ createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings, compressors);
+ }
+
/** create aligned timeseries ignoring PathAlreadyExistException */
private void internalAlignedCreateTimeseries(
PartialPath prefixPath, List<String> measurements, List<TSDataType> dataTypes)
@@ -1887,6 +1911,8 @@
PartialPath devicePath,
String[] measurements,
Function<Integer, TSDataType> getDataType,
+ TSEncoding[] encodings,
+ CompressionType[] compressionTypes,
boolean aligned)
throws MetadataException {
try {
@@ -1898,14 +1924,23 @@
if (measurementMNode == null) {
if (config.isAutoCreateSchemaEnabled()) {
if (aligned) {
+ TSDataType dataType = getDataType.apply(i);
internalAlignedCreateTimeseries(
devicePath,
Collections.singletonList(measurements[i]),
- Collections.singletonList(getDataType.apply(i)));
-
+ Collections.singletonList(dataType),
+ Collections.singletonList(
+ encodings[i] == null ? getDefaultEncoding(dataType) : encodings[i]),
+ Collections.singletonList(
+ compressionTypes[i] == null
+ ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+ : compressionTypes[i]));
} else {
internalCreateTimeseries(
- devicePath.concatNode(measurements[i]), getDataType.apply(i));
+ devicePath.concatNode(measurements[i]),
+ getDataType.apply(i),
+ encodings[i],
+ compressionTypes[i]);
}
// after creating timeseries, the deviceMNode has been replaced by a new entityMNode
deviceMNode = mtree.getNodeByPath(devicePath);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 657fd9b..bf2ada9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -1663,6 +1663,8 @@
PartialPath devicePath,
String[] measurements,
Function<Integer, TSDataType> getDataType,
+ TSEncoding[] encodings,
+ CompressionType[] compressionTypes,
boolean aligned)
throws MetadataException {
throw new UnsupportedOperationException();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 980ccbe..c0df8bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -28,8 +28,12 @@
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.VerifyMetadataException;
import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException;
import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -42,6 +46,8 @@
import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
@@ -79,6 +85,7 @@
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
@@ -96,13 +103,23 @@
import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkTypeStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,6 +127,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -1436,29 +1454,49 @@
Map<String, Long> device2MinTime = new HashMap<>();
Map<String, Long> device2MaxTime = new HashMap<>();
+ Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>();
+ Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>();
+
+ // analyze tsfile metadata
for (File tsFile : loadTsFileStatement.getTsFiles()) {
try {
- TsFileResource resource = new TsFileResource(tsFile);
- FileLoaderUtils.loadOrGenerateResource(resource);
- for (String device : resource.getDevices()) {
- device2MinTime.put(
- device,
- Math.min(
- device2MinTime.getOrDefault(device, Long.MAX_VALUE),
- resource.getStartTime(device)));
- device2MaxTime.put(
- device,
- Math.max(
- device2MaxTime.getOrDefault(device, Long.MIN_VALUE),
- resource.getEndTime(device)));
- }
- } catch (IOException e) {
+ TsFileResource resource =
+ analyzeTsFile(
+ loadTsFileStatement,
+ tsFile,
+ device2MinTime,
+ device2MaxTime,
+ device2Schemas,
+ device2IsAligned);
+ loadTsFileStatement.addTsFileResource(resource);
+ } catch (Exception e) {
logger.error(String.format("Parse file %s to resource error.", tsFile.getPath()), e);
throw new SemanticException(
String.format("Parse file %s to resource error", tsFile.getPath()));
}
}
+ // auto create and verify schema
+ if (loadTsFileStatement.isVerifySchema() || loadTsFileStatement.isAutoCreateSchema()) {
+ try {
+ if (loadTsFileStatement.isVerifySchema()) {
+ verifyLoadingMeasurements(device2Schemas);
+ }
+ autoCreateSg(loadTsFileStatement.getSgLevel(), device2Schemas);
+ ISchemaTree schemaTree = autoCreateSchema(device2Schemas, device2IsAligned);
+ if (loadTsFileStatement.isVerifySchema()) {
+ verifySchema(schemaTree, device2Schemas, device2IsAligned);
+ }
+ } catch (Exception e) {
+ logger.error("Auto create or verify schema error.", e);
+ throw new SemanticException(
+ String.format(
+ "Auto create or verify schema error when executing statement %s.",
+ loadTsFileStatement));
+ }
+ }
+
+ // construct partition info
List<DataPartitionQueryParam> params = new ArrayList<>();
for (Map.Entry<String, Long> entry : device2MinTime.entrySet()) {
List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
@@ -1485,6 +1523,260 @@
return analysis;
}
+ private TsFileResource analyzeTsFile(
+ LoadTsFileStatement statement,
+ File tsFile,
+ Map<String, Long> device2MinTime,
+ Map<String, Long> device2MaxTime,
+ Map<String, Map<MeasurementSchema, File>> device2Schemas,
+ Map<String, Pair<Boolean, File>> device2IsAligned)
+ throws IOException, VerifyMetadataException {
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
+ Map<String, List<TimeseriesMetadata>> device2Metadata = reader.getAllTimeseriesMetadata(true);
+
+ if (statement.isAutoCreateSchema() || statement.isVerifySchema()) {
+ // construct schema
+ for (Map.Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
+ String device = entry.getKey();
+ List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
+ boolean isAligned = false;
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+ TSDataType dataType = timeseriesMetadata.getTSDataType();
+ if (!dataType.equals(TSDataType.VECTOR)) {
+ ChunkHeader chunkHeader =
+ getChunkHeaderByTimeseriesMetadata(reader, timeseriesMetadata);
+ MeasurementSchema measurementSchema =
+ new MeasurementSchema(
+ timeseriesMetadata.getMeasurementId(),
+ dataType,
+ chunkHeader.getEncodingType(),
+ chunkHeader.getCompressionType());
+ device2Schemas
+ .computeIfAbsent(device, o -> new HashMap<>())
+ .put(measurementSchema, tsFile);
+ } else {
+ isAligned = true;
+ }
+ }
+ boolean finalIsAligned = isAligned;
+ if (!device2IsAligned
+ .computeIfAbsent(device, o -> new Pair<>(finalIsAligned, tsFile))
+ .left
+ .equals(isAligned)) {
+ throw new VerifyMetadataException(
+ String.format(
+ "Device %s has different aligned definition in tsFile %s and other TsFile.",
+ device, tsFile.getParentFile()));
+ }
+ }
+ }
+
+ // construct TsFileResource
+ TsFileResource resource = new TsFileResource(tsFile);
+ FileLoaderUtils.updateTsFileResource(device2Metadata, resource);
+ resource.updatePlanIndexes(reader.getMinPlanIndex());
+ resource.updatePlanIndexes(reader.getMaxPlanIndex());
+
+ // construct device time range
+ for (String device : resource.getDevices()) {
+ device2MinTime.put(
+ device,
+ Math.min(
+ device2MinTime.getOrDefault(device, Long.MAX_VALUE),
+ resource.getStartTime(device)));
+ device2MaxTime.put(
+ device,
+ Math.max(
+ device2MaxTime.getOrDefault(device, Long.MIN_VALUE), resource.getEndTime(device)));
+ }
+
+ resource.setStatus(TsFileResourceStatus.CLOSED);
+ return resource;
+ }
+ }
+
+ private ChunkHeader getChunkHeaderByTimeseriesMetadata(
+ TsFileSequenceReader reader, TimeseriesMetadata timeseriesMetadata) throws IOException {
+ IChunkMetadata chunkMetadata = timeseriesMetadata.getChunkMetadataList().get(0);
+ reader.position(chunkMetadata.getOffsetOfChunkHeader());
+ return reader.readChunkHeader(reader.readMarker());
+ }
+
+ private void autoCreateSg(int sgLevel, Map<String, Map<MeasurementSchema, File>> device2Schemas)
+ throws VerifyMetadataException, LoadFileException, IllegalPathException {
+ sgLevel += 1; // e.g. "root.sg" means sgLevel = 1, "root.sg.test" means sgLevel=2
+ Set<PartialPath> sgSet = new HashSet<>();
+ for (String device : device2Schemas.keySet()) {
+ PartialPath devicePath = new PartialPath(device);
+
+ String[] nodes = devicePath.getNodes();
+ String[] sgNodes = new String[sgLevel];
+ if (nodes.length < sgLevel) {
+ throw new VerifyMetadataException(
+ String.format("Sg level %d is longer than device %s.", sgLevel, device));
+ }
+ for (int i = 0; i < sgLevel; i++) {
+ sgNodes[i] = nodes[i];
+ }
+ PartialPath sgPath = new PartialPath(sgNodes);
+ sgSet.add(sgPath);
+ }
+
+ for (PartialPath sgPath : sgSet) {
+ SetStorageGroupStatement statement = new SetStorageGroupStatement();
+ statement.setStorageGroupPath(sgPath);
+ executeSetStorageGroupStatement(statement);
+ }
+ }
+
+ private void executeSetStorageGroupStatement(Statement statement) throws LoadFileException {
+ long queryId = SessionManager.getInstance().requestQueryId(false);
+ ExecutionResult result =
+ Coordinator.getInstance()
+ .execute(
+ statement,
+ queryId,
+ null,
+ "",
+ partitionFetcher,
+ schemaFetcher,
+ IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code != TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode()) {
+ logger.error(String.format("Set Storage group error, statement: %s.", statement));
+ logger.error(String.format("Set storage group result status : %s.", result.status));
+ throw new LoadFileException(
+ String.format("Can not execute set storage group statement: %s", statement));
+ }
+ }
+
+ private ISchemaTree autoCreateSchema(
+ Map<String, Map<MeasurementSchema, File>> device2Schemas,
+ Map<String, Pair<Boolean, File>> device2IsAligned)
+ throws IllegalPathException {
+ List<PartialPath> deviceList = new ArrayList<>();
+ List<String[]> measurementList = new ArrayList<>();
+ List<TSDataType[]> dataTypeList = new ArrayList<>();
+ List<TSEncoding[]> encodingsList = new ArrayList<>();
+ List<CompressionType[]> compressionTypesList = new ArrayList<>();
+ List<Boolean> isAlignedList = new ArrayList<>();
+
+ for (Map.Entry<String, Map<MeasurementSchema, File>> entry : device2Schemas.entrySet()) {
+ int measurementSize = entry.getValue().size();
+ String[] measurements = new String[measurementSize];
+ TSDataType[] tsDataTypes = new TSDataType[measurementSize];
+ TSEncoding[] encodings = new TSEncoding[measurementSize];
+ CompressionType[] compressionTypes = new CompressionType[measurementSize];
+
+ int index = 0;
+ for (MeasurementSchema measurementSchema : entry.getValue().keySet()) {
+ measurements[index] = measurementSchema.getMeasurementId();
+ tsDataTypes[index] = measurementSchema.getType();
+ encodings[index] = measurementSchema.getEncodingType();
+ compressionTypes[index++] = measurementSchema.getCompressor();
+ }
+
+ deviceList.add(new PartialPath(entry.getKey()));
+ measurementList.add(measurements);
+ dataTypeList.add(tsDataTypes);
+ encodingsList.add(encodings);
+ compressionTypesList.add(compressionTypes);
+ isAlignedList.add(device2IsAligned.get(entry.getKey()).left);
+ }
+
+ return SchemaValidator.validate(
+ deviceList,
+ measurementList,
+ dataTypeList,
+ encodingsList,
+ compressionTypesList,
+ isAlignedList);
+ }
+
+ private void verifyLoadingMeasurements(Map<String, Map<MeasurementSchema, File>> device2Schemas)
+ throws VerifyMetadataException {
+ for (Map.Entry<String, Map<MeasurementSchema, File>> deviceEntry : device2Schemas.entrySet()) {
+ Map<String, MeasurementSchema> id2Schema = new HashMap<>();
+ Map<MeasurementSchema, File> schema2TsFile = deviceEntry.getValue();
+ for (Map.Entry<MeasurementSchema, File> entry : schema2TsFile.entrySet()) {
+ String measurementId = entry.getKey().getMeasurementId();
+ if (!id2Schema.containsKey(measurementId)) {
+ id2Schema.put(measurementId, entry.getKey());
+ } else {
+ MeasurementSchema conflictSchema = id2Schema.get(measurementId);
+ String msg =
+ String.format(
+ "Measurement %s Conflict, TsFile %s has measurement: %s, TsFile %s has measurement %s.",
+ deviceEntry.getKey() + measurementId,
+ entry.getValue().getPath(),
+ entry.getKey(),
+ schema2TsFile.get(conflictSchema).getPath(),
+ conflictSchema);
+ logger.error(msg);
+ throw new VerifyMetadataException(msg);
+ }
+ }
+ }
+ }
+
+ private void verifySchema(
+ ISchemaTree schemaTree,
+ Map<String, Map<MeasurementSchema, File>> device2Schemas,
+ Map<String, Pair<Boolean, File>> device2IsAligned)
+ throws VerifyMetadataException, IllegalPathException {
+ for (Map.Entry<String, Map<MeasurementSchema, File>> entry : device2Schemas.entrySet()) {
+ String device = entry.getKey();
+ MeasurementSchema[] tsFileSchemas =
+ entry.getValue().keySet().toArray(new MeasurementSchema[0]);
+ DeviceSchemaInfo schemaInfo =
+ schemaTree.searchDeviceSchemaInfo(
+ new PartialPath(device),
+ Arrays.stream(tsFileSchemas)
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList()));
+ if (schemaInfo.isAligned() != device2IsAligned.get(device).left) {
+ throw new VerifyMetadataException(
+ device,
+ "Is aligned",
+ device2IsAligned.get(device).left.toString(),
+ device2IsAligned.get(device).right.getPath(),
+ String.valueOf(schemaInfo.isAligned()));
+ }
+ List<MeasurementSchema> originSchemaList = schemaInfo.getMeasurementSchemaList();
+ int measurementSize = originSchemaList.size();
+ for (int j = 0; j < measurementSize; j++) {
+ MeasurementSchema originSchema = originSchemaList.get(j);
+ MeasurementSchema tsFileSchema = tsFileSchemas[j];
+ String measurementPath =
+ device + TsFileConstant.PATH_SEPARATOR + originSchema.getMeasurementId();
+ if (!tsFileSchema.getType().equals(originSchema.getType())) {
+ throw new VerifyMetadataException(
+ measurementPath,
+ "Datatype",
+ tsFileSchema.getType().name(),
+ entry.getValue().get(tsFileSchema).getPath(),
+ originSchema.getType().name());
+ }
+ if (!tsFileSchema.getEncodingType().equals(originSchema.getEncodingType())) {
+ throw new VerifyMetadataException(
+ measurementPath,
+ "Encoding",
+ tsFileSchema.getEncodingType().name(),
+ entry.getValue().get(tsFileSchema).getPath(),
+ originSchema.getEncodingType().name());
+ }
+ if (!tsFileSchema.getCompressor().equals(originSchema.getCompressor())) {
+ throw new VerifyMetadataException(
+ measurementPath,
+ "Compress type",
+ tsFileSchema.getCompressor().name(),
+ entry.getValue().get(tsFileSchema).getPath(),
+ originSchema.getCompressor().name());
+ }
+ }
+ }
+ }
+
@Override
public Analysis visitShowTimeSeries(
ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 643e8a3..d80ef21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -208,6 +208,8 @@
indexOfMissingMeasurements,
measurements,
getDataType,
+ null,
+ null,
isAligned);
schemaTree.mergeSchemaTree(missingSchemaTree);
@@ -225,7 +227,20 @@
List<String[]> measurementsList,
List<TSDataType[]> tsDataTypesList,
List<Boolean> isAlignedList) {
+ return fetchSchemaListWithAutoCreate(
+ devicePathList, measurementsList, tsDataTypesList, null, null, isAlignedList);
+ }
+
+ @Override
+ public ISchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePathList,
+ List<String[]> measurementsList,
+ List<TSDataType[]> tsDataTypesList,
+ List<TSEncoding[]> encodingsList,
+ List<CompressionType[]> compressionTypesList,
+ List<Boolean> isAlignedList) {
schemaCache.takeReadLock();
+
try {
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
PathPatternTree patternTree = new PathPatternTree();
@@ -264,6 +279,8 @@
indexOfMissingMeasurementsList.get(i),
measurementsList.get(i),
index -> tsDataTypesList.get(finalI)[index],
+ encodingsList == null ? null : encodingsList.get(i),
+ compressionTypesList == null ? null : compressionTypesList.get(i),
isAlignedList.get(i));
schemaTree.mergeSchemaTree(missingSchemaTree);
schemaCache.put(missingSchemaTree);
@@ -295,6 +312,8 @@
List<Integer> indexOfMissingMeasurements,
String[] measurements,
Function<Integer, TSDataType> getDataType,
+ TSEncoding[] encodings,
+ CompressionType[] compressionTypes,
boolean isAligned) {
DeviceSchemaInfo deviceSchemaInfo =
schemaTree.searchDeviceSchemaInfo(
@@ -356,15 +375,31 @@
List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
List<TSDataType> dataTypesOfMissingMeasurement =
new ArrayList<>(indexOfMissingMeasurements.size());
+ List<TSEncoding> encodingsOfMissingMeasurement =
+ new ArrayList<>(indexOfMissingMeasurements.size());
+ List<CompressionType> compressionTypesOfMissingMeasurement =
+ new ArrayList<>(indexOfMissingMeasurements.size());
indexOfMissingMeasurements.forEach(
index -> {
+ TSDataType tsDataType = getDataType.apply(index);
missingMeasurements.add(measurements[index]);
- dataTypesOfMissingMeasurement.add(getDataType.apply(index));
+ dataTypesOfMissingMeasurement.add(tsDataType);
+ encodingsOfMissingMeasurement.add(
+ encodings == null ? getDefaultEncoding(tsDataType) : encodings[index]);
+ compressionTypesOfMissingMeasurement.add(
+ compressionTypes == null
+ ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+ : compressionTypes[index]);
});
schemaTree.mergeSchemaTree(
internalCreateTimeseries(
- devicePath, missingMeasurements, dataTypesOfMissingMeasurement, isAligned));
+ devicePath,
+ missingMeasurements,
+ dataTypesOfMissingMeasurement,
+ encodingsOfMissingMeasurement,
+ compressionTypesOfMissingMeasurement,
+ isAligned));
return schemaTree;
}
@@ -392,15 +427,9 @@
PartialPath devicePath,
List<String> measurements,
List<TSDataType> tsDataTypes,
+ List<TSEncoding> encodings,
+ List<CompressionType> compressors,
boolean isAligned) {
-
- List<TSEncoding> encodings = new ArrayList<>();
- List<CompressionType> compressors = new ArrayList<>();
- for (TSDataType dataType : tsDataTypes) {
- encodings.add(getDefaultEncoding(dataType));
- compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
- }
-
List<MeasurementPath> measurementPathList =
executeInternalCreateTimeseriesStatement(
new InternalCreateTimeSeriesStatement(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 4e478b1..79208ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -28,7 +28,9 @@
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -112,6 +114,17 @@
}
@Override
+ public ISchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePath,
+ List<String[]> measurements,
+ List<TSDataType[]> tsDataTypes,
+ List<TSEncoding[]> encodings,
+ List<CompressionType[]> compressionTypes,
+ List<Boolean> aligned) {
+ return null;
+ }
+
+ @Override
public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
index 1e319ef..a688949 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
@@ -23,7 +23,9 @@
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.List;
@@ -49,6 +51,14 @@
List<TSDataType[]> tsDataTypes,
List<Boolean> aligned);
+ ISchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePath,
+ List<String[]> measurements,
+ List<TSDataType[]> tsDataTypes,
+ List<TSEncoding[]> encodings,
+ List<CompressionType[]> compressionTypes,
+ List<Boolean> aligned);
+
Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path);
Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
index bb34ce3..f40e82a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
@@ -25,7 +25,9 @@
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.BatchInsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import java.util.List;
@@ -67,8 +69,10 @@
List<PartialPath> devicePaths,
List<String[]> measurements,
List<TSDataType[]> dataTypes,
+ List<TSEncoding[]> encodings,
+ List<CompressionType[]> compressionTypes,
List<Boolean> isAlignedList) {
return SCHEMA_FETCHER.fetchSchemaListWithAutoCreate(
- devicePaths, measurements, dataTypes, isAlignedList);
+ devicePaths, measurements, dataTypes, encodings, compressionTypes, isAlignedList);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index e056f42..f4060e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -33,7 +33,9 @@
import org.apache.iotdb.db.mpp.common.schematree.DeviceGroupSchemaTree;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.ArrayList;
@@ -87,7 +89,13 @@
Function<Integer, TSDataType> getDataType,
boolean aligned) {
DeviceSchemaInfo deviceSchemaInfo =
- getDeviceSchemaInfoWithAutoCreate(devicePath, measurements, getDataType, aligned);
+ getDeviceSchemaInfoWithAutoCreate(
+ devicePath,
+ measurements,
+ getDataType,
+ new TSEncoding[measurements.length],
+ new CompressionType[measurements.length],
+ aligned);
DeviceGroupSchemaTree schemaTree = new DeviceGroupSchemaTree();
schemaTree.addDeviceInfo(deviceSchemaInfo);
return schemaTree;
@@ -97,12 +105,14 @@
PartialPath devicePath,
String[] measurements,
Function<Integer, TSDataType> getDataType,
+ TSEncoding[] encodings,
+ CompressionType[] compressionTypes,
boolean aligned) {
try {
SchemaRegionId schemaRegionId = localConfigNode.getBelongedSchemaRegionId(devicePath);
ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
return schemaRegion.getDeviceSchemaInfoWithAutoCreate(
- devicePath, measurements, getDataType, aligned);
+ devicePath, measurements, getDataType, encodings, compressionTypes, aligned);
} catch (MetadataException e) {
throw new RuntimeException(e);
}
@@ -114,6 +124,18 @@
List<String[]> measurementsList,
List<TSDataType[]> tsDataTypesList,
List<Boolean> isAlignedList) {
+ return fetchSchemaListWithAutoCreate(
+ devicePathList, measurementsList, tsDataTypesList, null, null, isAlignedList);
+ }
+
+ @Override
+ public ISchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePathList,
+ List<String[]> measurementsList,
+ List<TSDataType[]> tsDataTypesList,
+ List<TSEncoding[]> encodingsList,
+ List<CompressionType[]> compressionTypesList,
+ List<Boolean> isAlignedList) {
Map<PartialPath, List<Integer>> deviceMap = new HashMap<>();
for (int i = 0, size = devicePathList.size(); i < size; i++) {
deviceMap.computeIfAbsent(devicePathList.get(i), k -> new ArrayList<>()).add(i);
@@ -134,6 +156,8 @@
String[] measurements = new String[totalSize];
TSDataType[] tsDataTypes = new TSDataType[totalSize];
+ TSEncoding[] encodings = new TSEncoding[totalSize];
+ CompressionType[] compressionTypes = new CompressionType[totalSize];
int curPos = 0;
for (int index : entry.getValue()) {
@@ -145,12 +169,29 @@
measurementsList.get(index).length);
System.arraycopy(
tsDataTypesList.get(index), 0, tsDataTypes, curPos, tsDataTypesList.get(index).length);
+ if (encodingsList != null) {
+ System.arraycopy(
+ encodingsList.get(index), 0, encodings, curPos, encodingsList.get(index).length);
+ }
+ if (compressionTypesList != null) {
+ System.arraycopy(
+ compressionTypesList.get(index),
+ 0,
+ compressionTypes,
+ curPos,
+ compressionTypesList.get(index).length);
+ }
curPos += measurementsList.get(index).length;
}
schemaTree.addDeviceInfo(
getDeviceSchemaInfoWithAutoCreate(
- entry.getKey(), measurements, index -> tsDataTypes[index], isAligned));
+ entry.getKey(),
+ measurements,
+ index -> tsDataTypes[index],
+ encodings,
+ compressionTypes,
+ isAligned));
}
return schemaTree;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 8561ad6..7ceee7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -479,7 +479,7 @@
@Override
public PlanNode visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
return new LoadTsFileNode(
- context.getQueryId().genPlanNodeId(), loadTsFileStatement.getTsFiles());
+ context.getQueryId().genPlanNodeId(), loadTsFileStatement.getResources());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 91519fe..21edd2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -23,20 +23,16 @@
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.partition.DataPartition;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.load.AlignedChunkData;
import org.apache.iotdb.db.engine.load.ChunkData;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -85,15 +81,13 @@
super(id);
}
- public LoadSingleTsFileNode(PlanNodeId id, File tsFile) throws IOException {
+ public LoadSingleTsFileNode(PlanNodeId id, TsFileResource resource) {
super(id);
- this.tsFile = tsFile;
- this.resource = new TsFileResource(tsFile);
-
- FileLoaderUtils.loadOrGenerateResource(resource);
+ this.tsFile = resource.getTsFile();
+ this.resource = resource;
}
- public void checkIfNeedDecodeTsFile(DataPartition dataPartition) {
+ public void checkIfNeedDecodeTsFile(DataPartition dataPartition) throws IOException {
Set<TRegionReplicaSet> allRegionReplicaSet = new HashSet<>();
needDecodeTsFile = false;
for (String device : resource.getDevices()) {
@@ -105,6 +99,9 @@
allRegionReplicaSet.addAll(dataPartition.getAllDataRegionReplicaSetForOneDevice(device));
}
needDecodeTsFile = !isDispatchedToLocal(allRegionReplicaSet);
+ if (!needDecodeTsFile) {
+ resource.serialize();
+ }
}
private boolean isDispatchedToLocal(Set<TRegionReplicaSet> replicaSets) {
@@ -127,45 +124,6 @@
&& IoTDBDescriptor.getInstance().getConfig().getInternalPort() == endPoint.port;
}
- public void autoRegisterSchema()
- throws IOException, IllegalPathException { // TODO: only support sg level=1
- try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
- List<PartialPath> deviceList = new ArrayList<>();
- List<String[]> measurementList = new ArrayList<>();
- List<TSDataType[]> dataTypeList = new ArrayList<>();
- List<Boolean> isAlignedList = new ArrayList<>();
-
- Map<String, List<TimeseriesMetadata>> device2Metadata = reader.getAllTimeseriesMetadata(true);
- for (Map.Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
- deviceList.add(new PartialPath(entry.getKey()));
-
- List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
- boolean isAligned =
- timeseriesMetadataList.stream()
- .mapToInt(o -> o.getTSDataType().equals(TSDataType.VECTOR) ? 1 : 0)
- .sum()
- != 0;
- int measurementSize = timeseriesMetadataList.size() - (isAligned ? 1 : 0);
- String[] measurements = new String[measurementSize];
- TSDataType[] tsDataTypes = new TSDataType[measurementSize];
-
- int index = 0;
- for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
- TSDataType dataType = timeseriesMetadata.getTSDataType();
- if (!dataType.equals(TSDataType.VECTOR)) {
- measurements[index] = timeseriesMetadata.getMeasurementId();
- tsDataTypes[index++] = dataType;
- }
- }
- measurementList.add(measurements);
- dataTypeList.add(tsDataTypes);
- isAlignedList.add(isAligned);
- }
-
- SchemaValidator.validate(deviceList, measurementList, dataTypeList, isAlignedList);
- }
- }
-
public boolean needDecodeTsFile() {
return needDecodeTsFile;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
index c991a9c..c0f6f8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.load;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -30,7 +31,6 @@
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -39,15 +39,15 @@
public class LoadTsFileNode extends WritePlanNode {
private static final Logger logger = LoggerFactory.getLogger(LoadTsFileNode.class);
- private final List<File> tsFiles;
+ private final List<TsFileResource> resources;
public LoadTsFileNode(PlanNodeId id) {
this(id, new ArrayList<>());
}
- public LoadTsFileNode(PlanNodeId id, List<File> tsFiles) {
+ public LoadTsFileNode(PlanNodeId id, List<TsFileResource> resources) {
super(id);
- this.tsFiles = tsFiles;
+ this.resources = resources;
}
@Override
@@ -87,18 +87,16 @@
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
List<WritePlanNode> res = new ArrayList<>();
- for (File file : tsFiles) {
+ for (TsFileResource resource : resources) {
try {
- LoadSingleTsFileNode singleTsFileNode = new LoadSingleTsFileNode(getPlanNodeId(), file);
+ LoadSingleTsFileNode singleTsFileNode = new LoadSingleTsFileNode(getPlanNodeId(), resource);
singleTsFileNode.checkIfNeedDecodeTsFile(analysis.getDataPartitionInfo());
- singleTsFileNode.autoRegisterSchema();
-
if (singleTsFileNode.needDecodeTsFile()) {
singleTsFileNode.splitTsFileByDataPartition(analysis.getDataPartitionInfo());
}
res.add(singleTsFileNode);
} catch (Exception e) {
- logger.error(String.format("Parse TsFile %s error", file.getPath()), e);
+ logger.error(String.format("Parse TsFile %s error", resource.getTsFile().getPath()), e);
}
}
return res;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
index 4fb39ca..f5cd7e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
@@ -21,6 +21,7 @@
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -40,14 +41,16 @@
private boolean verifySchema;
private List<File> tsFiles;
+ private List<TsFileResource> resources;
public LoadTsFileStatement(String filePath) {
this.file = new File(filePath);
this.autoCreateSchema = true;
this.sgLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
this.verifySchema = true;
+ this.tsFiles = new ArrayList<>();
+ this.resources = new ArrayList<>();
- tsFiles = new ArrayList<>();
if (file.isFile()) {
tsFiles.add(file);
} else {
@@ -98,10 +101,30 @@
this.verifySchema = verifySchema;
}
+ public boolean isVerifySchema() {
+ return verifySchema;
+ }
+
+ public boolean isAutoCreateSchema() {
+ return autoCreateSchema;
+ }
+
+ public int getSgLevel() {
+ return sgLevel;
+ }
+
public List<File> getTsFiles() {
return tsFiles;
}
+ public void addTsFileResource(TsFileResource resource) {
+ resources.add(resource);
+ }
+
+ public List<TsFileResource> getResources() {
+ return resources;
+ }
+
@Override
public List<? extends PartialPath> getPaths() {
return Collections.emptyList();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java
index 28e95a0..799dd03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java
@@ -97,4 +97,20 @@
public Long getTimePartitionInterval() {
return timePartitionInterval;
}
+
+ @Override
+ public String toString() {
+ return "SetStorageGroupStatement{"
+ + "storageGroupPath="
+ + storageGroupPath
+ + ", ttl="
+ + ttl
+ + ", schemaReplicationFactor="
+ + schemaReplicationFactor
+ + ", dataReplicationFactor="
+ + dataReplicationFactor
+ + ", timePartitionInterval="
+ + timePartitionInterval
+ + '}';
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 35d8500..fd3e136 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -44,6 +44,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -68,8 +69,14 @@
public static void updateTsFileResource(
TsFileSequenceReader reader, TsFileResource tsFileResource) throws IOException {
- for (Entry<String, List<TimeseriesMetadata>> entry :
- reader.getAllTimeseriesMetadata(false).entrySet()) {
+ updateTsFileResource(reader.getAllTimeseriesMetadata(false), tsFileResource);
+ tsFileResource.updatePlanIndexes(reader.getMinPlanIndex());
+ tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex());
+ }
+
+ public static void updateTsFileResource(
+ Map<String, List<TimeseriesMetadata>> device2Metadata, TsFileResource tsFileResource) {
+ for (Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
tsFileResource.updateStartTime(
entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
@@ -77,8 +84,6 @@
entry.getKey(), timeseriesMetaData.getStatistics().getEndTime());
}
}
- tsFileResource.updatePlanIndexes(reader.getMinPlanIndex());
- tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex());
}
/**
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 8e714f4..1004d6e 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -72,7 +72,8 @@
CREATE_TEMPLATE_ERROR(340),
SYNC_FILE_REBASE(341),
SYNC_FILE_ERROR(342),
- MEASUREMENT_IN_BLACK_LIST(343),
+ VERIFY_METADATA_ERROR(343),
+ MEASUREMENT_IN_BLACK_LIST(344),
EXECUTE_STATEMENT_ERROR(400),
SQL_PARSE_ERROR(401),