blob: 138bee6bde9fae51afddd7be866e79293a4cd787 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifdef __WINDOWS__
#include <stout/internal/windows/grp.hpp>
#include <stout/internal/windows/pwd.hpp>
#else
#include <grp.h>
#include <pwd.h>
#endif // __WINDOWS__
#include <ostream>
#include <vector>
#include <mesos/slave/isolator.hpp>
#include <mesos/type_utils.hpp>
#include <process/clock.hpp>
#include <process/pid.hpp>
#include <stout/adaptor.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/net.hpp>
#include <stout/stringify.hpp>
#include <stout/unreachable.hpp>
#include <stout/uuid.hpp>
#include <stout/os/permissions.hpp>
#ifdef __WINDOWS__
#include <stout/windows.hpp>
#endif // __WINDOWS__
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/resources_utils.hpp"
#include "master/master.hpp"
#include "master/constants.hpp"
#include "messages/messages.hpp"
using std::ostream;
using std::set;
using std::string;
using std::vector;
using google::protobuf::Map;
using google::protobuf::RepeatedPtrField;
using mesos::authorization::VIEW_ROLE;
using mesos::slave::ContainerLimitation;
using mesos::slave::ContainerState;
using process::Owned;
using process::UPID;
namespace mesos {
namespace internal {
namespace protobuf {
bool frameworkHasCapability(
const FrameworkInfo& framework,
FrameworkInfo::Capability::Type capability)
{
foreach (const FrameworkInfo::Capability& c,
framework.capabilities()) {
if (c.type() == capability) {
return true;
}
}
return false;
}
bool isTerminalState(const TaskState& state)
{
return (state == TASK_FINISHED ||
state == TASK_FAILED ||
state == TASK_KILLED ||
state == TASK_LOST ||
state == TASK_ERROR ||
state == TASK_DROPPED ||
state == TASK_GONE ||
state == TASK_GONE_BY_OPERATOR);
}
StatusUpdate createStatusUpdate(
const FrameworkID& frameworkId,
const Option<SlaveID>& slaveId,
const TaskID& taskId,
const TaskState& state,
const TaskStatus::Source& source,
const Option<id::UUID>& uuid,
const string& message,
const Option<TaskStatus::Reason>& reason,
const Option<ExecutorID>& executorId,
const Option<bool>& healthy,
const Option<CheckStatusInfo>& checkStatus,
const Option<Labels>& labels,
const Option<ContainerStatus>& containerStatus,
const Option<TimeInfo>& unreachableTime,
const Option<Resources>& limitedResources)
{
StatusUpdate update;
update.set_timestamp(process::Clock::now().secs());
update.mutable_framework_id()->MergeFrom(frameworkId);
if (slaveId.isSome()) {
update.mutable_slave_id()->MergeFrom(slaveId.get());
}
if (executorId.isSome()) {
update.mutable_executor_id()->MergeFrom(executorId.get());
}
// TODO(alexr): Use `createTaskStatus()` instead
// once `UUID` is required in this function.
TaskStatus* status = update.mutable_status();
status->mutable_task_id()->MergeFrom(taskId);
if (slaveId.isSome()) {
status->mutable_slave_id()->MergeFrom(slaveId.get());
}
status->set_state(state);
status->set_source(source);
status->set_message(message);
status->set_timestamp(update.timestamp());
if (uuid.isSome()) {
update.set_uuid(uuid->toBytes());
status->set_uuid(uuid->toBytes());
}
if (reason.isSome()) {
status->set_reason(reason.get());
}
if (healthy.isSome()) {
status->set_healthy(healthy.get());
}
if (checkStatus.isSome()) {
status->mutable_check_status()->CopyFrom(checkStatus.get());
}
if (labels.isSome()) {
status->mutable_labels()->CopyFrom(labels.get());
}
if (containerStatus.isSome()) {
status->mutable_container_status()->CopyFrom(containerStatus.get());
}
if (unreachableTime.isSome()) {
status->mutable_unreachable_time()->CopyFrom(unreachableTime.get());
}
if (limitedResources.isSome()) {
// Check that we are only sending the `Limitation` field when the
// reason is a container limitation.
CHECK_SOME(reason);
CHECK(
reason.get() == TaskStatus::REASON_CONTAINER_LIMITATION ||
reason.get() == TaskStatus::REASON_CONTAINER_LIMITATION_DISK ||
reason.get() == TaskStatus::REASON_CONTAINER_LIMITATION_MEMORY)
<< reason.get();
status->mutable_limitation()->mutable_resources()->CopyFrom(
limitedResources.get());
}
return update;
}
StatusUpdate createStatusUpdate(
const FrameworkID& frameworkId,
const TaskStatus& status,
const Option<SlaveID>& slaveId)
{
StatusUpdate update;
update.mutable_framework_id()->MergeFrom(frameworkId);
if (status.has_executor_id()) {
update.mutable_executor_id()->MergeFrom(status.executor_id());
}
update.mutable_status()->MergeFrom(status);
if (slaveId.isSome()) {
update.mutable_slave_id()->MergeFrom(slaveId.get());
// We also populate `TaskStatus.slave_id` if the executor
// did not set it.
if (!status.has_slave_id()) {
update.mutable_status()->mutable_slave_id()->MergeFrom(slaveId.get());
}
}
if (!status.has_timestamp()) {
update.set_timestamp(process::Clock::now().secs());
} else {
update.set_timestamp(status.timestamp());
}
if (status.has_uuid()) {
update.set_uuid(status.uuid());
}
return update;
}
TaskStatus createTaskStatus(
const TaskID& taskId,
const TaskState& state,
const id::UUID& uuid,
double timestamp)
{
TaskStatus status;
status.set_uuid(uuid.toBytes());
status.set_timestamp(timestamp);
status.mutable_task_id()->CopyFrom(taskId);
status.set_state(state);
return status;
}
TaskStatus createTaskStatus(
TaskStatus status,
const id::UUID& uuid,
double timestamp,
const Option<TaskState>& state,
const Option<string>& message,
const Option<TaskStatus::Source>& source,
const Option<TaskStatus::Reason>& reason,
const Option<string>& data,
const Option<bool>& healthy,
const Option<CheckStatusInfo>& checkStatus,
const Option<Labels>& labels,
const Option<ContainerStatus>& containerStatus,
const Option<TimeInfo>& unreachableTime)
{
status.set_uuid(uuid.toBytes());
status.set_timestamp(timestamp);
if (state.isSome()) {
status.set_state(state.get());
}
if (message.isSome()) {
status.set_message(message.get());
}
if (source.isSome()) {
status.set_source(source.get());
}
if (reason.isSome()) {
status.set_reason(reason.get());
}
if (data.isSome()) {
status.set_data(data.get());
}
if (healthy.isSome()) {
status.set_healthy(healthy.get());
}
if (checkStatus.isSome()) {
status.mutable_check_status()->CopyFrom(checkStatus.get());
}
if (labels.isSome()) {
status.mutable_labels()->CopyFrom(labels.get());
}
if (containerStatus.isSome()) {
status.mutable_container_status()->CopyFrom(containerStatus.get());
}
if (unreachableTime.isSome()) {
status.mutable_unreachable_time()->CopyFrom(unreachableTime.get());
}
return status;
}
Task createTask(
const TaskInfo& task,
const TaskState& state,
const FrameworkID& frameworkId)
{
Task t;
t.mutable_framework_id()->CopyFrom(frameworkId);
t.set_state(state);
t.set_name(task.name());
t.mutable_task_id()->CopyFrom(task.task_id());
t.mutable_slave_id()->CopyFrom(task.slave_id());
t.mutable_resources()->CopyFrom(task.resources());
if (task.has_executor()) {
t.mutable_executor_id()->CopyFrom(task.executor().executor_id());
}
if (task.has_labels()) {
t.mutable_labels()->CopyFrom(task.labels());
}
if (task.has_discovery()) {
t.mutable_discovery()->CopyFrom(task.discovery());
}
if (task.has_container()) {
t.mutable_container()->CopyFrom(task.container());
}
// Copy `user` if set.
if (task.has_command() && task.command().has_user()) {
t.set_user(task.command().user());
} else if (task.has_executor() && task.executor().command().has_user()) {
t.set_user(task.executor().command().user());
}
return t;
}
Option<bool> getTaskHealth(const Task& task)
{
Option<bool> healthy = None();
if (task.statuses_size() > 0) {
// The statuses list only keeps the most recent `TaskStatus` for
// each state, and appends later statuses at the end. Thus the last
// status is either a terminal state (where health is irrelevant),
// or the latest TASK_RUNNING status.
const TaskStatus& lastStatus = task.statuses(task.statuses_size() - 1);
if (lastStatus.has_healthy()) {
healthy = lastStatus.healthy();
}
}
return healthy;
}
Option<CheckStatusInfo> getTaskCheckStatus(const Task& task)
{
Option<CheckStatusInfo> checkStatus = None();
if (task.statuses_size() > 0) {
// The statuses list only keeps the most recent `TaskStatus` for
// each state, and appends later statuses at the end. Thus the last
// status is either a terminal state (where check is irrelevant),
// or the latest TASK_RUNNING status.
const TaskStatus& lastStatus = task.statuses(task.statuses_size() - 1);
if (lastStatus.has_check_status()) {
checkStatus = lastStatus.check_status();
}
}
return checkStatus;
}
Option<ContainerStatus> getTaskContainerStatus(const Task& task)
{
// The statuses list only keeps the most recent TaskStatus for
// each state, and appends later states at the end. Let's find
// the most recent TaskStatus with a valid container_status.
foreach (const TaskStatus& status, adaptor::reverse(task.statuses())) {
if (status.has_container_status()) {
return status.container_status();
}
}
return None();
}
bool isTerminalState(const OperationState& state)
{
switch (state) {
case OPERATION_FINISHED:
case OPERATION_FAILED:
case OPERATION_ERROR:
case OPERATION_DROPPED:
return true;
case OPERATION_UNSUPPORTED:
case OPERATION_PENDING:
case OPERATION_UNREACHABLE:
case OPERATION_GONE_BY_OPERATOR:
case OPERATION_RECOVERING:
case OPERATION_UNKNOWN:
return false;
}
UNREACHABLE();
}
OperationStatus createOperationStatus(
const OperationState& state,
const Option<OperationID>& operationId,
const Option<string>& message,
const Option<Resources>& convertedResources,
const Option<id::UUID>& uuid,
const Option<SlaveID>& slaveId,
const Option<ResourceProviderID>& resourceProviderId)
{
OperationStatus status;
status.set_state(state);
if (operationId.isSome()) {
status.mutable_operation_id()->CopyFrom(operationId.get());
}
if (message.isSome()) {
status.set_message(message.get());
}
if (convertedResources.isSome()) {
status.mutable_converted_resources()->CopyFrom(convertedResources.get());
}
if (uuid.isSome()) {
status.mutable_uuid()->CopyFrom(protobuf::createUUID(uuid.get()));
}
if (slaveId.isSome()) {
status.mutable_slave_id()->CopyFrom(slaveId.get());
}
if (resourceProviderId.isSome()) {
status.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
}
return status;
}
Operation createOperation(
const Offer::Operation& info,
const OperationStatus& latestStatus,
const Option<FrameworkID>& frameworkId,
const Option<SlaveID>& slaveId,
const Option<UUID>& operationUUID)
{
Operation operation;
if (frameworkId.isSome()) {
operation.mutable_framework_id()->CopyFrom(frameworkId.get());
}
if (slaveId.isSome()) {
operation.mutable_slave_id()->CopyFrom(slaveId.get());
}
operation.mutable_info()->CopyFrom(info);
operation.mutable_latest_status()->CopyFrom(latestStatus);
if (operationUUID.isSome()) {
operation.mutable_uuid()->CopyFrom(operationUUID.get());
} else {
operation.mutable_uuid()->CopyFrom(protobuf::createUUID());
}
return operation;
}
UpdateOperationStatusMessage createUpdateOperationStatusMessage(
const UUID& operationUUID,
const OperationStatus& status,
const Option<OperationStatus>& latestStatus,
const Option<FrameworkID>& frameworkId,
const Option<SlaveID>& slaveId)
{
UpdateOperationStatusMessage update;
if (frameworkId.isSome()) {
update.mutable_framework_id()->CopyFrom(frameworkId.get());
}
if (slaveId.isSome()) {
update.mutable_slave_id()->CopyFrom(slaveId.get());
}
update.mutable_status()->CopyFrom(status);
if (latestStatus.isSome()) {
update.mutable_latest_status()->CopyFrom(latestStatus.get());
}
update.mutable_operation_uuid()->CopyFrom(operationUUID);
return update;
}
UUID createUUID(const Option<id::UUID>& uuid)
{
UUID uuid_;
if (uuid.isSome()) {
uuid_.set_value(uuid->toBytes());
} else {
uuid_.set_value(id::UUID::random().toBytes());
}
return uuid_;
}
/**
* Creates a MasterInfo protobuf from the process's UPID.
*
* This is only used by the `StandaloneMasterDetector` (used in tests
* and outside tests when ZK is not used).
*
* For example, when we start a slave with
* `--master=master@127.0.0.1:5050`, since the slave (and consequently
* its detector) doesn't have enough information about `MasterInfo`, it
* tries to construct it based on the only available information
* (`UPID`).
*
* @param pid The process's assigned untyped PID.
* @return A fully formed `MasterInfo` with the IP/hostname information
* as derived from the `UPID`.
*/
MasterInfo createMasterInfo(const UPID& pid)
{
MasterInfo info;
info.set_id(stringify(pid) + "-" + id::UUID::random().toString());
// NOTE: Currently, we store the ip in network order, which should
// be fixed. See MESOS-1201 for more details.
// TODO(marco): `ip` and `port` are deprecated in favor of `address`;
// remove them both after the deprecation cycle.
info.set_ip(pid.address.ip.in()->s_addr);
info.set_port(pid.address.port);
info.mutable_address()->set_ip(stringify(pid.address.ip));
info.mutable_address()->set_port(pid.address.port);
info.set_pid(pid);
Try<string> hostname = net::getHostname(pid.address.ip);
if (hostname.isSome()) {
// Hostname is deprecated; but we need to update it
// to maintain backward compatibility.
// TODO(marco): Remove once we deprecate it.
info.set_hostname(hostname.get());
info.mutable_address()->set_hostname(hostname.get());
}
foreach (const MasterInfo::Capability& capability,
mesos::internal::master::MASTER_CAPABILITIES()) {
info.add_capabilities()->CopyFrom(capability);
}
return info;
}
Label createLabel(const string& key, const Option<string>& value)
{
Label label;
label.set_key(key);
if (value.isSome()) {
label.set_value(value.get());
}
return label;
}
Labels convertStringMapToLabels(const Map<string, string>& map)
{
Labels labels;
foreach (const auto& entry, map) {
Label* label = labels.mutable_labels()->Add();
label->set_key(entry.first);
label->set_value(entry.second);
}
return labels;
}
Try<Map<string, string>> convertLabelsToStringMap(const Labels& labels)
{
Map<string, string> map;
foreach (const Label& label, labels.labels()) {
if (map.count(label.key())) {
return Error("Repeated key '" + label.key() + "' in labels");
}
if (!label.has_value()) {
return Error("Missing value for key '" + label.key() + "' in labels");
}
map[label.key()] = label.value();
}
return map;
}
void injectAllocationInfo(
Offer::Operation* operation,
const Resource::AllocationInfo& allocationInfo)
{
struct Injector
{
void operator()(
Resource& resource, const Resource::AllocationInfo& allocationInfo)
{
if (!resource.has_allocation_info()) {
resource.mutable_allocation_info()->CopyFrom(allocationInfo);
}
}
void operator()(
RepeatedPtrField<Resource>* resources,
const Resource::AllocationInfo& allocationInfo)
{
foreach (Resource& resource, *resources) {
operator()(resource, allocationInfo);
}
}
} inject;
switch (operation->type()) {
case Offer::Operation::LAUNCH: {
Offer::Operation::Launch* launch = operation->mutable_launch();
foreach (TaskInfo& task, *launch->mutable_task_infos()) {
inject(task.mutable_resources(), allocationInfo);
if (task.has_executor()) {
inject(
task.mutable_executor()->mutable_resources(),
allocationInfo);
}
}
break;
}
case Offer::Operation::LAUNCH_GROUP: {
Offer::Operation::LaunchGroup* launchGroup =
operation->mutable_launch_group();
if (launchGroup->has_executor()) {
inject(
launchGroup->mutable_executor()->mutable_resources(),
allocationInfo);
}
TaskGroupInfo* taskGroup = launchGroup->mutable_task_group();
foreach (TaskInfo& task, *taskGroup->mutable_tasks()) {
inject(task.mutable_resources(), allocationInfo);
if (task.has_executor()) {
inject(
task.mutable_executor()->mutable_resources(),
allocationInfo);
}
}
break;
}
case Offer::Operation::RESERVE: {
inject(
operation->mutable_reserve()->mutable_resources(),
allocationInfo);
break;
}
case Offer::Operation::UNRESERVE: {
inject(
operation->mutable_unreserve()->mutable_resources(),
allocationInfo);
break;
}
case Offer::Operation::CREATE: {
inject(
operation->mutable_create()->mutable_volumes(),
allocationInfo);
break;
}
case Offer::Operation::DESTROY: {
inject(
operation->mutable_destroy()->mutable_volumes(),
allocationInfo);
break;
}
case Offer::Operation::GROW_VOLUME: {
inject(
*operation->mutable_grow_volume()->mutable_volume(),
allocationInfo);
inject(
*operation->mutable_grow_volume()->mutable_addition(),
allocationInfo);
break;
}
case Offer::Operation::SHRINK_VOLUME: {
inject(
*operation->mutable_shrink_volume()->mutable_volume(),
allocationInfo);
break;
}
case Offer::Operation::CREATE_DISK: {
inject(
*operation->mutable_create_disk()->mutable_source(),
allocationInfo);
break;
}
case Offer::Operation::DESTROY_DISK: {
inject(
*operation->mutable_destroy_disk()->mutable_source(),
allocationInfo);
break;
}
case Offer::Operation::UNKNOWN:
break; // No-op.
}
}
void stripAllocationInfo(Offer::Operation* operation)
{
struct Stripper
{
void operator()(Resource& resource)
{
if (resource.has_allocation_info()) {
resource.clear_allocation_info();
}
}
void operator()(RepeatedPtrField<Resource>* resources)
{
foreach (Resource& resource, *resources) {
operator()(resource);
}
}
} strip;
switch (operation->type()) {
case Offer::Operation::LAUNCH: {
Offer::Operation::Launch* launch = operation->mutable_launch();
foreach (TaskInfo& task, *launch->mutable_task_infos()) {
strip(task.mutable_resources());
if (task.has_executor()) {
strip(task.mutable_executor()->mutable_resources());
}
}
break;
}
case Offer::Operation::LAUNCH_GROUP: {
Offer::Operation::LaunchGroup* launchGroup =
operation->mutable_launch_group();
if (launchGroup->has_executor()) {
strip(launchGroup->mutable_executor()->mutable_resources());
}
TaskGroupInfo* taskGroup = launchGroup->mutable_task_group();
foreach (TaskInfo& task, *taskGroup->mutable_tasks()) {
strip(task.mutable_resources());
if (task.has_executor()) {
strip(task.mutable_executor()->mutable_resources());
}
}
break;
}
case Offer::Operation::RESERVE: {
strip(operation->mutable_reserve()->mutable_resources());
break;
}
case Offer::Operation::UNRESERVE: {
strip(operation->mutable_unreserve()->mutable_resources());
break;
}
case Offer::Operation::CREATE: {
strip(operation->mutable_create()->mutable_volumes());
break;
}
case Offer::Operation::DESTROY: {
strip(operation->mutable_destroy()->mutable_volumes());
break;
}
case Offer::Operation::GROW_VOLUME: {
strip(*operation->mutable_grow_volume()->mutable_volume());
strip(*operation->mutable_grow_volume()->mutable_addition());
break;
}
case Offer::Operation::SHRINK_VOLUME: {
strip(*operation->mutable_shrink_volume()->mutable_volume());
break;
}
case Offer::Operation::CREATE_DISK: {
strip(*operation->mutable_create_disk()->mutable_source());
break;
}
case Offer::Operation::DESTROY_DISK: {
strip(*operation->mutable_destroy_disk()->mutable_source());
break;
}
case Offer::Operation::UNKNOWN:
break; // No-op.
}
}
bool isSpeculativeOperation(const Offer::Operation& operation)
{
switch (operation.type()) {
case Offer::Operation::LAUNCH:
case Offer::Operation::LAUNCH_GROUP:
case Offer::Operation::CREATE_DISK:
case Offer::Operation::DESTROY_DISK:
return false;
case Offer::Operation::RESERVE:
case Offer::Operation::UNRESERVE:
case Offer::Operation::CREATE:
case Offer::Operation::DESTROY:
// TODO(zhitao): Convert `GROW_VOLUME` and `SHRINK_VOLUME` to
// non-speculative operations once we can support non-speculative operator
// API.
case Offer::Operation::GROW_VOLUME:
case Offer::Operation::SHRINK_VOLUME:
return true;
case Offer::Operation::UNKNOWN:
UNREACHABLE();
}
UNREACHABLE();
}
RepeatedPtrField<ResourceVersionUUID> createResourceVersions(
const hashmap<Option<ResourceProviderID>, UUID>& resourceVersions)
{
RepeatedPtrField<ResourceVersionUUID> result;
foreachpair (
const Option<ResourceProviderID>& resourceProviderId,
const UUID& uuid,
resourceVersions) {
ResourceVersionUUID* entry = result.Add();
if (resourceProviderId.isSome()) {
entry->mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
}
entry->mutable_uuid()->CopyFrom(uuid);
}
return result;
}
hashmap<Option<ResourceProviderID>, UUID> parseResourceVersions(
const RepeatedPtrField<ResourceVersionUUID>& resourceVersionUUIDs)
{
hashmap<Option<ResourceProviderID>, UUID> result;
foreach (
const ResourceVersionUUID& resourceVersionUUID,
resourceVersionUUIDs) {
const Option<ResourceProviderID> resourceProviderId =
resourceVersionUUID.has_resource_provider_id()
? resourceVersionUUID.resource_provider_id()
: Option<ResourceProviderID>::none();
CHECK(!result.contains(resourceProviderId));
result.insert({std::move(resourceProviderId), resourceVersionUUID.uuid()});
}
return result;
}
TimeInfo getCurrentTime()
{
TimeInfo timeInfo;
timeInfo.set_nanoseconds(process::Clock::now().duration().ns());
return timeInfo;
}
FileInfo createFileInfo(const string& path, const struct stat& s)
{
FileInfo file;
file.set_path(path);
file.set_nlink(s.st_nlink);
file.set_size(s.st_size);
file.mutable_mtime()->set_nanoseconds(Seconds((s.st_mtime)).ns());
file.set_mode(s.st_mode);
// NOTE: `getpwuid` and `getgrgid` return `nullptr` on Windows.
passwd* p = getpwuid(s.st_uid);
if (p != nullptr) {
file.set_uid(p->pw_name);
} else {
file.set_uid(stringify(s.st_uid));
}
struct group* g = getgrgid(s.st_gid);
if (g != nullptr) {
file.set_gid(g->gr_name);
} else {
file.set_gid(stringify(s.st_gid));
}
return file;
}
ContainerID getRootContainerId(const ContainerID& containerId)
{
ContainerID rootContainerId = containerId;
while (rootContainerId.has_parent()) {
// NOTE: Looks like protobuf does not handle copying well when
// nesting message is involved, because the source and the target
// point to the same object. Therefore, we create a temporary
// variable and use an extra copy here.
ContainerID id = rootContainerId.parent();
rootContainerId = id;
}
return rootContainerId;
}
ContainerID parseContainerId(const string& value)
{
vector<string> tokens = strings::split(value, ".");
Option<ContainerID> result;
foreach (const string& token, tokens) {
ContainerID id;
id.set_value(token);
if (result.isSome()) {
id.mutable_parent()->CopyFrom(result.get());
}
result = id;
}
CHECK_SOME(result);
return result.get();
}
Try<Resources> getConsumedResources(const Offer::Operation& operation)
{
switch (operation.type()) {
case Offer::Operation::CREATE_DISK:
return operation.create_disk().source();
case Offer::Operation::DESTROY_DISK:
return operation.destroy_disk().source();
case Offer::Operation::RESERVE:
case Offer::Operation::UNRESERVE:
case Offer::Operation::CREATE:
case Offer::Operation::DESTROY:
case Offer::Operation::GROW_VOLUME:
case Offer::Operation::SHRINK_VOLUME: {
Try<vector<ResourceConversion>> conversions =
getResourceConversions(operation);
if (conversions.isError()) {
return Error(conversions.error());
}
Resources consumed;
foreach (const ResourceConversion& conversion, conversions.get()) {
consumed += conversion.consumed;
}
return consumed;
}
case Offer::Operation::LAUNCH:
case Offer::Operation::LAUNCH_GROUP:
// TODO(bbannier): Consider adding support for 'LAUNCH' and
// 'LAUNCH_GROUP' operations.
case Offer::Operation::UNKNOWN:
return Error("Unsupported operation");
}
UNREACHABLE();
}
namespace slave {
bool operator==(const Capabilities& left, const Capabilities& right)
{
// TODO(bmahler): Use reflection-based equality to avoid breaking
// as new capabilities are added. Note that it needs to be set-based
// equality.
return left.multiRole == right.multiRole &&
left.hierarchicalRole == right.hierarchicalRole &&
left.reservationRefinement == right.reservationRefinement &&
left.resourceProvider == right.resourceProvider &&
left.resizeVolume == right.resizeVolume;
}
bool operator!=(const Capabilities& left, const Capabilities& right)
{
return !(left == right);
}
ostream& operator<<(ostream& stream, const Capabilities& c)
{
set<string> names;
foreach (const SlaveInfo::Capability& capability, c.toRepeatedPtrField()) {
names.insert(SlaveInfo::Capability::Type_Name(capability.type()));
}
return stream << stringify(names);
}
ContainerLimitation createContainerLimitation(
const Resources& resources,
const string& message,
const TaskStatus::Reason& reason)
{
ContainerLimitation limitation;
foreach (Resource resource, resources) {
limitation.add_resources()->CopyFrom(resource);
}
limitation.set_message(message);
limitation.set_reason(reason);
return limitation;
}
ContainerState createContainerState(
const Option<ExecutorInfo>& executorInfo,
const ContainerID& containerId,
pid_t pid,
const string& directory)
{
ContainerState state;
if (executorInfo.isSome()) {
state.mutable_executor_info()->CopyFrom(executorInfo.get());
}
state.mutable_container_id()->CopyFrom(containerId);
state.set_pid(pid);
state.set_directory(directory);
return state;
}
} // namespace slave {
namespace maintenance {
Unavailability createUnavailability(
const process::Time& start,
const Option<Duration>& duration)
{
Unavailability unavailability;
unavailability.mutable_start()->set_nanoseconds(start.duration().ns());
if (duration.isSome()) {
unavailability.mutable_duration()->set_nanoseconds(duration->ns());
}
return unavailability;
}
RepeatedPtrField<MachineID> createMachineList(
std::initializer_list<MachineID> ids)
{
RepeatedPtrField<MachineID> array;
foreach (const MachineID& id, ids) {
array.Add()->CopyFrom(id);
}
return array;
}
mesos::maintenance::Window createWindow(
std::initializer_list<MachineID> ids,
const Unavailability& unavailability)
{
mesos::maintenance::Window window;
window.mutable_unavailability()->CopyFrom(unavailability);
foreach (const MachineID& id, ids) {
window.add_machine_ids()->CopyFrom(id);
}
return window;
}
mesos::maintenance::Schedule createSchedule(
std::initializer_list<mesos::maintenance::Window> windows)
{
mesos::maintenance::Schedule schedule;
foreach (const mesos::maintenance::Window& window, windows) {
schedule.add_windows()->CopyFrom(window);
}
return schedule;
}
} // namespace maintenance {
namespace master {
namespace event {
mesos::master::Event createTaskUpdated(
const Task& task,
const TaskState& state,
const TaskStatus& status)
{
mesos::master::Event event;
event.set_type(mesos::master::Event::TASK_UPDATED);
mesos::master::Event::TaskUpdated* taskUpdated = event.mutable_task_updated();
taskUpdated->mutable_framework_id()->CopyFrom(task.framework_id());
taskUpdated->mutable_status()->CopyFrom(status);
taskUpdated->set_state(state);
return event;
}
mesos::master::Event createTaskAdded(const Task& task)
{
mesos::master::Event event;
event.set_type(mesos::master::Event::TASK_ADDED);
event.mutable_task_added()->mutable_task()->CopyFrom(task);
return event;
}
mesos::master::Event createFrameworkAdded(
const mesos::internal::master::Framework& _framework)
{
mesos::master::Event event;
event.set_type(mesos::master::Event::FRAMEWORK_ADDED);
mesos::master::Response::GetFrameworks::Framework* framework =
event.mutable_framework_added()->mutable_framework();
framework->mutable_framework_info()->CopyFrom(_framework.info);
framework->set_active(_framework.active());
framework->set_connected(_framework.connected());
framework->set_recovered(_framework.recovered());
framework->mutable_registered_time()->set_nanoseconds(
_framework.registeredTime.duration().ns());
framework->mutable_reregistered_time()->set_nanoseconds(
_framework.reregisteredTime.duration().ns());
framework->mutable_unregistered_time()->set_nanoseconds(
_framework.unregisteredTime.duration().ns());
return event;
}
mesos::master::Event createFrameworkUpdated(
const mesos::internal::master::Framework& _framework)
{
mesos::master::Event event;
event.set_type(mesos::master::Event::FRAMEWORK_UPDATED);
mesos::master::Response::GetFrameworks::Framework* framework =
event.mutable_framework_updated()->mutable_framework();
framework->mutable_framework_info()->CopyFrom(_framework.info);
framework->set_active(_framework.active());
framework->set_connected(_framework.connected());
framework->set_recovered(_framework.recovered());
framework->mutable_registered_time()->set_nanoseconds(
_framework.registeredTime.duration().ns());
framework->mutable_reregistered_time()->set_nanoseconds(
_framework.reregisteredTime.duration().ns());
framework->mutable_unregistered_time()->set_nanoseconds(
_framework.unregisteredTime.duration().ns());
return event;
}
mesos::master::Event createFrameworkRemoved(const FrameworkInfo& frameworkInfo)
{
mesos::master::Event event;
event.set_type(mesos::master::Event::FRAMEWORK_REMOVED);
event.mutable_framework_removed()->mutable_framework_info()->CopyFrom(
frameworkInfo);
return event;
}
mesos::master::Response::GetAgents::Agent createAgentResponse(
const mesos::internal::master::Slave& slave,
const Option<Owned<ObjectApprovers>>& approvers)
{
mesos::master::Response::GetAgents::Agent agent;
agent.mutable_agent_info()->CopyFrom(slave.info);
agent.set_pid(string(slave.pid));
agent.set_active(slave.active);
agent.set_version(slave.version);
agent.mutable_registered_time()->set_nanoseconds(
slave.registeredTime.duration().ns());
if (slave.reregisteredTime.isSome()) {
agent.mutable_reregistered_time()->set_nanoseconds(
slave.reregisteredTime->duration().ns());
}
agent.mutable_agent_info()->clear_resources();
foreach (const Resource& resource, slave.info.resources()) {
if (approvers.isNone() || approvers.get()->approved<VIEW_ROLE>(resource)) {
agent.mutable_agent_info()->add_resources()->CopyFrom(resource);
}
}
foreach (Resource resource, slave.totalResources) {
if (approvers.isNone() || approvers.get()->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
agent.add_total_resources()->CopyFrom(resource);
}
}
foreach (Resource resource, Resources::sum(slave.usedResources)) {
if (approvers.isNone() || approvers.get()->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
agent.add_allocated_resources()->CopyFrom(resource);
}
}
foreach (Resource resource, slave.offeredResources) {
if (approvers.isNone() || approvers.get()->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
agent.add_offered_resources()->CopyFrom(resource);
}
}
agent.mutable_capabilities()->CopyFrom(
slave.capabilities.toRepeatedPtrField());
foreachvalue (
const mesos::internal::master::Slave::ResourceProvider& resourceProvider,
slave.resourceProviders) {
mesos::master::Response::GetAgents::Agent::ResourceProvider* provider =
agent.add_resource_providers();
provider->mutable_resource_provider_info()->CopyFrom(resourceProvider.info);
provider->mutable_total_resources()->CopyFrom(
resourceProvider.totalResources);
}
return agent;
}
mesos::master::Event createAgentAdded(
const mesos::internal::master::Slave& slave)
{
mesos::master::Event event;
event.set_type(mesos::master::Event::AGENT_ADDED);
event.mutable_agent_added()->mutable_agent()->CopyFrom(
createAgentResponse(slave));
return event;
}
mesos::master::Event createAgentRemoved(const SlaveID& slaveId)
{
mesos::master::Event event;
event.set_type(mesos::master::Event::AGENT_REMOVED);
event.mutable_agent_removed()->mutable_agent_id()->CopyFrom(
slaveId);
return event;
}
} // namespace event {
} // namespace master {
namespace framework {
set<string> getRoles(const FrameworkInfo& frameworkInfo)
{
if (protobuf::frameworkHasCapability(
frameworkInfo,
FrameworkInfo::Capability::MULTI_ROLE)) {
return set<string>(
frameworkInfo.roles().begin(),
frameworkInfo.roles().end());
} else {
return {frameworkInfo.role()};
}
}
} // namespace framework {
} // namespace protobuf {
} // namespace internal {
} // namespace mesos {