| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.flink.serialization; |
| |
| import org.apache.flink.table.data.DecimalData; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import org.apache.arrow.memory.ArrowBuf; |
| import org.apache.arrow.memory.RootAllocator; |
| import org.apache.arrow.vector.BigIntVector; |
| import org.apache.arrow.vector.BitVector; |
| import org.apache.arrow.vector.DateDayVector; |
| import org.apache.arrow.vector.DecimalVector; |
| import org.apache.arrow.vector.FieldVector; |
| import org.apache.arrow.vector.FixedSizeBinaryVector; |
| import org.apache.arrow.vector.Float4Vector; |
| import org.apache.arrow.vector.Float8Vector; |
| import org.apache.arrow.vector.IntVector; |
| import org.apache.arrow.vector.SmallIntVector; |
| import org.apache.arrow.vector.TimeStampMicroVector; |
| import org.apache.arrow.vector.TinyIntVector; |
| import org.apache.arrow.vector.VarBinaryVector; |
| import org.apache.arrow.vector.VarCharVector; |
| import org.apache.arrow.vector.VectorSchemaRoot; |
| import org.apache.arrow.vector.complex.MapVector; |
| import org.apache.arrow.vector.complex.StructVector; |
| import org.apache.arrow.vector.complex.impl.NullableStructWriter; |
| import org.apache.arrow.vector.complex.impl.UnionMapWriter; |
| import org.apache.arrow.vector.dictionary.DictionaryProvider; |
| import org.apache.arrow.vector.ipc.ArrowStreamWriter; |
| import org.apache.arrow.vector.types.DateUnit; |
| import org.apache.arrow.vector.types.FloatingPointPrecision; |
| import org.apache.arrow.vector.types.TimeUnit; |
| import org.apache.arrow.vector.types.pojo.ArrowType; |
| import org.apache.arrow.vector.types.pojo.Field; |
| import org.apache.arrow.vector.types.pojo.FieldType; |
| import org.apache.arrow.vector.util.Text; |
| import org.apache.commons.lang3.ArrayUtils; |
| import org.apache.doris.flink.exception.DorisException; |
| import org.apache.doris.flink.rest.RestService; |
| import org.apache.doris.flink.rest.models.Schema; |
| import org.apache.doris.sdk.thrift.TScanBatchResult; |
| import org.apache.doris.sdk.thrift.TStatus; |
| import org.apache.doris.sdk.thrift.TStatusCode; |
| import org.junit.Assert; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.math.BigDecimal; |
| import java.math.BigInteger; |
| import java.nio.charset.StandardCharsets; |
| import java.time.LocalDate; |
| import java.time.LocalDateTime; |
| import java.time.ZoneId; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.NoSuchElementException; |
| |
| import static org.hamcrest.core.StringStartsWith.startsWith; |
| |
| public class TestRowBatch { |
| private static Logger logger = LoggerFactory.getLogger(TestRowBatch.class); |
| |
| @Rule public ExpectedException thrown = ExpectedException.none(); |
| |
| @Test |
| public void testRowBatch() throws Exception { |
| // schema |
| List<Field> childrenBuilder = new ArrayList<>(); |
| childrenBuilder.add(new Field("k0", FieldType.nullable(new ArrowType.Bool()), null)); |
| childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Int(8, true)), null)); |
| childrenBuilder.add(new Field("k2", FieldType.nullable(new ArrowType.Int(16, true)), null)); |
| childrenBuilder.add(new Field("k3", FieldType.nullable(new ArrowType.Int(32, true)), null)); |
| childrenBuilder.add(new Field("k4", FieldType.nullable(new ArrowType.Int(64, true)), null)); |
| childrenBuilder.add( |
| new Field( |
| "k9", |
| FieldType.nullable( |
| new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), |
| null)); |
| childrenBuilder.add( |
| new Field( |
| "k8", |
| FieldType.nullable( |
| new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), |
| null)); |
| childrenBuilder.add(new Field("k10", FieldType.nullable(new ArrowType.Utf8()), null)); |
| childrenBuilder.add(new Field("k11", FieldType.nullable(new ArrowType.Utf8()), null)); |
| childrenBuilder.add(new Field("k5", FieldType.nullable(new ArrowType.Decimal(9, 2)), null)); |
| childrenBuilder.add(new Field("k6", FieldType.nullable(new ArrowType.Utf8()), null)); |
| |
| VectorSchemaRoot root = |
| VectorSchemaRoot.create( |
| new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null), |
| new RootAllocator(Integer.MAX_VALUE)); |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| ArrowStreamWriter arrowStreamWriter = |
| new ArrowStreamWriter( |
| root, new DictionaryProvider.MapDictionaryProvider(), outputStream); |
| |
| arrowStreamWriter.start(); |
| root.setRowCount(3); |
| |
| FieldVector vector = root.getVector("k0"); |
| BitVector bitVector = (BitVector) vector; |
| bitVector.setInitialCapacity(3); |
| bitVector.allocateNew(3); |
| bitVector.setSafe(0, 1); |
| bitVector.setSafe(1, 0); |
| bitVector.setSafe(2, 1); |
| vector.setValueCount(3); |
| |
| vector = root.getVector("k1"); |
| TinyIntVector tinyIntVector = (TinyIntVector) vector; |
| tinyIntVector.setInitialCapacity(3); |
| tinyIntVector.allocateNew(3); |
| tinyIntVector.setSafe(0, 1); |
| tinyIntVector.setSafe(1, 2); |
| tinyIntVector.setSafe(2, 3); |
| vector.setValueCount(3); |
| |
| vector = root.getVector("k2"); |
| SmallIntVector smallIntVector = (SmallIntVector) vector; |
| smallIntVector.setInitialCapacity(3); |
| smallIntVector.allocateNew(3); |
| smallIntVector.setSafe(0, 1); |
| smallIntVector.setSafe(1, 2); |
| smallIntVector.setSafe(2, 3); |
| vector.setValueCount(3); |
| |
| vector = root.getVector("k3"); |
| IntVector intVector = (IntVector) vector; |
| intVector.setInitialCapacity(3); |
| intVector.allocateNew(3); |
| intVector.setSafe(0, 1); |
| intVector.setNull(1); |
| intVector.setSafe(2, 3); |
| vector.setValueCount(3); |
| |
| vector = root.getVector("k4"); |
| BigIntVector bigIntVector = (BigIntVector) vector; |
| bigIntVector.setInitialCapacity(3); |
| bigIntVector.allocateNew(3); |
| bigIntVector.setSafe(0, 1); |
| bigIntVector.setSafe(1, 2); |
| bigIntVector.setSafe(2, 3); |
| vector.setValueCount(3); |
| |
| vector = root.getVector("k5"); |
| DecimalVector decimalVector = (DecimalVector) vector; |
| decimalVector.setInitialCapacity(3); |
| decimalVector.allocateNew(); |
| decimalVector.setIndexDefined(0); |
| decimalVector.setSafe(0, new BigDecimal("12.34")); |
| decimalVector.setIndexDefined(1); |
| decimalVector.setSafe(1, new BigDecimal("88.88")); |
| decimalVector.setIndexDefined(2); |
| decimalVector.setSafe(2, new BigDecimal("10.22")); |
| vector.setValueCount(3); |
| |
| vector = root.getVector("k6"); |
| VarCharVector charVector = (VarCharVector) vector; |
| charVector.setInitialCapacity(3); |
| charVector.allocateNew(); |
| charVector.setIndexDefined(0); |
| charVector.setValueLengthSafe(0, 5); |
| charVector.setSafe(0, "char1".getBytes()); |
| charVector.setIndexDefined(1); |
| charVector.setValueLengthSafe(1, 5); |
| charVector.setSafe(1, "char2".getBytes()); |
| charVector.setIndexDefined(2); |
| charVector.setValueLengthSafe(2, 5); |
| charVector.setSafe(2, "char3".getBytes()); |
| vector.setValueCount(3); |
| |
| vector = root.getVector("k8"); |
| Float8Vector float8Vector = (Float8Vector) vector; |
| float8Vector.setInitialCapacity(3); |
| float8Vector.allocateNew(3); |
| float8Vector.setSafe(0, 1.1); |
| float8Vector.setSafe(1, 2.2); |
| float8Vector.setSafe(2, 3.3); |
| vector.setValueCount(3); |
| |
| vector = root.getVector("k9"); |
| Float4Vector float4Vector = (Float4Vector) vector; |
| float4Vector.setInitialCapacity(3); |
| float4Vector.allocateNew(3); |
| float4Vector.setSafe(0, 1.1f); |
| float4Vector.setSafe(1, 2.2f); |
| float4Vector.setSafe(2, 3.3f); |
| vector.setValueCount(3); |
| |
| vector = root.getVector("k10"); |
| VarCharVector datecharVector = (VarCharVector) vector; |
| datecharVector.setInitialCapacity(3); |
| datecharVector.allocateNew(); |
| datecharVector.setIndexDefined(0); |
| datecharVector.setValueLengthSafe(0, 5); |
| datecharVector.setSafe(0, "2008-08-08".getBytes()); |
| datecharVector.setIndexDefined(1); |
| datecharVector.setValueLengthSafe(1, 5); |
| datecharVector.setSafe(1, "1900-08-08".getBytes()); |
| datecharVector.setIndexDefined(2); |
| datecharVector.setValueLengthSafe(2, 5); |
| datecharVector.setSafe(2, "2100-08-08".getBytes()); |
| vector.setValueCount(3); |
| |
| vector = root.getVector("k11"); |
| VarCharVector timecharVector = (VarCharVector) vector; |
| timecharVector.setInitialCapacity(3); |
| timecharVector.allocateNew(); |
| timecharVector.setIndexDefined(0); |
| timecharVector.setValueLengthSafe(0, 5); |
| timecharVector.setSafe(0, "2008-08-08 00:00:00".getBytes()); |
| timecharVector.setIndexDefined(1); |
| timecharVector.setValueLengthSafe(1, 5); |
| timecharVector.setSafe(1, "1900-08-08 00:00:00".getBytes()); |
| timecharVector.setIndexDefined(2); |
| timecharVector.setValueLengthSafe(2, 5); |
| timecharVector.setSafe(2, "2100-08-08 00:00:00".getBytes()); |
| vector.setValueCount(3); |
| |
| arrowStreamWriter.writeBatch(); |
| |
| arrowStreamWriter.end(); |
| arrowStreamWriter.close(); |
| |
| TStatus status = new TStatus(); |
| status.setStatusCode(TStatusCode.OK); |
| TScanBatchResult scanBatchResult = new TScanBatchResult(); |
| scanBatchResult.setStatus(status); |
| scanBatchResult.setEos(false); |
| scanBatchResult.setRows(outputStream.toByteArray()); |
| |
| String schemaStr = |
| "{\"properties\":[{\"type\":\"BOOLEAN\",\"name\":\"k0\",\"comment\":\"\"}," |
| + "{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\"},{\"type\":\"SMALLINT\",\"name\":\"k2\"," |
| + "\"comment\":\"\"},{\"type\":\"INT\",\"name\":\"k3\",\"comment\":\"\"},{\"type\":\"BIGINT\"," |
| + "\"name\":\"k4\",\"comment\":\"\"},{\"type\":\"FLOAT\",\"name\":\"k9\",\"comment\":\"\"}," |
| + "{\"type\":\"DOUBLE\",\"name\":\"k8\",\"comment\":\"\"},{\"type\":\"DATE\",\"name\":\"k10\"," |
| + "\"comment\":\"\"},{\"type\":\"DATETIME\",\"name\":\"k11\",\"comment\":\"\"}," |
| + "{\"name\":\"k5\",\"scale\":\"0\",\"comment\":\"\"," |
| + "\"type\":\"DECIMAL\",\"precision\":\"9\",\"aggregation_type\":\"\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\",\"aggregation_type\":\"REPLACE_IF_NOT_NULL\"}]," |
| + "\"status\":200}"; |
| |
| Schema schema = RestService.parseSchema(schemaStr, logger); |
| |
| RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); |
| |
| List<Object> expectedRow1 = |
| Arrays.asList( |
| Boolean.TRUE, |
| (byte) 1, |
| (short) 1, |
| 1, |
| 1L, |
| (float) 1.1, |
| (double) 1.1, |
| LocalDate.of(2008, 8, 8), |
| LocalDateTime.of(2008, 8, 8, 0, 0, 0), |
| DecimalData.fromBigDecimal(new BigDecimal(12.34), 4, 2), |
| "char1"); |
| |
| List<Object> expectedRow2 = |
| Arrays.asList( |
| Boolean.FALSE, |
| (byte) 2, |
| (short) 2, |
| null, |
| 2L, |
| (float) 2.2, |
| (double) 2.2, |
| LocalDate.of(1900, 8, 8), |
| LocalDateTime.of(1900, 8, 8, 0, 0, 0), |
| DecimalData.fromBigDecimal(new BigDecimal(88.88), 4, 2), |
| "char2"); |
| |
| List<Object> expectedRow3 = |
| Arrays.asList( |
| Boolean.TRUE, |
| (byte) 3, |
| (short) 3, |
| 3, |
| 3L, |
| (float) 3.3, |
| (double) 3.3, |
| LocalDate.of(2100, 8, 8), |
| LocalDateTime.of(2100, 8, 8, 0, 0, 0), |
| DecimalData.fromBigDecimal(new BigDecimal(10.22), 4, 2), |
| "char3"); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow1 = rowBatch.next(); |
| actualRow1.set(9, DecimalData.fromBigDecimal((BigDecimal) actualRow1.get(9), 4, 2)); |
| Assert.assertEquals(expectedRow1, actualRow1); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow2 = rowBatch.next(); |
| actualRow2.set(9, DecimalData.fromBigDecimal((BigDecimal) actualRow2.get(9), 4, 2)); |
| Assert.assertEquals(expectedRow2, actualRow2); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow3 = rowBatch.next(); |
| actualRow3.set(9, DecimalData.fromBigDecimal((BigDecimal) actualRow3.get(9), 4, 2)); |
| Assert.assertEquals(expectedRow3, actualRow3); |
| |
| Assert.assertFalse(rowBatch.hasNext()); |
| thrown.expect(NoSuchElementException.class); |
| thrown.expectMessage(startsWith("Get row offset:")); |
| rowBatch.next(); |
| } |
| |
| @Test |
| public void testBinary() throws Exception { |
| byte[] binaryRow0 = {'a', 'b', 'c'}; |
| byte[] binaryRow1 = {'d', 'e', 'f'}; |
| byte[] binaryRow2 = {'g', 'h', 'i'}; |
| |
| List<Field> childrenBuilder = new ArrayList<>(); |
| childrenBuilder.add(new Field("k7", FieldType.nullable(new ArrowType.Binary()), null)); |
| |
| VectorSchemaRoot root = |
| VectorSchemaRoot.create( |
| new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null), |
| new RootAllocator(Integer.MAX_VALUE)); |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| ArrowStreamWriter arrowStreamWriter = |
| new ArrowStreamWriter( |
| root, new DictionaryProvider.MapDictionaryProvider(), outputStream); |
| |
| arrowStreamWriter.start(); |
| root.setRowCount(3); |
| |
| FieldVector vector = root.getVector("k7"); |
| VarBinaryVector varBinaryVector = (VarBinaryVector) vector; |
| varBinaryVector.setInitialCapacity(3); |
| varBinaryVector.allocateNew(); |
| varBinaryVector.setIndexDefined(0); |
| varBinaryVector.setValueLengthSafe(0, 3); |
| varBinaryVector.setSafe(0, binaryRow0); |
| varBinaryVector.setIndexDefined(1); |
| varBinaryVector.setValueLengthSafe(1, 3); |
| varBinaryVector.setSafe(1, binaryRow1); |
| varBinaryVector.setIndexDefined(2); |
| varBinaryVector.setValueLengthSafe(2, 3); |
| varBinaryVector.setSafe(2, binaryRow2); |
| vector.setValueCount(3); |
| |
| arrowStreamWriter.writeBatch(); |
| |
| arrowStreamWriter.end(); |
| arrowStreamWriter.close(); |
| |
| TStatus status = new TStatus(); |
| status.setStatusCode(TStatusCode.OK); |
| TScanBatchResult scanBatchResult = new TScanBatchResult(); |
| scanBatchResult.setStatus(status); |
| scanBatchResult.setEos(false); |
| scanBatchResult.setRows(outputStream.toByteArray()); |
| |
| String schemaStr = |
| "{\"properties\":[{\"type\":\"BINARY\",\"name\":\"k7\",\"comment\":\"\"}], \"status\":200}"; |
| |
| Schema schema = RestService.parseSchema(schemaStr, logger); |
| |
| RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow0 = rowBatch.next(); |
| Assert.assertArrayEquals(binaryRow0, (byte[]) actualRow0.get(0)); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow1 = rowBatch.next(); |
| Assert.assertArrayEquals(binaryRow1, (byte[]) actualRow1.get(0)); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow2 = rowBatch.next(); |
| Assert.assertArrayEquals(binaryRow2, (byte[]) actualRow2.get(0)); |
| |
| Assert.assertFalse(rowBatch.hasNext()); |
| thrown.expect(NoSuchElementException.class); |
| thrown.expectMessage(startsWith("Get row offset:")); |
| rowBatch.next(); |
| } |
| |
| @Test |
| public void testDecimalV2() throws Exception { |
| List<Field> childrenBuilder = new ArrayList<>(); |
| childrenBuilder.add( |
| new Field("k7", FieldType.nullable(new ArrowType.Decimal(27, 9)), null)); |
| |
| VectorSchemaRoot root = |
| VectorSchemaRoot.create( |
| new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null), |
| new RootAllocator(Integer.MAX_VALUE)); |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| ArrowStreamWriter arrowStreamWriter = |
| new ArrowStreamWriter( |
| root, new DictionaryProvider.MapDictionaryProvider(), outputStream); |
| |
| arrowStreamWriter.start(); |
| root.setRowCount(3); |
| |
| FieldVector vector = root.getVector("k7"); |
| DecimalVector decimalVector = (DecimalVector) vector; |
| decimalVector.setInitialCapacity(3); |
| decimalVector.allocateNew(3); |
| decimalVector.setSafe(0, new BigDecimal("12.340000000")); |
| decimalVector.setSafe(1, new BigDecimal("88.880000000")); |
| decimalVector.setSafe(2, new BigDecimal("10.000000000")); |
| vector.setValueCount(3); |
| |
| arrowStreamWriter.writeBatch(); |
| |
| arrowStreamWriter.end(); |
| arrowStreamWriter.close(); |
| |
| TStatus status = new TStatus(); |
| status.setStatusCode(TStatusCode.OK); |
| TScanBatchResult scanBatchResult = new TScanBatchResult(); |
| scanBatchResult.setStatus(status); |
| scanBatchResult.setEos(false); |
| scanBatchResult.setRows(outputStream.toByteArray()); |
| |
| String schemaStr = |
| "{\"properties\":[{\"type\":\"DECIMALV2\",\"scale\": 0," |
| + "\"precision\": 9, \"name\":\"k7\",\"comment\":\"\"}], " |
| + "\"status\":200}"; |
| |
| Schema schema = RestService.parseSchema(schemaStr, logger); |
| |
| RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow0 = rowBatch.next(); |
| Assert.assertEquals( |
| DecimalData.fromBigDecimal(new BigDecimal(12.340000000), 11, 9), |
| DecimalData.fromBigDecimal((BigDecimal) actualRow0.get(0), 11, 9)); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow1 = rowBatch.next(); |
| Assert.assertEquals( |
| DecimalData.fromBigDecimal(new BigDecimal(88.880000000), 11, 9), |
| DecimalData.fromBigDecimal((BigDecimal) actualRow1.get(0), 11, 9)); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow2 = rowBatch.next(); |
| Assert.assertEquals( |
| DecimalData.fromBigDecimal(new BigDecimal(10.000000000), 11, 9), |
| DecimalData.fromBigDecimal((BigDecimal) actualRow2.get(0), 11, 9)); |
| |
| Assert.assertFalse(rowBatch.hasNext()); |
| thrown.expect(NoSuchElementException.class); |
| thrown.expectMessage(startsWith("Get row offset:")); |
| rowBatch.next(); |
| } |
| |
| @Test |
| public void testMap() throws IOException, DorisException { |
| |
| ImmutableList<Field> mapChildren = |
| ImmutableList.of( |
| new Field( |
| "child", |
| new FieldType(false, new ArrowType.Struct(), null), |
| ImmutableList.of( |
| new Field( |
| "key", |
| new FieldType(false, new ArrowType.Utf8(), null), |
| null), |
| new Field( |
| "value", |
| new FieldType( |
| false, new ArrowType.Int(32, true), null), |
| null)))); |
| |
| ImmutableList<Field> fields = |
| ImmutableList.of( |
| new Field( |
| "col_map", |
| new FieldType(false, new ArrowType.Map(false), null), |
| mapChildren)); |
| |
| RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE); |
| VectorSchemaRoot root = |
| VectorSchemaRoot.create( |
| new org.apache.arrow.vector.types.pojo.Schema(fields, null), allocator); |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| ArrowStreamWriter arrowStreamWriter = |
| new ArrowStreamWriter( |
| root, new DictionaryProvider.MapDictionaryProvider(), outputStream); |
| |
| arrowStreamWriter.start(); |
| root.setRowCount(3); |
| |
| MapVector mapVector = (MapVector) root.getVector("col_map"); |
| mapVector.allocateNew(); |
| UnionMapWriter mapWriter = mapVector.getWriter(); |
| for (int i = 0; i < 3; i++) { |
| mapWriter.setPosition(i); |
| mapWriter.startMap(); |
| mapWriter.startEntry(); |
| String key = "k" + (i + 1); |
| byte[] bytes = key.getBytes(StandardCharsets.UTF_8); |
| ArrowBuf buffer = allocator.buffer(bytes.length); |
| buffer.setBytes(0, bytes); |
| mapWriter.key().varChar().writeVarChar(0, bytes.length, buffer); |
| buffer.close(); |
| mapWriter.value().integer().writeInt(i); |
| mapWriter.endEntry(); |
| mapWriter.endMap(); |
| } |
| mapWriter.setValueCount(3); |
| |
| arrowStreamWriter.writeBatch(); |
| |
| arrowStreamWriter.end(); |
| arrowStreamWriter.close(); |
| |
| TStatus status = new TStatus(); |
| status.setStatusCode(TStatusCode.OK); |
| TScanBatchResult scanBatchResult = new TScanBatchResult(); |
| scanBatchResult.setStatus(status); |
| scanBatchResult.setEos(false); |
| scanBatchResult.setRows(outputStream.toByteArray()); |
| |
| String schemaStr = |
| "{\"properties\":[{\"type\":\"MAP\",\"name\":\"col_map\",\"comment\":\"\"}" |
| + "], \"status\":200}"; |
| |
| Schema schema = RestService.parseSchema(schemaStr, logger); |
| |
| RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); |
| Assert.assertTrue(rowBatch.hasNext()); |
| Assert.assertTrue(ImmutableMap.of("k1", 0).equals(rowBatch.next().get(0))); |
| Assert.assertTrue(rowBatch.hasNext()); |
| Assert.assertTrue(ImmutableMap.of("k2", 1).equals(rowBatch.next().get(0))); |
| Assert.assertTrue(rowBatch.hasNext()); |
| Assert.assertTrue(ImmutableMap.of("k3", 2).equals(rowBatch.next().get(0))); |
| Assert.assertFalse(rowBatch.hasNext()); |
| } |
| |
| @Test |
| public void testStruct() throws IOException, DorisException { |
| |
| ImmutableList<Field> fields = |
| ImmutableList.of( |
| new Field( |
| "col_struct", |
| new FieldType(false, new ArrowType.Struct(), null), |
| ImmutableList.of( |
| new Field( |
| "a", |
| new FieldType(false, new ArrowType.Utf8(), null), |
| null), |
| new Field( |
| "b", |
| new FieldType( |
| false, new ArrowType.Int(32, true), null), |
| null)))); |
| |
| RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE); |
| VectorSchemaRoot root = |
| VectorSchemaRoot.create( |
| new org.apache.arrow.vector.types.pojo.Schema(fields, null), allocator); |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| ArrowStreamWriter arrowStreamWriter = |
| new ArrowStreamWriter( |
| root, new DictionaryProvider.MapDictionaryProvider(), outputStream); |
| |
| arrowStreamWriter.start(); |
| root.setRowCount(3); |
| |
| StructVector structVector = (StructVector) root.getVector("col_struct"); |
| structVector.allocateNew(); |
| NullableStructWriter writer = structVector.getWriter(); |
| writer.setPosition(0); |
| writer.start(); |
| byte[] bytes = "a1".getBytes(StandardCharsets.UTF_8); |
| ArrowBuf buffer = allocator.buffer(bytes.length); |
| buffer.setBytes(0, bytes); |
| writer.varChar("a").writeVarChar(0, bytes.length, buffer); |
| buffer.close(); |
| writer.integer("b").writeInt(1); |
| writer.end(); |
| writer.setValueCount(1); |
| |
| arrowStreamWriter.writeBatch(); |
| |
| arrowStreamWriter.end(); |
| arrowStreamWriter.close(); |
| |
| TStatus status = new TStatus(); |
| status.setStatusCode(TStatusCode.OK); |
| TScanBatchResult scanBatchResult = new TScanBatchResult(); |
| scanBatchResult.setStatus(status); |
| scanBatchResult.setEos(false); |
| scanBatchResult.setRows(outputStream.toByteArray()); |
| |
| String schemaStr = |
| "{\"properties\":[{\"type\":\"STRUCT\",\"name\":\"col_struct\",\"comment\":\"\"}" |
| + "], \"status\":200}"; |
| Schema schema = RestService.parseSchema(schemaStr, logger); |
| |
| RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); |
| Assert.assertTrue(rowBatch.hasNext()); |
| Assert.assertTrue( |
| ImmutableMap.of("a", new Text("a1"), "b", 1).equals(rowBatch.next().get(0))); |
| } |
| |
| @Test |
| public void testDate() throws DorisException, IOException { |
| |
| ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); |
| childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null)); |
| childrenBuilder.add(new Field("k2", FieldType.nullable(new ArrowType.Utf8()), null)); |
| childrenBuilder.add( |
| new Field("k3", FieldType.nullable(new ArrowType.Date(DateUnit.DAY)), null)); |
| |
| VectorSchemaRoot root = |
| VectorSchemaRoot.create( |
| new org.apache.arrow.vector.types.pojo.Schema( |
| childrenBuilder.build(), null), |
| new RootAllocator(Integer.MAX_VALUE)); |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| ArrowStreamWriter arrowStreamWriter = |
| new ArrowStreamWriter( |
| root, new DictionaryProvider.MapDictionaryProvider(), outputStream); |
| |
| arrowStreamWriter.start(); |
| root.setRowCount(1); |
| |
| FieldVector vector = root.getVector("k1"); |
| VarCharVector dateVector = (VarCharVector) vector; |
| dateVector.setInitialCapacity(1); |
| dateVector.allocateNew(); |
| dateVector.setIndexDefined(0); |
| dateVector.setValueLengthSafe(0, 10); |
| dateVector.setSafe(0, "2023-08-09".getBytes()); |
| vector.setValueCount(1); |
| |
| vector = root.getVector("k2"); |
| VarCharVector dateV2Vector = (VarCharVector) vector; |
| dateV2Vector.setInitialCapacity(1); |
| dateV2Vector.allocateNew(); |
| dateV2Vector.setIndexDefined(0); |
| dateV2Vector.setValueLengthSafe(0, 10); |
| dateV2Vector.setSafe(0, "2023-08-10".getBytes()); |
| vector.setValueCount(1); |
| |
| vector = root.getVector("k3"); |
| DateDayVector dateNewVector = (DateDayVector) vector; |
| dateNewVector.setInitialCapacity(1); |
| dateNewVector.allocateNew(); |
| dateNewVector.setIndexDefined(0); |
| dateNewVector.setSafe(0, 19802); |
| vector.setValueCount(1); |
| |
| arrowStreamWriter.writeBatch(); |
| |
| arrowStreamWriter.end(); |
| arrowStreamWriter.close(); |
| |
| TStatus status = new TStatus(); |
| status.setStatusCode(TStatusCode.OK); |
| TScanBatchResult scanBatchResult = new TScanBatchResult(); |
| scanBatchResult.setStatus(status); |
| scanBatchResult.setEos(false); |
| scanBatchResult.setRows(outputStream.toByteArray()); |
| |
| String schemaStr = |
| "{\"properties\":[" |
| + "{\"type\":\"DATE\",\"name\":\"k1\",\"comment\":\"\"}, " |
| + "{\"type\":\"DATEV2\",\"name\":\"k2\",\"comment\":\"\"}, " |
| + "{\"type\":\"DATEV2\",\"name\":\"k3\",\"comment\":\"\"}" |
| + "], \"status\":200}"; |
| |
| Schema schema = RestService.parseSchema(schemaStr, logger); |
| |
| RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow0 = rowBatch.next(); |
| Assert.assertEquals(LocalDate.of(2023, 8, 9), actualRow0.get(0)); |
| Assert.assertEquals(LocalDate.of(2023, 8, 10), actualRow0.get(1)); |
| Assert.assertEquals(LocalDate.of(2024, 3, 20), actualRow0.get(2)); |
| |
| Assert.assertFalse(rowBatch.hasNext()); |
| thrown.expect(NoSuchElementException.class); |
| thrown.expectMessage(startsWith("Get row offset:")); |
| rowBatch.next(); |
| } |
| |
| @Test |
| public void testDateTime() throws IOException, DorisException { |
| |
| ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); |
| childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null)); |
| childrenBuilder.add( |
| new Field( |
| "k2", |
| FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)), |
| null)); |
| |
| VectorSchemaRoot root = |
| VectorSchemaRoot.create( |
| new org.apache.arrow.vector.types.pojo.Schema( |
| childrenBuilder.build(), null), |
| new RootAllocator(Integer.MAX_VALUE)); |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| ArrowStreamWriter arrowStreamWriter = |
| new ArrowStreamWriter( |
| root, new DictionaryProvider.MapDictionaryProvider(), outputStream); |
| |
| arrowStreamWriter.start(); |
| root.setRowCount(3); |
| |
| FieldVector vector = root.getVector("k1"); |
| VarCharVector datetimeVector = (VarCharVector) vector; |
| datetimeVector.setInitialCapacity(3); |
| datetimeVector.allocateNew(); |
| datetimeVector.setIndexDefined(0); |
| datetimeVector.setValueLengthSafe(0, 20); |
| datetimeVector.setSafe(0, "2024-03-20 00:00:00".getBytes()); |
| datetimeVector.setIndexDefined(1); |
| datetimeVector.setValueLengthSafe(1, 20); |
| datetimeVector.setSafe(1, "2024-03-20 00:00:01".getBytes()); |
| datetimeVector.setIndexDefined(2); |
| datetimeVector.setValueLengthSafe(2, 20); |
| datetimeVector.setSafe(2, "2024-03-20 00:00:02".getBytes()); |
| vector.setValueCount(3); |
| |
| LocalDateTime localDateTime = LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123456000); |
| long second = localDateTime.atZone(ZoneId.systemDefault()).toEpochSecond(); |
| int nano = localDateTime.getNano(); |
| |
| vector = root.getVector("k2"); |
| TimeStampMicroVector datetimeV2Vector = (TimeStampMicroVector) vector; |
| datetimeV2Vector.setInitialCapacity(3); |
| datetimeV2Vector.allocateNew(); |
| datetimeV2Vector.setIndexDefined(0); |
| datetimeV2Vector.setSafe(0, second); |
| datetimeV2Vector.setIndexDefined(1); |
| datetimeV2Vector.setSafe(1, second * 1000 + nano / 1000000); |
| datetimeV2Vector.setIndexDefined(2); |
| datetimeV2Vector.setSafe(2, second * 1000000 + nano / 1000); |
| vector.setValueCount(3); |
| |
| arrowStreamWriter.writeBatch(); |
| |
| arrowStreamWriter.end(); |
| arrowStreamWriter.close(); |
| |
| TStatus status = new TStatus(); |
| status.setStatusCode(TStatusCode.OK); |
| TScanBatchResult scanBatchResult = new TScanBatchResult(); |
| scanBatchResult.setStatus(status); |
| scanBatchResult.setEos(false); |
| scanBatchResult.setRows(outputStream.toByteArray()); |
| |
| String schemaStr = |
| "{\"properties\":[" |
| + "{\"type\":\"DATETIME\",\"name\":\"k1\",\"comment\":\"\"}, " |
| + "{\"type\":\"DATETIMEV2\",\"name\":\"k2\",\"comment\":\"\"}" |
| + "], \"status\":200}"; |
| |
| Schema schema = RestService.parseSchema(schemaStr, logger); |
| |
| RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow0 = rowBatch.next(); |
| Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), actualRow0.get(0)); |
| Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), actualRow0.get(1)); |
| |
| List<Object> actualRow1 = rowBatch.next(); |
| Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 1), actualRow1.get(0)); |
| Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123000000), actualRow1.get(1)); |
| |
| List<Object> actualRow2 = rowBatch.next(); |
| Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 2), actualRow2.get(0)); |
| Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123456000), actualRow2.get(1)); |
| |
| Assert.assertFalse(rowBatch.hasNext()); |
| thrown.expect(NoSuchElementException.class); |
| thrown.expectMessage(startsWith("Get row offset:")); |
| rowBatch.next(); |
| } |
| |
| @Test |
| public void testLargeInt() throws DorisException, IOException { |
| |
| ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); |
| childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null)); |
| childrenBuilder.add( |
| new Field("k2", FieldType.nullable(new ArrowType.FixedSizeBinary(16)), null)); |
| |
| VectorSchemaRoot root = |
| VectorSchemaRoot.create( |
| new org.apache.arrow.vector.types.pojo.Schema( |
| childrenBuilder.build(), null), |
| new RootAllocator(Integer.MAX_VALUE)); |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| ArrowStreamWriter arrowStreamWriter = |
| new ArrowStreamWriter( |
| root, new DictionaryProvider.MapDictionaryProvider(), outputStream); |
| |
| arrowStreamWriter.start(); |
| root.setRowCount(1); |
| |
| FieldVector vector = root.getVector("k1"); |
| VarCharVector lageIntVector = (VarCharVector) vector; |
| lageIntVector.setInitialCapacity(1); |
| lageIntVector.allocateNew(); |
| lageIntVector.setIndexDefined(0); |
| lageIntVector.setValueLengthSafe(0, 19); |
| lageIntVector.setSafe(0, "9223372036854775808".getBytes()); |
| vector.setValueCount(1); |
| |
| vector = root.getVector("k2"); |
| FixedSizeBinaryVector lageIntVector1 = (FixedSizeBinaryVector) vector; |
| lageIntVector1.setInitialCapacity(1); |
| lageIntVector1.allocateNew(); |
| lageIntVector1.setIndexDefined(0); |
| byte[] bytes = new BigInteger("9223372036854775809").toByteArray(); |
| byte[] fixedBytes = new byte[16]; |
| System.arraycopy(bytes, 0, fixedBytes, 16 - bytes.length, bytes.length); |
| ArrayUtils.reverse(fixedBytes); |
| lageIntVector1.setSafe(0, fixedBytes); |
| vector.setValueCount(1); |
| |
| arrowStreamWriter.writeBatch(); |
| |
| arrowStreamWriter.end(); |
| arrowStreamWriter.close(); |
| |
| TStatus status = new TStatus(); |
| status.setStatusCode(TStatusCode.OK); |
| TScanBatchResult scanBatchResult = new TScanBatchResult(); |
| scanBatchResult.setStatus(status); |
| scanBatchResult.setEos(false); |
| scanBatchResult.setRows(outputStream.toByteArray()); |
| |
| String schemaStr = |
| "{\"properties\":[" |
| + "{\"type\":\"LARGEINT\",\"name\":\"k1\",\"comment\":\"\"}, " |
| + "{\"type\":\"LARGEINT\",\"name\":\"k2\",\"comment\":\"\"}" |
| + "], \"status\":200}"; |
| |
| Schema schema = RestService.parseSchema(schemaStr, logger); |
| |
| RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow0 = rowBatch.next(); |
| |
| Assert.assertEquals(new BigInteger("9223372036854775808"), actualRow0.get(0)); |
| Assert.assertEquals(new BigInteger("9223372036854775809"), actualRow0.get(1)); |
| |
| Assert.assertFalse(rowBatch.hasNext()); |
| thrown.expect(NoSuchElementException.class); |
| thrown.expectMessage(startsWith("Get row offset:")); |
| rowBatch.next(); |
| } |
| |
| @Test |
| public void testVariant() throws DorisException, IOException { |
| |
| ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); |
| childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null)); |
| |
| VectorSchemaRoot root = |
| VectorSchemaRoot.create( |
| new org.apache.arrow.vector.types.pojo.Schema( |
| childrenBuilder.build(), null), |
| new RootAllocator(Integer.MAX_VALUE)); |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| ArrowStreamWriter arrowStreamWriter = |
| new ArrowStreamWriter( |
| root, new DictionaryProvider.MapDictionaryProvider(), outputStream); |
| |
| arrowStreamWriter.start(); |
| root.setRowCount(3); |
| |
| FieldVector vector = root.getVector("k1"); |
| VarCharVector datetimeVector = (VarCharVector) vector; |
| datetimeVector.setInitialCapacity(3); |
| datetimeVector.allocateNew(); |
| datetimeVector.setIndexDefined(0); |
| datetimeVector.setValueLengthSafe(0, 20); |
| datetimeVector.setSafe(0, "{\"id\":\"a\"}".getBytes()); |
| datetimeVector.setIndexDefined(1); |
| datetimeVector.setValueLengthSafe(1, 20); |
| datetimeVector.setSafe(1, "1000".getBytes()); |
| datetimeVector.setIndexDefined(2); |
| datetimeVector.setValueLengthSafe(2, 20); |
| datetimeVector.setSafe(2, "123.456".getBytes()); |
| vector.setValueCount(3); |
| |
| arrowStreamWriter.writeBatch(); |
| |
| arrowStreamWriter.end(); |
| arrowStreamWriter.close(); |
| |
| TStatus status = new TStatus(); |
| status.setStatusCode(TStatusCode.OK); |
| TScanBatchResult scanBatchResult = new TScanBatchResult(); |
| scanBatchResult.setStatus(status); |
| scanBatchResult.setEos(false); |
| scanBatchResult.setRows(outputStream.toByteArray()); |
| |
| String schemaStr = |
| "{\"properties\":[{\"type\":\"VARIANT\",\"name\":\"k\",\"comment\":\"\"}" |
| + "], \"status\":200}"; |
| |
| Schema schema = RestService.parseSchema(schemaStr, logger); |
| |
| RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); |
| |
| Assert.assertTrue(rowBatch.hasNext()); |
| List<Object> actualRow0 = rowBatch.next(); |
| Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0)); |
| |
| List<Object> actualRow1 = rowBatch.next(); |
| Assert.assertEquals("1000", actualRow1.get(0)); |
| |
| List<Object> actualRow2 = rowBatch.next(); |
| Assert.assertEquals("123.456", actualRow2.get(0)); |
| |
| Assert.assertFalse(rowBatch.hasNext()); |
| thrown.expect(NoSuchElementException.class); |
| thrown.expectMessage(startsWith("Get row offset:")); |
| rowBatch.next(); |
| } |
| } |