blob: 9a0d8b7dfddb51faf019f6f66a6ed3fd9cde505f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package iceberg
import (
"io"
"sync"
"time"
iceio "github.com/apache/iceberg-go/io"
"github.com/hamba/avro/v2"
"github.com/hamba/avro/v2/ocf"
)
// ManifestContent indicates the type of data inside of the files
// described by a manifest. This will indicate whether the data files
// contain active data or deleted rows.
type ManifestContent int32
const (
ManifestContentData ManifestContent = 0
ManifestContentDeletes ManifestContent = 1
)
type FieldSummary struct {
ContainsNull bool `avro:"contains_null"`
ContainsNaN *bool `avro:"contains_nan"`
LowerBound *[]byte `avro:"lower_bound"`
UpperBound *[]byte `avro:"upper_bound"`
}
// ManifestV1Builder is a helper for building a V1 manifest file
// struct which will conform to the ManifestFile interface.
type ManifestV1Builder struct {
m *manifestFileV1
}
// NewManifestV1Builder is passed all of the required fields and then allows
// all of the optional fields to be set by calling the corresponding methods
// before calling [ManifestV1Builder.Build] to construct the object.
func NewManifestV1Builder(path string, length int64, partitionSpecID int32, addedSnapshotID int64) *ManifestV1Builder {
return &ManifestV1Builder{
m: &manifestFileV1{
Path: path,
Len: length,
SpecID: partitionSpecID,
AddedSnapshotID: addedSnapshotID,
},
}
}
func (b *ManifestV1Builder) AddedFiles(cnt int32) *ManifestV1Builder {
b.m.AddedFilesCount = &cnt
return b
}
func (b *ManifestV1Builder) ExistingFiles(cnt int32) *ManifestV1Builder {
b.m.ExistingFilesCount = &cnt
return b
}
func (b *ManifestV1Builder) DeletedFiles(cnt int32) *ManifestV1Builder {
b.m.DeletedFilesCount = &cnt
return b
}
func (b *ManifestV1Builder) AddedRows(cnt int64) *ManifestV1Builder {
b.m.AddedRowsCount = &cnt
return b
}
func (b *ManifestV1Builder) ExistingRows(cnt int64) *ManifestV1Builder {
b.m.ExistingRowsCount = &cnt
return b
}
func (b *ManifestV1Builder) DeletedRows(cnt int64) *ManifestV1Builder {
b.m.DeletedRowsCount = &cnt
return b
}
func (b *ManifestV1Builder) Partitions(p []FieldSummary) *ManifestV1Builder {
b.m.PartitionList = &p
return b
}
func (b *ManifestV1Builder) KeyMetadata(km []byte) *ManifestV1Builder {
b.m.Key = km
return b
}
// Build returns the constructed manifest file, after calling Build this
// builder should not be used further as we avoid copying by just returning
// a pointer to the constructed manifest file. Further calls to the modifier
// methods after calling build would modify the constructed ManifestFile.
func (b *ManifestV1Builder) Build() ManifestFile {
return b.m
}
type fallbackManifestFileV1 struct {
manifestFileV1
AddedSnapshotID *int64 `avro:"added_snapshot_id"`
}
func (f *fallbackManifestFileV1) toManifest() *manifestFileV1 {
f.manifestFileV1.AddedSnapshotID = *f.AddedSnapshotID
return &f.manifestFileV1
}
type manifestFileV1 struct {
Path string `avro:"manifest_path"`
Len int64 `avro:"manifest_length"`
SpecID int32 `avro:"partition_spec_id"`
AddedSnapshotID int64 `avro:"added_snapshot_id"`
AddedFilesCount *int32 `avro:"added_data_files_count"`
ExistingFilesCount *int32 `avro:"existing_data_files_count"`
DeletedFilesCount *int32 `avro:"deleted_data_files_count"`
AddedRowsCount *int64 `avro:"added_rows_count"`
ExistingRowsCount *int64 `avro:"existing_rows_count"`
DeletedRowsCount *int64 `avro:"deleted_rows_count"`
PartitionList *[]FieldSummary `avro:"partitions"`
Key []byte `avro:"key_metadata"`
}
func (*manifestFileV1) Version() int { return 1 }
func (m *manifestFileV1) FilePath() string { return m.Path }
func (m *manifestFileV1) Length() int64 { return m.Len }
func (m *manifestFileV1) PartitionSpecID() int32 { return m.SpecID }
func (m *manifestFileV1) ManifestContent() ManifestContent {
return ManifestContentData
}
func (m *manifestFileV1) SnapshotID() int64 {
return m.AddedSnapshotID
}
func (m *manifestFileV1) AddedDataFiles() int32 {
if m.AddedFilesCount == nil {
return 0
}
return *m.AddedFilesCount
}
func (m *manifestFileV1) ExistingDataFiles() int32 {
if m.ExistingFilesCount == nil {
return 0
}
return *m.ExistingFilesCount
}
func (m *manifestFileV1) DeletedDataFiles() int32 {
if m.DeletedFilesCount == nil {
return 0
}
return *m.DeletedFilesCount
}
func (m *manifestFileV1) AddedRows() int64 {
if m.AddedRowsCount == nil {
return 0
}
return *m.AddedRowsCount
}
func (m *manifestFileV1) ExistingRows() int64 {
if m.ExistingRowsCount == nil {
return 0
}
return *m.ExistingRowsCount
}
func (m *manifestFileV1) DeletedRows() int64 {
if m.DeletedRowsCount == nil {
return 0
}
return *m.DeletedRowsCount
}
func (m *manifestFileV1) HasAddedFiles() bool {
return m.AddedFilesCount == nil || *m.AddedFilesCount > 0
}
func (m *manifestFileV1) HasExistingFiles() bool {
return m.ExistingFilesCount == nil || *m.ExistingFilesCount > 0
}
func (m *manifestFileV1) SequenceNum() int64 { return 0 }
func (m *manifestFileV1) MinSequenceNum() int64 { return 0 }
func (m *manifestFileV1) KeyMetadata() []byte { return m.Key }
func (m *manifestFileV1) Partitions() []FieldSummary {
if m.PartitionList == nil {
return nil
}
return *m.PartitionList
}
func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
return fetchManifestEntries(m, fs, discardDeleted)
}
// ManifestV2Builder is a helper for building a V2 manifest file
// struct which will conform to the ManifestFile interface.
type ManifestV2Builder struct {
m *manifestFileV2
}
// NewManifestV2Builder is constructed with the primary fields, with the remaining
// fields set to their zero value unless modified by calling the corresponding
// methods of the builder. Then calling [ManifestV2Builder.Build] to retrieve the
// constructed ManifestFile.
func NewManifestV2Builder(path string, length int64, partitionSpecID int32, content ManifestContent, addedSnapshotID int64) *ManifestV2Builder {
return &ManifestV2Builder{
m: &manifestFileV2{
Path: path,
Len: length,
SpecID: partitionSpecID,
Content: content,
AddedSnapshotID: addedSnapshotID,
},
}
}
func (b *ManifestV2Builder) SequenceNum(num, minSeqNum int64) *ManifestV2Builder {
b.m.SeqNumber, b.m.MinSeqNumber = num, minSeqNum
return b
}
func (b *ManifestV2Builder) AddedFiles(cnt int32) *ManifestV2Builder {
b.m.AddedFilesCount = cnt
return b
}
func (b *ManifestV2Builder) ExistingFiles(cnt int32) *ManifestV2Builder {
b.m.ExistingFilesCount = cnt
return b
}
func (b *ManifestV2Builder) DeletedFiles(cnt int32) *ManifestV2Builder {
b.m.DeletedFilesCount = cnt
return b
}
func (b *ManifestV2Builder) AddedRows(cnt int64) *ManifestV2Builder {
b.m.AddedRowsCount = cnt
return b
}
func (b *ManifestV2Builder) ExistingRows(cnt int64) *ManifestV2Builder {
b.m.ExistingRowsCount = cnt
return b
}
func (b *ManifestV2Builder) DeletedRows(cnt int64) *ManifestV2Builder {
b.m.DeletedRowsCount = cnt
return b
}
func (b *ManifestV2Builder) Partitions(p []FieldSummary) *ManifestV2Builder {
b.m.PartitionList = &p
return b
}
func (b *ManifestV2Builder) KeyMetadata(km []byte) *ManifestV2Builder {
b.m.Key = km
return b
}
// Build returns the constructed manifest file, after calling Build this
// builder should not be used further as we avoid copying by just returning
// a pointer to the constructed manifest file. Further calls to the modifier
// methods after calling build would modify the constructed ManifestFile.
func (b *ManifestV2Builder) Build() ManifestFile {
return b.m
}
type manifestFileV2 struct {
Path string `avro:"manifest_path"`
Len int64 `avro:"manifest_length"`
SpecID int32 `avro:"partition_spec_id"`
Content ManifestContent `avro:"content"`
SeqNumber int64 `avro:"sequence_number"`
MinSeqNumber int64 `avro:"min_sequence_number"`
AddedSnapshotID int64 `avro:"added_snapshot_id"`
AddedFilesCount int32 `avro:"added_files_count"`
ExistingFilesCount int32 `avro:"existing_files_count"`
DeletedFilesCount int32 `avro:"deleted_files_count"`
AddedRowsCount int64 `avro:"added_rows_count"`
ExistingRowsCount int64 `avro:"existing_rows_count"`
DeletedRowsCount int64 `avro:"deleted_rows_count"`
PartitionList *[]FieldSummary `avro:"partitions"`
Key []byte `avro:"key_metadata"`
}
func (*manifestFileV2) Version() int { return 2 }
func (m *manifestFileV2) FilePath() string { return m.Path }
func (m *manifestFileV2) Length() int64 { return m.Len }
func (m *manifestFileV2) PartitionSpecID() int32 { return m.SpecID }
func (m *manifestFileV2) ManifestContent() ManifestContent { return m.Content }
func (m *manifestFileV2) SnapshotID() int64 {
return m.AddedSnapshotID
}
func (m *manifestFileV2) AddedDataFiles() int32 {
return m.AddedFilesCount
}
func (m *manifestFileV2) ExistingDataFiles() int32 {
return m.ExistingFilesCount
}
func (m *manifestFileV2) DeletedDataFiles() int32 {
return m.DeletedFilesCount
}
func (m *manifestFileV2) AddedRows() int64 {
return m.AddedRowsCount
}
func (m *manifestFileV2) ExistingRows() int64 {
return m.ExistingRowsCount
}
func (m *manifestFileV2) DeletedRows() int64 {
return m.DeletedRowsCount
}
func (m *manifestFileV2) SequenceNum() int64 { return m.SeqNumber }
func (m *manifestFileV2) MinSequenceNum() int64 { return m.MinSeqNumber }
func (m *manifestFileV2) KeyMetadata() []byte { return m.Key }
func (m *manifestFileV2) Partitions() []FieldSummary {
if m.PartitionList == nil {
return nil
}
return *m.PartitionList
}
func (m *manifestFileV2) HasAddedFiles() bool {
return m.AddedFilesCount > 0
}
func (m *manifestFileV2) HasExistingFiles() bool {
return m.ExistingFilesCount > 0
}
func (m *manifestFileV2) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
return fetchManifestEntries(m, fs, discardDeleted)
}
func getFieldIDMap(sc avro.Schema) map[string]int {
getField := func(rs *avro.RecordSchema, name string) *avro.Field {
for _, f := range rs.Fields() {
if f.Name() == name {
return f
}
}
return nil
}
result := make(map[string]int)
entryField := getField(sc.(*avro.RecordSchema), "data_file")
partitionField := getField(entryField.Type().(*avro.RecordSchema), "partition")
for _, field := range partitionField.Type().(*avro.RecordSchema).Fields() {
if fid, ok := field.Prop("field-id").(float64); ok {
result[field.Name()] = int(fid)
}
}
return result
}
type hasFieldToIDMap interface {
setFieldNameToIDMap(map[string]int)
}
func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
f, err := fs.Open(m.FilePath())
if err != nil {
return nil, err
}
defer f.Close()
dec, err := ocf.NewDecoder(f)
if err != nil {
return nil, err
}
metadata := dec.Metadata()
sc, err := avro.ParseBytes(dec.Metadata()["avro.schema"])
if err != nil {
return nil, err
}
fieldNameToID := getFieldIDMap(sc)
isVer1, isFallback := true, false
if string(metadata["format-version"]) == "2" {
isVer1 = false
} else {
for _, f := range sc.(*avro.RecordSchema).Fields() {
if f.Name() == "snapshot_id" {
if f.Type().Type() == avro.Union {
isFallback = true
}
break
}
}
}
results := make([]ManifestEntry, 0)
for dec.HasNext() {
var tmp ManifestEntry
if isVer1 {
if isFallback {
tmp = &fallbackManifestEntryV1{}
} else {
tmp = &manifestEntryV1{}
}
} else {
tmp = &manifestEntryV2{}
}
if err := dec.Decode(tmp); err != nil {
return nil, err
}
if isFallback {
tmp = tmp.(*fallbackManifestEntryV1).toEntry()
}
if !discardDeleted || tmp.Status() != EntryStatusDELETED {
tmp.inheritSeqNum(m)
if fieldToIDMap, ok := tmp.DataFile().(hasFieldToIDMap); ok {
fieldToIDMap.setFieldNameToIDMap(fieldNameToID)
}
results = append(results, tmp)
}
}
return results, dec.Error()
}
// ManifestFile is the interface which covers both V1 and V2 manifest files.
type ManifestFile interface {
// Version returns the version number of this manifest file.
// It should be 1 or 2.
Version() int
// FilePath is the location URI of this manifest file.
FilePath() string
// Length is the length in bytes of the manifest file.
Length() int64
// PartitionSpecID is the ID of the partition spec used to write
// this manifest. It must be listed in the table metadata
// partition-specs.
PartitionSpecID() int32
// ManifestContent is the type of files tracked by this manifest,
// either data or delete files. All v1 manifests track data files.
ManifestContent() ManifestContent
// SnapshotID is the ID of the snapshot where this manifest file
// was added.
SnapshotID() int64
// AddedDataFiles returns the number of entries in the manifest that
// have the status of EntryStatusADDED.
AddedDataFiles() int32
// ExistingDataFiles returns the number of entries in the manifest
// which have the status of EntryStatusEXISTING.
ExistingDataFiles() int32
// DeletedDataFiles returns the number of entries in the manifest
// which have the status of EntryStatusDELETED.
DeletedDataFiles() int32
// AddedRows returns the number of rows in all files of the manifest
// that have status EntryStatusADDED.
AddedRows() int64
// ExistingRows returns the number of rows in all files of the manifest
// which have status EntryStatusEXISTING.
ExistingRows() int64
// DeletedRows returns the number of rows in all files of the manifest
// which have status EntryStatusDELETED.
DeletedRows() int64
// SequenceNum returns the sequence number when this manifest was
// added to the table. Will be 0 for v1 manifest lists.
SequenceNum() int64
// MinSequenceNum is the minimum data sequence number of all live data
// or delete files in the manifest. Will be 0 for v1 manifest lists.
MinSequenceNum() int64
// KeyMetadata returns implementation-specific key metadata for encryption
// if it exists in the manifest list.
KeyMetadata() []byte
// Partitions returns a list of field summaries for each partition
// field in the spec. Each field in the list corresponds to a field in
// the manifest file's partition spec.
Partitions() []FieldSummary
// HasAddedFiles returns true if AddedDataFiles > 0 or if it was null.
HasAddedFiles() bool
// HasExistingFiles returns true if ExistingDataFiles > 0 or if it was null.
HasExistingFiles() bool
// FetchEntries reads the manifest list file to fetch the list of
// manifest entries using the provided file system IO interface.
// If discardDeleted is true, entries for files containing deleted rows
// will be skipped.
FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error)
}
// ReadManifestList reads in an avro manifest list file and returns a slice
// of manifest files or an error if one is encountered.
func ReadManifestList(in io.Reader) ([]ManifestFile, error) {
dec, err := ocf.NewDecoder(in)
if err != nil {
return nil, err
}
sc, err := avro.ParseBytes(dec.Metadata()["avro.schema"])
if err != nil {
return nil, err
}
var fallbackAddedSnapshot bool
for _, f := range sc.(*avro.RecordSchema).Fields() {
if f.Name() == "added_snapshot_id" {
if f.Type().Type() == avro.Union {
fallbackAddedSnapshot = true
}
break
}
}
out := make([]ManifestFile, 0)
for dec.HasNext() {
var file ManifestFile
if string(dec.Metadata()["format-version"]) == "2" {
file = &manifestFileV2{}
} else {
if fallbackAddedSnapshot {
file = &fallbackManifestFileV1{}
} else {
file = &manifestFileV1{}
}
}
if err := dec.Decode(file); err != nil {
return nil, err
}
if fallbackAddedSnapshot {
file = file.(*fallbackManifestFileV1).toManifest()
}
out = append(out, file)
}
return out, dec.Error()
}
// ManifestEntryStatus defines constants for the entry status of
// existing, added or deleted.
type ManifestEntryStatus int8
const (
EntryStatusEXISTING ManifestEntryStatus = 0
EntryStatusADDED ManifestEntryStatus = 1
EntryStatusDELETED ManifestEntryStatus = 2
)
// ManifestEntryContent defines constants for the type of file contents
// in the file entries. Data, Position based deletes and equality based
// deletes.
type ManifestEntryContent int8
const (
EntryContentData ManifestEntryContent = 0
EntryContentPosDeletes ManifestEntryContent = 1
EntryContentEqDeletes ManifestEntryContent = 2
)
func (m ManifestEntryContent) String() string {
switch m {
case EntryContentData:
return "Data"
case EntryContentPosDeletes:
return "Positional_Deletes"
case EntryContentEqDeletes:
return "Equality_Deletes"
default:
return "UNKNOWN"
}
}
// FileFormat defines constants for the format of data files.
type FileFormat string
const (
AvroFile FileFormat = "AVRO"
OrcFile FileFormat = "ORC"
ParquetFile FileFormat = "PARQUET"
)
type colMap[K, V any] struct {
Key K `avro:"key"`
Value V `avro:"value"`
}
func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
if c == nil {
return nil
}
out := make(map[K]V)
for _, data := range *c {
out[data.Key] = data.Value
}
return out
}
func avroPartitionData(input map[string]any) map[string]any {
// hambra/avro/v2 will unmarshal a map[string]any such that
// each entry will actually be a map[string]any with the key being
// the avro type, not the field name.
//
// This means that partition data that looks like this:
//
// [{"field-id": 1000, "name": "ts", "type": {"type": "int", "logicalType": "date"}}]
//
// Becomes:
//
// map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
//
// so we need to simplify our map and make the partition data handling easier
out := make(map[string]any)
for k, v := range input {
switch v := v.(type) {
case map[string]any:
for typeName, val := range v {
switch typeName {
case "int.date":
out[k] = Date(val.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
case "int.time-millis":
out[k] = Time(val.(time.Duration).Microseconds())
case "long.time-micros":
out[k] = Time(val.(time.Duration).Microseconds())
case "long.timestamp-millis":
out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
case "long.timestamp-micros":
out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
case "bytes.decimal":
// not implemented yet
case "fixed.decimal":
// not implemented yet
default:
out[k] = val
}
}
default:
out[k] = v
}
}
return out
}
type dataFile struct {
Content ManifestEntryContent `avro:"content"`
Path string `avro:"file_path"`
Format FileFormat `avro:"file_format"`
PartitionData map[string]any `avro:"partition"`
RecordCount int64 `avro:"record_count"`
FileSize int64 `avro:"file_size_in_bytes"`
BlockSizeInBytes int64 `avro:"block_size_in_bytes"`
ColSizes *[]colMap[int, int64] `avro:"column_sizes"`
ValCounts *[]colMap[int, int64] `avro:"value_counts"`
NullCounts *[]colMap[int, int64] `avro:"null_value_counts"`
NaNCounts *[]colMap[int, int64] `avro:"nan_value_counts"`
DistinctCounts *[]colMap[int, int64] `avro:"distinct_counts"`
LowerBounds *[]colMap[int, []byte] `avro:"lower_bounds"`
UpperBounds *[]colMap[int, []byte] `avro:"upper_bounds"`
Key *[]byte `avro:"key_metadata"`
Splits *[]int64 `avro:"split_offsets"`
EqualityIDs *[]int `avro:"equality_ids"`
SortOrder *int `avro:"sort_order_id"`
colSizeMap map[int]int64
valCntMap map[int]int64
nullCntMap map[int]int64
nanCntMap map[int]int64
distinctCntMap map[int]int64
lowerBoundMap map[int][]byte
upperBoundMap map[int][]byte
// not used for anything yet, but important to maintain the information
// for future development and updates such as when we get to writes,
// and scan planning
fieldNameToID map[string]int
initMaps sync.Once
}
func (d *dataFile) initializeMapData() {
d.initMaps.Do(func() {
d.colSizeMap = avroColMapToMap(d.ColSizes)
d.valCntMap = avroColMapToMap(d.ValCounts)
d.nullCntMap = avroColMapToMap(d.NullCounts)
d.nanCntMap = avroColMapToMap(d.NaNCounts)
d.distinctCntMap = avroColMapToMap(d.DistinctCounts)
d.lowerBoundMap = avroColMapToMap(d.LowerBounds)
d.upperBoundMap = avroColMapToMap(d.UpperBounds)
d.PartitionData = avroPartitionData(d.PartitionData)
})
}
func (d *dataFile) setFieldNameToIDMap(m map[string]int) { d.fieldNameToID = m }
func (d *dataFile) ContentType() ManifestEntryContent { return d.Content }
func (d *dataFile) FilePath() string { return d.Path }
func (d *dataFile) FileFormat() FileFormat { return d.Format }
func (d *dataFile) Partition() map[string]any {
d.initializeMapData()
return d.PartitionData
}
func (d *dataFile) Count() int64 { return d.RecordCount }
func (d *dataFile) FileSizeBytes() int64 { return d.FileSize }
func (d *dataFile) ColumnSizes() map[int]int64 {
d.initializeMapData()
return d.colSizeMap
}
func (d *dataFile) ValueCounts() map[int]int64 {
d.initializeMapData()
return d.valCntMap
}
func (d *dataFile) NullValueCounts() map[int]int64 {
d.initializeMapData()
return d.nullCntMap
}
func (d *dataFile) NaNValueCounts() map[int]int64 {
d.initializeMapData()
return d.nanCntMap
}
func (d *dataFile) DistinctValueCounts() map[int]int64 {
d.initializeMapData()
return d.distinctCntMap
}
func (d *dataFile) LowerBoundValues() map[int][]byte {
d.initializeMapData()
return d.lowerBoundMap
}
func (d *dataFile) UpperBoundValues() map[int][]byte {
d.initializeMapData()
return d.upperBoundMap
}
func (d *dataFile) KeyMetadata() []byte {
if d.Key == nil {
return nil
}
return *d.Key
}
func (d *dataFile) SplitOffsets() []int64 {
if d.Splits == nil {
return nil
}
return *d.Splits
}
func (d *dataFile) EqualityFieldIDs() []int {
if d.EqualityIDs == nil {
return nil
}
return d.EqualityFieldIDs()
}
func (d *dataFile) SortOrderID() *int { return d.SortOrder }
type manifestEntryV1 struct {
EntryStatus ManifestEntryStatus `avro:"status"`
Snapshot int64 `avro:"snapshot_id"`
SeqNum *int64
FileSeqNum *int64
Data dataFile `avro:"data_file"`
}
type fallbackManifestEntryV1 struct {
manifestEntryV1
Snapshot *int64 `avro:"snapshot_id"`
}
func (f *fallbackManifestEntryV1) toEntry() *manifestEntryV1 {
f.manifestEntryV1.Snapshot = *f.Snapshot
return &f.manifestEntryV1
}
func (m *manifestEntryV1) inheritSeqNum(manifest ManifestFile) {}
func (m *manifestEntryV1) Status() ManifestEntryStatus { return m.EntryStatus }
func (m *manifestEntryV1) SnapshotID() int64 { return m.Snapshot }
func (m *manifestEntryV1) SequenceNum() int64 {
if m.SeqNum == nil {
return 0
}
return *m.SeqNum
}
func (m *manifestEntryV1) FileSequenceNum() *int64 {
return m.FileSeqNum
}
func (m *manifestEntryV1) DataFile() DataFile { return &m.Data }
type manifestEntryV2 struct {
EntryStatus ManifestEntryStatus `avro:"status"`
Snapshot *int64 `avro:"snapshot_id"`
SeqNum *int64 `avro:"sequence_number"`
FileSeqNum *int64 `avro:"file_sequence_number"`
Data dataFile `avro:"data_file"`
}
func (m *manifestEntryV2) inheritSeqNum(manifest ManifestFile) {
if m.Snapshot == nil {
snap := manifest.SnapshotID()
m.Snapshot = &snap
}
manifestSequenceNum := manifest.SequenceNum()
if m.SeqNum == nil && (manifestSequenceNum == 0 || m.EntryStatus == EntryStatusADDED) {
m.SeqNum = &manifestSequenceNum
}
if m.FileSeqNum == nil && (manifestSequenceNum == 0 || m.EntryStatus == EntryStatusADDED) {
m.FileSeqNum = &manifestSequenceNum
}
}
func (m *manifestEntryV2) Status() ManifestEntryStatus { return m.EntryStatus }
func (m *manifestEntryV2) SnapshotID() int64 {
if m.Snapshot == nil {
return 0
}
return *m.Snapshot
}
func (m *manifestEntryV2) SequenceNum() int64 {
if m.SeqNum == nil {
return 0
}
return *m.SeqNum
}
func (m *manifestEntryV2) FileSequenceNum() *int64 {
return m.FileSeqNum
}
func (m *manifestEntryV2) DataFile() DataFile { return &m.Data }
// DataFile is the interface for reading the information about a
// given data file indicated by an entry in a manifest list.
type DataFile interface {
// ContentType is the type of the content stored by the data file,
// either Data, Equality deletes, or Position deletes. All v1 files
// are Data files.
ContentType() ManifestEntryContent
// FilePath is the full URI for the file, complete with FS scheme.
FilePath() string
// FileFormat is the format of the data file, AVRO, Orc, or Parquet.
FileFormat() FileFormat
// Partition returns a mapping of field name to partition value for
// each of the partition spec's fields.
Partition() map[string]any
// Count returns the number of records in this file.
Count() int64
// FileSizeBytes is the total file size in bytes.
FileSizeBytes() int64
// ColumnSizes is a mapping from column id to the total size on disk
// of all regions that store the column. Does not include bytes
// necessary to read other columns, like footers. Map will be nil for
// row-oriented formats (avro).
ColumnSizes() map[int]int64
// ValueCounts is a mapping from column id to the number of values
// in the column, including null and NaN values.
ValueCounts() map[int]int64
// NullValueCounts is a mapping from column id to the number of
// null values in the column.
NullValueCounts() map[int]int64
// NaNValueCounts is a mapping from column id to the number of NaN
// values in the column.
NaNValueCounts() map[int]int64
// DistictValueCounts is a mapping from column id to the number of
// distinct values in the column. Distinct counts must be derived
// using values in the file by counting or using sketches, but not
// using methods like merging existing distinct counts.
DistinctValueCounts() map[int]int64
// LowerBoundValues is a mapping from column id to the lower bounded
// value of the column, serialized as binary. Each value in the column
// must be less than or requal to all non-null, non-NaN values in the
// column for the file.
LowerBoundValues() map[int][]byte
// UpperBoundValues is a mapping from column id to the upper bounded
// value of the column, serialized as binary. Each value in the column
// must be greater than or equal to all non-null, non-NaN values in
// the column for the file.
UpperBoundValues() map[int][]byte
// KeyMetadata is implementation-specific key metadata for encryption.
KeyMetadata() []byte
// SplitOffsets are the split offsets for the data file. For example,
// all row group offsets in a Parquet file. Must be sorted ascending.
SplitOffsets() []int64
// EqualityFieldIDs are used to determine row equality in equality
// delete files. It is required when the content type is
// EntryContentEqDeletes.
EqualityFieldIDs() []int
// SortOrderID returns the id representing the sort order for this
// file, or nil if there is no sort order.
SortOrderID() *int
}
// ManifestEntry is an interface for both v1 and v2 manifest entries.
type ManifestEntry interface {
// Status returns the type of the file tracked by this entry.
// Deletes are informational only and not used in scans.
Status() ManifestEntryStatus
// SnapshotID is the id where the file was added, or deleted,
// if null it is inherited from the manifest list.
SnapshotID() int64
// SequenceNum returns the data sequence number of the file.
// If it was null and the status is EntryStatusADDED then it
// is inherited from the manifest list.
SequenceNum() int64
// FileSequenceNum returns the file sequence number indicating
// when the file was added. If it was null and the status is
// EntryStatusADDED then it is inherited from the manifest list.
FileSequenceNum() *int64
// DataFile provides the information about the data file indicated
// by this manifest entry.
DataFile() DataFile
inheritSeqNum(manifest ManifestFile)
}
var PositionalDeleteSchema = NewSchema(0,
NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name: "file_path", Required: true},
NestedField{ID: 2147483545, Type: PrimitiveTypes.Int32, Name: "pos", Required: true},
)