| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <dlfcn.h> |
| #include <errno.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <unistd.h> |
| |
| #include <arpa/inet.h> |
| |
| #include <iostream> |
| #include <map> |
| #include <string> |
| #include <sstream> |
| |
| #include <mesos/mesos.hpp> |
| #include <mesos/scheduler.hpp> |
| |
| #include <process/defer.hpp> |
| #include <process/delay.hpp> |
| #include <process/future.hpp> |
| #include <process/id.hpp> |
| #include <process/dispatch.hpp> |
| #include <process/id.hpp> |
| #include <process/owned.hpp> |
| #include <process/pid.hpp> |
| #include <process/process.hpp> |
| #include <process/protobuf.hpp> |
| |
| #include <process/metrics/gauge.hpp> |
| #include <process/metrics/metrics.hpp> |
| |
| #include <stout/check.hpp> |
| #include <stout/duration.hpp> |
| #include <stout/error.hpp> |
| #include <stout/flags.hpp> |
| #include <stout/hashmap.hpp> |
| #include <stout/lambda.hpp> |
| #include <stout/option.hpp> |
| #include <stout/os.hpp> |
| #include <stout/stopwatch.hpp> |
| #include <stout/utils.hpp> |
| #include <stout/uuid.hpp> |
| |
| #include "sasl/authenticatee.hpp" |
| |
| #include "common/lock.hpp" |
| #include "common/type_utils.hpp" |
| |
| #include "master/detector.hpp" |
| |
| #include "local/local.hpp" |
| |
| #include "master/detector.hpp" |
| |
| #include "logging/flags.hpp" |
| #include "logging/logging.hpp" |
| |
| #include "messages/messages.hpp" |
| |
| using namespace mesos; |
| using namespace mesos::internal; |
| using namespace mesos::internal::master; |
| |
| using namespace process; |
| |
| using std::map; |
| using std::string; |
| using std::vector; |
| |
| using process::wait; // Necessary on some OS's to disambiguate. |
| |
| using utils::copy; |
| |
| namespace mesos { |
| namespace internal { |
| |
| // The scheduler process (below) is responsible for interacting with |
| // the master and responding to Mesos API calls from scheduler |
| // drivers. In order to allow a message to be sent back to the master |
| // we allow friend functions to invoke 'send', 'post', etc. Therefore, |
| // we must make sure that any necessary synchronization is performed. |
| |
| class SchedulerProcess : public ProtobufProcess<SchedulerProcess> |
| { |
| public: |
| SchedulerProcess(MesosSchedulerDriver* _driver, |
| Scheduler* _scheduler, |
| const FrameworkInfo& _framework, |
| const Option<Credential>& _credential, |
| const string& schedulerId, |
| MasterDetector* _detector, |
| pthread_mutex_t* _mutex, |
| pthread_cond_t* _cond) |
| // We use a UUID here to ensure that the master can reliably |
| // distinguish between scheduler runs. Otherwise the master may |
| // receive a delayed ExitedEvent enqueued behind a |
| // re-registration, and deactivate the framework incorrectly. |
| // TODO(bmahler): Investigate better ways to solve this problem. |
| // Check if bidirectional links in Erlang provides better |
| // semantics: |
| // http://www.erlang.org/doc/reference_manual/processes.html#id84804. |
| // Consider using unique PIDs throughout libprocess and relying |
| // on name registration to identify the process without the PID. |
| : ProcessBase(schedulerId), |
| metrics(*this), |
| driver(_driver), |
| scheduler(_scheduler), |
| framework(_framework), |
| mutex(_mutex), |
| cond(_cond), |
| failover(_framework.has_id() && !framework.id().value().empty()), |
| connected(false), |
| aborted(false), |
| detector(_detector), |
| credential(_credential), |
| authenticatee(NULL), |
| authenticating(None()), |
| authenticated(false), |
| reauthenticate(false) |
| { |
| LOG(INFO) << "Version: " << MESOS_VERSION; |
| } |
| |
| virtual ~SchedulerProcess() |
| { |
| delete authenticatee; |
| } |
| |
| protected: |
| virtual void initialize() |
| { |
| // TODO(benh): Get access to flags so that we can decide whether |
| // or not to make ZooKeeper verbose. |
| install<FrameworkRegisteredMessage>( |
| &SchedulerProcess::registered, |
| &FrameworkRegisteredMessage::framework_id, |
| &FrameworkRegisteredMessage::master_info); |
| |
| install<FrameworkReregisteredMessage>( |
| &SchedulerProcess::reregistered, |
| &FrameworkReregisteredMessage::framework_id, |
| &FrameworkReregisteredMessage::master_info); |
| |
| install<ResourceOffersMessage>( |
| &SchedulerProcess::resourceOffers, |
| &ResourceOffersMessage::offers, |
| &ResourceOffersMessage::pids); |
| |
| install<RescindResourceOfferMessage>( |
| &SchedulerProcess::rescindOffer, |
| &RescindResourceOfferMessage::offer_id); |
| |
| install<StatusUpdateMessage>( |
| &SchedulerProcess::statusUpdate, |
| &StatusUpdateMessage::update, |
| &StatusUpdateMessage::pid); |
| |
| install<LostSlaveMessage>( |
| &SchedulerProcess::lostSlave, |
| &LostSlaveMessage::slave_id); |
| |
| install<ExecutorToFrameworkMessage>( |
| &SchedulerProcess::frameworkMessage, |
| &ExecutorToFrameworkMessage::slave_id, |
| &ExecutorToFrameworkMessage::framework_id, |
| &ExecutorToFrameworkMessage::executor_id, |
| &ExecutorToFrameworkMessage::data); |
| |
| install<FrameworkErrorMessage>( |
| &SchedulerProcess::error, |
| &FrameworkErrorMessage::message); |
| |
| // Start detecting masters. |
| detector->detect() |
| .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1)); |
| } |
| |
| void detected(const Future<Option<MasterInfo> >& _master) |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring the master change because the driver is aborted!"; |
| return; |
| } |
| |
| CHECK(!_master.isDiscarded()); |
| |
| if (_master.isFailed()) { |
| EXIT(1) << "Failed to detect a master: " << _master.failure(); |
| } |
| |
| if (_master.get().isSome()) { |
| master = UPID(_master.get().get().pid()); |
| } else { |
| master = None(); |
| } |
| |
| if (connected) { |
| // There are three cases here: |
| // 1. The master failed. |
| // 2. The master failed over to a new master. |
| // 3. The master failed over to the same master. |
| // In any case, we will reconnect (possibly immediately), so we |
| // must notify schedulers of the disconnection. |
| Stopwatch stopwatch; |
| if (FLAGS_v >= 1) { |
| stopwatch.start(); |
| } |
| |
| scheduler->disconnected(driver); |
| |
| VLOG(1) << "Scheduler::disconnected took " << stopwatch.elapsed(); |
| } |
| |
| connected = false; |
| |
| if (master.isSome()) { |
| LOG(INFO) << "New master detected at " << master.get(); |
| link(master.get()); |
| |
| if (credential.isSome()) { |
| // Authenticate with the master. |
| authenticate(); |
| } else { |
| // Proceed with registration without authentication. |
| LOG(INFO) << "No credentials provided." |
| << " Attempting to register without authentication"; |
| |
| doReliableRegistration(); |
| } |
| } else { |
| // In this case, we don't actually invoke Scheduler::error |
| // since we might get reconnected to a master imminently. |
| LOG(INFO) << "No master detected"; |
| } |
| |
| // Keep detecting masters. |
| detector->detect(_master.get()) |
| .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1)); |
| } |
| |
| |
| void authenticate() |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring authenticate because the driver is aborted!"; |
| return; |
| } |
| |
| authenticated = false; |
| |
| if (master.isNone()) { |
| return; |
| } |
| |
| if (authenticating.isSome()) { |
| // Authentication is in progress. Try to cancel it. |
| // Note that it is possible that 'authenticating' is ready |
| // and the dispatch to '_authenticate' is enqueued when we |
| // are here, making the 'discard' here a no-op. This is ok |
| // because we set 'reauthenticate' here which enforces a retry |
| // in '_authenticate'. |
| copy(authenticating.get()).discard(); |
| reauthenticate = true; |
| return; |
| } |
| |
| LOG(INFO) << "Authenticating with master " << master.get(); |
| |
| CHECK_SOME(credential); |
| |
| CHECK(authenticatee == NULL); |
| authenticatee = new sasl::Authenticatee(credential.get(), self()); |
| |
| // NOTE: We do not pass 'Owned<Authenticatee>' here because doing |
| // so could make 'AuthenticateeProcess' responsible for deleting |
| // 'Authenticatee' causing a deadlock because the destructor of |
| // 'Authenticatee' waits on 'AuthenticateeProcess'. |
| // This will happen in the following scenario: |
| // --> 'AuthenticateeProcess' does a 'Future.set()'. |
| // --> '_authenticate()' is dispatched to this process. |
| // --> This process executes '_authenticatee()'. |
| // --> 'AuthenticateeProcess' removes the onAny callback |
| // from its queue which holds the last reference to |
| // 'Authenticatee'. |
| // --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'. |
| // TODO(vinod): Consider using 'Shared' to 'Owned' upgrade. |
| authenticating = authenticatee->authenticate(master.get()) |
| .onAny(defer(self(), &Self::_authenticate)); |
| |
| delay(Seconds(5), |
| self(), |
| &Self::authenticationTimeout, |
| authenticating.get()); |
| } |
| |
| void _authenticate() |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring _authenticate because the driver is aborted!"; |
| return; |
| } |
| |
| delete CHECK_NOTNULL(authenticatee); |
| authenticatee = NULL; |
| |
| CHECK_SOME(authenticating); |
| const Future<bool>& future = authenticating.get(); |
| |
| if (master.isNone()) { |
| LOG(INFO) << "Ignoring _authenticate because the master is lost"; |
| authenticating = None(); |
| // Set it to false because we do not want further retries until |
| // a new master is detected. |
| // We obviously do not need to reauthenticate either even if |
| // 'reauthenticate' is currently true because the master is |
| // lost. |
| reauthenticate = false; |
| return; |
| } |
| |
| if (reauthenticate || !future.isReady()) { |
| LOG(INFO) |
| << "Failed to authenticate with master " << master.get() << ": " |
| << (reauthenticate ? "master changed" : |
| (future.isFailed() ? future.failure() : "future discarded")); |
| |
| authenticating = None(); |
| reauthenticate = false; |
| |
| // TODO(vinod): Add a limit on number of retries. |
| dispatch(self(), &Self::authenticate); // Retry. |
| return; |
| } |
| |
| if (!future.get()) { |
| LOG(ERROR) << "Master " << master.get() << " refused authentication"; |
| error("Master refused authentication"); |
| return; |
| } |
| |
| LOG(INFO) << "Successfully authenticated with master " << master.get(); |
| |
| authenticated = true; |
| authenticating = None(); |
| |
| doReliableRegistration(); // Proceed with registration. |
| } |
| |
| void authenticationTimeout(Future<bool> future) |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring authentication timeout because " |
| << "the driver is aborted!"; |
| return; |
| } |
| |
| // NOTE: Discarded future results in a retry in '_authenticate()'. |
| // Also note that a 'discard' here is safe even if another |
| // authenticator is in progress because this copy of the future |
| // corresponds to the original authenticator that started the timer. |
| if (future.discard()) { // This is a no-op if the future is already ready. |
| LOG(WARNING) << "Authentication timed out"; |
| } |
| } |
| |
| void registered( |
| const UPID& from, |
| const FrameworkID& frameworkId, |
| const MasterInfo& masterInfo) |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring framework registered message because " |
| << "the driver is aborted!"; |
| return; |
| } |
| |
| if (connected) { |
| VLOG(1) << "Ignoring framework registered message because " |
| << "the driver is already connected!"; |
| return; |
| } |
| |
| if (master != from) { |
| LOG(WARNING) |
| << "Ignoring framework registered message because it was sent " |
| << "from '" << from << "' instead of the leading master '" |
| << (master.isSome() ? master.get() : UPID()) << "'"; |
| return; |
| } |
| |
| LOG(INFO) << "Framework registered with " << frameworkId; |
| |
| framework.mutable_id()->MergeFrom(frameworkId); |
| |
| connected = true; |
| failover = false; |
| |
| Stopwatch stopwatch; |
| if (FLAGS_v >= 1) { |
| stopwatch.start(); |
| } |
| |
| scheduler->registered(driver, frameworkId, masterInfo); |
| |
| VLOG(1) << "Scheduler::registered took " << stopwatch.elapsed(); |
| } |
| |
| void reregistered( |
| const UPID& from, |
| const FrameworkID& frameworkId, |
| const MasterInfo& masterInfo) |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring framework re-registered message because " |
| << "the driver is aborted!"; |
| return; |
| } |
| |
| if (connected) { |
| VLOG(1) << "Ignoring framework re-registered message because " |
| << "the driver is already connected!"; |
| return; |
| } |
| |
| if (master != from) { |
| LOG(WARNING) |
| << "Ignoring framework re-registered message because it was sent " |
| << "from '" << from << "' instead of the leading master '" |
| << (master.isSome() ? master.get() : UPID()) << "'"; |
| return; |
| } |
| |
| LOG(INFO) << "Framework re-registered with " << frameworkId; |
| |
| CHECK(framework.id() == frameworkId); |
| |
| connected = true; |
| failover = false; |
| |
| Stopwatch stopwatch; |
| if (FLAGS_v >= 1) { |
| stopwatch.start(); |
| } |
| |
| scheduler->reregistered(driver, masterInfo); |
| |
| VLOG(1) << "Scheduler::reregistered took " << stopwatch.elapsed(); |
| } |
| |
| void doReliableRegistration() |
| { |
| if (connected || master.isNone()) { |
| return; |
| } |
| |
| if (credential.isSome() && !authenticated) { |
| return; |
| } |
| |
| VLOG(1) << "Sending registration request to " << master.get(); |
| |
| if (!framework.has_id() || framework.id() == "") { |
| // Touched for the very first time. |
| RegisterFrameworkMessage message; |
| message.mutable_framework()->MergeFrom(framework); |
| send(master.get(), message); |
| } else { |
| // Not the first time, or failing over. |
| ReregisterFrameworkMessage message; |
| message.mutable_framework()->MergeFrom(framework); |
| message.set_failover(failover); |
| send(master.get(), message); |
| } |
| |
| delay(Seconds(1), self(), &Self::doReliableRegistration); |
| } |
| |
| void resourceOffers( |
| const UPID& from, |
| const vector<Offer>& offers, |
| const vector<string>& pids) |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring resource offers message because " |
| << "the driver is aborted!"; |
| return; |
| } |
| |
| if (!connected) { |
| VLOG(1) << "Ignoring resource offers message because the driver is " |
| << "disconnected!"; |
| return; |
| } |
| |
| CHECK_SOME(master); |
| |
| if (from != master.get()) { |
| VLOG(1) << "Ignoring resource offers message because it was sent " |
| << "from '" << from << "' instead of the leading master '" |
| << master.get() << "'"; |
| return; |
| } |
| |
| VLOG(2) << "Received " << offers.size() << " offers"; |
| |
| CHECK(offers.size() == pids.size()); |
| |
| // Save the pid associated with each slave (one per offer) so |
| // later we can send framework messages directly. |
| for (size_t i = 0; i < offers.size(); i++) { |
| UPID pid(pids[i]); |
| // Check if parse failed (e.g., due to DNS). |
| if (pid != UPID()) { |
| VLOG(3) << "Saving PID '" << pids[i] << "'"; |
| savedOffers[offers[i].id()][offers[i].slave_id()] = pid; |
| } else { |
| VLOG(1) << "Failed to parse PID '" << pids[i] << "'"; |
| } |
| } |
| |
| Stopwatch stopwatch; |
| if (FLAGS_v >= 1) { |
| stopwatch.start(); |
| } |
| |
| scheduler->resourceOffers(driver, offers); |
| |
| VLOG(1) << "Scheduler::resourceOffers took " << stopwatch.elapsed(); |
| } |
| |
| void rescindOffer(const UPID& from, const OfferID& offerId) |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring rescind offer message because " |
| << "the driver is aborted!"; |
| return; |
| } |
| |
| if (!connected) { |
| VLOG(1) << "Ignoring rescind offer message because the driver is " |
| << "disconnected!"; |
| return; |
| } |
| |
| CHECK_SOME(master); |
| |
| if (from != master.get()) { |
| VLOG(1) << "Ignoring rescind offer message because it was sent " |
| << "from '" << from << "' instead of the leading master '" |
| << master.get() << "'"; |
| return; |
| } |
| |
| VLOG(1) << "Rescinded offer " << offerId; |
| |
| savedOffers.erase(offerId); |
| |
| Stopwatch stopwatch; |
| if (FLAGS_v >= 1) { |
| stopwatch.start(); |
| } |
| |
| scheduler->offerRescinded(driver, offerId); |
| |
| VLOG(1) << "Scheduler::offerRescinded took " << stopwatch.elapsed(); |
| } |
| |
| void statusUpdate( |
| const UPID& from, |
| const StatusUpdate& update, |
| const UPID& pid) |
| { |
| const TaskStatus& status = update.status(); |
| |
| if (aborted) { |
| VLOG(1) << "Ignoring task status update message because " |
| << "the driver is aborted!"; |
| return; |
| } |
| |
| // Allow status updates created from the driver itself. |
| if (from != UPID()) { |
| if (!connected) { |
| VLOG(1) << "Ignoring status update message because the driver is " |
| << "disconnected!"; |
| return; |
| } |
| |
| CHECK_SOME(master); |
| |
| if (from != master.get()) { |
| VLOG(1) << "Ignoring status update message because it was sent " |
| << "from '" << from << "' instead of the leading master '" |
| << master.get() << "'"; |
| return; |
| } |
| } |
| |
| VLOG(2) << "Received status update " << update << " from " << pid; |
| |
| CHECK(framework.id() == update.framework_id()); |
| |
| // TODO(benh): Note that this maybe a duplicate status update! |
| // Once we get support to try and have a more consistent view |
| // of what's running in the cluster, we'll just let this one |
| // slide. The alternative is possibly dealing with a scheduler |
| // failover and not correctly giving the scheduler it's status |
| // update, which seems worse than giving a status update |
| // multiple times (of course, if a scheduler re-uses a TaskID, |
| // that could be bad. |
| |
| Stopwatch stopwatch; |
| if (FLAGS_v >= 1) { |
| stopwatch.start(); |
| } |
| |
| scheduler->statusUpdate(driver, status); |
| |
| VLOG(1) << "Scheduler::statusUpdate took " << stopwatch.elapsed(); |
| |
| // Note that we need to look at the volatile 'aborted' here to |
| // so that we don't acknowledge the update if the driver was |
| // aborted during the processing of the update. |
| if (aborted) { |
| VLOG(1) << "Not sending status update acknowledgment message because " |
| << "the driver is aborted!"; |
| return; |
| } |
| |
| // Don't acknowledge updates created by the driver or master. |
| if (from != UPID() && pid != UPID()) { |
| // We drop updates while we're disconnected. |
| CHECK(connected); |
| CHECK_SOME(master); |
| |
| VLOG(2) << "Sending ACK for status update " << update |
| << " to " << master.get(); |
| |
| StatusUpdateAcknowledgementMessage message; |
| message.mutable_framework_id()->MergeFrom(framework.id()); |
| message.mutable_slave_id()->MergeFrom(update.slave_id()); |
| message.mutable_task_id()->MergeFrom(update.status().task_id()); |
| message.set_uuid(update.uuid()); |
| send(master.get(), message); |
| } |
| } |
| |
| void lostSlave(const UPID& from, const SlaveID& slaveId) |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring lost slave message because the driver is aborted!"; |
| return; |
| } |
| |
| if (!connected) { |
| VLOG(1) << "Ignoring lost slave message because the driver is " |
| << "disconnected!"; |
| return; |
| } |
| |
| CHECK_SOME(master); |
| |
| if (from != master.get()) { |
| VLOG(1) << "Ignoring lost slave message because it was sent " |
| << "from '" << from << "' instead of the leading master '" |
| << master.get() << "'"; |
| return; |
| } |
| |
| VLOG(1) << "Lost slave " << slaveId; |
| |
| savedSlavePids.erase(slaveId); |
| |
| Stopwatch stopwatch; |
| if (FLAGS_v >= 1) { |
| stopwatch.start(); |
| } |
| |
| scheduler->slaveLost(driver, slaveId); |
| |
| VLOG(1) << "Scheduler::slaveLost took " << stopwatch.elapsed(); |
| } |
| |
| void frameworkMessage(const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const ExecutorID& executorId, |
| const string& data) |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring framework message because the driver is aborted!"; |
| return; |
| } |
| |
| VLOG(2) << "Received framework message"; |
| |
| Stopwatch stopwatch; |
| if (FLAGS_v >= 1) { |
| stopwatch.start(); |
| } |
| |
| scheduler->frameworkMessage(driver, executorId, slaveId, data); |
| |
| VLOG(1) << "Scheduler::frameworkMessage took " << stopwatch.elapsed(); |
| } |
| |
| void error(const string& message) |
| { |
| if (aborted) { |
| VLOG(1) << "Ignoring error message because the driver is aborted!"; |
| return; |
| } |
| |
| LOG(INFO) << "Got error '" << message << "'"; |
| |
| driver->abort(); |
| |
| Stopwatch stopwatch; |
| if (FLAGS_v >= 1) { |
| stopwatch.start(); |
| } |
| |
| scheduler->error(driver, message); |
| |
| VLOG(1) << "Scheduler::error took " << stopwatch.elapsed(); |
| } |
| |
| void stop(bool failover) |
| { |
| LOG(INFO) << "Stopping framework '" << framework.id() << "'"; |
| |
| // Whether or not we send an unregister message, we want to |
| // terminate this process. |
| terminate(self()); |
| |
| if (connected && !failover) { |
| UnregisterFrameworkMessage message; |
| message.mutable_framework_id()->MergeFrom(framework.id()); |
| CHECK_SOME(master); |
| send(master.get(), message); |
| } |
| |
| Lock lock(mutex); |
| pthread_cond_signal(cond); |
| } |
| |
| // NOTE: This function informs the master to stop attempting to send |
| // messages to this scheduler. The abort flag stops any already |
| // enqueued messages or messages in flight from being handled. We |
| // don't want to terminate the process because one might do a |
| // MesosSchedulerDriver::stop later, which dispatches to |
| // SchedulerProcess::stop. |
| void abort() |
| { |
| LOG(INFO) << "Aborting framework '" << framework.id() << "'"; |
| |
| CHECK(aborted); |
| |
| if (!connected) { |
| VLOG(1) << "Not sending a deactivate message as master is disconnected"; |
| } else { |
| DeactivateFrameworkMessage message; |
| message.mutable_framework_id()->MergeFrom(framework.id()); |
| CHECK_SOME(master); |
| send(master.get(), message); |
| } |
| |
| Lock lock(mutex); |
| pthread_cond_signal(cond); |
| } |
| |
| void killTask(const TaskID& taskId) |
| { |
| if (!connected) { |
| VLOG(1) << "Ignoring kill task message as master is disconnected"; |
| return; |
| } |
| |
| KillTaskMessage message; |
| message.mutable_framework_id()->MergeFrom(framework.id()); |
| message.mutable_task_id()->MergeFrom(taskId); |
| CHECK_SOME(master); |
| send(master.get(), message); |
| } |
| |
| void requestResources(const vector<Request>& requests) |
| { |
| if (!connected) { |
| VLOG(1) << "Ignoring request resources message as master is disconnected"; |
| return; |
| } |
| |
| ResourceRequestMessage message; |
| message.mutable_framework_id()->MergeFrom(framework.id()); |
| foreach (const Request& request, requests) { |
| message.add_requests()->MergeFrom(request); |
| } |
| CHECK_SOME(master); |
| send(master.get(), message); |
| } |
| |
| void launchTasks(const vector<OfferID>& offerIds, |
| const vector<TaskInfo>& tasks, |
| const Filters& filters) |
| { |
| if (!connected) { |
| VLOG(1) << "Ignoring launch tasks message as master is disconnected"; |
| // NOTE: Reply to the framework with TASK_LOST messages for each |
| // task. This is a hack for now, to not let the scheduler |
| // believe the tasks are forever in PENDING state, when actually |
| // the master never received the launchTask message. Also, |
| // realize that this hack doesn't capture the case when the |
| // scheduler process sends it but the master never receives it |
| // (message lost, master failover etc). In the future, this |
| // should be solved by the replicated log and timeouts. |
| foreach (const TaskInfo& task, tasks) { |
| StatusUpdate update; |
| update.mutable_framework_id()->MergeFrom(framework.id()); |
| TaskStatus* status = update.mutable_status(); |
| status->mutable_task_id()->MergeFrom(task.task_id()); |
| status->set_state(TASK_LOST); |
| status->set_message("Master Disconnected"); |
| update.set_timestamp(Clock::now().secs()); |
| update.set_uuid(UUID::random().toBytes()); |
| |
| statusUpdate(UPID(), update, UPID()); |
| } |
| return; |
| } |
| |
| vector<TaskInfo> result; |
| |
| foreach (const TaskInfo& task, tasks) { |
| // Check that each TaskInfo has either an ExecutorInfo or a |
| // CommandInfo but not both. |
| if (task.has_executor() == task.has_command()) { |
| StatusUpdate update; |
| update.mutable_framework_id()->MergeFrom(framework.id()); |
| TaskStatus* status = update.mutable_status(); |
| status->mutable_task_id()->MergeFrom(task.task_id()); |
| status->set_state(TASK_LOST); |
| status->set_message( |
| "TaskInfo must have either an 'executor' or a 'command'"); |
| update.set_timestamp(Clock::now().secs()); |
| update.set_uuid(UUID::random().toBytes()); |
| |
| statusUpdate(UPID(), update, UPID()); |
| continue; |
| } |
| |
| // Ensure the ExecutorInfo.framework_id is valid, if present. |
| if (task.has_executor() && |
| task.executor().has_framework_id() && |
| !(task.executor().framework_id() == framework.id())) { |
| StatusUpdate update; |
| update.mutable_framework_id()->MergeFrom(framework.id()); |
| TaskStatus* status = update.mutable_status(); |
| status->mutable_task_id()->MergeFrom(task.task_id()); |
| status->set_state(TASK_LOST); |
| status->set_message( |
| "ExecutorInfo has an invalid FrameworkID (Actual: " + |
| stringify(task.executor().framework_id()) + " vs Expected: " + |
| stringify(framework.id()) + ")"); |
| update.set_timestamp(Clock::now().secs()); |
| update.set_uuid(UUID::random().toBytes()); |
| |
| statusUpdate(UPID(), update, UPID()); |
| continue; |
| } |
| |
| TaskInfo copy = task; |
| |
| // Set the ExecutorInfo.framework_id if missing. |
| if (task.has_executor() && !task.executor().has_framework_id()) { |
| copy.mutable_executor()->mutable_framework_id()->CopyFrom( |
| framework.id()); |
| } |
| |
| result.push_back(copy); |
| } |
| |
| LaunchTasksMessage message; |
| message.mutable_framework_id()->MergeFrom(framework.id()); |
| message.mutable_filters()->MergeFrom(filters); |
| |
| foreach (const OfferID& offerId, offerIds) { |
| message.add_offer_ids()->MergeFrom(offerId); |
| |
| foreach (const TaskInfo& task, result) { |
| // Keep only the slave PIDs where we run tasks so we can send |
| // framework messages directly. |
| if (savedOffers.contains(offerId)) { |
| if (savedOffers[offerId].count(task.slave_id()) > 0) { |
| savedSlavePids[task.slave_id()] = |
| savedOffers[offerId][task.slave_id()]; |
| } else { |
| LOG(WARNING) << "Attempting to launch task " << task.task_id() |
| << " with the wrong slave id " << task.slave_id(); |
| } |
| } else { |
| LOG(WARNING) << "Attempting to launch task " << task.task_id() |
| << " with an unknown offer " << offerId; |
| } |
| } |
| // Remove the offer since we saved all the PIDs we might use. |
| savedOffers.erase(offerId); |
| } |
| |
| foreach (const TaskInfo& task, result) { |
| message.add_tasks()->MergeFrom(task); |
| } |
| |
| CHECK_SOME(master); |
| send(master.get(), message); |
| } |
| |
| void reviveOffers() |
| { |
| if (!connected) { |
| VLOG(1) << "Ignoring revive offers message as master is disconnected"; |
| return; |
| } |
| |
| ReviveOffersMessage message; |
| message.mutable_framework_id()->MergeFrom(framework.id()); |
| CHECK_SOME(master); |
| send(master.get(), message); |
| } |
| |
| void sendFrameworkMessage(const ExecutorID& executorId, |
| const SlaveID& slaveId, |
| const string& data) |
| { |
| if (!connected) { |
| VLOG(1) << "Ignoring send framework message as master is disconnected"; |
| return; |
| } |
| |
| VLOG(2) << "Asked to send framework message to slave " |
| << slaveId; |
| |
| // TODO(benh): After a scheduler has re-registered it won't have |
| // any saved slave PIDs, maybe it makes sense to try and save each |
| // PID that this scheduler tries to send a message to? Or we can |
| // just wait for them to recollect as new offers come in and get |
| // accepted. |
| |
| if (savedSlavePids.count(slaveId) > 0) { |
| UPID slave = savedSlavePids[slaveId]; |
| CHECK(slave != UPID()); |
| |
| FrameworkToExecutorMessage message; |
| message.mutable_slave_id()->MergeFrom(slaveId); |
| message.mutable_framework_id()->MergeFrom(framework.id()); |
| message.mutable_executor_id()->MergeFrom(executorId); |
| message.set_data(data); |
| send(slave, message); |
| } else { |
| VLOG(1) << "Cannot send directly to slave " << slaveId |
| << "; sending through master"; |
| |
| FrameworkToExecutorMessage message; |
| message.mutable_slave_id()->MergeFrom(slaveId); |
| message.mutable_framework_id()->MergeFrom(framework.id()); |
| message.mutable_executor_id()->MergeFrom(executorId); |
| message.set_data(data); |
| CHECK_SOME(master); |
| send(master.get(), message); |
| } |
| } |
| |
| void reconcileTasks(const vector<TaskStatus>& statuses) |
| { |
| if (!connected) { |
| VLOG(1) << "Ignoring task reconciliation as master is disconnected"; |
| return; |
| } |
| |
| ReconcileTasksMessage message; |
| message.mutable_framework_id()->MergeFrom(framework.id()); |
| |
| foreach (const TaskStatus& status, statuses) { |
| message.add_statuses()->MergeFrom(status); |
| } |
| |
| CHECK_SOME(master); |
| send(master.get(), message); |
| } |
| |
| private: |
| friend class mesos::MesosSchedulerDriver; |
| |
| struct Metrics |
| { |
| Metrics(const SchedulerProcess& schedulerProcess) |
| : event_queue_messages( |
| "scheduler/event_queue_messages", |
| defer(schedulerProcess, &SchedulerProcess::_event_queue_messages)) |
| { |
| // TODO(dhamon): When we start checking the return value of 'add' we may |
| // get failures in situations where multiple SchedulerProcesses are active |
| // (ie, the fault tolerance tests). At that point we'll need MESOS-1285 to |
| // be fixed and to use self().id in the metric name. |
| process::metrics::add(event_queue_messages); |
| } |
| |
| ~Metrics() |
| { |
| process::metrics::remove(event_queue_messages); |
| } |
| |
| // Process metrics. |
| process::metrics::Gauge event_queue_messages; |
| } metrics; |
| |
| double _event_queue_messages() |
| { |
| return static_cast<double>(eventCount<MessageEvent>()); |
| } |
| |
| MesosSchedulerDriver* driver; |
| Scheduler* scheduler; |
| FrameworkInfo framework; |
| pthread_mutex_t* mutex; |
| pthread_cond_t* cond; |
| bool failover; |
| Option<UPID> master; |
| |
| bool connected; // Flag to indicate if framework is registered. |
| volatile bool aborted; // Flag to indicate if the driver is aborted. |
| |
| MasterDetector* detector; |
| |
| hashmap<OfferID, hashmap<SlaveID, UPID> > savedOffers; |
| hashmap<SlaveID, UPID> savedSlavePids; |
| |
| const Option<Credential> credential; |
| |
| sasl::Authenticatee* authenticatee; |
| |
| // Indicates if an authentication attempt is in progress. |
| Option<Future<bool> > authenticating; |
| |
| // Indicates if the authentication is successful. |
| bool authenticated; |
| |
| // Indicates if a new authentication attempt should be enforced. |
| bool reauthenticate; |
| }; |
| |
| } // namespace internal { |
| } // namespace mesos { |
| |
| |
| void MesosSchedulerDriver::initialize() { |
| GOOGLE_PROTOBUF_VERIFY_VERSION; |
| |
| // Load any flags from the environment (we use local::Flags in the |
| // event we run in 'local' mode, since it inherits logging::Flags). |
| // In the future, just as the TODO in local/main.cpp discusses, |
| // we'll probably want a way to load master::Flags and slave::Flags |
| // as well. |
| local::Flags flags; |
| |
| Try<Nothing> load = flags.load("MESOS_"); |
| |
| if (load.isError()) { |
| status = DRIVER_ABORTED; |
| scheduler->error(this, load.error()); |
| return; |
| } |
| |
| // Initialize libprocess. |
| process::initialize(schedulerId); |
| |
| // Initialize logging. |
| // TODO(benh): Replace whitespace in framework.name() with '_'? |
| if (flags.initialize_driver_logging) { |
| logging::initialize(framework.name(), flags); |
| } else { |
| VLOG(1) << "Disabling initialization of GLOG logging"; |
| } |
| |
| // Initialize mutex and condition variable. TODO(benh): Consider |
| // using a libprocess Latch rather than a pthread mutex and |
| // condition variable for signaling. |
| pthread_mutexattr_t attr; |
| pthread_mutexattr_init(&attr); |
| pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); |
| pthread_mutex_init(&mutex, &attr); |
| pthread_mutexattr_destroy(&attr); |
| pthread_cond_init(&cond, 0); |
| |
| // TODO(benh): Check the user the framework wants to run tasks as, |
| // see if the current user can switch to that user, or via an |
| // authentication module ensure this is acceptable. |
| |
| // See FrameWorkInfo in include/mesos/mesos.proto: |
| if (framework.user().empty()) { |
| Result<string> user = os::user(); |
| CHECK_SOME(user); |
| |
| framework.set_user(user.get()); |
| } |
| if (framework.hostname().empty()) { |
| framework.set_hostname(os::hostname().get()); |
| } |
| |
| // Launch a local cluster if necessary. |
| Option<UPID> pid; |
| if (master == "local") { |
| pid = local::launch(flags); |
| } |
| |
| CHECK(process == NULL); |
| |
| url = pid.isSome() ? static_cast<string>(pid.get()) : master; |
| } |
| |
| // Implementation of C++ API. |
| // |
| // Notes: |
| // |
// (1) Callbacks should be serialized, as should calls into the
//     class. We achieve the former because the SchedulerProcess
//     handles its messages one at a time, so the callbacks it makes
//     are serialized. We currently achieve the latter by taking a
//     lock in certain methods ... but this may change in the future.
| // |
// (2) There is a member variable called 'status' that represents the
//     current state of the driver and is used to enforce its state transitions.
// TODO(vinod): Deprecate this in favor of the constructor that takes
// the credential.
//
// Constructs a driver without a credential: 'credential' is left NULL,
// so start() hands the SchedulerProcess None() instead of a credential.
// Everything else (flags, libprocess, logging, mutex/condvar, optional
// local cluster, master URL) is set up by initialize(), which may leave
// the driver in DRIVER_ABORTED if loading flags from the environment
// fails.
MesosSchedulerDriver::MesosSchedulerDriver(
    Scheduler* _scheduler,
    const FrameworkInfo& _framework,
    const string& _master)
  : detector(NULL),
    scheduler(_scheduler),
    framework(_framework),
    master(_master),
    process(NULL),
    status(DRIVER_NOT_STARTED),
    credential(NULL),
    // Each driver gets a unique libprocess ID.
    schedulerId("scheduler-" + UUID::random().toString())
{
  initialize();
}
| |
| |
// The implementation of this is the same as the above constructor,
// except that a heap-allocated copy of '_credential' is stored in
// 'credential'; start() later passes that credential on to the
// SchedulerProcess.
MesosSchedulerDriver::MesosSchedulerDriver(
    Scheduler* _scheduler,
    const FrameworkInfo& _framework,
    const string& _master,
    const Credential& _credential)
  : detector(NULL),
    scheduler(_scheduler),
    framework(_framework),
    master(_master),
    process(NULL),
    status(DRIVER_NOT_STARTED),
    credential(new Credential(_credential)),
    // Each driver gets a unique libprocess ID.
    schedulerId("scheduler-" + UUID::random().toString())
{
  initialize();
}
| |
| |
| MesosSchedulerDriver::~MesosSchedulerDriver() |
| { |
| // We want to make sure the SchedulerProcess has completed so it |
| // doesn't try to make calls into us after we are gone. There is an |
| // unfortunate deadlock scenario that occurs when we try and wait |
| // for a process that we are currently executing within (e.g., |
| // because a callback on 'this' invoked from a SchedulerProcess |
| // ultimately invokes this destructor). This deadlock is actually a |
| // bug in the client code: provided that the SchedulerProcess class |
| // _only_ makes calls into instances of Scheduler, then such a |
| // deadlock implies that the destructor got called from within a |
| // method of the Scheduler instance that is being destructed! Note |
| // that we could add a method to libprocess that told us whether or |
| // not this was about to be deadlock, and possibly report this back |
| // to the user somehow. It might make sense to try and add some more |
| // debug output for the case where we wait indefinitely due to |
| // deadlock. |
| if (process != NULL) { |
| // We call 'terminate()' here to ensure that SchedulerProcess |
| // terminates even if the user forgot to call stop/abort on the |
| // driver. |
| terminate(process); |
| wait(process); |
| delete process; |
| } |
| |
| pthread_mutex_destroy(&mutex); |
| pthread_cond_destroy(&cond); |
| |
| if (detector != NULL) { |
| delete detector; |
| } |
| |
| // Check and see if we need to shutdown a local cluster. |
| if (master == "local" || master == "localquiet") { |
| local::shutdown(); |
| } |
| } |
| |
| |
| Status MesosSchedulerDriver::start() |
| { |
| Lock lock(&mutex); |
| |
| if (status != DRIVER_NOT_STARTED) { |
| return status; |
| } |
| |
| if (detector == NULL) { |
| Try<MasterDetector*> detector_ = MasterDetector::create(url); |
| |
| if (detector_.isError()) { |
| status = DRIVER_ABORTED; |
| string message = "Failed to create a master detector for '" + |
| master + "': " + detector_.error(); |
| scheduler->error(this, message); |
| return status; |
| } |
| |
| // Save the detector so we can delete it later. |
| detector = detector_.get(); |
| } |
| |
| CHECK(process == NULL); |
| |
| if (credential == NULL) { |
| process = new SchedulerProcess( |
| this, |
| scheduler, |
| framework, |
| None(), |
| schedulerId, |
| detector, |
| &mutex, |
| &cond); |
| } else { |
| const Credential& cred = *credential; |
| process = new SchedulerProcess( |
| this, |
| scheduler, |
| framework, |
| cred, |
| schedulerId, |
| detector, |
| &mutex, |
| &cond); |
| } |
| |
| spawn(process); |
| |
| return status = DRIVER_RUNNING; |
| } |
| |
| |
| Status MesosSchedulerDriver::stop(bool failover) |
| { |
| Lock lock(&mutex); |
| |
| if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) { |
| return status; |
| } |
| |
| // 'process' might be NULL if the driver has failed to instantiate |
| // it due to bad parameters (e.g. error in creating the detector |
| // or loading flags). |
| if (process != NULL) { |
| dispatch(process, &SchedulerProcess::stop, failover); |
| } |
| |
| // TODO(benh): It might make more sense to clean up our local |
| // cluster here than in the destructor. However, what would be even |
| // better is to allow multiple local clusters to exist (i.e. not use |
| // global vars in local.cpp) so that ours can just be an instance |
| // variable in MesosSchedulerDriver. |
| |
| bool aborted = status == DRIVER_ABORTED; |
| |
| status = DRIVER_STOPPED; |
| |
| return aborted ? DRIVER_ABORTED : status; |
| } |
| |
| |
| Status MesosSchedulerDriver::abort() |
| { |
| Lock lock(&mutex); |
| |
| if (status != DRIVER_RUNNING) { |
| return status; |
| } |
| |
| CHECK(process != NULL); |
| |
| // We set the volatile aborted to true here to prevent any further |
| // messages from being processed in the SchedulerProcess. However, |
| // if abort() is called from another thread as the SchedulerProcess, |
| // there may be at most one additional message processed. |
| // TODO(bmahler): Use an atomic boolean. |
| process->aborted = true; |
| |
| // Dispatching here ensures that we still process the outstanding |
| // requests *from* the scheduler, since those do proceed when |
| // aborted is true. |
| dispatch(process, &SchedulerProcess::abort); |
| |
| return status = DRIVER_ABORTED; |
| } |
| |
| |
| Status MesosSchedulerDriver::join() |
| { |
| Lock lock(&mutex); |
| |
| if (status != DRIVER_RUNNING) { |
| return status; |
| } |
| |
| while (status == DRIVER_RUNNING) { |
| pthread_cond_wait(&cond, &mutex); |
| } |
| |
| CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED); |
| |
| return status; |
| } |
| |
| |
| Status MesosSchedulerDriver::run() |
| { |
| Status status = start(); |
| return status != DRIVER_RUNNING ? status : join(); |
| } |
| |
| |
| Status MesosSchedulerDriver::killTask(const TaskID& taskId) |
| { |
| Lock lock(&mutex); |
| |
| if (status != DRIVER_RUNNING) { |
| return status; |
| } |
| |
| CHECK(process != NULL); |
| |
| dispatch(process, &SchedulerProcess::killTask, taskId); |
| |
| return status; |
| } |
| |
| |
| Status MesosSchedulerDriver::launchTasks( |
| const OfferID& offerId, |
| const vector<TaskInfo>& tasks, |
| const Filters& filters) |
| { |
| vector<OfferID> offerIds; |
| offerIds.push_back(offerId); |
| |
| return launchTasks(offerIds, tasks, filters); |
| } |
| |
| |
| Status MesosSchedulerDriver::launchTasks( |
| const vector<OfferID>& offerIds, |
| const vector<TaskInfo>& tasks, |
| const Filters& filters) |
| { |
| Lock lock(&mutex); |
| |
| if (status != DRIVER_RUNNING) { |
| return status; |
| } |
| |
| CHECK(process != NULL); |
| |
| dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters); |
| |
| return status; |
| } |
| |
| |
| Status MesosSchedulerDriver::declineOffer( |
| const OfferID& offerId, |
| const Filters& filters) |
| { |
| vector<OfferID> offerIds; |
| offerIds.push_back(offerId); |
| |
| return launchTasks(offerIds, vector<TaskInfo>(), filters); |
| } |
| |
| |
| Status MesosSchedulerDriver::reviveOffers() |
| { |
| Lock lock(&mutex); |
| |
| if (status != DRIVER_RUNNING) { |
| return status; |
| } |
| |
| CHECK(process != NULL); |
| |
| dispatch(process, &SchedulerProcess::reviveOffers); |
| |
| return status; |
| } |
| |
| |
| Status MesosSchedulerDriver::sendFrameworkMessage( |
| const ExecutorID& executorId, |
| const SlaveID& slaveId, |
| const string& data) |
| { |
| Lock lock(&mutex); |
| |
| if (status != DRIVER_RUNNING) { |
| return status; |
| } |
| |
| CHECK(process != NULL); |
| |
| dispatch(process, &SchedulerProcess::sendFrameworkMessage, |
| executorId, slaveId, data); |
| |
| return status; |
| } |
| |
| |
| Status MesosSchedulerDriver::reconcileTasks( |
| const vector<TaskStatus>& statuses) |
| { |
| Lock lock(&mutex); |
| |
| if (status != DRIVER_RUNNING) { |
| return status; |
| } |
| |
| CHECK(process != NULL); |
| |
| dispatch(process, &SchedulerProcess::reconcileTasks, statuses); |
| |
| return status; |
| } |
| |
| |
| Status MesosSchedulerDriver::requestResources( |
| const vector<Request>& requests) |
| { |
| Lock lock(&mutex); |
| |
| if (status != DRIVER_RUNNING) { |
| return status; |
| } |
| |
| CHECK(process != NULL); |
| |
| dispatch(process, &SchedulerProcess::requestResources, requests); |
| |
| return status; |
| } |