| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.server.namenode.ha; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.security.PrivilegedAction; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Lists; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.HAUtil; |
| import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; |
| import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; |
| import org.apache.hadoop.hdfs.server.namenode.EditLogInputException; |
| import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; |
| import org.apache.hadoop.hdfs.server.namenode.FSEditLog; |
| import org.apache.hadoop.hdfs.server.namenode.FSImage; |
| import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.security.SecurityUtil; |
| |
| import static org.apache.hadoop.util.Time.monotonicNow; |
| import static org.apache.hadoop.util.ExitUtil.terminate; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| |
| |
| /** |
| * EditLogTailer represents a thread which periodically reads from edits |
| * journals and applies the transactions contained within to a given |
| * FSNamesystem. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Evolving |
| public class EditLogTailer { |
  public static final Log LOG = LogFactory.getLog(EditLogTailer.class);

  /** Background thread that periodically tails and applies new edits. */
  private final EditLogTailerThread tailerThread;

  private final Configuration conf;
  private final FSNamesystem namesystem;
  /** Endlessly-cycling iterator over the configured remote NameNodes. */
  private final Iterator<RemoteNameNodeInfo> nnLookup;
  private FSEditLog editLog;

  /** The remote NameNode currently targeted for log-roll RPCs. */
  private RemoteNameNodeInfo currentNN;

  /**
   * The last transaction ID at which an edit log roll was initiated.
   */
  private long lastRollTriggerTxId = HdfsServerConstants.INVALID_TXID;

  /**
   * The highest transaction ID loaded by the Standby.
   */
  private long lastLoadedTxnId = HdfsServerConstants.INVALID_TXID;

  /**
   * The last time we successfully loaded a non-zero number of edits from the
   * shared directory (monotonic clock, milliseconds).
   */
  private long lastLoadTimeMs;

  /**
   * The last time we triggered an edit log roll on the active namenode
   * (monotonic clock, milliseconds).
   */
  private long lastRollTimeMs;

  /**
   * How often the Standby should roll edit logs. Since the Standby only reads
   * from finalized log segments, the Standby will only be as up-to-date as how
   * often the logs are rolled. A negative value disables triggering rolls.
   */
  private final long logRollPeriodMs;

  /**
   * The timeout in milliseconds of calling rollEdits RPC to Active NN.
   * @see HDFS-4176.
   */
  private final long rollEditsTimeoutMs;

  /**
   * The executor to run roll edit RPC call in a daemon thread.
   */
  private final ExecutorService rollEditsRpcExecutor;

  /**
   * How often the tailer should check if there are new edit log entries
   * ready to be consumed. This is the initial delay before any backoff.
   */
  private final long sleepTimeMs;
  /**
   * The maximum time the tailer should wait between checking for new edit log
   * entries. Exponential backoff will be applied when an edit log tail is
   * performed but no edits are available to be read. If this is less than or
   * equal to 0, backoff is disabled.
   */
  private final long maxSleepTimeMs;

  /** Number of configured remote NameNodes; bounds the proxy retry loop. */
  private final int nnCount;
  // Lazily-created proxy to the (presumed) active NN; reset on failure.
  private NamenodeProtocol cachedActiveProxy = null;
  // count of the number of NNs we have attempted in the current lookup loop
  private int nnLoopCount = 0;

  /**
   * Maximum number of retries we should give each of the remote namenodes
   * before giving up.
   */
  private int maxRetries;

  /**
   * Whether the tailer should tail the in-progress edit log segments.
   */
  private final boolean inProgressOk;
