blob: 7dd68bd9626888186032514f802b47ab3f065eb9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <Python.h>
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
#include <stdlib.h>
#include <qpid/dispatch/python_embedded.h>
//#include <qpid/dispatch/compose.h>
#include "agent.h"
#include "agent_private.h"
#include "schema_enum.h"
#include "compose_private.h"
#define MANAGEMENT_INTERNAL_MODULE "qpid_dispatch_internal.management.agent"
#define MANAGEMENT_MODULE "qpid_dispatch.management"
/**
 * Python-callable method of AgentAdapter: package a management request and
 * append it to the owning agent's work queue.
 *
 * Keyword arguments (all required by the "OOOOOiiii" format):
 *   cid, reply_to, name, identity, body - Python objects, serialized in this
 *                                         order into one composed field
 *   operation   - integer operation code (CREATE, READ, UPDATE, DELETE or QUERY)
 *   entity_type - integer entity type (listener, connector, address, ...)
 *   count       - used for queries only
 *   offset      - used for queries only
 *
 * @param self   the AgentAdapter instance; its `agent` member owns the work queue
 * @param args   positional arguments from Python
 * @param keywds keyword arguments from Python
 * @return a new reference to None on success, or 0 (with the Python exception
 *         set by PyArg_ParseTupleAndKeywords) if the arguments cannot be parsed
 */
static PyObject *qd_post_management_request(PyObject *self,
                                            PyObject *args,
                                            PyObject *keywds)
{
    int operation;    // Is this a CREATE, READ, UPDATE, DELETE or QUERY
    int entity_type;  // Is this a listener or connector or address.... etc.
    int count  = 0;   // used for queries only
    int offset = 0;   // used for queries only

    PyObject *cid      = 0;
    PyObject *reply_to = 0;
    PyObject *name     = 0;
    PyObject *identity = 0;
    PyObject *body     = 0;

    static char *kwlist[] = {"cid", "reply_to", "name", "identity", "body", "operation", "entity_type", "count", "offset", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, keywds, "OOOOOiiii", kwlist, &cid, &reply_to, &name, &identity, &body, &operation, &entity_type, &count, &offset))
        return 0;

    //
    // Serialize the five Python objects, in order, into a single composed field
    // NOTE(review): the composed field itself is not released here - confirm
    // whether the buffer list returned by qd_compose_buffers() outlives it.
    //
    qd_composed_field_t *field = qd_compose_subfield(0);
    qd_py_to_composed(cid, field);
    qd_py_to_composed(reply_to, field);
    qd_py_to_composed(name, field);
    qd_py_to_composed(identity, field);
    qd_py_to_composed(body, field);
    qd_buffer_list_t *buffers = qd_compose_buffers(field);

    //
    // Create a request and add it to the work_queue
    //
    qd_agent_request_t *request = NEW(qd_agent_request_t);
    request->buffer_list = buffers;
    request->count       = count;
    request->offset      = offset;   // previously dropped; read back by get_request_offset()
    request->entity_type = entity_type;
    request->operation   = operation;
    request->ctx         = 0;        // no handler context is attached yet; avoid an indeterminate pointer

    AgentAdapter *adapter = ((AgentAdapter*) self);
    //request->ctx = adapter->agent->handlers[entity_type]->ctx;

    qd_management_work_item_t *work_item = NEW(qd_management_work_item_t);
    work_item->request = request;

    //
    // Add work item to the work item list after locking the work item list
    //
    sys_mutex_lock(adapter->agent->lock);
    DEQ_INSERT_TAIL(adapter->agent->work_queue, work_item);
    sys_mutex_unlock(adapter->agent->lock);

    //
    // TODO - Kick off processing of the work queue
    //
    //qd_timer_schedule(adapter->agent->timer, 0);

    // The caller receives an owned reference, so None's refcount must be bumped
    Py_RETURN_NONE;
}
/**
 * Method table for the AgentAdapter Python type.
 *
 * "post_management_request" is the name the Python side calls;
 * qd_post_management_request is the C function that implements it. The entry
 * accepts both positional and keyword arguments.
 *
 * The final all-NULL entry is the sentinel that marks the end of the table.
 */
