[cgroups2] Introduces the MemoryControllerProcess.

Introduces the `MemoryControllerProcess`, the cgroups v2 memory
isolator, which will be used by the `Cgroups2IsolatorProcess`.

Unlike the `MemorySubsystemProcess`, the cgroups v1 memory isolator, we:

- Don't allow limits on swap memory to be set.
- Don't report memory pressure levels (this facility is no longer part of
  the cgroups memory controller's API)

Future work may include:

- Adding support for swap memory, and
- Reporting the (now available) memory pressure stall information

This patch updates the `ROOT_MemUsage` test so that it passes on a
cgroups v2 machine using the new `MemoryControllerProcess`.

This closes #581
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 84f423f..963d420 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -355,7 +355,8 @@
     linux/ebpf.cpp
     slave/containerizer/mesos/isolators/cgroups2/controller.cpp
     slave/containerizer/mesos/isolators/cgroups2/controllers/core.cpp
-    slave/containerizer/mesos/isolators/cgroups2/controllers/cpu.cpp)
+    slave/containerizer/mesos/isolators/cgroups2/controllers/cpu.cpp
+    slave/containerizer/mesos/isolators/cgroups2/controllers/memory.cpp)
 
 endif ()
 
diff --git a/src/Makefile.am b/src/Makefile.am
index 3677df5..779b893 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1503,7 +1503,9 @@
   slave/containerizer/mesos/isolators/cgroups2/controllers/core.cpp     \
   slave/containerizer/mesos/isolators/cgroups2/controllers/core.hpp     \
   slave/containerizer/mesos/isolators/cgroups2/controllers/cpu.cpp    \
-  slave/containerizer/mesos/isolators/cgroups2/controllers/cpu.hpp
+  slave/containerizer/mesos/isolators/cgroups2/controllers/cpu.hpp    \
+  slave/containerizer/mesos/isolators/cgroups2/controllers/memory.cpp    \
+  slave/containerizer/mesos/isolators/cgroups2/controllers/memory.hpp
 endif
 
 if ENABLE_SECCOMP_ISOLATOR
diff --git a/src/slave/containerizer/mesos/isolators/cgroups2/cgroups2.cpp b/src/slave/containerizer/mesos/isolators/cgroups2/cgroups2.cpp
index d8ed7f0..6fce8c9 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups2/cgroups2.cpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups2/cgroups2.cpp
@@ -20,6 +20,7 @@
 #include "slave/containerizer/mesos/isolators/cgroups2/cgroups2.hpp"
 #include "slave/containerizer/mesos/isolators/cgroups2/controllers/core.hpp"
 #include "slave/containerizer/mesos/isolators/cgroups2/controllers/cpu.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups2/controllers/memory.hpp"
 
 #include <set>
 #include <string>
@@ -75,7 +76,8 @@
 {
   hashmap<string, Try<Owned<ControllerProcess>>(*)(const Flags&)> creators = {
     {"core", &CoreControllerProcess::create},
-    {"cpu", &CpuControllerProcess::create}
+    {"cpu", &CpuControllerProcess::create},
+    {"mem", &MemoryControllerProcess::create}
   };
 
   hashmap<string, Owned<Controller>> controllers;
diff --git a/src/slave/containerizer/mesos/isolators/cgroups2/constants.hpp b/src/slave/containerizer/mesos/isolators/cgroups2/constants.hpp
index dafc7f9..9498a47 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups2/constants.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups2/constants.hpp
@@ -32,8 +32,12 @@
 const Duration CPU_CFS_PERIOD = Milliseconds(100); // Linux default.
 const Duration MIN_CPU_CFS_QUOTA = Milliseconds(1);
 
+// Memory controller constants. MIN_MEMORY is the floor for memory limits.
+const Bytes MIN_MEMORY = Megabytes(32);
+
 const std::string CGROUPS_V2_CONTROLLER_CORE_NAME = "core";
 const std::string CGROUPS_V2_CONTROLLER_CPU_NAME = "cpu";
+const std::string CGROUPS_V2_CONTROLLER_MEMORY_NAME = "memory";
 
 } // namespace slave {
 } // namespace internal {
