blob: 188371e5e3a90d68319617d32bafafa72e6cdfd9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
/**
 * Utility Connector class which is used by timeline clients to securely get
 * connected to the timeline server.
 *
 * <p>During service initialization it builds a Jersey {@link Client} whose
 * connections are opened through {@link DelegationTokenAuthenticatedURL}.
 * When https is enabled the connections are additionally configured from an
 * {@link SSLFactory}, and a retry filter is installed on the client when the
 * connector was constructed with {@code requireConnectionRetry} set.
 */
public class TimelineConnector extends AbstractService {
// Joins URI fragments with an empty separator (used by constructResURI).
private static final Joiner JOINER = Joiner.on("");
private static final Log LOG = LogFactory.getLog(TimelineConnector.class);
// Value passed to setTimeouts, i.e. used as both connect and read timeout.
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
// Assigned in serviceInit only when https is in use; destroyed in serviceStop.
private SSLFactory sslFactory;
// Jersey client built in serviceInit and handed out by getClient().
private Client client;
// Configurator applied to connections; chosen in serviceInit (SSL-aware for
// https, timeout-only otherwise).
private ConnectionConfigurator connConfigurator;
// Kerberos or pseudo authenticator, selected in serviceInit from
// UserGroupInformation.isSecurityEnabled().
private DelegationTokenAuthenticator authenticator;
// Constructor-supplied values that are passed on when connections are opened
// and when delegation token operations are run.
private DelegationTokenAuthenticatedURL.Token token;
private UserGroupInformation authUgi;
private String doAsUser;
// Created in serviceInit; non-private so that tests can reach it.
@VisibleForTesting
TimelineClientConnectionRetry connectionRetry;
// When true, serviceInit adds the retry filter to the Jersey client.
private boolean requireConnectionRetry;
/**
 * Creates the connector and records the settings used later on; the Jersey
 * client itself is only built when the service is initialized.
 *
 * @param requireConnectionRetry whether a retry filter is added to the Jersey
 *          client during initialization
 * @param authUgi user whose TGT is checked and on whose behalf delegation
 *          token operations are run
 * @param doAsUser doAs user passed along when connections are opened
 * @param token delegation token passed along when connections are opened
 */
public TimelineConnector(boolean requireConnectionRetry,
    UserGroupInformation authUgi, String doAsUser,
    DelegationTokenAuthenticatedURL.Token token) {
  super("TimelineConnector");
  this.token = token;
  this.doAsUser = doAsUser;
  this.authUgi = authUgi;
  this.requireConnectionRetry = requireConnectionRetry;
}
/**
 * Builds everything the connector needs: the connection configurator
 * (SSL-aware when https is enabled), the delegation token authenticator, the
 * retry helper and finally the Jersey client, optionally with a retry filter.
 *
 * @param conf configuration the connector is initialized from
 * @throws Exception if the parent initialization or the SSL factory setup
 *           fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  super.serviceInit(conf);

  ClientConfig clientConfig = new DefaultClientConfig();
  clientConfig.getClasses().add(YarnJacksonJaxbJsonProvider.class);

  if (!YarnConfiguration.useHttps(conf)) {
    connConfigurator = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
  } else {
    // If https is chosen, configures SSL client.
    sslFactory = getSSLFactory(conf);
    connConfigurator = getConnConfigurator(sslFactory);
  }

  authenticator = UserGroupInformation.isSecurityEnabled()
      ? new KerberosDelegationTokenAuthenticator()
      : new PseudoDelegationTokenAuthenticator();
  authenticator.setConnectionConfigurator(connConfigurator);

  connectionRetry = new TimelineClientConnectionRetry(conf);

  HttpURLConnectionFactory connectionFactory =
      new TimelineURLConnectionFactory(
          authUgi, authenticator, connConfigurator, token, doAsUser);
  client = new Client(
      new URLConnectionClientHandler(connectionFactory), clientConfig);

  if (requireConnectionRetry) {
    client.addFilter(new TimelineJerseyRetryFilter(connectionRetry));
  }
}
/**
 * Configurator that only applies {@link #DEFAULT_SOCKET_TIMEOUT} to a
 * connection; used for plain http and as the fallback when the SSL-aware
 * configurator cannot be created.
 */
private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
    new ConnectionConfigurator() {
      @Override
      public HttpURLConnection configure(HttpURLConnection connection)
          throws IOException {
        setTimeouts(connection, DEFAULT_SOCKET_TIMEOUT);
        return connection;
      }
    };
/**
 * Returns an SSL-aware connection configurator built from the given factory,
 * or the timeout-only default configurator when building it fails for any
 * reason (the failure is logged at debug level).
 *
 * @param sslFactoryObj factory providing the socket factory and hostname
 *          verifier
 * @return the configurator to apply to new connections
 */
private ConnectionConfigurator getConnConfigurator(SSLFactory sslFactoryObj) {
  ConnectionConfigurator configurator;
  try {
    configurator =
        initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, sslFactoryObj);
  } catch (Exception ex) {
    LOG.debug("Cannot load customized ssl related configuration. "
        + "Fallback to system-generic settings.", ex);
    configurator = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
  }
  return configurator;
}
/**
 * Creates a configurator that installs the factory's socket factory and
 * hostname verifier on https connections and sets the given timeout on every
 * connection.
 *
 * @param timeout value used for both the connect and the read timeout
 * @param sslFactory source of the socket factory and hostname verifier
 * @return the SSL-aware configurator
 * @throws IOException if the socket factory cannot be created
 * @throws GeneralSecurityException if the socket factory cannot be created
 */
