# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module ArrowJSON

using Mmap
using StructTypes, JSON3, Tables, SentinelArrays, Arrow

# read json files as "table"
# write to arrow stream/file
# read arrow stream/file back

# Root of the Arrow JSON logical-type hierarchy; every type struct below
# subtypes it. It shadows `Base.Type` inside this module, which is why method
# signatures spell out `Base.Type` explicitly.
abstract type Type end

# Zero-argument default: a `Null` type named "null".
function Type()
    return Null("null")
end

# Registered with StructTypes as an abstract type.
function StructTypes.StructType(::Base.Type{Type})
    return StructTypes.AbstractType()
end

# Generic fallback: a Julia type has no child fields unless a more specific
# `children` method is defined for it.
function children(::Base.Type{T}) where {T}
    return Field[]
end

# Arrow JSON "int" type: bit width plus signedness.
mutable struct Int <: Type
    name::String
    bitWidth::Int64
    isSigned::Base.Bool
end

# Zero-argument default instance (empty name, width 0, signed).
function Int()
    return Int("", 0, true)
end

# Describe any Julia integer type as an "int" with its width in bits.
function Type(::Base.Type{I}) where {I <: Integer}
    return Int("int", 8 * sizeof(I), I <: Signed)
end

function StructTypes.StructType(::Base.Type{Int})
    return StructTypes.Mutable()
end

# Map an "int" description back to a Julia integer type. Widths other than
# 8/16/32/64 fall through to the 128-bit type. `f` is unused.
function juliatype(f, x::Int)
    width = x.bitWidth
    if width == 8
        signedtype = Int8
    elseif width == 16
        signedtype = Int16
    elseif width == 32
        signedtype = Int32
    elseif width == 64
        signedtype = Int64
    else
        signedtype = Int128
    end
    x.isSigned && return signedtype
    return unsigned(signedtype)
end

# Arrow JSON "floatingpoint" type; `precision` is "HALF", "SINGLE" or "DOUBLE".
struct FloatingPoint <: Type
    name::String
    precision::String
end

# Float16 -> "HALF", Float32 -> "SINGLE", every other AbstractFloat -> "DOUBLE".
function Type(::Base.Type{F}) where {F <: AbstractFloat}
    precision = if F == Float16
        "HALF"
    elseif F == Float32
        "SINGLE"
    else
        "DOUBLE"
    end
    return FloatingPoint("floatingpoint", precision)
end

function StructTypes.StructType(::Base.Type{FloatingPoint})
    return StructTypes.Struct()
end

# Inverse mapping; any precision other than "HALF"/"SINGLE" yields Float64.
function juliatype(f, x::FloatingPoint)
    x.precision == "HALF" && return Float16
    x.precision == "SINGLE" && return Float32
    return Float64
end

# Arrow JSON "fixedsizebinary" type: values are exactly `byteWidth` bytes.
struct FixedSizeBinary <: Type
    name::String
    byteWidth::Int64
end

# An N-tuple of bytes is described as fixed-size binary of width N ...
function Type(::Base.Type{NTuple{N, UInt8}}) where {N}
    return FixedSizeBinary("fixedsizebinary", N)
end

# ... and, unlike other NTuples, has no child fields.
function children(::Base.Type{NTuple{N, UInt8}}) where {N}
    return Field[]
end

function StructTypes.StructType(::Base.Type{FixedSizeBinary})
    return StructTypes.Struct()
end

function juliatype(f, x::FixedSizeBinary)
    return NTuple{x.byteWidth, UInt8}
end

# Arrow JSON "decimal" type: precision and scale.
struct Decimal <: Type
    name::String
    precision::Int32
    scale::Int32
end

# Precision and scale are taken from the `Arrow.Decimal` type parameters;
# the storage parameter is not recorded.
function Type(::Base.Type{Arrow.Decimal{P, S, T}}) where {P, S, T}
    return Decimal("decimal", P, S)
end

function StructTypes.StructType(::Base.Type{Decimal})
    return StructTypes.Struct()
end

# Always rebuilt with `Int128` as the third type parameter.
function juliatype(f, x::Decimal)
    return Arrow.Decimal{x.precision, x.scale, Int128}
end

# Arrow JSON "timestamp" type: a time unit plus an optional timezone string.
mutable struct Timestamp <: Type
    name::String
    unit::String
    timezone::Union{Nothing, String}
end

# Zero-argument default instance (empty name/unit, no timezone).
function Timestamp()
    return Timestamp("", "", nothing)
end

# Spell an Arrow time-unit enum value as its JSON string. Anything that is not
# SECOND, MILLISECOND or MICROSECOND is reported as "NANOSECOND".
function unit(U)
    U == Arrow.Meta.TimeUnits.SECOND && return "SECOND"
    U == Arrow.Meta.TimeUnits.MILLISECOND && return "MILLISECOND"
    U == Arrow.Meta.TimeUnits.MICROSECOND && return "MICROSECOND"
    return "NANOSECOND"
end

