blob: c436332fbd9744c16a4e6c44b37c2fb42f6389e1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <queue>
#include <vector>
#include <mesos/type_utils.hpp>
#include <mesos/v1/mesos.hpp>
#include <mesos/v1/resources.hpp>
#include <mesos/v1/scheduler.hpp>
#include <process/delay.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
#include <process/protobuf.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/flags.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/stringify.hpp>
#include "common/parse.hpp"
#include "common/protobuf_utils.hpp"
#include "hdfs/hdfs.hpp"
#include "internal/devolve.hpp"
#include "logging/logging.hpp"
#include "v1/parse.hpp"
using std::cerr;
using std::cout;
using std::endl;
using std::queue;
using std::string;
using std::vector;
using google::protobuf::RepeatedPtrField;
using mesos::internal::devolve;
using mesos::v1::AgentID;
using mesos::v1::CapabilityInfo;
using mesos::v1::CheckInfo;
using mesos::v1::CheckStatusInfo;
using mesos::v1::CommandInfo;
using mesos::v1::ContainerInfo;
using mesos::v1::Credential;
using mesos::v1::Environment;
using mesos::v1::ExecutorInfo;
using mesos::v1::FrameworkID;
using mesos::v1::FrameworkInfo;
using mesos::v1::Image;
using mesos::v1::Label;
using mesos::v1::Labels;
using mesos::v1::Offer;
using mesos::v1::Resource;
using mesos::v1::Resources;
using mesos::v1::RLimitInfo;
using mesos::v1::TaskGroupInfo;
using mesos::v1::TaskID;
using mesos::v1::TaskInfo;
using mesos::v1::TaskState;
using mesos::v1::TaskStatus;
using mesos::v1::Volume;
using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
using mesos::v1::scheduler::Mesos;
using process::Future;
using process::Owned;
// Command-line flags accepted by the `mesos-execute` CLI.
//
// Every flag is registered in the constructor via `add()`; the parsed
// values land in the public data members declared after it. Cross-flag
// validation (e.g., `--task` vs. `--task_group` exclusivity) is done by
// the caller in main(), not here.
class Flags : public virtual flags::FlagsBase
{
public:
  Flags()
  {
    add(&Flags::master,
        "master",
        "Mesos master (e.g., IP:PORT).");

    // A fully-specified task, mutually exclusive with `--task_group` and
    // with the ad-hoc `--name`/`--command` style.
    add(&Flags::task,
        "task",
        "The value could be a JSON-formatted string of `TaskInfo` or a\n"
        "file path containing the JSON-formatted `TaskInfo`. Path must\n"
        "be of the form `file:///path/to/file` or `/path/to/file`.\n"
        "\n"
        "See the `TaskInfo` message in `mesos.proto` for the expected\n"
        "format. NOTE: `agent_id` need not to be set.\n"
        "\n"
        "Example:\n"
        "{\n"
        " \"name\": \"Name of the task\",\n"
        " \"task_id\": {\"value\" : \"Id of the task\"},\n"
        " \"agent_id\": {\"value\" : \"\"},\n"
        " \"resources\": [\n"
        " {\n"
        " \"name\": \"cpus\",\n"
        " \"type\": \"SCALAR\",\n"
        " \"scalar\": {\n"
        " \"value\": 0.1\n"
        " }\n"
        " },\n"
        " {\n"
        " \"name\": \"mem\",\n"
        " \"type\": \"SCALAR\",\n"
        " \"scalar\": {\n"
        " \"value\": 32\n"
        " }\n"
        " }\n"
        " ],\n"
        " \"command\": {\n"
        " \"value\": \"sleep 1000\"\n"
        " }\n"
        "}");

    // A fully-specified task group (launched via LAUNCH_GROUP with the
    // default executor); mutually exclusive with `--task`.
    add(&Flags::task_group,
        "task_group",
        "The value could be a JSON-formatted string of `TaskGroupInfo` or a\n"
        "file path containing the JSON-formatted `TaskGroupInfo`. Path must\n"
        "be of the form `file:///path/to/file` or `/path/to/file`.\n"
        "\n"
        "See the `TaskGroupInfo` message in `mesos.proto` for the expected\n"
        "format. NOTE: `agent_id` need not to be set.\n"
        "\n"
        "Example:\n"
        "{\n"
        " \"tasks\":\n"
        " [\n"
        " {\n"
        " \"name\": \"Name of the task\",\n"
        " \"task_id\": {\"value\" : \"Id of the task\"},\n"
        " \"agent_id\": {\"value\" : \"\"},\n"
        " \"resources\": [{\n"
        " \"name\": \"cpus\",\n"
        " \"type\": \"SCALAR\",\n"
        " \"scalar\": {\n"
        " \"value\": 0.1\n"
        " }\n"
        " },\n"
        " {\n"
        " \"name\": \"mem\",\n"
        " \"type\": \"SCALAR\",\n"
        " \"scalar\": {\n"
        " \"value\": 32\n"
        " }\n"
        " }],\n"
        " \"command\": {\n"
        " \"value\": \"sleep 1000\"\n"
        " }\n"
        " }\n"
        " ]\n"
        "}");

    add(&Flags::name,
        "name",
        "Name for the command.");

    // Defaults to true: `--command` is run through a shell unless
    // explicitly disabled with `--no-shell`.
    add(&Flags::shell,
        "shell",
        "Determine the command is a shell or not. If not, 'command' will be\n"
        "treated as executable value and arguments (TODO).",
        true);

    // TODO(alexr): Once MESOS-4882 lands, elaborate on what `command` can
    // mean: an executable, a shell command, an entrypoint for a container.
    add(&Flags::command,
        "command",
        "Command to launch.");

    add(&Flags::environment,
        "env",
        "Shell command environment variables.\n"
        "The value could be a JSON formatted string of environment variables\n"
        "(i.e., {\"name1\": \"value1\"}) or a file path containing the JSON\n"
        "formatted environment variables. Path should be of the form\n"
        "'file:///path/to/file'.");

    add(&Flags::resources,
        "resources",
        "Resources for the command.",
        "cpus:1;mem:128");

    // The following three flags only matter when `--package` is used to
    // stage a file into HDFS before launching.
    add(&Flags::hadoop,
        "hadoop",
        "Path to 'hadoop' script (used for copying packages).",
        "hadoop");

    add(&Flags::hdfs,
        "hdfs",
        "The ip:port of the NameNode service.",
        "localhost:9000");

    add(&Flags::package,
        "package",
        "Package to upload into HDFS and copy into command's\n"
        "working directory (requires 'hadoop', see --hadoop).");

    add(&Flags::overwrite,
        "overwrite",
        "Overwrite the package in HDFS if it already exists.",
        false);

    add(&Flags::checkpoint,
        "checkpoint",
        "Enable checkpointing for the framework.",
        false);

    add(&Flags::appc_image,
        "appc_image",
        "Appc image name that follows the Appc spec\n"
        "(e.g., ubuntu, example.com/reduce-worker).");

    add(&Flags::docker_image,
        "docker_image",
        "Docker image that follows the Docker CLI naming <image>:<tag>\n"
        "(i.e., ubuntu, busybox:latest).");

    // NOTE: main() only accepts GPU_RESOURCES here; any other value is
    // rejected at startup.
    add(&Flags::framework_capabilities,
        "framework_capabilities",
        "Comma-separated list of optional framework capabilities to enable.\n"
        "RESERVATION_REFINEMENT and TASK_KILLING_STATE are always enabled.\n"
        "PARTITION_AWARE is enabled unless --no-partition-aware is specified.");

    add(&Flags::containerizer,
        "containerizer",
        "Containerizer to be used (i.e., docker, mesos).",
        "mesos");

    add(&Flags::effective_capabilities,
        "effective_capabilities",
        "JSON representation of effective system capabilities that should be\n"
        "granted to the command.\n"
        "\n"
        "Example:\n"
        "{\n"
        " \"capabilities\": [\n"
        " \"NET_RAW\",\n"
        " \"SYS_ADMIN\"\n"
        " ]\n"
        "}");

    add(&Flags::bounding_capabilities,
        "bounding_capabilities",
        "JSON representation of system capabilities bounding set that should\n"
        "be applied to the command.\n"
        "\n"
        "Example:\n"
        "{\n"
        " \"capabilities\": [\n"
        " \"NET_RAW\",\n"
        " \"SYS_ADMIN\"\n"
        " ]\n"
        "}");

    add(&Flags::rlimits,
        "rlimits",
        "JSON representation of resource limits for the command. For\n"
        "example, the following sets the limit for CPU time to be one\n"
        "second, and the size of created files to be unlimited:\n"
        "{\n"
        " \"rlimits\": [\n"
        " {\n"
        " \"type\":\"RLMT_CPU\",\n"
        " \"soft\":\"1\",\n"
        " \"hard\":\"1\"\n"
        " },\n"
        " {\n"
        " \"type\":\"RLMT_FSIZE\"\n"
        " }\n"
        " ]\n"
        "}");

    add(&Flags::role,
        "role",
        "Role to use when registering.",
        "*");

    add(&Flags::kill_after,
        "kill_after",
        "Specifies a delay after which the task is killed\n"
        "(e.g., 10secs, 2mins, etc).");

    add(&Flags::networks,
        "networks",
        "Comma-separated list of networks that the container will join,\n"
        "e.g., `net1,net2`.");

    add(&Flags::principal,
        "principal",
        "The principal to use for framework authentication.");

    add(&Flags::secret,
        "secret",
        "The secret to use for framework authentication.");

    add(&Flags::volumes,
        "volumes",
        "The value could be a JSON-formatted string of volumes or a\n"
        "file path containing the JSON-formatted volumes. Path must\n"
        "be of the form `file:///path/to/file` or `/path/to/file`.\n"
        "\n"
        "See the `Volume` message in `mesos.proto` for the expected format.\n"
        "\n"
        "Example:\n"
        "[\n"
        " {\n"
        " \"container_path\":\"/path/to/container\",\n"
        " \"mode\":\"RW\",\n"
        " \"source\":\n"
        " {\n"
        " \"docker_volume\":\n"
        " {\n"
        " \"driver\": \"volume_driver\",\n"
        " \"driver_options\":\n"
        " {\"parameter\":[\n"
        " {\n"
        " \"key\": \"key\",\n"
        " \"value\": \"value\"\n"
        " }\n"
        " ]},\n"
        " \"name\": \"volume_name\"\n"
        " },\n"
        " \"type\": \"DOCKER_VOLUME\"\n"
        " }\n"
        " }\n"
        "]");

    add(&Flags::content_type,
        "content_type",
        "The content type to use for scheduler protocol messages. 'json'\n"
        "and 'protobuf' are valid choices.",
        "protobuf");

    add(&Flags::tty,
        "tty",
        "Attach a TTY to the task being launched",
        false);

    add(&Flags::partition_aware,
        "partition_aware",
        "Enable partition-awareness for the framework.",
        true);
  }

