Updated all GET_* v1 master calls to be served in parallel.

Using the same approach taken for the v0 read-only endpoints,
this enables parallel reads for the GET_* v1 master calls.

Review: https://reviews.apache.org/r/71978
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 16dbed3..8a58863 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -539,7 +539,7 @@
     field = v1::master::Event::Subscribed::kGetStateFieldNumber;
     writer->field(
         descriptor->FindFieldByNumber(field)->name(),
-        jsonifyGetState(master, approvers));
+        Master::ReadOnlyHandler(master).jsonifyGetState(approvers));
 
     field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber;
     writer->field(
@@ -565,7 +565,7 @@
 
   WireFormatLite::WriteBytes(
       mesos::v1::master::Event::Subscribed::kGetStateFieldNumber,
-      serializeGetState(approvers),
+      Master::ReadOnlyHandler(master).serializeGetState(approvers),
       &writer);
 
   WireFormatLite::WriteDouble(
@@ -1392,6 +1392,7 @@
           return deferBatchedRequest(
               &Master::ReadOnlyHandler::frameworks,
               principal,
+              ContentType::JSON,
               request.url.query,
               approvers);
         }));
@@ -1451,7 +1452,7 @@
 Future<Response> Master::Http::getFrameworks(
     const mesos::master::Call& call,
     const Option<Principal>& principal,
-    ContentType contentType) const
+    ContentType outputContentType) const
 {
   CHECK_EQ(mesos::master::Call::GET_FRAMEWORKS, call.type());
 
@@ -1460,182 +1461,16 @@
       principal,
       {VIEW_FRAMEWORK})
     .then(defer(
-        master->self(),
-        [=](const Owned<ObjectApprovers>& approvers) -> Future<Response> {
-          // Serialize the following message:
-          //
-          //   mesos::master::Response response;
-          //   response.set_type(mesos::master::Response::GET_FRAMEWORKS);
-          //   *response.mutable_get_frameworks() = _getFrameworks(approvers);
-
-          switch (contentType) {
-            case ContentType::PROTOBUF: {
-              string output;
-              google::protobuf::io::StringOutputStream stream(&output);
-              google::protobuf::io::CodedOutputStream writer(&stream);
-
-              WireFormatLite::WriteEnum(
-                  mesos::v1::master::Response::kTypeFieldNumber,
-                  mesos::v1::master::Response::GET_FRAMEWORKS,
-                  &writer);
-
-              WireFormatLite::WriteBytes(
-                  mesos::v1::master::Response::kGetFrameworksFieldNumber,
-                  serializeGetFrameworks(approvers),
-                  &writer);
-
-              // We must manually trim the unused buffer space since
-              // we use the string before the coded output stream is
-              // destructed.
-              writer.Trim();
-
-              return OK(std::move(output), stringify(contentType));
-            }
-
-            case ContentType::JSON: {
-              string body = jsonify([&](JSON::ObjectWriter* writer) {
-                const google::protobuf::Descriptor* descriptor =
-                  v1::master::Response::descriptor();
-
-                int field;
-
-                field = v1::master::Response::kTypeFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    v1::master::Response::Type_Name(
-                        v1::master::Response::GET_FRAMEWORKS));
-
-                field = v1::master::Response::kGetFrameworksFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    jsonifyGetFrameworks(master, approvers));
-              });
-
-              // TODO(bmahler): Pass jsonp query parameter through here.
-              return OK(std::move(body), stringify(contentType));
-            }
-
-            default:
-              return NotAcceptable("Request must accept json or protobuf");
-          }
-      }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetFrameworks(
-    const Master* master,
-    const Owned<ObjectApprovers>& approvers)
-{
-  // Serialize the following:
-  //
-  //   mesos::master::Response::GetFrameworks getFrameworks;
-  //   for each framework:
-  //     *getFrameworks.add_frameworks() = model(*framework);
-  //   for each completed framework:
-  //     *getFrameworks.add_completed_frameworks() = model(*framework);
-
-  // TODO(bmahler): Consider not constructing the temporary framework
-  // objects and instead serialize directly, but since we don't
-  // expect a large number of pending tasks, we currently don't
-  // bother with the more efficient approach.
-
-  // TODO(bmahler): This copies the Owned object approvers.
-  return [=](JSON::ObjectWriter* writer) {
-    const google::protobuf::Descriptor* descriptor =
-      v1::master::Response::GetFrameworks::descriptor();
-
-    int field;
-
-    field = v1::master::Response::GetFrameworks::kFrameworksFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        [&](JSON::ArrayWriter* writer) {
-      foreachvalue (const Framework* framework,
-                    master->frameworks.registered) {
-        // Skip unauthorized frameworks.
-        if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-          continue;
-        }
-
-        mesos::master::Response::GetFrameworks::Framework f = model(*framework);
-        writer->element(asV1Protobuf(f));
-      }
-    });
-
-    field =
-      v1::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        [&](JSON::ArrayWriter* writer) {
-      foreachvalue (const Owned<Framework>& framework,
-                    master->frameworks.completed) {
-        // Skip unauthorized frameworks.
-        if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-          continue;
-        }
-
-        mesos::master::Response::GetFrameworks::Framework f = model(*framework);
-        writer->element(asV1Protobuf(f));
-      }
-    });
-  };
-}
-
-
-string Master::Http::serializeGetFrameworks(
-    const Owned<ObjectApprovers>& approvers) const
-{
-  // Serialize the following:
-  //
-  //   mesos::master::Response::GetFrameworks getFrameworks;
-  //   for each framework:
-  //     *getFrameworks.add_frameworks() = model(*framework);
-  //   for each completed framework:
-  //     *getFrameworks.add_completed_frameworks() = model(*framework);
-
-  string output;
-  google::protobuf::io::StringOutputStream stream(&output);
-  google::protobuf::io::CodedOutputStream writer(&stream);
-
-  // TODO(bmahler): Consider not constructing the temporary framework
-  // objects and instead serialize directly, but since we don't
-  // expect a large number of pending tasks, we currently don't
-  // bother with the more efficient approach.
-
-  foreachvalue (const Framework* framework,
-                master->frameworks.registered) {
-    // Skip unauthorized frameworks.
-    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-      continue;
-    }
-
-    WireFormatLite2::WriteMessageWithoutCachedSizes(
-        mesos::master::Response::GetFrameworks::kFrameworksFieldNumber,
-        model(*framework),
-        &writer);
-  }
-
-  foreachvalue (const Owned<Framework>& framework,
-                master->frameworks.completed) {
-    // Skip unauthorized frameworks.
-    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-      continue;
-    }
-
-    WireFormatLite2::WriteMessageWithoutCachedSizes(
-        mesos::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber,
-        model(*framework),
-        &writer);
-  }
-
-  // While an explicit Trim() isn't necessary (since the coded
-  // output stream is destructed before the string is returned),
-  // it's a quite tricky bug to diagnose if Trim() is missed, so
-  // we always do it explicitly to signal the reader about this
-  // subtlety.
-  writer.Trim();
-
-  return output;
+          master->self(),
+          [this, principal, outputContentType](
+              const Owned<ObjectApprovers>& approvers) {
+            return deferBatchedRequest(
+                &Master::ReadOnlyHandler::getFrameworks,
+                principal,
+                outputContentType,
+                {},
+                approvers);
+          }));
 }
 
 
