blob: 3dd78057b132206a194ba7912329bcf7c0cab330 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"iter"
"log"
"math/rand/v2"
"runtime"
"slices"
"strings"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/internal"
icebergio "github.com/apache/iceberg-go/io"
tblutils "github.com/apache/iceberg-go/table/internal"
"github.com/klauspost/compress/zstd"
"golang.org/x/sync/errgroup"
)
// Sentinel errors surfaced by the commit path and by metadata lookups.
// They are usually returned wrapped, so match them with errors.Is rather
// than by comparing error strings.
var (
	// ErrCommitFailed is the sentinel error returned by catalogs when a
	// commit fails due to a concurrent modification (e.g. HTTP 409
	// Conflict from the REST catalog). Catalog implementations should wrap
	// this error so that callers using errors.Is(err, table.ErrCommitFailed)
	// can detect retryable commit conflicts.
	//
	// Currently only catalog/rest wraps this sentinel; Glue, SQL, and Hive
	// catalogs return their conflict errors raw and will not trigger
	// retries until follow-up work wires them through (tracked under
	// issue #830).
	ErrCommitFailed = errors.New("commit failed, refresh and try again")

	// ErrWriteIORequired is returned (wrapped) by doCommit when the table's
	// file system does not implement io.WriteFileIO. Rebuilding the
	// manifest list on retry needs write access, and failing fast is
	// preferable to silently skipping the rebuild, which would reintroduce
	// the stale-parent data-loss bug.
	ErrWriteIORequired = errors.New("commit: file system does not implement WriteFileIO")

	// ErrSnapshotNotFound is returned (wrapped) by metadata lookups and by
	// computeOwnManifests when a snapshot ID does not exist in the table's
	// snapshot list.
	ErrSnapshotNotFound = errors.New("snapshot not found")
)
type (
	// FSysF resolves the file IO used to read and write a table's files.
	// It is invoked with a context each time the IO is needed.
	FSysF func(ctx context.Context) (icebergio.IO, error)

	// Identifier is a table's catalog identifier, expressed as its name
	// parts.
	Identifier = []string

	// CatalogIO is the subset of catalog behavior a Table depends on:
	// LoadTable (used by Table.Refresh and commit retries) and CommitTable
	// (used to submit requirements and updates).
	CatalogIO interface {
		LoadTable(context.Context, Identifier) (*Table, error)
		CommitTable(context.Context, Identifier, []Requirement, []Update) (Metadata, string, error)
	}
)
// Table is a handle on a single Iceberg table. It pairs the table's catalog
// identifier with the metadata (and the metadata file location) it was
// loaded from, the catalog used for reloads and commits, and a factory that
// resolves the table's file IO.
type Table struct {
	identifier       Identifier
	metadata         Metadata
	metadataLocation string
	// cat is used by Refresh (LoadTable) and by doCommit (CommitTable and
	// the LoadTable refresh between retries).
	cat CatalogIO
	// fsF resolves the table's file IO on demand for a given context.
	fsF FSysF
}
// Equals reports whether t and other describe the same table state: the
// same identifier, the same metadata file location, and equal metadata.
// The catalog and file IO factory are not compared.
func (t Table) Equals(other Table) bool {
	if !slices.Equal(t.identifier, other.identifier) {
		return false
	}
	if t.metadataLocation != other.metadataLocation {
		return false
	}
	return t.metadata.Equals(other.metadata)
}
// Identifier returns the table's catalog identifier.
func (t Table) Identifier() Identifier {
	return t.identifier
}

// Metadata returns the table metadata this handle holds.
func (t Table) Metadata() Metadata {
	return t.metadata
}

// MetadataLocation returns the location of the metadata file this handle
// was created from.
func (t Table) MetadataLocation() string {
	return t.metadataLocation
}

// FS resolves the table's file IO for ctx.
func (t Table) FS(ctx context.Context) (icebergio.IO, error) {
	return t.fsF(ctx)
}

// Schema returns the table's current schema.
func (t Table) Schema() *iceberg.Schema {
	return t.metadata.CurrentSchema()
}

// Spec returns the table's partition spec as reported by its metadata.
func (t Table) Spec() iceberg.PartitionSpec {
	return t.metadata.PartitionSpec()
}

// SortOrder returns the table's sort order as reported by its metadata.
func (t Table) SortOrder() SortOrder {
	return t.metadata.SortOrder()
}

// Properties returns the table properties.
func (t Table) Properties() iceberg.Properties {
	return t.metadata.Properties()
}

// NameMapping returns the table's name mapping as reported by its metadata.
func (t Table) NameMapping() iceberg.NameMapping {
	return t.metadata.NameMapping()
}

// Location returns the table location recorded in its metadata.
func (t Table) Location() string {
	return t.metadata.Location()
}

// CurrentSnapshot returns the snapshot the metadata marks as current.
func (t Table) CurrentSnapshot() *Snapshot {
	return t.metadata.CurrentSnapshot()
}

// SnapshotByID looks up a snapshot by its ID in the table metadata.
func (t Table) SnapshotByID(id int64) *Snapshot {
	return t.metadata.SnapshotByID(id)
}

