| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| module ArrowJSON |
| |
| using Mmap |
| using StructTypes, JSON3, Tables, SentinelArrays, Arrow |
| |
| # read json files as "table" |
| # write to arrow stream/file |
| # read arrow stream/file back |
| |
# Root of the Arrow JSON logical-type hierarchy. Every concrete subtype carries a
# `name` field, which is the key used to select the subtype when reading JSON.
abstract type Type end

# With no arguments, fall back to the "null" logical type.
function Type()
    return Null("null")
end

# `Type` is (de)serialized polymorphically through StructTypes' abstract-type support.
function StructTypes.StructType(::Base.Type{Type})
    return StructTypes.AbstractType()
end

# Fallback: a Julia type contributes no child fields unless a more specific
# `children` method says otherwise.
function children(::Base.Type{T}) where {T}
    return Field[]
end
| |
# JSON description of an integer logical type (bit width and signedness).
mutable struct Int <: Type
    name::String
    bitWidth::Int64
    isSigned::Base.Bool
end

# Empty constructor used by the mutable-struct deserialization path.
function Int()
    return Int("", 0, true)
end

# Describe any Julia integer type by its size in bits and whether it is signed.
function Type(::Base.Type{T}) where {T <: Integer}
    return Int("int", 8 * sizeof(T), T <: Signed)
end

function StructTypes.StructType(::Base.Type{Int})
    return StructTypes.Mutable()
end

# Map the JSON description back to a Julia integer type. Any bit width other
# than 8/16/32/64 falls through to the 128-bit type.
function juliatype(f, x::Int)
    width = x.bitWidth
    if width == 8
        T = Int8
    elseif width == 16
        T = Int16
    elseif width == 32
        T = Int32
    elseif width == 64
        T = Int64
    else
        T = Int128
    end
    if x.isSigned
        return T
    end
    return unsigned(T)
end
| |
# JSON description of a floating point logical type; `precision` is one of
# "HALF", "SINGLE" or "DOUBLE".
struct FloatingPoint <: Type
    name::String
    precision::String
end

# Float16 -> "HALF", Float32 -> "SINGLE", every other AbstractFloat -> "DOUBLE".
function Type(::Base.Type{T}) where {T <: AbstractFloat}
    if T == Float16
        precision = "HALF"
    elseif T == Float32
        precision = "SINGLE"
    else
        precision = "DOUBLE"
    end
    return FloatingPoint("floatingpoint", precision)
end

function StructTypes.StructType(::Base.Type{FloatingPoint})
    return StructTypes.Struct()
end

# Inverse mapping; any precision other than "HALF"/"SINGLE" yields Float64.
function juliatype(f, x::FloatingPoint)
    x.precision == "HALF" && return Float16
    x.precision == "SINGLE" && return Float32
    return Float64
end
| |
# JSON description of a fixed-width binary logical type.
struct FixedSizeBinary <: Type
    name::String
    byteWidth::Int64
end

# An N-tuple of bytes is described as fixed-size binary of width N.
function Type(::Base.Type{NTuple{N, UInt8}}) where {N}
    return FixedSizeBinary("fixedsizebinary", N)
end

# Fixed-size binary has no child fields.
function children(::Base.Type{NTuple{N, UInt8}}) where {N}
    return Field[]
end

function StructTypes.StructType(::Base.Type{FixedSizeBinary})
    return StructTypes.Struct()
end

function juliatype(f, x::FixedSizeBinary)
    return NTuple{x.byteWidth, UInt8}
end
| |
# JSON description of a decimal logical type.
struct Decimal <: Type
    name::String
    precision::Int32
    scale::Int32
end

# Precision and scale are taken from the type parameters of `Arrow.Decimal`.
function Type(::Base.Type{Arrow.Decimal{P, S, T}}) where {P, S, T}
    return Decimal("decimal", P, S)
end

function StructTypes.StructType(::Base.Type{Decimal})
    return StructTypes.Struct()
end

# The Julia-side decimal always uses Int128 as its third type parameter.
function juliatype(f, x::Decimal)
    return Arrow.Decimal{x.precision, x.scale, Int128}
end
| |
# JSON description of a timestamp logical type; `timezone` may be absent.
mutable struct Timestamp <: Type
    name::String
    unit::String
    timezone::Union{Nothing, String}
end

# Empty constructor used by the mutable-struct deserialization path.
function Timestamp()
    return Timestamp("", "", nothing)
end

# Time-unit enum value -> JSON unit string. Anything that is not
# SECOND/MILLISECOND/MICROSECOND is reported as "NANOSECOND".
function unit(U)
    U == Arrow.Meta.TimeUnit.SECOND && return "SECOND"
    U == Arrow.Meta.TimeUnit.MILLISECOND && return "MILLISECOND"
    U == Arrow.Meta.TimeUnit.MICROSECOND && return "MICROSECOND"
    return "NANOSECOND"
end

function Type(::Base.Type{Arrow.Timestamp{U, TZ}}) where {U, TZ}
    tz = TZ === nothing ? nothing : String(TZ)
    return Timestamp("timestamp", unit(U), tz)
