blob: 0fa174c17593a2cef02b17d8d5a4a30b1103c721 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/chunked_array.h"
#include <algorithm>
#include <cstdlib>
#include <memory>
#include <sstream>
#include <utility>
#include "arrow/array/array_base.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/util.h"
#include "arrow/array/validate.h"
#include "arrow/device_allocation_type_set.h"
#include "arrow/pretty_print.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging_internal.h"
namespace arrow {
using internal::checked_cast;
class MemoryPool;
// ----------------------------------------------------------------------
// ChunkedArray methods
// Construct a ChunkedArray from `chunks`, which are moved into the object.
//
// If `type` is null the type of the first chunk is used; in that case `chunks`
// must not be empty (enforced with a fatal check). The total length and null
// count are accumulated from the individual chunks.
ChunkedArray::ChunkedArray(ArrayVector chunks, std::shared_ptr<DataType> type)
    : chunks_(std::move(chunks)),
      type_(std::move(type)),
      length_(0),
      null_count_(0),
      chunk_resolver_{chunks_} {
  if (type_ == nullptr) {
    ARROW_CHECK_GT(chunks_.size(), static_cast<size_t>(0))
        << "cannot construct ChunkedArray from empty vector and omitted type";
    type_ = chunks_[0]->type();
  }

  // Check the member, not the `chunks` parameter: the parameter was moved from
  // in the initializer list, so its size says nothing about the stored chunks.
  // The bound matters because chunk indices are handled as `int` elsewhere in
  // this file (e.g. in Slice()).
  ARROW_CHECK_LE(chunks_.size(), static_cast<size_t>(std::numeric_limits<int>::max()));

  for (const auto& chunk : chunks_) {
    length_ += chunk->length();
    null_count_ += chunk->null_count();
  }
}
// Build a ChunkedArray, reporting bad input as a Status instead of aborting.
//
// Returns Status::Invalid when `type` is null and there is no chunk to take a
// type from, and Status::TypeError when any chunk's type differs from the
// (given or inferred) type.
Result<std::shared_ptr<ChunkedArray>> ChunkedArray::Make(ArrayVector chunks,
                                                         std::shared_ptr<DataType> type) {
  const bool type_omitted = (type == nullptr);
  if (type_omitted && chunks.empty()) {
    return Status::Invalid(
        "cannot construct ChunkedArray from empty vector "
        "and omitted type");
  }
  if (type_omitted) {
    // Fall back to the type of the first chunk.
    type = chunks.front()->type();
  }

  const bool homogeneous =
      std::all_of(chunks.begin(), chunks.end(), [&type](const auto& candidate) {
        return candidate->type()->Equals(*type);
      });
  if (!homogeneous) {
    return Status::TypeError("Array chunks must all be same type");
  }

  return std::make_shared<ChunkedArray>(std::move(chunks), std::move(type));
}
// Create a ChunkedArray holding exactly one chunk, obtained from
// MakeEmptyArray() for `type`. Any error from MakeEmptyArray() is propagated.
Result<std::shared_ptr<ChunkedArray>> ChunkedArray::MakeEmpty(
    std::shared_ptr<DataType> type, MemoryPool* memory_pool) {
  ARROW_ASSIGN_OR_RAISE(auto empty_chunk, MakeEmptyArray(std::move(type), memory_pool));

  ArrayVector single_chunk;
  single_chunk.reserve(1);
  single_chunk.push_back(std::move(empty_chunk));

  // The type argument is omitted, so the constructor takes it from the chunk.
  return std::make_shared<ChunkedArray>(std::move(single_chunk));
}
// Collect the device allocation type of every chunk into a set.
DeviceAllocationTypeSet ChunkedArray::device_types() const {
  if (!chunks_.empty()) {
    DeviceAllocationTypeSet observed;
    for (const auto& chunk : chunks_) {
      observed.add(chunk->device_type());
    }
    return observed;
  }
  // With no chunks there is nothing to inspect; such a ChunkedArray is
  // considered to be CPU-only.
  return DeviceAllocationTypeSet::CpuOnly();
}
namespace {
// Returns true when `type` itself is a floating-point type, or when any type
// reachable through its (recursively visited) fields is one.
bool ContainsFloatType(const DataType& type) {
  if (is_floating(type.id())) {
    return true;
  }
  // Not a float itself: descend into the nested fields, if there are any.
  const auto& children = type.fields();
  return std::any_of(children.begin(), children.end(), [](const auto& child) {
    return ContainsFloatType(*child->type());
  });
}
} // namespace
bool ChunkedArray::Equals(const ChunkedArray& other, const EqualOptions& opts) const {
if (this == &other) {
if (opts.nans_equal()) {
return true;
} else if (!ContainsFloatType(*type_)) {
return true;
}
}
if (length_ != other.length()) {
return false;
}
if (null_count_ != other.null_count()) {
return false;
}
// We cannot toggle check_metadata here yet, so we don't check it
if (!type_->Equals(*other.type_, /*check_metadata=*/false)) {
return false;
}
// Check contents of the underlying arrays. This checks for equality of
// the underlying data independently of the chunk size.
return internal::ApplyBinaryChunked(
*this, other,
[&](const Array& left_piece, const Array& right_piece,
int64_t ARROW_ARG_UNUSED(position)) {
if (!left_piece.Equals(right_piece, opts)) {
return Status::Invalid("Unequal piece");
}
return Status::OK();
})
.ok();
}
bool ChunkedArray::Equals(const std::shared_ptr<ChunkedArray>& other,
const EqualOptions& opts) const {
if (!other) {
return false;
}
return Equals(*other.get(), opts);
}
bool ChunkedArray::ApproxEquals(const ChunkedArray& other,
const EqualOptions& equal_options) const {
return Equals(other, equal_options.use_atol(true));
}
// Fetch the element at logical position `index` as a Scalar.
//
// The position is resolved to a (chunk, index-in-chunk) pair; a resolved chunk
// index past the last chunk is reported as an IndexError.
Result<std::shared_ptr<Scalar>> ChunkedArray::GetScalar(int64_t index) const {
  const auto location = chunk_resolver_.Resolve(index);
  const auto chunk_count = static_cast<int64_t>(chunks_.size());

  if (location.chunk_index < chunk_count) {
    return chunks_[location.chunk_index]->GetScalar(location.index_in_chunk);
  }
  return Status::IndexError("index with value of ", index,
                            " is out-of-bounds for chunked array of length ", length_);
}
// Return a new ChunkedArray covering up to `length` elements starting at
// logical position `offset`.
//
// `offset` must not exceed the total length; this is enforced with a fatal
// check. Every output chunk is produced by calling Slice() on an input chunk.
// When the request is for zero elements, or starts exactly at the end, and
// there is at least one chunk, the result holds a single `Slice(0, 0)` of a
// chunk; when there are no chunks at all the result has no chunks either.
std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset, int64_t length) const {
  ARROW_CHECK_LE(offset, length_) << "Slice offset greater than array length";
  const bool offset_equals_length = offset == length_;

  // Skip the chunks that end at or before `offset`, turning `offset` into a
  // position relative to chunk `curr_chunk`.
  int curr_chunk = 0;
  while (curr_chunk < num_chunks() && offset >= chunk(curr_chunk)->length()) {
    offset -= chunk(curr_chunk)->length();
    curr_chunk++;
  }

  ArrayVector new_chunks;
  if (num_chunks() > 0 && (offset_equals_length || length == 0)) {
    // Special case the zero-length slice to make sure there is at least 1 Array
    // in the result. When there are zero chunks we return zero chunks
    new_chunks.push_back(chunk(std::min(curr_chunk, num_chunks() - 1))->Slice(0, 0));
  } else {
    while (curr_chunk < num_chunks() && length > 0) {
      // NOTE(review): `length` may exceed what is left in this chunk; this
      // presumably relies on Array::Slice clamping it - confirm.
      new_chunks.push_back(chunk(curr_chunk)->Slice(offset, length));
      length -= chunk(curr_chunk)->length() - offset;
      // Only the first sliced chunk starts at a non-zero position.
      offset = 0;
      curr_chunk++;
    }
  }

  // `new_chunks` is not used again, so hand it over instead of copying the
  // vector (which would copy every shared_ptr in it).
  return std::make_shared<ChunkedArray>(std::move(new_chunks), type_);
}
std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset) const {
return Slice(offset, length_);
}
// Split a struct-typed ChunkedArray into one ChunkedArray per struct field.
//
// For any other type the result is a single-element vector holding a new
// ChunkedArray built from the same chunks and type. Errors from flattening an
// individual chunk are propagated.
Result<std::vector<std::shared_ptr<ChunkedArray>>> ChunkedArray::Flatten(
    MemoryPool* pool) const {
  using ResultVector = std::vector<std::shared_ptr<ChunkedArray>>;

  if (type()->id() != Type::STRUCT) {
    // Emulate nonexistent copy constructor
    ResultVector passthrough;
    passthrough.push_back(std::make_shared<ChunkedArray>(chunks_, type_));
    return passthrough;
  }

  const auto field_count = static_cast<size_t>(type()->num_fields());

  // per_field_chunks[f] gathers, in chunk order, the arrays of field `f`.
  std::vector<ArrayVector> per_field_chunks(field_count);
  for (const auto& struct_chunk : chunks_) {
    const auto& as_struct = checked_cast<const StructArray&>(*struct_chunk);
    ARROW_ASSIGN_OR_RAISE(auto field_arrays, as_struct.Flatten(pool));
    DCHECK_EQ(field_arrays.size(), per_field_chunks.size());
    size_t field_idx = 0;
    for (const auto& field_array : field_arrays) {
      per_field_chunks[field_idx].push_back(field_array);
      ++field_idx;
    }
  }

  ResultVector columns;
  columns.reserve(field_count);
  for (size_t field_idx = 0; field_idx < field_count; ++field_idx) {
    const auto& field_type = type()->field(static_cast<int>(field_idx))->type();
    columns.push_back(std::make_shared<ChunkedArray>(
        std::move(per_field_chunks[field_idx]), field_type));
  }
  return columns;
}
// Build a ChunkedArray of type `type` whose chunks are the result of calling
// View(type) on each chunk, in order. The first failing chunk aborts the
// operation and its error is returned.
Result<std::shared_ptr<ChunkedArray>> ChunkedArray::View(
    const std::shared_ptr<DataType>& type) const {
  const int chunk_count = this->num_chunks();

  ArrayVector viewed_chunks;
  viewed_chunks.reserve(static_cast<size_t>(chunk_count));
  for (int idx = 0; idx < chunk_count; ++idx) {
    ARROW_ASSIGN_OR_RAISE(auto viewed, chunks_[idx]->View(type));
    viewed_chunks.push_back(std::move(viewed));
  }
  return std::make_shared<ChunkedArray>(std::move(viewed_chunks), type);
}
// Render this array through PrettyPrint() into a string. A PrettyPrint()
// failure is treated as fatal.
std::string ChunkedArray::ToString() const {
  std::ostringstream sink;
  ARROW_CHECK_OK(PrettyPrint(*this, 0, &sink));
  return sink.str();
}
namespace {
// Validate a list of chunks: all chunks must share the type of the first one,
// and each chunk must pass array validation (the full variant when
// `full_validation` is set). An empty list is trivially valid.
//
// Type mismatches are reported before any per-chunk validation error, so the
// two passes are kept separate.
Status ValidateChunks(const ArrayVector& chunks, bool full_validation) {
  if (chunks.empty()) {
    return Status::OK();
  }

  // Pass 1: every chunk after the first must have the first chunk's type.
  const DataType& expected_type = *chunks.front()->type();
  for (size_t idx = 1; idx < chunks.size(); ++idx) {
    const auto& actual_type = chunks[idx]->type();
    if (actual_type->Equals(expected_type)) {
      continue;
    }
    return Status::Invalid("In chunk ", idx, " expected type ", expected_type.ToString(),
                           " but saw ", actual_type->ToString());
  }

  // Pass 2: validate each chunk on its own.
  size_t position = 0;
  for (const auto& chunk_ptr : chunks) {
    const Array& chunk = *chunk_ptr;
    Status outcome;
    if (full_validation) {
      outcome = internal::ValidateArrayFull(chunk);
    } else {
      outcome = internal::ValidateArray(chunk);
    }
    if (!outcome.ok()) {
      return Status::Invalid("In chunk ", position, ": ", outcome.ToString());
    }
    ++position;
  }
  return Status::OK();
}
} // namespace
// Validate the chunks using the non-full per-array validation.
Status ChunkedArray::Validate() const {
  constexpr bool kFullValidation = false;
  return ValidateChunks(chunks_, kFullValidation);
}
// Validate the chunks using the full per-array validation.
Status ChunkedArray::ValidateFull() const {
  constexpr bool kFullValidation = true;
  return ValidateChunks(chunks_, kFullValidation);
}
namespace internal {
bool MultipleChunkIterator::Next(std::shared_ptr<Array>* next_left,
std::shared_ptr<Array>* next_right) {
if (pos_ == length_) return false;
// Find non-empty chunk
std::shared_ptr<Array> chunk_left, chunk_right;
while (true) {
chunk_left = left_.chunk(chunk_idx_left_);
chunk_right = right_.chunk(chunk_idx_right_);
if (chunk_pos_left_ == chunk_left->length()) {
chunk_pos_left_ = 0;
++chunk_idx_left_;
continue;
}
if (chunk_pos_right_ == chunk_right->length()) {
chunk_pos_right_ = 0;
++chunk_idx_right_;
continue;
}
break;
}
// Determine how big of a section to return
int64_t iteration_size = std::min(chunk_left->length() - chunk_pos_left_,
chunk_right->length() - chunk_pos_right_);
*next_left = chunk_left->Slice(chunk_pos_left_, iteration_size);
*next_right = chunk_right->Slice(chunk_pos_right_, iteration_size);
pos_ += iteration_size;
chunk_pos_left_ += iteration_size;
chunk_pos_right_ += iteration_size;
return true;
}
} // namespace internal
} // namespace arrow