blob: 2d7ad01556c5d20f311457803cbd00a7a22e147c [file] [log] [blame]
# Side table holding Arrow custom metadata (String => String) for arbitrary
# objects, keyed by object identity (`IdDict`, i.e. `===`).
# NOTE(review): nothing in this file removes entries, so every object passed to
# `setmetadata!` is kept alive by this table.
const OBJ_METADATA = IdDict{Any, Dict{String, String}}()

# Register `meta` as the custom metadata of `x`, replacing any previous entry.
function setmetadata!(x, meta::Dict{String, String})
    setindex!(OBJ_METADATA, meta, x)
    return nothing
end

# Look up the custom metadata registered for `x`; `default` when there is none.
function getmetadata(x, default=nothing)
    return get(OBJ_METADATA, x, default)
end
"""
    Arrow.write(io::IO, tbl; kw...)
    Arrow.write(file::String, tbl; kw...)

Write any Tables.jl-compatible `tbl` out as arrow formatted data.

Providing an `io::IO` argument will cause the data to be written to it
in the "streaming" format (pass `file=true` to write the "file" format to
`io` instead), while providing a `file::String` argument opens that path and
writes the "file" format. The `IO` method returns `io`; the `String` method
returns `file`.

Multiple record batches will be written based on the number of
`Tables.partitions(tbl)` that are provided; by default, this is just
one for a given table, but some table sources support automatic
partitioning. Note you can turn multiple table objects into partitions
by doing `Tables.partitioner([tbl1, tbl2, ...])`, but note that
each table must have the exact same `Tables.Schema`.

# Keywords
  * `largelists::Bool=false`: forwarded to the column conversion; presumably
    requests the large (64-bit offset) layout for list-like columns.
  * `compress::Union{Nothing, Symbol}=nothing`: `nothing` writes uncompressed
    buffers; a `Symbol` names the compression codec applied to the columns.
  * `denseunions::Bool=true`: write union columns with the dense (`true`) or
    sparse (`false`) union layout.
  * `dictencode::Bool=false`: dictionary-encode columns.
  * `dictencodenested::Bool=false`: also dictionary-encode nested values.
  * `file::Bool=false` (`IO` method only): write the "file" format instead of
    the "streaming" format.
"""
function write end
# Open (and truncate) the path `file`, write `tbl` to it in the arrow "file"
# format, always closing the stream; returns the path.
function write(file::String, tbl; largelists::Bool=false, compress::Union{Nothing, Symbol}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false)
    io = open(file, "w")
    try
        write(io, tbl, true, largelists, compress, denseunions, dictencode, dictencodenested)
    finally
        close(io)
    end
    return file
end
# Write `tbl` to `io` in the arrow "streaming" format, or the "file" format
# when `file=true`.
function write(io::IO, tbl; largelists::Bool=false, compress::Union{Nothing, Symbol}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, file::Bool=false)
    writetofile = file
    return write(io, tbl, writetofile, largelists, compress, denseunions, dictencode, dictencodenested)
end
# Condition type used by `OrderedChannel`: `Threads.Condition` (which is locked
# by `@lock`) on Julia 1.3 and later, the plain task `Condition` before that.
const Cond = @static VERSION >= v"1.3" ? Threads.Condition : Condition
# Channel whose producers tag every item with a sequence number `i`: `put!`
# blocks until all earlier sequence numbers have been released, so consumers
# iterating the channel see items in sequence order no matter which producer
# finishes first.
struct OrderedChannel{T}
    chan::Channel{T}   # underlying buffered channel that consumers iterate
    cond::Cond         # notified whenever the sequence counter may have changed
    i::Ref{Int}        # highest sequence number currently allowed to `put!`
end

# Build an `OrderedChannel` over a `Channel{T}` of capacity `sz`, starting at
# sequence number 1. Uses `Cond()` rather than `Threads.Condition()` so that it
# also constructs on Julia versions where `Threads.Condition` does not exist.
OrderedChannel{T}(sz) where {T} = OrderedChannel{T}(Channel{T}(sz), Cond(), Ref(1))

# Iterating an `OrderedChannel` drains its underlying channel.
Base.iterate(ch::OrderedChannel, st...) = iterate(ch.chan, st...)
# `@lock obj expr`: evaluate `expr` while holding `obj`'s lock, releasing it
# even if `expr` throws. `obj` is evaluated exactly once. On Julia < 1.3 the
# task `Condition` has no lock, so only the `try` wrapper is emitted.
macro lock(obj, expr)
    @static if VERSION >= v"1.3"
        lk = gensym("lock")
        return esc(quote
            $lk = $obj
            lock($lk)
            try
                $expr
            finally
                unlock($lk)
            end
        end)
    else
        return esc(quote
            try
                $expr
            finally
            end
        end)
    end
