blob: 0acb0791d7c9ee93f9bfd2e633bbe289a445c467 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package codec_test
import (
"reflect"
"sort"
"strconv"
"sync"
"testing"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/codec"
"github.com/stretchr/testify/require"
)
// TestEncodeDecodeDataFileRoundTrip encodes a fully populated DataFile,
// decodes the resulting bytes, and compares the decoded value with the
// original. One subtest ("v1", "v2", "v3") runs per format version.
func TestEncodeDecodeDataFileRoundTrip(t *testing.T) {
	versions := []int{1, 2, 3}
	for _, v := range versions {
		name := "v" + strconv.Itoa(v)
		t.Run(name, func(t *testing.T) {
			spec, schema, want := fullyPopulatedDataFile(t, v)

			encoded, encErr := codec.EncodeDataFile(want, spec, schema, v)
			require.NoError(t, encErr)
			require.NotEmpty(t, encoded)

			got, decErr := codec.DecodeDataFile(encoded, spec, schema, v)
			require.NoError(t, decErr)
			require.NotNil(t, got)

			assertDataFileEqual(t, want, got, v)
		})
	}
}
// TestEncodeDataFileRejectsForeignImpl hands the encoder a stubDataFile and
// expects an error whose text mentions both the marshaler interface and the
// codec entry point.
func TestEncodeDataFileRejectsForeignImpl(t *testing.T) {
	spec, schema, _ := fullyPopulatedDataFile(t, 2)

	_, err := codec.EncodeDataFile(stubDataFile{}, spec, schema, 2)
	require.Error(t, err)

	// Read the message once; both expectations inspect the same text.
	msg := err.Error()
	require.Contains(t, msg, "iceberg.AvroEntryMarshaler",
		"error should name the interface the caller needs to satisfy")
	require.Contains(t, msg, "codec: EncodeDataFile:",
		"codec errors should be prefixed with the codec entry point name")
}
// TestEncodeDataFileRejectsBadVersion calls the encoder with format versions
// 0, 4 and -1. Each call must fail with an error that carries the codec
// entry-point prefix and the "unsupported format version" text.
func TestEncodeDataFileRejectsBadVersion(t *testing.T) {
	spec, schema, df := fullyPopulatedDataFile(t, 2)

	unsupported := []int{0, 4, -1}
	for i := range unsupported {
		badVersion := unsupported[i]

		_, encErr := codec.EncodeDataFile(df, spec, schema, badVersion)
		require.Error(t, encErr, "version %d must be rejected", badVersion)

		msg := encErr.Error()
		require.Contains(t, msg, "codec: EncodeDataFile:")
		require.Contains(t, msg, "unsupported format version")
	}
}
// TestDecodeDataFileRejectsBadVersion encodes a DataFile as v2 and then asks
// the decoder to read it with format versions 0, 4 and -1. Each call must
// fail with an error that carries the codec entry-point prefix and the
// "unsupported format version" text.
func TestDecodeDataFileRejectsBadVersion(t *testing.T) {
	spec, schema, df := fullyPopulatedDataFile(t, 2)

	payload, encErr := codec.EncodeDataFile(df, spec, schema, 2)
	require.NoError(t, encErr)

	unsupported := []int{0, 4, -1}
	for i := range unsupported {
		badVersion := unsupported[i]

		_, decErr := codec.DecodeDataFile(payload, spec, schema, badVersion)
		require.Error(t, decErr, "version %d must be rejected", badVersion)

		msg := decErr.Error()
		require.Contains(t, msg, "codec: DecodeDataFile:")
		require.Contains(t, msg, "unsupported format version")
	}
}
// TestDecodeDataFileWrapsDecodeError feeds the decoder a two-byte input
// (0xCA 0xFE) and expects the resulting error to carry the codec
// entry-point prefix.
func TestDecodeDataFileWrapsDecodeError(t *testing.T) {
	spec, schema, _ := fullyPopulatedDataFile(t, 2)

	garbage := []byte{0xCA, 0xFE}
	_, decErr := codec.DecodeDataFile(garbage, spec, schema, 2)
	require.Error(t, decErr)

	msg := decErr.Error()
	require.Contains(t, msg, "codec: DecodeDataFile:",
		"decode-side errors from the bridge should be wrapped with the codec entry point")
}
// TestEncodeDataFileAcceptsExternalMarshaler covers the extensibility
// contract: the codec accepts any DataFile that also implements
// [iceberg.AvroEntryMarshaler], not just the implementation built into the
// iceberg package. The test supplies an externalMarshaler carrying a fixed
// payload and expects exactly those bytes back.
func TestEncodeDataFileAcceptsExternalMarshaler(t *testing.T) {
	spec, schema, _ := fullyPopulatedDataFile(t, 2)

	// Separate literals for input and expectation, so the comparison
	// never checks a slice against itself.
	src := externalMarshaler{payload: []byte{0xCA, 0xFE}}
	want := []byte{0xCA, 0xFE}

	encoded, err := codec.EncodeDataFile(src, spec, schema, 2)
	require.NoError(t, err)
	require.Equal(t, want, encoded,
		"the marshaler interface contract must let external impls supply their own bytes")
}
// externalMarshaler is a test double declared outside the iceberg package.
// It embeds stubDataFile for the accessor methods and adds a
// MarshalAvroEntry method that returns a fixed payload.
type externalMarshaler struct {
	stubDataFile

	// payload holds the bytes returned verbatim by MarshalAvroEntry.
	payload []byte
}

