blob: 03ac8cceb2fd5be94b3bf6a2667217ceb02e6ac3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <stdio.h>
#include <qpid/dispatch/parse.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/router.h>
#include <qpid/dispatch/router_core.h>
#include <qpid/dispatch/compose.h>
#include <qpid/dispatch/dispatch.h>
#include <qpid/dispatch/connection_manager.h>
#include "dispatch_private.h"
#include "agent.h"
//
// Field-name strings for management requests. Among the handlers in this file
// only ATTRIBUTE_NAMES is referenced: it is the key looked up in the incoming
// query body and the key written into the query response body.
//
const char *ENTITY = "entityType";
const char *TYPE = "type";
const char *COUNT = "count";
const char *OFFSET = "offset";
const char *NAME = "name";
const char *IDENTITY = "identity";
const char *OPERATION = "operation";
const char *ATTRIBUTE_NAMES = "attributeNames";
//
// Entity-type name strings. They are not referenced by the handlers in this file.
//
const unsigned char *config_address_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.config.address";
const unsigned char *link_route_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.config.linkRoute";
const unsigned char *auto_link_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.config.autoLink";
const unsigned char *address_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.address";
const unsigned char *link_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.link";
const unsigned char *console_entity_type = (unsigned char*) "org.apache.qpid.dispatch.console";
//
// Keys written into response messages: status_description and status_code go
// into the application-properties map (qd_set_response_status); results is the
// key of the result list in a query response body.
// NOTE: several functions below declare a local named correlation_id, which
// shadows the string constant of the same name inside those functions.
//
const char * const status_description = "statusDescription";
const char * const correlation_id = "correlation-id";
const char * const results = "results";
const char * const status_code = "statusCode";
// Address string; not referenced by the handlers in this file.
const char * MANAGEMENT_INTERNAL = "_local/$_management_internal";
//TODO - Move these to amqp.h
// Management operation name strings.
const unsigned char *MANAGEMENT_QUERY = (unsigned char*) "QUERY";
const unsigned char *MANAGEMENT_CREATE = (unsigned char*) "CREATE";
const unsigned char *MANAGEMENT_READ = (unsigned char*) "READ";
const unsigned char *MANAGEMENT_UPDATE = (unsigned char*) "UPDATE";
const unsigned char *MANAGEMENT_DELETE = (unsigned char*) "DELETE";
/**
 * Per-request state handed to the core as the opaque context of a management
 * operation and received back in qd_manage_response_handler.
 */
typedef struct qd_management_context_t {
// Message created together with the context and freed by the response handler.
// It is not otherwise used in this file (send_response builds its own message).
qd_message_t *response;
// Composed body of the response; filled in by the handlers and by the core.
qd_composed_field_t *out_body;
// Address the response is sent to (taken from the request).
qd_field_iterator_t *reply_to;
// Correlation id copied into the response properties; freed by qd_set_properties.
qd_field_iterator_t *correlation_id;
// Core query handle; only set by the query handler, 0 for other operations.
qdr_query_t *query;
// Router core the operation was submitted to.
qdr_core_t *core;
// Number of rows requested by a query (0 for non-query operations).
int count;
// Number of query rows accounted for so far by the response handler.
int current_count;
// Which management operation this context belongs to.
qd_schema_entity_operation_t operation_type;
} qd_management_context_t ;
// Declare and define the allocator functions for the context type
// (new_qd_management_context_t / free_qd_management_context_t are used below).
ALLOC_DECLARE(qd_management_context_t);
ALLOC_DEFINE(qd_management_context_t);
/**
 * Build a fully populated qd_management_context_t.
 *
 * A new message and a new AMQP-value body composer are created for the
 * context; every other field is copied from the arguments, and the running
 * row counter starts at zero.
 *
 * @param reply_to        address the response will be sent to
 * @param correlation_id  correlation id to echo back in the response
 * @param query           core query handle (callers in this file pass 0)
 * @param core            router core the operation is submitted to
 * @param operation_type  management operation being performed
 * @param count           number of rows requested (queries only)
 * @return the newly allocated context
 */
