// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| |
| using System; |
| using System.IO; |
| using System.Linq; |
| using System.Threading.Tasks; |
| using Apache.Arrow.Ipc; |
| using Apache.Arrow.Memory; |
| using Apache.Arrow.Tests; |
| using Apache.Arrow.Types; |
| using BenchmarkDotNet.Attributes; |
| |
namespace Apache.Arrow.Benchmarks
{
    //[EtwProfiler] - needs elevated privileges
    /// <summary>
    /// Benchmarks that read one serialized record batch back through
    /// <see cref="ArrowStreamReader"/>, either from a <see cref="MemoryStream"/>
    /// or directly from the bytes that stream holds, and sum the numeric columns
    /// of every batch read.
    /// </summary>
    [MemoryDiagnoser]
    public class ArrowReaderBenchmark
    {
        /// <summary>
        /// Length passed to <c>TestData.CreateSampleRecordBatch</c> when building the input.
        /// </summary>
        [Params(10_000, 1_000_000)]
        public int Count { get; set; }

        // Holds the serialized batch; written once in GlobalSetup and re-read by every benchmark.
        private MemoryStream _memoryStream;

        // Allocator handed to the reader by the "_ManagedMemory" benchmark.
        private static readonly MemoryAllocator s_allocator = new TestMemoryAllocator();

        /// <summary>
        /// Serializes one sample record batch of length <see cref="Count"/> into
        /// <c>_memoryStream</c> so the benchmarks have something to read.
        /// </summary>
        [GlobalSetup]
        public async Task GlobalSetup()
        {
            _memoryStream = new MemoryStream();

            // Both the sample batch and the writer are disposable. The writer is created
            // with leaveOpen: true so that disposing it does not close _memoryStream,
            // which every benchmark iteration still has to read.
            using (RecordBatch batch = TestData.CreateSampleRecordBatch(length: Count, createDictionaryArray: false))
            using (var writer = new ArrowStreamWriter(_memoryStream, batch.Schema, leaveOpen: true))
            {
                await writer.WriteRecordBatchAsync(batch);
            }
        }

        /// <summary>
        /// Rewinds the shared stream so the next stream-based read starts at the beginning.
        /// </summary>
        [IterationSetup]
        public void Setup()
        {
            _memoryStream.Position = 0;
        }

        /// <summary>
        /// Reads every batch from the shared stream with the reader's default allocator.
        /// </summary>
        /// <returns>The sum of all values in the summed columns of every batch read.</returns>
        [Benchmark]
        public async Task<double> ArrowReaderWithMemoryStream()
        {
            double sum = 0;

            // leaveOpen: true lets the reader be disposed without closing the stream
            // that later iterations reuse.
            using (var reader = new ArrowStreamReader(_memoryStream, leaveOpen: true))
            {
                RecordBatch recordBatch;
                while ((recordBatch = await reader.ReadNextRecordBatchAsync()) != null)
                {
                    using (recordBatch)
                    {
                        sum += SumAllNumbers(recordBatch);
                    }
                }
            }

            return sum;
        }

        /// <summary>
        /// Reads every batch from the shared stream, passing <c>s_allocator</c> to the reader.
        /// </summary>
        /// <returns>The sum of all values in the summed columns of every batch read.</returns>
        [Benchmark]
        public async Task<double> ArrowReaderWithMemoryStream_ManagedMemory()
        {
            double sum = 0;

            // leaveOpen: true lets the reader be disposed without closing the stream
            // that later iterations reuse.
            using (var reader = new ArrowStreamReader(_memoryStream, s_allocator, leaveOpen: true))
            {
                RecordBatch recordBatch;
                while ((recordBatch = await reader.ReadNextRecordBatchAsync()) != null)
                {
                    using (recordBatch)
                    {
                        sum += SumAllNumbers(recordBatch);
                    }
                }
            }

            return sum;
        }

        /// <summary>
        /// Reads every batch directly from the bytes held by the shared stream,
        /// without going through the <see cref="Stream"/> API.
        /// </summary>
        /// <returns>The sum of all values in the summed columns of every batch read.</returns>
        [Benchmark]
        public async Task<double> ArrowReaderWithMemory()
        {
            double sum = 0;

            // GetBuffer() exposes the whole backing array, including unused capacity
            // past Length, so restrict the view to the bytes that were actually written.
            var serialized = new ReadOnlyMemory<byte>(_memoryStream.GetBuffer(), 0, (int)_memoryStream.Length);

            using (var reader = new ArrowStreamReader(serialized))
            {
                RecordBatch recordBatch;
                while ((recordBatch = await reader.ReadNextRecordBatchAsync()) != null)
                {
                    using (recordBatch)
                    {
                        sum += SumAllNumbers(recordBatch);
                    }
                }
            }

            return sum;
        }

        /// <summary>
        /// Adds up every value in the Int64, Double and Decimal128 columns of
        /// <paramref name="recordBatch"/>; columns of any other type are skipped.
        /// </summary>
        /// <param name="recordBatch">The batch whose columns are summed.</param>
        /// <returns>The combined sum across all summed columns.</returns>
        private static double SumAllNumbers(RecordBatch recordBatch)
        {
            double sum = 0;

            for (int k = 0; k < recordBatch.ColumnCount; k++)
            {
                // Indexed accessor instead of LINQ ElementAt over the Arrays sequence.
                IArrowArray array = recordBatch.Column(k);
                switch (recordBatch.Schema.GetFieldByIndex(k).DataType.TypeId)
                {
                    case ArrowTypeId.Int64:
                        sum += Sum((Int64Array)array);
                        break;
                    case ArrowTypeId.Double:
                        sum += Sum((DoubleArray)array);
                        break;
                    case ArrowTypeId.Decimal128:
                        sum += Sum((Decimal128Array)array);
                        break;
                }
            }

            return sum;
        }

        /// <summary>Sums every element of the array's <c>Values</c> span.</summary>
        private static double Sum(DoubleArray doubleArray)
        {
            double sum = 0;
            foreach (double value in doubleArray.Values)
            {
                sum += value;
            }
            return sum;
        }

        /// <summary>Sums every element of the array's <c>Values</c> span.</summary>
        private static long Sum(Int64Array int64Array)
        {
            long sum = 0;
            foreach (long value in int64Array.Values)
            {
                sum += value;
            }
            return sum;
        }

        /// <summary>
        /// Sums the array element by element via <c>GetValue</c>, converting each
        /// value to <see cref="double"/> before adding it.
        /// </summary>
        private static double Sum(Decimal128Array decimal128Array)
        {
            double sum = 0;
            for (int valueIndex = 0; valueIndex < decimal128Array.Length; valueIndex++)
            {
                sum += (double)decimal128Array.GetValue(valueIndex);
            }
            return sum;
        }
    }
}