// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <mesos/v1/mesos.hpp>

#include <process/clock.hpp>
#include <process/owned.hpp>

#include <stout/hashmap.hpp>
#include <stout/foreach.hpp>
#include <stout/nothing.hpp>
#include <stout/path.hpp>
#include <stout/try.hpp>

#include <stout/os/getcwd.hpp>

#include "checks/checker.hpp"

#include "slave/containerizer/fetcher.hpp"

#include "tests/flags.hpp"
#include "tests/http_server_test_helper.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"

using mesos::internal::slave::Fetcher;
using mesos::internal::slave::MesosContainerizer;

using mesos::master::detector::MasterDetector;

using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
using mesos::v1::scheduler::Mesos;

using process::Future;
using process::Owned;

using std::pair;
using std::string;
using std::vector;

namespace mesos {
namespace internal {
namespace tests {

// This command fails every other invocation. Assuming `path` does not
// initially exist, for all runs i in Nat0, the following case i % 2 applies:
//
// Case 0:
//   - Attempt to remove the nonexistent temporary file.
//   - Create the temporary file.
//   - Exit with a non-zero status.
//
// Case 1:
//   - Remove the temporary file.
//   - Exit with a zero status.
#ifndef __WINDOWS__
#define FLAPPING_CHECK_COMMAND(path)                                    \
  string("rm ") + path + " || (touch " + path + "; exit 1)"
#else
#define FLAPPING_CHECK_COMMAND(path)                                    \
  string("powershell -command ") +                                      \
  "$ri_err = Remove-Item -ErrorAction SilentlyContinue"                 \
    " \"" + path + "\";"                                                \
  "if (-not $?) {"                                                      \
  "  Set-Content -Path (\"" + path + "\") -Value ($null);"              \
  "  exit 1"                                                            \
  "}"
#endif // !__WINDOWS__


// This command stalls each invocation except the first one. Assuming `path`
// does not initially exist, for all runs i in Nat0, the following applies:
//
// Case 0:
//   - Test whether the nonexistent temporary file exists.
//   - Create the temporary file.
//   - Exit with a non-zero status.
//
// Cases 1..n:
//   - Hang for 1000 seconds.
//   - Exit with a zero status.
#ifndef __WINDOWS__
#define STALLING_CHECK_COMMAND(path)                                    \
  string("(ls ") + path + " && " + SLEEP_COMMAND(1000) +                \
  ") || (touch " + path + "; exit 1)"
#else
#define STALLING_CHECK_COMMAND(path)                                    \
  string("powershell -command ") +                                      \
  "if (Test-Path \"" + path + "\") {" +                                 \
     SLEEP_COMMAND(1000) +                                              \
  "} else {"                                                            \
  "  Set-Content -Path (\"" + path + "\") -Value ($null);"              \
  "  exit 1"                                                            \
  "}"
#endif // !__WINDOWS__


// Tests for checks support in built-in executors. Logically the tests
// are elements of the cartesian product `executor-type` x `check-type`
// and are split into groups by `executor-type`:
//   * command executor tests,
//   * docker executor tests,
//   * default executor tests.

class CheckTest : public MesosTest
{
public:
  virtual void acknowledge(
      Mesos* mesos,
      const v1::FrameworkID& frameworkId,
      const v1::TaskStatus& status)
  {
    Call call;
    call.mutable_framework_id()->CopyFrom(frameworkId);
    call.set_type(Call::ACKNOWLEDGE);

    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
    acknowledge->mutable_task_id()->CopyFrom(status.task_id());
    acknowledge->mutable_agent_id()->CopyFrom(status.agent_id());
    acknowledge->set_uuid(status.uuid());

    mesos->send(call);
  }

  virtual void launchTask(
      Mesos* mesos,
      const v1::Offer& offer,
      const v1::TaskInfo& task)
  {
    Call call;
    call.mutable_framework_id()->CopyFrom(offer.framework_id());
    call.set_type(Call::ACCEPT);

    Call::Accept* accept = call.mutable_accept();
    accept->add_offer_ids()->CopyFrom(offer.id());

    v1::Offer::Operation* operation = accept->add_operations();
    operation->set_type(v1::Offer::Operation::LAUNCH);
    operation->mutable_launch()->add_task_infos()->CopyFrom(task);

    mesos->send(call);
  }

  virtual void launchTaskGroup(
      Mesos* mesos,
      const v1::Offer& offer,
      const v1::ExecutorInfo& executor,
      const v1::TaskGroupInfo& taskGroup)
  {
    Call call;
    call.mutable_framework_id()->CopyFrom(offer.framework_id());
    call.set_type(Call::ACCEPT);

    Call::Accept* accept = call.mutable_accept();
    accept->add_offer_ids()->CopyFrom(offer.id());

    v1::Offer::Operation* operation = accept->add_operations();
    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);

    v1::Offer::Operation::LaunchGroup* launchGroup =
      operation->mutable_launch_group();

    launchGroup->mutable_executor()->CopyFrom(executor);
    launchGroup->mutable_task_group()->CopyFrom(taskGroup);

    mesos->send(call);
  }

  virtual void reconcile(
      Mesos* mesos,
      const v1::FrameworkID& frameworkId,
      const vector<pair<const v1::TaskID, const v1::AgentID>>& tasks)
  {
    Call call;
    call.mutable_framework_id()->CopyFrom(frameworkId);
    call.set_type(Call::RECONCILE);

    call.mutable_reconcile();

    foreach (const auto& task, tasks) {
      Call::Reconcile::Task* reconcile =
        call.mutable_reconcile()->add_tasks();
      reconcile->mutable_task_id()->CopyFrom(task.first);
      reconcile->mutable_agent_id()->CopyFrom(task.second);
    }

    mesos->send(call);
  }

  virtual void subscribe(
      Mesos* mesos,
      const v1::FrameworkInfo& framework)
  {
    Call call;
    call.set_type(Call::SUBSCRIBE);
    Call::Subscribe* subscribe = call.mutable_subscribe();
    subscribe->mutable_framework_info()->CopyFrom(framework);

    mesos->send(call);
  }

  virtual void teardown(
      Mesos* mesos,
      const v1::FrameworkID& frameworkId)
  {
    Call call;
    call.set_type(Call::TEARDOWN);
    call.mutable_framework_id()->CopyFrom(frameworkId);

    mesos->send(call);
  }
};


// These are check tests with the command executor.
class CommandExecutorCheckTest : public CheckTest {};

// Verifies that a command check is supported by the command executor,
// its status is delivered in a task status update, and the last known
// status can be obtained during explicit and implicit reconciliation.
// Additionally ensures that the specified environment of the command
// check is honored.
TEST_F_TEMP_DISABLED_ON_WINDOWS(
    CommandExecutorCheckTest,
    CommandCheckDeliveredAndReconciled)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;
  Future<Event::Update> updateExplicitReconciliation;
  Future<Event::Update> updateImplicitReconciliation;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillOnce(FutureArg<1>(&updateExplicitReconciliation))
    .WillOnce(FutureArg<1>(&updateImplicitReconciliation))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  v1::Resources resources =
      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::TaskInfo taskInfo =
    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);

