blob: db64e63874cd57b0775ecaec168829af43e231c6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import static org.apache.hadoop.util.ExitUtil.terminate;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
/**
* Manages a collection of Journals. None of the methods are synchronized, it is
* assumed that FSEditLog methods, that use this class, use proper
* synchronization.
*/
@InterfaceAudience.Private
public class JournalSet implements JournalManager {
static final Log LOG = LogFactory.getLog(FSEditLog.class);
static final public Comparator<EditLogInputStream>
EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator<EditLogInputStream>() {
@Override
public int compare(EditLogInputStream a, EditLogInputStream b) {
return ComparisonChain.start().
compare(a.getFirstTxId(), b.getFirstTxId()).
compare(b.getLastTxId(), a.getLastTxId()).
result();
}
};
/**
* Container for a JournalManager paired with its currently
* active stream.
*
* If a Journal gets disabled due to an error writing to its
* stream, then the stream will be aborted and set to null.
*/
static class JournalAndStream implements CheckableNameNodeResource {
private final JournalManager journal;
private boolean disabled = false;
private EditLogOutputStream stream;
private boolean required = false;
public JournalAndStream(JournalManager manager, boolean required) {
this.journal = manager;
this.required = required;
}
public void startLogSegment(long txId) throws IOException {
Preconditions.checkState(stream == null);
disabled = false;
stream = journal.startLogSegment(txId);
}
/**
* Closes the stream, also sets it to null.
*/
public void closeStream() throws IOException {
if (stream == null) return;
stream.close();
stream = null;
}
/**
* Close the Journal and Stream
*/
public void close() throws IOException {
closeStream();
journal.close();
}
/**
* Aborts the stream, also sets it to null.
*/
public void abort() {
if (stream == null) return;
try {
stream.abort();
} catch (IOException ioe) {
LOG.error("Unable to abort stream " + stream, ioe);
}
stream = null;
}
boolean isActive() {
return stream != null;
}
/**
* Should be used outside JournalSet only for testing.
*/
EditLogOutputStream getCurrentStream() {
return stream;
}
@Override
public String toString() {
return "JournalAndStream(mgr=" + journal +
", " + "stream=" + stream + ")";
}
void setCurrentStreamForTests(EditLogOutputStream stream) {
this.stream = stream;
}
JournalManager getManager() {
return journal;
}
private boolean isDisabled() {
return disabled;
}
private void setDisabled(boolean disabled) {
this.disabled = disabled;
}
@Override
public boolean isResourceAvailable() {
return !isDisabled();
}
@Override
public boolean isRequired() {
return required;
}
}
private List<JournalAndStream> journals = Lists.newArrayList();
final int minimumRedundantJournals;
JournalSet(int minimumRedundantResources) {
this.minimumRedundantJournals = minimumRedundantResources;
}
@Override
public void format(NamespaceInfo nsInfo) throws IOException {
// The iteration is done by FSEditLog itself
throw new UnsupportedOperationException();
}
@Override
public boolean hasSomeData() throws IOException {
// This is called individually on the underlying journals,
// not on the JournalSet.
throw new UnsupportedOperationException();
}
@Override
public EditLogOutputStream startLogSegment(final long txId) throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.startLogSegment(txId);
}
}, "starting log segment " + txId);
return new JournalSetOutputStream();
}
@Override
public void finalizeLogSegment(final long firstTxId, final long lastTxId)
throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.closeStream();
jas.getManager().finalizeLogSegment(firstTxId, lastTxId);
}
}
}, "finalize log segment " + firstTxId + ", " + lastTxId);
}
@Override
public void close() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.close();
}
}, "close journal");
}
/**
* In this function, we get a bunch of streams from all of our JournalManager
* objects. Then we add these to the collection one by one.
*
* @param streams The collection to add the streams to. It may or
* may not be sorted-- this is up to the caller.
* @param fromTxId The transaction ID to start looking for streams at
* @param inProgressOk Should we consider unfinalized streams?
*/
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) {
final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64,
EDIT_LOG_INPUT_STREAM_COMPARATOR);
for (JournalAndStream jas : journals) {
if (jas.isDisabled()) {
LOG.info("Skipping jas " + jas + " since it's disabled");
continue;
}
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
}
// We want to group together all the streams that start on the same start
// transaction ID. To do this, we maintain an accumulator (acc) of all
// the streams we've seen at a given start transaction ID. When we see a
// higher start transaction ID, we select a stream from the accumulator and
// clear it. Then we begin accumulating streams with the new, higher start
// transaction ID.
LinkedList<EditLogInputStream> acc =
new LinkedList<EditLogInputStream>();
EditLogInputStream elis;
while ((elis = allStreams.poll()) != null) {
if (acc.isEmpty()) {
acc.add(elis);
} else {
long accFirstTxId = acc.get(0).getFirstTxId();
if (accFirstTxId == elis.getFirstTxId()) {
acc.add(elis);
} else if (accFirstTxId < elis.getFirstTxId()) {
streams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
acc.add(elis);
} else if (accFirstTxId > elis.getFirstTxId()) {
throw new RuntimeException("sorted set invariants violated! " +
"Got stream with first txid " + elis.getFirstTxId() +
", but the last firstTxId was " + accFirstTxId);
}
}
}
if (!acc.isEmpty()) {
streams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
}
}
/**
* Returns true if there are no journals, all redundant journals are disabled,
* or any required journals are disabled.
*
* @return True if there no journals, all redundant journals are disabled,
* or any required journals are disabled.
*/
public boolean isEmpty() {
return !NameNodeResourcePolicy.areResourcesAvailable(journals,
minimumRedundantJournals);
}
/**
* Called when some journals experience an error in some operation.
*/
private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
if (badJournals == null || badJournals.isEmpty()) {
return; // nothing to do
}
for (JournalAndStream j : badJournals) {
LOG.error("Disabling journal " + j);
j.abort();
j.setDisabled(true);
}
}
/**
* Implementations of this interface encapsulate operations that can be
* iteratively applied on all the journals. For example see
* {@link JournalSet#mapJournalsAndReportErrors}.
*/
private interface JournalClosure {
/**
* The operation on JournalAndStream.
* @param jas Object on which operations are performed.
* @throws IOException
*/
public void apply(JournalAndStream jas) throws IOException;
}
/**
* Apply the given operation across all of the journal managers, disabling
* any for which the closure throws an IOException.
* @param closure {@link JournalClosure} object encapsulating the operation.
* @param status message used for logging errors (e.g. "opening journal")
* @throws IOException If the operation fails on all the journals.
*/
private void mapJournalsAndReportErrors(
JournalClosure closure, String status) throws IOException{
List<JournalAndStream> badJAS = Lists.newLinkedList();
for (JournalAndStream jas : journals) {
try {
closure.apply(jas);
} catch (Throwable t) {
if (jas.isRequired()) {
final String msg = "Error: " + status + " failed for required journal ("
+ jas + ")";
LOG.fatal(msg, t);
// If we fail on *any* of the required journals, then we must not
// continue on any of the other journals. Abort them to ensure that
// retry behavior doesn't allow them to keep going in any way.
abortAllJournals();
// the current policy is to shutdown the NN on errors to shared edits
// dir. There are many code paths to shared edits failures - syncs,
// roll of edits etc. All of them go through this common function
// where the isRequired() check is made. Applying exit policy here
// to catch all code paths.
terminate(1, msg);
} else {
LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
badJAS.add(jas);
}
}
}
disableAndReportErrorOnJournals(badJAS);
if (!NameNodeResourcePolicy.areResourcesAvailable(journals,
minimumRedundantJournals)) {
String message = status + " failed for too many journals";
LOG.error("Error: " + message);
throw new IOException(message);
}
}
/**
* Abort all of the underlying streams.
*/
private void abortAllJournals() {
for (JournalAndStream jas : journals) {
if (jas.isActive()) {
jas.abort();
}
}
}
/**
* An implementation of EditLogOutputStream that applies a requested method on
* all the journals that are currently active.
*/
private class JournalSetOutputStream extends EditLogOutputStream {
JournalSetOutputStream() throws IOException {
super();
}
@Override
public void write(final FSEditLogOp op)
throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().write(op);
}
}
}, "write op");
}
@Override
public void writeRaw(final byte[] data, final int offset, final int length)
throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().writeRaw(data, offset, length);
}
}
}, "write bytes");
}
@Override
public void create() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().create();
}
}
}, "create");
}
@Override
public void close() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.closeStream();
}
}, "close");
}
@Override
public void abort() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.abort();
}
}, "abort");
}
@Override
public void setReadyToFlush() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().setReadyToFlush();
}
}
}, "setReadyToFlush");
}
@Override
protected void flushAndSync() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().flushAndSync();
}
}
}, "flushAndSync");
}
@Override
public void flush() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().flush();
}
}
}, "flush");
}
@Override
public boolean shouldForceSync() {
for (JournalAndStream js : journals) {
if (js.isActive() && js.getCurrentStream().shouldForceSync()) {
return true;
}
}
return false;
}
@Override
protected long getNumSync() {
for (JournalAndStream jas : journals) {
if (jas.isActive()) {
return jas.getCurrentStream().getNumSync();
}
}
return 0;
}
}
@Override
public void setOutputBufferCapacity(final int size) {
try {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.getManager().setOutputBufferCapacity(size);
}
}, "setOutputBufferCapacity");
} catch (IOException e) {
LOG.error("Error in setting outputbuffer capacity");
}
}
@VisibleForTesting
List<JournalAndStream> getAllJournalStreams() {
return journals;
}
List<JournalManager> getJournalManagers() {
List<JournalManager> jList = new ArrayList<JournalManager>();
for (JournalAndStream j : journals) {
jList.add(j.getManager());
}
return jList;
}
void add(JournalManager j, boolean required) {
JournalAndStream jas = new JournalAndStream(j, required);
journals.add(jas);
}
void remove(JournalManager j) {
JournalAndStream jasToRemove = null;
for (JournalAndStream jas: journals) {
if (jas.getManager().equals(j)) {
jasToRemove = jas;
break;
}
}
if (jasToRemove != null) {
jasToRemove.abort();
journals.remove(jasToRemove);
}
}
@Override
public void purgeLogsOlderThan(final long minTxIdToKeep) throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.getManager().purgeLogsOlderThan(minTxIdToKeep);
}
}, "purgeLogsOlderThan " + minTxIdToKeep);
}
@Override
public void recoverUnfinalizedSegments() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.getManager().recoverUnfinalizedSegments();
}
}, "recoverUnfinalizedSegments");
}
/**
* Return a manifest of what finalized edit logs are available. All available
* edit logs are returned starting from the transaction id passed.
*
* @param fromTxId Starting transaction id to read the logs.
* @return RemoteEditLogManifest object.
*/
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
// Collect RemoteEditLogs available from each FileJournalManager
List<RemoteEditLog> allLogs = Lists.newArrayList();
for (JournalAndStream j : journals) {
if (j.getManager() instanceof FileJournalManager) {
FileJournalManager fjm = (FileJournalManager)j.getManager();
try {
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
} catch (Throwable t) {
LOG.warn("Cannot list edit logs in " + fjm, t);
}
}
}
// Group logs by their starting txid
ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
long curStartTxId = fromTxId;
List<RemoteEditLog> logs = Lists.newArrayList();
while (true) {
ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
if (logGroup.isEmpty()) {
// we have a gap in logs - for example because we recovered some old
// storage directory with ancient logs. Clear out any logs we've
// accumulated so far, and then skip to the next segment of logs
// after the gap.
SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
startTxIds = startTxIds.tailSet(curStartTxId);
if (startTxIds.isEmpty()) {
break;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Found gap in logs at " + curStartTxId + ": " +
"not returning previous logs in manifest.");
}
logs.clear();
curStartTxId = startTxIds.first();
continue;
}
}
// Find the one that extends the farthest forward
RemoteEditLog bestLog = Collections.max(logGroup);
logs.add(bestLog);
// And then start looking from after that point
curStartTxId = bestLog.getEndTxId() + 1;
}
RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
if (LOG.isDebugEnabled()) {
LOG.debug("Generated manifest for logs since " + fromTxId + ":"
+ ret);
}
return ret;
}
/**
* Add sync times to the buffer.
*/
String getSyncTimes() {
StringBuilder buf = new StringBuilder();
for (JournalAndStream jas : journals) {
if (jas.isActive()) {
buf.append(jas.getCurrentStream().getTotalSyncTime());
buf.append(" ");
}
}
return buf.toString();
}
}