// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <mutex>
#include <string>
#include <vector>

#include <mesos/hook.hpp>

#include <mesos/module/hook.hpp>

#include <process/collect.hpp>
#include <process/future.hpp>

#include <stout/check.hpp>
#include <stout/foreach.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/nothing.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>

#include "hook/manager.hpp"
#include "module/manager.hpp"

using std::map;
using std::string;
using std::vector;

using process::collect;
using process::Future;

using mesos::modules::ModuleManager;

namespace mesos {
namespace internal {

static std::mutex mutex;
static LinkedHashMap<string, Hook*> availableHooks;


Try<Nothing> HookManager::initialize(const string& hookList)
{
  synchronized (mutex) {
    const vector<string> hooks = strings::split(hookList, ",");
    foreach (const string& hook, hooks) {
      if (availableHooks.contains(hook)) {
        return Error("Hook module '" + hook + "' already loaded");
      }

      if (!ModuleManager::contains<Hook>(hook)) {
        return Error("No hook module named '" + hook + "' available");
      }

      // Let's create an instance of the hook module.
      Try<Hook*> module = ModuleManager::create<Hook>(hook);
      if (module.isError()) {
        return Error(
            "Failed to instantiate hook module '" + hook + "': " +
            module.error());
      }

      // Add the hook module to the list of available hooks.
      availableHooks[hook] = module.get();
    }
  }

  return Nothing();
}


Try<Nothing> HookManager::unload(const string& hookName)
{
  synchronized (mutex) {
    if (!availableHooks.contains(hookName)) {
      return Error(
          "Error unloading hook module '" + hookName + "': module not loaded");
    }

    // Now remove the hook from the list of available hooks.
    availableHooks.erase(hookName);
  }

  return Nothing();
}


bool HookManager::hooksAvailable()
{
  synchronized (mutex) {
    return !availableHooks.empty();
  }
}


Labels HookManager::masterLaunchTaskLabelDecorator(
    const TaskInfo& taskInfo,
    const FrameworkInfo& frameworkInfo,
    const SlaveInfo& slaveInfo)
{
  // We need a mutable copy of the task info and set the new
  // labels after each hook invocation. Otherwise, the last hook
  // will be the only effective hook setting the labels.
  TaskInfo taskInfo_ = taskInfo;

  synchronized (mutex) {
    foreachpair (const string& name, Hook* hook, availableHooks) {
      Result<Labels> result =
        hook->masterLaunchTaskLabelDecorator(
            taskInfo_,
            frameworkInfo,
            slaveInfo);

      // NOTE: If the hook returns None(), the task labels won't be
      // changed.
      if (result.isSome()) {
        *taskInfo_.mutable_labels() = std::move(result.get());
      } else if (result.isError()) {
        LOG(WARNING) << "Master label decorator hook failed for module '"
                    << name << "': " << result.error();
      }
    }
  }

  return std::move(*taskInfo_.mutable_labels());
}


Resources HookManager::masterLaunchTaskResourceDecorator(
    const TaskInfo& taskInfo,
    const Resources& slaveResources)
{
  TaskInfo taskInfo_ = taskInfo;

  synchronized (mutex) {
    foreachpair (const string& name, Hook* hook, availableHooks) {
      Result<Resources> result =
        hook->masterLaunchTaskResourceDecorator(
          taskInfo_,
          slaveResources);

      if (result.isSome()) {
        *taskInfo_.mutable_resources() = std::move(result.get());
      } else if (result.isError()) {
        LOG(WARNING) << "Master resource decorator hook failed for module '"
                     << name << "': " << result.error();
      }
    }
  }
  return std::move(*taskInfo_.mutable_resources());
}


void HookManager::masterSlaveLostHook(const SlaveInfo& slaveInfo)
{
  foreachpair (const string& name, Hook* hook, availableHooks) {
    Try<Nothing> result = hook->masterSlaveLostHook(slaveInfo);
    if (result.isError()) {
      LOG(WARNING) << "Master agent-lost hook failed for module '"
                   << name << "': " << result.error();
    }
  }
}


Labels HookManager::slaveRunTaskLabelDecorator(
    const TaskInfo& taskInfo,
    const ExecutorInfo& executorInfo,
    const FrameworkInfo& frameworkInfo,
    const SlaveInfo& slaveInfo)
{
  synchronized (mutex) {
    TaskInfo taskInfo_ = taskInfo;

    foreachpair (const string& name, Hook* hook, availableHooks) {
      const Result<Labels> result = hook->slaveRunTaskLabelDecorator(
          taskInfo_, executorInfo, frameworkInfo, slaveInfo);

      // NOTE: If the hook returns None(), the task labels won't be
      // changed.
      if (result.isSome()) {
        taskInfo_.mutable_labels()->CopyFrom(result.get());
      } else if (result.isError()) {
        LOG(WARNING) << "Agent label decorator hook failed for module '"
                    << name << "': " << result.error();
      }
    }

    return taskInfo_.labels();
  }
}


Environment HookManager::slaveExecutorEnvironmentDecorator(
    ExecutorInfo executorInfo)
{
  synchronized (mutex) {
    foreachpair (const string& name, Hook* hook, availableHooks) {
      const Result<Environment> result =
        hook->slaveExecutorEnvironmentDecorator(executorInfo);

      // NOTE: If the hook returns None(), the environment won't be
      // changed.
      if (result.isSome()) {
        executorInfo.mutable_command()->mutable_environment()->CopyFrom(
            result.get());
      } else if (result.isError()) {
        LOG(WARNING) << "Agent environment decorator hook failed for module '"
                    << name << "': " << result.error();
      }
    }

    return executorInfo.command().environment();
  }
}


Future<DockerTaskExecutorPrepareInfo>
  HookManager::slavePreLaunchDockerTaskExecutorDecorator(
      const Option<TaskInfo>& taskInfo,
      const ExecutorInfo& executorInfo,
      const string& containerName,
      const string& containerWorkDirectory,
      const string& mappedSandboxDirectory,
      const Option<map<string, string>>& env)
{
  // We execute these hooks according to their ordering so any conflicting
  // `DockerTaskExecutorPrepareInfo` can be deterministically resolved
  // (the last hook takes priority).
  vector<Future<Option<DockerTaskExecutorPrepareInfo>>> futures;
  futures.reserve(availableHooks.size());

  foreachvalue (Hook* hook, availableHooks) {
    // Chain together each hook.
    futures.push_back(
        hook->slavePreLaunchDockerTaskExecutorDecorator(
            taskInfo,
            executorInfo,
            containerName,
            containerWorkDirectory,
            mappedSandboxDirectory,
            env));
  }

  return collect(futures)
    .then([](const vector<Option<DockerTaskExecutorPrepareInfo>>& results)
        -> Future<DockerTaskExecutorPrepareInfo> {
      DockerTaskExecutorPrepareInfo taskExecutorDecoratorInfo;

      foreach (const Option<DockerTaskExecutorPrepareInfo>& result, results) {
        if (result.isSome()) {
          taskExecutorDecoratorInfo.MergeFrom(result.get());
        }
      }

      return taskExecutorDecoratorInfo;
    });
}


void HookManager::slavePostFetchHook(
    const ContainerID& containerId,
    const string& directory)
{
  foreachpair (const string& name, Hook* hook, availableHooks) {
    Try<Nothing> result = hook->slavePostFetchHook(containerId, directory);
    if (result.isError()) {
      LOG(WARNING) << "Agent post fetch hook failed for module "
                   << "'" << name << "': " << result.error();
    }
  }
}


void HookManager::slaveRemoveExecutorHook(
    const FrameworkInfo& frameworkInfo,
    const ExecutorInfo& executorInfo)
{
  foreachpair (const string& name, Hook* hook, availableHooks) {
    const Try<Nothing> result =
      hook->slaveRemoveExecutorHook(frameworkInfo, executorInfo);
    if (result.isError()) {
      LOG(WARNING) << "Agent remove executor hook failed for module '"
                   << name << "': " << result.error();
    }
  }
}


TaskStatus HookManager::slaveTaskStatusDecorator(
    const FrameworkID& frameworkId,
    TaskStatus status)
{
  synchronized (mutex) {
    foreachpair (const string& name, Hook* hook, availableHooks) {
      const Result<TaskStatus> result =
        hook->slaveTaskStatusDecorator(frameworkId, status);

      // NOTE: Labels/ContainerStatus remain unchanged if the hook returns
      // None().
      if (result.isSome()) {
        if (result->has_labels()) {
          status.mutable_labels()->CopyFrom(result->labels());
        }

        if (result->has_container_status()) {
          status.mutable_container_status()->CopyFrom(
              result->container_status());
        }
      } else if (result.isError()) {
        LOG(WARNING) << "Agent TaskStatus decorator hook failed for "
                     << "module '" << name << "': " << result.error();
      }
    }

    return status;
  }
}


Resources HookManager::slaveResourcesDecorator(
    const SlaveInfo& slaveInfo)
{
  // We need a mutable copy of the Resources object. Each hook will
  // see the changes made by previous hooks, so the order of execution
  // matters. Hooks are executed in the order they are specified by
  // the user (note that `availableHooks` is a LinkedHashMap).
  SlaveInfo slaveInfo_ = slaveInfo;

  synchronized (mutex) {
    foreachpair (const string& name, Hook* hook, availableHooks) {
      const Result<Resources> result =
        hook->slaveResourcesDecorator(slaveInfo_);

      // NOTE: Resources remain unchanged if the hook returns None().
      if (result.isSome()) {
        slaveInfo_.mutable_resources()->CopyFrom(result.get());
      } else if (result.isError()) {
        LOG(WARNING) << "Agent Resources decorator hook failed for "
                     << "module '" << name << "': " << result.error();
      }
    }

    return slaveInfo_.resources();
  }
}


Attributes HookManager::slaveAttributesDecorator(
    const SlaveInfo& slaveInfo)
{
  // We need a mutable copy of the Attributes object. Each hook will
  // see the changes made by previous hooks, so the order of execution
  // matters. Hooks are executed in the order they are specified by
  // the user (note that `availableHooks` is a LinkedHashMap).
  SlaveInfo slaveInfo_ = slaveInfo;

  synchronized (mutex) {
    foreachpair (const string& name, Hook* hook, availableHooks) {
      const Result<Attributes> result =
        hook->slaveAttributesDecorator(slaveInfo_);

      // NOTE: Attributes remain unchanged if the hook returns None().
      if (result.isSome()) {
        slaveInfo_.mutable_attributes()->CopyFrom(result.get());
      } else if (result.isError()) {
        LOG(WARNING) << "Agent Attributes decorator hook failed for "
                     << "module '" << name << "': " << result.error();
      }
    }

    return slaveInfo_.attributes();
  }
}

} // namespace internal {
} // namespace mesos {
