blob: b8cfa7bb02e5d362367d25c001e7a46528c15a69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/ctools.h>
#include "agent_connection.h"
#include <inttypes.h>
#include <stdio.h>
//
// Column indices for the connection entity.  Each value is the position of
// the corresponding attribute name in the qdr_connection_columns table and is
// the value switched on when a column is composed into a response.
//
#define QDR_CONNECTION_NAME 0
#define QDR_CONNECTION_IDENTITY 1
#define QDR_CONNECTION_HOST 2
#define QDR_CONNECTION_ROLE 3
#define QDR_CONNECTION_PROTOCOL 4
#define QDR_CONNECTION_DIR 5
#define QDR_CONNECTION_CONTAINER_ID 6
#define QDR_CONNECTION_SASL_MECHANISMS 7
#define QDR_CONNECTION_IS_AUTHENTICATED 8
#define QDR_CONNECTION_USER 9
#define QDR_CONNECTION_IS_ENCRYPTED 10
#define QDR_CONNECTION_SSLPROTO 11
#define QDR_CONNECTION_SSLCIPHER 12
#define QDR_CONNECTION_PROPERTIES 13
#define QDR_CONNECTION_SSLSSF 14
#define QDR_CONNECTION_TENANT 15
#define QDR_CONNECTION_TYPE 16
#define QDR_CONNECTION_SSL 17
#define QDR_CONNECTION_OPENED 18
#define QDR_CONNECTION_ACTIVE 19
#define QDR_CONNECTION_ADMIN_STATUS 20
#define QDR_CONNECTION_OPER_STATUS 21
#define QDR_CONNECTION_UPTIME_SECONDS 22
#define QDR_CONNECTION_LAST_DLV_SECONDS 23
#define QDR_CONNECTION_ENABLE_PROTOCOL_TRACE 24
// Values reported for the "dir" column: "in" when the connection direction is
// QD_INCOMING, "out" otherwise.
const char * const QDR_CONNECTION_DIR_IN = "in";
const char * const QDR_CONNECTION_DIR_OUT = "out";
// Values of the "adminStatus" column.  They are reported on reads ("deleted"
// once the connection is marked closed) and are also the only values accepted
// when an update request supplies adminStatus.
const char * QDR_CONNECTION_ADMIN_STATUS_DELETED = "deleted";
const char * QDR_CONNECTION_ADMIN_STATUS_ENABLED = "enabled";
// Values reported for the "operStatus" column: "closing" once the connection
// is marked closed, "up" otherwise.
const char * QDR_CONNECTION_OPER_STATUS_UP = "up";
const char * QDR_CONNECTION_OPER_STATUS_CLOSING = "closing";
// Role names reported in the "role" column.  The table is indexed directly by
// conn->connection_info->role and is terminated by a null entry.
const char *qdr_connection_roles[] =
{"normal",
"inter-router",
"route-container",
"edge",
0};
// Attribute names of the connection entity.  The position of each name equals
// the matching QDR_CONNECTION_* column index defined in this file, so the two
// lists must be kept in the same order.  The table is terminated by a null
// entry.
const char *qdr_connection_columns[] =
{"name",
"identity",
"host",
"role",
"protocol",
"dir",
"container",
"sasl",
"isAuthenticated",
"user",
"isEncrypted",
"sslProto",
"sslCipher",
"properties",
"sslSsf",
"tenant",
"type",
"ssl",
"opened",
"active",
"adminStatus",
"operStatus",
"uptimeSeconds",
"lastDlvSeconds",
"enableProtocolTrace",
0};
// Entity type name: reported as the value of the "type" column and included
// in the error logged when a READ request cannot be served.
const char *CONNECTION_TYPE = "org.apache.qpid.dispatch.connection";
/**
 * Advance the cursor of *data to its next node and extract that node's value.
 *
 * A string or symbol node stores the start pointer of its bytes in *d.
 * An int or long node stores its value in *d1 (a long is narrowed to int by
 * the assignment).  A node of any other type, or a failed advance, leaves both
 * outputs untouched, so callers should pre-initialize them.
 *
 * @param data  address of the pn_data_t cursor to advance
 * @param d     receives a string/symbol value; may be NULL if not wanted
 * @param d1    receives an int/long value; may be NULL if not wanted
 */