function Type(::Base.Type{Arrow.Timestamp{U, TZ}}) where {U, TZ}
    tz = TZ === nothing ? nothing : String(TZ)
    return Timestamp("timestamp", unit(U), tz)
end

function StructTypes.StructType(::Base.Type{Timestamp})
    return StructTypes.Mutable()
end

# Inverse of `unit`: JSON string -> Arrow time-unit enum value. Unrecognized
# strings map to NANOSECOND.
function unitT(u)
    u == "SECOND" && return Arrow.Meta.TimeUnits.SECOND
    u == "MILLISECOND" && return Arrow.Meta.TimeUnits.MILLISECOND
    u == "MICROSECOND" && return Arrow.Meta.TimeUnits.MICROSECOND
    return Arrow.Meta.TimeUnits.NANOSECOND
end

# The timezone becomes a `Symbol` type parameter (or `nothing` when absent).
function juliatype(f, x::Timestamp)
    tz = x.timezone === nothing ? nothing : Symbol(x.timezone)
    return Arrow.Timestamp{unitT(x.unit), tz}
end

# Arrow JSON "duration" type: an elapsed time measured in `unit`.
struct Duration <: Type
    name::String
    unit::String
end

# The unit string comes from the `Arrow.Duration` type parameter via `unit`.
Type(::Base.Type{Arrow.Duration{U}}) where {U} = Duration("duration", unit(U))
StructTypes.StructType(::Base.Type{Duration}) = StructTypes.Struct()

# Convert the JSON unit string back to the enum value with `unitT`, the same
# helper `Timestamp` and `Time` use. (The previous `unit%(x.unit)` parsed as
# `unit % x.unit` and threw a MethodError.)
juliatype(f, x::Duration) = Arrow.Duration{unitT(x.unit)}

# Arrow JSON "date" type; `unit` is "DAY" or "MILLISECOND".
struct Date <: Type
    name::String
    unit::String
end

# Only the unit parameter is recorded; anything that is not DAY is written
# as "MILLISECOND".
function Type(::Base.Type{Arrow.Date{U, T}}) where {U, T}
    unitname = U == Arrow.Meta.DateUnits.DAY ? "DAY" : "MILLISECOND"
    return Date("date", unitname)
end

function StructTypes.StructType(::Base.Type{Date})
    return StructTypes.Struct()
end

# "DAY" dates use Int32 storage; everything else is treated as MILLISECOND
# with Int64 storage.
function juliatype(f, x::Date)
    if x.unit == "DAY"
        return Arrow.Date{Arrow.Meta.DateUnits.DAY, Int32}
    end
    return Arrow.Date{Arrow.Meta.DateUnits.MILLISECOND, Int64}
end

# Arrow JSON "time" type: unit plus the storage width in bits.
struct Time <: Type
    name::String
    unit::String
    bitWidth::Int64
end

function Type(::Base.Type{Arrow.Time{U, T}}) where {U, T}
    return Time("time", unit(U), 8 * sizeof(T))
end

function StructTypes.StructType(::Base.Type{Time})
    return StructTypes.Struct()
end

# Storage is chosen from the unit alone (`bitWidth` is not consulted):
# SECOND/MILLISECOND -> Int32, everything else -> Int64.
function juliatype(f, x::Time)
    narrow = x.unit == "SECOND" || x.unit == "MILLISECOND"
    storage = narrow ? Int32 : Int64
    return Arrow.Time{unitT(x.unit), storage}
end

# Arrow JSON "interval" type; `unit` is "YEAR_MONTH" or "DAY_TIME".
struct Interval <: Type
    name::String
    unit::String
end

# Anything that is not YEAR_MONTH is written as "DAY_TIME".
function Type(::Base.Type{Arrow.Interval{U, T}}) where {U, T}
    unitname = U == Arrow.Meta.IntervalUnits.YEAR_MONTH ? "YEAR_MONTH" : "DAY_TIME"
    return Interval("interval", unitname)
end

function StructTypes.StructType(::Base.Type{Interval})
    return StructTypes.Struct()
end

# "YEAR_MONTH" intervals use Int32 storage; everything else is treated as
# DAY_TIME with Int64 storage.
function juliatype(f, x::Interval)
    if x.unit == "YEAR_MONTH"
        return Arrow.Interval{Arrow.Meta.IntervalUnits.YEAR_MONTH, Int32}
    end
    return Arrow.Interval{Arrow.Meta.IntervalUnits.DAY_TIME, Int64}
end

# Arrow JSON "union" type: mode ("DENSE" or "SPARSE") plus the type ids of the
# member types. The third field is named `typeIds` so that it matches the
# accessor in `juliatype` below (it was previously declared as `typIds`, which
# made `x.typeIds` throw).
struct UnionT <: Type
    name::String
    mode::String
    typeIds::Vector{Int64}
end

# Describe an `Arrow.UnionT`; any mode other than Dense is written as "SPARSE".
function Type(::Base.Type{Arrow.UnionT{T, typeIds, U}}) where {T, typeIds, U}
    mode = T == Arrow.Meta.UnionModes.Dense ? "DENSE" : "SPARSE"
    return UnionT("union", mode, collect(typeIds))