static qd_management_context_t* qd_management_context(qd_field_iterator_t *reply_to,
                                                      qd_field_iterator_t *correlation_id,
                                                      qdr_query_t *query,
                                                      qdr_core_t *core,
                                                      qd_schema_entity_operation_t operation_type,
                                                      int count)
{
    qd_management_context_t *mgmt_ctx = new_qd_management_context_t();

    // Objects owned by the context.
    mgmt_ctx->response = qd_message();
    mgmt_ctx->out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);

    // What is being done, and against which core/query.
    mgmt_ctx->operation_type = operation_type;
    mgmt_ctx->core           = core;
    mgmt_ctx->query          = query;

    // Where the answer goes.
    mgmt_ctx->reply_to       = reply_to;
    mgmt_ctx->correlation_id = correlation_id;

    // Row accounting for queries.
    mgmt_ctx->count          = count;
    mgmt_ctx->current_count  = 0;

    return mgmt_ctx;
}
/**
 * Split a management request into its parts.
 *
 * One iterator is created over the request's buffer list and handed to
 * qd_parse five times in a row, in this order: correlation id, reply-to, name,
 * identity, body (presumably each call consumes the next encoded value, so
 * the order matters). For the first four values only the raw iterator of the
 * parsed field is returned; the body is returned as the parsed field itself.
 *
 * NOTE(review): neither the buffer iterator nor the four intermediate parsed
 * fields are released in this function - confirm who owns them.
 *
 * @param request        the request to take apart
 * @param entity_type    out: entity type reported by the request
 * @param cid            out: raw iterator of the correlation id
 * @param reply_to       out: raw iterator of the reply-to address
 * @param name_iter      out: raw iterator of the entity name
 * @param identity_iter  out: raw iterator of the entity identity
 * @param body           out: parsed request body
 */
static void qd_parse_agent_request(qd_agent_request_t *request,
                                   qd_schema_entity_type_t *entity_type,
                                   qd_field_iterator_t **cid,
                                   qd_field_iterator_t **reply_to,
                                   qd_field_iterator_t **name_iter,
                                   qd_field_iterator_t **identity_iter,
                                   qd_parsed_field_t **body)
{
    qd_buffer_list_t    *buffers  = get_request_buffers(request);
    qd_field_iterator_t *req_iter = qd_address_iterator_buffer(DEQ_HEAD(*buffers),
                                                               0,
                                                               qd_buffer_list_length(buffers),
                                                               ITER_VIEW_ALL);

    *entity_type = get_request_entity_type(request);

    qd_parsed_field_t *cid_field = qd_parse(req_iter);
    *cid = qd_parse_raw(cid_field);

    qd_parsed_field_t *reply_to_field = qd_parse(req_iter);
    *reply_to = qd_parse_raw(reply_to_field);

    qd_parsed_field_t *name_field = qd_parse(req_iter);
    *name_iter = qd_parse_raw(name_field);

    qd_parsed_field_t *identity_field = qd_parse(req_iter);
    *identity_iter = qd_parse_raw(identity_field);

    *body = qd_parse(req_iter);
}
/**
 * Compose the application-properties section carrying the operation status.
 *
 * The section is composed with the current *field passed to qd_compose, and
 * the result is stored back into *field. The section is a map with two
 * entries: the status description and the numeric status code.
 *
 * @param error  status to report (description and code are read from it)
 * @param field  in/out: composed field to continue from; updated in place
 */
static void qd_set_response_status(const qd_amqp_error_t *error, qd_composed_field_t **field)
{
    qd_composed_field_t *app_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, *field);
    *field = app_props;

    qd_compose_start_map(app_props);

    // "statusDescription" -> human readable text
    qd_compose_insert_string(app_props, status_description);
    qd_compose_insert_string(app_props, error->description);

    // "statusCode" -> numeric status
    qd_compose_insert_string(app_props, status_code);
    qd_compose_insert_uint(app_props, error->status);

    qd_compose_end_map(app_props);
}
/**
 * Compose the properties section of a response message into *fld.
 *
 * Only two slots of the properties list are populated: "to" (taken from the
 * request's reply-to) and "correlation-id". All earlier and intermediate
 * slots are written as null.
 *
 * Side effect: correlation_id is freed before returning, so the caller must
 * not use it afterwards. reply_to is left untouched.
 *
 * @param correlation_id  correlation id of the request; consumed
 * @param reply_to        address the response is going to
 * @param fld             out: receives the newly composed properties field
 */
