blob: 676eb752aa4cdc521b60b67beceea89484be46dd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/dataset/partition.h"
#include <algorithm>
#include <chrono>
#include <map>
#include <memory>
#include <stack>
#include <utility>
#include <vector>
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/filter.h"
#include "arrow/dataset/scanner.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/scalar.h"
#include "arrow/util/iterator.h"
#include "arrow/util/range.h"
#include "arrow/util/sort.h"
#include "arrow/util/string_view.h"
namespace arrow {
namespace dataset {
using util::string_view;
using internal::checked_cast;
// Parse an entire path into a single partition expression: each path segment
// is parsed individually and the results are and-ed together.
Result<std::shared_ptr<Expression>> Partitioning::Parse(const std::string& path) const {
  ExpressionVector expressions;
  int segment_index = 0;
  for (const auto& segment : fs::internal::SplitAbstractPath(path)) {
    ARROW_ASSIGN_OR_RAISE(auto segment_expr, Parse(segment, segment_index++));
    // Trivially-true expressions carry no information; drop them from the
    // conjunction.
    if (!segment_expr->Equals(true)) {
      expressions.push_back(std::move(segment_expr));
    }
  }
  return and_(std::move(expressions));
}
std::shared_ptr<Partitioning> Partitioning::Default() {
return std::make_shared<DefaultPartitioning>();
}
// Base implementation: write support is opt-in for factory subclasses.
Result<WritePlan> PartitioningFactory::MakeWritePlan(FragmentIterator fragment_it) {
  auto not_implemented = Status::NotImplemented(
      "MakeWritePlan from PartitioningFactory of type ", type_name());
  return not_implemented;
}
// Base implementation (explicit-schema overload): write support is opt-in for
// factory subclasses.
Result<WritePlan> PartitioningFactory::MakeWritePlan(
    FragmentIterator fragment_it, const std::shared_ptr<Schema>& schema) {
  auto not_implemented = Status::NotImplemented(
      "MakeWritePlan from PartitioningFactory of type ", type_name());
  return not_implemented;
}
// Look up the segment in the dictionary for directory level `i`. Segments at
// levels past the available dictionaries, or not present in their level's
// dictionary, are ignored (parsed to a trivially-true expression).
Result<std::shared_ptr<Expression>> SegmentDictionaryPartitioning::Parse(
    const std::string& segment, int i) const {
  if (static_cast<size_t>(i) >= dictionaries_.size()) {
    return scalar(true);
  }
  const auto& dictionary = dictionaries_[i];
  const auto found = dictionary.find(segment);
  if (found == dictionary.end()) {
    return scalar(true);
  }
  return found->second;
}
// Walk `expr`, invoking `visitor` once per key: a key is an equality
// comparison between a field reference and a scalar. Conjunctions are
// recursed into; every other expression shape is silently skipped.
Status KeyValuePartitioning::VisitKeys(
    const Expression& expr,
    const std::function<Status(const std::string& name,
                               const std::shared_ptr<Scalar>& value)>& visitor) {
  switch (expr.type()) {
    case ExpressionType::AND: {
      // Visit both operands of a conjunction.
      const auto& conjunction = checked_cast<const AndExpression&>(expr);
      RETURN_NOT_OK(VisitKeys(*conjunction.left_operand(), visitor));
      return VisitKeys(*conjunction.right_operand(), visitor);
    }
    case ExpressionType::COMPARISON:
      break;
    default:
      // Not a key-bearing expression; nothing to do.
      return Status::OK();
  }
  const auto& cmp = checked_cast<const ComparisonExpression&>(expr);
  if (cmp.op() != compute::CompareOperator::EQUAL) {
    return Status::OK();
  }
  // Normalize operand order so the field (if any) ends up on the left.
  auto field_side = cmp.left_operand().get();
  auto scalar_side = cmp.right_operand().get();
  if (field_side->type() != ExpressionType::FIELD) {
    std::swap(field_side, scalar_side);
  }
  // Only `field == scalar` comparisons qualify as keys.
  if (field_side->type() != ExpressionType::FIELD ||
      scalar_side->type() != ExpressionType::SCALAR) {
    return Status::OK();
  }
  return visitor(checked_cast<const FieldExpression*>(field_side)->name(),
                 checked_cast<const ScalarExpression*>(scalar_side)->value());
}
// For each key in `expr` whose field is present in the projector's schema,
// install the key's scalar as that column's default value.
Status KeyValuePartitioning::SetDefaultValuesFromKeys(const Expression& expr,
                                                      RecordBatchProjector* projector) {
  auto set_default = [projector](const std::string& name,
                                 const std::shared_ptr<Scalar>& value) -> Status {
    ARROW_ASSIGN_OR_RAISE(auto match,
                          FieldRef(name).FindOneOrNone(*projector->schema()));
    // Keys naming fields absent from the projected schema are skipped.
    if (match.indices().empty()) {
      return Status::OK();
    }
    return projector->SetDefaultValue(match, value);
  };
  return KeyValuePartitioning::VisitKeys(expr, set_default);
}
// Convert a parsed key to an equality expression against `schema`. Keys which
// name a field absent from the schema convert to a trivially-true expression.
Result<std::shared_ptr<Expression>> KeyValuePartitioning::ConvertKey(
    const Key& key, const Schema& schema) {
  ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(key.name).GetOneOrNone(schema));
  if (match == nullptr) {
    return scalar(true);
  }
  // Parse the raw string value into a scalar of the field's declared type.
  ARROW_ASSIGN_OR_RAISE(auto value, Scalar::Parse(match->type(), key.value));
  return equal(field_ref(match->name()), scalar(std::move(value)));
}
// Parse one path segment at index `i`: extract a key if possible, otherwise
// yield a trivially-true expression (the segment is ignored).
Result<std::shared_ptr<Expression>> KeyValuePartitioning::Parse(
    const std::string& segment, int i) const {
  auto key = ParseKey(segment, i);
  if (!key) {
    return scalar(true);
  }
  return ConvertKey(*key, *schema_);
}
// Format a single partition expression as a path segment for position `i`.
//
// `expr` must be an equality comparison whose LHS is a field reference and
// whose RHS is a scalar of the type declared for that field in schema_.
// Returns Invalid for any other expression shape or for fields absent from
// the partition schema, and TypeError on a scalar/field type mismatch.
Result<std::string> KeyValuePartitioning::Format(const Expression& expr, int i) const {
  if (expr.type() != ExpressionType::COMPARISON) {
    return Status::Invalid(expr.ToString(), " is not a comparison expression");
  }
  const auto& cmp = checked_cast<const ComparisonExpression&>(expr);
  if (cmp.op() != compute::CompareOperator::EQUAL) {
    return Status::Invalid(expr.ToString(), " is not an equality comparison expression");
  }
  if (cmp.left_operand()->type() != ExpressionType::FIELD) {
    return Status::Invalid(expr.ToString(), " LHS is not a field");
  }
  const auto& lhs = checked_cast<const FieldExpression&>(*cmp.left_operand());
  if (cmp.right_operand()->type() != ExpressionType::SCALAR) {
    return Status::Invalid(expr.ToString(), " RHS is not a scalar");
  }
  const auto& rhs = checked_cast<const ScalarExpression&>(*cmp.right_operand());
  // GetFieldByName returns nullptr for unknown names; previously this was
  // dereferenced unconditionally, crashing when `expr` referenced a field
  // outside the partition schema.
  auto field = schema_->GetFieldByName(lhs.name());
  if (field == nullptr) {
    return Status::Invalid(expr.ToString(), " references field '", lhs.name(),
                           "' which is not a member of the partition schema ", *schema_);
  }
  const auto& expected_type = field->type();
  if (!rhs.value()->type->Equals(expected_type)) {
    return Status::TypeError(expr.ToString(), " expected RHS to have type ",
                             *expected_type);
  }
  return FormatKey({lhs.name(), rhs.value()->ToString()}, i);
}
// Under directory partitioning the i-th path segment is the value of the
// schema's i-th field. Segments past the last field carry no key.
util::optional<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKey(
    const std::string& segment, int i) const {
  if (i < schema_->num_fields()) {
    return Key{schema_->field(i)->name(), segment};
  }
  return util::nullopt;
}
// Format a key as a bare path segment. The key's field must sit at position
// `i` of the schema, since directory partitioning encodes field identity by
// segment position alone.
Result<std::string> DirectoryPartitioning::FormatKey(const Key& key, int i) const {
  if (schema_->GetFieldIndex(key.name) == i) {
    return key.value;
  }
  return Status::Invalid("field ", key.name, " in unexpected position ", i,
                         " for schema ", *schema_);
}
// Accumulates the string representations observed for each partition field
// and infers a schema from them.
class KeyValuePartitioningInspectImpl {
 public:
  // Infer a type for field `name` from the observed representations: int32 if
  // every repr consists solely of decimal digits, utf8 otherwise. At least one
  // repr is required.
  static Result<std::shared_ptr<DataType>> InferType(
      const std::string& name, const std::vector<std::string>& reprs) {
    if (reprs.empty()) {
      return Status::Invalid("No segments were available for field '", name,
                             "'; couldn't infer type");
    }
    for (string_view repr : reprs) {
      // TODO(bkietz) use ParseUnsigned or so
      if (repr.find_first_not_of("0123456789") != string_view::npos) {
        return utf8();
      }
    }
    return int32();
  }

