// blob: ea4c41e63c5be1b8ba5c988d235311e956cc0dee [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <algorithm>
#include <ciso646>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/discovery.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/scanner_internal.h"
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/test_util.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/io_util.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
namespace dataset {
/// \brief Schema shared by the test helpers in this header.
///
/// It declares one field per exercised type: boolean, signed and unsigned
/// integers, floats, date64, utf8, two dictionary-encoded columns and a
/// nanosecond timestamp. "i32_req" and "f32_req" are the only fields declared
/// non-nullable. It is the default schema of
/// MakeFileSystemDatasetMixin::MakeDataset.
const std::shared_ptr<Schema> kBoringSchema = schema({
field("bool", boolean()),
field("i8", int8()),
field("i32", int32()),
field("i32_req", int32(), /*nullable=*/false),
field("u32", uint32()),
field("i64", int64()),
field("f32", float32()),
field("f32_req", float32(), /*nullable=*/false),
field("f64", float64()),
field("date64", date64()),
field("str", utf8()),
field("dict_str", dictionary(int32(), utf8())),
field("dict_i32", dictionary(int32(), int32())),
field("ts_ns", timestamp(TimeUnit::NANO)),
});
using fs::internal::GetAbstractPathExtension;
using internal::checked_cast;
using internal::checked_pointer_cast;
using internal::TemporaryDir;
/// \brief gtest fixture offering a shortcut to wrap an in-memory buffer in a FileSource.
class FileSourceFixtureMixin : public ::testing::Test {
 public:
  /// \brief Build a heap-allocated FileSource that takes ownership of `buffer`.
  std::unique_ptr<FileSource> GetSource(std::shared_ptr<Buffer> buffer) {
    auto source = internal::make_unique<FileSource>(std::move(buffer));
    return source;
  }
};
/// \brief RecordBatchReader whose batches are produced by a caller-supplied functor.
///
/// \tparam Gen callable invoked as `Status gen(std::shared_ptr<RecordBatch>* out)`;
///             ReadNext forwards its argument to it and returns its Status unchanged.
template <typename Gen>
class GeneratedRecordBatch : public RecordBatchReader {
 public:
  /// \param schema schema reported by schema()
  /// \param gen    generator invoked on every ReadNext call
  GeneratedRecordBatch(std::shared_ptr<Schema> schema, Gen gen)
      // Forward instead of copying: a value generator is moved into place (so
      // move-only functors are accepted), while a reference Gen still binds as before.
      : schema_(std::move(schema)), gen_(std::forward<Gen>(gen)) {}

  std::shared_ptr<Schema> schema() const override { return schema_; }

  /// \brief Delegate to the generator, which fills `*batch`.
  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override { return gen_(batch); }

