blob: 005579360bb149b4973e0bd26f39b762dd67073a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Python.h must be included before standard headers.
// See: http://docs.python.org/2/c-api/intro.html#include-files
#include <Python.h>
#include <string>
#include "common.hpp"
#include "mesos_executor_driver_impl.hpp"
#include "proxy_executor.hpp"
using namespace mesos;
using namespace mesos::python;
using std::cerr;
using std::endl;
using std::string;
using std::vector;
using std::map;
namespace mesos { namespace python {
/**
 * Python type object for MesosExecutorDriverImpl.
 *
 * The initializer below is positional: every value fills the
 * PyTypeObject slot named in its trailing comment, so entries must
 * stay in exactly this order. Slots set to 0 are left unimplemented.
 *
 * NOTE(review): slot names such as ob_size and tp_compare suggest the
 * Python 2 type layout -- confirm against the targeted Python version.
 */
PyTypeObject MesosExecutorDriverImplType = {
PyObject_HEAD_INIT(nullptr)
0, /* ob_size */
"_mesos.MesosExecutorDriverImpl", /* tp_name */
sizeof(MesosExecutorDriverImpl), /* tp_basicsize */
0, /* tp_itemsize */
(destructor) MesosExecutorDriverImpl_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
// Py_TPFLAGS_HAVE_GC opts this type into cyclic garbage collection,
// which is why tp_traverse and tp_clear are supplied just below.
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /* tp_flags */
"Private MesosExecutorDriver implementation", /* tp_doc */
(traverseproc) MesosExecutorDriverImpl_traverse, /* tp_traverse */
(inquiry) MesosExecutorDriverImpl_clear, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
// Method table defined later in this file.
MesosExecutorDriverImpl_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc) MesosExecutorDriverImpl_init, /* tp_init */
0, /* tp_alloc */
MesosExecutorDriverImpl_new, /* tp_new */
};
/**
 * Method table exposed to Python for MesosExecutorDriverImpl.
 *
 * Each row is {Python-visible name, C entry point, calling convention
 * flags, docstring}. The table ends with an all-null sentinel row.
 */
PyMethodDef MesosExecutorDriverImpl_methods[] = {
  // Driver lifecycle: these entries take no arguments from Python.
  {"start",
   reinterpret_cast<PyCFunction>(MesosExecutorDriverImpl_start),
   METH_NOARGS,
   "Start the driver to connect to Mesos"},
  {"stop",
   reinterpret_cast<PyCFunction>(MesosExecutorDriverImpl_stop),
   METH_NOARGS,
   "Stop the driver, disconnecting from Mesos"},
  {"abort",
   reinterpret_cast<PyCFunction>(MesosExecutorDriverImpl_abort),
   METH_NOARGS,
   "Abort the driver, disallowing calls from and to the driver"},
  {"join",
   reinterpret_cast<PyCFunction>(MesosExecutorDriverImpl_join),
   METH_NOARGS,
   "Wait for a running driver to disconnect from Mesos"},
  {"run",
   reinterpret_cast<PyCFunction>(MesosExecutorDriverImpl_run),
   METH_NOARGS,
   "Start a driver and run it, returning when it disconnects from Mesos"},

  // Messaging: these entries receive a positional argument tuple.
  {"sendStatusUpdate",
   reinterpret_cast<PyCFunction>(MesosExecutorDriverImpl_sendStatusUpdate),
   METH_VARARGS,
   "Send a status update for a task"},
  {"sendFrameworkMessage",
   reinterpret_cast<PyCFunction>(MesosExecutorDriverImpl_sendFrameworkMessage),
   METH_VARARGS,
   "Send a FrameworkMessage to an agent"},

  {nullptr, nullptr, 0, nullptr}  // Sentinel.
};
/**
 * Allocate a MesosExecutorDriverImpl without initializing it; the
 * init function is expected to run afterwards.
 *
 * All owned pointers start out null. Returns the new object, or
 * nullptr if the type's allocator fails.
 */
PyObject* MesosExecutorDriverImpl_new(PyTypeObject *type,
                                      PyObject *args,
                                      PyObject *kwds)
{
  MesosExecutorDriverImpl* instance =
    (MesosExecutorDriverImpl*) type->tp_alloc(type, 0);

  // Allocation failed: hand the null result straight back to the caller.
  if (instance == nullptr) {
    return nullptr;
  }

  instance->pythonExecutor = nullptr;
  instance->proxyExecutor = nullptr;
  instance->driver = nullptr;

  return (PyObject*) instance;
}
/**
 * Initialize a MesosExecutorDriverImpl from its constructor arguments.
 *
 * Expects exactly one positional argument: the Python executor object.
 * Any driver / proxy executor left over from a previous initialization
 * is destroyed before fresh ones are created.
 *
 * Returns 0 on success, or -1 if the arguments could not be parsed.
 */
int MesosExecutorDriverImpl_init(MesosExecutorDriverImpl *self,
                                 PyObject *args,
                                 PyObject *kwds)
{
  PyObject* executorArg = nullptr;

  if (!PyArg_ParseTuple(args, "O", &executorArg)) {
    return -1;
  }

  if (executorArg != nullptr) {
    // Take the new reference and publish it before dropping the old
    // one, so the field never points at an object we no longer hold.
    Py_INCREF(executorArg);
    PyObject* previous = self->pythonExecutor;
    self->pythonExecutor = executorArg;
    Py_XDECREF(previous);
  }

  // Deleting a null pointer is a no-op, so no explicit checks are needed.
  delete self->driver;
  self->driver = nullptr;

  delete self->proxyExecutor;
  self->proxyExecutor = nullptr;

  // The driver calls back through the proxy, so the proxy is built first.
  self->proxyExecutor = new ProxyExecutor(self);
  self->driver = new MesosExecutorDriver(self->proxyExecutor);

  return 0;
}
/**
 * Free a MesosExecutorDriverImpl.
 *
 * Destroys the native driver and proxy executor, drops the reference
 * to the Python executor and finally releases the object's memory
 * through the type's tp_free slot.
 */
void MesosExecutorDriverImpl_dealloc(MesosExecutorDriverImpl* self)
{
  // This type is registered with Py_TPFLAGS_HAVE_GC, so the object must
  // be removed from the collector's tracking before any field is
  // invalidated. This matters here in particular because the GIL is
  // released below: a collection triggered from another thread must not
  // traverse an object that is in the middle of being torn down.
  PyObject_GC_UnTrack(self);

  if (self->driver != nullptr) {
    // We need to wrap the driver destructor in an "allow threads"
    // macro since the MesosExecutorDriver destructor waits for the
    // ExecutorProcess to terminate and there might be a thread that
    // is trying to acquire the GIL to call through the
    // ProxyExecutor. It will only be after this thread executes that
    // the ExecutorProcess might actually get a terminate.
    Py_BEGIN_ALLOW_THREADS
    delete self->driver;
    Py_END_ALLOW_THREADS
    self->driver = nullptr;
  }

  // The proxy is destroyed only after the driver that was constructed
  // with it is gone.
  if (self->proxyExecutor != nullptr) {
    delete self->proxyExecutor;
    self->proxyExecutor = nullptr;
  }

  // Drop the reference to the Python executor.
  MesosExecutorDriverImpl_clear(self);

  self->ob_type->tp_free((PyObject*) self);
}
/**
 * Cyclic-GC traversal hook for MesosExecutorDriverImpl.
 * See http://docs.python.org/extending/newtypes.html.
 *
 * Reports the only Python object this type holds (pythonExecutor) to
 * the visitor. Returns the visitor's result if it is non-zero,
 * otherwise 0.
 */
int MesosExecutorDriverImpl_traverse(MesosExecutorDriverImpl* self,
                                     visitproc visit,
                                     void* arg)
{
  if (self->pythonExecutor != nullptr) {
    const int outcome = visit((PyObject*) self->pythonExecutor, arg);
    if (outcome != 0) {
      return outcome;
    }
  }

  return 0;
}
/**
 * Cyclic-GC clear hook for MesosExecutorDriverImpl: releases the
 * reference to the Python executor, the only field that can take part
 * in a reference cycle. See http://docs.python.org/extending/newtypes.html.
 *
 * Always returns 0.
 */
int MesosExecutorDriverImpl_clear(MesosExecutorDriverImpl* self)
{
  PyObject* held = self->pythonExecutor;

  if (held != nullptr) {
    // Null the field before dropping the reference so that any code run
    // by the decref never observes a pointer we no longer own.
    self->pythonExecutor = nullptr;
    Py_DECREF(held);
  }

  return 0;
}
/**
 * Python method "start": calls start() on the native driver.
 *
 * Returns the resulting Status as a Python int, or nullptr with a
 * Python exception set if the driver has not been created.
 */
PyObject* MesosExecutorDriverImpl_start(MesosExecutorDriverImpl* self)
{
  if (self->driver != nullptr) {
    const Status result = self->driver->start();

    // PyInt_FromLong sets an exception itself if creating the int fails.
    return PyInt_FromLong(result);
  }

  PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is nullptr");
  return nullptr;
}
/**
 * Python method "stop": calls stop() on the native driver.
 *
 * Returns the resulting Status as a Python int, or nullptr with a
 * Python exception set if the driver has not been created.
 */
PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self)
{
  if (self->driver != nullptr) {
    const Status result = self->driver->stop();

    // PyInt_FromLong sets an exception itself if creating the int fails.
    return PyInt_FromLong(result);
  }

  PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is nullptr");
  return nullptr;
}
/**
 * Python method "abort": calls abort() on the native driver.
 *
 * Returns the resulting Status as a Python int, or nullptr with a
 * Python exception set if the driver has not been created.
 */
