blob: eeec262e90a95c7e486e1a6acc6a015063aa854d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/table_scan.h"
#include <cstring>
#include <iterator>
#include "iceberg/expression/binder.h"
#include "iceberg/expression/expression.h"
#include "iceberg/file_reader.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_group.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/snapshot_util_internal.h"
#include "iceberg/util/timepoint.h"
#include "iceberg/util/type_util.h"
namespace iceberg {
namespace {
/// \brief Column names returned by TableScan::ScanColumns() when column stats
/// are not requested.
const std::vector<std::string> kScanColumns = {
    "snapshot_id",
    "file_path",
    "file_ordinal",
    "file_format",
    "block_size_in_bytes",
    "file_size_in_bytes",
    "record_count",
    "partition",
    "key_metadata",
    "split_offsets",
    "sort_order_id",
};
/// \brief Statistics column names appended to kScanColumns to form
/// kScanColumnsWithStats.
const std::vector<std::string> kStatsColumns = {
    "value_counts",
    "null_value_counts",
    "nan_value_counts",
    "lower_bounds",
    "upper_bounds",
    "column_sizes",
};
/// \brief kScanColumns followed by kStatsColumns, built once at static
/// initialization time.
const std::vector<std::string> kScanColumnsWithStats = [] {
  std::vector<std::string> combined;
  combined.reserve(kScanColumns.size() + kStatsColumns.size());
  combined.insert(combined.end(), kScanColumns.begin(), kScanColumns.end());
  combined.insert(combined.end(), kStatsColumns.begin(), kStatsColumns.end());
  return combined;
}();
/// \brief Private data structure to hold the Reader and error state
struct ReaderStreamPrivateData {
std::unique_ptr<Reader> reader;
std::string last_error;
explicit ReaderStreamPrivateData(std::unique_ptr<Reader> reader_ptr)
: reader(std::move(reader_ptr)) {}
~ReaderStreamPrivateData() {
if (reader) {
std::ignore = reader->Close();
}
}
};
/// \brief ArrowArrayStream::get_schema callback.
///
/// \return EINVAL if the stream or its private data is null; EIO if the reader
/// fails to produce a schema (the message is recorded and `out` is zeroed);
/// 0 after the schema has been moved into `out`.
static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
  if (stream == nullptr || stream->private_data == nullptr) {
    return EINVAL;
  }

  auto& state = *static_cast<ReaderStreamPrivateData*>(stream->private_data);
  auto schema = state.reader->Schema();
  if (schema.has_value()) {
    *out = std::move(schema.value());
    return 0;
  }

  // Keep the message so that GetLastError() can report it.
  state.last_error = schema.error().message;
  std::memset(out, 0, sizeof(ArrowSchema));
  return EIO;
}
/// \brief ArrowArrayStream::get_next callback.
///
/// \return EINVAL if the stream or its private data is null; EIO if the reader
/// reports an error (the message is recorded and `out` is zeroed); 0 otherwise.
/// On success `out` receives the next array, or is zeroed with a null `release`
/// when the reader has no more data.
static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
  if (stream == nullptr || stream->private_data == nullptr) {
    return EINVAL;
  }

  auto& state = *static_cast<ReaderStreamPrivateData*>(stream->private_data);
  auto batch = state.reader->Next();
  if (!batch.has_value()) {
    state.last_error = batch.error().message;
    std::memset(out, 0, sizeof(ArrowArray));
    return EIO;
  }

  auto& maybe_array = batch.value();
  if (!maybe_array.has_value()) {
    // End of stream - a null release callback signals that no array was produced.
    std::memset(out, 0, sizeof(ArrowArray));
    out->release = nullptr;
    return 0;
  }

