blob: 232d8ebb699a91d8efd341f7fbb94b560b6867c3 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
// Wiring smoke tests for the PR 2.4 doCommit pre-flight. They prove that
// the transaction's validators slice is actually drained before
// cat.CommitTable is called, and that ErrCommitDiverged is terminal.
// Behavioral coverage of the individual validators' rejections lives
// in conflict_validation_test.go (PR 2.3).
import (
"context"
"errors"
"sync/atomic"
"testing"
"github.com/apache/iceberg-go"
iceio "github.com/apache/iceberg-go/io"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// countingCatalog is an in-memory catalog stub used by the doCommit tests.
// Every CommitTable call bumps attempts, so a test can assert whether (and
// how often) a commit actually reached the catalog.
type countingCatalog struct {
	// attempts counts CommitTable invocations.
	attempts atomic.Int32
	// metadata is the catalog's current view of the table. CommitTable
	// replaces it after each successful update. It is not guarded by a
	// lock. NOTE(review): presumably CommitTable is only called from one
	// goroutine at a time; confirm against doCommit.
	metadata Metadata
}
func (c *countingCatalog) LoadTable(context.Context, Identifier) (*Table, error) {
return nil, nil
}
// CommitTable records the attempt, then applies updates on top of the
// stored metadata via UpdateTableMetadata. Requirements are ignored.
//
// On success the new metadata replaces the stored copy and is returned
// with an empty metadata location. On failure the stored metadata is left
// untouched and the error is returned with a nil Metadata.
func (c *countingCatalog) CommitTable(_ context.Context, _ Identifier, _ []Requirement, updates []Update) (Metadata, string, error) {
	c.attempts.Add(1)

	next, err := UpdateTableMetadata(c.metadata, updates, "")
	if err == nil {
		c.metadata = next
		return next, "", nil
	}
	return nil, "", err
}
func newValidationTestTable(t *testing.T, props iceberg.Properties) (*Table, *countingCatalog) {
t.Helper()
schema := iceberg.NewSchema(0,
iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
)
merged := iceberg.Properties{PropertyFormatVersion: "2"}
for k, v := range props {
merged[k] = v
}
meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, UnsortedSortOrder, "file:///tmp/validation-test", merged)
require.NoError(t, err)
// Graft a synthetic snapshot so the main branch has a head —
// doCommit's pre-flight short-circuits on missing-branch, and we
// want the validator to actually run.
head := int64(100)
builder, err := MetadataBuilderFromBase(meta, "")
require.NoError(t, err)
require.NoError(t, builder.AddSnapshot(&Snapshot{
SnapshotID: head,
SequenceNumber: 1,
TimestampMs: meta.LastUpdatedMillis() + 1,
Summary: &Summary{Operation: OpAppend},
}))
require.NoError(t, builder.SetSnapshotRef(MainBranch, head, BranchRef))
meta, err = builder.Build()
require.NoError(t, err)
cat := &countingCatalog{metadata: meta}
tbl := New(Identifier{"db", "validation"}, meta, "file:///tmp/validation-test/metadata/v1.metadata.json",
func(context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }, cat)
return tbl, cat
}
// TestDoCommit_ValidatorRejectsTerminatesPreFlight covers the pre-flight
// wiring. A validator that rejects with ErrConflictingDataFiles, which the
// test expects to match ErrCommitFailed, makes doCommit return that error
// before cat.CommitTable is ever invoked. ErrCommitFailed is retryable by
// construction. PR 2.5 adds the refresh-and-replay that makes such a
// retry meaningful.
func TestDoCommit_ValidatorRejectsTerminatesPreFlight(t *testing.T) {
	ctx := context.Background()
	tbl, cat := newValidationTestTable(t, nil)

	alwaysConflict := func(_ *conflictContext) error { return ErrConflictingDataFiles }

	_, err := tbl.doCommit(ctx, nil, nil,
		withCommitBranch(MainBranch),
		withCommitValidators(alwaysConflict))

	require.Error(t, err)
	for _, want := range []error{ErrConflictingDataFiles, ErrCommitFailed} {
		assert.ErrorIs(t, err, want)
	}
	assert.Equal(t, int32(0), cat.attempts.Load(),
		"validator rejected before CommitTable was called")
}
// TestDoCommit_DivergedSentinelIsTerminal: ErrCommitDiverged does not
// wrap ErrCommitFailed, so doCommit returns it without entering the
// retry loop. Mirrors Java's ValidationException.
func TestDoCommit_DivergedSentinelIsTerminal(t *testing.T) {
tbl, cat := newValidationTestTable(t, iceberg.Properties{
CommitNumRetriesKey: "4",
CommitMinRetryWaitMsKey: "1",
CommitMaxRetryWaitMsKey: "2",
})
diverge := func(cc *conflictContext) error { return ErrCommitDiverged }
_, err := tbl.doCommit(context.Background(), nil, nil,
withCommitBranch(MainBranch),
withCommitValidators(diverge))
require.Error(t, err)
assert.ErrorIs(t, err, ErrCommitDiverged)
assert.False(t, errors.Is(err, ErrCommitFailed), "divergence must not be retryable")
assert.Equal(t, int32(0), cat.attempts.Load(),
"diverged validator must not retry or call CommitTable")
}
// TestRowDelta_RegistersValidatorOnTransaction checks that committing a
// RowDelta with one positional delete file leaves at least one validator
// on txn.validators. That gives the later Transaction.Commit -> doCommit
// path a validator to run.
func TestRowDelta_RegistersValidatorOnTransaction(t *testing.T) {
	tbl, _ := newValidationTestTable(t, nil)
	ctx := context.Background()

	txn := tbl.NewTransaction()
	delta := txn.NewRowDelta(nil)
	delta.AddDeletes(newTestPosDeleteFile(t, "pos-1.parquet", nil))
	require.NoError(t, delta.Commit(ctx))

	require.NotEmpty(t, txn.validators,
		"RowDelta.Commit must append at least one validator to the transaction")
}