| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pqarrow |
| |
| import ( |
| "context" |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "math" |
| "time" |
| "unsafe" |
| |
| "github.com/apache/arrow/go/v14/arrow" |
| "github.com/apache/arrow/go/v14/arrow/array" |
| "github.com/apache/arrow/go/v14/arrow/bitutil" |
| "github.com/apache/arrow/go/v14/arrow/decimal128" |
| "github.com/apache/arrow/go/v14/arrow/decimal256" |
| "github.com/apache/arrow/go/v14/arrow/memory" |
| "github.com/apache/arrow/go/v14/internal/utils" |
| "github.com/apache/arrow/go/v14/parquet" |
| "github.com/apache/arrow/go/v14/parquet/file" |
| "github.com/apache/arrow/go/v14/parquet/internal/debug" |
| ) |
| |
| // get the count of the number of leaf arrays for the type |
| func calcLeafCount(dt arrow.DataType) int { |
| switch dt := dt.(type) { |
| case arrow.ExtensionType: |
| return calcLeafCount(dt.StorageType()) |
| case arrow.NestedType: |
| nleaves := 0 |
| for _, f := range dt.Fields() { |
| nleaves += calcLeafCount(f.Type) |
| } |
| return nleaves |
| case *arrow.DictionaryType: |
| return calcLeafCount(dt.ValueType) |
| default: |
| return 1 |
| } |
| } |
| |
| func nullableRoot(manifest *SchemaManifest, field *SchemaField) bool { |
| curField := field |
| nullable := field.Field.Nullable |
| for curField != nil { |
| nullable = curField.Field.Nullable |
| curField = manifest.GetParent(curField) |
| } |
| return nullable |
| } |
| |
| // ArrowColumnWriter is a convenience object for easily writing arrow data to a specific |
| // set of columns in a parquet file. Since a single arrow array can itself be a nested type |
| // consisting of multiple columns of data, this will write to all of the appropriate leaves in |
| // the parquet file, allowing easy writing of nested columns. |
| type ArrowColumnWriter struct { |
| builders []*multipathLevelBuilder |
| leafCount int |
| colIdx int |
| rgw file.RowGroupWriter |
| } |
| |
| // NewArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns, |
| // and the provided schema manifest to determine the paths for writing the columns. |
| // |
| // Using an arrow column writer is a convenience to avoid having to process the arrow array yourself |
| // and determine the correct definition and repetition levels manually. |
| func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, col int) (ArrowColumnWriter, error) { |
| if data.Len() == 0 { |
| return ArrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil |
| } |
| |
| var ( |
| absPos int64 |
| chunkOffset int64 |
| chunkIdx int |
| values int64 |
| ) |
| |
| for idx, chnk := range data.Chunks() { |
| chunkIdx = idx |
| if absPos >= offset { |
| break |
| } |
| |
| chunkLen := int64(chnk.Len()) |
| if absPos+chunkLen > offset { |
| chunkOffset = offset - absPos |
| break |
| } |
| |
| absPos += chunkLen |
| } |
| |
| if absPos >= int64(data.Len()) { |
| return ArrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array") |
| } |
| |
| leafCount := calcLeafCount(data.DataType()) |
| isNullable := false |
| // row group writer hasn't been advanced yet so add 1 to the current |
| // which is the one this instance will start writing for |
| // colIdx := rgw.CurrentColumn() + 1 |
| |
| schemaField, err := manifest.GetColumnField(col) |
| if err != nil { |
| return ArrowColumnWriter{}, err |
| } |
| isNullable = nullableRoot(manifest, schemaField) |
| |
| builders := make([]*multipathLevelBuilder, 0) |
| for values < size { |
| chunk := data.Chunk(chunkIdx) |
| available := int64(chunk.Len() - int(chunkOffset)) |
| chunkWriteSize := utils.Min(size-values, available) |
| |
| // the chunk offset will be 0 here except for possibly the first chunk |
| // because of the above advancing logic |
| arrToWrite := array.NewSlice(chunk, chunkOffset, chunkOffset+chunkWriteSize) |
| defer arrToWrite.Release() |
| |
| if arrToWrite.Len() > 0 { |
| bldr, err := newMultipathLevelBuilder(arrToWrite, isNullable) |
| if err != nil { |
| return ArrowColumnWriter{}, nil |
| } |
| if leafCount != bldr.leafCount() { |
| return ArrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount()) |
| } |
| builders = append(builders, bldr) |
| } |
| |
| if chunkWriteSize == available { |
| chunkOffset = 0 |
| chunkIdx++ |
| } |
| values += chunkWriteSize |
| } |
| |
| return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: col}, nil |
| } |
| |
| func (acw *ArrowColumnWriter) Write(ctx context.Context) error { |
| arrCtx := arrowCtxFromContext(ctx) |
| for leafIdx := 0; leafIdx < acw.leafCount; leafIdx++ { |
| var ( |
| cw file.ColumnChunkWriter |
| err error |
| ) |
| |
| if acw.rgw.Buffered() { |
| cw, err = acw.rgw.(file.BufferedRowGroupWriter).Column(acw.colIdx + leafIdx) |
| } else { |
| cw, err = acw.rgw.(file.SerialRowGroupWriter).NextColumn() |
| } |
| |
| if err != nil { |
| return err |
| } |
| |
| for _, bldr := range acw.builders { |
| if leafIdx == 0 { |
| defer bldr.Release() |
| } |
| res, err := bldr.write(leafIdx, arrCtx) |
| if err != nil { |
| return err |
| } |
| defer res.Release() |
| |
| if len(res.postListVisitedElems) != 1 { |
| return errors.New("lists with non-zero length null components are not supported") |
| } |
| rng := res.postListVisitedElems[0] |
| values := array.NewSlice(res.leafArr, rng.start, rng.end) |
| defer values.Release() |
| if err = WriteArrowToColumn(ctx, cw, values, res.defLevels, res.repLevels, res.leafIsNullable); err != nil { |
| return err |
| } |
| } |
| } |
| return nil |
| } |
| |
| // WriteArrowToColumn writes apache arrow columnar data directly to a ColumnWriter. |
| // Returns non-nil error if the array data type is not compatible with the concrete |
| // writer type. |
| // |
| // leafArr is always a primitive (possibly dictionary encoded type). |
| // Leaf_field_nullable indicates whether the leaf array is considered nullable |
| // according to its schema in a Table or its parent array. |
| func WriteArrowToColumn(ctx context.Context, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, leafFieldNullable bool) error { |
| // Leaf nulls are canonical when there is only a single null element after a list |
| // and it is at the leaf. |
| colLevelInfo := cw.LevelInfo() |
| singleNullable := (colLevelInfo.DefLevel == colLevelInfo.RepeatedAncestorDefLevel+1) && leafFieldNullable |
| maybeParentNulls := colLevelInfo.HasNullableValues() && !singleNullable |
| |
| if maybeParentNulls && !cw.HasBitsBuffer() { |
| buf := memory.NewResizableBuffer(cw.Properties().Allocator()) |
| buf.Resize(int(bitutil.BytesForBits(cw.Properties().WriteBatchSize()))) |
| cw.SetBitsBuffer(buf) |
| } |
| |
| arrCtx := arrowCtxFromContext(ctx) |
| defer func() { |
| if arrCtx.dataBuffer != nil { |
| arrCtx.dataBuffer.Release() |
| arrCtx.dataBuffer = nil |
| } |
| }() |
| |
| if leafArr.DataType().ID() == arrow.DICTIONARY { |
| return writeDictionaryArrow(arrCtx, cw, leafArr, defLevels, repLevels, maybeParentNulls) |
| } |
| return writeDenseArrow(arrCtx, cw, leafArr, defLevels, repLevels, maybeParentNulls) |
| } |
| |
type (
	// binaryarr is satisfied by arrays that expose their value offsets as
	// 32-bit integers; it is used for the BINARY and STRING arrow types.
	binaryarr interface {
		ValueOffsets() []int32
	}

	// binary64arr is satisfied by arrays that expose their value offsets as
	// 64-bit integers; it is used for the LARGE_BINARY and LARGE_STRING
	// arrow types.
	binary64arr interface {
		ValueOffsets() []int64
	}
)
| |
| func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, maybeParentNulls bool) (err error) { |
| if leafArr.DataType().ID() == arrow.EXTENSION { |
| extensionArray := leafArr.(array.ExtensionArray) |
| // Replace leafArr with its underlying storage array |
| leafArr = extensionArray.Storage() |
| } |
| |
| noNulls := cw.Descr().SchemaNode().RepetitionType() == parquet.Repetitions.Required || leafArr.NullN() == 0 |
| |
| if ctx.dataBuffer == nil { |
| ctx.dataBuffer = memory.NewResizableBuffer(cw.Properties().Allocator()) |
| } |
| |
| switch wr := cw.(type) { |
| case *file.BooleanColumnChunkWriter: |
| if leafArr.DataType().ID() != arrow.BOOL { |
| return fmt.Errorf("type mismatch, column is %s, array is %s", cw.Type(), leafArr.DataType().ID()) |
| } |
| // TODO(mtopol): optimize this so that we aren't converting from |
| // the bitmap -> []bool -> bitmap anymore |
| if leafArr.Len() == 0 { |
| _, err = wr.WriteBatch(nil, defLevels, repLevels) |
| break |
| } |
| |
| ctx.dataBuffer.ResizeNoShrink(leafArr.Len()) |
| buf := ctx.dataBuffer.Bytes() |
| data := *(*[]bool)(unsafe.Pointer(&buf)) |
| for idx := range data { |
| data[idx] = leafArr.(*array.Boolean).Value(idx) |
| } |
| if !maybeParentNulls && noNulls { |
| wr.WriteBatch(data, defLevels, repLevels) |
| } else { |
| wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset())) |
| } |
| case *file.Int32ColumnChunkWriter: |
| var data []int32 |
| switch leafArr.DataType().ID() { |
| case arrow.INT32: |
| data = leafArr.(*array.Int32).Int32Values() |
| case arrow.DATE32, arrow.UINT32: |
| if leafArr.Data().Buffers()[1] != nil { |
| data = arrow.Int32Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes()) |
| data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()] |
| } |
| case arrow.TIME32: |
| if leafArr.DataType().(*arrow.Time32Type).Unit != arrow.Second { |
| if leafArr.Data().Buffers()[1] != nil { |
| data = arrow.Int32Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes()) |
| data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()] |
| } |
| } else { // coerce time32 if necessary by multiplying by 1000 |
| ctx.dataBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(leafArr.Len())) |
| data = arrow.Int32Traits.CastFromBytes(ctx.dataBuffer.Bytes()) |
| for idx, val := range leafArr.(*array.Time32).Time32Values() { |
| data[idx] = int32(val) * 1000 |
| } |
| } |
| case arrow.NULL: |
| wr.WriteBatchSpaced(nil, defLevels, repLevels, leafArr.NullBitmapBytes(), 0) |
| return |
| |
| default: |
| // simple integral cases, parquet physical storage is int32 or int64 |
| // so we have to create a new array of int32's for anything smaller than |
| // 32-bits |
| ctx.dataBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(leafArr.Len())) |
| data = arrow.Int32Traits.CastFromBytes(ctx.dataBuffer.Bytes()) |
| switch leafArr.DataType().ID() { |
| case arrow.UINT8: |
| for idx, val := range leafArr.(*array.Uint8).Uint8Values() { |
| data[idx] = int32(val) |
| } |
| case arrow.INT8: |
| for idx, val := range leafArr.(*array.Int8).Int8Values() { |
| data[idx] = int32(val) |
| } |
| case arrow.UINT16: |
| for idx, val := range leafArr.(*array.Uint16).Uint16Values() { |
| data[idx] = int32(val) |
| } |
| case arrow.INT16: |
| for idx, val := range leafArr.(*array.Int16).Int16Values() { |
| data[idx] = int32(val) |
| } |
| case arrow.DATE64: |
| for idx, val := range leafArr.(*array.Date64).Date64Values() { |
| data[idx] = int32(val / 86400000) // coerce date64 values |
| } |
| case arrow.DECIMAL128: |
| for idx, val := range leafArr.(*array.Decimal128).Values() { |
| debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "casting Decimal128 greater than the value range; high bits must be 0 or -1") |
| debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32") |
| data[idx] = int32(val.LowBits()) |
| } |
| case arrow.DECIMAL256: |
| for idx, val := range leafArr.(*array.Decimal256).Values() { |
| debug.Assert(val.Array()[3] == 0 || val.Array()[3] == 0xFFFFFFFF, "casting Decimal128 greater than the value range; high bits must be 0 or -1") |
| debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32") |
| data[idx] = int32(val.LowBits()) |
| } |
| default: |
| return fmt.Errorf("type mismatch, column is int32 writer, arrow array is %s, and not a compatible type", leafArr.DataType().Name()) |
| } |
| } |
| |
| if !maybeParentNulls && noNulls { |
| _, err = wr.WriteBatch(data, defLevels, repLevels) |
| } else { |
| nulls := leafArr.NullBitmapBytes() |
| wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset())) |
| } |
| case *file.Int64ColumnChunkWriter: |
| var data []int64 |
| switch leafArr.DataType().ID() { |
| case arrow.TIMESTAMP: |
| tstype := leafArr.DataType().(*arrow.TimestampType) |
| if ctx.props.coerceTimestamps { |
| // user explicitly requested coercion to specific unit |
| if tstype.Unit == ctx.props.coerceTimestampUnit { |
| // no conversion necessary |
| if leafArr.Data().Buffers()[1] != nil { |
| data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes()) |
| data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()] |
| } |
| } else { |
| ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len())) |
| data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes()) |
| if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &ctx.props, data); err != nil { |
| return err |
| } |
| } |
| } else if (cw.Properties().Version() == parquet.V1_0 || cw.Properties().Version() == parquet.V2_4) && tstype.Unit == arrow.Nanosecond { |
| // absent superceding user instructions, when writing a Parquet Version <=2.4 File, |
| // timestamps in nanoseconds are coerced to microseconds |
| ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len())) |
| data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes()) |
| p := NewArrowWriterProperties(WithCoerceTimestamps(arrow.Microsecond), WithTruncatedTimestamps(true)) |
| if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &p, data); err != nil { |
| return err |
| } |
| } else if tstype.Unit == arrow.Second { |
| // absent superceding user instructions, timestamps in seconds are coerced |
| // to milliseconds |
| p := NewArrowWriterProperties(WithCoerceTimestamps(arrow.Millisecond)) |
| ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len())) |
| data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes()) |
| if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &p, data); err != nil { |
| return err |
| } |
| } else { |
| // no data conversion neccessary |
| if leafArr.Data().Buffers()[1] != nil { |
| data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes()) |
| data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()] |
| } |
| } |
| case arrow.UINT32: |
| ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len())) |
| data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes()) |
| for idx, val := range leafArr.(*array.Uint32).Uint32Values() { |
| data[idx] = int64(val) |
| } |
| case arrow.INT64: |
| data = leafArr.(*array.Int64).Int64Values() |
| case arrow.UINT64, arrow.TIME64, arrow.DATE64: |
| if leafArr.Data().Buffers()[1] != nil { |
| data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes()) |
| data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()] |
| } |
| case arrow.DECIMAL128: |
| ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len())) |
| data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes()) |
| for idx, val := range leafArr.(*array.Decimal128).Values() { |
| debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1") |
| data[idx] = int64(val.LowBits()) |
| } |
| case arrow.DECIMAL256: |
| ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len())) |
| data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes()) |
| for idx, val := range leafArr.(*array.Decimal256).Values() { |
| debug.Assert(val.Array()[3] == 0 || val.Array()[3] == 0xFFFFFFFF, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1") |
| data[idx] = int64(val.LowBits()) |
| } |
| default: |
| return fmt.Errorf("unimplemented arrow type to write to int64 column: %s", leafArr.DataType().Name()) |
| } |
| |
| if !maybeParentNulls && noNulls { |
| _, err = wr.WriteBatch(data, defLevels, repLevels) |
| } else { |
| nulls := leafArr.NullBitmapBytes() |
| wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset())) |
| } |
| case *file.Int96ColumnChunkWriter: |
| if leafArr.DataType().ID() != arrow.TIMESTAMP { |
| return errors.New("unsupported arrow type to write to Int96 column") |
| } |
| ctx.dataBuffer.ResizeNoShrink(parquet.Int96Traits.BytesRequired(leafArr.Len())) |
| data := parquet.Int96Traits.CastFromBytes(ctx.dataBuffer.Bytes()) |
| input := leafArr.(*array.Timestamp).TimestampValues() |
| unit := leafArr.DataType().(*arrow.TimestampType).Unit |
| for idx, val := range input { |
| arrowTimestampToImpalaTimestamp(unit, int64(val), &data[idx]) |
| } |
| |
| if !maybeParentNulls && noNulls { |
| _, err = wr.WriteBatch(data, defLevels, repLevels) |
| } else { |
| nulls := leafArr.NullBitmapBytes() |
| wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset())) |
| } |
| case *file.Float32ColumnChunkWriter: |
| if leafArr.DataType().ID() != arrow.FLOAT32 { |
| return errors.New("invalid column type to write to Float") |
| } |
| if !maybeParentNulls && noNulls { |
| _, err = wr.WriteBatch(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels) |
| } else { |
| wr.WriteBatchSpaced(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset())) |
| } |
| case *file.Float64ColumnChunkWriter: |
| if leafArr.DataType().ID() != arrow.FLOAT64 { |
| return errors.New("invalid column type to write to Float") |
| } |
| if !maybeParentNulls && noNulls { |
| _, err = wr.WriteBatch(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels) |
| } else { |
| wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset())) |
| } |
| case *file.ByteArrayColumnChunkWriter: |
| var ( |
| buffer = leafArr.Data().Buffers()[2] |
| valueBuf []byte |
| ) |
| |
| if buffer == nil { |
| valueBuf = []byte{} |
| } else { |
| valueBuf = buffer.Bytes() |
| } |
| |
| data := make([]parquet.ByteArray, leafArr.Len()) |
| switch leafArr.DataType().ID() { |
| case arrow.BINARY, arrow.STRING: |
| offsets := leafArr.(binaryarr).ValueOffsets() |
| for i := range data { |
| data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]]) |
| } |
| case arrow.LARGE_BINARY, arrow.LARGE_STRING: |
| offsets := leafArr.(binary64arr).ValueOffsets() |
| for i := range data { |
| data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]]) |
| } |
| default: |
| return fmt.Errorf("%w: invalid column type to write to ByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name()) |
| } |
| |
| if !maybeParentNulls && noNulls { |
| _, err = wr.WriteBatch(data, defLevels, repLevels) |
| } else { |
| wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset())) |
| } |
| |
| case *file.FixedLenByteArrayColumnChunkWriter: |
| switch dt := leafArr.DataType().(type) { |
| case *arrow.FixedSizeBinaryType: |
| data := make([]parquet.FixedLenByteArray, leafArr.Len()) |
| for idx := range data { |
| data[idx] = leafArr.(*array.FixedSizeBinary).Value(idx) |
| } |
| if !maybeParentNulls && noNulls { |
| _, err = wr.WriteBatch(data, defLevels, repLevels) |
| } else { |
| wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset())) |
| } |
| case *arrow.Decimal128Type: |
| // parquet decimal are stored with FixedLength values where the length is |
| // proportional to the precision. Arrow's Decimal are always stored with 16/32 |
| // bytes. thus the internal FLBA must be adjusted by the offset calculation |
| offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision)) |
| ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth()) |
| scratch := ctx.dataBuffer.Bytes() |
| typeLen := wr.Descr().TypeLength() |
| fixDecimalEndianness := func(in decimal128.Num) parquet.FixedLenByteArray { |
| out := scratch[offset : offset+typeLen] |
| binary.BigEndian.PutUint64(scratch, uint64(in.HighBits())) |
| binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], in.LowBits()) |
| scratch = scratch[2*arrow.Uint64SizeBytes:] |
| return out |
| } |
| |
| data := make([]parquet.FixedLenByteArray, leafArr.Len()) |
| arr := leafArr.(*array.Decimal128) |
| if leafArr.NullN() == 0 { |
| for idx := range data { |
| data[idx] = fixDecimalEndianness(arr.Value(idx)) |
| } |
| _, err = wr.WriteBatch(data, defLevels, repLevels) |
| } else { |
| for idx := range data { |
| if arr.IsValid(idx) { |
| data[idx] = fixDecimalEndianness(arr.Value(idx)) |
| } |
| } |
| wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset())) |
| } |
| case *arrow.Decimal256Type: |
| // parquet decimal are stored with FixedLength values where the length is |
| // proportional to the precision. Arrow's Decimal are always stored with 16/32 |
| // bytes. thus the internal FLBA must be adjusted by the offset calculation |
| offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision)) |
| ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth()) |
| scratch := ctx.dataBuffer.Bytes() |
| typeLen := wr.Descr().TypeLength() |
| fixDecimalEndianness := func(in decimal256.Num) parquet.FixedLenByteArray { |
| out := scratch[offset : offset+typeLen] |
| vals := in.Array() |
| binary.BigEndian.PutUint64(scratch, vals[3]) |
| binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], vals[2]) |
| binary.BigEndian.PutUint64(scratch[2*arrow.Uint64SizeBytes:], vals[1]) |
| binary.BigEndian.PutUint64(scratch[3*arrow.Uint64SizeBytes:], vals[0]) |
| scratch = scratch[4*arrow.Uint64SizeBytes:] |
| return out |
| } |
| |
| data := make([]parquet.FixedLenByteArray, leafArr.Len()) |
| arr := leafArr.(*array.Decimal256) |
| if leafArr.NullN() == 0 { |
| for idx := range data { |
| data[idx] = fixDecimalEndianness(arr.Value(idx)) |
| } |
| _, err = wr.WriteBatch(data, defLevels, repLevels) |
| } else { |
| for idx := range data { |
| if arr.IsValid(idx) { |
| data[idx] = fixDecimalEndianness(arr.Value(idx)) |
| } |
| } |
| wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset())) |
| } |
| default: |
| return fmt.Errorf("%w: invalid column type to write to FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name()) |
| } |
| default: |
| return errors.New("unknown column writer physical type") |
| } |
| return |
| } |
| |
| type coerceType int8 |
| |
| const ( |
| coerceInvalid coerceType = iota |
| coerceDivide |
| coerceMultiply |
| ) |
| |
| type coercePair struct { |
| typ coerceType |
| factor int64 |
| } |
| |
| var factors = map[arrow.TimeUnit]map[arrow.TimeUnit]coercePair{ |
| arrow.Second: { |
| arrow.Second: {coerceInvalid, 0}, |
| arrow.Millisecond: {coerceMultiply, 1000}, |
| arrow.Microsecond: {coerceMultiply, 1000000}, |
| arrow.Nanosecond: {coerceMultiply, 1000000000}, |
| }, |
| arrow.Millisecond: { |
| arrow.Second: {coerceInvalid, 0}, |
| arrow.Millisecond: {coerceMultiply, 1}, |
| arrow.Microsecond: {coerceMultiply, 1000}, |
| arrow.Nanosecond: {coerceMultiply, 1000000}, |
| }, |
| arrow.Microsecond: { |
| arrow.Second: {coerceInvalid, 0}, |
| arrow.Millisecond: {coerceDivide, 1000}, |
| arrow.Microsecond: {coerceMultiply, 1}, |
| arrow.Nanosecond: {coerceMultiply, 1000}, |
| }, |
| arrow.Nanosecond: { |
| arrow.Second: {coerceInvalid, 0}, |
| arrow.Millisecond: {coerceDivide, 1000000}, |
| arrow.Microsecond: {coerceDivide, 1000}, |
| arrow.Nanosecond: {coerceMultiply, 1}, |
| }, |
| } |
| |
| func writeCoerceTimestamps(arr *array.Timestamp, props *ArrowWriterProperties, out []int64) error { |
| source := arr.DataType().(*arrow.TimestampType).Unit |
| target := props.coerceTimestampUnit |
| truncation := props.allowTruncatedTimestamps |
| |
| vals := arr.TimestampValues() |
| multiply := func(factor int64) error { |
| for idx, val := range vals { |
| out[idx] = int64(val) * factor |
| } |
| return nil |
| } |
| |
| divide := func(factor int64) error { |
| for idx, val := range vals { |
| if !truncation && arr.IsValid(idx) && (int64(val)%factor != 0) { |
| return fmt.Errorf("casting from %s to %s would lose data", source, target) |
| } |
| out[idx] = int64(val) / factor |
| } |
| return nil |
| } |
| |
| coerce := factors[source][target] |
| switch coerce.typ { |
| case coerceMultiply: |
| return multiply(coerce.factor) |
| case coerceDivide: |
| return divide(coerce.factor) |
| default: |
| panic("invalid coercion") |
| } |
| } |
| |
| const ( |
| julianEpochOffsetDays int64 = 2440588 |
| nanoSecondsPerDay = 24 * 60 * 60 * 1000 * 1000 * 1000 |
| ) |
| |
| func arrowTimestampToImpalaTimestamp(unit arrow.TimeUnit, t int64, out *parquet.Int96) { |
| var d time.Duration |
| switch unit { |
| case arrow.Second: |
| d = time.Duration(t) * time.Second |
| case arrow.Microsecond: |
| d = time.Duration(t) * time.Microsecond |
| case arrow.Millisecond: |
| d = time.Duration(t) * time.Millisecond |
| case arrow.Nanosecond: |
| d = time.Duration(t) * time.Nanosecond |
| } |
| |
| julianDays := (int64(d.Hours()) / 24) + julianEpochOffsetDays |
| lastDayNanos := t % (nanoSecondsPerDay) |
| binary.LittleEndian.PutUint64((*out)[:8], uint64(lastDayNanos)) |
| binary.LittleEndian.PutUint32((*out)[8:], uint32(julianDays)) |
| } |