| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "ContainerManagement.h" |
| |
| #include "rpc/RpcAuth.h" |
| #include "common/XmlConfig.h" |
| #include "common/SessionConfig.h" |
| #include <sstream> |
| |
| namespace libyarn { |
| |
| ContainerManagement::ContainerManagement() { |
| #ifdef MOCKTEST |
| stub = NULL; |
| #endif |
| } |
| |
| ContainerManagement::~ContainerManagement() { |
| } |
| |
| /* |
| --------------------- |
| message StartContainerRequestProto { |
| optional ContainerLaunchContextProto container_launch_context = 1; |
| optional hadoop.common.TokenProto container_token = 2; |
| } |
| |
| message StartContainerResponseProto { |
| repeated StringBytesMapProto services_meta_data = 1; |
| } |
| --------------------- |
| rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto); |
| --------------------- |
| message StartContainersRequestProto { |
| repeated StartContainerRequestProto start_container_request = 1; |
| } |
| |
| message StartContainersResponseProto { |
| repeated StringBytesMapProto services_meta_data = 1; |
| repeated ContainerIdProto succeeded_requests = 2; |
| repeated ContainerExceptionMapProto failed_requests = 3; |
| } |
| */ |
| |
| StartContainerResponse ContainerManagement::startContainer(Container &container, |
| StartContainerRequest &request, Token &nmToken) { |
| //1. setup Connection to NodeManager |
| string host = container.getNodeId().getHost(); |
| std::ostringstream oss; |
| oss << container.getNodeId().getPort(); |
| string port(oss.str()); |
| |
| LOG(DEBUG1, |
| "ContainerManagement::startContainer, is going to connect to NM [%s:%s] to start container", |
| host.c_str(), port.c_str()); |
| |
| UserInfo user = UserInfo::LocalUser(); |
| Yarn::Token anotherToken; |
| anotherToken.setIdentifier(nmToken.getIdentifier()); |
| anotherToken.setKind(nmToken.getKind()); |
| anotherToken.setPassword(nmToken.getPassword()); |
| anotherToken.setService(nmToken.getService()); |
| user.addToken(anotherToken); |
| |
| RpcAuth rpcAuth(user, AuthMethod::TOKEN); |
| Yarn::Config config; |
| SessionConfig sessionConfig(config); |
| |
| #ifdef MOCKTEST |
| ContainerManagementProtocol *nmClient = stub->getContainerManagementProtocol(); |
| #else |
| ContainerManagementProtocol *nmClient = new ContainerManagementProtocol( |
| host, port, anotherToken.getService(), sessionConfig, rpcAuth); |
| #endif |
| |
| //2. startContainers |
| StartContainersRequest scsRequest; |
| list<StartContainerRequest> requests; |
| requests.push_back(request); |
| scsRequest.setStartContainerRequests(requests); |
| |
| StartContainersResponse scsResponse = nmClient->startContainers(scsRequest); |
| |
| StartContainerResponse scResponse; |
| scResponse.setServicesMetaData(scsResponse.getServicesMetaData()); |
| |
| LOG(DEBUG1, |
| "ContainerManagement::startContainer, after start a container, id:%ld on NM [%s:%s]", |
| container.getId().getId(), host.c_str(), port.c_str()); |
| |
| //3. free |
| delete nmClient; |
| |
| return scResponse; |
| } |
| |
| /* |
| rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto); |
| |
| message StopContainersRequestProto { |
| repeated ContainerIdProto container_id = 1; |
| } |
| |
| message StopContainersResponseProto { |
| repeated ContainerIdProto succeeded_requests = 1; |
| repeated ContainerExceptionMapProto failed_requests = 2; |
| } |
| */ |
| |
| void ContainerManagement::stopContainer(Container &container, Token &nmToken) { |
| //1. setup Connection to NodeManager |
| string host = container.getNodeId().getHost(); |
| std::ostringstream oss; |
| oss << container.getNodeId().getPort(); |
| string port(oss.str()); |
| |
| LOG(DEBUG1, |
| "ContainerManagement::stopContainer, is going to connect to NM [%s:%s] to stop container", |
| host.c_str(), port.c_str()); |
| |
| UserInfo user = UserInfo::LocalUser(); |
| Yarn::Token anotherToken; |
| anotherToken.setIdentifier(nmToken.getIdentifier()); |
| anotherToken.setKind(nmToken.getKind()); |
| anotherToken.setPassword(nmToken.getPassword()); |
| anotherToken.setService(nmToken.getService()); |
| user.addToken(anotherToken); |
| |
| RpcAuth rpcAuth(user, AuthMethod::TOKEN); |
| Yarn::Config config; |
| SessionConfig sessionConfig(config); |
| |
| #ifdef MOCKTEST |
| ContainerManagementProtocol *nmClient = stub->getContainerManagementProtocol(); |
| #else |
| ContainerManagementProtocol *nmClient = new ContainerManagementProtocol( |
| host, port, anotherToken.getService(), sessionConfig, rpcAuth); |
| #endif |
| |
| //2. stopContainers |
| ContainerId cid = container.getId(); |
| list<ContainerId> cids; |
| cids.push_back(cid); |
| |
| StopContainersRequest request; |
| request.setContainerIds(cids); |
| nmClient->stopContainers(request); |
| //3. free |
| delete nmClient; |
| } |
| |
| ContainerStatus ContainerManagement::getContainerStatus(Container &container, |
| Token &nmToken) { |
| //1. setup Connection to NodeManager |
| string host = container.getNodeId().getHost(); |
| std::ostringstream oss; |
| oss << container.getNodeId().getPort(); |
| string port(oss.str()); |
| |
| LOG(DEBUG1, |
| "ContainerManagement, is going to connect to NM [%s:%s] to getContainerStatus container", |
| host.c_str(), port.c_str()); |
| |
| UserInfo user = UserInfo::LocalUser(); |
| Yarn::Token anotherToken; |
| anotherToken.setIdentifier(nmToken.getIdentifier()); |
| anotherToken.setKind(nmToken.getKind()); |
| anotherToken.setPassword(nmToken.getPassword()); |
| anotherToken.setService(nmToken.getService()); |
| user.addToken(anotherToken); |
| |
| RpcAuth rpcAuth(user, AuthMethod::TOKEN); |
| Yarn::Config config; |
| SessionConfig sessionConfig(config); |
| |
| #ifdef MOCKTEST |
| ContainerManagementProtocol *nmClient = stub->getContainerManagementProtocol(); |
| #else |
| ContainerManagementProtocol *nmClient = new ContainerManagementProtocol( |
| host, port, anotherToken.getService(), sessionConfig, rpcAuth); |
| #endif |
| |
| ContainerId cid = container.getId(); |
| list<ContainerId> cids; |
| cids.push_back(cid); |
| |
| GetContainerStatusesRequest request; |
| request.setContainerIds(cids); |
| |
| GetContainerStatusesResponse response = nmClient->getContainerStatuses(request); |
| |
| //3. free |
| delete nmClient; |
| list<ContainerStatus> statusList = response.getContainerStatuses(); |
| if (statusList.size() > 0){ |
| list<ContainerStatus>::iterator statusHead = statusList.begin(); |
| return (*statusHead); |
| }else{ |
| ContainerStatus status; |
| return status; |
| } |
| |
| } |
| |
| } /* namespace libyarn */ |