end
# Put `x` into `ch` as part of sequence number `i`, first blocking until the
# counter has reached `i`. With `incr=true` this item completes sequence `i`
# and the counter advances, releasing `i + 1`. Every put wakes all waiters so
# they can re-check the counter.
function Base.put!(ch::OrderedChannel{T}, x::T, i::Integer, incr::Bool=false) where {T}
    @lock ch.cond begin
        while i > ch.i[]
            wait(ch.cond)
        end
        put!(ch.chan, x)
        incr && (ch.i[] += 1)
        notify(ch.cond)
    end
    return
end
# Close `ch`: first wait until no task is blocked on its condition (e.g. a
# producer still waiting for its turn in `put!`), then close the underlying
# channel so consumers iterating `ch` finish once it is drained.
function Base.close(ch::OrderedChannel)
    @lock ch.cond begin
        while !iszero(Base.n_waiters(ch.cond))
            wait(ch.cond)
        end
        close(ch.chan)
    end
    return nothing
end
# Convert partition `i` (i > 1) of the input and queue its messages: delta
# dictionary batches holding only dictionary values not emitted by earlier
# partitions, then the partition's record batch (which releases partition i+1).
# Exceptions are printed (the task may run on another thread) and rethrown so
# the enclosing `@sync` reports them.
# NOTE(review): if this task throws, later partitions wait in `put!` forever and
# the enclosing `@sync` never returns.
function _writepartition(msgs, tbl, i, sch, dictencodingvalues, largelists, compress, denseunions, dictencode, dictencodenested)
    try
        cols = toarrowtable(tbl, largelists, compress, denseunions, dictencode, dictencodenested)
        if !isempty(cols.dictencodings)
            # The delta for partition i must be computed against the values of
            # partitions 1..i-1. Partition i-1 updates the shared sets *before*
            # its record batch advances `msgs.i` to i, so wait for that turn here.
            # This also keeps concurrent tasks from mutating the same `Set`.
            @lock msgs.cond begin
                while msgs.i[] < i
                    wait(msgs.cond)
                end
            end
            for de in cols.dictencodings
                dictsch = Tables.Schema((:col,), (eltype(de.data),))
                existing = dictencodingvalues[de.id]
                # get new de.data we haven't seen before for delta update
                vals = setdiff(de.data, existing)
                put!(msgs, makedictionarybatchmsg(dictsch, (col=vals,), de.id, true), i)
                # add new de.data to existing set for future diffs
                union!(existing, vals)
            end
        end
        put!(msgs, makerecordbatchmsg(sch[], cols), i, true)
    catch e
        showerror(stderr, e, catch_backtrace())
        rethrow()
    end
    return
end

# Core writer behind the keyword `write` methods. It converts every partition
# of `source` to arrow columns, turns them into IPC messages, and writes them to
# `io`. When `writetofile` is true, the output is framed by the "ARROW1" magic
# and a footer indexing the record batch and dictionary blocks. Otherwise the
# stream ends with an empty message. Returns `io`.
#
# A single writer task drains `msgs` and performs every write to `io`.
# Partition 1 is converted synchronously because it fixes the schema and the
# initial dictionary contents. Later partitions are converted concurrently, and
# `OrderedChannel` releases their messages in partition order.
function write(io, source, writetofile, largelists, compress, denseunions, dictencode, dictencodenested)
    if writetofile
        @debug 1 "starting write of arrow formatted file"
        Base.write(io, "ARROW1\0\0")
    end
    msgs = OrderedChannel{Message}(Inf)
    # build messages
    sch = Ref{Tables.Schema}()
    firstcols = Ref{Any}()
    # dictionary id => every value already emitted for that dictionary
    dictencodingvalues = Dict{Int, Set}()
    # (record batch blocks, dictionary batch blocks), appended by the writer task
    blocks = (Block[], Block[])
    # start message writing from channel
    @static if VERSION >= v"1.3-DEV"
        tsk = Threads.@spawn for msg in msgs
            Base.write(io, msg, blocks, sch)
        end
    else
        tsk = @async for msg in msgs
            Base.write(io, msg, blocks, sch)
        end
    end
    @sync for (i, tbl) in enumerate(Tables.partitions(source))
        @debug 1 "processing table partition i = $i"
        if i == 1
            cols = toarrowtable(tbl, largelists, compress, denseunions, dictencode, dictencodenested)
            sch[] = Tables.schema(cols)
            firstcols[] = cols
            put!(msgs, makeschemamsg(sch[], cols), i)
            if !isempty(cols.dictencodings)
                for de in cols.dictencodings
                    dictencodingvalues[de.id] = Set(z for z in de.data)
                    dictsch = Tables.Schema((:col,), (eltype(de.data),))
                    put!(msgs, makedictionarybatchmsg(dictsch, (col=de.data,), de.id, false), i)
                end
            end
            # advances the sequence counter, releasing partition 2
            put!(msgs, makerecordbatchmsg(sch[], cols), i, true)
        else
            @static if VERSION >= v"1.3-DEV"
                Threads.@spawn _writepartition(msgs, tbl, i, sch, dictencodingvalues, largelists, compress, denseunions, dictencode, dictencodenested)
            else
                @async _writepartition(msgs, tbl, i, sch, dictencodingvalues, largelists, compress, denseunions, dictencode, dictencodenested)
            end
        end
    end
    close(msgs)
    wait(tsk)
    if !writetofile
        # write empty message
        Base.write(io, Message(UInt8[], nothing, 0, true, false), blocks, sch)
    else
        b = FlatBuffers.Builder(1024)
        schfoot = makeschema(b, sch[], firstcols[])
        if !isempty(blocks[1])
            N = length(blocks[1])
            Meta.footerStartRecordBatchesVector(b, N)
            for blk in Iterators.reverse(blocks[1])
                Meta.createBlock(b, blk.offset, blk.metaDataLength, blk.bodyLength)
            end
            recordbatches = FlatBuffers.endvector!(b, N)
        else
            recordbatches = FlatBuffers.UOffsetT(0)
        end
        if !isempty(blocks[2])
            N = length(blocks[2])
            Meta.footerStartDictionariesVector(b, N)
            for blk in Iterators.reverse(blocks[2])
                Meta.createBlock(b, blk.offset, blk.metaDataLength, blk.bodyLength)
            end
            dicts = FlatBuffers.endvector!(b, N)
        else
            dicts = FlatBuffers.UOffsetT(0)
        end
        Meta.footerStart(b)
        # NOTE(review): messages are written with MetadataVersion.V5 (see
        # `makemessage`) while the footer declares V4; confirm this is intended.
        Meta.footerAddVersion(b, Meta.MetadataVersion.V4)
        Meta.footerAddSchema(b, schfoot)
        Meta.footerAddDictionaries(b, dicts)
        Meta.footerAddRecordBatches(b, recordbatches)
        foot = Meta.footerEnd(b)
        FlatBuffers.finish!(b, foot)
        footer = FlatBuffers.finishedbytes(b)
        Base.write(io, footer)
        Base.write(io, Int32(length(footer)))
        Base.write(io, "ARROW1")
    end
    return io
