// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <memory>
#include <string>
#include <vector>
#include <gtest/gtest.h>
#include "parquet/exception.h"
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"
using arrow::default_memory_pool;
using arrow::MemoryPool;
namespace parquet {
class TestBuffer : public ::testing::Test {};
// Utility class to access private members of ChunkedAllocator.
class ChunkedAllocatorTest {
public:
static bool CheckIntegrity(ChunkedAllocator* pool, bool current_chunk_empty) {
return pool->CheckIntegrity(current_chunk_empty);
}
static const int INITIAL_CHUNK_SIZE = ChunkedAllocator::INITIAL_CHUNK_SIZE;
static const int MAX_CHUNK_SIZE = ChunkedAllocator::MAX_CHUNK_SIZE;
};
const int ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
const int ChunkedAllocatorTest::MAX_CHUNK_SIZE;
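// Exercises the basic ChunkedAllocator lifecycle: small allocations that grow
// the chunk list, AcquireData() handoffs, Clear() and FreeAll(). The loop runs
// twice; on the second pass the peak_allocated_bytes() expectations reflect the
// high-water mark left behind by the first pass, since FreeAll() releases the
// chunks but does not reset the peak counter.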
TEST(ChunkedAllocatorTest, Basic) {
ChunkedAllocator p;
ChunkedAllocator p2;
ChunkedAllocator p3;
for (int iter = 0; iter < 2; ++iter) {
// allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
for (int i = 0; i < 768; ++i) {
// pads to 32 bytes
p.Allocate(25);
}
// the allocator handed back a total of 24K
EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
// ... and allocated 28K of chunks (4K, 8K and 16K)
EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());
// we're passing on the first two chunks, containing 12K of data; we're left with
// one chunk of 16K containing 12K of data
p2.AcquireData(&p, true);
EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());
// we allocate 8K, for which there isn't enough room in the current chunk,
// so another one is allocated (32K)
p.Allocate(8 * 1024);
EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());
// we allocate 65K, which doesn't fit into the current chunk or the default
// size of the next allocated chunk (64K)
p.Allocate(65 * 1024);
EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
if (iter == 0) {
EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
} else {
EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
}
EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
// Clear() resets allocated data, but doesn't remove any chunks
p.Clear();
EXPECT_EQ(0, p.total_allocated_bytes());
if (iter == 0) {
EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
} else {
EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
}
EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
// next allocation reuses existing chunks
p.Allocate(1024);
EXPECT_EQ(1024, p.total_allocated_bytes());
if (iter == 0) {
EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
} else {
EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
}
EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
// ... unless it doesn't fit into any available chunk
p.Allocate(120 * 1024);
EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
if (iter == 0) {
EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
} else {
EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
}
EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
// ... then try another allocation that does fit into an existing chunk
p.Allocate(33 * 1024);
EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
// with keep_current == false, all four remaining chunks (three of them holding data) are handed over to p2
p2.AcquireData(&p, false);
EXPECT_EQ(0, p.total_allocated_bytes());
EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
EXPECT_EQ(0, p.GetTotalChunkSizes());
p3.AcquireData(&p2, true); // we're keeping the 65k chunk
EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());
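// p3 took every chunk except p2's current 65K chunk; FreeAll() releases
// whatever each allocator still owns before the next iteration.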
p.FreeAll();
p2.FreeAll();
p3.FreeAll();
}
}
// Test that we can keep an allocated chunk and a free chunk.
// This case verifies that, when chunks are acquired by another allocator, the
// remaining chunks stay consistent if there was more than one used chunk and
// some free chunks.
TEST(ChunkedAllocatorTest, Keep) {
ChunkedAllocator p;
p.Allocate(4 * 1024);
p.Allocate(8 * 1024);
p.Allocate(16 * 1024);
EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
p.Clear();
EXPECT_EQ(0, p.total_allocated_bytes());
EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
p.Allocate(1 * 1024);
p.Allocate(4 * 1024);
EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
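// With keep_current == true, p keeps its current 8K chunk (holding the 4K
// allocation) plus the free 16K chunk, while the 4K chunk holding the 1K
// allocation moves to p2.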
ChunkedAllocator p2;
p2.AcquireData(&p, true);
EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());
p.FreeAll();
p2.FreeAll();
}
// Tests that we can return partial allocations.
TEST(ChunkedAllocatorTest, ReturnPartial) {
ChunkedAllocator p;
uint8_t* ptr = p.Allocate(1024);
EXPECT_EQ(1024, p.total_allocated_bytes());
memset(ptr, 0, 1024);
p.ReturnPartialAllocation(1024);
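// Returning the full 1024 bytes rewinds the chunk, so the next Allocate(1024)
// should hand back the same address.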
uint8_t* ptr2 = p.Allocate(1024);
EXPECT_EQ(1024, p.total_allocated_bytes());
EXPECT_TRUE(ptr == ptr2);
p.ReturnPartialAllocation(1016);
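// Only the first 8 bytes of that allocation remain, so the next allocation
// starts at ptr + 8.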
ptr2 = p.Allocate(1016);
EXPECT_EQ(1024, p.total_allocated_bytes());
EXPECT_TRUE(ptr2 == ptr + 8);
p.ReturnPartialAllocation(512);
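// 504 of the 1016 bytes are kept (8 + 504 = 512), so the next 512-byte
// allocation starts at ptr + 512.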
memset(ptr2, 1, 1016 - 512);
uint8_t* ptr3 = p.Allocate(512);
EXPECT_EQ(1024, p.total_allocated_bytes());
EXPECT_TRUE(ptr3 == ptr + 512);
memset(ptr3, 2, 512);
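// The three memsets should leave bytes [0, 8) == 0, [8, 512) == 1 and
// [512, 1024) == 2.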
for (int i = 0; i < 8; ++i) {
EXPECT_EQ(0, ptr[i]);
}
for (int i = 8; i < 512; ++i) {
EXPECT_EQ(1, ptr[i]);
}
for (int i = 512; i < 1024; ++i) {
EXPECT_EQ(2, ptr[i]);
}
p.FreeAll();
}
// Test that the ChunkedAllocator overhead is bounded when we make allocations of
// INITIAL_CHUNK_SIZE.
TEST(ChunkedAllocatorTest, MemoryOverhead) {
ChunkedAllocator p;
const int alloc_size = ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
const int num_allocs = 1000;
int64_t total_allocated = 0;
for (int i = 0; i < num_allocs; ++i) {
uint8_t* mem = p.Allocate(alloc_size);
ASSERT_TRUE(mem != nullptr);
total_allocated += alloc_size;
int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
// The initial chunk size divides MAX_CHUNK_SIZE evenly, so we should have at
// most one empty chunk at the end.
EXPECT_LE(wasted_memory, ChunkedAllocatorTest::MAX_CHUNK_SIZE);
// The chunk doubling algorithm should not allocate chunks larger than the total
// amount of memory already allocated.
EXPECT_LE(wasted_memory, total_allocated);
}
p.FreeAll();
}
// Test that the ChunkedAllocator overhead is bounded when we make alternating
// large and small allocations.
TEST(ChunkedAllocatorTest, FragmentationOverhead) {
ChunkedAllocator p;
const int num_allocs = 100;
int64_t total_allocated = 0;
for (int i = 0; i < num_allocs; ++i) {
int alloc_size = i % 2 == 0 ? 1 : ChunkedAllocatorTest::MAX_CHUNK_SIZE;
uint8_t* mem = p.Allocate(alloc_size);
ASSERT_TRUE(mem != nullptr);
total_allocated += alloc_size;
int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
// Fragmentation should not waste more than half of each completed chunk.
EXPECT_LE(wasted_memory, total_allocated + ChunkedAllocatorTest::MAX_CHUNK_SIZE);
}
p.FreeAll();
}
TEST(TestBufferedInputStream, Basics) {
int64_t source_size = 256;
int64_t stream_offset = 10;
int64_t stream_size = source_size - stream_offset;
int64_t chunk_size = 50;
std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_memory_pool(), source_size);
ASSERT_EQ(source_size, buf->size());
for (int i = 0; i < source_size; i++) {
buf->mutable_data()[i] = static_cast<uint8_t>(i);
}
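// Every byte of the source equals its own offset, so each read below can be
// verified against the expected absolute position.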
auto wrapper =
std::make_shared<ArrowInputFile>(std::make_shared<::arrow::io::BufferReader>(buf));
std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream(
default_memory_pool(), chunk_size, wrapper.get(), stream_offset, stream_size));
const uint8_t* output;
int64_t bytes_read;
// source is at offset 10
output = stream->Peek(10, &bytes_read);
ASSERT_EQ(10, bytes_read);
for (int i = 0; i < 10; i++) {
ASSERT_EQ(10 + i, output[i]) << i;
}
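// Peek() does not advance the stream, so the first Read() returns the same
// ten bytes.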
output = stream->Read(10, &bytes_read);
ASSERT_EQ(10, bytes_read);
for (int i = 0; i < 10; i++) {
ASSERT_EQ(10 + i, output[i]) << i;
}
output = stream->Read(10, &bytes_read);
ASSERT_EQ(10, bytes_read);
for (int i = 0; i < 10; i++) {
ASSERT_EQ(20 + i, output[i]) << i;
}
stream->Advance(5);
stream->Advance(5);
// source is at offset 40
// read across buffer boundary. buffer size is 50
output = stream->Read(20, &bytes_read);
ASSERT_EQ(20, bytes_read);
for (int i = 0; i < 20; i++) {
ASSERT_EQ(40 + i, output[i]) << i;
}
// read more than original chunk_size
output = stream->Read(60, &bytes_read);
ASSERT_EQ(60, bytes_read);
for (int i = 0; i < 60; i++) {
ASSERT_EQ(60 + i, output[i]) << i;
}
stream->Advance(120);
// source is at offset 240
// read outside of source boundary. source size is 256
output = stream->Read(30, &bytes_read);
ASSERT_EQ(16, bytes_read);
for (int i = 0; i < 16; i++) {
ASSERT_EQ(240 + i, output[i]) << i;
}
}
TEST(TestArrowInputFile, Basics) {
std::string data = "this is the data";
auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
auto file = std::make_shared<::arrow::io::BufferReader>(data_buffer, data.size());
auto source = std::make_shared<ArrowInputFile>(file);
ASSERT_EQ(0, source->Tell());
uint8_t buffer[50];
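// ReadAt() copies four bytes ("this") into the stack buffer; the Tell() check
// below shows that the file position advanced to 4.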
ASSERT_NO_THROW(source->ReadAt(0, 4, buffer));
ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
ASSERT_EQ(4, source->Tell());
std::shared_ptr<Buffer> pq_buffer;
ASSERT_NO_THROW(pq_buffer = source->Read(7));
auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7);
ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
}
} // namespace parquet