blob: b5a9b15d48d837657a0dfac070ff141f1850c20a [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"cmp"
"context"
"fmt"
"iter"
"math"
"slices"
"strings"
"sync"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
"golang.org/x/sync/errgroup"
)
// ScanNoLimit is a sentinel row-limit value.
// NOTE(review): presumably assigned to Scan.limit (e.g. via UseRowLimit) to
// request an unlimited read; confirm against how arrowScan interprets rowLimit.
const ScanNoLimit = -1
// keyDefaultMap is a lazily populated map guarded by a RWMutex: the first Get
// for a key computes its value with defaultFactory and caches it, and later
// Gets for that key return the cached value.
type keyDefaultMap[K comparable, V any] struct {
	defaultFactory func(K) V
	data           map[K]V
	mx             sync.RWMutex
}

// Get returns the cached value for key. If the key is absent, it calls
// defaultFactory while holding the write lock, stores the result, and
// returns it.
func (k *keyDefaultMap[K, V]) Get(key K) V {
	k.mx.RLock()
	cached, found := k.data[key]
	k.mx.RUnlock()
	if found {
		return cached
	}

	k.mx.Lock()
	defer k.mx.Unlock()

	// Another goroutine may have filled the entry between RUnlock and Lock.
	if cached, found = k.data[key]; found {
		return cached
	}

	created := k.defaultFactory(key)
	k.data[key] = created
	return created
}

// newKeyDefaultMap returns an empty keyDefaultMap that fills missing keys
// with factory.
func newKeyDefaultMap[K comparable, V any](factory func(K) V) *keyDefaultMap[K, V] {
	return &keyDefaultMap[K, V]{
		defaultFactory: factory,
		data:           map[K]V{},
	}
}

// newKeyDefaultMapWrapErr is like newKeyDefaultMap for a fallible factory.
// A non-nil error from factory is raised as a panic (with the error as the
// panic value) from inside Get, and nothing is cached for that key.
// NOTE(review): callers invoking Get from goroutines cannot recover such a
// panic from the spawning goroutine.
func newKeyDefaultMapWrapErr[K comparable, V any](factory func(K) (V, error)) *keyDefaultMap[K, V] {
	mustFactory := func(key K) V {
		val, err := factory(key)
		if err != nil {
			panic(err)
		}
		return val
	}
	return newKeyDefaultMap(mustFactory)
}
// partitionRecord is a slice-backed positional record of partition values,
// ordered like the fields of a partition struct type.
type partitionRecord []any

// Size reports how many values the record holds.
func (r partitionRecord) Size() int { return len(r) }

// Get returns the value at pos; it panics if pos is out of range.
func (r partitionRecord) Get(pos int) any { return r[pos] }

// Set overwrites the value at pos in the shared backing array.
func (r partitionRecord) Set(pos int, val any) { r[pos] = val }
// manifestEntries accumulates the manifest entries read for a scan, bucketed
// by kind: data files, positional delete files, equality delete files and
// deletion vectors. The add* methods serialize access through mu, so they can
// be called from multiple goroutines.
type manifestEntries struct {
	dataEntries             []iceberg.ManifestEntry
	positionalDeleteEntries []iceberg.ManifestEntry
	equalityDeleteEntries   []iceberg.ManifestEntry
	dvEntries               []iceberg.ManifestEntry
	mu                      sync.Mutex
}

// newManifestEntries returns an empty accumulator whose buckets are non-nil.
func newManifestEntries() *manifestEntries {
	return &manifestEntries{
		dataEntries:             []iceberg.ManifestEntry{},
		positionalDeleteEntries: []iceberg.ManifestEntry{},
		equalityDeleteEntries:   []iceberg.ManifestEntry{},
		dvEntries:               []iceberg.ManifestEntry{},
	}
}

// appendLocked appends e to *bucket while holding m.mu.
func (m *manifestEntries) appendLocked(bucket *[]iceberg.ManifestEntry, e iceberg.ManifestEntry) {
	m.mu.Lock()
	defer m.mu.Unlock()
	*bucket = append(*bucket, e)
}

// addDataEntry records a data-file entry.
func (m *manifestEntries) addDataEntry(e iceberg.ManifestEntry) {
	m.appendLocked(&m.dataEntries, e)
}

// addPositionalDeleteEntry records a positional-delete entry.
func (m *manifestEntries) addPositionalDeleteEntry(e iceberg.ManifestEntry) {
	m.appendLocked(&m.positionalDeleteEntries, e)
}

