# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
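# a chunk of arrow IPC data: the backing bytes plus the position and length of the
# region within `bytes` to read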
struct ArrowBlob
bytes::Vector{UInt8}
pos::Int
len::Int
end
ArrowBlob(bytes::Vector{UInt8}, pos::Integer, len::Nothing) = ArrowBlob(bytes, Int(pos), length(bytes))
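# normalize supported inputs to a Vector{UInt8}: a generic IO is read fully into
# memory, an IOStream (an open file) is memory-mapped, and a file path is opened
# then memory-mapped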
tobytes(bytes::Vector{UInt8}) = bytes
tobytes(io::IO) = Base.read(io)
tobytes(io::IOStream) = Mmap.mmap(io)
tobytes(file_path) = open(tobytes, file_path, "r")
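# iterates the arrow IPC messages contained in a byte blob; for the arrow file
# format, skips the leading magic bytes + padding before the first message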
struct BatchIterator
bytes::Vector{UInt8}
startpos::Int
function BatchIterator(blob::ArrowBlob)
bytes, pos, len = blob.bytes, blob.pos, blob.len
if len > 24 && _startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES)
pos += 8 # skip past magic bytes + padding
end
new(bytes, pos)
end
end
"""
Arrow.Stream(io::IO; convert::Bool=true)
Arrow.Stream(file::String; convert::Bool=true)
Arrow.Stream(bytes::Vector{UInt8}, pos=1, len=nothing; convert::Bool=true)
Arrow.Stream(inputs::Vector; convert::Bool=true)
Start reading an arrow formatted table, from:
* `io`, bytes will be read all at once via `read(io)`
* `file`, bytes will be read via `Mmap.mmap(file)`
* `bytes`, a byte vector directly, optionally allowing specifying the starting byte position `pos` and `len`
* A `Vector` of any of the above, in which each input should be an IPC or arrow file and must match schema
Reads the initial schema message from the arrow stream/file, then returns an `Arrow.Stream` object
which will iterate over record batch messages, producing an [`Arrow.Table`](@ref) on each iteration.
By iterating [`Arrow.Table`](@ref), `Arrow.Stream` satisfies the `Tables.partitions` interface, and as such can
be passed to Tables.jl-compatible sink functions.
This allows iterating over extremely large "arrow tables" in chunks represented as record batches.
Supports the `convert` keyword argument which controls whether certain arrow primitive types will be
lazily converted to more friendly Julia defaults; by default, `convert=true`.
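
# Examples

A minimal sketch of chunked iteration (the file name is illustrative):

```julia
using Arrow, Tables

for tbl in Arrow.Stream("data.arrow")
    # each iteration yields an Arrow.Table for one record batch
    @show Tables.schema(tbl)
end
```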
"""
mutable struct Stream
inputs::Vector{ArrowBlob}
inputindex::Int
batchiterator::Union{Nothing,BatchIterator}
names::Vector{Symbol}
types::Vector{Type}
schema::Union{Nothing,Meta.Schema}
dictencodings::Dict{Int64, DictEncoding} # dictionary id => DictEncoding
dictencoded::Dict{Int64, Meta.Field} # dictionary id => field
convert::Bool
compression::Ref{Union{Symbol,Nothing}}
end
function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
inputindex = 1
batchiterator = nothing
names = Symbol[]
types = Type[]
schema = nothing
dictencodings = Dict{Int64, DictEncoding}()
dictencoded = Dict{Int64, Meta.Field}()
compression = Ref{Union{Symbol,Nothing}}(nothing)
Stream(inputs, inputindex, batchiterator, names, types, schema, dictencodings, dictencoded, convert, compression)
end
Stream(input, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
Stream(inputs::Vector; kw...) = Stream([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
function initialize!(x::Stream)
isempty(getfield(x, :names)) || return
# populate names/types/schema by performing one iteration, then restore the pre-iteration state
lastinputindex = x.inputindex
lastbatchiterator = x.batchiterator
iterate(x)
x.inputindex = lastinputindex
x.batchiterator = lastbatchiterator
nothing
end
Tables.partitions(x::Stream) = x
function Tables.columnnames(x::Stream)
initialize!(x)
getfield(x, :names)
end
function Tables.schema(x::Stream)
initialize!(x)
Tables.Schema(Tables.columnnames(x), getfield(x, :types))
end
Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
Base.eltype(::Type{Stream}) = Table
function Base.iterate(x::Stream, (pos, id)=(1, 0))
if isnothing(x.batchiterator)
blob = x.inputs[x.inputindex]
x.batchiterator = BatchIterator(blob)
pos = x.batchiterator.startpos
end
columns = AbstractVector[]
compression = nothing
while true
state = iterate(x.batchiterator, (pos, id))
# check for additional inputs
while state === nothing
x.inputindex += 1
x.inputindex > length(x.inputs) && return nothing
blob = x.inputs[x.inputindex]
x.batchiterator = BatchIterator(blob)
pos = x.batchiterator.startpos
state = iterate(x.batchiterator, (pos, id))
end
batch, (pos, id) = state
header = batch.msg.header
if isnothing(x.schema) && !isa(header, Meta.Schema)
throw(ArgumentError("first arrow ipc message MUST be a schema message"))
end
if header isa Meta.Schema
if isnothing(x.schema)
x.schema = header
# assert endianness?
# store custom_metadata?
for (i, field) in enumerate(x.schema.fields)
push!(x.names, Symbol(field.name))
push!(x.types, juliaeltype(field, buildmetadata(field.custom_metadata), x.convert))
# recursively find any dictionaries for any fields
getdictionaries!(x.dictencoded, field)
@debugv 1 "parsed column from schema: field = $field"
end
elseif header != x.schema
throw(ArgumentError("mismatched schemas between different arrow batches: $(x.schema) != $header"))
end
elseif header isa Meta.DictionaryBatch
id = header.id
recordbatch = header.data
@debugv 1 "parsing dictionary batch message: id = $id, compression = $(recordbatch.compression)"
if recordbatch.compression !== nothing
compression = recordbatch.compression
end
if haskey(x.dictencodings, id) && header.isDelta
# delta
field = x.dictencoded[id]
values, _, _ = build(field, field.type, batch, recordbatch, x.dictencodings, Int64(1), Int64(1), x.convert)
dictencoding = x.dictencodings[id]
append!(dictencoding.data, values)
continue
end
# new dictencoding or replace
field = x.dictencoded[id]
values, _, _ = build(field, field.type, batch, recordbatch, x.dictencodings, Int64(1), Int64(1), x.convert)
A = ChainedVector([values])
S = field.dictionary.indexType === nothing ? Int32 : juliaeltype(field, field.dictionary.indexType, false)
x.dictencodings[id] = DictEncoding{eltype(A), S, typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
@debugv 1 "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
@debugv 1 "parsing record batch message: compression = $(header.compression)"
if header.compression !== nothing
compression = header.compression
end
for vec in VectorIterator(x.schema, batch, x.dictencodings, x.convert)
push!(columns, vec)
end
break
else
throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
end
end
if compression !== nothing
if compression.codec == Flatbuf.CompressionType.ZSTD
x.compression[] = :zstd
elseif compression.codec == Flatbuf.CompressionType.LZ4_FRAME
x.compression[] = :lz4
else
throw(ArgumentError("unsupported compression codec: $(compression.codec)"))
end
end
lookup = Dict{Symbol, AbstractVector}()
types = Type[]
for (nm, col) in zip(x.names, columns)
lookup[nm] = col
push!(types, eltype(col))
end
return Table(x.names, types, columns, lookup, Ref(x.schema)), (pos, id)
end
"""
Arrow.Table(io::IO; convert::Bool=true)
Arrow.Table(file::String; convert::Bool=true)
Arrow.Table(bytes::Vector{UInt8}, pos=1, len=nothing; convert::Bool=true)
Arrow.Table(inputs::Vector; convert::Bool=true)
Read an arrow formatted table, from:
* `io`, bytes will be read all at once via `read(io)`
* `file`, bytes will be read via `Mmap.mmap(file)`
* `bytes`, a byte vector directly, optionally allowing specifying the starting byte position `pos` and `len`
* A `Vector` of any of the above, in which each input should be an IPC or arrow file and must match schema
Returns a `Arrow.Table` object that allows column access via `table.col1`, `table[:col1]`, or `table[1]`.
NOTE: the columns in an `Arrow.Table` are views into the original arrow memory, and hence are not easily
modifiable (with e.g. `push!`, `append!`, etc.). To mutate arrow columns, call `copy(x)` to materialize
the arrow data as a normal Julia array.
`Arrow.Table` also satisfies the [Tables.jl](https://github.com/JuliaData/Tables.jl) interface, and so can easily be materialied via any supporting
sink function: e.g. `DataFrame(Arrow.Table(file))`, `SQLite.load!(db, "table", Arrow.Table(file))`, etc.
Supports the `convert` keyword argument which controls whether certain arrow primitive types will be
lazily converted to more friendly Julia defaults; by default, `convert=true`.
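
# Examples

A minimal sketch (the file name and column name are illustrative):

```julia
using Arrow

tbl = Arrow.Table("data.arrow")
tbl.col1        # a column view into the arrow memory
copy(tbl.col1)  # materialize as a normal, mutable Julia array
```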
"""
struct Table <: Tables.AbstractColumns
names::Vector{Symbol}
types::Vector{Type}
columns::Vector{AbstractVector}
lookup::Dict{Symbol, AbstractVector}
schema::Ref{Meta.Schema}
metadata::Ref{Union{Nothing,Base.ImmutableDict{String,String}}}
end
Table() = Table(Symbol[], Type[], AbstractVector[], Dict{Symbol, AbstractVector}(), Ref{Meta.Schema}(), Ref{Union{Nothing,Base.ImmutableDict{String,String}}}(nothing))
function Table(names, types, columns, lookup, schema)
m = isassigned(schema) ? buildmetadata(schema[]) : nothing
return Table(names, types, columns, lookup, schema, Ref{Union{Nothing,Base.ImmutableDict{String,String}}}(m))
end
names(t::Table) = getfield(t, :names)
types(t::Table) = getfield(t, :types)
columns(t::Table) = getfield(t, :columns)
lookup(t::Table) = getfield(t, :lookup)
schema(t::Table) = getfield(t, :schema)
metadata(t::Table) = getfield(t, :metadata)
"""
Arrow.getmetadata(x)
If `x isa Arrow.Table` return a `Base.ImmutableDict{String,String}` representation of `x`'s
`Schema` `custom_metadata`, or `nothing` if no such metadata exists.
If `x isa Arrow.ArrowVector`, return a `Base.ImmutableDict{String,String}` representation of `x`'s
`Field` `custom_metadata`, or `nothing` if no such metadata exists.
Otherwise, return `nothing`.
See [the official Arrow documentation for more details on custom application metadata](https://arrow.apache.org/docs/format/Columnar.html#custom-application-metadata).
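
# Examples

A sketch of inspecting table-level metadata (assuming `tbl` is an `Arrow.Table`):

```julia
meta = Arrow.getmetadata(tbl)
if meta !== nothing
    for (k, v) in meta
        println(k, " => ", v)
    end
end
```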
"""
getmetadata(t::Table) = getfield(t, :metadata)[]
getmetadata(::Any) = nothing
Tables.istable(::Table) = true
Tables.columnaccess(::Table) = true
Tables.columns(t::Table) = Tables.CopiedColumns(t)
Tables.schema(t::Table) = Tables.Schema(names(t), types(t))
Tables.columnnames(t::Table) = names(t)
Tables.getcolumn(t::Table, i::Int) = columns(t)[i]
Tables.getcolumn(t::Table, nm::Symbol) = lookup(t)[nm]
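# exposes each record batch of a Table as its own Table partition; multi-batch tables
# store each column as a ChainedVector with one chunk per record batch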
struct TablePartitions
table::Table
npartitions::Int
end
function TablePartitions(table::Table)
cols = columns(table)
npartitions = if length(cols) == 0
0
elseif cols[1] isa ChainedVector
length(cols[1].arrays)
else
1
end
return TablePartitions(table, npartitions)
end
function Base.iterate(tp::TablePartitions, i=1)
i > tp.npartitions && return nothing
tp.npartitions == 1 && return tp.table, i + 1
cols = columns(tp.table)
newcols = AbstractVector[cols[j].arrays[i] for j in 1:length(cols)]
nms = names(tp.table)
tbl = Table(
nms,
types(tp.table),
newcols,
Dict{Symbol, AbstractVector}(nms[j] => newcols[j] for j in 1:length(nms)),
schema(tp.table)
)
return tbl, i + 1
end
Tables.partitions(t::Table) = TablePartitions(t)
# high-level user API functions
Table(input, pos::Integer=1, len=nothing; kw...) = Table([ArrowBlob(tobytes(input), pos, len)]; kw...)
Table(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Table([ArrowBlob(tobytes(input), pos, len)]; kw...)
Table(inputs::Vector; kw...) = Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
# will detect whether we're reading a Table from a file or stream
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
t = Table()
sch = nothing
dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding
dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field
sync = OrderedSynchronizer()
tsks = Channel{Any}(Inf)
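# consumer task: merges per-record-batch columns in batch order (enforced by `sync`);
# the first batch's columns are stored directly, the second promotes each column to a
# ChainedVector, and later batches are appended as additional chunks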
tsk = Threads.@spawn begin
i = 1
for cols in tsks
if i == 1
foreach(x -> push!(columns(t), x), cols)
elseif i == 2
foreach(1:length(cols)) do i
columns(t)[i] = ChainedVector([columns(t)[i], cols[i]])
end
else
foreach(1:length(cols)) do i
append!(columns(t)[i], cols[i])
end
end
i += 1
end
end
anyrecordbatches = false
rbi = 1
@sync for blob in blobs
for batch in BatchIterator(blob)
# store custom_metadata of batch.msg?
header = batch.msg.header
if header isa Meta.Schema
@debugv 1 "parsing schema message"
# assert endianness?
# store custom_metadata?
if sch === nothing
for (i, field) in enumerate(header.fields)
push!(names(t), Symbol(field.name))
# recursively find any dictionaries for any fields
getdictionaries!(dictencoded, field)
@debugv 1 "parsed column from schema: field = $field"
end
sch = header
schema(t)[] = sch
elseif sch != header
throw(ArgumentError("mismatched schemas between different arrow batches: $sch != $header"))
end
elseif header isa Meta.DictionaryBatch
id = header.id
recordbatch = header.data
@debugv 1 "parsing dictionary batch message: id = $id, compression = $(recordbatch.compression)"
if haskey(dictencodings, id) && header.isDelta
# delta
field = dictencoded[id]
values, _, _ = build(field, field.type, batch, recordbatch, dictencodings, Int64(1), Int64(1), convert)
dictencoding = dictencodings[id]
if typeof(dictencoding.data) <: ChainedVector
append!(dictencoding.data, values)
else
A = ChainedVector([dictencoding.data, values])
S = field.dictionary.indexType === nothing ? Int32 : juliaeltype(field, field.dictionary.indexType, false)
dictencodings[id] = DictEncoding{eltype(A), S, typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
end
continue
end
# new dictencoding or replace
field = dictencoded[id]
values, _, _ = build(field, field.type, batch, recordbatch, dictencodings, Int64(1), Int64(1), convert)
A = values
S = field.dictionary.indexType === nothing ? Int32 : juliaeltype(field, field.dictionary.indexType, false)
dictencodings[id] = DictEncoding{eltype(A), S, typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
@debugv 1 "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
anyrecordbatches = true
@debugv 1 "parsing record batch message: compression = $(header.compression)"
Threads.@spawn begin
cols = collect(VectorIterator(sch, $batch, dictencodings, convert))
put!(() -> put!(tsks, cols), sync, $(rbi))
end
rbi += 1
else
throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
end
end
end
close(tsks)
wait(tsk)
lu = lookup(t)
ty = types(t)
# see issue #158: some implementations may send 0 record batches
if !anyrecordbatches && !isnothing(sch)
for field in sch.fields
T = juliaeltype(field, buildmetadata(field), convert)
push!(columns(t), T[])
end
end
for (nm, col) in zip(names(t), columns(t))
lu[nm] = col
push!(ty, eltype(col))
end
getfield(t, :metadata)[] = buildmetadata(sch)
return t
end
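# recursively record, keyed by dictionary id, every field (including nested children)
# that is dictionary-encoded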
function getdictionaries!(dictencoded, field)
d = field.dictionary
if d !== nothing
dictencoded[d.id] = field
end
if field.children !== nothing
for child in field.children
getdictionaries!(dictencoded, child)
end
end
return
end
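# a single arrow IPC message: the parsed flatbuffer metadata plus the backing bytes,
# with `pos` pointing at the start of the message body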
struct Batch
msg::Meta.Message
bytes::Vector{UInt8}
pos::Int
id::Int
end
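# each encapsulated IPC message is: a 4-byte continuation indicator, a 4-byte message
# metadata length (0 terminates the stream), the flatbuffer message metadata, then
# `bodyLength` bytes of message body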
function Base.iterate(x::BatchIterator, (pos, id)=(x.startpos, 0))
@debugv 1 "checking for next arrow message: pos = $pos"
if pos + 3 > length(x.bytes)
@debugv 1 "not enough bytes left for another batch message"
return nothing
end
if readbuffer(x.bytes, pos, UInt32) != CONTINUATION_INDICATOR_BYTES
@debugv 1 "didn't find continuation byte to keep parsing messages: $(readbuffer(x.bytes, pos, UInt32))"
return nothing
end
pos += 4
if pos + 3 > length(x.bytes)
@debugv 1 "not enough bytes left to read length of another batch message"
return nothing
end
msglen = readbuffer(x.bytes, pos, Int32)
if msglen == 0
@debugv 1 "message has 0 length; terminating message parsing"
return nothing
end
pos += 4
if pos + msglen - 1 > length(x.bytes)
@debugv 1 "not enough bytes left to read Meta.Message"
return nothing
end
msg = FlatBuffers.getrootas(Meta.Message, x.bytes, pos-1)
pos += msglen
# pos now points to message body
@debugv 1 "parsing message: pos = $pos, msglen = $msglen, bodyLength = $(msg.bodyLength)"
if pos + msg.bodyLength - 1 > length(x.bytes)
@debugv 1 "not enough bytes left to read message body"
return nothing
end
return Batch(msg, x.bytes, pos, id), (pos + msg.bodyLength, id + 1)
end
struct VectorIterator
schema::Meta.Schema
batch::Batch # batch.msg.header MUST BE RecordBatch
dictencodings::Dict{Int64, DictEncoding}
convert::Bool
end
buildmetadata(f::Union{Meta.Field,Meta.Schema}) = buildmetadata(f.custom_metadata)
buildmetadata(meta) = toidict(String(kv.key) => String(kv.value) for kv in meta)
buildmetadata(::Nothing) = nothing
buildmetadata(x::AbstractDict) = x
function Base.iterate(x::VectorIterator, (columnidx, nodeidx, bufferidx)=(Int64(1), Int64(1), Int64(1)))
columnidx > length(x.schema.fields) && return nothing
field = x.schema.fields[columnidx]
@debugv 2 "building top-level column: field = $(field), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
A, nodeidx, bufferidx = build(field, x.batch, x.batch.msg.header, x.dictencodings, nodeidx, bufferidx, x.convert)
@debugv 2 "built top-level column: A = $(typeof(A)), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
@debugv 3 A
return A, (columnidx + 1, nodeidx, bufferidx)
end
Base.length(x::VectorIterator) = length(x.schema.fields)
const ListTypes = Union{Meta.Utf8, Meta.LargeUtf8, Meta.Binary, Meta.LargeBinary, Meta.List, Meta.LargeList}
const LargeLists = Union{Meta.LargeUtf8, Meta.LargeBinary, Meta.LargeList}
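# a RecordBatch message carries flattened, depth-first lists of field nodes and buffers;
# each `build` method consumes its own entries and returns the advanced nodeidx/bufferidx,
# recursing into child fields for nested types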
function build(field::Meta.Field, batch, rb, de, nodeidx, bufferidx, convert)
d = field.dictionary
if d !== nothing
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
buffer = rb.buffers[bufferidx]
S = d.indexType === nothing ? Int32 : juliaeltype(field, d.indexType, false)
bytes, indices = reinterp(S, batch, buffer, rb.compression)
encoding = de[d.id]
A = DictEncoded(bytes, validity, indices, encoding, buildmetadata(field.custom_metadata))
nodeidx += 1
bufferidx += 1
else
A, nodeidx, bufferidx = build(field, field.type, batch, rb, de, nodeidx, bufferidx, convert)
end
return A, nodeidx, bufferidx
end
function buildbitmap(batch, rb, nodeidx, bufferidx)
buffer = rb.buffers[bufferidx]
voff = batch.pos + buffer.offset
node = rb.nodes[nodeidx]
if rb.compression === nothing
return ValidityBitmap(batch.bytes, voff, node.length, node.null_count)
else
# compressed
ptr = pointer(batch.bytes, voff)
_, decodedbytes = uncompress(ptr, buffer, rb.compression)
return ValidityBitmap(decodedbytes, 1, node.length, node.null_count)
end
end
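# decompress a single arrow buffer; per the arrow format, a compressed buffer body is
# prefixed with its uncompressed length as an Int64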
function uncompress(ptr::Ptr{UInt8}, buffer, compression)
if buffer.length == 0
return 0, UInt8[]
end
len = unsafe_load(convert(Ptr{Int64}, ptr))
ptr += 8 # skip past uncompressed length as Int64
encodedbytes = unsafe_wrap(Array, ptr, buffer.length - 8)
if len == -1
# len = -1 means data is not compressed
# it's unclear why other language implementations allow this,
# but we support it so that such data can still be read
return length(encodedbytes), copy(encodedbytes)
end
decodedbytes = Vector{UInt8}(undef, len)
if compression.codec === Meta.CompressionType.LZ4_FRAME
transcode(LZ4FrameDecompressor, encodedbytes, decodedbytes)
elseif compression.codec === Meta.CompressionType.ZSTD
transcode(ZstdDecompressor, encodedbytes, decodedbytes)
else
error("unsupported compression type when reading arrow buffers: $(typeof(compression.codec))")
end
return len, decodedbytes
end
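# reinterpret a (possibly compressed) arrow buffer as a Vector{T}; returns both the
# backing byte vector (so callers can keep it rooted) and the typed array over it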
function reinterp(::Type{T}, batch, buf, compression) where {T}
ptr = pointer(batch.bytes, batch.pos + buf.offset)
bytes = batch.bytes
len = buf.length
if compression !== nothing
len, bytes = uncompress(ptr, buf, compression)
ptr = pointer(bytes)
end
# it would be technically more correct to check that T.layout->alignment > 8
# but the datatype alignment isn't officially exported, so we're using
# primitive types w/ sizeof(T) >= 16 as a proxy for types that need 16-byte alignment
if sizeof(T) >= 16 && (UInt(ptr) & 15) != 0
# https://github.com/apache/arrow-julia/issues/345
# https://github.com/JuliaLang/julia/issues/42326
# need to ensure that the data/pointers are aligned to 16 bytes
# so we can't use unsafe_wrap here, but do an extra allocation
# to avoid the allocation, user needs to ensure input buffer is
# 16-byte aligned (somehow, it's not super straightforward how to ensure that)
A = Vector{T}(undef, div(len, sizeof(T)))
unsafe_copyto!(Ptr{UInt8}(pointer(A)), ptr, len)
return bytes, A
else
return bytes, unsafe_wrap(Array, convert(Ptr{T}, ptr), div(len, sizeof(T)))
end
end
function build(f::Meta.Field, L::ListTypes, batch, rb, de, nodeidx, bufferidx, convert)
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
buffer = rb.buffers[bufferidx]
OT = L isa LargeLists ? Int64 : Int32
bytes, offs = reinterp(OT, batch, buffer, rb.compression)
offsets = Offsets(bytes, offs)
bufferidx += 1
len = rb.nodes[nodeidx].length
nodeidx += 1
if L isa Meta.Utf8 || L isa Meta.LargeUtf8 || L isa Meta.Binary || L isa Meta.LargeBinary
buffer = rb.buffers[bufferidx]
bytes, A = reinterp(UInt8, batch, buffer, rb.compression)
bufferidx += 1
else
bytes = UInt8[]
A, nodeidx, bufferidx = build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
end
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return List{T, OT, typeof(A)}(bytes, validity, offsets, A, len, meta), nodeidx, bufferidx
end
function build(f::Meta.Field, L::Union{Meta.FixedSizeBinary, Meta.FixedSizeList}, batch, rb, de, nodeidx, bufferidx, convert)
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
len = rb.nodes[nodeidx].length
nodeidx += 1
if L isa Meta.FixedSizeBinary
buffer = rb.buffers[bufferidx]
bytes, A = reinterp(UInt8, batch, buffer, rb.compression)
bufferidx += 1
else
bytes = UInt8[]
A, nodeidx, bufferidx = build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
end
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return FixedSizeList{T, typeof(A)}(bytes, validity, A, len, meta), nodeidx, bufferidx
end
function build(f::Meta.Field, L::Meta.Map, batch, rb, de, nodeidx, bufferidx, convert)
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
buffer = rb.buffers[bufferidx]
OT = Int32
bytes, offs = reinterp(OT, batch, buffer, rb.compression)
offsets = Offsets(bytes, offs)
bufferidx += 1
len = rb.nodes[nodeidx].length
nodeidx += 1
A, nodeidx, bufferidx = build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return Map{T, OT, typeof(A)}(validity, offsets, A, len, meta), nodeidx, bufferidx
end
function build(f::Meta.Field, L::Meta.Struct, batch, rb, de, nodeidx, bufferidx, convert)
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
len = rb.nodes[nodeidx].length
vecs = []
nodeidx += 1
for child in f.children
A, nodeidx, bufferidx = build(child, batch, rb, de, nodeidx, bufferidx, convert)
push!(vecs, A)
end
data = Tuple(vecs)
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return Struct{T, typeof(data)}(validity, data, len, meta), nodeidx, bufferidx
end
function build(f::Meta.Field, L::Meta.Union, batch, rb, de, nodeidx, bufferidx, convert)
@debugv 2 "building array: L = $L"
buffer = rb.buffers[bufferidx]
bytes, typeIds = reinterp(UInt8, batch, buffer, rb.compression)
bufferidx += 1
if L.mode == Meta.UnionMode.Dense
buffer = rb.buffers[bufferidx]
bytes2, offsets = reinterp(Int32, batch, buffer, rb.compression)
bufferidx += 1
end
vecs = []
nodeidx += 1
for child in f.children
A, nodeidx, bufferidx = build(child, batch, rb, de, nodeidx, bufferidx, convert)
push!(vecs, A)
end
data = Tuple(vecs)
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
UT = UnionT(f, convert)
if L.mode == Meta.UnionMode.Dense
B = DenseUnion{T, UT, typeof(data)}(bytes, bytes2, typeIds, offsets, data, meta)
else
B = SparseUnion{T, UT, typeof(data)}(bytes, typeIds, data, meta)
end
return B, nodeidx, bufferidx
end
function build(f::Meta.Field, L::Meta.Null, batch, rb, de, nodeidx, bufferidx, convert)
@debugv 2 "building array: L = $L"
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return NullVector{maybemissing(T)}(MissingVector(rb.nodes[nodeidx].length), meta), nodeidx + 1, bufferidx
end
# primitives
function build(f::Meta.Field, ::L, batch, rb, de, nodeidx, bufferidx, convert) where {L}
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
buffer = rb.buffers[bufferidx]
meta = buildmetadata(f.custom_metadata)
# get storage type (non-converted)
T = juliaeltype(f, nothing, false)
@debugv 2 "storage type for primitive: T = $T"
bytes, A = reinterp(Base.nonmissingtype(T), batch, buffer, rb.compression)
len = rb.nodes[nodeidx].length
T = juliaeltype(f, meta, convert)
@debugv 2 "final julia type for primitive: T = $T"
return Primitive(T, bytes, validity, A, len, meta), nodeidx + 1, bufferidx + 1
end
function build(f::Meta.Field, L::Meta.Bool, batch, rb, de, nodeidx, bufferidx, convert)
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
buffer = rb.buffers[bufferidx]
meta = buildmetadata(f.custom_metadata)
# get storage type (non-converted)
T = juliaeltype(f, nothing, false)
@debugv 2 "storage type for primitive: T = $T"
voff = batch.pos + buffer.offset
node = rb.nodes[nodeidx]
if rb.compression === nothing
# uncompressed: read the bit-packed bools directly out of the batch bytes
decodedbytes = batch.bytes
pos = voff
else
# compressed: decompress the buffer, then read from the start of the decoded bytes
ptr = pointer(batch.bytes, voff)
_, decodedbytes = uncompress(ptr, buffer, rb.compression)
pos = 1
end
len = node.length
T = juliaeltype(f, meta, convert)
return BoolVector{T}(decodedbytes, pos, validity, len, meta), nodeidx + 1, bufferidx + 1
end