end

# One unnamed child field per member type of the union's tuple type `U`.
function children(::Base.Type{Arrow.UnionT{T, typeIds, U}}) where {T, typeIds, U}
    return Field[Field("", fieldtype(U, i), nothing) for i = 1:fieldcount(U)]
end

StructTypes.StructType(::Base.Type{UnionT}) = StructTypes.Struct()

# Rebuild the `Arrow.UnionT` type from the JSON description and the Julia types
# of the field's children. The enum members are spelled `Dense`/`Sparse` here
# to agree with the `Type` method above.
# NOTE(review): confirm the `Sparse` member name against Arrow.Meta.
function juliatype(f, x::UnionT)
    mode = x.mode == "DENSE" ? Arrow.Meta.UnionModes.Dense : Arrow.Meta.UnionModes.Sparse
    return Arrow.UnionT{mode, Tuple(x.typeIds), Tuple{(juliatype(y) for y in f.children)...}}
end

# Arrow JSON "list" type: variable-length list of a single child type.
struct List <: Type
    name::String
end

function Type(::Base.Type{Vector{E}}) where {E}
    return List("list")
end

# A vector has exactly one child field, named "item", for its element type.
function children(::Base.Type{Vector{E}}) where {E}
    return [Field("item", E, nothing)]
end

function StructTypes.StructType(::Base.Type{List})
    return StructTypes.Struct()
end

# The element type comes from the field's first child.
function juliatype(f, x::List)
    item = f.children[1]
    return Vector{juliatype(item)}
end

# Arrow JSON "largelist" type; maps to the same Julia type as "list".
struct LargeList <: Type
    name::String
end

function StructTypes.StructType(::Base.Type{LargeList})
    return StructTypes.Struct()
end

# The element type comes from the field's first child.
function juliatype(f, x::LargeList)
    item = f.children[1]
    return Vector{juliatype(item)}
end

# Arrow JSON "fixedsizelist" type: every value holds exactly `listSize` items.
struct FixedSizeList <: Type
    name::String
    listSize::Int64
end

function Type(::Base.Type{NTuple{N, E}}) where {N, E}
    return FixedSizeList("fixedsizelist", N)
end

# A homogeneous tuple has one child field, "item", for its element type.
function children(::Base.Type{NTuple{N, E}}) where {N, E}
    return [Field("item", E, nothing)]
end

function StructTypes.StructType(::Base.Type{FixedSizeList})
    return StructTypes.Struct()
end

function juliatype(f, x::FixedSizeList)
    item = f.children[1]
    return NTuple{x.listSize, juliatype(item)}
end

# Arrow JSON "struct" type: a record whose members are the field's children.
struct Struct <: Type
    name::String
end

Type(::Base.Type{NamedTuple{names, types}}) where {names, types} = Struct("struct")

# One child field per NamedTuple member. Member names are `Symbol`s, but
# `Field.name` is a `String` and a Symbol cannot be converted implicitly, so
# the name is converted explicitly (as `FieldData` does for NamedTuple members).
function children(::Base.Type{NamedTuple{names, types}}) where {names, types}
    return Field[Field(String(nm), fieldtype(types, i), nothing) for (i, nm) in enumerate(names)]
end

StructTypes.StructType(::Base.Type{Struct}) = StructTypes.Struct()

# Rebuild the NamedTuple type from the children's names and Julia types.
function juliatype(f, x::Struct)
    membernames = Tuple(Symbol(child.name) for child in f.children)
    membertypes = Tuple{(juliatype(child) for child in f.children)...}
    return NamedTuple{membernames, membertypes}
end

# Arrow JSON "map" type.
struct Map <: Type
    name::String
    keysSorted::Base.Bool
end

# A `Dict` is described as a map whose keys are not sorted.
function Type(::Base.Type{Dict{K, V}}) where {K, V}
    return Map("map", false)
end

# A map has a single child, "entries", holding key/value records.
function children(::Base.Type{Dict{K, V}}) where {K, V}
    return [Field("entries", Arrow.KeyValue{K, V}, nothing)]
end

function StructTypes.StructType(::Base.Type{Map})
    return StructTypes.Struct()
end

# Key and value types are the two children of the "entries" child.
function juliatype(f, x::Map)
    entries = f.children[1]
    keytype_ = juliatype(entries.children[1])
    valtype_ = juliatype(entries.children[2])
    return Dict{keytype_, valtype_}
end

# The entries record itself is described as a "struct" with "key" and "value"
# children.
function Type(::Base.Type{Arrow.KeyValue{K, V}}) where {K, V}
    return Struct("struct")
end

function children(::Base.Type{Arrow.KeyValue{K, V}}) where {K, V}
    return [Field("key", K, nothing), Field("value", V, nothing)]
end

# Arrow JSON "null" type; corresponds to Julia's `Missing`.
struct Null <: Type
    name::String
