blob: 3cf5429b2d8a2fb956b4b47dde855734d775d72d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef __TASK_STATUS_UPDATE_MANAGER_HPP__
#define __TASK_STATUS_UPDATE_MANAGER_HPP__
#include <queue>
#include <string>
#include <mesos/mesos.hpp>
#include <process/future.hpp>
#include <process/pid.hpp>
#include <process/timeout.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
#include <stout/option.hpp>
#include <stout/try.hpp>
#include <stout/uuid.hpp>
#include "messages/messages.hpp"
#include "slave/flags.hpp"
namespace mesos {
namespace internal {
namespace slave {
// Forward declarations.
namespace state {
struct SlaveState;
}
class TaskStatusUpdateManagerProcess;
struct TaskStatusUpdateStream;
// TaskStatusUpdateManager is responsible for
// 1) Reliably sending status updates to the master.
// 2) Checkpointing the update to disk (optional).
// 3) Sending ACKs to the executor (optional).
// 4) Receiving ACKs from the scheduler.
class TaskStatusUpdateManager
{
public:
TaskStatusUpdateManager(const Flags& flags);
virtual ~TaskStatusUpdateManager();
// Expects a callback 'forward' which gets called whenever there is
// a new status update that needs to be forwarded to the master.
void initialize(const lambda::function<void(StatusUpdate)>& forward);
// TODO(vinod): Come up with better names/signatures for the
// checkpointing and non-checkpointing 'update()' functions.
// Currently, it is not obvious that one version of 'update()'
// does checkpointing while the other doesn't.
// Checkpoints the status update and reliably sends the
// update to the master (and hence the scheduler).
// @return Whether the update is handled successfully
// (e.g. checkpointed).
process::Future<Nothing> update(
const StatusUpdate& update,
const SlaveID& slaveId,
const ExecutorID& executorId,
const ContainerID& containerId);
// Retries the update to the master (as long as the slave is
// alive), but does not checkpoint the update.
// @return Whether the update is handled successfully.
process::Future<Nothing> update(
const StatusUpdate& update,
const SlaveID& slaveId);
// Checkpoints the status update to disk if necessary.
// Also, sends the next pending status update, if any.
// @return True if the ACK is handled successfully (e.g., checkpointed)
// and the task's status update stream is not terminated.
// False same as above except the status update stream is terminated.
// Failed if there are any errors (e.g., duplicate, checkpointing).
process::Future<bool> acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
const id::UUID& uuid);
// Recover status updates.
process::Future<Nothing> recover(
const std::string& rootDir,
const Option<state::SlaveState>& state);
// Pause sending updates.
// This is useful when the slave is disconnected because a
// disconnected slave will drop the updates.
void pause();
// Unpause and resend all the pending updates right away.
// This is useful when the updates were pending because there was
// no master elected (e.g., during recovery) or framework failed over.
void resume();
// Closes all the status update streams corresponding to this framework.
// NOTE: This stops retrying any pending status updates for this framework.
void cleanup(const FrameworkID& frameworkId);
private:
TaskStatusUpdateManagerProcess* process;
};
// TaskStatusUpdateStream handles the status updates and acknowledgements
// of a task, checkpointing them if necessary. It also holds the information
// about received, acknowledged and pending status updates.
// NOTE: A task is expected to have a globally unique ID across the lifetime
// of a framework. In other words the tuple (taskId, frameworkId) should be
// always unique.
struct TaskStatusUpdateStream
{
TaskStatusUpdateStream(const TaskID& _taskId,
const FrameworkID& _frameworkId,
const SlaveID& _slaveId,
const Flags& _flags,
bool _checkpoint,
const Option<ExecutorID>& executorId,
const Option<ContainerID>& containerId);
~TaskStatusUpdateStream();
// This function handles the update, checkpointing if necessary.
// @return True if the update is successfully handled.
// False if the update is a duplicate.
// Error Any errors (e.g., checkpointing).
Try<bool> update(const StatusUpdate& update);
// This function handles the ACK, checkpointing if necessary.
// @return True if the acknowledgement is successfully handled.
// False if the acknowledgement is a duplicate.
// Error Any errors (e.g., checkpointing).
Try<bool> acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
const id::UUID& uuid,
const StatusUpdate& update);
// Returns the next update (or none, if empty) in the queue.
Result<StatusUpdate> next();
// Replays the stream by sequentially handling an update and its
// corresponding ACK, if present.
Try<Nothing> replay(
const std::vector<StatusUpdate>& updates,
const hashset<id::UUID>& acks);
// TODO(vinod): Explore semantics to make these private.
const bool checkpoint;
bool terminated;
Option<process::Timeout> timeout; // Timeout for resending status update.
std::queue<StatusUpdate> pending;
private:
// Handles the status update and writes it to disk, if necessary.
// TODO(vinod): The write has to be asynchronous to avoid status updates that
// are being checkpointed, blocking the processing of other updates.
// One solution is to wrap the protobuf::write inside async, but its probably
// too much of an overhead to spin up a new libprocess per status update?
// A better solution might be to be have async write capability for file io.
Try<Nothing> handle(
const StatusUpdate& update,
const StatusUpdateRecord::Type& type);
void _handle(
const StatusUpdate& update,
const StatusUpdateRecord::Type& type);
const TaskID taskId;
const FrameworkID frameworkId;
const SlaveID slaveId;
const Flags flags;
hashset<id::UUID> received;
hashset<id::UUID> acknowledged;
Option<std::string> path; // File path of the update stream.
Option<int_fd> fd; // File descriptor to the update stream.
Option<std::string> error; // Potential non-retryable error.
};
} // namespace slave {
} // namespace internal {
} // namespace mesos {
#endif // __TASK_STATUS_UPDATE_MANAGER_HPP__