end

function StructTypes.StructType(::Base.Type{Timestamp})
    return StructTypes.Mutable()
end

# JSON unit string -> time-unit enum value (inverse of `unit`). Unrecognized
# strings map to NANOSECOND.
function unitT(u)
    u == "SECOND" && return Arrow.Meta.TimeUnit.SECOND
    u == "MILLISECOND" && return Arrow.Meta.TimeUnit.MILLISECOND
    u == "MICROSECOND" && return Arrow.Meta.TimeUnit.MICROSECOND
    return Arrow.Meta.TimeUnit.NANOSECOND
end

function juliatype(f, x::Timestamp)
    tz = x.timezone === nothing ? nothing : Symbol(x.timezone)
    return Arrow.Timestamp{unitT(x.unit), tz}
end
| |
# JSON description of a duration logical type; `unit` is a time-unit string
# such as "SECOND" or "NANOSECOND".
struct Duration <: Type
    name::String
    unit::String
end

function Type(::Base.Type{Arrow.Duration{U}}) where {U}
    return Duration("duration", unit(U))
end

function StructTypes.StructType(::Base.Type{Duration})
    return StructTypes.Struct()
end

# Convert the JSON unit string back to the time-unit enum with `unitT`.
# (The previous `unit%(x.unit)` parsed as `rem(unit, x.unit)` and always threw
# a MethodError.)
function juliatype(f, x::Duration)
    return Arrow.Duration{unitT(x.unit)}
end
| |
# JSON description of a date logical type; `unit` is "DAY" or "MILLISECOND".
struct Date <: Type
    name::String
    unit::String
end

function Type(::Base.Type{Arrow.Date{U, T}}) where {U, T}
    label = U == Arrow.Meta.DateUnit.DAY ? "DAY" : "MILLISECOND"
    return Date("date", label)
end

function StructTypes.StructType(::Base.Type{Date})
    return StructTypes.Struct()
end

# "DAY" dates are backed by Int32; every other unit is treated as
# MILLISECOND and backed by Int64.
function juliatype(f, x::Date)
    if x.unit == "DAY"
        return Arrow.Date{Arrow.Meta.DateUnit.DAY, Int32}
    end
    return Arrow.Date{Arrow.Meta.DateUnit.MILLISECOND, Int64}
end
| |
# JSON description of a time-of-day logical type.
struct Time <: Type
    name::String
    unit::String
    bitWidth::Int64
end

# The bit width is derived from the storage type parameter `T`.
function Type(::Base.Type{Arrow.Time{U, T}}) where {U, T}
    return Time("time", unit(U), 8 * sizeof(T))
end

function StructTypes.StructType(::Base.Type{Time})
    return StructTypes.Struct()
end

# The storage type is chosen from the unit (not from `bitWidth`):
# SECOND/MILLISECOND use Int32, all other units use Int64.
function juliatype(f, x::Time)
    storage = (x.unit == "SECOND" || x.unit == "MILLISECOND") ? Int32 : Int64
    return Arrow.Time{unitT(x.unit), storage}
end
| |
# JSON description of an interval logical type; `unit` is "YEAR_MONTH" or "DAY_TIME".
struct Interval <: Type
    name::String
    unit::String
end

function Type(::Base.Type{Arrow.Interval{U, T}}) where {U, T}
    label = U == Arrow.Meta.IntervalUnit.YEAR_MONTH ? "YEAR_MONTH" : "DAY_TIME"
    return Interval("interval", label)
end

function StructTypes.StructType(::Base.Type{Interval})
    return StructTypes.Struct()
end

# "YEAR_MONTH" intervals are backed by Int32; every other unit is treated as
# DAY_TIME and backed by Int64.
function juliatype(f, x::Interval)
    if x.unit == "YEAR_MONTH"
        return Arrow.Interval{Arrow.Meta.IntervalUnit.YEAR_MONTH, Int32}
    end
    return Arrow.Interval{Arrow.Meta.IntervalUnit.DAY_TIME, Int64}
end
| |
# JSON description of a union logical type.
# The third field is named `typeIds` so that it matches both the accessor used
# by `juliatype` below and the key of the Arrow JSON integration format; the
# former spelling `typIds` made `juliatype` fail with a missing-field error.
struct UnionT <: Type
    name::String
    mode::String
    typeIds::Vector{Int64}
end

# Describe an `Arrow.UnionT`. When the union carries no explicit type ids
# (`typeIds === nothing`), the ids default to 0, 1, ..., n-1, one per member type.
function Type(::Base.Type{Arrow.UnionT{T, typeIds, U}}) where {T, typeIds, U}
    mode = T == Arrow.Meta.UnionMode.Dense ? "DENSE" : "SPARSE"
    ids = typeIds === nothing ? collect(0:(fieldcount(U) - 1)) : collect(typeIds)
    return UnionT("union", mode, ids)
end

