| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package encoding |
| |
| import ( |
| "bytes" |
| "reflect" |
| |
| "github.com/apache/arrow/go/v6/arrow/memory" |
| "github.com/apache/arrow/go/v6/parquet" |
| "github.com/apache/arrow/go/v6/parquet/internal/debug" |
| format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet" |
| "github.com/apache/arrow/go/v6/parquet/internal/utils" |
| "github.com/apache/arrow/go/v6/parquet/schema" |
| "golang.org/x/xerrors" |
| ) |
| |
| // DecoderTraits provides an interface for more easily interacting with types |
| // to generate decoders for specific types. |
| type DecoderTraits interface { |
| Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder |
| BytesRequired(int) int |
| } |
| |
| // NewDecoder constructs a decoder for a given type and encoding |
| func NewDecoder(t parquet.Type, e parquet.Encoding, descr *schema.Column, mem memory.Allocator) TypedDecoder { |
| traits := getDecodingTraits(t) |
| if traits == nil { |
| return nil |
| } |
| |
| return traits.Decoder(e, descr, false /* use dictionary */, mem) |
| } |
| |
| // NewDictDecoder is like NewDecoder but for dictionary encodings, panics if type is bool. |
| // |
| // if mem is nil, memory.DefaultAllocator will be used |
| func NewDictDecoder(t parquet.Type, descr *schema.Column, mem memory.Allocator) DictDecoder { |
| traits := getDecodingTraits(t) |
| if traits == nil { |
| return nil |
| } |
| |
| if mem == nil { |
| mem = memory.DefaultAllocator |
| } |
| |
| return traits.Decoder(parquet.Encodings.RLEDict, descr, true /* use dictionary */, mem).(DictDecoder) |
| } |
| |
// decoder holds the state shared by the type-specific decoders that embed it:
// the column being decoded, its encoding, and the currently supplied input.
type decoder struct {
	descr    *schema.Column  // column descriptor; may be nil (newDecoderBase tolerates nil)
	encoding format.Encoding // encoding reported by Encoding()
	nvals    int             // values remaining, as set by SetData and reported by ValuesLeft
	data     []byte          // raw input bytes set by SetData
	typeLen  int             // FixedLenByteArray type length, or -1 for every other case
}
| |
| // newDecoderBase constructs the base decoding object that is embedded in the |
| // type specific decoders. |
| func newDecoderBase(e format.Encoding, descr *schema.Column) decoder { |
| typeLen := -1 |
| if descr != nil && descr.PhysicalType() == parquet.Types.FixedLenByteArray { |
| typeLen = int(descr.TypeLength()) |
| } |
| |
| return decoder{ |
| descr: descr, |
| encoding: e, |
| typeLen: typeLen, |
| } |
| } |
| |
| // SetData sets the data for decoding into the decoder to update the available |
| // data bytes and number of values available. |
| func (d *decoder) SetData(nvals int, data []byte) error { |
| d.data = data |
| d.nvals = nvals |
| return nil |
| } |
| |
| // ValuesLeft returns the number of remaining values that can be decoded |
| func (d *decoder) ValuesLeft() int { return d.nvals } |
| |
| // Encoding returns the encoding type used by this decoder to decode the bytes. |
| func (d *decoder) Encoding() parquet.Encoding { return parquet.Encoding(d.encoding) } |
| |
// dictDecoder decodes dictionary-encoded data. SetData installs an RLE
// decoder over the index stream, and SetDict installs the converter used to
// turn those indices into dictionary values.
type dictDecoder struct {
	decoder
	mem              memory.Allocator          // allocator; not referenced in the code shown here — TODO confirm its use
	dictValueDecoder utils.DictionaryConverter // created by SetDict from the supplied dictionary decoder
	idxDecoder       *utils.RleDecoder         // RLE decoder over the index bytes supplied to SetData
}
| |
| // SetDict sets a decoder that can be used to decode the dictionary that is |
| // used for this column in order to return the proper values. |
| func (d *dictDecoder) SetDict(dict TypedDecoder) { |
| if dict.Type() != d.descr.PhysicalType() { |
| panic("parquet: mismatch dictionary and column data type") |
| } |
| |
| d.dictValueDecoder = NewDictConverter(dict) |
| } |
| |
| // SetData sets the index value data into the decoder. |
| func (d *dictDecoder) SetData(nvals int, data []byte) error { |
| d.nvals = nvals |
| if len(data) == 0 { |
| // no data, bitwidth can safely be 0 |
| d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data), 0 /* bitwidth */) |
| return nil |
| } |
| |
| // grab the bit width from the first byte |
| width := uint8(data[0]) |
| if width >= 64 { |
| return xerrors.New("parquet: invalid or corrupted bit width") |
| } |
| |
| // pass the rest of the data, minus that first byte, to the decoder |
| d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data[1:]), int(width)) |
| return nil |
| } |
| |
| func (d *dictDecoder) decode(out interface{}) (int, error) { |
| return d.idxDecoder.GetBatchWithDict(d.dictValueDecoder, out) |
| } |
| |
| func (d *dictDecoder) decodeSpaced(out interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { |
| return d.idxDecoder.GetBatchWithDictSpaced(d.dictValueDecoder, out, nullCount, validBits, validBitsOffset) |
| } |
| |
// empty is a package-level one-element byte array holding a single zero byte.
var empty [1]byte
| |
| // spacedExpand is used to take a slice of data and utilize the bitmap provided to fill in nulls into the |
| // correct slots according to the bitmap in order to produce a fully expanded result slice with nulls |
| // in the correct slots. |
| func spacedExpand(buffer interface{}, nullCount int, validBits []byte, validBitsOffset int64) int { |
| bufferRef := reflect.ValueOf(buffer) |
| if bufferRef.Kind() != reflect.Slice { |
| panic("invalid spacedexpand type, not slice") |
| } |
| |
| var ( |
| numValues int = bufferRef.Len() |
| ) |
| |
| idxDecode := int64(numValues - nullCount) |
| if idxDecode == 0 { // if there's nothing to decode there's nothing to do. |
| return numValues |
| } |
| |
| // read the bitmap in reverse grabbing runs of valid bits where possible. |
| rdr := utils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(numValues)) |
| for { |
| run := rdr.NextRun() |
| if run.Length == 0 { |
| break |
| } |
| |
| // copy data from the end of the slice to it's proper location in the slice after accounting for the nulls |
| // because we technically don't care what is in the null slots we don't actually have to clean |
| // up after ourselves because we're doing this in reverse to guarantee that we'll always simply |
| // overwrite any existing data with the correctly spaced data. Any data that happens to be left in the null |
| // slots is fine since it shouldn't matter and saves us work. |
| idxDecode -= run.Length |
| n := reflect.Copy(bufferRef.Slice(int(run.Pos), bufferRef.Len()), bufferRef.Slice(int(idxDecode), int(int64(idxDecode)+run.Length))) |
| debug.Assert(n == int(run.Length), "reflect.Copy copied incorrect number of elements in spacedExpand") |
| } |
| |
| return numValues |
| } |