static void qd_get_next_pn_data(pn_data_t **data, const char **d, int *d1)
{
    if (!pn_data_next(*data))
        return;

    switch (pn_data_type(*data)) {
    case PN_STRING:
        if (d)
            *d = pn_data_get_string(*data).start;
        break;
    case PN_SYMBOL:
        if (d)
            *d = pn_data_get_symbol(*data).start;
        break;
    case PN_INT:
        // Callers that only expect a string pass a NULL d1 (e.g. when reading
        // a map key); an integer node must not be written through it.
        if (d1)
            *d1 = pn_data_get_int(*data);
        break;
    case PN_LONG:
        if (d1)
            *d1 = pn_data_get_long(*data);
        break;
    default:
        break;
    }
}
/**
 * Compose the value of a single connection attribute into body.
 *
 * @param core    router core; read for router_mode, active_edge_connection
 *                and uptime_ticks
 * @param conn    connection being described; when NULL nothing is written
 * @param col     one of the QDR_CONNECTION_* column indices
 * @param body    composed field that receives the output
 * @param as_map  when true, the column name is inserted ahead of the value
 */
static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t *conn, int col, qd_composed_field_t *body, bool as_map)
{
    char id_str[100];
    const char *text = 0;

    if (!conn)
        return;

    if (as_map)
        qd_compose_insert_string(body, qdr_connection_columns[col]);

    switch(col) {
    case QDR_CONNECTION_NAME:
        qd_compose_insert_string2(body, "connection/", conn->connection_info->host);
        break;

    case QDR_CONNECTION_IDENTITY: {
        // The identity is handled as an unsigned 64-bit value elsewhere in
        // this file, so format it as unsigned here as well.
        snprintf(id_str, sizeof(id_str), "%"PRIu64, conn->identity);
        qd_compose_insert_string(body, id_str);
        break;
    }

    case QDR_CONNECTION_HOST:
        qd_compose_insert_string(body, conn->connection_info->host);
        break;

    case QDR_CONNECTION_ROLE:
        qd_compose_insert_string(body, qdr_connection_roles[conn->connection_info->role]);
        break;

    case QDR_CONNECTION_PROTOCOL:
        qd_compose_insert_string(body, conn->protocol_adaptor->name);
        break;

    case QDR_CONNECTION_DIR:
        if (conn->connection_info->dir == QD_INCOMING)
            qd_compose_insert_string(body, QDR_CONNECTION_DIR_IN);
        else
            qd_compose_insert_string(body, QDR_CONNECTION_DIR_OUT);
        break;

    case QDR_CONNECTION_CONTAINER_ID:
        if (conn->connection_info->container)
            qd_compose_insert_string(body, conn->connection_info->container);
        else
            qd_compose_insert_null(body);
        break;

    case QDR_CONNECTION_SASL_MECHANISMS:
        if (conn->connection_info->sasl_mechanisms)
            qd_compose_insert_string(body, conn->connection_info->sasl_mechanisms);
        else
            qd_compose_insert_null(body);
        break;

    case QDR_CONNECTION_IS_AUTHENTICATED:
        qd_compose_insert_bool(body, conn->connection_info->is_authenticated);
        break;

    case QDR_CONNECTION_USER:
        if (conn->connection_info->user)
            qd_compose_insert_string(body, conn->connection_info->user);
        else
            qd_compose_insert_null(body);
        break;

    case QDR_CONNECTION_IS_ENCRYPTED:
        qd_compose_insert_bool(body, conn->connection_info->is_encrypted);
        break;

    case QDR_CONNECTION_SSLPROTO:
        // An empty string means no value; report null in that case.
        if (conn->connection_info->ssl_proto[0] != '\0')
            qd_compose_insert_string(body, conn->connection_info->ssl_proto);
        else
            qd_compose_insert_null(body);
        break;

    case QDR_CONNECTION_SSLCIPHER:
        if (conn->connection_info->ssl_cipher[0] != '\0')
            qd_compose_insert_string(body, conn->connection_info->ssl_cipher);
        else
            qd_compose_insert_null(body);
        break;

    case QDR_CONNECTION_SSLSSF:
        qd_compose_insert_long(body, conn->connection_info->ssl_ssf);
        break;

    case QDR_CONNECTION_TENANT:
        if (conn->tenant_space)
            qd_compose_insert_string(body, conn->tenant_space);
        else
            qd_compose_insert_null(body);
        break;

    case QDR_CONNECTION_TYPE:
        qd_compose_insert_string(body, CONNECTION_TYPE);
        break;

    case QDR_CONNECTION_SSL:
        qd_compose_insert_bool(body, conn->connection_info->ssl);
        break;

    case QDR_CONNECTION_OPENED:
        qd_compose_insert_bool(body, conn->connection_info->opened);
        break;

    case QDR_CONNECTION_ENABLE_PROTOCOL_TRACE:
        qd_compose_insert_bool(body, conn->enable_protocol_trace);
        break;

    case QDR_CONNECTION_ACTIVE: {
        // Only an edge-role connection on an edge-mode router can be reported
        // inactive, and then only when it is not the core's active edge
        // connection.  Every other combination reports true, so exactly one
        // value is always emitted for this column (a missing value would
        // misalign list rows and leave a map key without a value).
        bool active = true;
        if (conn->role == QDR_ROLE_EDGE_CONNECTION && core->router_mode == QD_ROUTER_MODE_EDGE)
            active = (core->active_edge_connection == conn);
        qd_compose_insert_bool(body, active);
        break;
    }

    case QDR_CONNECTION_ADMIN_STATUS:
        text = conn->closed ? QDR_CONNECTION_ADMIN_STATUS_DELETED : QDR_CONNECTION_ADMIN_STATUS_ENABLED;
        qd_compose_insert_string(body, text);
        break;

    case QDR_CONNECTION_OPER_STATUS:
        text = conn->closed ? QDR_CONNECTION_OPER_STATUS_CLOSING : QDR_CONNECTION_OPER_STATUS_UP;
        qd_compose_insert_string(body, text);
        break;

    case QDR_CONNECTION_UPTIME_SECONDS:
        qd_compose_insert_uint(body, core->uptime_ticks - conn->conn_uptime);
        break;

    case QDR_CONNECTION_LAST_DLV_SECONDS:
        // A last_delivery_time of zero is reported as null rather than as an
        // elapsed time.
        if (conn->last_delivery_time==0)
            qd_compose_insert_null(body);
        else
            qd_compose_insert_uint(body, core->uptime_ticks - conn->last_delivery_time);
        break;

    case QDR_CONNECTION_PROPERTIES: {
        pn_data_t *data = conn->connection_info->connection_properties;
        qd_compose_start_map(body);
        if (data) {
            pn_data_next(data);
            // pn_data_get_map() counts keys and values separately, hence the
            // count/2 iterations below.
            size_t count = pn_data_get_map(data);
            pn_data_enter(data);

            for (size_t i = 0; i < count/2; i++) {
                // We are assuming for now that all keys are strings
                const char *key = 0;
                qd_get_next_pn_data(&data, &key, 0);

                // We are assuming for now that all values are either strings or integers
                const char *value_string = 0;
                int value_int = 0;
                qd_get_next_pn_data(&data, &value_string, &value_int);

                // Skip the whole pair when the key could not be read as a string.
                if (!key)
                    continue;

                qd_compose_insert_string(body, key);
                if (value_string)
                    qd_compose_insert_string(body, value_string);
                else if (value_int)
                    qd_compose_insert_int(body, value_int);
                else
                    // The helper leaves both outputs at their defaults for an
                    // unsupported value type (and for an integer zero, which
                    // cannot be told apart here).  Emit null so that every
                    // key is followed by a value and the map stays balanced.
                    qd_compose_insert_null(body);
            }
            pn_data_exit(data);
        }
        qd_compose_end_map(body);
        break;
    }
    }
}
/**
 * Append one list to the query's response body holding the requested columns
 * of conn.  The requested columns are read from query->columns up to the
 * first negative entry.  When conn is NULL an empty list is written.
 */