@@ -1670,7 +1505,7 @@
 Future<Response> Master::Http::getExecutors(
     const mesos::master::Call& call,
     const Option<Principal>& principal,
-    ContentType contentType) const
+    ContentType outputContentType) const
 {
   CHECK_EQ(mesos::master::Call::GET_EXECUTORS, call.type());
 
@@ -1679,243 +1514,16 @@
       principal,
       {VIEW_FRAMEWORK, VIEW_EXECUTOR})
     .then(defer(
-        master->self(),
-        [=](const Owned<ObjectApprovers>& approvers) -> Response {
-          // Serialize the following message:
-          //
-          //   mesos::master::Response response;
-          //   response.set_type(mesos::master::Response::GET_EXECUTORS);
-          //   *response.mutable_get_executors() = _getExecutors(approvers);
-
-          switch (contentType) {
-            case ContentType::PROTOBUF: {
-              string output;
-              google::protobuf::io::StringOutputStream stream(&output);
-              google::protobuf::io::CodedOutputStream writer(&stream);
-
-              WireFormatLite::WriteEnum(
-                  mesos::v1::master::Response::kTypeFieldNumber,
-                  mesos::v1::master::Response::GET_EXECUTORS,
-                  &writer);
-
-              WireFormatLite::WriteBytes(
-                  mesos::v1::master::Response::kGetExecutorsFieldNumber,
-                  serializeGetExecutors(approvers),
-                  &writer);
-
-              // We must manually trim the unused buffer space since
-              // we use the string before the coded output stream is
-              // destructed.
-              writer.Trim();
-
-              return OK(std::move(output), stringify(contentType));
-            }
-
-            case ContentType::JSON: {
-              string body = jsonify([&](JSON::ObjectWriter* writer) {
-                const google::protobuf::Descriptor* descriptor =
-                  v1::master::Response::descriptor();
-
-                int field;
-
-                field = v1::master::Response::kTypeFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    v1::master::Response::Type_Name(
-                        v1::master::Response::GET_EXECUTORS));
-
-                field = v1::master::Response::kGetExecutorsFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    jsonifyGetExecutors(master, approvers));
-              });
-
-              // TODO(bmahler): Pass jsonp query parameter through here.
-              return OK(std::move(body), stringify(contentType));
-            }
-
-            default:
-              return NotAcceptable("Request must accept json or protobuf");
-          }
-      }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetExecutors(
-    const Master* master,
-    const Owned<ObjectApprovers>& approvers)
-{
-  // Serialize the following:
-  //
-  //   mesos::master::Response::GetExecutors getExecutors;
-  //
-  //   for each (executor, agent):
-  //     mesos::master::Response::GetExecutors::Executor* executor =
-  //       getExecutors.add_executors();
-  //     *executor->mutable_executor_info() = executorInfo;
-  //     *executor->mutable_slave_id() = slaveId;
-
-  // TODO(bmahler): This copies the owned object approvers.
-  return [=](JSON::ObjectWriter* writer) {
-    // Construct framework list with both active and completed frameworks.
-    vector<const Framework*> frameworks;
-    foreachvalue (Framework* framework, master->frameworks.registered) {
-      // Skip unauthorized frameworks.
-      if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-        frameworks.push_back(framework);
-      }
-    }
-    foreachvalue (const Owned<Framework>& framework,
-                  master->frameworks.completed) {
-      // Skip unauthorized frameworks.
-      if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-        frameworks.push_back(framework.get());
-      }
-    }
-
-    const google::protobuf::Descriptor* descriptor =
-      v1::master::Response::GetExecutors::descriptor();
-
-    int field;
-
-    field = v1::master::Response::GetExecutors::kExecutorsFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        [&](JSON::ArrayWriter* writer) {
-      foreach (const Framework* framework, frameworks) {
-        foreachpair (const SlaveID& slaveId,
-                     const auto& executorsMap,
-                     framework->executors) {
-          foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
-            // Skip unauthorized executors.
-            if (!approvers->approved<VIEW_EXECUTOR>(
-                    executorInfo, framework->info)) {
-              continue;
-            }
-
-            writer->element([&](JSON::ObjectWriter* writer) {
-              const google::protobuf::Descriptor* descriptor =
-                v1::master::Response::GetExecutors::Executor::descriptor();
-
-              // Serialize the following message:
-              //
-              //   mesos::master::Response::GetExecutors::Executor executor;
-              //   *executor.mutable_executor_info() = executorInfo;
-              //   *executor.mutable_slave_id() = slaveId;
-              int field;
-
-              field = v1::master::Response::GetExecutors::Executor
-                ::kExecutorInfoFieldNumber;
-              writer->field(
-                  descriptor->FindFieldByNumber(field)->name(),
-                  asV1Protobuf(executorInfo));
-
-              field = v1::master::Response::GetExecutors::Executor
-                ::kAgentIdFieldNumber;
-              writer->field(
-                  descriptor->FindFieldByNumber(field)->name(),
-                  asV1Protobuf(slaveId));
-            });
-          }
-        }
-      }
-    });
-  };
-}
-
-
-string Master::Http::serializeGetExecutors(
-    const Owned<ObjectApprovers>& approvers) const
-{
-  // Construct framework list with both active and completed frameworks.
-  vector<const Framework*> frameworks;
-  foreachvalue (Framework* framework, master->frameworks.registered) {
-    // Skip unauthorized frameworks.
-    if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-      frameworks.push_back(framework);
-    }
-  }
-  foreachvalue (const Owned<Framework>& framework,
-                master->frameworks.completed) {
-    // Skip unauthorized frameworks.
-    if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-      frameworks.push_back(framework.get());
-    }
-  }
-
-  // Lambda for serializing the following message:
-  //
-  //   mesos::master::Response::GetExecutors::Executor executor;
-  //   *executor.mutable_executor_info() = executorInfo;
-  //   *executor.mutable_slave_id() = slaveId;
-  auto serializeExecutor = [](const ExecutorInfo& e, const SlaveID& s) {
-    string output;
-    google::protobuf::io::StringOutputStream stream(&output);
-    google::protobuf::io::CodedOutputStream writer(&stream);
-
-    WireFormatLite2::WriteMessageWithoutCachedSizes(
-        mesos::v1::master::Response::GetExecutors::Executor
-          ::kExecutorInfoFieldNumber,
-        e,
-        &writer);
-
-    WireFormatLite2::WriteMessageWithoutCachedSizes(
-        mesos::v1::master::Response::GetExecutors::Executor
-          ::kAgentIdFieldNumber,
-        s,
-        &writer);
-
-    // While an explicit Trim() isn't necessary (since the coded
-    // output stream is destructed before the string is returned),
-    // it's a quite tricky bug to diagnose if Trim() is missed, so
-    // we always do it explicitly to signal the reader about this
-    // subtlety.
-    writer.Trim();
-
-    return output;
-  };
-
-  string output;
-  google::protobuf::io::StringOutputStream stream(&output);
-  google::protobuf::io::CodedOutputStream writer(&stream);
-
-  // Serialize the following:
-  //
-  //   mesos::master::Response::GetExecutors getExecutors;
-  //
-  //   for each (executor, agent):
-  //     mesos::master::Response::GetExecutors::Executor* executor =
-  //       getExecutors.add_executors();
-  //     *executor->mutable_executor_info() = executorInfo;
-  //     *executor->mutable_slave_id() = slaveId;
-
-  foreach (const Framework* framework, frameworks) {
-    foreachpair (const SlaveID& slaveId,
-                 const auto& executorsMap,
-                 framework->executors) {
-      foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
-        // Skip unauthorized executors.
-        if (!approvers->approved<VIEW_EXECUTOR>(
-                executorInfo, framework->info)) {
-          continue;
-        }
-
-        WireFormatLite::WriteBytes(
-            mesos::v1::master::Response::GetExecutors::kExecutorsFieldNumber,
-            serializeExecutor(executorInfo, slaveId),
-            &writer);
-      }
-    }
-  }
-
-  // While an explicit Trim() isn't necessary (since the coded
-  // output stream is destructed before the string is returned),
-  // it's a quite tricky bug to diagnose if Trim() is missed, so
-  // we always do it explicitly to signal the reader about this
-  // subtlety.
-  writer.Trim();
-
-  return output;
+          master->self(),
+          [this, principal, outputContentType](
+             const Owned<ObjectApprovers>& approvers) {
+            return deferBatchedRequest(
+                &Master::ReadOnlyHandler::getExecutors,
+                principal,
+                outputContentType,
+                {},
+                approvers);
+          }));
 }
 
 
@@ -1972,7 +1580,7 @@
 Future<Response> Master::Http::getState(
     const mesos::master::Call& call,
     const Option<Principal>& principal,
-    ContentType contentType) const
+    ContentType outputContentType) const
 {
   CHECK_EQ(mesos::master::Call::GET_STATE, call.type());
 
@@ -1981,153 +1589,16 @@
       principal,
       {VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR, VIEW_ROLE})
     .then(defer(
-        master->self(),
-        [=](const Owned<ObjectApprovers>& approvers) -> Response {
-          // Serialize the following message:
-          //
-          //   mesos::master::Response response;
-          //   response.set_type(mesos::master::Response::GET_STATE);
-          //   *response.mutable_get_state() = _getState(approvers);
-
-          switch (contentType) {
-            case ContentType::PROTOBUF: {
-              string output;
-              google::protobuf::io::StringOutputStream stream(&output);
-              google::protobuf::io::CodedOutputStream writer(&stream);
-
-              WireFormatLite::WriteEnum(
-                  mesos::v1::master::Response::kTypeFieldNumber,
-                  mesos::v1::master::Response::GET_STATE,
-                  &writer);
-
-              WireFormatLite::WriteBytes(
-                  mesos::v1::master::Response::kGetStateFieldNumber,
-                  serializeGetState(approvers),
-                  &writer);
-
-              // We must manually trim the unused buffer space since
-              // we use the string before the coded output stream is
-              // destructed.
-              writer.Trim();
-
-              return OK(std::move(output), stringify(contentType));
-            }
-
-            case ContentType::JSON: {
-              string body = jsonify([&](JSON::ObjectWriter* writer) {
-                const google::protobuf::Descriptor* descriptor =
-                  v1::master::Response::descriptor();
-
-                int field;
-
-                field = v1::master::Response::kTypeFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    v1::master::Response::Type_Name(
-                        v1::master::Response::GET_STATE));
-
-                field = v1::master::Response::kGetStateFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    jsonifyGetState(master, approvers));
-              });
-
-              // TODO(bmahler): Pass jsonp query parameter through here.
-              return OK(std::move(body), stringify(contentType));
-            }
-
-            default:
-              return NotAcceptable("Request must accept json or protobuf");
-          }
-        }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetState(
-    const Master* master,
-    const Owned<ObjectApprovers>& approvers)
-{
-  // Jsonify the following message:
-  //
-  //   mesos::master::Response::GetState getState;
-  //   *getState.mutable_get_tasks() = _getTasks(approvers);
-  //   *getState.mutable_get_executors() = _getExecutors(approvers);
-  //   *getState.mutable_get_frameworks() = _getFrameworks(approvers);
-  //   *getState.mutable_get_agents() = _getAgents(approvers);
-
-  // TODO(bmahler): This copies the Owned object approvers.
-  return [=](JSON::ObjectWriter* writer) {
-    const google::protobuf::Descriptor* descriptor =
-      v1::master::Response::GetState::descriptor();
-
-    int field;
-
-    field = v1::master::Response::GetState::kGetTasksFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        jsonifyGetTasks(master, approvers));
-
-    field = v1::master::Response::GetState::kGetExecutorsFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        jsonifyGetExecutors(master, approvers));
-
-    field = v1::master::Response::GetState::kGetFrameworksFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        jsonifyGetFrameworks(master, approvers));
-
-    field = v1::master::Response::GetState::kGetAgentsFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        jsonifyGetAgents(master, approvers));
-  };
-}
-
-
-string Master::Http::serializeGetState(
-    const Owned<ObjectApprovers>& approvers) const
-{
-  // Serialize the following message:
-  //
-  //   mesos::master::Response::GetState getState;
-  //   *getState.mutable_get_tasks() = _getTasks(approvers);
-  //   *getState.mutable_get_executors() = _getExecutors(approvers);
-  //   *getState.mutable_get_frameworks() = _getFrameworks(approvers);
-  //   *getState.mutable_get_agents() = _getAgents(approvers);
-
-  string output;
-  google::protobuf::io::StringOutputStream stream(&output);
-  google::protobuf::io::CodedOutputStream writer(&stream);
-
-  WireFormatLite::WriteBytes(
-      mesos::v1::master::Response::GetState::kGetTasksFieldNumber,
-      serializeGetTasks(approvers),
-      &writer);
-
-  WireFormatLite::WriteBytes(
-      mesos::v1::master::Response::GetState::kGetExecutorsFieldNumber,
-      serializeGetExecutors(approvers),
-      &writer);
-
-  WireFormatLite::WriteBytes(
-      mesos::v1::master::Response::GetState::kGetFrameworksFieldNumber,
-      serializeGetFrameworks(approvers),
-      &writer);
-
-  WireFormatLite::WriteBytes(
-      mesos::v1::master::Response::GetState::kGetAgentsFieldNumber,
-      serializeGetAgents(approvers),
-      &writer);
-
-  // While an explicit Trim() isn't necessary (since the coded
-  // output stream is destructed before the string is returned),
-  // it's a quite tricky bug to diagnose if Trim() is missed, so
-  // we always do it explicitly to signal the reader about this
-  // subtlety.
-  writer.Trim();
-
-  return output;
+          master->self(),
+          [this, principal, outputContentType](
+              const Owned<ObjectApprovers>& approvers) {
+            return deferBatchedRequest(
+                &Master::ReadOnlyHandler::getState,
+                principal,
+                outputContentType,
+                {},
+                approvers);
+          }));
 }
 
 
@@ -2896,6 +2367,7 @@
           return deferBatchedRequest(
               &Master::ReadOnlyHandler::slaves,
               principal,
+              ContentType::JSON,
               request.url.query,
               approvers);
         }));
