| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package ipc |
| |
| import ( |
| "context" |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "io" |
| "math" |
| "sync" |
| "unsafe" |
| |
| "github.com/apache/arrow/go/v14/arrow" |
| "github.com/apache/arrow/go/v14/arrow/array" |
| "github.com/apache/arrow/go/v14/arrow/bitutil" |
| "github.com/apache/arrow/go/v14/arrow/internal" |
| "github.com/apache/arrow/go/v14/arrow/internal/debug" |
| "github.com/apache/arrow/go/v14/arrow/internal/dictutils" |
| "github.com/apache/arrow/go/v14/arrow/internal/flatbuf" |
| "github.com/apache/arrow/go/v14/arrow/memory" |
| ) |
| |
// swriter is the default stream PayloadWriter: it writes the Arrow IPC
// stream format to an underlying io.Writer while tracking the running
// byte position.
type swriter struct {
	w   io.Writer
	pos int64 // total number of bytes written so far
}
| |
// Start is a no-op: the stream format has no preamble before the schema.
func (w *swriter) Start() error { return nil }
// Close terminates the stream by writing the end-of-stream marker.
func (w *swriter) Close() error {
	_, err := w.Write(kEOS[:])
	return err
}
| |
| func (w *swriter) WritePayload(p Payload) error { |
| _, err := writeIPCPayload(w, p) |
| if err != nil { |
| return err |
| } |
| return nil |
| } |
| |
// Write forwards p to the wrapped writer and advances the running byte
// position by the number of bytes actually written.
func (w *swriter) Write(p []byte) (int, error) {
	n, err := w.w.Write(p)
	w.pos += int64(n)
	return n, err
}
| |
| func hasNestedDict(data arrow.ArrayData) bool { |
| if data.DataType().ID() == arrow.DICTIONARY { |
| return true |
| } |
| for _, c := range data.Children() { |
| if hasNestedDict(c) { |
| return true |
| } |
| } |
| return false |
| } |
| |
// Writer is an Arrow stream writer.
type Writer struct {
	// w is the underlying output stream; it is nil when the Writer was
	// constructed with NewWriterWithPayloadWriter.
	w io.Writer

	// mem allocates scratch buffers produced while encoding payloads.
	mem memory.Allocator
	// pw receives the encoded schema, dictionary, and record payloads.
	pw PayloadWriter

	// started records whether the schema payloads have been written yet.
	started bool
	// schema that every written record must match.
	schema *arrow.Schema
	// mapper assigns dictionary IDs to the schema's dictionary fields.
	mapper dictutils.Mapper
	// codec used to compress record body buffers (-1 disables compression).
	codec flatbuf.CompressionType
	// compressNP is the number of goroutines used for concurrent compression.
	compressNP int
	// minSpaceSavings, when set, is the minimum fraction of space a buffer
	// must save by compressing for the compressed form to be kept.
	minSpaceSavings *float64

	// map of the last written dictionaries by id
	// so we can avoid writing the same dictionary over and over
	lastWrittenDicts map[int64]arrow.Array
	emitDictDeltas bool
}
| |
| // NewWriterWithPayloadWriter constructs a writer with the provided payload writer |
| // instead of the default stream payload writer. This makes the writer more |
| // reusable such as by the Arrow Flight writer. |
| func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer { |
| cfg := newConfig(opts...) |
| return &Writer{ |
| mem: cfg.alloc, |
| pw: pw, |
| schema: cfg.schema, |
| codec: cfg.codec, |
| compressNP: cfg.compressNP, |
| minSpaceSavings: cfg.minSpaceSavings, |
| emitDictDeltas: cfg.emitDictDeltas, |
| } |
| } |
| |
| // NewWriter returns a writer that writes records to the provided output stream. |
| func NewWriter(w io.Writer, opts ...Option) *Writer { |
| cfg := newConfig(opts...) |
| return &Writer{ |
| w: w, |
| mem: cfg.alloc, |
| pw: &swriter{w: w}, |
| schema: cfg.schema, |
| codec: cfg.codec, |
| emitDictDeltas: cfg.emitDictDeltas, |
| } |
| } |
| |
| func (w *Writer) Close() error { |
| if !w.started { |
| err := w.start() |
| if err != nil { |
| return err |
| } |
| } |
| |
| if w.pw == nil { |
| return nil |
| } |
| |
| err := w.pw.Close() |
| if err != nil { |
| return fmt.Errorf("arrow/ipc: could not close payload writer: %w", err) |
| } |
| w.pw = nil |
| |
| for _, d := range w.lastWrittenDicts { |
| d.Release() |
| } |
| |
| return nil |
| } |
| |
| func (w *Writer) Write(rec arrow.Record) (err error) { |
| defer func() { |
| if pErr := recover(); pErr != nil { |
| err = fmt.Errorf("arrow/ipc: unknown error while writing: %v", pErr) |
| } |
| }() |
| |
| if !w.started { |
| err := w.start() |
| if err != nil { |
| return err |
| } |
| } |
| |
| schema := rec.Schema() |
| if schema == nil || !schema.Equal(w.schema) { |
| return errInconsistentSchema |
| } |
| |
| const allow64b = true |
| var ( |
| data = Payload{msg: MessageRecordBatch} |
| enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b, w.codec, w.compressNP, w.minSpaceSavings) |
| ) |
| defer data.Release() |
| |
| err = writeDictionaryPayloads(w.mem, rec, false, w.emitDictDeltas, &w.mapper, w.lastWrittenDicts, w.pw, enc) |
| if err != nil { |
| return fmt.Errorf("arrow/ipc: failure writing dictionary batches: %w", err) |
| } |
| |
| enc.reset() |
| if err := enc.Encode(&data, rec); err != nil { |
| return fmt.Errorf("arrow/ipc: could not encode record to payload: %w", err) |
| } |
| |
| return w.pw.WritePayload(data) |
| } |
| |
// writeDictionaryPayloads writes the dictionary batches needed by batch to
// pw, skipping dictionaries unchanged since the last write and, when allowed,
// emitting delta batches for dictionaries that only grew.
//
// lastWrittenDicts is read and updated in place: it maps dictionary IDs to
// the most recently written dictionary array, each retained here until it is
// replaced (the caller is responsible for the final releases).
func writeDictionaryPayloads(mem memory.Allocator, batch arrow.Record, isFileFormat bool, emitDictDeltas bool, mapper *dictutils.Mapper, lastWrittenDicts map[int64]arrow.Array, pw PayloadWriter, encoder *recordEncoder) error {
	dictionaries, err := dictutils.CollectDictionaries(batch, mapper)
	if err != nil {
		return err
	}
	defer func() {
		for _, d := range dictionaries {
			d.Dict.Release()
		}
	}()

	eqopt := array.WithNaNsEqual(true)
	for _, pair := range dictionaries {
		encoder.reset()
		var (
			deltaStart int64
			enc        = dictEncoder{encoder}
		)
		lastDict, exists := lastWrittenDicts[pair.ID]
		if exists {
			if lastDict.Data() == pair.Dict.Data() {
				// identical underlying data: nothing new to write
				continue
			}
			newLen, lastLen := pair.Dict.Len(), lastDict.Len()
			if lastLen == newLen && array.ApproxEqual(lastDict, pair.Dict, eqopt) {
				// same dictionary by value
				// might cost CPU, but required for IPC file format
				continue
			}
			if isFileFormat {
				return errors.New("arrow/ipc: Dictionary replacement detected when writing IPC file format. Arrow IPC File only supports single dictionary per field")
			}

			// emit a delta batch only if the new dictionary is a pure
			// extension of the previous one (identical prefix, extra values)
			if newLen > lastLen &&
				emitDictDeltas &&
				!hasNestedDict(pair.Dict.Data()) &&
				(array.SliceApproxEqual(lastDict, 0, int64(lastLen), pair.Dict, 0, int64(lastLen), eqopt)) {
				deltaStart = int64(lastLen)
			}
		}

		// NOTE(review): these defers run at function exit, not per loop
		// iteration, so payload/slice memory is held until every dictionary
		// has been written.
		var data = Payload{msg: MessageDictionaryBatch}
		defer data.Release()

		dict := pair.Dict
		if deltaStart > 0 {
			// only encode the values appended since the last write
			dict = array.NewSlice(dict, deltaStart, int64(dict.Len()))
			defer dict.Release()
		}
		if err := enc.Encode(&data, pair.ID, deltaStart > 0, dict); err != nil {
			return err
		}

		if err := pw.WritePayload(data); err != nil {
			return err
		}

		// remember (and retain) the full dictionary for future comparisons,
		// releasing the reference held for the previous one
		lastWrittenDicts[pair.ID] = pair.Dict
		if lastDict != nil {
			lastDict.Release()
		}
		pair.Dict.Retain()
	}
	return nil
}
| |
| func (w *Writer) start() error { |
| w.started = true |
| |
| w.mapper.ImportSchema(w.schema) |
| w.lastWrittenDicts = make(map[int64]arrow.Array) |
| |
| // write out schema payloads |
| ps := payloadFromSchema(w.schema, w.mem, &w.mapper) |
| defer ps.Release() |
| |
| for _, data := range ps { |
| err := w.pw.WritePayload(data) |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
// dictEncoder wraps recordEncoder to encode dictionary batches: record
// batches holding a single "dictionary" column, headed by a dictionary
// message carrying the dictionary ID and delta flag.
type dictEncoder struct {
	*recordEncoder
}
| |
// encodeMetadata builds the flatbuffer dictionary-batch message header for
// payload p, recording the dictionary id, the delta flag, and the row count.
func (d *dictEncoder) encodeMetadata(p *Payload, isDelta bool, id, nrows int64) error {
	p.meta = writeDictionaryMessage(d.mem, id, isDelta, nrows, p.size, d.fields, d.meta, d.codec)
	return nil
}
| |
| func (d *dictEncoder) Encode(p *Payload, id int64, isDelta bool, dict arrow.Array) error { |
| d.start = 0 |
| defer func() { |
| d.start = 0 |
| }() |
| |
| schema := arrow.NewSchema([]arrow.Field{{Name: "dictionary", Type: dict.DataType(), Nullable: true}}, nil) |
| batch := array.NewRecord(schema, []arrow.Array{dict}, int64(dict.Len())) |
| defer batch.Release() |
| if err := d.encode(p, batch); err != nil { |
| return err |
| } |
| |
| return d.encodeMetadata(p, isDelta, id, batch.NumRows()) |
| } |
| |
// recordEncoder turns a record batch into an IPC payload: a list of body
// buffers plus the field and buffer metadata needed for the flatbuffer
// record-batch message header.
type recordEncoder struct {
	// mem allocates scratch buffers (shifted offsets, truncated bitmaps, ...).
	mem memory.Allocator

	// fields accumulates per-array metadata (length, null count) in
	// depth-first visit order.
	fields []fieldMetadata
	// meta accumulates per-body-buffer offset/length metadata.
	meta []bufferMetadata

	// depth is the remaining allowed nesting depth; visiting at <= 0 fails
	// with errMaxRecursion.
	depth int64
	// start is the byte position body buffer offsets are relative to.
	start int64
	// allow64b permits arrays longer than math.MaxInt32 elements.
	allow64b bool
	// codec is the body compression codec (-1 disables compression).
	codec flatbuf.CompressionType
	// compressNP is the number of goroutines for concurrent compression.
	compressNP int
	// minSpaceSavings, when set, gates whether compressed buffers are kept.
	minSpaceSavings *float64
}
| |
| func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, allow64b bool, codec flatbuf.CompressionType, compressNP int, minSpaceSavings *float64) *recordEncoder { |
| return &recordEncoder{ |
| mem: mem, |
| start: startOffset, |
| depth: maxDepth, |
| allow64b: allow64b, |
| codec: codec, |
| compressNP: compressNP, |
| minSpaceSavings: minSpaceSavings, |
| } |
| } |
| |
| func (w *recordEncoder) shouldCompress(uncompressed, compressed int) bool { |
| debug.Assert(uncompressed > 0, "uncompressed size is 0") |
| if w.minSpaceSavings == nil { |
| return true |
| } |
| |
| savings := 1.0 - float64(compressed)/float64(uncompressed) |
| return savings >= *w.minSpaceSavings |
| } |
| |
// reset clears the state accumulated by a previous encoding pass so the
// encoder can be reused for another batch. w.meta does not need clearing
// here: encode rebuilds it from scratch.
func (w *recordEncoder) reset() {
	w.start = 0
	w.fields = make([]fieldMetadata, 0)
}
| |
| func (w *recordEncoder) compressBodyBuffers(p *Payload) error { |
| compress := func(idx int, codec compressor) error { |
| if p.body[idx] == nil || p.body[idx].Len() == 0 { |
| return nil |
| } |
| |
| buf := memory.NewResizableBuffer(w.mem) |
| buf.Reserve(codec.MaxCompressedLen(p.body[idx].Len()) + arrow.Int64SizeBytes) |
| |
| binary.LittleEndian.PutUint64(buf.Buf(), uint64(p.body[idx].Len())) |
| bw := &bufferWriter{buf: buf, pos: arrow.Int64SizeBytes} |
| codec.Reset(bw) |
| |
| n, err := codec.Write(p.body[idx].Bytes()) |
| if err != nil { |
| return err |
| } |
| if err := codec.Close(); err != nil { |
| return err |
| } |
| |
| finalLen := bw.pos |
| compressedLen := bw.pos - arrow.Int64SizeBytes |
| if !w.shouldCompress(n, compressedLen) { |
| n = copy(buf.Buf()[arrow.Int64SizeBytes:], p.body[idx].Bytes()) |
| // size of -1 indicates to the reader that the body |
| // doesn't need to be decompressed |
| var noprefix int64 = -1 |
| binary.LittleEndian.PutUint64(buf.Buf(), uint64(noprefix)) |
| finalLen = n + arrow.Int64SizeBytes |
| } |
| bw.buf.Resize(finalLen) |
| p.body[idx].Release() |
| p.body[idx] = buf |
| return nil |
| } |
| |
| if w.compressNP <= 1 { |
| codec := getCompressor(w.codec) |
| for idx := range p.body { |
| if err := compress(idx, codec); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| var ( |
| wg sync.WaitGroup |
| ch = make(chan int) |
| errch = make(chan error) |
| ctx, cancel = context.WithCancel(context.Background()) |
| ) |
| defer cancel() |
| |
| for i := 0; i < w.compressNP; i++ { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| codec := getCompressor(w.codec) |
| for { |
| select { |
| case idx, ok := <-ch: |
| if !ok { |
| // we're done, channel is closed! |
| return |
| } |
| |
| if err := compress(idx, codec); err != nil { |
| errch <- err |
| cancel() |
| return |
| } |
| case <-ctx.Done(): |
| // cancelled, return early |
| return |
| } |
| } |
| }() |
| } |
| |
| for idx := range p.body { |
| ch <- idx |
| } |
| |
| close(ch) |
| wg.Wait() |
| close(errch) |
| |
| return <-errch |
| } |
| |
| func (w *recordEncoder) encode(p *Payload, rec arrow.Record) error { |
| // perform depth-first traversal of the row-batch |
| for i, col := range rec.Columns() { |
| err := w.visit(p, col) |
| if err != nil { |
| return fmt.Errorf("arrow/ipc: could not encode column %d (%q): %w", i, rec.ColumnName(i), err) |
| } |
| } |
| |
| if w.codec != -1 { |
| if w.minSpaceSavings != nil { |
| pct := *w.minSpaceSavings |
| if pct < 0 || pct > 1 { |
| p.Release() |
| return fmt.Errorf("%w: minSpaceSavings not in range [0,1]. Provided %.05f", |
| arrow.ErrInvalid, pct) |
| } |
| } |
| w.compressBodyBuffers(p) |
| } |
| |
| // position for the start of a buffer relative to the passed frame of reference. |
| // may be 0 or some other position in an address space. |
| offset := w.start |
| w.meta = make([]bufferMetadata, len(p.body)) |
| |
| // construct the metadata for the record batch header |
| for i, buf := range p.body { |
| var ( |
| size int64 |
| padding int64 |
| ) |
| // the buffer might be null if we are handling zero row lengths. |
| if buf != nil { |
| size = int64(buf.Len()) |
| padding = bitutil.CeilByte64(size) - size |
| } |
| w.meta[i] = bufferMetadata{ |
| Offset: offset, |
| // even though we add padding, we need the Len to be correct |
| // so that decompressing works properly. |
| Len: size, |
| } |
| offset += size + padding |
| } |
| |
| p.size = offset - w.start |
| if !bitutil.IsMultipleOf8(p.size) { |
| panic("not aligned") |
| } |
| |
| return nil |
| } |
| |
// visit appends the body buffers for arr (and, depth-first, its children) to
// p, recording per-array field metadata along the way. Buffers are truncated
// or re-based as needed so the written payload does not depend on any slicing
// offset of the in-memory arrays.
func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
	if w.depth <= 0 {
		return errMaxRecursion
	}

	if !w.allow64b && arr.Len() > math.MaxInt32 {
		return errBigArray
	}

	if arr.DataType().ID() == arrow.EXTENSION {
		// extension arrays are written as their underlying storage array
		arr := arr.(array.ExtensionArray)
		err := w.visit(p, arr.Storage())
		if err != nil {
			return fmt.Errorf("failed visiting storage of for array %T: %w", arr, err)
		}
		return nil
	}

	if arr.DataType().ID() == arrow.DICTIONARY {
		// only the indices belong to the record body; the dictionary values
		// are written separately as dictionary batches
		arr := arr.(*array.Dictionary)
		return w.visit(p, arr.Indices())
	}

	// add all common elements
	w.fields = append(w.fields, fieldMetadata{
		Len:    int64(arr.Len()),
		Nulls:  int64(arr.NullN()),
		Offset: 0,
	})

	if arr.DataType().ID() == arrow.NULL {
		return nil
	}

	if internal.HasValidityBitmap(arr.DataType().ID(), flatbuf.MetadataVersion(currentMetadataVersion)) {
		switch arr.NullN() {
		case 0:
			// there are no null values, drop the null bitmap
			p.body = append(p.body, nil)
		default:
			data := arr.Data()
			var bitmap *memory.Buffer
			if data.NullN() == data.Len() {
				// every value is null, just use a new zero-initialized bitmap to avoid the expense of copying
				bitmap = memory.NewResizableBuffer(w.mem)
				minLength := paddedLength(bitutil.BytesForBits(int64(data.Len())), kArrowAlignment)
				bitmap.Resize(int(minLength))
			} else {
				// otherwise truncate and copy the bits
				bitmap = newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[0])
			}
			p.body = append(p.body, bitmap)
		}
	}

	switch dtype := arr.DataType().(type) {
	case *arrow.NullType:
		// ok. NullArrays are completely empty.

	case *arrow.BooleanType:
		var (
			data = arr.Data()
			bitm *memory.Buffer
		)

		if data.Len() != 0 {
			// boolean values are bit-packed, so truncate like a bitmap
			bitm = newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[1])
		}
		p.body = append(p.body, bitm)

	case arrow.FixedWidthDataType:
		data := arr.Data()
		values := data.Buffers()[1]
		arrLen := int64(arr.Len())
		typeWidth := int64(dtype.BitWidth() / 8)
		minLength := paddedLength(arrLen*typeWidth, kArrowAlignment)

		switch {
		case needTruncate(int64(data.Offset()), values, minLength):
			// non-zero offset: slice the buffer
			offset := int64(data.Offset()) * typeWidth
			// send padding if available
			// (note: `len` shadows the builtin within this case)
			len := minI64(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len())-offset)
			values = memory.NewBufferBytes(values.Bytes()[offset : offset+len])
		default:
			if values != nil {
				values.Retain()
			}
		}
		p.body = append(p.body, values)

	case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.StringType, *arrow.LargeStringType:
		arr := arr.(array.BinaryLike)
		voffsets := w.getZeroBasedValueOffsets(arr)
		data := arr.Data()
		values := data.Buffers()[2]

		var totalDataBytes int64
		if voffsets != nil {
			totalDataBytes = int64(len(arr.ValueBytes()))
		}

		switch {
		case needTruncate(int64(data.Offset()), values, totalDataBytes):
			// slice data buffer to include the range we need now.
			var (
				beg = arr.ValueOffset64(0)
				len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes))
			)
			values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len])
		default:
			if values != nil {
				values.Retain()
			}
		}
		p.body = append(p.body, voffsets)
		p.body = append(p.body, values)

	case *arrow.StructType:
		w.depth--
		arr := arr.(*array.Struct)
		for i := 0; i < arr.NumField(); i++ {
			err := w.visit(p, arr.Field(i))
			if err != nil {
				return fmt.Errorf("could not visit field %d of struct-array: %w", i, err)
			}
		}
		w.depth++

	case *arrow.SparseUnionType:
		offset, length := arr.Data().Offset(), arr.Len()
		arr := arr.(*array.SparseUnion)
		typeCodes := getTruncatedBuffer(int64(offset), int64(length), int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), arr.TypeCodes())
		p.body = append(p.body, typeCodes)

		w.depth--
		for i := 0; i < arr.NumFields(); i++ {
			err := w.visit(p, arr.Field(i))
			if err != nil {
				return fmt.Errorf("could not visit field %d of sparse union array: %w", i, err)
			}
		}
		w.depth++
	case *arrow.DenseUnionType:
		offset, length := arr.Data().Offset(), arr.Len()
		arr := arr.(*array.DenseUnion)
		typeCodes := getTruncatedBuffer(int64(offset), int64(length), int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), arr.TypeCodes())
		p.body = append(p.body, typeCodes)

		w.depth--
		dt := arr.UnionType()

		// union type codes are not necessarily 0-indexed
		maxCode := dt.MaxTypeCode()

		// allocate an array of child offsets. Set all to -1 to indicate we
		// haven't observed a first occurrence of a particular child yet
		offsets := make([]int32, maxCode+1)
		lengths := make([]int32, maxCode+1)
		offsets[0], lengths[0] = -1, 0
		// fill both slices with (-1, 0) by repeatedly doubling the
		// initialized prefix
		for i := 1; i < len(offsets); i *= 2 {
			copy(offsets[i:], offsets[:i])
			copy(lengths[i:], lengths[:i])
		}

		var valueOffsets *memory.Buffer
		if offset != 0 {
			// sliced union: offsets must be re-based per child
			valueOffsets = w.rebaseDenseUnionValueOffsets(arr, offsets, lengths)
		} else {
			valueOffsets = getTruncatedBuffer(int64(offset), int64(length), int32(arrow.Int32SizeBytes), arr.ValueOffsets())
		}
		p.body = append(p.body, valueOffsets)

		// visit children and slice accordingly
		for i := range dt.Fields() {
			child := arr.Field(i)
			// for sliced unions it's tricky to know how much to truncate
			// the children. For now we'll truncate the children to be
			// no longer than the parent union.

			if offset != 0 {
				code := dt.TypeCodes()[i]
				childOffset := offsets[code]
				childLen := lengths[code]

				if childOffset > 0 {
					child = array.NewSlice(child, int64(childOffset), int64(childOffset+childLen))
					defer child.Release()
				} else if childLen < int32(child.Len()) {
					child = array.NewSlice(child, 0, int64(childLen))
					defer child.Release()
				}
			}
			if err := w.visit(p, child); err != nil {
				return fmt.Errorf("could not visit field %d of dense union array: %w", i, err)
			}
		}
		w.depth++
	case *arrow.MapType, *arrow.ListType, *arrow.LargeListType:
		arr := arr.(array.ListLike)
		voffsets := w.getZeroBasedValueOffsets(arr)
		p.body = append(p.body, voffsets)

		w.depth--
		var (
			values        = arr.ListValues()
			mustRelease   = false
			values_offset int64
			values_end    int64
		)
		defer func() {
			if mustRelease {
				values.Release()
			}
		}()

		// the child values covered by this (possibly sliced) list
		if arr.Len() > 0 && voffsets != nil {
			values_offset, _ = arr.ValueOffsets(0)
			_, values_end = arr.ValueOffsets(arr.Len() - 1)
		}

		if arr.Len() != 0 || values_end < int64(values.Len()) {
			// must also slice the values
			values = array.NewSlice(values, values_offset, values_end)
			mustRelease = true
		}
		err := w.visit(p, values)

		if err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}
		w.depth++

	case *arrow.ListViewType, *arrow.LargeListViewType:
		data := arr.Data()
		arr := arr.(array.VarLenListLike)
		offsetTraits := arr.DataType().(arrow.OffsetsDataType).OffsetTypeTraits()
		rngOff, rngLen := array.RangeOfValuesUsed(arr)
		voffsets := w.getValueOffsetsAtBaseValue(arr, rngOff)
		p.body = append(p.body, voffsets)

		vsizes := data.Buffers()[2]
		if vsizes != nil {
			if data.Offset() != 0 || vsizes.Len() > offsetTraits.BytesRequired(arr.Len()) {
				// sliced: sub-slice the sizes buffer to this view only
				beg := offsetTraits.BytesRequired(data.Offset())
				end := beg + offsetTraits.BytesRequired(data.Len())
				vsizes = memory.NewBufferBytes(vsizes.Bytes()[beg:end])
			} else {
				vsizes.Retain()
			}
		}
		p.body = append(p.body, vsizes)

		w.depth--
		var (
			values        = arr.ListValues()
			mustRelease   = false
			values_offset = int64(rngOff)
			values_end    = int64(rngOff + rngLen)
		)
		defer func() {
			if mustRelease {
				values.Release()
			}
		}()

		if arr.Len() > 0 && values_end < int64(values.Len()) {
			// must also slice the values
			values = array.NewSlice(values, values_offset, values_end)
			mustRelease = true
		}
		err := w.visit(p, values)

		if err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}
		w.depth++

	case *arrow.FixedSizeListType:
		arr := arr.(*array.FixedSizeList)

		w.depth--

		// fixed-size lists have no offsets buffer; the child range is a
		// simple multiple of the list size
		size := int64(arr.DataType().(*arrow.FixedSizeListType).Len())
		beg := int64(arr.Offset()) * size
		end := int64(arr.Offset()+arr.Len()) * size

		values := array.NewSlice(arr.ListValues(), beg, end)
		defer values.Release()

		err := w.visit(p, values)

		if err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}
		w.depth++

	case *arrow.RunEndEncodedType:
		arr := arr.(*array.RunEndEncoded)
		w.depth--
		// write the run-ends and the values as two child arrays
		child := arr.LogicalRunEndsArray(w.mem)
		defer child.Release()
		if err := w.visit(p, child); err != nil {
			return err
		}
		child = arr.LogicalValuesArray()
		defer child.Release()
		if err := w.visit(p, child); err != nil {
			return err
		}
		w.depth++

	default:
		panic(fmt.Errorf("arrow/ipc: unknown array %T (dtype=%T)", arr, dtype))
	}

	return nil
}
| |
// getZeroBasedValueOffsets returns the value-offsets buffer for arr re-based
// so its first offset is zero, truncated to exactly len+1 offsets. When the
// array is unsliced and the buffer already has the right size, the existing
// buffer is retained and returned as-is. Returns nil when the array has no
// offsets buffer (e.g. zero length).
func (w *recordEncoder) getZeroBasedValueOffsets(arr arrow.Array) *memory.Buffer {
	data := arr.Data()
	voffsets := data.Buffers()[1]
	offsetTraits := arr.DataType().(arrow.OffsetsDataType).OffsetTypeTraits()
	offsetBytesNeeded := offsetTraits.BytesRequired(data.Len() + 1)

	if voffsets == nil || voffsets.Len() == 0 {
		return nil
	}

	// if we have a non-zero offset, then the value offsets do not start at
	// zero. we must a) create a new offsets array with shifted offsets and
	// b) slice the values array accordingly
	//
	// or if there are more value offsets than values (the array has been sliced)
	// we need to trim off the trailing offsets
	needsTruncateAndShift := data.Offset() != 0 || offsetBytesNeeded < voffsets.Len()

	if needsTruncateAndShift {
		shiftedOffsets := memory.NewResizableBuffer(w.mem)
		shiftedOffsets.Resize(offsetBytesNeeded)

		switch arr.DataType().Layout().Buffers[1].ByteWidth {
		case 8:
			// 64-bit offsets (Large* variants)
			dest := arrow.Int64Traits.CastFromBytes(shiftedOffsets.Bytes())
			offsets := arrow.Int64Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()+1]

			// subtract the first offset so the result starts at zero
			startOffset := offsets[0]
			for i, o := range offsets {
				dest[i] = o - startOffset
			}

		default:
			debug.Assert(arr.DataType().Layout().Buffers[1].ByteWidth == 4, "invalid offset bytewidth")
			// 32-bit offsets
			dest := arrow.Int32Traits.CastFromBytes(shiftedOffsets.Bytes())
			offsets := arrow.Int32Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()+1]

			startOffset := offsets[0]
			for i, o := range offsets {
				dest[i] = o - startOffset
			}
		}

		voffsets = shiftedOffsets
	} else {
		voffsets.Retain()
	}

	return voffsets
}
| |
// Truncates the offsets if needed and shifts the values if minOffset > 0.
// The offsets returned are corrected assuming the child values are truncated
// and now start at minOffset.
//
// This function only works on offset buffers of ListViews and LargeListViews.
// TODO(felipecrv): Unify this with getZeroBasedValueOffsets.
func (w *recordEncoder) getValueOffsetsAtBaseValue(arr arrow.Array, minOffset int) *memory.Buffer {
	data := arr.Data()
	voffsets := data.Buffers()[1]
	offsetTraits := arr.DataType().(arrow.OffsetsDataType).OffsetTypeTraits()
	// list-views have len offsets (not len+1 like lists)
	offsetBytesNeeded := offsetTraits.BytesRequired(data.Len())

	if voffsets == nil || voffsets.Len() == 0 {
		return nil
	}

	// truncate when the array is sliced or the buffer holds extra offsets;
	// shift when the referenced child values no longer start at zero
	needsTruncate := data.Offset() != 0 || offsetBytesNeeded < voffsets.Len()
	needsShift := minOffset > 0

	if needsTruncate || needsShift {
		shiftedOffsets := memory.NewResizableBuffer(w.mem)
		shiftedOffsets.Resize(offsetBytesNeeded)

		switch arr.DataType().Layout().Buffers[1].ByteWidth {
		case 8:
			// 64-bit offsets (LargeListView)
			dest := arrow.Int64Traits.CastFromBytes(shiftedOffsets.Bytes())
			offsets := arrow.Int64Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()]

			if minOffset > 0 {
				for i, o := range offsets {
					dest[i] = o - int64(minOffset)
				}
			} else {
				copy(dest, offsets)
			}
		default:
			debug.Assert(arr.DataType().Layout().Buffers[1].ByteWidth == 4, "invalid offset bytewidth")
			// 32-bit offsets (ListView)
			dest := arrow.Int32Traits.CastFromBytes(shiftedOffsets.Bytes())
			offsets := arrow.Int32Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()]

			if minOffset > 0 {
				for i, o := range offsets {
					dest[i] = o - int32(minOffset)
				}
			} else {
				copy(dest, offsets)
			}
		}

		voffsets = shiftedOffsets
	} else {
		voffsets.Retain()
	}

	return voffsets
}
| |
// rebaseDenseUnionValueOffsets computes value offsets for a sliced dense
// union, shifted so each child's first referenced element becomes offset 0.
// On return, offsets[c] holds the original starting offset observed for type
// code c (-1 if never seen) and lengths[c] holds the number of child values
// referenced; the caller uses both to slice the children accordingly.
func (w *recordEncoder) rebaseDenseUnionValueOffsets(arr *array.DenseUnion, offsets, lengths []int32) *memory.Buffer {
	// this case sucks. Because the offsets are different for each
	// child array, when we have a sliced array, we need to re-base
	// the value offsets for each array! ew.
	unshiftedOffsets := arr.RawValueOffsets()
	codes := arr.RawTypeCodes()

	shiftedOffsetsBuf := memory.NewResizableBuffer(w.mem)
	shiftedOffsetsBuf.Resize(arrow.Int32Traits.BytesRequired(arr.Len()))
	shiftedOffsets := arrow.Int32Traits.CastFromBytes(shiftedOffsetsBuf.Bytes())

	// compute shifted offsets by subtracting child offset
	for i, c := range codes {
		if offsets[c] == -1 {
			// offsets are guaranteed to be increasing according to the spec
			// so the first offset we find for a child is the initial offset
			// and will become the "0" for this child.
			offsets[c] = unshiftedOffsets[i]
			shiftedOffsets[i] = 0
		} else {
			shiftedOffsets[i] = unshiftedOffsets[i] - offsets[c]
		}
		// track how many child values this slice references per type code
		lengths[c] = maxI32(lengths[c], shiftedOffsets[i]+1)
	}
	return shiftedOffsetsBuf
}
| |
| func (w *recordEncoder) Encode(p *Payload, rec arrow.Record) error { |
| if err := w.encode(p, rec); err != nil { |
| return err |
| } |
| return w.encodeMetadata(p, rec.NumRows()) |
| } |
| |
// encodeMetadata builds the flatbuffer record-batch message header for p
// from the field and buffer metadata gathered during encode.
func (w *recordEncoder) encodeMetadata(p *Payload, nrows int64) error {
	p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta, w.codec)
	return nil
}
| |
| func newTruncatedBitmap(mem memory.Allocator, offset, length int64, input *memory.Buffer) *memory.Buffer { |
| if input == nil { |
| return nil |
| } |
| |
| minLength := paddedLength(bitutil.BytesForBits(length), kArrowAlignment) |
| switch { |
| case offset != 0 || minLength < int64(input.Len()): |
| // with a sliced array / non-zero offset, we must copy the bitmap |
| buf := memory.NewResizableBuffer(mem) |
| buf.Resize(int(minLength)) |
| bitutil.CopyBitmap(input.Bytes(), int(offset), int(length), buf.Bytes(), 0) |
| return buf |
| default: |
| input.Retain() |
| return input |
| } |
| } |
| |
| func getTruncatedBuffer(offset, length int64, byteWidth int32, buf *memory.Buffer) *memory.Buffer { |
| if buf == nil { |
| return buf |
| } |
| |
| paddedLen := paddedLength(length*int64(byteWidth), kArrowAlignment) |
| if offset != 0 || paddedLen < int64(buf.Len()) { |
| return memory.SliceBuffer(buf, int(offset*int64(byteWidth)), int(minI64(paddedLen, int64(buf.Len())))) |
| } |
| buf.Retain() |
| return buf |
| } |
| |
| func needTruncate(offset int64, buf *memory.Buffer, minLength int64) bool { |
| if buf == nil { |
| return false |
| } |
| return offset != 0 || minLength < int64(buf.Len()) |
| } |
| |
// minI64 returns the smaller of a and b.
func minI64(a, b int64) int64 {
	if b < a {
		return b
	}
	return a
}
| |
// maxI32 returns the larger of a and b.
func maxI32(a, b int32) int32 {
	if b > a {
		return b
	}
	return a
}