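# Iterates the raw Arrow IPC messages contained in a byte buffer, starting at `startpos`.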
struct BatchIterator
bytes::Vector{UInt8}
startpos::Int
end
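# Lazily reads record batches from arrow IPC stream-formatted bytes; iterating
# a Stream yields one `Table` per record batch.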
struct Stream
batchiterator::BatchIterator
pos::Int
names::Vector{Symbol}
schema::Meta.Schema
dictencodings::Dict{Int64, DictEncoding} # dictionary id => DictEncoding
dictencoded::Dict{Int64, Meta.Field} # dictionary id => field
convert::Bool
end
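# a Stream iterates Tables (one per record batch), so it is its own partitions iterator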
Tables.partitions(x::Stream) = x
Stream(io::IO, pos::Integer=1, len=nothing; convert::Bool=true) = Stream(Base.read(io), pos, len; convert=convert)
Stream(str::String, pos::Integer=1, len=nothing; convert::Bool=true) = Stream(Mmap.mmap(str), pos, len; convert=convert)
# detect whether we're reading arrow data in the "file" format (bounded by magic bytes) or the plain IPC stream format
function Stream(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothing}=nothing; convert::Bool=true)
len = something(tlen, length(bytes))
if len > 24 &&
_startswith(bytes, off, FILE_FORMAT_MAGIC_BYTES) &&
_endswith(bytes, off + len - 1, FILE_FORMAT_MAGIC_BYTES)
off += 8 # skip past magic bytes + padding
end
dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding
dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field
batchiterator = BatchIterator(bytes, off)
state = iterate(batchiterator)
state === nothing && throw(ArgumentError("no arrow ipc messages found in provided input"))
batch, (pos, id) = state
schema = batch.msg.header
schema isa Meta.Schema || throw(ArgumentError("first arrow ipc message MUST be a schema message"))
# assert endianness?
# store custom_metadata?
names = Symbol[]
for (i, field) in enumerate(schema.fields)
push!(names, Symbol(field.name))
# recursively find any dictionaries for any fields
getdictionaries!(dictencoded, field)
@debug 1 "parsed column from schema: field = $field"
end
return Stream(batchiterator, pos, names, schema, dictencodings, dictencoded, convert)
end
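# Advance to the next record batch: process any intervening DictionaryBatch
# messages (initial, delta, or replacement) along the way, then materialize the
# RecordBatch's columns as a Table.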
function Base.iterate(x::Stream, (pos, id)=(x.pos, 1))
columns = AbstractVector[]
while true
state = iterate(x.batchiterator, (pos, id))
state === nothing && return nothing
batch, (pos, id) = state
header = batch.msg.header
        if header isa Meta.DictionaryBatch
            # use a separate name so we don't clobber the batch iterator's `id` state
            dictid = header.id
            recordbatch = header.data
            @debug 1 "parsing dictionary batch message: id = $dictid, compression = $(recordbatch.compression)"
            if haskey(x.dictencodings, dictid) && header.isDelta
                # delta: append the new values to the existing dictionary
                field = x.dictencoded[dictid]
                values, _, _ = build(field, field.type, batch, recordbatch, x.dictencodings, Int64(1), Int64(1), x.convert)
                dictencoding = x.dictencodings[dictid]
                append!(dictencoding.data, values)
                continue
            end
            # new dictencoding or replace an existing one
            field = x.dictencoded[dictid]
            values, _, _ = build(field, field.type, batch, recordbatch, x.dictencodings, Int64(1), Int64(1), x.convert)
            A = ChainedVector([values])
            x.dictencodings[dictid] = DictEncoding{eltype(A), typeof(A)}(dictid, A, field.dictionary.isOrdered)
            @debug 1 "parsed dictionary batch message: id=$dictid, data=$values\n"
elseif header isa Meta.RecordBatch
@debug 1 "parsing record batch message: compression = $(header.compression)"
for vec in VectorIterator(x.schema, batch, x.dictencodings, x.convert)
push!(columns, vec)
end
break
else
throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
end
end
lookup = Dict{Symbol, AbstractVector}()
types = Type[]
for (nm, col) in zip(x.names, columns)
lookup[nm] = col
push!(types, eltype(col))
end
return Table(x.names, types, columns, lookup, Ref(x.schema)), (pos, id)
end
"""
    Arrow.Table(io::IO; convert::Bool=true)
    Arrow.Table(file::String; convert::Bool=true)
    Arrow.Table(bytes::Vector{UInt8}, pos=1, len=nothing; convert::Bool=true)
Read an arrow formatted table from:
  * `io`, where bytes come from `read(io)`
  * `file`, where bytes come from `Mmap.mmap(file)`
  * `bytes`, a byte vector directly, optionally specifying the starting byte position `pos` and length `len`
Returns an `Arrow.Table` object that allows column access via `table.col1`, `table[:col1]`, or `table[1]`.
The `convert` keyword controls whether columns keep their raw arrow storage eltypes or are converted to more
familiar Julia types where possible. `Arrow.Table` also satisfies the Tables.jl interface, and so can easily be
materialized via any supporting sink function: e.g. `DataFrame(Arrow.Table(file))`, `SQLite.load!(db, "table", Arrow.Table(file))`, etc.
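
# Examples

A minimal usage sketch, assuming a file `data.arrow` exists and contains a column named `col1`:

```julia
tbl = Arrow.Table("data.arrow")  # bytes are mmapped from the file
tbl.col1                         # column access by property name
tbl[:col1]                       # ...equivalently by Symbol
tbl[1]                           # ...or by positional index
```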
"""
struct Table <: Tables.AbstractColumns
names::Vector{Symbol}
types::Vector{Type}
columns::Vector{AbstractVector}
lookup::Dict{Symbol, AbstractVector}
schema::Ref{Meta.Schema}
end
Table() = Table(Symbol[], Type[], AbstractVector[], Dict{Symbol, AbstractVector}(), Ref{Meta.Schema}())
names(t::Table) = getfield(t, :names)
types(t::Table) = getfield(t, :types)
columns(t::Table) = getfield(t, :columns)
lookup(t::Table) = getfield(t, :lookup)
schema(t::Table) = getfield(t, :schema)
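# Tables.jl interface; columns are wrapped in `Tables.CopiedColumns` to signal
# that sinks may take ownership of them without making defensive copies.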
Tables.istable(::Table) = true
Tables.columnaccess(::Table) = true
Tables.columns(t::Table) = Tables.CopiedColumns(t)
Tables.schema(t::Table) = Tables.Schema(names(t), types(t))
Tables.columnnames(t::Table) = names(t)
Tables.getcolumn(t::Table, i::Int) = columns(t)[i]
Tables.getcolumn(t::Table, nm::Symbol) = lookup(t)[nm]
# high-level user API functions
Table(io::IO, pos::Integer=1, len=nothing; convert::Bool=true) = Table(Base.read(io), pos, len; convert=convert)
Table(str::String, pos::Integer=1, len=nothing; convert::Bool=true) = Table(Mmap.mmap(str), pos, len; convert=convert)
# detect whether we're reading arrow data in the "file" format (bounded by magic bytes) or the plain IPC stream format
function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothing}=nothing; convert::Bool=true)
len = something(tlen, length(bytes))
if len > 24 &&
_startswith(bytes, off, FILE_FORMAT_MAGIC_BYTES) &&
_endswith(bytes, off + len - 1, FILE_FORMAT_MAGIC_BYTES)
off += 8 # skip past magic bytes + padding
end
t = Table()
sch = nothing
dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding
dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field
for batch in BatchIterator(bytes, off)
# store custom_metadata of batch.msg?
header = batch.msg.header
if header isa Meta.Schema
@debug 1 "parsing schema message"
# assert endianness?
# store custom_metadata?
for (i, field) in enumerate(header.fields)
push!(names(t), Symbol(field.name))
# recursively find any dictionaries for any fields
getdictionaries!(dictencoded, field)
@debug 1 "parsed column from schema: field = $field"
end
sch = header
schema(t)[] = sch
elseif header isa Meta.DictionaryBatch
id = header.id
recordbatch = header.data
@debug 1 "parsing dictionary batch message: id = $id, compression = $(recordbatch.compression)"
if haskey(dictencodings, id) && header.isDelta
# delta
field = dictencoded[id]
values, _, _ = build(field, field.type, batch, recordbatch, dictencodings, Int64(1), Int64(1), convert)
dictencoding = dictencodings[id]
append!(dictencoding.data, values)
continue
end
# new dictencoding or replace
field = dictencoded[id]
values, _, _ = build(field, field.type, batch, recordbatch, dictencodings, Int64(1), Int64(1), convert)
A = ChainedVector([values])
dictencodings[id] = DictEncoding{eltype(A), typeof(A)}(id, A, field.dictionary.isOrdered)
@debug 1 "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
@debug 1 "parsing record batch message: compression = $(header.compression)"
if isempty(columns(t))
# first RecordBatch
for vec in VectorIterator(sch, batch, dictencodings, convert)
push!(columns(t), vec)
end
@debug 1 "parsed 1st record batch"
elseif !(columns(t)[1] isa ChainedVector)
# second RecordBatch
for (i, vec) in enumerate(VectorIterator(sch, batch, dictencodings, convert))
columns(t)[i] = ChainedVector([columns(t)[i], vec])
end
@debug 1 "parsed 2nd record batch"
else
# 2+ RecordBatch
for (i, vec) in enumerate(VectorIterator(sch, batch, dictencodings, convert))
append!(columns(t)[i], vec)
end
@debug 1 "parsed additional record batch"
end
else
throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
end
end
lu = lookup(t)
ty = types(t)
for (nm, col) in zip(names(t), columns(t))
lu[nm] = col
push!(ty, eltype(col))
end
meta = sch !== nothing ? sch.custom_metadata : nothing
if meta !== nothing
setmetadata!(t, Dict(String(kv.key) => String(kv.value) for kv in meta))
end
return t
end
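# Recursively record every dictionary-encoded field, keyed by dictionary id,
# including nested children, so later DictionaryBatch messages can be matched
# back to the field they encode.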
function getdictionaries!(dictencoded, field)
d = field.dictionary
if d !== nothing
dictencoded[d.id] = field
end
for child in field.children
getdictionaries!(dictencoded, child)
end
return
end
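# A single parsed IPC message: flatbuffer metadata plus the backing bytes and
# the position where the message body begins.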
struct Batch
msg::Meta.Message
bytes::Vector{UInt8}
pos::Int
id::Int
end
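# Messages are framed as: a 4-byte continuation indicator, an Int32 metadata
# length, the flatbuffer message metadata, then a body of `msg.bodyLength`
# bytes; iteration stops on end-of-buffer, a missing continuation indicator, or
# a zero metadata length.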
function Base.iterate(x::BatchIterator, (pos, id)=(x.startpos, 0))
if pos + 3 > length(x.bytes)
@debug 1 "not enough bytes left for another batch message"
return nothing
end
if readbuffer(x.bytes, pos, UInt32) != CONTINUATION_INDICATOR_BYTES
@debug 1 "didn't find continuation byte to keep parsing messages: $(readbuffer(x.bytes, pos, UInt32))"
return nothing
end
pos += 4
if pos + 3 > length(x.bytes)
@debug 1 "not enough bytes left to read length of another batch message"
return nothing
end
msglen = readbuffer(x.bytes, pos, Int32)
if msglen == 0
@debug 1 "message has 0 length; terminating message parsing"
return nothing
end
pos += 4
msg = FlatBuffers.getrootas(Meta.Message, x.bytes, pos-1)
pos += msglen
# pos now points to message body
@debug 1 "parsing message: msglen = $msglen, bodyLength = $(msg.bodyLength)"
return Batch(msg, x.bytes, pos, id), (pos + msg.bodyLength, id + 1)
end
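# Iterates the top-level columns of a single record batch; nodeidx/bufferidx
# track how many field nodes and buffers nested children have consumed.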
struct VectorIterator
schema::Meta.Schema
batch::Batch # batch.msg.header MUST BE RecordBatch
dictencodings::Dict{Int64, DictEncoding}
convert::Bool
end
buildmetadata(f::Meta.Field) = buildmetadata(f.custom_metadata)
buildmetadata(meta) = Dict(String(kv.key) => String(kv.value) for kv in meta)
buildmetadata(::Nothing) = nothing
function Base.iterate(x::VectorIterator, (columnidx, nodeidx, bufferidx)=(Int64(1), Int64(1), Int64(1)))
columnidx > length(x.schema.fields) && return nothing
field = x.schema.fields[columnidx]
@debug 2 "building top-level column: field = $(field), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
A, nodeidx, bufferidx = build(field, x.batch, x.batch.msg.header, x.dictencodings, nodeidx, bufferidx, x.convert)
@debug 2 "built top-level column: A = $(typeof(A)), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
@debug 3 A
return A, (columnidx + 1, nodeidx, bufferidx)
end
const ListTypes = Union{Meta.Utf8, Meta.LargeUtf8, Meta.Binary, Meta.LargeBinary, Meta.List, Meta.LargeList}
const LargeLists = Union{Meta.LargeUtf8, Meta.LargeBinary, Meta.LargeList}
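# Entry point for building one column: dictionary-encoded fields read their
# index buffer and look up the shared DictEncoding, all others dispatch on
# field.type; every method returns (vector, nodeidx, bufferidx) with the
# indices advanced past whatever it consumed.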
function build(field::Meta.Field, batch, rb, de, nodeidx, bufferidx, convert)
d = field.dictionary
if d !== nothing
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
buffer = rb.buffers[bufferidx]
S = d.indexType === nothing ? Int32 : juliaeltype(field, d.indexType, false)
bytes, indices = reinterp(S, batch, buffer, rb.compression)
encoding = de[d.id]
A = DictEncoded(bytes, validity, indices, encoding, buildmetadata(field.custom_metadata))
nodeidx += 1
bufferidx += 1
else
A, nodeidx, bufferidx = build(field, field.type, batch, rb, de, nodeidx, bufferidx, convert)
end
return A, nodeidx, bufferidx
end
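# Build the validity (null) bitmap for the current node, decompressing the
# underlying buffer first if the record batch is compressed.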
function buildbitmap(batch, rb, nodeidx, bufferidx)
buffer = rb.buffers[bufferidx]
voff = batch.pos + buffer.offset
node = rb.nodes[nodeidx]
if rb.compression === nothing
return ValidityBitmap(batch.bytes, voff, node.length, node.null_count)
else
# compressed
ptr = pointer(batch.bytes, voff)
_, decodedbytes = uncompress(ptr, buffer, rb.compression)
return ValidityBitmap(decodedbytes, 1, node.length, node.null_count)
end
end
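# Decompress a single buffer; compressed arrow buffers are prefixed with the
# uncompressed length as an Int64. Returns (uncompressedlen, decodedbytes).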
function uncompress(ptr::Ptr{UInt8}, buffer, compression)
if buffer.length == 0
return 0, UInt8[]
end
len = unsafe_load(convert(Ptr{Int64}, ptr))
ptr += 8 # skip past uncompressed length as Int64
encodedbytes = unsafe_wrap(Array, ptr, buffer.length - 8)
if compression.codec === Meta.CompressionType.LZ4_FRAME
decodedbytes = transcode(LZ4FrameDecompressor, encodedbytes)
elseif compression.codec === Meta.CompressionType.ZSTD
decodedbytes = transcode(ZstdDecompressor, encodedbytes)
else
error("unsupported compression type when reading arrow buffers: $(typeof(compression.codec))")
end
return len, decodedbytes
end
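# Reinterpret a buffer's bytes as a vector of T, decompressing if necessary.
# Returns the backing byte vector along with the unsafe-wrapped array; callers
# must keep the bytes alive for as long as the array is used.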
function reinterp(::Type{T}, batch, buf, compression) where {T}
ptr = pointer(batch.bytes, batch.pos + buf.offset)
if compression === nothing
return batch.bytes, unsafe_wrap(Array, convert(Ptr{T}, ptr), div(buf.length, sizeof(T)))
else
# compressed
len, decodedbytes = uncompress(ptr, buf, compression)
return decodedbytes, unsafe_wrap(Array, convert(Ptr{T}, pointer(decodedbytes)), div(len, sizeof(T)))
end
end
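# Variable-length layouts (Utf8/Binary/List and their Large variants) share a
# shape: validity bitmap, offsets buffer, then either a raw data buffer
# (strings/binary) or a single child array (lists).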
function build(f::Meta.Field, L::ListTypes, batch, rb, de, nodeidx, bufferidx, convert)
@debug 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
buffer = rb.buffers[bufferidx]
OT = L isa LargeLists ? Int64 : Int32
bytes, offs = reinterp(OT, batch, buffer, rb.compression)
offsets = Offsets(bytes, offs)
bufferidx += 1
len = rb.nodes[nodeidx].length
nodeidx += 1
if L isa Meta.Utf8 || L isa Meta.LargeUtf8 || L isa Meta.Binary || L isa Meta.LargeBinary
buffer = rb.buffers[bufferidx]
bytes, A = reinterp(UInt8, batch, buffer, rb.compression)
bufferidx += 1
else
bytes = UInt8[]
A, nodeidx, bufferidx = build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
end
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return List{T, OT, typeof(A)}(bytes, validity, offsets, A, len, meta), nodeidx, bufferidx
end
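# Fixed-size layouts carry no offsets buffer: validity bitmap, then either a
# raw data buffer (FixedSizeBinary) or a child array (FixedSizeList).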
function build(f::Meta.Field, L::Union{Meta.FixedSizeBinary, Meta.FixedSizeList}, batch, rb, de, nodeidx, bufferidx, convert)
@debug 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
len = rb.nodes[nodeidx].length
nodeidx += 1
if L isa Meta.FixedSizeBinary
buffer = rb.buffers[bufferidx]
bytes, A = reinterp(UInt8, batch, buffer, rb.compression)
bufferidx += 1
else
bytes = UInt8[]
A, nodeidx, bufferidx = build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
end
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return FixedSizeList{T, typeof(A)}(bytes, validity, A, len, meta), nodeidx, bufferidx
end
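# Map is encoded like a list with Int32 offsets whose child is a struct of
# key/value pairs.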
function build(f::Meta.Field, L::Meta.Map, batch, rb, de, nodeidx, bufferidx, convert)
@debug 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
buffer = rb.buffers[bufferidx]
OT = Int32
bytes, offs = reinterp(OT, batch, buffer, rb.compression)
offsets = Offsets(bytes, offs)
bufferidx += 1
len = rb.nodes[nodeidx].length
nodeidx += 1
A, nodeidx, bufferidx = build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return Map{T, OT, typeof(A)}(validity, offsets, A, len, meta), nodeidx, bufferidx
end
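# Struct: a validity bitmap followed by one child array per field, all sharing
# the parent node's length.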
function build(f::Meta.Field, L::Meta.Struct, batch, rb, de, nodeidx, bufferidx, convert)
@debug 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
len = rb.nodes[nodeidx].length
vecs = []
nodeidx += 1
for child in f.children
A, nodeidx, bufferidx = build(child, batch, rb, de, nodeidx, bufferidx, convert)
push!(vecs, A)
end
data = Tuple(vecs)
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return Struct{T, typeof(data)}(validity, data, len, meta), nodeidx, bufferidx
end
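# Unions carry a type-ids buffer (plus an Int32 offsets buffer in dense mode)
# and, per the arrow spec, no validity bitmap of their own.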
function build(f::Meta.Field, L::Meta.Union, batch, rb, de, nodeidx, bufferidx, convert)
@debug 2 "building array: L = $L"
buffer = rb.buffers[bufferidx]
bytes, typeIds = reinterp(UInt8, batch, buffer, rb.compression)
bufferidx += 1
if L.mode == Meta.UnionMode.Dense
buffer = rb.buffers[bufferidx]
bytes2, offsets = reinterp(Int32, batch, buffer, rb.compression)
bufferidx += 1
end
vecs = []
nodeidx += 1
for child in f.children
A, nodeidx, bufferidx = build(child, batch, rb, de, nodeidx, bufferidx, convert)
push!(vecs, A)
end
data = Tuple(vecs)
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
if L.mode == Meta.UnionMode.Dense
B = DenseUnion{T, typeof(data)}(bytes, bytes2, typeIds, offsets, data, meta)
else
B = SparseUnion{T, typeof(data)}(bytes, typeIds, data, meta)
end
return B, nodeidx, bufferidx
end
function build(f::Meta.Field, L::Meta.Null, batch, rb, de, nodeidx, bufferidx, convert)
@debug 2 "building array: L = $L"
return MissingVector(rb.nodes[nodeidx].length), nodeidx + 1, bufferidx
end
# primitives
function build(f::Meta.Field, ::L, batch, rb, de, nodeidx, bufferidx, convert) where {L}
@debug 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
buffer = rb.buffers[bufferidx]
meta = buildmetadata(f.custom_metadata)
# get storage type (non-converted)
T = juliaeltype(f, nothing, false)
@debug 2 "storage type for primitive: T = $T"
bytes, A = reinterp(Base.nonmissingtype(T), batch, buffer, rb.compression)
len = rb.nodes[nodeidx].length
T = juliaeltype(f, meta, convert)
@debug 2 "final julia type for primitive: T = $T"
return Primitive(T, bytes, validity, A, len, meta), nodeidx + 1, bufferidx + 1
end