# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
import pyarrow as pa
import numpy as np

dtypes = ['uint8', 'int16', 'float32']
cuda = pytest.importorskip("pyarrow.cuda")
nb_cuda = pytest.importorskip("numba.cuda")

from numba.cuda.cudadrv.devicearray import DeviceNDArray  # noqa: E402

context_choices = None
context_choice_ids = ['pyarrow.cuda', 'numba.cuda']
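

# setup_module() fills context_choices with two equivalent (pyarrow, numba)
# context pairs: one context created by pyarrow.cuda and exported to numba,
# and one taken from numba.cuda and wrapped by pyarrow.cuda.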
def setup_module(module):
    np.random.seed(1234)
    ctx1 = cuda.Context()
    nb_ctx1 = ctx1.to_numba()
    nb_ctx2 = nb_cuda.current_context()
    ctx2 = cuda.Context.from_numba(nb_ctx2)
    module.context_choices = [(ctx1, nb_ctx1), (ctx2, nb_ctx2)]
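

# Drop the module-level context references once all tests have run.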
def teardown_module(module):
    del module.context_choices
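

# Check that a context converted between pyarrow and numba refers to the same
# underlying CUDA context handle, and that buffers remember their context.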
@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
def test_context(c):
    ctx, nb_ctx = context_choices[c]
    assert ctx.handle == nb_ctx.handle.value
    assert ctx.handle == ctx.to_numba().handle.value
    ctx2 = cuda.Context.from_numba(nb_ctx)
    assert ctx.handle == ctx2.handle
    size = 10
    buf = ctx.new_buffer(size)
    assert ctx.handle == buf.context.handle
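

# Helper used by the tests below: the returned numpy array mirrors the
# buffer contents, so data copied back from the device can be compared
# against it.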
def make_random_buffer(size, target='host', dtype='uint8', ctx=None):
    """Return a (numpy array, buffer) pair with random data.

    The buffer is allocated on the host or, for target='device', on the
    device of the given context; the numpy array holds the same values.
    """
    dtype = np.dtype(dtype)
    if target == 'host':
        assert size >= 0
        buf = pa.allocate_buffer(size * dtype.itemsize)
        arr = np.frombuffer(buf, dtype=dtype)
        arr[:] = np.random.randint(low=0, high=255, size=size,
                                   dtype=np.uint8)
        return arr, buf
    elif target == 'device':
        arr, buf = make_random_buffer(size, target='host', dtype=dtype)
        dbuf = ctx.new_buffer(size * dtype.itemsize)
        dbuf.copy_from_host(buf, position=0, nbytes=buf.size)
        return arr, dbuf
    raise ValueError('invalid target value')
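

# Construct CudaBuffers from objects that expose device memory: numba device
# arrays, contiguous slices/reshapes of them, and any object implementing the
# CUDA array interface.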
@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
@pytest.mark.parametrize("size", [0, 1, 8, 1000])
def test_from_object(c, dtype, size):
    ctx, nb_ctx = context_choices[c]
    arr, cbuf = make_random_buffer(size, target='device', dtype=dtype, ctx=ctx)

    # Creating device buffer from numba DeviceNDArray:
    darr = nb_cuda.to_device(arr)
    cbuf2 = ctx.buffer_from_object(darr)
    assert cbuf2.size == cbuf.size
    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
    np.testing.assert_equal(arr, arr2)
    # Creating device buffer from a slice of numba DeviceNDArray:
    if size >= 8:
        # 1-D arrays
        for s in [slice(size//4, None, None),
                  slice(size//4, -(size//4), None)]:
            cbuf2 = ctx.buffer_from_object(darr[s])
            arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
            np.testing.assert_equal(arr[s], arr2)

        # Negative strides cannot be tested due to a numba bug,
        # see Numba issue 3705.
        if 0:
            rdarr = darr[::-1]
            cbuf2 = ctx.buffer_from_object(rdarr)
            assert cbuf2.size == cbuf.size
            arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
            np.testing.assert_equal(arr, arr2)
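
        # A strided (non-contiguous) view cannot be exported as a flat buffer: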
        with pytest.raises(ValueError,
                           match=('array data is non-contiguous')):
            ctx.buffer_from_object(darr[::2])

        # a rectangular 2-D array
        s1 = size//4
        s2 = size//s1
        assert s1 * s2 == size
        cbuf2 = ctx.buffer_from_object(darr.reshape(s1, s2))
        assert cbuf2.size == cbuf.size
        arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
        np.testing.assert_equal(arr, arr2)

        with pytest.raises(ValueError,
                           match=('array data is non-contiguous')):
            ctx.buffer_from_object(darr.reshape(s1, s2)[:, ::2])

        # a 3-D array
        s1 = 4
        s2 = size//8
        s3 = size//(s1*s2)
        assert s1 * s2 * s3 == size
        cbuf2 = ctx.buffer_from_object(darr.reshape(s1, s2, s3))
        assert cbuf2.size == cbuf.size
        arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
        np.testing.assert_equal(arr, arr2)

        with pytest.raises(ValueError,
                           match=('array data is non-contiguous')):
            ctx.buffer_from_object(darr.reshape(s1, s2, s3)[::2])
    # Creating device buffer from an object implementing the CUDA array
    # interface:
    class MyObj:
        def __init__(self, darr):
            self.darr = darr

        @property
        def __cuda_array_interface__(self):
            return self.darr.__cuda_array_interface__

    cbuf2 = ctx.buffer_from_object(MyObj(darr))
    assert cbuf2.size == cbuf.size
    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
    np.testing.assert_equal(arr, arr2)
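

# Wrap memory allocated through the numba context with a pyarrow CudaBuffer
# and verify both sides see the same data.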
@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
def test_numba_memalloc(c, dtype):
    ctx, nb_ctx = context_choices[c]
    dtype = np.dtype(dtype)
    # Allocate memory using the numba context.
    # Warning: this will not be reflected in the pyarrow context manager
    # (e.g. bytes_allocated does not change).
    size = 10
    mem = nb_ctx.memalloc(size * dtype.itemsize)
    darr = DeviceNDArray((size,), (dtype.itemsize,), dtype, gpu_data=mem)
    darr[:5] = 99
    darr[5:] = 88
    np.testing.assert_equal(darr.copy_to_host()[:5], 99)
    np.testing.assert_equal(darr.copy_to_host()[5:], 88)

    # wrap numba allocated memory with CudaBuffer
    cbuf = cuda.CudaBuffer.from_numba(mem)
    arr2 = np.frombuffer(cbuf.copy_to_host(), dtype=dtype)
    np.testing.assert_equal(arr2, darr.copy_to_host())
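

# The reverse direction: expose a pyarrow-allocated CudaBuffer to numba as a
# DeviceNDArray and check that numba reads back the expected values.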
@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
def test_pyarrow_memalloc(c, dtype):
    ctx, nb_ctx = context_choices[c]
    size = 10
    arr, cbuf = make_random_buffer(size, target='device', dtype=dtype, ctx=ctx)

    # wrap CudaBuffer with numba device array
    mem = cbuf.to_numba()
    darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem)
    np.testing.assert_equal(darr.copy_to_host(), arr)
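

# Same round trip as above, but inside an explicitly activated numba GPU
# context; writes made through the numba view must be visible through the
# pyarrow buffer.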
@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
def test_numba_context(c, dtype):
    ctx, nb_ctx = context_choices[c]
    size = 10
    with nb_cuda.gpus[0]:
        arr, cbuf = make_random_buffer(size, target='device',
                                       dtype=dtype, ctx=ctx)
        assert cbuf.context.handle == nb_ctx.handle.value
        mem = cbuf.to_numba()
        darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem)
        np.testing.assert_equal(darr.copy_to_host(), arr)
        darr[0] = 99
        cbuf.context.synchronize()
        arr2 = np.frombuffer(cbuf.copy_to_host(), dtype=dtype)
        assert arr2[0] == 99
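

# Launch a numba.cuda JIT kernel on device memory owned by pyarrow and check
# the result from the pyarrow side.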
@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
def test_pyarrow_jit(c, dtype):
    ctx, nb_ctx = context_choices[c]

    @nb_cuda.jit
    def increment_by_one(an_array):
        pos = nb_cuda.grid(1)
        if pos < an_array.size:
            an_array[pos] += 1

    # applying numba.cuda kernel to memory held by CudaBuffer
    size = 10
    arr, cbuf = make_random_buffer(size, target='device', dtype=dtype, ctx=ctx)
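    # One thread per element, rounded up to whole blocks of 32 threads.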
    threadsperblock = 32
    blockspergrid = (arr.size + (threadsperblock - 1)) // threadsperblock
    mem = cbuf.to_numba()
    darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem)
    increment_by_one[blockspergrid, threadsperblock](darr)
    cbuf.context.synchronize()
    arr1 = np.frombuffer(cbuf.copy_to_host(), dtype=arr.dtype)
    np.testing.assert_equal(arr1, arr + 1)