PyObject* MesosExecutorDriverImpl_abort(MesosExecutorDriverImpl* self)
{
  if (self->driver != nullptr) {
    const Status result = self->driver->abort();

    // PyInt_FromLong sets an exception itself if creating the int fails.
    return PyInt_FromLong(result);
  }

  PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is nullptr");
  return nullptr;
}
/**
 * Python method "join": calls join() on the native driver.
 *
 * The GIL is released for the duration of the native call. Returns the
 * resulting Status as a Python int, or nullptr with a Python exception
 * set if the driver has not been created.
 */
PyObject* MesosExecutorDriverImpl_join(MesosExecutorDriverImpl* self)
{
  if (self->driver != nullptr) {
    // Declared outside the allow-threads region because the macros open
    // and close their own scope.
    Status result;

    Py_BEGIN_ALLOW_THREADS
    result = self->driver->join();
    Py_END_ALLOW_THREADS

    // PyInt_FromLong sets an exception itself if creating the int fails.
    return PyInt_FromLong(result);
  }

  PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is nullptr");
  return nullptr;
}
/**
 * Python method "run": calls run() on the native driver.
 *
 * The GIL is released for the duration of the native call. Returns the
 * resulting Status as a Python int, or nullptr with a Python exception
 * set if the driver has not been created.
 */
