/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.hdfs.web.ParamFilter;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
import org.apache.hadoop.hdfs.web.resources.CreateFlagParam;
import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
import org.apache.hadoop.hdfs.web.resources.FsActionParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.GroupParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
import org.apache.hadoop.hdfs.web.resources.NewLengthParam;
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OldSnapshotNameParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.hdfs.web.resources.PermissionParam;
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
import org.apache.hadoop.hdfs.web.resources.SnapshotNameParam;
import org.apache.hadoop.hdfs.web.resources.StartAfterParam;
import org.apache.hadoop.hdfs.web.resources.StoragePolicyParam;
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
import org.apache.hadoop.hdfs.web.resources.TokenKindParam;
import org.apache.hadoop.hdfs.web.resources.TokenServiceParam;
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam;
import org.apache.hadoop.hdfs.web.resources.XAttrNameParam;
import org.apache.hadoop.hdfs.web.resources.XAttrSetFlagParam;
import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLDecoder;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
/**
 * WebHDFS implementation for the Router. This is an extension of
 * {@link NamenodeWebHdfsMethods} and reuses as much of its logic as possible.
 */
@Path("")
@ResourceFilters(ParamFilter.class)
public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
private static final Logger LOG =
LoggerFactory.getLogger(RouterWebHdfsMethods.class);
/** Remote address of the client issuing the current request. */
private static final ThreadLocal<String> REMOTE_ADDRESS =
new ThreadLocal<String>();
private @Context HttpServletRequest request;
// HTTP verb, query string, and servlet path of the current request,
// saved so the request can be replayed against a downstream Namenode
private String method;
private String query;
private String reqPath;
public RouterWebHdfsMethods(@Context HttpServletRequest request) {
super(request);
this.method = request.getMethod();
this.query = request.getQueryString();
this.reqPath = request.getServletPath();
REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
}
@Override
protected void init(final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final UriFsPathParam path, final HttpOpParam<?> op,
final Param<?, ?>... parameters) {
super.init(ugi, delegation, username, doAsUser, path, op, parameters);
REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
}
@Override
protected ClientProtocol getRpcClientProtocol() throws IOException {
// RouterRpcServer implements ClientProtocol; reuse the shared null check
return getRPCServer(getRouter());
}
/** Clear the per-request thread-local state. */
private void reset() {
REMOTE_ADDRESS.set(null);
}
@Override
protected String getRemoteAddr() {
return REMOTE_ADDRESS.get();
}
@Override
protected void queueExternalCall(ExternalCall call)
throws IOException, InterruptedException {
getRouter().getRpcServer().getServer().queueCall(call);
}
private Router getRouter() {
// The Router is stored in the servlet context under the same attribute
// name ("name.node") that the Namenode uses for itself
return (Router)getContext().getAttribute("name.node");
}
private static RouterRpcServer getRPCServer(final Router router)
throws IOException {
final RouterRpcServer routerRpcServer = router.getRpcServer();
if (routerRpcServer == null) {
throw new RetriableException("Router is in startup mode");
}
return routerRpcServer;
}
@Override
protected Response put(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
final DoAsParam doAsUser,
final String fullpath,
final PutOpParam op,
final DestinationParam destination,
final OwnerParam owner,
final GroupParam group,
final PermissionParam permission,
final OverwriteParam overwrite,
final BufferSizeParam bufferSize,
final ReplicationParam replication,
final BlockSizeParam blockSize,
final ModificationTimeParam modificationTime,
final AccessTimeParam accessTime,
final RenameOptionSetParam renameOptions,
final CreateParentParam createParent,
final TokenArgumentParam delegationTokenArgument,
final AclPermissionParam aclPermission,
final XAttrNameParam xattrName,
final XAttrValueParam xattrValue,
final XAttrSetFlagParam xattrSetFlag,
final SnapshotNameParam snapshotName,
final OldSnapshotNameParam oldSnapshotName,
final ExcludeDatanodesParam exclDatanodes,
final CreateFlagParam createFlagParam,
final NoRedirectParam noredirectParam,
final StoragePolicyParam policyName
) throws IOException, URISyntaxException {
switch(op.getValue()) {
case CREATE:
{
final Router router = getRouter();
final URI uri = redirectURI(router, fullpath);
if (!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case MKDIRS:
case CREATESYMLINK:
case RENAME:
case SETREPLICATION:
case SETOWNER:
case SETPERMISSION:
case SETTIMES:
case RENEWDELEGATIONTOKEN:
case CANCELDELEGATIONTOKEN:
case MODIFYACLENTRIES:
case REMOVEACLENTRIES:
case REMOVEDEFAULTACL:
case REMOVEACL:
case SETACL:
case SETXATTR:
case REMOVEXATTR:
case ALLOWSNAPSHOT:
case CREATESNAPSHOT:
case RENAMESNAPSHOT:
case DISALLOWSNAPSHOT:
case SETSTORAGEPOLICY:
{
// Whitelist of operations that can be handled by NamenodeWebHdfsMethods
return super.put(ugi, delegation, username, doAsUser, fullpath, op,
destination, owner, group, permission,
overwrite, bufferSize, replication, blockSize, modificationTime,
accessTime, renameOptions, createParent, delegationTokenArgument,
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
oldSnapshotName, exclDatanodes, createFlagParam, noredirectParam,
policyName);
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
}
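// A hedged usage sketch (hypothetical host and path, not part of this
// class): a CREATE against the Router looks exactly like WebHDFS against
// a Namenode; the client follows the redirect built above to a Datanode.
//
//   Configuration conf = new Configuration();
//   FileSystem fs = FileSystem.get(
//       URI.create("webhdfs://router.example.com:50071"), conf);
//   try (FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) {
//     out.writeBytes("hello through the Router");
//   }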
@Override
protected Response post(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
final DoAsParam doAsUser,
final String fullpath,
final PostOpParam op,
final ConcatSourcesParam concatSrcs,
final BufferSizeParam bufferSize,
final ExcludeDatanodesParam excludeDatanodes,
final NewLengthParam newLength,
final NoRedirectParam noRedirectParam
) throws IOException, URISyntaxException {
switch(op.getValue()) {
case APPEND:
{
final Router router = getRouter();
final URI uri = redirectURI(router, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L,
excludeDatanodes.getValue(), bufferSize);
if (!noRedirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case CONCAT:
case TRUNCATE:
case UNSETSTORAGEPOLICY:
{
return super.post(ugi, delegation, username, doAsUser, fullpath, op,
concatSrcs, bufferSize, excludeDatanodes, newLength,
noRedirectParam);
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
}
@Override
protected Response get(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
final DoAsParam doAsUser,
final String fullpath,
final GetOpParam op,
final OffsetParam offset,
final LengthParam length,
final RenewerParam renewer,
final BufferSizeParam bufferSize,
final List<XAttrNameParam> xattrNames,
final XAttrEncodingParam xattrEncoding,
final ExcludeDatanodesParam excludeDatanodes,
final FsActionParam fsAction,
final TokenKindParam tokenKind,
final TokenServiceParam tokenService,
final NoRedirectParam noredirectParam,
final StartAfterParam startAfter
) throws IOException, URISyntaxException {
try {
final Router router = getRouter();
switch (op.getValue()) {
case OPEN:
{
final URI uri = redirectURI(router, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), offset.getValue(),
excludeDatanodes.getValue(), offset, length, bufferSize);
if (!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case GETFILECHECKSUM:
{
final URI uri = redirectURI(router, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, null);
if (!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case GET_BLOCK_LOCATIONS:
case GETFILESTATUS:
case LISTSTATUS:
case GETCONTENTSUMMARY:
case GETHOMEDIRECTORY:
case GETACLSTATUS:
case GETXATTRS:
case LISTXATTRS:
case CHECKACCESS:
{
return super.get(ugi, delegation, username, doAsUser, fullpath, op,
offset, length, renewer, bufferSize, xattrNames, xattrEncoding,
excludeDatanodes, fsAction, tokenKind, tokenService,
noredirectParam, startAfter);
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
} finally {
reset();
}
}
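// Hedged note with a sketched response (hypothetical values): when the
// client passes noredirect=true, the OPEN and GETFILECHECKSUM branches
// above return the redirect target as JSON instead of an HTTP 307, e.g.
//   {"Location":"http://dn0.example.com:9864/webhdfs/v1/user/f?op=OPEN&..."}
// which lets clients that cannot follow redirects issue the second
// request themselves.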
/**
* Get the redirect URI from the Namenode responsible for a path.
* @param router Router to check.
* @param path Path to get location for.
* @return URI returned by the Namenode.
* @throws IOException If it cannot get the redirect URI.
*/
private URI redirectURI(final Router router, final String path)
throws IOException {
// Forward the request to the proper Namenode
final HttpURLConnection conn = forwardRequest(router, path);
if (conn == null) {
throw new IOException("Cannot forward " + path + " to any Namenode");
}
try {
conn.setInstanceFollowRedirects(false);
conn.setDoOutput(true);
conn.connect();
// Read the reply from the Namenode
int responseCode = conn.getResponseCode();
if (responseCode != HttpServletResponse.SC_TEMPORARY_REDIRECT) {
LOG.info("We expected a redirection from the Namenode, not {}",
responseCode);
return null;
}
// Extract the redirect location and return it
String redirectLocation = conn.getHeaderField("Location");
try {
// We modify the namenode location and the path
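// Hedged example (hypothetical addresses): a Namenode reply such as
//   http://dn0:9864/webhdfs/v1/dest/f?op=CREATE&namenoderpcaddress=nn0:8020&...
// becomes, after the two rewrites below,
//   http://dn0:9864/webhdfs/v1/f?op=CREATE&namenoderpcaddress=<routerId>&...
// i.e. the subcluster-resolved path is swapped back to the client's path
// and the Namenode RPC address is replaced by the Router's identifier.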
redirectLocation = redirectLocation
.replaceAll("(?<=[?&;])namenoderpcaddress=.*?(?=[&;])",
"namenoderpcaddress=" + router.getRouterId())
.replaceAll("(?<=[/])webhdfs/v1/.*?(?=[?])",
"webhdfs/v1" + path);
return new URI(redirectLocation);
} catch (URISyntaxException e) {
LOG.error("Cannot parse redirect location {}", redirectLocation);
}
} finally {
if (conn != null) {
conn.disconnect();
}
}
return null;
}
/**
* Forwards a request to a subcluster.
* @param router Router to check.
* @param path Path in HDFS.
* @return Reply from the subcluster.
* @throws IOException If the request cannot be forwarded.
*/
private HttpURLConnection forwardRequest(
final Router router, final String path) throws IOException {
final Configuration conf =
(Configuration)getContext().getAttribute(JspHelper.CURRENT_CONF);
URLConnectionFactory connectionFactory =
URLConnectionFactory.newDefaultURLConnectionFactory(conf);
// Find the namespace responsible for a path
final RouterRpcServer rpcServer = getRPCServer(router);
RemoteLocation createLoc = rpcServer.getCreateLocation(path);
String nsId = createLoc.getNameserviceId();
String dest = createLoc.getDest();
ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
List<? extends FederationNamenodeContext> namenodes =
nnResolver.getNamenodesForNameserviceId(nsId);
// Go over the namenodes responsible for that namespace
for (FederationNamenodeContext namenode : namenodes) {
try {
// Generate the request for the namenode
// The web address is assumed to be in host:port form
String nnWebAddress = namenode.getWebAddress();
String[] nnWebAddressSplit = nnWebAddress.split(":");
String host = nnWebAddressSplit[0];
int port = Integer.parseInt(nnWebAddressSplit[1]);
// Decode into a local to avoid double-encoding, and so that a retry
// with the next Namenode does not decode the query a second time
String decodedQuery = URLDecoder.decode(query, "UTF-8");
URI uri = new URI(getScheme(), null, host, port,
reqPath + dest, decodedQuery, null);
URL url = uri.toURL();
// Send a request to the proper Namenode
final HttpURLConnection conn =
(HttpURLConnection)connectionFactory.openConnection(url);
conn.setRequestMethod(method);
return conn;
} catch (Exception e) {
LOG.error("Cannot redirect request to {}", namenode, e);
}
}
return null;
}
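// Hedged example (hypothetical mount table and addresses): for a client
// request to /data/f where /data is mounted on ns0 at /backup/data, the
// loop above would open a connection to
//   http://nn0.example.com:9870/webhdfs/v1/backup/data/f?op=CREATE&user.name=alice
// replaying the original HTTP verb and decoded query string.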
/**
* Get a URI to redirect an operation to.
* @param router Router to check.
* @param ugi User group information.
* @param delegation Delegation token.
* @param username User name.
* @param doAsUser Do as user.
* @param path Path to check.
* @param op Operation to perform.
* @param openOffset Offset for opening a file.
* @param excludeDatanodes Datanodes to exclude.
* @param parameters Other parameters.
* @return Redirection URI.
* @throws URISyntaxException If it cannot parse the URI.
* @throws IOException If it cannot create the URI.
*/
private URI redirectURI(final Router router, final UserGroupInformation ugi,
final DelegationParam delegation, final UserParam username,
final DoAsParam doAsUser, final String path, final HttpOpParam.Op op,
final long openOffset, final String excludeDatanodes,
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn =
chooseDatanode(router, path, op, openOffset, excludeDatanodes);
if (dn == null) {
throw new IOException("Failed to find datanode, suggest to check cluster"
+ " health. excludeDatanodes=" + excludeDatanodes);
}
final String delegationQuery;
if (!UserGroupInformation.isSecurityEnabled()) {
// security disabled
delegationQuery = Param.toSortedString("&", doAsUser, username);
} else if (delegation.getValue() != null) {
// client has provided a token
delegationQuery = "&" + delegation;
} else {
// generate a token
final Token<? extends TokenIdentifier> t = generateDelegationToken(
router, ugi, request.getUserPrincipal().getName());
delegationQuery = "&delegation=" + t.encodeToUrlString();
}
final String redirectQuery = op.toQueryString() + delegationQuery
+ "&namenoderpcaddress=" + router.getRouterId()
+ Param.toSortedString("&", parameters);
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
int port = "http".equals(getScheme()) ? dn.getInfoPort() :
dn.getInfoSecurePort();
final URI uri = new URI(getScheme(), null, dn.getHostName(), port, uripath,
redirectQuery, null);
if (LOG.isTraceEnabled()) {
LOG.trace("redirectURI={}", uri);
}
return uri;
}
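// Hedged example (hypothetical values): for OPEN on /user/f the query
// assembled above might read
//   op=OPEN&namenoderpcaddress=<routerId>&length=1024&offset=0
// with an additional &delegation=... term when security is enabled; the
// URI targets the chosen Datanode's HTTP or HTTPS info port.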
private DatanodeInfo chooseDatanode(final Router router,
final String path, final HttpOpParam.Op op, final long openOffset,
final String excludeDatanodes) throws IOException {
// We need to get the DNs as a privileged user
final RouterRpcServer rpcServer = getRPCServer(router);
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
DatanodeInfo[] dns = loginUser.doAs(
new PrivilegedAction<DatanodeInfo[]>() {
@Override
public DatanodeInfo[] run() {
try {
return rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
} catch (IOException e) {
LOG.error("Cannot get the datanodes from the RPC server", e);
return null;
}
}
});
HashSet<Node> excludes = new HashSet<Node>();
// dns may be null if the datanode report failed above
if (dns != null && excludeDatanodes != null) {
Collection<String> collection =
getTrimmedStringCollection(excludeDatanodes);
for (DatanodeInfo dn : dns) {
if (collection.contains(dn.getName())) {
excludes.add(dn);
}
}
}
if (op == GetOpParam.Op.OPEN ||
op == PostOpParam.Op.APPEND ||
op == GetOpParam.Op.GETFILECHECKSUM) {
// Choose a datanode containing a replica
final ClientProtocol cp = getRpcClientProtocol();
final HdfsFileStatus status = cp.getFileInfo(path);
if (status == null) {
throw new FileNotFoundException("File " + path + " not found.");
}
final long len = status.getLen();
if (op == GetOpParam.Op.OPEN) {
if (openOffset < 0L || (openOffset >= len && len > 0)) {
throw new IOException("Offset=" + openOffset
+ " out of the range [0, " + len + "); " + op + ", path=" + path);
}
}
if (len > 0) {
final long offset = op == GetOpParam.Op.OPEN ? openOffset : len - 1;
final LocatedBlocks locations = cp.getBlockLocations(path, offset, 1);
final int count = locations.locatedBlockCount();
if (count > 0) {
LocatedBlock location0 = locations.get(0);
return bestNode(location0.getLocations(), excludes);
}
}
}
return getRandomDatanode(dns, excludes);
}
/**
* Get a random Datanode from a subcluster.
* @param dns Nodes to be chosen from.
* @param excludes Nodes to exclude.
* @return Random datanode from a particular subcluster.
*/
private static DatanodeInfo getRandomDatanode(
final DatanodeInfo[] dns, final HashSet<Node> excludes) {
DatanodeInfo dn = null;
if (dns == null) {
return dn;
}
int numDNs = dns.length;
int availableNodes = 0;
if (excludes.isEmpty()) {
availableNodes = numDNs;
} else {
for (DatanodeInfo di : dns) {
if (!excludes.contains(di)) {
availableNodes++;
}
}
}
// Return a random one from the list
if (availableNodes > 0) {
Random rnd = new Random();
while (dn == null || excludes.contains(dn)) {
int idx = rnd.nextInt(numDNs);
dn = dns[idx];
}
}
return dn;
}
/**
* Generate the delegation token for this request.
* @param router Router.
* @param ugi User group information.
* @param renewer Who is asking for the renewal.
* @return The delegation token.
* @throws IOException If it cannot create the tokens.
*/
private Token<? extends TokenIdentifier> generateDelegationToken(
final Router router, final UserGroupInformation ugi,
final String renewer) throws IOException {
throw new UnsupportedOperationException("TODO Generate token for ugi=" +
ugi + " request=" + request);
}
}