  // Return the column index for `name`, registering the field on first sight
  // and growing values_ to match.
  int GetOrInsertField(const std::string& name) {
    auto emplaced =
        name_to_index_.emplace(name, static_cast<int>(name_to_index_.size()));
    int index = emplaced.first->second;
    if (values_.size() <= static_cast<size_t>(index)) {
      values_.resize(index + 1);
    }
    return index;
  }

  // Record one observed representation for the named field.
  void InsertRepr(const std::string& name, std::string repr) {
    InsertRepr(GetOrInsertField(name), std::move(repr));
  }

  // Record one observed representation for the field at `index`.
  void InsertRepr(int index, std::string repr) {
    values_[index].push_back(std::move(repr));
  }

  // Infer a type for every registered field and assemble the schema, with
  // fields placed at their registered indices.
  Result<std::shared_ptr<Schema>> Finish() {
    std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
    for (const auto& name_index : name_to_index_) {
      ARROW_ASSIGN_OR_RAISE(auto type,
                            InferType(name_index.first, values_[name_index.second]));
      fields[name_index.second] = field(name_index.first, type);
    }
    return ::arrow::schema(std::move(fields));
  }

 private:
  std::unordered_map<std::string, int> name_to_index_;  // field name -> column index
  std::vector<std::vector<std::string>> values_;        // per-column observed reprs
};
// PartitioningFactory which discovers a DirectoryPartitioning over a fixed,
// ordered list of field names.
class DirectoryPartitioningFactory : public PartitioningFactory {
 public:
  explicit DirectoryPartitioningFactory(std::vector<std::string> field_names)
      : field_names_(std::move(field_names)) {}