static void qd_set_properties(qd_field_iterator_t *correlation_id,
                              qd_field_iterator_t *reply_to,
                              qd_composed_field_t **fld)
{
    qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
    *fld = props;

    qd_compose_start_list(props);
    qd_compose_insert_null(props);                              // message-id
    qd_compose_insert_null(props);                              // user-id
    qd_compose_insert_string_iterator(props, reply_to);         // to
    qd_compose_insert_null(props);                              // subject
    qd_compose_insert_null(props);                              // reply-to
    qd_compose_insert_typed_iterator(props, correlation_id);    // correlation-id
    qd_compose_end_list(props);

    qd_field_iterator_free(correlation_id);
}
/**
 * Compose a management response and send it to reply_to.
 *
 * The message consists of a properties section (to + correlation-id), an
 * application-properties section carrying the status, and out_body.
 *
 * Ownership: correlation_id is freed as part of composing the properties
 * (see qd_set_properties). out_body and reply_to are NOT freed here and stay
 * with the caller. The message created here is released before returning;
 * qdr_send_to1 works on its own copy of the message.
 *
 * @param ctx             treated as a qdr_core_t* (see the note in the body)
 * @param reply_to        address to send the response to
 * @param correlation_id  correlation id of the request; consumed
 * @param status          status placed in the application properties
 * @param out_body        composed body of the response
 */
void send_response(void *ctx,
                   qd_field_iterator_t *reply_to,
                   qd_field_iterator_t *correlation_id,
                   const qd_amqp_error_t *status,
                   qd_composed_field_t *out_body)
{
    qd_composed_field_t *fld = 0;

    // Start composing the message.
    // First set the properties on the message like reply_to, correlation-id etc.
    qd_set_properties(correlation_id, reply_to, &fld);

    // Second, set the status on the message, QD_AMQP_OK or QD_AMQP_BAD_REQUEST and so on.
    qd_set_response_status(status, &fld);

    qd_message_t *response = qd_message();
    qdr_core_t *core = (qdr_core_t*)ctx; // This is not right since this is also used by non-core stuff

    // Finally, compose and send the message.
    qd_message_compose_3(response, fld, out_body);
    qdr_send_to1(core, response, reply_to, true, false);

    // The send call copies the message, so the local one must be released
    // here; previously it was leaked on every response.
    qd_message_free(response);
    qd_compose_free(fld);
}
/**
 * Handle a CREATE request for an SSL profile.
 *
 * Parses the request, asks the connection manager to create the profile and
 * sends the resulting status and body back to the requester.
 *
 * NOTE(review): the request context is used as a qd_dispatch_t* here, yet the
 * same pointer is handed to send_response, which casts it to qdr_core_t*.
 * Confirm which type the context really is for this handler.
 */
void qd_create_ssl_profile(qd_agent_request_t *request)
{
    qd_schema_entity_type_t  entity_type;
    qd_field_iterator_t     *cid;
    qd_field_iterator_t     *reply_addr;
    qd_field_iterator_t     *name_iter;
    qd_field_iterator_t     *identity_iter;
    qd_parsed_field_t       *request_body;

    qd_parse_agent_request(request, &entity_type, &cid, &reply_addr, &name_iter, &identity_iter, &request_body);

    qd_dispatch_t       *dispatch      = (qd_dispatch_t*) get_request_context(request);
    qd_composed_field_t *response_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);

    qd_amqp_error_t result = qd_connection_manager_create_ssl_profile(dispatch, name_iter, request_body, response_body);

    send_response(get_request_context(request), reply_addr, cid, &result, response_body);
}
/**
 * Handle a QUERY request for SSL profiles.
 *
 * Parses the request, asks the connection manager to run the query with the
 * request's offset and count, and sends the resulting status and body back.
 *
 * NOTE(review): the request context is used as a qd_dispatch_t* here, yet the
 * same pointer is handed to send_response, which casts it to qdr_core_t*.
 * Confirm which type the context really is for this handler.
 */