end
# One input partition after conversion to arrow columns (built by
# `toarrowtable`), ready to be turned into schema / dictionary / record batch
# messages.
struct ToArrowTable
# column names with the element types of the converted columns
sch::Tables.Schema
# converted arrow column vectors, in schema order
cols::Vector{Any}
# table-level custom metadata (from `getmetadata` on the input columns), if any
metadata::Union{Nothing, Dict{String, String}}
# dictionary encodings collected while converting the columns; ids assigned 0-based
dictencodings::Vector{DictEncoding}
end
# Convert one table partition `x` into a `ToArrowTable`. Every column goes
# through `toarrowvector`, which also appends any dictionary encodings it
# creates. The collected dictionaries are then numbered with 0-based ids in
# collection order.
function toarrowtable(x, largelists, compress, denseunions, dictencode, dictencodenested)
    @debug 1 "converting input table to arrow formatted columns"
    incols = Tables.columns(x)
    tablemeta = getmetadata(incols)
    insch = Tables.schema(incols)
    ncols = length(insch.types)
    outcols = Vector{Any}(undef, ncols)
    outtypes = Vector{Type}(undef, ncols)
    encodings = DictEncoding[]
    Tables.eachcolumn(insch, incols) do col, idx, _
        converted = toarrowvector(col, encodings; compression=compress, largelists=largelists, denseunions=denseunions, dictencode=dictencode, dictencodenested=dictencodenested)
        outtypes[idx] = eltype(converted)
        outcols[idx] = converted
    end
    # assign dict encoding ids
    for idx in eachindex(encodings)
        encodings[idx].id = idx - 1
    end
    return ToArrowTable(Tables.Schema(insch.names, outtypes), outcols, tablemeta, encodings)
