blob: 5f4e7c5c1295f18079377351503fcf3db4fb64d8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.Client;
@Private
@Evolving
public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
private static Options opts;
private static final String ENTITY_DATA_TYPE = "entity";
private static final String DOMAIN_DATA_TYPE = "domain";
static {
opts = new Options();
opts.addOption("put", true, "Put the timeline entities/domain in a JSON file");
opts.getOption("put").setArgName("Path to the JSON file");
opts.addOption(ENTITY_DATA_TYPE, false, "Specify the JSON file contains the entities");
opts.addOption(DOMAIN_DATA_TYPE, false, "Specify the JSON file contains the domain");
opts.addOption("help", false, "Print usage");
}
@VisibleForTesting
protected DelegationTokenAuthenticatedURL.Token token;
@VisibleForTesting
protected UserGroupInformation authUgi;
@VisibleForTesting
protected String doAsUser;
private boolean timelineServiceV15Enabled;
private TimelineWriter timelineWriter;
private String timelineServiceAddress;
@Private
@VisibleForTesting
TimelineConnector connector;
public TimelineClientImpl() {
super(TimelineClientImpl.class.getName());
}
protected void serviceInit(Configuration conf) throws Exception {
if (!YarnConfiguration.timelineServiceV1Enabled(conf)) {
throw new IOException("Timeline V1 client is not properly configured. "
+ "Either timeline service is not enabled or version is not set to"
+ " 1.x");
}
timelineServiceV15Enabled =
YarnConfiguration.timelineServiceV15Enabled(conf);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) {
authUgi = realUgi;
doAsUser = ugi.getShortUserName();
} else {
authUgi = ugi;
doAsUser = null;
}
token = new DelegationTokenAuthenticatedURL.Token();
connector = createTimelineConnector();
if (YarnConfiguration.useHttps(conf)) {
timelineServiceAddress =
conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
} else {
timelineServiceAddress =
conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
}
LOG.info("Timeline service address: " + getTimelineServiceAddress());
super.serviceInit(conf);
}
@VisibleForTesting
protected TimelineConnector createTimelineConnector() {
TimelineConnector newConnector =
new TimelineConnector(true, authUgi, doAsUser, token);
addIfService(newConnector);
return newConnector;
}
@Override
protected void serviceStart() throws Exception {
timelineWriter = createTimelineWriter(getConfig(), authUgi,
connector.getClient(), TimelineConnector.constructResURI(getConfig(),
timelineServiceAddress, RESOURCE_URI_STR_V1));
}
protected TimelineWriter createTimelineWriter(Configuration conf,
UserGroupInformation ugi, Client webClient, URI uri)
throws IOException {
if (timelineServiceV15Enabled) {
return new FileSystemTimelineWriter(
conf, ugi, webClient, uri);
} else {
return new DirectTimelineWriter(ugi, webClient, uri);
}
}
@Override
protected void serviceStop() throws Exception {
if (this.timelineWriter != null) {
this.timelineWriter.close();
}
super.serviceStop();
}
@Override
public void flush() throws IOException {
if (timelineWriter != null) {
timelineWriter.flush();
}
}
@Override
public TimelinePutResponse putEntities(TimelineEntity... entities)
throws IOException, YarnException {
return timelineWriter.putEntities(entities);
}
@Override
public void putDomain(TimelineDomain domain) throws IOException,
YarnException {
timelineWriter.putDomain(domain);
}
private String getTimelineServiceAddress() {
return this.timelineServiceAddress;
}
@SuppressWarnings("unchecked")
@Override
public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException, YarnException {
PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>
getDTAction =
new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
@Override
public Token<TimelineDelegationTokenIdentifier> run()
throws Exception {
DelegationTokenAuthenticatedURL authUrl =
connector.getDelegationTokenAuthenticatedURL();
// TODO we should add retry logic here if timelineServiceAddress is
// not available immediately.
return (Token) authUrl.getDelegationToken(
TimelineConnector.constructResURI(getConfig(),
getTimelineServiceAddress(), RESOURCE_URI_STR_V1).toURL(),
token, renewer, doAsUser);
}
};
return (Token<TimelineDelegationTokenIdentifier>) connector
.operateDelegationToken(getDTAction);
}
@SuppressWarnings("unchecked")
@Override
public long renewDelegationToken(
final Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException {
final boolean isTokenServiceAddrEmpty =
timelineDT.getService().toString().isEmpty();
final String scheme = isTokenServiceAddrEmpty ? null
: (YarnConfiguration.useHttps(this.getConfig()) ? "https" : "http");
final InetSocketAddress address = isTokenServiceAddrEmpty ? null
: SecurityUtil.getTokenServiceAddr(timelineDT);
PrivilegedExceptionAction<Long> renewDTAction =
new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
// If the timeline DT to renew is different than cached, replace it.
// Token to set every time for retry, because when exception
// happens, DelegationTokenAuthenticatedURL will reset it to null;
if (!timelineDT.equals(token.getDelegationToken())) {
token.setDelegationToken((Token) timelineDT);
}
DelegationTokenAuthenticatedURL authUrl =
connector.getDelegationTokenAuthenticatedURL();
// If the token service address is not available, fall back to use
// the configured service address.
final URI serviceURI = isTokenServiceAddrEmpty
? TimelineConnector.constructResURI(getConfig(),
getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
: new URI(scheme, null, address.getHostName(),
address.getPort(), RESOURCE_URI_STR_V1, null, null);
return authUrl
.renewDelegationToken(serviceURI.toURL(), token, doAsUser);
}
};
return (Long) connector.operateDelegationToken(renewDTAction);
}
@SuppressWarnings("unchecked")
@Override
public void cancelDelegationToken(
final Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException {
final boolean isTokenServiceAddrEmpty =
timelineDT.getService().toString().isEmpty();
final String scheme = isTokenServiceAddrEmpty ? null
: (YarnConfiguration.useHttps(this.getConfig()) ? "https" : "http");
final InetSocketAddress address = isTokenServiceAddrEmpty ? null
: SecurityUtil.getTokenServiceAddr(timelineDT);
PrivilegedExceptionAction<Void> cancelDTAction =
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
// If the timeline DT to cancel is different than cached, replace
// it.
// Token to set every time for retry, because when exception
// happens, DelegationTokenAuthenticatedURL will reset it to null;
if (!timelineDT.equals(token.getDelegationToken())) {
token.setDelegationToken((Token) timelineDT);
}
DelegationTokenAuthenticatedURL authUrl =
connector.getDelegationTokenAuthenticatedURL();
// If the token service address is not available, fall back to use
// the configured service address.
final URI serviceURI = isTokenServiceAddrEmpty
? TimelineConnector.constructResURI(getConfig(),
getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
: new URI(scheme, null, address.getHostName(),
address.getPort(), RESOURCE_URI_STR_V1, null, null);
authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
return null;
}
};
connector.operateDelegationToken(cancelDTAction);
}
@Override
public String toString() {
return super.toString() + " with timeline server "
+ TimelineConnector.constructResURI(getConfig(),
getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
+ " and writer " + timelineWriter;
}
public static void main(String[] argv) throws Exception {
CommandLine cliParser = new GnuParser().parse(opts, argv);
if (cliParser.hasOption("put")) {
String path = cliParser.getOptionValue("put");
if (path != null && path.length() > 0) {
if (cliParser.hasOption(ENTITY_DATA_TYPE)) {
putTimelineDataInJSONFile(path, ENTITY_DATA_TYPE);
return;
} else if (cliParser.hasOption(DOMAIN_DATA_TYPE)) {
putTimelineDataInJSONFile(path, DOMAIN_DATA_TYPE);
return;
}
}
}
printUsage();
}
/**
* Put timeline data in a JSON file via command line.
*
* @param path
* path to the timeline data JSON file
* @param type
* the type of the timeline data in the JSON file
*/
private static void putTimelineDataInJSONFile(String path, String type) {
File jsonFile = new File(path);
if (!jsonFile.exists()) {
LOG.error("File [" + jsonFile.getAbsolutePath() + "] doesn't exist");
return;
}
YarnJacksonJaxbJsonProvider.configObjectMapper(MAPPER);
TimelineEntities entities = null;
TimelineDomains domains = null;
try {
if (type.equals(ENTITY_DATA_TYPE)) {
entities = MAPPER.readValue(jsonFile, TimelineEntities.class);
} else if (type.equals(DOMAIN_DATA_TYPE)){
domains = MAPPER.readValue(jsonFile, TimelineDomains.class);
}
} catch (Exception e) {
LOG.error("Error when reading " + e.getMessage());
e.printStackTrace(System.err);
return;
}
Configuration conf = new YarnConfiguration();
TimelineClient client = TimelineClient.createTimelineClient();
client.init(conf);
client.start();
try {
if (UserGroupInformation.isSecurityEnabled()
&& conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
Token<TimelineDelegationTokenIdentifier> token =
client.getDelegationToken(
UserGroupInformation.getCurrentUser().getUserName());
UserGroupInformation.getCurrentUser().addToken(token);
}
if (type.equals(ENTITY_DATA_TYPE)) {
TimelinePutResponse response = client.putEntities(
entities.getEntities().toArray(
new TimelineEntity[entities.getEntities().size()]));
if (response.getErrors().size() == 0) {
LOG.info("Timeline entities are successfully put");
} else {
for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
LOG.error("TimelineEntity [" + error.getEntityType() + ":" +
error.getEntityId() + "] is not successfully put. Error code: " +
error.getErrorCode());
}
}
} else if (type.equals(DOMAIN_DATA_TYPE) && domains != null) {
boolean hasError = false;
for (TimelineDomain domain : domains.getDomains()) {
try {
client.putDomain(domain);
} catch (Exception e) {
LOG.error("Error when putting domain " + domain.getId(), e);
hasError = true;
}
}
if (!hasError) {
LOG.info("Timeline domains are successfully put");
}
}
} catch(RuntimeException e) {
LOG.error("Error when putting the timeline data", e);
} catch (Exception e) {
LOG.error("Error when putting the timeline data", e);
} finally {
client.stop();
}
}
/**
* Helper function to print out usage
*/
private static void printUsage() {
new HelpFormatter().printHelp("TimelineClient", opts);
}
@VisibleForTesting
@Private
public UserGroupInformation getUgi() {
return authUgi;
}
@Override
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
TimelineEntityGroupId groupId, TimelineEntity... entities)
throws IOException, YarnException {
if (!timelineServiceV15Enabled) {
throw new YarnException(
"This API is not supported under current Timeline Service Version:");
}
return timelineWriter.putEntities(appAttemptId, groupId, entities);
}
@Override
public void putDomain(ApplicationAttemptId appAttemptId,
TimelineDomain domain) throws IOException, YarnException {
if (!timelineServiceV15Enabled) {
throw new YarnException(
"This API is not supported under current Timeline Service Version:");
}
timelineWriter.putDomain(appAttemptId, domain);
}
@Private
@VisibleForTesting
public void setTimelineWriter(TimelineWriter writer) {
this.timelineWriter = writer;
}
}