blob: 2a25d10e92a4b5262c903afacca61b80666b7823 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package encoding
import (
"bytes"
"reflect"
"github.com/apache/arrow/go/v6/arrow/memory"
"github.com/apache/arrow/go/v6/parquet"
"github.com/apache/arrow/go/v6/parquet/internal/debug"
format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
"github.com/apache/arrow/go/v6/parquet/internal/utils"
"github.com/apache/arrow/go/v6/parquet/schema"
"golang.org/x/xerrors"
)
// DecoderTraits provides an interface for more easily interacting with types
// to generate decoders for specific types.
//
// NewDecoder and NewDictDecoder obtain an implementation from
// getDecodingTraits and delegate the actual decoder construction to it.
type DecoderTraits interface {
// Decoder builds a TypedDecoder for encoding e and column descriptor descr.
// useDict is passed as true by NewDictDecoder and false by NewDecoder; mem is
// the allocator supplied by the caller.
Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder
// BytesRequired takes an int and returns an int.
// NOTE(review): presumably the number of bytes needed to hold the given
// number of values of this type — confirm against the implementations.
BytesRequired(int) int
}
// NewDecoder constructs a decoder for a given type and encoding.
//
// The decoder is requested from the type's DecoderTraits with dictionary
// decoding disabled; use NewDictDecoder for dictionary encoded data. nil is
// returned when getDecodingTraits yields no traits for t.
//
// if mem is nil, memory.DefaultAllocator will be used
func NewDecoder(t parquet.Type, e parquet.Encoding, descr *schema.Column, mem memory.Allocator) TypedDecoder {
	traits := getDecodingTraits(t)
	if traits == nil {
		return nil
	}

	// Match NewDictDecoder: never hand a nil allocator down to the decoder.
	if mem == nil {
		mem = memory.DefaultAllocator
	}
	return traits.Decoder(e, descr, false /* use dictionary */, mem)
}
// NewDictDecoder is the dictionary flavoured counterpart of NewDecoder: it
// always requests an RLE dictionary decoder from the traits of type t.
//
// nil is returned when getDecodingTraits yields no traits for t. The decoder
// produced by the traits is type-asserted to DictDecoder, so this panics when
// that assertion fails; in particular it panics if type is bool.
//
// if mem is nil, memory.DefaultAllocator will be used
func NewDictDecoder(t parquet.Type, descr *schema.Column, mem memory.Allocator) DictDecoder {
	traits := getDecodingTraits(t)
	if traits == nil {
		return nil
	}

	alloc := mem
	if alloc == nil {
		alloc = memory.DefaultAllocator
	}

	typed := traits.Decoder(parquet.Encodings.RLEDict, descr, true /* use dictionary */, alloc)
	return typed.(DictDecoder)
}
// decoder holds the state shared by the type specific decoders. Values are
// created by newDecoderBase; dictDecoder embeds it.
type decoder struct {
// descr is the column descriptor given to newDecoderBase; it may be nil.
descr *schema.Column
// encoding is the generated-format encoding value; Encoding() converts it to
// parquet.Encoding.
encoding format.Encoding
// nvals is the value count stored by SetData and reported by ValuesLeft.
nvals int
// data is the byte slice most recently passed to (*decoder).SetData.
data []byte
// typeLen is the column's TypeLength for FixedLenByteArray columns and -1
// otherwise (set in newDecoderBase).
typeLen int
}
// newDecoderBase builds the common decoder state that the type specific
// decoders embed.
//
// typeLen defaults to -1 and is only replaced by the column's type length when
// a non-nil descr describes a FixedLenByteArray column.
func newDecoderBase(e format.Encoding, descr *schema.Column) decoder {
	base := decoder{
		descr:    descr,
		encoding: e,
		typeLen:  -1,
	}
	if descr != nil && descr.PhysicalType() == parquet.Types.FixedLenByteArray {
		base.typeLen = int(descr.TypeLength())
	}
	return base
}
// SetData points the decoder at a new slice of bytes to decode from and
// records the number of values available in it. It always returns nil.
func (d *decoder) SetData(nvals int, data []byte) error {
	d.nvals, d.data = nvals, data
	return nil
}
// ValuesLeft reports how many values this decoder still has available to
// decode, i.e. the current value count tracked by the decoder.
func (d *decoder) ValuesLeft() int {
	remaining := d.nvals
	return remaining
}
// Encoding reports which encoding this decoder uses to decode the bytes,
// converted from the stored format value to the public parquet.Encoding type.
func (d *decoder) Encoding() parquet.Encoding {
	enc := parquet.Encoding(d.encoding)
	return enc
}
// dictDecoder extends decoder with the state used for dictionary decoding:
// a converter for the dictionary values and an RLE decoder for the indexes.
type dictDecoder struct {
decoder
// mem is an allocator; nothing in this section reads it.
// NOTE(review): presumably used by the type specific dictionary decoders — confirm.
mem memory.Allocator
// dictValueDecoder is the dictionary converter installed by SetDict and
// passed to the index decoder by decode/decodeSpaced.
dictValueDecoder utils.DictionaryConverter
// idxDecoder reads the index data; it is (re)created by SetData.
idxDecoder *utils.RleDecoder
}
// SetDict installs the dictionary for this column: dict, a decoder over the
// dictionary values, is wrapped with NewDictConverter and kept for use when
// decoding. It panics when dict's type differs from the column's physical
// type.
func (d *dictDecoder) SetDict(dict TypedDecoder) {
	if dictType, colType := dict.Type(), d.descr.PhysicalType(); dictType != colType {
		panic("parquet: mismatch dictionary and column data type")
	}
	d.dictValueDecoder = NewDictConverter(dict)
}
// SetData hands the encoded index data to the decoder.
//
// nvals is always recorded. For non-empty data the first byte is the bit
// width of the indexes and the remaining bytes are given to a fresh RLE
// decoder; a bit width of 64 or more is rejected with an error and the
// existing index decoder is left untouched. Empty data yields an RLE decoder
// with bit width 0.
func (d *dictDecoder) SetData(nvals int, data []byte) error {
	d.nvals = nvals

	// with no data at all the bit width can safely stay 0
	bitWidth, indices := 0, data
	if len(data) > 0 {
		// the leading byte carries the bit width of the encoded indexes
		if data[0] >= 64 {
			return xerrors.New("parquet: invalid or corrupted bit width")
		}
		// everything after that first byte is the index payload
		bitWidth, indices = int(data[0]), data[1:]
	}

	d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(indices), bitWidth)
	return nil
}
// decode delegates to the index decoder's GetBatchWithDict, passing it the
// dictionary converter and out, and returns that call's count and error
// unchanged.
func (d *dictDecoder) decode(out interface{}) (int, error) {
	n, err := d.idxDecoder.GetBatchWithDict(d.dictValueDecoder, out)
	return n, err
}
// decodeSpaced delegates to the index decoder's GetBatchWithDictSpaced,
// forwarding the dictionary converter, out, the null count and the validity
// bitmap with its offset, and returns that call's count and error unchanged.
func (d *dictDecoder) decodeSpaced(out interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	n, err := d.idxDecoder.GetBatchWithDictSpaced(d.dictValueDecoder, out, nullCount, validBits, validBitsOffset)
	return n, err
}
// empty is a one-element byte array holding a single zero byte.
// NOTE(review): no use of it is visible in this section — confirm where the
// rest of the package relies on it.
var empty = [1]byte{0}
// spacedExpand takes a slice whose leading len-nullCount elements are densely
// packed values and uses the validity bitmap to move each value to its proper
// slot, leaving room for the nulls, so that the slice ends up fully expanded.
//
// buffer must be a slice, anything else panics. validBits and validBitsOffset
// describe the bitmap covering every slot of buffer. The return value is the
// length of buffer.
func spacedExpand(buffer interface{}, nullCount int, validBits []byte, validBitsOffset int64) int {
	slice := reflect.ValueOf(buffer)
	if slice.Kind() != reflect.Slice {
		panic("invalid spacedexpand type, not slice")
	}

	total := slice.Len()
	// srcEnd is one past the last packed value that has not been moved yet.
	srcEnd := int64(total - nullCount)
	if srcEnd == 0 { // nothing to decode means nothing to move.
		return total
	}

	// Walk the bitmap from the back, one run of valid bits at a time. Each run
	// of packed values is copied from the end of the packed region to its
	// final position. Because we go in reverse, a copy only ever overwrites
	// slots whose values were already moved (or null slots), so no cleanup is
	// needed; whatever is left behind in the null slots does not matter.
	runs := utils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(total))
	for run := runs.NextRun(); run.Length != 0; run = runs.NextRun() {
		srcStart := srcEnd - run.Length
		dst := slice.Slice(int(run.Pos), total)
		src := slice.Slice(int(srcStart), int(srcEnd))
		copied := reflect.Copy(dst, src)
		debug.Assert(copied == int(run.Length), "reflect.Copy copied incorrect number of elements in spacedExpand")
		srcEnd = srcStart
	}
	return total
}