blob: 773082fe7e17e7562b1c50320beca4043faa106a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package schema provides types and functions for manipulating and building parquet
// file schemas.
//
// Some of the utilities provided include building a schema using Struct Tags
// on a struct type, getting Column Paths from a node, and dealing with the
// converted and logical types for Parquet.
//
// Logical types specify ways to interpret the primitive types allowing the
// number of primitive types to be smaller and reuse efficient encodings.
// For instance a "string" is just a ByteArray column with a UTF-8 annotation
// or "String Logical Type".
//
// For more information about Logical and Converted Types, check:
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
package schema
import (
"fmt"
"io"
"strings"
"github.com/apache/arrow/go/v6/parquet"
format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
"golang.org/x/xerrors"
)
// Schema is the container for the converted Parquet schema with a computed
// information from the schema analysis needed for file reading
//
// * Column index to Node
//
// * Max repetition / definition levels for each primitive node
//
// The ColumnDescriptor objects produced by this class can be used to assist in
// the reconstruction of fully materialized data structures from the
// repetition-definition level encoding of nested data
type Schema struct {
root Node
leaves []*Column
nodeToLeaf map[*PrimitiveNode]int
leafToBase map[int]Node
leafToIndex strIntMultimap
}
// FromParquet converts a slice of thrift Schema Elements to the correct node type
func FromParquet(elems []*format.SchemaElement) (Node, error) {
if len(elems) == 0 {
return nil, xerrors.New("parquet: empty schema (no root)")
}
if elems[0].GetNumChildren() == 0 {
if len(elems) > 1 {
return nil, xerrors.New("parquet: schema had multiple nodes but root had no children")
}
// parquet file with no columns
return GroupNodeFromThrift(elems[0], []Node{})
}
// We don't check that the root node is repeated since this is not
// consistently set by implementations
var (
pos = 0
nextNode func() (Node, error)
)
nextNode = func() (Node, error) {
if pos == len(elems) {
return nil, xerrors.New("parquet: malformed schema: not enough elements")
}
elem := elems[pos]
pos++
if elem.GetNumChildren() == 0 {
return PrimitiveNodeFromThrift(elem)
}
fields := make([]Node, 0, elem.GetNumChildren())
for i := 0; i < int(elem.GetNumChildren()); i++ {
n, err := nextNode()
if err != nil {
return nil, err
}
fields = append(fields, n)
}
return GroupNodeFromThrift(elem, fields)
}
return nextNode()
}
// Root returns the group node that is the root of this schema
func (s *Schema) Root() *GroupNode {
return s.root.(*GroupNode)
}
// NumColumns returns the number of leaf nodes that are the actual primitive
// columns in this schema.
func (s *Schema) NumColumns() int {
return len(s.leaves)
}
// Equals returns true as long as the leaf columns are equal, doesn't take
// into account the groups and only checks whether the schemas are compatible
// at the physical storage level.
func (s *Schema) Equals(rhs *Schema) bool {
if s.NumColumns() != rhs.NumColumns() {
return false
}
for idx, c := range s.leaves {
if !c.Equals(rhs.Column(idx)) {
return false
}
}
return true
}
func (s *Schema) buildTree(n Node, maxDefLvl, maxRepLvl int16, base Node) {
switch n.RepetitionType() {
case parquet.Repetitions.Repeated:
maxRepLvl++
fallthrough
case parquet.Repetitions.Optional:
maxDefLvl++
}
switch n := n.(type) {
case *GroupNode:
for _, f := range n.fields {
s.buildTree(f, maxDefLvl, maxRepLvl, base)
}
case *PrimitiveNode:
s.nodeToLeaf[n] = len(s.leaves)
s.leaves = append(s.leaves, NewColumn(n, maxDefLvl, maxRepLvl))
s.leafToBase[len(s.leaves)-1] = base
s.leafToIndex.Add(n.Path(), len(s.leaves)-1)
}
}
// Column returns the (0-indexed) column of the provided index.
func (s *Schema) Column(i int) *Column {
return s.leaves[i]
}
// ColumnIndexByName looks up the column by it's full dot separated
// node path. If there are multiple columns that match, it returns the first one.
//
// Returns -1 if not found.
func (s *Schema) ColumnIndexByName(nodePath string) int {
if search, ok := s.leafToIndex[nodePath]; ok {
return search[0]
}
return -1
}
// ColumnIndexByNode returns the index of the column represented by this node.
//
// Returns -1 if not found.
func (s *Schema) ColumnIndexByNode(n Node) int {
if search, ok := s.leafToIndex[n.Path()]; ok {
for _, idx := range search {
if n == s.Column(idx).SchemaNode() {
return idx
}
}
}
return -1
}
// ColumnRoot returns the root node of a given column if it is under a
// nested group node, providing that root group node.
func (s *Schema) ColumnRoot(i int) Node {
return s.leafToBase[i]
}
// HasRepeatedFields returns true if any node in the schema has a repeated field type.
func (s *Schema) HasRepeatedFields() bool {
return s.root.(*GroupNode).HasRepeatedFields()
}
// UpdateColumnOrders must get a slice that is the same length as the number of leaf columns
// and is used to update the schema metadata Column Orders. len(orders) must equal s.NumColumns()
func (s *Schema) UpdateColumnOrders(orders []parquet.ColumnOrder) error {
if len(orders) != s.NumColumns() {
return xerrors.New("parquet: malformed schema: not enough ColumnOrder values")
}
visitor := schemaColumnOrderUpdater{orders, 0}
s.root.Visit(&visitor)
return nil
}
// NewSchema constructs a new Schema object from a root group node.
//
// Any fields with a field-id of -1 will be given an appropriate field number based on their order.
func NewSchema(root *GroupNode) *Schema {
s := &Schema{
root,
make([]*Column, 0),
make(map[*PrimitiveNode]int),
make(map[int]Node),
make(strIntMultimap),
}
for _, f := range root.fields {
s.buildTree(f, 0, 0, f)
}
return s
}
type schemaColumnOrderUpdater struct {
colOrders []parquet.ColumnOrder
leafCount int
}
func (s *schemaColumnOrderUpdater) VisitPre(n Node) bool {
if n.Type() == Primitive {
leaf := n.(*PrimitiveNode)
leaf.ColumnOrder = s.colOrders[s.leafCount]
s.leafCount++
}
return true
}
func (s *schemaColumnOrderUpdater) VisitPost(Node) {}
type toThriftVisitor struct {
elements []*format.SchemaElement
}
func (t *toThriftVisitor) VisitPre(n Node) bool {
t.elements = append(t.elements, n.toThrift())
return true
}
func (t *toThriftVisitor) VisitPost(Node) {}
// ToThrift converts a GroupNode to a slice of SchemaElements which is used
// for thrift serialization.
func ToThrift(schema *GroupNode) []*format.SchemaElement {
t := &toThriftVisitor{make([]*format.SchemaElement, 0)}
schema.Visit(t)
return t.elements
}
type schemaPrinter struct {
w io.Writer
indent int
indentWidth int
}
func (s *schemaPrinter) VisitPre(n Node) bool {
fmt.Fprint(s.w, strings.Repeat(" ", s.indent))
if n.Type() == Group {
g := n.(*GroupNode)
fmt.Fprintf(s.w, "%s group field_id=%d %s", g.RepetitionType(), g.FieldID(), g.Name())
_, invalid := g.logicalType.(UnknownLogicalType)
_, none := g.logicalType.(NoLogicalType)
if g.logicalType != nil && !invalid && !none {
fmt.Fprintf(s.w, " (%s)", g.logicalType)
} else if g.convertedType != ConvertedTypes.None {
fmt.Fprintf(s.w, " (%s)", g.convertedType)
}
fmt.Fprintln(s.w, " {")
s.indent += s.indentWidth
} else {
p := n.(*PrimitiveNode)
fmt.Fprintf(s.w, "%s %s field_id=%d %s", p.RepetitionType(), strings.ToLower(p.PhysicalType().String()), p.FieldID(), p.Name())
_, invalid := p.logicalType.(UnknownLogicalType)
_, none := p.logicalType.(NoLogicalType)
if p.logicalType != nil && !invalid && !none {
fmt.Fprintf(s.w, " (%s)", p.logicalType)
} else if p.convertedType == ConvertedTypes.Decimal {
fmt.Fprintf(s.w, " (%s(%d,%d))", p.convertedType, p.DecimalMetadata().Precision, p.DecimalMetadata().Scale)
} else if p.convertedType != ConvertedTypes.None {
fmt.Fprintf(s.w, " (%s)", p.convertedType)
}
fmt.Fprintln(s.w, ";")
}
return true
}
func (s *schemaPrinter) VisitPost(n Node) {
if n.Type() == Group {
s.indent -= s.indentWidth
fmt.Fprint(s.w, strings.Repeat(" ", s.indent))
fmt.Fprintln(s.w, "}")
}
}
// PrintSchema writes a string representation of the tree to w using the indent
// width provided.
func PrintSchema(n Node, w io.Writer, indentWidth int) {
n.Visit(&schemaPrinter{w, 0, indentWidth})
}
type strIntMultimap map[string][]int
func (f strIntMultimap) Add(key string, val int) bool {
if _, ok := f[key]; !ok {
f[key] = []int{val}
return false
}
f[key] = append(f[key], val)
return true
}