blob: 8593613c8f50f40d72184a65cb3b81031cfd22b1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cmath>
#include <limits>
#include <numeric>
#include <type_traits>
#include <utility>
#include "arrow/array/data.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/kernels/common.h"
#include "arrow/compute/kernels/util_internal.h"
#include "arrow/table.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_block_counter.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/optional.h"
#include "arrow/visitor_inline.h"
namespace arrow {
using internal::checked_cast;
namespace compute {
namespace internal {
// Visit all physical types for which sorting is implemented.
//
// Expands the macro argument VISIT once for each listed type class. In this
// file it is used to generate one `Visit(const TYPE&)` override per physical
// type in ChunkedArraySorter.
#define VISIT_PHYSICAL_TYPES(VISIT) \
VISIT(Int8Type) \
VISIT(Int16Type) \
VISIT(Int32Type) \
VISIT(Int64Type) \
VISIT(UInt8Type) \
VISIT(UInt16Type) \
VISIT(UInt32Type) \
VISIT(UInt64Type) \
VISIT(FloatType) \
VISIT(DoubleType) \
VISIT(BinaryType) \
VISIT(LargeBinaryType) \
VISIT(FixedSizeBinaryType) \
VISIT(Decimal128Type) \
VISIT(Decimal256Type)
namespace {
// A resolved position inside a chunked array: the concrete chunk together
// with the element's index relative to that chunk.
template <typename ArrayType>
struct ResolvedChunk {
  using V = GetViewType<typename ArrayType::TypeClass>;
  using LogicalValueType = typename V::T;

  // Chunk that holds the element.
  const ArrayType* array;
  // Position of the element inside `array`.
  const int64_t index;

  ResolvedChunk(const ArrayType* array, int64_t index) : array(array), index(index) {}

  // Whether the element is null in its chunk.
  bool IsNull() const { return array->IsNull(index); }

  // The element's view, converted to its logical value through V.
  LogicalValueType Value() const {
    const auto physical = array->GetView(index);
    return V::LogicalValue(physical);
  }
};
// ResolvedChunk specialization for untyped arrays, for callers that only need
// a null lookup (there is no Value() accessor here).
template <>
struct ResolvedChunk<Array> {
  // Chunk that holds the element.
  const Array* array;
  // Position of the element inside `array`.
  const int64_t index;

  ResolvedChunk(const Array* array, int64_t index) : array(array), index(index) {}

  // Whether the element is null in its chunk.
  bool IsNull() const {
    const bool is_null = array->IsNull(index);
    return is_null;
  }
};
// An object that resolves an array chunk depending on the index.
struct ChunkedArrayResolver {
explicit ChunkedArrayResolver(const std::vector<const Array*>& chunks)
: num_chunks_(static_cast<int64_t>(chunks.size())),
chunks_(chunks.data()),
offsets_(MakeEndOffsets(chunks)),
cached_chunk_(0) {}
template <typename ArrayType>
ResolvedChunk<ArrayType> Resolve(int64_t index) const {
// It is common for the algorithms below to make consecutive accesses at
// a relatively small distance from each other, hence often falling in
// the same chunk.
// This is trivial when merging (assuming each side of the merge uses
// its own resolver), but also in the inner recursive invocations of
// partitioning.
const bool cache_hit =
(index >= offsets_[cached_chunk_] && index < offsets_[cached_chunk_ + 1]);
if (ARROW_PREDICT_TRUE(cache_hit)) {
return ResolvedChunk<ArrayType>(
checked_cast<const ArrayType*>(chunks_[cached_chunk_]),
index - offsets_[cached_chunk_]);
} else {
return ResolveMissBisect<ArrayType>(index);
}
}
private:
template <typename ArrayType>
ResolvedChunk<ArrayType> ResolveMissBisect(int64_t index) const {
// Like std::upper_bound(), but hand-written as it can help the compiler.
const int64_t* raw_offsets = offsets_.data();
// Search [lo, lo + n)
int64_t lo = 0, n = num_chunks_;
while (n > 1) {
int64_t m = n >> 1;
int64_t mid = lo + m;
if (index >= raw_offsets[mid]) {
lo = mid;
n -= m;
} else {
n = m;
}
}
cached_chunk_ = lo;
return ResolvedChunk<ArrayType>(checked_cast<const ArrayType*>(chunks_[lo]),
index - offsets_[lo]);
}
static std::vector<int64_t> MakeEndOffsets(const std::vector<const Array*>& chunks) {
std::vector<int64_t> end_offsets(chunks.size() + 1);
int64_t offset = 0;
end_offsets[0] = 0;
std::transform(chunks.begin(), chunks.end(), end_offsets.begin() + 1,
[&](const Array* chunk) {
offset += chunk->length();
return offset;
});
return end_offsets;
}
int64_t num_chunks_;
const Array* const* chunks_;
std::vector<int64_t> offsets_;
mutable int64_t cached_chunk_;
};
// Rather than reproducing the concrete Array classes' facilities (such as the
// cached raw values pointer) in a parallel hierarchy of physical accessors,
// which ends up too cumbersome, simply build a concrete Array object of the
// physical type.
//
// The array's data is copied via ArrayData::Copy() and only its `type` field
// is replaced with `physical_type`.
std::shared_ptr<Array> GetPhysicalArray(const Array& array,
                                        const std::shared_ptr<DataType>& physical_type) {
  auto physical_data = array.data()->Copy();
  physical_data->type = physical_type;
  return MakeArray(std::move(physical_data));
}
// Applies GetPhysicalArray() to every chunk of `chunked_array`, preserving
// chunk order.
ArrayVector GetPhysicalChunks(const ChunkedArray& chunked_array,
                              const std::shared_ptr<DataType>& physical_type) {
  const auto& chunks = chunked_array.chunks();
  ArrayVector physical(chunks.size());
  auto out = physical.begin();
  for (const std::shared_ptr<Array>& chunk : chunks) {
    *out++ = GetPhysicalArray(*chunk, physical_type);
  }
  return physical;
}
std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
std::vector<const Array*> pointers(arrays.size());
std::transform(arrays.begin(), arrays.end(), pointers.begin(),
[&](const std::shared_ptr<Array>& array) { return array.get(); });
return pointers;
}
// Partitions indices with std::partition: elements satisfying `pred` are moved
// to the front, without preserving relative order.
// NOTE: std::partition is usually faster than std::stable_partition.
struct NonStablePartitioner {
  // Returns a pointer to the first element for which `pred` is false.
  template <typename Predicate>
  uint64_t* operator()(uint64_t* indices_begin, uint64_t* indices_end, Predicate&& pred) {
    uint64_t* const boundary =
        std::partition(indices_begin, indices_end, std::forward<Predicate>(pred));
    return boundary;
  }
};
// Partitions indices with std::stable_partition: elements satisfying `pred`
// are moved to the front and relative order is preserved on both sides.
struct StablePartitioner {
  // Returns a pointer to the first element for which `pred` is false.
  template <typename Predicate>
  uint64_t* operator()(uint64_t* indices_begin, uint64_t* indices_end, Predicate&& pred) {
    uint64_t* const boundary =
        std::stable_partition(indices_begin, indices_end, std::forward<Predicate>(pred));
    return boundary;
  }
};
// TODO factor out value comparison and NaN checking?
//
// Compile-time trait telling whether a type has "null-like" values, i.e.
// non-null values that are partitioned next to the nulls (NaN for floats).
// Default: no null-like values.
template <typename TypeClass, typename Enable = void>
struct NullTraits {
static constexpr bool has_null_like_values = false;
};
// Floating point types: NaN is treated as null-like.
template <typename TypeClass>
struct NullTraits<TypeClass, enable_if_floating_point<TypeClass>> {
static constexpr bool has_null_like_values = true;
};
// Moves the indices of null entries (but not of null-like values) to the end
// of [indices_begin, indices_end). Returns where the nulls start.
//
// `offset` is subtracted from each index before looking it up in `values`;
// it is used when this is called on a chunk of a chunked array.
template <typename Partitioner>
uint64_t* PartitionNullsOnly(uint64_t* indices_begin, uint64_t* indices_end,
                             const Array& values, int64_t offset) {
  // Nothing to move when the array has no nulls at all.
  if (values.null_count() == 0) {
    return indices_end;
  }
  const auto is_not_null = [&values, &offset](uint64_t ind) {
    return !values.IsNull(ind - offset);
  };
  Partitioner partitioner;
  return partitioner(indices_begin, indices_end, is_not_null);
}
// Chunked-array variant: moves the indices of null entries to the end of
// [indices_begin, indices_end) and returns where the nulls start.
//
// Indices are logical positions over the concatenation of `arrays`.
// `null_count` is only used to skip the work entirely when it is zero.
template <typename Partitioner>
uint64_t* PartitionNullsOnly(uint64_t* indices_begin, uint64_t* indices_end,
                             const std::vector<const Array*>& arrays,
                             int64_t null_count) {
  if (null_count == 0) {
    return indices_end;
  }
  ChunkedArrayResolver resolver(arrays);
  const auto is_not_null = [&](uint64_t ind) {
    const auto resolved = resolver.Resolve<Array>(ind);
    return !resolved.IsNull();
  };
  Partitioner partitioner;
  return partitioner(indices_begin, indices_end, is_not_null);
}
// Move non-null null-like values to end of array. Return where null-like starts.
//
// `offset` is used when this is called on a chunk of a chunked array
//
// This overload is selected for non-floating-point array types. It does not
// touch the indices and returns `indices_end`, i.e. an empty null-like range.
template <typename ArrayType, typename Partitioner>
enable_if_t<!is_floating_type<typename ArrayType::TypeClass>::value, uint64_t*>
PartitionNullLikes(uint64_t* indices_begin, uint64_t* indices_end,
const ArrayType& values, int64_t offset) {
return indices_end;
}
// For chunked array.
//
// Non-floating-point overload: the indices are left untouched and
// `indices_end` is returned, i.e. an empty null-like range.
template <typename ArrayType, typename Partitioner>
enable_if_t<!is_floating_type<typename ArrayType::TypeClass>::value, uint64_t*>
PartitionNullLikes(uint64_t* indices_begin, uint64_t* indices_end,
const std::vector<const Array*>& arrays, int64_t null_count) {
return indices_end;
}
// Floating-point overload: moves the indices of NaN values to the end of
// [indices_begin, indices_end) and returns where the NaNs start.
//
// Every index in the range is read with GetView(), so callers are expected to
// have moved nulls out of the range beforehand. `offset` is subtracted from
// each index before the lookup in `values`.
template <typename ArrayType, typename Partitioner>
enable_if_t<is_floating_type<typename ArrayType::TypeClass>::value, uint64_t*>
PartitionNullLikes(uint64_t* indices_begin, uint64_t* indices_end,
                   const ArrayType& values, int64_t offset) {
  const auto is_not_nan = [&values, &offset](uint64_t ind) {
    return !std::isnan(values.GetView(ind - offset));
  };
  Partitioner partitioner;
  return partitioner(indices_begin, indices_end, is_not_nan);
}
// Floating-point overload for chunked arrays: moves the indices of NaN values
// to the end of [indices_begin, indices_end) and returns where the NaNs start.
//
// Indices are logical positions over the concatenation of `arrays`. Every
// index in the range has its value read, so callers are expected to have moved
// nulls out of the range beforehand. `null_count` is not used.
template <typename ArrayType, typename Partitioner>
enable_if_t<is_floating_type<typename ArrayType::TypeClass>::value, uint64_t*>
PartitionNullLikes(uint64_t* indices_begin, uint64_t* indices_end,
                   const std::vector<const Array*>& arrays, int64_t null_count) {
  Partitioner partitioner;
  ChunkedArrayResolver resolver(arrays);
  const auto is_not_nan = [&](uint64_t ind) {
    const auto resolved = resolver.Resolve<ArrayType>(ind);
    return !std::isnan(resolved.Value());
  };
  return partitioner(indices_begin, indices_end, is_not_nan);
}
// Moves nulls and null-like values to the end of [indices_begin, indices_end).
// Returns the start of that trailing region, i.e. the end of the ordinary
// values.
//
// `offset` is used when this is called on a chunk of a chunked array
template <typename ArrayType, typename Partitioner>
uint64_t* PartitionNulls(uint64_t* indices_begin, uint64_t* indices_end,
                         const ArrayType& values, int64_t offset) {
  // First pass: true nulls go to the very end.
  uint64_t* const true_nulls_begin =
      PartitionNullsOnly<Partitioner>(indices_begin, indices_end, values, offset);
  // Second pass, over the non-null part only: null-like values go just before
  // the nulls.
  uint64_t* const null_likes_begin = PartitionNullLikes<ArrayType, Partitioner>(
      indices_begin, true_nulls_begin, values, offset);
  return null_likes_begin;
}
// Chunked-array variant: moves nulls and null-like values to the end of
// [indices_begin, indices_end) and returns the start of that trailing region.
template <typename ArrayType, typename Partitioner>
uint64_t* PartitionNulls(uint64_t* indices_begin, uint64_t* indices_end,
                         const std::vector<const Array*>& arrays, int64_t null_count) {
  // First pass: true nulls go to the very end.
  uint64_t* const true_nulls_begin =
      PartitionNullsOnly<Partitioner>(indices_begin, indices_end, arrays, null_count);
  // Second pass, over the non-null part only: null-like values go just before
  // the nulls.
  uint64_t* const null_likes_begin = PartitionNullLikes<ArrayType, Partitioner>(
      indices_begin, true_nulls_begin, arrays, null_count);
  return null_likes_begin;
}
// ----------------------------------------------------------------------
// partition_nth_indices implementation
// We need to preserve the options
using PartitionNthToIndicesState = internal::OptionsWrapper<PartitionNthOptions>;
// Kernel that fills the output with a permutation of [0, length) partitioned
// around the `pivot`-th position: nulls and null-like values are placed last,
// and std::nth_element is applied to the remaining indices when the pivot
// falls among them.
//
// Errors are reported through `ctx->SetStatus`:
//   * Invalid    - no PartitionNthOptions state is attached to the context.
//   * IndexError - the pivot is negative or greater than the array length.
template <typename OutType, typename InType>
struct PartitionNthToIndices {
  using ArrayType = typename TypeTraits<InType>::ArrayType;

  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
    using GetView = GetViewType<InType>;
    if (ctx->state() == nullptr) {
      ctx->SetStatus(Status::Invalid("NthToIndices requires PartitionNthOptions"));
      return;
    }
    ArrayType arr(batch[0].array());
    const int64_t pivot = PartitionNthToIndicesState::Get(ctx).pivot;
    // A negative pivot would place the nth position before the start of the
    // output buffer, which std::nth_element does not allow (undefined
    // behaviour), so reject it together with a pivot past the end.
    if (pivot < 0 || pivot > arr.length()) {
      ctx->SetStatus(Status::IndexError("NthToIndices index out of bound"));
      return;
    }
    ArrayData* out_arr = out->mutable_array();
    uint64_t* out_begin = out_arr->GetMutableValues<uint64_t>(1);
    uint64_t* out_end = out_begin + arr.length();
    std::iota(out_begin, out_end, 0);
    // pivot == length: the identity permutation is returned as is.
    if (pivot == arr.length()) {
      return;
    }
    auto nulls_begin =
        PartitionNulls<ArrayType, NonStablePartitioner>(out_begin, out_end, arr, 0);
    auto nth_begin = out_begin + pivot;
    // Only the non-null part needs selecting; if the pivot falls inside the
    // null region there is nothing more to do.
    if (nth_begin < nulls_begin) {
      std::nth_element(out_begin, nth_begin, nulls_begin,
                       [&arr](uint64_t left, uint64_t right) {
                         const auto lval = GetView::LogicalValue(arr.GetView(left));
                         const auto rval = GetView::LogicalValue(arr.GetView(right));
                         return lval < rval;
                       });
    }
  }
};
// ----------------------------------------------------------------------
// Array sorting implementations
// Visits every slot of `values` in order: `visitor_not_null` is called with
// the raw value of each non-null slot, `visitor_null` (no arguments) for each
// null slot.
template <typename ArrayType, typename VisitorNotNull, typename VisitorNull>
inline void VisitRawValuesInline(const ArrayType& values,
                                 VisitorNotNull&& visitor_not_null,
                                 VisitorNull&& visitor_null) {
  const auto raw = values.raw_values();
  auto on_valid = [&](int64_t position) { visitor_not_null(raw[position]); };
  auto on_null = [&]() { visitor_null(); };
  VisitBitBlocksVoid(values.null_bitmap(), values.offset(), values.length(), on_valid,
                     on_null);
}
// Comparison-based stable sorter for the indices of a single array.
template <typename ArrowType>
class ArrayCompareSorter {
  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
  using GetView = GetViewType<ArrowType>;