// addEqualityDeleteEntry records an equality-delete entry.
func (m *manifestEntries) addEqualityDeleteEntry(e iceberg.ManifestEntry) {
	m.appendLocked(&m.equalityDeleteEntries, e)
}

// addDVEntry records a deletion-vector entry.
func (m *manifestEntries) addDVEntry(e iceberg.ManifestEntry) {
	m.appendLocked(&m.dvEntries, e)
}
// newPartitionRecord lays out partitionData (keyed by partition field ID) as a
// positional record following partitionType's field order. Fields missing
// from partitionData become nil.
func newPartitionRecord(partitionData map[int]any, partitionType *iceberg.StructType) partitionRecord {
	fields := partitionType.FieldList
	rec := make(partitionRecord, len(fields))
	for idx := range fields {
		rec[idx] = partitionData[fields[idx].ID]
	}
	return rec
}
// GetPartitionRecord returns dataFile's partition values as a positional
// iceberg.StructLike whose field order follows partitionType.
func GetPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.StructType) iceberg.StructLike {
	values := dataFile.Partition()
	return newPartitionRecord(values, partitionType)
}
// openManifest reads every entry of manifest through fs and keeps those whose
// data file passes both partitionFilter and metricsEval. Both predicates are
// evaluated for every entry. The first read or evaluation error aborts the
// scan of this manifest and is returned.
func openManifest(fs io.IO, manifest iceberg.ManifestFile,
	partitionFilter, metricsEval func(iceberg.DataFile) (bool, error),
) ([]iceberg.ManifestEntry, error) {
	// Counts may be -1 (unset) on V1 manifests, so clamp before allocating.
	expected := int(manifest.AddedDataFiles()) + int(manifest.ExistingDataFiles())
	kept := make([]iceberg.ManifestEntry, 0, max(0, expected))

	for entry, err := range manifest.Entries(fs, true) {
		if err != nil {
			return nil, err
		}
		df := entry.DataFile()

		inPartition, err := partitionFilter(df)
		if err != nil {
			return nil, err
		}
		matchesMetrics, err := metricsEval(df)
		if err != nil {
			return nil, err
		}
		if !inPartition || !matchesMetrics {
			continue
		}
		kept = append(kept, entry)
	}
	return kept, nil
}
// isDeletionVector reports whether df is treated as a deletion vector. Here
// that means it names a referenced data file.
func isDeletionVector(df iceberg.DataFile) bool {
	ref := df.ReferencedDataFile()
	return ref != nil
}
// Scan describes a read of a table: the snapshot to read, the projected
// columns, the row filter and read options. UseRowLimit and UseRef return
// modified copies. Note that PlanFiles may update snapshotID and
// asOfTimestamp on the receiver in place.
type Scan struct {
	// metadata is the table metadata used to resolve snapshots, schemas and
	// partition specs.
	metadata Metadata
	// ioF opens the file IO used to read manifests and data files.
	ioF FSysF
	// rowFilter is projected onto partition specs for pruning and bound to
	// the current schema when reading.
	rowFilter iceberg.BooleanExpression
	// selectedFields are the column names to project; "*" selects all.
	selectedFields []string
	// caseSensitive controls name matching when binding and selecting fields.
	caseSensitive bool
	// snapshotID, when set, pins the scan to a specific snapshot.
	snapshotID *int64
	// asOfTimestamp, when set, selects the last snapshot-log entry whose
	// TimestampMs is <= this value.
	asOfTimestamp *int64
	// options holds read options (e.g. "include_empty_files").
	options iceberg.Properties
	// limit is passed to the Arrow reader as its row limit.
	limit int64
	// partitionFilters lazily caches, per partition spec ID, the row filter
	// projected onto that spec (see UseRef).
	partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression]
	// concurrency bounds parallel manifest reads and is passed to the Arrow
	// reader.
	concurrency int
}
// UseRowLimit returns a copy of the scan whose row limit is n. The receiver
// is not modified.
func (scan *Scan) UseRowLimit(n int64) *Scan {
	limited := new(Scan)
	*limited = *scan
	limited.limit = n
	return limited
}
// UseRef returns a copy of the scan pinned to the snapshot that the named
// ref (branch or tag) points at. It fails with iceberg.ErrInvalidArgument if
// the scan already has a snapshot ID or the ref is unknown.
func (scan *Scan) UseRef(name string) (*Scan, error) {
	if scan.snapshotID != nil {
		return nil, fmt.Errorf("%w: cannot override ref, already set snapshot id %d",
			iceberg.ErrInvalidArgument, *scan.snapshotID)
	}

	snap := scan.metadata.SnapshotByName(name)
	if snap == nil {
		return nil, fmt.Errorf("%w: cannot scan unknown ref=%s", iceberg.ErrInvalidArgument, name)
	}

	out := *scan
	out.snapshotID = &snap.SnapshotID
	// Give the copy its own lazily built partition-filter cache.
	out.partitionFilters = newKeyDefaultMapWrapErr(out.buildPartitionProjection)
	return &out, nil
}
// Snapshot returns the snapshot this scan reads, chosen as follows:
//   - the snapshot pinned by snapshotID, if set;
//   - otherwise, when asOfTimestamp is set, the snapshot of the last
//     snapshot-log entry with TimestampMs <= asOfTimestamp;
//   - otherwise, including when no log entry qualifies, the current snapshot.
//
// The result may be nil, so callers must check it.
func (scan *Scan) Snapshot() *Snapshot {
	switch {
	case scan.snapshotID != nil:
		return scan.metadata.SnapshotByID(*scan.snapshotID)
	case scan.asOfTimestamp != nil:
		logs := slices.Collect(scan.metadata.SnapshotLogs())
		for _, entry := range slices.Backward(logs) {
			if entry.TimestampMs <= *scan.asOfTimestamp {
				return scan.metadata.SnapshotByID(entry.SnapshotID)
			}
		}
	}
	return scan.metadata.CurrentSnapshot()
}
// Projection returns the schema produced by applying the scan's selected
// fields to the schema being read.
//
// The base schema is the current schema, except when snapshotID is set: then
// the snapshot's schema (looked up by its SchemaID among the metadata's
// schemas) is used if found. An unknown snapshotID yields an error wrapping
// ErrInvalidOperation. Selecting "*" returns the base schema unchanged.
//
// Row-lineage columns (iceberg.RowIDColumnName and
// iceberg.LastUpdatedSequenceNumberColumnName) that are selected but absent
// from the base schema are appended as synthesized fields, but only when
// metadata.Version() is at least minFormatVersionRowLineage.
//
// NOTE(review): asOfTimestamp is not consulted here; time-travel scans rely
// on PlanFiles having already resolved it into snapshotID.
func (scan *Scan) Projection() (*iceberg.Schema, error) {
	curSchema := scan.metadata.CurrentSchema()
	curVersion := scan.metadata.Version()
	caseSensitive := scan.caseSensitive
	if scan.snapshotID != nil {
		snap := scan.metadata.SnapshotByID(*scan.snapshotID)
		if snap == nil {
			return nil, fmt.Errorf("%w: snapshot not found: %d", ErrInvalidOperation, *scan.snapshotID)
		}
		// Prefer the schema the snapshot references; if that ID is not among
		// the known schemas, the current schema is kept.
		if snap.SchemaID != nil {
			for _, schema := range scan.metadata.Schemas() {
				if schema.ID == *snap.SchemaID {
					curSchema = schema
					break
				}
			}
		}
	}
	if slices.Contains(scan.selectedFields, "*") {
		return curSchema, nil
	}
	// Row-lineage names that were selected but that the schema lacks must be
	// synthesized rather than selected.
	selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, caseSensitive)
	schemaMeta := metaFieldsFromSchema(curSchema)
	synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
	if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
		// synthesis path: select the real columns, then append the
		// synthesized metadata fields after them.
		removedMetaSlice, missingMetaFields := removeMetadataFromSelectedFields(scan.selectedFields, synthesisMeta)
		sch, err := curSchema.Select(scan.caseSensitive, removedMetaSlice...)
		if err != nil {
			return nil, err
		}
		return iceberg.NewSchemaWithIdentifiers(sch.ID, sch.IdentifierFieldIDs, append(sch.Fields(), missingMetaFields...)...), nil
	}
	return curSchema.Select(scan.caseSensitive, scan.selectedFields...)
}
// buildPartitionProjection projects the scan's row filter onto the partition
// spec with the given ID, using the scan's case sensitivity.
func (scan *Scan) buildPartitionProjection(specID int) (iceberg.BooleanExpression, error) {
	meta, filter, cs := scan.metadata, scan.rowFilter, scan.caseSensitive
	return buildPartitionProjection(specID, meta, filter, cs)
}
// buildPartitionProjection inclusively projects rowFilter onto the partition
// spec specID, evaluated against the current schema. It returns an error
// wrapping ErrPartitionSpecNotFound if the spec does not exist.
func buildPartitionProjection(specID int, meta Metadata, rowFilter iceberg.BooleanExpression, caseSensitive bool) (iceberg.BooleanExpression, error) {
	if spec := meta.PartitionSpecByID(specID); spec != nil {
		project := newInclusiveProjection(meta.CurrentSchema(), *spec, caseSensitive)
		return project(rowFilter)
	}
	return nil, fmt.Errorf("%w: id %d", ErrPartitionSpecNotFound, specID)
}
// buildManifestEvaluator returns a manifest-level pruning predicate for the
// partition spec specID, based on the scan's cached partition filters.
func (scan *Scan) buildManifestEvaluator(specID int) (func(iceberg.ManifestFile) (bool, error), error) {
	meta, filters, cs := scan.metadata, scan.partitionFilters, scan.caseSensitive
	return buildManifestEvaluator(specID, meta, filters, cs)
}
// buildManifestEvaluator builds a predicate that decides whether a manifest
// written with partition spec specID may contain matching files, using that
// spec's projected partition filter. It returns an error wrapping
// ErrPartitionSpecNotFound if the spec does not exist.
func buildManifestEvaluator(specID int, metadata Metadata, partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression], caseSensitive bool) (func(iceberg.ManifestFile) (bool, error), error) {
	if spec := metadata.PartitionSpecByID(specID); spec != nil {
		filter := partitionFilters.Get(specID)
		return newManifestEvaluator(*spec, metadata.CurrentSchema(), filter, caseSensitive)
	}
	return nil, fmt.Errorf("%w: id %d", ErrPartitionSpecNotFound, specID)
}
// buildPartitionEvaluator returns a data-file partition predicate for the
// partition spec specID, based on the scan's cached partition filters.
func (scan *Scan) buildPartitionEvaluator(specID int) (func(iceberg.DataFile) (bool, error), error) {
	meta, filters, cs := scan.metadata, scan.partitionFilters, scan.caseSensitive
	return buildPartitionEvaluator(specID, meta, filters, cs)
}
// buildPartitionEvaluator builds a predicate that evaluates the projected
// partition filter of spec specID against a data file's partition values. It
// returns an error wrapping ErrPartitionSpecNotFound if the spec does not
// exist, or the error from building the expression evaluator.
func buildPartitionEvaluator(specID int, metadata Metadata, partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression], caseSensitive bool) (func(iceberg.DataFile) (bool, error), error) {
	spec := metadata.PartitionSpecByID(specID)
	if spec == nil {
		return nil, fmt.Errorf("%w: id %d", ErrPartitionSpecNotFound, specID)
	}

	partType := spec.PartitionType(metadata.CurrentSchema())
	evalFn, err := iceberg.ExpressionEvaluator(
		iceberg.NewSchema(0, partType.FieldList...),
		partitionFilters.Get(specID),
		caseSensitive,
	)
	if err != nil {
		return nil, err
	}

	evaluate := func(df iceberg.DataFile) (bool, error) {
		return evalFn(GetPartitionRecord(df, partType))
	}
	return evaluate, nil
}
// checkSequenceNumber reports whether a manifest is worth reading. Data
// manifests always are. Delete manifests are read only if their sequence
// number is at least minSeqNum. Any other content type is skipped.
func (scan *Scan) checkSequenceNumber(minSeqNum int64, manifest iceberg.ManifestFile) bool {
	switch manifest.ManifestContent() {
	case iceberg.ManifestContentData:
		return true
	case iceberg.ManifestContentDeletes:
		return manifest.SequenceNum() >= minSeqNum
	default:
		return false
	}
}
// minSequenceNum returns the smallest MinSequenceNum among the data
// manifests in manifests. It returns 0 if there are no data manifests.
func minSequenceNum(manifests []iceberg.ManifestFile) int64 {
	lowest := int64(math.MaxInt64)
	for _, mf := range manifests {
		if mf.ManifestContent() != iceberg.ManifestContentData {
			continue
		}
		if seq := mf.MinSequenceNum(); seq < lowest {
			lowest = seq
		}
	}
	if lowest == math.MaxInt64 {
		// No data manifest lowered the sentinel.
		return 0
	}
	return lowest
}
// matchDeletesToData returns the positional delete files that may apply to
// entry's data file. It considers only deletes whose sequence number is >=
// the data entry's, then keeps those whose file_path metrics could include
// the data file's path.
//
// positionalDeletes must be sorted by sequence number, because a binary
// search locates the first candidate.
func matchDeletesToData(entry iceberg.ManifestEntry, positionalDeletes []iceberg.ManifestEntry) ([]iceberg.DataFile, error) {
	start, _ := slices.BinarySearchFunc(positionalDeletes, entry, func(del, target iceberg.ManifestEntry) int {
		return cmp.Compare(del.SequenceNum(), target.SequenceNum())
	})

	pathFilter := iceberg.EqualTo(iceberg.Reference("file_path"), entry.DataFile().FilePath())
	couldMatch, err := newInclusiveMetricsEvaluator(iceberg.PositionalDeleteSchema, pathFilter, true, false)
	if err != nil {
		return nil, err
	}

	matched := make([]iceberg.DataFile, 0)
	for _, del := range positionalDeletes[start:] {
		df := del.DataFile()
		ok, err := couldMatch(df)
		if err != nil {
			return nil, err
		}
		if ok {
			matched = append(matched, df)
		}
	}
	return matched, nil
}
// matchEqualityDeletesToData returns the equality delete files that apply to
// dataEntry. A delete applies when both of these hold:
//   - its sequence number is strictly greater than the data entry's;
//   - it has the same partition as the data file. A delete with an empty
//     partition, or a data file with an empty partition, skips this check.
//
// The strict sequence-number rule keeps data files committed in the same
// snapshot as the deletes unaffected. That is how a RowDelta can add new rows
// alongside deletes for old rows.
func matchEqualityDeletesToData(dataEntry iceberg.ManifestEntry, eqDeleteEntries []iceberg.ManifestEntry) []iceberg.DataFile {
	dataSeq := dataEntry.SequenceNum()
	dataPart := dataEntry.DataFile().Partition()

	applies := func(del iceberg.ManifestEntry) bool {
		if del.SequenceNum() <= dataSeq {
			return false
		}
		delPart := del.DataFile().Partition()
		if len(delPart) == 0 || len(dataPart) == 0 {
			return true
		}
		return partitionsMatch(dataPart, delPart)
	}

	matched := make([]iceberg.DataFile, 0)
	for _, del := range eqDeleteEntries {
		if applies(del) {
			matched = append(matched, del.DataFile())
		}
	}
	return matched
}
// partitionsMatch reports whether two partition maps (field ID -> value)
// contain the same field IDs with equal values.
//
// Values are compared with ==, except []byte values, which are compared by
// content. Comparing two interfaces that both hold []byte with == panics at
// runtime, because slices are not comparable. NOTE(review): binary/fixed
// partition values are presumably carried as []byte; other uncomparable
// dynamic types (maps, other slices) would still panic.
func partitionsMatch(a, b map[int]any) bool {
	if len(a) != len(b) {
		return false
	}
	for k, av := range a {
		bv, ok := b[k]
		if !ok || !partitionValuesEqual(av, bv) {
			return false
		}
	}
	return true
}

