// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "arrow/dataset/api.h"
#include "arrow/dataset/forest_internal.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/test_util.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/test_util.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"
namespace arrow {
namespace dataset {
using fs::internal::GetAbstractPathExtension;
using internal::TemporaryDir;
using testing::ContainerEq;
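// FileSource wraps either a (path, filesystem) pair or an in-memory buffer,
// with an optional compression type; the two tests below cover both flavors.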
TEST(FileSource, PathBased) {
auto localfs = std::make_shared<fs::LocalFileSystem>();
std::string p1 = "/path/to/file.ext";
std::string p2 = "/path/to/file.ext.gz";
FileSource source1(p1, localfs);
FileSource source2(p2, localfs, Compression::GZIP);
ASSERT_EQ(p1, source1.path());
ASSERT_TRUE(localfs->Equals(*source1.filesystem()));
ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression());
ASSERT_EQ(p2, source2.path());
ASSERT_TRUE(localfs->Equals(*source2.filesystem()));
ASSERT_EQ(Compression::GZIP, source2.compression());
// Test copy assignment: the copy should share the path and filesystem.
FileSource source3;
source3 = source1;
ASSERT_EQ(source1.path(), source3.path());
ASSERT_EQ(source1.filesystem(), source3.filesystem());
}
TEST(FileSource, BufferBased) {
std::string the_data = "this is the file contents";
auto buf = std::make_shared<Buffer>(the_data);
FileSource source1(buf);
FileSource source2(buf, Compression::LZ4);
ASSERT_TRUE(source1.buffer()->Equals(*buf));
ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression());
ASSERT_TRUE(source2.buffer()->Equals(*buf));
ASSERT_EQ(Compression::LZ4, source2.compression());
FileSource source3;
source3 = source1;
ASSERT_EQ(source1.buffer(), source3.buffer());
}
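// The TestFileSystemDataset fixture (see test_util.h) exposes MakeDataset(),
// which builds a FileSystemDataset over mock fs::FileInfo entries, optionally
// with a root partition expression, per-file partition expressions, and a schema.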
TEST_F(TestFileSystemDataset, Basic) {
MakeDataset({});
AssertFragmentsAreFromPath(*dataset_->GetFragments(), {});
MakeDataset({fs::File("a"), fs::File("b"), fs::File("c")});
AssertFragmentsAreFromPath(*dataset_->GetFragments(), {"a", "b", "c"});
AssertFilesAre(dataset_, {"a", "b", "c"});
// Should not create fragments from directories.
MakeDataset({fs::Dir("A"), fs::Dir("A/B"), fs::File("A/a"), fs::File("A/B/b")});
AssertFragmentsAreFromPath(*dataset_->GetFragments(), {"A/a", "A/B/b"});
AssertFilesAre(dataset_, {"A/a", "A/B/b"});
}
TEST_F(TestFileSystemDataset, ReplaceSchema) {
auto schm = schema({field("i32", int32()), field("f64", float64())});
auto format = std::make_shared<DummyFileFormat>(schm);
ASSERT_OK_AND_ASSIGN(auto dataset,
FileSystemDataset::Make(schm, literal(true), format, nullptr, {}));
// drop field
ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
// add nullable field (will be materialized as null during projection)
ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
// incompatible type
ASSERT_RAISES(TypeError,
dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
// incompatible nullability
ASSERT_RAISES(
TypeError,
dataset->ReplaceSchema(schema({field("f64", float64(), /*nullable=*/false)}))
.status());
// add non-nullable field
ASSERT_RAISES(TypeError,
dataset->ReplaceSchema(schema({field("str", utf8(), /*nullable=*/false)}))
.status());
}
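// The two pruning tests below check that GetFragments() skips fragments whose
// partition expressions cannot satisfy the bound filter.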
TEST_F(TestFileSystemDataset, RootPartitionPruning) {
auto root_partition = equal(field_ref("i32"), literal(5));
MakeDataset({fs::File("a"), fs::File("b")}, root_partition);
auto GetFragments = [&](Expression filter) {
return *dataset_->GetFragments(*filter.Bind(*dataset_->schema()));
};
// Default filter should always return all data.
AssertFragmentsAreFromPath(*dataset_->GetFragments(), {"a", "b"});
// filter == partition
AssertFragmentsAreFromPath(GetFragments(root_partition), {"a", "b"});
// Same partition key, but a non-matching filter value.
AssertFragmentsAreFromPath(GetFragments(equal(field_ref("i32"), literal(6))), {});
AssertFragmentsAreFromPath(GetFragments(greater(field_ref("i32"), literal(1))),
{"a", "b"});
// A different key shouldn't prune.
AssertFragmentsAreFromPath(GetFragments(equal(field_ref("f32"), literal(3.F))),
{"a", "b"});
// Without partition expressions, nothing can be pruned.
MakeDataset({fs::File("a"), fs::File("b")});
AssertFragmentsAreFromPath(GetFragments(equal(field_ref("f32"), literal(3.F))),
{"a", "b"});
}
TEST_F(TestFileSystemDataset, TreePartitionPruning) {
auto root_partition = equal(field_ref("country"), literal("US"));
std::vector<fs::FileInfo> regions = {
fs::Dir("NY"), fs::File("NY/New York"), fs::File("NY/Franklin"),
fs::Dir("CA"), fs::File("CA/San Francisco"), fs::File("CA/Franklin"),
};
std::vector<Expression> partitions = {
equal(field_ref("state"), literal("NY")),
and_(equal(field_ref("state"), literal("NY")),
equal(field_ref("city"), literal("New York"))),
and_(equal(field_ref("state"), literal("NY")),
equal(field_ref("city"), literal("Franklin"))),
equal(field_ref("state"), literal("CA")),
and_(equal(field_ref("state"), literal("CA")),
equal(field_ref("city"), literal("San Francisco"))),
and_(equal(field_ref("state"), literal("CA")),
equal(field_ref("city"), literal("Franklin"))),
};
MakeDataset(
regions, root_partition, partitions,
schema({field("country", utf8()), field("state", utf8()), field("city", utf8())}));
std::vector<std::string> all_cities = {"CA/San Francisco", "CA/Franklin", "NY/New York",
"NY/Franklin"};
std::vector<std::string> ca_cities = {"CA/San Francisco", "CA/Franklin"};
std::vector<std::string> franklins = {"CA/Franklin", "NY/Franklin"};
// Default filter should always return all data.
AssertFragmentsAreFromPath(*dataset_->GetFragments(), all_cities);
auto GetFragments = [&](Expression filter) {
return *dataset_->GetFragments(*filter.Bind(*dataset_->schema()));
};
// Dataset's partitions are respected
AssertFragmentsAreFromPath(GetFragments(equal(field_ref("country"), literal("US"))),
all_cities);
AssertFragmentsAreFromPath(GetFragments(equal(field_ref("country"), literal("FR"))),
{});
AssertFragmentsAreFromPath(GetFragments(equal(field_ref("state"), literal("CA"))),
ca_cities);
// When the filter doesn't apply to the inner (state) partitions, no decision
// can be made at the inner nodes; matching happens at the leaf fragments.
AssertFragmentsAreFromPath(GetFragments(equal(field_ref("city"), literal("Franklin"))),
franklins);
}
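// Each fragment should report its own (state, city) partition expression; the
// root partition on "country" is not folded into the per-fragment expressions.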
TEST_F(TestFileSystemDataset, FragmentPartitions) {
auto root_partition = equal(field_ref("country"), literal("US"));
std::vector<fs::FileInfo> regions = {
fs::Dir("NY"), fs::File("NY/New York"), fs::File("NY/Franklin"),
fs::Dir("CA"), fs::File("CA/San Francisco"), fs::File("CA/Franklin"),
};
std::vector<Expression> partitions = {
equal(field_ref("state"), literal("NY")),
and_(equal(field_ref("state"), literal("NY")),
equal(field_ref("city"), literal("New York"))),
and_(equal(field_ref("state"), literal("NY")),
equal(field_ref("city"), literal("Franklin"))),
equal(field_ref("state"), literal("CA")),
and_(equal(field_ref("state"), literal("CA")),
equal(field_ref("city"), literal("San Francisco"))),
and_(equal(field_ref("state"), literal("CA")),
equal(field_ref("city"), literal("Franklin"))),
};
MakeDataset(
regions, root_partition, partitions,
schema({field("country", utf8()), field("state", utf8()), field("city", utf8())}));
AssertFragmentsHavePartitionExpressions(
dataset_, {
and_(equal(field_ref("state"), literal("CA")),
equal(field_ref("city"), literal("San Francisco"))),
and_(equal(field_ref("state"), literal("CA")),
equal(field_ref("city"), literal("Franklin"))),
and_(equal(field_ref("state"), literal("NY")),
equal(field_ref("city"), literal("New York"))),
and_(equal(field_ref("state"), literal("NY")),
equal(field_ref("city"), literal("Franklin"))),
});
}
// Tests of subtree pruning
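// TestPathTree is a lightweight mirror of a Forest: it can be declared directly
// from paths for the expected value, or reconstructed from a Forest::Ref plus
// the sorted FileInfo vector for the actual value, then compared and printed.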
struct TestPathTree {
fs::FileInfo info;
std::vector<TestPathTree> subtrees;
explicit TestPathTree(std::string file_path) : info(fs::File(std::move(file_path))) {}
TestPathTree(std::string dir_path, std::vector<TestPathTree> subtrees)
: info(fs::Dir(std::move(dir_path))), subtrees(std::move(subtrees)) {}
TestPathTree(Forest::Ref ref, const std::vector<fs::FileInfo>& infos)
: info(infos[ref.i]) {
const Forest& forest = *ref.forest;
int begin = ref.i + 1;
int end = begin + ref.num_descendants();
for (int i = begin; i < end; ++i) {
subtrees.emplace_back(forest[i], infos);
i += forest[i].num_descendants();
}
}
bool operator==(const TestPathTree& other) const {
return info == other.info && subtrees == other.subtrees;
}
std::string ToString() const {
auto out = "\n" + info.path();
if (info.IsDirectory()) out += "/";
for (const auto& subtree : subtrees) {
out += subtree.ToString();
}
return out;
}
friend std::ostream& operator<<(std::ostream& os, const TestPathTree& tree) {
return os << tree.ToString();
}
};
using PT = TestPathTree;
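// Builds a Forest over the infos (sorted by path), linking an entry as a
// descendant of another when its path lies under the other's path.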
Forest MakeForest(std::vector<fs::FileInfo>* infos) {
std::sort(infos->begin(), infos->end(), fs::FileInfo::ByPath{});
return Forest(static_cast<int>(infos->size()), [&](int i, int j) {
return fs::internal::IsAncestorOf(infos->at(i).path(), infos->at(j).path());
});
}
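// Builds a Forest from `infos`, reconstructs each root's subtree via Visit()
// (the pre-visit returns false, so only roots are expanded), and compares the
// result against the expected trees.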
void ExpectForestIs(std::vector<fs::FileInfo> infos, std::vector<PT> expected_roots) {
auto forest = MakeForest(&infos);
std::vector<PT> actual_roots;
ASSERT_OK(forest.Visit(
[&](Forest::Ref ref) -> Result<bool> {
actual_roots.emplace_back(ref, infos);
return false;  // only visit roots
},
[](Forest::Ref) {}));
// Compare the reconstructed roots against the expected trees.
EXPECT_THAT(actual_roots, ContainerEq(expected_roots));
}
TEST(Forest, Basic) {
ExpectForestIs({}, {});
ExpectForestIs({fs::File("aa")}, {PT("aa")});
ExpectForestIs({fs::Dir("AA")}, {PT("AA", {})});
ExpectForestIs({fs::Dir("AA"), fs::File("AA/aa")}, {PT("AA", {PT("AA/aa")})});
ExpectForestIs({fs::Dir("AA"), fs::Dir("AA/BB"), fs::File("AA/BB/0")},
{PT("AA", {PT("AA/BB", {PT("AA/BB/0")})})});
// A file whose direct parent is missing still links to its nearest ancestor.
ExpectForestIs({fs::Dir("AA"), fs::File("AA/BB/bb")}, {PT("AA", {PT("AA/BB/bb")})});
// Descendants should link to their ancestor regardless of input ordering.
ExpectForestIs({fs::File("AA/aa"), fs::Dir("AA")}, {PT("AA", {PT("AA/aa")})});
// Multiple roots are supported.
ExpectForestIs({fs::File("aa"), fs::File("bb")}, {PT("aa"), PT("bb")});
ExpectForestIs({fs::File("00"), fs::Dir("AA"), fs::File("AA/aa"), fs::File("BB/bb")},
{PT("00"), PT("AA", {PT("AA/aa")}), PT("BB/bb")});
ExpectForestIs({fs::Dir("AA"), fs::Dir("AA/BB"), fs::File("AA/BB/0"), fs::Dir("CC"),
fs::Dir("CC/BB"), fs::File("CC/BB/0")},
{PT("AA", {PT("AA/BB", {PT("AA/BB/0")})}),
PT("CC", {PT("CC/BB", {PT("CC/BB/0")})})});
}
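// A minimal additional sketch (not part of the original suite), using only the
// ExpectForestIs/PT helpers defined above: sibling directories that each contain
// a file with the same base name remain separate roots.
TEST(Forest, SiblingDirectoriesSketch) {
ExpectForestIs({fs::Dir("AA"), fs::File("AA/x"), fs::Dir("BB"), fs::File("BB/x")},
{PT("AA", {PT("AA/x")}), PT("BB", {PT("BB/x")})});
}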
TEST(Forest, HourlyETL) {
// This test mimics a scenario where an ETL dumps hourly files in a structure
// `$year/$month/$day/$hour/*.parquet`.
constexpr int64_t kYears = 3;
constexpr int64_t kMonthsPerYear = 12;
constexpr int64_t kDaysPerMonth = 31;
constexpr int64_t kHoursPerDay = 24;
constexpr int64_t kFilesPerHour = 2;
// Pre-build zero-padded number strings ("00".."31") once to avoid repeated
// construction in the loops below.
std::vector<std::string> numbers{kDaysPerMonth + 1};
for (size_t i = 0; i < numbers.size(); i++) {
numbers[i] = std::to_string(i);
if (numbers[i].size() == 1) {
numbers[i] = "0" + numbers[i];
}
}
auto join = [](const std::vector<std::string>& path) {
return fs::internal::JoinAbstractPath(path);
};
std::vector<fs::FileInfo> infos;
std::vector<PT> forest;
for (int64_t year = 0; year < kYears; year++) {
auto year_str = std::to_string(year + 2000);
auto year_dir = fs::Dir(year_str);
infos.push_back(year_dir);
std::vector<PT> months;
for (int64_t month = 0; month < kMonthsPerYear; month++) {
auto month_str = join({year_str, numbers[month + 1]});
auto month_dir = fs::Dir(month_str);
infos.push_back(month_dir);
std::vector<PT> days;
for (int64_t day = 0; day < kDaysPerMonth; day++) {
auto day_str = join({month_str, numbers[day + 1]});
auto day_dir = fs::Dir(day_str);
infos.push_back(day_dir);
std::vector<PT> hours;
for (int64_t hour = 0; hour < kHoursPerDay; hour++) {
auto hour_str = join({day_str, numbers[hour]});
auto hour_dir = fs::Dir(hour_str);
infos.push_back(hour_dir);
std::vector<PT> files;
for (int64_t file = 0; file < kFilesPerHour; file++) {
auto file_str = join({hour_str, numbers[file] + ".parquet"});
auto file_fd = fs::File(file_str);
infos.push_back(file_fd);
files.emplace_back(file_str);
}
auto hour_pt = PT(hour_str, std::move(files));
hours.push_back(hour_pt);
}
auto day_pt = PT(day_str, std::move(hours));
days.push_back(day_pt);
}
auto month_pt = PT(month_str, std::move(days));
months.push_back(month_pt);
}
auto year_pt = PT(year_str, std::move(months));
forest.push_back(year_pt);
}
ExpectForestIs(infos, forest);
}
TEST(Forest, Visit) {
using Infos = std::vector<fs::FileInfo>;
for (auto infos : {Infos{}, Infos{fs::Dir("A"), fs::File("A/a")},
Infos{fs::Dir("AA"), fs::Dir("AA/BB"), fs::File("AA/BB/0"),
fs::Dir("CC"), fs::Dir("CC/BB"), fs::File("CC/BB/0")}}) {
ASSERT_TRUE(std::is_sorted(infos.begin(), infos.end(), fs::FileInfo::ByPath{}));
auto forest = MakeForest(&infos);
auto ignore_post = [](Forest::Ref) {};
// A no-op visitor is fine.
ASSERT_OK(
forest.Visit([](Forest::Ref) -> Result<bool> { return false; }, ignore_post));
// Should propagate failure
if (forest.size() != 0) {
ASSERT_RAISES(
Invalid,
forest.Visit([](Forest::Ref) -> Result<bool> { return Status::Invalid(""); },
ignore_post));
}
// Ensure a basic visit reaches all nodes in order.
int i = 0;
ASSERT_OK(forest.Visit(
[&](Forest::Ref ref) -> Result<bool> {
EXPECT_EQ(ref.i, i);
++i;
return true;
},
ignore_post));
// Visit only directories
Infos actual_dirs;
ASSERT_OK(forest.Visit(
[&](Forest::Ref ref) -> Result<bool> {
if (!infos[ref.i].IsDirectory()) {
return false;
}
actual_dirs.push_back(infos[ref.i]);
return true;
},
ignore_post));
Infos expected_dirs;
for (const auto& info : infos) {
if (info.IsDirectory()) {
expected_dirs.push_back(info);
}
}
EXPECT_THAT(actual_dirs, ContainerEq(expected_dirs));
}
}
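// SubtreeImpl interns expressions as integer codes: GetOrInsert() is idempotent,
// and EncodeConjunctionMembers() splits an and_() into its members' codes while
// anything else (here an or_()) is interned as a single code.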
TEST(Subtree, EncodeExpression) {
SubtreeImpl tree;
ASSERT_EQ(0, tree.GetOrInsert(equal(field_ref("a"), literal("1"))));
// Should be idempotent
ASSERT_EQ(0, tree.GetOrInsert(equal(field_ref("a"), literal("1"))));
ASSERT_EQ(equal(field_ref("a"), literal("1")), tree.code_to_expr_[0]);
SubtreeImpl::expression_codes codes;
auto conj =
and_(equal(field_ref("a"), literal("1")), equal(field_ref("b"), literal("2")));
tree.EncodeConjunctionMembers(conj, &codes);
ASSERT_EQ(SubtreeImpl::expression_codes({0, 1}), codes);
codes.clear();
conj = or_(equal(field_ref("a"), literal("1")), equal(field_ref("b"), literal("2")));
tree.EncodeConjunctionMembers(conj, &codes);
ASSERT_EQ(SubtreeImpl::expression_codes({2}), codes);
}
TEST(Subtree, GetSubtreeExpression) {
SubtreeImpl tree;
const auto expr_a = equal(field_ref("a"), literal("1"));
const auto expr_b = equal(field_ref("b"), literal("2"));
const auto code_a = tree.GetOrInsert(expr_a);
const auto code_b = tree.GetOrInsert(expr_b);
ASSERT_EQ(expr_a,
tree.GetSubtreeExpression(SubtreeImpl::Encoded{util::nullopt, {code_a}}));
ASSERT_EQ(expr_b, tree.GetSubtreeExpression(
SubtreeImpl::Encoded{util::nullopt, {code_a, code_b}}));
}
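// EncodeFragments() maps each fragment's partition conjunction to its code list
// and, as exercised below, also emits index-less Encoded entries for the prefixes
// of those lists, which serve as the candidate ancestor subtrees.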
TEST(Subtree, EncodeFragments) {
auto fragment_schema = schema({});
const auto expr_a =
and_(equal(field_ref("a"), literal("1")), equal(field_ref("b"), literal("2")));
const auto expr_b =
and_(equal(field_ref("a"), literal("2")), equal(field_ref("b"), literal("3")));
std::vector<std::shared_ptr<InMemoryFragment>> fragments;
fragments.push_back(std::make_shared<InMemoryFragment>(
fragment_schema, arrow::RecordBatchVector(), expr_a));
fragments.push_back(std::make_shared<InMemoryFragment>(
fragment_schema, arrow::RecordBatchVector(), expr_b));
SubtreeImpl tree;
auto encoded = tree.EncodeFragments(fragments);
EXPECT_THAT(
tree.code_to_expr_,
ContainerEq(std::vector<Expression>{
equal(field_ref("a"), literal("1")), equal(field_ref("b"), literal("2")),
equal(field_ref("a"), literal("2")), equal(field_ref("b"), literal("3"))}));
EXPECT_THAT(
encoded,
testing::UnorderedElementsAreArray({
SubtreeImpl::Encoded{util::make_optional<int>(0),
SubtreeImpl::expression_codes({0, 1})},
SubtreeImpl::Encoded{util::make_optional<int>(1),
SubtreeImpl::expression_codes({2, 3})},
SubtreeImpl::Encoded{util::nullopt, SubtreeImpl::expression_codes({0})},
SubtreeImpl::Encoded{util::nullopt, SubtreeImpl::expression_codes({2})},
SubtreeImpl::Encoded{util::nullopt, SubtreeImpl::expression_codes({0, 1})},
SubtreeImpl::Encoded{util::nullopt, SubtreeImpl::expression_codes({2, 3})},
}));
}
} // namespace dataset
} // namespace arrow