 public:
  // Stably sorts [indices_begin, indices_end) by the values they refer to,
  // with nulls and null-like values moved to the end.
  // Returns where null starts.
  //
  // `offset` is used when this is called on a chunk of a chunked array
  uint64_t* Sort(uint64_t* indices_begin, uint64_t* indices_end, const ArrayType& values,
                 int64_t offset, const ArraySortOptions& options) {
    uint64_t* const nulls_begin = PartitionNulls<ArrayType, StablePartitioner>(
        indices_begin, indices_end, values, offset);
    // Logical value referred to by an index of the output.
    const auto value_at = [&values, &offset](uint64_t ind) {
      return GetView::LogicalValue(values.GetView(ind - offset));
    };
    if (options.order == SortOrder::Ascending) {
      std::stable_sort(indices_begin, nulls_begin,
                       [&value_at](uint64_t left, uint64_t right) {
                         return value_at(left) < value_at(right);
                       });
    } else {
      // Descending order is expressed with the operands swapped rather than
      // with '>', so that value types only need to provide '<'.
      std::stable_sort(indices_begin, nulls_begin,
                       [&value_at](uint64_t left, uint64_t right) {
                         return value_at(right) < value_at(left);
                       });
    }
    return nulls_begin;
  }
};
// Counting sort over the indices of a single integer array.
//
// The value range [min, max] must be configured (constructor or SetMinMax)
// before Sort() is called; all non-null values are expected to lie in it.
template <typename ArrowType>
class ArrayCountSorter {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using c_type = typename ArrowType::c_type;
public:
ArrayCountSorter() = default;
explicit ArrayCountSorter(c_type min, c_type max) { SetMinMax(min, max); }
// Assume: max >= min && (max - min) < 4Gi
void SetMinMax(c_type min, c_type max) {
min_ = min;
// Number of distinct values in [min, max].
value_range_ = static_cast<uint32_t>(max - min) + 1;
}
// Returns where null starts.
//
// `indices_end` is not used: the output is written starting at
// `indices_begin`, one index per slot of `values`.
uint64_t* Sort(uint64_t* indices_begin, uint64_t* indices_end, const ArrayType& values,
int64_t offset, const ArraySortOptions& options) {
// 32bit counter performs much better than 64bit one
if (values.length() < (1LL << 32)) {
return SortInternal<uint32_t>(indices_begin, indices_end, values, offset, options);
} else {
return SortInternal<uint64_t>(indices_begin, indices_end, values, offset, options);
}
}
private:
c_type min_{0};
uint32_t value_range_{0};
// Returns where null starts.
//
// `offset` is used when this is called on a chunk of a chunked array
//
// Two passes over `values`: the first counts occurrences of each value, the
// second writes each slot's index (starting from `offset`) at the next free
// position of its value's bucket. Slots are visited in order, so indices of
// equal values are emitted in increasing order.
template <typename CounterType>
uint64_t* SortInternal(uint64_t* indices_begin, uint64_t* indices_end,
const ArrayType& values, int64_t offset,
const ArraySortOptions& options) {
const uint32_t value_range = value_range_;
// first slot reserved for prefix sum
std::vector<CounterType> counts(1 + value_range);
if (options.order == SortOrder::Ascending) {
// Tally value k into counts[k + 1] ...
VisitRawValuesInline(
values, [&](c_type v) { ++counts[v - min_ + 1]; }, []() {});
// ... so that after the prefix sum counts[k] is the output position of the
// first occurrence of value k.
for (uint32_t i = 1; i <= value_range; ++i) {
counts[i] += counts[i - 1];
}
// The last counter is the number of non-null values: nulls go after them.
auto null_position = counts[value_range];
auto nulls_begin = indices_begin + null_position;
int64_t index = offset;
VisitRawValuesInline(
values, [&](c_type v) { indices_begin[counts[v - min_]++] = index++; },
[&]() { indices_begin[null_position++] = index++; });
return nulls_begin;
} else {
// Tally value k into counts[k] ...
VisitRawValuesInline(
values, [&](c_type v) { ++counts[v - min_]; }, []() {});
// ... so that after the suffix sum counts[k + 1] is the number of values
// greater than k, i.e. the output position of the first occurrence of k.
for (uint32_t i = value_range; i >= 1; --i) {
counts[i - 1] += counts[i];
}
// counts[0] is the number of non-null values: nulls go after them.
auto null_position = counts[0];
auto nulls_begin = indices_begin + null_position;
int64_t index = offset;
VisitRawValuesInline(
values, [&](c_type v) { indices_begin[counts[v - min_ + 1]++] = index++; },
[&]() { indices_begin[null_position++] = index++; });
return nulls_begin;
}
}
};
// Sorts integer arrays with either a counting sort or a comparison sort:
// - O(n) counting sort when the array is long enough and its values span a
//   small range
// - O(nlogn) std::stable_sort otherwise
template <typename ArrowType>
class ArrayCountOrCompareSorter {
  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
  using c_type = typename ArrowType::c_type;