// partitionValuesEqual compares two partition values, treating []byte by
// content instead of by (panicking) interface equality.
func partitionValuesEqual(a, b any) bool {
	if ab, ok := a.([]byte); ok {
		bb, ok := b.([]byte)
		return ok && string(ab) == string(bb)
	}
	// Differing dynamic types compare unequal without panicking.
	return a == b
}
// fetchPartitionSpecFilteredManifests lists the manifests of the scan's
// snapshot (see Scan.Snapshot) and drops those that the per-partition-spec
// manifest evaluators prove cannot contain matching files. It returns
// (nil, nil) when the scan has no snapshot.
//
// An evaluator error is returned to the caller. Silently dropping the
// manifest would make the scan skip data and return incomplete results.
func (scan *Scan) fetchPartitionSpecFilteredManifests(ctx context.Context) ([]iceberg.ManifestFile, error) {
	snap := scan.Snapshot()
	if snap == nil {
		return nil, nil
	}

	afs, err := scan.ioF(ctx)
	if err != nil {
		return nil, err
	}

	// Fetch all manifests for the selected snapshot.
	manifestList, err := snap.Manifests(afs)
	if err != nil {
		return nil, err
	}

	// Build per-spec manifest evaluators and filter out irrelevant manifests.
	manifestEvaluators := newKeyDefaultMapWrapErr(scan.buildManifestEvaluator)
	var evalErr error
	manifestList = slices.DeleteFunc(manifestList, func(mf iceberg.ManifestFile) bool {
		if evalErr != nil {
			// The result is discarded once an error occurred.
			return true
		}
		eval := manifestEvaluators.Get(int(mf.PartitionSpecID()))
		use, err := eval(mf)
		if err != nil {
			evalErr = fmt.Errorf("evaluating manifest with partition spec %d: %w", mf.PartitionSpecID(), err)
			return true
		}
		return !use
	})
	if evalErr != nil {
		return nil, evalErr
	}
	return manifestList, nil
}
// collectManifestEntries concurrently opens manifests, applies the partition
// and metrics filters, and sorts the surviving entries into data,
// positional-delete, equality-delete and deletion-vector buckets. Delete
// manifests whose sequence number is below the smallest data-manifest
// sequence number are skipped.
//
// Reads run on an errgroup bounded by scan.concurrency. After the first
// failure, or when ctx is cancelled, workers that have not started yet return
// immediately instead of reading their manifests.
func (scan *Scan) collectManifestEntries(
	ctx context.Context,
	manifestList []iceberg.ManifestFile,
) (*manifestEntries, error) {
	metricsEval, err := newInclusiveMetricsEvaluator(
		scan.metadata.CurrentSchema(),
		scan.rowFilter,
		scan.caseSensitive,
		scan.options["include_empty_files"] == "true",
	)
	if err != nil {
		return nil, err
	}

	minSeqNum := minSequenceNum(manifestList)
	entries := newManifestEntries()

	g, gctx := errgroup.WithContext(ctx)
	// errgroup treats a limit of 0 as "start no goroutines", which would make
	// g.Go block forever (e.g. when scan.concurrency is unset). Fall back to
	// sequential reads in that case. Negative values keep errgroup's
	// "no limit" meaning.
	concurrencyLimit := min(scan.concurrency, len(manifestList))
	if concurrencyLimit == 0 {
		concurrencyLimit = 1
	}
	g.SetLimit(concurrencyLimit)

	partitionEvaluators := newKeyDefaultMapWrapErr(scan.buildPartitionEvaluator)
	for _, mf := range manifestList {
		if !scan.checkSequenceNumber(minSeqNum, mf) {
			continue
		}
		g.Go(func() error {
			// Stop early once another worker failed or ctx was cancelled.
			if err := gctx.Err(); err != nil {
				return err
			}
			fs, err := scan.ioF(ctx)
			if err != nil {
				return err
			}
			partEval := partitionEvaluators.Get(int(mf.PartitionSpecID()))
			manifestEntries, err := openManifest(fs, mf, partEval, metricsEval)
			if err != nil {
				return err
			}
			for _, e := range manifestEntries {
				df := e.DataFile()
				switch df.ContentType() {
				case iceberg.EntryContentData:
					entries.addDataEntry(e)
				case iceberg.EntryContentPosDeletes:
					if isDeletionVector(df) {
						entries.addDVEntry(e)
					} else {
						entries.addPositionalDeleteEntry(e)
					}
				case iceberg.EntryContentEqDeletes:
					entries.addEqualityDeleteEntry(e)
				default:
					return fmt.Errorf("%w: unknown DataFileContent type (%s): %s",
						ErrInvalidMetadata, df.ContentType(), e)
				}
			}
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return entries, nil
}
// PlanFiles fetches and filters the scan's manifests and builds one
// FileScanTask per matching data file. Each task carries the positional
// deletes, equality deletes and deletion vectors matched to that file.
//
// When asOfTimestamp is set, PlanFiles resolves it to the snapshot of the
// last snapshot-log entry at or before that timestamp. It records the result
// on the receiver (snapshotID is set and asOfTimestamp cleared), so that
// later calls on the same Scan, such as Projection in ReadTasks, use the same
// snapshot. If no snapshot matches the timestamp, it returns an error
// wrapping iceberg.ErrInvalidArgument. When there is nothing to read, it
// returns a nil slice and a nil error.
func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
	if scan.asOfTimestamp != nil {
		var snapshot *Snapshot
		entries := slices.Collect(scan.metadata.SnapshotLogs())
		for i := len(entries) - 1; i >= 0; i-- {
			entry := entries[i]
			if entry.TimestampMs <= *scan.asOfTimestamp {
				snapshot = scan.metadata.SnapshotByID(entry.SnapshotID)
				break
			}
		}
		if snapshot == nil {
			return nil, fmt.Errorf("%w: no snapshot found for timestamp %d",
				iceberg.ErrInvalidArgument, *scan.asOfTimestamp)
		}
		// Pin the resolved snapshot on the receiver. Projection only looks at
		// snapshotID when choosing the schema to read with.
		scan.snapshotID = &snapshot.SnapshotID
		scan.asOfTimestamp = nil
	}

	// Step 1: Retrieve filtered manifests based on snapshot and partition specs.
	manifestList, err := scan.fetchPartitionSpecFilteredManifests(ctx)
	if err != nil || len(manifestList) == 0 {
		return nil, err
	}

	// Step 2: Read manifest entries concurrently, bucketing data files,
	// positional deletes, equality deletes and deletion vectors.
	entries, err := scan.collectManifestEntries(ctx, manifestList)
	if err != nil {
		return nil, err
	}

	// Step 3: Sort positional deletes by sequence number; matchDeletesToData
	// binary-searches this order.
	slices.SortFunc(entries.positionalDeleteEntries, func(a, b iceberg.ManifestEntry) int {
		return cmp.Compare(a.SequenceNum(), b.SequenceNum())
	})

	// Index DVs by referenced data file path for O(1) lookup.
	dvIndex := make(map[string][]iceberg.DataFile, len(entries.dvEntries))
	for _, del := range entries.dvEntries {
		if ref := del.DataFile().ReferencedDataFile(); ref != nil {
			dvIndex[*ref] = append(dvIndex[*ref], del.DataFile())
		}
	}

	results := make([]FileScanTask, 0, len(entries.dataEntries))
	for _, e := range entries.dataEntries {
		df := e.DataFile()
		deleteFiles, err := matchDeletesToData(e, entries.positionalDeleteEntries)
		if err != nil {
			return nil, err
		}
		results = append(results, FileScanTask{
			File:                df,
			DeleteFiles:         deleteFiles,
			EqualityDeleteFiles: matchEqualityDeletesToData(e, entries.equalityDeleteEntries),
			DeletionVectorFiles: dvIndex[df.FilePath()],
			Start:               0,
			Length:              df.FileSizeBytes(),
			// Row lineage constants: readers use these to synthesize _row_id
			// and _last_updated_sequence_number when requested.
			FirstRowID:         df.FirstRowID(),
			DataSequenceNumber: e.FileSequenceNum(),
		})
	}
	return results, nil
}
// FileScanTask is one unit of read work produced by PlanFiles: a data file
// together with the delete files matched to it.
type FileScanTask struct {
	// File is the data file to read.
	File                iceberg.DataFile
	DeleteFiles         []iceberg.DataFile // positional delete files
	EqualityDeleteFiles []iceberg.DataFile // equality delete files
	DeletionVectorFiles []iceberg.DataFile // deletion vectors (puffin files)
	// Start and Length give the byte range of File to read. PlanFiles sets
	// them to 0 and the file's full size.
	Start, Length int64
	// Row lineage (v3): constants used when reading to synthesize _row_id and _last_updated_sequence_number.
	// FirstRowID is the effective first_row_id for this file (from manifest entry, after inheritance).
	// DataSequenceNumber is the data sequence number of the file's manifest entry.
	FirstRowID         *int64
	DataSequenceNumber *int64
}
// ToArrowRecords plans the scan and returns the Arrow schema of the expected
// records together with an iterator that yields record batches as they become
// available.
//
// Errors during planning and setup are returned directly. Errors that occur
// while reading are yielded by the iterator.
//
// The schema is returned up front so that it is known even when the scan
// produces no rows.
func (scan *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema, iter.Seq2[arrow.RecordBatch, error], error) {
	tasks, planErr := scan.PlanFiles(ctx)
	if planErr != nil {
		return nil, nil, planErr
	}
	return scan.ReadTasks(ctx, tasks)
}
// ReadTasks reads Arrow records from the given FileScanTasks. It applies the
// scan's projection, row filter (bound against the current schema) and
// delete handling. This is useful when the caller has already planned or
// selected the tasks to read.
func (scan *Scan) ReadTasks(ctx context.Context, tasks []FileScanTask) (*arrow.Schema, iter.Seq2[arrow.RecordBatch, error], error) {
	var boundFilter iceberg.BooleanExpression
	if scan.rowFilter != nil {
		bound, bindErr := iceberg.BindExpr(scan.metadata.CurrentSchema(), scan.rowFilter, scan.caseSensitive)
		if bindErr != nil {
			return nil, nil, bindErr
		}
		boundFilter = bound
	}

	projected, err := scan.Projection()
	if err != nil {
		return nil, nil, err
	}

	fs, err := scan.ioF(ctx)
	if err != nil {
		return nil, nil, err
	}

	reader := &arrowScan{
		metadata:        scan.metadata,
		fs:              fs,
		projectedSchema: projected,
		boundRowFilter:  boundFilter,
		caseSensitive:   scan.caseSensitive,
		rowLimit:        scan.limit,
		options:         scan.options,
		concurrency:     scan.concurrency,
	}
	return reader.GetRecords(ctx, tasks)
}
// ToArrowTable calls ToArrowRecords, gathers every record batch, and returns
// an arrow.Table made from those records. The first read error is returned.
// All batches collected so far are released when the function returns.
func (scan *Scan) ToArrowTable(ctx context.Context) (arrow.Table, error) {
	schema, itr, err := scan.ToArrowRecords(ctx)
	if err != nil {
		return nil, err
	}

	records := make([]arrow.RecordBatch, 0)
	// Drop our references after the table is built (or on early return).
	// NOTE(review): this assumes NewTableFromRecords retains the records it
	// uses.
	defer func() {
		for _, rec := range records {
			rec.Release()
		}
	}()

	for rec, err := range itr {
		if err != nil {
			return nil, err
		}
		records = append(records, rec)
	}
	return array.NewTableFromRecords(schema, records), nil
}
// removeMetadataFromSelectedFields splits selectedFields into the fields to
// select from the schema and the row-lineage metadata fields to synthesize.
// A field counts as metadata when its lower-cased name is in metaFields
// (which are expected lower-cased, as produced by metaFieldsFromSelectedFields).
// For each such field, the matching iceberg.NestedField (RowID or
// LastUpdatedSequenceNumber) is appended to the second result. Both results
// are non-nil.
func removeMetadataFromSelectedFields(selectedFields []string, metaFields []string) ([]string, []iceberg.NestedField) {
	kept := []string{}
	synthesized := []iceberg.NestedField{}
	for _, field := range selectedFields {
		lowered := strings.ToLower(field)
		if !slices.Contains(metaFields, lowered) {
			kept = append(kept, field)
			continue
		}
		switch lowered {
		case iceberg.LastUpdatedSequenceNumberColumnName:
			synthesized = append(synthesized, iceberg.LastUpdatedSequenceNumber())
		case iceberg.RowIDColumnName:
			synthesized = append(synthesized, iceberg.RowID())
		}
	}
	return kept, synthesized
}
// metaFieldsFromSelectedFields returns, in selection order and lower-cased,
// the selected fields that name a row-lineage column
// (iceberg.RowIDColumnName or iceberg.LastUpdatedSequenceNumberColumnName).
// Matching is exact when caseSensitive is true and case-insensitive
// otherwise. The result is never nil.
func metaFieldsFromSelectedFields(selectedFields []string, caseSensitive bool) []string {
	matches := func(field, name string) bool {
		if caseSensitive {
			return field == name
		}
		return strings.EqualFold(field, name)
	}

	meta := []string{}
	for _, field := range selectedFields {
		if matches(field, iceberg.RowIDColumnName) || matches(field, iceberg.LastUpdatedSequenceNumberColumnName) {
			meta = append(meta, strings.ToLower(field))
		}
	}
	return meta
}
// metaFieldsFromSchema returns the row-lineage column names that sch already
// defines: iceberg.RowIDColumnName first, then
// iceberg.LastUpdatedSequenceNumberColumnName, each only if present. The
// result is never nil.
func metaFieldsFromSchema(sch *iceberg.Schema) []string {
	meta := []string{}
	for _, name := range []string{iceberg.RowIDColumnName, iceberg.LastUpdatedSequenceNumberColumnName} {
		if _, ok := sch.FindFieldByName(name); ok {
			meta = append(meta, name)
		}
	}
	return meta
}
// synthesizeMeta returns the names in selectedFieldsMeta that are absent from
// schemaMeta, i.e. the metadata columns that must be synthesized. Order
// follows selectedFieldsMeta. The result is never nil.
func synthesizeMeta(selectedFieldsMeta []string, schemaMeta []string) []string {
	missing := make([]string, 0, len(selectedFieldsMeta))
	for _, name := range selectedFieldsMeta {
		if slices.Contains(schemaMeta, name) {
			continue
		}
		missing = append(missing, name)
	}
	return missing
}