end

function Type(::Base.Type{Missing})
    return Null("null")
end

function StructTypes.StructType(::Base.Type{Null})
    return StructTypes.Struct()
end

function juliatype(f, x::Null)
    return Missing
end

# Arrow JSON "utf8" type; corresponds to Julia's `String`.
struct Utf8 <: Type
    name::String
end

function Type(::Base.Type{<:String})
    return Utf8("utf8")
end

function StructTypes.StructType(::Base.Type{Utf8})
    return StructTypes.Struct()
end

function juliatype(f, x::Utf8)
    return String
end

# Arrow JSON "largeutf8" type; maps to the same Julia type as "utf8".
struct LargeUtf8 <: Type
    name::String
end

function StructTypes.StructType(::Base.Type{LargeUtf8})
    return StructTypes.Struct()
end

function juliatype(f, x::LargeUtf8)
    return String
end

# Arrow JSON "binary" type; corresponds to `Vector{UInt8}`.
struct Binary <: Type
    name::String
end

# A byte vector is binary data, not a list of UInt8 items ...
function Type(::Base.Type{Vector{UInt8}})
    return Binary("binary")
end

# ... and therefore has no child fields.
function children(::Base.Type{Vector{UInt8}})
    return Field[]
end

function StructTypes.StructType(::Base.Type{Binary})
    return StructTypes.Struct()
end

function juliatype(f, x::Binary)
    return Vector{UInt8}
end

# Arrow JSON "largebinary" type; maps to the same Julia type as "binary".
struct LargeBinary <: Type
    name::String
end

function StructTypes.StructType(::Base.Type{LargeBinary})
    return StructTypes.Struct()
end

function juliatype(f, x::LargeBinary)
    return Vector{UInt8}
end

# Arrow JSON "bool" type. Shadows `Base.Bool` inside this module, so the Julia
# boolean type is always written `Base.Bool` here.
struct Bool <: Type
    name::String
end

function Type(::Base.Type{Base.Bool})
    return Bool("bool")
end

function StructTypes.StructType(::Base.Type{Bool})
    return StructTypes.Struct()
end

function juliatype(f, x::Bool)
    return Base.Bool
end

# The `name` key selects which concrete `Type` subtype is used.
function StructTypes.subtypekey(::Base.Type{Type})
    return :name
end

# Table from the `name` value to the concrete struct. Keys and values are given
# as two parallel tuples because `struct` is a reserved word and cannot be
# written as a bare key in a NamedTuple literal.
const SUBTYPES = NamedTuple{(
    :int,
    :floatingpoint,
    :fixedsizebinary,
    :decimal,
    :timestamp,
    :duration,
    :date,
    :time,
    :interval,
    :union,
    :list,
    :largelist,
    :fixedsizelist,
    Symbol("struct"),
    :map,
    :null,
    :utf8,
    :largeutf8,
    :binary,
    :largebinary,
    :bool,
)}((
    Int,
    FloatingPoint,
    FixedSizeBinary,
    Decimal,
    Timestamp,
    Duration,
    Date,
    Time,
    Interval,
    UnionT,
    List,
    LargeList,
    FixedSizeList,
    Struct,
    Map,
    Null,
    Utf8,
    LargeUtf8,
    Binary,
    LargeBinary,
    Bool,
))

function StructTypes.subtypes(::Base.Type{Type})
    return SUBTYPES
end

# Custom metadata: either absent (`nothing`) or a list of key/value string pairs.
const Metadata = Union{Nothing, Vector{NamedTuple{(:key, :value), Tuple{String, String}}}}

# Default metadata is "absent".
function Metadata()
    return nothing
end

# Dictionary-encoding description attached to a field: the dictionary id, the
# type of the indices, and whether the dictionary is ordered.
mutable struct DictEncoding
    id::Int64
    indexType::Type
    isOrdered::Base.Bool
end

# Zero-argument default: id 0, default (`Null`) index type, unordered.
function DictEncoding()
    return DictEncoding(0, Type(), false)
end

function StructTypes.StructType(::Base.Type{DictEncoding})
    return StructTypes.Mutable()
end

# One schema field: name, nullability, logical type, nested child fields, and
# optional dictionary encoding / metadata.
mutable struct Field
    name::String
    nullable::Base.Bool
    type::Type
    children::Vector{Field}
    dictionary::Union{DictEncoding, Nothing}
    metadata::Metadata
end

# Zero-argument default: unnamed, nullable, default type, no children,
# no dictionary, no metadata.
function Field()
    return Field("", true, Type(), Field[], nothing, Metadata())
end

function StructTypes.StructType(::Base.Type{Field})
    return StructTypes.Mutable()
end

# Shallow copy: the new `Field` shares `type`, `children`, `dictionary` and
# `metadata` with the original.
function Base.copy(f::Field)
    return Field(f.name, f.nullable, f.type, f.children, f.dictionary, f.metadata)
end