private static ConnectionConfigurator initSslConnConfigurator(
    final int timeout, SSLFactory sslFactory)
    throws IOException, GeneralSecurityException {
  // Resolve both collaborators once, up front; the configurator reuses them.
  final SSLSocketFactory socketFactory = sslFactory.createSSLSocketFactory();
  final HostnameVerifier hostnameVerifier = sslFactory.getHostnameVerifier();

  return new ConnectionConfigurator() {
    @Override
    public HttpURLConnection configure(HttpURLConnection conn)
        throws IOException {
      if (conn instanceof HttpsURLConnection) {
        HttpsURLConnection secured = (HttpsURLConnection) conn;
        secured.setSSLSocketFactory(socketFactory);
        secured.setHostnameVerifier(hostnameVerifier);
      }
      setTimeouts(conn, timeout);
      return conn;
    }
  };
}
/**
 * Creates and initializes a client-mode {@link SSLFactory}.
 *
 * @param conf configuration handed to the factory
 * @return the initialized factory
 * @throws GeneralSecurityException if initializing the factory fails
 * @throws IOException if initializing the factory fails
 */
protected SSLFactory getSSLFactory(Configuration conf)
    throws GeneralSecurityException, IOException {
  SSLFactory clientFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
  clientFactory.init();
  return clientFactory;
}
/**
 * Applies the same timeout to both the connect and the read phase of the
 * given connection.
 *
 * @param connection connection to configure
 * @param socketTimeout timeout value, as understood by
 *          {@link URLConnection#setConnectTimeout(int)} and
 *          {@link URLConnection#setReadTimeout(int)}
 */
private static void setTimeouts(URLConnection connection, int socketTimeout) {
  final int timeoutValue = socketTimeout;
  connection.setConnectTimeout(timeoutValue);
  connection.setReadTimeout(timeoutValue);
}
/**
 * Builds a resource URI by concatenating the scheme, the address and the
 * resource path; the scheme is https when the configuration asks for it and
 * http otherwise.
 *
 * @param conf configuration consulted for the http/https choice
 * @param address address part placed right after the scheme
 * @param uri resource path appended after the address
 * @return the resulting URI
 */
public static URI constructResURI(Configuration conf, String address,
    String uri) {
  final String scheme;
  if (YarnConfiguration.useHttps(conf)) {
    scheme = "https://";
  } else {
    scheme = "http://";
  }
  return URI.create(JOINER.join(scheme, address, uri));
}
/**
 * Creates a new authenticated URL bound to this connector's authenticator and
 * connection configurator.
 *
 * @return a fresh {@link DelegationTokenAuthenticatedURL} on every call
 */
