blob: 4f4711d54c471922f1a103310d4d360e41a99870 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <mesos/slave/isolator.hpp>
#include <mesos/type_utils.hpp>
#include <process/clock.hpp>
#include <process/pid.hpp>
#include <stout/adaptor.hpp>
#include <stout/foreach.hpp>
#include <stout/net.hpp>
#include <stout/stringify.hpp>
#include <stout/uuid.hpp>
#include "common/protobuf_utils.hpp"
#include "messages/messages.hpp"
using std::string;
using google::protobuf::RepeatedPtrField;
using mesos::slave::ContainerLimitation;
using mesos::slave::ContainerState;
using process::UPID;
namespace mesos {
namespace internal {
namespace protobuf {
// Returns true iff `state` is one of the terminal task states
// (FINISHED, FAILED, KILLED, LOST or ERROR).
bool isTerminalState(const TaskState& state)
{
  switch (state) {
    case TASK_FINISHED:
    case TASK_FAILED:
    case TASK_KILLED:
    case TASK_LOST:
    case TASK_ERROR:
      return true;
    default:
      return false;
  }
}
// Builds a `StatusUpdate` (and its embedded `TaskStatus`) from the
// individual pieces of information. The update and the status share
// the same timestamp (the current clock time) and, when provided,
// the same UUID and slave ID. Optional arguments that are `None` are
// left unset in the resulting message.
StatusUpdate createStatusUpdate(
    const FrameworkID& frameworkId,
    const Option<SlaveID>& slaveId,
    const TaskID& taskId,
    const TaskState& state,
    const TaskStatus::Source& source,
    const Option<UUID>& uuid,
    const string& message,
    const Option<TaskStatus::Reason>& reason,
    const Option<ExecutorID>& executorId,
    const Option<bool>& healthy,
    const Option<Labels>& labels,
    const Option<ContainerStatus>& containerStatus)
{
  StatusUpdate result;
  TaskStatus* taskStatus = result.mutable_status();

  // Both the envelope and the status carry the same creation time.
  const double now = process::Clock::now().secs();
  result.set_timestamp(now);
  taskStatus->set_timestamp(now);

  // Mandatory fields.
  result.mutable_framework_id()->MergeFrom(frameworkId);
  taskStatus->mutable_task_id()->MergeFrom(taskId);
  taskStatus->set_state(state);
  taskStatus->set_source(source);
  taskStatus->set_message(message);

  // Fields mirrored on both the envelope and the status.
  if (slaveId.isSome()) {
    result.mutable_slave_id()->MergeFrom(slaveId.get());
    taskStatus->mutable_slave_id()->MergeFrom(slaveId.get());
  }

  if (uuid.isSome()) {
    result.set_uuid(uuid.get().toBytes());
    taskStatus->set_uuid(uuid.get().toBytes());
  }

  // Envelope-only optional field.
  if (executorId.isSome()) {
    result.mutable_executor_id()->MergeFrom(executorId.get());
  }

  // Status-only optional fields.
  if (reason.isSome()) {
    taskStatus->set_reason(reason.get());
  }

  if (healthy.isSome()) {
    taskStatus->set_healthy(healthy.get());
  }

  if (labels.isSome()) {
    taskStatus->mutable_labels()->CopyFrom(labels.get());
  }

  if (containerStatus.isSome()) {
    taskStatus->mutable_container_status()->CopyFrom(containerStatus.get());
  }

  return result;
}
// Wraps an existing `TaskStatus` into a `StatusUpdate`. The executor
// ID, timestamp and UUID are taken from `status` when present; if the
// status has no timestamp the current clock time is used instead.
StatusUpdate createStatusUpdate(
    const FrameworkID& frameworkId,
    const TaskStatus& status,
    const Option<SlaveID>& slaveId)
{
  StatusUpdate result;

  result.mutable_framework_id()->MergeFrom(frameworkId);
  result.mutable_status()->MergeFrom(status);

  if (slaveId.isSome()) {
    result.mutable_slave_id()->MergeFrom(slaveId.get());
  }

  if (status.has_executor_id()) {
    result.mutable_executor_id()->MergeFrom(status.executor_id());
  }

  if (status.has_uuid()) {
    result.set_uuid(status.uuid());
  }

  // Prefer the timestamp recorded in the status; otherwise stamp the
  // update with the current time.
  result.set_timestamp(
      status.has_timestamp() ? status.timestamp()
                             : process::Clock::now().secs());

  return result;
}
// Builds a `Task` from the given `TaskInfo`, tagging it with `state`
// and `frameworkId`. The executor ID, labels, discovery info and
// container info are copied only when `task` carries them.
Task createTask(
    const TaskInfo& task,
    const TaskState& state,
    const FrameworkID& frameworkId)
{
  Task result;

  // Identity of the task.
  result.set_name(task.name());
  result.mutable_task_id()->CopyFrom(task.task_id());
  result.mutable_framework_id()->CopyFrom(frameworkId);
  result.mutable_slave_id()->CopyFrom(task.slave_id());

  result.set_state(state);
  result.mutable_resources()->CopyFrom(task.resources());

  // Optional pieces.
  if (task.has_executor()) {
    result.mutable_executor_id()->CopyFrom(task.executor().executor_id());
  }

  if (task.has_labels()) {
    result.mutable_labels()->CopyFrom(task.labels());
  }

  if (task.has_discovery()) {
    result.mutable_discovery()->CopyFrom(task.discovery());
  }

  if (task.has_container()) {
    result.mutable_container()->CopyFrom(task.container());
  }

  return result;
}
// Returns the health recorded in the last status of `task`, or
// `None()` if the task has no statuses or its last status carries no
// health information.
Option<bool> getTaskHealth(const Task& task)
{
  if (task.statuses_size() == 0) {
    return None();
  }

  // The statuses list only keeps the most recent TaskStatus for
  // each state, and appends later states at the end. Thus the last
  // status is either a terminal state (where health is
  // irrelevant), or the latest RUNNING status.
  //
  // Bind by const reference: only a single field is read, so there
  // is no reason to deep-copy the whole message.
  const TaskStatus& lastStatus = task.statuses(task.statuses_size() - 1);

  if (!lastStatus.has_healthy()) {
    return None();
  }

  return lastStatus.healthy();
}
// Returns the container status of the latest status of `task` that
// has one, or `None()` if no status carries a container status.
Option<ContainerStatus> getTaskContainerStatus(const Task& task)
{
  // The statuses list only keeps the most recent TaskStatus for
  // each state, and appends later states at the end, so walk it
  // backwards and stop at the first entry with a container_status.
  for (int index = task.statuses_size() - 1; index >= 0; --index) {
    const TaskStatus& candidate = task.statuses(index);

    if (candidate.has_container_status()) {
      return candidate.container_status();
    }
  }

  return None();
}
/**
 * Builds a `MasterInfo` protobuf out of a process's UPID.
 *
 * This is only used by the `StandaloneMasterDetector` (used in tests
 * and outside tests when ZK is not used).
 *
 * For example, when a slave is started with
 * `--master=master@127.0.0.1:5050`, the slave (and consequently its
 * detector) has no further information about the `MasterInfo`, so it
 * is constructed from the only available information (`UPID`).
 *
 * The ID is the stringified PID followed by "-" and a random UUID.
 * The hostname fields are filled in only if the hostname lookup for
 * the PID's IP succeeds.
 *
 * @param pid The process's assigned untyped PID.
 * @return A `MasterInfo` with the IP/hostname information as derived
 *     from the `UPID`.
 */
MasterInfo createMasterInfo(const UPID& pid)
{
  MasterInfo master;

  const string id = stringify(pid) + "-" + UUID::random().toString();
  master.set_id(id);

  // NOTE: Currently, we store the ip in network order, which should
  // be fixed. See MESOS-1201 for more details.
  // TODO(marco): `ip` and `port` are deprecated in favor of `address`;
  // remove them both after the deprecation cycle.
  master.set_ip(pid.address.ip.in().get().s_addr);
  master.set_port(pid.address.port);

  auto* address = master.mutable_address();
  address->set_ip(stringify(pid.address.ip));
  address->set_port(pid.address.port);

  master.set_pid(pid);

  const Try<string> hostname = net::getHostname(pid.address.ip);
  if (!hostname.isSome()) {
    // No hostname could be obtained; leave the hostname fields unset.
    return master;
  }

  // The top-level hostname is deprecated; but we need to update it
  // to maintain backward compatibility.
  // TODO(marco): Remove once we deprecate it.
  master.set_hostname(hostname.get());
  address->set_hostname(hostname.get());

  return master;
}
// Builds a `Label` with the given key; the value is set only when
// one is provided.
Label createLabel(const string& key, const Option<string>& value)
{
  Label result;
  result.set_key(key);

  if (value.isNone()) {
    return result;
  }

  result.set_value(value.get());
  return result;
}
// Returns a `TimeInfo` holding the current clock time, expressed in
// nanoseconds.
TimeInfo getCurrentTime()
{
  const Duration now = process::Clock::now().duration();

  TimeInfo result;
  result.set_nanoseconds(now.ns());
  return result;
}
namespace slave {
// Builds a `ContainerLimitation` that records the given resources
// together with a human readable message and the status reason.
ContainerLimitation createContainerLimitation(
    const Resources& resources,
    const string& message,
    const TaskStatus::Reason& reason)
{
  ContainerLimitation limitation;

  // Iterate by const reference: each element is copied into the
  // message anyway, so an extra per-iteration copy is wasted work.
  foreach (const Resource& resource, resources) {
    limitation.add_resources()->CopyFrom(resource);
  }

  limitation.set_message(message);
  limitation.set_reason(reason);

  return limitation;
}
// Builds a `ContainerState` from the executor info, container ID,
// process ID and directory of a container.
ContainerState createContainerState(
    const ExecutorInfo& executorInfo,
    const ContainerID& container_id,
    pid_t pid,
    const string& directory)
{
  ContainerState containerState;

  containerState.set_pid(pid);
  containerState.set_directory(directory);
  containerState.mutable_container_id()->CopyFrom(container_id);
  containerState.mutable_executor_info()->CopyFrom(executorInfo);

  return containerState;
}
} // namespace slave {
namespace maintenance {
// Builds an `Unavailability` that begins at `start`. The duration is
// set only when one is provided; both values are stored in
// nanoseconds.
Unavailability createUnavailability(
    const process::Time& start,
    const Option<Duration>& duration)
{
  Unavailability result;
  result.mutable_start()->set_nanoseconds(start.duration().ns());

  if (duration.isNone()) {
    return result;
  }

  result.mutable_duration()->set_nanoseconds(duration.get().ns());
  return result;
}
// Copies the given machine IDs, in order, into a repeated field.
RepeatedPtrField<MachineID> createMachineList(
    std::initializer_list<MachineID> ids)
{
  RepeatedPtrField<MachineID> machines;

  for (const MachineID& machine : ids) {
    machines.Add()->CopyFrom(machine);
  }

  return machines;
}
// Builds a maintenance `Window` covering the given machines, with the
// given unavailability.
mesos::maintenance::Window createWindow(
    std::initializer_list<MachineID> ids,
    const Unavailability& unavailability)
{
  mesos::maintenance::Window result;

  for (const MachineID& machine : ids) {
    result.add_machine_ids()->CopyFrom(machine);
  }

  result.mutable_unavailability()->CopyFrom(unavailability);

  return result;
}
// Builds a maintenance `Schedule` containing the given windows, in
// order.
mesos::maintenance::Schedule createSchedule(
    std::initializer_list<mesos::maintenance::Window> windows)
{
  mesos::maintenance::Schedule result;

  for (const mesos::maintenance::Window& entry : windows) {
    result.add_windows()->CopyFrom(entry);
  }

  return result;
}
} // namespace maintenance {
} // namespace protobuf {
} // namespace internal {
} // namespace mesos {