/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AppendOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CreateSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogSegmentOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeFinalizeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeStartOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaByStorageTypeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* FSEditLog maintains a log of the namespace modifications.
*
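* <p>A minimal illustrative sketch of the write-path lifecycle (not taken from
* any particular caller; {@code conf}, {@code storage}, {@code editsDirs},
* {@code layoutVersion} and {@code perms} are assumed to come from the
* surrounding namenode startup code):
* <pre>{@code
*   FSEditLog editLog = FSEditLog.newInstance(conf, storage, editsDirs);
*   editLog.initJournalsForWrite();           // UNINITIALIZED -> BETWEEN_LOG_SEGMENTS
*   editLog.openForWrite(layoutVersion);      // BETWEEN_LOG_SEGMENTS -> IN_SEGMENT
*   editLog.logSetPermissions("/foo", perms); // queue an edit
*   editLog.logSync();                        // flush this thread's edits to the journals
*   editLog.close();                          // IN_SEGMENT -> CLOSED
* }</pre>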
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLog implements LogsPurgeable {
public static final Log LOG = LogFactory.getLog(FSEditLog.class);
/**
* State machine for edit log.
*
* In a non-HA setup:
*
* The log starts in UNINITIALIZED state upon construction. Once it's
* initialized, it is usually in IN_SEGMENT state, indicating that edits may
* be written. In the middle of a roll, or while saving the namespace, it
* briefly enters the BETWEEN_LOG_SEGMENTS state, indicating that the previous
* segment has been closed, but the new one has not yet been opened.
*
* In an HA setup:
*
* The log starts in UNINITIALIZED state upon construction. Once it's
* initialized, it sits in the OPEN_FOR_READING state the entire time that the
* NN is in standby. Upon the NN transitioning to active, the log is CLOSED,
* then moves to BETWEEN_LOG_SEGMENTS, much as if the NN had just started up,
* and then moves to IN_SEGMENT so it can begin writing to the log. From that
* point on, the log states behave as they do in a non-HA setup.
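*
* A compact sketch of the transitions described above (non-HA on the left,
* HA on the right):
* <pre>
*   UNINITIALIZED -> BETWEEN_LOG_SEGMENTS     UNINITIALIZED -> OPEN_FOR_READING
*   BETWEEN_LOG_SEGMENTS <-> IN_SEGMENT       OPEN_FOR_READING -> CLOSED (on becoming active)
*   IN_SEGMENT -> CLOSED (on shutdown)        CLOSED -> BETWEEN_LOG_SEGMENTS -> IN_SEGMENT
* </pre>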
*/
private enum State {
UNINITIALIZED,
BETWEEN_LOG_SEGMENTS,
IN_SEGMENT,
OPEN_FOR_READING,
CLOSED;
}
private State state = State.UNINITIALIZED;
// the collection of journals this edit log writes to, created in initJournals()
private JournalSet journalSet = null;
private EditLogOutputStream editLogStream = null;
// a monotonically increasing counter that represents transaction IDs.
// All of the threads that update/increment txid do so while synchronized,
// so txid can be a volatile long instead of an AtomicLong.
private volatile long txid = 0;
// stores the last synced transactionId.
private long synctxid = 0;
// the first txid of the log that's currently open for writing.
// If this value is N, we are currently writing to edits_inprogress_N
private volatile long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
// the time of printing the statistics to the log file.
private long lastPrintTime;
// is a sync currently running?
private volatile boolean isSyncRunning;
// is an automatic sync scheduled?
private volatile boolean isAutoSyncScheduled = false;
// these are statistics counters.
private long numTransactions; // number of transactions
private final AtomicLong numTransactionsBatchedInSync = new AtomicLong();
private long totalTimeTransactions; // total time for all transactions
private NameNodeMetrics metrics;
private final NNStorage storage;
private final Configuration conf;
private final List<URI> editsDirs;
protected final OpInstanceCache cache = new OpInstanceCache();
/**
* The edit directories that are shared between primary and secondary.
*/
private final List<URI> sharedEditsDirs;
/**
* Take this lock when adding journals to or closing the JournalSet. Allows
* us to ensure that the JournalSet isn't closed or updated underneath us
* in selectInputStreams().
*/
private final Object journalSetLock = new Object();
private static class TransactionId {
public long txid;
TransactionId(long value) {
this.txid = value;
}
}
// stores the most current transactionId of this thread.
private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
@Override
protected synchronized TransactionId initialValue() {
return new TransactionId(Long.MAX_VALUE);
}
};
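/**
* Factory method: returns an asynchronous edit log (FSEditLogAsync) when
* DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING is enabled in the
* configuration, and a plain FSEditLog otherwise.
*/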
static FSEditLog newInstance(Configuration conf, NNStorage storage,
List<URI> editsDirs) {
boolean asyncEditLogging = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);
LOG.info("Edit logging is async:" + asyncEditLogging);
return asyncEditLogging
? new FSEditLogAsync(conf, storage, editsDirs)
: new FSEditLog(conf, storage, editsDirs);
}
/**
* Constructor for FSEditLog. Underlying journals are constructed, but
* no streams are opened until open() is called.
*
* @param conf The namenode configuration
* @param storage Storage object used by namenode
* @param editsDirs List of journals to use
*/
FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
isSyncRunning = false;
this.conf = conf;
this.storage = storage;
metrics = NameNode.getNameNodeMetrics();
lastPrintTime = monotonicNow();
// If this list is empty, an error will be thrown on first use
// of the editlog, as no journals will exist
this.editsDirs = Lists.newArrayList(editsDirs);
this.sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
}
public synchronized void initJournalsForWrite() {
Preconditions.checkState(state == State.UNINITIALIZED ||
state == State.CLOSED, "Unexpected state: %s", state);
initJournals(this.editsDirs);
state = State.BETWEEN_LOG_SEGMENTS;
}
public synchronized void initSharedJournalsForRead() {
if (state == State.OPEN_FOR_READING) {
LOG.warn("Initializing shared journals for READ, already open for READ",
new Exception());
return;
}
Preconditions.checkState(state == State.UNINITIALIZED ||
state == State.CLOSED);
initJournals(this.sharedEditsDirs);
state = State.OPEN_FOR_READING;
}
private synchronized void initJournals(List<URI> dirs) {
int minimumRedundantJournals = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
synchronized(journalSetLock) {
journalSet = new JournalSet(minimumRedundantJournals);
for (URI u : dirs) {
boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
.contains(u);
if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
StorageDirectory sd = storage.getStorageDirectory(u);
if (sd != null) {
journalSet.add(new FileJournalManager(conf, sd, storage),
required, sharedEditsDirs.contains(u));
}
} else {
journalSet.add(createJournal(u), required,
sharedEditsDirs.contains(u));
}
}
}
if (journalSet.isEmpty()) {
LOG.error("No edits directories configured!");
}
}
/**
* Get the list of URIs the editlog is using for storage
* @return collection of URIs in use by the edit log
*/
Collection<URI> getEditURIs() {
return editsDirs;
}
/**
* Initialize the output stream for logging, opening the first
* log segment.
*/
synchronized void openForWrite(int layoutVersion) throws IOException {
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
"Bad state: %s", state);
long segmentTxId = getLastWrittenTxId() + 1;
// Safety check: we should never start a segment if there are
// newer txids readable.
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
journalSet.selectInputStreams(streams, segmentTxId, true, false);
if (!streams.isEmpty()) {
String error = String.format("Cannot start writing at txid %s " +
"when there is a stream available for read: %s",
segmentTxId, streams.get(0));
IOUtils.cleanup(LOG, streams.toArray(new EditLogInputStream[0]));
throw new IllegalStateException(error);
}
startLogSegment(segmentTxId, true, layoutVersion);
assert state == State.IN_SEGMENT : "Bad state: " + state;
}
/**
* @return true if the log is currently open in write mode, regardless
* of whether it actually has an open segment.
*/
synchronized boolean isOpenForWrite() {
return state == State.IN_SEGMENT ||
state == State.BETWEEN_LOG_SEGMENTS;
}
/**
* Return true if the log is currently open in write mode.
* This method is not synchronized and must be used only for metrics.
* @return true if the log is currently open in write mode, regardless
* of whether it actually has an open segment.
*/
boolean isOpenForWriteWithoutLock() {
return state == State.IN_SEGMENT ||
state == State.BETWEEN_LOG_SEGMENTS;
}
/**
* @return true if the log is open in write mode and has a segment open
* ready to take edits.
*/
synchronized boolean isSegmentOpen() {
return state == State.IN_SEGMENT;
}
/**
* Return true the state is IN_SEGMENT.
* This method is not synchronized and must be used only for metrics.
* @return true if the log is open in write mode and has a segment open
* ready to take edits.
*/
boolean isSegmentOpenWithoutLock() {
return state == State.IN_SEGMENT;
}
/**
* @return true if the log is open in read mode.
*/
public synchronized boolean isOpenForRead() {
return state == State.OPEN_FOR_READING;
}
/**
* Shutdown the file store.
*/
synchronized void close() {
if (state == State.CLOSED) {
LOG.debug("Closing log when already closed");
return;
}
try {
if (state == State.IN_SEGMENT) {
assert editLogStream != null;
waitForSyncToFinish();
endCurrentLogSegment(true);
}
} finally {
if (journalSet != null && !journalSet.isEmpty()) {
try {
synchronized(journalSetLock) {
journalSet.close();
}
} catch (IOException ioe) {
LOG.warn("Error closing journalSet", ioe);
}
}
state = State.CLOSED;
}
}
/**
* Format all configured journals which are not file-based.
*
* File-based journals are skipped, since they are formatted by the
* Storage format code.
*/
synchronized void formatNonFileJournals(NamespaceInfo nsInfo) throws IOException {
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
"Bad state: %s", state);
for (JournalManager jm : journalSet.getJournalManagers()) {
if (!(jm instanceof FileJournalManager)) {
jm.format(nsInfo);
}
}
}
synchronized List<FormatConfirmable> getFormatConfirmables() {
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
"Bad state: %s", state);
List<FormatConfirmable> ret = Lists.newArrayList();
for (final JournalManager jm : journalSet.getJournalManagers()) {
// The FJMs are confirmed separately since they are also
// StorageDirectories
if (!(jm instanceof FileJournalManager)) {
ret.add(jm);
}
}
return ret;
}
/**
* Write an operation to the edit log.
* <p/>
* Additionally, this will sync the edit log if required by the underlying
* edit stream's automatic sync policy (e.g. when the buffer is full, or
* if a time interval has elapsed).
*/
void logEdit(final FSEditLogOp op) {
boolean needsSync = false;
synchronized (this) {
assert isOpenForWrite() :
"bad state: " + state;
// wait if an automatic sync is scheduled
waitIfAutoSyncScheduled();
// check if it is time to schedule an automatic sync
needsSync = doEditTransaction(op);
if (needsSync) {
isAutoSyncScheduled = true;
}
}
// Sync the log if an automatic sync is required.
if (needsSync) {
logSync();
}
}
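/**
* Assign the next transaction id to the given op and write it to the
* current edit stream.
*
* @return true if the stream's automatic sync policy indicates that a
*         sync should now be scheduled (see shouldForceSync()).
*/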
synchronized boolean doEditTransaction(final FSEditLogOp op) {
long start = beginTransaction();
op.setTransactionId(txid);
try {
editLogStream.write(op);
} catch (IOException ex) {
// All journals failed, it is handled in logSync.
} finally {
op.reset();
}
endTransaction(start);
return shouldForceSync();
}
/**
* Wait if an automatic sync is scheduled
*/
synchronized void waitIfAutoSyncScheduled() {
try {
while (isAutoSyncScheduled) {
this.wait(1000);
}
} catch (InterruptedException e) {
}
}
/**
* Signal that automatic sync scheduling is done, if a sync was scheduled.
*/
synchronized void doneWithAutoSyncScheduling() {
if (isAutoSyncScheduled) {
isAutoSyncScheduled = false;
notifyAll();
}
}
/**
* Check whether the buffered edits should be automatically synced to
* the persistent store.
*
* @return true if any of the edit streams says that it should sync
*/
private boolean shouldForceSync() {
return editLogStream.shouldForceSync();
}
private long beginTransaction() {
assert Thread.holdsLock(this);
// get a new transactionId
txid++;
//
// record the transactionId when new data was written to the edits log
//
TransactionId id = myTransactionId.get();
id.txid = txid;
return monotonicNow();
}
private void endTransaction(long start) {
assert Thread.holdsLock(this);
// update statistics
long end = monotonicNow();
numTransactions++;
totalTimeTransactions += (end-start);
if (metrics != null) // Metrics is non-null only when used inside name node
metrics.addTransaction(end-start);
}
/**
* Return the transaction ID of the last transaction written to the log.
*/
public synchronized long getLastWrittenTxId() {
return txid;
}
/**
* Return the transaction ID of the last transaction written to the log.
* This method is not synchronized and must be used only for metrics.
* @return The transaction ID of the last transaction written to the log
*/
long getLastWrittenTxIdWithoutLock() {
return txid;
}
/**
* @return the first transaction ID in the current log segment
*/
@VisibleForTesting
public synchronized long getCurSegmentTxId() {
Preconditions.checkState(isSegmentOpen(),
"Bad state: %s", state);
return curSegmentTxId;
}
/**
* Return the first transaction ID in the current log segment.
* This method is not synchronized and must be used only for metrics.
* @return The first transaction ID in the current log segment
*/
long getCurSegmentTxIdWithoutLock() {
return curSegmentTxId;
}
/**
* Set the transaction ID to use for the next transaction written.
*/
synchronized void setNextTxId(long nextTxId) {
Preconditions.checkArgument(synctxid <= txid &&
nextTxId >= txid,
"May not decrease txid." +
" synctxid=%s txid=%s nextTxId=%s",
synctxid, txid, nextTxId);
txid = nextTxId - 1;
}
/**
* Blocks until all ongoing edits have been synced to disk.
* This differs from logSync in that it waits for edits that have been
* written by other threads, not just edits from the calling thread.
*
* NOTE: this should be done while holding the FSNamesystem lock, or
* else more operations can start writing while this is in progress.
*/
void logSyncAll() {
// Make sure we're synced up to the most recent transaction ID.
long lastWrittenTxId = getLastWrittenTxId();
LOG.info("logSyncAll toSyncToTxId=" + lastWrittenTxId
+ " lastSyncedTxid=" + synctxid
+ " mostRecentTxid=" + txid);
logSync(lastWrittenTxId);
lastWrittenTxId = getLastWrittenTxId();
LOG.info("Done logSyncAll lastWrittenTxId=" + lastWrittenTxId
+ " lastSyncedTxid=" + synctxid
+ " mostRecentTxid=" + txid);
}
/**
* Sync all modifications done by this thread.
*
* The internal concurrency design of this class is as follows:
* - Log items are written synchronized into an in-memory buffer,
* and each assigned a transaction ID.
* - When a thread (client) would like to sync all of its edits, logSync()
* uses a ThreadLocal transaction ID to determine what edit number must
* be synced to.
* - The isSyncRunning volatile boolean tracks whether a sync is currently
* in progress.
*
* The data is double-buffered within each edit log implementation so that
* in-memory writing can occur in parallel with the on-disk writing.
*
* Each sync occurs in three steps:
* 1. synchronized, it swaps the double buffer and sets the isSyncRunning
* flag.
* 2. unsynchronized, it flushes the data to storage
* 3. synchronized, it resets the flag and notifies anyone waiting on the
* sync.
*
* The lack of synchronization on step 2 allows other threads to continue
* to write into the memory buffer while the sync is in progress.
* Because this step is unsynchronized, actions that need to avoid
* concurrency with sync() should be synchronized and also call
* waitForSyncToFinish() before assuming they are running alone.
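*
* A condensed sketch of the three steps described above (illustrative
* pseudo-code only; the real statements are in logSync(long) below):
* <pre>{@code
*   synchronized (this) {
*     while (mytxid > synctxid && isSyncRunning) { wait(); }
*     if (mytxid <= synctxid) { return; }   // already flushed
*     editLogStream.setReadyToFlush();      // 1. swap the double buffer
*     isSyncRunning = true;
*   }
*   logStream.flush();                      // 2. flush to storage, unsynchronized
*   synchronized (this) {
*     synctxid = syncStart;                 // 3. record progress, reset the flag,
*     isSyncRunning = false;                //    and notify waiters
*     notifyAll();
*   }
* }</pre>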
*/
public void logSync() {
// Fetch the transactionId of this thread.
logSync(myTransactionId.get().txid);
}
protected void logSync(long mytxid) {
long syncStart = 0;
boolean sync = false;
long editsBatchedInSync = 0;
try {
EditLogOutputStream logStream = null;
synchronized (this) {
try {
printStatistics(false);
// if somebody is already syncing, then wait
while (mytxid > synctxid && isSyncRunning) {
try {
wait(1000);
} catch (InterruptedException ie) {
}
}
//
// If this transaction was already flushed, then nothing to do
//
if (mytxid <= synctxid) {
return;
}
// now, this thread will do the sync. Track whether other edits were
// included in the sync - i.e. batched. If this is the only edit being
// synced then the batched count is 0.
editsBatchedInSync = txid - synctxid - 1;
syncStart = txid;
isSyncRunning = true;
sync = true;
// swap buffers
try {
if (journalSet.isEmpty()) {
throw new IOException("No journals available to flush");
}
editLogStream.setReadyToFlush();
} catch (IOException e) {
final String msg =
"Could not sync enough journals to persistent storage " +
"due to " + e.getMessage() + ". " +
"Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanup(LOG, journalSet);
}
terminate(1, msg);
}
} finally {
// Prevent RuntimeException from blocking other log edit write
doneWithAutoSyncScheduling();
}
// editLogStream may become null,
// so store it in a local variable for the flush.
logStream = editLogStream;
}
// do the sync
long start = monotonicNow();
try {
if (logStream != null) {
logStream.flush();
}
} catch (IOException ex) {
synchronized (this) {
final String msg =
"Could not sync enough journals to persistent storage. "
+ "Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanup(LOG, journalSet);
}
terminate(1, msg);
}
}
long elapsed = monotonicNow() - start;
if (metrics != null) { // Metrics non-null only when used inside name node
metrics.addSync(elapsed);
metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
numTransactionsBatchedInSync.addAndGet(editsBatchedInSync);
}
} finally {
// Prevent RuntimeException from blocking other log edit sync
synchronized (this) {
if (sync) {
synctxid = syncStart;
for (JournalManager jm : journalSet.getJournalManagers()) {
/**
* {@link FileJournalManager#lastReadableTxId} is only meaningful
* for file-based journals. Therefore the interface is not added to
* other types of {@link JournalManager}.
*/
if (jm instanceof FileJournalManager) {
((FileJournalManager)jm).setLastReadableTxId(syncStart);
}
}
isSyncRunning = false;
}
this.notifyAll();
}
}
}
//
// print statistics every 1 minute.
//
private void printStatistics(boolean force) {
long now = monotonicNow();
if (lastPrintTime + 60000 > now && !force) {
return;
}
lastPrintTime = now;
StringBuilder buf = new StringBuilder();
buf.append("Number of transactions: ");
buf.append(numTransactions);
buf.append(" Total time for transactions(ms): ");
buf.append(totalTimeTransactions);
buf.append(" Number of transactions batched in Syncs: ");
buf.append(numTransactionsBatchedInSync.get());
buf.append(" Number of syncs: ");
buf.append(editLogStream.getNumSync());
buf.append(" SyncTimes(ms): ");
buf.append(journalSet.getSyncTimes());
LOG.info(buf);
}
/** Record the RPC IDs if necessary */
private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
if (toLogRpcIds) {
op.setRpcClientId(Server.getClientId());
op.setRpcCallId(Server.getCallId());
}
}
public void logAppendFile(String path, INodeFile file, boolean newBlock,
boolean toLogRpcIds) {
FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
assert uc != null;
AppendOp op = AppendOp.getInstance(cache.get()).setPath(path)
.setClientName(uc.getClientName())
.setClientMachine(uc.getClientMachine())
.setNewBlock(newBlock);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add open lease record to edit log.
* Records the block locations of the last block.
*/
public void logOpenFile(String path, INodeFile newNode, boolean overwrite,
boolean toLogRpcIds) {
Preconditions.checkArgument(newNode.isUnderConstruction());
PermissionStatus permissions = newNode.getPermissionStatus();
AddOp op = AddOp.getInstance(cache.get())
.setInodeId(newNode.getId())
.setPath(path)
.setReplication(newNode.getFileReplication())
.setModificationTime(newNode.getModificationTime())
.setAccessTime(newNode.getAccessTime())
.setBlockSize(newNode.getPreferredBlockSize())
.setBlocks(newNode.getBlocks())
.setPermissionStatus(permissions)
.setClientName(newNode.getFileUnderConstructionFeature().getClientName())
.setClientMachine(
newNode.getFileUnderConstructionFeature().getClientMachine())
.setOverwrite(overwrite)
.setStoragePolicyId(newNode.getLocalStoragePolicyID());
AclFeature f = newNode.getAclFeature();
if (f != null) {
op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
}
XAttrFeature x = newNode.getXAttrFeature();
if (x != null) {
op.setXAttrs(x.getXAttrs());
}
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add close lease record to edit log.
*/
public void logCloseFile(String path, INodeFile newNode) {
CloseOp op = CloseOp.getInstance(cache.get())
.setPath(path)
.setReplication(newNode.getFileReplication())
.setModificationTime(newNode.getModificationTime())
.setAccessTime(newNode.getAccessTime())
.setBlockSize(newNode.getPreferredBlockSize())
.setBlocks(newNode.getBlocks())
.setPermissionStatus(newNode.getPermissionStatus());
logEdit(op);
}
public void logAddBlock(String path, INodeFile file) {
Preconditions.checkArgument(file.isUnderConstruction());
BlockInfo[] blocks = file.getBlocks();
Preconditions.checkState(blocks != null && blocks.length > 0);
BlockInfo pBlock = blocks.length > 1 ? blocks[blocks.length - 2] : null;
BlockInfo lastBlock = blocks[blocks.length - 1];
AddBlockOp op = AddBlockOp.getInstance(cache.get()).setPath(path)
.setPenultimateBlock(pBlock).setLastBlock(lastBlock);
logEdit(op);
}
public void logUpdateBlocks(String path, INodeFile file, boolean toLogRpcIds) {
Preconditions.checkArgument(file.isUnderConstruction());
UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
.setPath(path)
.setBlocks(file.getBlocks());
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add create directory record to edit log
*/
public void logMkDir(String path, INode newNode) {
PermissionStatus permissions = newNode.getPermissionStatus();
MkdirOp op = MkdirOp.getInstance(cache.get())
.setInodeId(newNode.getId())
.setPath(path)
.setTimestamp(newNode.getModificationTime())
.setPermissionStatus(permissions);
AclFeature f = newNode.getAclFeature();
if (f != null) {
op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
}
XAttrFeature x = newNode.getXAttrFeature();
if (x != null) {
op.setXAttrs(x.getXAttrs());
}
logEdit(op);
}
/**
* Add rename record to edit log.
*
* The destination should be the file name, not the destination directory.
* TODO: use String parameters until just before writing to disk
*/
void logRename(String src, String dst, long timestamp, boolean toLogRpcIds) {
RenameOldOp op = RenameOldOp.getInstance(cache.get())
.setSource(src)
.setDestination(dst)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add rename record to edit log.
*
* The destination should be the file name, not the destination directory.
*/
void logRename(String src, String dst, long timestamp, boolean toLogRpcIds,
Options.Rename... options) {
RenameOp op = RenameOp.getInstance(cache.get())
.setSource(src)
.setDestination(dst)
.setTimestamp(timestamp)
.setOptions(options);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add set replication record to edit log
*/
void logSetReplication(String src, short replication) {
SetReplicationOp op = SetReplicationOp.getInstance(cache.get())
.setPath(src)
.setReplication(replication);
logEdit(op);
}
/**
* Add set storage policy id record to edit log
*/
void logSetStoragePolicy(String src, byte policyId) {
SetStoragePolicyOp op = SetStoragePolicyOp.getInstance(cache.get())
.setPath(src).setPolicyId(policyId);
logEdit(op);
}
/** Add set namespace quota record to edit log
*
* @param src the string representation of the path to a directory
* @param nsQuota namespace quota
* @param dsQuota diskspace quota
*/
void logSetQuota(String src, long nsQuota, long dsQuota) {
SetQuotaOp op = SetQuotaOp.getInstance(cache.get())
.setSource(src)
.setNSQuota(nsQuota)
.setDSQuota(dsQuota);
logEdit(op);
}
/** Add set quota by storage type record to edit log */
void logSetQuotaByStorageType(String src, long dsQuota, StorageType type) {
SetQuotaByStorageTypeOp op = SetQuotaByStorageTypeOp.getInstance(cache.get())
.setSource(src)
.setQuotaByStorageType(dsQuota, type);
logEdit(op);
}
/** Add set permissions record to edit log */
void logSetPermissions(String src, FsPermission permissions) {
SetPermissionsOp op = SetPermissionsOp.getInstance(cache.get())
.setSource(src)
.setPermissions(permissions);
logEdit(op);
}
/** Add set owner record to edit log */
void logSetOwner(String src, String username, String groupname) {
SetOwnerOp op = SetOwnerOp.getInstance(cache.get())
.setSource(src)
.setUser(username)
.setGroup(groupname);
logEdit(op);
}
/**
* Add a concat(trg, srcs...) record to the edit log.
*/
void logConcat(String trg, String[] srcs, long timestamp, boolean toLogRpcIds) {
ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get())
.setTarget(trg)
.setSources(srcs)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add delete file record to edit log
*/
void logDelete(String src, long timestamp, boolean toLogRpcIds) {
DeleteOp op = DeleteOp.getInstance(cache.get())
.setPath(src)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add truncate file record to edit log
*/
void logTruncate(String src, String clientName, String clientMachine,
long size, long timestamp, Block truncateBlock) {
TruncateOp op = TruncateOp.getInstance(cache.get())
.setPath(src)
.setClientName(clientName)
.setClientMachine(clientMachine)
.setNewLength(size)
.setTimestamp(timestamp)
.setTruncateBlock(truncateBlock);
logEdit(op);
}
/**
* Add legacy block generation stamp record to edit log
*/
void logLegacyGenerationStamp(long genstamp) {
SetGenstampV1Op op = SetGenstampV1Op.getInstance(cache.get())
.setGenerationStamp(genstamp);
logEdit(op);
}
/**
* Add generation stamp record to edit log
*/
void logGenerationStamp(long genstamp) {
SetGenstampV2Op op = SetGenstampV2Op.getInstance(cache.get())
.setGenerationStamp(genstamp);
logEdit(op);
}
/**
* Record a newly allocated block ID in the edit log
*/
void logAllocateBlockId(long blockId) {
AllocateBlockIdOp op = AllocateBlockIdOp.getInstance(cache.get())
.setBlockId(blockId);
logEdit(op);
}
/**
* Add access time record to edit log
*/
void logTimes(String src, long mtime, long atime) {
TimesOp op = TimesOp.getInstance(cache.get())
.setPath(src)
.setModificationTime(mtime)
.setAccessTime(atime);
logEdit(op);
}
/**
* Add a create symlink record.
*/
void logSymlink(String path, String value, long mtime, long atime,
INodeSymlink node, boolean toLogRpcIds) {
SymlinkOp op = SymlinkOp.getInstance(cache.get())
.setId(node.getId())
.setPath(path)
.setValue(value)
.setModificationTime(mtime)
.setAccessTime(atime)
.setPermissionStatus(node.getPermissionStatus());
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add a get-delegation-token record to the edit log.
* @param id the DelegationTokenIdentifier of the token
* @param expiryTime expiry time of the token
*/
void logGetDelegationToken(DelegationTokenIdentifier id,
long expiryTime) {
GetDelegationTokenOp op = GetDelegationTokenOp.getInstance(cache.get())
.setDelegationTokenIdentifier(id)
.setExpiryTime(expiryTime);
logEdit(op);
}
void logRenewDelegationToken(DelegationTokenIdentifier id,
long expiryTime) {
RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance(cache.get())
.setDelegationTokenIdentifier(id)
.setExpiryTime(expiryTime);
logEdit(op);
}
void logCancelDelegationToken(DelegationTokenIdentifier id) {
CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance(cache.get())
.setDelegationTokenIdentifier(id);
logEdit(op);
}
void logUpdateMasterKey(DelegationKey key) {
UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance(cache.get())
.setDelegationKey(key);
logEdit(op);
}
void logReassignLease(String leaseHolder, String src, String newHolder) {
ReassignLeaseOp op = ReassignLeaseOp.getInstance(cache.get())
.setLeaseHolder(leaseHolder)
.setPath(src)
.setNewHolder(newHolder);
logEdit(op);
}
void logCreateSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
CreateSnapshotOp op = CreateSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(snapRoot).setSnapshotName(snapName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logDeleteSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
DeleteSnapshotOp op = DeleteSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(snapRoot).setSnapshotName(snapName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRenameSnapshot(String path, String snapOldName, String snapNewName,
boolean toLogRpcIds) {
RenameSnapshotOp op = RenameSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(path).setSnapshotOldName(snapOldName)
.setSnapshotNewName(snapNewName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logAllowSnapshot(String path) {
AllowSnapshotOp op = AllowSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(path);
logEdit(op);
}
void logDisallowSnapshot(String path) {
DisallowSnapshotOp op = DisallowSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(path);
logEdit(op);
}
/**
* Log a CacheDirectiveInfo returned from
* {@link CacheManager#addDirective(CacheDirectiveInfo, FSPermissionChecker)}
*/
void logAddCacheDirectiveInfo(CacheDirectiveInfo directive,
boolean toLogRpcIds) {
AddCacheDirectiveInfoOp op =
AddCacheDirectiveInfoOp.getInstance(cache.get())
.setDirective(directive);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logModifyCacheDirectiveInfo(
CacheDirectiveInfo directive, boolean toLogRpcIds) {
ModifyCacheDirectiveInfoOp op =
ModifyCacheDirectiveInfoOp.getInstance(
cache.get()).setDirective(directive);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRemoveCacheDirectiveInfo(Long id, boolean toLogRpcIds) {
RemoveCacheDirectiveInfoOp op =
RemoveCacheDirectiveInfoOp.getInstance(cache.get()).setId(id);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logAddCachePool(CachePoolInfo pool, boolean toLogRpcIds) {
AddCachePoolOp op =
AddCachePoolOp.getInstance(cache.get()).setPool(pool);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logModifyCachePool(CachePoolInfo info, boolean toLogRpcIds) {
ModifyCachePoolOp op =
ModifyCachePoolOp.getInstance(cache.get()).setInfo(info);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRemoveCachePool(String poolName, boolean toLogRpcIds) {
RemoveCachePoolOp op =
RemoveCachePoolOp.getInstance(cache.get()).setPoolName(poolName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logStartRollingUpgrade(long startTime) {
RollingUpgradeStartOp op = RollingUpgradeStartOp.getInstance(cache.get());
op.setTime(startTime);
logEdit(op);
}
void logFinalizeRollingUpgrade(long finalizeTime) {
RollingUpgradeOp op = RollingUpgradeFinalizeOp.getInstance(cache.get());
op.setTime(finalizeTime);
logEdit(op);
}
void logSetAcl(String src, List<AclEntry> entries) {
final SetAclOp op = SetAclOp.getInstance(cache.get());
op.src = src;
op.aclEntries = entries;
logEdit(op);
}
void logSetXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
final SetXAttrOp op = SetXAttrOp.getInstance(cache.get());
op.src = src;
op.xAttrs = xAttrs;
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRemoveXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
final RemoveXAttrOp op = RemoveXAttrOp.getInstance(cache.get());
op.src = src;
op.xAttrs = xAttrs;
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Get all the journals this edit log is currently operating on.
*/
List<JournalAndStream> getJournals() {
// The list implementation is CopyOnWriteArrayList,
// so we don't need to synchronize this method.
return journalSet.getAllJournalStreams();
}
/**
* Used only by tests.
*/
@VisibleForTesting
public JournalSet getJournalSet() {
return journalSet;
}
@VisibleForTesting
synchronized void setJournalSetForTesting(JournalSet js) {
this.journalSet = js;
}
/**
* Used only by tests.
*/
@VisibleForTesting
void setMetricsForTests(NameNodeMetrics metrics) {
this.metrics = metrics;
}
/**
* Return a manifest of what finalized edit logs are available
*/
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
throws IOException {
return journalSet.getEditLogManifest(fromTxId);
}
/**
* Finalizes the current edit log and opens a new log segment.
*
* @param layoutVersion The layout version of the new edit log segment.
* @return the transaction id of the BEGIN_LOG_SEGMENT transaction in the new
* log.
*/
synchronized long rollEditLog(int layoutVersion) throws IOException {
LOG.info("Rolling edit logs");
endCurrentLogSegment(true);
long nextTxId = getLastWrittenTxId() + 1;
startLogSegment(nextTxId, true, layoutVersion);
assert curSegmentTxId == nextTxId;
return nextTxId;
}
/**
* Start writing to the log segment with the given txid.
* Transitions from the BETWEEN_LOG_SEGMENTS state to the IN_SEGMENT state.
*/
synchronized void startLogSegment(final long segmentTxId,
boolean writeHeaderTxn, int layoutVersion) throws IOException {
LOG.info("Starting log segment at " + segmentTxId);
Preconditions.checkArgument(segmentTxId > 0,
"Bad txid: %s", segmentTxId);
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
"Bad state: %s", state);
Preconditions.checkState(segmentTxId > curSegmentTxId,
"Cannot start writing to log segment " + segmentTxId +
" when previous log segment started at " + curSegmentTxId);
Preconditions.checkArgument(segmentTxId == txid + 1,
"Cannot start log segment at txid %s when next expected " +
"txid is %s", segmentTxId, txid + 1);
numTransactions = 0;
totalTimeTransactions = 0;
numTransactionsBatchedInSync.set(0L);
// TODO no need to link this back to storage anymore!
// See HDFS-2174.
storage.attemptRestoreRemovedStorage();
try {
editLogStream = journalSet.startLogSegment(segmentTxId, layoutVersion);
} catch (IOException ex) {
throw new IOException("Unable to start log segment " +
segmentTxId + ": too few journals successfully started.", ex);
}
curSegmentTxId = segmentTxId;
state = State.IN_SEGMENT;
if (writeHeaderTxn) {
logEdit(LogSegmentOp.getInstance(cache.get(),
FSEditLogOpCodes.OP_START_LOG_SEGMENT));
logSync();
}
}
/**
* Finalize the current log segment.
* Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state.
*/
public synchronized void endCurrentLogSegment(boolean writeEndTxn) {
LOG.info("Ending log segment " + curSegmentTxId +
", " + getLastWrittenTxId());
Preconditions.checkState(isSegmentOpen(),
"Bad state: %s", state);
if (writeEndTxn) {
logEdit(LogSegmentOp.getInstance(cache.get(),
FSEditLogOpCodes.OP_END_LOG_SEGMENT));
}
// always sync to ensure all edits are flushed.
logSyncAll();
printStatistics(true);
final long lastTxId = getLastWrittenTxId();
final long lastSyncedTxId = getSyncTxId();
Preconditions.checkArgument(lastTxId == lastSyncedTxId,
"LastWrittenTxId %s is expected to be the same as lastSyncedTxId %s",
lastTxId, lastSyncedTxId);
try {
journalSet.finalizeLogSegment(curSegmentTxId, lastTxId);
editLogStream = null;
} catch (IOException e) {
//All journals have failed, it will be handled in logSync.
}
state = State.BETWEEN_LOG_SEGMENTS;
}
/**
* Abort all current logs. Called from the backup node.
*/
synchronized void abortCurrentLogSegment() {
try {
//Check for null, as abort can be called any time.
if (editLogStream != null) {
editLogStream.abort();
editLogStream = null;
state = State.BETWEEN_LOG_SEGMENTS;
}
} catch (IOException e) {
LOG.warn("All journals failed to abort", e);
}
}
/**
* Archive any log files that are older than the given txid.
*
* If the edit log is not open for write, then this call returns with no
* effect.
*/
@Override
public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) {
// Should not purge logs unless they are open for write.
// This prevents the SBN from purging logs on shared storage, for example.
if (!isOpenForWrite()) {
return;
}
assert curSegmentTxId == HdfsServerConstants.INVALID_TXID || // on format this is no-op
minTxIdToKeep <= curSegmentTxId :
"cannot purge logs older than txid " + minTxIdToKeep +
" when current segment starts at " + curSegmentTxId;
if (minTxIdToKeep == 0) {
return;
}
// This could be improved to not need synchronization. But currently,
// journalSet is not threadsafe, so we need to synchronize this method.
try {
journalSet.purgeLogsOlderThan(minTxIdToKeep);
} catch (IOException ex) {
//All journals have failed, it will be handled in logSync.
}
}
/**
* The actual sync activity happens while not synchronized on this object.
* Thus, synchronized activities that require that they are not concurrent
* with file operations should wait for any running sync to finish.
*/
synchronized void waitForSyncToFinish() {
while (isSyncRunning) {
try {
wait(1000);
} catch (InterruptedException ie) {}
}
}
/**
* Return the txid of the last synced transaction.
*/
public synchronized long getSyncTxId() {
return synctxid;
}
// sets the initial capacity of the flush buffer.
synchronized void setOutputBufferCapacity(int size) {
journalSet.setOutputBufferCapacity(size);
}
/**
* Create (or find if already exists) an edit output stream, which
* streams journal records (edits) to the specified backup node.<br>
*
* The new BackupNode will start receiving edits the next time this
* NameNode's logs roll.
*
* @param bnReg the backup node registration information.
* @param nnReg this (active) name-node registration.
* @throws IOException
*/
synchronized void registerBackupNode(
NamenodeRegistration bnReg, // backup node
NamenodeRegistration nnReg) // active name-node
throws IOException {
if(bnReg.isRole(NamenodeRole.CHECKPOINT))
return; // checkpoint node does not stream edits
JournalManager jas = findBackupJournal(bnReg);
if (jas != null) {
// already registered
LOG.info("Backup node " + bnReg + " re-registers");
return;
}
LOG.info("Registering new backup node: " + bnReg);
BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
synchronized(journalSetLock) {
journalSet.add(bjm, false);
}
}
synchronized void releaseBackupStream(NamenodeRegistration registration)
throws IOException {
BackupJournalManager bjm = this.findBackupJournal(registration);
if (bjm != null) {
LOG.info("Removing backup journal " + bjm);
synchronized(journalSetLock) {
journalSet.remove(bjm);
}
}
}
/**
* Find the JournalAndStream associated with this BackupNode.
*
* @return null if it cannot be found
*/
private synchronized BackupJournalManager findBackupJournal(
NamenodeRegistration bnReg) {
for (JournalManager bjm : journalSet.getJournalManagers()) {
if ((bjm instanceof BackupJournalManager)
&& ((BackupJournalManager) bjm).matchesRegistration(bnReg)) {
return (BackupJournalManager) bjm;
}
}
return null;
}
/**
* Write an operation to the edit log. Do not sync to persistent
* store yet.
*/
synchronized void logEdit(final int length, final byte[] data) {
long start = beginTransaction();
try {
editLogStream.writeRaw(data, 0, length);
} catch (IOException ex) {
// All journals have failed, it will be handled in logSync.
}
endTransaction(start);
}
/**
* Run recovery on all journals to recover any unclosed segments
*/
synchronized void recoverUnclosedStreams() {
Preconditions.checkState(
state == State.BETWEEN_LOG_SEGMENTS,
"May not recover segments - wrong state: %s", state);
try {
journalSet.recoverUnfinalizedSegments();
} catch (IOException ex) {
// All journals have failed, it is handled in logSync.
// TODO: are we sure this is OK?
}
}
public long getSharedLogCTime() throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
return jas.getManager().getJournalCTime();
}
}
throw new IOException("No shared log found.");
}
public synchronized void doPreUpgradeOfSharedLog() throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
jas.getManager().doPreUpgrade();
}
}
}
public synchronized void doUpgradeOfSharedLog() throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
jas.getManager().doUpgrade(storage);
}
}
}
public synchronized void doFinalizeOfSharedLog() throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
jas.getManager().doFinalize();
}
}
}
public synchronized boolean canRollBackSharedLog(StorageInfo prevStorage,
int targetLayoutVersion) throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
return jas.getManager().canRollBack(storage, prevStorage,
targetLayoutVersion);
}
}
throw new IOException("No shared log found.");
}
public synchronized void doRollback() throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
jas.getManager().doRollback();
}
}
}
public synchronized void discardSegments(long markerTxid)
throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
jas.getManager().discardSegments(markerTxid);
}
}
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
throws IOException {
journalSet.selectInputStreams(streams, fromTxId,
inProgressOk, onlyDurableTxns);
}
public Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId) throws IOException {
return selectInputStreams(fromTxId, toAtLeastTxId, null, true, false);
}
public Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOK) throws IOException {
return selectInputStreams(fromTxId, toAtLeastTxId,
recovery, inProgressOK, false);
}
/**
* Select a list of input streams.
*
* @param fromTxId first transaction in the selected streams
* @param toAtLeastTxId the selected streams must contain this transaction
* @param recovery recovery context
* @param inProgressOk set to true if in-progress streams are OK
* @param onlyDurableTxns set to true if streams are bounded
* by the durable TxId
*/
public Collection<EditLogInputStream> selectInputStreams(long fromTxId,
long toAtLeastTxId, MetaRecoveryContext recovery, boolean inProgressOk,
boolean onlyDurableTxns) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
synchronized(journalSetLock) {
Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
"selectInputStreams() on closed FSEditLog");
selectInputStreams(streams, fromTxId, inProgressOk, onlyDurableTxns);
}
try {
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
} catch (IOException e) {
if (recovery != null) {
// If recovery mode is enabled, continue loading even if we know we
// can't load up to toAtLeastTxId.
LOG.error(e);
} else {
closeAllStreams(streams);
throw e;
}
}
return streams;
}
/**
* Check for gaps in the edit log input stream list.
* Note: we're assuming that the list is sorted and that txid ranges don't
* overlap. This could be done better and with more generality with an
* interval tree.
*/
private void checkForGaps(List<EditLogInputStream> streams, long fromTxId,
long toAtLeastTxId, boolean inProgressOk) throws IOException {
Iterator<EditLogInputStream> iter = streams.iterator();
long txId = fromTxId;
while (true) {
if (txId > toAtLeastTxId) return;
if (!iter.hasNext()) break;
EditLogInputStream elis = iter.next();
if (elis.getFirstTxId() > txId) break;
long next = elis.getLastTxId();
if (next == HdfsServerConstants.INVALID_TXID) {
if (!inProgressOk) {
throw new RuntimeException("inProgressOk = false, but " +
"selectInputStreams returned an in-progress edit " +
"log input stream (" + elis + ")");
}
// We don't know where the in-progress stream ends.
// It could certainly go all the way up to toAtLeastTxId.
return;
}
txId = next + 1;
}
throw new IOException(String.format("Gap in transactions. Expected to "
+ "be able to read up until at least txid %d but unable to find any "
+ "edit logs containing txid %d", toAtLeastTxId, txId));
}
/**
* Close all the streams in a collection
* @param streams The list of streams to close
*/
static void closeAllStreams(Iterable<EditLogInputStream> streams) {
for (EditLogInputStream s : streams) {
IOUtils.closeStream(s);
}
}
/**
* Retrieve the implementation class for a Journal scheme.
* @param conf The configuration to retrieve the information from
* @param uriScheme The uri scheme to look up.
* @return the class of the journal implementation
* @throws IllegalArgumentException if no class is configured for uri
*/
static Class<? extends JournalManager> getJournalClass(Configuration conf,
String uriScheme) {
String key
= DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + "." + uriScheme;
Class <? extends JournalManager> clazz = null;
try {
clazz = conf.getClass(key, null, JournalManager.class);
} catch (RuntimeException re) {
throw new IllegalArgumentException(
"Invalid class specified for " + uriScheme, re);
}
if (clazz == null) {
LOG.warn("No class configured for " +uriScheme
+ ", " + key + " is empty");
throw new IllegalArgumentException(
"No class configured for " + uriScheme);
}
return clazz;
}
/**
* Construct a custom journal manager.
* The class to construct is taken from the configuration.
* @param uri the URI identifying the journal to construct
* @return The constructed journal manager
* @throws IllegalArgumentException if no class is configured for uri
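*
* For illustration only (the scheme and manager class below are made-up
* names): a non-file journal is selected through a configuration key built
* from DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX and the URI scheme
* (see getJournalClass), and the implementation must provide a
* {@code (Configuration, URI, NamespaceInfo)} constructor:
* <pre>{@code
*   conf.setClass(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".myjournal",
*       MyJournalManager.class, JournalManager.class);
* }</pre>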
*/
@VisibleForTesting
JournalManager createJournal(URI uri) {
Class<? extends JournalManager> clazz
= getJournalClass(conf, uri.getScheme());
try {
Constructor<? extends JournalManager> cons
= clazz.getConstructor(Configuration.class, URI.class,
NamespaceInfo.class);
return cons.newInstance(conf, uri, storage.getNamespaceInfo());
} catch (Exception e) {
throw new IllegalArgumentException("Unable to construct journal, "
+ uri, e);
}
}
@VisibleForTesting
// needed by async impl to restart thread when edit log is replaced by a
// spy because a spy is a shallow copy
public void restart() {
}
/**
* Return the total number of syncs that have happened on this edit log.
* @return the sync count
*/
public long getTotalSyncCount() {
// Avoid an NPE if the stream has not been initialized yet.
if (editLogStream == null) {
return 0;
}
long count = 0;
try {
count = editLogStream.getNumSync();
} catch (NullPointerException ignore) {
// This method is used for metrics, so we don't synchronize it.
// Therefore an NPE can still happen even though there is a null check above.
}
return count;
}
}