  v1::CommandInfo* checkCommand =
    checkInfo->mutable_command()->mutable_command();
  checkCommand->set_value("exit $STATUS");

  v1::Environment::Variable* variable =
    checkCommand->mutable_environment()->add_variables();
  variable->set_name("STATUS");
  variable->set_value("1");

  launchTask(&mesos, offer, taskInfo);

  AWAIT_READY(updateTaskRunning);
  const v1::TaskStatus& taskRunning = updateTaskRunning->status();

  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
  EXPECT_TRUE(taskRunning.has_check_status());
  EXPECT_TRUE(taskRunning.check_status().has_command());
  EXPECT_FALSE(taskRunning.check_status().command().has_exit_code());

  acknowledge(&mesos, frameworkId, taskRunning);

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
  EXPECT_TRUE(checkResult.has_check_status());
  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(1, checkResult.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, checkResult);

  // Trigger explicit reconciliation.
  reconcile(
      &mesos,
      frameworkId,
      {std::make_pair(checkResult.task_id(), checkResult.agent_id())});

  AWAIT_READY(updateExplicitReconciliation);
  const v1::TaskStatus& explicitReconciliation =
    updateExplicitReconciliation->status();

  ASSERT_EQ(TASK_RUNNING, explicitReconciliation.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_RECONCILIATION,
      explicitReconciliation.reason());
  EXPECT_EQ(taskInfo.task_id(), explicitReconciliation.task_id());
  EXPECT_TRUE(explicitReconciliation.has_check_status());
  EXPECT_TRUE(explicitReconciliation.check_status().command().has_exit_code());
  EXPECT_EQ(1, explicitReconciliation.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, explicitReconciliation);

  // Trigger implicit reconciliation.
  reconcile(&mesos, frameworkId, {});

  AWAIT_READY(updateImplicitReconciliation);
  const v1::TaskStatus& implicitReconciliation =
    updateImplicitReconciliation->status();

  ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_RECONCILIATION,
      implicitReconciliation.reason());
  EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id());
  EXPECT_TRUE(implicitReconciliation.has_check_status());
  EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code());
  EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code());
}


// Verifies that a command check's status changes are delivered.
//
// TODO(alexr): When check mocking is available, ensure that *only*
// status changes are delivered.
TEST_F(CommandExecutorCheckTest, CommandCheckStatusChange)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;
  Future<Event::Update> updateCheckResultChanged;
  Future<Event::Update> updateCheckResultBack;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillOnce(FutureArg<1>(&updateCheckResultChanged))
    .WillOnce(FutureArg<1>(&updateCheckResultBack))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  v1::Resources resources =
      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::TaskInfo taskInfo =
      v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);
  checkInfo->mutable_command()->mutable_command()->set_value(
      FLAPPING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX")));

  launchTask(&mesos, offer, taskInfo);

  AWAIT_READY(updateTaskRunning);
  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());

  acknowledge(&mesos, frameworkId, updateTaskRunning->status());

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(1, checkResult.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, checkResult);

  AWAIT_READY(updateCheckResultChanged);
  const v1::TaskStatus& checkResultChanged = updateCheckResultChanged->status();

  ASSERT_EQ(TASK_RUNNING, checkResultChanged.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResultChanged.reason());
  EXPECT_TRUE(checkResultChanged.check_status().command().has_exit_code());
  EXPECT_EQ(0, checkResultChanged.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, checkResultChanged);

  AWAIT_READY(updateCheckResultBack);
  const v1::TaskStatus& checkResultBack = updateCheckResultBack->status();

  ASSERT_EQ(TASK_RUNNING, checkResultBack.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResultBack.reason());
  EXPECT_TRUE(checkResultBack.check_status().command().has_exit_code());
  EXPECT_EQ(1, checkResultBack.check_status().command().exit_code());
}


// Verifies that an environment variable set for the task is seen by its
// command check.
TEST_F_TEMP_DISABLED_ON_WINDOWS(
    CommandExecutorCheckTest,
    CommandCheckSeesParentsEnv)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  v1::Resources resources =
      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  const string envKey = "MESOS_CHECK_TASK_ENV";
  const int32_t envValue = 42;

  v1::TaskInfo taskInfo =
    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  v1::Environment::Variable* variable =
    taskInfo.mutable_command()->mutable_environment()->add_variables();
  variable->set_name(envKey);
  variable->set_value(stringify(envValue));

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);

  v1::CommandInfo* checkCommand =
    checkInfo->mutable_command()->mutable_command();
  checkCommand->set_value("exit $" + envKey);

  launchTask(&mesos, offer, taskInfo);

  AWAIT_READY(updateTaskRunning);
  const v1::TaskStatus& taskRunning = updateTaskRunning->status();

  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());

  acknowledge(&mesos, frameworkId, taskRunning);

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(envValue, checkResult.check_status().command().exit_code());
}


// Verifies that a task and its command check has the same working directory.
TEST_F_TEMP_DISABLED_ON_WINDOWS(
    CommandExecutorCheckTest,
    CommandCheckSharesWorkDirWithTask)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  v1::Resources resources =
      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  const string filename = "nested_inherits_work_dir";

  v1::TaskInfo taskInfo =
    v1::createTask(agentId, resources, "touch " + filename + " && sleep 10000");

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);

  v1::CommandInfo* checkCommand =
    checkInfo->mutable_command()->mutable_command();
  checkCommand->set_value("ls " + filename);

  launchTask(&mesos, offer, taskInfo);

  AWAIT_READY(updateTaskRunning);
  const v1::TaskStatus& taskRunning = updateTaskRunning->status();

  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());

  acknowledge(&mesos, frameworkId, taskRunning);

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());

  // There is a race between the task creating a file and the command check
  // verifying the file exists, and hence the file might not have been created
  // at the time the first check runs. However, we still expect a successful
  // command check and hence an extra status update.
  if (checkResult.check_status().command().exit_code() != 0)
  {
    // Inject an expectation for the extra status update we expect.
    Future<v1::scheduler::Event::Update> updateCheckResult2;
    EXPECT_CALL(*scheduler, update(_, _))
      .WillOnce(FutureArg<1>(&updateCheckResult2))
      .RetiresOnSaturation();

    // Acknowledge (to be able to get the next update).
    acknowledge(&mesos, frameworkId, checkResult);

    AWAIT_READY(updateCheckResult2);
    const v1::TaskStatus& checkResult2 = updateCheckResult2->status();

    ASSERT_EQ(TASK_RUNNING, checkResult2.state());
    ASSERT_EQ(
        v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
        checkResult2.reason());
    EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
    EXPECT_EQ(0, checkResult2.check_status().command().exit_code());
  }
}


