blob: 0ea2b5e81a00bcf4b4c2dc6c2e9a0cb7606b2ae6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metadata
import (
"bytes"
"encoding/binary"
"math"
"unsafe"
"github.com/apache/arrow/go/v6/arrow"
"github.com/apache/arrow/go/v6/arrow/memory"
"github.com/apache/arrow/go/v6/parquet"
"github.com/apache/arrow/go/v6/parquet/internal/debug"
"github.com/apache/arrow/go/v6/parquet/internal/encoding"
"github.com/apache/arrow/go/v6/parquet/internal/utils"
format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
"github.com/apache/arrow/go/v6/parquet/schema"
)
//go:generate go run ../../arrow/_tools/tmpl/main.go -i -data=../internal/encoding/physical_types.tmpldata statistics_types.gen.go.tmpl
// StatProvider is the read-side accessor set for a block of already-encoded
// statistics: the raw min and max bytes, the null and distinct counts, and an
// IsSet* query reporting whether each of those four fields is populated.
//
// NOTE(review): presumably satisfied by the thrift-generated statistics
// struct — confirm against the callers that consume this interface.
type StatProvider interface {
GetMin() []byte
GetMax() []byte
GetNullCount() int64
GetDistinctCount() int64
IsSetMax() bool
IsSetMin() bool
IsSetNullCount() bool
IsSetDistinctCount() bool
}
// EncodedStatistics are raw statistics with encoded values that will be written
// to the parquet file, or were read from the parquet file.
//
// Every value is paired with a Has* flag; ToThrift only emits a value whose
// flag is set.
type EncodedStatistics struct {
// HasMax reports whether Max should be treated as populated.
HasMax bool
// Max is the encoded maximum value.
Max []byte
// HasMin reports whether Min should be treated as populated.
HasMin bool
// Min is the encoded minimum value.
Min []byte
// Signed marks the sort order as signed; when set, ToThrift additionally
// fills the legacy Min/Max thrift fields for backwards compatibility.
Signed bool
// HasNullCount reports whether NullCount should be treated as populated.
HasNullCount bool
// NullCount is the number of null values.
NullCount int64
// HasDistinctCount reports whether DistinctCount should be treated as populated.
HasDistinctCount bool
// DistinctCount is the number of distinct values.
DistinctCount int64
}
// ApplyStatSizeLimits clears HasMax and/or HasMin when the corresponding
// encoded value is longer than length bytes. The Max and Min bytes themselves
// are left in place; only the flags change.
//
// This follows parquet-mr: an oversized statistic is not written at all
// instead of being truncated. Some engines treat the page minimum as the true
// minimum for aggregations, and there is no way to mark a value as a truncated
// bound that is not actually present in the page.
func (e *EncodedStatistics) ApplyStatSizeLimits(length int) {
	e.HasMax = e.HasMax && len(e.Max) <= length
	e.HasMin = e.HasMin && len(e.Min) <= length
}
// IsSet reports whether at least one of the Has* flags is true.
func (e *EncodedStatistics) IsSet() bool {
	switch {
	case e.HasMin, e.HasMax, e.HasNullCount, e.HasDistinctCount:
		return true
	}
	return false
}
// SetMax stores val as the encoded Max value and marks HasMax as true.
// The slice is stored as-is (not copied). The receiver is returned so calls
// can be chained.
func (e *EncodedStatistics) SetMax(val []byte) *EncodedStatistics {
	e.Max, e.HasMax = val, true
	return e
}
// SetMin stores val as the encoded Min value and marks HasMin as true.
// The slice is stored as-is (not copied). The receiver is returned so calls
// can be chained.
func (e *EncodedStatistics) SetMin(val []byte) *EncodedStatistics {
	e.Min, e.HasMin = val, true
	return e
}
// SetNullCount stores val as the NullCount and marks HasNullCount as true.
// The receiver is returned so calls can be chained.
func (e *EncodedStatistics) SetNullCount(val int64) *EncodedStatistics {
	e.NullCount, e.HasNullCount = val, true
	return e
}
// SetDistinctCount stores val as the DistinctCount and marks HasDistinctCount
// as true. The receiver is returned so calls can be chained.
func (e *EncodedStatistics) SetDistinctCount(val int64) *EncodedStatistics {
	e.DistinctCount, e.HasDistinctCount = val, true
	return e
}
// ToThrift builds a new thrift Statistics object from the values whose Has*
// flag is set.
//
// Min and Max are written to MinValue/MaxValue. When the sort order is
// signed they are also written to the older Min/Max fields for backwards
// compatibility. The null and distinct counts are exposed as pointers to the
// fields of e, so the result aliases e for those two values.
func (e *EncodedStatistics) ToThrift() (stats *format.Statistics) {
	stats = format.NewStatistics()

	if e.HasMin {
		stats.MinValue = e.Min
	}
	if e.HasMax {
		stats.MaxValue = e.Max
	}

	// if the sort order is SIGNED then the old min/max values must be set too
	// for backwards compatibility
	if e.Signed {
		if e.HasMin {
			stats.Min = e.Min
		}
		if e.HasMax {
			stats.Max = e.Max
		}
	}

	if e.HasNullCount {
		stats.NullCount = &e.NullCount
	}
	if e.HasDistinctCount {
		stats.DistinctCount = &e.DistinctCount
	}
	return stats
}
// TypedStatistics is the base interface for dealing with stats as
// they are being populated
type TypedStatistics interface {
// Type is the underlying physical type for this stat block
Type() parquet.Type
// Returns true if there is a min and max value set for this stat object
HasMinMax() bool
// Returns true if a nullcount has been set
HasNullCount() bool
// returns true only if a distinct count has been set
// the current implementation of the writer does not automatically populate
// the distinct count right now.
HasDistinctCount() bool
// NullCount is the number of nulls recorded so far
NullCount() int64
// DistinctCount is the distinct count recorded so far
DistinctCount() int64
// NumValues is the number of values recorded so far
NumValues() int64
// return the column descriptor that this stat object was initialized with
Descr() *schema.Column
// Encode the current min value and return the bytes. ByteArray does not
// include the len in the encoded bytes, otherwise this is identical to
// plain encoding
EncodeMin() []byte
// Encode the current max value and return the bytes. ByteArray does not
// include the len in the encoded bytes, otherwise this is identical to
// plain encoding
EncodeMax() []byte
// Populate an EncodedStatistics object from the current stats
Encode() (EncodedStatistics, error)
// Resets all values to 0 to enable reusing this stat object for multiple
// columns, by calling Encode to get the finished values and then calling
// reset
Reset()
// Merge the min/max/nullcounts and distinct count from the passed stat object
// into this one.
Merge(TypedStatistics)
}
// statistics is the untyped base state shared by the typed statistics
// implementations: the column descriptor, the "has value" flags and the
// running counters.
type statistics struct {
// descr is the column descriptor returned by Descr and used by Type
descr *schema.Column
// hasMinMax, hasNullCount and hasDistinctCount back the Has* accessors
hasMinMax bool
hasNullCount bool
hasDistinctCount bool
// NOTE(review): mem is not used in this part of the file; presumably the
// allocator for encoded buffers — confirm in the typed implementations
mem memory.Allocator
// nvalues is the count returned by NumValues
nvalues int64
// stats holds the running NullCount and DistinctCount totals
stats EncodedStatistics
// order selects signed or unsigned comparison and default min/max values
order schema.SortOrder
// NOTE(review): encoder is not used in this part of the file; presumably
// used to encode min/max values — confirm in the typed implementations
encoder encoding.TypedEncoder
}
// incNulls adds n to the running null count and records that a null count
// has been set.
func (s *statistics) incNulls(n int64) {
	s.hasNullCount = true
	s.stats.NullCount += n
}
// incDistinct adds n to the running distinct count and records that a
// distinct count has been set.
func (s *statistics) incDistinct(n int64) {
	s.hasDistinctCount = true
	s.stats.DistinctCount += n
}
// Descr returns the column descriptor this stat object holds.
func (s *statistics) Descr() *schema.Column {
	return s.descr
}