# One unnamed child field per member type of the union.
function children(::Base.Type{Arrow.UnionT{T, typeIds, U}}) where {T, typeIds, U}
    return Field[Field("", fieldtype(U, i), nothing) for i = 1:fieldcount(U)]
end

function StructTypes.StructType(::Base.Type{UnionT})
    return StructTypes.Struct()
end

# Rebuild the `Arrow.UnionT` type from its JSON description and the field's
# children. The enum members are spelled `Dense`/`Sparse`, consistent with the
# other uses of `Arrow.Meta.UnionMode` in this file.
function juliatype(f, x::UnionT)
    mode = x.mode == "DENSE" ? Arrow.Meta.UnionMode.Dense : Arrow.Meta.UnionMode.Sparse
    return Arrow.UnionT{mode, Tuple(x.typeIds), Tuple{(juliatype(y) for y in f.children)...}}
end
| |
# JSON description of a variable-length list logical type.
struct List <: Type
    name::String
end

function Type(::Base.Type{Vector{T}}) where {T}
    return List("list")
end

# A list has exactly one child, named "item", holding the element type.
function children(::Base.Type{Vector{T}}) where {T}
    return [Field("item", T, nothing)]
end

function StructTypes.StructType(::Base.Type{List})
    return StructTypes.Struct()
end

function juliatype(f, x::List)
    elem = juliatype(f.children[1])
    return Vector{elem}
end
| |
# JSON description of a large list; on the Julia side it maps to the same
# `Vector` type as a regular list.
struct LargeList <: Type
    name::String
end

function StructTypes.StructType(::Base.Type{LargeList})
    return StructTypes.Struct()
end

function juliatype(f, x::LargeList)
    elem = juliatype(f.children[1])
    return Vector{elem}
end
| |
# JSON description of a fixed-size list logical type.
struct FixedSizeList <: Type
    name::String
    listSize::Int64
end

# An N-tuple of `T` is described as a fixed-size list of length N.
function Type(::Base.Type{NTuple{N, T}}) where {N, T}
    return FixedSizeList("fixedsizelist", N)
end

# Single child named "item" holding the element type.
function children(::Base.Type{NTuple{N, T}}) where {N, T}
    return [Field("item", T, nothing)]
end

function StructTypes.StructType(::Base.Type{FixedSizeList})
    return StructTypes.Struct()
end

function juliatype(f, x::FixedSizeList)
    elem = juliatype(f.children[1])
    return NTuple{x.listSize, elem}
end
| |
# JSON description of a struct logical type.
struct Struct <: Type
    name::String
end

function Type(::Base.Type{NamedTuple{names, types}}) where {names, types}
    return Struct("struct")
end

# One child per NamedTuple field. The field names are Symbols and `Field.name`
# is a String, so they are converted explicitly (there is no implicit
# Symbol -> String conversion). The typed comprehension keeps the result a
# `Vector{Field}` even when the NamedTuple has no fields.
function children(::Base.Type{NamedTuple{names, types}}) where {names, types}
    return Field[Field(String(names[i]), fieldtype(types, i), nothing) for i = 1:length(names)]
end

function StructTypes.StructType(::Base.Type{Struct})
    return StructTypes.Struct()
end

# Rebuild the NamedTuple type from the child fields' names and Julia types.
function juliatype(f, x::Struct)
    fieldsyms = Tuple(Symbol(child.name) for child in f.children)
    return NamedTuple{fieldsyms, Tuple{(juliatype(y) for y in f.children)...}}
end
| |
# JSON description of a map logical type.
struct Map <: Type
    name::String
    keysSorted::Base.Bool
end

# Dicts are always reported with `keysSorted = false`.
function Type(::Base.Type{Dict{K, V}}) where {K, V}
    return Map("map", false)
end

# A map has a single "entries" child whose type is the key/value pair type.
function children(::Base.Type{Dict{K, V}}) where {K, V}
    return [Field("entries", Arrow.KeyValue{K, V}, nothing)]
end

function StructTypes.StructType(::Base.Type{Map})
    return StructTypes.Struct()
end

# Key and value types are read from the two grandchildren of the "entries" child.
function juliatype(f, x::Map)
    entries = f.children[1]
    K = juliatype(entries.children[1])
    V = juliatype(entries.children[2])
    return Dict{K, V}
end

# A key/value entry is described as a struct with "key" and "value" children.
function Type(::Base.Type{Arrow.KeyValue{K, V}}) where {K, V}
    return Struct("struct")
end

function children(::Base.Type{Arrow.KeyValue{K, V}}) where {K, V}
    return [Field("key", K, nothing), Field("value", V, nothing)]
end
| |
# JSON description of the null logical type; corresponds to Julia's `Missing`.
struct Null <: Type
    name::String
end

function Type(::Base.Type{Missing})
    return Null("null")
end

function StructTypes.StructType(::Base.Type{Null})
    return StructTypes.Struct()
end

function juliatype(f, x::Null)
    return Missing
end
| |
# JSON description of the utf8 (string) logical type.
struct Utf8 <: Type
    name::String
end

function Type(::Base.Type{<:String})
    return Utf8("utf8")
