// blob: 4a5a443d02d0cbcd92d2956da3a7f9b1bf788071 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import com.sun.jersey.core.util.MultivaluedMapImpl;
@Private
@Evolving
public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
// REST resource roots appended to the service address by constructResURI().
private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
// Joins URI parts with an empty separator.
private static final Joiner JOINER = Joiner.on("");
// Applied as both connect and read timeout by setTimeouts(), in milliseconds.
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
// Command line options understood by main(); populated in the static
// initializer below.
private static Options opts;
// Names of the command line flags that select the kind of data in the JSON file.
private static final String ENTITY_DATA_TYPE = "entity";
private static final String DOMAIN_DATA_TYPE = "domain";
static {
opts = new Options();
opts.addOption("put", true, "Put the timeline entities/domain in a JSON file");
opts.getOption("put").setArgName("Path to the JSON file");
opts.addOption(ENTITY_DATA_TYPE, false, "Specify the JSON file contains the entities");
opts.addOption(DOMAIN_DATA_TYPE, false, "Specify the JSON file contains the domain");
opts.addOption("help", false, "Print usage");
}
// Jersey client created in serviceInit().
private Client client;
// Applies timeouts (and SSL settings when available) to every connection.
private ConnectionConfigurator connConfigurator;
// Kerberos or pseudo authenticator, chosen in serviceInit() depending on
// whether security is enabled.
private DelegationTokenAuthenticator authenticator;
// Authentication token shared by all requests issued by this client.
private DelegationTokenAuthenticatedURL.Token token;
// The user the requests are authenticated as (the real user when the current
// user is a proxy user).
private UserGroupInformation authUgi;
// Short name of the proxy user, or null when the current user has no real user.
private String doAsUser;
// Configuration handed to serviceInit().
private Configuration configuration;
// Value of YarnConfiguration.TIMELINE_SERVICE_VERSION; only read in
// serviceInit() for the v1.x code path.
private float timelineServiceVersion;
// Writer used by the v1.x put methods; created in serviceStart().
private TimelineWriter timelineWriter;
// Address of the timeline service; volatile because it can be replaced through
// setTimelineServiceAddress() while other threads read it.
private volatile String timelineServiceAddress;
// Retry parameters for identifying new timeline service
// TODO consider to merge with connection retry
private int maxServiceRetries;
private long serviceRetryInterval;
// True when the client was constructed with an application id (see the
// one-argument constructor).
private boolean timelineServiceV2 = false;
// Retry helper shared by the Jersey filter and the delegation token operations.
@Private
@VisibleForTesting
TimelineClientConnectionRetry connectionRetry;
// Queue-backed publisher used by the v2 put methods; created in serviceInit().
private TimelineEntityDispatcher entityDispatcher;
/**
 * Abstract class for an operation that should be retried by timeline client.
 * Instances are executed by
 * {@code TimelineClientConnectionRetry#retryOn(TimelineClientRetryOp)}.
 */
@Private
@VisibleForTesting
public static abstract class TimelineClientRetryOp {
/**
 * The operation that should be retried.
 *
 * @return the result of the operation
 * @throws IOException if the operation fails
 */
public abstract Object run() throws IOException;
/**
 * Indicates whether the operation should be retried for the given failure.
 *
 * @param e the exception thrown by {@link #run()}
 * @return true if another attempt should be made
 */
public abstract boolean shouldRetryOn(Exception e);
}
/**
 * Runs a {@link TimelineClientRetryOp} and retries it, sleeping between
 * attempts, for as long as the operation reports the failure as retriable and
 * retries are left.
 * Outside this class, only visible to tests.
 */
@Private
@VisibleForTesting
static class TimelineClientConnectionRetry {
  // maxRetries < 0 means keep trying
  @Private
  @VisibleForTesting
  public int maxRetries;

  // Sleep time between two attempts, in milliseconds.
  @Private
  @VisibleForTesting
  public long retryInterval;

  // Indicates if retries happened last time. Only tests should read it.
  // In unit tests, retryOn() calls should _not_ be concurrent.
  private boolean retried = false;

  /**
   * @return true if the last {@link #retryOn} call needed at least one retry
   */
  @Private
  @VisibleForTesting
  boolean getRetired() {
    return retried;
  }