 private:
  std::shared_ptr<Schema> schema_;
  Gen gen_;
};
/// \brief Wrap a generator functor in a heap-allocated GeneratedRecordBatch.
///
/// \param schema schema the resulting reader reports
/// \param gen    functor forwarded to the GeneratedRecordBatch constructor
/// \return a reader whose ReadNext delegates to `gen`
template <typename Gen>
std::unique_ptr<GeneratedRecordBatch<Gen>> MakeGeneratedRecordBatch(
    std::shared_ptr<Schema> schema, Gen&& gen) {
  // `schema` is a by-value sink: hand it over instead of copying the
  // shared_ptr a second time.
  return internal::make_unique<GeneratedRecordBatch<Gen>>(std::move(schema),
                                                          std::forward<Gen>(gen));
}
/// \brief Make a reader that yields one pre-generated batch a fixed number of times.
///
/// The batch is produced once by random::GenerateBatch from the schema's
/// fields with size `batch_size` and seed 0. The reader returns that same
/// batch `batch_repetitions` times and a null batch on every later call.
///
/// Declared inline: this function is defined in a header, so without it every
/// additional translation unit including the header would add another
/// definition of the same symbol.
inline std::unique_ptr<RecordBatchReader> MakeGeneratedRecordBatch(
    std::shared_ptr<Schema> schema, int64_t batch_size, int64_t batch_repetitions) {
  auto batch = random::GenerateBatch(schema->fields(), batch_size, /*seed=*/0);
  int64_t i = 0;
  return MakeGeneratedRecordBatch(
      schema, [batch, i, batch_repetitions](std::shared_ptr<RecordBatch>* out) mutable {
        // `i` is the lambda's own copy and counts the batches handed out so far.
        *out = i++ < batch_repetitions ? batch : nullptr;
        return Status::OK();
      });
}
/// \brief Check that `reader` has no batch left: its next batch must be null.
///
/// Declared inline because the function is defined in a header and would
/// otherwise be defined once per including translation unit.
inline void EnsureRecordBatchReaderDrained(RecordBatchReader* reader) {
  ASSERT_OK_AND_ASSIGN(auto batch, reader->Next());
  EXPECT_EQ(batch, nullptr);
}
/// \brief Test fixture with assertions comparing the batches produced by scan
/// tasks, fragments, datasets and scanners against an expected reader.
///
/// Every Assert*Equals helper consumes batches from `expected` in order. When
/// `ensure_drained` is true it additionally checks that `expected` has no
/// batch left once the compared source is exhausted.
class DatasetFixtureMixin : public ::testing::Test {
 public:
  /// \brief Ensure that record batches found in reader are equal to the
  /// record batches yielded by the scan task.
  void AssertScanTaskEquals(RecordBatchReader* expected, ScanTask* task,
                            bool ensure_drained = true) {
    ASSERT_OK_AND_ASSIGN(auto it, task->Execute());
    ARROW_EXPECT_OK(it.Visit([expected](std::shared_ptr<RecordBatch> rhs) -> Status {
      std::shared_ptr<RecordBatch> lhs;
      RETURN_NOT_OK(expected->ReadNext(&lhs));
      EXPECT_NE(lhs, nullptr);
      if (lhs == nullptr) {
        // EXPECT_NE is non-fatal: stop the visit here instead of dereferencing
        // a null batch below.
        return Status::Invalid("expected reader was exhausted before the scan task");
      }
      AssertBatchesEqual(*lhs, *rhs);
      return Status::OK();
    }));
    if (ensure_drained) {
      EnsureRecordBatchReaderDrained(expected);
    }
  }

  /// \brief Assert the value of the next batch yielded by the reader
  void AssertBatchEquals(RecordBatchReader* expected, const RecordBatch& batch) {
    std::shared_ptr<RecordBatch> lhs;
    ASSERT_OK(expected->ReadNext(&lhs));
    // Fatal on purpose: the comparison below dereferences lhs.
    ASSERT_NE(lhs, nullptr);
    AssertBatchesEqual(*lhs, batch);
  }