DelegationTokenAuthenticatedURL getDelegationTokenAuthenticatedURL() {
  DelegationTokenAuthenticatedURL authenticatedUrl =
      new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
  return authenticatedUrl;
}
/**
 * Releases the SSL factory when one was created during initialization; does
 * nothing otherwise.
 */
@Override
protected void serviceStop() {
  // sslFactory is only assigned on the https path of serviceInit.
  if (this.sslFactory != null) {
    this.sslFactory.destroy();
  }
}
/**
 * Returns the Jersey client built during service initialization.
 *
 * @return the client, or {@code null} if the service has not been
 *         initialized yet
 */
public Client getClient() {
  return this.client;
}
/**
 * Runs a delegation token action through the connection retry helper.
 *
 * @param action the privileged action to execute
 * @return whatever the action returns
 * @throws IOException if the action fails with a non-retryable I/O error
 * @throws YarnException declared for callers; not thrown by this method's
 *           own code
 */
public Object operateDelegationToken(
    final PrivilegedExceptionAction<?> action)
    throws IOException, YarnException {
  // Wrap the action so the retry helper knows how to run and re-run it.
  final TimelineClientRetryOp retryableAction =
      createRetryOpForOperateDelegationToken(action);
  return connectionRetry.retryOn(retryableAction);
}
/**
 * Wraps the given action into a retryable operation that runs it as this
 * connector's authentication user.
 *
 * @param action the privileged action to wrap
 * @return the retryable operation
 * @throws IOException declared for overriding implementations
 */
@Private
@VisibleForTesting
TimelineClientRetryOp createRetryOpForOperateDelegationToken(
    final PrivilegedExceptionAction<?> action) throws IOException {
  TimelineClientRetryOp retryOp =
      new TimelineClientRetryOpForOperateDelegationToken(this.authUgi, action);
  return retryOp;
}
/**
 * Base type for an operation that the timeline client may need to attempt
 * more than once.
 */
@Private
@VisibleForTesting
public abstract static class TimelineClientRetryOp {

  /**
   * Performs the operation that should be retried.
   *
   * @return the result of the operation
   * @throws IOException if the operation fails
   */
  public abstract Object run() throws IOException;

  /**
   * Decides whether the operation should be retried given the incoming
   * exception.
   *
   * @param e the exception raised by {@link #run()}
   * @return {@code true} if another attempt should be made
   */
  public abstract boolean shouldRetryOn(Exception e);
}
/**
 * Connection factory handed to the Jersey URL connection handler; every
 * connection is opened through a {@link DelegationTokenAuthenticatedURL}.
 */
private static class TimelineURLConnectionFactory
    implements HttpURLConnectionFactory {

  private final UserGroupInformation authUgi;
  private final DelegationTokenAuthenticator authenticator;
  private final ConnectionConfigurator connConfigurator;
  private final Token token;
  private final String doAsUser;

  /**
   * @param authUgi user whose TGT is checked before each connection
   * @param authenticator authenticator used for the authenticated URL
   * @param connConfigurator configurator used for the authenticated URL
   * @param token delegation token passed when opening the connection
   * @param doAsUser doAs user passed when opening the connection
   */
  public TimelineURLConnectionFactory(UserGroupInformation authUgi,
      DelegationTokenAuthenticator authenticator,
      ConnectionConfigurator connConfigurator,
      DelegationTokenAuthenticatedURL.Token token, String doAsUser) {
    this.doAsUser = doAsUser;
    this.token = token;
    this.connConfigurator = connConfigurator;
    this.authenticator = authenticator;
    this.authUgi = authUgi;
  }

  /**
   * Opens an authenticated connection to the given URL.
   *
   * @param url target of the connection
   * @return the opened connection
   * @throws IOException on I/O failure; authentication failures and
   *           undeclared throwables are wrapped into it as well
   */
  @Override
  public HttpURLConnection getHttpURLConnection(final URL url)
      throws IOException {
    authUgi.checkTGTAndReloginFromKeytab();
    try {
      DelegationTokenAuthenticatedURL authenticatedUrl =
          new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
      return authenticatedUrl.openConnection(url, token, doAsUser);
    } catch (UndeclaredThrowableException undeclared) {
      throw new IOException(undeclared.getCause());
    } catch (AuthenticationException authFailure) {
      throw new IOException(authFailure);
    }
  }
}
/**
 * Runs a {@link TimelineClientRetryOp} and re-runs it, with a fixed pause
 * between attempts, for as long as the operation reports the failure as
 * retryable and retries are left.
 *
 * <p>Outside this class, only visible to tests.
 */