// MarshalAvroEntry ignores all three arguments and returns the stored
// payload with a nil error.
func (m externalMarshaler) MarshalAvroEntry(_ iceberg.PartitionSpec, _ *iceberg.Schema, _ int) ([]byte, error) {
	return m.payload, nil
}
// TestEncodeDataFileIdempotent encodes the same DataFile twice in a row and
// requires both encodings to be byte-for-byte equal.
func TestEncodeDataFileIdempotent(t *testing.T) {
	spec, schema, df := fullyPopulatedDataFile(t, 2)

	// encode performs one v2 encode and fails the test on error.
	encode := func() []byte {
		out, err := codec.EncodeDataFile(df, spec, schema, 2)
		require.NoError(t, err)

		return out
	}

	first := encode()
	second := encode()
	require.Equal(t, first, second, "repeated encodes must produce identical bytes")
}
// TestEncodeDataFileConcurrent encodes one shared DataFile from 16
// goroutines, 32 times each, and requires every result to equal a reference
// encoding produced up front on the test goroutine.
func TestEncodeDataFileConcurrent(t *testing.T) {
	spec, schema, df := fullyPopulatedDataFile(t, 2)

	want, err := codec.EncodeDataFile(df, spec, schema, 2)
	require.NoError(t, err)

	const (
		goroutines = 16
		iterations = 32
		total      = goroutines * iterations
	)

	// Both channels can hold every possible send, so a worker never
	// blocks even though nothing is received until all workers finish.
	encodedCh := make(chan []byte, total)
	failedCh := make(chan error, total)

	// worker encodes up to `iterations` times and stops at its first error.
	worker := func() {
		for i := 0; i < iterations; i++ {
			out, encErr := codec.EncodeDataFile(df, spec, schema, 2)
			if encErr != nil {
				failedCh <- encErr

				return
			}
			encodedCh <- out
		}
	}

	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Go(worker)
	}
	wg.Wait()

	// All senders are done; close the channels so the range loops end.
	close(encodedCh)
	close(failedCh)

	for encErr := range failedCh {
		t.Fatalf("concurrent encode failed: %v", encErr)
	}
	for got := range encodedCh {
		require.Equal(t, want, got, "concurrent encodes must produce identical bytes")
	}
}
// fullyPopulatedDataFile builds the shared test fixture: a two-column schema
// (ID 123), a partition spec (ID 7) with an identity partition on column 1,
// and a Parquet DataFile with every optional setter applied that the given
// format version allows. EqualityFieldIDs is set from v2 onwards; FirstRowID,
// ReferencedDataFile, ContentOffset and ContentSizeInBytes from v3 onwards.
// A builder construction error fails the test immediately.
func fullyPopulatedDataFile(t *testing.T, version int) (iceberg.PartitionSpec, *iceberg.Schema, iceberg.DataFile) {
	t.Helper()

	const (
		dataPath    = "s3://bucket/ns/tbl/data/part-0000.parquet"
		sourcePath  = "s3://bucket/ns/tbl/data/source.parquet"
		recordCount = 1024
		fileSize    = 1024 * 1024
	)

	idField := iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.Int64Type{}, Required: true}
	nameField := iceberg.NestedField{ID: 2, Name: "name", Type: iceberg.StringType{}}
	schema := iceberg.NewSchema(123, idField, nameField)

	idPartition := iceberg.PartitionField{
		SourceIDs: []int{1},
		FieldID:   1000,
		Name:      "id_part",
		Transform: iceberg.IdentityTransform{},
	}
	spec := iceberg.NewPartitionSpecID(7, idPartition)

	partitionData := map[int]any{1000: int64(42)}
	builder, err := iceberg.NewDataFileBuilder(
		spec,
		iceberg.EntryContentData,
		dataPath,
		iceberg.ParquetFile,
		partitionData,
		map[int]string{},
		map[int]int{},
		recordCount,
		fileSize,
	)
	require.NoError(t, err)

	// Fields populated for every format version.
	builder.
		ColumnSizes(map[int]int64{1: 512, 2: 256}).
		ValueCounts(map[int]int64{1: 1024, 2: 1024}).
		NullValueCounts(map[int]int64{1: 0, 2: 4}).
		NaNValueCounts(map[int]int64{1: 0, 2: 0}).
		LowerBoundValues(map[int][]byte{1: {0x01}, 2: []byte("a")}).
		UpperBoundValues(map[int][]byte{1: {0xff}, 2: []byte("z")}).
		SplitOffsets([]int64{0, 4096}).
		SortOrderID(0).
		KeyMetadata([]byte("kms-key-1"))

	// distinct_counts (field 111) is deprecated by the spec in every
	// version (apache/iceberg#12182). It is set here regardless of version
	// so the round-trip test can assert that the wire codec drops it on
	// encode no matter what the in-memory DataFile holds; strict readers
	// such as PyIceberg reject manifests that emit it. Calling the
	// deprecated setter is intentional.
	builder.DistinctValueCounts(map[int]int64{1: 64, 2: 128}) //nolint:staticcheck // SA1019: deliberate use of deprecated setter to assert the encoder drops the field

	if version >= 2 {
		builder.EqualityFieldIDs([]int{1})
	}

	if version >= 3 {
		builder.FirstRowID(0).
			ReferencedDataFile(sourcePath).
			ContentOffset(128).
			ContentSizeInBytes(2048)
	}

	dataFile := builder.Build()

	return spec, schema, dataFile
}
// assertDataFileEqual compares two DataFiles accessor by accessor and stops
// the test at the first mismatch. DistinctValueCounts is the exception: it
// must be empty on got, whatever want carries. EqualityFieldIDs is compared
// from v2 onwards and the row-lineage / referenced-file accessors from v3
// onwards. The last check compares the DataFile interface's method set with
// expectedDataFileMethods, so a newly added accessor cannot go unnoticed.
func assertDataFileEqual(t *testing.T, want, got iceberg.DataFile, version int) {
	t.Helper()

	const distinctCountsMsg = "distinct_counts (field 111) is deprecated in every version " +
		"(apache/iceberg#12182); the manifest-entry schemas in " +
		"internal/avro_schemas.go omit it for v1/v2/v3 so the encoder " +
		"drops it on the wire regardless of what the source DataFile carries"

	const driftMsg = "DataFile interface drifted: either extend [codec.EncodeDataFile]/[codec.DecodeDataFile] " +
		"and the assertions above to cover the change, or update expectedDataFileMethods " +
		"with a comment explaining why the new method is intentionally not transported"

	check := require.New(t)

	// Accessors compared for every format version.
	check.Equal(want.FilePath(), got.FilePath())
	check.Equal(want.FileFormat(), got.FileFormat())
	check.Equal(want.Partition(), got.Partition())
	check.Equal(want.Count(), got.Count())
	check.Equal(want.FileSizeBytes(), got.FileSizeBytes())
	check.Equal(want.ColumnSizes(), got.ColumnSizes())
	check.Equal(want.ValueCounts(), got.ValueCounts())
	check.Equal(want.NullValueCounts(), got.NullValueCounts())
	check.Equal(want.NaNValueCounts(), got.NaNValueCounts())
	check.Equal(want.LowerBoundValues(), got.LowerBoundValues())
	check.Equal(want.UpperBoundValues(), got.UpperBoundValues())
	check.Equal(want.KeyMetadata(), got.KeyMetadata())
	check.Equal(want.SplitOffsets(), got.SplitOffsets())
	check.Equal(want.SortOrderID(), got.SortOrderID())
	check.Equal(want.SpecID(), got.SpecID())
	check.Equal(want.ContentType(), got.ContentType())

	check.Empty(got.DistinctValueCounts(), distinctCountsMsg)

	if version >= 2 {
		check.Equal(want.EqualityFieldIDs(), got.EqualityFieldIDs())

		// The v3-only accessors sit inside the v2 gate because any
		// version >= 3 also satisfies version >= 2.
		if version >= 3 {
			check.Equal(want.FirstRowID(), got.FirstRowID())
			check.Equal(want.ReferencedDataFile(), got.ReferencedDataFile())
			check.Equal(want.ContentOffset(), got.ContentOffset())
			check.Equal(want.ContentSizeInBytes(), got.ContentSizeInBytes())
		}
	}

	check.Equal(expectedDataFileMethods, dataFileInterfaceMethods(), driftMsg)
}
// expectedDataFileMethods lists, in sorted order, the method names the
// [iceberg.DataFile] interface is expected to export. The round-trip test
// compares this list with the names obtained through reflection. A mismatch
// forces a deliberate decision on whether the new method must be
// round-tripped by [codec.EncodeDataFile] / [codec.DecodeDataFile].
var expectedDataFileMethods = []string{
	"ColumnSizes",
	"ContentOffset",
	"ContentSizeInBytes",
	"ContentType",
	"Count",
	"DistinctValueCounts", // asserted empty after decode rather than compared
	"EqualityFieldIDs",
	"FileFormat",
	"FilePath",
	"FileSizeBytes",
	"FirstRowID",
	"KeyMetadata",
	"LowerBoundValues",
	"NaNValueCounts",
	"NullValueCounts",
	"Partition",
	"ReferencedDataFile",
	"SortOrderID",
	"SpecID",
	"SplitOffsets",
	"UpperBoundValues",
	"ValueCounts",
}
// dataFileInterfaceMethods uses reflection to collect the name of every
// method declared on the [iceberg.DataFile] interface and returns the names
// in sorted order. A method added to the interface changes the returned
// slice, which the round-trip test compares with expectedDataFileMethods.
func dataFileInterfaceMethods() []string {
	// A nil *DataFile gives access to the interface type itself via Elem.
	iface := reflect.TypeOf((*iceberg.DataFile)(nil)).Elem()

	names := make([]string, iface.NumMethod())
	for i := range names {
		names[i] = iface.Method(i).Name
	}
	sort.Strings(names)

	return names
}
// stubDataFile is a minimal DataFile stand-in used by the tests in this
// file. Every accessor returns a zero or nil value, except ContentType and
// FileFormat, which report iceberg.EntryContentData and iceberg.ParquetFile.
// It declares no MarshalAvroEntry method.
type stubDataFile struct{}

