blob: e02db29a6402ca4fb23a34a277447905f2b1ac28 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <cerrno>
#include <concepts>
#include <cstdint>
#include <cstring>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <utility>
#include <vector>
#include "iceberg/arrow_c_data.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
namespace iceberg {
/// \brief Cached state for ProjectBatch over one input/output schema pair.
///
/// Exported because this internal utility is shared across library translation units.
class ICEBERG_EXPORT ProjectionContext {
public:
using ProjectBatchState = std::shared_ptr<void>;
using ProjectBatchFunction = auto (*)(ArrowArray* input_batch,
std::span<const int32_t> row_indices,
ProjectionContext& projection)
-> Result<ArrowArray>;
/// \brief Register a custom implementation for ProjectBatch.
///
/// Registration is process-wide. If multiple implementations are registered,
/// the last non-null implementation wins.
static void RegisterProjectBatchFunction(ProjectBatchFunction project_batch_function);
/// \brief Returns true when a custom implementation has been registered.
static bool HasProjectBatchFunction();
/// \brief Resolve the registered ProjectBatch implementation.
static auto ResolveProjectBatchFunction() -> ProjectBatchFunction;
/// \brief Build reusable projection state for a validated schema pair.
///
/// \param input_schema Schema that describes every input batch.
/// \param output_schema Final schema and column order requested by the caller.
/// \param project_batch_function Optional implementation returned by
/// ProjectionContext::ResolveProjectBatchFunction, or nullptr to use the nanoarrow
/// path.
/// \note It validates that output_schema selects or reorders complete top-level fields
/// by field id. Nested pruning and type changes are rejected. The input_schema and
/// output_schema passed to Make must outlive the context. ProjectBatch may lazily
/// initialize backend cache; do not share one context across concurrent calls.
static Result<ProjectionContext> Make(const Schema& input_schema,
const Schema& output_schema,
ProjectBatchFunction project_batch_function);
ProjectionContext(ProjectionContext&&) noexcept;
ProjectionContext& operator=(ProjectionContext&&) noexcept;
~ProjectionContext();
ProjectionContext(const ProjectionContext&) = delete;
ProjectionContext& operator=(const ProjectionContext&) = delete;
const Schema& input_schema() const;
const Schema& output_schema() const;
const ArrowSchema& input_arrow_schema() const;
const ArrowSchema& output_arrow_schema() const;
std::span<const int32_t> selected_field_indices() const;
ProjectBatchFunction project_batch_function() const;
ProjectBatchState& project_batch_state();
private:
ProjectionContext() = default;
// Raw schema pointers are borrowed from caller-owned schemas. FileScanTaskReader
// keeps those schema objects alive in the same stream source that owns this context.
const Schema* input_schema_ = nullptr;
const Schema* output_schema_ = nullptr;
std::vector<int32_t> selected_field_indices_;
ArrowSchema input_arrow_schema_{};
ArrowSchema output_arrow_schema_{};
ProjectBatchFunction project_batch_function_ = nullptr;
ProjectBatchState project_batch_state_;
};
/// \brief Concept for sources that can be wrapped as ArrowArrayStreams.
template <typename Source>
concept ArrowArrayStreamProvider = requires(Source& source) {
{ source.Close() } -> std::same_as<Status>;
{ source.Next() } -> std::same_as<Result<std::optional<ArrowArray>>>;
{ source.Schema() } -> std::same_as<Result<ArrowSchema>>;
};
namespace detail {
template <ArrowArrayStreamProvider Source>
struct ArrowArrayStreamPrivateData {
std::unique_ptr<Source> source;
std::string last_error;
explicit ArrowArrayStreamPrivateData(std::unique_ptr<Source> src)
: source(std::move(src)) {}
~ArrowArrayStreamPrivateData() {
if (source != nullptr) {
std::ignore = source->Close();
}
}
};
template <ArrowArrayStreamProvider Source>
int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
if (stream == nullptr || stream->private_data == nullptr) {
return EINVAL;
}
auto* private_data =
static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
auto schema_result = private_data->source->Schema();
if (!schema_result.has_value()) {
private_data->last_error = schema_result.error().message;
std::memset(out, 0, sizeof(ArrowSchema));
return EIO;
}
*out = std::move(schema_result.value());
return 0;
}
template <ArrowArrayStreamProvider Source>
int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
if (stream == nullptr || stream->private_data == nullptr) {
return EINVAL;
}
auto* private_data =
static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
auto next_result = private_data->source->Next();
if (!next_result.has_value()) {
private_data->last_error = next_result.error().message;
std::memset(out, 0, sizeof(ArrowArray));
return EIO;
}
auto& optional_array = next_result.value();
if (optional_array.has_value()) {
*out = std::move(optional_array.value());
} else {
std::memset(out, 0, sizeof(ArrowArray));
}
return 0;
}
template <ArrowArrayStreamProvider Source>
const char* GetLastError(struct ArrowArrayStream* stream) {
if (stream == nullptr || stream->private_data == nullptr) {
return nullptr;
}
auto* private_data =
static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
return private_data->last_error.empty() ? nullptr : private_data->last_error.c_str();
}
template <ArrowArrayStreamProvider Source>
void Release(struct ArrowArrayStream* stream) {
if (stream == nullptr || stream->private_data == nullptr) {
return;
}
delete static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
stream->private_data = nullptr;
stream->release = nullptr;
}
} // namespace detail
/// \brief Wrap an object with Close, Next, and Schema as an ArrowArrayStream.
template <ArrowArrayStreamProvider Source>
Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Source> source) {
if (source == nullptr) {
return InvalidArgument("Cannot make ArrowArrayStream from null source");
}
auto private_data =
std::make_unique<detail::ArrowArrayStreamPrivateData<Source>>(std::move(source));
ArrowArrayStream stream{.get_schema = detail::GetSchema<Source>,
.get_next = detail::GetNext<Source>,
.get_last_error = detail::GetLastError<Source>,
.release = detail::Release<Source>,
.private_data = private_data.release()};
return stream;
}
/// \brief Wrap a Reader as an ArrowArrayStream.
ICEBERG_EXPORT Result<ArrowArrayStream> MakeArrowArrayStream(
std::unique_ptr<Reader> reader);
/// \brief Project selected rows from a batch into complete top-level fields.
///
/// `input_batch` is consumed by this function. If the projection carries a registered
/// implementation, the call is delegated to it; otherwise the built-in nanoarrow
/// implementation is used. The projection must have been created for the stable schema
/// pair that describes the input batch and requested output. This function does not
/// revalidate schema compatibility on each batch.
///
/// \param input_batch Owned Arrow C Data batch to project.
/// \param row_indices Zero-based row positions to copy from `input_batch`.
/// \param projection Reusable schema/projection state created by
/// ProjectionContext::Make.
/// \return A newly owned ArrowArray matching `projection.output_schema()`.
ICEBERG_EXPORT Result<ArrowArray> ProjectBatch(ArrowArray* input_batch,
std::span<const int32_t> row_indices,
ProjectionContext& projection);
} // namespace iceberg