@@ -2905,204 +2377,22 @@
 Future<Response> Master::Http::getAgents(
     const mesos::master::Call& call,
     const Option<Principal>& principal,
-    ContentType contentType) const
+    ContentType outputContentType) const
 {
   CHECK_EQ(mesos::master::Call::GET_AGENTS, call.type());
 
   return ObjectApprovers::create(master->authorizer, principal, {VIEW_ROLE})
     .then(defer(
-        master->self(),
-        [=](const Owned<ObjectApprovers>& approvers) -> Response {
-          // Serialize the following message:
-          //
-          //   mesos::master::Response response;
-          //   response.set_type(mesos::master::Response::GET_AGENTS);
-          //   *response.mutable_get_agents() = _getAgents(approvers);
-
-          switch (contentType) {
-            case ContentType::PROTOBUF: {
-              string output;
-              google::protobuf::io::StringOutputStream stream(&output);
-              google::protobuf::io::CodedOutputStream writer(&stream);
-
-              WireFormatLite::WriteEnum(
-                  mesos::v1::master::Response::kTypeFieldNumber,
-                  mesos::v1::master::Response::GET_AGENTS,
-                  &writer);
-
-              WireFormatLite::WriteBytes(
-                  mesos::v1::master::Response::kGetAgentsFieldNumber,
-                  serializeGetAgents(approvers),
-                  &writer);
-
-              // We must manually trim the unused buffer space since
-              // we use the string before the coded output stream is
-              // destructed.
-              writer.Trim();
-
-              return OK(std::move(output), stringify(contentType));
-            }
-
-            case ContentType::JSON: {
-              string body = jsonify([&](JSON::ObjectWriter* writer) {
-                const google::protobuf::Descriptor* descriptor =
-                  v1::master::Response::descriptor();
-
-                int field;
-
-                field = v1::master::Response::kTypeFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    v1::master::Response::Type_Name(
-                        v1::master::Response::GET_AGENTS));
-
-                field = v1::master::Response::kGetAgentsFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    jsonifyGetAgents(master, approvers));
-              });
-
-              // TODO(bmahler): Pass jsonp query parameter through here.
-              return OK(std::move(body), stringify(contentType));
-            }
-
-            default:
-              return NotAcceptable("Request must accept json or protobuf");
-          }
-    }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetAgents(
-    const Master* master,
-    const Owned<ObjectApprovers>& approvers)
-{
-  // Serialize the following:
-  //
-  //   mesos::master::Response::GetAgents getAgents;
-  //   for each registered agent:
-  //     *getAgents.add_agents() = protobuf::master::event::createAgentResponse(
-  //         agent,
-  //         master->slaves.draining.get(slave->id),
-  //         master->slaves.deactivated.contains(slave->id),
-  //         approvers);
-  //   for each recovered agent:
-  //     SlaveInfo* agent = getAgents.add_recovered_agents();
-  //     agent->CopyFrom(slaveInfo);
-  //     agent->clear_resources();
-  //     foreach (const Resource& resource, slaveInfo.resources()):
-  //       if (approvers->approved<VIEW_ROLE>(resource)):
-  //         *agent->add_resources() = resource;
-
-  // TODO(bmahler): This copies the Owned object approvers.
-  return [=](JSON::ObjectWriter* writer) {
-    const google::protobuf::Descriptor* descriptor =
-      v1::master::Response::GetAgents::descriptor();
-
-    int field;
-
-    field = v1::master::Response::GetAgents::kAgentsFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        [&](JSON::ArrayWriter* writer) {
-      foreachvalue (const Slave* slave, master->slaves.registered) {
-        // TODO(bmahler): Consider not constructing the temporary
-        // agent object and instead serialize directly.
-        mesos::master::Response::GetAgents::Agent agent =
-            protobuf::master::event::createAgentResponse(
-                *slave,
-                master->slaves.draining.get(slave->id),
-                master->slaves.deactivated.contains(slave->id),
+          master->self(),
+          [this, principal, outputContentType](
+              const Owned<ObjectApprovers>& approvers) {
+            return deferBatchedRequest(
+                &Master::ReadOnlyHandler::getAgents,
+                principal,
+                outputContentType,
+                {},
                 approvers);
-
-        writer->element(asV1Protobuf(agent));
-      }
-    });
-
-    field = v1::master::Response::GetAgents::kRecoveredAgentsFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        [&](JSON::ArrayWriter* writer) {
-      foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
-        // TODO(bmahler): Consider not constructing the temporary
-        // SlaveInfo object and instead serialize directly.
-        SlaveInfo agent = slaveInfo;
-        agent.clear_resources();
-        foreach (const Resource& resource, slaveInfo.resources()) {
-          if (approvers->approved<VIEW_ROLE>(resource)) {
-            *agent.add_resources() = resource;
-          }
-        }
-
-        writer->element(asV1Protobuf(agent));
-      }
-    });
-  };
-}
-
-
-string Master::Http::serializeGetAgents(
-    const Owned<ObjectApprovers>& approvers) const
-{
-  // Serialize the following:
-  //
-  //   mesos::master::Response::GetAgents getAgents;
-  //   for each registered agent:
-  //     *getAgents.add_agents() = protobuf::master::event::createAgentResponse(
-  //         agent,
-  //         master->slaves.draining.get(slave->id),
-  //         master->slaves.deactivated.contains(slave->id),
-  //         approvers);
-  //   for each recovered agent:
-  //     SlaveInfo* agent = getAgents.add_recovered_agents();
-  //     agent->CopyFrom(slaveInfo);
-  //     agent->clear_resources();
-  //     foreach (const Resource& resource, slaveInfo.resources()):
-  //       if (approvers->approved<VIEW_ROLE>(resource)):
-  //         *agent->add_resources() = resource;
-
-  string output;
-  google::protobuf::io::StringOutputStream stream(&output);
-  google::protobuf::io::CodedOutputStream writer(&stream);
-
-  foreachvalue (const Slave* slave, master->slaves.registered) {
-    // TODO(bmahler): Consider not constructing the temporary
-    // agent object and instead serialize directly.
-    WireFormatLite2::WriteMessageWithoutCachedSizes(
-        mesos::master::Response::GetAgents::kAgentsFieldNumber,
-        protobuf::master::event::createAgentResponse(
-            *slave,
-            master->slaves.draining.get(slave->id),
-            master->slaves.deactivated.contains(slave->id),
-            approvers),
-        &writer);
-  }
-
-  foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
-    // TODO(bmahler): Consider not constructing the temporary
-    // SlaveInfo object and instead serialize directly.
-    SlaveInfo agent = slaveInfo;
-    agent.clear_resources();
-    foreach (const Resource& resource, slaveInfo.resources()) {
-      if (approvers->approved<VIEW_ROLE>(resource)) {
-        *agent.add_resources() = resource;
-      }
-    }
-
-    WireFormatLite2::WriteMessageWithoutCachedSizes(
-        mesos::master::Response::GetAgents::kRecoveredAgentsFieldNumber,
-        agent,
-        &writer);
-  }
-
-  // While an explicit Trim() isn't necessary (since the coded
-  // output stream is destructed before the string is returned),
-  // it's a quite tricky bug to diagnose if Trim() is missed, so
-  // we always do it explicitly to signal the reader about this
-  // subtlety.
-  writer.Trim();
-
-  return output;
+          }));
 }
 
 
@@ -3392,6 +2682,7 @@
           return deferBatchedRequest(
               &Master::ReadOnlyHandler::state,
               principal,
+              ContentType::JSON,
               request.url.query,
               approvers);
         }));
@@ -3401,13 +2692,14 @@
 Future<Response> Master::Http::deferBatchedRequest(
     ReadOnlyRequestHandler handler,
     const Option<Principal>& principal,
+    ContentType outputContentType,
     const hashmap<std::string, std::string>& queryParameters,
     const Owned<ObjectApprovers>& approvers) const
 {
   bool scheduleBatch = batchedRequests.empty();
 
   auto it = std::find_if(batchedRequests.begin(), batchedRequests.end(),
-      [handler, &principal, &queryParameters](
+      [handler, &principal, &queryParameters, &outputContentType](
           const BatchedRequest& batchedRequest) {
         // NOTE: This is not a general-purpose request comparison, but
         // specific to the batched requests which are always members of
@@ -3415,6 +2707,7 @@
         // on query parameters and the current master state.
         return handler == batchedRequest.handler &&
                principal == batchedRequest.principal &&
+               outputContentType == batchedRequest.outputContentType &&
                queryParameters == batchedRequest.queryParameters;
       });
 
@@ -3434,6 +2727,7 @@
     future = promise.future();
     batchedRequests.push_back(BatchedRequest{
         handler,
+        outputContentType,
         queryParameters,
         principal,
         approvers,
@@ -3466,11 +2760,16 @@
   foreach (BatchedRequest& request, batchedRequests) {
     request.promise.associate(process::async(
         [this](ReadOnlyRequestHandler handler,
+               ContentType outputContentType,
                const hashmap<std::string, std::string>& queryParameters,
                const process::Owned<ObjectApprovers>& approvers) {
-          return (readonlyHandler.*handler)(queryParameters, approvers);
+          return (readonlyHandler.*handler)(
+              outputContentType,
+              queryParameters,
+              approvers);
         },
         request.handler,
+        request.outputContentType,
         request.queryParameters,
         request.approvers));
   }
@@ -3597,6 +2896,7 @@
           return deferBatchedRequest(
               &Master::ReadOnlyHandler::stateSummary,
               principal,
+              ContentType::JSON,
               request.url.query,
               approvers);
         }));
@@ -3650,6 +2950,7 @@
             return deferBatchedRequest(
                 &Master::ReadOnlyHandler::roles,
                 principal,
+                ContentType::JSON,
                 request.url.query,
                 approvers);
           }));
@@ -3711,64 +3012,22 @@
 Future<Response> Master::Http::getRoles(
     const mesos::master::Call& call,
     const Option<Principal>& principal,
-    ContentType contentType) const
+    ContentType outputContentType) const
 {
   CHECK_EQ(mesos::master::Call::GET_ROLES, call.type());
+
   return ObjectApprovers::create(master->authorizer, principal, {VIEW_ROLE})
-    .then(defer(master->self(),
-        [this, contentType](const Owned<ObjectApprovers>& approvers)
-          -> Response {
-      const vector<string> knownRoles = master->knownRoles();
-
-      mesos::master::Response response;
-      response.set_type(mesos::master::Response::GET_ROLES);
-
-      mesos::master::Response::GetRoles* getRoles =
-        response.mutable_get_roles();
-
-      foreach (const string& name, knownRoles) {
-        if (!approvers->approved<VIEW_ROLE>(name)) {
-          continue;
-        }
-
-        mesos::Role* role = getRoles->add_roles();
-
-        role->set_name(name);
-
-        role->set_weight(master->weights.get(name).getOrElse(DEFAULT_WEIGHT));
-
-        RoleResourceBreakdown resourceBreakdown(master, name);
-
-        ResourceQuantities allocatedAndOffered =
-          resourceBreakdown.allocated() + resourceBreakdown.offered();
-
-        // `resources` will be deprecated in favor of
-        // `offered`, `allocated`, `reserved`, and quota consumption.
-        // As a result, we don't bother trying to expose more
-        // than {cpus, mem, disk, gpus} since we don't know if
-        // anything outside this set is of type SCALAR.
-        foreach (const auto& quantity, allocatedAndOffered) {
-          if (quantity.first == "cpus" || quantity.first == "mem" ||
-              quantity.first == "disk" || quantity.first == "gpus") {
-            Resource* resource = role->add_resources();
-            resource->set_name(quantity.first);
-            resource->set_type(Value::SCALAR);
-            *resource->mutable_scalar() = quantity.second;
-          }
-        }
-
-        Option<Role*> role_ = master->roles.get(name);
-
-        if (role_.isSome()) {
-          foreachkey (const FrameworkID& frameworkId, (*role_)->frameworks) {
-            *role->add_frameworks() = frameworkId;
-          }
-        }
-      }
-
-      return OK(serialize(contentType, evolve(response)),
-                stringify(contentType));
-    }));
+    .then(defer(
+          master->self(),
+          [this, principal, outputContentType](
+              const Owned<ObjectApprovers>& approvers) {
+            return deferBatchedRequest(
+                &Master::ReadOnlyHandler::getRoles,
+                principal,
+                outputContentType,
+                {},
+                approvers);
+          }));
 }
 
 
