// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "benchmark/benchmark.h"
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/HeadBucketRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include "arrow/filesystem/s3_internal.h"
#include "arrow/filesystem/s3_test_util.h"
#include "arrow/filesystem/s3fs.h"
#include "arrow/io/caching.h"
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/range.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
#include "parquet/properties.h"
namespace arrow {
namespace fs {
using ::arrow::fs::internal::ConnectRetryStrategy;
using ::arrow::fs::internal::OutcomeToStatus;
using ::arrow::fs::internal::ToAwsString;
// Environment variables to configure the S3 test environment
static const char* kEnvBucketName = "ARROW_TEST_S3_BUCKET";
static const char* kEnvSkipSetup = "ARROW_TEST_S3_SKIP_SETUP";
static const char* kEnvAwsRegion = "ARROW_TEST_S3_REGION";
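// Example: reuse an existing bucket and skip (re)creating the test objects.
// The binary name below is hypothetical; adjust it to your build output:
//
//   ARROW_TEST_S3_BUCKET=my-bucket ARROW_TEST_S3_SKIP_SETUP=1 \
//     ./arrow-s3fs-benchmark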
// Set up Minio and create the test bucket and files.
class MinioFixture : public benchmark::Fixture {
public:
void SetUp(const ::benchmark::State& state) override {
minio_.reset(new MinioTestServer());
ASSERT_OK(minio_->Start());
const char* region_str = std::getenv(kEnvAwsRegion);
if (region_str) {
region_ = region_str;
std::cerr << "Using region from environment: " << region_ << std::endl;
} else {
std::cerr << "Using default region" << std::endl;
}
const char* bucket_str = std::getenv(kEnvBucketName);
if (bucket_str) {
bucket_ = bucket_str;
std::cerr << "Using bucket from environment: " << bucket_ << std::endl;
} else {
bucket_ = "bucket";
std::cerr << "Using default bucket: " << bucket_ << std::endl;
}
client_config_.endpointOverride = ToAwsString(minio_->connect_string());
client_config_.scheme = Aws::Http::Scheme::HTTP;
if (!region_.empty()) {
client_config_.region = ToAwsString(region_);
}
client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
credentials_ = {ToAwsString(minio_->access_key()), ToAwsString(minio_->secret_key())};
bool use_virtual_addressing = false;
client_.reset(
new Aws::S3::S3Client(credentials_, client_config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
use_virtual_addressing));
MakeFileSystem();
const char* skip_str = std::getenv(kEnvSkipSetup);
const std::string skip = skip_str ? std::string(skip_str) : "";
if (!skip.empty()) {
std::cerr << "Skipping creation of bucket/objects as requested" << std::endl;
} else {
ASSERT_OK(MakeBucket());
ASSERT_OK(MakeObject("bytes_1mib", 1024 * 1024));
ASSERT_OK(MakeObject("bytes_100mib", 100 * 1024 * 1024));
ASSERT_OK(MakeObject("bytes_500mib", 500 * 1024 * 1024));
ASSERT_OK(MakeParquetObject(bucket_ + "/pq_c402_r250k", 400, 250000));
}
}
void MakeFileSystem() {
options_.ConfigureAccessKey(minio_->access_key(), minio_->secret_key());
options_.scheme = "http";
if (!region_.empty()) {
options_.region = region_;
}
options_.endpoint_override = minio_->connect_string();
ASSERT_OK_AND_ASSIGN(fs_, S3FileSystem::Make(options_));
}
/// Set up bucket if it doesn't exist.
///
/// When using Minio we'll have a fresh setup each time, but
/// otherwise we may have a leftover bucket.
Status MakeBucket() {
Aws::S3::Model::HeadBucketRequest head;
head.SetBucket(ToAwsString(bucket_));
const Status st = OutcomeToStatus(client_->HeadBucket(head));
if (st.ok()) {
// Bucket exists already
return st;
}
Aws::S3::Model::CreateBucketRequest req;
req.SetBucket(ToAwsString(bucket_));
return OutcomeToStatus(client_->CreateBucket(req));
}
/// Make an object with dummy data.
Status MakeObject(const std::string& name, int size) {
Aws::S3::Model::PutObjectRequest req;
req.SetBucket(ToAwsString(bucket_));
req.SetKey(ToAwsString(name));
req.SetBody(std::make_shared<std::stringstream>(std::string(size, 'a')));
return OutcomeToStatus(client_->PutObject(req));
}
  /// Make an object with Parquet data.
  /// Two integer index-like columns ("timestamp" and "val") are prepended
  /// before the num_columns generated float64 columns.
Status MakeParquetObject(const std::string& path, int num_columns, int num_rows) {
std::vector<std::shared_ptr<ChunkedArray>> columns;
FieldVector fields{
field("timestamp", int64(), /*nullable=*/true,
key_value_metadata(
{{"min", "0"}, {"max", "10000000000"}, {"null_probability", "0"}})),
field("val", int32(), /*nullable=*/true,
key_value_metadata(
{{"min", "0"}, {"max", "1000000000"}, {"null_probability", "0"}}))};
for (int i = 0; i < num_columns; i++) {
std::stringstream ss;
ss << "col" << i;
fields.push_back(
field(ss.str(), float64(), /*nullable=*/true,
key_value_metadata(
{{"min", "-1.e10"}, {"max", "1e10"}, {"null_probability", "0"}})));
}
    auto batch = random::GenerateBatch(fields, num_rows, /*seed=*/0);
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
Table::FromRecordBatches({batch}));
std::shared_ptr<io::OutputStream> sink;
ARROW_ASSIGN_OR_RAISE(sink, fs_->OpenOutputStream(path));
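    // Passing num_rows as the chunk size writes the table as a single row group.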
RETURN_NOT_OK(
parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), sink, num_rows));
return Status::OK();
}
void TearDown(const ::benchmark::State& state) override {
ASSERT_OK(minio_->Stop());
// Delete temporary directory, freeing up disk space
minio_.reset();
}
protected:
std::unique_ptr<MinioTestServer> minio_;
std::string region_;
std::string bucket_;
Aws::Client::ClientConfiguration client_config_;
Aws::Auth::AWSCredentials credentials_;
std::unique_ptr<Aws::S3::S3Client> client_;
S3Options options_;
std::shared_ptr<S3FileSystem> fs_;
};
/// Set up/tear down the AWS SDK globally.
/// (GBenchmark doesn't run GTest environments.)
class S3BenchmarkEnvironment {
public:
S3BenchmarkEnvironment() { s3_env->SetUp(); }
~S3BenchmarkEnvironment() { s3_env->TearDown(); }
};
S3BenchmarkEnvironment env{};
/// Read the entire file into memory in one go to measure bandwidth.
static void NaiveRead(benchmark::State& st, S3FileSystem* fs, const std::string& path) {
int64_t total_bytes = 0;
int total_items = 0;
for (auto _ : st) {
std::shared_ptr<io::RandomAccessFile> file;
std::shared_ptr<Buffer> buf;
int64_t size;
ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
ASSERT_OK_AND_ASSIGN(size, file->GetSize());
ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(0, size));
total_bytes += buf->size();
total_items += 1;
}
st.SetBytesProcessed(total_bytes);
st.SetItemsProcessed(total_items);
std::cerr << "Read the file " << total_items << " times" << std::endl;
}
constexpr int64_t kChunkSize = 5 * 1024 * 1024;
/// Mimic the Parquet reader, reading the file in small chunks.
static void ChunkedRead(benchmark::State& st, S3FileSystem* fs, const std::string& path) {
int64_t total_bytes = 0;
int total_items = 0;
for (auto _ : st) {
std::shared_ptr<io::RandomAccessFile> file;
std::shared_ptr<Buffer> buf;
int64_t size = 0;
ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
ASSERT_OK_AND_ASSIGN(size, file->GetSize());
total_items += 1;
int64_t offset = 0;
while (offset < size) {
      const int64_t read = std::min(size - offset, kChunkSize);
ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(offset, read));
total_bytes += buf->size();
offset += buf->size();
}
}
st.SetBytesProcessed(total_bytes);
st.SetItemsProcessed(total_items);
std::cerr << "Read the file " << total_items << " times" << std::endl;
}
/// Read the file in small chunks, but using read coalescing.
static void CoalescedRead(benchmark::State& st, S3FileSystem* fs,
const std::string& path) {
int64_t total_bytes = 0;
int total_items = 0;
for (auto _ : st) {
std::shared_ptr<io::RandomAccessFile> file;
std::shared_ptr<Buffer> buf;
int64_t size = 0;
ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
ASSERT_OK_AND_ASSIGN(size, file->GetSize());
total_items += 1;
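    // ReadRangeCache coalesces nearby ranges: gaps up to hole_size_limit
    // bytes are merged into a single read, and merged reads are capped at
    // range_size_limit bytes.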
    io::internal::ReadRangeCache cache(
        file, {},
        io::CacheOptions{/*hole_size_limit=*/8192,
                         /*range_size_limit=*/64 * 1024 * 1024});
std::vector<io::ReadRange> ranges;
int64_t offset = 0;
while (offset < size) {
      const int64_t read = std::min(size - offset, kChunkSize);
ranges.push_back(io::ReadRange{offset, read});
offset += read;
}
ASSERT_OK(cache.Cache(ranges));
offset = 0;
while (offset < size) {
      const int64_t read = std::min(size - offset, kChunkSize);
ASSERT_OK_AND_ASSIGN(buf, cache.Read({offset, read}));
total_bytes += buf->size();
offset += read;
}
}
st.SetBytesProcessed(total_bytes);
st.SetItemsProcessed(total_items);
std::cerr << "Read the file " << total_items << " times" << std::endl;
}
/// Read a Parquet file from S3.
static void ParquetRead(benchmark::State& st, S3FileSystem* fs, const std::string& path,
std::vector<int> column_indices, bool pre_buffer,
std::string read_strategy) {
int64_t total_bytes = 0;
int total_items = 0;
parquet::ArrowReaderProperties properties;
properties.set_use_threads(true);
properties.set_pre_buffer(pre_buffer);
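  // When pre_buffer is enabled, the Parquet reader coalesces the column-chunk
  // reads and issues them up front, rather than reading each chunk on demand.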
parquet::ReaderProperties parquet_properties = parquet::default_reader_properties();
for (auto _ : st) {
std::shared_ptr<io::RandomAccessFile> file;
int64_t size = 0;
ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
ASSERT_OK_AND_ASSIGN(size, file->GetSize());
std::unique_ptr<parquet::arrow::FileReader> reader;
parquet::arrow::FileReaderBuilder builder;
ASSERT_OK(builder.Open(file, parquet_properties));
ASSERT_OK(builder.properties(properties)->Build(&reader));
std::shared_ptr<Table> table;
if (read_strategy == "ReadTable") {
ASSERT_OK(reader->ReadTable(column_indices, &table));
} else {
std::shared_ptr<RecordBatchReader> rb_reader;
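      // The benchmark files are written as a single row group, so row group
      // {0} covers the whole file.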
ASSERT_OK(reader->GetRecordBatchReader({0}, column_indices, &rb_reader));
ASSERT_OK(rb_reader->ReadAll(&table));
}
// TODO: actually measure table memory usage
total_bytes += size;
total_items += 1;
}
st.SetBytesProcessed(total_bytes);
st.SetItemsProcessed(total_items);
}
/// Helper used by the benchmark macros below: reads all columns of the file.
static void ParquetReadAll(benchmark::State& st, S3FileSystem* fs,
const std::string& bucket, int64_t file_rows,
int64_t file_cols, bool pre_buffer,
std::string read_strategy) {
std::vector<int> column_indices(file_cols);
std::iota(column_indices.begin(), column_indices.end(), 0);
std::stringstream ss;
ss << bucket << "/pq_c" << file_cols << "_r" << file_rows << "k";
  ParquetRead(st, fs, ss.str(), column_indices, pre_buffer, read_strategy);
}
/// Helper used by the benchmark macros below: reads a subset of the columns.
static void ParquetReadSome(benchmark::State& st, S3FileSystem* fs,
const std::string& bucket, int64_t file_rows,
int64_t file_cols, std::vector<int> cols_to_read,
bool pre_buffer, std::string read_strategy) {
std::stringstream ss;
ss << bucket << "/pq_c" << file_cols << "_r" << file_rows << "k";
  ParquetRead(st, fs, ss.str(), cols_to_read, pre_buffer, read_strategy);
}
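// These benchmarks are I/O-bound, so they are registered with UseRealTime():
// wall-clock time is the relevant metric when most of the time is spent
// waiting on the network rather than on-CPU.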
BENCHMARK_DEFINE_F(MinioFixture, ReadAll1Mib)(benchmark::State& st) {
NaiveRead(st, fs_.get(), bucket_ + "/bytes_1mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadAll1Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture, ReadAll100Mib)(benchmark::State& st) {
NaiveRead(st, fs_.get(), bucket_ + "/bytes_100mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadAll100Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture, ReadAll500Mib)(benchmark::State& st) {
NaiveRead(st, fs_.get(), bucket_ + "/bytes_500mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadAll500Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture, ReadChunked100Mib)(benchmark::State& st) {
ChunkedRead(st, fs_.get(), bucket_ + "/bytes_100mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadChunked100Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture, ReadChunked500Mib)(benchmark::State& st) {
ChunkedRead(st, fs_.get(), bucket_ + "/bytes_500mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadChunked500Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture, ReadCoalesced100Mib)(benchmark::State& st) {
CoalescedRead(st, fs_.get(), bucket_ + "/bytes_100mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadCoalesced100Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture, ReadCoalesced500Mib)(benchmark::State& st) {
CoalescedRead(st, fs_.get(), bucket_ + "/bytes_500mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadCoalesced500Mib)->UseRealTime();
// Helpers to generate multiple benchmark variants for a given Parquet file.
// NAME: the base name of the benchmark.
// ROWS: the number of rows (in thousands) in the Parquet file.
// COLS: the number of columns in the Parquet file.
// STRATEGY: how to read the file (ReadTable or GetRecordBatchReader).
#define PQ_BENCHMARK_IMPL(NAME, ROWS, COLS, STRATEGY) \
BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##AllNaive)(benchmark::State & st) { \
ParquetReadAll(st, fs_.get(), bucket_, ROWS, COLS, false, #STRATEGY); \
} \
BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##AllNaive)->UseRealTime(); \
BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##AllCoalesced) \
(benchmark::State & st) { \
ParquetReadAll(st, fs_.get(), bucket_, ROWS, COLS, true, #STRATEGY); \
} \
BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##AllCoalesced)->UseRealTime();
// COL_INDICES: a vector specifying a subset of column indices to read.
#define PQ_BENCHMARK_PICK_IMPL(NAME, ROWS, COLS, COL_INDICES, STRATEGY) \
BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##PickNaive)(benchmark::State & st) { \
ParquetReadSome(st, fs_.get(), bucket_, ROWS, COLS, COL_INDICES, false, #STRATEGY); \
} \
BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##PickNaive)->UseRealTime(); \
BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##PickCoalesced) \
(benchmark::State & st) { \
ParquetReadSome(st, fs_.get(), bucket_, ROWS, COLS, COL_INDICES, true, #STRATEGY); \
} \
BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##PickCoalesced)->UseRealTime();
#define PQ_BENCHMARK(ROWS, COLS) \
PQ_BENCHMARK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_, ROWS, COLS, \
GetRecordBatchReader); \
PQ_BENCHMARK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_, ROWS, COLS, ReadTable);
#define PQ_BENCHMARK_PICK(NAME, ROWS, COLS, COL_INDICES) \
PQ_BENCHMARK_PICK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_##NAME##_, ROWS, COLS, \
COL_INDICES, GetRecordBatchReader); \
PQ_BENCHMARK_PICK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_##NAME##_, ROWS, COLS, \
COL_INDICES, ReadTable);
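// For example, PQ_BENCHMARK(250, 402) registers four benchmarks
// (ReadParquet_c402_r250K_GetRecordBatchReaderAllNaive,
// ReadParquet_c402_r250K_GetRecordBatchReaderAllCoalesced,
// ReadParquet_c402_r250K_ReadTableAllNaive,
// ReadParquet_c402_r250K_ReadTableAllCoalesced), all reading
// <bucket>/pq_c402_r250k.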
// Test a Parquet file with 250k rows, 402 columns.
PQ_BENCHMARK(250, 402);
// Scenario A: test selecting a small set of contiguous columns, and a "far" column.
PQ_BENCHMARK_PICK(A, 250, 402, (std::vector<int>{0, 1, 2, 3, 4, 90}));
// Scenario B: test selecting a large set of contiguous columns.
PQ_BENCHMARK_PICK(B, 250, 402, (::arrow::internal::Iota(41)));
} // namespace fs
} // namespace arrow