| |
| public EditLogTailer(FSNamesystem namesystem, Configuration conf) { |
| this.tailerThread = new EditLogTailerThread(); |
| this.conf = conf; |
| this.namesystem = namesystem; |
| this.editLog = namesystem.getEditLog(); |
| |
| lastLoadTimeMs = monotonicNow(); |
| lastRollTimeMs = monotonicNow(); |
| |
| logRollPeriodMs = conf.getTimeDuration( |
| DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, |
| DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT, |
| TimeUnit.SECONDS, TimeUnit.MILLISECONDS); |
| List<RemoteNameNodeInfo> nns = Collections.emptyList(); |
| if (logRollPeriodMs >= 0) { |
| try { |
| nns = RemoteNameNodeInfo.getRemoteNameNodes(conf); |
| } catch (IOException e) { |
| throw new IllegalArgumentException("Remote NameNodes not correctly configured!", e); |
| } |
| |
| for (RemoteNameNodeInfo info : nns) { |
| // overwrite the socket address, if we need to |
| InetSocketAddress ipc = NameNode.getServiceAddress(info.getConfiguration(), true); |
| // sanity check the ipc address |
| Preconditions.checkArgument(ipc.getPort() > 0, |
| "Active NameNode must have an IPC port configured. " + "Got address '%s'", ipc); |
| info.setIpcAddress(ipc); |
| } |
| |
| LOG.info("Will roll logs on active node every " + |
| (logRollPeriodMs / 1000) + " seconds."); |
| } else { |
| LOG.info("Not going to trigger log rolls on active node because " + |
| DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY + " is negative."); |
| } |
| |
| sleepTimeMs = conf.getTimeDuration( |
| DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, |
| DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT, |
| TimeUnit.SECONDS, TimeUnit.MILLISECONDS); |
| long maxSleepTimeMsTemp = conf.getTimeDuration( |
| DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY, |
| DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| if (maxSleepTimeMsTemp > 0 && maxSleepTimeMsTemp < sleepTimeMs) { |
| LOG.warn(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY |
| + " was configured to be " + maxSleepTimeMsTemp |
| + " ms, but this is less than " |
| + DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY |
| + ". Disabling backoff when tailing edit logs."); |
| maxSleepTimeMs = 0; |
| } else { |
| maxSleepTimeMs = maxSleepTimeMsTemp; |
| } |
| |
| maxRetries = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, |
| DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT); |
| if (maxRetries <= 0) { |
| LOG.error("Specified a non-positive number of retries for the number of retries for the " + |
| "namenode connection when manipulating the edit log (" + |
| DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY + "), setting to default: " + |
| DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT); |
| maxRetries = DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT; |
| } |
| |
| inProgressOk = conf.getBoolean( |
| DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, |
| DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT); |
| |
| nnCount = nns.size(); |
| // setup the iterator to endlessly loop the nns |
| this.nnLookup = Iterators.cycle(nns); |
| |
| rollEditsTimeoutMs = conf.getTimeDuration( |
| DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, |
| DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT, |
| TimeUnit.SECONDS, TimeUnit.MILLISECONDS); |
| |
| rollEditsRpcExecutor = Executors.newSingleThreadExecutor( |
| new ThreadFactoryBuilder().setDaemon(true).build()); |
| |
| LOG.debug("logRollPeriodMs=" + logRollPeriodMs + |
| " sleepTime=" + sleepTimeMs); |
| } |
| |
  /** Start the background edit-log tailer thread. */
  public void start() {
    tailerThread.start();
  }
| |
| public void stop() throws IOException { |
| tailerThread.setShouldRun(false); |
| tailerThread.interrupt(); |
| try { |
| tailerThread.join(); |
| } catch (InterruptedException e) { |
| LOG.warn("Edit log tailer thread exited with an exception"); |
| throw new IOException(e); |
| } finally { |
| rollEditsRpcExecutor.shutdown(); |
| } |
| } |
| |
  /** @return the edit log currently being tailed (test hook). */
  @VisibleForTesting
  FSEditLog getEditLog() {
    return editLog;
  }
| |
  /** Replace the edit log to tail (test hook). */
  @VisibleForTesting
  public void setEditLog(FSEditLog editLog) {
    this.editLog = editLog;
  }
| |
  /**
   * Apply all available edits once, synchronously, as part of a failover to
   * active. Must only be called after the tailer thread has been stopped.
   *
   * @throws IOException if tailing fails or is interrupted
   */
  public void catchupDuringFailover() throws IOException {
    Preconditions.checkState(tailerThread == null ||
        !tailerThread.isAlive(),
        "Tailer thread should not be running once failover starts");
    // Important to do tailing as the login user, in case the shared
    // edits storage is implemented by a JournalManager that depends
    // on security credentials to access the logs (eg QuorumJournalManager).
    SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        try {
          // It is already under the full name system lock and the checkpointer
          // thread is already stopped. No need to acquire any other lock.
          doTailEdits();
        } catch (InterruptedException e) {
          // Surface interruption to the caller as an IOException.
          throw new IOException(e);
        }
        return null;
      }
    });
  }
