| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <dlfcn.h> |
| #include <errno.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <unistd.h> |
| |
| #include <arpa/inet.h> |
| |
| #include <iostream> |
| #include <string> |
| #include <sstream> |
| |
| #include <mesos/mesos.hpp> |
| #include <mesos/scheduler.hpp> |
| |
| #include <process/async.hpp> |
| #include <process/defer.hpp> |
| #include <process/delay.hpp> |
| #include <process/dispatch.hpp> |
| #include <process/future.hpp> |
| #include <process/id.hpp> |
| #include <process/mutex.hpp> |
| #include <process/process.hpp> |
| #include <process/protobuf.hpp> |
| |
| #include <stout/check.hpp> |
| #include <stout/duration.hpp> |
| #include <stout/error.hpp> |
| #include <stout/flags.hpp> |
| #include <stout/lambda.hpp> |
| #include <stout/nothing.hpp> |
| #include <stout/option.hpp> |
| #include <stout/os.hpp> |
| #include <stout/uuid.hpp> |
| |
| #include "sasl/authenticatee.hpp" |
| |
| #include "common/type_utils.hpp" |
| |
| #include "master/detector.hpp" |
| |
| #include "local/local.hpp" |
| |
| #include "master/detector.hpp" |
| |
| #include "logging/flags.hpp" |
| #include "logging/logging.hpp" |
| |
| #include "messages/messages.hpp" |
| |
| using namespace mesos; |
| using namespace mesos::internal; |
| using namespace mesos::internal::master; |
| |
| using namespace process; |
| |
| using std::queue; |
| using std::string; |
| using std::vector; |
| |
| using process::wait; // Necessary on some OS's to disambiguate. |
| |
| namespace mesos { |
| namespace scheduler { |
| |
| // The process (below) is responsible for receiving messages |
| // (eventually events) from the master and sending messages (via |
| // calls) to the master. |
| class MesosProcess : public ProtobufProcess<MesosProcess> |
| { |
| public: |
| MesosProcess( |
| const string& master, |
| const Option<Credential>& _credential, |
| const lambda::function<void(void)>& _connected, |
| const lambda::function<void(void)>& _disconnected, |
| lambda::function<void(const queue<Event>&)> _received) |
| : ProcessBase(ID::generate("scheduler")), |
| credential(_credential), |
| connected(_connected), |
| disconnected(_disconnected), |
| received(_received), |
| local(false), |
| failover(true), |
| detector(NULL), |
| authenticatee(NULL), |
| authenticating(None()), |
| authenticated(false), |
| reauthenticate(false) |
| { |
| GOOGLE_PROTOBUF_VERIFY_VERSION; |
| |
| // Load any flags from the environment (we use local::Flags in the |
| // event we run in 'local' mode, since it inherits |
| // logging::Flags). In the future, just as the TODO in |
| // local/main.cpp discusses, we'll probably want a way to load |
| // master::Flags and slave::Flags as well. |
| local::Flags flags; |
| |
| Try<Nothing> load = flags.load("MESOS_"); |
| |
| if (load.isError()) { |
| error("Failed to load flags: " + load.error()); |
| return; |
| } |
| |
| // Initialize libprocess (done here since at some point we might |
| // want to use flags to initialize libprocess). |
| process::initialize(); |
| |
| // Initialize logging. |
| if (flags.initialize_driver_logging) { |
| logging::initialize("mesos", flags); |
| } else { |
| VLOG(1) << "Disabling initialization of GLOG logging"; |
| } |
| |
| LOG(INFO) << "Version: " << MESOS_VERSION; |
| |
| // Launch a local cluster if necessary. |
| Option<UPID> pid = None(); |
| if (master == "local") { |
| pid = local::launch(flags); |
| local = true; |
| } |
| |
| Try<MasterDetector*> detector_ = |
| MasterDetector::create(pid.isSome() ? string(pid.get()) : master); |
| |
| if (detector_.isError()) { |
| error("Failed to create a master detector:" + detector_.error()); |
| return; |
| } |
| |
| // Save the detector so we can delete it later. |
| detector = detector_.get(); |
| } |
| |
| virtual ~MesosProcess() |
| { |
| delete authenticatee; |
| |
| // Check and see if we need to shutdown a local cluster. |
| if (local) { |
| local::shutdown(); |
| } |
| |
| // Wait for any callbacks to finish. |
| mutex.lock().await(); |
| } |
| |
| // TODO(benh): Move this to 'protected'. |
| using ProtobufProcess<MesosProcess>::send; |
| |
| void send(Call call) |
| { |
| if (master.isNone()) { |
| drop(call, "Disconnected"); |
| return; |
| } |
| |
| // If no user was specified in FrameworkInfo, use the current user. |
| // TODO(benh): Make FrameworkInfo.user be optional and add a |
| // 'user' to either TaskInfo or CommandInfo. |
| if (call.framework_info().user() == "") { |
| Result<string> user = os::user(); |
| CHECK_SOME(user); |
| |
| call.mutable_framework_info()->set_user(user.get()); |
| } |
| |
| // Only a REGISTER should not have set the framework ID. |
| if (call.type() != Call::REGISTER && !call.framework_info().has_id()) { |
| drop(call, "Call is mising FrameworkInfo.id"); |
| return; |
| } |
| |
| if (!call.IsInitialized()) { |
| drop(call, "Call is not properly initialized: " + |
| call.InitializationErrorString()); |
| return; |
| } |
| |
| switch (call.type()) { |
| case Call::REGISTER: { |
| RegisterFrameworkMessage message; |
| message.mutable_framework()->CopyFrom(call.framework_info()); |
| send(master.get(), message); |
| break; |
| } |
| |
| case Call::REREGISTER: { |
| ReregisterFrameworkMessage message; |
| message.mutable_framework()->CopyFrom(call.framework_info()); |
| message.set_failover(failover); |
| send(master.get(), message); |
| break; |
| } |
| |
| case Call::UNREGISTER: { |
| UnregisterFrameworkMessage message; |
| message.mutable_framework_id()->CopyFrom(call.framework_info().id()); |
| send(master.get(), message); |
| break; |
| } |
| |
| case Call::REQUEST: { |
| if (!call.has_request()) { |
| drop(call, "Expecting 'request' to be present"); |
| return; |
| } |
| ResourceRequestMessage message; |
| message.mutable_framework_id()->CopyFrom(call.framework_info().id()); |
| message.mutable_requests()->CopyFrom(call.request().requests()); |
| send(master.get(), message); |
| break; |
| } |
| |
| case Call::DECLINE: { |
| if (!call.has_decline()) { |
| drop(call, "Expecting 'decline' to be present"); |
| return; |
| } |
| LaunchTasksMessage message; |
| message.mutable_framework_id()->CopyFrom(call.framework_info().id()); |
| message.mutable_filters()->CopyFrom(call.decline().filters()); |
| message.mutable_offer_ids()->CopyFrom(call.decline().offer_ids()); |
| send(master.get(), message); |
| break; |
| } |
| |
| case Call::REVIVE: { |
| ReviveOffersMessage message; |
| message.mutable_framework_id()->CopyFrom(call.framework_info().id()); |
| send(master.get(), message); |
| break; |
| } |
| |
| case Call::LAUNCH: { |
| if (!call.has_launch()) { |
| drop(call, "Expecting 'launch' to be present"); |
| return; |
| } |
| // We do some local validation here, but really this should |
| // all happen in the master so it's only implemented once. |
| vector<TaskInfo> tasks; |
| |
| foreach (const TaskInfo& task, call.launch().task_infos()) { |
| // Check that each TaskInfo has either an ExecutorInfo or a |
| // CommandInfo but not both. |
| if (task.has_executor() == task.has_command()) { |
| drop(task, |
| "TaskInfo must have either an 'executor' or a 'command'"); |
| continue; |
| } |
| |
| // Ensure ExecutorInfo::framework_id is valid, if present. |
| if (task.has_executor() && |
| task.executor().has_framework_id() && |
| !(task.executor().framework_id() == call.framework_info().id())) { |
| drop(task, |
| "ExecutorInfo has an invalid FrameworkID (Actual: " + |
| stringify(task.executor().framework_id()) + " vs Expected: " + |
| stringify(call.framework_info().id()) + ")"); |
| continue; |
| } |
| |
| tasks.push_back(task); |
| |
| // Set ExecutorInfo::framework_id if missing since this |
| // field was added to the API later and thus was made |
| // optional. |
| if (task.has_executor() && !task.executor().has_framework_id()) { |
| tasks.back().mutable_executor()->mutable_framework_id()->CopyFrom( |
| call.framework_info().id()); |
| } |
| } |
| |
| LaunchTasksMessage message; |
| message.mutable_framework_id()->CopyFrom(call.framework_info().id()); |
| message.mutable_filters()->CopyFrom(call.launch().filters()); |
| message.mutable_offer_ids()->CopyFrom(call.launch().offer_ids()); |
| |
| foreach (const TaskInfo& task, tasks) { |
| message.add_tasks()->CopyFrom(task); |
| } |
| |
| send(master.get(), message); |
| break; |
| } |
| |
| case Call::KILL: { |
| if (!call.has_kill()) { |
| drop(call, "Expecting 'kill' to be present"); |
| return; |
| } |
| KillTaskMessage message; |
| message.mutable_framework_id()->CopyFrom(call.framework_info().id()); |
| message.mutable_task_id()->CopyFrom(call.kill().task_id()); |
| send(master.get(), message); |
| break; |
| } |
| |
| case Call::ACKNOWLEDGE: { |
| if (!call.has_acknowledge()) { |
| drop(call, "Expecting 'acknowledge' to be present"); |
| return; |
| } |
| StatusUpdateAcknowledgementMessage message; |
| message.mutable_framework_id()->CopyFrom(call.framework_info().id()); |
| message.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id()); |
| message.mutable_task_id()->CopyFrom(call.acknowledge().task_id()); |
| message.set_uuid(call.acknowledge().uuid()); |
| send(master.get(), message); |
| break; |
| } |
| |
| case Call::RECONCILE: { |
| if (!call.has_reconcile()) { |
| drop(call, "Expecting 'reconcile' to be present"); |
| return; |
| } |
| ReconcileTasksMessage message; |
| message.mutable_framework_id()->CopyFrom(call.framework_info().id()); |
| message.mutable_statuses()->CopyFrom(call.reconcile().statuses()); |
| send(master.get(), message); |
| break; |
| } |
| |
| case Call::MESSAGE: { |
| if (!call.has_message()) { |
| drop(call, "Expecting 'message' to be present"); |
| return; |
| } |
| FrameworkToExecutorMessage message; |
| message.mutable_slave_id()->CopyFrom(call.message().slave_id()); |
| message.mutable_framework_id()->CopyFrom(call.framework_info().id()); |
| message.mutable_executor_id()->CopyFrom(call.message().executor_id()); |
| message.set_data(call.message().data()); |
| send(master.get(), message); |
| break; |
| } |
| |
| default: |
| VLOG(1) << "Unexpected call " << stringify(call.type()); |
| break; |
| } |
| } |
| |
| protected: |
| virtual void initialize() |
| { |
| install<FrameworkRegisteredMessage>(&MesosProcess::receive); |
| install<FrameworkReregisteredMessage>(&MesosProcess::receive); |
| install<ResourceOffersMessage>(&MesosProcess::receive); |
| install<RescindResourceOfferMessage>(&MesosProcess::receive); |
| install<StatusUpdateMessage>(&MesosProcess::receive); |
| install<LostSlaveMessage>(&MesosProcess::receive); |
| install<ExecutorToFrameworkMessage>(&MesosProcess::receive); |
| install<FrameworkErrorMessage>(&MesosProcess::receive); |
| |
| // Start detecting masters. |
| detector->detect() |
| .onAny(defer(self(), &MesosProcess::detected, lambda::_1)); |
| } |
| |
| void detected(const Future<Option<MasterInfo> >& future) |
| { |
| CHECK(!future.isDiscarded()); |
| |
| if (future.isFailed()) { |
| error("Failed to detect a master: " + future.failure()); |
| return; |
| } |
| |
| if (future.get().isNone()) { |
| master = None(); |
| |
| VLOG(1) << "No master detected"; |
| |
| mutex.lock() |
| .then(defer(self(), &Self::_detected)) |
| .onAny(lambda::bind(&Mutex::unlock, mutex)); |
| } else { |
| master = UPID(future.get().get().pid()); |
| |
| VLOG(1) << "New master detected at " << master.get(); |
| |
| if (credential.isSome()) { |
| // Authenticate with the master. |
| authenticate(); |
| } else { |
| mutex.lock() |
| .then(defer(self(), &Self::__detected)) |
| .onAny(lambda::bind(&Mutex::unlock, mutex)); |
| } |
| } |
| |
| // Keep detecting masters. |
| detector->detect(future.get()) |
| .onAny(defer(self(), &MesosProcess::detected, lambda::_1)); |
| } |
| |
| Future<Nothing> _detected() |
| { |
| return async(disconnected); |
| } |
| |
| Future<Nothing> __detected() |
| { |
| return async(connected); |
| } |
| |
| void authenticate() |
| { |
| authenticated = false; |
| |
| // We retry to authenticate and it's possible that we'll get |
| // disconnected while that is happening. |
| if (master.isNone()) { |
| return; |
| } |
| |
| if (authenticating.isSome()) { |
| // Authentication is in progress. Try to cancel it. |
| // Note that it is possible that 'authenticating' is ready |
| // and the dispatch to '_authenticate' is enqueued when we |
| // are here, making the 'discard' here a no-op. This is ok |
| // because we set 'reauthenticate' here which enforces a retry |
| // in '_authenticate'. |
| Future<bool>(authenticating.get()).discard(); |
| reauthenticate = true; |
| return; |
| } |
| |
| VLOG(1) << "Authenticating with master " << master.get(); |
| |
| CHECK_SOME(credential); |
| |
| CHECK(authenticatee == NULL); |
| authenticatee = new sasl::Authenticatee(credential.get(), self()); |
| |
| // NOTE: We do not pass 'Owned<Authenticatee>' here because doing |
| // so could make 'AuthenticateeProcess' responsible for deleting |
| // 'Authenticatee' causing a deadlock because the destructor of |
| // 'Authenticatee' waits on 'AuthenticateeProcess'. |
| // This will happen in the following scenario: |
| // --> 'AuthenticateeProcess' does a 'Future.set()'. |
| // --> '_authenticate()' is dispatched to this process. |
| // --> This process executes '_authenticatee()'. |
| // --> 'AuthenticateeProcess' removes the onAny callback |
| // from its queue which holds the last reference to |
| // 'Authenticatee'. |
| // --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'. |
| // TODO(vinod): Consider using 'Shared' to 'Owned' upgrade. |
| authenticating = authenticatee->authenticate(master.get()) |
| .onAny(defer(self(), &Self::_authenticate)); |
| |
| delay(Seconds(5), |
| self(), |
| &Self::authenticationTimeout, |
| authenticating.get()); |
| } |
| |
// Completion handler for the attempt started in 'authenticate'.
// Destroys the authenticatee and then, depending on the state:
//   - master lost: forget the attempt, no retry;
//   - retry requested or the future is not ready: dispatch a retry;
//   - master refused: inject an ERROR event;
//   - success: mark as authenticated and invoke 'connected'.
void _authenticate()
{
  // The attempt is over either way, so the authenticatee is no
  // longer needed.
  delete CHECK_NOTNULL(authenticatee);
  authenticatee = NULL;

  CHECK_SOME(authenticating);
  // NOTE: 'future' refers into 'authenticating' and must not be used
  // after 'authenticating' is reset below.
  const Future<bool>& future = authenticating.get();

  if (master.isNone()) {
    VLOG(1) << "Ignoring authentication because no master is detected";
    authenticating = None();

    // Set it to false because we do not want further retries until
    // a new master is detected.
    // We obviously do not need to reauthenticate either even if
    // 'reauthenticate' is currently true because the master is
    // lost.
    reauthenticate = false;
    return;
  }

  // Either 'authenticate' was called again while this attempt was in
  // flight, or the attempt failed / was discarded (e.g. timed out).
  if (reauthenticate || !future.isReady()) {
    VLOG(1)
      << "Failed to authenticate with master " << master.get() << ": "
      << (reauthenticate ? "master changed" :
          (future.isFailed() ? future.failure() : "future discarded"));

    authenticating = None();
    reauthenticate = false;

    // TODO(vinod): Add a limit on number of retries.
    dispatch(self(), &Self::authenticate); // Retry.
    return;
  }

  // The future is ready; 'false' means the master said no.
  // NOTE(review): 'authenticating' stays set on this path, so a later
  // 'authenticate()' would take its "in progress" branch -- confirm
  // this is intended.
  if (!future.get()) {
    VLOG(1) << "Master " << master.get() << " refused authentication";
    error("Authentication refused");
    return;
  }

  VLOG(1) << "Successfully authenticated with master " << master.get();

  authenticated = true;
  authenticating = None();

  // Report the connection, holding 'mutex' while the callback runs.
  mutex.lock()
    .then(defer(self(), &Self::__authenticate))
    .onAny(lambda::bind(&Mutex::unlock, mutex));
}
| |
| Future<Nothing> __authenticate() |
| { |
| return async(connected); |
| } |
| |
| void authenticationTimeout(Future<bool> future) |
| { |
| // NOTE: Discarded future results in a retry in '_authenticate()'. |
| // Also note that a 'discard' here is safe even if another |
| // authenticator is in progress because this copy of the future |
| // corresponds to the original authenticator that started the timer. |
| if (future.discard()) { // This is a no-op if the future is already ready. |
| LOG(WARNING) << "Authentication timed out"; |
| } |
| } |
| |
| // NOTE: A None 'from' is possible when an event is injected locally. |
| void receive(const Option<UPID>& from, const Event& event) |
| { |
| // Check if we're disconnected but received an event. |
| if (from.isSome() && master.isNone()) { |
| // Ignore the event unless it's a registered or reregistered. |
| if (event.type() != Event::REGISTERED && |
| event.type() != Event::REREGISTERED) { |
| VLOG(1) << "Ignoring " << stringify(event.type()) |
| << " event because we're disconnected"; |
| return; |
| } |
| } else if (from.isSome() && master != from) { |
| VLOG(1) |
| << "Ignoring " << stringify(event.type()) |
| << " event because it was sent from '" << from.get() |
| << "' instead of the leading master '" << master.get() << "'"; |
| return; |
| } |
| |
| // Note that if 'from' is None we're locally injecting this event |
| // so we always want to enqueue it even if we're not connected! |
| |
| VLOG(1) << "Enqueuing event " << stringify(event.type()) << " from " |
| << (from.isNone() ? "(locally injected)" : from.get()); |
| |
| // Queue up the event and invoke the 'received' callback if this |
| // is the first event (between now and when the 'received' |
| // callback actually gets invoked more events might get queued). |
| events.push(event); |
| |
| if (events.size() == 1) { |
| mutex.lock() |
| .then(defer(self(), &Self::_receive)) |
| .onAny(lambda::bind(&Mutex::unlock, mutex)); |
| } |
| } |
| |
| Future<Nothing> _receive() |
| { |
| Future<Nothing> future = async(received, events); |
| events = queue<Event>(); |
| return future; |
| } |
| |
| void receive(const UPID& from, const FrameworkRegisteredMessage& message) |
| { |
| // We've now registered at least once with the master so we're no |
| // longer failing over. See the comment where 'failover' is |
| // declared for further details. |
| failover = false; |
| |
| Event event; |
| event.set_type(Event::REGISTERED); |
| |
| Event::Registered* registered = event.mutable_registered(); |
| |
| registered->mutable_framework_id()->CopyFrom(message.framework_id()); |
| registered->mutable_master_info()->CopyFrom(message.master_info()); |
| |
| receive(from, event); |
| } |
| |
| void receive(const UPID& from, const FrameworkReregisteredMessage& message) |
| { |
| // We've now registered at least once with the master so we're no |
| // longer failing over. See the comment where 'failover' is |
| // declared for further details. |
| failover = false; |
| |
| Event event; |
| event.set_type(Event::REREGISTERED); |
| |
| Event::Reregistered* reregistered = event.mutable_reregistered(); |
| |
| reregistered->mutable_framework_id()->CopyFrom(message.framework_id()); |
| reregistered->mutable_master_info()->CopyFrom(message.master_info()); |
| |
| receive(from, event); |
| } |
| |
| void receive(const UPID& from, const ResourceOffersMessage& message) |
| { |
| Event event; |
| event.set_type(Event::OFFERS); |
| |
| Event::Offers* offers = event.mutable_offers(); |
| |
| offers->mutable_offers()->CopyFrom(message.offers()); |
| |
| receive(from, event); |
| } |
| |
| void receive(const UPID& from, const RescindResourceOfferMessage& message) |
| { |
| Event event; |
| event.set_type(Event::RESCIND); |
| |
| Event::Rescind* rescind = event.mutable_rescind(); |
| |
| rescind->mutable_offer_id()->CopyFrom(message.offer_id()); |
| |
| receive(from, event); |
| } |
| |
| void receive(const UPID& from, const StatusUpdateMessage& message) |
| { |
| Event event; |
| event.set_type(Event::UPDATE); |
| |
| Event::Update* update = event.mutable_update(); |
| |
| update->mutable_status()->CopyFrom(message.update().status()); |
| |
| if (message.update().has_slave_id()) { |
| update->mutable_status()->mutable_slave_id()->CopyFrom( |
| message.update().slave_id()); |
| } |
| |
| if (message.update().has_executor_id()) { |
| update->mutable_status()->mutable_executor_id()->CopyFrom( |
| message.update().executor_id()); |
| } |
| |
| update->mutable_status()->set_timestamp(message.update().timestamp()); |
| |
| update->set_uuid(message.update().uuid()); |
| |
| receive(from, event); |
| } |
| |
| void receive(const UPID& from, const LostSlaveMessage& message) |
| { |
| Event event; |
| event.set_type(Event::FAILURE); |
| |
| Event::Failure* failure = event.mutable_failure(); |
| |
| failure->mutable_slave_id()->CopyFrom(message.slave_id()); |
| |
| receive(from, event); |
| } |
| |
| void receive(const UPID& from, const ExecutorToFrameworkMessage& _message) |
| { |
| Event event; |
| event.set_type(Event::MESSAGE); |
| |
| Event::Message* message = event.mutable_message(); |
| |
| message->mutable_slave_id()->CopyFrom(_message.slave_id()); |
| message->mutable_executor_id()->CopyFrom(_message.executor_id()); |
| message->set_data(_message.data()); |
| |
| receive(from, event); |
| } |
| |
| void receive(const UPID& from, const FrameworkErrorMessage& message) |
| { |
| Event event; |
| event.set_type(Event::ERROR); |
| |
| Event::Error* error = event.mutable_error(); |
| |
| error->set_message(message.message()); |
| |
| receive(from, event); |
| } |
| |
| // Helper for injecting an ERROR event. |
| void error(const string& message) |
| { |
| Event event; |
| |
| event.set_type(Event::ERROR); |
| |
| Event::Error* error = event.mutable_error(); |
| |
| error->set_message(message); |
| |
| receive(None(), event); |
| } |
| |
| // Helper for "dropping" a task that was launched. |
| void drop(const TaskInfo& task, const string& message) |
| { |
| Event event; |
| |
| event.set_type(Event::UPDATE); |
| |
| Event::Update* update = event.mutable_update(); |
| |
| TaskStatus* status = update->mutable_status(); |
| status->mutable_task_id()->CopyFrom(task.task_id()); |
| status->set_state(TASK_LOST); |
| status->set_message(message); |
| status->set_timestamp(Clock::now().secs()); |
| |
| update->set_uuid(UUID::random().toBytes()); |
| |
| receive(None(), event); |
| } |
| |
| void drop(const Call& call, const string& message) |
| { |
| VLOG(1) << "Dropping " << stringify(call.type()) << ": " << message; |
| |
| switch (call.type()) { |
| case Call::LAUNCH: { |
| // We drop the tasks preemptively (enqueing update events that |
| // put the task in TASK_LOST). This is a hack for now, to keep |
| // the tasks from being forever in PENDING state, when |
| // actually the master never received the launch. |
| // Unfortuantely this is insufficient since it doesn't capture |
| // the case when the scheduler process sends it but the master |
| // never receives it (i.e., during a master failover). In the |
| // future, this should be solved by higher-level abstractions |
| // and this hack should be considered for removal. |
| foreach (const TaskInfo& task, call.launch().task_infos()) { |
| drop(task, message); |
| } |
| break; |
| } |
| |
| default: |
| break; |
| } |
| } |
| |
| private: |
// Credential used to authenticate with a newly detected master.
// When None, no authentication is attempted.
const Option<Credential> credential;

Mutex mutex; // Used to serialize the callback invocations.

// User callbacks. Each is run through 'async' while 'mutex' is held.
lambda::function<void(void)> connected;
lambda::function<void(void)> disconnected;
lambda::function<void(const queue<Event>&)> received;

bool local; // Whether or not we launched a local cluster.

// Whether or not this is the first time we've sent a
// REREGISTER. This is to maintain compatibility with what the
// master expects from SchedulerProcess. After the first REGISTER or
// REREGISTER event we force this to be false.
bool failover;

// Created by 'MasterDetector::create' in the constructor; remains
// NULL if construction bailed out early.
MasterDetector* detector;

// Events accumulated since the last invocation of 'received'.
queue<Event> events;

// The currently detected master, or None while disconnected.
Option<UPID> master;

// Non-NULL only while an authentication attempt is in flight; it is
// created in 'authenticate' and destroyed in '_authenticate'.
sasl::Authenticatee* authenticatee;

// Indicates if an authentication attempt is in progress.
Option<Future<bool> > authenticating;

// Indicates if the authentication is successful.
bool authenticated;

// Indicates if a new authentication attempt should be enforced.
bool reauthenticate;
| }; |
| |
| |
| Mesos::Mesos( |
| const string& master, |
| const lambda::function<void(void)>& connected, |
| const lambda::function<void(void)>& disconnected, |
| const lambda::function<void(const queue<Event>&)>& received) |
| { |
| process = |
| new MesosProcess(master, None(), connected, disconnected, received); |
| spawn(process); |
| } |
| |
| |
| Mesos::Mesos( |
| const string& master, |
| const Credential& credential, |
| const lambda::function<void(void)>& connected, |
| const lambda::function<void(void)>& disconnected, |
| const lambda::function<void(const queue<Event>&)>& received) |
| { |
| process = |
| new MesosProcess(master, credential, connected, disconnected, received); |
| spawn(process); |
| } |
| |
| |
| Mesos::~Mesos() |
| { |
| terminate(process); |
| wait(process); |
| delete process; |
| } |
| |
| |
| void Mesos::send(const Call& call) |
| { |
| dispatch(process, &MesosProcess::send, call); |
| } |
| |
| |
| } // namespace scheduler { |
| } // namespace mesos { |