/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_KEY;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Base64.Decoder;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.HdfsKMSUtil;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FileEncryptionInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.JsonSerialization;
import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* A FileSystem for HDFS over the web.
*
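* <p>A minimal usage sketch (the URI, port, and path are illustrative; the
* scheme is {@code webhdfs} and the port is the namenode HTTP port):
* <pre>{@code
* Configuration conf = new Configuration();
* FileSystem fs = FileSystem.get(URI.create("webhdfs://nn:9870/"), conf);
* FileStatus status = fs.getFileStatus(new Path("/tmp/example.txt"));
* }</pre>
*/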
public class WebHdfsFileSystem extends FileSystem
implements DelegationTokenRenewer.Renewable,
TokenAspect.TokenManagementDelegator, KeyProviderTokenIssuer {
public static final Logger LOG = LoggerFactory
.getLogger(WebHdfsFileSystem.class);
/** WebHdfs version. */
public static final int VERSION = 1;
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME
+ "/v" + VERSION;
public static final String EZ_HEADER = "X-Hadoop-Accept-EZ";
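/** Response header presumed to carry the Base64-encoded file encryption
* info for a path, if any. */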
public static final String FEFINFO_HEADER = "X-Hadoop-feInfo";
/**
* Default connection factory may be overridden in tests to use smaller
* timeout values.
*/
protected URLConnectionFactory connectionFactory;
@VisibleForTesting
public static final String CANT_FALLBACK_TO_INSECURE_MSG =
"The client is configured to only allow connecting to secure cluster";
private boolean canRefreshDelegationToken;
private UserGroupInformation ugi;
private URI uri;
private Token<?> delegationToken;
protected Text tokenServiceName;
private RetryPolicy retryPolicy = null;
private Path workingDir;
private Path cachedHomeDirectory;
private InetSocketAddress nnAddrs[];
private int currentNNAddrIndex;
private boolean disallowFallbackToInsecureCluster;
private boolean isInsecureCluster;
private String restCsrfCustomHeader;
private Set<String> restCsrfMethodsToIgnore;
private DFSOpsCountStatistics storageStatistics;
private KeyProvider testProvider;
/**
* Return the protocol scheme for the FileSystem.
* <p/>
*
* @return <code>webhdfs</code>
*/
@Override
public String getScheme() {
return WebHdfsConstants.WEBHDFS_SCHEME;
}
/**
* return the underlying transport protocol (http / https).
*/
protected String getTransportScheme() {
return "http";
}
protected Text getTokenKind() {
return WebHdfsConstants.WEBHDFS_TOKEN_KIND;
}
@Override
public synchronized void initialize(URI uri, Configuration conf
) throws IOException {
super.initialize(uri, conf);
setConf(conf);
// set user and acl patterns based on configuration file
UserParam.setUserPattern(conf.get(
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
AclPermissionParam.setAclPermissionPattern(conf.get(
HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_KEY,
HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT));
int connectTimeout = (int) conf.getTimeDuration(
HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_CONNECT_TIMEOUT_KEY,
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
TimeUnit.MILLISECONDS);
int readTimeout = (int) conf.getTimeDuration(
HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_READ_TIMEOUT_KEY,
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
TimeUnit.MILLISECONDS);
boolean isOAuth = conf.getBoolean(
HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY,
HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT);
if(isOAuth) {
LOG.debug("Enabling OAuth2 in WebHDFS");
connectionFactory = URLConnectionFactory
.newOAuth2URLConnectionFactory(connectTimeout, readTimeout, conf);
} else {
LOG.debug("Not enabling OAuth2 in WebHDFS");
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(connectTimeout, readTimeout, conf);
}
ugi = UserGroupInformation.getCurrentUser();
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.nnAddrs = resolveNNAddr();
boolean isHA = HAUtilClient.isClientFailoverConfigured(conf, this.uri);
boolean isLogicalUri = isHA && HAUtilClient.isLogicalUri(conf, this.uri);
// In non-HA or non-logical URI case, the code needs to call
// getCanonicalUri() in order to handle the case where no port is
// specified in the URI
this.tokenServiceName = isLogicalUri ?
HAUtilClient.buildTokenServiceForLogicalUri(uri, getScheme())
: SecurityUtil.buildTokenService(getCanonicalUri());
if (!isHA) {
this.retryPolicy =
RetryUtils.getDefaultRetryPolicy(
conf,
HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_KEY,
HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_DEFAULT,
HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_KEY,
HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_DEFAULT,
HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
} else {
int maxFailoverAttempts = conf.getInt(
HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = conf.getInt(
HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_DEFAULT);
int failoverSleepBaseMillis = conf.getInt(
HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_KEY,
HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_DEFAULT);
int failoverSleepMaxMillis = conf.getInt(
HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_KEY,
HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_DEFAULT);
this.retryPolicy = RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis,
failoverSleepMaxMillis);
}
this.workingDir = makeQualified(new Path(getHomeDirectoryString(ugi)));
this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
this.isInsecureCluster = !this.canRefreshDelegationToken;
this.disallowFallbackToInsecureCluster = !conf.getBoolean(
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.initializeRestCsrf(conf);
this.delegationToken = null;
storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE
.put(DFSOpsCountStatistics.NAME,
new StorageStatisticsProvider() {
@Override
public StorageStatistics provide() {
return new DFSOpsCountStatistics();
}
});
}
/**
* Initializes client-side handling of cross-site request forgery (CSRF)
* protection by figuring out the custom HTTP headers that need to be sent in
* requests and which HTTP methods are ignored because they do not require
* CSRF protection.
*
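* <p>A configuration sketch (the header value and method list are
* illustrative; the keys are the constants imported at the top of this file):
* <pre>{@code
* Configuration conf = new Configuration();
* conf.setBoolean(DFS_WEBHDFS_REST_CSRF_ENABLED_KEY, true);
* conf.set(DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_KEY, "X-XSRF-HEADER");
* conf.set(DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_KEY, "GET,OPTIONS,HEAD,TRACE");
* }</pre>
*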
* @param conf configuration to read
*/
private void initializeRestCsrf(Configuration conf) {
if (conf.getBoolean(DFS_WEBHDFS_REST_CSRF_ENABLED_KEY,
DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT)) {
this.restCsrfCustomHeader = conf.getTrimmed(
DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_KEY,
DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_DEFAULT);
this.restCsrfMethodsToIgnore = new HashSet<>();
this.restCsrfMethodsToIgnore.addAll(getTrimmedStringList(conf,
DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_KEY,
DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_DEFAULT));
} else {
this.restCsrfCustomHeader = null;
this.restCsrfMethodsToIgnore = null;
}
}
/**
* Returns a list of strings from a comma-delimited configuration value.
*
* @param conf configuration to check
* @param name configuration property name
* @param defaultValue default value if no value found for name
* @return list of strings from comma-delimited configuration value, or an
* empty list if not found
*/
private static List<String> getTrimmedStringList(Configuration conf,
String name, String defaultValue) {
String valueString = conf.get(name, defaultValue);
if (valueString == null) {
return new ArrayList<>();
}
return new ArrayList<>(StringUtils.getTrimmedStringCollection(valueString));
}
@Override
public URI getCanonicalUri() {
return super.getCanonicalUri();
}
TokenSelector<DelegationTokenIdentifier> tokenSelector =
new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){};
// the first getAuthParams() for a non-token op will either get the
// internal token from the ugi or lazy fetch one
protected synchronized Token<?> getDelegationToken() throws IOException {
if (delegationToken == null) {
Token<?> token = tokenSelector.selectToken(
new Text(getCanonicalServiceName()), ugi.getTokens());
// ugi tokens are usually indicative of a task which can't
// refetch tokens. even if ugi has credentials, don't attempt
// to get another token to match hdfs/rpc behavior
if (token != null) {
LOG.debug("Using UGI token: {}", token);
canRefreshDelegationToken = false;
} else {
if (canRefreshDelegationToken) {
token = getDelegationToken(null);
if (token != null) {
LOG.debug("Fetched new token: {}", token);
} else { // security is disabled
canRefreshDelegationToken = false;
isInsecureCluster = true;
}
}
}
setDelegationToken(token);
}
return delegationToken;
}
@VisibleForTesting
synchronized boolean replaceExpiredDelegationToken() throws IOException {
boolean replaced = false;
if (canRefreshDelegationToken) {
Token<?> token = getDelegationToken(null);
LOG.debug("Replaced expired token: {}", token);
setDelegationToken(token);
replaced = (token != null);
}
return replaced;
}
@Override
protected int getDefaultPort() {
return HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
}
@Override
public URI getUri() {
return this.uri;
}
@Override
protected URI canonicalizeUri(URI uri) {
return NetUtils.getCanonicalUri(uri, getDefaultPort());
}
/** @return the home directory */
@Deprecated
public static String getHomeDirectoryString(final UserGroupInformation ugi) {
return "/user/" + ugi.getShortUserName();
}
@Override
public Path getHomeDirectory() {
if (cachedHomeDirectory == null) {
final HttpOpParam.Op op = GetOpParam.Op.GETHOMEDIRECTORY;
try {
String pathFromDelegatedFS = new FsPathResponseRunner<String>(op, null){
@Override
String decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.getPath(json);
}
}.run();
cachedHomeDirectory = new Path(pathFromDelegatedFS).makeQualified(
this.getUri(), null);
} catch (IOException e) {
LOG.error("Unable to get HomeDirectory from original File System", e);
cachedHomeDirectory = new Path("/user/" + ugi.getShortUserName())
.makeQualified(this.getUri(), null);
}
}
return cachedHomeDirectory;
}
@Override
public synchronized Path getWorkingDirectory() {
return workingDir;
}
@Override
public synchronized void setWorkingDirectory(final Path dir) {
Path absolutePath = makeAbsolute(dir);
String result = absolutePath.toUri().getPath();
if (!DFSUtilClient.isValidName(result)) {
throw new IllegalArgumentException("Invalid DFS directory name " +
result);
}
workingDir = absolutePath;
}
private Path makeAbsolute(Path f) {
return f.isAbsolute()? f: new Path(workingDir, f);
}
static Map<?, ?> jsonParse(final HttpURLConnection c,
final boolean useErrorStream) throws IOException {
if (c.getContentLength() == 0) {
return null;
}
final InputStream in = useErrorStream ?
c.getErrorStream() : c.getInputStream();
if (in == null) {
throw new IOException("The " + (useErrorStream? "error": "input") +
" stream is null.");
}
try {
final String contentType = c.getContentType();
if (contentType != null) {
final MediaType parsed = MediaType.valueOf(contentType);
if (!MediaType.APPLICATION_JSON_TYPE.isCompatible(parsed)) {
throw new IOException("Content-Type \"" + contentType
+ "\" is incompatible with \"" + MediaType.APPLICATION_JSON
+ "\" (parsed=\"" + parsed + "\")");
}
}
return JsonSerialization.mapReader().readValue(in);
} finally {
in.close();
}
}
private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
final HttpURLConnection conn, boolean unwrapException)
throws IOException {
final int code = conn.getResponseCode();
// server is demanding an authentication scheme we don't support
if (code == HttpURLConnection.HTTP_UNAUTHORIZED) {
// match hdfs/rpc exception
throw new AccessControlException(conn.getResponseMessage());
}
if (code != op.getExpectedHttpResponseCode()) {
final Map<?, ?> m;
try {
m = jsonParse(conn, true);
} catch(Exception e) {
throw new IOException("Unexpected HTTP response: code=" + code + " != "
+ op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
+ ", message=" + conn.getResponseMessage(), e);
}
if (m == null) {
throw new IOException("Unexpected HTTP response: code=" + code + " != "
+ op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
+ ", message=" + conn.getResponseMessage());
} else if (m.get(RemoteException.class.getSimpleName()) == null) {
return m;
}
IOException re = JsonUtilClient.toRemoteException(m);
//check if exception is due to communication with a Standby name node
if (re.getMessage() != null && re.getMessage().endsWith(
StandbyException.class.getSimpleName())) {
LOG.trace("Detected StandbyException", re);
throw new IOException(re);
}
// extract UGI-related exceptions and unwrap InvalidToken
// the NN mangles these exceptions but the DN does not, and the client
// may need to re-fetch a token if either reports that the token is expired
if (re.getMessage() != null && re.getMessage().startsWith(
SecurityUtil.FAILED_TO_GET_UGI_MSG_HEADER)) {
String[] parts = re.getMessage().split(":\\s+", 3);
re = new RemoteException(parts[1], parts[2]);
re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class);
}
throw unwrapException? toIOException(re): re;
}
return null;
}
/**
* Convert an exception to an IOException.
*
* For a non-IOException, wrap it with IOException.
* For a RemoteException, unwrap it.
* For an IOException which is not a RemoteException, return it.
*/
private static IOException toIOException(Exception e) {
if (!(e instanceof IOException)) {
return new IOException(e);
}
final IOException ioe = (IOException)e;
if (!(ioe instanceof RemoteException)) {
return ioe;
}
return ((RemoteException)ioe).unwrapRemoteException();
}
private synchronized InetSocketAddress getCurrentNNAddr() {
return nnAddrs[currentNNAddrIndex];
}
/**
* Reset the appropriate state to gracefully fail over to another name node
*/
private synchronized void resetStateToFailOver() {
currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;
}
/**
* Return a URL pointing to given path on the namenode.
*
* @param path to obtain the URL for
* @param query string to append to the path
* @return namenode URL referring to the given path
* @throws IOException on error constructing the URL
*/
private URL getNamenodeURL(String path, String query) throws IOException {
InetSocketAddress nnAddr = getCurrentNNAddr();
final URL url = new URL(getTransportScheme(), nnAddr.getHostName(),
nnAddr.getPort(), path + '?' + query);
LOG.trace("url={}", url);
return url;
}
private synchronized Param<?, ?>[] getAuthParameters(final HttpOpParam.Op op)
throws IOException {
List<Param<?,?>> authParams = Lists.newArrayList();
// Skip adding delegation token for token operations because these
// operations require authentication.
Token<?> token = null;
if (!op.getRequireAuth()) {
token = getDelegationToken();
}
if (token != null) {
authParams.add(new DelegationParam(token.encodeToUrlString()));
} else {
UserGroupInformation userUgi = ugi;
UserGroupInformation realUgi = userUgi.getRealUser();
if (realUgi != null) { // proxy user
authParams.add(new DoAsParam(userUgi.getShortUserName()));
userUgi = realUgi;
}
UserParam userParam = new UserParam((userUgi.getShortUserName()));
//in insecure, use user.name parameter, in secure, use spnego auth
if(isInsecureCluster) {
authParams.add(userParam);
}
}
return authParams.toArray(new Param<?,?>[0]);
}
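/**
* Build the namenode URL for an operation on a path, combining the
* {@code PATH_PREFIX}, the operation's query string, the auth parameters,
* and any extra parameters. For example (host, port, and user illustrative),
* a GETFILESTATUS on {@code /foo} against an insecure cluster could yield
* {@code http://nn:9870/webhdfs/v1/foo?op=GETFILESTATUS&user.name=alice}.
*/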
URL toUrl(final HttpOpParam.Op op, final Path fspath,
final Param<?,?>... parameters) throws IOException {
//initialize URI path and query
final String path = PATH_PREFIX
+ (fspath == null? "/": makeQualified(fspath).toUri().getRawPath());
final String query = op.toQueryString()
+ Param.toSortedString("&", getAuthParameters(op))
+ Param.toSortedString("&", parameters);
final URL url = getNamenodeURL(path, query);
LOG.trace("url={}", url);
return url;
}
/**
* This class is for initializing an HTTP connection, connecting to the server,
* obtaining a response, and also handling retry on failures.
*/
abstract class AbstractRunner<T> {
abstract protected URL getUrl() throws IOException;
protected final HttpOpParam.Op op;
private final boolean redirected;
protected ExcludeDatanodesParam excludeDatanodes =
new ExcludeDatanodesParam("");
private boolean checkRetry;
private String redirectHost;
private boolean followRedirect = true;
protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
this.op = op;
this.redirected = redirected;
}
protected AbstractRunner(final HttpOpParam.Op op, boolean redirected,
boolean followRedirect) {
this(op, redirected);
this.followRedirect = followRedirect;
}
T run() throws IOException {
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
}
if (op.getRequireAuth()) {
connectUgi.checkTGTAndReloginFromKeytab();
}
try {
// the entire lifecycle of the connection must be run inside the
// doAs to ensure authentication is performed correctly
return connectUgi.doAs(
new PrivilegedExceptionAction<T>() {
@Override
public T run() throws IOException {
return runWithRetry();
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
}
/**
* Two-step requests redirected to a DN
*
* Create/Append:
* Step 1) Submit an HTTP request with neither auto-redirect nor data.
* Step 2) Submit another HTTP request with the URL from the Location header,
* this time with data.
*
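* Schematically (hosts, ports, and the file path are illustrative; the
* redirect uses HTTP 307 per the WebHDFS REST convention):
* <pre>
* PUT http://nn:9870/webhdfs/v1/foo?op=CREATE         -> 307, Location: http://dn:9864/...
* PUT http://dn:9864/webhdfs/v1/foo?op=CREATE  (data) -> 201 Created
* </pre>
*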
* The reason for having a two-step create/append is to prevent clients from
* sending out data before the redirect. This issue is addressed by the
* "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
* Unfortunately, there are software library bugs (e.g. Jetty 6 http server
* and Java 6 http client), which do not correctly implement "Expect:
* 100-continue". The two-step create/append is a temporary workaround for
* the software library bugs.
*
* Open/Checksum:
* Two-step connects are also implemented for other operations redirected
* to a DN, such as open and checksum.
*/
protected HttpURLConnection connect(URL url) throws IOException {
//redirect hostname and port
redirectHost = null;
// resolve redirects for a DN operation unless already resolved
if (op.getRedirect() && !redirected) {
final HttpOpParam.Op redirectOp =
HttpOpParam.TemporaryRedirectOp.valueOf(op);
final HttpURLConnection conn = connect(redirectOp, url);
// an application-level proxy like HttpFS might not issue a redirect
if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
return conn;
}
try {
validateResponse(redirectOp, conn, false);
url = new URL(conn.getHeaderField("Location"));
redirectHost = url.getHost() + ":" + url.getPort();
} finally {
// TODO: consider not calling conn.disconnect() to allow connection reuse
// See http://tinyurl.com/java7-http-keepalive
conn.disconnect();
}
if (!followRedirect) {
return conn;
}
}
try {
final HttpURLConnection conn = connect(op, url);
// output streams will validate on close
if (!op.getDoOutput()) {
validateResponse(op, conn, false);
}
return conn;
} catch (IOException ioe) {
if (redirectHost != null) {
if (excludeDatanodes.getValue() != null) {
excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+ excludeDatanodes.getValue());
} else {
excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
}
}
throw ioe;
}
}
private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
throws IOException {
final HttpURLConnection conn =
(HttpURLConnection)connectionFactory.openConnection(url);
final boolean doOutput = op.getDoOutput();
conn.setRequestMethod(op.getType().toString());
conn.setInstanceFollowRedirects(false);
if (restCsrfCustomHeader != null &&
!restCsrfMethodsToIgnore.contains(op.getType().name())) {
// The value of the header is unimportant. Only its presence matters.
conn.setRequestProperty(restCsrfCustomHeader, "\"\"");
}
conn.setRequestProperty(EZ_HEADER, "true");
switch (op.getType()) {
// if not sending a message body for a POST or PUT operation, we need
// to ensure the server/proxy knows this
case POST:
case PUT: {
conn.setDoOutput(true);
if (!doOutput) {
// explicitly setting Content-Length to 0 would not trigger SPNEGO;
// opening and closing the stream will send "Content-Length: 0"
conn.getOutputStream().close();
} else {
conn.setRequestProperty("Content-Type",
MediaType.APPLICATION_OCTET_STREAM);
conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
}
break;
}
default:
conn.setDoOutput(doOutput);
break;
}
conn.connect();
return conn;
}
private T runWithRetry() throws IOException {
/**
* Do the real work.
*
* There are three cases in which the code inside the loop can throw an
* IOException:
*
* <ul>
* <li>The connection has failed (e.g., ConnectException,
* @see FailoverOnNetworkExceptionRetry for more details)</li>
* <li>The namenode enters the standby state (i.e., StandbyException).</li>
* <li>The server returns errors for the command (i.e., RemoteException)</li>
* </ul>
*
* The call to shouldRetry() consults the retry policy. The policy
* examines the exception and swallows it if it decides to rerun the work.
*/
for(int retry = 0; ; retry++) {
checkRetry = !redirected;
final URL url = getUrl();
try {
final HttpURLConnection conn = connect(url);
return getResponse(conn);
} catch (AccessControlException ace) {
// no retries for auth failures
throw ace;
} catch (InvalidToken it) {
// try to replace the expired token with a new one. the attempt
// to acquire a new token must be outside this operation's retry
// so if it fails after its own retries, this operation fails too.
if (op.getRequireAuth() || !replaceExpiredDelegationToken()) {
throw it;
}
} catch (IOException ioe) {
// Attempt to include the redirected node in the exception. If the
// attempt to recreate the exception fails, just use the original.
String node = redirectHost;
if (node == null) {
node = url.getAuthority();
}
try {
IOException newIoe = ioe.getClass().getConstructor(String.class)
.newInstance(node + ": " + ioe.getMessage());
newIoe.initCause(ioe.getCause());
newIoe.setStackTrace(ioe.getStackTrace());
ioe = newIoe;
} catch (NoSuchMethodException | SecurityException
| InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
}
shouldRetry(ioe, retry);
}
}
}
private void shouldRetry(final IOException ioe, final int retry
) throws IOException {
InetSocketAddress nnAddr = getCurrentNNAddr();
if (checkRetry) {
try {
final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
ioe, retry, 0, true);
boolean isRetry =
a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
boolean isFailoverAndRetry =
a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
if (isRetry || isFailoverAndRetry) {
LOG.info("Retrying connect to namenode: {}. Already retried {}"
+ " time(s); retry policy is {}, delay {}ms.",
nnAddr, retry, retryPolicy, a.delayMillis);
if (isFailoverAndRetry) {
resetStateToFailOver();
}
Thread.sleep(a.delayMillis);
return;
}
} catch(Exception e) {
LOG.warn("Original exception is ", ioe);
throw toIOException(e);
}
}
throw toIOException(ioe);
}
abstract T getResponse(HttpURLConnection conn) throws IOException;
}
/**
* Abstract base class to handle path-based operations with params
*/
abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
private final Path fspath;
private Param<?,?>[] parameters;
AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
Param<?,?>... parameters) {
super(op, false);
this.fspath = fspath;
this.parameters = parameters;
}
AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
final Path fspath) {
super(op, false);
this.fspath = fspath;
this.parameters = parameters;
}
protected void updateURLParameters(Param<?, ?>... p) {
this.parameters = p;
}
@Override
protected URL getUrl() throws IOException {
if (excludeDatanodes.getValue() != null) {
Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1];
System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
tmpParam[parameters.length] = excludeDatanodes;
return toUrl(op, fspath, tmpParam);
} else {
return toUrl(op, fspath, parameters);
}
}
Path getFspath() {
return fspath;
}
}
/**
* Default path-based implementation expects no json response
*/
class FsPathRunner extends AbstractFsPathRunner<Void> {
FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
super(op, fspath, parameters);
}
@Override
Void getResponse(HttpURLConnection conn) throws IOException {
return null;
}
}
/**
* Handle path-based operations with a json response
*/
abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
Param<?,?>... parameters) {
super(op, fspath, parameters);
}
FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
final Path fspath) {
super(op, parameters, fspath);
}
@Override
final T getResponse(HttpURLConnection conn) throws IOException {
try {
final Map<?,?> json = jsonParse(conn, false);
if (json == null) {
// match exception class thrown by parser
throw new IllegalStateException("Missing response");
}
return decodeResponse(json);
} catch (IOException ioe) {
throw ioe;
} catch (Exception e) { // catch json parser errors
final IOException ioe =
new IOException("Response decoding failure: "+e.toString(), e);
LOG.debug("Response decoding failure.", e);
throw ioe;
} finally {
// Don't call conn.disconnect() to allow connection reuse
// See http://tinyurl.com/java7-http-keepalive
conn.getInputStream().close();
}
}
abstract T decodeResponse(Map<?,?> json) throws IOException;
}
/**
* Handle path-based operations with json boolean response
*/
class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
super(op, fspath, parameters);
}
@Override
Boolean decodeResponse(Map<?,?> json) throws IOException {
return (Boolean)json.get("boolean");
}
}
/**
* Handle create/append output streams
*/
class FsPathOutputStreamRunner
extends AbstractFsPathRunner<FSDataOutputStream> {
private final int bufferSize;
FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
Param<?,?>... parameters) {
super(op, fspath, parameters);
this.bufferSize = bufferSize;
}
@Override
FSDataOutputStream getResponse(final HttpURLConnection conn)
throws IOException {
return new FSDataOutputStream(new BufferedOutputStream(
conn.getOutputStream(), bufferSize), statistics) {
@Override
public void write(int b) throws IOException {
try {
super.write(b);
} catch (IOException e) {
LOG.warn("Write to output stream for file '{}' failed. "
+ "Attempting to fetch the cause from the connection.",
getFspath(), e);
validateResponse(op, conn, true);
throw e;
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
try {
super.write(b, off, len);
} catch (IOException e) {
LOG.warn("Write to output stream for file '{}' failed. "
+ "Attempting to fetch the cause from the connection.",
getFspath(), e);
validateResponse(op, conn, true);
throw e;
}
}
@Override
public void close() throws IOException {
try {
super.close();
} finally {
try {
validateResponse(op, conn, true);
} finally {
// This is a connection to DataNode. Let's disconnect since
// there is little chance that the connection will be reused
// any time soon.
conn.disconnect();
}
}
}
};
}
}
class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
FsPathConnectionRunner(Op op, Path fspath, Param<?,?>... parameters) {
super(op, fspath, parameters);
}
@Override
HttpURLConnection getResponse(final HttpURLConnection conn)
throws IOException {
return conn;
}
}
/**
* Used by open() which tracks the resolved url itself
*/
class URLRunner extends AbstractRunner<HttpURLConnection> {
private final URL url;
@Override
protected URL getUrl() throws IOException {
return url;
}
protected URLRunner(final HttpOpParam.Op op, final URL url,
boolean redirected, boolean followRedirect) {
super(op, redirected, followRedirect);
this.url = url;
}
@Override
HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
return conn;
}
}
private FsPermission applyUMask(FsPermission permission) {
if (permission == null) {
permission = FsPermission.getDefault();
}
return FsCreateModes.applyUMask(permission,
FsPermission.getUMask(getConf()));
}
private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
@Override
HdfsFileStatus decodeResponse(Map<?,?> json) {
return JsonUtilClient.toFileStatus(json, true);
}
}.run();
if (status == null) {
throw new FileNotFoundException("File does not exist: " + f);
}
return status;
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS);
return getHdfsFileStatus(f).makeQualified(getUri(), f);
}
@Override
public AclStatus getAclStatus(Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
@Override
AclStatus decodeResponse(Map<?,?> json) {
return JsonUtilClient.toAclStatus(json);
}
}.run();
if (status == null) {
throw new FileNotFoundException("File does not exist: " + f);
}
return status;
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.MKDIRS);
final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
final FsPermission modes = applyUMask(permission);
return new FsPathBooleanRunner(op, f,
new PermissionParam(modes.getMasked()),
new UnmaskedPermissionParam(modes.getUnmasked())
).run();
}
/**
* Create a symlink pointing to the destination path.
*/
public void createSymlink(Path destination, Path f, boolean createParent
) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK);
final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
new FsPathRunner(op, f,
new DestinationParam(makeQualified(destination).toUri().getPath()),
new CreateParentParam(createParent)
).run();
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.RENAME);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
return new FsPathBooleanRunner(op, src,
new DestinationParam(makeQualified(dst).toUri().getPath())
).run();
}
@SuppressWarnings("deprecation")
@Override
public void rename(final Path src, final Path dst,
final Options.Rename... options) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.RENAME);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
new FsPathRunner(op, src,
new DestinationParam(makeQualified(dst).toUri().getPath()),
new RenameOptionSetParam(options)
).run();
}
@Override
public void setXAttr(Path p, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_XATTR);
final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
if (value != null) {
new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
XAttrCodec.encodeValue(value, XAttrCodec.HEX)),
new XAttrSetFlagParam(flag)).run();
} else {
new FsPathRunner(op, p, new XAttrNameParam(name),
new XAttrSetFlagParam(flag)).run();
}
}
@Override
public byte[] getXAttr(Path p, final String name) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_XATTR);
final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
new XAttrEncodingParam(XAttrCodec.HEX)) {
@Override
byte[] decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.getXAttr(json);
}
}.run();
}
@Override
public Map<String, byte[]> getXAttrs(Path p) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
return new FsPathResponseRunner<Map<String, byte[]>>(op, p,
new XAttrEncodingParam(XAttrCodec.HEX)) {
@Override
Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toXAttrs(json);
}
}.run();
}
@Override
public Map<String, byte[]> getXAttrs(Path p, final List<String> names)
throws IOException {
Preconditions.checkArgument(names != null && !names.isEmpty(),
"XAttr names cannot be null or empty.");
Param<?,?>[] parameters = new Param<?,?>[names.size() + 1];
for (int i = 0; i < parameters.length - 1; i++) {
parameters[i] = new XAttrNameParam(names.get(i));
}
parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX);
final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
@Override
Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toXAttrs(json);
}
}.run();
}
@Override
public List<String> listXAttrs(Path p) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS;
return new FsPathResponseRunner<List<String>>(op, p) {
@Override
List<String> decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toXAttrNames(json);
}
}.run();
}
@Override
public void removeXAttr(Path p, String name) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR;
new FsPathRunner(op, p, new XAttrNameParam(name)).run();
}
@Override
public void setOwner(final Path p, final String owner, final String group
) throws IOException {
if (owner == null && group == null) {
throw new IOException("owner == null && group == null");
}
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_OWNER);
final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
new FsPathRunner(op, p,
new OwnerParam(owner), new GroupParam(group)
).run();
}
@Override
public void setPermission(final Path p, final FsPermission permission
) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_PERMISSION);
final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
new FsPathRunner(op, p, new PermissionParam(permission)).run();
}
@Override
public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES);
final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@Override
public void removeAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@Override
public void removeDefaultAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
new FsPathRunner(op, path).run();
}
@Override
public void removeAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_ACL);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
new FsPathRunner(op, path).run();
}
@Override
public void setAcl(final Path p, final List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_ACL);
final HttpOpParam.Op op = PutOpParam.Op.SETACL;
new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
}
public void allowSnapshot(final Path p) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.ALLOW_SNAPSHOT);
final HttpOpParam.Op op = PutOpParam.Op.ALLOWSNAPSHOT;
new FsPathRunner(op, p).run();
}
public void enableECPolicy(String policyName) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.ENABLE_EC_POLICY);
final HttpOpParam.Op op = PutOpParam.Op.ENABLEECPOLICY;
new FsPathRunner(op, null, new ECPolicyParam(policyName)).run();
}
public void disableECPolicy(String policyName) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.DISABLE_EC_POLICY);
final HttpOpParam.Op op = PutOpParam.Op.DISABLEECPOLICY;
new FsPathRunner(op, null, new ECPolicyParam(policyName)).run();
}
public void setErasureCodingPolicy(Path p, String policyName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_EC_POLICY);
final HttpOpParam.Op op = PutOpParam.Op.SETECPOLICY;
new FsPathRunner(op, p, new ECPolicyParam(policyName)).run();
}
public void unsetErasureCodingPolicy(Path p) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.UNSET_EC_POLICY);
final HttpOpParam.Op op = PostOpParam.Op.UNSETECPOLICY;
new FsPathRunner(op, p).run();
}
public ErasureCodingPolicy getErasureCodingPolicy(Path p)
throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_EC_POLICY);
final HttpOpParam.Op op = GetOpParam.Op.GETECPOLICY;
return new FsPathResponseRunner<ErasureCodingPolicy>(op, p) {
@Override
ErasureCodingPolicy decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toECPolicy(json);
}
}.run();
}
@Override
public Path createSnapshot(final Path path, final String snapshotName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT);
final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
return new FsPathResponseRunner<Path>(op, path,
new SnapshotNameParam(snapshotName)) {
@Override
Path decodeResponse(Map<?,?> json) {
return new Path((String) json.get(Path.class.getSimpleName()));
}
}.run();
}
public void disallowSnapshot(final Path p) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.DISALLOW_SNAPSHOT);
final HttpOpParam.Op op = PutOpParam.Op.DISALLOWSNAPSHOT;
new FsPathRunner(op, p).run();
}
@Override
public void deleteSnapshot(final Path path, final String snapshotName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT);
final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT;
new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run();
}
@Override
public void renameSnapshot(final Path path, final String snapshotOldName,
final String snapshotNewName) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT);
final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT;
new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName),
new SnapshotNameParam(snapshotNewName)).run();
}
public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
final String fromSnapshot, final String toSnapshot) throws IOException {
storageStatistics.incrementOpCounter(OpType.GET_SNAPSHOT_DIFF);
final HttpOpParam.Op op = GetOpParam.Op.GETSNAPSHOTDIFF;
return new FsPathResponseRunner<SnapshotDiffReport>(op, snapshotDir,
new OldSnapshotNameParam(fromSnapshot),
new SnapshotNameParam(toSnapshot)) {
@Override
SnapshotDiffReport decodeResponse(Map<?, ?> json) {
return JsonUtilClient.toSnapshotDiffReport(json);
}
}.run();
}
public SnapshottableDirectoryStatus[] getSnapshottableDirectoryList()
throws IOException {
storageStatistics
.incrementOpCounter(OpType.GET_SNAPSHOTTABLE_DIRECTORY_LIST);
final HttpOpParam.Op op = GetOpParam.Op.GETSNAPSHOTTABLEDIRECTORYLIST;
return new FsPathResponseRunner<SnapshottableDirectoryStatus[]>(op, null) {
@Override
SnapshottableDirectoryStatus[] decodeResponse(Map<?, ?> json) {
return JsonUtilClient.toSnapshottableDirectoryList(json);
}
}.run();
}
@Override
public boolean setReplication(final Path p, final short replication
) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_REPLICATION);
final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
return new FsPathBooleanRunner(op, p,
new ReplicationParam(replication)
).run();
}
@Override
public void setTimes(final Path p, final long mtime, final long atime
) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_TIMES);
final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
new FsPathRunner(op, p,
new ModificationTimeParam(mtime),
new AccessTimeParam(atime)
).run();
}
@Override
public long getDefaultBlockSize() {
return getConf().getLongBytes(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY,
HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
}
@Override
public short getDefaultReplication() {
return (short)getConf().getInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY,
HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT);
}
@Override
public void concat(final Path trg, final Path [] srcs) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CONCAT);
final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
}
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
final boolean overwrite, final int bufferSize, final short replication,
final long blockSize, final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE);
final FsPermission modes = applyUMask(permission);
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
return new FsPathOutputStreamRunner(op, f, bufferSize,
new PermissionParam(modes.getMasked()),
new UnmaskedPermissionParam(modes.getUnmasked()),
new OverwriteParam(overwrite),
new BufferSizeParam(bufferSize),
new ReplicationParam(replication),
new BlockSizeParam(blockSize)
).run();
}
@Override
public FSDataOutputStream createNonRecursive(final Path f,
final FsPermission permission, final EnumSet<CreateFlag> flag,
final int bufferSize, final short replication, final long blockSize,
final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE);
final FsPermission modes = applyUMask(permission);
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
return new FsPathOutputStreamRunner(op, f, bufferSize,
new PermissionParam(modes.getMasked()),
new UnmaskedPermissionParam(modes.getUnmasked()),
new CreateFlagParam(flag),
new CreateParentParam(false),
new BufferSizeParam(bufferSize),
new ReplicationParam(replication),
new BlockSizeParam(blockSize)
).run();
}
@Override
public FSDataOutputStream append(final Path f, final int bufferSize,
final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.APPEND);
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
return new FsPathOutputStreamRunner(op, f, bufferSize,
new BufferSizeParam(bufferSize)
).run();
}
@Override
public boolean truncate(Path f, long newLength) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.TRUNCATE);
final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE;
return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run();
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.DELETE);
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
return new FsPathBooleanRunner(op, f,
new RecursiveParam(recursive)
).run();
}
@SuppressWarnings("resource")
@Override
public FSDataInputStream open(final Path f, final int bufferSize
) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.OPEN);
WebHdfsInputStream webfsInputStream =
new WebHdfsInputStream(f, bufferSize);
if (webfsInputStream.getFileEncryptionInfo() == null) {
return new FSDataInputStream(webfsInputStream);
} else {
return new FSDataInputStream(
webfsInputStream.createWrappedInputStream());
}
}
@Override
public synchronized void close() throws IOException {
try {
if (canRefreshDelegationToken && delegationToken != null) {
cancelDelegationToken(delegationToken);
}
} catch (IOException ioe) {
LOG.debug("Token cancel failed: ", ioe);
} finally {
if (connectionFactory != null) {
connectionFactory.destroy();
}
super.close();
}
}
// use FsPathConnectionRunner to ensure retries for InvalidTokens
class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener {
private final FsPathConnectionRunner runner;
UnresolvedUrlOpener(FsPathConnectionRunner runner) {
super(null);
this.runner = runner;
}
@Override
protected HttpURLConnection connect(long offset, boolean resolved)
throws IOException {
assert offset == 0;
HttpURLConnection conn = runner.run();
setURL(conn.getURL());
return conn;
}
}
class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
OffsetUrlOpener(final URL url) {
super(url);
}
/** Setup offset url and connect. */
@Override
protected HttpURLConnection connect(final long offset,
final boolean resolved) throws IOException {
final URL offsetUrl = offset == 0L? url
: new URL(url + "&" + new OffsetParam(offset));
return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved,
true).run();
}
}
private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
/**
* Remove the offset parameter, if there is any, from the url.
*
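* <p>For example (URL illustrative),
* {@code http://dn:9864/webhdfs/v1/f?op=OPEN&offset=100&length=10}
* becomes {@code http://dn:9864/webhdfs/v1/f?op=OPEN&length=10}.
*/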
static URL removeOffsetParam(final URL url) throws MalformedURLException {
String query = url.getQuery();
if (query == null) {
return url;
}
final String lower = StringUtils.toLowerCase(query);
if (!lower.startsWith(OFFSET_PARAM_PREFIX)
&& !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
return url;
}
//rebuild query
StringBuilder b = null;
for(final StringTokenizer st = new StringTokenizer(query, "&");
st.hasMoreTokens();) {
final String token = st.nextToken();
if (!StringUtils.toLowerCase(token).startsWith(OFFSET_PARAM_PREFIX)) {
if (b == null) {
b = new StringBuilder("?").append(token);
} else {
b.append('&').append(token);
}
}
}
query = b == null? "": b.toString();
final String urlStr = url.toString();
return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
}
static class OffsetUrlInputStream extends ByteRangeInputStream {
OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r)
throws IOException {
super(o, r);
}
/** Remove offset parameter before returning the resolved url. */
@Override
protected URL getResolvedUrl(final HttpURLConnection connection
) throws MalformedURLException {
return removeOffsetParam(connection.getURL());
}
}
/**
* Get the {@link FileStatus} of files/directories in the given path. If the
* path corresponds to a file, the {@link FileStatus} of that file is
* returned. If the path represents a directory, the {@link FileStatus} of
* all files/directories inside the given path is returned.
*
* @param f given path
* @return the statuses of the files/directories in the given path
*/
@Override
public FileStatus[] listStatus(final Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
final URI fsUri = getUri();
final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
return new FsPathResponseRunner<FileStatus[]>(op, f) {
@Override
FileStatus[] decodeResponse(Map<?,?> json) {
HdfsFileStatus[] hdfsStatuses =
JsonUtilClient.toHdfsFileStatusArray(json);
final FileStatus[] statuses = new FileStatus[hdfsStatuses.length];
for (int i = 0; i < hdfsStatuses.length; i++) {
statuses[i] = hdfsStatuses[i].makeQualified(fsUri, f);
}
return statuses;
}
}.run();
}
private static final byte[] EMPTY_ARRAY = new byte[] {};
/**
* Get DirectoryEntries of the given path. DirectoryEntries contains an array
* of {@link FileStatus}, as well as iteration information.
*
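* <p>A sketch of paging through a directory (the path name is illustrative):
* <pre>{@code
* byte[] token = null;
* boolean more = true;
* while (more) {
*   FileSystem.DirectoryEntries batch =
*       fs.listStatusBatch(new Path("/big-dir"), token);
*   for (FileStatus status : batch.getEntries()) {
*     // process each status
*   }
*   more = batch.hasMore();
*   if (more) {
*     token = batch.getToken();
*   }
* }
* }</pre>
*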
* @param f given path
* @return DirectoryEntries for given path
*/
@Override
public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
FileNotFoundException, IOException {
byte[] prevKey = EMPTY_ARRAY;
if (token != null) {
prevKey = token;
}
DirectoryListing listing = new FsPathResponseRunner<DirectoryListing>(
GetOpParam.Op.LISTSTATUS_BATCH,
f, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
@Override
DirectoryListing decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toDirectoryListing(json);
}
}.run();
// Qualify the returned FileStatus array
final URI fsUri = getUri();
final HdfsFileStatus[] statuses = listing.getPartialListing();
FileStatus[] qualified = new FileStatus[statuses.length];
for (int i = 0; i < statuses.length; i++) {
qualified[i] = statuses[i].makeQualified(fsUri, f);
}
return new DirectoryEntries(qualified, listing.getLastName(),
listing.hasMore());
}
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
Token<DelegationTokenIdentifier> token =
new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
op, null, new RenewerParam(renewer)) {
@Override
Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
throws IOException {
return JsonUtilClient.toDelegationToken(json);
}
}.run();
if (token != null) {
token.setService(tokenServiceName);
} else {
if (disallowFallbackToInsecureCluster) {
throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG);
}
}
return token;
}
@Override
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
throws IOException {
KeyProvider keyProvider = getKeyProvider();
if (keyProvider instanceof DelegationTokenIssuer) {
return new DelegationTokenIssuer[] {(DelegationTokenIssuer) keyProvider};
}
return null;
}
@Override
public synchronized Token<?> getRenewToken() {
return delegationToken;
}
@Override
public <T extends TokenIdentifier> void setDelegationToken(
final Token<T> token) {
synchronized (this) {
delegationToken = token;
}
}
@Override
public synchronized long renewDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
return new FsPathResponseRunner<Long>(op, null,
new TokenArgumentParam(token.encodeToUrlString())) {
@Override
Long decodeResponse(Map<?,?> json) throws IOException {
return ((Number) json.get("long")).longValue();
}
}.run();
}
@Override
public synchronized void cancelDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
new FsPathRunner(op, null,
new TokenArgumentParam(token.encodeToUrlString())
).run();
}
public BlockLocation[] getFileBlockLocations(final FileStatus status,
final long offset, final long length) throws IOException {
if (status == null) {
return null;
}
return getFileBlockLocations(status.getPath(), offset, length);
}
@Override
public BlockLocation[] getFileBlockLocations(final Path p,
final long offset, final long length) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
return new FsPathResponseRunner<BlockLocation[]>(op, p,
new OffsetParam(offset), new LengthParam(length)) {
@Override
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
return DFSUtilClient.locatedBlocks2Locations(
JsonUtilClient.toLocatedBlocks(json));
}
}.run();
}
@Override
public Path getTrashRoot(Path path) {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_TRASH_ROOT);
final HttpOpParam.Op op = GetOpParam.Op.GETTRASHROOT;
try {
String strTrashPath = new FsPathResponseRunner<String>(op, path) {
@Override
String decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.getPath(json);
}
}.run();
return new Path(strTrashPath).makeQualified(getUri(), null);
} catch(IOException e) {
LOG.warn("Cannot find trash root of " + path, e);
// keep the same behavior as DFS
return super.getTrashRoot(path).makeQualified(getUri(), null);
}
}
@Override
public void access(final Path path, final FsAction mode) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.CHECKACCESS;
new FsPathRunner(op, path, new FsActionParam(mode)).run();
}
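// Usage sketch for access above (illustrative only; "fs" and "path" are
// assumed): the call returns normally when the caller holds the requested
// permission and throws AccessControlException otherwise.
//
//   try {
//     fs.access(path, FsAction.WRITE);
//   } catch (AccessControlException ace) {
//     // fall back or report the denied access
//   }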
@Override
public ContentSummary getContentSummary(final Path p) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY);
final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
return new FsPathResponseRunner<ContentSummary>(op, p) {
@Override
ContentSummary decodeResponse(Map<?,?> json) {
return JsonUtilClient.toContentSummary(json);
}
}.run();
}
@Override
public QuotaUsage getQuotaUsage(final Path p) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_QUOTA_USAGE);
final HttpOpParam.Op op = GetOpParam.Op.GETQUOTAUSAGE;
return new FsPathResponseRunner<QuotaUsage>(op, p) {
@Override
QuotaUsage decodeResponse(Map<?, ?> json) {
return JsonUtilClient.toQuotaUsage(json);
}
}.run();
}
@Override
public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
@Override
MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
return JsonUtilClient.toMD5MD5CRC32FileChecksum(json);
}
}.run();
}
/**
* Resolve an HDFS URL into real InetSocketAddresses. It works like a DNS
* resolver when the URL points to a non-HA cluster. When the URL points to
* an HA cluster with its logical name, the resolver further resolves the
* logical name (i.e., the authority in the URL) into real namenode
* addresses. A configuration sketch follows this method.
*/
private InetSocketAddress[] resolveNNAddr() {
Configuration conf = getConf();
final String scheme = uri.getScheme();
ArrayList<InetSocketAddress> ret = new ArrayList<>();
if (!HAUtilClient.isLogicalUri(conf, uri)) {
InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),
getDefaultPort());
ret.add(addr);
} else {
Map<String, Map<String, InetSocketAddress>> addresses = DFSUtilClient
.getHaNnWebHdfsAddresses(conf, scheme);
// Extract the entry corresponding to the logical name.
Map<String, InetSocketAddress> addrs = addresses.get(uri.getHost());
for (InetSocketAddress addr : addrs.values()) {
ret.add(addr);
}
}
InetSocketAddress[] r = new InetSocketAddress[ret.size()];
return ret.toArray(r);
}
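// Configuration sketch for resolveNNAddr above (illustrative only;
// "mycluster", "nn1", "nn2", hosts, and ports are example values): a
// logical URI such as webhdfs://mycluster resolves through the standard
// HA settings, e.g.
//
//   dfs.nameservices = mycluster
//   dfs.ha.namenodes.mycluster = nn1,nn2
//   dfs.namenode.http-address.mycluster.nn1 = nn1.example.com:9870
//   dfs.namenode.http-address.mycluster.nn2 = nn2.example.com:9870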
@Override
public String getCanonicalServiceName() {
return tokenServiceName == null ? super.getCanonicalServiceName()
: tokenServiceName.toString();
}
@Override
public void setStoragePolicy(Path p, String policyName) throws IOException {
if (policyName == null) {
throw new IOException("policyName == null");
}
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY);
final HttpOpParam.Op op = PutOpParam.Op.SETSTORAGEPOLICY;
new FsPathRunner(op, p, new StoragePolicyParam(policyName)).run();
}
@Override
public Collection<BlockStoragePolicy> getAllStoragePolicies()
throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETALLSTORAGEPOLICY;
return new FsPathResponseRunner<Collection<BlockStoragePolicy>>(op, null) {
@Override
Collection<BlockStoragePolicy> decodeResponse(Map<?, ?> json)
throws IOException {
return JsonUtilClient.getStoragePolicies(json);
}
}.run();
}
@Override
public BlockStoragePolicy getStoragePolicy(Path src) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETSTORAGEPOLICY;
return new FsPathResponseRunner<BlockStoragePolicy>(op, src) {
@Override
BlockStoragePolicy decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toBlockStoragePolicy((Map<?, ?>) json
.get(BlockStoragePolicy.class.getSimpleName()));
}
}.run();
}
@Override
public void unsetStoragePolicy(Path src) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.UNSET_STORAGE_POLICY);
final HttpOpParam.Op op = PostOpParam.Op.UNSETSTORAGEPOLICY;
new FsPathRunner(op, src).run();
}
/*
* Callers of this method should handle UnsupportedOperationException in
* case a new client is talking to an old namenode that does not support
* the FsServerDefaults call (getKeyProviderUri below is one such caller).
*/
@Override
public FsServerDefaults getServerDefaults() throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETSERVERDEFAULTS;
return new FsPathResponseRunner<FsServerDefaults>(op, null) {
@Override
FsServerDefaults decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toFsServerDefaults(json);
}
}.run();
}
@VisibleForTesting
InetSocketAddress[] getResolvedNNAddr() {
return nnAddrs;
}
@VisibleForTesting
public void setRetryPolicy(RetryPolicy rp) {
this.retryPolicy = rp;
}
@Override
public URI getKeyProviderUri() throws IOException {
String keyProviderUri = null;
try {
keyProviderUri = getServerDefaults().getKeyProviderUri();
} catch (UnsupportedOperationException e) {
// This means the server doesn't support the GETSERVERDEFAULTS call.
// Do nothing; leave keyProviderUri as null.
} catch (RemoteException e) {
if (e.getClassName() != null &&
e.getClassName().equals("java.lang.IllegalArgumentException")) {
// See HDFS-13100.
// This also means the server doesn't support the GETSERVERDEFAULTS
// call. Do nothing; leave keyProviderUri as null.
} else {
throw e;
}
}
return HdfsKMSUtil.getKeyProviderUri(ugi, getUri(), keyProviderUri,
getConf());
}
@Override
public KeyProvider getKeyProvider() throws IOException {
if (testProvider != null) {
return testProvider;
}
URI keyProviderUri = getKeyProviderUri();
if (keyProviderUri == null) {
return null;
}
return KMSUtil.createKeyProviderFromUri(getConf(), keyProviderUri);
}
@VisibleForTesting
public void setTestProvider(KeyProvider kp) {
testProvider = kp;
}
/**
* This class is used for opening, reading, and seeking files while using
* the WebHdfsFileSystem. This class will invoke the retry policy when
* performing any of these actions. A usage sketch follows the class.
*/
@VisibleForTesting
public class WebHdfsInputStream extends FSInputStream {
private ReadRunner readRunner = null;
WebHdfsInputStream(Path path, int buffersize) throws IOException {
// Only create the ReadRunner once. Each read's byte array and position
// will be updated within the ReadRunner object before every read.
readRunner = new ReadRunner(path, buffersize);
}
@Override
public int read() throws IOException {
final byte[] b = new byte[1];
return (read(b, 0, 1) == -1) ? -1 : (b[0] & 0xff);
}
@Override
public int read(byte b[], int off, int len) throws IOException {
return readRunner.read(b, off, len);
}
@Override
public void seek(long newPos) throws IOException {
readRunner.seek(newPos);
}
@Override
public long getPos() throws IOException {
return readRunner.getPos();
}
protected int getBufferSize() throws IOException {
return readRunner.getBufferSize();
}
protected Path getPath() throws IOException {
return readRunner.getPath();
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public void close() throws IOException {
readRunner.close();
}
public void setFileLength(long len) {
readRunner.setFileLength(len);
}
public long getFileLength() {
return readRunner.getFileLength();
}
@VisibleForTesting
ReadRunner getReadRunner() {
return readRunner;
}
@VisibleForTesting
void setReadRunner(ReadRunner rr) {
this.readRunner = rr;
}
FileEncryptionInfo getFileEncryptionInfo() {
return readRunner.getFileEncryptionInfo();
}
InputStream createWrappedInputStream() throws IOException {
return HdfsKMSUtil.createWrappedInputStream(
this, getKeyProvider(), getFileEncryptionInfo(), getConf());
}
}
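// Usage sketch for WebHdfsInputStream above (illustrative only; "fs" is
// assumed): callers obtain the stream through FileSystem#open and can mix
// seeks and reads; each operation is retried internally by the ReadRunner
// according to the configured retry policy.
//
//   try (FSDataInputStream in = fs.open(new Path("/data/file"))) {
//     byte[] buf = new byte[4096];
//     in.seek(1024L);
//     int n = in.read(buf, 0, buf.length);
//   }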
enum RunnerState {
DISCONNECTED, // Connection is closed programmatically by ReadRunner
OPEN, // Connection has been established by ReadRunner
SEEK, // Calling code has explicitly called seek()
CLOSED // Calling code has explicitly called close()
}
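// Observed transitions (from the ReadRunner code below): a runner starts
// in SEEK, moves to OPEN once initializeInputStream succeeds, returns to
// SEEK on an explicit seek(), drops to DISCONNECTED when a connection
// fails, and ends in CLOSED after close().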
/**
* This class will allow retries to occur for both open and read operations.
* The first WebHdfsFileSystem#open creates a new WebHdfsInputStream object,
* which creates a new ReadRunner object that will be used to open a
* connection and read or seek into the input stream.
*
* ReadRunner is a subclass of the AbstractRunner class, which will run the
* ReadRunner#getUrl(), ReadRunner#connect(URL), and ReadRunner#getResponse
* methods within a retry loop, based on the configured retry policy.
* ReadRunner#connect will create a connection if one has not already been
* created. Otherwise, it will return the previously created connection
* object. This is necessary because a new connection should not be created
* for every read.
* Likewise, ReadRunner#getUrl will construct a new URL object only if the
* connection has not previously been established. Otherwise, it will return
* the previously created URL object.
* ReadRunner#getResponse will initialize the input stream if it has not
* already been initialized and read the requested data from the specified
* input stream.
*/
@VisibleForTesting
protected class ReadRunner extends AbstractFsPathRunner<Integer> {
private InputStream in = null;
private HttpURLConnection cachedConnection = null;
private byte[] readBuffer;
private int readOffset;
private int readLength;
private RunnerState runnerState = RunnerState.SEEK;
private URL originalUrl = null;
private URL resolvedUrl = null;
private final Path path;
private final int bufferSize;
private long pos = 0;
private long fileLength = 0;
private FileEncryptionInfo feInfo = null;
/* The following methods are WebHdfsInputStream helpers. */
ReadRunner(Path p, int bs) throws IOException {
super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
this.path = p;
this.bufferSize = bs;
getRedirectedUrl();
}
private void getRedirectedUrl() throws IOException {
URLRunner urlRunner = new URLRunner(GetOpParam.Op.OPEN, null, false,
false) {
@Override
protected URL getUrl() throws IOException {
return toUrl(op, path, new BufferSizeParam(bufferSize));
}
};
HttpURLConnection conn = urlRunner.run();
String feInfoStr = conn.getHeaderField(FEFINFO_HEADER);
if (feInfoStr != null) {
Decoder decoder = Base64.getDecoder();
byte[] decodedBytes = decoder.decode(
feInfoStr.getBytes(StandardCharsets.UTF_8));
feInfo = PBHelperClient
.convert(FileEncryptionInfoProto.parseFrom(decodedBytes));
}
String location = conn.getHeaderField("Location");
if (location != null) {
// This saves the location of the datanode that the redirect was
// issued to. The offset must be removed because seek can be called
// after open.
resolvedUrl = removeOffsetParam(new URL(location));
} else {
// This is cached for proxies like HttpFSFileSystem.
cachedConnection = conn;
}
originalUrl = super.getUrl();
}
int read(byte[] b, int off, int len) throws IOException {
if (runnerState == RunnerState.CLOSED) {
throw new IOException("Stream closed");
}
if (len == 0) {
return 0;
}
// Before the first read, pos and fileLength will be 0 and readBuffer
// will be null. They will be initialized once the first connection is
// made; only after that does it make sense to compare pos and fileLength.
if (pos >= fileLength && readBuffer != null) {
return -1;
}
// If a seek is occurring, the input stream will have been closed, so it
// needs to be reopened. Use the URLRunner to call AbstractRunner#connect
// with the previously-cached resolved URL and with the 'redirected' flag
// set to 'true'. The resolved URL contains the URL of the previously
// opened DN as opposed to the NN. It is preferable to use the resolved
// URL when creating a connection because it does not hit the NN for
// every seek, nor does it open a connection to a new DN after every seek.
// The redirect flag is needed so that AbstractRunner#connect knows the
// URL is already resolved.
// Note that when the redirected flag is set, retries are not attempted.
// So, if the connection fails using URLRunner, clear out the connection
// and fall through to establish the connection using ReadRunner.
if (runnerState == RunnerState.SEEK) {
try {
final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true,
false).run();
} catch (IOException ioe) {
closeInputStream(RunnerState.DISCONNECTED);
}
}
readBuffer = b;
readOffset = off;
readLength = len;
int count = this.run();
if (count >= 0) {
statistics.incrementBytesRead(count);
pos += count;
} else if (pos < fileLength) {
throw new EOFException(
"Premature EOF: pos=" + pos + " < filelength=" + fileLength);
}
return count;
}
void seek(long newPos) throws IOException {
if (pos != newPos) {
pos = newPos;
closeInputStream(RunnerState.SEEK);
}
}
public void close() throws IOException {
closeInputStream(RunnerState.CLOSED);
}
/* The following methods override AbstractRunner methods and are called
* within the retry policy context by runWithRetry.
*/
@Override
protected URL getUrl() throws IOException {
// This method is called every time a read is executed.
// The check for connection == null is to ensure that a new URL is only
// created upon a new connection and not for every read.
if (cachedConnection == null) {
// Update URL with current offset. BufferSize doesn't change, but it
// still must be included when creating the new URL.
updateURLParameters(new BufferSizeParam(bufferSize),
new OffsetParam(pos));
originalUrl = super.getUrl();
}
return originalUrl;
}
/* Only make the connection if it is not already open. Don't cache the
* connection here. After this method is called, runWithRetry will call
* validateResponse, and then call the below ReadRunner#getResponse. If
* the code path makes it that far, then we can cache the connection.
*/
@Override
protected HttpURLConnection connect(URL url) throws IOException {
HttpURLConnection conn = cachedConnection;
if (conn == null) {
try {
conn = super.connect(url);
} catch (IOException e) {
closeInputStream(RunnerState.DISCONNECTED);
throw e;
}
}
return conn;
}
/*
* This method is used to perform reads within the retry policy context.
* This code relies on runWithRetry to always call the above connect
* method and the validateResponse method prior to calling getResponse.
*/
@Override
Integer getResponse(final HttpURLConnection conn)
throws IOException {
try {
// In the "open-then-read" use case, runWithRetry will have executed
// ReadRunner#connect to make the connection and then executed
// validateResponse to validate the response code. Only then do we want
// to cache the connection.
// In the "read-after-seek" use case, the connection is made and the
// response is validated by the URLRunner. ReadRunner#read then caches
// the connection, and ReadRunner#connect will pass on the cached
// connection.
// In either case, stream initialization is done here if necessary.
cachedConnection = conn;
if (in == null) {
in = initializeInputStream(conn);
}
int count = in.read(readBuffer, readOffset, readLength);
if (count < 0 && pos < fileLength) {
throw new EOFException(
"Premature EOF: pos=" + pos + " < filelength=" + fileLength);
}
return Integer.valueOf(count);
} catch (IOException e) {
String redirectHost = resolvedUrl.getAuthority();
if (excludeDatanodes.getValue() != null) {
excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+ excludeDatanodes.getValue());
} else {
excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
}
// If an exception occurs, close the input stream and null it out so
// that if the abstract runner decides to retry, it will reconnect.
closeInputStream(RunnerState.DISCONNECTED);
throw e;
}
}
@VisibleForTesting
InputStream initializeInputStream(HttpURLConnection conn)
throws IOException {
// Cache the resolved URL so that it can be used in the event of
// a future seek operation.
resolvedUrl = removeOffsetParam(conn.getURL());
final String cl = conn.getHeaderField(HttpHeaders.CONTENT_LENGTH);
InputStream inStream = conn.getInputStream();
if (LOG.isDebugEnabled()) {
LOG.debug("open file: " + conn.getURL());
}
if (cl != null) {
long streamLength = Long.parseLong(cl);
fileLength = pos + streamLength;
// Java has a bug with >2GB request streams. It won't bounds-check
// the reads, so the transfer blocks until the server times out.
inStream = new BoundedInputStream(inStream, streamLength);
} else {
fileLength = getHdfsFileStatus(path).getLen();
}
// Wrap in a BufferedInputStream because it is more performant than
// the BoundedInputStream by itself.
runnerState = RunnerState.OPEN;
return new BufferedInputStream(inStream, bufferSize);
}
// Close both the InputStream and the connection.
@VisibleForTesting
void closeInputStream(RunnerState rs) throws IOException {
if (in != null) {
IOUtils.close(cachedConnection);
in = null;
}
cachedConnection = null;
runnerState = rs;
}
/* Getters and Setters */
@VisibleForTesting
protected InputStream getInputStream() {
return in;
}
@VisibleForTesting
protected void setInputStream(InputStream inStream) {
in = inStream;
}
Path getPath() {
return path;
}
int getBufferSize() {
return bufferSize;
}
long getFileLength() {
return fileLength;
}
void setFileLength(long len) {
fileLength = len;
}
long getPos() {
return pos;
}
protected FileEncryptionInfo getFileEncryptionInfo() {
return feInfo;
}
}
}