  /**
   * Reads and validates the retry settings from the configuration.
   *
   * @param conf configuration providing the maximum number of retries
   *     (must be >= -1) and the retry interval (must be > 0)
   * @throws IllegalArgumentException if one of the values is out of range
   */
  public TimelineClientConnectionRetry(Configuration conf) {
    // Read every value once so the validated value is the one that is stored.
    final int configuredMaxRetries = conf.getInt(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
    Preconditions.checkArgument(configuredMaxRetries >= -1,
        "%s property value should be greater than or equal to -1",
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
    final long configuredRetryInterval = conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
    Preconditions.checkArgument(configuredRetryInterval > 0,
        "%s property value should be greater than zero",
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
    maxRetries = configuredMaxRetries;
    retryInterval = configuredRetryInterval;
  }

  /**
   * Runs the operation until it succeeds, fails with a non-retriable
   * exception, or the retry budget is used up.
   *
   * @param op the operation to run
   * @return whatever {@code op.run()} returns
   * @throws IOException the non-retriable failure of the operation
   * @throws RuntimeException the non-retriable failure of the operation, or a
   *     new exception (carrying the last failure as cause) when no retries
   *     are left
   */
  public Object retryOn(TimelineClientRetryOp op)
      throws RuntimeException, IOException {
    int leftRetries = maxRetries;
    retried = false;
    Exception lastFailure = null;
    // An interrupt received while sleeping does not stop the retries, but it
    // must not be lost either: it is re-asserted when this method exits.
    boolean interrupted = false;
    try {
      // keep trying
      while (true) {
        try {
          // try perform the op, if fail, keep retrying
          return op.run();
        } catch (IOException | RuntimeException e) {
          lastFailure = e;
          // break if there's no retries left
          if (leftRetries == 0) {
            break;
          }
          if (op.shouldRetryOn(e)) {
            logException(e, leftRetries);
          } else {
            throw e;
          }
        }
        if (leftRetries > 0) {
          leftRetries--;
        }
        retried = true;
        try {
          // sleep for the given time interval
          Thread.sleep(retryInterval);
        } catch (InterruptedException ie) {
          LOG.warn("Client retry sleep interrupted! ");
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    // Keep the last failure as the cause so the root problem stays visible.
    throw new RuntimeException("Failed to connect to timeline server. "
        + "Connection retries limit exceeded. "
        + "The posted timeline event may be missing", lastFailure);
  }

  /**
   * Logs a retriable failure together with the number of retries left.
   */
  private void logException(Exception e, int leftRetries) {
    if (leftRetries > 0) {
      LOG.info("Exception caught by TimelineClientConnectionRetry,"
          + " will try " + leftRetries + " more time(s).\nMessage: "
          + e.getMessage());
    } else {
      // note that maxRetries may be -1 at the very beginning
      LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
          + " will keep retrying.\nMessage: "
          + e.getMessage());
    }
  }
}
/**
 * Jersey client filter that passes every request through
 * {@code connectionRetry}, retrying when the connection to the server cannot
 * be established.
 */
private class TimelineJerseyRetryFilter extends ClientFilter {
  @Override
  public ClientResponse handle(final ClientRequest cr)
      throws ClientHandlerException {
    // Set up the retry operation
    TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
      @Override
      public Object run() {
        // Try pass the request, if fail, keep retrying
        return getNext().handle(cr);
      }

      @Override
      public boolean shouldRetryOn(Exception e) {
        // Only retry on connection exceptions
        return (e instanceof ClientHandlerException)
            && (e.getCause() instanceof ConnectException);
      }
    };
    try {
      return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
    } catch (IOException e) {
      // Keep the original exception as cause instead of only its message.
      throw new ClientHandlerException("Jersey retry failed!\nMessage: "
          + e.getMessage(), e);
    }
  }
}
/**
 * Creates a client without an application id. The client stays in v1.x mode
 * ({@code timelineServiceV2} remains false).
 */
public TimelineClientImpl() {
super(TimelineClientImpl.class.getName(), null);
}
/**
 * Creates a client bound to an application and switches it to v2 mode.
 *
 * @param applicationId the application id passed on to the super class
 */
public TimelineClientImpl(ApplicationId applicationId) {
super(TimelineClientImpl.class.getName(), applicationId);
this.timelineServiceV2 = true;
}
/**
 * Sets up the authentication context, the Jersey client and the retry helper,
 * then prepares either the v2 entity dispatcher or the v1.x service address
 * and version from the configuration.
 *
 * @param conf the configuration to initialize from
 * @throws Exception if the current user cannot be determined or the super
 *     class fails to initialize
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  this.configuration = conf;
  UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  UserGroupInformation realUgi = ugi.getRealUser();
  if (realUgi != null) {
    // Proxy user: authenticate as the real user and act on behalf of ugi.
    authUgi = realUgi;
    doAsUser = ugi.getShortUserName();
  } else {
    authUgi = ugi;
    doAsUser = null;
  }
  ClientConfig cc = new DefaultClientConfig();
  cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
  connConfigurator = newConnConfigurator(conf);
  if (UserGroupInformation.isSecurityEnabled()) {
    authenticator = new KerberosDelegationTokenAuthenticator();
  } else {
    authenticator = new PseudoDelegationTokenAuthenticator();
  }
  authenticator.setConnectionConfigurator(connConfigurator);
  token = new DelegationTokenAuthenticatedURL.Token();

  connectionRetry = new TimelineClientConnectionRetry(conf);
  client = new Client(new URLConnectionClientHandler(
      new TimelineURLConnectionFactory()), cc);

  // old version timeline service need to get address from configuration
  // while new version need to auto discovery (with retry).
  if (timelineServiceV2) {
    maxServiceRetries = conf.getInt(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
    serviceRetryInterval = conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
    entityDispatcher = new TimelineEntityDispatcher(conf);
  } else {
    // TODO need to cleanup filter retry later.
    // The retry filter is only installed (and only created) for v1.x.
    client.addFilter(new TimelineJerseyRetryFilter());
    if (YarnConfiguration.useHttps(conf)) {
      setTimelineServiceAddress(conf.get(
          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
    } else {
      setTimelineServiceAddress(conf.get(
          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
    }
    timelineServiceVersion =
        conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
    LOG.info("Timeline service address: " + getTimelineServiceAddress());
  }
  super.serviceInit(conf);
}
/**
 * Starts the entity dispatcher in v2 mode; otherwise creates the timeline
 * writer for the v1 REST resource of the configured service address.
 */
@Override
protected void serviceStart() throws Exception {
  if (!timelineServiceV2) {
    final URI v1ResourceUri =
        constructResURI(getConfig(), timelineServiceAddress, false);
    timelineWriter =
        createTimelineWriter(configuration, authUgi, client, v1ResourceUri);
    return;
  }
  entityDispatcher.start();
}
/**
 * Creates the writer matching the configured timeline service version.
 *
 * @param conf configuration handed to the file system writer
 * @param ugi user the writer acts as
 * @param webClient Jersey client used by the writer
 * @param uri REST resource URI of the timeline service
 * @return a {@code FileSystemTimelineWriter} when the version is exactly 1.5,
 *     a {@code DirectTimelineWriter} otherwise
 * @throws IOException if the writer cannot be created
 */
protected TimelineWriter createTimelineWriter(Configuration conf,
    UserGroupInformation ugi, Client webClient, URI uri)
    throws IOException {
  final boolean isVersion15 =
      Float.compare(this.timelineServiceVersion, 1.5f) == 0;
  if (!isVersion15) {
    return new DirectTimelineWriter(ugi, webClient, uri);
  }
  return new FileSystemTimelineWriter(conf, ugi, webClient, uri);
}
/**
 * Closes the timeline writer (if any), stops the entity dispatcher in v2 mode
 * and finally stops the super class.
 *
 * <p>Every step runs even if an earlier one throws, and the dispatcher is
 * only stopped when it was actually created, so that stopping a client whose
 * initialization did not complete does not fail with a
 * {@code NullPointerException}.
 */
@Override
protected void serviceStop() throws Exception {
  try {
    if (this.timelineWriter != null) {
      this.timelineWriter.close();
    }
  } finally {
    try {
      if (timelineServiceV2 && entityDispatcher != null) {
        entityDispatcher.stop();
      }
    } finally {
      super.serviceStop();
    }
  }
}
/**
 * Flushes the timeline writer; does nothing when no writer has been created.
 */
@Override
public void flush() throws IOException {
  if (timelineWriter == null) {
    return;
  }
  timelineWriter.flush();
}
/**
 * Hands the given v1 entities to the timeline writer.
 *
 * @param entities the entities to put
 * @return the response produced by the writer
 */
@Override
public TimelinePutResponse putEntities(
    TimelineEntity... entities) throws IOException, YarnException {
  final TimelinePutResponse writerResponse =
      timelineWriter.putEntities(entities);
  return writerResponse;
}
/**
 * Publishes v2 entities synchronously through the entity dispatcher.
 *
 * @param entities the v2 entities to publish
 * @throws YarnException if this client was not created in v2 mode, or the
 *     dispatcher reports a failure
 */
@Override
public void putEntities(
    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
        entities) throws IOException, YarnException {
  if (timelineServiceV2) {
    entityDispatcher.dispatchEntities(true, entities);
    return;
  }
  throw new YarnException("v.2 method is invoked on a v.1.x client");
}
/**
 * Queues v2 entities for asynchronous publishing through the entity
 * dispatcher.
 *
 * @param entities the v2 entities to publish
 * @throws YarnException if this client was not created in v2 mode, or the
 *     dispatcher rejects the entities
 */
@Override
public void putEntitiesAsync(
    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
        entities) throws IOException, YarnException {
  if (timelineServiceV2) {
    entityDispatcher.dispatchEntities(false, entities);
    return;
  }
  throw new YarnException("v.2 method is invoked on a v.1.x client");
}
/**
 * Hands the given domain to the timeline writer.
 *
 * @param domain the domain to put
 */
@Override
public void putDomain(TimelineDomain domain)
    throws IOException, YarnException {
  final TimelineWriter writer = timelineWriter;
  writer.putDomain(domain);
}
/**
 * Puts an object to the v2 REST resource, retrying on {@link IOException}
 * because the cached service address may be stale.
 * Used for new timeline service only.
 *
 * @param path resource path below the v2 root
 * @param params query parameters of the request
 * @param obj the object to send
 * @throws IOException when the retries are used up
 * @throws YarnException when the address is unavailable, the server answers
 *     with an error, or the retry wait is interrupted
 */
@Private
protected void putObjects(String path, MultivaluedMap<String, String> params,
    Object obj) throws IOException, YarnException {
  int remainingAttempts = verifyRestEndPointAvailable();
  // Loop until one attempt goes through; checkRetryWithSleep() ends the loop
  // by throwing once no attempts are left.
  for (;;) {
    try {
      final URI target =
          constructResURI(getConfig(), timelineServiceAddress, true);
      putObjects(target, path, params, obj);
      return;
    } catch (IOException e) {
      // handle exception for timelineServiceAddress being updated.
      checkRetryWithSleep(remainingAttempts, e);
      remainingAttempts--;
    }
  }
}
/**
 * Waits for the timeline service address to become known (new timeline
 * service only) and fails if it is still missing afterwards.
 *
 * @return the retries left after polling for the address
 * @throws YarnException if the address is still null after polling
 */
private int verifyRestEndPointAvailable() throws YarnException {
  final int remaining = pollTimelineServiceAddress(this.maxServiceRetries);
  if (timelineServiceAddress != null) {
    return remaining;
  }
  final String errMessage =
      "TimelineClient has reached to max retry times : "
          + this.maxServiceRetries
          + ", but failed to fetch timeline service address. Please verify"
          + " Timeline Auxillary Service is configured in all the NMs";
  LOG.error(errMessage);
  throw new YarnException(errMessage);
}
/**
 * Waits before the next attempt if retries are left, otherwise gives up.
 *
 * @param retries the number of retries still available
 * @param e the failure that triggered the retry; becomes the cause of the
 *     exception thrown when no retries are left
 * @throws YarnException if the wait is interrupted
 * @throws IOException if {@code retries} is not positive
 */
private void checkRetryWithSleep(int retries, IOException e)
    throws YarnException, IOException {
  if (retries > 0) {
    try {
      Thread.sleep(this.serviceRetryInterval);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      // Keep the interrupt as cause so callers can tell why the retry ended.
      throw new YarnException(
          "Interrupted while retrying to connect to ATS", ex);
    }
  } else {
    String msg = "TimelineClient has reached to max retry times : "
        + this.maxServiceRetries
        + " for service address: "
        + timelineServiceAddress;
    LOG.error(msg);
    throw new IOException(msg, e);
  }
}
/**
 * Sends a single JSON PUT request and checks the outcome.
 *
 * @param base root URI of the REST resource
 * @param path resource path below {@code base}
 * @param params query parameters of the request
 * @param obj the object to send
 * @throws IOException if the client throws a runtime exception, which is
 *     expected when the server cannot be reached
 * @throws YarnException if there is no response or its status is not OK
 */
protected void putObjects(
    URI base, String path, MultivaluedMap<String, String> params, Object obj)
    throws IOException, YarnException {
  final ClientResponse resp;
  try {
    resp = client.resource(base).path(path).queryParams(params)
        .accept(MediaType.APPLICATION_JSON)
        .type(MediaType.APPLICATION_JSON)
        .put(ClientResponse.class, obj);
  } catch (RuntimeException re) {
    // runtime exception is expected if the client cannot connect the server
    LOG.error("Failed to get the response from the timeline server.", re);
    throw new IOException(re);
  }

  final String failure;
  if (resp == null) {
    failure = "Response from the timeline server is " + "null";
  } else if (resp.getStatusInfo().getStatusCode()
      != ClientResponse.Status.OK.getStatusCode()) {
    failure = "Response from the timeline server is "
        + "not successful," + " HTTP error code: " + resp.getStatus()
        + ", Server response:\n" + resp.getEntity(String.class);
  } else {
    // The server accepted the request.
    return;
  }
  LOG.error(failure);
  throw new YarnException(failure);
}
/**
 * Replaces the timeline service address used for subsequent requests.
 *
 * @param address the new address; it is concatenated between the scheme and
 *     the resource path when a resource URI is built
 */
@Override
public void setTimelineServiceAddress(String address) {
this.timelineServiceAddress = address;
}
/**
 * @return the current timeline service address; may be null if it has not
 *     been set yet
 */
private String getTimelineServiceAddress() {
return this.timelineServiceAddress;
}
/**
 * Requests a timeline delegation token from the v1 REST resource of the
 * current service address, going through the connection retry helper.
 *
 * @param renewer the renewer passed on with the token request
 * @return the delegation token
 */
@SuppressWarnings("unchecked")
@Override
public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
    final String renewer) throws IOException, YarnException {
  final PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>
      fetchTokenAction =
      new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
        @Override
        public Token<TimelineDelegationTokenIdentifier> run()
            throws Exception {
          final DelegationTokenAuthenticatedURL authUrl =
              new DelegationTokenAuthenticatedURL(authenticator,
                  connConfigurator);
          // TODO we should add retry logic here if timelineServiceAddress is
          // not available immediately.
          final URL endpoint = constructResURI(getConfig(),
              getTimelineServiceAddress(), false).toURL();
          return (Token) authUrl.getDelegationToken(
              endpoint, token, renewer, doAsUser);
        }
      };
  final Object fetched = operateDelegationToken(fetchTokenAction);
  return (Token<TimelineDelegationTokenIdentifier>) fetched;
}
/**
 * Renews the given timeline delegation token, going through the connection
 * retry helper.
 *
 * @param timelineDT the token to renew
 * @return the value returned by the renew call of the authenticated URL
 */
@SuppressWarnings("unchecked")
@Override
public long renewDelegationToken(
    final Token<TimelineDelegationTokenIdentifier> timelineDT)
    throws IOException, YarnException {
  final boolean noServiceInToken =
      timelineDT.getService().toString().isEmpty();
  // Scheme and address are only needed when the token names its service.
  final String scheme;
  final InetSocketAddress address;
  if (noServiceInToken) {
    scheme = null;
    address = null;
  } else {
    scheme = YarnConfiguration.useHttps(this.getConfig()) ? "https" : "http";
    address = SecurityUtil.getTokenServiceAddr(timelineDT);
  }
  final PrivilegedExceptionAction<Long> renewAction =
      new PrivilegedExceptionAction<Long>() {
        @Override
        public Long run() throws Exception {
          // If the timeline DT to renew is different than cached, replace it.
          // Token to set every time for retry, because when exception
          // happens, DelegationTokenAuthenticatedURL will reset it to null;
          final boolean sameAsCached =
              timelineDT.equals(token.getDelegationToken());
          if (!sameAsCached) {
            token.setDelegationToken((Token) timelineDT);
          }
          final DelegationTokenAuthenticatedURL authUrl =
              new DelegationTokenAuthenticatedURL(authenticator,
                  connConfigurator);
          // If the token service address is not available, fall back to use
          // the configured service address.
          final URI serviceURI;
          if (noServiceInToken) {
            serviceURI = constructResURI(
                getConfig(), getTimelineServiceAddress(), false);
          } else {
            serviceURI = new URI(scheme, null, address.getHostName(),
                address.getPort(), RESOURCE_URI_STR_V1, null, null);
          }
          return authUrl
              .renewDelegationToken(serviceURI.toURL(), token, doAsUser);
        }
      };
  return (Long) operateDelegationToken(renewAction);
}
/**
 * Cancels the given timeline delegation token, going through the connection
 * retry helper.
 *
 * @param timelineDT the token to cancel
 */
@SuppressWarnings("unchecked")
@Override
public void cancelDelegationToken(
    final Token<TimelineDelegationTokenIdentifier> timelineDT)
    throws IOException, YarnException {
  final boolean noServiceInToken =
      timelineDT.getService().toString().isEmpty();
  // Scheme and address are only needed when the token names its service.
  final String scheme;
  final InetSocketAddress address;
  if (noServiceInToken) {
    scheme = null;
    address = null;
  } else {
    scheme = YarnConfiguration.useHttps(this.getConfig()) ? "https" : "http";
    address = SecurityUtil.getTokenServiceAddr(timelineDT);
  }
  final PrivilegedExceptionAction<Void> cancelAction =
      new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
          // If the timeline DT to cancel is different than cached, replace
          // it.
          // Token to set every time for retry, because when exception
          // happens, DelegationTokenAuthenticatedURL will reset it to null;
          final boolean sameAsCached =
              timelineDT.equals(token.getDelegationToken());
          if (!sameAsCached) {
            token.setDelegationToken((Token) timelineDT);
          }
          final DelegationTokenAuthenticatedURL authUrl =
              new DelegationTokenAuthenticatedURL(authenticator,
                  connConfigurator);
          // If the token service address is not available, fall back to use
          // the configured service address.
          final URI serviceURI;
          if (noServiceInToken) {
            serviceURI = constructResURI(
                getConfig(), getTimelineServiceAddress(), false);
          } else {
            serviceURI = new URI(scheme, null, address.getHostName(),
                address.getPort(), RESOURCE_URI_STR_V1, null, null);
          }
          authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
          return null;
        }
      };
  operateDelegationToken(cancelAction);
}
/**
 * Describes this client together with the timeline server URI and writer it
 * uses.
 *
 * <p>The URI is only built when both the configuration and the service
 * address are available. Before initialization, or in v2 mode before the
 * address has been discovered, a placeholder is printed instead so that
 * {@code toString()} never throws.
 */
