blob: dbbcaaf10d60eb7d2aad30091e4b00ddc733b19c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <stdio.h>
#include <qpid/dispatch/parse.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/router.h>
#include <qpid/dispatch/router_core.h>
#include <qpid/dispatch/compose.h>
#include <qpid/dispatch/dispatch.h>
#include "router_core_private.h"
#include "dispatch_private.h"
#include "agent_link.h"
#include "alloc.h"
const char *ENTITY = "entityType";
const char *TYPE = "type";
const char *COUNT = "count";
const char *OFFSET = "offset";
const char *NAME = "name";
const char *IDENTITY = "identity";
const char *OPERATION = "operation";
const char *ATTRIBUTE_NAMES = "attributeNames";
const unsigned char *config_address_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.config.address";
const unsigned char *link_route_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.config.linkRoute";
const unsigned char *auto_link_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.config.autoLink";
const unsigned char *address_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.address";
const unsigned char *link_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.link";
const char * const status_description = "statusDescription";
const char * const correlation_id = "correlation-id";
const char * const results = "results";
const char * const status_code = "statusCode";
const char * MANAGEMENT_INTERNAL = "_local/$_management_internal";
//TODO - Move these to amqp.h
const unsigned char *MANAGEMENT_QUERY = (unsigned char*) "QUERY";
const unsigned char *MANAGEMENT_CREATE = (unsigned char*) "CREATE";
const unsigned char *MANAGEMENT_READ = (unsigned char*) "READ";
const unsigned char *MANAGEMENT_UPDATE = (unsigned char*) "UPDATE";
const unsigned char *MANAGEMENT_DELETE = (unsigned char*) "DELETE";
typedef enum {
QD_ROUTER_OPERATION_QUERY,
QD_ROUTER_OPERATION_CREATE,
QD_ROUTER_OPERATION_READ,
QD_ROUTER_OPERATION_UPDATE,
QD_ROUTER_OPERATION_DELETE
} qd_router_operation_type_t;
typedef struct qd_management_context_t {
qd_message_t *msg;
qd_message_t *source;
qd_composed_field_t *field;
qdr_query_t *query;
qdr_core_t *core;
int count;
int current_count;
qd_router_operation_type_t operation_type;
} qd_management_context_t ;
ALLOC_DECLARE(qd_management_context_t);
ALLOC_DEFINE(qd_management_context_t);
/**
* Convenience function to create and initialize context (qd_management_context_t)
*/
static qd_management_context_t* qd_management_context(qd_message_t *msg,
qd_message_t *source,
qd_composed_field_t *field,
qdr_query_t *query,
qdr_core_t *core,
qd_router_operation_type_t operation_type,
int count)
{
qd_management_context_t *ctx = new_qd_management_context_t();
ctx->count = count;
ctx->field = field;
ctx->msg = msg;
ctx->source = qd_message_copy(source);
ctx->query = query;
ctx->current_count = 0;
ctx->core = core;
ctx->operation_type = operation_type;
return ctx;
}
/**
* Sets the error status on a new composed field.
*/
static void qd_set_response_status(const qd_amqp_error_t *error, qd_composed_field_t **field)
{
//
// Insert appropriate success or error
//
*field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, *field);
qd_compose_start_map(*field);
qd_compose_insert_string(*field, status_description);
qd_compose_insert_string(*field, error->description);
qd_compose_insert_string(*field, status_code);
qd_compose_insert_uint(*field, error->status);
qd_compose_end_map(*field);
}
static void qd_set_properties(qd_message_t *msg,
qd_field_iterator_t **reply_to,
qd_composed_field_t **fld)
{
qd_field_iterator_t *correlation_id = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID);
// Grab the reply_to field from the incoming message. This is the address we will send the response to.
*reply_to = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
*fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
qd_compose_start_list(*fld);
qd_compose_insert_null(*fld); // message-id
qd_compose_insert_null(*fld); // user-id
qd_compose_insert_string_iterator(*fld, *reply_to); // to
qd_compose_insert_null(*fld); // subject
qd_compose_insert_null(*fld);
qd_compose_insert_typed_iterator(*fld, correlation_id);
qd_compose_end_list(*fld);
qd_field_iterator_free(correlation_id);
}
static void qd_manage_response_handler(void *context, const qd_amqp_error_t *status, bool more)
{
qd_management_context_t *ctx = (qd_management_context_t*) context;
if (ctx->operation_type == QD_ROUTER_OPERATION_QUERY) {
if (status->status / 100 == 2) { // There is no error, proceed to conditionally call get_next
if (more) {
ctx->current_count++; // Increment how many you have at hand
if (ctx->count != ctx->current_count) {
qdr_query_get_next(ctx->query);
return;
} else
//
// This is the one case where the core agent won't free the query itself.
//
qdr_query_free(ctx->query);
}
}
qd_compose_end_list(ctx->field);
qd_compose_end_map(ctx->field);
}
else if (ctx->operation_type == QD_ROUTER_OPERATION_DELETE) {
// The body of the delete response message MUST consist of an amqp-value section containing a Map with zero entries.
qd_compose_start_map(ctx->field);
qd_compose_end_map(ctx->field);
}
qd_field_iterator_t *reply_to = 0;
qd_composed_field_t *fld = 0;
// Start composing the message.
// First set the properties on the message like reply_to, correlation-id etc.
qd_set_properties(ctx->source, &reply_to, &fld);
// Second, set the status on the message, QD_AMQP_OK or QD_AMQP_BAD_REQUEST and so on.
qd_set_response_status(status, &fld);
// Finally, compose and send the message.
qd_message_compose_3(ctx->msg, fld, ctx->field);
qdr_send_to1(ctx->core, ctx->msg, reply_to, true, false);
// We have come to the very end. Free the appropriate memory.
// Just go over this with Ted to see if I freed everything.
qd_field_iterator_free(reply_to);
qd_compose_free(fld);
if (ctx->msg)
qd_message_free(ctx->msg);
if (ctx->source)
qd_message_free(ctx->source);
qd_compose_free(ctx->field);
free_qd_management_context_t(ctx);
}
static void qd_core_agent_query_handler(qdr_core_t *core,
qd_router_entity_type_t entity_type,
qd_router_operation_type_t operation_type,
qd_message_t *msg,
int *count,
int *offset)
{
//
// Add the Body.
//
qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
// Start a map in the body. Look for the end map in the callback function, qd_manage_response_handler.
qd_compose_start_map(field);
//add a "attributeNames" key
qd_compose_insert_string(field, ATTRIBUTE_NAMES);
// Call local function that creates and returns a local qd_management_context_t object containing the values passed in.
qd_management_context_t *ctx = qd_management_context(qd_message(), msg, field, 0, core, operation_type, (*count));
// Grab the attribute names from the incoming message body. The attribute names will be used later on in the response.
qd_parsed_field_t *attribute_names_parsed_field = 0;
qd_field_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
qd_parsed_field_t *body = qd_parse(body_iter);
if (body != 0 && qd_parse_is_map(body)) {
attribute_names_parsed_field = qd_parse_value_by_key(body, ATTRIBUTE_NAMES);
}
// Set the callback function.
qdr_manage_handler(core, qd_manage_response_handler);
ctx->query = qdr_manage_query(core, ctx, entity_type, attribute_names_parsed_field, field);
//Add the attribute names
qdr_query_add_attribute_names(ctx->query); //this adds adds a list of attribute names like ["attribute1", "attribute2", "attribute3", "attribute4",]
qd_compose_insert_string(field, results); //add a "results" key
qd_compose_start_list(field); //start the list for results
qdr_query_get_first(ctx->query, (*offset));
qd_field_iterator_free(body_iter);
qd_parse_free(body);
}
static void qd_core_agent_read_handler(qdr_core_t *core,
qd_message_t *msg,
qd_router_entity_type_t entity_type,
qd_router_operation_type_t operation_type,
qd_field_iterator_t *identity_iter,
qd_field_iterator_t *name_iter)
{
//
// Add the Body
//
qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
// Set the callback function.
qdr_manage_handler(core, qd_manage_response_handler);
// Call local function that creates and returns a qd_management_context_t containing the values passed in.
qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0);
//Call the read API function
qdr_manage_read(core, ctx, entity_type, name_iter, identity_iter, body);
}
static void qd_core_agent_create_handler(qdr_core_t *core,
qd_message_t *msg,
qd_router_entity_type_t entity_type,
qd_router_operation_type_t operation_type,
qd_field_iterator_t *name_iter)
{
//
// Add the Body
//
qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
// Set the callback function.
qdr_manage_handler(core, qd_manage_response_handler);
// Call local function that creates and returns a qd_management_context_t containing the values passed in.
qd_management_context_t *ctx = qd_management_context(qd_message(), msg, out_body, 0, core, operation_type, 0);
qd_field_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
qd_parsed_field_t *in_body = qd_parse(body_iter);
qdr_manage_create(core, ctx, entity_type, name_iter, in_body, out_body);
qd_field_iterator_free(body_iter);
}
static void qd_core_agent_update_handler(qdr_core_t *core,
qd_message_t *msg,
qd_router_entity_type_t entity_type,
qd_router_operation_type_t operation_type,
qd_field_iterator_t *identity_iter,
qd_field_iterator_t *name_iter)
{
qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
// Set the callback function.
qdr_manage_handler(core, qd_manage_response_handler);
qd_management_context_t *ctx = qd_management_context(qd_message(), msg, out_body, 0, core, operation_type, 0);
qd_field_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
qd_parsed_field_t *in_body= qd_parse(iter);
qd_field_iterator_free(iter);
qdr_manage_update(core, ctx, entity_type, name_iter, identity_iter, in_body, out_body);
}
static void qd_core_agent_delete_handler(qdr_core_t *core,
qd_message_t *msg,
qd_router_entity_type_t entity_type,
qd_router_operation_type_t operation_type,
qd_field_iterator_t *identity_iter,
qd_field_iterator_t *name_iter)
{
//
// Add the Body
//
qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
// Set the callback function.
qdr_manage_handler(core, qd_manage_response_handler);
// Call local function that creates and returns a qd_management_context_t containing the values passed in.
qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0);
qdr_manage_delete(core, ctx, entity_type, name_iter, identity_iter);
}
/**
* Checks the content of the message to see if this can be handled by the C-management agent. If this agent cannot handle it, it will be
* forwarded to the Python agent.
*/
static bool qd_can_handle_request(qd_parsed_field_t *properties_fld,
qd_router_entity_type_t *entity_type,
qd_router_operation_type_t *operation_type,
qd_field_iterator_t **identity_iter,
qd_field_iterator_t **name_iter,
int *count,
int *offset)
{
// The must be a property field and that property field should be a AMQP map. This is true for QUERY but I need
// to check if it true for CREATE, UPDATE and DELETE
if (properties_fld == 0 || !qd_parse_is_map(properties_fld))
return false;
//
// Only certain entity types can be handled by this agent.
// 'entityType': 'org.apache.qpid.dispatch.router.address
// 'entityType': 'org.apache.qpid.dispatch.router.link'
// TODO - Add more entity types here. The above is not a complete list.
qd_parsed_field_t *parsed_field = qd_parse_value_by_key(properties_fld, IDENTITY);
if (parsed_field!=0) {
*identity_iter = qd_parse_raw(parsed_field);
}
parsed_field = qd_parse_value_by_key(properties_fld, NAME);
if (parsed_field!=0) {
*name_iter = qd_parse_raw(parsed_field);
}
parsed_field = qd_parse_value_by_key(properties_fld, ENTITY);
if (parsed_field == 0) { // Sometimes there is no 'entityType' but 'type' might be available.
parsed_field = qd_parse_value_by_key(properties_fld, TYPE);
if (parsed_field == 0)
return false;
}
if (qd_field_iterator_equal(qd_parse_raw(parsed_field), address_entity_type))
*entity_type = QD_ROUTER_ADDRESS;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), link_entity_type))
*entity_type = QD_ROUTER_LINK;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), config_address_entity_type))
*entity_type = QD_ROUTER_CONFIG_ADDRESS;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), link_route_entity_type))
*entity_type = QD_ROUTER_CONFIG_LINK_ROUTE;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), auto_link_entity_type))
*entity_type = QD_ROUTER_CONFIG_AUTO_LINK;
else
return false;
parsed_field = qd_parse_value_by_key(properties_fld, OPERATION);
if (parsed_field == 0)
return false;
if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_QUERY))
(*operation_type) = QD_ROUTER_OPERATION_QUERY;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_CREATE))
(*operation_type) = QD_ROUTER_OPERATION_CREATE;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_READ))
(*operation_type) = QD_ROUTER_OPERATION_READ;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_UPDATE))
(*operation_type) = QD_ROUTER_OPERATION_UPDATE;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_DELETE))
(*operation_type) = QD_ROUTER_OPERATION_DELETE;
else
// This is an unknown operation type. cannot be handled, return false.
return false;
// Obtain the count and offset.
parsed_field = qd_parse_value_by_key(properties_fld, COUNT);
if (parsed_field)
(*count) = qd_parse_as_int(parsed_field);
else
(*count) = -1;
parsed_field = qd_parse_value_by_key(properties_fld, OFFSET);
if (parsed_field)
(*offset) = qd_parse_as_int(parsed_field);
else
(*offset) = 0;
return true;
}
/**
*
* Handler for the management agent.
*
*/
void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id)
{
qdr_core_t *core = (qdr_core_t*) context;
qd_field_iterator_t *app_properties_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
qd_router_entity_type_t entity_type = 0;
qd_router_operation_type_t operation_type = 0;
qd_field_iterator_t *identity_iter = 0;
qd_field_iterator_t *name_iter = 0;
int32_t count = 0;
int32_t offset = 0;
qd_parsed_field_t *properties_fld = qd_parse(app_properties_iter);
if (qd_can_handle_request(properties_fld, &entity_type, &operation_type, &identity_iter, &name_iter, &count, &offset)) {
switch (operation_type) {
case QD_ROUTER_OPERATION_QUERY:
qd_core_agent_query_handler(core, entity_type, operation_type, msg, &count, &offset);
break;
case QD_ROUTER_OPERATION_CREATE:
qd_core_agent_create_handler(core, msg, entity_type, operation_type, name_iter);
break;
case QD_ROUTER_OPERATION_READ:
qd_core_agent_read_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
break;
case QD_ROUTER_OPERATION_UPDATE:
qd_core_agent_update_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
break;
case QD_ROUTER_OPERATION_DELETE:
qd_core_agent_delete_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
break;
}
} else {
//
// The C management agent is not going to handle this request. Forward it off to Python.
//
qdr_send_to2(core, msg, MANAGEMENT_INTERNAL, false, false);
}
qd_field_iterator_free(app_properties_iter);
qd_parse_free(properties_fld);
}