| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.server.namenode; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.BufferedInputStream; |
| import java.io.EOFException; |
| import java.io.DataInputStream; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.server.common.Storage; |
| import org.apache.hadoop.io.IOUtils; |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * An implementation of the abstract class {@link EditLogInputStream}, which |
| * reads edits from a local file. |
| */ |
| public class EditLogFileInputStream extends EditLogInputStream { |
| private final File file; |
| private final FileInputStream fStream; |
| final private long firstTxId; |
| final private long lastTxId; |
| private final int logVersion; |
| private final FSEditLogOp.Reader reader; |
| private final FSEditLogLoader.PositionTrackingInputStream tracker; |
| private final boolean isInProgress; |
| |
| /** |
| * Open an EditLogInputStream for the given file. |
| * The file is pretransactional, so has no txids |
| * @param name filename to open |
| * @throws LogHeaderCorruptException if the header is either missing or |
| * appears to be corrupt/truncated |
| * @throws IOException if an actual IO error occurs while reading the |
| * header |
| */ |
| EditLogFileInputStream(File name) |
| throws LogHeaderCorruptException, IOException { |
| this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, false); |
| } |
| |
| /** |
| * Open an EditLogInputStream for the given file. |
| * @param name filename to open |
| * @param firstTxId first transaction found in file |
| * @param lastTxId last transaction id found in file |
| * @throws LogHeaderCorruptException if the header is either missing or |
| * appears to be corrupt/truncated |
| * @throws IOException if an actual IO error occurs while reading the |
| * header |
| */ |
| public EditLogFileInputStream(File name, long firstTxId, long lastTxId, |
| boolean isInProgress) |
| throws LogHeaderCorruptException, IOException { |
| file = name; |
| fStream = new FileInputStream(name); |
| |
| BufferedInputStream bin = new BufferedInputStream(fStream); |
| tracker = new FSEditLogLoader.PositionTrackingInputStream(bin); |
| DataInputStream in = new DataInputStream(tracker); |
| |
| try { |
| logVersion = readLogVersion(in); |
| } catch (EOFException eofe) { |
| throw new LogHeaderCorruptException("No header found in log"); |
| } |
| |
| reader = new FSEditLogOp.Reader(in, tracker, logVersion); |
| this.firstTxId = firstTxId; |
| this.lastTxId = lastTxId; |
| this.isInProgress = isInProgress; |
| } |
| |
| @Override |
| public long getFirstTxId() throws IOException { |
| return firstTxId; |
| } |
| |
| @Override |
| public long getLastTxId() throws IOException { |
| return lastTxId; |
| } |
| |
| @Override |
| public String getName() { |
| return file.getPath(); |
| } |
| |
| @Override |
| protected FSEditLogOp nextOp() throws IOException { |
| FSEditLogOp op = reader.readOp(false); |
| if ((op != null) && (op.hasTransactionId())) { |
| long txId = op.getTransactionId(); |
| if ((txId >= lastTxId) && |
| (lastTxId != HdfsConstants.INVALID_TXID)) { |
| // |
| // Sometimes, the NameNode crashes while it's writing to the |
| // edit log. In that case, you can end up with an unfinalized edit log |
| // which has some garbage at the end. |
| // JournalManager#recoverUnfinalizedSegments will finalize these |
| // unfinished edit logs, giving them a defined final transaction |
| // ID. Then they will be renamed, so that any subsequent |
| // readers will have this information. |
| // |
| // Since there may be garbage at the end of these "cleaned up" |
| // logs, we want to be sure to skip it here if we've read everything |
| // we were supposed to read out of the stream. |
| // So we force an EOF on all subsequent reads. |
| // |
| long skipAmt = file.length() - tracker.getPos(); |
| if (skipAmt > 0) { |
| FSImage.LOG.warn("skipping " + skipAmt + " bytes at the end " + |
| "of edit log '" + getName() + "': reached txid " + txId + |
| " out of " + lastTxId); |
| tracker.skip(skipAmt); |
| } |
| } |
| } |
| return op; |
| } |
| |
| @Override |
| protected FSEditLogOp nextValidOp() { |
| try { |
| return reader.readOp(true); |
| } catch (IOException e) { |
| return null; |
| } |
| } |
| |
| @Override |
| public int getVersion() throws IOException { |
| return logVersion; |
| } |
| |
| @Override |
| public long getPosition() { |
| return tracker.getPos(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| fStream.close(); |
| } |
| |
| @Override |
| public long length() throws IOException { |
| // file size + size of both buffers |
| return file.length(); |
| } |
| |
| @Override |
| public boolean isInProgress() { |
| return isInProgress; |
| } |
| |
| @Override |
| public String toString() { |
| return getName(); |
| } |
| |
| static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOException { |
| EditLogFileInputStream in; |
| try { |
| in = new EditLogFileInputStream(file); |
| } catch (LogHeaderCorruptException corrupt) { |
| // If the header is malformed or the wrong value, this indicates a corruption |
| FSImage.LOG.warn("Log at " + file + " has no valid header", |
| corrupt); |
| return new FSEditLogLoader.EditLogValidation(0, |
| HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true); |
| } |
| |
| try { |
| return FSEditLogLoader.validateEditLog(in); |
| } finally { |
| IOUtils.closeStream(in); |
| } |
| } |
| |
| /** |
| * Read the header of fsedit log |
| * @param in fsedit stream |
| * @return the edit log version number |
| * @throws IOException if error occurs |
| */ |
| @VisibleForTesting |
| static int readLogVersion(DataInputStream in) |
| throws IOException, LogHeaderCorruptException { |
| int logVersion; |
| try { |
| logVersion = in.readInt(); |
| } catch (EOFException eofe) { |
| throw new LogHeaderCorruptException( |
| "Reached EOF when reading log header"); |
| } |
| if (logVersion < HdfsConstants.LAYOUT_VERSION || // future version |
| logVersion > Storage.LAST_UPGRADABLE_LAYOUT_VERSION) { // unsupported |
| throw new LogHeaderCorruptException( |
| "Unexpected version of the file system log file: " |
| + logVersion + ". Current version = " |
| + HdfsConstants.LAYOUT_VERSION + "."); |
| } |
| return logVersion; |
| } |
| |
| /** |
| * Exception indicating that the header of an edits log file is |
| * corrupted. This can be because the header is not present, |
| * or because the header data is invalid (eg claims to be |
| * over a newer version than the running NameNode) |
| */ |
| static class LogHeaderCorruptException extends IOException { |
| private static final long serialVersionUID = 1L; |
| |
| private LogHeaderCorruptException(String msg) { |
| super(msg); |
| } |
| } |
| } |