end

function StructTypes.StructType(::Base.Type{Utf8})
    return StructTypes.Struct()
end

function juliatype(f, x::Utf8)
    return String
end
| |
# JSON description of the large utf8 logical type; maps to `String` like `Utf8`.
struct LargeUtf8 <: Type
    name::String
end

function StructTypes.StructType(::Base.Type{LargeUtf8})
    return StructTypes.Struct()
end

function juliatype(f, x::LargeUtf8)
    return String
end
| |
# JSON description of the variable-length binary logical type.
struct Binary <: Type
    name::String
end

# A byte vector is binary, taking precedence over the generic `Vector{T}` list method.
function Type(::Base.Type{Vector{UInt8}})
    return Binary("binary")
end

# Binary has no child fields (unlike a generic list).
function children(::Base.Type{Vector{UInt8}})
    return Field[]
end

function StructTypes.StructType(::Base.Type{Binary})
    return StructTypes.Struct()
end

function juliatype(f, x::Binary)
    return Vector{UInt8}
end
| |
# JSON description of the large binary logical type; maps to `Vector{UInt8}` like `Binary`.
struct LargeBinary <: Type
    name::String
end

function StructTypes.StructType(::Base.Type{LargeBinary})
    return StructTypes.Struct()
end

function juliatype(f, x::LargeBinary)
    return Vector{UInt8}
end
| |
# JSON description of the boolean logical type. Note that this struct shadows
# `Base.Bool` inside the module, hence the explicit `Base.Bool` below.
struct Bool <: Type
    name::String
end

function Type(::Base.Type{Base.Bool})
    return Bool("bool")
end

function StructTypes.StructType(::Base.Type{Bool})
    return StructTypes.Struct()
end

function juliatype(f, x::Bool)
    return Base.Bool
end
| |
# The `name` field of a JSON type object selects the concrete `Type` subtype.
function StructTypes.subtypekey(::Base.Type{Type})
    return :name
end

# Lookup table from the JSON `name` value to the concrete subtype. The key
# `struct` is a reserved word, so the key tuple is spelled out with symbols
# (using `Symbol("struct")`) instead of NamedTuple literal syntax.
const SUBTYPES = NamedTuple{(
    :int,
    :floatingpoint,
    :fixedsizebinary,
    :decimal,
    :timestamp,
    :duration,
    :date,
    :time,
    :interval,
    :union,
    :list,
    :largelist,
    :fixedsizelist,
    Symbol("struct"),
    :map,
    :null,
    :utf8,
    :largeutf8,
    :binary,
    :largebinary,
    :bool,
)}((
    Int,
    FloatingPoint,
    FixedSizeBinary,
    Decimal,
    Timestamp,
    Duration,
    Date,
    Time,
    Interval,
    UnionT,
    List,
    LargeList,
    FixedSizeList,
    Struct,
    Map,
    Null,
    Utf8,
    LargeUtf8,
    Binary,
    LargeBinary,
    Bool,
))

function StructTypes.subtypes(::Base.Type{Type})
    return SUBTYPES
end
| |
# Custom metadata is either absent (`nothing`) or a list of key/value string pairs.
const Metadata = Union{Nothing, Vector{NamedTuple{(:key, :value), Tuple{String, String}}}}

# The default metadata is "no metadata".
function Metadata()
    return nothing
end
| |
# Dictionary-encoding description attached to a field: the dictionary id, the
# logical type of the indices, and whether the dictionary is ordered.
mutable struct DictEncoding
    id::Int64
    indexType::Type
    isOrdered::Base.Bool
end

# Empty constructor used by the mutable-struct deserialization path.
function DictEncoding()
    return DictEncoding(0, Type(), false)
end

function StructTypes.StructType(::Base.Type{DictEncoding})
    return StructTypes.Mutable()
end
| |
# One schema field: name, nullability, logical type, nested child fields, and
# optional dictionary encoding and metadata.
mutable struct Field
    name::String
    nullable::Base.Bool
    type::Type
    children::Vector{Field}
    dictionary::Union{DictEncoding, Nothing}
    metadata::Metadata
end

# Empty constructor used by the mutable-struct deserialization path.
function Field()
    return Field("", true, Type(), Field[], nothing, Metadata())
end

function StructTypes.StructType(::Base.Type{Field})
    return StructTypes.Mutable()
end

# Shallow copy: the new Field shares `children`, `dictionary`, etc. with `f`.
function Base.copy(f::Field)
    return Field(
        f.name,
        f.nullable,
        f.type,
        f.children,
        f.dictionary,
        f.metadata,
    )
end

# Julia element type of a field; nullable fields additionally admit `missing`.
function juliatype(f::Field)
    T = juliatype(f, f.type)
    if f.nullable
        return Union{T, Missing}
    end
    return T