  std::string type_name() const override { return "schema"; }

  // Infer field types by examining the first num-fields segments of each path.
  Result<std::shared_ptr<Schema>> Inspect(
      const std::vector<string_view>& paths) const override {
    KeyValuePartitioningInspectImpl impl;
    // Register every field up front so the inferred schema preserves the
    // declared field order.
    for (const auto& name : field_names_) {
      impl.GetOrInsertField(name);
    }
    for (const auto& path : paths) {
      auto segments = fs::internal::SplitAbstractPath(path.to_string());
      // Segments beyond the declared fields are ignored.
      size_t num_reprs = std::min(segments.size(), field_names_.size());
      for (size_t field_index = 0; field_index < num_reprs; ++field_index) {
        impl.InsertRepr(static_cast<int>(field_index), std::move(segments[field_index]));
      }
    }
    return impl.Finish();
  }

  Result<std::shared_ptr<Partitioning>> Finish(
      const std::shared_ptr<Schema>& schema) const override {
    // Ensure all of field_names_ are present in schema.
    for (const auto& name : field_names_) {
      RETURN_NOT_OK(FieldRef(name).FindOne(*schema).status());
    }
    // Drop fields which aren't in field_names_.
    auto out_schema = SchemaFromColumnNames(schema, field_names_);
    return std::make_shared<DirectoryPartitioning>(std::move(out_schema));
  }

  struct MakeWritePlanImpl;
  Result<WritePlan> MakeWritePlan(FragmentIterator fragments) override;
  Result<WritePlan> MakeWritePlan(FragmentIterator fragments,
                                  const std::shared_ptr<Schema>& schema) override;

 private:
  std::vector<std::string> field_names_;  // one field per directory level, in order
};
// Single-use helper which assembles a WritePlan for
// DirectoryPartitioningFactory::MakeWritePlan: construct it with the factory
// and the fragments to write, then invoke Finish() on the rvalue.
struct DirectoryPartitioningFactory::MakeWritePlanImpl {
  // A small sequence of dictionary codes. std::basic_string<int> provides
  // lexicographic comparison out of the box, which ArgSort in Finish() uses to
  // order fragments into directory-tree order.
  using Indices = std::basic_string<int>;

