blob: 2f2229ded4c46077c0be3b775be85a0b7406cc18 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using Apache.Arrow.Ipc;
using System;
using System.IO;
using System.Reflection;
using System.Threading.Tasks;
using Xunit;
namespace Apache.Arrow.Tests
{
public class ArrowFileReaderTests
{
[Fact]
public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
{
var stream = new MemoryStream();
new ArrowFileReader(stream).Dispose();
Assert.Throws<ObjectDisposedException>(() => stream.Position);
}
[Fact]
public void Ctor_LeaveOpenFalse_StreamClosedOnDispose()
{
var stream = new MemoryStream();
new ArrowFileReader(stream, leaveOpen: false).Dispose();
Assert.Throws<ObjectDisposedException>(() => stream.Position);
}
[Fact]
public void Ctor_LeaveOpenTrue_StreamValidOnDispose()
{
var stream = new MemoryStream();
new ArrowFileReader(stream, leaveOpen: true).Dispose();
Assert.Equal(0, stream.Position);
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task Ctor_MemoryPool_AllocatesFromPool(bool shouldLeaveOpen)
{
RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
using (MemoryStream stream = new MemoryStream())
{
ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch.Schema);
await writer.WriteRecordBatchAsync(originalBatch);
await writer.WriteEndAsync();
stream.Position = 0;
var memoryPool = new TestMemoryAllocator();
ArrowFileReader reader = new ArrowFileReader(stream, memoryPool, leaveOpen: shouldLeaveOpen);
reader.ReadNextRecordBatch();
Assert.Equal(1, memoryPool.Statistics.Allocations);
Assert.True(memoryPool.Statistics.BytesAllocated > 0);
reader.Dispose();
if (shouldLeaveOpen)
{
Assert.True(stream.Position > 0);
}
else
{
Assert.Throws<ObjectDisposedException>(() => stream.Position);
}
}
}
[Fact]
public async Task TestReadNextRecordBatch()
{
await TestReadRecordBatchHelper((reader, originalBatch) =>
{
ArrowReaderVerifier.VerifyReader(reader, originalBatch);
return Task.CompletedTask;
});
}
[Fact]
public async Task TestReadNextRecordBatchAsync()
{
await TestReadRecordBatchHelper(ArrowReaderVerifier.VerifyReaderAsync);
}
[Fact]
public async Task TestReadRecordBatchAsync()
{
await TestReadRecordBatchHelper(async (reader, originalBatch) =>
{
RecordBatch readBatch = await reader.ReadRecordBatchAsync(0);
ArrowReaderVerifier.CompareBatches(originalBatch, readBatch);
// You should be able to read the same record batch again
RecordBatch readBatch2 = await reader.ReadRecordBatchAsync(0);
ArrowReaderVerifier.CompareBatches(originalBatch, readBatch2);
});
}
private static async Task TestReadRecordBatchHelper(
Func<ArrowFileReader, RecordBatch, Task> verificationFunc)
{
RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
using (MemoryStream stream = new MemoryStream())
{
ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch.Schema);
await writer.WriteRecordBatchAsync(originalBatch);
await writer.WriteEndAsync();
stream.Position = 0;
ArrowFileReader reader = new ArrowFileReader(stream);
await verificationFunc(reader, originalBatch);
}
}
[Fact]
public async Task TestReadMultipleRecordBatchAsync()
{
RecordBatch originalBatch1 = TestData.CreateSampleRecordBatch(length: 100);
RecordBatch originalBatch2 = TestData.CreateSampleRecordBatch(length: 50);
using (MemoryStream stream = new MemoryStream())
{
ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch1.Schema);
await writer.WriteRecordBatchAsync(originalBatch1);
await writer.WriteRecordBatchAsync(originalBatch2);
await writer.WriteEndAsync();
stream.Position = 0;
ArrowFileReader reader = new ArrowFileReader(stream);
RecordBatch readBatch1 = await reader.ReadRecordBatchAsync(0);
ArrowReaderVerifier.CompareBatches(originalBatch1, readBatch1);
RecordBatch readBatch2 = await reader.ReadRecordBatchAsync(1);
ArrowReaderVerifier.CompareBatches(originalBatch2, readBatch2);
// now read the first again, for random access
RecordBatch readBatch3 = await reader.ReadRecordBatchAsync(0);
ArrowReaderVerifier.CompareBatches(originalBatch1, readBatch3);
}
}
[Fact]
public void TestRecordBatchBasics()
{
RecordBatch recordBatch = TestData.CreateSampleRecordBatch(length: 1);
Assert.Throws<ArgumentOutOfRangeException>(() => new RecordBatch(recordBatch.Schema, recordBatch.Arrays, -1));
var col1 = recordBatch.Column(0);
var col2 = recordBatch.Column("list0");
ArrowReaderVerifier.CompareArrays(col1, col2);
recordBatch.Dispose();
}
}
}