// Verifies that when a command check times out after a successful check,
// an empty check status update is delivered.
TEST_F(CommandExecutorCheckTest, CommandCheckTimeout)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;
  Future<Event::Update> updateCheckResultTimeout;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillOnce(FutureArg<1>(&updateCheckResultTimeout))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  v1::Resources resources =
      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::TaskInfo taskInfo =
      v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);
  checkInfo->set_timeout_seconds(1);
  checkInfo->mutable_command()->mutable_command()->set_value(
      STALLING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX")));

  launchTask(&mesos, offer, taskInfo);

  AWAIT_READY(updateTaskRunning);
  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());

  acknowledge(&mesos, frameworkId, updateTaskRunning->status());

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(1, checkResult.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, checkResult);

  AWAIT_READY(updateCheckResultTimeout);
  const v1::TaskStatus& checkResultTimeout = updateCheckResultTimeout->status();

  ASSERT_EQ(TASK_RUNNING, checkResultTimeout.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResultTimeout.reason());
  EXPECT_FALSE(checkResultTimeout.check_status().command().has_exit_code());
}


// Verifies that when both command check and health check are specified,
// health and check updates include both statuses. Also verifies that
// both statuses are included upon reconciliation.
TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;
  Future<Event::Update> updateHealthResult;
  Future<Event::Update> updateImplicitReconciliation;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillOnce(FutureArg<1>(&updateHealthResult))
    .WillOnce(FutureArg<1>(&updateImplicitReconciliation))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  v1::Resources resources =
      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::TaskInfo taskInfo =
      v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  // Set both check and health check interval to an increased value to
  // prevent a second update coming before reconciliation response.
  int interval = 10;

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(interval);
  checkInfo->mutable_command()->mutable_command()->set_value("exit 1");

  // Delay health check for 1s to ensure health update comes after check update.
  //
  // TODO(alexr): This can lead to flakiness on busy agents. A more robust
  // approach could be setting the grace period to MAX_INT, and make the
  // health check pass iff a file created by the check exists. Alternatively,
  // we can relax the expectation that the check update is delivered first.
  v1::HealthCheck* healthCheckInfo = taskInfo.mutable_health_check();
  healthCheckInfo->set_type(v1::HealthCheck::COMMAND);
  healthCheckInfo->set_delay_seconds(1);
  healthCheckInfo->set_interval_seconds(interval);
  healthCheckInfo->mutable_command()->set_value("exit 0");

  launchTask(&mesos, offer, taskInfo);

  AWAIT_READY(updateTaskRunning);
  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());

  acknowledge(&mesos, frameworkId, updateTaskRunning->status());

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
  EXPECT_FALSE(checkResult.has_healthy());
  EXPECT_TRUE(checkResult.has_check_status());
  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(1, checkResult.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, checkResult);

  AWAIT_READY(updateHealthResult);
  const v1::TaskStatus& healthResult = updateHealthResult->status();

  ASSERT_EQ(TASK_RUNNING, healthResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED,
      healthResult.reason());
  EXPECT_EQ(taskInfo.task_id(), healthResult.task_id());
  EXPECT_TRUE(healthResult.has_healthy());
  EXPECT_TRUE(healthResult.healthy());
  EXPECT_TRUE(healthResult.has_check_status());
  EXPECT_TRUE(healthResult.check_status().command().has_exit_code());
  EXPECT_EQ(1, healthResult.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, healthResult);

  // Trigger implicit reconciliation.
  reconcile(&mesos, frameworkId, {});

  AWAIT_READY(updateImplicitReconciliation);
  const v1::TaskStatus& implicitReconciliation =
    updateImplicitReconciliation->status();

  ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_RECONCILIATION,
      implicitReconciliation.reason());
  EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id());
  EXPECT_TRUE(implicitReconciliation.has_healthy());
  EXPECT_TRUE(implicitReconciliation.healthy());
  EXPECT_TRUE(implicitReconciliation.has_check_status());
  EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code());
  EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code());
}


// Verifies that an HTTP check is supported by the command executor and
// its status is delivered in a task status update.
//
// TODO(josephw): Enable this. Mesos builds its own `curl.exe`, since it
// can't rely on a package manager to get it. We need to make this test use
// that executable.
TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<v1::scheduler::Event::Update> updateTaskRunning;
  Future<v1::scheduler::Event::Update> updateCheckResult;
  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  const uint16_t testPort = getFreePort().get();

  // Use `test-helper` to launch a simple HTTP
  // server to respond to HTTP checks.
  const string command = strings::format(
      "%s %s --ip=127.0.0.1 --port=%u",
      getTestHelperPath("test-helper"),
      HttpServerTestHelper::NAME,
      testPort).get();

  v1::Resources resources =
      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, command);

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::HTTP);
  checkInfo->mutable_http()->set_port(testPort);
  checkInfo->mutable_http()->set_path("/help");
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);

  launchTask(&mesos, offer, taskInfo);

  AWAIT_READY(updateTaskRunning);
  const v1::TaskStatus& taskRunning = updateTaskRunning->status();

  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
  EXPECT_TRUE(taskRunning.has_check_status());
  EXPECT_TRUE(taskRunning.check_status().has_http());
  EXPECT_FALSE(taskRunning.check_status().http().has_status_code());

  acknowledge(&mesos, frameworkId, taskRunning);

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
  EXPECT_TRUE(checkResult.has_check_status());
  EXPECT_TRUE(checkResult.check_status().http().has_status_code());

  // Since it takes some time for the HTTP server to start serving requests,
  // the first several HTTP checks may not return 200. However we still expect
  // a successful HTTP check and hence an extra status update.
  if (checkResult.check_status().http().status_code() != 200)
  {
    // Inject an expectation for the extra status update we expect.
    Future<v1::scheduler::Event::Update> updateCheckResult2;
    EXPECT_CALL(*scheduler, update(_, _))
      .WillOnce(FutureArg<1>(&updateCheckResult2))
      .RetiresOnSaturation();

    // Acknowledge (to be able to get the next update).
    acknowledge(&mesos, frameworkId, checkResult);

    AWAIT_READY(updateCheckResult2);
    const v1::TaskStatus& checkResult2 = updateCheckResult2->status();

    ASSERT_EQ(TASK_RUNNING, checkResult2.state());
    ASSERT_EQ(
        v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
        checkResult2.reason());
    EXPECT_EQ(taskInfo.task_id(), checkResult2.task_id());
    EXPECT_TRUE(checkResult2.has_check_status());
    EXPECT_TRUE(checkResult2.check_status().http().has_status_code());
    EXPECT_EQ(200, checkResult2.check_status().http().status_code());
  }
}


