blob: 586b2e8d9ac5c9af3634ac5a8570422ffa02c19c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Python.h must be included before standard headers.
// See: http://docs.python.org/2/c-api/intro.html#include-files
#include <Python.h>
#include <iostream>
#include "common.hpp"
#include "mesos_scheduler_driver_impl.hpp"
#include "proxy_scheduler.hpp"
using namespace mesos;
using std::cerr;
using std::endl;
using std::map;
using std::string;
using std::vector;
namespace mesos {
namespace python {
void ProxyScheduler::registered(SchedulerDriver* driver,
const FrameworkID& frameworkId,
const MasterInfo& masterInfo)
{
InterpreterLock lock;
PyObject* fid = nullptr;
PyObject* minfo = nullptr;
PyObject* res = nullptr;
fid = createPythonProtobuf(frameworkId, "FrameworkID");
if (fid == nullptr) {
goto cleanup; // createPythonProtobuf will have set an exception.
}
minfo = createPythonProtobuf(masterInfo, "MasterInfo");
if (minfo == nullptr) {
goto cleanup; // createPythonProtobuf will have set an exception.
}
res = PyObject_CallMethod(impl->pythonScheduler,
(char*) "registered",
(char*) "OOO",
impl,
fid,
minfo);
if (res == nullptr) {
cerr << "Failed to call scheduler's registered" << endl;
goto cleanup;
}
cleanup:
if (PyErr_Occurred()) {
PyErr_Print();
driver->abort();
}
Py_XDECREF(fid);
Py_XDECREF(minfo);
Py_XDECREF(res);
}
void ProxyScheduler::reregistered(SchedulerDriver* driver,
const MasterInfo& masterInfo)
{
InterpreterLock lock;
PyObject* minfo = nullptr;
PyObject* res = nullptr;
minfo = createPythonProtobuf(masterInfo, "MasterInfo");
if (minfo == nullptr) {
goto cleanup; // createPythonProtobuf will have set an exception.
}
res = PyObject_CallMethod(impl->pythonScheduler,
(char*) "reregistered",
(char*) "OO",
impl,
minfo);
if (res == nullptr) {
cerr << "Failed to call scheduler's reregistered" << endl;
goto cleanup;
}
cleanup:
if (PyErr_Occurred()) {
PyErr_Print();
driver->abort();
}
Py_XDECREF(minfo);
Py_XDECREF(res);
}
void ProxyScheduler::disconnected(SchedulerDriver* driver)
{
InterpreterLock lock;
PyObject* res = nullptr;
res = PyObject_CallMethod(impl->pythonScheduler,
(char*) "disconnected",
(char*) "O",
impl);
if (res == nullptr) {
cerr << "Failed to call scheduler's disconnected" << endl;
goto cleanup;
}
cleanup:
if (PyErr_Occurred()) {
PyErr_Print();
driver->abort();
}
Py_XDECREF(res);
}
void ProxyScheduler::resourceOffers(SchedulerDriver* driver,
const vector<Offer>& offers)
{
InterpreterLock lock;
PyObject* list = nullptr;
PyObject* res = nullptr;
list = PyList_New(offers.size());
if (list == nullptr) {
goto cleanup;
}
for (size_t i = 0; i < offers.size(); i++) {
PyObject* offer = createPythonProtobuf(offers[i], "Offer");
if (offer == nullptr) {
goto cleanup;
}
PyList_SetItem(list, i, offer); // Steals the reference to offer.
}
res = PyObject_CallMethod(impl->pythonScheduler,
(char*) "resourceOffers",
(char*) "OO",
impl,
list);
if (res == nullptr) {
cerr << "Failed to call scheduler's resourceOffer" << endl;
goto cleanup;
}
cleanup:
if (PyErr_Occurred()) {
PyErr_Print();
driver->abort();
}
Py_XDECREF(list);
Py_XDECREF(res);
}
void ProxyScheduler::offerRescinded(SchedulerDriver* driver,
const OfferID& offerId)
{
InterpreterLock lock;
PyObject* oid = nullptr;
PyObject* res = nullptr;
oid = createPythonProtobuf(offerId, "OfferID");
if (oid == nullptr) {
goto cleanup; // createPythonProtobuf will have set an exception.
}
res = PyObject_CallMethod(impl->pythonScheduler,
(char*) "offerRescinded",
(char*) "OO",
impl,
oid);
if (res == nullptr) {
cerr << "Failed to call scheduler's offerRescinded" << endl;
goto cleanup;
}
cleanup:
if (PyErr_Occurred()) {
PyErr_Print();
driver->abort();
}
Py_XDECREF(oid);
Py_XDECREF(res);
}
void ProxyScheduler::statusUpdate(SchedulerDriver* driver,
const TaskStatus& status)
{
InterpreterLock lock;
PyObject* stat = nullptr;
PyObject* res = nullptr;
stat = createPythonProtobuf(status, "TaskStatus");
if (stat == nullptr) {
goto cleanup; // createPythonProtobuf will have set an exception.
}
res = PyObject_CallMethod(impl->pythonScheduler,
(char*) "statusUpdate",
(char*) "OO",
impl,
stat);
if (res == nullptr) {
cerr << "Failed to call scheduler's statusUpdate" << endl;
goto cleanup;
}
cleanup:
if (PyErr_Occurred()) {
PyErr_Print();
driver->abort();
}
Py_XDECREF(stat);
Py_XDECREF(res);
}
void ProxyScheduler::frameworkMessage(SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
const string& data)
{
InterpreterLock lock;
PyObject* eid = nullptr;
PyObject* sid = nullptr;
PyObject* res = nullptr;
eid = createPythonProtobuf(executorId, "ExecutorID");
if (eid == nullptr) {
goto cleanup; // createPythonProtobuf will have set an exception.
}
sid = createPythonProtobuf(slaveId, "SlaveID");
if (sid == nullptr) {
goto cleanup; // createPythonProtobuf will have set an exception.
}
res = PyObject_CallMethod(impl->pythonScheduler,
(char*) "frameworkMessage",
(char*) "OOOs#",
impl,
eid,
sid,
data.data(),
data.length());
if (res == nullptr) {
cerr << "Failed to call scheduler's frameworkMessage" << endl;
goto cleanup;
}
cleanup:
if (PyErr_Occurred()) {
PyErr_Print();
driver->abort();
}
Py_XDECREF(eid);
Py_XDECREF(sid);
Py_XDECREF(res);
}
void ProxyScheduler::slaveLost(SchedulerDriver* driver, const SlaveID& slaveId)
{
InterpreterLock lock;
PyObject* sid = nullptr;
PyObject* res = nullptr;
sid = createPythonProtobuf(slaveId, "SlaveID");
if (sid == nullptr) {
goto cleanup; // createPythonProtobuf will have set an exception.
}
res = PyObject_CallMethod(impl->pythonScheduler,
(char*) "slaveLost",
(char*) "OO",
impl,
sid);
if (res == nullptr) {
cerr << "Failed to call scheduler's slaveLost" << endl;
goto cleanup;
}
cleanup:
if (PyErr_Occurred()) {
PyErr_Print();
driver->abort();
}
Py_XDECREF(sid);
Py_XDECREF(res);
}
void ProxyScheduler::executorLost(SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
int status)
{
InterpreterLock lock;
PyObject* executorIdObj = nullptr;
PyObject* slaveIdObj = nullptr;
PyObject* res = nullptr;
executorIdObj = createPythonProtobuf(executorId, "ExecutorID");
slaveIdObj = createPythonProtobuf(slaveId, "SlaveID");
if (executorIdObj == nullptr || slaveIdObj == nullptr) {
goto cleanup; // createPythonProtobuf will have set an exception.
}
res = PyObject_CallMethod(impl->pythonScheduler,
(char*) "executorLost",
(char*) "OOOi",
impl,
executorIdObj,
slaveIdObj,
status);
if (res == nullptr) {
cerr << "Failed to call scheduler's executorLost" << endl;
goto cleanup;
}
cleanup:
if (PyErr_Occurred()) {
PyErr_Print();
driver->abort();
}
Py_XDECREF(executorIdObj);
Py_XDECREF(slaveIdObj);
Py_XDECREF(res);
}
void ProxyScheduler::error(SchedulerDriver* driver, const string& message)
{
InterpreterLock lock;
PyObject* res = PyObject_CallMethod(impl->pythonScheduler,
(char*) "error",
(char*) "Os#",
impl,
message.data(),
message.length());
if (res == nullptr) {
cerr << "Failed to call scheduler's error" << endl;
goto cleanup;
}
cleanup:
if (PyErr_Occurred()) {
PyErr_Print();
// No need for driver.stop(); it should stop itself.
}
Py_XDECREF(res);
}
} // namespace python {
} // namespace mesos {