blob: fbca0e6c4cf37d1d8a1e888dd188c3e2c4c5742f [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
/**
* Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
* iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
* dequeues it and starts reading from the next.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
private Reader reader;
private Path currentPath;
// cache of next entry for hasNext()
private Entry currentEntry;
// position after reading current entry
private long currentPosition = 0;
private final ReplicationSourceLogQueue logQueue;
private final String walGroupId;
private FileSystem fs;
private Configuration conf;
private MetricsSource metrics;
/**
* Create an entry stream over the given queue
* @param logQueue the queue of WAL paths
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
* @param metrics replication metrics
* @param walGroupId wal prefix
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
MetricsSource metrics, String walGroupId) {
this(logQueue, fs, conf, 0, metrics, walGroupId);
}
/**
* Create an entry stream over the given queue at the given start position
* @param logQueue the queue of WAL paths
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf the {@link Configuration} to use to create {@link Reader} for this stream
* @param startPosition the position in the first WAL to start reading at
* @param metrics the replication metrics
* @param walGroupId wal prefix
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
long startPosition, MetricsSource metrics, String walGroupId) {
this.logQueue = logQueue;
this.fs = fs;
this.conf = conf;
this.currentPosition = startPosition;
this.metrics = metrics;
this.walGroupId = walGroupId;
}
/**
* @return true if there is another WAL {@link Entry}
* @throws WALEntryStreamRuntimeException if there was an Exception while reading
*/
@Override
public boolean hasNext() {
if (currentEntry == null) {
try {
tryAdvanceEntry();
} catch (Exception e) {
throw new WALEntryStreamRuntimeException(e);
}
}
return currentEntry != null;
}
/**
* @return the next WAL entry in this stream
* @throws WALEntryStreamRuntimeException if there was an IOException
* @throws NoSuchElementException if no more entries in the stream.
*/
@Override
public Entry next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
Entry save = currentEntry;
currentEntry = null; // gets reloaded by hasNext()
return save;
}
/**
* Not supported.
*/
@Override
public void remove() {
throw new UnsupportedOperationException();
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws IOException {
closeReader();
}
/**
* @return the iterator over WAL entries in the queue.
*/
@Override
public Iterator<Entry> iterator() {
return this;
}
/**
* @return the position of the last Entry returned by next()
*/
public long getPosition() {
return currentPosition;
}
/**
* @return the {@link Path} of the current WAL
*/
public Path getCurrentPath() {
return currentPath;
}
private String getCurrentPathStat() {
StringBuilder sb = new StringBuilder();
if (currentPath != null) {
sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
.append(currentPosition).append("\n");
} else {
sb.append("no replication ongoing, waiting for new log");
}
return sb.toString();
}
/**
* Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
* false)
* @throws IOException io exception while resetting the reader
*/
public void reset() throws IOException {
if (reader != null && currentPath != null) {
resetReader();
}
}
private void setPosition(long position) {
currentPosition = position;
}
private void setCurrentPath(Path path) {
this.currentPath = path;
}
private void tryAdvanceEntry() throws IOException {
if (checkReader()) {
readNextEntryAndSetPosition();
if (currentEntry == null) { // no more entries in this log file - see if log was rolled
if (logQueue.getQueue(walGroupId).size() > 1) { // log was rolled
// Before dequeueing, we should always get one more attempt at reading.
// This is in case more entries came in after we opened the reader,
// and a new log was enqueued while we were reading. See HBASE-6758
resetReader();
readNextEntryAndSetPosition();
if (currentEntry == null) {
if (checkAllBytesParsed()) { // now we're certain we're done with this log file
dequeueCurrentLog();
if (openNextLog()) {
readNextEntryAndSetPosition();
}
}
}
} // no other logs, we've simply hit the end of the current open log. Do nothing
}
}
// do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
}
// HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
private boolean checkAllBytesParsed() throws IOException {
// -1 means the wal wasn't closed cleanly.
final long trailerSize = currentTrailerSize();
FileStatus stat = null;
try {
stat = fs.getFileStatus(this.currentPath);
} catch (IOException exception) {
LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
+ (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat());
metrics.incrUnknownFileLengthForClosedWAL();
}
if (stat != null) {
if (trailerSize < 0) {
if (currentPosition < stat.getLen()) {
final long skippedBytes = stat.getLen() - currentPosition;
if (LOG.isDebugEnabled()) {
LOG.debug("Reached the end of WAL file '" + currentPath
+ "'. It was not closed cleanly, so we did not parse " + skippedBytes
+ " bytes of data. This is normally ok.");
}
metrics.incrUncleanlyClosedWALs();
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
}
} else if (currentPosition + trailerSize < stat.getLen()) {
LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
+ ", which is too far away from reported file length " + stat.getLen()
+ ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
setPosition(0);
resetReader();
metrics.incrRestartedWALReading();
metrics.incrRepeatedFileBytes(currentPosition);
return false;
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
+ (stat == null ? "N/A" : stat.getLen()));
}
metrics.incrCompletedWAL();
return true;
}
private void dequeueCurrentLog() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Reached the end of log " + currentPath);
}
closeReader();
logQueue.remove(walGroupId);
setCurrentPath(null);
setPosition(0);
}
private void readNextEntryAndSetPosition() throws IOException {
Entry readEntry = reader.next();
long readerPos = reader.getPosition();
if (readEntry != null) {
metrics.incrLogEditsRead();
metrics.incrLogReadInBytes(readerPos - currentPosition);
}
currentEntry = readEntry; // could be null
setPosition(readerPos);
}
private void closeReader() throws IOException {
if (reader != null) {
reader.close();
reader = null;
}
}
// if we don't have a reader, open a reader on the next log
private boolean checkReader() throws IOException {
if (reader == null) {
return openNextLog();
}
return true;
}
// open a reader on the next log in queue
private boolean openNextLog() throws IOException {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
Path nextPath = queue.peek();
if (nextPath != null) {
openReader(nextPath);
if (reader != null) {
return true;
}
}
return false;
}
private Path getArchivedLog(Path path) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
} else {
LOG.error("Couldn't locate log: " + path);
return path;
}
}
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
// If the log was archived, continue reading from there
Path archivedLog = getArchivedLog(path);
if (!path.equals(archivedLog)) {
openReader(archivedLog);
} else {
throw fnfe;
}
}
private void openReader(Path path) throws IOException {
try {
// Detect if this is a new file, if so get a new reader else
// reset the current reader so that we see the new data
if (reader == null || !getCurrentPath().equals(path)) {
closeReader();
reader = WALFactory.createReader(fs, path, conf);
seek();
setCurrentPath(path);
} else {
resetReader();
}
} catch (FileNotFoundException fnfe) {
handleFileNotFound(path, fnfe);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
if (!(ioe instanceof FileNotFoundException)) {
throw ioe;
}
handleFileNotFound(path, (FileNotFoundException)ioe);
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
LOG.warn("Try to recover the WAL lease " + path, lnre);
recoverLease(conf, path);
reader = null;
} catch (NullPointerException npe) {
// Workaround for race condition in HDFS-4380
// which throws a NPE if we open a file before any data node has the most recent block
// Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
LOG.warn("Got NPE opening reader, will retry.");
reader = null;
}
}
// For HBASE-15019
private void recoverLease(final Configuration conf, final Path path) {
try {
final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
@Override
public boolean progress() {
LOG.debug("recover WAL lease: " + path);
return true;
}
});
} catch (IOException e) {
LOG.warn("unable to recover lease for WAL: " + path, e);
}
}
private void resetReader() throws IOException {
try {
reader.reset();
seek();
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
Path archivedLog = getArchivedLog(currentPath);
if (!currentPath.equals(archivedLog)) {
openReader(archivedLog);
} else {
throw fnfe;
}
} catch (NullPointerException npe) {
throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
}
}
private void seek() throws IOException {
if (currentPosition != 0) {
reader.seek(currentPosition);
}
}
private long currentTrailerSize() {
long size = -1L;
if (reader instanceof ProtobufLogReader) {
final ProtobufLogReader pblr = (ProtobufLogReader) reader;
size = pblr.trailerSize();
}
return size;
}
@InterfaceAudience.Private
public static class WALEntryStreamRuntimeException extends RuntimeException {
private static final long serialVersionUID = -6298201811259982568L;
public WALEntryStreamRuntimeException(Exception e) {
super(e);
}
}
}