blob: 821bd0967d55f7fbcb57a4efae5fc390af53ca79 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stout/foreach.hpp>
#include <stout/stringify.hpp>
#include "common/resources_utils.hpp"
using google::protobuf::RepeatedPtrField;
namespace mesos {
bool needCheckpointing(const Resource& resource)
{
return Resources::isDynamicallyReserved(resource) ||
Resources::isPersistentVolume(resource);
}
// NOTE: We effectively duplicate the logic in 'Resources::apply'
// which is less than ideal. But we cannot simply create
// 'Offer::Operation' and invoke 'Resources::apply' here.
// 'RESERVE' operation requires that the specified resources are
// dynamically reserved only, and 'CREATE' requires that the
// specified resources are already dynamically reserved.
// These requirements are violated when we try to infer dynamically
// reserved persistent volumes.
// TODO(mpark): Consider introducing an atomic 'RESERVE_AND_CREATE'
// operation to solve this problem.
Try<Resources> applyCheckpointedResources(
const Resources& resources,
const Resources& checkpointedResources)
{
Resources totalResources = resources;
foreach (const Resource& resource, checkpointedResources) {
if (!needCheckpointing(resource)) {
return Error("Unexpected checkpointed resources " + stringify(resource));
}
Resource stripped = resource;
// Since only unreserved and statically reserved resources can be specified
// on the agent, we strip away all of the dynamic reservations here to
// deduce the agent resources on which to apply the checkpointed resources.
if (Resources::isDynamicallyReserved(resource)) {
Resource::ReservationInfo reservation = stripped.reservations(0);
stripped.clear_reservations();
if (reservation.type() == Resource::ReservationInfo::STATIC) {
stripped.add_reservations()->CopyFrom(reservation);
}
}
// Strip persistence and volume from the disk info so that we can
// check whether it is contained in the `totalResources`.
if (Resources::isPersistentVolume(resource)) {
if (stripped.disk().has_source()) {
stripped.mutable_disk()->clear_persistence();
stripped.mutable_disk()->clear_volume();
} else {
stripped.clear_disk();
}
}
stripped.clear_shared();
if (!totalResources.contains(stripped)) {
return Error(
"Incompatible agent resources: " + stringify(totalResources) +
" does not contain " + stringify(stripped));
}
totalResources -= stripped;
totalResources += resource;
}
return totalResources;
}
void convertResourceFormat(Resource* resource, ResourceFormat format)
{
switch (format) {
case PRE_RESERVATION_REFINEMENT:
case ENDPOINT: {
CHECK(!resource->has_role());
CHECK(!resource->has_reservation());
switch (resource->reservations_size()) {
// Unreserved resource.
case 0: {
resource->set_role("*");
break;
}
// Resource with a single reservation.
case 1: {
const Resource::ReservationInfo& source = resource->reservations(0);
if (source.type() == Resource::ReservationInfo::DYNAMIC) {
Resource::ReservationInfo* target = resource->mutable_reservation();
if (source.has_principal()) {
target->set_principal(source.principal());
}
if (source.has_labels()) {
target->mutable_labels()->CopyFrom(source.labels());
}
}
resource->set_role(source.role());
if (format == PRE_RESERVATION_REFINEMENT) {
resource->clear_reservations();
}
break;
}
// Resource with refined reservations.
default: {
CHECK_NE(PRE_RESERVATION_REFINEMENT, format)
<< "Invalid resource format conversion: A 'Resource' object"
" being converted to the PRE_RESERVATION_REFINEMENT format"
" must not have refined reservations";
}
}
break;
}
case POST_RESERVATION_REFINEMENT: {
if (resource->reservations_size() > 0) {
// In this case, we're either already in
// the "post-reservation-refinement" format,
// or we're in the "endpoint" format.
// We clear out the "pre-reservation-refinement" fields
// in case the resources are in the "endpoint" format.
resource->clear_role();
resource->clear_reservation();
return;
}
// Unreserved resources.
if (resource->role() == "*") {
CHECK(!resource->has_reservation());
resource->clear_role();
return;
}
// Resource with a single reservation.
Resource::ReservationInfo* reservation = resource->add_reservations();
// Check the `Resource.reservation` to determine whether
// we have a static or dynamic reservation.
if (!resource->has_reservation()) {
reservation->set_type(Resource::ReservationInfo::STATIC);
} else {
reservation->CopyFrom(resource->reservation());
resource->clear_reservation();
reservation->set_type(Resource::ReservationInfo::DYNAMIC);
}
reservation->set_role(resource->role());
resource->clear_role();
break;
}
}
}
void convertResourceFormat(
RepeatedPtrField<Resource>* resources,
ResourceFormat format)
{
foreach (Resource& resource, *resources) {
convertResourceFormat(&resource, format);
}
}
void convertResourceFormat(
std::vector<Resource>* resources,
ResourceFormat format)
{
foreach (Resource& resource, *resources) {
convertResourceFormat(&resource, format);
}
}
Option<Error> validateAndNormalizeResources(Offer::Operation* operation)
{
CHECK_NOTNULL(operation);
switch (operation->type()) {
case Offer::Operation::RESERVE: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_reserve()) {
return Error(
"A RESERVE offer operation must have"
" the Offer.Operation.reserve field set.");
}
Option<Error> error =
Resources::validate(operation->reserve().resources());
if (error.isSome()) {
return error;
}
convertResourceFormat(
operation->mutable_reserve()->mutable_resources(),
POST_RESERVATION_REFINEMENT);
return None();
}
case Offer::Operation::UNRESERVE: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_unreserve()) {
return Error(
"An UNRESERVE offer operation must have"
" the Offer.Operation.unreserve field set.");
}
Option<Error> error =
Resources::validate(operation->unreserve().resources());
if (error.isSome()) {
return error;
}
convertResourceFormat(
operation->mutable_unreserve()->mutable_resources(),
POST_RESERVATION_REFINEMENT);
return None();
}
case Offer::Operation::CREATE: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_create()) {
return Error(
"A CREATE offer operation must have"
" the Offer.Operation.create field set.");
}
Option<Error> error =
Resources::validate(operation->create().volumes());
if (error.isSome()) {
return error;
}
convertResourceFormat(
operation->mutable_create()->mutable_volumes(),
POST_RESERVATION_REFINEMENT);
return None();
}
case Offer::Operation::DESTROY: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_destroy()) {
return Error(
"A DESTROY offer operation must have"
" the Offer.Operation.destroy field set.");
}
Option<Error> error =
Resources::validate(operation->destroy().volumes());
if (error.isSome()) {
return error;
}
convertResourceFormat(
operation->mutable_destroy()->mutable_volumes(),
POST_RESERVATION_REFINEMENT);
return None();
}
case Offer::Operation::LAUNCH: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_launch()) {
return Error(
"A LAUNCH offer operation must have"
" the Offer.Operation.launch field set.");
}
// Validate resources in LAUNCH.
foreach (const TaskInfo& task, operation->launch().task_infos()) {
Option<Error> error = Resources::validate(task.resources());
if (error.isSome()) {
return error;
}
if (task.has_executor()) {
Option<Error> error =
Resources::validate(task.executor().resources());
if (error.isSome()) {
return error;
}
}
}
// Normalize resources in LAUNCH.
foreach (
TaskInfo& task, *operation->mutable_launch()->mutable_task_infos()) {
convertResourceFormat(
task.mutable_resources(),
POST_RESERVATION_REFINEMENT);
if (task.has_executor()) {
convertResourceFormat(
task.mutable_executor()->mutable_resources(),
POST_RESERVATION_REFINEMENT);
}
}
return None();
}
case Offer::Operation::LAUNCH_GROUP: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_launch_group()) {
return Error(
"A LAUNCH_GROUP offer operation must have"
" the Offer.Operation.launch_group field set.");
}
Offer::Operation::LaunchGroup* launchGroup =
operation->mutable_launch_group();
// Validate resources in LAUNCH_GROUP.
if (launchGroup->has_executor()) {
Option<Error> error =
Resources::validate(launchGroup->executor().resources());
if (error.isSome()) {
return error;
}
}
foreach (const TaskInfo& task, launchGroup->task_group().tasks()) {
Option<Error> error = Resources::validate(task.resources());
if (error.isSome()) {
return error;
}
if (task.has_executor()) {
Option<Error> error =
Resources::validate(task.executor().resources());
if (error.isSome()) {
return error;
}
}
}
// Normalize resources in LAUNCH_GROUP.
if (launchGroup->has_executor()) {
convertResourceFormat(
launchGroup->mutable_executor()->mutable_resources(),
POST_RESERVATION_REFINEMENT);
}
foreach (
TaskInfo& task, *launchGroup->mutable_task_group()->mutable_tasks()) {
convertResourceFormat(
task.mutable_resources(),
POST_RESERVATION_REFINEMENT);
if (task.has_executor()) {
convertResourceFormat(
task.mutable_executor()->mutable_resources(),
POST_RESERVATION_REFINEMENT);
}
}
return None();
}
case Offer::Operation::UNKNOWN: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
return Error("Unknown offer operation");
}
}
UNREACHABLE();
}
Try<Nothing> downgradeResources(RepeatedPtrField<Resource>* resources)
{
foreach (const Resource& resource, *resources) {
CHECK(!resource.has_role());
CHECK(!resource.has_reservation());
}
foreach (const Resource& resource, *resources) {
if (Resources::hasRefinedReservations(resource)) {
return Error(
"Invalid resources downgrade: resource " + stringify(resource) +
" with refined reservations cannot be downgraded");
}
}
convertResourceFormat(resources, PRE_RESERVATION_REFINEMENT);
return Nothing();
}
} // namespace mesos {