// Verifies that a TCP check is supported by the command executor and
// its status is delivered in a task status update.
//
// TODO(alexr): Check if this test works on Windows.
TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, TCPCheckDelivered)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<v1::scheduler::Event::Update> updateTaskRunning;
  Future<v1::scheduler::Event::Update> updateCheckResult;
  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  const uint16_t testPort = getFreePort().get();

  // Use `test-helper` to launch a simple HTTP
  // server to respond to TCP checks.
  const string command = strings::format(
      "%s %s --ip=127.0.0.1 --port=%u",
      getTestHelperPath("test-helper"),
      HttpServerTestHelper::NAME,
      testPort).get();

  v1::Resources resources =
      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, command);

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::TCP);
  checkInfo->mutable_tcp()->set_port(testPort);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);

  launchTask(&mesos, offer, taskInfo);

  AWAIT_READY(updateTaskRunning);
  const v1::TaskStatus& taskRunning = updateTaskRunning->status();

  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
  EXPECT_TRUE(taskRunning.has_check_status());
  EXPECT_TRUE(taskRunning.check_status().has_tcp());
  EXPECT_FALSE(taskRunning.check_status().tcp().has_succeeded());

  acknowledge(&mesos, frameworkId, taskRunning);

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
  EXPECT_TRUE(checkResult.has_check_status());
  EXPECT_TRUE(checkResult.check_status().tcp().has_succeeded());

  // Since it takes some time for the HTTP server to start serving requests,
  // the first several TCP checks may fail. However we still expect a
  // successful TCP check and hence an extra status update.
  if (checkResult.check_status().tcp().succeeded() == false)
  {
    // Inject an expectation for the extra status update we expect.
    Future<v1::scheduler::Event::Update> updateCheckResult2;
    EXPECT_CALL(*scheduler, update(_, _))
      .WillOnce(FutureArg<1>(&updateCheckResult2))
      .RetiresOnSaturation();

    // Acknowledge (to be able to get the next update).
    acknowledge(&mesos, frameworkId, checkResult);

    AWAIT_READY(updateCheckResult2);
    const v1::TaskStatus& checkResult2 = updateCheckResult2->status();

    ASSERT_EQ(TASK_RUNNING, checkResult2.state());
    ASSERT_EQ(
        v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
        checkResult2.reason());
    EXPECT_EQ(taskInfo.task_id(), checkResult2.task_id());
    EXPECT_TRUE(checkResult2.has_check_status());
    EXPECT_TRUE(checkResult2.check_status().tcp().has_succeeded());
    EXPECT_EQ(true, checkResult2.check_status().tcp().succeeded());
  }
}


// TODO(alexr): Implement following tests for the docker executor once
// the docker executor supports checks.
//
// 1. COMMAND check with env var works, is delivered, and is reconciled
//    properly.
// 2. COMMAND check's status change is delivered. TODO(alexr): When check
//    mocking is available, ensure only status changes are delivered.
// 3. COMMAND check sees env vars set for the command itself.
// 4. COMMAND check shares working directory with the task.
// 5. COMMAND check times out.
// 6. COMMAND check and health check do not shadow each other; upon
//    reconciliation both statuses are available.
// 7. HTTP check works and is delivered.
// 8. TCP check works and is delivered.


// These are check tests with the default executor.
class DefaultExecutorCheckTest : public CheckTest
{
protected:
  slave::Flags CreateSlaveFlags()
  {
    slave::Flags flags = CheckTest::CreateSlaveFlags();

#ifndef USE_SSL_SOCKET
    // Disable operator API authentication for the default executor. Executor
    // authentication currently has SSL as a dependency, so we cannot require
    // executors to authenticate with the agent operator API if Mesos was not
    // built with SSL support.
    flags.authenticate_http_readwrite = false;

    // Set permissive ACLs in the agent so that the local authorizer will be
    // loaded and implicit executor authorization will be tested.
    ACLs acls;
    acls.set_permissive(true);

    flags.acls = acls;
#endif // USE_SSL_SOCKET

    return flags;
  }
};


// Verifies that a command check is supported by the default executor,
// its status is delivered in a task status update, and the last known
// status can be obtained during explicit and implicit reconciliation.
// Additionally ensures that the specified environment of the command
// check is honored.
//
// TODO(gkleiman): Check if this test works on Windows.
TEST_F_TEMP_DISABLED_ON_WINDOWS(
    DefaultExecutorCheckTest,
    CommandCheckDeliveredAndReconciled)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  Fetcher fetcher(flags);

  // We have to explicitly create a `Containerizer` in non-local mode,
  // because `LaunchNestedContainerSession` (used by command checks)
  // tries to start a IO switchboard, which doesn't work in local mode yet.
  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);

  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());
  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> agent =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  const v1::Resources resources =
    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::ExecutorInfo executorInfo;
  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
  executorInfo.mutable_resources()->CopyFrom(resources);
  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
      Seconds(10).ns());

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected))
    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  // Update `executorInfo` with the subscribed `frameworkId`.
  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;
  Future<Event::Update> updateExplicitReconciliation;
  Future<Event::Update> updateImplicitReconciliation;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillOnce(FutureArg<1>(&updateExplicitReconciliation))
    .WillOnce(FutureArg<1>(&updateImplicitReconciliation))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  v1::TaskInfo taskInfo =
    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);

  v1::CommandInfo* checkCommand =
    checkInfo->mutable_command()->mutable_command();
  checkCommand->set_value("exit $STATUS");

  v1::Environment::Variable* variable =
    checkCommand->mutable_environment()->add_variables();
  variable->set_name("STATUS");
  variable->set_value("1");

  v1::TaskGroupInfo taskGroup;
  taskGroup.add_tasks()->CopyFrom(taskInfo);

  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);

  AWAIT_READY(updateTaskRunning);
  const v1::TaskStatus& taskRunning = updateTaskRunning->status();

  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
  EXPECT_TRUE(taskRunning.has_check_status());
  EXPECT_TRUE(taskRunning.check_status().has_command());
  EXPECT_FALSE(taskRunning.check_status().command().has_exit_code());

  acknowledge(&mesos, frameworkId, taskRunning);

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
  EXPECT_TRUE(checkResult.has_check_status());
  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(1, checkResult.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, checkResult);

  // Trigger explicit reconciliation.
  reconcile(
      &mesos,
      frameworkId,
      {std::make_pair(checkResult.task_id(), checkResult.agent_id())});

  AWAIT_READY(updateExplicitReconciliation);
  const v1::TaskStatus& explicitReconciliation =
    updateExplicitReconciliation->status();

  ASSERT_EQ(TASK_RUNNING, explicitReconciliation.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_RECONCILIATION,
      explicitReconciliation.reason());
  EXPECT_EQ(taskInfo.task_id(), explicitReconciliation.task_id());
  EXPECT_TRUE(explicitReconciliation.has_check_status());
  EXPECT_TRUE(explicitReconciliation.check_status().command().has_exit_code());
  EXPECT_EQ(1, explicitReconciliation.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, explicitReconciliation);

  // Trigger implicit reconciliation.
  reconcile(&mesos, frameworkId, {});

  AWAIT_READY(updateImplicitReconciliation);
  const v1::TaskStatus& implicitReconciliation =
    updateImplicitReconciliation->status();

  ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_RECONCILIATION,
      implicitReconciliation.reason());
  EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id());
  EXPECT_TRUE(implicitReconciliation.has_check_status());
  EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code());
  EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code());

  // Cleanup all mesos launched containers.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);

  EXPECT_CALL(*scheduler, disconnected(_));

  teardown(&mesos, frameworkId);

  foreach (const ContainerID& containerId, containerIds.get()) {
    AWAIT_READY(containerizer->wait(containerId));
  }
}


