blob: e9ef1e208cb01535e9366db7872b922c8f06ec40 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gmock/gmock.h>
#include <list>
#include <string>
#include <vector>
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/pid.hpp>
#include <stout/none.hpp>
#include <stout/os.hpp>
#include <stout/protobuf.hpp>
#include <stout/result.hpp>
#include <stout/try.hpp>
#include "master/master.hpp"
#include "slave/constants.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "messages/messages.hpp"
#include "tests/mesos.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
using namespace mesos::internal::slave::paths;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
using process::Clock;
using process::Future;
using process::PID;
using std::list;
using std::string;
using std::vector;
using testing::_;
using testing::AtMost;
using testing::Return;
using testing::SaveArg;
// TODO(benh): Move this into utils, make more generic, and use in
// other tests.
vector<TaskInfo> createTasks(const Offer& offer)
{
TaskInfo task;
task.set_name("test-task");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offer.slave_id());
task.mutable_resources()->MergeFrom(offer.resources());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
vector<TaskInfo> tasks;
tasks.push_back(task);
return tasks;
}
class StatusUpdateManagerTest: public MesosTest {};
TEST_F(StatusUpdateManagerTest, CheckpointStatusUpdate)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags = CreateSlaveFlags();
flags.checkpoint = true;
Try<PID<Slave> > slave = StartSlave(&exec, flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true); // Enable checkpointing.
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(_, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
Future<Nothing> _statusUpdateAcknowledgement =
FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
AWAIT_READY(_statusUpdateAcknowledgement);
// Ensure that both the status update and its acknowledgement are
// correctly checkpointed.
Try<list<string> > found = os::find(flags.work_dir, TASK_UPDATES_FILE);
ASSERT_SOME(found);
ASSERT_EQ(1u, found.get().size());
Try<int> fd = os::open(found.get().front(), O_RDONLY);
ASSERT_SOME(fd);
int updates = 0;
int acks = 0;
string uuid;
Result<StatusUpdateRecord> record = None();
while (true) {
record = ::protobuf::read<StatusUpdateRecord>(fd.get());
ASSERT_FALSE(record.isError());
if (record.isNone()) { // Reached EOF.
break;
}
if (record.get().type() == StatusUpdateRecord::UPDATE) {
EXPECT_EQ(TASK_RUNNING, record.get().update().status().state());
uuid = record.get().update().uuid();
updates++;
} else {
EXPECT_EQ(uuid, record.get().uuid());
acks++;
}
}
ASSERT_EQ(1, updates);
ASSERT_EQ(1, acks);
close(fd.get());
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown();
}
TEST_F(StatusUpdateManagerTest, RetryStatusUpdate)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags = CreateSlaveFlags();
flags.checkpoint = true;
Try<PID<Slave> > slave = StartSlave(&exec, flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true); // Enable checkpointing.
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(_, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<StatusUpdateMessage> statusUpdateMessage =
DROP_PROTOBUF(StatusUpdateMessage(), master.get(), _);
Clock::pause();
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
AWAIT_READY(statusUpdateMessage);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
Clock::resume();
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown();
}
// This test verifies that status update manager ignores
// duplicate ACK for an earlier update when it is waiting
// for an ACK for a later update. This could happen when the
// duplicate ACK is for a retried update.
TEST_F(StatusUpdateManagerTest, IgnoreDuplicateStatusUpdateAck)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags = CreateSlaveFlags();
flags.checkpoint = true;
Try<PID<Slave> > slave = StartSlave(&exec, flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true); // Enable checkpointing.
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
FrameworkID frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(SaveArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
ExecutorDriver* execDriver;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(SaveArg<0>(&execDriver));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
// Drop the first update, so that status update manager
// resends the update.
Future<StatusUpdateMessage> statusUpdateMessage =
DROP_PROTOBUF(StatusUpdateMessage(), master.get(), _);
Clock::pause();
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
AWAIT_READY(statusUpdateMessage);
StatusUpdate update = statusUpdateMessage.get().update();
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
// This is the ACK for the retried update.
Future<Nothing> ack =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
AWAIT_READY(ack);
// Now send TASK_FINISHED update so that the status update manager
// is waiting for its ACK, which it never gets because we drop the
// update.
DROP_PROTOBUFS(StatusUpdateMessage(), master.get(), _);
Future<Nothing> update2 = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
TaskStatus status2 = status.get();
status2.set_state(TASK_FINISHED);
execDriver->sendStatusUpdate(status2);
AWAIT_READY(update2);
// This is to catch the duplicate ack for TASK_RUNNING.
Future<Nothing> duplicateAck =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
// Now send a duplicate ACK for the TASK_RUNNING update.
process::dispatch(
slave.get(),
&Slave::statusUpdateAcknowledgement,
master.get(),
update.slave_id(),
frameworkId,
update.status().task_id(),
update.uuid());
AWAIT_READY(duplicateAck);
Clock::resume();
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown();
}
// This test verifies that status update manager ignores
// unexpected ACK for an earlier update when it is waiting
// for an ACK for another update. We do this by dropping ACKs
// for the original update and sending a random ACK to the slave.
TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags = CreateSlaveFlags();
flags.checkpoint = true;
Try<PID<Slave> > slave = StartSlave(&exec, flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true); // Enable checkpointing.
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
FrameworkID frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(SaveArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
ExecutorDriver* execDriver;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(SaveArg<0>(&execDriver));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<StatusUpdateMessage> statusUpdateMessage =
FUTURE_PROTOBUF(StatusUpdateMessage(), master.get(), _);
// Drop the ACKs, so that status update manager
// retries the update.
DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _);
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
AWAIT_READY(statusUpdateMessage);
StatusUpdate update = statusUpdateMessage.get().update();
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
Future<Nothing> unexpectedAck =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
// Now send an ACK with a random UUID.
process::dispatch(
slave.get(),
&Slave::statusUpdateAcknowledgement,
master.get(),
update.slave_id(),
frameworkId,
update.status().task_id(),
UUID::random().toBytes());
AWAIT_READY(unexpectedAck);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown();
}
// This test verifies that the slave and status update manager
// properly handle duplicate terminal status updates, when the
// second update is received before the ACK for the first update.
// The proper behavior here is for the status update manager to
// drop the duplicate update.
TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags = CreateSlaveFlags();
flags.checkpoint = true;
Try<PID<Slave> > slave = StartSlave(&exec, flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true); // Enable checkpointing.
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
FrameworkID frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(SaveArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
ExecutorDriver* execDriver;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(SaveArg<0>(&execDriver));
// Send a terminal update right away.
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
// Drop the first ACK from the scheduler to the slave.
Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
Future<Nothing> __statusUpdate =
FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate);
Clock::pause();
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
AWAIT_READY(status);
EXPECT_EQ(TASK_FINISHED, status.get().state());
AWAIT_READY(statusUpdateAckMessage);
// At this point the status update manager has enqueued
// TASK_FINISHED update.
AWAIT_READY(__statusUpdate);
Future<Nothing> __statusUpdate2 =
FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate);
// Now send a TASK_KILLED update for the same task.
TaskStatus status2 = status.get();
status2.set_state(TASK_KILLED);
execDriver->sendStatusUpdate(status2);
// At this point the status update manager has enqueued
// TASK_FINISHED and TASK_KILLED updates.
AWAIT_READY(__statusUpdate2);
// After we advance the clock, the scheduler should receive
// the retried TASK_FINISHED update and acknowledge it. The
// TASK_KILLED update should be dropped by the status update
// manager, as the stream is already terminated.
Future<TaskStatus> update;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&update));
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// Ensure the scheduler receives TASK_FINISHED.
AWAIT_READY(update);
EXPECT_EQ(TASK_FINISHED, update.get().state());
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
Clock::resume();
driver.stop();
driver.join();
Shutdown();
}
// This test verifies that the slave and status update manager
// properly handle duplicate terminal status updates, when the
// second update is received after the ACK for the first update.
// The proper behavior here is for the status update manager to
// forward the duplicate update to the scheduler.
TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags = CreateSlaveFlags();
flags.checkpoint = true;
Try<PID<Slave> > slave = StartSlave(&exec, flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true); // Enable checkpointing.
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
FrameworkID frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(SaveArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
ExecutorDriver* execDriver;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(SaveArg<0>(&execDriver));
// Send a terminal update right away.
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
Future<Nothing> _statusUpdateAcknowledgement =
FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
AWAIT_READY(status);
EXPECT_EQ(TASK_FINISHED, status.get().state());
AWAIT_READY(_statusUpdateAcknowledgement);
Future<TaskStatus> update;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&update));
Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
Clock::pause();
// Now send a TASK_KILLED update for the same task.
TaskStatus status2 = status.get();
status2.set_state(TASK_KILLED);
execDriver->sendStatusUpdate(status2);
// Ensure the scheduler receives TASK_KILLED.
AWAIT_READY(update);
EXPECT_EQ(TASK_KILLED, update.get().state());
// Ensure the slave properly handles the ACK.
// Clock::settle() ensures that the slave successfully
// executes Slave::_statusUpdateAcknowledgement().
AWAIT_READY(_statusUpdateAcknowledgement2);
Clock::settle();
Clock::resume();
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown();
}
// This test verifies that the slave and status update manager
// properly handle duplicate status updates, when the second
// update with the same UUID is received before the ACK for the
// first update. The proper behavior here is for the status update
// manager to drop the duplicate update.
TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags = CreateSlaveFlags();
flags.checkpoint = true;
Try<PID<Slave> > slave = StartSlave(&exec, flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true); // Enable checkpointing.
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
FrameworkID frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(SaveArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
ExecutorDriver* execDriver;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(SaveArg<0>(&execDriver));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
// Capture the first status update message.
Future<StatusUpdateMessage> statusUpdateMessage =
FUTURE_PROTOBUF(StatusUpdateMessage(), _, _);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
// Drop the first ACK from the scheduler to the slave.
Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
Clock::pause();
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
AWAIT_READY(statusUpdateMessage);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
AWAIT_READY(statusUpdateAckMessage);
Future<Nothing> __statusUpdate =
FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate);
// Now resend the TASK_RUNNING update.
process::post(slave.get(), statusUpdateMessage.get());
// At this point the status update manager has handled
// the duplicate status update.
AWAIT_READY(__statusUpdate);
// After we advance the clock, the status update manager should
// retry the TASK_RUNING update and the scheduler should receive
// and acknowledge it.
Future<TaskStatus> update;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&update));
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// Ensure the scheduler receives TASK_FINISHED.
AWAIT_READY(update);
EXPECT_EQ(TASK_RUNNING, update.get().state());
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
Clock::resume();
driver.stop();
driver.join();
Shutdown();
}