blob: 17edb342a7032e9d2f6bfffb4eba0bd31f406a25 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"bytes"
"context"
"fmt"
"math"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/iceberg-go"
iceinternal "github.com/apache/iceberg-go/internal"
iceio "github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table/internal"
"golang.org/x/sync/errgroup"
)
// equalityDeleteSet holds the set of delete keys and the column names
// used to look them up in data records. Each set corresponds to one
// group of equality field IDs — delete files with different field IDs
// produce separate sets.
type equalityDeleteSet struct {
	// keys holds the encoded delete keys; a data row whose encoded key
	// columns appear here is considered deleted.
	keys set[string]
	// fieldIDs are the Iceberg field IDs making up the equality key for
	// this group.
	fieldIDs []int
	// colNames are the column names resolved from fieldIDs (same order),
	// used to locate the key columns in data record batches.
	colNames []string
}
// readAllEqualityDeleteFiles reads all unique equality delete files from
// the tasks and builds per-task delete key sets. Returns nil if there are
// no equality deletes. Delete files with different equality field IDs are
// kept as separate sets (not merged).
func readAllEqualityDeleteFiles(ctx context.Context, fs iceio.IO, schema *iceberg.Schema, tasks []FileScanTask, concurrency int) (map[int][]*equalityDeleteSet, error) {
	type deleteFileInfo struct {
		file     iceberg.DataFile
		fieldIDs []int
	}

	// Collect each equality delete file exactly once, keyed by path, so a
	// file referenced by multiple tasks is only read from storage once.
	uniqueDeletes := make(map[string]deleteFileInfo)
	for _, t := range tasks {
		for _, d := range t.EqualityDeleteFiles {
			if d.ContentType() != iceberg.EntryContentEqDeletes {
				continue
			}
			if _, ok := uniqueDeletes[d.FilePath()]; !ok {
				uniqueDeletes[d.FilePath()] = deleteFileInfo{
					file:     d,
					fieldIDs: d.EqualityFieldIDs(),
				}
			}
		}
	}
	// uniqueDeletes is non-empty iff any task had an equality delete file,
	// so a separate "hasAny" flag is unnecessary.
	if len(uniqueDeletes) == 0 {
		return nil, nil
	}

	type deleteFileResult struct {
		path     string
		fieldIDs []int
		colNames []string
		keys     set[string]
	}

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(concurrency)
	// Buffer enough capacity for every result so workers never block on
	// send, even before the consumer loop below starts draining.
	resultCh := make(chan deleteFileResult, len(uniqueDeletes))
	go func() {
		defer close(resultCh)
		for _, info := range uniqueDeletes {
			g.Go(func() error {
				keys, colNames, err := readEqualityDeleteFile(ctx, fs, schema, info.file, info.fieldIDs)
				if err != nil {
					return err
				}
				resultCh <- deleteFileResult{
					path:     info.file.FilePath(),
					fieldIDs: info.fieldIDs,
					colNames: colNames,
					keys:     keys,
				}
				return nil
			})
		}
		// Wait here only to learn when it is safe to close resultCh; the
		// error (if any) is surfaced by the g.Wait() call below.
		_ = g.Wait()
	}()

	type perFileDeleteKeys struct {
		fieldIDs []int
		colNames []string
		keys     set[string]
	}
	perFile := make(map[string]*perFileDeleteKeys)
	for result := range resultCh {
		perFile[result.path] = &perFileDeleteKeys{
			fieldIDs: result.fieldIDs,
			colNames: result.colNames,
			keys:     result.keys,
		}
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Build per-task delete sets. Group by field IDs so delete files with
	// different equality field sets are applied independently.
	perTask := make(map[int][]*equalityDeleteSet)
	for i, t := range tasks {
		if len(t.EqualityDeleteFiles) == 0 {
			continue
		}
		// Group this task's delete files by their field-ID list; the
		// fmt.Sprint rendering of the slice serves as the map key.
		groups := make(map[string]*equalityDeleteSet)
		for _, d := range t.EqualityDeleteFiles {
			dk, ok := perFile[d.FilePath()]
			if !ok {
				// Not read above (e.g. not equality-delete content).
				continue
			}
			groupKey := fmt.Sprint(dk.fieldIDs)
			grp, exists := groups[groupKey]
			if !exists {
				grp = &equalityDeleteSet{
					keys:     make(set[string]),
					fieldIDs: dk.fieldIDs,
					colNames: dk.colNames,
				}
				groups[groupKey] = grp
			}
			for k := range dk.keys {
				grp.keys[k] = struct{}{}
			}
		}
		sets := make([]*equalityDeleteSet, 0, len(groups))
		for _, grp := range groups {
			if len(grp.keys) > 0 {
				sets = append(sets, grp)
			}
		}
		if len(sets) > 0 {
			perTask[i] = sets
		}
	}

	return perTask, nil
}
// readEqualityDeleteFile reads a single equality delete file and returns
// the set of encoded delete keys and the column names used.
//
// The results are named so the deferred CheckedClose can surface a reader
// close error through err; with unnamed results the close error was
// written to a local variable and silently dropped.
func readEqualityDeleteFile(ctx context.Context, fs iceio.IO, tableSchema *iceberg.Schema, dataFile iceberg.DataFile, fieldIDs []int) (keys set[string], colNames []string, err error) {
	src, err := internal.GetFile(ctx, fs, dataFile, true)
	if err != nil {
		return nil, nil, err
	}
	rdr, err := src.GetReader(ctx)
	if err != nil {
		return nil, nil, err
	}
	// CheckedClose overwrites err only when closing fails; because err is
	// a named result, that error now propagates to the caller.
	defer iceinternal.CheckedClose(rdr, &err)

	tbl, err := rdr.ReadTable(ctx)
	if err != nil {
		return nil, nil, err
	}
	defer tbl.Release()

	// Resolve column names from field IDs: the table schema maps field ID
	// -> name, then the delete file's own schema maps name -> index.
	colNames = make([]string, len(fieldIDs))
	colIndices := make([]int, len(fieldIDs))
	for i, fid := range fieldIDs {
		name, ok := tableSchema.FindColumnName(fid)
		if !ok {
			return nil, nil, fmt.Errorf("equality delete field ID %d not found in table schema", fid)
		}
		colNames[i] = name
		indices := tbl.Schema().FieldIndices(name)
		if len(indices) == 0 {
			return nil, nil, fmt.Errorf("equality delete column %q not found in delete file %s", name, dataFile.FilePath())
		}
		colIndices[i] = indices[0]
	}

	// Build the set of encoded delete keys by iterating aligned batches.
	keys = make(set[string])
	var keyBuf bytes.Buffer
	tr := array.NewTableReader(tbl, tbl.NumRows())
	defer tr.Release()
	for tr.Next() {
		rec := tr.RecordBatch()
		// Encoders are resolved per batch: each batch exposes its own
		// column arrays.
		encoders := make([]colEncoder, len(colIndices))
		for i, idx := range colIndices {
			encoders[i] = makeColEncoder(rec.Column(idx))
		}
		numRows := int(rec.NumRows())
		for row := 0; row < numRows; row++ {
			keyBuf.Reset()
			for _, enc := range encoders {
				enc(&keyBuf, row)
			}
			// keyBuf.String() copies, which is required: the key is
			// retained in the set beyond this iteration.
			keys[keyBuf.String()] = struct{}{}
		}
	}

	return keys, colNames, nil
}
// bufPutUint16 writes a uint16 in big-endian without allocating.
func bufPutUint16(buf *bytes.Buffer, v uint16) {
buf.Write([]byte{byte(v >> 8), byte(v)})
}
// bufPutUint32 writes a uint32 in big-endian without allocating.
func bufPutUint32(buf *bytes.Buffer, v uint32) {
buf.Write([]byte{byte(v >> 24), byte(v >> 16), byte(v >> 8), byte(v)})
}
// bufPutUint64 writes a uint64 in big-endian without allocating.
func bufPutUint64(buf *bytes.Buffer, v uint64) {
buf.Write([]byte{
byte(v >> 56), byte(v >> 48), byte(v >> 40), byte(v >> 32),
byte(v >> 24), byte(v >> 16), byte(v >> 8), byte(v),
})
}
// bufString returns the buffer contents as a string without copying.
// The returned string is only valid until the next buffer modification.
func bufString(buf *bytes.Buffer) string {
b := buf.Bytes()
return unsafe.String(unsafe.SliceData(b), len(b))
}
// encodeArrowValue writes a single Arrow value to the buffer for key
// encoding. Values are type-tagged and length-prefixed for variable-length
// types to avoid hash collisions.
//
// The byte layout is a compatibility contract: keys built from delete
// files and keys built from data rows must be produced identically or set
// lookups will silently miss. Keep this in sync with makeColEncoder.
func encodeArrowValue(buf *bytes.Buffer, arr arrow.Array, idx int) {
	if arr.IsNull(idx) {
		buf.WriteByte(0) // null tag
		return
	}
	buf.WriteByte(1) // non-null tag
	switch a := arr.(type) {
	case *array.Int8:
		buf.WriteByte(byte(a.Value(idx)))
	case *array.Int16:
		bufPutUint16(buf, uint16(a.Value(idx)))
	case *array.Int32:
		bufPutUint32(buf, uint32(a.Value(idx)))
	case *array.Int64:
		bufPutUint64(buf, uint64(a.Value(idx)))
	case *array.Float32:
		// Encode the IEEE-754 bit pattern so key bytes are exact.
		bufPutUint32(buf, math.Float32bits(a.Value(idx)))
	case *array.Float64:
		bufPutUint64(buf, math.Float64bits(a.Value(idx)))
	case *array.String:
		s := a.Value(idx)
		// Length prefix prevents ("ab","c") colliding with ("a","bc").
		bufPutUint32(buf, uint32(len(s)))
		buf.WriteString(s)
	case *array.LargeString:
		s := a.Value(idx)
		bufPutUint32(buf, uint32(len(s)))
		buf.WriteString(s)
	case *array.Binary:
		b := a.Value(idx)
		bufPutUint32(buf, uint32(len(b)))
		buf.Write(b)
	case *array.LargeBinary:
		b := a.Value(idx)
		bufPutUint32(buf, uint32(len(b)))
		buf.Write(b)
	case *array.FixedSizeBinary:
		// Fixed width per type, so no length prefix is needed.
		buf.Write(a.Value(idx))
	case *array.Boolean:
		if a.Value(idx) {
			buf.WriteByte(1)
		} else {
			buf.WriteByte(0)
		}
	case *array.Date32:
		bufPutUint32(buf, uint32(a.Value(idx)))
	case *array.Date64:
		bufPutUint64(buf, uint64(a.Value(idx)))
	case *array.Time32:
		bufPutUint32(buf, uint32(a.Value(idx)))
	case *array.Time64:
		bufPutUint64(buf, uint64(a.Value(idx)))
	case *array.Timestamp:
		bufPutUint64(buf, uint64(a.Value(idx)))
	default:
		// Fallback: length-prefixed string representation.
		s := a.ValueStr(idx)
		bufPutUint32(buf, uint32(len(s)))
		buf.WriteString(s)
	}
}
// processEqualityDeletes returns a pipeline function that filters out
// rows whose equality key columns match any entry in the delete sets.
// Each set is applied independently (they may have different field IDs).
//
// It delegates to the columnar implementation, which resolves typed
// column encoders once per batch instead of type-switching on every row.
func processEqualityDeletes(ctx context.Context, eqDeleteSets []*equalityDeleteSet) (recProcessFn, error) {
	return processEqualityDeletesColumnar(ctx, eqDeleteSets)
}
// colEncoder writes the value at row idx to buf. Resolved once per column
// to avoid per-row type switches. Encoders are produced by makeColEncoder
// and used as the per-column fast path for key encoding.
type colEncoder func(buf *bytes.Buffer, row int)
// makeColEncoder returns a colEncoder for the given Arrow array that writes
// values directly from the raw typed backing slice when possible.
//
// Every encoder emits the same layout as encodeArrowValue: a tag byte
// (0 = null, 1 = present) followed by the value bytes. The null check on
// the fixed-width paths is required for correctness: Arrow leaves the
// backing bytes of null slots undefined, so encoding a null as "present"
// would produce garbage keys that can spuriously match (or
// nondeterministically miss) real values.
func makeColEncoder(arr arrow.Array) colEncoder {
	switch a := arr.(type) {
	case *array.Int8:
		vals := a.Int8Values()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			buf.WriteByte(byte(vals[row]))
		}
	case *array.Int16:
		vals := a.Int16Values()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			bufPutUint16(buf, uint16(vals[row]))
		}
	case *array.Int32:
		vals := a.Int32Values()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			bufPutUint32(buf, uint32(vals[row]))
		}
	case *array.Int64:
		vals := a.Int64Values()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			bufPutUint64(buf, uint64(vals[row]))
		}
	case *array.Float32:
		vals := a.Float32Values()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			bufPutUint32(buf, math.Float32bits(vals[row]))
		}
	case *array.Float64:
		vals := a.Float64Values()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			bufPutUint64(buf, math.Float64bits(vals[row]))
		}
	case *array.Date32:
		vals := a.Date32Values()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			bufPutUint32(buf, uint32(vals[row]))
		}
	case *array.Date64:
		vals := a.Date64Values()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			bufPutUint64(buf, uint64(vals[row]))
		}
	case *array.Time32:
		vals := a.Time32Values()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			bufPutUint32(buf, uint32(vals[row]))
		}
	case *array.Time64:
		vals := a.Time64Values()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			bufPutUint64(buf, uint64(vals[row]))
		}
	case *array.Timestamp:
		vals := a.TimestampValues()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			bufPutUint64(buf, uint64(vals[row]))
		}
	case *array.String:
		offsets := a.ValueOffsets()
		rawBytes := a.ValueBytes()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			start, end := offsets[row], offsets[row+1]
			bufPutUint32(buf, uint32(end-start))
			buf.Write(rawBytes[start:end])
		}
	case *array.LargeString:
		offsets := a.ValueOffsets()
		rawBytes := a.ValueBytes()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			start, end := offsets[row], offsets[row+1]
			bufPutUint32(buf, uint32(end-start))
			buf.Write(rawBytes[start:end])
		}
	case *array.Binary:
		offsets := a.ValueOffsets()
		rawBytes := a.ValueBytes()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			start, end := offsets[row], offsets[row+1]
			bufPutUint32(buf, uint32(end-start))
			buf.Write(rawBytes[start:end])
		}
	case *array.LargeBinary:
		offsets := a.ValueOffsets()
		rawBytes := a.ValueBytes()
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			start, end := offsets[row], offsets[row+1]
			bufPutUint32(buf, uint32(end-start))
			buf.Write(rawBytes[start:end])
		}
	case *array.Boolean:
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			if a.Value(row) {
				buf.WriteByte(1)
			} else {
				buf.WriteByte(0)
			}
		}
	case *array.FixedSizeBinary:
		return func(buf *bytes.Buffer, row int) {
			if a.IsNull(row) {
				buf.WriteByte(0)
				return
			}
			buf.WriteByte(1)
			buf.Write(a.Value(row))
		}
	default:
		// Fallback: per-row type switch, which also handles nulls.
		return func(buf *bytes.Buffer, row int) {
			encodeArrowValue(buf, arr, row)
		}
	}
}
// processEqualityDeletesColumnar resolves typed column encoders once per
// batch, then iterates rows without per-row type switches.
//
// The returned function drops every row whose encoded key columns match
// an entry in any delete set; each set is applied with its own columns.
func processEqualityDeletesColumnar(ctx context.Context, eqDeleteSets []*equalityDeleteSet) (recProcessFn, error) {
	return func(r arrow.RecordBatch) (arrow.RecordBatch, error) {
		// NOTE(review): the incoming batch is released here, so this
		// assumes recProcessFn transfers ownership of r — confirm against
		// the other recProcessFn implementations in this package.
		defer r.Release()
		mem := compute.GetAllocator(ctx)
		numRows := int(r.NumRows())
		// Build a keep-mask with one bit per row, initially all set
		// ("keep"). Trailing bits past numRows are ignored by the Boolean
		// array below.
		maskBuf := memory.NewResizableBuffer(mem)
		defer maskBuf.Release()
		maskBuf.Resize(int(bitutil.BytesForBits(int64(numRows))))
		maskBytes := maskBuf.Bytes()
		for i := range maskBytes {
			maskBytes[i] = 0xFF
		}
		var keyBuf bytes.Buffer
		for _, eqDel := range eqDeleteSets {
			// Resolve one encoder per key column against this batch's
			// arrays.
			encoders := make([]colEncoder, len(eqDel.colNames))
			for i, name := range eqDel.colNames {
				indices := r.Schema().FieldIndices(name)
				if len(indices) == 0 {
					return nil, fmt.Errorf("equality delete column %q not found in data record", name)
				}
				encoders[i] = makeColEncoder(r.Column(indices[0]))
			}
			for row := 0; row < numRows; row++ {
				// A row already cleared by an earlier set stays deleted;
				// skip re-encoding it.
				if !bitutil.BitIsSet(maskBytes, row) {
					continue
				}
				keyBuf.Reset()
				for _, enc := range encoders {
					enc(&keyBuf, row)
				}
				// bufString avoids a per-row string allocation; safe here
				// because the key is only used for this one map lookup.
				if _, deleted := eqDel.keys[bufString(&keyBuf)]; deleted {
					bitutil.ClearBit(maskBytes, row)
				}
			}
		}
		// Wrap the mask bits as a Boolean array (nil validity buffer =
		// no nulls) and filter the batch down to the kept rows.
		mask := array.NewBooleanData(array.NewData(
			arrow.FixedWidthTypes.Boolean, numRows,
			[]*memory.Buffer{nil, maskBuf}, nil, 0, 0))
		defer mask.Release()
		filtered, err := compute.Filter(ctx,
			compute.NewDatumWithoutOwning(r),
			compute.NewDatumWithoutOwning(mask),
			*compute.DefaultFilterOptions())
		if err != nil {
			return nil, err
		}
		return filtered.(*compute.RecordDatum).Value, nil
	}, nil
}