 public:
  // Returns where null starts.
  //
  // `offset` is used when this is called on a chunk of a chunked array
  uint64_t* Sort(uint64_t* indices_begin, uint64_t* indices_end, const ArrayType& values,
                 int64_t offset, const ArraySortOptions& options) {
    // Short arrays, and arrays without any non-null value, go straight to the
    // comparison sort.
    if (values.length() < countsort_min_len_ ||
        values.length() <= values.null_count()) {
      return compare_sorter_.Sort(indices_begin, indices_end, values, offset, options);
    }
    c_type min, max;
    std::tie(min, max) = GetMinMax<c_type>(*values.data());
    // For signed int32/64, (max - min) may overflow and trigger UBSAN.
    // Both operands are cast to the largest unsigned type (uint64_t) before
    // the subtraction.
    const uint64_t spread = static_cast<uint64_t>(max) - static_cast<uint64_t>(min);
    if (spread > countsort_max_range_) {
      return compare_sorter_.Sort(indices_begin, indices_end, values, offset, options);
    }
    count_sorter_.SetMinMax(min, max);
    return count_sorter_.Sort(indices_begin, indices_end, values, offset, options);
  }

 private:
  ArrayCompareSorter<ArrowType> compare_sorter_;
  ArrayCountSorter<ArrowType> count_sorter_;

  // Cross point to prefer counting sort over std::stable_sort (merge sort):
  // - the array to be sorted is at least "countsort_min_len_" long
  // - the value range (max - min) is within "countsort_max_range_"
  //
  // The optimal setting depends heavily on the running CPU. The setting below
  // is conservative, to adapt to various hardware and keep the code simple.
  // It's possible to decrease array-len and/or increase value-range to cover
  // more cases, or set up a table of best array-len/value-range combinations.
  // See https://issues.apache.org/jira/browse/ARROW-1571 for detailed analysis.
  static const uint32_t countsort_min_len_ = 1024;
  static const uint32_t countsort_max_range_ = 4096;
};
// Selects the array sorter implementation for a physical type. Each
// specialization exposes the chosen sorter as the member `impl`.
// The primary template is only declared; types without a matching
// specialization below have no sorter.
template <typename Type, typename Enable = void>
struct ArraySorter;
// 8-bit unsigned integers: always counting sort, over the full [0, 255] range.
template <>
struct ArraySorter<UInt8Type> {
ArrayCountSorter<UInt8Type> impl;
ArraySorter() : impl(0, 255) {}
};
// 8-bit signed integers: always counting sort, over the full [-128, 127] range.
template <>
struct ArraySorter<Int8Type> {
ArrayCountSorter<Int8Type> impl;
ArraySorter() : impl(-128, 127) {}
};
// Integers wider than one byte, and temporal types: counting sort or
// comparison sort, decided per array by ArrayCountOrCompareSorter.
template <typename Type>
struct ArraySorter<Type, enable_if_t<(is_integer_type<Type>::value &&
(sizeof(typename Type::c_type) > 1)) ||
is_temporal_type<Type>::value>> {
ArrayCountOrCompareSorter<Type> impl;
};
// Floating point, base binary and fixed-size binary types: comparison sort.
template <typename Type>
struct ArraySorter<
Type, enable_if_t<is_floating_type<Type>::value || is_base_binary_type<Type>::value ||
is_fixed_size_binary_type<Type>::value>> {
ArrayCompareSorter<Type> impl;
};
using ArraySortIndicesState = internal::OptionsWrapper<ArraySortOptions>;
// Kernel that writes into the output the indices which sort the input array,
// using the sorter selected by ArraySorter<InType> and the ArraySortOptions
// attached to the kernel context.
template <typename OutType, typename InType>
struct ArraySortIndices {
  using ArrayType = typename TypeTraits<InType>::ArrayType;

  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
    const auto& options = ArraySortIndicesState::Get(ctx);
    ArrayType arr(batch[0].array());

    // Start from the identity permutation, then sort it in place.
    ArrayData* out_arr = out->mutable_array();
    uint64_t* const first = out_arr->GetMutableValues<uint64_t>(1);
    uint64_t* const last = first + arr.length();
    std::iota(first, last, 0);

    ArraySorter<InType> sorter;
    sorter.impl.Sort(first, last, arr, 0, options);
  }
};
// Registers on `func` one copy of `base` per supported input type, each with
// a uint64 output and the matching instantiation of ExecTemplate.
//
// Kernels are added, in this order, for:
//
// * Numeric types
// * Temporal types
// * Decimal types
// * Base binary types
// * Fixed-size binary
template <template <typename...> class ExecTemplate>
void AddSortingKernels(VectorKernel base, VectorFunction* func) {
  // Numeric and temporal types are both dispatched on their physical type.
  const auto add_numeric_like =
      [&](const std::vector<std::shared_ptr<DataType>>& types) {
        for (const auto& ty : types) {
          auto physical_type = GetPhysicalType(ty);
          base.signature = KernelSignature::Make({InputType::Array(ty)}, uint64());
          base.exec = GenerateNumeric<ExecTemplate, UInt64Type>(*physical_type);
          DCHECK_OK(func->AddKernel(base));
        }
      };
  add_numeric_like(NumericTypes());
  add_numeric_like(TemporalTypes());

  for (const auto id : DecimalTypeIds()) {
    base.signature = KernelSignature::Make({InputType::Array(id)}, uint64());
    base.exec = GenerateDecimal<ExecTemplate, UInt64Type>(id);
    DCHECK_OK(func->AddKernel(base));
  }

  for (const auto& ty : BaseBinaryTypes()) {
    auto physical_type = GetPhysicalType(ty);
    base.signature = KernelSignature::Make({InputType::Array(ty)}, uint64());
    base.exec = GenerateVarBinaryBase<ExecTemplate, UInt64Type>(*physical_type);
    DCHECK_OK(func->AddKernel(base));
  }

  base.signature =
      KernelSignature::Make({InputType::Array(Type::FIXED_SIZE_BINARY)}, uint64());
  base.exec = ExecTemplate<UInt64Type, FixedSizeBinaryType>::Exec;
  DCHECK_OK(func->AddKernel(base));
}
// ----------------------------------------------------------------------
// ChunkedArray sorting implementations
// Sorts the indices of a chunked array as a whole, without first sorting each
// chunk separately. This is used for processing the second and following
// sort keys in TableRadixSorter.
//
// The algorithm is the same as ArrayCompareSorter's, with values looked up
// through a ChunkedArrayResolver.
template <typename Type>
class ChunkedArrayCompareSorter {
  using ArrayType = typename TypeTraits<Type>::ArrayType;

