blob: 2a158071d4baf10147952a89f444f54692eabaa8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "slave/gc.hpp"
#include <list>
#include <process/check.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/adaptor.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
#include <stout/os/rmdir.hpp>
#include "logging/logging.hpp"
#ifdef __linux__
#include "linux/fs.hpp"
#endif
#include "slave/gc_process.hpp"
using namespace process;
using process::wait; // Necessary on some OS's to disambiguate.
using std::list;
using std::map;
using std::string;
using process::metrics::Counter;
namespace mesos {
namespace internal {
namespace slave {
GarbageCollectorProcess::Metrics::Metrics(GarbageCollectorProcess* gc)
  : path_removals_succeeded("gc/path_removals_succeeded"),
    path_removals_failed("gc/path_removals_failed"),
    path_removals_pending(
        "gc/path_removals_pending",
        [gc]() -> double {
          // The multimap's size() is constant time (i.e. effectively a
          // tracked counter), so reading it here concurrently with map
          // updates on the process is safe.
          return gc->paths.size();
        })
{
  // Register all three metrics with the global metrics process.
  process::metrics::add(path_removals_succeeded);
  process::metrics::add(path_removals_failed);
  process::metrics::add(path_removals_pending);
}
GarbageCollectorProcess::Metrics::~Metrics()
{
  // Unregister the counters; these do not reference `gc`, so the
  // removals need not be waited on.
  process::metrics::remove(path_removals_succeeded);
  process::metrics::remove(path_removals_failed);

  // The pending gauge captures a pointer into the gc process, so block
  // until its removal completes: otherwise an in-flight asynchronous
  // evaluation could dereference a deleted object.
  process::metrics::remove(path_removals_pending).await();
}
GarbageCollectorProcess::~GarbageCollectorProcess()
{
  // Discard every outstanding removal promise so that futures handed
  // out by `schedule()` are notified the removal will never happen.
  foreachvalue (const Owned<PathInfo>& info, paths) {
    info->promise.discard();
  }
}
// Schedules `path` for deletion `d` from now. Returns a future that is
// satisfied once the path has actually been removed.
Future<Nothing> GarbageCollectorProcess::schedule(
    const Duration& d,
    const string& path)
{
  LOG(INFO) << "Scheduling '" << path << "' for gc " << d << " in the future";

  // A path that is already scheduled has to be unscheduled before it
  // can be rescheduled, so chain a re-dispatch of ourselves after the
  // unschedule completes.
  if (timeouts.contains(path)) {
    return unschedule(path)
      .then(defer(self(), &Self::schedule, d, path));
  }

  const Timeout when = Timeout::in(d);

  Owned<PathInfo> info(new PathInfo(path));

  // Record the new entry in both bookkeeping structures.
  timeouts[path] = when;
  paths.put(when, info);

  // (Re)arm the timer when it is currently unset, or when this entry
  // fires earlier than whatever the timer is currently armed for.
  if (timer.timeout().remaining() == Seconds(0) || when < timer.timeout()) {
    reset();
  }

  return info->promise.future();
}
// Cancels a pending removal of `path`. Resolves to `true` if the
// removal was cancelled, `false` if the path was unknown or is already
// being (or has been) removed.
Future<bool> GarbageCollectorProcess::unschedule(const string& path)
{
  LOG(INFO) << "Unscheduling '" << path << "' from gc";

  // Unknown path: it was never scheduled, or was already removed.
  if (!timeouts.contains(path)) {
    return false;
  }

  // Copy the timeout since we erase its map entry below.
  const Timeout timeout = timeouts[path];
  CHECK(paths.contains(timeout));

  // Locate this path among all entries sharing the same timeout.
  foreach (const Owned<PathInfo>& info, paths.get(timeout)) {
    if (info->path != path) {
      continue;
    }

    if (info->removing) {
      // Removal is already in flight and cannot be stopped; wait for
      // it and resolve to `false` to be consistent with calling
      // `unschedule` after the path has been removed.
      return info->promise.future()
        .then([]() { return false; });
    }

    // Notify the holder of the `schedule()` future that the removal
    // was cancelled.
    info->promise.discard();

    // Drop the bookkeeping entries.
    CHECK(paths.remove(timeout, info));
    CHECK_EQ(timeouts.erase(info->path), 1u);

    return true;
  }

  // `timeouts` had the path but `paths` did not: the two structures
  // are out of sync, which should be impossible.
  LOG(FATAL) << "Inconsistent state across 'paths' and 'timeouts'";
  return false;
}
// Fires a message to self for the next event. This also cancels any
// existing timer.
void GarbageCollectorProcess::reset()
{
Clock::cancel(timer); // Cancel the existing timer, if any.
if (!paths.empty()) {
Timeout removalTime = (*paths.begin()).first; // Get the first entry.
timer = delay(removalTime.remaining(), self(), &Self::remove, removalTime);
} else {
timer = Timer(); // Reset the timer.
}
}
// Deletes all paths whose scheduled removal time is `removalTime`. The
// filesystem work runs on a dedicated executor; bookkeeping cleanup is
// finalized afterwards in `_remove()`.
void GarbageCollectorProcess::remove(const Timeout& removalTime)
{
  if (paths.count(removalTime) > 0) {
    // Collect the entries not already being removed, and mark them as
    // in-progress so a concurrent `remove`/`unschedule` on this process
    // will not touch them again.
    list<Owned<PathInfo>> infos;
    foreach (const Owned<PathInfo>& info, paths.get(removalTime)) {
      if (info->removing) {
        VLOG(1) << "Skipping deletion of '" << info-> path
                << "' as it is already in progress";
        continue;
      }
      infos.push_back(info);
      // Set `removing` to signify that the path is being cleaned up.
      info->removing = true;
    }

    // Copy everything the lambda needs: it runs on another executor and
    // must not reference members of this (possibly dying) process.
    Counter _succeeded = metrics.path_removals_succeeded;
    Counter _failed = metrics.path_removals_failed;
    const string _workDir = workDir;

    auto rmdirs =
      [_succeeded, _failed, _workDir, infos]() mutable -> Future<Nothing> {
      // Make mutable copies of the counters to work around MESOS-7907.
      Counter succeeded = _succeeded;
      Counter failed = _failed;
#ifdef __linux__
      // Clear any possible persistent volume mount points in `infos`. See
      // MESOS-8830.
      Try<fs::MountInfoTable> mountTable = fs::MountInfoTable::read();
      if (mountTable.isError()) {
        // Without the mount table we cannot tell whether an rmdir would
        // destroy data through a dangling mount, so fail everything.
        LOG(ERROR) << "Skipping any path deletion because of failure on read "
                      "MountInfoTable for agent process: "
                   << mountTable.error();
        foreach (const Owned<PathInfo>& info, infos) {
          info->promise.fail(mountTable.error());
          ++failed;
        }
        return Failure(mountTable.error());
      }

      // Walk the mount table in reverse so nested mounts are unmounted
      // before the mounts containing them.
      foreach (const fs::MountInfoTable::Entry& entry,
               adaptor::reverse(mountTable->entries)) {
        // Ignore mounts whose targets are not under `workDir`.
        // (`path::join(..., "")` appends a trailing separator so that
        // the prefix test matches whole path components only.)
        if (!strings::startsWith(
                path::join(entry.target, ""),
                path::join(_workDir, ""))) {
          continue;
        }
        for (auto it = infos.begin(); it != infos.end(); ) {
          const Owned<PathInfo>& info = *it;
          // TODO(zhitao): Validate that both `info->path` and `workDir` are
          // real paths.
          if (strings::startsWith(
                  path::join(entry.target, ""), path::join(info->path, ""))) {
            LOG(WARNING)
              << "Unmounting dangling mount point '" << entry.target
              << "' of persistent volume '" << entry.root
              << "' inside garbage collected path '" << info->path << "'";
            Try<Nothing> unmount = fs::unmount(entry.target);
            if (unmount.isError()) {
              // Deleting under a still-mounted volume could destroy
              // persisted data, so drop this path from the batch.
              LOG(WARNING) << "Skipping deletion of '"
                           << info->path << "' because unmount failed on '"
                           << entry.target << "': " << unmount.error();
              info->promise.fail(unmount.error());
              ++failed;
              it = infos.erase(it);
              continue;
            } else {
              // NOTE(review): the `break` stops scanning the remaining
              // infos for this mount entry — presumably because a mount
              // target can lie under at most one gc'd path; confirm.
              break;
            }
          }
          it++;
        }
      }
#endif // __linux__
      foreach (const Owned<PathInfo>& info, infos) {
        // Run the removal operation with 'continueOnError = true'.
        // It's possible for tasks and isolators to lay down files
        // that are not deletable by GC. In the face of such errors
        // GC needs to free up disk space wherever it can because the
        // disk space has already been re-offered to frameworks.
        LOG(INFO) << "Deleting " << info->path;
        Try<Nothing> rmdir = os::rmdir(info->path, true, true, true);
        if (rmdir.isError()) {
          // TODO(zhitao): Change return value type of `rmdir` to
          // `Try<Nothing, ErrnoError>` and check error type instead.
          if (rmdir.error() == ErrnoError(ENOENT).message) {
            // An already-absent path counts as neither success nor
            // failure for the metrics.
            LOG(INFO) << "Skipped '" << info->path << "' which does not exist";
          } else {
            LOG(WARNING) << "Failed to delete '" << info->path << "': "
                         << rmdir.error();
            info->promise.fail(rmdir.error());
            ++failed;
          }
        } else {
          LOG(INFO) << "Deleted '" << info->path << "'";
          info->promise.set(rmdir.get());
          ++succeeded;
        }
      }
      return Nothing();
    };

    // NOTE: All `rmdirs` calls are dispatched to one executor so that:
    //   1. They do not block other dispatches (MESOS-6549).
    //   2. They do not occupy all worker threads (MESOS-7964).
    executor.execute(rmdirs)
      .onAny(defer(self(), &Self::_remove, lambda::_1, infos));
  } else {
    // This occurs when either:
    //   1. The path(s) has already been removed (e.g. by prune()).
    //   2. All paths under the removal time were unscheduled.
    LOG(INFO) << "Ignoring gc event at " << removalTime.remaining()
              << " as the paths were already removed, or were unscheduled";
    reset();
  }
}
// Completion hook for a `rmdirs` batch: drops the bookkeeping for every
// path whose removal was attempted (the promises were already completed
// or failed inside the removal lambda) and re-arms the timer.
void GarbageCollectorProcess::_remove(const Future<Nothing>& result,
                                      const list<Owned<PathInfo>> infos)
{
  foreach (const Owned<PathInfo>& entry, infos) {
    CHECK(paths.remove(timeouts[entry->path], entry));
    CHECK_EQ(timeouts.erase(entry->path), 1u);
  }

  // Schedule the timer for the next pending removal, if any.
  reset();
}
// Immediately removes every batch whose remaining time is within `d`.
// Each batch is dispatched as its own event on this process rather than
// removed inline.
void GarbageCollectorProcess::prune(const Duration& d)
{
  foreach (const Timeout& timeout, paths.keys()) {
    if (timeout.remaining() > d) {
      continue;
    }

    LOG(INFO) << "Pruning directories with remaining removal time "
              << timeout.remaining();

    dispatch(self(), &GarbageCollectorProcess::remove, timeout);
  }
}
// Creates the underlying actor and starts it running.
GarbageCollector::GarbageCollector(const string& workDir)
  : process(new GarbageCollectorProcess(workDir))
{
  spawn(process);
}
GarbageCollector::~GarbageCollector()
{
  // Shut the actor down before reclaiming it: request termination,
  // block until it has fully exited, then free it.
  process::terminate(process);
  process::wait(process);
  delete process;
}
// Forwards to the actor; the returned future is satisfied once the path
// has actually been removed.
Future<Nothing> GarbageCollector::schedule(
    const Duration& d,
    const string& path)
{
  return process::dispatch(
      process, &GarbageCollectorProcess::schedule, d, path);
}
// Forwards to the actor; resolves to whether the pending removal of
// `path` was cancelled.
Future<bool> GarbageCollector::unschedule(const string& path)
{
  return process::dispatch(
      process, &GarbageCollectorProcess::unschedule, path);
}
void GarbageCollector::prune(const Duration& d)
{
dispatch(process, &GarbageCollectorProcess::prune, d);
}
} // namespace slave {
} // namespace internal {
} // namespace mesos {