// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Python.h must be included before standard headers.
// See: http://docs.python.org/2/c-api/intro.html#include-files
#include <Python.h>

#include <string>

#include "common.hpp"
#include "mesos_scheduler_driver_impl.hpp"
#include "proxy_scheduler.hpp"

using namespace mesos;
using namespace mesos::python;

using std::cerr;
using std::endl;
using std::string;
using std::vector;
using std::map;
using std::unique_ptr;

namespace mesos {
namespace python {

/**
 * Python type object for MesosSchedulerDriverImpl.
 */
PyTypeObject MesosSchedulerDriverImplType = {
  PyObject_HEAD_INIT(nullptr)
  0,                                                /* ob_size */
  "_mesos.MesosSchedulerDriverImpl",                /* tp_name */
  sizeof(MesosSchedulerDriverImpl),                 /* tp_basicsize */
  0,                                                /* tp_itemsize */
  (destructor) MesosSchedulerDriverImpl_dealloc,    /* tp_dealloc */
  0,                                                /* tp_print */
  0,                                                /* tp_getattr */
  0,                                                /* tp_setattr */
  0,                                                /* tp_compare */
  0,                                                /* tp_repr */
  0,                                                /* tp_as_number */
  0,                                                /* tp_as_sequence */
  0,                                                /* tp_as_mapping */
  0,                                                /* tp_hash */
  0,                                                /* tp_call */
  0,                                                /* tp_str */
  0,                                                /* tp_getattro */
  0,                                                /* tp_setattro */
  0,                                                /* tp_as_buffer */
  Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC,          /* tp_flags */
  "Private MesosSchedulerDriver implementation",    /* tp_doc */
  (traverseproc) MesosSchedulerDriverImpl_traverse, /* tp_traverse */
  (inquiry) MesosSchedulerDriverImpl_clear,         /* tp_clear */
  0,                                                /* tp_richcompare */
  0,                                                /* tp_weaklistoffset */
  0,                                                /* tp_iter */
  0,                                                /* tp_iternext */
  MesosSchedulerDriverImpl_methods,                 /* tp_methods */
  0,                                                /* tp_members */
  0,                                                /* tp_getset */
  0,                                                /* tp_base */
  0,                                                /* tp_dict */
  0,                                                /* tp_descr_get */
  0,                                                /* tp_descr_set */
  0,                                                /* tp_dictoffset */
  (initproc) MesosSchedulerDriverImpl_init,         /* tp_init */
  0,                                                /* tp_alloc */
  MesosSchedulerDriverImpl_new,                     /* tp_new */
};


/**
 * List of Python methods in MesosSchedulerDriverImpl.
 */
PyMethodDef MesosSchedulerDriverImpl_methods[] = {
  { "start",
    (PyCFunction) MesosSchedulerDriverImpl_start,
    METH_NOARGS,
    "Start the driver to connect to Mesos"
  },
  { "stop",
    (PyCFunction) MesosSchedulerDriverImpl_stop,
    METH_VARARGS,
    "Stop the driver, disconnecting from Mesos"
  },
  { "abort",
    (PyCFunction) MesosSchedulerDriverImpl_abort,
    METH_NOARGS,
    "Abort the driver, disabling calls from and to the driver"
  },
  { "join",
    (PyCFunction) MesosSchedulerDriverImpl_join,
    METH_NOARGS,
    "Wait for a running driver to disconnect from Mesos"
  },
  { "run",
    (PyCFunction) MesosSchedulerDriverImpl_run,
    METH_NOARGS,
    "Start a driver and run it, returning when it disconnects from Mesos"
  },
  { "requestResources",
    (PyCFunction) MesosSchedulerDriverImpl_requestResources,
    METH_VARARGS,
    "Request resources from the Mesos allocator"
  },
  { "launchTasks",
    (PyCFunction) MesosSchedulerDriverImpl_launchTasks,
    METH_VARARGS,
    "Reply to a Mesos offer with a list of tasks"
  },
  { "killTask",
    (PyCFunction) MesosSchedulerDriverImpl_killTask,
    METH_VARARGS,
    "Kill the task with the given ID"
  },
  { "acceptOffers",
    (PyCFunction) MesosSchedulerDriverImpl_acceptOffers,
    METH_VARARGS,
    "Reply to a Mesos offer with a list of offer operations"
  },
  { "declineOffer",
    (PyCFunction) MesosSchedulerDriverImpl_declineOffer,
    METH_VARARGS,
    "Decline a Mesos offer"
  },
  { "reviveOffers",
    (PyCFunction) MesosSchedulerDriverImpl_reviveOffers,
    METH_VARARGS,
    "Remove all filters, unsuppress and ask Mesos for new offers for the roles"
  },
  { "suppressOffers",
    (PyCFunction) MesosSchedulerDriverImpl_suppressOffers,
    METH_VARARGS,
    "Set suppressed roles for the Framework"
  },
  { "acknowledgeStatusUpdate",
    (PyCFunction) MesosSchedulerDriverImpl_acknowledgeStatusUpdate,
    METH_VARARGS,
    "Acknowledge a status update"
  },
  { "sendFrameworkMessage",
    (PyCFunction) MesosSchedulerDriverImpl_sendFrameworkMessage,
    METH_VARARGS,
    "Send a FrameworkMessage to an agent"
  },
  { "reconcileTasks",
    (PyCFunction) MesosSchedulerDriverImpl_reconcileTasks,
    METH_VARARGS,
    "Master sends status updates if task status is different from expected"
  },
  { "updateFramework",
    (PyCFunction) MesosSchedulerDriverImpl_updateFramework,
    METH_VARARGS,
    "Updates FrameworkInfo and suppressed roles"
  },
  { nullptr }  /* Sentinel */
};


/**
 * Create, but don't initialize, a new MesosSchedulerDriverImpl
 * (called by Python before init method).
 */
PyObject* MesosSchedulerDriverImpl_new(PyTypeObject* type,
                                       PyObject* args,
                                       PyObject* kwds)
{
  MesosSchedulerDriverImpl* self;
  self = (MesosSchedulerDriverImpl*) type->tp_alloc(type, 0);
  if (self != nullptr) {
    self->driver = nullptr;
    self->proxyScheduler = nullptr;
    self->pythonScheduler = nullptr;
  }
  return (PyObject*) self;
}


/**
 * Initialize a MesosSchedulerDriverImpl with constructor arguments.
 */
int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
                                  PyObject* args,
                                  PyObject* kwds)
{
  // Note: We use an integer for 'implicitAcknoweldgements' because
  // it is the recommended way to pass booleans through CPython.
  PyObject* schedulerObj = nullptr;
  PyObject* frameworkObj = nullptr;
  const char* master;
  int implicitAcknowledgements = 1; // Enabled by default.
  PyObject* credentialObj = nullptr;
  PyObject* suppressedRolesObj = nullptr;

  if (!PyArg_ParseTuple(
      args,
      "OOs|iOO",
      &schedulerObj,
      &frameworkObj,
      &master,
      &implicitAcknowledgements,
      &credentialObj,
      &suppressedRolesObj)) {
    return -1;
  }

  if (schedulerObj != nullptr) {
    PyObject* tmp = self->pythonScheduler;
    Py_INCREF(schedulerObj);
    self->pythonScheduler = schedulerObj;
    Py_XDECREF(tmp);
  }

  FrameworkInfo framework;
  if (frameworkObj != nullptr) {
    if (!readPythonProtobuf(frameworkObj, &framework)) {
      PyErr_Format(PyExc_Exception,
                   "Could not deserialize Python FrameworkInfo");
      return -1;
    }
  }

  Credential credential;
  if (credentialObj != nullptr) {
    if (!readPythonProtobuf(credentialObj, &credential)) {
      PyErr_Format(PyExc_Exception, "Could not deserialize Python Credential");
      return -1;
    }
  }

  unique_ptr<vector<string>> suppressedRoles;
  if (suppressedRolesObj != nullptr && suppressedRolesObj != Py_None) {
    suppressedRoles = constructFromIterable<string>(suppressedRolesObj);
    if (!suppressedRoles) {
      // Exception has been set by constructFromIterable
      return -1;
    }
  }

  if (self->driver != nullptr) {
    delete self->driver;
    self->driver = nullptr;
  }

  if (self->proxyScheduler != nullptr) {
    delete self->proxyScheduler;
    self->proxyScheduler = nullptr;
  }

  self->proxyScheduler = new ProxyScheduler(self);

  if (credentialObj != nullptr) {
    self->driver = new MesosSchedulerDriver(
        self->proxyScheduler,
        framework,
        suppressedRoles ? *suppressedRoles : vector<string>{},
        master,
        implicitAcknowledgements != 0,
        credential);
  } else {
    self->driver = new MesosSchedulerDriver(
        self->proxyScheduler,
        framework,
        suppressedRoles ? *suppressedRoles : vector<string>{},
        master,
        implicitAcknowledgements != 0);
  }

  return 0;
}


/**
 * Free a MesosSchedulerDriverImpl.
 */
void MesosSchedulerDriverImpl_dealloc(MesosSchedulerDriverImpl* self)
{
  if (self->driver != nullptr) {
    // We need to wrap the driver destructor in an "allow threads"
    // macro since the MesosSchedulerDriver destructor waits for the
    // SchedulerProcess to terminate and there might be a thread that
    // is trying to acquire the GIL to call through the
    // ProxyScheduler. It will only be after this thread executes that
    // the SchedulerProcess might actually get a terminate.
    Py_BEGIN_ALLOW_THREADS
    delete self->driver;
    Py_END_ALLOW_THREADS
    self->driver = nullptr;
  }

  if (self->proxyScheduler != nullptr) {
    delete self->proxyScheduler;
    self->proxyScheduler = nullptr;
  }

  MesosSchedulerDriverImpl_clear(self);
  self->ob_type->tp_free((PyObject*) self);
}


/**
 * Traverse fields of a MesosSchedulerDriverImpl on a cyclic GC search.
 * See http://docs.python.org/extending/newtypes.html.
 */
int MesosSchedulerDriverImpl_traverse(MesosSchedulerDriverImpl* self,
                                      visitproc visit,
                                      void* arg)
{
  Py_VISIT(self->pythonScheduler);
  return 0;
}


/**
 * Clear fields of a MesosSchedulerDriverImpl that can participate in
 * GC cycles. See http://docs.python.org/extending/newtypes.html.
 */
int MesosSchedulerDriverImpl_clear(MesosSchedulerDriverImpl* self)
{
  Py_CLEAR(self->pythonScheduler);
  return 0;
}


PyObject* MesosSchedulerDriverImpl_start(MesosSchedulerDriverImpl* self)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  Status status = self->driver->start();
  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_stop(MesosSchedulerDriverImpl* self,
                                        PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  bool failover = false; // Should match default in mesos.py.

  if (!PyArg_ParseTuple(args, "|b", &failover)) {
    return nullptr;
  }

  Status status = self->driver->stop(failover);
  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_abort(MesosSchedulerDriverImpl* self)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  Status status = self->driver->abort();
  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_join(MesosSchedulerDriverImpl* self)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  Status status;
  Py_BEGIN_ALLOW_THREADS
  status = self->driver->join();
  Py_END_ALLOW_THREADS
  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_run(MesosSchedulerDriverImpl* self)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  Status status;
  Py_BEGIN_ALLOW_THREADS
  status = self->driver->run();
  Py_END_ALLOW_THREADS
  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_requestResources(
    MesosSchedulerDriverImpl* self,
    PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  PyObject* requestsObj = nullptr;
  vector<Request> requests;

  if (!PyArg_ParseTuple(args, "O", &requestsObj)) {
    return nullptr;
  }

  if (!PyList_Check(requestsObj)) {
    PyErr_Format(PyExc_Exception,
                 "Parameter 2 to requestsResources is not a list");
    return nullptr;
  }
  Py_ssize_t len = PyList_Size(requestsObj);
  for (int i = 0; i < len; i++) {
    PyObject* requestObj = PyList_GetItem(requestsObj, i);
    if (requestObj == nullptr) {
      return nullptr; // Exception will have been set by PyList_GetItem.
    }
    Request request;
    if (!readPythonProtobuf(requestObj, &request)) {
      PyErr_Format(PyExc_Exception, "Could not deserialize Python Request");
      return nullptr;
    }
    requests.push_back(request);
  }

  Status status = self->driver->requestResources(requests);
  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_launchTasks(MesosSchedulerDriverImpl* self,
                                               PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  PyObject* offerIdsObj = nullptr;
  PyObject* tasksObj = nullptr;
  PyObject* filtersObj = nullptr;
  vector<OfferID> offerIds;
  vector<TaskInfo> tasks;
  Filters filters;

  if (!PyArg_ParseTuple(args, "OO|O", &offerIdsObj, &tasksObj, &filtersObj)) {
    return nullptr;
  }

  // Offer argument can be a list of offer ids or a single offer id (for
  // backward compatibility).
  if (!PyList_Check(offerIdsObj)) {
    OfferID offerId;
    if (!readPythonProtobuf(offerIdsObj, &offerId)) {
      PyErr_Format(PyExc_Exception, "Could not deserialize Python OfferID");
      return nullptr;
    }
    offerIds.push_back(offerId);
  } else {
    Py_ssize_t len = PyList_Size(offerIdsObj);
    for (int i = 0; i < len; i++) {
      PyObject* offerObj = PyList_GetItem(offerIdsObj, i);
      if (offerObj == nullptr) {
        return nullptr;
      }
      OfferID offerId;
      if (!readPythonProtobuf(offerObj, &offerId)) {
        PyErr_Format(PyExc_Exception,
                     "Could not deserialize Python OfferID");
        return nullptr;
      }
      offerIds.push_back(offerId);
    }
  }

  if (!PyList_Check(tasksObj)) {
    PyErr_Format(PyExc_Exception, "Parameter 2 to launchTasks is not a list");
    return nullptr;
  }
  Py_ssize_t len = PyList_Size(tasksObj);
  for (int i = 0; i < len; i++) {
    PyObject* taskObj = PyList_GetItem(tasksObj, i);
    if (taskObj == nullptr) {
      return nullptr; // Exception will have been set by PyList_GetItem.
    }
    TaskInfo task;
    if (!readPythonProtobuf(taskObj, &task)) {
      PyErr_Format(PyExc_Exception,
                   "Could not deserialize Python TaskInfo");
      return nullptr;
    }
    tasks.push_back(task);
  }

  if (filtersObj != nullptr) {
    if (!readPythonProtobuf(filtersObj, &filters)) {
      PyErr_Format(PyExc_Exception,
                   "Could not deserialize Python Filters");
      return nullptr;
    }
  }

  Status status = self->driver->launchTasks(offerIds, tasks, filters);
  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_killTask(MesosSchedulerDriverImpl* self,
                                            PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  PyObject* tidObj = nullptr;
  TaskID tid;
  if (!PyArg_ParseTuple(args, "O", &tidObj)) {
    return nullptr;
  }
  if (!readPythonProtobuf(tidObj, &tid)) {
    PyErr_Format(PyExc_Exception, "Could not deserialize Python TaskID");
    return nullptr;
  }

  Status status = self->driver->killTask(tid);
  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_acceptOffers(MesosSchedulerDriverImpl* self,
                                                PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  PyObject* offerIdsObj = nullptr;
  PyObject* operationsObj = nullptr;
  PyObject* filtersObj = nullptr;
  Py_ssize_t len = 0;
  vector<OfferID> offerIds;
  vector<Offer::Operation> operations;
  Filters filters;

  if (!PyArg_ParseTuple(args,
                        "OO|O",
                        &offerIdsObj,
                        &operationsObj,
                        &filtersObj)) {
    return nullptr;
  }

  if (!PyList_Check(offerIdsObj)) {
    PyErr_Format(PyExc_Exception, "Parameter 1 to acceptOffers is not a list");
    return nullptr;
  }

  len = PyList_Size(offerIdsObj);
  for (int i = 0; i < len; i++) {
    PyObject* offerObj = PyList_GetItem(offerIdsObj, i);
    if (offerObj == nullptr) {
      return nullptr;
    }

    OfferID offerId;
    if (!readPythonProtobuf(offerObj, &offerId)) {
      PyErr_Format(PyExc_Exception,
                   "Could not deserialize Python OfferID");
      return nullptr;
    }
    offerIds.push_back(offerId);
  }

  if (!PyList_Check(operationsObj)) {
    PyErr_Format(PyExc_Exception, "Parameter 2 to acceptOffers is not a list");
    return nullptr;
  }

  len = PyList_Size(operationsObj);
  for (int i = 0; i < len; i++) {
    PyObject* operationObj = PyList_GetItem(operationsObj, i);
    if (operationObj == nullptr) {
      return nullptr; // Exception will have been set by PyList_GetItem.
    }

    Offer::Operation operation;
    if (!readPythonProtobuf(operationObj, &operation)) {
      PyErr_Format(PyExc_Exception,
                   "Could not deserialize Python Offer.Operation");
      return nullptr;
    }
    operations.push_back(operation);
  }

  if (filtersObj != nullptr) {
    if (!readPythonProtobuf(filtersObj, &filters)) {
      PyErr_Format(PyExc_Exception,
                   "Could not deserialize Python Filters");
      return nullptr;
    }
  }

  Status status = self->driver->acceptOffers(offerIds, operations, filters);
  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_declineOffer(MesosSchedulerDriverImpl* self,
                                                PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  PyObject* offerIdObj = nullptr;
  PyObject* filtersObj = nullptr;
  OfferID offerId;
  Filters filters;

  if (!PyArg_ParseTuple(args, "O|O", &offerIdObj, &filtersObj)) {
    return nullptr;
  }

  if (!readPythonProtobuf(offerIdObj, &offerId)) {
    PyErr_Format(PyExc_Exception, "Could not deserialize Python OfferID");
    return nullptr;
  }

  if (filtersObj != nullptr) {
    if (!readPythonProtobuf(filtersObj, &filters)) {
      PyErr_Format(PyExc_Exception,
                   "Could not deserialize Python Filters");
      return nullptr;
    }
  }

  Status status = self->driver->declineOffer(offerId, filters);
  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_reviveOffers(
    MesosSchedulerDriverImpl* self,
    PyObject* args)
{
  PyObject* rolesObj = nullptr;
  if (!PyArg_ParseTuple(args, "|O", &rolesObj)) {
    return nullptr;
  }

  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  Status status;

  if (rolesObj == nullptr || rolesObj == Py_None) {
    status = self->driver->reviveOffers();
  } else {
    unique_ptr<vector<string>> roles = constructFromIterable<string>(rolesObj);
    if (!roles) {
      return nullptr;
    }

    status = self->driver->reviveOffers(*roles);
  }

  return PyInt_FromLong(status);
}


PyObject* MesosSchedulerDriverImpl_suppressOffers(
    MesosSchedulerDriverImpl* self,
    PyObject* args)
{
  PyObject* rolesObj = nullptr;
  if (!PyArg_ParseTuple(args, "|O", &rolesObj)) {
    return nullptr;
  }

  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  Status status;

  if (rolesObj == nullptr || rolesObj == Py_None) {
    status = self->driver->suppressOffers();
  } else {
    unique_ptr<vector<string>> roles = constructFromIterable<string>(rolesObj);
    if (!roles) {
      return nullptr;
    }

    status = self->driver->suppressOffers(*roles);
  }

  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate(
    MesosSchedulerDriverImpl* self,
    PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  PyObject* taskStatusObj = nullptr;
  TaskStatus taskStatus;

  if (!PyArg_ParseTuple(args, "O", &taskStatusObj)) {
    return nullptr;
  }

  if (!readPythonProtobuf(taskStatusObj, &taskStatus)) {
    PyErr_Format(PyExc_Exception, "Could not deserialize Python TaskStatus");
    return nullptr;
  }

  Status status = self->driver->acknowledgeStatusUpdate(taskStatus);

  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage(
    MesosSchedulerDriverImpl* self,
    PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  PyObject* slaveIdObj = nullptr;
  PyObject* executorIdObj = nullptr;
  SlaveID slaveId;
  ExecutorID executorId;
  const char* data;
  int length;

  if (!PyArg_ParseTuple(
      args, "OOs#", &executorIdObj, &slaveIdObj, &data, &length)) {
    return nullptr;
  }

  if (!readPythonProtobuf(executorIdObj, &executorId)) {
    PyErr_Format(PyExc_Exception, "Could not deserialize Python ExecutorID");
    return nullptr;
  }

  if (!readPythonProtobuf(slaveIdObj, &slaveId)) {
    PyErr_Format(PyExc_Exception, "Could not deserialize Python SlaveID");
    return nullptr;
  }

  Status status = self->driver->sendFrameworkMessage(
      executorId, slaveId, string(data, length));

  return PyInt_FromLong(status); // Sets exception if creating long fails.
}


PyObject* MesosSchedulerDriverImpl_reconcileTasks(
    MesosSchedulerDriverImpl* self,
    PyObject* args)
{
  if (self->driver == nullptr) {
    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
    return nullptr;
  }

  PyObject* statusesObj = nullptr;
  vector<TaskStatus> statuses;

  if (!PyArg_ParseTuple(args, "O", &statusesObj)) {
    return nullptr;
  }

  if (!PyList_Check(statusesObj)) {
    PyErr_Format(PyExc_Exception,
                 "Parameter 1 to reconcileTasks is not a list");

    return nullptr;
  }

  Py_ssize_t len = PyList_Size(statusesObj);
  for (int i = 0; i < len; i++) {
    PyObject* statusObj = PyList_GetItem(statusesObj, i);
    if (statusObj == nullptr) {
      return nullptr;
    }

    TaskStatus status;
    if (!readPythonProtobuf(statusObj, &status)) {
      PyErr_Format(PyExc_Exception,
                   "Could not deserialize Python TaskStatus");
      return nullptr;
    }
    statuses.push_back(status);
  }

  Status status = self->driver->reconcileTasks(statuses);
  return PyInt_FromLong(status);
}


PyObject* MesosSchedulerDriverImpl_updateFramework(
    MesosSchedulerDriverImpl* self,
    PyObject* args)
{
  PyObject* frameworkObj = nullptr;
  PyObject* suppressedRolesObj = nullptr;

  if (!PyArg_ParseTuple(args, "OO", &frameworkObj, &suppressedRolesObj)) {
    return nullptr;
  }

  FrameworkInfo framework;
  if (!readPythonProtobuf(frameworkObj, &framework)) {
    PyErr_Format(PyExc_Exception,
                 "Could not deserialize Python FrameworkInfo");
    return nullptr;
  }

  unique_ptr<vector<string>> suppressedRoles;
  suppressedRoles = constructFromIterable<string>(suppressedRolesObj);
  if (!suppressedRoles) {
    // Exception has been set by constructFromIterable
    return nullptr;
  }

  Status status = self->driver->updateFramework(
      framework, *suppressedRoles, ::mesos::scheduler::OfferConstraints{});

  return PyInt_FromLong(status);
}


} // namespace python {
} // namespace mesos {
