blob: 9b5b9f3ac5ddd65582233a8d3e9f505985fa0cee [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build cgo
package cdata
import (
"unsafe"
"github.com/apache/arrow/go/v6/arrow"
"github.com/apache/arrow/go/v6/arrow/array"
"github.com/apache/arrow/go/v6/arrow/arrio"
"github.com/apache/arrow/go/v6/arrow/memory"
"golang.org/x/xerrors"
)
// SchemaFromPtr is a simple helper function to cast a uintptr to a *CArrowSchema
func SchemaFromPtr(ptr uintptr) *CArrowSchema { return (*CArrowSchema)(unsafe.Pointer(ptr)) }
// ArrayFromPtr is a simple helper function to cast a uintptr to a *CArrowArray
func ArrayFromPtr(ptr uintptr) *CArrowArray { return (*CArrowArray)(unsafe.Pointer(ptr)) }
// ImportCArrowField takes in an ArrowSchema from the C Data interface, it
// will copy the metadata and type definitions rather than keep direct references
// to them. It is safe to call C.ArrowSchemaRelease after receiving the field
// from this function.
func ImportCArrowField(out *CArrowSchema) (arrow.Field, error) {
return importSchema(out)
}
// ImportCArrowSchema takes in the ArrowSchema from the C Data Interface, it
// will copy the metadata and schema definitions over from the C object rather
// than keep direct references to them. This function will call ArrowSchemaRelease
// on the passed in schema regardless of whether or not there is an error returned.
//
// This version is intended to take in a schema for a record batch, which means
// that the top level of the schema should be a struct of the schema fields. If
// importing a single array's schema, then use ImportCArrowField instead.
func ImportCArrowSchema(out *CArrowSchema) (*arrow.Schema, error) {
ret, err := importSchema(out)
if err != nil {
return nil, err
}
return arrow.NewSchema(ret.Type.(*arrow.StructType).Fields(), &ret.Metadata), nil
}
// ImportCArrayWithType takes a pointer to a C Data ArrowArray and interprets the values
// as an array with the given datatype. If err is not nil, then ArrowArrayRelease must still
// be called on arr to release the memory.
//
// The underlying buffers will not be copied, but will instead be referenced directly
// by the resulting array interface object. The passed in ArrowArray will have it's ownership
// transferred to the resulting array.Interface via ArrowArrayMove. The underlying array.Data
// object that is owned by the Array will now be the owner of the memory pointer and
// will call ArrowArrayRelease when it is released and garbage collected via runtime.SetFinalizer.
//
// NOTE: The array takes ownership of the underlying memory buffers via ArrowArrayMove,
// it does not take ownership of the actual arr object itself.
func ImportCArrayWithType(arr *CArrowArray, dt arrow.DataType) (array.Interface, error) {
imp, err := importCArrayAsType(arr, dt)
if err != nil {
return nil, err
}
defer imp.data.Release()
return array.MakeFromData(imp.data), nil
}
// ImportCArray takes a pointer to both a C Data ArrowArray and C Data ArrowSchema in order
// to import them into usable Go Objects. If err is not nil, then ArrowArrayRelease must still
// be called on arr to release the memory. The ArrowSchemaRelease will be called on the passed in
// schema regardless of whether there is an error or not.
//
// The Schema will be copied with the information used to populate the returned Field, complete
// with metadata. The array will reference the same memory that is referred to by the ArrowArray
// object and take ownership of it as per ImportCArrayWithType. The returned array.Interface will
// own the C memory and call ArrowArrayRelease when the array.Data object is cleaned up.
//
// NOTE: The array takes ownership of the underlying memory buffers via ArrowArrayMove,
// it does not take ownership of the actual arr object itself.
func ImportCArray(arr *CArrowArray, schema *CArrowSchema) (arrow.Field, array.Interface, error) {
field, err := importSchema(schema)
if err != nil {
return field, nil, err
}
ret, err := ImportCArrayWithType(arr, field.Type)
return field, ret, err
}
// ImportCRecordBatchWithSchema is used for importing a Record Batch array when the schema
// is already known such as when receiving record batches through a stream.
//
// All of the semantics regarding memory ownership are the same as when calling
// ImportCRecordBatch directly with a schema.
//
// NOTE: The array takes ownership of the underlying memory buffers via ArrowArrayMove,
// it does not take ownership of the actual arr object itself.
func ImportCRecordBatchWithSchema(arr *CArrowArray, sc *arrow.Schema) (array.Record, error) {
imp, err := importCArrayAsType(arr, arrow.StructOf(sc.Fields()...))
if err != nil {
return nil, err
}
st := array.NewStructData(imp.data)
defer st.Release()
// now that we have our fields, we can split them out into the slice of arrays
// and construct a record batch from them to return.
cols := make([]array.Interface, st.NumField())
for i := 0; i < st.NumField(); i++ {
cols[i] = st.Field(i)
}
return array.NewRecord(sc, cols, int64(st.Len())), nil
}
// ImportCRecordBatch imports an ArrowArray from C as a record batch. If err is not nil,
// then ArrowArrayRelease must still be called to release the memory.
//
// A record batch is represented in the C Data Interface as a Struct Array whose fields
// are the columns of the record batch. Thus after importing the schema passed in here,
// if it is not a Struct type, this will return an error. As with ImportCArray, the
// columns in the record batch will take ownership of the CArrowArray memory if successful.
// Since ArrowArrayMove is used, it's still safe to call ArrowArrayRelease on the source
// regardless. But if there is an error, it *MUST* be called to ensure there is no memory leak.
//
// NOTE: The array takes ownership of the underlying memory buffers via ArrowArrayMove,
// it does not take ownership of the actual arr object itself.
func ImportCRecordBatch(arr *CArrowArray, sc *CArrowSchema) (array.Record, error) {
field, err := importSchema(sc)
if err != nil {
return nil, err
}
if field.Type.ID() != arrow.STRUCT {
return nil, xerrors.New("recordbatch array import must be of struct type")
}
return ImportCRecordBatchWithSchema(arr, arrow.NewSchema(field.Type.(*arrow.StructType).Fields(), &field.Metadata))
}
// ImportCArrayStream creates an arrio.Reader from an ArrowArrayStream taking ownership
// of the underlying stream object via ArrowArrayStreamMove.
//
// The records returned by this reader must be released manually after they are returned.
// The reader itself will release the stream via SetFinalizer when it is garbage collected.
// It will return (nil, io.EOF) from the Read function when there are no more records to return.
//
// NOTE: The reader takes ownership of the underlying memory buffers via ArrowArrayStreamMove,
// it does not take ownership of the actual stream object itself.
func ImportCArrayStream(stream *CArrowArrayStream, schema *arrow.Schema) arrio.Reader {
out := &nativeCRecordBatchReader{schema: schema}
initReader(out, stream)
return out
}
// ExportArrowSchema populates the passed in CArrowSchema with the schema passed in so
// that it can be passed to some consumer of the C Data Interface. The `release` function
// is tied to a callback in order to properly release any memory that was allocated during
// the populating of the struct. Any memory allocated will be allocated using malloc
// which means that it is invisible to the Go Garbage Collector and must be freed manually
// using the callback on the CArrowSchema object.
func ExportArrowSchema(schema *arrow.Schema, out *CArrowSchema) {
dummy := arrow.Field{Type: arrow.StructOf(schema.Fields()...), Metadata: schema.Metadata()}
exportField(dummy, out)
}
// ExportArrowRecordBatch populates the passed in CArrowArray (and optionally the schema too)
// by sharing the memory used for the buffers of each column's arrays. It does not
// copy the data, and will internally increment the reference counters so that releasing
// the record will not free the memory prematurely.
//
// When using CGO, memory passed to C is pinned so that the Go garbage collector won't
// move where it is allocated out from under the C pointer locations, ensuring the C pointers
// stay valid. This is only true until the CGO call returns, at which point the garbage collector
// is free to move things around again. As a result, if the function you're calling is going to
// hold onto the pointers or otherwise continue to reference the memory *after* the call returns,
// you should use the CgoArrowAllocator rather than the GoAllocator (or DefaultAllocator) so that
// the memory which is allocated for the record batch in the first place is allocated in C,
// not by the Go runtime and is therefore not subject to the Garbage collection.
//
// The release function on the populated CArrowArray will properly decrease the reference counts,
// and release the memory if the record has already been released. But since this must be explicitly
// done, make sure it is released so that you do not create a memory leak.
func ExportArrowRecordBatch(rb array.Record, out *CArrowArray, outSchema *CArrowSchema) {
children := make([]*array.Data, rb.NumCols())
for i := range rb.Columns() {
children[i] = rb.Column(i).Data()
}
data := array.NewData(arrow.StructOf(rb.Schema().Fields()...), int(rb.NumRows()), []*memory.Buffer{nil},
children, 0, 0)
defer data.Release()
arr := array.NewStructData(data)
defer arr.Release()
if outSchema != nil {
ExportArrowSchema(rb.Schema(), outSchema)
}
exportArray(arr, out, nil)
}
// ExportArrowArray populates the CArrowArray that is passed in with the pointers to the memory
// being used by the array.Interface passed in, in order to share with zero-copy across the C
// Data Interface. See the documentation for ExportArrowRecordBatch for details on how to ensure
// you do not leak memory and prevent unwanted, undefined or strange behaviors.
func ExportArrowArray(arr array.Interface, out *CArrowArray, outSchema *CArrowSchema) {
exportArray(arr, out, outSchema)
}