| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.bookkeeper.bookie; |
| |
| import static com.google.common.base.Charsets.UTF_8; |
| import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.ByteBufAllocator; |
| import io.netty.buffer.Unpooled; |
| |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStreamWriter; |
| import java.io.RandomAccessFile; |
| import java.nio.channels.FileChannel; |
| import java.util.List; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| |
| /** |
| * An allocator pre-allocates entry log files. |
| */ |
| @Slf4j |
| class EntryLoggerAllocator { |
| |
| private long preallocatedLogId; |
| Future<BufferedLogChannel> preallocation = null; |
| ExecutorService allocatorExecutor; |
| private final ServerConfiguration conf; |
| private final LedgerDirsManager ledgerDirsManager; |
| private final Object createEntryLogLock = new Object(); |
| private final Object createCompactionLogLock = new Object(); |
| private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; |
| private final boolean entryLogPreAllocationEnabled; |
| private final ByteBufAllocator byteBufAllocator; |
| final ByteBuf logfileHeader = Unpooled.buffer(EntryLogger.LOGFILE_HEADER_SIZE); |
| |
| EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, |
| EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId, |
| ByteBufAllocator byteBufAllocator) { |
| this.conf = conf; |
| this.byteBufAllocator = byteBufAllocator; |
| this.ledgerDirsManager = ledgerDirsManager; |
| this.preallocatedLogId = logId; |
| this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus; |
| this.entryLogPreAllocationEnabled = conf.isEntryLogFilePreAllocationEnabled(); |
| this.allocatorExecutor = Executors.newSingleThreadExecutor(); |
| |
| // Initialize the entry log header buffer. This cannot be a static object |
| // since in our unit tests, we run multiple Bookies and thus EntryLoggers |
| // within the same JVM. All of these Bookie instances access this header |
| // so there can be race conditions when entry logs are rolled over and |
| // this header buffer is cleared before writing it into the new logChannel. |
| logfileHeader.writeBytes("BKLO".getBytes(UTF_8)); |
| logfileHeader.writeInt(EntryLogger.HEADER_CURRENT_VERSION); |
| logfileHeader.writerIndex(EntryLogger.LOGFILE_HEADER_SIZE); |
| |
| } |
| |
| synchronized long getPreallocatedLogId() { |
| return preallocatedLogId; |
| } |
| |
| BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException { |
| synchronized (createEntryLogLock) { |
| BufferedLogChannel bc; |
| if (!entryLogPreAllocationEnabled){ |
| // create a new log directly |
| bc = allocateNewLog(dirForNextEntryLog); |
| return bc; |
| } else { |
| // allocate directly to response request |
| if (null == preallocation){ |
| bc = allocateNewLog(dirForNextEntryLog); |
| } else { |
| // has a preallocated entry log |
| try { |
| bc = preallocation.get(); |
| } catch (ExecutionException ee) { |
| if (ee.getCause() instanceof IOException) { |
| throw (IOException) (ee.getCause()); |
| } else { |
| throw new IOException("Error to execute entry log allocation.", ee); |
| } |
| } catch (CancellationException ce) { |
| throw new IOException("Task to allocate a new entry log is cancelled.", ce); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie); |
| } |
| } |
| // preallocate a new log in background upon every call |
| preallocation = allocatorExecutor.submit(() -> allocateNewLog(dirForNextEntryLog)); |
| return bc; |
| } |
| } |
| } |
| |
| BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog) throws IOException { |
| synchronized (createCompactionLogLock) { |
| return allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX); |
| } |
| } |
| |
| private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog) throws IOException { |
| return allocateNewLog(dirForNextEntryLog, ".log"); |
| } |
| |
| /** |
| * Allocate a new log file. |
| */ |
| private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog, String suffix) throws IOException { |
| List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs(); |
| String logFileName; |
| // It would better not to overwrite existing entry log files |
| File testLogFile = null; |
| do { |
| if (preallocatedLogId >= Integer.MAX_VALUE) { |
| preallocatedLogId = 0; |
| } else { |
| ++preallocatedLogId; |
| } |
| logFileName = Long.toHexString(preallocatedLogId) + suffix; |
| for (File dir : ledgersDirs) { |
| testLogFile = new File(dir, logFileName); |
| if (testLogFile.exists()) { |
| log.warn("Found existed entry log " + testLogFile |
| + " when trying to create it as a new log."); |
| testLogFile = null; |
| break; |
| } |
| } |
| } while (testLogFile == null); |
| |
| File newLogFile = new File(dirForNextEntryLog, logFileName); |
| FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel(); |
| |
| BufferedLogChannel logChannel = new BufferedLogChannel(byteBufAllocator, channel, conf.getWriteBufferBytes(), |
| conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes()); |
| logfileHeader.readerIndex(0); |
| logChannel.write(logfileHeader); |
| |
| for (File f : ledgersDirs) { |
| setLastLogId(f, preallocatedLogId); |
| } |
| |
| if (suffix.equals(EntryLogger.LOG_FILE_SUFFIX)) { |
| recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId); |
| } |
| |
| log.info("Created new entry log file {} for logId {}.", newLogFile, preallocatedLogId); |
| return logChannel; |
| } |
| |
| /** |
| * writes the given id to the "lastId" file in the given directory. |
| */ |
| private void setLastLogId(File dir, long logId) throws IOException { |
| FileOutputStream fos; |
| fos = new FileOutputStream(new File(dir, "lastId")); |
| BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos, UTF_8)); |
| try { |
| bw.write(Long.toHexString(logId) + "\n"); |
| bw.flush(); |
| } catch (IOException e) { |
| log.warn("Failed write lastId file"); |
| } finally { |
| try { |
| bw.close(); |
| } catch (IOException e) { |
| log.error("Could not close lastId file in {}", dir.getPath()); |
| } |
| } |
| } |
| |
| /** |
| * Stop the allocator. |
| */ |
| void stop() { |
| // wait until the preallocation finished. |
| allocatorExecutor.shutdown(); |
| try { |
| if (!allocatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) { |
| log.warn("Timedout while awaiting for allocatorExecutor's termination, so force shuttingdown"); |
| } |
| } catch (InterruptedException e) { |
| log.warn("Got InterruptedException while awaiting termination of allocatorExecutor, so force shuttingdown"); |
| Thread.currentThread().interrupt(); |
| } |
| allocatorExecutor.shutdownNow(); |
| |
| log.info("Stopped entry logger preallocator."); |
| } |
| |
| /** |
| * get the preallocation for tests. |
| */ |
| Future<BufferedLogChannel> getPreallocationFuture(){ |
| return preallocation; |
| } |
| } |