| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.wal; |
| |
| import java.io.EOFException; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.text.ParseException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.commons.lang3.ArrayUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; |
| import org.apache.hadoop.hbase.master.SplitLogManager; |
| import org.apache.hadoop.hbase.monitoring.MonitoredTask; |
| import org.apache.hadoop.hbase.monitoring.TaskMonitor; |
| import org.apache.hadoop.hbase.procedure2.util.StringUtils; |
| import org.apache.hadoop.hbase.regionserver.LastSequenceId; |
| import org.apache.hadoop.hbase.regionserver.RegionServerServices; |
| import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.CancelableProgressable; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; |
| import org.apache.hadoop.hbase.wal.WAL.Entry; |
| import org.apache.hadoop.hbase.wal.WAL.Reader; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; |
| import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; |
| |
| /** |
| * Split RegionServer WAL files. Splits the WAL into new files, |
| * one per region, to be picked up on Region reopen. Deletes the split WAL when finished. |
| * Create an instance and call {@link #splitWAL(FileStatus, CancelableProgressable)} per file or |
| * use static helper methods. |
| */ |
| @InterfaceAudience.Private |
| public class WALSplitter { |
| private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class); |
| public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors"; |
| |
| /** |
| * By default we retry errors in splitting, rather than skipping. |
| */ |
| public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false; |
| |
| // Parameters for split process |
| protected final Path walRootDir; |
| protected final FileSystem walFS; |
| protected final Configuration conf; |
| final Path rootDir; |
| final FileSystem rootFS; |
| final RegionServerServices rsServices; |
| |
| // Major subcomponents of the split process. |
| // These are separated into inner classes to make testing easier. |
| OutputSink outputSink; |
| private EntryBuffers entryBuffers; |
| |
| /** |
| * Coordinator for split log. Used by the zk-based log splitter. |
| * Not used by the procedure v2-based log splitter. |
| */ |
| private SplitLogWorkerCoordination splitLogWorkerCoordination; |
| |
| private final WALFactory walFactory; |
| |
| // For checking the latest flushed sequence id |
| protected final LastSequenceId sequenceIdChecker; |
| |
| // Map encodedRegionName -> lastFlushedSequenceId |
| protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>(); |
| |
| // Map encodedRegionName -> maxSeqIdInStores |
| protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>(); |
| |
| // the file being split currently |
| private FileStatus fileBeingSplit; |
| |
| private final String tmpDirName; |
| |
| /** |
| * Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files. |
| */ |
| public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile"; |
| public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false; |
| |
| /** |
| * True if we are to run with bounded amount of writers rather than let the count blossom. |
| * Default is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile' as that |
| * is always bounded. Only applies when you are doing recovery to 'recovered.edits' |
| * files (the old default). Bounded writing tends to have higher throughput. |
| */ |
| public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded"; |
| |
| public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize"; |
| public final static String SPLIT_WAL_WRITER_THREADS = |
| "hbase.regionserver.hlog.splitlog.writer.threads"; |
| |
| private final int numWriterThreads; |
| private final long bufferSize; |
| private final boolean splitWriterCreationBounded; |
| private final boolean hfile; |
| private final boolean skipErrors; |
| |
| WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, |
| FileSystem walFS, Path rootDir, FileSystem rootFS) { |
| this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null); |
| } |
| |
| WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, |
| FileSystem walFS, Path rootDir, FileSystem rootFS, LastSequenceId idChecker, |
| SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) { |
| this.conf = HBaseConfiguration.create(conf); |
| String codecClassName = |
| conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); |
| this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); |
| this.walRootDir = walRootDir; |
| this.walFS = walFS; |
| this.rootDir = rootDir; |
| this.rootFS = rootFS; |
| this.sequenceIdChecker = idChecker; |
| this.splitLogWorkerCoordination = splitLogWorkerCoordination; |
| this.rsServices = rsServices; |
| this.walFactory = factory; |
| this.tmpDirName = |
| conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); |
| // if we limit the number of writers opened for sinking recovered edits |
| this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); |
| this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024); |
| this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3); |
| this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE); |
| this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT); |
| } |
| |
| WALFactory getWalFactory() { |
| return this.walFactory; |
| } |
| |
| FileStatus getFileBeingSplit() { |
| return fileBeingSplit; |
| } |
| |
| String getTmpDirName() { |
| return this.tmpDirName; |
| } |
| |
| Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() { |
| return regionMaxSeqIdInStores; |
| } |
| |
| /** |
| * Splits a WAL file. |
| * Used by old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and tests. |
| * Not used by new procedure-based WAL splitter. |
| * |
| * @return false if it is interrupted by the progress-able. |
| */ |
| public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, |
| Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, |
| SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory, |
| RegionServerServices rsServices) throws IOException { |
| Path rootDir = CommonFSUtils.getRootDir(conf); |
| FileSystem rootFS = rootDir.getFileSystem(conf); |
| WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker, |
| splitLogWorkerCoordination, rsServices); |
| // splitWAL returns a data structure with whether split is finished and if the file is corrupt. |
| // We don't need to propagate corruption flag here because it is propagated by the |
| // SplitLogWorkerCoordination. |
| return splitter.splitWAL(logfile, reporter).isFinished(); |
| } |
| |
| /** |
| * Split a folder of WAL files. Delete the directory when done. |
| * Used by tools and unit tests. It should be package private. |
| * It is public only because TestWALObserver is in a different package, |
| * which uses this method to do log splitting. |
| * @return List of output files created by the split. |
| */ |
| public static List<Path> split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS, |
| Configuration conf, final WALFactory factory) throws IOException { |
| Path rootDir = CommonFSUtils.getRootDir(conf); |
| FileSystem rootFS = rootDir.getFileSystem(conf); |
| WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS); |
| final FileStatus[] wals = |
| SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null); |
| List<Path> splits = new ArrayList<>(); |
| if (ArrayUtils.isNotEmpty(wals)) { |
| for (FileStatus wal: wals) { |
| SplitWALResult splitWALResult = splitter.splitWAL(wal, null); |
| if (splitWALResult.isFinished()) { |
| WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf); |
| if (splitter.outputSink.splits != null) { |
| splits.addAll(splitter.outputSink.splits); |
| } |
| } |
| } |
| } |
| if (!walFS.delete(walsDir, true)) { |
| throw new IOException("Unable to delete src dir " + walsDir); |
| } |
| return splits; |
| } |
| |
| /** |
| * Data structure returned as result by #splitWAL(FileStatus, CancelableProgressable). |
| * Test {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} for if |
| * the WAL is corrupt. |
| */ |
| static final class SplitWALResult { |
| private final boolean finished; |
| private final boolean corrupt; |
| |
| private SplitWALResult(boolean finished, boolean corrupt) { |
| this.finished = finished; |
| this.corrupt = corrupt; |
| } |
| |
| public boolean isFinished() { |
| return finished; |
| } |
| |
| public boolean isCorrupt() { |
| return corrupt; |
| } |
| } |
| |
| /** |
| * Setup the output sinks and entry buffers ahead of splitting WAL. |
| */ |
| private void createOutputSinkAndEntryBuffers() { |
| PipelineController controller = new PipelineController(); |
| if (this.hfile) { |
| this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize); |
| this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller, |
| this.entryBuffers, this.numWriterThreads); |
| } else if (this.splitWriterCreationBounded) { |
| this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize); |
| this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller, |
| this.entryBuffers, this.numWriterThreads); |
| } else { |
| this.entryBuffers = new EntryBuffers(controller, this.bufferSize); |
| this.outputSink = new RecoveredEditsOutputSink(this, controller, |
| this.entryBuffers, this.numWriterThreads); |
| } |
| } |
| |
| /** |
| * WAL splitting implementation, splits one WAL file. |
| * @param walStatus should be for an actual WAL file. |
| */ |
| SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException { |
| Path wal = walStatus.getPath(); |
| Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString()); |
| boolean corrupt = false; |
| int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); |
| boolean outputSinkStarted = false; |
| boolean cancelled = false; |
| int editsCount = 0; |
| int editsSkipped = 0; |
| MonitoredTask status = |
| TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area."); |
| status.enableStatusJournal(true); |
| Reader walReader = null; |
| this.fileBeingSplit = walStatus; |
| long startTS = EnvironmentEdgeManager.currentTime(); |
| long length = walStatus.getLen(); |
| String lengthStr = StringUtils.humanSize(length); |
| createOutputSinkAndEntryBuffers(); |
| try { |
| String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + "bytes)"; |
| LOG.info(logStr); |
| status.setStatus(logStr); |
| if (cancel != null && !cancel.progress()) { |
| cancelled = true; |
| return new SplitWALResult(false, corrupt); |
| } |
| walReader = getReader(walStatus, this.skipErrors, cancel); |
| if (walReader == null) { |
| LOG.warn("Nothing in {}; empty?", wal); |
| return new SplitWALResult(true, corrupt); |
| } |
| LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS); |
| int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); |
| int numOpenedFilesLastCheck = 0; |
| outputSink.setReporter(cancel); |
| outputSink.setStatus(status); |
| outputSink.startWriterThreads(); |
| outputSinkStarted = true; |
| Entry entry; |
| startTS = EnvironmentEdgeManager.currentTime(); |
| while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) { |
| byte[] region = entry.getKey().getEncodedRegionName(); |
| String encodedRegionNameAsStr = Bytes.toString(region); |
| Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); |
| if (lastFlushedSequenceId == null) { |
| if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), |
| encodedRegionNameAsStr))) { |
| // The region directory itself is not present in the FS. This indicates that |
| // the region/table is already removed. We can just skip all the edits for this |
| // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits |
| // will get skipped by the seqId check below. |
| // See more details at https://issues.apache.org/jira/browse/HBASE-24189 |
| LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr); |
| lastFlushedSequenceId = Long.MAX_VALUE; |
| } else { |
| if (sequenceIdChecker != null) { |
| RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); |
| Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); |
| for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) { |
| maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), |
| storeSeqId.getSequenceId()); |
| } |
| regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); |
| lastFlushedSequenceId = ids.getLastFlushedSequenceId(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": " |
| + TextFormat.shortDebugString(ids)); |
| } |
| } |
| if (lastFlushedSequenceId == null) { |
| lastFlushedSequenceId = -1L; |
| } |
| } |
| lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId); |
| } |
| editsCount++; |
| if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) { |
| editsSkipped++; |
| continue; |
| } |
| // Don't send Compaction/Close/Open region events to recovered edit type sinks. |
| if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) { |
| editsSkipped++; |
| continue; |
| } |
| entryBuffers.appendEntry(entry); |
| int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck; |
| // If sufficient edits have passed, check if we should report progress. |
| if (editsCount % interval == 0 |
| || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) { |
| numOpenedFilesLastCheck = this.getNumOpenWriters(); |
| String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits())) |
| + " edits, skipped " + editsSkipped + " edits."; |
| status.setStatus("Split " + countsStr); |
| if (cancel != null && !cancel.progress()) { |
| cancelled = true; |
| return new SplitWALResult(false, corrupt); |
| } |
| } |
| } |
| } catch (InterruptedException ie) { |
| IOException iie = new InterruptedIOException(); |
| iie.initCause(ie); |
| throw iie; |
| } catch (CorruptedLogFileException e) { |
| LOG.warn("Could not parse, corrupt WAL={}", wal, e); |
| // If splitLogWorkerCoordination, then its old-school zk-coordinated splitting so update |
| // zk. Otherwise, it is the newer procedure-based WAL split which has no zk component. |
| if (this.splitLogWorkerCoordination != null) { |
| // Some tests pass in a csm of null. |
| splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS); |
| } |
| corrupt = true; |
| } catch (IOException e) { |
| e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; |
| throw e; |
| } finally { |
| final String log = "Finishing writing output for " + wal + " so closing down"; |
| LOG.debug(log); |
| status.setStatus(log); |
| try { |
| if (null != walReader) { |
| walReader.close(); |
| } |
| } catch (IOException exception) { |
| LOG.warn("Could not close {} reader", wal, exception); |
| } |
| try { |
| if (outputSinkStarted) { |
| // Set cancelled to true as the immediate following statement will reset its value. |
| // If close() throws an exception, cancelled will have the right value |
| cancelled = true; |
| cancelled = outputSink.close() == null; |
| } |
| } finally { |
| long processCost = EnvironmentEdgeManager.currentTime() - startTS; |
| // See if length got updated post lease recovery |
| String msg = "Processed " + editsCount + " edits across " + |
| outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost + |
| " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr + |
| ", length=" + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled; |
| LOG.info(msg); |
| status.markComplete(msg); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal()); |
| } |
| } |
| } |
| return new SplitWALResult(!cancelled, corrupt); |
| } |
| |
| private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException { |
| return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region)); |
| } |
| |
| /** |
| * Create a new {@link Reader} for reading logs to split. |
| * @return Returns null if file has length zero or file can't be found. |
| */ |
| protected Reader getReader(FileStatus walStatus, boolean skipErrors, CancelableProgressable cancel) |
| throws IOException, CorruptedLogFileException { |
| Path path = walStatus.getPath(); |
| long length = walStatus.getLen(); |
| Reader in; |
| |
| // Check for possibly empty file. With appends, currently Hadoop reports a |
| // zero length even if the file has been sync'd. Revisit if HDFS-376 or |
| // HDFS-878 is committed. |
| if (length <= 0) { |
| LOG.warn("File {} might be still open, length is 0", path); |
| } |
| |
| try { |
| RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel); |
| try { |
| in = getReader(path, cancel); |
| } catch (EOFException e) { |
| if (length <= 0) { |
| // TODO should we ignore an empty, not-last log file if skip.errors |
| // is false? Either way, the caller should decide what to do. E.g. |
| // ignore if this is the last log in sequence. |
| // TODO is this scenario still possible if the log has been |
| // recovered (i.e. closed) |
| LOG.warn("Could not open {} for reading. File is empty", path, e); |
| } |
| // EOFException being ignored |
| return null; |
| } |
| } catch (IOException e) { |
| if (e instanceof FileNotFoundException) { |
| // A wal file may not exist anymore. Nothing can be recovered so move on |
| LOG.warn("File {} does not exist anymore", path, e); |
| return null; |
| } |
| if (!skipErrors || e instanceof InterruptedIOException) { |
| throw e; // Don't mark the file corrupted if interrupted, or not skipErrors |
| } |
| throw new CorruptedLogFileException("skipErrors=true; could not open " + path + |
| ", skipping", e); |
| } |
| return in; |
| } |
| |
| private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) |
| throws CorruptedLogFileException, IOException { |
| try { |
| return in.next(); |
| } catch (EOFException eof) { |
| // truncated files are expected if a RS crashes (see HBASE-2643) |
| LOG.info("EOF from {}; continuing.", path); |
| return null; |
| } catch (IOException e) { |
| // If the IOE resulted from bad file format, |
| // then this problem is idempotent and retrying won't help |
| if (e.getCause() != null && (e.getCause() instanceof ParseException |
| || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { |
| LOG.warn("Parse exception from {}; continuing", path, e); |
| return null; |
| } |
| if (!skipErrors) { |
| throw e; |
| } |
| throw new CorruptedLogFileException("skipErrors=true Ignoring exception" |
| + " while parsing wal " + path + ". Marking as corrupted", e); |
| } |
| } |
| |
| /** |
| * Create a new {@link WALProvider.Writer} for writing log splits. |
| * @return a new Writer instance, caller should close |
| */ |
| protected WALProvider.Writer createWriter(Path logfile) throws IOException { |
| return walFactory.createRecoveredEditsWriter(walFS, logfile); |
| } |
| |
| /** |
| * Create a new {@link Reader} for reading logs to split. |
| * @return new Reader instance, caller should close |
| */ |
| private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { |
| return walFactory.createReader(walFS, curLogFile, reporter); |
| } |
| |
| /** |
| * Get current open writers |
| */ |
| private int getNumOpenWriters() { |
| int result = 0; |
| if (this.outputSink != null) { |
| result += this.outputSink.getNumOpenWriters(); |
| } |
| return result; |
| } |
| |
| /** |
| * Contains some methods to control WAL-entries producer / consumer interactions |
| */ |
| public static class PipelineController { |
| // If an exception is thrown by one of the other threads, it will be |
| // stored here. |
| AtomicReference<Throwable> thrown = new AtomicReference<>(); |
| |
| // Wait/notify for when data has been produced by the writer thread, |
| // consumed by the reader thread, or an exception occurred |
| final Object dataAvailable = new Object(); |
| |
| void writerThreadError(Throwable t) { |
| thrown.compareAndSet(null, t); |
| } |
| |
| /** |
| * Check for errors in the writer threads. If any is found, rethrow it. |
| */ |
| void checkForErrors() throws IOException { |
| Throwable thrown = this.thrown.get(); |
| if (thrown == null) { |
| return; |
| } |
| if (thrown instanceof IOException) { |
| throw new IOException(thrown); |
| } else { |
| throw new RuntimeException(thrown); |
| } |
| } |
| } |
| |
| static class CorruptedLogFileException extends Exception { |
| private static final long serialVersionUID = 1L; |
| |
| CorruptedLogFileException(String s) { |
| super(s); |
| } |
| |
| /** |
| * CorruptedLogFileException with cause |
| * |
| * @param message the message for this exception |
| * @param cause the cause for this exception |
| */ |
| CorruptedLogFileException(String message, Throwable cause) { |
| super(message, cause); |
| } |
| } |
| } |