# Julia element type for a field: the type implied by `f.type`, widened with
# `Missing` when the field is nullable.
function juliatype(f::Field)
    inner = juliatype(f, f.type)
    f.nullable || return inner
    return Union{inner, Missing}
end

# Build the schema `Field` for a column named `nm` with Julia element type `T`.
#
# - `nm`: field name.
# - `T`: Julia element type; the field is nullable when `T` differs from
#   `Arrow.maybemissing(T)`.
# - `dictencodings`: `nothing`, or a mapping keyed by field name. `DataFile(source)`
#   stores `(julia type, DictEncoding)` tuples in it; a bare `DictEncoding`
#   value is accepted as well.
function Field(nm, ::Base.Type{T}, dictencodings) where {T}
    S = Arrow.maybemissing(T)
    type = Type(S)
    ch = children(S)
    dict = nothing
    if dictencodings !== nothing && haskey(dictencodings, nm)
        entry = dictencodings[nm]
        # `Field.dictionary` only accepts a `DictEncoding`, so unpack the
        # second element of the tuple rather than storing the tuple itself.
        dict = entry isa DictEncoding ? entry : entry[2]
    end
    return Field(nm, T !== S, type, ch, dict, nothing)
end

# Top-level schema: the ordered list of fields plus optional metadata.
mutable struct Schema
    fields::Vector{Field}
    metadata::Metadata
end

# Zero-argument default: no fields, no metadata.
function Schema()
    return Schema(Field[], Metadata())
end

function StructTypes.StructType(::Base.Type{Schema})
    return StructTypes.Mutable()
end

# Thin read-only vector wrapper around the offsets of a field.
struct Offsets{T} <: AbstractVector{T}
    data::Vector{T}
end

# Size and indexing simply forward to the wrapped vector.
function Base.size(x::Offsets)
    return size(x.data)
end

function Base.getindex(x::Offsets, i::Base.Int)
    return x.data[i]
end

# Data of one column within a record batch: element count, optional
# validity / offset / type-id / value buffers, and nested child columns.
mutable struct FieldData
    name::String
    count::Int64
    VALIDITY::Union{Nothing, Vector{Int8}}
    OFFSET::Union{Nothing, Offsets}
    TYPE_ID::Union{Nothing, Vector{Int8}}
    DATA::Union{Nothing, Vector{Any}}
    children::Vector{FieldData}
end

# Zero-argument default: unnamed, empty, no buffers, no children.
function FieldData()
    return FieldData("", 0, nothing, nothing, nothing, nothing, FieldData[])
end

function StructTypes.StructType(::Base.Type{FieldData})
    return StructTypes.Mutable()
end

