blob: 82a3c7d1444c4292d8a2078f1525d025a739ca3b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metadata
import (
"github.com/apache/arrow/go/v6/parquet"
"github.com/apache/arrow/go/v6/parquet/schema"
"github.com/apache/arrow/go/v6/parquet/internal/utils"
"github.com/apache/arrow/go/v6/parquet/internal/encoding"
)
{{range .In}}
// minmaxPair{{.Name}} is a two-element array of {{.name}} values.
//
// NOTE(review): the type is not referenced by name anywhere in this template;
// presumably it carries a (min, max) pair in the same order SetMinMax uses
// (index 0 = min, index 1 = max) -- confirm against the helpers defined
// elsewhere in the package.
type minmaxPair{{.Name}} [2]{{.name}}
// {{.Name}}Statistics tracks column statistics for values of {{.Name}} type.
//
// It embeds the shared statistics state and adds the typed minimum and
// maximum along with a reusable reader for validity bitmaps.
type {{.Name}}Statistics struct {
	statistics

	// min and max are the current bounds; SetMinMax assigns them and
	// flags hasMinMax on the embedded statistics.
	min, max {{.name}}

	// bitSetReader is created on first use by getMinMaxSpaced and reset
	// (rather than reallocated) on later calls.
	bitSetReader utils.SetBitRunReader
}
// New{{.Name}}Statistics builds an empty {{.Name}}Statistics for the column
// described by descr. The allocator mem is handed to the plain encoder and
// kept on the stat object for later use.
//
// The returned object has both its null count and distinct count flagged as
// present.
//
// Panics if the physical type of descr is not parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}
func New{{.Name}}Statistics(descr *schema.Column, mem memory.Allocator) *{{.Name}}Statistics {
	if descr.PhysicalType() != parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} {
		panic(xerrors.Errorf("parquet: invalid type %s for constructing a {{.Name}} stat object", descr.PhysicalType()))
	}

	// state shared by every typed statistics implementation
	base := statistics{
		descr:            descr,
		hasNullCount:     true,
		hasDistinctCount: true,
		order:            descr.SortOrder(),
		encoder:          encoding.NewEncoder(descr.PhysicalType(), parquet.Encodings.Plain, false, descr, mem),
		mem:              mem,
	}

	return &{{.Name}}Statistics{
		statistics: base,
{{- if eq .Name "ByteArray"}}
		// byte array bounds start out as empty, non-nil slices
		min: make([]byte, 0),
		max: make([]byte, 0),
{{- end}}
	}
}
// New{{.Name}}StatisticsFromEncoded constructs a properly typed statistics
// object and seeds it from previously encoded statistics.
//
// nvalues is added to the (initially zero) value count. The null count and
// distinct count are copied over only when encoded reports them as set, and
// the min / max are plain-decoded only when the encoded bytes are non-empty.
// The result reports having min/max when encoded flags either bound as set.
func New{{.Name}}StatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalues int64, encoded StatProvider) *{{.Name}}Statistics {
	stats := New{{.Name}}Statistics(descr, mem)
	stats.nvalues += nvalues

	if encoded.IsSetNullCount() {
		stats.incNulls(encoded.GetNullCount())
	}
	if encoded.IsSetDistinctCount() {
		stats.incDistinct(encoded.GetDistinctCount())
	}

	// len of a nil slice is 0, so one length test covers both the nil and
	// the empty case.
	if raw := encoded.GetMin(); len(raw) > 0 {
		stats.min = stats.plainDecode(raw)
	}
	if raw := encoded.GetMax(); len(raw) > 0 {
		stats.max = stats.plainDecode(raw)
	}

	stats.hasMinMax = encoded.IsSetMax() || encoded.IsSetMin()
	return stats
}
// plainEncode returns the plain-encoded bytes for the single value src.
//
// A failure while flushing the encoder is raised as a panic; Encode recovers
// it and turns it into an error.
func (s *{{.Name}}Statistics) plainEncode(src {{.name}}) []byte {
{{- if eq .Name "ByteArray"}}
	// byte arrays are handed back as-is: no length prefix and no copy
	return src
{{- else}}
	typed := s.encoder.(encoding.{{.Name}}Encoder)
	typed.Put([]{{.name}}{src})

	flushed, err := s.encoder.FlushValues()
	if err != nil {
		panic(err) // recovered by Encode
	}
	defer flushed.Release()

	// copy out of the flushed buffer before it is released
	encoded := make([]byte, flushed.Len())
	copy(encoded, flushed.Bytes())
	return encoded
{{- end}}
}
// plainDecode interprets src as one plain-encoded value and returns it.
func (s *{{.Name}}Statistics) plainDecode(src []byte) {{.name}} {
{{- if eq .Name "ByteArray"}}
	// byte arrays are used directly; the result aliases src
	return src
{{- else}}
	dec := encoding.NewDecoder(s.descr.PhysicalType(), parquet.Encodings.Plain, s.descr, s.mem)
	dec.SetData(1, src)

	// NOTE(review): whatever Decode returns is discarded here, so a short or
	// malformed src would presumably yield the zero value -- confirm.
	out := make([]{{.name}}, 1)
	dec.(encoding.{{.Name}}Decoder).Decode(out)
	return out[0]
{{- end}}
}
{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}}
// minval returns whichever of a and b orders first according to s.less;
// b is returned when neither is less than the other.
func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} {
	if !s.less(a, b) {
		return b
	}
	return a
}
// maxval returns whichever of a and b orders last according to s.less;
// a is returned when a is not less than b.
func (s *{{.Name}}Statistics) maxval(a, b {{.name}}) {{.name}} {
	if !s.less(a, b) {
		return a
	}
	return b
}
{{else}}
// minval returns the smaller of a and b according to s.less, treating a nil
// value as "absent": if one side is nil the other side is returned.
func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} {
	if a == nil {
		return b
	}
	// the nil test short-circuits, so s.less never sees a nil operand
	if b == nil || s.less(a, b) {
		return a
	}
	return b
}
// maxval returns the larger of a and b according to s.less, treating a nil
// value as "absent": if one side is nil the other side is returned.
func (s *{{.Name}}Statistics) maxval(a, b {{.name}}) {{.name}} {
	if a == nil {
		return b
	}
	if b == nil {
		return a
	}
	if s.less(a, b) {
		return b
	}
	return a
}
{{end}}
// MinMaxEqual reports whether s and rhs hold the same Min and the same Max,
// as judged by s.equal.
func (s *{{.Name}}Statistics) MinMaxEqual(rhs *{{.Name}}Statistics) bool {
	if !s.equal(s.min, rhs.min) {
		return false
	}
	return s.equal(s.max, rhs.max)
}
// Equals returns true only if both objects are the same type, agree on
// whether min/max are present (and, when present, hold the same min and max
// values), and have the same null count, distinct count and number of values.
//
// Two stat objects that both lack min/max are compared on their counts alone.
func (s *{{.Name}}Statistics) Equals(other TypedStatistics) bool {
	if s.Type() != other.Type() {
		return false
	}
	rhs, ok := other.(*{{.Name}}Statistics)
	if !ok {
		return false
	}
	if s.HasMinMax() != rhs.HasMinMax() {
		return false
	}
	// Both sides agree on the presence of min/max at this point; the bounds
	// are only meaningful, and only compared, when they are present.
	// Requiring hasMinMax to be true here would make two identical stat
	// objects without bounds (or an object and itself) compare unequal.
	if s.hasMinMax && !s.MinMaxEqual(rhs) {
		return false
	}
	return s.NullCount() == rhs.NullCount() &&
		s.DistinctCount() == rhs.DistinctCount() &&
		s.NumValues() == rhs.NumValues()
}
{{if or (eq .name "float32") (eq .name "float64")}}
// coalesce returns val unless it is NaN, in which case fallback is returned.
func (s *{{.Name}}Statistics) coalesce(val, fallback {{.name}}) {{.name}} {
	if !math.IsNaN(float64(val)) {
		return val
	}
	return fallback
}
{{end}}
// getMinMax scans values and returns their minimum and maximum.
//
// For the integer types the work is delegated to the utils helpers, viewing
// the data as unsigned when the column's sort order is not signed. For all
// other types the scan starts from defaultMin / defaultMax and folds every
// element in through minval / maxval; for floats NaN elements are replaced by
// the corresponding default before being compared.
func (s *{{.Name}}Statistics) getMinMax(values []{{.name}}) (min, max {{.name}}) {
{{- if or (eq .name "int32") (eq .name "int64")}}
	if s.order != schema.SortSIGNED {
		// reinterpret the same bytes as unsigned, then cast the results back
		unsigned := arrow.U{{.name}}Traits.CastFromBytes(arrow.{{.Name}}Traits.CastToBytes(values))
		umin, umax := utils.GetMinMaxU{{.name}}(unsigned)
		return {{.name}}(umin), {{.name}}(umax)
	}
	return utils.GetMinMax{{.Name}}(values)
{{- else}}
	minSeed, maxSeed := s.defaultMin(), s.defaultMax()
	min, max = minSeed, maxSeed
	for i := range values {
{{- if or (eq .name "float32") (eq .name "float64") }}
		min = s.minval(min, s.coalesce(values[i], minSeed))
		max = s.maxval(max, s.coalesce(values[i], maxSeed))
{{- else}}
		min = s.minval(min, values[i])
		max = s.maxval(max, values[i])
{{- end }}
	}
	return min, max
{{- end}}
}
// getMinMaxSpaced returns the minimum and maximum of the entries of values
// that fall inside the runs reported by the bit-run reader built over
// validBits (starting at bit validBitsOffset, spanning len(values) bits).
// Entries outside those runs are never inspected.
//
// The scan starts from defaultMin / defaultMax, so those are the results when
// the reader reports no runs at all. For floats, NaN entries are replaced by
// the corresponding default before being compared.
func (s *{{.Name}}Statistics) getMinMaxSpaced(values []{{.name}}, validBits []byte, validBitsOffset int64) (min, max {{.name}}) {
	defMin, defMax := s.defaultMin(), s.defaultMax()
	min, max = defMin, defMax
{{- if or (eq .name "int32") (eq .name "int64")}}

	// fn computes the min/max of one contiguous run of values
	var fn func([]{{.name}}) ({{.name}}, {{.name}})
	if s.order == schema.SortSIGNED {
		fn = utils.GetMinMax{{.Name}}
	} else {
		fn = func(v []{{.name}}) ({{.name}}, {{.name}}) {
			// operate on the run passed in (v), not on the whole captured
			// values slice, which also contains the skipped slots
			umin, umax := utils.GetMinMaxU{{.name}}(arrow.U{{.name}}Traits.CastFromBytes(arrow.{{.Name}}Traits.CastToBytes(v)))
			return {{.name}}(umin), {{.name}}(umax)
		}
	}
{{- end}}

	// lazily create the run reader once, then reuse it across calls
	if s.bitSetReader == nil {
		s.bitSetReader = utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(values)))
	} else {
		s.bitSetReader.Reset(validBits, validBitsOffset, int64(len(values)))
	}

	for {
		run := s.bitSetReader.NextRun()
		if run.Length == 0 {
			// a zero-length run marks the end of the bitmap
			break
		}
		valid := values[int(run.Pos):int(run.Pos+run.Length)]
{{- if or (eq .name "int32") (eq .name "int64")}}
		localMin, localMax := fn(valid)
		// merge through minval / maxval so the comparison goes through
		// s.less like the rest of this type, instead of a plain signed
		// < / > on values that may have been ordered as unsigned
		min = s.minval(min, localMin)
		max = s.maxval(max, localMax)
{{- else}}
		for _, v := range valid {
{{- if or (eq .name "float32") (eq .name "float64") }}
			min = s.minval(min, s.coalesce(v, defMin))
			max = s.maxval(max, s.coalesce(v, defMax))
{{- else}}
			min = s.minval(min, v)
			max = s.maxval(max, v)
{{- end }}
		}
{{- end}}
	}
	return min, max
}
func (s *{{.Name}}Statistics) Min() {{.name}} { return s.min }
func (s *{{.Name}}Statistics) Max() {{.name}} { return s.max }
// Merge folds the stats from other into this stat object: the shared state is
// merged through statistics.merge and, when other has min/max, they are
// applied via SetMinMax.
//
// Panics if other is not a *{{.Name}}Statistics.
func (s *{{.Name}}Statistics) Merge(other TypedStatistics) {
	rhs, ok := other.(*{{.Name}}Statistics)
	if !ok {
		panic("incompatible stat type merge")
	}

	s.statistics.merge(rhs)
	if !rhs.HasMinMax() {
		return
	}
	s.SetMinMax(rhs.Min(), rhs.Max())
}
// Update adds more values to the current stat object: numNull is added to the
// null count, len(values) to the value count, and the min/max of values are
// folded in. An empty values slice leaves min/max untouched.
func (s *{{.Name}}Statistics) Update(values []{{.name}}, numNull int64) {
	s.incNulls(numNull)
	s.nvalues += int64(len(values))

	if len(values) > 0 {
		s.SetMinMax(s.getMinMax(values))
	}
}
// UpdateSpaced is just like Update, but for spaced values: validBits (read
// from validBitsOffset) is used to determine and skip null slots. Only
// len(values) - numNull is added to the value count, and min/max are left
// untouched when that difference is zero.
func (s *{{.Name}}Statistics) UpdateSpaced(values []{{.name}}, validBits []byte, validBitsOffset, numNull int64) {
	s.incNulls(numNull)

	present := int64(len(values)) - numNull
	s.nvalues += present

	if present != 0 {
		s.SetMinMax(s.getMinMaxSpaced(values, validBits, validBitsOffset))
	}
}
// SetMinMax offers a candidate min / max pair to the stat object.
//
// The pair is first passed through cleanStat; a nil result means the pair is
// ignored. If no min/max is set yet the cleaned pair is adopted as-is,
// otherwise the stored min is replaced unless it is already less than the
// candidate, and the stored max is replaced when it is less than the
// candidate.
func (s *{{.Name}}Statistics) SetMinMax(argMin, argMax {{.name}}) {
	cleaned := s.cleanStat([2]{{.name}}{argMin, argMax})
	if cleaned == nil {
		return
	}
	lo, hi := (*cleaned)[0], (*cleaned)[1]

	if !s.hasMinMax {
		// first pair seen: adopt it unconditionally
		s.hasMinMax = true
		s.min, s.max = lo, hi
		return
	}

	if !s.less(s.min, lo) {
		s.min = lo
	}
	if s.less(s.max, hi) {
		s.max = hi
	}
}
// EncodeMin returns the current min value in plain encoding, or nil when the
// stat object has no min/max.
//
// ByteArray stats do not include the length in the encoding.
func (s *{{.Name}}Statistics) EncodeMin() []byte {
	if !s.HasMinMax() {
		return nil
	}
	return s.plainEncode(s.min)
}
// EncodeMax returns the current max value in plain encoding, or nil when the
// stat object has no min/max.
//
// ByteArray stats do not include the length in the encoding.
func (s *{{.Name}}Statistics) EncodeMax() []byte {
	if !s.HasMinMax() {
		return nil
	}
	return s.plainEncode(s.max)
}
// Encode returns a populated EncodedStatistics object holding the max and min
// (when present), the null count and the distinct count (when tracked).
//
// Any panic raised while encoding is recovered and reported through err.
func (s *{{.Name}}Statistics) Encode() (enc EncodedStatistics, err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		// translate the panic payload into the returned error
		switch cause := recovered.(type) {
		case error:
			err = cause
		case string:
			err = xerrors.New(cause)
		default:
			err = xerrors.Errorf("unknown error type thrown from panic: %v", cause)
		}
	}()

	if s.HasMinMax() {
		enc.SetMax(s.EncodeMax())
		enc.SetMin(s.EncodeMin())
	}
	if s.HasNullCount() {
		enc.SetNullCount(s.NullCount())
	}
	if s.HasDistinctCount() {
		enc.SetDistinctCount(s.DistinctCount())
	}
	return enc, err
}
{{end}}
// NewStatistics picks the typed stats constructor matching the physical type
// of the column descriptor and returns its result. If mem is nil, then
// memory.DefaultAllocator will be used.
//
// Panics with "not implemented" for a physical type that has no typed
// statistics implementation.
func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics {
	if mem == nil {
		mem = memory.DefaultAllocator
	}

	switch descr.PhysicalType() {
{{- range .In}}
	case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}:
		return New{{.Name}}Statistics(descr, mem)
{{- end}}
	}
	panic("not implemented")
}
// NewStatisticsFromEncoded picks the typed FromEncoded constructor matching
// the physical type of the column descriptor and initializes the stat object
// from nvalues and the encoded statistics.
//
// If mem is nil, then memory.DefaultAllocator is used.
//
// Panics with "not implemented" for a physical type that has no typed
// statistics implementation.
func NewStatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalues int64, encoded StatProvider) TypedStatistics {
	if mem == nil {
		mem = memory.DefaultAllocator
	}

	switch descr.PhysicalType() {
{{- range .In}}
	case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}:
		return New{{.Name}}StatisticsFromEncoded(descr, mem, nvalues, encoded)
{{- end}}
	}
	panic("not implemented")
}