// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <memory>
#include <string>
#include <vector>

#include <mesos/mesos.hpp>

#include <mesos/authorizer/authorizer.hpp>

#ifndef __WINDOWS__
#include <mesos/log/log.hpp>
#endif // __WINDOWS__

#include <mesos/allocator/allocator.hpp>

#include <mesos/slave/resource_estimator.hpp>

#include <mesos/state/in_memory.hpp>
#include <mesos/state/log.hpp>
#include <mesos/state/protobuf.hpp>
#include <mesos/state/storage.hpp>

#include <mesos/zookeeper/url.hpp>

#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/http.hpp>
#include <process/id.hpp>
#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>

#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/gtest.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>

#include <stout/os/permissions.hpp>

#ifdef __linux__
#include "linux/cgroups.hpp"
#endif

#ifdef USE_SSL_SOCKET
#include "authentication/executor/jwt_secret_generator.hpp"
#endif // USE_SSL_SOCKET

#include "authorizer/local/authorizer.hpp"

#include "common/authorization.hpp"
#include "common/http.hpp"

#include "files/files.hpp"

#include "master/constants.hpp"
#include "master/flags.hpp"
#include "master/master.hpp"
#include "master/registrar.hpp"

#include "master/allocator/mesos/hierarchical.hpp"

#include "master/contender/standalone.hpp"
#include "master/contender/zookeeper.hpp"

#include "master/detector/standalone.hpp"
#include "master/detector/zookeeper.hpp"

#include "slave/flags.hpp"
#include "slave/gc.hpp"
#include "slave/slave.hpp"
#include "slave/task_status_update_manager.hpp"

#include "slave/containerizer/composing.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/fetcher.hpp"

#include "slave/volume_gid_manager/volume_gid_manager.hpp"

#include "tests/cluster.hpp"
#include "tests/mock_registrar.hpp"

using std::string;
using std::vector;

#ifdef USE_SSL_SOCKET
using mesos::authentication::executor::JWTSecretGenerator;
#endif // USE_SSL_SOCKET

using mesos::master::contender::StandaloneMasterContender;
using mesos::master::contender::ZooKeeperMasterContender;

using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
using mesos::master::detector::ZooKeeperMasterDetector;

using mesos::slave::ContainerTermination;