// Type returns the physical type reported by the column descriptor.
func (s *statistics) Type() parquet.Type {
	return s.descr.PhysicalType()
}

// HasDistinctCount reports whether a distinct count has been set.
func (s *statistics) HasDistinctCount() bool {
	return s.hasDistinctCount
}

// HasMinMax reports whether a min and max value have been set.
func (s *statistics) HasMinMax() bool {
	return s.hasMinMax
}

// HasNullCount reports whether a null count has been set.
func (s *statistics) HasNullCount() bool {
	return s.hasNullCount
}

// NullCount returns the running null count.
func (s *statistics) NullCount() int64 {
	return s.stats.NullCount
}

// DistinctCount returns the running distinct count.
func (s *statistics) DistinctCount() int64 {
	return s.stats.DistinctCount
}

// NumValues returns the running value count.
func (s *statistics) NumValues() int64 {
	return s.nvalues
}
// Reset zeroes the null, distinct and value counters and clears the
// hasMinMax, hasDistinctCount and hasNullCount flags so the object can be
// reused.
func (s *statistics) Reset() {
	s.hasMinMax, s.hasDistinctCount, s.hasNullCount = false, false, false
	s.nvalues = 0
	s.stats.NullCount, s.stats.DistinctCount = 0, 0
}
// merge is the shared, untyped part of merging two stat objects, kept here so
// each typed implementation does not have to repeat it. It adds other's value
// count to s, and adds other's null and distinct counts when other reports
// having them.
func (s *statistics) merge(other TypedStatistics) {
	totals := &s.stats

	s.nvalues += other.NumValues()
	if other.HasNullCount() {
		totals.NullCount += other.NullCount()
	}
	if !other.HasDistinctCount() {
		return
	}
	// Summing is not technically correct: a true distinct count would need
	// the actual sets of distinct values to be combined. For now this
	// matches the C++ implementation.
	totals.DistinctCount += other.DistinctCount()
}
// coalesce returns fallback when val is a float32 or float64 holding NaN, and
// val itself in every other case (including any non-float value).
func coalesce(val, fallback interface{}) interface{} {
	var f float64
	switch v := val.(type) {
	case float32:
		f = float64(v)
	case float64:
		f = v
	default:
		return val
	}
	if math.IsNaN(f) {
		return fallback
	}
	return val
}
// signedByteLess reports whether a is strictly less than b when both slices
// are read as big-endian two's complement integers (e.g. decimals). The
// slices may differ in length; the shorter one is compared as if it were
// sign-extended. An empty slice is less than any non-empty slice, and two
// empty slices are not less than each other.
func signedByteLess(a, b []byte) bool {
// signed comparison is used for integers encoded as big-endian twos complement
// integers (e.g. decimals)
// if at least one of the lengths is zero, we can short circuit
if len(a) == 0 || len(b) == 0 {
return len(a) == 0 && len(b) > 0
}
// reinterpret the bytes as int8 without copying so the leading byte can be
// compared with its sign. sa and sb keep viewing the original, untrimmed
// slices even after a and b are re-sliced below.
sa := *(*[]int8)(unsafe.Pointer(&a))
sb := *(*[]int8)(unsafe.Pointer(&b))
// we can short circuit for differently signed numbers or for equal length byte
// arrays that have different first bytes. The equal-length requirement is
// necessary for sign extension cases: 0xFF90 should be equal to 0x90 (both are
// -112 once the shorter big-endian value is sign extended)
if int8(0x80&uint8(sa[0])) != int8(0x80&uint8(sb[0])) || (len(sa) == len(sb) && sa[0] != sb[0]) {
return sa[0] < sb[0]
}
// when the lengths are unequal and the numbers are of the same sign, we need
// to do comparison by sign extending the shorter value first, and once we get
// to equal sized arrays, lexicographical unsigned comparison of everything but
// the first byte is sufficient.
if len(a) != len(b) {
// lead holds the extra leading bytes of the longer value; the longer value
// is trimmed so that a and b end up with the same length
var lead []byte
if len(a) > len(b) {
leadLen := len(a) - len(b)
lead = a[:leadLen]
a = a[leadLen:]
} else {
debug.Assert(len(a) < len(b), "something weird in byte slice signed comparison")
leadLen := len(b) - len(a)
lead = b[:leadLen]
b = b[leadLen:]
}
// compare extra bytes to the sign extension of the first byte of the other number
// (both numbers have the same sign here, so the sign of sa[0] is used for either)
var extension byte
if sa[0] < 0 {
extension = 0xFF
}
notequal := false
for _, c := range lead {
if c != extension {
notequal = true
break
}
}
if notequal {
// since sign extension are extrema values for unsigned bytes:
//
// Four cases exist:
// negative values:
// b is the longer value
// b must be the lesser value: return false
// else:
// a must be the lesser value: return true
//
// positive values:
// b is the longer value
// values in b must be greater than a: return true
// else:
// values in a must be greater than b: return false
neg := sa[0] < 0
blonger := len(sa) < len(sb)
return neg != blonger
}
} else {
// equal lengths and, per the short circuit above, equal first bytes: skip them
a = a[1:]
b = b[1:]
}
return bytes.Compare(a, b) == -1
}
// defaultMin is the starting minimum for booleans: true, the largest value.
func (BooleanStatistics) defaultMin() bool {
	return true
}

