// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <process/collect.hpp>
#include <process/id.hpp>

#include <stout/os.hpp>

#include "slave/flags.hpp"
#include "slave/state.hpp"

#include "slave/containerizer/mesos/isolators/docker/volume/isolator.hpp"

namespace paths = mesos::internal::slave::docker::volume::paths;

using std::list;
using std::string;
using std::vector;

using process::Failure;
using process::Future;
using process::Owned;
using process::PID;

using mesos::internal::slave::docker::volume::DriverClient;

using mesos::slave::ContainerConfig;
using mesos::slave::ContainerLaunchInfo;
using mesos::slave::ContainerLimitation;
using mesos::slave::ContainerState;
using mesos::slave::Isolator;

namespace mesos {
namespace internal {
namespace slave {

DockerVolumeIsolatorProcess::DockerVolumeIsolatorProcess(
    const Flags& _flags,
    const string& _rootDir,
    const Owned<DriverClient>& _client)
  : ProcessBase(process::ID::generate("docker-volume-isolator")),
    flags(_flags),
    rootDir(_rootDir),
    client(_client) {}


DockerVolumeIsolatorProcess::~DockerVolumeIsolatorProcess() {}


Try<Isolator*> DockerVolumeIsolatorProcess::create(const Flags& flags)
{
  // Check for root permission.
  if (geteuid() != 0) {
    return Error("The 'docker/volume' isolator requires root permissions");
  }

  // TODO(gyliu513): Check dvdcli version, the version need to be
  // greater than or equal to 0.2.0.
  Option<string> dvdcli = os::which("dvdcli");
  if (dvdcli.isNone()) {
    return Error("The 'docker/volume' isolator cannot get dvdcli command");
  }

  VLOG(1) << "Found 'dvdcli' at '" << dvdcli.get() << "'";

  Try<Owned<DriverClient>> client = DriverClient::create(dvdcli.get());
  if (client.isError()) {
    return Error(
        "Unable to create docker volume driver client: " + client.error());
  }

  Try<Isolator*> isolator =
    DockerVolumeIsolatorProcess::_create(flags, client.get());

  if (isolator.isError()) {
    return Error(isolator.error());
  }

  return isolator.get();
}


Try<Isolator*> DockerVolumeIsolatorProcess::_create(
    const Flags& flags,
    const Owned<DriverClient>& client)
{
  // Create the docker volume information root directory if it does
  // not exist, this directory is used to checkpoint the docker
  // volumes used by containers.
  Try<Nothing> mkdir = os::mkdir(flags.docker_volume_checkpoint_dir);
  if (mkdir.isError()) {
    return Error(
        "Failed to create docker volume information root directory at '" +
        flags.docker_volume_checkpoint_dir + "': " + mkdir.error());
  }

  Result<string> rootDir = os::realpath(flags.docker_volume_checkpoint_dir);
  if (!rootDir.isSome()) {
    return Error(
        "Failed to determine canonical path of docker volume information root "
        "directory at '" + flags.docker_volume_checkpoint_dir + "': " +
        (rootDir.isError() ? rootDir.error() : "No such file or directory"));
  }

  VLOG(1) << "Initialized the docker volume information root directory at '"
          << rootDir.get() << "'";

  Owned<MesosIsolatorProcess> process(
      new DockerVolumeIsolatorProcess(
          flags,
          rootDir.get(),
          client));

  return new MesosIsolator(process);
}


Future<Nothing> DockerVolumeIsolatorProcess::recover(
    const list<ContainerState>& states,
    const hashset<ContainerID>& orphans)
{
  if (!os::exists(rootDir)) {
    VLOG(1) << "The checkpoint directory at '" << rootDir
            << "' does not exist. Skipping recovery.";

    return Nothing();
  }

  foreach (const ContainerState& state, states) {
    const ContainerID& containerId = state.container_id();

    Try<Nothing> recover = _recover(containerId);
    if (recover.isError()) {
      return Failure(
          "Failed to recover docker volumes for container " +
          stringify(containerId) + ": " + recover.error());
    }
  }

  Try<list<string>> entries = os::ls(rootDir);
  if (entries.isError()) {
    return Failure(
        "Unable to list docker volume checkpoint directory '" +
        rootDir + "': " + entries.error());
  }

  foreach (const string& entry, entries.get()) {
    ContainerID containerId;
    containerId.set_value(Path(entry).basename());

    if (infos.contains(containerId)) {
      continue;
    }

    // Recover docker volume information for orphan container.
    Try<Nothing> recover = _recover(containerId);
    if (recover.isError()) {
      return Failure(
          "Failed to recover docker volumes for orphan container " +
          stringify(containerId) + ": " + recover.error());
    }

    // Known orphan containers will be cleaned up by containerizer
    // using the normal cleanup path. See MESOS-2367 for details.
    if (orphans.contains(containerId)) {
      continue;
    }

    LOG(INFO) << "Cleanup volumes for unknown orphaned "
              << "container " << containerId;

    cleanup(containerId);
  }

  return Nothing();
}


Try<Nothing> DockerVolumeIsolatorProcess::_recover(
    const ContainerID& containerId)
{
  // NOTE: This method will add an 'Info' to 'infos' only if the
  // container was launched by the docker volume isolator.

  const string containerDir =
    paths::getContainerDir(rootDir, containerId.value());

  if (!os::exists(containerDir)) {
    // This may occur in the following cases:
    //   1. Executor has exited and the isolator has removed the
    //      container directory in '_cleanup()' but agent dies before
    //      noticing this.
    //   2. Agent dies before the isolator creates the container
    //      directory in 'prepare()'.
    // For the above cases, we do not need to do anything since there
    // is nothing to clean up after agent restarts.
    return Nothing();
  }

  const string volumesPath =
    paths::getVolumesPath(rootDir, containerId.value());

  if (!os::exists(volumesPath)) {
    // This could happen if the slave died after creating the container
    // directory but before it checkpointed anything in it.
    LOG(WARNING) << "The docker volumes checkpointed at '" << volumesPath
                 << "' for container " << containerId << " does not exist";

    // Construct an info object with empty docker volumes since no docker
    // volumes are mounted yet for this container, and this container will
    // be cleaned up by containerizer (as known orphan container) or by
    // `recover` (as unknown orphan container).
    infos.put(containerId, Owned<Info>(new Info(hashset<DockerVolume>())));

    return Nothing();
  }

  Try<string> read = os::read(volumesPath);
  if (read.isError()) {
    return Error(
        "Failed to read docker volumes checkpoint file '" +
        volumesPath + "': " + read.error());
  }

  if (read->empty()) {
    // This could happen if the slave is hard rebooted after the file is
    // created but before the data is synced on disk.
    LOG(WARNING) << "The docker volumes checkpointed at '" << volumesPath
                 << "' for container " << containerId << " is empty";

    // Construct an info object with empty docker volumes since no docker
    // volumes are mounted yet for this container, and this container will
    // be cleaned up by containerizer (as known orphan container) or by
    // `recover` (as unknown orphan container).
    infos.put(containerId, Owned<Info>(new Info(hashset<DockerVolume>())));

    return Nothing();
  }

  Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
  if (json.isError()) {
    return Error("JSON parse failed: " + json.error());
  }

  Try<DockerVolumes> parse = ::protobuf::parse<DockerVolumes>(json.get());
  if (parse.isError()) {
    return Error("Protobuf parse failed: " + parse.error());
  }

  hashset<DockerVolume> volumes;

  foreach (const DockerVolume& volume, parse->volumes()) {
    VLOG(1) << "Recovering docker volume with driver '"
            << volume.driver() << "' and name '" << volume.name()
            << "' for container " << containerId;

    if (volumes.contains(volume)) {
      return Error(
          "Duplicate docker volume with driver '" + volume.driver() + "' "
          "and name '" + volume.name() + "'");
    }

    volumes.insert(volume);
  }

  infos.put(containerId, Owned<Info>(new Info(volumes)));

  return Nothing();
}


Future<Option<ContainerLaunchInfo>> DockerVolumeIsolatorProcess::prepare(
    const ContainerID& containerId,
    const ContainerConfig& containerConfig)
{
  const ExecutorInfo& executorInfo = containerConfig.executor_info();

  if (!executorInfo.has_container()) {
    return None();
  }

  if (executorInfo.container().type() != ContainerInfo::MESOS) {
    return Failure(
        "Can only prepare docker volume driver for a MESOS container");
  }

  // The hashset is used to check if there are duplicated docker
  // volume for the same container.
  hashset<DockerVolume> volumes;

  // Represents mounts that will be sent to the driver client.
  struct Mount
  {
    DockerVolume volume;
    hashmap<string, string> options;
  };

  vector<Mount> mounts;

  // The mount points in the container.
  vector<string> targets;

  foreach (const Volume& _volume, executorInfo.container().volumes()) {
    if (!_volume.has_source()) {
      continue;
    }

    if (_volume.source().type() != Volume::Source::DOCKER_VOLUME) {
      VLOG(1) << "Ignored volume type '" << _volume.source().type()
              << "' for container " << containerId << " as only "
              << "'DOCKER_VOLUME' was supported by the docker "
              << "volume isolator";
      continue;
    }

    const string& name = _volume.source().docker_volume().name();

    if (!_volume.source().docker_volume().has_driver()) {
      return Failure("The volume driver is not specified for volume '" +
                     name + "' with container " + stringify(containerId));
    }

    const string& driver = _volume.source().docker_volume().driver();

    DockerVolume volume;
    volume.set_driver(driver);
    volume.set_name(name);

    if (volumes.contains(volume)) {
      return Failure(
          "Found duplicate docker volume with driver '" +
          driver + "' and name '" + name + "'");
    }

    // Determine driver options.
    hashmap<string, string> options;
    if (_volume.source().docker_volume().has_driver_options()) {
      foreach (const Parameter& parameter,
               _volume.source().docker_volume().driver_options().parameter()) {
        options[parameter.key()] = parameter.value();
      }
    }

    // Determine the target of the mount.
    string target;

    // The logic to determine a volume mount target is identical to
    // linux filesystem isolator, because docker volume isolator has
    // a dependency on that isolator, and it assumes that if the
    // container specifies a rootfs the sandbox is already bind
    // mounted into the container.
    if (path::absolute(_volume.container_path())) {
      // To specify a docker volume for a container, operators should
      // be allowed to define the 'container_path' either as an absolute
      // path or a relative path. Please see linux filesystem isolator
      // for detail.
      if (containerConfig.has_rootfs()) {
        target = path::join(
            containerConfig.rootfs(),
            _volume.container_path());

        Try<Nothing> mkdir = os::mkdir(target);
        if (mkdir.isError()) {
          return Failure(
              "Failed to create the target of the mount at '" +
              target + "': " + mkdir.error());
        }
      } else {
        target = _volume.container_path();

        if (!os::exists(target)) {
          return Failure("Absolute container path '" + target + "' "
                         "does not exist");
        }
      }
    } else {
      if (containerConfig.has_rootfs()) {
        target = path::join(containerConfig.rootfs(),
                            flags.sandbox_directory,
                            _volume.container_path());
      } else {
        target = path::join(containerConfig.directory(),
                            _volume.container_path());
      }

      // NOTE: We cannot create the mount point at 'target' if
      // container has rootfs defined. The bind mount of the sandbox
      // will hide what's inside 'target'. So we should always create
      // the mount point in 'directory'.
      string mountPoint = path::join(
          containerConfig.directory(),
          _volume.container_path());

      Try<Nothing> mkdir = os::mkdir(mountPoint);
      if (mkdir.isError()) {
        return Failure(
            "Failed to create the target of the mount at '" +
            mountPoint + "': " + mkdir.error());
      }
    }

    Mount mount;
    mount.volume = volume;
    mount.options = options;

    volumes.insert(volume);
    mounts.push_back(mount);
    targets.push_back(target);
  }

  // It is possible that there is no external volume specified for
  // this container. We avoid checkpointing empty state and creating
  // an empty `Info`.
  if (volumes.empty()) {
    return None();
  }

  // Create the container directory.
  const string containerDir =
    paths::getContainerDir(rootDir, containerId.value());

  Try<Nothing> mkdir = os::mkdir(containerDir);
  if (mkdir.isError()) {
    return Failure(
        "Failed to create the container directory at '" +
        containerDir + "': " + mkdir.error());
  }

  // Create DockerVolumes protobuf message to checkpoint.
  DockerVolumes state;
  foreach (const DockerVolume& volume, volumes) {
    state.add_volumes()->CopyFrom(volume);
  }

  const string volumesPath =
    paths::getVolumesPath(rootDir, containerId.value());

  Try<Nothing> checkpoint = state::checkpoint(
      volumesPath,
      stringify(JSON::protobuf(state)));

  if (checkpoint.isError()) {
    return Failure(
        "Failed to checkpoint docker volumes at '" +
        volumesPath + "': " + checkpoint.error());
  }

  VLOG(1) << "Successfully created checkpoint at '" << volumesPath << "'";

  infos.put(containerId, Owned<Info>(new Info(volumes)));

  // Invoke driver client to create the mount.
  list<Future<string>> futures;
  foreach (const Mount& mount, mounts) {
    futures.push_back(this->mount(
        mount.volume.driver(),
        mount.volume.name(),
        mount.options));
  }

  // NOTE: Wait for all `mount()` to finish before returning to make
  // sure `unmount()` is not called (via 'cleanup()') if some mount on
  // is still pending.
  return await(futures)
    .then(defer(
        PID<DockerVolumeIsolatorProcess>(this),
        &DockerVolumeIsolatorProcess::_prepare,
        containerId,
        targets,
        lambda::_1));
}


Future<Option<ContainerLaunchInfo>> DockerVolumeIsolatorProcess::_prepare(
    const ContainerID& containerId,
    const vector<string>& targets,
    const list<Future<string>>& futures)
{
  ContainerLaunchInfo launchInfo;
  launchInfo.add_clone_namespaces(CLONE_NEWNS);

  vector<string> messages;
  vector<string> sources;
  foreach (const Future<string>& future, futures) {
    if (!future.isReady()) {
      messages.push_back(future.isFailed() ? future.failure() : "discarded");
      continue;
    }

    sources.push_back(strings::trim(future.get()));
  }

  if (!messages.empty()) {
    return Failure(strings::join("\n", messages));
  }

  CHECK_EQ(sources.size(), targets.size());

  for (size_t i = 0; i < sources.size(); i++) {
    const string& source = sources[i];
    const string& target = targets[i];

    LOG(INFO) << "Mounting docker volume mount point '" << source
              << "' to '" << target  << "' for container " << containerId;

    // Launch mount command as a non-shell subprocess to avoid
    // injecting arbitrary shell commands (e.g., user defined
    // 'container_path' in volume can be postfixed with any
    // unsafe arbitrary commands).
    CommandInfo* command = launchInfo.add_pre_exec_commands();
    command->set_shell(false);
    command->set_value("mount");
    command->add_arguments("mount");
    command->add_arguments("-n");
    command->add_arguments("--rbind");
    command->add_arguments(source);
    command->add_arguments(target);
  }

  return launchInfo;
}


Future<Nothing> DockerVolumeIsolatorProcess::cleanup(
    const ContainerID& containerId)
{
  if (!infos.contains(containerId)) {
    VLOG(1) << "Ignoring cleanup request for unknown container " << containerId;

    return Nothing();
  }

  hashmap<DockerVolume, int> references;
  foreachvalue (const Owned<Info>& info, infos) {
    foreach (const DockerVolume& volume, info->volumes) {
      if (!references.contains(volume)) {
        references[volume] = 1;
      } else {
        references[volume]++;
      }
    }
  }

  list<Future<Nothing>> futures;

  foreach (const DockerVolume& volume, infos[containerId]->volumes) {
    if (references.contains(volume) && references[volume] > 1) {
      VLOG(1) << "Cannot unmount the volume with driver '"
              << volume.driver() << "' and name '" << volume.name()
              << "' for container " << containerId
              << " since its reference count is " << references[volume];
      continue;
    }

    LOG(INFO) << "Unmounting the volume with driver '"
              << volume.driver() << "' and name '" << volume.name()
              << "' for container " << containerId;

    // Invoke dvdcli client to unmount the docker volume.
    futures.push_back(this->unmount(volume.driver(), volume.name()));
  }

  // Erase the `Info` struct of this container before unmounting the volumes.
  // This is to ensure the reference count of the volume will not be wrongly
  // increased if unmounting volumes fail, otherwise next time when another
  // container using the same volume is destroyed, we would NOT unmount the
  // volume since its reference count would be larger than 1.
  infos.erase(containerId);

  return await(futures)
    .then(defer(
        PID<DockerVolumeIsolatorProcess>(this),
        &DockerVolumeIsolatorProcess::_cleanup,
        containerId,
        lambda::_1));
}


Future<Nothing> DockerVolumeIsolatorProcess::_cleanup(
    const ContainerID& containerId,
    const list<Future<Nothing>>& futures)
{
  vector<string> messages;
  foreach (const Future<Nothing>& future, futures) {
    if (!future.isReady()) {
      messages.push_back(future.isFailed() ? future.failure() : "discarded");
    }
  }

  if (!messages.empty()) {
    return Failure(strings::join("\n", messages));
  }

  const string containerDir =
    paths::getContainerDir(rootDir, containerId.value());

  Try<Nothing> rmdir = os::rmdir(containerDir);
  if (rmdir.isError()) {
    return Failure(
        "Failed to remove the checkpoint directory at '" +
        containerDir + "': " + rmdir.error());
  }

  LOG(INFO) << "Removed the checkpoint directory at '" << containerDir
            << "' for container " << containerId;

  return Nothing();
}


Future<string> DockerVolumeIsolatorProcess::mount(
    const string& driver,
    const string& name,
    const hashmap<string, string>& options)
{
  DockerVolume volume;
  volume.set_driver(driver);
  volume.set_name(name);

  return sequences[volume].add<string>(
      defer(PID<DockerVolumeIsolatorProcess>(this), [=]() -> Future<string> {
        return _mount(driver, name, options);
      }));
}


Future<string> DockerVolumeIsolatorProcess::_mount(
    const string& driver,
    const string& name,
    const hashmap<string, string>& options)
{
  return client->mount(driver, name, options);
}


Future<Nothing> DockerVolumeIsolatorProcess::unmount(
    const string& driver,
    const string& name)
{
  DockerVolume volume;
  volume.set_driver(driver);
  volume.set_name(name);

  return sequences[volume].add<Nothing>(
      defer(PID<DockerVolumeIsolatorProcess>(this), [=]() -> Future<Nothing> {
        return _unmount(driver, name);
      }));
}


Future<Nothing> DockerVolumeIsolatorProcess::_unmount(
    const string& driver,
    const string& name)
{
  return client->unmount(driver, name);
}

} // namespace slave {
} // namespace internal {
} // namespace mesos {
