blob: 0388442649003d0f675a37e73c69996f833e9380 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
import (
"github.com/apache/arrow/go/parquet"
format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
"github.com/apache/thrift/lib/go/thrift"
"golang.org/x/xerrors"
)
// NodeType describes whether the Node is a Primitive or Group node.
type NodeType int
// The available constants for NodeType.
const (
// Primitive is the NodeType carried by nodes built by the PrimitiveNode constructors.
Primitive NodeType = iota
// Group is the NodeType carried by nodes built by the GroupNode constructors.
Group
)
// Node is the interface for both Group and Primitive Nodes.
// A logical schema type has a name, repetition level, and optionally
// a logical type (converted type is the deprecated version of the logical
// type concept, which is maintained for forward compatibility)
//
// The interface contains the unexported toThrift method, so it can only be
// implemented by types declared in this package.
type Node interface {
// Name returns the name of the node.
Name() string
// Type reports whether the node is a Primitive or a Group node.
Type() NodeType
// RepetitionType returns the repetition of the node.
RepetitionType() parquet.Repetition
// ConvertedType returns the converted type annotation of the node.
ConvertedType() ConvertedType
// LogicalType returns the logical type annotation of the node.
LogicalType() LogicalType
// FieldID returns the field id of the node. Negative ids are not written
// out when the node is converted to thrift.
FieldID() int32
// Parent returns the node's parent, or nil if no parent has been set.
Parent() Node
// SetParent sets the parent of the node.
SetParent(Node)
// Path returns the column path of the node rendered as a string.
Path() string
// Equals reports whether this node is equal to the given node.
Equals(Node) bool
// Visit walks this node (and, for groups, its children) with the given Visitor.
Visit(v Visitor)
// toThrift converts the node to its thrift SchemaElement representation.
toThrift() *format.SchemaElement
}
// Visitor is an interface for creating functionality to walk the schema tree.
//
// A visitor can be passed to the Visit function of a Node in order to walk
// the tree. VisitPre is called the first time a node is encountered. If
// it is a group node, the return is checked and if it is false, the children
// will be skipped.
//
// VisitPost is called after visiting any children
type Visitor interface {
// VisitPre is called before any children of the node are visited. For a
// group node, returning false skips its children; for a primitive node the
// return value is ignored.
VisitPre(Node) bool
// VisitPost is called once the node's children (if any) have been visited.
// It is called even when VisitPre returned false.
VisitPost(Node)
}
// ColumnPathFromNode walks the parents of the given node to construct its
// column path.
//
// The path contains the name of n and of each of its ancestors, ordered from
// the outermost ancestor down to n. The top-most node (the one whose Parent is
// nil) contributes no entry. A nil n yields a nil path; otherwise the result
// is always non-nil, even when it is empty.
func ColumnPathFromNode(n Node) parquet.ColumnPath {
	if n == nil {
		return nil
	}

	// Collect the names leaf-first while climbing towards the root.
	names := []string{}
	for cur := n; cur.Parent() != nil; cur = cur.Parent() {
		names = append(names, cur.Name())
	}

	// Flip the slice in place so that it reads root-first.
	for lo, hi := 0, len(names)-1; lo < hi; lo, hi = lo+1, hi-1 {
		names[lo], names[hi] = names[hi], names[lo]
	}
	return names
}
// node is the base embedded struct for both group and primitive nodes
type node struct {
// typ is Primitive or Group, set by the constructors.
typ NodeType
// parent is the enclosing node, or nil when no parent has been set.
parent Node
// name is the name of the node.
name string
// repetition is the repetition of the node.
repetition parquet.Repetition
// fieldID is the field id; negative values are not serialized by toThrift.
fieldID int32
// logicalType is the logical type annotation of the node.
logicalType LogicalType
// convertedType is the converted type annotation of the node.
convertedType ConvertedType
// colPath caches the column path; it is filled in lazily by columnPath.
colPath parquet.ColumnPath
}
// toThrift is the base implementation and returns nil. PrimitiveNode and
// GroupNode define their own toThrift methods.
func (n *node) toThrift() *format.SchemaElement {
	return nil
}