@Override
public String toString() {
  final Configuration conf = getConfig();
  final String address = getTimelineServiceAddress();
  // NOTE(review): building the URI with a null address or configuration is
  // expected to fail with a NullPointerException (Guava's Joiner rejects null
  // parts) - confirm against the Joiner/YarnConfiguration versions in use.
  final String server = (conf == null || address == null)
      ? "<unknown>"
      : constructResURI(conf, address, false).toString();
  return super.toString() + " with timeline server "
      + server
      + " and writer " + timelineWriter;
}
/**
 * Wraps the delegation token action into a retry operation and runs it
 * through the connection retry helper.
 *
 * @param action the privileged action to run
 * @return the result of the action
 */
private Object operateDelegationToken(
    final PrivilegedExceptionAction<?> action)
    throws IOException, YarnException {
  return connectionRetry.retryOn(
      createTimelineClientRetryOpForOperateDelegationToken(action));
}
/**
 * Polls the timeline service address, sleeping {@code serviceRetryInterval}
 * between checks, until it is non-null or the given retries are used up.
 *
 * @param retries the maximum number of times to wait
 * @return the retries left when polling ended
 * @throws YarnException if the wait is interrupted
 */
private int pollTimelineServiceAddress(int retries) throws YarnException {
  while (timelineServiceAddress == null && retries > 0) {
    try {
      Thread.sleep(this.serviceRetryInterval);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      // Keep the interrupt as cause so callers can tell why polling ended.
      throw new YarnException("Interrupted while trying to connect ATS", e);
    }
    retries--;
  }
  return retries;
}
/**
 * Connection factory for the Jersey client that opens every connection
 * through a {@link DelegationTokenAuthenticatedURL}, after checking the TGT
 * of the authenticating user.
 */
