blob: caefd160fbb8dd583b3c32b6d0df24318a86c53e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "master/detector/standalone.hpp"
#include <set>
#include <mesos/master/detector.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include "common/protobuf_utils.hpp"
using namespace process;
using std::set;
namespace mesos {
namespace master {
namespace detector {
class StandaloneMasterDetectorProcess
: public Process<StandaloneMasterDetectorProcess>
{
public:
StandaloneMasterDetectorProcess()
: ProcessBase(ID::generate("standalone-master-detector")) {}
explicit StandaloneMasterDetectorProcess(const MasterInfo& _leader)
: ProcessBase(ID::generate("standalone-master-detector")),
leader(_leader) {}
~StandaloneMasterDetectorProcess() override
{
discardPromises(&promises);
}
void appoint(const Option<MasterInfo>& leader_)
{
leader = leader_;
setPromises(&promises, leader);
}
Future<Option<MasterInfo>> detect(
const Option<MasterInfo>& previous = None())
{
if (leader != previous) {
return leader;
}
Promise<Option<MasterInfo>>* promise = new Promise<Option<MasterInfo>>();
promise->future()
.onDiscard(defer(self(), &Self::discard, promise->future()));
promises.insert(promise);
return promise->future();
}
private:
void discard(const Future<Option<MasterInfo>>& future)
{
// Discard the promise holding this future.
discardPromises(&promises, future);
}
Option<MasterInfo> leader; // The appointed master.
set<Promise<Option<MasterInfo>>*> promises;
};
StandaloneMasterDetector::StandaloneMasterDetector()
{
process = new StandaloneMasterDetectorProcess();
spawn(process);
}
StandaloneMasterDetector::StandaloneMasterDetector(const MasterInfo& leader)
{
process = new StandaloneMasterDetectorProcess(leader);
spawn(process);
}
StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader)
{
process = new StandaloneMasterDetectorProcess(
mesos::internal::protobuf::createMasterInfo(leader));
spawn(process);
}
StandaloneMasterDetector::~StandaloneMasterDetector()
{
terminate(process);
process::wait(process);
delete process;
}
void StandaloneMasterDetector::appoint(const Option<MasterInfo>& leader)
{
dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
}
void StandaloneMasterDetector::appoint(const UPID& leader)
{
dispatch(process,
&StandaloneMasterDetectorProcess::appoint,
mesos::internal::protobuf::createMasterInfo(leader));
}
Future<Option<MasterInfo>> StandaloneMasterDetector::detect(
const Option<MasterInfo>& previous)
{
return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
}
} // namespace detector {
} // namespace master {
} // namespace mesos {