blob: eca7c4d8b8fa86ae0170d332569fa533e3c7830c [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Arrow.DictEncoding
Represents the "pool" of possible values for a [`DictEncoded`](@ref)
array type. Whether the order of values is significant can be checked
by looking at the `isOrdered` boolean field.
"""
mutable struct DictEncoding{T, A} <: ArrowVector{T}
id::Int64
data::A
isOrdered::Bool
metadata::Union{Nothing, Dict{String, String}}
end
Base.size(d::DictEncoding) = size(d.data)
@propagate_inbounds function Base.getindex(d::DictEncoding{T}, i::Integer) where {T}
@boundscheck checkbounds(d, i)
return @inbounds ArrowTypes.arrowconvert(T, d.data[i])
end
# convenience wrapper to signal that an input column should be
# dict encoded when written to the arrow format
struct DictEncodeType{T} end
getT(::Type{DictEncodeType{T}}) where {T} = T
"""
Arrow.DictEncode(::AbstractVector, id::Integer=nothing)
Signals that a column/array should be dictionary encoded when serialized
to the arrow streaming/file format. An optional `id` number may be provided
to signal that multiple columns should use the same pool when being
dictionary encoded.
"""
struct DictEncode{T, A} <: AbstractVector{DictEncodeType{T}}
id::Int64
data::A
end
DictEncode(x::A, id=-1) where {A} = DictEncode{eltype(A), A}(id, x)
Base.IndexStyle(::Type{<:DictEncode}) = Base.IndexLinear()
Base.size(x::DictEncode) = (length(x.data),)
Base.iterate(x::DictEncode, st...) = iterate(x.data, st...)
Base.getindex(x::DictEncode, i::Int) = getindex(x.data, i)
ArrowTypes.ArrowType(::Type{<:DictEncodeType}) = DictEncodedType()
"""
Arrow.DictEncoded
A dictionary encoded array type (similar to a `PooledArray`). Behaves just
like a normal array in most respects; internally, possible values are stored
in the `encoding::DictEncoding` field, while the `indices::Vector{<:Integer}`
field holds the "codes" of each element for indexing into the encoding pool.
Any column/array can be dict encoding when serializing to the arrow format
either by passing the `dictencode=true` keyword argument to [`Arrow.write`](@ref)
(which causes _all_ columns to be dict encoded), or wrapping individual columns/
arrays in [`Arrow.DictEncode(x)`](@ref).
"""
struct DictEncoded{T, S, A} <: ArrowVector{T}
arrow::Vector{UInt8} # need to hold a reference to arrow memory blob
validity::ValidityBitmap
indices::Vector{S}
encoding::DictEncoding{T, A}
metadata::Union{Nothing, Dict{String, String}}
end
DictEncoded(b::Vector{UInt8}, v::ValidityBitmap, inds::Vector{S}, encoding::DictEncoding{T, A}, meta) where {S, T, A} =
DictEncoded{T, S, A}(b, v, inds, encoding, meta)
Base.size(d::DictEncoded) = size(d.indices)
isdictencoded(d::DictEncoded) = true
isdictencoded(x) = false
isdictencoded(c::Compressed{Z, A}) where {Z, A <: DictEncoded} = true
signedtype(::Type{UInt8}) = Int8
signedtype(::Type{UInt16}) = Int16
signedtype(::Type{UInt32}) = Int32
signedtype(::Type{UInt64}) = Int64
indtype(d::DictEncoded{T, S, A}) where {T, S, A} = S
indtype(c::Compressed{Z, A}) where {Z, A <: DictEncoded} = indtype(c.data)
dictencodeid(colidx, nestedlevel, fieldid) = (Int64(nestedlevel) << 48) | (Int64(fieldid) << 32) | Int64(colidx)
getid(d::DictEncoded) = d.encoding.id
getid(c::Compressed{Z, A}) where {Z, A <: DictEncoded} = c.data.encoding.id
arrowvector(::DictEncodedType, x::DictEncoded, i, nl, fi, de, ded, meta; kw...) = x
function arrowvector(::DictEncodedType, x, i, nl, fi, de, ded, meta; dictencode::Bool=false, dictencodenested::Bool=false, kw...)
@assert x isa DictEncode
id = x.id == -1 ? dictencodeid(i, nl, fi) : x.id
x = x.data
len = length(x)
validity = ValidityBitmap(x)
if !haskey(de, id)
# dict encoding doesn't exist yet, so create for 1st time
if DataAPI.refarray(x) === x
# need to encode ourselves
x = PooledArray(x, encodingtype(length(x)))
inds = DataAPI.refarray(x)
else
inds = copy(DataAPI.refarray(x))
end
# adjust to "offset" instead of index
for i = 1:length(inds)
@inbounds inds[i] -= 1
end
pool = DataAPI.refpool(x)
# horrible hack? yes. better than taking CategoricalArrays dependency? also yes.
if typeof(pool).name.name == :CategoricalRefPool
pool = [get(pool[i]) for i = 1:length(pool)]
end
data = arrowvector(pool, i, nl, fi, de, ded, nothing; dictencode=dictencodenested, dictencodenested=dictencodenested, dictencoding=true, kw...)
encoding = DictEncoding{eltype(data), typeof(data)}(id, data, false, getmetadata(data))
de[id] = Lockable(encoding)
else
# encoding already exists
# compute inds based on it
# if value doesn't exist in encoding, push! it
# also add to deltas updates
encodinglockable = de[id]
@lock encodinglockable begin
encoding = encodinglockable.x
len = length(x)
ET = encodingtype(len)
pool = Dict{Union{eltype(encoding), eltype(x)}, ET}(a => (b - 1) for (b, a) in enumerate(encoding))
deltas = eltype(x)[]
inds = Vector{ET}(undef, len)
categorical = typeof(x).name.name == :CategoricalArray
for (j, val) in enumerate(x)
if categorical
val = get(val)
end
@inbounds inds[j] = get!(pool, val) do
push!(deltas, val)
length(pool)
end
end
if !isempty(deltas)
data = arrowvector(deltas, i, nl, fi, de, ded, nothing; dictencode=dictencodenested, dictencodenested=dictencodenested, dictencoding=true, kw...)
push!(ded, DictEncoding{eltype(data), typeof(data)}(id, data, false, getmetadata(data)))
if typeof(encoding.data) <: ChainedVector
append!(encoding.data, data)
else
data2 = ChainedVector([encoding.data, data])
encoding = DictEncoding{eltype(data2), typeof(data2)}(id, data2, false, getmetadata(encoding))
de[id] = Lockable(encoding)
end
end
end
end
if meta !== nothing && getmetadata(encoding) !== nothing
merge!(meta, getmetadata(encoding))
elseif getmetadata(encoding) !== nothing
meta = getmetadata(encoding)
end
return DictEncoded(UInt8[], validity, inds, encoding, meta)
end
@propagate_inbounds function Base.getindex(d::DictEncoded, i::Integer)
@boundscheck checkbounds(d, i)
@inbounds valid = d.validity[i]
!valid && return missing
@inbounds idx = d.indices[i]
return @inbounds d.encoding[idx + 1]
end
@propagate_inbounds function Base.setindex!(d::DictEncoded{T}, v, i::Integer) where {T}
@boundscheck checkbounds(d, i)
if v === missing
@inbounds d.validity[i] = false
else
ix = findfirst(d.encoding.data, v)
if ix === nothing
push!(d.encoding.data, v)
@inbounds d.indices[i] = length(d.encoding.data) - 1
else
@inbounds d.indices[i] = ix - 1
end
end
return v
end
function Base.copy(x::DictEncoded{T, S}) where {T, S}
pool = copy(x.encoding.data)
valid = x.validity
inds = x.indices
refs = copy(inds)
@inbounds for i = 1:length(inds)
refs[i] = refs[i] + one(S)
end
return PooledArray(PooledArrays.RefArray(refs), Dict{T, S}(val => i for (i, val) in enumerate(pool)), pool)
end
function compress(Z::Meta.CompressionType, comp, x::A) where {A <: DictEncoded}
len = length(x)
nc = nullcount(x)
validity = compress(Z, comp, x.validity)
inds = compress(Z, comp, x.indices)
return Compressed{Z, A}(x, [validity, inds], len, nc, Compressed[])
end
function makenodesbuffers!(col::DictEncoded{T, S}, fieldnodes, fieldbuffers, bufferoffset, alignment) where {T, S}
len = length(col)
nc = nullcount(col)
push!(fieldnodes, FieldNode(len, nc))
@debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
# validity bitmap
blen = nc == 0 ? 0 : bitpackedbytes(len, alignment)
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += blen
# indices
blen = sizeof(S) * len
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += padding(blen, alignment)
return bufferoffset
end
function writebuffer(io, col::DictEncoded, alignment)
@debug 1 "writebuffer: col = $(typeof(col))"
@debug 2 col
writebitmap(io, col, alignment)
# write indices
n = writearray(io, col.indices)
@debug 1 "writing array: col = $(typeof(col.indices)), n = $n, padded = $(padding(n, alignment))"
writezeros(io, paddinglength(n, alignment))
return
end