  // `factory` must outlive this helper. Every fragment starts with a code of
  // -1 (meaning "no partition expression seen yet") for each partition field.
  MakeWritePlanImpl(DirectoryPartitioningFactory* factory,
                    FragmentVector source_fragments)
      : this_(factory),
        source_fragments_(std::move(source_fragments)),
        right_hand_sides_(source_fragments_.size(), Indices(num_fields(), -1)) {}

  // Number of partition fields declared by the factory.
  int num_fields() const { return static_cast<int>(this_->field_names_.size()); }

  // For a KeyValuePartitioning, every partition expression will be an equality
  // ComparisonExpression where the left operand is a FieldExpression and the right is a
  // ScalarExpression. Comparing Scalars directly is expensive, so first assemble a
  // dictionary containing the scalars from the right operands of every partition
  // expression. This allows later stages of MakeWritePlan to handle a scalar by its
  // dictionary code, which is both more compact to store and cheap to compare.
  //
  // Scalars are stored such that the dictionary code of a fragment's RHS in the
  // partition expression for a given field is given by
  //   int code = right_hand_sides_[fragment_index][field_index];
  // and the corresponding scalar can be retrieved with
  //   std::shared_ptr<Scalar> scalar = scalar_dict_.code_to_scalar[code];
  Status DictEncodeRightHandSides() {
    if (source_fragments_.empty()) {
      return Status::OK();
    }
    for (size_t fragment_i = 0; fragment_i < source_fragments_.size(); ++fragment_i) {
      const auto& fragment = source_fragments_[fragment_i];
      // Record the dictionary code of `value` for each key naming one of
      // field_names_; keys for other fields are silently ignored.
      auto insert_representable_into_dict = [this, fragment_i](
                                                const std::string& name,
                                                const std::shared_ptr<Scalar>& value) {
        auto it = std::find(this_->field_names_.begin(), this_->field_names_.end(), name);
        if (it == this_->field_names_.end()) {
          return Status::OK();
        }
        auto field_i = it - this_->field_names_.begin();
        int code = scalar_dict_.GetOrInsert(value);
        right_hand_sides_[fragment_i][field_i] = code;
        return Status::OK();
      };
      RETURN_NOT_OK(KeyValuePartitioning::VisitKeys(*fragment->partition_expression(),
                                                    insert_representable_into_dict));
      // A remaining -1 means some field received no partition expression.
      auto it = std::find(right_hand_sides_[fragment_i].begin(),
                          right_hand_sides_[fragment_i].end(), -1);
      if (it != right_hand_sides_[fragment_i].end()) {
        // NB: this is an error when writing DirectoryPartitioning but not
        // HivePartitioning (as it will be valid to simply omit segments)
        return Status::Invalid(
            "fragment ", fragment_i, " had no partition expression for field '",
            this_->field_names_.at(it - right_hand_sides_[fragment_i].begin()), "'");
      }
    }
    return Status::OK();
  }

  // Infer the Partitioning schema from partition expressions.
  // For example if one partition expression is "omega"_ == 13
  // we can infer that the field "omega" has type int32
  Result<std::shared_ptr<Schema>> InferPartitioningSchema() const {
    if (source_fragments_.empty()) {
      return Status::Invalid(
          "No fragments were provided so the Partitioning schema could not be "
          "inferred.");
    }
    // NB: under DirectoryPartitioning every fragment has a partition expression for every
    // field, so we can infer the schema by looking only at the first fragment. This will
    // be more complicated for HivePartitioning.
    int fragment_i = 0;
    FieldVector fields(num_fields());
    for (int field_i = 0; field_i < num_fields(); ++field_i) {
      const auto& name = this_->field_names_[field_i];
      // The scalar's type becomes the field's declared type.
      const auto& type =
          scalar_dict_.code_to_scalar[right_hand_sides_[fragment_i][field_i]]->type;
      fields[field_i] = field(name, type);
    }
    return schema(std::move(fields));
  }

  // reconstitute fragment_i's partition expression for field_i by reading the right
  // hand side from the scalar dictionary and constructing an equality
  // ComparisonExpression
  std::shared_ptr<Expression> PartitionExpression(size_t fragment_i, int field_i) {
    auto left_hand_side = field_ref(this_->field_names_[field_i]);
    auto right_hand_side =
        scalar(scalar_dict_.code_to_scalar[right_hand_sides_[fragment_i][field_i]]);
    return equal(std::move(left_hand_side), std::move(right_hand_side));
  }

