fix(catalog): add rest integration write test (#352)
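Includes a few fixes found while writing the test, e.g. the REST catalog options now keep unrecognized catalog properties (additionalProps) instead of dropping them.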
diff --git a/catalog/rest/options.go b/catalog/rest/options.go
index 5aae122..ae1782c 100644
--- a/catalog/rest/options.go
+++ b/catalog/rest/options.go
@@ -21,6 +21,8 @@
"crypto/tls"
+	"maps"
"net/url"
+ "github.com/apache/iceberg-go"
"github.com/aws/aws-sdk-go-v2/aws"
)
@@ -100,6 +101,12 @@
}
}
+// WithAdditionalProps sets extra catalog properties, replacing any set earlier.
+// props is cloned so the caller's map is never mutated through the options.
+func WithAdditionalProps(props iceberg.Properties) Option {
+	return func(o *options) { o.additionalProps = maps.Clone(props) }
+}
+
type options struct {
awsConfig aws.Config
tlsConfig *tls.Config
@@ -113,4 +120,6 @@
prefix string
authUri *url.URL
scope string
+
+ additionalProps iceberg.Properties
}
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index cbeed0f..5eb870f 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -377,7 +377,9 @@
}
func fromProps(props iceberg.Properties) *options {
- o := &options{}
+ o := &options{
+ additionalProps: iceberg.Properties{},
+ }
for k, v := range props {
switch k {
case keyOauthToken:
@@ -411,6 +413,11 @@
} else {
o.tlsConfig.InsecureSkipVerify = verify
}
+ case "uri", "type":
+ default:
+ if v != "" {
+ o.additionalProps[k] = v
+ }
}
}
@@ -418,7 +425,12 @@
}
func toProps(o *options) iceberg.Properties {
- props := iceberg.Properties{}
+ var props iceberg.Properties
+ if o.additionalProps != nil {
+ props = o.additionalProps
+ } else {
+ props = iceberg.Properties{}
+ }
setIf := func(key, v string) {
if v != "" {
diff --git a/catalog/rest/rest_integration_test.go b/catalog/rest/rest_integration_test.go
index b17725a..b68a32f 100644
--- a/catalog/rest/rest_integration_test.go
+++ b/catalog/rest/rest_integration_test.go
@@ -21,12 +21,17 @@
import (
"context"
+ "net/url"
"testing"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/catalog/rest"
"github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table"
"github.com/stretchr/testify/suite"
)
@@ -43,6 +48,7 @@
cat, err := catalog.Load(ctx, "local", iceberg.Properties{
"type": "rest",
"uri": "http://localhost:8181",
+ io.S3Region: "us-east-1",
io.S3AccessKeyID: "admin",
io.S3SecretAccessKey: "password",
})
@@ -60,7 +66,7 @@
tableSchemaNested = iceberg.NewSchemaWithIdentifiers(1,
[]int{1},
iceberg.NestedField{
- ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: false},
+ ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: true},
iceberg.NestedField{
ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true},
iceberg.NestedField{
@@ -175,6 +181,72 @@
s.Require().NoError(s.cat.DropTable(s.ctx, catalog.ToIdentifier(TestNamespaceIdent, "test-table")))
}
+func (s *RestIntegrationSuite) TestWriteCommitTable() {
+	// Write path end to end: create a table, write a parquet file under its
+	// location, register it with AddFiles, commit, and inspect the manifests.
+	s.ensureNamespace()
+	const location = "s3://warehouse/iceberg"
+
+	tbl, err := s.cat.CreateTable(s.ctx,
+		catalog.ToIdentifier(TestNamespaceIdent, "test-table-2"),
+		tableSchemaNested, catalog.WithLocation(location))
+	s.Require().NoError(err)
+	s.Require().NotNil(tbl)
+	defer func() {
+		s.Require().NoError(s.cat.DropTable(s.ctx, catalog.ToIdentifier(TestNamespaceIdent, "test-table-2")))
+	}()
+	s.Equal(location, tbl.Location())
+
+	arrSchema, err := table.SchemaToArrowSchema(tableSchemaNested, nil, false, false)
+	s.Require().NoError(err)
+	// Named arrTbl so the local does not shadow the imported table package.
+	arrTbl, err := array.TableFromJSON(memory.DefaultAllocator, arrSchema,
+		[]string{`[
+			{
+				"foo": "foo_string",
+				"bar": 123,
+				"baz": true,
+				"qux": ["a", "b", "c"],
+				"quux": [{"key": "gopher", "value": [
+					{"key": "golang", "value": "1337"}]}],
+				"location": [{"latitude": 37.7749, "longitude": -122.4194}],
+				"person": {"name": "gopher", "age": 10}
+			}
+		]`})
+	s.Require().NoError(err)
+	defer arrTbl.Release()
+
+	pqfile, err := url.JoinPath(location, "data", "test_commit_table_data", "test.parquet")
+	s.Require().NoError(err)
+	wfs, ok := tbl.FS().(io.WriteFileIO)
+	s.Require().True(ok, "table FileIO does not implement io.WriteFileIO")
+	fw, err := wfs.Create(pqfile)
+	s.Require().NoError(err)
+	s.Require().NoError(pqarrow.WriteTable(arrTbl, fw, arrTbl.NumRows(),
+		nil, pqarrow.DefaultWriterProps()))
+	defer func() { s.NoError(tbl.FS().Remove(pqfile)) }()
+
+	txn := tbl.NewTransaction()
+	s.Require().NoError(txn.AddFiles([]string{pqfile}, nil, false))
+	updated, err := txn.Commit(s.ctx)
+	s.Require().NoError(err)
+
+	var mf []iceberg.ManifestFile
+	for m, err := range updated.AllManifests() {
+		s.Require().NoError(err)
+		s.Require().NotNil(m)
+		mf = append(mf, m)
+	}
+	// Require (not assert) the lengths: the indexing below panics otherwise.
+	s.Require().Len(mf, 1)
+	s.EqualValues(1, mf[0].AddedDataFiles())
+	entries, err := mf[0].FetchEntries(updated.FS(), false)
+	s.Require().NoError(err)
+
+	s.Require().Len(entries, 1)
+	s.Equal(pqfile, entries[0].DataFile().FilePath())
+}
+
func TestRestIntegration(t *testing.T) {
suite.Run(t, new(RestIntegrationSuite))
}