  // Parsed flag values; each member corresponds to the flag of the same
  // name registered above (see the `add()` calls for semantics).
  string master;
  Option<string> name;
  Option<TaskInfo> task;
  Option<TaskGroupInfo> task_group;
  bool shell;
  Option<string> command;
  Option<hashmap<string, string>> environment;
  string resources;
  string hadoop;
  string hdfs;
  Option<string> package;
  bool overwrite;
  bool checkpoint;
  Option<string> appc_image;
  Option<string> docker_image;
  Option<std::set<string>> framework_capabilities;
  Option<JSON::Array> volumes;
  string containerizer;
  Option<CapabilityInfo> effective_capabilities;
  Option<CapabilityInfo> bounding_capabilities;
  Option<RLimitInfo> rlimits;
  string role;
  Option<Duration> kill_after;
  Option<string> networks;
  Option<string> principal;
  Option<string> secret;
  string content_type;
  bool partition_aware;
  bool tty;
};
// A v1 scheduler actor that subscribes to the given master, waits for an
// offer that covers the requested resources, launches the task (LAUNCH) or
// task group (LAUNCH_GROUP with the default executor) exactly once, and
// terminates itself once every launched task reaches a terminal state.
//
// Exactly one of `task` / `taskGroup` must be set (checked at use sites).
class CommandScheduler : public process::Process<CommandScheduler>
{
public:
  CommandScheduler(
      const FrameworkInfo& _frameworkInfo,
      const string& _master,
      mesos::ContentType _contentType,
      const Option<Duration>& _killAfter,
      const Option<Credential>& _credential,
      const Option<TaskInfo>& _task,
      const Option<TaskGroupInfo>& _taskGroup,
      const Option<string>& _networks)
    : state(DISCONNECTED),
      frameworkInfo(_frameworkInfo),
      master(_master),
      contentType(_contentType),
      killAfter(_killAfter),
      credential(_credential),
      task(_task),
      taskGroup(_taskGroup),
      networks(_networks),
      launched(false),
      terminatedTaskCount(0) {}