static PyMethodDef AgentAdapter_functions[] = {
    {
        "post_management_request",
        (PyCFunction) qd_post_management_request,
        METH_VARARGS | METH_KEYWORDS,
        "Posts a management request to a work queue"
    },
    {NULL, NULL, 0, NULL}  // sentinel: end of method table
};
/**
 * Python type object for AgentAdapter.
 *
 * The slots are initialized positionally, so their order must not change.
 * Only the type name, the instance size, the default flags, the doc string and
 * the method table (AgentAdapter_functions) are populated here; every other
 * slot is zero. tp_new is assigned at runtime in qd_agent() before
 * PyType_Ready() is called on this object.
 */
static PyTypeObject AgentAdapterType = {
PyObject_HEAD_INIT(0)
0, /* ob_size*/
MANAGEMENT_INTERNAL_MODULE ".AgentAdapter", /* tp_name*/
sizeof(AgentAdapter), /* tp_basicsize*/
0, /* tp_itemsize*/
0, /* tp_dealloc*/
0, /* tp_print*/
0, /* tp_getattr*/
0, /* tp_setattr*/
0, /* tp_compare*/
0, /* tp_repr*/
0, /* tp_as_number*/
0, /* tp_as_sequence*/
0, /* tp_as_mapping*/
0, /* tp_hash */
0, /* tp_call*/
0, /* tp_str*/
0, /* tp_getattro*/
0, /* tp_setattro*/
0, /* tp_as_buffer*/
Py_TPFLAGS_DEFAULT, /* tp_flags*/
"Agent request Adapter", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
AgentAdapter_functions, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
0, /* tp_init */
0, /* tp_alloc */
0, /* tp_new */
0, /* tp_free */
0, /* tp_is_gc */
0, /* tp_bases */
0, /* tp_mro */
0, /* tp_cache */
0, /* tp_subclasses */
0, /* tp_weaklist */
0, /* tp_del */
0 /* tp_version_tag */
};
/*static void process_work_queue(void *context)
{
qd_agent_t *agent = (qd_agent_t *)context;
qd_management_work_item_t *work_item = DEQ_HEAD(agent->work_queue);
//TODO - The following works well with core but no corresponding functions for non-core
while(work_item) {
qd_agent_request_t *request = work_item->request;
qd_entity_type_handler_t *handler = agent->handlers[request->entity_type];
switch (request->operation) {
case QD_SCHEMA_ENTITY_OPERATION_READ:
handler->read_handler(request->ctx,request);
break;
case QD_SCHEMA_ENTITY_OPERATION_DELETE:
handler->delete_handler(request->ctx, request);
break;
case QD_SCHEMA_ENTITY_OPERATION_CREATE:
handler->create_handler(request->ctx, request);
break;
case QD_SCHEMA_ENTITY_OPERATION_UPDATE:
handler->update_handler(request->ctx, request);
break;
case QD_SCHEMA_ENTITY_OPERATION_QUERY:
handler->query_handler(request->ctx, request);
break;
case QD_SCHEMA_ENTITY_OPERATION_ENUM_COUNT:
break;
}
work_item = DEQ_NEXT(work_item);
}
}*/
/**
 * Create the management agent together with its Python-side AgentAdapter.
 *
 * Registers AgentAdapterType in the internal management module under the name
 * "AgentAdapter", instantiates it, and links the new instance and the new
 * qd_agent_t to each other. Any failure while bootstrapping the Python side is
 * reported through qd_error_py() and the process is aborted, matching the
 * existing handling of a failed module import.
 *
 * @param qd          dispatch instance stored in the agent
 * @param address     management address; the pointer is stored, not copied
 * @param config_path configuration file path; the pointer is stored, not copied
 * @return the newly allocated agent (never NULL; failures abort)
 */
