blob: fad9391a3f852979cee353ad5848f663150e1ea9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "slave/task_status_update_manager.hpp"
#include <process/delay.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include <process/timer.hpp>
#include <stout/check.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
#include "common/protobuf_utils.hpp"
#include "logging/logging.hpp"
#include "slave/constants.hpp"
#include "slave/flags.hpp"
#include "slave/slave.hpp"
#include "slave/state.hpp"
using lambda::function;
using std::string;
using process::wait; // Necessary on some OS's to disambiguate.
using process::Failure;
using process::Future;
using process::PID;
using process::Timeout;
using process::UPID;
namespace mesos {
namespace internal {
namespace slave {
using state::SlaveState;
using state::FrameworkState;
using state::ExecutorState;
using state::RunState;
using state::TaskState;
class TaskStatusUpdateManagerProcess
: public ProtobufProcess<TaskStatusUpdateManagerProcess>
{
public:
TaskStatusUpdateManagerProcess(const Flags& flags);
~TaskStatusUpdateManagerProcess() override;
// Explicitly use 'initialize' since we're overloading below.
using process::ProcessBase::initialize;
// TaskStatusUpdateManager implementation.
void initialize(const function<void(StatusUpdate)>& forward);
Future<Nothing> update(
const StatusUpdate& update,
const SlaveID& slaveId,
const ExecutorID& executorId,
const ContainerID& containerId);
Future<Nothing> update(
const StatusUpdate& update,
const SlaveID& slaveId);
Future<bool> acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
const id::UUID& uuid);
Future<Nothing> recover(
const string& rootDir,
const Option<SlaveState>& state);
void pause();
void resume();
void cleanup(const FrameworkID& frameworkId);
private:
// Helper function to handle update.
Future<Nothing> _update(
const StatusUpdate& update,
const SlaveID& slaveId,
bool checkpoint,
const Option<ExecutorID>& executorId,
const Option<ContainerID>& containerId);
// Status update timeout.
void timeout(const Duration& duration);
// Forwards the status update to the master and starts a timer based
// on the 'duration' to check for ACK from the scheduler.
// NOTE: This should only be used for those messages that expect an
// ACK (e.g updates from the executor).
Timeout forward(const StatusUpdate& update, const Duration& duration);
// Helper functions.
// Creates a new status update stream (opening the updates file, if path is
// present) and adds it to streams.
TaskStatusUpdateStream* createStatusUpdateStream(
const TaskID& taskId,
const FrameworkID& frameworkId,
const SlaveID& slaveId,
bool checkpoint,
const Option<ExecutorID>& executorId,
const Option<ContainerID>& containerId);
TaskStatusUpdateStream* getStatusUpdateStream(
const TaskID& taskId,
const FrameworkID& frameworkId);
void cleanupStatusUpdateStream(
const TaskID& taskId,
const FrameworkID& frameworkId);
const Flags flags;
bool paused;
function<void(StatusUpdate)> forward_;
hashmap<FrameworkID, hashmap<TaskID, TaskStatusUpdateStream*>> streams;
};
TaskStatusUpdateManagerProcess::TaskStatusUpdateManagerProcess(
const Flags& _flags)
: ProcessBase(process::ID::generate("task-status-update-manager")),
flags(_flags),
paused(false)
{
}
TaskStatusUpdateManagerProcess::~TaskStatusUpdateManagerProcess()
{
foreachkey (const FrameworkID& frameworkId, streams) {
foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
delete stream;
}
}
streams.clear();
}
void TaskStatusUpdateManagerProcess::initialize(
const function<void(StatusUpdate)>& forward)
{
forward_ = forward;
}
void TaskStatusUpdateManagerProcess::pause()
{
LOG(INFO) << "Pausing sending task status updates";
paused = true;
}
void TaskStatusUpdateManagerProcess::resume()
{
LOG(INFO) << "Resuming sending task status updates";
paused = false;
foreachkey (const FrameworkID& frameworkId, streams) {
foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
if (!stream->pending.empty()) {
const StatusUpdate& update = stream->pending.front();
LOG(WARNING) << "Resending task status update " << update;
stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
}
}
}
Future<Nothing> TaskStatusUpdateManagerProcess::recover(
const string& rootDir,
const Option<SlaveState>& state)
{
LOG(INFO) << "Recovering task status update manager";
if (state.isNone()) {
return Nothing();
}
foreachvalue (const FrameworkState& framework, state->frameworks) {
foreachvalue (const ExecutorState& executor, framework.executors) {
LOG(INFO) << "Recovering executor '" << executor.id
<< "' of framework " << framework.id;
if (executor.info.isNone()) {
LOG(WARNING) << "Skipping recovering task status updates of"
<< " executor '" << executor.id
<< "' of framework " << framework.id
<< " because its info cannot be recovered";
continue;
}
if (executor.latest.isNone()) {
LOG(WARNING) << "Skipping recovering task status updates of"
<< " executor '" << executor.id
<< "' of framework " << framework.id
<< " because its latest run cannot be recovered";
continue;
}
// We are only interested in the latest run of the executor!
const ContainerID& latest = executor.latest.get();
Option<RunState> run = executor.runs.get(latest);
CHECK_SOME(run);
if (run->completed) {
VLOG(1) << "Skipping recovering task status updates of"
<< " executor '" << executor.id
<< "' of framework " << framework.id
<< " because its latest run " << latest.value()
<< " is completed";
continue;
}
foreachvalue (const TaskState& task, run->tasks) {
// No updates were ever received for this task!
// This means either:
// 1) the executor never received this task or
// 2) executor launched it but the slave died before it got any updates.
if (task.updates.empty()) {
LOG(WARNING) << "No status updates found for task " << task.id
<< " of framework " << framework.id;
continue;
}
// Create a new status update stream.
TaskStatusUpdateStream* stream = createStatusUpdateStream(
task.id, framework.id, state->id, true, executor.id, latest);
// Replay the stream.
Try<Nothing> replay = stream->replay(task.updates, task.acks);
if (replay.isError()) {
return Failure(
"Failed to replay status updates for task " + stringify(task.id) +
" of framework " + stringify(framework.id) +
": " + replay.error());
}
// At the end of the replay, the stream is either terminated or
// contains only unacknowledged, if any, pending updates. The
// pending updates will be flushed after the slave
// reregisters with the master.
if (stream->terminated) {
cleanupStatusUpdateStream(task.id, framework.id);
}
}
}
}
return Nothing();
}
void TaskStatusUpdateManagerProcess::cleanup(const FrameworkID& frameworkId)
{
LOG(INFO) << "Closing task status update streams for framework "
<< frameworkId;
if (streams.contains(frameworkId)) {
foreachkey (const TaskID& taskId, utils::copy(streams[frameworkId])) {
cleanupStatusUpdateStream(taskId, frameworkId);
}
}
}
Future<Nothing> TaskStatusUpdateManagerProcess::update(
const StatusUpdate& update,
const SlaveID& slaveId,
const ExecutorID& executorId,
const ContainerID& containerId)
{
return _update(update, slaveId, true, executorId, containerId);
}
Future<Nothing> TaskStatusUpdateManagerProcess::update(
const StatusUpdate& update,
const SlaveID& slaveId)
{
return _update(update, slaveId, false, None(), None());
}
Future<Nothing> TaskStatusUpdateManagerProcess::_update(
const StatusUpdate& update,
const SlaveID& slaveId,
bool checkpoint,
const Option<ExecutorID>& executorId,
const Option<ContainerID>& containerId)
{
const TaskID& taskId = update.status().task_id();
const FrameworkID& frameworkId = update.framework_id();
LOG(INFO) << "Received task status update " << update;
// Write the status update to disk and enqueue it to send it to the master.
// Create/Get the status update stream for this task.
TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
if (stream == nullptr) {
stream = createStatusUpdateStream(
taskId, frameworkId, slaveId, checkpoint, executorId, containerId);
}
// Verify that we didn't get a non-checkpointable update for a
// stream that is checkpointable, and vice-versa.
if (stream->checkpoint != checkpoint) {
return Failure(
"Mismatched checkpoint value for task status update " +
stringify(update) + " (expected checkpoint=" +
stringify(stream->checkpoint) + " actual checkpoint=" +
stringify(checkpoint) + ")");
}
// Handle the status update.
Try<bool> result = stream->update(update);
if (result.isError()) {
return Failure(result.error());
}
// We don't return a failed future here so that the slave can re-ack
// the duplicate update.
if (!result.get()) {
return Nothing();
}
// Forward the status update to the master if this is the first in the stream.
// Subsequent status updates will get sent in 'acknowledgement()'.
if (!paused && stream->pending.size() == 1) {
CHECK_NONE(stream->timeout);
const Result<StatusUpdate>& next = stream->next();
if (next.isError()) {
return Failure(next.error());
}
CHECK_SOME(next);
stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
return Nothing();
}
Timeout TaskStatusUpdateManagerProcess::forward(
const StatusUpdate& update,
const Duration& duration)
{
CHECK(!paused);
VLOG(1) << "Forwarding task status update " << update << " to the agent";
// Forward the update.
forward_(update);
// Send a message to self to resend after some delay if no ACK is received.
return delay(duration,
self(),
&TaskStatusUpdateManagerProcess::timeout,
duration).timeout();
}
Future<bool> TaskStatusUpdateManagerProcess::acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
const id::UUID& uuid)
{
LOG(INFO) << "Received task status update acknowledgement (UUID: " << uuid
<< ") for task " << taskId
<< " of framework " << frameworkId;
TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
// This might happen if we haven't completed recovery yet or if the
// acknowledgement is for a stream that has been cleaned up.
if (stream == nullptr) {
return Failure(
"Cannot find the task status update stream for task " +
stringify(taskId) + " of framework " + stringify(frameworkId));
}
// Get the corresponding update for this ACK.
const Result<StatusUpdate>& update = stream->next();
if (update.isError()) {
return Failure(update.error());
}
// This might happen if we retried a status update and got back
// acknowledgments for both the original and the retried update.
if (update.isNone()) {
return Failure(
"Unexpected task status update acknowledgment (UUID: " +
uuid.toString() + ") for task " + stringify(taskId) + " of framework " +
stringify(frameworkId));
}
// Handle the acknowledgement.
Try<bool> result =
stream->acknowledgement(taskId, frameworkId, uuid, update.get());
if (result.isError()) {
return Failure(result.error());
}
if (!result.get()) {
return Failure("Duplicate task status acknowledgement");
}
// Reset the timeout.
stream->timeout = None();
// Get the next update in the queue.
const Result<StatusUpdate>& next = stream->next();
if (next.isError()) {
return Failure(next.error());
}
bool terminated = stream->terminated;
if (terminated) {
if (next.isSome()) {
LOG(WARNING) << "Acknowledged a terminal"
<< " task status update " << update.get()
<< " but updates are still pending";
}
cleanupStatusUpdateStream(taskId, frameworkId);
} else if (!paused && next.isSome()) {
// Forward the next queued status update.
stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
return !terminated;
}
// TODO(vinod): There should be a limit on the retries.
void TaskStatusUpdateManagerProcess::timeout(const Duration& duration)
{
if (paused) {
return;
}
// Check and see if we should resend any status updates.
foreachkey (const FrameworkID& frameworkId, streams) {
foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
CHECK_NOTNULL(stream);
if (!stream->pending.empty()) {
CHECK_SOME(stream->timeout);
if (stream->timeout->expired()) {
const StatusUpdate& update = stream->pending.front();
LOG(WARNING) << "Resending task status update " << update;
// Bounded exponential backoff.
Duration duration_ =
std::min(duration * 2, STATUS_UPDATE_RETRY_INTERVAL_MAX);
stream->timeout = forward(update, duration_);
}
}
}
}
}
TaskStatusUpdateStream*
TaskStatusUpdateManagerProcess::createStatusUpdateStream(
const TaskID& taskId,
const FrameworkID& frameworkId,
const SlaveID& slaveId,
bool checkpoint,
const Option<ExecutorID>& executorId,
const Option<ContainerID>& containerId)
{
VLOG(1) << "Creating StatusUpdate stream for task " << taskId
<< " of framework " << frameworkId;
TaskStatusUpdateStream* stream = new TaskStatusUpdateStream(
taskId, frameworkId, slaveId, flags, checkpoint, executorId, containerId);
streams[frameworkId][taskId] = stream;
return stream;
}
TaskStatusUpdateStream* TaskStatusUpdateManagerProcess::getStatusUpdateStream(
const TaskID& taskId,
const FrameworkID& frameworkId)
{
if (!streams.contains(frameworkId)) {
return nullptr;
}
if (!streams[frameworkId].contains(taskId)) {
return nullptr;
}
return streams[frameworkId][taskId];
}
void TaskStatusUpdateManagerProcess::cleanupStatusUpdateStream(
const TaskID& taskId,
const FrameworkID& frameworkId)
{
VLOG(1) << "Cleaning up status update stream"
<< " for task " << taskId
<< " of framework " << frameworkId;
CHECK(streams.contains(frameworkId))
<< "Cannot find the task status update streams for framework "
<< frameworkId;
CHECK(streams[frameworkId].contains(taskId))
<< "Cannot find the status update streams for task " << taskId;
TaskStatusUpdateStream* stream = streams[frameworkId][taskId];
streams[frameworkId].erase(taskId);
if (streams[frameworkId].empty()) {
streams.erase(frameworkId);
}
delete stream;
}
TaskStatusUpdateManager::TaskStatusUpdateManager(const Flags& flags)
{
process = new TaskStatusUpdateManagerProcess(flags);
spawn(process);
}
TaskStatusUpdateManager::~TaskStatusUpdateManager()
{
terminate(process);
wait(process);
delete process;
}
void TaskStatusUpdateManager::initialize(
const function<void(StatusUpdate)>& forward)
{
dispatch(process, &TaskStatusUpdateManagerProcess::initialize, forward);
}
Future<Nothing> TaskStatusUpdateManager::update(
const StatusUpdate& update,
const SlaveID& slaveId,
const ExecutorID& executorId,
const ContainerID& containerId)
{
return dispatch(
process,
&TaskStatusUpdateManagerProcess::update,
update,
slaveId,
executorId,
containerId);
}
Future<Nothing> TaskStatusUpdateManager::update(
const StatusUpdate& update,
const SlaveID& slaveId)
{
return dispatch(
process,
&TaskStatusUpdateManagerProcess::update,
update,
slaveId);
}
Future<bool> TaskStatusUpdateManager::acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
const id::UUID& uuid)
{
return dispatch(
process,
&TaskStatusUpdateManagerProcess::acknowledgement,
taskId,
frameworkId,
uuid);
}
Future<Nothing> TaskStatusUpdateManager::recover(
const string& rootDir,
const Option<SlaveState>& state)
{
return dispatch(
process, &TaskStatusUpdateManagerProcess::recover, rootDir, state);
}
void TaskStatusUpdateManager::pause()
{
dispatch(process, &TaskStatusUpdateManagerProcess::pause);
}
void TaskStatusUpdateManager::resume()
{
dispatch(process, &TaskStatusUpdateManagerProcess::resume);
}
void TaskStatusUpdateManager::cleanup(const FrameworkID& frameworkId)
{
dispatch(process, &TaskStatusUpdateManagerProcess::cleanup, frameworkId);
}
TaskStatusUpdateStream::TaskStatusUpdateStream(
const TaskID& _taskId,
const FrameworkID& _frameworkId,
const SlaveID& _slaveId,
const Flags& _flags,
bool _checkpoint,
const Option<ExecutorID>& executorId,
const Option<ContainerID>& containerId)
: checkpoint(_checkpoint),
terminated(false),
taskId(_taskId),
frameworkId(_frameworkId),
slaveId(_slaveId),
flags(_flags),
error(None())
{
if (checkpoint) {
CHECK_SOME(executorId);
CHECK_SOME(containerId);
path = paths::getTaskUpdatesPath(
paths::getMetaRootDir(flags.work_dir),
slaveId,
frameworkId,
executorId.get(),
containerId.get(),
taskId);
// Create the base updates directory, if it doesn't exist.
const string& dirName = Path(path.get()).dirname();
Try<Nothing> directory = os::mkdir(dirName);
if (directory.isError()) {
error = "Failed to create '" + dirName + "': " + directory.error();
return;
}
// Open the updates file.
// NOTE: We don't use `O_SYNC` here because we only read this file
// if the host did not crash. `os::write` success implies the kernel
// will have flushed our data to the page cache. This is sufficient
// for the recovery scenarios we use this data for.
Try<int_fd> result = os::open(
path.get(),
O_CREAT | O_WRONLY | O_APPEND | O_CLOEXEC,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (result.isError()) {
error = "Failed to open '" + path.get() +
"' for status updates: " + result.error();
return;
}
// We keep the file open through the lifetime of the task, because it
// makes it easy to append status update records to the file.
fd = result.get();
}
}
TaskStatusUpdateStream::~TaskStatusUpdateStream()
{
if (fd.isSome()) {
Try<Nothing> close = os::close(fd.get());
if (close.isError()) {
CHECK_SOME(path);
LOG(ERROR) << "Failed to close file '" << path.get() << "': "
<< close.error();
}
}
}
Try<bool> TaskStatusUpdateStream::update(const StatusUpdate& update)
{
if (error.isSome()) {
return Error(error.get());
}
if (!update.has_uuid()) {
return Error("Task status update is missing 'uuid'");
}
// Check that this status update has not already been acknowledged.
// This could happen in the rare case when the slave received the ACK
// from the framework, died, but slave's ACK to the executor never made it!
if (acknowledged.contains(id::UUID::fromBytes(update.uuid()).get())) {
LOG(WARNING) << "Ignoring task status update " << update
<< " that has already been acknowledged by the framework!";
return false;
}
// Check that this update hasn't already been received.
// This could happen if the slave receives a status update from an executor,
// then crashes after it writes it to disk but before it sends an ack.
if (received.contains(id::UUID::fromBytes(update.uuid()).get())) {
LOG(WARNING) << "Ignoring duplicate task status update " << update;
return false;
}
// Handle the update, checkpointing if necessary.
Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
if (result.isError()) {
return Error(result.error());
}
return true;
}
Try<bool> TaskStatusUpdateStream::acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
const id::UUID& uuid,
const StatusUpdate& update)
{
if (error.isSome()) {
return Error(error.get());
}
if (acknowledged.contains(uuid)) {
LOG(WARNING) << "Duplicate task status update acknowledgment (UUID: "
<< uuid << ") for update " << update;
return false;
}
// This might happen if we retried a status update and got back
// acknowledgments for both the original and the retried update.
if (uuid != id::UUID::fromBytes(update.uuid()).get()) {
LOG(WARNING) << "Unexpected task status update acknowledgement (received "
<< uuid << ", expecting "
<< id::UUID::fromBytes(update.uuid()).get()
<< ") for update " << update;
return false;
}
// Handle the ACK, checkpointing if necessary.
Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
if (result.isError()) {
return Error(result.error());
}
return true;
}
Result<StatusUpdate> TaskStatusUpdateStream::next()
{
if (error.isSome()) {
return Error(error.get());
}
if (!pending.empty()) {
return pending.front();
}
return None();
}
Try<Nothing> TaskStatusUpdateStream::replay(
const std::vector<StatusUpdate>& updates,
const hashset<id::UUID>& acks)
{
if (error.isSome()) {
return Error(error.get());
}
VLOG(1) << "Replaying task status update stream for task " << taskId;
foreach (const StatusUpdate& update, updates) {
// Handle the update.
_handle(update, StatusUpdateRecord::UPDATE);
// Check if the update has an ACK too.
if (acks.contains(id::UUID::fromBytes(update.uuid()).get())) {
_handle(update, StatusUpdateRecord::ACK);
}
}
return Nothing();
}
Try<Nothing> TaskStatusUpdateStream::handle(
const StatusUpdate& update,
const StatusUpdateRecord::Type& type)
{
CHECK_NONE(error);
// Checkpoint the update if necessary.
if (checkpoint) {
LOG(INFO) << "Checkpointing " << type << " for task status update "
<< update;
CHECK_SOME(fd);
StatusUpdateRecord record;
record.set_type(type);
if (type == StatusUpdateRecord::UPDATE) {
record.mutable_update()->CopyFrom(update);
} else {
record.set_uuid(update.uuid());
}
Try<Nothing> write = ::protobuf::write(fd.get(), record);
if (write.isError()) {
error = "Failed to write task status update " + stringify(update) +
" to '" + path.get() + "': " + write.error();
return Error(error.get());
}
}
// Now actually handle the update.
_handle(update, type);
return Nothing();
}
void TaskStatusUpdateStream::_handle(
const StatusUpdate& update,
const StatusUpdateRecord::Type& type)
{
CHECK_NONE(error);
if (type == StatusUpdateRecord::UPDATE) {
// Record this update.
received.insert(id::UUID::fromBytes(update.uuid()).get());
// Add it to the pending updates queue.
pending.push(update);
} else {
// Record this ACK.
acknowledged.insert(id::UUID::fromBytes(update.uuid()).get());
// Remove the corresponding update from the pending queue.
pending.pop();
if (!terminated) {
terminated = protobuf::isTerminalState(update.status().state());
}
}
}
} // namespace slave {
} // namespace internal {
} // namespace mesos {