  ~CommandScheduler() override {}

protected:
  void initialize() override
  {
    // We initialize the library here to ensure that callbacks are only invoked
    // after the process has spawned.
    mesos.reset(new Mesos(
        master,
        contentType,
        process::defer(self(), &Self::connected),
        process::defer(self(), &Self::disconnected),
        process::defer(self(), &Self::received, lambda::_1),
        credential));
  }

  // Invoked by the scheduler library when the HTTP connection to the
  // master is (re-)established; kicks off the SUBSCRIBE retry loop.
  void connected()
  {
    state = CONNECTED;
    doReliableRegistration();
  }

  void disconnected()
  {
    state = DISCONNECTED;
  }

  // Sends SUBSCRIBE and re-schedules itself every second until the
  // subscription succeeds (state becomes SUBSCRIBED) or the connection
  // drops (state becomes DISCONNECTED, in which case `connected()` will
  // restart the loop on reconnection).
  void doReliableRegistration()
  {
    if (state == SUBSCRIBED || state == DISCONNECTED) {
      return;
    }

    Call call;
    call.set_type(Call::SUBSCRIBE);

    // Only set on re-subscription; the first SUBSCRIBE has no ID and the
    // master assigns one (delivered via the SUBSCRIBED event).
    if (frameworkInfo.has_id()) {
      call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
    }

    Call::Subscribe* subscribe = call.mutable_subscribe();
    subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);

