blob: 893d3e366164ccebd2847ed4c2874ab00e0e5b7b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include <process/loop.hpp>
#include "tests/mesos.hpp"
#include "tests/master/mock_master_api_subscriber.hpp"
using mesos::v1::master::Call;
using mesos::v1::master::Event;
using mesos::internal::master::Master;
using mesos::internal::recordio::Reader;
using process::Future;
using process::Failure;
using process::Promise;
using testing::_;
using testing::Return;
namespace mesos {
namespace internal {
namespace tests {
namespace v1 {
// Libprocess actor that issues a SUBSCRIBE call against the master's
// `api/v1` endpoint and forwards every decoded event to a
// `MockMasterAPISubscriber`.
//
// The actor keeps only a raw, non-owning pointer to the subscriber.
class MockMasterAPISubscriberProcess
  : public process::Process<MockMasterAPISubscriberProcess>
{
public:
  // `explicit` keeps a bare `MockMasterAPISubscriber*` from being
  // implicitly converted into a process object.
  explicit MockMasterAPISubscriberProcess(MockMasterAPISubscriber* subscriber_)
    : subscriber(subscriber_) {}

  // Sends the SUBSCRIBE call to the master at `masterPid`.
  //
  // The request body is serialized with `contentType`, and the same
  // content type is requested for the response via the "Accept" header.
  // The request carries basic auth headers built from
  // `DEFAULT_CREDENTIAL`.
  //
  // The returned future is the result of `_subscribe()`, which runs in
  // this process via `defer(self(), ...)` once the streaming POST
  // yields a response.
  Future<Nothing> subscribe(
      const process::PID<Master>& masterPid, ContentType contentType)
  {
    Call call;
    call.set_type(Call::SUBSCRIBE);

    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    headers["Accept"] = stringify(contentType);

    return process::http::streaming::post(
        masterPid,
        "api/v1",
        headers,
        serialize(contentType, call),
        stringify(contentType))
      .then(defer(self(), &Self::_subscribe, lambda::_1, contentType));
  }

protected:
  // Requests a discard of the receive loop future when the process
  // is finalized.
  void finalize() override
  {
    receiveLoop.discard();
  }

private:
  // Validates the SUBSCRIBE response and starts the event receive loop.
  //
  // Returns a `Failure` if the response future failed, if the status
  // is not 200 OK, if the response is not of type PIPE, or if it
  // carries no reader. Otherwise starts the loop and returns
  // `Nothing()` without waiting for the loop to finish.
  Future<Nothing> _subscribe(
      const process::Future<process::http::Response>& response,
      ContentType contentType)
  {
    if (response.isFailed()) {
      return Failure(
          "Failed to subscribe to master API events: " + response.failure());
    }

    if (process::http::OK().status != response->status) {
      return Failure(
          "SUBSCRIBE call returned bad HTTP status code: " +
          stringify(response->status));
    }

    if (response->type != process::http::Response::PIPE) {
      return Failure("Response type is not PIPE");
    }

    if (response->reader.isNone()) {
      return Failure("Response reader is set to None");
    }

    auto deserializer = lambda::bind(
        deserialize<Event>, contentType, lambda::_1);

    std::unique_ptr<Reader<Event>> reader(new Reader<Event>(
        deserializer, response->reader.get()));

    // Ownership of the reader is moved into the bound callable, which
    // is itself moved into the loop below, so the reader stays alive
    // for as long as the loop holds its iterate function.
    auto decode = lambda::bind(
        [](std::unique_ptr<Reader<Event>>& d) { return d->read(); },
        std::move(reader));

    // `decode` produces the next event and the lambda consumes it.
    // The loop is given `self()` and the lambda dereferences
    // `subscriber`, the raw pointer held by this process.
    receiveLoop = process::loop(
        self(),
        std::move(decode),
        [this](const Result<Event>& event) -> process::ControlFlow<Nothing> {
          if (event.isError()) {
            LOG(FATAL) << "Failed to read master streaming API event: "
                       << event.error();
          }

          // A `None` result is treated as end of stream: stop looping.
          if (event.isNone()) {
            LOG(INFO) << "Received EOF from master streaming API";
            return process::Break();
          }

          LOG(INFO) << "Received " << event->type()
                    << " event from master streaming API";

          subscriber->handleEvent(event.get());

          return process::Continue();
        });

    LOG(INFO) << "Subscribed to master streaming API events";

    // Log whenever the loop future transitions out of pending.
    receiveLoop.onAny([]() {
      LOG(INFO) << "Stopped master streaming API receive loop";
    });

    return Nothing();
  }

  // Non-owning pointer to the object that receives the events.
  MockMasterAPISubscriber* subscriber;

  // Future of the event receive loop; assigned in `_subscribe()` and
  // discarded in `finalize()`.
  Future<Nothing> receiveLoop;
};
// Installs permissive default expectations for every event callback and
// spawns the helper process that will feed events into this object.
MockMasterAPISubscriber::MockMasterAPISubscriber()
{
  // By default each callback may fire any number of times and does
  // nothing when it does.
  EXPECT_CALL(*this, subscribed(_)).WillRepeatedly(Return());
  EXPECT_CALL(*this, taskAdded(_)).WillRepeatedly(Return());
  EXPECT_CALL(*this, taskUpdated(_)).WillRepeatedly(Return());
  EXPECT_CALL(*this, agentAdded(_)).WillRepeatedly(Return());
  EXPECT_CALL(*this, agentRemoved(_)).WillRepeatedly(Return());
  EXPECT_CALL(*this, frameworkAdded(_)).WillRepeatedly(Return());
  EXPECT_CALL(*this, frameworkUpdated(_)).WillRepeatedly(Return());
  EXPECT_CALL(*this, frameworkRemoved(_)).WillRepeatedly(Return());
  EXPECT_CALL(*this, heartbeat()).WillRepeatedly(Return());

  // No subscription has been attempted yet.
  subscribeCalled = false;

  // The helper process receives a pointer back to this object.
  // NOTE(review): the `true` argument presumably hands the process's
  // lifetime to libprocess (managed spawn) -- confirm against `spawn`.
  auto* actor = new MockMasterAPISubscriberProcess(this);
  pid = spawn(actor, true);
}
// Terminates the helper process and blocks until it is gone.
MockMasterAPISubscriber::~MockMasterAPISubscriber()
{
// Ask the process spawned by the constructor to terminate.
process::terminate(pid);
// The process holds a raw pointer to this object, so we must wait
// for it to terminate before leaving the destructor; otherwise it
// could still dereference the pointer after this object is destroyed.
//
// TODO(asekretenko): Figure out a way to avoid blocking.
process::wait(pid);
}
// Starts the subscription by dispatching to the helper process.
//
// Only the first call is forwarded; any later call returns a `Failure`
// without contacting the process.
Future<Nothing> MockMasterAPISubscriber::subscribe(
    const process::PID<Master>& masterPid,
    ContentType contentType)
{
  if (!subscribeCalled) {
    // Record the attempt before dispatching so a repeated call is
    // rejected regardless of how the first one turns out.
    subscribeCalled = true;

    return dispatch(
        pid,
        &MockMasterAPISubscriberProcess::subscribe,
        masterPid,
        contentType);
  }

  return Failure(
      "MockMasterAPISubscriber::subscribe should be called at most once");
}
// Routes a master API event to the mock method matching its type,
// passing along the type-specific payload where there is one.
void MockMasterAPISubscriber::handleEvent(const Event& event)
{
  switch (event.type()) {
    case Event::SUBSCRIBED:
      subscribed(event.subscribed());
      return;

    // Task events.
    case Event::TASK_ADDED:
      taskAdded(event.task_added());
      return;
    case Event::TASK_UPDATED:
      taskUpdated(event.task_updated());
      return;

    // Agent events.
    case Event::AGENT_ADDED:
      agentAdded(event.agent_added());
      return;
    case Event::AGENT_REMOVED:
      agentRemoved(event.agent_removed());
      return;

    // Framework events.
    case Event::FRAMEWORK_ADDED:
      frameworkAdded(event.framework_added());
      return;
    case Event::FRAMEWORK_UPDATED:
      frameworkUpdated(event.framework_updated());
      return;
    case Event::FRAMEWORK_REMOVED:
      frameworkRemoved(event.framework_removed());
      return;

    // Heartbeats carry no payload.
    case Event::HEARTBEAT:
      heartbeat();
      return;

    // An UNKNOWN event is logged at FATAL severity.
    case Event::UNKNOWN:
      LOG(FATAL) << "Received event of a type UNKNOWN";
      return;
  }
}
} // namespace v1 {
} // namespace tests {
} // namespace internal {
} // namespace mesos {