| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.bookkeeper.bookie; |
| |
| import static com.google.common.base.Charsets.UTF_8; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.ByteBufUtil; |
| import io.netty.buffer.UnpooledByteBufAllocator; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.security.GeneralSecurityException; |
| import java.util.Arrays; |
| import java.util.Collections; |
| |
| import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.bookkeeper.common.util.Watcher; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| import org.apache.bookkeeper.proto.checksum.DigestManager; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.util.DiskChecker; |
| import org.apache.bookkeeper.util.SnapshotMap; |
| import org.apache.commons.io.FileUtils; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| /** |
| * Test cases for IndexPersistenceMgr. |
| */ |
| public class IndexPersistenceMgrTest { |
| |
| ServerConfiguration conf; |
| File journalDir, ledgerDir; |
| LedgerDirsManager ledgerDirsManager; |
| LedgerDirsMonitor ledgerMonitor; |
| |
| @Before |
| public void setUp() throws Exception { |
| journalDir = File.createTempFile("IndexPersistenceMgr", "Journal"); |
| journalDir.delete(); |
| journalDir.mkdir(); |
| ledgerDir = File.createTempFile("IndexPersistenceMgr", "Ledger"); |
| ledgerDir.delete(); |
| ledgerDir.mkdir(); |
| // Create current directories |
| Bookie.getCurrentDirectory(journalDir).mkdir(); |
| Bookie.getCurrentDirectory(ledgerDir).mkdir(); |
| |
| conf = new ServerConfiguration(); |
| conf.setMetadataServiceUri(null); |
| conf.setJournalDirName(journalDir.getPath()); |
| conf.setLedgerDirNames(new String[] { ledgerDir.getPath() }); |
| |
| ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| ledgerMonitor = new LedgerDirsMonitor(conf, |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), |
| Collections.singletonList(ledgerDirsManager)); |
| ledgerMonitor.init(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| ledgerMonitor.shutdown(); |
| FileUtils.deleteDirectory(journalDir); |
| FileUtils.deleteDirectory(ledgerDir); |
| } |
| |
| private IndexPersistenceMgr createIndexPersistenceManager(int openFileLimit) throws Exception { |
| ServerConfiguration newConf = new ServerConfiguration(); |
| newConf.addConfiguration(conf); |
| newConf.setOpenFileLimit(openFileLimit); |
| |
| return new IndexPersistenceMgr( |
| newConf.getPageSize(), newConf.getPageSize() / LedgerEntryPage.getIndexEntrySize(), |
| newConf, new SnapshotMap<Long, Boolean>(), ledgerDirsManager, NullStatsLogger.INSTANCE); |
| } |
| |
| private static void getNumFileInfos(IndexPersistenceMgr indexPersistenceMgr, |
| int numFiles, byte[] masterKey) throws Exception { |
| for (int i = 0; i < numFiles; i++) { |
| indexPersistenceMgr.getFileInfo((long) i, masterKey); |
| } |
| } |
| |
| @Test |
| public void testEvictFileInfoWhenUnderlyingFileExists() throws Exception { |
| evictFileInfoTest(true); |
| } |
| |
| @Test |
| public void testEvictFileInfoWhenUnderlyingFileDoesntExist() throws Exception { |
| evictFileInfoTest(false); |
| } |
| |
| private void evictFileInfoTest(boolean createFile) throws Exception { |
| IndexPersistenceMgr indexPersistenceMgr = createIndexPersistenceManager(2); |
| try { |
| long lid = 99999L; |
| byte[] masterKey = "evict-file-info".getBytes(UTF_8); |
| // get file info and make sure the file created |
| FileInfo fi = indexPersistenceMgr.getFileInfo(lid, masterKey); |
| if (createFile) { |
| fi.checkOpen(true); |
| } |
| fi.setFenced(); |
| |
| // fill up the cache to evict file infos |
| getNumFileInfos(indexPersistenceMgr, 10, masterKey); |
| |
| // get the file info again, state should have been flushed |
| fi = indexPersistenceMgr.getFileInfo(lid, masterKey); |
| assertTrue("Fence bit should be persisted", fi.isFenced()); |
| } finally { |
| indexPersistenceMgr.close(); |
| } |
| } |
| |
| final long lid = 1L; |
| final byte[] masterKey = "write".getBytes(); |
| |
| @Test |
| public void testGetFileInfoReadBeforeWrite() throws Exception { |
| IndexPersistenceMgr indexPersistenceMgr = null; |
| try { |
| indexPersistenceMgr = createIndexPersistenceManager(1); |
| // get the file info for read |
| try { |
| indexPersistenceMgr.getFileInfo(lid, null); |
| fail("Should fail get file info for reading if the file doesn't exist"); |
| } catch (Bookie.NoLedgerException nle) { |
| // exepcted |
| } |
| assertEquals(0, indexPersistenceMgr.writeFileInfoCache.size()); |
| assertEquals(0, indexPersistenceMgr.readFileInfoCache.size()); |
| |
| CachedFileInfo writeFileInfo = indexPersistenceMgr.getFileInfo(lid, masterKey); |
| assertEquals(2, writeFileInfo.getRefCount()); |
| assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size()); |
| assertEquals(0, indexPersistenceMgr.readFileInfoCache.size()); |
| writeFileInfo.release(); |
| assertEquals(1, writeFileInfo.getRefCount()); |
| } finally { |
| if (null != indexPersistenceMgr) { |
| indexPersistenceMgr.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testGetFileInfoWriteBeforeRead() throws Exception { |
| IndexPersistenceMgr indexPersistenceMgr = null; |
| try { |
| indexPersistenceMgr = createIndexPersistenceManager(1); |
| |
| CachedFileInfo writeFileInfo = indexPersistenceMgr.getFileInfo(lid, masterKey); |
| assertEquals(2, writeFileInfo.getRefCount()); |
| assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size()); |
| assertEquals(0, indexPersistenceMgr.readFileInfoCache.size()); |
| writeFileInfo.release(); |
| |
| CachedFileInfo readFileInfo = indexPersistenceMgr.getFileInfo(lid, null); |
| assertEquals(3, readFileInfo.getRefCount()); |
| assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size()); |
| assertEquals(1, indexPersistenceMgr.readFileInfoCache.size()); |
| readFileInfo.release(); |
| assertEquals(2, writeFileInfo.getRefCount()); |
| assertEquals(2, readFileInfo.getRefCount()); |
| } finally { |
| if (null != indexPersistenceMgr) { |
| indexPersistenceMgr.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testReadFileInfoCacheEviction() throws Exception { |
| IndexPersistenceMgr indexPersistenceMgr = null; |
| try { |
| indexPersistenceMgr = createIndexPersistenceManager(1); |
| for (int i = 0; i < 3; i++) { |
| CachedFileInfo fileInfo = indexPersistenceMgr.getFileInfo(lid + i, masterKey); |
| // We need to make sure index file is created, otherwise the test case can be flaky |
| fileInfo.checkOpen(true); |
| fileInfo.release(); |
| |
| // load into read cache also |
| indexPersistenceMgr.getFileInfo(lid + i, null).release(); |
| } |
| |
| indexPersistenceMgr.getFileInfo(lid, masterKey); |
| assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size()); |
| assertEquals(2, indexPersistenceMgr.readFileInfoCache.size()); |
| |
| // trigger file info eviction on read file info cache |
| for (int i = 1; i <= 2; i++) { |
| indexPersistenceMgr.getFileInfo(lid + i, null); |
| } |
| assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size()); |
| assertEquals(2, indexPersistenceMgr.readFileInfoCache.size()); |
| |
| CachedFileInfo fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid); |
| assertNotNull(fileInfo); |
| assertEquals(2, fileInfo.getRefCount()); |
| fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid + 1); |
| assertNull(fileInfo); |
| fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid + 2); |
| assertNull(fileInfo); |
| fileInfo = indexPersistenceMgr.readFileInfoCache.asMap().get(lid); |
| assertNull(fileInfo); |
| fileInfo = indexPersistenceMgr.readFileInfoCache.asMap().get(lid + 1); |
| assertNotNull(fileInfo); |
| assertEquals(2, fileInfo.getRefCount()); |
| fileInfo = indexPersistenceMgr.readFileInfoCache.asMap().get(lid + 2); |
| assertNotNull(fileInfo); |
| assertEquals(2, fileInfo.getRefCount()); |
| } finally { |
| if (null != indexPersistenceMgr) { |
| indexPersistenceMgr.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testEvictionShouldNotAffectLongPollRead() throws Exception { |
| IndexPersistenceMgr indexPersistenceMgr = null; |
| Watcher<LastAddConfirmedUpdateNotification> watcher = notification -> notification.recycle(); |
| try { |
| indexPersistenceMgr = createIndexPersistenceManager(1); |
| indexPersistenceMgr.getFileInfo(lid, masterKey); |
| indexPersistenceMgr.getFileInfo(lid, null); |
| indexPersistenceMgr.updateLastAddConfirmed(lid, 1); |
| // watch should succeed because ledger is not evicted or closed |
| assertTrue( |
| indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher)); |
| // now evict ledger 1 from write cache |
| indexPersistenceMgr.getFileInfo(lid + 1, masterKey); |
| // even if ledger 1 is evicted from write cache, watcher should still succeed |
| assertTrue( |
| indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher)); |
| // now evict ledger 1 from read cache |
| indexPersistenceMgr.getFileInfo(lid + 2, masterKey); |
| indexPersistenceMgr.getFileInfo(lid + 2, null); |
| // even if ledger 1 is evicted from both cache, watcher should still succeed because it |
| // will create a new FileInfo when cache miss |
| assertTrue( |
| indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher)); |
| } finally { |
| if (null != indexPersistenceMgr) { |
| indexPersistenceMgr.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testEvictBeforeReleaseRace() throws Exception { |
| IndexPersistenceMgr indexPersistenceMgr = null; |
| Watcher<LastAddConfirmedUpdateNotification> watcher = notification -> notification.recycle(); |
| try { |
| indexPersistenceMgr = createIndexPersistenceManager(1); |
| |
| indexPersistenceMgr.getFileInfo(1L, masterKey); |
| indexPersistenceMgr.getFileInfo(2L, masterKey); |
| indexPersistenceMgr.getFileInfo(3L, masterKey); |
| indexPersistenceMgr.getFileInfo(4L, masterKey); |
| |
| CachedFileInfo fi = indexPersistenceMgr.getFileInfo(1L, masterKey); |
| |
| // trigger eviction |
| indexPersistenceMgr.getFileInfo(2L, masterKey); |
| indexPersistenceMgr.getFileInfo(3L, null); |
| indexPersistenceMgr.getFileInfo(4L, null); |
| |
| Thread.sleep(1000); |
| |
| fi.setFenced(); |
| fi.release(); |
| |
| assertTrue(indexPersistenceMgr.isFenced(1)); |
| } finally { |
| if (null != indexPersistenceMgr) { |
| indexPersistenceMgr.close(); |
| } |
| } |
| } |
| |
| /* |
| * In this testcase index files (FileInfos) are precreated with different |
| * FileInfo header versions (FileInfo.V0 and FileInfo.V1) and it is |
| * validated that the current implementation of IndexPersistenceMgr (and |
| * corresponding FileInfo) is able to function as per the specifications of |
| * FileInfo header version. If it is FileInfo.V0 then explicitLac is not |
| * persisted and if it is FileInfo.V1 then explicitLac is persisted. |
| */ |
| @Test |
| public void testFileInfosOfVariousHeaderVersions() throws Exception { |
| IndexPersistenceMgr indexPersistenceMgr = null; |
| try { |
| indexPersistenceMgr = createIndexPersistenceManager(1); |
| long ledgerIdWithVersionZero = 25L; |
| validateFileInfo(indexPersistenceMgr, ledgerIdWithVersionZero, FileInfo.V0); |
| |
| long ledgerIdWithVersionOne = 135L; |
| validateFileInfo(indexPersistenceMgr, ledgerIdWithVersionOne, FileInfo.V1); |
| } finally { |
| if (null != indexPersistenceMgr) { |
| indexPersistenceMgr.close(); |
| } |
| } |
| } |
| |
| void validateFileInfo(IndexPersistenceMgr indexPersistenceMgr, long ledgerId, int headerVersion) |
| throws IOException, GeneralSecurityException { |
| BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32; |
| boolean getUseV2WireProtocol = true; |
| |
| preCreateFileInfoForLedger(ledgerId, headerVersion); |
| DigestManager digestManager = DigestManager.instantiate(ledgerId, masterKey, |
| BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT, |
| getUseV2WireProtocol); |
| |
| CachedFileInfo fileInfo = indexPersistenceMgr.getFileInfo(ledgerId, masterKey); |
| fileInfo.readHeader(); |
| assertEquals("ExplicitLac should be null", null, fileInfo.getExplicitLac()); |
| assertEquals("Header Version should match with precreated fileinfos headerversion", headerVersion, |
| fileInfo.headerVersion); |
| assertTrue("Masterkey should match with precreated fileinfos masterkey", |
| Arrays.equals(masterKey, fileInfo.masterKey)); |
| long explicitLac = 22; |
| ByteBuf explicitLacByteBuf = digestManager.computeDigestAndPackageForSendingLac(explicitLac).getBuffer(0); |
| explicitLacByteBuf.markReaderIndex(); |
| indexPersistenceMgr.setExplicitLac(ledgerId, explicitLacByteBuf); |
| explicitLacByteBuf.resetReaderIndex(); |
| assertEquals("explicitLac ByteBuf contents should match", 0, |
| ByteBufUtil.compare(explicitLacByteBuf, indexPersistenceMgr.getExplicitLac(ledgerId))); |
| /* |
| * release fileInfo untill it is marked dead and closed, so that |
| * contents of it are persisted. |
| */ |
| while (fileInfo.refCount.get() != FileInfoBackingCache.DEAD_REF) { |
| fileInfo.release(); |
| } |
| /* |
| * reopen the fileinfo and readHeader, so that whatever was persisted |
| * would be read. |
| */ |
| fileInfo = indexPersistenceMgr.getFileInfo(ledgerId, masterKey); |
| fileInfo.readHeader(); |
| assertEquals("Header Version should match with precreated fileinfos headerversion even after reopening", |
| headerVersion, fileInfo.headerVersion); |
| assertTrue("Masterkey should match with precreated fileinfos masterkey", |
| Arrays.equals(masterKey, fileInfo.masterKey)); |
| if (headerVersion == FileInfo.V0) { |
| assertEquals("Since it is V0 Header, explicitLac will not be persisted and should be null after reopening", |
| null, indexPersistenceMgr.getExplicitLac(ledgerId)); |
| } else { |
| explicitLacByteBuf.resetReaderIndex(); |
| assertEquals("Since it is V1 Header, explicitLac will be persisted and should not be null after reopening", |
| 0, ByteBufUtil.compare(explicitLacByteBuf, indexPersistenceMgr.getExplicitLac(ledgerId))); |
| } |
| } |
| |
| void preCreateFileInfoForLedger(long ledgerId, int headerVersion) throws IOException { |
| File ledgerCurDir = Bookie.getCurrentDirectory(ledgerDir); |
| String ledgerName = IndexPersistenceMgr.getLedgerName(ledgerId); |
| File indexFile = new File(ledgerCurDir, ledgerName); |
| indexFile.getParentFile().mkdirs(); |
| indexFile.createNewFile(); |
| /* |
| * precreate index file (FileInfo) for the ledger with specified |
| * headerversion. Even in FileInfo.V1 case, it is valid for |
| * explicitLacBufLength to be 0. If it is 0, then explicitLac is |
| * considered null (not set). |
| */ |
| try (RandomAccessFile raf = new RandomAccessFile(indexFile, "rw")) { |
| FileChannel fcForIndexFile = raf.getChannel(); |
| ByteBuffer bb = ByteBuffer.allocate((int) FileInfo.START_OF_DATA); |
| bb.putInt(FileInfo.SIGNATURE); |
| bb.putInt(headerVersion); |
| bb.putInt(masterKey.length); |
| bb.put(masterKey); |
| // statebits |
| bb.putInt(0); |
| if (headerVersion == FileInfo.V1) { |
| // explicitLacBufLength |
| bb.putInt(0); |
| } |
| bb.rewind(); |
| fcForIndexFile.position(0); |
| fcForIndexFile.write(bb); |
| fcForIndexFile.close(); |
| } |
| } |
| } |