// Name returns the name of the node.
func (n *node) Name() string {
	return n.name
}

// Type returns whether this is a Primitive or a Group node.
func (n *node) Type() NodeType {
	return n.typ
}

// RepetitionType returns the repetition of the node.
func (n *node) RepetitionType() parquet.Repetition {
	return n.repetition
}

// ConvertedType returns the converted type annotation of the node.
func (n *node) ConvertedType() ConvertedType {
	return n.convertedType
}

// LogicalType returns the logical type annotation of the node.
func (n *node) LogicalType() LogicalType {
	return n.logicalType
}

// FieldID returns the field id of the node.
func (n *node) FieldID() int32 {
	return n.fieldID
}

// Parent returns the parent of the node, or nil if none has been set.
func (n *node) Parent() Node {
	return n.parent
}
// SetParent sets p as the parent of this node.
//
// The column path cached by columnPath is derived from the chain of parents,
// so the cache is dropped here and rebuilt on the next call to Path. Without
// this, a node whose Path was read before it was attached to a group would
// keep reporting the path it had while detached. Paths already cached by
// other nodes are not affected by this call.
func (n *node) SetParent(p Node) {
	n.parent = p
	n.colPath = nil
}
// Path returns the column path of this node rendered as a string.
func (n *node) Path() string {
	cp := n.columnPath()
	return cp.String()
}
// columnPath returns the column path of this node, computing it with
// ColumnPathFromNode on first use and caching the result in n.colPath.
func (n *node) columnPath() parquet.ColumnPath {
	if n.colPath != nil {
		return n.colPath
	}
	n.colPath = ColumnPathFromNode(n)
	return n.colPath
}
// Equals compares the attributes shared by all nodes: node type, name,
// repetition, converted type, field id and logical type. The parent is not
// part of the comparison.
func (n *node) Equals(rhs Node) bool {
	switch {
	case n.typ != rhs.Type():
		return false
	case n.Name() != rhs.Name():
		return false
	case n.RepetitionType() != rhs.RepetitionType():
		return false
	case n.ConvertedType() != rhs.ConvertedType():
		return false
	case n.FieldID() != rhs.FieldID():
		return false
	}
	return n.LogicalType().Equals(rhs.LogicalType())
}
// Visit on the base node does nothing. PrimitiveNode and GroupNode define
// their own Visit methods.
func (n *node) Visit(v Visitor) {
}
// A PrimitiveNode is a type that is one of the primitive Parquet storage types. In addition to
// the other type metadata (name, repetition level, logical type), also has the
// physical storage type and their type-specific metadata (byte width, decimal
// parameters)
type PrimitiveNode struct {
node
// ColumnOrder is exported and can be set by callers directly.
// NOTE(review): it is not read or written by the constructors or methods
// visible here - confirm how it is consumed elsewhere.
ColumnOrder parquet.ColumnOrder
// physicalType is the physical storage type, returned by PhysicalType.
physicalType parquet.Type
// typeLen is the type length, returned by TypeLength.
typeLen int
// decimalMetaData holds the decimal precision/scale, returned by DecimalMetadata.
decimalMetaData DecimalMetadata
}
// NewPrimitiveNodeLogical constructs a Primitive node using the provided logical type for a given
// physical type and type length.
//
// A nil logicalType is treated as NoLogicalType. The converted type and decimal
// metadata of the node are derived from the logical type.
//
// An error is returned if the logical type is nested, is not applicable to
// the physical type / type length, is not compatible with the derived
// converted type, or if the physical type is FixedLenByteArray and typeLen is
// not positive.
func NewPrimitiveNodeLogical(name string, repetition parquet.Repetition, logicalType LogicalType, physicalType parquet.Type, typeLen int, id int32) (*PrimitiveNode, error) {
	n := &PrimitiveNode{
		node:         node{typ: Primitive, name: name, repetition: repetition, logicalType: logicalType, fieldID: id},
		physicalType: physicalType,
		typeLen:      typeLen,
	}

	switch {
	case logicalType == nil:
		// no annotation supplied: fall back to the explicit "none" logical type
		n.logicalType = NoLogicalType{}
		n.convertedType, n.decimalMetaData = n.logicalType.ToConvertedType()
	case logicalType.IsNested():
		return nil, xerrors.Errorf("nested logical type %s can not be applied to a non-group node", logicalType)
	case !logicalType.IsApplicable(physicalType, int32(typeLen)):
		return nil, xerrors.Errorf("%s cannot be applied to primitive type %s", logicalType, physicalType)
	default:
		n.convertedType, n.decimalMetaData = n.logicalType.ToConvertedType()
	}

	// final sanity check of the logical type against the derived converted type
	lt := n.logicalType
	if lt == nil || lt.IsNested() || !lt.IsCompatible(n.convertedType, n.decimalMetaData) {
		return nil, xerrors.Errorf("invalid logical type %s", n.logicalType)
	}

	if n.physicalType == parquet.Types.FixedLenByteArray && n.typeLen <= 0 {
		return nil, xerrors.New("invalid fixed length byte array length")
	}
	return n, nil
}
// NewPrimitiveNodeConverted constructs a primitive node from the given physical type and converted type,
// determining the logical type from the converted type.
//
// The converted type is validated against the physical type:
//   - UTF8, JSON, BSON and Enum require ByteArray
//   - Decimal requires Int32, Int64, ByteArray or FixedLenByteArray, a
//     precision greater than zero and a scale between 0 and precision
//   - Date, TimeMillis and the 8/16/32 bit (u)int types require Int32
//   - TimeMicros, the timestamp types and the 64 bit (u)int types require Int64
//   - Interval requires FixedLenByteArray with typeLen == 12
//   - None and NA are accepted for any physical type
//
// Any other converted type is rejected. precision and scale are only used for
// Decimal. typeLen is only stored for FixedLenByteArray (where it must be
// positive); for every other physical type the node's type length is -1.
func NewPrimitiveNodeConverted(name string, repetition parquet.Repetition, typ parquet.Type, converted ConvertedType, typeLen, precision, scale int, id int32) (*PrimitiveNode, error) {
	n := &PrimitiveNode{
		node:         node{typ: Primitive, name: name, repetition: repetition, convertedType: converted, fieldID: id},
		physicalType: typ,
		typeLen:      -1,
	}

	switch converted {
	case ConvertedTypes.None:
	case ConvertedTypes.UTF8, ConvertedTypes.JSON, ConvertedTypes.BSON:
		if typ != parquet.Types.ByteArray {
			// report the offending annotation (not the physical type) and the
			// physical type it actually requires
			return nil, xerrors.Errorf("parquet: %s can only annotate BYTE_ARRAY fields", converted)
		}
	case ConvertedTypes.Decimal:
		switch typ {
		case parquet.Types.Int32, parquet.Types.Int64, parquet.Types.ByteArray, parquet.Types.FixedLenByteArray:
		default:
			return nil, xerrors.New("parquet: DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED")
		}

		switch {
		case precision <= 0:
			return nil, xerrors.Errorf("parquet: invalid decimal precision: %d, must be between 1 and 38 inclusive", precision)
		case scale < 0:
			return nil, xerrors.Errorf("parquet: invalid decimal scale: %d, must be a number between 0 and precision inclusive", scale)
		case scale > precision:
			return nil, xerrors.Errorf("parquet: invalid decimal scale %d, cannot be greater than precision: %d", scale, precision)
		}
		n.decimalMetaData.IsSet = true
		n.decimalMetaData.Precision = int32(precision)
		n.decimalMetaData.Scale = int32(scale)
	case ConvertedTypes.Date,
		ConvertedTypes.TimeMillis,
		ConvertedTypes.Int8,
		ConvertedTypes.Int16,
		ConvertedTypes.Int32,
		ConvertedTypes.Uint8,
		ConvertedTypes.Uint16,
		ConvertedTypes.Uint32:
		if typ != parquet.Types.Int32 {
			return nil, xerrors.Errorf("parquet: %s can only annotate INT32", converted)
		}
	case ConvertedTypes.TimeMicros,
		ConvertedTypes.TimestampMicros,
		ConvertedTypes.TimestampMillis,
		ConvertedTypes.Int64,
		ConvertedTypes.Uint64:
		if typ != parquet.Types.Int64 {
			return nil, xerrors.Errorf("parquet: %s can only annotate INT64", converted)
		}
	case ConvertedTypes.Interval:
		if typ != parquet.Types.FixedLenByteArray || typeLen != 12 {
			return nil, xerrors.New("parquet: INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)")
		}
	case ConvertedTypes.Enum:
		if typ != parquet.Types.ByteArray {
			return nil, xerrors.New("parquet: ENUM can only annotate BYTE_ARRAY fields")
		}
	case ConvertedTypes.NA:
	default:
		return nil, xerrors.Errorf("parquet: %s cannot be applied to a primitive type", converted.String())
	}

	// derive the logical type from the (validated) converted type
	n.logicalType = n.convertedType.ToLogicalType(n.decimalMetaData)
	if !(n.logicalType != nil && !n.logicalType.IsNested() && n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData)) {
		return nil, xerrors.Errorf("invalid logical type %s", n.logicalType)
	}

	if n.physicalType == parquet.Types.FixedLenByteArray {
		if typeLen <= 0 {
			return nil, xerrors.New("invalid fixed len byte array length")
		}
		n.typeLen = typeLen
	}
	return n, nil
}
// PrimitiveNodeFromThrift builds a PrimitiveNode from a thrift SchemaElement.
//
// If the element carries a logical type it is used; otherwise a converted
// type, if set, is used; otherwise the node is created with NoLogicalType.
// The field id is -1 when the element does not set one.
func PrimitiveNodeFromThrift(elem *format.SchemaElement) (*PrimitiveNode, error) {
	fieldID := int32(-1)
	if elem.IsSetFieldID() {
		fieldID = elem.GetFieldID()
	}

	var (
		name     = elem.GetName()
		rep      = parquet.Repetition(elem.GetRepetitionType())
		physical = parquet.Type(elem.GetType())
		typeLen  = int(elem.GetTypeLength())
	)

	switch {
	case elem.IsSetLogicalType():
		return NewPrimitiveNodeLogical(name, rep, getLogicalType(elem.GetLogicalType()), physical, typeLen, fieldID)
	case elem.IsSetConvertedType():
		return NewPrimitiveNodeConverted(name, rep, physical, ConvertedType(elem.GetConvertedType()),
			typeLen, int(elem.GetPrecision()), int(elem.GetScale()), fieldID)
	default:
		return NewPrimitiveNodeLogical(name, rep, NoLogicalType{}, physical, typeLen, fieldID)
	}
}
// NewPrimitiveNode constructs a primitive node with the ConvertedType of None and no logical type.
//
// Note the parameter order: the field id comes before the type length.
//
// Use NewPrimitiveNodeLogical and NewPrimitiveNodeConverted to specify the logical or converted type.
func NewPrimitiveNode(name string, repetition parquet.Repetition, typ parquet.Type, fieldID, typeLength int32) (*PrimitiveNode, error) {
	// a nil logical type makes NewPrimitiveNodeLogical fall back to NoLogicalType
	var none LogicalType
	return NewPrimitiveNodeLogical(name, repetition, none, typ, int(typeLength), fieldID)
}
// Equals returns true if both nodes are primitive nodes with the same physical
// and converted/logical types.
//
// On top of the base node comparison, the decimal precision and scale are
// compared for Decimal columns and the type length is compared for
// FixedLenByteArray columns.
func (p *PrimitiveNode) Equals(rhs Node) bool {
	if !p.node.Equals(rhs) {
		return false
	}

	// the base comparison matched the node type, so rhs reports Primitive
	other := rhs.(*PrimitiveNode)
	switch {
	case p == other:
		return true
	case p.PhysicalType() != other.PhysicalType():
		return false
	}

	if p.ConvertedType() == ConvertedTypes.Decimal {
		if p.decimalMetaData.Precision != other.decimalMetaData.Precision ||
			p.decimalMetaData.Scale != other.decimalMetaData.Scale {
			return false
		}
	}
	if p.PhysicalType() == parquet.Types.FixedLenByteArray && p.TypeLength() != other.TypeLength() {
		return false
	}
	return true
}
// PhysicalType returns the physical parquet.Type primitive that is used to
// store the values in this column.
func (p *PrimitiveNode) PhysicalType() parquet.Type {
	return p.physicalType
}
// SetTypeLength changes the type length of the node. It has no effect unless
// the physical type is FixedLenByteArray. The new length is not validated.
func (p *PrimitiveNode) SetTypeLength(length int) {
	if p.PhysicalType() != parquet.Types.FixedLenByteArray {
		return
	}
	p.typeLen = length
}
// TypeLength will be -1 if not a FixedLenByteArray column, otherwise will be the
// length of the FixedLen Byte Array
func (p *PrimitiveNode) TypeLength() int {
	return p.typeLen
}
// DecimalMetadata returns a copy of the current decimal metadata for the node.
// If not a decimal typed column, the return should have IsSet == false.
func (p *PrimitiveNode) DecimalMetadata() DecimalMetadata {
	return p.decimalMetaData
}
// Visit is for implementing a Visitor pattern handler to walk a schema's tree. One
// example is the Schema Printer which walks the tree to print out the schema in order.
//
// For a primitive node this calls VisitPre followed by VisitPost.
func (p *PrimitiveNode) Visit(v Visitor) {
	// there are no children to skip, so the result of VisitPre is not needed
	_ = v.VisitPre(p)
	v.VisitPost(p)
}
// toThrift converts the primitive node into its thrift SchemaElement.
//
// Name, repetition and physical type are always set. The optional members are
// only populated when meaningful: the converted type when it is not None, the
// field id when it is non-negative, the logical type when it is serializable
// (and is not the interval type), the type length for FixedLenByteArray and
// the precision/scale when decimal metadata is set.
//
// Every pointer member of the returned element refers to a fresh copy, so
// modifying the element never writes back into the node.
func (p *PrimitiveNode) toThrift() *format.SchemaElement {
	elem := &format.SchemaElement{
		Name:           p.Name(),
		RepetitionType: format.FieldRepetitionTypePtr(format.FieldRepetitionType(p.RepetitionType())),
		Type:           format.TypePtr(format.Type(p.PhysicalType())),
	}
	if p.ConvertedType() != ConvertedTypes.None {
		elem.ConvertedType = format.ConvertedTypePtr(format.ConvertedType(p.ConvertedType()))
	}
	if p.FieldID() >= 0 {
		elem.FieldID = thrift.Int32Ptr(p.FieldID())
	}
	if p.logicalType != nil && p.logicalType.IsSerialized() && !p.logicalType.Equals(IntervalLogicalType{}) {
		elem.LogicalType = p.logicalType.toThrift()
	}
	if p.physicalType == parquet.Types.FixedLenByteArray {
		elem.TypeLength = thrift.Int32Ptr(int32(p.typeLen))
	}
	if p.decimalMetaData.IsSet {
		// copy instead of pointing into p.decimalMetaData
		elem.Precision = thrift.Int32Ptr(p.decimalMetaData.Precision)
		elem.Scale = thrift.Int32Ptr(p.decimalMetaData.Scale)
	}
	return elem
}
// FieldList is a named type for a slice of Nodes.
type FieldList []Node