end
# Tables.jl column-access interface: a `ToArrowTable` is its own column table.
Tables.columns(x::ToArrowTable) = x
Tables.rowcount(x::ToArrowTable) = isempty(x.cols) ? 0 : length(first(x.cols))
Tables.schema(x::ToArrowTable) = x.sch
Tables.columnnames(x::ToArrowTable) = x.sch.names
Tables.getcolumn(x::ToArrowTable, i::Int) = x.cols[i]
# A fully built IPC message waiting to be written by `Base.write(io, ::Message, ...)`.
struct Message
# finished flatbuffer bytes of the message metadata
msgflatbuf
# table whose column buffers form the message body, or `nothing` for no body
columns
# body length recorded in the metadata (sum of padded buffer lengths)
bodylen
# when recorded as a block: record batch (true) or dictionary batch (false)
isrecordbatch::Bool
# whether the message's location is recorded in `blocks` for the file footer
blockmsg::Bool
end
# Location of a record batch or dictionary batch message in the output, as
# listed in the file footer.
struct Block
# stream position at which the message starts
offset::Int64
# 8-byte prefix (continuation marker + length) plus the padded metadata
metaDataLength::Int32
# length of the message body in bytes
bodyLength::Int64
end
# Write one encapsulated IPC message to `io`:
#   - the 0xFFFFFFFF continuation marker,
#   - the Int32 padded metadata length,
#   - the metadata flatbuffer, zero-padded via `paddinglength`,
#   - the body buffers of `msg.columns`, if any.
# Record and dictionary batches are also recorded in `blocks` for the file
# footer. `sch` is unused. Returns the number of bytes written before the body.
function Base.write(io::IO, msg::Message, blocks, sch)
    metalen = padding(length(msg.msgflatbuf))
    @debug 1 "writing message: metalen = $metalen, bodylen = $(msg.bodylen), isrecordbatch = $(msg.isrecordbatch)"
    if msg.blockmsg
        # the block must point at the start of this message, before anything is written
        target = msg.isrecordbatch ? blocks[1] : blocks[2]
        push!(target, Block(position(io), metalen + 8, msg.bodylen))
    end
    nbytes = Base.write(io, 0xFFFFFFFF)
    nbytes += Base.write(io, Int32(metalen))
    nbytes += Base.write(io, msg.msgflatbuf)
    nbytes += writezeros(io, paddinglength(nbytes))
    body = msg.columns
    if body !== nothing
        foreach(col -> writebuffer(io, col), Tables.Columns(body))
    end
    return nbytes
end
# Finish builder `b` as a V5 IPC `Message` table wrapping `header` (of kind
# `headerType`) and return it as a `Message` carrying the body `columns` and
# `bodylen`. Record and dictionary batches are flagged for the footer index.
# Message-level custom metadata is not written.
function makemessage(b, headerType, header, columns=nothing, bodylen=0)
    Meta.messageStart(b)
    Meta.messageAddVersion(b, Meta.MetadataVersion.V5)
    Meta.messageAddHeaderType(b, headerType)
    Meta.messageAddHeader(b, header)
    Meta.messageAddBodyLength(b, Int64(bodylen))
    root = Meta.messageEnd(b)
    FlatBuffers.finish!(b, root)
    bytes = FlatBuffers.finishedbytes(b)
    isrecordbatch = headerType == Meta.RecordBatch
    isblock = isrecordbatch || headerType == Meta.DictionaryBatch
    return Message(bytes, columns, bodylen, isrecordbatch, isblock)
end
# Build a flatbuffer `Schema` table in `b` with one `Field` per column of
# `columns` (a `ToArrowTable`) plus its table-level custom metadata, if any.
# Returns the offset of the schema table.
function makeschema(b, sch::Tables.Schema{names}, columns) where {names}
    ncols = length(names)
    # all Field tables are built first, then the vector that references them
    fieldoffs = [fieldoffset(b, names[j], columns.cols[j]) for j in 1:ncols]
    Meta.schemaStartFieldsVector(b, ncols)
    # elements are prepended, so walk backwards to keep column order
    foreach(off -> FlatBuffers.prependoffset!(b, off), Iterators.reverse(fieldoffs))
    fields = FlatBuffers.endvector!(b, ncols)
    kvs = columns.metadata
    if kvs === nothing
        meta = FlatBuffers.UOffsetT(0)
    else
        kvoffs = FlatBuffers.UOffsetT[]
        for (k, v) in kvs
            keyoff = FlatBuffers.createstring!(b, String(k))
            valoff = FlatBuffers.createstring!(b, String(v))
            Meta.keyValueStart(b)
            Meta.keyValueAddKey(b, keyoff)
            Meta.keyValueAddValue(b, valoff)
            push!(kvoffs, Meta.keyValueEnd(b))
        end
        Meta.schemaStartCustomMetadataVector(b, length(kvs))
        foreach(off -> FlatBuffers.prependoffset!(b, off), Iterators.reverse(kvoffs))
        meta = FlatBuffers.endvector!(b, length(kvs))
    end
    # write schema object
    Meta.schemaStart(b)
    Meta.schemaAddEndianness(b, Meta.Endianness.Little)
    Meta.schemaAddFields(b, fields)
    Meta.schemaAddCustomMetadata(b, meta)
    return Meta.schemaEnd(b)
end
# Build the schema IPC message describing `sch` and the converted `columns`.
function makeschemamsg(sch::Tables.Schema, columns)
    @debug 1 "building schema message: sch = $sch"
    builder = FlatBuffers.Builder(1024)
    return makemessage(builder, Meta.Schema, makeschema(builder, sch, columns))
