// Source blob: 874c5ce9665875abc04e51c393242c58c74dcaf5
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "ContainerManagement.h"
#include "rpc/RpcAuth.h"
#include "common/XmlConfig.h"
#include "common/SessionConfig.h"
#include <sstream>
namespace libyarn {
/**
 * Default constructor.
 *
 * In MOCKTEST builds the test stub pointer starts out as NULL; in regular
 * builds there is nothing to initialize.
 */
ContainerManagement::ContainerManagement() {
#ifdef MOCKTEST
    // No stub is attached at construction time.
    this->stub = NULL;
#endif
}
/**
 * Destructor. Intentionally empty: this function releases nothing itself.
 */
ContainerManagement::~ContainerManagement() {
    // nothing to do
}
/*
---------------------
message StartContainerRequestProto {
optional ContainerLaunchContextProto container_launch_context = 1;
optional hadoop.common.TokenProto container_token = 2;
}
message StartContainerResponseProto {
repeated StringBytesMapProto services_meta_data = 1;
}
---------------------
rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto);
---------------------
message StartContainersRequestProto {
repeated StartContainerRequestProto start_container_request = 1;
}
message StartContainersResponseProto {
repeated StringBytesMapProto services_meta_data = 1;
repeated ContainerIdProto succeeded_requests = 2;
repeated ContainerExceptionMapProto failed_requests = 3;
}
*/
/**
 * Start one container on the NodeManager (NM) that hosts it.
 *
 * The single request is wrapped into a batched StartContainersRequest, sent
 * through a freshly obtained ContainerManagementProtocol client, and the
 * services meta data of the batched response is copied into the returned
 * StartContainerResponse.
 *
 * @param container container to start; its node id supplies the NM host and
 *                  port, its id is only used for logging.
 * @param request   start request forwarded to the NM.
 * @param nmToken   token whose identifier, kind, password and service are
 *                  copied into the token used for TOKEN authentication.
 * @return response carrying the services meta data reported by the NM.
 *
 * Any exception raised while talking to the NM is propagated to the caller;
 * the protocol client is released on that path as well.
 */
StartContainerResponse ContainerManagement::startContainer(Container &container,
        StartContainerRequest &request, Token &nmToken) {
    //1. setup Connection to NodeManager
    string host = container.getNodeId().getHost();
    std::ostringstream oss;
    oss << container.getNodeId().getPort();
    string port(oss.str());
    LOG(DEBUG1,
        "ContainerManagement::startContainer, is going to connect to NM [%s:%s] to start container",
        host.c_str(), port.c_str());

    // Authenticate as the local user, carrying a copy of the NM token.
    UserInfo user = UserInfo::LocalUser();
    Yarn::Token anotherToken;
    anotherToken.setIdentifier(nmToken.getIdentifier());
    anotherToken.setKind(nmToken.getKind());
    anotherToken.setPassword(nmToken.getPassword());
    anotherToken.setService(nmToken.getService());
    user.addToken(anotherToken);
    RpcAuth rpcAuth(user, AuthMethod::TOKEN);
    Yarn::Config config;
    SessionConfig sessionConfig(config);
#ifdef MOCKTEST
    ContainerManagementProtocol *nmClient = stub->getContainerManagementProtocol();
#else
    ContainerManagementProtocol *nmClient = new ContainerManagementProtocol(
        host, port, anotherToken.getService(), sessionConfig, rpcAuth);
#endif

    StartContainerResponse scResponse;
    try {
        //2. startContainers
        StartContainersRequest scsRequest;
        list<StartContainerRequest> requests;
        requests.push_back(request);
        scsRequest.setStartContainerRequests(requests);
        StartContainersResponse scsResponse = nmClient->startContainers(scsRequest);
        scResponse.setServicesMetaData(scsResponse.getServicesMetaData());
        LOG(DEBUG1,
            "ContainerManagement::startContainer, after start a container, id:%ld on NM [%s:%s]",
            container.getId().getId(), host.c_str(), port.c_str());
    } catch (...) {
        // Release the client before propagating, otherwise it would leak.
        delete nmClient;
        throw;
    }

    //3. free
    delete nmClient;
    return scResponse;
}
/*
rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto);
message StopContainersRequestProto {
repeated ContainerIdProto container_id = 1;
}
message StopContainersResponseProto {
repeated ContainerIdProto succeeded_requests = 1;
repeated ContainerExceptionMapProto failed_requests = 2;
}
*/
/**
 * Stop one container on the NodeManager (NM) that hosts it.
 *
 * The container id is wrapped into a batched StopContainersRequest and sent
 * through a freshly obtained ContainerManagementProtocol client. The
 * response of the stop call is not inspected.
 *
 * @param container container to stop; its node id supplies the NM host and
 *                  port, its id identifies the container to stop.
 * @param nmToken   token whose identifier, kind, password and service are
 *                  copied into the token used for TOKEN authentication.
 *
 * Any exception raised while talking to the NM is propagated to the caller;
 * the protocol client is released on that path as well.
 */
void ContainerManagement::stopContainer(Container &container, Token &nmToken) {
    //1. setup Connection to NodeManager
    string host = container.getNodeId().getHost();
    std::ostringstream oss;
    oss << container.getNodeId().getPort();
    string port(oss.str());
    LOG(DEBUG1,
        "ContainerManagement::stopContainer, is going to connect to NM [%s:%s] to stop container",
        host.c_str(), port.c_str());

    // Authenticate as the local user, carrying a copy of the NM token.
    UserInfo user = UserInfo::LocalUser();
    Yarn::Token anotherToken;
    anotherToken.setIdentifier(nmToken.getIdentifier());
    anotherToken.setKind(nmToken.getKind());
    anotherToken.setPassword(nmToken.getPassword());
    anotherToken.setService(nmToken.getService());
    user.addToken(anotherToken);
    RpcAuth rpcAuth(user, AuthMethod::TOKEN);
    Yarn::Config config;
    SessionConfig sessionConfig(config);
#ifdef MOCKTEST
    ContainerManagementProtocol *nmClient = stub->getContainerManagementProtocol();
#else
    ContainerManagementProtocol *nmClient = new ContainerManagementProtocol(
        host, port, anotherToken.getService(), sessionConfig, rpcAuth);
#endif

    try {
        //2. stopContainers
        ContainerId cid = container.getId();
        list<ContainerId> cids;
        cids.push_back(cid);
        StopContainersRequest request;
        request.setContainerIds(cids);
        nmClient->stopContainers(request);
    } catch (...) {
        // Release the client before propagating, otherwise it would leak.
        delete nmClient;
        throw;
    }

    //3. free
    delete nmClient;
}
/**
 * Query the status of one container from the NodeManager (NM) hosting it.
 *
 * The container id is wrapped into a batched GetContainerStatusesRequest and
 * sent through a freshly obtained ContainerManagementProtocol client.
 *
 * @param container container to query; its node id supplies the NM host and
 *                  port, its id identifies the container.
 * @param nmToken   token whose identifier, kind, password and service are
 *                  copied into the token used for TOKEN authentication.
 * @return the first status in the NM response, or a default-constructed
 *         ContainerStatus when the response contains no status at all.
 *
 * Any exception raised while talking to the NM is propagated to the caller;
 * the protocol client is released on that path as well.
 */
ContainerStatus ContainerManagement::getContainerStatus(Container &container,
        Token &nmToken) {
    //1. setup Connection to NodeManager
    string host = container.getNodeId().getHost();
    std::ostringstream oss;
    oss << container.getNodeId().getPort();
    string port(oss.str());
    LOG(DEBUG1,
        "ContainerManagement, is going to connect to NM [%s:%s] to getContainerStatus container",
        host.c_str(), port.c_str());

    // Authenticate as the local user, carrying a copy of the NM token.
    UserInfo user = UserInfo::LocalUser();
    Yarn::Token anotherToken;
    anotherToken.setIdentifier(nmToken.getIdentifier());
    anotherToken.setKind(nmToken.getKind());
    anotherToken.setPassword(nmToken.getPassword());
    anotherToken.setService(nmToken.getService());
    user.addToken(anotherToken);
    RpcAuth rpcAuth(user, AuthMethod::TOKEN);
    Yarn::Config config;
    SessionConfig sessionConfig(config);
#ifdef MOCKTEST
    ContainerManagementProtocol *nmClient = stub->getContainerManagementProtocol();
#else
    ContainerManagementProtocol *nmClient = new ContainerManagementProtocol(
        host, port, anotherToken.getService(), sessionConfig, rpcAuth);
#endif

    list<ContainerStatus> statusList;
    try {
        //2. getContainerStatuses
        ContainerId cid = container.getId();
        list<ContainerId> cids;
        cids.push_back(cid);
        GetContainerStatusesRequest request;
        request.setContainerIds(cids);
        GetContainerStatusesResponse response = nmClient->getContainerStatuses(request);
        statusList = response.getContainerStatuses();
    } catch (...) {
        // Release the client before propagating, otherwise it would leak.
        delete nmClient;
        throw;
    }

    //3. free
    delete nmClient;

    // Only a single container was asked for, so only the first entry matters.
    if (!statusList.empty()) {
        return statusList.front();
    }
    // No status reported: hand back a default-constructed one.
    return ContainerStatus();
}
} /* namespace libyarn */