| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.bookkeeper.bookie; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import com.google.common.collect.Sets; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| import io.netty.buffer.UnpooledByteBufAllocator; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.nio.channels.FileChannel; |
| import java.nio.charset.Charset; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.BrokenBarrierException; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLongArray; |
| import java.util.concurrent.locks.Lock; |
| |
| import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; |
| import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; |
| import org.apache.bookkeeper.common.testing.annotations.FlakyTest; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| import org.apache.bookkeeper.conf.TestBKConfiguration; |
| import org.apache.bookkeeper.util.DiskChecker; |
| import org.apache.bookkeeper.util.IOUtils; |
| import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; |
| import org.apache.commons.io.FileUtils; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.FixMethodOrder; |
| import org.junit.Test; |
| import org.junit.runners.MethodSorters; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Tests for EntryLog. |
| */ |
| @FixMethodOrder(MethodSorters.NAME_ASCENDING) |
| public class TestEntryLog { |
| private static final Logger LOG = LoggerFactory.getLogger(TestEntryLog.class); |
| |
| final List<File> tempDirs = new ArrayList<File>(); |
| final Random rand = new Random(); |
| |
| File createTempDir(String prefix, String suffix) throws IOException { |
| File dir = IOUtils.createTempDir(prefix, suffix); |
| tempDirs.add(dir); |
| return dir; |
| } |
| |
| private File rootDir; |
| private File curDir; |
| private ServerConfiguration conf; |
| private LedgerDirsManager dirsMgr; |
| private EntryLogger entryLogger; |
| |
| @Before |
| public void setUp() throws Exception { |
| this.rootDir = createTempDir("bkTest", ".dir"); |
| this.curDir = BookieImpl.getCurrentDirectory(rootDir); |
| BookieImpl.checkDirectoryStructure(curDir); |
| this.conf = TestBKConfiguration.newServerConfiguration(); |
| this.dirsMgr = new LedgerDirsManager( |
| conf, |
| new File[] { rootDir }, |
| new DiskChecker( |
| conf.getDiskUsageThreshold(), |
| conf.getDiskUsageWarnThreshold())); |
| this.entryLogger = new EntryLogger(conf, dirsMgr); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| if (null != this.entryLogger) { |
| entryLogger.shutdown(); |
| } |
| |
| for (File dir : tempDirs) { |
| FileUtils.deleteDirectory(dir); |
| } |
| tempDirs.clear(); |
| } |
| |
| @Test |
| public void testDeferCreateNewLog() throws Exception { |
| entryLogger.shutdown(); |
| |
| // mark `curDir` as filled |
| this.conf.setMinUsableSizeForEntryLogCreation(1); |
| this.dirsMgr = new LedgerDirsManager( |
| conf, |
| new File[] { rootDir }, |
| new DiskChecker( |
| conf.getDiskUsageThreshold(), |
| conf.getDiskUsageWarnThreshold())); |
| this.dirsMgr.addToFilledDirs(curDir); |
| |
| entryLogger = new EntryLogger(conf, dirsMgr); |
| EntryLogManagerForSingleEntryLog entryLogManager = |
| (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager(); |
| assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId()); |
| |
| // add the first entry will trigger file creation |
| entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer()); |
| assertEquals(0L, entryLogManager.getCurrentLogId()); |
| } |
| |
| @Test |
| public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws Exception { |
| entryLogger.shutdown(); |
| |
| // mark `curDir` as filled |
| this.conf.setMinUsableSizeForEntryLogCreation(Long.MAX_VALUE); |
| this.dirsMgr = new LedgerDirsManager( |
| conf, |
| new File[] { rootDir }, |
| new DiskChecker( |
| conf.getDiskUsageThreshold(), |
| conf.getDiskUsageWarnThreshold())); |
| this.dirsMgr.addToFilledDirs(curDir); |
| |
| entryLogger = new EntryLogger(conf, dirsMgr); |
| EntryLogManagerForSingleEntryLog entryLogManager = |
| (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager(); |
| assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId()); |
| |
| // add the first entry will trigger file creation |
| try { |
| entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer()); |
| fail("Should fail to append entry if there is no enough reserved space left"); |
| } catch (NoWritableLedgerDirException e) { |
| assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId()); |
| } |
| } |
| |
| @Test |
| public void testCorruptEntryLog() throws Exception { |
| // create some entries |
| entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer()); |
| entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer()); |
| entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer()); |
| entryLogger.flush(); |
| entryLogger.shutdown(); |
| // now lets truncate the file to corrupt the last entry, which simulates a partial write |
| File f = new File(curDir, "0.log"); |
| RandomAccessFile raf = new RandomAccessFile(f, "rw"); |
| raf.setLength(raf.length() - 10); |
| raf.close(); |
| // now see which ledgers are in the log |
| entryLogger = new EntryLogger(conf, dirsMgr); |
| |
| EntryLogMetadata meta = entryLogger.getEntryLogMetadata(0L); |
| LOG.info("Extracted Meta From Entry Log {}", meta); |
| assertTrue(meta.getLedgersMap().containsKey(1L)); |
| assertFalse(meta.getLedgersMap().containsKey(2L)); |
| assertTrue(meta.getLedgersMap().containsKey(3L)); |
| } |
| |
| private static ByteBuf generateEntry(long ledger, long entry) { |
| byte[] data = generateDataString(ledger, entry).getBytes(); |
| ByteBuf bb = Unpooled.buffer(8 + 8 + data.length); |
| bb.writeLong(ledger); |
| bb.writeLong(entry); |
| bb.writeBytes(data); |
| return bb; |
| } |
| |
| private ByteBuf generateEntry(long ledger, long entry, int length) { |
| ByteBuf bb = Unpooled.buffer(length); |
| bb.writeLong(ledger); |
| bb.writeLong(entry); |
| byte[] randbyteArray = new byte[length - 8 - 8]; |
| rand.nextBytes(randbyteArray); |
| bb.writeBytes(randbyteArray); |
| return bb; |
| } |
| |
| private static String generateDataString(long ledger, long entry) { |
| return ("ledger-" + ledger + "-" + entry); |
| } |
| |
| @Test |
| public void testMissingLogId() throws Exception { |
| // create some entries |
| int numLogs = 3; |
| int numEntries = 10; |
| long[][] positions = new long[2 * numLogs][]; |
| for (int i = 0; i < numLogs; i++) { |
| positions[i] = new long[numEntries]; |
| |
| EntryLogger logger = new EntryLogger(conf, dirsMgr); |
| for (int j = 0; j < numEntries; j++) { |
| positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer()); |
| } |
| logger.flush(); |
| logger.shutdown(); |
| } |
| // delete last log id |
| File lastLogId = new File(curDir, "lastId"); |
| lastLogId.delete(); |
| |
| // write another entries |
| for (int i = numLogs; i < 2 * numLogs; i++) { |
| positions[i] = new long[numEntries]; |
| |
| EntryLogger logger = new EntryLogger(conf, dirsMgr); |
| for (int j = 0; j < numEntries; j++) { |
| positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer()); |
| } |
| logger.flush(); |
| logger.shutdown(); |
| } |
| |
| EntryLogger newLogger = new EntryLogger(conf, dirsMgr); |
| for (int i = 0; i < (2 * numLogs + 1); i++) { |
| File logFile = new File(curDir, Long.toHexString(i) + ".log"); |
| assertTrue(logFile.exists()); |
| } |
| for (int i = 0; i < 2 * numLogs; i++) { |
| for (int j = 0; j < numEntries; j++) { |
| String expectedValue = "ledger-" + i + "-" + j; |
| ByteBuf value = newLogger.readEntry(i, j, positions[i][j]); |
| long ledgerId = value.readLong(); |
| long entryId = value.readLong(); |
| byte[] data = new byte[value.readableBytes()]; |
| value.readBytes(data); |
| value.release(); |
| assertEquals(i, ledgerId); |
| assertEquals(j, entryId); |
| assertEquals(expectedValue, new String(data)); |
| } |
| } |
| } |
| |
| /** |
| * Test that EntryLogger Should fail with FNFE, if entry logger directories does not exist. |
| */ |
| @Test |
| public void testEntryLoggerShouldThrowFNFEIfDirectoriesDoesNotExist() |
| throws Exception { |
| File tmpDir = createTempDir("bkTest", ".dir"); |
| EntryLogger entryLogger = null; |
| try { |
| entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf, new File[] { tmpDir }, |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()))); |
| fail("Expecting FileNotFoundException"); |
| } catch (FileNotFoundException e) { |
| assertEquals("Entry log directory '" + tmpDir + "/current' does not exist", e |
| .getLocalizedMessage()); |
| } finally { |
| if (entryLogger != null) { |
| entryLogger.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Test to verify the DiskFull during addEntry. |
| */ |
| @Test |
| public void testAddEntryFailureOnDiskFull() throws Exception { |
| File ledgerDir1 = createTempDir("bkTest", ".dir"); |
| File ledgerDir2 = createTempDir("bkTest", ".dir"); |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); |
| conf.setJournalDirName(ledgerDir1.toString()); |
| conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), |
| ledgerDir2.getAbsolutePath() }); |
| BookieImpl bookie = new BookieImpl(conf); |
| EntryLogger entryLogger = new EntryLogger(conf, |
| bookie.getLedgerDirsManager()); |
| InterleavedLedgerStorage ledgerStorage = |
| ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage()); |
| ledgerStorage.entryLogger = entryLogger; |
| // Create ledgers |
| ledgerStorage.setMasterKey(1, "key".getBytes()); |
| ledgerStorage.setMasterKey(2, "key".getBytes()); |
| ledgerStorage.setMasterKey(3, "key".getBytes()); |
| // Add entries |
| ledgerStorage.addEntry(generateEntry(1, 1)); |
| ledgerStorage.addEntry(generateEntry(2, 1)); |
| // Add entry with disk full failure simulation |
| bookie.getLedgerDirsManager().addToFilledDirs(((EntryLogManagerBase) entryLogger.getEntryLogManager()) |
| .getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID).getLogFile().getParentFile()); |
| ledgerStorage.addEntry(generateEntry(3, 1)); |
| // Verify written entries |
| Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1))); |
| Assert.assertTrue(0 == generateEntry(2, 1).compareTo(ledgerStorage.getEntry(2, 1))); |
| Assert.assertTrue(0 == generateEntry(3, 1).compareTo(ledgerStorage.getEntry(3, 1))); |
| } |
| |
| /** |
| * Explicitly try to recover using the ledgers map index at the end of the entry log. |
| */ |
| @Test |
| public void testRecoverFromLedgersMap() throws Exception { |
| // create some entries |
| entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer()); |
| entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer()); |
| entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer()); |
| entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer()); |
| |
| EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); |
| entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID); |
| entryLogManager.flushRotatedLogs(); |
| |
| EntryLogMetadata meta = entryLogger.extractEntryLogMetadataFromIndex(0L); |
| LOG.info("Extracted Meta From Entry Log {}", meta); |
| assertEquals(60, meta.getLedgersMap().get(1L)); |
| assertEquals(30, meta.getLedgersMap().get(2L)); |
| assertEquals(30, meta.getLedgersMap().get(3L)); |
| assertFalse(meta.getLedgersMap().containsKey(4L)); |
| assertEquals(120, meta.getTotalSize()); |
| assertEquals(120, meta.getRemainingSize()); |
| } |
| |
| /** |
| * Explicitly try to recover using the ledgers map index at the end of the entry log. |
| */ |
| @Test |
| public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception { |
| // create some entries |
| entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer()); |
| entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer()); |
| entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer()); |
| entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer()); |
| ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID); |
| entryLogger.shutdown(); |
| |
| // Rewrite the entry log header to be on V0 format |
| File f = new File(curDir, "0.log"); |
| RandomAccessFile raf = new RandomAccessFile(f, "rw"); |
| raf.seek(EntryLogger.HEADER_VERSION_POSITION); |
| // Write zeros to indicate V0 + no ledgers map info |
| raf.write(new byte[4 + 8]); |
| raf.close(); |
| |
| // now see which ledgers are in the log |
| entryLogger = new EntryLogger(conf, dirsMgr); |
| |
| try { |
| entryLogger.extractEntryLogMetadataFromIndex(0L); |
| fail("Should not be possible to recover from ledgers map index"); |
| } catch (IOException e) { |
| // Ok |
| } |
| |
| // Public method should succeed by falling back to scanning the file |
| EntryLogMetadata meta = entryLogger.getEntryLogMetadata(0L); |
| LOG.info("Extracted Meta From Entry Log {}", meta); |
| assertEquals(60, meta.getLedgersMap().get(1L)); |
| assertEquals(30, meta.getLedgersMap().get(2L)); |
| assertEquals(30, meta.getLedgersMap().get(3L)); |
| assertFalse(meta.getLedgersMap().containsKey(4L)); |
| assertEquals(120, meta.getTotalSize()); |
| assertEquals(120, meta.getRemainingSize()); |
| } |
| |
| /** |
| * Test pre-allocate for entry log in EntryLoggerAllocator. |
| * @throws Exception |
| */ |
| @Test |
| public void testPreAllocateLog() throws Exception { |
| entryLogger.shutdown(); |
| |
| // enable pre-allocation case |
| conf.setEntryLogFilePreAllocationEnabled(true); |
| |
| entryLogger = new EntryLogger(conf, dirsMgr); |
| // create a logger whose initialization phase allocating a new entry log |
| ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID); |
| assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture()); |
| |
| entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer()); |
| // the Future<BufferedLogChannel> is not null all the time |
| assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture()); |
| entryLogger.shutdown(); |
| |
| // disable pre-allocation case |
| conf.setEntryLogFilePreAllocationEnabled(false); |
| // create a logger |
| entryLogger = new EntryLogger(conf, dirsMgr); |
| assertNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture()); |
| |
| entryLogger.addEntry(2L, generateEntry(1, 1).nioBuffer()); |
| |
| // the Future<BufferedLogChannel> is null all the time |
| assertNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture()); |
| } |
| |
| /** |
| * Test the getEntryLogsSet() method. |
| */ |
| @Test |
| public void testGetEntryLogsSet() throws Exception { |
| // create some entries |
| EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager()); |
| assertEquals(Sets.newHashSet(), entryLogger.getEntryLogsSet()); |
| |
| entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID); |
| entryLogManagerBase.flushRotatedLogs(); |
| |
| Thread.sleep(2000); |
| assertEquals(Sets.newHashSet(0L, 1L), entryLogger.getEntryLogsSet()); |
| |
| entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID); |
| entryLogManagerBase.flushRotatedLogs(); |
| |
| assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet()); |
| } |
| |
| /** |
| * In this testcase, entryLogger flush and entryLogger addEntry (which would |
| * call createNewLog) are called concurrently. Since entryLogger flush |
| * method flushes both currentlog and rotatedlogs, it is expected all the |
| * currentLog and rotatedLogs are supposed to be flush and forcewritten. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testFlushOrder() throws Exception { |
| entryLogger.shutdown(); |
| |
| int logSizeLimit = 256 * 1024; |
| conf.setEntryLogPerLedgerEnabled(false); |
| conf.setEntryLogFilePreAllocationEnabled(false); |
| conf.setFlushIntervalInBytes(0); |
| conf.setEntryLogSizeLimit(logSizeLimit); |
| |
| entryLogger = new EntryLogger(conf, dirsMgr); |
| EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); |
| AtomicBoolean exceptionHappened = new AtomicBoolean(false); |
| |
| CyclicBarrier barrier = new CyclicBarrier(2); |
| List<BufferedLogChannel> rotatedLogChannels; |
| BufferedLogChannel currentActiveChannel; |
| |
| exceptionHappened.set(false); |
| |
| /* |
| * higher the number of rotated logs, it would be easier to reproduce |
| * the issue regarding flush order |
| */ |
| addEntriesAndRotateLogs(entryLogger, 30); |
| |
| rotatedLogChannels = new LinkedList<BufferedLogChannel>(entryLogManager.getRotatedLogChannels()); |
| currentActiveChannel = entryLogManager.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID); |
| long currentActiveChannelUnpersistedBytes = currentActiveChannel.getUnpersistedBytes(); |
| |
| Thread flushThread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| barrier.await(); |
| entryLogger.flush(); |
| } catch (InterruptedException | BrokenBarrierException | IOException e) { |
| LOG.error("Exception happened for entryLogger.flush", e); |
| exceptionHappened.set(true); |
| } |
| } |
| }); |
| |
| Thread createdNewLogThread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| barrier.await(); |
| /* |
| * here we are adding entry of size logSizeLimit with |
| * rolllog=true, so it would create a new entrylog. |
| */ |
| entryLogger.addEntry(123, generateEntry(123, 456, logSizeLimit), true); |
| } catch (InterruptedException | BrokenBarrierException | IOException e) { |
| LOG.error("Exception happened for entryLogManager.createNewLog", e); |
| exceptionHappened.set(true); |
| } |
| } |
| }); |
| |
| /* |
| * concurrently entryLogger flush and entryLogger addEntry (which would |
| * call createNewLog) would be called from different threads. |
| */ |
| flushThread.start(); |
| createdNewLogThread.start(); |
| flushThread.join(); |
| createdNewLogThread.join(); |
| |
| Assert.assertFalse("Exception happened in one of the operation", exceptionHappened.get()); |
| |
| if (conf.getFlushIntervalInBytes() > 0) { |
| /* |
| * if flush of the previous current channel is called then the |
| * unpersistedBytes should be less than what it was before, actually |
| * it would be close to zero (but when new log is created with |
| * addEntry call, ledgers map will be appended at the end of entry |
| * log) |
| */ |
| Assert.assertTrue( |
| "previous currentChannel unpersistedBytes should be less than " |
| + currentActiveChannelUnpersistedBytes |
| + ", but it is actually " + currentActiveChannel.getUnpersistedBytes(), |
| currentActiveChannel.getUnpersistedBytes() < currentActiveChannelUnpersistedBytes); |
| } |
| for (BufferedLogChannel rotatedLogChannel : rotatedLogChannels) { |
| Assert.assertEquals("previous rotated entrylog should be flushandforcewritten", 0, |
| rotatedLogChannel.getUnpersistedBytes()); |
| } |
| } |
| |
| void addEntriesAndRotateLogs(EntryLogger entryLogger, int numOfRotations) |
| throws IOException { |
| EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); |
| entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID, null); |
| for (int i = 0; i < numOfRotations; i++) { |
| addEntries(entryLogger, 10); |
| entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID, null); |
| } |
| addEntries(entryLogger, 10); |
| } |
| |
| void addEntries(EntryLogger entryLogger, int noOfEntries) throws IOException { |
| for (int j = 0; j < noOfEntries; j++) { |
| int ledgerId = Math.abs(rand.nextInt()); |
| int entryId = Math.abs(rand.nextInt()); |
| entryLogger.addEntry(ledgerId, generateEntry(ledgerId, entryId).nioBuffer()); |
| } |
| } |
| |
| static class LedgerStorageWriteTask implements Callable<Boolean> { |
| long ledgerId; |
| int entryId; |
| LedgerStorage ledgerStorage; |
| |
| LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { |
| this.ledgerId = ledgerId; |
| this.entryId = entryId; |
| this.ledgerStorage = ledgerStorage; |
| } |
| |
| @Override |
| public Boolean call() throws IOException, BookieException { |
| try { |
| ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); |
| } catch (IOException e) { |
| LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); |
| throw new IOException("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, |
| e); |
| } |
| return true; |
| } |
| } |
| |
| static class LedgerStorageFlushTask implements Callable<Boolean> { |
| LedgerStorage ledgerStorage; |
| |
| LedgerStorageFlushTask(LedgerStorage ledgerStorage) { |
| this.ledgerStorage = ledgerStorage; |
| } |
| |
| @Override |
| public Boolean call() throws IOException { |
| try { |
| ledgerStorage.flush(); |
| } catch (IOException e) { |
| LOG.error("Got Exception for flush call", e); |
| throw new IOException("Got Exception for Flush call", e); |
| } |
| return true; |
| } |
| } |
| |
| static class LedgerStorageReadTask implements Callable<Boolean> { |
| long ledgerId; |
| int entryId; |
| LedgerStorage ledgerStorage; |
| |
| LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { |
| this.ledgerId = ledgerId; |
| this.entryId = entryId; |
| this.ledgerStorage = ledgerStorage; |
| } |
| |
| @Override |
| public Boolean call() throws IOException { |
| try { |
| ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId); |
| ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, entryId); |
| if (!expectedByteBuf.equals(actualByteBuf)) { |
| LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()), |
| actualByteBuf.toString(Charset.defaultCharset())); |
| throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset()) |
| + " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset())); |
| } |
| } catch (IOException e) { |
| LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); |
| throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, |
| e); |
| } |
| return true; |
| } |
| } |
| |
| /** |
| * test concurrent write operations and then concurrent read operations |
| * using InterleavedLedgerStorage. |
| */ |
| @FlakyTest(value = "https://github.com/apache/bookkeeper/issues/1516") |
| public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { |
| testConcurrentWriteAndReadCalls(InterleavedLedgerStorage.class.getName(), false); |
| } |
| |
| /** |
| * test concurrent write operations and then concurrent read operations |
| * using InterleavedLedgerStorage with EntryLogPerLedger enabled. |
| */ |
| @FlakyTest(value = "https://github.com/apache/bookkeeper/issues/1516") |
| public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorageWithELPLEnabled() throws Exception { |
| testConcurrentWriteAndReadCalls(InterleavedLedgerStorage.class.getName(), true); |
| } |
| |
| /** |
| * test concurrent write operations and then concurrent read operations |
| * using SortedLedgerStorage. |
| */ |
| @FlakyTest(value = "https://github.com/apache/bookkeeper/issues/1516") |
| public void testConcurrentWriteAndReadCallsOfSortedLedgerStorage() throws Exception { |
| testConcurrentWriteAndReadCalls(SortedLedgerStorage.class.getName(), false); |
| } |
| |
| /** |
| * test concurrent write operations and then concurrent read operations |
| * using SortedLedgerStorage with EntryLogPerLedger enabled. |
| */ |
| @FlakyTest(value = "https://github.com/apache/bookkeeper/issues/1516") |
| public void testConcurrentWriteAndReadCallsOfSortedLedgerStorageWithELPLEnabled() throws Exception { |
| testConcurrentWriteAndReadCalls(SortedLedgerStorage.class.getName(), true); |
| } |
| |
| public void testConcurrentWriteAndReadCalls(String ledgerStorageClass, boolean entryLogPerLedgerEnabled) |
| throws Exception { |
| File ledgerDir = createTempDir("bkTest", ".dir"); |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setJournalDirName(ledgerDir.toString()); |
| conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()}); |
| conf.setLedgerStorageClass(ledgerStorageClass); |
| conf.setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled); |
| BookieImpl bookie = new BookieImpl(conf); |
| CompactableLedgerStorage ledgerStorage = (CompactableLedgerStorage) bookie.ledgerStorage; |
| Random rand = new Random(0); |
| |
| if (ledgerStorageClass.equals(SortedLedgerStorage.class.getName())) { |
| Assert.assertEquals("LedgerStorage Class", SortedLedgerStorage.class, ledgerStorage.getClass()); |
| if (entryLogPerLedgerEnabled) { |
| Assert.assertEquals("MemTable Class", EntryMemTableWithParallelFlusher.class, |
| ((SortedLedgerStorage) ledgerStorage).memTable.getClass()); |
| } else { |
| Assert.assertEquals("MemTable Class", EntryMemTable.class, |
| ((SortedLedgerStorage) ledgerStorage).memTable.getClass()); |
| } |
| } |
| |
| int numOfLedgers = 70; |
| int numEntries = 1500; |
| // Create ledgers |
| for (int i = 0; i < numOfLedgers; i++) { |
| ledgerStorage.setMasterKey(i, "key".getBytes()); |
| } |
| |
| ExecutorService executor = Executors.newFixedThreadPool(10); |
| List<Callable<Boolean>> writeAndFlushTasks = new ArrayList<Callable<Boolean>>(); |
| for (int j = 0; j < numEntries; j++) { |
| for (int i = 0; i < numOfLedgers; i++) { |
| writeAndFlushTasks.add(new LedgerStorageWriteTask(i, j, ledgerStorage)); |
| } |
| } |
| |
| /* |
| * add some flush tasks to the list of writetasks list. |
| */ |
| for (int i = 0; i < (numOfLedgers * numEntries) / 500; i++) { |
| writeAndFlushTasks.add(rand.nextInt(writeAndFlushTasks.size()), new LedgerStorageFlushTask(ledgerStorage)); |
| } |
| |
| // invoke all those write/flush tasks all at once concurrently |
| executor.invokeAll(writeAndFlushTasks).forEach((future) -> { |
| try { |
| future.get(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| LOG.error("Write/Flush task failed because of InterruptedException", ie); |
| Assert.fail("Write/Flush task interrupted"); |
| } catch (Exception ex) { |
| LOG.error("Write/Flush task failed because of exception", ex); |
| Assert.fail("Write/Flush task failed " + ex.getMessage()); |
| } |
| }); |
| |
| List<Callable<Boolean>> readAndFlushTasks = new ArrayList<Callable<Boolean>>(); |
| for (int j = 0; j < numEntries; j++) { |
| for (int i = 0; i < numOfLedgers; i++) { |
| readAndFlushTasks.add(new LedgerStorageReadTask(i, j, ledgerStorage)); |
| } |
| } |
| |
| /* |
| * add some flush tasks to the list of readtasks list. |
| */ |
| for (int i = 0; i < (numOfLedgers * numEntries) / 500; i++) { |
| readAndFlushTasks.add(rand.nextInt(readAndFlushTasks.size()), new LedgerStorageFlushTask(ledgerStorage)); |
| } |
| |
| // invoke all those read/flush tasks all at once concurrently |
| executor.invokeAll(readAndFlushTasks).forEach((future) -> { |
| try { |
| future.get(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| LOG.error("Read/Flush task failed because of InterruptedException", ie); |
| Assert.fail("Read/Flush task interrupted"); |
| } catch (Exception ex) { |
| LOG.error("Read/Flush task failed because of exception", ex); |
| Assert.fail("Read/Flush task failed " + ex.getMessage()); |
| } |
| }); |
| |
| executor.shutdownNow(); |
| } |
| |
| /** |
| * Test to verify the leastUnflushedLogId logic in EntryLogsStatus. |
| */ |
| @Test |
| public void testEntryLoggersRecentEntryLogsStatus() throws Exception { |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(2)); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogger.RecentEntryLogsStatus recentlyCreatedLogsStatus = entryLogger.recentlyCreatedEntryLogsStatus; |
| |
| recentlyCreatedLogsStatus.createdEntryLog(0L); |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 0L, entryLogger.getLeastUnflushedLogId()); |
| recentlyCreatedLogsStatus.flushRotatedEntryLog(0L); |
| // since we marked entrylog - 0 as rotated, LeastUnflushedLogId would be previous rotatedlog+1 |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId()); |
| recentlyCreatedLogsStatus.createdEntryLog(1L); |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId()); |
| recentlyCreatedLogsStatus.createdEntryLog(2L); |
| recentlyCreatedLogsStatus.createdEntryLog(3L); |
| recentlyCreatedLogsStatus.createdEntryLog(4L); |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId()); |
| recentlyCreatedLogsStatus.flushRotatedEntryLog(1L); |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 2L, entryLogger.getLeastUnflushedLogId()); |
| recentlyCreatedLogsStatus.flushRotatedEntryLog(3L); |
| // here though we rotated entrylog-3, entrylog-2 is not yet rotated so |
| // LeastUnflushedLogId should be still 2 |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 2L, entryLogger.getLeastUnflushedLogId()); |
| recentlyCreatedLogsStatus.flushRotatedEntryLog(2L); |
| // entrylog-3 is already rotated, so leastUnflushedLogId should be 4 |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 4L, entryLogger.getLeastUnflushedLogId()); |
| recentlyCreatedLogsStatus.flushRotatedEntryLog(4L); |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 5L, entryLogger.getLeastUnflushedLogId()); |
| recentlyCreatedLogsStatus.createdEntryLog(5L); |
| recentlyCreatedLogsStatus.createdEntryLog(7L); |
| recentlyCreatedLogsStatus.createdEntryLog(9L); |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 5L, entryLogger.getLeastUnflushedLogId()); |
| recentlyCreatedLogsStatus.flushRotatedEntryLog(5L); |
| // since we marked entrylog-5 as rotated, LeastUnflushedLogId would be previous rotatedlog+1 |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 6L, entryLogger.getLeastUnflushedLogId()); |
| recentlyCreatedLogsStatus.flushRotatedEntryLog(7L); |
| Assert.assertEquals("entryLogger's leastUnflushedLogId ", 8L, entryLogger.getLeastUnflushedLogId()); |
| } |
| |
| String[] createAndGetLedgerDirs(int numOfLedgerDirs) throws IOException { |
| File ledgerDir; |
| File curDir; |
| String[] ledgerDirsPath = new String[numOfLedgerDirs]; |
| for (int i = 0; i < numOfLedgerDirs; i++) { |
| ledgerDir = createTempDir("bkTest", ".dir"); |
| curDir = BookieImpl.getCurrentDirectory(ledgerDir); |
| BookieImpl.checkDirectoryStructure(curDir); |
| ledgerDirsPath[i] = ledgerDir.getAbsolutePath(); |
| } |
| return ledgerDirsPath; |
| } |
| |
| /* |
| * test for validating if the EntryLog/BufferedChannel flushes/forcewrite if the bytes written to it are more than |
| * flushIntervalInBytes |
| */ |
| @Test |
| public void testFlushIntervalInBytes() throws Exception { |
| long flushIntervalInBytes = 5000; |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setFlushIntervalInBytes(flushIntervalInBytes); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(2)); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager()); |
| |
| /* |
| * when entryLogger is created Header of length EntryLogger.LOGFILE_HEADER_SIZE is created |
| */ |
| long ledgerId = 0L; |
| int firstEntrySize = 1000; |
| long entry0Position = entryLogger.addEntry(0L, generateEntry(ledgerId, 0L, firstEntrySize)); |
| // entrylogger writes length of the entry (4 bytes) before writing entry |
| long expectedUnpersistedBytes = EntryLogger.LOGFILE_HEADER_SIZE + firstEntrySize + 4; |
| Assert.assertEquals("Unpersisted Bytes of entrylog", expectedUnpersistedBytes, |
| entryLogManagerBase.getCurrentLogForLedger(ledgerId).getUnpersistedBytes()); |
| |
| /* |
| * 'flushIntervalInBytes' number of bytes are flushed so BufferedChannel should be forcewritten |
| */ |
| int secondEntrySize = (int) (flushIntervalInBytes - expectedUnpersistedBytes); |
| long entry1Position = entryLogger.addEntry(0L, generateEntry(ledgerId, 1L, secondEntrySize)); |
| Assert.assertEquals("Unpersisted Bytes of entrylog", 0, |
| entryLogManagerBase.getCurrentLogForLedger(ledgerId).getUnpersistedBytes()); |
| |
| /* |
| * since entrylog/Bufferedchannel is persisted (forcewritten), we should be able to read the entrylog using |
| * newEntryLogger |
| */ |
| conf.setEntryLogPerLedgerEnabled(false); |
| EntryLogger newEntryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager(); |
| Assert.assertEquals("EntryLogManager class type", EntryLogManagerForSingleEntryLog.class, |
| newEntryLogManager.getClass()); |
| |
| ByteBuf buf = newEntryLogger.readEntry(ledgerId, 0L, entry0Position); |
| long readLedgerId = buf.readLong(); |
| long readEntryId = buf.readLong(); |
| Assert.assertEquals("LedgerId", ledgerId, readLedgerId); |
| Assert.assertEquals("EntryId", 0L, readEntryId); |
| |
| buf = newEntryLogger.readEntry(ledgerId, 1L, entry1Position); |
| readLedgerId = buf.readLong(); |
| readEntryId = buf.readLong(); |
| Assert.assertEquals("LedgerId", ledgerId, readLedgerId); |
| Assert.assertEquals("EntryId", 1L, readEntryId); |
| } |
| |
| /* |
| * tests basic logic of EntryLogManager interface for |
| * EntryLogManagerForEntryLogPerLedger. |
| */ |
| @Test |
| public void testEntryLogManagerInterfaceForEntryLogPerLedger() throws Exception { |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogFilePreAllocationEnabled(true); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(2)); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger |
| .getEntryLogManager(); |
| |
| Assert.assertEquals("Number of current active EntryLogs ", 0, entryLogManager.getCopyOfCurrentLogs().size()); |
| Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size()); |
| |
| int numOfLedgers = 5; |
| int numOfThreadsPerLedger = 10; |
| validateLockAcquireAndRelease(numOfLedgers, numOfThreadsPerLedger, entryLogManager); |
| |
| for (long i = 0; i < numOfLedgers; i++) { |
| entryLogManager.setCurrentLogForLedgerAndAddToRotate(i, |
| createDummyBufferedLogChannel(entryLogger, i, conf)); |
| } |
| |
| for (long i = 0; i < numOfLedgers; i++) { |
| Assert.assertEquals("LogChannel for ledger: " + i, entryLogManager.getCurrentLogIfPresent(i), |
| entryLogManager.getCurrentLogForLedger(i)); |
| } |
| |
| Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers, |
| entryLogManager.getCopyOfCurrentLogs().size()); |
| Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size()); |
| |
| for (long i = 0; i < numOfLedgers; i++) { |
| entryLogManager.setCurrentLogForLedgerAndAddToRotate(i, |
| createDummyBufferedLogChannel(entryLogger, numOfLedgers + i, conf)); |
| } |
| |
| /* |
| * since new entryLogs are set for all the ledgers, previous entrylogs would be added to rotatedLogChannels |
| */ |
| Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers, |
| entryLogManager.getCopyOfCurrentLogs().size()); |
| Assert.assertEquals("Number of Rotated Logs ", numOfLedgers, |
| entryLogManager.getRotatedLogChannels().size()); |
| |
| for (long i = 0; i < numOfLedgers; i++) { |
| entryLogManager.setCurrentLogForLedgerAndAddToRotate(i, |
| createDummyBufferedLogChannel(entryLogger, 2 * numOfLedgers + i, conf)); |
| } |
| |
| /* |
| * again since new entryLogs are set for all the ledgers, previous entrylogs would be added to |
| * rotatedLogChannels |
| */ |
| Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers, |
| entryLogManager.getCopyOfCurrentLogs().size()); |
| Assert.assertEquals("Number of Rotated Logs ", 2 * numOfLedgers, |
| entryLogManager.getRotatedLogChannels().size()); |
| |
| for (BufferedLogChannel logChannel : entryLogManager.getRotatedLogChannels()) { |
| entryLogManager.getRotatedLogChannels().remove(logChannel); |
| } |
| Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size()); |
| |
| // entrylogid is sequential |
| for (long i = 0; i < numOfLedgers; i++) { |
| assertEquals("EntryLogid for Ledger " + i, 2 * numOfLedgers + i, |
| entryLogManager.getCurrentLogForLedger(i).getLogId()); |
| } |
| |
| for (long i = 2 * numOfLedgers; i < (3 * numOfLedgers); i++) { |
| assertTrue("EntryLog with logId: " + i + " should be present", |
| entryLogManager.getCurrentLogIfPresent(i) != null); |
| } |
| } |
| |
| private EntryLogger.BufferedLogChannel createDummyBufferedLogChannel(EntryLogger entryLogger, long logid, |
| ServerConfiguration servConf) throws IOException { |
| File tmpFile = File.createTempFile("entrylog", logid + ""); |
| tmpFile.deleteOnExit(); |
| FileChannel fc = new RandomAccessFile(tmpFile, "rw").getChannel(); |
| EntryLogger.BufferedLogChannel logChannel = new BufferedLogChannel(UnpooledByteBufAllocator.DEFAULT, fc, 10, 10, |
| logid, tmpFile, servConf.getFlushIntervalInBytes()); |
| return logChannel; |
| } |
| |
| /* |
| * validates the concurrency aspect of entryLogManager's lock |
| * |
| * Executor of fixedThreadPool of size 'numOfLedgers * numOfThreadsPerLedger' is created and the same number |
| * of tasks are submitted to the Executor. In each task, lock of that ledger is acquired and then released. |
| */ |
| private void validateLockAcquireAndRelease(int numOfLedgers, int numOfThreadsPerLedger, |
| EntryLogManagerForEntryLogPerLedger entryLogManager) throws InterruptedException { |
| ExecutorService tpe = Executors.newFixedThreadPool(numOfLedgers * numOfThreadsPerLedger); |
| CountDownLatch latchToStart = new CountDownLatch(1); |
| CountDownLatch latchToWait = new CountDownLatch(1); |
| AtomicInteger numberOfThreadsAcquiredLock = new AtomicInteger(0); |
| AtomicBoolean irptExceptionHappened = new AtomicBoolean(false); |
| Random rand = new Random(); |
| |
| for (int i = 0; i < numOfLedgers * numOfThreadsPerLedger; i++) { |
| long ledgerId = i % numOfLedgers; |
| tpe.submit(() -> { |
| try { |
| latchToStart.await(); |
| Lock lock = entryLogManager.getLock(ledgerId); |
| lock.lock(); |
| numberOfThreadsAcquiredLock.incrementAndGet(); |
| latchToWait.await(); |
| lock.unlock(); |
| } catch (InterruptedException | IOException e) { |
| irptExceptionHappened.set(true); |
| } |
| }); |
| } |
| |
| assertEquals("Number Of Threads acquired Lock", 0, numberOfThreadsAcquiredLock.get()); |
| latchToStart.countDown(); |
| Thread.sleep(1000); |
| /* |
| * since there are only "numOfLedgers" ledgers, only < "numOfLedgers" |
| * threads should have been able to acquire lock, because multiple |
| * ledgers can end up getting same lock because their hashcode might |
| * fall in the same bucket. |
| * |
| * |
| * After acquiring the lock there must be waiting on 'latchToWait' latch |
| */ |
| int currentNumberOfThreadsAcquiredLock = numberOfThreadsAcquiredLock.get(); |
| assertTrue("Number Of Threads acquired Lock " + currentNumberOfThreadsAcquiredLock, |
| (currentNumberOfThreadsAcquiredLock > 0) && (currentNumberOfThreadsAcquiredLock <= numOfLedgers)); |
| latchToWait.countDown(); |
| Thread.sleep(2000); |
| assertEquals("Number Of Threads acquired Lock", numOfLedgers * numOfThreadsPerLedger, |
| numberOfThreadsAcquiredLock.get()); |
| } |
| |
| /* |
| * test EntryLogManager.EntryLogManagerForEntryLogPerLedger removes the |
| * ledger from its cache map if entry is not added to that ledger or its |
| * corresponding state is not accessed for more than evictionPeriod |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testEntryLogManagerExpiryRemoval() throws Exception { |
| int evictionPeriod = 1; |
| |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogFilePreAllocationEnabled(false); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(2)); |
| conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerForEntryLogPerLedger entryLogManager = |
| (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); |
| |
| long ledgerId = 0L; |
| |
| BufferedLogChannel logChannel = createDummyBufferedLogChannel(entryLogger, 0, conf); |
| entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, logChannel); |
| |
| BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId); |
| assertEquals("LogChannel for ledger " + ledgerId + " should match", logChannel, currentLogForLedger); |
| |
| Thread.sleep(evictionPeriod * 1000 + 100); |
| entryLogManager.doEntryLogMapCleanup(); |
| |
| /* |
| * since for more than evictionPeriod, that ledger is not accessed and cache is cleaned up, mapping for that |
| * ledger should not be available anymore |
| */ |
| currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId); |
| assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, currentLogForLedger); |
| Assert.assertEquals("Number of current active EntryLogs ", 0, entryLogManager.getCopyOfCurrentLogs().size()); |
| Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getRotatedLogChannels().size()); |
| Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel", |
| entryLogManager.getRotatedLogChannels().contains(logChannel)); |
| |
| Assert.assertTrue("since mapentry must have been evicted, it should be null", |
| (entryLogManager.getCacheAsMap().get(ledgerId) == null) |
| || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null)); |
| } |
| |
| /* |
| * tests if the maximum size of cache (maximumNumberOfActiveEntryLogs) is |
| * honored in EntryLogManagerForEntryLogPerLedger's cache eviction policy. |
| */ |
| @Test |
| public void testCacheMaximumSizeEvictionPolicy() throws Exception { |
| entryLogger.shutdown(); |
| final int cacheMaximumSize = 20; |
| |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogFilePreAllocationEnabled(true); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(1)); |
| conf.setMaximumNumberOfActiveEntryLogs(cacheMaximumSize); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerForEntryLogPerLedger entryLogManager = |
| (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); |
| |
| for (int i = 0; i < cacheMaximumSize + 10; i++) { |
| entryLogManager.createNewLog(i); |
| int cacheSize = entryLogManager.getCacheAsMap().size(); |
| Assert.assertTrue("Cache maximum size is expected to be less than " + cacheMaximumSize |
| + " but current cacheSize is " + cacheSize, cacheSize <= cacheMaximumSize); |
| } |
| } |
| |
| @Test |
| public void testLongLedgerIdsWithEntryLogPerLedger() throws Exception { |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogFilePreAllocationEnabled(true); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(1)); |
| conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); |
| |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger |
| .getEntryLogManager(); |
| |
| int numOfLedgers = 5; |
| int numOfEntries = 4; |
| long[][] pos = new long[numOfLedgers][numOfEntries]; |
| for (int i = 0; i < numOfLedgers; i++) { |
| long ledgerId = Long.MAX_VALUE - i; |
| entryLogManager.createNewLog(ledgerId); |
| for (int entryId = 0; entryId < numOfEntries; entryId++) { |
| pos[i][entryId] = entryLogger.addEntry(ledgerId, generateEntry(ledgerId, entryId).nioBuffer()); |
| } |
| } |
| /* |
| * do checkpoint to make sure entrylog files are persisted |
| */ |
| entryLogger.checkpoint(); |
| |
| for (int i = 0; i < numOfLedgers; i++) { |
| long ledgerId = Long.MAX_VALUE - i; |
| for (int entryId = 0; entryId < numOfEntries; entryId++) { |
| String expectedValue = generateDataString(ledgerId, entryId); |
| ByteBuf buf = entryLogger.readEntry(ledgerId, entryId, pos[i][entryId]); |
| long readLedgerId = buf.readLong(); |
| long readEntryId = buf.readLong(); |
| byte[] readData = new byte[buf.readableBytes()]; |
| buf.readBytes(readData); |
| assertEquals("LedgerId ", ledgerId, readLedgerId); |
| assertEquals("EntryId ", entryId, readEntryId); |
| assertEquals("Entry Data ", expectedValue, new String(readData)); |
| } |
| } |
| } |
| |
| /* |
| * when entrylog for ledger is removed from ledgerIdEntryLogMap, then |
| * ledgermap should be appended to that entrylog, before moving that |
| * entrylog to rotatedlogchannels. |
| */ |
| @Test |
| public void testAppendLedgersMapOnCacheRemoval() throws Exception { |
| final int cacheMaximumSize = 5; |
| |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogFilePreAllocationEnabled(true); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(1)); |
| conf.setMaximumNumberOfActiveEntryLogs(cacheMaximumSize); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger |
| .getEntryLogManager(); |
| |
| long ledgerId = 0L; |
| entryLogManager.createNewLog(ledgerId); |
| int entrySize = 200; |
| int numOfEntries = 4; |
| for (int i = 0; i < numOfEntries; i++) { |
| entryLogger.addEntry(ledgerId, generateEntry(ledgerId, i, entrySize)); |
| } |
| |
| BufferedLogChannel logChannelForledger = entryLogManager.getCurrentLogForLedger(ledgerId); |
| long logIdOfLedger = logChannelForledger.getLogId(); |
| /* |
| * do checkpoint to make sure entrylog files are persisted |
| */ |
| entryLogger.checkpoint(); |
| |
| try { |
| entryLogger.extractEntryLogMetadataFromIndex(logIdOfLedger); |
| } catch (IOException ie) { |
| // expected because appendLedgersMap wouldn't have been called |
| } |
| |
| /* |
| * create entrylogs for more ledgers, so that ledgerIdEntryLogMap would |
| * reach its limit and remove the oldest entrylog. |
| */ |
| for (int i = 1; i <= cacheMaximumSize; i++) { |
| entryLogManager.createNewLog(i); |
| } |
| /* |
| * do checkpoint to make sure entrylog files are persisted |
| */ |
| entryLogger.checkpoint(); |
| |
| EntryLogMetadata entryLogMetadata = entryLogger.extractEntryLogMetadataFromIndex(logIdOfLedger); |
| ConcurrentLongLongHashMap ledgersMap = entryLogMetadata.getLedgersMap(); |
| Assert.assertEquals("There should be only one entry in entryLogMetadata", 1, ledgersMap.size()); |
| Assert.assertTrue("Usage should be 1", Double.compare(1.0, entryLogMetadata.getUsage()) == 0); |
| Assert.assertEquals("Total size of entries", (entrySize + 4) * numOfEntries, ledgersMap.get(ledgerId)); |
| } |
| |
| /** |
| * test EntryLogManager.EntryLogManagerForEntryLogPerLedger doesn't removes |
| * the ledger from its cache map if ledger's corresponding state is accessed |
| * within the evictionPeriod. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testExpiryRemovalByAccessingOnAnotherThread() throws Exception { |
| int evictionPeriod = 1; |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogFilePreAllocationEnabled(false); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(2)); |
| conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerForEntryLogPerLedger entryLogManager = |
| (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); |
| |
| long ledgerId = 0L; |
| |
| BufferedLogChannel newLogChannel = createDummyBufferedLogChannel(entryLogger, 1, conf); |
| entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel); |
| |
| Thread t = new Thread() { |
| public void run() { |
| try { |
| Thread.sleep((evictionPeriod * 1000) / 2); |
| entryLogManager.getCurrentLogForLedger(ledgerId); |
| } catch (InterruptedException | IOException e) { |
| } |
| } |
| }; |
| |
| t.start(); |
| Thread.sleep(evictionPeriod * 1000 + 100); |
| entryLogManager.doEntryLogMapCleanup(); |
| |
| /* |
| * in this scenario, that ledger is accessed by other thread during |
| * eviction period time, so it should not be evicted. |
| */ |
| BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId); |
| assertEquals("LogChannel for ledger " + ledgerId, newLogChannel, currentLogForLedger); |
| Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size()); |
| Assert.assertEquals("Number of rotated EntryLogs ", 0, entryLogManager.getRotatedLogChannels().size()); |
| } |
| |
| /** |
| * test EntryLogManager.EntryLogManagerForEntryLogPerLedger removes the |
| * ledger from its cache map if entry is not added to that ledger or its |
| * corresponding state is not accessed for more than evictionPeriod. In this |
| * testcase we try to call unrelated methods or access state of other |
| * ledgers within the eviction period. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testExpiryRemovalByAccessingNonCacheRelatedMethods() throws Exception { |
| int evictionPeriod = 1; |
| |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogFilePreAllocationEnabled(false); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(2)); |
| conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerForEntryLogPerLedger entryLogManager = |
| (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); |
| |
| long ledgerId = 0L; |
| |
| BufferedLogChannel newLogChannel = createDummyBufferedLogChannel(entryLogger, 1, conf); |
| entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel); |
| |
| AtomicBoolean exceptionOccured = new AtomicBoolean(false); |
| Thread t = new Thread() { |
| public void run() { |
| try { |
| Thread.sleep(500); |
| /* |
| * any of the following operations should not access entry |
| * of 'ledgerId' in the cache |
| */ |
| entryLogManager.getCopyOfCurrentLogs(); |
| entryLogManager.getRotatedLogChannels(); |
| entryLogManager.getCurrentLogIfPresent(newLogChannel.getLogId()); |
| entryLogManager.getDirForNextEntryLog(ledgerDirsManager.getWritableLedgerDirs()); |
| long newLedgerId = 100; |
| BufferedLogChannel logChannelForNewLedger = |
| createDummyBufferedLogChannel(entryLogger, newLedgerId, conf); |
| entryLogManager.setCurrentLogForLedgerAndAddToRotate(newLedgerId, logChannelForNewLedger); |
| entryLogManager.getCurrentLogIfPresent(newLedgerId); |
| } catch (Exception e) { |
| LOG.error("Got Exception in thread", e); |
| exceptionOccured.set(true); |
| } |
| } |
| }; |
| |
| t.start(); |
| Thread.sleep(evictionPeriod * 1000 + 100); |
| entryLogManager.doEntryLogMapCleanup(); |
| Assert.assertFalse("Exception occured in thread, which is not expected", exceptionOccured.get()); |
| |
| /* |
| * since for more than evictionPeriod, that ledger is not accessed and cache is cleaned up, mapping for that |
| * ledger should not be available anymore |
| */ |
| BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId); |
| assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, currentLogForLedger); |
| // expected number of current active entryLogs is 1 since we created entrylog for 'newLedgerId' |
| Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size()); |
| Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getRotatedLogChannels().size()); |
| Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel", |
| entryLogManager.getRotatedLogChannels().contains(newLogChannel)); |
| |
| Assert.assertTrue("since mapentry must have been evicted, it should be null", |
| (entryLogManager.getCacheAsMap().get(ledgerId) == null) |
| || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null)); |
| } |
| |
| /* |
| * testing EntryLogger functionality (addEntry/createNewLog/flush) and EntryLogManager with entryLogPerLedger |
| * enabled |
| */ |
| @Test |
| public void testEntryLogManagerForEntryLogPerLedger() throws Exception { |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setFlushIntervalInBytes(10000000); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(2)); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); |
| Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class, |
| entryLogManager.getClass()); |
| |
| int numOfActiveLedgers = 20; |
| int numEntries = 5; |
| |
| for (int j = 0; j < numEntries; j++) { |
| for (long i = 0; i < numOfActiveLedgers; i++) { |
| entryLogger.addEntry(i, generateEntry(i, j)); |
| } |
| } |
| |
| for (long i = 0; i < numOfActiveLedgers; i++) { |
| BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(i); |
| Assert.assertTrue("unpersistedBytes should be greater than LOGFILE_HEADER_SIZE", |
| logChannel.getUnpersistedBytes() > EntryLogger.LOGFILE_HEADER_SIZE); |
| } |
| |
| for (long i = 0; i < numOfActiveLedgers; i++) { |
| entryLogManager.createNewLog(i); |
| } |
| |
| /* |
| * since we created new entrylog for all the activeLedgers, entrylogs of all the ledgers |
| * should be rotated and hence the size of copyOfRotatedLogChannels should be numOfActiveLedgers |
| */ |
| List<BufferedLogChannel> rotatedLogs = entryLogManager.getRotatedLogChannels(); |
| Assert.assertEquals("Number of rotated entrylogs", numOfActiveLedgers, rotatedLogs.size()); |
| |
| /* |
| * Since newlog is created for all slots, so they are moved to rotated logs and hence unpersistedBytes of all |
| * the slots should be just EntryLogger.LOGFILE_HEADER_SIZE |
| * |
| */ |
| for (long i = 0; i < numOfActiveLedgers; i++) { |
| BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(i); |
| Assert.assertEquals("unpersistedBytes should be LOGFILE_HEADER_SIZE", EntryLogger.LOGFILE_HEADER_SIZE, |
| logChannel.getUnpersistedBytes()); |
| } |
| |
| for (int j = numEntries; j < 2 * numEntries; j++) { |
| for (long i = 0; i < numOfActiveLedgers; i++) { |
| entryLogger.addEntry(i, generateEntry(i, j)); |
| } |
| } |
| |
| for (long i = 0; i < numOfActiveLedgers; i++) { |
| BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(i); |
| Assert.assertTrue("unpersistedBytes should be greater than LOGFILE_HEADER_SIZE", |
| logChannel.getUnpersistedBytes() > EntryLogger.LOGFILE_HEADER_SIZE); |
| } |
| |
| Assert.assertEquals("LeastUnflushedloggerID", 0, entryLogger.getLeastUnflushedLogId()); |
| |
| /* |
| * here flush is called so all the rotatedLogChannels should be file closed and there shouldn't be any |
| * rotatedlogchannel and also leastUnflushedLogId should be advanced to numOfActiveLedgers |
| */ |
| entryLogger.flush(); |
| Assert.assertEquals("Number of rotated entrylogs", 0, entryLogManager.getRotatedLogChannels().size()); |
| Assert.assertEquals("LeastUnflushedloggerID", numOfActiveLedgers, entryLogger.getLeastUnflushedLogId()); |
| |
| /* |
| * after flush (flushCurrentLogs) unpersistedBytes should be 0. |
| */ |
| for (long i = 0; i < numOfActiveLedgers; i++) { |
| BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(i); |
| Assert.assertEquals("unpersistedBytes should be 0", 0L, logChannel.getUnpersistedBytes()); |
| } |
| } |
| |
| /* |
| * with entryLogPerLedger enabled, create multiple entrylogs, add entries of ledgers and read them before and after |
| * flush |
| */ |
| @Test |
| public void testReadAddCallsOfMultipleEntryLogs() throws Exception { |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(2)); |
| // pre allocation enabled |
| conf.setEntryLogFilePreAllocationEnabled(true); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager()); |
| |
| int numOfActiveLedgers = 10; |
| int numEntries = 10; |
| long[][] positions = new long[numOfActiveLedgers][]; |
| for (int i = 0; i < numOfActiveLedgers; i++) { |
| positions[i] = new long[numEntries]; |
| } |
| |
| /* |
| * addentries to the ledgers |
| */ |
| for (int j = 0; j < numEntries; j++) { |
| for (int i = 0; i < numOfActiveLedgers; i++) { |
| positions[i][j] = entryLogger.addEntry((long) i, generateEntry(i, j)); |
| long entryLogId = (positions[i][j] >> 32L); |
| /** |
| * |
| * Though EntryLogFilePreAllocation is enabled, Since things are not done concurrently here, |
| * entryLogIds will be sequential. |
| */ |
| Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId); |
| } |
| } |
| |
| /* |
| * read the entries which are written |
| */ |
| for (int j = 0; j < numEntries; j++) { |
| for (int i = 0; i < numOfActiveLedgers; i++) { |
| String expectedValue = "ledger-" + i + "-" + j; |
| ByteBuf buf = entryLogger.readEntry(i, j, positions[i][j]); |
| long ledgerId = buf.readLong(); |
| long entryId = buf.readLong(); |
| byte[] data = new byte[buf.readableBytes()]; |
| buf.readBytes(data); |
| assertEquals("LedgerId ", i, ledgerId); |
| assertEquals("EntryId ", j, entryId); |
| assertEquals("Entry Data ", expectedValue, new String(data)); |
| } |
| } |
| |
| for (long i = 0; i < numOfActiveLedgers; i++) { |
| entryLogManagerBase.createNewLog(i); |
| } |
| |
| entryLogManagerBase.flushRotatedLogs(); |
| |
| // reading after flush of rotatedlogs |
| for (int j = 0; j < numEntries; j++) { |
| for (int i = 0; i < numOfActiveLedgers; i++) { |
| String expectedValue = "ledger-" + i + "-" + j; |
| ByteBuf buf = entryLogger.readEntry(i, j, positions[i][j]); |
| long ledgerId = buf.readLong(); |
| long entryId = buf.readLong(); |
| byte[] data = new byte[buf.readableBytes()]; |
| buf.readBytes(data); |
| assertEquals("LedgerId ", i, ledgerId); |
| assertEquals("EntryId ", j, entryId); |
| assertEquals("Entry Data ", expectedValue, new String(data)); |
| } |
| } |
| } |
| |
| class ReadTask implements Callable<Boolean> { |
| long ledgerId; |
| int entryId; |
| long position; |
| EntryLogger entryLogger; |
| |
| ReadTask(long ledgerId, int entryId, long position, EntryLogger entryLogger) { |
| this.ledgerId = ledgerId; |
| this.entryId = entryId; |
| this.position = position; |
| this.entryLogger = entryLogger; |
| } |
| |
| @Override |
| public Boolean call() throws IOException { |
| try { |
| ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId); |
| ByteBuf actualByteBuf = entryLogger.readEntry(ledgerId, entryId, position); |
| if (!expectedByteBuf.equals(actualByteBuf)) { |
| LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()), |
| actualByteBuf.toString(Charset.defaultCharset())); |
| throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset()) |
| + " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset())); |
| } |
| } catch (IOException e) { |
| LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); |
| throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, |
| e); |
| } |
| return true; |
| } |
| } |
| |
| /* |
| * test concurrent read operations of entries from flushed rotatedlogs with entryLogPerLedgerEnabled |
| */ |
| @Test |
| public void testConcurrentReadCallsAfterEntryLogsAreRotated() throws Exception { |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setFlushIntervalInBytes(1000 * 25); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(3)); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| int numOfActiveLedgers = 15; |
| int numEntries = 2000; |
| final AtomicLongArray positions = new AtomicLongArray(numOfActiveLedgers * numEntries); |
| EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) |
| entryLogger.getEntryLogManager(); |
| |
| for (int i = 0; i < numOfActiveLedgers; i++) { |
| for (int j = 0; j < numEntries; j++) { |
| positions.set(i * numEntries + j, entryLogger.addEntry((long) i, generateEntry(i, j))); |
| long entryLogId = (positions.get(i * numEntries + j) >> 32L); |
| /** |
| * |
| * Though EntryLogFilePreAllocation is enabled, Since things are not done concurrently here, entryLogIds |
| * will be sequential. |
| */ |
| Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId); |
| } |
| } |
| |
| for (long i = 0; i < numOfActiveLedgers; i++) { |
| entryLogManager.createNewLog(i); |
| } |
| entryLogManager.flushRotatedLogs(); |
| |
| // reading after flush of rotatedlogs |
| ArrayList<ReadTask> readTasks = new ArrayList<ReadTask>(); |
| for (int i = 0; i < numOfActiveLedgers; i++) { |
| for (int j = 0; j < numEntries; j++) { |
| readTasks.add(new ReadTask(i, j, positions.get(i * numEntries + j), entryLogger)); |
| } |
| } |
| |
| ExecutorService executor = Executors.newFixedThreadPool(40); |
| executor.invokeAll(readTasks).forEach((future) -> { |
| try { |
| future.get(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| LOG.error("Read/Flush task failed because of InterruptedException", ie); |
| Assert.fail("Read/Flush task interrupted"); |
| } catch (Exception ex) { |
| LOG.error("Read/Flush task failed because of exception", ex); |
| Assert.fail("Read/Flush task failed " + ex.getMessage()); |
| } |
| }); |
| } |
| |
| /** |
| * testcase to validate when ledgerdirs become full and eventually all |
| * ledgerdirs become full. Later a ledgerdir becomes writable. |
| */ |
| @Test |
| public void testEntryLoggerAddEntryWhenLedgerDirsAreFull() throws Exception { |
| int numberOfLedgerDirs = 3; |
| List<File> ledgerDirs = new ArrayList<File>(); |
| String[] ledgerDirsPath = new String[numberOfLedgerDirs]; |
| List<File> curDirs = new ArrayList<File>(); |
| |
| File ledgerDir; |
| File curDir; |
| for (int i = 0; i < numberOfLedgerDirs; i++) { |
| ledgerDir = createTempDir("bkTest", ".dir").getAbsoluteFile(); |
| curDir = BookieImpl.getCurrentDirectory(ledgerDir); |
| BookieImpl.checkDirectoryStructure(curDir); |
| ledgerDirs.add(ledgerDir); |
| ledgerDirsPath[i] = ledgerDir.getPath(); |
| curDirs.add(curDir); |
| } |
| |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| // pre-allocation is disabled |
| conf.setEntryLogFilePreAllocationEnabled(false); |
| conf.setEntryLogPerLedgerEnabled(true); |
| conf.setLedgerDirNames(ledgerDirsPath); |
| |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) |
| entryLogger.getEntryLogManager(); |
| Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class, |
| entryLogManager.getClass()); |
| |
| entryLogger.addEntry(0L, generateEntry(0, 1)); |
| entryLogger.addEntry(1L, generateEntry(1, 1)); |
| entryLogger.addEntry(2L, generateEntry(2, 1)); |
| |
| File ledgerDirForLedger0 = entryLogManager.getCurrentLogForLedger(0L).getLogFile().getParentFile(); |
| File ledgerDirForLedger1 = entryLogManager.getCurrentLogForLedger(1L).getLogFile().getParentFile(); |
| File ledgerDirForLedger2 = entryLogManager.getCurrentLogForLedger(2L).getLogFile().getParentFile(); |
| |
| Set<File> ledgerDirsSet = new HashSet<File>(); |
| ledgerDirsSet.add(ledgerDirForLedger0); |
| ledgerDirsSet.add(ledgerDirForLedger1); |
| ledgerDirsSet.add(ledgerDirForLedger2); |
| |
| /* |
| * since there are 3 ledgerdirs, entrylogs for all the 3 ledgers should be in different ledgerdirs. |
| */ |
| Assert.assertEquals("Current active LedgerDirs size", 3, ledgerDirs.size()); |
| Assert.assertEquals("Number of rotated logchannels", 0, entryLogManager.getRotatedLogChannels().size()); |
| |
| /* |
| * ledgerDirForLedger0 is added to filledDirs, for ledger0 new entrylog should not be created in |
| * ledgerDirForLedger0 |
| */ |
| ledgerDirsManager.addToFilledDirs(ledgerDirForLedger0); |
| addEntryAndValidateFolders(entryLogger, entryLogManager, 2, ledgerDirForLedger0, false, ledgerDirForLedger1, |
| ledgerDirForLedger2); |
| Assert.assertEquals("Number of rotated logchannels", 1, entryLogManager.getRotatedLogChannels().size()); |
| |
| /* |
| * ledgerDirForLedger1 is also added to filledDirs, so for all the ledgers new entryLogs should be in |
| * ledgerDirForLedger2 |
| */ |
| ledgerDirsManager.addToFilledDirs(ledgerDirForLedger1); |
| addEntryAndValidateFolders(entryLogger, entryLogManager, 3, ledgerDirForLedger2, true, ledgerDirForLedger2, |
| ledgerDirForLedger2); |
| Assert.assertTrue("Number of rotated logchannels", (2 <= entryLogManager.getRotatedLogChannels().size()) |
| && (entryLogManager.getRotatedLogChannels().size() <= 3)); |
| int numOfRotatedLogChannels = entryLogManager.getRotatedLogChannels().size(); |
| |
| /* |
| * since ledgerDirForLedger2 is added to filleddirs, all the dirs are full. If all the dirs are full then it |
| * will continue to use current entrylogs for new entries instead of creating new one. So for all the ledgers |
| * ledgerdirs should be same as before - ledgerDirForLedger2 |
| */ |
| ledgerDirsManager.addToFilledDirs(ledgerDirForLedger2); |
| addEntryAndValidateFolders(entryLogger, entryLogManager, 4, ledgerDirForLedger2, true, ledgerDirForLedger2, |
| ledgerDirForLedger2); |
| Assert.assertEquals("Number of rotated logchannels", numOfRotatedLogChannels, |
| entryLogManager.getRotatedLogChannels().size()); |
| |
| /* |
| * ledgerDirForLedger1 is added back to writableDirs, so new entrylog for all the ledgers should be created in |
| * ledgerDirForLedger1 |
| */ |
| ledgerDirsManager.addToWritableDirs(ledgerDirForLedger1, true); |
| addEntryAndValidateFolders(entryLogger, entryLogManager, 4, ledgerDirForLedger1, true, ledgerDirForLedger1, |
| ledgerDirForLedger1); |
| Assert.assertEquals("Number of rotated logchannels", numOfRotatedLogChannels + 3, |
| entryLogManager.getRotatedLogChannels().size()); |
| } |
| |
| /* |
| * in this method we add an entry and validate the ledgerdir of the |
| * currentLogForLedger against the provided expected ledgerDirs. |
| */ |
| void addEntryAndValidateFolders(EntryLogger entryLogger, EntryLogManagerBase entryLogManager, int entryId, |
| File expectedDirForLedger0, boolean equalsForLedger0, File expectedDirForLedger1, |
| File expectedDirForLedger2) throws IOException { |
| entryLogger.addEntry(0L, generateEntry(0, entryId)); |
| entryLogger.addEntry(1L, generateEntry(1, entryId)); |
| entryLogger.addEntry(2L, generateEntry(2, entryId)); |
| |
| if (equalsForLedger0) { |
| Assert.assertEquals("LedgerDir for ledger 0 after adding entry " + entryId, expectedDirForLedger0, |
| entryLogManager.getCurrentLogForLedger(0L).getLogFile().getParentFile()); |
| } else { |
| Assert.assertNotEquals("LedgerDir for ledger 0 after adding entry " + entryId, expectedDirForLedger0, |
| entryLogManager.getCurrentLogForLedger(0L).getLogFile().getParentFile()); |
| } |
| Assert.assertEquals("LedgerDir for ledger 1 after adding entry " + entryId, expectedDirForLedger1, |
| entryLogManager.getCurrentLogForLedger(1L).getLogFile().getParentFile()); |
| Assert.assertEquals("LedgerDir for ledger 2 after adding entry " + entryId, expectedDirForLedger2, |
| entryLogManager.getCurrentLogForLedger(2L).getLogFile().getParentFile()); |
| } |
| |
| /* |
| * entries added using entrylogger with entryLogPerLedger enabled and the same entries are read using entrylogger |
| * with entryLogPerLedger disabled |
| */ |
| @Test |
| public void testSwappingEntryLogManagerFromEntryLogPerLedgerToSingle() throws Exception { |
| testSwappingEntryLogManager(true, false); |
| } |
| |
| /* |
| * entries added using entrylogger with entryLogPerLedger disabled and the same entries are read using entrylogger |
| * with entryLogPerLedger enabled |
| */ |
| @Test |
| public void testSwappingEntryLogManagerFromSingleToEntryLogPerLedger() throws Exception { |
| testSwappingEntryLogManager(false, true); |
| } |
| |
| public void testSwappingEntryLogManager(boolean initialEntryLogPerLedgerEnabled, |
| boolean laterEntryLogPerLedgerEnabled) throws Exception { |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| conf.setEntryLogPerLedgerEnabled(initialEntryLogPerLedgerEnabled); |
| conf.setLedgerDirNames(createAndGetLedgerDirs(2)); |
| // pre allocation enabled |
| conf.setEntryLogFilePreAllocationEnabled(true); |
| LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| |
| EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); |
| EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); |
| Assert.assertEquals( |
| "EntryLogManager class type", initialEntryLogPerLedgerEnabled |
| ? EntryLogManagerForEntryLogPerLedger.class : EntryLogManagerForSingleEntryLog.class, |
| entryLogManager.getClass()); |
| |
| int numOfActiveLedgers = 10; |
| int numEntries = 10; |
| long[][] positions = new long[numOfActiveLedgers][]; |
| for (int i = 0; i < numOfActiveLedgers; i++) { |
| positions[i] = new long[numEntries]; |
| } |
| |
| /* |
| * addentries to the ledgers |
| */ |
| for (int j = 0; j < numEntries; j++) { |
| for (int i = 0; i < numOfActiveLedgers; i++) { |
| positions[i][j] = entryLogger.addEntry((long) i, generateEntry(i, j)); |
| long entryLogId = (positions[i][j] >> 32L); |
| if (initialEntryLogPerLedgerEnabled) { |
| Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId); |
| } else { |
| Assert.assertEquals("EntryLogId for ledger: " + i, 0, entryLogId); |
| } |
| } |
| } |
| |
| for (long i = 0; i < numOfActiveLedgers; i++) { |
| entryLogManager.createNewLog(i); |
| } |
| |
| /** |
| * since new entrylog is created for all the ledgers, the previous |
| * entrylogs must be rotated and with the following flushRotatedLogs |
| * call they should be forcewritten and file should be closed. |
| */ |
| entryLogManager.flushRotatedLogs(); |
| |
| /* |
| * new entrylogger and entryLogManager are created with |
| * 'laterEntryLogPerLedgerEnabled' conf |
| */ |
| conf.setEntryLogPerLedgerEnabled(laterEntryLogPerLedgerEnabled); |
| LedgerDirsManager newLedgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| EntryLogger newEntryLogger = new EntryLogger(conf, newLedgerDirsManager); |
| EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager(); |
| Assert.assertEquals("EntryLogManager class type", |
| laterEntryLogPerLedgerEnabled ? EntryLogManagerForEntryLogPerLedger.class |
| : EntryLogManagerForSingleEntryLog.class, |
| newEntryLogManager.getClass()); |
| |
| /* |
| * read the entries (which are written with previous entrylogger) with |
| * new entrylogger |
| */ |
| for (int j = 0; j < numEntries; j++) { |
| for (int i = 0; i < numOfActiveLedgers; i++) { |
| String expectedValue = "ledger-" + i + "-" + j; |
| ByteBuf buf = newEntryLogger.readEntry(i, j, positions[i][j]); |
| long ledgerId = buf.readLong(); |
| long entryId = buf.readLong(); |
| byte[] data = new byte[buf.readableBytes()]; |
| buf.readBytes(data); |
| assertEquals("LedgerId ", i, ledgerId); |
| assertEquals("EntryId ", j, entryId); |
| assertEquals("Entry Data ", expectedValue, new String(data)); |
| } |
| } |
| } |
| |
| } |