 public:
  // Stably sorts [indices_begin, indices_end), whose elements are logical
  // positions over the concatenation of `arrays`.
  // Returns where null starts.
  uint64_t* Sort(uint64_t* indices_begin, uint64_t* indices_end,
                 const std::vector<const Array*>& arrays, int64_t null_count,
                 const ArraySortOptions& options) {
    uint64_t* const nulls_begin = PartitionNulls<ArrayType, StablePartitioner>(
        indices_begin, indices_end, arrays, null_count);
    ChunkedArrayResolver resolver(arrays);
    if (options.order == SortOrder::Ascending) {
      const auto ascending = [&](uint64_t left, uint64_t right) {
        const auto resolved_left = resolver.Resolve<ArrayType>(left);
        const auto resolved_right = resolver.Resolve<ArrayType>(right);
        return resolved_left.Value() < resolved_right.Value();
      };
      std::stable_sort(indices_begin, nulls_begin, ascending);
    } else {
      // Descending order is expressed with the operands swapped rather than
      // with '>', so that value types only need to provide '<'.
      const auto descending = [&](uint64_t left, uint64_t right) {
        const auto resolved_left = resolver.Resolve<ArrayType>(left);
        const auto resolved_right = resolver.Resolve<ArrayType>(right);
        return resolved_right.Value() < resolved_left.Value();
      };
      std::stable_sort(indices_begin, nulls_begin, descending);
    }
    return nulls_begin;
  }
};
// Sort a chunked array by sorting each array in the chunked array.
//
// The indices in [indices_begin, indices_end) are logical positions over the
// whole chunked array. Dispatch on the physical type happens through the
// TypeVisitor interface (see Sort()).
//
// TODO: This is a naive implementation. We'll be able to improve
// performance of this. For example, we'll be able to use threads for
// sorting each array.
class ChunkedArraySorter : public TypeVisitor {
public:
// `chunked_array` is held by reference and must outlive the sorter.
// When `can_use_array_sorter` is false, the chunked array is sorted as a
// whole with ChunkedArrayCompareSorter instead of chunk by chunk.
ChunkedArraySorter(ExecContext* ctx, uint64_t* indices_begin, uint64_t* indices_end,
const ChunkedArray& chunked_array, const SortOrder order,
bool can_use_array_sorter = true)
: TypeVisitor(),
indices_begin_(indices_begin),
indices_end_(indices_end),
chunked_array_(chunked_array),
physical_type_(GetPhysicalType(chunked_array.type())),
physical_chunks_(GetPhysicalChunks(chunked_array_, physical_type_)),
order_(order),
can_use_array_sorter_(can_use_array_sorter),
ctx_(ctx) {}
// Sorts the indices by visiting the physical type, which calls the matching
// Visit() override below.
Status Sort() { return physical_type_->Accept(this); }
// One Visit() override per supported physical type, each forwarding to
// SortInternal<TYPE>().
#define VISIT(TYPE) \
Status Visit(const TYPE& type) override { return SortInternal<TYPE>(); }
VISIT_PHYSICAL_TYPES(VISIT)
#undef VISIT
private:
// Sorts the indices for the physical type `Type`. Returns a non-OK status
// only if allocating the temporary merge buffer fails.
template <typename Type>
Status SortInternal() {
using ArrayType = typename TypeTraits<Type>::ArrayType;
ArraySortOptions options(order_);
const auto num_chunks = chunked_array_.num_chunks();
if (num_chunks == 0) {
return Status::OK();
}
const auto arrays = GetArrayPointers(physical_chunks_);
if (can_use_array_sorter_) {
// Sort each chunk independently and merge to sorted indices.
// This is a serial implementation.
ArraySorter<Type> sorter;
// A sorted run of the output, expressed as offsets from indices_begin_:
// [begin_offset, nulls_offset) holds the sorted non-null indices and
// [nulls_offset, end_offset) the nulls (and null-like values).
struct SortedChunk {
int64_t begin_offset;
int64_t end_offset;
int64_t nulls_offset;
};
std::vector<SortedChunk> sorted(num_chunks);
// First sort all individual chunks
int64_t begin_offset = 0;
int64_t end_offset = 0;
int64_t null_count = 0;
for (int i = 0; i < num_chunks; ++i) {
const auto array = checked_cast<const ArrayType*>(arrays[i]);
end_offset += array->length();
null_count += array->null_count();
// `begin_offset` is passed as the sorter's `offset` so that logical indices
// are translated to positions inside this chunk.
uint64_t* nulls_begin =
sorter.impl.Sort(indices_begin_ + begin_offset, indices_begin_ + end_offset,
*array, begin_offset, options);
sorted[i] = {begin_offset, end_offset, nulls_begin - indices_begin_};
begin_offset = end_offset;
}
DCHECK_EQ(end_offset, indices_end_ - indices_begin_);
std::unique_ptr<Buffer> temp_buffer;
uint64_t* temp_indices = nullptr;
if (sorted.size() > 1) {
// Scratch space for merging: only non-null indices are ever merged, so
// room for (total length - null_count) indices is allocated.
ARROW_ASSIGN_OR_RAISE(
temp_buffer,
AllocateBuffer(sizeof(int64_t) * (indices_end_ - indices_begin_ - null_count),
ctx_->memory_pool()));
temp_indices = reinterpret_cast<uint64_t*>(temp_buffer->mutable_data());
}
// Then merge them by pairs, recursively
while (sorted.size() > 1) {
// Each pass merges adjacent runs pairwise and compacts the results at the
// front of `sorted`.
auto out_it = sorted.begin();
auto it = sorted.begin();
while (it < sorted.end() - 1) {
const auto& left = *it++;
const auto& right = *it++;
DCHECK_EQ(left.end_offset, right.begin_offset);
DCHECK_GE(left.nulls_offset, left.begin_offset);
DCHECK_LE(left.nulls_offset, left.end_offset);
DCHECK_GE(right.nulls_offset, right.begin_offset);
DCHECK_LE(right.nulls_offset, right.end_offset);
uint64_t* nulls_begin = Merge<ArrayType>(
indices_begin_ + left.begin_offset, indices_begin_ + left.end_offset,
indices_begin_ + right.end_offset, indices_begin_ + left.nulls_offset,
indices_begin_ + right.nulls_offset, arrays, null_count, order_,
temp_indices);
*out_it++ = {left.begin_offset, right.end_offset, nulls_begin - indices_begin_};
}
// An odd run left over at the end is carried to the next pass unchanged.
if (it < sorted.end()) {
*out_it++ = *it++;
}
sorted.erase(out_it, sorted.end());
}
DCHECK_EQ(sorted.size(), 1);
DCHECK_EQ(sorted[0].begin_offset, 0);
DCHECK_EQ(sorted[0].end_offset, chunked_array_.length());
// Note that "nulls" can also include NaNs, hence the >= check
DCHECK_GE(chunked_array_.length() - sorted[0].nulls_offset, null_count);
} else {
// Sort the chunked array directly.
ChunkedArrayCompareSorter<Type> sorter;
sorter.Sort(indices_begin_, indices_end_, arrays, chunked_array_.null_count(),
options);
}
return Status::OK();
}
// Merges two sorted indices arrays and returns where nulls starts.
// Where nulls starts is used when the next merge to detect the
// sorted indices locations.
//
// [indices_begin, indices_middle) is the left run and
// [indices_middle, indices_end) the right run. `temp_indices` is scratch
// space that must hold at least the non-null indices of both runs.
template <typename ArrayType>
uint64_t* Merge(uint64_t* indices_begin, uint64_t* indices_middle,
uint64_t* indices_end, uint64_t* left_nulls_begin,
uint64_t* right_nulls_begin, const std::vector<const Array*>& arrays,
int64_t null_count, const SortOrder order, uint64_t* temp_indices) {
// Input layout:
// [left non-nulls .... left nulls .... right non-nulls .... right nulls]
// ^ ^ ^ ^
// | | | |
// indices_begin left_nulls_begin indices_middle right_nulls_begin
auto left_num_non_nulls = left_nulls_begin - indices_begin;
auto right_num_non_nulls = right_nulls_begin - indices_middle;
// Mutate the input, stably, to obtain the following layout:
// [left non-nulls .... right non-nulls .... left nulls .... right nulls]
// ^ ^ ^ ^
// | | | |
// indices_begin indices_middle nulls_begin right_nulls_begin
std::rotate(left_nulls_begin, indices_middle, right_nulls_begin);
auto nulls_begin = indices_begin + left_num_non_nulls + right_num_non_nulls;
// If the type has null-like values (such as NaN), ensure those plus regular
// nulls are partitioned in the right order. Note this assumes that all
// null-like values (e.g. NaN) are ordered equally.
if (NullTraits<typename ArrayType::TypeClass>::has_null_like_values) {
PartitionNullsOnly<StablePartitioner>(nulls_begin, indices_end, arrays, null_count);
}
// Merge the non-null values into temp area
// (from here on, indices_middle / indices_end delimit the two non-null runs)
indices_middle = indices_begin + left_num_non_nulls;
indices_end = indices_middle + right_num_non_nulls;
// One resolver per side, so that each keeps its own cached chunk.
const ChunkedArrayResolver left_resolver(arrays);
const ChunkedArrayResolver right_resolver(arrays);
if (order == SortOrder::Ascending) {
std::merge(indices_begin, indices_middle, indices_middle, indices_end, temp_indices,
[&](uint64_t left, uint64_t right) {
const auto chunk_left = left_resolver.Resolve<ArrayType>(left);
const auto chunk_right = right_resolver.Resolve<ArrayType>(right);
return chunk_left.Value() < chunk_right.Value();
});
} else {
std::merge(indices_begin, indices_middle, indices_middle, indices_end, temp_indices,
[&](uint64_t left, uint64_t right) {
const auto chunk_left = left_resolver.Resolve<ArrayType>(left);
const auto chunk_right = right_resolver.Resolve<ArrayType>(right);
// We don't use 'left > right' here to reduce required
// operator. If we use 'right < left' here, '<' is only
// required.
return chunk_right.Value() < chunk_left.Value();
});
}
// Copy back temp area into main buffer
std::copy(temp_indices, temp_indices + (nulls_begin - indices_begin), indices_begin);
return nulls_begin;
}
// Output range of indices being sorted.
uint64_t* indices_begin_;
uint64_t* indices_end_;
const ChunkedArray& chunked_array_;
// Physical type of the chunked array, and its chunks viewed with that type.
const std::shared_ptr<DataType> physical_type_;
const ArrayVector physical_chunks_;
const SortOrder order_;
const bool can_use_array_sorter_;
// Provides the memory pool for the temporary merge buffer.
ExecContext* ctx_;
};
// ----------------------------------------------------------------------
// Record batch sorting implementation(s)
// Calls `visit(range_begin, range_end)` for each maximal run of consecutive
// indices in [indices_begin, indices_end) that refer to equal values of
// `array`. All entries are assumed to be non-null. Nothing is visited for an
// empty input.
template <typename ArrayType, typename Visitor>
void VisitConstantRanges(const ArrayType& array, uint64_t* indices_begin,
                         uint64_t* indices_end, Visitor&& visit) {
  using GetView = GetViewType<typename ArrayType::TypeClass>;

  if (indices_begin == indices_end) {
    return;
  }
  uint64_t* run_begin = indices_begin;
  auto run_value = GetView::LogicalValue(array.GetView(*run_begin));
  for (uint64_t* cursor = indices_begin + 1; cursor != indices_end; ++cursor) {
    auto current = GetView::LogicalValue(array.GetView(*cursor));
    if (current != run_value) {
      // The value changed: close the run that just ended and open a new one.
      visit(run_begin, cursor);
      run_begin = cursor;
      run_value = current;
    }
  }
  // The input is non-empty, so the last run always holds at least one index.
  visit(run_begin, indices_end);
}
// Interface of a sorter for a single column of a RecordBatch. A sorter may
// hold a (non-owning) pointer to the sorter of the next column, to which it
// defers for ranges of equal values.
class RecordBatchColumnSorter {
 public:
  explicit RecordBatchColumnSorter(RecordBatchColumnSorter* next_column = nullptr)
      : next_column_(next_column) {}