private class TimelineURLConnectionFactory
    implements HttpURLConnectionFactory {

  @Override
  public HttpURLConnection getHttpURLConnection(final URL url)
      throws IOException {
    authUgi.checkTGTAndReloginFromKeytab();
    final HttpURLConnection connection;
    try {
      final DelegationTokenAuthenticatedURL authenticatedUrl =
          new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
      connection = authenticatedUrl.openConnection(url, token, doAsUser);
    } catch (UndeclaredThrowableException e) {
      // Surface the wrapped exception rather than the wrapper itself.
      throw new IOException(e.getCause());
    } catch (AuthenticationException ae) {
      throw new IOException(ae);
    }
    return connection;
  }
}
/**
 * Creates the connection configurator: the SSL-aware one when it can be
 * built from the configuration, the plain timeout-only one otherwise.
 *
 * @param conf configuration the SSL settings are loaded from
 * @return the configurator to apply to every connection
 */
private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
  ConnectionConfigurator configurator;
  try {
    configurator = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
  } catch (Exception e) {
    LOG.debug("Cannot load customized ssl related configuration. " +
        "Fallback to system-generic settings.", e);
    configurator = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
  }
  return configurator;
}
/**
 * Fallback configurator that only applies {@link #DEFAULT_SOCKET_TIMEOUT} as
 * connect and read timeout; used when the SSL-aware configurator cannot be
 * created.
 */