namespace mesos {
namespace internal {
namespace tests {
namespace cluster {

Try<process::Owned<Master>> Master::start(
    const master::Flags& flags,
    const Option<zookeeper::URL>& zookeeperUrl,
    const Option<mesos::allocator::Allocator*>& allocator,
    const Option<Authorizer*>& authorizer,
    const Option<std::shared_ptr<process::RateLimiter>>& slaveRemovalLimiter)
{
  process::Owned<Master> master(new Master());
  master->zookeeperUrl = zookeeperUrl;

  // If the allocator is not provided, create a default one.
  if (allocator.isNone()) {
    Try<mesos::allocator::Allocator*> _allocator =
      master::allocator::HierarchicalDRFAllocator::create();

    if (_allocator.isError()) {
      return Error(
          "Failed to create an instance of HierarchicalDRFAllocator: " +
          _allocator.error());
    }

    master->allocator.reset(_allocator.get());
  }

  // If the authorizer is not provided, create a default one.
  if (authorizer.isNone()) {
    // Indicates whether or not the caller explicitly specified the
    // authorization configuration for this master.
    bool authorizationSpecified = true;

    vector<string> authorizerNames = strings::split(flags.authorizers, ",");

    if (authorizerNames.empty()) {
      return Error("No authorizer specified");
    }

    if (authorizerNames.size() > 1) {
      return Error("Multiple authorizers not supported");
    }

    string authorizerName = authorizerNames[0];

    Result<Authorizer*> authorizer((None()));
    if (authorizerName != master::DEFAULT_AUTHORIZER) {
      LOG(INFO) << "Creating '" << authorizerName << "' authorizer";

      authorizer = Authorizer::create(authorizerName);
    } else {
      // `authorizerName` is `DEFAULT_AUTHORIZER` at this point.
      if (flags.acls.isSome()) {
        LOG(INFO) << "Creating default '" << authorizerName << "' authorizer";

        authorizer = Authorizer::create(flags.acls.get());
        CHECK_SOME(authorizer);
      } else {
        authorizationSpecified = false;
      }
    }

    if (authorizer.isError()) {
      return Error(
          "Could not create '" + authorizerName + "' authorizer: " +
          authorizer.error());
    } else if (authorizer.isSome()) {
      master->authorizer.reset(authorizer.get());

      if (authorizationSpecified) {
        // Authorization config was explicitly provided, so set authorization
        // callbacks in libprocess.
        master->setAuthorizationCallbacks(authorizer.get());
      }
    }
  } else {
    // An authorizer was provided, so set authorization callbacks in libprocess.
    master->setAuthorizationCallbacks(authorizer.get());
  }

  // Create the appropriate master contender/detector.
  if (zookeeperUrl.isSome()) {
    master->contender.reset(new ZooKeeperMasterContender(zookeeperUrl.get()));
    master->detector.reset(new ZooKeeperMasterDetector(zookeeperUrl.get()));
  } else {
    master->contender.reset(new StandaloneMasterContender());
    master->detector.reset(new StandaloneMasterDetector());
  }

  // Check for some invalid flag combinations.
  if (flags.registry_strict) {
    return Error("Support for '--registry_strict' has been removed");
  }

  if (flags.registry == "replicated_log" && flags.work_dir.isNone()) {
    return Error(
        "Need to specify --work_dir for replicated log based registry");
  }

  if (flags.registry == "replicated_log" &&
      zookeeperUrl.isSome() &&
      flags.quorum.isNone()) {
    return Error(
        "Need to specify --quorum for replicated log based registry"
        " when using ZooKeeper");
  }

  // Create the replicated-log-based registry, if specified in the flags.
  if (flags.registry == "replicated_log") {
#ifndef __WINDOWS__
    if (zookeeperUrl.isSome()) {
      // Use ZooKeeper-based replicated log.
      master->log.reset(new mesos::log::Log(
          flags.quorum.get(),
          path::join(flags.work_dir.get(), "replicated_log"),
          zookeeperUrl->servers,
          flags.zk_session_timeout,
          path::join(zookeeperUrl->path, "log_replicas"),
          zookeeperUrl->authentication,
          flags.log_auto_initialize));
    } else {
      master->log.reset(new mesos::log::Log(
          1,
          path::join(flags.work_dir.get(), "replicated_log"),
          std::set<process::UPID>(),
          flags.log_auto_initialize));
    }
#else
    return Error("Windows does not support replicated log");
#endif // __WINDOWS__
  }

  // Create the registry's storage backend.
  if (flags.registry == "in_memory") {
    master->storage.reset(new mesos::state::InMemoryStorage());
  } else if (flags.registry == "replicated_log") {
#ifndef __WINDOWS__
    master->storage.reset(new mesos::state::LogStorage(master->log.get()));
#else
    return Error("Windows does not support replicated log");
#endif // __WINDOWS__

  } else {
    return Error(
        "Unsupported option for registry persistence: " + flags.registry);
  }

  // Instantiate some other master dependencies.
  master->state.reset(new mesos::state::State(master->storage.get()));
  master->registrar.reset(new MockRegistrar(
      flags, master->state.get(), master::READONLY_HTTP_AUTHENTICATION_REALM));

  if (slaveRemovalLimiter.isNone() && flags.agent_removal_rate_limit.isSome()) {
    // Parse the flag value.
    // TODO(vinod): Move this parsing logic to flags once we have a
    // 'Rate' abstraction in stout.
    vector<string> tokens =
      strings::tokenize(flags.agent_removal_rate_limit.get(), "/");

    if (tokens.size() != 2) {
      return Error(
          "Invalid agent_removal_rate_limit: " +
          flags.agent_removal_rate_limit.get() +
          ". Format is <Number of agents>/<Duration>");
    }

    Try<int> permits = numify<int>(tokens[0]);
    if (permits.isError()) {
      return Error(
          "Invalid agent_removal_rate_limit: " +
          flags.agent_removal_rate_limit.get() +
          ". Format is <Number of agents>/<Duration>: " + permits.error());
    }

    Try<Duration> duration = Duration::parse(tokens[1]);
    if (duration.isError()) {
      return Error(
          "Invalid agent_removal_rate_limit: " +
          flags.agent_removal_rate_limit.get() +
          ". Format is <Number of agents>/<Duration>: " + duration.error());
    }

    master->slaveRemovalLimiter =
      std::make_shared<process::RateLimiter>(permits.get(), duration.get());
  }

  // Inject all the dependencies.
  master->master.reset(new master::Master(
      allocator.getOrElse(master->allocator.get()),
      master->registrar.get(),
      &master->files,
      master->contender.get(),
      master->detector.get(),
      authorizer.getOrElse(master->authorizer.get()),
      slaveRemovalLimiter.isSome() ? slaveRemovalLimiter
                                   : master->slaveRemovalLimiter,
      flags));

  // If we are using the `StandaloneMasterDetector`, appoint the only
  // master as the leading master.
  if (zookeeperUrl.isNone()) {
    StandaloneMasterDetector* _detector = CHECK_NOTNULL(
        dynamic_cast<StandaloneMasterDetector*>(master->detector.get()));

    _detector->appoint(master->master->info());
  }

  process::Future<Nothing> _recover =
    FUTURE_DISPATCH(master->master->self(), &master::Master::_recover);

  master->pid = process::spawn(master->master.get());

  // Speed up the tests by ensuring that the Master is recovered
  // before the test proceeds. Otherwise, authentication and
  // registration messages may be dropped, causing delayed retries.
  // NOTE: We use process::internal::await() to avoid awaiting a
  // Future forever when the Clock is paused.
  if (!process::internal::await(
          _recover,
          flags.registry_fetch_timeout + flags.registry_store_timeout)) {
    return Error("Failed to wait for Master::_recover");
  }

  bool paused = process::Clock::paused();

  // Need to settle the Clock to ensure that the Master finishes
  // executing _recover() before we return.
  process::Clock::pause();
  process::Clock::settle();

  // Return the Clock to its original state.
  if (!paused) {
    process::Clock::resume();
  }

  return master;
}


Master::~Master()
{
  // Remove any libprocess authorization callbacks that were installed.
  if (authorizationCallbacksSet) {
    process::http::authorization::unsetCallbacks();
  }

  // NOTE: Authenticators' lifetimes are tied to libprocess's lifetime.
  // This means that multiple masters in tests are not supported.
  process::http::authentication::unsetAuthenticator(
      master::READONLY_HTTP_AUTHENTICATION_REALM);
  process::http::authentication::unsetAuthenticator(
      master::READWRITE_HTTP_AUTHENTICATION_REALM);

  process::terminate(pid);
  process::wait(pid);
}


process::Owned<MasterDetector> Master::createDetector()
{
  if (zookeeperUrl.isSome()) {
    return process::Owned<MasterDetector>(
        new ZooKeeperMasterDetector(zookeeperUrl.get()));
  }

  return process::Owned<MasterDetector>(new StandaloneMasterDetector(pid));
}


MasterInfo Master::getMasterInfo()
{
  return master->info();
}


void Master::setAuthorizationCallbacks(Authorizer* authorizer)
{
  process::http::authorization::setCallbacks(
      authorization::createAuthorizationCallbacks(authorizer));

  authorizationCallbacksSet = true;
}


Try<process::Owned<Slave>> Slave::create(
    MasterDetector* detector,
    const slave::Flags& flags,
    const Option<string>& id,
    const Option<slave::Containerizer*>& containerizer,
    const Option<slave::GarbageCollector*>& gc,
    const Option<slave::TaskStatusUpdateManager*>& taskStatusUpdateManager,
    const Option<mesos::slave::ResourceEstimator*>& resourceEstimator,
    const Option<mesos::slave::QoSController*>& qosController,
    const Option<mesos::SecretGenerator*>& secretGenerator,
    const Option<Authorizer*>& providedAuthorizer,
    bool mock)
{
  process::Owned<Slave> slave(new Slave());

  // TODO(benh): Create a work directory if using the default.

  slave->flags = flags;
  slave->detector = detector;

  // If the garbage collector is not provided, create a default one.
  if (gc.isNone()) {
    slave->gc.reset(new slave::GarbageCollector(flags.work_dir));
  }

  // If the flag `--volume_gid_range` is specified, create a volume gid manager.
  slave::VolumeGidManager* volumeGidManager = nullptr;

#ifndef __WINDOWS__
  if (flags.volume_gid_range.isSome()) {
    Try<slave::VolumeGidManager*> _volumeGidManager =
      slave::VolumeGidManager::create(flags);

    if (_volumeGidManager.isError()) {
      return Error(
          "Failed to create volume gid manager: " + _volumeGidManager.error());
    }

    volumeGidManager = _volumeGidManager.get();
  }
#endif // __WINDOWS__

  // If the containerizer is not provided, create a default one.
  if (containerizer.isSome()) {
    slave->containerizer = containerizer.get();
  } else {
    // Create a new fetcher.
    slave->fetcher.reset(new slave::Fetcher(flags));

    Try<slave::Containerizer*> _containerizer =
      slave::Containerizer::create(
          flags,
          true,
          slave->fetcher.get(),
          gc.getOrElse(slave->gc.get()),
          nullptr,
          volumeGidManager);

    if (_containerizer.isError()) {
      return Error("Failed to create containerizer: " + _containerizer.error());
    }

    slave->containerizer = _containerizer.get();
  }

  // As composing containerizer doesn't affect behaviour of underlying
  // containerizers, we can always use composing containerizer turned on
  // by default in tests.
  if (!dynamic_cast<slave::ComposingContainerizer*>(slave->containerizer)) {
    Try<slave::ComposingContainerizer*> composing =
      slave::ComposingContainerizer::create({slave->containerizer});

    if (composing.isError()) {
      return Error(
          "Failed to create composing containerizer: " + composing.error());
    }

    slave->containerizer = composing.get();
  }

  if (containerizer.isNone()) {
    slave->ownedContainerizer.reset(slave->containerizer);
  }

  Option<Authorizer*> authorizer = providedAuthorizer;

  // If the authorizer is not provided, create a default one.
  if (providedAuthorizer.isNone()) {
    // Indicates whether or not the caller explicitly specified the
    // authorization configuration for this agent.
    bool authorizationSpecified = true;

    string authorizerName = flags.authorizer;

    Result<Authorizer*> createdAuthorizer((None()));
    if (authorizerName != slave::DEFAULT_AUTHORIZER) {
      LOG(INFO) << "Creating '" << authorizerName << "' authorizer";

      // NOTE: The contents of --acls will be ignored.
      createdAuthorizer = Authorizer::create(authorizerName);
    } else {
      // `authorizerName` is `DEFAULT_AUTHORIZER` at this point.
      if (flags.acls.isSome()) {
        LOG(INFO) << "Creating default '" << authorizerName << "' authorizer";

        createdAuthorizer = Authorizer::create(flags.acls.get());
      } else {
        // Neither a non-default authorizer nor a set of ACLs were specified.
        authorizationSpecified = false;
      }
    }

    if (createdAuthorizer.isError()) {
      EXIT(EXIT_FAILURE) << "Could not create '" << authorizerName
                         << "' authorizer: " << createdAuthorizer.error();
    } else if (createdAuthorizer.isSome()) {
      slave->authorizer.reset(createdAuthorizer.get());
      authorizer = createdAuthorizer.get();

      if (authorizationSpecified) {
        // Authorization config was explicitly provided, so set authorization
        // callbacks in libprocess.
        slave->setAuthorizationCallbacks(authorizer.get());
      }
    }
  } else {
    // An authorizer was provided, so set authorization callbacks in libprocess.
    slave->setAuthorizationCallbacks(providedAuthorizer.get());
  }

  // If the resource estimator is not provided, create a default one.
  if (resourceEstimator.isNone()) {
    Try<mesos::slave::ResourceEstimator*> _resourceEstimator =
      mesos::slave::ResourceEstimator::create(flags.resource_estimator);

    if (_resourceEstimator.isError()) {
      return Error(
          "Failed to create resource estimator: " + _resourceEstimator.error());
    }

    slave->resourceEstimator.reset(_resourceEstimator.get());
  }

  // If the QoS controller is not provided, create a default one.
  if (qosController.isNone()) {
    Try<mesos::slave::QoSController*> _qosController =
      mesos::slave::QoSController::create(flags.qos_controller);

    if (_qosController.isError()) {
      return Error(
          "Failed to create QoS controller: " + _qosController.error());
    }

    slave->qosController.reset(_qosController.get());
  }

  // If the secret generator is not provided, create a default one.
  if (secretGenerator.isNone()) {
    SecretGenerator* _secretGenerator = nullptr;

#ifdef USE_SSL_SOCKET
    if (flags.jwt_secret_key.isSome()) {
      Try<string> jwtSecretKey = os::read(flags.jwt_secret_key.get());
      if (jwtSecretKey.isError()) {
        return Error("Failed to read the file specified by --jwt_secret_key");
      }

      // TODO(greggomann): Factor the following code out into a common helper,
      // since we also do this when loading credentials.
      Try<os::Permissions> permissions =
        os::permissions(flags.jwt_secret_key.get());
      if (permissions.isError()) {
        LOG(WARNING) << "Failed to stat jwt secret key file '"
                     << flags.jwt_secret_key.get()
                     << "': " << permissions.error();
      } else if (permissions->others.rwx) {
        LOG(WARNING) << "Permissions on executor secret key file '"
                     << flags.jwt_secret_key.get()
                     << "' are too open; it is recommended that your"
                     << " key file is NOT accessible by others";
      }

      _secretGenerator = new JWTSecretGenerator(jwtSecretKey.get());
    }
#endif // USE_SSL_SOCKET

    slave->secretGenerator.reset(_secretGenerator);
  }

  // If the task status update manager is not provided, create a default one.
  if (taskStatusUpdateManager.isNone()) {
    slave->taskStatusUpdateManager.reset(
        new slave::TaskStatusUpdateManager(flags));
  }

  // Inject all the dependencies.
  if (mock) {
    slave->slave.reset(new MockSlave(
        id.isSome() ? id.get() : process::ID::generate("slave"),
        flags,
        detector,
        slave->containerizer,
        &slave->files,
        gc.getOrElse(slave->gc.get()),
        taskStatusUpdateManager.getOrElse(slave->taskStatusUpdateManager.get()),
        resourceEstimator.getOrElse(slave->resourceEstimator.get()),
        qosController.getOrElse(slave->qosController.get()),
        secretGenerator.getOrElse(slave->secretGenerator.get()),
        volumeGidManager,
        authorizer));
  } else {
    slave->slave.reset(new slave::Slave(
        id.isSome() ? id.get() : process::ID::generate("slave"),
        flags,
        detector,
        slave->containerizer,
        &slave->files,
        gc.getOrElse(slave->gc.get()),
        taskStatusUpdateManager.getOrElse(slave->taskStatusUpdateManager.get()),
        resourceEstimator.getOrElse(slave->resourceEstimator.get()),
        qosController.getOrElse(slave->qosController.get()),
        secretGenerator.getOrElse(slave->secretGenerator.get()),
        volumeGidManager,
        authorizer));
  }

  return slave;
}


Slave::~Slave()
{
  // Remove any libprocess authorization callbacks that were installed.
  if (authorizationCallbacksSet) {
    process::http::authorization::unsetCallbacks();
  }

  process::http::authentication::unsetAuthenticator(
      slave::READONLY_HTTP_AUTHENTICATION_REALM);
  process::http::authentication::unsetAuthenticator(
      slave::READWRITE_HTTP_AUTHENTICATION_REALM);
  process::http::authentication::unsetAuthenticator(
      slave::EXECUTOR_HTTP_AUTHENTICATION_REALM);
  process::http::authentication::unsetAuthenticator(
      slave::RESOURCE_PROVIDER_HTTP_AUTHENTICATION_REALM);

  // If either `shutdown()` or `terminate()` were called already,
  // skip the below container cleanup logic.  Additionally, we can skip
  // termination, as the shutdown/terminate will do this too.
  if (!cleanUpContainersInDestructor) {
    return;
  }

  // Startup didn't complete so don't try to do the full shutdown.
  if (!containerizer) {
    return;
  }

  // We should wait until agent recovery completes to prevent a potential race
  // between containerizer recovery process and the following code that invokes
  // methods of the containerizer, e.g. a test can start an agent that in turn
  // triggers containerizer recovery of orphaned containers, then immediately
  // destroys the agent. Thus, the containerizer might return a different set of
  // containers, depending on whether containerizer recovery has been finished.
  //
  // NOTE: This wait is omitted if a pointer to a containerizer object was
  // passed to the slave's constructor, as it might be a mock containerizer,
  // thereby agent recovery will never be finished.
  if (ownedContainerizer.get() != nullptr) {
    slave->recoveryInfo.recovered.future().await();
  }

  terminate();

  // This extra closure is necessary in order to use `AWAIT` and `ASSERT_*`,
  // as these macros require a void return type.
  [this]() {
    // Destroy the existing containers on the slave. Note that some
    // containers may terminate while we are doing this, so we ignore
    // any 'wait' failures and ensure that there are no containers
    // when we're done destroying.
    process::Future<hashset<ContainerID>> containers =
      containerizer->containers();

    AWAIT_READY(containers);

    // Because the `cgroups::destroy()` code path makes use of `delay()`, the
    // clock must not be paused in order to reliably destroy all remaining
    // containers. If necessary, we resume the clock here and then pause it
    // again when we're done destroying containers.
    bool paused = process::Clock::paused();

    if (paused) {
      process::Clock::resume();
    }

    foreach (const ContainerID& containerId, containers.get()) {
      process::Future<Option<ContainerTermination>> termination =
        containerizer->destroy(containerId);

      AWAIT(termination);

      if (!termination.isReady()) {
        LOG(ERROR) << "Failed to destroy container " << containerId << ": "
                   << (termination.isFailed() ?
                       termination.failure() :
                       "discarded");
      }
    }

    // When using the composing containerizer, the assertion checking
    // `containers->empty()` below is racy, since the containers are
    // not yet removed from that containerizer's internal data structures
    // when the future becomes ready. Instead, an internal function to clean
    // up these internal data structures is dispatched when the future
    // becomes ready.
    // To work around this, we wait for the clock to settle here. This can
    // be removed once MESOS-9413 is resolved.
    process::Clock::pause();
    process::Clock::settle();

    if (!paused) {
      process::Clock::resume();
    }

    containers = containerizer->containers();
    AWAIT_READY(containers);

    ASSERT_TRUE(containers->empty())
      << "Failed to destroy containers: " << stringify(containers.get());
  }();
}


void Slave::setAuthorizationCallbacks(Authorizer* authorizer)
{
  process::http::authorization::setCallbacks(
      authorization::createAuthorizationCallbacks(authorizer));

  authorizationCallbacksSet = true;
}


void Slave::shutdown()
{
  cleanUpContainersInDestructor = false;

  bool paused = process::Clock::paused();

  if (paused) {
    process::Clock::resume();
  }

  process::dispatch(slave.get(), &slave::Slave::shutdown, process::UPID(), "");
  wait();

  if (paused) {
    process::Clock::pause();
  }
}


void Slave::terminate()
{
  cleanUpContainersInDestructor = false;

  process::terminate(pid);
  wait();
}


MockSlave* Slave::mock()
{
  return dynamic_cast<MockSlave*>(slave.get());
}


void Slave::start()
{
  pid = process::spawn(slave.get());
}


void Slave::wait()
{
  process::wait(pid);
}

} // namespace cluster {
} // namespace tests {
} // namespace internal {
} // namespace mesos {