end
# Build a flatbuffer `Field` table in `b` for the column `col` named `name`, and
# return its offset. The field carries:
#   - field-level custom metadata from `getmetadata(col)`,
#   - a dictionary encoding when `isdictencoded(col)`,
#   - the arrow type and child fields produced by `arrowtype(b, col)`.
# All referenced objects (strings, metadata vector, dictionary encoding, type,
# children vector) are created before `Meta.fieldStart`.
function fieldoffset(b, name, col)
nameoff = FlatBuffers.createstring!(b, String(name))
T = eltype(col)
# the field is nullable when its element type admits `missing`
nullable = T >: Missing
# check for custom metadata
if getmetadata(col) !== nothing
kvs = getmetadata(col)
kvoffs = Vector{FlatBuffers.UOffsetT}(undef, length(kvs))
for (i, (k, v)) in enumerate(kvs)
koff = FlatBuffers.createstring!(b, String(k))
voff = FlatBuffers.createstring!(b, String(v))
Meta.keyValueStart(b)
Meta.keyValueAddKey(b, koff)
Meta.keyValueAddValue(b, voff)
kvoffs[i] = Meta.keyValueEnd(b)
end
Meta.fieldStartCustomMetadataVector(b, length(kvs))
# vector elements are prepended, so iterate in reverse to keep the original order
for off in Iterators.reverse(kvoffs)
FlatBuffers.prependoffset!(b, off)
end
meta = FlatBuffers.endvector!(b, length(kvs))
else
meta = FlatBuffers.UOffsetT(0)
end
# build dictionary
if isdictencoded(col)
encodingtype = indtype(col)
# `IT` is only used by the debug message below; `inttype` is the index type table
IT, inttype, _ = arrowtype(b, encodingtype)
Meta.dictionaryEncodingStart(b)
Meta.dictionaryEncodingAddId(b, Int64(getid(col)))
Meta.dictionaryEncodingAddIndexType(b, inttype)
# TODO: support isOrdered?
Meta.dictionaryEncodingAddIsOrdered(b, false)
dict = Meta.dictionaryEncodingEnd(b)
else
dict = FlatBuffers.UOffsetT(0)
end
type, typeoff, children = arrowtype(b, col)
# `arrowtype` returns `nothing` for childless types; an empty children vector is still written
if children !== nothing
Meta.fieldStartChildrenVector(b, length(children))
for off in Iterators.reverse(children)
FlatBuffers.prependoffset!(b, off)
end
children = FlatBuffers.endvector!(b, length(children))
else
Meta.fieldStartChildrenVector(b, 0)
children = FlatBuffers.endvector!(b, 0)
end
# build field object
if isdictencoded(col)
@debug 1 "building field: name = $name, nullable = $nullable, T = $T, type = $type, inttype = $IT, dictionary id = $(getid(col))"
else
@debug 1 "building field: name = $name, nullable = $nullable, T = $T, type = $type"
end
Meta.fieldStart(b)
Meta.fieldAddName(b, nameoff)
Meta.fieldAddNullable(b, nullable)
Meta.fieldAddTypeType(b, type)
Meta.fieldAddType(b, typeoff)
Meta.fieldAddDictionary(b, dict)
Meta.fieldAddChildren(b, children)
Meta.fieldAddCustomMetadata(b, meta)
return Meta.fieldEnd(b)
end
# Per-array entry of a record batch's `nodes` vector (one per column and per
# nested child array).
struct FieldNode
# number of slots in the array
length::Int64
# number of null slots
null_count::Int64
end
# Per-buffer entry of a record batch's `buffers` vector: where the buffer sits
# inside the message body.
struct Buffer
# byte offset of the buffer from the start of the body
offset::Int64
# unpadded buffer length in bytes
length::Int64
end
# Build the record batch IPC message whose body is the buffers of `columns`.
function makerecordbatchmsg(sch::Tables.Schema{names, types}, columns) where {names, types}
    builder = FlatBuffers.Builder(1024)
    rb, len = makerecordbatch(builder, sch, columns)
    return makemessage(builder, Meta.RecordBatch, rb, columns, len)
