| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package encoding |
| |
| import ( |
| "github.com/apache/arrow/go/v6/parquet" |
| "github.com/apache/arrow/go/v6/parquet/schema" |
| format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet" |
| "github.com/apache/arrow/go/v6/arrow" |
| "github.com/apache/arrow/go/v6/parquet/internal/utils" |
| ) |
| |
| // fully typed encoder and decoder interfaces to enable writing against encoders/decoders |
| // without having to care about what encoding type is actually being used. |
| |
| // Traits values for each supported type. These are the objects handed out by |
| // getEncodingTraits and getDecodingTraits. |
| var ( |
| {{range .In}} |
| {{.Name}}EncoderTraits {{.lower}}EncoderTraits |
| {{.Name}}DecoderTraits {{.lower}}DecoderTraits |
| {{- end}} |
| ) |
| |
| {{range .In}} |
| // {{.Name}}Encoder is implemented by every encoder, whatever its encoding type, |
| // that accepts {{.name}} values. |
| type {{.Name}}Encoder interface { |
| TypedEncoder |
| // Put encodes the values passed in. |
| Put(in []{{.name}}) |
| // PutSpaced is like Put for data that has slots open for null values; the |
| // validity bitmap and its bit offset identify which slots hold real values. |
| PutSpaced(in []{{.name}}, validBits []byte, validBitsOffset int64) |
| } |
| |
| // {{.Name}}Decoder is implemented by every decoder, whatever its encoding type, |
| // that produces {{.name}} values. |
| type {{.Name}}Decoder interface { |
| TypedDecoder |
| // Decode fills out with decoded values and reports how many were decoded. |
| Decode(out []{{.name}}) (int, error) |
| // DecodeSpaced is like Decode but leaves slots for null values as described |
| // by the null count, the validity bitmap and its bit offset. |
| DecodeSpaced(out []{{.name}}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) |
| } |
| |
| // {{.lower}}EncoderTraits is an empty helper type whose Encoder method makes it easy |
| // to create encoders for {{.lower}} data based on the requested encoding. |
| type {{.lower}}EncoderTraits struct{} |
| |
| // Encoder returns an encoder for {{.lower}} type data, using the specified encoding type and whether or not |
| // it should be dictionary encoded. When useDict is true the encoding argument e is not consulted. |
| // Encoder panics if e has no implementation for this type. |
| {{- if or (eq .Name "Boolean") (eq .Name "Int96")}} |
| // dictionary encoding does not exist for this type and Encoder will panic if useDict is true |
| {{- end }} |
| func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *schema.Column, mem memory.Allocator) TypedEncoder { |
| // the dictionary request takes precedence over the encoding passed in |
| if useDict { |
| {{- if or (eq .Name "Boolean") (eq .Name "Int96")}} |
| panic("parquet: no {{.name}} dictionary encoding") |
| {{- else}} |
| return &Dict{{.Name}}Encoder{newDictEncoderBase(descr, New{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}}{{.Name}}Dictionary(){{else}}BinaryDictionary(mem){{end}}, mem)} |
| {{- end}} |
| } |
| |
| switch e { |
| case format.Encoding_PLAIN: |
| return &Plain{{.Name}}Encoder{encoder: newEncoderBase(e, descr, mem)} |
| {{- if or (eq .Name "Int32") (eq .Name "Int64")}} |
| case format.Encoding_DELTA_BINARY_PACKED: |
| return DeltaBitPack{{.Name}}Encoder{&deltaBitPackEncoder{ |
| encoder: newEncoderBase(e, descr, mem)}} |
| {{- end}} |
| {{- if eq .Name "ByteArray"}} |
| case format.Encoding_DELTA_LENGTH_BYTE_ARRAY: |
| // lengthEncoder is a nested delta bit-packed int32 encoder with its own encoder base |
| return &DeltaLengthByteArrayEncoder{ |
| encoder: newEncoderBase(e, descr, mem), |
| lengthEncoder: &DeltaBitPackInt32Encoder{ |
| &deltaBitPackEncoder{encoder: newEncoderBase(e, descr, mem)}}, |
| } |
| case format.Encoding_DELTA_BYTE_ARRAY: |
| return &DeltaByteArrayEncoder{ |
| encoder: newEncoderBase(e, descr, mem), |
| } |
| {{- end}} |
| default: |
| // any encoding not handled above for this type |
| panic("unimplemented encoding type") |
| } |
| } |
| |
| // {{.lower}}DecoderTraits is an empty helper struct for providing information regardless of the type |
| // and used as a generic way to create a Decoder or Dictionary Decoder for {{.lower}} values. |
| // Its methods are BytesRequired and Decoder. |
| type {{.lower}}DecoderTraits struct{} |
| |
| // BytesRequired returns the number of bytes required to store n {{.lower}} values. |
| // The computation is delegated to {{.prefix}}.{{.Name}}Traits. |
| func ({{.lower}}DecoderTraits) BytesRequired(n int) int { |
| return {{.prefix}}.{{.Name}}Traits.BytesRequired(n) |
| } |
| |
| // Decoder returns a decoder for {{.lower}} typed data of the requested encoding type if available. |
| // When useDict is true the encoding argument e is not consulted. Decoder panics if e has no |
| // decoder for this type. For the delta encodings a nil mem is replaced with memory.DefaultAllocator. |
| func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder { |
| // the dictionary request takes precedence over the encoding passed in |
| if useDict { |
| {{- if and (ne .Name "Boolean") (ne .Name "Int96")}} |
| return &Dict{{.Name}}Decoder{dictDecoder{decoder: newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}} |
| {{- else}} |
| panic("dictionary decoding unimplemented for {{.lower}}") |
| {{- end}} |
| } |
| |
| switch e { |
| case parquet.Encodings.Plain: |
| return &Plain{{.Name}}Decoder{decoder: newDecoderBase(format.Encoding(e), descr)} |
| {{- if or (eq .Name "Int32") (eq .Name "Int64")}} |
| case parquet.Encodings.DeltaBinaryPacked: |
| if mem == nil { |
| mem = memory.DefaultAllocator |
| } |
| return &DeltaBitPack{{.Name}}Decoder{ |
| deltaBitPackDecoder: &deltaBitPackDecoder{ |
| decoder: newDecoderBase(format.Encoding(e), descr), |
| mem: mem, |
| }} |
| {{- end}} |
| {{- if eq .Name "ByteArray"}} |
| case parquet.Encodings.DeltaLengthByteArray: |
| if mem == nil { |
| mem = memory.DefaultAllocator |
| } |
| return &DeltaLengthByteArrayDecoder{ |
| decoder: newDecoderBase(format.Encoding(e), descr), |
| mem: mem, |
| } |
| case parquet.Encodings.DeltaByteArray: |
| if mem == nil { |
| mem = memory.DefaultAllocator |
| } |
| // the delta byte array decoder wraps a delta length byte array decoder |
| return &DeltaByteArrayDecoder{ |
| DeltaLengthByteArrayDecoder: &DeltaLengthByteArrayDecoder{ |
| decoder: newDecoderBase(format.Encoding(e), descr), |
| mem: mem, |
| }} |
| {{- end}} |
| default: |
| // any encoding not handled above for this type |
| panic("unimplemented encoding type") |
| } |
| } |
| |
| {{if and (ne .Name "Boolean") (ne .Name "Int96")}} |
| // Dict{{.Name}}Encoder is an encoder for {{.name}} data using dictionary encoding. |
| // It embeds dictEncoder, which supplies the shared dictionary encoding behaviour. |
| type Dict{{.Name}}Encoder struct { |
| dictEncoder |
| } |
| |
| // Type returns the underlying physical type that can be encoded with this encoder. |
| // The result is a fixed parquet.Types value and does not depend on encoder state. |
| func (enc *Dict{{.Name}}Encoder) Type() parquet.Type { |
| return parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} |
| } |
| |
| {{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}} |
| // Put encodes every value of in, in order, by handing each one to the |
| // embedded dictEncoder, which adds it to the index as needed. |
| func (enc *Dict{{.Name}}Encoder) Put(in []{{.name}}) { |
| for i := range in { |
| enc.dictEncoder.Put(in[i]) |
| } |
| } |
| |
| // PutSpaced behaves like Put for data that has slots open for null values. |
| // Only the runs of set bits in validBits (starting at validBitsOffset) are |
| // visited, so the remaining slots are skipped. |
| func (enc *Dict{{.Name}}Encoder) PutSpaced(in []{{.name}}, validBits []byte, validBitsOffset int64) { |
| // putRun encodes one contiguous run of valid values: in[pos] .. in[pos+length-1] |
| putRun := func(pos, length int64) error { |
| for i, end := pos, pos+length; i < end; i++ { |
| enc.dictEncoder.Put(in[i]) |
| } |
| return nil |
| } |
| utils.VisitSetBitRuns(validBits, validBitsOffset, int64(len(in)), putRun) |
| } |
| {{end}} |
| |
| // Dict{{.Name}}Decoder is a decoder for decoding dictionary encoded data for {{.name}} columns. |
| // It embeds dictDecoder, which supplies the shared dictionary decoding behaviour. |
| type Dict{{.Name}}Decoder struct { |
| dictDecoder |
| } |
| |
| // Type returns the underlying physical type that can be decoded with this decoder. |
| // The result is a fixed parquet.Types value and does not depend on decoder state. |
| func (Dict{{.Name}}Decoder) Type() parquet.Type { |
| return parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} |
| } |
| |
| // Decode fills out with min(len(out), remaining values) values, looking each one up |
| // in the dictionary. It returns the number of values actually decoded along with any |
| // error encountered; decoding fewer values than expected is reported as an error. |
| func (d *Dict{{.Name}}Decoder) Decode(out []{{.name}}) (int, error) { |
| want := utils.MinInt(len(out), d.nvals) |
| got, err := d.decode(out[:want]) |
| switch { |
| case err != nil: |
| return got, err |
| case got != want: |
| return got, xerrors.New("parquet: dict eof exception") |
| } |
| // only a complete decode consumes from the remaining count |
| d.nvals -= want |
| return want, nil |
| } |
| |
| // DecodeSpaced is like Decode but spaces out the data, leaving slots for null values |
| // based on the provided bitmap. It returns the number of values actually decoded along |
| // with any error encountered; decoding fewer values than expected is reported as an error. |
| func (d *Dict{{.Name}}Decoder) DecodeSpaced(out []{{.name}}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { |
| want := utils.MinInt(len(out), d.nvals) |
| got, err := d.decodeSpaced(out[:want], nullCount, validBits, validBitsOffset) |
| switch { |
| case err != nil: |
| return got, err |
| case got != want: |
| return got, xerrors.New("parquet: dict spaced eof exception") |
| } |
| // only a complete decode consumes from the remaining count |
| d.nvals -= want |
| return want, nil |
| } |
| |
| // {{.Name}}DictConverter is a helper for dictionary handling which is used for converting |
| // run length encoded indexes into the actual values that are stored in the dictionary index page. |
| type {{.Name}}DictConverter struct { |
| // valueDecoder is the decoder the dictionary values are read from (see ensure) |
| valueDecoder {{.Name}}Decoder |
| // dict holds the dictionary values decoded so far, addressed by dictionary index |
| dict []{{.name}} |
| // zeroVal is the value written by FillZero; it is not assigned in the code shown |
| // here, so it presumably stays at the zero value for {{.name}} |
| zeroVal {{.name}} |
| } |
| |
| // ensure makes sure dictionary values have been decoded up to and including idx, |
| // decoding lazily so the whole dictionary does not have to be decoded up front. |
| // It returns the decoder's error, if any; when the decoder yields fewer values |
| // than requested, the dictionary simply grows by the number actually decoded. |
| func (dc *{{.Name}}DictConverter) ensure(idx utils.IndexType) error { |
| have := len(dc.dict) |
| if int(idx) < have { |
| // already decoded far enough |
| return nil |
| } |
| |
| if int(idx) < cap(dc.dict) { |
| // enough spare capacity: decode straight into the unused tail |
| n, err := dc.valueDecoder.Decode(dc.dict[have : idx+1]) |
| if err != nil { |
| return err |
| } |
| dc.dict = dc.dict[:have+n] |
| return nil |
| } |
| |
| // not enough capacity: decode into a scratch slice and append the result |
| scratch := make([]{{.name}}, int(idx+1)-have) |
| n, err := dc.valueDecoder.Decode(scratch) |
| if err != nil { |
| return err |
| } |
| dc.dict = append(dc.dict, scratch[:n]...) |
| return nil |
| } |
| |
| // IsValid verifies that the set of indexes passed in are all valid indexes |
| // in the dictionary and if necessary decodes dictionary values up to the largest |
| // index requested. It returns false if that decode fails or if any index falls |
| // outside the decoded dictionary. |
| func (dc *{{.Name}}DictConverter) IsValid(idxes ...utils.IndexType) bool { |
| // reinterprets idxes as []int32 without copying; this assumes utils.IndexType |
| // has the same memory layout as int32 |
| min, max := utils.GetMinMaxInt32(*(*[]int32)(unsafe.Pointer(&idxes))) |
| // a failed decode means the indexes cannot be validated, so report them invalid |
| // instead of silently dropping the error |
| if err := dc.ensure(utils.IndexType(max)); err != nil { |
| return false |
| } |
| |
| return min >= 0 && int(min) < len(dc.dict) && int(max) >= 0 && int(max) < len(dc.dict) |
| } |
| |
| // Fill populates the slice passed in entirely with the value at dictionary index indicated by val. |
| // out must be a []{{.name}}; any other type panics on the type assertion. The dictionary is |
| // decoded up to val first. An error is returned if that decode fails or if val is still |
| // outside the decoded dictionary afterwards. An empty out is left untouched. |
| func (dc *{{.Name}}DictConverter) Fill(out interface{}, val utils.IndexType) error { |
| o := out.([]{{.name}}) |
| if err := dc.ensure(val); err != nil { |
| return err |
| } |
| // ensure can come up short without reporting an error (a negative index, or the |
| // decoder yielding fewer values than requested), so check before indexing |
| if int(val) < 0 || int(val) >= len(dc.dict) { |
| return xerrors.New("parquet: dictionary index out of range") |
| } |
| if len(o) == 0 { |
| return nil |
| } |
| o[0] = dc.dict[val] |
| // replicate the filled prefix, doubling its length on every pass |
| for i := 1; i < len(o); i *= 2 { |
| copy(o[i:], o[:i]) |
| } |
| return nil |
| } |
| |
| // FillZero populates the entire slice of out with the zero value for {{.name}}. |
| // out must be a []{{.name}}; any other type panics on the type assertion. |
| // An empty out is left untouched. |
| func (dc *{{.Name}}DictConverter) FillZero(out interface{}) { |
| o := out.([]{{.name}}) |
| if len(o) == 0 { |
| // nothing to fill; indexing o[0] would panic |
| return |
| } |
| o[0] = dc.zeroVal |
| // replicate the filled prefix, doubling its length on every pass |
| for i := 1; i < len(o); i *= 2 { |
| copy(o[i:], o[:i]) |
| } |
| } |
| |
| // Copy writes into out, position by position, the dictionary value found at each |
| // index in vals. out must be a []{{.name}}. The dictionary is not extended here, so |
| // every index must already have been decoded. The returned error is always nil. |
| func (dc *{{.Name}}DictConverter) Copy(out interface{}, vals []utils.IndexType) error { |
| dst := out.([]{{.name}}) |
| for i := range vals { |
| dst[i] = dc.dict[vals[i]] |
| } |
| return nil |
| } |
| {{end}} |
| |
| {{end}} |
| |
| // NewDictConverter creates a dict converter of the appropriate type, using the passed in |
| // decoder as the decoder to decode the dictionary index. The converter is chosen by |
| // dict.Type() and starts with an empty dictionary whose capacity is dict.ValuesLeft(). |
| // It returns nil when there is no converter for that physical type, and panics if dict |
| // does not implement the typed decoder interface matching its reported type. |
| func NewDictConverter(dict TypedDecoder) utils.DictionaryConverter { |
| switch dict.Type() { |
| {{ range .In }}{{ if and (ne .Name "Boolean") (ne .Name "Int96") -}} |
| case parquet.Types.{{if .physical }}{{.physical}}{{else}}{{.Name}}{{end}}: |
| return &{{.Name}}DictConverter{valueDecoder: dict.({{.Name}}Decoder), dict: make([]{{.name}}, 0, dict.ValuesLeft())} |
| {{ end }}{{ end -}} |
| default: |
| return nil |
| } |
| } |
| |
| // getEncodingTraits is a helper function to get the encoding traits object for the |
| // physical type indicated. It returns nil for a type that has no traits object. |
| func getEncodingTraits(t parquet.Type) EncoderTraits { |
| switch t { |
| {{ range .In -}} |
| case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: |
| return {{.Name}}EncoderTraits |
| {{ end -}} |
| default: |
| return nil |
| } |
| } |
| |
| // getDecodingTraits is a helper function to get the decoding traits object for the |
| // physical type indicated. It returns nil for a type that has no traits object. |
| func getDecodingTraits(t parquet.Type) DecoderTraits { |
| switch t { |
| {{ range .In -}} |
| case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: |
| return {{.Name}}DecoderTraits |
| {{ end -}} |
| default: |
| return nil |
| } |
| } |