blob: 486b52ce9cb009ea396936ab1395ea2d55de5ad0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import com.google.common.base.Preconditions;
/**
* Create proxy objects to communicate with a remote NN. All remote access to an
* NN should be funneled through this class. Most of the time you'll want to use
* {@link NameNodeProxies#createProxy(Configuration, URI, Class)}, which will
* create either an HA- or non-HA-enabled client proxy as appropriate.
*/
public class NameNodeProxies {
private static final Log LOG = LogFactory.getLog(NameNodeProxies.class);
/**
* Wrapper for a client proxy as well as its associated service ID.
* This is simply used as a tuple-like return type for
* {@link NameNodeProxies#createProxy} and
* {@link NameNodeProxies#createNonHAProxy}.
*/
public static class ProxyAndInfo<PROXYTYPE> {
private final PROXYTYPE proxy;
private final Text dtService;
public ProxyAndInfo(PROXYTYPE proxy, Text dtService) {
this.proxy = proxy;
this.dtService = dtService;
}
public PROXYTYPE getProxy() {
return proxy;
}
public Text getDelegationTokenService() {
return dtService;
}
}
/**
* Creates the namenode proxy with the passed protocol. This will handle
* creation of either HA- or non-HA-enabled proxy objects, depending upon
* if the provided URI is a configured logical URI.
*
* @param conf the configuration containing the required IPC
* properties, client failover configurations, etc.
* @param nameNodeUri the URI pointing either to a specific NameNode
* or to a logical nameservice.
* @param xface the IPC interface which should be created
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to
* @throws IOException if there is an error creating the proxy
**/
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
URI nameNodeUri, Class<T> xface) throws IOException {
Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
getFailoverProxyProviderClass(conf, nameNodeUri, xface);
if (failoverProxyProviderClass == null) {
// Non-HA case
return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
UserGroupInformation.getCurrentUser(), true);
} else {
// HA case
FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies
.createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
nameNodeUri);
Conf config = new Conf(conf);
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
config.maxFailoverAttempts, config.failoverSleepBaseMillis,
config.failoverSleepMaxMillis));
Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
return new ProxyAndInfo<T>(proxy, dtService);
}
}
/**
* Creates an explicitly non-HA-enabled proxy object. Most of the time you
* don't want to use this, and should instead use {@link NameNodeProxies#createProxy}.
*
* @param conf the configuration object
* @param nnAddr address of the remote NN to connect to
* @param xface the IPC interface which should be created
* @param ugi the user who is making the calls on the proxy object
* @param withRetries certain interfaces have a non-standard retry policy
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createNonHAProxy(
Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries) throws IOException {
Text dtService = SecurityUtil.buildTokenService(nnAddr);
T proxy;
if (xface == ClientProtocol.class) {
proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
withRetries);
} else if (xface == JournalProtocol.class) {
proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
} else if (xface == NamenodeProtocol.class) {
proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi,
withRetries);
} else if (xface == GetUserMappingsProtocol.class) {
proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi);
} else if (xface == RefreshUserMappingsProtocol.class) {
proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi);
} else if (xface == RefreshAuthorizationPolicyProtocol.class) {
proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
conf, ugi);
} else {
String message = "Upsupported protocol found when creating the proxy " +
"connection to NameNode: " +
((xface != null) ? xface.getClass().getName() : "null");
LOG.error(message);
throw new IllegalStateException(message);
}
return new ProxyAndInfo<T>(proxy, dtService);
}
private static JournalProtocol createNNProxyWithJournalProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException {
JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address,
conf, ugi, JournalProtocolPB.class);
return new JournalProtocolTranslatorPB(proxy);
}
private static RefreshAuthorizationPolicyProtocol
createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi) throws IOException {
RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB)
createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class);
return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy);
}
private static RefreshUserMappingsProtocol
createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi) throws IOException {
RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB)
createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class);
return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
}
private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException {
GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB)
createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class);
return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
}
private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException {
NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
address, conf, ugi, NamenodeProtocolPB.class);
if (withRetries) { // create the proxy with retries
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
= new HashMap<Class<? extends Exception>, RetryPolicy>();
RetryPolicy methodPolicy = RetryPolicies.retryByException(timeoutPolicy,
exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap
= new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("getBlocks", methodPolicy);
methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
proxy = (NamenodeProtocolPB) RetryProxy.create(NamenodeProtocolPB.class,
proxy, methodNameToPolicyMap);
}
return new NamenodeProtocolTranslatorPB(proxy);
}
private static ClientProtocol createNNProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
final RetryPolicy defaultPolicy =
RetryUtils.getDefaultRetryPolicy(
conf,
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
SafeModeException.class);
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
.getProxy();
if (withRetries) { // create the proxy with retries
RetryPolicy createPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(5,
HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap
= new HashMap<Class<? extends Exception>, RetryPolicy>();
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
createPolicy);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
= new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
.retryByRemoteException(defaultPolicy,
remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
defaultPolicy, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap
= new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
ClientNamenodeProtocolPB.class,
new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
ClientNamenodeProtocolPB.class, proxy),
methodNameToPolicyMap,
defaultPolicy);
}
return new ClientNamenodeProtocolTranslatorPB(proxy);
}
@SuppressWarnings("unchecked")
private static Object createNameNodeProxy(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi, Class xface)
throws IOException {
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
ugi, conf, NetUtils.getDefaultSocketFactory(conf));
return proxy;
}
/** Gets the configured Failover proxy provider's class */
private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
if (nameNodeUri == null) {
return null;
}
String host = nameNodeUri.getHost();
String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "."
+ host;
try {
@SuppressWarnings("unchecked")
Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf
.getClass(configKey, null, FailoverProxyProvider.class);
if (ret != null) {
// If we found a proxy provider, then this URI should be a logical NN.
// Given that, it shouldn't have a non-default port number.
int port = nameNodeUri.getPort();
if (port > 0 && port != NameNode.DEFAULT_PORT) {
throw new IOException("Port " + port + " specified in URI "
+ nameNodeUri + " but host '" + host
+ "' is a logical (HA) namenode"
+ " and does not use port information.");
}
}
return ret;
} catch (RuntimeException e) {
if (e.getCause() instanceof ClassNotFoundException) {
throw new IOException("Could not load failover proxy provider class "
+ conf.get(configKey) + " which is configured for authority "
+ nameNodeUri, e);
} else {
throw e;
}
}
}
/** Creates the Failover proxy provider instance*/
@SuppressWarnings("unchecked")
private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
Class<T> xface, URI nameNodeUri) throws IOException {
Preconditions.checkArgument(
xface.isAssignableFrom(NamenodeProtocols.class),
"Interface %s is not a NameNode protocol", xface);
try {
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
.getConstructor(Configuration.class, URI.class, Class.class);
FailoverProxyProvider<?> provider = ctor.newInstance(conf, nameNodeUri,
xface);
return (FailoverProxyProvider<T>) provider;
} catch (Exception e) {
String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
if (LOG.isDebugEnabled()) {
LOG.debug(message, e);
}
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
throw new IOException(message, e);
}
}
}
}