[fix](httpv2) make http v2 and v1 interfaces compatible (#7848)

The http v2 TableSchemaAction now also returns an aggregation_type value for each column,
and the corresponding Flink/Spark connector code is updated to parse and carry that field.
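
For context, a minimal sketch (not part of this patch) of how the widened Field
constructor and the new getter are used, assuming the Flink connector classes
changed below are on the classpath; the column name and aggregation type values
are illustrative only and mirror the updated tests:

    import org.apache.doris.flink.rest.models.Field;
    import org.apache.doris.flink.rest.models.Schema;

    public class AggregationTypeSketch {
        public static void main(String[] args) {
            // Build a one-column schema the same way SchemaUtils.convertToSchema does,
            // but with a non-empty aggregation_type as the FE may now report it.
            Schema schema = new Schema(1);
            Field k6 = new Field("k6", "CHAR", "", 0, 0, "REPLACE_IF_NOT_NULL");
            schema.put(k6);

            // The new accessor exposes the aggregation type parsed from the
            // http v2 TableSchemaAction response ("" when the column has none).
            System.out.println(k6.getAggregation_type()); // prints REPLACE_IF_NOT_NULL
        }
    }
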
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
index 13bde01..5c64556 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
@@ -33,7 +33,7 @@
      */
     public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs) {
         Schema schema = new Schema(tscanColumnDescs.size());
-        tscanColumnDescs.stream().forEach(desc -> schema.put(new Field(desc.getName(), desc.getType().name(), "", 0, 0)));
+        tscanColumnDescs.stream().forEach(desc -> schema.put(new Field(desc.getName(), desc.getType().name(), "", 0, 0, "")));
         return schema;
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
index 9a58180..04341bf 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
@@ -25,16 +25,26 @@
     private String comment;
     private int precision;
     private int scale;
+    private String aggregation_type;
 
     public Field() {
     }
 
-    public Field(String name, String type, String comment, int precision, int scale) {
+    public Field(String name, String type, String comment, int precision, int scale, String aggregation_type) {
         this.name = name;
         this.type = type;
         this.comment = comment;
         this.precision = precision;
         this.scale = scale;
+        this.aggregation_type = aggregation_type;
+    }
+
+    public String getAggregation_type() {
+        return aggregation_type;
+    }
+
+    public void setAggregation_type(String aggregation_type) {
+        this.aggregation_type = aggregation_type;
     }
 
     public String getName() {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
index e274352..264e736 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
@@ -58,8 +58,8 @@
         this.properties = properties;
     }
 
-    public void put(String name, String type, String comment, int scale, int precision) {
-        properties.add(new Field(name, type, comment, scale, precision));
+    public void put(String name, String type, String comment, int scale, int precision, String aggregation_type) {
+        properties.add(new Field(name, type, comment, scale, precision, aggregation_type));
     }
 
     public void put(Field f) {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index ac19066..0f45aaa 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -232,8 +232,8 @@
                 + "\"name\":\"k4\",\"comment\":\"\"},{\"type\":\"FLOAT\",\"name\":\"k9\",\"comment\":\"\"},"
                 + "{\"type\":\"DOUBLE\",\"name\":\"k8\",\"comment\":\"\"},{\"type\":\"DATE\",\"name\":\"k10\","
                 + "\"comment\":\"\"},{\"type\":\"DATETIME\",\"name\":\"k11\",\"comment\":\"\"},"
-                + "{\"name\":\"k5\",\"scale\":\"9\",\"comment\":\"\","
-                + "\"type\":\"DECIMAL\",\"precision\":\"2\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\"}],"
+                + "{\"name\":\"k5\",\"scale\":\"0\",\"comment\":\"\","
+                + "\"type\":\"DECIMAL\",\"precision\":\"9\",\"aggregation_type\":\"\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\",\"aggregation_type\":\"REPLACE_IF_NOT_NULL\"}],"
                 + "\"status\":200}";
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Field.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Field.java
index 12cdab9..53c622b 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Field.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Field.java
@@ -26,14 +26,25 @@
     private int precision;
     private int scale;
 
+    private String aggregation_type;
+
     public Field() { }
 
-    public Field(String name, String type, String comment, int precision, int scale) {
+    public Field(String name, String type, String comment, int precision, int scale, String aggregation_type) {
         this.name = name;
         this.type = type;
         this.comment = comment;
         this.precision = precision;
         this.scale = scale;
+        this.aggregation_type = aggregation_type;
+    }
+
+    public String getAggregation_type() {
+        return aggregation_type;
+    }
+
+    public void setAggregation_type(String aggregation_type) {
+        this.aggregation_type = aggregation_type;
     }
 
     public String getName() {
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
index 285fe42..586a8ac 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
@@ -49,8 +49,8 @@
         this.properties = properties;
     }
 
-    public void put(String name, String type, String comment, int scale, int precision) {
-        properties.add(new Field(name, type, comment, scale, precision));
+    public void put(String name, String type, String comment, int scale, int precision, String aggregation_type) {
+        properties.add(new Field(name, type, comment, scale, precision, aggregation_type));
     }
 
     public void put(Field f) {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index ffba27c..f595092 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -103,7 +103,7 @@
    */
   def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc]): Schema = {
     val schema = new Schema(tscanColumnDescs.length)
-    tscanColumnDescs.foreach(desc => schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0)))
+    tscanColumnDescs.foreach(desc => schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0, "")))
     schema
   }
 }
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
index 5862dd0..8004590 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
@@ -120,12 +120,12 @@
 
     @Test
     public void testFeResponseToSchema() throws Exception {
-        String res = "{\"properties\":[{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\"},{\"name\":\"k5\","
-                + "\"scale\":\"0\",\"comment\":\"\",\"type\":\"DECIMALV2\",\"precision\":\"9\"}],\"status\":200}";
+        String res = "{\"properties\":[{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\",\"aggregation_type\":\"\"},{\"name\":\"k5\","
+                + "\"scale\":\"0\",\"comment\":\"\",\"type\":\"DECIMALV2\",\"precision\":\"9\",\"aggregation_type\":\"\"}],\"status\":200}";
         Schema expected = new Schema();
         expected.setStatus(200);
-        Field k1 = new Field("k1", "TINYINT", "", 0, 0);
-        Field k5 = new Field("k5", "DECIMALV2", "", 9, 0);
+        Field k1 = new Field("k1", "TINYINT", "", 0, 0, "");
+        Field k5 = new Field("k5", "DECIMALV2", "", 9, 0, "");
         expected.put(k1);
         expected.put(k5);
         Assert.assertEquals(expected, RestService.parseSchema(res, logger));
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
index ad0f150..ff65480 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
@@ -235,7 +235,7 @@
                 + "{\"type\":\"DOUBLE\",\"name\":\"k8\",\"comment\":\"\"},{\"type\":\"DATE\",\"name\":\"k10\","
                 + "\"comment\":\"\"},{\"type\":\"DATETIME\",\"name\":\"k11\",\"comment\":\"\"},"
                 + "{\"name\":\"k5\",\"scale\":\"0\",\"comment\":\"\","
-                + "\"type\":\"DECIMAL\",\"precision\":\"9\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\"}],"
+                + "\"type\":\"DECIMAL\",\"precision\":\"9\",\"aggregation_type\":\"\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\",\"aggregation_type\":\"REPLACE_IF_NOT_NULL\"}],"
                 + "\"status\":200}";
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
index 57df27f..97bbe0e 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
@@ -31,8 +31,8 @@
   def testConvertToStruct(): Unit = {
     val schema = new Schema
     schema.setStatus(200)
-    val k1 = new Field("k1", "TINYINT", "", 0, 0)
-    val k5 = new Field("k5", "BIGINT", "", 0, 0)
+    val k1 = new Field("k1", "TINYINT", "", 0, 0, "")
+    val k5 = new Field("k5", "BIGINT", "", 0, 0, "")
     schema.put(k1)
     schema.put(k5)
 
@@ -84,8 +84,8 @@
 
     val expected = new Schema
     expected.setStatus(0)
-    val ek1 = new Field("k1", "BOOLEAN", "", 0, 0)
-    val ek2 = new Field("k2", "DOUBLE", "", 0, 0)
+    val ek1 = new Field("k1", "BOOLEAN", "", 0, 0, "")
+    val ek2 = new Field("k2", "DOUBLE", "", 0, 0, "")
     expected.put(ek1)
     expected.put(ek2)