blob: 3d93ae67d26929561127316c51131e155b3e95e5 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/apache/iceberg-go"
iceio "github.com/apache/iceberg-go/io"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// readOnlyIO is a minimal iceio.IO stub that deliberately does NOT satisfy
// iceio.WriteFileIO. Tests hand it to a table to check that doCommit fails
// early when the table's file system cannot write.
type readOnlyIO struct{}

// Open always fails: the stub holds no files.
func (readOnlyIO) Open(string) (iceio.File, error) {
	return nil, errors.New("readOnlyIO: no files")
}

// Remove always fails: the stub is read-only.
func (readOnlyIO) Remove(string) error {
	return errors.New("readOnlyIO: read-only")
}
// sequentialCatalog is a scripted catalog stub. The n-th CommitTable call
// (0-indexed) fails with errs[n] when that entry exists and is non-nil; any
// other attempt — past the end of errs, or a nil entry — applies the updates
// to metadata and succeeds.
// When loadMeta is set, LoadTable returns that metadata instead of c.metadata.
type sequentialCatalog struct {
	metadata Metadata     // current table state; replaced by each successful CommitTable
	loadMeta Metadata     // optional: returned by LoadTable if non-nil
	errs     []error      // scripted per-attempt errors, indexed by attempt number
	attempts atomic.Int32 // number of CommitTable calls made so far
}
// LoadTable returns a table backed by loadMeta when it is set, falling back
// to the catalog's tracked metadata otherwise. The table uses a local file
// system and this catalog for subsequent commits. It never returns an error.
func (c *sequentialCatalog) LoadTable(_ context.Context, ident Identifier) (*Table, error) {
	served := c.loadMeta
	if served == nil {
		served = c.metadata
	}

	localFS := func(context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }

	return New(ident, served, "", localFS, c), nil
}
// CommitTable counts the call, then either returns the scripted error for
// this attempt or applies updates to the tracked metadata and stores the
// result. Requirements are ignored.
func (c *sequentialCatalog) CommitTable(_ context.Context, _ Identifier, _ []Requirement, updates []Update) (Metadata, string, error) {
	// Add returns the post-increment value; subtract one for a 0-based index.
	attempt := int(c.attempts.Add(1)) - 1
	if attempt < len(c.errs) {
		if scripted := c.errs[attempt]; scripted != nil {
			return nil, "", scripted
		}
	}

	next, err := UpdateTableMetadata(c.metadata, updates, "")
	if err != nil {
		return nil, "", err
	}
	c.metadata = next

	return next, "", nil
}
// flakyCatalog fails the first failUntilAttempt CommitTable calls with
// failWith and commits successfully on every call after that.
type flakyCatalog struct {
	metadata         Metadata     // current table state; replaced by each successful CommitTable
	failUntilAttempt int          // last 1-indexed attempt number that still fails
	failWith         error        // error returned by the failing attempts
	attempts         atomic.Int32 // total CommitTable calls observed
}
// LoadTable returns a table backed by the catalog's current metadata, a local
// file system, and this catalog for subsequent commits.
//
// It reports an error when no metadata has been seeded. Returning a nil
// *Table together with a nil error would make callers dereference nil
// instead of seeing a clear failure.
func (c *flakyCatalog) LoadTable(_ context.Context, ident Identifier) (*Table, error) {
	if c.metadata == nil {
		return nil, errors.New("flakyCatalog: no metadata configured")
	}

	return New(ident, c.metadata, "", func(context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }, c), nil
}
// CommitTable counts the call and returns failWith while the 1-indexed
// attempt number is at most failUntilAttempt. Later attempts apply updates
// to the tracked metadata and store the result. ctx, ident and reqs are
// unused.
func (c *flakyCatalog) CommitTable(ctx context.Context, ident Identifier, reqs []Requirement, updates []Update) (Metadata, string, error) {
	if attempt := int(c.attempts.Add(1)); attempt <= c.failUntilAttempt {
		return nil, "", c.failWith
	}

	next, err := UpdateTableMetadata(c.metadata, updates, "")
	if err != nil {
		return nil, "", err
	}
	c.metadata = next

	return next, "", nil
}
// newRetryTestTable builds a format-version-2, unpartitioned, unsorted table
// with a single required int64 "id" column. The table is rooted in a fresh
// temp directory, uses a local file system, and commits through cat.
//
// props supplies extra table properties (typically the commit-retry
// settings) and may be nil. The map is copied before the format version is
// added, so the caller's map is never modified.
func newRetryTestTable(t *testing.T, cat CatalogIO, props iceberg.Properties) *Table {
	t.Helper()

	location := filepath.ToSlash(t.TempDir())
	schema := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
	)

	// Copy instead of writing into props: ranging over a nil map is a no-op,
	// so the nil case needs no special handling.
	tableProps := make(iceberg.Properties, len(props)+1)
	for k, v := range props {
		tableProps[k] = v
	}
	tableProps[PropertyFormatVersion] = "2"

	meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec,
		UnsortedSortOrder, location, tableProps)
	require.NoError(t, err)

	return New(
		Identifier{"db", "retry_test"},
		meta, location+"/metadata/v1.metadata.json",
		func(ctx context.Context) (iceio.IO, error) {
			return iceio.LocalFS{}, nil
		},
		cat,
	)
}
// TestDoCommit_SucceedsFirstTry checks that a catalog which never fails is
// asked to commit exactly once.
func TestDoCommit_SucceedsFirstTry(t *testing.T) {
	catalog := new(flakyCatalog)
	retryTbl := newRetryTestTable(t, catalog, nil)
	catalog.metadata = retryTbl.Metadata()

	_, commitErr := retryTbl.doCommit(t.Context(), nil, nil)
	require.NoError(t, commitErr)

	const wantAttempts = int32(1)
	assert.Equal(t, wantAttempts, catalog.attempts.Load())
}
func TestDoCommit_RetriesOnCommitFailed(t *testing.T) {
cat := &flakyCatalog{
failUntilAttempt: 2,
failWith: fmt.Errorf("REST: %w", ErrCommitFailed),
}
tbl := newRetryTestTable(t, cat, iceberg.Properties{
CommitNumRetriesKey: "4",
CommitMinRetryWaitMsKey: "1",
CommitMaxRetryWaitMsKey: "2",
})
cat.metadata = tbl.Metadata()
_, err := tbl.doCommit(t.Context(), nil, nil)
require.NoError(t, err)
assert.Equal(t, int32(3), cat.attempts.Load(), "should retry 2x then succeed on 3rd")
}
func TestDoCommit_GivesUpAfterMaxRetries(t *testing.T) {
cat := &flakyCatalog{
failUntilAttempt: 100,
failWith: fmt.Errorf("REST: %w", ErrCommitFailed),
}
tbl := newRetryTestTable(t, cat, iceberg.Properties{
CommitNumRetriesKey: "2",
CommitMinRetryWaitMsKey: "1",
CommitMaxRetryWaitMsKey: "2",
})
cat.metadata = tbl.Metadata()
_, err := tbl.doCommit(t.Context(), nil, nil)
require.Error(t, err)
assert.ErrorIs(t, err, ErrCommitFailed)
// 1 initial + 2 retries = 3 attempts
assert.Equal(t, int32(3), cat.attempts.Load())
}
func TestDoCommit_DoesNotRetryUnknownStateError(t *testing.T) {
unknownErr := errors.New("500 internal server error")
cat := &flakyCatalog{
failUntilAttempt: 5,
failWith: unknownErr,
}
tbl := newRetryTestTable(t, cat, iceberg.Properties{
CommitNumRetriesKey: "10",
CommitMinRetryWaitMsKey: "1",
CommitMaxRetryWaitMsKey: "2",
})
cat.metadata = tbl.Metadata()
_, err := tbl.doCommit(t.Context(), nil, nil)
require.Error(t, err)
assert.ErrorIs(t, err, unknownErr)
// Must not retry — unknown state could mean the commit actually succeeded.
assert.Equal(t, int32(1), cat.attempts.Load())
}
func TestDoCommit_DoesNotRetryUnrelatedError(t *testing.T) {
otherErr := errors.New("network unreachable")
cat := &flakyCatalog{
failUntilAttempt: 5,
failWith: otherErr,
}
tbl := newRetryTestTable(t, cat, iceberg.Properties{
CommitNumRetriesKey: "10",
CommitMinRetryWaitMsKey: "1",
CommitMaxRetryWaitMsKey: "2",
})
cat.metadata = tbl.Metadata()
_, err := tbl.doCommit(t.Context(), nil, nil)
require.Error(t, err)
assert.ErrorIs(t, err, otherErr)
assert.Equal(t, int32(1), cat.attempts.Load())
}
func TestDoCommit_RespectsContextCancellation(t *testing.T) {
cat := &flakyCatalog{
failUntilAttempt: 100,
failWith: fmt.Errorf("REST: %w", ErrCommitFailed),
}
tbl := newRetryTestTable(t, cat, iceberg.Properties{
CommitNumRetriesKey: "10",
CommitMinRetryWaitMsKey: "50",
CommitMaxRetryWaitMsKey: "200",
})
cat.metadata = tbl.Metadata()
ctx, cancel := context.WithTimeout(t.Context(), 30*time.Millisecond)
defer cancel()
_, err := tbl.doCommit(ctx, nil, nil)
require.Error(t, err)
// Either the commit error bubbles up or context cancellation does.
// Both are acceptable outcomes — the test just verifies we don't hang.
assert.True(t, errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrCommitFailed))
assert.Less(t, cat.attempts.Load(), int32(10), "should stop retrying after context cancels")
}
func TestDoCommit_ZeroRetriesOnlyOneAttempt(t *testing.T) {
cat := &flakyCatalog{
failUntilAttempt: 5,
failWith: fmt.Errorf("REST: %w", ErrCommitFailed),
}
tbl := newRetryTestTable(t, cat, iceberg.Properties{
CommitNumRetriesKey: "0",
CommitMinRetryWaitMsKey: "1",
CommitMaxRetryWaitMsKey: "2",
})
cat.metadata = tbl.Metadata()
_, err := tbl.doCommit(t.Context(), nil, nil)
require.Error(t, err)
assert.Equal(t, int32(1), cat.attempts.Load())
}
func TestBackoffDuration_ExponentialWithJitter(t *testing.T) {
const minMs, maxMs = 100, 60000
minWait := time.Duration(minMs) * time.Millisecond
// Attempt 0: cap == minMs, wait is exactly minMs.
for range 20 {
d := backoffDuration(0, minMs, maxMs)
assert.Equal(t, minWait, d)
}
// Attempt 3: cap = 800ms (100 << 3), wait in [minMs, 800ms].
for range 20 {
d := backoffDuration(3, minMs, maxMs)
assert.GreaterOrEqual(t, d, minWait)
assert.LessOrEqual(t, d, 800*time.Millisecond)
}
// Attempt 20: overflow protection, wait in [minMs, maxMs].
for range 20 {
d := backoffDuration(20, minMs, maxMs)
assert.GreaterOrEqual(t, d, minWait)
assert.LessOrEqual(t, d, time.Duration(maxMs)*time.Millisecond)
}
}
func TestBackoffDuration_HandlesZeroInputs(t *testing.T) {
// Zero min/max should fall back to defaults rather than return garbage.
d := backoffDuration(0, 0, 0)
assert.Equal(t, time.Duration(CommitMinRetryWaitMsDefault)*time.Millisecond, d)
// Very large attempt counts must not panic on shift; clamps to maxMs.
d = backoffDuration(100, 100, 60000)
assert.GreaterOrEqual(t, d, 100*time.Millisecond)
assert.LessOrEqual(t, d, 60000*time.Millisecond)
}
func TestReadRetryConfig_ClampsNegativeProperties(t *testing.T) {
// Negative values in properties should be replaced with defaults.
cfg := readRetryConfig(iceberg.Properties{
CommitNumRetriesKey: "-1",
CommitMinRetryWaitMsKey: "-100",
CommitMaxRetryWaitMsKey: "-1000",
CommitTotalRetryTimeoutMsKey: "-5",
})
assert.Equal(t, uint(CommitNumRetriesDefault), cfg.numRetries)
assert.Equal(t, uint(CommitMinRetryWaitMsDefault), cfg.minWaitMs)
assert.Equal(t, uint(CommitMaxRetryWaitMsDefault), cfg.maxWaitMs)
assert.Equal(t, uint(CommitTotalRetryTimeoutMsDefault), cfg.totalTimeoutMs)
}
// ---------------------------------------------------------------------------
// Fix 5 — mandatory WriteFileIO check at top of doCommit
// ---------------------------------------------------------------------------
// TestDoCommit_NonWriteFileIOReturnsError verifies that doCommit fails
// immediately when the table's file system does not implement WriteFileIO.
// A silent skip would reuse the stale manifest list — exactly the bug
// this PR was designed to fix.
func TestDoCommit_NonWriteFileIOReturnsError(t *testing.T) {
cat := &flakyCatalog{}
schema := iceberg.NewSchema(0,
iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
)
meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, UnsortedSortOrder, "file:///tmp/rotest",
iceberg.Properties{PropertyFormatVersion: "2"})
require.NoError(t, err)
cat.metadata = meta
tbl := New(
Identifier{"db", "ro-test"},
meta, "file:///tmp/rotest/v1.metadata.json",
func(context.Context) (iceio.IO, error) { return readOnlyIO{}, nil },
cat,
)
_, doErr := tbl.doCommit(t.Context(), nil, nil)
require.Error(t, doErr, "doCommit must fail when FS does not implement WriteFileIO")
require.ErrorIs(t, doErr, ErrWriteIORequired,
"doCommit must wrap ErrWriteIORequired so callers can detect this condition with errors.Is")
assert.Equal(t, int32(0), cat.attempts.Load(),
"CommitTable must not be called when FS check fails")
}
// ---------------------------------------------------------------------------
// Fix 6 — orphan cleanup via defer
// ---------------------------------------------------------------------------
// newOCCTable returns a table named db.occ-cleanup-test whose file system is
// always wfs and whose commits go through cat (which may be nil). meta should
// carry retry-config properties so that doCommit's retry loop permits at
// least one retry.
func newOCCTable(t *testing.T, meta Metadata, wfs iceio.WriteFileIO, cat CatalogIO) *Table {
	t.Helper()

	const metadataLocation = "mem://default/table-location/metadata/v1.metadata.json"

	ident := Identifier{"db", "occ-cleanup-test"}
	fsFactory := func(context.Context) (iceio.IO, error) { return wfs, nil }

	return New(ident, meta, metadataLocation, fsFactory, cat)
}
// newMemIOWithRetryMeta returns a fresh memIO together with table Metadata
// located at "mem://default/table-location". The metadata carries retry
// properties (3 retries, 1-2ms waits, 60s total timeout) so that doCommit's
// retry loop is active. Per the original note, the location is chosen to
// match createTestTransactionWithMemIO (defined elsewhere) so both can write
// manifest files into the same memIO.
func newMemIOWithRetryMeta(t *testing.T, spec iceberg.PartitionSpec) (*memIO, Metadata) {
	t.Helper()

	retryProps := iceberg.Properties{
		CommitNumRetriesKey:          "3",
		CommitMinRetryWaitMsKey:      "1",
		CommitMaxRetryWaitMsKey:      "2",
		CommitTotalRetryTimeoutMsKey: "60000",
	}

	memFS := newMemIO(1<<20, nil)
	meta, err := NewMetadata(simpleSchema(), &spec, UnsortedSortOrder, "mem://default/table-location", retryProps)
	require.NoError(t, err, "new metadata")

	return memFS, meta
}
// TestDoCommit_OrphanCleanedOnSuccess verifies that manifest-list files
// orphaned by OCC retries are removed after a successful commit. These files
// are written during rebuild and must not leak on the happy path.
//
// The test also checks the opposite direction: the manifest list of the
// snapshot that was actually committed must still exist afterwards.
func TestDoCommit_OrphanCleanedOnSuccess(t *testing.T) {
	ctx := t.Context()
	spec := iceberg.NewPartitionSpec()
	wfs, meta := newMemIOWithRetryMeta(t, spec)

	// Build a transaction from the retry-enabled meta and commit via the producer.
	tbl := newOCCTable(t, meta, wfs, nil)
	txn := tbl.NewTransaction()
	sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
	sp.appendDataFile(newTestDataFile(t, spec, "mem://default/table-location/data/f.parquet", nil))
	updates, reqs, err := sp.commit(ctx)
	require.NoError(t, err)

	// Fail with a diagnostic rather than panicking (which would abort the
	// whole test binary) if the producer's output shape ever changes.
	require.NotEmpty(t, updates, "producer.commit() must emit at least one update")
	addSnap, ok := updates[0].(*addSnapshotUpdate)
	require.True(t, ok, "first producer update must be *addSnapshotUpdate, got %T", updates[0])
	originalManifestList := addSnap.Snapshot.ManifestList

	// The producer must really have written the manifest list: the cleanup
	// contract is only meaningful when there is a real file to clean up.
	require.Contains(t, wfs.files, originalManifestList,
		"producer.commit() must persist the manifest list via WriteFileIO before doCommit runs")

	// Catalog: fail once with ErrCommitFailed (triggers rebuild that orphans
	// originalManifestList), then succeed.
	cat := &sequentialCatalog{
		metadata: meta,
		errs:     []error{ErrCommitFailed},
	}
	tbl = newOCCTable(t, meta, wfs, cat)

	_, err = tbl.doCommit(ctx, updates, reqs, withCommitBranch(MainBranch))
	require.NoError(t, err, "doCommit must succeed on the second attempt")

	_, stillExists := wfs.files[originalManifestList]
	assert.False(t, stillExists,
		"orphaned manifest list must be removed after successful commit")

	// Prove the LIVE committed path was preserved, not merely that something
	// got deleted. The catalog's current snapshot is the rebuilt one; its
	// manifest list must (a) differ from the orphan and (b) still be present
	// in the file system. A regression that deleted the committed path
	// instead of the orphan would fail (b).
	committedSnap := cat.metadata.CurrentSnapshot()
	require.NotNil(t, committedSnap, "catalog must record the committed snapshot")
	committedManifestList := committedSnap.ManifestList
	require.NotEqual(t, originalManifestList, committedManifestList,
		"committed manifest list must be the rebuilt path, not the original")
	require.Contains(t, wfs.files, committedManifestList,
		"live committed manifest list must be preserved by the cleanup defer")
}
// TestDoCommit_OrphanNotCleanedOnUnknownError verifies that manifest-list
// files are NOT removed when CommitTable returns an unknown non-ErrCommitFailed
// error (5xx / gateway timeout). In that case the catalog may have silently
// accepted the commit, meaning one of the "orphaned" files is actually the
// live snapshot. Deleting it would permanently corrupt the table.
func TestDoCommit_OrphanNotCleanedOnUnknownError(t *testing.T) {
	ctx := t.Context()
	spec := iceberg.NewPartitionSpec()
	wfs, meta := newMemIOWithRetryMeta(t, spec)

	tbl := newOCCTable(t, meta, wfs, nil)
	txn := tbl.NewTransaction()
	sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
	sp.appendDataFile(newTestDataFile(t, spec, "mem://default/table-location/data/f.parquet", nil))
	updates, reqs, err := sp.commit(ctx)
	require.NoError(t, err)

	// Fail with a diagnostic rather than panicking (which would abort the
	// whole test binary) if the producer's output shape ever changes.
	require.NotEmpty(t, updates, "producer.commit() must emit at least one update")
	addSnap, ok := updates[0].(*addSnapshotUpdate)
	require.True(t, ok, "first producer update must be *addSnapshotUpdate, got %T", updates[0])
	originalManifestList := addSnap.Snapshot.ManifestList
	require.Contains(t, wfs.files, originalManifestList,
		"producer.commit() must persist the manifest list via WriteFileIO before doCommit runs")

	unknown5xxErr := errors.New("simulated 5xx: internal server error")

	// Catalog: fail once (ErrCommitFailed → rebuild → orphan created),
	// then return a 5xx (non-ErrCommitFailed → cleanup must be skipped).
	cat := &sequentialCatalog{
		metadata: meta,
		errs:     []error{ErrCommitFailed, unknown5xxErr},
	}
	tbl = newOCCTable(t, meta, wfs, cat)

	_, err = tbl.doCommit(ctx, updates, reqs, withCommitBranch(MainBranch))
	require.ErrorIs(t, err, unknown5xxErr, "5xx error must propagate")

	_, stillExists := wfs.files[originalManifestList]
	assert.True(t, stillExists,
		"orphaned manifest list must NOT be removed when commit outcome is unknown (5xx)")

	// On unknown state EVERY rebuild output must be preserved — any of them
	// might be the snapshot the catalog silently accepted. Count the
	// manifest-list files ("snap-*.avro") and require at least two: the
	// original plus at least one rebuild. A regression that ran the cleanup
	// on this path would drop the count below that.
	manifestListCount := 0
	for name := range wfs.files {
		if strings.HasSuffix(name, ".avro") && strings.Contains(name, "snap-") {
			manifestListCount++
		}
	}
	require.GreaterOrEqual(t, manifestListCount, 2,
		"on unknown 5xx every rebuild's manifest list must be preserved (one may be the live commit)")
}
// TestDoCommit_OrphanCleanedOnCommitDiverged verifies that manifest-list files
// orphaned by rebuild attempts are removed when ErrCommitDiverged is returned
// by a conflict validator. Diverged commits are terminal (no retry), and since
// neither of the orphaned files was ever accepted by the catalog, they are safe
// to delete.
func TestDoCommit_OrphanCleanedOnCommitDiverged(t *testing.T) {
	ctx := t.Context()
	spec := iceberg.NewPartitionSpec()
	wfs, meta := newMemIOWithRetryMeta(t, spec)

	// freshMeta has a snapshot on MainBranch so validators run on the retry attempt.
	freshID := int64(42)
	freshMeta := newConflictTestMetadataWithProps(t, &freshID, iceberg.Properties{
		CommitNumRetriesKey:          "3",
		CommitMinRetryWaitMsKey:      "1",
		CommitMaxRetryWaitMsKey:      "2",
		CommitTotalRetryTimeoutMsKey: "60000",
	})

	tbl := newOCCTable(t, meta, wfs, nil)
	txn := tbl.NewTransaction()
	sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
	sp.appendDataFile(newTestDataFile(t, spec, "mem://default/table-location/data/f.parquet", nil))
	updates, reqs, err := sp.commit(ctx)
	require.NoError(t, err)

	// Fail with a diagnostic rather than panicking (which would abort the
	// whole test binary) if the producer's output shape ever changes.
	require.NotEmpty(t, updates, "producer.commit() must emit at least one update")
	addSnap, ok := updates[0].(*addSnapshotUpdate)
	require.True(t, ok, "first producer update must be *addSnapshotUpdate, got %T", updates[0])
	originalManifestList := addSnap.Snapshot.ManifestList
	require.Contains(t, wfs.files, originalManifestList,
		"producer.commit() must persist the manifest list via WriteFileIO before doCommit runs")

	// Catalog: fail once (ErrCommitFailed → rebuild → orphan created).
	// LoadTable returns freshMeta (has branch snapshot → validators run on retry).
	// The validator below returns ErrCommitDiverged immediately.
	cat := &sequentialCatalog{
		metadata: meta,
		loadMeta: freshMeta,
		errs:     []error{ErrCommitFailed},
	}
	tbl = newOCCTable(t, meta, wfs, cat)

	divergedValidator := func(*conflictContext) error { return ErrCommitDiverged }

	_, err = tbl.doCommit(ctx, updates, reqs,
		withCommitBranch(MainBranch),
		withCommitValidators(divergedValidator),
	)
	require.ErrorIs(t, err, ErrCommitDiverged)

	// The cleanup must have run and removed the orphan.
	_, stillExists := wfs.files[originalManifestList]
	assert.False(t, stillExists,
		"orphaned manifest list must be removed even on ErrCommitDiverged: "+
			"the file was never accepted by the catalog so it is safe to delete")
}
// TestDoCommit_OrphanCleanedOnRetriesExhausted verifies that when every retry
// attempt fails with ErrCommitFailed and the loop exits with the budget
// exhausted, the orphan cleanup still runs. None of the orphaned manifest-list
// files were ever accepted by the catalog, so they are safe to delete on this
// terminal exit.
func TestDoCommit_OrphanCleanedOnRetriesExhausted(t *testing.T) {
	ctx := t.Context()
	spec := iceberg.NewPartitionSpec()
	wfs, meta := newMemIOWithRetryMeta(t, spec)

	tbl := newOCCTable(t, meta, wfs, nil)
	txn := tbl.NewTransaction()
	sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
	sp.appendDataFile(newTestDataFile(t, spec, "mem://default/table-location/data/f.parquet", nil))
	updates, reqs, err := sp.commit(ctx)
	require.NoError(t, err)

	// Fail with a diagnostic rather than panicking (which would abort the
	// whole test binary) if the producer's output shape ever changes.
	require.NotEmpty(t, updates, "producer.commit() must emit at least one update")
	addSnap, ok := updates[0].(*addSnapshotUpdate)
	require.True(t, ok, "first producer update must be *addSnapshotUpdate, got %T", updates[0])
	originalManifestList := addSnap.Snapshot.ManifestList
	require.Contains(t, wfs.files, originalManifestList,
		"producer.commit() must persist the manifest list via WriteFileIO before doCommit runs")

	// numRetries=3 → 4 attempts; every attempt fails with ErrCommitFailed.
	cat := &sequentialCatalog{
		metadata: meta,
		errs:     []error{ErrCommitFailed, ErrCommitFailed, ErrCommitFailed, ErrCommitFailed},
	}
	tbl = newOCCTable(t, meta, wfs, cat)

	_, err = tbl.doCommit(ctx, updates, reqs, withCommitBranch(MainBranch))
	require.ErrorIs(t, err, ErrCommitFailed,
		"retries exhausted: terminal error must be ErrCommitFailed")

	_, stillExists := wfs.files[originalManifestList]
	assert.False(t, stillExists,
		"orphaned manifest list must be removed when retries are exhausted with ErrCommitFailed: "+
			"none of the retry attempts were accepted, so all orphans are safe to delete")
}
// ---------------------------------------------------------------------------
// Fix 7 (R4) — retry-progression: every fix exercised end-to-end
// ---------------------------------------------------------------------------
// progressingRebuildCatalog is a catalog stub that grafts a real peer commit
// (with a readable manifest-list file in wfs.files) onto its tracked metadata
// on each of its first failTimes CommitTable calls, returning ErrCommitFailed
// for those calls, and accepts every call after that. Each peer commit:
//
//   - advances LastSequenceNumber and the branch head,
//   - publishes a Summary increment ("total-data-files"+1, etc.) so the
//     next retry's rebuild closure observes an evolving freshParent.Summary,
//   - writes a real (empty) manifest-list file at a unique path so
//     freshParent.Manifests(fio) succeeds inside rebuildFn.
//
// It lets a single test drive ≥2 ErrCommitFailed retries with a freshMeta
// that progresses between attempts, so the rebuild closure is exercised for
// real (not just the retry-loop refresh path with nil updates). According to
// the original note, the existing headTrackingCatalog (defined elsewhere)
// cannot do this because it only advances metadata on a SUCCESSFUL
// CommitTable call.
type progressingRebuildCatalog struct {
	metadata         Metadata     // tracked table state; advanced by graftPeer and by accepted commits
	wfs              *memIO       // file system handed to loaded tables and used for peer manifest lists
	location         string       // table location prefix for peer manifest-list paths
	branch           string       // branch whose head each peer commit advances
	failTimes        int          // number of leading CommitTable calls that fail with ErrCommitFailed
	commitTableCalls atomic.Int32 // total CommitTable calls observed
	// Per-call captures (used by assertions).
	loadedLastSeqNums     []int64   // freshMeta.LastSequenceNumber() at each LoadTable
	observedManifestLists []string  // ManifestList path submitted at each CommitTable
	observedSeqNums       []int64   // SequenceNumber submitted at each CommitTable
	committedSnapshot     *Snapshot // populated when CommitTable accepts
	peerCount             int       // running peer-commit counter (= total-data-files contributed by peers)
}
// LoadTable records the tracked metadata's LastSequenceNumber at the time of
// the call, then returns a table over that metadata which uses c.wfs as its
// file system and this catalog for commits. It never returns an error.
func (c *progressingRebuildCatalog) LoadTable(_ context.Context, ident Identifier) (*Table, error) {
	seenSeq := c.metadata.LastSequenceNumber()
	c.loadedLastSeqNums = append(c.loadedLastSeqNums, seenSeq)

	openFS := func(context.Context) (iceio.IO, error) { return c.wfs, nil }

	return New(ident, c.metadata, "", openFS, c), nil
}
// CommitTable records the manifest-list path and sequence number of the first
// addSnapshotUpdate submitted (if any). For the first failTimes calls it then
// grafts a peer commit and returns ErrCommitFailed. On later calls it stores
// a copy of the submitted snapshot, applies all updates to the tracked
// metadata and returns the result.
func (c *progressingRebuildCatalog) CommitTable(_ context.Context, _ Identifier, _ []Requirement, updates []Update) (Metadata, string, error) {
	call := int(c.commitTableCalls.Add(1))

	// Only the first snapshot-adding update of a submission is of interest.
	var submitted *addSnapshotUpdate
	for _, u := range updates {
		if su, ok := u.(*addSnapshotUpdate); ok {
			submitted = su

			break
		}
	}

	// The test asserts that each retry submits a unique path and a strictly
	// advancing sequence number, so capture both on every attempt.
	if submitted != nil {
		c.observedManifestLists = append(c.observedManifestLists, submitted.Snapshot.ManifestList)
		c.observedSeqNums = append(c.observedSeqNums, submitted.Snapshot.SequenceNumber)
	}

	if call <= c.failTimes {
		c.graftPeer()

		return nil, "", ErrCommitFailed
	}

	// Accept: keep a copy of the submitted snapshot for assertions, then
	// apply the updates so c.metadata.CurrentSnapshot() reflects the commit.
	if submitted != nil {
		snapCopy := *submitted.Snapshot
		c.committedSnapshot = &snapCopy
	}

	builder, err := MetadataBuilderFromBase(c.metadata, "")
	if err != nil {
		return nil, "", err
	}
	for _, u := range updates {
		if applyErr := u.Apply(builder); applyErr != nil {
			return nil, "", applyErr
		}
	}

	accepted, err := builder.Build()
	if err != nil {
		return nil, "", err
	}
	c.metadata = accepted

	return accepted, "", nil
}
// graftPeer simulates a concurrent writer. It writes an empty manifest list
// to c.wfs, adds a snapshot that references it to c.metadata with sequence
// number LastSequenceNumber+1, and points c.branch at that snapshot. The
// snapshot's summary reports the cumulative peer count as "total-data-files",
// "total-records" and "total-files-size", so a producer rebasing onto it sees
// the totals grow between retries. Any failure panics.
func (c *progressingRebuildCatalog) graftPeer() {
	// The catalog has no *testing.T to report through, so setup failures
	// abort via panic.
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	c.peerCount++
	peerID := int64(9_000) + int64(c.peerCount)
	peerSeq := c.metadata.LastSequenceNumber() + 1

	// Write a real, readable (empty) manifest list for the peer snapshot.
	listPath := fmt.Sprintf("%s/metadata/peer-%d-manifest-list.avro", c.location, peerID)
	listFile, err := c.wfs.Create(listPath)
	must(err)
	must(iceberg.WriteManifestList(2, listFile, peerID, nil, &peerSeq, 0, nil))
	must(listFile.Close())

	// Chain the peer onto the current branch head, if the branch has one.
	var parentID *int64
	if head := c.metadata.SnapshotByName(c.branch); head != nil {
		id := head.SnapshotID
		parentID = &id
	}

	builder, err := MetadataBuilderFromBase(c.metadata, "")
	must(err)

	cumulative := strconv.Itoa(c.peerCount)
	peer := &Snapshot{
		SnapshotID:       peerID,
		ParentSnapshotID: parentID,
		SequenceNumber:   peerSeq,
		TimestampMs:      c.metadata.LastUpdatedMillis() + 1,
		ManifestList:     listPath,
		Summary: &Summary{
			Operation: OpAppend,
			Properties: iceberg.Properties{
				"total-data-files":       cumulative,
				"total-records":          cumulative,
				"total-files-size":       cumulative,
				"total-delete-files":     "0",
				"total-position-deletes": "0",
				"total-equality-deletes": "0",
			},
		},
	}
	must(builder.AddSnapshot(peer))
	must(builder.SetSnapshotRef(c.branch, peerID, BranchRef))

	grafted, err := builder.Build()
	must(err)
	c.metadata = grafted
}
// TestDoCommit_RetryProgressesFreshMeta is an end-to-end regression test. It
// drives doCommit through two ErrCommitFailed retries against a catalog whose
// metadata progresses between attempts, and asserts all of the following:
//
//   - Sequence number from fresh metadata: the SequenceNumber submitted on
//     each retry strictly advances and equals the LastSequenceNumber seen by
//     that retry's LoadTable, plus one.
//   - Summary chained against the fresh parent: the committed summary's
//     total-data-files equals the peer count plus this producer's additions.
//   - Orphan cleanup: the original (attempt-0) and intermediate rebuild
//     manifest-list paths are deleted; the live committed path is preserved
//     in wfs.files.
//   - Distinct paths: each attempt submits a DISTINCT ManifestList path,
//     proving the rebuild closure rewrote the list every time.
//
// A regression that cached freshMeta across retries, skipped the rebuild,
// or carried over the attempt-0 summary would fail one or more of these
// assertions.
func TestDoCommit_RetryProgressesFreshMeta(t *testing.T) {
	ctx := t.Context()
	spec := iceberg.NewPartitionSpec()
	wfs, meta := newMemIOWithRetryMeta(t, spec)

	// Build a real producer commit (with a real rebuildManifestList closure).
	tbl := newOCCTable(t, meta, wfs, nil)
	txn := tbl.NewTransaction()
	sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
	sp.appendDataFile(newTestDataFile(t, spec, "mem://default/table-location/data/f.parquet", nil))
	updates, reqs, err := sp.commit(ctx)
	require.NoError(t, err)

	// Fail with a diagnostic rather than panicking (which would abort the
	// whole test binary) if the producer's output shape ever changes.
	require.NotEmpty(t, updates, "producer.commit() must emit at least one update")
	addSnap, ok := updates[0].(*addSnapshotUpdate)
	require.True(t, ok, "first producer update must be *addSnapshotUpdate, got %T", updates[0])
	originalManifestList := addSnap.Snapshot.ManifestList
	require.Contains(t, wfs.files, originalManifestList,
		"producer must persist the attempt-0 manifest list before doCommit runs")

	cat := &progressingRebuildCatalog{
		metadata:  meta,
		wfs:       wfs,
		location:  "mem://default/table-location",
		branch:    MainBranch,
		failTimes: 2, // 2 fails + 1 success = 3 CommitTable calls
	}
	tbl = newOCCTable(t, meta, wfs, cat)

	_, err = tbl.doCommit(ctx, updates, reqs, withCommitBranch(MainBranch))
	require.NoError(t, err, "doCommit must succeed on the third attempt")

	// (a) commitTableCalls: 2 fails + 1 success.
	require.Equal(t, int32(3), cat.commitTableCalls.Load(),
		"failTimes=2 → 2 ErrCommitFailed + 1 success = 3 CommitTable calls")

	// (b) Every attempt's submitted ManifestList path must be DISTINCT. The
	// first equals the producer's original; the second and third are rebuilt
	// paths from the rebuild closure.
	require.Len(t, cat.observedManifestLists, 3)
	seen := make(map[string]struct{}, 3)
	for i, p := range cat.observedManifestLists {
		require.NotEmpty(t, p, "attempt %d submitted an empty manifest list path", i)
		_, dup := seen[p]
		require.False(t, dup,
			"every retry must submit a UNIQUE manifest list path (orphan list grows by N): %v",
			cat.observedManifestLists)
		seen[p] = struct{}{}
	}
	require.Equal(t, originalManifestList, cat.observedManifestLists[0],
		"attempt 0 must submit the producer's original manifest list")

	// (c) Each rebuild observed a different LastSequenceNumber. LoadTable
	// is called once per retry (attempts 1 and 2 here — attempt 0 uses
	// the writer's own base), so we expect 2 entries that strictly advance.
	require.Len(t, cat.loadedLastSeqNums, 2,
		"LoadTable must be called once per retry attempt (=2)")
	require.Greater(t, cat.loadedLastSeqNums[1], cat.loadedLastSeqNums[0],
		"freshMeta.LastSequenceNumber() must strictly advance across retries: %v",
		cat.loadedLastSeqNums)

	// (d) The new sequence number derives from
	// freshMeta.LastSequenceNumber()+1 on each retry. Submitted sequence
	// numbers must strictly increase across the 3 attempts, AND attempts 1
	// and 2 (the rebuilds) must equal the value observed by their
	// respective LoadTable, plus one.
	require.Len(t, cat.observedSeqNums, 3)
	require.Greater(t, cat.observedSeqNums[1], cat.observedSeqNums[0],
		"rebuild attempt 1 must submit a strictly higher seq num than attempt 0: %v",
		cat.observedSeqNums)
	require.Greater(t, cat.observedSeqNums[2], cat.observedSeqNums[1],
		"rebuild attempt 2 must submit a strictly higher seq num than attempt 1: %v",
		cat.observedSeqNums)
	require.Equal(t, cat.loadedLastSeqNums[0]+1, cat.observedSeqNums[1],
		"attempt-1 rebuild seq must equal freshMeta.LastSequenceNumber()+1 from its LoadTable")
	require.Equal(t, cat.loadedLastSeqNums[1]+1, cat.observedSeqNums[2],
		"attempt-2 rebuild seq must equal freshMeta.LastSequenceNumber()+1 from its LoadTable")

	// (e) The summary chains on top of an evolving fresh parent. After two
	// peer commits the latest peer reports total-data-files=2; adding this
	// producer's 1 file gives a final total-data-files of 3.
	require.NotNil(t, cat.committedSnapshot, "committed snapshot must be captured")
	require.NotNil(t, cat.committedSnapshot.Summary)
	gotDataFiles := cat.committedSnapshot.Summary.Properties.GetInt("total-data-files", -1)
	require.Equal(t, 3, gotDataFiles,
		"committed total-data-files must = peer cumulative (2) + producer (1) = 3; "+
			"a regression that ignored freshParent.Summary would publish 1")

	// (f) Cleanup: the live committed manifest list is preserved in
	// wfs.files; the original and the intermediate rebuild were orphans
	// and have been removed.
	committedML := cat.committedSnapshot.ManifestList
	require.Equal(t, cat.observedManifestLists[2], committedML,
		"the third (accepted) submission must be the live committed manifest list")
	require.Contains(t, wfs.files, committedML,
		"live committed manifest list must be preserved by the cleanup defer")
	require.NotContains(t, wfs.files, originalManifestList,
		"attempt-0 manifest list must be cleaned as orphan after success")
	require.NotContains(t, wfs.files, cat.observedManifestLists[1],
		"attempt-1 rebuild manifest list must be cleaned as orphan after success")
}