blob: 0f715a4e949fa4428d24200c43216af7428259b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <sstream>
#include <list>
#include "rpc/RpcAuth.h"
#include "common/XmlConfig.h"
#include "common/SessionConfig.h"
#include "ApplicationClient.h"
#include "ApplicationMaster.h"
#include "ContainerManagement.h"
#include "LibYarnClient.h"
#include "LibYarnConstants.h"
#include "common/Logger.h"
using namespace Yarn::Internal;
namespace libyarn {
LibYarnClient::LibYarnClient(string &user, string &rmHost, string &rmPort,
string &schedHost, string &schedPort, string &amHost, int32_t amPort,
string &am_tracking_url, int heartbeatInterval) :
amUser(user), schedHost(schedHost), schedPort(schedPort), amHost(
amHost), amPort(amPort), am_tracking_url(am_tracking_url), heartbeatInterval(
heartbeatInterval), response_id(0), clientJobId(""), keepRun(
false), needHeartbeatAlive(false) {
pthread_mutex_init(&(heartbeatLock), NULL);
amrmClient = NULL;
appClient = (void*) new ApplicationClient(user, rmHost, rmPort);
nmClient = (void*) new ContainerManagement();
}
#ifdef MOCKTEST
LibYarnClient::LibYarnClient(string &user,string &rmHost, string &rmPort, string &schedHost,
string &schedPort, string &amHost, int32_t amPort,
string &am_tracking_url, int heartbeatInterval,Mock::TestLibYarnClientStub *stub):
amUser(user),schedHost(schedHost), schedPort(schedPort), amHost(amHost),
amPort(amPort), am_tracking_url(am_tracking_url),
heartbeatInterval(heartbeatInterval),clientJobId(""),
keepRun(false), needHeartbeatAlive(false) {
pthread_mutex_init( &(heartbeatLock), NULL );
libyarnStub = stub;
appClient = (void*) libyarnStub->getApplicationClient();
amrmClient = (void*) libyarnStub->getApplicationMaster();
nmClient = (void*) libyarnStub->getContainerManagement();
}
#endif
LibYarnClient::~LibYarnClient() {
#ifndef MOCKTEST
if (keepRun) {
// No need to run heart-beat thread now.
keepRun = false;
void *thrc = NULL;
int rc = pthread_join(heartbeatThread, &thrc);
if (rc != 0) {
LOG(DEBUG1, "LibYarnClient::~LibYarnClient, fail to join heart-beat thread. "
"error code %d", rc);
} else {
LOG(DEBUG1, "LibYarnClient::~LibYarnClient, join heart-beat thread successfully.");
}
}
#endif
if (amrmClient != NULL) {
delete (ApplicationMaster*) amrmClient;
}
delete (ApplicationClient*) appClient;
delete (ContainerManagement*) nmClient;
}
string LibYarnClient::getErrorMessage() {
return errorMessage;
}
void LibYarnClient::setErrorMessage(string errorMsg) {
errorMessage = errorMsg;
}
bool LibYarnClient::isJobHealthy() {
return keepRun;
}
list<ResourceRequest>& LibYarnClient::getAskRequests() {
return askRequests;
}
void LibYarnClient::clearAskRequests() {
LOG(DEBUG1, "LibYarnClient::clear ask requests.");
askRequests.clear();
}
void* heartbeatFunc(void* args) {
int failcounter = 0;
int retry = 2;
LibYarnClient *client = (LibYarnClient*) args;
while (client->keepRun) {
try {
client->dummyAllocate();
failcounter = 0;
} catch (const ApplicationMasterNotRegisteredException &e) {
/*
* In case catch this exception,
* heartbeat thread should exits, and re-register AM.
*/
LOG(WARNING, "LibYarnClient::heartbeat dummy allocation "
"catch ApplicationMasterNotRegisteredException. %s",
e.msg());
client->keepRun = false;
break;
} catch (const YarnException &e) {
LOG(WARNING, "LibYarnClient::heartbeat dummy allocation "
"is not correctly executed with exception raised. %s",
e.msg());
failcounter++;
if (failcounter > retry) {
/* In case retry too many times with errors/exceptions, this
* thread will return. LibYarn has to re-register application
* and start the heartbeat thread again.
*/
LOG(WARNING, "LibYarnClient::heartbeatFunc, there are too many "
"failures raised. This heart-beat thread exits now.");
client->keepRun = false;
break;
}
}
usleep((client->heartbeatInterval) * 1000);
}
LOG(INFO, "LibYarnClient::heartbeatFunc, goes into exit phase.");
return (void *) 0;
}
int LibYarnClient::createJob(string &jobName, string &queue, string &jobId) {
try {
//Only one jobId for the client right now.
if (clientJobId != ""){
throw std::invalid_argument( "Exist an application for the client");
}
ApplicationClient *applicationClient = (ApplicationClient*) appClient;
//1. getNewApplication
ApplicationId appId = applicationClient->getNewApplication();
LOG(DEBUG1, "LibYarnClient::createJob, getNewApplication finished, appId:[clusterTimeStamp:%lld,id:%d]",
appId.getClusterTimestamp(), appId.getId());
//2. submitApplication
ApplicationSubmissionContext appSubmitCtx;
appSubmitCtx.setApplicationId(appId);
appSubmitCtx.setApplicationName(jobName);
appSubmitCtx.setQueue(queue);
Priority priority(0);
appSubmitCtx.setPriority(priority);
ContainerLaunchContext amContainerSpec;
appSubmitCtx.setAMContainerSpec(amContainerSpec);
appSubmitCtx.setUnmanagedAM(true);
appSubmitCtx.setMaxAppAttempts(1);
applicationClient->submitApplication(appSubmitCtx);
LOG(DEBUG1, "LibYarnClient::createJob, submitApplication finished");
//3. wait util AM is ACCEPTED and return the AMRMToken
ApplicationReport report;
int retry = 10;
while (retry > 0) {
report = applicationClient->getApplicationReport(appId);
LOG(DEBUG1, "LibYarnClient::createJob, appId[cluster_timestamp:%lld,id:%d], appState:%d",
appId.getClusterTimestamp(), appId.getId(), report.getYarnApplicationState());
if ((report.getAMRMToken().getPassword() != "") &&
report.getYarnApplicationState() == YarnApplicationState::ACCEPTED) {
break;
} else {
retry--;
usleep(1000000L);
}
if (retry == 0)
THROW(AccessControlException, "can not register application to YARN");
}
clientAppId = appId;
clientAppAttempId = report.getCurrentAppAttemptId();
//4.1 new ApplicationMaster
Token token = report.getAMRMToken();
UserInfo user;
if (applicationClient->getMethod() == SIMPLE) {
user = UserInfo::LocalUser();
} else if (applicationClient->getMethod() == KERBEROS) {
user.setEffectiveUser(applicationClient->getPrincipal());
user.setRealUser(applicationClient->getUser());
} else {
LOG(WARNING, "LibYarnClient::createJob: unsupported RPC method:%d. ",
applicationClient->getMethod());
}
Yarn::Token AMToken;
AMToken.setIdentifier(token.getIdentifier());
AMToken.setKind(token.getKind());
AMToken.setPassword(token.getPassword());
AMToken.setService(token.getService());
user.addToken(AMToken);
#ifndef MOCKTEST
amrmClient = (void*) new ApplicationMaster(this->schedHost, this->schedPort,
user, token.getService());
#endif
//4.2 register to RM scheduler as AM
((ApplicationMaster*) amrmClient)->registerApplicationMaster(amHost, amPort, am_tracking_url);
LOG(DEBUG1, "LibYarnClient::createJob, registerApplicationMaster finished");
#ifndef MOCKTEST
keepRun = true;
//5. setup the heartbeat thread to allocate, release, heartbeat
int rc = pthread_create(&heartbeatThread, NULL, heartbeatFunc, this);
if (rc != 0) {
keepRun = false;
LOG(WARNING, "LibYarnClient::createJob, fail to create heart-beat thread. "
"error code %d", rc);
throw std::runtime_error("Fail to create heart-beat thread.");
}
needHeartbeatAlive = true;
#endif
LOG(DEBUG1, "LibYarnClient::createJob, after AM register to RM, a heartbeat thread has been started");
//6. return jobId
std::stringstream ss;
ss << "job_" << appId.getClusterTimestamp() << "_" << appId.getId();
jobId = ss.str();
LOG(INFO, "LibYarnClient::createJob, appId[cluster_timestamp:%lld,id:%d]",
clientAppId.getClusterTimestamp(), clientAppId.getId());
clientJobId = jobId;
return FR_SUCCEEDED;
} catch (const YarnNetworkConnectException &e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::createJob, catch network connection exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (const std::exception &e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::createJob, catch exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::createJob, catch unexpected exception.";
setErrorMessage(errorMsg.str());
return FR_FAILED;
}
}
int LibYarnClient::forceKillJob(string &jobId) {
#ifndef MOCKTEST
if (keepRun) {
keepRun = false;
void *thrc = NULL;
int rc = pthread_join(heartbeatThread, &thrc);
if (rc != 0) {
LOG(WARNING, "LibYarnClient::forceKillJob, fail to join heart-beat thread. "
"error code %d", rc);
return FR_FAILED;
} else {
LOG(INFO, "LibYarnClient::forceKillJob, join heart-beat thread successfully.");
}
}
#endif
try{
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong, please check the jobId argument");
}
needHeartbeatAlive = false;
for (map<int64_t, Container*>::iterator it = jobIdContainers.begin();
it != jobIdContainers.end(); it++) {
std::ostringstream key;
Container *container = it->second;
key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort();
Token nmToken = nmTokenCache[key.str()];
((ContainerManagement*) nmClient)->stopContainer((*container), nmToken);
LOG(DEBUG1, "LibYarnClient::forceKillJob, container:%ld is stopped",container->getId().getId());
}
((ApplicationClient*) appClient)->forceKillApplication(clientAppId);
LOG(INFO, "LibYarnClient::force to kill this application.");
for (map<int64_t,Container*>::iterator it = jobIdContainers.begin();
it != jobIdContainers.end(); it++) {
LOG(DEBUG1, "LibYarnClient::forceKillJob, container:%ld in jobIdContainers is deleted",
it->second->getId().getId());
delete it->second;
it->second = NULL;
}
jobIdContainers.clear();
activeFailContainerIds.clear();
return FR_SUCCEEDED;
} catch (std::exception& e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::forceKillJob, catch the exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::forceKillJob, catch unexpected exception.";
setErrorMessage(errorMsg.str());
return FR_FAILED;
}
}
void LibYarnClient::dummyAllocate() {
pthread_mutex_lock(&heartbeatLock);
ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
//1) requestProto_blank
list<ResourceRequest> asksBlank;
//2) releasesBlank
list<ContainerId> releasesBlank;
//3) blacklistRequestBlank
ResourceBlacklistRequest blacklistRequestBlank;
//4) progress
float progress = 0.5;
int allocatedNum = 0;
try {
LOG(DEBUG1, "LibYarnClient::dummyAllocate, do a AM-RM heartbeat with response_id:%d", response_id);
AllocateResponse response = amrmClientAlias->allocate(asksBlank,
releasesBlank, blacklistRequestBlank, response_id, progress);
response_id = response.getResponseId();
list<Container> allocatedContainers = response.getAllocatedContainers();
allocatedNum = allocatedContainers.size();
LOG(DEBUG1, "LibYarnClient::dummyAllocate returned response_id :%d", response_id);
if (allocatedNum > 0) {
/*
* In rare case, client gets allocated containers in heartbeat,
* free them immediately.
*/
LOG(DEBUG1, "LibYarnClient::dummyAllocate returned allocated size: %d, "
"free them immediately.", allocatedNum);
list<ContainerId> releases;
for (list<Container>::iterator it = allocatedContainers.begin();
it != allocatedContainers.end(); it++) {
releases.push_back((*it).getId());
}
list<ResourceRequest> asksBlank;
ResourceBlacklistRequest blacklistRequestBlank;
response = amrmClientAlias->allocate(asksBlank, releases,
blacklistRequestBlank, response_id, progress);
response_id = response.getResponseId();
}
pthread_mutex_unlock(&heartbeatLock);
}
catch (const YarnException &e) {
LOG(WARNING, "LibYarnClient::dummyAllocate, dummy allocation "
"is not correctly executed with exception raised. %s",
e.msg());
pthread_mutex_unlock(&heartbeatLock);
throw;
}
}
void LibYarnClient::addResourceRequest(Resource capability,
int32_t num_containers, string host, int32_t priority,
bool relax_locality)
{
ResourceRequest *req = new ResourceRequest();
req->setCapability(capability);
req->setNumContainers(num_containers);
Priority priorityProto;
priorityProto.setPriority(priority);
req->setPriority(priorityProto);
req->setResourceName(host);
req->setRelaxLocality(relax_locality);
try {
askRequests.push_back(*req);
LOG(DEBUG1, "LibYarnClient::put a request into ask list, "
"mem:%d, cpu:%d, priority:%d, resource name:%s, relax:%d, num_containers:%d",
capability.getMemory(), capability.getVirtualCores(), priority, host.c_str(),
relax_locality, num_containers);
} catch (std::exception &e) {
LOG(WARNING, "LibYarnClient::Fail to add a resource request "
"to ask list. %s ", e.what());
}
}
/*
* This function creates resource requests according to the requirement of the caller, then put these
* requests into a list. The logic is a little similar to java client AMRMClient:addContainerRequest().
* There are three level nodes in YARN: off-switch(aka ANY), rack, host.By now, only ANY-level and host-level
* is supported.
*
* Parameters:
* jobId: jobId
* capability: the quota of the resource
* count: the required number of the containers
* preferred: node list, NULL means ANY host. If one node's rack name is NULL, a default rack name is set.
* priority: priority
*/
int LibYarnClient::addContainerRequests(string &jobId, Resource &capability,
int32_t num_containers, list<LibYarnNodeInfo> &preferred,
int32_t priority, bool relax_locality)
{
try {
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong, check the jobId argument");
}
map<string, int32_t> inferredRacks;
for (list<LibYarnNodeInfo>::iterator iter = preferred.begin();
iter != preferred.end(); iter++) {
LOG(DEBUG1, "LibYarnClient::addContainerRequests, "
"get a preferred host info, host:%s,rack:%s,container number:%d",
iter->getHost().c_str(), iter->getRack().c_str(), iter->getContainerNum());
/* add a resource request for this node */
addResourceRequest(capability, iter->getContainerNum(), iter->getHost(), priority, true);
map<string, int32_t>::iterator it = inferredRacks.find(iter->getRack());
if (it != inferredRacks.end())
it->second += iter->getContainerNum();
else
inferredRacks.insert(make_pair(iter->getRack(), iter->getContainerNum()));
}
/* add resource requests for racks*/
for (map<string, int32_t>::iterator it = inferredRacks.begin();
it != inferredRacks.end(); it++) {
addResourceRequest(capability, it->second, it->first, priority, relax_locality);
}
/* add resource request for off-switch */
addResourceRequest(capability, num_containers, YARN_HOST_ANY, priority, relax_locality);
return FR_SUCCEEDED;
} catch (std::exception &e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::addContainerRequests catch std exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::addContainerRequests catch unexpected exception.";
setErrorMessage(errorMsg.str());
return FR_FAILED;
}
}
/*
message AllocateRequestProto {
repeated ResourceRequestProto ask = 1;
repeated ContainerIdProto release = 2;
optional ResourceBlacklistRequestProto blacklist_request = 3;
optional int32 response_id = 4;
optional float progress = 5;
}
message AllocateResponseProto {
optional AMCommandProto a_m_command = 1;
optional int32 response_id = 2;
repeated ContainerProto allocated_containers = 3;
repeated ContainerStatusProto completed_container_statuses = 4;
optional ResourceProto limit = 5;
repeated NodeReportProto updated_nodes = 6;
optional int32 num_cluster_nodes = 7;
optional PreemptionMessageProto preempt = 8;
repeated NMTokenProto nm_tokens = 9;
}
*/
int LibYarnClient::allocateResources(string &jobId,
list<string> &blackListAdditions, list<string> &blackListRemovals,
list<Container> &allocatedContainers, int32_t num_containers)
{
try{
AllocateResponse response;
int retry = 5;
int allocatedNumOnce = 0;
int allocatedNumTotal = 0;
pthread_mutex_lock(&heartbeatLock);
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong, check the jobId argument");
}
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
}
ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
list<Container> allocatedContainerCache;
list<ContainerReport> preContainerReports;
preContainerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId);
list<ContainerId> releasesBlank;
ResourceBlacklistRequest blacklistRequest;
blacklistRequest.setBlacklistAdditions(blackListAdditions);
blacklistRequest.setBlacklistRemovals(blackListRemovals);
float progress = 0.5;
LOG(DEBUG1, "LibYarnClient::request to allocate resource, container number:%d",
num_containers);
while (retry > 0) {
LOG(DEBUG1, "LibYarnClient::allocate with response id : %d", response_id);
AllocateResponse response = amrmClientAlias->allocate(
this->askRequests, releasesBlank, blacklistRequest,
response_id, progress);
response_id = response.getResponseId();
LOG(DEBUG1, "LibYarnClient::allocate returned response id : %d", response_id);
list<NMToken> nmTokens = response.getNMTokens();
for (list<NMToken>::iterator it = nmTokens.begin(); it != nmTokens.end(); it++) {
std::ostringstream oss;
oss << (*it).getNodeId().getHost() << ":" << (*it).getNodeId().getPort();
nmTokenCache[oss.str()] = (*it).getToken();
}
this->clearAskRequests();
list<Container> allocatedContainerOnce = response.getAllocatedContainers();
allocatedNumOnce = allocatedContainerOnce.size();
if (allocatedNumOnce <= 0) {
LOG(DEBUG1, "LibYarnClient:: fail to allocate from YARN RM, try again");
retry--;
if(retry == 0 && allocatedNumTotal == 0) {
/* If failed, just return to Resource Broker to handle*/
pthread_mutex_unlock(&heartbeatLock);
LOG(WARNING,"LibYarnClient:: fail to allocate from YARN RM after retry several times");
return FR_SUCCEEDED;
}
} else {
allocatedNumTotal += allocatedNumOnce;
allocatedContainerCache.insert(allocatedContainerCache.end(), allocatedContainerOnce.begin(), allocatedContainerOnce.end());
LOG(DEBUG1, "LibYarnClient:: allocate %d containers from YARN RM", allocatedNumOnce);
if (allocatedNumTotal >= num_containers) {
LOG(DEBUG1, "LibYarnClient:: allocate enough containers from YARN RM, "
"expected:%d, total:%d", num_containers, allocatedNumTotal);
break;
}
}
usleep(TimeInterval::ALLOCATE_INTERVAL_MS);
}
LOG(INFO,"LibYarnClient::allocate resource, response_id:%d, allocated container number:%d",
response_id, allocatedNumTotal);
/* a workaround for allocate more container than request */
list<ContainerId> releases;
list<ContainerReport> afterContainerReports;
afterContainerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId);
for (list<ContainerReport>::iterator ait = afterContainerReports.begin();
ait != afterContainerReports.end(); ait++) {
bool foundInPre = false;
for (list<ContainerReport>::iterator pit =
preContainerReports.begin();
pit != preContainerReports.end(); pit++) {
if (pit->getId().getId() == ait->getId().getId()) {
foundInPre = true;
break;
}
}
if (!foundInPre) {
bool foundInNewAllocated = false;
for (list<Container>::iterator cit =
allocatedContainerCache.begin();
cit != allocatedContainerCache.end(); cit++) {
if (cit->getId().getId() == ait->getId().getId()) {
foundInNewAllocated = true;
break;
}
}
if (!foundInNewAllocated) {
releases.push_back((*ait).getId());
}
}
}
int totalNeedRelease = allocatedContainerCache.size() - num_containers;
LOG(DEBUG1, "LibYarnClient::allocateResources, total_allocated_containers:%ld, total_need_release:%d",
allocatedContainerCache.size(), totalNeedRelease);
if(totalNeedRelease > 0) {
for (int i = 0; i < totalNeedRelease; i++) {
list<Container>::iterator it = allocatedContainerCache.begin();
releases.push_back((*it).getId());
allocatedContainerCache.erase(it);
}
list<ResourceRequest> asksBlank;
ResourceBlacklistRequest blacklistRequestBlank;
response = amrmClientAlias->allocate(asksBlank, releases,
blacklistRequestBlank, response_id, progress);
response_id = response.getResponseId();
}
/* 3. store allocated containers */
for (list<Container>::iterator it = allocatedContainerCache.begin();
it != allocatedContainerCache.end(); it++) {
Container *container = new Container((*it));
int64_t containerId = container->getId().getId();
jobIdContainers[containerId] = container;
}
allocatedContainers = allocatedContainerCache;
LOG(DEBUG1, "LibYarnClient::allocateResources, put all allocated containers size:%ld",
allocatedContainerCache.size());
pthread_mutex_unlock(&heartbeatLock);
return FR_SUCCEEDED;
} catch(std::exception &e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::allocateResources, catch exception:" << e.what();
setErrorMessage(errorMsg.str());
pthread_mutex_unlock(&heartbeatLock);
return FR_FAILED;
} catch (const ApplicationMasterNotRegisteredException &e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::allocateResources, "
"catch ApplicationMasterNotRegisteredException." << e.what();
setErrorMessage(errorMsg.str());
pthread_mutex_unlock(&heartbeatLock);
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::allocateResources, catch unexpected exception.";
setErrorMessage(errorMsg.str());
pthread_mutex_unlock(&heartbeatLock);
return FR_FAILED;
}
}
int LibYarnClient::releaseResources(string &jobId,int64_t releaseContainerIds[], int releaseContainerSize)
{
try{
pthread_mutex_lock(&heartbeatLock);
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
}
ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
//1) asksBlank
list<ResourceRequest> asksBlank;
//2) releases
list<ContainerId> releases;
for (int i = 0; i < releaseContainerSize; i++) {
int64_t containerId = releaseContainerIds[i];
map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId);
if (it != jobIdContainers.end()) {
releases.push_back((it->second)->getId());
}
}
//3) blacklistRequestBlank
ResourceBlacklistRequest blacklistRequestBlank;
//4) progress
float progress = 0.5;
AllocateResponse response = amrmClientAlias->allocate(asksBlank,
releases, blacklistRequestBlank, response_id, progress);
response_id = response.getResponseId();
//erase from the map jobIdContainers
for (list<ContainerId>::iterator it = releases.begin(); it != releases.end(); it++) {
LOG(DEBUG1, "LibYarnClient::releaseResource, released ContainerId:%ld",
it->getId());
map<int64_t, Container*>::iterator cit = jobIdContainers.find(it->getId());
if (cit != jobIdContainers.end()) {
delete cit->second;
cit->second = NULL;
jobIdContainers.erase(it->getId());
}
//erase the element if in activeFailContainers
set<int64_t>::iterator sit = activeFailContainerIds.find(it->getId());
if (sit != activeFailContainerIds.end()) {
LOG(INFO, "LibYarnClient::releaseResource, remove %ld from activeFailContainerIds",
(*sit));
activeFailContainerIds.erase(*sit);
}
}
LOG(INFO, "LibYarnClient::release resources, container number:%d",
releases.size());
pthread_mutex_unlock(&heartbeatLock);
return FR_SUCCEEDED;
} catch (std::exception &e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::releaseResources, catch exception:" << e.what();
setErrorMessage(errorMsg.str());
pthread_mutex_unlock(&heartbeatLock);
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::releaseResources, catch unexpected exception.";
setErrorMessage(errorMsg.str());
pthread_mutex_unlock(&heartbeatLock);
return FR_FAILED;
}
}
/*
---------------------
message StartContainerRequestProto {
optional ContainerLaunchContextProto container_launch_context = 1;
optional hadoop.common.TokenProto container_token = 2;
}
message StartContainerResponseProto {
repeated StringBytesMapProto services_meta_data = 1;
}
---------------------
rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto);
---------------------
message StartContainersRequestProto {
repeated StartContainerRequestProto start_container_request = 1;
}
message StartContainersResponseProto {
repeated StringBytesMapProto services_meta_data = 1;
repeated ContainerIdProto succeeded_requests = 2;
repeated ContainerExceptionMapProto failed_requests = 3;
}
*/
int LibYarnClient::activeResources(string &jobId,int64_t activeContainerIds[],int activeContainerSize) {
try{
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
}
LOG(DEBUG1, "LibYarnClient::activeResources, activeResources started");
for (int i = 0; i < activeContainerSize; i++) {
int64_t containerId = activeContainerIds[i];
map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId);
if (it != jobIdContainers.end()) {
try {
Container *container = it->second;
std::ostringstream key;
key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort();
Token nmToken = nmTokenCache[key.str()];
string cmd("sleep 10000000000");
list<string> cmds;
cmds.push_back(cmd);
ContainerLaunchContext ctx;
ctx.setCommand(cmds);
StartContainerRequest request;
request.setContainerLaunchCtx(ctx);
Token cToken = container->getContainerToken();
request.setContainerToken(cToken);
LOG(DEBUG1, "LibYarnClient::activeResources active containerId:%ld", containerId);
((ContainerManagement*)nmClient)->startContainer((*container), request, nmToken);
} catch (std::exception& e) {
LOG(WARNING, "LibYarnClient::activeResources, activeResources Failed Id:%ld,exception:%s",
containerId,e.what());
activeFailContainerIds.insert(containerId);
}
}
}
LOG(INFO, "LibYarnClient::active resources, container number:%d",
activeContainerSize);
return FR_SUCCEEDED;
} catch (std::exception& e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::activeResources, Catch the Exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::activeResources, catch unexpected exception.";
setErrorMessage(errorMsg.str());
return FR_FAILED;
}
}
int LibYarnClient::getActiveFailContainerIds(set<int64_t> &activeFailIds){
activeFailIds = activeFailContainerIds;
return FR_SUCCEEDED;
}
int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus finalStatus) {
#ifndef MOCKTEST
if (keepRun) {
// No need to run heart-beat thread now.
keepRun = false;
void *thrc = NULL;
int rc = pthread_join(heartbeatThread, &thrc);
if (rc != 0) {
LOG(WARNING, "LibYarnClient::finishJob, fail to join heart-beat thread. "
"error code %d", rc);
return FR_FAILED;
} else {
LOG(INFO, "LibYarnClient::finishJob, join heart-beat thread successfully.");
}
}
#endif
try {
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
needHeartbeatAlive = false;
//1. we should stop all containers related with this job
for (map<int64_t, Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) {
std::ostringstream key;
Container *container = it->second;
key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort();
Token nmToken = nmTokenCache[key.str()];
((ContainerManagement*) nmClient)->stopContainer((*container), nmToken);
LOG(DEBUG1, "LibYarnClient::finishJob, container:%ld is stopped",container->getId().getId());
}
LOG(DEBUG1, "LibYarnClient::finishJob, all containers for jobId:%s are stopped",jobId.c_str());
//2. finish AM
string diagnostics("");
string tracking_url("");
((ApplicationMaster*) amrmClient)->finishApplicationMaster(diagnostics, tracking_url, finalStatus);
LOG(INFO, "LibYarnClient::finishJob, finish AM for jobId:%s, finalStatus:%d", jobId.c_str(), finalStatus);
//free the Container* memory
for (map<int64_t, Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); it++) {
LOG(DEBUG1, "LibYarnClient::finishJob, container:%ld in jobIdContainers is deleted",
it->second->getId().getId());
delete it->second;
it->second = NULL;
}
jobIdContainers.clear();
activeFailContainerIds.clear();
return FR_SUCCEEDED;
} catch (std::exception& e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::finishJob, catch the Exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (const ApplicationMasterNotRegisteredException &e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::finishJob, "
"catch ApplicationMasterNotRegisteredException." << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::finishJob, catch unexpected exception.";
setErrorMessage(errorMsg.str());
return FR_FAILED;
}
}
int LibYarnClient::getApplicationReport(string &jobId,ApplicationReport &applicationReport) {
try {
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
}
LOG(DEBUG1, "LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d]",
clientAppId.getClusterTimestamp(), clientAppId.getId());
applicationReport = ((ApplicationClient*) appClient)->getApplicationReport(clientAppId);
LOG(DEBUG1, "LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d],getCurrentAppAttemptId:%d",
applicationReport.getApplicationId().getClusterTimestamp(),
applicationReport.getApplicationId().getId(),
applicationReport.getCurrentAppAttemptId().getAttemptId());
return FR_SUCCEEDED;
} catch (std::exception& e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::getApplicationReport, Catch the Exception:"
<< e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::getApplicationReport, catch unexpected exception.";
setErrorMessage(errorMsg.str());
return FR_FAILED;
}
}
int LibYarnClient::getContainerReports(string &jobId,list<ContainerReport> &containerReports) {
try {
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
}
LOG(DEBUG1, "LibYarnClient::getContainerReports, appId[cluster_timestamp:%lld,id:%d]",
clientAppId.getClusterTimestamp(), clientAppId.getId());
containerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId);
return FR_SUCCEEDED;
} catch (std::exception& e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::getContainerReports, catch the Exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::getContainerReports, catch unexpected exception.";
setErrorMessage(errorMsg.str());
return FR_FAILED;
}
}
int LibYarnClient::getContainerStatuses(string &jobId, int64_t containerIds[],
int containerSize, list<ContainerStatus> &containerStatues)
{
try {
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
}
for (int i = 0; i < containerSize; i++) {
int64_t containerId = containerIds[i];
map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId);
if (it != jobIdContainers.end()) {
try {
Container *container = it->second;
std::ostringstream key;
key << container->getNodeId().getHost() << ":"<< container->getNodeId().getPort();
Token nmToken = nmTokenCache[key.str()];
ContainerStatus containerStatus = ((ContainerManagement*) nmClient)->getContainerStatus((*container), nmToken);
// the response containerId will be 0 if the request containerId is not exist
if (containerStatus.getContainerId().getId() != 0){
containerStatues.push_back(containerStatus);
}
} catch (std::exception& e) {
LOG(INFO, "LibYarnClient::getContainerStatuses, getContainerStatuses Failed Id:%ld,exception:%s",
containerId, e.what());
}
}
}
return FR_SUCCEEDED;
} catch (std::exception& e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::getContainerStatuses, Catch the Exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::getContainerStatuses, catch unexpected exception.";
setErrorMessage(errorMsg.str());
return FR_FAILED;
}
}
int LibYarnClient::getQueueInfo(string &queue, bool includeApps,
bool includeChildQueues, bool recursive, QueueInfo &queueInfo)
{
try {
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
}
queueInfo = ((ApplicationClient*) appClient)->getQueueInfo(queue,
includeApps, includeChildQueues, recursive);
return FR_SUCCEEDED;
} catch (std::exception& e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::getQueueInfo, Catch the Exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::getQueueInfo, catch unexpected exception.";
setErrorMessage(errorMsg.str());
return FR_FAILED;
}
}
int LibYarnClient::getClusterNodes(list<NodeState> &states,list<NodeReport> &nodeReports) {
try{
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
}
nodeReports = ((ApplicationClient*) appClient)->getClusterNodes(states);
return FR_SUCCEEDED;
} catch (std::exception& e) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::getClusterNodes, Catch the Exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
std::stringstream errorMsg;
errorMsg << "LibYarnClient::getClusterNodes, catch unexpected exception.";
setErrorMessage(errorMsg.str());
return FR_FAILED;
}
}
}