| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.bookkeeper.bookie; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.MoreObjects; |
| import com.google.common.collect.MapMaker; |
| import com.google.common.collect.Sets; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.ByteBufAllocator; |
| import io.netty.buffer.PooledByteBufAllocator; |
| import io.netty.buffer.Unpooled; |
| import io.netty.util.concurrent.FastThreadLocal; |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.AsynchronousCloseException; |
| import java.nio.channels.FileChannel; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.regex.Pattern; |
| import org.apache.bookkeeper.bookie.storage.CompactionEntryLog; |
| import org.apache.bookkeeper.bookie.storage.EntryLogScanner; |
| import org.apache.bookkeeper.bookie.storage.EntryLogger; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.bookkeeper.util.DiskChecker; |
| import org.apache.bookkeeper.util.HardLink; |
| import org.apache.bookkeeper.util.IOUtils; |
| import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; |
| import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class manages the writing of the bookkeeper entries. All the new |
| * entries are written to a common log. The LedgerCache will have pointers |
| * into files created by this class with offsets into the files to find |
| * the actual ledger entry. The entry log files created by this class are |
| * identified by a long. |
| */ |
| public class DefaultEntryLogger implements EntryLogger { |
| private static final Logger LOG = LoggerFactory.getLogger(DefaultEntryLogger.class); |
| |
| @VisibleForTesting |
| static final int UNINITIALIZED_LOG_ID = -0xDEAD; |
| |
| static class BufferedLogChannel extends BufferedChannel { |
| private final long logId; |
| private final EntryLogMetadata entryLogMetadata; |
| private final File logFile; |
| private long ledgerIdAssigned = UNASSIGNED_LEDGERID; |
| |
| public BufferedLogChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity, |
| long logId, File logFile, long unpersistedBytesBound) throws IOException { |
| super(allocator, fc, writeCapacity, readCapacity, unpersistedBytesBound); |
| this.logId = logId; |
| this.entryLogMetadata = new EntryLogMetadata(logId); |
| this.logFile = logFile; |
| } |
| public long getLogId() { |
| return logId; |
| } |
| |
| public File getLogFile() { |
| return logFile; |
| } |
| |
| public void registerWrittenEntry(long ledgerId, long entrySize) { |
| entryLogMetadata.addLedgerSize(ledgerId, entrySize); |
| } |
| |
| public ConcurrentLongLongHashMap getLedgersMap() { |
| return entryLogMetadata.getLedgersMap(); |
| } |
| |
| public Long getLedgerIdAssigned() { |
| return ledgerIdAssigned; |
| } |
| |
| public void setLedgerIdAssigned(Long ledgerId) { |
| this.ledgerIdAssigned = ledgerId; |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(BufferedChannel.class) |
| .add("logId", logId) |
| .add("logFile", logFile) |
| .add("ledgerIdAssigned", ledgerIdAssigned) |
| .toString(); |
| } |
| |
| /** |
| * Append the ledger map at the end of the entry log. |
| * Updates the entry log file header with the offset and size of the map. |
| */ |
| void appendLedgersMap() throws IOException { |
| |
| long ledgerMapOffset = this.position(); |
| |
| ConcurrentLongLongHashMap ledgersMap = this.getLedgersMap(); |
| int numberOfLedgers = (int) ledgersMap.size(); |
| |
| // Write the ledgers map into several batches |
| |
| final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; |
| final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); |
| |
| try { |
| ledgersMap.forEach(new BiConsumerLong() { |
| int remainingLedgers = numberOfLedgers; |
| boolean startNewBatch = true; |
| int remainingInBatch = 0; |
| |
| @Override |
| public void accept(long ledgerId, long size) { |
| if (startNewBatch) { |
| int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); |
| int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; |
| |
| serializedMap.clear(); |
| serializedMap.writeInt(ledgerMapSize - 4); |
| serializedMap.writeLong(INVALID_LID); |
| serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); |
| serializedMap.writeInt(batchSize); |
| |
| startNewBatch = false; |
| remainingInBatch = batchSize; |
| } |
| // Dump the ledger in the current batch |
| serializedMap.writeLong(ledgerId); |
| serializedMap.writeLong(size); |
| --remainingLedgers; |
| |
| if (--remainingInBatch == 0) { |
| // Close current batch |
| try { |
| write(serializedMap); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| startNewBatch = true; |
| } |
| } |
| }); |
| } catch (RuntimeException e) { |
| if (e.getCause() instanceof IOException) { |
| throw (IOException) e.getCause(); |
| } else { |
| throw e; |
| } |
| } finally { |
| serializedMap.release(); |
| } |
| // Flush the ledger's map out before we write the header. |
| // Otherwise the header might point to something that is not fully |
| // written |
| super.flush(); |
| |
| // Update the headers with the map offset and count of ledgers |
| ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); |
| mapInfo.putLong(ledgerMapOffset); |
| mapInfo.putInt(numberOfLedgers); |
| mapInfo.flip(); |
| this.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); |
| } |
| } |
| |
| private final LedgerDirsManager ledgerDirsManager; |
| private final boolean entryLogPerLedgerEnabled; |
| |
| final RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; |
| |
| /** |
| * locks for compaction log. |
| */ |
| private final Object compactionLogLock = new Object(); |
| |
| private volatile BufferedLogChannel compactionLogChannel; |
| |
| final EntryLoggerAllocator entryLoggerAllocator; |
| private final EntryLogManager entryLogManager; |
| |
| private final CopyOnWriteArrayList<EntryLogListener> listeners = new CopyOnWriteArrayList<EntryLogListener>(); |
| |
| private static final int HEADER_V0 = 0; // Old log file format (no ledgers map index) |
| private static final int HEADER_V1 = 1; // Introduced ledger map index |
| static final int HEADER_CURRENT_VERSION = HEADER_V1; |
| |
| private static class Header { |
| final int version; |
| final long ledgersMapOffset; |
| final int ledgersCount; |
| |
| Header(int version, long ledgersMapOffset, int ledgersCount) { |
| this.version = version; |
| this.ledgersMapOffset = ledgersMapOffset; |
| this.ledgersCount = ledgersCount; |
| } |
| } |
| |
| /** |
| * The 1K block at the head of the entry logger file |
| * that contains the fingerprint and meta-data. |
| * |
| * <pre> |
| * Header is composed of: |
| * Fingerprint: 4 bytes "BKLO" |
| * Log file HeaderVersion enum: 4 bytes |
| * Ledger map offset: 8 bytes |
| * Ledgers Count: 4 bytes |
| * </pre> |
| */ |
| static final int LOGFILE_HEADER_SIZE = 1024; |
| final ByteBuf logfileHeader = Unpooled.buffer(LOGFILE_HEADER_SIZE); |
| |
| static final int HEADER_VERSION_POSITION = 4; |
| static final int LEDGERS_MAP_OFFSET_POSITION = HEADER_VERSION_POSITION + 4; |
| |
| /** |
| * Ledgers map is composed of multiple parts that can be split into separated entries. Each of them is composed of: |
| * |
| * <pre> |
| * length: (4 bytes) [0-3] |
| * ledger id (-1): (8 bytes) [4 - 11] |
| * entry id: (8 bytes) [12-19] |
| * num ledgers stored in current metadata entry: (4 bytes) [20 - 23] |
| * ledger entries: sequence of (ledgerid, size) (8 + 8 bytes each) [24..] |
| * </pre> |
| */ |
| static final int LEDGERS_MAP_HEADER_SIZE = 4 + 8 + 8 + 4; |
| static final int LEDGERS_MAP_ENTRY_SIZE = 8 + 8; |
| |
| // Break the ledgers map into multiple batches, each of which can contain up to 10K ledgers |
| static final int LEDGERS_MAP_MAX_BATCH_SIZE = 10000; |
| |
| static final long INVALID_LID = -1L; |
| |
| // EntryId used to mark an entry (belonging to INVALID_ID) as a component of the serialized ledgers map |
| static final long LEDGERS_MAP_ENTRY_ID = -2L; |
| |
| static final int MIN_SANE_ENTRY_SIZE = 8 + 8; |
| static final long MB = 1024 * 1024; |
| |
| private final int maxSaneEntrySize; |
| |
| private final ByteBufAllocator allocator; |
| |
| final ServerConfiguration conf; |
| |
| /** |
| * Entry Log Listener. |
| */ |
| interface EntryLogListener { |
| /** |
| * Rotate a new entry log to write. |
| */ |
| void onRotateEntryLog(); |
| } |
| |
| public DefaultEntryLogger(ServerConfiguration conf) throws IOException { |
| this(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()))); |
| } |
| |
| /** |
| * Create an EntryLogger that stores it's log files in the given directories. |
| */ |
| public DefaultEntryLogger(ServerConfiguration conf, |
| LedgerDirsManager ledgerDirsManager) throws IOException { |
| this(conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT); |
| } |
| |
| public DefaultEntryLogger(ServerConfiguration conf, |
| LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger, |
| ByteBufAllocator allocator) throws IOException { |
| //We reserve 500 bytes as overhead for the protocol. This is not 100% accurate |
| // but the protocol varies so an exact value is difficult to determine |
| this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500; |
| this.allocator = allocator; |
| this.ledgerDirsManager = ledgerDirsManager; |
| this.conf = conf; |
| entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled(); |
| if (listener != null) { |
| addListener(listener); |
| } |
| |
| // Initialize the entry log header buffer. This cannot be a static object |
| // since in our unit tests, we run multiple Bookies and thus EntryLoggers |
| // within the same JVM. All of these Bookie instances access this header |
| // so there can be race conditions when entry logs are rolled over and |
| // this header buffer is cleared before writing it into the new logChannel. |
| logfileHeader.writeBytes("BKLO".getBytes(UTF_8)); |
| logfileHeader.writeInt(HEADER_CURRENT_VERSION); |
| logfileHeader.writerIndex(LOGFILE_HEADER_SIZE); |
| |
| // Find the largest logId |
| long logId = INVALID_LID; |
| for (File dir : ledgerDirsManager.getAllLedgerDirs()) { |
| if (!dir.exists()) { |
| throw new FileNotFoundException( |
| "Entry log directory '" + dir + "' does not exist"); |
| } |
| long lastLogId = getLastLogId(dir); |
| if (lastLogId > logId) { |
| logId = lastLogId; |
| } |
| } |
| this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1); |
| this.entryLoggerAllocator = new EntryLoggerAllocator(conf, ledgerDirsManager, recentlyCreatedEntryLogsStatus, |
| logId, allocator); |
| if (entryLogPerLedgerEnabled) { |
| this.entryLogManager = new EntryLogManagerForEntryLogPerLedger(conf, ledgerDirsManager, |
| entryLoggerAllocator, listeners, recentlyCreatedEntryLogsStatus, statsLogger); |
| } else { |
| this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, ledgerDirsManager, entryLoggerAllocator, |
| listeners, recentlyCreatedEntryLogsStatus); |
| } |
| } |
| |
| EntryLogManager getEntryLogManager() { |
| return entryLogManager; |
| } |
| |
| void addListener(EntryLogListener listener) { |
| if (null != listener) { |
| listeners.add(listener); |
| } |
| } |
| |
| /** |
| * If the log id of current writable channel is the same as entryLogId and the position |
| * we want to read might end up reading from a position in the write buffer of the |
| * buffered channel, route this read to the current logChannel. Else, |
| * read from the BufferedReadChannel that is provided. |
| * @param entryLogId |
| * @param channel |
| * @param buff remaining() on this bytebuffer tells us the last position that we |
| * expect to read. |
| * @param pos The starting position from where we want to read. |
| * @return |
| */ |
| private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos) |
| throws IOException { |
| BufferedLogChannel bc = entryLogManager.getCurrentLogIfPresent(entryLogId); |
| if (null != bc) { |
| synchronized (bc) { |
| if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) { |
| return bc.read(buff, pos); |
| } |
| } |
| } |
| return channel.read(buff, pos); |
| } |
| |
| /** |
| * A thread-local variable that wraps a mapping of log ids to bufferedchannels |
| * These channels should be used only for reading. logChannel is the one |
| * that is used for writes. |
| */ |
| private final ThreadLocal<Map<Long, BufferedReadChannel>> logid2Channel = |
| new ThreadLocal<Map<Long, BufferedReadChannel>>() { |
| @Override |
| public Map<Long, BufferedReadChannel> initialValue() { |
| // Since this is thread local there only one modifier |
| // We dont really need the concurrency, but we need to use |
| // the weak values. Therefore using the concurrency level of 1 |
| return new MapMaker().concurrencyLevel(1) |
| .weakValues() |
| .makeMap(); |
| } |
| }; |
| |
| /** |
| * Each thread local buffered read channel can share the same file handle because reads are not relative |
| * and don't cause a change in the channel's position. We use this map to store the file channels. Each |
| * file channel is mapped to a log id which represents an open log file. |
| */ |
| private final ConcurrentMap<Long, FileChannel> logid2FileChannel = new ConcurrentHashMap<Long, FileChannel>(); |
| |
| /** |
| * Put the logId, bc pair in the map responsible for the current thread. |
| * @param logId |
| * @param bc |
| */ |
| public BufferedReadChannel putInReadChannels(long logId, BufferedReadChannel bc) { |
| Map<Long, BufferedReadChannel> threadMap = logid2Channel.get(); |
| return threadMap.put(logId, bc); |
| } |
| |
| /** |
| * Remove all entries for this log file in each thread's cache. |
| * @param logId |
| */ |
| public void removeFromChannelsAndClose(long logId) { |
| FileChannel fileChannel = logid2FileChannel.remove(logId); |
| if (null != fileChannel) { |
| try { |
| fileChannel.close(); |
| } catch (IOException e) { |
| LOG.warn("Exception while closing channel for log file:" + logId); |
| } |
| } |
| } |
| |
| public BufferedReadChannel getFromChannels(long logId) { |
| return logid2Channel.get().get(logId); |
| } |
| |
| @VisibleForTesting |
| long getLeastUnflushedLogId() { |
| return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId(); |
| } |
| |
| @Override |
| public Set<Long> getFlushedLogIds() { |
| Set<Long> logIds = new HashSet<>(); |
| synchronized (recentlyCreatedEntryLogsStatus) { |
| for (File dir : ledgerDirsManager.getAllLedgerDirs()) { |
| if (dir.exists() && dir.isDirectory()) { |
| File[] files = dir.listFiles(file -> file.getName().endsWith(".log")); |
| if (files != null && files.length > 0) { |
| for (File f : files) { |
| long logId = fileName2LogId(f.getName()); |
| if (recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)) { |
| logIds.add(logId); |
| } |
| } |
| } |
| } |
| } |
| } |
| return logIds; |
| } |
| |
| long getPreviousAllocatedEntryLogId() { |
| return entryLoggerAllocator.getPreallocatedLogId(); |
| } |
| |
| /** |
| * Get the current log file for compaction. |
| */ |
| private File getCurCompactionLogFile() { |
| synchronized (compactionLogLock) { |
| if (compactionLogChannel == null) { |
| return null; |
| } |
| return compactionLogChannel.getLogFile(); |
| } |
| } |
| |
| void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException { |
| entryLogManager.prepareSortedLedgerStorageCheckpoint(numBytesFlushed); |
| } |
| |
| void prepareEntryMemTableFlush() { |
| entryLogManager.prepareEntryMemTableFlush(); |
| } |
| |
| boolean commitEntryMemTableFlush() throws IOException { |
| return entryLogManager.commitEntryMemTableFlush(); |
| } |
| |
| /** |
| * get EntryLoggerAllocator, Just for tests. |
| */ |
| EntryLoggerAllocator getEntryLoggerAllocator() { |
| return entryLoggerAllocator; |
| } |
| |
| /** |
| * Remove entry log. |
| * |
| * @param entryLogId |
| * Entry Log File Id |
| */ |
| @Override |
| public boolean removeEntryLog(long entryLogId) { |
| removeFromChannelsAndClose(entryLogId); |
| File entryLogFile; |
| try { |
| entryLogFile = findFile(entryLogId); |
| } catch (FileNotFoundException e) { |
| LOG.error("Trying to delete an entryLog file that could not be found: " |
| + entryLogId + ".log"); |
| return false; |
| } |
| if (!entryLogFile.delete()) { |
| LOG.warn("Could not delete entry log file {}", entryLogFile); |
| } |
| return true; |
| } |
| |
| private long getLastLogId(File dir) { |
| long id = readLastLogId(dir); |
| // read success |
| if (id > 0) { |
| return id; |
| } |
| // read failed, scan the ledger directories to find biggest log id |
| File[] logFiles = dir.listFiles(file -> file.getName().endsWith(".log")); |
| List<Long> logs = new ArrayList<Long>(); |
| if (logFiles != null) { |
| for (File lf : logFiles) { |
| long logId = fileName2LogId(lf.getName()); |
| logs.add(logId); |
| } |
| } |
| // no log file found in this directory |
| if (0 == logs.size()) { |
| return INVALID_LID; |
| } |
| // order the collections |
| Collections.sort(logs); |
| return logs.get(logs.size() - 1); |
| } |
| |
| /** |
| * reads id from the "lastId" file in the given directory. |
| */ |
| private long readLastLogId(File f) { |
| FileInputStream fis; |
| try { |
| fis = new FileInputStream(new File(f, "lastId")); |
| } catch (FileNotFoundException e) { |
| return INVALID_LID; |
| } |
| try (BufferedReader br = new BufferedReader(new InputStreamReader(fis, UTF_8))) { |
| String lastIdString = br.readLine(); |
| return Long.parseLong(lastIdString, 16); |
| } catch (IOException | NumberFormatException e) { |
| return INVALID_LID; |
| } |
| } |
| |
| /** |
| * Flushes all rotated log channels. After log channels are flushed, |
| * move leastUnflushedLogId ptr to current logId. |
| */ |
| void checkpoint() throws IOException { |
| entryLogManager.checkpoint(); |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| entryLogManager.flush(); |
| } |
| |
| long addEntry(long ledger, ByteBuffer entry) throws IOException { |
| return entryLogManager.addEntry(ledger, Unpooled.wrappedBuffer(entry), true); |
| } |
| |
| long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { |
| return entryLogManager.addEntry(ledger, entry, rollLog); |
| } |
| |
| @Override |
| public long addEntry(long ledger, ByteBuf entry) throws IOException { |
| return entryLogManager.addEntry(ledger, entry, true); |
| } |
| |
| private final FastThreadLocal<ByteBuf> sizeBuffer = new FastThreadLocal<ByteBuf>() { |
| @Override |
| protected ByteBuf initialValue() throws Exception { |
| // Max usage is size (4 bytes) + ledgerId (8 bytes) + entryid (8 bytes) |
| return Unpooled.buffer(4 + 8 + 8); |
| } |
| }; |
| |
| private long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException { |
| synchronized (compactionLogLock) { |
| int entrySize = entry.readableBytes() + 4; |
| if (compactionLogChannel == null) { |
| createNewCompactionLog(); |
| } |
| |
| ByteBuf sizeBuffer = this.sizeBuffer.get(); |
| sizeBuffer.clear(); |
| sizeBuffer.writeInt(entry.readableBytes()); |
| compactionLogChannel.write(sizeBuffer); |
| |
| long pos = compactionLogChannel.position(); |
| compactionLogChannel.write(entry); |
| compactionLogChannel.registerWrittenEntry(ledgerId, entrySize); |
| return (compactionLogChannel.getLogId() << 32L) | pos; |
| } |
| } |
| |
| private void flushCompactionLog() throws IOException { |
| synchronized (compactionLogLock) { |
| if (compactionLogChannel != null) { |
| compactionLogChannel.appendLedgersMap(); |
| compactionLogChannel.flushAndForceWrite(false); |
| LOG.info("Flushed compaction log file {} with logId {}.", |
| compactionLogChannel.getLogFile(), |
| compactionLogChannel.getLogId()); |
| // since this channel is only used for writing, after flushing the channel, |
| // we had to close the underlying file channel. Otherwise, we might end up |
| // leaking fds which cause the disk spaces could not be reclaimed. |
| compactionLogChannel.close(); |
| } else { |
| throw new IOException("Failed to flush compaction log which has already been removed."); |
| } |
| } |
| } |
| |
| private void createNewCompactionLog() throws IOException { |
| synchronized (compactionLogLock) { |
| if (compactionLogChannel == null) { |
| compactionLogChannel = entryLogManager.createNewLogForCompaction(); |
| } |
| } |
| } |
| |
| /** |
| * Remove the current compaction log, usually invoked when compaction failed and |
| * we need to do some clean up to remove the compaction log file. |
| */ |
| private void removeCurCompactionLog() { |
| synchronized (compactionLogLock) { |
| if (compactionLogChannel != null) { |
| if (!compactionLogChannel.getLogFile().delete()) { |
| LOG.warn("Could not delete compaction log file {}", compactionLogChannel.getLogFile()); |
| } |
| |
| try { |
| compactionLogChannel.close(); |
| } catch (IOException e) { |
| LOG.error("Failed to close file channel for compaction log {}", compactionLogChannel.getLogId(), |
| e); |
| } |
| compactionLogChannel = null; |
| } |
| } |
| } |
| |
| static long logIdForOffset(long offset) { |
| return offset >> 32L; |
| } |
| |
| |
| static long posForOffset(long location) { |
| return location & 0xffffffffL; |
| } |
| |
| |
| /** |
| * Exception type for representing lookup errors. Useful for disambiguating different error |
| * conditions for reporting purposes. |
| */ |
| static class EntryLookupException extends Exception { |
| EntryLookupException(String message) { |
| super(message); |
| } |
| |
| /** |
| * Represents case where log file is missing. |
| */ |
| static class MissingLogFileException extends EntryLookupException { |
| MissingLogFileException(long ledgerId, long entryId, long entryLogId, long pos) { |
| super(String.format("Missing entryLog %d for ledgerId %d, entry %d at offset %d", |
| entryLogId, |
| ledgerId, |
| entryId, |
| pos)); |
| } |
| } |
| |
| /** |
| * Represents case where entry log is present, but does not contain the specified entry. |
| */ |
| static class MissingEntryException extends EntryLookupException { |
| MissingEntryException(long ledgerId, long entryId, long entryLogId, long pos) { |
| super(String.format("pos %d (entry %d for ledgerId %d) past end of entryLog %d", |
| pos, |
| entryId, |
| ledgerId, |
| entryLogId)); |
| } |
| } |
| |
| /** |
| * Represents case where log is present, but encoded entry length header is invalid. |
| */ |
| static class InvalidEntryLengthException extends EntryLookupException { |
| InvalidEntryLengthException(long ledgerId, long entryId, long entryLogId, long pos) { |
| super(String.format("Invalid entry length at pos %d (entry %d for ledgerId %d) for entryLog %d", |
| pos, |
| entryId, |
| ledgerId, |
| entryLogId)); |
| } |
| } |
| |
| /** |
| * Represents case where the entry at pos is wrong. |
| */ |
| static class WrongEntryException extends EntryLookupException { |
| WrongEntryException(long foundEntryId, long foundLedgerId, long ledgerId, |
| long entryId, long entryLogId, long pos) { |
| super(String.format( |
| "Found entry %d, ledger %d at pos %d entryLog %d, should have found entry %d for ledgerId %d", |
| foundEntryId, |
| foundLedgerId, |
| pos, |
| entryLogId, |
| entryId, |
| ledgerId)); |
| } |
| } |
| } |
| |
| private BufferedReadChannel getFCForEntryInternal( |
| long ledgerId, long entryId, long entryLogId, long pos) |
| throws EntryLookupException, IOException { |
| try { |
| return getChannelForLogId(entryLogId); |
| } catch (FileNotFoundException e) { |
| throw new EntryLookupException.MissingLogFileException(ledgerId, entryId, entryLogId, pos); |
| } |
| } |
| |
| private ByteBuf readEntrySize(long ledgerId, long entryId, long entryLogId, long pos, BufferedReadChannel fc) |
| throws EntryLookupException, IOException { |
| ByteBuf sizeBuff = sizeBuffer.get(); |
| sizeBuff.clear(); |
| |
| long entrySizePos = pos - 4; // we want to get the entrySize as well as the ledgerId and entryId |
| |
| try { |
| if (readFromLogChannel(entryLogId, fc, sizeBuff, entrySizePos) != sizeBuff.capacity()) { |
| throw new EntryLookupException.MissingEntryException(ledgerId, entryId, entryLogId, entrySizePos); |
| } |
| } catch (BufferedChannelBase.BufferedChannelClosedException | AsynchronousCloseException e) { |
| throw new EntryLookupException.MissingLogFileException(ledgerId, entryId, entryLogId, entrySizePos); |
| } |
| return sizeBuff; |
| } |
| |
| void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException { |
| long entryLogId = logIdForOffset(location); |
| long pos = posForOffset(location); |
| BufferedReadChannel fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos); |
| ByteBuf sizeBuf = readEntrySize(ledgerId, entryId, entryLogId, pos, fc); |
| validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuf); |
| } |
| |
| private void validateEntry(long ledgerId, long entryId, long entryLogId, long pos, ByteBuf sizeBuff) |
| throws IOException, EntryLookupException { |
| int entrySize = sizeBuff.readInt(); |
| |
| // entrySize does not include the ledgerId |
| if (entrySize > maxSaneEntrySize) { |
| LOG.warn("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " |
| + entryLogId); |
| } |
| if (entrySize < MIN_SANE_ENTRY_SIZE) { |
| LOG.error("Read invalid entry length {}", entrySize); |
| throw new EntryLookupException.InvalidEntryLengthException(ledgerId, entryId, entryLogId, pos); |
| } |
| |
| long thisLedgerId = sizeBuff.getLong(4); |
| long thisEntryId = sizeBuff.getLong(12); |
| if (thisLedgerId != ledgerId || thisEntryId != entryId) { |
| throw new EntryLookupException.WrongEntryException( |
| thisEntryId, thisLedgerId, ledgerId, entryId, entryLogId, pos); |
| } |
| } |
| |
| @Override |
| public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) |
| throws IOException, Bookie.NoEntryException { |
| return internalReadEntry(ledgerId, entryId, entryLocation, true /* validateEntry */); |
| } |
| |
| @Override |
| public ByteBuf readEntry(long location) throws IOException, Bookie.NoEntryException { |
| return internalReadEntry(location, -1L, -1L, false /* validateEntry */); |
| } |
| |
| |
| private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry) |
| throws IOException, Bookie.NoEntryException { |
| long entryLogId = logIdForOffset(location); |
| long pos = posForOffset(location); |
| |
| |
| BufferedReadChannel fc = null; |
| int entrySize = -1; |
| try { |
| fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos); |
| |
| ByteBuf sizeBuff = readEntrySize(ledgerId, entryId, entryLogId, pos, fc); |
| entrySize = sizeBuff.getInt(0); |
| if (validateEntry) { |
| validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuff); |
| } |
| } catch (EntryLookupException e) { |
| throw new IOException("Bad entry read from log file id: " + entryLogId, e); |
| } |
| |
| ByteBuf data = allocator.buffer(entrySize, entrySize); |
| int rc = readFromLogChannel(entryLogId, fc, data, pos); |
| if (rc != entrySize) { |
| data.release(); |
| throw new IOException("Bad entry read from log file id: " + entryLogId, |
| new EntryLookupException("Short read for " + ledgerId + "@" |
| + entryId + " in " + entryLogId + "@" |
| + pos + "(" + rc + "!=" + entrySize + ")")); |
| } |
| data.writerIndex(entrySize); |
| |
| return data; |
| } |
| |
| /** |
| * Read the header of an entry log. |
| */ |
| private Header getHeaderForLogId(long entryLogId) throws IOException { |
| BufferedReadChannel bc = getChannelForLogId(entryLogId); |
| |
| // Allocate buffer to read (version, ledgersMapOffset, ledgerCount) |
| ByteBuf headers = allocator.directBuffer(LOGFILE_HEADER_SIZE); |
| try { |
| bc.read(headers, 0); |
| |
| // Skip marker string "BKLO" |
| headers.readInt(); |
| |
| int headerVersion = headers.readInt(); |
| if (headerVersion < HEADER_V0 || headerVersion > HEADER_CURRENT_VERSION) { |
| LOG.info("Unknown entry log header version for log {}: {}", entryLogId, headerVersion); |
| } |
| |
| long ledgersMapOffset = headers.readLong(); |
| int ledgersCount = headers.readInt(); |
| return new Header(headerVersion, ledgersMapOffset, ledgersCount); |
| } finally { |
| headers.release(); |
| } |
| } |
| |
| private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException { |
| BufferedReadChannel fc = getFromChannels(entryLogId); |
| if (fc != null) { |
| return fc; |
| } |
| File file = findFile(entryLogId); |
| // get channel is used to open an existing entry log file |
| // it would be better to open using read mode |
| FileChannel newFc = new RandomAccessFile(file, "r").getChannel(); |
| FileChannel oldFc = logid2FileChannel.putIfAbsent(entryLogId, newFc); |
| if (null != oldFc) { |
| newFc.close(); |
| newFc = oldFc; |
| } |
| // We set the position of the write buffer of this buffered channel to Long.MAX_VALUE |
| // so that there are no overlaps with the write buffer while reading |
| fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes()); |
| putInReadChannels(entryLogId, fc); |
| return fc; |
| } |
| |
| /** |
| * Whether the log file exists or not. |
| */ |
| @Override |
| public boolean logExists(long logId) { |
| for (File d : ledgerDirsManager.getAllLedgerDirs()) { |
| File f = new File(d, Long.toHexString(logId) + ".log"); |
| if (f.exists()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Returns a set with the ids of all the entry log files. |
| * |
| * @throws IOException |
| */ |
| public Set<Long> getEntryLogsSet() throws IOException { |
| Set<Long> entryLogs = Sets.newTreeSet(); |
| |
| final FilenameFilter logFileFilter = new FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| return name.endsWith(".log"); |
| } |
| }; |
| |
| for (File d : ledgerDirsManager.getAllLedgerDirs()) { |
| File[] files = d.listFiles(logFileFilter); |
| if (files == null) { |
| throw new IOException("Failed to get list of files in directory " + d); |
| } |
| |
| for (File f : files) { |
| Long entryLogId = Long.parseLong(f.getName().split(".log")[0], 16); |
| entryLogs.add(entryLogId); |
| } |
| } |
| return entryLogs; |
| } |
| |
| private File findFile(long logId) throws FileNotFoundException { |
| for (File d : ledgerDirsManager.getAllLedgerDirs()) { |
| File f = new File(d, Long.toHexString(logId) + ".log"); |
| if (f.exists()) { |
| return f; |
| } |
| } |
| throw new FileNotFoundException("No file for log " + Long.toHexString(logId)); |
| } |
| |
| /** |
| * Scan entry log. |
| * |
| * @param entryLogId Entry Log Id |
| * @param scanner Entry Log Scanner |
| * @throws IOException |
| */ |
| @Override |
| public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException { |
| // Buffer where to read the entrySize (4 bytes) and the ledgerId (8 bytes) |
| ByteBuf headerBuffer = Unpooled.buffer(4 + 8); |
| BufferedReadChannel bc; |
| // Get the BufferedChannel for the current entry log file |
| try { |
| bc = getChannelForLogId(entryLogId); |
| } catch (IOException e) { |
| LOG.warn("Failed to get channel to scan entry log: " + entryLogId + ".log"); |
| throw e; |
| } |
| // Start the read position in the current entry log file to be after |
| // the header where all of the ledger entries are. |
| long pos = LOGFILE_HEADER_SIZE; |
| |
| // Start with a reasonably sized buffer size |
| ByteBuf data = allocator.directBuffer(1024 * 1024); |
| |
| try { |
| |
| // Read through the entry log file and extract the ledger ID's. |
| while (true) { |
| // Check if we've finished reading the entry log file. |
| if (pos >= bc.size()) { |
| break; |
| } |
| if (readFromLogChannel(entryLogId, bc, headerBuffer, pos) != headerBuffer.capacity()) { |
| LOG.warn("Short read for entry size from entrylog {}", entryLogId); |
| return; |
| } |
| long offset = pos; |
| |
| int entrySize = headerBuffer.readInt(); |
| if (entrySize <= 0) { // hitting padding |
| pos++; |
| headerBuffer.clear(); |
| continue; |
| } |
| long ledgerId = headerBuffer.readLong(); |
| headerBuffer.clear(); |
| |
| pos += 4; |
| if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) { |
| // skip this entry |
| pos += entrySize; |
| continue; |
| } |
| // read the entry |
| data.clear(); |
| data.capacity(entrySize); |
| int rc = readFromLogChannel(entryLogId, bc, data, pos); |
| if (rc != entrySize) { |
| LOG.warn("Short read for ledger entry from entryLog {}@{} ({} != {})", |
| entryLogId, pos, rc, entrySize); |
| return; |
| } |
| // process the entry |
| scanner.process(ledgerId, offset, data); |
| |
| // Advance position to the next entry |
| pos += entrySize; |
| } |
| } finally { |
| data.release(); |
| } |
| } |
| |
| public EntryLogMetadata getEntryLogMetadata(long entryLogId, AbstractLogCompactor.Throttler throttler) |
| throws IOException { |
| // First try to extract the EntryLogMetadata from the index, if there's no index then fallback to scanning the |
| // entry log |
| try { |
| return extractEntryLogMetadataFromIndex(entryLogId); |
| } catch (Exception e) { |
| LOG.info("Failed to get ledgers map index from: {}.log : {}", entryLogId, e.getMessage()); |
| |
| // Fall-back to scanning |
| return extractEntryLogMetadataByScanning(entryLogId, throttler); |
| } |
| } |
| |
| EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOException { |
| Header header = getHeaderForLogId(entryLogId); |
| |
| if (header.version < HEADER_V1) { |
| throw new IOException("Old log file header without ledgers map on entryLogId " + entryLogId); |
| } |
| |
| if (header.ledgersMapOffset == 0L) { |
| // The index was not stored in the log file (possibly because the bookie crashed before flushing it) |
| throw new IOException("No ledgers map index found on entryLogId " + entryLogId); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Recovering ledgers maps for log {} at offset: {}", entryLogId, header.ledgersMapOffset); |
| } |
| |
| BufferedReadChannel bc = getChannelForLogId(entryLogId); |
| |
| // There can be multiple entries containing the various components of the serialized ledgers map |
| long offset = header.ledgersMapOffset; |
| EntryLogMetadata meta = new EntryLogMetadata(entryLogId); |
| |
| final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; |
| ByteBuf ledgersMap = allocator.directBuffer(maxMapSize); |
| |
| try { |
| while (offset < bc.size()) { |
| // Read ledgers map size |
| sizeBuffer.get().clear(); |
| bc.read(sizeBuffer.get(), offset); |
| |
| int ledgersMapSize = sizeBuffer.get().readInt(); |
| if (ledgersMapSize <= 0) { |
| break; |
| } |
| // Read the index into a buffer |
| ledgersMap.clear(); |
| bc.read(ledgersMap, offset + 4, ledgersMapSize); |
| |
| // Discard ledgerId and entryId |
| long lid = ledgersMap.readLong(); |
| if (lid != INVALID_LID) { |
| throw new IOException("Cannot deserialize ledgers map from ledger " + lid); |
| } |
| |
| long entryId = ledgersMap.readLong(); |
| if (entryId != LEDGERS_MAP_ENTRY_ID) { |
| throw new IOException("Cannot deserialize ledgers map from entryId " + entryId); |
| } |
| |
| // Read the number of ledgers in the current entry batch |
| int ledgersCount = ledgersMap.readInt(); |
| |
| // Extract all (ledger,size) tuples from buffer |
| for (int i = 0; i < ledgersCount; i++) { |
| long ledgerId = ledgersMap.readLong(); |
| long size = ledgersMap.readLong(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Recovering ledgers maps for log {} -- Found ledger: {} with size: {}", |
| entryLogId, ledgerId, size); |
| } |
| meta.addLedgerSize(ledgerId, size); |
| } |
| if (ledgersMap.isReadable()) { |
| throw new IOException("Invalid entry size when reading ledgers map"); |
| } |
| |
| // Move to next entry, if any |
| offset += ledgersMapSize + 4; |
| } |
| } catch (IndexOutOfBoundsException e) { |
| throw new IOException(e); |
| } finally { |
| ledgersMap.release(); |
| } |
| |
| if (meta.getLedgersMap().size() != header.ledgersCount) { |
| throw new IOException("Not all ledgers were found in ledgers map index. expected: " + header.ledgersCount |
| + " -- found: " + meta.getLedgersMap().size() + " -- entryLogId: " + entryLogId); |
| } |
| |
| return meta; |
| } |
| |
| private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId, |
| AbstractLogCompactor.Throttler throttler) |
| throws IOException { |
| final EntryLogMetadata meta = new EntryLogMetadata(entryLogId); |
| |
| // Read through the entry log file and extract the entry log meta |
| scanEntryLog(entryLogId, new EntryLogScanner() { |
| @Override |
| public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { |
| if (throttler != null) { |
| throttler.acquire(entry.readableBytes()); |
| } |
| // add new entry size of a ledger to entry log meta |
| meta.addLedgerSize(ledgerId, entry.readableBytes() + 4); |
| } |
| |
| @Override |
| public boolean accept(long ledgerId) { |
| return ledgerId >= 0; |
| } |
| }); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Retrieved entry log meta data entryLogId: {}, meta: {}", entryLogId, meta); |
| } |
| return meta; |
| } |
| |
| /** |
| * Shutdown method to gracefully stop entry logger. |
| */ |
| @Override |
| public void close() { |
| // since logChannel is buffered channel, do flush when shutting down |
| LOG.info("Stopping EntryLogger"); |
| try { |
| flush(); |
| for (FileChannel fc : logid2FileChannel.values()) { |
| fc.close(); |
| } |
| // clear the mapping, so we don't need to go through the channels again in finally block in normal case. |
| logid2FileChannel.clear(); |
| entryLogManager.close(); |
| synchronized (compactionLogLock) { |
| if (compactionLogChannel != null) { |
| compactionLogChannel.close(); |
| compactionLogChannel = null; |
| } |
| } |
| } catch (IOException ie) { |
| // we have no idea how to avoid io exception during shutting down, so just ignore it |
| LOG.error("Error flush entry log during shutting down, which may cause entry log corrupted.", ie); |
| } finally { |
| for (FileChannel fc : logid2FileChannel.values()) { |
| IOUtils.close(LOG, fc); |
| } |
| |
| entryLogManager.forceClose(); |
| synchronized (compactionLogLock) { |
| IOUtils.close(LOG, compactionLogChannel); |
| } |
| } |
| // shutdown the pre-allocation thread |
| entryLoggerAllocator.stop(); |
| } |
| |
| protected LedgerDirsManager getLedgerDirsManager() { |
| return ledgerDirsManager; |
| } |
| |
| /** |
| * Convert log filename (hex format with suffix) to logId in long. |
| */ |
| static long fileName2LogId(String fileName) { |
| if (fileName != null && fileName.contains(".")) { |
| fileName = fileName.split("\\.")[0]; |
| } |
| try { |
| return Long.parseLong(fileName, 16); |
| } catch (Exception nfe) { |
| LOG.error("Invalid log file name {} found when trying to convert to logId.", fileName, nfe); |
| } |
| return INVALID_LID; |
| } |
| |
| /** |
| * Convert log Id to hex string. |
| */ |
| static String logId2HexString(long logId) { |
| return Long.toHexString(logId); |
| } |
| |
| /** |
| * Datastructure which maintains the status of logchannels. When a |
| * logChannel is created entry of < entryLogId, false > will be made to this |
| * sortedmap and when logChannel is rotated and flushed then the entry is |
| * updated to < entryLogId, true > and all the lowest entries with |
| * < entryLogId, true > status will be removed from the sortedmap. So that way |
| * we could get least unflushed LogId. |
| * |
| */ |
| static class RecentEntryLogsStatus { |
| private final SortedMap<Long, Boolean> entryLogsStatusMap; |
| private long leastUnflushedLogId; |
| |
| RecentEntryLogsStatus(long leastUnflushedLogId) { |
| entryLogsStatusMap = new TreeMap<>(); |
| this.leastUnflushedLogId = leastUnflushedLogId; |
| } |
| |
| synchronized void createdEntryLog(Long entryLogId) { |
| entryLogsStatusMap.put(entryLogId, false); |
| } |
| |
| synchronized void flushRotatedEntryLog(Long entryLogId) { |
| entryLogsStatusMap.replace(entryLogId, true); |
| while ((!entryLogsStatusMap.isEmpty()) && (entryLogsStatusMap.get(entryLogsStatusMap.firstKey()))) { |
| long leastFlushedLogId = entryLogsStatusMap.firstKey(); |
| entryLogsStatusMap.remove(leastFlushedLogId); |
| leastUnflushedLogId = leastFlushedLogId + 1; |
| } |
| } |
| |
| synchronized long getLeastUnflushedLogId() { |
| return leastUnflushedLogId; |
| } |
| |
| synchronized boolean isFlushedLogId(long entryLogId) { |
| return entryLogsStatusMap.getOrDefault(entryLogId, Boolean.FALSE) || entryLogId < leastUnflushedLogId; |
| } |
| } |
| |
| @Override |
| public CompactionEntryLog newCompactionLog(long logToCompact) throws IOException { |
| createNewCompactionLog(); |
| |
| File compactingLogFile = getCurCompactionLogFile(); |
| long compactionLogId = fileName2LogId(compactingLogFile.getName()); |
| File compactedLogFile = compactedLogFileFromCompacting(compactingLogFile, logToCompact); |
| File finalLogFile = new File(compactingLogFile.getParentFile(), |
| compactingLogFile.getName().substring(0, |
| compactingLogFile.getName().indexOf(".log") + 4)); |
| return new EntryLoggerCompactionEntryLog( |
| compactionLogId, logToCompact, compactingLogFile, compactedLogFile, finalLogFile); |
| |
| } |
| |
| private class EntryLoggerCompactionEntryLog implements CompactionEntryLog { |
| private final long compactionLogId; |
| private final long logIdToCompact; |
| private final File compactingLogFile; |
| private final File compactedLogFile; |
| private final File finalLogFile; |
| |
| EntryLoggerCompactionEntryLog(long compactionLogId, long logIdToCompact, |
| File compactingLogFile, |
| File compactedLogFile, |
| File finalLogFile) { |
| this.compactionLogId = compactionLogId; |
| this.logIdToCompact = logIdToCompact; |
| this.compactingLogFile = compactingLogFile; |
| this.compactedLogFile = compactedLogFile; |
| this.finalLogFile = finalLogFile; |
| } |
| |
| @Override |
| public long addEntry(long ledgerId, ByteBuf entry) throws IOException { |
| return addEntryForCompaction(ledgerId, entry); |
| } |
| @Override |
| public void scan(EntryLogScanner scanner) throws IOException { |
| scanEntryLog(compactionLogId, scanner); |
| } |
| @Override |
| public void flush() throws IOException { |
| flushCompactionLog(); |
| } |
| @Override |
| public void abort() { |
| removeCurCompactionLog(); |
| if (compactedLogFile.exists()) { |
| if (!compactedLogFile.delete()) { |
| LOG.warn("Could not delete file: {}", compactedLogFile); |
| } |
| } |
| } |
| |
| @Override |
| public void markCompacted() throws IOException { |
| if (compactingLogFile.exists()) { |
| if (!compactedLogFile.exists()) { |
| HardLink.createHardLink(compactingLogFile, compactedLogFile); |
| } |
| } else { |
| throw new IOException("Compaction log doesn't exist any more after flush: " + compactingLogFile); |
| } |
| removeCurCompactionLog(); |
| } |
| |
| @Override |
| public void makeAvailable() throws IOException { |
| if (!finalLogFile.exists()) { |
| HardLink.createHardLink(compactedLogFile, finalLogFile); |
| } |
| } |
| @Override |
| public void finalizeAndCleanup() { |
| if (compactedLogFile.exists()) { |
| if (!compactedLogFile.delete()) { |
| LOG.warn("Could not delete file: {}", compactedLogFile); |
| } |
| } |
| if (compactingLogFile.exists()) { |
| if (!compactingLogFile.delete()) { |
| LOG.warn("Could not delete file: {}", compactingLogFile); |
| } |
| } |
| } |
| |
| @Override |
| public long getDstLogId() { |
| return compactionLogId; |
| } |
| @Override |
| public long getSrcLogId() { |
| return logIdToCompact; |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(this) |
| .add("logId", compactionLogId) |
| .add("compactedLogId", logIdToCompact) |
| .add("compactingLogFile", compactingLogFile) |
| .add("compactedLogFile", compactedLogFile) |
| .add("finalLogFile", finalLogFile) |
| .toString(); |
| } |
| } |
| |
| @Override |
| public Collection<CompactionEntryLog> incompleteCompactionLogs() { |
| List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs(); |
| List<CompactionEntryLog> compactionLogs = new ArrayList<>(); |
| |
| for (File dir : ledgerDirs) { |
| File[] compactingPhaseFiles = dir.listFiles( |
| file -> file.getName().endsWith(TransactionalEntryLogCompactor.COMPACTING_SUFFIX)); |
| if (compactingPhaseFiles != null) { |
| for (File file : compactingPhaseFiles) { |
| if (file.delete()) { |
| LOG.info("Deleted failed compaction file {}", file); |
| } |
| } |
| } |
| File[] compactedPhaseFiles = dir.listFiles( |
| file -> file.getName().endsWith(TransactionalEntryLogCompactor.COMPACTED_SUFFIX)); |
| if (compactedPhaseFiles != null) { |
| for (File compactedFile : compactedPhaseFiles) { |
| LOG.info("Found compacted log file {} has partially flushed index, recovering index.", |
| compactedFile); |
| |
| File compactingLogFile = new File(compactedFile.getParentFile(), "doesntexist"); |
| long compactionLogId = -1L; |
| long compactedLogId = -1L; |
| String[] parts = compactedFile.getName().split(Pattern.quote(".")); |
| boolean valid = true; |
| if (parts.length != 4) { |
| valid = false; |
| } else { |
| try { |
| compactionLogId = Long.parseLong(parts[0], 16); |
| compactedLogId = Long.parseLong(parts[2], 16); |
| } catch (NumberFormatException nfe) { |
| valid = false; |
| } |
| } |
| |
| if (!valid) { |
| LOG.info("Invalid compacted file found ({}), deleting", compactedFile); |
| if (!compactedFile.delete()) { |
| LOG.warn("Couldn't delete invalid compacted file ({})", compactedFile); |
| } |
| continue; |
| } |
| File finalLogFile = new File(compactedFile.getParentFile(), compactionLogId + ".log"); |
| |
| compactionLogs.add( |
| new EntryLoggerCompactionEntryLog(compactionLogId, compactedLogId, |
| compactingLogFile, compactedFile, finalLogFile)); |
| } |
| } |
| } |
| return compactionLogs; |
| } |
| |
| private static File compactedLogFileFromCompacting(File compactionLogFile, long compactingLogId) { |
| File dir = compactionLogFile.getParentFile(); |
| String filename = compactionLogFile.getName(); |
| String newSuffix = ".log." + DefaultEntryLogger.logId2HexString(compactingLogId) |
| + TransactionalEntryLogCompactor.COMPACTED_SUFFIX; |
| String hardLinkFilename = filename.replace(TransactionalEntryLogCompactor.COMPACTING_SUFFIX, newSuffix); |
| return new File(dir, hardLinkFilename); |
| } |
| } |