end
| |
# Build the schema `Field` for a column named `nm` with Julia element type `T`.
#
# `dictencodings` is either `nothing` or a dictionary keyed by column name.
# `DataFile(source)` stores `(value type, DictEncoding)` tuples in it, while the
# `dictionary` field of `Field` only accepts a `DictEncoding`, so the encoding is
# extracted from the tuple here (a bare `DictEncoding` entry is accepted too).
# The field is nullable exactly when `T` admits `missing`.
function Field(nm, ::Base.Type{T}, dictencodings) where {T}
    S = Arrow.maybemissing(T)
    type = Type(S)
    ch = children(S)
    dict = nothing
    if dictencodings !== nothing && haskey(dictencodings, nm)
        entry = dictencodings[nm]
        dict = entry isa DictEncoding ? entry : last(entry)
    end
    return Field(nm, T !== S, type, ch, dict, nothing)
end
| |
# Top-level schema: the ordered list of fields plus optional metadata.
mutable struct Schema
    fields::Vector{Field}
    metadata::Metadata
end

# Empty constructor used by the mutable-struct deserialization path.
function Schema()
    return Schema(Field[], Metadata())
end

function StructTypes.StructType(::Base.Type{Schema})
    return StructTypes.Mutable()
end
| |
# Thin read-only vector wrapper around a list of offsets.
struct Offsets{T} <: AbstractVector{T}
    data::Vector{T}
end

# Size and element access simply forward to the wrapped vector.
function Base.size(x::Offsets)
    return size(x.data)
end

function Base.getindex(x::Offsets, i::Base.Int)
    return x.data[i]
end
| |
# Data of one column within a record batch, mirroring the JSON layout:
# optional VALIDITY / OFFSET / TYPE_ID / DATA buffers plus nested children.
mutable struct FieldData
    name::String
    count::Int64
    VALIDITY::Union{Nothing, Vector{Int8}}
    OFFSET::Union{Nothing, Offsets}
    TYPE_ID::Union{Nothing, Vector{Int8}}
    DATA::Union{Nothing, Vector{Any}}
    children::Vector{FieldData}
end

# Empty constructor used by the mutable-struct deserialization path:
# no name, zero rows, no buffers, no children.
function FieldData()
    return FieldData("", 0, nothing, nothing, nothing, nothing, FieldData[])
end

function StructTypes.StructType(::Base.Type{FieldData})
    return StructTypes.Mutable()
end
| |
# Build the JSON `FieldData` for column `col` named `nm` with element type `T`.
#
# `dictencodings` is `nothing` or a dictionary keyed by column name whose values
# are `(value type, DictEncoding)` tuples. For a dictionary-encoded column the
# result describes the (zero-based) index column instead of the values.
#
# Fixes relative to the previous revision:
#   * `DataAPI` was never imported into this module; it is reached through
#     `Arrow.DataAPI` (NOTE(review): assumes Arrow keeps `DataAPI` bound in its
#     namespace - confirm against the Arrow version in use).
#   * indices are shifted with `one(IT)` (index type), not `one(T)` (value type).
#   * `de.indexType` is a JSON type description; it is converted to a Julia type
#     with `juliatype` before being used for dispatch.
#   * the `Pair` branch called a non-existent 3-argument method.
#   * offsets are cumulative (the reader slices with `offs[i] + 1:offs[i + 1]`),
#     and are stringified with `string`, since `String(::Integer)` does not exist.
#   * `KeyValue` is qualified as `Arrow.KeyValue`.
#
# NOTE(review): `DATA` is only filled for fixed-size binary columns; other
# primitive columns are written without a `DATA` buffer, as before.
function FieldData(nm, ::Base.Type{T}, col, dictencodings) where {T}
    if dictencodings !== nothing && haskey(dictencodings, nm)
        refvals = Arrow.DataAPI.refarray(col.data)
        if refvals !== col.data
            # column already carries integer codes: make them zero-based
            IT = eltype(refvals)
            col = (x - one(IT) for x in refvals)
        else
            _, de = dictencodings[nm]
            IT = juliatype(nothing, de.indexType)
            vals = unique(col)
            col = Arrow.DictEncoder(col, vals, Arrow.encodingtype(length(vals)))
        end
        return FieldData(nm, IT, col, nothing)
    end
    S = Arrow.maybemissing(T)
    len = Arrow._length(col)
    VALIDITY = OFFSET = TYPE_ID = DATA = nothing
    children = FieldData[]
    if S <: Pair
        return FieldData(
            nm,
            Vector{Arrow.KeyValue{Arrow._keytype(S), Arrow._valtype(S)}},
            (Arrow.KeyValue(k, v) for (k, v) in pairs(col)),
            dictencodings,
        )
    elseif S !== Missing
        # VALIDITY
        VALIDITY = Int8[!ismissing(x) for x in col]
        # OFFSET
        if S <: Vector || S == String
            # strings are measured in bytes, lists in elements; missing counts as 0
            lenfun = S == String ? (x -> ismissing(x) ? 0 : sizeof(x)) :
                     (x -> ismissing(x) ? 0 : length(x))
            ends = cumsum(Int64[lenfun(x) for x in col])
            tot = isempty(ends) ? 0 : last(ends)
            if tot > 2147483647
                # offsets beyond the Int32 range are written as strings
                OFFSET = Offsets(pushfirst!(String[string(o) for o in ends], "0"))
            else
                OFFSET = Offsets(pushfirst!(Int32[o for o in ends], Int32(0)))
            end
            push!(children, FieldData("item", eltype(S), Arrow.flatten(skipmissing(col)), dictencodings))
        elseif S <: NTuple
            if Arrow.ArrowTypes.gettype(S) == UInt8
                DATA = [ismissing(x) ? Arrow.ArrowTypes.default(S) : String(collect(x)) for x in col]
            else
                push!(children, FieldData("item", Arrow.ArrowTypes.gettype(S), Arrow.flatten(coalesce(x, Arrow.ArrowTypes.default(S)) for x in col), dictencodings))
            end
        elseif S <: NamedTuple
            for (fname, typ) in zip(fieldnames(S), fieldtypes(S))
                push!(children, FieldData(String(fname), typ, (getfield(x, fname) for x in col), dictencodings))
            end
        elseif S <: Arrow.UnionT
            U = eltype(S)
            tids = Arrow.typeids(S) === nothing ? (0:fieldcount(U)) : Arrow.typeids(S)
            TYPE_ID = [x === missing ? 0 : tids[Arrow.isatypeid(x, U)] for x in col]
            if Arrow.unionmode(S) == Arrow.Meta.UnionMode.Dense
                # per-member running counters give each row its offset in its child
                offs = zeros(Int32, fieldcount(U))
                dense = Int32[]
                for x in col
                    idx = x === missing ? 1 : Arrow.isatypeid(x, U)
                    push!(dense, offs[idx])
                    offs[idx] += 1
                end
                OFFSET = Offsets(dense)
                for i = 1:fieldcount(U)
                    SS = fieldtype(U, i)
                    push!(children, FieldData("$i", SS, Arrow.filtered(i == 1 ? Union{SS, Missing} : Arrow.maybemissing(SS), col), dictencodings))
                end
            else
                for i = 1:fieldcount(U)
                    SS = fieldtype(U, i)
                    push!(children, FieldData("$i", SS, Arrow.replaced(SS, col), dictencodings))
                end
            end
        elseif S <: Arrow.KeyValue
            push!(children, FieldData("key", Arrow.keyvalueK(S), (x.key for x in col), dictencodings))
            push!(children, FieldData("value", Arrow.keyvalueV(S), (x.value for x in col), dictencodings))
        end
    end
    return FieldData(nm, len, VALIDITY, OFFSET, TYPE_ID, DATA, children)