private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
return conn;
}
};
/**
 * Builds a configurator that installs the socket factory and hostname
 * verifier of a client-mode {@link SSLFactory} on HTTPS connections and
 * applies the given timeout to every connection.
 *
 * @param timeout connect and read timeout in milliseconds
 * @param conf configuration the SSL factory is created from
 * @return the SSL-aware configurator
 * @throws IOException if the SSL factory cannot be initialized
 * @throws GeneralSecurityException if the SSL factory cannot be initialized
 */
private static ConnectionConfigurator newSslConnConfigurator(final int timeout,
    Configuration conf) throws IOException, GeneralSecurityException {
  final SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
  sslFactory.init();
  final SSLSocketFactory socketFactory = sslFactory.createSSLSocketFactory();
  final HostnameVerifier hostnameVerifier = sslFactory.getHostnameVerifier();

  return new ConnectionConfigurator() {
    @Override
    public HttpURLConnection configure(HttpURLConnection conn)
        throws IOException {
      if (conn instanceof HttpsURLConnection) {
        final HttpsURLConnection secureConn = (HttpsURLConnection) conn;
        secureConn.setSSLSocketFactory(socketFactory);
        secureConn.setHostnameVerifier(hostnameVerifier);
      }
      setTimeouts(conn, timeout);
      return conn;
    }
  };
}
/**
 * Applies the same timeout to connecting and to reading.
 *
 * @param connection the connection to configure
 * @param socketTimeout the timeout in milliseconds
 */
