blob: 836af9d68f8a58dc34dd3a63a4bafe01f8104aee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.web.resources;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.hdfs.web.ParamFilter;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.sun.jersey.spi.container.ResourceFilters;
/** Web-hdfs NameNode implementation. */
@Path("")
@ResourceFilters(ParamFilter.class)
public class NamenodeWebHdfsMethods {
public static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class);
private static final UriFsPathParam ROOT = new UriFsPathParam("");
private volatile Boolean useIpcCallq;
private String scheme;
private Principal userPrincipal;
private String remoteAddr;
private @Context ServletContext context;
private @Context HttpServletResponse response;
private boolean supportEZ;
public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
// the request object is a proxy to thread-locals so we have to extract
// what we want from it since the external call will be processed in a
// different thread.
scheme = request.getScheme();
userPrincipal = request.getUserPrincipal();
// get the remote address, if coming in via a trusted proxy server then
// the address with be that of the proxied client
remoteAddr = JspHelper.getRemoteAddr(request);
supportEZ =
Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
}
protected void init(final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final UriFsPathParam path, final HttpOpParam<?> op,
final Param<?, ?>... parameters) {
if (useIpcCallq == null) {
Configuration conf =
(Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
useIpcCallq = conf.getBoolean(
DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ,
DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT);
}
if (LOG.isTraceEnabled()) {
LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
+ ", ugi=" + ugi + ", " + username + ", " + doAsUser
+ Param.toSortedString(", ", parameters));
}
//clear content type
response.setContentType(null);
}
private static NamenodeProtocols getRPCServer(NameNode namenode)
throws IOException {
final NamenodeProtocols np = namenode.getRpcServer();
if (np == null) {
throw new RetriableException("Namenode is in startup mode");
}
return np;
}
protected ClientProtocol getRpcClientProtocol() throws IOException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final ClientProtocol cp = namenode.getRpcServer();
if (cp == null) {
throw new RetriableException("Namenode is in startup mode");
}
return cp;
}
protected String getScheme() {
return scheme;
}
protected ServletContext getContext() {
return context;
}
private <T> T doAs(final UserGroupInformation ugi,
final PrivilegedExceptionAction<T> action)
throws IOException, InterruptedException {
return useIpcCallq ? doAsExternalCall(ugi, action) : ugi.doAs(action);
}
private <T> T doAsExternalCall(final UserGroupInformation ugi,
final PrivilegedExceptionAction<T> action)
throws IOException, InterruptedException {
// set the remote address, if coming in via a trust proxy server then
// the address with be that of the proxied client
ExternalCall<T> call = new ExternalCall<T>(action){
@Override
public UserGroupInformation getRemoteUser() {
return ugi;
}
@Override
public String getProtocol() {
return "webhdfs";
}
@Override
public String getHostAddress() {
return getRemoteAddr();
}
@Override
public InetAddress getHostInetAddress() {
try {
return InetAddress.getByName(getHostAddress());
} catch (UnknownHostException e) {
return null;
}
}
};
queueExternalCall(call);
T result = null;
try {
result = call.get();
} catch (ExecutionException ee) {
Throwable t = ee.getCause();
if (t instanceof RuntimeException) {
throw (RuntimeException)t;
} else if (t instanceof IOException) {
throw (IOException)t;
} else {
throw new IOException(t);
}
}
return result;
}
protected String getRemoteAddr() {
return remoteAddr;
}
protected void queueExternalCall(ExternalCall call)
throws IOException, InterruptedException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
namenode.queueExternalCall(call);
}
@VisibleForTesting
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize, final String excludeDatanodes,
final String remoteAddr, final HdfsFileStatus status) throws IOException {
FSNamesystem fsn = namenode.getNamesystem();
if (fsn == null) {
throw new IOException("Namesystem has not been intialized yet.");
}
final BlockManager bm = fsn.getBlockManager();
HashSet<Node> excludes = new HashSet<Node>();
if (excludeDatanodes != null) {
for (String host : StringUtils
.getTrimmedStringCollection(excludeDatanodes)) {
int idx = host.indexOf(":");
if (idx != -1) {
excludes.add(bm.getDatanodeManager().getDatanodeByXferAddr(
host.substring(0, idx), Integer.parseInt(host.substring(idx + 1))));
} else {
excludes.add(bm.getDatanodeManager().getDatanodeByHost(host));
}
}
}
if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
).getDatanodeByHost(remoteAddr);
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
path, clientNode, excludes, blocksize);
if (storages.length > 0) {
return storages[0].getDatanodeDescriptor();
}
}
} else if (op == GetOpParam.Op.OPEN
|| op == GetOpParam.Op.GETFILECHECKSUM
|| op == PostOpParam.Op.APPEND) {
//choose a datanode containing a replica
final NamenodeProtocols np = getRPCServer(namenode);
if (status == null) {
throw new FileNotFoundException("File " + path + " not found.");
}
final long len = status.getLen();
if (op == GetOpParam.Op.OPEN) {
if (openOffset < 0L || (openOffset >= len && len > 0)) {
throw new IOException("Offset=" + openOffset
+ " out of the range [0, " + len + "); " + op + ", path=" + path);
}
}
if (len > 0) {
final long offset = op == GetOpParam.Op.OPEN? openOffset: len - 1;
final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
final int count = locations.locatedBlockCount();
if (count > 0) {
return bestNode(locations.get(0).getLocations(), excludes);
}
}
}
return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
).chooseRandom(NodeBase.ROOT, excludes);
}
/**
* Choose the datanode to redirect the request. Note that the nodes have been
* sorted based on availability and network distances, thus it is sufficient
* to return the first element of the node here.
*/
protected static DatanodeInfo bestNode(DatanodeInfo[] nodes,
HashSet<Node> excludes) throws IOException {
for (DatanodeInfo dn: nodes) {
if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
return dn;
}
}
throw new IOException("No active nodes contain this block");
}
private Token<? extends TokenIdentifier> generateDelegationToken(
final NameNode namenode, final UserGroupInformation ugi,
final String renewer) throws IOException {
final Credentials c = DelegationTokenSecretManager.createCredentials(
namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
if (c == null) {
return null;
}
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
Text kind = scheme.equals("http")
? WebHdfsConstants.WEBHDFS_TOKEN_KIND
: WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
t.setKind(kind);
return t;
}
private URI redirectURI(ResponseBuilder rb, final NameNode namenode,
final UserGroupInformation ugi, final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize, final String excludeDatanodes,
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn;
final NamenodeProtocols np = getRPCServer(namenode);
HdfsFileStatus status = null;
if (op == GetOpParam.Op.OPEN
|| op == GetOpParam.Op.GETFILECHECKSUM
|| op == PostOpParam.Op.APPEND) {
status = np.getFileInfo(path);
}
dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
excludeDatanodes, remoteAddr, status);
if (dn == null) {
throw new IOException("Failed to find datanode, suggest to check cluster"
+ " health. excludeDatanodes=" + excludeDatanodes);
}
final String delegationQuery;
if (!UserGroupInformation.isSecurityEnabled()) {
//security disabled
delegationQuery = Param.toSortedString("&", doAsUser, username);
} else if (delegation.getValue() != null) {
//client has provided a token
delegationQuery = "&" + delegation;
} else {
//generate a token
final Token<? extends TokenIdentifier> t = generateDelegationToken(
namenode, ugi, null);
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
}
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append(op.toQueryString());
queryBuilder.append(delegationQuery);
queryBuilder.append("&").append(new NamenodeAddressParam(namenode));
queryBuilder.append(Param.toSortedString("&", parameters));
boolean prependReservedRawPath = false;
if (op == GetOpParam.Op.OPEN && supportEZ
&& status.getFileEncryptionInfo() != null) {
prependReservedRawPath = true;
rb.header(WebHdfsFileSystem.FEFINFO_HEADER,
encodeFeInfo(status.getFileEncryptionInfo()));
}
final String uripath = WebHdfsFileSystem.PATH_PREFIX +
(prependReservedRawPath ? "/.reserved/raw" + path : path);
int port = "http".equals(scheme) ? dn.getInfoPort() : dn
.getInfoSecurePort();
final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath,
queryBuilder.toString(), null);
if (LOG.isTraceEnabled()) {
LOG.trace("redirectURI=" + uri);
}
return uri;
}
/** Handle HTTP PUT request for the root. */
@PUT
@Path("/")
@Consumes({"*/*"})
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response putRoot(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
final UserParam username,
@QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
final DoAsParam doAsUser,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op,
@QueryParam(DestinationParam.NAME) @DefaultValue(DestinationParam.DEFAULT)
final DestinationParam destination,
@QueryParam(OwnerParam.NAME) @DefaultValue(OwnerParam.DEFAULT)
final OwnerParam owner,
@QueryParam(GroupParam.NAME) @DefaultValue(GroupParam.DEFAULT)
final GroupParam group,
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
final PermissionParam permission,
@QueryParam(OverwriteParam.NAME) @DefaultValue(OverwriteParam.DEFAULT)
final OverwriteParam overwrite,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize,
@QueryParam(ReplicationParam.NAME) @DefaultValue(ReplicationParam.DEFAULT)
final ReplicationParam replication,
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
final BlockSizeParam blockSize,
@QueryParam(ModificationTimeParam.NAME) @DefaultValue(ModificationTimeParam.DEFAULT)
final ModificationTimeParam modificationTime,
@QueryParam(AccessTimeParam.NAME) @DefaultValue(AccessTimeParam.DEFAULT)
final AccessTimeParam accessTime,
@QueryParam(RenameOptionSetParam.NAME) @DefaultValue(RenameOptionSetParam.DEFAULT)
final RenameOptionSetParam renameOptions,
@QueryParam(CreateParentParam.NAME) @DefaultValue(CreateParentParam.DEFAULT)
final CreateParentParam createParent,
@QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
final TokenArgumentParam delegationTokenArgument,
@QueryParam(AclPermissionParam.NAME) @DefaultValue(AclPermissionParam.DEFAULT)
final AclPermissionParam aclPermission,
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final XAttrNameParam xattrName,
@QueryParam(XAttrValueParam.NAME) @DefaultValue(XAttrValueParam.DEFAULT)
final XAttrValueParam xattrValue,
@QueryParam(XAttrSetFlagParam.NAME) @DefaultValue(XAttrSetFlagParam.DEFAULT)
final XAttrSetFlagParam xattrSetFlag,
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName,
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
final OldSnapshotNameParam oldSnapshotName,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes,
@QueryParam(CreateFlagParam.NAME) @DefaultValue(CreateFlagParam.DEFAULT)
final CreateFlagParam createFlagParam,
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
final NoRedirectParam noredirect,
@QueryParam(StoragePolicyParam.NAME) @DefaultValue(StoragePolicyParam
.DEFAULT) final StoragePolicyParam policyName
) throws IOException, InterruptedException {
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
owner, group, permission, overwrite, bufferSize, replication,
blockSize, modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam, noredirect, policyName);
}
/** Handle HTTP PUT request. */
@PUT
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Consumes({"*/*"})
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response put(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
final UserParam username,
@QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
final DoAsParam doAsUser,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op,
@QueryParam(DestinationParam.NAME) @DefaultValue(DestinationParam.DEFAULT)
final DestinationParam destination,
@QueryParam(OwnerParam.NAME) @DefaultValue(OwnerParam.DEFAULT)
final OwnerParam owner,
@QueryParam(GroupParam.NAME) @DefaultValue(GroupParam.DEFAULT)
final GroupParam group,
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
final PermissionParam permission,
@QueryParam(OverwriteParam.NAME) @DefaultValue(OverwriteParam.DEFAULT)
final OverwriteParam overwrite,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize,
@QueryParam(ReplicationParam.NAME) @DefaultValue(ReplicationParam.DEFAULT)
final ReplicationParam replication,
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
final BlockSizeParam blockSize,
@QueryParam(ModificationTimeParam.NAME) @DefaultValue(ModificationTimeParam.DEFAULT)
final ModificationTimeParam modificationTime,
@QueryParam(AccessTimeParam.NAME) @DefaultValue(AccessTimeParam.DEFAULT)
final AccessTimeParam accessTime,
@QueryParam(RenameOptionSetParam.NAME) @DefaultValue(RenameOptionSetParam.DEFAULT)
final RenameOptionSetParam renameOptions,
@QueryParam(CreateParentParam.NAME) @DefaultValue(CreateParentParam.DEFAULT)
final CreateParentParam createParent,
@QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
final TokenArgumentParam delegationTokenArgument,
@QueryParam(AclPermissionParam.NAME) @DefaultValue(AclPermissionParam.DEFAULT)
final AclPermissionParam aclPermission,
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final XAttrNameParam xattrName,
@QueryParam(XAttrValueParam.NAME) @DefaultValue(XAttrValueParam.DEFAULT)
final XAttrValueParam xattrValue,
@QueryParam(XAttrSetFlagParam.NAME) @DefaultValue(XAttrSetFlagParam.DEFAULT)
final XAttrSetFlagParam xattrSetFlag,
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName,
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
final OldSnapshotNameParam oldSnapshotName,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes,
@QueryParam(CreateFlagParam.NAME) @DefaultValue(CreateFlagParam.DEFAULT)
final CreateFlagParam createFlagParam,
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
final NoRedirectParam noredirect,
@QueryParam(StoragePolicyParam.NAME) @DefaultValue(StoragePolicyParam
.DEFAULT) final StoragePolicyParam policyName
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
group, permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, delegationTokenArgument,
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
oldSnapshotName, excludeDatanodes, createFlagParam, noredirect, policyName);
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
return put(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, destination, owner, group,
permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam, noredirect, policyName);
}
});
}
protected Response put(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
final DoAsParam doAsUser,
final String fullpath,
final PutOpParam op,
final DestinationParam destination,
final OwnerParam owner,
final GroupParam group,
final PermissionParam permission,
final OverwriteParam overwrite,
final BufferSizeParam bufferSize,
final ReplicationParam replication,
final BlockSizeParam blockSize,
final ModificationTimeParam modificationTime,
final AccessTimeParam accessTime,
final RenameOptionSetParam renameOptions,
final CreateParentParam createParent,
final TokenArgumentParam delegationTokenArgument,
final AclPermissionParam aclPermission,
final XAttrNameParam xattrName,
final XAttrValueParam xattrValue,
final XAttrSetFlagParam xattrSetFlag,
final SnapshotNameParam snapshotName,
final OldSnapshotNameParam oldSnapshotName,
final ExcludeDatanodesParam exclDatanodes,
final CreateFlagParam createFlagParam,
final NoRedirectParam noredirectParam,
final StoragePolicyParam policyName
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
final ClientProtocol cp = getRpcClientProtocol();
switch(op.getValue()) {
case CREATE:
{
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(null, namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
exclDatanodes.getValue(), permission, overwrite, bufferSize,
replication, blockSize, createParent, createFlagParam);
if(!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case MKDIRS:
{
final boolean b = cp.mkdirs(fullpath, permission.getFsPermission(), true);
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case CREATESYMLINK:
{
cp.createSymlink(destination.getValue(), fullpath,
PermissionParam.getDefaultFsPermission(), createParent.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case RENAME:
{
final EnumSet<Options.Rename> s = renameOptions.getValue();
if (s.isEmpty()) {
final boolean b = cp.rename(fullpath, destination.getValue());
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
} else {
cp.rename2(fullpath, destination.getValue(),
s.toArray(new Options.Rename[s.size()]));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
}
case SETREPLICATION:
{
final boolean b = cp.setReplication(fullpath, replication.getValue(conf));
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case SETOWNER:
{
if (owner.getValue() == null && group.getValue() == null) {
throw new IllegalArgumentException("Both owner and group are empty.");
}
cp.setOwner(fullpath, owner.getValue(), group.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETPERMISSION:
{
cp.setPermission(fullpath, permission.getFsPermission());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETTIMES:
{
cp.setTimes(fullpath, modificationTime.getValue(), accessTime.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case RENEWDELEGATIONTOKEN:
{
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(delegationTokenArgument.getValue());
final long expiryTime = cp.renewDelegationToken(token);
final String js = JsonUtil.toJsonString("long", expiryTime);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case CANCELDELEGATIONTOKEN:
{
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(delegationTokenArgument.getValue());
cp.cancelDelegationToken(token);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case MODIFYACLENTRIES: {
cp.modifyAclEntries(fullpath, aclPermission.getAclPermission(true));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case REMOVEACLENTRIES: {
cp.removeAclEntries(fullpath, aclPermission.getAclPermission(false));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case REMOVEDEFAULTACL: {
cp.removeDefaultAcl(fullpath);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case REMOVEACL: {
cp.removeAcl(fullpath);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETACL: {
cp.setAcl(fullpath, aclPermission.getAclPermission(true));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETXATTR: {
cp.setXAttr(
fullpath,
XAttrHelper.buildXAttr(xattrName.getXAttrName(),
xattrValue.getXAttrValue()), xattrSetFlag.getFlag());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case REMOVEXATTR: {
cp.removeXAttr(fullpath, XAttrHelper.buildXAttr(xattrName.getXAttrName()));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case ALLOWSNAPSHOT: {
cp.allowSnapshot(fullpath);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case CREATESNAPSHOT: {
String snapshotPath =
cp.createSnapshot(fullpath, snapshotName.getValue());
final String js = JsonUtil.toJsonString(
org.apache.hadoop.fs.Path.class.getSimpleName(), snapshotPath);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case RENAMESNAPSHOT: {
cp.renameSnapshot(fullpath, oldSnapshotName.getValue(),
snapshotName.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case DISALLOWSNAPSHOT: {
cp.disallowSnapshot(fullpath);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETSTORAGEPOLICY: {
if (policyName.getValue() == null) {
throw new IllegalArgumentException("Storage policy name is empty.");
}
cp.setStoragePolicy(fullpath, policyName.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
}
/** Handle HTTP POST request for the root. */
@POST
@Path("/")
@Consumes({"*/*"})
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response postRoot(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
final UserParam username,
@QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
final DoAsParam doAsUser,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op,
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes,
@QueryParam(NewLengthParam.NAME) @DefaultValue(NewLengthParam.DEFAULT)
final NewLengthParam newLength,
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
final NoRedirectParam noredirect
) throws IOException, InterruptedException {
return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs,
bufferSize, excludeDatanodes, newLength, noredirect);
}
/** Handle HTTP POST request. */
@POST
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Consumes({"*/*"})
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response post(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
final UserParam username,
@QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
final DoAsParam doAsUser,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op,
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes,
@QueryParam(NewLengthParam.NAME) @DefaultValue(NewLengthParam.DEFAULT)
final NewLengthParam newLength,
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
final NoRedirectParam noredirect
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
excludeDatanodes, newLength);
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
return post(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, concatSrcs, bufferSize,
excludeDatanodes, newLength, noredirect);
}
});
}
protected Response post(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
final DoAsParam doAsUser,
final String fullpath,
final PostOpParam op,
final ConcatSourcesParam concatSrcs,
final BufferSizeParam bufferSize,
final ExcludeDatanodesParam excludeDatanodes,
final NewLengthParam newLength,
final NoRedirectParam noredirectParam
) throws IOException, URISyntaxException {
final ClientProtocol cp = getRpcClientProtocol();
switch(op.getValue()) {
case APPEND:
{
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(null, namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, -1L,
excludeDatanodes.getValue(), bufferSize);
if(!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case CONCAT:
{
cp.concat(fullpath, concatSrcs.getAbsolutePaths());
return Response.ok().build();
}
case TRUNCATE:
{
if (newLength.getValue() == null) {
throw new IllegalArgumentException(
"newLength parameter is Missing");
}
// We treat each rest request as a separate client.
final boolean b = cp.truncate(fullpath, newLength.getValue(),
"DFSClient_" + DFSUtil.getSecureRandom().nextLong());
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case UNSETSTORAGEPOLICY: {
cp.unsetStoragePolicy(fullpath);
return Response.ok().build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
}
/** Handle HTTP GET request for the root. */
@GET
@Path("/")
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response getRoot(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
final UserParam username,
@QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
final DoAsParam doAsUser,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op,
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
final OffsetParam offset,
@QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
final LengthParam length,
@QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT)
final RenewerParam renewer,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize,
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final List<XAttrNameParam> xattrNames,
@QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
final XAttrEncodingParam xattrEncoding,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes,
@QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT)
final FsActionParam fsAction,
@QueryParam(TokenKindParam.NAME) @DefaultValue(TokenKindParam.DEFAULT)
final TokenKindParam tokenKind,
@QueryParam(TokenServiceParam.NAME) @DefaultValue(TokenServiceParam.DEFAULT)
final TokenServiceParam tokenService,
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
final NoRedirectParam noredirect,
@QueryParam(StartAfterParam.NAME) @DefaultValue(StartAfterParam.DEFAULT)
final StartAfterParam startAfter
) throws IOException, InterruptedException {
return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes, fsAction,
tokenKind, tokenService, noredirect, startAfter);
}
/** Handle HTTP GET request. */
@GET
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response get(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
final UserParam username,
@QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
final DoAsParam doAsUser,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op,
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
final OffsetParam offset,
@QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
final LengthParam length,
@QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT)
final RenewerParam renewer,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize,
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final List<XAttrNameParam> xattrNames,
@QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
final XAttrEncodingParam xattrEncoding,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes,
@QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT)
final FsActionParam fsAction,
@QueryParam(TokenKindParam.NAME) @DefaultValue(TokenKindParam.DEFAULT)
final TokenKindParam tokenKind,
@QueryParam(TokenServiceParam.NAME) @DefaultValue(TokenServiceParam.DEFAULT)
final TokenServiceParam tokenService,
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
final NoRedirectParam noredirect,
@QueryParam(StartAfterParam.NAME) @DefaultValue(StartAfterParam.DEFAULT)
final StartAfterParam startAfter
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, offset, length,
renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction,
tokenKind, tokenService, startAfter);
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
return get(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind,
tokenService, noredirect, startAfter);
}
});
}
private static String encodeFeInfo(FileEncryptionInfo feInfo) {
String encodedValue = Base64.encodeBase64String(
PBHelperClient.convert(feInfo).toByteArray());
return encodedValue;
}
protected Response get(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
final DoAsParam doAsUser,
final String fullpath,
final GetOpParam op,
final OffsetParam offset,
final LengthParam length,
final RenewerParam renewer,
final BufferSizeParam bufferSize,
final List<XAttrNameParam> xattrNames,
final XAttrEncodingParam xattrEncoding,
final ExcludeDatanodesParam excludeDatanodes,
final FsActionParam fsAction,
final TokenKindParam tokenKind,
final TokenServiceParam tokenService,
final NoRedirectParam noredirectParam,
final StartAfterParam startAfter
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration) context
.getAttribute(JspHelper.CURRENT_CONF);
final ClientProtocol cp = getRpcClientProtocol();
switch(op.getValue()) {
case OPEN:
{
final NameNode namenode = (NameNode)context.getAttribute("name.node");
ResponseBuilder rb = Response.noContent();
final URI uri = redirectURI(rb, namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
excludeDatanodes.getValue(), offset, length, bufferSize);
if(!noredirectParam.getValue()) {
return rb.status(Status.TEMPORARY_REDIRECT).location(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return rb.status(Status.OK).entity(js).type(MediaType.APPLICATION_JSON)
.build();
}
}
case GETFILEBLOCKLOCATIONS:
{
final long offsetValue = offset.getValue();
final Long lengthValue = length.getValue();
FileSystem fs = FileSystem.get(conf != null ?
conf : new Configuration());
BlockLocation[] locations = fs.getFileBlockLocations(
new org.apache.hadoop.fs.Path(fullpath),
offsetValue,
lengthValue != null? lengthValue: Long.MAX_VALUE);
final String js = JsonUtil.toJsonString("BlockLocations",
JsonUtil.toJsonMap(locations));
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GET_BLOCK_LOCATIONS:
{
final long offsetValue = offset.getValue();
final Long lengthValue = length.getValue();
final LocatedBlocks locatedblocks = cp.getBlockLocations(fullpath,
offsetValue, lengthValue != null? lengthValue: Long.MAX_VALUE);
final String js = JsonUtil.toJsonString(locatedblocks);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETFILESTATUS:
{
final HdfsFileStatus status = cp.getFileInfo(fullpath);
if (status == null) {
throw new FileNotFoundException("File does not exist: " + fullpath);
}
final String js = JsonUtil.toJsonString(status, true);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case LISTSTATUS:
{
final StreamingOutput streaming = getListingStream(cp, fullpath);
return Response.ok(streaming).type(MediaType.APPLICATION_JSON).build();
}
case GETCONTENTSUMMARY:
{
final ContentSummary contentsummary = cp.getContentSummary(fullpath);
final String js = JsonUtil.toJsonString(contentsummary);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETQUOTAUSAGE:
{
final QuotaUsage quotaUsage = cp.getQuotaUsage(fullpath);
final String js = JsonUtil.toJsonString(quotaUsage);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETFILECHECKSUM:
{
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(null, namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, -1L, null);
if(!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case GETDELEGATIONTOKEN:
{
if (delegation.getValue() != null) {
throw new IllegalArgumentException(delegation.getName()
+ " parameter is not null.");
}
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final Token<? extends TokenIdentifier> token = generateDelegationToken(
namenode, ugi, renewer.getValue());
final String setServiceName = tokenService.getValue();
final String setKind = tokenKind.getValue();
if (setServiceName != null) {
token.setService(new Text(setServiceName));
}
if (setKind != null) {
token.setKind(new Text(setKind));
}
final String js = JsonUtil.toJsonString(token);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETHOMEDIRECTORY: {
String userHome = DFSUtilClient.getHomeDirectory(conf, ugi).toString();
final String js = JsonUtil.toJsonString("Path", userHome);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETACLSTATUS: {
AclStatus status = cp.getAclStatus(fullpath);
if (status == null) {
throw new FileNotFoundException("File does not exist: " + fullpath);
}
final String js = JsonUtil.toJsonString(status);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETXATTRS: {
List<String> names = null;
if (xattrNames != null) {
names = Lists.newArrayListWithCapacity(xattrNames.size());
for (XAttrNameParam xattrName : xattrNames) {
if (xattrName.getXAttrName() != null) {
names.add(xattrName.getXAttrName());
}
}
}
List<XAttr> xAttrs = cp.getXAttrs(fullpath, (names != null &&
!names.isEmpty()) ? XAttrHelper.buildXAttrs(names) : null);
final String js = JsonUtil.toJsonString(xAttrs,
xattrEncoding.getEncoding());
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case LISTXATTRS: {
final List<XAttr> xAttrs = cp.listXAttrs(fullpath);
final String js = JsonUtil.toJsonString(xAttrs);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case CHECKACCESS: {
cp.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue()));
return Response.ok().build();
}
case GETTRASHROOT: {
final String trashPath = getTrashRoot(fullpath, conf);
final String jsonStr = JsonUtil.toJsonString("Path", trashPath);
return Response.ok(jsonStr).type(MediaType.APPLICATION_JSON).build();
}
case LISTSTATUS_BATCH:
{
byte[] start = HdfsFileStatus.EMPTY_NAME;
if (startAfter != null && startAfter.getValue() != null) {
start = startAfter.getValue().getBytes(Charsets.UTF_8);
}
final DirectoryListing listing = getDirectoryListing(cp, fullpath, start);
final String js = JsonUtil.toJsonString(listing);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETALLSTORAGEPOLICY: {
BlockStoragePolicy[] storagePolicies = cp.getStoragePolicies();
final String js = JsonUtil.toJsonString(storagePolicies);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETSTORAGEPOLICY: {
BlockStoragePolicy storagePolicy = cp.getStoragePolicy(fullpath);
final String js = JsonUtil.toJsonString(storagePolicy);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETSERVERDEFAULTS: {
// Since none of the server defaults values are hot reloaded, we can
// cache the output of serverDefaults.
String serverDefaultsResponse =
(String) context.getAttribute("serverDefaults");
if (serverDefaultsResponse == null) {
FsServerDefaults serverDefaults = cp.getServerDefaults();
serverDefaultsResponse = JsonUtil.toJsonString(serverDefaults);
context.setAttribute("serverDefaults", serverDefaultsResponse);
}
return Response.ok(serverDefaultsResponse)
.type(MediaType.APPLICATION_JSON).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
}
private static String getTrashRoot(String fullPath,
Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf != null ? conf : new Configuration());
return fs.getTrashRoot(
new org.apache.hadoop.fs.Path(fullPath)).toUri().getPath();
}
private static DirectoryListing getDirectoryListing(final ClientProtocol cp,
final String p, byte[] startAfter) throws IOException {
final DirectoryListing listing = cp.getListing(p, startAfter, false);
if (listing == null) { // the directory does not exist
throw new FileNotFoundException("File " + p + " does not exist.");
}
return listing;
}
private static StreamingOutput getListingStream(final ClientProtocol cp,
final String p) throws IOException {
// allows exceptions like FNF or ACE to prevent http response of 200 for
// a failure since we can't (currently) return error responses in the
// middle of a streaming operation
final DirectoryListing firstDirList = getDirectoryListing(cp, p,
HdfsFileStatus.EMPTY_NAME);
// must save ugi because the streaming object will be executed outside
// the remote user's ugi
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
return new StreamingOutput() {
@Override
public void write(final OutputStream outstream) throws IOException {
final PrintWriter out = new PrintWriter(new OutputStreamWriter(
outstream, Charsets.UTF_8));
out.println("{\"" + FileStatus.class.getSimpleName() + "es\":{\""
+ FileStatus.class.getSimpleName() + "\":[");
try {
// restore remote user's ugi
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException {
long n = 0;
for (DirectoryListing dirList = firstDirList; ;
dirList = getDirectoryListing(cp, p, dirList.getLastName())
) {
// send each segment of the directory listing
for (HdfsFileStatus s : dirList.getPartialListing()) {
if (n++ > 0) {
out.println(',');
}
out.print(JsonUtil.toJsonString(s, false));
}
// stop if last segment
if (!dirList.hasMore()) {
break;
}
}
return null;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
out.println();
out.println("]}}");
out.flush();
}
};
}
/** Handle HTTP DELETE request for the root. */
@DELETE
@Path("/")
@Produces(MediaType.APPLICATION_JSON)
public Response deleteRoot(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
final UserParam username,
@QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
final DoAsParam doAsUser,
@QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
final DeleteOpParam op,
@QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT)
final RecursiveParam recursive,
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName
) throws IOException, InterruptedException {
return delete(ugi, delegation, username, doAsUser, ROOT, op, recursive,
snapshotName);
}
/** Handle HTTP DELETE request. */
@DELETE
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Produces(MediaType.APPLICATION_JSON)
public Response delete(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
final UserParam username,
@QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
final DoAsParam doAsUser,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
final DeleteOpParam op,
@QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT)
final RecursiveParam recursive,
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, recursive, snapshotName);
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException {
return delete(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, recursive, snapshotName);
}
});
}
protected Response delete(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
final DoAsParam doAsUser,
final String fullpath,
final DeleteOpParam op,
final RecursiveParam recursive,
final SnapshotNameParam snapshotName
) throws IOException {
final ClientProtocol cp = getRpcClientProtocol();
switch(op.getValue()) {
case DELETE: {
final boolean b = cp.delete(fullpath, recursive.getValue());
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case DELETESNAPSHOT: {
cp.deleteSnapshot(fullpath, snapshotName.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
}
}