blob: fb71a8460a31352ad253fe16bd3e1a766f27824e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.core.util.MultivaluedMapImpl;
/**
* Implementation of timeline v2 client interface.
*
*/
public class TimelineV2ClientImpl extends TimelineV2Client {
private static final Log LOG = LogFactory.getLog(TimelineV2ClientImpl.class);
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
private TimelineEntityDispatcher entityDispatcher;
private TimelineEntityDispatcher subAppEntityDispatcher;
private volatile String timelineServiceAddress;
@VisibleForTesting
volatile Token currentTimelineToken = null;
// Retry parameters for identifying new timeline service
// TODO consider to merge with connection retry
private int maxServiceRetries;
private long serviceRetryInterval;
private TimelineConnector connector;
private ApplicationId contextAppId;
private UserGroupInformation authUgi;
public TimelineV2ClientImpl(ApplicationId appId) {
super(TimelineV2ClientImpl.class.getName());
this.contextAppId = appId;
}
public ApplicationId getContextAppId() {
return contextAppId;
}
protected void serviceInit(Configuration conf) throws Exception {
if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
throw new IOException("Timeline V2 client is not properly configured. "
+ "Either timeline service is not enabled or version is not set to"
+ " 2");
}
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
String doAsUser = null;
if (realUgi != null) {
authUgi = realUgi;
doAsUser = ugi.getShortUserName();
} else {
authUgi = ugi;
doAsUser = null;
}
// TODO need to add/cleanup filter retry later for ATSV2. similar to V1
DelegationTokenAuthenticatedURL.Token token =
new DelegationTokenAuthenticatedURL.Token();
connector = new TimelineConnector(false, authUgi, doAsUser, token);
addIfService(connector);
// new version need to auto discovery (with retry till ATS v2 address is
// got).
maxServiceRetries =
conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
serviceRetryInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
entityDispatcher = new TimelineEntityDispatcher(conf);
subAppEntityDispatcher = new TimelineEntityDispatcher(conf);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
entityDispatcher.start();
subAppEntityDispatcher.start();
}
@Override
protected void serviceStop() throws Exception {
entityDispatcher.stop();
subAppEntityDispatcher.stop();
super.serviceStop();
}
@Override
public void putEntities(TimelineEntity... entities)
throws IOException, YarnException {
entityDispatcher.dispatchEntities(true, entities, false);
}
@Override
public void putEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException {
entityDispatcher.dispatchEntities(false, entities, false);
}
@Override
public void putSubAppEntities(TimelineEntity... entities)
throws IOException, YarnException {
subAppEntityDispatcher.dispatchEntities(true, entities, true);
}
@Override
public void putSubAppEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException {
subAppEntityDispatcher.dispatchEntities(false, entities, true);
}
@Override
public void setTimelineCollectorInfo(CollectorInfo collectorInfo) {
if (collectorInfo == null) {
LOG.warn("Not setting collector info as it is null.");
return;
}
// First update the token so that it is available when collector address is
// used.
if (collectorInfo.getCollectorToken() != null) {
// Use collector address to update token service if its not available.
setTimelineDelegationToken(
collectorInfo.getCollectorToken(), collectorInfo.getCollectorAddr());
}
// Update timeline service address.
if (collectorInfo.getCollectorAddr() != null &&
!collectorInfo.getCollectorAddr().isEmpty() &&
!collectorInfo.getCollectorAddr().equals(timelineServiceAddress)) {
this.timelineServiceAddress = collectorInfo.getCollectorAddr();
LOG.info("Updated timeline service address to " + timelineServiceAddress);
}
}
private void setTimelineDelegationToken(Token delegationToken,
String collectorAddr) {
// Checks below are to ensure that an invalid token is not updated in UGI.
// This is required because timeline token is set via a public API.
if (!delegationToken.getKind().equals(
TimelineDelegationTokenIdentifier.KIND_NAME.toString())) {
LOG.warn("Timeline token to be updated should be of kind " +
TimelineDelegationTokenIdentifier.KIND_NAME);
return;
}
if (collectorAddr == null || collectorAddr.isEmpty()) {
collectorAddr = timelineServiceAddress;
}
// Token need not be updated if either address or token service does not
// exist.
String service = delegationToken.getService();
if ((service == null || service.isEmpty()) &&
(collectorAddr == null || collectorAddr.isEmpty())) {
LOG.warn("Timeline token does not have service and timeline service " +
"address is not yet set. Not updating the token");
return;
}
// No need to update a duplicate token.
if (currentTimelineToken != null &&
currentTimelineToken.equals(delegationToken)) {
return;
}
currentTimelineToken = delegationToken;
// Convert the token, sanitize the token service and add it to UGI.
org.apache.hadoop.security.token.
Token<TimelineDelegationTokenIdentifier> timelineToken =
new org.apache.hadoop.security.token.
Token<TimelineDelegationTokenIdentifier>(
delegationToken.getIdentifier().array(),
delegationToken.getPassword().array(),
new Text(delegationToken.getKind()),
service == null ? new Text() : new Text(service));
// Prefer timeline service address over service coming in the token for
// updating the token service.
InetSocketAddress serviceAddr =
(collectorAddr != null && !collectorAddr.isEmpty()) ?
NetUtils.createSocketAddr(collectorAddr) :
SecurityUtil.getTokenServiceAddr(timelineToken);
SecurityUtil.setTokenService(timelineToken, serviceAddr);
authUgi.addToken(timelineToken);
LOG.info("Updated timeline delegation token " + timelineToken);
}
@Private
protected void putObjects(String path, MultivaluedMap<String, String> params,
Object obj) throws IOException, YarnException {
int retries = verifyRestEndPointAvailable();
// timelineServiceAddress could be stale, add retry logic here.
boolean needRetry = true;
while (needRetry) {
try {
URI uri = TimelineConnector.constructResURI(getConfig(),
timelineServiceAddress, RESOURCE_URI_STR_V2);
putObjects(uri, path, params, obj);
needRetry = false;
} catch (IOException e) {
// handle exception for timelineServiceAddress being updated.
checkRetryWithSleep(retries, e);
retries--;
}
}
}
/**
* Check if reaching to maximum of retries.
*
* @param retries
* @param e
*/
private void checkRetryWithSleep(int retries, IOException e)
throws YarnException, IOException {
if (retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while retrying to connect to ATS");
}
} else {
StringBuilder msg =
new StringBuilder("TimelineClient has reached to max retry times : ");
msg.append(this.maxServiceRetries);
msg.append(" for service address: ");
msg.append(timelineServiceAddress);
LOG.error(msg.toString());
throw new IOException(msg.toString(), e);
}
}
private ClientResponse doPutObjects(URI base, String path,
MultivaluedMap<String, String> params, Object obj) {
return connector.getClient().resource(base).path(path).queryParams(params)
.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
.put(ClientResponse.class, obj);
}
protected void putObjects(final URI base, final String path,
final MultivaluedMap<String, String> params, final Object obj)
throws IOException, YarnException {
ClientResponse resp = null;
try {
resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
@Override
public ClientResponse run() throws Exception {
return doPutObjects(base, path, params, obj);
}
});
} catch (UndeclaredThrowableException ue) {
Throwable cause = ue.getCause();
if (cause instanceof IOException) {
throw (IOException)cause;
} else {
throw new IOException(cause);
}
} catch (InterruptedException ie) {
throw (IOException) new InterruptedIOException().initCause(ie);
}
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg =
"Response from the timeline server is " + ((resp == null) ? "null"
: "not successful," + " HTTP error code: " + resp.getStatus()
+ ", Server response:\n" + resp.getEntity(String.class));
LOG.error(msg);
throw new YarnException(msg);
}
}
private int verifyRestEndPointAvailable() throws YarnException {
// timelineServiceAddress could haven't be initialized yet
// or stale (only for new timeline service)
int retries = pollTimelineServiceAddress(this.maxServiceRetries);
if (timelineServiceAddress == null) {
String errMessage = "TimelineClient has reached to max retry times : "
+ this.maxServiceRetries
+ ", but failed to fetch timeline service address. Please verify"
+ " Timeline Auxiliary Service is configured in all the NMs";
LOG.error(errMessage);
throw new YarnException(errMessage);
}
return retries;
}
/**
* Poll TimelineServiceAddress for maximum of retries times if it is null.
*
* @param retries
* @return the left retry times
* @throws IOException
*/
private int pollTimelineServiceAddress(int retries) throws YarnException {
while (timelineServiceAddress == null && retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while trying to connect ATS");
}
retries--;
}
return retries;
}
private final class EntitiesHolder extends FutureTask<Void> {
private final TimelineEntities entities;
private final boolean isSync;
EntitiesHolder(final TimelineEntities entities, final boolean isSync,
final boolean subappwrite) {
super(new Callable<Void>() {
// publishEntities()
public Void call() throws Exception {
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
params.add("appid", getContextAppId().toString());
params.add("async", Boolean.toString(!isSync));
params.add("subappwrite", Boolean.toString(subappwrite));
putObjects("entities", params, entities);
return null;
}
});
this.entities = entities;
this.isSync = isSync;
}
public boolean isSync() {
return isSync;
}
public TimelineEntities getEntities() {
return entities;
}
}
/**
* This class is responsible for collecting the timeline entities and
* publishing them in async.
*/
private class TimelineEntityDispatcher {
/**
* Time period for which the timelineclient will wait for draining after
* stop.
*/
private final long drainTimeoutPeriod;
private int numberOfAsyncsToMerge;
private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
private ExecutorService executor;
TimelineEntityDispatcher(Configuration conf) {
timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
numberOfAsyncsToMerge =
conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
drainTimeoutPeriod = conf.getLong(
YarnConfiguration.TIMELINE_V2_CLIENT_DRAIN_TIME_MILLIS,
YarnConfiguration.DEFAULT_TIMELINE_V2_CLIENT_DRAIN_TIME_MILLIS);
}
Runnable createRunnable() {
return new Runnable() {
@Override
public void run() {
try {
EntitiesHolder entitiesHolder;
while (!Thread.currentThread().isInterrupted()) {
// Merge all the async calls and make one push, but if its sync
// call push immediately
try {
entitiesHolder = timelineEntityQueue.take();
} catch (InterruptedException ie) {
LOG.info("Timeline dispatcher thread was interrupted ");
Thread.currentThread().interrupt();
return;
}
if (entitiesHolder != null) {
publishWithoutBlockingOnQueue(entitiesHolder);
}
}
} finally {
if (!timelineEntityQueue.isEmpty()) {
LOG.info("Yet to publish " + timelineEntityQueue.size()
+ " timelineEntities, draining them now. ");
}
// Try to drain the remaining entities to be published @ the max for
// 2 seconds
long timeTillweDrain =
System.currentTimeMillis() + drainTimeoutPeriod;
while (!timelineEntityQueue.isEmpty()) {
publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
if (System.currentTimeMillis() > timeTillweDrain) {
// time elapsed stop publishing further....
if (!timelineEntityQueue.isEmpty()) {
LOG.warn("Time to drain elapsed! Remaining "
+ timelineEntityQueue.size() + "timelineEntities will not"
+ " be published");
// if some entities were not drained then we need interrupt
// the threads which had put sync EntityHolders to the queue.
EntitiesHolder nextEntityInTheQueue = null;
while ((nextEntityInTheQueue =
timelineEntityQueue.poll()) != null) {
nextEntityInTheQueue.cancel(true);
}
}
break;
}
}
}
}
/**
* Publishes the given EntitiesHolder and return immediately if sync
* call, else tries to fetch the EntitiesHolder from the queue in non
* blocking fashion and collate the Entities if possible before
* publishing through REST.
*
* @param entitiesHolder
*/
private void publishWithoutBlockingOnQueue(
EntitiesHolder entitiesHolder) {
if (entitiesHolder.isSync()) {
entitiesHolder.run();
return;
}
int count = 1;
while (true) {
// loop till we find a sync put Entities or there is nothing
// to take
EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
if (nextEntityInTheQueue == null) {
// Nothing in the queue just publish and get back to the
// blocked wait state
entitiesHolder.run();
break;
} else if (nextEntityInTheQueue.isSync()) {
// flush all the prev async entities first
entitiesHolder.run();
// and then flush the sync entity
nextEntityInTheQueue.run();
break;
} else {
// append all async entities together and then flush
entitiesHolder.getEntities().addEntities(
nextEntityInTheQueue.getEntities().getEntities());
count++;
if (count == numberOfAsyncsToMerge) {
// Flush the entities if the number of the async
// putEntites merged reaches the desired limit. To avoid
// collecting multiple entities and delaying for a long
// time.
entitiesHolder.run();
break;
}
}
}
}
};
}
public void dispatchEntities(boolean sync,
TimelineEntity[] entitiesTobePublished, boolean subappwrite)
throws YarnException {
if (executor.isShutdown()) {
throw new YarnException("Timeline client is in the process of stopping,"
+ " not accepting any more TimelineEntities");
}
// wrap all TimelineEntity into TimelineEntities object
TimelineEntities entities = new TimelineEntities();
for (TimelineEntity entity : entitiesTobePublished) {
entities.addEntity(entity);
}
// created a holder and place it in queue
EntitiesHolder entitiesHolder =
new EntitiesHolder(entities, sync, subappwrite);
try {
timelineEntityQueue.put(entitiesHolder);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException(
"Failed while adding entity to the queue for publishing", e);
}
if (sync) {
// In sync call we need to wait till its published and if any error then
// throw it back
try {
entitiesHolder.get();
} catch (ExecutionException e) {
throw new YarnException("Failed while publishing entity",
e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while publishing entity", e);
}
}
}
public void start() {
executor = Executors.newSingleThreadExecutor();
executor.execute(createRunnable());
}
public void stop() {
LOG.info("Stopping TimelineClient.");
executor.shutdownNow();
try {
executor.awaitTermination(drainTimeoutPeriod, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
}