  // create a guid by stringifying the number of milliseconds since the epoch
  // NOTE(review): steady_clock's epoch is unspecified (often boot time), so
  // this is not a globally unique id across runs; within a single plan, path
  // uniqueness also relies on the fragment index Finish() appends to this.
  std::string Guid() {
    using std::chrono::duration_cast;
    using std::chrono::milliseconds;
    using std::chrono::steady_clock;
    auto milliseconds_since_epoch =
        duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
    return std::to_string(milliseconds_since_epoch);
  }

  // remove fields which will be implicit in the partitioning; writing them to files would
  // be redundant
  Status DropPartitionFields(const std::shared_ptr<Partitioning>& partitioning,
                             Fragment* fragment) {
    auto schema = fragment->schema();
    for (const auto& field : partitioning->schema()->fields()) {
      int field_i = schema->GetFieldIndex(field->name());
      if (field_i != -1) {
        ARROW_ASSIGN_OR_RAISE(schema, schema->RemoveField(field_i));
      }
    }
    // the fragment being scanned to disk will now deselect redundant columns
    fragment->scan_options()->projector = RecordBatchProjector(std::move(schema));
    return Status::OK();
  }

  // Assemble the WritePlan: dict-encode partition expressions, infer (or
  // accept) the partitioning schema, sort fragments into directory order, then
  // emit one path per directory and one per fragment. &&-qualified: consumes
  // this helper.
  Result<WritePlan> Finish(std::shared_ptr<Schema> partitioning_schema = nullptr) && {
    WritePlan out;
    RETURN_NOT_OK(DictEncodeRightHandSides());
    if (partitioning_schema == nullptr) {
      ARROW_ASSIGN_OR_RAISE(partitioning_schema, InferPartitioningSchema());
    }
    ARROW_ASSIGN_OR_RAISE(out.partitioning,
                          this_->Finish(std::move(partitioning_schema)));
    auto fragment_schema =
        source_fragments_.empty() ? schema({}) : source_fragments_.front()->schema();
    ARROW_ASSIGN_OR_RAISE(out.schema,
                          UnifySchemas({out.partitioning->schema(), fragment_schema}));
    // Lexicographic ordering WRT right_hand_sides_ ensures that source_fragments_ are in
    // a depth first visitation order WRT their partition expressions. This makes
    // generation of the full directory tree far simpler since a directory's files are
    // grouped.
    auto permutation = internal::ArgSort(right_hand_sides_);
    internal::Permute(permutation, &source_fragments_);
    internal::Permute(permutation, &right_hand_sides_);
    // the basename of out.paths[i] is stored in segments[i] (full paths will be assembled
    // after segments is complete)
    std::vector<std::string> segments;
    // out.paths[parents[i]] is the parent directory of out.paths[i]
    std::vector<int> parents;
    // current_right_hand_sides[field_i] is the RHS dictionary code for the current
    // partition expression corresponding to field_i
    Indices current_right_hand_sides(num_fields(), -1);
    // out.paths[current_parents[field_i]] is the current ancestor directory corresponding
    // to field_i
    Indices current_parents(num_fields() + 1, -1);
    for (size_t fragment_i = 0; fragment_i < source_fragments_.size(); ++fragment_i) {
      RETURN_NOT_OK(
          DropPartitionFields(out.partitioning, source_fragments_[fragment_i].get()));
      // Find the first field whose value differs from the directory we are
      // currently inside; shallower directories can be reused.
      int field_i = 0;
      for (; field_i < num_fields(); ++field_i) {
        // these directories have already been created and we're still writing their
        // children
        if (right_hand_sides_[fragment_i][field_i] != current_right_hand_sides[field_i]) {
          break;
        }
      }
      for (; field_i < num_fields(); ++field_i) {
        // push a new directory
        current_parents[field_i + 1] = static_cast<int>(parents.size());
        parents.push_back(current_parents[field_i]);
        auto partition_expression = PartitionExpression(fragment_i, field_i);
        // format segment for partition_expression
        ARROW_ASSIGN_OR_RAISE(auto segment,
                              out.partitioning->Format(*partition_expression, field_i));
        segment.push_back(fs::internal::kSep);
        segments.push_back(std::move(segment));
        // store partition_expression for use in the written Dataset
        out.fragment_or_partition_expressions.emplace_back(
            std::move(partition_expression));
        current_right_hand_sides[field_i] = right_hand_sides_[fragment_i][field_i];
      }
      // push a fragment (not attempting to give files meaningful names)
      parents.push_back(current_parents[field_i]);
      segments.emplace_back(Guid() + "_" + std::to_string(fragment_i));
      // store a fragment for writing to disk
      out.fragment_or_partition_expressions.emplace_back(
          std::move(source_fragments_[fragment_i]));
    }
    // render paths from segments
    for (size_t i = 0; i < segments.size(); ++i) {
      if (parents[i] == -1) {
        out.paths.push_back(segments[i]);
        continue;
      }
      out.paths.push_back(out.paths[parents[i]] + segments[i]);
    }
    return out;
  }