| |
| @VisibleForTesting |
| public long doTailEdits() throws IOException, InterruptedException { |
| // Write lock needs to be interruptible here because the |
| // transitionToActive RPC takes the write lock before calling |
| // tailer.stop() -- so if we're not interruptible, it will |
| // deadlock. |
| namesystem.writeLockInterruptibly(); |
| try { |
| FSImage image = namesystem.getFSImage(); |
| |
| long lastTxnId = image.getLastAppliedTxId(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("lastTxnId: " + lastTxnId); |
| } |
| Collection<EditLogInputStream> streams; |
| try { |
| streams = editLog.selectInputStreams(lastTxnId + 1, 0, |
| null, inProgressOk, true); |
| } catch (IOException ioe) { |
| // This is acceptable. If we try to tail edits in the middle of an edits |
| // log roll, i.e. the last one has been finalized but the new inprogress |
| // edits file hasn't been started yet. |
| LOG.warn("Edits tailer failed to find any streams. Will try again " + |
| "later.", ioe); |
| return 0; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("edit streams to load from: " + streams.size()); |
| } |
| |
| // Once we have streams to load, errors encountered are legitimate cause |
| // for concern, so we don't catch them here. Simple errors reading from |
| // disk are ignored. |
| long editsLoaded = 0; |
| try { |
| editsLoaded = image.loadEdits(streams, namesystem); |
| } catch (EditLogInputException elie) { |
| editsLoaded = elie.getNumEditsLoaded(); |
| throw elie; |
| } finally { |
| if (editsLoaded > 0 || LOG.isDebugEnabled()) { |
| LOG.debug(String.format("Loaded %d edits starting from txid %d ", |
| editsLoaded, lastTxnId)); |
| } |
| } |
| |
| if (editsLoaded > 0) { |
| lastLoadTimeMs = monotonicNow(); |
| } |
| lastLoadedTxnId = image.getLastAppliedTxId(); |
| return editsLoaded; |
| } finally { |
| namesystem.writeUnlock(); |
| } |
| } |
| |
  /**
   * @return time in msec (monotonic clock) of when we last loaded a non-zero
   *         number of edits.
   */
  public long getLastLoadTimeMs() {
    return lastLoadTimeMs;
  }
| |
| /** |
| * @return true if the configured log roll period has elapsed. |
| */ |
| private boolean tooLongSinceLastLoad() { |
| return logRollPeriodMs >= 0 && |
| (monotonicNow() - lastRollTimeMs) > logRollPeriodMs; |
| } |
| |
  /**
   * NameNodeProxy factory method.
   * @return a Callable to roll logs on remote NameNode; the rollEditLog RPC
   *         is issued through {@link #cachedActiveProxy}, with failover and
   *         retry handled by {@link MultipleNameNodeProxy}.
   */
  @VisibleForTesting
  Callable<Void> getNameNodeProxy() {
    return new MultipleNameNodeProxy<Void>() {
      @Override
      protected Void doWork() throws IOException {
        cachedActiveProxy.rollEditLog();
        return null;
      }
    };
  }
