| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pqarrow |
| |
| import ( |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "reflect" |
| "sync" |
| "sync/atomic" |
| "time" |
| "unsafe" |
| |
| "github.com/apache/arrow/go/v17/arrow" |
| "github.com/apache/arrow/go/v17/arrow/array" |
| "github.com/apache/arrow/go/v17/arrow/bitutil" |
| "github.com/apache/arrow/go/v17/arrow/decimal128" |
| "github.com/apache/arrow/go/v17/arrow/decimal256" |
| "github.com/apache/arrow/go/v17/arrow/memory" |
| "github.com/apache/arrow/go/v17/internal/utils" |
| "github.com/apache/arrow/go/v17/parquet" |
| "github.com/apache/arrow/go/v17/parquet/file" |
| "github.com/apache/arrow/go/v17/parquet/schema" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
| // column reader for leaf columns (non-nested) |
| type leafReader struct { |
| out *arrow.Chunked |
| rctx *readerCtx |
| field *arrow.Field |
| input *columnIterator |
| descr *schema.Column |
| recordRdr file.RecordReader |
| props ArrowReadProperties |
| |
| refCount int64 |
| } |
| |
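| // newLeafReader constructs a ColumnReader for a non-nested leaf column and |
| // advances to the first row group so the record reader has a page reader to pull from. |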
| func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties, bufferPool *sync.Pool) (*ColumnReader, error) { |
| ret := &leafReader{ |
| rctx: rctx, |
| field: field, |
| input: input, |
| descr: input.Descr(), |
| recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type, rctx.mem, bufferPool), |
| props: props, |
| refCount: 1, |
| } |
| err := ret.nextRowGroup() |
| return &ColumnReader{ret}, err |
| } |
| |
| func (lr *leafReader) Retain() { |
| atomic.AddInt64(&lr.refCount, 1) |
| } |
| |
| func (lr *leafReader) Release() { |
| if atomic.AddInt64(&lr.refCount, -1) == 0 { |
| lr.releaseOut() |
| if lr.recordRdr != nil { |
| lr.recordRdr.Release() |
| lr.recordRdr = nil |
| } |
| } |
| } |
| |
| func (lr *leafReader) GetDefLevels() ([]int16, error) { |
| return lr.recordRdr.DefLevels()[:int(lr.recordRdr.LevelsPos())], nil |
| } |
| |
| func (lr *leafReader) GetRepLevels() ([]int16, error) { |
| return lr.recordRdr.RepLevels()[:int(lr.recordRdr.LevelsPos())], nil |
| } |
| |
| func (lr *leafReader) IsOrHasRepeatedChild() bool { return false } |
| |
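| // LoadBatch reads up to nrecords records into the record reader, advancing to the |
| // next row group whenever the current one is exhausted, then transfers the decoded |
| // values into lr.out as a chunked array. |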
| func (lr *leafReader) LoadBatch(nrecords int64) (err error) { |
| lr.releaseOut() |
| lr.recordRdr.Reset() |
| |
| if err := lr.recordRdr.Reserve(nrecords); err != nil { |
| return err |
| } |
| for nrecords > 0 { |
| if !lr.recordRdr.HasMore() { |
| break |
| } |
| numRead, err := lr.recordRdr.ReadRecords(nrecords) |
| if err != nil { |
| return err |
| } |
| nrecords -= numRead |
| if numRead == 0 { |
| if err = lr.nextRowGroup(); err != nil { |
| return err |
| } |
| } |
| } |
| lr.out, err = transferColumnData(lr.recordRdr, lr.field.Type, lr.descr) |
| return |
| } |
| |
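| // BuildArray hands the chunked array produced by the last LoadBatch call to the |
| // caller, transferring ownership; the length bound argument is unused for leaf columns. |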
| func (lr *leafReader) BuildArray(int64) (*arrow.Chunked, error) { |
| return lr.clearOut(), nil |
| } |
| |
| // releaseOut clears lr.out and releases the previous value if it was non-nil |
| func (lr *leafReader) releaseOut() { |
| if out := lr.clearOut(); out != nil { |
| out.Release() |
| } |
| } |
| |
| // clearOut clears lr.out and returns the previous value |
| func (lr *leafReader) clearOut() (out *arrow.Chunked) { |
| out, lr.out = lr.out, nil |
| return out |
| } |
| |
| func (lr *leafReader) Field() *arrow.Field { return lr.field } |
| |
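| // nextRowGroup advances the column iterator to the column chunk of the next |
| // row group and points the record reader at its page reader. |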
| func (lr *leafReader) nextRowGroup() error { |
| pr, err := lr.input.NextChunk() |
| if err != nil { |
| return err |
| } |
| lr.recordRdr.SetPageReader(pr) |
| return nil |
| } |
| |
| // column reader for struct arrays; it holds a reader for each child, which |
| // may themselves be nested or leaf columns. |
| type structReader struct { |
| rctx *readerCtx |
| filtered *arrow.Field |
| levelInfo file.LevelInfo |
| children []*ColumnReader |
| defRepLevelChild *ColumnReader |
| hasRepeatedChild bool |
| props ArrowReadProperties |
| |
| refCount int64 |
| } |
| |
| func (sr *structReader) Retain() { |
| atomic.AddInt64(&sr.refCount, 1) |
| } |
| |
| func (sr *structReader) Release() { |
| if atomic.AddInt64(&sr.refCount, -1) == 0 { |
| if sr.defRepLevelChild != nil { |
| sr.defRepLevelChild.Release() |
| sr.defRepLevelChild = nil |
| } |
| for _, c := range sr.children { |
| c.Release() |
| } |
| sr.children = nil |
| } |
| } |
| |
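| // newStructReader constructs a ColumnReader over the already-constructed child |
| // readers of a struct column, choosing one child to supply the def/rep levels |
| // used to reconstruct the struct's validity bitmap. |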
| func newStructReader(rctx *readerCtx, filtered *arrow.Field, levelInfo file.LevelInfo, children []*ColumnReader, props ArrowReadProperties) *ColumnReader { |
| ret := &structReader{ |
| rctx: rctx, |
| filtered: filtered, |
| levelInfo: levelInfo, |
| children: children, |
| props: props, |
| refCount: 1, |
| } |
| |
| // the children may be a mix of repeated and non-repeated fields. If possible, |
| // pick one that isn't repeated, since it is guaranteed to have the fewest |
| // levels needed to reconstruct a nullability bitmap |
| for _, child := range children { |
| if !child.IsOrHasRepeatedChild() { |
| ret.defRepLevelChild = child |
| break |
| } |
| } |
| |
| if ret.defRepLevelChild == nil { |
| ret.defRepLevelChild = children[0] |
| ret.hasRepeatedChild = true |
| } |
| ret.defRepLevelChild.Retain() |
| return &ColumnReader{ret} |
| } |
| |
| func (sr *structReader) IsOrHasRepeatedChild() bool { return sr.hasRepeatedChild } |
| |
| func (sr *structReader) GetDefLevels() ([]int16, error) { |
| if len(sr.children) == 0 { |
| return nil, errors.New("struct reader has no children") |
| } |
| |
| // this method should only be called when this struct or one of its parents |
| // is optional/repeated or has a repeated child, meaning all children |
| // must have rep/def levels associated with them |
| return sr.defRepLevelChild.GetDefLevels() |
| } |
| |
| func (sr *structReader) GetRepLevels() ([]int16, error) { |
| if len(sr.children) == 0 { |
| return nil, errors.New("struct reader has no children") |
| } |
| |
| // this method should only be called when this struct or one of its parents |
| // is optional/repeated or has a repeated child, meaning all children |
| // must have rep/def levels associated with them |
| return sr.defRepLevelChild.GetRepLevels() |
| } |
| |
| func (sr *structReader) LoadBatch(nrecords int64) error { |
| // Load batches in parallel. |
| // When reading structs with large numbers of columns, the serial load is very slow. |
| // This is especially true when reading from Cloud Storage. Loading concurrently |
| // greatly improves performance. |
| g := new(errgroup.Group) |
| if !sr.props.Parallel { |
| g.SetLimit(1) |
| } |
| for _, rdr := range sr.children { |
| rdr := rdr |
| g.Go(func() error { |
| return rdr.LoadBatch(nrecords) |
| }) |
| } |
| |
| return g.Wait() |
| } |
| |
| func (sr *structReader) Field() *arrow.Field { return sr.filtered } |
| |
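| // BuildArray reconstructs the struct's validity bitmap from the chosen child's |
| // def/rep levels (when the field is nullable or has a repeated child), builds each |
| // child array, and assembles them into a single-chunk struct array. |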
| func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) { |
| validityIO := file.ValidityBitmapInputOutput{ |
| ReadUpperBound: lenBound, |
| Read: lenBound, |
| } |
| |
| var nullBitmap *memory.Buffer |
| |
| if lenBound > 0 && (sr.hasRepeatedChild || sr.filtered.Nullable) { |
| nullBitmap = memory.NewResizableBuffer(sr.rctx.mem) |
| nullBitmap.Resize(int(bitutil.BytesForBits(lenBound))) |
| defer nullBitmap.Release() |
| validityIO.ValidBits = nullBitmap.Bytes() |
| defLevels, err := sr.GetDefLevels() |
| if err != nil { |
| return nil, err |
| } |
| |
| if sr.hasRepeatedChild { |
| repLevels, err := sr.GetRepLevels() |
| if err != nil { |
| return nil, err |
| } |
| |
| if err := file.DefRepLevelsToBitmap(defLevels, repLevels, sr.levelInfo, &validityIO); err != nil { |
| return nil, err |
| } |
| } else { |
| file.DefLevelsToBitmap(defLevels, sr.levelInfo, &validityIO) |
| } |
| } |
| |
| if nullBitmap != nil { |
| nullBitmap.Resize(int(bitutil.BytesForBits(validityIO.Read))) |
| } |
| |
| childArrData := make([]arrow.ArrayData, len(sr.children)) |
| defer releaseArrayData(childArrData) |
| // gather the child arrays and def levels |
| for i, child := range sr.children { |
| field, err := child.BuildArray(lenBound) |
| if err != nil { |
| return nil, err |
| } |
| |
| childArrData[i], err = chunksToSingle(field) |
| field.Release() // release field before checking the error |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| if !sr.filtered.Nullable && !sr.hasRepeatedChild { |
| validityIO.Read = int64(childArrData[0].Len()) |
| } |
| |
| buffers := make([]*memory.Buffer, 1) |
| if validityIO.NullCount > 0 { |
| buffers[0] = nullBitmap |
| } |
| |
| data := array.NewData(sr.filtered.Type, int(validityIO.Read), buffers, childArrData, int(validityIO.NullCount), 0) |
| defer data.Release() |
| arr := array.NewStructData(data) |
| defer arr.Release() |
| return arrow.NewChunked(sr.filtered.Type, []arrow.Array{arr}), nil |
| } |
| |
| // column reader for repeated columns, specifically for list arrays |
| type listReader struct { |
| rctx *readerCtx |
| field *arrow.Field |
| info file.LevelInfo |
| itemRdr *ColumnReader |
| props ArrowReadProperties |
| refCount int64 |
| } |
| |
| func newListReader(rctx *readerCtx, field *arrow.Field, info file.LevelInfo, childRdr *ColumnReader, props ArrowReadProperties) *ColumnReader { |
| childRdr.Retain() |
| return &ColumnReader{&listReader{rctx, field, info, childRdr, props, 1}} |
| } |
| |
| func (lr *listReader) Retain() { |
| atomic.AddInt64(&lr.refCount, 1) |
| } |
| |
| func (lr *listReader) Release() { |
| if atomic.AddInt64(&lr.refCount, -1) == 0 { |
| if lr.itemRdr != nil { |
| lr.itemRdr.Release() |
| lr.itemRdr = nil |
| } |
| } |
| } |
| |
| func (lr *listReader) GetDefLevels() ([]int16, error) { |
| return lr.itemRdr.GetDefLevels() |
| } |
| |
| func (lr *listReader) GetRepLevels() ([]int16, error) { |
| return lr.itemRdr.GetRepLevels() |
| } |
| |
| func (lr *listReader) Field() *arrow.Field { return lr.field } |
| |
| func (lr *listReader) IsOrHasRepeatedChild() bool { return true } |
| |
| func (lr *listReader) LoadBatch(nrecords int64) error { |
| return lr.itemRdr.LoadBatch(nrecords) |
| } |
| |
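| // BuildArray converts the child's def/rep levels into list offsets and a validity |
| // bitmap, builds the child item array, and assembles the result into a single-chunk |
| // list array (validating element counts when the type is a fixed-size list). |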
| func (lr *listReader) BuildArray(lenBound int64) (*arrow.Chunked, error) { |
| var ( |
| defLevels []int16 |
| repLevels []int16 |
| err error |
| validityBuffer *memory.Buffer |
| ) |
| |
| if defLevels, err = lr.itemRdr.GetDefLevels(); err != nil { |
| return nil, err |
| } |
| if repLevels, err = lr.itemRdr.GetRepLevels(); err != nil { |
| return nil, err |
| } |
| |
| validityIO := file.ValidityBitmapInputOutput{ReadUpperBound: lenBound} |
| if lr.field.Nullable { |
| validityBuffer = memory.NewResizableBuffer(lr.rctx.mem) |
| validityBuffer.Resize(int(bitutil.BytesForBits(lenBound))) |
| defer validityBuffer.Release() |
| validityIO.ValidBits = validityBuffer.Bytes() |
| } |
| offsetsBuffer := memory.NewResizableBuffer(lr.rctx.mem) |
| offsetsBuffer.Resize(arrow.Int32Traits.BytesRequired(int(lenBound) + 1)) |
| defer offsetsBuffer.Release() |
| |
| offsetData := arrow.Int32Traits.CastFromBytes(offsetsBuffer.Bytes()) |
| if err = file.DefRepLevelsToListInfo(defLevels, repLevels, lr.info, &validityIO, offsetData); err != nil { |
| return nil, err |
| } |
| |
| // if there are nulls and the child (itemRdr) is a nested type like a list, |
| // its BuildArray needs to account for them via the definition levels when |
| // building out its bitmap, so reserve space for the worst case: the value |
| // of the last offset + the null count |
| arr, err := lr.itemRdr.BuildArray(int64(offsetData[int(validityIO.Read)]) + validityIO.NullCount) |
| if err != nil { |
| return nil, err |
| } |
| defer arr.Release() |
| |
| // resize to the actual number of elements returned |
| offsetsBuffer.Resize(arrow.Int32Traits.BytesRequired(int(validityIO.Read) + 1)) |
| if validityBuffer != nil { |
| validityBuffer.Resize(int(bitutil.BytesForBits(validityIO.Read))) |
| } |
| |
| item, err := chunksToSingle(arr) |
| if err != nil { |
| return nil, err |
| } |
| defer item.Release() |
| |
| buffers := []*memory.Buffer{nil, offsetsBuffer} |
| if validityIO.NullCount > 0 { |
| buffers[0] = validityBuffer |
| } |
| |
| data := array.NewData(lr.field.Type, int(validityIO.Read), buffers, []arrow.ArrayData{item}, int(validityIO.NullCount), 0) |
| defer data.Release() |
| if lr.field.Type.ID() == arrow.FIXED_SIZE_LIST { |
| defer data.Buffers()[1].Release() |
| listSize := lr.field.Type.(*arrow.FixedSizeListType).Len() |
| for x := 1; x < data.Len(); x++ { |
| size := offsetData[x] - offsetData[x-1] |
| if size != listSize { |
| return nil, fmt.Errorf("expected all lists to be of size=%d, but index %d had size=%d", listSize, x, size) |
| } |
| } |
| data.Buffers()[1] = nil |
| } |
| out := array.MakeFromData(data) |
| defer out.Release() |
| return arrow.NewChunked(lr.field.Type, []arrow.Array{out}), nil |
| } |
| |
| // column reader logic for fixed-size lists instead of variable-length ones. |
| type fixedSizeListReader struct { |
| listReader |
| } |
| |
| func newFixedSizeListReader(rctx *readerCtx, field *arrow.Field, info file.LevelInfo, childRdr *ColumnReader, props ArrowReadProperties) *ColumnReader { |
| childRdr.Retain() |
| return &ColumnReader{&fixedSizeListReader{listReader{rctx, field, info, childRdr, props, 1}}} |
| } |
| |
| // helper function to combine chunks into a single array. |
| // |
| // nested data conversion for chunked array outputs not yet implemented |
| func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) { |
| switch len(chunked.Chunks()) { |
| case 0: |
| return array.NewData(chunked.DataType(), 0, []*memory.Buffer{nil, nil}, nil, 0, 0), nil |
| case 1: |
| data := chunked.Chunk(0).Data() |
| data.Retain() // we pass control to the caller |
| return data, nil |
| default: // if an item reader yields a chunked array, this is not yet implemented |
| return nil, arrow.ErrNotImplemented |
| } |
| } |
| |
| // create a chunked arrow array from the raw record data |
| func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr *schema.Column) (*arrow.Chunked, error) { |
| dt := valueType |
| if valueType.ID() == arrow.EXTENSION { |
| dt = valueType.(arrow.ExtensionType).StorageType() |
| } |
| |
| var data arrow.ArrayData |
| switch dt.ID() { |
| case arrow.DICTIONARY: |
| return transferDictionary(rdr, valueType), nil |
| case arrow.NULL: |
| return arrow.NewChunked(arrow.Null, []arrow.Array{array.NewNull(rdr.ValuesWritten())}), nil |
| case arrow.INT32, arrow.INT64, arrow.FLOAT32, arrow.FLOAT64: |
| data = transferZeroCopy(rdr, valueType) // can just reference the raw data without copying |
| case arrow.BOOL: |
| data = transferBool(rdr) |
| case arrow.UINT8, |
| arrow.UINT16, |
| arrow.UINT32, |
| arrow.UINT64, |
| arrow.INT8, |
| arrow.INT16, |
| arrow.DATE32, |
| arrow.TIME32, |
| arrow.TIME64: |
| data = transferInt(rdr, valueType) |
| case arrow.DATE64: |
| data = transferDate64(rdr, valueType) |
| case arrow.FIXED_SIZE_BINARY, arrow.BINARY, arrow.STRING, arrow.LARGE_BINARY, arrow.LARGE_STRING: |
| return transferBinary(rdr, valueType), nil |
| case arrow.DECIMAL, arrow.DECIMAL256: |
| switch descr.PhysicalType() { |
| case parquet.Types.Int32, parquet.Types.Int64: |
| data = transferDecimalInteger(rdr, valueType) |
| case parquet.Types.ByteArray, parquet.Types.FixedLenByteArray: |
| return transferDecimalBytes(rdr.(file.BinaryRecordReader), valueType) |
| default: |
| return nil, errors.New("physical type for decimal128/decimal256 must be int32, int64, bytearray or fixed len byte array") |
| } |
| case arrow.TIMESTAMP: |
| tstype := valueType.(*arrow.TimestampType) |
| switch tstype.Unit { |
| case arrow.Millisecond, arrow.Microsecond: |
| data = transferZeroCopy(rdr, valueType) |
| case arrow.Nanosecond: |
| if descr.PhysicalType() == parquet.Types.Int96 { |
| data = transferInt96(rdr, valueType) |
| } else { |
| data = transferZeroCopy(rdr, valueType) |
| } |
| default: |
| return nil, errors.New("time unit not supported") |
| } |
| case arrow.FLOAT16: |
| if descr.PhysicalType() != parquet.Types.FixedLenByteArray { |
| return nil, errors.New("physical type for float16 must be fixed len byte array") |
| } |
| if expectedLen := arrow.Float16SizeBytes; descr.TypeLength() != expectedLen { |
| return nil, fmt.Errorf("fixed len byte array length for float16 must be %d", expectedLen) |
| } |
| return transferBinary(rdr, valueType), nil |
| default: |
| return nil, fmt.Errorf("no support for reading columns of type: %s", valueType.Name()) |
| } |
| |
| defer data.Release() |
| arr := array.MakeFromData(data) |
| defer arr.Release() |
| return arrow.NewChunked(valueType, []arrow.Array{arr}), nil |
| } |
| |
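| // transferZeroCopy wraps the record reader's released validity and value buffers |
| // in a new ArrayData without copying; the deferred releases only drop this |
| // function's references once NewData has retained the buffers. |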
| func transferZeroCopy(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData { |
| bitmap := rdr.ReleaseValidBits() |
| values := rdr.ReleaseValues() |
| defer func() { |
| if bitmap != nil { |
| bitmap.Release() |
| } |
| if values != nil { |
| values.Release() |
| } |
| }() |
| |
| return array.NewData(dt, rdr.ValuesWritten(), |
| []*memory.Buffer{bitmap, values}, |
| nil, int(rdr.NullCount()), 0) |
| } |
| |
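| // transferBinary produces a chunked array for binary-like and extension types, |
| // going through the dictionary path when the column was read as dictionary-encoded. |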
| func transferBinary(rdr file.RecordReader, dt arrow.DataType) *arrow.Chunked { |
| brdr := rdr.(file.BinaryRecordReader) |
| if brdr.ReadDictionary() { |
| return transferDictionary(brdr, &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: dt}) |
| } |
| chunks := brdr.GetBuilderChunks() |
| defer releaseArrays(chunks) |
| |
| switch dt := dt.(type) { |
| case arrow.ExtensionType: |
| for idx, chunk := range chunks { |
| chunks[idx] = array.NewExtensionArrayWithStorage(dt, chunk) |
| chunk.Release() |
| } |
| case *arrow.StringType, *arrow.LargeStringType: |
| for idx, chunk := range chunks { |
| chunks[idx] = array.MakeFromData(chunk.Data()) |
| chunk.Release() |
| } |
| case *arrow.Float16Type: |
| for idx, chunk := range chunks { |
| data := chunk.Data() |
| f16Data := array.NewData(dt, data.Len(), data.Buffers(), nil, data.NullN(), data.Offset()) |
| defer f16Data.Release() |
| chunks[idx] = array.NewFloat16Data(f16Data) |
| chunk.Release() |
| } |
| } |
| return arrow.NewChunked(dt, chunks) |
| } |
| |
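| // transferInt copies Parquet's int32/int64 physical values element by element, |
| // via reflection, into a buffer of the requested Arrow integer, date, or time type. |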
| func transferInt(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData { |
| var output reflect.Value |
| |
| signed := true |
| // create a buffer of the proper type since Parquet only has int32 and int64 |
| // physical representations, but we want the correct type representation |
| // for Arrow's in-memory buffer |
| data := make([]byte, rdr.ValuesWritten()*int(bitutil.BytesForBits(int64(dt.(arrow.FixedWidthDataType).BitWidth())))) |
| switch dt.ID() { |
| case arrow.INT8: |
| output = reflect.ValueOf(arrow.Int8Traits.CastFromBytes(data)) |
| case arrow.UINT8: |
| signed = false |
| output = reflect.ValueOf(arrow.Uint8Traits.CastFromBytes(data)) |
| case arrow.INT16: |
| output = reflect.ValueOf(arrow.Int16Traits.CastFromBytes(data)) |
| case arrow.UINT16: |
| signed = false |
| output = reflect.ValueOf(arrow.Uint16Traits.CastFromBytes(data)) |
| case arrow.UINT32: |
| signed = false |
| output = reflect.ValueOf(arrow.Uint32Traits.CastFromBytes(data)) |
| case arrow.UINT64: |
| signed = false |
| output = reflect.ValueOf(arrow.Uint64Traits.CastFromBytes(data)) |
| case arrow.DATE32: |
| output = reflect.ValueOf(arrow.Date32Traits.CastFromBytes(data)) |
| case arrow.TIME32: |
| output = reflect.ValueOf(arrow.Time32Traits.CastFromBytes(data)) |
| case arrow.TIME64: |
| output = reflect.ValueOf(arrow.Time64Traits.CastFromBytes(data)) |
| } |
| |
| length := rdr.ValuesWritten() |
| // copy the values semantically with the correct types |
| switch rdr.Type() { |
| case parquet.Types.Int32: |
| values := arrow.Int32Traits.CastFromBytes(rdr.Values()) |
| if signed { |
| for idx, v := range values[:length] { |
| output.Index(idx).SetInt(int64(v)) |
| } |
| } else { |
| for idx, v := range values[:length] { |
| output.Index(idx).SetUint(uint64(v)) |
| } |
| } |
| case parquet.Types.Int64: |
| values := arrow.Int64Traits.CastFromBytes(rdr.Values()) |
| if signed { |
| for idx, v := range values[:length] { |
| output.Index(idx).SetInt(v) |
| } |
| } else { |
| for idx, v := range values[:length] { |
| output.Index(idx).SetUint(uint64(v)) |
| } |
| } |
| } |
| |
| bitmap := rdr.ReleaseValidBits() |
| if bitmap != nil { |
| defer bitmap.Release() |
| } |
| |
| return array.NewData(dt, rdr.ValuesWritten(), []*memory.Buffer{ |
| bitmap, memory.NewBufferBytes(data), |
| }, nil, int(rdr.NullCount()), 0) |
| } |
| |
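| // transferBool re-packs the record reader's one-byte-per-value booleans into the |
| // bit-packed representation Arrow expects. |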
| func transferBool(rdr file.RecordReader) arrow.ArrayData { |
| // TODO(mtopol): optimize this so we don't convert bitmap to []bool back to bitmap |
| length := rdr.ValuesWritten() |
| data := make([]byte, int(bitutil.BytesForBits(int64(length)))) |
| bytedata := rdr.Values() |
| values := *(*[]bool)(unsafe.Pointer(&bytedata)) |
| |
| for idx, v := range values[:length] { |
| if v { |
| bitutil.SetBit(data, idx) |
| } |
| } |
| |
| bitmap := rdr.ReleaseValidBits() |
| if bitmap != nil { |
| defer bitmap.Release() |
| } |
| bb := memory.NewBufferBytes(data) |
| defer bb.Release() |
| return array.NewData(&arrow.BooleanType{}, length, []*memory.Buffer{ |
| bitmap, bb, |
| }, nil, int(rdr.NullCount()), 0) |
| } |
| |
| var milliPerDay = (24 * time.Hour).Milliseconds() |
| |
| // the Parquet data backing an Arrow Date64 column is a 32-bit count of days |
| // since the epoch, so convert each value to milliseconds for Date64 |
| func transferDate64(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData { |
| length := rdr.ValuesWritten() |
| values := arrow.Int32Traits.CastFromBytes(rdr.Values()) |
| |
| data := make([]byte, arrow.Int64Traits.BytesRequired(length)) |
| out := arrow.Int64Traits.CastFromBytes(data) |
| for idx, val := range values[:length] { |
| out[idx] = int64(val) * milliPerDay |
| } |
| |
| bitmap := rdr.ReleaseValidBits() |
| if bitmap != nil { |
| defer bitmap.Release() |
| } |
| return array.NewData(dt, length, []*memory.Buffer{ |
| bitmap, memory.NewBufferBytes(data), |
| }, nil, int(rdr.NullCount()), 0) |
| } |
| |
| // coerce int96 to nanosecond timestamp |
| func transferInt96(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData { |
| length := rdr.ValuesWritten() |
| values := parquet.Int96Traits.CastFromBytes(rdr.Values()) |
| |
| data := make([]byte, arrow.Int64SizeBytes*length) |
| out := arrow.Int64Traits.CastFromBytes(data) |
| |
| for idx, val := range values[:length] { |
| if binary.LittleEndian.Uint32(val[8:]) == 0 { |
| out[idx] = 0 |
| } else { |
| out[idx] = val.ToTime().UnixNano() |
| } |
| } |
| |
| bitmap := rdr.ReleaseValidBits() |
| if bitmap != nil { |
| defer bitmap.Release() |
| } |
| return array.NewData(dt, length, []*memory.Buffer{ |
| bitmap, memory.NewBufferBytes(data), |
| }, nil, int(rdr.NullCount()), 0) |
| } |
| |
| // convert physical integer storage of a decimal logical type to a decimal128 or decimal256 typed array |
| func transferDecimalInteger(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData { |
| length := rdr.ValuesWritten() |
| |
| var values reflect.Value |
| switch rdr.Type() { |
| case parquet.Types.Int32: |
| values = reflect.ValueOf(arrow.Int32Traits.CastFromBytes(rdr.Values())[:length]) |
| case parquet.Types.Int64: |
| values = reflect.ValueOf(arrow.Int64Traits.CastFromBytes(rdr.Values())[:length]) |
| } |
| |
| var data []byte |
| switch dt.ID() { |
| case arrow.DECIMAL128: |
| data = make([]byte, arrow.Decimal128Traits.BytesRequired(length)) |
| out := arrow.Decimal128Traits.CastFromBytes(data) |
| for i := 0; i < values.Len(); i++ { |
| out[i] = decimal128.FromI64(values.Index(i).Int()) |
| } |
| case arrow.DECIMAL256: |
| data = make([]byte, arrow.Decimal256Traits.BytesRequired(length)) |
| out := arrow.Decimal256Traits.CastFromBytes(data) |
| for i := 0; i < values.Len(); i++ { |
| out[i] = decimal256.FromI64(values.Index(i).Int()) |
| } |
| } |
| |
| var nullmap *memory.Buffer |
| if rdr.NullCount() > 0 { |
| nullmap = rdr.ReleaseValidBits() |
| defer nullmap.Release() |
| } |
| return array.NewData(dt, length, []*memory.Buffer{ |
| nullmap, memory.NewBufferBytes(data), |
| }, nil, int(rdr.NullCount()), 0) |
| } |
| |
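| // uint64FromBigEndianShifted interprets up to 8 big-endian bytes as the |
| // low-order bytes of a uint64. |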
| func uint64FromBigEndianShifted(buf []byte) uint64 { |
| var bytes [8]byte |
| copy(bytes[8-len(buf):], buf) |
| return binary.BigEndian.Uint64(bytes[:]) |
| } |
| |
| // Parquet's defined encoding for decimal data is to write it as big-endian |
| // bytes, so convert the big-endian byte order to a decimal128 |
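| // (for example, the two-byte big-endian input 0xFE 0xD4 decodes to -300) |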
| func bigEndianToDecimal128(buf []byte) (decimal128.Num, error) { |
| const ( |
| minDecimalBytes = 1 |
| maxDecimalBytes = 16 |
| ) |
| |
| if len(buf) < minDecimalBytes || len(buf) > maxDecimalBytes { |
| return decimal128.Num{}, fmt.Errorf("length of byte array passed to bigEndianToDecimal128 was %d but must be between %d and %d", |
| len(buf), minDecimalBytes, maxDecimalBytes) |
| } |
| |
| // bytes are big endian so first byte is MSB and holds the sign bit |
| isNeg := int8(buf[0]) < 0 |
| |
| // 1. extract high bits |
| highBitsOffset := utils.Max(0, len(buf)-8) |
| var ( |
| highBits uint64 |
| lowBits uint64 |
| hi int64 |
| lo int64 |
| ) |
| highBits = uint64FromBigEndianShifted(buf[:highBitsOffset]) |
| |
| if highBitsOffset == 8 { |
| hi = int64(highBits) |
| } else { |
| if isNeg && len(buf) < maxDecimalBytes { |
| hi = -1 |
| } |
| |
| hi = int64(uint64(hi) << (uint64(highBitsOffset) * 8)) |
| hi |= int64(highBits) |
| } |
| |
| // 2. extract lower bits |
| lowBitsOffset := utils.Min(len(buf), 8) |
| lowBits = uint64FromBigEndianShifted(buf[highBitsOffset:]) |
| |
| if lowBitsOffset == 8 { |
| lo = int64(lowBits) |
| } else { |
| if isNeg && len(buf) < 8 { |
| lo = -1 |
| } |
| |
| lo = int64(uint64(lo) << (uint64(lowBitsOffset) * 8)) |
| lo |= int64(lowBits) |
| } |
| |
| return decimal128.New(hi, uint64(lo)), nil |
| } |
| |
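| // bigEndianToDecimal256 converts between 1 and 32 big-endian bytes into a |
| // decimal256.Num, sign-extending negative values to the full 256 bits. |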
| func bigEndianToDecimal256(buf []byte) (decimal256.Num, error) { |
| const ( |
| minDecimalBytes = 1 |
| maxDecimalBytes = 32 |
| ) |
| |
| if len(buf) < minDecimalBytes || len(buf) > maxDecimalBytes { |
| return decimal256.Num{}, |
| fmt.Errorf("%w: length of byte array for bigEndianToDecimal256 was %d but must be between %d and %d", |
| arrow.ErrInvalid, len(buf), minDecimalBytes, maxDecimalBytes) |
| } |
| |
| var littleEndian [4]uint64 |
| // bytes are coming in big-endian, so the first byte is the MSB and |
| // therefore holds the sign bit |
| initWord, isNeg := uint64(0), int8(buf[0]) < 0 |
| if isNeg { |
| // sign extend if necessary |
| initWord = uint64(0xFFFFFFFFFFFFFFFF) |
| } |
| |
| for wordIdx := 0; wordIdx < 4; wordIdx++ { |
| wordLen := utils.Min(len(buf), arrow.Uint64SizeBytes) |
| word := buf[len(buf)-wordLen:] |
| |
| if wordLen == 8 { |
| // full words can be assigned as-is |
| littleEndian[wordIdx] = binary.BigEndian.Uint64(word) |
| } else { |
| result := initWord |
| if len(buf) > 0 { |
| // incorporate the actual values if present |
| // shift left enough bits to make room for the incoming bytes |
| result = result << (uint64(wordLen) * 8) |
| // preserve the sign-extension bits by OR-ing in the value bytes |
| result |= uint64FromBigEndianShifted(word) |
| } |
| littleEndian[wordIdx] = result |
| } |
| |
| buf = buf[:len(buf)-wordLen] |
| } |
| |
| return decimal256.New(littleEndian[3], littleEndian[2], littleEndian[1], littleEndian[0]), nil |
| } |
| |
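| // varOrFixedBin abstracts over variable-length and fixed-size binary arrays, |
| // exposing per-element access to the raw bytes. |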
| type varOrFixedBin interface { |
| arrow.Array |
| Value(i int) []byte |
| } |
| |
| // convert physical byte storage, instead of integers, to decimal128 or decimal256 |
| func transferDecimalBytes(rdr file.BinaryRecordReader, dt arrow.DataType) (*arrow.Chunked, error) { |
| convert128 := func(in varOrFixedBin) (arrow.Array, error) { |
| length := in.Len() |
| data := make([]byte, arrow.Decimal128Traits.BytesRequired(length)) |
| out := arrow.Decimal128Traits.CastFromBytes(data) |
| |
| nullCount := in.NullN() |
| var err error |
| for i := 0; i < length; i++ { |
| if nullCount > 0 && in.IsNull(i) { |
| continue |
| } |
| |
| rec := in.Value(i) |
| if len(rec) <= 0 { |
| return nil, fmt.Errorf("invalid BYTEARRAY length for type: %s", dt) |
| } |
| out[i], err = bigEndianToDecimal128(rec) |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| ret := array.NewData(dt, length, []*memory.Buffer{ |
| in.Data().Buffers()[0], memory.NewBufferBytes(data), |
| }, nil, nullCount, 0) |
| defer ret.Release() |
| return array.MakeFromData(ret), nil |
| } |
| |
| convert256 := func(in varOrFixedBin) (arrow.Array, error) { |
| length := in.Len() |
| data := make([]byte, arrow.Decimal256Traits.BytesRequired(length)) |
| out := arrow.Decimal256Traits.CastFromBytes(data) |
| |
| nullCount := in.NullN() |
| var err error |
| for i := 0; i < length; i++ { |
| if nullCount > 0 && in.IsNull(i) { |
| continue |
| } |
| |
| rec := in.Value(i) |
| if len(rec) <= 0 { |
| return nil, fmt.Errorf("invalid BYTEARRAY length for type: %s", dt) |
| } |
| out[i], err = bigEndianToDecimal256(rec) |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| ret := array.NewData(dt, length, []*memory.Buffer{ |
| in.Data().Buffers()[0], memory.NewBufferBytes(data), |
| }, nil, nullCount, 0) |
| defer ret.Release() |
| return array.MakeFromData(ret), nil |
| } |
| |
| convert := func(arr arrow.Array) (arrow.Array, error) { |
| switch dt.ID() { |
| case arrow.DECIMAL128: |
| return convert128(arr.(varOrFixedBin)) |
| case arrow.DECIMAL256: |
| return convert256(arr.(varOrFixedBin)) |
| } |
| return nil, arrow.ErrNotImplemented |
| } |
| |
| chunks := rdr.GetBuilderChunks() |
| var err error |
| for idx, chunk := range chunks { |
| defer chunk.Release() |
| if chunks[idx], err = convert(chunk); err != nil { |
| return nil, err |
| } |
| defer chunks[idx].Release() |
| } |
| return arrow.NewChunked(dt, chunks), nil |
| } |
| |
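| // transferDictionary wraps the dictionary-encoded chunks produced by the record |
| // reader in a chunked array of the given logical dictionary type. |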
| func transferDictionary(rdr file.RecordReader, logicalValueType arrow.DataType) *arrow.Chunked { |
| brdr := rdr.(file.BinaryRecordReader) |
| chunks := brdr.GetBuilderChunks() |
| defer releaseArrays(chunks) |
| return arrow.NewChunked(logicalValueType, chunks) |
| } |