| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package metadata |
| |
| import ( |
| "github.com/apache/arrow/go/v6/parquet" |
| "github.com/apache/arrow/go/v6/parquet/schema" |
| "github.com/apache/arrow/go/v6/parquet/internal/utils" |
| "github.com/apache/arrow/go/v6/parquet/internal/encoding" |
| ) |
| |
| {{range .In}} |
| type minmaxPair{{.Name}} [2]{{.name}} |
| |
| // {{.Name}}Statistics is the typed interface for managing stats for a column |
| // of {{.Name}} type. |
| type {{.Name}}Statistics struct { |
| statistics |
| min {{.name}} |
| max {{.name}} |
| |
| bitSetReader utils.SetBitRunReader |
| } |
| |
| // New{{.Name}}Statistics constructs an appropriate stat object type using the |
| // given column descriptor and allocator. |
| // |
| // Panics if the physical type of descr is not parquet.Type.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} |
| func New{{.Name}}Statistics(descr *schema.Column, mem memory.Allocator) *{{.Name}}Statistics { |
| if descr.PhysicalType() != parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} { |
| panic(xerrors.Errorf("parquet: invalid type %s for constructing a {{.Name}} stat object", descr.PhysicalType())) |
| } |
| |
| return &{{.Name}}Statistics{ |
| statistics: statistics{ |
| descr: descr, |
| hasNullCount: true, |
| hasDistinctCount: true, |
| order: descr.SortOrder(), |
| encoder: encoding.NewEncoder(descr.PhysicalType(), parquet.Encodings.Plain, false, descr, mem), |
| mem: mem, |
| }, |
| {{if eq .Name "ByteArray"}} |
| min: make([]byte, 0), |
| max: make([]byte, 0), |
| {{end}} |
| } |
| } |
| |
| // New{{.Name}}StatisticsFromEncoded will construct a propertly typed statistics object |
| // initializing it with the provided information. |
| func New{{.Name}}StatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalues int64, encoded StatProvider) *{{.Name}}Statistics { |
| ret := New{{.Name}}Statistics(descr, mem) |
| ret.nvalues += nvalues |
| if encoded.IsSetNullCount() { |
| ret.incNulls(encoded.GetNullCount()) |
| } |
| if encoded.IsSetDistinctCount() { |
| ret.incDistinct(encoded.GetDistinctCount()) |
| } |
| |
| encodedMin := encoded.GetMin() |
| if encodedMin != nil && len(encodedMin) > 0 { |
| ret.min = ret.plainDecode(encodedMin) |
| } |
| encodedMax := encoded.GetMax() |
| if encodedMax != nil && len(encodedMax) > 0 { |
| ret.max = ret.plainDecode(encodedMax) |
| } |
| ret.hasMinMax = encoded.IsSetMax() || encoded.IsSetMin() |
| return ret |
| } |
| |
| func (s *{{.Name}}Statistics) plainEncode(src {{.name}}) []byte { |
| {{- if eq .Name "ByteArray"}} |
| return src |
| {{- else}} |
| s.encoder.(encoding.{{.Name}}Encoder).Put([]{{.name}}{src}) |
| buf, err := s.encoder.FlushValues() |
| if err != nil { |
| panic(err) // recovered by Encode |
| } |
| defer buf.Release() |
| |
| out := make([]byte, buf.Len()) |
| copy(out, buf.Bytes()) |
| return out |
| {{- end}} |
| } |
| |
| func (s *{{.Name}}Statistics) plainDecode(src []byte) {{.name}} { |
| {{- if eq .Name "ByteArray"}} |
| return src |
| {{- else}} |
| var buf [1]{{.name}} |
| |
| decoder := encoding.NewDecoder(s.descr.PhysicalType(), parquet.Encodings.Plain, s.descr, s.mem) |
| decoder.SetData(1, src) |
| decoder.(encoding.{{.Name}}Decoder).Decode(buf[:]) |
| return buf[0] |
| {{- end}} |
| } |
| |
| {{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}} |
| func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} { |
| if s.less(a, b) { |
| return a |
| } |
| return b |
| } |
| |
| func (s *{{.Name}}Statistics) maxval(a, b {{.name}}) {{.name}} { |
| if s.less(a, b) { |
| return b |
| } |
| return a |
| } |
| {{else}} |
| func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} { |
| switch { |
| case a == nil: |
| return b |
| case b == nil: |
| return a |
| case s.less(a, b): |
| return a |
| default: |
| return b |
| } |
| } |
| |
| func (s *{{.Name}}Statistics) maxval(a, b {{.name}}) {{.name}} { |
| switch { |
| case a == nil: |
| return b |
| case b == nil: |
| return a |
| case s.less(a, b): |
| return b |
| default: |
| return a |
| } |
| } |
| {{end}} |
| |
| // MinMaxEqual returns true if both stat objects have the same Min and Max values |
| func (s *{{.Name}}Statistics) MinMaxEqual(rhs *{{.Name}}Statistics) bool { |
| return s.equal(s.min, rhs.min) && s.equal(s.max, rhs.max) |
| } |
| |
| // Equals returns true only if both objects are the same type, have the same min and |
| // max values, null count, distinct count and number of values. |
| func (s *{{.Name}}Statistics) Equals(other TypedStatistics) bool { |
| if s.Type() != other.Type() { |
| return false |
| } |
| rhs, ok := other.(*{{.Name}}Statistics) |
| if !ok { |
| return false |
| } |
| |
| if s.HasMinMax() != rhs.HasMinMax() { return false } |
| return (s.hasMinMax && s.MinMaxEqual(rhs)) && |
| s.NullCount() == rhs.NullCount() && |
| s.DistinctCount() == rhs.DistinctCount() && |
| s.NumValues() == rhs.NumValues() |
| } |
| |
| {{if or (eq .name "float32") (eq .name "float64")}} |
| func (s *{{.Name}}Statistics) coalesce(val, fallback {{.name}}) {{.name}} { |
| if math.IsNaN(float64(val)) { |
| return fallback |
| } |
| return val |
| } |
| {{end}} |
| |
| func (s *{{.Name}}Statistics) getMinMax(values []{{.name}}) (min, max {{.name}}) { |
| {{- if or (eq .name "int32") (eq .name "int64")}} |
| if s.order == schema.SortSIGNED { |
| min, max = utils.GetMinMax{{.Name}}(values) |
| } else { |
| umin, umax := utils.GetMinMaxU{{.name}}(arrow.U{{.name}}Traits.CastFromBytes(arrow.{{.Name}}Traits.CastToBytes(values))) |
| min, max = {{.name}}(umin), {{.name}}(umax) |
| } |
| {{- else}} |
| defMin := s.defaultMin() |
| defMax := s.defaultMax() |
| |
| min = defMin |
| max = defMax |
| |
| for _, v := range values { |
| {{- if or (eq .name "float32") (eq .name "float64") }} |
| min = s.minval(min, s.coalesce(v, defMin)) |
| max = s.maxval(max, s.coalesce(v, defMax)) |
| {{- else}} |
| min = s.minval(min, v) |
| max = s.maxval(max, v) |
| {{- end }} |
| } |
| {{- end}} |
| return |
| } |
| |
| func (s *{{.Name}}Statistics) getMinMaxSpaced(values []{{.name}}, validBits []byte, validBitsOffset int64) (min, max {{.name}}) { |
| min = s.defaultMin() |
| max = s.defaultMax() |
| |
| {{- if or (eq .name "int32") (eq .name "int64")}} |
| var fn func([]{{.name}}) ({{.name}}, {{.name}}) |
| if s.order == schema.SortSIGNED { |
| fn = utils.GetMinMax{{.Name}} |
| } else { |
| fn = func(v []{{.name}}) ({{.name}}, {{.name}}) { |
| umin, umax := utils.GetMinMaxU{{.name}}(arrow.U{{.name}}Traits.CastFromBytes(arrow.{{.Name}}Traits.CastToBytes(values))) |
| return {{.name}}(umin), {{.name}}(umax) |
| } |
| } |
| {{- end}} |
| |
| if s.bitSetReader == nil { |
| s.bitSetReader = utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(values))) |
| } else { |
| s.bitSetReader.Reset(validBits, validBitsOffset, int64(len(values))) |
| } |
| |
| for { |
| run := s.bitSetReader.NextRun() |
| if run.Length == 0 { |
| break |
| } |
| {{- if or (eq .name "int32") (eq .name "int64")}} |
| localMin, localMax := fn(values[int(run.Pos):int(run.Pos+run.Length)]) |
| if min > localMin { |
| min = localMin |
| } |
| if max < localMax { |
| max = localMax |
| } |
| {{- else}} |
| for _, v := range values[int(run.Pos):int(run.Pos+run.Length)] { |
| {{- if or (eq .name "float32") (eq .name "float64") }} |
| min = s.minval(min, coalesce(v, s.defaultMin()).({{.name}})) |
| max = s.maxval(max, coalesce(v, s.defaultMax()).({{.name}})) |
| {{- else}} |
| min = s.minval(min, v) |
| max = s.maxval(max, v) |
| {{- end }} |
| } |
| {{- end}} |
| } |
| return |
| } |
| |
| func (s *{{.Name}}Statistics) Min() {{.name}} { return s.min } |
| func (s *{{.Name}}Statistics) Max() {{.name}} { return s.max } |
| |
| // Merge merges the stats from other into this stat object, updating |
| // the null count, distinct count, number of values and the min/max if |
| // appropriate. |
| func (s *{{.Name}}Statistics) Merge(other TypedStatistics) { |
| rhs, ok := other.(*{{.Name}}Statistics) |
| if !ok { |
| panic("incompatible stat type merge") |
| } |
| |
| s.statistics.merge(rhs) |
| if rhs.HasMinMax() { |
| s.SetMinMax(rhs.Min(), rhs.Max()) |
| } |
| } |
| |
| // Update is used to add more values to the current stat object, finding the |
| // min and max values etc. |
| func (s *{{.Name}}Statistics) Update(values []{{.name}}, numNull int64) { |
| s.incNulls(numNull) |
| s.nvalues += int64(len(values)) |
| |
| if len(values) == 0 { |
| return |
| } |
| |
| s.SetMinMax(s.getMinMax(values)) |
| } |
| |
| // UpdateSpaced is just like Update, but for spaced values using validBits to determine |
| // and skip null values. |
| func (s *{{.Name}}Statistics) UpdateSpaced(values []{{.name}}, validBits []byte, validBitsOffset, numNull int64) { |
| s.incNulls(numNull) |
| notnull := int64(len(values)) - numNull |
| s.nvalues += notnull |
| |
| if notnull == 0 { |
| return |
| } |
| |
| s.SetMinMax(s.getMinMaxSpaced(values, validBits, validBitsOffset)) |
| } |
| |
| // SetMinMax updates the min and max values only if they are not currently set |
| // or if argMin is less than the current min / argMax is greater than the current max |
| func (s *{{.Name}}Statistics) SetMinMax(argMin, argMax {{.name}}) { |
| maybeMinMax := s.cleanStat([2]{{.name}}{argMin, argMax}) |
| if maybeMinMax == nil { |
| return |
| } |
| |
| min := (*maybeMinMax)[0] |
| max := (*maybeMinMax)[1] |
| |
| if !s.hasMinMax { |
| s.hasMinMax = true |
| s.min = min |
| s.max = max |
| } else { |
| if !s.less(s.min, min) { |
| s.min = min |
| } |
| if s.less(s.max, max) { |
| s.max = max |
| } |
| } |
| } |
| |
| // EncodeMin returns the encoded min value with plain encoding. |
| // |
| // ByteArray stats do not include the length in the encoding. |
| func (s *{{.Name}}Statistics) EncodeMin() []byte { |
| if s.HasMinMax() { |
| return s.plainEncode(s.min) |
| } |
| return nil |
| } |
| |
| // EncodeMax returns the current encoded max value with plain encoding |
| // |
| // ByteArray stats do not include the length in the encoding |
| func (s *{{.Name}}Statistics) EncodeMax() []byte{ |
| if s.HasMinMax() { |
| return s.plainEncode(s.max) |
| } |
| return nil |
| } |
| |
| // Encode returns a populated EncodedStatistics object |
| func (s *{{.Name}}Statistics) Encode() (enc EncodedStatistics, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| switch r := r.(type) { |
| case error: |
| err = r |
| case string: |
| err = xerrors.New(r) |
| default: |
| err = xerrors.Errorf("unknown error type thrown from panic: %v", r) |
| } |
| } |
| }() |
| if s.HasMinMax() { |
| enc.SetMax(s.EncodeMax()) |
| enc.SetMin(s.EncodeMin()) |
| } |
| if s.HasNullCount() { |
| enc.SetNullCount(s.NullCount()) |
| } |
| if s.HasDistinctCount() { |
| enc.SetDistinctCount(s.DistinctCount()) |
| } |
| return |
| } |
| {{end}} |
| |
| // NewStatistics uses the type in the column descriptor to construct the appropriate |
| // typed stats object. If mem is nil, then memory.DefaultAllocator will be used. |
| func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics { |
| if mem == nil { |
| mem = memory.DefaultAllocator |
| } |
| switch descr.PhysicalType() { |
| {{- range .In}} |
| case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: |
| return New{{.Name}}Statistics(descr, mem) |
| {{- end}} |
| default: |
| panic("not implemented") |
| } |
| } |
| |
| // NewStatisticsFromEncoded uses the provided information to initialize a typed stat object |
| // by checking the type of the provided column descriptor. |
| // |
| // If mem is nil, then memory.DefaultAllocator is used. |
| func NewStatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalues int64, encoded StatProvider) TypedStatistics { |
| if mem == nil { |
| mem = memory.DefaultAllocator |
| } |
| switch descr.PhysicalType() { |
| {{- range .In}} |
| case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: |
| return New{{.Name}}StatisticsFromEncoded(descr, mem, nvalues, encoded) |
| {{- end}} |
| default: |
| panic("not implemented") |
| } |
| } |