[fix](httpv2) make http v2 and v1 interface compatible (#7848)
The HTTP v2 TableSchemaAction now includes aggregation_type in its response,
and the Flink/Spark connectors are updated to accept the new field.
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
index 13bde01..5c64556 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
@@ -33,7 +33,7 @@
*/
public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs) {
Schema schema = new Schema(tscanColumnDescs.size());
- tscanColumnDescs.stream().forEach(desc -> schema.put(new Field(desc.getName(), desc.getType().name(), "", 0, 0)));
+ tscanColumnDescs.stream().forEach(desc -> schema.put(new Field(desc.getName(), desc.getType().name(), "", 0, 0, "")));
return schema;
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
index 9a58180..04341bf 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
@@ -25,16 +25,26 @@
private String comment;
private int precision;
private int scale;
+ private String aggregation_type;
public Field() {
}
- public Field(String name, String type, String comment, int precision, int scale) {
+ public Field(String name, String type, String comment, int precision, int scale, String aggregation_type) {
this.name = name;
this.type = type;
this.comment = comment;
this.precision = precision;
this.scale = scale;
+ this.aggregation_type = aggregation_type;
+ }
+
+ public String getAggregation_type() {
+ return aggregation_type;
+ }
+
+ public void setAggregation_type(String aggregation_type) {
+ this.aggregation_type = aggregation_type;
}
public String getName() {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
index e274352..264e736 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
@@ -58,8 +58,8 @@
this.properties = properties;
}
- public void put(String name, String type, String comment, int scale, int precision) {
- properties.add(new Field(name, type, comment, scale, precision));
+ public void put(String name, String type, String comment, int scale, int precision, String aggregation_type) {
+ properties.add(new Field(name, type, comment, scale, precision, aggregation_type));
}
public void put(Field f) {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index ac19066..0f45aaa 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -232,8 +232,8 @@
+ "\"name\":\"k4\",\"comment\":\"\"},{\"type\":\"FLOAT\",\"name\":\"k9\",\"comment\":\"\"},"
+ "{\"type\":\"DOUBLE\",\"name\":\"k8\",\"comment\":\"\"},{\"type\":\"DATE\",\"name\":\"k10\","
+ "\"comment\":\"\"},{\"type\":\"DATETIME\",\"name\":\"k11\",\"comment\":\"\"},"
- + "{\"name\":\"k5\",\"scale\":\"9\",\"comment\":\"\","
- + "\"type\":\"DECIMAL\",\"precision\":\"2\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\"}],"
+ + "{\"name\":\"k5\",\"scale\":\"0\",\"comment\":\"\","
+ + "\"type\":\"DECIMAL\",\"precision\":\"9\",\"aggregation_type\":\"\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\",\"aggregation_type\":\"REPLACE_IF_NOT_NULL\"}],"
+ "\"status\":200}";
Schema schema = RestService.parseSchema(schemaStr, logger);
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Field.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Field.java
index 12cdab9..53c622b 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Field.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Field.java
@@ -26,14 +26,25 @@
private int precision;
private int scale;
+ private String aggregation_type;
+
public Field() { }
- public Field(String name, String type, String comment, int precision, int scale) {
+ public Field(String name, String type, String comment, int precision, int scale, String aggregation_type) {
this.name = name;
this.type = type;
this.comment = comment;
this.precision = precision;
this.scale = scale;
+ this.aggregation_type = aggregation_type;
+ }
+
+ public String getAggregation_type() {
+ return aggregation_type;
+ }
+
+ public void setAggregation_type(String aggregation_type) {
+ this.aggregation_type = aggregation_type;
}
public String getName() {
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
index 285fe42..586a8ac 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
@@ -49,8 +49,8 @@
this.properties = properties;
}
- public void put(String name, String type, String comment, int scale, int precision) {
- properties.add(new Field(name, type, comment, scale, precision));
+ public void put(String name, String type, String comment, int scale, int precision, String aggregation_type) {
+ properties.add(new Field(name, type, comment, scale, precision, aggregation_type));
}
public void put(Field f) {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index ffba27c..f595092 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -103,7 +103,7 @@
*/
def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc]): Schema = {
val schema = new Schema(tscanColumnDescs.length)
- tscanColumnDescs.foreach(desc => schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0)))
+ tscanColumnDescs.foreach(desc => schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0, "")))
schema
}
}
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
index 5862dd0..8004590 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
@@ -120,12 +120,12 @@
@Test
public void testFeResponseToSchema() throws Exception {
- String res = "{\"properties\":[{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\"},{\"name\":\"k5\","
- + "\"scale\":\"0\",\"comment\":\"\",\"type\":\"DECIMALV2\",\"precision\":\"9\"}],\"status\":200}";
+ String res = "{\"properties\":[{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\",\"aggregation_type\":\"\"},{\"name\":\"k5\","
+ + "\"scale\":\"0\",\"comment\":\"\",\"type\":\"DECIMALV2\",\"precision\":\"9\",\"aggregation_type\":\"\"}],\"status\":200}";
Schema expected = new Schema();
expected.setStatus(200);
- Field k1 = new Field("k1", "TINYINT", "", 0, 0);
- Field k5 = new Field("k5", "DECIMALV2", "", 9, 0);
+ Field k1 = new Field("k1", "TINYINT", "", 0, 0, "");
+ Field k5 = new Field("k5", "DECIMALV2", "", 9, 0, "");
expected.put(k1);
expected.put(k5);
Assert.assertEquals(expected, RestService.parseSchema(res, logger));
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
index ad0f150..ff65480 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
@@ -235,7 +235,7 @@
+ "{\"type\":\"DOUBLE\",\"name\":\"k8\",\"comment\":\"\"},{\"type\":\"DATE\",\"name\":\"k10\","
+ "\"comment\":\"\"},{\"type\":\"DATETIME\",\"name\":\"k11\",\"comment\":\"\"},"
+ "{\"name\":\"k5\",\"scale\":\"0\",\"comment\":\"\","
- + "\"type\":\"DECIMAL\",\"precision\":\"9\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\"}],"
+ + "\"type\":\"DECIMAL\",\"precision\":\"9\",\"aggregation_type\":\"\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\",\"aggregation_type\":\"REPLACE_IF_NOT_NULL\"}],"
+ "\"status\":200}";
Schema schema = RestService.parseSchema(schemaStr, logger);
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
index 57df27f..97bbe0e 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
@@ -31,8 +31,8 @@
def testConvertToStruct(): Unit = {
val schema = new Schema
schema.setStatus(200)
- val k1 = new Field("k1", "TINYINT", "", 0, 0)
- val k5 = new Field("k5", "BIGINT", "", 0, 0)
+ val k1 = new Field("k1", "TINYINT", "", 0, 0, "")
+ val k5 = new Field("k5", "BIGINT", "", 0, 0, "")
schema.put(k1)
schema.put(k5)
@@ -84,8 +84,8 @@
val expected = new Schema
expected.setStatus(0)
- val ek1 = new Field("k1", "BOOLEAN", "", 0, 0)
- val ek2 = new Field("k2", "DOUBLE", "", 0, 0)
+ val ek1 = new Field("k1", "BOOLEAN", "", 0, 0, "")
+ val ek2 = new Field("k2", "DOUBLE", "", 0, 0, "")
expected.put(ek1)
expected.put(ek2)