// defaultMax is the starting maximum for booleans: false, the smallest value.
func (BooleanStatistics) defaultMax() bool {
	return false
}
// defaultMin returns the starting minimum for int32 statistics, i.e. the
// largest value under the configured sort order: all 32 bits set for an
// unsigned order (which reads as -1 in the int32 return type), otherwise
// math.MaxInt32.
func (s *Int32Statistics) defaultMin() int32 {
	if s.order == schema.SortUNSIGNED {
		// The constant must be typed as uint32: left untyped it defaults to
		// int, which overflows (and fails to compile) where int is 32 bits.
		val := uint32(math.MaxUint32)
		return int32(val)
	}
	return math.MaxInt32
}
// defaultMax returns the starting maximum for int32 statistics, i.e. the
// smallest value under the configured sort order: 0 for an unsigned order,
// otherwise math.MinInt32.
func (s *Int32Statistics) defaultMax() int32 {
	if s.order != schema.SortUNSIGNED {
		return math.MinInt32
	}
	return 0
}
// defaultMin returns the starting minimum for int64 statistics, i.e. the
// largest value under the configured sort order: all 64 bits set for an
// unsigned order (which reads as -1 in the int64 return type), otherwise
// math.MaxInt64.
func (s *Int64Statistics) defaultMin() int64 {
	if s.order != schema.SortUNSIGNED {
		return math.MaxInt64
	}
	var allOnes uint64 = math.MaxUint64
	return int64(allOnes)
}
// defaultMax returns the starting maximum for int64 statistics, i.e. the
// smallest value under the configured sort order: 0 for an unsigned order,
// otherwise math.MinInt64.
func (s *Int64Statistics) defaultMax() int64 {
	if s.order != schema.SortUNSIGNED {
		return math.MinInt64
	}
	return 0
}
// Sentinel Int96 values returned by Int96Statistics.defaultMin and
// defaultMax. They are filled in by init, which views each value as three
// uint32 words; word 2 is the one compared first by Int96Statistics.less.
var (
// largest signed value: words {MaxUint32, MaxUint32, MaxInt32}
defaultMinInt96 parquet.Int96
// largest unsigned value: every word is MaxUint32
defaultMinUInt96 parquet.Int96
// smallest signed value: words {0, 0, MaxInt32 + 1}
defaultMaxInt96 parquet.Int96
// smallest unsigned value: left as the zero value
defaultMaxUInt96 parquet.Int96
)
// init fills in the Int96 default min/max sentinels by writing to each value
// through a []uint32 view of its bytes.
func init() {
	const allOnes = math.MaxUint32

	// signed "default min": the largest signed 96-bit value
	signedMin := arrow.Uint32Traits.CastFromBytes(defaultMinInt96[:])
	signedMin[0], signedMin[1], signedMin[2] = allOnes, allOnes, math.MaxInt32

	// unsigned "default min": every bit set
	unsignedMin := arrow.Uint32Traits.CastFromBytes(defaultMinUInt96[:])
	unsignedMin[0], unsignedMin[1], unsignedMin[2] = allOnes, allOnes, allOnes

	// signed "default max": only the top bit of word 2 set; the other words
	// keep the zero value Go initialized them with
	signedMax := arrow.Uint32Traits.CastFromBytes(defaultMaxInt96[:])
	signedMax[2] = math.MaxInt32 + 1

	// defaultMaxUInt96 stays all zero, which is already the desired value
}
// defaultMin returns the starting minimum for Int96 statistics:
// defaultMinUInt96 for an unsigned sort order, defaultMinInt96 otherwise.
func (s *Int96Statistics) defaultMin() parquet.Int96 {
	if s.order != schema.SortUNSIGNED {
		return defaultMinInt96
	}
	return defaultMinUInt96
}
// defaultMax returns the starting maximum for Int96 statistics:
// defaultMaxUInt96 for an unsigned sort order, defaultMaxInt96 otherwise.
func (s *Int96Statistics) defaultMax() parquet.Int96 {
	if s.order != schema.SortUNSIGNED {
		return defaultMaxInt96
	}
	return defaultMaxUInt96
}
// defaultMin is the starting minimum for float32: the largest finite value.
func (Float32Statistics) defaultMin() float32 {
	return math.MaxFloat32
}