@@ -3913,66 +3172,22 @@
 Future<Response> Master::Http::getOperations(
     const mesos::master::Call& call,
     const Option<Principal>& principal,
-    ContentType contentType) const
+    ContentType outputContentType) const
 {
   CHECK_EQ(mesos::master::Call::GET_OPERATIONS, call.type());
 
   return ObjectApprovers::create(master->authorizer, principal, {VIEW_ROLE})
     .then(defer(
-        master->self(),
-        [=](const Owned<ObjectApprovers>& approvers) -> Response {
-          // We consider a principal to be authorized to view an operation if it
-          // is authorized to view the resources the operation is performed on.
-          auto approved = [&approvers](const Operation& operation) {
-            Try<Resources> consumedResources =
-              protobuf::getConsumedResources(operation.info());
-
-            if (consumedResources.isError()) {
-              LOG(WARNING)
-                << "Could not approve operation " << operation.uuid()
-                << " since its consumed resources could not be determined:"
-                << consumedResources.error();
-
-              return false;
-            }
-
-            foreach (const Resource& resource, consumedResources.get()) {
-              if (!approvers->approved<VIEW_ROLE>(resource)) {
-                return false;
-              }
-            }
-
-            return true;
-          };
-
-          mesos::master::Response response;
-          response.set_type(mesos::master::Response::GET_OPERATIONS);
-
-          mesos::master::Response::GetOperations* operations =
-            response.mutable_get_operations();
-
-          foreachvalue (const Slave* slave, master->slaves.registered) {
-            foreachvalue (Operation* operation, slave->operations) {
-              if (approved(*operation)) {
-                operations->add_operations()->CopyFrom(*operation);
-              }
-            }
-
-            foreachvalue (
-                const Slave::ResourceProvider resourceProvider,
-                slave->resourceProviders) {
-              foreachvalue (Operation* operation, resourceProvider.operations) {
-                if (approved(*operation)) {
-                  operations->add_operations()->CopyFrom(*operation);
-                }
-              }
-            }
-          }
-
-          return OK(
-              serialize(contentType, evolve(response)),
-              stringify(contentType));
-        }));
+          master->self(),
+          [this, principal, outputContentType](
+             const Owned<ObjectApprovers>& approvers) {
+            return deferBatchedRequest(
+                &Master::ReadOnlyHandler::getOperations,
+                principal,
+                outputContentType,
+                {},
+                approvers);
+          }));
 }
 
 
@@ -4043,6 +3258,7 @@
           return deferBatchedRequest(
               &Master::ReadOnlyHandler::tasks,
               principal,
+              ContentType::JSON,
               request.url.query,
               approvers);
         }));
@@ -4052,7 +3268,7 @@
 Future<Response> Master::Http::getTasks(
     const mesos::master::Call& call,
     const Option<Principal>& principal,
-    ContentType contentType) const
+    ContentType outputContentType) const
 {
   CHECK_EQ(mesos::master::Call::GET_TASKS, call.type());
 
@@ -4061,290 +3277,16 @@
       principal,
       {VIEW_FRAMEWORK, VIEW_TASK})
     .then(defer(
-        master->self(),
-        [=](const Owned<ObjectApprovers>& approvers) -> Response {
-          // Serialize the following message:
-          //
-          //    mesos::master::Response response;
-          //    response.set_type(mesos::master::Response::GET_TASKS);
-          //    *response.mutable_get_tasks() = _getTasks(approvers);
-
-          switch (contentType) {
-            case ContentType::PROTOBUF: {
-              string output;
-              google::protobuf::io::StringOutputStream stream(&output);
-              google::protobuf::io::CodedOutputStream writer(&stream);
-
-              WireFormatLite::WriteEnum(
-                  mesos::v1::master::Response::kTypeFieldNumber,
-                  mesos::v1::master::Response::GET_TASKS,
-                  &writer);
-
-              WireFormatLite::WriteBytes(
-                  mesos::v1::master::Response::kGetTasksFieldNumber,
-                  serializeGetTasks(approvers),
-                  &writer);
-
-              // We must manually trim the unused buffer space since
-              // we use the string before the coded output stream is
-              // destructed.
-              writer.Trim();
-
-              return OK(std::move(output), stringify(contentType));
-            }
-
-            case ContentType::JSON: {
-              string body = jsonify([&](JSON::ObjectWriter* writer) {
-                const google::protobuf::Descriptor* descriptor =
-                  v1::master::Response::descriptor();
-
-                int field;
-
-                field = v1::master::Response::kTypeFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    v1::master::Response::Type_Name(
-                        v1::master::Response::GET_TASKS));
-
-                field = v1::master::Response::kGetTasksFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    jsonifyGetTasks(master, approvers));
-              });
-
-              // TODO(bmahler): Pass jsonp query parameter through here.
-              return OK(std::move(body), stringify(contentType));
-            }
-
-            default:
-              return NotAcceptable("Request must accept json or protobuf");
-          }
-    }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetTasks(
-    const Master* master,
-    const Owned<ObjectApprovers>& approvers)
-{
-  // Jsonify the following message:
-  //
-  //  master::Response::GetTasks getTasks;
-  //  for each pending task:
-  //    *getTasks.add_pending_tasks() =
-  //      protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
-  //  for each task:
-  //    *getTasks.add_tasks() = *task;
-  //  for each unreachable task:
-  //    *getTasks.add_unreachable_tasks() = *task;
-  //  for each completed task:
-  //    *getTasks.add_completed_tasks() = *task;
-
-  // TODO(bmahler): This copies the Owned object approvers.
-  return [=](JSON::ObjectWriter* writer) {
-    // Construct framework list with both active and completed frameworks.
-    vector<const Framework*> frameworks;
-    foreachvalue (Framework* framework, master->frameworks.registered) {
-      // Skip unauthorized frameworks.
-      if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-        frameworks.push_back(framework);
-      }
-    }
-    foreachvalue (const Owned<Framework>& framework,
-                  master->frameworks.completed) {
-      // Skip unauthorized frameworks.
-      if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-        frameworks.push_back(framework.get());
-      }
-    }
-
-    const google::protobuf::Descriptor* descriptor =
-      v1::master::Response::GetTasks::descriptor();
-
-    int field;
-
-    // Pending tasks.
-    field = v1::master::Response::GetTasks::kPendingTasksFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        [&](JSON::ArrayWriter* writer) {
-          foreach (const Framework* framework, frameworks) {
-            foreachvalue (const TaskInfo& t, framework->pendingTasks) {
-              // Skip unauthorized tasks.
-              if (!approvers->approved<VIEW_TASK>(t, framework->info)) {
-                continue;
-              }
-
-              Task task =
-                  protobuf::createTask(t, TASK_STAGING, framework->id());
-
-              writer->element(asV1Protobuf(task));
-            }
-          }
-        });
-
-    // Active tasks.
-    field = v1::master::Response::GetTasks::kTasksFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        [&](JSON::ArrayWriter* writer) {
-          foreach (const Framework* framework, frameworks) {
-            foreachvalue (Task* task, framework->tasks) {
-              CHECK_NOTNULL(task);
-              // Skip unauthorized tasks.
-              if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
-                continue;
-              }
-
-              writer->element(asV1Protobuf(*task));
-            }
-          }
-        });
-
-    // Unreachable tasks.
-    field = v1::master::Response::GetTasks::kUnreachableTasksFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        [&](JSON::ArrayWriter* writer) {
-          foreach (const Framework* framework, frameworks) {
-            foreachvalue (const Owned<Task>& task,
-                          framework->unreachableTasks) {
-              // Skip unauthorized tasks.
-              if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
-                continue;
-              }
-
-              writer->element(asV1Protobuf(*task));
-            }
-          }
-        });
-
-    // Completed tasks.
-    field = v1::master::Response::GetTasks::kCompletedTasksFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        [&](JSON::ArrayWriter* writer) {
-          foreach (const Framework* framework, frameworks) {
-            foreach (const Owned<Task>& task, framework->completedTasks) {
-              // Skip unauthorized tasks.
-              if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
-                continue;
-              }
-
-              writer->element(asV1Protobuf(*task));
-            }
-          }
-        });
-  };
-}
-
-
-string Master::Http::serializeGetTasks(
-    const Owned<ObjectApprovers>& approvers) const
-{
-  // Construct framework list with both active and completed frameworks.
-  vector<const Framework*> frameworks;
-  foreachvalue (Framework* framework, master->frameworks.registered) {
-    // Skip unauthorized frameworks.
-    if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-      frameworks.push_back(framework);
-    }
-  }
-  foreachvalue (const Owned<Framework>& framework,
-                master->frameworks.completed) {
-    // Skip unauthorized frameworks.
-    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-      frameworks.push_back(framework.get());
-    }
-  }
-
-  // Serialize the following message:
-  //
-  //  mesos::master::Response::GetTasks getTasks;
-  //  for each pending task:
-  //    *getTasks.add_pending_tasks() =
-  //      protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
-  //  for each task:
-  //    *getTasks.add_tasks() = *task;
-  //  for each unreachable task:
-  //    *getTasks.add_unreachable_tasks() = *task;
-  //  for each completed task:
-  //    *getTasks.add_completed_tasks() = *task;
-
-  string output;
-  google::protobuf::io::StringOutputStream stream(&output);
-  google::protobuf::io::CodedOutputStream writer(&stream);
-
-  foreach (const Framework* framework, frameworks) {
-    // Pending tasks.
-    foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
-      // Skip unauthorized tasks.
-      if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
-        continue;
-      }
-
-      // TODO(bmahler): Consider not constructing the temporary task
-      // object and instead serialize directly. Since we don't expect
-      // a large number of pending tasks, we currently don't bother
-      // with the more efficient approach.
-      //
-      // *getTasks.add_pending_tasks() =
-      //   protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
-      WireFormatLite2::WriteMessageWithoutCachedSizes(
-          mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber,
-          protobuf::createTask(taskInfo, TASK_STAGING, framework->id()),
-          &writer);
-    }
-
-    // Active tasks.
-    foreachvalue (Task* task, framework->tasks) {
-      CHECK_NOTNULL(task);
-      // Skip unauthorized tasks.
-      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
-        continue;
-      }
-
-      WireFormatLite2::WriteMessageWithoutCachedSizes(
-          mesos::v1::master::Response::GetTasks::kTasksFieldNumber,
-          *task,
-          &writer);
-    }
-
-    // Unreachable tasks.
-    foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
-      // Skip unauthorized tasks.
-      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
-        continue;
-      }
-
-      WireFormatLite2::WriteMessageWithoutCachedSizes(
-          mesos::v1::master::Response::GetTasks::kUnreachableTasksFieldNumber,
-          *task,
-          &writer);
-    }
-
-    // Completed tasks.
-    foreach (const Owned<Task>& task, framework->completedTasks) {
-      // Skip unauthorized tasks.
-      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
-        continue;
-      }
-
-      WireFormatLite2::WriteMessageWithoutCachedSizes(
-          mesos::v1::master::Response::GetTasks::kCompletedTasksFieldNumber,
-          *task,
-          &writer);
-    }
-  }
-
-  // While an explicit Trim() isn't necessary (since the coded
-  // output stream is destructed before the string is returned),
-  // it's a quite tricky bug to diagnose if Trim() is missed, so
-  // we always do it explicitly to signal the reader about this
-  // subtlety.
-  writer.Trim();
-
-  return output;
+          master->self(),
+          [this, principal, outputContentType](
+             const Owned<ObjectApprovers>& approvers) {
+            return deferBatchedRequest(
+                &Master::ReadOnlyHandler::getTasks,
+                principal,
+                outputContentType,
+                {},
+                approvers);
+          }));
 }
 
 