  virtual ~RecordBatchColumnSorter() = default;

  // Sorts the indices in [indices_begin, indices_end) on this column.
  virtual void SortRange(uint64_t* indices_begin, uint64_t* indices_end) = 0;

 protected:
  // Sorter of the next column, or nullptr when there is none.
  RecordBatchColumnSorter* next_column_;
};
// Sorter for one column of a RecordBatch whose physical type is `Type`.
//
// SortRange() stably sorts a range of row indices on this column, placing
// null-like values and then nulls last, and hands every run of rows that are
// equal on this column to the next column's sorter (if any).
template <typename Type>
class ConcreteRecordBatchColumnSorter : public RecordBatchColumnSorter {
 public:
  using ArrayType = typename TypeTraits<Type>::ArrayType;

  // `array` must be an instance of ArrayType; the sorter keeps it alive.
  // `next_column` is not owned and may be nullptr for the last sort key.
  ConcreteRecordBatchColumnSorter(std::shared_ptr<Array> array, SortOrder order,
                                  RecordBatchColumnSorter* next_column = nullptr)
      : RecordBatchColumnSorter(next_column),
        owned_array_(std::move(array)),
        array_(checked_cast<const ArrayType&>(*owned_array_)),
        order_(order),
        null_count_(array_.null_count()) {}

  // Sorts [indices_begin, indices_end) on this column, then recurses into the
  // next column for ranges of equal values.
  void SortRange(uint64_t* indices_begin, uint64_t* indices_end) override {
    using GetView = GetViewType<Type>;

    // Indices refer directly to rows of `array_`.
    constexpr int64_t offset = 0;
    uint64_t* nulls_begin;
    if (null_count_ == 0) {
      nulls_begin = indices_end;
    } else {
      // NOTE that null_count_ is merely an upper bound on the number of nulls
      // in this particular range.
      nulls_begin = PartitionNullsOnly<StablePartitioner>(indices_begin, indices_end,
                                                          array_, offset);
      DCHECK_LE(indices_end - nulls_begin, null_count_);
    }
    uint64_t* null_likes_begin = PartitionNullLikes<ArrayType, StablePartitioner>(
        indices_begin, nulls_begin, array_, offset);

    // TODO This is roughly the same as ArrayCompareSorter.
    // Also, we would like to use a counting sort if possible.  This requires
    // a counting sort compatible with indirect indexing.
    if (order_ == SortOrder::Ascending) {
      std::stable_sort(
          indices_begin, null_likes_begin, [&](uint64_t left, uint64_t right) {
            const auto lhs = GetView::LogicalValue(array_.GetView(left - offset));
            const auto rhs = GetView::LogicalValue(array_.GetView(right - offset));
            return lhs < rhs;
          });
    } else {
      std::stable_sort(
          indices_begin, null_likes_begin, [&](uint64_t left, uint64_t right) {
            // We don't use 'lhs > rhs' here to reduce required operator.
            // If we use 'rhs < lhs' here, '<' is only required.
            const auto lhs = GetView::LogicalValue(array_.GetView(left - offset));
            const auto rhs = GetView::LogicalValue(array_.GetView(right - offset));
            return rhs < lhs;
          });
    }

    if (next_column_ != nullptr) {
      // Visit all ranges of equal values in this column and sort them on
      // the next column. Null-likes and nulls each form one such range.
      SortNextColumn(null_likes_begin, nulls_begin);
      SortNextColumn(nulls_begin, indices_end);
      VisitConstantRanges(array_, indices_begin, null_likes_begin,
                          [&](uint64_t* range_start, uint64_t* range_end) {
                            SortNextColumn(range_start, range_end);
                          });
    }
  }

  // Sorts [indices_begin, indices_end) on the next column. Must only be
  // called when there is a next column.
  void SortNextColumn(uint64_t* indices_begin, uint64_t* indices_end) {
    // Avoid the cost of a virtual method call in trivial cases
    if (indices_end - indices_begin > 1) {
      next_column_->SortRange(indices_begin, indices_end);
    }
  }

 protected:
  // Keeps the column alive; `array_` is the same object seen as ArrayType.
  const std::shared_ptr<Array> owned_array_;
  const ArrayType& array_;
  const SortOrder order_;
  // Null count of the whole column (not of any particular range).
  const int64_t null_count_;
};
// Sort a batch using a single-pass left-to-right radix sort.
//
// One column sorter is built per sort key and chained to the sorter of the
// following key; sorting the whole index range with the first sorter then
// recursively refines ties with the later ones.
class RadixRecordBatchSorter {
 public:
  RadixRecordBatchSorter(uint64_t* indices_begin, uint64_t* indices_end,
                         const RecordBatch& batch, const SortOptions& options)
      : batch_(batch),
        options_(options),
        indices_begin_(indices_begin),
        indices_end_(indices_end) {}

  // Sorts [indices_begin_, indices_end_) in place.
  //
  // Returns Invalid if no sort key is given or a key names a nonexistent
  // column, and TypeError if a key column has an unsupported type.
  Status Sort() {
    // Without this guard, `column_sorts.front()` below would be called on an
    // empty vector.
    if (options_.sort_keys.empty()) {
      return Status::Invalid("Must specify one or more sort keys");
    }
    ARROW_ASSIGN_OR_RAISE(const auto sort_keys,
                          ResolveSortKeys(batch_, options_.sort_keys));

    // Create column sorters from right to left, so that each sorter can be
    // given a pointer to the (already built) sorter of the next key.
    std::vector<std::unique_ptr<RecordBatchColumnSorter>> column_sorts(sort_keys.size());
    RecordBatchColumnSorter* next_column = nullptr;
    for (size_t i = sort_keys.size(); i > 0; --i) {
      ColumnSortFactory factory(sort_keys[i - 1], next_column);
      ARROW_ASSIGN_OR_RAISE(column_sorts[i - 1], factory.MakeColumnSort());
      next_column = column_sorts[i - 1].get();
    }

    // Sort from left to right
    column_sorts.front()->SortRange(indices_begin_, indices_end_);
    return Status::OK();
  }

 protected:
  // A sort key whose column name has been resolved to the actual column.
  struct ResolvedSortKey {
    std::shared_ptr<Array> array;
    SortOrder order;
  };

  // Type visitor that instantiates the ConcreteRecordBatchColumnSorter
  // matching the physical type of a sort key column.
  struct ColumnSortFactory {
    ColumnSortFactory(const ResolvedSortKey& sort_key,
                      RecordBatchColumnSorter* next_column)
        : physical_type(GetPhysicalType(sort_key.array->type())),
          array(GetPhysicalArray(*sort_key.array, physical_type)),
          order(sort_key.order),
          next_column(next_column) {}

    // Dispatches on `physical_type` and returns the created sorter, or the
    // error raised for an unsupported type.
    Result<std::unique_ptr<RecordBatchColumnSorter>> MakeColumnSort() {
      RETURN_NOT_OK(VisitTypeInline(*physical_type, this));
      DCHECK_NE(result, nullptr);
      return std::move(result);
    }

#define VISIT(TYPE) \
  Status Visit(const TYPE& type) { return VisitGeneric(type); }

    VISIT_PHYSICAL_TYPES(VISIT)

#undef VISIT

    // Fallback for every type not listed in VISIT_PHYSICAL_TYPES.
    Status Visit(const DataType& type) {
      return Status::TypeError("Unsupported type for RecordBatch sorting: ",
                               type.ToString());
    }

    template <typename Type>
    Status VisitGeneric(const Type&) {
      result.reset(new ConcreteRecordBatchColumnSorter<Type>(array, order, next_column));
      return Status::OK();
    }

    std::shared_ptr<DataType> physical_type;
    std::shared_ptr<Array> array;
    SortOrder order;
    RecordBatchColumnSorter* next_column;
    std::unique_ptr<RecordBatchColumnSorter> result;
  };