  /// \brief Ensure that record batches found in reader are equal to the
  /// record batches yielded by the data fragment.
  void AssertFragmentEquals(RecordBatchReader* expected, Fragment* fragment,
                            bool ensure_drained = true) {
    ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(options_));
    ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr<ScanTask> task) -> Status {
      AssertScanTaskEquals(expected, task.get(), false);
      return Status::OK();
    }));
    if (ensure_drained) {
      EnsureRecordBatchReaderDrained(expected);
    }
  }

  /// \brief Ensure that record batches found in reader are equal to the
  /// record batches yielded by the data fragments of a dataset.
  void AssertDatasetFragmentsEqual(RecordBatchReader* expected, Dataset* dataset,
                                   bool ensure_drained = true) {
    ASSERT_OK_AND_ASSIGN(auto predicate, options_->filter.Bind(*dataset->schema()));
    ASSERT_OK_AND_ASSIGN(auto it, dataset->GetFragments(predicate));
    ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr<Fragment> fragment) -> Status {
      AssertFragmentEquals(expected, fragment.get(), false);
      return Status::OK();
    }));
    if (ensure_drained) {
      EnsureRecordBatchReaderDrained(expected);
    }
  }

  /// \brief Ensure that record batches found in reader are equal to the
  /// record batches yielded by the scan tasks of a scanner.
  void AssertScannerEquals(RecordBatchReader* expected, Scanner* scanner,
                           bool ensure_drained = true) {
    ASSERT_OK_AND_ASSIGN(auto it, scanner->Scan());
    ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr<ScanTask> task) -> Status {
      AssertScanTaskEquals(expected, task.get(), false);
      return Status::OK();
    }));
    if (ensure_drained) {
      EnsureRecordBatchReaderDrained(expected);
    }
  }

  /// \brief Ensure that record batches found in reader are equal to the
  /// record batches yielded by Scanner::ScanBatches.
  void AssertScanBatchesEquals(RecordBatchReader* expected, Scanner* scanner,
                               bool ensure_drained = true) {
    ASSERT_OK_AND_ASSIGN(auto it, scanner->ScanBatches());
    ARROW_EXPECT_OK(it.Visit([&](TaggedRecordBatch batch) -> Status {
      AssertBatchEquals(expected, *batch.record_batch);
      return Status::OK();
    }));
    if (ensure_drained) {
      EnsureRecordBatchReaderDrained(expected);
    }
  }

  /// \brief Ensure that record batches found in reader are equal to the
  /// record batches yielded by Scanner::ScanBatchesUnordered. Each fragment in
  /// the scanner is expected to have a single batch, and fragments are expected
  /// to arrive with consecutive indices starting at 0.
  void AssertScanBatchesUnorderedEquals(RecordBatchReader* expected, Scanner* scanner,
                                        bool ensure_drained = true) {
    ASSERT_OK_AND_ASSIGN(auto it, scanner->ScanBatchesUnordered());
    int fragment_counter = 0;
    bool saw_last_fragment = false;
    ARROW_EXPECT_OK(it.Visit([&](EnumeratedRecordBatch batch) -> Status {
      EXPECT_EQ(0, batch.record_batch.index);
      EXPECT_EQ(true, batch.record_batch.last);
      EXPECT_EQ(fragment_counter++, batch.fragment.index);
      // No batch may follow the one flagged as belonging to the last fragment.
      EXPECT_FALSE(saw_last_fragment);
      saw_last_fragment = batch.fragment.last;
      AssertBatchEquals(expected, *batch.record_batch.value);
      return Status::OK();
    }));
    if (ensure_drained) {
      EnsureRecordBatchReaderDrained(expected);
    }
  }

  /// \brief Ensure that record batches found in reader are equal to the
  /// record batches yielded by a dataset.
  void AssertDatasetEquals(RecordBatchReader* expected, Dataset* dataset,
                           bool ensure_drained = true) {
    ASSERT_OK_AND_ASSIGN(auto builder, dataset->NewScan());
    ASSERT_OK_AND_ASSIGN(auto scanner, builder->Finish());
    // Forward the flag so that ensure_drained == false is honoured; the
    // scanner helper performs the drained check itself when requested.
    AssertScannerEquals(expected, scanner.get(), ensure_drained);
  }

 protected:
  /// \brief Install `fields` as the fixture schema and reset the scan options:
  /// all fields are projected and the filter is literal(true).
  void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
    schema_ = schema(std::move(fields));
    options_ = std::make_shared<ScanOptions>();
    options_->dataset_schema = schema_;
    ASSERT_OK(SetProjection(options_.get(), schema_->field_names()));
    SetFilter(literal(true));
  }

  /// \brief Bind `filter` against the fixture schema and store it in the scan options.
  void SetFilter(Expression filter) {
    ASSERT_OK_AND_ASSIGN(options_->filter, filter.Bind(*schema_));
  }

  std::shared_ptr<Schema> schema_;
  std::shared_ptr<ScanOptions> options_;
};
/// \brief A dummy FileFormat implementation
class DummyFileFormat : public FileFormat {
public:
explicit DummyFileFormat(std::shared_ptr<Schema> schema = NULLPTR)
: schema_(std::move(schema)) {}
std::string type_name() const override { return "dummy"; }
bool Equals(const FileFormat& other) const override {
return type_name() == other.type_name() &&
schema_->Equals(checked_cast<const DummyFileFormat&>(other).schema_);
}
Result<bool> IsSupported(const FileSource& source) const override { return true; }
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override {
return schema_;
}
/// \brief Open a file for scanning (always returns an empty iterator)
Result<ScanTaskIterator> ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<FileFragment>& fragment) const override {
return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
}
Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override {
return Status::NotImplemented("writing fragment of DummyFileFormat");
}
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return nullptr; }
protected:
std::shared_ptr<Schema> schema_;
};
class JSONRecordBatchFileFormat : public FileFormat {
public:
using SchemaResolver = std::function<std::shared_ptr<Schema>(const FileSource&)>;
explicit JSONRecordBatchFileFormat(std::shared_ptr<Schema> schema)
: resolver_([schema](const FileSource&) { return schema; }) {}
explicit JSONRecordBatchFileFormat(SchemaResolver resolver)
: resolver_(std::move(resolver)) {}
bool Equals(const FileFormat& other) const override { return this == &other; }
std::string type_name() const override { return "json_record_batch"; }
/// \brief Return true if the given file extension
Result<bool> IsSupported(const FileSource& source) const override { return true; }
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override {
return resolver_(source);
}
/// \brief Open a file for scanning
Result<ScanTaskIterator> ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<FileFragment>& fragment) const override {
ARROW_ASSIGN_OR_RAISE(auto file, fragment->source().Open());
ARROW_ASSIGN_OR_RAISE(int64_t size, file->GetSize());
ARROW_ASSIGN_OR_RAISE(auto buffer, file->Read(size));
util::string_view view{*buffer};
ARROW_ASSIGN_OR_RAISE(auto schema, Inspect(fragment->source()));
std::shared_ptr<RecordBatch> batch = RecordBatchFromJSON(schema, view);
return ScanTaskIteratorFromRecordBatch({batch}, std::move(options));
}
Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override {
return Status::NotImplemented("writing fragment of JSONRecordBatchFileFormat");
}
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return nullptr; }
protected:
SchemaResolver resolver_;
};
struct MakeFileSystemDatasetMixin {
std::vector<fs::FileInfo> ParsePathList(const std::string& pathlist) {
std::vector<fs::FileInfo> infos;
std::stringstream ss(pathlist);
std::string line;
while (std::getline(ss, line)) {
auto start = line.find_first_not_of(" \n\r\t");
if (start == std::string::npos) {
continue;
}
line.erase(0, start);
if (line.front() == '#') {
continue;
}
if (line.back() == '/') {
infos.push_back(fs::Dir(line));
continue;
}
infos.push_back(fs::File(line));
}
return infos;
}
void MakeFileSystem(const std::vector<fs::FileInfo>& infos) {
ASSERT_OK_AND_ASSIGN(fs_, fs::internal::MockFileSystem::Make(fs::kNoTime, infos));
}
void MakeFileSystem(const std::vector<std::string>& paths) {
std::vector<fs::FileInfo> infos{paths.size()};
std::transform(paths.cbegin(), paths.cend(), infos.begin(),
[](const std::string& p) { return fs::File(p); });
ASSERT_OK_AND_ASSIGN(fs_, fs::internal::MockFileSystem::Make(fs::kNoTime, infos));
}
void MakeDataset(const std::vector<fs::FileInfo>& infos,
Expression root_partition = literal(true),
std::vector<Expression> partitions = {},
std::shared_ptr<Schema> s = kBoringSchema) {
auto n_fragments = infos.size();
if (partitions.empty()) {
partitions.resize(n_fragments, literal(true));
}
MakeFileSystem(infos);
auto format = std::make_shared<DummyFileFormat>(s);
std::vector<std::shared_ptr<FileFragment>> fragments;
for (size_t i = 0; i < n_fragments; i++) {
const auto& info = infos[i];
if (!info.IsFile()) {
continue;
}
ASSERT_OK_AND_ASSIGN(partitions[i], partitions[i].Bind(*s));
ASSERT_OK_AND_ASSIGN(auto fragment,
format->MakeFragment({info, fs_}, partitions[i]));
fragments.push_back(std::move(fragment));
}
ASSERT_OK_AND_ASSIGN(root_partition, root_partition.Bind(*s));
ASSERT_OK_AND_ASSIGN(dataset_, FileSystemDataset::Make(s, root_partition, format, fs_,
std::move(fragments)));
}
std::shared_ptr<fs::FileSystem> fs_;
std::shared_ptr<Dataset> dataset_;
std::shared_ptr<ScanOptions> options_;
};
/// \brief Return the source path of a fragment, which is expected to be a
/// non-null FileFragment whose type name is "dummy".
static const std::string& PathOf(const std::shared_ptr<Fragment>& fragment) {
  EXPECT_NE(fragment, nullptr);
  EXPECT_THAT(fragment->type_name(), "dummy");
  const auto& file_fragment = internal::checked_cast<const FileFragment&>(*fragment);
  return file_fragment.source().path();
}
/// \brief gtest fixture that exposes the MakeFileSystemDatasetMixin helpers to tests.
class TestFileSystemDataset : public ::testing::Test, public MakeFileSystemDatasetMixin {
};
static std::vector<std::string> PathsOf(const FragmentVector& fragments) {
std::vector<std::string> paths(fragments.size());
std::transform(fragments.begin(), fragments.end(), paths.begin(), PathOf);
return paths;
}
/// \brief Expect the files of `dataset` (cast to FileSystemDataset) to be
/// exactly `expected`, in any order.
///
/// Declared inline because the function is defined in a header and would
/// otherwise be defined once per including translation unit.
inline void AssertFilesAre(const std::shared_ptr<Dataset>& dataset,
                           std::vector<std::string> expected) {
  auto fs_dataset = internal::checked_cast<FileSystemDataset*>(dataset.get());
  EXPECT_THAT(fs_dataset->files(), testing::UnorderedElementsAreArray(expected));
}
/// \brief Expect the fragments yielded by `it` to have exactly the source
/// paths in `expected`.
///
/// Declared inline because the function is defined in a header and would
/// otherwise be defined once per including translation unit.
inline void AssertFragmentsAreFromPath(FragmentIterator it,
                                       std::vector<std::string> expected) {
  // Ordering is not guaranteed.
  EXPECT_THAT(PathsOf(IteratorToVector(std::move(it))),
              testing::UnorderedElementsAreArray(expected));
}
static std::vector<Expression> PartitionExpressionsOf(const FragmentVector& fragments) {
std::vector<Expression> partition_expressions;
std::transform(fragments.begin(), fragments.end(),
std::back_inserter(partition_expressions),
[](const std::shared_ptr<Fragment>& fragment) {
return fragment->partition_expression();
});
return partition_expressions;
}
/// \brief Expect the fragments of `dataset` to carry exactly the partition
/// expressions in `expected`, each bound to the dataset schema before comparing.
///
/// Declared inline because the function is defined in a header and would
/// otherwise be defined once per including translation unit.
inline void AssertFragmentsHavePartitionExpressions(std::shared_ptr<Dataset> dataset,
                                                    std::vector<Expression> expected) {
  ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset->GetFragments());
  for (auto& expr : expected) {
    ASSERT_OK_AND_ASSIGN(expr, expr.Bind(*dataset->schema()));
  }
  // Ordering is not guaranteed.
  EXPECT_THAT(PartitionExpressionsOf(IteratorToVector(std::move(fragment_it))),
              testing::UnorderedElementsAreArray(expected));
}
/// \brief Generator of deterministic record batches: batch `n` holds `n` identical
/// rows whose values are derived from `n`.
struct ArithmeticDatasetFixture {
  /// \brief Schema of the generated batches.
  static std::shared_ptr<Schema> schema() {
    return ::arrow::schema({
        field("i64", int64()),
        // ARROW-1644: Parquet can't write complex level
        // field("struct", struct_({
        //   // ARROW-2587: Parquet can't write struct with more
        //   // than one field.
        //   // field("i32", int32()),
        //   field("str", utf8()),
        // })),
        field("u8", uint8()),
        field("list", list(int32())),
        field("bool", boolean()),
    });
  }