// defaultMax is the starting maximum for float32: the most negative finite value.
func (Float32Statistics) defaultMax() float32 {
	return -math.MaxFloat32
}

// defaultMin is the starting minimum for float64: the largest finite value.
func (Float64Statistics) defaultMin() float64 {
	return math.MaxFloat64
}

// defaultMax is the starting maximum for float64: the most negative finite value.
func (Float64Statistics) defaultMax() float64 {
	return -math.MaxFloat64
}
// defaultMin is the starting minimum for byte arrays: nil, meaning "not set".
func (ByteArrayStatistics) defaultMin() parquet.ByteArray {
	return nil
}

// defaultMax is the starting maximum for byte arrays: nil, meaning "not set".
func (ByteArrayStatistics) defaultMax() parquet.ByteArray {
	return nil
}

// defaultMin is the starting minimum for fixed length byte arrays: nil,
// meaning "not set".
func (FixedLenByteArrayStatistics) defaultMin() parquet.FixedLenByteArray {
	return nil
}

// defaultMax is the starting maximum for fixed length byte arrays: nil,
// meaning "not set".
func (FixedLenByteArrayStatistics) defaultMax() parquet.FixedLenByteArray {
	return nil
}
// equal reports whether two booleans are the same.
func (BooleanStatistics) equal(a, b bool) bool {
	return a == b
}

