blob: 2eb755abfd0d9ec78e0596d482ad5477a21e12f3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
/**
 * Client-side delegate that forwards {@link MRClientProtocol} calls to either
 * the MapReduce ApplicationMaster (while the application is running) or the
 * JobHistory server (once the application has completed), transparently
 * refreshing the proxy when the AM moves or the application finishes.
 *
 * NOTE(review): not synchronized — callers are presumed to serialize access;
 * confirm against the JobClient usage.
 */
public class ClientServiceDelegate {
  private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);

  private Configuration conf;
  private ApplicationId currentAppId;
  // Last observed state of the current application; drives the choice between
  // AM, history server, or a faked report (see createFakeJobReport).
  private ApplicationState currentAppState = ApplicationState.NEW;
  private final ResourceMgrDelegate rm;
  // Cached proxy to the AM or the history server; null when neither is
  // reachable yet (e.g. the application has not been launched).
  private MRClientProtocol realProxy = null;
  private String serviceAddr = "";
  private String serviceHttpAddr = "";
  private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

  ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm) {
    this.conf = new Configuration(conf); // Cloning for modifying.
    // For faster redirects from AM to HS.
    this.conf.setInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 3);
    this.rm = rm;
  }

  /** Returns a (possibly cached) proxy for the application owning {@code jobId}. */
  private MRClientProtocol getProxy(JobID jobId) throws YarnRemoteException {
    return getProxy(TypeConverter.toYarn(jobId).getAppId(), false);
  }

  /** Re-resolves the AM/history address before returning a proxy. */
  private MRClientProtocol getRefreshedProxy(JobID jobId) throws YarnRemoteException {
    return getProxy(TypeConverter.toYarn(jobId).getAppId(), true);
  }

  /**
   * Returns the cached proxy, refreshing it when the target application
   * changed, a refresh was forced, or no proxy has been created yet.
   * May return null when the application has not been launched (see
   * refreshProxy).
   */
  private MRClientProtocol getProxy(ApplicationId appId,
      boolean forceRefresh) throws YarnRemoteException {
    if (!appId.equals(currentAppId) || forceRefresh || realProxy == null) {
      currentAppId = appId;
      refreshProxy();
    }
    return realProxy;
  }

  /**
   * Re-resolves where the job lives and rebuilds {@link #realProxy}:
   * connects to the AM while the application is RUNNING (polling the RM
   * until the AM host is known), redirects to the history server once the
   * application completed, and sets the proxy to null for a NEW application
   * so status can be faked without blocking.
   */
  private void refreshProxy() throws YarnRemoteException {
    //TODO RM NPEs for unknown jobs. History may still be aware.
    // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
    // and redirect to the history server.
    ApplicationReport application = rm.getApplicationReport(currentAppId);
    while (ApplicationState.RUNNING.equals(application.getState())) {
      try {
        if (application.getHost() == null || "".equals(application.getHost())) {
          LOG.debug("AM not assigned to Job. Waiting to get the AM ...");
          Thread.sleep(2000);
          LOG.debug("Application state is " + application.getState());
          application = rm.getApplicationReport(currentAppId);
          continue;
        }
        serviceAddr = application.getHost() + ":" + application.getRpcPort();
        serviceHttpAddr = application.getTrackingUrl();
        currentAppState = application.getState();
        if (UserGroupInformation.isSecurityEnabled()) {
          // Install the AM's client token so the RPC layer can authenticate
          // the connection to the AM.
          String clientTokenEncoded = application.getClientToken();
          Token<ApplicationTokenIdentifier> clientToken =
            new Token<ApplicationTokenIdentifier>();
          clientToken.decodeFromUrlString(clientTokenEncoded);
          clientToken.setService(new Text(application.getHost() + ":"
              + application.getRpcPort()));
          UserGroupInformation.getCurrentUser().addToken(clientToken);
        }
        LOG.info("Connecting to " + serviceAddr);
        instantiateAMProxy(serviceAddr);
        return;
      } catch (Exception e) {
        // Possibly the AM has crashed and there may be some time before it is
        // restarted; keep retrying by getting the address from the RM.
        LOG.info("Could not connect to " + serviceAddr +
            ". Waiting for getting the latest AM address...");
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e1) {
          // Preserve the interrupt so callers up the stack can observe it.
          Thread.currentThread().interrupt();
        }
        application = rm.getApplicationReport(currentAppId);
      }
    }
    currentAppState = application.getState();
    /** we just want to return if its allocating, so that we dont
     *  block on it. This is to be able to return job status
     *  on an allocating Application.
     */
    if (currentAppState == ApplicationState.NEW) {
      realProxy = null;
      return;
    }
    if (currentAppState == ApplicationState.SUCCEEDED
        || currentAppState == ApplicationState.FAILED
        || currentAppState == ApplicationState.KILLED) {
      // Completed application: everything further comes from the history server.
      serviceAddr = conf.get(JHConfig.HS_BIND_ADDRESS,
          JHConfig.DEFAULT_HS_BIND_ADDRESS);
      LOG.info("Application state is completed. " +
          "Redirecting to job history server " + serviceAddr);
      try {
        serviceHttpAddr = JobHistoryUtils.getHistoryUrl(conf, currentAppId);
      } catch (UnknownHostException e) {
        LOG.warn("Unable to get history url", e);
        serviceHttpAddr = "UNKNOWN";
      }
      try {
        instantiateHistoryProxy(serviceAddr);
        return;
      } catch (IOException e) {
        throw new YarnException(e);
      }
    }
    // NOTE(review): other states (e.g. SUBMITTED) fall through here leaving
    // any previous realProxy in place — confirm this is intended.
  }

  /** Creates {@link #realProxy} pointing at the ApplicationMaster at serviceAddr. */
  private void instantiateAMProxy(final String serviceAddr) throws IOException {
    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
    // doAs so the RPC connection picks up the client token added in refreshProxy.
    realProxy = currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
      @Override
      public MRClientProtocol run() {
        Configuration myConf = new Configuration(conf);
        myConf.setClass(
            YarnConfiguration.YARN_SECURITY_INFO,
            SchedulerSecurityInfo.class, SecurityInfo.class);
        YarnRPC rpc = YarnRPC.create(myConf);
        return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
            NetUtils.createSocketAddr(serviceAddr), myConf);
      }
    });
    LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
  }

  /** Creates {@link #realProxy} pointing at the JobHistory server at serviceAddr. */
  private void instantiateHistoryProxy(final String serviceAddr)
      throws IOException {
    LOG.trace("Connecting to HistoryServer at: " + serviceAddr);
    Configuration myConf = new Configuration(conf);
    //TODO This should ideally be using it's own class (instead of ClientRMSecurityInfo)
    myConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
        ClientRMSecurityInfo.class, SecurityInfo.class);
    YarnRPC rpc = YarnRPC.create(myConf);
    realProxy = (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
        NetUtils.createSocketAddr(serviceAddr), myConf);
    LOG.trace("Connected to HistoryServer at: " + serviceAddr);
  }

  /**
   * Fetches job counters, first via the current proxy and, on a local
   * failure, via a refreshed proxy (typically the history server).
   * Returns empty counters when neither endpoint is available yet.
   */
  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
      InterruptedException {
    org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
    try {
      GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
      request.setJobId(jobID);
      MRClientProtocol protocol = getProxy(arg0);
      if (protocol == null) {
        /* no AM to connect to, fake counters */
        return new org.apache.hadoop.mapreduce.Counters();
      }
      return TypeConverter.fromYarn(protocol.getCounters(request).getCounters());
    } catch (YarnRemoteException yre) { // thrown by remote server, no need to redirect
      LOG.warn(RPCUtil.toString(yre));
      throw yre;
    } catch (Exception e) {
      LOG.debug("Failing to contact application master", e);
      try {
        GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
        request.setJobId(jobID);
        MRClientProtocol protocol = getRefreshedProxy(arg0);
        if (protocol == null) {
          /* no History to connect to, fake counters */
          return new org.apache.hadoop.mapreduce.Counters();
        }
        return TypeConverter.fromYarn(protocol.getCounters(request).getCounters());
      } catch (YarnRemoteException yre) {
        LOG.warn(RPCUtil.toString(yre));
        throw yre;
      }
    }
  }

  /** Returns the configured history-server done-directory prefix. */
  public String getJobHistoryDir() throws IOException, InterruptedException {
    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
  }

  /**
   * Fetches up to {@code arg2} task-attempt completion events starting at
   * event id {@code arg1}, falling back to a refreshed proxy on failure.
   * Returns an empty array when no endpoint is available yet.
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
      int arg2) throws IOException, InterruptedException {
    org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
    List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list = null;
    GetTaskAttemptCompletionEventsRequest request = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
    MRClientProtocol protocol;
    try {
      request.setJobId(jobID);
      request.setFromEventId(arg1);
      request.setMaxEvents(arg2);
      protocol = getProxy(arg0);
      /** This is hack to get around the issue of faking jobstatus while the AM
       * is coming up.
       */
      if (protocol == null) {
        return new TaskCompletionEvent[0];
      }
      list = protocol.getTaskAttemptCompletionEvents(request).getCompletionEventList();
    } catch (YarnRemoteException yre) { // thrown by remote server, no need to redirect
      LOG.warn(RPCUtil.toString(yre));
      throw yre;
    } catch (Exception e) {
      LOG.debug("Failed to contact application master ", e);
      try {
        request.setJobId(jobID);
        request.setFromEventId(arg1);
        request.setMaxEvents(arg2);
        protocol = getRefreshedProxy(arg0);
        if (protocol == null) {
          return new TaskCompletionEvent[0];
        }
        list = protocol.getTaskAttemptCompletionEvents(request).getCompletionEventList();
      } catch (YarnRemoteException yre) {
        LOG.warn(RPCUtil.toString(yre));
        throw yre;
      }
    }
    return TypeConverter.fromYarn(
        list.toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
  }

  /**
   * Fetches the diagnostic messages for a task attempt, falling back to a
   * refreshed proxy on failure. Returns an empty array when no endpoint is
   * available yet.
   */
  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
      arg0)
      throws IOException,
      InterruptedException {
    List<String> list = null;
    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(arg0);
    GetDiagnosticsRequest request = recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
    MRClientProtocol protocol;
    try {
      request.setTaskAttemptId(attemptID);
      protocol = getProxy(arg0.getJobID());
      if (protocol == null) {
        return new String[0];
      }
      list = protocol.getDiagnostics(request).getDiagnosticsList();
    } catch (YarnRemoteException yre) { // thrown by remote server, no need to redirect
      LOG.warn(RPCUtil.toString(yre));
      throw yre;
    } catch (Exception e) {
      LOG.debug("Failed to contact application master ", e);
      try {
        protocol = getRefreshedProxy(arg0.getJobID());
        if (protocol == null) {
          return new String[0];
        }
        list = protocol.getDiagnostics(request).getDiagnosticsList();
      } catch (YarnRemoteException yre) {
        LOG.warn(RPCUtil.toString(yre));
        throw yre;
      }
    }
    String[] result = new String[list.size()];
    int i = 0;
    for (String c : list) {
      result[i++] = c.toString();
    }
    return result;
  }

  /**
   * Builds a synthetic JobStatus for an application whose AM is not (yet)
   * reachable, mapping the given application state onto a JobState.
   */
  private JobStatus createFakeJobReport(ApplicationState state,
      org.apache.hadoop.mapreduce.v2.api.records.JobId jobId, String jobFile) {
    JobReport jobreport = recordFactory.newRecordInstance(JobReport.class);
    jobreport.setCleanupProgress(0);
    jobreport.setFinishTime(0);
    jobreport.setJobId(jobId);
    jobreport.setMapProgress(0);
    /** fix this, the start time should be fixed */
    jobreport.setStartTime(0);
    jobreport.setReduceProgress(0);
    jobreport.setSetupProgress(0);
    // Use the state parameter (callers pass currentAppState, so this is
    // equivalent to the previous field read, but honors the signature).
    if (state == ApplicationState.NEW) {
      /* the protocol wasnt instantiated because the applicaton wasnt launched
       * return a fake report.
       */
      jobreport.setJobState(JobState.NEW);
    } else if (state == ApplicationState.SUBMITTED) {
      jobreport.setJobState(JobState.NEW);
    } else if (state == ApplicationState.KILLED) {
      jobreport.setJobState(JobState.KILLED);
    } else if (state == ApplicationState.FAILED) {
      jobreport.setJobState(JobState.FAILED);
    }
    return TypeConverter.fromYarn(jobreport, jobFile, serviceHttpAddr);
  }

  /**
   * Fetches the status of a job, falling back to a refreshed proxy on
   * failure, and faking a report when no AM/history server is reachable.
   */
  public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
      TypeConverter.toYarn(oldJobID);
    String stagingDir = conf.get("yarn.apps.stagingDir");
    String jobFile = stagingDir + "/" + jobId.toString();
    JobReport report = null;
    MRClientProtocol protocol;
    GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
    try {
      request.setJobId(jobId);
      protocol = getProxy(oldJobID);
      if (protocol == null) {
        return createFakeJobReport(currentAppState, jobId, jobFile);
      }
      report = protocol.getJobReport(request).getJobReport();
    } catch (YarnRemoteException yre) { // thrown by remote server, no need to redirect
      LOG.warn(RPCUtil.toString(yre));
      throw yre;
    } catch (Exception e) {
      try {
        request.setJobId(jobId);
        protocol = getRefreshedProxy(oldJobID);
        /* this is possible if an application that was running is killed */
        if (protocol == null) {
          return createFakeJobReport(currentAppState, jobId, jobFile);
        }
        report = protocol.getJobReport(request).getJobReport();
      } catch (YarnRemoteException yre) {
        LOG.warn(RPCUtil.toString(yre));
        throw yre;
      }
    }
    return TypeConverter.fromYarn(report, jobFile, serviceHttpAddr);
  }

  /**
   * Fetches the task reports of the given type for a job, falling back to a
   * refreshed proxy on failure. Returns an empty array when no endpoint is
   * available yet.
   */
  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
      throws YarnRemoteException {
    List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports = null;
    org.apache.hadoop.mapreduce.v2.api.records.JobId nJobID = TypeConverter.toYarn(jobID);
    GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
    MRClientProtocol protocol = null;
    try {
      request.setJobId(nJobID);
      request.setTaskType(TypeConverter.toYarn(taskType));
      protocol = getProxy(jobID);
      if (protocol == null) {
        return new org.apache.hadoop.mapreduce.TaskReport[0];
      }
      taskReports = protocol.getTaskReports(request).getTaskReportList();
    } catch (YarnRemoteException yre) { // thrown by remote server, no need to redirect
      LOG.warn(RPCUtil.toString(yre));
      throw yre;
    } catch (Exception e) {
      LOG.debug("Failed to contact application master ", e);
      try {
        request.setJobId(nJobID);
        request.setTaskType(TypeConverter.toYarn(taskType));
        protocol = getRefreshedProxy(jobID);
        if (protocol == null) {
          return new org.apache.hadoop.mapreduce.TaskReport[0];
        }
        taskReports = protocol.getTaskReports(request).getTaskReportList();
      } catch (YarnRemoteException yre) {
        LOG.warn(RPCUtil.toString(yre));
        throw yre;
      }
    }
    return TypeConverter.fromYarn
        (taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
  }

  /**
   * Kills ({@code fail == false}) or fails ({@code fail == true}) a task
   * attempt, retrying once through a refreshed proxy on a local failure.
   * Returns false when no endpoint is available to deliver the request.
   */
  public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
      throws YarnRemoteException {
    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
      = TypeConverter.toYarn(taskAttemptID);
    KillTaskAttemptRequest killRequest = recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
    FailTaskAttemptRequest failRequest = recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
    MRClientProtocol protocol = getProxy(taskAttemptID.getJobID());
    if (protocol == null) {
      return false;
    }
    try {
      if (fail) {
        failRequest.setTaskAttemptId(attemptID);
        protocol.failTaskAttempt(failRequest);
      } else {
        killRequest.setTaskAttemptId(attemptID);
        protocol.killTaskAttempt(killRequest);
      }
    } catch (YarnRemoteException yre) { // thrown by remote server, no need to redirect
      LOG.warn(RPCUtil.toString(yre));
      throw yre;
    } catch (Exception e) {
      LOG.debug("Failed to contact application master ", e);
      MRClientProtocol proxy = getRefreshedProxy(taskAttemptID.getJobID());
      if (proxy == null) {
        return false;
      }
      try {
        if (fail) {
          failRequest.setTaskAttemptId(attemptID);
          proxy.failTaskAttempt(failRequest);
        } else {
          killRequest.setTaskAttemptId(attemptID);
          proxy.killTaskAttempt(killRequest);
        }
      } catch (YarnRemoteException yre) {
        LOG.warn(RPCUtil.toString(yre));
        throw yre;
      }
    }
    return true;
  }

  /**
   * Kills a job, retrying once through a refreshed proxy on a local failure.
   * Returns false when no endpoint is available to deliver the request.
   */
  public boolean killJob(JobID oldJobID)
      throws YarnRemoteException {
    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
      = TypeConverter.toYarn(oldJobID);
    KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);
    MRClientProtocol protocol = getProxy(oldJobID);
    if (protocol == null) {
      return false;
    }
    try {
      killRequest.setJobId(jobId);
      protocol.killJob(killRequest);
      return true;
    } catch (YarnRemoteException yre) { // thrown by remote server, no need to redirect
      LOG.warn(RPCUtil.toString(yre));
      throw yre;
    } catch (Exception e) {
      // Not really requied - if this is always the history context.
      LOG.debug("Failed to contact application master ", e);
      MRClientProtocol proxy = getRefreshedProxy(oldJobID);
      if (proxy == null) {
        return false;
      }
      try {
        killRequest.setJobId(jobId);
        // BUG FIX: the retry previously invoked killJob on the stale
        // 'protocol' reference that had just failed; use the refreshed proxy.
        proxy.killJob(killRequest);
        return true;
      } catch (YarnRemoteException yre) {
        LOG.warn(RPCUtil.toString(yre));
        throw yre;
      }
    }
  }
}