// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <unistd.h>
#include <list>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
#include <process/check.hpp>
#include <process/clock.hpp>
#include <process/collect.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/latch.hpp>
#include <process/message.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/queue.hpp>
#include <process/subprocess.hpp>
#include <stout/json.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/try.hpp>
#include "master/flags.hpp"
#include "master/master.hpp"
#include "slave/constants.hpp"
#include "slave/gc.hpp"
#include "slave/flags.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "slave/containerizer/fetcher.hpp"
#include "tests/containerizer.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
#include "tests/mock_fetcher.hpp"
#include "tests/utils.hpp"
using mesos::fetcher::FetcherInfo;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::MesosContainerizer;
using mesos::internal::slave::MesosContainerizerProcess;
using mesos::internal::slave::Fetcher;
using mesos::internal::slave::FetcherProcess;
using mesos::master::detector::MasterDetector;
using process::TEST_AWAIT_TIMEOUT;
using process::Future;
using process::HttpEvent;
using process::Latch;
using process::Owned;
using process::PID;
using process::Process;
using process::Promise;
using process::Queue;
using process::Subprocess;
using std::cout;
using std::endl;
using std::list;
using std::string;
using std::vector;
using testing::_;
using testing::DoAll;
using testing::DoDefault;
using testing::Eq;
using testing::Invoke;
using testing::InvokeWithoutArgs;
using testing::Return;
namespace mesos {
namespace internal {
namespace tests {
static const string ASSETS_DIRECTORY_NAME = "mesos-fetcher-test-assets";
static const string COMMAND_NAME = "mesos-fetcher-test-cmd";
static const string ARCHIVE_NAME = "mesos-fetcher-test-archive.tgz";
static const string ARCHIVED_COMMAND_NAME = "mesos-fetcher-test-acmd";
// Every task executes one of these shell scripts, which create a
// file that includes the current task name in its name. The latter
// is expected to be passed in as a script argument. The existence
// of the file with that name is then used as proof that the task
// ran successfully.
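// For example, the task named "1" invokes `./mesos-fetcher-test-cmd 1`,
// which creates the file "mesos-fetcher-test-cmd1" in its sandbox.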
static const string COMMAND_SCRIPT = "touch " + COMMAND_NAME + "$1";
static const string ARCHIVED_COMMAND_SCRIPT =
"touch " + ARCHIVED_COMMAND_NAME + "$1";
class FetcherCacheTest : public MesosTest
{
public:
// A helper struct that captures, for each task we have launched, the
// information needed to test expectations.
struct Task
{
Path runDirectory;
Queue<TaskStatus> statusQueue;
};
void setupCommandFileAsset();
protected:
void setupArchiveAsset();
void SetUp() override;
void TearDown() override;
// Sets up the slave and starts it. Calling this late in the test
// instead of having it included in SetUp() gives us the opportunity
// to manipulate values in 'flags' first.
void startSlave();
// Stops the slave, deleting the containerizer, for subsequent
// recovery testing.
void stopSlave();
Try<Task> launchTask(const CommandInfo& commandInfo, size_t taskIndex);
Try<vector<Task>> launchTasks(const vector<CommandInfo>& commandInfos);
void verifyCacheMetrics();
// Promises whose futures indicate that FetcherProcess::_fetch() has been
// called for a task with a given index.
vector<Owned<Promise<Nothing>>> fetchContentionWaypoints;
string assetsDirectory;
string commandPath;
string archivePath;
Owned<cluster::Master> master;
Owned<cluster::Slave> slave;
slave::Flags flags;
SlaveID slaveId;
Owned<MasterDetector> detector;
Owned<MesosContainerizer> containerizer;
// NOTE: This is technically owned by the `fetcher`, but we violate
// this ownership in the tests.
MockFetcherProcess* fetcherProcess;
MockScheduler scheduler;
Owned<MesosSchedulerDriver> driver;
private:
Owned<Fetcher> fetcher;
FrameworkID frameworkId;
// If this test did not succeed (see HasFatalFailure() in TearDown()),
// the contents of these sandboxes will be dumped during tear down.
vector<Path> sandboxes;
};
void FetcherCacheTest::SetUp()
{
MesosTest::SetUp();
flags = CreateSlaveFlags();
flags.resources = "cpus:1000;mem:1000";
assetsDirectory = path::join(flags.work_dir, ASSETS_DIRECTORY_NAME);
ASSERT_SOME(os::mkdir(assetsDirectory));
setupCommandFileAsset();
setupArchiveAsset();
Try<Owned<cluster::Master>> _master = StartMaster();
ASSERT_SOME(_master);
master = _master.get();
FrameworkInfo frameworkInfo;
frameworkInfo.set_name("default");
frameworkInfo.set_checkpoint(true);
driver.reset(new MesosSchedulerDriver(
&scheduler, frameworkInfo, master->pid, DEFAULT_CREDENTIAL));
EXPECT_CALL(scheduler, registered(driver.get(), _, _));
// This installs a temporary reaction to resourceOffers calls, which
// must be in place BEFORE starting the scheduler driver. This
// "cover" is necessary, because we only add relevant mock actions
// in launchTask() and launchTasks() AFTER starting the driver.
EXPECT_CALL(scheduler, resourceOffers(driver.get(), _))
.WillRepeatedly(DeclineOffers());
}
// Dumps the contents of a file to cout, assuming it is a text file.
static void logFile(const Path& path, const string& filename)
{
string filePath = path::join(path.string(), filename);
Try<string> text = os::read(filePath);
if (text.isSome()) {
cout << "Begin file contents of `" << filename << "`:" << endl;
cout << text.get() << endl;
cout << "End file" << endl;
} else {
cout << "File `" << filename << "` not readable: " << text.error() << endl;
}
}
// Dumps the contents of all files in the sandbox to cout, assuming
// there are only text files.
static void logSandbox(const Path& path)
{
Try<list<string>> entries = os::ls(path.string());
if (entries.isSome()) {
cout << "Begin listing sandbox `" << path.string() << "`:" << endl;
foreach (const string& entry, entries.get()) {
logFile(path, entry);
}
cout << "End sandbox" << endl;
} else {
cout << "Could not list sandbox `" << path.string()
<< "`: " << entries.error() << endl;
}
}
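// Verifies the fetcher cache metrics: the reported total cache size
// must equal the configured agent flag, and the reported used size
// must equal the sum of the on-disk sizes of all files currently in
// the cache.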
void FetcherCacheTest::verifyCacheMetrics()
{
JSON::Object metrics = Metrics();
ASSERT_EQ(
1u,
metrics.values.count("containerizer/fetcher/cache_size_total_bytes"));
// The total size is always given by the corresponding agent flag.
EXPECT_SOME_EQ(
flags.fetcher_cache_size.bytes(),
metrics.at<JSON::Number>("containerizer/fetcher/cache_size_total_bytes"));
Try<list<Path>> files = fetcherProcess->cacheFiles();
ASSERT_SOME(files);
Bytes used;
foreach (const auto& file, files.get()) {
Try<Bytes> size = os::stat::size(file);
ASSERT_SOME(size);
used += size.get();
}
ASSERT_EQ(
1u,
metrics.values.count("containerizer/fetcher/cache_size_used_bytes"));
// Verify that the used amount of cache is the total of the size of
// all the files in the cache.
EXPECT_SOME_EQ(
used.bytes(),
metrics.at<JSON::Number>("containerizer/fetcher/cache_size_used_bytes"));
}
void FetcherCacheTest::TearDown()
{
if (HasFatalFailure()) {
// A gtest macro has terminated the test prematurely. Now stream
// additional info that might help debug the situation to where
// gtest writes its output: cout.
cout << "Begin listing sandboxes" << endl;
foreach (const Path& path, sandboxes) {
logSandbox(path);
}
cout << "End sandboxes" << endl;
}
driver->stop();
driver->join();
master.reset();
slave.reset();
MesosTest::TearDown();
}
// TODO(bernd-mesos): Make this abstraction as generic and as generally
// available for all testing as possible.
void FetcherCacheTest::startSlave()
{
fetcherProcess = new MockFetcherProcess(flags);
fetcher.reset(new Fetcher(Owned<FetcherProcess>(fetcherProcess)));
Try<MesosContainerizer*> create = MesosContainerizer::create(
flags, true, fetcher.get());
ASSERT_SOME(create);
containerizer.reset(create.get());
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
detector = master->createDetector();
Try<Owned<cluster::Slave>> _slave =
StartSlave(detector.get(), containerizer.get(), flags);
ASSERT_SOME(_slave);
slave = _slave.get();
AWAIT_READY(slaveRegisteredMessage);
slaveId = slaveRegisteredMessage->slave_id();
}
void FetcherCacheTest::setupCommandFileAsset()
{
commandPath = path::join(assetsDirectory, COMMAND_NAME);
ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT));
// Make the command file read-only, so we can tell that it is the URI's
// executable flag that makes the fetched copy executable.
ASSERT_SOME(os::chmod(commandPath, S_IRUSR | S_IRGRP | S_IROTH));
}
void FetcherCacheTest::setupArchiveAsset()
{
string path = path::join(assetsDirectory, ARCHIVED_COMMAND_NAME);
ASSERT_SOME(os::write(path, ARCHIVED_COMMAND_SCRIPT));
// Make the archived command file executable before archiving it,
// since the executable flag for CommandInfo::URI has no effect on
// what comes out of an archive.
ASSERT_SOME(os::chmod(path, S_IRWXU | S_IRWXG | S_IRWXO));
const string cwd = os::getcwd();
ASSERT_SOME(os::chdir(assetsDirectory));
// Create an uncompressed archive (see MESOS-3579).
ASSERT_SOME(os::shell(
"tar cf '" + ARCHIVE_NAME + "' '" + ARCHIVED_COMMAND_NAME + "' 2>&1"));
ASSERT_SOME(os::chdir(cwd));
archivePath = path::join(assetsDirectory, ARCHIVE_NAME);
// Make the archive file read-only, so we can tell if it becomes
// executable by accident.
ASSERT_SOME(os::chmod(archivePath, S_IRUSR | S_IRGRP | S_IROTH));
}
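// Returns the name for the task with the given index. The task name
// doubles as the task ID and, since these tests use the command
// executor, also as the executor ID.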
static string taskName(int taskIndex)
{
return stringify(taskIndex);
}
// TODO(bernd-mesos): Use Path, not string, create Path::executable().
static bool isExecutable(const string& path)
{
Try<bool> access = os::access(path, X_OK);
EXPECT_SOME(access);
return access.isSome() && access.get();
}
// Create a future that indicates that the task observed by the given
// status queue is finished. Intermediate status updates (e.g.,
// TASK_RUNNING) are consumed by recursing until TASK_FINISHED arrives.
static Future<Nothing> awaitFinished(FetcherCacheTest::Task task)
{
return task.statusQueue.get()
.then([=](const TaskStatus& status) -> Future<Nothing> {
if (status.state() == TASK_FINISHED) {
return Nothing();
}
return awaitFinished(task);
});
}
// Create a future that indicates that all tasks are finished.
// TODO(bernd-mesos): Make this abstraction as generic and as generally
// available for all testing as possible.
static Future<vector<Nothing>> awaitFinished(
vector<FetcherCacheTest::Task> tasks)
{
vector<Future<Nothing>> futures;
foreach (FetcherCacheTest::Task task, tasks) {
futures.push_back(awaitFinished(task));
}
return collect(futures);
}
// Pushes the TaskStatus value in mock call argument #1 into the
// given queue, which later on shall be queried by awaitFinished().
ACTION_P(PushTaskStatus, taskStatusQueue)
{
const TaskStatus& taskStatus = arg1;
// Input parameters of ACTION_P are const. We make a mutable copy
// so that we can use put().
Queue<TaskStatus> queue = taskStatusQueue;
queue.put(taskStatus);
}
// Launches a task as described by the given CommandInfo and returns a
// Task struct holding its sandbox run directory path and status queue.
// Its completion will be indicated by the result of
// awaitFinished(task), where `task` is the return value of this method.
// TODO(bernd-mesos): Make this abstraction as generic and generally
// available for all testing as possible.
Try<FetcherCacheTest::Task> FetcherCacheTest::launchTask(
const CommandInfo& commandInfo,
size_t taskIndex)
{
Future<vector<Offer>> offers;
EXPECT_CALL(scheduler, resourceOffers(driver.get(), _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(DeclineOffers());
offers.await(TEST_AWAIT_TIMEOUT);
if (!offers.isReady()) {
return Error("Failed to wait for resource offers: " +
(offers.isFailed() ? offers.failure() : "discarded"));
}
if (offers->empty()) {
return Error("Received empty list of offers");
}
const Offer offer = offers.get()[0];
TaskInfo task;
task.set_name(taskName(taskIndex));
task.mutable_task_id()->set_value(taskName(taskIndex));
task.mutable_slave_id()->CopyFrom(offer.slave_id());
// We don't care about resources in these tests. This small amount
// will always succeed.
task.mutable_resources()->CopyFrom(
Resources::parse("cpus:1;mem:1").get());
task.mutable_command()->CopyFrom(commandInfo);
// Since we are always using a command executor here, the executor
// ID can be determined by copying the task ID.
ExecutorID executorId;
executorId.set_value(task.task_id().value());
vector<TaskInfo> tasks;
tasks.push_back(task);
Queue<TaskStatus> taskStatusQueue;
EXPECT_CALL(scheduler, statusUpdate(driver.get(), _))
.WillRepeatedly(PushTaskStatus(taskStatusQueue));
driver->launchTasks(offer.id(), tasks);
const Path sandboxPath = Path(slave::paths::getExecutorLatestRunPath(
flags.work_dir,
slaveId,
offer.framework_id(),
executorId));
sandboxes.push_back(sandboxPath);
return Task{sandboxPath, taskStatusQueue};
}
// Pushes the task status value of a task status update callback
// into the task status queue that corresponds to the task index/ID
// for which the status update is being reported. 'tasks' must be a
// 'vector<Task>', where every slot index corresponds to a task
// index/ID.
// TODO(bernd-mesos): Make this abstraction as generic and as generally
// available for all testing as possible.
ACTION_TEMPLATE(PushIndexedTaskStatus,
HAS_1_TEMPLATE_PARAMS(int, k),
AND_1_VALUE_PARAMS(tasks))
{
const TaskStatus& taskStatus = ::std::get<k>(args);
Try<int> taskId = numify<int>(taskStatus.task_id().value());
ASSERT_SOME(taskId);
Queue<TaskStatus> queue = (tasks)[taskId.get()].statusQueue;
queue.put(taskStatus);
}
// Satisfies the first promise in the list that is not satisfied yet.
ACTION_P(SatisfyOne, promises)
{
foreach (const Owned<Promise<Nothing>>& promise, *promises) {
if (promise->future().isPending()) {
promise->set(Nothing());
return;
}
}
FAIL() << "Tried to call FetcherProcess::_fetch() "
<< "for more tasks than launched";
}
// Launches one task per CommandInfo in the given vector and returns a
// vector of Task structs holding the run directory paths and status
// queues. All these tasks run concurrently. Their completion will be
// indicated by the result of awaitFinished(tasks), where `tasks` is
// the return value of this method.
// TODO(bernd-mesos): Make this abstraction as generic and generally
// available for all testing as possible.
Try<vector<FetcherCacheTest::Task>> FetcherCacheTest::launchTasks(
const vector<CommandInfo>& commandInfos)
{
vector<FetcherCacheTest::Task> result;
// When _fetch() is called, notify us by satisfying the next pending
// promise that a task has reached the code stretch in which it
// competes for cache entries.
EXPECT_CALL(*fetcherProcess, _fetch(_, _, _, _, _))
.WillRepeatedly(
DoAll(SatisfyOne(&fetchContentionWaypoints),
Invoke(fetcherProcess, &MockFetcherProcess::unmocked__fetch)));
Future<vector<Offer>> offers;
EXPECT_CALL(scheduler, resourceOffers(driver.get(), _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(DeclineOffers());
offers.await(TEST_AWAIT_TIMEOUT);
if (!offers.isReady()) {
return Error("Failed to wait for resource offers: " +
(offers.isFailed() ? offers.failure() : "discarded"));
}
EXPECT_FALSE(offers->empty());
const Offer offer = offers.get()[0];
vector<TaskInfo> tasks;
foreach (const CommandInfo& commandInfo, commandInfos) {
size_t taskIndex = tasks.size();
// Grab the framework ID from the offer. It does not matter that this
// happens once per task, as the framework ID remains the same across
// offers.
frameworkId = offer.framework_id();
TaskInfo task;
task.set_name(taskName(taskIndex));
task.mutable_task_id()->set_value(taskName(taskIndex));
task.mutable_slave_id()->CopyFrom(offer.slave_id());
// We don't care about resources in these tests. This small amount
// will always succeed.
task.mutable_resources()->CopyFrom(
Resources::parse("cpus:1;mem:1").get());
task.mutable_command()->CopyFrom(commandInfo);
tasks.push_back(task);
// Since we are always using a command executor here, the executor
// ID can be determined by copying the task ID.
ExecutorID executorId;
executorId.set_value(task.task_id().value());
Path sandboxPath = Path(slave::paths::getExecutorLatestRunPath(
flags.work_dir,
slaveId,
frameworkId,
executorId));
sandboxes.push_back(sandboxPath);
// Make a queue of task statuses for each task, to be waited on later.
// We can then dequeue updates until the front element indicates status
// TASK_FINISHED. We use a queue, because we never know in advance
// which status update will be the one we have been waiting for.
Queue<TaskStatus> taskStatusQueue;
result.push_back(Task {sandboxPath, taskStatusQueue});
auto waypoint = Owned<Promise<Nothing>>(new Promise<Nothing>());
fetchContentionWaypoints.push_back(waypoint);
}
EXPECT_CALL(scheduler, statusUpdate(driver.get(), _))
.WillRepeatedly(PushIndexedTaskStatus<1>(result));
driver->launchTasks(offer.id(), tasks);
return result;
}
// Tests fetching from the local asset directory without cache. This
// gives us a baseline for the following tests and lets us debug our
// test infrastructure without extra complications.
TEST_F(FetcherCacheTest, LocalUncached)
{
startSlave();
driver->start();
const int index = 0;
CommandInfo::URI uri;
uri.set_value(commandPath);
uri.set_executable(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(index));
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, index);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
EXPECT_EQ(0u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_TRUE(fetcherProcess->cacheFiles()->empty());
const string path = path::join(task->runDirectory.string(), COMMAND_NAME);
EXPECT_TRUE(isExecutable(path));
EXPECT_TRUE(os::exists(path + taskName(index)));
}
// Tests fetching from the local asset directory with simple caching.
// Only one download must occur. Fetching is serialized, to cover
// code areas without overlapping/concurrent fetch attempts.
TEST_F(FetcherCacheTest, LocalCached)
{
startSlave();
driver->start();
for (size_t i = 0; i < 2; i++) {
CommandInfo::URI uri;
uri.set_value(commandPath);
uri.set_executable(true);
uri.set_cache(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, i);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
const string path = path::join(task->runDirectory.string(), COMMAND_NAME);
EXPECT_TRUE(isExecutable(path));
EXPECT_TRUE(os::exists(path + taskName(i)));
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
}
}
// This test launches a task with caching enabled, then removes all cached
// files, then launches another task with the same URIs as the first task.
// We expect the fetcher to re-download all the artifacts when the cached
// files are missing.
TEST_F(FetcherCacheTest, LocalCachedMissing)
{
startSlave();
driver->start();
for (size_t i = 0; i < 2; i++) {
CommandInfo::URI uri;
uri.set_value(commandPath);
uri.set_executable(true);
uri.set_cache(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, i);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
const string path = path::join(task->runDirectory.string(), COMMAND_NAME);
EXPECT_TRUE(isExecutable(path));
EXPECT_TRUE(os::exists(path + taskName(i)));
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
EXPECT_SOME(os::rm(fetcherProcess->cacheFiles()->front()));
}
}
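// Tests that a cached URI can be fetched into the sandbox under the
// custom name given by the URI's 'output_file' field.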
TEST_F(FetcherCacheTest, CachedCustomFilename)
{
startSlave();
driver->start();
const int index = 0;
const string customOutputFile = "my-command";
CommandInfo::URI uri;
uri.set_value(commandPath);
uri.set_executable(true);
uri.set_cache(true);
uri.set_output_file(customOutputFile);
CommandInfo commandInfo;
commandInfo.set_value("./" + customOutputFile + " " + taskName(index));
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, index);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
// Verify that the downloaded executable lives at our custom output path.
const string executablePath = path::join(
task->runDirectory.string(), customOutputFile);
EXPECT_TRUE(isExecutable(executablePath));
// The script specified by COMMAND_SCRIPT always touches a file named
// $COMMAND_NAME + $1, so to verify that it ran we have to check this
// path in addition to the custom-named executable we saved.
const string outputPath = path::join(
task->runDirectory.string(), COMMAND_NAME);
EXPECT_TRUE(os::exists(outputPath + taskName(index)));
}
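// Tests that an 'output_file' value containing a subdirectory works as
// well: the fetcher must create the subdirectory inside the sandbox
// before placing the fetched file there.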
TEST_F(FetcherCacheTest, CachedCustomOutputFileWithSubdirectory)
{
startSlave();
driver->start();
const int index = 0;
const string customOutputFile = "subdir/my-command";
CommandInfo::URI uri;
uri.set_value(commandPath);
uri.set_executable(true);
uri.set_cache(true);
uri.set_output_file(customOutputFile);
CommandInfo commandInfo;
commandInfo.set_value("./" + customOutputFile + " " + taskName(index));
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, index);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
// Verify that the downloaded executable lives at our custom output file
// path.
const string executablePath = path::join(
task->runDirectory.string(), customOutputFile);
EXPECT_TRUE(isExecutable(executablePath));
// The script specified by COMMAND_SCRIPT always touches a file named
// $COMMAND_NAME + $1, so to verify that it ran we have to check this
// path in addition to the custom-named executable we saved.
const string outputPath = path::join(
task->runDirectory.string(), COMMAND_NAME);
EXPECT_TRUE(os::exists(outputPath + taskName(index)));
}
// Tests falling back to bypassing the cache when fetching the download
// size of a URI that is supposed to be cached fails.
TEST_F(FetcherCacheTest, CachedFallback)
{
startSlave();
driver->start();
// Make sure the content-length request fails.
ASSERT_SOME(os::rm(commandPath));
CommandInfo::URI uri;
uri.set_value(commandPath);
uri.set_executable(true);
uri.set_cache(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(0));
commandInfo.add_uris()->CopyFrom(uri);
// Bring back the asset just before running mesos-fetcher to fetch it.
Future<FetcherInfo> fetcherInfo;
EXPECT_CALL(*fetcherProcess, run(_, _, _, _))
.WillOnce(DoAll(FutureArg<3>(&fetcherInfo),
InvokeWithoutArgs(this,
&FetcherCacheTest::setupCommandFileAsset),
Invoke(fetcherProcess,
&MockFetcherProcess::unmocked_run)));
const Try<Task> task = launchTask(commandInfo, 0);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
const string path = path::join(task->runDirectory.string(), COMMAND_NAME);
EXPECT_TRUE(isExecutable(path));
EXPECT_TRUE(os::exists(path + taskName(0)));
AWAIT_READY(fetcherInfo);
ASSERT_EQ(1, fetcherInfo->items_size());
EXPECT_EQ(FetcherInfo::Item::BYPASS_CACHE,
fetcherInfo->items(0).action());
EXPECT_EQ(0u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_TRUE(fetcherProcess->cacheFiles()->empty());
verifyCacheMetrics();
}
// Tests archive extraction without caching as a baseline for the
// subsequent test below.
TEST_F(FetcherCacheTest, LocalUncachedExtract)
{
startSlave();
driver->start();
const int index = 0;
CommandInfo::URI uri;
uri.set_value(archivePath);
uri.set_extract(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(index));
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, index);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
EXPECT_TRUE(os::exists(
path::join(task->runDirectory.string(), ARCHIVE_NAME)));
EXPECT_FALSE(isExecutable(
path::join(task->runDirectory.string(), ARCHIVE_NAME)));
const string path =
path::join(task->runDirectory.string(), ARCHIVED_COMMAND_NAME);
EXPECT_TRUE(isExecutable(path));
EXPECT_TRUE(os::exists(path + taskName(index)));
EXPECT_EQ(0u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_TRUE(fetcherProcess->cacheFiles()->empty());
verifyCacheMetrics();
}
// Tests archive extraction in combination with caching.
TEST_F(FetcherCacheTest, LocalCachedExtract)
{
startSlave();
driver->start();
for (size_t i = 0; i < 2; i++) {
CommandInfo::URI uri;
uri.set_value(archivePath);
uri.set_extract(true);
uri.set_cache(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(i));
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, i);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
EXPECT_FALSE(os::exists(
path::join(task->runDirectory.string(), ARCHIVE_NAME)));
const string path =
path::join(task->runDirectory.string(), ARCHIVED_COMMAND_NAME);
EXPECT_TRUE(isExecutable(path));
EXPECT_TRUE(os::exists(path + taskName(i)));
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
}
}
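// Variant of FetcherCacheTest in which the assets are served by an
// in-process HTTP server instead of being fetched directly from the
// local filesystem. This additionally lets us count and stall HTTP
// requests.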
class FetcherCacheHttpTest : public FetcherCacheTest
{
public:
// A minimal HTTP server that reuses what libprocess already implements
// (ProcessBase::provide()) to serve HTTP requests for file downloads,
// counts how many requests are made, and can pause answering requests,
// stalling them. NOTE: although it derives from Process, its counters
// are read directly from the test body, a deprecated mix of actor and
// non-actor styles; see the TODO below.
//
// TODO(bernd-mesos): This class follows a dangerous style of mixing
// actors and non-actors, DO NOT REPLICATE. Ultimately we want to
// replace this with a generic HTTP server that can be used by other
// tests as well and enables things like pausing requests,
// manipulating requests, mocking, etc.
class HttpServer : public Process<HttpServer>
{
public:
HttpServer(const string& _commandPath, const string& _archivePath)
: countRequests(0),
countCommandRequests(0),
countArchiveRequests(0),
commandPath(_commandPath),
archivePath(_archivePath)
{
CHECK(!_commandPath.empty());
CHECK(!_archivePath.empty());
}
void initialize() override
{
provide(COMMAND_NAME, commandPath);
provide(ARCHIVE_NAME, archivePath);
}
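// Returns the base URL under which this server serves the assets.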
string url()
{
return "http://" + stringify(self().address) + "/" + self().id + "/";
}
// Stalls the execution of future HTTP requests inside consume().
void pause()
{
// If there is no latch or if the existing latch has already been
// triggered, create a new latch.
if (latch.get() == nullptr || latch->await(Duration::min())) {
latch.reset(new Latch());
}
}
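// Lets all stalled and future requests proceed, until pause() is
// called again.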
void resume()
{
if (latch.get() != nullptr) {
latch->trigger();
}
}
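// Counts each incoming HTTP request by asset type and, while paused,
// stalls it until resume() is called, before delegating to the default
// implementation, which serves the file.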
void consume(HttpEvent&& event) override
{
if (latch.get() != nullptr) {
latch->await();
}
countRequests++;
if (strings::contains(event.request->url.path, COMMAND_NAME)) {
countCommandRequests++;
}
if (strings::contains(event.request->url.path, ARCHIVE_NAME)) {
countArchiveRequests++;
}
ProcessBase::consume(std::move(event));
}
void resetCounts()
{
countRequests = 0;
countCommandRequests = 0;
countArchiveRequests = 0;
}
size_t countRequests;
size_t countCommandRequests;
size_t countArchiveRequests;
private:
const string commandPath;
const string archivePath;
Owned<Latch> latch;
};
void SetUp() override
{
FetcherCacheTest::SetUp();
httpServer = new HttpServer(commandPath, archivePath);
spawn(httpServer);
}
void TearDown() override
{
terminate(httpServer);
wait(httpServer);
delete httpServer;
FetcherCacheTest::TearDown();
}
HttpServer* httpServer;
};
// Tests fetching via HTTP with caching. Only one download must
// occur. Fetching is serialized, to cover code areas without
// overlapping/concurrent fetch attempts.
TEST_F(FetcherCacheHttpTest, HttpCachedSerialized)
{
startSlave();
driver->start();
for (size_t i = 0; i < 3; i++) {
CommandInfo::URI uri;
uri.set_value(httpServer->url() + COMMAND_NAME);
uri.set_executable(true);
uri.set_cache(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, i);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
const string path =
path::join(task->runDirectory.string(), COMMAND_NAME);
EXPECT_TRUE(isExecutable(path));
EXPECT_TRUE(os::exists(path + taskName(i)));
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
// 2 requests: 1 for content-length, 1 for download.
EXPECT_EQ(2u, httpServer->countCommandRequests);
}
}
// Tests multiple concurrent fetching efforts that require some
// concurrency control. One task must "win" and perform the size
// and download request for the URI alone. The others must reuse
// the result.
TEST_F(FetcherCacheHttpTest, HttpCachedConcurrent)
{
startSlave();
driver->start();
// Causes fetch contention. No task can run yet until resume().
httpServer->pause();
vector<CommandInfo> commandInfos;
const size_t countTasks = 5;
for (size_t i = 0; i < countTasks; i++) {
CommandInfo::URI uri0;
uri0.set_value(httpServer->url() + COMMAND_NAME);
uri0.set_executable(true);
uri0.set_cache(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
commandInfo.add_uris()->CopyFrom(uri0);
// This URI is never cached and is only added to every other task, so
// it gets downloaded some of the time. Thus we exercise code paths
// that eagerly fetch new assets while waiting for pending downloads
// of cached assets, as well as code paths where no downloading occurs
// at all.
if (i % 2 == 1) {
CommandInfo::URI uri1;
uri1.set_value(httpServer->url() + ARCHIVE_NAME);
commandInfo.add_uris()->CopyFrom(uri1);
}
commandInfos.push_back(commandInfo);
}
Try<vector<Task>> tasks = launchTasks(commandInfos);
ASSERT_SOME(tasks);
ASSERT_EQ(countTasks, tasks->size());
// Having paused the HTTP server, ensure that FetcherProcess::_fetch()
// has been called for each task, which means that all tasks are competing
// for downloading the same URIs.
foreach (const Owned<Promise<Nothing>>& waypoint, fetchContentionWaypoints) {
AWAIT(waypoint->future());
}
// Now let the tasks run.
httpServer->resume();
AWAIT_READY(awaitFinished(tasks.get()));
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
// HTTP requests regarding the command asset as follows. Command
// "content-length" requests: 1, command file downloads: 1.
EXPECT_EQ(2u, httpServer->countCommandRequests);
// HTTP requests regarding the archive asset as follows. Archive
// "content-length" requests: 0, archive file downloads: 2.
EXPECT_EQ(2u, httpServer->countArchiveRequests);
for (size_t i = 0; i < countTasks; i++) {
EXPECT_EQ(i % 2 == 1, os::exists(
path::join(tasks->at(i).runDirectory.string(), ARCHIVE_NAME)));
EXPECT_TRUE(isExecutable(
path::join(tasks->at(i).runDirectory.string(), COMMAND_NAME)));
EXPECT_TRUE(os::exists(
path::join(tasks->at(i).runDirectory.string(),
COMMAND_NAME + taskName(i))));
}
}
// Tests using multiple URIs per command, variations of caching,
// setting the executable flag, and archive extraction.
TEST_F(FetcherCacheHttpTest, HttpMixed)
{
startSlave();
driver->start();
// Causes fetch contention. No task can run yet until resume().
httpServer->pause();
vector<CommandInfo> commandInfos;
// Task 0.
CommandInfo::URI uri00;
uri00.set_value(httpServer->url() + ARCHIVE_NAME);
uri00.set_cache(true);
uri00.set_extract(false);
uri00.set_executable(false);
CommandInfo::URI uri01;
uri01.set_value(httpServer->url() + COMMAND_NAME);
uri01.set_extract(false);
uri01.set_executable(true);
CommandInfo commandInfo0;
commandInfo0.set_value("./" + COMMAND_NAME + " " + taskName(0));
commandInfo0.add_uris()->CopyFrom(uri00);
commandInfo0.add_uris()->CopyFrom(uri01);
commandInfos.push_back(commandInfo0);
// Task 1.
CommandInfo::URI uri10;
uri10.set_value(httpServer->url() + ARCHIVE_NAME);
uri10.set_extract(true);
uri10.set_executable(false);
CommandInfo::URI uri11;
uri11.set_value(httpServer->url() + COMMAND_NAME);
uri11.set_extract(true);
uri11.set_executable(false);
CommandInfo commandInfo1;
commandInfo1.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(1));
commandInfo1.add_uris()->CopyFrom(uri10);
commandInfo1.add_uris()->CopyFrom(uri11);
commandInfos.push_back(commandInfo1);
// Task 2.
CommandInfo::URI uri20;
uri20.set_value(httpServer->url() + ARCHIVE_NAME);
uri20.set_cache(true);
uri20.set_extract(true);
uri20.set_executable(false);
CommandInfo::URI uri21;
uri21.set_value(httpServer->url() + COMMAND_NAME);
uri21.set_extract(false);
uri21.set_executable(false);
CommandInfo commandInfo2;
commandInfo2.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(2));
commandInfo2.add_uris()->CopyFrom(uri20);
commandInfo2.add_uris()->CopyFrom(uri21);
commandInfos.push_back(commandInfo2);
Try<vector<Task>> tasks = launchTasks(commandInfos);
ASSERT_SOME(tasks);
ASSERT_EQ(3u, tasks->size());
// Having paused the HTTP server, ensure that FetcherProcess::_fetch()
// has been called for each task, which means that all tasks are competing
// for downloading the same URIs.
foreach (const Owned<Promise<Nothing>>& waypoint, fetchContentionWaypoints) {
AWAIT(waypoint->future());
}
// Now let the tasks run.
httpServer->resume();
AWAIT_READY(awaitFinished(tasks.get()));
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
// HTTP requests regarding the command asset as follows. Command
// "content-length" requests: 0, command file downloads: 3.
EXPECT_EQ(3u, httpServer->countCommandRequests);
// HTTP requests regarding the archive asset as follows. Archive
// "content-length" requests: 1, archive file downloads: 2.
EXPECT_EQ(3u, httpServer->countArchiveRequests);
// Task 0.
EXPECT_FALSE(isExecutable(
path::join(tasks->at(0).runDirectory.string(), ARCHIVE_NAME)));
EXPECT_FALSE(os::exists(
path::join(tasks->at(0).runDirectory.string(), ARCHIVED_COMMAND_NAME)));
EXPECT_TRUE(isExecutable(
path::join(tasks->at(0).runDirectory.string(), COMMAND_NAME)));
EXPECT_TRUE(os::exists(
path::join(tasks->at(0).runDirectory.string(),
COMMAND_NAME + taskName(0))));
// Task 1.
EXPECT_FALSE(isExecutable(path::join(
tasks->at(1).runDirectory.string(),
ARCHIVE_NAME)));
EXPECT_TRUE(isExecutable(path::join(
tasks->at(1).runDirectory.string(),
ARCHIVED_COMMAND_NAME)));
EXPECT_TRUE(os::exists(path::join(
tasks->at(1).runDirectory.string(),
ARCHIVED_COMMAND_NAME + taskName(1))));
EXPECT_FALSE(isExecutable(path::join(
tasks->at(1).runDirectory.string(),
COMMAND_NAME)));
// Task 2.
EXPECT_FALSE(os::exists(path::join(
tasks->at(2).runDirectory.string(),
ARCHIVE_NAME)));
EXPECT_TRUE(isExecutable(path::join(
tasks->at(2).runDirectory.string(),
ARCHIVED_COMMAND_NAME)));
EXPECT_TRUE(os::exists(path::join(
tasks->at(2).runDirectory.string(),
ARCHIVED_COMMAND_NAME + taskName(2))));
EXPECT_FALSE(isExecutable(path::join(
tasks->at(2).runDirectory.string(),
COMMAND_NAME)));
}
// Tests slave recovery of the fetcher cache. The cache must be
// wiped clean on recovery, causing renewed downloads.
// TODO(bernd-mesos): Debug flaky behavior reported in MESOS-2871,
// then reenable this test.
TEST_F(FetcherCacheHttpTest, DISABLED_HttpCachedRecovery)
{
startSlave();
driver->start();
for (size_t i = 0; i < 3; i++) {
CommandInfo::URI uri;
uri.set_value(httpServer->url() + COMMAND_NAME);
uri.set_executable(true);
uri.set_cache(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, i);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
const string path = path::join(task->runDirectory.string(), COMMAND_NAME);
EXPECT_TRUE(isExecutable(path));
EXPECT_TRUE(os::exists(path + taskName(i)));
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
// content-length requests: 1
// downloads: 1
EXPECT_EQ(2u, httpServer->countCommandRequests);
}
// Stop and destroy the current slave.
slave->terminate();
// Start over.
httpServer->resetCounts();
// Don't reuse the old fetcher, which has stale state after
// stopping the slave.
Fetcher fetcher2(flags);
Try<MesosContainerizer*> _containerizer =
MesosContainerizer::create(flags, true, &fetcher2);
ASSERT_SOME(_containerizer);
containerizer.reset(_containerizer.get());
// Set up so we can wait until the new slave updates the container's
// resources (this occurs after the executor has reregistered).
Future<Nothing> update =
FUTURE_DISPATCH(_, &MesosContainerizerProcess::update);
Try<Owned<cluster::Slave>> _slave =
StartSlave(detector.get(), containerizer.get(), flags);
ASSERT_SOME(_slave);
slave = _slave.get();
// Wait until the containerizer is updated.
AWAIT_READY(update);
// Repeat of the above to see if it works the same.
for (size_t i = 0; i < 3; i++) {
CommandInfo::URI uri;
uri.set_value(httpServer->url() + COMMAND_NAME);
uri.set_executable(true);
uri.set_cache(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, i);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
const string path =
path::join(task->runDirectory.string(), COMMAND_NAME);
EXPECT_TRUE(isExecutable(path));
EXPECT_TRUE(os::exists(path + taskName(i)));
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
// content-length requests: 1
// downloads: 1
EXPECT_EQ(2u, httpServer->countCommandRequests);
}
}
// Tests cache eviction. Limits the available cache space, then fetches
// more task scripts than fit into the cache and runs them all. We
// observe that the number of cache files rises and then stays constant.
TEST_F(FetcherCacheTest, SimpleEviction)
{
const size_t countCacheEntries = 2;
// Let only the first 'countCacheEntries' downloads fit in the cache.
flags.fetcher_cache_size = COMMAND_SCRIPT.size() * countCacheEntries;
startSlave();
driver->start();
for (size_t i = 0; i < countCacheEntries + 2; i++) {
string commandFilename = "cmd" + stringify(i);
string command = commandFilename + " " + taskName(i);
commandPath = path::join(assetsDirectory, commandFilename);
ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT));
CommandInfo::URI uri;
uri.set_value(commandPath);
uri.set_executable(true);
uri.set_cache(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + command);
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, i);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
// Check that the task succeeded.
EXPECT_TRUE(isExecutable(
path::join(task->runDirectory.string(), commandFilename)));
EXPECT_TRUE(os::exists(
path::join(task->runDirectory.string(), COMMAND_NAME + taskName(i))));
if (i < countCacheEntries) {
EXPECT_EQ(i + 1, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(i + 1u, fetcherProcess->cacheFiles()->size());
} else {
EXPECT_EQ(countCacheEntries, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(countCacheEntries,
fetcherProcess->cacheFiles()->size());
}
}
verifyCacheMetrics();
}
// Tests cache eviction fallback to bypassing the cache. A first task
// runs normally. Then a second succeeds using eviction. Then a third
// task fails to evict, but still gets executed bypassing the cache.
TEST_F(FetcherCacheTest, FallbackFromEviction)
{
// The size by which every task's URI download is going to be larger
// than the previous one.
const size_t growth = 10;
// Let only the first two downloads fit into the cache, one at a time,
// the second evicting the first. The third file won't fit any more,
// being larger than the entire cache.
flags.fetcher_cache_size = COMMAND_SCRIPT.size() + growth;
startSlave();
driver->start();
// We'll run 3 tasks. These futures capture the FetcherInfo passed to
// each fetcher run() call, so we can verify the cache action chosen
// for each task.
Future<FetcherInfo> fetcherInfo0;
Future<FetcherInfo> fetcherInfo1;
Future<FetcherInfo> fetcherInfo2;
EXPECT_CALL(*fetcherProcess, run(_, _, _, _))
.WillOnce(DoAll(FutureArg<3>(&fetcherInfo0),
Invoke(fetcherProcess,
&MockFetcherProcess::unmocked_run)))
.WillOnce(DoAll(FutureArg<3>(&fetcherInfo1),
Invoke(fetcherProcess,
&MockFetcherProcess::unmocked_run)))
.WillOnce(DoAll(FutureArg<3>(&fetcherInfo2),
Invoke(fetcherProcess,
&MockFetcherProcess::unmocked_run)));
// Task 0:
const string commandFilename0 = "cmd0";
const string command0 = commandFilename0 + " " + taskName(0);
commandPath = path::join(assetsDirectory, commandFilename0);
// Write the command into the script that gets fetched.
ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT));
CommandInfo::URI uri0;
uri0.set_value(commandPath);
uri0.set_executable(true);
uri0.set_cache(true);
CommandInfo commandInfo0;
commandInfo0.set_value("./" + command0);
commandInfo0.add_uris()->CopyFrom(uri0);
const Try<Task> task0 = launchTask(commandInfo0, 0);
ASSERT_SOME(task0) << task0.error();
AWAIT_READY(awaitFinished(task0.get()));
// Check that the task succeeded.
EXPECT_TRUE(isExecutable(
path::join(task0->runDirectory.string(), commandFilename0)));
EXPECT_TRUE(os::exists(
path::join(task0->runDirectory.string(), COMMAND_NAME + taskName(0))));
AWAIT_READY(fetcherInfo0);
ASSERT_EQ(1, fetcherInfo0->items_size());
EXPECT_EQ(FetcherInfo::Item::DOWNLOAD_AND_CACHE,
fetcherInfo0->items(0).action());
// We have put a file of size 'COMMAND_SCRIPT.size()' in the cache
// with space 'COMMAND_SCRIPT.size() + growth'. So we must have 'growth'
// space left.
ASSERT_EQ(Bytes(growth), fetcherProcess->availableCacheSpace());
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
// Task 1:
const string commandFilename1 = "cmd1";
const string command1 = commandFilename1 + " " + taskName(1);
commandPath = path::join(assetsDirectory, commandFilename1);
// Write the command into the script that gets fetched. Add 'growth'
// extra characters so the cache will fill up to the last byte.
ASSERT_SOME(os::write(
commandPath,
COMMAND_SCRIPT + string(growth, '\n')));
CommandInfo::URI uri1;
uri1.set_value(commandPath);
uri1.set_executable(true);
uri1.set_cache(true);
CommandInfo commandInfo1;
commandInfo1.set_value("./" + command1);
commandInfo1.add_uris()->CopyFrom(uri1);
const Try<Task> task1 = launchTask(commandInfo1, 1);
ASSERT_SOME(task1) << task1.error();
AWAIT_READY(awaitFinished(task1.get()));
// Check that the task succeeded.
EXPECT_TRUE(isExecutable(
path::join(task1->runDirectory.string(), commandFilename1)));
EXPECT_TRUE(os::exists(
path::join(task1->runDirectory.string(), COMMAND_NAME + taskName(1))));
AWAIT_READY(fetcherInfo1);
ASSERT_EQ(1, fetcherInfo1->items_size());
EXPECT_EQ(FetcherInfo::Item::DOWNLOAD_AND_CACHE,
fetcherInfo1->items(0).action());
// The cache must now be full.
ASSERT_EQ(Bytes(0u), fetcherProcess->availableCacheSpace());
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
// Task 2:
const string commandFilename2 = "cmd2";
const string command2 = commandFilename2 + " " + taskName(2);
commandPath = path::join(assetsDirectory, commandFilename2);
// Write the command into the script that gets fetched. Add
// '2 * growth' now. Thus the file will be so big that it will not
// fit into the cache any more.
ASSERT_SOME(os::write(
commandPath,
COMMAND_SCRIPT + string(2 * growth, '\n')));
CommandInfo::URI uri2;
uri2.set_value(commandPath);
uri2.set_executable(true);
uri2.set_cache(true);
CommandInfo commandInfo2;
commandInfo2.set_value("./" + command2);
commandInfo2.add_uris()->CopyFrom(uri2);
const Try<Task> task2 = launchTask(commandInfo2, 2);
ASSERT_SOME(task2) << task2.error();
AWAIT_READY(awaitFinished(task2.get()));
// Check that the task succeeded.
EXPECT_TRUE(isExecutable(
path::join(task2->runDirectory.string(), commandFilename2)));
EXPECT_TRUE(os::exists(
path::join(task2->runDirectory.string(), COMMAND_NAME + taskName(2))));
AWAIT_READY(fetcherInfo2);
ASSERT_EQ(1, fetcherInfo2->items_size());
EXPECT_EQ(FetcherInfo::Item::BYPASS_CACHE,
fetcherInfo2->items(0).action());
EXPECT_EQ(1u, fetcherProcess->cacheSize());
ASSERT_SOME(fetcherProcess->cacheFiles());
EXPECT_EQ(1u, fetcherProcess->cacheFiles()->size());
verifyCacheMetrics();
}
// Tests LRU cache eviction strategy.
TEST_F(FetcherCacheTest, RemoveLRUCacheEntries)
{
// Let only two downloads fit in the cache.
flags.fetcher_cache_size = COMMAND_SCRIPT.size() * 2;
startSlave();
driver->start();
// Start commands using a pattern that will fill the cache with two entries
// and request the second entry again. The first entry is then the LRU.
// Adding a new entry should therefore evict the first entry.
vector<int> commandCreationPattern;
commandCreationPattern.push_back(0);
commandCreationPattern.push_back(1);
commandCreationPattern.push_back(1);
commandCreationPattern.push_back(2);
int taskIndex = 0;
// Run one task per entry in the pattern above: fill up the cache,
// reuse the second entry, then trigger eviction of the LRU entry.
foreach (const int i, commandCreationPattern) {
string commandFilename = "cmd" + stringify(i);
string command = commandFilename + " " + taskName(taskIndex);
commandPath = path::join(assetsDirectory, commandFilename);
ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT));
CommandInfo::URI uri;
uri.set_value(commandPath);
uri.set_executable(true);
uri.set_cache(true);
CommandInfo commandInfo;
commandInfo.set_value("./" + command);
commandInfo.add_uris()->CopyFrom(uri);
const Try<Task> task = launchTask(commandInfo, taskIndex);
ASSERT_SOME(task);
AWAIT_READY(awaitFinished(task.get()));
// Check that the task succeeded.
EXPECT_TRUE(isExecutable(
path::join(task->runDirectory.string(), commandFilename)));
EXPECT_TRUE(os::exists(path::join(task->runDirectory.string(),
COMMAND_NAME + taskName(taskIndex))));
++taskIndex;
}
EXPECT_EQ(2u, fetcherProcess->cacheSize());
verifyCacheMetrics();
// FetcherProcess::cacheFiles() returns all cache files that are in the
// cache directory. We expect cmd1 and cmd2 to be there; cmd0 should
// have been evicted.
Try<list<Path>> cacheFiles = fetcherProcess->cacheFiles();
ASSERT_SOME(cacheFiles);
bool cmd1Found = false;
bool cmd2Found = false;
foreach (const Path& cacheFile, cacheFiles.get()) {
if (strings::contains(cacheFile.basename(), "cmd1")) {
cmd1Found = true;
}
if (strings::contains(cacheFile.basename(), "cmd2")) {
cmd2Found = true;
}
}
EXPECT_TRUE(cmd1Found);
EXPECT_TRUE(cmd2Found);
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {