[improve] support read variant type (#363)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index a8014d0..beaa5b9 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -67,7 +67,7 @@
     </mailingLists>
 
     <properties>
-        <revision>1.6.0-SNAPSHOT</revision>
+        <revision>1.6.1-SNAPSHOT</revision>
         <flink.version>1.18.0</flink.version>
         <flink.major.version>1.18</flink.major.version>
         <flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
@@ -79,7 +79,7 @@
         <maven-source-plugin.version>3.2.1</maven-source-plugin.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.scm.id>github</project.scm.id>
-        <thrift-service.version>1.0.0</thrift-service.version>
+        <thrift-service.version>1.0.1</thrift-service.version>
         <checkstyle.version>8.14</checkstyle.version>
         <maven-checkstyle-plugin.version>2.17</maven-checkstyle-plugin.version>
         <spotless.version>2.4.2</spotless.version>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index ed649ca..c6dfa0e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -378,6 +378,7 @@
             case "VARCHAR":
             case "STRING":
             case "JSONB":
+            case "VARIANT":
                 if (!minorType.equals(Types.MinorType.VARCHAR)) {
                     return false;
                 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 6b29078..481004b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -885,4 +885,74 @@
         thrown.expectMessage(startsWith("Get row offset:"));
         rowBatch.next();
     }
+
+    @Test
+    public void testVariant() throws DorisException, IOException {
+
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null));
+
+        VectorSchemaRoot root =
+                VectorSchemaRoot.create(
+                        new org.apache.arrow.vector.types.pojo.Schema(
+                                childrenBuilder.build(), null),
+                        new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter =
+                new ArrowStreamWriter(
+                        root, new DictionaryProvider.MapDictionaryProvider(), outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(3);
+
+        FieldVector vector = root.getVector("k1");
+        VarCharVector datetimeVector = (VarCharVector) vector;
+        datetimeVector.setInitialCapacity(3);
+        datetimeVector.allocateNew();
+        datetimeVector.setIndexDefined(0);
+        datetimeVector.setValueLengthSafe(0, 20);
+        datetimeVector.setSafe(0, "{\"id\":\"a\"}".getBytes());
+        datetimeVector.setIndexDefined(1);
+        datetimeVector.setValueLengthSafe(1, 20);
+        datetimeVector.setSafe(1, "1000".getBytes());
+        datetimeVector.setIndexDefined(2);
+        datetimeVector.setValueLengthSafe(2, 20);
+        datetimeVector.setSafe(2, "123.456".getBytes());
+        vector.setValueCount(3);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr =
+                "{\"properties\":[{\"type\":\"VARIANT\",\"name\":\"k\",\"comment\":\"\"}"
+                        + "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+        Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0));
+
+        List<Object> actualRow1 = rowBatch.next();
+        Assert.assertEquals("1000", actualRow1.get(0));
+
+        List<Object> actualRow2 = rowBatch.next();
+        Assert.assertEquals("123.456", actualRow2.get(0));
+
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class);
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+    }
 }