static void qdr_agent_write_connection_CT(qdr_core_t *core, qdr_query_t *query, qdr_connection_t *conn)
{
    qd_composed_field_t *out = query->body;

    qd_compose_start_list(out);
    if (conn) {
        for (int idx = 0; query->columns[idx] >= 0; idx++)
            qdr_connection_insert_column_CT(core, conn, query->columns[idx], out, false);
    }
    qd_compose_end_list(out);
}
/**
 * Step the query past conn: bump query->next_offset and set query->more to
 * whether another connection follows conn in its list.  With a NULL conn the
 * offset is left alone and query->more is cleared.
 */
static void qdr_manage_advance_connection_CT(qdr_query_t *query, qdr_connection_t *conn)
{
    if (!conn) {
        query->more = false;
        return;
    }

    query->next_offset++;
    query->more = DEQ_NEXT(conn) != 0;
}
/**
 * Start a connection query: write the connection found at position offset in
 * core->open_connections into the response, record where the next call should
 * resume, and enqueue the response.
 */
void qdra_connection_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
{
    // A query that reaches this point is always reported as successful.
    query->status = QD_AMQP_OK;

    // An offset past the end of the list yields an immediate, final response.
    if (offset >= DEQ_SIZE(core->open_connections)) {
        query->more = false;
        qdr_agent_enqueue_response_CT(core, query);
        return;
    }

    // Walk forward from the head until the requested position is reached.
    qdr_connection_t *cursor = DEQ_HEAD(core->open_connections);
    int skipped = 0;
    while (skipped < offset && cursor) {
        cursor = DEQ_NEXT(cursor);
        skipped++;
    }
    assert(cursor);

    if (!cursor) {
        query->more = false;
    }
    else {
        // Emit this connection's columns, then position the query on the
        // connection that follows it.
        qdr_agent_write_connection_CT(core, query, cursor);
        query->next_offset = offset;
        qdr_manage_advance_connection_CT(query, cursor);
    }

    qdr_agent_enqueue_response_CT(core, query);
}
/**
 * Continue a connection query: write the connection at position
 * query->next_offset in core->open_connections (if there is one), advance the
 * query, and enqueue the response.  When no connection exists at that
 * position, query->more is cleared and nothing is written.
 */