    mesos->send(call);

    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
  }

  // Sends a KILL for the given task; scheduled via `delay()` from
  // `update()` when `--kill_after` is specified.
  void killTask(const TaskID& taskId, const AgentID& agentId)
  {
    cout << "Asked to kill task '" << taskId
         << "' on agent '" << agentId << "'" << endl;

    Call call;
    call.set_type(Call::KILL);

    CHECK(frameworkInfo.has_id());
    call.mutable_framework_id()->CopyFrom(frameworkInfo.id());

    Call::Kill* kill = call.mutable_kill();
    kill->mutable_task_id()->CopyFrom(taskId);
    kill->mutable_agent_id()->CopyFrom(agentId);

    mesos->send(call);
  }

  // Handles an OFFERS event: launches the task/task group on the first
  // offer whose (unreserved view of) resources covers the requirement;
  // declines all other offers.
  void offers(const vector<Offer>& offers)
  {
    CHECK_EQ(SUBSCRIBED, state);

    foreach (const Offer& offer, offers) {
      // Strip the allocation from the offer since we use a single role.
      Resources offered = offer.resources();
      offered.unallocate();

      Resources requiredResources;

      CHECK_NE(task.isSome(), taskGroup.isSome())
        << "Either task or task group should be set but not both";

      if (task.isSome()) {
        requiredResources = Resources(task->resources());
      } else {
        // The task group requires the sum of its tasks' resources.
        foreach (const TaskInfo& _task, taskGroup->tasks()) {
          requiredResources += Resources(_task.resources());
        }
      }

      if (!launched && offered.toUnreserved().contains(requiredResources)) {
        TaskInfo _task;
        TaskGroupInfo _taskGroup;

        if (task.isSome()) {
          _task = task.get();
          _task.mutable_agent_id()->MergeFrom(offer.agent_id());

          // Takes resources first from the specified role, then from '*'.
          Option<Resources> resources = [&]() {
            if (frameworkInfo.role() == "*") {
              return offered.find(Resources(_task.resources()));
            } else {
              Resource::ReservationInfo reservation;
              reservation.set_type(Resource::ReservationInfo::STATIC);
              reservation.set_role(frameworkInfo.role());

              return offered.find(
                  Resources(_task.resources()).pushReservation(reservation));
            }
          }();

          // `contains()` above guarantees `find()` succeeds.
          CHECK_SOME(resources);
          _task.mutable_resources()->CopyFrom(resources.get());
        } else {
          // Rewrite each task in the group with the agent ID and the
          // concrete resources picked out of the offer.
          foreach (TaskInfo _task, taskGroup->tasks()) {
            _task.mutable_agent_id()->MergeFrom(offer.agent_id());

            // Takes resources first from the specified role, then from '*'.
            Option<Resources> resources = [&]() {
              if (frameworkInfo.role() == "*") {
                return offered.find(Resources(_task.resources()));
              } else {
                Resource::ReservationInfo reservation;
                reservation.set_type(Resource::ReservationInfo::STATIC);
                reservation.set_role(frameworkInfo.role());

                return offered.find(
                    Resources(_task.resources()).pushReservation(reservation));
              }
            }();

            CHECK_SOME(resources);
            _task.mutable_resources()->CopyFrom(resources.get());

            _taskGroup.add_tasks()->CopyFrom(_task);
          }
        }

        Call call;
        call.set_type(Call::ACCEPT);

        CHECK(frameworkInfo.has_id());
        call.mutable_framework_id()->CopyFrom(frameworkInfo.id());

        Call::Accept* accept = call.mutable_accept();
        accept->add_offer_ids()->CopyFrom(offer.id());

        Offer::Operation* operation = accept->add_operations();

        if (task.isSome()) {
          operation->set_type(Offer::Operation::LAUNCH);
          operation->mutable_launch()->add_task_infos()->CopyFrom(_task);
        } else {
          // Task groups require an executor; use the built-in default
          // executor with a small fixed resource allotment.
          operation->set_type(Offer::Operation::LAUNCH_GROUP);

          ExecutorInfo* executorInfo =
            operation->mutable_launch_group()->mutable_executor();

          executorInfo->set_type(ExecutorInfo::DEFAULT);
          executorInfo->mutable_executor_id()->set_value(
              "default-executor");
          executorInfo->mutable_framework_id()->CopyFrom(frameworkInfo.id());
          executorInfo->mutable_resources()->CopyFrom(
              Resources::parse("cpus:0.1;mem:32;disk:32").get());

          // Setup any CNI networks that the `task_group` needs to
          // join, in case the `--networks` flag was specified.
          if (networks.isSome() && !networks->empty()) {
            ContainerInfo* containerInfo = executorInfo->mutable_container();
            containerInfo->set_type(ContainerInfo::MESOS);

            foreach (const string& network,
                     strings::tokenize(networks.get(), ",")) {
              containerInfo->add_network_infos()->set_name(network);
            }
          }

          operation->mutable_launch_group()->mutable_task_group()->CopyFrom(
              _taskGroup);
        }

        mesos->send(call);

        if (task.isSome()) {
          cout << "Submitted task '" << task->name() << "' to agent '"
               << offer.agent_id() << "'" << endl;
        } else {
          vector<TaskID> taskIds;
          foreach (const TaskInfo& _task, taskGroup->tasks()) {
            taskIds.push_back(_task.task_id());
          }

          cout << "Submitted task group with tasks "<< taskIds
               << " to agent '" << offer.agent_id() << "'" << endl;
        }

        launched = true;
      } else {
        Call call;
        call.set_type(Call::DECLINE);

        CHECK(frameworkInfo.has_id());
        call.mutable_framework_id()->CopyFrom(frameworkInfo.id());

        Call::Decline* decline = call.mutable_decline();
        decline->add_offer_ids()->CopyFrom(offer.id());

        mesos->send(call);

        // NOTE(review): SUPPRESS is sent in this branch even when nothing
        // has been launched yet (i.e., the offer was simply too small),
        // which stops the master from sending further offers until a
        // REVIVE — confirm this is intended for the pre-launch case.
        call.Clear();
        call.set_type(Call::SUPPRESS);
        call.mutable_framework_id()->CopyFrom(frameworkInfo.id());

        mesos->send(call);
      }
    }
  }

  // Event-loop callback: drains the queue and dispatches each event to
  // the appropriate handler.
  void received(queue<Event> events)
  {
    while (!events.empty()) {
      Event event = events.front();
      events.pop();

      switch (event.type()) {
        case Event::SUBSCRIBED: {
          // Record the master-assigned framework ID; required by all
          // subsequent calls.
          frameworkInfo.mutable_id()->
            CopyFrom(event.subscribed().framework_id());

          state = SUBSCRIBED;

          cout << "Subscribed with ID " << frameworkInfo.id() << endl;
          break;
        }

        case Event::OFFERS: {
          offers(google::protobuf::convert(event.offers().offers()));
          break;
        }

        case Event::UPDATE: {
          update(event.update().status());
          break;
        }

        case Event::ERROR: {
          EXIT(EXIT_FAILURE)
            << "Received an ERROR event: " << event.error().message();

          break;
        }

        // Intentionally ignored event types.
        case Event::HEARTBEAT:
        case Event::INVERSE_OFFERS:
        case Event::FAILURE:
        case Event::RESCIND:
        case Event::RESCIND_INVERSE_OFFER:
        case Event::UPDATE_OPERATION_STATUS:
        case Event::MESSAGE: {
          break;
        }

        case Event::UNKNOWN: {
          LOG(WARNING) << "Received an UNKNOWN event and ignored";
          break;
        }
      }
    }
  }

  // Handles a status update: prints diagnostics, acknowledges the update
  // (when it carries a UUID, i.e. requires acknowledgement), optionally
  // schedules a delayed kill, and terminates this actor once all tasks
  // are terminal.
  void update(const TaskStatus& status)
  {
    CHECK_EQ(SUBSCRIBED, state);

    cout << "Received status update " << status.state()
         << " for task '" << status.task_id() << "'" << endl;

    if (status.has_message()) {
      cout << "  message: '" << status.message() << "'" << endl;
    }

    if (status.has_source()) {
      cout << "  source: " << TaskStatus::Source_Name(status.source()) << endl;
    }

    if (status.has_reason()) {
      cout << "  reason: " << TaskStatus::Reason_Name(status.reason()) << endl;
    }

    if (status.has_healthy()) {
      cout << "  healthy?: " << status.healthy() << endl;
    }

    if (status.has_check_status()) {
      cout << "  check status: " << status.check_status() << endl;
    }

    if (status.has_limitation() && !status.limitation().resources().empty()) {
      cout << "  resource limit violation: "
           << status.limitation().resources() << endl;
    }

    if (status.has_uuid()) {
      Call call;
      call.set_type(Call::ACKNOWLEDGE);

      CHECK(frameworkInfo.has_id());
      call.mutable_framework_id()->CopyFrom(frameworkInfo.id());

      Call::Acknowledge* acknowledge = call.mutable_acknowledge();
      acknowledge->mutable_agent_id()->CopyFrom(status.agent_id());
      acknowledge->mutable_task_id()->CopyFrom(status.task_id());
      acknowledge->set_uuid(status.uuid());

      mesos->send(call);
    }

    // If a task kill delay has been specified, schedule task kill.
    if (killAfter.isSome() && TaskState::TASK_RUNNING == status.state()) {
      delay(killAfter.get(),
            self(),
            &Self::killTask,
            status.task_id(),
            status.agent_id());
    }

    if (mesos::internal::protobuf::isTerminalState(devolve(status).state())) {
      CHECK_NE(task.isSome(), taskGroup.isSome())
        << "Either task or task group should be set but not both";

      if (task.isSome()) {
        terminate(self());
      } else {
        // A task group is done only when every task in it is terminal.
        terminatedTaskCount++;

        if (terminatedTaskCount == taskGroup->tasks().size()) {
          terminate(self());
        }
      }
    }
  }

