| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode.ha; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.lang.reflect.Proxy; |
| import java.net.URI; |
| import java.util.concurrent.TimeUnit; |
| import java.util.List; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; |
| import org.apache.hadoop.hdfs.ClientGSIContext; |
| import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
| import org.apache.hadoop.hdfs.protocol.ClientProtocol; |
| import org.apache.hadoop.io.retry.AtMostOnce; |
| import org.apache.hadoop.io.retry.Idempotent; |
| import org.apache.hadoop.io.retry.RetryPolicies; |
| import org.apache.hadoop.io.retry.RetryPolicy; |
| import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; |
| import org.apache.hadoop.ipc.AlignmentContext; |
| import org.apache.hadoop.ipc.Client.ConnectionId; |
| import org.apache.hadoop.ipc.ObserverRetryOnActiveException; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.ipc.RpcInvocationHandler; |
| import org.apache.hadoop.ipc.StandbyException; |
| import org.apache.hadoop.util.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation |
| * that supports reading from observer namenode(s). |
| * |
| * This constructs a wrapper proxy that sends the request to observer |
| * namenode(s), if observer read is enabled. In case there are multiple |
| * observer namenodes, it will try them one by one in case the RPC failed. It |
| * will fail back to the active namenode after it has exhausted all the |
| * observer namenodes. |
| * |
| * Read and write requests will still be sent to active NN if reading from |
| * observer is turned off. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Evolving |
| public class ObserverReadProxyProvider<T> |
| extends AbstractNNFailoverProxyProvider<T> { |
| @VisibleForTesting |
| static final Logger LOG = LoggerFactory.getLogger( |
| ObserverReadProxyProvider.class); |
| |
| /** Configuration key for {@link #autoMsyncPeriodMs}. */ |
| static final String AUTO_MSYNC_PERIOD_KEY_PREFIX = |
| HdfsClientConfigKeys.Failover.PREFIX + "observer.auto-msync-period"; |
| /** Auto-msync disabled by default. */ |
| static final long AUTO_MSYNC_PERIOD_DEFAULT = -1; |
| |
| /** Client-side context for syncing with the NameNode server side. */ |
| private final AlignmentContext alignmentContext; |
| |
| /** The inner proxy provider used for active/standby failover. */ |
| private final AbstractNNFailoverProxyProvider<T> failoverProxy; |
| /** List of all NameNode proxies. */ |
| private final List<NNProxyInfo<T>> nameNodeProxies; |
| |
| /** The policy used to determine if an exception is fatal or retriable. */ |
| private final RetryPolicy observerRetryPolicy; |
| /** The combined proxy which redirects to other proxies as necessary. */ |
| private final ProxyInfo<T> combinedProxy; |
| |
| /** |
| * Whether reading from observer is enabled. If this is false, all read |
| * requests will still go to active NN. |
| */ |
| private boolean observerReadEnabled; |
| |
| /** |
| * This adjusts how frequently this proxy provider should auto-msync to the |
| * Active NameNode, automatically performing an msync() call to the active |
| * to fetch the current transaction ID before submitting read requests to |
| * observer nodes. See HDFS-14211 for more description of this feature. |
| * If this is below 0, never auto-msync. If this is 0, perform an msync on |
| * every read operation. If this is above 0, perform an msync after this many |
| * ms have elapsed since the last msync. |
| */ |
| private final long autoMsyncPeriodMs; |
| |
| /** |
| * The time, in millisecond epoch, that the last msync operation was |
| * performed. This includes any implicit msync (any operation which is |
| * serviced by the Active NameNode). |
| */ |
| private volatile long lastMsyncTimeMs = -1; |
| |
| /** |
| * A client using an ObserverReadProxyProvider should first sync with the |
| * active NameNode on startup. This ensures that the client reads data which |
| * is consistent with the state of the world as of the time of its |
| * instantiation. This variable will be true after this initial sync has |
| * been performed. |
| */ |
| private volatile boolean msynced = false; |
| |
| /** |
| * The index into the nameNodeProxies list currently being used. Should only |
| * be accessed in synchronized methods. |
| */ |
| private int currentIndex = -1; |
| |
| /** |
| * The proxy being used currently. Should only be accessed in synchronized |
| * methods. |
| */ |
| private NNProxyInfo<T> currentProxy; |
| |
| /** The last proxy that has been used. Only used for testing. */ |
| private volatile ProxyInfo<T> lastProxy = null; |
| |
| /** |
| * By default ObserverReadProxyProvider uses |
| * {@link ConfiguredFailoverProxyProvider} for failover. |
| */ |
| public ObserverReadProxyProvider( |
| Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) { |
| this(conf, uri, xface, factory, |
| new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public ObserverReadProxyProvider( |
| Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory, |
| AbstractNNFailoverProxyProvider<T> failoverProxy) { |
| super(conf, uri, xface, factory); |
| this.failoverProxy = failoverProxy; |
| this.alignmentContext = new ClientGSIContext(); |
| factory.setAlignmentContext(alignmentContext); |
| |
| // Don't bother configuring the number of retries and such on the retry |
| // policy since it is mainly only used for determining whether or not an |
| // exception is retriable or fatal |
| observerRetryPolicy = RetryPolicies.failoverOnNetworkException( |
| RetryPolicies.TRY_ONCE_THEN_FAIL, 1); |
| |
| // Get all NameNode proxies |
| nameNodeProxies = getProxyAddresses(uri, |
| HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); |
| |
| // Create a wrapped proxy containing all the proxies. Since this combined |
| // proxy is just redirecting to other proxies, all invocations can share it. |
| StringBuilder combinedInfo = new StringBuilder("["); |
| for (int i = 0; i < nameNodeProxies.size(); i++) { |
| if (i > 0) { |
| combinedInfo.append(","); |
| } |
| combinedInfo.append(nameNodeProxies.get(i).proxyInfo); |
| } |
| combinedInfo.append(']'); |
| T wrappedProxy = (T) Proxy.newProxyInstance( |
| ObserverReadInvocationHandler.class.getClassLoader(), |
| new Class<?>[] {xface}, new ObserverReadInvocationHandler()); |
| combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString()); |
| |
| autoMsyncPeriodMs = conf.getTimeDuration( |
| // The host of the URI is the nameservice ID |
| AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(), |
| AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); |
| |
| // TODO : make this configurable or remove this variable |
| if (wrappedProxy instanceof ClientProtocol) { |
| this.observerReadEnabled = true; |
| } else { |
| LOG.info("Disabling observer reads for {} because the requested proxy " |
| + "class does not implement {}", uri, ClientProtocol.class.getName()); |
| this.observerReadEnabled = false; |
| } |
| } |
| |
| public AlignmentContext getAlignmentContext() { |
| return alignmentContext; |
| } |
| |
| @Override |
| public ProxyInfo<T> getProxy() { |
| return combinedProxy; |
| } |
| |
| @Override |
| public void performFailover(T currentProxy) { |
| failoverProxy.performFailover(currentProxy); |
| } |
| |
| /** |
| * Check if a method is read-only. |
| * |
| * @return whether the 'method' is a read-only operation. |
| */ |
| private static boolean isRead(Method method) { |
| if (!method.isAnnotationPresent(ReadOnly.class)) { |
| return false; |
| } |
| return !method.getAnnotation(ReadOnly.class).activeOnly(); |
| } |
| |
| @VisibleForTesting |
| void setObserverReadEnabled(boolean flag) { |
| this.observerReadEnabled = flag; |
| } |
| |
| @VisibleForTesting |
| ProxyInfo<T> getLastProxy() { |
| return lastProxy; |
| } |
| |
| /** |
| * Return the currently used proxy. If there is none, first calls |
| * {@link #changeProxy(NNProxyInfo)} to initialize one. |
| */ |
| private NNProxyInfo<T> getCurrentProxy() { |
| return changeProxy(null); |
| } |
| |
| /** |
| * Move to the next proxy in the proxy list. If the NNProxyInfo supplied by |
| * the caller does not match the current proxy, the call is ignored; this is |
| * to handle concurrent calls (to avoid changing the proxy multiple times). |
| * The service state of the newly selected proxy will be updated before |
| * returning. |
| * |
| * @param initial The expected current proxy |
| * @return The new proxy that should be used. |
| */ |
| private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) { |
| if (currentProxy != initial) { |
| // Must have been a concurrent modification; ignore the move request |
| return currentProxy; |
| } |
| currentIndex = (currentIndex + 1) % nameNodeProxies.size(); |
| currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex)); |
| currentProxy.setCachedState(getHAServiceState(currentProxy)); |
| LOG.debug("Changed current proxy from {} to {}", |
| initial == null ? "none" : initial.proxyInfo, |
| currentProxy.proxyInfo); |
| return currentProxy; |
| } |
| |
| /** |
| * Fetch the service state from a proxy. If it is unable to be fetched, |
| * assume it is in standby state, but log the exception. |
| */ |
| private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) { |
| IOException ioe; |
| try { |
| return getProxyAsClientProtocol(proxyInfo.proxy).getHAServiceState(); |
| } catch (RemoteException re) { |
| // Though a Standby will allow a getHAServiceState call, it won't allow |
| // delegation token lookup, so if DT is used it throws StandbyException |
| if (re.unwrapRemoteException() instanceof StandbyException) { |
| LOG.debug("NameNode {} threw StandbyException when fetching HAState", |
| proxyInfo.getAddress()); |
| return HAServiceState.STANDBY; |
| } |
| ioe = re; |
| } catch (IOException e) { |
| ioe = e; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Failed to connect to {} while fetching HAServiceState", |
| proxyInfo.getAddress(), ioe); |
| } |
| return null; |
| } |
| |
| /** |
| * Return the input proxy, cast as a {@link ClientProtocol}. This catches any |
| * {@link ClassCastException} and wraps it in a more helpful message. This |
| * should ONLY be called if the caller is certain that the proxy is, in fact, |
| * a {@link ClientProtocol}. |
| */ |
| private ClientProtocol getProxyAsClientProtocol(T proxy) { |
| assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy " |
| + "of class " + proxy.getClass() + " as if it was a ClientProtocol."; |
| return (ClientProtocol) proxy; |
| } |
| |
| /** |
| * This will call {@link ClientProtocol#msync()} on the active NameNode |
| * (via the {@link #failoverProxy}) to initialize the state of this client. |
| * Calling it multiple times is a no-op; only the first will perform an |
| * msync. |
| * |
| * @see #msynced |
| */ |
| private synchronized void initializeMsync() throws IOException { |
| if (msynced) { |
| return; // No need for an msync |
| } |
| getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync(); |
| msynced = true; |
| lastMsyncTimeMs = Time.monotonicNow(); |
| } |
| |
| /** |
| * This will call {@link ClientProtocol#msync()} on the active NameNode |
| * (via the {@link #failoverProxy}) to update the state of this client, only |
| * if at least {@link #autoMsyncPeriodMs} ms has elapsed since the last time |
| * an msync was performed. |
| * |
| * @see #autoMsyncPeriodMs |
| */ |
| private void autoMsyncIfNecessary() throws IOException { |
| if (autoMsyncPeriodMs == 0) { |
| // Always msync |
| getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync(); |
| } else if (autoMsyncPeriodMs > 0) { |
| if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) { |
| synchronized (this) { |
| // Use a synchronized block so that only one thread will msync |
| // if many operations are submitted around the same time. |
| // Re-check the entry criterion since the status may have changed |
| // while waiting for the lock. |
| if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) { |
| getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync(); |
| lastMsyncTimeMs = Time.monotonicNow(); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * An InvocationHandler to handle incoming requests. This class's invoke |
| * method contains the primary logic for redirecting to observers. |
| * |
| * If observer reads are enabled, attempt to send read operations to the |
| * current proxy. If it is not an observer, or the observer fails, adjust |
| * the current proxy and retry on the next one. If all proxies are tried |
| * without success, the request is forwarded to the active. |
| * |
| * Write requests are always forwarded to the active. |
| */ |
| private class ObserverReadInvocationHandler implements RpcInvocationHandler { |
| |
| @Override |
| public Object invoke(Object proxy, final Method method, final Object[] args) |
| throws Throwable { |
| lastProxy = null; |
| Object retVal; |
| |
| if (observerReadEnabled && isRead(method)) { |
| if (!msynced) { |
| // An msync() must first be performed to ensure that this client is |
| // up-to-date with the active's state. This will only be done once. |
| initializeMsync(); |
| } else { |
| autoMsyncIfNecessary(); |
| } |
| |
| int failedObserverCount = 0; |
| int activeCount = 0; |
| int standbyCount = 0; |
| int unreachableCount = 0; |
| for (int i = 0; i < nameNodeProxies.size(); i++) { |
| NNProxyInfo<T> current = getCurrentProxy(); |
| HAServiceState currState = current.getCachedState(); |
| if (currState != HAServiceState.OBSERVER) { |
| if (currState == HAServiceState.ACTIVE) { |
| activeCount++; |
| } else if (currState == HAServiceState.STANDBY) { |
| standbyCount++; |
| } else if (currState == null) { |
| unreachableCount++; |
| } |
| LOG.debug("Skipping proxy {} for {} because it is in state {}", |
| current.proxyInfo, method.getName(), |
| currState == null ? "unreachable" : currState); |
| changeProxy(current); |
| continue; |
| } |
| LOG.debug("Attempting to service {} using proxy {}", |
| method.getName(), current.proxyInfo); |
| try { |
| retVal = method.invoke(current.proxy, args); |
| lastProxy = current; |
| LOG.debug("Invocation of {} using {} was successful", |
| method.getName(), current.proxyInfo); |
| return retVal; |
| } catch (InvocationTargetException ite) { |
| if (!(ite.getCause() instanceof Exception)) { |
| throw ite.getCause(); |
| } |
| Exception e = (Exception) ite.getCause(); |
| if (e instanceof RemoteException) { |
| RemoteException re = (RemoteException) e; |
| Exception unwrapped = re.unwrapRemoteException( |
| ObserverRetryOnActiveException.class); |
| if (unwrapped instanceof ObserverRetryOnActiveException) { |
| LOG.info("Encountered ObserverRetryOnActiveException from {}." + |
| " Retry active namenode directly.", current.proxyInfo); |
| break; |
| } |
| } |
| RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0, |
| method.isAnnotationPresent(Idempotent.class) |
| || method.isAnnotationPresent(AtMostOnce.class)); |
| if (retryInfo.action == RetryAction.RetryDecision.FAIL) { |
| throw e; |
| } else { |
| failedObserverCount++; |
| LOG.warn( |
| "Invocation returned exception on [{}]; {} failure(s) so far", |
| current.proxyInfo, failedObserverCount, e); |
| changeProxy(current); |
| } |
| } |
| } |
| |
| // Only log message if there are actual observer failures. |
| // Getting here with failedObserverCount = 0 could |
| // be that there is simply no Observer node running at all. |
| if (failedObserverCount > 0) { |
| // If we get here, it means all observers have failed. |
| LOG.warn("{} observers have failed for read request {}; also found " |
| + "{} standby, {} active, and {} unreachable. Falling back " |
| + "to active.", failedObserverCount, method.getName(), |
| standbyCount, activeCount, unreachableCount); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Read falling back to active without observer read " |
| + "fail, is there no observer node running?"); |
| } |
| } |
| } |
| |
| // Either all observers have failed, observer reads are disabled, |
| // or this is a write request. In any case, forward the request to |
| // the active NameNode. |
| LOG.debug("Using failoverProxy to service {}", method.getName()); |
| ProxyInfo<T> activeProxy = failoverProxy.getProxy(); |
| try { |
| retVal = method.invoke(activeProxy.proxy, args); |
| } catch (InvocationTargetException e) { |
| // This exception will be handled by higher layers |
| throw e.getCause(); |
| } |
| // If this was reached, the request reached the active, so the |
| // state is up-to-date with active and no further msync is needed. |
| msynced = true; |
| lastMsyncTimeMs = Time.monotonicNow(); |
| lastProxy = activeProxy; |
| return retVal; |
| } |
| |
| @Override |
| public void close() throws IOException {} |
| |
| @Override |
| public ConnectionId getConnectionId() { |
| return RPC.getConnectionIdForProxy(observerReadEnabled |
| ? getCurrentProxy().proxy : failoverProxy.getProxy().proxy); |
| } |
| } |
| |
| @Override |
| public synchronized void close() throws IOException { |
| for (ProxyInfo<T> pi : nameNodeProxies) { |
| if (pi.proxy != null) { |
| if (pi.proxy instanceof Closeable) { |
| ((Closeable)pi.proxy).close(); |
| } else { |
| RPC.stopProxy(pi.proxy); |
| } |
| // Set to null to avoid the failoverProxy having to re-do the close |
| // if it is sharing a proxy instance |
| pi.proxy = null; |
| } |
| } |
| failoverProxy.close(); |
| } |
| |
| @Override |
| public boolean useLogicalURI() { |
| return failoverProxy.useLogicalURI(); |
| } |
| } |