// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

#include <gtest/gtest.h>

#include "parquet/exception.h"
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"

using arrow::default_memory_pool;
using arrow::MemoryPool;

namespace parquet {

class TestBuffer : public ::testing::Test {};

// Utility class to call private functions on ChunkedAllocator.
class ChunkedAllocatorTest {
 public:
  static bool CheckIntegrity(ChunkedAllocator* pool, bool current_chunk_empty) {
    return pool->CheckIntegrity(current_chunk_empty);
  }

  static const int INITIAL_CHUNK_SIZE = ChunkedAllocator::INITIAL_CHUNK_SIZE;
  static const int MAX_CHUNK_SIZE = ChunkedAllocator::MAX_CHUNK_SIZE;
};

const int ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
const int ChunkedAllocatorTest::MAX_CHUNK_SIZE;
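
// A small sketch exercising the CheckIntegrity() helper above. Assumed semantics
// (inferred from the helper's signature, not spelled out in this file): it validates
// the allocator's internal chunk bookkeeping, and the flag states whether the current
// chunk is expected to be empty. Sizes below are illustrative only.
TEST(ChunkedAllocatorTest, CheckIntegritySketch) {
  ChunkedAllocator p;
  // A freshly constructed allocator has no data in flight.
  EXPECT_TRUE(ChunkedAllocatorTest::CheckIntegrity(&p, true));
  p.Allocate(1024);
  // After an allocation, the current chunk is no longer empty.
  EXPECT_TRUE(ChunkedAllocatorTest::CheckIntegrity(&p, false));
  p.FreeAll();
}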

TEST(ChunkedAllocatorTest, Basic) {
  ChunkedAllocator p;
  ChunkedAllocator p2;
  ChunkedAllocator p3;

  for (int iter = 0; iter < 2; ++iter) {
    // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
    for (int i = 0; i < 768; ++i) {
      // pads to 32 bytes
      p.Allocate(25);
    }
    // the allocator has handed out a total of 24K
    EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
    // ... and allocated 28K of chunks (4K, 8K, 16K)
    EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());

    // we're passing on the first two chunks, containing 12K of data; we're left with
    // one chunk of 16K containing 12K of data
    p2.AcquireData(&p, true);
    EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
    EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());

    // we allocate 8K, for which there isn't enough room in the current chunk,
    // so another one is allocated (32K)
    p.Allocate(8 * 1024);
    EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());

    // we allocate 65K, which doesn't fit into the current chunk or the default
    // size of the next allocated chunk (64K)
    p.Allocate(65 * 1024);
    EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
    if (iter == 0) {
      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
    } else {
      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
    }
    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());

    // Clear() resets allocated data, but doesn't remove any chunks
    p.Clear();
    EXPECT_EQ(0, p.total_allocated_bytes());
    if (iter == 0) {
      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
    } else {
      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
    }
    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());

    // next allocation reuses existing chunks
    p.Allocate(1024);
    EXPECT_EQ(1024, p.total_allocated_bytes());
    if (iter == 0) {
      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
    } else {
      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
    }
    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());

    // ... unless it doesn't fit into any available chunk
    p.Allocate(120 * 1024);
    EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
    if (iter == 0) {
      EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
    } else {
      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
    }
    EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());

    // ... try another allocation that fits into an existing chunk
    p.Allocate(33 * 1024);
    EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
    EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());

    // we're releasing all remaining chunks, which get added to p2
    p2.AcquireData(&p, false);
    EXPECT_EQ(0, p.total_allocated_bytes());
    EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
    EXPECT_EQ(0, p.GetTotalChunkSizes());

    p3.AcquireData(&p2, true);  // we're keeping the 65K chunk
    EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
    EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());

    p.FreeAll();
    p2.FreeAll();
    p3.FreeAll();
  }
}
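
// A focused sketch of the peak_allocated_bytes() semantics observed in the Basic
// test above: it is a high-water mark that Clear() does not reset. The 16K/1K
// sizes below are illustrative only.
TEST(ChunkedAllocatorTest, PeakHighWaterMarkSketch) {
  ChunkedAllocator p;
  p.Allocate(16 * 1024);
  EXPECT_EQ(16 * 1024, p.peak_allocated_bytes());

  // Clear() drops the live allocations but keeps the chunks and the peak value.
  p.Clear();
  p.Allocate(1024);
  EXPECT_EQ(1024, p.total_allocated_bytes());
  EXPECT_EQ(16 * 1024, p.peak_allocated_bytes());

  p.FreeAll();
}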

// Test that we can keep an allocated chunk and a free chunk.
// This case verifies that, when chunks are acquired by another memory pool, the
// remaining chunks stay consistent when there was more than one used chunk and
// some free chunks.
TEST(ChunkedAllocatorTest, Keep) {
  ChunkedAllocator p;
  p.Allocate(4 * 1024);
  p.Allocate(8 * 1024);
  p.Allocate(16 * 1024);
  EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
  p.Clear();
  EXPECT_EQ(0, p.total_allocated_bytes());
  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
  p.Allocate(1 * 1024);
  p.Allocate(4 * 1024);
  EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());

  ChunkedAllocator p2;
  p2.AcquireData(&p, true);
  EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
  EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
  EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
  EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());

  p.FreeAll();
  p2.FreeAll();
}

// Tests that we can return partial allocations.
TEST(ChunkedAllocatorTest, ReturnPartial) {
  ChunkedAllocator p;
  uint8_t* ptr = p.Allocate(1024);
  EXPECT_EQ(1024, p.total_allocated_bytes());
  memset(ptr, 0, 1024);
  p.ReturnPartialAllocation(1024);

  uint8_t* ptr2 = p.Allocate(1024);
  EXPECT_EQ(1024, p.total_allocated_bytes());
  EXPECT_TRUE(ptr == ptr2);
  p.ReturnPartialAllocation(1016);

  ptr2 = p.Allocate(1016);
  EXPECT_EQ(1024, p.total_allocated_bytes());
  EXPECT_TRUE(ptr2 == ptr + 8);
  p.ReturnPartialAllocation(512);
  memset(ptr2, 1, 1016 - 512);

  uint8_t* ptr3 = p.Allocate(512);
  EXPECT_EQ(1024, p.total_allocated_bytes());
  EXPECT_TRUE(ptr3 == ptr + 512);
  memset(ptr3, 2, 512);

  for (int i = 0; i < 8; ++i) {
    EXPECT_EQ(0, ptr[i]);
  }
  for (int i = 8; i < 512; ++i) {
    EXPECT_EQ(1, ptr[i]);
  }
  for (int i = 512; i < 1024; ++i) {
    EXPECT_EQ(2, ptr[i]);
  }

  p.FreeAll();
}
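
// A minimal usage sketch of the over-allocate-then-shrink pattern that
// ReturnPartialAllocation() supports: request an upper-bound-sized buffer, write
// fewer bytes than requested, and hand the unused tail back so the allocator can
// reuse it. The 128/48 byte sizes are illustrative only (multiples of 8, so the
// allocator's padding does not change the accounting).
TEST(ChunkedAllocatorTest, ReturnPartialUsageSketch) {
  ChunkedAllocator p;
  const int requested = 128;  // upper bound for the data we might produce
  const int written = 48;     // bytes actually produced
  uint8_t* out = p.Allocate(requested);
  memset(out, 7, written);
  // Return the unused tail; only the written bytes remain accounted for.
  p.ReturnPartialAllocation(requested - written);
  EXPECT_EQ(written, p.total_allocated_bytes());
  p.FreeAll();
}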

// Test that the ChunkedAllocator overhead is bounded when we make allocations of
// INITIAL_CHUNK_SIZE.
TEST(ChunkedAllocatorTest, MemoryOverhead) {
  ChunkedAllocator p;
  const int alloc_size = ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
  const int num_allocs = 1000;
  int64_t total_allocated = 0;

  for (int i = 0; i < num_allocs; ++i) {
    uint8_t* mem = p.Allocate(alloc_size);
    ASSERT_TRUE(mem != nullptr);
    total_allocated += alloc_size;

    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
    // The allocation size (INITIAL_CHUNK_SIZE) divides MAX_CHUNK_SIZE evenly, so
    // there should be at most one empty chunk at the end.
    EXPECT_LE(wasted_memory, ChunkedAllocatorTest::MAX_CHUNK_SIZE);
    // The chunk doubling algorithm should not allocate chunks larger than the total
    // amount of memory already allocated.
    EXPECT_LE(wasted_memory, total_allocated);
  }

  p.FreeAll();
}

// Test that the ChunkedAllocator overhead is bounded when we make alternating
// large and small allocations.
TEST(ChunkedAllocatorTest, FragmentationOverhead) {
  ChunkedAllocator p;
  const int num_allocs = 100;
  int64_t total_allocated = 0;

  for (int i = 0; i < num_allocs; ++i) {
    int alloc_size = i % 2 == 0 ? 1 : ChunkedAllocatorTest::MAX_CHUNK_SIZE;
    uint8_t* mem = p.Allocate(alloc_size);
    ASSERT_TRUE(mem != nullptr);
    total_allocated += alloc_size;

    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
    // Fragmentation should waste no more than half of each completed chunk, plus
    // at most one (possibly unused) chunk of MAX_CHUNK_SIZE at the end.
    EXPECT_LE(wasted_memory, total_allocated + ChunkedAllocatorTest::MAX_CHUNK_SIZE);
  }

  p.FreeAll();
}

TEST(TestBufferedInputStream, Basics) {
  int64_t source_size = 256;
  int64_t stream_offset = 10;
  int64_t stream_size = source_size - stream_offset;
  int64_t chunk_size = 50;
  std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_memory_pool(), source_size);
  ASSERT_EQ(source_size, buf->size());
  for (int i = 0; i < source_size; i++) {
    buf->mutable_data()[i] = i;
  }

  auto wrapper =
      std::make_shared<ArrowInputFile>(std::make_shared<::arrow::io::BufferReader>(buf));

  std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream(
      default_memory_pool(), chunk_size, wrapper.get(), stream_offset, stream_size));

  const uint8_t* output;
  int64_t bytes_read;

  // source is at offset 10
  output = stream->Peek(10, &bytes_read);
  ASSERT_EQ(10, bytes_read);
  for (int i = 0; i < 10; i++) {
    ASSERT_EQ(10 + i, output[i]) << i;
  }
  output = stream->Read(10, &bytes_read);
  ASSERT_EQ(10, bytes_read);
  for (int i = 0; i < 10; i++) {
    ASSERT_EQ(10 + i, output[i]) << i;
  }
  output = stream->Read(10, &bytes_read);
  ASSERT_EQ(10, bytes_read);
  for (int i = 0; i < 10; i++) {
    ASSERT_EQ(20 + i, output[i]) << i;
  }
  stream->Advance(5);
  stream->Advance(5);
  // source is at offset 40
  // read across the buffer boundary; buffer size is 50
  output = stream->Read(20, &bytes_read);
  ASSERT_EQ(20, bytes_read);
  for (int i = 0; i < 20; i++) {
    ASSERT_EQ(40 + i, output[i]) << i;
  }
  // read more than the original chunk_size
  output = stream->Read(60, &bytes_read);
  ASSERT_EQ(60, bytes_read);
  for (int i = 0; i < 60; i++) {
    ASSERT_EQ(60 + i, output[i]) << i;
  }

  stream->Advance(120);
  // source is at offset 240
  // read past the end of the source (size 256); only 16 of the requested 30 bytes remain
  output = stream->Read(30, &bytes_read);
  ASSERT_EQ(16, bytes_read);
  for (int i = 0; i < 16; i++) {
    ASSERT_EQ(240 + i, output[i]) << i;
  }
}
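
// A short, self-contained sketch (using only the APIs exercised above) that makes
// the Peek()/Read() contract explicit: Peek() returns upcoming bytes without
// consuming them, so a subsequent Read() of the same length yields the same data.
// The 32-byte source and 16-byte buffer size are illustrative only.
TEST(TestBufferedInputStream, PeekDoesNotAdvanceSketch) {
  const int64_t source_size = 32;
  std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_memory_pool(), source_size);
  for (int i = 0; i < source_size; i++) {
    buf->mutable_data()[i] = i;
  }

  auto wrapper =
      std::make_shared<ArrowInputFile>(std::make_shared<::arrow::io::BufferReader>(buf));
  std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream(
      default_memory_pool(), /*buffer_size=*/16, wrapper.get(), /*start=*/0, source_size));

  int64_t bytes_read;
  const uint8_t* peeked = stream->Peek(8, &bytes_read);
  ASSERT_EQ(8, bytes_read);
  for (int i = 0; i < 8; i++) {
    ASSERT_EQ(i, peeked[i]) << i;
  }

  // Read() returns the same 8 bytes, since Peek() did not move the stream position.
  const uint8_t* data = stream->Read(8, &bytes_read);
  ASSERT_EQ(8, bytes_read);
  for (int i = 0; i < 8; i++) {
    ASSERT_EQ(i, data[i]) << i;
  }
}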

TEST(TestArrowInputFile, Basics) {
  std::string data = "this is the data";
  auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());

  auto file = std::make_shared<::arrow::io::BufferReader>(data_buffer, data.size());
  auto source = std::make_shared<ArrowInputFile>(file);

  ASSERT_EQ(0, source->Tell());

  uint8_t buffer[50];

  ASSERT_NO_THROW(source->ReadAt(0, 4, buffer));
  ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
  ASSERT_EQ(4, source->Tell());

  std::shared_ptr<Buffer> pq_buffer;

  ASSERT_NO_THROW(pq_buffer = source->Read(7));

  auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7);

  ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
}

}  // namespace parquet