blob: ab976a12da32a2b46b7317dc01e1705d7e20c297 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"context"
"errors"
"fmt"
"path/filepath"
"sync/atomic"
"testing"
"time"
"github.com/apache/iceberg-go"
iceio "github.com/apache/iceberg-go/io"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// flakyCatalog commits successfully only on a specified attempt number.
// Earlier attempts return the given error.
type flakyCatalog struct {
	// metadata is the catalog's notion of the table's current metadata;
	// CommitTable replaces it once a commit succeeds.
	metadata Metadata
	// failUntilAttempt is the number of leading CommitTable calls that fail.
	// Attempt failUntilAttempt+1 succeeds.
	failUntilAttempt int
	// failWith is the error returned by each failing attempt.
	failWith error
	// attempts counts CommitTable invocations; atomic so tests can read it
	// safely regardless of how the retry loop is driven.
	attempts atomic.Int32
}
// LoadTable returns a fresh *Table view over the catalog's current metadata,
// or (nil, nil) when no metadata has been seeded yet.
func (c *flakyCatalog) LoadTable(_ context.Context, ident Identifier) (*Table, error) {
	meta := c.metadata
	if meta == nil {
		return nil, nil
	}
	localFS := func(context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }

	return New(ident, meta, "", localFS, c), nil
}
// CommitTable returns c.failWith for the first failUntilAttempt calls, then
// applies the updates, stores the resulting metadata, and returns it.
func (c *flakyCatalog) CommitTable(ctx context.Context, ident Identifier, reqs []Requirement, updates []Update) (Metadata, string, error) {
	attempt := int(c.attempts.Add(1))
	if attempt <= c.failUntilAttempt {
		return nil, "", c.failWith
	}

	updated, err := UpdateTableMetadata(c.metadata, updates, "")
	if err != nil {
		return nil, "", err
	}
	c.metadata = updated

	return updated, "", nil
}
// newRetryTestTable builds a format-v2 table rooted in a temp directory,
// backed by the local filesystem and the supplied catalog. The given
// properties (if any) are merged in before the metadata is created.
func newRetryTestTable(t *testing.T, cat CatalogIO, props iceberg.Properties) *Table {
	t.Helper()

	if props == nil {
		props = iceberg.Properties{}
	}
	// Pin the format version so the tests exercise v2 metadata semantics.
	props[PropertyFormatVersion] = "2"

	loc := filepath.ToSlash(t.TempDir())
	sc := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
	)

	meta, err := NewMetadata(sc, iceberg.UnpartitionedSpec, UnsortedSortOrder, loc, props)
	require.NoError(t, err)

	localFS := func(ctx context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }

	return New(Identifier{"db", "retry_test"}, meta, loc+"/metadata/v1.metadata.json", localFS, cat)
}
// A commit that succeeds immediately must make exactly one attempt.
func TestDoCommit_SucceedsFirstTry(t *testing.T) {
	catalog := &flakyCatalog{}
	tbl := newRetryTestTable(t, catalog, nil)
	catalog.metadata = tbl.Metadata()

	_, err := tbl.doCommit(t.Context(), nil, nil)

	require.NoError(t, err)
	assert.Equal(t, int32(1), catalog.attempts.Load())
}
// ErrCommitFailed is retryable: two failures followed by a success should
// leave exactly three recorded attempts and no error.
func TestDoCommit_RetriesOnCommitFailed(t *testing.T) {
	catalog := &flakyCatalog{
		failUntilAttempt: 2,
		failWith:         fmt.Errorf("REST: %w", ErrCommitFailed),
	}
	props := iceberg.Properties{
		CommitNumRetriesKey:     "4",
		CommitMinRetryWaitMsKey: "1",
		CommitMaxRetryWaitMsKey: "2",
	}
	tbl := newRetryTestTable(t, catalog, props)
	catalog.metadata = tbl.Metadata()

	_, err := tbl.doCommit(t.Context(), nil, nil)

	require.NoError(t, err)
	assert.Equal(t, int32(3), catalog.attempts.Load(), "should retry 2x then succeed on 3rd")
}
// When the catalog never succeeds, the retry budget bounds the attempts:
// one initial try plus the configured number of retries.
func TestDoCommit_GivesUpAfterMaxRetries(t *testing.T) {
	catalog := &flakyCatalog{
		failUntilAttempt: 100,
		failWith:         fmt.Errorf("REST: %w", ErrCommitFailed),
	}
	props := iceberg.Properties{
		CommitNumRetriesKey:     "2",
		CommitMinRetryWaitMsKey: "1",
		CommitMaxRetryWaitMsKey: "2",
	}
	tbl := newRetryTestTable(t, catalog, props)
	catalog.metadata = tbl.Metadata()

	_, err := tbl.doCommit(t.Context(), nil, nil)

	require.Error(t, err)
	assert.ErrorIs(t, err, ErrCommitFailed)
	// 1 initial + 2 retries = 3 attempts
	assert.Equal(t, int32(3), catalog.attempts.Load())
}
// Errors that leave the commit in an unknown state must not be retried:
// the commit may already have landed server-side.
func TestDoCommit_DoesNotRetryUnknownStateError(t *testing.T) {
	unknownErr := errors.New("500 internal server error")
	catalog := &flakyCatalog{
		failUntilAttempt: 5,
		failWith:         unknownErr,
	}
	props := iceberg.Properties{
		CommitNumRetriesKey:     "10",
		CommitMinRetryWaitMsKey: "1",
		CommitMaxRetryWaitMsKey: "2",
	}
	tbl := newRetryTestTable(t, catalog, props)
	catalog.metadata = tbl.Metadata()

	_, err := tbl.doCommit(t.Context(), nil, nil)

	require.Error(t, err)
	assert.ErrorIs(t, err, unknownErr)
	// Must not retry — unknown state could mean the commit actually succeeded.
	assert.Equal(t, int32(1), catalog.attempts.Load())
}
// Errors unrelated to commit conflicts are not retryable either: a single
// attempt should be made and the original error surfaced unchanged.
func TestDoCommit_DoesNotRetryUnrelatedError(t *testing.T) {
	otherErr := errors.New("network unreachable")
	catalog := &flakyCatalog{
		failUntilAttempt: 5,
		failWith:         otherErr,
	}
	props := iceberg.Properties{
		CommitNumRetriesKey:     "10",
		CommitMinRetryWaitMsKey: "1",
		CommitMaxRetryWaitMsKey: "2",
	}
	tbl := newRetryTestTable(t, catalog, props)
	catalog.metadata = tbl.Metadata()

	_, err := tbl.doCommit(t.Context(), nil, nil)

	require.Error(t, err)
	assert.ErrorIs(t, err, otherErr)
	assert.Equal(t, int32(1), catalog.attempts.Load())
}
// With long backoff waits and a short deadline, the retry loop must bail
// out early instead of sleeping through all configured retries.
func TestDoCommit_RespectsContextCancellation(t *testing.T) {
	catalog := &flakyCatalog{
		failUntilAttempt: 100,
		failWith:         fmt.Errorf("REST: %w", ErrCommitFailed),
	}
	props := iceberg.Properties{
		CommitNumRetriesKey:     "10",
		CommitMinRetryWaitMsKey: "50",
		CommitMaxRetryWaitMsKey: "200",
	}
	tbl := newRetryTestTable(t, catalog, props)
	catalog.metadata = tbl.Metadata()

	ctx, cancel := context.WithTimeout(t.Context(), 30*time.Millisecond)
	defer cancel()

	_, err := tbl.doCommit(ctx, nil, nil)

	require.Error(t, err)
	// Either the commit error bubbles up or context cancellation does.
	// Both are acceptable outcomes — the test just verifies we don't hang.
	assert.True(t, errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrCommitFailed))
	assert.Less(t, catalog.attempts.Load(), int32(10), "should stop retrying after context cancels")
}
// A retry budget of zero means a single attempt: the first failure is final.
func TestDoCommit_ZeroRetriesOnlyOneAttempt(t *testing.T) {
	catalog := &flakyCatalog{
		failUntilAttempt: 5,
		failWith:         fmt.Errorf("REST: %w", ErrCommitFailed),
	}
	props := iceberg.Properties{
		CommitNumRetriesKey:     "0",
		CommitMinRetryWaitMsKey: "1",
		CommitMaxRetryWaitMsKey: "2",
	}
	tbl := newRetryTestTable(t, catalog, props)
	catalog.metadata = tbl.Metadata()

	_, err := tbl.doCommit(t.Context(), nil, nil)

	require.Error(t, err)
	assert.Equal(t, int32(1), catalog.attempts.Load())
}
func TestBackoffDuration_ExponentialWithJitter(t *testing.T) {
const minMs, maxMs = 100, 60000
minWait := time.Duration(minMs) * time.Millisecond
// Attempt 0: cap == minMs, wait is exactly minMs.
for range 20 {
d := backoffDuration(0, minMs, maxMs)
assert.Equal(t, minWait, d)
}
// Attempt 3: cap = 800ms (100 << 3), wait in [minMs, 800ms].
for range 20 {
d := backoffDuration(3, minMs, maxMs)
assert.GreaterOrEqual(t, d, minWait)
assert.LessOrEqual(t, d, 800*time.Millisecond)
}
// Attempt 20: overflow protection, wait in [minMs, maxMs].
for range 20 {
d := backoffDuration(20, minMs, maxMs)
assert.GreaterOrEqual(t, d, minWait)
assert.LessOrEqual(t, d, time.Duration(maxMs)*time.Millisecond)
}
}
// Degenerate inputs: zero bounds fall back to defaults, and huge attempt
// counts must not panic on the shift — they clamp to the max wait.
func TestBackoffDuration_HandlesZeroInputs(t *testing.T) {
	// Zero min/max should fall back to defaults rather than return garbage.
	wait := backoffDuration(0, 0, 0)
	assert.Equal(t, time.Duration(CommitMinRetryWaitMsDefault)*time.Millisecond, wait)

	// Very large attempt counts must not panic on shift; clamps to maxMs.
	wait = backoffDuration(100, 100, 60000)
	assert.GreaterOrEqual(t, wait, 100*time.Millisecond)
	assert.LessOrEqual(t, wait, 60000*time.Millisecond)
}
func TestReadRetryConfig_ClampsNegativeProperties(t *testing.T) {
// Negative values in properties should be replaced with defaults.
cfg := readRetryConfig(iceberg.Properties{
CommitNumRetriesKey: "-1",
CommitMinRetryWaitMsKey: "-100",
CommitMaxRetryWaitMsKey: "-1000",
CommitTotalRetryTimeoutMsKey: "-5",
})
assert.Equal(t, uint(CommitNumRetriesDefault), cfg.numRetries)
assert.Equal(t, uint(CommitMinRetryWaitMsDefault), cfg.minWaitMs)
assert.Equal(t, uint(CommitMaxRetryWaitMsDefault), cfg.maxWaitMs)
assert.Equal(t, uint(CommitTotalRetryTimeoutMsDefault), cfg.totalTimeoutMs)
}