private static void setTimeouts(URLConnection connection, int socketTimeout) {
  final int timeoutMillis = socketTimeout;
  connection.setReadTimeout(timeoutMillis);
  connection.setConnectTimeout(timeoutMillis);
}
/**
 * Builds the REST resource URI from scheme, address and resource root.
 *
 * @param conf configuration deciding between http and https
 * @param address host and port of the timeline service
 * @param v2 true for the v2 resource root, false for v1
 * @return the resource URI
 */
private static URI constructResURI(
    Configuration conf, String address, boolean v2) {
  final String scheme =
      YarnConfiguration.useHttps(conf) ? "https://" : "http://";
  final String resourceRoot = v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1;
  final String uriText = JOINER.join(scheme, address, resourceRoot);
  return URI.create(uriText);
}
/**
 * Command line entry point: puts the entities or the domain(s) found in a
 * JSON file, or prints the usage when the arguments do not select both a
 * file and a data type.
 *
 * @param argv the command line arguments
 * @throws Exception if the arguments cannot be parsed
 */
public static void main(String[] argv) throws Exception {
  final CommandLine cli = new GnuParser().parse(opts, argv);
  final String path =
      cli.hasOption("put") ? cli.getOptionValue("put") : null;
  if (path != null && path.length() > 0) {
    // The entity flag takes precedence when both flags are given.
    String dataType = null;
    if (cli.hasOption(ENTITY_DATA_TYPE)) {
      dataType = ENTITY_DATA_TYPE;
    } else if (cli.hasOption(DOMAIN_DATA_TYPE)) {
      dataType = DOMAIN_DATA_TYPE;
    }
    if (dataType != null) {
      putTimelineDataInJSONFile(path, dataType);
      return;
    }
  }
  printUsage();
}
/**
 * Put timeline data in a JSON file via command line.
 *
 * <p>Failures are logged (with their stack trace) rather than thrown, and the
 * client is always stopped once it has been started.
 *
 * @param path
 *          path to the timeline data JSON file
 * @param type
 *          the type of the timeline data in the JSON file
 */
private static void putTimelineDataInJSONFile(String path, String type) {
  File jsonFile = new File(path);
  if (!jsonFile.exists()) {
    LOG.error("File [" + jsonFile.getAbsolutePath() + "] doesn't exist");
    return;
  }
  ObjectMapper mapper = new ObjectMapper();
  YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
  TimelineEntities entities = null;
  TimelineDomains domains = null;
  try {
    if (type.equals(ENTITY_DATA_TYPE)) {
      entities = mapper.readValue(jsonFile, TimelineEntities.class);
    } else if (type.equals(DOMAIN_DATA_TYPE)) {
      domains = mapper.readValue(jsonFile, TimelineDomains.class);
    }
  } catch (Exception e) {
    // Log through the logger, including the stack trace, instead of printing
    // the trace to stderr separately.
    LOG.error("Error when reading " + e.getMessage(), e);
    return;
  }
  Configuration conf = new YarnConfiguration();
  TimelineClient client = TimelineClient.createTimelineClient();
  client.init(conf);
  client.start();
  try {
    if (UserGroupInformation.isSecurityEnabled()
        && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
      Token<TimelineDelegationTokenIdentifier> token =
          client.getDelegationToken(
              UserGroupInformation.getCurrentUser().getUserName());
      UserGroupInformation.getCurrentUser().addToken(token);
    }
    if (type.equals(ENTITY_DATA_TYPE)) {
      TimelinePutResponse response = client.putEntities(
          entities.getEntities().toArray(
              new TimelineEntity[entities.getEntities().size()]));
      if (response.getErrors().isEmpty()) {
        LOG.info("Timeline entities are successfully put");
      } else {
        for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
          LOG.error("TimelineEntity [" + error.getEntityType() + ":" +
              error.getEntityId() + "] is not successfully put. Error code: " +
              error.getErrorCode());
        }
      }
    } else if (type.equals(DOMAIN_DATA_TYPE)) {
      // Keep going after a failed domain so the remaining ones are still put.
      boolean hasError = false;
      for (TimelineDomain domain : domains.getDomains()) {
        try {
          client.putDomain(domain);
        } catch (Exception e) {
          LOG.error("Error when putting domain " + domain.getId(), e);
          hasError = true;
        }
      }
      if (!hasError) {
        LOG.info("Timeline domains are successfully put");
      }
    }
  } catch (Exception e) {
    // Covers runtime exceptions as well; both were handled identically.
    LOG.error("Error when putting the timeline data", e);
  } finally {
    client.stop();
  }
}
/**
 * Prints the command line usage of the timeline client.
 */
private static void printUsage() {
  final HelpFormatter formatter = new HelpFormatter();
  formatter.printHelp("TimelineClient", opts);
}
/**
 * @return the user this client authenticates as; assigned in
 *     {@code serviceInit}, so null before initialization
 */
@VisibleForTesting
@Private
public UserGroupInformation getUgi() {
return authUgi;
}
/**
 * Puts entities for an application attempt and entity group through the
 * timeline writer. Only available when the timeline service version is 1.5.
 *
 * @param appAttemptId the application attempt the entities belong to
 * @param groupId the entity group the entities belong to
 * @param entities the entities to put
 * @return the response produced by the writer
 * @throws YarnException if the timeline service version is not 1.5
 */
