[IOTDB-4111][IOTDB-3461] Improve data type infer and auto create schema logic in schema fetcher (#6962)
[IOTDB-4111][IOTDB-3461] Improve data type infer and auto create schema logic in schema fetcher (#6962)
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 686b0f0..e289ec4 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -1924,7 +1924,10 @@
@Override
public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+ PartialPath devicePath,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ boolean aligned)
throws MetadataException {
throw new UnsupportedOperationException();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index afa6bbf..4ff65c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
/**
* This interface defines all interfaces and behaviours that one SchemaRegion should support and
@@ -358,7 +359,10 @@
IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) throws MetadataException, IOException;
DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+ PartialPath devicePath,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ boolean aligned)
throws MetadataException;
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 0a8bac1..452f2c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -107,6 +107,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
@@ -1749,7 +1750,10 @@
@Override
public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+ PartialPath devicePath,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ boolean aligned)
throws MetadataException {
try {
List<MeasurementSchemaInfo> measurementSchemaInfoList = new ArrayList<>(measurements.length);
@@ -1763,10 +1767,11 @@
internalAlignedCreateTimeseries(
devicePath,
Collections.singletonList(measurements[i]),
- Collections.singletonList(tsDataTypes[i]));
+ Collections.singletonList(getDataType.apply(i)));
} else {
- internalCreateTimeseries(devicePath.concatNode(measurements[i]), tsDataTypes[i]);
+ internalCreateTimeseries(
+ devicePath.concatNode(measurements[i]), getDataType.apply(i));
}
// after creating timeseries, the deviceMNode has been replaced by a new entityMNode
deviceMNode = mtree.getNodeByPath(devicePath);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index c68aa3d..49fd195 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -104,6 +104,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
@@ -1609,7 +1610,10 @@
@Override
public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+ PartialPath devicePath,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ boolean aligned)
throws MetadataException {
throw new UnsupportedOperationException();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index c3e97ec..77ff8e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -68,7 +68,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
@@ -114,7 +116,6 @@
ClusterPartitionFetcher.getInstance(),
this,
config.getQueryTimeoutThreshold());
- // TODO: (xingtanzjr) throw exception
if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException(
String.format(
@@ -172,24 +173,28 @@
@Override
public ISchemaTree fetchSchemaWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean isAligned) {
-
+ PartialPath devicePath,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ boolean isAligned) {
ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
- Pair<List<String>, List<TSDataType>> missingMeasurements =
- checkMissingMeasurements(schemaTree, devicePath, measurements, tsDataTypes);
+ List<Integer> indexOfMissingMeasurements =
+ checkMissingMeasurements(schemaTree, devicePath, measurements);
- PathPatternTree patternTree = new PathPatternTree();
- for (String measurement : missingMeasurements.left) {
- patternTree.appendFullPath(devicePath, measurement);
- }
-
- if (patternTree.isEmpty()) {
+ if (indexOfMissingMeasurements.isEmpty()) {
return schemaTree;
}
+ PathPatternTree patternTree = new PathPatternTree();
+ for (int index : indexOfMissingMeasurements) {
+ patternTree.appendFullPath(devicePath, measurements[index]);
+ }
+
ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
- schemaTree.mergeSchemaTree(remoteSchemaTree);
- schemaCache.put(remoteSchemaTree);
+ if (!remoteSchemaTree.isEmpty()) {
+ schemaTree.mergeSchemaTree(remoteSchemaTree);
+ schemaCache.put(remoteSchemaTree);
+ }
if (!config.isAutoCreateSchemaEnabled()) {
return schemaTree;
@@ -199,8 +204,9 @@
checkAndAutoCreateMissingMeasurements(
remoteSchemaTree,
devicePath,
- missingMeasurements.left.toArray(new String[0]),
- missingMeasurements.right.toArray(new TSDataType[0]),
+ indexOfMissingMeasurements,
+ measurements,
+ getDataType,
isAligned);
schemaTree.mergeSchemaTree(missingSchemaTree);
@@ -218,17 +224,14 @@
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
PathPatternTree patternTree = new PathPatternTree();
+ List<List<Integer>> indexOfMissingMeasurementsList = new ArrayList<>(devicePathList.size());
for (int i = 0; i < devicePathList.size(); i++) {
schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), measurementsList.get(i)));
- List<String> missingMeasurements =
- checkMissingMeasurements(
- schemaTree,
- devicePathList.get(i),
- measurementsList.get(i),
- tsDataTypesList.get(i))
- .left;
- for (String measurement : missingMeasurements) {
- patternTree.appendFullPath(devicePathList.get(i), measurement);
+ List<Integer> indexOfMissingMeasurements =
+ checkMissingMeasurements(schemaTree, devicePathList.get(i), measurementsList.get(i));
+ indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
+ for (int index : indexOfMissingMeasurements) {
+ patternTree.appendFullPath(devicePathList.get(i), measurementsList.get(i)[index]);
}
}
@@ -237,8 +240,10 @@
}
ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
- schemaTree.mergeSchemaTree(remoteSchemaTree);
- schemaCache.put(remoteSchemaTree);
+ if (!remoteSchemaTree.isEmpty()) {
+ schemaTree.mergeSchemaTree(remoteSchemaTree);
+ schemaCache.put(remoteSchemaTree);
+ }
if (!config.isAutoCreateSchemaEnabled()) {
return schemaTree;
@@ -246,12 +251,14 @@
ClusterSchemaTree missingSchemaTree;
for (int i = 0; i < devicePathList.size(); i++) {
+ int finalI = i;
missingSchemaTree =
checkAndAutoCreateMissingMeasurements(
schemaTree,
devicePathList.get(i),
+ indexOfMissingMeasurementsList.get(i),
measurementsList.get(i),
- tsDataTypesList.get(i),
+ index -> tsDataTypesList.get(finalI)[index],
isAlignedList.get(i));
schemaTree.mergeSchemaTree(missingSchemaTree);
schemaCache.put(missingSchemaTree);
@@ -277,19 +284,30 @@
private ClusterSchemaTree checkAndAutoCreateMissingMeasurements(
ClusterSchemaTree schemaTree,
PartialPath devicePath,
+ List<Integer> indexOfMissingMeasurements,
String[] measurements,
- TSDataType[] tsDataTypes,
+ Function<Integer, TSDataType> getDataType,
boolean isAligned) {
-
- Pair<List<String>, List<TSDataType>> checkResult =
- checkMissingMeasurements(schemaTree, devicePath, measurements, tsDataTypes);
-
- List<String> missingMeasurements = checkResult.left;
- List<TSDataType> dataTypesOfMissingMeasurement = checkResult.right;
+ DeviceSchemaInfo deviceSchemaInfo =
+ schemaTree.searchDeviceSchemaInfo(
+ devicePath,
+ indexOfMissingMeasurements.stream()
+ .map(index -> measurements[index])
+ .collect(Collectors.toList()));
+ if (deviceSchemaInfo != null) {
+ List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
+ int removedCount = 0;
+ for (int i = 0, size = schemaList.size(); i < size; i++) {
+ if (schemaList.get(i) != null) {
+ indexOfMissingMeasurements.remove(i - removedCount);
+ removedCount++;
+ }
+ }
+ }
ClusterSchemaTree reFetchedSchemaTree = new ClusterSchemaTree();
- if (missingMeasurements.isEmpty()) {
+ if (indexOfMissingMeasurements.isEmpty()) {
return reFetchedSchemaTree;
}
@@ -297,8 +315,8 @@
if (templateInfo != null) {
Template template = templateInfo.left;
boolean shouldActivateTemplate = false;
- for (String missingMeasurement : missingMeasurements) {
- if (template.hasSchema(missingMeasurement)) {
+ for (int index : indexOfMissingMeasurements) {
+ if (template.hasSchema(measurements[index])) {
shouldActivateTemplate = true;
break;
}
@@ -306,16 +324,13 @@
if (shouldActivateTemplate) {
internalActivateTemplate(devicePath);
- List<String> recheckedMissingMeasurements = new ArrayList<>();
- List<TSDataType> recheckedMissingDataTypes = new ArrayList<>();
- for (int i = 0; i < missingMeasurements.size(); i++) {
- if (!template.hasSchema(missingMeasurements.get(i))) {
- recheckedMissingMeasurements.add(missingMeasurements.get(i));
- recheckedMissingDataTypes.add(dataTypesOfMissingMeasurement.get(i));
+ List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
+ for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
+ if (!template.hasSchema(measurements[i])) {
+ recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
}
}
- missingMeasurements = recheckedMissingMeasurements;
- dataTypesOfMissingMeasurement = recheckedMissingDataTypes;
+ indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements;
for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
schemaTree.appendSingleMeasurement(
devicePath.concatNode(entry.getKey()),
@@ -324,12 +339,21 @@
template.isDirectAligned());
}
- if (missingMeasurements.isEmpty()) {
+ if (indexOfMissingMeasurements.isEmpty()) {
return schemaTree;
}
}
}
+ List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
+ List<TSDataType> dataTypesOfMissingMeasurement =
+ new ArrayList<>(indexOfMissingMeasurements.size());
+ indexOfMissingMeasurements.forEach(
+ index -> {
+ missingMeasurements.add(measurements[index]);
+ dataTypesOfMissingMeasurement.add(getDataType.apply(index));
+ });
+
schemaTree.mergeSchemaTree(
internalCreateTimeseries(
devicePath, missingMeasurements, dataTypesOfMissingMeasurement, isAligned));
@@ -337,28 +361,23 @@
return schemaTree;
}
- private Pair<List<String>, List<TSDataType>> checkMissingMeasurements(
- ISchemaTree schemaTree,
- PartialPath devicePath,
- String[] measurements,
- TSDataType[] tsDataTypes) {
+ private List<Integer> checkMissingMeasurements(
+ ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
DeviceSchemaInfo deviceSchemaInfo =
schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
if (deviceSchemaInfo == null) {
- return new Pair<>(Arrays.asList(measurements), Arrays.asList(tsDataTypes));
+ return IntStream.range(0, measurements.length).boxed().collect(Collectors.toList());
}
- List<String> missingMeasurements = new ArrayList<>();
- List<TSDataType> dataTypesOfMissingMeasurement = new ArrayList<>();
+ List<Integer> indexOfMissingMeasurements = new ArrayList<>();
List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
for (int i = 0; i < measurements.length; i++) {
if (schemaList.get(i) == null) {
- missingMeasurements.add(measurements[i]);
- dataTypesOfMissingMeasurement.add(tsDataTypes[i]);
+ indexOfMissingMeasurements.add(i);
}
}
- return new Pair<>(missingMeasurements, dataTypesOfMissingMeasurement);
+ return indexOfMissingMeasurements;
}
private ClusterSchemaTree internalCreateTimeseries(
@@ -400,6 +419,8 @@
isAligned);
}
+ schemaCache.put(schemaTree);
+
return schemaTree;
}
@@ -408,7 +429,6 @@
ExecutionResult executionResult = executeStatement(statement);
- // TODO: throw exception
int statusCode = executionResult.status.getCode();
if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return Collections.emptyList();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index af877f0..7c620a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
public class FakeSchemaFetcherImpl implements ISchemaFetcher {
@@ -48,7 +49,10 @@
@Override
public ISchemaTree fetchSchemaWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
+ PartialPath devicePath,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ boolean aligned) {
return schemaTree;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
index 331acd6..e175f59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
/**
* This interface is used to fetch the metadata information required in execution plan generating.
@@ -37,7 +38,10 @@
ISchemaTree fetchSchema(PathPatternTree patternTree);
ISchemaTree fetchSchemaWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned);
+ PartialPath devicePath,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ boolean aligned);
ISchemaTree fetchSchemaListWithAutoCreate(
List<PartialPath> devicePath,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
index 686a241..e14fbf1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
@@ -48,7 +48,7 @@
SCHEMA_FETCHER.fetchSchemaWithAutoCreate(
insertNode.getDevicePath(),
insertNode.getMeasurements(),
- insertNode.getDataTypes(),
+ insertNode::getDataType,
insertNode.isAligned());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index eea35fc..4e9cc1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -42,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
public class StandaloneSchemaFetcher implements ISchemaFetcher {
@@ -81,21 +82,27 @@
@Override
public ISchemaTree fetchSchemaWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
+ PartialPath devicePath,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ boolean aligned) {
DeviceSchemaInfo deviceSchemaInfo =
- getDeviceSchemaInfoWithAutoCreate(devicePath, measurements, tsDataTypes, aligned);
+ getDeviceSchemaInfoWithAutoCreate(devicePath, measurements, getDataType, aligned);
DeviceGroupSchemaTree schemaTree = new DeviceGroupSchemaTree();
schemaTree.addDeviceInfo(deviceSchemaInfo);
return schemaTree;
}
private DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
+ PartialPath devicePath,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ boolean aligned) {
try {
SchemaRegionId schemaRegionId = localConfigNode.getBelongedSchemaRegionId(devicePath);
ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
return schemaRegion.getDeviceSchemaInfoWithAutoCreate(
- devicePath, measurements, tsDataTypes, aligned);
+ devicePath, measurements, getDataType, aligned);
} catch (MetadataException e) {
throw new RuntimeException(e);
}
@@ -142,7 +149,8 @@
}
schemaTree.addDeviceInfo(
- getDeviceSchemaInfoWithAutoCreate(entry.getKey(), measurements, tsDataTypes, isAligned));
+ getDeviceSchemaInfoWithAutoCreate(
+ entry.getKey(), measurements, index -> tsDataTypes[index], isAligned));
}
return schemaTree;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index d9808da..884e693 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -137,6 +137,10 @@
return dataTypes;
}
+ public TSDataType getDataType(int index) {
+ return dataTypes[index];
+ }
+
public void setDataTypes(TSDataType[] dataTypes) {
this.dataTypes = dataTypes;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index 22ee2b1..42ea49dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -139,6 +139,15 @@
return dataTypes;
}
+ @Override
+ public TSDataType getDataType(int index) {
+ if (isNeedInferType) {
+ return TypeInferenceUtils.getPredictedDataType(values[index], true);
+ } else {
+ return dataTypes[index];
+ }
+ }
+
public Object[] getValues() {
return values;
}