// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <algorithm>
#include <ciso646>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/discovery.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/scanner_internal.h"
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/test_util.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/io_util.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
namespace dataset {
const std::shared_ptr<Schema> kBoringSchema = schema({
field("bool", boolean()),
field("i8", int8()),
field("i32", int32()),
field("i32_req", int32(), /*nullable=*/false),
field("u32", uint32()),
field("i64", int64()),
field("f32", float32()),
field("f32_req", float32(), /*nullable=*/false),
field("f64", float64()),
field("date64", date64()),
field("str", utf8()),
field("dict_str", dictionary(int32(), utf8())),
field("dict_i32", dictionary(int32(), int32())),
field("ts_ns", timestamp(TimeUnit::NANO)),
using fs::internal::GetAbstractPathExtension;
using internal::checked_cast;
using internal::checked_pointer_cast;
using internal::TemporaryDir;
class FileSourceFixtureMixin : public ::testing::Test {
std::unique_ptr<FileSource> GetSource(std::shared_ptr<Buffer> buffer) {
return internal::make_unique<FileSource>(std::move(buffer));
template <typename Gen>
class GeneratedRecordBatch : public RecordBatchReader {
GeneratedRecordBatch(std::shared_ptr<Schema> schema, Gen gen)
: schema_(std::move(schema)), gen_(gen) {}
std::shared_ptr<Schema> schema() const override { return schema_; }
Status ReadNext(std::shared_ptr<RecordBatch>* batch) override { return gen_(batch); }
std::shared_ptr<Schema> schema_;
Gen gen_;
template <typename Gen>
std::unique_ptr<GeneratedRecordBatch<Gen>> MakeGeneratedRecordBatch(
std::shared_ptr<Schema> schema, Gen&& gen) {
return internal::make_unique<GeneratedRecordBatch<Gen>>(schema, std::forward<Gen>(gen));
std::unique_ptr<RecordBatchReader> MakeGeneratedRecordBatch(
std::shared_ptr<Schema> schema, int64_t batch_size, int64_t batch_repetitions) {
auto batch = random::GenerateBatch(schema->fields(), batch_size, /*seed=*/0);
int64_t i = 0;
return MakeGeneratedRecordBatch(
schema, [batch, i, batch_repetitions](std::shared_ptr<RecordBatch>* out) mutable {
*out = i++ < batch_repetitions ? batch : nullptr;
return Status::OK();
void EnsureRecordBatchReaderDrained(RecordBatchReader* reader) {
ASSERT_OK_AND_ASSIGN(auto batch, reader->Next());
EXPECT_EQ(batch, nullptr);
class DatasetFixtureMixin : public ::testing::Test {
/// \brief Ensure that record batches found in reader are equals to the
/// record batches yielded by the data fragment.
void AssertScanTaskEquals(RecordBatchReader* expected, ScanTask* task,
bool ensure_drained = true) {
ASSERT_OK_AND_ASSIGN(auto it, task->Execute());
ARROW_EXPECT_OK(it.Visit([expected](std::shared_ptr<RecordBatch> rhs) -> Status {
std::shared_ptr<RecordBatch> lhs;
EXPECT_NE(lhs, nullptr);
AssertBatchesEqual(*lhs, *rhs);
return Status::OK();
if (ensure_drained) {
/// \brief Assert the value of the next batch yielded by the reader
void AssertBatchEquals(RecordBatchReader* expected, const RecordBatch& batch) {
std::shared_ptr<RecordBatch> lhs;
EXPECT_NE(lhs, nullptr);
AssertBatchesEqual(*lhs, batch);
/// \brief Ensure that record batches found in reader are equals to the
/// record batches yielded by the data fragment.
void AssertFragmentEquals(RecordBatchReader* expected, Fragment* fragment,
bool ensure_drained = true) {
ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(options_));
ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr<ScanTask> task) -> Status {
AssertScanTaskEquals(expected, task.get(), false);
return Status::OK();
if (ensure_drained) {
/// \brief Ensure that record batches found in reader are equals to the
/// record batches yielded by the data fragments of a dataset.
void AssertDatasetFragmentsEqual(RecordBatchReader* expected, Dataset* dataset,
bool ensure_drained = true) {
ASSERT_OK_AND_ASSIGN(auto predicate, options_->filter.Bind(*dataset->schema()));
ASSERT_OK_AND_ASSIGN(auto it, dataset->GetFragments(predicate));
ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr<Fragment> fragment) -> Status {
AssertFragmentEquals(expected, fragment.get(), false);
return Status::OK();
if (ensure_drained) {
/// \brief Ensure that record batches found in reader are equals to the
/// record batches yielded by a scanner.
void AssertScannerEquals(RecordBatchReader* expected, Scanner* scanner,
bool ensure_drained = true) {
ASSERT_OK_AND_ASSIGN(auto it, scanner->Scan());
ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr<ScanTask> task) -> Status {
AssertScanTaskEquals(expected, task.get(), false);
return Status::OK();
if (ensure_drained) {
/// \brief Ensure that record batches found in reader are equals to the
/// record batches yielded by a scanner.
void AssertScanBatchesEquals(RecordBatchReader* expected, Scanner* scanner,
bool ensure_drained = true) {
ASSERT_OK_AND_ASSIGN(auto it, scanner->ScanBatches());
ARROW_EXPECT_OK(it.Visit([&](TaggedRecordBatch batch) -> Status {
AssertBatchEquals(expected, *batch.record_batch);
return Status::OK();
if (ensure_drained) {
/// \brief Ensure that record batches found in reader are equals to the
/// record batches yielded by a scanner. Each fragment in the scanner is
/// expected to have a single batch.
void AssertScanBatchesUnorderedEquals(RecordBatchReader* expected, Scanner* scanner,
bool ensure_drained = true) {
ASSERT_OK_AND_ASSIGN(auto it, scanner->ScanBatchesUnordered());
int fragment_counter = 0;
bool saw_last_fragment = false;
ARROW_EXPECT_OK(it.Visit([&](EnumeratedRecordBatch batch) -> Status {
EXPECT_EQ(0, batch.record_batch.index);
EXPECT_EQ(true, batch.record_batch.last);
EXPECT_EQ(fragment_counter++, batch.fragment.index);
saw_last_fragment = batch.fragment.last;
AssertBatchEquals(expected, *batch.record_batch.value);
return Status::OK();
if (ensure_drained) {
/// \brief Ensure that record batches found in reader are equals to the
/// record batches yielded by a dataset.
void AssertDatasetEquals(RecordBatchReader* expected, Dataset* dataset,
bool ensure_drained = true) {
ASSERT_OK_AND_ASSIGN(auto builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, builder->Finish());
AssertScannerEquals(expected, scanner.get());
if (ensure_drained) {
void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
schema_ = schema(std::move(fields));
options_ = std::make_shared<ScanOptions>();
options_->dataset_schema = schema_;
ASSERT_OK(SetProjection(options_.get(), schema_->field_names()));
void SetFilter(Expression filter) {
ASSERT_OK_AND_ASSIGN(options_->filter, filter.Bind(*schema_));
std::shared_ptr<Schema> schema_;
std::shared_ptr<ScanOptions> options_;
/// \brief A dummy FileFormat implementation
class DummyFileFormat : public FileFormat {
explicit DummyFileFormat(std::shared_ptr<Schema> schema = NULLPTR)
: schema_(std::move(schema)) {}
std::string type_name() const override { return "dummy"; }
bool Equals(const FileFormat& other) const override {
return type_name() == other.type_name() &&
schema_->Equals(checked_cast<const DummyFileFormat&>(other).schema_);
Result<bool> IsSupported(const FileSource& source) const override { return true; }
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override {
return schema_;
/// \brief Open a file for scanning (always returns an empty iterator)
Result<ScanTaskIterator> ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<FileFragment>& fragment) const override {
return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override {
return Status::NotImplemented("writing fragment of DummyFileFormat");
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return nullptr; }
std::shared_ptr<Schema> schema_;
class JSONRecordBatchFileFormat : public FileFormat {
using SchemaResolver = std::function<std::shared_ptr<Schema>(const FileSource&)>;
explicit JSONRecordBatchFileFormat(std::shared_ptr<Schema> schema)
: resolver_([schema](const FileSource&) { return schema; }) {}
explicit JSONRecordBatchFileFormat(SchemaResolver resolver)
: resolver_(std::move(resolver)) {}
bool Equals(const FileFormat& other) const override { return this == &other; }
std::string type_name() const override { return "json_record_batch"; }
/// \brief Return true if the given file extension
Result<bool> IsSupported(const FileSource& source) const override { return true; }
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override {
return resolver_(source);
/// \brief Open a file for scanning
Result<ScanTaskIterator> ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<FileFragment>& fragment) const override {
ARROW_ASSIGN_OR_RAISE(auto file, fragment->source().Open());
ARROW_ASSIGN_OR_RAISE(int64_t size, file->GetSize());
ARROW_ASSIGN_OR_RAISE(auto buffer, file->Read(size));
util::string_view view{*buffer};
ARROW_ASSIGN_OR_RAISE(auto schema, Inspect(fragment->source()));
std::shared_ptr<RecordBatch> batch = RecordBatchFromJSON(schema, view);
return ScanTaskIteratorFromRecordBatch({batch}, std::move(options));
Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override {
return Status::NotImplemented("writing fragment of JSONRecordBatchFileFormat");
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return nullptr; }
SchemaResolver resolver_;
struct MakeFileSystemDatasetMixin {
std::vector<fs::FileInfo> ParsePathList(const std::string& pathlist) {
std::vector<fs::FileInfo> infos;
std::stringstream ss(pathlist);
std::string line;
while (std::getline(ss, line)) {
auto start = line.find_first_not_of(" \n\r\t");
if (start == std::string::npos) {
line.erase(0, start);
if (line.front() == '#') {
if (line.back() == '/') {
return infos;
void MakeFileSystem(const std::vector<fs::FileInfo>& infos) {
ASSERT_OK_AND_ASSIGN(fs_, fs::internal::MockFileSystem::Make(fs::kNoTime, infos));
void MakeFileSystem(const std::vector<std::string>& paths) {
std::vector<fs::FileInfo> infos{paths.size()};
std::transform(paths.cbegin(), paths.cend(), infos.begin(),
[](const std::string& p) { return fs::File(p); });
ASSERT_OK_AND_ASSIGN(fs_, fs::internal::MockFileSystem::Make(fs::kNoTime, infos));
void MakeDataset(const std::vector<fs::FileInfo>& infos,
Expression root_partition = literal(true),
std::vector<Expression> partitions = {},
std::shared_ptr<Schema> s = kBoringSchema) {
auto n_fragments = infos.size();
if (partitions.empty()) {
partitions.resize(n_fragments, literal(true));
auto format = std::make_shared<DummyFileFormat>(s);
std::vector<std::shared_ptr<FileFragment>> fragments;
for (size_t i = 0; i < n_fragments; i++) {
const auto& info = infos[i];
if (!info.IsFile()) {
ASSERT_OK_AND_ASSIGN(partitions[i], partitions[i].Bind(*s));
ASSERT_OK_AND_ASSIGN(auto fragment,
format->MakeFragment({info, fs_}, partitions[i]));
ASSERT_OK_AND_ASSIGN(root_partition, root_partition.Bind(*s));
ASSERT_OK_AND_ASSIGN(dataset_, FileSystemDataset::Make(s, root_partition, format, fs_,
std::shared_ptr<fs::FileSystem> fs_;
std::shared_ptr<Dataset> dataset_;
std::shared_ptr<ScanOptions> options_;
static const std::string& PathOf(const std::shared_ptr<Fragment>& fragment) {
EXPECT_NE(fragment, nullptr);
EXPECT_THAT(fragment->type_name(), "dummy");
return internal::checked_cast<const FileFragment&>(*fragment).source().path();
class TestFileSystemDataset : public ::testing::Test,
public MakeFileSystemDatasetMixin {};
static std::vector<std::string> PathsOf(const FragmentVector& fragments) {
std::vector<std::string> paths(fragments.size());
std::transform(fragments.begin(), fragments.end(), paths.begin(), PathOf);
return paths;
void AssertFilesAre(const std::shared_ptr<Dataset>& dataset,
std::vector<std::string> expected) {
auto fs_dataset = internal::checked_cast<FileSystemDataset*>(dataset.get());
EXPECT_THAT(fs_dataset->files(), testing::UnorderedElementsAreArray(expected));
void AssertFragmentsAreFromPath(FragmentIterator it, std::vector<std::string> expected) {
// Ordering is not guaranteed.
static std::vector<Expression> PartitionExpressionsOf(const FragmentVector& fragments) {
std::vector<Expression> partition_expressions;
std::transform(fragments.begin(), fragments.end(),
[](const std::shared_ptr<Fragment>& fragment) {
return fragment->partition_expression();
return partition_expressions;
void AssertFragmentsHavePartitionExpressions(std::shared_ptr<Dataset> dataset,
std::vector<Expression> expected) {
ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset->GetFragments());
for (auto& expr : expected) {
ASSERT_OK_AND_ASSIGN(expr, expr.Bind(*dataset->schema()));
// Ordering is not guaranteed.
struct ArithmeticDatasetFixture {
static std::shared_ptr<Schema> schema() {
return ::arrow::schema({
field("i64", int64()),
// ARROW-1644: Parquet can't write complex level
// field("struct", struct_({
// // ARROW-2587: Parquet can't write struct with more
// // than one field.
// // field("i32", int32()),
// field("str", utf8()),
// })),
field("u8", uint8()),
field("list", list(int32())),
field("bool", boolean()),
/// \brief Creates a single JSON record templated with n as follow.
/// {"i64": n, "struct": {"i32": n, "str": "n"}, "u8": n "list": [n,n], "bool": n %
/// 2},
static std::string JSONRecordFor(int64_t n) {
std::stringstream ss;
auto n_i32 = static_cast<int32_t>(n);
ss << "{";
ss << "\"i64\": " << n << ", ";
// ss << "\"struct\": {";
// {
// // ss << "\"i32\": " << n_i32 << ", ";
// ss << "\"str\": \"" << std::to_string(n) << "\"";
// }
// ss << "}, ";
ss << "\"u8\": " << static_cast<int32_t>(n) << ", ";
ss << "\"list\": [" << n_i32 << ", " << n_i32 << "], ";
ss << "\"bool\": " << (static_cast<bool>(n % 2) ? "true" : "false");
ss << "}";
return ss.str();
/// \brief Creates a JSON RecordBatch
static std::string JSONRecordBatch(int64_t n) {
DCHECK_GT(n, 0);
auto record = JSONRecordFor(n);
std::stringstream ss;
ss << "[\n";
for (int64_t i = 1; i <= n; i++) {
if (i != 1) {
ss << "\n,";
ss << record;
ss << "]\n";
return ss.str();
static std::shared_ptr<RecordBatch> GetRecordBatch(int64_t n) {
return RecordBatchFromJSON(ArithmeticDatasetFixture::schema(), JSONRecordBatch(n));
static std::unique_ptr<RecordBatchReader> GetRecordBatchReader(int64_t n) {
DCHECK_GT(n, 0);
// Functor which generates `n` RecordBatch
struct {
Status operator()(std::shared_ptr<RecordBatch>* out) {
*out = i++ < count ? GetRecordBatch(i) : nullptr;
return Status::OK();
int64_t i;
int64_t count;
} generator{0, n};
return MakeGeneratedRecordBatch(schema(), std::move(generator));
class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
using PathAndContent = std::unordered_map<std::string, std::string>;
void MakeSourceDataset() {
PathAndContent source_files;
source_files["/dataset/year=2018/month=01/dat0.json"] = R"([
{"region": "NY", "model": "3", "sales": 742.0, "country": "US"},
{"region": "NY", "model": "S", "sales": 304.125, "country": "US"},
{"region": "NY", "model": "Y", "sales": 27.5, "country": "US"}
source_files["/dataset/year=2018/month=01/dat1.json"] = R"([
{"region": "QC", "model": "3", "sales": 512, "country": "CA"},
{"region": "QC", "model": "S", "sales": 978, "country": "CA"},
{"region": "NY", "model": "X", "sales": 136.25, "country": "US"},
{"region": "QC", "model": "X", "sales": 1.0, "country": "CA"},
{"region": "QC", "model": "Y", "sales": 69, "country": "CA"}
source_files["/dataset/year=2019/month=01/dat0.json"] = R"([
{"region": "CA", "model": "3", "sales": 273.5, "country": "US"},
{"region": "CA", "model": "S", "sales": 13, "country": "US"},
{"region": "CA", "model": "X", "sales": 54, "country": "US"},
{"region": "QC", "model": "S", "sales": 10, "country": "CA"},
{"region": "CA", "model": "Y", "sales": 21, "country": "US"}
source_files["/dataset/year=2019/month=01/dat1.json"] = R"([
{"region": "QC", "model": "3", "sales": 152.25, "country": "CA"},
{"region": "QC", "model": "X", "sales": 42, "country": "CA"},
{"region": "QC", "model": "Y", "sales": 37, "country": "CA"}
source_files["/dataset/.pesky"] = "garbage content";
auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
for (const auto& f : source_files) {
ARROW_EXPECT_OK(mock_fs->CreateFile(f.first, f.second, /* recursive */ true));
fs_ = mock_fs;
/// schema for the whole dataset (both source and destination)
source_schema_ = schema({
field("region", utf8()),
field("model", utf8()),
field("sales", float64()),
field("year", int32()),
field("month", int32()),
field("country", utf8()),
/// Dummy file format for source dataset. Note that it isn't partitioned on country
auto source_format = std::make_shared<JSONRecordBatchFileFormat>(
SchemaFromColumnNames(source_schema_, {"region", "model", "sales", "country"}));
fs::FileSelector s;
s.base_dir = "/dataset";
s.recursive = true;
FileSystemFactoryOptions options;
options.selector_ignore_prefixes = {"."};
options.partitioning = std::make_shared<HivePartitioning>(
SchemaFromColumnNames(source_schema_, {"year", "month"}));
ASSERT_OK_AND_ASSIGN(auto factory,
FileSystemDatasetFactory::Make(fs_, s, source_format, options));
ASSERT_OK_AND_ASSIGN(dataset_, factory->Finish());
scan_options_ = std::make_shared<ScanOptions>();
scan_options_->dataset_schema = source_schema_;
ASSERT_OK(SetProjection(scan_options_.get(), source_schema_->field_names()));
void SetWriteOptions(std::shared_ptr<FileWriteOptions> file_write_options) {
write_options_.file_write_options = file_write_options;
write_options_.filesystem = fs_;
write_options_.base_dir = "new_root/";
write_options_.basename_template = "dat_{i}";
void DoWrite(std::shared_ptr<Partitioning> desired_partitioning) {
write_options_.partitioning = desired_partitioning;
auto scanner_builder = ScannerBuilder(dataset_, scan_options_);
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder.Finish());
ASSERT_OK(FileSystemDataset::Write(write_options_, scanner));
// re-discover the written dataset
fs::FileSelector s;
s.recursive = true;
s.base_dir = "/new_root";
FileSystemFactoryOptions factory_options;
factory_options.partitioning = desired_partitioning;
auto factory, FileSystemDatasetFactory::Make(fs_, s, format_, factory_options));
ASSERT_OK_AND_ASSIGN(written_, factory->Finish());
void TestWriteWithIdenticalPartitioningSchema() {
SchemaFromColumnNames(source_schema_, {"year", "month"})));
expected_files_["/new_root/2018/1/dat_0"] = R"([
{"region": "NY", "model": "3", "sales": 742.0, "country": "US"},
{"region": "NY", "model": "S", "sales": 304.125, "country": "US"},
{"region": "NY", "model": "Y", "sales": 27.5, "country": "US"},
{"region": "QC", "model": "3", "sales": 512, "country": "CA"},
{"region": "QC", "model": "S", "sales": 978, "country": "CA"},
{"region": "NY", "model": "X", "sales": 136.25, "country": "US"},
{"region": "QC", "model": "X", "sales": 1.0, "country": "CA"},
{"region": "QC", "model": "Y", "sales": 69, "country": "CA"}
expected_files_["/new_root/2019/1/dat_1"] = R"([
{"region": "CA", "model": "3", "sales": 273.5, "country": "US"},
{"region": "CA", "model": "S", "sales": 13, "country": "US"},
{"region": "CA", "model": "X", "sales": 54, "country": "US"},
{"region": "QC", "model": "S", "sales": 10, "country": "CA"},
{"region": "CA", "model": "Y", "sales": 21, "country": "US"},
{"region": "QC", "model": "3", "sales": 152.25, "country": "CA"},
{"region": "QC", "model": "X", "sales": 42, "country": "CA"},
{"region": "QC", "model": "Y", "sales": 37, "country": "CA"}
expected_physical_schema_ =
SchemaFromColumnNames(source_schema_, {"region", "model", "sales", "country"});
void TestWriteWithUnrelatedPartitioningSchema() {
SchemaFromColumnNames(source_schema_, {"country", "region"})));
// XXX first thing a user will be annoyed by: we don't support left
// padding the month field with 0.
expected_files_["/new_root/US/NY/dat_0"] = R"([
{"year": 2018, "month": 1, "model": "3", "sales": 742.0},
{"year": 2018, "month": 1, "model": "S", "sales": 304.125},
{"year": 2018, "month": 1, "model": "Y", "sales": 27.5},
{"year": 2018, "month": 1, "model": "X", "sales": 136.25}
expected_files_["/new_root/CA/QC/dat_1"] = R"([
{"year": 2018, "month": 1, "model": "3", "sales": 512},
{"year": 2018, "month": 1, "model": "S", "sales": 978},
{"year": 2018, "month": 1, "model": "X", "sales": 1.0},
{"year": 2018, "month": 1, "model": "Y", "sales": 69},
{"year": 2019, "month": 1, "model": "S", "sales": 10},
{"year": 2019, "month": 1, "model": "3", "sales": 152.25},
{"year": 2019, "month": 1, "model": "X", "sales": 42},
{"year": 2019, "month": 1, "model": "Y", "sales": 37}
expected_files_["/new_root/US/CA/dat_2"] = R"([
{"year": 2019, "month": 1, "model": "3", "sales": 273.5},
{"year": 2019, "month": 1, "model": "S", "sales": 13},
{"year": 2019, "month": 1, "model": "X", "sales": 54},
{"year": 2019, "month": 1, "model": "Y", "sales": 21}
expected_physical_schema_ =
SchemaFromColumnNames(source_schema_, {"model", "sales", "year", "month"});
void TestWriteWithSupersetPartitioningSchema() {
SchemaFromColumnNames(source_schema_, {"year", "month", "country", "region"})));
// XXX first thing a user will be annoyed by: we don't support left
// padding the month field with 0.
expected_files_["/new_root/2018/1/US/NY/dat_0"] = R"([
{"model": "3", "sales": 742.0},
{"model": "S", "sales": 304.125},
{"model": "Y", "sales": 27.5},
{"model": "X", "sales": 136.25}
expected_files_["/new_root/2018/1/CA/QC/dat_1"] = R"([
{"model": "3", "sales": 512},
{"model": "S", "sales": 978},
{"model": "X", "sales": 1.0},
{"model": "Y", "sales": 69}
expected_files_["/new_root/2019/1/US/CA/dat_2"] = R"([
{"model": "3", "sales": 273.5},
{"model": "S", "sales": 13},
{"model": "X", "sales": 54},
{"model": "Y", "sales": 21}
expected_files_["/new_root/2019/1/CA/QC/dat_3"] = R"([
{"model": "S", "sales": 10},
{"model": "3", "sales": 152.25},
{"model": "X", "sales": 42},
{"model": "Y", "sales": 37}
expected_physical_schema_ = SchemaFromColumnNames(source_schema_, {"model", "sales"});
void TestWriteWithEmptyPartitioningSchema() {
SchemaFromColumnNames(source_schema_, {})));
expected_files_["/new_root/dat_0"] = R"([
{"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "3", "sales": 742.0},
{"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "S", "sales": 304.125},
{"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "Y", "sales": 27.5},
{"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "3", "sales": 512},
{"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "S", "sales": 978},
{"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "X", "sales": 136.25},
{"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "X", "sales": 1.0},
{"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "Y", "sales": 69},
{"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "3", "sales": 273.5},
{"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "S", "sales": 13},
{"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "X", "sales": 54},
{"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "S", "sales": 10},
{"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "Y", "sales": 21},
{"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "3", "sales": 152.25},
{"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "X", "sales": 42},
{"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "Y", "sales": 37}
expected_physical_schema_ = source_schema_;
void AssertWrittenAsExpected() {
std::unordered_set<std::string> expected_paths, actual_paths;
for (const auto& file_contents : expected_files_) {
for (auto path : checked_pointer_cast<FileSystemDataset>(written_)->files()) {
EXPECT_THAT(actual_paths, testing::UnorderedElementsAreArray(expected_paths));
ASSERT_OK_AND_ASSIGN(auto written_fragments_it, written_->GetFragments());
for (auto maybe_fragment : written_fragments_it) {
ASSERT_OK_AND_ASSIGN(auto fragment, maybe_fragment);
ASSERT_OK_AND_ASSIGN(auto actual_physical_schema, fragment->ReadPhysicalSchema());
AssertSchemaEqual(*expected_physical_schema_, *actual_physical_schema,
const auto& path = checked_pointer_cast<FileFragment>(fragment)->source().path();
auto file_contents = expected_files_.find(path);
if (file_contents == expected_files_.end()) {
// file wasn't expected to be written at all; nothing to compare with
ASSERT_OK_AND_ASSIGN(auto scanner, ScannerBuilder(actual_physical_schema, fragment,
ASSERT_OK_AND_ASSIGN(auto actual_table, scanner->ToTable());
ASSERT_OK_AND_ASSIGN(actual_table, actual_table->CombineChunks());
std::shared_ptr<Array> actual_struct;
for (auto maybe_batch :
IteratorFromReader(std::make_shared<TableBatchReader>(*actual_table))) {
ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
ASSERT_OK_AND_ASSIGN(actual_struct, batch->ToStructArray());
auto expected_struct = ArrayFromJSON(struct_(expected_physical_schema_->fields()),
AssertArraysEqual(*expected_struct, *actual_struct, /*verbose=*/true);
bool check_metadata_ = true;
std::shared_ptr<Schema> source_schema_;
std::shared_ptr<FileFormat> format_;
PathAndContent expected_files_;
std::shared_ptr<Schema> expected_physical_schema_;
std::shared_ptr<Dataset> written_;
FileSystemDatasetWriteOptions write_options_;
std::shared_ptr<ScanOptions> scan_options_;
} // namespace dataset
} // namespace arrow