void qdra_connection_get_next_CT(qdr_core_t *core, qdr_query_t *query)
{
    qdr_connection_t *cursor = 0;

    if (query->next_offset < DEQ_SIZE(core->open_connections)) {
        // Re-locate the resume position by walking from the head of the list.
        cursor = DEQ_HEAD(core->open_connections);
        int skipped = 0;
        while (skipped < query->next_offset && cursor) {
            cursor = DEQ_NEXT(cursor);
            skipped++;
        }
    }

    if (!cursor) {
        query->more = false;
    }
    else {
        // Emit this connection's columns, then move on to the next object.
        qdr_agent_write_connection_CT(core, query, cursor);
        qdr_manage_advance_connection_CT(query, cursor);
    }

    qdr_agent_enqueue_response_CT(core, query);
}
/**
 * Write every column of conn into body as one map.  Each key is taken from
 * the qdr_connection_columns argument and is followed by the value composed
 * for the column with the same index.
 */
static void qdr_manage_write_connection_map_CT(qdr_core_t          *core,
                                               qdr_connection_t    *conn,
                                               qd_composed_field_t *body,
                                               const char          *qdr_connection_columns[])
{
    int column = 0;

    qd_compose_start_map(body);
    while (column < QDR_CONNECTION_COLUMN_COUNT) {
        // The key is written here, so the value is requested without its name.
        qd_compose_insert_string(body, qdr_connection_columns[column]);
        qdr_connection_insert_column_CT(core, conn, column, body, false);
        column++;
    }
    qd_compose_end_map(body);
}
/**
 * Look up an open connection by its numeric identity.
 *
 * @return the first entry of core->open_connections whose identity equals
 *         conn_id, or 0 when there is none
 */
static qdr_connection_t *_find_conn_CT(qdr_core_t *core, uint64_t conn_id)
{
    for (qdr_connection_t *candidate = DEQ_HEAD(core->open_connections); candidate; candidate = DEQ_NEXT(candidate)) {
        if (candidate->identity == conn_id)
            return candidate;
    }
    return 0;
}
/**
 * Look up an open connection by an identity supplied as an iterator.
 *
 * Each connection's numeric identity is rendered as a decimal string and
 * compared with the iterator's content.
 *
 * @param core      router core owning the open_connections list
 * @param identity  iterator holding the identity text; may be NULL
 * @return the first matching connection, or 0 when identity is NULL or no
 *         connection matches
 */
static qdr_connection_t *qdr_connection_find_by_identity_CT(qdr_core_t *core, qd_iterator_t *identity)
{
    if (!identity)
        return 0;

    qdr_connection_t *conn = DEQ_HEAD(core->open_connections);
    while (conn) {
        // Render this connection's identity as text for the comparison.  The
        // identity is treated as an unsigned 64-bit value elsewhere in this
        // file (compared with uint64_t, logged with PRIu64), so use the
        // unsigned conversion here too.
        char id[100];
        snprintf(id, sizeof(id), "%"PRIu64, conn->identity);
        if (qd_iterator_equal(identity, (const unsigned char*) id))
            break;
        conn = DEQ_NEXT(conn);
    }
    return conn;
}
/**
 * Serve a READ of a single connection entity.
 *
 * The connection is located by identity only; the name argument is not used
 * for the lookup.  The outcome is stored in query->status and the response is
 * always enqueued:
 *   - no identity given     -> QD_AMQP_BAD_REQUEST (also logged as an error)
 *   - identity not found    -> QD_AMQP_NOT_FOUND
 *   - connection found      -> QD_AMQP_OK with a map of all columns in the body
 */
