| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package iceberg |
| |
import (
	"encoding/json"
	"fmt"
	"reflect"
	"regexp"
	"slices"
	"strconv"
	"strings"
	"time"

	"github.com/apache/arrow-go/v18/arrow/decimal128"
)
| |
// regexFromBrackets matches a whole type string of the form `name[N]`,
// such as "fixed[16]", and captures the digits between the brackets.
var regexFromBrackets = regexp.MustCompile(`^\w+\[(\d+)\]$`)

// decimalRegex finds a `decimal(P, S)` expression and captures the
// precision and scale digits; whitespace around either number is allowed.
// The pattern is not anchored, so it can match anywhere inside its input.
var decimalRegex = regexp.MustCompile(`decimal\(\s*(\d+)\s*,\s*(\d+)\s*\)`)
| |
// Properties is a set of string key/value pairs.
type Properties map[string]string

// Get looks up key and returns the stored value. When the key is not
// present in the map, defVal is returned instead.
func (p Properties) Get(key, defVal string) string {
	value, found := p[key]
	if !found {
		return defVal
	}
	return value
}
| |
// Type is implemented by every available iceberg type, whether it is a
// primitive (int32/int64/etc.) or a nested type (list/struct/map).
type Type interface {
	// Type returns the name of the type as a string.
	Type() string
	// Equals reports whether the given Type is equal to this one.
	Equals(Type) bool
	// Every Type also provides String() through fmt.Stringer.
	fmt.Stringer
}
| |
// NestedType is an interface that allows access to the child fields of
// a nested type such as a list/struct/map type.
type NestedType interface {
	// Type is embedded, so every NestedType is also a Type.
	Type
	// Fields returns the child fields of the nested type.
	Fields() []NestedField
}
| |
| type typeIFace struct { |
| Type |
| } |
| |
| func (t *typeIFace) MarshalJSON() ([]byte, error) { |
| if nested, ok := t.Type.(NestedType); ok { |
| return json.Marshal(nested) |
| } |
| return []byte(`"` + t.Type.Type() + `"`), nil |
| } |
| |
| func (t *typeIFace) UnmarshalJSON(b []byte) error { |
| var typename string |
| err := json.Unmarshal(b, &typename) |
| if err == nil { |
| switch typename { |
| case "boolean": |
| t.Type = BooleanType{} |
| case "int": |
| t.Type = Int32Type{} |
| case "long": |
| t.Type = Int64Type{} |
| case "float": |
| t.Type = Float32Type{} |
| case "double": |
| t.Type = Float64Type{} |
| case "date": |
| t.Type = DateType{} |
| case "time": |
| t.Type = TimeType{} |
| case "timestamp": |
| t.Type = TimestampType{} |
| case "timestamptz": |
| t.Type = TimestampTzType{} |
| case "string": |
| t.Type = StringType{} |
| case "uuid": |
| t.Type = UUIDType{} |
| case "binary": |
| t.Type = BinaryType{} |
| default: |
| switch { |
| case strings.HasPrefix(typename, "fixed"): |
| matches := regexFromBrackets.FindStringSubmatch(typename) |
| if len(matches) != 2 { |
| return fmt.Errorf("%w: %s", ErrInvalidTypeString, typename) |
| } |
| |
| n, _ := strconv.Atoi(matches[1]) |
| t.Type = FixedType{len: n} |
| case strings.HasPrefix(typename, "decimal"): |
| matches := decimalRegex.FindStringSubmatch(typename) |
| if len(matches) != 3 { |
| return fmt.Errorf("%w: %s", ErrInvalidTypeString, typename) |
| } |
| |
| prec, _ := strconv.Atoi(matches[1]) |
| scale, _ := strconv.Atoi(matches[2]) |
| t.Type = DecimalType{precision: prec, scale: scale} |
| default: |
| return fmt.Errorf("%w: unrecognized field type", ErrInvalidSchema) |
| } |
| } |
| return nil |
| } |
| |
| aux := struct { |
| TypeName string `json:"type"` |
| }{} |
| if err = json.Unmarshal(b, &aux); err != nil { |
| return err |
| } |
| |
| switch aux.TypeName { |
| case "list": |
| t.Type = &ListType{} |
| case "map": |
| t.Type = &MapType{} |
| case "struct": |
| t.Type = &StructType{} |
| default: |
| return fmt.Errorf("%w: %s", ErrInvalidTypeString, aux.TypeName) |
| } |
| |
| return json.Unmarshal(b, t.Type) |
| } |
| |
// NestedField is a single field of a nested type: an embedded Type plus
// the field's metadata. The embedded Type is tagged `json:"-"`, so the
// default struct encoding skips it.
type NestedField struct {
	Type `json:"-"`

	ID             int    `json:"id"`
	Name           string `json:"name"`
	Required       bool   `json:"required"`
	Doc            string `json:"doc,omitempty"`
	InitialDefault any    `json:"initial-default,omitempty"`
	WriteDefault   any    `json:"write-default,omitempty"`
}
| |
// optOrReq returns "required" when required is true and "optional"
// otherwise.
func optOrReq(required bool) string {
	label := "optional"
	if required {
		label = "required"
	}
	return label
}
| |
| func (n NestedField) String() string { |
| doc := n.Doc |
| if doc != "" { |
| doc = " (" + doc + ")" |
| } |
| |
| return fmt.Sprintf("%d: %s: %s %s%s", |
| n.ID, n.Name, optOrReq(n.Required), n.Type, doc) |
| } |
| |
| func (n *NestedField) Equals(other NestedField) bool { |
| return n.ID == other.ID && |
| n.Name == other.Name && |
| n.Required == other.Required && |
| n.Doc == other.Doc && |
| n.InitialDefault == other.InitialDefault && |
| n.WriteDefault == other.WriteDefault && |
| n.Type.Equals(other.Type) |
| } |
| |
| func (n NestedField) MarshalJSON() ([]byte, error) { |
| type Alias NestedField |
| return json.Marshal(struct { |
| Type *typeIFace `json:"type"` |
| *Alias |
| }{Type: &typeIFace{n.Type}, Alias: (*Alias)(&n)}) |
| } |
| |
| func (n *NestedField) UnmarshalJSON(b []byte) error { |
| type Alias NestedField |
| aux := struct { |
| Type typeIFace `json:"type"` |
| *Alias |
| }{ |
| Alias: (*Alias)(n), |
| } |
| |
| if err := json.Unmarshal(b, &aux); err != nil { |
| return err |
| } |
| |
| n.Type = aux.Type.Type |
| |
| return nil |
| } |
| |
| type StructType struct { |
| FieldList []NestedField `json:"fields"` |
| } |
| |
| func (s *StructType) Equals(other Type) bool { |
| st, ok := other.(*StructType) |
| if !ok { |
| return false |
| } |
| |
| return slices.EqualFunc(s.FieldList, st.FieldList, func(a, b NestedField) bool { |
| return a.Equals(b) |
| }) |
| } |
| |
| func (s *StructType) Fields() []NestedField { return s.FieldList } |
| |
| func (s *StructType) MarshalJSON() ([]byte, error) { |
| type Alias StructType |
| return json.Marshal(struct { |
| Type string `json:"type"` |
| *Alias |
| }{Type: s.Type(), Alias: (*Alias)(s)}) |
| } |
| |
| func (*StructType) Type() string { return "struct" } |
| func (s *StructType) String() string { |
| var b strings.Builder |
| b.WriteString("struct<") |
| for i, f := range s.FieldList { |
| if i != 0 { |
| b.WriteString(", ") |
| } |
| fmt.Fprintf(&b, "%d: %s: ", |
| f.ID, f.Name) |
| if f.Required { |
| b.WriteString("required ") |
| } else { |
| b.WriteString("optional ") |
| } |
| b.WriteString(f.Type.String()) |
| if f.Doc != "" { |
| b.WriteString(" (") |
| b.WriteString(f.Doc) |
| b.WriteByte(')') |
| } |
| } |
| b.WriteString(">") |
| |
| return b.String() |
| } |
| |
| type ListType struct { |
| ElementID int `json:"element-id"` |
| Element Type `json:"-"` |
| ElementRequired bool `json:"element-required"` |
| } |
| |
| func (l *ListType) MarshalJSON() ([]byte, error) { |
| type Alias ListType |
| return json.Marshal(struct { |
| Type string `json:"type"` |
| *Alias |
| Element *typeIFace `json:"element"` |
| }{Type: l.Type(), Alias: (*Alias)(l), Element: &typeIFace{l.Element}}) |
| } |
| |
| func (l *ListType) Equals(other Type) bool { |
| rhs, ok := other.(*ListType) |
| if !ok { |
| return false |
| } |
| |
| return l.ElementID == rhs.ElementID && |
| l.Element.Equals(rhs.Element) && |
| l.ElementRequired == rhs.ElementRequired |
| } |
| |
| func (l *ListType) Fields() []NestedField { |
| return []NestedField{l.ElementField()} |
| } |
| |
| func (l *ListType) ElementField() NestedField { |
| return NestedField{ |
| ID: l.ElementID, |
| Name: "element", |
| Type: l.Element, |
| Required: l.ElementRequired, |
| } |
| } |
| |
| func (*ListType) Type() string { return "list" } |
| func (l *ListType) String() string { return fmt.Sprintf("list<%s>", l.Element) } |
| |
| func (l *ListType) UnmarshalJSON(b []byte) error { |
| aux := struct { |
| ID int `json:"element-id"` |
| Elem typeIFace `json:"element"` |
| Req bool `json:"element-required"` |
| }{} |
| if err := json.Unmarshal(b, &aux); err != nil { |
| return err |
| } |
| |
| l.ElementID = aux.ID |
| l.Element = aux.Elem.Type |
| l.ElementRequired = aux.Req |
| return nil |
| } |
| |
| type MapType struct { |
| KeyID int `json:"key-id"` |
| KeyType Type `json:"-"` |
| ValueID int `json:"value-id"` |
| ValueType Type `json:"-"` |
| ValueRequired bool `json:"value-required"` |
| } |
| |
| func (m *MapType) MarshalJSON() ([]byte, error) { |
| type Alias MapType |
| return json.Marshal(struct { |
| Type string `json:"type"` |
| *Alias |
| KeyType *typeIFace `json:"key"` |
| ValueType *typeIFace `json:"value"` |
| }{Type: m.Type(), Alias: (*Alias)(m), |
| KeyType: &typeIFace{m.KeyType}, |
| ValueType: &typeIFace{m.ValueType}}) |
| } |
| |
| func (m *MapType) Equals(other Type) bool { |
| rhs, ok := other.(*MapType) |
| if !ok { |
| return false |
| } |
| |
| return m.KeyID == rhs.KeyID && |
| m.KeyType.Equals(rhs.KeyType) && |
| m.ValueID == rhs.ValueID && |
| m.ValueType.Equals(rhs.ValueType) && |
| m.ValueRequired == rhs.ValueRequired |
| } |
| |
| func (m *MapType) Fields() []NestedField { |
| return []NestedField{m.KeyField(), m.ValueField()} |
| } |
| |
| func (m *MapType) KeyField() NestedField { |
| return NestedField{ |
| Name: "key", |
| ID: m.KeyID, |
| Type: m.KeyType, |
| Required: true, |
| } |
| } |
| |
| func (m *MapType) ValueField() NestedField { |
| return NestedField{ |
| Name: "value", |
| ID: m.ValueID, |
| Type: m.ValueType, |
| Required: m.ValueRequired, |
| } |
| } |
| |
| func (*MapType) Type() string { return "map" } |
| func (m *MapType) String() string { |
| return fmt.Sprintf("map<%s, %s>", m.KeyType, m.ValueType) |
| } |
| |
| func (m *MapType) UnmarshalJSON(b []byte) error { |
| aux := struct { |
| KeyID int `json:"key-id"` |
| Key typeIFace `json:"key"` |
| ValueID int `json:"value-id"` |
| Value typeIFace `json:"value"` |
| ValueReq *bool `json:"value-required"` |
| }{} |
| if err := json.Unmarshal(b, &aux); err != nil { |
| return err |
| } |
| |
| m.KeyID, m.KeyType = aux.KeyID, aux.Key.Type |
| m.ValueID, m.ValueType = aux.ValueID, aux.Value.Type |
| if aux.ValueReq == nil { |
| m.ValueRequired = true |
| } else { |
| m.ValueRequired = *aux.ValueReq |
| } |
| return nil |
| } |
| |
| func FixedTypeOf(n int) FixedType { return FixedType{len: n} } |
| |
| type FixedType struct { |
| len int |
| } |
| |
| func (f FixedType) Equals(other Type) bool { |
| rhs, ok := other.(FixedType) |
| if !ok { |
| return false |
| } |
| |
| return f.len == rhs.len |
| } |
| func (f FixedType) Len() int { return f.len } |
| func (f FixedType) Type() string { return fmt.Sprintf("fixed[%d]", f.len) } |
| func (f FixedType) String() string { return fmt.Sprintf("fixed[%d]", f.len) } |
| func (f FixedType) primitive() {} |
| |
| func DecimalTypeOf(prec, scale int) DecimalType { |
| return DecimalType{precision: prec, scale: scale} |
| } |
| |
| type DecimalType struct { |
| precision, scale int |
| } |
| |
| func (d DecimalType) Equals(other Type) bool { |
| rhs, ok := other.(DecimalType) |
| if !ok { |
| return false |
| } |
| |
| return d.precision == rhs.precision && |
| d.scale == rhs.scale |
| } |
| |
| func (d DecimalType) Type() string { return fmt.Sprintf("decimal(%d, %d)", d.precision, d.scale) } |
| func (d DecimalType) String() string { return fmt.Sprintf("decimal(%d, %d)", d.precision, d.scale) } |
| func (d DecimalType) Precision() int { return d.precision } |
| func (d DecimalType) Scale() int { return d.scale } |
| func (DecimalType) primitive() {} |
| |
// Decimal pairs a decimal128.Num value with a scale.
type Decimal struct {
	Val   decimal128.Num
	Scale int
}
| |
// PrimitiveType is a Type that also has the unexported primitive method.
// Because that method is unexported, only types declared in this package
// can implement the interface.
type PrimitiveType interface {
	Type
	primitive()
}
| |
| type BooleanType struct{} |
| |
| func (BooleanType) Equals(other Type) bool { |
| _, ok := other.(BooleanType) |
| return ok |
| } |
| |
| func (BooleanType) primitive() {} |
| func (BooleanType) Type() string { return "boolean" } |
| func (BooleanType) String() string { return "boolean" } |
| |
| // Int32Type is the "int"/"integer" type of the iceberg spec. |
| type Int32Type struct{} |
| |
| func (Int32Type) Equals(other Type) bool { |
| _, ok := other.(Int32Type) |
| return ok |
| } |
| |
| func (Int32Type) primitive() {} |
| func (Int32Type) Type() string { return "int" } |
| func (Int32Type) String() string { return "int" } |
| |
| // Int64Type is the "long" type of the iceberg spec. |
| type Int64Type struct{} |
| |
| func (Int64Type) Equals(other Type) bool { |
| _, ok := other.(Int64Type) |
| return ok |
| } |
| |
| func (Int64Type) primitive() {} |
| func (Int64Type) Type() string { return "long" } |
| func (Int64Type) String() string { return "long" } |
| |
| // Float32Type is the "float" type in the iceberg spec. |
| type Float32Type struct{} |
| |
| func (Float32Type) Equals(other Type) bool { |
| _, ok := other.(Float32Type) |
| return ok |
| } |
| |
| func (Float32Type) primitive() {} |
| func (Float32Type) Type() string { return "float" } |
| func (Float32Type) String() string { return "float" } |
| |
| // Float64Type represents the "double" type of the iceberg spec. |
| type Float64Type struct{} |
| |
| func (Float64Type) Equals(other Type) bool { |
| _, ok := other.(Float64Type) |
| return ok |
| } |
| |
| func (Float64Type) primitive() {} |
| func (Float64Type) Type() string { return "double" } |
| func (Float64Type) String() string { return "double" } |
| |
| type Date int32 |
| |
| func (d Date) ToTime() time.Time { |
| return epochTM.AddDate(0, 0, int(d)) |
| } |
| |
| // DateType represents a calendar date without a timezone or time, |
| // represented as a 32-bit integer denoting the number of days since |
| // the unix epoch. |
| type DateType struct{} |
| |
| func (DateType) Equals(other Type) bool { |
| _, ok := other.(DateType) |
| return ok |
| } |
| |
| func (DateType) primitive() {} |
| func (DateType) Type() string { return "date" } |
| func (DateType) String() string { return "date" } |
| |
| type Time int64 |
| |
| // TimeType represents a number of microseconds since midnight. |
| type TimeType struct{} |
| |
| func (TimeType) Equals(other Type) bool { |
| _, ok := other.(TimeType) |
| return ok |
| } |
| |
| func (TimeType) primitive() {} |
| func (TimeType) Type() string { return "time" } |
| func (TimeType) String() string { return "time" } |
| |
| type Timestamp int64 |
| |
| func (t Timestamp) ToTime() time.Time { |
| return time.UnixMicro(int64(t)).UTC() |
| } |
| |
| func (t Timestamp) ToDate() Date { |
| tm := time.UnixMicro(int64(t)).UTC() |
| return Date(tm.Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds())) |
| } |
| |
| // TimestampType represents a number of microseconds since the unix epoch |
| // without regard for timezone. |
| type TimestampType struct{} |
| |
| func (TimestampType) Equals(other Type) bool { |
| _, ok := other.(TimestampType) |
| return ok |
| } |
| |
| func (TimestampType) primitive() {} |
| func (TimestampType) Type() string { return "timestamp" } |
| func (TimestampType) String() string { return "timestamp" } |
| |
| // TimestampTzType represents a timestamp stored as UTC representing the |
| // number of microseconds since the unix epoch. |
| type TimestampTzType struct{} |
| |
| func (TimestampTzType) Equals(other Type) bool { |
| _, ok := other.(TimestampTzType) |
| return ok |
| } |
| |
| func (TimestampTzType) primitive() {} |
| func (TimestampTzType) Type() string { return "timestamptz" } |
| func (TimestampTzType) String() string { return "timestamptz" } |
| |
| type StringType struct{} |
| |
| func (StringType) Equals(other Type) bool { |
| _, ok := other.(StringType) |
| return ok |
| } |
| |
| func (StringType) primitive() {} |
| func (StringType) Type() string { return "string" } |
| func (StringType) String() string { return "string" } |
| |
| type UUIDType struct{} |
| |
| func (UUIDType) Equals(other Type) bool { |
| _, ok := other.(UUIDType) |
| return ok |
| } |
| |
| func (UUIDType) primitive() {} |
| func (UUIDType) Type() string { return "uuid" } |
| func (UUIDType) String() string { return "uuid" } |
| |
| type BinaryType struct{} |
| |
| func (BinaryType) Equals(other Type) bool { |
| _, ok := other.(BinaryType) |
| return ok |
| } |
| |
| func (BinaryType) primitive() {} |
| func (BinaryType) Type() string { return "binary" } |
| func (BinaryType) String() string { return "binary" } |
| |
// PrimitiveTypes holds one value of each primitive type that takes no
// parameters, each stored as a PrimitiveType. FixedType and DecimalType
// are not included.
var PrimitiveTypes = struct {
	Bool        PrimitiveType
	Int32       PrimitiveType
	Int64       PrimitiveType
	Float32     PrimitiveType
	Float64     PrimitiveType
	Date        PrimitiveType
	Time        PrimitiveType
	Timestamp   PrimitiveType
	TimestampTz PrimitiveType
	String      PrimitiveType
	Binary      PrimitiveType
	UUID        PrimitiveType
}{
	Bool:        BooleanType{},
	Int32:       Int32Type{},
	Int64:       Int64Type{},
	Float32:     Float32Type{},
	Float64:     Float64Type{},
	Date:        DateType{},
	Time:        TimeType{},
	Timestamp:   TimestampType{},
	TimestampTz: TimestampTzType{},
	String:      StringType{},
	Binary:      BinaryType{},
	UUID:        UUIDType{},
}
| |
| // PromoteType promotes the type being read from a file to a requested read type. |
| // fileType is the type from the file being read |
| // readType is the requested readType |
| func PromoteType(fileType, readType Type) (Type, error) { |
| switch t := fileType.(type) { |
| case Int32Type: |
| if _, ok := readType.(Int64Type); ok { |
| return readType, nil |
| } |
| case Float32Type: |
| if _, ok := readType.(Float64Type); ok { |
| return readType, nil |
| } |
| case StringType: |
| if _, ok := readType.(BinaryType); ok { |
| return readType, nil |
| } |
| case BinaryType: |
| if _, ok := readType.(StringType); ok { |
| return readType, nil |
| } |
| case DecimalType: |
| if rt, ok := readType.(DecimalType); ok { |
| if t.precision <= rt.precision && t.scale <= rt.scale { |
| return readType, nil |
| } |
| return nil, fmt.Errorf("%w: cannot reduce precision from %s to %s", |
| ErrResolve, fileType, readType) |
| } |
| case FixedType: |
| if _, ok := readType.(UUIDType); ok && t.len == 16 { |
| return readType, nil |
| } |
| default: |
| if fileType.Equals(readType) { |
| return fileType, nil |
| } |
| } |
| |
| return nil, fmt.Errorf("%w: cannot promote %s to %s", ErrResolve, fileType, readType) |
| } |