blob: e97293bcd9ee58acc33c65b5a33bfb9206f1c6d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/row/arrow_array_wrapper.h"
#include <cstring>
#include <nanoarrow/nanoarrow.h>
#include "iceberg/arrow_c_data_guard_internal.h"
#include "iceberg/result.h"
#include "iceberg/util/macros.h"
namespace iceberg {
// Returns InvalidArrowData from the enclosing function when `status` is not
// NANOARROW_OK. The enclosing scope must declare an ArrowError named `error`;
// its message is embedded in the returned error.
//
// `status` is parenthesized so compound expressions compare as a whole, and the
// body is wrapped in do/while(false) so the macro expands to exactly one
// statement (safe inside an unbraced if/else). Call sites must end with `;`.
#define NANOARROW_RETURN_IF_NOT_OK(status)                           \
  do {                                                               \
    if ((status) != NANOARROW_OK) [[unlikely]] {                     \
      return InvalidArrowData("Nanoarrow error: {}", error.message); \
    }                                                                \
  } while (false)
namespace {
// TODO(gangwu): Reuse created ArrowArrayStructLike and others with cache.

// Reads the decimal at `index` of `array_view` and returns it as a Decimal.
// The decimal parameters (bit width, precision, scale) are taken from `schema`.
// Fails with InvalidArrowData when the schema cannot be parsed or when the
// decoded value is not made of exactly two words.
Result<Scalar> ExtractDecimal128(const ArrowSchema* schema,
                                 const ArrowArrayView* array_view, int64_t index) {
  ArrowError error;  // read by NANOARROW_RETURN_IF_NOT_OK
  ArrowSchemaView type_info;
  NANOARROW_RETURN_IF_NOT_OK(ArrowSchemaViewInit(&type_info, schema, &error));

  ArrowDecimal decimal;
  ArrowDecimalInit(&decimal, type_info.decimal_bitwidth, type_info.decimal_precision,
                   type_info.decimal_scale);
  ArrowArrayViewGetDecimalUnsafe(array_view, index, &decimal);
  if (decimal.n_words != 2) {
    return InvalidArrowData("Unsupported Arrow decimal words: {}", decimal.n_words);
  }

  // Copy the words byte-for-byte into the 128-bit integer the Decimal is built from.
  int128_t raw_bits{0};
  std::memcpy(&raw_bits, decimal.words, sizeof(int128_t));
  return Decimal(raw_bits);
}

// Converts the element at `index` of `array_view` into a Scalar.
//
// `schema` and `array` describe the same column as `array_view`; they are only
// consulted for decimals (schema) and for nested struct/list/map values, which
// are returned as freshly created wrappers over `*schema` / `*array`.
//
// Returns std::monostate for a null element, the extracted value for supported
// storage types, an error propagated from nested wrapper creation or decimal
// decoding, and NotImplemented for every other storage type.
Result<Scalar> ExtractValue(const ArrowSchema* schema, const ArrowArray* array,
                            const ArrowArrayView* array_view, int64_t index) {
  if (ArrowArrayViewIsNull(array_view, index)) {
    return std::monostate{};
  }

  const auto kind = array_view->storage_type;
  switch (kind) {
    // Boolean and integer-backed types.
    case NANOARROW_TYPE_BOOL:
      return static_cast<bool>(ArrowArrayViewGetIntUnsafe(array_view, index));

    case NANOARROW_TYPE_INT32:
    case NANOARROW_TYPE_DATE32:
      return static_cast<int32_t>(ArrowArrayViewGetIntUnsafe(array_view, index));

    case NANOARROW_TYPE_INT64:
    case NANOARROW_TYPE_TIME64:
    case NANOARROW_TYPE_TIMESTAMP:
      return ArrowArrayViewGetIntUnsafe(array_view, index);

    // Floating point types.
    case NANOARROW_TYPE_FLOAT:
      return static_cast<float>(ArrowArrayViewGetDoubleUnsafe(array_view, index));

    case NANOARROW_TYPE_DOUBLE:
      return ArrowArrayViewGetDoubleUnsafe(array_view, index);

    // String and binary flavours are all returned as a non-owning string_view.
    case NANOARROW_TYPE_STRING:
    case NANOARROW_TYPE_BINARY:
    case NANOARROW_TYPE_FIXED_SIZE_BINARY:
    case NANOARROW_TYPE_STRING_VIEW:
    case NANOARROW_TYPE_BINARY_VIEW:
    case NANOARROW_TYPE_LARGE_STRING:
    case NANOARROW_TYPE_LARGE_BINARY: {
      const ArrowStringView bytes = ArrowArrayViewGetStringUnsafe(array_view, index);
      return std::string_view(bytes.data, bytes.size_bytes);
    }

    case NANOARROW_TYPE_DECIMAL128:
      return ExtractDecimal128(schema, array_view, index);

    // Nested types: hand back a wrapper positioned at `index`.
    case NANOARROW_TYPE_STRUCT: {
      ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<StructLike> nested_struct,
                              ArrowArrayStructLike::Make(*schema, *array, index));
      return nested_struct;
    }

    case NANOARROW_TYPE_LIST: {
      ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<ArrayLike> nested_list,
                              ArrowArrayArrayLike::Make(*schema, *array, index));
      return nested_list;
    }