// equal reports whether two int32 values are the same.
func (Int32Statistics) equal(a, b int32) bool {
	return a == b
}

// equal reports whether two int64 values are the same.
func (Int64Statistics) equal(a, b int64) bool {
	return a == b
}

// equal reports whether two float32 values compare equal with ==, so NaN is
// never equal to anything and 0 equals -0.
func (Float32Statistics) equal(a, b float32) bool {
	return a == b
}

// equal reports whether two float64 values compare equal with ==, so NaN is
// never equal to anything and 0 equals -0.
func (Float64Statistics) equal(a, b float64) bool {
	return a == b
}
// equal reports whether two Int96 values hold the same bytes.
func (Int96Statistics) equal(a, b parquet.Int96) bool {
	lhs, rhs := a[:], b[:]
	return bytes.Equal(lhs, rhs)
}

// equal reports whether two byte arrays have the same length and contents.
func (ByteArrayStatistics) equal(a, b parquet.ByteArray) bool {
	return bytes.Equal(a, b)
}

// equal reports whether two fixed length byte arrays have the same length
// and contents.
func (FixedLenByteArrayStatistics) equal(a, b parquet.FixedLenByteArray) bool {
	same := bytes.Equal(a, b)
	return same
}
// less orders booleans with false before true: it is true only when a is
// false and b is true.
func (BooleanStatistics) less(a, b bool) bool {
	if a {
		return false
	}
	return b
}
// less reports whether a orders before b. With an unsigned sort order both
// values are reinterpreted as uint32 before comparing; otherwise they are
// compared as signed int32.
func (s *Int32Statistics) less(a, b int32) bool {
	if s.order != schema.SortUNSIGNED {
		return a < b
	}
	return uint32(a) < uint32(b)
}
// less reports whether a orders before b. With an unsigned sort order both
// values are reinterpreted as uint64 before comparing; otherwise they are
// compared as signed int64.
func (s *Int64Statistics) less(a, b int64) bool {
	if s.order != schema.SortUNSIGNED {
		return a < b
	}
	return uint64(a) < uint64(b)
}
// less reports whether a < b using the ordinary float32 comparison, so it is
// false whenever either operand is NaN.
func (Float32Statistics) less(a, b float32) bool {
	return b > a
}