end
# Build a flatbuffer `RecordBatch` table in `b` describing `columns`. It has
# one `FieldNode` per (nested) array and one `Buffer` per body buffer, plus a
# body-compression entry when a compressed top-level column is present.
# Returns `(recordbatch offset, bodylen)`, where `bodylen` is the sum of the
# padded buffer lengths.
function makerecordbatch(b, sch::Tables.Schema{names, types}, columns) where {names, types}
    nrows = Tables.rowcount(columns)
    compress = nothing
    fieldnodes = FieldNode[]
    fieldbuffers = Buffer[]
    offset = 0
    for col in Tables.Columns(columns)
        # the codec comes from the (last) compressed top-level column
        col isa Compressed && (compress = compressiontype(col))
        offset = makenodesbuffers!(col, fieldnodes, fieldbuffers, offset)
    end
    @debug 1 "building record batch message: nrows = $nrows, sch = $sch, compress = $compress"
    # vector elements are prepended, hence the reversed iteration
    nnodes = length(fieldnodes)
    Meta.recordBatchStartNodesVector(b, nnodes)
    for node in Iterators.reverse(fieldnodes)
        Meta.createFieldNode(b, node.length, node.null_count)
    end
    nodes = FlatBuffers.endvector!(b, nnodes)
    nbufs = length(fieldbuffers)
    Meta.recordBatchStartBuffersVector(b, nbufs)
    for buf in Iterators.reverse(fieldbuffers)
        Meta.createBuffer(b, buf.offset, buf.length)
    end
    buffers = FlatBuffers.endvector!(b, nbufs)
    bodylen = mapreduce(buf -> padding(buf.length), +, fieldbuffers; init=0)
    if compress === nothing
        compression = FlatBuffers.UOffsetT(0)
    else
        Meta.bodyCompressionStart(b)
        Meta.bodyCompressionAddCodec(b, compress)
        Meta.bodyCompressionAddMethod(b, Meta.BodyCompressionMethod.BUFFER)
        compression = Meta.bodyCompressionEnd(b)
    end
    @debug 1 "built record batch message: nrows = $nrows, nodes = $fieldnodes, buffers = $fieldbuffers, compress = $compress, bodylen = $bodylen"
    Meta.recordBatchStart(b)
    Meta.recordBatchAddLength(b, Int64(nrows))
    Meta.recordBatchAddNodes(b, nodes)
    Meta.recordBatchAddBuffers(b, buffers)
    Meta.recordBatchAddCompression(b, compression)
    return Meta.recordBatchEnd(b), bodylen
end
# Build a dictionary batch IPC message for dictionary `id`, whose values are
# the column(s) of `columns` (called with a one-column NamedTuple in this file).
# `isdelta` marks the batch as an addition to values sent earlier.
function makedictionarybatchmsg(sch, columns, id, isdelta)
    @debug 1 "building dictionary message: id = $id, sch = $sch, isdelta = $isdelta"
    builder = FlatBuffers.Builder(1024)
    data, bodylen = makerecordbatch(builder, sch, columns)
    Meta.dictionaryBatchStart(builder)
    Meta.dictionaryBatchAddId(builder, Int64(id))
    Meta.dictionaryBatchAddData(builder, data)
    Meta.dictionaryBatchAddIsDelta(builder, isdelta)
    batch = Meta.dictionaryBatchEnd(builder)
    return makemessage(builder, Meta.DictionaryBatch, batch, columns, bodylen)
end
# An all-missing column contributes one field node whose every slot is null,
# and no body buffers.
function makenodesbuffers!(col::MissingVector, fieldnodes, fieldbuffers, bufferoffset)
    n = length(col)
    push!(fieldnodes, FieldNode(n, n))
    @debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
    return bufferoffset
end

# An all-missing column has no body bytes to write.
writebuffer(io, col::MissingVector) = nothing
# Field node and buffers for a primitive (fixed-width) column. The first buffer
# is a validity bitmap, declared with length 0 when the column has no nulls;
# the second holds the values. Returns the body offset just past this column's
# padded buffers.
function makenodesbuffers!(col::Primitive{T, S}, fieldnodes, fieldbuffers, bufferoffset) where {T, S}
    len = length(col)
    nc = nullcount(col)
    push!(fieldnodes, FieldNode(len, nc))
    @debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
    # validity bitmap
    blen = nc == 0 ? 0 : bitpackedbytes(len)
    push!(fieldbuffers, Buffer(bufferoffset, blen))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    # adjust buffer offset, make primitive array buffer
    bufferoffset += blen
    # `T` may be `Union{Missing, X}` for a nullable column (NOTE(review): confirm
    # against `Primitive`'s definition). `sizeof` throws for such a Union, while
    # `nonmissingtype` leaves a non-Union `T` unchanged. This matches the
    # FixedSizeList method's use of `Base.nonmissingtype`.
    blen = len * sizeof(Base.nonmissingtype(T))
    push!(fieldbuffers, Buffer(bufferoffset, blen))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    return bufferoffset + padding(blen)
end
# Write the validity bitmap bytes of `col` to `io`. Nothing is written when the
# column has no nulls, matching the zero-length bitmap buffer declared by
# `makenodesbuffers!`. Returns the number of bytes written.
function writebitmap(io, col::ArrowVector)
    v = col.validity
    # `ℓ` (U+2113) is the bitmap's slot count; the previous source had it
    # mis-encoded as "â„“", which cannot resolve to the field.
    nbytes = bitpackedbytes(v.ℓ)
    @debug 1 "writing validity bitmap: nc = $(v.nc), n = $nbytes"
    v.nc == 0 && return 0
    return Base.write(io, view(v.bytes, v.pos:(v.pos + nbytes - 1)))
end
# Body of a primitive column: its validity bitmap, then the values written by
# `writearray` with type `S`, zero-padded.
function writebuffer(io, col::Primitive{T, S}) where {T, S}
    @debug 1 "writebuffer: col = $(typeof(col))"
    @debug 2 col
    writebitmap(io, col)
    n = writearray(io, S, col.data)
    @debug 1 "writing array: col = $(typeof(col.data)), n = $n, padded = $(padding(n))"
    writezeros(io, paddinglength(n))
    return nothing
