blob: 85188a0d6c750b8ec402584acf032929852bedbf [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Global registry mapping any object (keyed by identity via IdDict) to its
# attached arrow metadata; written by `setmetadata!`, read by `getmetadata`.
const OBJ_METADATA = IdDict{Any, Dict{String, String}}()
"""
    Arrow.setmetadata!(x, metadata::Dict{String, String})

Attach `metadata` (a `Dict{String, String}`) to any object `x`.
Metadata attached to a table or column will be serialized when written
as a stream or file.
"""
function setmetadata!(x, metadata::Dict{String, String})
    # register (or replace) the metadata for `x` in the global registry
    setindex!(OBJ_METADATA, metadata, x)
    return nothing
end
"""
    Arrow.getmetadata(x, default=nothing) => Dict{String, String}

Retrieve any metadata (as a `Dict{String, String}`) attached to `x`,
returning `default` when none has been attached.
Metadata may be attached to any object via [`Arrow.setmetadata!`](@ref),
or deserialized via the arrow format directly (the format allows attaching metadata
to table, column, and other objects).
"""
function getmetadata(x, default=nothing)
    return get(OBJ_METADATA, x, default)
end
"""
    Arrow.write(io::IO, tbl)
    Arrow.write(file::String, tbl)
    tbl |> Arrow.write(io_or_file)

Write any [Tables.jl](https://github.com/JuliaData/Tables.jl)-compatible `tbl` out as arrow formatted data.
Providing an `io::IO` argument will cause the data to be written to it
in the ["streaming" format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format), unless `file=true` keyword argument is passed.
Providing a `file::String` argument will result in the ["file" format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format) being written.

Multiple record batches will be written based on the number of
`Tables.partitions(tbl)` that are provided; by default, this is just
one for a given table, but some table sources support automatic
partitioning. Note you can turn multiple table objects into partitions
by doing `Tables.partitioner([tbl1, tbl2, ...])`, but note that
each table must have the exact same `Tables.Schema`.

By default, `Arrow.write` will use multiple threads to write multiple
record batches simultaneously (e.g. if julia is started with `julia -t 8` or the `JULIA_NUM_THREADS` environment variable is set).

Supported keyword arguments to `Arrow.write` include:
* `compress`: possible values include `:lz4`, `:zstd`, or your own initialized `LZ4FrameCompressor` or `ZstdCompressor` objects; will cause all buffers in each record batch to use the respective compression encoding
* `alignment::Int=8`: specify the number of bytes to align buffers to when written in messages; strongly recommended to only use alignment values of 8 or 64 for modern memory cache line optimization
* `dictencode::Bool=false`: whether all columns should use dictionary encoding when being written; to dict encode specific columns, wrap the column/array in `Arrow.DictEncode(col)`
* `dictencodenested::Bool=false`: whether nested data type columns should also dict encode nested arrays/buffers; other language implementations [may not support this](https://arrow.apache.org/docs/status.html)
* `denseunions::Bool=true`: whether Julia `Vector{<:Union}` arrays should be written using the dense union layout; passing `false` will result in the sparse union layout
* `largelists::Bool=false`: causes list column types to be written with Int64 offset arrays; mainly for testing purposes; by default, Int64 offsets will be used only if needed
* `file::Bool=false`: if an `io` argument is being written to, passing `file=true` will cause the arrow file format to be written instead of just IPC streaming
"""
function write end
# Curried form: `Arrow.write(io_or_file; kw...)` returns a closure so a table
# can be piped in, e.g. `tbl |> Arrow.write(path)`.
function write(io_or_file; kw...)
    return x -> write(io_or_file, x; kw...)
end
# Write `tbl` to the path `file` in the arrow "file" format; returns `file`.
function write(file::String, tbl; largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor, ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Int=8)
    io = open(file, "w")
    try
        # `true` selects the file format (magic bytes + footer) in the core writer
        write(io, tbl, true, largelists, compress, denseunions, dictencode, dictencodenested, alignment)
    finally
        close(io)
    end
    return file
end
# Write `tbl` to an already-open `io`; the `file` keyword selects between the
# IPC streaming format (default) and the file format. Returns the core
# writer's result (the `io`).
function write(io::IO, tbl; largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor, ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Int=8, file::Bool=false)
    # delegate all work to the shared positional-argument core writer
    result = write(io, tbl, file, largelists, compress, denseunions, dictencode, dictencodenested, alignment)
    return result
end
# Core writer shared by both `write` entrypoints. Serializes `source` as
# arrow IPC data to `io`; `writetofile=true` selects the "file" format
# (magic bytes + footer) instead of the plain streaming format. Returns `io`.
function write(io, source, writetofile, largelists, compress, denseunions, dictencode, dictencodenested, alignment)
    # resolve symbolic compression choices to the shared compressor instances
    if compress === :lz4
        compress = LZ4_FRAME_COMPRESSOR[]
    elseif compress === :zstd
        compress = ZSTD_COMPRESSOR[]
    elseif compress isa Symbol
        throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
    end
    if writetofile
        @debug 1 "starting write of arrow formatted file"
        # the file format opens with the magic number, padded to 8 bytes
        Base.write(io, "ARROW1\0\0")
    end
    # channel of messages to write; delivers messages in partition order even
    # when later record batches are built concurrently
    msgs = OrderedChannel{Message}(Inf)
    # build messages
    sch = Ref{Tables.Schema}()  # set while processing the first partition
    firstcols = Ref{Any}()      # first partition's converted columns; reused for the footer schema
    dictencodings = Dict{Int64, Any}() # Lockable{DictEncoding}
    # (record batch blocks, dictionary batch blocks) recorded for the file footer
    blocks = (Block[], Block[])
    # start message writing from channel
    tsk = Threads.@spawn for msg in msgs
        Base.write(io, msg, blocks, sch, alignment)
    end
    @sync for (i, tbl) in enumerate(Tables.partitions(source))
        @debug 1 "processing table partition i = $i"
        if i == 1
            # the first partition is handled synchronously: it establishes the
            # schema and must enqueue the schema (and any dictionary batches)
            # before any record batch
            cols = toarrowtable(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested)
            sch[] = Tables.schema(cols)
            firstcols[] = cols
            put!(msgs, makeschemamsg(sch[], cols), i)
            if !isempty(dictencodings)
                des = sort!(collect(dictencodings); by=x->x.first, rev=true)
                for (id, delock) in des
                    # assign dict encoding ids
                    de = delock.x
                    dictsch = Tables.Schema((:col,), (eltype(de.data),))
                    put!(msgs, makedictionarybatchmsg(dictsch, (col=de.data,), id, false, alignment), i)
                end
            end
            put!(msgs, makerecordbatchmsg(sch[], cols, alignment), i, true)
        else
            # subsequent partitions are converted on worker tasks; the ordered
            # channel preserves partition order in the output stream
            Threads.@spawn begin
                cols = toarrowtable(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested)
                if !isempty(cols.dictencodingdeltas)
                    # dictionary values first seen in this partition are sent
                    # as delta dictionary batches ahead of the record batch
                    for de in cols.dictencodingdeltas
                        dictsch = Tables.Schema((:col,), (eltype(de.data),))
                        put!(msgs, makedictionarybatchmsg(dictsch, (col=de.data,), de.id, true, alignment), i)
                    end
                end
                put!(msgs, makerecordbatchmsg(sch[], cols, alignment), i, true)
            end
        end
    end
    # close our message-writing channel, no further put!-ing is allowed
    close(msgs)
    # now wait for our message-writing task to finish writing
    wait(tsk)
    # write empty message
    if !writetofile
        # the streaming format ends with an end-of-stream marker (empty message)
        Base.write(io, Message(UInt8[], nothing, 0, true, false), blocks, sch, alignment)
    end
    if writetofile
        # build and append the file footer: the schema plus the recorded block
        # locations of every record/dictionary batch
        b = FlatBuffers.Builder(1024)
        schfoot = makeschema(b, sch[], firstcols[])
        if !isempty(blocks[1])
            N = length(blocks[1])
            # flatbuffers vectors are built back-to-front, hence the reverse
            Meta.footerStartRecordBatchesVector(b, N)
            for blk in Iterators.reverse(blocks[1])
                Meta.createBlock(b, blk.offset, blk.metaDataLength, blk.bodyLength)
            end
            recordbatches = FlatBuffers.endvector!(b, N)
        else
            recordbatches = FlatBuffers.UOffsetT(0)
        end
        if !isempty(blocks[2])
            N = length(blocks[2])
            Meta.footerStartDictionariesVector(b, N)
            for blk in Iterators.reverse(blocks[2])
                Meta.createBlock(b, blk.offset, blk.metaDataLength, blk.bodyLength)
            end
            dicts = FlatBuffers.endvector!(b, N)
        else
            dicts = FlatBuffers.UOffsetT(0)
        end
        Meta.footerStart(b)
        Meta.footerAddVersion(b, Meta.MetadataVersion.V4)
        Meta.footerAddSchema(b, schfoot)
        Meta.footerAddDictionaries(b, dicts)
        Meta.footerAddRecordBatches(b, recordbatches)
        foot = Meta.footerEnd(b)
        FlatBuffers.finish!(b, foot)
        footer = FlatBuffers.finishedbytes(b)
        Base.write(io, footer)
        # footer byte length followed by the closing magic terminate the file
        Base.write(io, Int32(length(footer)))
        Base.write(io, "ARROW1")
    end
    return io
end
# Wrapper produced by `toarrowtable`: the input table's columns converted to
# arrow-serializable vectors, plus any table-level metadata and dictionary
# encoding deltas discovered during conversion.
struct ToArrowTable
    sch::Tables.Schema                              # schema with arrow-converted element types
    cols::Vector{Any}                               # the converted columns
    metadata::Union{Nothing, Dict{String, String}}  # table-level metadata, if attached
    dictencodingdeltas::Vector{DictEncoding}        # new dictionary values needing delta batches
end
# Convert any Tables.jl-compatible input `x` into a `ToArrowTable` whose
# columns are arrow-serializable vectors.
#
# Each column is converted via `toarrowvector`, which may register dictionary
# encodings in `dictencodings` and append delta encodings to the returned
# table's `dictencodingdeltas`. Throws an `ArgumentError` if the input
# columns do not all have the same length.
function toarrowtable(x, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested)
    @debug 1 "converting input table to arrow formatted columns"
    cols = Tables.columns(x)
    meta = getmetadata(cols)
    sch = Tables.schema(cols)
    types = collect(sch.types)
    N = length(types)
    newcols = Vector{Any}(undef, N)
    newtypes = Vector{Type}(undef, N)
    dictencodingdeltas = DictEncoding[]
    Tables.eachcolumn(sch, cols) do col, i, nm
        newcol = toarrowvector(col, i, dictencodings, dictencodingdeltas; compression=compress, largelists=largelists, denseunions=denseunions, dictencode=dictencode, dictencodenested=dictencodenested)
        newtypes[i] = eltype(newcol)
        newcols[i] = newcol
    end
    # validate that all columns have equal lengths; guard the zero-column case
    # explicitly since `extrema` throws on an empty collection
    if isempty(newcols)
        minlen = maxlen = 0
    else
        minlen, maxlen = extrema(length, newcols)
    end
    minlen == maxlen || throw(ArgumentError("columns with unequal lengths detected: $minlen < $maxlen"))
    return ToArrowTable(Tables.Schema(sch.names, newtypes), newcols, meta, dictencodingdeltas)
end
# Tables.jl interface for `ToArrowTable`: the wrapper acts as its own
# column store.
Tables.columns(t::ToArrowTable) = t
Tables.rowcount(t::ToArrowTable) = isempty(t.cols) ? 0 : length(first(t.cols))
Tables.schema(t::ToArrowTable) = t.sch
Tables.columnnames(t::ToArrowTable) = t.sch.names
Tables.getcolumn(t::ToArrowTable, i::Int) = t.cols[i]
# A single arrow IPC message, queued on the ordered channel and written out
# by `Base.write(io, msg, ...)` below.
struct Message
    msgflatbuf           # flatbuffer-serialized message metadata bytes
    columns              # columns whose buffers form the message body, or `nothing` for body-less messages
    bodylen              # byte length of the message body
    isrecordbatch::Bool  # true for record batch messages, false otherwise (e.g. dictionary batches)
    blockmsg::Bool       # whether a `Block` entry should be recorded for the file footer
end
# Footer bookkeeping for the arrow "file" format: where a message lives in
# the file and how large its parts are.
struct Block
    offset::Int64          # byte position of the message start within the file
    metaDataLength::Int32  # metadata length incl. the 8-byte continuation/length prefix
    bodyLength::Int64      # byte length of the message body
end
# Write a single IPC `msg` to `io`: continuation marker, metadata length,
# flatbuffer metadata (padded to `alignment`), then the message body buffers.
# When the message belongs in the file footer, records a `Block` in `blocks`
# (index 1 for record batches, index 2 for dictionary batches).
# Returns the count of metadata bytes written; bytes written by `writebuffer`
# for the body are NOT included in the returned count.
function Base.write(io::IO, msg::Message, blocks, sch, alignment)
    metalen = padding(length(msg.msgflatbuf), alignment)
    @debug 1 "writing message: metalen = $metalen, bodylen = $(msg.bodylen), isrecordbatch = $(msg.isrecordbatch)"
    if msg.blockmsg
        # +8 accounts for the 4-byte continuation marker and 4-byte metadata length
        push!(blocks[msg.isrecordbatch ? 1 : 2], Block(position(io), metalen + 8, msg.bodylen))
    end
    # now write the final message spec out
    # continuation byte
    n = Base.write(io, 0xFFFFFFFF)
    # metadata length
    n += Base.write(io, Int32(metalen))
    # message flatbuffer
    n += Base.write(io, msg.msgflatbuf)
    n += writezeros(io, paddinglength(length(msg.msgflatbuf), alignment))
    # message body
    if msg.columns !== nothing
        # write out buffers
        for col in Tables.Columns(msg.columns)
            writebuffer(io, col, alignment)
        end
    end
    return n
end
# Wrap an already-built `header` flatbuffer table of kind `headerType` in a
# Message flatbuffer, finish the builder, and return a `Message` carrying the
# serialized bytes plus the body `columns`/`bodylen`.
function makemessage(b, headerType, header, columns=nothing, bodylen=0)
    Meta.messageStart(b)
    Meta.messageAddVersion(b, Meta.MetadataVersion.V5)
    Meta.messageAddHeaderType(b, headerType)
    Meta.messageAddHeader(b, header)
    Meta.messageAddBodyLength(b, Int64(bodylen))
    msgoff = Meta.messageEnd(b)
    FlatBuffers.finish!(b, msgoff)
    # record-batch/dictionary-batch messages get Block entries in the footer
    isrecordbatch = headerType == Meta.RecordBatch
    isblockmsg = isrecordbatch || headerType == Meta.DictionaryBatch
    return Message(FlatBuffers.finishedbytes(b), columns, bodylen, isrecordbatch, isblockmsg)
end
# Serialize a flatbuffer Schema table for `sch` into builder `b`, including a
# Field entry per column (via `fieldoffset`) and any table-level custom
# metadata attached to `columns`. Returns the schema's flatbuffer offset.
function makeschema(b, sch::Tables.Schema{names}, columns) where {names}
    # build Field objects
    N = length(names)
    fieldoffsets = [fieldoffset(b, names[i], columns.cols[i]) for i = 1:N]
    # flatbuffers vectors are built back-to-front, hence the reverse iteration
    Meta.schemaStartFieldsVector(b, N)
    for off in Iterators.reverse(fieldoffsets)
        FlatBuffers.prependoffset!(b, off)
    end
    fields = FlatBuffers.endvector!(b, N)
    if columns.metadata !== nothing
        # serialize table-level metadata as a vector of KeyValue tables
        kvs = columns.metadata
        kvoffs = Vector{FlatBuffers.UOffsetT}(undef, length(kvs))
        for (i, (k, v)) in enumerate(kvs)
            koff = FlatBuffers.createstring!(b, String(k))
            voff = FlatBuffers.createstring!(b, String(v))
            Meta.keyValueStart(b)
            Meta.keyValueAddKey(b, koff)
            Meta.keyValueAddValue(b, voff)
            kvoffs[i] = Meta.keyValueEnd(b)
        end
        Meta.schemaStartCustomMetadataVector(b, length(kvs))
        for off in Iterators.reverse(kvoffs)
            FlatBuffers.prependoffset!(b, off)
        end
        meta = FlatBuffers.endvector!(b, length(kvs))
    else
        meta = FlatBuffers.UOffsetT(0)
    end
    # write schema object
    Meta.schemaStart(b)
    Meta.schemaAddEndianness(b, Meta.Endianness.Little)
    Meta.schemaAddFields(b, fields)
    Meta.schemaAddCustomMetadata(b, meta)
    return Meta.schemaEnd(b)
end
# Build a complete schema `Message` for `sch`/`columns`, ready to enqueue on
# the message channel.
function makeschemamsg(sch::Tables.Schema, columns)
    @debug 1 "building schema message: sch = $sch"
    builder = FlatBuffers.Builder(1024)
    schemaoff = makeschema(builder, sch, columns)
    return makemessage(builder, Meta.Schema, schemaoff)
end
# Serialize a flatbuffer Field table for column `col` named `name` into
# builder `b`: name, nullability, optional per-column custom metadata,
# optional dictionary-encoding info, the arrow type, and children.
# Returns the field's flatbuffer offset.
function fieldoffset(b, name, col)
    nameoff = FlatBuffers.createstring!(b, String(name))
    T = eltype(col)
    # a column is nullable if its element type admits `Missing`
    nullable = T >: Missing
    # check for custom metadata
    if getmetadata(col) !== nothing
        kvs = getmetadata(col)
        kvoffs = Vector{FlatBuffers.UOffsetT}(undef, length(kvs))
        for (i, (k, v)) in enumerate(kvs)
            koff = FlatBuffers.createstring!(b, String(k))
            voff = FlatBuffers.createstring!(b, String(v))
            Meta.keyValueStart(b)
            Meta.keyValueAddKey(b, koff)
            Meta.keyValueAddValue(b, voff)
            kvoffs[i] = Meta.keyValueEnd(b)
        end
        # flatbuffers vectors are built back-to-front, hence the reverse
        Meta.fieldStartCustomMetadataVector(b, length(kvs))
        for off in Iterators.reverse(kvoffs)
            FlatBuffers.prependoffset!(b, off)
        end
        meta = FlatBuffers.endvector!(b, length(kvs))
    else
        meta = FlatBuffers.UOffsetT(0)
    end
    # build dictionary
    if isdictencoded(col)
        encodingtype = indtype(col)
        IT, inttype, _ = arrowtype(b, encodingtype)
        Meta.dictionaryEncodingStart(b)
        Meta.dictionaryEncodingAddId(b, Int64(getid(col)))
        Meta.dictionaryEncodingAddIndexType(b, inttype)
        # TODO: support isOrdered?
        Meta.dictionaryEncodingAddIsOrdered(b, false)
        dict = Meta.dictionaryEncodingEnd(b)
    else
        dict = FlatBuffers.UOffsetT(0)
    end
    type, typeoff, children = arrowtype(b, col)
    if children !== nothing
        Meta.fieldStartChildrenVector(b, length(children))
        for off in Iterators.reverse(children)
            FlatBuffers.prependoffset!(b, off)
        end
        children = FlatBuffers.endvector!(b, length(children))
    else
        # always emit a children vector, empty when the type has none
        Meta.fieldStartChildrenVector(b, 0)
        children = FlatBuffers.endvector!(b, 0)
    end
    # build field object
    if isdictencoded(col)
        @debug 1 "building field: name = $name, nullable = $nullable, T = $T, type = $type, inttype = $IT, dictionary id = $(getid(col))"
    else
        @debug 1 "building field: name = $name, nullable = $nullable, T = $T, type = $type"
    end
    Meta.fieldStart(b)
    Meta.fieldAddName(b, nameoff)
    Meta.fieldAddNullable(b, nullable)
    Meta.fieldAddTypeType(b, type)
    Meta.fieldAddType(b, typeoff)
    Meta.fieldAddDictionary(b, dict)
    Meta.fieldAddChildren(b, children)
    Meta.fieldAddCustomMetadata(b, meta)
    return Meta.fieldEnd(b)
end
# Mirrors the arrow FieldNode flatbuffer struct: per-column (and per nested
# child) element count and null count within a record batch.
struct FieldNode
    length::Int64
    null_count::Int64
end
# Mirrors the arrow Buffer flatbuffer struct: byte offset and length of a
# single buffer within a message body.
struct Buffer
    offset::Int64
    length::Int64
end
# Build a complete record batch `Message` for `columns`, ready to enqueue on
# the message channel.
function makerecordbatchmsg(sch::Tables.Schema{names, types}, columns, alignment) where {names, types}
    builder = FlatBuffers.Builder(1024)
    batchoff, bodylen = makerecordbatch(builder, sch, columns, alignment)
    return makemessage(builder, Meta.RecordBatch, batchoff, columns, bodylen)
end
# Serialize a flatbuffer RecordBatch table for `columns` into builder `b`:
# collects per-column field nodes and buffer descriptors, records any buffer
# compression, and returns `(recordbatch_offset, bodylen)` where `bodylen` is
# the total padded byte length of the message body.
function makerecordbatch(b, sch::Tables.Schema{names, types}, columns, alignment) where {names, types}
    nrows = Tables.rowcount(columns)
    compress = nothing
    fieldnodes = FieldNode[]
    fieldbuffers = Buffer[]
    bufferoffset = 0
    for col in Tables.Columns(columns)
        # if any column is compressed, the batch records that codec
        if col isa Compressed
            compress = compressiontype(col)
        end
        bufferoffset = makenodesbuffers!(col, fieldnodes, fieldbuffers, bufferoffset, alignment)
    end
    @debug 1 "building record batch message: nrows = $nrows, sch = $sch, compress = $compress"
    # write field nodes objects
    FN = length(fieldnodes)
    # flatbuffers vectors are built back-to-front, hence the reverse iteration
    Meta.recordBatchStartNodesVector(b, FN)
    for fn in Iterators.reverse(fieldnodes)
        Meta.createFieldNode(b, fn.length, fn.null_count)
    end
    nodes = FlatBuffers.endvector!(b, FN)
    # write buffer objects
    bodylen = 0
    BN = length(fieldbuffers)
    Meta.recordBatchStartBuffersVector(b, BN)
    for buf in Iterators.reverse(fieldbuffers)
        Meta.createBuffer(b, buf.offset, buf.length)
        # each buffer occupies a padded slot of `alignment` multiples in the body
        bodylen += padding(buf.length, alignment)
    end
    buffers = FlatBuffers.endvector!(b, BN)
    # compression
    if compress !== nothing
        Meta.bodyCompressionStart(b)
        Meta.bodyCompressionAddCodec(b, compress)
        Meta.bodyCompressionAddMethod(b, Meta.BodyCompressionMethod.BUFFER)
        compression = Meta.bodyCompressionEnd(b)
    else
        compression = FlatBuffers.UOffsetT(0)
    end
    # write record batch object
    @debug 1 "built record batch message: nrows = $nrows, nodes = $fieldnodes, buffers = $fieldbuffers, compress = $compress, bodylen = $bodylen"
    Meta.recordBatchStart(b)
    Meta.recordBatchAddLength(b, Int64(nrows))
    Meta.recordBatchAddNodes(b, nodes)
    Meta.recordBatchAddBuffers(b, buffers)
    Meta.recordBatchAddCompression(b, compression)
    return Meta.recordBatchEnd(b), bodylen
end
# Build a dictionary batch `Message`: the dictionary values are serialized as
# a record batch and wrapped in a DictionaryBatch table carrying the encoding
# `id`; `isdelta=true` marks an incremental (delta) dictionary.
function makedictionarybatchmsg(sch, columns, id, isdelta, alignment)
    @debug 1 "building dictionary message: id = $id, sch = $sch, isdelta = $isdelta"
    builder = FlatBuffers.Builder(1024)
    batchoff, bodylen = makerecordbatch(builder, sch, columns, alignment)
    Meta.dictionaryBatchStart(builder)
    Meta.dictionaryBatchAddId(builder, Int64(id))
    Meta.dictionaryBatchAddData(builder, batchoff)
    Meta.dictionaryBatchAddIsDelta(builder, isdelta)
    dictbatchoff = Meta.dictionaryBatchEnd(builder)
    return makemessage(builder, Meta.DictionaryBatch, dictbatchoff, columns, bodylen)
end