// Verifies that a command check's status changes are delivered.
//
// TODO(alexr): When check mocking is available, ensure that *only*
// status changes are delivered.
//
// TODO(gkleiman): Check if this test works on Windows.
TEST_F_TEMP_DISABLED_ON_WINDOWS(
    DefaultExecutorCheckTest,
    CommandCheckStatusChange)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  Fetcher fetcher(flags);

  // We have to explicitly create a `Containerizer` in non-local mode,
  // because `LaunchNestedContainerSession` (used by command checks)
  // tries to start a IO switchboard, which doesn't work in local mode yet.
  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);

  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());
  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> agent =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  const v1::Resources resources =
    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::ExecutorInfo executorInfo;
  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
  executorInfo.mutable_resources()->CopyFrom(resources);
  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
      Seconds(10).ns());

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected))
    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  // Update `executorInfo` with the subscribed `frameworkId`.
  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;
  Future<Event::Update> updateCheckResultChanged;
  Future<Event::Update> updateCheckResultBack;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillOnce(FutureArg<1>(&updateCheckResultChanged))
    .WillOnce(FutureArg<1>(&updateCheckResultBack))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  v1::TaskInfo taskInfo =
    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);
  checkInfo->mutable_command()->mutable_command()->set_value(
      FLAPPING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX")));

  v1::TaskGroupInfo taskGroup;
  taskGroup.add_tasks()->CopyFrom(taskInfo);

  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);

  AWAIT_READY(updateTaskRunning);
  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());

  acknowledge(&mesos, frameworkId, updateTaskRunning->status());

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(1, checkResult.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, checkResult);

  AWAIT_READY(updateCheckResultChanged);
  const v1::TaskStatus& checkResultChanged = updateCheckResultChanged->status();

  ASSERT_EQ(TASK_RUNNING, checkResultChanged.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResultChanged.reason());
  EXPECT_TRUE(checkResultChanged.check_status().command().has_exit_code());
  EXPECT_EQ(0, checkResultChanged.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, checkResultChanged);

  AWAIT_READY(updateCheckResultBack);
  const v1::TaskStatus& checkResultBack = updateCheckResultBack->status();

  ASSERT_EQ(TASK_RUNNING, checkResultBack.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResultBack.reason());
  EXPECT_TRUE(checkResultBack.check_status().command().has_exit_code());
  EXPECT_EQ(1, checkResultBack.check_status().command().exit_code());

  // Cleanup all mesos launched containers.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);

  EXPECT_CALL(*scheduler, disconnected(_));

  teardown(&mesos, frameworkId);

  foreach (const ContainerID& containerId, containerIds.get()) {
    AWAIT_READY(containerizer->wait(containerId));
  }
}


// Verifies that an environment variable set for the task is seen by its
// command check.
TEST_F_TEMP_DISABLED_ON_WINDOWS(
    DefaultExecutorCheckTest,
    CommandCheckSeesParentsEnv)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  Fetcher fetcher(flags);

  // We have to explicitly create a `Containerizer` in non-local mode,
  // because `LaunchNestedContainerSession` (used by command checks)
  // tries to start a IO switchboard, which doesn't work in local mode yet.
  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);

  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());
  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> agent =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  const v1::Resources resources =
    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::ExecutorInfo executorInfo;
  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
  executorInfo.mutable_resources()->CopyFrom(resources);
  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
      Seconds(10).ns());

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected))
    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  // Update `executorInfo` with the subscribed `frameworkId`.
  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  const string envKey = "MESOS_CHECK_TASK_ENV";
  const int32_t envValue = 42;

  v1::TaskInfo taskInfo =
    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  v1::Environment::Variable* variable =
    taskInfo.mutable_command()->mutable_environment()->add_variables();
  variable->set_name(envKey);
  variable->set_value(stringify(envValue));

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);

  v1::CommandInfo* checkCommand =
    checkInfo->mutable_command()->mutable_command();
  checkCommand->set_value("exit $" + envKey);

  v1::TaskGroupInfo taskGroup;
  taskGroup.add_tasks()->CopyFrom(taskInfo);

  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);

  AWAIT_READY(updateTaskRunning);
  const v1::TaskStatus& taskRunning = updateTaskRunning->status();

  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());

  acknowledge(&mesos, frameworkId, taskRunning);

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());

  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(envValue, checkResult.check_status().command().exit_code());

  // Cleanup all mesos launched containers.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);

  EXPECT_CALL(*scheduler, disconnected(_));

  teardown(&mesos, frameworkId);

  foreach (const ContainerID& containerId, containerIds.get()) {
    AWAIT_READY(containerizer->wait(containerId));
  }
}


// Verifies that a task and its command check has the same working directory.
TEST_F_TEMP_DISABLED_ON_WINDOWS(
    DefaultExecutorCheckTest,
    CommandCheckSharesWorkDirWithTask)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  Fetcher fetcher(flags);

  // We have to explicitly create a `Containerizer` in non-local mode,
  // because `LaunchNestedContainerSession` (used by command checks)
  // tries to start a IO switchboard, which doesn't work in local mode yet.
  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);

  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());
  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> agent =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  const v1::Resources resources =
    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::ExecutorInfo executorInfo;
  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
  executorInfo.mutable_resources()->CopyFrom(resources);
  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
      Seconds(10).ns());

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected))
    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  // Update `executorInfo` with the subscribed `frameworkId`.
  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  const string filename = "nested_inherits_work_dir";

  v1::TaskInfo taskInfo = v1::createTask(
      agentId,
      resources,
      v1::createCommandInfo(
          strings::format("touch %s; sleep 1000", filename).get()));

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);

  // Wait in a busy loop until the file has been created.
  checkInfo->mutable_command()->mutable_command()->CopyFrom(
      v1::createCommandInfo("while [ -f " + filename + "]; do :; done"));

  v1::TaskGroupInfo taskGroup;
  taskGroup.add_tasks()->CopyFrom(taskInfo);

  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);

  AWAIT_READY(updateTaskRunning);
  const v1::TaskStatus& taskRunning = updateTaskRunning->status();

  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());

  acknowledge(&mesos, frameworkId, taskRunning);

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());

  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(0, checkResult.check_status().command().exit_code());

  // Cleanup all mesos launched containers.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);

  EXPECT_CALL(*scheduler, disconnected(_));

  teardown(&mesos, frameworkId);

  foreach (const ContainerID& containerId, containerIds.get()) {
    AWAIT_READY(containerizer->wait(containerId));
  }
}


