| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package cdata |
| |
| // #include <stdlib.h> |
| // #include "arrow/c/abi.h" |
| // #include "arrow/c/helpers.h" |
| // |
| // extern void releaseExportedSchema(struct ArrowSchema* schema); |
| // extern void releaseExportedArray(struct ArrowArray* array); |
| // |
| // void goReleaseArray(struct ArrowArray* array) { |
| // releaseExportedArray(array); |
| // } |
| // void goReleaseSchema(struct ArrowSchema* schema) { |
| // releaseExportedSchema(schema); |
| // } |
| import "C" |
| |
| import ( |
| "bytes" |
| "encoding/binary" |
| "fmt" |
| "reflect" |
| "strings" |
| "unsafe" |
| |
| "github.com/apache/arrow/go/v6/arrow" |
| "github.com/apache/arrow/go/v6/arrow/array" |
| "github.com/apache/arrow/go/v6/arrow/endian" |
| "github.com/apache/arrow/go/v6/arrow/ipc" |
| ) |
| |
| func encodeCMetadata(keys, values []string) []byte { |
| if len(keys) != len(values) { |
| panic("unequal metadata key/values length") |
| } |
| npairs := int32(len(keys)) |
| |
| var b bytes.Buffer |
| totalSize := 4 |
| for i := range keys { |
| totalSize += 8 + len(keys[i]) + len(values[i]) |
| } |
| b.Grow(totalSize) |
| |
| b.Write((*[4]byte)(unsafe.Pointer(&npairs))[:]) |
| for i := range keys { |
| binary.Write(&b, endian.Native, int32(len(keys[i]))) |
| b.WriteString(keys[i]) |
| binary.Write(&b, endian.Native, int32(len(values[i]))) |
| b.WriteString(values[i]) |
| } |
| return b.Bytes() |
| } |
| |
| type schemaExporter struct { |
| format, name string |
| |
| extraMeta arrow.Metadata |
| metadata []byte |
| flags int64 |
| children []schemaExporter |
| } |
| |
| func (exp *schemaExporter) handleExtension(dt arrow.DataType) arrow.DataType { |
| if dt.ID() != arrow.EXTENSION { |
| return dt |
| } |
| |
| ext := dt.(arrow.ExtensionType) |
| exp.extraMeta = arrow.NewMetadata([]string{ipc.ExtensionTypeKeyName, ipc.ExtensionMetadataKeyName}, []string{ext.ExtensionName(), ext.Serialize()}) |
| return ext.StorageType() |
| } |
| |
| func (exp *schemaExporter) exportMeta(m *arrow.Metadata) { |
| var ( |
| finalKeys []string |
| finalValues []string |
| ) |
| |
| if m == nil { |
| if exp.extraMeta.Len() > 0 { |
| finalKeys = exp.extraMeta.Keys() |
| finalValues = exp.extraMeta.Values() |
| } |
| exp.metadata = encodeCMetadata(finalKeys, finalValues) |
| return |
| } |
| |
| finalKeys = m.Keys() |
| finalValues = m.Values() |
| |
| if exp.extraMeta.Len() > 0 { |
| for i, k := range exp.extraMeta.Keys() { |
| if m.FindKey(k) != -1 { |
| continue |
| } |
| finalKeys = append(finalKeys, k) |
| finalValues = append(finalValues, exp.extraMeta.Values()[i]) |
| } |
| } |
| exp.metadata = encodeCMetadata(finalKeys, finalValues) |
| } |
| |
| func (exp *schemaExporter) exportFormat(dt arrow.DataType) string { |
| switch dt := dt.(type) { |
| case *arrow.NullType: |
| return "n" |
| case *arrow.BooleanType: |
| return "b" |
| case *arrow.Int8Type: |
| return "c" |
| case *arrow.Uint8Type: |
| return "C" |
| case *arrow.Int16Type: |
| return "s" |
| case *arrow.Uint16Type: |
| return "S" |
| case *arrow.Int32Type: |
| return "i" |
| case *arrow.Uint32Type: |
| return "I" |
| case *arrow.Int64Type: |
| return "l" |
| case *arrow.Uint64Type: |
| return "L" |
| case *arrow.Float16Type: |
| return "e" |
| case *arrow.Float32Type: |
| return "f" |
| case *arrow.Float64Type: |
| return "g" |
| case *arrow.FixedSizeBinaryType: |
| return fmt.Sprintf("w:%d", dt.ByteWidth) |
| case *arrow.Decimal128Type: |
| return fmt.Sprintf("d:%d,%d", dt.Precision, dt.Scale) |
| case *arrow.BinaryType: |
| return "z" |
| case *arrow.StringType: |
| return "u" |
| case *arrow.Date32Type: |
| return "tdD" |
| case *arrow.Date64Type: |
| return "tdm" |
| case *arrow.Time32Type: |
| switch dt.Unit { |
| case arrow.Second: |
| return "tts" |
| case arrow.Millisecond: |
| return "ttm" |
| default: |
| panic(fmt.Sprintf("invalid time unit for time32: %s", dt.Unit)) |
| } |
| case *arrow.Time64Type: |
| switch dt.Unit { |
| case arrow.Microsecond: |
| return "ttu" |
| case arrow.Nanosecond: |
| return "ttn" |
| default: |
| panic(fmt.Sprintf("invalid time unit for time64: %s", dt.Unit)) |
| } |
| case *arrow.TimestampType: |
| var b strings.Builder |
| switch dt.Unit { |
| case arrow.Second: |
| b.WriteString("tss:") |
| case arrow.Millisecond: |
| b.WriteString("tsm:") |
| case arrow.Microsecond: |
| b.WriteString("tsu:") |
| case arrow.Nanosecond: |
| b.WriteString("tsn:") |
| default: |
| panic(fmt.Sprintf("invalid time unit for timestamp: %s", dt.Unit)) |
| } |
| b.WriteString(dt.TimeZone) |
| return b.String() |
| case *arrow.DurationType: |
| switch dt.Unit { |
| case arrow.Second: |
| return "tDs" |
| case arrow.Millisecond: |
| return "tDm" |
| case arrow.Microsecond: |
| return "tDu" |
| case arrow.Nanosecond: |
| return "tDn" |
| default: |
| panic(fmt.Sprintf("invalid time unit for duration: %s", dt.Unit)) |
| } |
| case *arrow.MonthIntervalType: |
| return "tiM" |
| case *arrow.DayTimeIntervalType: |
| return "tiD" |
| case *arrow.MonthDayNanoIntervalType: |
| return "tin" |
| case *arrow.ListType: |
| return "+l" |
| case *arrow.FixedSizeListType: |
| return fmt.Sprintf("+w:%d", dt.Len()) |
| case *arrow.StructType: |
| return "+s" |
| case *arrow.MapType: |
| if dt.KeysSorted { |
| exp.flags |= C.ARROW_FLAG_MAP_KEYS_SORTED |
| } |
| return "+m" |
| } |
| panic("unsupported data type for export") |
| } |
| |
| func (exp *schemaExporter) export(field arrow.Field) { |
| exp.name = field.Name |
| exp.format = exp.exportFormat(exp.handleExtension(field.Type)) |
| if field.Nullable { |
| exp.flags |= C.ARROW_FLAG_NULLABLE |
| } |
| |
| switch dt := field.Type.(type) { |
| case *arrow.ListType: |
| exp.children = make([]schemaExporter, 1) |
| exp.children[0].export(dt.ElemField()) |
| case *arrow.StructType: |
| exp.children = make([]schemaExporter, len(dt.Fields())) |
| for i, f := range dt.Fields() { |
| exp.children[i].export(f) |
| } |
| case *arrow.MapType: |
| exp.children = make([]schemaExporter, 1) |
| exp.children[0].export(dt.ValueField()) |
| case *arrow.FixedSizeListType: |
| exp.children = make([]schemaExporter, 1) |
| exp.children[0].export(dt.ElemField()) |
| } |
| |
| exp.exportMeta(&field.Metadata) |
| } |
| |
| func allocateArrowSchemaArr(n int) (out []CArrowSchema) { |
| s := (*reflect.SliceHeader)(unsafe.Pointer(&out)) |
| s.Data = uintptr(C.malloc(C.sizeof_struct_ArrowSchema * C.size_t(n))) |
| s.Len = n |
| s.Cap = n |
| |
| return |
| } |
| |
| func allocateArrowSchemaPtrArr(n int) (out []*CArrowSchema) { |
| s := (*reflect.SliceHeader)(unsafe.Pointer(&out)) |
| s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*CArrowSchema)(nil))) * C.size_t(n))) |
| s.Len = n |
| s.Cap = n |
| |
| return |
| } |
| |
| func allocateArrowArrayArr(n int) (out []CArrowArray) { |
| s := (*reflect.SliceHeader)(unsafe.Pointer(&out)) |
| s.Data = uintptr(C.malloc(C.sizeof_struct_ArrowArray * C.size_t(n))) |
| s.Len = n |
| s.Cap = n |
| |
| return |
| } |
| |
| func allocateArrowArrayPtrArr(n int) (out []*CArrowArray) { |
| s := (*reflect.SliceHeader)(unsafe.Pointer(&out)) |
| s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*CArrowArray)(nil))) * C.size_t(n))) |
| s.Len = n |
| s.Cap = n |
| |
| return |
| } |
| |
| func allocateBufferPtrArr(n int) (out []*C.void) { |
| s := (*reflect.SliceHeader)(unsafe.Pointer(&out)) |
| s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*C.void)(nil))) * C.size_t(n))) |
| s.Len = n |
| s.Cap = n |
| |
| return |
| } |
| |
| func (exp *schemaExporter) finish(out *CArrowSchema) { |
| out.dictionary = nil |
| out.name = C.CString(exp.name) |
| out.format = C.CString(exp.format) |
| out.metadata = (*C.char)(C.CBytes(exp.metadata)) |
| out.flags = C.int64_t(exp.flags) |
| out.n_children = C.int64_t(len(exp.children)) |
| |
| if len(exp.children) > 0 { |
| children := allocateArrowSchemaArr(len(exp.children)) |
| childPtrs := allocateArrowSchemaPtrArr(len(exp.children)) |
| |
| for i, c := range exp.children { |
| c.finish(&children[i]) |
| childPtrs[i] = &children[i] |
| } |
| |
| out.children = (**CArrowSchema)(unsafe.Pointer(&childPtrs[0])) |
| } else { |
| out.children = nil |
| } |
| |
| out.release = (*[0]byte)(C.goReleaseSchema) |
| } |
| |
| func exportField(field arrow.Field, out *CArrowSchema) { |
| var exp schemaExporter |
| exp.export(field) |
| exp.finish(out) |
| } |
| |
| func exportArray(arr array.Interface, out *CArrowArray, outSchema *CArrowSchema) { |
| if outSchema != nil { |
| exportField(arrow.Field{Type: arr.DataType()}, outSchema) |
| } |
| |
| out.dictionary = nil |
| out.null_count = C.int64_t(arr.NullN()) |
| out.length = C.int64_t(arr.Len()) |
| out.offset = C.int64_t(arr.Data().Offset()) |
| out.n_buffers = C.int64_t(len(arr.Data().Buffers())) |
| |
| if out.n_buffers > 0 { |
| buffers := allocateBufferPtrArr(len(arr.Data().Buffers())) |
| for i := range arr.Data().Buffers() { |
| buf := arr.Data().Buffers()[i] |
| if buf == nil { |
| buffers[i] = nil |
| continue |
| } |
| |
| buffers[i] = (*C.void)(unsafe.Pointer(&buf.Bytes()[0])) |
| } |
| out.buffers = (*unsafe.Pointer)(unsafe.Pointer(&buffers[0])) |
| } |
| |
| out.private_data = unsafe.Pointer(storeData(arr.Data())) |
| out.release = (*[0]byte)(C.goReleaseArray) |
| switch arr := arr.(type) { |
| case *array.List: |
| out.n_children = 1 |
| childPtrs := allocateArrowArrayPtrArr(1) |
| children := allocateArrowArrayArr(1) |
| exportArray(arr.ListValues(), &children[0], nil) |
| childPtrs[0] = &children[0] |
| out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0])) |
| case *array.FixedSizeList: |
| out.n_children = 1 |
| childPtrs := allocateArrowArrayPtrArr(1) |
| children := allocateArrowArrayArr(1) |
| exportArray(arr.ListValues(), &children[0], nil) |
| childPtrs[0] = &children[0] |
| out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0])) |
| case *array.Map: |
| out.n_children = 1 |
| childPtrs := allocateArrowArrayPtrArr(1) |
| children := allocateArrowArrayArr(1) |
| exportArray(arr.ListValues(), &children[0], nil) |
| childPtrs[0] = &children[0] |
| out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0])) |
| case *array.Struct: |
| out.n_children = C.int64_t(arr.NumField()) |
| childPtrs := allocateArrowArrayPtrArr(arr.NumField()) |
| children := allocateArrowArrayArr(arr.NumField()) |
| for i := 0; i < arr.NumField(); i++ { |
| exportArray(arr.Field(i), &children[i], nil) |
| childPtrs[i] = &children[i] |
| } |
| out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0])) |
| default: |
| out.n_children = 0 |
| out.children = nil |
| } |
| } |