// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package iceberg
import (
"encoding/json"
"errors"
"fmt"
"io"
"math"
"reflect"
"slices"
"strconv"
"sync"
"time"
"github.com/apache/iceberg-go/internal"
iceio "github.com/apache/iceberg-go/io"
"github.com/google/uuid"
"github.com/hamba/avro/v2"
"github.com/hamba/avro/v2/ocf"
)
// ManifestContent indicates the type of data inside of the files
// described by a manifest. This will indicate whether the data files
// contain active data or deleted rows.
type ManifestContent int32
const (
ManifestContentData ManifestContent = 0
ManifestContentDeletes ManifestContent = 1
)
func (m ManifestContent) String() string {
switch m {
case ManifestContentData:
return "data"
case ManifestContentDeletes:
return "deletes"
default:
return "UNKNOWN"
}
}
const initialSequenceNumber = 0
type FieldSummary struct {
ContainsNull bool `avro:"contains_null"`
ContainsNaN *bool `avro:"contains_nan"`
LowerBound *[]byte `avro:"lower_bound"`
UpperBound *[]byte `avro:"upper_bound"`
}
type ManifestBuilder struct {
m *manifestFile
}
func NewManifestFile(version int, path string, length int64, partitionSpecID int32, addedSnapshotID int64) *ManifestBuilder {
var seqNum int64
if version != 1 {
seqNum = -1
}
return &ManifestBuilder{
m: &manifestFile{
version: version,
Path: path,
Len: length,
SpecID: partitionSpecID,
AddedSnapshotID: addedSnapshotID,
Content: ManifestContentData,
SeqNumber: seqNum,
MinSeqNumber: seqNum,
},
}
}
func (b *ManifestBuilder) SequenceNum(num, minSeqNum int64) *ManifestBuilder {
b.m.SeqNumber, b.m.MinSeqNumber = num, minSeqNum
return b
}
func (b *ManifestBuilder) Content(content ManifestContent) *ManifestBuilder {
b.m.Content = content
return b
}
func (b *ManifestBuilder) AddedFiles(cnt int32) *ManifestBuilder {
b.m.AddedFilesCount = cnt
return b
}
func (b *ManifestBuilder) ExistingFiles(cnt int32) *ManifestBuilder {
b.m.ExistingFilesCount = cnt
return b
}
func (b *ManifestBuilder) DeletedFiles(cnt int32) *ManifestBuilder {
b.m.DeletedFilesCount = cnt
return b
}
func (b *ManifestBuilder) AddedRows(cnt int64) *ManifestBuilder {
b.m.AddedRowsCount = cnt
return b
}
func (b *ManifestBuilder) ExistingRows(cnt int64) *ManifestBuilder {
b.m.ExistingRowsCount = cnt
return b
}
func (b *ManifestBuilder) DeletedRows(cnt int64) *ManifestBuilder {
b.m.DeletedRowsCount = cnt
return b
}
func (b *ManifestBuilder) Partitions(p []FieldSummary) *ManifestBuilder {
b.m.PartitionList = &p
return b
}
func (b *ManifestBuilder) KeyMetadata(km []byte) *ManifestBuilder {
b.m.Key = km
return b
}
func (b *ManifestBuilder) Build() ManifestFile {
return b.m
}
type fallbackManifestFileV1 struct {
manifestFileV1
AddedSnapshotID *int64 `avro:"added_snapshot_id"`
}
func (f *fallbackManifestFileV1) toFile() *manifestFile {
if f.AddedSnapshotID == nil {
f.manifestFileV1.AddedSnapshotID = -1
}
return f.manifestFileV1.toFile()
}
type manifestFileV1 struct {
manifestFile
AddedFilesCount *int32 `avro:"added_files_count"`
ExistingFilesCount *int32 `avro:"existing_files_count"`
DeletedFilesCount *int32 `avro:"deleted_files_count"`
AddedRowsCount *int64 `avro:"added_rows_count"`
ExistingRowsCount *int64 `avro:"existing_rows_count"`
DeletedRowsCount *int64 `avro:"deleted_rows_count"`
}
func (m *manifestFileV1) toFile() *manifestFile {
m.manifestFile.version = 1
m.Content = ManifestContentData
m.SeqNumber, m.MinSeqNumber = initialSequenceNumber, initialSequenceNumber
if m.AddedFilesCount != nil {
m.manifestFile.AddedFilesCount = *m.AddedFilesCount
} else {
m.manifestFile.AddedFilesCount = -1
}
if m.ExistingFilesCount != nil {
m.manifestFile.ExistingFilesCount = *m.ExistingFilesCount
} else {
m.manifestFile.ExistingFilesCount = -1
}
if m.DeletedFilesCount != nil {
m.manifestFile.DeletedFilesCount = *m.DeletedFilesCount
} else {
m.manifestFile.DeletedFilesCount = -1
}
if m.AddedRowsCount != nil {
m.manifestFile.AddedRowsCount = *m.AddedRowsCount
} else {
m.manifestFile.AddedRowsCount = -1
}
if m.ExistingRowsCount != nil {
m.manifestFile.ExistingRowsCount = *m.ExistingRowsCount
} else {
m.manifestFile.ExistingRowsCount = -1
}
if m.DeletedRowsCount != nil {
m.manifestFile.DeletedRowsCount = *m.DeletedRowsCount
} else {
m.manifestFile.DeletedRowsCount = -1
}
return &m.manifestFile
}
func (*manifestFileV1) Version() int { return 1 }
func (m *manifestFileV1) FilePath() string { return m.Path }
func (m *manifestFileV1) Length() int64 { return m.Len }
func (m *manifestFileV1) PartitionSpecID() int32 { return m.SpecID }
func (m *manifestFileV1) ManifestContent() ManifestContent {
return ManifestContentData
}
func (m *manifestFileV1) SnapshotID() int64 {
return m.AddedSnapshotID
}
func (m *manifestFileV1) AddedDataFiles() int32 {
if m.AddedFilesCount == nil {
return 0
}
return *m.AddedFilesCount
}
func (m *manifestFileV1) ExistingDataFiles() int32 {
if m.ExistingFilesCount == nil {
return 0
}
return *m.ExistingFilesCount
}
func (m *manifestFileV1) DeletedDataFiles() int32 {
if m.DeletedFilesCount == nil {
return 0
}
return *m.DeletedFilesCount
}
func (m *manifestFileV1) AddedRows() int64 {
if m.AddedRowsCount == nil {
return 0
}
return *m.AddedRowsCount
}
func (m *manifestFileV1) ExistingRows() int64 {
if m.ExistingRowsCount == nil {
return 0
}
return *m.ExistingRowsCount
}
func (m *manifestFileV1) DeletedRows() int64 {
if m.DeletedRowsCount == nil {
return 0
}
return *m.DeletedRowsCount
}
func (m *manifestFileV1) HasAddedFiles() bool {
return m.AddedFilesCount == nil || *m.AddedFilesCount > 0
}
func (m *manifestFileV1) HasExistingFiles() bool {
return m.ExistingFilesCount == nil || *m.ExistingFilesCount > 0
}
func (m *manifestFileV1) SequenceNum() int64 { return 0 }
func (m *manifestFileV1) MinSequenceNum() int64 { return 0 }
func (m *manifestFileV1) KeyMetadata() []byte { return m.Key }
func (m *manifestFileV1) Partitions() []FieldSummary {
if m.PartitionList == nil {
return nil
}
return *m.PartitionList
}
func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
return fetchManifestEntries(m, fs, discardDeleted)
}
type manifestFile struct {
Path string `avro:"manifest_path"`
Len int64 `avro:"manifest_length"`
SpecID int32 `avro:"partition_spec_id"`
Content ManifestContent `avro:"content"`
SeqNumber int64 `avro:"sequence_number"`
MinSeqNumber int64 `avro:"min_sequence_number"`
AddedSnapshotID int64 `avro:"added_snapshot_id"`
AddedFilesCount int32 `avro:"added_files_count"`
ExistingFilesCount int32 `avro:"existing_files_count"`
DeletedFilesCount int32 `avro:"deleted_files_count"`
AddedRowsCount int64 `avro:"added_rows_count"`
ExistingRowsCount int64 `avro:"existing_rows_count"`
DeletedRowsCount int64 `avro:"deleted_rows_count"`
PartitionList *[]FieldSummary `avro:"partitions"`
Key []byte `avro:"key_metadata"`
version int `avro:"-"`
}
func (m *manifestFile) setVersion(v int) {
m.version = v
}
func (m *manifestFile) toV1(v1file *manifestFileV1) {
v1file.Path = m.Path
v1file.Len = m.Len
v1file.SpecID = m.SpecID
v1file.AddedSnapshotID = m.AddedSnapshotID
v1file.PartitionList = m.PartitionList
v1file.Key = m.Key
if m.AddedFilesCount >= 0 {
v1file.AddedFilesCount = &m.AddedFilesCount
} else {
v1file.AddedFilesCount = nil
}
if m.ExistingFilesCount >= 0 {
v1file.ExistingFilesCount = &m.ExistingFilesCount
} else {
v1file.ExistingFilesCount = nil
}
if m.DeletedFilesCount >= 0 {
v1file.DeletedFilesCount = &m.DeletedFilesCount
} else {
v1file.DeletedFilesCount = nil
}
if m.AddedRowsCount >= 0 {
v1file.AddedRowsCount = &m.AddedRowsCount
} else {
v1file.AddedRowsCount = nil
}
if m.ExistingRowsCount >= 0 {
v1file.ExistingRowsCount = &m.ExistingRowsCount
} else {
v1file.ExistingRowsCount = nil
}
if m.DeletedRowsCount >= 0 {
v1file.DeletedRowsCount = &m.DeletedRowsCount
} else {
v1file.DeletedRowsCount = nil
}
}
func (m *manifestFile) Version() int { return m.version }
func (m *manifestFile) FilePath() string { return m.Path }
func (m *manifestFile) Length() int64 { return m.Len }
func (m *manifestFile) PartitionSpecID() int32 { return m.SpecID }
func (m *manifestFile) ManifestContent() ManifestContent { return m.Content }
func (m *manifestFile) SnapshotID() int64 { return m.AddedSnapshotID }
func (m *manifestFile) AddedDataFiles() int32 { return m.AddedFilesCount }
func (m *manifestFile) ExistingDataFiles() int32 { return m.ExistingFilesCount }
func (m *manifestFile) DeletedDataFiles() int32 { return m.DeletedFilesCount }
func (m *manifestFile) AddedRows() int64 { return m.AddedRowsCount }
func (m *manifestFile) ExistingRows() int64 { return m.ExistingRowsCount }
func (m *manifestFile) DeletedRows() int64 { return m.DeletedRowsCount }
func (m *manifestFile) SequenceNum() int64 { return m.SeqNumber }
func (m *manifestFile) MinSequenceNum() int64 { return m.MinSeqNumber }
func (m *manifestFile) KeyMetadata() []byte { return m.Key }
func (m *manifestFile) Partitions() []FieldSummary {
if m.PartitionList == nil {
return nil
}
return *m.PartitionList
}
func (m *manifestFile) HasAddedFiles() bool { return m.AddedFilesCount != 0 }
func (m *manifestFile) HasExistingFiles() bool { return m.ExistingFilesCount != 0 }
func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
return fetchManifestEntries(m, fs, discardDeleted)
}
func getFieldIDMap(sc avro.Schema) (map[string]int, map[int]avro.LogicalType) {
getField := func(rs *avro.RecordSchema, name string) *avro.Field {
for _, f := range rs.Fields() {
if f.Name() == name {
return f
}
}
return nil
}
result := make(map[string]int)
logicalTypes := make(map[int]avro.LogicalType)
entryField := getField(sc.(*avro.RecordSchema), "data_file")
partitionField := getField(entryField.Type().(*avro.RecordSchema), "partition")
for _, field := range partitionField.Type().(*avro.RecordSchema).Fields() {
if fid, ok := field.Prop("field-id").(float64); ok {
result[field.Name()] = int(fid)
avroTyp := field.Type()
if us, ok := avroTyp.(*avro.UnionSchema); ok {
for _, t := range us.Types() {
avroTyp = t
}
}
if ps, ok := avroTyp.(*avro.PrimitiveSchema); ok && ps.Logical() != nil {
logicalTypes[int(fid)] = ps.Logical().Type()
}
}
}
return result, logicalTypes
}
type hasFieldToIDMap interface {
setFieldNameToIDMap(map[string]int)
setFieldIDToLogicalTypeMap(map[int]avro.LogicalType)
}
func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
f, err := fs.Open(m.FilePath())
if err != nil {
return nil, err
}
defer f.Close()
return ReadManifest(m, f, discardDeleted)
}
// ManifestFile is the interface which covers both V1 and V2 manifest files.
type ManifestFile interface {
// Version returns the version number of this manifest file.
// It should be 1 or 2.
Version() int
// FilePath is the location URI of this manifest file.
FilePath() string
// Length is the length in bytes of the manifest file.
Length() int64
// PartitionSpecID is the ID of the partition spec used to write
// this manifest. It must be listed in the table metadata
// partition-specs.
PartitionSpecID() int32
// ManifestContent is the type of files tracked by this manifest,
// either data or delete files. All v1 manifests track data files.
ManifestContent() ManifestContent
// SnapshotID is the ID of the snapshot where this manifest file
// was added.
SnapshotID() int64
// AddedDataFiles returns the number of entries in the manifest that
// have the status of EntryStatusADDED.
AddedDataFiles() int32
// ExistingDataFiles returns the number of entries in the manifest
// which have the status of EntryStatusEXISTING.
ExistingDataFiles() int32
// DeletedDataFiles returns the number of entries in the manifest
// which have the status of EntryStatusDELETED.
DeletedDataFiles() int32
// AddedRows returns the number of rows in all files of the manifest
// that have status EntryStatusADDED.
AddedRows() int64
// ExistingRows returns the number of rows in all files of the manifest
// which have status EntryStatusEXISTING.
ExistingRows() int64
// DeletedRows returns the number of rows in all files of the manifest
// which have status EntryStatusDELETED.
DeletedRows() int64
// SequenceNum returns the sequence number when this manifest was
// added to the table. Will be 0 for v1 manifest lists.
SequenceNum() int64
// MinSequenceNum is the minimum data sequence number of all live data
// or delete files in the manifest. Will be 0 for v1 manifest lists.
MinSequenceNum() int64
// KeyMetadata returns implementation-specific key metadata for encryption
// if it exists in the manifest list.
KeyMetadata() []byte
// Partitions returns a list of field summaries for each partition
// field in the spec. Each field in the list corresponds to a field in
// the manifest file's partition spec.
Partitions() []FieldSummary
// HasAddedFiles returns true if AddedDataFiles > 0 or if it was null.
HasAddedFiles() bool
// HasExistingFiles returns true if ExistingDataFiles > 0 or if it was null.
HasExistingFiles() bool
// FetchEntries reads the manifest list file to fetch the list of
// manifest entries using the provided file system IO interface.
// If discardDeleted is true, entries for files containing deleted rows
// will be skipped.
FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error)
// // WriteEntries writes a list of manifest entries to a provided
// // io.Writer. The version of the manifest file is used to determine the
// // schema to use for writing the entries.
// WriteEntries(out io.Writer, entries []ManifestEntry) error
setVersion(int)
}
type fallbackManifest[T any] interface {
ManifestFile
toFile() *manifestFile
*T
}
func decodeManifestsWithFallback[P fallbackManifest[T], T any](dec *ocf.Decoder) ([]ManifestFile, error) {
results := make([]ManifestFile, 0)
for dec.HasNext() {
tmp := P(new(T))
if err := dec.Decode(tmp); err != nil {
return nil, err
}
results = append(results, tmp.toFile())
}
return results, dec.Error()
}
func decodeManifests[I interface {
ManifestFile
*T
}, T any](dec *ocf.Decoder, version int) ([]ManifestFile, error) {
results := make([]ManifestFile, 0)
for dec.HasNext() {
tmp := I(new(T))
if err := dec.Decode(tmp); err != nil {
return nil, err
}
tmp.setVersion(version)
results = append(results, tmp)
}
return results, dec.Error()
}
// ManifestReader reads the metadata and data from an avro manifest file.
// This type is not thread-safe; its methods should not be called from
// multiple goroutines.
type ManifestReader struct {
dec *ocf.Decoder
file ManifestFile
formatVersion int
isFallback bool
content ManifestContent
fieldNameToID map[string]int
fieldIDToType map[int]avro.LogicalType
// The rest are lazily populated, on demand. Most readers
// will likely only try to load the entries.
schema Schema
schemaLoaded bool
partitionSpec PartitionSpec
partitionSpecLoaded bool
}
// NewManifestReader returns a value that can read the contents of an avro manifest
// file. If the caller is interested in the manifest entries in the file, it must call
// [ManifestReader.ReadEntry] before closing the provided reader.
func NewManifestReader(file ManifestFile, in io.Reader) (*ManifestReader, error) {
dec, err := ocf.NewDecoder(in, ocf.WithDecoderSchemaCache(&avro.SchemaCache{}))
if err != nil {
return nil, err
}
metadata := dec.Metadata()
sc := dec.Schema()
formatVersion, err := strconv.Atoi(string(metadata["format-version"]))
if err != nil {
return nil, fmt.Errorf("manifest file's 'format-version' metadata is invalid: %w", err)
}
if formatVersion != file.Version() {
return nil, fmt.Errorf("manifest file's 'format-version' metadata indicates version %d, but entry from manifest list indicates version %d",
formatVersion, file.Version())
}
var content ManifestContent
switch contentStr := string(metadata["content"]); contentStr {
case "data":
content = ManifestContentData
case "deletes":
content = ManifestContentDeletes
default:
return nil, fmt.Errorf("manifest file's 'content' metadata is invalid, should be \"data\" or \"deletes\" but instead is %q",
contentStr)
}
if content != file.ManifestContent() {
return nil, fmt.Errorf("manifest file's 'content' metadata indicates %q, but entry from manifest list indicates %q",
content.String(), file.ManifestContent().String())
}
isFallback := false
if formatVersion == 1 {
for _, f := range sc.(*avro.RecordSchema).Fields() {
if f.Name() == "snapshot_id" {
if f.Type().Type() != avro.Union {
isFallback = true
}
break
}
}
}
fieldNameToID, fieldIDToType := getFieldIDMap(sc)
return &ManifestReader{
dec: dec,
file: file,
formatVersion: formatVersion,
isFallback: isFallback,
content: content,
fieldNameToID: fieldNameToID,
fieldIDToType: fieldIDToType,
}, nil
}
// Version returns the file's format version.
func (c *ManifestReader) Version() int {
return c.formatVersion
}
// ManifestContent returns the type of content in the manifest file.
func (c *ManifestReader) ManifestContent() ManifestContent {
return c.content
}
// SchemaID returns the schema ID encoded in the avro file's metadata.
func (c *ManifestReader) SchemaID() (int, error) {
id, err := strconv.Atoi(string(c.dec.Metadata()["schema-id"]))
if err != nil {
return 0, fmt.Errorf("manifest file's 'schema-id' metadata is invalid: %w", err)
}
return id, nil
}
// Schema returns the schema encoded in the avro file's metadata.
func (c *ManifestReader) Schema() (*Schema, error) {
if !c.schemaLoaded {
schemaID, err := c.SchemaID()
if err != nil {
return nil, err
}
if err := json.Unmarshal(c.dec.Metadata()["schema"], &c.schema); err != nil {
return nil, fmt.Errorf("manifest file's 'schema' metadata is invalid: %w", err)
}
c.schema.ID = schemaID
c.schemaLoaded = true
}
return &c.schema, nil
}
// PartitionSpecID returns the partition spec ID encoded in the avro file's metadata.
func (c *ManifestReader) PartitionSpecID() (int, error) {
id, err := strconv.Atoi(string(c.dec.Metadata()["partition-spec-id"]))
if err != nil {
return 0, fmt.Errorf("manifest file's 'partition-spec-id' metadata is invalid: %w", err)
}
if id != int(c.file.PartitionSpecID()) {
return 0, fmt.Errorf("manifest file's 'partition-spec-id' metadata indicates %d, but entry from manifest list indicates %d",
id, c.file.PartitionSpecID())
}
return id, nil
}
// PartitionSpec returns the partition spec encoded in the avro file's metadata.
func (c *ManifestReader) PartitionSpec() (*PartitionSpec, error) {
if !c.partitionSpecLoaded {
partitionSpecID, err := c.PartitionSpecID()
if err != nil {
return nil, err
}
if err := json.Unmarshal(c.dec.Metadata()["partition-spec"], &c.partitionSpec.fields); err != nil {
return nil, fmt.Errorf("manifest file's 'partition-spec' metadata is invalid: %w", err)
}
c.partitionSpec.id = partitionSpecID
c.partitionSpec.initialize()
c.partitionSpecLoaded = true
}
return &c.partitionSpec, nil
}
// ReadEntry reads the next manifest entry in the avro file's data.
func (c *ManifestReader) ReadEntry() (ManifestEntry, error) {
if err := c.dec.Error(); err != nil {
return nil, err
}
if !c.dec.HasNext() {
return nil, io.EOF
}
var tmp ManifestEntry
if c.isFallback {
tmp = &fallbackManifestEntry{
manifestEntry: manifestEntry{Data: &dataFile{}},
}
} else {
tmp = &manifestEntry{Data: &dataFile{}}
}
if err := c.dec.Decode(tmp); err != nil {
return nil, err
}
if c.isFallback {
tmp = tmp.(*fallbackManifestEntry).toEntry()
}
tmp.inherit(c.file)
if fieldToIDMap, ok := tmp.DataFile().(hasFieldToIDMap); ok {
fieldToIDMap.setFieldNameToIDMap(c.fieldNameToID)
fieldToIDMap.setFieldIDToLogicalTypeMap(c.fieldIDToType)
}
return tmp, nil
}
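// readerMetadataSketch is an illustrative sketch, not part of the original
// file, showing how a ManifestReader exposes the schema and partition spec
// stored in the avro file metadata before any entries are decoded. The
// ManifestFile and reader are supplied by the caller.
func readerMetadataSketch(file ManifestFile, in io.Reader) (*Schema, *PartitionSpec, error) {
	r, err := NewManifestReader(file, in)
	if err != nil {
		return nil, nil, err
	}
	sc, err := r.Schema()
	if err != nil {
		return nil, nil, err
	}
	spec, err := r.PartitionSpec()
	if err != nil {
		return nil, nil, err
	}
	return sc, spec, nil
}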
// ReadManifest reads an avro manifest file and returns a slice
// of manifest entries or an error if one is encountered. If discardDeleted
// is true, the returned slice omits entries whose status is "deleted".
func ReadManifest(m ManifestFile, f io.Reader, discardDeleted bool) ([]ManifestEntry, error) {
manifestReader, err := NewManifestReader(m, f)
if err != nil {
return nil, err
}
var results []ManifestEntry
for {
entry, err := manifestReader.ReadEntry()
if err != nil {
if errors.Is(err, io.EOF) {
return results, nil
}
return results, err
}
if discardDeleted && entry.Status() == EntryStatusDELETED {
continue
}
results = append(results, entry)
}
}
// ReadManifestList reads in an avro manifest list file and returns a slice
// of manifest files or an error if one is encountered.
func ReadManifestList(in io.Reader) ([]ManifestFile, error) {
dec, err := ocf.NewDecoder(in, ocf.WithDecoderSchemaCache(&avro.SchemaCache{}))
if err != nil {
return nil, err
}
sc, err := avro.ParseBytes(dec.Metadata()["avro.schema"])
if err != nil {
return nil, err
}
version, err := strconv.Atoi(string(dec.Metadata()["format-version"]))
if err != nil {
return nil, fmt.Errorf("invalid format-version: %w", err)
}
if version == 1 {
for _, f := range sc.(*avro.RecordSchema).Fields() {
if f.Name() == "added_snapshot_id" {
if f.Type().Type() == avro.Union {
return decodeManifestsWithFallback[*fallbackManifestFileV1](dec)
}
break
}
}
}
switch version {
case 1:
return decodeManifestsWithFallback[*manifestFileV1](dec)
default:
return decodeManifests[*manifestFile](dec, version)
}
}
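// listThenFetchSketch is an illustrative sketch, not part of the original
// file, of the two-level read path: ReadManifestList yields ManifestFile
// records from a manifest list, and each ManifestFile's FetchEntries opens
// the referenced manifest through the provided iceio.IO to decode its
// ManifestEntry records, skipping deleted entries here.
func listThenFetchSketch(fs iceio.IO, manifestList io.Reader) ([]ManifestEntry, error) {
	manifests, err := ReadManifestList(manifestList)
	if err != nil {
		return nil, err
	}
	var all []ManifestEntry
	for _, m := range manifests {
		entries, err := m.FetchEntries(fs, true)
		if err != nil {
			return nil, err
		}
		all = append(all, entries...)
	}
	return all, nil
}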
type writerImpl interface {
content() ManifestContent
prepareEntry(*manifestEntry, int64) (ManifestEntry, error)
}
type v1writerImpl struct{}
func (v1writerImpl) content() ManifestContent { return ManifestContentData }
func (v1writerImpl) prepareEntry(entry *manifestEntry, sn int64) (ManifestEntry, error) {
if entry.Snapshot != nil && *entry.Snapshot != sn {
if entry.EntryStatus != EntryStatusEXISTING {
return nil, fmt.Errorf("mismatched snapshot id for entry: %d vs %d", *entry.Snapshot, sn)
}
sn = *entry.Snapshot
}
return &fallbackManifestEntry{
manifestEntry: *entry,
Snapshot: sn,
}, nil
}
type v2writerImpl struct{}
func (v2writerImpl) content() ManifestContent { return ManifestContentData }
func (v2writerImpl) prepareEntry(entry *manifestEntry, snapshotID int64) (ManifestEntry, error) {
if entry.SeqNum == nil {
if entry.Snapshot != nil && *entry.Snapshot != snapshotID {
return nil, fmt.Errorf("found unassigned sequence number for entry from snapshot: %d", entry.SnapshotID())
}
if entry.EntryStatus != EntryStatusADDED {
return nil, errors.New("only entries with status ADDED can be missing a sequence number")
}
}
return entry, nil
}
type fieldStats interface {
toSummary() FieldSummary
update(value any) error
}
type partitionFieldStats[T LiteralType] struct {
containsNull bool
containsNan bool
min *T
max *T
cmp Comparator[T]
}
func newPartitionFieldStat(typ PrimitiveType) (fieldStats, error) {
switch typ.(type) {
case Int32Type:
return &partitionFieldStats[int32]{cmp: getComparator[int32]()}, nil
case Int64Type:
return &partitionFieldStats[int64]{cmp: getComparator[int64]()}, nil
case Float32Type:
return &partitionFieldStats[float32]{cmp: getComparator[float32]()}, nil
case Float64Type:
return &partitionFieldStats[float64]{cmp: getComparator[float64]()}, nil
case StringType:
return &partitionFieldStats[string]{cmp: getComparator[string]()}, nil
case DateType:
return &partitionFieldStats[Date]{cmp: getComparator[Date]()}, nil
case TimeType:
return &partitionFieldStats[Time]{cmp: getComparator[Time]()}, nil
case TimestampType:
return &partitionFieldStats[Timestamp]{cmp: getComparator[Timestamp]()}, nil
case UUIDType:
return &partitionFieldStats[uuid.UUID]{cmp: getComparator[uuid.UUID]()}, nil
case BinaryType:
return &partitionFieldStats[[]byte]{cmp: getComparator[[]byte]()}, nil
case FixedType:
return &partitionFieldStats[[]byte]{cmp: getComparator[[]byte]()}, nil
case DecimalType:
return &partitionFieldStats[Decimal]{cmp: getComparator[Decimal]()}, nil
default:
return nil, fmt.Errorf("expected primitive type for partition type: %s", typ)
}
}
func (p *partitionFieldStats[T]) toSummary() FieldSummary {
var (
lowerBound *[]byte
upperBound *[]byte
lit Literal
)
if p.min != nil {
lit = NewLiteral(*p.min)
lb, _ := lit.MarshalBinary()
lowerBound = &lb
}
if p.max != nil {
lit = NewLiteral(*p.max)
ub, _ := lit.MarshalBinary()
upperBound = &ub
}
return FieldSummary{
ContainsNull: p.containsNull,
ContainsNaN: &p.containsNan,
LowerBound: lowerBound,
UpperBound: upperBound,
}
}
func (p *partitionFieldStats[T]) update(value any) (err error) {
if value == nil {
p.containsNull = true
return
}
var actualVal T
v := reflect.ValueOf(value)
if !v.CanConvert(reflect.TypeOf(actualVal)) {
return fmt.Errorf("expected type %T, got %T", actualVal, value)
}
actualVal = v.Convert(reflect.TypeOf(actualVal)).Interface().(T)
switch f := any(actualVal).(type) {
case float32:
if math.IsNaN(float64(f)) {
p.containsNan = true
return
}
case float64:
if math.IsNaN(f) {
p.containsNan = true
return
}
}
if p.min == nil {
p.min = &actualVal
p.max = &actualVal
} else {
if p.cmp(actualVal, *p.min) < 0 {
p.min = &actualVal
}
if p.cmp(actualVal, *p.max) > 0 {
p.max = &actualVal
}
}
return nil
}
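// fieldSummarySketch is an illustrative sketch, not part of the original
// file, of how a single partition field's FieldSummary is derived: each
// partition value is fed through the typed stats and toSummary serializes
// the resulting bounds. The int32 values are hypothetical.
func fieldSummarySketch() (FieldSummary, error) {
	stats, err := newPartitionFieldStat(PrimitiveTypes.Int32)
	if err != nil {
		return FieldSummary{}, err
	}
	for _, v := range []any{int32(3), int32(7), nil} {
		if err := stats.update(v); err != nil {
			return FieldSummary{}, err
		}
	}
	// contains_null is true (a nil value was seen) and the bounds are the
	// binary encodings of 3 and 7.
	return stats.toSummary(), nil
}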
func constructPartitionSummaries(spec PartitionSpec, schema *Schema, partitions []map[int]any) ([]FieldSummary, error) {
partType := spec.PartitionType(schema)
fieldStats := make([]fieldStats, len(partType.FieldList))
var err error
for i, field := range partType.FieldList {
pt, ok := field.Type.(PrimitiveType)
if !ok {
return nil, fmt.Errorf("expected primitive type for partition field, got %s", field.Type)
}
fieldStats[i], err = newPartitionFieldStat(pt)
if err != nil {
return nil, fmt.Errorf("error constructing field stats for partition %d: %s: %s", i, field.Name, err)
}
}
for _, part := range partitions {
for i, field := range partType.FieldList {
fieldStats[i].update(part[field.ID])
}
}
summaries := make([]FieldSummary, len(fieldStats))
for i, stat := range fieldStats {
summaries[i] = stat.toSummary()
}
return summaries, nil
}
type ManifestWriter struct {
closed bool
version int
impl writerImpl
output io.Writer
writer *ocf.Encoder
spec PartitionSpec
schema *Schema
snapshotID int64
addedFiles int32
addedRows int64
existingFiles int32
existingRows int64
deletedFiles int32
deletedRows int64
partitions []map[int]any
minSeqNum int64
reusedEntry manifestEntry
}
func NewManifestWriter(version int, out io.Writer, spec PartitionSpec, schema *Schema, snapshotID int64) (*ManifestWriter, error) {
var impl writerImpl
switch version {
case 1:
impl = v1writerImpl{}
case 2:
impl = v2writerImpl{}
default:
return nil, fmt.Errorf("unsupported manifest version: %d", version)
}
sc, err := partitionTypeToAvroSchema(spec.PartitionType(schema))
if err != nil {
return nil, err
}
fileSchema, err := internal.NewManifestEntrySchema(sc, version)
if err != nil {
return nil, err
}
w := &ManifestWriter{
impl: impl,
version: version,
output: out,
spec: spec,
schema: schema,
snapshotID: snapshotID,
minSeqNum: -1,
partitions: make([]map[int]any, 0),
}
md, err := w.meta()
if err != nil {
return nil, err
}
enc, err := ocf.NewEncoderWithSchema(fileSchema, out,
ocf.WithSchemaMarshaler(ocf.FullSchemaMarshaler),
ocf.WithEncoderSchemaCache(&avro.SchemaCache{}),
ocf.WithMetadata(md),
ocf.WithCodec(ocf.Deflate))
w.writer = enc
return w, err
}
func (w *ManifestWriter) Close() error {
if w.closed {
return nil
}
if w.addedFiles+w.existingFiles+w.deletedFiles == 0 {
return errors.New("empty manifest file has been written")
}
w.closed = true
return w.writer.Close()
}
func (w *ManifestWriter) ToManifestFile(location string, length int64) (ManifestFile, error) {
if err := w.Close(); err != nil {
return nil, err
}
if w.minSeqNum == initialSequenceNumber {
w.minSeqNum = -1
}
partitions, err := constructPartitionSummaries(w.spec, w.schema, w.partitions)
if err != nil {
return nil, err
}
return &manifestFile{
version: w.version,
Path: location,
Len: length,
SpecID: int32(w.spec.id),
Content: ManifestContentData,
SeqNumber: -1,
MinSeqNumber: w.minSeqNum,
AddedSnapshotID: w.snapshotID,
AddedFilesCount: w.addedFiles,
ExistingFilesCount: w.existingFiles,
DeletedFilesCount: w.deletedFiles,
AddedRowsCount: w.addedRows,
ExistingRowsCount: w.existingRows,
DeletedRowsCount: w.deletedRows,
PartitionList: &partitions,
Key: nil,
}, nil
}
func (w *ManifestWriter) meta() (map[string][]byte, error) {
schemaJson, err := json.Marshal(w.schema)
if err != nil {
return nil, err
}
specFields := w.spec.fields
if specFields == nil {
specFields = []PartitionField{}
}
specFieldsJson, err := json.Marshal(specFields)
if err != nil {
return nil, err
}
return map[string][]byte{
"schema": schemaJson,
"schema-id": []byte(strconv.Itoa(w.schema.ID)),
"partition-spec": specFieldsJson,
"partition-spec-id": []byte(strconv.Itoa(w.spec.ID())),
"format-version": []byte(strconv.Itoa(w.version)),
"content": []byte(w.impl.content().String()),
}, nil
}
func (w *ManifestWriter) addEntry(entry *manifestEntry) error {
if w.closed {
return errors.New("cannot add entry to closed manifest writer")
}
switch entry.Status() {
case EntryStatusADDED:
w.addedFiles++
w.addedRows += entry.DataFile().Count()
case EntryStatusEXISTING:
w.existingFiles++
w.existingRows += entry.DataFile().Count()
case EntryStatusDELETED:
w.deletedFiles++
w.deletedRows += entry.DataFile().Count()
default:
return fmt.Errorf("unknown entry status: %v", entry.Status())
}
w.partitions = append(w.partitions, entry.DataFile().Partition())
if (entry.Status() == EntryStatusADDED || entry.Status() == EntryStatusEXISTING) &&
entry.SequenceNum() > 0 && (w.minSeqNum < 0 || entry.SequenceNum() < w.minSeqNum) {
w.minSeqNum = entry.SequenceNum()
}
toEncode, err := w.impl.prepareEntry(entry, w.snapshotID)
if err != nil {
return err
}
return w.writer.Encode(toEncode)
}
func (w *ManifestWriter) Add(entry ManifestEntry) error {
w.reusedEntry.wrap(EntryStatusADDED, &w.snapshotID, entry.(*manifestEntry).SeqNum, nil, entry.DataFile())
return w.addEntry(&w.reusedEntry)
}
func (w *ManifestWriter) Delete(entry ManifestEntry) error {
w.reusedEntry.wrap(EntryStatusDELETED, &w.snapshotID, entry.(*manifestEntry).SeqNum, entry.FileSequenceNum(), entry.DataFile())
return w.addEntry(&w.reusedEntry)
}
func (w *ManifestWriter) Existing(entry ManifestEntry) error {
snapshotID := entry.SnapshotID()
w.reusedEntry.wrap(EntryStatusEXISTING, &snapshotID, entry.(*manifestEntry).SeqNum, entry.FileSequenceNum(), entry.DataFile())
return w.addEntry(&w.reusedEntry)
}
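// manifestWriterSketch is an illustrative sketch, not part of the original
// file, of the streaming write path: create a ManifestWriter, add entries,
// then ask for the ManifestFile metadata. ToManifestFile closes the writer;
// the length argument should be the number of bytes written to out, which
// WriteManifest below tracks with a CountingWriter. The snapshot ID, path,
// and length here are hypothetical placeholders.
func manifestWriterSketch(out io.Writer, spec PartitionSpec, schema *Schema, entries []ManifestEntry) (ManifestFile, error) {
	w, err := NewManifestWriter(2, out, spec, schema, 1234)
	if err != nil {
		return nil, err
	}
	for _, e := range entries {
		if err := w.Add(e); err != nil {
			return nil, err
		}
	}
	return w.ToManifestFile("s3://bucket/metadata/manifest-0001.avro", 4096)
}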
type ManifestListWriter struct {
version int
out io.Writer
commitSnapshotID int64
sequenceNumber int64
writer *ocf.Encoder
}
func NewManifestListWriterV1(out io.Writer, snapshotID int64, parentSnapshot *int64) (*ManifestListWriter, error) {
m := &ManifestListWriter{
version: 1,
out: out,
commitSnapshotID: snapshotID,
sequenceNumber: -1,
}
parentSnapshotStr := "null"
if parentSnapshot != nil {
parentSnapshotStr = strconv.Itoa(int(*parentSnapshot))
}
return m, m.init(map[string][]byte{
"format-version": []byte(strconv.Itoa(m.version)),
"snapshot-id": []byte(strconv.Itoa(int(snapshotID))),
"parent-snapshot-id": []byte(parentSnapshotStr),
})
}
func NewManifestListWriterV2(out io.Writer, snapshotID, sequenceNumber int64, parentSnapshot *int64) (*ManifestListWriter, error) {
m := &ManifestListWriter{
version: 2,
out: out,
commitSnapshotID: snapshotID,
sequenceNumber: sequenceNumber,
}
parentSnapshotStr := "null"
if parentSnapshot != nil {
parentSnapshotStr = strconv.Itoa(int(*parentSnapshot))
}
return m, m.init(map[string][]byte{
"format-version": []byte(strconv.Itoa(m.version)),
"snapshot-id": []byte(strconv.Itoa(int(snapshotID))),
"sequence-number": []byte(strconv.Itoa(int(sequenceNumber))),
"parent-snapshot-id": []byte(parentSnapshotStr),
})
}
func (m *ManifestListWriter) init(meta map[string][]byte) error {
fileSchema, err := internal.NewManifestFileSchema(m.version)
if err != nil {
return err
}
enc, err := ocf.NewEncoderWithSchema(fileSchema, m.out,
ocf.WithSchemaMarshaler(ocf.FullSchemaMarshaler),
ocf.WithEncoderSchemaCache(&avro.SchemaCache{}),
ocf.WithMetadata(meta),
ocf.WithCodec(ocf.Deflate))
if err != nil {
return err
}
m.writer = enc
return nil
}
func (m *ManifestListWriter) Close() error {
if m.writer == nil {
return nil
}
return m.writer.Close()
}
func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
if len(files) == 0 {
return nil
}
switch m.version {
case 1:
if slices.ContainsFunc(files, func(f ManifestFile) bool {
return f.Version() != 1
}) {
return fmt.Errorf("%w: ManifestListWriter only supports version 1 manifest files", ErrInvalidArgument)
}
var tmp manifestFileV1
for _, file := range files {
file.(*manifestFile).toV1(&tmp)
if err := m.writer.Encode(&tmp); err != nil {
return err
}
}
case 2:
for _, file := range files {
if file.Version() != 2 {
return fmt.Errorf("%w: ManifestListWriter only supports version 2 manifest files", ErrInvalidArgument)
}
wrapped := *(file.(*manifestFile))
if wrapped.SeqNumber == -1 {
// if the sequence number is being assigned here,
// then the manifest must be created by the current
// operation.
// to validate this, check the snapshot id matches the current commit
if m.commitSnapshotID != wrapped.AddedSnapshotID {
return fmt.Errorf("found unassigned sequence number for a manifest from snapshot %d != %d",
m.commitSnapshotID, wrapped.AddedSnapshotID)
}
wrapped.SeqNumber = m.sequenceNumber
}
if wrapped.MinSeqNumber == -1 {
if m.commitSnapshotID != wrapped.AddedSnapshotID {
return fmt.Errorf("found unassigned sequence number for a manifest from snapshot: %d", wrapped.AddedSnapshotID)
}
// if the min sequence number is not determined, then there was no assigned sequence number
// for any file written to the wrapped manifest. replace the unassigned sequence number with
// the one for this commit
wrapped.MinSeqNumber = m.sequenceNumber
}
if err := m.writer.Encode(wrapped); err != nil {
return err
}
}
default:
return fmt.Errorf("unsupported manifest version: %d", m.version)
}
return nil
}
// WriteManifestList writes a list of manifest files to an avro file.
func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, files []ManifestFile) error {
var (
writer *ManifestListWriter
err error
)
switch version {
case 1:
writer, err = NewManifestListWriterV1(out, snapshotID, parentSnapshotID)
case 2:
if sequenceNumber == nil {
return errors.New("sequence number is required for V2 tables")
}
writer, err = NewManifestListWriterV2(out, snapshotID, *sequenceNumber, parentSnapshotID)
default:
return fmt.Errorf("unsupported manifest version: %d", version)
}
if err != nil {
return err
}
if err = writer.AddManifests(files); err != nil {
return err
}
return writer.Close()
}
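// writeManifestListSketch is an illustrative sketch, not part of the original
// file, of writing a v2 manifest list: the sequence number is required for
// v2 and is stamped onto any manifests whose sequence numbers are still
// unassigned (-1), provided they were added by this commit's snapshot. The
// snapshot and sequence numbers are hypothetical placeholders.
func writeManifestListSketch(out io.Writer, files []ManifestFile) error {
	var (
		snapshotID     int64 = 1234
		sequenceNumber int64 = 1
	)
	return WriteManifestList(2, out, snapshotID, nil, &sequenceNumber, files)
}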
func WriteManifest(
filename string,
out io.Writer,
version int,
spec PartitionSpec,
schema *Schema,
snapshotID int64,
entries []ManifestEntry,
) (ManifestFile, error) {
cnt := &internal.CountingWriter{W: out}
w, err := NewManifestWriter(version, cnt, spec, schema, snapshotID)
if err != nil {
return nil, err
}
for _, entry := range entries {
if err := w.addEntry(entry.(*manifestEntry)); err != nil {
return nil, err
}
}
// flush the writer to ensure cnt.Count is accurate
if err := w.Close(); err != nil {
return nil, err
}
return w.ToManifestFile(filename, cnt.Count)
}
// ManifestEntryStatus defines constants for the entry status of
// existing, added or deleted.
type ManifestEntryStatus int8
const (
EntryStatusEXISTING ManifestEntryStatus = 0
EntryStatusADDED ManifestEntryStatus = 1
EntryStatusDELETED ManifestEntryStatus = 2
)
// ManifestEntryContent defines constants for the type of file contents
// in the file entries: data, position-based deletes, and equality-based
// deletes.
type ManifestEntryContent int8
const (
EntryContentData ManifestEntryContent = 0
EntryContentPosDeletes ManifestEntryContent = 1
EntryContentEqDeletes ManifestEntryContent = 2
)
func (m ManifestEntryContent) String() string {
switch m {
case EntryContentData:
return "Data"
case EntryContentPosDeletes:
return "Positional_Deletes"
case EntryContentEqDeletes:
return "Equality_Deletes"
default:
return "UNKNOWN"
}
}
// FileFormat defines constants for the format of data files.
type FileFormat string
const (
AvroFile FileFormat = "AVRO"
OrcFile FileFormat = "ORC"
ParquetFile FileFormat = "PARQUET"
)
type colMap[K, V any] struct {
Key K `avro:"key"`
Value V `avro:"value"`
}
func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
if c == nil {
return nil
}
out := make(map[K]V)
for _, data := range *c {
out[data.Key] = data.Value
}
return out
}
func mapToAvroColMap[K comparable, V any](m map[K]V) *[]colMap[K, V] {
if m == nil {
return nil
}
out := make([]colMap[K, V], 0, len(m))
for k, v := range m {
out = append(out, colMap[K, V]{Key: k, Value: v})
}
return &out
}
func avroPartitionData(input map[int]any, logicalTypes map[int]avro.LogicalType) map[int]any {
out := make(map[int]any)
for k, v := range input {
if logical, ok := logicalTypes[k]; ok {
switch logical {
case avro.Date:
out[k] = Date(v.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
case avro.TimeMillis:
out[k] = Time(v.(time.Duration).Milliseconds())
case avro.TimeMicros:
out[k] = Time(v.(time.Duration).Microseconds())
case avro.TimestampMillis:
out[k] = Timestamp(v.(time.Time).UTC().UnixMilli())
case avro.TimestampMicros:
out[k] = Timestamp(v.(time.Time).UTC().UnixMicro())
default:
out[k] = v
}
continue
}
out[k] = v
}
return out
}
type dataFile struct {
Content ManifestEntryContent `avro:"content"`
Path string `avro:"file_path"`
Format FileFormat `avro:"file_format"`
PartitionData map[string]any `avro:"partition"`
RecordCount int64 `avro:"record_count"`
FileSize int64 `avro:"file_size_in_bytes"`
BlockSizeInBytes int64 `avro:"block_size_in_bytes"`
ColSizes *[]colMap[int, int64] `avro:"column_sizes"`
ValCounts *[]colMap[int, int64] `avro:"value_counts"`
NullCounts *[]colMap[int, int64] `avro:"null_value_counts"`
NaNCounts *[]colMap[int, int64] `avro:"nan_value_counts"`
DistinctCounts *[]colMap[int, int64] `avro:"distinct_counts"`
LowerBounds *[]colMap[int, []byte] `avro:"lower_bounds"`
UpperBounds *[]colMap[int, []byte] `avro:"upper_bounds"`
Key *[]byte `avro:"key_metadata"`
Splits *[]int64 `avro:"split_offsets"`
EqualityIDs *[]int `avro:"equality_ids"`
SortOrder *int `avro:"sort_order_id"`
colSizeMap map[int]int64
valCntMap map[int]int64
nullCntMap map[int]int64
nanCntMap map[int]int64
distinctCntMap map[int]int64
lowerBoundMap map[int][]byte
upperBoundMap map[int][]byte
// used for partition retrieval
fieldNameToID map[string]int
fieldIDToLogicalType map[int]avro.LogicalType
fieldIDToPartitionData map[int]any
specID int32
initMaps sync.Once
}
func (d *dataFile) initializeMapData() {
d.initMaps.Do(func() {
d.colSizeMap = avroColMapToMap(d.ColSizes)
d.valCntMap = avroColMapToMap(d.ValCounts)
d.nullCntMap = avroColMapToMap(d.NullCounts)
d.nanCntMap = avroColMapToMap(d.NaNCounts)
d.distinctCntMap = avroColMapToMap(d.DistinctCounts)
d.lowerBoundMap = avroColMapToMap(d.LowerBounds)
d.upperBoundMap = avroColMapToMap(d.UpperBounds)
// Populate fieldIDToPartition map if dataFile read from manifest file
if len(d.fieldIDToPartitionData) < len(d.PartitionData) {
d.fieldIDToPartitionData = make(map[int]any, len(d.PartitionData))
for k, v := range d.PartitionData {
if id, ok := d.fieldNameToID[k]; ok {
d.fieldIDToPartitionData[id] = v
}
}
}
d.fieldIDToPartitionData = avroPartitionData(d.fieldIDToPartitionData, d.fieldIDToLogicalType)
})
}
func (d *dataFile) setFieldNameToIDMap(m map[string]int) { d.fieldNameToID = m }
func (d *dataFile) setFieldIDToLogicalTypeMap(m map[int]avro.LogicalType) {
d.fieldIDToLogicalType = m
}
func (d *dataFile) ContentType() ManifestEntryContent { return d.Content }
func (d *dataFile) FilePath() string { return d.Path }
func (d *dataFile) FileFormat() FileFormat { return d.Format }
// Partition returns the partition data as a map of partition field ID to value.
func (d *dataFile) Partition() map[int]any {
d.initializeMapData()
return d.fieldIDToPartitionData
}
func (d *dataFile) Count() int64 { return d.RecordCount }
func (d *dataFile) FileSizeBytes() int64 { return d.FileSize }
func (d *dataFile) SpecID() int32 { return d.specID }
func (d *dataFile) ColumnSizes() map[int]int64 {
d.initializeMapData()
return d.colSizeMap
}
func (d *dataFile) ValueCounts() map[int]int64 {
d.initializeMapData()
return d.valCntMap
}
func (d *dataFile) NullValueCounts() map[int]int64 {
d.initializeMapData()
return d.nullCntMap
}
func (d *dataFile) NaNValueCounts() map[int]int64 {
d.initializeMapData()
return d.nanCntMap
}
func (d *dataFile) DistinctValueCounts() map[int]int64 {
d.initializeMapData()
return d.distinctCntMap
}
func (d *dataFile) LowerBoundValues() map[int][]byte {
d.initializeMapData()
return d.lowerBoundMap
}
func (d *dataFile) UpperBoundValues() map[int][]byte {
d.initializeMapData()
return d.upperBoundMap
}
func (d *dataFile) KeyMetadata() []byte {
if d.Key == nil {
return nil
}
return *d.Key
}
func (d *dataFile) SplitOffsets() []int64 {
if d.Splits == nil {
return nil
}
return *d.Splits
}
func (d *dataFile) EqualityFieldIDs() []int {
if d.EqualityIDs == nil {
return nil
}
return *d.EqualityIDs
}
func (d *dataFile) SortOrderID() *int { return d.SortOrder }
type ManifestEntryBuilder struct {
m *manifestEntry
}
func NewManifestEntryBuilder(status ManifestEntryStatus, snapshotID *int64, data DataFile) *ManifestEntryBuilder {
return &ManifestEntryBuilder{
m: &manifestEntry{
EntryStatus: status,
Snapshot: snapshotID,
Data: data,
},
}
}
func (b *ManifestEntryBuilder) SequenceNum(num int64) *ManifestEntryBuilder {
b.m.SeqNum = &num
return b
}
func (b *ManifestEntryBuilder) FileSequenceNum(num int64) *ManifestEntryBuilder {
b.m.FileSeqNum = &num
return b
}
func (b *ManifestEntryBuilder) Build() ManifestEntry {
return b.m
}
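// manifestEntrySketch is an illustrative sketch, not part of the original
// file, of constructing an entry for a freshly added data file. A nil
// snapshot ID is inherited later from the manifest list or the committing
// snapshot; the explicit sequence number here is a hypothetical placeholder.
func manifestEntrySketch(df DataFile) ManifestEntry {
	return NewManifestEntryBuilder(EntryStatusADDED, nil, df).
		SequenceNum(1).
		Build()
}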
type manifestEntry struct {
EntryStatus ManifestEntryStatus `avro:"status"`
Snapshot *int64 `avro:"snapshot_id"`
SeqNum *int64 `avro:"sequence_number"`
FileSeqNum *int64 `avro:"file_sequence_number"`
Data DataFile `avro:"data_file"`
}
func (m *manifestEntry) Status() ManifestEntryStatus { return m.EntryStatus }
func (m *manifestEntry) SnapshotID() int64 {
if m.Snapshot == nil {
return -1
}
return *m.Snapshot
}
func (m *manifestEntry) SequenceNum() int64 {
if m.SeqNum == nil {
return -1
}
return *m.SeqNum
}
func (m *manifestEntry) FileSequenceNum() *int64 {
return m.FileSeqNum
}
func (m *manifestEntry) DataFile() DataFile { return m.Data }
func (m *manifestEntry) inherit(manifest ManifestFile) {
if m.Snapshot == nil {
snap := manifest.SnapshotID()
m.Snapshot = &snap
}
manifestSequenceNum := manifest.SequenceNum()
if manifestSequenceNum != -1 {
if m.SeqNum == nil && (manifestSequenceNum == initialSequenceNumber || m.EntryStatus == EntryStatusADDED) {
m.SeqNum = &manifestSequenceNum
}
if m.FileSeqNum == nil && (manifestSequenceNum == initialSequenceNumber || m.EntryStatus == EntryStatusADDED) {
m.FileSeqNum = &manifestSequenceNum
}
}
m.Data.(*dataFile).specID = manifest.PartitionSpecID()
}
func (m *manifestEntry) wrap(status ManifestEntryStatus, newSnapID, newSeq, newFileSeq *int64, data DataFile) ManifestEntry {
if newSeq != nil && *newSeq == -1 {
newSeq = nil
}
m.EntryStatus = status
m.Snapshot = newSnapID
m.SeqNum = newSeq
m.FileSeqNum = newFileSeq
m.Data = data
return m
}
type fallbackManifestEntry struct {
manifestEntry
Snapshot int64 `avro:"snapshot_id"`
}
func (f *fallbackManifestEntry) toEntry() *manifestEntry {
f.manifestEntry.Snapshot = &f.Snapshot
return &f.manifestEntry
}
func NewManifestEntry(status ManifestEntryStatus, snapshotID *int64, seqNum, fileSeqNum *int64, df DataFile) ManifestEntry {
return &manifestEntry{
EntryStatus: status,
Snapshot: snapshotID,
SeqNum: seqNum,
FileSeqNum: fileSeqNum,
Data: df,
}
}
// DataFileBuilder is a helper for building a data file struct which will
// conform to the DataFile interface.
type DataFileBuilder struct {
d *dataFile
}
// NewDataFileBuilder is passed all of the required fields and then allows
// all of the optional fields to be set by calling the corresponding methods
// before calling [DataFileBuilder.Build] to construct the object.
func NewDataFileBuilder(
spec PartitionSpec,
content ManifestEntryContent,
path string,
format FileFormat,
fieldIDToPartitionData map[int]any,
recordCount int64,
fileSize int64,
) (*DataFileBuilder, error) {
if content != EntryContentData && content != EntryContentPosDeletes && content != EntryContentEqDeletes {
return nil, fmt.Errorf(
"%w: content must be one of %s, %s, or %s",
ErrInvalidArgument, EntryContentData, EntryContentPosDeletes, EntryContentEqDeletes,
)
}
if path == "" {
return nil, fmt.Errorf("%w: path cannot be empty", ErrInvalidArgument)
}
if format != AvroFile && format != OrcFile && format != ParquetFile {
return nil, fmt.Errorf(
"%w: format must be one of %s, %s, or %s",
ErrInvalidArgument, AvroFile, OrcFile, ParquetFile,
)
}
if recordCount <= 0 {
return nil, fmt.Errorf("%w: record count must be greater than 0", ErrInvalidArgument)
}
if fileSize <= 0 {
return nil, fmt.Errorf("%w: file size must be greater than 0", ErrInvalidArgument)
}
partitionData := make(map[string]any)
fieldNameToID := make(map[string]int)
for _, p := range spec.fields {
if pData, ok := fieldIDToPartitionData[p.FieldID]; ok {
partitionData[p.Name] = pData
fieldNameToID[p.Name] = p.FieldID
}
}
return &DataFileBuilder{
d: &dataFile{
Content: content,
Path: path,
Format: format,
PartitionData: partitionData,
RecordCount: recordCount,
FileSize: fileSize,
specID: int32(spec.id),
fieldIDToPartitionData: fieldIDToPartitionData,
fieldNameToID: fieldNameToID,
},
}, nil
}
// BlockSizeInBytes sets the block size in bytes for the data file. Deprecated in v2.
func (b *DataFileBuilder) BlockSizeInBytes(size int64) *DataFileBuilder {
b.d.BlockSizeInBytes = size
return b
}
// ColumnSizes sets the column sizes for the data file.
func (b *DataFileBuilder) ColumnSizes(sizes map[int]int64) *DataFileBuilder {
b.d.ColSizes = mapToAvroColMap(sizes)
return b
}
// ValueCounts sets the value counts for the data file.
func (b *DataFileBuilder) ValueCounts(counts map[int]int64) *DataFileBuilder {
b.d.ValCounts = mapToAvroColMap(counts)
return b
}
// NullValueCounts sets the null value counts for the data file.
func (b *DataFileBuilder) NullValueCounts(counts map[int]int64) *DataFileBuilder {
b.d.NullCounts = mapToAvroColMap(counts)
return b
}
// NaNValueCounts sets the NaN value counts for the data file.
func (b *DataFileBuilder) NaNValueCounts(counts map[int]int64) *DataFileBuilder {
b.d.NaNCounts = mapToAvroColMap(counts)
return b
}
// DistinctValueCounts sets the distinct value counts for the data file.
func (b *DataFileBuilder) DistinctValueCounts(counts map[int]int64) *DataFileBuilder {
b.d.DistinctCounts = mapToAvroColMap(counts)
return b
}
// LowerBoundValues sets the lower bound values for the data file.
func (b *DataFileBuilder) LowerBoundValues(bounds map[int][]byte) *DataFileBuilder {
b.d.LowerBounds = mapToAvroColMap(bounds)
return b
}
// UpperBoundValues sets the upper bound values for the data file.
func (b *DataFileBuilder) UpperBoundValues(bounds map[int][]byte) *DataFileBuilder {
b.d.UpperBounds = mapToAvroColMap(bounds)
return b
}
// KeyMetadata sets the key metadata for the data file.
func (b *DataFileBuilder) KeyMetadata(key []byte) *DataFileBuilder {
b.d.Key = &key
return b
}
// SplitOffsets sets the split offsets for the data file.
func (b *DataFileBuilder) SplitOffsets(offsets []int64) *DataFileBuilder {
b.d.Splits = &offsets
return b
}
// EqualityFieldIDs sets the equality field ids for the data file.
func (b *DataFileBuilder) EqualityFieldIDs(ids []int) *DataFileBuilder {
b.d.EqualityIDs = &ids
return b
}
// SortOrderID sets the sort order id for the data file.
func (b *DataFileBuilder) SortOrderID(id int) *DataFileBuilder {
b.d.SortOrder = &id
return b
}
func (b *DataFileBuilder) Build() DataFile {
return b.d
}
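// buildDataFileSketch is an illustrative sketch, not part of the original
// file, of the DataFileBuilder flow: required fields go to the constructor,
// optional column metrics are chained, and Build returns the DataFile. The
// path, counts, and partition data are hypothetical placeholders.
func buildDataFileSketch(spec PartitionSpec) (DataFile, error) {
	b, err := NewDataFileBuilder(
		spec,
		EntryContentData,
		"s3://bucket/data/00000-0-data.parquet",
		ParquetFile,
		map[int]any{},
		100,
		2048,
	)
	if err != nil {
		return nil, err
	}
	return b.
		ColumnSizes(map[int]int64{1: 512}).
		ValueCounts(map[int]int64{1: 100}).
		NullValueCounts(map[int]int64{1: 0}).
		Build(), nil
}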
// DataFile is the interface for reading the information about a
// given data file indicated by an entry in a manifest list.
type DataFile interface {
// ContentType is the type of the content stored by the data file,
// either Data, Equality deletes, or Position deletes. All v1 files
// are Data files.
ContentType() ManifestEntryContent
// FilePath is the full URI for the file, complete with FS scheme.
FilePath() string
// FileFormat is the format of the data file, AVRO, Orc, or Parquet.
FileFormat() FileFormat
// Partition returns a mapping of field id to partition value for
// each of the partition spec's fields.
Partition() map[int]any
// Count returns the number of records in this data file.
Count() int64
// FileSizeBytes is the total file size in bytes.
FileSizeBytes() int64
// ColumnSizes is a mapping from column id to the total size on disk
// of all regions that store the column. Does not include bytes
// necessary to read other columns, like footers. Map will be nil for
// row-oriented formats (avro).
ColumnSizes() map[int]int64
// ValueCounts is a mapping from column id to the number of values
// in the column, including null and NaN values.
ValueCounts() map[int]int64
// NullValueCounts is a mapping from column id to the number of
// null values in the column.
NullValueCounts() map[int]int64
// NaNValueCounts is a mapping from column id to the number of NaN
// values in the column.
NaNValueCounts() map[int]int64
// DistinctValueCounts is a mapping from column id to the number of
// distinct values in the column. Distinct counts must be derived
// using values in the file by counting or using sketches, but not
// using methods like merging existing distinct counts.
DistinctValueCounts() map[int]int64
// LowerBoundValues is a mapping from column id to the lower bound
// value of the column, serialized as binary. Each value in the map
// must be less than or equal to all non-null, non-NaN values in the
// column for the file.
LowerBoundValues() map[int][]byte
// UpperBoundValues is a mapping from column id to the upper bound
// value of the column, serialized as binary. Each value in the map
// must be greater than or equal to all non-null, non-NaN values in
// the column for the file.
UpperBoundValues() map[int][]byte
// KeyMetadata is implementation-specific key metadata for encryption.
KeyMetadata() []byte
// SplitOffsets are the split offsets for the data file. For example,
// all row group offsets in a Parquet file. Must be sorted ascending.
SplitOffsets() []int64
// EqualityFieldIDs are used to determine row equality in equality
// delete files. It is required when the content type is
// EntryContentEqDeletes.
EqualityFieldIDs() []int
// SortOrderID returns the id representing the sort order for this
// file, or nil if there is no sort order.
SortOrderID() *int
// SpecID returns the partition spec id for this data file, inherited
// from the manifest that the data file was read from
SpecID() int32
}
// ManifestEntry is an interface for both v1 and v2 manifest entries.
type ManifestEntry interface {
// Status returns the type of the file tracked by this entry.
// Deletes are informational only and not used in scans.
Status() ManifestEntryStatus
// SnapshotID is the ID of the snapshot where the file was added or
// deleted; if null, it is inherited from the manifest list.
SnapshotID() int64
// SequenceNum returns the data sequence number of the file.
// If it was null and the status is EntryStatusADDED then it
// is inherited from the manifest list.
SequenceNum() int64
// FileSequenceNum returns the file sequence number indicating
// when the file was added. If it was null and the status is
// EntryStatusADDED then it is inherited from the manifest list.
FileSequenceNum() *int64
// DataFile provides the information about the data file indicated
// by this manifest entry.
DataFile() DataFile
inherit(manifest ManifestFile)
wrap(status ManifestEntryStatus, snapshotID, seqNum, fileSeqNum *int64, datafile DataFile) ManifestEntry
}
var PositionalDeleteSchema = NewSchema(0,
NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name: "file_path", Required: true},
NestedField{ID: 2147483545, Type: PrimitiveTypes.Int32, Name: "pos", Required: true},
)