// Verifies that when a command check times out after a successful check,
// an empty check status update is delivered.
//
// TODO(gkleiman): Check if this test works on Windows.
TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, CommandCheckTimeout)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  Fetcher fetcher(flags);

  // We have to explicitly create a `Containerizer` in non-local mode,
  // because `LaunchNestedContainerSession` (used by command checks)
  // tries to start a IO switchboard, which doesn't work in local mode yet.
  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);

  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());
  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> agent =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  const v1::Resources resources =
    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::ExecutorInfo executorInfo;
  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
  executorInfo.mutable_resources()->CopyFrom(resources);
  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
      Seconds(10).ns());

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected))
    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  // Update `executorInfo` with the subscribed `frameworkId`.
  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;
  Future<Event::Update> updateCheckResultTimeout;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillOnce(FutureArg<1>(&updateCheckResultTimeout))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  v1::TaskInfo taskInfo =
    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);
  checkInfo->set_timeout_seconds(1);
  checkInfo->mutable_command()->mutable_command()->set_value(
      STALLING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX")));

  v1::TaskGroupInfo taskGroup;
  taskGroup.add_tasks()->CopyFrom(taskInfo);

  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);

  AWAIT_READY(updateTaskRunning);
  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());

  acknowledge(&mesos, frameworkId, updateTaskRunning->status());

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(1, checkResult.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, checkResult);

  AWAIT_READY(updateCheckResultTimeout);
  const v1::TaskStatus& checkResultTimeout = updateCheckResultTimeout->status();

  ASSERT_EQ(TASK_RUNNING, checkResultTimeout.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResultTimeout.reason());
  EXPECT_FALSE(checkResultTimeout.check_status().command().has_exit_code());

  // Cleanup all mesos launched containers.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);

  EXPECT_CALL(*scheduler, disconnected(_));

  teardown(&mesos, frameworkId);

  foreach (const ContainerID& containerId, containerIds.get()) {
    AWAIT_READY(containerizer->wait(containerId));
  }
}


// Verifies that when both command check and health check are specified,
// health and check updates include both statuses. Also verifies that
// both statuses are included upon reconciliation.
//
// TODO(gkleiman): Check if this test works on Windows.
TEST_F(DefaultExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  Fetcher fetcher(flags);

  // We have to explicitly create a `Containerizer` in non-local mode,
  // because `LaunchNestedContainerSession` (used by command checks)
  // tries to start a IO switchboard, which doesn't work in local mode yet.
  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);

  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());
  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> agent =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  const v1::Resources resources =
    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::ExecutorInfo executorInfo;
  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
  executorInfo.mutable_resources()->CopyFrom(resources);
  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
      Seconds(10).ns());

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected))
    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  // Update `executorInfo` with the subscribed `frameworkId`.
  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<Event::Update> updateTaskRunning;
  Future<Event::Update> updateCheckResult;
  Future<Event::Update> updateHealthResult;
  Future<Event::Update> updateImplicitReconciliation;

  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillOnce(FutureArg<1>(&updateHealthResult))
    .WillOnce(FutureArg<1>(&updateImplicitReconciliation))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  v1::TaskInfo taskInfo =
      v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  // Set both check and health check interval to an increased value to
  // prevent a second update coming before reconciliation response.
  int interval = 10;

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::COMMAND);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(interval);
  checkInfo->mutable_command()->mutable_command()->set_value("exit 1");

  // Delay health check for 1s to ensure health update comes after check update.
  //
  // TODO(alexr): This can lead to flakiness on busy agents. A more robust
  // approach could be setting the grace period to MAX_INT, and make the
  // health check pass iff a file created by the check exists. Alternatively,
  // we can relax the expectation that the check update is delivered first.
  v1::HealthCheck* healthCheckInfo = taskInfo.mutable_health_check();
  healthCheckInfo->set_type(v1::HealthCheck::COMMAND);
  healthCheckInfo->set_delay_seconds(1);
  healthCheckInfo->set_interval_seconds(interval);
  healthCheckInfo->mutable_command()->set_value("exit 0");

  launchTask(&mesos, offer, taskInfo);

  AWAIT_READY(updateTaskRunning);
  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());

  acknowledge(&mesos, frameworkId, updateTaskRunning->status());

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
  EXPECT_FALSE(checkResult.has_healthy());
  EXPECT_TRUE(checkResult.has_check_status());
  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
  EXPECT_EQ(1, checkResult.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, checkResult);

  AWAIT_READY(updateHealthResult);
  const v1::TaskStatus& healthResult = updateHealthResult->status();

  ASSERT_EQ(TASK_RUNNING, healthResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED,
      healthResult.reason());
  EXPECT_EQ(taskInfo.task_id(), healthResult.task_id());
  EXPECT_TRUE(healthResult.has_healthy());
  EXPECT_TRUE(healthResult.healthy());
  EXPECT_TRUE(healthResult.has_check_status());
  EXPECT_TRUE(healthResult.check_status().command().has_exit_code());
  EXPECT_EQ(1, healthResult.check_status().command().exit_code());

  acknowledge(&mesos, frameworkId, healthResult);

  // Trigger implicit reconciliation.
  reconcile(&mesos, frameworkId, {});

  AWAIT_READY(updateImplicitReconciliation);
  const v1::TaskStatus& implicitReconciliation =
    updateImplicitReconciliation->status();

  ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_RECONCILIATION,
      implicitReconciliation.reason());
  EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id());
  EXPECT_TRUE(implicitReconciliation.has_healthy());
  EXPECT_TRUE(implicitReconciliation.healthy());
  EXPECT_TRUE(implicitReconciliation.has_check_status());
  EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code());
  EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code());

  // Cleanup all mesos launched containers.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);

  EXPECT_CALL(*scheduler, disconnected(_));

  teardown(&mesos, frameworkId);

  foreach (const ContainerID& containerId, containerIds.get()) {
    AWAIT_READY(containerizer->wait(containerId));
  }
}