private:
  enum State
  {
    DISCONNECTED,
    CONNECTED,
    SUBSCRIBED
  } state;

  FrameworkInfo frameworkInfo;
  const string master;
  mesos::ContentType contentType;
  const Option<Duration> killAfter;
  const Option<Credential> credential;
  const Option<TaskInfo> task;       // Exactly one of `task` /
  const Option<TaskGroupInfo> taskGroup;  // `taskGroup` is set.
  const Option<string> networks;
  bool launched;             // Guards against launching more than once.
  int terminatedTaskCount;   // Terminal updates seen (task group mode).
  Owned<Mesos> mesos;        // Scheduler library handle; set in initialize().
};
// TODO(jojy): Consider breaking down the method for each 'containerizer'.
// TODO(jojy): Consider breaking down the method for each 'containerizer'.
//
// Builds the `ContainerInfo` for the task from the image, network,
// capability, rlimit and TTY options. Returns `None()` when no container
// configuration is required at all, and `Error` for invalid combinations
// (e.g., docker containerizer without a docker image).
static Result<ContainerInfo> getContainerInfo(
    const string& containerizer,
    const Option<vector<Volume>>& volumes,
    const Option<string>& networks,
    const Option<string>& appcImage,
    const Option<string>& dockerImage,
    const Option<CapabilityInfo>& effective_capabilities,
    const Option<CapabilityInfo>& bounding_capabilities,
    const Option<RLimitInfo>& rlimits,
    const bool tty)
{
  // No containerizer selected: the task runs as a plain command.
  if (containerizer.empty()) {
    return None();
  }

  ContainerInfo info;

  // Volumes are common to both containerizers.
  if (volumes.isSome()) {
    foreach (const Volume& volume, volumes.get()) {
      *info.add_volumes() = volume;
    }
  }

  // Mesos containerizer supports 'appc' and 'docker' images.
  if (containerizer == "mesos") {
    const bool hasNetworks = networks.isSome() && !networks->empty();
    const bool hasVolumes = volumes.isSome() && !volumes->empty();

    // If no container-specific option was requested, skip the
    // `ContainerInfo` entirely.
    if (!tty &&
        appcImage.isNone() &&
        dockerImage.isNone() &&
        effective_capabilities.isNone() &&
        bounding_capabilities.isNone() &&
        rlimits.isNone() &&
        !hasNetworks &&
        !hasVolumes) {
      return None();
    }

    info.set_type(ContainerInfo::MESOS);

    if (dockerImage.isSome()) {
      Image* image = info.mutable_mesos()->mutable_image();
      image->set_type(Image::DOCKER);
      image->mutable_docker()->set_name(dockerImage.get());
    } else if (appcImage.isSome()) {
      Image* image = info.mutable_mesos()->mutable_image();
      image->set_type(Image::APPC);

      Image::Appc* appc = image->mutable_appc();
      appc->set_name(appcImage.get());

      // TODO(jojy): Labels are hard coded right now. Consider
      // adding label flags for customization.
      Label* os = appc->mutable_labels()->add_labels();
      os->set_key("os");
      os->set_value("linux");

      Label* arch = appc->mutable_labels()->add_labels();
      arch->set_key("arch");
      arch->set_value("amd64");
    }

    if (hasNetworks) {
      foreach (const string& name, strings::tokenize(networks.get(), ",")) {
        info.add_network_infos()->set_name(name);
      }
    }

    if (effective_capabilities.isSome()) {
      *info.mutable_linux_info()->mutable_effective_capabilities() =
        effective_capabilities.get();
    }

    if (bounding_capabilities.isSome()) {
      *info.mutable_linux_info()->mutable_bounding_capabilities() =
        bounding_capabilities.get();
    }

    if (rlimits.isSome()) {
      *info.mutable_rlimit_info() = rlimits.get();
    }

    if (tty) {
      // Allocating the (empty) submessage is what requests a TTY.
      info.mutable_tty_info();
    }

    return info;
  }

  if (containerizer == "docker") {
    // 'docker' containerizer only supports 'docker' images.
    if (dockerImage.isNone()) {
      return Error("'Docker' containerizer requires docker image name");
    }

    info.set_type(ContainerInfo::DOCKER);
    info.mutable_docker()->set_image(dockerImage.get());

    if (networks.isSome() && !networks->empty()) {
      const vector<string> names = strings::tokenize(networks.get(), ",");
      if (names.size() > 1) {
        EXIT(EXIT_FAILURE)
          << "'Docker' containerizer can only support a single network";
      } else {
        info.mutable_docker()->set_network(
            ContainerInfo::DockerInfo::USER);
        info.add_network_infos()->set_name(names.front());
      }
    }

    if (tty) {
      return Error("'Docker' containerizer does not allow attaching a TTY");
    }

    return info;
  }

  return Error("Unsupported containerizer: " + containerizer);
}
// Entry point: parses and validates flags, optionally stages a package
// into HDFS, constructs the FrameworkInfo/TaskInfo, then spawns a
// CommandScheduler actor and blocks until it terminates.
int main(int argc, char** argv)
{
  Flags flags;
  mesos::ContentType contentType = mesos::ContentType::PROTOBUF;

  // Load flags from command line only.
  Try<flags::Warnings> load = flags.load(None(), argc, argv);

  // TODO(marco): this should be encapsulated entirely into the
  // FlagsBase API - possibly with a 'guard' that prevents FlagsBase
  // from calling ::exit(EXIT_FAILURE) after calling usage() (which
  // would be the default behavior); see MESOS-2766.
  if (flags.help) {
    cout << flags.usage() << endl;
    return EXIT_SUCCESS;
  }

  if (load.isError()) {
    cerr << flags.usage(load.error()) << endl;
    return EXIT_FAILURE;
  }

  mesos::internal::logging::initialize(argv[0], false);

  // Log any flag warnings.
  foreach (const flags::Warning& warning, load->warnings) {
    LOG(WARNING) << warning.message;
  }

  // Enforce mutual exclusion between the three ways of specifying work:
  // '--task', '--task_group', and the ad-hoc '--name'/'--command' flags.
  if (flags.task.isSome() && flags.task_group.isSome()) {
    EXIT(EXIT_FAILURE) << flags.usage(
        "Either task or task group should be set but not both."
        " Provide either '--task' OR '--task_group'");
  } else if (flags.task.isNone() && flags.task_group.isNone()) {
    if (flags.name.isNone()) {
      EXIT(EXIT_FAILURE) << flags.usage("Missing required option --name");
    }

    if (flags.shell && flags.command.isNone()) {
      EXIT(EXIT_FAILURE) << flags.usage("Missing required option --command");
    }
  } else {
    // Either --task or --task_group is set.
    if (flags.name.isSome() ||
        flags.command.isSome() ||
        flags.environment.isSome() ||
        flags.appc_image.isSome() ||
        flags.docker_image.isSome() ||
        flags.volumes.isSome()) {
      EXIT(EXIT_FAILURE) << flags.usage(
          "'--name, --command, --env, --appc_image, --docker_image,"
          " --volumes' can only be set when both '--task'"
          " and '--task_group' are not set");
    }

    if (flags.task.isSome() && flags.networks.isSome()) {
      EXIT(EXIT_FAILURE) << flags.usage(
          "'--networks' can only be set when"
          " '--task' is not set");
    }
  }

  // Translate '--content_type' into the scheduler library's enum.
  if (flags.content_type == "json" ||
      flags.content_type == mesos::APPLICATION_JSON) {
    contentType = mesos::ContentType::JSON;
  } else if (flags.content_type == "protobuf" ||
             flags.content_type == mesos::APPLICATION_PROTOBUF) {
    contentType = mesos::ContentType::PROTOBUF;
  } else {
    EXIT(EXIT_FAILURE) << "Invalid content type '" << flags.content_type << "'";
  }

  // The framework runs as the current user; bail out if we can't
  // determine one.
  Result<string> user = os::user();
  if (!user.isSome()) {
    if (user.isError()) {
      EXIT(EXIT_FAILURE) << "Failed to get username: " << user.error();
    } else {
#ifndef __WINDOWS__
      EXIT(EXIT_FAILURE) << "No username for uid " << ::getuid();
#else
      // NOTE: The `::getuid()` function does not exist on Windows.
      EXIT(EXIT_FAILURE) << "No username for current user";
#endif // __WINDOWS__
    }
  }

  Option<hashmap<string, string>> environment = flags.environment;

  // Copy the package to HDFS, if requested. Save its location
  // as a URI for passing to the command (in CommandInfo).
  Option<string> uri = None();
  if (flags.package.isSome()) {
    Try<Owned<HDFS>> hdfs = HDFS::create(flags.hadoop);
    if (hdfs.isError()) {
      EXIT(EXIT_FAILURE) << "Failed to create HDFS client: " << hdfs.error();
    }

    // TODO(benh): If HDFS is not properly configured with
    // 'fs.default.name' then we'll copy to the local
    // filesystem. Currently this will silently fail on our end (i.e.,
    // the 'copyFromLocal' will be successful) but we'll fail to
    // download the URI when we launch the executor (unless it's
    // already been uploaded before ...).

    // Store the file at '/<user>/<package>'.
    string path = path::join("/", user.get(), flags.package.get());

    // Check if the file exists and remove it if we're overwriting.
    Future<bool> exists = hdfs.get()->exists(path);
    exists.await();

    if (!exists.isReady()) {
      EXIT(EXIT_FAILURE)
        << "Failed to check if file exists: "
        << (exists.isFailed() ? exists.failure() : "discarded");
    } else if (exists.get() && flags.overwrite) {
      Future<Nothing> rm = hdfs.get()->rm(path);
      rm.await();

      if (!rm.isReady()) {
        EXIT(EXIT_FAILURE)
          << "Failed to remove existing file: "
          << (rm.isFailed() ? rm.failure() : "discarded");
      }
    } else if (exists.get()) {
      EXIT(EXIT_FAILURE) << "File already exists (see --overwrite)";
    }

    Future<Nothing> copy = hdfs.get()->copyFromLocal(flags.package.get(), path);
    copy.await();

    if (!copy.isReady()) {
      EXIT(EXIT_FAILURE)
        << "Failed to copy package: "
        << (copy.isFailed() ? copy.failure() : "discarded");
    }

    // Now save the URI.
    uri = "hdfs://" + flags.hdfs + path;
  }

  Option<string> appcImage = flags.appc_image;
  Option<string> dockerImage = flags.docker_image;

  if (appcImage.isSome() && dockerImage.isSome()) {
    EXIT(EXIT_FAILURE)
      << "Flags '--docker-image' and '--appc-image' are both set";
  }

  // Always enable the following capabilities.
  vector<FrameworkInfo::Capability::Type> frameworkCapabilities = {
    FrameworkInfo::Capability::RESERVATION_REFINEMENT,
    FrameworkInfo::Capability::TASK_KILLING_STATE,
    FrameworkInfo::Capability::REVOCABLE_RESOURCES,
  };

  // Enable PARTITION_AWARE unless disabled by the user.
  if (flags.partition_aware) {
    frameworkCapabilities.push_back(
        FrameworkInfo::Capability::PARTITION_AWARE);
  }

  // The only additional user-selectable capability is GPU_RESOURCES;
  // everything else is rejected.
  if (flags.framework_capabilities.isSome()) {
    foreach (const string& capability, flags.framework_capabilities.get()) {
      FrameworkInfo::Capability::Type type;

      if (!FrameworkInfo::Capability::Type_Parse(capability, &type)) {
        EXIT(EXIT_FAILURE)
          << "Flags '--framework_capabilities' specifies an unknown"
          << " capability '" << capability << "'";
      }

      if (type != FrameworkInfo::Capability::GPU_RESOURCES) {
        EXIT(EXIT_FAILURE)
          << "Flags '--framework_capabilities' specifies an unsupported"
          << " capability '" << capability << "'";
      }

      frameworkCapabilities.push_back(type);
    }
  }

  // Parse '--volumes' (JSON) into protobuf `Volume`s.
  Option<vector<Volume>> volumes = None();
  if (flags.volumes.isSome()) {
    Try<RepeatedPtrField<Volume>> parse =
      ::protobuf::parse<RepeatedPtrField<Volume>>(flags.volumes.get());

    if (parse.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to convert '--volumes' to protobuf: " << parse.error();
    }

    vector<Volume> _volumes;
    foreach (const Volume& volume, parse.get()) {
      _volumes.push_back(volume);
    }

    volumes = _volumes;
  }

  FrameworkInfo frameworkInfo;
  frameworkInfo.set_user(user.get());
  frameworkInfo.set_name("mesos-execute instance");
  frameworkInfo.set_role(flags.role);
  frameworkInfo.set_checkpoint(flags.checkpoint);
  foreach (const FrameworkInfo::Capability::Type& capability,
           frameworkCapabilities) {
    frameworkInfo.add_capabilities()->set_type(capability);
  }

  // Use authentication credentials only when both '--principal' and
  // '--secret' are supplied; '--principal' alone just tags the framework.
  Option<Credential> credential = None();
  if (flags.principal.isSome()) {
    frameworkInfo.set_principal(flags.principal.get());

    if (flags.secret.isSome()) {
      Credential credential_;
      credential_.set_principal(flags.principal.get());
      credential_.set_secret(flags.secret.get());
      credential = credential_;
    }
  }

  // Build a `TaskInfo` from the ad-hoc '--name'/'--command' flags when
  // neither '--task' nor '--task_group' was given.
  Option<TaskInfo> taskInfo = flags.task;

  if (flags.task.isNone() && flags.task_group.isNone()) {
    TaskInfo task;
    task.set_name(flags.name.get());
    task.mutable_task_id()->set_value(flags.name.get());

    // NOTE: No `static` here — the parse depends on the runtime
    // '--resources' flag, so there is nothing to cache.
    Try<Resources> resources = Resources::parse(flags.resources);
    if (resources.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to parse resources '" << flags.resources << "': "
        << resources.error();
    }

    task.mutable_resources()->CopyFrom(resources.get());

    CommandInfo* commandInfo = task.mutable_command();

    if (flags.shell) {
      CHECK_SOME(flags.command);

      commandInfo->set_shell(true);
      commandInfo->set_value(flags.command.get());
    } else {
      // TODO(gilbert): Treat 'command' as executable value and arguments.
      commandInfo->set_shell(false);
    }

    if (flags.environment.isSome()) {
      Environment* environment_ = commandInfo->mutable_environment();
      foreachpair (
          const string& name, const string& value, environment.get()) {
        Environment::Variable* environmentVariable =
          environment_->add_variables();

        environmentVariable->set_name(name);
        environmentVariable->set_value(value);
      }
    }

    // Have the fetcher pull the staged package into the sandbox.
    if (uri.isSome()) {
      task.mutable_command()->add_uris()->set_value(uri.get());
    }

    Result<ContainerInfo> containerInfo =
      getContainerInfo(
          flags.containerizer,
          volumes,
          flags.networks,
          appcImage,
          dockerImage,
          flags.effective_capabilities,
          flags.bounding_capabilities,
          flags.rlimits,
          flags.tty);

    if (containerInfo.isError()) {
      EXIT(EXIT_FAILURE) << containerInfo.error();
    }

    if (containerInfo.isSome()) {
      task.mutable_container()->CopyFrom(containerInfo.get());
    }

    taskInfo = task;
  }

  // Spawn the scheduler actor and block until it terminates (i.e., until
  // the task(s) reach a terminal state or an ERROR event arrives).
  Owned<CommandScheduler> scheduler(
      new CommandScheduler(
          frameworkInfo,
          flags.master,
          contentType,
          flags.kill_after,
          credential,
          taskInfo,
          flags.task_group,
          flags.networks));

  process::spawn(scheduler.get());
  process::wait(scheduler.get());

  return EXIT_SUCCESS;
}