qd_agent_t* qd_agent(qd_dispatch_t *qd, char *address, const char *config_path)
{
    //
    // Finish initializing the static type so it can be instantiated
    //
    AgentAdapterType.tp_new = PyType_GenericNew;
    if (PyType_Ready(&AgentAdapterType) < 0) {
        qd_error_py();
        abort();
    }

    // Load the qpid_dispatch_internal.management Python module
    PyObject *module = PyImport_ImportModule(MANAGEMENT_INTERNAL_MODULE);
    if (!module) {
        qd_error_py();
        //qd_log(log_source, QD_LOG_CRITICAL, "Cannot load dispatch extension module '%s'", MANAGEMENT_INTERNAL_MODULE);
        abort();
    }

    //
    // Use the "AgentAdapter" name to add the AgentAdapterType to the module.
    // PyModule_AddObject steals a reference on success only, so take one for it
    // and give it back ourselves only if the call fails.
    //
    PyTypeObject *agentAdapterType = &AgentAdapterType;
    Py_INCREF(agentAdapterType);
    if (PyModule_AddObject(module, "AgentAdapter", (PyObject*) agentAdapterType) < 0) {
        Py_DECREF(agentAdapterType);
        qd_error_py();
        abort();
    }

    //
    // Create a new instance of AgentAdapterType
    //
    PyObject *adapterType = PyObject_GetAttrString(module, "AgentAdapter");
    if (!adapterType) {
        qd_error_py();
        abort();
    }

    PyObject *adapterInstance = PyObject_CallObject(adapterType, 0);
    if (!adapterInstance) {
        qd_error_py();
        abort();
    }

    // GetAttrString returned a new reference that is no longer needed
    Py_DECREF(adapterType);
    Py_DECREF(module);

    //
    // Instantiate the new agent and return it. The agent keeps the reference
    // to adapterInstance obtained above.
    //
    qd_agent_t *agent = NEW(qd_agent_t);
    agent->adapter     = ((AgentAdapter*) adapterInstance);
    agent->qd          = qd;
    agent->address     = address;
    agent->config_file = config_path;
    agent->log_source  = qd_log_source("AGENT");
    //agent->timer = qd_timer(qd, process_work_queue, agent);
    DEQ_INIT(agent->work_queue);
    agent->lock = sys_mutex();

    // Back-pointer used by qd_post_management_request to reach the work queue
    agent->adapter->agent = agent;

    //
    // Initialize the handlers to zeros
    //
    for (int i = 0; i < QD_SCHEMA_ENTITY_TYPE_ENUM_COUNT; i++)
        agent->handlers[i] = 0;

    //TODO - This is a test
    qd_agent_start(agent);

    return agent;
}
/**
 * Instantiate the Python ManagementAgent class for this agent.
 *
 * Looks up "ManagementAgent" in the internal management module and calls its
 * constructor with (address, adapter instance, config file path).
 *
 * Every temporary Python reference taken here is released on all paths. Any
 * pending Python exception is handed to qd_error_py(), as done elsewhere in
 * this file, and the function returns qd_error_code() (presumably reflecting
 * what qd_error_py() recorded - confirm against the error module).
 *
 * NOTE(review): as in the original, the created ManagementAgent instance is
 * neither stored nor released here; confirm who is meant to own it.
 *
 * @param agent agent whose address, adapter and config_file are passed to Python
 * @return the current qd_error_code() after the attempt
 */