void qd_query_ssl_profile(qd_agent_request_t *request)
{
    qd_schema_entity_type_t  entity_type;
    qd_field_iterator_t     *cid;
    qd_field_iterator_t     *reply_addr;
    qd_field_iterator_t     *name_iter;
    qd_field_iterator_t     *identity_iter;
    qd_parsed_field_t       *request_body;

    qd_parse_agent_request(request, &entity_type, &cid, &reply_addr, &name_iter, &identity_iter, &request_body);

    qd_dispatch_t       *dispatch      = (qd_dispatch_t*) get_request_context(request);
    qd_composed_field_t *response_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);

    qd_amqp_error_t result = qd_connection_manager_query_ssl_profile(dispatch,
                                                                     get_request_offset(request),
                                                                     get_request_count(request),
                                                                     request_body,
                                                                     response_body);

    send_response(get_request_context(request), reply_addr, cid, &result, response_body);
}
/**
 * Handle a DELETE request for an SSL profile.
 *
 * NOTE(review): this is a stub. The request is parsed but no delete is
 * performed and no response is sent to the requester.
 *
 * The parsed body is released before returning; nothing else holds a
 * reference to it, so it would otherwise be leaked.
 */
void qd_delete_ssl_profile(qd_agent_request_t *request)
{
    qd_schema_entity_type_t entity_type;
    qd_field_iterator_t *correlation_id;
    qd_field_iterator_t *reply_to;
    qd_field_iterator_t *name_iter;
    qd_field_iterator_t *identity_iter;
    qd_parsed_field_t *in_body;

    qd_parse_agent_request(request, &entity_type, &correlation_id, &reply_to, &name_iter, &identity_iter, &in_body);

    // The body is not handed to anyone, so free it here (same call the
    // query handler uses for its request body).
    qd_parse_free(in_body);
}
/**
 * Callback invoked when the core has a result for a management operation.
 * It is registered through qdr_manage_handler by the handlers below.
 *
 * Depending on the operation stored in the context it finishes the response
 * body, then sends the response and releases the context:
 *   - QUERY:  on a 2xx status with `more` set, one more row is counted; while
 *             the requested count has not been reached the next row is
 *             requested and the function returns without sending anything.
 *             Otherwise the result list and the enclosing map are closed.
 *   - DELETE: the body is an empty map.
 *   - READ:   on a non-2xx status the body is an empty map.
 *   - other operations: the body is sent as composed so far.
 *
 * @param context  the qd_management_context_t given to the core
 * @param status   result status of the operation
 * @param more     presumably true when the query can yield further rows
 */
static void qd_manage_response_handler(void *context, const qd_amqp_error_t *status, bool more)
{
    qd_management_context_t *ctx = (qd_management_context_t*) context;

    switch (ctx->operation_type) {
    case QD_SCHEMA_ENTITY_OPERATION_QUERY:
        if (status->status / 100 == 2 && more) {
            ctx->current_count += 1;    // one more row is now in the body

            if (ctx->current_count != ctx->count) {
                // Still short of the requested count: ask for the next row
                // and wait for the next callback.
                qdr_query_get_next(ctx->query);
                return;
            }

            //
            // This is the one case where the core agent won't free the query itself.
            //
            qdr_query_free(ctx->query);
        }

        // Close the "results" list and the body map opened by the query handler.
        qd_compose_end_list(ctx->out_body);
        qd_compose_end_map(ctx->out_body);
        break;

    case QD_SCHEMA_ENTITY_OPERATION_DELETE:
        // The body of the delete response message MUST consist of an amqp-value section containing a Map with zero entries.
        qd_compose_start_map(ctx->out_body);
        qd_compose_end_map(ctx->out_body);
        break;

    case QD_SCHEMA_ENTITY_OPERATION_READ:
        if (status->status / 100 != 2) {
            // Failed read: answer with an empty map.
            qd_compose_start_map(ctx->out_body);
            qd_compose_end_map(ctx->out_body);
        }
        break;

    default:
        // Nothing to add to the body for the remaining operations.
        break;
    }

    // Send the response to the caller, then release everything the context owns.
    send_response(ctx->core, ctx->reply_to, ctx->correlation_id, status, ctx->out_body);

    qd_message_free(ctx->response);
    qd_compose_free(ctx->out_body);
    free_qd_management_context_t(ctx);
}
/**
 * Handle a QUERY request addressed to the router core.
 *
 * Opens the response body map, lets the core write the attribute-name list,
 * opens the "results" list and asks the core for the first row starting at
 * the requested offset. The list and the map are closed later, in
 * qd_manage_response_handler.
 */