  // Looks up each sort key's column by name in `batch`.
  // Returns Invalid as soon as a column cannot be found.
  static Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
      const RecordBatch& batch, const std::vector<SortKey>& sort_keys) {
    std::vector<ResolvedSortKey> resolved;
    resolved.reserve(sort_keys.size());
    for (const auto& sort_key : sort_keys) {
      auto array = batch.GetColumnByName(sort_key.name);
      if (!array) {
        return Status::Invalid("Nonexistent sort key column: ", sort_key.name);
      }
      resolved.push_back({std::move(array), sort_key.order});
    }
    return resolved;
  }

  const RecordBatch& batch_;
  const SortOptions& options_;
  uint64_t* indices_begin_;
  uint64_t* indices_end_;
};
// Compare two records in the same RecordBatch or Table
// (indexing is handled through ResolvedSortKey)
template <typename ResolvedSortKey>
class MultipleKeyComparator {
public:
explicit MultipleKeyComparator(const std::vector<ResolvedSortKey>& sort_keys)
: sort_keys_(sort_keys) {}
Status status() const { return status_; }
// Returns true if the left-th value should be ordered before the
// right-th value, false otherwise. The start_sort_key_index-th
// sort key and subsequent sort keys are used for comparison.
bool Compare(uint64_t left, uint64_t right, size_t start_sort_key_index) {
current_left_ = left;
current_right_ = right;
current_compared_ = 0;
auto num_sort_keys = sort_keys_.size();
for (size_t i = start_sort_key_index; i < num_sort_keys; ++i) {
current_sort_key_index_ = i;
status_ = VisitTypeInline(*sort_keys_[i].type, this);
// If the left value equals to the right value, we need to
// continue to sort.
if (current_compared_ != 0) {
break;
}
}
return current_compared_ < 0;
}
#define VISIT(TYPE) \
Status Visit(const TYPE& type) { \
current_compared_ = CompareType<TYPE>(); \
return Status::OK(); \
}
VISIT_PHYSICAL_TYPES(VISIT)
#undef VISIT
Status Visit(const DataType& type) {
return Status::TypeError("Unsupported type for RecordBatch sorting: ",
type.ToString());
}
private:
// Compares two records in the same table and returns -1, 0 or 1.
//
// -1: The left is less than the right.
// 0: The left equals to the right.
// 1: The left is greater than the right.
//
// This supports null and NaN. Null is processed in this and NaN
// is processed in CompareTypeValue().
template <typename Type>
int32_t CompareType() {
using ArrayType = typename TypeTraits<Type>::ArrayType;
const auto& sort_key = sort_keys_[current_sort_key_index_];
auto order = sort_key.order;
const auto chunk_left = sort_key.template GetChunk<ArrayType>(current_left_);
const auto chunk_right = sort_key.template GetChunk<ArrayType>(current_right_);
if (sort_key.null_count > 0) {
auto is_null_left = chunk_left.IsNull();
auto is_null_right = chunk_right.IsNull();
if (is_null_left && is_null_right) {
return 0;
} else if (is_null_left) {
return 1;
} else if (is_null_right) {
return -1;
}
}
return CompareTypeValue<Type>(chunk_left, chunk_right, order);
}
// For non-float types. Value is never NaN.
template <typename Type>
enable_if_t<!is_floating_type<Type>::value, int32_t> CompareTypeValue(
const ResolvedChunk<typename TypeTraits<Type>::ArrayType>& chunk_left,
const ResolvedChunk<typename TypeTraits<Type>::ArrayType>& chunk_right,
const SortOrder order) {
const auto left = chunk_left.Value();
const auto right = chunk_right.Value();
int32_t compared;
if (left == right) {
compared = 0;
} else if (left > right) {
compared = 1;
} else {
compared = -1;
}
if (order == SortOrder::Descending) {
compared = -compared;
}
return compared;
}
// For float types. Value may be NaN.
template <typename Type>
enable_if_t<is_floating_type<Type>::value, int32_t> CompareTypeValue(
const ResolvedChunk<typename TypeTraits<Type>::ArrayType>& chunk_left,
const ResolvedChunk<typename TypeTraits<Type>::ArrayType>& chunk_right,
const SortOrder order) {
const auto left = chunk_left.Value();
const auto right = chunk_right.Value();
auto is_nan_left = std::isnan(left);
auto is_nan_right = std::isnan(right);
if (is_nan_left && is_nan_right) {
return 0;
} else if (is_nan_left) {
return 1;
} else if (is_nan_right) {
return -1;
}
int32_t compared;
if (left == right) {
compared = 0;
} else if (left > right) {
compared = 1;
} else {
compared = -1;
}
if (order == SortOrder::Descending) {
compared = -compared;
}
return compared;
}
const std::vector<ResolvedSortKey>& sort_keys_;
Status status_;
int64_t current_left_;
int64_t current_right_;
size_t current_sort_key_index_;
int32_t current_compared_;
};
// Sort a batch using a single sort and multiple-key comparisons.
class MultipleKeyRecordBatchSorter : public TypeVisitor {
private:
// Preprocessed sort key.
struct ResolvedSortKey {
ResolvedSortKey(const std::shared_ptr<Array>& array, const SortOrder order)
: type(GetPhysicalType(array->type())),
owned_array(GetPhysicalArray(*array, type)),
array(*owned_array),
order(order),
null_count(array->null_count()) {}
template <typename ArrayType>
ResolvedChunk<ArrayType> GetChunk(int64_t index) const {
return {&checked_cast<const ArrayType&>(array), index};
}
const std::shared_ptr<DataType> type;
std::shared_ptr<Array> owned_array;
const Array& array;
SortOrder order;
int64_t null_count;
};
using Comparator = MultipleKeyComparator<ResolvedSortKey>;
public:
MultipleKeyRecordBatchSorter(uint64_t* indices_begin, uint64_t* indices_end,
const RecordBatch& batch, const SortOptions& options)
: indices_begin_(indices_begin),
indices_end_(indices_end),
sort_keys_(ResolveSortKeys(batch, options.sort_keys, &status_)),
comparator_(sort_keys_) {}
// This is optimized for the first sort key. The first sort key sort
// is processed in this class. The second and following sort keys
// are processed in Comparator.
Status Sort() {
RETURN_NOT_OK(status_);
return sort_keys_[0].type->Accept(this);
}
#define VISIT(TYPE) \
Status Visit(const TYPE& type) override { return SortInternal<TYPE>(); }
VISIT_PHYSICAL_TYPES(VISIT)
#undef VISIT
private:
static std::vector<ResolvedSortKey> ResolveSortKeys(
const RecordBatch& batch, const std::vector<SortKey>& sort_keys, Status* status) {
std::vector<ResolvedSortKey> resolved;
for (const auto& sort_key : sort_keys) {
auto array = batch.GetColumnByName(sort_key.name);
if (!array) {
*status = Status::Invalid("Nonexistent sort key column: ", sort_key.name);
break;
}
resolved.emplace_back(array, sort_key.order);
}
return resolved;
}
template <typename Type>
Status SortInternal() {
using ArrayType = typename TypeTraits<Type>::ArrayType;
auto& comparator = comparator_;
const auto& first_sort_key = sort_keys_[0];
const ArrayType& array = checked_cast<const ArrayType&>(first_sort_key.array);
auto nulls_begin = indices_end_;
nulls_begin = PartitionNullsInternal<Type>(first_sort_key);
// Sort first-key non-nulls
std::stable_sort(indices_begin_, nulls_begin, [&](uint64_t left, uint64_t right) {
// Both values are never null nor NaN
// (otherwise they've been partitioned away above).
const auto value_left = array.GetView(left);
const auto value_right = array.GetView(right);
if (value_left != value_right) {
bool compared = value_left < value_right;
if (first_sort_key.order == SortOrder::Ascending) {
return compared;
} else {
return !compared;
}
}
// If the left value equals to the right value,
// we need to compare the second and following
// sort keys.
return comparator.Compare(left, right, 1);
});
return comparator_.status();
}
// Behaves like PatitionNulls() but this supports multiple sort keys.
//
// For non-float types.
template <typename Type>
enable_if_t<!is_floating_type<Type>::value, uint64_t*> PartitionNullsInternal(
const ResolvedSortKey& first_sort_key) {
using ArrayType = typename TypeTraits<Type>::ArrayType;
if (first_sort_key.null_count == 0) {
return indices_end_;
}
const ArrayType& array = checked_cast<const ArrayType&>(first_sort_key.array);
StablePartitioner partitioner;
auto nulls_begin = partitioner(indices_begin_, indices_end_,
[&](uint64_t index) { return !array.IsNull(index); });
// Sort all nulls by second and following sort keys
// TODO: could we instead run an independent sort from the second key on
// this slice?
if (nulls_begin != indices_end_) {
auto& comparator = comparator_;
std::stable_sort(nulls_begin, indices_end_,
[&comparator](uint64_t left, uint64_t right) {
return comparator.Compare(left, right, 1);
});
}
return nulls_begin;
}
// Behaves like PatitionNulls() but this supports multiple sort keys.
//
// For float types.
template <typename Type>
enable_if_t<is_floating_type<Type>::value, uint64_t*> PartitionNullsInternal(
const ResolvedSortKey& first_sort_key) {
using ArrayType = typename TypeTraits<Type>::ArrayType;
const ArrayType& array = checked_cast<const ArrayType&>(first_sort_key.array);
StablePartitioner partitioner;
uint64_t* nulls_begin;
if (first_sort_key.null_count == 0) {
nulls_begin = indices_end_;
} else {
nulls_begin = partitioner(indices_begin_, indices_end_,
[&](uint64_t index) { return !array.IsNull(index); });
}
uint64_t* nans_and_nulls_begin =
partitioner(indices_begin_, nulls_begin,
[&](uint64_t index) { return !std::isnan(array.GetView(index)); });
auto& comparator = comparator_;
if (nans_and_nulls_begin != nulls_begin) {
// Sort all NaNs by the second and following sort keys.
// TODO: could we instead run an independent sort from the second key on
// this slice?
std::stable_sort(nans_and_nulls_begin, nulls_begin,
[&comparator](uint64_t left, uint64_t right) {
return comparator.Compare(left, right, 1);
});
}
if (nulls_begin != indices_end_) {
// Sort all nulls by the second and following sort keys.
// TODO: could we instead run an independent sort from the second key on
// this slice?
std::stable_sort(nulls_begin, indices_end_,
[&comparator](uint64_t left, uint64_t right) {
return comparator.Compare(left, right, 1);
});
}
return nans_and_nulls_begin;
}
uint64_t* indices_begin_;
uint64_t* indices_end_;
Status status_;
std::vector<ResolvedSortKey> sort_keys_;
Comparator comparator_;
};
// ----------------------------------------------------------------------
// Table sorting implementations
// Sort a table using a radix sort-like algorithm.
// A distinct stable sort is called for each sort key, from the last key to the first.
class TableRadixSorter {
 public:
  // Sorts [indices_begin, indices_end) in place according to `options`.
  // Returns Invalid if a sort key names a column that is not in `table`,
  // or the error of the underlying per-column sort.
  Status Sort(ExecContext* ctx, uint64_t* indices_begin, uint64_t* indices_end,
              const Table& table, const SortOptions& options) {
    const auto num_sort_keys = options.sort_keys.size();
    for (auto i = num_sort_keys; i > 0; --i) {
      const auto& sort_key = options.sort_keys[i - 1];
      const auto chunked_array = table.GetColumnByName(sort_key.name);
      if (!chunked_array) {
        return Status::Invalid("Nonexistent sort key column: ", sort_key.name);
      }
      // We can use ArraySorter only for the sort key that is
      // processed first because ArraySorter doesn't care about
      // existing indices.
      // The first processed key is the last one, i.e. i == num_sort_keys
      // (`i` never reaches 0 inside this loop).
      // NOTE(review): this presumes the incoming indices carry no ordering
      // that the first pass must honour (e.g. they are 0..n-1) - confirm
      // against callers.
      const bool can_use_array_sorter = (i == num_sort_keys);
      ChunkedArraySorter sorter(ctx, indices_begin, indices_end, *chunked_array,
                                sort_key.order, can_use_array_sorter);
      ARROW_RETURN_NOT_OK(sorter.Sort());
    }
    return Status::OK();
  }
};
// Sort a table using a single sort and multiple-key comparisons.
class MultipleKeyTableSorter : public TypeVisitor {
 private:
  // TODO instead of resolving chunks for each column independently, we could
  // split the table into RecordBatches and pay the cost of chunked indexing
  // at the first column only.