diff --git a/src/slave/containerizer/mesos/isolators/cgroups2/controllers/memory.cpp b/src/slave/containerizer/mesos/isolators/cgroups2/controllers/memory.cpp
new file mode 100644
index 0000000..732b1c6
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups2/controllers/memory.cpp
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <sstream>
+
+#include <process/defer.hpp>
+#include <process/id.hpp>
+#include <process/pid.hpp>
+
+#include <stout/bytes.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "linux/cgroups2.hpp"
+
+#include "slave/containerizer/mesos/isolators/cgroups2/constants.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups2/controllers/memory.hpp"
+
+using process::Failure;
+using process::Future;
+using process::PID;
+using process::Owned;
+
+using cgroups2::memory::Stats;
+
+using mesos::slave::ContainerConfig;
+using mesos::slave::ContainerLimitation;
+
+using std::ostringstream;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+Try<Owned<ControllerProcess>> MemoryControllerProcess::create(const Flags& flags)
+{
+  return Owned<ControllerProcess>(new MemoryControllerProcess(flags));
+}
+
+
+MemoryControllerProcess::MemoryControllerProcess(const Flags& _flags)
+  : ProcessBase(process::ID::generate("cgroups-v2-memory-controller")),
+    ControllerProcess(_flags) {}
+
+
+string MemoryControllerProcess::name() const
+{
+  return CGROUPS_V2_CONTROLLER_MEMORY_NAME;
+}
+
+
+Future<Nothing> MemoryControllerProcess::prepare(
+    const ContainerID& containerId,
+    const string& cgroup,
+    const ContainerConfig& containerConfig)
+{
+  if (infos.contains(containerId)) {
+    return Failure("Already prepared");
+  }
+
+  infos.put(containerId, Info());
+
+  return Nothing();
+}
+
+
+Future<Nothing> MemoryControllerProcess::isolate(
+    const ContainerID& containerId,
+    const string& cgroup,
+    pid_t pid)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  // TODO(dleamy): Implement manual OOM score adjustment, similar to how it is
+  //               done in the cgroups v1 isolator.
+
+  return Nothing();
+}
+
+
+Future<Nothing> MemoryControllerProcess::recover(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  if (infos.contains(containerId)) {
+    return Failure("Already recovered");
+  }
+  // Mark the hard limit as already set so `update` will not lower it.
+  infos.put(containerId, Info());
+  infos[containerId].hardLimitUpdated = true;
+
+  return Nothing();
+}
+
+
+Future<Nothing> MemoryControllerProcess::update(
+    const ContainerID& containerId,
+    const string& cgroup,
+    const Resources& resourceRequests,
+    const google::protobuf::Map<string, Value::Scalar>& resourceLimits)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  if (resourceRequests.mem().isNone()) {
+    return Failure("No memory resources requested");
+  }
+
+  Bytes memory = *resourceRequests.mem();
+  Bytes softLimit = std::max(memory, MIN_MEMORY);
+
+  // Set the soft memory limit.
+  Try<Nothing> low = cgroups2::memory::set_low(cgroup, softLimit);
+  if (low.isError()) {
+    return Failure("Failed to set soft memory limit: " + low.error());
+  }
+
+  LOG(INFO) << "Updated soft memory limit to " << softLimit << " for container "
+            << containerId;
+
+  // Determine the new hard memory limit.
+  Option<Bytes> newHardLimit = [&resourceLimits, &softLimit]() -> Option<Bytes>
+  {
+    if (resourceLimits.count("mem") > 0) {
+      double requestedLimit = resourceLimits.at("mem").value();
+      if (std::isinf(requestedLimit)) {
+        return None();
+      }
+
+      return std::max(
+          Megabytes(static_cast<uint64_t>(requestedLimit)), MIN_MEMORY);
+    }
+
+    return softLimit;
+  }();
+
+  Result<Bytes> currentHardLimit = cgroups2::memory::max(cgroup);
+  if (currentHardLimit.isError()) {
+    return Failure("Failed to get current hard memory limit: "
+                   + currentHardLimit.error());
+  }
+
+  // We only update the hard limit if:
+  // 1) The hard limit has not yet been set for the container, or
+  // 2) The new hard limit is greater than the existing hard limit.
+  //
+  // This is done to avoid the chance of triggering an OOM by reducing the
+  // hard limit to below the current memory usage. `currentHardLimit` may be
+  // None (presumably no limit set), so it is only dereferenced when Some.
+  bool updateHardLimit = !infos[containerId].hardLimitUpdated
+    || newHardLimit.isNone() // infinite memory limit
+    || (currentHardLimit.isSome() && *newHardLimit > *currentHardLimit);
+
+  if (updateHardLimit) {
+    Try<Nothing> max = cgroups2::memory::set_max(cgroup, newHardLimit);
+    if (max.isError()) {
+      return Failure("Failed to set hard memory limit: " + max.error());
+    }
+
+    infos[containerId].hardLimitUpdated = true;
+  }
+
+  return Nothing();
+}
+
+
+Future<Nothing> MemoryControllerProcess::cleanup(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  if (!infos.contains(containerId)) {
+    LOG(INFO) << "Ignoring memory cleanup for unknown container "
+              << containerId;
+
+    return Nothing();
+  }
+
+  infos.erase(containerId);
+
+  return Nothing();
+}
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
diff --git a/src/slave/containerizer/mesos/isolators/cgroups2/controllers/memory.hpp b/src/slave/containerizer/mesos/isolators/cgroups2/controllers/memory.hpp
new file mode 100644
index 0000000..2e60b2c
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups2/controllers/memory.hpp
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __MEMORY_HPP__
+#define __MEMORY_HPP__
+
+#include <string>
+
+#include <process/future.hpp>
+
+#include <stout/hashmap.hpp>
+
+#include "slave/flags.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups2/controller.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class MemoryControllerProcess : public ControllerProcess
+{
+public:
+  static Try<process::Owned<ControllerProcess>> create(
+      const Flags& flags);
+
+  ~MemoryControllerProcess() override = default;
+
+  std::string name() const override;
+
+  process::Future<Nothing> prepare(
+      const ContainerID& containerId,
+      const std::string& cgroup,
+      const mesos::slave::ContainerConfig& containerConfig) override;
+
+  process::Future<Nothing> isolate(
+      const ContainerID& containerId,
+      const std::string& cgroup,
+      pid_t pid) override;
+
+  process::Future<Nothing> recover(
+      const ContainerID& containerId,
+      const std::string& cgroup) override;
+
+  process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const std::string& cgroup,
+      const Resources& resourceRequests,
+      const google::protobuf::Map<
+          std::string, Value::Scalar>& resourceLimits = {}) override;
+
+  process::Future<Nothing> cleanup(
+      const ContainerID& containerId,
+      const std::string& cgroup) override;
+
+private:
+  struct Info
+  {
+    // Whether the hard memory limit has already been set for the container.
+    // Recovered containers start with this set to true.
+    bool hardLimitUpdated = false;
+  };
+
+  MemoryControllerProcess(const Flags& flags);
+
+  hashmap<ContainerID, Info> infos;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MEMORY_HPP__