# Build the `FieldData` for column `col` named `nm` with Julia element type `T`.
#
# - `nm`: column name.
# - `T`: Julia element type of the column (possibly including `Missing`).
# - `col`: the column values (any iterable accepted by `Arrow._length`).
# - `dictencodings`: `nothing`, or a mapping from column name to a
#   `(julia type, DictEncoding)` tuple; when `nm` is present the column is
#   written as dictionary indices.
#
# Only the buffers relevant to the element type are filled in; `DATA` is set
# only for fixed-size binary values.
function FieldData(nm, ::Base.Type{T}, col, dictencodings) where {T}
    if dictencodings !== nothing && haskey(dictencodings, nm)
        # `DataAPI` is not imported by this module; reach it through Arrow.
        refvals = Arrow.DataAPI.refarray(col.data)
        if refvals !== col.data
            IT = eltype(refvals)
            # shift the codes down by one, using the unit of the code type
            # (the column eltype `T` need not be numeric)
            col = (x - one(IT) for x in refvals)
        else
            _, de = dictencodings[nm]
            # NOTE(review): `de.indexType` is a JSON `Type` instance, while the
            # recursive call below dispatches on a Julia type -- confirm.
            IT = de.indexType
            vals = unique(col)
            col = Arrow.DictEncoder(col, vals, Arrow.encodingtype(length(vals)))
        end
        return FieldData(nm, IT, col, nothing)
    end
    S = Arrow.maybemissing(T)
    len = Arrow._length(col)
    VALIDITY = OFFSET = TYPE_ID = DATA = nothing
    children = FieldData[]
    if S <: Pair
        # re-express pairs as key/value records; the only method takes four
        # arguments, so `dictencodings` is forwarded
        return FieldData(
            nm,
            Vector{Arrow.KeyValue{Arrow._keytype(S), Arrow._valtype(S)}},
            (Arrow.KeyValue(k, v) for (k, v) in pairs(col)),
            dictencodings,
        )
    elseif S !== Missing
        # VALIDITY
        VALIDITY = Int8[!ismissing(x) for x in col]
        # OFFSET
        if S <: Vector || S == String
            lenfun = S == String ? x->ismissing(x) ? 0 : sizeof(x) : x->ismissing(x) ? 0 : length(x)
            tot = sum(lenfun, col)
            # NOTE(review): the entries written here are per-element lengths,
            # whereas `getindex` reads OFFSET as cumulative offsets -- confirm.
            if tot > 2147483647
                # too large for Int32: written as decimal strings
                OFFSET = String[string(lenfun(x)) for x in col]
                pushfirst!(OFFSET, "0")
            else
                OFFSET = Int32[lenfun(x) for x in col]
                pushfirst!(OFFSET, 0)
            end
            OFFSET = Offsets(OFFSET)
            push!(children, FieldData("item", eltype(S), Arrow.flatten(skipmissing(col)), dictencodings))
        elseif S <: NTuple
            if Arrow.ArrowTypes.gettype(Arrow.ArrowTypes.ArrowKind(S)) == UInt8
                # fixed-size binary: one string per value
                DATA = [ismissing(x) ? Arrow.ArrowTypes.default(S) : String(collect(x)) for x in col]
            else
                # fixed-size list: one flattened child column
                push!(children, FieldData("item", Arrow.ArrowTypes.gettype(Arrow.ArrowTypes.ArrowKind(S)), Arrow.flatten(coalesce(x, Arrow.ArrowTypes.default(S)) for x in col), dictencodings))
            end
        elseif S <: NamedTuple
            # struct: one child column per member
            for (nm, typ) in zip(fieldnames(S), fieldtypes(S))
                push!(children, FieldData(String(nm), typ, (getfield(x, nm) for x in col), dictencodings))
            end
        elseif S <: Arrow.UnionT
            U = eltype(S)
            tids = Arrow.typeids(S) === nothing ? (0:fieldcount(U)) : Arrow.typeids(S)
            TYPE_ID = [x === missing ? 0 : tids[Arrow.isatypeid(x, U)] for x in col]
            if Arrow.unionmode(S) == Arrow.Meta.UnionModes.Dense
                # dense union: per-member running offsets (0-based)
                offs = zeros(Int32, fieldcount(U))
                OFFSET = Int32[]
                for x in col
                    idx = x === missing ? 1 : Arrow.isatypeid(x, U)
                    push!(OFFSET, offs[idx])
                    offs[idx] += 1
                end
                for i = 1:fieldcount(U)
                    SS = fieldtype(U, i)
                    # missing values are stored with the first member
                    push!(children, FieldData("$i", SS, Arrow.filtered(i == 1 ? Union{SS, Missing} : Arrow.maybemissing(SS), col), dictencodings))
                end
            else
                # sparse union: every child spans the whole column
                for i = 1:fieldcount(U)
                    SS = fieldtype(U, i)
                    push!(children, FieldData("$i", SS, Arrow.replaced(SS, col), dictencodings))
                end
            end
        elseif S <: Arrow.KeyValue
            # `KeyValue` lives in Arrow and is qualified as elsewhere in this file
            push!(children, FieldData("key", Arrow.keyvalueK(S), (x.key for x in col), dictencodings))
            push!(children, FieldData("value", Arrow.keyvalueV(S), (x.value for x in col), dictencodings))
        end
    end
    return FieldData(nm, len, VALIDITY, OFFSET, TYPE_ID, DATA, children)
end

# One record batch: the row count and one `FieldData` per column.
mutable struct RecordBatch
    count::Int64
    columns::Vector{FieldData}
end

# Zero-argument default: zero rows, no columns.
function RecordBatch()
    return RecordBatch(0, FieldData[])
end

function StructTypes.StructType(::Base.Type{RecordBatch})
    return StructTypes.Mutable()
end

# A dictionary's values, stored as a record batch and identified by `id`.
mutable struct DictionaryBatch
    id::Int64
    data::RecordBatch
end

# Zero-argument default: id 0 with an empty record batch.
function DictionaryBatch()
    return DictionaryBatch(0, RecordBatch())
end

function StructTypes.StructType(::Base.Type{DictionaryBatch})
    return StructTypes.Mutable()
end

# A whole Arrow JSON data file: schema, record batches and dictionary batches.
# Subtyping `Tables.AbstractColumns` lets it act as a column-accessible table.
mutable struct DataFile <: Tables.AbstractColumns
    schema::Schema
    batches::Vector{RecordBatch}
    dictionaries::Vector{DictionaryBatch}
end

# Report the struct's own fields as its property names.
function Base.propertynames(x::DataFile)
    return (:schema, :batches, :dictionaries)
end

# Property access: the three struct fields are returned directly; any other
# name is looked up as a table column.
function Base.getproperty(df::DataFile, nm::Symbol)
    if nm === :schema || nm === :batches || nm === :dictionaries
        return getfield(df, nm)
    end
    return Tables.getcolumn(df, nm)
end

# Zero-argument default: empty schema, no batches, no dictionaries.
function DataFile()
    return DataFile(Schema(), RecordBatch[], DictionaryBatch[])
end

function StructTypes.StructType(::Base.Type{DataFile})
    return StructTypes.Mutable()
end

# Memory-map `file` and parse its JSON contents into a `DataFile`.
function parsefile(file)
    bytes = Mmap.mmap(file)
    return JSON3.read(bytes, DataFile)
end