// Verifies that task groups with multiple tasks, each one with a check,
// are supported by the default executor and its status updates are delivered.
TEST_F_TEMP_DISABLED_ON_WINDOWS(
    DefaultExecutorCheckTest, MultipleTasksWithChecks)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  // Disable AuthN on the agent.
  slave::Flags flags = CreateSlaveFlags();
  flags.authenticate_http_readwrite = false;

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), flags);
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  const v1::Resources resources =
    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::ExecutorInfo executorInfo;
  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
  executorInfo.mutable_resources()->CopyFrom(resources);

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  // Update `executorInfo` with the subscribed `frameworkId`.
  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<v1::scheduler::Event::Update> updates[4];

  {
    testing::InSequence dummy;
    for (int i = 0; i < 4; i++) {
      EXPECT_CALL(*scheduler, update(_, _))
        .WillOnce(FutureArg<1>(&updates[i]));
    }

    EXPECT_CALL(*scheduler, update(_, _))
      .WillRepeatedly(Return()); // Ignore subsequent updates.
  }

  v1::TaskInfo taskInfo1 =
    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));

  v1::CheckInfo* checkInfo = taskInfo1.mutable_check();
  checkInfo->set_type(v1::CheckInfo::TCP);
  checkInfo->mutable_tcp()->set_port(getFreePort().get());
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);
  checkInfo->set_timeout_seconds(1);

  v1::TaskInfo taskInfo2 =
    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
  taskInfo2.mutable_check()->CopyFrom(taskInfo1.check());

  v1::TaskGroupInfo taskGroup = v1::createTaskGroupInfo({taskInfo1, taskInfo2});

  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);

  enum class Stage { INITIAL, RUNNING, CHECKED };
  hashmap<v1::TaskID, Stage> taskStages;
  taskStages.put(taskInfo1.task_id(), Stage::INITIAL);
  taskStages.put(taskInfo2.task_id(), Stage::INITIAL);

  for (int i = 0; i < 4; i++ ) {
    AWAIT_READY(updates[i]);

    const v1::TaskStatus& taskStatus = updates[i]->status();

    Option<Stage> taskStage = taskStages.get(taskStatus.task_id());
    ASSERT_SOME(taskStage);

    switch (taskStage.get()) {
      case Stage::INITIAL: {
        ASSERT_EQ(TASK_RUNNING, taskStatus.state());
        ASSERT_TRUE(taskStatus.check_status().has_tcp());
        ASSERT_FALSE(taskStatus.check_status().tcp().has_succeeded());

        taskStages.put(taskStatus.task_id(), Stage::RUNNING);

        break;
      }
      case Stage::RUNNING: {
        ASSERT_EQ(TASK_RUNNING, taskStatus.state());
        ASSERT_EQ(
            v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
            taskStatus.reason());
        ASSERT_TRUE(taskStatus.check_status().has_tcp());
        ASSERT_TRUE(taskStatus.check_status().tcp().has_succeeded());

        taskStages.put(taskStatus.task_id(), Stage::CHECKED);

        break;
      }
      case Stage::CHECKED: {
        FAIL() << "Unexpected task update: " << updates[1]->DebugString();
        break;
      }
    }

    // Acknowledge (to be able to get the next update).
    acknowledge(&mesos, frameworkId, taskStatus);
  }
}


// Verifies that an HTTP check is supported by the default executor and
// its status is delivered in a task status update.
//
// TODO(josephw): Enable this. Mesos builds its own `curl.exe`, since it
// can't rely on a package manager to get it. We need to make this test use
// that executable.
TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, HTTPCheckDelivered)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent =
    StartSlave(detector.get(), CreateSlaveFlags());
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  const v1::Resources resources =
    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::ExecutorInfo executorInfo;
  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
  executorInfo.mutable_resources()->CopyFrom(resources);

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  // Update `executorInfo` with the subscribed `frameworkId`.
  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<v1::scheduler::Event::Update> updateTaskRunning;
  Future<v1::scheduler::Event::Update> updateCheckResult;
  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  const uint16_t testPort = getFreePort().get();

  // Use `test-helper` to launch a simple HTTP
  // server to respond to HTTP checks.
  const string command = strings::format(
      "%s %s --ip=127.0.0.1 --port=%u",
      getTestHelperPath("test-helper"),
      HttpServerTestHelper::NAME,
      testPort).get();

  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, command);

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::HTTP);
  checkInfo->mutable_http()->set_port(testPort);
  checkInfo->mutable_http()->set_path("/help");
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);

  v1::TaskGroupInfo taskGroup;
  taskGroup.add_tasks()->CopyFrom(taskInfo);

  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);

  AWAIT_READY(updateTaskRunning);
  const v1::TaskStatus& taskRunning = updateTaskRunning->status();

  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
  EXPECT_TRUE(taskRunning.has_check_status());
  EXPECT_TRUE(taskRunning.check_status().has_http());
  EXPECT_FALSE(taskRunning.check_status().http().has_status_code());

  // Acknowledge (to be able to get the next update).
  acknowledge(&mesos, frameworkId, taskRunning);

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
  EXPECT_TRUE(checkResult.has_check_status());
  EXPECT_TRUE(checkResult.check_status().http().has_status_code());

  // Since it takes some time for the HTTP server to start serving requests,
  // the first several HTTP checks may not return 200. However we still expect
  // a successful HTTP check and hence an extra status update.
  if (checkResult.check_status().http().status_code() != 200)
  {
    // Inject an expectation for the extra status update we expect.
    Future<v1::scheduler::Event::Update> updateCheckResult2;
    EXPECT_CALL(*scheduler, update(_, _))
      .WillOnce(FutureArg<1>(&updateCheckResult2))
      .RetiresOnSaturation();

    // Acknowledge (to be able to get the next update).
    acknowledge(&mesos, frameworkId, checkResult);

    AWAIT_READY(updateCheckResult2);
    const v1::TaskStatus& checkResult2 = updateCheckResult2->status();

    ASSERT_EQ(TASK_RUNNING, checkResult2.state());
    ASSERT_EQ(
        v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
        checkResult2.reason());
    EXPECT_EQ(taskInfo.task_id(), checkResult2.task_id());
    EXPECT_TRUE(checkResult2.has_check_status());
    EXPECT_TRUE(checkResult2.check_status().http().has_status_code());
    EXPECT_EQ(200, checkResult2.check_status().http().status_code());
  }
}


// Verifies that a TCP check is supported by the default executor and
// its status is delivered in a task status update.
//
// TODO(alexr): Check if this test works on Windows.
TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, TCPCheckDelivered)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  // Disable AuthN on the agent.
  slave::Flags flags = CreateSlaveFlags();
  flags.authenticate_http_readwrite = false;

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), flags);
  ASSERT_SOME(agent);

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;

  const v1::Resources resources =
    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();

  v1::ExecutorInfo executorInfo;
  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
  executorInfo.mutable_resources()->CopyFrom(resources);

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  subscribe(&mesos, frameworkInfo);

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  // Update `executorInfo` with the subscribed `frameworkId`.
  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);

  AWAIT_READY(offers);
  EXPECT_NE(0, offers->offers().size());
  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID& agentId = offer.agent_id();

  Future<v1::scheduler::Event::Update> updateTaskRunning;
  Future<v1::scheduler::Event::Update> updateCheckResult;
  EXPECT_CALL(*scheduler, update(_, _))
    .WillOnce(FutureArg<1>(&updateTaskRunning))
    .WillOnce(FutureArg<1>(&updateCheckResult))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  const uint16_t testPort = getFreePort().get();

  // Use `test-helper` to launch a simple HTTP
  // server to respond to TCP checks.
  const string command = strings::format(
      "%s %s --ip=127.0.0.1 --port=%u",
      getTestHelperPath("test-helper"),
      HttpServerTestHelper::NAME,
      testPort).get();

  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, command);

  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
  checkInfo->set_type(v1::CheckInfo::TCP);
  checkInfo->mutable_tcp()->set_port(testPort);
  checkInfo->set_delay_seconds(0);
  checkInfo->set_interval_seconds(0);

  v1::TaskGroupInfo taskGroup;
  taskGroup.add_tasks()->CopyFrom(taskInfo);

  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);

  AWAIT_READY(updateTaskRunning);
  const v1::TaskStatus& taskRunning = updateTaskRunning->status();

  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
  EXPECT_TRUE(taskRunning.has_check_status());
  EXPECT_TRUE(taskRunning.check_status().has_tcp());
  EXPECT_FALSE(taskRunning.check_status().tcp().has_succeeded());

  // Acknowledge (to be able to get the next update).
  acknowledge(&mesos, frameworkId, taskRunning);

  AWAIT_READY(updateCheckResult);
  const v1::TaskStatus& checkResult = updateCheckResult->status();

  ASSERT_EQ(TASK_RUNNING, checkResult.state());
  ASSERT_EQ(
      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
      checkResult.reason());
  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
  EXPECT_TRUE(checkResult.has_check_status());
  EXPECT_TRUE(checkResult.check_status().tcp().has_succeeded());

  // Since it takes some time for the HTTP server to start serving requests,
  // the first several TCP checks may fail. However we still expect a
  // successful TCP check and hence an extra status update.
  if (checkResult.check_status().tcp().succeeded() == false)
  {
    // Inject an expectation for the extra status update we expect.
    Future<v1::scheduler::Event::Update> updateCheckResult2;
    EXPECT_CALL(*scheduler, update(_, _))
      .WillOnce(FutureArg<1>(&updateCheckResult2))
      .RetiresOnSaturation();

    // Acknowledge (to be able to get the next update).
    acknowledge(&mesos, frameworkId, checkResult);

    AWAIT_READY(updateCheckResult2);
    const v1::TaskStatus& checkResult2 = updateCheckResult2->status();

    ASSERT_EQ(TASK_RUNNING, checkResult2.state());
    ASSERT_EQ(
        v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
        checkResult2.reason());
    EXPECT_EQ(taskInfo.task_id(), checkResult2.task_id());
    EXPECT_TRUE(checkResult2.has_check_status());
    EXPECT_TRUE(checkResult2.check_status().tcp().has_succeeded());
    EXPECT_EQ(true, checkResult2.check_status().tcp().succeeded());
  }
}


