| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package array |
| |
| import ( |
| "fmt" |
| "strings" |
| "sync/atomic" |
| |
| "github.com/apache/arrow/go/arrow" |
| "github.com/apache/arrow/go/arrow/internal/debug" |
| "github.com/apache/arrow/go/arrow/memory" |
| ) |
| |
| // RecordReader reads a stream of records. |
| type RecordReader interface { |
| Retain() |
| Release() |
| |
| Schema() *arrow.Schema |
| |
| Next() bool |
| Record() Record |
| } |
| |
| // simpleRecords is a simple iterator over a collection of records. |
| type simpleRecords struct { |
| refCount int64 |
| |
| schema *arrow.Schema |
| recs []Record |
| cur Record |
| } |
| |
| // NewRecordReader returns a simple iterator over the given slice of records. |
| func NewRecordReader(schema *arrow.Schema, recs []Record) (*simpleRecords, error) { |
| rs := &simpleRecords{ |
| refCount: 1, |
| schema: schema, |
| recs: recs, |
| cur: nil, |
| } |
| |
| for _, rec := range rs.recs { |
| rec.Retain() |
| } |
| |
| for _, rec := range recs { |
| if !rec.Schema().Equal(rs.schema) { |
| rs.Release() |
| return nil, fmt.Errorf("arrow/array: mismatch schema") |
| } |
| } |
| |
| return rs, nil |
| } |
| |
| // Retain increases the reference count by 1. |
| // Retain may be called simultaneously from multiple goroutines. |
| func (rs *simpleRecords) Retain() { |
| atomic.AddInt64(&rs.refCount, 1) |
| } |
| |
| // Release decreases the reference count by 1. |
| // When the reference count goes to zero, the memory is freed. |
| // Release may be called simultaneously from multiple goroutines. |
| func (rs *simpleRecords) Release() { |
| debug.Assert(atomic.LoadInt64(&rs.refCount) > 0, "too many releases") |
| |
| if atomic.AddInt64(&rs.refCount, -1) == 0 { |
| if rs.cur != nil { |
| rs.cur.Release() |
| } |
| for _, rec := range rs.recs { |
| rec.Release() |
| } |
| rs.recs = nil |
| } |
| } |
| |
| func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema } |
| func (rs *simpleRecords) Record() Record { return rs.cur } |
| func (rs *simpleRecords) Next() bool { |
| if len(rs.recs) == 0 { |
| return false |
| } |
| if rs.cur != nil { |
| rs.cur.Release() |
| } |
| rs.cur = rs.recs[0] |
| rs.recs = rs.recs[1:] |
| return true |
| } |
| |
| // Record is a collection of equal-length arrays |
| // matching a particular Schema. |
| type Record interface { |
| Release() |
| Retain() |
| |
| Schema() *arrow.Schema |
| |
| NumRows() int64 |
| NumCols() int64 |
| |
| Columns() []Interface |
| Column(i int) Interface |
| ColumnName(i int) string |
| |
| // NewSlice constructs a zero-copy slice of the record with the indicated |
| // indices i and j, corresponding to array[i:j]. |
| // The returned record must be Release()'d after use. |
| // |
| // NewSlice panics if the slice is outside the valid range of the record array. |
| // NewSlice panics if j < i. |
| NewSlice(i, j int64) Record |
| } |
| |
| // simpleRecord is a basic, non-lazy in-memory record batch. |
| type simpleRecord struct { |
| refCount int64 |
| |
| schema *arrow.Schema |
| |
| rows int64 |
| arrs []Interface |
| } |
| |
| // NewRecord returns a basic, non-lazy in-memory record batch. |
| // |
| // NewRecord panics if the columns and schema are inconsistent. |
| // NewRecord panics if rows is larger than the height of the columns. |
| func NewRecord(schema *arrow.Schema, cols []Interface, nrows int64) *simpleRecord { |
| rec := &simpleRecord{ |
| refCount: 1, |
| schema: schema, |
| rows: nrows, |
| arrs: make([]Interface, len(cols)), |
| } |
| copy(rec.arrs, cols) |
| for _, arr := range rec.arrs { |
| arr.Retain() |
| } |
| |
| if rec.rows < 0 { |
| switch len(rec.arrs) { |
| case 0: |
| rec.rows = 0 |
| default: |
| rec.rows = int64(rec.arrs[0].Len()) |
| } |
| } |
| |
| err := rec.validate() |
| if err != nil { |
| rec.Release() |
| panic(err) |
| } |
| |
| return rec |
| } |
| |
| func (rec *simpleRecord) validate() error { |
| if len(rec.arrs) != len(rec.schema.Fields()) { |
| return fmt.Errorf("arrow/array: number of columns/fields mismatch") |
| } |
| |
| for i, arr := range rec.arrs { |
| f := rec.schema.Field(i) |
| if int64(arr.Len()) < rec.rows { |
| return fmt.Errorf("arrow/array: mismatch number of rows in column %q: got=%d, want=%d", |
| f.Name, |
| arr.Len(), rec.rows, |
| ) |
| } |
| if !arrow.TypeEquals(f.Type, arr.DataType()) { |
| return fmt.Errorf("arrow/array: column %q type mismatch: got=%v, want=%v", |
| f.Name, |
| arr.DataType(), f.Type, |
| ) |
| } |
| } |
| return nil |
| } |
| |
| // Retain increases the reference count by 1. |
| // Retain may be called simultaneously from multiple goroutines. |
| func (rec *simpleRecord) Retain() { |
| atomic.AddInt64(&rec.refCount, 1) |
| } |
| |
| // Release decreases the reference count by 1. |
| // When the reference count goes to zero, the memory is freed. |
| // Release may be called simultaneously from multiple goroutines. |
| func (rec *simpleRecord) Release() { |
| debug.Assert(atomic.LoadInt64(&rec.refCount) > 0, "too many releases") |
| |
| if atomic.AddInt64(&rec.refCount, -1) == 0 { |
| for _, arr := range rec.arrs { |
| arr.Release() |
| } |
| rec.arrs = nil |
| } |
| } |
| |
| func (rec *simpleRecord) Schema() *arrow.Schema { return rec.schema } |
| func (rec *simpleRecord) NumRows() int64 { return rec.rows } |
| func (rec *simpleRecord) NumCols() int64 { return int64(len(rec.arrs)) } |
| func (rec *simpleRecord) Columns() []Interface { return rec.arrs } |
| func (rec *simpleRecord) Column(i int) Interface { return rec.arrs[i] } |
| func (rec *simpleRecord) ColumnName(i int) string { return rec.schema.Field(i).Name } |
| |
| // NewSlice constructs a zero-copy slice of the record with the indicated |
| // indices i and j, corresponding to array[i:j]. |
| // The returned record must be Release()'d after use. |
| // |
| // NewSlice panics if the slice is outside the valid range of the record array. |
| // NewSlice panics if j < i. |
| func (rec *simpleRecord) NewSlice(i, j int64) Record { |
| arrs := make([]Interface, len(rec.arrs)) |
| for ii, arr := range rec.arrs { |
| arrs[ii] = NewSlice(arr, i, j) |
| } |
| defer func() { |
| for _, arr := range arrs { |
| arr.Release() |
| } |
| }() |
| return NewRecord(rec.schema, arrs, j-i) |
| } |
| |
| func (rec *simpleRecord) String() string { |
| o := new(strings.Builder) |
| fmt.Fprintf(o, "record:\n %v\n", rec.schema) |
| fmt.Fprintf(o, " rows: %d\n", rec.rows) |
| for i, col := range rec.arrs { |
| fmt.Fprintf(o, " col[%d][%s]: %v\n", i, rec.schema.Field(i).Name, col) |
| } |
| |
| return o.String() |
| } |
| |
| // RecordBuilder eases the process of building a Record, iteratively, from |
| // a known Schema. |
| type RecordBuilder struct { |
| refCount int64 |
| mem memory.Allocator |
| schema *arrow.Schema |
| fields []Builder |
| } |
| |
| // NewRecordBuilder returns a builder, using the provided memory allocator and a schema. |
| func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema) *RecordBuilder { |
| b := &RecordBuilder{ |
| refCount: 1, |
| mem: mem, |
| schema: schema, |
| fields: make([]Builder, len(schema.Fields())), |
| } |
| |
| for i, f := range schema.Fields() { |
| b.fields[i] = newBuilder(b.mem, f.Type) |
| } |
| |
| return b |
| } |
| |
| // Retain increases the reference count by 1. |
| // Retain may be called simultaneously from multiple goroutines. |
| func (b *RecordBuilder) Retain() { |
| atomic.AddInt64(&b.refCount, 1) |
| } |
| |
| // Release decreases the reference count by 1. |
| func (b *RecordBuilder) Release() { |
| debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases") |
| |
| for _, f := range b.fields { |
| f.Release() |
| } |
| |
| if atomic.AddInt64(&b.refCount, -1) == 0 { |
| b.fields = nil |
| } |
| } |
| |
| func (b *RecordBuilder) Schema() *arrow.Schema { return b.schema } |
| func (b *RecordBuilder) Fields() []Builder { return b.fields } |
| func (b *RecordBuilder) Field(i int) Builder { return b.fields[i] } |
| |
| func (b *RecordBuilder) Reserve(size int) { |
| for _, f := range b.fields { |
| f.Reserve(size) |
| } |
| } |
| |
| // NewRecord creates a new record from the memory buffers and resets the |
| // RecordBuilder so it can be used to build a new record. |
| // |
| // The returned Record must be Release()'d after use. |
| // |
| // NewRecord panics if the fields' builder do not have the same length. |
| func (b *RecordBuilder) NewRecord() Record { |
| cols := make([]Interface, len(b.fields)) |
| rows := int64(0) |
| |
| defer func(cols []Interface) { |
| for _, col := range cols { |
| if col == nil { |
| continue |
| } |
| col.Release() |
| } |
| }(cols) |
| |
| for i, f := range b.fields { |
| cols[i] = f.NewArray() |
| irow := int64(cols[i].Len()) |
| if i > 0 && irow != rows { |
| panic(fmt.Errorf("arrow/array: field %d has %d rows. want=%d", i, irow, rows)) |
| } |
| rows = irow |
| } |
| |
| return NewRecord(b.schema, cols, rows) |
| } |
| |
| var ( |
| _ Record = (*simpleRecord)(nil) |
| _ RecordReader = (*simpleRecords)(nil) |
| ) |