| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <Python.h> |
| #include "qpid/dispatch/python_embedded.h" |
| #include "policy.h" |
| #include "policy_internal.h" |
| #include <stdio.h> |
| #include <string.h> |
| #include "dispatch_private.h" |
| #include "qpid/dispatch/container.h" |
| #include "qpid/dispatch/server.h" |
| #include <proton/message.h> |
| #include <proton/condition.h> |
| #include <proton/connection.h> |
| #include <proton/transport.h> |
| #include <proton/error.h> |
| #include <proton/event.h> |
| |
| |
| // |
| // The current statistics maintained globally through multiple |
| // reconfiguration of policy settings. |
| // |
| static int n_connections = 0; |
| static int n_denied = 0; |
| static int n_processed = 0; |
| |
| // |
| // error conditions signaled to effect denial |
| // |
| static char* RESOURCE_LIMIT_EXCEEDED = "amqp:resource-limit-exceeded"; |
| //static char* UNAUTHORIZED_ACCESS = "amqp:unauthorized-access"; |
| //static char* CONNECTION_FORCED = "amqp:connection:forced"; |
| |
| // |
| // error descriptions signaled to effect denial |
| // |
| static char* CONNECTION_DISALLOWED = "connection disallowed by local policy"; |
| static char* SESSION_DISALLOWED = "session disallowed by local policy"; |
| static char* LINK_DISALLOWED = "link disallowed by local policy"; |
| |
| // |
| // Policy configuration/statistics management interface |
| // |
| struct qd_policy_t { |
| qd_dispatch_t *qd; |
| qd_log_source_t *log_source; |
| void *py_policy_manager; |
| // configured settings |
| int max_connection_limit; |
| char *policyDir; |
| bool enableVhostPolicy; |
| // live statistics |
| int connections_processed; |
| int connections_denied; |
| int connections_current; |
| }; |
| |
| /** Create the policy structure |
| * @param[in] qd pointer the the qd |
| **/ |
| qd_policy_t *qd_policy(qd_dispatch_t *qd) |
| { |
| qd_policy_t *policy = NEW(qd_policy_t); |
| |
| policy->qd = qd; |
| policy->log_source = qd_log_source("POLICY"); |
| policy->max_connection_limit = 65535; |
| policy->policyDir = 0; |
| policy->enableVhostPolicy = false; |
| policy->connections_processed= 0; |
| policy->connections_denied = 0; |
| policy->connections_current = 0; |
| |
| qd_log(policy->log_source, QD_LOG_TRACE, "Policy Initialized"); |
| return policy; |
| } |
| |
| |
| /** Free the policy structure |
| * @param[in] policy pointer to the policy |
| **/ |
| void qd_policy_free(qd_policy_t *policy) |
| { |
| if (policy->policyDir) |
| free(policy->policyDir); |
| free(policy); |
| } |
| |
| // |
| // |
| #define CHECK() if (qd_error_code()) goto error |
| |
| qd_error_t qd_entity_configure_policy(qd_policy_t *policy, qd_entity_t *entity) |
| { |
| policy->max_connection_limit = qd_entity_opt_long(entity, "maxConnections", 65535); CHECK(); |
| if (policy->max_connection_limit < 0) |
| return qd_error(QD_ERROR_CONFIG, "maxConnections must be >= 0"); |
| policy->policyDir = |
| qd_entity_opt_string(entity, "policyDir", 0); CHECK(); |
| policy->enableVhostPolicy = qd_entity_opt_bool(entity, "enableVhostPolicy", false); CHECK(); |
| qd_log(policy->log_source, QD_LOG_INFO, "Policy configured maxConnections: %d, policyDir: '%s', access rules enabled: '%s'", |
| policy->max_connection_limit, policy->policyDir, (policy->enableVhostPolicy ? "true" : "false")); |
| return QD_ERROR_NONE; |
| |
| error: |
| if (policy->policyDir) |
| free(policy->policyDir); |
| qd_policy_free(policy); |
| return qd_error_code(); |
| } |
| |
| |
| // |
| // |
| qd_error_t qd_register_policy_manager(qd_policy_t *policy, void *policy_manager) |
| { |
| policy->py_policy_manager = policy_manager; |
| return QD_ERROR_NONE; |
| } |
| |
| |
| long qd_policy_c_counts_alloc() |
| { |
| qd_policy_denial_counts_t * dc = NEW(qd_policy_denial_counts_t); |
| assert(dc); |
| memset(dc, 0, sizeof(qd_policy_denial_counts_t)); |
| return (long)dc; |
| } |
| |
| |
| void qd_policy_c_counts_free(long ccounts) |
| { |
| void *dc = (void *)ccounts; |
| assert(dc); |
| free(dc); |
| } |
| |
| |
| qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t *entity) |
| { |
| qd_policy_denial_counts_t *dc = (qd_policy_denial_counts_t*)ccounts; |
| if (!qd_entity_set_long(entity, "sessionDenied", dc->sessionDenied) && |
| !qd_entity_set_long(entity, "senderDenied", dc->senderDenied) && |
| !qd_entity_set_long(entity, "receiverDenied", dc->receiverDenied) |
| ) |
| return QD_ERROR_NONE; |
| return qd_error_code(); |
| } |
| |
| |
| /** Update the statistics in qdrouterd.conf["policy"] |
| * @param[in] entity pointer to the policy management object |
| **/ |
| qd_error_t qd_entity_refresh_policy(qd_entity_t* entity, void *unused) { |
| // Return global stats |
| if (!qd_entity_set_long(entity, "connectionsProcessed", n_processed) && |
| !qd_entity_set_long(entity, "connectionsDenied", n_denied) && |
| !qd_entity_set_long(entity, "connectionsCurrent", n_connections) |
| ) |
| return QD_ERROR_NONE; |
| return qd_error_code(); |
| } |
| |
| |
| // |
| // Functions related to absolute connection counts. |
| // These handle connections at the socket level with |
| // no regard to user identity. Simple yes/no decisions |
| // are made and there is no AMQP channel for returning |
| // error conditions. |
| // |
| |
| bool qd_policy_socket_accept(void *context, const char *hostname) |
| { |
| qd_policy_t *policy = (qd_policy_t *)context; |
| bool result = true; |
| if (n_connections < policy->max_connection_limit) { |
| // connection counted and allowed |
| n_connections += 1; |
| qd_log(policy->log_source, QD_LOG_TRACE, "ALLOW Connection '%s' based on global connection count. N= %d", hostname, n_connections); |
| } else { |
| // connection denied |
| result = false; |
| n_denied += 1; |
| qd_log(policy->log_source, QD_LOG_INFO, "DENY Connection '%s' based on global connection count. N= %d", hostname, n_connections); |
| } |
| n_processed += 1; |
| return result; |
| } |
| |
| |
| // |
| // |
| void qd_policy_socket_close(void *context, const qd_connection_t *conn) |
| { |
| qd_policy_t *policy = (qd_policy_t *)context; |
| |
| n_connections -= 1; |
| assert (n_connections >= 0); |
| if (policy->enableVhostPolicy) { |
| // HACK ALERT: TODO: This should be deferred to a Python thread |
| qd_python_lock_state_t lock_state = qd_python_lock(); |
| PyObject *module = PyImport_ImportModule("qpid_dispatch_internal.policy.policy_manager"); |
| if (module) { |
| PyObject *close_connection = PyObject_GetAttrString(module, "policy_close_connection"); |
| if (close_connection) { |
| PyObject *result = PyObject_CallFunction(close_connection, "(OK)", |
| (PyObject *)policy->py_policy_manager, |
| conn->connection_id); |
| if (result) { |
| Py_XDECREF(result); |
| } else { |
| qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: Connection close failed: result"); |
| } |
| Py_XDECREF(close_connection); |
| } else { |
| qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: Connection close failed: close_connection"); |
| } |
| Py_XDECREF(module); |
| } else { |
| qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: Connection close failed: module"); |
| } |
| qd_python_unlock(lock_state); |
| } |
| const char *hostname = qdpn_connector_name(conn->pn_cxtr); |
| qd_log(policy->log_source, QD_LOG_DEBUG, "Connection '%s' closed with resources n_sessions=%d, n_senders=%d, n_receivers=%d. nConnections= %d.", |
| hostname, conn->n_sessions, conn->n_senders, conn->n_receivers, n_connections); |
| } |
| |
| |
| // |
| // Functions related to authenticated connection denial. |
| // An AMQP Open has been received over some connection. |
| // Evaluate the connection auth and the Open fields to |
| // allow or deny the Open. Denied Open attempts are |
| // effected by returning Open and then Close_with_condition. |
| // |
| /** Look up user/host/app in python vhost and give the AMQP Open |
| * a go-no_go decision. Return false if the mechanics of calling python |
| * fails. A policy lookup will deny the connection by returning a blank |
| * usergroup name in the name buffer. |
| * Connection and connection denial counting is done in the python code. |
| * @param[in] policy pointer to policy |
| * @param[in] username authenticated user name |
| * @param[in] hostip numeric host ip address |
| * @param[in] app application name received in remote AMQP Open.hostname |
| * @param[in] conn_name connection name for tracking |
| * @param[out] name_buf pointer to settings name buffer |
| * @param[in] name_buf_size size of settings_buf |
| **/ |
| bool qd_policy_open_lookup_user( |
| qd_policy_t *policy, |
| const char *username, |
| const char *hostip, |
| const char *app, |
| const char *conn_name, |
| char *name_buf, |
| int name_buf_size, |
| uint64_t conn_id, |
| qd_policy_settings_t *settings) |
| { |
| // TODO: crolke 2016-03-24 - Workaround for PROTON-1133: Port number is included in Open hostname |
| // Strip the ':NNNN', if any, from the app name so that policy will work with proton 0.12 |
| char appname[HOST_NAME_MAX + 1]; |
| strncpy(appname, app, HOST_NAME_MAX); |
| appname[HOST_NAME_MAX] = 0; |
| char * colonp = strstr(appname, ":"); |
| if (colonp) { |
| *colonp = 0; |
| } |
| // Lookup the user/host/app for allow/deny and to get settings name |
| bool res = false; |
| qd_python_lock_state_t lock_state = qd_python_lock(); |
| PyObject *module = PyImport_ImportModule("qpid_dispatch_internal.policy.policy_manager"); |
| if (module) { |
| PyObject *lookup_user = PyObject_GetAttrString(module, "policy_lookup_user"); |
| if (lookup_user) { |
| PyObject *result = PyObject_CallFunction(lookup_user, "(OssssK)", |
| (PyObject *)policy->py_policy_manager, |
| username, hostip, appname, conn_name, conn_id); |
| if (result) { |
| const char *res_string = PyString_AsString(result); |
| strncpy(name_buf, res_string, name_buf_size); |
| Py_XDECREF(result); |
| res = true; // settings name returned |
| } else { |
| qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: lookup_user: result"); |
| } |
| Py_XDECREF(lookup_user); |
| } else { |
| qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: lookup_user: lookup_user"); |
| } |
| } |
| if (!res) { |
| if (module) { |
| Py_XDECREF(module); |
| } |
| qd_python_unlock(lock_state); |
| return false; |
| } |
| |
| // |
| if (name_buf[0]) { |
| // Go get the named settings |
| res = false; |
| PyObject *upolicy = PyDict_New(); |
| if (upolicy) { |
| PyObject *lookup_settings = PyObject_GetAttrString(module, "policy_lookup_settings"); |
| if (lookup_settings) { |
| PyObject *result2 = PyObject_CallFunction(lookup_settings, "(OssO)", |
| (PyObject *)policy->py_policy_manager, |
| appname, name_buf, upolicy); |
| if (result2) { |
| settings->maxFrameSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0); |
| settings->maxMessageSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0); |
| settings->maxSessionWindow = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0); |
| settings->maxSessions = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0); |
| settings->maxSenders = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0); |
| settings->maxReceivers = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0); |
| settings->allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false); |
| settings->allowDynamicSrc = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSrc", false); |
| settings->sources = qd_entity_get_string((qd_entity_t*)upolicy, "sources"); |
| settings->targets = qd_entity_get_string((qd_entity_t*)upolicy, "targets"); |
| settings->denialCounts = (qd_policy_denial_counts_t*) |
| qd_entity_get_long((qd_entity_t*)upolicy, "denialCounts"); |
| Py_XDECREF(result2); |
| res = true; // named settings content returned |
| } else { |
| qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: lookup_user: result2"); |
| } |
| Py_XDECREF(lookup_settings); |
| } else { |
| qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: lookup_user: lookup_settings"); |
| } |
| Py_XDECREF(upolicy); |
| } else { |
| qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: lookup_user: upolicy"); |
| } |
| } |
| Py_XDECREF(module); |
| qd_python_unlock(lock_state); |
| |
| if (name_buf[0]) { |
| qd_log(policy->log_source, |
| QD_LOG_TRACE, |
| "ALLOW AMQP Open lookup_user: %s, hostip: %s, app: %s, connection: %s. Usergroup: '%s'%s", |
| username, hostip, appname, conn_name, name_buf, (res ? "" : " Internal error.")); |
| } else { |
| // Denials are logged in python code |
| } |
| |
| return res; |
| } |
| |
| |
| // |
| // |
| void qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char *cond_name, const char *cond_descr) |
| { |
| pn_condition_t * cond = pn_connection_condition(conn); |
| (void) pn_condition_set_name( cond, cond_name); |
| (void) pn_condition_set_description(cond, cond_descr); |
| pn_connection_close(conn); |
| // Connection denial counts are counted and logged by python code. |
| } |
| |
| |
| // |
| // |
| void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) |
| { |
| pn_condition_t * cond = pn_session_condition(ssn); |
| (void) pn_condition_set_name( cond, RESOURCE_LIMIT_EXCEEDED); |
| (void) pn_condition_set_description(cond, SESSION_DISALLOWED); |
| pn_session_close(ssn); |
| qd_conn->policy_settings->denialCounts->sessionDenied++; |
| } |
| |
| |
| // |
| // |
| bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) |
| { |
| bool result = true; |
| if (qd_conn->policy_settings) { |
| if (qd_conn->policy_settings->maxSessions) { |
| if (qd_conn->n_sessions == qd_conn->policy_settings->maxSessions) { |
| qd_policy_deny_amqp_session(ssn, qd_conn); |
| result = false; |
| } |
| } |
| } |
| pn_connection_t *conn = qd_connection_pn(qd_conn); |
| qd_dispatch_t *qd = qd_conn->server->qd; |
| qd_policy_t *policy = qd->policy; |
| const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr); |
| const char *app = pn_connection_remote_hostname(conn); |
| if (result) { |
| qd_log(policy->log_source, |
| QD_LOG_TRACE, |
| "ALLOW AMQP Begin Session. user: %s, hostip: %s, app: %s", |
| qd_conn->user_id, hostip, app); |
| } else { |
| qd_log(policy->log_source, |
| QD_LOG_INFO, |
| "DENY AMQP Begin Session due to session limit. user: %s, hostip: %s, app: %s", |
| qd_conn->user_id, hostip, app); |
| } |
| return result; |
| } |
| |
| |
| // |
| // |
| void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn) |
| { |
| if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow) { |
| pn_session_set_incoming_capacity(ssn, qd_conn->policy_settings->maxSessionWindow); |
| } else { |
| pn_session_set_incoming_capacity(ssn, 1000000); |
| } |
| } |
| |
| // |
| // |
| void _qd_policy_deny_amqp_link(pn_link_t *link, qd_connection_t *qd_conn) |
| { |
| pn_condition_t * cond = pn_link_condition(link); |
| (void) pn_condition_set_name( cond, RESOURCE_LIMIT_EXCEEDED); |
| (void) pn_condition_set_description(cond, LINK_DISALLOWED); |
| pn_link_close(link); |
| } |
| |
| |
| // |
| // |
| void _qd_policy_deny_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn) |
| { |
| _qd_policy_deny_amqp_link(pn_link, qd_conn); |
| qd_conn->policy_settings->denialCounts->senderDenied++; |
| } |
| |
| |
| // |
| // |
| void _qd_policy_deny_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn) |
| { |
| _qd_policy_deny_amqp_link(pn_link, qd_conn); |
| qd_conn->policy_settings->denialCounts->receiverDenied++; |
| } |
| |
| |
| // |
| // |
| #define MIN(a,b) (((a)<(b))?(a):(b)) |
| |
| char * _qd_policy_link_user_name_subst(const char *uname, const char *proposed, char *obuf, int osize) |
| { |
| if (strlen(uname) == 0) |
| return NULL; |
| |
| const char *duser = "${user}"; |
| char *retptr = obuf; |
| const char *wiptr = proposed; |
| const char *findptr = strstr(proposed, uname); |
| if (findptr == NULL) { |
| return NULL; |
| } |
| |
| // Copy leading before match |
| int segsize = findptr - wiptr; |
| int copysize = MIN(osize, segsize); |
| if (copysize) |
| strncpy(obuf, wiptr, copysize); |
| wiptr += copysize; |
| osize -= copysize; |
| obuf += copysize; |
| |
| // Copy the substitution string |
| segsize = strlen(duser); |
| copysize = MIN(osize, segsize); |
| if (copysize) |
| strncpy(obuf, duser, copysize); |
| wiptr += strlen(uname); |
| osize -= copysize; |
| obuf += copysize; |
| |
| // Copy trailing after match |
| strncpy(obuf, wiptr, osize); |
| return retptr; |
| } |
| |
| |
| // |
| // |
| // Size of 'easy' temporary copy of allowed input string |
| #define QPALN_SIZE 1024 |
| // Size of user-name-substituted proposed string. |
| #define QPALN_USERBUFSIZE 300 |
| // C in the CSV string |
| #define QPALN_COMMA_SEP "," |
| // Wildcard character |
| #define QPALN_WILDCARD '*' |
| |
| bool _qd_policy_approve_link_name(const char *username, const char *allowed, const char *proposed) |
| { |
| // Verify string sizes are usable |
| size_t p_len = strlen(proposed); |
| if (p_len == 0) { |
| // degenerate case of blank name being opened. will never match anything. |
| return false; |
| } |
| size_t a_len = strlen(allowed); |
| if (a_len == 0) { |
| // no names in 'allowed'. |
| return false; |
| } |
| |
| // Create a temporary writable copy of incoming allowed list |
| char t_allow[QPALN_SIZE + 1]; // temporary buffer for normal allow lists |
| char * pa = t_allow; |
| if (a_len > QPALN_SIZE) { |
| pa = (char *)malloc(a_len + 1); // malloc a buffer for larger allow lists |
| } |
| strncpy(pa, allowed, a_len); |
| pa[a_len] = 0; |
| // Do reverse user substitution into proposed |
| char substbuf[QPALN_USERBUFSIZE]; |
| char * prop2 = _qd_policy_link_user_name_subst(username, proposed, substbuf, QPALN_USERBUFSIZE); |
| char *toknext = 0; |
| char *tok = strtok_r(pa, QPALN_COMMA_SEP, &toknext); |
| assert (tok); |
| bool result = false; |
| while (tok != NULL) { |
| if (*tok == QPALN_WILDCARD) { |
| result = true; |
| break; |
| } |
| int matchlen = p_len; |
| int len = strlen(tok); |
| if (tok[len-1] == QPALN_WILDCARD) { |
| matchlen = len - 1; |
| assert(len > 0); |
| } |
| if (strncmp(tok, proposed, matchlen) == 0) { |
| result = true; |
| break; |
| } |
| if (prop2 && strncmp(tok, prop2, matchlen) == 0) { |
| result = true; |
| break; |
| } |
| tok = strtok_r(NULL, QPALN_COMMA_SEP, &toknext); |
| } |
| if (pa != t_allow) { |
| free(pa); |
| } |
| return result; |
| } |
| |
| // |
| // |
| bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn) |
| { |
| const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr); |
| const char *app = pn_connection_remote_hostname(qd_connection_pn(qd_conn)); |
| |
| if (qd_conn->policy_settings->maxSenders) { |
| if (qd_conn->n_senders == qd_conn->policy_settings->maxSenders) { |
| // Max sender limit specified and violated. |
| qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_INFO, |
| "DENY AMQP Attach sender for user '%s', host '%s', app '%s' based on maxSenders limit", |
| qd_conn->user_id, hostip, app); |
| _qd_policy_deny_amqp_sender_link(pn_link, qd_conn); |
| return false; |
| } else { |
| // max sender limit not violated |
| } |
| } else { |
| // max sender limit not specified |
| } |
| // Approve sender link based on target |
| const char * target = pn_terminus_get_address(pn_link_remote_target(pn_link)); |
| bool lookup; |
| if (target && *target) { |
| // a target is specified |
| lookup = _qd_policy_approve_link_name(qd_conn->user_id, qd_conn->policy_settings->targets, target); |
| |
| qd_log(qd_conn->server->qd->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), |
| "%s AMQP Attach sender link '%s' for user '%s', host '%s', app '%s' based on link target name", |
| (lookup ? "ALLOW" : "DENY"), target, qd_conn->user_id, hostip, app); |
| |
| if (!lookup) { |
| _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn); |
| return false; |
| } |
| } else { |
| // A sender with no remote target. |
| // This happens all the time with anonymous relay |
| lookup = qd_conn->policy_settings->allowAnonymousSender; |
| qd_log(qd_conn->server->qd->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), |
| "%s AMQP Attach anonymous sender for user '%s', host '%s', app '%s'", |
| (lookup ? "ALLOW" : "DENY"), qd_conn->user_id, hostip, app); |
| if (!lookup) { |
| _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn); |
| return false; |
| } |
| } |
| // Approved |
| return true; |
| } |
| |
| |
| bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn) |
| { |
| const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr); |
| const char *app = pn_connection_remote_hostname(qd_connection_pn(qd_conn)); |
| |
| if (qd_conn->policy_settings->maxReceivers) { |
| if (qd_conn->n_receivers == qd_conn->policy_settings->maxReceivers) { |
| // Max sender limit specified and violated. |
| qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_INFO, |
| "DENY AMQP Attach receiver for user '%s', host '%s', app '%s' based on maxReceivers limit", |
| qd_conn->user_id, hostip, app); |
| _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn); |
| return false; |
| } else { |
| // max receiver limit not violated |
| } |
| } else { |
| // max receiver limit not specified |
| } |
| // Approve receiver link based on source |
| bool dynamic_src = pn_terminus_is_dynamic(pn_link_remote_source(pn_link)); |
| if (dynamic_src) { |
| bool lookup = qd_conn->policy_settings->allowDynamicSrc; |
| qd_log(qd_conn->server->qd->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), |
| "%s AMQP Attach receiver dynamic source for user '%s', host '%s', app '%s',", |
| (lookup ? "ALLOW" : "DENY"), qd_conn->user_id, hostip, app); |
| // Dynamic source policy rendered the decision |
| if (!lookup) { |
| _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn); |
| } |
| return lookup; |
| } |
| const char * source = pn_terminus_get_address(pn_link_remote_source(pn_link)); |
| if (source && *source) { |
| // a source is specified |
| bool lookup = _qd_policy_approve_link_name(qd_conn->user_id, qd_conn->policy_settings->sources, source); |
| |
| qd_log(qd_conn->server->qd->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), |
| "%s AMQP Attach receiver link '%s' for user '%s', host '%s', app '%s' based on link source name", |
| (lookup ? "ALLOW" : "DENY"), source, qd_conn->user_id, hostip, app); |
| |
| if (!lookup) { |
| _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn); |
| return false; |
| } |
| } else { |
| // A receiver with no remote source. |
| qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_TRACE, |
| "DENY AMQP Attach receiver link '' for user '%s', host '%s', app '%s'", |
| qd_conn->user_id, hostip, app); |
| _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn); |
| return false; |
| } |
| // Approved |
| return true; |
| } |
| |
| |
| // |
| // |
| void qd_policy_amqp_open(void *context, bool discard) |
| { |
| qd_connection_t *qd_conn = (qd_connection_t *)context; |
| if (!discard) { |
| pn_connection_t *conn = qd_connection_pn(qd_conn); |
| qd_dispatch_t *qd = qd_conn->server->qd; |
| qd_policy_t *policy = qd->policy; |
| bool connection_allowed = true; |
| |
| if (policy->enableVhostPolicy) { |
| // Open connection or not based on policy. |
| pn_transport_t *pn_trans = pn_connection_transport(conn); |
| const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr); |
| const char *pcrh = pn_connection_remote_hostname(conn); |
| const char *app = (pcrh ? pcrh : ""); |
| const char *conn_name = qdpn_connector_name(qd_conn->pn_cxtr); |
| #define SETTINGS_NAME_SIZE 256 |
| char settings_name[SETTINGS_NAME_SIZE]; |
| uint32_t conn_id = qd_conn->connection_id; |
| qd_conn->policy_settings = NEW(qd_policy_settings_t); // TODO: memory pool for settings |
| memset(qd_conn->policy_settings, 0, sizeof(qd_policy_settings_t)); |
| |
| if (qd_policy_open_lookup_user(policy, qd_conn->user_id, hostip, app, conn_name, |
| settings_name, SETTINGS_NAME_SIZE, conn_id, |
| qd_conn->policy_settings) && |
| settings_name[0]) { |
| // This connection is allowed by policy. |
| // Apply transport policy settings |
| if (qd_conn->policy_settings->maxFrameSize > 0) |
| pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize); |
| if (qd_conn->policy_settings->maxSessions > 0) |
| pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions - 1); |
| } else { |
| // This connection is denied by policy. |
| connection_allowed = false; |
| } |
| } else { |
| // No policy implies automatic policy allow |
| // Note that connections not governed by policy have no policy_settings. |
| } |
| if (connection_allowed) { |
| if (pn_connection_state(conn) & PN_LOCAL_UNINIT) |
| pn_connection_open(conn); |
| policy_notify_opened(qd_conn->open_container, qd_conn, qd_conn->context); |
| } else { |
| qd_policy_private_deny_amqp_connection(conn, RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED); |
| } |
| } |
| qd_connection_set_event_stall(qd_conn, false); |
| } |