| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| using System; |
| using System.Buffers.Binary; |
| using System.Collections.Generic; |
| using System.IO; |
| using System.Linq; |
| using System.Net; |
| using System.Net.Sockets; |
| using System.Threading.Tasks; |
| using Apache.Arrow.Ipc; |
| using Apache.Arrow.Memory; |
| using Apache.Arrow.Types; |
| using Xunit; |
| |
| namespace Apache.Arrow.Tests |
| { |
| public class ArrowStreamWriterTests |
| { |
/// <summary>
/// Disposes a writer that was constructed without a <c>leaveOpen</c> argument and
/// verifies that the underlying stream can no longer be used afterwards.
/// </summary>
[Fact]
public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
{
    var target = new MemoryStream();
    RecordBatch sample = TestData.CreateSampleRecordBatch(length: 100);

    using (new ArrowStreamWriter(target, sample.Schema))
    {
        // Nothing is written; only the effect of Dispose is under test.
    }

    Assert.Throws<ObjectDisposedException>(() => target.Position);
}
| |
/// <summary>
/// Disposes a writer constructed with <c>leaveOpen: false</c> and verifies that the
/// underlying stream can no longer be used afterwards.
/// </summary>
[Fact]
public void Ctor_LeaveOpenFalse_StreamClosedOnDispose()
{
    var target = new MemoryStream();
    RecordBatch sample = TestData.CreateSampleRecordBatch(length: 100);

    using (new ArrowStreamWriter(target, sample.Schema, leaveOpen: false))
    {
        // Nothing is written; only the effect of Dispose is under test.
    }

    Assert.Throws<ObjectDisposedException>(() => target.Position);
}
| |
/// <summary>
/// Disposes a writer constructed with <c>leaveOpen: true</c> and verifies that the
/// underlying stream is still usable and still at position 0.
/// </summary>
[Fact]
public void Ctor_LeaveOpenTrue_StreamValidOnDispose()
{
    var target = new MemoryStream();
    RecordBatch sample = TestData.CreateSampleRecordBatch(length: 100);

    using (new ArrowStreamWriter(target, sample.Schema, leaveOpen: true))
    {
        // Nothing is written; only the effect of Dispose is under test.
    }

    Assert.Equal(0, target.Position);
}
| |
/// <summary>
/// Writes a record batch through a loopback TCP connection and verifies that the batch
/// read on the accepting side matches the original.
/// </summary>
/// <param name="createDictionaryArray">Forwarded to <c>TestData.CreateSampleRecordBatch</c>.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public void CanWriteToNetworkStream(bool createDictionaryArray)
{
    RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: createDictionaryArray);

    // Port 0 lets the OS pick a free port, which is then read back from the listener.
    TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    try
    {
        int port = ((IPEndPoint)listener.LocalEndpoint).Port;

        // The sender connects, writes everything and is disposed before the
        // connection is accepted below.
        using (TcpClient sender = new TcpClient())
        {
            sender.Connect(IPAddress.Loopback, port);
            NetworkStream stream = sender.GetStream();

            using (var writer = new ArrowStreamWriter(stream, originalBatch.Schema))
            {
                writer.WriteRecordBatch(originalBatch);
                writer.WriteEnd();

                stream.Flush();
            }
        }

        using (TcpClient receiver = listener.AcceptTcpClient())
        {
            NetworkStream stream = receiver.GetStream();
            using (var reader = new ArrowStreamReader(stream))
            {
                RecordBatch newBatch = reader.ReadNextRecordBatch();
                ArrowReaderVerifier.CompareBatches(originalBatch, newBatch);
            }
        }
    }
    finally
    {
        // Release the listening socket even when an assertion above fails.
        listener.Stop();
    }
}
| |
/// <summary>
/// Writes a record batch through a loopback TCP connection using the asynchronous writer
/// API and verifies that the batch read on the accepting side matches the original.
/// </summary>
/// <param name="createDictionaryArray">Forwarded to <c>TestData.CreateSampleRecordBatch</c>.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task CanWriteToNetworkStreamAsync(bool createDictionaryArray)
{
    RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: createDictionaryArray);

    // Port 0 lets the OS pick a free port, which is then read back from the listener.
    TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    try
    {
        int port = ((IPEndPoint)listener.LocalEndpoint).Port;

        // The sender connects, writes everything and is disposed before the
        // connection is accepted below.
        using (TcpClient sender = new TcpClient())
        {
            await sender.ConnectAsync(IPAddress.Loopback, port);
            NetworkStream stream = sender.GetStream();

            using (var writer = new ArrowStreamWriter(stream, originalBatch.Schema))
            {
                await writer.WriteRecordBatchAsync(originalBatch);
                await writer.WriteEndAsync();

                await stream.FlushAsync();
            }
        }

        using (TcpClient receiver = await listener.AcceptTcpClientAsync())
        {
            NetworkStream stream = receiver.GetStream();
            using (var reader = new ArrowStreamReader(stream))
            {
                RecordBatch newBatch = await reader.ReadNextRecordBatchAsync();
                ArrowReaderVerifier.CompareBatches(originalBatch, newBatch);
            }
        }
    }
    finally
    {
        // Release the listening socket even when an assertion above fails.
        listener.Stop();
    }
}
| |
/// <summary>
/// Round-trips a sample record batch created with a length of zero.
/// </summary>
/// <param name="createDictionaryArray">Forwarded to <c>TestData.CreateSampleRecordBatch</c>.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public void WriteEmptyBatch(bool createDictionaryArray)
{
    const int rowCount = 0;
    RecordBatch emptyBatch = TestData.CreateSampleRecordBatch(length: rowCount, createDictionaryArray: createDictionaryArray);
    TestRoundTripRecordBatch(emptyBatch);
}
| |
/// <summary>
/// Round-trips a sample record batch created with a length of zero, writing it with the
/// asynchronous writer API.
/// </summary>
/// <param name="createDictionaryArray">Forwarded to <c>TestData.CreateSampleRecordBatch</c>.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task WriteEmptyBatchAsync(bool createDictionaryArray)
{
    const int rowCount = 0;
    RecordBatch emptyBatch = TestData.CreateSampleRecordBatch(length: rowCount, createDictionaryArray: createDictionaryArray);
    await TestRoundTripRecordBatchAsync(emptyBatch);
}
| |
/// <summary>
/// Round-trips a batch whose columns mix a non-nullable column, partially null columns,
/// an all-null column and a <see cref="NullArray"/>.
/// </summary>
[Fact]
public void WriteBatchWithNulls()
{
    // Validity bitmaps are least-significant-bit first and a cleared bit marks a null.
    // Each declared nullCount below equals the number of cleared bits among the first 10.
    RecordBatch originalBatch = new RecordBatch.Builder()
        .Append("Column1", false, col => col.Int32(array => array.AppendRange(Enumerable.Range(0, 10))))
        .Append("Column2", true, new Int32Array(
            valueBuffer: new ArrowBuffer.Builder<int>().AppendRange(Enumerable.Range(0, 10)).Build(),
            // 0xfd = 1111_1101: only element 1 is null.
            nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0xfd).Append(0xff).Build(),
            length: 10,
            nullCount: 1,
            offset: 0))
        .Append("Column3", true, new Int32Array(
            valueBuffer: new ArrowBuffer.Builder<int>().AppendRange(Enumerable.Range(0, 10)).Build(),
            // All bits cleared: every element is null.
            nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0x00).Append(0x00).Build(),
            length: 10,
            nullCount: 10,
            offset: 0))
        .Append("NullableBooleanColumn", true, new BooleanArray(
            valueBuffer: new ArrowBuffer.Builder<byte>().Append(0xfd).Append(0xff).Build(),
            // 0xed = 1110_1101: elements 1 and 4 are null.
            nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0xed).Append(0xff).Build(),
            length: 10,
            nullCount: 2,
            offset: 0))
        .Append("NullColumn", true, new NullArray(10))
        .Build();

    TestRoundTripRecordBatch(originalBatch);
}
| |
/// <summary>
/// Round-trips a batch whose columns mix a non-nullable column, partially null columns,
/// an all-null column and a <see cref="NullArray"/>, writing it with the asynchronous
/// writer API. The column set mirrors <c>WriteBatchWithNulls</c>.
/// </summary>
[Fact]
public async Task WriteBatchWithNullsAsync()
{
    // Validity bitmaps are least-significant-bit first and a cleared bit marks a null.
    // Each declared nullCount below equals the number of cleared bits among the first 10.
    RecordBatch originalBatch = new RecordBatch.Builder()
        .Append("Column1", false, col => col.Int32(array => array.AppendRange(Enumerable.Range(0, 10))))
        .Append("Column2", true, new Int32Array(
            valueBuffer: new ArrowBuffer.Builder<int>().AppendRange(Enumerable.Range(0, 10)).Build(),
            // 0xfd = 1111_1101: only element 1 is null.
            nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0xfd).Append(0xff).Build(),
            length: 10,
            nullCount: 1,
            offset: 0))
        .Append("Column3", true, new Int32Array(
            valueBuffer: new ArrowBuffer.Builder<int>().AppendRange(Enumerable.Range(0, 10)).Build(),
            // All bits cleared: every element is null.
            nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0x00).Append(0x00).Build(),
            length: 10,
            nullCount: 10,
            offset: 0))
        .Append("NullableBooleanColumn", true, new BooleanArray(
            valueBuffer: new ArrowBuffer.Builder<byte>().Append(0xfd).Append(0xff).Build(),
            // 0xed = 1110_1101: elements 1 and 4 are null.
            nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0xed).Append(0xff).Build(),
            length: 10,
            nullCount: 2,
            offset: 0))
        .Append("NullColumn", true, new NullArray(10))
        .Build();

    await TestRoundTripRecordBatchAsync(originalBatch);
}
| |
/// <summary>
/// Slices every column of a 100-row sample batch and round-trips the sliced batch using
/// a non-strict comparison.
/// </summary>
/// <param name="sliceOffset">Offset passed to <c>ArrowArrayFactory.Slice</c> for each column.</param>
/// <param name="sliceLength">Length of each slice and of the resulting batch.</param>
[Theory]
[InlineData(0, 45)]
[InlineData(3, 45)]
[InlineData(16, 45)]
public void WriteSlicedArrays(int sliceOffset, int sliceLength)
{
    RecordBatch source = TestData.CreateSampleRecordBatch(length: 100);

    var columns = new List<IArrowArray>();
    foreach (IArrowArray column in source.Arrays)
    {
        columns.Add(ArrowArrayFactory.Slice(column, sliceOffset, sliceLength));
    }

    TestRoundTripRecordBatch(
        new RecordBatch(source.Schema, columns, sliceLength),
        strictCompare: false);
}
| |
/// <summary>
/// Slices every column of a 100-row sample batch and round-trips the sliced batch,
/// written with the asynchronous writer API, using a non-strict comparison.
/// </summary>
/// <param name="sliceOffset">Offset passed to <c>ArrowArrayFactory.Slice</c> for each column.</param>
/// <param name="sliceLength">Length of each slice and of the resulting batch.</param>
[Theory]
[InlineData(0, 45)]
[InlineData(3, 45)]
[InlineData(16, 45)]
public async Task WriteSlicedArraysAsync(int sliceOffset, int sliceLength)
{
    RecordBatch source = TestData.CreateSampleRecordBatch(length: 100);

    var columns = new List<IArrowArray>();
    foreach (IArrowArray column in source.Arrays)
    {
        columns.Add(ArrowArrayFactory.Slice(column, sliceOffset, sliceLength));
    }

    await TestRoundTripRecordBatchAsync(
        new RecordBatch(source.Schema, columns, sliceLength),
        strictCompare: false);
}
| |
/// <summary>
/// Writes the given batches to an in-memory stream, reads them back and compares each
/// batch that was read with the corresponding original.
/// </summary>
/// <param name="originalBatches">Batches to write; the schema of the first one is used for the writer.</param>
/// <param name="options">IPC options handed to the writer (may be null).</param>
/// <param name="strictCompare">Forwarded to <c>ArrowReaderVerifier.CompareBatches</c>.</param>
/// <param name="memoryAllocator">
/// Allocator handed to the writer; a new <c>TestMemoryAllocator</c> is used when null.
/// Mirrors the parameter of <c>TestRoundTripRecordBatchesAsync</c>.
/// </param>
private static void TestRoundTripRecordBatches(List<RecordBatch> originalBatches, IpcOptions options = null, bool strictCompare = true, MemoryAllocator memoryAllocator = null)
{
    using (MemoryStream stream = new MemoryStream())
    {
        // leaveOpen: true keeps the stream usable for the read phase after the writer is disposed.
        using (var writer = new ArrowStreamWriter(stream, originalBatches[0].Schema, leaveOpen: true, options, memoryAllocator ?? new TestMemoryAllocator()))
        {
            foreach (RecordBatch originalBatch in originalBatches)
            {
                writer.WriteRecordBatch(originalBatch);
            }
            writer.WriteEnd();
        }

        // Rewind so the reader starts at the beginning of what was written.
        stream.Position = 0;

        using (var reader = new ArrowStreamReader(stream))
        {
            foreach (RecordBatch originalBatch in originalBatches)
            {
                RecordBatch newBatch = reader.ReadNextRecordBatch();
                ArrowReaderVerifier.CompareBatches(originalBatch, newBatch, strictCompare: strictCompare);
            }
        }
    }
}
| |
/// <summary>
/// Writes the given batches to an in-memory stream with the asynchronous writer API,
/// reads them back and compares each batch that was read with the corresponding original.
/// </summary>
/// <param name="originalBatches">Batches to write; the schema of the first one is used for the writer.</param>
/// <param name="options">IPC options handed to the writer (may be null).</param>
/// <param name="strictCompare">Forwarded to <c>ArrowReaderVerifier.CompareBatches</c>.</param>
/// <param name="memoryAllocator">Allocator handed to the writer; a new <c>TestMemoryAllocator</c> is used when null.</param>
private static async Task TestRoundTripRecordBatchesAsync(List<RecordBatch> originalBatches, IpcOptions options = null, bool strictCompare = true, MemoryAllocator memoryAllocator = null)
{
    using (var buffer = new MemoryStream())
    {
        Schema schema = originalBatches[0].Schema;
        MemoryAllocator allocator = memoryAllocator ?? new TestMemoryAllocator();

        // leaveOpen: true keeps the stream usable for the read phase after the writer is disposed.
        using (var writer = new ArrowStreamWriter(buffer, schema, leaveOpen: true, options, allocator))
        {
            for (int i = 0; i < originalBatches.Count; i++)
            {
                await writer.WriteRecordBatchAsync(originalBatches[i]);
            }

            await writer.WriteEndAsync();
        }

        // Rewind so the reader starts at the beginning of what was written.
        buffer.Position = 0;

        using (var reader = new ArrowStreamReader(buffer))
        {
            for (int i = 0; i < originalBatches.Count; i++)
            {
                RecordBatch roundTripped = reader.ReadNextRecordBatch();
                ArrowReaderVerifier.CompareBatches(originalBatches[i], roundTripped, strictCompare: strictCompare);
            }
        }
    }
}
| |
/// <summary>
/// Single-batch convenience wrapper around <c>TestRoundTripRecordBatches</c>.
/// </summary>
/// <param name="originalBatch">The only batch to round-trip.</param>
/// <param name="options">IPC options forwarded to the multi-batch helper.</param>
/// <param name="strictCompare">Comparison mode forwarded to the multi-batch helper.</param>
private static void TestRoundTripRecordBatch(RecordBatch originalBatch, IpcOptions options = null, bool strictCompare = true)
{
    var single = new List<RecordBatch>();
    single.Add(originalBatch);

    TestRoundTripRecordBatches(single, options, strictCompare: strictCompare);
}
| |
/// <summary>
/// Single-batch convenience wrapper around <c>TestRoundTripRecordBatchesAsync</c>.
/// </summary>
/// <param name="originalBatch">The only batch to round-trip.</param>
/// <param name="options">IPC options forwarded to the multi-batch helper.</param>
/// <param name="strictCompare">Comparison mode forwarded to the multi-batch helper.</param>
private static async Task TestRoundTripRecordBatchAsync(RecordBatch originalBatch, IpcOptions options = null, bool strictCompare = true)
{
    var single = new List<RecordBatch>();
    single.Add(originalBatch);

    await TestRoundTripRecordBatchesAsync(single, options, strictCompare: strictCompare);
}
| |
/// <summary>
/// Writes a batch of two single-element Int32 columns and inspects the tail of the
/// written bytes: each 4-byte value buffer must be followed by zero padding up to
/// 8 bytes, and the stream must end with four 0xFF bytes followed by four 0x00 bytes.
/// </summary>
[Fact]
public void WriteBatchWithCorrectPadding()
{
    byte value1 = 0x04;
    byte value2 = 0x14;

    Schema schema = new Schema.Builder()
        .Field(f => f.Name("age").DataType(Int32Type.Default))
        .Field(f => f.Name("characterCount").DataType(Int32Type.Default))
        .Build();

    var ageColumn = new Int32Array(
        new ArrowBuffer(new byte[] { value1, value1, 0x00, 0x00 }),
        ArrowBuffer.Empty,
        length: 1,
        nullCount: 0,
        offset: 0);
    var characterCountColumn = new Int32Array(
        new ArrowBuffer(new byte[] { value2, value2, 0x00, 0x00 }),
        ArrowBuffer.Empty,
        length: 1,
        nullCount: 0,
        offset: 0);

    var batch = new RecordBatch(schema, new IArrowArray[] { ageColumn, characterCountColumn }, length: 1);

    TestRoundTripRecordBatch(batch);

    using (var output = new MemoryStream())
    {
        using (var writer = new ArrowStreamWriter(output, batch.Schema, leaveOpen: true))
        {
            writer.WriteRecordBatch(batch);
            writer.WriteEnd();
        }

        byte[] written = output.ToArray();
        int end = written.Length;

        // ensure that the data buffers at the end are 8-byte aligned
        // first column: two value bytes, then six zero bytes
        Assert.Equal(value1, written[end - 24]);
        Assert.Equal(value1, written[end - 23]);
        for (int index = end - 22; index < end - 16; index++)
        {
            Assert.Equal(0, written[index]);
        }

        // second column: two value bytes, then six zero bytes
        Assert.Equal(value2, written[end - 16]);
        Assert.Equal(value2, written[end - 15]);
        for (int index = end - 14; index < end - 8; index++)
        {
            Assert.Equal(0, written[index]);
        }

        // verify the EOS is written correctly
        for (int index = end - 8; index < end - 4; index++)
        {
            Assert.Equal(0xFF, written[index]);
        }
        for (int index = end - 4; index < end; index++)
        {
            Assert.Equal(0x00, written[index]);
        }
    }
}
| |
/// <summary>
/// Asynchronous-writer variant of the padding check: writes a batch of two single-element
/// Int32 columns and inspects the tail of the written bytes. Each 4-byte value buffer must
/// be followed by zero padding up to 8 bytes, and the stream must end with four 0xFF bytes
/// followed by four 0x00 bytes.
/// </summary>
[Fact]
public async Task WriteBatchWithCorrectPaddingAsync()
{
    byte value1 = 0x04;
    byte value2 = 0x14;

    Schema schema = new Schema.Builder()
        .Field(f => f.Name("age").DataType(Int32Type.Default))
        .Field(f => f.Name("characterCount").DataType(Int32Type.Default))
        .Build();

    var ageColumn = new Int32Array(
        new ArrowBuffer(new byte[] { value1, value1, 0x00, 0x00 }),
        ArrowBuffer.Empty,
        length: 1,
        nullCount: 0,
        offset: 0);
    var characterCountColumn = new Int32Array(
        new ArrowBuffer(new byte[] { value2, value2, 0x00, 0x00 }),
        ArrowBuffer.Empty,
        length: 1,
        nullCount: 0,
        offset: 0);

    var batch = new RecordBatch(schema, new IArrowArray[] { ageColumn, characterCountColumn }, length: 1);

    await TestRoundTripRecordBatchAsync(batch);

    using (var output = new MemoryStream())
    {
        using (var writer = new ArrowStreamWriter(output, batch.Schema, leaveOpen: true))
        {
            await writer.WriteRecordBatchAsync(batch);
            await writer.WriteEndAsync();
        }

        byte[] written = output.ToArray();
        int end = written.Length;

        // ensure that the data buffers at the end are 8-byte aligned
        // first column: two value bytes, then six zero bytes
        Assert.Equal(value1, written[end - 24]);
        Assert.Equal(value1, written[end - 23]);
        for (int index = end - 22; index < end - 16; index++)
        {
            Assert.Equal(0, written[index]);
        }

        // second column: two value bytes, then six zero bytes
        Assert.Equal(value2, written[end - 16]);
        Assert.Equal(value2, written[end - 15]);
        for (int index = end - 14; index < end - 8; index++)
        {
            Assert.Equal(0, written[index]);
        }

        // verify the EOS is written correctly
        for (int index = end - 8; index < end - 4; index++)
        {
            Assert.Equal(0xFF, written[index]);
        }
        for (int index = end - 4; index < end; index++)
        {
            Assert.Equal(0x00, written[index]);
        }
    }
}
| |
/// <summary>
/// Round-trips a 100-row sample batch written with <c>WriteLegacyIpcFormat</c> enabled.
/// </summary>
/// <param name="createDictionaryArray">Forwarded to <c>TestData.CreateSampleRecordBatch</c>.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public void LegacyIpcFormatRoundTrips(bool createDictionaryArray)
{
    var legacyOptions = new IpcOptions();
    legacyOptions.WriteLegacyIpcFormat = true;

    RecordBatch sample = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: createDictionaryArray);

    TestRoundTripRecordBatch(sample, legacyOptions);
}
| |
/// <summary>
/// Round-trips a 100-row sample batch written through the asynchronous writer API with
/// <c>WriteLegacyIpcFormat</c> enabled.
/// </summary>
/// <param name="createDictionaryArray">Forwarded to <c>TestData.CreateSampleRecordBatch</c>.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task LegacyIpcFormatRoundTripsAsync(bool createDictionaryArray)
{
    var legacyOptions = new IpcOptions();
    legacyOptions.WriteLegacyIpcFormat = true;

    RecordBatch sample = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: createDictionaryArray);

    await TestRoundTripRecordBatchAsync(sample, legacyOptions);
}
| |
/// <summary>
/// Writes a sample batch and inspects the raw bytes: the first little-endian Int32 and
/// the Int32 located eight bytes before the end must be -1 (the continuation token)
/// only when the legacy format is NOT requested; the final Int32 must always be 0.
/// </summary>
/// <param name="writeLegacyIpcFormat">Value assigned to <c>IpcOptions.WriteLegacyIpcFormat</c>.</param>
/// <param name="createDictionaryArray">Forwarded to <c>TestData.CreateSampleRecordBatch</c>.</param>
[Theory]
[InlineData(true, true)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(false, false)]
public void WriteLegacyIpcFormat(bool writeLegacyIpcFormat, bool createDictionaryArray)
{
    var ipcOptions = new IpcOptions();
    ipcOptions.WriteLegacyIpcFormat = writeLegacyIpcFormat;
    RecordBatch sample = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: createDictionaryArray);

    using (var output = new MemoryStream())
    {
        using (var writer = new ArrowStreamWriter(output, sample.Schema, leaveOpen: true, ipcOptions))
        {
            writer.WriteRecordBatch(sample);
            writer.WriteEnd();
        }

        output.Position = 0;

        // ensure the continuation is written correctly
        byte[] bytes = output.ToArray();
        int leadingInt = BinaryPrimitives.ReadInt32LittleEndian(bytes);
        int trailingInt1 = BinaryPrimitives.ReadInt32LittleEndian(bytes.AsSpan(bytes.Length - 8));
        int trailingInt2 = BinaryPrimitives.ReadInt32LittleEndian(bytes.AsSpan(bytes.Length - 4));

        if (!writeLegacyIpcFormat)
        {
            // the latest IPC format has a continuation token at the start
            Assert.Equal(-1, leadingInt);
            Assert.Equal(-1, trailingInt1);
        }
        else
        {
            // the legacy IPC format doesn't have a continuation token at the start
            Assert.NotEqual(-1, leadingInt);
            Assert.NotEqual(-1, trailingInt1);
        }

        Assert.Equal(0, trailingInt2);
    }
}
| |
/// <summary>
/// Asynchronous-writer variant of the continuation-token check: the first little-endian
/// Int32 and the Int32 located eight bytes before the end must be -1 (the continuation
/// token) only when the legacy format is NOT requested; the final Int32 must always be 0.
/// </summary>
/// <param name="writeLegacyIpcFormat">Value assigned to <c>IpcOptions.WriteLegacyIpcFormat</c>.</param>
/// <param name="createDictionaryArray">Forwarded to <c>TestData.CreateSampleRecordBatch</c>.</param>
[Theory]
[InlineData(true, true)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(false, false)]
public async Task WriteLegacyIpcFormatAsync(bool writeLegacyIpcFormat, bool createDictionaryArray)
{
    var ipcOptions = new IpcOptions();
    ipcOptions.WriteLegacyIpcFormat = writeLegacyIpcFormat;
    RecordBatch sample = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: createDictionaryArray);

    using (var output = new MemoryStream())
    {
        using (var writer = new ArrowStreamWriter(output, sample.Schema, leaveOpen: true, ipcOptions))
        {
            await writer.WriteRecordBatchAsync(sample);
            await writer.WriteEndAsync();
        }

        output.Position = 0;

        // ensure the continuation is written correctly
        byte[] bytes = output.ToArray();
        int leadingInt = BinaryPrimitives.ReadInt32LittleEndian(bytes);
        int trailingInt1 = BinaryPrimitives.ReadInt32LittleEndian(bytes.AsSpan(bytes.Length - 8));
        int trailingInt2 = BinaryPrimitives.ReadInt32LittleEndian(bytes.AsSpan(bytes.Length - 4));

        if (!writeLegacyIpcFormat)
        {
            // the latest IPC format has a continuation token at the start
            Assert.Equal(-1, leadingInt);
            Assert.Equal(-1, trailingInt1);
        }
        else
        {
            // the legacy IPC format doesn't have a continuation token at the start
            Assert.NotEqual(-1, leadingInt);
            Assert.NotEqual(-1, trailingInt1);
        }

        Assert.Equal(0, trailingInt2);
    }
}
| |
/// <summary>
/// Round-trips a batch whose schema carries metadata at schema level, at field level and
/// on a field nested inside a struct type.
/// </summary>
[Fact]
public void WritesMetadataCorrectly()
{
    var inner2Metadata = new Dictionary<string, string>()
    {
        { "customInner", "1" },
        { "customInner2", "3" }
    };
    var structType = new StructType(new[] {
        new Field("Inner1", FloatType.Default, nullable: false),
        new Field("Inner2", DoubleType.Default, nullable: true, inner2Metadata)
    });

    Schema schema = new Schema.Builder()
        .Metadata("index", "1, 2, 3, 4, 5")
        .Metadata("reverseIndex", "5, 4, 3, 2, 1")
        .Field(intCol => intCol
            .Name("IntCol")
            .DataType(UInt32Type.Default)
            .Metadata("custom1", "false")
            .Metadata("custom2", "true"))
        .Field(stringCol => stringCol
            .Name("StringCol")
            .DataType(StringType.Default)
            .Metadata("custom2", "false")
            .Metadata("custom3", "4"))
        .Field(structCol => structCol
            .Name("StructCol")
            .DataType(structType)
            .Metadata("custom4", "6.4")
            .Metadata("custom1", "true"))
        .Build();

    RecordBatch sample = TestData.CreateSampleRecordBatch(schema, length: 10);

    TestRoundTripRecordBatch(sample);
}
| |
/// <summary>
/// Round-trips the batches produced by <c>CreateMultipleDictionaryArraysTestData</c>
/// through the asynchronous writer API.
/// </summary>
[Fact]
public async Task WriteMultipleDictionaryArraysAsync()
{
    List<RecordBatch> batches = CreateMultipleDictionaryArraysTestData();

    Task roundTrip = TestRoundTripRecordBatchesAsync(batches);
    await roundTrip;
}
| |
/// <summary>
/// Checks the string representations of the first generated batch, its schema and its
/// <c>dictionaryField_int8</c> field, then round-trips all generated batches.
/// </summary>
[Fact]
public void WriteMultipleDictionaryArrays()
{
    List<RecordBatch> batches = CreateMultipleDictionaryArraysTestData();
    RecordBatch first = batches[0];
    Schema firstSchema = first.Schema;

    Assert.Equal("RecordBatch: 10 columns by 3 rows", first.ToString());
    Assert.Equal("Schema: Num fields=10, Num metadata=0", firstSchema.ToString());

    Field int8Field = firstSchema.FieldsLookup["dictionaryField_int8"].Single();
    Assert.Equal(
        "Field: Name=dictionaryField_int8, DataType=dictionary, IsNullable=False, Metadata count=0",
        int8Field.ToString());

    TestRoundTripRecordBatches(batches);
}
| |
/// <summary>
/// Builds two record batches sharing one schema and one set of columns: one dictionary
/// array per integer index type (int8 through uint64, all using the same string
/// dictionary), a list array of dictionary-encoded values, and a dictionary array whose
/// dictionary is a list array.
/// </summary>
/// <returns>A list containing two record batches built from the same schema and arrays.</returns>
private List<RecordBatch> CreateMultipleDictionaryArraysTestData()
{
    var dictionaryData = new List<string> { "a", "b", "c" };
    int length = dictionaryData.Count;

    var schemaForSimpleCase = new Schema(new List<Field> {
        new Field("int8", Int8Type.Default, true),
        new Field("uint8", UInt8Type.Default, true),
        new Field("int16", Int16Type.Default, true),
        new Field("uint16", UInt16Type.Default, true),
        new Field("int32", Int32Type.Default, true),
        new Field("uint32", UInt32Type.Default, true),
        new Field("int64", Int64Type.Default, true),
        new Field("uint64", UInt64Type.Default, true)
    }, null);

    StringArray dictionary = new StringArray.Builder().AppendRange(dictionaryData).Build();
    IEnumerable<IArrowArray> createdIndices = TestData.CreateArrays(schemaForSimpleCase, length);
    // Materialize once so the number of index columns is known up front.
    List<IArrowArray> indicesArraysForSimpleCase = createdIndices.ToList();

    // One column per index array plus the two composite columns appended below.
    int columnCount = indicesArraysForSimpleCase.Count + 2;
    var fields = new List<Field>(capacity: columnCount);
    var testTargetArrays = new List<IArrowArray>(capacity: columnCount);

    foreach (IArrowArray indices in indicesArraysForSimpleCase)
    {
        var dictionaryArray = new DictionaryArray(
            new DictionaryType(indices.Data.DataType, StringType.Default, false),
            indices, dictionary);
        testTargetArrays.Add(dictionaryArray);
        fields.Add(new Field($"dictionaryField_{indices.Data.DataType.Name}", dictionaryArray.Data.DataType, false));
    }

    (Field dictionaryTypeListArrayField, ListArray dictionaryTypeListArray) = CreateDictionaryTypeListArrayTestData(dictionary);

    fields.Add(dictionaryTypeListArrayField);
    testTargetArrays.Add(dictionaryTypeListArray);

    (Field listTypeDictionaryArrayField, DictionaryArray listTypeDictionaryArray) = CreateListTypeDictionaryArrayTestData(dictionaryData);

    fields.Add(listTypeDictionaryArrayField);
    testTargetArrays.Add(listTypeDictionaryArray);

    var schema = new Schema(fields, null);

    // The same arrays are reused for both batches.
    return new List<RecordBatch> {
        new RecordBatch(schema, testTargetArrays, length),
        new RecordBatch(schema, testTargetArrays, length),
    };
}
| |
/// <summary>
/// Builds a list array whose values are a dictionary array (Int32 indices 0..n-1 into
/// <paramref name="dictionary"/>), with every list containing exactly one value.
/// </summary>
/// <param name="dictionary">String dictionary referenced by the dictionary-encoded values.</param>
/// <returns>The field describing the list column together with the list array.</returns>
private Tuple<Field, ListArray> CreateDictionaryTypeListArrayTestData(StringArray dictionary)
{
    Int32Array indiceArray = new Int32Array.Builder().AppendRange(Enumerable.Range(0, dictionary.Length)).Build();

    //DictionaryArray has no Builder for now, so creating ListArray directly.
    var dictionaryType = new DictionaryType(Int32Type.Default, StringType.Default, false);
    var dictionaryArray = new DictionaryArray(dictionaryType, indiceArray, dictionary);

    int listCount = dictionary.Length;
    var valueOffsetsBufferBuilder = new ArrowBuffer.Builder<int>();
    var validityBufferBuilder = new ArrowBuffer.BitmapBuilder();

    // listCount + 1 offsets (0..listCount) delimit listCount lists of one element each.
    foreach (int i in Enumerable.Range(0, listCount + 1))
    {
        valueOffsetsBufferBuilder.Append(i);
    }

    // One validity bit per list; every list is valid.
    foreach (int _ in Enumerable.Range(0, listCount))
    {
        validityBufferBuilder.Append(true);
    }

    var dictionaryField = new Field("dictionaryField_list", dictionaryType, false);
    var listType = new ListType(dictionaryField);

    // The last argument is the null bitmap, so it must be the validity buffer and not a
    // second copy of the offsets.
    var listArray = new ListArray(listType, listCount, valueOffsetsBufferBuilder.Build(), dictionaryArray, validityBufferBuilder.Build());

    return Tuple.Create(new Field($"listField_{listType.ValueDataType.Name}", listType, false), listArray);
}
| |
/// <summary>
/// Builds a dictionary array whose dictionary is a list array of strings (one
/// single-string list per entry of <paramref name="dictionaryDataBase"/>) and whose
/// Int32 indices are 0..n-1.
/// </summary>
/// <param name="dictionaryDataBase">Strings that each become a one-element list in the dictionary.</param>
/// <returns>The field describing the dictionary column together with the dictionary array.</returns>
private Tuple<Field, DictionaryArray> CreateListTypeDictionaryArrayTestData(List<string> dictionaryDataBase)
{
    var listBuilder = new ListArray.Builder(StringType.Default);

    // Direct cast: a builder of an unexpected type fails here with InvalidCastException
    // instead of surfacing later as a NullReferenceException.
    var valueBuilder = (StringArray.Builder)listBuilder.ValueBuilder;

    foreach (string data in dictionaryDataBase)
    {
        // Start a new list, then add its single value.
        listBuilder.Append();
        valueBuilder.Append(data);
    }

    ListArray dictionary = listBuilder.Build();
    Int32Array indiceArray = new Int32Array.Builder().AppendRange(Enumerable.Range(0, dictionary.Length)).Build();
    var dictionaryArrayType = new DictionaryType(Int32Type.Default, dictionary.Data.DataType, false);
    var dictionaryArray = new DictionaryArray(dictionaryArrayType, indiceArray, dictionary);

    return Tuple.Create(new Field($"dictionaryField_{dictionaryArray.Data.DataType.Name}", dictionaryArrayType, false), dictionaryArray);
}
| |
/// <summary>
/// Tests that writing an arrow stream with no RecordBatches produces the correct result:
/// reading it back yields no batch and a schema equal to the one that was written.
/// </summary>
[Fact]
public void WritesEmptyFile()
{
    RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 1);

    using (var stream = new MemoryStream())
    {
        // leaveOpen: true so that disposing the writer does not close the stream
        // before it is read back.
        using (var writer = new ArrowStreamWriter(stream, originalBatch.Schema, leaveOpen: true))
        {
            writer.WriteStart();
            writer.WriteEnd();
        }

        stream.Position = 0;

        using (var reader = new ArrowStreamReader(stream))
        {
            RecordBatch readBatch = reader.ReadNextRecordBatch();
            Assert.Null(readBatch);
            SchemaComparer.Compare(originalBatch.Schema, reader.Schema);
        }
    }
}
| |
/// <summary>
/// Tests that writing an arrow stream with no RecordBatches produces the correct
/// result when using WriteStartAsync and WriteEndAsync: reading it back yields no
/// batch and a schema equal to the one that was written.
/// </summary>
[Fact]
public async Task WritesEmptyFileAsync()
{
    RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 1);

    using (var stream = new MemoryStream())
    {
        // leaveOpen: true so that disposing the writer does not close the stream
        // before it is read back.
        using (var writer = new ArrowStreamWriter(stream, originalBatch.Schema, leaveOpen: true))
        {
            await writer.WriteStartAsync();
            await writer.WriteEndAsync();
        }

        stream.Position = 0;

        using (var reader = new ArrowStreamReader(stream))
        {
            RecordBatch readBatch = reader.ReadNextRecordBatch();
            Assert.Null(readBatch);
            SchemaComparer.Compare(originalBatch.Schema, reader.Schema);
        }
    }
}
| |
| |
/// <summary>
/// Round-trips a sliced batch with a dedicated <c>TestMemoryAllocator</c> and checks the
/// allocator afterwards: at least one allocation must have happened when the slice offset
/// is not a multiple of 8, and nothing may remain rented in any case.
/// </summary>
/// <param name="sliceOffset">Offset passed to <c>ArrowArrayFactory.Slice</c> for each column.</param>
/// <param name="sliceLength">Length of each slice and of the resulting batch.</param>
[Theory]
[InlineData(0, 45)]
[InlineData(3, 45)]
[InlineData(16, 45)]
public async Task MemoryOwnerDisposalSlicedArray(int sliceOffset, int sliceLength)
{
    RecordBatch source = TestData.CreateSampleRecordBatch(length: 100);

    var columns = new List<IArrowArray>();
    foreach (IArrowArray column in source.Arrays)
    {
        columns.Add(ArrowArrayFactory.Slice(column, sliceOffset, sliceLength));
    }

    var batches = new List<RecordBatch>();
    batches.Add(new RecordBatch(source.Schema, columns, sliceLength));

    var trackingAllocator = new TestMemoryAllocator();
    await TestRoundTripRecordBatchesAsync(batches, null, false, trackingAllocator);

    bool offsetIsMultipleOfEight = sliceOffset % 8 == 0;
    if (!offsetIsMultipleOfEight)
    {
        Assert.True(trackingAllocator.Statistics.Allocations > 0);
    }

    Assert.Equal(0, trackingAllocator.Rented);
}
| } |
| } |