// less reports whether a < b using the ordinary float64 comparison, so it is
// false whenever either operand is NaN.
func (Float64Statistics) less(a, b float64) bool {
	return b > a
}
// less reports whether a orders before b. Each value is viewed as three
// uint32 words, each passed through utils.ToLEUint32. Word 2 is compared
// first, then word 1, then word 0. Only word 2 is ever compared as a signed
// number, and only when the sort order is signed.
func (s *Int96Statistics) less(a, b parquet.Int96) bool {
	wordsA := arrow.Uint32Traits.CastFromBytes(a[:])
	wordsB := arrow.Uint32Traits.CastFromBytes(b[:])
	loA, midA, hiA := utils.ToLEUint32(wordsA[0]), utils.ToLEUint32(wordsA[1]), utils.ToLEUint32(wordsA[2])
	loB, midB, hiB := utils.ToLEUint32(wordsB[0]), utils.ToLEUint32(wordsB[1]), utils.ToLEUint32(wordsB[2])

	switch {
	case hiA != hiB && s.order == schema.SortSIGNED:
		// only the most significant word carries the sign
		return int32(hiA) < int32(hiB)
	case hiA != hiB:
		return hiA < hiB
	case midA != midB:
		return midA < midB
	default:
		return loA < loB
	}
}
// less reports whether a orders before b. An unsigned sort order uses plain
// lexicographic byte comparison; any other order delegates to signedByteLess.
func (s *ByteArrayStatistics) less(a, b parquet.ByteArray) bool {
	if s.order != schema.SortUNSIGNED {
		return signedByteLess([]byte(a), []byte(b))
	}
	return bytes.Compare(a, b) < 0
}
// less reports whether a orders before b. An unsigned sort order uses plain
// lexicographic byte comparison; any other order delegates to signedByteLess.
func (s *FixedLenByteArrayStatistics) less(a, b parquet.FixedLenByteArray) bool {
	if s.order != schema.SortUNSIGNED {
		return signedByteLess([]byte(a), []byte(b))
	}
	return bytes.Compare(a, b) < 0
}
// cleanStat returns a pointer to the boolean min/max pair unchanged.
func (BooleanStatistics) cleanStat(minMax minmaxPairBoolean) *minmaxPairBoolean {
	return &minMax
}

// cleanStat returns a pointer to the int32 min/max pair unchanged.
func (Int32Statistics) cleanStat(minMax minmaxPairInt32) *minmaxPairInt32 {
	return &minMax
}

// cleanStat returns a pointer to the int64 min/max pair unchanged.
func (Int64Statistics) cleanStat(minMax minmaxPairInt64) *minmaxPairInt64 {
	return &minMax
}