# make DataFile satisfy Tables.jl interface
# Partition a `DataFile` into one single-batch `DataFile` per record batch;
# every partition shares the schema and dictionaries.
function Tables.partitions(x::DataFile)
    # With no batches at all, still yield one (empty) DataFile so the schema
    # remains available.
    isempty(x.batches) && return (DataFile(x.schema, RecordBatch[], x.dictionaries),)
    return (DataFile(x.schema, [batch], x.dictionaries) for batch in x.batches)
end

# A `DataFile` is its own column-accessible object.
function Tables.columns(x::DataFile)
    return x
end

# Tables.jl schema: field names paired with the Julia type of each field.
function Tables.schema(x::DataFile)
    fields = x.schema.fields
    names = map(f -> f.name, fields)
    types = map(juliatype, fields)
    return Tables.Schema(names, types)
end

# Column names as `Symbol`s, in schema order.
function Tables.columnnames(x::DataFile)
    return map(f -> Symbol(f.name), x.schema.fields)
end

# Column `i` as a single vector: one `ArrowArray` per record batch, chained
# together in batch order.
function Tables.getcolumn(x::DataFile, i::Base.Int)
    field = x.schema.fields[i]
    type = juliatype(field)
    chunks = ArrowArray{type}[]
    for j = 1:length(x.batches)
        batchcolumn = x.batches[j].columns[i]
        push!(chunks, ArrowArray{type}(field, batchcolumn, x.dictionaries))
    end
    return ChainedVector(chunks)
end

# Column lookup by name: find the first schema field called `nm` and delegate
# to the index-based method.
function Tables.getcolumn(x::DataFile, nm::Symbol)
    target = String(nm)
    i = findfirst(f -> f.name == target, x.schema.fields)
    return Tables.getcolumn(x, i)
end

# Vector view over one column of one record batch. `T` is the Julia element
# type; values are decoded from `fielddata` on access.
struct ArrowArray{T} <: AbstractVector{T}
    field::Field
    fielddata::FieldData
    dictionaries::Vector{DictionaryBatch}
end

# Convenience constructor: derive the element type from the field.
function ArrowArray(f::Field, fd::FieldData, d)
    return ArrowArray{juliatype(f)}(f, fd, d)
end

# Length is the element count recorded in the field data.
function Base.size(x::ArrowArray)
    return (x.fielddata.count,)
end

# Decode element `i` of the column into a Julia value of type `T`.
#
# The decoding path is chosen from the field's dictionary encoding first, then
# from the non-missing element type `S`. Invalid (VALIDITY == 0) slots of
# non-union, non-dictionary columns decode to `missing`.
function Base.getindex(x::ArrowArray{T}, i::Base.Int) where {T}
    @boundscheck checkbounds(x, i)
    S = Base.nonmissingtype(T)
    if x.field.dictionary !== nothing
        # Dictionary-encoded: DATA holds 0-based indices into the first column
        # of the dictionary batch whose id matches the field's dictionary.
        fielddata = x.dictionaries[findfirst(y -> y.id == x.field.dictionary.id, x.dictionaries)].data.columns[1]
        field = copy(x.field)
        field.dictionary = nothing
        idx = x.fielddata.DATA[i] + 1
        return ArrowArray(field, fielddata, x.dictionaries)[idx]
    end
    if T === Missing
        return missing
    elseif S <: Arrow.UnionT
        # `S` is the Julia-side `Arrow.UnionT`, as built by `juliatype`; the
        # JSON `UnionT` struct of this module would never match here.
        U = eltype(S)
        tids = Arrow.typeids(S) === nothing ? (0:fieldcount(U)) : Arrow.typeids(S)
        # NOTE(review): TYPE_ID entries are used both as an index into `tids`
        # and (plus one) as a child index -- confirm the intended base.
        typeid = tids[x.fielddata.TYPE_ID[i]]
        # Enum member spelled `Dense`, matching its other uses in this file.
        if Arrow.unionmode(S) == Arrow.Meta.UnionModes.Dense
            # NOTE(review): `off` is used as-is as a 1-based index -- confirm.
            off = x.fielddata.OFFSET[i]
            return ArrowArray(x.field.children[typeid+1], x.fielddata.children[typeid+1], x.dictionaries)[off]
        else
            return ArrowArray(x.field.children[typeid+1], x.fielddata.children[typeid+1], x.dictionaries)[i]
        end
    end
    x.fielddata.VALIDITY[i] == 0 && return missing
    if S <: Vector{UInt8}
        # binary: bytes of the stored string
        return copy(codeunits(x.fielddata.DATA[i]))
    elseif S <: String
        return x.fielddata.DATA[i]
    elseif S <: Vector
        # list: slice of the child column delimited by consecutive offsets
        offs = x.fielddata.OFFSET
        A = ArrowArray{eltype(S)}(x.field.children[1], x.fielddata.children[1], x.dictionaries)
        return A[(offs[i] + 1):offs[i + 1]]
    elseif S <: Dict
        # map: same slicing as a list, then collect key/value entries
        offs = x.fielddata.OFFSET
        A = ArrowArray(x.field.children[1], x.fielddata.children[1], x.dictionaries)
        return Dict(y.key => y.value for y in A[(offs[i] + 1):offs[i + 1]])
    elseif S <: Tuple
        if Arrow.ArrowTypes.gettype(Arrow.ArrowTypes.ArrowKind(S)) == UInt8
            # fixed-size binary: first `byteWidth` characters as bytes
            A = x.fielddata.DATA
            return Tuple(map(UInt8, collect(A[i][1:x.field.type.byteWidth])))
        else
            # fixed-size list: `listSize` consecutive child elements
            sz = x.field.type.listSize
            A = ArrowArray{Arrow.ArrowTypes.gettype(Arrow.ArrowTypes.ArrowKind(S))}(x.field.children[1], x.fielddata.children[1], x.dictionaries)
            off = (i - 1) * sz + 1
            return Tuple(A[off:(off + sz - 1)])
        end
    elseif S <: NamedTuple
        # struct: element `i` of every child column
        data = (ArrowArray(x.field.children[j], x.fielddata.children[j], x.dictionaries)[i] for j = 1:length(x.field.children))
        return NamedTuple{fieldnames(S)}(Tuple(data))
    elseif S == Int64 || S == UInt64
        # 64-bit integers are parsed from their stored string form
        return parse(S, x.fielddata.DATA[i])
    elseif S <: Arrow.Decimal
        str = x.fielddata.DATA[i]
        return S(parse(Int128, str))
    elseif S <: Arrow.Date || S <: Arrow.Time
        # Int32-backed values are used directly; others are parsed as Int64
        val = x.fielddata.DATA[i]
        return Arrow.storagetype(S) == Int32 ? S(val) : S(parse(Int64, val))
    elseif S <: Arrow.Timestamp
        return S(parse(Int64, x.fielddata.DATA[i]))
    else
        return S(x.fielddata.DATA[i])
    end