end
| |
# One record batch: the row count and the data of each column.
mutable struct RecordBatch
    count::Int64
    columns::Vector{FieldData}
end

# Empty constructor used by the mutable-struct deserialization path.
function RecordBatch()
    return RecordBatch(0, FieldData[])
end

function StructTypes.StructType(::Base.Type{RecordBatch})
    return StructTypes.Mutable()
end
| |
# Values of one dictionary, identified by `id` and stored as a record batch.
mutable struct DictionaryBatch
    id::Int64
    data::RecordBatch
end

# Empty constructor used by the mutable-struct deserialization path.
function DictionaryBatch()
    return DictionaryBatch(0, RecordBatch())
end

function StructTypes.StructType(::Base.Type{DictionaryBatch})
    return StructTypes.Mutable()
end
| |
# A whole Arrow JSON data file: schema, record batches and dictionaries.
# It is itself a Tables.jl columns object.
mutable struct DataFile <: Tables.AbstractColumns
    schema::Schema
    batches::Vector{RecordBatch}
    dictionaries::Vector{DictionaryBatch}
end

function Base.propertynames(x::DataFile)
    return (:schema, :batches, :dictionaries)
end

# The three real fields are served directly; any other property name is
# treated as a column name and looked up through the Tables.jl interface.
function Base.getproperty(df::DataFile, nm::Symbol)
    if nm === :schema || nm === :batches || nm === :dictionaries
        return getfield(df, nm)
    end
    return Tables.getcolumn(df, nm)
end

# Empty constructor used by the mutable-struct deserialization path.
function DataFile()
    return DataFile(Schema(), RecordBatch[], DictionaryBatch[])
end

function StructTypes.StructType(::Base.Type{DataFile})
    return StructTypes.Mutable()
end
| |
# Memory-map `file` and parse its JSON contents into a `DataFile`.
function parsefile(file)
    bytes = Mmap.mmap(file)
    return JSON3.read(bytes, DataFile)
end
| |
| # make DataFile satisfy Tables.jl interface |
# Each record batch becomes its own single-batch `DataFile` partition that
# shares the schema and dictionaries of `x`.
function Tables.partitions(x::DataFile)
    # special case empty batches by producing a single DataFile w/ schema
    isempty(x.batches) && return (DataFile(x.schema, RecordBatch[], x.dictionaries),)
    return (DataFile(x.schema, [batch], x.dictionaries) for batch in x.batches)
