fix(catalog): add rest integration write test (#352)

Includes a few fixes for issues that were found while writing the test.
diff --git a/catalog/rest/options.go b/catalog/rest/options.go
index 5aae122..ae1782c 100644
--- a/catalog/rest/options.go
+++ b/catalog/rest/options.go
@@ -21,6 +21,7 @@
 	"crypto/tls"
 	"net/url"
 
+	"github.com/apache/iceberg-go"
 	"github.com/aws/aws-sdk-go-v2/aws"
 )
 
@@ -100,6 +101,12 @@
 	}
 }
 
+// WithAdditionalProps returns an Option that stores props in the options'
+// additionalProps field. The map is kept by reference, not copied.
+func WithAdditionalProps(props iceberg.Properties) Option {
+	return func(opts *options) { opts.additionalProps = props }
+}
+
 type options struct {
 	awsConfig         aws.Config
 	tlsConfig         *tls.Config
@@ -113,4 +120,6 @@
 	prefix            string
 	authUri           *url.URL
 	scope             string
+
+	additionalProps iceberg.Properties
 }
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index cbeed0f..5eb870f 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -377,7 +377,9 @@
 }
 
 func fromProps(props iceberg.Properties) *options {
-	o := &options{}
+	o := &options{
+		additionalProps: iceberg.Properties{},
+	}
 	for k, v := range props {
 		switch k {
 		case keyOauthToken:
@@ -411,6 +413,11 @@
 			} else {
 				o.tlsConfig.InsecureSkipVerify = verify
 			}
+		case "uri", "type":
+		default:
+			if v != "" {
+				o.additionalProps[k] = v
+			}
 		}
 	}
 
@@ -418,7 +425,12 @@
 }
 
 func toProps(o *options) iceberg.Properties {
-	props := iceberg.Properties{}
+	var props iceberg.Properties
+	if o.additionalProps != nil {
+		props = o.additionalProps
+	} else {
+		props = iceberg.Properties{}
+	}
 
 	setIf := func(key, v string) {
 		if v != "" {
diff --git a/catalog/rest/rest_integration_test.go b/catalog/rest/rest_integration_test.go
index b17725a..b68a32f 100644
--- a/catalog/rest/rest_integration_test.go
+++ b/catalog/rest/rest_integration_test.go
@@ -21,12 +21,17 @@
 
 import (
 	"context"
+	"net/url"
 	"testing"
 
+	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/memory"
+	"github.com/apache/arrow-go/v18/parquet/pqarrow"
 	"github.com/apache/iceberg-go"
 	"github.com/apache/iceberg-go/catalog"
 	"github.com/apache/iceberg-go/catalog/rest"
 	"github.com/apache/iceberg-go/io"
+	"github.com/apache/iceberg-go/table"
 	"github.com/stretchr/testify/suite"
 )
 
@@ -43,6 +48,7 @@
 	cat, err := catalog.Load(ctx, "local", iceberg.Properties{
 		"type":               "rest",
 		"uri":                "http://localhost:8181",
+		io.S3Region:          "us-east-1",
 		io.S3AccessKeyID:     "admin",
 		io.S3SecretAccessKey: "password",
 	})
@@ -60,7 +66,7 @@
 	tableSchemaNested = iceberg.NewSchemaWithIdentifiers(1,
 		[]int{1},
 		iceberg.NestedField{
-			ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: false},
+			ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: true},
 		iceberg.NestedField{
 			ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true},
 		iceberg.NestedField{
@@ -175,6 +181,72 @@
 	s.Require().NoError(s.cat.DropTable(s.ctx, catalog.ToIdentifier(TestNamespaceIdent, "test-table")))
 }
 
+// TestWriteCommitTable creates a table through the REST catalog, writes a parquet
+// data file under its location, adds it with a transaction, and checks the manifest.
+func (s *RestIntegrationSuite) TestWriteCommitTable() {
+	s.ensureNamespace()
+
+	const location = "s3://warehouse/iceberg"
+	ident := catalog.ToIdentifier(TestNamespaceIdent, "test-table-2")
+
+	tbl, err := s.cat.CreateTable(s.ctx, ident, tableSchemaNested, catalog.WithLocation(location))
+	s.Require().NoError(err)
+	s.Require().NotNil(tbl)
+	defer func() { s.Require().NoError(s.cat.DropTable(s.ctx, ident)) }()
+
+	s.Equal(location, tbl.Location())
+
+	arrSchema, err := table.SchemaToArrowSchema(tableSchemaNested, nil, false, false)
+	s.Require().NoError(err)
+
+	arrTbl, err := array.TableFromJSON(memory.DefaultAllocator, arrSchema,
+		[]string{`[
+		{
+			"foo": "foo_string",
+			"bar": 123,
+			"baz": true,
+			"qux": ["a", "b", "c"],
+			"quux": [{"key": "gopher", "value": [
+				{"key": "golang", "value": "1337"}]}],
+			"location": [{"latitude": 37.7749, "longitude": -122.4194}],
+			"person": {"name": "gopher", "age": 10}
+		}
+	]`})
+	s.Require().NoError(err)
+	defer arrTbl.Release()
+
+	pqfile, err := url.JoinPath(location, "data", "test_commit_table_data", "test.parquet")
+	s.Require().NoError(err)
+
+	wfs, ok := tbl.FS().(io.WriteFileIO)
+	s.Require().True(ok, "table FS does not implement io.WriteFileIO")
+	fw, err := wfs.Create(pqfile)
+	s.Require().NoError(err)
+	// Registered before the write so a failed write is still cleaned up; best effort only.
+	defer func() { _ = tbl.FS().Remove(pqfile) }()
+	s.Require().NoError(pqarrow.WriteTable(arrTbl, fw, arrTbl.NumRows(), nil, pqarrow.DefaultWriterProps()))
+
+	txn := tbl.NewTransaction()
+	s.Require().NoError(txn.AddFiles([]string{pqfile}, nil, false))
+	updated, err := txn.Commit(s.ctx)
+	s.Require().NoError(err)
+
+	var mf []iceberg.ManifestFile
+	for m, err := range updated.AllManifests() {
+		s.Require().NoError(err)
+		s.Require().NotNil(m)
+		mf = append(mf, m)
+	}
+
+	// Length checks are fatal so a wrong count fails here rather than panicking on the index.
+	s.Require().Len(mf, 1)
+	s.EqualValues(1, mf[0].AddedDataFiles())
+	entries, err := mf[0].FetchEntries(updated.FS(), false)
+	s.Require().NoError(err)
+	s.Require().Len(entries, 1)
+	s.Equal(pqfile, entries[0].DataFile().FilePath())
+}
+
 func TestRestIntegration(t *testing.T) {
 	suite.Run(t, new(RestIntegrationSuite))
 }