Added offer constraints to `MesosSchedulerDriver::updateFramework()`.
This patch adds an ability to set V0 framework's offer constraints
via the C++ V0 scheduler driver.
Review: https://reviews.apache.org/r/72874
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 61cc846..1401c34 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -23,6 +23,7 @@
#include <vector>
#include <mesos/mesos.hpp>
+#include <mesos/scheduler/scheduler.pb.h>
// Mesos scheduler interface and scheduler driver. A scheduler is used
// to interact with Mesos in order to run distributed computations.
@@ -344,14 +345,16 @@
virtual Status reconcileTasks(
const std::vector<TaskStatus>& statuses) = 0;
- // Inform Mesos master about changes to the `FrameworkInfo` and
- // the set of suppressed roles. The driver will store the new
- // `FrameworkInfo` and the new set of suppressed roles, and all
- // subsequent re-registrations will use them.
+ // Requests Mesos master to change the `FrameworkInfo`, the set of suppressed
+ // roles and the offer constraints. The driver will store the new
+ // `FrameworkInfo`, the new set of suppressed roles and the new offer
+ // constraints, and all subsequent re-registrations will use them.
//
// NOTE: If the supplied info is invalid or fails authorization,
- // the `error()` callback will be invoked asynchronously (after
- // the master replies with a `FrameworkErrorMessage`).
+ // or the supplied offer constraints are not valid, the `error()` callback
+ // will be invoked asynchronously (after the master replies with a
+ // `FrameworkErrorMessage`). Note that validity of non-empty (i.e.
+ // not default-constructed) offer constraints may depend on master flags.
//
// NOTE: This must be called after initial registration with the
// master completes and the `FrameworkID` is assigned. The assigned
@@ -362,7 +365,8 @@
// during driver initialization.
virtual Status updateFramework(
const FrameworkInfo& frameworkInfo,
- const std::vector<std::string>& suppressedRoles) = 0;
+ const std::vector<std::string>& suppressedRoles,
+ ::mesos::scheduler::OfferConstraints&& offerConstraints) = 0;
};
@@ -524,7 +528,9 @@
Status updateFramework(
const FrameworkInfo& frameworkInfo,
- const std::vector<std::string>& suppressedRoles) override;
+ const std::vector<std::string>& suppressedRoles,
+ ::mesos::scheduler::OfferConstraints&& offerConstraints)
+ override;
protected:
// Used to detect (i.e., choose) the master.
diff --git a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
index 1817bba..4d71765 100644
--- a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
+++ b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
@@ -1086,7 +1086,8 @@
MesosSchedulerDriver* driver =
(MesosSchedulerDriver*) env->GetLongField(thiz, __driver);
- Status status = driver->updateFramework(frameworkInfo, suppressedRoles);
+ Status status = driver->updateFramework(
+ frameworkInfo, suppressedRoles, ::mesos::scheduler::OfferConstraints());
return convert<Status>(env, status);
}
diff --git a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
index 256632a..17260cd 100644
--- a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
+++ b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
@@ -859,7 +859,9 @@
return nullptr;
}
- Status status = self->driver->updateFramework(framework, *suppressedRoles);
+ Status status = self->driver->updateFramework(
+ framework, *suppressedRoles, ::mesos::scheduler::OfferConstraints{});
+
return PyInt_FromLong(status);
}
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 768ce7d..119a062 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -105,6 +105,8 @@
using google::protobuf::RepeatedPtrField;
+using mesos::scheduler::OfferConstraints;
+
using mesos::master::detector::MasterDetector;
using process::Clock;
@@ -839,6 +841,7 @@
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(framework);
+ *subscribe->mutable_offer_constraints() = offerConstraints;
*subscribe->mutable_suppressed_roles() = RepeatedPtrField<string>(
suppressedRoles.begin(), suppressedRoles.end());
@@ -1645,7 +1648,8 @@
void updateFramework(
const FrameworkInfo& framework_,
- set<string>&& suppressedRoles_)
+ set<string>&& suppressedRoles_,
+ OfferConstraints&& offerConstraints_)
{
if (!framework.has_id() || framework.id().value().empty()) {
error("MesosSchedulerDriver::updateFramework() must not be called"
@@ -1664,6 +1668,7 @@
framework = framework_;
suppressedRoles = std::move(suppressedRoles_);
+ offerConstraints = std::move(offerConstraints_);
if (connected) {
sendUpdateFramework();
@@ -1725,8 +1730,10 @@
*call.mutable_framework_id() = framework.id();
call.set_type(Call::UPDATE_FRAMEWORK);
- *call.mutable_update_framework()->mutable_framework_info() = framework;
- *call.mutable_update_framework()->mutable_suppressed_roles() =
+ Call::UpdateFramework* updateFramework = call.mutable_update_framework();
+ *updateFramework->mutable_framework_info() = framework;
+ *updateFramework->mutable_offer_constraints() = offerConstraints;
+ *updateFramework->mutable_suppressed_roles() =
RepeatedPtrField<string>(suppressedRoles.begin(), suppressedRoles.end());
VLOG(1) << "Sending UPDATE_FRAMEWORK message";
@@ -1740,6 +1747,7 @@
Scheduler* scheduler;
FrameworkInfo framework;
set<string> suppressedRoles;
+ OfferConstraints offerConstraints;
std::recursive_mutex* mutex;
Latch* latch;
@@ -2476,7 +2484,8 @@
Status MesosSchedulerDriver::updateFramework(
const FrameworkInfo& update,
- const vector<string>& suppressedRoles_)
+ const vector<string>& suppressedRoles_,
+ OfferConstraints&& offerConstraints_)
{
synchronized (mutex) {
if (status != DRIVER_RUNNING) {
@@ -2497,8 +2506,12 @@
<< " " << suppressedRoles_.size() - suppressedRoles.size()
<< " duplicates " << suppressedRoles_;
- dispatch(process, &SchedulerProcess::updateFramework, framework,
- std::move(suppressedRoles));
+ dispatch(
+ process,
+ &SchedulerProcess::updateFramework,
+ framework,
+ std::move(suppressedRoles),
+ std::move(offerConstraints_));
return status;
}
diff --git a/src/tests/master/update_framework_tests.cpp b/src/tests/master/update_framework_tests.cpp
index d6c45f6..3f86573 100644
--- a/src/tests/master/update_framework_tests.cpp
+++ b/src/tests/master/update_framework_tests.cpp
@@ -1003,7 +1003,7 @@
driver.start();
- driver.updateFramework(DEFAULT_FRAMEWORK_INFO, {});
+ driver.updateFramework(DEFAULT_FRAMEWORK_INFO, {}, {});
AWAIT_READY(error);
EXPECT_EQ(error.get(),
@@ -1042,7 +1042,7 @@
*update.mutable_id() = frameworkId.get();
*update.mutable_id()->mutable_value() += "-deadbeef";
- driver.updateFramework(update, {});
+ driver.updateFramework(update, {}, {});
AWAIT_READY(error);
EXPECT_EQ(
@@ -1084,7 +1084,7 @@
FrameworkInfo update = changeAllMutableFields(DEFAULT_FRAMEWORK_INFO);
update.set_checkpoint(!update.checkpoint());
*update.mutable_id() = frameworkId.get();
- driver.updateFramework(update, {});
+ driver.updateFramework(update, {}, {});
AWAIT_READY(error);
EXPECT_TRUE(strings::contains(
@@ -1142,7 +1142,7 @@
FrameworkInfo update = changeAllMutableFields(DEFAULT_FRAMEWORK_INFO);
*update.mutable_id() = frameworkId.get();
- driver.updateFramework(update, {});
+ driver.updateFramework(update, {}, {});
AWAIT_READY(updateFrameworkMessage);
@@ -1235,7 +1235,7 @@
update.add_roles("new_role");
*update.mutable_id() = frameworkId.get();
- driver.updateFramework(update, {});
+ driver.updateFramework(update, {}, {});
AWAIT_READY(offers);
@@ -1295,7 +1295,7 @@
update.clear_roles();
*update.mutable_id() = frameworkId.get();
- driver.updateFramework(update, {});
+ driver.updateFramework(update, {}, {});
AWAIT_READY(rescindedOfferId);
AWAIT_READY(recoverResources);
@@ -1366,7 +1366,7 @@
vector<string> suppressedRoles(
update.roles().begin(), update.roles().end());
- driver.updateFramework(update, suppressedRoles);
+ driver.updateFramework(update, suppressedRoles, {});
// Ensure that the allocator processes the update, so that this test
// does not rely on Master maintaining an ordering between scheduler API calls
@@ -1442,8 +1442,8 @@
vector<string> suppressedRoles(
update.roles().begin(), update.roles().end());
- driver.updateFramework(update, suppressedRoles);
- driver.updateFramework(update, {});
+ driver.updateFramework(update, suppressedRoles, {});
+ driver.updateFramework(update, {}, {});
// Now the previously declined agent should be re-offered.
Clock::pause();
diff --git a/src/tests/scheduler_driver_tests.cpp b/src/tests/scheduler_driver_tests.cpp
index 63d7a3b..b45c367 100644
--- a/src/tests/scheduler_driver_tests.cpp
+++ b/src/tests/scheduler_driver_tests.cpp
@@ -692,7 +692,7 @@
// In addition to setting a filter, suppress role2 (to check that
// reviveOffers() not only removes filters, but also unsuppresses roles).
*frameworkInfo.mutable_id() = frameworkId.get();
- driver.updateFramework(frameworkInfo, {"role2"});
+ driver.updateFramework(frameworkInfo, {"role2"}, {});
// Wait for updateFramework() to be dispatched to the allocator.
// Otherwise, REVIVE might be processed by the allocator before the update.