end
| |
# A DataFile is its own columns object.
function Tables.columns(x::DataFile)
    return x
end

# Tables.jl schema: column names and Julia element types taken from the schema fields.
function Tables.schema(x::DataFile)
    fields = x.schema.fields
    names = map(fld -> fld.name, fields)
    types = map(juliatype, fields)
    return Tables.Schema(names, types)
end

function Tables.columnnames(x::DataFile)
    return map(fld -> Symbol(fld.name), x.schema.fields)
end
| |
# Column `i` as a single vector: one `ArrowArray` per record batch, chained together.
function Tables.getcolumn(x::DataFile, i::Base.Int)
    fld = x.schema.fields[i]
    T = juliatype(fld)
    chunks = ArrowArray{T}[]
    for batch in x.batches
        push!(chunks, ArrowArray{T}(fld, batch.columns[i], x.dictionaries))
    end
    return ChainedVector(chunks)
end
| |
# Look a column up by name and delegate to the positional method.
# Throws an `ArgumentError` when the schema has no field called `nm`
# (previously the `nothing` returned by `findfirst` was passed on and
# surfaced as an unrelated MethodError).
function Tables.getcolumn(x::DataFile, nm::Symbol)
    target = String(nm)
    i = findfirst(fld -> fld.name == target, x.schema.fields)
    i === nothing && throw(ArgumentError("DataFile has no column named `$target`"))
    return Tables.getcolumn(x, i)
end
| |
# Lazy vector view over the JSON data of one field within one batch; elements
# are decoded on access by `getindex`.
struct ArrowArray{T} <: AbstractVector{T}
    field::Field
    fielddata::FieldData
    dictionaries::Vector{DictionaryBatch}
end

# Convenience constructor that derives the element type from the field.
function ArrowArray(f::Field, fd::FieldData, d)
    T = juliatype(f)
    return ArrowArray{T}(f, fd, d)
end

# The length is the row count recorded in the field data.
function Base.size(x::ArrowArray)
    return (x.fielddata.count,)
end
| |
# Decode element `i` of an `ArrowArray` from its JSON field data.
#
# Order of checks:
#   1. dictionary-encoded fields: `DATA[i]` is a zero-based index into the
#      dictionary batch whose id matches the field's dictionary id;
#   2. null-typed columns always yield `missing`;
#   3. unions dispatch to the child selected by `TYPE_ID[i]`;
#   4. otherwise a zero `VALIDITY[i]` yields `missing`, and the value is
#      decoded according to the (non-missing) element type `S`.
#
# Fixes relative to the previous revision (union branch only):
#   * the test was `S <: UnionT`, i.e. against this module's JSON struct, which
#     no Julia element type can satisfy - it must be `Arrow.UnionT`;
#   * the union-mode enum member is spelled `Dense`, as elsewhere in this file;
#   * `TYPE_ID[i]` is a type id, so the child is found by searching the id list
#     rather than by indexing the id list with it;
#   * dense-union offsets are zero-based (they are written starting from 0 by
#     the `FieldData` constructor), so 1 is added before indexing the child.
function Base.getindex(x::ArrowArray{T}, i::Base.Int) where {T}
    @boundscheck checkbounds(x, i)
    S = Base.nonmissingtype(T)
    if x.field.dictionary !== nothing
        fielddata = x.dictionaries[findfirst(y -> y.id == x.field.dictionary.id, x.dictionaries)].data.columns[1]
        # re-read through a copy of the field with dictionary encoding switched off
        field = copy(x.field)
        field.dictionary = nothing
        idx = x.fielddata.DATA[i] + 1
        return ArrowArray(field, fielddata, x.dictionaries)[idx]
    end
    if T === Missing
        return missing
    elseif S <: Arrow.UnionT
        U = eltype(S)
        tids = Arrow.typeids(S) === nothing ? (0:fieldcount(U)) : Arrow.typeids(S)
        typeid = x.fielddata.TYPE_ID[i]
        child = findfirst(==(typeid), tids)
        child === nothing && throw(ArgumentError("unknown union type id $typeid"))
        if Arrow.unionmode(S) == Arrow.Meta.UnionMode.Dense
            off = x.fielddata.OFFSET[i]
            return ArrowArray(x.field.children[child], x.fielddata.children[child], x.dictionaries)[off + 1]
        else
            return ArrowArray(x.field.children[child], x.fielddata.children[child], x.dictionaries)[i]
        end
    end
    x.fielddata.VALIDITY[i] == 0 && return missing
    if S <: Vector{UInt8}
        return copy(codeunits(x.fielddata.DATA[i]))
    elseif S <: String
        return x.fielddata.DATA[i]
    elseif S <: Vector
        # list: slice the flattened child between consecutive offsets
        offs = x.fielddata.OFFSET
        A = ArrowArray{eltype(S)}(x.field.children[1], x.fielddata.children[1], x.dictionaries)
        return A[(offs[i] + 1):offs[i + 1]]
    elseif S <: Dict
        # map: same slicing as a list, then collect key/value entries into a Dict
        offs = x.fielddata.OFFSET
        A = ArrowArray(x.field.children[1], x.fielddata.children[1], x.dictionaries)
        return Dict(y.key => y.value for y in A[(offs[i] + 1):offs[i + 1]])
    elseif S <: Tuple
        if Arrow.ArrowTypes.gettype(S) == UInt8
            # fixed-size binary: take the first `byteWidth` characters as bytes
            A = x.fielddata.DATA
            return Tuple(map(UInt8, collect(A[i][1:x.field.type.byteWidth])))
        else
            # fixed-size list: element i occupies `listSize` consecutive child slots
            sz = x.field.type.listSize
            A = ArrowArray{Arrow.ArrowTypes.gettype(S)}(x.field.children[1], x.fielddata.children[1], x.dictionaries)
            off = (i - 1) * sz + 1
            return Tuple(A[off:(off + sz - 1)])
        end
    elseif S <: NamedTuple
        data = (ArrowArray(x.field.children[j], x.fielddata.children[j], x.dictionaries)[i] for j = 1:length(x.field.children))
        return NamedTuple{fieldnames(S)}(Tuple(data))
    elseif S == Int64 || S == UInt64
        # 64-bit integers are parsed from the stored value
        return parse(S, x.fielddata.DATA[i])
    elseif S <: Arrow.Decimal
        str = x.fielddata.DATA[i]
        return S(parse(Int128, str))
    elseif S <: Arrow.Date || S <: Arrow.Time
        val = x.fielddata.DATA[i]
        return Arrow.storagetype(S) == Int32 ? S(val) : S(parse(Int64, val))
    elseif S <: Arrow.Timestamp
        return S(parse(Int64, x.fielddata.DATA[i]))
    else
        return S(x.fielddata.DATA[i])
    end