    case NANOARROW_TYPE_MAP: {
      ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<MapLike> nested_map,
                              ArrowArrayMapLike::Make(*schema, *array, index));
      return nested_map;
    }

    case NANOARROW_TYPE_EXTENSION:
      // TODO(gangwu): Handle these types properly
    default:
      break;
  }

  return NotImplemented("Unsupported Arrow type: {}", static_cast<int>(kind));
}
} // namespace
// ArrowArrayStructLike Implementation
//
// Private state of ArrowArrayStructLike: the schema/array pair being viewed, a
// nanoarrow view built over them, and the row currently exposed as a struct.
class ArrowArrayStructLike::Impl {
 public:
  // Stores references to `schema` and `array` (nothing is copied) together with
  // the initial row. Init() has to succeed before GetField() is called.
  Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index)
      : schema_(schema), array_(std::cref(array)), row_index_(row_index) {}

  ~Impl() = default;

  // Returns the value of child field `pos` at the current row.
  //
  // Fails with InvalidArgument when `pos` is not a valid child index or when the
  // current row lies outside [0, array length); otherwise returns whatever
  // ExtractValue produces for that child.
  Result<Scalar> GetField(size_t pos) const {
    // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
    if (pos >= static_cast<size_t>(schema_.n_children)) {
      return InvalidArgument("Field index {} out of range (size: {})", pos,
                             schema_.n_children);
    }
    if (row_index_ < 0 || row_index_ >= array_.get().length) {
      return InvalidArgument("Row index {} out of range (length: {})", row_index_,
                             array_.get().length);
    }

    const ArrowSchema* child_schema = schema_.children[pos];
    const ArrowArray* child_array = array_.get().children[pos];
    const ArrowArrayView* child_view = array_view_.children[pos];

    // Per the Arrow layout, struct children are not sliced together with their
    // parent: logical row i of the struct lives at child index
    // (parent offset + i). Without the offset a sliced struct array would
    // return values from the wrong row.
    const int64_t child_row = array_.get().offset + row_index_;
    return ExtractValue(child_schema, child_array, child_view, child_row);
  }

  // Number of child fields declared by the schema.
  size_t num_fields() const { return static_cast<size_t>(schema_.n_children); }

  // Points this object at `array` and `row_index`, rebuilding the view over the
  // new array. Fails with InvalidArrowData when nanoarrow rejects the array.
  // The row is not range-checked here; GetField() does that.
  Status Reset(const ArrowArray& array, int64_t row_index) {
    array_ = std::cref(array);
    row_index_ = row_index;
    ArrowError error;
    NANOARROW_RETURN_IF_NOT_OK(
        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
    return {};
  }

  // Moves to another row of the current array. The row is not range-checked
  // here; GetField() does that.
  Status Reset(int64_t row_index) {
    row_index_ = row_index;
    return {};
  }

  // Builds the nanoarrow view from the schema and attaches the array to it.
  // Fails with InvalidArrowData when either nanoarrow call reports an error.
  Status Init() {
    ArrowError error;
    NANOARROW_RETURN_IF_NOT_OK(
        ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error));
    NANOARROW_RETURN_IF_NOT_OK(
        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
    return {};
  }

 private:
  // Value-initialized so the guard below never receives indeterminate memory
  // when Init() fails before the view is set up (or is never called).
  ArrowArrayView array_view_{};
  // Declared after array_view_ because it is handed that member's address.
  // NOTE(review): presumably releases the view on destruction — confirm in
  // arrow_c_data_guard_internal.h.
  internal::ArrowArrayViewGuard array_view_guard_{&array_view_};
  const ArrowSchema& schema_;
  std::reference_wrapper<const ArrowArray> array_;
  int64_t row_index_;
};
// Factory: builds the private state for (`schema`, `array`, `row_index`), runs
// its Init(), and wraps it in an ArrowArrayStructLike. If Init() fails, that
// error is returned and no wrapper is created.
Result<std::unique_ptr<ArrowArrayStructLike>> ArrowArrayStructLike::Make(
    const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) {
  std::unique_ptr<Impl> state = std::make_unique<Impl>(schema, array, row_index);
  ICEBERG_RETURN_UNEXPECTED(state->Init());
  return std::unique_ptr<ArrowArrayStructLike>(
      new ArrowArrayStructLike(std::move(state)));
}
ArrowArrayStructLike::ArrowArrayStructLike(std::unique_ptr<Impl> impl)
: impl_(std::move(impl)) {}
ArrowArrayStructLike::~ArrowArrayStructLike() = default;
// Returns the value of field `pos` at the current row; forwards to the private
// state, which performs the range checks.
Result<Scalar> ArrowArrayStructLike::GetField(size_t pos) const {
  const Impl& state = *impl_;
  return state.GetField(pos);
}
size_t ArrowArrayStructLike::num_fields() const { return impl_->num_fields(); }
// Repositions the wrapper on `row_index` of the array it already views.
Status ArrowArrayStructLike::Reset(int64_t row_index) {
  Impl& state = *impl_;
  return state.Reset(row_index);
}

