blob: c5e41516e5b9b08acf0641c89c3386e050783bf0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <mesos/agent/agent.hpp>
#include <mesos/master/master.hpp>
#include <mesos/v1/agent/agent.hpp>
#include <mesos/v1/master/master.hpp>
#include <process/pid.hpp>
#include <stout/check.hpp>
#include <stout/json.hpp>
#include <stout/protobuf.hpp>
#include "internal/evolve.hpp"
#include "master/constants.hpp"
using std::string;
using google::protobuf::RepeatedPtrField;
using process::UPID;
namespace mesos {
namespace internal {
// Helper for evolving a type by serializing/parsing when the types
// have not changed across versions.
template <typename T>
static T evolve(const google::protobuf::Message& message)
{
T t;
string data;
// NOTE: We need to use 'SerializePartialToString' instead of
// 'SerializeToString' because some required fields might not be set
// and we don't want an exception to get thrown.
CHECK(message.SerializePartialToString(&data))
<< "Failed to serialize " << message.GetTypeName()
<< " while evolving to " << t.GetTypeName();
// NOTE: We need to use 'ParsePartialFromString' instead of
// 'ParseFromString' because some required fields might not
// be set and we don't want an exception to get thrown.
CHECK(t.ParsePartialFromString(data))
<< "Failed to parse " << t.GetTypeName()
<< " while evolving from " << message.GetTypeName();
return t;
}
v1::AgentID evolve(const SlaveID& slaveId)
{
// NOTE: Not using 'evolve<SlaveID, v1::AgentID>(slaveId)' since
// this will be a common 'evolve' call and we wanted to speed up
// performance.
v1::AgentID id;
id.set_value(slaveId.value());
return id;
}
v1::AgentInfo evolve(const SlaveInfo& slaveInfo)
{
return evolve<v1::AgentInfo>(slaveInfo);
}
v1::ContainerInfo evolve(const ContainerInfo& containerInfo)
{
return evolve<v1::ContainerInfo>(containerInfo);
}
v1::DomainInfo evolve(const DomainInfo& domainInfo)
{
return evolve<v1::DomainInfo>(domainInfo);
}
v1::DrainInfo evolve(const DrainInfo& drainInfo)
{
return evolve<v1::DrainInfo>(drainInfo);
}
v1::ExecutorID evolve(const ExecutorID& executorId)
{
return evolve<v1::ExecutorID>(executorId);
}
v1::ExecutorInfo evolve(const ExecutorInfo& executorInfo)
{
return evolve<v1::ExecutorInfo>(executorInfo);
}
v1::FileInfo evolve(const FileInfo& fileInfo)
{
return evolve<v1::FileInfo>(fileInfo);
}
v1::FrameworkID evolve(const FrameworkID& frameworkId)
{
return evolve<v1::FrameworkID>(frameworkId);
}
v1::FrameworkInfo evolve(const FrameworkInfo& frameworkInfo)
{
return evolve<v1::FrameworkInfo>(frameworkInfo);
}
v1::InverseOffer evolve(const InverseOffer& inverseOffer)
{
return evolve<v1::InverseOffer>(inverseOffer);
}
v1::KillPolicy evolve(const KillPolicy& killPolicy)
{
return evolve<v1::KillPolicy>(killPolicy);
}
v1::MachineID evolve(const MachineID& machineId)
{
return evolve<v1::MachineID>(machineId);
}
v1::MasterInfo evolve(const MasterInfo& masterInfo)
{
return evolve<v1::MasterInfo>(masterInfo);
}
v1::Offer evolve(const Offer& offer)
{
return evolve<v1::Offer>(offer);
}
v1::OfferID evolve(const OfferID& offerId)
{
return evolve<v1::OfferID>(offerId);
}
v1::OperationStatus evolve(const OperationStatus& status)
{
v1::OperationStatus _status = evolve<v1::OperationStatus>(status);
if (status.has_slave_id()) {
*_status.mutable_agent_id() = evolve<v1::AgentID>(status.slave_id());
}
return _status;
}
v1::Resource evolve(const Resource& resource)
{
return evolve<v1::Resource>(resource);
}
v1::ResourceProviderID evolve(
const ResourceProviderID& resourceProviderId)
{
// NOTE: We do not use the common 'devolve' call for performance.
v1::ResourceProviderID id;
id.set_value(resourceProviderId.value());
return id;
}
v1::Resources evolve(const Resources& resources)
{
return evolve<v1::Resource>(
static_cast<const RepeatedPtrField<Resource>&>(resources));
}
v1::Task evolve(const Task& task)
{
return evolve<v1::Task>(task);
}
v1::TaskID evolve(const TaskID& taskId)
{
return evolve<v1::TaskID>(taskId);
}
v1::TaskInfo evolve(const TaskInfo& taskInfo)
{
return evolve<v1::TaskInfo>(taskInfo);
}
v1::TaskStatus evolve(const TaskStatus& status)
{
return evolve<v1::TaskStatus>(status);
}
v1::UUID evolve(const UUID& uuid)
{
return evolve<v1::UUID>(uuid);
}
v1::agent::Call evolve(const agent::Call& call)
{
return evolve<v1::agent::Call>(call);
}
v1::agent::ProcessIO evolve(const agent::ProcessIO& processIO)
{
return evolve<v1::agent::ProcessIO>(processIO);
}
v1::agent::Response evolve(const agent::Response& response)
{
return evolve<v1::agent::Response>(response);
}
v1::maintenance::ClusterStatus evolve(const maintenance::ClusterStatus& status)
{
return evolve<v1::maintenance::ClusterStatus>(status);
}
v1::maintenance::Schedule evolve(const maintenance::Schedule& schedule)
{
return evolve<v1::maintenance::Schedule>(schedule);
}
v1::master::Response evolve(const mesos::master::Response& response)
{
return evolve<v1::master::Response>(response);
}
v1::resource_provider::Call evolve(const resource_provider::Call& call)
{
return evolve<v1::resource_provider::Call>(call);
}
v1::resource_provider::Event evolve(const resource_provider::Event& event)
{
return evolve<v1::resource_provider::Event>(event);
}
// TODO(xujyan): Do we need this conversion when Mesos never sends out
// `scheduler::Call` thus never needs to evovle internal call to a v1 call?
// Perhaps we should remove the method so there's no need to maintain it.
v1::scheduler::Call evolve(const scheduler::Call& call)
{
v1::scheduler::Call _call = evolve<v1::scheduler::Call>(call);
// Certain conversions require special handling.
if (_call.type() == v1::scheduler::Call::SUBSCRIBE) {
// v1 Subscribe.suppressed_roles cannot be automatically converted
// because its tag is used by another field in the internal Subscribe.
*(_call.mutable_subscribe()->mutable_suppressed_roles()) =
call.subscribe().suppressed_roles();
}
return _call;
}
v1::scheduler::Event evolve(const scheduler::Event& event)
{
return evolve<v1::scheduler::Event>(event);
}
v1::scheduler::Event evolve(const ExitedExecutorMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::FAILURE);
v1::scheduler::Event::Failure* failure = event.mutable_failure();
*failure->mutable_agent_id() = evolve(message.slave_id());
*failure->mutable_executor_id() = evolve(message.executor_id());
failure->set_status(message.status());
return event;
}
v1::scheduler::Event evolve(const ExecutorToFrameworkMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::MESSAGE);
v1::scheduler::Event::Message* message_ = event.mutable_message();
*message_->mutable_agent_id() = evolve(message.slave_id());
*message_->mutable_executor_id() = evolve(message.executor_id());
message_->set_data(message.data());
return event;
}
v1::scheduler::Event evolve(const FrameworkErrorMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::ERROR);
v1::scheduler::Event::Error* error = event.mutable_error();
error->set_message(message.message());
return event;
}
v1::scheduler::Event evolve(const FrameworkRegisteredMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::SUBSCRIBED);
v1::scheduler::Event::Subscribed* subscribed = event.mutable_subscribed();
*subscribed->mutable_framework_id() = evolve(message.framework_id());
// TODO(anand): The master should pass the heartbeat interval as an argument
// to `evolve()`.
subscribed->set_heartbeat_interval_seconds(
master::DEFAULT_HEARTBEAT_INTERVAL.secs());
*subscribed->mutable_master_info() = evolve(message.master_info());
return event;
}
v1::scheduler::Event evolve(const FrameworkReregisteredMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::SUBSCRIBED);
v1::scheduler::Event::Subscribed* subscribed = event.mutable_subscribed();
*subscribed->mutable_framework_id() = evolve(message.framework_id());
// TODO(anand): The master should pass the heartbeat interval as an argument
// to `evolve()`.
subscribed->set_heartbeat_interval_seconds(
master::DEFAULT_HEARTBEAT_INTERVAL.secs());
*subscribed->mutable_master_info() = evolve(message.master_info());
return event;
}
v1::scheduler::Event evolve(const InverseOffersMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::INVERSE_OFFERS);
v1::scheduler::Event::InverseOffers* inverse_offers =
event.mutable_inverse_offers();
*inverse_offers->mutable_inverse_offers() =
evolve<v1::InverseOffer>(message.inverse_offers());
return event;
}
v1::scheduler::Event evolve(const LostSlaveMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::FAILURE);
v1::scheduler::Event::Failure* failure = event.mutable_failure();
*failure->mutable_agent_id() = evolve(message.slave_id());
return event;
}
v1::scheduler::Event evolve(const ResourceOffersMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::OFFERS);
v1::scheduler::Event::Offers* offers = event.mutable_offers();
*offers->mutable_offers() = evolve<v1::Offer>(message.offers());
return event;
}
v1::scheduler::Event evolve(const RescindInverseOfferMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::RESCIND_INVERSE_OFFER);
v1::scheduler::Event::RescindInverseOffer* rescindInverseOffer =
event.mutable_rescind_inverse_offer();
*rescindInverseOffer->mutable_inverse_offer_id() =
evolve(message.inverse_offer_id());
return event;
}
v1::scheduler::Event evolve(const RescindResourceOfferMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::RESCIND);
v1::scheduler::Event::Rescind* rescind = event.mutable_rescind();
*rescind->mutable_offer_id() = evolve(message.offer_id());
return event;
}
v1::scheduler::Event evolve(const StatusUpdateMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::UPDATE);
v1::scheduler::Event::Update* update = event.mutable_update();
*update->mutable_status() = evolve(message.update().status());
if (message.update().has_slave_id()) {
*update->mutable_status()->mutable_agent_id() =
evolve(message.update().slave_id());
}
if (message.update().has_executor_id()) {
*update->mutable_status()->mutable_executor_id() =
evolve(message.update().executor_id());
}
update->mutable_status()->set_timestamp(message.update().timestamp());
// If the update does not have a 'uuid', it does not need
// acknowledging. However, prior to 0.23.0, the update uuid
// was required and always set. In 0.24.0, we can rely on the
// update uuid check here, until then we must still check for
// this being sent from the driver (from == UPID()) or from
// the master (pid == UPID()).
// TODO(vinod): Get rid of this logic in 0.25.0 because master
// and slave correctly set task status in 0.24.0.
if (!message.update().has_uuid() || message.update().uuid() == "") {
update->mutable_status()->clear_uuid();
} else if (UPID(message.pid()) == UPID()) {
update->mutable_status()->clear_uuid();
} else {
update->mutable_status()->set_uuid(message.update().uuid());
}
return event;
}
v1::scheduler::Event evolve(const UpdateOperationStatusMessage& message)
{
v1::scheduler::Event event;
event.set_type(v1::scheduler::Event::UPDATE_OPERATION_STATUS);
*event.mutable_update_operation_status()->mutable_status() =
evolve(message.status());
return event;
}
v1::executor::Call evolve(const executor::Call& call)
{
return evolve<v1::executor::Call>(call);
}
v1::executor::Event evolve(const executor::Event& event)
{
return evolve<v1::executor::Event>(event);
}
v1::scheduler::Response evolve(const scheduler::Response& response)
{
return evolve<v1::scheduler::Response>(response);
}
v1::executor::Event evolve(const ExecutorRegisteredMessage& message)
{
v1::executor::Event event;
event.set_type(v1::executor::Event::SUBSCRIBED);
v1::executor::Event::Subscribed* subscribed = event.mutable_subscribed();
*subscribed->mutable_executor_info() = evolve(message.executor_info());
*subscribed->mutable_framework_info() = evolve(message.framework_info());
*subscribed->mutable_agent_info() = evolve(message.slave_info());
return event;
}
v1::executor::Event evolve(const FrameworkToExecutorMessage& message)
{
v1::executor::Event event;
event.set_type(v1::executor::Event::MESSAGE);
v1::executor::Event::Message* message_ = event.mutable_message();
message_->set_data(message.data());
return event;
}
v1::executor::Event evolve(const KillTaskMessage& message)
{
v1::executor::Event event;
event.set_type(v1::executor::Event::KILL);
v1::executor::Event::Kill* kill = event.mutable_kill();
*kill->mutable_task_id() = evolve(message.task_id());
if (message.has_kill_policy()) {
*kill->mutable_kill_policy() = evolve(message.kill_policy());
}
return event;
}
v1::executor::Event evolve(const RunTaskMessage& message)
{
v1::executor::Event event;
event.set_type(v1::executor::Event::LAUNCH);
v1::executor::Event::Launch* launch = event.mutable_launch();
*launch->mutable_task() = evolve(message.task());
return event;
}
v1::executor::Event evolve(const ShutdownExecutorMessage&)
{
v1::executor::Event event;
event.set_type(v1::executor::Event::SHUTDOWN);
return event;
}
v1::executor::Event evolve(
const StatusUpdateAcknowledgementMessage& message)
{
v1::executor::Event event;
event.set_type(v1::executor::Event::ACKNOWLEDGED);
v1::executor::Event::Acknowledged* acknowledged =
event.mutable_acknowledged();
*acknowledged->mutable_task_id() = evolve(message.task_id());
acknowledged->set_uuid(message.uuid());
return event;
}
v1::master::Event evolve(const mesos::master::Event& event)
{
return evolve<v1::master::Event>(event);
}
template<>
v1::master::Response evolve<v1::master::Response::GET_FLAGS>(
const JSON::Object& object)
{
v1::master::Response response;
response.set_type(v1::master::Response::GET_FLAGS);
v1::master::Response::GetFlags* getFlags = response.mutable_get_flags();
Result<JSON::Object> flags = object.at<JSON::Object>("flags");
CHECK_SOME(flags) << "Failed to find 'flags' key in the JSON object";
foreachpair (const string& key,
const JSON::Value& value,
flags->values) {
v1::Flag* flag = getFlags->add_flags();
flag->set_name(key);
CHECK(value.is<JSON::String>())
<< "Flag '" + key + "' value is not a string";
flag->set_value(value.as<JSON::String>().value);
}
return response;
}
// TODO(vinod): Consolidate master and agent flags evolution.
template<>
v1::agent::Response evolve<v1::agent::Response::GET_FLAGS>(
const JSON::Object& object)
{
v1::agent::Response response;
response.set_type(v1::agent::Response::GET_FLAGS);
v1::agent::Response::GetFlags* getFlags = response.mutable_get_flags();
Result<JSON::Object> flags = object.at<JSON::Object>("flags");
CHECK_SOME(flags) << "Failed to find 'flags' key in the JSON object";
foreachpair (const string& key,
const JSON::Value& value,
flags->values) {
v1::Flag* flag = getFlags->add_flags();
flag->set_name(key);
CHECK(value.is<JSON::String>())
<< "Flag '" + key + "' value is not a string";
flag->set_value(value.as<JSON::String>().value);
}
return response;
}
template<>
v1::master::Response evolve<v1::master::Response::GET_VERSION>(
const JSON::Object& object)
{
v1::master::Response response;
response.set_type(v1::master::Response::GET_VERSION);
*response.mutable_get_version()->mutable_version_info() =
CHECK_NOTERROR(::protobuf::parse<v1::VersionInfo>(object));
return response;
}
// TODO(vinod): Consolidate master and agent version evolution.
template<>
v1::agent::Response evolve<v1::agent::Response::GET_VERSION>(
const JSON::Object& object)
{
v1::agent::Response response;
response.set_type(v1::agent::Response::GET_VERSION);
*response.mutable_get_version()->mutable_version_info() =
CHECK_NOTERROR(::protobuf::parse<v1::VersionInfo>(object));
return response;
}
template<>
v1::agent::Response evolve<v1::agent::Response::GET_CONTAINERS>(
const JSON::Array& array)
{
v1::agent::Response response;
response.set_type(v1::agent::Response::GET_CONTAINERS);
foreach (const JSON::Value& value, array.values) {
v1::agent::Response::GetContainers::Container* container =
response.mutable_get_containers()->add_containers();
JSON::Object object = value.as<JSON::Object>();
Result<JSON::String> container_id =
object.find<JSON::String>("container_id");
CHECK_SOME(container_id);
container->mutable_container_id()->set_value(container_id->value);
Result<JSON::String> framework_id =
object.find<JSON::String>("framework_id");
CHECK(!framework_id.isError());
if (framework_id.isSome()) {
container->mutable_framework_id()->set_value(framework_id->value);
}
Result<JSON::String> executor_id = object.find<JSON::String>("executor_id");
CHECK(!executor_id.isError());
if (executor_id.isSome()) {
container->mutable_executor_id()->set_value(executor_id->value);
}
Result<JSON::String> executor_name =
object.find<JSON::String>("executor_name");
CHECK(!executor_name.isError());
if (executor_name.isSome()) {
container->set_executor_name(executor_name->value);
}
Result<JSON::Object> container_status = object.find<JSON::Object>("status");
if (container_status.isSome()) {
*container->mutable_container_status() = CHECK_NOTERROR(
::protobuf::parse<v1::ContainerStatus>(container_status.get()));
}
Result<JSON::Object> resource_statistics =
object.find<JSON::Object>("statistics");
if (resource_statistics.isSome()) {
*container->mutable_resource_statistics() = CHECK_NOTERROR(
::protobuf::parse<v1::ResourceStatistics>(resource_statistics.get()));
}
}
return response;
}
} // namespace internal {
} // namespace mesos {