void qd_core_agent_query_handler(qd_agent_request_t *request)
{
    qd_schema_entity_type_t  entity_type;
    qd_field_iterator_t     *cid;
    qd_field_iterator_t     *reply_addr;
    qd_field_iterator_t     *name_iter;
    qd_field_iterator_t     *identity_iter;
    qd_parsed_field_t       *request_body;

    qd_parse_agent_request(request, &entity_type, &cid, &reply_addr, &name_iter, &identity_iter, &request_body);

    int         max_rows  = get_request_count(request);
    int         first_row = get_request_offset(request);
    qdr_core_t *core      = (qdr_core_t*) get_request_context(request);

    // Per-request state that travels through the core and comes back in the callback.
    qd_management_context_t *ctx = qd_management_context(reply_addr, cid, 0, core, QD_SCHEMA_ENTITY_OPERATION_QUERY, max_rows);
    qd_composed_field_t *body = ctx->out_body;

    // Body layout: { "attributeNames": [...], "results": [ ... ] }
    // The map opened here is closed in qd_manage_response_handler.
    qd_compose_start_map(body);
    qd_compose_insert_string(body, ATTRIBUTE_NAMES);

    // Attribute names requested by the client, if the request body is a map.
    qd_parsed_field_t *requested_attrs = 0;
    if (request_body != 0 && qd_parse_is_map(request_body))
        requested_attrs = qd_parse_value_by_key(request_body, ATTRIBUTE_NAMES);

    // Register the callback and create the query.
    qdr_manage_handler(core, qd_manage_response_handler);
    ctx->query = qdr_manage_query(core, ctx, entity_type, requested_attrs, body);

    // Value for the "attributeNames" key: a list such as ["attribute1", "attribute2", ...]
    qdr_query_add_attribute_names(ctx->query);

    // "results" key followed by the (still open) list of rows.
    qd_compose_insert_string(body, results);
    qd_compose_start_list(body);

    qdr_query_get_first(ctx->query, first_row);

    qd_parse_free(request_body);
}
/**
 * Handle a CREATE request addressed to the router core.
 *
 * Registers the response callback, builds the per-request context and passes
 * the entity name and the parsed request body to the core's create call. The
 * response is sent from qd_manage_response_handler.
 */
void qd_core_agent_create_handler(qd_agent_request_t *request)
{
    qd_schema_entity_type_t  entity_type;
    qd_field_iterator_t     *cid;
    qd_field_iterator_t     *reply_addr;
    qd_field_iterator_t     *name_iter;
    qd_field_iterator_t     *identity_iter;
    qd_parsed_field_t       *request_body;

    qd_parse_agent_request(request, &entity_type, &cid, &reply_addr, &name_iter, &identity_iter, &request_body);

    qdr_core_t *core = (qdr_core_t*) get_request_context(request);

    // Results come back through this callback.
    qdr_manage_handler(core, qd_manage_response_handler);

    // Per-request state handed to the core and returned in the callback.
    qd_management_context_t *ctx = qd_management_context(reply_addr, cid, 0, core, QD_SCHEMA_ENTITY_OPERATION_CREATE, 0);

    qdr_manage_create(core, ctx, entity_type, name_iter, request_body, ctx->out_body);
}
/**
 * Handle a READ request addressed to the router core.
 *
 * Registers the response callback, builds the per-request context and asks
 * the core to read the entity identified by name/identity. The response is
 * sent from qd_manage_response_handler.
 *
 * The request body is not needed for a read and is not passed to the core,
 * so it is released here instead of being leaked.
 */