@Private
@VisibleForTesting
static class TimelineClientConnectionRetry {

  // maxRetries < 0 means keep trying
  @Private
  @VisibleForTesting
  public int maxRetries;

  // Pause between two attempts, passed to Thread.sleep.
  @Private
  @VisibleForTesting
  public long retryInterval;

  // Indicates if retries happened last time. Only tests should read it.
  // In unit tests, retryOn() calls should _not_ be concurrent.
  private boolean retried = false;

  /**
   * @return whether the most recent {@link #retryOn} call went past its
   *         first attempt (the method name's spelling is kept for existing
   *         callers)
   */
  @Private
  @VisibleForTesting
  boolean getRetired() {
    return retried;
  }

  /**
   * Reads and validates the retry settings from the configuration.
   *
   * @param conf configuration holding the max-retries and retry-interval
   *          properties
   * @throws IllegalArgumentException if max retries is below -1 or the retry
   *           interval is not positive
   */
  public TimelineClientConnectionRetry(Configuration conf) {
    // Read each property once so the validated value is the one stored.
    int configuredMaxRetries =
        conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
    Preconditions.checkArgument(configuredMaxRetries >= -1,
        "%s property value should be greater than or equal to -1",
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);

    long configuredRetryInterval = conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
    Preconditions.checkArgument(configuredRetryInterval > 0,
        "%s property value should be greater than zero",
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);

    maxRetries = configuredMaxRetries;
    retryInterval = configuredRetryInterval;
  }

  /**
   * Runs the operation, retrying it while it fails with a retryable
   * exception and retries are left.
   *
   * @param op the operation to run
   * @return the operation's result
   * @throws IOException if the operation fails with a non-retryable I/O
   *           error, or (as {@link java.io.InterruptedIOException}) if the
   *           thread is interrupted while waiting for the next attempt
   * @throws RuntimeException if the operation fails with a non-retryable
   *           runtime error, or if the retries are used up; in the latter
   *           case the last failure is attached as the cause
   */
  public Object retryOn(TimelineClientRetryOp op)
      throws RuntimeException, IOException {
    int leftRetries = maxRetries;
    Exception lastFailure = null;
    retried = false;
    // keep trying
    while (true) {
      try {
        // try perform the op, if fail, keep retrying
        return op.run();
      } catch (IOException | RuntimeException e) {
        lastFailure = e;
        // break if there's no retries left; this check comes before
        // shouldRetryOn, so at that point any failure ends the loop
        if (leftRetries == 0) {
          break;
        }
        if (op.shouldRetryOn(e)) {
          logException(e, leftRetries);
        } else {
          throw e;
        }
      }
      // A negative counter means "retry forever" and is never decremented.
      if (leftRetries > 0) {
        leftRetries--;
      }
      retried = true;
      try {
        // sleep for the given time interval
        Thread.sleep(retryInterval);
      } catch (InterruptedException ie) {
        LOG.warn("Client retry sleep interrupted! ");
        // Restore the interrupt status and stop retrying: carrying on would
        // hide the interruption from the caller.
        Thread.currentThread().interrupt();
        java.io.InterruptedIOException interrupted =
            new java.io.InterruptedIOException(
                "Interrupted while waiting to retry the connection to the "
                    + "timeline server");
        interrupted.initCause(ie);
        throw interrupted;
      }
    }
    throw new RuntimeException("Failed to connect to timeline server. "
        + "Connection retries limit exceeded. "
        + "The posted timeline event may be missing", lastFailure);
  }

