blob: 08cf442848e6655b29c0635b36a013d4fca7ccd8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <string>
#include <vector>

#include <cuda.h>

#include "gtest/gtest.h"

#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/ipc/test_common.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"

#include "arrow/gpu/cuda_api.h"
#include "arrow/gpu/cuda_internal.h"

#include "arrow/util/checked_cast.h"
#include "arrow/util/macros.h"
namespace arrow {
using internal::checked_cast;
namespace cuda {
using internal::StatusFromCuda;
#define ASSERT_CUDA_OK(expr) ASSERT_OK(::arrow::cuda::internal::StatusFromCuda((expr)))
constexpr int kGpuNumber = 0;
// Needs a second GPU installed
constexpr int kOtherGpuNumber = 1;
template <typename Expected>
void AssertCudaBufferEquals(const CudaBuffer& buffer, Expected&& expected) {
ASSERT_OK_AND_ASSIGN(auto result, AllocateBuffer(buffer.size()));
ASSERT_OK(buffer.CopyToHost(0, buffer.size(), result->mutable_data()));
AssertBufferEqual(*result, expected);
}
template <typename Expected>
void AssertCudaBufferEquals(const Buffer& buffer, Expected&& expected) {
ASSERT_TRUE(IsCudaDevice(*buffer.device()));
AssertCudaBufferEquals(checked_cast<const CudaBuffer&>(buffer),
std::forward<Expected>(expected));
}
class TestCudaBase : public ::testing::Test {
public:
void SetUp() {
ASSERT_OK_AND_ASSIGN(manager_, CudaDeviceManager::Instance());
ASSERT_OK_AND_ASSIGN(device_, manager_->GetDevice(kGpuNumber));
// ASSERT_OK(device_->GetContext(kGpuNumber, &context_));
ASSERT_OK_AND_ASSIGN(context_, device_->GetContext());
ASSERT_OK_AND_ASSIGN(mm_, AsCudaMemoryManager(device_->default_memory_manager()));
cpu_device_ = CPUDevice::Instance();
cpu_mm_ = cpu_device_->default_memory_manager();
}
void TearDown() {
for (auto cu_context : non_primary_contexts_) {
ASSERT_CUDA_OK(cuCtxDestroy(cu_context));
}
}
Result<CUcontext> NonPrimaryRawContext() {
CUcontext ctx;
RETURN_NOT_OK(StatusFromCuda(cuCtxCreate(&ctx, /*flags=*/0, device_->handle())));
non_primary_contexts_.push_back(ctx);
return ctx;
}
Result<std::shared_ptr<CudaContext>> NonPrimaryContext() {
ARROW_ASSIGN_OR_RAISE(auto cuctx, NonPrimaryRawContext());
return device_->GetSharedContext(cuctx);
}
// Returns nullptr if kOtherGpuNumber does not correspond to an installed GPU
Result<std::shared_ptr<CudaDevice>> OtherGpuDevice() {
auto maybe_device = CudaDevice::Make(kOtherGpuNumber);
if (maybe_device.status().IsInvalid()) {
return nullptr;
}
return maybe_device;
}
protected:
CudaDeviceManager* manager_;
std::shared_ptr<CudaDevice> device_;
std::shared_ptr<CudaMemoryManager> mm_;
std::shared_ptr<CudaContext> context_;
std::shared_ptr<Device> cpu_device_;
std::shared_ptr<MemoryManager> cpu_mm_;
std::vector<CUcontext> non_primary_contexts_;
};
// ------------------------------------------------------------------------
// Test CudaDevice
class TestCudaDevice : public TestCudaBase {
public:
void SetUp() { TestCudaBase::SetUp(); }
};
TEST_F(TestCudaDevice, Basics) {
ASSERT_FALSE(device_->is_cpu());
ASSERT_TRUE(IsCudaDevice(*device_));
ASSERT_EQ(device_->device_number(), kGpuNumber);
ASSERT_GE(device_->total_memory(), 1 << 20);
ASSERT_NE(device_->device_name(), "");
ASSERT_NE(device_->ToString(), "");
ASSERT_OK_AND_ASSIGN(auto other_device, CudaDevice::Make(kGpuNumber));
ASSERT_FALSE(other_device->is_cpu());
ASSERT_TRUE(IsCudaDevice(*other_device));
ASSERT_EQ(other_device->device_number(), kGpuNumber);
ASSERT_EQ(other_device->total_memory(), device_->total_memory());
ASSERT_EQ(other_device->handle(), device_->handle());
ASSERT_EQ(other_device->device_name(), device_->device_name());
ASSERT_EQ(*other_device, *device_);
ASSERT_FALSE(IsCudaDevice(*cpu_device_));
// Try another device if possible
ASSERT_OK_AND_ASSIGN(other_device, OtherGpuDevice());
if (other_device != nullptr) {
ASSERT_FALSE(other_device->is_cpu());
ASSERT_EQ(other_device->device_number(), kOtherGpuNumber);
ASSERT_NE(*other_device, *device_);
ASSERT_NE(other_device->handle(), device_->handle());
ASSERT_NE(other_device->ToString(), device_->ToString());
}
ASSERT_RAISES(Invalid, CudaDevice::Make(-1));
ASSERT_RAISES(Invalid, CudaDevice::Make(99));
}
TEST_F(TestCudaDevice, Copy) {
auto cpu_buffer = Buffer::FromString("some data");
// CPU -> device
ASSERT_OK_AND_ASSIGN(auto other_buffer, Buffer::Copy(cpu_buffer, mm_));
ASSERT_EQ(other_buffer->device(), device_);
AssertCudaBufferEquals(*other_buffer, "some data");
// device -> CPU
ASSERT_OK_AND_ASSIGN(cpu_buffer, Buffer::Copy(other_buffer, cpu_mm_));
ASSERT_TRUE(cpu_buffer->device()->is_cpu());
AssertBufferEqual(*cpu_buffer, "some data");
// device -> device
const auto old_address = other_buffer->address();
ASSERT_OK_AND_ASSIGN(other_buffer, Buffer::Copy(other_buffer, mm_));
ASSERT_EQ(other_buffer->device(), device_);
ASSERT_NE(other_buffer->address(), old_address);
AssertCudaBufferEquals(*other_buffer, "some data");
// device (other context) -> device
ASSERT_OK_AND_ASSIGN(auto other_context, NonPrimaryContext());
ASSERT_OK_AND_ASSIGN(auto cuda_buffer, other_context->Allocate(9));
ASSERT_OK(cuda_buffer->CopyFromHost(0, "some data", 9));
ASSERT_OK_AND_ASSIGN(other_buffer, Buffer::Copy(cuda_buffer, mm_));
ASSERT_EQ(other_buffer->device(), device_);
AssertCudaBufferEquals(*other_buffer, "some data");
auto other_handle = cuda_buffer->context()->handle();
ASSERT_OK_AND_ASSIGN(cuda_buffer, CudaBuffer::FromBuffer(other_buffer));
ASSERT_NE(cuda_buffer->context()->handle(), other_handle);
// device -> other device
ASSERT_OK_AND_ASSIGN(auto other_device, OtherGpuDevice());
if (other_device != nullptr) {
ASSERT_OK_AND_ASSIGN(
other_buffer, Buffer::Copy(cuda_buffer, other_device->default_memory_manager()));
ASSERT_EQ(other_buffer->device(), other_device);
AssertCudaBufferEquals(*other_buffer, "some data");
}
}
// ------------------------------------------------------------------------
// Test CudaContext
class TestCudaContext : public TestCudaBase {
public:
void SetUp() { TestCudaBase::SetUp(); }
};
TEST_F(TestCudaContext, Basics) { ASSERT_EQ(*context_->device(), *device_); }
TEST_F(TestCudaContext, NonPrimaryContext) {
ASSERT_OK_AND_ASSIGN(auto other_context, NonPrimaryContext());
ASSERT_EQ(*other_context->device(), *device_);
ASSERT_NE(other_context->handle(), context_->handle());
}
TEST_F(TestCudaContext, GetDeviceAddress) {
const int64_t kSize = 100;
ASSERT_OK_AND_ASSIGN(auto buffer, context_->Allocate(kSize));
// GetDeviceAddress() is idempotent on device addresses
ASSERT_OK_AND_ASSIGN(auto devptr, context_->GetDeviceAddress(buffer->address()));
ASSERT_EQ(devptr, buffer->address());
}
// ------------------------------------------------------------------------
// Test CudaBuffer
class TestCudaBuffer : public TestCudaBase {
public:
void SetUp() { TestCudaBase::SetUp(); }
};
TEST_F(TestCudaBuffer, Allocate) {
const int64_t kSize = 100;
std::shared_ptr<CudaBuffer> buffer;
ASSERT_OK_AND_ASSIGN(buffer, context_->Allocate(kSize));
ASSERT_EQ(buffer->device(), context_->device());
ASSERT_EQ(kSize, buffer->size());
ASSERT_EQ(kSize, context_->bytes_allocated());
ASSERT_FALSE(buffer->is_cpu());
}
TEST_F(TestCudaBuffer, CopyFromHost) {
const int64_t kSize = 1000;
std::shared_ptr<CudaBuffer> device_buffer;
ASSERT_OK_AND_ASSIGN(device_buffer, context_->Allocate(kSize));
std::shared_ptr<ResizableBuffer> host_buffer;
ASSERT_OK(MakeRandomByteBuffer(kSize, default_memory_pool(), &host_buffer));
ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), 500));
ASSERT_OK(device_buffer->CopyFromHost(500, host_buffer->data() + 500, kSize - 500));
AssertCudaBufferEquals(*device_buffer, *host_buffer);
}
TEST_F(TestCudaBuffer, FromBuffer) {
const int64_t kSize = 1000;
// Initialize device buffer with random data
std::shared_ptr<ResizableBuffer> host_buffer;
std::shared_ptr<CudaBuffer> device_buffer;
ASSERT_OK_AND_ASSIGN(device_buffer, context_->Allocate(kSize));
ASSERT_OK(MakeRandomByteBuffer(kSize, default_memory_pool(), &host_buffer));
ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), 1000));
// Sanity check
AssertCudaBufferEquals(*device_buffer, *host_buffer);
// Get generic Buffer from device buffer
std::shared_ptr<Buffer> buffer;
std::shared_ptr<CudaBuffer> result;
buffer = std::static_pointer_cast<Buffer>(device_buffer);
ASSERT_OK_AND_ASSIGN(result, CudaBuffer::FromBuffer(buffer));
ASSERT_EQ(result->size(), kSize);
ASSERT_EQ(result->is_mutable(), true);
ASSERT_EQ(result->address(), buffer->address());
AssertCudaBufferEquals(*result, *host_buffer);
buffer = SliceBuffer(device_buffer, 0, kSize);
ASSERT_OK_AND_ASSIGN(result, CudaBuffer::FromBuffer(buffer));
ASSERT_EQ(result->size(), kSize);
ASSERT_EQ(result->is_mutable(), false);
ASSERT_EQ(result->address(), buffer->address());
AssertCudaBufferEquals(*result, *host_buffer);
buffer = SliceMutableBuffer(device_buffer, 0, kSize);
ASSERT_OK_AND_ASSIGN(result, CudaBuffer::FromBuffer(buffer));
ASSERT_EQ(result->size(), kSize);
ASSERT_EQ(result->is_mutable(), true);
ASSERT_EQ(result->address(), buffer->address());
AssertCudaBufferEquals(*result, *host_buffer);
buffer = SliceMutableBuffer(device_buffer, 3, kSize - 10);
buffer = SliceMutableBuffer(buffer, 8, kSize - 20);
ASSERT_OK_AND_ASSIGN(result, CudaBuffer::FromBuffer(buffer));
ASSERT_EQ(result->size(), kSize - 20);
ASSERT_EQ(result->is_mutable(), true);
ASSERT_EQ(result->address(), buffer->address());
AssertCudaBufferEquals(*result, *SliceBuffer(host_buffer, 11, kSize - 20));
}
// IPC only supported on Linux
#if defined(__linux)
TEST_F(TestCudaBuffer, DISABLED_ExportForIpc) {
// For this test to work, a second process needs to be spawned
const int64_t kSize = 1000;
std::shared_ptr<CudaBuffer> device_buffer;
ASSERT_OK_AND_ASSIGN(device_buffer, context_->Allocate(kSize));
std::shared_ptr<ResizableBuffer> host_buffer;
ASSERT_OK(MakeRandomByteBuffer(kSize, default_memory_pool(), &host_buffer));
ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), kSize));
// Export for IPC and serialize
std::shared_ptr<CudaIpcMemHandle> ipc_handle;
ASSERT_OK_AND_ASSIGN(ipc_handle, device_buffer->ExportForIpc());
std::shared_ptr<Buffer> serialized_handle;
ASSERT_OK_AND_ASSIGN(serialized_handle, ipc_handle->Serialize());
// Deserialize IPC handle and open
std::shared_ptr<CudaIpcMemHandle> ipc_handle2;
ASSERT_OK_AND_ASSIGN(ipc_handle2,
CudaIpcMemHandle::FromBuffer(serialized_handle->data()));
std::shared_ptr<CudaBuffer> ipc_buffer;
ASSERT_OK_AND_ASSIGN(ipc_buffer, context_->OpenIpcBuffer(*ipc_handle2));
ASSERT_EQ(kSize, ipc_buffer->size());
ASSERT_OK_AND_ASSIGN(auto ipc_data, AllocateBuffer(kSize));
ASSERT_OK(ipc_buffer->CopyToHost(0, kSize, ipc_data->mutable_data()));
ASSERT_EQ(0, std::memcmp(ipc_buffer->data(), host_buffer->data(), kSize));
}
#endif
// ------------------------------------------------------------------------
// Test CudaHostBuffer
class TestCudaHostBuffer : public TestCudaBase {
public:
};
TEST_F(TestCudaHostBuffer, AllocateGlobal) {
// Allocation using the global AllocateCudaHostBuffer() function
std::shared_ptr<CudaHostBuffer> host_buffer;
ASSERT_OK_AND_ASSIGN(host_buffer, AllocateCudaHostBuffer(kGpuNumber, 1024));
ASSERT_TRUE(host_buffer->is_cpu());
ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_);
ASSERT_OK_AND_ASSIGN(auto device_address, host_buffer->GetDeviceAddress(context_));
ASSERT_NE(device_address, 0);
ASSERT_OK_AND_ASSIGN(auto host_address, GetHostAddress(device_address));
ASSERT_EQ(host_address, host_buffer->data());
}
TEST_F(TestCudaHostBuffer, ViewOnDevice) {
ASSERT_OK_AND_ASSIGN(auto host_buffer, device_->AllocateHostBuffer(1024));
ASSERT_TRUE(host_buffer->is_cpu());
ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_);
// Try to view the host buffer on the device. This should correspond to
// GetDeviceAddress() in the previous test.
ASSERT_OK_AND_ASSIGN(auto device_buffer, Buffer::View(host_buffer, mm_));
ASSERT_FALSE(device_buffer->is_cpu());
ASSERT_EQ(device_buffer->memory_manager(), mm_);
ASSERT_NE(device_buffer->address(), 0);
ASSERT_EQ(device_buffer->size(), host_buffer->size());
ASSERT_EQ(device_buffer->parent(), host_buffer);
// View back the device buffer on the CPU. This should roundtrip.
ASSERT_OK_AND_ASSIGN(auto buffer, Buffer::View(device_buffer, cpu_mm_));
ASSERT_TRUE(buffer->is_cpu());
ASSERT_EQ(buffer->memory_manager(), cpu_mm_);
ASSERT_EQ(buffer->address(), host_buffer->address());
ASSERT_EQ(buffer->size(), host_buffer->size());
ASSERT_EQ(buffer->parent(), device_buffer);
}
// ------------------------------------------------------------------------
// Test CudaBufferWriter
class TestCudaBufferWriter : public TestCudaBase {
public:
void SetUp() { TestCudaBase::SetUp(); }
void Allocate(const int64_t size) {
ASSERT_OK_AND_ASSIGN(device_buffer_, context_->Allocate(size));
writer_.reset(new CudaBufferWriter(device_buffer_));
}
void TestWrites(const int64_t total_bytes, const int64_t chunksize,
const int64_t buffer_size = 0) {
std::shared_ptr<ResizableBuffer> buffer;
ASSERT_OK(MakeRandomByteBuffer(total_bytes, default_memory_pool(), &buffer));
if (buffer_size > 0) {
ASSERT_OK(writer_->SetBufferSize(buffer_size));
}
ASSERT_OK_AND_EQ(0, writer_->Tell());
const uint8_t* host_data = buffer->data();
ASSERT_OK(writer_->Write(host_data, chunksize));
ASSERT_OK_AND_EQ(chunksize, writer_->Tell());
ASSERT_OK(writer_->Seek(0));
ASSERT_OK_AND_EQ(0, writer_->Tell());
int64_t position = 0;
while (position < total_bytes) {
int64_t bytes_to_write = std::min(chunksize, total_bytes - position);
ASSERT_OK(writer_->Write(host_data + position, bytes_to_write));
position += bytes_to_write;
}
ASSERT_OK(writer_->Flush());
AssertCudaBufferEquals(*device_buffer_, *buffer);
}
protected:
std::shared_ptr<CudaBuffer> device_buffer_;
std::unique_ptr<CudaBufferWriter> writer_;
};
TEST_F(TestCudaBufferWriter, UnbufferedWrites) {
const int64_t kTotalSize = 1 << 16;
Allocate(kTotalSize);
TestWrites(kTotalSize, 1000);
}
TEST_F(TestCudaBufferWriter, BufferedWrites) {
const int64_t kTotalSize = 1 << 16;
Allocate(kTotalSize);
TestWrites(kTotalSize, 1000, 1 << 12);
}
TEST_F(TestCudaBufferWriter, EdgeCases) {
Allocate(1000);
std::shared_ptr<ResizableBuffer> buffer;
ASSERT_OK(MakeRandomByteBuffer(1000, default_memory_pool(), &buffer));
const uint8_t* host_data = buffer->data();
ASSERT_EQ(0, writer_->buffer_size());
ASSERT_OK(writer_->SetBufferSize(100));
ASSERT_EQ(100, writer_->buffer_size());
// Write 0 bytes
ASSERT_OK(writer_->Write(host_data, 0));
ASSERT_OK_AND_EQ(0, writer_->Tell());
// Write some data, then change buffer size
ASSERT_OK(writer_->Write(host_data, 10));
ASSERT_OK(writer_->SetBufferSize(200));
ASSERT_EQ(200, writer_->buffer_size());
ASSERT_EQ(0, writer_->num_bytes_buffered());
// Write more than buffer size
ASSERT_OK(writer_->Write(host_data + 10, 300));
ASSERT_EQ(0, writer_->num_bytes_buffered());
// Write exactly buffer size
ASSERT_OK(writer_->Write(host_data + 310, 200));
ASSERT_EQ(0, writer_->num_bytes_buffered());
// Write rest of bytes
ASSERT_OK(writer_->Write(host_data + 510, 390));
ASSERT_OK(writer_->Write(host_data + 900, 100));
// Close flushes
ASSERT_OK(writer_->Close());
// Check that everything was written
AssertCudaBufferEquals(*device_buffer_, Buffer(host_data, 1000));
}
// ------------------------------------------------------------------------
// Test CudaBufferReader
class TestCudaBufferReader : public TestCudaBase {
public:
void SetUp() { TestCudaBase::SetUp(); }
};
TEST_F(TestCudaBufferReader, Basics) {
std::shared_ptr<CudaBuffer> device_buffer;
const int64_t size = 1000;
ASSERT_OK_AND_ASSIGN(device_buffer, context_->Allocate(size));
std::shared_ptr<ResizableBuffer> buffer;
ASSERT_OK(MakeRandomByteBuffer(1000, default_memory_pool(), &buffer));
const uint8_t* host_data = buffer->data();
ASSERT_OK(device_buffer->CopyFromHost(0, host_data, 1000));
CudaBufferReader reader(device_buffer);
uint8_t stack_buffer[100] = {0};
ASSERT_OK(reader.Seek(950));
ASSERT_OK_AND_EQ(950, reader.Tell());
// Read() to host memory
ASSERT_OK_AND_EQ(50, reader.Read(100, stack_buffer));
ASSERT_EQ(0, std::memcmp(stack_buffer, host_data + 950, 50));
ASSERT_OK_AND_EQ(1000, reader.Tell());
// ReadAt() to host memory
ASSERT_OK_AND_EQ(45, reader.ReadAt(123, 45, stack_buffer));
ASSERT_EQ(0, std::memcmp(stack_buffer, host_data + 123, 45));
ASSERT_OK_AND_EQ(1000, reader.Tell());
// Read() to device buffer
ASSERT_OK(reader.Seek(925));
ASSERT_OK_AND_ASSIGN(auto tmp, reader.Read(100));
ASSERT_EQ(75, tmp->size());
ASSERT_FALSE(tmp->is_cpu());
ASSERT_EQ(*tmp->device(), *device_);
ASSERT_OK_AND_EQ(1000, reader.Tell());
ASSERT_OK(std::dynamic_pointer_cast<CudaBuffer>(tmp)->CopyToHost(0, tmp->size(),
stack_buffer));
ASSERT_EQ(0, std::memcmp(stack_buffer, host_data + 925, tmp->size()));
// ReadAt() to device buffer
ASSERT_OK(reader.Seek(42));
ASSERT_OK_AND_ASSIGN(tmp, reader.ReadAt(980, 30));
ASSERT_EQ(20, tmp->size());
ASSERT_FALSE(tmp->is_cpu());
ASSERT_EQ(*tmp->device(), *device_);
ASSERT_OK_AND_EQ(42, reader.Tell());
ASSERT_OK(std::dynamic_pointer_cast<CudaBuffer>(tmp)->CopyToHost(0, tmp->size(),
stack_buffer));
ASSERT_EQ(0, std::memcmp(stack_buffer, host_data + 980, tmp->size()));
}
TEST_F(TestCudaBufferReader, WillNeed) {
std::shared_ptr<CudaBuffer> device_buffer;
const int64_t size = 1000;
ASSERT_OK_AND_ASSIGN(device_buffer, context_->Allocate(size));
CudaBufferReader reader(device_buffer);
ASSERT_OK(reader.WillNeed({{0, size}}));
}
// ------------------------------------------------------------------------
// Test Cuda IPC
class TestCudaArrowIpc : public TestCudaBase {
public:
void SetUp() {
TestCudaBase::SetUp();
pool_ = default_memory_pool();
}
protected:
MemoryPool* pool_;
};
TEST_F(TestCudaArrowIpc, BasicWriteRead) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(ipc::test::MakeIntRecordBatch(&batch));
std::shared_ptr<CudaBuffer> device_serialized;
ASSERT_OK_AND_ASSIGN(device_serialized, SerializeRecordBatch(*batch, context_.get()));
// Test that ReadRecordBatch works properly
ipc::DictionaryMemo unused_memo;
std::shared_ptr<RecordBatch> device_batch;
ASSERT_OK_AND_ASSIGN(device_batch,
ReadRecordBatch(batch->schema(), &unused_memo, device_serialized));
ASSERT_OK(device_batch->Validate());
// Copy data from device, read batch, and compare
int64_t size = device_serialized->size();
ASSERT_OK_AND_ASSIGN(auto host_buffer, AllocateBuffer(size, pool_));
ASSERT_OK(device_serialized->CopyToHost(0, size, host_buffer->mutable_data()));
std::shared_ptr<RecordBatch> cpu_batch;
io::BufferReader cpu_reader(std::move(host_buffer));
ASSERT_OK_AND_ASSIGN(
cpu_batch, ipc::ReadRecordBatch(batch->schema(), &unused_memo,
ipc::IpcReadOptions::Defaults(), &cpu_reader));
CompareBatch(*batch, *cpu_batch);
}
TEST_F(TestCudaArrowIpc, DictionaryWriteRead) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(ipc::test::MakeDictionary(&batch));
ipc::DictionaryMemo dictionary_memo;
ASSERT_OK(ipc::internal::CollectDictionaries(*batch, &dictionary_memo));
std::shared_ptr<CudaBuffer> device_serialized;
ASSERT_OK_AND_ASSIGN(device_serialized, SerializeRecordBatch(*batch, context_.get()));
// Test that ReadRecordBatch works properly
std::shared_ptr<RecordBatch> device_batch;
ASSERT_OK_AND_ASSIGN(device_batch, ReadRecordBatch(batch->schema(), &dictionary_memo,
device_serialized));
// Copy data from device, read batch, and compare
int64_t size = device_serialized->size();
ASSERT_OK_AND_ASSIGN(auto host_buffer, AllocateBuffer(size, pool_));
ASSERT_OK(device_serialized->CopyToHost(0, size, host_buffer->mutable_data()));
std::shared_ptr<RecordBatch> cpu_batch;
io::BufferReader cpu_reader(std::move(host_buffer));
ASSERT_OK_AND_ASSIGN(
cpu_batch, ipc::ReadRecordBatch(batch->schema(), &dictionary_memo,
ipc::IpcReadOptions::Defaults(), &cpu_reader));
CompareBatch(*batch, *cpu_batch);
}
} // namespace cuda
} // namespace arrow