blob: cdd88d2c36132aa5c15c4768519efbf76d78f28c [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"context"
"errors"
"testing"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/iceberg-go"
iceio "github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table/dv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDVWritePathProducesReadableOutput(t *testing.T) {
mb := newPositionDeleteUnpartitionedMetadata(t, 3)
fs := iceio.NewMemFS()
batch := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
{"file_path": "s3://bucket/data/file-001.parquet", "pos": 1},
{"file_path": "s3://bucket/data/file-001.parquet", "pos": 3},
{"file_path": "s3://bucket/data/file-001.parquet", "pos": 5},
{"file_path": "s3://bucket/data/file-002.parquet", "pos": 0},
{"file_path": "s3://bucket/data/file-002.parquet", "pos": 10}
]`)
itr := func(yield func(arrow.RecordBatch, error) bool) {
batch.Retain()
yield(batch, nil)
}
seq := positionDeleteRecordsToDataFiles(context.Background(), "mem://test", mb, nil,
recordWritingArgs{
sc: PositionalDeleteArrowSchema,
itr: itr,
fs: fs,
})
var dataFiles []iceberg.DataFile
for df, err := range seq {
require.NoError(t, err)
dataFiles = append(dataFiles, df)
}
require.Len(t, dataFiles, 2)
df1 := dataFiles[0]
assert.Equal(t, iceberg.PuffinFile, df1.FileFormat())
assert.Equal(t, int64(3), df1.Count())
assert.Equal(t, "s3://bucket/data/file-001.parquet", *df1.ReferencedDataFile())
assert.NotNil(t, df1.ContentOffset())
assert.NotNil(t, df1.ContentSizeInBytes())
df2 := dataFiles[1]
assert.Equal(t, iceberg.PuffinFile, df2.FileFormat())
assert.Equal(t, int64(2), df2.Count())
assert.Equal(t, "s3://bucket/data/file-002.parquet", *df2.ReferencedDataFile())
bm1, err := dv.ReadDV(fs, df1)
require.NoError(t, err)
assert.Equal(t, int64(3), bm1.Cardinality())
assert.True(t, bm1.Contains(1))
assert.True(t, bm1.Contains(3))
assert.True(t, bm1.Contains(5))
assert.False(t, bm1.Contains(2))
bm2, err := dv.ReadDV(fs, df2)
require.NoError(t, err)
assert.Equal(t, int64(2), bm2.Cardinality())
assert.True(t, bm2.Contains(0))
assert.True(t, bm2.Contains(10))
}
func TestDVWritePathV2UsesParquetPositionDeletes(t *testing.T) {
mb := newPositionDeleteUnpartitionedMetadata(t, 2)
batch := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
{"file_path": "s3://bucket/data/file-001.parquet", "pos": 1}
]`)
itr := func(yield func(arrow.RecordBatch, error) bool) {
batch.Retain()
yield(batch, nil)
}
seq := positionDeleteRecordsToDataFiles(context.Background(), t.TempDir(), mb, nil,
recordWritingArgs{
sc: PositionalDeleteArrowSchema,
itr: itr,
fs: iceio.LocalFS{},
})
var dataFiles []iceberg.DataFile
for df, err := range seq {
require.NoError(t, err)
dataFiles = append(dataFiles, df)
}
require.NotEmpty(t, dataFiles, "v2 position-delete write should produce at least one DataFile")
assert.Equal(t, iceberg.ParquetFile, dataFiles[0].FileFormat(),
"v2 must use Parquet position-delete files, not Puffin DVs")
}
func TestDVWritePathV3PartitionedUsesParquetPositionDeletes(t *testing.T) {
// DV+partitioned support is deferred; v3 partitioned writes must still
// route through the Parquet position-delete writer.
mb := newPositionDeletePartitionedMetadata(t, 3)
const path = "file://namespace/age_bucket=1/test.parquet"
partitions := map[string]partitionContext{
path: {partitionData: map[int]any{iceberg.PartitionDataIDStart: 1}, specID: 0},
}
batch := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
`[{"file_path": "`+path+`", "pos": 0}]`)
itr := func(yield func(arrow.RecordBatch, error) bool) {
batch.Retain()
yield(batch, nil)
}
seq := positionDeleteRecordsToDataFiles(context.Background(), t.TempDir(), mb, partitions,
recordWritingArgs{
sc: PositionalDeleteArrowSchema,
itr: itr,
fs: iceio.LocalFS{},
})
var dataFiles []iceberg.DataFile
for df, err := range seq {
require.NoError(t, err)
dataFiles = append(dataFiles, df)
}
require.NotEmpty(t, dataFiles, "v3 partitioned write should still produce Parquet position deletes")
assert.Equal(t, iceberg.ParquetFile, dataFiles[0].FileFormat(),
"v3 partitioned write must fall back to Parquet until DV+partitioned support lands")
}
// TestDVWritePathCancelledContext verifies that when the iterator surfaces a
// context cancellation error (e.g., the upstream scan honored a cancelled
// ctx), the DV producer propagates it instead of silently producing partial
// output.
func TestDVWritePathCancelledContext(t *testing.T) {
mb := newPositionDeleteUnpartitionedMetadata(t, 3)
fs := iceio.NewMemFS()
ctx, cancel := context.WithCancel(context.Background())
cancel()
itr := func(yield func(arrow.RecordBatch, error) bool) {
yield(nil, ctx.Err())
}
seq := positionDeleteRecordsToDataFiles(ctx, "mem://test", mb, nil,
recordWritingArgs{
sc: PositionalDeleteArrowSchema,
itr: itr,
fs: fs,
})
var sawErr bool
var dataFiles []iceberg.DataFile
for df, err := range seq {
if err != nil {
sawErr = true
assert.ErrorIs(t, err, context.Canceled)
continue
}
dataFiles = append(dataFiles, df)
}
assert.True(t, sawErr, "expected cancellation error from iterator to propagate")
assert.Empty(t, dataFiles, "no DataFiles should be produced when the iterator errors")
}
// TestDVWritePathPropagatesMidStreamError verifies that an error surfaced
// after at least one good batch is propagated to the consumer rather than
// being swallowed and producing a partial Puffin file.
func TestDVWritePathPropagatesMidStreamError(t *testing.T) {
mb := newPositionDeleteUnpartitionedMetadata(t, 3)
fs := iceio.NewMemFS()
good := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
`[{"file_path": "s3://bucket/data/file-001.parquet", "pos": 1}]`)
wantErr := errors.New("simulated upstream failure")
itr := func(yield func(arrow.RecordBatch, error) bool) {
good.Retain()
if !yield(good, nil) {
return
}
yield(nil, wantErr)
}
seq := positionDeleteRecordsToDataFiles(context.Background(), "mem://test", mb, nil,
recordWritingArgs{
sc: PositionalDeleteArrowSchema,
itr: itr,
fs: fs,
})
var sawErr bool
var dataFiles []iceberg.DataFile
for df, err := range seq {
if err != nil {
sawErr = true
assert.ErrorIs(t, err, wantErr)
continue
}
dataFiles = append(dataFiles, df)
}
assert.True(t, sawErr, "expected mid-stream error to propagate to the consumer")
assert.Empty(t, dataFiles,
"DV producer must not flush a partial Puffin when the iterator errors mid-stream")
}
// TestDVWritePathMergesMultiBatchSameFile verifies that positions for the
// same data file arriving across multiple batches are merged into a single
// DV blob (one DataFile, one bitmap with the union of positions).
func TestDVWritePathMergesMultiBatchSameFile(t *testing.T) {
mb := newPositionDeleteUnpartitionedMetadata(t, 3)
fs := iceio.NewMemFS()
const path = "s3://bucket/data/file-001.parquet"
batch1 := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
{"file_path": "`+path+`", "pos": 1},
{"file_path": "`+path+`", "pos": 3}
]`)
batch2 := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
{"file_path": "`+path+`", "pos": 5},
{"file_path": "`+path+`", "pos": 7}
]`)
itr := func(yield func(arrow.RecordBatch, error) bool) {
batch1.Retain()
if !yield(batch1, nil) {
return
}
batch2.Retain()
yield(batch2, nil)
}
seq := positionDeleteRecordsToDataFiles(context.Background(), "mem://test", mb, nil,
recordWritingArgs{
sc: PositionalDeleteArrowSchema,
itr: itr,
fs: fs,
})
var dataFiles []iceberg.DataFile
for df, err := range seq {
require.NoError(t, err)
dataFiles = append(dataFiles, df)
}
require.Len(t, dataFiles, 1,
"two batches targeting the same file should merge into one DV blob")
df := dataFiles[0]
assert.Equal(t, iceberg.PuffinFile, df.FileFormat())
assert.Equal(t, path, *df.ReferencedDataFile())
assert.Equal(t, int64(4), df.Count(),
"merged DV cardinality should be the union across batches")
bm, err := dv.ReadDV(fs, df)
require.NoError(t, err)
assert.Equal(t, int64(4), bm.Cardinality())
for _, pos := range []uint64{1, 3, 5, 7} {
assert.True(t, bm.Contains(pos), "merged bitmap should contain pos %d", pos)
}
}