// These are protobuf validation tests.
//
// TODO(alexr): Move these tests once validation code is moved closer to
// protobuf definitions.

// This tests ensures `CheckInfo` protobuf is validated correctly.
TEST_F(CheckTest, CheckInfoValidation)
{
  using namespace mesos::internal::checks;

  // Check type must be set to a known value.
  {
    CheckInfo checkInfo;

    Option<Error> validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ("CheckInfo must specify 'type'", validate->message);

    checkInfo.set_type(CheckInfo::UNKNOWN);
    validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ("'UNKNOWN' is not a valid check type", validate->message);
  }

  // The associated message for a given type must be set.
  {
    CheckInfo checkInfo;

    checkInfo.set_type(CheckInfo::COMMAND);
    Option<Error> validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "Expecting 'command' to be set for COMMAND check",
        validate->message);

    checkInfo.set_type(CheckInfo::HTTP);
    validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "Expecting 'http' to be set for HTTP check",
        validate->message);

    checkInfo.set_type(CheckInfo::TCP);
    validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "Expecting 'tcp' to be set for TCP check",
        validate->message);
  }

  // Command check must specify an actual command in `command.command.value`.
  {
    CheckInfo checkInfo;

    checkInfo.set_type(CheckInfo::COMMAND);
    checkInfo.mutable_command()->CopyFrom(CheckInfo::Command());
    Option<Error> validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ("Command check must contain 'shell command'", validate->message);

    checkInfo.mutable_command()->mutable_command()->CopyFrom(CommandInfo());
    validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ("Command check must contain 'shell command'", validate->message);
  }

  // Command check must specify a command with a valid environment.
  // Environment variable's `value` field must be set in this case.
  {
    CheckInfo checkInfo;
    checkInfo.set_type(CheckInfo::COMMAND);
    checkInfo.mutable_command()->CopyFrom(CheckInfo::Command());
    checkInfo.mutable_command()->mutable_command()->CopyFrom(
        createCommandInfo("exit 0"));

    Option<Error> validate = validation::checkInfo(checkInfo);
    EXPECT_NONE(validate);

    Environment::Variable* variable =
      checkInfo.mutable_command()->mutable_command()->mutable_environment()
          ->mutable_variables()->Add();
    variable->set_name("ENV_VAR_KEY");

    validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
  }

  // HTTP check may specify a path starting with '/'.
  {
    CheckInfo checkInfo;

    checkInfo.set_type(CheckInfo::HTTP);
    checkInfo.mutable_http()->set_port(8080);

    Option<Error> validate = validation::checkInfo(checkInfo);
    EXPECT_NONE(validate);

    checkInfo.mutable_http()->set_path("healthz");
    validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "The path 'healthz' of HTTP check must start with '/'",
        validate->message);
  }

  // Check's duration parameters must be non-negative.
  {
    CheckInfo checkInfo;

    checkInfo.set_type(CheckInfo::HTTP);
    checkInfo.mutable_http()->set_port(8080);

    checkInfo.set_delay_seconds(-1.0);
    Option<Error> validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "Expecting 'delay_seconds' to be non-negative",
        validate->message);

    checkInfo.set_delay_seconds(0.0);
    checkInfo.set_interval_seconds(-1.0);
    validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "Expecting 'interval_seconds' to be non-negative",
        validate->message);

    checkInfo.set_interval_seconds(0.0);
    checkInfo.set_timeout_seconds(-1.0);
    validate = validation::checkInfo(checkInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "Expecting 'timeout_seconds' to be non-negative",
        validate->message);

    checkInfo.set_timeout_seconds(0.0);
    validate = validation::checkInfo(checkInfo);
    EXPECT_NONE(validate);
  }
}


// This tests ensures `CheckStatusInfo` protobuf is validated correctly.
TEST_F(CheckTest, CheckStatusInfoValidation)
{
  using namespace mesos::internal::checks;

  // Check status type must be set to a known value.
  {
    CheckStatusInfo checkStatusInfo;

    Option<Error> validate = validation::checkStatusInfo(checkStatusInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ("CheckStatusInfo must specify 'type'", validate->message);

    checkStatusInfo.set_type(CheckInfo::UNKNOWN);
    validate = validation::checkStatusInfo(checkStatusInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "'UNKNOWN' is not a valid check's status type",
        validate->message);
  }

  // The associated message for a given type must be set.
  {
    CheckStatusInfo checkStatusInfo;

    checkStatusInfo.set_type(CheckInfo::COMMAND);
    Option<Error> validate = validation::checkStatusInfo(checkStatusInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "Expecting 'command' to be set for COMMAND check's status",
        validate->message);

    checkStatusInfo.set_type(CheckInfo::HTTP);
    validate = validation::checkStatusInfo(checkStatusInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "Expecting 'http' to be set for HTTP check's status",
        validate->message);

    checkStatusInfo.set_type(CheckInfo::TCP);
    validate = validation::checkStatusInfo(checkStatusInfo);
    EXPECT_SOME(validate);
    EXPECT_EQ(
        "Expecting 'tcp' to be set for TCP check's status",
        validate->message);
  }
}

} // namespace tests {
} // namespace internal {
} // namespace mesos {