// Repositions the wrapper on `row_index` of a different `array`.
Status ArrowArrayStructLike::Reset(const ArrowArray& array, int64_t row_index) {
  Impl& state = *impl_;
  return state.Reset(array, row_index);
}
// ArrowArrayArrayLike Implementation
//
// Private state of ArrowArrayArrayLike: the list schema/array pair being viewed,
// a nanoarrow view built over them, the current row, and the range of child
// elements ([offset_, offset_ + length_)) that belongs to that row.
class ArrowArrayArrayLike::Impl {
 public:
  // Stores references to `schema` and `array` (nothing is copied) together with
  // the initial row. Init() has to succeed before the accessors are used.
  Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index)
      : schema_(schema), array_(std::cref(array)), row_index_(row_index) {}

  ~Impl() = default;

  // Returns element `pos` of the list stored at the current row.
  // Fails with InvalidArgument when `pos` is not below the list length;
  // otherwise returns whatever ExtractValue produces for the child element.
  Result<Scalar> GetElement(size_t pos) const {
    // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
    if (pos >= static_cast<size_t>(length_)) {
      return InvalidArgument("Element index {} out of range (length: {})", pos, length_);
    }

    const ArrowSchema* child_schema = schema_.children[0];
    const ArrowArray* child_array = array_.get().children[0];
    const ArrowArrayView* child_view = array_view_.children[0];
    return ExtractValue(child_schema, child_array, child_view,
                        offset_ + static_cast<int64_t>(pos));
  }

  // Number of elements in the list stored at the current row.
  size_t size() const { return static_cast<size_t>(length_); }

  // Points this object at `array` and `row_index`, rebuilding the view and the
  // element range. Fails with InvalidArrowData when nanoarrow rejects the array
  // and with InvalidArgument when the row is out of range.
  Status Reset(const ArrowArray& array, int64_t row_index) {
    array_ = std::cref(array);
    row_index_ = row_index;
    ArrowError error;
    NANOARROW_RETURN_IF_NOT_OK(
        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
    return UpdateOffsets();
  }

  // Moves to another row of the current array and recomputes the element range.
  Status Reset(int64_t row_index) {
    row_index_ = row_index;
    return UpdateOffsets();
  }

  // Builds the nanoarrow view from the schema, attaches the array to it and
  // computes the element range of the initial row.
  Status Init() {
    ArrowError error;
    NANOARROW_RETURN_IF_NOT_OK(
        ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error));
    NANOARROW_RETURN_IF_NOT_OK(
        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
    return UpdateOffsets();
  }

 private:
  // Recomputes offset_/length_ for row_index_.
  Status UpdateOffsets() {
    if (row_index_ < 0 || row_index_ >= array_.get().length) {
      // Forget the previous row's range so a failed Reset cannot keep serving
      // elements that belong to another row.
      offset_ = 0;
      length_ = 0;
      return InvalidArgument("Row index {} out of range (length: {})", row_index_,
                             array_.get().length);
    }

    // ArrowArrayViewListChildOffset indexes the offsets buffer directly, so the
    // logical row has to be shifted by the array's own offset; otherwise a
    // sliced list array would report the range of a different row.
    const int64_t physical_row = array_.get().offset + row_index_;
    offset_ = ArrowArrayViewListChildOffset(&array_view_, physical_row);
    length_ = ArrowArrayViewListChildOffset(&array_view_, physical_row + 1) - offset_;
    return {};
  }

  // Value-initialized so the guard below never receives indeterminate memory
  // when Init() fails before the view is set up (or is never called).
  ArrowArrayView array_view_{};
  // Declared after array_view_ because it is handed that member's address.
  // NOTE(review): presumably releases the view on destruction — confirm in
  // arrow_c_data_guard_internal.h.
  internal::ArrowArrayViewGuard array_view_guard_{&array_view_};
  const ArrowSchema& schema_;
  std::reference_wrapper<const ArrowArray> array_;
  int64_t row_index_;
  int64_t offset_ = 0;
  int64_t length_ = 0;
};
// Factory: builds the private state for (`schema`, `array`, `row_index`), runs
// its Init(), and wraps it in an ArrowArrayArrayLike. If Init() fails, that
// error is returned and no wrapper is created.
Result<std::unique_ptr<ArrowArrayArrayLike>> ArrowArrayArrayLike::Make(
    const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) {
  std::unique_ptr<Impl> state = std::make_unique<Impl>(schema, array, row_index);
  ICEBERG_RETURN_UNEXPECTED(state->Init());
  return std::unique_ptr<ArrowArrayArrayLike>(
      new ArrowArrayArrayLike(std::move(state)));
}
ArrowArrayArrayLike::ArrowArrayArrayLike(std::unique_ptr<Impl> impl)
: impl_(std::move(impl)) {}
ArrowArrayArrayLike::~ArrowArrayArrayLike() = default;
// Returns element `pos` of the list at the current row; forwards to the private
// state, which performs the range check.
Result<Scalar> ArrowArrayArrayLike::GetElement(size_t pos) const {
  const Impl& state = *impl_;
  return state.GetElement(pos);
}
size_t ArrowArrayArrayLike::size() const { return impl_->size(); }
// Repositions the wrapper on `row_index` of the array it already views.
Status ArrowArrayArrayLike::Reset(int64_t row_index) {
  Impl& state = *impl_;
  return state.Reset(row_index);
}