  *out = std::move(maybe_array.value());
  return 0;
}
/// \brief ArrowArrayStream::get_last_error callback.
///
/// \return nullptr if the stream or its private data is null, or if no error
/// message has been recorded; otherwise a pointer to the stored message.
static const char* GetLastError(struct ArrowArrayStream* stream) {
  if (stream == nullptr || stream->private_data == nullptr) {
    return nullptr;
  }

  auto* state = static_cast<ReaderStreamPrivateData*>(stream->private_data);
  if (state->last_error.empty()) {
    return nullptr;
  }
  return state->last_error.c_str();
}
/// \brief ArrowArrayStream::release callback.
///
/// Destroys the ReaderStreamPrivateData attached to the stream (which closes
/// the reader) and marks the stream as released by clearing `release`.
/// A null `stream` is ignored.
static void Release(struct ArrowArrayStream* stream) {
  if (stream == nullptr) {
    return;
  }
  // Deleting a null pointer is a no-op, so a stream without private data is
  // still marked as released below instead of being left with `release` set.
  delete static_cast<ReaderStreamPrivateData*>(stream->private_data);
  stream->private_data = nullptr;
  stream->release = nullptr;
}
/// \brief Wrap a Reader in an ArrowArrayStream.
///
/// Ownership of the reader moves into heap-allocated private data that the
/// stream's release callback deletes.
///
/// \return InvalidArgument if `reader` is null, otherwise the populated stream.
Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
  if (reader == nullptr) {
    return InvalidArgument("Reader cannot be null");
  }

  auto state = std::make_unique<ReaderStreamPrivateData>(std::move(reader));

  ArrowArrayStream stream{};
  stream.get_schema = GetSchema;
  stream.get_next = GetNext;
  stream.get_last_error = GetLastError;
  stream.release = Release;
  // From here on the stream owns the state; Release() frees it.
  stream.private_data = state.release();
  return stream;
}
} // namespace
namespace internal {
/// \brief Check that the options in this context are mutually consistent.
///
/// Checks run in a fixed order and the first failing one is reported.
Status TableScanContext::Validate() const {
  const bool stats_selection_ok = columns_to_keep_stats.empty() || return_column_stats;
  ICEBERG_CHECK(stats_selection_ok,
                "Cannot select columns to keep stats when column stats are not returned");

  const bool single_projection = projected_schema == nullptr || selected_columns.empty();
  ICEBERG_CHECK(single_projection,
                "Cannot set projection schema and selected columns at the same time");

  const bool incremental_range_set =
      from_snapshot_id.has_value() || to_snapshot_id.has_value();
  ICEBERG_CHECK(!snapshot_id.has_value() || !incremental_range_set,
                "Cannot mix snapshot scan and incremental scan");

  const bool min_rows_ok =
      !min_rows_requested.has_value() || min_rows_requested.value() >= 0;
  ICEBERG_CHECK(min_rows_ok, "Min rows requested cannot be negative");

  return {};
}
} // namespace internal
/// \brief Destructor, defaulted out of line in this translation unit.
ScanTask::~ScanTask() = default;
// FileScanTask implementation
/// \brief Construct a scan task for one data file.
///
/// \param data_file Data file to scan; asserted non-null with ICEBERG_DCHECK.
/// \param delete_files Delete files stored with the task.
/// \param residual_filter Expression stored as the task's residual filter.
FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file,
std::vector<std::shared_ptr<DataFile>> delete_files,
std::shared_ptr<Expression> residual_filter)
: data_file_(std::move(data_file)),
delete_files_(std::move(delete_files)),
residual_filter_(std::move(residual_filter)) {
ICEBERG_DCHECK(data_file_ != nullptr, "Data file cannot be null for FileScanTask");
}
/// \brief Size of the task, taken from the data file's `file_size_in_bytes`.
int64_t FileScanTask::size_bytes() const {
  const auto& file = *data_file_;
  return file.file_size_in_bytes;
}

/// \brief Number of files in the task; always 1.
int32_t FileScanTask::files_count() const {
  return 1;
}