end
| |
| # take any Tables.jl source and write out arrow json datafile |
# take any Tables.jl source and write out arrow json datafile
#
# The schema (fields and dictionary encodings) is derived from the first
# partition; every partition then contributes one record batch plus one
# dictionary batch per dictionary-encoded column.
#
# Fixes relative to the previous revision:
#   * the partitions are wrapped in `enumerate` so `(i, tbl1)` destructures an
#     index and a table rather than the table itself;
#   * `sch` is declared outside the loop - assigned only inside `if i == 1`, it
#     was local to the first iteration and undefined for later partitions;
#   * `DataAPI` was never imported into this module; it is reached through
#     `Arrow.DataAPI` (NOTE(review): assumes Arrow keeps `DataAPI` bound in its
#     namespace - confirm against the Arrow version in use);
#   * dictionary columns are fetched with a `Symbol` name, since the encodings
#     are keyed by `String`.
function DataFile(source)
    fields = Field[]
    metadata = nothing # TODO?
    batches = RecordBatch[]
    dictionaries = DictionaryBatch[]
    dictencodings = Dict{String, Tuple{Base.Type, DictEncoding}}()
    dictid = Ref(0)
    local sch
    for (i, tbl1) in enumerate(Tables.partitions(source))
        tbl = Arrow.toarrowtable(tbl1)
        if i == 1
            sch = Tables.schema(tbl)
            for (nm, T, col) in zip(sch.names, sch.types, Tables.Columns(tbl))
                if col isa Arrow.DictEncode
                    id = dictid[]
                    dictid[] += 1
                    codes = Arrow.DataAPI.refarray(col.data)
                    if codes !== col.data
                        # the column already provides integer codes
                        IT = Type(eltype(codes))
                    else
                        IT = Type(Arrow.encodingtype(length(unique(col))))
                    end
                    dictencodings[String(nm)] = (T, DictEncoding(id, IT, false))
                end
                push!(fields, Field(String(nm), T, dictencodings))
            end
        end
        # build record batch
        len = Tables.rowcount(tbl)
        columns = FieldData[]
        for (nm, T, col) in zip(sch.names, sch.types, Tables.Columns(tbl))
            push!(columns, FieldData(String(nm), T, col, dictencodings))
        end
        push!(batches, RecordBatch(len, columns))
        # build dictionaries
        for (nm, (T, dictencoding)) in dictencodings
            column = FieldData(nm, T, Tables.getcolumn(tbl, Symbol(nm)), nothing)
            recordbatch = RecordBatch(len, [column])
            push!(dictionaries, DictionaryBatch(dictencoding.id, recordbatch))
        end
    end
    schema = Schema(fields, metadata)
    return DataFile(schema, batches, dictionaries)
end
| |
# Compare a JSON data file with an Arrow table: the schemas must match and
# every pair of columns must be `isequal`. The index of the first differing
# column is printed with `@show` before returning `false`.
function Base.isequal(df::DataFile, tbl::Arrow.Table)
    if !(Tables.schema(df) == Tables.schema(tbl))
        return false
    end
    for (i, (col1, col2)) in enumerate(zip(Tables.Columns(df), Tables.Columns(tbl)))
        if !isequal(col1, col2)
            @show i
            return false
        end
    end
    return true
end
| |
| end |