  /// \brief Creates a single JSON record templated with n as follows.
  ///
  /// {"i64": n, "u8": n, "list": [n, n], "bool": true|false}
  ///
  /// "u8" and the list items are n converted to int32; "bool" is true when
  /// n % 2 is non-zero. The "struct" column is disabled, see schema().
  static std::string JSONRecordFor(int64_t n) {
    const auto as_i32 = static_cast<int32_t>(n);
    const char* parity = static_cast<bool>(n % 2) ? "true" : "false";

    std::stringstream record;
    record << "{";
    record << "\"i64\": " << n << ", ";
    // record << "\"struct\": {";
    // {
    //   // record << "\"i32\": " << as_i32 << ", ";
    //   record << "\"str\": \"" << std::to_string(n) << "\"";
    // }
    // record << "}, ";
    record << "\"u8\": " << as_i32 << ", ";
    record << "\"list\": [" << as_i32 << ", " << as_i32 << "], ";
    record << "\"bool\": " << parity;
    record << "}";
    return record.str();
  }

  /// \brief Creates the JSON text of a batch made of n copies of JSONRecordFor(n).
  static std::string JSONRecordBatch(int64_t n) {
    DCHECK_GT(n, 0);
    const std::string row = JSONRecordFor(n);

    std::stringstream out;
    out << "[\n";
    for (int64_t row_index = 0; row_index < n; ++row_index) {
      // Every row but the first is preceded by a separator.
      out << (row_index == 0 ? "" : "\n,") << row;
    }
    out << "]\n";
    return out.str();
  }