/// \brief Row estimate, taken from the data file's `record_count`.
int64_t FileScanTask::estimated_row_count() const {
  const auto& file = *data_file_;
  return file.record_count;
}
/// \brief Open the data file and expose its contents as an ArrowArrayStream.
///
/// \param io FileIO handed to the reader.
/// \param projected_schema Schema passed to the reader as its projection.
/// \return NotSupported if the task carries delete files; otherwise the error
/// from opening the reader or the resulting stream.
Result<ArrowArrayStream> FileScanTask::ToArrow(
    const std::shared_ptr<FileIO>& io, std::shared_ptr<Schema> projected_schema) const {
  const bool has_delete_files = !delete_files_.empty();
  if (has_delete_files) {
    return NotSupported("Reading data files with delete files is not yet supported.");
  }

  const auto& file = *data_file_;
  const ReaderOptions options{.path = file.file_path,
                              .length = file.file_size_in_bytes,
                              .io = io,
                              .projection = std::move(projected_schema),
                              .filter = residual_filter_};

  ICEBERG_ASSIGN_OR_RAISE(auto file_reader,
                          ReaderFactoryRegistry::Open(file.file_format, options));
  return MakeArrowArrayStream(std::move(file_reader));
}
/// \brief Create a scan builder for the given table metadata.
///
/// Both arguments are pre-checked and must be non-null.
Result<std::unique_ptr<TableScanBuilder>> TableScanBuilder::Make(
    std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io) {
  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");

  std::unique_ptr<TableScanBuilder> builder(
      new TableScanBuilder(std::move(metadata), std::move(io)));
  return std::move(builder);
}
/// \brief Store the table metadata and FileIO; no validation happens here
/// (TableScanBuilder::Make performs the null checks before constructing).
TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
std::shared_ptr<FileIO> file_io)
: metadata_(std::move(table_metadata)), io_(std::move(file_io)) {}
/// \brief Set a scan option, overwriting any value already stored under `key`.
TableScanBuilder& TableScanBuilder::Option(std::string key, std::string value) {
  auto& slot = context_.options[std::move(key)];
  slot = std::move(value);
  return *this;
}
/// \brief Set the projection schema in the scan context.
///
/// TableScanContext::Validate() rejects a context that has both a projection
/// schema and selected columns.
TableScanBuilder& TableScanBuilder::Project(std::shared_ptr<Schema> schema) {
context_.projected_schema = std::move(schema);
return *this;
}
/// \brief Record in the scan context whether the scan is case sensitive.
TableScanBuilder& TableScanBuilder::CaseSensitive(bool case_sensitive) {
context_.case_sensitive = case_sensitive;
return *this;
}
/// \brief Request that column stats be returned by the scan.
TableScanBuilder& TableScanBuilder::IncludeColumnStats() {
context_.return_column_stats = true;
return *this;
}
/// \brief Request column stats and restrict them to the named columns.
///
/// Replaces any previously recorded set of field ids. Names are looked up in
/// the snapshot schema; names that do not match a field are skipped.
TableScanBuilder& TableScanBuilder::IncludeColumnStats(
    const std::vector<std::string>& requested_columns) {
  context_.return_column_stats = true;

  auto& kept_field_ids = context_.columns_to_keep_stats;
  kept_field_ids.clear();
  kept_field_ids.reserve(requested_columns.size());

  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_ref, ResolveSnapshotSchema());
  const auto& snapshot_schema = schema_ref.get();
  for (const auto& name : requested_columns) {
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto maybe_field,
                                     snapshot_schema->FindFieldByName(name));
    if (!maybe_field.has_value()) {
      continue;
    }
    kept_field_ids.insert(maybe_field.value().get().field_id());
  }
  return *this;
}
/// \brief Replace the selected column names in the scan context.
///
/// TableScanContext::Validate() rejects a context that has both selected
/// columns and a projection schema.
TableScanBuilder& TableScanBuilder::Select(const std::vector<std::string>& column_names) {
context_.selected_columns = column_names;
return *this;
}
/// \brief Set the filter expression in the scan context, replacing any
/// previously set filter.
TableScanBuilder& TableScanBuilder::Filter(std::shared_ptr<Expression> filter) {
context_.filter = std::move(filter);
return *this;
}
/// \brief Set the `ignore_residuals` flag in the scan context.
TableScanBuilder& TableScanBuilder::IgnoreResiduals() {
context_.ignore_residuals = true;
return *this;
}
/// \brief Record the minimum number of rows requested.
///
/// The value is not checked here; TableScanContext::Validate() rejects
/// negative values.
TableScanBuilder& TableScanBuilder::MinRowsRequested(int64_t num_rows) {
context_.min_rows_requested = num_rows;
return *this;
}
/// \brief Scan the snapshot with the given id.
///
/// Records a builder error if a snapshot id has already been set or if the
/// table metadata has no snapshot with this id.
///
/// \param snapshot_id Id of the snapshot to scan.
TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
                        "Cannot override snapshot, already set snapshot id={}",
                        context_.snapshot_id.value());
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id));
  // A schema cached by an earlier ResolveSnapshotSchema() call was resolved
  // without this snapshot id; drop it so it is resolved again for the new
  // snapshot (UseRef does the same when switching back to the main branch).
  snapshot_schema_ = nullptr;
  context_.snapshot_id = snapshot_id;
  return *this;
}
/// \brief Scan the snapshot that the named reference points to.
///
/// Passing SnapshotRef::kMainBranch clears any snapshot id set earlier along
/// with the cached snapshot schema. For any other name, a builder error is
/// recorded if a snapshot id is already set, the reference is missing or
/// null, or its snapshot cannot be found in the table metadata.
///
/// \param ref Name of the reference to scan.
TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
  if (ref == SnapshotRef::kMainBranch) {
    snapshot_schema_ = nullptr;
    context_.snapshot_id.reset();
    return *this;
  }

  ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
                        "Cannot override ref, already set snapshot id={}",
                        context_.snapshot_id.value());
  auto iter = metadata_->refs.find(ref);
  ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref {}", ref);
  ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);

  // Snapshot ids are 64-bit (see UseSnapshot); holding the id in a 32-bit
  // integer would truncate it.
  const int64_t snapshot_id = iter->second->snapshot_id;
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id));
  // Drop any schema cached before the snapshot id changed so that it is
  // resolved again for this snapshot.
  snapshot_schema_ = nullptr;
  context_.snapshot_id = snapshot_id;
  return *this;
}
/// \brief Scan the snapshot that SnapshotUtil::SnapshotIdAsOfTime() resolves
/// for the given timestamp.
///
/// \param timestamp_millis Timestamp in milliseconds since the Unix epoch.
TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
  const auto as_of = TimePointMsFromUnixMs(timestamp_millis);
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto resolved_id,
                                   SnapshotUtil::SnapshotIdAsOfTime(*metadata_, as_of));
  // Delegating keeps the "snapshot already set" check in a single place.
  return UseSnapshot(resolved_id);
}
/// \brief Incremental-scan start by snapshot id; not implemented.
///
/// Both parameters are ignored and a NotImplemented error is added to the
/// builder.
TableScanBuilder& TableScanBuilder::FromSnapshot(
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}
/// \brief Incremental-scan start by reference name; not implemented.
///
/// Both parameters are ignored and a NotImplemented error is added to the
/// builder.
TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref,
[[maybe_unused]] bool inclusive) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}
/// \brief Incremental-scan end by snapshot id; not implemented.
///
/// The parameter is ignored and a NotImplemented error is added to the builder.
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}
/// \brief Incremental-scan end by reference name; not implemented.
///
/// The parameter is ignored and a NotImplemented error is added to the builder.
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}
/// \brief Store the branch name in the scan context.
///
/// The name is not validated against the table's refs here.
TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
context_.branch = branch;
return *this;
}
/// \brief Resolve, and cache in `snapshot_schema_`, the schema used by the scan.
///
/// With a snapshot id set, the schema is looked up by the snapshot's schema id
/// (Schema::kInitialSchemaId when the snapshot has none); otherwise the schema
/// returned by the table metadata's Schema() is used.
Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
TableScanBuilder::ResolveSnapshotSchema() {
  // Fast path: already resolved by an earlier call.
  if (snapshot_schema_ != nullptr) {
    return snapshot_schema_;
  }

  if (!context_.snapshot_id.has_value()) {
    ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, metadata_->Schema());
  } else {
    ICEBERG_ASSIGN_OR_RAISE(auto pinned_snapshot,
                            metadata_->SnapshotById(*context_.snapshot_id));
    const int32_t schema_id =
        pinned_snapshot->schema_id.value_or(Schema::kInitialSchemaId);
    ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, metadata_->SchemaById(schema_id));
  }

  ICEBERG_CHECK(snapshot_schema_ != nullptr, "Snapshot schema is null");
  return snapshot_schema_;
}
/// \brief Whether either end of an incremental range has been set.
bool TableScanBuilder::IsIncrementalScan() const {
  const bool has_start = context_.from_snapshot_id.has_value();
  const bool has_end = context_.to_snapshot_id.has_value();
  return has_start || has_end;
}
/// \brief Validate the accumulated configuration and create the scan.
///
/// Fails with the first recorded builder error, then with any context
/// validation error, then with NotImplemented for incremental scans.
/// The scan context is moved into the returned scan.
Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
  ICEBERG_RETURN_UNEXPECTED(context_.Validate());

  const bool incremental = IsIncrementalScan();
  if (incremental) {
    return NotImplemented("Incremental scan is not yet implemented");
  }

  ICEBERG_ASSIGN_OR_RAISE(auto schema_ref, ResolveSnapshotSchema());
  const auto& snapshot_schema = schema_ref.get();
  return DataTableScan::Make(metadata_, snapshot_schema, io_, std::move(context_));
}
/// \brief Store the table metadata, scan schema, FileIO and scan context.
///
/// No validation is performed in this constructor.
TableScan::TableScan(std::shared_ptr<TableMetadata> metadata,
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> file_io,
internal::TableScanContext context)
: metadata_(std::move(metadata)),
schema_(std::move(schema)),
io_(std::move(file_io)),
context_(std::move(context)) {}
/// \brief Destructor, defaulted out of line in this translation unit.
TableScan::~TableScan() = default;
/// \brief The table metadata this scan was created with.
const std::shared_ptr<TableMetadata>& TableScan::metadata() const { return metadata_; }
/// \brief The snapshot this scan reads.
///
/// Uses the snapshot id from the scan context when one is set, otherwise the
/// table's current snapshot id.
///
/// \return A null pointer if the chosen id is kInvalidSnapshotId; otherwise
/// the result of looking the id up in the table metadata.
Result<std::shared_ptr<Snapshot>> TableScan::snapshot() const {
  auto target_id = metadata_->current_snapshot_id;
  if (context_.snapshot_id.has_value()) {
    target_id = context_.snapshot_id.value();
  }

  if (target_id == kInvalidSnapshotId) {
    return std::shared_ptr<Snapshot>{nullptr};
  }
  return metadata_->SnapshotById(target_id);
}
/// \brief The projected schema of this scan, as computed by
/// ResolveProjectedSchema(); any error from that call is propagated.
Result<std::shared_ptr<Schema>> TableScan::schema() const {
return ResolveProjectedSchema();
}
/// \brief The scan context this scan was created with.
const internal::TableScanContext& TableScan::context() const { return context_; }
/// \brief The FileIO this scan was created with.
const std::shared_ptr<FileIO>& TableScan::io() const { return io_; }
/// \brief The filter expression of this scan.
///
/// \return The filter from the scan context, or a shared True expression when
/// no filter was set.
const std::shared_ptr<Expression>& TableScan::filter() const {
  // Function-local static so that a reference to it can be returned safely.
  static const std::shared_ptr<Expression> kAlwaysTrue = True::Instance();
  if (context_.filter != nullptr) {
    return context_.filter;
  }
  return kAlwaysTrue;
}
/// \brief The `case_sensitive` flag stored in the scan context.
bool TableScan::is_case_sensitive() const { return context_.case_sensitive; }
/// \brief Compute, and cache in `projected_schema_`, the schema this scan
/// projects.
///
/// Precedence: selected columns (plus every field the filter references),
/// then the explicit projection schema from the context, then the scan schema.
Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
TableScan::ResolveProjectedSchema() const {
  // Reuse the projection computed by an earlier call.
  if (projected_schema_ != nullptr) {
    return projected_schema_;
  }

  if (context_.selected_columns.empty()) {
    // No column selection: use the explicit projection if there is one,
    // otherwise the full scan schema.
    if (context_.projected_schema != nullptr) {
      projected_schema_ = context_.projected_schema;
    } else {
      projected_schema_ = schema_;
    }
    return projected_schema_;
  }

  std::unordered_set<int32_t> required_field_ids;

  // Fields referenced by the filter are kept in the projection. An unbound
  // filter is bound against the scan schema first so its field ids are known.
  if (context_.filter != nullptr) {
    ICEBERG_ASSIGN_OR_RAISE(auto already_bound,
                            IsBoundVisitor::IsBound(context_.filter));
    if (!already_bound) {
      ICEBERG_ASSIGN_OR_RAISE(
          auto bound_filter,
          Binder::Bind(*schema_, context_.filter, context_.case_sensitive));
      ICEBERG_ASSIGN_OR_RAISE(required_field_ids,
                              ReferenceVisitor::GetReferencedFieldIds(bound_filter));
    } else {
      ICEBERG_ASSIGN_OR_RAISE(required_field_ids,
                              ReferenceVisitor::GetReferencedFieldIds(context_.filter));
    }
  }

  // Add the fields picked through the column selection.
  ICEBERG_ASSIGN_OR_RAISE(
      auto selected_schema,
      schema_->Select(context_.selected_columns, context_.case_sensitive));
  ICEBERG_ASSIGN_OR_RAISE(auto selected_ids,
                          GetProjectedIdsVisitor::GetProjectedIds(
                              *selected_schema, /*include_struct_ids=*/true));
  required_field_ids.insert(selected_ids.begin(), selected_ids.end());

  ICEBERG_ASSIGN_OR_RAISE(projected_schema_, schema_->Project(required_field_ids));
  return projected_schema_;
}
/// \brief Column names for this scan: kScanColumnsWithStats when column stats
/// were requested, otherwise kScanColumns.
const std::vector<std::string>& TableScan::ScanColumns() const {
  if (context_.return_column_stats) {
    return kScanColumnsWithStats;
  }
  return kScanColumns;
}
/// \brief Create a DataTableScan.
///
/// `metadata`, `schema` and `io` are pre-checked and must be non-null.
Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
    std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
    std::shared_ptr<FileIO> io, internal::TableScanContext context) {
  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
  ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");

  std::unique_ptr<DataTableScan> scan(new DataTableScan(
      std::move(metadata), std::move(schema), std::move(io), std::move(context)));
  return std::move(scan);
}
/// \brief Forward all arguments to the TableScan base constructor.
DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> io,
internal::TableScanContext context)
: TableScan(std::move(metadata), std::move(schema), std::move(io),
std::move(context)) {}
/// \brief Plan the file scan tasks for this scan's snapshot.
///
/// \return An empty list when the scan has no snapshot; otherwise the tasks
/// planned by a ManifestGroup built from the snapshot's data and delete
/// manifests and configured from the scan context.
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
  ICEBERG_ASSIGN_OR_RAISE(auto scan_snapshot, this->snapshot());
  if (scan_snapshot == nullptr) {
    return std::vector<std::shared_ptr<FileScanTask>>{};
  }

  TableMetadataCache metadata_cache(metadata_.get());
  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());

  SnapshotCache snapshot_cache(scan_snapshot.get());
  ICEBERG_ASSIGN_OR_RAISE(auto data_manifests, snapshot_cache.DataManifests(io_));
  ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests, snapshot_cache.DeleteManifests(io_));

  ICEBERG_ASSIGN_OR_RAISE(
      auto group,
      ManifestGroup::Make(io_, schema_, specs_by_id,
                          {data_manifests.begin(), data_manifests.end()},
                          {delete_manifests.begin(), delete_manifests.end()}));

  // Apply the scan configuration to the manifest group.
  group->CaseSensitive(context_.case_sensitive)
      .Select(ScanColumns())
      .FilterData(filter())
      .IgnoreDeleted()
      .ColumnsToKeepStats(context_.columns_to_keep_stats);

  const bool skip_residuals = context_.ignore_residuals;
  if (skip_residuals) {
    group->IgnoreResiduals();
  }

  return group->PlanFiles();
}
} // namespace iceberg