// blob: 0e8c9d6687a0234e71ca4ce5e6b79704dd80f559 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using Apache.Arrow.Ipc;
using Apache.Arrow.Memory;
using System;
using System.IO;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace Apache.Arrow.Tests
{
public class ArrowStreamReaderTests
{
/// <summary>
/// Verifies that disposing a reader constructed without a leaveOpen argument
/// leaves the wrapped stream disposed.
/// </summary>
[Fact]
public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
{
    var underlying = new MemoryStream();
    var reader = new ArrowStreamReader(underlying);

    reader.Dispose();

    // Reading Position is expected to fail once the stream has been disposed.
    Assert.Throws<ObjectDisposedException>(() => underlying.Position);
}
/// <summary>
/// Verifies that disposing a reader constructed with <c>leaveOpen: false</c>
/// leaves the wrapped stream disposed.
/// </summary>
[Fact]
public void Ctor_LeaveOpenFalse_StreamClosedOnDispose()
{
    var underlying = new MemoryStream();
    var reader = new ArrowStreamReader(underlying, leaveOpen: false);

    reader.Dispose();

    // Reading Position is expected to fail once the stream has been disposed.
    Assert.Throws<ObjectDisposedException>(() => underlying.Position);
}
/// <summary>
/// Verifies that disposing a reader constructed with <c>leaveOpen: true</c>
/// leaves the wrapped stream usable: its Position can still be read and is 0.
/// </summary>
[Fact]
public void Ctor_LeaveOpenTrue_StreamValidOnDispose()
{
    var underlying = new MemoryStream();
    var reader = new ArrowStreamReader(underlying, leaveOpen: true);

    reader.Dispose();

    Assert.Equal(0, underlying.Position);
}
/// <summary>
/// Writes one sample batch to a stream, reads it back through a reader that
/// was handed a <c>TestMemoryAllocator</c>, and checks the allocator's
/// statistics together with the state of the stream after the reader is disposed.
/// </summary>
/// <param name="shouldLeaveOpen">Passed to the reader constructor; selects which post-dispose stream assertion runs.</param>
/// <param name="createDictionaryArray">Passed to <c>TestData.CreateSampleRecordBatch</c>.</param>
/// <param name="expectedAllocations">Value the allocator's <c>Statistics.Allocations</c> must equal after one batch is read.</param>
[Theory]
[InlineData(true, true, 2)]
[InlineData(true, false, 1)]
[InlineData(false, true, 2)]
[InlineData(false, false, 1)]
public async Task Ctor_MemoryPool_AllocatesFromPool(bool shouldLeaveOpen, bool createDictionaryArray, int expectedAllocations)
{
    RecordBatch sampleBatch = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: createDictionaryArray);

    using (var backingStream = new MemoryStream())
    {
        var batchWriter = new ArrowStreamWriter(backingStream, sampleBatch.Schema);
        await batchWriter.WriteRecordBatchAsync(sampleBatch);
        await batchWriter.WriteEndAsync();

        // Rewind so the reader starts from the first byte that was written.
        backingStream.Position = 0;

        var allocator = new TestMemoryAllocator();
        var reader = new ArrowStreamReader(backingStream, allocator, shouldLeaveOpen);
        reader.ReadNextRecordBatch();

        Assert.Equal(expectedAllocations, allocator.Statistics.Allocations);
        Assert.True(allocator.Statistics.BytesAllocated > 0);

        reader.Dispose();

        if (!shouldLeaveOpen)
        {
            // The reader owned the stream, so it must now be disposed.
            Assert.Throws<ObjectDisposedException>(() => backingStream.Position);
        }
        else
        {
            // The stream is still usable and has been advanced by the read.
            Assert.True(backingStream.Position > 0);
        }
    }
}
/// <summary>
/// Runs <c>ArrowReaderVerifier.VerifyReader</c> through
/// <c>TestReaderFromMemory</c>, once with and once without <paramref name="writeEnd"/>.
/// </summary>
/// <param name="writeEnd">Forwarded unchanged to <c>TestReaderFromMemory</c>.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ReadRecordBatch_Memory(bool writeEnd)
{
    // Adapts the synchronous verifier to the Task-returning callback shape.
    Task VerifySynchronously(ArrowStreamReader reader, RecordBatch originalBatch)
    {
        ArrowReaderVerifier.VerifyReader(reader, originalBatch);
        return Task.CompletedTask;
    }

    await TestReaderFromMemory(VerifySynchronously, writeEnd);
}
/// <summary>
/// Runs <c>ArrowReaderVerifier.VerifyReaderAsync</c> through
/// <c>TestReaderFromMemory</c>, once with and once without <paramref name="writeEnd"/>.
/// </summary>
/// <param name="writeEnd">Forwarded unchanged to <c>TestReaderFromMemory</c>.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ReadRecordBatchAsync_Memory(bool writeEnd)
{
    Func<ArrowStreamReader, RecordBatch, Task> verifyAsync = ArrowReaderVerifier.VerifyReaderAsync;

    await TestReaderFromMemory(verifyAsync, writeEnd);
}
/// <summary>
/// Serializes a sample record batch into a byte array, opens an
/// <see cref="ArrowStreamReader"/> over that array and hands the reader to
/// <paramref name="verificationFunc"/>.
/// </summary>
/// <param name="verificationFunc">Callback that receives the reader and the batch that was written.</param>
/// <param name="writeEnd">When true, <c>WriteEndAsync</c> is called on the writer after the batch.</param>
private static async Task TestReaderFromMemory(
    Func<ArrowStreamReader, RecordBatch, Task> verificationFunc,
    bool writeEnd)
{
    RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);

    byte[] buffer;
    using (MemoryStream stream = new MemoryStream())
    {
        ArrowStreamWriter writer = new ArrowStreamWriter(stream, originalBatch.Schema);
        await writer.WriteRecordBatchAsync(originalBatch);
        if (writeEnd)
        {
            await writer.WriteEndAsync();
        }

        // NOTE(review): GetBuffer returns the stream's whole backing array, which can be
        // longer than the bytes actually written, so the reader may see trailing unused
        // bytes. Left unchanged here; confirm whether ToArray() is the intended input.
        buffer = stream.GetBuffer();
    }

    // Scope the reader so it is disposed even when verification throws.
    using (ArrowStreamReader reader = new ArrowStreamReader(buffer))
    {
        await verificationFunc(reader, originalBatch);
    }
}
/// <summary>
/// Runs <c>ArrowReaderVerifier.VerifyReader</c> through
/// <c>TestReaderFromStream</c> for every combination of the two flags.
/// </summary>
/// <param name="writeEnd">Forwarded unchanged to <c>TestReaderFromStream</c>.</param>
/// <param name="createDictionaryArray">Forwarded unchanged to <c>TestReaderFromStream</c>.</param>
[Theory]
[InlineData(true, true)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(false, false)]
public async Task ReadRecordBatch_Stream(bool writeEnd, bool createDictionaryArray)
{
    // Wraps the synchronous verifier so it fits the Task-returning callback shape.
    Func<ArrowStreamReader, RecordBatch, Task> verifySynchronously = (reader, originalBatch) =>
    {
        ArrowReaderVerifier.VerifyReader(reader, originalBatch);
        return Task.CompletedTask;
    };

    await TestReaderFromStream(verifySynchronously, writeEnd, createDictionaryArray);
}
/// <summary>
/// Runs <c>ArrowReaderVerifier.VerifyReaderAsync</c> through
/// <c>TestReaderFromStream</c> for every combination of the two flags.
/// </summary>
/// <param name="writeEnd">Forwarded unchanged to <c>TestReaderFromStream</c>.</param>
/// <param name="createDictionaryArray">Forwarded unchanged to <c>TestReaderFromStream</c>.</param>
[Theory]
[InlineData(true, true)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(false, false)]
public async Task ReadRecordBatchAsync_Stream(bool writeEnd, bool createDictionaryArray)
{
    Func<ArrowStreamReader, RecordBatch, Task> verifyAsync = ArrowReaderVerifier.VerifyReaderAsync;

    await TestReaderFromStream(verifyAsync, writeEnd, createDictionaryArray);
}
/// <summary>
/// Writes a sample record batch to a <see cref="MemoryStream"/>, rewinds it,
/// opens an <see cref="ArrowStreamReader"/> over the stream and hands the
/// reader to <paramref name="verificationFunc"/>.
/// </summary>
/// <param name="verificationFunc">Callback that receives the reader and the batch that was written.</param>
/// <param name="writeEnd">When true, <c>WriteEndAsync</c> is called on the writer after the batch.</param>
/// <param name="createDictionaryArray">Passed to <c>TestData.CreateSampleRecordBatch</c>.</param>
private static async Task TestReaderFromStream(
    Func<ArrowStreamReader, RecordBatch, Task> verificationFunc,
    bool writeEnd, bool createDictionaryArray)
{
    RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: createDictionaryArray);

    using (MemoryStream stream = new MemoryStream())
    {
        ArrowStreamWriter writer = new ArrowStreamWriter(stream, originalBatch.Schema);
        await writer.WriteRecordBatchAsync(originalBatch);
        if (writeEnd)
        {
            await writer.WriteEndAsync();
        }

        // Rewind so the reader starts from the first byte that was written.
        stream.Position = 0;

        // Scope the reader so it is disposed even when verification throws.
        // The enclosing using disposes the stream again afterwards, which
        // MemoryStream tolerates.
        using (ArrowStreamReader reader = new ArrowStreamReader(stream))
        {
            await verificationFunc(reader, originalBatch);
        }
    }
}
/// <summary>
/// Runs <c>ArrowReaderVerifier.VerifyReader</c> through
/// <c>TestReaderFromPartialReadStream</c>, with and without a dictionary array.
/// </summary>
/// <param name="createDictionaryArray">Forwarded unchanged to <c>TestReaderFromPartialReadStream</c>.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ReadRecordBatch_PartialReadStream(bool createDictionaryArray)
{
    // Adapts the synchronous verifier to the Task-returning callback shape.
    Task VerifySynchronously(ArrowStreamReader reader, RecordBatch originalBatch)
    {
        ArrowReaderVerifier.VerifyReader(reader, originalBatch);
        return Task.CompletedTask;
    }

    await TestReaderFromPartialReadStream(VerifySynchronously, createDictionaryArray);
}
/// <summary>
/// Runs <c>ArrowReaderVerifier.VerifyReaderAsync</c> through
/// <c>TestReaderFromPartialReadStream</c>, with and without a dictionary array.
/// </summary>
/// <param name="createDictionaryArray">Forwarded unchanged to <c>TestReaderFromPartialReadStream</c>.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ReadRecordBatchAsync_PartialReadStream(bool createDictionaryArray)
{
    Func<ArrowStreamReader, RecordBatch, Task> verifyAsync = ArrowReaderVerifier.VerifyReaderAsync;

    await TestReaderFromPartialReadStream(verifyAsync, createDictionaryArray);
}
/// <summary>
/// Verifies that the stream reader reads multiple times when a stream
/// only returns a subset of the data from each Read.
/// </summary>
/// <param name="verificationFunc">Callback that receives the reader and the batch that was written.</param>
/// <param name="createDictionaryArray">Passed to <c>TestData.CreateSampleRecordBatch</c>.</param>
private static async Task TestReaderFromPartialReadStream(Func<ArrowStreamReader, RecordBatch, Task> verificationFunc, bool createDictionaryArray)
{
    RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: createDictionaryArray);

    using (PartialReadStream stream = new PartialReadStream())
    {
        ArrowStreamWriter writer = new ArrowStreamWriter(stream, originalBatch.Schema);
        await writer.WriteRecordBatchAsync(originalBatch);
        await writer.WriteEndAsync();

        // Rewind so the reader starts from the first byte that was written.
        stream.Position = 0;

        // Scope the reader so it is disposed even when verification throws.
        // The enclosing using disposes the stream again afterwards, which
        // MemoryStream-derived streams tolerate.
        using (ArrowStreamReader reader = new ArrowStreamReader(stream))
        {
            await verificationFunc(reader, originalBatch);
        }
    }
}
/// <summary>
/// A stream class that only returns a part of the data at a time.
/// </summary>
/// <remarks>
/// The array-, span- and memory-based <c>Read</c>/<c>ReadAsync</c> overloads are
/// each capped at <see cref="PartialReadLength"/> bytes per call, so the cap
/// applies regardless of which of those overloads the consumer uses.
/// </remarks>
private class PartialReadStream : MemoryStream
{
    // by default return 20 bytes at a time
    public int PartialReadLength { get; set; } = 20;

    /// <summary>Reads at most <see cref="PartialReadLength"/> bytes into <paramref name="buffer"/>.</summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        return base.Read(buffer, offset, Math.Min(count, PartialReadLength));
    }

    /// <summary>Reads at most <see cref="PartialReadLength"/> bytes into <paramref name="destination"/>.</summary>
    public override int Read(Span<byte> destination)
    {
        if (destination.Length > PartialReadLength)
        {
            destination = destination.Slice(0, PartialReadLength);
        }

        return base.Read(destination);
    }

    /// <summary>Asynchronously reads at most <see cref="PartialReadLength"/> bytes into <paramref name="buffer"/>.</summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return base.ReadAsync(buffer, offset, Math.Min(count, PartialReadLength), cancellationToken);
    }

    /// <summary>Asynchronously reads at most <see cref="PartialReadLength"/> bytes into <paramref name="destination"/>.</summary>
    public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
    {
        if (destination.Length > PartialReadLength)
        {
            destination = destination.Slice(0, PartialReadLength);
        }

        return base.ReadAsync(destination, cancellationToken);
    }
}
}
}