| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <stdio.h> |
| #include <qpid/dispatch/parse.h> |
| #include <qpid/dispatch/iterator.h> |
| #include <qpid/dispatch/router.h> |
| #include <qpid/dispatch/router_core.h> |
| #include <qpid/dispatch/compose.h> |
| #include <qpid/dispatch/dispatch.h> |
| #include "router_core_private.h" |
| #include "dispatch_private.h" |
| #include "agent_link.h" |
| #include "alloc.h" |
| |
/*
 * Keys looked up in the application-properties map of a management request.
 */
const char *ENTITY   = "entityType";
const char *TYPE     = "type";
const char *COUNT    = "count";
const char *OFFSET   = "offset";
const char *NAME     = "name";
const char *IDENTITY = "identity";

const char *OPERATION = "operation";

/* Key used both when reading a query request body and when composing the query response body. */
const char *ATTRIBUTE_NAMES = "attributeNames";

/*
 * Entity type names that are compared against the request's "entityType"/"type" property.
 */
const unsigned char *config_address_entity_type = (const unsigned char *) "org.apache.qpid.dispatch.router.config.address";
const unsigned char *link_route_entity_type     = (const unsigned char *) "org.apache.qpid.dispatch.router.config.linkRoute";
const unsigned char *auto_link_entity_type      = (const unsigned char *) "org.apache.qpid.dispatch.router.config.autoLink";
const unsigned char *address_entity_type        = (const unsigned char *) "org.apache.qpid.dispatch.router.address";
const unsigned char *link_entity_type           = (const unsigned char *) "org.apache.qpid.dispatch.router.link";

/*
 * Keys written into response messages.
 */
const char * const status_description = "statusDescription";
const char * const correlation_id     = "correlation-id";
const char * const results            = "results";
const char * const status_code        = "statusCode";

/* Address that requests are forwarded to when this agent does not handle them. */
const char * MANAGEMENT_INTERNAL = "_local/$_management_internal";

//TODO - Move these to amqp.h
/* Operation names that are compared against the request's "operation" property. */
const unsigned char *MANAGEMENT_QUERY  = (const unsigned char *) "QUERY";
const unsigned char *MANAGEMENT_CREATE = (const unsigned char *) "CREATE";
const unsigned char *MANAGEMENT_READ   = (const unsigned char *) "READ";
const unsigned char *MANAGEMENT_UPDATE = (const unsigned char *) "UPDATE";
const unsigned char *MANAGEMENT_DELETE = (const unsigned char *) "DELETE";
| |
| |
/**
 * Management operations this agent distinguishes.  The value is derived from
 * the "operation" application property of an incoming request.
 */