  // The factory on whose behalf the plan is assembled (not owned).
  DirectoryPartitioningFactory* this_;
  FragmentVector source_fragments_;
  // Bidirectional mapping between scalars and compact dictionary codes.
  struct {
    std::unordered_map<std::shared_ptr<Scalar>, int, Scalar::Hash, Scalar::PtrsEqual>
        scalar_to_code;
    ScalarVector code_to_scalar;
    // Return the existing code for `scalar`, or assign and return a new one.
    int GetOrInsert(const std::shared_ptr<Scalar>& scalar) {
      int new_code = static_cast<int>(code_to_scalar.size());
      auto it_inserted = scalar_to_code.emplace(scalar, new_code);
      if (!it_inserted.second) {
        return it_inserted.first->second;
      }
      code_to_scalar.push_back(scalar);
      return new_code;
    }
  } scalar_dict_;
  // right_hand_sides_[fragment_i][field_i] is the dictionary code of the RHS
  // scalar in fragment_i's partition expression for field_i (-1 until encoded).
  std::vector<Indices> right_hand_sides_;
};
// Build a WritePlan against an explicitly provided partitioning schema.
Result<WritePlan> DirectoryPartitioningFactory::MakeWritePlan(
    FragmentIterator fragment_it, const std::shared_ptr<Schema>& schema) {
  ARROW_ASSIGN_OR_RAISE(auto collected, fragment_it.ToVector());
  MakeWritePlanImpl impl(this, std::move(collected));
  // Finish is &&-qualified (single use), hence the move.
  return std::move(impl).Finish(schema);
}
// Build a WritePlan, inferring the partitioning schema from the fragments'
// partition expressions.
Result<WritePlan> DirectoryPartitioningFactory::MakeWritePlan(
    FragmentIterator fragment_it) {
  ARROW_ASSIGN_OR_RAISE(auto collected, fragment_it.ToVector());
  MakeWritePlanImpl impl(this, std::move(collected));
  // Finish is &&-qualified (single use), hence the move.
  return std::move(impl).Finish();
}
std::shared_ptr<PartitioningFactory> DirectoryPartitioning::MakeFactory(
std::vector<std::string> field_names) {
return std::shared_ptr<PartitioningFactory>(
new DirectoryPartitioningFactory(std::move(field_names)));
}
// Hive segments have the form "name=value"; a segment without '=' carries no
// key and is ignored by the caller.
util::optional<KeyValuePartitioning::Key> HivePartitioning::ParseKey(
    const std::string& segment) {
  auto eq_pos = segment.find('=');
  if (eq_pos == std::string::npos) {
    return util::nullopt;
  }
  return Key{segment.substr(0, eq_pos), segment.substr(eq_pos + 1)};
}
// Format a key as a self-describing "name=value" segment. The position `i` is
// irrelevant under hive partitioning.
Result<std::string> HivePartitioning::FormatKey(const Key& key, int i) const {
  std::string formatted = key.name;
  formatted += "=";
  formatted += key.value;
  return formatted;
}
// PartitioningFactory which discovers a HivePartitioning by collecting
// "name=value" segments from the inspected paths.
class HivePartitioningFactory : public PartitioningFactory {
 public:
  std::string type_name() const override { return "hive"; }

  // Infer a schema from every "name=value" segment found in `paths`;
  // non-key segments are skipped.
  Result<std::shared_ptr<Schema>> Inspect(
      const std::vector<string_view>& paths) const override {
    KeyValuePartitioningInspectImpl impl;
    for (const auto& path : paths) {
      for (auto&& segment : fs::internal::SplitAbstractPath(path.to_string())) {
        auto key = HivePartitioning::ParseKey(segment);
        if (key) {
          impl.InsertRepr(key->name, key->value);
        }
      }
    }
    return impl.Finish();
  }

  Result<std::shared_ptr<Partitioning>> Finish(
      const std::shared_ptr<Schema>& schema) const override {
    return std::make_shared<HivePartitioning>(schema);
  }
};
std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory() {
return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory());
}
} // namespace dataset
} // namespace arrow