blob: e4d3ab7aba5b0e9a848227344a8538c1f3e7b782 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package encoding
import (
"github.com/apache/arrow/go/v6/parquet"
"github.com/apache/arrow/go/v6/parquet/schema"
format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
"github.com/apache/arrow/go/v6/arrow"
"github.com/apache/arrow/go/v6/parquet/internal/utils"
)
// Package-level traits values: one encoder traits value and one decoder traits
// value is generated for every type in the template input. They enable writing
// against encoders/decoders without having to care about what encoding type is
// actually being used; getEncodingTraits and getDecodingTraits return these
// values keyed by physical type.
var (
{{range .In}}
{{.Name}}EncoderTraits {{.lower}}EncoderTraits
{{.Name}}DecoderTraits {{.lower}}DecoderTraits
{{- end}}
)
{{range .In}}
// {{.Name}}Encoder is the fully typed interface satisfied by every encoding
// implementation that is able to encode {{.name}} values.
type {{.Name}}Encoder interface {
	TypedEncoder
	// Put encodes the {{.name}} values of in.
	Put(in []{{.name}})
	// PutSpaced is the variant of Put for input that has slots reserved for
	// null values. validBits is the validity bitmap and validBitsOffset the
	// bit offset into it (parameter names follow the dictionary encoder
	// implementation generated from this template).
	PutSpaced(in []{{.name}}, validBits []byte, validBitsOffset int64)
}
// {{.Name}}Decoder is the fully typed interface satisfied by every encoding
// implementation that is able to decode {{.name}} values.
type {{.Name}}Decoder interface {
	TypedDecoder
	// Decode writes decoded values into out and returns how many values were
	// decoded together with any error encountered.
	Decode(out []{{.name}}) (int, error)
	// DecodeSpaced is the variant of Decode that leaves slots for null values.
	// nullCount, validBits and validBitsOffset describe those slots (parameter
	// names follow the dictionary decoder implementation generated from this
	// template).
	DecodeSpaced(out []{{.name}}, nullCount int, validBits []byte, validBitsOffset int64) (int, error)
}
// {{.lower}}EncoderTraits is an empty helper struct used to make it easy to create
// encoders for {{.lower}} data based on type; its only behavior is the Encoder method.
type {{.lower}}EncoderTraits struct{}
// Encoder returns an encoder for {{.lower}} type data, using the specified encoding type and whether or not
// it should be dictionary encoded.
//
// When useDict is true the encoding argument e is not consulted: the dictionary
// branch returns (or panics) before the switch on e is reached. For an encoding
// that has no case in the switch, Encoder panics with "unimplemented encoding type".
// descr and mem are forwarded unchanged to the constructed encoder.
{{- if or (eq .Name "Boolean") (eq .Name "Int96")}}
// dictionary encoding does not exist for this type and Encoder will panic if useDict is true
{{- end }}
func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *schema.Column, mem memory.Allocator) TypedEncoder {
// dictionary encoding takes precedence over the requested encoding
if useDict {
{{- if or (eq .Name "Boolean") (eq .Name "Int96")}}
panic("parquet: no {{.name}} dictionary encoding")
{{- else}}
return &Dict{{.Name}}Encoder{newDictEncoderBase(descr, New{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}}{{.Name}}Dictionary(){{else}}BinaryDictionary(mem){{end}}, mem)}
{{- end}}
}
switch e {
case format.Encoding_PLAIN:
return &Plain{{.Name}}Encoder{encoder: newEncoderBase(e, descr, mem)}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case format.Encoding_DELTA_BINARY_PACKED:
// NOTE(review): this encoder is returned by value (no &), unlike the other
// cases; presumably its methods use value receivers - confirm before changing.
return DeltaBitPack{{.Name}}Encoder{&deltaBitPackEncoder{
encoder: newEncoderBase(e, descr, mem)}}
{{- end}}
{{- if eq .Name "ByteArray"}}
case format.Encoding_DELTA_LENGTH_BYTE_ARRAY:
// the lengths get their own Int32 delta bit-pack encoder, built with a
// separate encoder base from the one used for the byte array data
return &DeltaLengthByteArrayEncoder{
encoder: newEncoderBase(e, descr, mem),
lengthEncoder: &DeltaBitPackInt32Encoder{
&deltaBitPackEncoder{encoder: newEncoderBase(e, descr, mem)}},
}
case format.Encoding_DELTA_BYTE_ARRAY:
return &DeltaByteArrayEncoder{
encoder: newEncoderBase(e, descr, mem),
}
{{- end}}
default:
panic("unimplemented encoding type")
}
}
// {{.lower}}DecoderTraits is an empty helper struct for providing information regardless of the type
// and used as a generic way to create a Decoder or Dictionary Decoder for {{.lower}} values.
// Its behavior lives in the BytesRequired and Decoder methods.
type {{.lower}}DecoderTraits struct{}
// BytesRequired returns the number of bytes required to store n {{.lower}} values.
// The computation is delegated to {{.prefix}}.{{.Name}}Traits.BytesRequired.
func ({{.lower}}DecoderTraits) BytesRequired(n int) int {
return {{.prefix}}.{{.Name}}Traits.BytesRequired(n)
}
// Decoder returns a decoder for {{.lower}} typed data of the requested encoding type if available.
//
// When useDict is true the encoding argument e is not consulted: a dictionary
// decoder is returned for types that have one, and Decoder panics for types
// without dictionary decoding. Otherwise the decoder is selected by e, and
// Decoder panics with "unimplemented encoding type" when e has no case below.
//
// A nil mem is replaced by memory.DefaultAllocator before any decoder is
// constructed, so every decoder that stores the allocator receives a non-nil one.
func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder {
	// Default the allocator once, up front: the dictionary decoder stores it
	// just like the delta decoders do, so all of them should get the same
	// nil handling instead of only the delta cases.
	if mem == nil {
		mem = memory.DefaultAllocator
	}

	if useDict {
{{- if and (ne .Name "Boolean") (ne .Name "Int96")}}
		return &Dict{{.Name}}Decoder{dictDecoder{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
{{- else}}
		panic("dictionary decoding unimplemented for {{.lower}}")
{{- end}}
	}

	switch e {
	case parquet.Encodings.Plain:
		return &Plain{{.Name}}Decoder{decoder: newDecoderBase(format.Encoding(e), descr)}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
	case parquet.Encodings.DeltaBinaryPacked:
		return &DeltaBitPack{{.Name}}Decoder{
			deltaBitPackDecoder: &deltaBitPackDecoder{
				decoder: newDecoderBase(format.Encoding(e), descr),
				mem:     mem,
			}}
{{- end}}
{{- if eq .Name "ByteArray"}}
	case parquet.Encodings.DeltaLengthByteArray:
		return &DeltaLengthByteArrayDecoder{
			decoder: newDecoderBase(format.Encoding(e), descr),
			mem:     mem,
		}
	case parquet.Encodings.DeltaByteArray:
		return &DeltaByteArrayDecoder{
			DeltaLengthByteArrayDecoder: &DeltaLengthByteArrayDecoder{
				decoder: newDecoderBase(format.Encoding(e), descr),
				mem:     mem,
			}}
{{- end}}
	default:
		panic("unimplemented encoding type")
	}
}
{{if and (ne .Name "Boolean") (ne .Name "Int96")}}
// Dict{{.Name}}Encoder is an encoder for {{.name}} data using dictionary encoding.
// It embeds dictEncoder, so that type's methods are promoted onto this one.
type Dict{{.Name}}Encoder struct {
dictEncoder
}
// Type returns the underlying physical type that can be encoded with this encoder.
// The generated constant is parquet.Types.<physical> when the template input
// defines .physical for this type, and parquet.Types.{{.Name}} otherwise.
func (enc *Dict{{.Name}}Encoder) Type() parquet.Type {
return parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}
}
{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}}
// Put encodes the values passed in, adding to the index as needed.
func (enc *Dict{{.Name}}Encoder) Put(in []{{.name}}) {
for _, val := range in {
enc.dictEncoder.Put(val)
}
}
// PutSpaced is the same as Put but for when the data being encoded has slots open for
// null values, using the bitmap provided to skip values as needed.
func (enc *Dict{{.Name}}Encoder) PutSpaced(in []{{.name}}, validBits []byte, validBitsOffset int64) {
utils.VisitSetBitRuns(validBits, validBitsOffset, int64(len(in)), func(pos, length int64) error {
for i := int64(0); i < length; i++ {
enc.dictEncoder.Put(in[i+pos])
}
return nil
})
}
{{end}}
// Dict{{.Name}}Decoder is a decoder for decoding dictionary encoded data for {{.name}} columns.
// It embeds dictDecoder, so that type's fields and methods are promoted onto this one.
type Dict{{.Name}}Decoder struct {
dictDecoder
}
// Type returns the underlying physical type that can be decoded with this decoder.
// The generated constant is parquet.Types.<physical> when the template input
// defines .physical for this type, and parquet.Types.{{.Name}} otherwise.
// Note that this method uses a value receiver, unlike Decode and DecodeSpaced.
func (Dict{{.Name}}Decoder) Type() parquet.Type {
return parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}
}
// Decode populates the passed in slice with min(len(out), remaining values) values,
// decoding using hte dictionary to get the actual values. Returns the number of values
// actually decoded and any error encountered.
func (d *Dict{{.Name}}Decoder) Decode(out []{{.name}}) (int, error) {
vals := utils.MinInt(len(out), d.nvals)
decoded, err := d.decode(out[:vals])
if err != nil {
return decoded, err
}
if vals != decoded {
return decoded, xerrors.New("parquet: dict eof exception")
}
d.nvals -= vals
return vals, nil
}
// Decode spaced is like Decode but will space out the data leaving slots for null values
// based on the provided bitmap.
func (d *Dict{{.Name}}Decoder) DecodeSpaced(out []{{.name}}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
vals := utils.MinInt(len(out), d.nvals)
decoded, err := d.decodeSpaced(out[:vals], nullCount, validBits, validBitsOffset)
if err != nil {
return decoded, err
}
if vals != decoded {
return decoded, xerrors.New("parquet: dict spaced eof exception")
}
d.nvals -= vals
return vals, nil
}
// {{.Name}}DictConverter is a helper for dictionary handling which is used for converting
// run length encoded indexes into the actual values that are stored in the dictionary index page.
type {{.Name}}DictConverter struct {
// valueDecoder is the decoder that dictionary values are read from, on demand
valueDecoder {{.Name}}Decoder
// dict holds the dictionary values decoded so far
dict []{{.name}}
// zeroVal is the value FillZero writes; it is never assigned in this file,
// so it is the zero value of {{.name}}
zeroVal {{.name}}
}
// ensure decodes dictionary values up to and including index idx if that has
// not happened yet, so the entire dictionary never has to be decoded up front.
// Only the values the decoder actually produced are kept, so after a nil
// return the dictionary can still be shorter than idx+1.
func (dc *{{.Name}}DictConverter) ensure(idx utils.IndexType) error {
	have := len(dc.dict)
	if int(idx) < have {
		// already decoded far enough
		return nil
	}

	if int(idx) < cap(dc.dict) {
		// enough capacity: decode straight into the unused tail of dict
		n, err := dc.valueDecoder.Decode(dc.dict[have : idx+1])
		if err != nil {
			return err
		}
		dc.dict = dc.dict[:have+n]
		return nil
	}

	// not enough capacity: decode into a scratch slice and append the result
	scratch := make([]{{.name}}, int(idx+1)-have)
	n, err := dc.valueDecoder.Decode(scratch)
	if err != nil {
		return err
	}
	dc.dict = append(dc.dict, scratch[:n]...)
	return nil
}
// IsValid verifies that the set of indexes passed in are all valid indexes
// in the dictionary and if necessary decodes dictionary values up to the
// largest index requested. It returns false when any index is negative or
// beyond the decoded dictionary, or when decoding the dictionary values fails.
func (dc *{{.Name}}DictConverter) IsValid(idxes ...utils.IndexType) bool {
	// Reinterpret the index slice as []int32 without copying; this assumes
	// utils.IndexType has the same memory layout as int32 - TODO confirm.
	min, max := utils.GetMinMaxInt32(*(*[]int32)(unsafe.Pointer(&idxes)))
	// A decode failure means the requested indexes cannot be served, so report
	// it explicitly instead of silently dropping the error.
	if err := dc.ensure(utils.IndexType(max)); err != nil {
		return false
	}
	return min >= 0 && int(min) < len(dc.dict) && int(max) >= 0 && int(max) < len(dc.dict)
}
// Fill populates the slice passed in entirely with the value at dictionary index indicated by val.
//
// out must hold a []{{.name}}; any other dynamic type makes the type assertion
// panic. An error is returned when decoding the dictionary up to val fails or
// when val does not refer to a decoded dictionary entry. An empty out is a no-op.
func (dc *{{.Name}}DictConverter) Fill(out interface{}, val utils.IndexType) error {
	o := out.([]{{.name}})
	if err := dc.ensure(val); err != nil {
		return err
	}
	// ensure keeps only the values the decoder actually produced and ignores
	// negative indexes, so val must still be range checked to avoid a panic.
	if val < 0 || int(val) >= len(dc.dict) {
		return xerrors.New("parquet: dictionary index out of range")
	}
	if len(o) == 0 {
		return nil
	}
	o[0] = dc.dict[val]
	// double the filled prefix on every pass until the whole slice is covered
	for i := 1; i < len(o); i *= 2 {
		copy(o[i:], o[:i])
	}
	return nil
}
// FillZero populates the entire slice of out with the zero value for {{.name}}.
//
// out must hold a []{{.name}}; any other dynamic type makes the type assertion
// panic. An empty out is a no-op.
func (dc *{{.Name}}DictConverter) FillZero(out interface{}) {
	o := out.([]{{.name}})
	if len(o) == 0 {
		// nothing to fill; writing o[0] would panic
		return
	}
	o[0] = dc.zeroVal
	// double the filled prefix on every pass until the whole slice is covered
	for i := 1; i < len(o); i *= 2 {
		copy(o[i:], o[:i])
	}
}
// Copy writes, for every position i of vals, the dictionary value stored at
// index vals[i] into position i of out. out must hold a []{{.name}}. No
// dictionary values are decoded here and no bounds are checked beyond Go's own
// indexing; the returned error is always nil.
func (dc *{{.Name}}DictConverter) Copy(out interface{}, vals []utils.IndexType) error {
	o := out.([]{{.name}})
	for i := range vals {
		o[i] = dc.dict[vals[i]]
	}
	return nil
}
{{end}}
{{end}}
// NewDictConverter creates a dict converter of the appropriate type, using the passed in
// decoder as the decoder to decode the dictionary index.
//
// The converter type is chosen from dict.Type(). Its dictionary slice starts
// empty with capacity dict.ValuesLeft(). Types without a generated converter
// (Boolean and Int96 are skipped by the template) fall through to the default
// case and yield nil. The type assertion on dict is unchecked and panics if
// dict does not implement the typed decoder interface matching its Type().
func NewDictConverter(dict TypedDecoder) utils.DictionaryConverter {
switch dict.Type() {
{{ range .In }}{{ if and (ne .Name "Boolean") (ne .Name "Int96") -}}
case parquet.Types.{{if .physical }}{{.physical}}{{else}}{{.Name}}{{end}}:
return &{{.Name}}DictConverter{valueDecoder: dict.({{.Name}}Decoder), dict: make([]{{.name}}, 0, dict.ValuesLeft())}
{{ end }}{{ end -}}
default:
return nil
}
}
// getEncodingTraits is a helper function to get the encoding traits object for the
// physical type indicated. One case is generated per template input type, returning
// the matching package-level <Name>EncoderTraits value; any other type yields nil.
func getEncodingTraits(t parquet.Type) EncoderTraits {
switch t {
{{ range .In -}}
case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}:
return {{.Name}}EncoderTraits
{{ end -}}
default:
return nil
}
}
// getDecodingTraits is a helper function to get the decoding traits object for the
// physical type indicated. One case is generated per template input type, returning
// the matching package-level <Name>DecoderTraits value; any other type yields nil.
func getDecodingTraits(t parquet.Type) DecoderTraits {
switch t {
{{ range .In -}}
case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}:
return {{.Name}}DecoderTraits
{{ end -}}
default:
return nil
}
}