// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package iceberg

import (
"bytes"
"io"
"testing"
"time"
"github.com/apache/iceberg-go/internal"
"github.com/hamba/avro/v2"
"github.com/hamba/avro/v2/ocf"
"github.com/stretchr/testify/suite"
)
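
// The fixtures below model manifests for a partitioned NYC taxi table. The V1
// and V2 variants share the same statistics so that version-specific
// round-trip behavior can be compared directly.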
var (
falseBool = false
snapshotID int64 = 9182715666859759686
addedRows int64 = 237993
manifestFileRecordsV1 = []ManifestFile{
NewManifestFile(1, "/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro",
7989, 0, snapshotID).
AddedFiles(3).
ExistingFiles(0).
DeletedFiles(0).
AddedRows(addedRows).
ExistingRows(0).
DeletedRows(0).
Partitions([]FieldSummary{{
ContainsNull: true, ContainsNaN: &falseBool,
LowerBound: &[]byte{0x01, 0x00, 0x00, 0x00},
UpperBound: &[]byte{0x02, 0x00, 0x00, 0x00},
}}).Build(),
}
manifestFileRecordsV2 = []ManifestFile{
NewManifestFile(2, "/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro",
7989, 0, snapshotID).
Content(ManifestContentDeletes).
SequenceNum(3, 3).
AddedFiles(3).
ExistingFiles(0).
DeletedFiles(0).
AddedRows(addedRows).
ExistingRows(0).
DeletedRows(0).
Partitions([]FieldSummary{{
ContainsNull: true,
ContainsNaN: &falseBool,
LowerBound: &[]byte{0x01, 0x00, 0x00, 0x00},
UpperBound: &[]byte{0x02, 0x00, 0x00, 0x00},
}}).Build(),
}
entrySnapshotID int64 = 8744736658442914487
intZero = 0
manifestEntryV1Records = []*manifestEntry{
{
EntryStatus: EntryStatusADDED,
Snapshot: &entrySnapshotID,
Data: &dataFile{
				// Content is deliberately given a bad value here: the field
				// doesn't exist in V1, so it should be neither written nor
				// read back, and the roundtrip test asserts that we get the
				// default value back out.
Content: EntryContentEqDeletes,
Path: "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
Format: ParquetFile,
PartitionData: map[string]any{"VendorID": int(1), "tpep_pickup_datetime": time.Unix(1925, 0).UnixMicro()},
RecordCount: 19513,
FileSize: 388872,
BlockSizeInBytes: 67108864,
ColSizes: &[]colMap[int, int64]{
{Key: 1, Value: 53},
{Key: 2, Value: 98153},
{Key: 3, Value: 98693},
{Key: 4, Value: 53},
{Key: 5, Value: 53},
{Key: 6, Value: 53},
{Key: 7, Value: 17425},
{Key: 8, Value: 18528},
{Key: 9, Value: 53},
{Key: 10, Value: 44788},
{Key: 11, Value: 35571},
{Key: 12, Value: 53},
{Key: 13, Value: 1243},
{Key: 14, Value: 2355},
{Key: 15, Value: 12750},
{Key: 16, Value: 4029},
{Key: 17, Value: 110},
{Key: 18, Value: 47194},
{Key: 19, Value: 2948},
},
ValCounts: &[]colMap[int, int64]{
{Key: 1, Value: 19513},
{Key: 2, Value: 19513},
{Key: 3, Value: 19513},
{Key: 4, Value: 19513},
{Key: 5, Value: 19513},
{Key: 6, Value: 19513},
{Key: 7, Value: 19513},
{Key: 8, Value: 19513},
{Key: 9, Value: 19513},
{Key: 10, Value: 19513},
{Key: 11, Value: 19513},
{Key: 12, Value: 19513},
{Key: 13, Value: 19513},
{Key: 14, Value: 19513},
{Key: 15, Value: 19513},
{Key: 16, Value: 19513},
{Key: 17, Value: 19513},
{Key: 18, Value: 19513},
{Key: 19, Value: 19513},
},
NullCounts: &[]colMap[int, int64]{
{Key: 1, Value: 19513},
{Key: 2, Value: 0},
{Key: 3, Value: 0},
{Key: 4, Value: 19513},
{Key: 5, Value: 19513},
{Key: 6, Value: 19513},
{Key: 7, Value: 0},
{Key: 8, Value: 0},
{Key: 9, Value: 19513},
{Key: 10, Value: 0},
{Key: 11, Value: 0},
{Key: 12, Value: 19513},
{Key: 13, Value: 0},
{Key: 14, Value: 0},
{Key: 15, Value: 0},
{Key: 16, Value: 0},
{Key: 17, Value: 0},
{Key: 18, Value: 0},
{Key: 19, Value: 0},
},
NaNCounts: &[]colMap[int, int64]{
{Key: 16, Value: 0},
{Key: 17, Value: 0},
{Key: 18, Value: 0},
{Key: 19, Value: 0},
{Key: 10, Value: 0},
{Key: 11, Value: 0},
{Key: 12, Value: 0},
{Key: 13, Value: 0},
{Key: 14, Value: 0},
{Key: 15, Value: 0},
},
LowerBounds: &[]colMap[int, []byte]{
{Key: 2, Value: []byte("2020-04-01 00:00")},
{Key: 3, Value: []byte("2020-04-01 00:12")},
{Key: 7, Value: []byte{0x03, 0x00, 0x00, 0x00}},
{Key: 8, Value: []byte{0x01, 0x00, 0x00, 0x00}},
{Key: 10, Value: []byte{0xf6, 0x28, 0x5c, 0x8f, 0xc2, 0x05, 'S', 0xc0}},
{Key: 11, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 14, Value: []byte{0, 0, 0, 0, 0, 0, 0xe0, 0xbf}},
{Key: 15, Value: []byte{')', '\\', 0x8f, 0xc2, 0xf5, '(', 0x08, 0xc0}},
{Key: 16, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 17, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 18, Value: []byte{0xf6, '(', '\\', 0x8f, 0xc2, 0xc5, 'S', 0xc0}},
{Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0x04, 0xc0}},
},
UpperBounds: &[]colMap[int, []byte]{
{Key: 2, Value: []byte("2020-04-30 23:5:")},
{Key: 3, Value: []byte("2020-05-01 00:41")},
{Key: 7, Value: []byte{'\t', 0x01, 0x00, 0x00}},
{Key: 8, Value: []byte{'\t', 0x01, 0x00, 0x00}},
{Key: 10, Value: []byte{0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', '_', '@'}},
{Key: 11, Value: []byte{0x1f, 0x85, 0xeb, 'Q', '\\', 0xe2, 0xfe, '@'}},
{Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0x12, '@'}},
{Key: 14, Value: []byte{0, 0, 0, 0, 0, 0, 0xe0, '?'}},
{Key: 15, Value: []byte{'q', '=', '\n', 0xd7, 0xa3, 0xf0, '1', '@'}},
{Key: 16, Value: []byte{0, 0, 0, 0, 0, '`', 'B', '@'}},
{Key: 17, Value: []byte{'3', '3', '3', '3', '3', '3', 0xd3, '?'}},
{Key: 18, Value: []byte{0, 0, 0, 0, 0, 0x18, 'b', '@'}},
{Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0x04, '@'}},
},
Splits: &[]int64{4},
SortOrder: &intZero,
},
},
{
EntryStatus: EntryStatusADDED,
Snapshot: &entrySnapshotID,
Data: &dataFile{
Path: "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet",
Format: ParquetFile,
PartitionData: map[string]any{"VendorID": int(1), "tpep_pickup_datetime": time.Unix(1925, 0).UnixMicro()},
RecordCount: 95050,
FileSize: 1265950,
BlockSizeInBytes: 67108864,
ColSizes: &[]colMap[int, int64]{
{Key: 1, Value: 318},
{Key: 2, Value: 329806},
{Key: 3, Value: 331632},
{Key: 4, Value: 15343},
{Key: 5, Value: 2351},
{Key: 6, Value: 3389},
{Key: 7, Value: 71269},
{Key: 8, Value: 76429},
{Key: 9, Value: 16383},
{Key: 10, Value: 86992},
{Key: 11, Value: 89608},
{Key: 12, Value: 265},
{Key: 13, Value: 19377},
{Key: 14, Value: 1692},
{Key: 15, Value: 76162},
{Key: 16, Value: 4354},
{Key: 17, Value: 759},
{Key: 18, Value: 120650},
{Key: 19, Value: 11804},
},
ValCounts: &[]colMap[int, int64]{
{Key: 1, Value: 95050},
{Key: 2, Value: 95050},
{Key: 3, Value: 95050},
{Key: 4, Value: 95050},
{Key: 5, Value: 95050},
{Key: 6, Value: 95050},
{Key: 7, Value: 95050},
{Key: 8, Value: 95050},
{Key: 9, Value: 95050},
{Key: 10, Value: 95050},
{Key: 11, Value: 95050},
{Key: 12, Value: 95050},
{Key: 13, Value: 95050},
{Key: 14, Value: 95050},
{Key: 15, Value: 95050},
{Key: 16, Value: 95050},
{Key: 17, Value: 95050},
{Key: 18, Value: 95050},
{Key: 19, Value: 95050},
},
NullCounts: &[]colMap[int, int64]{
{Key: 1, Value: 0},
{Key: 2, Value: 0},
{Key: 3, Value: 0},
{Key: 4, Value: 0},
{Key: 5, Value: 0},
{Key: 6, Value: 0},
{Key: 7, Value: 0},
{Key: 8, Value: 0},
{Key: 9, Value: 0},
{Key: 10, Value: 0},
{Key: 11, Value: 0},
{Key: 12, Value: 95050},
{Key: 13, Value: 0},
{Key: 14, Value: 0},
{Key: 15, Value: 0},
{Key: 16, Value: 0},
{Key: 17, Value: 0},
{Key: 18, Value: 0},
{Key: 19, Value: 0},
},
NaNCounts: &[]colMap[int, int64]{
{Key: 16, Value: 0},
{Key: 17, Value: 0},
{Key: 18, Value: 0},
{Key: 19, Value: 0},
{Key: 10, Value: 0},
{Key: 11, Value: 0},
{Key: 12, Value: 0},
{Key: 13, Value: 0},
{Key: 14, Value: 0},
{Key: 15, Value: 0},
},
LowerBounds: &[]colMap[int, []byte]{
{Key: 1, Value: []byte{0x01, 0x00, 0x00, 0x00}},
{Key: 2, Value: []byte("2020-04-01 00:00")},
{Key: 3, Value: []byte("2020-04-01 00:13")},
{Key: 4, Value: []byte{0x00, 0x00, 0x00, 0x00}},
{Key: 5, Value: []byte{0x01, 0x00, 0x00, 0x00}},
{Key: 6, Value: []byte("N")},
{Key: 7, Value: []byte{0x01, 0x00, 0x00, 0x00}},
{Key: 8, Value: []byte{0x01, 0x00, 0x00, 0x00}},
{Key: 9, Value: []byte{0x01, 0x00, 0x00, 0x00}},
{Key: 10, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 11, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 14, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 15, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 16, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 17, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 18, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
{Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
},
UpperBounds: &[]colMap[int, []byte]{
{Key: 1, Value: []byte{0x01, 0x00, 0x00, 0x00}},
{Key: 2, Value: []byte("2020-04-30 23:5:")},
{Key: 3, Value: []byte("2020-05-01 00:1:")},
{Key: 4, Value: []byte{0x06, 0x00, 0x00, 0x00}},
{Key: 5, Value: []byte{'c', 0x00, 0x00, 0x00}},
{Key: 6, Value: []byte("Y")},
{Key: 7, Value: []byte{'\t', 0x01, 0x00, 0x00}},
{Key: 8, Value: []byte{'\t', 0x01, 0x00, 0x00}},
{Key: 9, Value: []byte{0x04, 0x01, 0x00, 0x00}},
{Key: 10, Value: []byte{'\\', 0x8f, 0xc2, 0xf5, '(', '8', 0x8c, '@'}},
{Key: 11, Value: []byte{0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', 'f', '@'}},
{Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0x1c, '@'}},
{Key: 14, Value: []byte{0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, 0xf1, '?'}},
{Key: 15, Value: []byte{0, 0, 0, 0, 0, 0, 'Y', '@'}},
{Key: 16, Value: []byte{0, 0, 0, 0, 0, 0xb0, 'X', '@'}},
{Key: 17, Value: []byte{'3', '3', '3', '3', '3', '3', 0xd3, '?'}},
{Key: 18, Value: []byte{0xc3, 0xf5, '(', '\\', 0x8f, ':', 0x8c, '@'}},
{Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0x04, '@'}},
},
Splits: &[]int64{4},
SortOrder: &intZero,
},
},
}
dataRecord0 = manifestEntryV1Records[0].Data.(*dataFile)
dataRecord1 = manifestEntryV1Records[1].Data.(*dataFile)
manifestEntryV2Records = []*manifestEntry{
{
EntryStatus: EntryStatusADDED,
Snapshot: &entrySnapshotID,
Data: &dataFile{
Path: dataRecord0.Path,
Format: dataRecord0.Format,
PartitionData: dataRecord0.PartitionData,
RecordCount: dataRecord0.RecordCount,
FileSize: dataRecord0.FileSize,
BlockSizeInBytes: dataRecord0.BlockSizeInBytes,
ColSizes: dataRecord0.ColSizes,
ValCounts: dataRecord0.ValCounts,
NullCounts: dataRecord0.NullCounts,
NaNCounts: dataRecord0.NaNCounts,
LowerBounds: dataRecord0.LowerBounds,
UpperBounds: dataRecord0.UpperBounds,
Splits: dataRecord0.Splits,
SortOrder: dataRecord0.SortOrder,
},
},
{
EntryStatus: EntryStatusADDED,
Snapshot: &entrySnapshotID,
Data: &dataFile{
Path: dataRecord1.Path,
Format: dataRecord1.Format,
PartitionData: dataRecord1.PartitionData,
RecordCount: dataRecord1.RecordCount,
FileSize: dataRecord1.FileSize,
BlockSizeInBytes: dataRecord1.BlockSizeInBytes,
ColSizes: dataRecord1.ColSizes,
ValCounts: dataRecord1.ValCounts,
NullCounts: dataRecord1.NullCounts,
NaNCounts: dataRecord1.NaNCounts,
LowerBounds: dataRecord1.LowerBounds,
UpperBounds: dataRecord1.UpperBounds,
Splits: dataRecord1.Splits,
SortOrder: dataRecord1.SortOrder,
},
},
}
testSchema = NewSchema(0,
NestedField{ID: 1, Name: "VendorID", Type: PrimitiveTypes.Int32, Required: true},
NestedField{ID: 2, Name: "tpep_pickup_datetime", Type: PrimitiveTypes.Timestamp, Required: true},
NestedField{ID: 3, Name: "tpep_dropoff_datetime", Type: PrimitiveTypes.Timestamp, Required: true},
NestedField{ID: 4, Name: "passenger_count", Type: PrimitiveTypes.Int64, Required: false},
NestedField{ID: 5, Name: "trip_distance", Type: PrimitiveTypes.Float64, Required: true},
NestedField{ID: 6, Name: "RatecodeID", Type: PrimitiveTypes.Int64, Required: false},
NestedField{ID: 7, Name: "store_and_fwd_flag", Type: PrimitiveTypes.String, Required: false},
NestedField{ID: 8, Name: "PULocationID", Type: PrimitiveTypes.Int32, Required: false},
NestedField{ID: 9, Name: "DOLocationID", Type: PrimitiveTypes.Int32, Required: false},
NestedField{ID: 10, Name: "payment_type", Type: PrimitiveTypes.Int64, Required: true},
NestedField{ID: 11, Name: "fare_amount", Type: PrimitiveTypes.Float64, Required: true},
NestedField{ID: 12, Name: "extra", Type: PrimitiveTypes.Float64, Required: false},
NestedField{ID: 13, Name: "mta_tax", Type: PrimitiveTypes.Float64, Required: false},
NestedField{ID: 14, Name: "tip_amount", Type: PrimitiveTypes.Float64, Required: false},
NestedField{ID: 15, Name: "tolls_amount", Type: PrimitiveTypes.Float64, Required: false},
NestedField{ID: 16, Name: "improvement_surcharge", Type: PrimitiveTypes.Float64, Required: false},
NestedField{ID: 17, Name: "total_amount", Type: PrimitiveTypes.Float64, Required: true},
NestedField{ID: 18, Name: "congestion_surcharge", Type: PrimitiveTypes.Float64, Required: false},
NestedField{ID: 19, Name: "VendorID", Type: PrimitiveTypes.Int32, Required: false},
)
)
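
// ManifestTestSuite round-trips the fixture records through their Avro
// encodings: SetupSuite serializes them into the in-memory buffers below, and
// each test reads them back and asserts that the fields survive the trip.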
type ManifestTestSuite struct {
suite.Suite
v1ManifestList bytes.Buffer
v1ManifestEntries bytes.Buffer
v2ManifestList bytes.Buffer
v2ManifestEntries bytes.Buffer
}
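
// writeManifestList serializes the V1 and V2 manifest-list fixtures into the
// suite's buffers. For V2, the -1 stands in for a sequence number that has
// not been assigned yet.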
func (m *ManifestTestSuite) writeManifestList() {
m.Require().NoError(WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, manifestFileRecordsV1))
unassignedSequenceNum := int64(-1)
m.Require().NoError(WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, manifestFileRecordsV2))
}
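
// writeManifestEntries writes the V1 and V2 manifest-entry fixtures with a
// two-field identity partition spec and checks that the returned ManifestFile
// reports exactly the number of bytes written.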
func (m *ManifestTestSuite) writeManifestEntries() {
manifestEntryV1Recs := make([]ManifestEntry, len(manifestEntryV1Records))
for i, rec := range manifestEntryV1Records {
manifestEntryV1Recs[i] = rec
}
manifestEntryV2Recs := make([]ManifestEntry, len(manifestEntryV2Records))
for i, rec := range manifestEntryV2Records {
manifestEntryV2Recs[i] = rec
}
partitionSpec := NewPartitionSpecID(1,
PartitionField{FieldID: 1000, SourceID: 1, Name: "VendorID", Transform: IdentityTransform{}},
PartitionField{FieldID: 1001, SourceID: 2, Name: "tpep_pickup_datetime", Transform: IdentityTransform{}})
mf, err := WriteManifest("/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro",
&m.v1ManifestEntries, 1, partitionSpec, testSchema, entrySnapshotID, manifestEntryV1Recs)
m.Require().NoError(err)
m.EqualValues(m.v1ManifestEntries.Len(), mf.Length())
mf, err = WriteManifest("/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro",
&m.v2ManifestEntries, 2, partitionSpec, testSchema, entrySnapshotID, manifestEntryV2Recs)
m.Require().NoError(err)
m.EqualValues(m.v2ManifestEntries.Len(), mf.Length())
}
func (m *ManifestTestSuite) SetupSuite() {
m.writeManifestList()
m.writeManifestEntries()
}
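
// TestManifestEntriesV1 reads the V1 manifest back through a mocked file
// system and verifies that the entry metadata and per-column statistics
// round-trip intact.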
func (m *ManifestTestSuite) TestManifestEntriesV1() {
var mockfs internal.MockFS
manifest := manifestFile{
version: 1,
Path: manifestFileRecordsV1[0].FilePath(),
}
mockfs.Test(m.T())
mockfs.On("Open", manifest.FilePath()).Return(&internal.MockFile{
Contents: bytes.NewReader(m.v1ManifestEntries.Bytes()),
}, nil)
defer mockfs.AssertExpectations(m.T())
entries, err := manifest.FetchEntries(&mockfs, false)
m.Require().NoError(err)
m.Len(entries, 2)
m.Zero(manifest.PartitionSpecID())
m.Zero(manifest.SnapshotID())
m.Zero(manifest.AddedDataFiles())
m.Zero(manifest.ExistingDataFiles())
m.Zero(manifest.DeletedDataFiles())
m.Zero(manifest.ExistingRows())
m.Zero(manifest.DeletedRows())
m.Zero(manifest.AddedRows())
entry1 := entries[0]
m.Equal(EntryStatusADDED, entry1.Status())
m.EqualValues(8744736658442914487, entry1.SnapshotID())
m.Zero(entry1.SequenceNum())
m.NotNil(entry1.FileSequenceNum())
m.Zero(*entry1.FileSequenceNum())
datafile := entry1.DataFile()
m.Equal(EntryContentData, datafile.ContentType())
m.Equal("/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet", datafile.FilePath())
m.Equal(ParquetFile, datafile.FileFormat())
m.EqualValues(19513, datafile.Count())
m.EqualValues(388872, datafile.FileSizeBytes())
m.Equal(map[int]int64{
1: 53,
2: 98153,
3: 98693,
4: 53,
5: 53,
6: 53,
7: 17425,
8: 18528,
9: 53,
10: 44788,
11: 35571,
12: 53,
13: 1243,
14: 2355,
15: 12750,
16: 4029,
17: 110,
18: 47194,
19: 2948,
}, datafile.ColumnSizes())
m.Equal(map[int]int64{
1: 19513,
2: 19513,
3: 19513,
4: 19513,
5: 19513,
6: 19513,
7: 19513,
8: 19513,
9: 19513,
10: 19513,
11: 19513,
12: 19513,
13: 19513,
14: 19513,
15: 19513,
16: 19513,
17: 19513,
18: 19513,
19: 19513,
}, datafile.ValueCounts())
m.Equal(map[int]int64{
1: 19513,
2: 0,
3: 0,
4: 19513,
5: 19513,
6: 19513,
7: 0,
8: 0,
9: 19513,
10: 0,
11: 0,
12: 19513,
13: 0,
14: 0,
15: 0,
16: 0,
17: 0,
18: 0,
19: 0,
}, datafile.NullValueCounts())
m.Equal(map[int]int64{
16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0,
}, datafile.NaNValueCounts())
m.Equal(map[int][]byte{
2: []byte("2020-04-01 00:00"),
3: []byte("2020-04-01 00:12"),
7: {0x03, 0x00, 0x00, 0x00},
8: {0x01, 0x00, 0x00, 0x00},
10: {0xf6, '(', '\\', 0x8f, 0xc2, 0x05, 'S', 0xc0},
11: {0, 0, 0, 0, 0, 0, 0, 0},
13: {0, 0, 0, 0, 0, 0, 0, 0},
14: {0, 0, 0, 0, 0, 0, 0xe0, 0xbf},
15: {')', '\\', 0x8f, 0xc2, 0xf5, '(', 0x08, 0xc0},
16: {0, 0, 0, 0, 0, 0, 0, 0},
17: {0, 0, 0, 0, 0, 0, 0, 0},
18: {0xf6, '(', '\\', 0x8f, 0xc2, 0xc5, 'S', 0xc0},
19: {0, 0, 0, 0, 0, 0, 0x04, 0xc0},
}, datafile.LowerBoundValues())
m.Equal(map[int][]byte{
2: []byte("2020-04-30 23:5:"),
3: []byte("2020-05-01 00:41"),
7: {'\t', 0x01, 0, 0},
8: {'\t', 0x01, 0, 0},
10: {0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', '_', '@'},
11: {0x1f, 0x85, 0xeb, 'Q', '\\', 0xe2, 0xfe, '@'},
13: {0, 0, 0, 0, 0, 0, 0x12, '@'},
14: {0, 0, 0, 0, 0, 0, 0xe0, '?'},
15: {'q', '=', '\n', 0xd7, 0xa3, 0xf0, '1', '@'},
16: {0, 0, 0, 0, 0, '`', 'B', '@'},
17: {'3', '3', '3', '3', '3', '3', 0xd3, '?'},
18: {0, 0, 0, 0, 0, 0x18, 'b', '@'},
19: {0, 0, 0, 0, 0, 0, 0x04, '@'},
}, datafile.UpperBoundValues())
m.Nil(datafile.KeyMetadata())
m.Equal([]int64{4}, datafile.SplitOffsets())
m.Nil(datafile.EqualityFieldIDs())
m.Zero(*datafile.SortOrderID())
}
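
// TestReadManifestListV1 verifies that a V1 manifest list deserializes with
// the expected counts and partition summaries, and that V2-only fields such
// as the sequence numbers come back zeroed.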
func (m *ManifestTestSuite) TestReadManifestListV1() {
list, err := ReadManifestList(&m.v1ManifestList)
m.Require().NoError(err)
m.Len(list, 1)
m.Equal(1, list[0].Version())
m.EqualValues(7989, list[0].Length())
m.Equal(ManifestContentData, list[0].ManifestContent())
m.Zero(list[0].SequenceNum())
m.Zero(list[0].MinSequenceNum())
m.EqualValues(9182715666859759686, list[0].SnapshotID())
m.EqualValues(3, list[0].AddedDataFiles())
m.True(list[0].HasAddedFiles())
m.Zero(list[0].ExistingDataFiles())
m.False(list[0].HasExistingFiles())
m.Zero(list[0].DeletedDataFiles())
m.Equal(addedRows, list[0].AddedRows())
m.Zero(list[0].ExistingRows())
m.Zero(list[0].DeletedRows())
m.Nil(list[0].KeyMetadata())
m.Zero(list[0].PartitionSpecID())
m.Equal(snapshotID, list[0].SnapshotID())
part := list[0].Partitions()[0]
m.True(part.ContainsNull)
m.False(*part.ContainsNaN)
m.Equal([]byte{0x01, 0x00, 0x00, 0x00}, *part.LowerBound)
m.Equal([]byte{0x02, 0x00, 0x00, 0x00}, *part.UpperBound)
}
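
// TestReadManifestListV2 is the V2 counterpart: unlike V1, the content type
// and sequence numbers written by the fixture must be read back verbatim.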
func (m *ManifestTestSuite) TestReadManifestListV2() {
list, err := ReadManifestList(&m.v2ManifestList)
m.Require().NoError(err)
m.Equal("/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro", list[0].FilePath())
m.Len(list, 1)
m.Equal(2, list[0].Version())
m.EqualValues(7989, list[0].Length())
m.Equal(ManifestContentDeletes, list[0].ManifestContent())
m.EqualValues(3, list[0].SequenceNum())
m.EqualValues(3, list[0].MinSequenceNum())
m.EqualValues(9182715666859759686, list[0].SnapshotID())
m.EqualValues(3, list[0].AddedDataFiles())
m.True(list[0].HasAddedFiles())
m.Zero(list[0].ExistingDataFiles())
m.False(list[0].HasExistingFiles())
m.Zero(list[0].DeletedDataFiles())
m.Equal(addedRows, list[0].AddedRows())
m.Zero(list[0].ExistingRows())
m.Zero(list[0].DeletedRows())
m.Nil(list[0].KeyMetadata())
m.Zero(list[0].PartitionSpecID())
part := list[0].Partitions()[0]
m.True(part.ContainsNull)
m.False(*part.ContainsNaN)
m.Equal([]byte{0x01, 0x00, 0x00, 0x00}, *part.LowerBound)
m.Equal([]byte{0x02, 0x00, 0x00, 0x00}, *part.UpperBound)
}
func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() {
	// This test guards against a regression that could be caused by sharing a
	// schema cache across multiple read/write operations on an Avro file. While
	// that may sound like a reasonable idea (caches speed things up, right?),
	// it isn't that sort of cache: it's really a resolver that tolerates files
	// with incomplete schemas, which we don't want.
	// If a schema cache *were* in use, the write below would populate it with a
	// definition for the record type that the incomplete schema omits. So we
	// first "warm up" any such cache. (When things work correctly, this has no
	// such side effect.)
var buf bytes.Buffer
seqNum := int64(9876)
err := WriteManifestList(2, &buf, 1234, nil, &seqNum, []ManifestFile{
NewManifestFile(2, "s3://bucket/namespace/table/metadata/abcd-0123.avro", 99, 0, 1234).Build(),
})
m.NoError(err)
files, err := ReadManifestList(&buf)
m.NoError(err)
m.Len(files, 1)
// This schema is that of a v2 manifest list, except that it refers to
// a type named "field_summary" for the "partitions" field, instead of
// actually including the definition of the "field_summary" record type.
// This omission should result in an error. But if a schema cache were
// in use, this could get resolved based on a type of the same name read
// from a file that defined it.
incompleteSchema := `
{
"name": "manifest_file",
"type": "record",
"fields": [
{
"name": "manifest_path",
"type": "string",
"field-id": 500
},
{
"name": "manifest_length",
"type": "long",
"field-id": 501
},
{
"name": "partition_spec_id",
"type": "int",
"field-id": 502
},
{
"name": "content",
"type": "int",
"default": 0,
"field-id": 517
},
{
"name": "sequence_number",
"type": "long",
"default": 0,
"field-id": 515
},
{
"name": "min_sequence_number",
"type": "long",
"default": 0,
"field-id": 516
},
{
"name": "added_snapshot_id",
"type": "long",
"field-id": 503
},
{
"name": "added_files_count",
"type": "int",
"field-id": 504
},
{
"name": "existing_files_count",
"type": "int",
"field-id": 505
},
{
"name": "deleted_files_count",
"type": "int",
"field-id": 506
},
{
"name": "partitions",
"type": [
"null",
{
"type": "array",
"items": "field_summary",
"element-id": 508
}
],
"field-id": 507
},
{
"name": "added_rows_count",
"type": "long",
"field-id": 512
},
{
"name": "existing_rows_count",
"type": "long",
"field-id": 513
},
{
"name": "deleted_rows_count",
"type": "long",
"field-id": 514
},
{
"name": "key_metadata",
"type": [
"null",
"bytes"
],
"field-id": 519
}
]
}`
// We'll generate a file that is missing part of its schema
cache := &avro.SchemaCache{}
sch, err := internal.NewManifestFileSchema(2)
m.NoError(err)
enc, err := ocf.NewEncoderWithSchema(sch, &buf,
ocf.WithEncoderSchemaCache(cache),
ocf.WithSchemaMarshaler(func(schema avro.Schema) ([]byte, error) {
return []byte(incompleteSchema), nil
}),
ocf.WithMetadata(map[string][]byte{
"format-version": {'2'},
"snapshot-id": []byte("1234"),
"sequence-number": []byte("9876"),
"parent-snapshot-id": []byte("null"),
}),
)
m.NoError(err)
for _, file := range files {
m.NoError(enc.Encode(file))
}
// This should fail because the file's schema is incomplete.
_, err = ReadManifestList(&buf)
m.ErrorContains(err, "unknown type: field_summary")
}
func (m *ManifestTestSuite) TestReadManifestIncompleteSchema() {
	// This test guards against a regression that could be caused by sharing a
	// schema cache across multiple read/write operations on an Avro file. While
	// that may sound like a reasonable idea (caches speed things up, right?),
	// it isn't that sort of cache: it's really a resolver that tolerates files
	// with incomplete schemas, which we don't want.
	// If a schema cache *were* in use, the write below would populate it with a
	// definition for the record type that the incomplete schema omits. So we
	// first "warm up" any such cache. (When things work correctly, this has no
	// such side effect.)
var buf bytes.Buffer
partitionSpec := NewPartitionSpecID(1)
snapshotID := int64(12345678)
seqNum := int64(9876)
dataFileBuilder, err := NewDataFileBuilder(
partitionSpec,
EntryContentData,
"s3://bucket/namespace/table/data/abcd-0123.parquet",
ParquetFile,
map[int]any{},
100,
100*1000*1000,
)
m.NoError(err)
file, err := WriteManifest(
"s3://bucket/namespace/table/metadata/abcd-0123.avro", &buf, 2,
partitionSpec,
NewSchema(123,
NestedField{ID: 1, Name: "id", Type: Int64Type{}},
NestedField{ID: 2, Name: "name", Type: StringType{}},
),
snapshotID,
[]ManifestEntry{NewManifestEntry(
EntryStatusADDED,
&snapshotID,
&seqNum, &seqNum,
dataFileBuilder.Build(),
)},
)
m.NoError(err)
entries, err := ReadManifest(file, &buf, false)
m.NoError(err)
m.Len(entries, 1)
// This schema is that of a v2 manifest file, except that it refers to
// a type named "r2" for the "data_file" field, instead of actually
// including the definition of the "data_file" record type.
// This omission should result in an error. But if a schema cache were
// in use, this could get resolved based on a type of the same name read
// from a file that defined it.
incompleteSchema := `
{
"name": "manifest_entry",
"type": "record",
"fields": [
{
"name": "status",
"type": "int",
"field-id": 0
},
{
"name": "snapshot_id",
"type": [
"null",
"long"
],
"field-id": 1
},
{
"name": "sequence_number",
"type": [
"null",
"long"
],
"field-id": 3
},
{
"name": "file_sequence_number",
"type": [
"null",
"long"
],
"field-id": 4
},
{
"name": "data_file",
"type": "r2",
"field-id": 2
}
]
}`
// We'll generate a file that is missing part of its schema
cache := &avro.SchemaCache{}
partitionSchema, err := avro.NewRecordSchema("r102", "", nil) // empty struct
m.NoError(err)
sch, err := internal.NewManifestEntrySchema(partitionSchema, 2)
m.NoError(err)
enc, err := ocf.NewEncoderWithSchema(sch, &buf,
ocf.WithEncoderSchemaCache(cache),
ocf.WithSchemaMarshaler(func(schema avro.Schema) ([]byte, error) {
return []byte(incompleteSchema), nil
}),
ocf.WithMetadata(map[string][]byte{
"format-version": {'2'},
// TODO: spec says other things are required, like schema and partition-spec info,
// but this package currently only looks at this one value when reading...
}),
)
m.NoError(err)
for _, entry := range entries {
m.NoError(enc.Encode(entry))
}
// This should fail because the file's schema is incomplete.
_, err = ReadManifest(file, &buf, false)
m.ErrorContains(err, "unknown type: r2")
}
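
// TestManifestEntriesV2 exercises the streaming NewManifestReader API against
// the V2 manifest, checking the embedded schema and partition spec before
// iterating the entries.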
func (m *ManifestTestSuite) TestManifestEntriesV2() {
manifest := manifestFile{
version: 2,
SpecID: 1,
Path: manifestFileRecordsV2[0].FilePath(),
}
partitionSpec := NewPartitionSpecID(1,
PartitionField{FieldID: 1000, SourceID: 1, Name: "VendorID", Transform: IdentityTransform{}},
PartitionField{FieldID: 1001, SourceID: 2, Name: "tpep_pickup_datetime", Transform: IdentityTransform{}})
mockedFile := &internal.MockFile{
Contents: bytes.NewReader(m.v2ManifestEntries.Bytes()),
}
manifestReader, err := NewManifestReader(&manifest, mockedFile)
m.Require().NoError(err)
m.Equal(2, manifestReader.Version())
m.Equal(ManifestContentData, manifestReader.ManifestContent())
loadedSchema, err := manifestReader.Schema()
m.Require().NoError(err)
m.True(loadedSchema.Equals(testSchema))
loadedPartitionSpec, err := manifestReader.PartitionSpec()
m.Require().NoError(err)
m.True(loadedPartitionSpec.Equals(partitionSpec))
entry1, err := manifestReader.ReadEntry()
m.Require().NoError(err)
_, err = manifestReader.ReadEntry()
m.Require().NoError(err)
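	// The manifest holds exactly two entries, so a third read must hit EOF.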
_, err = manifestReader.ReadEntry()
m.Require().ErrorIs(err, io.EOF)
m.Equal(int32(1), manifest.PartitionSpecID())
m.Zero(manifest.SnapshotID())
m.Zero(manifest.AddedDataFiles())
m.Zero(manifest.ExistingDataFiles())
m.Zero(manifest.DeletedDataFiles())
m.Zero(manifest.ExistingRows())
m.Zero(manifest.DeletedRows())
m.Zero(manifest.AddedRows())
m.Equal(EntryStatusADDED, entry1.Status())
m.Equal(entrySnapshotID, entry1.SnapshotID())
m.Zero(entry1.SequenceNum())
m.Zero(*entry1.FileSequenceNum())
datafile := entry1.DataFile()
m.Equal(EntryContentData, datafile.ContentType())
m.Equal("/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet", datafile.FilePath())
m.Equal(ParquetFile, datafile.FileFormat())
m.EqualValues(19513, datafile.Count())
m.EqualValues(388872, datafile.FileSizeBytes())
m.Equal(map[int]int64{
1: 53,
2: 98153,
3: 98693,
4: 53,
5: 53,
6: 53,
7: 17425,
8: 18528,
9: 53,
10: 44788,
11: 35571,
12: 53,
13: 1243,
14: 2355,
15: 12750,
16: 4029,
17: 110,
18: 47194,
19: 2948,
}, datafile.ColumnSizes())
m.Equal(map[int]int64{
1: 19513,
2: 19513,
3: 19513,
4: 19513,
5: 19513,
6: 19513,
7: 19513,
8: 19513,
9: 19513,
10: 19513,
11: 19513,
12: 19513,
13: 19513,
14: 19513,
15: 19513,
16: 19513,
17: 19513,
18: 19513,
19: 19513,
}, datafile.ValueCounts())
m.Equal(map[int]int64{
1: 19513,
2: 0,
3: 0,
4: 19513,
5: 19513,
6: 19513,
7: 0,
8: 0,
9: 19513,
10: 0,
11: 0,
12: 19513,
13: 0,
14: 0,
15: 0,
16: 0,
17: 0,
18: 0,
19: 0,
}, datafile.NullValueCounts())
m.Equal(map[int]int64{
16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0,
}, datafile.NaNValueCounts())
m.Equal(map[int][]byte{
2: []byte("2020-04-01 00:00"),
3: []byte("2020-04-01 00:12"),
7: {0x03, 0x00, 0x00, 0x00},
8: {0x01, 0x00, 0x00, 0x00},
10: {0xf6, '(', '\\', 0x8f, 0xc2, 0x05, 'S', 0xc0},
11: {0, 0, 0, 0, 0, 0, 0, 0},
13: {0, 0, 0, 0, 0, 0, 0, 0},
14: {0, 0, 0, 0, 0, 0, 0xe0, 0xbf},
15: {')', '\\', 0x8f, 0xc2, 0xf5, '(', 0x08, 0xc0},
16: {0, 0, 0, 0, 0, 0, 0, 0},
17: {0, 0, 0, 0, 0, 0, 0, 0},
18: {0xf6, '(', '\\', 0x8f, 0xc2, 0xc5, 'S', 0xc0},
19: {0, 0, 0, 0, 0, 0, 0x04, 0xc0},
}, datafile.LowerBoundValues())
m.Equal(map[int][]byte{
2: []byte("2020-04-30 23:5:"),
3: []byte("2020-05-01 00:41"),
7: {'\t', 0x01, 0, 0},
8: {'\t', 0x01, 0, 0},
10: {0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', '_', '@'},
11: {0x1f, 0x85, 0xeb, 'Q', '\\', 0xe2, 0xfe, '@'},
13: {0, 0, 0, 0, 0, 0, 0x12, '@'},
14: {0, 0, 0, 0, 0, 0, 0xe0, '?'},
15: {'q', '=', '\n', 0xd7, 0xa3, 0xf0, '1', '@'},
16: {0, 0, 0, 0, 0, '`', 'B', '@'},
17: {'3', '3', '3', '3', '3', '3', 0xd3, '?'},
18: {0, 0, 0, 0, 0, 0x18, 'b', '@'},
19: {0, 0, 0, 0, 0, 0, 0x04, '@'},
}, datafile.UpperBoundValues())
m.Nil(datafile.KeyMetadata())
m.Equal([]int64{4}, datafile.SplitOffsets())
m.Nil(datafile.EqualityFieldIDs())
m.Zero(*datafile.SortOrderID())
}
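
// TestManifestEntryBuilder builds a data file and manifest entry purely
// through the builder APIs and asserts that every optional statistic set on
// the builder is visible on the resulting entry.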
func (m *ManifestTestSuite) TestManifestEntryBuilder() {
dataFileBuilder, err := NewDataFileBuilder(
NewPartitionSpec(),
EntryContentData,
"sample.parquet",
ParquetFile,
map[int]any{1001: int(1), 1002: time.Unix(1925, 0).UnixMicro()},
1,
2,
)
m.Require().NoError(err)
dataFileBuilder.ColumnSizes(map[int]int64{
1: 1,
2: 2,
}).ValueCounts(map[int]int64{
1: 1,
2: 2,
}).DistinctValueCounts(map[int]int64{
1: 1,
2: 2,
}).NullValueCounts(map[int]int64{
1: 0,
2: 0,
}).NaNValueCounts(map[int]int64{
1: 0,
2: 0,
}).LowerBoundValues(map[int][]byte{
1: {0x01, 0x00, 0x00, 0x00},
2: []byte("2020-04-01 00:00"),
}).UpperBoundValues(map[int][]byte{
1: {0x01, 0x00, 0x00, 0x00},
2: []byte("2020-04-30 23:5:"),
}).SplitOffsets([]int64{4}).EqualityFieldIDs([]int{1, 1}).SortOrderID(0)
snapshotEntryID := int64(1)
entry := NewManifestEntryBuilder(
EntryStatusEXISTING,
&snapshotEntryID,
dataFileBuilder.Build()).Build()
m.Assert().Equal(EntryStatusEXISTING, entry.Status())
m.Assert().EqualValues(1, entry.SnapshotID())
	// A freshly built entry has no assigned sequence number, so -1 is
	// reported to mark it as unassigned.
m.Assert().Equal(int64(-1), entry.SequenceNum())
m.Assert().Nil(entry.FileSequenceNum())
data := entry.DataFile()
m.Assert().Equal(EntryContentData, data.ContentType())
m.Assert().Equal("sample.parquet", data.FilePath())
m.Assert().Equal(ParquetFile, data.FileFormat())
m.Assert().EqualValues(1, data.Count())
m.Assert().EqualValues(2, data.FileSizeBytes())
m.Assert().Equal(map[int]int64{
1: 1,
2: 2,
}, data.ColumnSizes())
m.Assert().Equal(map[int]int64{
1: 1,
2: 2,
}, data.ValueCounts())
m.Assert().Equal(map[int]int64{
1: 0,
2: 0,
}, data.NullValueCounts())
m.Assert().Equal(map[int]int64{
1: 1,
2: 2,
}, data.DistinctValueCounts())
m.Assert().Equal(map[int]int64{
1: 0,
2: 0,
}, data.NaNValueCounts())
m.Assert().Equal(map[int][]byte{
1: {0x01, 0x00, 0x00, 0x00},
2: []byte("2020-04-01 00:00"),
}, data.LowerBoundValues())
m.Assert().Equal(map[int][]byte{
1: {0x01, 0x00, 0x00, 0x00},
2: []byte("2020-04-30 23:5:"),
}, data.UpperBoundValues())
m.Assert().Equal([]int64{4}, data.SplitOffsets())
m.Assert().Equal([]int{1, 1}, data.EqualityFieldIDs())
m.Assert().Equal(0, *data.SortOrderID())
}
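
// TestManifestWriterMeta verifies that an unpartitioned spec is recorded in
// the writer's "partition-spec" metadata entry as an empty JSON array rather
// than JSON null.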
func (m *ManifestTestSuite) TestManifestWriterMeta() {
sch := NewSchema(0, NestedField{ID: 0, Name: "test01", Type: StringType{}})
w, err := NewManifestWriter(2, io.Discard, *UnpartitionedSpec, sch, 1)
m.Require().NoError(err)
md, err := w.meta()
m.Require().NoError(err)
m.NotEqual("null", string(md["partition-spec"]))
m.Equal("[]", string(md["partition-spec"]))
}
func TestManifests(t *testing.T) {
suite.Run(t, new(ManifestTestSuite))
}