[feature] support variant type (#197)
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index f5dce80..fef519d 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -77,7 +77,7 @@
<project.scm.id>github</project.scm.id>
<netty.version>4.1.77.Final</netty.version>
<fasterxml.jackson.version>2.13.5</fasterxml.jackson.version>
- <thrift-service.version>1.0.0</thrift-service.version>
+ <thrift-service.version>1.0.1</thrift-service.version>
<testcontainers.version>1.17.6</testcontainers.version>
</properties>
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index 7c28f76..8fd84d3 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -379,6 +379,7 @@
case "VARCHAR":
case "STRING":
case "JSONB":
+ case "VARIANT":
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
typeMismatchMessage(currentType, mt));
VarCharVector varCharVector = (VarCharVector) curFieldVector;
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index e298349..914190a 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -129,6 +129,7 @@
case "ARRAY" => DataTypes.StringType
case "MAP" => MapType(DataTypes.StringType, DataTypes.StringType)
case "STRUCT" => DataTypes.StringType
+ case "VARIANT" => DataTypes.StringType
case "HLL" =>
throw new DorisException("Unsupported type " + dorisType)
case _ =>
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
index 1cf4136..348895d 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
@@ -850,4 +850,75 @@
}
+ @Test
+ public void testVariant() throws DorisException, IOException {
+
+ ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+ childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null));
+
+ VectorSchemaRoot root = VectorSchemaRoot.create(
+ new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+ new RootAllocator(Integer.MAX_VALUE));
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
+ root,
+ new DictionaryProvider.MapDictionaryProvider(),
+ outputStream);
+
+ arrowStreamWriter.start();
+ root.setRowCount(3);
+
+ FieldVector vector = root.getVector("k1");
+ VarCharVector variantVector = (VarCharVector)vector;
+ variantVector.setInitialCapacity(3);
+ variantVector.allocateNew();
+ variantVector.setIndexDefined(0);
+ variantVector.setValueLengthSafe(0, 20);
+ variantVector.setSafe(0, "{\"id\":\"a\"}".getBytes());
+ variantVector.setIndexDefined(1);
+ variantVector.setValueLengthSafe(1, 20);
+ variantVector.setSafe(1, "1000".getBytes());
+ variantVector.setIndexDefined(2);
+ variantVector.setValueLengthSafe(2, 20);
+ variantVector.setSafe(2, "123.456".getBytes());
+ vector.setValueCount(3);
+
+ arrowStreamWriter.writeBatch();
+
+ arrowStreamWriter.end();
+ arrowStreamWriter.close();
+
+ TStatus status = new TStatus();
+ status.setStatusCode(TStatusCode.OK);
+ TScanBatchResult scanBatchResult = new TScanBatchResult();
+ scanBatchResult.setStatus(status);
+ scanBatchResult.setEos(false);
+ scanBatchResult.setRows(outputStream.toByteArray());
+
+
+ String schemaStr = "{\"properties\":[" +
+ "{\"type\":\"VARIANT\",\"name\":\"k1\",\"comment\":\"\"}" +
+ "], \"status\":200}";
+
+ Schema schema = RestService.parseSchema(schemaStr, logger);
+
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow0 = rowBatch.next();
+ Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0));
+
+ List<Object> actualRow1 = rowBatch.next();
+ Assert.assertEquals("1000", actualRow1.get(0));
+
+ List<Object> actualRow2 = rowBatch.next();
+ Assert.assertEquals("123.456", actualRow2.get(0));
+
+ Assert.assertFalse(rowBatch.hasNext());
+ thrown.expect(NoSuchElementException.class);
+ thrown.expectMessage(startsWith("Get row offset:"));
+ rowBatch.next();
+
+ }
+
}