// Repositions the wrapper on `row_index` of a different `array`.
Status ArrowArrayArrayLike::Reset(const ArrowArray& array, int64_t row_index) {
  Impl& state = *impl_;
  return state.Reset(array, row_index);
}
// ArrowArrayMapLike Implementation
//
// Private state of ArrowArrayMapLike: the map schema/array pair being viewed, a
// nanoarrow view built over them, the current row, and the range of entries
// ([offset_, offset_ + length_)) that belongs to that row. Keys are read from
// children[0]->children[0] and values from children[0]->children[1].
class ArrowArrayMapLike::Impl {
 public:
  // Stores references to `schema` and `array` (nothing is copied) together with
  // the initial row. Init() has to succeed before the accessors are used.
  Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index)
      : schema_(schema), array_(std::cref(array)), row_index_(row_index) {}

  ~Impl() = default;

  // Returns the key of entry `pos` of the map stored at the current row.
  // Fails with InvalidArgument when `pos` is not below the entry count.
  Result<Scalar> GetKey(size_t pos) const {
    // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
    if (pos >= static_cast<size_t>(length_)) {
      return InvalidArgument("Key index {} out of range (length: {})", pos, length_);
    }

    const ArrowSchema* keys_schema = schema_.children[0]->children[0];
    const ArrowArray* keys_array = array_.get().children[0]->children[0];
    const ArrowArrayView* keys_view = array_view_.children[0]->children[0];
    return ExtractValue(keys_schema, keys_array, keys_view,
                        offset_ + static_cast<int64_t>(pos));
  }

  // Returns the value of entry `pos` of the map stored at the current row.
  // Fails with InvalidArgument when `pos` is not below the entry count.
  Result<Scalar> GetValue(size_t pos) const {
    // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
    if (pos >= static_cast<size_t>(length_)) {
      return InvalidArgument("Value index {} out of range (length: {})", pos, length_);
    }

    const ArrowSchema* values_schema = schema_.children[0]->children[1];
    const ArrowArray* values_array = array_.get().children[0]->children[1];
    const ArrowArrayView* values_view = array_view_.children[0]->children[1];
    return ExtractValue(values_schema, values_array, values_view,
                        offset_ + static_cast<int64_t>(pos));
  }

  // Number of entries in the map stored at the current row.
  size_t size() const { return static_cast<size_t>(length_); }

  // Points this object at `array` and `row_index`, rebuilding the view and the
  // entry range. Fails with InvalidArrowData when nanoarrow rejects the array
  // and with InvalidArgument when the row is out of range.
  Status Reset(const ArrowArray& array, int64_t row_index) {
    array_ = std::cref(array);
    row_index_ = row_index;
    ArrowError error;
    NANOARROW_RETURN_IF_NOT_OK(
        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
    return UpdateOffsets();
  }

  // Moves to another row of the current array and recomputes the entry range.
  Status Reset(int64_t row_index) {
    row_index_ = row_index;
    return UpdateOffsets();
  }

  // Builds the nanoarrow view from the schema, attaches the array to it and
  // computes the entry range of the initial row.
  Status Init() {
    ArrowError error;
    NANOARROW_RETURN_IF_NOT_OK(
        ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error));
    NANOARROW_RETURN_IF_NOT_OK(
        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
    return UpdateOffsets();
  }

 private:
  // Recomputes offset_/length_ for row_index_.
  Status UpdateOffsets() {
    if (row_index_ < 0 || row_index_ >= array_.get().length) {
      // Forget the previous row's range so a failed Reset cannot keep serving
      // entries that belong to another row.
      offset_ = 0;
      length_ = 0;
      return InvalidArgument("Row index {} out of range (length: {})", row_index_,
                             array_.get().length);
    }

    // XXX: ArrowArrayViewListChildOffset does not work for map types.
    // We need to directly access the offsets buffer instead.
    auto* offsets_buffer = array_view_.buffer_views[1].data.as_int32;

    // The buffer is read from its physical start, so the logical row has to be
    // shifted by the array's own offset; otherwise a sliced map array would
    // report the range of a different row.
    const int64_t physical_row = array_.get().offset + row_index_;
    offset_ = offsets_buffer[physical_row];
    length_ = offsets_buffer[physical_row + 1] - offset_;
    return {};
  }

  // Value-initialized so the guard below never receives indeterminate memory
  // when Init() fails before the view is set up (or is never called).
  ArrowArrayView array_view_{};
  // Declared after array_view_ because it is handed that member's address.
  // NOTE(review): presumably releases the view on destruction — confirm in
  // arrow_c_data_guard_internal.h.
  internal::ArrowArrayViewGuard array_view_guard_{&array_view_};
  const ArrowSchema& schema_;
  std::reference_wrapper<const ArrowArray> array_;
  int64_t row_index_;
  int64_t offset_ = 0;
  int64_t length_ = 0;
};
// Factory: builds the private state for (`schema`, `array`, `row_index`), runs
// its Init(), and wraps it in an ArrowArrayMapLike. If Init() fails, that error
// is returned and no wrapper is created.
Result<std::unique_ptr<ArrowArrayMapLike>> ArrowArrayMapLike::Make(
    const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) {
  std::unique_ptr<Impl> state = std::make_unique<Impl>(schema, array, row_index);
  ICEBERG_RETURN_UNEXPECTED(state->Init());
  return std::unique_ptr<ArrowArrayMapLike>(new ArrowArrayMapLike(std::move(state)));
}
ArrowArrayMapLike::ArrowArrayMapLike(std::unique_ptr<Impl> impl)
: impl_(std::move(impl)) {}
ArrowArrayMapLike::~ArrowArrayMapLike() = default;
// Returns the key of entry `pos` of the map at the current row; forwards to the
// private state, which performs the range check.
Result<Scalar> ArrowArrayMapLike::GetKey(size_t pos) const {
  const Impl& state = *impl_;
  return state.GetKey(pos);
}
// Returns the value of entry `pos` of the map at the current row; forwards to
// the private state, which performs the range check.
Result<Scalar> ArrowArrayMapLike::GetValue(size_t pos) const {
  const Impl& state = *impl_;
  return state.GetValue(pos);
}
size_t ArrowArrayMapLike::size() const { return impl_->size(); }
// Repositions the wrapper on `row_index` of the array it already views.
Status ArrowArrayMapLike::Reset(int64_t row_index) {
  Impl& state = *impl_;
  return state.Reset(row_index);
}

// Repositions the wrapper on `row_index` of a different `array`.
Status ArrowArrayMapLike::Reset(const ArrowArray& array, int64_t row_index) {
  Impl& state = *impl_;
  return state.Reset(array, row_index);
}
} // namespace iceberg