qd_error_t qd_agent_start(qd_agent_t *agent)
{
    char     *class                = "ManagementAgent";
    PyObject *module               = 0;
    PyObject *pClass               = 0;
    PyObject *pArgs                = 0;
    PyObject *address              = 0;
    PyObject *config_file          = 0;
    PyObject *pyManagementInstance = 0;

    // Load the qpid_dispatch_internal.management Python module
    module = PyImport_ImportModule(MANAGEMENT_INTERNAL_MODULE);
    if (!module)
        goto cleanup;

    //
    // Look up the ManagementAgent class found in qpid_dispatch_internal/management/agent.py
    //
    pClass = PyObject_GetAttrString(module, class);
    if (!pClass)
        goto cleanup;

    //
    // Constructor Arguments for ManagementAgent
    //
    pArgs = PyTuple_New(3);
    if (!pArgs)
        goto cleanup;

    // arg 0: management address $management
    address = PyString_FromString(agent->address);
    if (!address)
        goto cleanup;
    PyTuple_SetItem(pArgs, 0, address);          // tuple now owns 'address'

    // arg 1: adapter instance
    // PyTuple_SetItem steals a reference; the agent must keep its own, so take
    // an extra one for the tuple. Without it, releasing pArgs would drop the
    // agent's only reference to the adapter.
    Py_INCREF((PyObject*) agent->adapter);
    PyTuple_SetItem(pArgs, 1, (PyObject*) agent->adapter);

    // arg 2: config file location
    config_file = PyString_FromString((char *) agent->config_file);
    if (!config_file)
        goto cleanup;
    PyTuple_SetItem(pArgs, 2, config_file);      // tuple now owns 'config_file'

    //
    // Instantiate the ManagementAgent class
    //
    pyManagementInstance = PyInstance_New(pClass, pArgs, 0);
    if (!pyManagementInstance) {
        qd_log(agent->log_source, QD_LOG_CRITICAL, "Cannot create instance of Python class '%s.%s'", MANAGEMENT_INTERNAL_MODULE, class);
    }

cleanup:
    // Report any pending Python exception before dropping references
    qd_error_py();

    Py_XDECREF(pArgs);
    Py_XDECREF(pClass);
    Py_XDECREF(module);

    return qd_error_code();
}
/**
 * Register the CRUDQ handlers for one entity type.
 *
 * Allocates a handler record, fills it with the supplied context and callbacks,
 * and stores it in the agent's handler array at the slot indexed by entity_type.
 *
 * @param agent          agent whose handler array is updated
 * @param ctx            opaque context stored alongside the callbacks
 * @param entity_type    entity type; also the index into agent->handlers
 * @param create_handler callback stored for create
 * @param read_handler   callback stored for read
 * @param update_handler callback stored for update
 * @param delete_handler callback stored for delete
 * @param query_handler  callback stored for query
 */
void qd_agent_register_handlers(qd_agent_t *agent,
                                void *ctx,
                                qd_schema_entity_type_t entity_type,
                                qd_agent_handler_t create_handler,
                                qd_agent_handler_t read_handler,
                                qd_agent_handler_t update_handler,
                                qd_agent_handler_t delete_handler,
                                qd_agent_handler_t query_handler)
{
    qd_entity_type_handler_t *record = NEW(qd_entity_type_handler_t);

    // Identity of the record
    record->entity_type = entity_type;
    record->ctx         = ctx;

    // Callbacks, in create/read/update/delete/query order
    record->create_handler = create_handler;
    record->read_handler   = read_handler;
    record->update_handler = update_handler;
    record->delete_handler = delete_handler;
    record->query_handler  = query_handler;

    // The handler array is indexed by the qd_schema_entity_type_t enum value
    agent->handlers[entity_type] = record;
}
/**
 * Accessor for the buffer list stored in a request.
 *
 * @param request the request to read
 * @return the request's buffer_list member
 */
qd_buffer_list_t *get_request_buffers(qd_agent_request_t *request)
{
    qd_buffer_list_t *buffers = request->buffer_list;
    return buffers;
}
/**
 * Accessor for the entity type stored in a request.
 *
 * @param request the request to read
 * @return the request's entity_type member
 */
qd_schema_entity_type_t get_request_entity_type(qd_agent_request_t *request)
{
    qd_schema_entity_type_t type = request->entity_type;
    return type;
}
/**
 * Accessor for the opaque context stored in a request.
 *
 * @param request the request to read
 * @return the request's ctx member
 */
void *get_request_context(qd_agent_request_t *request)
{
    void *context = request->ctx;
    return context;
}
/**
 * Accessor for the count stored in a request.
 *
 * @param request the request to read
 * @return the request's count member
 */
int get_request_count(qd_agent_request_t *request)
{
    int count = request->count;
    return count;
}
/**
 * Accessor for the offset stored in a request.
 *
 * @param request the request to read
 * @return the request's offset member
 */
int get_request_offset(qd_agent_request_t *request)
{
    int offset = request->offset;
    return offset;
}
/**
 * Release an agent.
 *
 * Currently a no-op: the body is empty, so nothing reachable from `agent`
 * (nor the agent structure itself) is released by this call.
 *
 * @param agent the agent to free (unused at present)
 */
void qd_agent_free(qd_agent_t *agent)
{
}