| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.regionserver.wal; |
| |
| import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; |
| import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION; |
| import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; |
| import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC; |
| import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL; |
| import static org.apache.hadoop.hbase.util.FutureUtils.addListener; |
| import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; |
| import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; |
| |
| import com.lmax.disruptor.RingBuffer; |
| import com.lmax.disruptor.Sequence; |
| import com.lmax.disruptor.Sequencer; |
| import io.opentelemetry.api.trace.Span; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.lang.management.MemoryType; |
| import java.net.URLEncoder; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Comparator; |
| import java.util.Deque; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.OptionalLong; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentNavigableMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.function.Supplier; |
| import org.apache.commons.lang3.mutable.MutableLong; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.hbase.Abortable; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.PrivateCellUtil; |
| import org.apache.hadoop.hbase.client.ConnectionUtils; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.exceptions.TimeoutIOException; |
| import org.apache.hadoop.hbase.io.util.MemorySizeUtil; |
| import org.apache.hadoop.hbase.ipc.RpcServer; |
| import org.apache.hadoop.hbase.ipc.ServerCall; |
| import org.apache.hadoop.hbase.log.HBaseMarkers; |
| import org.apache.hadoop.hbase.regionserver.HRegion; |
| import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; |
| import org.apache.hadoop.hbase.trace.TraceUtil; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; |
| import org.apache.hadoop.hbase.wal.WAL; |
| import org.apache.hadoop.hbase.wal.WALEdit; |
| import org.apache.hadoop.hbase.wal.WALFactory; |
| import org.apache.hadoop.hbase.wal.WALKeyImpl; |
| import org.apache.hadoop.hbase.wal.WALPrettyPrinter; |
| import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; |
| import org.apache.hadoop.hbase.wal.WALSplitter; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; |
| import org.apache.hbase.thirdparty.com.google.common.io.Closeables; |
| import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one |
| * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. |
| * This is done internal to the implementation. |
| * <p> |
| * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a |
| * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id. |
| * A bunch of work in the below is done keeping account of these region sequence ids -- what is |
| * flushed out to hfiles, and what is yet in WAL and in memory only. |
| * <p> |
| * It is only practical to delete entire files. Thus, we delete an entire on-disk file |
| * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older |
| * (smaller) than the most-recent flush. |
| * <p> |
| * To read an WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for one way read, |
| * call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for |
| * replication where we may want to tail the active WAL file. |
| * <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL |
| * is now a lame duck; any more appends or syncs will fail also with the same original exception. If |
| * we have made successful appends to the WAL and we then are unable to sync them, our current |
| * semantic is to return error to the client that the appends failed but also to abort the current |
| * context, usually the hosting server. We need to replay the WALs. <br> |
| * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client |
| * that the append failed. <br> |
| * TODO: replication may pick up these last edits though they have been marked as failed append |
| * (Need to keep our own file lengths, not rely on HDFS). |
| */ |
| @InterfaceAudience.Private |
| public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { |
| private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class); |
| |
| private static final Comparator<SyncFuture> SEQ_COMPARATOR = |
| Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode); |
| |
| private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec"; |
| private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900; |
| /** Don't log blocking regions more frequently than this. */ |
| private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5); |
| |
| protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms"; |
| protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms |
| protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms"; |
| protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms |
| protected static final String SLOW_SYNC_ROLL_THRESHOLD = |
| "hbase.regionserver.wal.slowsync.roll.threshold"; |
| protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings |
| protected static final String SLOW_SYNC_ROLL_INTERVAL_MS = |
| "hbase.regionserver.wal.slowsync.roll.interval.ms"; |
| protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute |
| |
| public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout"; |
| protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min |
| |
| public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier"; |
| |
| public static final String MAX_LOGS = "hbase.regionserver.maxlogs"; |
| |
| public static final String RING_BUFFER_SLOT_COUNT = |
| "hbase.regionserver.wal.disruptor.event.count"; |
| |
| public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms"; |
| public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000; |
| |
| public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; |
| public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; |
| |
| public static final String WAL_AVOID_LOCAL_WRITES_KEY = |
| "hbase.regionserver.wal.avoid-local-writes"; |
| public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false; |
| |
| /** |
| * file system instance |
| */ |
| protected final FileSystem fs; |
| |
| /** |
| * WAL directory, where all WAL files would be placed. |
| */ |
| protected final Path walDir; |
| |
| private final FileSystem remoteFs; |
| |
| private final Path remoteWALDir; |
| |
| /** |
| * dir path where old logs are kept. |
| */ |
| protected final Path walArchiveDir; |
| |
| /** |
| * Matches just those wal files that belong to this wal instance. |
| */ |
| protected final PathFilter ourFiles; |
| |
| /** |
| * Prefix of a WAL file, usually the region server name it is hosted on. |
| */ |
| protected final String walFilePrefix; |
| |
| /** |
| * Suffix included on generated wal file names |
| */ |
| protected final String walFileSuffix; |
| |
| /** |
| * Prefix used when checking for wal membership. |
| */ |
| protected final String prefixPathStr; |
| |
| protected final WALCoprocessorHost coprocessorHost; |
| |
| /** |
| * conf object |
| */ |
| protected final Configuration conf; |
| |
| protected final Abortable abortable; |
| |
| /** Listeners that are called on WAL events. */ |
| protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>(); |
| |
| /** Tracks the logs in the process of being closed. */ |
| protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>(); |
| |
| /** |
| * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence |
| * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has |
| * facility for answering questions such as "Is it safe to GC a WAL?". |
| */ |
| protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); |
| |
| /** The slow sync will be logged; the very slow sync will cause the WAL to be rolled. */ |
| protected final long slowSyncNs, rollOnSyncNs; |
| protected final int slowSyncRollThreshold; |
| protected final int slowSyncCheckInterval; |
| protected final AtomicInteger slowSyncCount = new AtomicInteger(); |
| |
| private final long walSyncTimeoutNs; |
| |
| private final long walTooOldNs; |
| |
| // If > than this size, roll the log. |
| protected final long logrollsize; |
| |
| /** |
| * Block size to use writing files. |
| */ |
| protected final long blocksize; |
| |
| /* |
| * If more than this many logs, force flush of oldest region to the oldest edit goes to disk. If |
| * too many and we crash, then will take forever replaying. Keep the number of logs tidy. |
| */ |
| protected final int maxLogs; |
| |
| protected final boolean useHsync; |
| |
| /** |
| * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock |
| * is held. We don't just use synchronized because that results in bogus and tedious findbugs |
| * warning when it thinks synchronized controls writer thread safety. It is held when we are |
| * actually rolling the log. It is checked when we are looking to see if we should roll the log or |
| * not. |
| */ |
| protected final ReentrantLock rollWriterLock = new ReentrantLock(true); |
| |
| // The timestamp (in ms) when the log file was created. |
| protected final AtomicLong filenum = new AtomicLong(-1); |
| |
| // Number of transactions in the current Wal. |
| protected final AtomicInteger numEntries = new AtomicInteger(0); |
| |
| /** |
| * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass |
| * WALEdit to background consumer thread, and the transaction id is the sequence number of the |
| * corresponding entry in queue. |
| */ |
| protected volatile long highestUnsyncedTxid = -1; |
| |
| /** |
| * Updated to the transaction id of the last successful sync call. This can be less than |
| * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in |
| * for it. |
| */ |
| protected final AtomicLong highestSyncedTxid = new AtomicLong(0); |
| |
| /** |
| * The total size of wal |
| */ |
| protected final AtomicLong totalLogSize = new AtomicLong(0); |
| /** |
| * Current log file. |
| */ |
| volatile W writer; |
| |
| // Last time to check low replication on hlog's pipeline |
| private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); |
| |
| // Last time we asked to roll the log due to a slow sync |
| private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime(); |
| |
| protected volatile boolean closed = false; |
| |
| protected final AtomicBoolean shutdown = new AtomicBoolean(false); |
| |
| protected final long walShutdownTimeout; |
| |
| private long nextLogTooOldNs = System.nanoTime(); |
| |
| /** |
| * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws |
| * an IllegalArgumentException if used to compare paths from different wals. |
| */ |
| final Comparator<Path> LOG_NAME_COMPARATOR = |
| (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); |
| |
| private static final class WALProps { |
| |
| /** |
| * Map the encoded region name to the highest sequence id. |
| * <p/> |
| * Contains all the regions it has an entry for. |
| */ |
| private final Map<byte[], Long> encodedName2HighestSequenceId; |
| |
| /** |
| * The log file size. Notice that the size may not be accurate if we do asynchronous close in |
| * subclasses. |
| */ |
| private final long logSize; |
| |
| /** |
| * The nanoTime of the log rolling, used to determine the time interval that has passed since. |
| */ |
| private final long rollTimeNs; |
| |
| /** |
| * If we do asynchronous close in subclasses, it is possible that when adding WALProps to the |
| * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file, |
| * for safety. |
| */ |
| private volatile boolean closed = false; |
| |
| WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) { |
| this.encodedName2HighestSequenceId = encodedName2HighestSequenceId; |
| this.logSize = logSize; |
| this.rollTimeNs = System.nanoTime(); |
| } |
| } |
| |
| /** |
| * Map of WAL log file to properties. The map is sorted by the log file creation timestamp |
| * (contained in the log file name). |
| */ |
| protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props = |
| new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR); |
| |
| /** |
| * A cache of sync futures reused by threads. |
| */ |
| protected final SyncFutureCache syncFutureCache; |
| |
| /** |
| * The class name of the runtime implementation, used as prefix for logging/tracing. |
| * <p> |
| * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here, |
| * refer to HBASE-17676 for more details |
| * </p> |
| */ |
| protected final String implClassName; |
| |
| protected final AtomicBoolean rollRequested = new AtomicBoolean(false); |
| |
| protected final ExecutorService closeExecutor = Executors.newCachedThreadPool( |
| new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); |
| |
| // Run in caller if we get reject execution exception, to avoid aborting region server when we get |
| // reject execution exception. Usually this should not happen but let's make it more robust. |
| private final ExecutorService logArchiveExecutor = |
| new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), |
| new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(), |
| new ThreadPoolExecutor.CallerRunsPolicy()); |
| |
| private final int archiveRetries; |
| |
| protected ExecutorService consumeExecutor; |
| |
| private final Lock consumeLock = new ReentrantLock(); |
| |
| protected final Runnable consumer = this::consume; |
| |
| // check if there is already a consumer task in the event loop's task queue |
| protected Supplier<Boolean> hasConsumerTask; |
| |
| private static final int MAX_EPOCH = 0x3FFFFFFF; |
| // the lowest bit is waitingRoll, which means new writer is created, and we are waiting for old |
| // writer to be closed. |
| // the second-lowest bit is writerBroken which means the current writer is broken and rollWriter |
| // is needed. |
| // all other bits are the epoch number of the current writer, this is used to detect whether the |
| // writer is still the one when you issue the sync. |
| // notice that, modification to this field is only allowed under the protection of consumeLock. |
| private volatile int epochAndState; |
| |
| private boolean readyForRolling; |
| |
| private final Condition readyForRollingCond = consumeLock.newCondition(); |
| |
| private final RingBuffer<RingBufferTruck> waitingConsumePayloads; |
| |
| private final Sequence waitingConsumePayloadsGatingSequence; |
| |
| private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); |
| |
| private final long batchSize; |
| |
| protected final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>(); |
| |
| protected final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>(); |
| |
| protected final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR); |
| |
| // the highest txid of WAL entries being processed |
| protected long highestProcessedAppendTxid; |
| |
| // file length when we issue last sync request on the writer |
| private long fileLengthAtLastSync; |
| |
| private long highestProcessedAppendTxidAtLastSync; |
| |
| private int waitOnShutdownInSeconds; |
| |
| private String waitOnShutdownInSecondsConfigKey; |
| |
| protected boolean shouldShutDownConsumeExecutorWhenClose = true; |
| |
| private volatile boolean skipRemoteWAL = false; |
| |
| private volatile boolean markerEditOnly = false; |
| |
| public long getFilenum() { |
| return this.filenum.get(); |
| } |
| |
| /** |
| * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper |
| * method returns the creation timestamp from a given log file. It extracts the timestamp assuming |
| * the filename is created with the {@link #computeFilename(long filenum)} method. |
| * @return timestamp, as in the log file name. |
| */ |
| protected long getFileNumFromFileName(Path fileName) { |
| checkNotNull(fileName, "file name can't be null"); |
| if (!ourFiles.accept(fileName)) { |
| throw new IllegalArgumentException( |
| "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")"); |
| } |
| final String fileNameString = fileName.toString(); |
| String chompedPath = fileNameString.substring(prefixPathStr.length(), |
| (fileNameString.length() - walFileSuffix.length())); |
| return Long.parseLong(chompedPath); |
| } |
| |
| private int calculateMaxLogFiles(Configuration conf, long logRollSize) { |
| checkArgument(logRollSize > 0, |
| "The log roll size cannot be zero or negative when calculating max log files, " |
| + "current value is " + logRollSize); |
| Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf); |
| return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize); |
| } |
| |
| // must be power of 2 |
| protected final int getPreallocatedEventCount() { |
| // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will |
| // be stuck and make no progress if the buffer is filled with appends only and there is no |
| // sync. If no sync, then the handlers will be outstanding just waiting on sync completion |
| // before they return. |
| int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16); |
| checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must > 0"); |
| int floor = Integer.highestOneBit(preallocatedEventCount); |
| if (floor == preallocatedEventCount) { |
| return floor; |
| } |
| // max capacity is 1 << 30 |
| if (floor >= 1 << 29) { |
| return 1 << 30; |
| } |
| return floor << 1; |
| } |
| |
| protected final void setWaitOnShutdownInSeconds(int waitOnShutdownInSeconds, |
| String waitOnShutdownInSecondsConfigKey) { |
| this.waitOnShutdownInSeconds = waitOnShutdownInSeconds; |
| this.waitOnShutdownInSecondsConfigKey = waitOnShutdownInSecondsConfigKey; |
| } |
| |
| protected final void createSingleThreadPoolConsumeExecutor(String walType, final Path rootDir, |
| final String prefix) { |
| ThreadPoolExecutor threadPool = |
| new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), |
| new ThreadFactoryBuilder().setNameFormat(walType + "-%d-" + rootDir.toString() + "-prefix:" |
| + (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true).build()); |
| hasConsumerTask = () -> threadPool.getQueue().peek() == consumer; |
| consumeExecutor = threadPool; |
| this.shouldShutDownConsumeExecutorWhenClose = true; |
| } |
| |
| protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir, |
| final String logDir, final String archiveDir, final Configuration conf, |
| final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, |
| final String suffix, FileSystem remoteFs, Path remoteWALDir) |
| throws FailedLogCloseException, IOException { |
| this.fs = fs; |
| this.walDir = new Path(rootDir, logDir); |
| this.walArchiveDir = new Path(rootDir, archiveDir); |
| this.conf = conf; |
| this.abortable = abortable; |
| this.remoteFs = remoteFs; |
| this.remoteWALDir = remoteWALDir; |
| |
| if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { |
| throw new IOException("Unable to mkdir " + walDir); |
| } |
| |
| if (!fs.exists(this.walArchiveDir)) { |
| if (!fs.mkdirs(this.walArchiveDir)) { |
| throw new IOException("Unable to mkdir " + this.walArchiveDir); |
| } |
| } |
| |
| // If prefix is null||empty then just name it wal |
| this.walFilePrefix = prefix == null || prefix.isEmpty() |
| ? "wal" |
| : URLEncoder.encode(prefix, StandardCharsets.UTF_8.name()); |
| // we only correctly differentiate suffices when numeric ones start with '.' |
| if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { |
| throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER |
| + "' but instead was '" + suffix + "'"); |
| } |
| // Now that it exists, set the storage policy for the entire directory of wal files related to |
| // this FSHLog instance |
| String storagePolicy = |
| conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); |
| CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy); |
| this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); |
| this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); |
| |
| this.ourFiles = new PathFilter() { |
| @Override |
| public boolean accept(final Path fileName) { |
| // The path should start with dir/<prefix> and end with our suffix |
| final String fileNameString = fileName.toString(); |
| if (!fileNameString.startsWith(prefixPathStr)) { |
| return false; |
| } |
| if (walFileSuffix.isEmpty()) { |
| // in the case of the null suffix, we need to ensure the filename ends with a timestamp. |
| return org.apache.commons.lang3.StringUtils |
| .isNumeric(fileNameString.substring(prefixPathStr.length())); |
| } else if (!fileNameString.endsWith(walFileSuffix)) { |
| return false; |
| } |
| return true; |
| } |
| }; |
| |
| if (failIfWALExists) { |
| final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles); |
| if (null != walFiles && 0 != walFiles.length) { |
| throw new IOException("Target WAL already exists within directory " + walDir); |
| } |
| } |
| |
| // Register listeners. TODO: Should this exist anymore? We have CPs? |
| if (listeners != null) { |
| for (WALActionsListener i : listeners) { |
| registerWALActionsListener(i); |
| } |
| } |
| this.coprocessorHost = new WALCoprocessorHost(this, conf); |
| |
| // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block |
| // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost |
| // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of |
| // the block size but experience from the field has it that this was not enough time for the |
| // roll to happen before end-of-block. So the new accounting makes WALs of about the same |
| // size as those made in hbase-1 (to prevent surprise), we now have default block size as |
| // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally |
| // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148. |
| this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir); |
| float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f); |
| this.logrollsize = (long) (this.blocksize * multiplier); |
| this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize))); |
| |
| LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" |
| + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" |
| + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir |
| + ", maxLogs=" + this.maxLogs); |
| this.slowSyncNs = |
| TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS)); |
| this.rollOnSyncNs = TimeUnit.MILLISECONDS |
| .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS)); |
| this.slowSyncRollThreshold = |
| conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD); |
| this.slowSyncCheckInterval = |
| conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS); |
| this.walSyncTimeoutNs = |
| TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS)); |
| this.syncFutureCache = new SyncFutureCache(conf); |
| this.implClassName = getClass().getSimpleName(); |
| this.walTooOldNs = TimeUnit.SECONDS |
| .toNanos(conf.getInt(SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT)); |
| this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC); |
| archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0); |
| this.walShutdownTimeout = |
| conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS); |
| |
| int preallocatedEventCount = |
| conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); |
| waitingConsumePayloads = |
| RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); |
| waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); |
| waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence); |
| |
| // inrease the ringbuffer sequence so our txid is start from 1 |
| waitingConsumePayloads.publish(waitingConsumePayloads.next()); |
| waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor()); |
| |
| batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); |
| } |
| |
| /** |
| * Used to initialize the WAL. Usually just call rollWriter to create the first log writer. |
| */ |
| @Override |
| public void init() throws IOException { |
| rollWriter(); |
| } |
| |
| @Override |
| public void registerWALActionsListener(WALActionsListener listener) { |
| this.listeners.add(listener); |
| } |
| |
| @Override |
| public boolean unregisterWALActionsListener(WALActionsListener listener) { |
| return this.listeners.remove(listener); |
| } |
| |
| @Override |
| public WALCoprocessorHost getCoprocessorHost() { |
| return coprocessorHost; |
| } |
| |
| @Override |
| public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) { |
| return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); |
| } |
| |
| @Override |
| public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) { |
| return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); |
| } |
| |
| @Override |
| public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) { |
| this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId); |
| } |
| |
| @Override |
| public void abortCacheFlush(byte[] encodedRegionName) { |
| this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); |
| } |
| |
| @Override |
| public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { |
| // Used by tests. Deprecated as too subtle for general usage. |
| return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); |
| } |
| |
| @Override |
| public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { |
| // This method is used by tests and for figuring if we should flush or not because our |
| // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use |
| // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId |
| // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the |
| // currently flushing sequence ids, and if anything found there, it is returning these. This is |
| // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if |
| // we crash during the flush. For figuring what to flush, we might get requeued if our sequence |
| // id is old even though we are currently flushing. This may mean we do too much flushing. |
| return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); |
| } |
| |
| @Override |
| public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException { |
| return rollWriter(false); |
| } |
| |
| @Override |
| public final void sync() throws IOException { |
| sync(useHsync); |
| } |
| |
| @Override |
| public final void sync(long txid) throws IOException { |
| sync(txid, useHsync); |
| } |
| |
| @Override |
| public final void sync(boolean forceSync) throws IOException { |
| TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync")); |
| } |
| |
| @Override |
| public final void sync(long txid, boolean forceSync) throws IOException { |
| TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync")); |
| } |
| |
| /** |
| * This is a convenience method that computes a new filename with a given file-number. |
| * @param filenum to use |
| */ |
| protected Path computeFilename(final long filenum) { |
| if (filenum < 0) { |
| throw new RuntimeException("WAL file number can't be < 0"); |
| } |
| String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix; |
| return new Path(walDir, child); |
| } |
| |
| /** |
| * This is a convenience method that computes a new filename with a given using the current WAL |
| * file-number |
| */ |
| public Path getCurrentFileName() { |
| return computeFilename(this.filenum.get()); |
| } |
| |
| /** |
| * retrieve the next path to use for writing. Increments the internal filenum. |
| */ |
| private Path getNewPath() throws IOException { |
| this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime())); |
| Path newPath = getCurrentFileName(); |
| return newPath; |
| } |
| |
| public Path getOldPath() { |
| long currentFilenum = this.filenum.get(); |
| Path oldPath = null; |
| if (currentFilenum > 0) { |
| // ComputeFilename will take care of meta wal filename |
| oldPath = computeFilename(currentFilenum); |
| } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine? |
| return oldPath; |
| } |
| |
| /** |
| * Tell listeners about pre log roll. |
| */ |
| private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) |
| throws IOException { |
| coprocessorHost.preWALRoll(oldPath, newPath); |
| |
| if (!this.listeners.isEmpty()) { |
| for (WALActionsListener i : this.listeners) { |
| i.preLogRoll(oldPath, newPath); |
| } |
| } |
| } |
| |
| /** |
| * Tell listeners about post log roll. |
| */ |
| private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) |
| throws IOException { |
| if (!this.listeners.isEmpty()) { |
| for (WALActionsListener i : this.listeners) { |
| i.postLogRoll(oldPath, newPath); |
| } |
| } |
| |
| coprocessorHost.postWALRoll(oldPath, newPath); |
| } |
| |
| // public only until class moves to o.a.h.h.wal |
| /** Returns the number of rolled log files */ |
| public int getNumRolledLogFiles() { |
| return walFile2Props.size(); |
| } |
| |
| // public only until class moves to o.a.h.h.wal |
| /** Returns the number of log files in use */ |
| public int getNumLogFiles() { |
| // +1 for current use log |
| return getNumRolledLogFiles() + 1; |
| } |
| |
| /** |
| * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, check the |
| * first (oldest) WAL, and return those regions which should be flushed so that it can be |
| * let-go/'archived'. |
| * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL file |
| */ |
| Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException { |
| Map<byte[], List<byte[]>> regions = null; |
| int logCount = getNumRolledLogFiles(); |
| if (logCount > this.maxLogs && logCount > 0) { |
| Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry(); |
| regions = |
| this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId); |
| } |
| if (regions != null) { |
| List<String> listForPrint = new ArrayList<>(); |
| for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) { |
| StringBuilder families = new StringBuilder(); |
| for (int i = 0; i < r.getValue().size(); i++) { |
| if (i > 0) { |
| families.append(","); |
| } |
| families.append(Bytes.toString(r.getValue().get(i))); |
| } |
| listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]"); |
| } |
| LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs |
| + "; forcing (partial) flush of " + regions.size() + " region(s): " |
| + StringUtils.join(",", listForPrint)); |
| } |
| return regions; |
| } |
| |
| /** |
| * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file. |
| */ |
| private void markClosedAndClean(Path path) { |
| WALProps props = walFile2Props.get(path); |
| // typically this should not be null, but if there is no big issue if it is already null, so |
| // let's make the code more robust |
| if (props != null) { |
| props.closed = true; |
| cleanOldLogs(); |
| } |
| } |
| |
| /** |
| * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. |
| * <p/> |
| * Use synchronized because we may call this method in different threads, normally when replacing |
| * writer, and since now close writer may be asynchronous, we will also call this method in the |
| * closeExecutor, right after we actually close a WAL writer. |
| */ |
| private synchronized void cleanOldLogs() { |
| List<Pair<Path, Long>> logsToArchive = null; |
| long now = System.nanoTime(); |
| boolean mayLogTooOld = nextLogTooOldNs <= now; |
| ArrayList<byte[]> regionsBlockingWal = null; |
| // For each log file, look at its Map of regions to the highest sequence id; if all sequence ids |
| // are older than what is currently in memory, the WAL can be GC'd. |
| for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) { |
| if (!e.getValue().closed) { |
| LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey()); |
| continue; |
| } |
| Path log = e.getKey(); |
| ArrayList<byte[]> regionsBlockingThisWal = null; |
| long ageNs = now - e.getValue().rollTimeNs; |
| if (ageNs > walTooOldNs) { |
| if (mayLogTooOld && regionsBlockingWal == null) { |
| regionsBlockingWal = new ArrayList<>(); |
| } |
| regionsBlockingThisWal = regionsBlockingWal; |
| } |
| Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId; |
| if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) { |
| if (logsToArchive == null) { |
| logsToArchive = new ArrayList<>(); |
| } |
| logsToArchive.add(Pair.newPair(log, e.getValue().logSize)); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("WAL file ready for archiving " + log); |
| } |
| } else if (regionsBlockingThisWal != null) { |
| StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ") |
| .append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: "); |
| boolean isFirst = true; |
| for (byte[] region : regionsBlockingThisWal) { |
| if (!isFirst) { |
| sb.append("; "); |
| } |
| isFirst = false; |
| sb.append(Bytes.toString(region)); |
| } |
| LOG.warn(sb.toString()); |
| nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS; |
| regionsBlockingThisWal.clear(); |
| } |
| } |
| |
| if (logsToArchive != null) { |
| final List<Pair<Path, Long>> localLogsToArchive = logsToArchive; |
| // make it async |
| for (Pair<Path, Long> log : localLogsToArchive) { |
| logArchiveExecutor.execute(() -> { |
| archive(log); |
| }); |
| this.walFile2Props.remove(log.getFirst()); |
| } |
| } |
| } |
| |
| protected void archive(final Pair<Path, Long> log) { |
| totalLogSize.addAndGet(-log.getSecond()); |
| int retry = 1; |
| while (true) { |
| try { |
| archiveLogFile(log.getFirst()); |
| // successful |
| break; |
| } catch (Throwable e) { |
| if (retry > archiveRetries) { |
| LOG.error("Failed log archiving for the log {},", log.getFirst(), e); |
| if (this.abortable != null) { |
| this.abortable.abort("Failed log archiving", e); |
| break; |
| } |
| } else { |
| LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e); |
| } |
| retry++; |
| } |
| } |
| } |
| |
| /* |
| * only public so WALSplitter can use. |
| * @return archived location of a WAL file with the given path p |
| */ |
| public static Path getWALArchivePath(Path archiveDir, Path p) { |
| return new Path(archiveDir, p.getName()); |
| } |
| |
| protected void archiveLogFile(final Path p) throws IOException { |
| Path newPath = getWALArchivePath(this.walArchiveDir, p); |
| // Tell our listeners that a log is going to be archived. |
| if (!this.listeners.isEmpty()) { |
| for (WALActionsListener i : this.listeners) { |
| i.preLogArchive(p, newPath); |
| } |
| } |
| LOG.info("Archiving " + p + " to " + newPath); |
| if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { |
| throw new IOException("Unable to rename " + p + " to " + newPath); |
| } |
| // Tell our listeners that a log has been archived. |
| if (!this.listeners.isEmpty()) { |
| for (WALActionsListener i : this.listeners) { |
| i.postLogArchive(p, newPath); |
| } |
| } |
| } |
| |
| protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) { |
| int oldNumEntries = this.numEntries.getAndSet(0); |
| String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null; |
| if (oldPath != null) { |
| this.walFile2Props.put(oldPath, |
| new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); |
| this.totalLogSize.addAndGet(oldFileLen); |
| LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}", |
| CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen), |
| newPathString); |
| } else { |
| LOG.info("New WAL {}", newPathString); |
| } |
| } |
| |
| private Span createSpan(String name) { |
| return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName); |
| } |
| |
| /** |
| * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}. |
| * <p/> |
| * <ul> |
| * <li>In the case of creating a new WAL, oldPath will be null.</li> |
| * <li>In the case of rolling over from one file to the next, none of the parameters will be null. |
| * </li> |
| * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be |
| * null.</li> |
| * </ul> |
| * @param oldPath may be null |
| * @param newPath may be null |
| * @param nextWriter may be null |
| * @return the passed in <code>newPath</code> |
| * @throws IOException if there is a problem flushing or closing the underlying FS |
| */ |
| Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { |
| return TraceUtil.trace(() -> { |
| doReplaceWriter(oldPath, newPath, nextWriter); |
| return newPath; |
| }, () -> createSpan("WAL.replaceWriter")); |
| } |
| |
| protected final void blockOnSync(SyncFuture syncFuture) throws IOException { |
| // Now we have published the ringbuffer, halt the current thread until we get an answer back. |
| try { |
| if (syncFuture != null) { |
| if (closed) { |
| throw new IOException("WAL has been closed"); |
| } else { |
| syncFuture.get(walSyncTimeoutNs); |
| } |
| } |
| } catch (TimeoutIOException tioe) { |
| throw new WALSyncTimeoutIOException(tioe); |
| } catch (InterruptedException ie) { |
| LOG.warn("Interrupted", ie); |
| throw convertInterruptedExceptionToIOException(ie); |
| } catch (ExecutionException e) { |
| throw ensureIOException(e.getCause()); |
| } |
| } |
| |
| private static IOException ensureIOException(final Throwable t) { |
| return (t instanceof IOException) ? (IOException) t : new IOException(t); |
| } |
| |
| private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| IOException ioe = new InterruptedIOException(); |
| ioe.initCause(ie); |
| return ioe; |
| } |
| |
| private W createCombinedWriter(W localWriter, Path localPath) |
| throws IOException, CommonFSUtils.StreamLacksCapabilityException { |
| // retry forever if we can not create the remote writer to prevent aborting the RS due to log |
| // rolling error, unless the skipRemoteWal is set to true. |
| // TODO: since for now we only have one thread doing log rolling, this may block the rolling for |
| // other wals |
| Path remoteWAL = new Path(remoteWALDir, localPath.getName()); |
| for (int retry = 0;; retry++) { |
| if (skipRemoteWAL) { |
| return localWriter; |
| } |
| W remoteWriter; |
| try { |
| remoteWriter = createWriterInstance(remoteFs, remoteWAL); |
| } catch (IOException e) { |
| LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e); |
| try { |
| Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); |
| } catch (InterruptedException ie) { |
| // restore the interrupt state |
| Thread.currentThread().interrupt(); |
| // must close local writer here otherwise no one will close it for us |
| Closeables.close(localWriter, true); |
| throw (IOException) new InterruptedIOException().initCause(ie); |
| } |
| continue; |
| } |
| return createCombinedWriter(localWriter, remoteWriter); |
| } |
| } |
| |
| private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException { |
| rollWriterLock.lock(); |
| try { |
| if (this.closed) { |
| throw new WALClosedException("WAL has been closed"); |
| } |
| // Return if nothing to flush. |
| if (!force && this.writer != null && this.numEntries.get() <= 0) { |
| return null; |
| } |
| Map<byte[], List<byte[]>> regionsToFlush = null; |
| try { |
| Path oldPath = getOldPath(); |
| Path newPath = getNewPath(); |
| // Any exception from here on is catastrophic, non-recoverable, so we currently abort. |
| W nextWriter = this.createWriterInstance(fs, newPath); |
| if (remoteFs != null) { |
| // create a remote wal if necessary |
| nextWriter = createCombinedWriter(nextWriter, newPath); |
| } |
| tellListenersAboutPreLogRoll(oldPath, newPath); |
| // NewPath could be equal to oldPath if replaceWriter fails. |
| newPath = replaceWriter(oldPath, newPath, nextWriter); |
| tellListenersAboutPostLogRoll(oldPath, newPath); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Create new " + implClassName + " writer with pipeline: " |
| + Arrays.toString(getPipeline())); |
| } |
| // We got a new writer, so reset the slow sync count |
| lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime(); |
| slowSyncCount.set(0); |
| // Can we delete any of the old log files? |
| if (getNumRolledLogFiles() > 0) { |
| cleanOldLogs(); |
| regionsToFlush = findRegionsToForceFlush(); |
| } |
| } catch (CommonFSUtils.StreamLacksCapabilityException exception) { |
| // If the underlying FileSystem can't do what we ask, treat as IO failure, so |
| // we'll abort. |
| throw new IOException( |
| "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.", |
| exception); |
| } |
| return regionsToFlush; |
| } finally { |
| rollWriterLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException { |
| return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter")); |
| } |
| |
| // public only until class moves to o.a.h.h.wal |
| /** Returns the size of log files in use */ |
| public long getLogFileSize() { |
| return this.totalLogSize.get(); |
| } |
| |
| // public only until class moves to o.a.h.h.wal |
| public void requestLogRoll() { |
| requestLogRoll(ERROR); |
| } |
| |
| /** |
| * Get the backing files associated with this WAL. |
| * @return may be null if there are no files. |
| */ |
| FileStatus[] getFiles() throws IOException { |
| return CommonFSUtils.listStatus(fs, walDir, ourFiles); |
| } |
| |
| @Override |
| public void shutdown() throws IOException { |
| if (!shutdown.compareAndSet(false, true)) { |
| return; |
| } |
| closed = true; |
| // Tell our listeners that the log is closing |
| if (!this.listeners.isEmpty()) { |
| for (WALActionsListener i : this.listeners) { |
| i.logCloseRequested(); |
| } |
| } |
| |
| ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor( |
| new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build()); |
| |
| Future<Void> future = shutdownExecutor.submit(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) { |
| try { |
| doShutdown(); |
| if (syncFutureCache != null) { |
| syncFutureCache.clear(); |
| } |
| } finally { |
| rollWriterLock.unlock(); |
| } |
| } else { |
| throw new IOException("Waiting for rollWriterLock timeout"); |
| } |
| return null; |
| } |
| }); |
| shutdownExecutor.shutdown(); |
| |
| try { |
| future.get(walShutdownTimeout, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| throw new InterruptedIOException("Interrupted when waiting for shutdown WAL"); |
| } catch (TimeoutException e) { |
| throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but" |
| + " the shutdown of WAL doesn't complete! Please check the status of underlying " |
| + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS |
| + "\"", e); |
| } catch (ExecutionException e) { |
| if (e.getCause() instanceof IOException) { |
| throw (IOException) e.getCause(); |
| } else { |
| throw new IOException(e.getCause()); |
| } |
| } finally { |
| // in shutdown, we may call cleanOldLogs so shutdown this executor in the end. |
| // In sync replication implementation, we may shut down a WAL without shutting down the whole |
| // region server, if we shut down this executor earlier we may get reject execution exception |
| // and abort the region server |
| logArchiveExecutor.shutdown(); |
| } |
| // we also need to wait logArchive to finish if we want to a graceful shutdown as we may still |
| // have some pending archiving tasks not finished yet, and in close we may archive all the |
| // remaining WAL files, there could be race if we do not wait for the background archive task |
| // finish |
| try { |
| if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) { |
| throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but" |
| + " the shutdown of WAL doesn't complete! Please check the status of underlying " |
| + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS |
| + "\""); |
| } |
| } catch (InterruptedException e) { |
| throw new InterruptedIOException("Interrupted when waiting for shutdown WAL"); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| shutdown(); |
| final FileStatus[] files = getFiles(); |
| if (null != files && 0 != files.length) { |
| for (FileStatus file : files) { |
| Path p = getWALArchivePath(this.walArchiveDir, file.getPath()); |
| // Tell our listeners that a log is going to be archived. |
| if (!this.listeners.isEmpty()) { |
| for (WALActionsListener i : this.listeners) { |
| i.preLogArchive(file.getPath(), p); |
| } |
| } |
| |
| if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { |
| throw new IOException("Unable to rename " + file.getPath() + " to " + p); |
| } |
| // Tell our listeners that a log was archived. |
| if (!this.listeners.isEmpty()) { |
| for (WALActionsListener i : this.listeners) { |
| i.postLogArchive(file.getPath(), p); |
| } |
| } |
| } |
| LOG.debug( |
| "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir)); |
| } |
| LOG.info("Closed WAL: " + toString()); |
| } |
| |
| /** Returns number of WALs currently in the process of closing. */ |
| public int getInflightWALCloseCount() { |
| return inflightWALClosures.size(); |
| } |
| |
| /** |
| * updates the sequence number of a specific store. depending on the flag: replaces current seq |
| * number if the given seq id is bigger, or even if it is lower than existing one |
| */ |
| @Override |
| public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, |
| boolean onlyIfGreater) { |
| sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); |
| } |
| |
| protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) { |
| return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync); |
| } |
| |
| protected boolean isLogRollRequested() { |
| return rollRequested.get(); |
| } |
| |
| protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) { |
| // If we have already requested a roll, don't do it again |
| // And only set rollRequested to true when there is a registered listener |
| if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) { |
| for (WALActionsListener i : this.listeners) { |
| i.logRollRequested(reason); |
| } |
| } |
| } |
| |
| long getUnflushedEntriesCount() { |
| long highestSynced = this.highestSyncedTxid.get(); |
| long highestUnsynced = this.highestUnsyncedTxid; |
| return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced; |
| } |
| |
| boolean isUnflushedEntries() { |
| return getUnflushedEntriesCount() > 0; |
| } |
| |
| /** |
| * Exposed for testing only. Use to tricks like halt the ring buffer appending. |
| */ |
| protected void atHeadOfRingBufferEventHandlerAppend() { |
| // Noop |
| } |
| |
| protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException { |
| // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. |
| atHeadOfRingBufferEventHandlerAppend(); |
| long start = EnvironmentEdgeManager.currentTime(); |
| byte[] encodedRegionName = entry.getKey().getEncodedRegionName(); |
| long regionSequenceId = entry.getKey().getSequenceId(); |
| |
| // Edits are empty, there is nothing to append. Maybe empty when we are looking for a |
| // region sequence id only, a region edit/sequence id that is not associated with an actual |
| // edit. It has to go through all the rigmarole to be sure we have the right ordering. |
| if (entry.getEdit().isEmpty()) { |
| return false; |
| } |
| |
| // Coprocessor hook. |
| coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); |
| if (!listeners.isEmpty()) { |
| for (WALActionsListener i : listeners) { |
| i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); |
| } |
| } |
| doAppend(writer, entry); |
| assert highestUnsyncedTxid < entry.getTxid(); |
| highestUnsyncedTxid = entry.getTxid(); |
| if (entry.isCloseRegion()) { |
| // let's clean all the records of this region |
| sequenceIdAccounting.onRegionClose(encodedRegionName); |
| } else { |
| sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, |
| entry.isInMemStore()); |
| } |
| coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); |
| // Update metrics. |
| postAppend(entry, EnvironmentEdgeManager.currentTime() - start); |
| numEntries.incrementAndGet(); |
| return true; |
| } |
| |
| private long postAppend(final Entry e, final long elapsedTime) throws IOException { |
| long len = 0; |
| if (!listeners.isEmpty()) { |
| for (Cell cell : e.getEdit().getCells()) { |
| len += PrivateCellUtil.estimatedSerializedSizeOf(cell); |
| } |
| for (WALActionsListener listener : listeners) { |
| listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); |
| } |
| } |
| return len; |
| } |
| |
| protected final void postSync(long timeInNanos, int handlerSyncs) { |
| if (timeInNanos > this.slowSyncNs) { |
| String msg = new StringBuilder().append("Slow sync cost: ") |
| .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ") |
| .append(Arrays.toString(getPipeline())).toString(); |
| LOG.info(msg); |
| if (timeInNanos > this.rollOnSyncNs) { |
| // A single sync took too long. |
| // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative |
| // effects. Here we have a single data point that indicates we should take immediate |
| // action, so do so. |
| LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" |
| + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" |
| + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " |
| + Arrays.toString(getPipeline())); |
| requestLogRoll(SLOW_SYNC); |
| } |
| slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this |
| } |
| if (!listeners.isEmpty()) { |
| for (WALActionsListener listener : listeners) { |
| listener.postSync(timeInNanos, handlerSyncs); |
| } |
| } |
| } |
| |
| protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, |
| WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException { |
| if (this.closed) { |
| throw new IOException( |
| "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); |
| } |
| MutableLong txidHolder = new MutableLong(); |
| MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { |
| txidHolder.setValue(ringBuffer.next()); |
| }); |
| long txid = txidHolder.longValue(); |
| ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null); |
| try { |
| FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); |
| entry.stampRegionSequenceId(we); |
| ringBuffer.get(txid).load(entry); |
| } finally { |
| ringBuffer.publish(txid); |
| } |
| return txid; |
| } |
| |
| @Override |
| public String toString() { |
| return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")"; |
| } |
| |
| /** |
| * if the given {@code path} is being written currently, then return its length. |
| * <p> |
| * This is used by replication to prevent replicating unacked log entries. See |
| * https://issues.apache.org/jira/browse/HBASE-14004 for more details. |
| */ |
| @Override |
| public OptionalLong getLogFileSizeIfBeingWritten(Path path) { |
| rollWriterLock.lock(); |
| try { |
| Path currentPath = getOldPath(); |
| if (path.equals(currentPath)) { |
| // Currently active path. |
| W writer = this.writer; |
| return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty(); |
| } else { |
| W temp = inflightWALClosures.get(path.getName()); |
| if (temp != null) { |
| // In the process of being closed, trailer bytes may or may not be flushed. |
| // Ensuring that we read all the bytes in a file is critical for correctness of tailing |
| // use cases like replication, see HBASE-25924/HBASE-25932. |
| return OptionalLong.of(temp.getSyncedLength()); |
| } |
| // Log rolled successfully. |
| return OptionalLong.empty(); |
| } |
| } finally { |
| rollWriterLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { |
| return TraceUtil.trace(() -> append(info, key, edits, true), |
| () -> createSpan("WAL.appendData")); |
| } |
| |
| @Override |
| public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { |
| return TraceUtil.trace(() -> append(info, key, edits, false), |
| () -> createSpan("WAL.appendMarker")); |
| } |
| |
| /** |
| * Helper that marks the future as DONE and offers it back to the cache. |
| */ |
| protected void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) { |
| future.done(txid, t); |
| syncFutureCache.offer(future); |
| } |
| |
| private static boolean waitingRoll(int epochAndState) { |
| return (epochAndState & 1) != 0; |
| } |
| |
| private static boolean writerBroken(int epochAndState) { |
| return ((epochAndState >>> 1) & 1) != 0; |
| } |
| |
| private static int epoch(int epochAndState) { |
| return epochAndState >>> 2; |
| } |
| |
| // return whether we have successfully set readyForRolling to true. |
| private boolean trySetReadyForRolling() { |
| // Check without holding lock first. Usually we will just return here. |
| // waitingRoll is volatile and unacedEntries is only accessed inside event loop, so it is safe |
| // to check them outside the consumeLock. |
| if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) { |
| return false; |
| } |
| consumeLock.lock(); |
| try { |
| // 1. a roll is requested |
| // 2. all out-going entries have been acked(we have confirmed above). |
| if (waitingRoll(epochAndState)) { |
| readyForRolling = true; |
| readyForRollingCond.signalAll(); |
| return true; |
| } else { |
| return false; |
| } |
| } finally { |
| consumeLock.unlock(); |
| } |
| } |
| |
| private void syncFailed(long epochWhenSync, Throwable error) { |
| LOG.warn("sync failed", error); |
| this.onException(epochWhenSync, error); |
| } |
| |
| private void onException(long epochWhenSync, Throwable error) { |
| boolean shouldRequestLogRoll = true; |
| consumeLock.lock(); |
| try { |
| int currentEpochAndState = epochAndState; |
| if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) { |
| // this is not the previous writer which means we have already rolled the writer. |
| // or this is still the current writer, but we have already marked it as broken and request |
| // a roll. |
| return; |
| } |
| this.epochAndState = currentEpochAndState | 0b10; |
| if (waitingRoll(currentEpochAndState)) { |
| readyForRolling = true; |
| readyForRollingCond.signalAll(); |
| // this means we have already in the middle of a rollWriter so just tell the roller thread |
| // that you can continue without requesting an extra log roll. |
| shouldRequestLogRoll = false; |
| } |
| } finally { |
| consumeLock.unlock(); |
| } |
| for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) { |
| toWriteAppends.addFirst(iter.next()); |
| } |
| highestUnsyncedTxid = highestSyncedTxid.get(); |
| if (shouldRequestLogRoll) { |
| // request a roll. |
| requestLogRoll(ERROR); |
| } |
| } |
| |
| private void syncCompleted(long epochWhenSync, W writer, long processedTxid, long startTimeNs) { |
| // Please see the last several comments on HBASE-22761, it is possible that we get a |
| // syncCompleted which acks a previous sync request after we received a syncFailed on the same |
| // writer. So here we will also check on the epoch and state, if the epoch has already been |
| // changed, i.e, we have already rolled the writer, or the writer is already broken, we should |
| // just skip here, to avoid mess up the state or accidentally release some WAL entries and |
| // cause data corruption. |
| // The syncCompleted call is on the critical write path, so we should try our best to make it |
| // fast. So here we do not hold consumeLock, for increasing performance. It is safe because |
| // there are only 3 possible situations: |
| // 1. For normal case, the only place where we change epochAndState is when rolling the writer. |
| // Before rolling actually happen, we will only change the state to waitingRoll which is another |
| // bit than writerBroken, and when we actually change the epoch, we can make sure that there is |
| // no outgoing sync request. So we will always pass the check here and there is no problem. |
| // 2. The writer is broken, but we have not called syncFailed yet. In this case, since |
| // syncFailed and syncCompleted are executed in the same thread, we will just face the same |
| // situation with #1. |
| // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are |
| // only 2 possible situations: |
| // a. we arrive before we actually roll the writer, then we will find out the writer is broken |
| // and give up. |
| // b. we arrive after we actually roll the writer, then we will find out the epoch is changed |
| // and give up. |
| // For both #a and #b, we do not need to hold the consumeLock as we will always update the |
| // epochAndState as a whole. |
| // So in general, for all the cases above, we do not need to hold the consumeLock. |
| int epochAndState = this.epochAndState; |
| if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) { |
| LOG.warn("Got a sync complete call after the writer is broken, skip"); |
| return; |
| } |
| |
| if (processedTxid < highestSyncedTxid.get()) { |
| return; |
| } |
| highestSyncedTxid.set(processedTxid); |
| for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) { |
| FSWALEntry entry = iter.next(); |
| if (entry.getTxid() <= processedTxid) { |
| entry.release(); |
| iter.remove(); |
| } else { |
| break; |
| } |
| } |
| postSync(System.nanoTime() - startTimeNs, finishSync()); |
| /** |
| * This method is used to be compatible with the original logic of {@link FSHLog}. |
| */ |
| checkSlowSyncCount(); |
| if (trySetReadyForRolling()) { |
| // we have just finished a roll, then do not need to check for log rolling, the writer will be |
| // closed soon. |
| return; |
| } |
| // If we haven't already requested a roll, check if we have exceeded logrollsize |
| if (!isLogRollRequested() && writer.getLength() > logrollsize) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength() |
| + ", logrollsize=" + logrollsize); |
| } |
| requestLogRoll(SIZE); |
| } |
| } |
| |
| // find all the sync futures between these two txids to see if we need to issue a hsync, if no |
| // sync futures then just use the default one. |
| private boolean isHsync(long beginTxid, long endTxid) { |
| SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false), |
| new SyncFuture().reset(endTxid + 1, false)); |
| if (futures.isEmpty()) { |
| return useHsync; |
| } |
| for (SyncFuture future : futures) { |
| if (future.isForceSync()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private void sync(W writer) { |
| fileLengthAtLastSync = writer.getLength(); |
| long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; |
| boolean shouldUseHsync = |
| isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid); |
| highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; |
| final long startTimeNs = System.nanoTime(); |
| final long epoch = (long) epochAndState >>> 2L; |
| addListener(doWriterSync(writer, shouldUseHsync, currentHighestProcessedAppendTxid), |
| (result, error) -> { |
| if (error != null) { |
| syncFailed(epoch, error); |
| } else { |
| long syncedTxid = getSyncedTxid(currentHighestProcessedAppendTxid, result); |
| syncCompleted(epoch, writer, syncedTxid, startTimeNs); |
| } |
| }, consumeExecutor); |
| } |
| |
| /** |
| * This method is to adapt {@link FSHLog} and {@link AsyncFSWAL}. For {@link AsyncFSWAL}, we use |
| * {@link AbstractFSWAL#highestProcessedAppendTxid} at the point we calling |
| * {@link AsyncFSWAL#doWriterSync} method as successful syncedTxid. For {@link FSHLog}, because we |
| * use multi-thread {@code SyncRunner}s, we used the result of {@link CompletableFuture} as |
| * successful syncedTxid. |
| */ |
| protected long getSyncedTxid(long processedTxid, long completableFutureResult) { |
| return processedTxid; |
| } |
| |
| protected abstract CompletableFuture<Long> doWriterSync(W writer, boolean shouldUseHsync, |
| long txidWhenSyn); |
| |
| private int finishSyncLowerThanTxid(long txid) { |
| int finished = 0; |
| for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) { |
| SyncFuture sync = iter.next(); |
| if (sync.getTxid() <= txid) { |
| markFutureDoneAndOffer(sync, txid, null); |
| iter.remove(); |
| finished++; |
| } else { |
| break; |
| } |
| } |
| return finished; |
| } |
| |
| // try advancing the highestSyncedTxid as much as possible |
| private int finishSync() { |
| if (unackedAppends.isEmpty()) { |
| // All outstanding appends have been acked. |
| if (toWriteAppends.isEmpty()) { |
| // Also no appends that wait to be written out, then just finished all pending syncs. |
| long maxSyncTxid = highestSyncedTxid.get(); |
| for (SyncFuture sync : syncFutures) { |
| maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid()); |
| markFutureDoneAndOffer(sync, maxSyncTxid, null); |
| } |
| highestSyncedTxid.set(maxSyncTxid); |
| int finished = syncFutures.size(); |
| syncFutures.clear(); |
| return finished; |
| } else { |
| // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so |
| // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between |
| // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished. |
| long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid(); |
| assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid; |
| long doneTxid = lowestUnprocessedAppendTxid - 1; |
| highestSyncedTxid.set(doneTxid); |
| return finishSyncLowerThanTxid(doneTxid); |
| } |
| } else { |
| // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the |
| // first unacked append minus 1. |
| long lowestUnackedAppendTxid = unackedAppends.peek().getTxid(); |
| long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get()); |
| highestSyncedTxid.set(doneTxid); |
| return finishSyncLowerThanTxid(doneTxid); |
| } |
| } |
| |
| // confirm non-empty before calling |
| private static long getLastTxid(Deque<FSWALEntry> queue) { |
| return queue.peekLast().getTxid(); |
| } |
| |
| private void appendAndSync() throws IOException { |
| final W writer = this.writer; |
| // maybe a sync request is not queued when we issue a sync, so check here to see if we could |
| // finish some. |
| finishSync(); |
| long newHighestProcessedAppendTxid = -1L; |
| // this is used to avoid calling peedLast every time on unackedAppends, appendAndAsync is single |
| // threaded, this could save us some cycles |
| boolean addedToUnackedAppends = false; |
| for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) { |
| FSWALEntry entry = iter.next(); |
| /** |
| * For {@link FSHog},here may throw IOException,but for {@link AsyncFSWAL}, here would not |
| * throw any IOException. |
| */ |
| boolean appended = appendEntry(writer, entry); |
| newHighestProcessedAppendTxid = entry.getTxid(); |
| iter.remove(); |
| if (appended) { |
| // This is possible, when we fail to sync, we will add the unackedAppends back to |
| // toWriteAppends, so here we may get an entry which is already in the unackedAppends. |
| if ( |
| addedToUnackedAppends || unackedAppends.isEmpty() |
| || getLastTxid(unackedAppends) < entry.getTxid() |
| ) { |
| unackedAppends.addLast(entry); |
| addedToUnackedAppends = true; |
| } |
| // See HBASE-25905, here we need to make sure that, we will always write all the entries in |
| // unackedAppends out. As the code in the consume method will assume that, the entries in |
| // unackedAppends have all been sent out so if there is roll request and unackedAppends is |
| // not empty, we could just return as later there will be a syncCompleted call to clear the |
| // unackedAppends, or a syncFailed to lead us to another state. |
| // There could be other ways to fix, such as changing the logic in the consume method, but |
| // it will break the assumption and then (may) lead to a big refactoring. So here let's use |
| // this way to fix first, can optimize later. |
| if ( |
| writer.getLength() - fileLengthAtLastSync >= batchSize |
| && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends)) |
| ) { |
| break; |
| } |
| } |
| } |
| // if we have a newer transaction id, update it. |
| // otherwise, use the previous transaction id. |
| if (newHighestProcessedAppendTxid > 0) { |
| highestProcessedAppendTxid = newHighestProcessedAppendTxid; |
| } else { |
| newHighestProcessedAppendTxid = highestProcessedAppendTxid; |
| } |
| |
| if (writer.getLength() - fileLengthAtLastSync >= batchSize) { |
| // sync because buffer size limit. |
| sync(writer); |
| return; |
| } |
| if (writer.getLength() == fileLengthAtLastSync) { |
| // we haven't written anything out, just advance the highestSyncedSequence since we may only |
| // stamp some region sequence id. |
| if (unackedAppends.isEmpty()) { |
| highestSyncedTxid.set(highestProcessedAppendTxid); |
| finishSync(); |
| trySetReadyForRolling(); |
| } |
| return; |
| } |
| // reach here means that we have some unsynced data but haven't reached the batch size yet, |
| // but we will not issue a sync directly here even if there are sync requests because we may |
| // have some new data in the ringbuffer, so let's just return here and delay the decision of |
| // whether to issue a sync in the caller method. |
| } |
| |
| private void consume() { |
| consumeLock.lock(); |
| try { |
| int currentEpochAndState = epochAndState; |
| if (writerBroken(currentEpochAndState)) { |
| return; |
| } |
| if (waitingRoll(currentEpochAndState)) { |
| if (writer.getLength() > fileLengthAtLastSync) { |
| // issue a sync |
| sync(writer); |
| } else { |
| if (unackedAppends.isEmpty()) { |
| readyForRolling = true; |
| readyForRollingCond.signalAll(); |
| } |
| } |
| return; |
| } |
| } finally { |
| consumeLock.unlock(); |
| } |
| long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; |
| for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor |
| <= cursorBound; nextCursor++) { |
| if (!waitingConsumePayloads.isPublished(nextCursor)) { |
| break; |
| } |
| RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); |
| switch (truck.type()) { |
| case APPEND: |
| toWriteAppends.addLast(truck.unloadAppend()); |
| break; |
| case SYNC: |
| syncFutures.add(truck.unloadSync()); |
| break; |
| default: |
| LOG.warn("RingBufferTruck with unexpected type: " + truck.type()); |
| break; |
| } |
| waitingConsumePayloadsGatingSequence.set(nextCursor); |
| } |
| |
| /** |
| * This method is used to be compatible with the original logic of {@link AsyncFSWAL}. |
| */ |
| if (markerEditOnly) { |
| drainNonMarkerEditsAndFailSyncs(); |
| } |
| try { |
| appendAndSync(); |
| } catch (IOException exception) { |
| /** |
| * For {@link FSHog},here may catch IOException,but for {@link AsyncFSWAL}, the code doesn't |
| * go in here. |
| */ |
| LOG.error("appendAndSync throws IOException.", exception); |
| onAppendEntryFailed(exception); |
| return; |
| } |
| if (hasConsumerTask.get()) { |
| return; |
| } |
| if (toWriteAppends.isEmpty()) { |
| if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { |
| consumerScheduled.set(false); |
| // recheck here since in append and sync we do not hold the consumeLock. Thing may |
| // happen like |
| // 1. we check cursor, no new entry |
| // 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and |
| // give up scheduling the consumer task. |
| // 3. we set consumerScheduled to false and also give up scheduling consumer task. |
| if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { |
| // we will give up consuming so if there are some unsynced data we need to issue a sync. |
| if ( |
| writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() |
| && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync |
| ) { |
| // no new data in the ringbuffer and we have at least one sync request |
| sync(writer); |
| } |
| return; |
| } else { |
| // maybe someone has grabbed this before us |
| if (!consumerScheduled.compareAndSet(false, true)) { |
| return; |
| } |
| } |
| } |
| } |
| // reschedule if we still have something to write. |
| consumeExecutor.execute(consumer); |
| } |
| |
| private boolean shouldScheduleConsumer() { |
| int currentEpochAndState = epochAndState; |
| if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) { |
| return false; |
| } |
| return consumerScheduled.compareAndSet(false, true); |
| } |
| |
| /** |
| * Append a set of edits to the WAL. |
| * <p/> |
| * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must |
| * have its region edit/sequence id assigned else it messes up our unification of mvcc and |
| * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in. |
| * <p/> |
| * NOTE: This appends, at a time that is usually after this call returns, starts a mvcc |
| * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment |
| * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must |
| * 'complete' the transaction this mvcc transaction by calling |
| * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it |
| * in the finally of a try/finally block within which this appends lives and any subsequent |
| * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the |
| * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not |
| * immediately available on return from this method. It WILL be available subsequent to a sync of |
| * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. |
| * @param hri the regioninfo associated with append |
| * @param key Modified by this call; we add to it this edits region edit/sequence id. |
| * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit |
| * sequence id that is after all currently appended edits. |
| * @param inMemstore Always true except for case where we are writing a region event meta marker |
| * edit, for example, a compaction completion record into the WAL or noting a |
| * Region Open event. In these cases the entry is just so we can finish an |
| * unfinished compaction after a crash when the new Server reads the WAL on |
| * recovery, etc. These transition event 'Markers' do not go via the memstore. |
| * When memstore is false, we presume a Marker event edit. |
| * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id |
| * in it. |
| */ |
| protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) |
| throws IOException { |
| if (markerEditOnly && !edits.isMetaEdit()) { |
| throw new IOException("WAL is closing, only marker edit is allowed"); |
| } |
| long txid = |
| stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); |
| if (shouldScheduleConsumer()) { |
| consumeExecutor.execute(consumer); |
| } |
| return txid; |
| } |
| |
| protected void doSync(boolean forceSync) throws IOException { |
| long txid = waitingConsumePayloads.next(); |
| SyncFuture future; |
| try { |
| future = getSyncFuture(txid, forceSync); |
| RingBufferTruck truck = waitingConsumePayloads.get(txid); |
| truck.load(future); |
| } finally { |
| waitingConsumePayloads.publish(txid); |
| } |
| if (shouldScheduleConsumer()) { |
| consumeExecutor.execute(consumer); |
| } |
| blockOnSync(future); |
| } |
| |
| protected void doSync(long txid, boolean forceSync) throws IOException { |
| if (highestSyncedTxid.get() >= txid) { |
| return; |
| } |
| // here we do not use ring buffer sequence as txid |
| long sequence = waitingConsumePayloads.next(); |
| SyncFuture future; |
| try { |
| future = getSyncFuture(txid, forceSync); |
| RingBufferTruck truck = waitingConsumePayloads.get(sequence); |
| truck.load(future); |
| } finally { |
| waitingConsumePayloads.publish(sequence); |
| } |
| if (shouldScheduleConsumer()) { |
| consumeExecutor.execute(consumer); |
| } |
| blockOnSync(future); |
| } |
| |
| private void drainNonMarkerEditsAndFailSyncs() { |
| if (toWriteAppends.isEmpty()) { |
| return; |
| } |
| boolean hasNonMarkerEdits = false; |
| Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator(); |
| while (iter.hasNext()) { |
| FSWALEntry entry = iter.next(); |
| if (!entry.getEdit().isMetaEdit()) { |
| entry.release(); |
| hasNonMarkerEdits = true; |
| break; |
| } |
| } |
| if (hasNonMarkerEdits) { |
| for (;;) { |
| iter.remove(); |
| if (!iter.hasNext()) { |
| break; |
| } |
| iter.next().release(); |
| } |
| for (FSWALEntry entry : unackedAppends) { |
| entry.release(); |
| } |
| unackedAppends.clear(); |
| // fail the sync futures which are under the txid of the first remaining edit, if none, fail |
| // all the sync futures. |
| long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid(); |
| IOException error = new IOException("WAL is closing, only marker edit is allowed"); |
| for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) { |
| SyncFuture future = syncIter.next(); |
| if (future.getTxid() < txid) { |
| markFutureDoneAndOffer(future, future.getTxid(), error); |
| syncIter.remove(); |
| } else { |
| break; |
| } |
| } |
| } |
| } |
| |
| protected abstract W createWriterInstance(FileSystem fs, Path path) |
| throws IOException, CommonFSUtils.StreamLacksCapabilityException; |
| |
| protected abstract W createCombinedWriter(W localWriter, W remoteWriter); |
| |
| protected final void waitForSafePoint() { |
| consumeLock.lock(); |
| try { |
| int currentEpochAndState = epochAndState; |
| if (writerBroken(currentEpochAndState) || this.writer == null) { |
| return; |
| } |
| consumerScheduled.set(true); |
| epochAndState = currentEpochAndState | 1; |
| readyForRolling = false; |
| consumeExecutor.execute(consumer); |
| while (!readyForRolling) { |
| readyForRollingCond.awaitUninterruptibly(); |
| } |
| } finally { |
| consumeLock.unlock(); |
| } |
| } |
| |
| protected final void closeWriter(W writer, Path path) { |
| inflightWALClosures.put(path.getName(), writer); |
| closeExecutor.execute(() -> { |
| try { |
| writer.close(); |
| } catch (IOException e) { |
| LOG.warn("close old writer failed", e); |
| } finally { |
| // call this even if the above close fails, as there is no other chance we can set closed to |
| // true, it will not cause big problems. |
| markClosedAndClean(path); |
| inflightWALClosures.remove(path.getName()); |
| } |
| }); |
| } |
| |
| /** |
| * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer |
| * will begin to work before returning from this method. If we clear the flag after returning from |
| * this call, we may miss a roll request. The implementation class should choose a proper place to |
| * clear the {@link #rollRequested} flag, so we do not miss a roll request, typically before you |
| * start writing to the new writer. |
| */ |
| protected void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { |
| Preconditions.checkNotNull(nextWriter); |
| waitForSafePoint(); |
| /** |
| * For {@link FSHLog},here would shut down {@link FSHLog.SyncRunner}. |
| */ |
| doCleanUpResources(); |
| // we will call rollWriter in init method, where we want to create the first writer and |
| // obviously the previous writer is null, so here we need this null check. And why we must call |
| // logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean after |
| // closing the writer asynchronously, we need to make sure the WALProps is put into |
| // walFile2Props before we call markClosedAndClean |
| if (writer != null) { |
| long oldFileLen = writer.getLength(); |
| logRollAndSetupWalProps(oldPath, newPath, oldFileLen); |
| closeWriter(writer, oldPath); |
| } else { |
| logRollAndSetupWalProps(oldPath, newPath, 0); |
| } |
| this.writer = nextWriter; |
| /** |
| * Here is used for {@link AsyncFSWAL} and {@link FSHLog} to set the under layer filesystem |
| * output after writer is replaced. |
| */ |
| onWriterReplaced(nextWriter); |
| this.fileLengthAtLastSync = nextWriter.getLength(); |
| this.highestProcessedAppendTxidAtLastSync = 0L; |
| consumeLock.lock(); |
| try { |
| consumerScheduled.set(true); |
| int currentEpoch = epochAndState >>> 2; |
| int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1; |
| // set a new epoch and also clear waitingRoll and writerBroken |
| this.epochAndState = nextEpoch << 2; |
| // Reset rollRequested status |
| rollRequested.set(false); |
| consumeExecutor.execute(consumer); |
| } finally { |
| consumeLock.unlock(); |
| } |
| } |
| |
| protected abstract void onWriterReplaced(W nextWriter); |
| |
| protected void doShutdown() throws IOException { |
| waitForSafePoint(); |
| /** |
| * For {@link FSHLog},here would shut down {@link FSHLog.SyncRunner}. |
| */ |
| doCleanUpResources(); |
| if (this.writer != null) { |
| closeWriter(this.writer, getOldPath()); |
| this.writer = null; |
| } |
| closeExecutor.shutdown(); |
| try { |
| if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { |
| LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" |
| + " the close of async writer doesn't complete." |
| + "Please check the status of underlying filesystem" |
| + " or increase the wait time by the config \"" + this.waitOnShutdownInSecondsConfigKey |
| + "\""); |
| } |
| } catch (InterruptedException e) { |
| LOG.error("The wait for close of async writer is interrupted"); |
| Thread.currentThread().interrupt(); |
| } |
| IOException error = new IOException("WAL has been closed"); |
| long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; |
| // drain all the pending sync requests |
| for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor |
| <= cursorBound; nextCursor++) { |
| if (!waitingConsumePayloads.isPublished(nextCursor)) { |
| break; |
| } |
| RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); |
| switch (truck.type()) { |
| case SYNC: |
| syncFutures.add(truck.unloadSync()); |
| break; |
| default: |
| break; |
| } |
| } |
| // and fail them |
| syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error)); |
| if (this.shouldShutDownConsumeExecutorWhenClose) { |
| consumeExecutor.shutdown(); |
| } |
| } |
| |
| protected void doCleanUpResources() { |
| }; |
| |
| protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; |
| |
| /** |
| * This method gets the pipeline for the current WAL. |
| */ |
| abstract DatanodeInfo[] getPipeline(); |
| |
| /** |
| * This method gets the datanode replication count for the current WAL. |
| */ |
| abstract int getLogReplication(); |
| |
| protected abstract boolean doCheckLogLowReplication(); |
| |
| protected boolean isWriterBroken() { |
| return writerBroken(epochAndState); |
| } |
| |
| private void onAppendEntryFailed(IOException exception) { |
| LOG.warn("append entry failed", exception); |
| final long currentEpoch = (long) epochAndState >>> 2L; |
| this.onException(currentEpoch, exception); |
| } |
| |
| protected void checkSlowSyncCount() { |
| } |
| |
| /** Returns true if we exceeded the slow sync roll threshold over the last check interval */ |
| protected boolean doCheckSlowSync() { |
| boolean result = false; |
| long now = EnvironmentEdgeManager.currentTime(); |
| long elapsedTime = now - lastTimeCheckSlowSync; |
| if (elapsedTime >= slowSyncCheckInterval) { |
| if (slowSyncCount.get() >= slowSyncRollThreshold) { |
| if (elapsedTime >= (2 * slowSyncCheckInterval)) { |
| // If two or more slowSyncCheckInterval have elapsed this is a corner case |
| // where a train of slow syncs almost triggered us but then there was a long |
| // interval from then until the one more that pushed us over. If so, we |
| // should do nothing and let the count reset. |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count=" |
| + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime=" |
| + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms"); |
| } |
| // Fall through to count reset below |
| } else { |
| LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" |
| + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: " |
| + Arrays.toString(getPipeline())); |
| result = true; |
| } |
| } |
| lastTimeCheckSlowSync = now; |
| slowSyncCount.set(0); |
| } |
| return result; |
| } |
| |
| public void checkLogLowReplication(long checkInterval) { |
| long now = EnvironmentEdgeManager.currentTime(); |
| if (now - lastTimeCheckLowReplication < checkInterval) { |
| return; |
| } |
| // Will return immediately if we are in the middle of a WAL log roll currently. |
| if (!rollWriterLock.tryLock()) { |
| return; |
| } |
| try { |
| lastTimeCheckLowReplication = now; |
| if (doCheckLogLowReplication()) { |
| requestLogRoll(LOW_REPLICATION); |
| } |
| } finally { |
| rollWriterLock.unlock(); |
| } |
| } |
| |
| // Allow temporarily skipping the creation of remote writer. When failing to write to the remote |
| // dfs cluster, we need to reopen the regions and switch to use the original wal writer. But we |
| // need to write a close marker when closing a region, and if it fails, the whole rs will abort. |
| // So here we need to skip the creation of remote writer and make it possible to write the region |
| // close marker. |
| // Setting markerEdit only to true is for transiting from A to S, where we need to give up writing |
| // any pending wal entries as they will be discarded. The remote cluster will replicate the |
| // correct data back later. We still need to allow writing marker edits such as close region event |
| // to allow closing a region. |
| @Override |
| public void skipRemoteWAL(boolean markerEditOnly) { |
| if (markerEditOnly) { |
| this.markerEditOnly = true; |
| } |
| this.skipRemoteWAL = true; |
| } |
| |
| private static void split(final Configuration conf, final Path p) throws IOException { |
| FileSystem fs = CommonFSUtils.getWALFileSystem(conf); |
| if (!fs.exists(p)) { |
| throw new FileNotFoundException(p.toString()); |
| } |
| if (!fs.getFileStatus(p).isDirectory()) { |
| throw new IOException(p + " is not a directory"); |
| } |
| |
| final Path baseDir = CommonFSUtils.getWALRootDir(conf); |
| Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); |
| if ( |
| conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, |
| AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR) |
| ) { |
| archiveDir = new Path(archiveDir, p.getName()); |
| } |
| WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); |
| } |
| |
| W getWriter() { |
| return this.writer; |
| } |
| |
| private static void usage() { |
| System.err.println("Usage: AbstractFSWAL <ARGS>"); |
| System.err.println("Arguments:"); |
| System.err.println(" --dump Dump textual representation of passed one or more files"); |
| System.err.println(" For example: " |
| + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE"); |
| System.err.println(" --split Split the passed directory of WAL logs"); |
| System.err.println( |
| " For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR"); |
| } |
| |
| /** |
| * Pass one or more log file names, and it will either dump out a text version on |
| * <code>stdout</code> or split the specified log files. |
| */ |
| public static void main(String[] args) throws IOException { |
| if (args.length < 2) { |
| usage(); |
| System.exit(-1); |
| } |
| // either dump using the WALPrettyPrinter or split, depending on args |
| if (args[0].compareTo("--dump") == 0) { |
| WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length)); |
| } else if (args[0].compareTo("--perf") == 0) { |
| LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:"); |
| LOG.error(HBaseMarkers.FATAL, |
| "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]); |
| System.exit(-1); |
| } else if (args[0].compareTo("--split") == 0) { |
| Configuration conf = HBaseConfiguration.create(); |
| for (int i = 1; i < args.length; i++) { |
| try { |
| Path logPath = new Path(args[i]); |
| CommonFSUtils.setFsDefault(conf, logPath); |
| split(conf, logPath); |
| } catch (IOException t) { |
| t.printStackTrace(System.err); |
| System.exit(-1); |
| } |
| } |
| } else { |
| usage(); |
| System.exit(-1); |
| } |
| } |
| } |