fix(table): writing map column with multiple entries in a row (#598)
fixes #595
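Turns out we were incorrectly constructing the Map column inside
`ToRequestedSchema`: the rebuilt entries child was given the length of
the map array itself (`arr.Len()`) instead of the length of the original
entries child (`arr.Data().Children()[0].Len()`), which is wrong whenever
the total number of entries differs from the number of rows, e.g. when a
row holds more than one entry. I also discovered a potential memory leak
while testing this and fixed it by adding a `defer rec.Release()` in
`table/writer.go`, so that each record stored in `batches` has its
reference released correctly.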
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 4cd254e..4ba1d24 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -896,7 +896,7 @@
valField := a.constructField(m.ValueField(), vals.DataType())
mapType := arrow.MapOfWithMetadata(keyField.Type, keyField.Metadata, valField.Type, valField.Metadata)
- childData := array.NewData(mapType.Elem(), arr.Len(), []*memory.Buffer{nil},
+ childData := array.NewData(mapType.Elem(), arr.Data().Children()[0].Len(), []*memory.Buffer{nil},
[]arrow.ArrayData{keys.Data(), vals.Data()}, 0, 0)
defer childData.Release()
newData := array.NewData(mapType, arr.Len(), arr.Data().Buffers(),
diff --git a/table/table_test.go b/table/table_test.go
index 47fbe96..7f7abc0 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -34,6 +34,7 @@
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/apache/iceberg-go"
@@ -1473,6 +1474,91 @@
t.NotContains(logOutput, "no such file or directory")
}
+// testing issue reported in https://github.com/apache/iceberg-go/issues/595
+func TestWriteMapType(t *testing.T) {
+ loc := filepath.ToSlash(t.TempDir())
+
+ cat, err := catalog.Load(context.Background(), "default", iceberg.Properties{
+ "uri": ":memory:",
+ "type": "sql",
+ sql.DriverKey: sqliteshim.ShimName,
+ sql.DialectKey: string(sql.SQLite),
+ "warehouse": "file://" + loc,
+ })
+ require.NoError(t, err)
+
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ ctx := compute.WithAllocator(context.Background(), mem)
+ cat.CreateNamespace(ctx, catalog.ToIdentifier("default"), nil)
+ iceSch := iceberg.NewSchema(1,
+ iceberg.NestedField{
+ ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.String, Required: true,
+ },
+ iceberg.NestedField{
+ ID: 2, Name: "attrs", Required: true, Type: &iceberg.MapType{
+ KeyID: 3,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 4,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ },
+ })
+
+ ident := catalog.ToIdentifier("default", "repro_map")
+ tbl, err := cat.CreateTable(ctx, ident, iceSch, catalog.WithLocation(loc))
+ require.NoError(t, err)
+
+ arrowSch, err := table.SchemaToArrowSchema(iceSch, nil, true, false)
+ require.NoError(t, err)
+
+ bldr := array.NewRecordBuilder(mem, arrowSch)
+ defer bldr.Release()
+
+ idbldr := bldr.Field(0).(*array.StringBuilder)
+ attrBldr := bldr.Field(1).(*array.MapBuilder)
+ attrKeyBldr := attrBldr.KeyBuilder().(*array.StringBuilder)
+ attrItemBldr := attrBldr.ItemBuilder().(*array.StringBuilder)
+
+ idbldr.Append("row-0")
+ attrBldr.Append(true)
+ attrKeyBldr.Append("a")
+ attrItemBldr.Append("1")
+
+ idbldr.Append("row-1")
+ attrBldr.Append(true)
+ attrKeyBldr.AppendValues([]string{"b", "c"}, nil)
+ attrItemBldr.AppendValues([]string{"2", "3"}, nil)
+
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ rr, err := array.NewRecordReader(arrowSch, []arrow.RecordBatch{rec})
+ require.NoError(t, err)
+ defer rr.Release()
+
+ result, err := tbl.Append(ctx, rr, nil)
+ require.NoError(t, err)
+
+ resultTbl, err := result.Scan().ToArrowTable(ctx)
+ require.NoError(t, err)
+ defer resultTbl.Release()
+
+ expectedSchema, err := table.SchemaToArrowSchema(iceSch, nil, false, false)
+ require.NoError(t, err)
+ expected, err := array.TableFromJSON(mem, expectedSchema, []string{
+ `[
+ {"id": "row-0", "attrs": [{"key": "a", "value": "1"}]},
+ {"id": "row-1", "attrs": [{"key": "b", "value": "2"}, {"key": "c", "value": "3"}]}
+ ]`,
+ })
+ require.NoError(t, err)
+ defer expected.Release()
+
+ require.True(t, array.TableEqual(expected, resultTbl), "expected:\n %s\ngot:\n %s", expected, resultTbl)
+}
+
func (t *TableTestSuite) TestRefresh() {
cat, err := catalog.Load(context.Background(), "default", iceberg.Properties{
"uri": ":memory:",
diff --git a/table/writer.go b/table/writer.go
index 7a9155c..ef7f728 100644
--- a/table/writer.go
+++ b/table/writer.go
@@ -68,6 +68,7 @@
return nil, err
}
batches[i] = rec
+ defer rec.Release()
}
statsCols, err := computeStatsPlan(w.fileSchema, w.meta.props)