// Len is equivalent to len(fieldlist)
func (f FieldList) Len() int {
	return len(f)
}
// GroupNode is for managing nested nodes like List, Map, etc.
type GroupNode struct {
node
// fields holds the direct children of this group, in order.
fields FieldList
// nameToIdx maps a field name to the indexes in fields of the children
// with that name; it is filled in by the constructors.
nameToIdx strIntMultimap
}
// NewGroupNodeConverted constructs a group node with the provided fields and converted type,
// determining the logical type from that converted type.
//
// The derived logical type must be nested or none and be compatible with the
// converted type, otherwise an error is returned. On success every entry of
// fields has its parent set to the new node and is indexed by name.
func NewGroupNodeConverted(name string, repetition parquet.Repetition, fields FieldList, converted ConvertedType, id int32) (n *GroupNode, err error) {
	n = &GroupNode{
		node:   node{typ: Group, name: name, repetition: repetition, convertedType: converted, fieldID: id},
		fields: fields,
	}
	n.logicalType = n.convertedType.ToLogicalType(DecimalMetadata{})
	if !(n.logicalType != nil && (n.logicalType.IsNested() || n.logicalType.IsNone()) && n.logicalType.IsCompatible(n.convertedType, DecimalMetadata{})) {
		// Hand the interface value to the formatter instead of calling
		// String() on it: the logical type may be nil on this path and a
		// method call on a nil interface would panic.
		err = xerrors.Errorf("invalid logical type %s", n.logicalType)
		return n, err
	}

	n.nameToIdx = make(strIntMultimap)
	for idx, f := range n.fields {
		f.SetParent(n)
		n.nameToIdx.Add(f.Name(), idx)
	}
	return n, err
}
// NewGroupNodeLogical constructs a group node with the provided fields and logical type,
// determining the converted type from the provided logical type.
//
// A nil logical type is treated as NoLogicalType. A non-nil logical type must
// be nested, otherwise an error is returned. On success every entry of fields
// has its parent set to the new node and is indexed by name.
func NewGroupNodeLogical(name string, repetition parquet.Repetition, fields FieldList, logical LogicalType, id int32) (n *GroupNode, err error) {
	n = &GroupNode{
		node:   node{typ: Group, name: name, repetition: repetition, logicalType: logical, fieldID: id},
		fields: fields,
	}

	switch {
	case logical == nil:
		n.logicalType = NoLogicalType{}
		n.convertedType, _ = n.logicalType.ToConvertedType()
	case logical.IsNested():
		n.convertedType, _ = logical.ToConvertedType()
	default:
		err = xerrors.Errorf("logical type %s cannot be applied to group node", logical)
		return n, err
	}

	lt := n.logicalType
	if lt == nil || !(lt.IsNested() || lt.IsNone()) || !lt.IsCompatible(n.convertedType, DecimalMetadata{}) {
		err = xerrors.Errorf("invalid logical type %s", n.logicalType)
		return n, err
	}

	n.nameToIdx = make(strIntMultimap)
	for i, child := range n.fields {
		child.SetParent(n)
		n.nameToIdx.Add(child.Name(), i)
	}
	return n, err
}
// NewGroupNode constructs a new group node with the provided fields,
// but with converted type None and No Logical Type
func NewGroupNode(name string, repetition parquet.Repetition, fields FieldList, fieldID int32) (*GroupNode, error) {
	converted := ConvertedTypes.None
	return NewGroupNodeConverted(name, repetition, fields, converted, fieldID)
}
// Must is a convenience function for the NewNode functions that return a Node
// and an error: it panics with err if err is non-nil, otherwise it returns n.
func Must(n Node, err error) Node {
	if err == nil {
		return n
	}
	panic(err)
}
// MustGroup is like Must, except it casts the node to a *GroupNode, which will panic
// if it is a primitive node.
func MustGroup(n Node, err error) *GroupNode {
	if err == nil {
		return n.(*GroupNode)
	}
	panic(err)
}
// MustPrimitive is like Must except it casts the node to *PrimitiveNode which will panic
// if it is a group node.
func MustPrimitive(n Node, err error) *PrimitiveNode {
	if err == nil {
		return n.(*PrimitiveNode)
	}
	panic(err)
}
// GroupNodeFromThrift builds a GroupNode with the given children from a thrift
// SchemaElement.
//
// If the element carries a logical type it is used; otherwise the converted
// type is used, defaulting to None when the element does not set one. The
// field id is -1 when the element does not set one.
func GroupNodeFromThrift(elem *format.SchemaElement, fields FieldList) (*GroupNode, error) {
	id := int32(-1)
	if elem.IsSetFieldID() {
		id = elem.GetFieldID()
	}

	name := elem.GetName()
	rep := parquet.Repetition(elem.GetRepetitionType())

	if elem.IsSetLogicalType() {
		return NewGroupNodeLogical(name, rep, fields, getLogicalType(elem.GetLogicalType()), id)
	}
	if !elem.IsSetConvertedType() {
		return NewGroupNodeConverted(name, rep, fields, ConvertedTypes.None, id)
	}
	return NewGroupNodeConverted(name, rep, fields, ConvertedType(elem.GetConvertedType()), id)
}
// toThrift converts the group node into its thrift SchemaElement.
//
// Name, number of children and repetition are always set. The converted type
// is only set when it is not None, the field id only when it is non-negative
// and the logical type only when it is serializable.
//
// The field id is copied (as PrimitiveNode.toThrift does) rather than
// pointing at the node's own field, so modifying the returned element never
// writes back into the node.
func (g *GroupNode) toThrift() *format.SchemaElement {
	elem := &format.SchemaElement{
		Name:           g.name,
		NumChildren:    thrift.Int32Ptr(int32(len(g.fields))),
		RepetitionType: format.FieldRepetitionTypePtr(format.FieldRepetitionType(g.RepetitionType())),
	}
	if g.convertedType != ConvertedTypes.None {
		elem.ConvertedType = format.ConvertedTypePtr(format.ConvertedType(g.convertedType))
	}
	if g.fieldID >= 0 {
		elem.FieldID = thrift.Int32Ptr(g.fieldID)
	}
	if g.logicalType != nil && g.logicalType.IsSerialized() {
		elem.LogicalType = g.logicalType.toThrift()
	}
	return elem
}
// Equals will compare this node to the provided node and only return true if
// this node and all of its children are the same as the passed in node and its
// children. Children are compared pairwise, in order.
func (g *GroupNode) Equals(rhs Node) bool {
	if !g.node.Equals(rhs) {
		return false
	}

	// the base comparison matched the node type, so rhs reports Group
	other := rhs.(*GroupNode)
	if g == other {
		return true
	}
	if len(other.fields) != len(g.fields) {
		return false
	}

	for i := range g.fields {
		if !g.fields[i].Equals(other.fields[i]) {
			return false
		}
	}
	return true
}
// NumFields returns the number of direct child fields for this group node
func (g *GroupNode) NumFields() int {
	count := len(g.fields)
	return count
}
// Field returns the node in the field list which is of the provided (0-based) index.
// It panics if i is out of range.
func (g *GroupNode) Field(i int) Node {
	child := g.fields[i]
	return child
}
// FieldIndexByName provides the index for the field of the given name. Returns
// -1 if not found.
//
// If there are more than one field of this name, it returns the index for the first one.
func (g *GroupNode) FieldIndexByName(name string) int {
	indexes, found := g.nameToIdx[name]
	if !found {
		return -1
	}
	return indexes[0]
}
// FieldIndexByField looks up the index of n among the children of this node.
// Returns -1 if n isn't a child of this group.
//
// Candidates are found by name and then compared to n by identity.
func (g *GroupNode) FieldIndexByField(n Node) int {
	// looking up a missing name yields nothing to range over
	for _, candidate := range g.nameToIdx[n.Name()] {
		if g.fields[candidate] == n {
			return candidate
		}
	}
	return -1
}
// Visit is for implementing a Visitor pattern handler to walk a schema's tree. One
// example is the Schema Printer which walks the tree to print out the schema in order.
//
// VisitPre is called on the group first; its children are only visited when
// it returns true. VisitPost is called on the group in either case.
func (g *GroupNode) Visit(v Visitor) {
	if descend := v.VisitPre(g); descend {
		for i := range g.fields {
			g.fields[i].Visit(v)
		}
	}
	v.VisitPost(g)
}
// HasRepeatedFields returns true if any of the children of this node have
// Repeated as its repetition type.
//
// This is recursive and will check the children of any group nodes that are children.
func (g *GroupNode) HasRepeatedFields() bool {
	for _, field := range g.fields {
		if field.RepetitionType() == parquet.Repetitions.Repeated {
			return true
		}
		// Only stop early on a positive answer: a nested group without
		// repeated fields must not prevent the remaining siblings from
		// being inspected.
		if field.Type() == Group && field.(*GroupNode).HasRepeatedFields() {
			return true
		}
	}
	return false
}
// NewInt32Node is a convenience factory for constructing an Int32 Primitive Node.
// It panics if the node cannot be constructed.
func NewInt32Node(name string, rep parquet.Repetition, fieldID int32) *PrimitiveNode {
	n, err := NewPrimitiveNode(name, rep, parquet.Types.Int32, fieldID, -1)
	return MustPrimitive(n, err)
}
// NewInt64Node is a convenience factory for constructing an Int64 Primitive Node.
// It panics if the node cannot be constructed.
func NewInt64Node(name string, rep parquet.Repetition, fieldID int32) *PrimitiveNode {
	n, err := NewPrimitiveNode(name, rep, parquet.Types.Int64, fieldID, -1)
	return MustPrimitive(n, err)
}
// NewInt96Node is a convenience factory for constructing an Int96 Primitive Node.
// It panics if the node cannot be constructed.
func NewInt96Node(name string, rep parquet.Repetition, fieldID int32) *PrimitiveNode {
	n, err := NewPrimitiveNode(name, rep, parquet.Types.Int96, fieldID, -1)
	return MustPrimitive(n, err)
}
// NewFloat32Node is a convenience factory for constructing a Float Primitive Node.
// It panics if the node cannot be constructed.
func NewFloat32Node(name string, rep parquet.Repetition, fieldID int32) *PrimitiveNode {
	n, err := NewPrimitiveNode(name, rep, parquet.Types.Float, fieldID, -1)
	return MustPrimitive(n, err)
}
// NewFloat64Node is a convenience factory for constructing a Double Primitive Node.
// It panics if the node cannot be constructed.
func NewFloat64Node(name string, rep parquet.Repetition, fieldID int32) *PrimitiveNode {
	n, err := NewPrimitiveNode(name, rep, parquet.Types.Double, fieldID, -1)
	return MustPrimitive(n, err)
}
// NewBooleanNode is a convenience factory for constructing a Boolean Primitive Node.
// It panics if the node cannot be constructed.
func NewBooleanNode(name string, rep parquet.Repetition, fieldID int32) *PrimitiveNode {
	n, err := NewPrimitiveNode(name, rep, parquet.Types.Boolean, fieldID, -1)
	return MustPrimitive(n, err)
}
// NewByteArrayNode is a convenience factory for constructing a Byte Array Primitive Node.
// It panics if the node cannot be constructed.
func NewByteArrayNode(name string, rep parquet.Repetition, fieldID int32) *PrimitiveNode {
	n, err := NewPrimitiveNode(name, rep, parquet.Types.ByteArray, fieldID, -1)
	return MustPrimitive(n, err)
}
// NewFixedLenByteArrayNode is a convenience factory for constructing a Fixed Length
// Byte Array Primitive Node of the given length.
//
// Note that length comes before fieldID here. It panics if the node cannot be
// constructed, which includes a length that is not positive.
func NewFixedLenByteArrayNode(name string, rep parquet.Repetition, length int32, fieldID int32) *PrimitiveNode {
	n, err := NewPrimitiveNode(name, rep, parquet.Types.FixedLenByteArray, fieldID, length)
	return MustPrimitive(n, err)
}