diff --git a/src/master/master.hpp b/src/master/master.hpp
index c9d417c..3074918 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1309,34 +1309,117 @@
 
     // /frameworks
     process::http::Response frameworks(
+        ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // /roles
     process::http::Response roles(
+        ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // /slaves
     process::http::Response slaves(
+        ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // /state
     process::http::Response state(
+        ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // /state-summary
     process::http::Response stateSummary(
+        ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // /tasks
     process::http::Response tasks(
+        ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
+    // master::Call::GET_STATE
+    process::http::Response getState(
+        ContentType outputContentType,
+        const hashmap<std::string, std::string>& queryParameters,
+        const process::Owned<ObjectApprovers>& approvers) const;
+
+    // master::Call::GET_AGENTS
+    process::http::Response getAgents(
+        ContentType outputContentType,
+        const hashmap<std::string, std::string>& queryParameters,
+        const process::Owned<ObjectApprovers>& approvers) const;
+
+    // master::Call::GET_FRAMEWORKS
+    process::http::Response getFrameworks(
+        ContentType outputContentType,
+        const hashmap<std::string, std::string>& queryParameters,
+        const process::Owned<ObjectApprovers>& approvers) const;
+
+    // master::Call::GET_EXECUTORS
+    process::http::Response getExecutors(
+        ContentType outputContentType,
+        const hashmap<std::string, std::string>& queryParameters,
+        const process::Owned<ObjectApprovers>& approvers) const;
+
+    // master::Call::GET_OPERATIONS
+    process::http::Response getOperations(
+        ContentType outputContentType,
+        const hashmap<std::string, std::string>& queryParameters,
+        const process::Owned<ObjectApprovers>& approvers) const;
+
+    // master::Call::GET_TASKS
+    process::http::Response getTasks(
+        ContentType outputContentType,
+        const hashmap<std::string, std::string>& queryParameters,
+        const process::Owned<ObjectApprovers>& approvers) const;
+
+    // master::Call::GET_ROLES
+    process::http::Response getRoles(
+        ContentType outputContentType,
+        const hashmap<std::string, std::string>& queryParameters,
+        const process::Owned<ObjectApprovers>& approvers) const;
+
+    // TODO(bmahler): These could just live in the .cpp file,
+    // however they are shared with SUBSCRIBE which currently
+    // is not implemented as a read only handler here. Make these
+    // private or only in the .cpp file once SUBSCRIBE is moved
+    // into readonly_handler.cpp.
+    std::string serializeGetState(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::string serializeGetAgents(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::string serializeGetFrameworks(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::string serializeGetExecutors(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::string serializeGetOperations(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::string serializeGetTasks(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::string serializeGetRoles(
+        const process::Owned<ObjectApprovers>& approvers) const;
+
+    std::function<void(JSON::ObjectWriter*)> jsonifyGetState(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::function<void(JSON::ObjectWriter*)> jsonifyGetAgents(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::function<void(JSON::ObjectWriter*)> jsonifyGetFrameworks(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::function<void(JSON::ObjectWriter*)> jsonifyGetExecutors(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::function<void(JSON::ObjectWriter*)> jsonifyGetOperations(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks(
+        const process::Owned<ObjectApprovers>& approvers) const;
+    std::function<void(JSON::ObjectWriter*)> jsonifyGetRoles(
+        const process::Owned<ObjectApprovers>& approvers) const;
+
   private:
     const Master* master;
   };
@@ -1631,12 +1714,6 @@
         const mesos::master::Call& call,
         const Option<process::http::authentication::Principal>& principal,
         ContentType contentType) const;
-
-    static std::function<void(JSON::ObjectWriter*)> jsonifyGetAgents(
-        const Master* master,
-        const process::Owned<ObjectApprovers>& approvers);
-    std::string serializeGetAgents(
-        const process::Owned<ObjectApprovers>& approvers) const;
     mesos::master::Response::GetAgents _getAgents(
         const process::Owned<ObjectApprovers>& approvers) const;
 
@@ -1734,12 +1811,6 @@
         const mesos::master::Call& call,
         const Option<process::http::authentication::Principal>& principal,
         ContentType contentType) const;
-
-    static std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks(
-        const Master* master,
-        const process::Owned<ObjectApprovers>& approvers);
-    std::string serializeGetTasks(
-        const process::Owned<ObjectApprovers>& approvers) const;
     mesos::master::Response::GetTasks _getTasks(
         const process::Owned<ObjectApprovers>& approvers) const;
 
@@ -1777,12 +1848,6 @@
         const mesos::master::Call& call,
         const Option<process::http::authentication::Principal>& principal,
         ContentType contentType) const;
-
-    static std::function<void(JSON::ObjectWriter*)> jsonifyGetFrameworks(
-        const Master* master,
-        const process::Owned<ObjectApprovers>& approvers);
-    std::string serializeGetFrameworks(
-        const process::Owned<ObjectApprovers>& approvers) const;
     mesos::master::Response::GetFrameworks _getFrameworks(
         const process::Owned<ObjectApprovers>& approvers) const;
 
@@ -1790,12 +1855,6 @@
         const mesos::master::Call& call,
         const Option<process::http::authentication::Principal>& principal,
         ContentType contentType) const;
-
-    static std::function<void(JSON::ObjectWriter*)> jsonifyGetExecutors(
-        const Master* master,
-        const process::Owned<ObjectApprovers>& approvers);
-    std::string serializeGetExecutors(
-        const process::Owned<ObjectApprovers>& approvers) const;
     mesos::master::Response::GetExecutors _getExecutors(
         const process::Owned<ObjectApprovers>& approvers) const;
 
@@ -1803,12 +1862,6 @@
         const mesos::master::Call& call,
         const Option<process::http::authentication::Principal>& principal,
         ContentType contentType) const;
-
-    static std::function<void(JSON::ObjectWriter*)> jsonifyGetState(
-        const Master* master,
-        const process::Owned<ObjectApprovers>& approvers);
-    std::string serializeGetState(
-        const process::Owned<ObjectApprovers>& approvers) const;
     mesos::master::Response::GetState _getState(
         const process::Owned<ObjectApprovers>& approvers) const;
 
@@ -1864,12 +1917,14 @@
 
     typedef process::http::Response
       (Master::ReadOnlyHandler::*ReadOnlyRequestHandler)(
+          ContentType,
           const hashmap<std::string, std::string>&,
           const process::Owned<ObjectApprovers>&) const;
 
     process::Future<process::http::Response> deferBatchedRequest(
         ReadOnlyRequestHandler handler,
         const Option<process::http::authentication::Principal>& principal,
+        ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
@@ -1878,6 +1933,7 @@
     struct BatchedRequest
     {
       ReadOnlyRequestHandler handler;
+      ContentType outputContentType;
       hashmap<std::string, std::string> queryParameters;
       Option<process::http::authentication::Principal> principal;
       process::Owned<ObjectApprovers> approvers;
@@ -2685,6 +2741,11 @@
   hashmap<FrameworkID, Framework*> frameworks;
 };
 
+
+mesos::master::Response::GetFrameworks::Framework model(
+    const Framework& framework);
+
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index 500ce3f..fbe748d 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -19,6 +19,12 @@
 #include <string>
 #include <vector>
 
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/wire_format_lite.h>
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
 #include <mesos/mesos.hpp>
 
 #include <mesos/authorizer/authorizer.hpp>
@@ -36,8 +42,11 @@
 #include "common/build.hpp"
 #include "common/http.hpp"
 
+using google::protobuf::internal::WireFormatLite;
+
 using process::Owned;
 
+using process::http::NotAcceptable;
 using process::http::OK;
 
 using mesos::authorization::VIEW_EXECUTOR;
@@ -46,6 +55,9 @@
 using mesos::authorization::VIEW_ROLE;
 using mesos::authorization::VIEW_TASK;
 
+using mesos::internal::protobuf::WireFormatLite2;
+
+using std::function;
 using std::vector;
 using std::string;
 
@@ -661,9 +673,12 @@
 
 
 process::http::Response Master::ReadOnlyHandler::frameworks(
+    ContentType outputContentType,
     const hashmap<std::string, std::string>& query,
     const process::Owned<ObjectApprovers>& approvers) const
 {
+  CHECK_EQ(outputContentType, ContentType::JSON);
+
   IDAcceptor<FrameworkID> selectFrameworkId(
       query.get("framework_id"));
 
@@ -719,9 +734,12 @@
 
 
 process::http::Response Master::ReadOnlyHandler::roles(
+    ContentType outputContentType,
     const hashmap<std::string, std::string>& query,
     const process::Owned<ObjectApprovers>& approvers) const
 {
+  CHECK_EQ(outputContentType, ContentType::JSON);
+
   const Master* master = this->master;
 
   const vector<string> knownRoles = master->knownRoles();
@@ -796,9 +814,12 @@
 
 
 process::http::Response Master::ReadOnlyHandler::slaves(
+    ContentType outputContentType,
     const hashmap<std::string, std::string>& query,
     const process::Owned<ObjectApprovers>& approvers) const
 {
+  CHECK_EQ(outputContentType, ContentType::JSON);
+
   IDAcceptor<SlaveID> selectSlaveId(query.get("slave_id"));
 
   return process::http::OK(
@@ -808,9 +829,12 @@
 
 
 process::http::Response Master::ReadOnlyHandler::state(
+    ContentType outputContentType,
     const hashmap<std::string, std::string>& query,
     const process::Owned<ObjectApprovers>& approvers) const
 {
+  CHECK_EQ(outputContentType, ContentType::JSON);
+
   const Master* master = this->master;
   auto calculateState = [master, &approvers](JSON::ObjectWriter* writer) {
     writer->field("version", MESOS_VERSION);
@@ -954,9 +978,12 @@
 
 
 process::http::Response Master::ReadOnlyHandler::stateSummary(
+    ContentType outputContentType,
     const hashmap<std::string, std::string>& query,
     const process::Owned<ObjectApprovers>& approvers) const
 {
+  CHECK_EQ(outputContentType, ContentType::JSON);
+
   const Master* master = this->master;
   auto stateSummary = [master, &approvers](JSON::ObjectWriter* writer) {
     writer->field("hostname", master->info().hostname());
@@ -1150,9 +1177,12 @@
 
 
 process::http::Response Master::ReadOnlyHandler::tasks(
-  const hashmap<std::string, std::string>& query,
-  const process::Owned<ObjectApprovers>& approvers) const
+    ContentType outputContentType,
+    const hashmap<std::string, std::string>& query,
+    const process::Owned<ObjectApprovers>& approvers) const
 {
+  CHECK_EQ(outputContentType, ContentType::JSON);
+
   // Get list options (limit and offset).
   Result<int> result = numify<int>(query.get("limit"));
   size_t limit = result.isSome() ? result.get() : TASK_LIMIT;
@@ -1257,6 +1287,1182 @@
   return OK(jsonify(tasksWriter), query.get("jsonp"));
 }
 
+
+function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetAgents(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following:
+  //
+  //   mesos::master::Response::GetAgents getAgents;
+  //   for each registered agent:
+  //     *getAgents.add_agents() = protobuf::master::event::createAgentResponse(
+  //         agent,
+  //         master->slaves.draining.get(slave->id),
+  //         master->slaves.deactivated.contains(slave->id),
+  //         approvers);
+  //   for each recovered agent:
+  //     SlaveInfo* agent = getAgents.add_recovered_agents();
+  //     agent->CopyFrom(slaveInfo);
+  //     agent->clear_resources();
+  //     foreach (const Resource& resource, slaveInfo.resources()):
+  //       if (approvers->approved<VIEW_ROLE>(resource)):
+  //         *agent->add_resources() = resource;
+
+  // TODO(bmahler): This copies the Owned object approvers.
+  return [=](JSON::ObjectWriter* writer) {
+    const google::protobuf::Descriptor* descriptor =
+      v1::master::Response::GetAgents::descriptor();
+
+    int field;
+
+    field = v1::master::Response::GetAgents::kAgentsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+      foreachvalue (const Slave* slave, master->slaves.registered) {
+        // TODO(bmahler): Consider not constructing the temporary
+        // agent object and instead serialize directly.
+        mesos::master::Response::GetAgents::Agent agent =
+            protobuf::master::event::createAgentResponse(
+                *slave,
+                master->slaves.draining.get(slave->id),
+                master->slaves.deactivated.contains(slave->id),
+                approvers);
+
+        writer->element(asV1Protobuf(agent));
+      }
+    });
+
+    field = v1::master::Response::GetAgents::kRecoveredAgentsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+      foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
+        // TODO(bmahler): Consider not constructing the temporary
+        // SlaveInfo object and instead serialize directly.
+        SlaveInfo agent = slaveInfo;
+        agent.clear_resources();
+        foreach (const Resource& resource, slaveInfo.resources()) {
+          if (approvers->approved<VIEW_ROLE>(resource)) {
+            *agent.add_resources() = resource;
+          }
+        }
+
+        writer->element(asV1Protobuf(agent));
+      }
+    });
+  };
+}
+
+
+string Master::ReadOnlyHandler::serializeGetAgents(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following:
+  //
+  //   mesos::master::Response::GetAgents getAgents;
+  //   for each registered agent:
+  //     *getAgents.add_agents() = protobuf::master::event::createAgentResponse(
+  //         agent,
+  //         master->slaves.draining.get(slave->id),
+  //         master->slaves.deactivated.contains(slave->id),
+  //         approvers);
+  //   for each recovered agent:
+  //     SlaveInfo* agent = getAgents.add_recovered_agents();
+  //     agent->CopyFrom(slaveInfo);
+  //     agent->clear_resources();
+  //     foreach (const Resource& resource, slaveInfo.resources()):
+  //       if (approvers->approved<VIEW_ROLE>(resource)):
+  //         *agent->add_resources() = resource;
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  foreachvalue (const Slave* slave, master->slaves.registered) {
+    // TODO(bmahler): Consider not constructing the temporary
+    // agent object and instead serialize directly.
+    WireFormatLite2::WriteMessageWithoutCachedSizes(
+        mesos::master::Response::GetAgents::kAgentsFieldNumber,
+        protobuf::master::event::createAgentResponse(
+            *slave,
+            master->slaves.draining.get(slave->id),
+            master->slaves.deactivated.contains(slave->id),
+            approvers),
+        &writer);
+  }
+
+  foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
+    // TODO(bmahler): Consider not constructing the temporary
+    // SlaveInfo object and instead serialize directly.
+    SlaveInfo agent = slaveInfo;
+    agent.clear_resources();
+    foreach (const Resource& resource, slaveInfo.resources()) {
+      if (approvers->approved<VIEW_ROLE>(resource)) {
+        *agent.add_resources() = resource;
+      }
+    }
+
+    WireFormatLite2::WriteMessageWithoutCachedSizes(
+        mesos::master::Response::GetAgents::kRecoveredAgentsFieldNumber,
+        agent,
+        &writer);
+  }
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
+}
+
+
+
+process::http::Response Master::ReadOnlyHandler::getAgents(
+    ContentType outputContentType,
+    const hashmap<std::string, std::string>& query,
+    const process::Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following message:
+  //
+  //   mesos::master::Response response;
+  //   response.set_type(mesos::master::Response::GET_AGENTS);
+  //   *response.mutable_get_agents() = _getAgents(approvers);
+
+  switch (outputContentType) {
+    case ContentType::PROTOBUF: {
+      string output;
+      google::protobuf::io::StringOutputStream stream(&output);
+      google::protobuf::io::CodedOutputStream writer(&stream);
+
+      WireFormatLite::WriteEnum(
+          mesos::v1::master::Response::kTypeFieldNumber,
+          mesos::v1::master::Response::GET_AGENTS,
+          &writer);
+
+      WireFormatLite::WriteBytes(
+          mesos::v1::master::Response::kGetAgentsFieldNumber,
+          serializeGetAgents(approvers),
+          &writer);
+
+      // We must manually trim the unused buffer space since
+      // we use the string before the coded output stream is
+      // destructed.
+      writer.Trim();
+
+      return OK(std::move(output), stringify(outputContentType));
+    }
+
+    case ContentType::JSON: {
+      string body = jsonify([&](JSON::ObjectWriter* writer) {
+        const google::protobuf::Descriptor* descriptor =
+          v1::master::Response::descriptor();
+
+        int field;
+
+        field = v1::master::Response::kTypeFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            v1::master::Response::Type_Name(
+                v1::master::Response::GET_AGENTS));
+
+        field = v1::master::Response::kGetAgentsFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            jsonifyGetAgents(approvers));
+      });
+
+      // TODO(bmahler): Pass jsonp query parameter through here.
+      return OK(std::move(body), stringify(outputContentType));
+    }
+
+    default:
+      return NotAcceptable("Request must accept json or protobuf");
+  }
+}
+
+
+function<void(JSON::ObjectWriter*)>
+  Master::ReadOnlyHandler::jsonifyGetFrameworks(
+      const Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following:
+  //
+  //   mesos::master::Response::GetFrameworks getFrameworks;
+  //   for each framework:
+  //     *getFrameworks.add_frameworks() = model(*framework);
+  //   for each completed framework:
+  //     *getFrameworks.add_completed_frameworks() = model(*framework);
+
+  // TODO(bmahler): Consider not constructing the temporary framework
+  // objects and instead serialize directly, but since we don't
+  // expect a large number of pending tasks, we currently don't
+  // bother with the more efficient approach.
+
+  // TODO(bmahler): This copies the Owned object approvers.
+  return [=](JSON::ObjectWriter* writer) {
+    const google::protobuf::Descriptor* descriptor =
+      v1::master::Response::GetFrameworks::descriptor();
+
+    int field;
+
+    field = v1::master::Response::GetFrameworks::kFrameworksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+      foreachvalue (const Framework* framework,
+                    master->frameworks.registered) {
+        // Skip unauthorized frameworks.
+        if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+          continue;
+        }
+
+        mesos::master::Response::GetFrameworks::Framework f = model(*framework);
+        writer->element(asV1Protobuf(f));
+      }
+    });
+
+    field =
+      v1::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+      foreachvalue (const Owned<Framework>& framework,
+                    master->frameworks.completed) {
+        // Skip unauthorized frameworks.
+        if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+          continue;
+        }
+
+        mesos::master::Response::GetFrameworks::Framework f = model(*framework);
+        writer->element(asV1Protobuf(f));
+      }
+    });
+  };
+}
+
+
+string Master::ReadOnlyHandler::serializeGetFrameworks(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following:
+  //
+  //   mesos::master::Response::GetFrameworks getFrameworks;
+  //   for each framework:
+  //     *getFrameworks.add_frameworks() = model(*framework);
+  //   for each completed framework:
+  //     *getFrameworks.add_completed_frameworks() = model(*framework);
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  // TODO(bmahler): Consider not constructing the temporary framework
+  // objects and instead serialize directly, but since we don't
+  // expect a large number of pending tasks, we currently don't
+  // bother with the more efficient approach.
+
+  foreachvalue (const Framework* framework,
+                master->frameworks.registered) {
+    // Skip unauthorized frameworks.
+    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+      continue;
+    }
+
+    WireFormatLite2::WriteMessageWithoutCachedSizes(
+        mesos::master::Response::GetFrameworks::kFrameworksFieldNumber,
+        model(*framework),
+        &writer);
+  }
+
+  foreachvalue (const Owned<Framework>& framework,
+                master->frameworks.completed) {
+    // Skip unauthorized frameworks.
+    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+      continue;
+    }
+
+    WireFormatLite2::WriteMessageWithoutCachedSizes(
+        mesos::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber,
+        model(*framework),
+        &writer);
+  }
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getFrameworks(
+    ContentType outputContentType,
+    const hashmap<std::string, std::string>& query,
+    const process::Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following message:
+  //
+  //   mesos::master::Response response;
+  //   response.set_type(mesos::master::Response::GET_FRAMEWORKS);
+  //   *response.mutable_get_frameworks() = _getFrameworks(approvers);
+
+  switch (outputContentType) {
+    case ContentType::PROTOBUF: {
+      string output;
+      google::protobuf::io::StringOutputStream stream(&output);
+      google::protobuf::io::CodedOutputStream writer(&stream);
+
+      WireFormatLite::WriteEnum(
+          mesos::v1::master::Response::kTypeFieldNumber,
+          mesos::v1::master::Response::GET_FRAMEWORKS,
+          &writer);
+
+      WireFormatLite::WriteBytes(
+          mesos::v1::master::Response::kGetFrameworksFieldNumber,
+          serializeGetFrameworks(approvers),
+          &writer);
+
+      // We must manually trim the unused buffer space since
+      // we use the string before the coded output stream is
+      // destructed.
+      writer.Trim();
+
+      return OK(std::move(output), stringify(outputContentType));
+    }
+
+    case ContentType::JSON: {
+      string body = jsonify([&](JSON::ObjectWriter* writer) {
+        const google::protobuf::Descriptor* descriptor =
+          v1::master::Response::descriptor();
+
+        int field;
+
+        field = v1::master::Response::kTypeFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            v1::master::Response::Type_Name(
+                v1::master::Response::GET_FRAMEWORKS));
+
+        field = v1::master::Response::kGetFrameworksFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            jsonifyGetFrameworks(approvers));
+      });
+
+      // TODO(bmahler): Pass jsonp query parameter through here.
+      return OK(std::move(body), stringify(outputContentType));
+    }
+
+    default:
+      return NotAcceptable("Request must accept json or protobuf");
+  }
+}
+
+
+function<void(JSON::ObjectWriter*)>
+  Master::ReadOnlyHandler::jsonifyGetExecutors(
+      const Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following:
+  //
+  //   mesos::master::Response::GetExecutors getExecutors;
+  //
+  //   for each (executor, agent):
+  //     mesos::master::Response::GetExecutors::Executor* executor =
+  //       getExecutors.add_executors();
+  //     *executor->mutable_executor_info() = executorInfo;
+  //     *executor->mutable_slave_id() = slaveId;
+
+  // TODO(bmahler): This copies the owned object approvers.
+  return [=](JSON::ObjectWriter* writer) {
+    // Construct framework list with both active and completed frameworks.
+    vector<const Framework*> frameworks;
+    foreachvalue (Framework* framework, master->frameworks.registered) {
+      // Skip unauthorized frameworks.
+      if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+        frameworks.push_back(framework);
+      }
+    }
+    foreachvalue (const Owned<Framework>& framework,
+                  master->frameworks.completed) {
+      // Skip unauthorized frameworks.
+      if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+        frameworks.push_back(framework.get());
+      }
+    }
+
+    const google::protobuf::Descriptor* descriptor =
+      v1::master::Response::GetExecutors::descriptor();
+
+    int field;
+
+    field = v1::master::Response::GetExecutors::kExecutorsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+      foreach (const Framework* framework, frameworks) {
+        foreachpair (const SlaveID& slaveId,
+                     const auto& executorsMap,
+                     framework->executors) {
+          foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
+            // Skip unauthorized executors.
+            if (!approvers->approved<VIEW_EXECUTOR>(
+                    executorInfo, framework->info)) {
+              continue;
+            }
+
+            writer->element([&](JSON::ObjectWriter* writer) {
+              const google::protobuf::Descriptor* descriptor =
+                v1::master::Response::GetExecutors::Executor::descriptor();
+
+              // Serialize the following message:
+              //
+              //   mesos::master::Response::GetExecutors::Executor executor;
+              //   *executor.mutable_executor_info() = executorInfo;
+              //   *executor.mutable_slave_id() = slaveId;
+              int field;
+
+              field = v1::master::Response::GetExecutors::Executor
+                ::kExecutorInfoFieldNumber;
+              writer->field(
+                  descriptor->FindFieldByNumber(field)->name(),
+                  asV1Protobuf(executorInfo));
+
+              field = v1::master::Response::GetExecutors::Executor
+                ::kAgentIdFieldNumber;
+              writer->field(
+                  descriptor->FindFieldByNumber(field)->name(),
+                  asV1Protobuf(slaveId));
+            });
+          }
+        }
+      }
+    });
+  };
+}
+
+
+string Master::ReadOnlyHandler::serializeGetExecutors(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Construct framework list with both active and completed frameworks.
+  vector<const Framework*> frameworks;
+  foreachvalue (Framework* framework, master->frameworks.registered) {
+    // Skip unauthorized frameworks.
+    if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+      frameworks.push_back(framework);
+    }
+  }
+  foreachvalue (const Owned<Framework>& framework,
+                master->frameworks.completed) {
+    // Skip unauthorized frameworks.
+    if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+      frameworks.push_back(framework.get());
+    }
+  }
+
+  // Lambda for serializing the following message:
+  //
+  //   mesos::master::Response::GetExecutors::Executor executor;
+  //   *executor.mutable_executor_info() = executorInfo;
+  //   *executor.mutable_slave_id() = slaveId;
+  auto serializeExecutor = [](const ExecutorInfo& e, const SlaveID& s) {
+    string output;
+    google::protobuf::io::StringOutputStream stream(&output);
+    google::protobuf::io::CodedOutputStream writer(&stream);
+
+    WireFormatLite2::WriteMessageWithoutCachedSizes(
+        mesos::v1::master::Response::GetExecutors::Executor
+          ::kExecutorInfoFieldNumber,
+        e,
+        &writer);
+
+    WireFormatLite2::WriteMessageWithoutCachedSizes(
+        mesos::v1::master::Response::GetExecutors::Executor
+          ::kAgentIdFieldNumber,
+        s,
+        &writer);
+
+    // While an explicit Trim() isn't necessary (since the coded
+    // output stream is destructed before the string is returned),
+    // it's a quite tricky bug to diagnose if Trim() is missed, so
+    // we always do it explicitly to signal the reader about this
+    // subtlety.
+    writer.Trim();
+
+    return output;
+  };
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  // Serialize the following:
+  //
+  //   mesos::master::Response::GetExecutors getExecutors;
+  //
+  //   for each (executor, agent):
+  //     mesos::master::Response::GetExecutors::Executor* executor =
+  //       getExecutors.add_executors();
+  //     *executor->mutable_executor_info() = executorInfo;
+  //     *executor->mutable_slave_id() = slaveId;
+
+  foreach (const Framework* framework, frameworks) {
+    foreachpair (const SlaveID& slaveId,
+                 const auto& executorsMap,
+                 framework->executors) {
+      foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
+        // Skip unauthorized executors.
+        if (!approvers->approved<VIEW_EXECUTOR>(
+                executorInfo, framework->info)) {
+          continue;
+        }
+
+        WireFormatLite::WriteBytes(
+            mesos::v1::master::Response::GetExecutors::kExecutorsFieldNumber,
+            serializeExecutor(executorInfo, slaveId),
+            &writer);
+      }
+    }
+  }
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getExecutors(
+    ContentType outputContentType,
+    const hashmap<std::string, std::string>& query,
+    const process::Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following message:
+  //
+  //   mesos::master::Response response;
+  //   response.set_type(mesos::master::Response::GET_EXECUTORS);
+  //   *response.mutable_get_executors() = _getExecutors(approvers);
+
+  switch (outputContentType) {
+    case ContentType::PROTOBUF: {
+      string output;
+      google::protobuf::io::StringOutputStream stream(&output);
+      google::protobuf::io::CodedOutputStream writer(&stream);
+
+      WireFormatLite::WriteEnum(
+          mesos::v1::master::Response::kTypeFieldNumber,
+          mesos::v1::master::Response::GET_EXECUTORS,
+          &writer);
+
+      WireFormatLite::WriteBytes(
+          mesos::v1::master::Response::kGetExecutorsFieldNumber,
+          serializeGetExecutors(approvers),
+          &writer);
+
+      // We must manually trim the unused buffer space since
+      // we use the string before the coded output stream is
+      // destructed.
+      writer.Trim();
+
+      return OK(std::move(output), stringify(outputContentType));
+    }
+
+    case ContentType::JSON: {
+      string body = jsonify([&](JSON::ObjectWriter* writer) {
+        const google::protobuf::Descriptor* descriptor =
+          v1::master::Response::descriptor();
+
+        int field;
+
+        field = v1::master::Response::kTypeFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            v1::master::Response::Type_Name(
+                v1::master::Response::GET_EXECUTORS));
+
+        field = v1::master::Response::kGetExecutorsFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            jsonifyGetExecutors(approvers));
+      });
+
+      // TODO(bmahler): Pass jsonp query parameter through here.
+      return OK(std::move(body), stringify(outputContentType));
+    }
+
+    default:
+      return NotAcceptable("Request must accept json or protobuf");
+  }
+}
+
+
+function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetTasks(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Jsonify the following message:
+  //
+  //  master::Response::GetTasks getTasks;
+  //  for each pending task:
+  //    *getTasks.add_pending_tasks() =
+  //      protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+  //  for each task:
+  //    *getTasks.add_tasks() = *task;
+  //  for each unreachable task:
+  //    *getTasks.add_unreachable_tasks() = *task;
+  //  for each completed task:
+  //    *getTasks.add_completed_tasks() = *task;
+
+  // TODO(bmahler): This copies the Owned object approvers.
+  return [=](JSON::ObjectWriter* writer) {
+    // Construct framework list with both active and completed frameworks.
+    vector<const Framework*> frameworks;
+    foreachvalue (Framework* framework, master->frameworks.registered) {
+      // Skip unauthorized frameworks.
+      if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+        frameworks.push_back(framework);
+      }
+    }
+    foreachvalue (const Owned<Framework>& framework,
+                  master->frameworks.completed) {
+      // Skip unauthorized frameworks.
+      if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+        frameworks.push_back(framework.get());
+      }
+    }
+
+    const google::protobuf::Descriptor* descriptor =
+      v1::master::Response::GetTasks::descriptor();
+
+    int field;
+
+    // Pending tasks.
+    field = v1::master::Response::GetTasks::kPendingTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* framework, frameworks) {
+            foreachvalue (const TaskInfo& t, framework->pendingTasks) {
+              // Skip unauthorized tasks.
+              if (!approvers->approved<VIEW_TASK>(t, framework->info)) {
+                continue;
+              }
+
+              Task task =
+                  protobuf::createTask(t, TASK_STAGING, framework->id());
+
+              writer->element(asV1Protobuf(task));
+            }
+          }
+        });
+
+    // Active tasks.
+    field = v1::master::Response::GetTasks::kTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* framework, frameworks) {
+            foreachvalue (Task* task, framework->tasks) {
+              CHECK_NOTNULL(task);
+              // Skip unauthorized tasks.
+              if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+                continue;
+              }
+
+              writer->element(asV1Protobuf(*task));
+            }
+          }
+        });
+
+    // Unreachable tasks.
+    field = v1::master::Response::GetTasks::kUnreachableTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* framework, frameworks) {
+            foreachvalue (const Owned<Task>& task,
+                          framework->unreachableTasks) {
+              // Skip unauthorized tasks.
+              if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+                continue;
+              }
+
+              writer->element(asV1Protobuf(*task));
+            }
+          }
+        });
+
+    // Completed tasks.
+    field = v1::master::Response::GetTasks::kCompletedTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* framework, frameworks) {
+            foreach (const Owned<Task>& task, framework->completedTasks) {
+              // Skip unauthorized tasks.
+              if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+                continue;
+              }
+
+              writer->element(asV1Protobuf(*task));
+            }
+          }
+        });
+  };
+}
+
+
+string Master::ReadOnlyHandler::serializeGetTasks(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Construct framework list with both active and completed frameworks.
+  vector<const Framework*> frameworks;
+  foreachvalue (Framework* framework, master->frameworks.registered) {
+    // Skip unauthorized frameworks.
+    if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+      frameworks.push_back(framework);
+    }
+  }
+  foreachvalue (const Owned<Framework>& framework,
+                master->frameworks.completed) {
+    // Skip unauthorized frameworks.
+    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+      frameworks.push_back(framework.get());
+    }
+  }
+
+  // Serialize the following message:
+  //
+  //  mesos::master::Response::GetTasks getTasks;
+  //  for each pending task:
+  //    *getTasks.add_pending_tasks() =
+  //      protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+  //  for each task:
+  //    *getTasks.add_tasks() = *task;
+  //  for each unreachable task:
+  //    *getTasks.add_unreachable_tasks() = *task;
+  //  for each completed task:
+  //    *getTasks.add_completed_tasks() = *task;
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  foreach (const Framework* framework, frameworks) {
+    // Pending tasks.
+    foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
+      // Skip unauthorized tasks.
+      if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
+        continue;
+      }
+
+      // TODO(bmahler): Consider not constructing the temporary task
+      // object and instead serialize directly. Since we don't expect
+      // a large number of pending tasks, we currently don't bother
+      // with the more efficient approach.
+      //
+      // *getTasks.add_pending_tasks() =
+      //   protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+      WireFormatLite2::WriteMessageWithoutCachedSizes(
+          mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber,
+          protobuf::createTask(taskInfo, TASK_STAGING, framework->id()),
+          &writer);
+    }
+
+    // Active tasks.
+    foreachvalue (Task* task, framework->tasks) {
+      CHECK_NOTNULL(task);
+      // Skip unauthorized tasks.
+      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+        continue;
+      }
+
+      WireFormatLite2::WriteMessageWithoutCachedSizes(
+          mesos::v1::master::Response::GetTasks::kTasksFieldNumber,
+          *task,
+          &writer);
+    }
+
+    // Unreachable tasks.
+    foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
+      // Skip unauthorized tasks.
+      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+        continue;
+      }
+
+      WireFormatLite2::WriteMessageWithoutCachedSizes(
+          mesos::v1::master::Response::GetTasks::kUnreachableTasksFieldNumber,
+          *task,
+          &writer);
+    }
+
+    // Completed tasks.
+    foreach (const Owned<Task>& task, framework->completedTasks) {
+      // Skip unauthorized tasks.
+      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+        continue;
+      }
+
+      WireFormatLite2::WriteMessageWithoutCachedSizes(
+          mesos::v1::master::Response::GetTasks::kCompletedTasksFieldNumber,
+          *task,
+          &writer);
+    }
+  }
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getTasks(
+    ContentType outputContentType,
+    const hashmap<std::string, std::string>& query,
+    const process::Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following message:
+  //
+  //    mesos::master::Response response;
+  //    response.set_type(mesos::master::Response::GET_TASKS);
+  //    *response.mutable_get_tasks() = _getTasks(approvers);
+
+  switch (outputContentType) {
+    case ContentType::PROTOBUF: {
+      string output;
+      google::protobuf::io::StringOutputStream stream(&output);
+      google::protobuf::io::CodedOutputStream writer(&stream);
+
+      WireFormatLite::WriteEnum(
+          mesos::v1::master::Response::kTypeFieldNumber,
+          mesos::v1::master::Response::GET_TASKS,
+          &writer);
+
+      WireFormatLite::WriteBytes(
+          mesos::v1::master::Response::kGetTasksFieldNumber,
+          serializeGetTasks(approvers),
+          &writer);
+
+      // We must manually trim the unused buffer space since
+      // we use the string before the coded output stream is
+      // destructed.
+      writer.Trim();
+
+      return OK(std::move(output), stringify(outputContentType));
+    }
+
+    case ContentType::JSON: {
+      string body = jsonify([&](JSON::ObjectWriter* writer) {
+        const google::protobuf::Descriptor* descriptor =
+          v1::master::Response::descriptor();
+
+        int field;
+
+        field = v1::master::Response::kTypeFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            v1::master::Response::Type_Name(
+                v1::master::Response::GET_TASKS));
+
+        field = v1::master::Response::kGetTasksFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            jsonifyGetTasks(approvers));
+      });
+
+      // TODO(bmahler): Pass jsonp query parameter through here.
+      return OK(std::move(body), stringify(outputContentType));
+    }
+
+    default:
+      return NotAcceptable("Request must accept json or protobuf");
+  }
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getOperations(
+    ContentType outputContentType,
+    const hashmap<std::string, std::string>& query,
+    const process::Owned<ObjectApprovers>& approvers) const
+{
+  // We consider a principal to be authorized to view an operation if it
+  // is authorized to view the resources the operation is performed on.
+  auto approved = [&approvers](const Operation& operation) {
+    Try<Resources> consumedResources =
+      protobuf::getConsumedResources(operation.info());
+
+    if (consumedResources.isError()) {
+      LOG(WARNING)
+        << "Could not approve operation " << operation.uuid()
+        << " since its consumed resources could not be determined:"
+        << consumedResources.error();
+
+      return false;
+    }
+
+    foreach (const Resource& resource, consumedResources.get()) {
+      if (!approvers->approved<VIEW_ROLE>(resource)) {
+        return false;
+      }
+    }
+
+    return true;
+  };
+
+  mesos::master::Response response;
+  response.set_type(mesos::master::Response::GET_OPERATIONS);
+
+  mesos::master::Response::GetOperations* operations =
+    response.mutable_get_operations();
+
+  foreachvalue (const Slave* slave, master->slaves.registered) {
+    foreachvalue (Operation* operation, slave->operations) {
+      if (approved(*operation)) {
+        operations->add_operations()->CopyFrom(*operation);
+      }
+    }
+
+    foreachvalue (
+        const Slave::ResourceProvider resourceProvider,
+        slave->resourceProviders) {
+      foreachvalue (Operation* operation, resourceProvider.operations) {
+        if (approved(*operation)) {
+          operations->add_operations()->CopyFrom(*operation);
+        }
+      }
+    }
+  }
+
+  return OK(
+      serialize(outputContentType, evolve(response)),
+      stringify(outputContentType));
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getRoles(
+    ContentType outputContentType,
+    const hashmap<std::string, std::string>& query,
+    const process::Owned<ObjectApprovers>& approvers) const
+{
+  const vector<string> knownRoles = master->knownRoles();
+
+  mesos::master::Response response;
+  response.set_type(mesos::master::Response::GET_ROLES);
+
+  mesos::master::Response::GetRoles* getRoles =
+    response.mutable_get_roles();
+
+  foreach (const string& name, knownRoles) {
+    if (!approvers->approved<VIEW_ROLE>(name)) {
+      continue;
+    }
+
+    mesos::Role* role = getRoles->add_roles();
+
+    role->set_name(name);
+
+    role->set_weight(master->weights.get(name).getOrElse(DEFAULT_WEIGHT));
+
+    RoleResourceBreakdown resourceBreakdown(master, name);
+
+    ResourceQuantities allocatedAndOffered =
+      resourceBreakdown.allocated() + resourceBreakdown.offered();
+
+    // `resources` will be deprecated in favor of
+    // `offered`, `allocated`, `reserved`, and quota consumption.
+    // As a result, we don't bother trying to expose more
+    // than {cpus, mem, disk, gpus} since we don't know if
+    // anything outside this set is of type SCALAR.
+    foreach (const auto& quantity, allocatedAndOffered) {
+      if (quantity.first == "cpus" || quantity.first == "mem" ||
+          quantity.first == "disk" || quantity.first == "gpus") {
+        Resource* resource = role->add_resources();
+        resource->set_name(quantity.first);
+        resource->set_type(Value::SCALAR);
+        *resource->mutable_scalar() = quantity.second;
+      }
+    }
+
+    Option<Role*> role_ = master->roles.get(name);
+
+    if (role_.isSome()) {
+      foreachkey (const FrameworkID& frameworkId, (*role_)->frameworks) {
+        *role->add_frameworks() = frameworkId;
+      }
+    }
+  }
+
+  return OK(serialize(outputContentType, evolve(response)),
+            stringify(outputContentType));
+}
+
+
+function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetState(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Jsonify the following message:
+  //
+  //   mesos::master::Response::GetState getState;
+  //   *getState.mutable_get_tasks() = _getTasks(approvers);
+  //   *getState.mutable_get_executors() = _getExecutors(approvers);
+  //   *getState.mutable_get_frameworks() = _getFrameworks(approvers);
+  //   *getState.mutable_get_agents() = _getAgents(approvers);
+
+  // TODO(bmahler): This copies the Owned object approvers.
+  return [=](JSON::ObjectWriter* writer) {
+    const google::protobuf::Descriptor* descriptor =
+      v1::master::Response::GetState::descriptor();
+
+    int field;
+
+    field = v1::master::Response::GetState::kGetTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        jsonifyGetTasks(approvers));
+
+    field = v1::master::Response::GetState::kGetExecutorsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        jsonifyGetExecutors(approvers));
+
+    field = v1::master::Response::GetState::kGetFrameworksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        jsonifyGetFrameworks(approvers));
+
+    field = v1::master::Response::GetState::kGetAgentsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        jsonifyGetAgents(approvers));
+  };
+}
+
+
+string Master::ReadOnlyHandler::serializeGetState(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following message:
+  //
+  //   mesos::master::Response::GetState getState;
+  //   *getState.mutable_get_tasks() = _getTasks(approvers);
+  //   *getState.mutable_get_executors() = _getExecutors(approvers);
+  //   *getState.mutable_get_frameworks() = _getFrameworks(approvers);
+  //   *getState.mutable_get_agents() = _getAgents(approvers);
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  WireFormatLite::WriteBytes(
+      mesos::v1::master::Response::GetState::kGetTasksFieldNumber,
+      serializeGetTasks(approvers),
+      &writer);
+
+  WireFormatLite::WriteBytes(
+      mesos::v1::master::Response::GetState::kGetExecutorsFieldNumber,
+      serializeGetExecutors(approvers),
+      &writer);
+
+  WireFormatLite::WriteBytes(
+      mesos::v1::master::Response::GetState::kGetFrameworksFieldNumber,
+      serializeGetFrameworks(approvers),
+      &writer);
+
+  WireFormatLite::WriteBytes(
+      mesos::v1::master::Response::GetState::kGetAgentsFieldNumber,
+      serializeGetAgents(approvers),
+      &writer);
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
+}
+
+
+process::http::Response Master::ReadOnlyHandler::getState(
+    ContentType outputContentType,
+    const hashmap<std::string, std::string>& query,
+    const process::Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following message:
+  //
+  //   mesos::master::Response response;
+  //   response.set_type(mesos::master::Response::GET_STATE);
+  //   *response.mutable_get_state() = _getState(approvers);
+
+  switch (outputContentType) {
+    case ContentType::PROTOBUF: {
+      string output;
+      google::protobuf::io::StringOutputStream stream(&output);
+      google::protobuf::io::CodedOutputStream writer(&stream);
+
+      WireFormatLite::WriteEnum(
+          mesos::v1::master::Response::kTypeFieldNumber,
+          mesos::v1::master::Response::GET_STATE,
+          &writer);
+
+      WireFormatLite::WriteBytes(
+          mesos::v1::master::Response::kGetStateFieldNumber,
+          serializeGetState(approvers),
+          &writer);
+
+      // We must manually trim the unused buffer space since
+      // we use the string before the coded output stream is
+      // destructed.
+      writer.Trim();
+
+      return OK(std::move(output), stringify(outputContentType));
+    }
+
+    case ContentType::JSON: {
+      string body = jsonify([&](JSON::ObjectWriter* writer) {
+        const google::protobuf::Descriptor* descriptor =
+          v1::master::Response::descriptor();
+
+        int field;
+
+        field = v1::master::Response::kTypeFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            v1::master::Response::Type_Name(
+                v1::master::Response::GET_STATE));
+
+        field = v1::master::Response::kGetStateFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            jsonifyGetState(approvers));
+      });
+
+      // TODO(bmahler): Pass jsonp query parameter through here.
+      return OK(std::move(body), stringify(outputContentType));
+    }
+
+    default:
+      return NotAcceptable("Request must accept json or protobuf");
+  }
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/tests/master_load_tests.cpp b/src/tests/master_load_tests.cpp
index ac16fe7..6cee248 100644
--- a/src/tests/master_load_tests.cpp
+++ b/src/tests/master_load_tests.cpp
@@ -390,15 +390,20 @@
 
     Response reference;
     if (request.endpoint == "/state") {
-      reference = readOnlyHandler.state(queryParameters, approvers);
+      reference = readOnlyHandler.state(
+          ContentType::JSON, queryParameters, approvers);
     } else if (request.endpoint == "/state-summary") {
-      reference = readOnlyHandler.stateSummary(queryParameters, approvers);
+      reference = readOnlyHandler.stateSummary(
+          ContentType::JSON, queryParameters, approvers);
     } else if (request.endpoint == "/roles") {
-      reference = readOnlyHandler.roles(queryParameters, approvers);
+      reference = readOnlyHandler.roles(
+          ContentType::JSON, queryParameters, approvers);
     } else if (request.endpoint == "/frameworks") {
-      reference = readOnlyHandler.frameworks(queryParameters, approvers);
+      reference = readOnlyHandler.frameworks(
+          ContentType::JSON, queryParameters, approvers);
     } else if (request.endpoint == "/slaves") {
-      reference = readOnlyHandler.slaves(queryParameters, approvers);
+      reference = readOnlyHandler.slaves(
+          ContentType::JSON, queryParameters, approvers);
     } else {
       UNREACHABLE();
     }