[IOTDB-4111][IOTDB-3461] Improve data type infer and auto create schema logic in schema fetcher (#6962)

[IOTDB-4111][IOTDB-3461] Improve data type infer and auto create schema logic in schema fetcher (#6962)
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 686b0f0..e289ec4 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -1924,7 +1924,10 @@
 
   @Override
   public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean aligned)
       throws MetadataException {
     throw new UnsupportedOperationException();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index afa6bbf..4ff65c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -51,6 +51,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 /**
  * This interface defines all interfaces and behaviours that one SchemaRegion should support and
@@ -358,7 +359,10 @@
   IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) throws MetadataException, IOException;
 
   DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean aligned)
       throws MetadataException;
   // endregion
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 0a8bac1..452f2c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -107,6 +107,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toList;
@@ -1749,7 +1750,10 @@
 
   @Override
   public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean aligned)
       throws MetadataException {
     try {
       List<MeasurementSchemaInfo> measurementSchemaInfoList = new ArrayList<>(measurements.length);
@@ -1763,10 +1767,11 @@
               internalAlignedCreateTimeseries(
                   devicePath,
                   Collections.singletonList(measurements[i]),
-                  Collections.singletonList(tsDataTypes[i]));
+                  Collections.singletonList(getDataType.apply(i)));
 
             } else {
-              internalCreateTimeseries(devicePath.concatNode(measurements[i]), tsDataTypes[i]);
+              internalCreateTimeseries(
+                  devicePath.concatNode(measurements[i]), getDataType.apply(i));
             }
             // after creating timeseries, the deviceMNode has been replaced by a new entityMNode
             deviceMNode = mtree.getNodeByPath(devicePath);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index c68aa3d..49fd195 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -104,6 +104,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toList;
@@ -1609,7 +1610,10 @@
 
   @Override
   public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean aligned)
       throws MetadataException {
     throw new UnsupportedOperationException();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index c3e97ec..77ff8e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -68,7 +68,9 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
 
@@ -114,7 +116,6 @@
               ClusterPartitionFetcher.getInstance(),
               this,
               config.getQueryTimeoutThreshold());
-      // TODO: (xingtanzjr) throw exception
       if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new RuntimeException(
             String.format(
@@ -172,24 +173,28 @@
 
   @Override
   public ISchemaTree fetchSchemaWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean isAligned) {
-
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean isAligned) {
     ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
-    Pair<List<String>, List<TSDataType>> missingMeasurements =
-        checkMissingMeasurements(schemaTree, devicePath, measurements, tsDataTypes);
+    List<Integer> indexOfMissingMeasurements =
+        checkMissingMeasurements(schemaTree, devicePath, measurements);
 
-    PathPatternTree patternTree = new PathPatternTree();
-    for (String measurement : missingMeasurements.left) {
-      patternTree.appendFullPath(devicePath, measurement);
-    }
-
-    if (patternTree.isEmpty()) {
+    if (indexOfMissingMeasurements.isEmpty()) {
       return schemaTree;
     }
 
+    PathPatternTree patternTree = new PathPatternTree();
+    for (int index : indexOfMissingMeasurements) {
+      patternTree.appendFullPath(devicePath, measurements[index]);
+    }
+
     ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
-    schemaTree.mergeSchemaTree(remoteSchemaTree);
-    schemaCache.put(remoteSchemaTree);
+    if (!remoteSchemaTree.isEmpty()) {
+      schemaTree.mergeSchemaTree(remoteSchemaTree);
+      schemaCache.put(remoteSchemaTree);
+    }
 
     if (!config.isAutoCreateSchemaEnabled()) {
       return schemaTree;
@@ -199,8 +204,9 @@
         checkAndAutoCreateMissingMeasurements(
             remoteSchemaTree,
             devicePath,
-            missingMeasurements.left.toArray(new String[0]),
-            missingMeasurements.right.toArray(new TSDataType[0]),
+            indexOfMissingMeasurements,
+            measurements,
+            getDataType,
             isAligned);
 
     schemaTree.mergeSchemaTree(missingSchemaTree);
@@ -218,17 +224,14 @@
 
     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
     PathPatternTree patternTree = new PathPatternTree();
+    List<List<Integer>> indexOfMissingMeasurementsList = new ArrayList<>(devicePathList.size());
     for (int i = 0; i < devicePathList.size(); i++) {
       schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), measurementsList.get(i)));
-      List<String> missingMeasurements =
-          checkMissingMeasurements(
-                  schemaTree,
-                  devicePathList.get(i),
-                  measurementsList.get(i),
-                  tsDataTypesList.get(i))
-              .left;
-      for (String measurement : missingMeasurements) {
-        patternTree.appendFullPath(devicePathList.get(i), measurement);
+      List<Integer> indexOfMissingMeasurements =
+          checkMissingMeasurements(schemaTree, devicePathList.get(i), measurementsList.get(i));
+      indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
+      for (int index : indexOfMissingMeasurements) {
+        patternTree.appendFullPath(devicePathList.get(i), measurementsList.get(i)[index]);
       }
     }
 
@@ -237,8 +240,10 @@
     }
 
     ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
-    schemaTree.mergeSchemaTree(remoteSchemaTree);
-    schemaCache.put(remoteSchemaTree);
+    if (!remoteSchemaTree.isEmpty()) {
+      schemaTree.mergeSchemaTree(remoteSchemaTree);
+      schemaCache.put(remoteSchemaTree);
+    }
 
     if (!config.isAutoCreateSchemaEnabled()) {
       return schemaTree;
@@ -246,12 +251,14 @@
 
     ClusterSchemaTree missingSchemaTree;
     for (int i = 0; i < devicePathList.size(); i++) {
+      int finalI = i;
       missingSchemaTree =
           checkAndAutoCreateMissingMeasurements(
               schemaTree,
               devicePathList.get(i),
+              indexOfMissingMeasurementsList.get(i),
               measurementsList.get(i),
-              tsDataTypesList.get(i),
+              index -> tsDataTypesList.get(finalI)[index],
               isAlignedList.get(i));
       schemaTree.mergeSchemaTree(missingSchemaTree);
       schemaCache.put(missingSchemaTree);
@@ -277,19 +284,30 @@
   private ClusterSchemaTree checkAndAutoCreateMissingMeasurements(
       ClusterSchemaTree schemaTree,
       PartialPath devicePath,
+      List<Integer> indexOfMissingMeasurements,
       String[] measurements,
-      TSDataType[] tsDataTypes,
+      Function<Integer, TSDataType> getDataType,
       boolean isAligned) {
-
-    Pair<List<String>, List<TSDataType>> checkResult =
-        checkMissingMeasurements(schemaTree, devicePath, measurements, tsDataTypes);
-
-    List<String> missingMeasurements = checkResult.left;
-    List<TSDataType> dataTypesOfMissingMeasurement = checkResult.right;
+    DeviceSchemaInfo deviceSchemaInfo =
+        schemaTree.searchDeviceSchemaInfo(
+            devicePath,
+            indexOfMissingMeasurements.stream()
+                .map(index -> measurements[index])
+                .collect(Collectors.toList()));
+    if (deviceSchemaInfo != null) {
+      List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
+      int removedCount = 0;
+      for (int i = 0, size = schemaList.size(); i < size; i++) {
+        if (schemaList.get(i) != null) {
+          indexOfMissingMeasurements.remove(i - removedCount);
+          removedCount++;
+        }
+      }
+    }
 
     ClusterSchemaTree reFetchedSchemaTree = new ClusterSchemaTree();
 
-    if (missingMeasurements.isEmpty()) {
+    if (indexOfMissingMeasurements.isEmpty()) {
       return reFetchedSchemaTree;
     }
 
@@ -297,8 +315,8 @@
     if (templateInfo != null) {
       Template template = templateInfo.left;
       boolean shouldActivateTemplate = false;
-      for (String missingMeasurement : missingMeasurements) {
-        if (template.hasSchema(missingMeasurement)) {
+      for (int index : indexOfMissingMeasurements) {
+        if (template.hasSchema(measurements[index])) {
           shouldActivateTemplate = true;
           break;
         }
@@ -306,16 +324,13 @@
 
       if (shouldActivateTemplate) {
         internalActivateTemplate(devicePath);
-        List<String> recheckedMissingMeasurements = new ArrayList<>();
-        List<TSDataType> recheckedMissingDataTypes = new ArrayList<>();
-        for (int i = 0; i < missingMeasurements.size(); i++) {
-          if (!template.hasSchema(missingMeasurements.get(i))) {
-            recheckedMissingMeasurements.add(missingMeasurements.get(i));
-            recheckedMissingDataTypes.add(dataTypesOfMissingMeasurement.get(i));
+        List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
+        for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
+          if (!template.hasSchema(measurements[i])) {
+            recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
           }
         }
-        missingMeasurements = recheckedMissingMeasurements;
-        dataTypesOfMissingMeasurement = recheckedMissingDataTypes;
+        indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements;
         for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
           schemaTree.appendSingleMeasurement(
               devicePath.concatNode(entry.getKey()),
@@ -324,12 +339,21 @@
               template.isDirectAligned());
         }
 
-        if (missingMeasurements.isEmpty()) {
+        if (indexOfMissingMeasurements.isEmpty()) {
           return schemaTree;
         }
       }
     }
 
+    List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
+    List<TSDataType> dataTypesOfMissingMeasurement =
+        new ArrayList<>(indexOfMissingMeasurements.size());
+    indexOfMissingMeasurements.forEach(
+        index -> {
+          missingMeasurements.add(measurements[index]);
+          dataTypesOfMissingMeasurement.add(getDataType.apply(index));
+        });
+
     schemaTree.mergeSchemaTree(
         internalCreateTimeseries(
             devicePath, missingMeasurements, dataTypesOfMissingMeasurement, isAligned));
@@ -337,28 +361,23 @@
     return schemaTree;
   }
 
-  private Pair<List<String>, List<TSDataType>> checkMissingMeasurements(
-      ISchemaTree schemaTree,
-      PartialPath devicePath,
-      String[] measurements,
-      TSDataType[] tsDataTypes) {
+  private List<Integer> checkMissingMeasurements(
+      ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
     DeviceSchemaInfo deviceSchemaInfo =
         schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
     if (deviceSchemaInfo == null) {
-      return new Pair<>(Arrays.asList(measurements), Arrays.asList(tsDataTypes));
+      return IntStream.range(0, measurements.length).boxed().collect(Collectors.toList());
     }
 
-    List<String> missingMeasurements = new ArrayList<>();
-    List<TSDataType> dataTypesOfMissingMeasurement = new ArrayList<>();
+    List<Integer> indexOfMissingMeasurements = new ArrayList<>();
     List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
     for (int i = 0; i < measurements.length; i++) {
       if (schemaList.get(i) == null) {
-        missingMeasurements.add(measurements[i]);
-        dataTypesOfMissingMeasurement.add(tsDataTypes[i]);
+        indexOfMissingMeasurements.add(i);
       }
     }
 
-    return new Pair<>(missingMeasurements, dataTypesOfMissingMeasurement);
+    return indexOfMissingMeasurements;
   }
 
   private ClusterSchemaTree internalCreateTimeseries(
@@ -400,6 +419,8 @@
           isAligned);
     }
 
+    schemaCache.put(schemaTree);
+
     return schemaTree;
   }
 
@@ -408,7 +429,6 @@
 
     ExecutionResult executionResult = executeStatement(statement);
 
-    // TODO: throw exception
     int statusCode = executionResult.status.getCode();
     if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return Collections.emptyList();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index af877f0..7c620a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -35,6 +35,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 public class FakeSchemaFetcherImpl implements ISchemaFetcher {
 
@@ -48,7 +49,10 @@
 
   @Override
   public ISchemaTree fetchSchemaWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean aligned) {
     return schemaTree;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
index 331acd6..e175f59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
@@ -28,6 +28,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 /**
  * This interface is used to fetch the metadata information required in execution plan generating.
@@ -37,7 +38,10 @@
   ISchemaTree fetchSchema(PathPatternTree patternTree);
 
   ISchemaTree fetchSchemaWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned);
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean aligned);
 
   ISchemaTree fetchSchemaListWithAutoCreate(
       List<PartialPath> devicePath,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
index 686a241..e14fbf1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
@@ -48,7 +48,7 @@
           SCHEMA_FETCHER.fetchSchemaWithAutoCreate(
               insertNode.getDevicePath(),
               insertNode.getMeasurements(),
-              insertNode.getDataTypes(),
+              insertNode::getDataType,
               insertNode.isAligned());
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index eea35fc..4e9cc1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -42,6 +42,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
@@ -81,21 +82,27 @@
 
   @Override
   public ISchemaTree fetchSchemaWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean aligned) {
     DeviceSchemaInfo deviceSchemaInfo =
-        getDeviceSchemaInfoWithAutoCreate(devicePath, measurements, tsDataTypes, aligned);
+        getDeviceSchemaInfoWithAutoCreate(devicePath, measurements, getDataType, aligned);
     DeviceGroupSchemaTree schemaTree = new DeviceGroupSchemaTree();
     schemaTree.addDeviceInfo(deviceSchemaInfo);
     return schemaTree;
   }
 
   private DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean aligned) {
     try {
       SchemaRegionId schemaRegionId = localConfigNode.getBelongedSchemaRegionId(devicePath);
       ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
       return schemaRegion.getDeviceSchemaInfoWithAutoCreate(
-          devicePath, measurements, tsDataTypes, aligned);
+          devicePath, measurements, getDataType, aligned);
     } catch (MetadataException e) {
       throw new RuntimeException(e);
     }
@@ -142,7 +149,8 @@
       }
 
       schemaTree.addDeviceInfo(
-          getDeviceSchemaInfoWithAutoCreate(entry.getKey(), measurements, tsDataTypes, isAligned));
+          getDeviceSchemaInfoWithAutoCreate(
+              entry.getKey(), measurements, index -> tsDataTypes[index], isAligned));
     }
 
     return schemaTree;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index d9808da..884e693 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -137,6 +137,10 @@
     return dataTypes;
   }
 
+  public TSDataType getDataType(int index) {
+    return dataTypes[index];
+  }
+
   public void setDataTypes(TSDataType[] dataTypes) {
     this.dataTypes = dataTypes;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index 22ee2b1..42ea49dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -139,6 +139,15 @@
     return dataTypes;
   }
 
+  @Override
+  public TSDataType getDataType(int index) {
+    if (isNeedInferType) {
+      return TypeInferenceUtils.getPredictedDataType(values[index], true);
+    } else {
+      return dataTypes[index];
+    }
+  }
+
   public Object[] getValues() {
     return values;
   }