// SnapshotByName looks up a snapshot by ref name in the table metadata.
func (t Table) SnapshotByName(name string) *Snapshot {
	return t.metadata.SnapshotByName(name)
}
// Schemas returns every schema recorded in the table metadata, keyed by
// schema ID. A fresh map is built on each call.
func (t Table) Schemas() map[int]*iceberg.Schema {
	byID := make(map[int]*iceberg.Schema)
	for _, schema := range t.metadata.Schemas() {
		byID[schema.ID] = schema
	}
	return byID
}
// LocationProvider builds the table's LocationProvider from the table
// location and properties recorded in its metadata.
func (t Table) LocationProvider() (LocationProvider, error) {
	tableLocation := t.metadata.Location()
	props := t.metadata.Properties()
	return LoadLocationProvider(tableLocation, props)
}
// NewTransaction starts a transaction that commits to the main branch. It
// is equivalent to t.NewTransactionOnBranch(MainBranch).
func (t Table) NewTransaction() *Transaction {
	target := MainBranch
	return t.NewTransactionOnBranch(target)
}
// NewTransactionOnBranch creates a new transaction that commits to the
// named branch. Use [Table.NewTransaction] to commit to the default "main"
// branch.
//
// The transaction keeps a pointer to a copy of t (t is a value receiver),
// so later changes to the caller's Table value are not seen by it.
func (t Table) NewTransactionOnBranch(branch string) *Transaction {
	// NOTE(review): the error from MetadataBuilderFromBase is discarded;
	// confirm it cannot fail for metadata that was already loaded.
	builder, _ := MetadataBuilderFromBase(t.metadata, t.metadataLocation)

	txn := &Transaction{
		tbl:    &t,
		meta:   builder,
		branch: branch,
		reqs:   []Requirement{},
	}
	return txn
}
// Refresh reloads the table from its catalog and replaces t's metadata,
// metadata location, and file IO factory in place. The identifier and
// catalog are kept. If loading fails, t is left unmodified.
func (t *Table) Refresh(ctx context.Context) error {
	latest, loadErr := t.cat.LoadTable(ctx, t.identifier)
	if loadErr != nil {
		return loadErr
	}
	t.metadata, t.fsF, t.metadataLocation = latest.metadata, latest.fsF, latest.metadataLocation
	return nil
}
// AppendTable is a shortcut for NewTransaction().AppendTable() followed by
// committing the transaction. If staging the append fails, nothing is
// committed; otherwise the result of the commit is returned.
func (t Table) AppendTable(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) (*Table, error) {
	tx := t.NewTransaction()
	err := tx.AppendTable(ctx, tbl, batchSize, snapshotProps)
	if err != nil {
		return nil, err
	}
	return tx.Commit(ctx)
}
// Append is a shortcut for NewTransaction().Append() followed by committing
// the transaction. If staging the append fails, nothing is committed;
// otherwise the result of the commit is returned.
func (t Table) Append(ctx context.Context, rdr array.RecordReader, snapshotProps iceberg.Properties) (*Table, error) {
	tx := t.NewTransaction()
	err := tx.Append(ctx, rdr, snapshotProps)
	if err != nil {
		return nil, err
	}
	return tx.Commit(ctx)
}
// OverwriteTable is a shortcut for NewTransaction().OverwriteTable() followed
// by committing the transaction.
//
// An optional filter (see WithOverwriteFilter) determines which existing
// data to delete or rewrite. If the filter is nil or AlwaysTrue, all
// existing data files are deleted and replaced with new data. Otherwise the
// filter acts as a row-level predicate on existing data:
//   - Files where all rows match the filter (strict match) are deleted.
//   - Files where some rows match and others don't (partial match) are
//     rewritten to keep only the non-matching rows.
//   - Files where no rows match the filter are kept unchanged.
//
// Files are classified from their statistics with two metrics evaluators:
// the inclusive evaluator identifies candidate files that may contain
// matching rows, and the strict evaluator determines whether every row in
// a file must match. Files that pass inclusive but not strict evaluation are
// rewritten with filtered data.
//
// New data from tbl is written to the table regardless of the filter.
//
// batchSize is the batch size used when reading tbl, not the batch size for
// writes. There is no concurrency parameter: the parallelism used for
// manifest processing and file rewriting defaults to runtime.GOMAXPROCS(0)
// and can be changed with the WithOverwriteConcurrency option.
func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties, opts ...OverwriteOption) (*Table, error) {
	txn := t.NewTransaction()
	if err := txn.OverwriteTable(ctx, tbl, batchSize, snapshotProps, opts...); err != nil {
		return nil, err
	}
	return txn.Commit(ctx)
}
// Overwrite is a shortcut for NewTransaction().Overwrite() followed by
// committing the transaction.
//
// An optional filter (see WithOverwriteFilter) determines which existing
// data to delete or rewrite. If the filter is nil or AlwaysTrue, all
// existing data files are deleted and replaced with new data. Otherwise the
// filter acts as a row-level predicate on existing data:
//   - Files where all rows match the filter (strict match) are deleted.
//   - Files where some rows match and others don't (partial match) are
//     rewritten to keep only the non-matching rows.
//   - Files where no rows match the filter are kept unchanged.
//
// Files are classified from their statistics with two metrics evaluators:
// the inclusive evaluator identifies candidate files that may contain
// matching rows, and the strict evaluator determines whether every row in
// a file must match. Files that pass inclusive but not strict evaluation are
// rewritten with filtered data.
//
// New data from rdr is written to the table regardless of the filter.
//
// There is no concurrency parameter: the parallelism used for manifest
// processing and file rewriting defaults to runtime.GOMAXPROCS(0) and can be
// changed with the WithOverwriteConcurrency option.
func (t Table) Overwrite(ctx context.Context, rdr array.RecordReader, snapshotProps iceberg.Properties, opts ...OverwriteOption) (*Table, error) {
	txn := t.NewTransaction()
	if err := txn.Overwrite(ctx, rdr, snapshotProps, opts...); err != nil {
		return nil, err
	}
	return txn.Commit(ctx)
}
// Delete is a shortcut for NewTransaction().Delete() followed by committing
// the transaction.
//
// The provided filter acts as a row-level predicate on existing data:
//   - Files where all rows match the filter (strict match) are deleted.
//   - Files where some rows match and others don't (partial match) are
//     rewritten to keep only the non-matching rows.
//   - Files where no rows match the filter are kept unchanged.
//
// Files are classified from their statistics with two metrics evaluators:
// the inclusive evaluator identifies candidate files that may contain
// matching rows, and the strict evaluator determines whether every row in
// a file must match. Files that pass inclusive but not strict evaluation are
// rewritten with filtered data.
//
// There is no concurrency parameter: the parallelism used for manifest
// processing and file rewriting defaults to runtime.GOMAXPROCS(0) and can be
// adjusted through opts.
func (t Table) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts ...DeleteOption) (*Table, error) {
	// NOTE(review): an earlier version of this doc named
	// WithOverwriteConcurrency as the override, but this method takes
	// DeleteOption values; confirm which option controls concurrency and
	// name it in the doc comment.
	txn := t.NewTransaction()
	if err := txn.Delete(ctx, filter, snapshotProps, opts...); err != nil {
		return nil, err
	}
	return txn.Commit(ctx)
}
// AllManifests returns an iterator over the manifest files of every snapshot
// in the table metadata. Manifest lists are loaded concurrently, one
// goroutine per snapshot, but results are yielded in the order the
// snapshots appear in the metadata.
//
// If resolving the file IO or loading any snapshot's manifests fails, the
// iterator yields (nil, err) and stops. Stopping iteration early is safe:
// outstanding results are drained in the background.
func (t Table) AllManifests(ctx context.Context) iter.Seq2[iceberg.ManifestFile, error] {
	fs, err := t.fsF(ctx)
	if err != nil {
		return func(yield func(iceberg.ManifestFile, error) bool) {
			yield(nil, err)
		}
	}

	type list = tblutils.Enumerated[[]iceberg.ManifestFile]

	snapshots := t.metadata.Snapshots()
	n := len(snapshots)

	var g errgroup.Group
	// Buffered to n so producers never block, even when the consumer
	// stops reading early.
	ch := make(chan list, n)
	for i, sn := range snapshots {
		g.Go(func() error {
			manifests, err := sn.Manifests(fs)
			if err != nil {
				return err
			}
			ch <- list{Index: i, Value: manifests, Last: i == n-1}
			return nil
		})
	}

	// errch receives at most one (non-nil) error and is closed after ch,
	// once every producer has finished.
	errch := make(chan error, 1)
	go func() {
		defer close(errch)
		defer close(ch)
		if err := g.Wait(); err != nil {
			errch <- err
		}
	}()

	results := tblutils.MakeSequencedChan(uint(n), ch,
		func(left, right *list) bool {
			switch {
			case left.Index < 0:
				return true
			case right.Index < 0:
				return false
			default:
				return left.Index < right.Index
			}
		}, func(prev, next *list) bool {
			if prev.Index < 0 {
				return next.Index == 0
			}
			return next.Index == prev.Index+1
		}, list{Index: -1})

	return func(yield func(iceberg.ManifestFile, error) bool) {
		defer func() {
			// Drain in the background so the sequencer and the Wait
			// goroutine can finish if we returned early.
			go func() {
				for range results {
				}
				for range errch {
				}
			}()
		}()

		// errs aliases errch but is set to nil once errch is closed, so the
		// select below does not spin on an always-ready closed channel.
		errs := errch
		for {
			select {
			case err, ok := <-errs:
				if !ok {
					errs = nil
					continue
				}
				if err != nil {
					yield(nil, err)
					return
				}
			case next, ok := <-results:
				if !ok {
					// A failed load leaves a gap in the sequence, so results
					// can close while the error is still pending on errch
					// (select may pick either ready case). Check errch before
					// ending so the error is never silently dropped. This
					// cannot block forever: errch is closed after g.Wait.
					if err := <-errch; err != nil {
						yield(nil, err)
					}
					return
				}
				for _, mf := range next.Value {
					if !yield(mf, nil) {
						return
					}
				}
				if next.Last {
					return
				}
			}
		}
	}
}
// conflictValidatorFunc is a single producer's client-side conflict check.
// It is evaluated against a conflictContext built before a commit attempt
// and returns nil when the commit is still safe. A retryable conflict is
// reported as a wrapped ErrCommit* sentinel, and terminal divergence as
// ErrCommitDiverged.
type conflictValidatorFunc func(*conflictContext) error
// commitOpts controls optional behavior of doCommit beyond the core
// updates/requirements loop. The zero value is valid; callers opt in via the
// commitOption functional options passed to doCommit.
type commitOpts struct {
	// branch is the ref the commit targets. When empty:
	//   - pre-flight conflict validation is skipped;
	//   - AssertRefSnapshotID requirements are not rewritten on retry;
	//   - manifest-list rebuilds use the table's current snapshot, rather
	//     than a branch head, as the new parent.
	// Direct doCommit callers (unit tests, low-level utils) may leave this
	// empty; Transaction.Commit always sets it.
	branch string

	// validators run before every cat.CommitTable attempt, not only the
	// first. They see a conflict context built from the table's original
	// metadata and the catalog's current state, which is reloaded before
	// each retry. On the first attempt the two states are identical, so
	// there are no concurrent snapshots to check. Validation is skipped
	// when branch is empty or the branch does not yet exist in the current
	// metadata.
	validators []conflictValidatorFunc
}
// commitOption configures a commitOpts; see withCommitBranch and
// withCommitValidators.
type commitOption func(opts *commitOpts)
// withCommitBranch sets the branch a commit targets, which drives the
// pre-flight conflict-validation walk. An empty name means MainBranch:
// Transaction.branch is empty when the caller never chose a branch, and
// the implicit default is main.
func withCommitBranch(branch string) commitOption {
	target := branch
	if target == "" {
		target = MainBranch
	}
	return func(o *commitOpts) {
		o.branch = target
	}
}
// withCommitValidators adds vs to the conflict validators doCommit runs
// before committing. The option may be passed more than once; validators
// accumulate in the order given.
func withCommitValidators(vs ...conflictValidatorFunc) commitOption {
	return func(o *commitOpts) {
		o.validators = append(o.validators, vs...)
	}
}
// doCommit submits updates and reqs to the catalog and returns a Table for
// the committed metadata. It retries only on ErrCommitFailed conflicts.
//
// Retry budget: the settings come from table properties via
// readRetryConfig. doCommit makes at most numRetries retries after the first
// attempt, waits backoffDuration between attempts, and bounds the whole loop
// by totalTimeoutMs.
//
// Before each retry it:
//   - reloads the table from the catalog;
//   - rewrites AssertRefSnapshotID requirements for the target branch to
//     the fresh head;
//   - rebuilds snapshot manifest lists so they inherit files committed
//     concurrently.
//
// Before every attempt, configured validators run against a conflict
// context built from t.metadata and the current catalog state.
//
// Errors returned:
//   - ErrWriteIORequired (wrapped) if the file IO is not a WriteFileIO.
//   - Validator or conflict-context errors, as-is and without retry.
//   - The context cause once ctx or the retry timeout expires.
//   - A wrapped error if the refresh or the manifest-list rebuild fails.
//   - Any CommitTable error that does not wrap ErrCommitFailed,
//     immediately, because the commit's outcome is unknown.
//   - The last ErrCommitFailed error once all attempts are exhausted.
//
// On success, old metadata files are removed via deleteOldMetadata.
func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requirement, opts ...commitOption) (*Table, error) {
	var co commitOpts
	for _, apply := range opts {
		apply(&co)
	}
	cfg := readRetryConfig(t.metadata.Properties())
	// Bound total retry time with a derived context so both the wait loop
	// and the CommitTable call itself respect the deadline uniformly.
	retryCtx, cancel := context.WithTimeout(ctx, time.Duration(cfg.totalTimeoutMs)*time.Millisecond)
	defer cancel()
	fs, err := t.fsF(ctx)
	if err != nil {
		return nil, err
	}
	// Every real commit-path FS implements WriteFileIO. Failing here is
	// preferable to silently skipping the manifest-list rebuild inside the
	// retry loop — a skip reintroduces the original stale-parent data loss.
	wfs, ok := fs.(icebergio.WriteFileIO)
	if !ok {
		return nil, fmt.Errorf("%w: manifest list rebuild requires write access", ErrWriteIORequired)
	}
	var (
		newMeta           Metadata
		newLoc            string
		timer             *time.Timer
		orphanedManifests []string // manifest-list files orphaned by rebuilds
	)
	// cleanupOrphans controls whether the defer below removes orphaned manifest-list
	// files on exit. It defaults to true (clean on all safe exits) and is set to
	// false only for the one unsafe case: a non-ErrCommitFailed error from
	// CommitTable, where the catalog may have silently accepted the commit and one
	// of the "orphaned" files may actually be the live snapshot.
	//
	// NOTE(review): as far as this function shows, orphanedManifests only
	// holds lists submitted in attempts that returned ErrCommitFailed. The
	// list submitted in the current attempt is never in it, so the
	// suppression looks conservative rather than required — confirm.
	// Also, when every attempt fails, the list from the final attempt is
	// not tracked here and is left in storage.
	cleanupOrphans := true
	defer func() {
		if !cleanupOrphans || len(orphanedManifests) == 0 {
			return
		}
		// Best-effort: removal failures are logged, not returned.
		for _, path := range orphanedManifests {
			if removeErr := wfs.Remove(path); removeErr != nil {
				log.Printf("Warning: failed to delete orphaned manifest list %s: %v", path, removeErr)
			}
		}
	}()
	// current tracks the catalog state between retries. On attempt 0 it
	// equals t.metadata (so the conflict context's concurrent-snapshot
	// walk is empty and validators short-circuit). On subsequent
	// attempts it is the freshly-loaded post-conflict state.
	current := t.metadata
	// numRetries counts retries; total attempts = 1 initial + numRetries.
	totalAttempts := cfg.numRetries + 1
	for attempt := range totalAttempts {
		if attempt != 0 {
			// One timer is reused across retries; it has always fired (or
			// been stopped on the return path) before Reset is called.
			wait := backoffDuration(attempt-1, cfg.minWaitMs, cfg.maxWaitMs)
			if timer == nil {
				timer = time.NewTimer(wait)
			} else {
				timer.Reset(wait)
			}
			select {
			case <-retryCtx.Done():
				timer.Stop()
				return nil, context.Cause(retryCtx)
			case <-timer.C:
			}
			// Refresh-and-replay: reload the catalog's current state,
			// run the producers' validators against the fresh
			// (base=t.metadata, current=fresh) conflict context, and
			// rewrite any AssertRefSnapshotID requirements to target
			// the new branch head so re-submission is not rejected
			// just because a peer advanced the head with a
			// non-conflicting commit.
			fresh, refreshErr := t.cat.LoadTable(retryCtx, t.identifier)
			if refreshErr != nil {
				return nil, fmt.Errorf("refresh table for retry: %w", refreshErr)
			}
			current = fresh.metadata
			reqs = rewriteRefSnapshotRequirements(reqs, co.branch, current)
			// Rebuild snapshot manifest lists to inherit all files committed
			// by concurrent writers since the snapshot was originally built.
			// Without this, the new snapshot's manifest list would only
			// contain its own files and callers scanning the current snapshot
			// would miss every concurrent writer's data.
			rebuiltUpdates, orphaned, rebuildErr := rebuildSnapshotUpdates(retryCtx, updates, current, co.branch, wfs, int(attempt))
			if rebuildErr != nil {
				return nil, fmt.Errorf("rebuild manifest list for retry attempt %d: %w", attempt, rebuildErr)
			}
			orphanedManifests = append(orphanedManifests, orphaned...)
			updates = rebuiltUpdates
		}
		// Pre-flight client-side conflict validation. Producers can
		// reject commits whose semantics are violated by concurrent
		// peers (partition-filter overlap, referenced-file removal)
		// even when the catalog-side AssertRefSnapshotID would accept
		// them. On attempt 0 base == current → no concurrent
		// snapshots → validators short-circuit. Real divergence
		// detection fires on attempts > 0 once `current` is the
		// post-conflict state.
		//
		// Skipped when the branch does not exist on `current` — that
		// always means "the committer is creating this branch" (e.g.
		// first commit on a fresh table). There are no concurrent
		// snapshots on a branch that does not yet exist, and
		// newConflictContext would otherwise return ErrCommitDiverged.
		if co.branch != "" && len(co.validators) > 0 && current.SnapshotByName(co.branch) != nil {
			// caseSensitive is hardcoded to true here: transaction-
			// level case-sensitivity is not yet threaded through the
			// Commit path, and true is the scan default throughout the
			// codebase.
			cc, ccErr := newConflictContext(t.metadata, current, co.branch, fs, true)
			if ccErr != nil {
				// ErrCommitDiverged — terminal, do not retry. The
				// sentinel deliberately does not wrap ErrCommitFailed.
				return nil, ccErr
			}
			// The first validator error wins and is terminal for this call.
			for _, v := range co.validators {
				if vErr := v(cc); vErr != nil {
					return nil, vErr
				}
			}
		}
		if retryCtx.Err() != nil {
			return nil, context.Cause(retryCtx)
		}
		newMeta, newLoc, err = t.cat.CommitTable(retryCtx, t.identifier, reqs, updates)
		if err == nil {
			break
		}
		// Only retry on retryable commit conflicts. Unknown-state errors
		// (5xx, gateway timeouts) must NOT be retried because the commit
		// may have actually succeeded — retrying could duplicate work.
		// Suppress orphan cleanup for the same reason: one of the orphaned
		// manifest-list files may actually be the snapshot the catalog accepted.
		if !errors.Is(err, ErrCommitFailed) {
			cleanupOrphans = false
			return nil, err
		}
	}
	// Reached with a non-nil err only when every attempt ended in
	// ErrCommitFailed.
	if err != nil {
		return nil, err
	}
	deleteOldMetadata(fs, t.metadata, newMeta)
	return New(t.identifier, newMeta, newLoc, t.fsF, t.cat), nil
}
// rewriteRefSnapshotRequirements returns a copy of reqs in which every
// AssertRefSnapshotID for branch now asserts the branch head found in
// fresh. All other requirements are copied unchanged.
//
// Producers register AssertRefSnapshotID at commit-build time with the
// committer's base snapshot ID. Once a peer advances the branch with a
// non-conflicting commit, that assertion no longer matches the catalog.
// Without rewriting it, every retry would fail on the same stale
// requirement. With it, the validators decide whether the commit is still
// safe to replay against the new head. Java's SnapshotProducer rewrites the
// same way between retries.
//
// reqs is returned as-is (not copied) when branch is empty, fresh is nil,
// or the branch has no head in fresh (e.g. it was deleted underneath us).
// In that last case newConflictContext surfaces the divergence on the next
// pre-flight pass.
func rewriteRefSnapshotRequirements(reqs []Requirement, branch string, fresh Metadata) []Requirement {
	if branch == "" || fresh == nil {
		return reqs
	}
	head := fresh.SnapshotByName(branch)
	if head == nil {
		return reqs
	}

	rewritten := make([]Requirement, 0, len(reqs))
	for _, req := range reqs {
		if refReq, isRef := req.(*assertRefSnapshotID); isRef && refReq.Ref == branch {
			headID := head.SnapshotID
			req = AssertRefSnapshotID(branch, &headID)
		}
		rewritten = append(rewritten, req)
	}
	return rewritten
}
// rebuildSnapshotUpdates returns a copy of updates in which every
// addSnapshotUpdate that has a rebuildManifestList closure gets its snapshot
// regenerated on top of the fresh branch head. The regenerated snapshot
// inherits every data file committed since the original snapshot was
// built. All other updates pass through unchanged, as do snapshots whose
// parent already equals the fresh head.
//
// The data files already in object storage are reused as-is; only the
// manifest list is rewritten. This is the manifest-layer half of
// refresh-and-replay.
//
// orphanedPaths lists the manifest-list files superseded by this rebuild.
// The caller is expected to delete them once it is safe to do so. On error
// no updates or paths are returned.
func rebuildSnapshotUpdates(ctx context.Context, updates []Update, freshMeta Metadata, branch string, fs icebergio.WriteFileIO, attempt int) (rebuilt []Update, orphanedPaths []string, err error) {
	// Parent for rebuilt snapshots: the branch head when a branch is given,
	// otherwise the current snapshot; nil when there is no fresh metadata.
	var freshHead *Snapshot
	switch {
	case freshMeta == nil:
	case branch != "":
		freshHead = freshMeta.SnapshotByName(branch)
	default:
		freshHead = freshMeta.CurrentSnapshot()
	}

	out := append(make([]Update, 0, len(updates)), updates...)
	for idx, upd := range out {
		snapUpd, isSnap := upd.(*addSnapshotUpdate)
		if !isSnap || snapUpd.rebuildManifestList == nil {
			continue
		}
		// The parent already matches the fresh head, so the existing
		// manifest list is still correct; skip the extra object write.
		if freshHead != nil && snapUpd.Snapshot.ParentSnapshotID != nil &&
			*snapUpd.Snapshot.ParentSnapshotID == freshHead.SnapshotID {
			continue
		}

		staleList := snapUpd.Snapshot.ManifestList
		regenerated, rebuildErr := snapUpd.rebuildManifestList(ctx, freshMeta, freshHead, fs, attempt)
		if rebuildErr != nil {
			return nil, nil, rebuildErr
		}
		out[idx] = &addSnapshotUpdate{
			baseUpdate:          snapUpd.baseUpdate,
			Snapshot:            regenerated,
			ownManifests:        snapUpd.ownManifests,
			rebuildManifestList: snapUpd.rebuildManifestList,
		}
		// The replaced manifest list is no longer referenced by this update.
		orphanedPaths = append(orphanedPaths, staleList)
	}
	return out, orphanedPaths, nil
}
// retryConfig holds the commit retry settings that readRetryConfig reads
// from table properties. All durations are in milliseconds.
type retryConfig struct {
	// numRetries is the number of retries after the first attempt; doCommit
	// makes numRetries+1 attempts in total.
	numRetries uint
	// minWaitMs and maxWaitMs bound the wait between attempts; see
	// backoffDuration.
	minWaitMs uint
	maxWaitMs uint
	// totalTimeoutMs bounds the whole commit loop, including waits and the
	// CommitTable calls themselves.
	totalTimeoutMs uint
}
// readRetryConfig reads the commit retry settings from props. The package
// defaults are passed as fallbacks to iceberg.PropUInt. No further
// normalization happens here; backoffDuration tolerates zero and inverted
// wait bounds itself.
func readRetryConfig(props iceberg.Properties) retryConfig {
	var cfg retryConfig
	cfg.numRetries = iceberg.PropUInt(props, CommitNumRetriesKey, CommitNumRetriesDefault)
	cfg.minWaitMs = iceberg.PropUInt(props, CommitMinRetryWaitMsKey, CommitMinRetryWaitMsDefault)
	cfg.maxWaitMs = iceberg.PropUInt(props, CommitMaxRetryWaitMsKey, CommitMaxRetryWaitMsDefault)
	cfg.totalTimeoutMs = iceberg.PropUInt(props, CommitTotalRetryTimeoutMsKey, CommitTotalRetryTimeoutMsDefault)
	return cfg
}
// backoffDuration computes the wait before the given 0-based retry attempt.
//
// The upper bound grows exponentially (minMs << attempt) and is clamped to
// maxMs. The actual wait is drawn uniformly from [minMs, ceiling]. This
// keeps concurrent writers from retrying in lockstep while still leaving a
// non-zero floor between attempts. Java Iceberg uses a deterministic
// exponential backoff here; the jitter is client-local, so it does not
// affect cross-client interop.
//
// Inputs are normalized here, not by readRetryConfig. A zero minMs or maxMs
// falls back to the package default, and minMs is clamped to maxMs. The
// ceiling is computed without overflow, so arbitrarily large attempt or
// minMs values cannot make the jitter range negative (which would panic).
// Waits longer than the largest time.Duration are clamped to it.
func backoffDuration(attempt, minMs, maxMs uint) time.Duration {
	if minMs == 0 {
		minMs = CommitMinRetryWaitMsDefault
	}
	if maxMs == 0 {
		maxMs = CommitMaxRetryWaitMsDefault
	}
	if minMs > maxMs {
		minMs = maxMs
	}

	// ceiling = min(minMs << attempt, maxMs). minMs << attempt fits within
	// maxMs exactly when minMs <= maxMs >> attempt, so test that before
	// shifting. Shifting a uint by >= its width yields 0 in Go, so huge
	// attempts fall through to maxMs.
	ceiling := maxMs
	if minMs <= maxMs>>attempt {
		ceiling = minMs << attempt
	}

	// Jitter in [minMs, ceiling]. span is either 0 or at most maxMs-1, so
	// span+1 cannot overflow.
	span := ceiling - minMs
	//nolint:gosec // non-security randomness, jitter for retry spread
	wait := minMs + rand.UintN(span+1)

	const maxDuration = time.Duration(1<<63 - 1)
	if uint64(wait) > uint64(maxDuration/time.Millisecond) {
		return maxDuration
	}
	return time.Duration(wait) * time.Millisecond
}
// SnapshotAsOf returns the snapshot that was current at timestampMs
// (inclusive=true) or strictly before it (inclusive=false). It walks the
// snapshot log from its last entry backwards and returns the first entry
// that qualifies (assumes the log is in chronological order). It returns
// nil when no entry qualifies.
func (t Table) SnapshotAsOf(timestampMs int64, inclusive bool) *Snapshot {
	logEntries := slices.Collect(t.metadata.SnapshotLogs())
	for _, entry := range slices.Backward(logEntries) {
		qualifies := entry.TimestampMs < timestampMs
		if inclusive {
			qualifies = entry.TimestampMs <= timestampMs
		}
		if qualifies {
			return t.metadata.SnapshotByID(entry.SnapshotID)
		}
	}
	return nil
}
// getFiles maps a sequence of metadata-log entries to their metadata file
// paths. Order is preserved, and iteration stops as soon as the consumer
// stops.
func getFiles(it iter.Seq[MetadataLogEntry]) iter.Seq[string] {
	return func(yield func(string) bool) {
		// Range over the push iterator directly; iter.Pull would start a
		// coroutine for no benefit here.
		for entry := range it {
			if !yield(entry.MetadataFile) {
				return
			}
		}
	}
}
// deleteOldMetadata removes metadata files that are listed in baseMeta's
// previous-metadata log but no longer in newMeta's. It does nothing unless
// MetadataDeleteAfterCommitEnabledKey is enabled in newMeta's properties.
// Deletion is best-effort: failures are logged, never returned.
func deleteOldMetadata(fs icebergio.IO, baseMeta, newMeta Metadata) {
	enabled := newMeta.Properties().GetBool(MetadataDeleteAfterCommitEnabledKey,
		MetadataDeleteAfterCommitEnabledDefault)
	if !enabled {
		return
	}

	before := slices.Collect(getFiles(baseMeta.PreviousFiles()))
	after := slices.Collect(getFiles(newMeta.PreviousFiles()))
	for _, file := range internal.Difference(before, after) {
		if err := fs.Remove(file); err != nil {
			// Only a warning: an external process such as a compactor may
			// already have deleted the file.
			log.Printf("Warning: Failed to delete old metadata file: %s error: %v", file, err)
		}
	}
}
// ScanOption configures a Scan created by Table.Scan.
type ScanOption func(scan *Scan)

