blob: 24e3882ad1969c20a64daf90e408618c310e9a81 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __STATUS_UPDATE_MANAGER_HPP__
#define __STATUS_UPDATE_MANAGER_HPP__
#include <ostream>
#include <queue>
#include <string>
#include <utility>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/timeout.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
#include "common/type_utils.hpp"
#include "logging/logging.hpp"
#include "messages/messages.hpp"
#include "slave/flags.hpp"
namespace mesos {
namespace internal {
namespace slave {
// Forward declarations.
namespace state {
struct SlaveState;
}
class StatusUpdateManagerProcess;
struct StatusUpdateStream;
// StatusUpdateManager is responsible for
// 1) Reliably sending status updates to the master.
// 2) Checkpointing the update to disk (optional).
// 3) Sending ACKs to the executor (optional).
// 4) Receiving ACKs from the scheduler.
class StatusUpdateManager
{
public:
StatusUpdateManager();
virtual ~StatusUpdateManager();
void initialize(
const Flags& flags,
const process::PID<Slave>& slave);
// TODO(vinod): Come up with better names/signatures for the
// checkpointing and non-checkpointing 'update()' functions.
// Currently, it is not obvious that one version of 'update()'
// does checkpointing while the other doesn't.
// Checkpoints the status update and reliably sends the
// update to the master (and hence the scheduler).
// @return Whether the update is handled successfully
// (e.g. checkpointed).
process::Future<Nothing> update(
const StatusUpdate& update,
const SlaveID& slaveId,
const ExecutorID& executorId,
const ContainerID& containerId);
// Retries the update to the master (as long as the slave is
// alive), but does not checkpoint the update.
// @return Whether the update is handled successfully.
process::Future<Nothing> update(
const StatusUpdate& update,
const SlaveID& slaveId);
// Checkpoints the status update to disk if necessary.
// Also, sends the next pending status update, if any.
// @return True if the ACK is handled successfully (e.g., checkpointed)
// and the task's status update stream is not terminated.
// False same as above except the status update stream is terminated.
// Failed if there are any errors (e.g., duplicate, checkpointing).
process::Future<bool> acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
const UUID& uuid);
// Recover status updates.
process::Future<Nothing> recover(
const std::string& rootDir,
const Option<state::SlaveState>& state);
// TODO(vinod): Remove this hack once the new leader detector code is merged.
void newMasterDetected(const process::UPID& pid);
// Resend all the pending updates right away.
// This is useful when the updates were pending because there was
// no master elected (e.g., during recovery) or framework failed over.
void flush();
// Closes all the status update streams corresponding to this framework.
// NOTE: This stops retrying any pending status updates for this framework.
void cleanup(const FrameworkID& frameworkId);
private:
StatusUpdateManagerProcess* process;
};
// StatusUpdateStream handles the status updates and acknowledgements
// of a task, checkpointing them if necessary. It also holds the information
// about received, acknowledged and pending status updates.
// NOTE: A task is expected to have a globally unique ID across the lifetime
// of a framework. In other words the tuple (taskId, frameworkId) should be
// always unique.
struct StatusUpdateStream
{
StatusUpdateStream(const TaskID& _taskId,
const FrameworkID& _frameworkId,
const SlaveID& _slaveId,
const Flags& _flags,
bool _checkpoint,
const Option<ExecutorID>& executorId,
const Option<ContainerID>& containerId)
: checkpoint(_checkpoint),
terminated(false),
taskId(_taskId),
frameworkId(_frameworkId),
slaveId(_slaveId),
flags(_flags),
error(None())
{
if (checkpoint) {
CHECK_SOME(executorId);
CHECK_SOME(containerId);
path = paths::getTaskUpdatesPath(
paths::getMetaRootDir(flags.work_dir),
slaveId,
frameworkId,
executorId.get(),
containerId.get(),
taskId);
// Create the base updates directory, if it doesn't exist.
Try<Nothing> directory = os::mkdir(os::dirname(path.get()).get());
if (directory.isError()) {
error = "Failed to create " + os::dirname(path.get()).get();
return;
}
// Open the updates file.
Try<int> result = os::open(
path.get(),
O_CREAT | O_WRONLY | O_APPEND | O_SYNC,
S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
if (result.isError()) {
error = "Failed to open '" + path.get() + "' for status updates";
return;
}
// We keep the file open through the lifetime of the task, because it
// makes it easy to append status update records to the file.
fd = result.get();
}
}
~StatusUpdateStream()
{
if (fd.isSome()) {
Try<Nothing> close = os::close(fd.get());
if (close.isError()) {
CHECK_SOME(path);
LOG(ERROR) << "Failed to close file '" << path.get() << "': "
<< close.error();
}
}
}
// This function handles the update, checkpointing if necessary.
// @return True if the update is successfully handled.
// False if the update is a duplicate.
// Error Any errors (e.g., checkpointing).
Try<bool> update(const StatusUpdate& update)
{
if (error.isSome()) {
return Error(error.get());
}
// Check that this status update has not already been acknowledged.
// This could happen in the rare case when the slave received the ACK
// from the framework, died, but slave's ACK to the executor never made it!
if (acknowledged.contains(UUID::fromBytes(update.uuid()))) {
LOG(WARNING) << "Ignoring status update " << update
<< " that has already been acknowledged by the framework!";
return false;
}
// Check that this update hasn't already been received.
// This could happen if the slave receives a status update from an executor,
// then crashes after it writes it to disk but before it sends an ack.
if (received.contains(UUID::fromBytes(update.uuid()))) {
LOG(WARNING) << "Ignoring duplicate status update " << update;
return false;
}
// Handle the update, checkpointing if necessary.
Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
if (result.isError()) {
return Error(result.error());
}
return true;
}
// This function handles the ACK, checkpointing if necessary.
// @return True if the acknowledgement is successfully handled.
// False if the acknowledgement is a duplicate.
// Error Any errors (e.g., checkpointing).
Try<bool> acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
const UUID& uuid,
const StatusUpdate& update)
{
if (error.isSome()) {
return Error(error.get());
}
if (acknowledged.contains(uuid)) {
LOG(WARNING) << "Duplicate status update acknowledgment (UUID: "
<< uuid << ") for update " << update;
return false;
}
// This might happen if we retried a status update and got back
// acknowledgments for both the original and the retried update.
if (uuid != UUID::fromBytes(update.uuid())) {
LOG(WARNING) << "Unexpected status update acknowledgement (received "
<< uuid << ", expecting " << UUID::fromBytes(update.uuid())
<< ") for update " << update;
return false;
}
// Handle the ACK, checkpointing if necessary.
Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
if (result.isError()) {
return Error(result.error());
}
return true;
}
// Returns the next update (or none, if empty) in the queue.
Result<StatusUpdate> next()
{
if (error.isSome()) {
return Error(error.get());
}
if (!pending.empty()) {
return pending.front();
}
return None();
}
// Replays the stream by sequentially handling an update and its
// corresponding ACK, if present.
Try<Nothing> replay(
const std::vector<StatusUpdate>& updates,
const hashset<UUID>& acks)
{
if (error.isSome()) {
return Error(error.get());
}
VLOG(1) << "Replaying status update stream for task " << taskId;
foreach (const StatusUpdate& update, updates) {
// Handle the update.
_handle(update, StatusUpdateRecord::UPDATE);
// Check if the update has an ACK too.
if (acks.contains(UUID::fromBytes(update.uuid()))) {
_handle(update, StatusUpdateRecord::ACK);
}
}
return Nothing();
}
// TODO(vinod): Explore semantics to make these private.
const bool checkpoint;
bool terminated;
Option<process::Timeout> timeout; // Timeout for resending status update.
std::queue<StatusUpdate> pending;
private:
// Handles the status update and writes it to disk, if necessary.
// TODO(vinod): The write has to be asynchronous to avoid status updates that
// are being checkpointed, blocking the processing of other updates.
// One solution is to wrap the protobuf::write inside async, but its probably
// too much of an overhead to spin up a new libprocess per status update?
// A better solution might be to be have async write capability for file io.
Try<Nothing> handle(
const StatusUpdate& update,
const StatusUpdateRecord::Type& type)
{
CHECK(error.isNone());
// Checkpoint the update if necessary.
if (checkpoint) {
LOG(INFO) << "Checkpointing " << type << " for status update " << update;
CHECK_SOME(fd);
StatusUpdateRecord record;
record.set_type(type);
if (type == StatusUpdateRecord::UPDATE) {
record.mutable_update()->CopyFrom(update);
} else {
record.set_uuid(update.uuid());
}
Try<Nothing> write = ::protobuf::write(fd.get(), record);
if (write.isError()) {
error = "Failed to write status update " + stringify(update) +
" to '" + path.get() + "': " + write.error();
return Error(error.get());
}
}
// Now actually handle the update.
_handle(update, type);
return Nothing();
}
void _handle(const StatusUpdate& update, const StatusUpdateRecord::Type& type)
{
CHECK(error.isNone());
if (type == StatusUpdateRecord::UPDATE) {
// Record this update.
received.insert(UUID::fromBytes(update.uuid()));
// Add it to the pending updates queue.
pending.push(update);
} else {
// Record this ACK.
acknowledged.insert(UUID::fromBytes(update.uuid()));
// Remove the corresponding update from the pending queue.
pending.pop();
if (!terminated) {
terminated = protobuf::isTerminalState(update.status().state());
}
}
}
const TaskID taskId;
const FrameworkID frameworkId;
const SlaveID slaveId;
const Flags flags;
hashset<UUID> received;
hashset<UUID> acknowledged;
Option<std::string> path; // File path of the update stream.
Option<int> fd; // File descriptor to the update stream.
Option<std::string> error; // Potential non-retryable error.
};
} // namespace slave {
} // namespace internal {
} // namespace mesos {
#endif // __STATUS_UPDATE_MANAGER_HPP__