blob: 3760d940b01189d8d941a84dc21e16ba96f30431 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;
using Apache.Arrow.Memory;
using Apache.Arrow.Tests;
using Apache.Arrow.Types;
using BenchmarkDotNet.Attributes;
namespace Apache.Arrow.Benchmarks
{
//[EtwProfiler] - needs elevated privileges
/// <summary>
/// Benchmarks reading a single serialized record batch back through
/// <see cref="ArrowStreamReader"/> from a stream (default and custom allocator)
/// and from an in-memory buffer, summing every Int64, Double and Decimal128
/// column so the decoded data is actually touched.
/// </summary>
[MemoryDiagnoser]
public class ArrowReaderBenchmark
{
    /// <summary>
    /// Row count of the sample batch; passed as <c>length</c> to the test-data factory.
    /// </summary>
    [Params(10_000, 1_000_000)]
    public int Count { get; set; }

    // Serialized batch shared by all benchmarks; rewound before every iteration.
    private MemoryStream _memoryStream;

    // Allocator used by the *_ManagedMemory benchmark.
    // NOTE(review): presumably backed by managed arrays, per the benchmark name - confirm.
    private static readonly MemoryAllocator s_allocator = new TestMemoryAllocator();

    /// <summary>
    /// Builds the sample record batch and serializes it once into <see cref="_memoryStream"/>.
    /// </summary>
    [GlobalSetup]
    public async Task GlobalSetup()
    {
        _memoryStream = new MemoryStream();

        // The batch is only needed while it is being written; dispose it afterwards so
        // its buffers are not kept alive for the rest of the benchmark run.
        // leaveOpen: true keeps the stream usable after the writer is disposed.
        using (RecordBatch batch = TestData.CreateSampleRecordBatch(length: Count, createDictionaryArray: false))
        using (var writer = new ArrowStreamWriter(_memoryStream, batch.Schema, leaveOpen: true))
        {
            await writer.WriteRecordBatchAsync(batch);
        }
    }

    /// <summary>
    /// Rewinds the shared stream so each iteration reads from the beginning.
    /// </summary>
    [IterationSetup]
    public void Setup()
    {
        _memoryStream.Position = 0;
    }

    /// <summary>
    /// Reads the batch from the stream using the reader's default allocator.
    /// </summary>
    /// <returns>The sum of all supported numeric columns.</returns>
    [Benchmark]
    public Task<double> ArrowReaderWithMemoryStream()
    {
        // The helper's task is returned directly (no extra async wrapper) so the
        // measured allocations stay limited to a single async state machine.
        return SumAllBatchesAsync(new ArrowStreamReader(_memoryStream, leaveOpen: true));
    }

    /// <summary>
    /// Reads the batch from the stream using <see cref="s_allocator"/>.
    /// </summary>
    /// <returns>The sum of all supported numeric columns.</returns>
    [Benchmark]
    public Task<double> ArrowReaderWithMemoryStream_ManagedMemory()
    {
        return SumAllBatchesAsync(new ArrowStreamReader(_memoryStream, s_allocator, leaveOpen: true));
    }

    /// <summary>
    /// Reads the batch directly from the stream's backing buffer.
    /// </summary>
    /// <returns>The sum of all supported numeric columns.</returns>
    [Benchmark]
    public Task<double> ArrowReaderWithMemory()
    {
        // GetBuffer() returns the whole backing array, including unused capacity,
        // so restrict the view to the bytes that were actually written.
        var written = new ReadOnlyMemory<byte>(_memoryStream.GetBuffer(), 0, (int)_memoryStream.Length);
        return SumAllBatchesAsync(new ArrowStreamReader(written));
    }

    /// <summary>
    /// Drains <paramref name="reader"/>, summing the numeric columns of every batch,
    /// and disposes both the batches and the reader.
    /// </summary>
    /// <param name="reader">Reader positioned at the start of the data; owned by this call.</param>
    /// <returns>The accumulated sum over all batches.</returns>
    private static async Task<double> SumAllBatchesAsync(ArrowStreamReader reader)
    {
        using (reader)
        {
            double sum = 0;
            RecordBatch recordBatch;
            while ((recordBatch = await reader.ReadNextRecordBatchAsync()) != null)
            {
                using (recordBatch)
                {
                    sum += SumAllNumbers(recordBatch);
                }
            }

            return sum;
        }
    }

    /// <summary>
    /// Sums the values of every Int64, Double and Decimal128 column in the batch;
    /// columns of any other type are ignored.
    /// </summary>
    /// <param name="recordBatch">The batch whose columns are summed.</param>
    /// <returns>The combined sum as a <see cref="double"/>.</returns>
    private static double SumAllNumbers(RecordBatch recordBatch)
    {
        double sum = 0;
        var schema = recordBatch.Schema;

        for (int k = 0; k < recordBatch.ColumnCount; k++)
        {
            // Direct column access keeps LINQ out of the measured path.
            var array = recordBatch.Column(k);

            switch (schema.GetFieldByIndex(k).DataType.TypeId)
            {
                case ArrowTypeId.Int64:
                    sum += Sum((Int64Array)array);
                    break;
                case ArrowTypeId.Double:
                    sum += Sum((DoubleArray)array);
                    break;
                case ArrowTypeId.Decimal128:
                    sum += Sum((Decimal128Array)array);
                    break;
            }
        }

        return sum;
    }

    /// <summary>Adds up every element of the array's value span.</summary>
    private static double Sum(DoubleArray doubleArray)
    {
        double sum = 0;
        foreach (double value in doubleArray.Values)
        {
            sum += value;
        }

        return sum;
    }

    /// <summary>Adds up every element of the array's value span.</summary>
    private static long Sum(Int64Array int64Array)
    {
        long sum = 0;
        foreach (long value in int64Array.Values)
        {
            sum += value;
        }

        return sum;
    }

    /// <summary>
    /// Adds up every element of the array, converting each value to <see cref="double"/>.
    /// </summary>
    private static double Sum(Decimal128Array decimal128Array)
    {
        double sum = 0;
        for (int valueIndex = 0; valueIndex < decimal128Array.Length; valueIndex++)
        {
            sum += (double)decimal128Array.GetValue(valueIndex);
        }

        return sum;
    }
}
}