// noopOption leaves the scan unchanged. Option constructors return it when
// their argument means "keep the default".
func noopOption(_ *Scan) {}
// WithSelectedFields restricts the scan to the named fields. Passing no
// fields, or including "*", keeps the default of selecting every field.
func WithSelectedFields(fields ...string) ScanOption {
	selectAll := len(fields) == 0 || slices.Contains(fields, "*")
	if selectAll {
		return noopOption
	}
	return func(scan *Scan) { scan.selectedFields = fields }
}
// WithRowFilter sets the scan's row filter. A nil expression or AlwaysTrue
// keeps the default AlwaysTrue filter.
func WithRowFilter(e iceberg.BooleanExpression) ScanOption {
	if e != nil && !e.Equals(iceberg.AlwaysTrue{}) {
		return func(scan *Scan) { scan.rowFilter = e }
	}
	return noopOption
}
// WithSnapshotID makes the scan read the snapshot with ID n and clears any
// timestamp set by WithSnapshotAsOf. An ID of 0 is treated as unset.
func WithSnapshotID(n int64) ScanOption {
	if n != 0 {
		id := n
		return func(scan *Scan) {
			scan.snapshotID, scan.asOfTimestamp = &id, nil
		}
	}
	return noopOption
}
// WithSnapshotAsOf makes the scan select its snapshot by timestamp
// (milliseconds) and clears any ID set by WithSnapshotID.
func WithSnapshotAsOf(timeStampMs int64) ScanOption {
	return func(scan *Scan) {
		scan.snapshotID = nil
		scan.asOfTimestamp = &timeStampMs
	}
}
// WithCaseSensitive sets whether the scan matches field names
// case-sensitively. Table.Scan defaults to true.
func WithCaseSensitive(b bool) ScanOption {
	caseSensitive := b
	return func(scan *Scan) { scan.caseSensitive = caseSensitive }
}
// WithLimit sets the scan's row limit. A negative n keeps the default of
// ScanNoLimit.
func WithLimit(n int64) ScanOption {
	if n >= 0 {
		return func(scan *Scan) { scan.limit = n }
	}
	return noopOption
}
// WitMaxConcurrency sets the maximum concurrency for table scan and plan
// operations. A non-positive n keeps the default of runtime.GOMAXPROCS(0).
//
// The name is missing an "h" ("With"); it is kept as-is for API
// compatibility.
func WitMaxConcurrency(n int) ScanOption {
	if n > 0 {
		return func(scan *Scan) { scan.concurrency = n }
	}
	return noopOption
}
// WithOptions replaces the scan's options with opts; it does not merge
// them. A nil map leaves the scan unchanged.
func WithOptions(opts iceberg.Properties) ScanOption {
	if opts != nil {
		return func(scan *Scan) { scan.options = opts }
	}
	return noopOption
}
// Scan creates a scan over the table's metadata.
//
// Defaults: every field selected, an AlwaysTrue row filter, case-sensitive
// name matching, ScanNoLimit, and runtime.GOMAXPROCS(0) concurrency. opts
// are applied in order on top of the defaults. The partition-filter cache
// is set up after all options are applied.
func (t Table) Scan(opts ...ScanOption) *Scan {
	scan := &Scan{
		metadata:       t.metadata,
		ioF:            t.fsF,
		rowFilter:      iceberg.AlwaysTrue{},
		selectedFields: []string{"*"},
		caseSensitive:  true,
		limit:          ScanNoLimit,
		concurrency:    runtime.GOMAXPROCS(0),
	}
	for _, apply := range opts {
		apply(scan)
	}
	scan.partitionFilters = newKeyDefaultMapWrapErr(scan.buildPartitionProjection)
	return scan
}
// New returns a Table handle for ident backed by meta, which was read from
// metadataLocation. The handle resolves file IO through fsF and uses cat for
// reloads and commits. New performs no I/O.
func New(ident Identifier, meta Metadata, metadataLocation string, fsF FSysF, cat CatalogIO) *Table {
	tbl := new(Table)
	tbl.identifier = ident
	tbl.metadata = meta
	tbl.metadataLocation = metadataLocation
	tbl.fsF = fsF
	tbl.cat = cat
	return tbl
}
// NewFromLocation reads table metadata from metalocation using the file IO
// returned by fsysF and returns a Table for it.
//
// Files whose names mark them as gzip- or zstd-compressed (see
// metadataCompressionCodec) are decompressed transparently. If the file IO
// implements icebergio.ReadFileIO, the file is read in one call. Otherwise
// it is opened and streamed, and internal.CheckedClose is given the named
// err result so that a close failure can be reported to the caller.
func NewFromLocation(
	ctx context.Context,
	ident Identifier,
	metalocation string,
	fsysF FSysF,
	cat CatalogIO,
) (_ *Table, err error) {
	var meta Metadata
	fsys, err := fsysF(ctx)
	if err != nil {
		return nil, err
	}

	if rf, ok := fsys.(icebergio.ReadFileIO); ok {
		data, readErr := rf.ReadFile(metalocation)
		if readErr != nil {
			return nil, readErr
		}
		if codec := metadataCompressionCodec(metalocation); codec != "" {
			rc, decErr := newDecompressor(bytes.NewReader(data), codec)
			if decErr != nil {
				return nil, decErr
			}
			defer rc.Close()
			if data, err = io.ReadAll(rc); err != nil {
				return nil, err
			}
		}
		if meta, err = ParseMetadataBytes(data); err != nil {
			return nil, err
		}
	} else {
		// Use a distinct name for the Open error. `f, err := ...` here
		// would declare a new err that shadows the named result, and
		// CheckedClose would then record close errors into a variable
		// that is never returned.
		f, openErr := fsys.Open(metalocation)
		if openErr != nil {
			return nil, openErr
		}
		defer internal.CheckedClose(f, &err)

		var r io.Reader = f
		if codec := metadataCompressionCodec(metalocation); codec != "" {
			rc, decErr := newDecompressor(f, codec)
			if decErr != nil {
				return nil, decErr
			}
			defer rc.Close()
			r = rc
		}
		if meta, err = ParseMetadata(r); err != nil {
			return nil, err
		}
	}

	return New(ident, meta, metalocation, fsysF, cat), nil
}
// metadataCompressionCodec infers a metadata file's compression codec from
// its name. Both the "<name>.<codec>.metadata.json" and the
// "<name>.metadata.json.<codec>" spellings are recognized for gzip ("gz")
// and zstd ("zstd"). It returns "" for uncompressed or unrecognized names.
func metadataCompressionCodec(location string) string {
	endsWithAny := func(suffixes ...string) bool {
		for _, suffix := range suffixes {
			if strings.HasSuffix(location, suffix) {
				return true
			}
		}
		return false
	}

	if endsWithAny(".gz.metadata.json", "metadata.json.gz") {
		return MetadataCompressionCodecGzip
	}
	if endsWithAny(".zstd.metadata.json", "metadata.json.zstd") {
		return MetadataCompressionCodecZstd
	}
	return ""
}
// newDecompressor wraps r in a decompressing reader for codec, which must be
// MetadataCompressionCodecGzip or MetadataCompressionCodecZstd. Any other
// codec yields an error. On success the caller owns the returned reader and
// must Close it. On error the returned reader is a true nil interface.
func newDecompressor(r io.Reader, codec string) (io.ReadCloser, error) {
	switch codec {
	case MetadataCompressionCodecGzip:
		// Do not `return gzip.NewReader(r)` directly. On failure it returns
		// a nil *gzip.Reader, which would become a non-nil io.ReadCloser
		// holding a nil pointer.
		zr, err := gzip.NewReader(r)
		if err != nil {
			return nil, err
		}
		return zr, nil
	case MetadataCompressionCodecZstd:
		dec, err := zstd.NewReader(r)
		if err != nil {
			return nil, err
		}
		return dec.IOReadCloser(), nil
	default:
		return nil, fmt.Errorf("unsupported metadata decompression codec: %s", codec)
	}
}