end
# Field node and buffers for a list or map column:
#   - a validity bitmap,
#   - `len + 1` offsets of type `O`,
#   - then either one raw byte buffer (when the child storage's eltype is
#     `UInt8`) or the child array's own nodes and buffers.
function makenodesbuffers!(col::Union{Map{T, O, A}, List{T, O, A}}, fieldnodes, fieldbuffers, bufferoffset) where {T, O, A}
    nslots = length(col)
    nnull = nullcount(col)
    push!(fieldnodes, FieldNode(nslots, nnull))
    @debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
    # validity bitmap (zero length when there are no nulls)
    nbytes = iszero(nnull) ? 0 : bitpackedbytes(nslots)
    push!(fieldbuffers, Buffer(bufferoffset, nbytes))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    offset = bufferoffset + nbytes
    # offsets buffer
    nbytes = (nslots + 1) * sizeof(O)
    push!(fieldbuffers, Buffer(offset, nbytes))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    offset += padding(nbytes)
    if eltype(A) != UInt8
        return makenodesbuffers!(col.data, fieldnodes, fieldbuffers, offset)
    end
    # raw byte values
    nbytes = length(col.data)
    push!(fieldbuffers, Buffer(offset, nbytes))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    return offset + padding(nbytes)
end
# Body of a list or map column: the validity bitmap, the padded offsets, then
# either the padded raw child bytes or the child array's body.
function writebuffer(io, col::Union{Map{T, O, A}, List{T, O, A}}) where {T, O, A}
    @debug 1 "writebuffer: col = $(typeof(col))"
    @debug 2 col
    writebitmap(io, col)
    # write offsets
    n = writearray(io, O, col.offsets.offsets)
    @debug 1 "writing array: col = $(typeof(col.offsets.offsets)), n = $n, padded = $(padding(n))"
    writezeros(io, paddinglength(n))
    if eltype(A) != UInt8
        writebuffer(io, col.data)
        return
    end
    # write values array
    n = writearray(io, UInt8, col.data)
    @debug 1 "writing array: col = $(typeof(col.data)), n = $n, padded = $(padding(n))"
    writezeros(io, paddinglength(n))
    return
end
# Field node and buffers for a fixed-size list column. After the validity
# bitmap comes either one raw byte buffer of `getN(nonmissingtype(T)) * len`
# bytes (when the child storage's eltype is `UInt8`) or the child array's own
# nodes and buffers.
function makenodesbuffers!(col::FixedSizeList{T, A}, fieldnodes, fieldbuffers, bufferoffset) where {T, A}
    nslots = length(col)
    nnull = nullcount(col)
    push!(fieldnodes, FieldNode(nslots, nnull))
    @debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
    # validity bitmap
    nbytes = iszero(nnull) ? 0 : bitpackedbytes(nslots)
    push!(fieldbuffers, Buffer(bufferoffset, nbytes))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    offset = bufferoffset + nbytes
    eltype(A) === UInt8 || return makenodesbuffers!(col.data, fieldnodes, fieldbuffers, offset)
    nbytes = nslots * getN(Base.nonmissingtype(T))
    push!(fieldbuffers, Buffer(offset, nbytes))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    return offset + padding(nbytes)
end
# Body of a fixed-size list column: the validity bitmap, then either the
# padded raw child bytes or the child array's body.
function writebuffer(io, col::FixedSizeList{T, A}) where {T, A}
    @debug 1 "writebuffer: col = $(typeof(col))"
    @debug 2 col
    writebitmap(io, col)
    if eltype(A) !== UInt8
        writebuffer(io, col.data)
        return
    end
    # write values array
    n = writearray(io, UInt8, col.data)
    @debug 1 "writing array: col = $(typeof(col.data)), n = $n, padded = $(padding(n))"
    writezeros(io, paddinglength(n))
    return
end
# Field node and validity bitmap buffer for a struct column, followed by the
# nodes and buffers of each child column in order.
function makenodesbuffers!(col::Struct{T}, fieldnodes, fieldbuffers, bufferoffset) where {T}
    nslots = length(col)
    nnull = nullcount(col)
    push!(fieldnodes, FieldNode(nslots, nnull))
    @debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
    # validity bitmap
    nbytes = iszero(nnull) ? 0 : bitpackedbytes(nslots)
    push!(fieldbuffers, Buffer(bufferoffset, nbytes))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    offset = bufferoffset + nbytes
    for child in col.data
        offset = makenodesbuffers!(child, fieldnodes, fieldbuffers, offset)
    end
    return offset
end
# Body of a struct column: its validity bitmap, then each child's body in order.
function writebuffer(io, col::Struct)
    @debug 1 "writebuffer: col = $(typeof(col))"
    @debug 2 col
    writebitmap(io, col)
    foreach(child -> writebuffer(io, child), col.data)
    return