| |
| /** |
| * Trigger the active node to roll its logs. |
| */ |
| @VisibleForTesting |
| void triggerActiveLogRoll() { |
| LOG.info("Triggering log roll on remote NameNode"); |
| Future<Void> future = null; |
| try { |
| future = rollEditsRpcExecutor.submit(getNameNodeProxy()); |
| future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS); |
| lastRollTimeMs = monotonicNow(); |
| lastRollTriggerTxId = lastLoadedTxnId; |
| } catch (ExecutionException e) { |
| LOG.warn("Unable to trigger a roll of the active NN", e); |
| } catch (TimeoutException e) { |
| if (future != null) { |
| future.cancel(true); |
| } |
| LOG.warn(String.format( |
| "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs)); |
| } catch (InterruptedException e) { |
| LOG.warn("Unable to trigger a roll of the active NN", e); |
| } |
| } |
| |
  /**
   * Sleep between tailing iterations; overridable in tests to avoid real
   * delays.
   *
   * @param sleepTimeMillis how long to sleep, in milliseconds
   * @throws InterruptedException if interrupted while sleeping
   */
  @VisibleForTesting
  void sleep(long sleepTimeMillis) throws InterruptedException {
    Thread.sleep(sleepTimeMillis);
  }
| |
  /**
   * The thread which does the actual work of tailing edits journals and
   * applying the transactions to the FSNS.
   */
  private class EditLogTailerThread extends Thread {
    // volatile: written by the stopping thread (stop()), read by this
    // thread's main loop.
    private volatile boolean shouldRun = true;

    private EditLogTailerThread() {
      super("Edit log tailer");
    }

    private void setShouldRun(boolean shouldRun) {
      this.shouldRun = shouldRun;
    }

    @Override
    public void run() {
      // Run as the login user so journal access can use the NN's own
      // credentials; failure to switch users is fatal.
      SecurityUtil.doAsLoginUserOrFatal(
          new PrivilegedAction<Object>() {
            @Override
            public Object run() {
              doWork();
              return null;
            }
          });
    }

    /**
     * Main tailing loop: optionally trigger a log roll on the active NN,
     * tail and apply any available edits, then sleep — doubling the sleep
     * time (up to maxSleepTimeMs) while no edits are arriving.
     */
    private void doWork() {
      long currentSleepTimeMs = sleepTimeMs;
      while (shouldRun) {
        long editsTailed = 0;
        try {
          // There's no point in triggering a log roll if the Standby hasn't
          // read any more transactions since the last time a roll was
          // triggered.
          boolean triggeredLogRoll = false;
          if (tooLongSinceLastLoad() &&
              lastRollTriggerTxId < lastLoadedTxnId) {
            triggerActiveLogRoll();
            triggeredLogRoll = true;
          }
          /**
           * Check again in case someone calls {@link EditLogTailer#stop} while
           * we're triggering an edit log roll, since ipc.Client catches and
           * ignores {@link InterruptedException} in a few places. This fixes
           * the bug described in HDFS-2823.
           */
          if (!shouldRun) {
            break;
          }
          // Prevent reading of name system while being modified. The full
          // name system lock will be acquired to further block even the block
          // state updates.
          namesystem.cpLockInterruptibly();
          try {
            editsTailed = doTailEdits();
          } finally {
            namesystem.cpUnlock();
          }
          //Update NameDirSize Metric
          if (triggeredLogRoll) {
            namesystem.getFSImage().getStorage().updateNameDirSize();
          }
        } catch (EditLogInputException elie) {
          // Transient read failure; the next iteration will retry.
          LOG.warn("Error while reading edits from disk. Will try again.", elie);
        } catch (InterruptedException ie) {
          // interrupter should have already set shouldRun to false
          continue;
        } catch (Throwable t) {
          // Anything else is unrecoverable for a Standby: abort the NN.
          LOG.fatal("Unknown error encountered while tailing edits. " +
              "Shutting down standby NN.", t);
          terminate(1, t);
        }

        try {
          if (editsTailed == 0 && maxSleepTimeMs > 0) {
            // If no edits were tailed, apply exponential backoff
            // before tailing again. Double the current sleep time on each
            // empty response, but don't exceed the max. If the sleep time
            // was configured as 0, start the backoff at 1 ms.
            currentSleepTimeMs = Math.min(maxSleepTimeMs,
                (currentSleepTimeMs == 0 ? 1 : currentSleepTimeMs) * 2);
          } else {
            currentSleepTimeMs = sleepTimeMs; // reset to initial sleep time
          }
          EditLogTailer.this.sleep(currentSleepTimeMs);
        } catch (InterruptedException e) {
          // Loop condition (shouldRun) decides whether to exit.
          LOG.warn("Edit log tailer interrupted", e);
        }
      }
    }
  }
| |
| /** |
| * Manage the 'active namenode proxy'. This cannot just be the a single proxy since we could |
| * failover across a number of NameNodes, rather than just between an active and a standby. |
| * <p> |
| * We - lazily - get a proxy to one of the configured namenodes and attempt to make the request |
| * against it. If it doesn't succeed, either because the proxy failed to be created or the request |
| * failed, we try the next NN in the list. We try this up to the configuration maximum number of |
| * retries before throwing up our hands. A working proxy is retained across attempts since we |
| * expect the active NameNode to switch rarely. |
| * <p> |
| * This mechanism is <b>very bad</b> for cases where we care about being <i>fast</i>; it just |
| * blindly goes and tries namenodes. |
| */ |
| @VisibleForTesting |
| abstract class MultipleNameNodeProxy<T> implements Callable<T> { |
| |
| /** |
| * Do the actual work to the remote namenode via the {@link #cachedActiveProxy}. |
| * @return the result of the work, if there is one |
| * @throws IOException if the actions done to the proxy throw an exception. |
| */ |
| protected abstract T doWork() throws IOException; |
| |
| public T call() throws IOException { |
| // reset the loop count on success |
| nnLoopCount = 0; |
| while ((cachedActiveProxy = getActiveNodeProxy()) != null) { |
| try { |
| T ret = doWork(); |
| return ret; |
| } catch (IOException e) { |
| LOG.warn("Exception from remote name node " + currentNN |
| + ", try next.", e); |
| |
| // Try next name node if exception happens. |
| cachedActiveProxy = null; |
| nnLoopCount++; |
| } |
| } |
| throw new IOException("Cannot find any valid remote NN to service request!"); |
| } |
| |
| private NamenodeProtocol getActiveNodeProxy() throws IOException { |
| if (cachedActiveProxy == null) { |
| while (true) { |
| // if we have reached the max loop count, quit by returning null |
| if ((nnLoopCount / nnCount) >= maxRetries) { |
| return null; |
| } |
| |
| currentNN = nnLookup.next(); |
| try { |
| int rpcTimeout = conf.getInt( |
| DFSConfigKeys.DFS_HA_LOGROLL_RPC_TIMEOUT_KEY, |
| DFSConfigKeys.DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT); |
| NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class, |
| RPC.getProtocolVersion(NamenodeProtocolPB.class), currentNN.getIpcAddress(), conf, |
| rpcTimeout, Long.MAX_VALUE); |
| cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy); |
| break; |
| } catch (IOException e) { |
| LOG.info("Failed to reach " + currentNN, e); |
| // couldn't even reach this NN, try the next one |
| nnLoopCount++; |
| } |
| } |
| } |
| assert cachedActiveProxy != null; |
| return cachedActiveProxy; |
| } |
| } |
| } |
| |