blob: d6a7867452d19cf1e65b780d0801cb4a0cd98e18 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gmock/gmock.h>
#include <queue>
#include <vector>
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/queue.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/json.hpp>
#include <stout/lambda.hpp>
#include <stout/memory.hpp>
#include <stout/try.hpp>
#include "master/master.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
using mesos::internal::master::Master;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::Slave;
using mesos::scheduler::Call;
using mesos::scheduler::Event;
using process::Future;
using process::Owned;
using process::PID;
using process::Promise;
using process::Queue;
using process::http::OK;
using process::metrics::internal::MetricsProcess;
using std::queue;
using std::vector;
using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Return;
class SchedulerTest : public MesosTest
{
protected:
// Helper class for using EXPECT_CALL since the Mesos scheduler API
// is callback based.
class Callbacks
{
public:
MOCK_METHOD0(connected, void(void));
MOCK_METHOD0(disconnected, void(void));
MOCK_METHOD1(received, void(const std::queue<Event>&));
};
};
// Enqueues all received events into a libprocess queue.
ACTION_P(Enqueue, queue)
{
std::queue<Event> events = arg0;
while (!events.empty()) {
queue->put(events.front());
events.pop();
}
}
TEST_F(SchedulerTest, TaskRunning)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Try<PID<Slave> > slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
scheduler::Mesos mesos(
master.get(),
DEFAULT_CREDENTIAL,
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
call.set_type(Call::REGISTER);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::REGISTERED, event.get().type());
FrameworkID id(event.get().registered().framework_id());
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<Nothing> update;
EXPECT_CALL(containerizer, update(_, _))
.WillOnce(DoAll(FutureSatisfy(&update),
Return(Future<Nothing>())))
.WillRepeatedly(Return(Future<Nothing>())); // Ignore subsequent calls.
TaskInfo taskInfo;
taskInfo.set_name("");
taskInfo.mutable_task_id()->set_value("1");
taskInfo.mutable_slave_id()->CopyFrom(
event.get().offers().offers(0).slave_id());
taskInfo.mutable_resources()->CopyFrom(
event.get().offers().offers(0).resources());
taskInfo.mutable_executor()->CopyFrom(DEFAULT_EXECUTOR_INFO);
// TODO(benh): Enable just running a task with a command in the tests:
// taskInfo.mutable_command()->set_value("sleep 10");
{
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
call.mutable_framework_info()->mutable_id()->CopyFrom(id);
call.set_type(Call::LAUNCH);
call.mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
call.mutable_launch()->add_offer_ids()->CopyFrom(
event.get().offers().offers(0).id());
mesos.send(call);
}
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::UPDATE, event.get().type());
EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
AWAIT_READY(update);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
// TODO(benh): Write test for sending Call::Acknowledgement through
// master to slave when Event::Update was generated locally.
class MesosSchedulerDriverTest : public MesosTest {};
TEST_F(MesosSchedulerDriverTest, MetricsEndpoint)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<Nothing> registered;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureSatisfy(&registered));
ASSERT_EQ(DRIVER_RUNNING, driver.start());
AWAIT_READY(registered);
Future<process::http::Response> response =
process::http::get(MetricsProcess::instance()->self(), "/snapshot");
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
EXPECT_SOME_EQ(
"application/json",
response.get().headers.get("Content-Type"));
Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
ASSERT_SOME(parse);
JSON::Object metrics = parse.get();
EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_messages"));
driver.stop();
driver.join();
Shutdown();
}