  /**
   * Logs a retryable failure together with the number of attempts left.
   *
   * @param e the failure that triggers the retry
   * @param leftRetries remaining retries; a non-positive value is reported
   *          as "will keep retrying"
   */
  private void logException(Exception e, int leftRetries) {
    if (leftRetries > 0) {
      LOG.info(
          "Exception caught by TimelineClientConnectionRetry," + " will try "
              + leftRetries + " more time(s).\nMessage: " + e.getMessage());
    } else {
      // note that maxRetries may be -1 at the very beginning
      LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
          + " will keep retrying.\nMessage: " + e.getMessage());
    }
  }
}
/**
 * Jersey client filter that passes each request down the filter chain
 * through the connection retry helper, so that connection-level failures are
 * retried.
 */
private static class TimelineJerseyRetryFilter extends ClientFilter {

  private final TimelineClientConnectionRetry connectionRetry;

  /**
   * @param connectionRetry retry helper used for every request
   */
  public TimelineJerseyRetryFilter(
      TimelineClientConnectionRetry connectionRetry) {
    this.connectionRetry = connectionRetry;
  }

  /**
   * Handles the request, retrying it on connection failures.
   *
   * @param cr the request to pass to the next handler
   * @return the response of the next handler
   * @throws ClientHandlerException if the request fails; an
   *           {@link IOException} raised by the retry helper is wrapped and
   *           kept as the cause
   */
  @Override
  public ClientResponse handle(final ClientRequest cr)
      throws ClientHandlerException {
    // Set up the retry operation
    TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
      @Override
      public Object run() {
        // Try pass the request, if fail, keep retrying
        return getNext().handle(cr);
      }

      @Override
      public boolean shouldRetryOn(Exception e) {
        // Only retry on connection exceptions
        return (e instanceof ClientHandlerException)
            && (e.getCause() instanceof ConnectException
                || e.getCause() instanceof SocketTimeoutException
                || e.getCause() instanceof SocketException);
      }
    };
    try {
      return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
    } catch (IOException e) {
      // Keep the original exception as the cause so its stack trace is not
      // lost when the failure is reported.
      throw new ClientHandlerException(
          "Jersey retry failed!\nMessage: " + e.getMessage(), e);
    }
  }
}
/**
 * Retryable operation that runs a privileged action as the given user; used
 * for delegation token operations.
 */
@Private
@VisibleForTesting
public static class TimelineClientRetryOpForOperateDelegationToken
    extends TimelineClientRetryOp {

  private final UserGroupInformation authUgi;
  private final PrivilegedExceptionAction<?> action;

  /**
   * @param authUgi user whose TGT is checked and who runs the action
   * @param action the privileged action to run
   */
  public TimelineClientRetryOpForOperateDelegationToken(
      UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
    this.authUgi = authUgi;
    this.action = action;
  }

  /**
   * Runs the action as the authentication user.
   *
   * @return the action's result
   * @throws IOException if the action fails; an undeclared throwable's cause
   *           and an interruption are wrapped into it as well
   */
  @Override
  public Object run() throws IOException {
    // Try pass the request, if fail, keep retrying
    authUgi.checkTGTAndReloginFromKeytab();
    try {
      return authUgi.doAs(action);
    } catch (UndeclaredThrowableException e) {
      throw new IOException(e.getCause());
    } catch (InterruptedException e) {
      // Restore the interrupt status before converting the exception, so
      // code further up the stack can still see the interruption.
      Thread.currentThread().interrupt();
      throw new IOException(e);
    }
  }

  /**
   * @param e the exception raised by {@link #run()}
   * @return {@code true} for connect and socket timeout exceptions
   */
  @Override
  public boolean shouldRetryOn(Exception e) {
    // retry on connection exceptions
    // and SocketTimeoutException
    return (e instanceof ConnectException
        || e instanceof SocketTimeoutException);
  }
}
}