typedef enum {
    QD_ROUTER_OPERATION_QUERY  = 0,
    QD_ROUTER_OPERATION_CREATE = 1,
    QD_ROUTER_OPERATION_READ   = 2,
    QD_ROUTER_OPERATION_UPDATE = 3,
    QD_ROUTER_OPERATION_DELETE = 4
} qd_router_operation_type_t;
| |
| |
| typedef struct qd_management_context_t { |
| qd_message_t *msg; |
| qd_message_t *source; |
| qd_composed_field_t *field; |
| qdr_query_t *query; |
| qdr_core_t *core; |
| int count; |
| int current_count; |
| qd_router_operation_type_t operation_type; |
| } qd_management_context_t ; |
| |
| ALLOC_DECLARE(qd_management_context_t); |
| ALLOC_DEFINE(qd_management_context_t); |
| |
| /** |
| * Convenience function to create and initialize context (qd_management_context_t) |
| */ |
| static qd_management_context_t* qd_management_context(qd_message_t *msg, |
| qd_message_t *source, |
| qd_composed_field_t *field, |
| qdr_query_t *query, |
| qdr_core_t *core, |
| qd_router_operation_type_t operation_type, |
| int count) |
| { |
| qd_management_context_t *ctx = new_qd_management_context_t(); |
| ctx->count = count; |
| ctx->field = field; |
| ctx->msg = msg; |
| ctx->source = qd_message_copy(source); |
| ctx->query = query; |
| ctx->current_count = 0; |
| ctx->core = core; |
| ctx->operation_type = operation_type; |
| |
| return ctx; |
| } |
| |
| |
| /** |
| * Sets the error status on a new composed field. |
| */ |
| static void qd_set_response_status(const qd_amqp_error_t *error, qd_composed_field_t **field) |
| { |
| // |
| // Insert appropriate success or error |
| // |
| *field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, *field); |
| qd_compose_start_map(*field); |
| |
| qd_compose_insert_string(*field, status_description); |
| qd_compose_insert_string(*field, error->description); |
| |
| qd_compose_insert_string(*field, status_code); |
| qd_compose_insert_uint(*field, error->status); |
| |
| qd_compose_end_map(*field); |
| } |
| |
| |
| static void qd_set_properties(qd_message_t *msg, |
| qd_field_iterator_t **reply_to, |
| qd_composed_field_t **fld) |
| { |
| qd_field_iterator_t *correlation_id = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID); |
| // Grab the reply_to field from the incoming message. This is the address we will send the response to. |
| *reply_to = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO); |
| *fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); |
| qd_compose_start_list(*fld); |
| qd_compose_insert_null(*fld); // message-id |
| qd_compose_insert_null(*fld); // user-id |
| qd_compose_insert_string_iterator(*fld, *reply_to); // to |
| qd_compose_insert_null(*fld); // subject |
| qd_compose_insert_null(*fld); |
| qd_compose_insert_typed_iterator(*fld, correlation_id); |
| qd_compose_end_list(*fld); |
| qd_field_iterator_free(correlation_id); |
| } |
| |
| |
| static void qd_manage_response_handler(void *context, const qd_amqp_error_t *status, bool more) |
| { |
| qd_management_context_t *ctx = (qd_management_context_t*) context; |
| |
| if (ctx->operation_type == QD_ROUTER_OPERATION_QUERY) { |
| if (status->status / 100 == 2) { // There is no error, proceed to conditionally call get_next |
| if (more) { |
| ctx->current_count++; // Increment how many you have at hand |
| if (ctx->count != ctx->current_count) { |
| qdr_query_get_next(ctx->query); |
| return; |
| } else |
| // |
| // This is the one case where the core agent won't free the query itself. |
| // |
| qdr_query_free(ctx->query); |
| } |
| } |
| qd_compose_end_list(ctx->field); |
| qd_compose_end_map(ctx->field); |
| } |
| else if (ctx->operation_type == QD_ROUTER_OPERATION_DELETE) { |
| // The body of the delete response message MUST consist of an amqp-value section containing a Map with zero entries. |
| qd_compose_start_map(ctx->field); |
| qd_compose_end_map(ctx->field); |
| } |
| |
| qd_field_iterator_t *reply_to = 0; |
| qd_composed_field_t *fld = 0; |
| |
| // Start composing the message. |
| // First set the properties on the message like reply_to, correlation-id etc. |
| qd_set_properties(ctx->source, &reply_to, &fld); |
| |
| // Second, set the status on the message, QD_AMQP_OK or QD_AMQP_BAD_REQUEST and so on. |
| qd_set_response_status(status, &fld); |
| |
| // Finally, compose and send the message. |
| qd_message_compose_3(ctx->msg, fld, ctx->field); |
| qdr_send_to1(ctx->core, ctx->msg, reply_to, true, false); |
| |
| // We have come to the very end. Free the appropriate memory. |
| // Just go over this with Ted to see if I freed everything. |
| |
| qd_field_iterator_free(reply_to); |
| qd_compose_free(fld); |
| |
| if (ctx->msg) |
| qd_message_free(ctx->msg); |
| if (ctx->source) |
| qd_message_free(ctx->source); |
| qd_compose_free(ctx->field); |
| |
| free_qd_management_context_t(ctx); |
| } |
| |
| |
| static void qd_core_agent_query_handler(qdr_core_t *core, |
| qd_router_entity_type_t entity_type, |
| qd_router_operation_type_t operation_type, |
| qd_message_t *msg, |
| int *count, |
| int *offset) |
| { |
| // |
| // Add the Body. |
| // |
| qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); |
| |
| // Start a map in the body. Look for the end map in the callback function, qd_manage_response_handler. |
| qd_compose_start_map(field); |
| |
| //add a "attributeNames" key |
| qd_compose_insert_string(field, ATTRIBUTE_NAMES); |
| |
| // Call local function that creates and returns a local qd_management_context_t object containing the values passed in. |
| qd_management_context_t *ctx = qd_management_context(qd_message(), msg, field, 0, core, operation_type, (*count)); |
| |
| // Grab the attribute names from the incoming message body. The attribute names will be used later on in the response. |
| qd_parsed_field_t *attribute_names_parsed_field = 0; |
| |
| qd_field_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY); |
| |
| qd_parsed_field_t *body = qd_parse(body_iter); |
| if (body != 0 && qd_parse_is_map(body)) { |
| attribute_names_parsed_field = qd_parse_value_by_key(body, ATTRIBUTE_NAMES); |
| } |
| |
| // Set the callback function. |
| qdr_manage_handler(core, qd_manage_response_handler); |
| ctx->query = qdr_manage_query(core, ctx, entity_type, attribute_names_parsed_field, field); |
| |
| //Add the attribute names |
| qdr_query_add_attribute_names(ctx->query); //this adds adds a list of attribute names like ["attribute1", "attribute2", "attribute3", "attribute4",] |
| qd_compose_insert_string(field, results); //add a "results" key |
| qd_compose_start_list(field); //start the list for results |
| |
| qdr_query_get_first(ctx->query, (*offset)); |
| |
| qd_field_iterator_free(body_iter); |
| qd_parse_free(body); |
| } |
| |
| |
| static void qd_core_agent_read_handler(qdr_core_t *core, |
| qd_message_t *msg, |
| qd_router_entity_type_t entity_type, |
| qd_router_operation_type_t operation_type, |
| qd_field_iterator_t *identity_iter, |
| qd_field_iterator_t *name_iter) |
| { |
| // |
| // Add the Body |
| // |
| qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); |
| |
| // Set the callback function. |
| qdr_manage_handler(core, qd_manage_response_handler); |
| |
| // Call local function that creates and returns a qd_management_context_t containing the values passed in. |
| qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0); |
| |
| //Call the read API function |
| qdr_manage_read(core, ctx, entity_type, name_iter, identity_iter, body); |
| } |
| |
| |
| static void qd_core_agent_create_handler(qdr_core_t *core, |
| qd_message_t *msg, |
| qd_router_entity_type_t entity_type, |
| qd_router_operation_type_t operation_type, |
| qd_field_iterator_t *name_iter) |
| { |
| // |
| // Add the Body |
| // |
| qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); |
| |
| // Set the callback function. |
| qdr_manage_handler(core, qd_manage_response_handler); |
| |
| // Call local function that creates and returns a qd_management_context_t containing the values passed in. |
| qd_management_context_t *ctx = qd_management_context(qd_message(), msg, out_body, 0, core, operation_type, 0); |
| |
| qd_field_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY); |
| |
| qd_parsed_field_t *in_body = qd_parse(body_iter); |
| |
| qdr_manage_create(core, ctx, entity_type, name_iter, in_body, out_body); |
| |
| qd_field_iterator_free(body_iter); |
| } |
| |
| |
| static void qd_core_agent_update_handler(qdr_core_t *core, |
| qd_message_t *msg, |
| qd_router_entity_type_t entity_type, |
| qd_router_operation_type_t operation_type, |
| qd_field_iterator_t *identity_iter, |
| qd_field_iterator_t *name_iter) |
| { |
| qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); |
| |
| // Set the callback function. |
| qdr_manage_handler(core, qd_manage_response_handler); |
| |
| qd_management_context_t *ctx = qd_management_context(qd_message(), msg, out_body, 0, core, operation_type, 0); |
| |
| qd_field_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_BODY); |
| qd_parsed_field_t *in_body= qd_parse(iter); |
| qd_field_iterator_free(iter); |
| |
| qdr_manage_update(core, ctx, entity_type, name_iter, identity_iter, in_body, out_body); |
| |
| } |
| |
| |
| static void qd_core_agent_delete_handler(qdr_core_t *core, |
| qd_message_t *msg, |
| qd_router_entity_type_t entity_type, |
| qd_router_operation_type_t operation_type, |
| qd_field_iterator_t *identity_iter, |
| qd_field_iterator_t *name_iter) |
| { |
| // |
| // Add the Body |
| // |
| qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); |
| |
| // Set the callback function. |
| qdr_manage_handler(core, qd_manage_response_handler); |
| |
| // Call local function that creates and returns a qd_management_context_t containing the values passed in. |
| qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0); |
| |
| qdr_manage_delete(core, ctx, entity_type, name_iter, identity_iter); |
| } |
| |
| |
| /** |
| * Checks the content of the message to see if this can be handled by the C-management agent. If this agent cannot handle it, it will be |
| * forwarded to the Python agent. |
| */ |
| static bool qd_can_handle_request(qd_parsed_field_t *properties_fld, |
| qd_router_entity_type_t *entity_type, |
| qd_router_operation_type_t *operation_type, |
| qd_field_iterator_t **identity_iter, |
| qd_field_iterator_t **name_iter, |
| int *count, |
| int *offset) |
| { |
| // The must be a property field and that property field should be a AMQP map. This is true for QUERY but I need |
| // to check if it true for CREATE, UPDATE and DELETE |
| if (properties_fld == 0 || !qd_parse_is_map(properties_fld)) |
| return false; |
| |
| // |
| // Only certain entity types can be handled by this agent. |
| // 'entityType': 'org.apache.qpid.dispatch.router.address |
| // 'entityType': 'org.apache.qpid.dispatch.router.link' |
| // TODO - Add more entity types here. The above is not a complete list. |
| |
| qd_parsed_field_t *parsed_field = qd_parse_value_by_key(properties_fld, IDENTITY); |
| if (parsed_field!=0) { |
| *identity_iter = qd_parse_raw(parsed_field); |
| } |
| parsed_field = qd_parse_value_by_key(properties_fld, NAME); |
| if (parsed_field!=0) { |
| *name_iter = qd_parse_raw(parsed_field); |
| } |
| |
| parsed_field = qd_parse_value_by_key(properties_fld, ENTITY); |
| |
| if (parsed_field == 0) { // Sometimes there is no 'entityType' but 'type' might be available. |
| parsed_field = qd_parse_value_by_key(properties_fld, TYPE); |
| if (parsed_field == 0) |
| return false; |
| } |
| |
| if (qd_field_iterator_equal(qd_parse_raw(parsed_field), address_entity_type)) |
| *entity_type = QD_ROUTER_ADDRESS; |
| else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), link_entity_type)) |
| *entity_type = QD_ROUTER_LINK; |
| else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), config_address_entity_type)) |
| *entity_type = QD_ROUTER_CONFIG_ADDRESS; |
| else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), link_route_entity_type)) |
| *entity_type = QD_ROUTER_CONFIG_LINK_ROUTE; |
| else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), auto_link_entity_type)) |
| *entity_type = QD_ROUTER_CONFIG_AUTO_LINK; |
| else |
| return false; |
| |
| |
| parsed_field = qd_parse_value_by_key(properties_fld, OPERATION); |
| |
| if (parsed_field == 0) |
| return false; |
| |
| if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_QUERY)) |
| (*operation_type) = QD_ROUTER_OPERATION_QUERY; |
| else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_CREATE)) |
| (*operation_type) = QD_ROUTER_OPERATION_CREATE; |
| else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_READ)) |
| (*operation_type) = QD_ROUTER_OPERATION_READ; |
| else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_UPDATE)) |
| (*operation_type) = QD_ROUTER_OPERATION_UPDATE; |
| else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_DELETE)) |
| (*operation_type) = QD_ROUTER_OPERATION_DELETE; |
| else |
| // This is an unknown operation type. cannot be handled, return false. |
| return false; |
| |
| // Obtain the count and offset. |
| parsed_field = qd_parse_value_by_key(properties_fld, COUNT); |
| if (parsed_field) |
| (*count) = qd_parse_as_int(parsed_field); |
| else |
| (*count) = -1; |
| |
| parsed_field = qd_parse_value_by_key(properties_fld, OFFSET); |
| if (parsed_field) |
| (*offset) = qd_parse_as_int(parsed_field); |
| else |
| (*offset) = 0; |
| |
| return true; |
| } |
| |
| |
| /** |
| * |
| * Handler for the management agent. |
| * |
| */ |
| void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id) |
| { |
| qdr_core_t *core = (qdr_core_t*) context; |
| qd_field_iterator_t *app_properties_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); |
| |
| qd_router_entity_type_t entity_type = 0; |
| qd_router_operation_type_t operation_type = 0; |
| |
| qd_field_iterator_t *identity_iter = 0; |
| qd_field_iterator_t *name_iter = 0; |
| |
| int32_t count = 0; |
| int32_t offset = 0; |
| |
| qd_parsed_field_t *properties_fld = qd_parse(app_properties_iter); |
| |
| if (qd_can_handle_request(properties_fld, &entity_type, &operation_type, &identity_iter, &name_iter, &count, &offset)) { |
| switch (operation_type) { |
| case QD_ROUTER_OPERATION_QUERY: |
| qd_core_agent_query_handler(core, entity_type, operation_type, msg, &count, &offset); |
| break; |
| case QD_ROUTER_OPERATION_CREATE: |
| qd_core_agent_create_handler(core, msg, entity_type, operation_type, name_iter); |
| break; |
| case QD_ROUTER_OPERATION_READ: |
| qd_core_agent_read_handler(core, msg, entity_type, operation_type, identity_iter, name_iter); |
| break; |
| case QD_ROUTER_OPERATION_UPDATE: |
| qd_core_agent_update_handler(core, msg, entity_type, operation_type, identity_iter, name_iter); |
| break; |
| case QD_ROUTER_OPERATION_DELETE: |
| qd_core_agent_delete_handler(core, msg, entity_type, operation_type, identity_iter, name_iter); |
| break; |
| } |
| } else { |
| // |
| // The C management agent is not going to handle this request. Forward it off to Python. |
| // |
| qdr_send_to2(core, msg, MANAGEMENT_INTERNAL, false, false); |
| } |
| |
| qd_field_iterator_free(app_properties_iter); |
| qd_parse_free(properties_fld); |
| |
| } |
| |