@Override
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
    TimelineEntityGroupId groupId, TimelineEntity... entities)
    throws IOException, YarnException {
  final boolean isVersion15 =
      Float.compare(this.timelineServiceVersion, 1.5f) == 0;
  if (isVersion15) {
    return timelineWriter.putEntities(appAttemptId, groupId, entities);
  }
  throw new YarnException(
      "This API is not supported under current Timeline Service Version: "
          + timelineServiceVersion);
}
/**
 * Puts a domain for an application attempt through the timeline writer.
 * Only available when the timeline service version is 1.5.
 *
 * @param appAttemptId the application attempt the domain belongs to
 * @param domain the domain to put
 * @throws YarnException if the timeline service version is not 1.5
 */
@Override
public void putDomain(ApplicationAttemptId appAttemptId,
    TimelineDomain domain) throws IOException, YarnException {
  final boolean isVersion15 =
      Float.compare(this.timelineServiceVersion, 1.5f) == 0;
  if (isVersion15) {
    timelineWriter.putDomain(appAttemptId, domain);
    return;
  }
  throw new YarnException(
      "This API is not supported under current Timeline Service Version: "
          + timelineServiceVersion);
}
/**
 * Replaces the timeline writer used by the v1.x put methods.
 *
 * @param writer the writer to use from now on
 */
@Private
@VisibleForTesting
public void setTimelineWriter(TimelineWriter writer) {
this.timelineWriter = writer;
}
/**
 * Creates the retry operation that runs a delegation token action as the
 * authenticating user.
 *
 * @param action the privileged action to wrap
 * @return the retry operation
 */
@Private
@VisibleForTesting
public TimelineClientRetryOp
    createTimelineClientRetryOpForOperateDelegationToken(
    final PrivilegedExceptionAction<?> action) throws IOException {
  final TimelineClientRetryOp retryOp =
      new TimelineClientRetryOpForOperateDelegationToken(this.authUgi, action);
  return retryOp;
}
/**
 * Retry operation that runs a delegation token action as the given user and
 * asks for a retry on connection failures and socket timeouts.
 */
@Private
@VisibleForTesting
public class TimelineClientRetryOpForOperateDelegationToken
    extends TimelineClientRetryOp {

  private final UserGroupInformation authUgi;
  private final PrivilegedExceptionAction<?> action;

  /**
   * @param authUgi the user the action is run as
   * @param action the privileged action to run
   */
  public TimelineClientRetryOpForOperateDelegationToken(
      UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
    this.authUgi = authUgi;
    this.action = action;
  }

  /**
   * Runs the action as {@code authUgi} after checking the TGT.
   *
   * @return the result of the action
   * @throws IOException if the action fails or the thread is interrupted
   */
  @Override
  public Object run() throws IOException {
    // Try pass the request, if fail, keep retrying
    authUgi.checkTGTAndReloginFromKeytab();
    try {
      return authUgi.doAs(action);
    } catch (UndeclaredThrowableException e) {
      throw new IOException(e.getCause());
    } catch (InterruptedException e) {
      // Restore the interrupt status before translating the exception, so the
      // interruption stays visible to the calling thread.
      Thread.currentThread().interrupt();
      throw new IOException(e);
    }
  }

  @Override
  public boolean shouldRetryOn(Exception e) {
    // retry on connection exceptions
    // and SocketTimeoutException
    return (e instanceof ConnectException
        || e instanceof SocketTimeoutException);
  }
}
/**
 * A queued publish request: a {@link FutureTask} whose task puts the held v2
 * entities to the "entities" resource, with the application id and the
 * sync/async flag as query parameters.
 */