  // Preprocessed sort key.
  struct ResolvedSortKey {
    ResolvedSortKey(const ChunkedArray& chunked_array, const SortOrder order)
        : order(order),
          type(GetPhysicalType(chunked_array.type())),
          chunks(GetPhysicalChunks(chunked_array, type)),
          chunk_pointers(GetArrayPointers(chunks)),
          null_count(chunked_array.null_count()),
          num_chunks(chunked_array.num_chunks()),
          resolver(chunk_pointers) {}

    // Finds the target chunk and index in the target chunk from an
    // index in chunked array.
    template <typename ArrayType>
    ResolvedChunk<ArrayType> GetChunk(int64_t index) const {
      return resolver.Resolve<ArrayType>(index);
    }

    const SortOrder order;
    const std::shared_ptr<DataType> type;
    const ArrayVector chunks;
    const std::vector<const Array*> chunk_pointers;
    const int64_t null_count;
    const int num_chunks;
    const ChunkedArrayResolver resolver;
  };

  using Comparator = MultipleKeyComparator<ResolvedSortKey>;

 public:
  // Resolves the sort keys eagerly; a resolution failure is stored in
  // `status_` and reported by Sort().
  MultipleKeyTableSorter(uint64_t* indices_begin, uint64_t* indices_end,
                         const Table& table, const SortOptions& options)
      : indices_begin_(indices_begin),
        indices_end_(indices_end),
        sort_keys_(ResolveSortKeys(table, options.sort_keys, &status_)),
        comparator_(sort_keys_) {}

  // This is optimized for the first sort key. The first sort key sort
  // is processed in this class. The second and following sort keys
  // are processed in Comparator.
  Status Sort() {
    ARROW_RETURN_NOT_OK(status_);
    // Guard the `sort_keys_[0]` access below against an empty key list.
    if (sort_keys_.empty()) {
      return Status::Invalid("Must specify one or more sort keys");
    }
    return sort_keys_[0].type->Accept(this);
  }

#define VISIT(TYPE) \
  Status Visit(const TYPE& type) override { return SortInternal<TYPE>(); }

  VISIT_PHYSICAL_TYPES(VISIT)

#undef VISIT

 private:
  // Looks up each sort key's column in `table`. On a missing column, writes
  // an Invalid status to `*status` and returns the keys resolved so far.
  static std::vector<ResolvedSortKey> ResolveSortKeys(
      const Table& table, const std::vector<SortKey>& sort_keys, Status* status) {
    std::vector<ResolvedSortKey> resolved;
    resolved.reserve(sort_keys.size());
    for (const auto& sort_key : sort_keys) {
      const auto& chunked_array = table.GetColumnByName(sort_key.name);
      if (!chunked_array) {
        *status = Status::Invalid("Nonexistent sort key column: ", sort_key.name);
        break;
      }
      resolved.emplace_back(*chunked_array, sort_key.order);
    }
    return resolved;
  }

  // Stably sorts [begin, end) using only the second and following sort keys.
  // Used for runs of rows that tie on the first key (nulls, NaNs).
  void SortByRemainingKeys(uint64_t* begin, uint64_t* end) {
    if (begin == end) {
      return;
    }
    auto& comparator = comparator_;
    std::stable_sort(begin, end, [&comparator](uint64_t left, uint64_t right) {
      return comparator.Compare(left, right, 1);
    });
  }

  template <typename Type>
  Status SortInternal() {
    using ArrayType = typename TypeTraits<Type>::ArrayType;

    auto& comparator = comparator_;
    const auto& first_sort_key = sort_keys_[0];
    // Moves first-key nulls (and NaNs) to the end; they are already fully
    // sorted on the remaining keys when this returns.
    uint64_t* nulls_begin = PartitionNullsInternal<Type>(first_sort_key);
    std::stable_sort(indices_begin_, nulls_begin, [&](uint64_t left, uint64_t right) {
      // Both values are never null nor NaN.
      auto chunk_left = first_sort_key.GetChunk<ArrayType>(left);
      auto chunk_right = first_sort_key.GetChunk<ArrayType>(right);
      auto value_left = chunk_left.Value();
      auto value_right = chunk_right.Value();
      if (value_left == value_right) {
        // If the left value equals to the right value,
        // we need to compare the second and following
        // sort keys.
        return comparator.Compare(left, right, 1);
      } else {
        auto compared = value_left < value_right;
        if (first_sort_key.order == SortOrder::Ascending) {
          return compared;
        } else {
          return !compared;
        }
      }
    });
    return comparator_.status();
  }

  // Behaves like PartitionNulls() but this supports multiple sort keys.
  //
  // For non-float types.
  template <typename Type>
  enable_if_t<!is_floating_type<Type>::value, uint64_t*> PartitionNullsInternal(
      const ResolvedSortKey& first_sort_key) {
    using ArrayType = typename TypeTraits<Type>::ArrayType;
    if (first_sort_key.null_count == 0) {
      return indices_end_;
    }
    StablePartitioner partitioner;
    auto nulls_begin =
        partitioner(indices_begin_, indices_end_, [&first_sort_key](uint64_t index) {
          const auto chunk = first_sort_key.GetChunk<ArrayType>(index);
          return !chunk.IsNull();
        });
    DCHECK_EQ(indices_end_ - nulls_begin, first_sort_key.null_count);
    // Sort all nulls by the second and following sort keys.
    SortByRemainingKeys(nulls_begin, indices_end_);
    return nulls_begin;
  }

  // Behaves like PartitionNulls() but this supports multiple sort keys.
  //
  // For float types.
  template <typename Type>
  enable_if_t<is_floating_type<Type>::value, uint64_t*> PartitionNullsInternal(
      const ResolvedSortKey& first_sort_key) {
    using ArrayType = typename TypeTraits<Type>::ArrayType;
    StablePartitioner partitioner;
    uint64_t* nulls_begin;
    if (first_sort_key.null_count == 0) {
      nulls_begin = indices_end_;
    } else {
      nulls_begin = partitioner(indices_begin_, indices_end_, [&](uint64_t index) {
        const auto chunk = first_sort_key.GetChunk<ArrayType>(index);
        return !chunk.IsNull();
      });
    }
    DCHECK_EQ(indices_end_ - nulls_begin, first_sort_key.null_count);
    // Resulting layout: [ regular values | NaNs | nulls ]
    uint64_t* nans_begin = partitioner(indices_begin_, nulls_begin, [&](uint64_t index) {
      const auto chunk = first_sort_key.GetChunk<ArrayType>(index);
      return !std::isnan(chunk.Value());
    });
    // Sort all NaNs by the second and following sort keys.
    SortByRemainingKeys(nans_begin, nulls_begin);
    // Sort all nulls by the second and following sort keys.
    SortByRemainingKeys(nulls_begin, indices_end_);
    return nans_begin;
  }