end

# take any Tables.jl source and write out arrow json datafile
# Build an Arrow JSON `DataFile` from any Tables.jl source.
#
# Each partition of `source` becomes one record batch. The schema fields and
# dictionary encodings are derived from the first partition only.
function DataFile(source)
    fields = Field[]
    metadata = nothing # TODO?
    batches = RecordBatch[]
    dictionaries = DictionaryBatch[]
    # column name => (column eltype, dictionary encoding)
    dictencodings = Dict{String, Tuple{Base.Type, DictEncoding}}()
    dictid = Ref(0)
    # `enumerate` supplies the partition index `i`
    for (i, tbl1) in enumerate(Tables.partitions(source))
        tbl = Arrow.toarrowtable(Tables.Columns(tbl1))
        # Computed for every partition: variables assigned inside a `for` body
        # are local to each iteration, so a schema assigned only when `i == 1`
        # would be undefined for later partitions.
        sch = Tables.schema(tbl)
        if i == 1
            for (nm, T, col) in zip(sch.names, sch.types, tbl)
                if col isa Arrow.DictEncode
                    id = dictid[]
                    dictid[] += 1
                    # `DataAPI` is not imported by this module; reach it through Arrow.
                    codes = Arrow.DataAPI.refarray(col.data)
                    if codes !== col.data
                        IT = Type(eltype(codes))
                    else
                        IT = Type(Arrow.encodingtype(length(unique(col))))
                    end
                    dictencodings[String(nm)] = (T, DictEncoding(id, IT, false))
                end
                push!(fields, Field(String(nm), T, dictencodings))
            end
        end
        # build record batch
        len = Tables.rowcount(tbl)
        columns = FieldData[]
        for (nm, T, col) in zip(sch.names, sch.types, tbl)
            push!(columns, FieldData(String(nm), T, col, dictencodings))
        end
        push!(batches, RecordBatch(len, columns))
        # build dictionaries
        # NOTE(review): `nm` is a String key here and a dictionary batch is
        # appended for every partition -- confirm both are intended.
        for (nm, (T, dictencoding)) in dictencodings
            column = FieldData(nm, T, Tables.getcolumn(tbl, nm), nothing)
            recordbatch = RecordBatch(len, [column])
            push!(dictionaries, DictionaryBatch(dictencoding.id, recordbatch))
        end
    end
    schema = Schema(fields, metadata)
    return DataFile(schema, batches, dictionaries)
end

# Compare a JSON `DataFile` with an `Arrow.Table`: the Tables.jl schemas must
# match and every pair of columns must be `isequal`. On the first mismatching
# column its 1-based position is printed via `@show` before returning `false`.
function Base.isequal(df::DataFile, tbl::Arrow.Table)
    if Tables.schema(df) != Tables.schema(tbl)
        return false
    end
    for (i, (col1, col2)) in enumerate(zip(Tables.Columns(df), Tables.Columns(tbl)))
        if !isequal(col1, col2)
            @show i
            return false
        end
    end
    return true
end

end