private final class EntitiesHolder extends FutureTask<Void> {
// The entities to publish.
private final
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
entities;
// Whether the request was issued as a synchronous put.
private final boolean isSync;
/**
 * @param entities the entities to publish when the task is run
 * @param isSync true for a synchronous put; sent negated as the "async"
 *     query parameter
 */
EntitiesHolder(
final
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
entities,
final boolean isSync) {
super(new Callable<Void>() {
// The task body: publish the entities. It reads the constructor
// parameters, as the fields are not yet assigned at this point.
public Void call() throws Exception {
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
params.add("appid", getContextAppId().toString());
params.add("async", Boolean.toString(!isSync));
putObjects("entities", params, entities);
return null;
}
});
this.entities = entities;
this.isSync = isSync;
}
/**
 * @return true if this request was issued as a synchronous put
 */
public boolean isSync() {
return isSync;
}
/**
 * @return the entities held by this request
 */
public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
getEntities() {
return entities;
}
}
/**
* This class is responsible for collecting the timeline entities and
* publishing them in async.
*/
private class TimelineEntityDispatcher {
/**
 * Time period for which the timelineclient will wait for draining after
 * stop, in milliseconds.
 */
private static final long DRAIN_TIME_PERIOD = 2000L;
// Number of async put requests merged into one publish before it is flushed.
private int numberOfAsyncsToMerge;
// Requests waiting to be published by the dispatcher thread.
private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
// Single-threaded executor running the dispatcher loop; created in start().
private ExecutorService executor;
TimelineEntityDispatcher(Configuration conf) {
timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
numberOfAsyncsToMerge =
conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
}
/**
 * Creates the dispatcher loop: it takes requests from the queue and publishes
 * them until the thread is interrupted, then drains what is left in the queue
 * for at most {@code DRAIN_TIME_PERIOD} milliseconds and cancels the rest.
 *
 * @return the runnable to execute on the dispatcher thread
 */
Runnable createRunnable() {
  return new Runnable() {
    @Override
    public void run() {
      try {
        EntitiesHolder entitiesHolder;
        while (!Thread.currentThread().isInterrupted()) {
          // Merge all the async calls and make one push, but if its sync
          // call push immediately
          try {
            entitiesHolder = timelineEntityQueue.take();
          } catch (InterruptedException ie) {
            LOG.info("Timeline dispatcher thread was interrupted ");
            Thread.currentThread().interrupt();
            return;
          }
          if (entitiesHolder != null) {
            publishWithoutBlockingOnQueue(entitiesHolder);
          }
        }
      } finally {
        if (!timelineEntityQueue.isEmpty()) {
          LOG.info("Yet to publish " + timelineEntityQueue.size()
              + " timelineEntities, draining them now. ");
        }
        // Try to drain the remaining entities to be published @ the max for
        // 2 seconds
        long timeTillweDrain =
            System.currentTimeMillis() + DRAIN_TIME_PERIOD;
        while (!timelineEntityQueue.isEmpty()) {
          EntitiesHolder pending = timelineEntityQueue.poll();
          if (pending == null) {
            // The queue was emptied between the check and the poll.
            break;
          }
          publishWithoutBlockingOnQueue(pending);
          if (System.currentTimeMillis() > timeTillweDrain) {
            // time elapsed stop publishing further....
            if (!timelineEntityQueue.isEmpty()) {
              LOG.warn("Time to drain elapsed! Remaining "
                  + timelineEntityQueue.size() + " timelineEntities will not"
                  + " be published");
              // if some entities were not drained then we need interrupt
              // the threads which had put sync EntityHolders to the queue.
              EntitiesHolder nextEntityInTheQueue = null;
              while ((nextEntityInTheQueue =
                  timelineEntityQueue.poll()) != null) {
                nextEntityInTheQueue.cancel(true);
              }
            }
            break;
          }
        }
      }
    }

    /**
     * Publishes the given EntitiesHolder and return immediately if sync
     * call, else tries to fetch the EntitiesHolder from the queue in non
     * blocking fashion and collate the Entities if possible before
     * publishing through REST.
     *
     * @param entitiesHolder the request to publish
     */
    private void publishWithoutBlockingOnQueue(
        EntitiesHolder entitiesHolder) {
      if (entitiesHolder.isSync()) {
        entitiesHolder.run();
        return;
      }
      int count = 1;
      while (true) {
        // loop till we find a sync put Entities or there is nothing
        // to take
        EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
        if (nextEntityInTheQueue == null) {
          // Nothing in the queue just publish and get back to the
          // blocked wait state
          entitiesHolder.run();
          break;
        } else if (nextEntityInTheQueue.isSync()) {
          // flush all the prev async entities first
          entitiesHolder.run();
          // and then flush the sync entity
          nextEntityInTheQueue.run();
          break;
        } else {
          // append all async entities together and then flush
          entitiesHolder.getEntities().addEntities(
              nextEntityInTheQueue.getEntities().getEntities());
          count++;
          // ">=" rather than "==": with a configured limit of 1 or less the
          // counter would otherwise step over the limit and merging would
          // only stop once the queue is empty.
          if (count >= numberOfAsyncsToMerge) {
            // Flush the entities if the number of the async
            // putEntites merged reaches the desired limit. To avoid
            // collecting multiple entities and delaying for a long
            // time.
            entitiesHolder.run();
            break;
          }
        }
      }
    }
  };
}
/**
 * Wraps the entities into one request, queues it for the dispatcher thread
 * and, for a sync call, waits until it has been published.
 *
 * @param sync true to block until the entities are published
 * @param entitiesTobePublished the entities to publish
 * @throws YarnException if the executor is shut down, the calling thread is
 *     interrupted, or (sync only) publishing failed
 */
public void dispatchEntities(boolean sync,
    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[]
        entitiesTobePublished) throws YarnException {
  if (executor.isShutdown()) {
    throw new YarnException("Timeline client is in the process of stopping,"
        + " not accepting any more TimelineEntities");
  }

  // wrap all TimelineEntity into TimelineEntities object
  final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
      wrapped =
      new org.apache.hadoop.yarn.api.records.timelineservice.
          TimelineEntities();
  for (int i = 0; i < entitiesTobePublished.length; i++) {
    wrapped.addEntity(entitiesTobePublished[i]);
  }

  // created a holder and place it in queue
  final EntitiesHolder holder = new EntitiesHolder(wrapped, sync);
  try {
    timelineEntityQueue.put(holder);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new YarnException(
        "Failed while adding entity to the queue for publishing", e);
  }

  if (!sync) {
    return;
  }
  // In sync call we need to wait till its published and if any error then
  // throw it back
  try {
    holder.get();
  } catch (ExecutionException e) {
    throw new YarnException("Failed while publishing entity",
        e.getCause());
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new YarnException("Interrupted while publishing entity", e);
  }
}
/**
 * Creates the single-threaded executor and starts the dispatcher loop on it.
 */
public void start() {
  final ExecutorService dispatcherExecutor =
      Executors.newSingleThreadExecutor();
  executor = dispatcherExecutor;
  dispatcherExecutor.execute(createRunnable());
}
/**
 * Interrupts the dispatcher thread and waits up to {@code DRAIN_TIME_PERIOD}
 * milliseconds for it to terminate. Does nothing beyond logging when the
 * dispatcher was never started.
 */
public void stop() {
  LOG.info("Stopping TimelineClient.");
  if (executor == null) {
    // start() was never called, so there is no thread to shut down.
    return;
  }
  executor.shutdownNow();
  try {
    executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.warn("Interrupted while waiting for the timeline entity dispatcher"
        + " to terminate", e);
  }
}
}
}