  uint64_t* indices_begin_;
  uint64_t* indices_end_;
  // Declared before sort_keys_: ResolveSortKeys() writes to it during
  // construction.
  Status status_;
  std::vector<ResolvedSortKey> sort_keys_;
  Comparator comparator_;
};
// ----------------------------------------------------------------------
// Top-level sort functions
// Options object whose address is handed to the MetaFunction base constructor
// of SortIndicesMetaFunction (presumably the options used when the caller
// supplies none).
const auto kDefaultSortOptions = SortOptions::Defaults();
// User-facing documentation of the "sort_indices" function: a one-line
// summary, a longer description, the argument names and the options class name.
const FunctionDoc sort_indices_doc(
"Return the indices that would sort an array, record batch or table",
("This function computes an array of indices that define a stable sort\n"
"of the input array, record batch or table. Null values are considered\n"
"greater than any other value and are therefore sorted at the end of the\n"
"input. For floating-point types, NaNs are considered greater than any\n"
"other non-null value, but smaller than null values."),
{"input"}, "SortOptions");
class SortIndicesMetaFunction : public MetaFunction {
public:
SortIndicesMetaFunction()
: MetaFunction("sort_indices", Arity::Unary(), &sort_indices_doc,
&kDefaultSortOptions) {}
Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
const FunctionOptions* options,
ExecContext* ctx) const override {
const SortOptions& sort_options = static_cast<const SortOptions&>(*options);
switch (args[0].kind()) {
case Datum::ARRAY:
return SortIndices(*args[0].make_array(), sort_options, ctx);
break;
case Datum::CHUNKED_ARRAY:
return SortIndices(*args[0].chunked_array(), sort_options, ctx);
break;
case Datum::RECORD_BATCH: {
return SortIndices(*args[0].record_batch(), sort_options, ctx);
} break;
case Datum::TABLE:
return SortIndices(*args[0].table(), sort_options, ctx);
break;
default:
break;
}
return Status::NotImplemented(
"Unsupported types for sort_indices operation: "
"values=",
args[0].ToString());
}
private:
Result<Datum> SortIndices(const Array& values, const SortOptions& options,
ExecContext* ctx) const {
SortOrder order = SortOrder::Ascending;
if (!options.sort_keys.empty()) {
order = options.sort_keys[0].order;
}
ArraySortOptions array_options(order);
return CallFunction("array_sort_indices", {values}, &array_options, ctx);
}
Result<Datum> SortIndices(const ChunkedArray& chunked_array, const SortOptions& options,
ExecContext* ctx) const {
SortOrder order = SortOrder::Ascending;
if (!options.sort_keys.empty()) {
order = options.sort_keys[0].order;
}
auto out_type = uint64();
auto length = chunked_array.length();
auto buffer_size = BitUtil::BytesForBits(
length * std::static_pointer_cast<UInt64Type>(out_type)->bit_width());
std::vector<std::shared_ptr<Buffer>> buffers(2);
ARROW_ASSIGN_OR_RAISE(buffers[1],
AllocateResizableBuffer(buffer_size, ctx->memory_pool()));
auto out = std::make_shared<ArrayData>(out_type, length, buffers, 0);
auto out_begin = out->GetMutableValues<uint64_t>(1);
auto out_end = out_begin + length;
std::iota(out_begin, out_end, 0);
ChunkedArraySorter sorter(ctx, out_begin, out_end, chunked_array, order);
ARROW_RETURN_NOT_OK(sorter.Sort());
return Datum(out);
}
Result<Datum> SortIndices(const RecordBatch& batch, const SortOptions& options,
ExecContext* ctx) const {
auto n_sort_keys = options.sort_keys.size();
if (n_sort_keys == 0) {
return Status::Invalid("Must specify one or more sort keys");
}
if (n_sort_keys == 1) {
auto array = batch.GetColumnByName(options.sort_keys[0].name);
if (!array) {
return Status::Invalid("Nonexistent sort key column: ",
options.sort_keys[0].name);
}
return SortIndices(*array, options, ctx);
}
auto out_type = uint64();
auto length = batch.num_rows();
auto buffer_size = BitUtil::BytesForBits(
length * std::static_pointer_cast<UInt64Type>(out_type)->bit_width());
BufferVector buffers(2);
ARROW_ASSIGN_OR_RAISE(buffers[1],
AllocateResizableBuffer(buffer_size, ctx->memory_pool()));
auto out = std::make_shared<ArrayData>(out_type, length, buffers, 0);
auto out_begin = out->GetMutableValues<uint64_t>(1);
auto out_end = out_begin + length;
std::iota(out_begin, out_end, 0);
// Radix sorting is consistently faster except when there is a large number
// of sort keys, in which case it can end up degrading catastrophically.
// Cut off above 8 sort keys.
if (n_sort_keys <= 8) {
RadixRecordBatchSorter sorter(out_begin, out_end, batch, options);
ARROW_RETURN_NOT_OK(sorter.Sort());
} else {
MultipleKeyRecordBatchSorter sorter(out_begin, out_end, batch, options);
ARROW_RETURN_NOT_OK(sorter.Sort());
}
return Datum(out);
}
Result<Datum> SortIndices(const Table& table, const SortOptions& options,
ExecContext* ctx) const {
auto n_sort_keys = options.sort_keys.size();
if (n_sort_keys == 0) {
return Status::Invalid("Must specify one or more sort keys");
}
if (n_sort_keys == 1) {
auto chunked_array = table.GetColumnByName(options.sort_keys[0].name);
if (!chunked_array) {
return Status::Invalid("Nonexistent sort key column: ",
options.sort_keys[0].name);
}
return SortIndices(*chunked_array, options, ctx);
}
auto out_type = uint64();
auto length = table.num_rows();
auto buffer_size = BitUtil::BytesForBits(
length * std::static_pointer_cast<UInt64Type>(out_type)->bit_width());
std::vector<std::shared_ptr<Buffer>> buffers(2);
ARROW_ASSIGN_OR_RAISE(buffers[1],
AllocateResizableBuffer(buffer_size, ctx->memory_pool()));
auto out = std::make_shared<ArrayData>(out_type, length, buffers, 0);
auto out_begin = out->GetMutableValues<uint64_t>(1);
auto out_end = out_begin + length;
std::iota(out_begin, out_end, 0);
// TODO: We should choose suitable sort implementation
// automatically. The current TableRadixSorter implementation is
// faster than MultipleKeyTableSorter only when the number of
// sort keys is 2 and counting sort is used. So we always
// MultipleKeyTableSorter for now.
//
// TableRadixSorter sorter;
// ARROW_RETURN_NOT_OK(sorter.Sort(ctx, out_begin, out_end, table, options));
MultipleKeyTableSorter sorter(out_begin, out_end, table, options);
ARROW_RETURN_NOT_OK(sorter.Sort());
return Datum(out);
}
};
// Options object whose address is passed when constructing the
// "array_sort_indices" function (presumably its default options).
const auto kDefaultArraySortOptions = ArraySortOptions::Defaults();

// User-facing documentation of the "array_sort_indices" function.
const FunctionDoc array_sort_indices_doc(
    "Return the indices that would sort an array",
    ("This function computes an array of indices that define a stable sort\n"
     "of the input array.  Null values are considered greater than any\n"
     "other value and are therefore sorted at the end of the array.\n"
     "For floating-point types, NaNs are considered greater than any\n"
     "other non-null value, but smaller than null values."),
    {"array"}, "ArraySortOptions");

// User-facing documentation of the "partition_nth_indices" function.
const FunctionDoc partition_nth_indices_doc(
    "Return the indices that would partition an array around a pivot",
    ("This function computes an array of indices that define a non-stable\n"
     "partial sort of the input array.\n"
     "\n"
     "The output is such that the `N`'th index points to the `N`'th element\n"
     "of the input in sorted order, and all indices before the `N`'th point\n"
     "to elements in the input less or equal to elements at or after the `N`'th.\n"
     "\n"
     "Null values are considered greater than any other value and are\n"
     "therefore partitioned towards the end of the array.\n"
     "For floating-point types, NaNs are considered greater than any\n"
     "other non-null value, but smaller than null values.\n"
     "\n"
     "The pivot index `N` must be given in PartitionNthOptions."),
    {"array"}, "PartitionNthOptions");
} // namespace
// Registers the "array_sort_indices", "sort_indices" and
// "partition_nth_indices" functions in `registry`, in that order.
void RegisterVectorSort(FunctionRegistry* registry) {
  // Kernel template shared by both vector functions: the output goes into
  // preallocated memory and never contains nulls.
  VectorKernel kernel_template;
  kernel_template.mem_allocation = MemAllocation::PREALLOCATE;
  kernel_template.null_handling = NullHandling::OUTPUT_NOT_NULL;

  {
    auto sort_function = std::make_shared<VectorFunction>(
        "array_sort_indices", Arity::Unary(), &array_sort_indices_doc,
        &kDefaultArraySortOptions);
    kernel_template.init = ArraySortIndicesState::Init;
    AddSortingKernels<ArraySortIndices>(kernel_template, sort_function.get());
    DCHECK_OK(registry->AddFunction(std::move(sort_function)));
  }

  DCHECK_OK(registry->AddFunction(std::make_shared<SortIndicesMetaFunction>()));

  {
    // partition_nth_indices takes a parameter, hence its own init function.
    auto partition_function = std::make_shared<VectorFunction>(
        "partition_nth_indices", Arity::Unary(), &partition_nth_indices_doc);
    kernel_template.init = PartitionNthToIndicesState::Init;
    AddSortingKernels<PartitionNthToIndices>(kernel_template, partition_function.get());
    DCHECK_OK(registry->AddFunction(std::move(partition_function)));
  }
}
#undef VISIT_PHYSICAL_TYPES
} // namespace internal
} // namespace compute
} // namespace arrow