  /// \brief Parse JSONRecordBatch(n) into a RecordBatch with this fixture's schema.
  static std::shared_ptr<RecordBatch> GetRecordBatch(int64_t n) {
    return RecordBatchFromJSON(ArithmeticDatasetFixture::schema(), JSONRecordBatch(n));
  }

  /// \brief Reader yielding GetRecordBatch(1) ... GetRecordBatch(n), then null batches.
  static std::unique_ptr<RecordBatchReader> GetRecordBatchReader(int64_t n) {
    DCHECK_GT(n, 0);

    // Functor which generates `limit` RecordBatch
    struct BatchSequence {
      int64_t produced;
      int64_t limit;

      Status operator()(std::shared_ptr<RecordBatch>* out) {
        // `produced` is incremented before it is used as the batch number.
        if (produced++ < limit) {
          *out = GetRecordBatch(produced);
        } else {
          *out = nullptr;
        }
        return Status::OK();
      }
    };
    BatchSequence generator{0, n};

    return MakeGeneratedRecordBatch(schema(), std::move(generator));
  }
};
class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
public:
using PathAndContent = std::unordered_map<std::string, std::string>;
void MakeSourceDataset() {
PathAndContent source_files;
source_files["/dataset/year=2018/month=01/dat0.json"] = R"([
{"region": "NY", "model": "3", "sales": 742.0, "country": "US"},
{"region": "NY", "model": "S", "sales": 304.125, "country": "US"},
{"region": "NY", "model": "Y", "sales": 27.5, "country": "US"}
])";
source_files["/dataset/year=2018/month=01/dat1.json"] = R"([
{"region": "QC", "model": "3", "sales": 512, "country": "CA"},
{"region": "QC", "model": "S", "sales": 978, "country": "CA"},
{"region": "NY", "model": "X", "sales": 136.25, "country": "US"},
{"region": "QC", "model": "X", "sales": 1.0, "country": "CA"},
{"region": "QC", "model": "Y", "sales": 69, "country": "CA"}
])";
source_files["/dataset/year=2019/month=01/dat0.json"] = R"([
{"region": "CA", "model": "3", "sales": 273.5, "country": "US"},
{"region": "CA", "model": "S", "sales": 13, "country": "US"},
{"region": "CA", "model": "X", "sales": 54, "country": "US"},
{"region": "QC", "model": "S", "sales": 10, "country": "CA"},
{"region": "CA", "model": "Y", "sales": 21, "country": "US"}
])";
source_files["/dataset/year=2019/month=01/dat1.json"] = R"([
{"region": "QC", "model": "3", "sales": 152.25, "country": "CA"},
{"region": "QC", "model": "X", "sales": 42, "country": "CA"},
{"region": "QC", "model": "Y", "sales": 37, "country": "CA"}
])";
source_files["/dataset/.pesky"] = "garbage content";
auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
for (const auto& f : source_files) {
ARROW_EXPECT_OK(mock_fs->CreateFile(f.first, f.second, /* recursive */ true));
}
fs_ = mock_fs;
/// schema for the whole dataset (both source and destination)
source_schema_ = schema({
field("region", utf8()),
field("model", utf8()),
field("sales", float64()),
field("year", int32()),
field("month", int32()),
field("country", utf8()),
});
/// Dummy file format for source dataset. Note that it isn't partitioned on country
auto source_format = std::make_shared<JSONRecordBatchFileFormat>(
SchemaFromColumnNames(source_schema_, {"region", "model", "sales", "country"}));
fs::FileSelector s;
s.base_dir = "/dataset";
s.recursive = true;
FileSystemFactoryOptions options;
options.selector_ignore_prefixes = {"."};
options.partitioning = std::make_shared<HivePartitioning>(
SchemaFromColumnNames(source_schema_, {"year", "month"}));
ASSERT_OK_AND_ASSIGN(auto factory,
FileSystemDatasetFactory::Make(fs_, s, source_format, options));
ASSERT_OK_AND_ASSIGN(dataset_, factory->Finish());
scan_options_ = std::make_shared<ScanOptions>();
scan_options_->dataset_schema = source_schema_;
ASSERT_OK(SetProjection(scan_options_.get(), source_schema_->field_names()));
}
void SetWriteOptions(std::shared_ptr<FileWriteOptions> file_write_options) {
write_options_.file_write_options = file_write_options;
write_options_.filesystem = fs_;
write_options_.base_dir = "new_root/";
write_options_.basename_template = "dat_{i}";
}
void DoWrite(std::shared_ptr<Partitioning> desired_partitioning) {
write_options_.partitioning = desired_partitioning;
auto scanner_builder = ScannerBuilder(dataset_, scan_options_);
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder.Finish());
ASSERT_OK(FileSystemDataset::Write(write_options_, scanner));
// re-discover the written dataset
fs::FileSelector s;
s.recursive = true;
s.base_dir = "/new_root";
FileSystemFactoryOptions factory_options;
factory_options.partitioning = desired_partitioning;
ASSERT_OK_AND_ASSIGN(
auto factory, FileSystemDatasetFactory::Make(fs_, s, format_, factory_options));
ASSERT_OK_AND_ASSIGN(written_, factory->Finish());
}
void TestWriteWithIdenticalPartitioningSchema() {
DoWrite(std::make_shared<DirectoryPartitioning>(
SchemaFromColumnNames(source_schema_, {"year", "month"})));
expected_files_["/new_root/2018/1/dat_0"] = R"([
{"region": "NY", "model": "3", "sales": 742.0, "country": "US"},
{"region": "NY", "model": "S", "sales": 304.125, "country": "US"},
{"region": "NY", "model": "Y", "sales": 27.5, "country": "US"},
{"region": "QC", "model": "3", "sales": 512, "country": "CA"},
{"region": "QC", "model": "S", "sales": 978, "country": "CA"},
{"region": "NY", "model": "X", "sales": 136.25, "country": "US"},
{"region": "QC", "model": "X", "sales": 1.0, "country": "CA"},
{"region": "QC", "model": "Y", "sales": 69, "country": "CA"}
])";
expected_files_["/new_root/2019/1/dat_1"] = R"([
{"region": "CA", "model": "3", "sales": 273.5, "country": "US"},
{"region": "CA", "model": "S", "sales": 13, "country": "US"},
{"region": "CA", "model": "X", "sales": 54, "country": "US"},
{"region": "QC", "model": "S", "sales": 10, "country": "CA"},
{"region": "CA", "model": "Y", "sales": 21, "country": "US"},
{"region": "QC", "model": "3", "sales": 152.25, "country": "CA"},
{"region": "QC", "model": "X", "sales": 42, "country": "CA"},
{"region": "QC", "model": "Y", "sales": 37, "country": "CA"}
])";
expected_physical_schema_ =
SchemaFromColumnNames(source_schema_, {"region", "model", "sales", "country"});
AssertWrittenAsExpected();
}
void TestWriteWithUnrelatedPartitioningSchema() {
DoWrite(std::make_shared<DirectoryPartitioning>(
SchemaFromColumnNames(source_schema_, {"country", "region"})));
// XXX first thing a user will be annoyed by: we don't support left
// padding the month field with 0.
expected_files_["/new_root/US/NY/dat_0"] = R"([
{"year": 2018, "month": 1, "model": "3", "sales": 742.0},
{"year": 2018, "month": 1, "model": "S", "sales": 304.125},
{"year": 2018, "month": 1, "model": "Y", "sales": 27.5},
{"year": 2018, "month": 1, "model": "X", "sales": 136.25}
])";
expected_files_["/new_root/CA/QC/dat_1"] = R"([
{"year": 2018, "month": 1, "model": "3", "sales": 512},
{"year": 2018, "month": 1, "model": "S", "sales": 978},
{"year": 2018, "month": 1, "model": "X", "sales": 1.0},
{"year": 2018, "month": 1, "model": "Y", "sales": 69},
{"year": 2019, "month": 1, "model": "S", "sales": 10},
{"year": 2019, "month": 1, "model": "3", "sales": 152.25},
{"year": 2019, "month": 1, "model": "X", "sales": 42},
{"year": 2019, "month": 1, "model": "Y", "sales": 37}
])";
expected_files_["/new_root/US/CA/dat_2"] = R"([
{"year": 2019, "month": 1, "model": "3", "sales": 273.5},
{"year": 2019, "month": 1, "model": "S", "sales": 13},
{"year": 2019, "month": 1, "model": "X", "sales": 54},
{"year": 2019, "month": 1, "model": "Y", "sales": 21}
])";
expected_physical_schema_ =
SchemaFromColumnNames(source_schema_, {"model", "sales", "year", "month"});
AssertWrittenAsExpected();
}
void TestWriteWithSupersetPartitioningSchema() {
DoWrite(std::make_shared<DirectoryPartitioning>(
SchemaFromColumnNames(source_schema_, {"year", "month", "country", "region"})));
// XXX first thing a user will be annoyed by: we don't support left
// padding the month field with 0.
expected_files_["/new_root/2018/1/US/NY/dat_0"] = R"([
{"model": "3", "sales": 742.0},
{"model": "S", "sales": 304.125},
{"model": "Y", "sales": 27.5},
{"model": "X", "sales": 136.25}
])";
expected_files_["/new_root/2018/1/CA/QC/dat_1"] = R"([
{"model": "3", "sales": 512},
{"model": "S", "sales": 978},
{"model": "X", "sales": 1.0},
{"model": "Y", "sales": 69}
])";
expected_files_["/new_root/2019/1/US/CA/dat_2"] = R"([
{"model": "3", "sales": 273.5},
{"model": "S", "sales": 13},
{"model": "X", "sales": 54},
{"model": "Y", "sales": 21}
])";
expected_files_["/new_root/2019/1/CA/QC/dat_3"] = R"([
{"model": "S", "sales": 10},
{"model": "3", "sales": 152.25},
{"model": "X", "sales": 42},
{"model": "Y", "sales": 37}
])";
expected_physical_schema_ = SchemaFromColumnNames(source_schema_, {"model", "sales"});
AssertWrittenAsExpected();
}
void TestWriteWithEmptyPartitioningSchema() {
DoWrite(std::make_shared<DirectoryPartitioning>(
SchemaFromColumnNames(source_schema_, {})));
expected_files_["/new_root/dat_0"] = R"([
{"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "3", "sales": 742.0},
{"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "S", "sales": 304.125},
{"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "Y", "sales": 27.5},
{"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "3", "sales": 512},
{"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "S", "sales": 978},
{"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "X", "sales": 136.25},
{"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "X", "sales": 1.0},
{"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "Y", "sales": 69},
{"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "3", "sales": 273.5},
{"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "S", "sales": 13},
{"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "X", "sales": 54},
{"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "S", "sales": 10},
{"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "Y", "sales": 21},
{"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "3", "sales": 152.25},
{"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "X", "sales": 42},
{"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "Y", "sales": 37}
])";
expected_physical_schema_ = source_schema_;
AssertWrittenAsExpected();
}
void AssertWrittenAsExpected() {
std::unordered_set<std::string> expected_paths, actual_paths;
for (const auto& file_contents : expected_files_) {
expected_paths.insert(file_contents.first);
}
for (auto path : checked_pointer_cast<FileSystemDataset>(written_)->files()) {
actual_paths.insert(std::move(path));
}
EXPECT_THAT(actual_paths, testing::UnorderedElementsAreArray(expected_paths));
ASSERT_OK_AND_ASSIGN(auto written_fragments_it, written_->GetFragments());
for (auto maybe_fragment : written_fragments_it) {
ASSERT_OK_AND_ASSIGN(auto fragment, maybe_fragment);
ASSERT_OK_AND_ASSIGN(auto actual_physical_schema, fragment->ReadPhysicalSchema());
AssertSchemaEqual(*expected_physical_schema_, *actual_physical_schema,
check_metadata_);
const auto& path = checked_pointer_cast<FileFragment>(fragment)->source().path();
auto file_contents = expected_files_.find(path);
if (file_contents == expected_files_.end()) {
// file wasn't expected to be written at all; nothing to compare with
continue;
}
ASSERT_OK_AND_ASSIGN(auto scanner, ScannerBuilder(actual_physical_schema, fragment,
std::make_shared<ScanOptions>())
.Finish());
ASSERT_OK_AND_ASSIGN(auto actual_table, scanner->ToTable());
ASSERT_OK_AND_ASSIGN(actual_table, actual_table->CombineChunks());
std::shared_ptr<Array> actual_struct;
for (auto maybe_batch :
IteratorFromReader(std::make_shared<TableBatchReader>(*actual_table))) {
ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
ASSERT_OK_AND_ASSIGN(actual_struct, batch->ToStructArray());
}
auto expected_struct = ArrayFromJSON(struct_(expected_physical_schema_->fields()),
{file_contents->second});
AssertArraysEqual(*expected_struct, *actual_struct, /*verbose=*/true);
}
}
bool check_metadata_ = true;
std::shared_ptr<Schema> source_schema_;
std::shared_ptr<FileFormat> format_;
PathAndContent expected_files_;
std::shared_ptr<Schema> expected_physical_schema_;
std::shared_ptr<Dataset> written_;
FileSystemDatasetWriteOptions write_options_;
std::shared_ptr<ScanOptions> scan_options_;
};
} // namespace dataset
} // namespace arrow