// cleanStat returns a pointer to the Int96 min/max pair unchanged.
func (Int96Statistics) cleanStat(minMax minmaxPairInt96) *minmaxPairInt96 {
	return &minMax
}
// cleanStat sanitizes a float32 min/max pair, applying the rules used by
// parquet-mr:
//   - if either min or max is NaN, nothing is returned (nil)
//   - if the pair still holds the defaults (min == MaxFloat32 and
//     max == -MaxFloat32), nothing is returned (nil)
//   - a min of +0.0 is replaced with -0.0
//   - a max of -0.0 is replaced with +0.0
//
// https://issues.apache.org/jira/browse/PARQUET-1222 tracks the official
// documentation of a well-defined order for floats and doubles.
func (Float32Statistics) cleanStat(minMax minmaxPairFloat32) *minmaxPairFloat32 {
	lo, hi := minMax[0], minMax[1]
	switch {
	case math.IsNaN(float64(lo)), math.IsNaN(float64(hi)):
		return nil
	case lo == math.MaxFloat32 && hi == -math.MaxFloat32:
		return nil
	}

	// normalize signed zeros so that min is -0.0 and max is +0.0
	if lo == 0 && !math.Signbit(float64(lo)) {
		minMax[0] = -lo
	}
	if hi == 0 && math.Signbit(float64(hi)) {
		minMax[1] = -hi
	}
	return &minMax
}
// cleanStat sanitizes a float64 min/max pair:
//   - if either min or max is NaN, nothing is returned (nil)
//   - if the pair still holds the defaults (min == MaxFloat64 and
//     max == -MaxFloat64), nothing is returned (nil)
//   - a min of +0.0 is replaced with -0.0
//   - a max of -0.0 is replaced with +0.0
func (Float64Statistics) cleanStat(minMax minmaxPairFloat64) *minmaxPairFloat64 {
	lo, hi := minMax[0], minMax[1]
	switch {
	case math.IsNaN(lo), math.IsNaN(hi):
		return nil
	case lo == math.MaxFloat64 && hi == -math.MaxFloat64:
		return nil
	}

	// normalize signed zeros so that min is -0.0 and max is +0.0
	if lo == 0 && !math.Signbit(lo) {
		minMax[0] = -lo
	}
	if hi == 0 && math.Signbit(hi) {
		minMax[1] = -hi
	}
	return &minMax
}
// cleanStat returns nil when either the min or the max is nil, and a pointer
// to the unchanged pair otherwise.
func (ByteArrayStatistics) cleanStat(minMax minmaxPairByteArray) *minmaxPairByteArray {
	lo, hi := minMax[0], minMax[1]
	if lo != nil && hi != nil {
		return &minMax
	}
	return nil
}
// cleanStat returns nil when either the min or the max is nil, and a pointer
// to the unchanged pair otherwise.
func (FixedLenByteArrayStatistics) cleanStat(minMax minmaxPairFixedLenByteArray) *minmaxPairFixedLenByteArray {
	lo, hi := minMax[0], minMax[1]
	if lo != nil && hi != nil {
		return &minMax
	}
	return nil
}
// GetStatValue decodes an encoded statistic value into a Go value chosen by
// the physical type typ:
//
//   - Boolean: bool, true when the first byte is non-zero
//   - Int32 / Int64: the little-endian integer read from val
//   - Int96: a parquet.Int96 holding a copy of val
//   - Float / Double: the float whose little-endian bit pattern is in val
//   - ByteArray / FixedLenByteArray: val itself, not copied
//
// Any other type yields nil. Apart from Int96 and the byte array types, val
// must be at least as long as the decoded value; a shorter slice panics.
func GetStatValue(typ parquet.Type, val []byte) interface{} {
	le := binary.LittleEndian

	switch typ {
	case parquet.Types.Boolean:
		return val[0] != 0
	case parquet.Types.Int32:
		return int32(le.Uint32(val))
	case parquet.Types.Int64:
		return int64(le.Uint64(val))
	case parquet.Types.Int96:
		var out parquet.Int96
		copy(out[:], val)
		return out
	case parquet.Types.Float:
		return math.Float32frombits(le.Uint32(val))
	case parquet.Types.Double:
		return math.Float64frombits(le.Uint64(val))
	case parquet.Types.ByteArray, parquet.Types.FixedLenByteArray:
		return val
	default:
		return nil
	}
}