blob: 7a7b3d394acbb3f1c1bf1f1cd3114a7cdf15f377 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package iceberg
import (
"fmt"
"reflect"
"github.com/apache/iceberg-go/internal"
"github.com/apache/iceberg-go/internal/datafileavro"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/twmb/avro"
)
// defaultSchemaCacheSize is the capacity dataFileSchemaCache is created
// with at package initialization. 8192 covers a few thousand active
// tables (each contributing 1–3 entries for its partition spec × format
// version), which is what a long-running consumer like a server-side
// compaction service typically sees. Use SetSchemaCacheSize to change
// the capacity at runtime.
const defaultSchemaCacheSize = 8192
// init registers this package's manifest-entry decoder as
// datafileavro.Unmarshal. The hook receives untyped spec and schema
// arguments, so the adapter verifies they are a PartitionSpec and a
// *Schema before delegating to unmarshalAvroDataFileEntry. It panics if
// the hook has already been set.
func init() {
	if datafileavro.Unmarshal != nil {
		panic("iceberg: datafileavro.Unmarshal already set")
	}

	decode := func(data []byte, spec, schema any, version int) (any, error) {
		partSpec, isSpec := spec.(PartitionSpec)
		if !isSpec {
			return nil, fmt.Errorf("iceberg: datafileavro.Unmarshal: expected PartitionSpec, got %T", spec)
		}

		tableSchema, isSchema := schema.(*Schema)
		if !isSchema {
			return nil, fmt.Errorf("iceberg: datafileavro.Unmarshal: expected *Schema, got %T", schema)
		}

		return unmarshalAvroDataFileEntry(data, partSpec, tableSchema, version)
	}
	datafileavro.Unmarshal = decode
}
// AvroEntryMarshaler is implemented by DataFile values that can be
// encoded using the manifest-entry Avro encoding. The iceberg
// package's built-in DataFile implementation satisfies it; external
// implementations can also satisfy it to participate in the
// [github.com/apache/iceberg-go/codec] DataFile codec.
//
// The encoded bytes are the same bytes a manifest carries for this
// data file. Implementations must produce output that the iceberg
// package's manifest-entry Avro decoder accepts.
type AvroEntryMarshaler interface {
// MarshalAvroEntry encodes the receiver for the given partition spec,
// table schema and format version, returning the Avro bytes or an
// error. The built-in implementation accepts versions 1, 2 and 3.
MarshalAvroEntry(spec PartitionSpec, schema *Schema, version int) ([]byte, error)
}
// MarshalAvroEntry encodes this DataFile as Avro bytes using the
// manifest-entry encoding for the given partition spec, table schema
// and format version (1, 2, or 3). The wire format is the same one a
// manifest carries for this data file, so adding a field to the
// underlying struct (and its avro tags) automatically extends what
// MarshalAvroEntry transports — there is no separate wire-mirror
// struct to keep in sync.
//
// MarshalAvroEntry is the iceberg-package side of the
// [github.com/apache/iceberg-go/codec] DataFile codec; callers
// performing cross-process transport should prefer that package's
// high-level API.
//
// MarshalAvroEntry is safe to call concurrently with any other
// reader or encoder of the same DataFile: a fresh *dataFile is
// cloned (avro-tagged fields only) and the avro encoder reads, but
// does not mutate, the cloned values. Pointer-typed avro fields like
// ColSizes share their backing storage with the source; the
// thread-safety guarantee relies on the avro encoder being
// non-mutating.
//
// v1 note: the v1 manifest-entry schema has a non-nullable snapshot_id
// field. MarshalAvroEntry writes 0 there, so v1 bytes are not usable
// as a standalone manifest entry — they only round-trip via the
// matching decoder.
//
// distinct_counts (field 111) is deprecated in the spec for every
// version (apache/iceberg#12182). MarshalAvroEntry drops the field
// on encode for v1, v2, and v3 alike — values populated on the
// source DataFile are not transported. The Avro tag on the dataFile
// struct is intentionally retained so legacy manifests that already
// carry the field on the wire still decode through the matching
// reader. New DataFiles should not set distinct counts.
func (d *dataFile) MarshalAvroEntry(spec PartitionSpec, schema *Schema, version int) ([]byte, error) {
	if version < 1 || version > 3 {
		return nil, fmt.Errorf("iceberg: MarshalAvroEntry: unsupported format version %d", version)
	}

	s, maps, err := manifestEntrySchemaFor(spec, schema, version)
	if err != nil {
		return nil, err
	}

	// A typed-nil *dataFile can reach this method through an interface
	// value. The reflective clone below dereferences the receiver and
	// would panic on it, so report the problem as an error instead.
	if d == nil {
		return nil, fmt.Errorf("iceberg: MarshalAvroEntry: nil data file")
	}

	// Encode from a shallow clone so the name-keyed partition map built
	// for the wire format is never written back onto the receiver.
	clone := cloneDataFileAvroFields(d)
	clone.PartitionData = avroEncodePartitionData(d.Partition(), maps.nameToID, maps.idToType)

	// Wrap encoder failures with the function name, matching the way
	// unmarshalAvroDataFileEntry reports decoder failures.
	encoded, err := s.Encode(newEncodeEntry(version, clone))
	if err != nil {
		return nil, fmt.Errorf("iceberg: MarshalAvroEntry: %w", err)
	}

	return encoded, nil
}
// unmarshalAvroDataFileEntry is the inverse of
// [(*dataFile).MarshalAvroEntry]: it decodes manifest-entry Avro bytes
// into a DataFile. Callers must supply the same (spec, schema, version)
// triple the encoder used; a mismatched spec or version produces either
// a decode error or partition values with silently wrong types.
//
// Versions outside 1–3 are rejected before any decoding happens.
//
// After decoding, the DataFile is stamped with the partition spec id
// and the field-id lookup tables, so Partition() and the stats
// accessors return id-keyed maps as if the file had been read from a
// manifest.
//
// The [github.com/apache/iceberg-go/codec] package reaches this
// function through the [datafileavro] bridge.
func unmarshalAvroDataFileEntry(data []byte, spec PartitionSpec, schema *Schema, version int) (DataFile, error) {
	switch version {
	case 1, 2, 3:
		// Supported format versions.
	default:
		return nil, fmt.Errorf("iceberg: unmarshalAvroDataFileEntry: unsupported format version %d", version)
	}

	entrySchema, lookups, err := manifestEntrySchemaFor(spec, schema, version)
	if err != nil {
		return nil, err
	}

	// target wraps decoded; the decoder fills decoded through target.
	target, decoded := newDecodeEntry(version)
	_, decodeErr := entrySchema.Decode(data, target)
	if decodeErr != nil {
		return nil, fmt.Errorf("iceberg: unmarshalAvroDataFileEntry: %w", decodeErr)
	}

	// Attach the context a manifest reader would normally provide.
	decoded.fieldIDToFixedSize = lookups.idToFixedSize
	decoded.fieldIDToLogicalType = lookups.idToType
	decoded.fieldNameToID = lookups.nameToID
	decoded.specID = int32(spec.ID())

	return decoded, nil
}
// newEncodeEntry wraps df in the manifest-entry struct that matches the
// format version. v1's manifest_entry has a non-nullable snapshot_id and
// therefore uses [fallbackManifestEntry]; every other version uses
// [manifestEntry] with nullable pointers. The entry status is always
// EntryStatusADDED.
func newEncodeEntry(version int, df *dataFile) any {
	switch version {
	case 1:
		return &fallbackManifestEntry{
			manifestEntry: manifestEntry{
				EntryStatus: EntryStatusADDED,
				Data:        df,
			},
		}
	default:
		return &manifestEntry{
			EntryStatus: EntryStatusADDED,
			Data:        df,
		}
	}
}
// newDecodeEntry is the read-side counterpart of [newEncodeEntry]. It
// allocates an empty *dataFile, places it in the manifest-entry struct
// appropriate for the version, and returns both: the first result is
// the pointer to hand to avro.Schema.Decode, the second is the
// *dataFile that decoding will populate.
func newDecodeEntry(version int) (any, *dataFile) {
	target := new(dataFile)

	var holder any
	if version == 1 {
		holder = &fallbackManifestEntry{manifestEntry: manifestEntry{Data: target}}
	} else {
		holder = &manifestEntry{Data: target}
	}

	return holder, target
}
// cloneDataFileAvroFields allocates a new *dataFile and copies into it
// every field of src that carries an `avro` struct tag. Untagged
// internal state (sync.Once, lazy-init caches, specID, the field-id
// lookup maps) keeps its zero value, because the avro encoder reads only
// the avro-tagged fields.
//
// Driving the copy from the tag set via reflection means a newly added
// avro-tagged field is picked up without touching this function — the
// dataFile struct stays the single source of truth for the wire shape.
// It also avoids the go-vet copies-lock warning that a struct-literal
// copy of *dataFile would trigger (it embeds sync.Once).
//
// Note: the copy is shallow. Pointer-typed avro fields (ColSizes,
// LowerBounds, etc.) share their backing storage with src. The
// no-mutation guarantee of MarshalAvroEntry therefore depends on the
// avro encoder only reading the values it walks; TestMarshalAvroEntry
// DoesNotMutate asserts this end-to-end across every avro-tagged field,
// so a future regression in the encoder surfaces in tests.
func cloneDataFileAvroFields(src *dataFile) *dataFile {
	dst := new(dataFile)

	from := reflect.Indirect(reflect.ValueOf(src))
	to := reflect.Indirect(reflect.ValueOf(dst))
	structType := from.Type()

	for idx, count := 0, structType.NumField(); idx < count; idx++ {
		if _, tagged := structType.Field(idx).Tag.Lookup("avro"); !tagged {
			continue
		}
		to.Field(idx).Set(from.Field(idx))
	}

	return dst
}
// avroEncodePartitionData turns an id-keyed partition tuple (whose
// values may be iceberg-typed, e.g. Date or Decimal) into the name-keyed,
// avro-friendly map the manifest-entry schema expects. Value conversion
// is delegated to avroPartitionData; this function only re-keys the
// result by field name. Names in nameToID whose id has no converted
// value are left out of the result. Idempotent: values already in
// primitive form pass through unchanged.
func avroEncodePartitionData(idKeyed map[int]any, nameToID map[string]int, logicalTypes map[int]string) map[string]any {
	byID := avroPartitionData(idKeyed, logicalTypes)
	byName := make(map[string]any, len(byID))

	for fieldName, fieldID := range nameToID {
		val, present := byID[fieldID]
		if !present {
			continue
		}
		byName[fieldName] = val
	}

	return byName
}
// dataFileFieldMaps bundles the three lookup tables that getFieldIDMap
// derives from a manifest-entry avro schema. They are cached alongside
// the schema and copied onto decoded dataFile values.
type dataFileFieldMaps struct {
// nameToID maps a field name to its field id; it is used to re-key
// partition data by name on encode and becomes dataFile.fieldNameToID
// on decode.
nameToID map[string]int
// idToType maps a field id to a type name; it is passed as the
// logical-type table on encode and becomes
// dataFile.fieldIDToLogicalType on decode.
idToType map[int]string
// idToFixedSize maps a field id to an int that becomes
// dataFile.fieldIDToFixedSize on decode (presumably the byte width of
// fixed-size fields — confirm against getFieldIDMap).
idToFixedSize map[int]int
}
// dataFileSchemaCacheKey identifies a cached avro schema by the
// structural fingerprint of the partition Avro shape and the format
// version. The fingerprint is taken from the avro schema produced by
// [partitionTypeToAvroSchema] rather than [StructType.String]: the
// avro shape ignores doc strings and other metadata that don't change
// the wire format, so structurally identical specs that differ only
// in documentation share a single cache entry.
type dataFileSchemaCacheKey struct {
// partAvroFingerprint is the String() form of the partition avro
// schema returned by partitionTypeToAvroSchema.
partAvroFingerprint string
// version is the table format version the manifest-entry schema was
// built for.
version int
}
// dataFileSchemaEntry is the value stored in dataFileSchemaCache: a
// manifest-entry avro schema together with the field lookup tables
// derived from it, so both are computed once per cache key.
type dataFileSchemaEntry struct {
// schema is the full manifest-entry schema built by
// internal.NewManifestEntrySchema.
schema *avro.Schema
// maps holds the lookups getFieldIDMap extracted from schema.
maps dataFileFieldMaps
}
// dataFileSchemaCache is the package-wide LRU of manifest-entry schemas
// keyed by partition shape and format version. It starts with
// defaultSchemaCacheSize entries of capacity and is resized through
// SetSchemaCacheSize.
var dataFileSchemaCache = mustNewSchemaCache(defaultSchemaCacheSize)
// mustNewSchemaCache constructs the LRU that backs dataFileSchemaCache
// with the given capacity. It panics if lru.New rejects the size, which
// makes it suitable only for package-level initialization with a known
// good constant.
func mustNewSchemaCache(size int) *lru.Cache[dataFileSchemaCacheKey, *dataFileSchemaEntry] {
	cache, err := lru.New[dataFileSchemaCacheKey, *dataFileSchemaEntry](size)
	if err == nil {
		return cache
	}

	panic(fmt.Sprintf("iceberg: schema cache size %d invalid: %v", size, err))
}
// SetSchemaCacheSize resizes the manifest-entry schema cache used by
// the DataFile codec. The default capacity is sized for a few thousand
// active partition specs; long-running consumers with larger working
// sets (e.g. a compaction service touching many tables) should raise
// it. Existing entries are preserved on grow; on shrink, least-recently
// used entries are evicted down to the new size.
//
// It returns an error, and leaves the cache untouched, when size is not
// positive.
//
// The cache is an lru.Cache, which guards Resize, Get and Add with its
// own lock, and the cache variable itself is never reassigned here, so
// SetSchemaCacheSize may be called while codec operations are in
// flight.
func SetSchemaCacheSize(size int) error {
	if size <= 0 {
		return fmt.Errorf("iceberg: SetSchemaCacheSize: size must be positive, got %d", size)
	}

	// Resize reports how many entries it evicted; callers have no use
	// for that count, so it is deliberately dropped.
	_ = dataFileSchemaCache.Resize(size)

	return nil
}
// manifestEntrySchemaFor resolves the manifest-entry avro schema and the
// partition field-id lookups for a partition spec, table schema and
// format version, consulting dataFileSchemaCache first.
//
// The cache key is the String() form of the partition avro schema plus
// the version, so specs that differ only in field documentation share a
// single entry. On a miss the full schema is built with
// internal.NewManifestEntrySchema, its lookups are extracted with
// getFieldIDMap, and the pair is added to the cache before returning.
// Errors from building either schema are returned unchanged.
func manifestEntrySchemaFor(spec PartitionSpec, schema *Schema, version int) (*avro.Schema, dataFileFieldMaps, error) {
	var noMaps dataFileFieldMaps

	partAvro, err := partitionTypeToAvroSchema(spec.PartitionType(schema))
	if err != nil {
		return nil, noMaps, err
	}

	cacheKey := dataFileSchemaCacheKey{
		version:             version,
		partAvroFingerprint: partAvro.String(),
	}
	if hit, found := dataFileSchemaCache.Get(cacheKey); found {
		return hit.schema, hit.maps, nil
	}

	entrySchema, err := internal.NewManifestEntrySchema(partAvro, version)
	if err != nil {
		return nil, noMaps, err
	}

	fresh := &dataFileSchemaEntry{schema: entrySchema}
	fresh.maps.nameToID, fresh.maps.idToType, fresh.maps.idToFixedSize = getFieldIDMap(entrySchema)
	dataFileSchemaCache.Add(cacheKey, fresh)

	return fresh.schema, fresh.maps, nil
}