end
# Field node and buffers for a union column:
#   - one type-id byte per slot,
#   - an Int32 offsets buffer (dense unions only),
#   - then each child's nodes and buffers.
# No validity bitmap buffer is declared.
function makenodesbuffers!(col::Union{DenseUnion, SparseUnion}, fieldnodes, fieldbuffers, bufferoffset)
    nslots = length(col)
    nnull = nullcount(col)
    push!(fieldnodes, FieldNode(nslots, nnull))
    @debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
    # typeIds buffer
    push!(fieldbuffers, Buffer(bufferoffset, nslots))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    offset = bufferoffset + padding(nslots)
    if col isa DenseUnion
        # offsets buffer
        nbytes = nslots * sizeof(Int32)
        push!(fieldbuffers, Buffer(offset, nbytes))
        @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
        offset += padding(nbytes)
    end
    for child in col.data
        offset = makenodesbuffers!(child, fieldnodes, fieldbuffers, offset)
    end
    return offset
end
# Body of a union column: the padded type-id bytes, the padded Int32 offsets
# (dense unions only), then each child's body. No validity bitmap is written.
function writebuffer(io, col::Union{DenseUnion, SparseUnion})
    @debug 1 "writebuffer: col = $(typeof(col))"
    @debug 2 col
    # typeIds buffer
    n = writearray(io, UInt8, col.typeIds)
    @debug 1 "writing array: col = $(typeof(col.typeIds)), n = $n, padded = $(padding(n))"
    writezeros(io, paddinglength(n))
    if col isa DenseUnion
        n = writearray(io, Int32, col.offsets)
        @debug 1 "writing array: col = $(typeof(col.offsets)), n = $n, padded = $(padding(n))"
        writezeros(io, paddinglength(n))
    end
    foreach(child -> writebuffer(io, child), col.data)
    return
end
# Field node and buffers for a dictionary-encoded column: a validity bitmap,
# then one index of type `S` per slot. The dictionary values themselves travel
# in separate dictionary batches.
function makenodesbuffers!(col::DictEncoded{T, S}, fieldnodes, fieldbuffers, bufferoffset) where {T, S}
    nslots = length(col)
    nnull = nullcount(col)
    push!(fieldnodes, FieldNode(nslots, nnull))
    @debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
    # validity bitmap
    bitmaplen = iszero(nnull) ? 0 : bitpackedbytes(nslots)
    push!(fieldbuffers, Buffer(bufferoffset, bitmaplen))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    # indices
    indexoffset = bufferoffset + bitmaplen
    indexlen = nslots * sizeof(S)
    push!(fieldbuffers, Buffer(indexoffset, indexlen))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
    return indexoffset + padding(indexlen)
end
# Body of a dictionary-encoded column: the validity bitmap, then the padded indices.
function writebuffer(io, col::DictEncoded)
    @debug 1 "writebuffer: col = $(typeof(col))"
    @debug 2 col
    writebitmap(io, col)
    # write indices
    n = writearray(io, col.indices)
    @debug 1 "writing array: col = $(typeof(col.indices)), n = $n, padded = $(padding(n))"
    writezeros(io, paddinglength(n))
    return nothing
end
# Field node and buffers for a compressed column. Each buffer of `col.buffers`
# is declared with its compressed size plus an 8-byte length prefix (0 when
# empty), then the children's nodes and buffers follow.
# NOTE(review): the 8 bytes presumably match the `uncompressedlength` written by
# `writearray(io, ::CompressedBuffer)`; confirm it is an Int64.
function makenodesbuffers!(col::Compressed, fieldnodes, fieldbuffers, bufferoffset)
    push!(fieldnodes, FieldNode(col.len, col.nullcount))
    @debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
    offset = bufferoffset
    for buffer in col.buffers
        ndata = length(buffer.data)
        nbytes = ndata == 0 ? 0 : ndata + 8
        push!(fieldbuffers, Buffer(offset, nbytes))
        @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length))"
        offset += padding(nbytes)
    end
    for child in col.children
        offset = makenodesbuffers!(child, fieldnodes, fieldbuffers, offset)
    end
    return offset
end
# Write one compressed buffer: its uncompressed length followed by the
# compressed bytes. An empty buffer writes nothing. Returns the bytes written.
function writearray(io, b::CompressedBuffer)
    length(b.data) == 0 && return 0
    n = Base.write(io, b.uncompressedlength)
    @debug 1 "writing compressed buffer: uncompressedlength = $(b.uncompressedlength), n = $(length(b.data))"
    @debug 2 b.data
    return n + Base.write(io, b.data)
end
# Body of a compressed column: each compressed buffer, zero-padded, then each
# child's body.
function writebuffer(io, col::Compressed)
    @debug 1 "writebuffer: col = $(typeof(col))"
    @debug 2 col
    foreach(col.buffers) do buffer
        writezeros(io, paddinglength(writearray(io, buffer)))
    end
    foreach(child -> writebuffer(io, child), col.children)
    return
end