blob: 676eb752aa4cdc521b60b67beceea89484be46dd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/dataset/partition.h"
#include <algorithm>
#include <chrono>
#include <map>
#include <memory>
#include <stack>
#include <utility>
#include <vector>
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/filter.h"
#include "arrow/dataset/scanner.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/scalar.h"
#include "arrow/util/iterator.h"
#include "arrow/util/range.h"
#include "arrow/util/sort.h"
#include "arrow/util/string_view.h"
namespace arrow {
namespace dataset {
using util::string_view;
using internal::checked_cast;
Result<std::shared_ptr<Expression>> Partitioning::Parse(const std::string& path) const {
ExpressionVector expressions;
int i = 0;
for (auto segment : fs::internal::SplitAbstractPath(path)) {
ARROW_ASSIGN_OR_RAISE(auto expr, Parse(segment, i++));
if (expr->Equals(true)) {
return and_(std::move(expressions));
std::shared_ptr<Partitioning> Partitioning::Default() {
return std::make_shared<DefaultPartitioning>();
Result<WritePlan> PartitioningFactory::MakeWritePlan(FragmentIterator fragment_it) {
return Status::NotImplemented("MakeWritePlan from PartitioningFactory of type ",
Result<WritePlan> PartitioningFactory::MakeWritePlan(
FragmentIterator fragment_it, const std::shared_ptr<Schema>& schema) {
return Status::NotImplemented("MakeWritePlan from PartitioningFactory of type ",
Result<std::shared_ptr<Expression>> SegmentDictionaryPartitioning::Parse(
const std::string& segment, int i) const {
if (static_cast<size_t>(i) < dictionaries_.size()) {
auto it = dictionaries_[i].find(segment);
if (it != dictionaries_[i].end()) {
return it->second;
return scalar(true);
Status KeyValuePartitioning::VisitKeys(
const Expression& expr,
const std::function<Status(const std::string& name,
const std::shared_ptr<Scalar>& value)>& visitor) {
if (expr.type() == ExpressionType::AND) {
const auto& and_ = checked_cast<const AndExpression&>(expr);
RETURN_NOT_OK(VisitKeys(*and_.left_operand(), visitor));
RETURN_NOT_OK(VisitKeys(*and_.right_operand(), visitor));
return Status::OK();
if (expr.type() != ExpressionType::COMPARISON) {
return Status::OK();
const auto& cmp = checked_cast<const ComparisonExpression&>(expr);
if (cmp.op() != compute::CompareOperator::EQUAL) {
return Status::OK();
auto lhs = cmp.left_operand().get();
auto rhs = cmp.right_operand().get();
if (lhs->type() != ExpressionType::FIELD) std::swap(lhs, rhs);
if (lhs->type() != ExpressionType::FIELD || rhs->type() != ExpressionType::SCALAR) {
return Status::OK();
return visitor(checked_cast<const FieldExpression*>(lhs)->name(),
checked_cast<const ScalarExpression*>(rhs)->value());
Status KeyValuePartitioning::SetDefaultValuesFromKeys(const Expression& expr,
RecordBatchProjector* projector) {
return KeyValuePartitioning::VisitKeys(
expr, [projector](const std::string& name, const std::shared_ptr<Scalar>& value) {
if (match.indices().empty()) {
return Status::OK();
return projector->SetDefaultValue(match, value);
Result<std::shared_ptr<Expression>> KeyValuePartitioning::ConvertKey(
const Key& key, const Schema& schema) {
ARROW_ASSIGN_OR_RAISE(auto field, FieldRef(;
if (field == nullptr) {
return scalar(true);
ARROW_ASSIGN_OR_RAISE(auto converted, Scalar::Parse(field->type(), key.value));
return equal(field_ref(field->name()), scalar(converted));
Result<std::shared_ptr<Expression>> KeyValuePartitioning::Parse(
const std::string& segment, int i) const {
if (auto key = ParseKey(segment, i)) {
return ConvertKey(*key, *schema_);
return scalar(true);
Result<std::string> KeyValuePartitioning::Format(const Expression& expr, int i) const {
if (expr.type() != ExpressionType::COMPARISON) {
return Status::Invalid(expr.ToString(), " is not a comparison expression");
const auto& cmp = checked_cast<const ComparisonExpression&>(expr);
if (cmp.op() != compute::CompareOperator::EQUAL) {
return Status::Invalid(expr.ToString(), " is not an equality comparison expression");
if (cmp.left_operand()->type() != ExpressionType::FIELD) {
return Status::Invalid(expr.ToString(), " LHS is not a field");
const auto& lhs = checked_cast<const FieldExpression&>(*cmp.left_operand());
if (cmp.right_operand()->type() != ExpressionType::SCALAR) {
return Status::Invalid(expr.ToString(), " RHS is not a scalar");
const auto& rhs = checked_cast<const ScalarExpression&>(*cmp.right_operand());
auto expected_type = schema_->GetFieldByName(>type();
if (!rhs.value()->type->Equals(expected_type)) {
return Status::TypeError(expr.ToString(), " expected RHS to have type ",
return FormatKey({, rhs.value()->ToString()}, i);
util::optional<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKey(
const std::string& segment, int i) const {
if (i >= schema_->num_fields()) {
return util::nullopt;
return Key{schema_->field(i)->name(), segment};
Result<std::string> DirectoryPartitioning::FormatKey(const Key& key, int i) const {
if (schema_->GetFieldIndex( != i) {
return Status::Invalid("field ",, " in unexpected position ", i,
" for schema ", *schema_);
return key.value;
class KeyValuePartitioningInspectImpl {
static Result<std::shared_ptr<DataType>> InferType(
const std::string& name, const std::vector<std::string>& reprs) {
if (reprs.empty()) {
return Status::Invalid("No segments were available for field '", name,
"'; couldn't infer type");
bool all_integral = std::all_of(reprs.begin(), reprs.end(), [](string_view repr) {
// TODO(bkietz) use ParseUnsigned or so
return repr.find_first_not_of("0123456789") == string_view::npos;
if (all_integral) {
return int32();
return utf8();
int GetOrInsertField(const std::string& name) {
auto name_index =
name_to_index_.emplace(name, static_cast<int>(name_to_index_.size())).first;
if (static_cast<size_t>(name_index->second) >= values_.size()) {
values_.resize(name_index->second + 1);
return name_index->second;
void InsertRepr(const std::string& name, std::string repr) {
InsertRepr(GetOrInsertField(name), std::move(repr));
void InsertRepr(int index, std::string repr) {
Result<std::shared_ptr<Schema>> Finish() {
std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
for (const auto& name_index : name_to_index_) {
const auto& name = name_index.first;
auto index = name_index.second;
ARROW_ASSIGN_OR_RAISE(auto type, InferType(name, values_[index]));
fields[index] = field(name, type);
return ::arrow::schema(std::move(fields));
std::unordered_map<std::string, int> name_to_index_;
std::vector<std::vector<std::string>> values_;
class DirectoryPartitioningFactory : public PartitioningFactory {
explicit DirectoryPartitioningFactory(std::vector<std::string> field_names)
: field_names_(std::move(field_names)) {}
std::string type_name() const override { return "schema"; }
Result<std::shared_ptr<Schema>> Inspect(
const std::vector<string_view>& paths) const override {
KeyValuePartitioningInspectImpl impl;
for (const auto& name : field_names_) {
for (auto path : paths) {
size_t field_index = 0;
for (auto&& segment : fs::internal::SplitAbstractPath(path.to_string())) {
if (field_index == field_names_.size()) break;
impl.InsertRepr(static_cast<int>(field_index++), std::move(segment));
return impl.Finish();
Result<std::shared_ptr<Partitioning>> Finish(
const std::shared_ptr<Schema>& schema) const override {
for (FieldRef ref : field_names_) {
// ensure all of field_names_ are present in schema
// drop fields which aren't in field_names_
auto out_schema = SchemaFromColumnNames(schema, field_names_);
return std::make_shared<DirectoryPartitioning>(std::move(out_schema));
struct MakeWritePlanImpl;
Result<WritePlan> MakeWritePlan(FragmentIterator fragments) override;
Result<WritePlan> MakeWritePlan(FragmentIterator fragments,
const std::shared_ptr<Schema>& schema) override;
std::vector<std::string> field_names_;
struct DirectoryPartitioningFactory::MakeWritePlanImpl {
using Indices = std::basic_string<int>;
MakeWritePlanImpl(DirectoryPartitioningFactory* factory,
FragmentVector source_fragments)
: this_(factory),
right_hand_sides_(source_fragments_.size(), Indices(num_fields(), -1)) {}
int num_fields() const { return static_cast<int>(this_->field_names_.size()); }
// For a KeyValuePartitioning, every partition expression will be an equality
// ComparisonExpression where the left operand is a FieldExpression and the right is a
// ScalarExpression. Comparing Scalars directly is expensive, so first assemble a
// dictionary containing the scalars from the right operands of every partition
// expression. This allows later stages of MakeWritePlan to handle a scalar by its
// dictionary code, which is both more compact to store and cheap to compare.
// Scalars are stored such that the dictionary code of a fragment's RHS in the
// partition expression for a given field is given by
// int code = right_hand_sides_[fragment_index][field_index];
// and the corresponding scalar can be retrieved with
// std::shared_ptr<Scalar> scalar = scalar_dict_.code_to_scalar[code];
Status DictEncodeRightHandSides() {
if (source_fragments_.empty()) {
return Status::OK();
for (size_t fragment_i = 0; fragment_i < source_fragments_.size(); ++fragment_i) {
const auto& fragment = source_fragments_[fragment_i];
auto insert_representable_into_dict = [this, fragment_i](
const std::string& name,
const std::shared_ptr<Scalar>& value) {
auto it = std::find(this_->field_names_.begin(), this_->field_names_.end(), name);
if (it == this_->field_names_.end()) {
return Status::OK();
auto field_i = it - this_->field_names_.begin();
int code = scalar_dict_.GetOrInsert(value);
right_hand_sides_[fragment_i][field_i] = code;
return Status::OK();
auto it = std::find(right_hand_sides_[fragment_i].begin(),
right_hand_sides_[fragment_i].end(), -1);
if (it != right_hand_sides_[fragment_i].end()) {
// NB: this is an error when writing DirectoryPartitioning but not
// HivePartitioning (as it will be valid to simply omit segments)
return Status::Invalid(
"fragment ", fragment_i, " had no partition expression for field '",
this_-> - right_hand_sides_[fragment_i].begin()), "'");
return Status::OK();
// Infer the Partitioning schema from partition expressions.
// For example if one partition expression is "omega"_ == 13
// we can infer that the field "omega" has type int32
Result<std::shared_ptr<Schema>> InferPartitioningSchema() const {
if (source_fragments_.empty()) {
return Status::Invalid(
"No fragments were provided so the Partitioning schema could not be "
// NB: under DirectoryPartitioning every fragment has a partition expression for every
// field, so we can infer the schema by looking only at the first fragment. This will
// be more complicated for HivePartitioning.
int fragment_i = 0;
FieldVector fields(num_fields());
for (int field_i = 0; field_i < num_fields(); ++field_i) {
const auto& name = this_->field_names_[field_i];
const auto& type =
fields[field_i] = field(name, type);
return schema(std::move(fields));
// reconstitute fragment_i's partition expression for field_i by reading the right
// hand side from the scalar dictionary and constructing an equality
// ComparisonExpression
std::shared_ptr<Expression> PartitionExpression(size_t fragment_i, int field_i) {
auto left_hand_side = field_ref(this_->field_names_[field_i]);
auto right_hand_side =
return equal(std::move(left_hand_side), std::move(right_hand_side));
// create a guid by stringifying the number of milliseconds since the epoch
std::string Guid() {
using std::chrono::duration_cast;
using std::chrono::milliseconds;
using std::chrono::steady_clock;
auto milliseconds_since_epoch =
return std::to_string(milliseconds_since_epoch);
// remove fields which will be implicit in the partitioning; writing them to files would
// be redundant
Status DropPartitionFields(const std::shared_ptr<Partitioning>& partitioning,
Fragment* fragment) {
auto schema = fragment->schema();
for (const auto& field : partitioning->schema()->fields()) {
int field_i = schema->GetFieldIndex(field->name());
if (field_i != -1) {
ARROW_ASSIGN_OR_RAISE(schema, schema->RemoveField(field_i));
// the fragment being scanned to disk will now deselect redundant columns
fragment->scan_options()->projector = RecordBatchProjector(std::move(schema));
return Status::OK();
Result<WritePlan> Finish(std::shared_ptr<Schema> partitioning_schema = nullptr) && {
WritePlan out;
if (partitioning_schema == nullptr) {
ARROW_ASSIGN_OR_RAISE(partitioning_schema, InferPartitioningSchema());
auto fragment_schema =
source_fragments_.empty() ? schema({}) : source_fragments_.front()->schema();
UnifySchemas({out.partitioning->schema(), fragment_schema}));
// Lexicographic ordering WRT right_hand_sides_ ensures that source_fragments_ are in
// a depth first visitation order WRT their partition expressions. This makes
// generation of the full directory tree far simpler since a directory's files are
// grouped.
auto permutation = internal::ArgSort(right_hand_sides_);
internal::Permute(permutation, &source_fragments_);
internal::Permute(permutation, &right_hand_sides_);
// the basename of out.paths[i] is stored in segments[i] (full paths will be assembled
// after segments is complete)
std::vector<std::string> segments;
// out.paths[parents[i]] is the parent directory of out.paths[i]
std::vector<int> parents;
// current_right_hand_sides[field_i] is the RHS dictionary code for the current
// partition expression corresponding to field_i
Indices current_right_hand_sides(num_fields(), -1);
// out.paths[current_parents[field_i]] is the current ancestor directory corresponding
// to field_i
Indices current_parents(num_fields() + 1, -1);
for (size_t fragment_i = 0; fragment_i < source_fragments_.size(); ++fragment_i) {
DropPartitionFields(out.partitioning, source_fragments_[fragment_i].get()));
int field_i = 0;
for (; field_i < num_fields(); ++field_i) {
// these directories have already been created and we're still writing their
// children
if (right_hand_sides_[fragment_i][field_i] != current_right_hand_sides[field_i]) {
for (; field_i < num_fields(); ++field_i) {
// push a new directory
current_parents[field_i + 1] = static_cast<int>(parents.size());
auto partition_expression = PartitionExpression(fragment_i, field_i);
// format segment for partition_expression
out.partitioning->Format(*partition_expression, field_i));
// store partition_expression for use in the written Dataset
current_right_hand_sides[field_i] = right_hand_sides_[fragment_i][field_i];
// push a fragment (not attempting to give files meaningful names)
segments.emplace_back(Guid() + "_" + std::to_string(fragment_i));
// store a fragment for writing to disk
// render paths from segments
for (size_t i = 0; i < segments.size(); ++i) {
if (parents[i] == -1) {
out.paths.push_back(out.paths[parents[i]] + segments[i]);
return out;
DirectoryPartitioningFactory* this_;
FragmentVector source_fragments_;
struct {
std::unordered_map<std::shared_ptr<Scalar>, int, Scalar::Hash, Scalar::PtrsEqual>
ScalarVector code_to_scalar;
int GetOrInsert(const std::shared_ptr<Scalar>& scalar) {
int new_code = static_cast<int>(code_to_scalar.size());
auto it_inserted = scalar_to_code.emplace(scalar, new_code);
if (!it_inserted.second) {
return it_inserted.first->second;
return new_code;
} scalar_dict_;
std::vector<Indices> right_hand_sides_;
Result<WritePlan> DirectoryPartitioningFactory::MakeWritePlan(
FragmentIterator fragment_it, const std::shared_ptr<Schema>& schema) {
ARROW_ASSIGN_OR_RAISE(auto fragments, fragment_it.ToVector());
return MakeWritePlanImpl(this, std::move(fragments)).Finish(schema);
Result<WritePlan> DirectoryPartitioningFactory::MakeWritePlan(
FragmentIterator fragment_it) {
ARROW_ASSIGN_OR_RAISE(auto fragments, fragment_it.ToVector());
return MakeWritePlanImpl(this, std::move(fragments)).Finish();
std::shared_ptr<PartitioningFactory> DirectoryPartitioning::MakeFactory(
std::vector<std::string> field_names) {
return std::shared_ptr<PartitioningFactory>(
new DirectoryPartitioningFactory(std::move(field_names)));
util::optional<KeyValuePartitioning::Key> HivePartitioning::ParseKey(
const std::string& segment) {
auto name_end = string_view(segment).find_first_of('=');
if (name_end == string_view::npos) {
return util::nullopt;
return Key{segment.substr(0, name_end), segment.substr(name_end + 1)};
Result<std::string> HivePartitioning::FormatKey(const Key& key, int i) const {
return + "=" + key.value;
class HivePartitioningFactory : public PartitioningFactory {
std::string type_name() const override { return "hive"; }
Result<std::shared_ptr<Schema>> Inspect(
const std::vector<string_view>& paths) const override {
KeyValuePartitioningInspectImpl impl;
for (auto path : paths) {
for (auto&& segment : fs::internal::SplitAbstractPath(path.to_string())) {
if (auto key = HivePartitioning::ParseKey(segment)) {
impl.InsertRepr(key->name, key->value);
return impl.Finish();
Result<std::shared_ptr<Partitioning>> Finish(
const std::shared_ptr<Schema>& schema) const override {
return std::shared_ptr<Partitioning>(new HivePartitioning(schema));
std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory() {
return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory());
} // namespace dataset
} // namespace arrow