Updated all GET_* v1 master calls to be served in parallel.
Using the same approach taken for the v0 read-only endpoints,
this enables parallel reads for the GET_* v1 master calls.
Review: https://reviews.apache.org/r/71978
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 16dbed3..8a58863 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -539,7 +539,7 @@
field = v1::master::Event::Subscribed::kGetStateFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
- jsonifyGetState(master, approvers));
+ Master::ReadOnlyHandler(master).jsonifyGetState(approvers));
field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber;
writer->field(
@@ -565,7 +565,7 @@
WireFormatLite::WriteBytes(
mesos::v1::master::Event::Subscribed::kGetStateFieldNumber,
- serializeGetState(approvers),
+ Master::ReadOnlyHandler(master).serializeGetState(approvers),
&writer);
WireFormatLite::WriteDouble(
@@ -1392,6 +1392,7 @@
return deferBatchedRequest(
&Master::ReadOnlyHandler::frameworks,
principal,
+ ContentType::JSON,
request.url.query,
approvers);
}));
@@ -1451,7 +1452,7 @@
Future<Response> Master::Http::getFrameworks(
const mesos::master::Call& call,
const Option<Principal>& principal,
- ContentType contentType) const
+ ContentType outputContentType) const
{
CHECK_EQ(mesos::master::Call::GET_FRAMEWORKS, call.type());
@@ -1460,182 +1461,16 @@
principal,
{VIEW_FRAMEWORK})
.then(defer(
- master->self(),
- [=](const Owned<ObjectApprovers>& approvers) -> Future<Response> {
- // Serialize the following message:
- //
- // mesos::master::Response response;
- // response.set_type(mesos::master::Response::GET_FRAMEWORKS);
- // *response.mutable_get_frameworks() = _getFrameworks(approvers);
-
- switch (contentType) {
- case ContentType::PROTOBUF: {
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- WireFormatLite::WriteEnum(
- mesos::v1::master::Response::kTypeFieldNumber,
- mesos::v1::master::Response::GET_FRAMEWORKS,
- &writer);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Response::kGetFrameworksFieldNumber,
- serializeGetFrameworks(approvers),
- &writer);
-
- // We must manually trim the unused buffer space since
- // we use the string before the coded output stream is
- // destructed.
- writer.Trim();
-
- return OK(std::move(output), stringify(contentType));
- }
-
- case ContentType::JSON: {
- string body = jsonify([&](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::descriptor();
-
- int field;
-
- field = v1::master::Response::kTypeFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- v1::master::Response::Type_Name(
- v1::master::Response::GET_FRAMEWORKS));
-
- field = v1::master::Response::kGetFrameworksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- jsonifyGetFrameworks(master, approvers));
- });
-
- // TODO(bmahler): Pass jsonp query parameter through here.
- return OK(std::move(body), stringify(contentType));
- }
-
- default:
- return NotAcceptable("Request must accept json or protobuf");
- }
- }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetFrameworks(
- const Master* master,
- const Owned<ObjectApprovers>& approvers)
-{
- // Serialize the following:
- //
- // mesos::master::Response::GetFrameworks getFrameworks;
- // for each framework:
- // *getFrameworks.add_frameworks() = model(*framework);
- // for each completed framework:
- // *getFrameworks.add_completed_frameworks() = model(*framework);
-
- // TODO(bmahler): Consider not constructing the temporary framework
- // objects and instead serialize directly, but since we don't
- // expect a large number of pending tasks, we currently don't
- // bother with the more efficient approach.
-
- // TODO(bmahler): This copies the Owned object approvers.
- return [=](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::GetFrameworks::descriptor();
-
- int field;
-
- field = v1::master::Response::GetFrameworks::kFrameworksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- [&](JSON::ArrayWriter* writer) {
- foreachvalue (const Framework* framework,
- master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- continue;
- }
-
- mesos::master::Response::GetFrameworks::Framework f = model(*framework);
- writer->element(asV1Protobuf(f));
- }
- });
-
- field =
- v1::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- [&](JSON::ArrayWriter* writer) {
- foreachvalue (const Owned<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- continue;
- }
-
- mesos::master::Response::GetFrameworks::Framework f = model(*framework);
- writer->element(asV1Protobuf(f));
- }
- });
- };
-}
-
-
-string Master::Http::serializeGetFrameworks(
- const Owned<ObjectApprovers>& approvers) const
-{
- // Serialize the following:
- //
- // mesos::master::Response::GetFrameworks getFrameworks;
- // for each framework:
- // *getFrameworks.add_frameworks() = model(*framework);
- // for each completed framework:
- // *getFrameworks.add_completed_frameworks() = model(*framework);
-
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- // TODO(bmahler): Consider not constructing the temporary framework
- // objects and instead serialize directly, but since we don't
- // expect a large number of pending tasks, we currently don't
- // bother with the more efficient approach.
-
- foreachvalue (const Framework* framework,
- master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- continue;
- }
-
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::master::Response::GetFrameworks::kFrameworksFieldNumber,
- model(*framework),
- &writer);
- }
-
- foreachvalue (const Owned<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- continue;
- }
-
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber,
- model(*framework),
- &writer);
- }
-
- // While an explicit Trim() isn't necessary (since the coded
- // output stream is destructed before the string is returned),
- // it's a quite tricky bug to diagnose if Trim() is missed, so
- // we always do it explicitly to signal the reader about this
- // subtlety.
- writer.Trim();
-
- return output;
+ master->self(),
+ [this, principal, outputContentType](
+ const Owned<ObjectApprovers>& approvers) {
+ return deferBatchedRequest(
+ &Master::ReadOnlyHandler::getFrameworks,
+ principal,
+ outputContentType,
+ {},
+ approvers);
+ }));
}
@@ -1670,7 +1505,7 @@
Future<Response> Master::Http::getExecutors(
const mesos::master::Call& call,
const Option<Principal>& principal,
- ContentType contentType) const
+ ContentType outputContentType) const
{
CHECK_EQ(mesos::master::Call::GET_EXECUTORS, call.type());
@@ -1679,243 +1514,16 @@
principal,
{VIEW_FRAMEWORK, VIEW_EXECUTOR})
.then(defer(
- master->self(),
- [=](const Owned<ObjectApprovers>& approvers) -> Response {
- // Serialize the following message:
- //
- // mesos::master::Response response;
- // response.set_type(mesos::master::Response::GET_EXECUTORS);
- // *response.mutable_get_executors() = _getExecutors(approvers);
-
- switch (contentType) {
- case ContentType::PROTOBUF: {
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- WireFormatLite::WriteEnum(
- mesos::v1::master::Response::kTypeFieldNumber,
- mesos::v1::master::Response::GET_EXECUTORS,
- &writer);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Response::kGetExecutorsFieldNumber,
- serializeGetExecutors(approvers),
- &writer);
-
- // We must manually trim the unused buffer space since
- // we use the string before the coded output stream is
- // destructed.
- writer.Trim();
-
- return OK(std::move(output), stringify(contentType));
- }
-
- case ContentType::JSON: {
- string body = jsonify([&](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::descriptor();
-
- int field;
-
- field = v1::master::Response::kTypeFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- v1::master::Response::Type_Name(
- v1::master::Response::GET_EXECUTORS));
-
- field = v1::master::Response::kGetExecutorsFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- jsonifyGetExecutors(master, approvers));
- });
-
- // TODO(bmahler): Pass jsonp query parameter through here.
- return OK(std::move(body), stringify(contentType));
- }
-
- default:
- return NotAcceptable("Request must accept json or protobuf");
- }
- }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetExecutors(
- const Master* master,
- const Owned<ObjectApprovers>& approvers)
-{
- // Serialize the following:
- //
- // mesos::master::Response::GetExecutors getExecutors;
- //
- // for each (executor, agent):
- // mesos::master::Response::GetExecutors::Executor* executor =
- // getExecutors.add_executors();
- // *executor->mutable_executor_info() = executorInfo;
- // *executor->mutable_slave_id() = slaveId;
-
- // TODO(bmahler): This copies the owned object approvers.
- return [=](JSON::ObjectWriter* writer) {
- // Construct framework list with both active and completed frameworks.
- vector<const Framework*> frameworks;
- foreachvalue (Framework* framework, master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- frameworks.push_back(framework);
- }
- }
- foreachvalue (const Owned<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- frameworks.push_back(framework.get());
- }
- }
-
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::GetExecutors::descriptor();
-
- int field;
-
- field = v1::master::Response::GetExecutors::kExecutorsFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- [&](JSON::ArrayWriter* writer) {
- foreach (const Framework* framework, frameworks) {
- foreachpair (const SlaveID& slaveId,
- const auto& executorsMap,
- framework->executors) {
- foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
- // Skip unauthorized executors.
- if (!approvers->approved<VIEW_EXECUTOR>(
- executorInfo, framework->info)) {
- continue;
- }
-
- writer->element([&](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::GetExecutors::Executor::descriptor();
-
- // Serialize the following message:
- //
- // mesos::master::Response::GetExecutors::Executor executor;
- // *executor.mutable_executor_info() = executorInfo;
- // *executor.mutable_slave_id() = slaveId;
- int field;
-
- field = v1::master::Response::GetExecutors::Executor
- ::kExecutorInfoFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- asV1Protobuf(executorInfo));
-
- field = v1::master::Response::GetExecutors::Executor
- ::kAgentIdFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- asV1Protobuf(slaveId));
- });
- }
- }
- }
- });
- };
-}
-
-
-string Master::Http::serializeGetExecutors(
- const Owned<ObjectApprovers>& approvers) const
-{
- // Construct framework list with both active and completed frameworks.
- vector<const Framework*> frameworks;
- foreachvalue (Framework* framework, master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- frameworks.push_back(framework);
- }
- }
- foreachvalue (const Owned<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- frameworks.push_back(framework.get());
- }
- }
-
- // Lambda for serializing the following message:
- //
- // mesos::master::Response::GetExecutors::Executor executor;
- // *executor.mutable_executor_info() = executorInfo;
- // *executor.mutable_slave_id() = slaveId;
- auto serializeExecutor = [](const ExecutorInfo& e, const SlaveID& s) {
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::v1::master::Response::GetExecutors::Executor
- ::kExecutorInfoFieldNumber,
- e,
- &writer);
-
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::v1::master::Response::GetExecutors::Executor
- ::kAgentIdFieldNumber,
- s,
- &writer);
-
- // While an explicit Trim() isn't necessary (since the coded
- // output stream is destructed before the string is returned),
- // it's a quite tricky bug to diagnose if Trim() is missed, so
- // we always do it explicitly to signal the reader about this
- // subtlety.
- writer.Trim();
-
- return output;
- };
-
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- // Serialize the following:
- //
- // mesos::master::Response::GetExecutors getExecutors;
- //
- // for each (executor, agent):
- // mesos::master::Response::GetExecutors::Executor* executor =
- // getExecutors.add_executors();
- // *executor->mutable_executor_info() = executorInfo;
- // *executor->mutable_slave_id() = slaveId;
-
- foreach (const Framework* framework, frameworks) {
- foreachpair (const SlaveID& slaveId,
- const auto& executorsMap,
- framework->executors) {
- foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
- // Skip unauthorized executors.
- if (!approvers->approved<VIEW_EXECUTOR>(
- executorInfo, framework->info)) {
- continue;
- }
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Response::GetExecutors::kExecutorsFieldNumber,
- serializeExecutor(executorInfo, slaveId),
- &writer);
- }
- }
- }
-
- // While an explicit Trim() isn't necessary (since the coded
- // output stream is destructed before the string is returned),
- // it's a quite tricky bug to diagnose if Trim() is missed, so
- // we always do it explicitly to signal the reader about this
- // subtlety.
- writer.Trim();
-
- return output;
+ master->self(),
+ [this, principal, outputContentType](
+ const Owned<ObjectApprovers>& approvers) {
+ return deferBatchedRequest(
+ &Master::ReadOnlyHandler::getExecutors,
+ principal,
+ outputContentType,
+ {},
+ approvers);
+ }));
}
@@ -1972,7 +1580,7 @@
Future<Response> Master::Http::getState(
const mesos::master::Call& call,
const Option<Principal>& principal,
- ContentType contentType) const
+ ContentType outputContentType) const
{
CHECK_EQ(mesos::master::Call::GET_STATE, call.type());
@@ -1981,153 +1589,16 @@
principal,
{VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR, VIEW_ROLE})
.then(defer(
- master->self(),
- [=](const Owned<ObjectApprovers>& approvers) -> Response {
- // Serialize the following message:
- //
- // mesos::master::Response response;
- // response.set_type(mesos::master::Response::GET_STATE);
- // *response.mutable_get_state() = _getState(approvers);
-
- switch (contentType) {
- case ContentType::PROTOBUF: {
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- WireFormatLite::WriteEnum(
- mesos::v1::master::Response::kTypeFieldNumber,
- mesos::v1::master::Response::GET_STATE,
- &writer);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Response::kGetStateFieldNumber,
- serializeGetState(approvers),
- &writer);
-
- // We must manually trim the unused buffer space since
- // we use the string before the coded output stream is
- // destructed.
- writer.Trim();
-
- return OK(std::move(output), stringify(contentType));
- }
-
- case ContentType::JSON: {
- string body = jsonify([&](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::descriptor();
-
- int field;
-
- field = v1::master::Response::kTypeFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- v1::master::Response::Type_Name(
- v1::master::Response::GET_STATE));
-
- field = v1::master::Response::kGetStateFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- jsonifyGetState(master, approvers));
- });
-
- // TODO(bmahler): Pass jsonp query parameter through here.
- return OK(std::move(body), stringify(contentType));
- }
-
- default:
- return NotAcceptable("Request must accept json or protobuf");
- }
- }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetState(
- const Master* master,
- const Owned<ObjectApprovers>& approvers)
-{
- // Jsonify the following message:
- //
- // mesos::master::Response::GetState getState;
- // *getState.mutable_get_tasks() = _getTasks(approvers);
- // *getState.mutable_get_executors() = _getExecutors(approvers);
- // *getState.mutable_get_frameworks() = _getFrameworks(approvers);
- // *getState.mutable_get_agents() = _getAgents(approvers);
-
- // TODO(bmahler): This copies the Owned object approvers.
- return [=](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::GetState::descriptor();
-
- int field;
-
- field = v1::master::Response::GetState::kGetTasksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- jsonifyGetTasks(master, approvers));
-
- field = v1::master::Response::GetState::kGetExecutorsFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- jsonifyGetExecutors(master, approvers));
-
- field = v1::master::Response::GetState::kGetFrameworksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- jsonifyGetFrameworks(master, approvers));
-
- field = v1::master::Response::GetState::kGetAgentsFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- jsonifyGetAgents(master, approvers));
- };
-}
-
-
-string Master::Http::serializeGetState(
- const Owned<ObjectApprovers>& approvers) const
-{
- // Serialize the following message:
- //
- // mesos::master::Response::GetState getState;
- // *getState.mutable_get_tasks() = _getTasks(approvers);
- // *getState.mutable_get_executors() = _getExecutors(approvers);
- // *getState.mutable_get_frameworks() = _getFrameworks(approvers);
- // *getState.mutable_get_agents() = _getAgents(approvers);
-
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Response::GetState::kGetTasksFieldNumber,
- serializeGetTasks(approvers),
- &writer);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Response::GetState::kGetExecutorsFieldNumber,
- serializeGetExecutors(approvers),
- &writer);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Response::GetState::kGetFrameworksFieldNumber,
- serializeGetFrameworks(approvers),
- &writer);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Response::GetState::kGetAgentsFieldNumber,
- serializeGetAgents(approvers),
- &writer);
-
- // While an explicit Trim() isn't necessary (since the coded
- // output stream is destructed before the string is returned),
- // it's a quite tricky bug to diagnose if Trim() is missed, so
- // we always do it explicitly to signal the reader about this
- // subtlety.
- writer.Trim();
-
- return output;
+ master->self(),
+ [this, principal, outputContentType](
+ const Owned<ObjectApprovers>& approvers) {
+ return deferBatchedRequest(
+ &Master::ReadOnlyHandler::getState,
+ principal,
+ outputContentType,
+ {},
+ approvers);
+ }));
}
@@ -2896,6 +2367,7 @@
return deferBatchedRequest(
&Master::ReadOnlyHandler::slaves,
principal,
+ ContentType::JSON,
request.url.query,
approvers);
}));
@@ -2905,204 +2377,22 @@
Future<Response> Master::Http::getAgents(
const mesos::master::Call& call,
const Option<Principal>& principal,
- ContentType contentType) const
+ ContentType outputContentType) const
{
CHECK_EQ(mesos::master::Call::GET_AGENTS, call.type());
return ObjectApprovers::create(master->authorizer, principal, {VIEW_ROLE})
.then(defer(
- master->self(),
- [=](const Owned<ObjectApprovers>& approvers) -> Response {
- // Serialize the following message:
- //
- // mesos::master::Response response;
- // response.set_type(mesos::master::Response::GET_AGENTS);
- // *response.mutable_get_agents() = _getAgents(approvers);
-
- switch (contentType) {
- case ContentType::PROTOBUF: {
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- WireFormatLite::WriteEnum(
- mesos::v1::master::Response::kTypeFieldNumber,
- mesos::v1::master::Response::GET_AGENTS,
- &writer);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Response::kGetAgentsFieldNumber,
- serializeGetAgents(approvers),
- &writer);
-
- // We must manually trim the unused buffer space since
- // we use the string before the coded output stream is
- // destructed.
- writer.Trim();
-
- return OK(std::move(output), stringify(contentType));
- }
-
- case ContentType::JSON: {
- string body = jsonify([&](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::descriptor();
-
- int field;
-
- field = v1::master::Response::kTypeFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- v1::master::Response::Type_Name(
- v1::master::Response::GET_AGENTS));
-
- field = v1::master::Response::kGetAgentsFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- jsonifyGetAgents(master, approvers));
- });
-
- // TODO(bmahler): Pass jsonp query parameter through here.
- return OK(std::move(body), stringify(contentType));
- }
-
- default:
- return NotAcceptable("Request must accept json or protobuf");
- }
- }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetAgents(
- const Master* master,
- const Owned<ObjectApprovers>& approvers)
-{
- // Serialize the following:
- //
- // mesos::master::Response::GetAgents getAgents;
- // for each registered agent:
- // *getAgents.add_agents() = protobuf::master::event::createAgentResponse(
- // agent,
- // master->slaves.draining.get(slave->id),
- // master->slaves.deactivated.contains(slave->id),
- // approvers);
- // for each recovered agent:
- // SlaveInfo* agent = getAgents.add_recovered_agents();
- // agent->CopyFrom(slaveInfo);
- // agent->clear_resources();
- // foreach (const Resource& resource, slaveInfo.resources()):
- // if (approvers->approved<VIEW_ROLE>(resource)):
- // *agent->add_resources() = resource;
-
- // TODO(bmahler): This copies the Owned object approvers.
- return [=](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::GetAgents::descriptor();
-
- int field;
-
- field = v1::master::Response::GetAgents::kAgentsFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- [&](JSON::ArrayWriter* writer) {
- foreachvalue (const Slave* slave, master->slaves.registered) {
- // TODO(bmahler): Consider not constructing the temporary
- // agent object and instead serialize directly.
- mesos::master::Response::GetAgents::Agent agent =
- protobuf::master::event::createAgentResponse(
- *slave,
- master->slaves.draining.get(slave->id),
- master->slaves.deactivated.contains(slave->id),
+ master->self(),
+ [this, principal, outputContentType](
+ const Owned<ObjectApprovers>& approvers) {
+ return deferBatchedRequest(
+ &Master::ReadOnlyHandler::getAgents,
+ principal,
+ outputContentType,
+ {},
approvers);
-
- writer->element(asV1Protobuf(agent));
- }
- });
-
- field = v1::master::Response::GetAgents::kRecoveredAgentsFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- [&](JSON::ArrayWriter* writer) {
- foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
- // TODO(bmahler): Consider not constructing the temporary
- // SlaveInfo object and instead serialize directly.
- SlaveInfo agent = slaveInfo;
- agent.clear_resources();
- foreach (const Resource& resource, slaveInfo.resources()) {
- if (approvers->approved<VIEW_ROLE>(resource)) {
- *agent.add_resources() = resource;
- }
- }
-
- writer->element(asV1Protobuf(agent));
- }
- });
- };
-}
-
-
-string Master::Http::serializeGetAgents(
- const Owned<ObjectApprovers>& approvers) const
-{
- // Serialize the following:
- //
- // mesos::master::Response::GetAgents getAgents;
- // for each registered agent:
- // *getAgents.add_agents() = protobuf::master::event::createAgentResponse(
- // agent,
- // master->slaves.draining.get(slave->id),
- // master->slaves.deactivated.contains(slave->id),
- // approvers);
- // for each recovered agent:
- // SlaveInfo* agent = getAgents.add_recovered_agents();
- // agent->CopyFrom(slaveInfo);
- // agent->clear_resources();
- // foreach (const Resource& resource, slaveInfo.resources()):
- // if (approvers->approved<VIEW_ROLE>(resource)):
- // *agent->add_resources() = resource;
-
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- foreachvalue (const Slave* slave, master->slaves.registered) {
- // TODO(bmahler): Consider not constructing the temporary
- // agent object and instead serialize directly.
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::master::Response::GetAgents::kAgentsFieldNumber,
- protobuf::master::event::createAgentResponse(
- *slave,
- master->slaves.draining.get(slave->id),
- master->slaves.deactivated.contains(slave->id),
- approvers),
- &writer);
- }
-
- foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
- // TODO(bmahler): Consider not constructing the temporary
- // SlaveInfo object and instead serialize directly.
- SlaveInfo agent = slaveInfo;
- agent.clear_resources();
- foreach (const Resource& resource, slaveInfo.resources()) {
- if (approvers->approved<VIEW_ROLE>(resource)) {
- *agent.add_resources() = resource;
- }
- }
-
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::master::Response::GetAgents::kRecoveredAgentsFieldNumber,
- agent,
- &writer);
- }
-
- // While an explicit Trim() isn't necessary (since the coded
- // output stream is destructed before the string is returned),
- // it's a quite tricky bug to diagnose if Trim() is missed, so
- // we always do it explicitly to signal the reader about this
- // subtlety.
- writer.Trim();
-
- return output;
+ }));
}
@@ -3392,6 +2682,7 @@
return deferBatchedRequest(
&Master::ReadOnlyHandler::state,
principal,
+ ContentType::JSON,
request.url.query,
approvers);
}));
@@ -3401,13 +2692,14 @@
Future<Response> Master::Http::deferBatchedRequest(
ReadOnlyRequestHandler handler,
const Option<Principal>& principal,
+ ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const Owned<ObjectApprovers>& approvers) const
{
bool scheduleBatch = batchedRequests.empty();
auto it = std::find_if(batchedRequests.begin(), batchedRequests.end(),
- [handler, &principal, &queryParameters](
+ [handler, &principal, &queryParameters, &outputContentType](
const BatchedRequest& batchedRequest) {
// NOTE: This is not a general-purpose request comparison, but
// specific to the batched requests which are always members of
@@ -3415,6 +2707,7 @@
// on query parameters and the current master state.
return handler == batchedRequest.handler &&
principal == batchedRequest.principal &&
+ outputContentType == batchedRequest.outputContentType &&
queryParameters == batchedRequest.queryParameters;
});
@@ -3434,6 +2727,7 @@
future = promise.future();
batchedRequests.push_back(BatchedRequest{
handler,
+ outputContentType,
queryParameters,
principal,
approvers,
@@ -3466,11 +2760,16 @@
foreach (BatchedRequest& request, batchedRequests) {
request.promise.associate(process::async(
[this](ReadOnlyRequestHandler handler,
+ ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) {
- return (readonlyHandler.*handler)(queryParameters, approvers);
+ return (readonlyHandler.*handler)(
+ outputContentType,
+ queryParameters,
+ approvers);
},
request.handler,
+ request.outputContentType,
request.queryParameters,
request.approvers));
}
@@ -3597,6 +2896,7 @@
return deferBatchedRequest(
&Master::ReadOnlyHandler::stateSummary,
principal,
+ ContentType::JSON,
request.url.query,
approvers);
}));
@@ -3650,6 +2950,7 @@
return deferBatchedRequest(
&Master::ReadOnlyHandler::roles,
principal,
+ ContentType::JSON,
request.url.query,
approvers);
}));
@@ -3711,64 +3012,22 @@
Future<Response> Master::Http::getRoles(
const mesos::master::Call& call,
const Option<Principal>& principal,
- ContentType contentType) const
+ ContentType outputContentType) const
{
CHECK_EQ(mesos::master::Call::GET_ROLES, call.type());
+
return ObjectApprovers::create(master->authorizer, principal, {VIEW_ROLE})
- .then(defer(master->self(),
- [this, contentType](const Owned<ObjectApprovers>& approvers)
- -> Response {
- const vector<string> knownRoles = master->knownRoles();
-
- mesos::master::Response response;
- response.set_type(mesos::master::Response::GET_ROLES);
-
- mesos::master::Response::GetRoles* getRoles =
- response.mutable_get_roles();
-
- foreach (const string& name, knownRoles) {
- if (!approvers->approved<VIEW_ROLE>(name)) {
- continue;
- }
-
- mesos::Role* role = getRoles->add_roles();
-
- role->set_name(name);
-
- role->set_weight(master->weights.get(name).getOrElse(DEFAULT_WEIGHT));
-
- RoleResourceBreakdown resourceBreakdown(master, name);
-
- ResourceQuantities allocatedAndOffered =
- resourceBreakdown.allocated() + resourceBreakdown.offered();
-
- // `resources` will be deprecated in favor of
- // `offered`, `allocated`, `reserved`, and quota consumption.
- // As a result, we don't bother trying to expose more
- // than {cpus, mem, disk, gpus} since we don't know if
- // anything outside this set is of type SCALAR.
- foreach (const auto& quantity, allocatedAndOffered) {
- if (quantity.first == "cpus" || quantity.first == "mem" ||
- quantity.first == "disk" || quantity.first == "gpus") {
- Resource* resource = role->add_resources();
- resource->set_name(quantity.first);
- resource->set_type(Value::SCALAR);
- *resource->mutable_scalar() = quantity.second;
- }
- }
-
- Option<Role*> role_ = master->roles.get(name);
-
- if (role_.isSome()) {
- foreachkey (const FrameworkID& frameworkId, (*role_)->frameworks) {
- *role->add_frameworks() = frameworkId;
- }
- }
- }
-
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
- }));
+ .then(defer(
+ master->self(),
+ [this, principal, outputContentType](
+ const Owned<ObjectApprovers>& approvers) {
+ return deferBatchedRequest(
+ &Master::ReadOnlyHandler::getRoles,
+ principal,
+ outputContentType,
+ {},
+ approvers);
+ }));
}
@@ -3913,66 +3172,22 @@
Future<Response> Master::Http::getOperations(
const mesos::master::Call& call,
const Option<Principal>& principal,
- ContentType contentType) const
+ ContentType outputContentType) const
{
CHECK_EQ(mesos::master::Call::GET_OPERATIONS, call.type());
return ObjectApprovers::create(master->authorizer, principal, {VIEW_ROLE})
.then(defer(
- master->self(),
- [=](const Owned<ObjectApprovers>& approvers) -> Response {
- // We consider a principal to be authorized to view an operation if it
- // is authorized to view the resources the operation is performed on.
- auto approved = [&approvers](const Operation& operation) {
- Try<Resources> consumedResources =
- protobuf::getConsumedResources(operation.info());
-
- if (consumedResources.isError()) {
- LOG(WARNING)
- << "Could not approve operation " << operation.uuid()
- << " since its consumed resources could not be determined:"
- << consumedResources.error();
-
- return false;
- }
-
- foreach (const Resource& resource, consumedResources.get()) {
- if (!approvers->approved<VIEW_ROLE>(resource)) {
- return false;
- }
- }
-
- return true;
- };
-
- mesos::master::Response response;
- response.set_type(mesos::master::Response::GET_OPERATIONS);
-
- mesos::master::Response::GetOperations* operations =
- response.mutable_get_operations();
-
- foreachvalue (const Slave* slave, master->slaves.registered) {
- foreachvalue (Operation* operation, slave->operations) {
- if (approved(*operation)) {
- operations->add_operations()->CopyFrom(*operation);
- }
- }
-
- foreachvalue (
- const Slave::ResourceProvider resourceProvider,
- slave->resourceProviders) {
- foreachvalue (Operation* operation, resourceProvider.operations) {
- if (approved(*operation)) {
- operations->add_operations()->CopyFrom(*operation);
- }
- }
- }
- }
-
- return OK(
- serialize(contentType, evolve(response)),
- stringify(contentType));
- }));
+ master->self(),
+ [this, principal, outputContentType](
+ const Owned<ObjectApprovers>& approvers) {
+ return deferBatchedRequest(
+ &Master::ReadOnlyHandler::getOperations,
+ principal,
+ outputContentType,
+ {},
+ approvers);
+ }));
}
@@ -4043,6 +3258,7 @@
return deferBatchedRequest(
&Master::ReadOnlyHandler::tasks,
principal,
+ ContentType::JSON,
request.url.query,
approvers);
}));
@@ -4052,7 +3268,7 @@
Future<Response> Master::Http::getTasks(
const mesos::master::Call& call,
const Option<Principal>& principal,
- ContentType contentType) const
+ ContentType outputContentType) const
{
CHECK_EQ(mesos::master::Call::GET_TASKS, call.type());
@@ -4061,290 +3277,16 @@
principal,
{VIEW_FRAMEWORK, VIEW_TASK})
.then(defer(
- master->self(),
- [=](const Owned<ObjectApprovers>& approvers) -> Response {
- // Serialize the following message:
- //
- // mesos::master::Response response;
- // response.set_type(mesos::master::Response::GET_TASKS);
- // *response.mutable_get_tasks() = _getTasks(approvers);
-
- switch (contentType) {
- case ContentType::PROTOBUF: {
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- WireFormatLite::WriteEnum(
- mesos::v1::master::Response::kTypeFieldNumber,
- mesos::v1::master::Response::GET_TASKS,
- &writer);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Response::kGetTasksFieldNumber,
- serializeGetTasks(approvers),
- &writer);
-
- // We must manually trim the unused buffer space since
- // we use the string before the coded output stream is
- // destructed.
- writer.Trim();
-
- return OK(std::move(output), stringify(contentType));
- }
-
- case ContentType::JSON: {
- string body = jsonify([&](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::descriptor();
-
- int field;
-
- field = v1::master::Response::kTypeFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- v1::master::Response::Type_Name(
- v1::master::Response::GET_TASKS));
-
- field = v1::master::Response::kGetTasksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- jsonifyGetTasks(master, approvers));
- });
-
- // TODO(bmahler): Pass jsonp query parameter through here.
- return OK(std::move(body), stringify(contentType));
- }
-
- default:
- return NotAcceptable("Request must accept json or protobuf");
- }
- }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetTasks(
- const Master* master,
- const Owned<ObjectApprovers>& approvers)
-{
- // Jsonify the following message:
- //
- // master::Response::GetTasks getTasks;
- // for each pending task:
- // *getTasks.add_pending_tasks() =
- // protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
- // for each task:
- // *getTasks.add_tasks() = *task;
- // for each unreachable task:
- // *getTasks.add_unreachable_tasks() = *task;
- // for each completed task:
- // *getTasks.add_completed_tasks() = *task;
-
- // TODO(bmahler): This copies the Owned object approvers.
- return [=](JSON::ObjectWriter* writer) {
- // Construct framework list with both active and completed frameworks.
- vector<const Framework*> frameworks;
- foreachvalue (Framework* framework, master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- frameworks.push_back(framework);
- }
- }
- foreachvalue (const Owned<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- frameworks.push_back(framework.get());
- }
- }
-
- const google::protobuf::Descriptor* descriptor =
- v1::master::Response::GetTasks::descriptor();
-
- int field;
-
- // Pending tasks.
- field = v1::master::Response::GetTasks::kPendingTasksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- [&](JSON::ArrayWriter* writer) {
- foreach (const Framework* framework, frameworks) {
- foreachvalue (const TaskInfo& t, framework->pendingTasks) {
- // Skip unauthorized tasks.
- if (!approvers->approved<VIEW_TASK>(t, framework->info)) {
- continue;
- }
-
- Task task =
- protobuf::createTask(t, TASK_STAGING, framework->id());
-
- writer->element(asV1Protobuf(task));
- }
- }
- });
-
- // Active tasks.
- field = v1::master::Response::GetTasks::kTasksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- [&](JSON::ArrayWriter* writer) {
- foreach (const Framework* framework, frameworks) {
- foreachvalue (Task* task, framework->tasks) {
- CHECK_NOTNULL(task);
- // Skip unauthorized tasks.
- if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
- continue;
- }
-
- writer->element(asV1Protobuf(*task));
- }
- }
- });
-
- // Unreachable tasks.
- field = v1::master::Response::GetTasks::kUnreachableTasksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- [&](JSON::ArrayWriter* writer) {
- foreach (const Framework* framework, frameworks) {
- foreachvalue (const Owned<Task>& task,
- framework->unreachableTasks) {
- // Skip unauthorized tasks.
- if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
- continue;
- }
-
- writer->element(asV1Protobuf(*task));
- }
- }
- });
-
- // Completed tasks.
- field = v1::master::Response::GetTasks::kCompletedTasksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- [&](JSON::ArrayWriter* writer) {
- foreach (const Framework* framework, frameworks) {
- foreach (const Owned<Task>& task, framework->completedTasks) {
- // Skip unauthorized tasks.
- if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
- continue;
- }
-
- writer->element(asV1Protobuf(*task));
- }
- }
- });
- };
-}
-
-
-string Master::Http::serializeGetTasks(
- const Owned<ObjectApprovers>& approvers) const
-{
- // Construct framework list with both active and completed frameworks.
- vector<const Framework*> frameworks;
- foreachvalue (Framework* framework, master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- frameworks.push_back(framework);
- }
- }
- foreachvalue (const Owned<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- frameworks.push_back(framework.get());
- }
- }
-
- // Serialize the following message:
- //
- // mesos::master::Response::GetTasks getTasks;
- // for each pending task:
- // *getTasks.add_pending_tasks() =
- // protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
- // for each task:
- // *getTasks.add_tasks() = *task;
- // for each unreachable task:
- // *getTasks.add_unreachable_tasks() = *task;
- // for each completed task:
- // *getTasks.add_completed_tasks() = *task;
-
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- foreach (const Framework* framework, frameworks) {
- // Pending tasks.
- foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
- // Skip unauthorized tasks.
- if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
- continue;
- }
-
- // TODO(bmahler): Consider not constructing the temporary task
- // object and instead serialize directly. Since we don't expect
- // a large number of pending tasks, we currently don't bother
- // with the more efficient approach.
- //
- // *getTasks.add_pending_tasks() =
- // protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber,
- protobuf::createTask(taskInfo, TASK_STAGING, framework->id()),
- &writer);
- }
-
- // Active tasks.
- foreachvalue (Task* task, framework->tasks) {
- CHECK_NOTNULL(task);
- // Skip unauthorized tasks.
- if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
- continue;
- }
-
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::v1::master::Response::GetTasks::kTasksFieldNumber,
- *task,
- &writer);
- }
-
- // Unreachable tasks.
- foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
- // Skip unauthorized tasks.
- if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
- continue;
- }
-
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::v1::master::Response::GetTasks::kUnreachableTasksFieldNumber,
- *task,
- &writer);
- }
-
- // Completed tasks.
- foreach (const Owned<Task>& task, framework->completedTasks) {
- // Skip unauthorized tasks.
- if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
- continue;
- }
-
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::v1::master::Response::GetTasks::kCompletedTasksFieldNumber,
- *task,
- &writer);
- }
- }
-
- // While an explicit Trim() isn't necessary (since the coded
- // output stream is destructed before the string is returned),
- // it's a quite tricky bug to diagnose if Trim() is missed, so
- // we always do it explicitly to signal the reader about this
- // subtlety.
- writer.Trim();
-
- return output;
+ master->self(),
+ [this, principal, outputContentType](
+ const Owned<ObjectApprovers>& approvers) {
+ return deferBatchedRequest(
+ &Master::ReadOnlyHandler::getTasks,
+ principal,
+ outputContentType,
+ {},
+ approvers);
+ }));
}
diff --git a/src/master/master.hpp b/src/master/master.hpp
index c9d417c..3074918 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1309,34 +1309,117 @@
// /frameworks
process::http::Response frameworks(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// /roles
process::http::Response roles(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// /slaves
process::http::Response slaves(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// /state
process::http::Response state(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// /state-summary
process::http::Response stateSummary(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// /tasks
process::http::Response tasks(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
+ // master::Call::GET_STATE
+ process::http::Response getState(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& queryParameters,
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ // master::Call::GET_AGENTS
+ process::http::Response getAgents(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& queryParameters,
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ // master::Call::GET_FRAMEWORKS
+ process::http::Response getFrameworks(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& queryParameters,
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ // master::Call::GET_EXECUTORS
+ process::http::Response getExecutors(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& queryParameters,
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ // master::Call::GET_OPERATIONS
+ process::http::Response getOperations(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& queryParameters,
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ // master::Call::GET_TASKS
+ process::http::Response getTasks(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& queryParameters,
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ // master::Call::GET_ROLES
+ process::http::Response getRoles(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& queryParameters,
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ // TODO(bmahler): These could just live in the .cpp file,
+ // however they are shared with SUBSCRIBE which currently
+ // is not implemented as a read only handler here. Make these
+ // private or only in the .cpp file once SUBSCRIBE is moved
+ // into readonly_handler.cpp.
+ std::string serializeGetState(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::string serializeGetAgents(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::string serializeGetFrameworks(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::string serializeGetExecutors(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::string serializeGetOperations(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::string serializeGetTasks(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::string serializeGetRoles(
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ std::function<void(JSON::ObjectWriter*)> jsonifyGetState(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::function<void(JSON::ObjectWriter*)> jsonifyGetAgents(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::function<void(JSON::ObjectWriter*)> jsonifyGetFrameworks(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::function<void(JSON::ObjectWriter*)> jsonifyGetExecutors(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::function<void(JSON::ObjectWriter*)> jsonifyGetOperations(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks(
+ const process::Owned<ObjectApprovers>& approvers) const;
+ std::function<void(JSON::ObjectWriter*)> jsonifyGetRoles(
+ const process::Owned<ObjectApprovers>& approvers) const;
+
private:
const Master* master;
};
@@ -1631,12 +1714,6 @@
const mesos::master::Call& call,
const Option<process::http::authentication::Principal>& principal,
ContentType contentType) const;
-
- static std::function<void(JSON::ObjectWriter*)> jsonifyGetAgents(
- const Master* master,
- const process::Owned<ObjectApprovers>& approvers);
- std::string serializeGetAgents(
- const process::Owned<ObjectApprovers>& approvers) const;
mesos::master::Response::GetAgents _getAgents(
const process::Owned<ObjectApprovers>& approvers) const;
@@ -1734,12 +1811,6 @@
const mesos::master::Call& call,
const Option<process::http::authentication::Principal>& principal,
ContentType contentType) const;
-
- static std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks(
- const Master* master,
- const process::Owned<ObjectApprovers>& approvers);
- std::string serializeGetTasks(
- const process::Owned<ObjectApprovers>& approvers) const;
mesos::master::Response::GetTasks _getTasks(
const process::Owned<ObjectApprovers>& approvers) const;
@@ -1777,12 +1848,6 @@
const mesos::master::Call& call,
const Option<process::http::authentication::Principal>& principal,
ContentType contentType) const;
-
- static std::function<void(JSON::ObjectWriter*)> jsonifyGetFrameworks(
- const Master* master,
- const process::Owned<ObjectApprovers>& approvers);
- std::string serializeGetFrameworks(
- const process::Owned<ObjectApprovers>& approvers) const;
mesos::master::Response::GetFrameworks _getFrameworks(
const process::Owned<ObjectApprovers>& approvers) const;
@@ -1790,12 +1855,6 @@
const mesos::master::Call& call,
const Option<process::http::authentication::Principal>& principal,
ContentType contentType) const;
-
- static std::function<void(JSON::ObjectWriter*)> jsonifyGetExecutors(
- const Master* master,
- const process::Owned<ObjectApprovers>& approvers);
- std::string serializeGetExecutors(
- const process::Owned<ObjectApprovers>& approvers) const;
mesos::master::Response::GetExecutors _getExecutors(
const process::Owned<ObjectApprovers>& approvers) const;
@@ -1803,12 +1862,6 @@
const mesos::master::Call& call,
const Option<process::http::authentication::Principal>& principal,
ContentType contentType) const;
-
- static std::function<void(JSON::ObjectWriter*)> jsonifyGetState(
- const Master* master,
- const process::Owned<ObjectApprovers>& approvers);
- std::string serializeGetState(
- const process::Owned<ObjectApprovers>& approvers) const;
mesos::master::Response::GetState _getState(
const process::Owned<ObjectApprovers>& approvers) const;
@@ -1864,12 +1917,14 @@
typedef process::http::Response
(Master::ReadOnlyHandler::*ReadOnlyRequestHandler)(
+ ContentType,
const hashmap<std::string, std::string>&,
const process::Owned<ObjectApprovers>&) const;
process::Future<process::http::Response> deferBatchedRequest(
ReadOnlyRequestHandler handler,
const Option<process::http::authentication::Principal>& principal,
+ ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
@@ -1878,6 +1933,7 @@
struct BatchedRequest
{
ReadOnlyRequestHandler handler;
+ ContentType outputContentType;
hashmap<std::string, std::string> queryParameters;
Option<process::http::authentication::Principal> principal;
process::Owned<ObjectApprovers> approvers;
@@ -2685,6 +2741,11 @@
hashmap<FrameworkID, Framework*> frameworks;
};
+
+mesos::master::Response::GetFrameworks::Framework model(
+ const Framework& framework);
+
+
} // namespace master {
} // namespace internal {
} // namespace mesos {
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index 500ce3f..fbe748d 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -19,6 +19,12 @@
#include <string>
#include <vector>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/wire_format_lite.h>
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
#include <mesos/mesos.hpp>
#include <mesos/authorizer/authorizer.hpp>
@@ -36,8 +42,11 @@
#include "common/build.hpp"
#include "common/http.hpp"
+using google::protobuf::internal::WireFormatLite;
+
using process::Owned;
+using process::http::NotAcceptable;
using process::http::OK;
using mesos::authorization::VIEW_EXECUTOR;
@@ -46,6 +55,9 @@
using mesos::authorization::VIEW_ROLE;
using mesos::authorization::VIEW_TASK;
+using mesos::internal::protobuf::WireFormatLite2;
+
+using std::function;
using std::vector;
using std::string;
@@ -661,9 +673,12 @@
process::http::Response Master::ReadOnlyHandler::frameworks(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
+ CHECK_EQ(outputContentType, ContentType::JSON);
+
IDAcceptor<FrameworkID> selectFrameworkId(
query.get("framework_id"));
@@ -719,9 +734,12 @@
process::http::Response Master::ReadOnlyHandler::roles(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
+ CHECK_EQ(outputContentType, ContentType::JSON);
+
const Master* master = this->master;
const vector<string> knownRoles = master->knownRoles();
@@ -796,9 +814,12 @@
process::http::Response Master::ReadOnlyHandler::slaves(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
+ CHECK_EQ(outputContentType, ContentType::JSON);
+
IDAcceptor<SlaveID> selectSlaveId(query.get("slave_id"));
return process::http::OK(
@@ -808,9 +829,12 @@
process::http::Response Master::ReadOnlyHandler::state(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
+ CHECK_EQ(outputContentType, ContentType::JSON);
+
const Master* master = this->master;
auto calculateState = [master, &approvers](JSON::ObjectWriter* writer) {
writer->field("version", MESOS_VERSION);
@@ -954,9 +978,12 @@
process::http::Response Master::ReadOnlyHandler::stateSummary(
+ ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
+ CHECK_EQ(outputContentType, ContentType::JSON);
+
const Master* master = this->master;
auto stateSummary = [master, &approvers](JSON::ObjectWriter* writer) {
writer->field("hostname", master->info().hostname());
@@ -1150,9 +1177,12 @@
process::http::Response Master::ReadOnlyHandler::tasks(
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
+ CHECK_EQ(outputContentType, ContentType::JSON);
+
// Get list options (limit and offset).
Result<int> result = numify<int>(query.get("limit"));
size_t limit = result.isSome() ? result.get() : TASK_LIMIT;
@@ -1257,6 +1287,1182 @@
return OK(jsonify(tasksWriter), query.get("jsonp"));
}
+
+function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetAgents(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following:
+ //
+ // mesos::master::Response::GetAgents getAgents;
+ // for each registered agent:
+ // *getAgents.add_agents() = protobuf::master::event::createAgentResponse(
+ // agent,
+ // master->slaves.draining.get(slave->id),
+ // master->slaves.deactivated.contains(slave->id),
+ // approvers);
+ // for each recovered agent:
+ // SlaveInfo* agent = getAgents.add_recovered_agents();
+ // agent->CopyFrom(slaveInfo);
+ // agent->clear_resources();
+ // foreach (const Resource& resource, slaveInfo.resources()):
+ // if (approvers->approved<VIEW_ROLE>(resource)):
+ // *agent->add_resources() = resource;
+
+ // TODO(bmahler): This copies the Owned object approvers.
+ return [=](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::GetAgents::descriptor();
+
+ int field;
+
+ field = v1::master::Response::GetAgents::kAgentsFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ [&](JSON::ArrayWriter* writer) {
+ foreachvalue (const Slave* slave, master->slaves.registered) {
+ // TODO(bmahler): Consider not constructing the temporary
+ // agent object and instead serialize directly.
+ mesos::master::Response::GetAgents::Agent agent =
+ protobuf::master::event::createAgentResponse(
+ *slave,
+ master->slaves.draining.get(slave->id),
+ master->slaves.deactivated.contains(slave->id),
+ approvers);
+
+ writer->element(asV1Protobuf(agent));
+ }
+ });
+
+ field = v1::master::Response::GetAgents::kRecoveredAgentsFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ [&](JSON::ArrayWriter* writer) {
+ foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
+ // TODO(bmahler): Consider not constructing the temporary
+ // SlaveInfo object and instead serialize directly.
+ SlaveInfo agent = slaveInfo;
+ agent.clear_resources();
+ foreach (const Resource& resource, slaveInfo.resources()) {
+ if (approvers->approved<VIEW_ROLE>(resource)) {
+ *agent.add_resources() = resource;
+ }
+ }
+
+ writer->element(asV1Protobuf(agent));
+ }
+ });
+ };
+}
+
+
+string Master::ReadOnlyHandler::serializeGetAgents(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following:
+ //
+ // mesos::master::Response::GetAgents getAgents;
+ // for each registered agent:
+ // *getAgents.add_agents() = protobuf::master::event::createAgentResponse(
+ // agent,
+ // master->slaves.draining.get(slave->id),
+ // master->slaves.deactivated.contains(slave->id),
+ // approvers);
+ // for each recovered agent:
+ // SlaveInfo* agent = getAgents.add_recovered_agents();
+ // agent->CopyFrom(slaveInfo);
+ // agent->clear_resources();
+ // foreach (const Resource& resource, slaveInfo.resources()):
+ // if (approvers->approved<VIEW_ROLE>(resource)):
+ // *agent->add_resources() = resource;
+
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ foreachvalue (const Slave* slave, master->slaves.registered) {
+ // TODO(bmahler): Consider not constructing the temporary
+ // agent object and instead serialize directly.
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::master::Response::GetAgents::kAgentsFieldNumber,
+ protobuf::master::event::createAgentResponse(
+ *slave,
+ master->slaves.draining.get(slave->id),
+ master->slaves.deactivated.contains(slave->id),
+ approvers),
+ &writer);
+ }
+
+ foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
+ // TODO(bmahler): Consider not constructing the temporary
+ // SlaveInfo object and instead serialize directly.
+ SlaveInfo agent = slaveInfo;
+ agent.clear_resources();
+ foreach (const Resource& resource, slaveInfo.resources()) {
+ if (approvers->approved<VIEW_ROLE>(resource)) {
+ *agent.add_resources() = resource;
+ }
+ }
+
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::master::Response::GetAgents::kRecoveredAgentsFieldNumber,
+ agent,
+ &writer);
+ }
+
+ // While an explicit Trim() isn't necessary (since the coded
+ // output stream is destructed before the string is returned),
+ // it's a quite tricky bug to diagnose if Trim() is missed, so
+ // we always do it explicitly to signal the reader about this
+ // subtlety.
+ writer.Trim();
+
+ return output;
+}
+
+
+
+process::http::Response Master::ReadOnlyHandler::getAgents(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following message:
+ //
+ // mesos::master::Response response;
+ // response.set_type(mesos::master::Response::GET_AGENTS);
+ // *response.mutable_get_agents() = _getAgents(approvers);
+
+ switch (outputContentType) {
+ case ContentType::PROTOBUF: {
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_AGENTS,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetAgentsFieldNumber,
+ serializeGetAgents(approvers),
+ &writer);
+
+ // We must manually trim the unused buffer space since
+ // we use the string before the coded output stream is
+ // destructed.
+ writer.Trim();
+
+ return OK(std::move(output), stringify(outputContentType));
+ }
+
+ case ContentType::JSON: {
+ string body = jsonify([&](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::descriptor();
+
+ int field;
+
+ field = v1::master::Response::kTypeFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ v1::master::Response::Type_Name(
+ v1::master::Response::GET_AGENTS));
+
+ field = v1::master::Response::kGetAgentsFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetAgents(approvers));
+ });
+
+ // TODO(bmahler): Pass jsonp query parameter through here.
+ return OK(std::move(body), stringify(outputContentType));
+ }
+
+ default:
+ return NotAcceptable("Request must accept json or protobuf");
+ }
+}
+
+
+function<void(JSON::ObjectWriter*)>
+ Master::ReadOnlyHandler::jsonifyGetFrameworks(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following:
+ //
+ // mesos::master::Response::GetFrameworks getFrameworks;
+ // for each framework:
+ // *getFrameworks.add_frameworks() = model(*framework);
+ // for each completed framework:
+ // *getFrameworks.add_completed_frameworks() = model(*framework);
+
+ // TODO(bmahler): Consider not constructing the temporary framework
+ // objects and instead serialize directly, but since we don't
+ // expect a large number of pending tasks, we currently don't
+ // bother with the more efficient approach.
+
+ // TODO(bmahler): This copies the Owned object approvers.
+ return [=](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::GetFrameworks::descriptor();
+
+ int field;
+
+ field = v1::master::Response::GetFrameworks::kFrameworksFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ [&](JSON::ArrayWriter* writer) {
+ foreachvalue (const Framework* framework,
+ master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ continue;
+ }
+
+ mesos::master::Response::GetFrameworks::Framework f = model(*framework);
+ writer->element(asV1Protobuf(f));
+ }
+ });
+
+ field =
+ v1::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ [&](JSON::ArrayWriter* writer) {
+ foreachvalue (const Owned<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ continue;
+ }
+
+ mesos::master::Response::GetFrameworks::Framework f = model(*framework);
+ writer->element(asV1Protobuf(f));
+ }
+ });
+ };
+}
+
+
+string Master::ReadOnlyHandler::serializeGetFrameworks(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following:
+ //
+ // mesos::master::Response::GetFrameworks getFrameworks;
+ // for each framework:
+ // *getFrameworks.add_frameworks() = model(*framework);
+ // for each completed framework:
+ // *getFrameworks.add_completed_frameworks() = model(*framework);
+
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ // TODO(bmahler): Consider not constructing the temporary framework
+ // objects and instead serialize directly, but since we don't
+ // expect a large number of pending tasks, we currently don't
+ // bother with the more efficient approach.
+
+ foreachvalue (const Framework* framework,
+ master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ continue;
+ }
+
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::master::Response::GetFrameworks::kFrameworksFieldNumber,
+ model(*framework),
+ &writer);
+ }
+
+ foreachvalue (const Owned<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ continue;
+ }
+
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber,
+ model(*framework),
+ &writer);
+ }
+
+ // While an explicit Trim() isn't necessary (since the coded
+ // output stream is destructed before the string is returned),
+ // it's a quite tricky bug to diagnose if Trim() is missed, so
+ // we always do it explicitly to signal the reader about this
+ // subtlety.
+ writer.Trim();
+
+ return output;
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getFrameworks(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following message:
+ //
+ // mesos::master::Response response;
+ // response.set_type(mesos::master::Response::GET_FRAMEWORKS);
+ // *response.mutable_get_frameworks() = _getFrameworks(approvers);
+
+ switch (outputContentType) {
+ case ContentType::PROTOBUF: {
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_FRAMEWORKS,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetFrameworksFieldNumber,
+ serializeGetFrameworks(approvers),
+ &writer);
+
+ // We must manually trim the unused buffer space since
+ // we use the string before the coded output stream is
+ // destructed.
+ writer.Trim();
+
+ return OK(std::move(output), stringify(outputContentType));
+ }
+
+ case ContentType::JSON: {
+ string body = jsonify([&](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::descriptor();
+
+ int field;
+
+ field = v1::master::Response::kTypeFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ v1::master::Response::Type_Name(
+ v1::master::Response::GET_FRAMEWORKS));
+
+ field = v1::master::Response::kGetFrameworksFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetFrameworks(approvers));
+ });
+
+ // TODO(bmahler): Pass jsonp query parameter through here.
+ return OK(std::move(body), stringify(outputContentType));
+ }
+
+ default:
+ return NotAcceptable("Request must accept json or protobuf");
+ }
+}
+
+
+function<void(JSON::ObjectWriter*)>
+ Master::ReadOnlyHandler::jsonifyGetExecutors(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following:
+ //
+ // mesos::master::Response::GetExecutors getExecutors;
+ //
+ // for each (executor, agent):
+ // mesos::master::Response::GetExecutors::Executor* executor =
+ // getExecutors.add_executors();
+ // *executor->mutable_executor_info() = executorInfo;
+ // *executor->mutable_slave_id() = slaveId;
+
+ // TODO(bmahler): This copies the owned object approvers.
+ return [=](JSON::ObjectWriter* writer) {
+ // Construct framework list with both active and completed frameworks.
+ vector<const Framework*> frameworks;
+ foreachvalue (Framework* framework, master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ frameworks.push_back(framework);
+ }
+ }
+ foreachvalue (const Owned<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ frameworks.push_back(framework.get());
+ }
+ }
+
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::GetExecutors::descriptor();
+
+ int field;
+
+ field = v1::master::Response::GetExecutors::kExecutorsFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ [&](JSON::ArrayWriter* writer) {
+ foreach (const Framework* framework, frameworks) {
+ foreachpair (const SlaveID& slaveId,
+ const auto& executorsMap,
+ framework->executors) {
+ foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
+ // Skip unauthorized executors.
+ if (!approvers->approved<VIEW_EXECUTOR>(
+ executorInfo, framework->info)) {
+ continue;
+ }
+
+ writer->element([&](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::GetExecutors::Executor::descriptor();
+
+ // Serialize the following message:
+ //
+ // mesos::master::Response::GetExecutors::Executor executor;
+ // *executor.mutable_executor_info() = executorInfo;
+ // *executor.mutable_slave_id() = slaveId;
+ int field;
+
+ field = v1::master::Response::GetExecutors::Executor
+ ::kExecutorInfoFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ asV1Protobuf(executorInfo));
+
+ field = v1::master::Response::GetExecutors::Executor
+ ::kAgentIdFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ asV1Protobuf(slaveId));
+ });
+ }
+ }
+ }
+ });
+ };
+}
+
+
+string Master::ReadOnlyHandler::serializeGetExecutors(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Construct framework list with both active and completed frameworks.
+ vector<const Framework*> frameworks;
+ foreachvalue (Framework* framework, master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ frameworks.push_back(framework);
+ }
+ }
+ foreachvalue (const Owned<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ frameworks.push_back(framework.get());
+ }
+ }
+
+ // Lambda for serializing the following message:
+ //
+ // mesos::master::Response::GetExecutors::Executor executor;
+ // *executor.mutable_executor_info() = executorInfo;
+ // *executor.mutable_slave_id() = slaveId;
+ auto serializeExecutor = [](const ExecutorInfo& e, const SlaveID& s) {
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetExecutors::Executor
+ ::kExecutorInfoFieldNumber,
+ e,
+ &writer);
+
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetExecutors::Executor
+ ::kAgentIdFieldNumber,
+ s,
+ &writer);
+
+ // While an explicit Trim() isn't necessary (since the coded
+ // output stream is destructed before the string is returned),
+ // it's a quite tricky bug to diagnose if Trim() is missed, so
+ // we always do it explicitly to signal the reader about this
+ // subtlety.
+ writer.Trim();
+
+ return output;
+ };
+
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ // Serialize the following:
+ //
+ // mesos::master::Response::GetExecutors getExecutors;
+ //
+ // for each (executor, agent):
+ // mesos::master::Response::GetExecutors::Executor* executor =
+ // getExecutors.add_executors();
+ // *executor->mutable_executor_info() = executorInfo;
+ // *executor->mutable_slave_id() = slaveId;
+
+ foreach (const Framework* framework, frameworks) {
+ foreachpair (const SlaveID& slaveId,
+ const auto& executorsMap,
+ framework->executors) {
+ foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
+ // Skip unauthorized executors.
+ if (!approvers->approved<VIEW_EXECUTOR>(
+ executorInfo, framework->info)) {
+ continue;
+ }
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetExecutors::kExecutorsFieldNumber,
+ serializeExecutor(executorInfo, slaveId),
+ &writer);
+ }
+ }
+ }
+
+ // While an explicit Trim() isn't necessary (since the coded
+ // output stream is destructed before the string is returned),
+ // it's a quite tricky bug to diagnose if Trim() is missed, so
+ // we always do it explicitly to signal the reader about this
+ // subtlety.
+ writer.Trim();
+
+ return output;
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getExecutors(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following message:
+ //
+ // mesos::master::Response response;
+ // response.set_type(mesos::master::Response::GET_EXECUTORS);
+ // *response.mutable_get_executors() = _getExecutors(approvers);
+
+ switch (outputContentType) {
+ case ContentType::PROTOBUF: {
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_EXECUTORS,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetExecutorsFieldNumber,
+ serializeGetExecutors(approvers),
+ &writer);
+
+ // We must manually trim the unused buffer space since
+ // we use the string before the coded output stream is
+ // destructed.
+ writer.Trim();
+
+ return OK(std::move(output), stringify(outputContentType));
+ }
+
+ case ContentType::JSON: {
+ string body = jsonify([&](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::descriptor();
+
+ int field;
+
+ field = v1::master::Response::kTypeFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ v1::master::Response::Type_Name(
+ v1::master::Response::GET_EXECUTORS));
+
+ field = v1::master::Response::kGetExecutorsFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetExecutors(approvers));
+ });
+
+ // TODO(bmahler): Pass jsonp query parameter through here.
+ return OK(std::move(body), stringify(outputContentType));
+ }
+
+ default:
+ return NotAcceptable("Request must accept json or protobuf");
+ }
+}
+
+
+function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetTasks(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Jsonify the following message:
+ //
+ // master::Response::GetTasks getTasks;
+ // for each pending task:
+ // *getTasks.add_pending_tasks() =
+ // protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+ // for each task:
+ // *getTasks.add_tasks() = *task;
+ // for each unreachable task:
+ // *getTasks.add_unreachable_tasks() = *task;
+ // for each completed task:
+ // *getTasks.add_completed_tasks() = *task;
+
+ // TODO(bmahler): This copies the Owned object approvers.
+ return [=](JSON::ObjectWriter* writer) {
+ // Construct framework list with both active and completed frameworks.
+ vector<const Framework*> frameworks;
+ foreachvalue (Framework* framework, master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ frameworks.push_back(framework);
+ }
+ }
+ foreachvalue (const Owned<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ frameworks.push_back(framework.get());
+ }
+ }
+
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::GetTasks::descriptor();
+
+ int field;
+
+ // Pending tasks.
+ field = v1::master::Response::GetTasks::kPendingTasksFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ [&](JSON::ArrayWriter* writer) {
+ foreach (const Framework* framework, frameworks) {
+ foreachvalue (const TaskInfo& t, framework->pendingTasks) {
+ // Skip unauthorized tasks.
+ if (!approvers->approved<VIEW_TASK>(t, framework->info)) {
+ continue;
+ }
+
+ Task task =
+ protobuf::createTask(t, TASK_STAGING, framework->id());
+
+ writer->element(asV1Protobuf(task));
+ }
+ }
+ });
+
+ // Active tasks.
+ field = v1::master::Response::GetTasks::kTasksFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ [&](JSON::ArrayWriter* writer) {
+ foreach (const Framework* framework, frameworks) {
+ foreachvalue (Task* task, framework->tasks) {
+ CHECK_NOTNULL(task);
+ // Skip unauthorized tasks.
+ if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+ continue;
+ }
+
+ writer->element(asV1Protobuf(*task));
+ }
+ }
+ });
+
+ // Unreachable tasks.
+ field = v1::master::Response::GetTasks::kUnreachableTasksFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ [&](JSON::ArrayWriter* writer) {
+ foreach (const Framework* framework, frameworks) {
+ foreachvalue (const Owned<Task>& task,
+ framework->unreachableTasks) {
+ // Skip unauthorized tasks.
+ if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+ continue;
+ }
+
+ writer->element(asV1Protobuf(*task));
+ }
+ }
+ });
+
+ // Completed tasks.
+ field = v1::master::Response::GetTasks::kCompletedTasksFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ [&](JSON::ArrayWriter* writer) {
+ foreach (const Framework* framework, frameworks) {
+ foreach (const Owned<Task>& task, framework->completedTasks) {
+ // Skip unauthorized tasks.
+ if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+ continue;
+ }
+
+ writer->element(asV1Protobuf(*task));
+ }
+ }
+ });
+ };
+}
+
+
+string Master::ReadOnlyHandler::serializeGetTasks(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Construct framework list with both active and completed frameworks.
+ vector<const Framework*> frameworks;
+ foreachvalue (Framework* framework, master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ frameworks.push_back(framework);
+ }
+ }
+ foreachvalue (const Owned<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ frameworks.push_back(framework.get());
+ }
+ }
+
+ // Serialize the following message:
+ //
+ // mesos::master::Response::GetTasks getTasks;
+ // for each pending task:
+ // *getTasks.add_pending_tasks() =
+ // protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+ // for each task:
+ // *getTasks.add_tasks() = *task;
+ // for each unreachable task:
+ // *getTasks.add_unreachable_tasks() = *task;
+ // for each completed task:
+ // *getTasks.add_completed_tasks() = *task;
+
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ foreach (const Framework* framework, frameworks) {
+ // Pending tasks.
+ foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
+ // Skip unauthorized tasks.
+ if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
+ continue;
+ }
+
+ // TODO(bmahler): Consider not constructing the temporary task
+ // object and instead serialize directly. Since we don't expect
+ // a large number of pending tasks, we currently don't bother
+ // with the more efficient approach.
+ //
+ // *getTasks.add_pending_tasks() =
+ // protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber,
+ protobuf::createTask(taskInfo, TASK_STAGING, framework->id()),
+ &writer);
+ }
+
+ // Active tasks.
+ foreachvalue (Task* task, framework->tasks) {
+ CHECK_NOTNULL(task);
+ // Skip unauthorized tasks.
+ if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+ continue;
+ }
+
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetTasks::kTasksFieldNumber,
+ *task,
+ &writer);
+ }
+
+ // Unreachable tasks.
+ foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
+ // Skip unauthorized tasks.
+ if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+ continue;
+ }
+
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetTasks::kUnreachableTasksFieldNumber,
+ *task,
+ &writer);
+ }
+
+ // Completed tasks.
+ foreach (const Owned<Task>& task, framework->completedTasks) {
+ // Skip unauthorized tasks.
+ if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+ continue;
+ }
+
+ WireFormatLite2::WriteMessageWithoutCachedSizes(
+ mesos::v1::master::Response::GetTasks::kCompletedTasksFieldNumber,
+ *task,
+ &writer);
+ }
+ }
+
+ // While an explicit Trim() isn't necessary (since the coded
+ // output stream is destructed before the string is returned),
+ // it's a quite tricky bug to diagnose if Trim() is missed, so
+ // we always do it explicitly to signal the reader about this
+ // subtlety.
+ writer.Trim();
+
+ return output;
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getTasks(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following message:
+ //
+ // mesos::master::Response response;
+ // response.set_type(mesos::master::Response::GET_TASKS);
+ // *response.mutable_get_tasks() = _getTasks(approvers);
+
+ switch (outputContentType) {
+ case ContentType::PROTOBUF: {
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_TASKS,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetTasksFieldNumber,
+ serializeGetTasks(approvers),
+ &writer);
+
+ // We must manually trim the unused buffer space since
+ // we use the string before the coded output stream is
+ // destructed.
+ writer.Trim();
+
+ return OK(std::move(output), stringify(outputContentType));
+ }
+
+ case ContentType::JSON: {
+ string body = jsonify([&](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::descriptor();
+
+ int field;
+
+ field = v1::master::Response::kTypeFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ v1::master::Response::Type_Name(
+ v1::master::Response::GET_TASKS));
+
+ field = v1::master::Response::kGetTasksFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetTasks(approvers));
+ });
+
+ // TODO(bmahler): Pass jsonp query parameter through here.
+ return OK(std::move(body), stringify(outputContentType));
+ }
+
+ default:
+ return NotAcceptable("Request must accept json or protobuf");
+ }
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getOperations(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
+{
+ // We consider a principal to be authorized to view an operation if it
+ // is authorized to view the resources the operation is performed on.
+ auto approved = [&approvers](const Operation& operation) {
+ Try<Resources> consumedResources =
+ protobuf::getConsumedResources(operation.info());
+
+ if (consumedResources.isError()) {
+ LOG(WARNING)
+ << "Could not approve operation " << operation.uuid()
+ << " since its consumed resources could not be determined:"
+ << consumedResources.error();
+
+ return false;
+ }
+
+ foreach (const Resource& resource, consumedResources.get()) {
+ if (!approvers->approved<VIEW_ROLE>(resource)) {
+ return false;
+ }
+ }
+
+ return true;
+ };
+
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_OPERATIONS);
+
+ mesos::master::Response::GetOperations* operations =
+ response.mutable_get_operations();
+
+ foreachvalue (const Slave* slave, master->slaves.registered) {
+ foreachvalue (Operation* operation, slave->operations) {
+ if (approved(*operation)) {
+ operations->add_operations()->CopyFrom(*operation);
+ }
+ }
+
+ foreachvalue (
+ const Slave::ResourceProvider resourceProvider,
+ slave->resourceProviders) {
+ foreachvalue (Operation* operation, resourceProvider.operations) {
+ if (approved(*operation)) {
+ operations->add_operations()->CopyFrom(*operation);
+ }
+ }
+ }
+ }
+
+ return OK(
+ serialize(outputContentType, evolve(response)),
+ stringify(outputContentType));
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getRoles(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
+{
+ const vector<string> knownRoles = master->knownRoles();
+
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_ROLES);
+
+ mesos::master::Response::GetRoles* getRoles =
+ response.mutable_get_roles();
+
+ foreach (const string& name, knownRoles) {
+ if (!approvers->approved<VIEW_ROLE>(name)) {
+ continue;
+ }
+
+ mesos::Role* role = getRoles->add_roles();
+
+ role->set_name(name);
+
+ role->set_weight(master->weights.get(name).getOrElse(DEFAULT_WEIGHT));
+
+ RoleResourceBreakdown resourceBreakdown(master, name);
+
+ ResourceQuantities allocatedAndOffered =
+ resourceBreakdown.allocated() + resourceBreakdown.offered();
+
+ // `resources` will be deprecated in favor of
+ // `offered`, `allocated`, `reserved`, and quota consumption.
+ // As a result, we don't bother trying to expose more
+ // than {cpus, mem, disk, gpus} since we don't know if
+ // anything outside this set is of type SCALAR.
+ foreach (const auto& quantity, allocatedAndOffered) {
+ if (quantity.first == "cpus" || quantity.first == "mem" ||
+ quantity.first == "disk" || quantity.first == "gpus") {
+ Resource* resource = role->add_resources();
+ resource->set_name(quantity.first);
+ resource->set_type(Value::SCALAR);
+ *resource->mutable_scalar() = quantity.second;
+ }
+ }
+
+ Option<Role*> role_ = master->roles.get(name);
+
+ if (role_.isSome()) {
+ foreachkey (const FrameworkID& frameworkId, (*role_)->frameworks) {
+ *role->add_frameworks() = frameworkId;
+ }
+ }
+ }
+
+ return OK(serialize(outputContentType, evolve(response)),
+ stringify(outputContentType));
+}
+
+
+function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetState(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Jsonify the following message:
+ //
+ // mesos::master::Response::GetState getState;
+ // *getState.mutable_get_tasks() = _getTasks(approvers);
+ // *getState.mutable_get_executors() = _getExecutors(approvers);
+ // *getState.mutable_get_frameworks() = _getFrameworks(approvers);
+ // *getState.mutable_get_agents() = _getAgents(approvers);
+
+ // TODO(bmahler): This copies the Owned object approvers.
+ return [=](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::GetState::descriptor();
+
+ int field;
+
+ field = v1::master::Response::GetState::kGetTasksFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetTasks(approvers));
+
+ field = v1::master::Response::GetState::kGetExecutorsFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetExecutors(approvers));
+
+ field = v1::master::Response::GetState::kGetFrameworksFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetFrameworks(approvers));
+
+ field = v1::master::Response::GetState::kGetAgentsFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetAgents(approvers));
+ };
+}
+
+
+string Master::ReadOnlyHandler::serializeGetState(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following message:
+ //
+ // mesos::master::Response::GetState getState;
+ // *getState.mutable_get_tasks() = _getTasks(approvers);
+ // *getState.mutable_get_executors() = _getExecutors(approvers);
+ // *getState.mutable_get_frameworks() = _getFrameworks(approvers);
+ // *getState.mutable_get_agents() = _getAgents(approvers);
+
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetState::kGetTasksFieldNumber,
+ serializeGetTasks(approvers),
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetState::kGetExecutorsFieldNumber,
+ serializeGetExecutors(approvers),
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetState::kGetFrameworksFieldNumber,
+ serializeGetFrameworks(approvers),
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::GetState::kGetAgentsFieldNumber,
+ serializeGetAgents(approvers),
+ &writer);
+
+ // While an explicit Trim() isn't necessary (since the coded
+ // output stream is destructed before the string is returned),
+ // it's a quite tricky bug to diagnose if Trim() is missed, so
+ // we always do it explicitly to signal the reader about this
+ // subtlety.
+ writer.Trim();
+
+ return output;
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getState(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following message:
+ //
+ // mesos::master::Response response;
+ // response.set_type(mesos::master::Response::GET_STATE);
+ // *response.mutable_get_state() = _getState(approvers);
+
+ switch (outputContentType) {
+ case ContentType::PROTOBUF: {
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Response::kTypeFieldNumber,
+ mesos::v1::master::Response::GET_STATE,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Response::kGetStateFieldNumber,
+ serializeGetState(approvers),
+ &writer);
+
+ // We must manually trim the unused buffer space since
+ // we use the string before the coded output stream is
+ // destructed.
+ writer.Trim();
+
+ return OK(std::move(output), stringify(outputContentType));
+ }
+
+ case ContentType::JSON: {
+ string body = jsonify([&](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Response::descriptor();
+
+ int field;
+
+ field = v1::master::Response::kTypeFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ v1::master::Response::Type_Name(
+ v1::master::Response::GET_STATE));
+
+ field = v1::master::Response::kGetStateFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetState(approvers));
+ });
+
+ // TODO(bmahler): Pass jsonp query parameter through here.
+ return OK(std::move(body), stringify(outputContentType));
+ }
+
+ default:
+ return NotAcceptable("Request must accept json or protobuf");
+ }
+}
+
} // namespace master {
} // namespace internal {
} // namespace mesos {
diff --git a/src/tests/master_load_tests.cpp b/src/tests/master_load_tests.cpp
index ac16fe7..6cee248 100644
--- a/src/tests/master_load_tests.cpp
+++ b/src/tests/master_load_tests.cpp
@@ -390,15 +390,20 @@
Response reference;
if (request.endpoint == "/state") {
- reference = readOnlyHandler.state(queryParameters, approvers);
+ reference = readOnlyHandler.state(
+ ContentType::JSON, queryParameters, approvers);
} else if (request.endpoint == "/state-summary") {
- reference = readOnlyHandler.stateSummary(queryParameters, approvers);
+ reference = readOnlyHandler.stateSummary(
+ ContentType::JSON, queryParameters, approvers);
} else if (request.endpoint == "/roles") {
- reference = readOnlyHandler.roles(queryParameters, approvers);
+ reference = readOnlyHandler.roles(
+ ContentType::JSON, queryParameters, approvers);
} else if (request.endpoint == "/frameworks") {
- reference = readOnlyHandler.frameworks(queryParameters, approvers);
+ reference = readOnlyHandler.frameworks(
+ ContentType::JSON, queryParameters, approvers);
} else if (request.endpoint == "/slaves") {
- reference = readOnlyHandler.slaves(queryParameters, approvers);
+ reference = readOnlyHandler.slaves(
+ ContentType::JSON, queryParameters, approvers);
} else {
UNREACHABLE();
}