| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package encoding |
| |
| import ( |
| "encoding/binary" |
| "fmt" |
| |
| "github.com/apache/arrow/go/v10/arrow" |
| "github.com/apache/arrow/go/v10/parquet" |
| "github.com/apache/arrow/go/v10/internal/utils" |
| "github.com/apache/arrow/go/v10/internal/bitutils" |
| ) |
| |
| // write<Name>LE and copyFrom<Name>LE are per-type function hooks that are |
| // assigned in init. One pair is declared for every generated type except |
| // Boolean, ByteArray and FixedLenByteArray. |
| var ( |
| {{range .In}} |
| {{if not (eq .Name "Boolean" "ByteArray" "FixedLenByteArray") -}} |
| write{{.Name}}LE func(*encoder, []{{.name}}) |
| copyFrom{{.Name}}LE func(dst []{{.name}}, src []byte) |
| {{- end}} |
| {{- end}} |
| ) |
| |
| // init assigns the write<Name>LE / copyFrom<Name>LE hooks. Int96 always uses a |
| // raw byte copy. Every other generated type converts through encoding/binary |
| // when endian.IsBigEndian is set and uses a raw byte copy otherwise. |
| func init() { |
| // int96 is already internally represented as little endian data |
| // no need to have special behavior on big endian architectures |
| // for read/write, consumers will need to be aware of the fact |
| // that it is internally 12 bytes little endian when attempting |
| // to utilize it. |
| writeInt96LE = func(e *encoder, in []parquet.Int96) { |
| e.append(parquet.Int96Traits.CastToBytes(in)) |
| } |
| copyFromInt96LE = func(dst []parquet.Int96, src []byte) { |
| copy(parquet.Int96Traits.CastToBytes(dst), src) |
| } |
| |
| if endian.IsBigEndian { |
| {{- range .In}} |
| {{- if and (ne .Name "Boolean") (ne .Name "ByteArray") (ne .Name "FixedLenByteArray") (ne .Name "Int96")}} |
| write{{.Name}}LE = func(e *encoder, in []{{.name}}) { |
| // NOTE(review): the error returned by binary.Write is discarded here; |
| // presumably writes to e.sink cannot fail - confirm. |
| binary.Write(e.sink, binary.LittleEndian, in) |
| } |
| copyFrom{{.Name}}LE = func(dst []{{.name}}, src []byte) { |
| // Mirror copy() in the little-endian branch: fill only as many elements |
| // as src can supply. binary.Read needs the bytes for all of dst before it |
| // decodes anything, so an over-long dst would otherwise be left untouched. |
| if n := len(src) / int({{.prefix}}.{{.Name}}SizeBytes); n < len(dst) { |
| dst = dst[:n] |
| } |
| // Pass the slice itself rather than a pointer to it so encoding/binary |
| // can use its typed slice path instead of reflection. After the clamp |
| // above src holds enough bytes, so the read cannot come up short. |
| binary.Read(bytes.NewReader(src), binary.LittleEndian, dst) |
| } |
| {{- end -}} |
| {{- end}} |
| } else { |
| {{- range .In}} |
| {{- if and (ne .Name "Boolean") (ne .Name "ByteArray") (ne .Name "FixedLenByteArray") (ne .Name "Int96")}} |
| write{{.Name}}LE = func(e *encoder, in []{{.name}}) { |
| e.append({{.prefix}}.{{.Name}}Traits.CastToBytes(in)) |
| } |
| copyFrom{{.Name}}LE = func(dst []{{.name}}, src []byte) { |
| copy({{.prefix}}.{{.Name}}Traits.CastToBytes(dst), src) |
| } |
| {{- end -}} |
| {{- end}} |
| } |
| } |
| |
| {{range .In}} |
| {{if and (ne .Name "Boolean") (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}} |
| // Plain{{.Name}}Encoder encodes {{.name}} values with Plain Encoding, which in |
| // general just stores each value as its raw bytes of the appropriate size. |
| type Plain{{.Name}}Encoder struct { |
| encoder |
| |
| // bitSetReader is created by the first PutSpaced call and reset by later ones. |
| bitSetReader bitutils.SetBitRunReader |
| } |
| |
| // Put writes every value of in to the underlying buffer through the |
| // write{{.Name}}LE hook. |
| func (enc *Plain{{.Name}}Encoder) Put(in []{{.name}}) { |
| base := &enc.encoder |
| write{{.Name}}LE(base, in) |
| } |
| |
| // PutSpaced encodes a spaced slice: in contains a slot for every position, and |
| // validBits (read from bit offset validBitsOffset) marks which slots hold real |
| // values. Only the runs of set bits are written, so null slots are dropped |
| // before anything reaches the underlying buffer. |
| func (enc *Plain{{.Name}}Encoder) PutSpaced(in []{{.name}}, validBits []byte, validBitsOffset int64) { |
| // Reserve room as if every slot were valid. |
| enc.ReserveForWrite({{.prefix}}.{{.Name}}Traits.BytesRequired(len(in))) |
| |
| count := int64(len(in)) |
| if enc.bitSetReader != nil { |
| enc.bitSetReader.Reset(validBits, validBitsOffset, count) |
| } else { |
| enc.bitSetReader = bitutils.NewSetBitRunReader(validBits, validBitsOffset, count) |
| } |
| |
| // A run of length zero signals that the bitmap is exhausted. |
| for run := enc.bitSetReader.NextRun(); run.Length != 0; run = enc.bitSetReader.NextRun() { |
| first, last := int(run.Pos), int(run.Pos+run.Length) |
| enc.Put(in[first:last]) |
| } |
| } |
| |
| // Type reports the parquet physical type that this encoder writes. |
| func (Plain{{.Name}}Encoder) Type() parquet.Type { |
| return parquet.Types.{{or .physical .Name}} |
| } |
| |
| // Plain{{.Name}}Decoder decodes Plain Encoding data into values of |
| // type {{.name}}. |
| type Plain{{.Name}}Decoder struct { |
| decoder |
| |
| // bitSetReader is created by the first DecodeSpaced call that has nulls to |
| // place and is reset by later ones. |
| bitSetReader bitutils.SetBitRunReader |
| } |
| |
| // Type reports the parquet physical type that this decoder reads. |
| func (Plain{{.Name}}Decoder) Type() parquet.Type { |
| return parquet.Types.{{or .physical .Name}} |
| } |
| |
| // Decode fills out with values from the data to be decoded, decoding |
| // min(len(out), remaining values) of them and advancing past the bytes consumed. |
| // It returns the number of values actually decoded. It returns 0 and an error |
| // when the remaining data is shorter than the bytes required, or when that byte |
| // count exceeds math.MaxInt32. |
| func (dec *Plain{{.Name}}Decoder) Decode(out []{{.name}}) (int, error) { |
| n := utils.MinInt(len(out), dec.nvals) |
| nbytes := int64(n) * int64({{.prefix}}.{{.Name}}SizeBytes) |
| if nbytes > int64(len(dec.data)) || nbytes > math.MaxInt32 { |
| return 0, fmt.Errorf("parquet: eof exception decode plain {{.Name}}, nvals: %d, nbytes: %d, datalen: %d", dec.nvals, nbytes, len(dec.data)) |
| } |
| |
| // Give the copy hook exactly n destination slots. The big-endian hook sizes |
| // its read from the destination slice, so passing all of out when fewer |
| // values remain would ask for more bytes than dec.data[:nbytes] holds. |
| copyFrom{{.Name}}LE(out[:n], dec.data[:nbytes]) |
| dec.data = dec.data[nbytes:] |
| dec.nvals -= n |
| return n, nil |
| } |
| |
| // DecodeSpaced is the same as Decode, except that it spreads the decoded values |
| // out over out so that the slots marked null in the bitmap are left as gaps. |
| // |
| // out has one slot per position (nulls included), nullCount is the number of |
| // null slots, and validBits (read from bit offset validBitsOffset) marks the |
| // valid slots. Null slots are not written to, so they keep whatever they held. |
| // It returns len(out) on success. It returns 0 and an error when nullCount is |
| // outside [0, len(out)], when Decode fails, or when fewer than |
| // len(out)-nullCount values could be decoded. |
| func (dec *Plain{{.Name}}Decoder) DecodeSpaced(out []{{.name}}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { |
| nvalues := len(out) |
| // Reject an impossible null count up front: a count above len(out) would |
| // panic when slicing, and a negative one could reslice out past its length. |
| if nullCount < 0 || nullCount > nvalues { |
| return 0, fmt.Errorf("parquet: invalid null count %d for %d values", nullCount, nvalues) |
| } |
| |
| // First decode the non-null values densely into the front of out. |
| toread := nvalues - nullCount |
| values, err := dec.Decode(out[:toread]) |
| if err != nil { |
| return 0, err |
| } |
| if values != toread { |
| return 0, xerrors.New("parquet: number of values / definition levels read did not match") |
| } |
| |
| if nullCount == 0 { |
| return nvalues, nil |
| } |
| |
| if dec.bitSetReader == nil { |
| dec.bitSetReader = bitutils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(nvalues)) |
| } else { |
| dec.bitSetReader.Reset(validBits, validBitsOffset, int64(nvalues)) |
| } |
| |
| // Move each run of valid values from the dense prefix to its final position. |
| // idxDecode walks the dense prefix from its end towards the front. |
| // NOTE(review): the reverse reader presumably yields runs from the end of the |
| // bitmap backwards, which is what keeps unmoved values from being |
| // overwritten - confirm. |
| idxDecode := toread |
| for { |
| run := dec.bitSetReader.NextRun() |
| if run.Length == 0 { |
| break |
| } |
| |
| idxDecode -= int(run.Length) |
| copy(out[int(run.Pos):], out[idxDecode:idxDecode+int(run.Length)]) |
| } |
| return nvalues, nil |
| } |
| {{end}} |
| {{end}} |