blob: 84cb9a214dce22495188edbcd7420ea2e7fa6a39 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "edge_mgmt.h"
#include "core_client_api.h"
#include "link_route_proxy.h"
#include <inttypes.h>
#include <errno.h>
/*
* an API that lets the core issue management requests
*/
static qdrc_client_t *_client; // req/resp client
static qdrc_event_subscription_t *_event_handle; // edge conn up/down
static void _mgmt_on_state_cb_CT(qdr_core_t *, qdrc_client_t *, void *, bool);
static void _mgmt_on_flow_cb_CT(qdr_core_t *, qdrc_client_t *, void *, int, bool);
//
// edge uplink connection event handler
//
static void _conn_event_CT(void *context, qdrc_event_t event_type, qdr_connection_t *conn)
{
qdr_core_t *core = (qdr_core_t *) context;
switch (event_type) {
case QDRC_EVENT_CONN_EDGE_ESTABLISHED:
// create a messaging client to the interior router's $management
//
qd_log(core->log, QD_LOG_TRACE,
"starting edge mgmt client (id=%"PRIu64")", conn->identity);
qdr_terminus_t *target = qdr_terminus(0);
qdr_terminus_set_address(target, "$management");
_client = qdrc_client_CT(core,
conn,
target,
100, // credit
0, // user_context
_mgmt_on_state_cb_CT,
_mgmt_on_flow_cb_CT);
if (!_client) {
qd_log(core->log, QD_LOG_ERROR,
"Failed to start edge management client");
}
break;
case QDRC_EVENT_CONN_EDGE_LOST:
// clean up messaging client
//
qd_log(core->log, QD_LOG_TRACE,
"stopping edge mgmt client (id=%"PRIu64")", conn->identity);
qdrc_client_free_CT(_client);
_client = NULL;
break;
}
}
// Per anagement request context
//
typedef struct qcm_edge_mgmt_request_t qcm_edge_mgmt_request_t;
struct qcm_edge_mgmt_request_t {
void *req_context;
qcm_edge_mgmt_reply_CT_t reply_callback;
qcm_edge_mgmt_error_CT_t error_callback;
};
ALLOC_DECLARE(qcm_edge_mgmt_request_t);
ALLOC_DEFINE(qcm_edge_mgmt_request_t);
// utility to parse out status code from management response message
static int _extract_mgmt_status(qdr_core_t *core,
qd_iterator_t *app_properties,
int32_t *statusCode,
char **statusDescription)
{
int rc = 0;
*statusDescription = NULL;
*statusCode = 500;
qd_parsed_field_t *properties = qd_parse(app_properties);
if (!properties || !qd_parse_ok(properties) || !qd_parse_is_map(properties)) {
qd_log(core->log, QD_LOG_ERROR, "bad edge management reply msg - invalid properties field");
rc = EINVAL;
goto exit;
}
qd_parsed_field_t *status_fld = qd_parse_value_by_key(properties, "statusCode");
if (!status_fld) {
qd_log(core->log, QD_LOG_ERROR, "bad edge management reply msg - statusCode field missing");
rc = EINVAL;
goto exit;
}
*statusCode = qd_parse_as_int(status_fld);
if (!qd_parse_ok(status_fld)) {
qd_log(core->log, QD_LOG_ERROR, "bad edge management reply msg - statusCode field invalid");
rc = EINVAL;
goto exit;
}
qd_parsed_field_t *desc_fld = qd_parse_value_by_key(properties, "statusDescription");
if (desc_fld) { // it's optional, so no error if unset
qd_iterator_t *tmp = qd_parse_raw(desc_fld);
*statusDescription = (char *)qd_iterator_copy(tmp);
}
exit:
if (properties)
qd_parse_free(properties);
return rc;
}
// mgmt client link state changed
static void _mgmt_on_state_cb_CT(qdr_core_t *core,
qdrc_client_t *client,
void *user_context,
bool active)
{
qd_log(core->log, QD_LOG_TRACE,
"edge mgmt client state change: uc=%p %s",
user_context,
(active) ? "active" : "down");
qcm_edge_link_route_proxy_state_CT(core, active);
}
// mgmt client credit granted by interior router
static void _mgmt_on_flow_cb_CT(qdr_core_t *core,
qdrc_client_t *client,
void *user_context,
int more_credit,
bool drain)
{
qd_log(core->log, QD_LOG_TRACE,
"edge mgmt client flow: uc=%p c=%d d=%s",
user_context, more_credit,
(drain) ? "T" : "F");
qcm_edge_link_route_proxy_flow_CT(core,
more_credit,
drain);
}
// terminal disposition set on sent request
static void _mgmt_on_ack_cb_CT(qdr_core_t *core,
qdrc_client_t *client,
void *user_context,
void *request_context,
uint64_t disposition)
{
qcm_edge_mgmt_request_t *req = (qcm_edge_mgmt_request_t *)request_context;
qd_log(core->log, QD_LOG_TRACE,
"edge mgmt request update: rc=%p d=0x%"PRIx64,
req->req_context, disposition);
if (disposition != PN_ACCEPTED) {
// failure - no reply will be sent to cleanup
if (req->error_callback) {
req->error_callback(core, req->req_context, "Request not accepted");
req->error_callback = NULL; // avoid recalling on mgmt done
}
}
}
// reply message received
static uint64_t _mgmt_on_reply_cb_CT(qdr_core_t *core,
qdrc_client_t *client,
void *user_context,
void *request_context,
qd_iterator_t *app_properties,
qd_iterator_t *body)
{
int32_t statusCode = 0;
char *statusDescription = 0;
uint64_t disposition = PN_ACCEPTED;
qcm_edge_mgmt_request_t *req = (qcm_edge_mgmt_request_t *)request_context;
if (_extract_mgmt_status(core, app_properties, &statusCode, &statusDescription)) {
// error - bad response
statusCode = 500;
}
qd_iterator_free(app_properties);
qd_log(core->log, QD_LOG_TRACE,
"Edge management request reply:"
" rc=%p status=%"PRId32": %s",
req->req_context, statusCode,
(statusDescription) ? statusDescription : "<no description>");
if (req->reply_callback)
disposition = req->reply_callback(core,
req->req_context,
statusCode,
statusDescription,
body);
free(statusDescription);
return disposition;
}
// request completed or aborted due to error
static void _mgmt_on_done_cb_CT(qdr_core_t *core,
qdrc_client_t *client,
void *user_context,
void *request_context,
const char *error)
{
qcm_edge_mgmt_request_t *req = (qcm_edge_mgmt_request_t *)request_context;
qd_log(core->log, QD_LOG_TRACE,
"edge mgmt request done: uc=%p rc=%p %s",
user_context, request_context, error ? error : "");
if (error && req->error_callback)
req->error_callback(core, req->req_context, error);
free_qcm_edge_mgmt_request_t(req);
}
// send management request - takes ownership of body
int qcm_edge_mgmt_request_CT(qdr_core_t *core,
void *request_context,
const char *operation,
const char *entity_type,
const char *identity,
const char *name,
qd_composed_field_t *body,
uint32_t timeout,
qcm_edge_mgmt_reply_CT_t reply_cb,
qcm_edge_mgmt_error_CT_t error_cb)
{
qd_log(core->log, QD_LOG_TRACE,
"New Edge management request: rc=%p %s type=%s id=%s",
request_context, operation, entity_type,
(identity) ? identity : "<unset>");
qcm_edge_mgmt_request_t *req = new_qcm_edge_mgmt_request_t();
ZERO(req);
req->req_context = request_context;
req->reply_callback = reply_cb;
req->error_callback = error_cb;
// create a message containing a management request
qd_composed_field_t *ap_fld = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
qd_compose_start_map(ap_fld);
qd_compose_insert_string(ap_fld, "operation");
qd_compose_insert_string(ap_fld, operation);
qd_compose_insert_string(ap_fld, "type");
qd_compose_insert_string(ap_fld, entity_type);
if (identity) {
qd_compose_insert_string(ap_fld, "identity");
qd_compose_insert_string(ap_fld, identity);
}
// note: if there is a name specified qdrouterd expects to find it in the
// *properties* header, which doesn't jive with the current mgmt spec
// (WD12)
if (name) {
qd_compose_insert_string(ap_fld, "name");
qd_compose_insert_string(ap_fld, name);
}
qd_compose_end_map(ap_fld);
return qdrc_client_request_CT(_client,
req, // request context
ap_fld,
body,
timeout,
_mgmt_on_reply_cb_CT,
_mgmt_on_ack_cb_CT,
_mgmt_on_done_cb_CT);
}
void qcm_edge_mgmt_init_CT(qdr_core_t *core)
{
_event_handle = qdrc_event_subscribe_CT(core,
(QDRC_EVENT_CONN_EDGE_ESTABLISHED
| QDRC_EVENT_CONN_EDGE_LOST),
_conn_event_CT,
0, // link event
0, // addr event
0, // router event
core); // context
}
void qcm_edge_mgmt_final_CT(qdr_core_t *core)
{
qdrc_event_unsubscribe_CT(core, _event_handle);
qdrc_client_free_CT(_client);
_client = NULL;
}