// Identity and file-level scalars.

func (stubDataFile) ContentType() iceberg.ManifestEntryContent { return iceberg.EntryContentData }
func (stubDataFile) FileFormat() iceberg.FileFormat            { return iceberg.ParquetFile }
func (stubDataFile) FilePath() string                          { return "" }
func (stubDataFile) SpecID() int32                             { return 0 }
func (stubDataFile) Count() int64                              { return 0 }
func (stubDataFile) FileSizeBytes() int64                      { return 0 }

// Map-valued accessors: all nil.

func (stubDataFile) Partition() map[int]any             { return nil }
func (stubDataFile) ColumnSizes() map[int]int64         { return nil }
func (stubDataFile) ValueCounts() map[int]int64         { return nil }
func (stubDataFile) NullValueCounts() map[int]int64     { return nil }
func (stubDataFile) NaNValueCounts() map[int]int64      { return nil }
func (stubDataFile) DistinctValueCounts() map[int]int64 { return nil }
func (stubDataFile) LowerBoundValues() map[int][]byte   { return nil }
func (stubDataFile) UpperBoundValues() map[int][]byte   { return nil }

// Slice-valued accessors: all nil.

func (stubDataFile) KeyMetadata() []byte     { return nil }
func (stubDataFile) SplitOffsets() []int64   { return nil }
func (stubDataFile) EqualityFieldIDs() []int { return nil }

// Pointer-valued accessors: all nil.

func (stubDataFile) SortOrderID() *int           { return nil }
func (stubDataFile) FirstRowID() *int64          { return nil }
func (stubDataFile) ReferencedDataFile() *string { return nil }
func (stubDataFile) ContentOffset() *int64       { return nil }
func (stubDataFile) ContentSizeInBytes() *int64  { return nil }