| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.bookkeeper.bookie.storage.ldb; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.ByteBufAllocator; |
| import io.netty.buffer.ByteBufUtil; |
| import io.netty.buffer.Unpooled; |
| import io.netty.buffer.UnpooledByteBufAllocator; |
| |
| import java.nio.charset.Charset; |
| import java.util.concurrent.BrokenBarrierException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.junit.Test; |
| |
| /** |
| * Unit test for {@link WriteCache}. |
| */ |
| public class WriteCacheTest { |
| |
| private static final ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT; |
| |
| @Test |
| public void simple() throws Exception { |
| WriteCache cache = new WriteCache(allocator, 10 * 1024); |
| |
| ByteBuf entry1 = allocator.buffer(1024); |
| ByteBufUtil.writeUtf8(entry1, "entry-1"); |
| entry1.writerIndex(entry1.capacity()); |
| |
| assertTrue(cache.isEmpty()); |
| assertEquals(0, cache.count()); |
| assertEquals(0, cache.size()); |
| |
| cache.put(1, 1, entry1); |
| |
| assertFalse(cache.isEmpty()); |
| assertEquals(1, cache.count()); |
| assertEquals(entry1.readableBytes(), cache.size()); |
| |
| assertEquals(entry1, cache.get(1, 1)); |
| assertNull(cache.get(1, 2)); |
| assertNull(cache.get(2, 1)); |
| |
| assertEquals(entry1, cache.getLastEntry(1)); |
| assertEquals(null, cache.getLastEntry(2)); |
| |
| cache.clear(); |
| |
| assertTrue(cache.isEmpty()); |
| assertEquals(0, cache.count()); |
| assertEquals(0, cache.size()); |
| |
| entry1.release(); |
| cache.close(); |
| } |
| |
| @Test |
| public void cacheFull() throws Exception { |
| int cacheSize = 10 * 1024; |
| int entrySize = 1024; |
| int entriesCount = cacheSize / entrySize; |
| |
| WriteCache cache = new WriteCache(allocator, cacheSize); |
| |
| ByteBuf entry = allocator.buffer(entrySize); |
| entry.writerIndex(entry.capacity()); |
| |
| for (int i = 0; i < entriesCount; i++) { |
| assertTrue(cache.put(1, i, entry)); |
| } |
| |
| assertFalse(cache.put(1, 11, entry)); |
| |
| assertFalse(cache.isEmpty()); |
| assertEquals(entriesCount, cache.count()); |
| assertEquals(cacheSize, cache.size()); |
| |
| AtomicInteger findCount = new AtomicInteger(0); |
| cache.forEach((ledgerId, entryId, data) -> { |
| findCount.incrementAndGet(); |
| }); |
| |
| assertEquals(entriesCount, findCount.get()); |
| |
| cache.deleteLedger(1); |
| |
| findCount.set(0); |
| cache.forEach((ledgerId, entryId, data) -> { |
| findCount.incrementAndGet(); |
| }); |
| |
| assertEquals(0, findCount.get()); |
| |
| entry.release(); |
| cache.close(); |
| } |
| |
| @Test |
| public void testMultipleSegments() { |
| // Create cache with max size 1Mb and each segment is 16Kb |
| WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024); |
| |
| ByteBuf entry = Unpooled.buffer(1024); |
| entry.writerIndex(entry.capacity()); |
| |
| for (int i = 0; i < 48; i++) { |
| cache.put(1, i, entry); |
| } |
| |
| assertEquals(48, cache.count()); |
| assertEquals(48 * 1024, cache.size()); |
| |
| cache.close(); |
| } |
| |
| @Test |
| public void testEmptyCache() { |
| WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024); |
| |
| assertEquals(0, cache.count()); |
| assertEquals(0, cache.size()); |
| assertTrue(cache.isEmpty()); |
| |
| AtomicLong foundEntries = new AtomicLong(); |
| cache.forEach((ledgerId, entryId, entry) -> { |
| foundEntries.incrementAndGet(); |
| }); |
| |
| assertEquals(0, foundEntries.get()); |
| cache.close(); |
| } |
| |
| @Test |
| public void testMultipleWriters() throws Exception { |
| // Create cache with max size 1Mb and each segment is 16Kb |
| WriteCache cache = new WriteCache(allocator, 10 * 1024 * 1024, 16 * 1024); |
| |
| ExecutorService executor = Executors.newCachedThreadPool(); |
| |
| int numThreads = 10; |
| int entriesPerThread = 10 * 1024 / numThreads; |
| |
| CyclicBarrier barrier = new CyclicBarrier(numThreads); |
| CountDownLatch latch = new CountDownLatch(numThreads); |
| |
| for (int i = 0; i < numThreads; i++) { |
| int ledgerId = i; |
| |
| executor.submit(() -> { |
| try { |
| barrier.await(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(ie); |
| } catch (BrokenBarrierException e) { |
| throw new RuntimeException(e); |
| } |
| |
| ByteBuf entry = Unpooled.buffer(1024); |
| entry.writerIndex(entry.capacity()); |
| |
| for (int entryId = 0; entryId < entriesPerThread; entryId++) { |
| assertTrue(cache.put(ledgerId, entryId, entry)); |
| } |
| |
| latch.countDown(); |
| }); |
| } |
| |
| // Wait for all tasks to be completed |
| latch.await(); |
| |
| // assertEquals(numThreads * entriesPerThread, cache.count()); |
| assertEquals(cache.count() * 1024, cache.size()); |
| |
| // Verify entries by iterating over write cache |
| AtomicLong currentLedgerId = new AtomicLong(0); |
| AtomicLong currentEntryId = new AtomicLong(0); |
| |
| cache.forEach((ledgerId, entryId, entry) -> { |
| assertEquals(currentLedgerId.get(), ledgerId); |
| assertEquals(currentEntryId.get(), entryId); |
| |
| if (currentEntryId.incrementAndGet() == entriesPerThread) { |
| currentLedgerId.incrementAndGet(); |
| currentEntryId.set(0); |
| } |
| }); |
| |
| cache.close(); |
| executor.shutdown(); |
| } |
| |
| @Test |
| public void testLedgerDeletion() { |
| WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024); |
| |
| ByteBuf entry = Unpooled.buffer(1024); |
| entry.writerIndex(entry.capacity()); |
| |
| for (long ledgerId = 0; ledgerId < 10; ledgerId++) { |
| for (int entryId = 0; entryId < 10; entryId++) { |
| cache.put(ledgerId, entryId, entry); |
| } |
| } |
| |
| assertEquals(100, cache.count()); |
| assertEquals(100 * 1024, cache.size()); |
| |
| cache.deleteLedger(5); |
| |
| // Entries are not immediately deleted, just ignored on scan |
| assertEquals(100, cache.count()); |
| assertEquals(100 * 1024, cache.size()); |
| |
| // Verify entries by iterating over write cache |
| AtomicLong currentLedgerId = new AtomicLong(0); |
| AtomicLong currentEntryId = new AtomicLong(0); |
| |
| cache.forEach((ledgerId, entryId, e) -> { |
| assertEquals(currentLedgerId.get(), ledgerId); |
| assertEquals(currentEntryId.get(), entryId); |
| |
| if (currentEntryId.incrementAndGet() == 10) { |
| currentLedgerId.incrementAndGet(); |
| currentEntryId.set(0); |
| |
| if (currentLedgerId.get() == 5) { |
| // Ledger 5 was deleted |
| currentLedgerId.incrementAndGet(); |
| } |
| } |
| }); |
| |
| cache.close(); |
| } |
| |
| @Test |
| public void testWriteReadsInMultipleSegments() { |
| // Create cache with max size 4 KB and each segment is 128 bytes |
| WriteCache cache = new WriteCache(allocator, 4 * 1024, 128); |
| |
| for (int i = 0; i < 48; i++) { |
| boolean inserted = cache.put(1, i, Unpooled.wrappedBuffer(("test-" + i).getBytes())); |
| assertTrue(inserted); |
| } |
| |
| assertEquals(48, cache.count()); |
| |
| for (int i = 0; i < 48; i++) { |
| ByteBuf b = cache.get(1, i); |
| |
| assertEquals("test-" + i, b.toString(Charset.forName("UTF-8"))); |
| } |
| |
| cache.close(); |
| } |
| |
| } |