void qdra_connection_get_CT(qdr_core_t    *core,
                            qd_iterator_t *name,
                            qd_iterator_t *identity,
                            qdr_query_t   *query,
                            const char    *qdr_connection_columns[])
{
    if (identity) {
        qdr_connection_t *found = qdr_connection_find_by_identity_CT(core, identity);
        if (found) {
            // Write the columns of the connection entity into the response body.
            qdr_manage_write_connection_map_CT(core, found, query->body, qdr_connection_columns);
            query->status = QD_AMQP_OK;
        }
        else {
            // Send back a 404
            query->status = QD_AMQP_NOT_FOUND;
        }
    }
    else {
        query->status = QD_AMQP_BAD_REQUEST;
        query->status.description = "Name not supported. Identity required";
        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: %s", CONNECTION_TYPE, query->status.description);
    }

    // Enqueue the response.
    qdr_agent_enqueue_response_CT(core, query);
}
/**
 * Turn the query's response into a bad-request reply with an empty map body.
 *
 * The whole status value is assigned, so a description stored in
 * query->status before this call is replaced by the one carried in
 * QD_AMQP_BAD_REQUEST.
 */
static void qdra_connection_set_bad_request(qdr_query_t *query)
{
    // Empty map as the response body.
    qd_compose_start_map(query->body);
    qd_compose_end_map(query->body);

    query->status = QD_AMQP_BAD_REQUEST;
}
/**
 * Apply a requested adminStatus value to conn and fill in the query's status
 * and response body accordingly:
 *   - conn is NULL                          -> QD_AMQP_NOT_FOUND, empty map
 *   - "deleted" on an inter-router or edge
 *     connection                            -> QD_AMQP_FORBIDDEN, empty map
 *   - "deleted" on any other connection     -> connection closed, QD_AMQP_OK,
 *                                              map of all columns
 *   - "enabled"                             -> QD_AMQP_OK, map of all columns
 *   - anything else                         -> bad request, empty map
 *
 * The response is not enqueued here.
 */
static void qdra_connection_update_set_status(qdr_core_t *core, qdr_query_t *query, qdr_connection_t *conn, qd_parsed_field_t *admin_state)
{
    if (!conn) {
        query->status = QD_AMQP_NOT_FOUND;
        qd_compose_start_map(query->body);
        qd_compose_end_map(query->body);
        return;
    }

    qd_iterator_t *requested = qd_parse_raw(admin_state);

    if (qd_iterator_equal(requested, (unsigned char*) QDR_CONNECTION_ADMIN_STATUS_DELETED)) {
        // A request to force-close the connection.  Inter-router and edge
        // connections may never be force-closed, whatever policy rights the
        // requester holds.
        bool protected_role = conn->role == QDR_ROLE_INTER_ROUTER || conn->role == QDR_ROLE_EDGE_CONNECTION;

        if (protected_role) {
            query->status = QD_AMQP_FORBIDDEN;
            query->status.description = "You are not allowed to perform this operation.";
            qd_compose_start_map(query->body);
            qd_compose_end_map(query->body);
        }
        else {
            qdr_close_connection_CT(core, conn);
            qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Connection force-closed by request from connection [C%"PRIu64"]", conn->identity, query->in_conn);
            query->status = QD_AMQP_OK;
            qdr_manage_write_connection_map_CT(core, conn, query->body, qdr_connection_columns);
        }
        return;
    }

    if (qd_iterator_equal(requested, (unsigned char*) QDR_CONNECTION_ADMIN_STATUS_ENABLED)) {
        // Nothing to change; report the connection as it is.
        query->status = QD_AMQP_OK;
        qdr_manage_write_connection_map_CT(core, conn, query->body, qdr_connection_columns);
        return;
    }

    // Any other adminStatus value is rejected.
    qdra_connection_set_bad_request(query);
}
/**
 * Serve an UPDATE of a connection entity.
 *
 * Only two attributes may be updated: adminStatus and enableProtocolTrace.
 * On success the status is 200 (OK) and the body holds a map with the actual
 * attributes of the updated entity.  A request that is not a map, carries no
 * identity, names neither updatable attribute, or refers to a connection that
 * cannot be found is answered with 400 (Bad Request) and an empty map.
 *
 * When adminStatus is present the requesting connection (query->in_conn) must
 * still exist and its policy, if any, must allow admin status updates.
 *
 * Exactly one map is written to the response body and the response is
 * enqueued exactly once on every path.
 *
 * @param core      router core
 * @param name      not used; the connection is located by identity only
 * @param identity  identity of the connection to update
 * @param query     query whose status and body receive the response
 * @param in_body   parsed request body; expected to be a map
 */