PyObject* MesosExecutorDriverImpl_run(MesosExecutorDriverImpl* self)
{
  if (self->driver != nullptr) {
    // Declared outside the allow-threads region because the macros open
    // and close their own scope.
    Status result;

    Py_BEGIN_ALLOW_THREADS
    result = self->driver->run();
    Py_END_ALLOW_THREADS

    // PyInt_FromLong sets an exception itself if creating the int fails.
    return PyInt_FromLong(result);
  }

  PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is nullptr");
  return nullptr;
}
/**
 * Python method "sendStatusUpdate": forwards a task status to the
 * native driver.
 *
 * Expects one positional argument, a Python object that
 * readPythonProtobuf can convert into a TaskStatus. Returns the
 * resulting Status as a Python int, or nullptr with a Python exception
 * set if the driver is missing, the arguments do not parse, or the
 * conversion fails.
 */
PyObject* MesosExecutorDriverImpl_sendStatusUpdate(
    MesosExecutorDriverImpl* self,
    PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is nullptr");
    return nullptr;
  }

  PyObject* pyStatus = nullptr;
  if (!PyArg_ParseTuple(args, "O", &pyStatus)) {
    return nullptr;
  }

  TaskStatus converted;
  const bool decoded = readPythonProtobuf(pyStatus, &converted);
  if (!decoded) {
    PyErr_Format(PyExc_Exception,
                 "Could not deserialize Python TaskStatus");
    return nullptr;
  }

  // PyInt_FromLong sets an exception itself if creating the int fails.
  const Status result = self->driver->sendStatusUpdate(converted);
  return PyInt_FromLong(result);
}
/**
 * Python method "sendFrameworkMessage": forwards a message to the
 * native driver.
 *
 * Expects one positional argument parsed with the "s#" format, i.e. a
 * buffer pointer plus an explicit length. Returns the resulting Status
 * as a Python int, or nullptr with a Python exception set if the
 * driver is missing or the arguments do not parse.
 */
PyObject* MesosExecutorDriverImpl_sendFrameworkMessage(
    MesosExecutorDriverImpl* self,
    PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is nullptr");
    return nullptr;
  }

  const char* payload;
  int payloadSize;
  if (!PyArg_ParseTuple(args, "s#", &payload, &payloadSize)) {
    return nullptr;
  }

  // Build the string from pointer + length rather than treating the
  // buffer as a NUL-terminated C string.
  const string message(payload, payloadSize);

  // PyInt_FromLong sets an exception itself if creating the int fails.
  const Status result = self->driver->sendFrameworkMessage(message);
  return PyInt_FromLong(result);
}
} // namespace python {
} // namespace mesos {