blob: 17cc96da4603205a45419342c03589036a17fa7c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#include <random>

#include <stout/foreach.hpp>
#include <stout/stringify.hpp>

#include "common/resources_utils.hpp"
using std::vector;
using google::protobuf::Descriptor;
using google::protobuf::Message;
using google::protobuf::RepeatedPtrField;
namespace mesos {
// Returns whether `resource` must be checkpointed by the agent: it must not
// belong to a resource provider, and it must be either dynamically reserved
// or a persistent volume.
bool needCheckpointing(const Resource& resource)
{
  if (Resources::hasResourceProvider(resource)) {
    return false;
  }

  return Resources::isDynamicallyReserved(resource) ||
         Resources::isPersistentVolume(resource);
}
// Applies `checkpointedResources` on top of the agent's `resources`.
//
// For every checkpointed resource we reconstruct the "base" agent resource it
// was derived from, check that it is present in the running total, and
// replace it with the checkpointed resource. An error is returned if a
// checkpointed resource does not need checkpointing, or if its base resource
// is not contained in the total.
//
// NOTE: This duplicates part of the logic in 'Resources::apply', which is not
// ideal. We cannot just build an 'Offer::Operation' and call
// 'Resources::apply', because 'RESERVE' requires its resources to be
// dynamically reserved only, and 'CREATE' requires its resources to already
// be dynamically reserved. Both requirements are violated when inferring
// dynamically reserved persistent volumes.
// TODO(mpark): Consider introducing an atomic 'RESERVE_AND_CREATE'
// operation to solve this problem.
Try<Resources> applyCheckpointedResources(
    const Resources& resources,
    const Resources& checkpointedResources)
{
  Resources total = resources;

  foreach (const Resource& checkpointed, checkpointedResources) {
    if (!needCheckpointing(checkpointed)) {
      return Error(
          "Unexpected checkpointed resources " + stringify(checkpointed));
    }

    Resource base = checkpointed;

    // Only unreserved and statically reserved resources can be specified on
    // the agent, so drop every dynamic reservation and keep just the
    // bottom-most reservation when it is a static one.
    if (Resources::isDynamicallyReserved(checkpointed)) {
      const Resource::ReservationInfo bottom = base.reservations(0);
      base.clear_reservations();
      if (bottom.type() == Resource::ReservationInfo::STATIC) {
        base.add_reservations()->CopyFrom(bottom);
      }
    }

    // Remove persistence and volume information so the base can be looked
    // up in `total`.
    if (Resources::isPersistentVolume(checkpointed)) {
      if (!base.disk().has_source()) {
        base.clear_disk();
      } else {
        base.mutable_disk()->clear_persistence();
        base.mutable_disk()->clear_volume();
      }
    }

    base.clear_shared();

    if (!total.contains(base)) {
      return Error(
          "Incompatible agent resources: " + stringify(total) +
          " does not contain " + stringify(base));
    }

    total -= base;
    total += checkpointed;
  }

  return total;
}
namespace internal {

// Returns a copy of `volume` describing the disk resource underneath a
// persistent volume. When the disk info has a `source`, only its
// `persistence` and `volume` fields are cleared; otherwise the whole disk
// info is dropped. The `shared` marker is also cleared, because only
// persistent volumes are allowed to be shared.
template <typename TResource>
static TResource stripPersistentVolumeInfo(const TResource& volume)
{
  TResource stripped = volume;

  if (stripped.disk().has_source()) {
    stripped.mutable_disk()->clear_persistence();
    stripped.mutable_disk()->clear_volume();
  } else {
    stripped.clear_disk();
  }

  stripped.clear_shared();

  return stripped;
}


// Computes the resource conversions (consumed -> converted) described by
// `operation`. LAUNCH, LAUNCH_GROUP, CREATE_DISK and DESTROY_DISK are not
// supported, and neither is UNKNOWN; each of these yields an error.
// GROW_VOLUME and SHRINK_VOLUME also return an error when the volume belongs
// to a resource provider.
//
// NOTE: Use template here so that it works for both internal and v1.
template <typename TResources,
          typename TResource,
          typename TResourceConversion,
          typename TOperation>
Try<vector<TResourceConversion>> getResourceConversions(
    const TOperation& operation)
{
  vector<TResourceConversion> conversions;

  switch (operation.type()) {
    case TOperation::UNKNOWN:
      return Error("Unknown operation");

    case TOperation::LAUNCH:
    case TOperation::LAUNCH_GROUP:
    case TOperation::CREATE_DISK:
    case TOperation::DESTROY_DISK:
      return Error("Operation not supported");

    case TOperation::RESERVE: {
      TResources reserved(operation.reserve().resources());

      // If the operation explicitly specifies a `source` we use that,
      // otherwise we assume that the operation is "pushing" a single
      // reservation. At this point, the resources in the operation
      // should have been already sanity checked, so we don't have to
      // repeat that here.
      TResources consumed;
      if (operation.reserve().source_size() > 0) {
        consumed = TResources(operation.reserve().source());
      } else {
        consumed = reserved.popReservation();
      }

      conversions.emplace_back(consumed, reserved);
      break;
    }

    case TOperation::UNRESERVE: {
      foreach (const TResource& reserved, operation.unreserve().resources()) {
        // Note that we only allow "popping" a single reservation at time.
        TResources converted = TResources(reserved).popReservation();
        conversions.emplace_back(reserved, converted);
      }
      break;
    }

    case TOperation::CREATE: {
      // The stripped resource is what the volume is carved out of, so it is
      // what gets consumed.
      // TODO(jieyu): Non-persistent volumes are not supported for
      // now. Persistent volumes can only be be created from regular
      // disk resources. Revisit this once we start to support
      // non-persistent volumes.
      foreach (const TResource& volume, operation.create().volumes()) {
        conversions.emplace_back(stripPersistentVolumeInfo(volume), volume);
      }
      break;
    }

    case TOperation::DESTROY: {
      foreach (const TResource& volume, operation.destroy().volumes()) {
        // The volume is returned to its underlying, non-shared disk resource.
        // The post-validation rejects the conversion if shared copies of the
        // volume are still present afterwards.
        conversions.emplace_back(
            volume,
            stripPersistentVolumeInfo(volume),
            [volume](const TResources& resources) -> Try<Nothing> {
              if (resources.contains(volume)) {
                return Error(
                    "Persistent volume " + stringify(volume) + " cannot be "
                    "removed due to additional shared copies");
              }

              return Nothing();
            });
      }
      break;
    }

    case TOperation::GROW_VOLUME: {
      const TResource& volume = operation.grow_volume().volume();
      const TResource& addition = operation.grow_volume().addition();

      if (TResources::hasResourceProvider(volume)) {
        return Error("Operation not supported for resource provider");
      }

      // To grow a persistent volume, we consume the original volume and the
      // additional resource and convert into a single volume with the new size.
      TResource converted = volume;
      *converted.mutable_scalar() += addition.scalar();

      conversions.emplace_back(TResources(volume) + addition, converted);
      break;
    }

    case TOperation::SHRINK_VOLUME: {
      const TResource& volume = operation.shrink_volume().volume();

      if (TResources::hasResourceProvider(volume)) {
        return Error("Operation not supported for resource provider");
      }

      // To shrink a persistent volume, we consume the original volume and
      // convert it into a volume with reduced size plus a freed, non-shared
      // disk resource without persistent volume info.
      TResource freed = stripPersistentVolumeInfo(volume);
      *freed.mutable_scalar() = operation.shrink_volume().subtract();

      TResource shrunk = volume;
      *shrunk.mutable_scalar() -= operation.shrink_volume().subtract();

      conversions.emplace_back(volume, TResources(shrunk) + freed);
      break;
    }
  }

  return conversions;
}

} // namespace internal {
// Computes the resource conversions for an unversioned offer operation by
// instantiating the shared implementation with the unversioned types.
Try<vector<ResourceConversion>> getResourceConversions(
    const Offer::Operation& operation)
{
  Try<vector<ResourceConversion>> conversions =
    internal::getResourceConversions<
        Resources,
        Resource,
        ResourceConversion,
        Offer::Operation>(operation);

  return conversions;
}
// Computes the resource conversions for a v1 offer operation by
// instantiating the shared implementation with the v1 types.
Try<vector<v1::ResourceConversion>> getResourceConversions(
    const v1::Offer::Operation& operation)
{
  Try<vector<v1::ResourceConversion>> conversions =
    internal::getResourceConversions<
        v1::Resources,
        v1::Resource,
        v1::ResourceConversion,
        v1::Offer::Operation>(operation);

  return conversions;
}
// Returns the resource provider ID of the resources an offer operation acts
// on. For operations with repeated resource fields, the first resource is
// inspected. The result is `None()` when that resource has no provider ID.
// LAUNCH, LAUNCH_GROUP and UNKNOWN operations, and operations whose repeated
// resource field is empty, yield an error.
Result<ResourceProviderID> getResourceProviderId(
    const Offer::Operation& operation)
{
  // Set for operations carrying a repeated resource field; the first element
  // of that field is inspected below.
  const RepeatedPtrField<Resource>* candidates = nullptr;
  Option<Resource> resource;

  switch (operation.type()) {
    case Offer::Operation::UNKNOWN:
      return Error("Unknown offer operation");
    case Offer::Operation::LAUNCH:
      return Error("Unexpected LAUNCH operation");
    case Offer::Operation::LAUNCH_GROUP:
      return Error("Unexpected LAUNCH_GROUP operation");
    case Offer::Operation::RESERVE:
      candidates = &operation.reserve().resources();
      break;
    case Offer::Operation::UNRESERVE:
      candidates = &operation.unreserve().resources();
      break;
    case Offer::Operation::CREATE:
      candidates = &operation.create().volumes();
      break;
    case Offer::Operation::DESTROY:
      candidates = &operation.destroy().volumes();
      break;
    case Offer::Operation::GROW_VOLUME:
      resource = operation.grow_volume().volume();
      break;
    case Offer::Operation::SHRINK_VOLUME:
      resource = operation.shrink_volume().volume();
      break;
    case Offer::Operation::CREATE_DISK:
      resource = operation.create_disk().source();
      break;
    case Offer::Operation::DESTROY_DISK:
      resource = operation.destroy_disk().source();
      break;
  }

  if (candidates != nullptr) {
    if (candidates->empty()) {
      return Error("Operation contains no resources");
    }

    resource = candidates->Get(0);
  }

  CHECK_SOME(resource);

  if (!resource->has_provider_id()) {
    return None();
  }

  return resource->provider_id();
}
// Returns the resource provider ID shared by all of `resources`, or `None()`
// if none of them has a provider ID. Returns an error if `resources` is
// empty or if the resources do not agree on their (optional) provider ID.
Result<ResourceProviderID> getResourceProviderId(const Resources& resources)
{
  if (resources.empty()) {
    return Error("Cannot determine resource provider for empty resources");
  }

  // Extracts the optional provider ID of a single resource.
  auto providerOf = [](const Resource& r) -> Option<ResourceProviderID> {
    if (r.has_provider_id()) {
      return r.provider_id();
    }
    return None();
  };

  const Option<ResourceProviderID> expected = providerOf(*resources.begin());

  foreach (const Resource& resource, resources) {
    if (providerOf(resource) != expected) {
      return Error("Resources are from multiple resource providers");
    }
  }

  return expected;
}
// Converts `resource` in place to the requested reservation `format`.
//
// PRE_RESERVATION_REFINEMENT / ENDPOINT: the input must not have the
// deprecated `role`/`reservation` fields set (CHECKed). They are derived from
// `reservations`:
//   - no reservations: `role` is set to "*";
//   - one reservation: `role` is copied from it and, if it is DYNAMIC, its
//     principal and labels are copied into `reservation`. In the
//     PRE_RESERVATION_REFINEMENT format `reservations` is then cleared,
//     while ENDPOINT keeps both representations;
//   - more than one (refined) reservation: CHECK-fails for
//     PRE_RESERVATION_REFINEMENT and is left untouched for ENDPOINT.
//
// POST_RESERVATION_REFINEMENT: `role`/`reservation` are folded into a single
// `reservations` entry, unless `reservations` is already populated, in which
// case the deprecated fields are simply cleared.
void convertResourceFormat(Resource* resource, ResourceFormat format)
{
  switch (format) {
    case PRE_RESERVATION_REFINEMENT:
    case ENDPOINT: {
      CHECK(!resource->has_role());
      CHECK(!resource->has_reservation());
      switch (resource->reservations_size()) {
        // Unreserved resource.
        case 0: {
          resource->set_role("*");
          break;
        }
        // Resource with a single reservation.
        case 1: {
          // NOTE: `source` refers into `reservations`, so it must be fully
          // consumed before `clear_reservations()` below.
          const Resource::ReservationInfo& source = resource->reservations(0);
          if (source.type() == Resource::ReservationInfo::DYNAMIC) {
            Resource::ReservationInfo* target = resource->mutable_reservation();
            if (source.has_principal()) {
              target->set_principal(source.principal());
            }
            if (source.has_labels()) {
              target->mutable_labels()->CopyFrom(source.labels());
            }
          }
          resource->set_role(source.role());
          // ENDPOINT keeps the `reservations` field alongside the
          // deprecated fields; only the pre-refinement format drops it.
          if (format == PRE_RESERVATION_REFINEMENT) {
            resource->clear_reservations();
          }
          break;
        }
        // Resource with refined reservations.
        default: {
          // Refined reservations cannot be expressed in the
          // pre-refinement format; for ENDPOINT they are left as is.
          CHECK_NE(PRE_RESERVATION_REFINEMENT, format)
            << "Invalid resource format conversion: A 'Resource' object"
               " being converted to the PRE_RESERVATION_REFINEMENT format"
               " must not have refined reservations";
        }
      }
      break;
    }
    case POST_RESERVATION_REFINEMENT: {
      if (resource->reservations_size() > 0) {
        // In this case, we're either already in
        // the "post-reservation-refinement" format,
        // or we're in the "endpoint" format.
        // We clear out the "pre-reservation-refinement" fields
        // in case the resources are in the "endpoint" format.
        resource->clear_role();
        resource->clear_reservation();
        return;
      }
      // Unreserved resources.
      if (resource->role() == "*") {
        CHECK(!resource->has_reservation());
        resource->clear_role();
        return;
      }
      // Resource with a single reservation.
      Resource::ReservationInfo* reservation = resource->add_reservations();
      // Check the `Resource.reservation` to determine whether
      // we have a static or dynamic reservation.
      if (!resource->has_reservation()) {
        reservation->set_type(Resource::ReservationInfo::STATIC);
      } else {
        // Copy first, then set the type: `CopyFrom` would overwrite any
        // type set beforehand.
        reservation->CopyFrom(resource->reservation());
        resource->clear_reservation();
        reservation->set_type(Resource::ReservationInfo::DYNAMIC);
      }
      reservation->set_role(resource->role());
      resource->clear_role();
      break;
    }
  }
}
// Converts every resource in `resources`, in place, to `format`.
void convertResourceFormat(
    RepeatedPtrField<Resource>* resources,
    ResourceFormat format)
{
  for (int i = 0; i < resources->size(); ++i) {
    convertResourceFormat(resources->Mutable(i), format);
  }
}
void convertResourceFormat(
std::vector<Resource>* resources,
ResourceFormat format)
{
foreach (Resource& resource, *resources) {
convertResourceFormat(&resource, format);
}
}
namespace internal {
// Given a protobuf descriptor `descriptor`, recursively populates the provided
// `result` where the keys are the message descriptors within `descriptor`'s
// schema (including itself), and the corresponding value is `true` if the key
// contains a `mesos::Resource`, and `false` otherwise.
static void precomputeResourcesContainment(
const Descriptor* descriptor,
hashmap<const Descriptor*, bool>* result)
{
CHECK_NOTNULL(descriptor);
CHECK_NOTNULL(result);
if (result->contains(descriptor)) {
return;
}
if (descriptor == mesos::Resource::descriptor()) {
result->insert({descriptor, true});
}
result->insert({descriptor, false});
for (int i = 0; i < descriptor->field_count(); ++i) {
// `message_type()` returns `nullptr` if the field is not a message type.
const Descriptor* messageDescriptor = descriptor->field(i)->message_type();
if (messageDescriptor == nullptr) {
continue;
}
precomputeResourcesContainment(messageDescriptor, result);
result->at(descriptor) |= result->at(messageDescriptor);
}
}
static Try<Nothing> convertResourcesImpl(
Message* message,
Try<Nothing> (*convertResource)(mesos::Resource* resource),
const hashmap<const Descriptor*, bool>& resourcesContainment)
{
CHECK_NOTNULL(message);
const Descriptor* descriptor = message->GetDescriptor();
if (descriptor == mesos::Resource::descriptor()) {
return convertResource(static_cast<mesos::Resource*>(message));
}
const google::protobuf::Reflection* reflection = message->GetReflection();
for (int i = 0; i < descriptor->field_count(); ++i) {
const google::protobuf::FieldDescriptor* field = descriptor->field(i);
const Descriptor* messageDescriptor = field->message_type();
if (messageDescriptor == nullptr ||
!resourcesContainment.at(messageDescriptor)) {
continue;
}
if (!field->is_repeated()) {
if (reflection->HasField(*message, field)) {
Try<Nothing> result = convertResourcesImpl(
reflection->MutableMessage(message, field),
convertResource,
resourcesContainment);
if (result.isError()) {
return result;
}
}
} else {
const int size = reflection->FieldSize(*message, field);
for (int j = 0; j < size; ++j) {
Try<Nothing> result = convertResourcesImpl(
reflection->MutableRepeatedMessage(message, field, j),
convertResource,
resourcesContainment);
if (result.isError()) {
return result;
}
}
}
}
return Nothing();
}
} // namespace internal {
// Converts `resource` in place to the "post-reservation-refinement" format.
void upgradeResource(Resource* resource)
{
  convertResourceFormat(resource, POST_RESERVATION_REFINEMENT);
}


// Upgrades each element of `resources` in place (see `upgradeResource`).
void upgradeResources(RepeatedPtrField<Resource>* resources)
{
  foreach (Resource& resource, *resources) {
    upgradeResource(&resource);
  }
}


// Upgrades each element of `resources` in place (see `upgradeResource`).
void upgradeResources(vector<Resource>* resources)
{
  foreach (Resource& resource, *resources) {
    upgradeResource(&resource);
  }
}
// Upgrades, in place, every `Resource` reachable from `message` to the
// "post-reservation-refinement" format.
void upgradeResources(Message* message)
{
  CHECK_NOTNULL(message);

  const Descriptor* root = message->GetDescriptor();

  hashmap<const Descriptor*, bool> containment;
  internal::precomputeResourcesContainment(root, &containment);

  // Skip the traversal when no `Resource` is reachable from this message.
  if (containment.at(root)) {
    Try<Nothing> (*upgrade)(Resource*) =
      [](Resource* resource) -> Try<Nothing> {
        upgradeResource(resource);
        return Nothing();
      };

    // `upgrade` never fails, so the traversal cannot return an error and
    // its result is intentionally ignored.
    internal::convertResourcesImpl(message, upgrade, containment);
  }
}
Option<Error> validateAndUpgradeResources(Offer::Operation* operation)
{
CHECK_NOTNULL(operation);
switch (operation->type()) {
case Offer::Operation::RESERVE: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_reserve()) {
return Error(
"A RESERVE offer operation must have"
" the Offer.Operation.reserve field set.");
}
Option<Error> error =
Resources::validate(operation->reserve().resources());
if (error.isSome()) {
return error;
}
error = Resources::validate(operation->reserve().source());
if (error.isSome()) {
return error;
}
break;
}
case Offer::Operation::UNRESERVE: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_unreserve()) {
return Error(
"An UNRESERVE offer operation must have"
" the Offer.Operation.unreserve field set.");
}
Option<Error> error =
Resources::validate(operation->unreserve().resources());
if (error.isSome()) {
return error;
}
break;
}
case Offer::Operation::CREATE: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_create()) {
return Error(
"A CREATE offer operation must have"
" the Offer.Operation.create field set.");
}
Option<Error> error =
Resources::validate(operation->create().volumes());
if (error.isSome()) {
return error;
}
break;
}
case Offer::Operation::DESTROY: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_destroy()) {
return Error(
"A DESTROY offer operation must have"
" the Offer.Operation.destroy field set.");
}
Option<Error> error =
Resources::validate(operation->destroy().volumes());
if (error.isSome()) {
return error;
}
break;
}
case Offer::Operation::GROW_VOLUME: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_grow_volume()) {
return Error(
"A GROW_VOLUME operation must have"
" the Offer.Operation.grow_volume field set");
}
Option<Error> error = Resources::validate(
operation->grow_volume().volume());
if (error.isSome()) {
return error;
}
error = Resources::validate(operation->grow_volume().addition());
if (error.isSome()) {
return error;
}
break;
}
case Offer::Operation::SHRINK_VOLUME: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_shrink_volume()) {
return Error(
"A SHRINK_VOLUME offer operation must have"
" the Offer.Operation.shrink_volume field set");
}
Option<Error> error = Resources::validate(
operation->shrink_volume().volume());
if (error.isSome()) {
return error;
}
break;
}
case Offer::Operation::LAUNCH: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_launch()) {
return Error(
"A LAUNCH offer operation must have"
" the Offer.Operation.launch field set.");
}
// Validate resources in LAUNCH.
foreach (const TaskInfo& task, operation->launch().task_infos()) {
Option<Error> error = Resources::validate(task.resources());
if (error.isSome()) {
return error;
}
if (task.has_executor()) {
Option<Error> error =
Resources::validate(task.executor().resources());
if (error.isSome()) {
return error;
}
}
}
break;
}
case Offer::Operation::LAUNCH_GROUP: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_launch_group()) {
return Error(
"A LAUNCH_GROUP offer operation must have"
" the Offer.Operation.launch_group field set.");
}
Offer::Operation::LaunchGroup* launchGroup =
operation->mutable_launch_group();
// Validate resources in LAUNCH_GROUP.
if (launchGroup->has_executor()) {
Option<Error> error =
Resources::validate(launchGroup->executor().resources());
if (error.isSome()) {
return error;
}
}
foreach (const TaskInfo& task, launchGroup->task_group().tasks()) {
Option<Error> error = Resources::validate(task.resources());
if (error.isSome()) {
return error;
}
if (task.has_executor()) {
Option<Error> error =
Resources::validate(task.executor().resources());
if (error.isSome()) {
return error;
}
}
}
break;
}
case Offer::Operation::CREATE_DISK: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_create_disk()) {
return Error(
"A CREATE_DISK offer operation must have"
" the Offer.Operation.create_disk field set.");
}
Option<Error> error =
Resources::validate(operation->create_disk().source());
if (error.isSome()) {
return error;
}
break;
}
case Offer::Operation::DESTROY_DISK: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
if (!operation->has_destroy_disk()) {
return Error(
"A DESTROY_DISK offer operation must have"
" the Offer.Operation.destroy_disk field set.");
}
Option<Error> error =
Resources::validate(operation->destroy_disk().source());
if (error.isSome()) {
return error;
}
break;
}
case Offer::Operation::UNKNOWN: {
// TODO(mpark): Once we perform a sanity check validation for
// offer operations as specified in MESOS-7760, this should no
// longer have to be handled in this function.
return Error("Unknown offer operation");
}
}
upgradeResources(operation);
return None();
}
// Converts `resource` in place to the "pre-reservation-refinement" format.
// The input must not carry the deprecated `role`/`reservation` fields
// (CHECKed). Fails, leaving `resource` untouched, if it has refined
// reservations, which the older format cannot express.
Try<Nothing> downgradeResource(Resource* resource)
{
  CHECK(!resource->has_role());
  CHECK(!resource->has_reservation());

  if (!Resources::hasRefinedReservations(*resource)) {
    convertResourceFormat(resource, PRE_RESERVATION_REFINEMENT);
    return Nothing();
  }

  return Error("Cannot downgrade resources containing refined reservations");
}
// Downgrades each element of `resources` in place, stopping at (and
// returning) the first error. Elements before the failing one remain
// downgraded.
Try<Nothing> downgradeResources(RepeatedPtrField<Resource>* resources)
{
  CHECK_NOTNULL(resources);

  for (int i = 0; i < resources->size(); ++i) {
    Try<Nothing> downgraded = downgradeResource(resources->Mutable(i));
    if (downgraded.isError()) {
      return downgraded;
    }
  }

  return Nothing();
}
// Downgrades each element of `resources` in place, stopping at (and
// returning) the first error. Elements before the failing one remain
// downgraded.
Try<Nothing> downgradeResources(vector<Resource>* resources)
{
  CHECK_NOTNULL(resources);

  for (Resource& item : *resources) {
    Try<Nothing> downgraded = downgradeResource(&item);
    if (downgraded.isError()) {
      return downgraded;
    }
  }

  return Nothing();
}
// Downgrades, in place, every `Resource` reachable from `message` to the
// "pre-reservation-refinement" format. Returns the first error reported by
// `downgradeResource`.
Try<Nothing> downgradeResources(Message* message)
{
  CHECK_NOTNULL(message);

  const Descriptor* root = message->GetDescriptor();

  hashmap<const Descriptor*, bool> containment;
  internal::precomputeResourcesContainment(root, &containment);

  // Nothing to do when no `Resource` is reachable from this message type.
  if (!containment.at(root)) {
    return Nothing();
  }

  Try<Nothing> converted =
    internal::convertResourcesImpl(message, downgradeResource, containment);

  return converted;
}
// Returns a randomly chosen subset of `resources` whose scalar quantities are
// bounded by `target`.
//
// Resources are visited in random order so that which ones are kept does not
// depend on their input order. Resources whose name has a zero quantity in
// `target` (including names absent from it) are dropped. Every other
// resource must be scalar (CHECKed). It is passed to `Resources::shrink`
// with the remaining target quantity; when that call succeeds, the (possibly
// reduced) resource is kept and its quantity is deducted from `target`.
Resources shrinkResources(const Resources& resources, ResourceQuantities target)
{
  if (target.empty()) {
    return Resources();
  }

  // TODO(mzhu): Add a `shuffle()` method in `Resources` to avoid this copy.
  google::protobuf::RepeatedPtrField<Resource> resourceVector = resources;

  // `std::random_shuffle` is deprecated in C++14 and removed in C++17, and
  // it relies on `rand()`. Use `std::shuffle` with a per-thread generator.
  static thread_local std::mt19937 generator{std::random_device{}()};
  std::shuffle(resourceVector.begin(), resourceVector.end(), generator);

  Resources result;
  foreach (Resource& resource, resourceVector) {
    Value::Scalar scalar = target.get(resource.name());

    if (scalar == Value::Scalar()) {
      // Resource that has zero quantity is dropped (shrunk to zero).
      continue;
    }

    // Target can only be explicitly specified for scalar resources.
    CHECK_EQ(Value::SCALAR, resource.type()) << " Resources: " << resources;

    if (Resources::shrink(&resource, scalar)) {
      target -= ResourceQuantities::fromScalarResource(resource);
      result += std::move(resource);
    }
  }

  return result;
}
// Returns a randomly chosen subset of `resources` bounded by the limits in
// `target`.
//
// Resources are visited in random order so that which ones are kept does not
// depend on their input order. Resources whose name has no limit in `target`
// are kept unchanged. Every other resource must be scalar (CHECKed). It is
// passed to `Resources::shrink` with the remaining limit; when that call
// succeeds, the (possibly reduced) resource is kept and its quantity is
// deducted from `target`. An empty `target` returns `resources` as is.
Resources shrinkResources(const Resources& resources, ResourceLimits target)
{
  if (target.empty()) {
    return resources;
  }

  // TODO(mzhu): Add a `shuffle()` method in `Resources` to avoid this copy.
  google::protobuf::RepeatedPtrField<Resource> resourceVector = resources;

  // `std::random_shuffle` is deprecated in C++14 and removed in C++17, and
  // it relies on `rand()`. Use `std::shuffle` with a per-thread generator.
  static thread_local std::mt19937 generator{std::random_device{}()};
  std::shuffle(resourceVector.begin(), resourceVector.end(), generator);

  Resources result;

  // `resourceVector` is a local copy, so each element can be modified and
  // moved from directly instead of being copied again.
  foreach (Resource& resource, resourceVector) {
    Option<Value::Scalar> limit = target.get(resource.name());

    if (limit.isNone()) {
      // Resource that has infinite limit is kept as is.
      result += std::move(resource);
      continue;
    }

    // Target can only be explicitly specified for scalar resources.
    CHECK_EQ(Value::SCALAR, resource.type()) << " Resources: " << resources;

    if (Resources::shrink(&resource, *limit)) {
      target -= ResourceQuantities::fromScalarResource(resource);
      result += std::move(resource);
    }
  }

  return result;
}
} // namespace mesos {