void qdra_connection_update_CT(qdr_core_t        *core,
                               qd_iterator_t     *name,
                               qd_iterator_t     *identity,
                               qdr_query_t       *query,
                               qd_parsed_field_t *in_body)
{
    if (!qd_parse_is_map(in_body) || !identity) {
        qdra_connection_set_bad_request(query);
        qdr_agent_enqueue_response_CT(core, query);
        return;
    }

    // The absence of an attribute name implies that the entity should retain
    // its already existing value.
    qd_parsed_field_t *admin_state = qd_parse_value_by_key(in_body, qdr_connection_columns[QDR_CONNECTION_ADMIN_STATUS]);

    // Find the connection that the user connected on. This connection must
    // have the correct policy rights which will allow the user on this
    // connection to terminate some other connection.
    qdr_connection_t *user_conn = _find_conn_CT(core, query->in_conn);

    qd_parsed_field_t *trace_field = qd_parse_value_by_key(in_body, qdr_connection_columns[QDR_CONNECTION_ENABLE_PROTOCOL_TRACE]);

    //
    // If neither updatable field is present, this is a bad request.
    // For example: qdmanage update --type=connection identity=1
    //
    if (!trace_field && !admin_state) {
        qdra_connection_set_bad_request(query);
        qdr_agent_enqueue_response_CT(core, query);
        return;
    }

    bool enable_protocol_trace = trace_field ? qd_parse_as_bool(trace_field) : false;

    qdr_connection_t *conn = qdr_connection_find_by_identity_CT(core, identity);
    if (!conn) {
        //
        // Either the connection went away or the wrong identity was provided.
        // Both cases are reported to the caller as a bad request.
        //
        qdra_connection_set_bad_request(query);
        qdr_agent_enqueue_response_CT(core, query);
        return;
    }

    if (admin_state) {
        if (!user_conn) {
            // The user connection (that was requesting the change) is gone.
            // The description must be set AFTER the bad-request helper runs,
            // because the helper assigns the whole status value and would
            // otherwise discard it.
            qdra_connection_set_bad_request(query);
            query->status.description = "Parent connection no longer exists";
            qdr_agent_enqueue_response_CT(core, query);
            return;
        }

        bool allow = user_conn->policy_spec ? user_conn->policy_spec->allowAdminStatusUpdate : true;
        if (!allow) {
            //
            // Policy on the requesting connection does not allow the admin
            // status of another connection to be changed.
            //
            query->status = QD_AMQP_FORBIDDEN;
            query->status.description = "You are not allowed to perform this operation.";
            qd_compose_start_map(query->body);
            qd_compose_end_map(query->body);
            qdr_agent_enqueue_response_CT(core, query);
            return;
        }

        // Sets query->status and writes the response body map.
        qdra_connection_update_set_status(core, query, conn, admin_state);
    }

    if (trace_field) {
        //
        // Trace logging needs to be turned on if enableProtocolTrace is true
        // and turned off if it is false.  Work is only queued on a change.
        //
        if (conn->enable_protocol_trace != enable_protocol_trace) {
            conn->enable_protocol_trace = enable_protocol_trace;

            qdr_connection_work_t *work = new_qdr_connection_work_t();
            ZERO(work);
            work->work_type = enable_protocol_trace ? QDR_CONNECTION_WORK_TRACING_ON
                                                    : QDR_CONNECTION_WORK_TRACING_OFF;
            qdr_connection_enqueue_work_CT(core, conn, work);
        }

        // When adminStatus was also supplied, its handler has already set the
        // status and written the body; writing a second map here would
        // corrupt the response and mask a failure status with OK.
        // NOTE(review): the trace flag is still applied when the adminStatus
        // part of the same request was rejected, as before - confirm whether
        // such a request should be all-or-nothing.
        if (!admin_state) {
            query->status = QD_AMQP_OK;
            qdr_manage_write_connection_map_CT(core, conn, query->body, qdr_connection_columns);
        }
    }

    //
    // Enqueue the response.
    //
    qdr_agent_enqueue_response_CT(core, query);
}