void qd_core_agent_read_handler(qd_agent_request_t *request)
{
    qd_schema_entity_type_t entity_type;
    qd_field_iterator_t *correlation_id;
    qd_field_iterator_t *reply_to;
    qd_field_iterator_t *name_iter;
    qd_field_iterator_t *identity_iter;
    qd_parsed_field_t *in_body;

    qd_parse_agent_request(request, &entity_type, &correlation_id, &reply_to, &name_iter, &identity_iter, &in_body);

    qdr_core_t *core = (qdr_core_t*)get_request_context(request);

    // Set the callback function.
    qdr_manage_handler(core, qd_manage_response_handler);

    // Call local function that creates and returns a qd_management_context_t containing the values passed in.
    qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, correlation_id, 0, core, QD_SCHEMA_ENTITY_OPERATION_READ, 0);

    //Call the read API function
    qdr_manage_read(core, mgmt_ctx, entity_type, name_iter, identity_iter, mgmt_ctx->out_body);

    // Nobody else holds the parsed body; free it (same call the query handler uses).
    qd_parse_free(in_body);
}
/**
 * Handle an UPDATE request addressed to the router core.
 *
 * Registers the response callback, builds the per-request context and passes
 * the entity name/identity and the parsed request body to the core's update
 * call. The response is sent from qd_manage_response_handler.
 */
void qd_core_agent_update_handler(qd_agent_request_t *request)
{
    qd_schema_entity_type_t entity_type;
    qd_field_iterator_t *correlation_id;
    qd_field_iterator_t *reply_to;
    qd_field_iterator_t *name_iter;
    qd_field_iterator_t *identity_iter;
    qd_parsed_field_t *in_body;

    qd_parse_agent_request(request, &entity_type, &correlation_id, &reply_to, &name_iter, &identity_iter, &in_body);

    qdr_core_t *core = (qdr_core_t*)get_request_context(request);

    // Set the callback function. Every other core handler in this file does
    // this before submitting work; without it an update that arrives before
    // any other operation would have no response handler registered.
    qdr_manage_handler(core, qd_manage_response_handler);

    // Call local function that creates and returns a qd_management_context_t containing the values passed in.
    qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, correlation_id, 0, core, QD_SCHEMA_ENTITY_OPERATION_UPDATE, 0);

    qdr_manage_update(core, mgmt_ctx, entity_type, name_iter, identity_iter, in_body, mgmt_ctx->out_body);
}
/**
 * Handle a DELETE request addressed to the router core.
 *
 * Registers the response callback, builds the per-request context and asks
 * the core to delete the entity identified by name/identity. The response
 * (an empty map body) is sent from qd_manage_response_handler.
 *
 * The request body is not needed for a delete and is not passed to the core,
 * so it is released here instead of being leaked.
 */
void qd_core_agent_delete_handler(qd_agent_request_t *request)
{
    qd_schema_entity_type_t entity_type;
    qd_field_iterator_t *correlation_id;
    qd_field_iterator_t *reply_to;
    qd_field_iterator_t *name_iter;
    qd_field_iterator_t *identity_iter;
    qd_parsed_field_t *in_body;

    qd_parse_agent_request(request, &entity_type, &correlation_id, &reply_to, &name_iter, &identity_iter, &in_body);

    qdr_core_t *core = (qdr_core_t*)get_request_context(request);

    // Set the callback function.
    qdr_manage_handler(core, qd_manage_response_handler);

    // Call local function that creates and returns a qd_management_context_t containing the values passed in.
    qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, correlation_id, 0, core, QD_SCHEMA_ENTITY_OPERATION_DELETE, 0);

    qdr_manage_delete(core, mgmt_ctx, entity_type, name_iter, identity_iter);

    // Nobody else holds the parsed body; free it (same call the query handler uses).
    qd_parse_free(in_body);
}