blob: 9a47c76a1a10f95582d21ba6d7be9327627fa8f8 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License
#
"""
"""
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import json
import pdb
import sys
from .policy_util import PolicyError, HostStruct, HostAddr, PolicyAppConnectionMgr, is_ipv6_enabled
from ..compat import PY_STRING_TYPE
from ..compat import PY_TEXT_TYPE
from ..compat import dict_iteritems
from ..compat import dict_keys
"""
Entity implementing the business logic of user connection/access policy.
"""
#
#
class PolicyKeys(object):
"""
String constants
"""
# Common key words
KW_IGNORED_NAME = "name"
KW_IGNORED_IDENTITY = "identity"
KW_IGNORED_TYPE = "type"
KW_VHOST_NAME = "hostname"
KW_VHOST_DEPRECATED_ID = "id"
# Policy ruleset key words
KW_MAXCONN = "maxConnections"
KW_MAXCONNPERHOST = "maxConnectionsPerHost"
KW_MAXCONNPERUSER = "maxConnectionsPerUser"
KW_CONNECTION_ALLOW_DEFAULT = "allowUnknownUser"
KW_GROUPS = "groups"
# Policy settings key words
KW_USERS = "users"
KW_REMOTE_HOSTS = "remoteHosts"
KW_MAX_FRAME_SIZE = "maxFrameSize"
KW_MAX_MESSAGE_SIZE = "maxMessageSize"
KW_MAX_SESSION_WINDOW = "maxSessionWindow"
KW_MAX_SESSIONS = "maxSessions"
KW_MAX_SENDERS = "maxSenders"
KW_MAX_RECEIVERS = "maxReceivers"
KW_ALLOW_DYNAMIC_SRC = "allowDynamicSource"
KW_ALLOW_ANONYMOUS_SENDER = "allowAnonymousSender"
KW_ALLOW_USERID_PROXY = "allowUserIdProxy"
KW_ALLOW_WAYPOINT_LINKS = "allowWaypointLinks"
KW_ALLOW_FALLBACK_LINKS = "allowFallbackLinks"
KW_ALLOW_DYNAMIC_LINK_ROUTES = "allowDynamicLinkRoutes"
KW_ALLOW_ADMIN_STATUS_UPDATE = "allowAdminStatusUpdate"
KW_SOURCES = "sources"
KW_TARGETS = "targets"
KW_SOURCE_PATTERN = "sourcePattern"
KW_TARGET_PATTERN = "targetPattern"
KW_VHOST_ALIASES = "aliases"
# Policy stats key words
KW_CONNECTIONS_APPROVED = "connectionsApproved"
KW_CONNECTIONS_DENIED = "connectionsDenied"
KW_CONNECTIONS_CURRENT = "connectionsCurrent"
KW_LINKS_DENIED = "linksDenied"
KW_TOTAL_DENIALS = "totalDenials"
KW_PER_USER_STATE = "perUserState"
KW_PER_HOST_STATE = "perHostState"
# What settings does a user get when allowed to connect but
# not restricted by a user group?
KW_DEFAULT_SETTINGS = "$default"
# Config file separator character for two IP addresses in a range
KC_CONFIG_IP_SEP = "-"
# Config file separator character for names in a list
KC_CONFIG_LIST_SEP = ","
# user-to-group computed map in compiled ruleset
RULESET_U2G_MAP = "U2G"
# policy stats controlled by C code but referenced by settings
KW_CSTATS = "denialCounts"
# Username subsitituion token in link source and target names and patterns
KC_TOKEN_USER = "${user}"
# Link target/source name wildcard tuple keys
KC_TUPLE_ABSENT = 'a'
KC_TUPLE_PREFIX = 'p'
KC_TUPLE_SUFFIX = 's'
KC_TUPLE_EMBED = 'e'
KC_TUPLE_WILDCARD = '*'
#
#
class PolicyCompiler(object):
"""
Validate incoming configuration for legal schema.
- Warn about section options that go unused.
- Disallow negative max connection/message size numbers.
- Check that connectionOrigins resolve to IP hosts.
- Enforce internal consistency,
"""
allowed_ruleset_options = [
PolicyKeys.KW_IGNORED_NAME,
PolicyKeys.KW_IGNORED_IDENTITY,
PolicyKeys.KW_IGNORED_TYPE,
PolicyKeys.KW_VHOST_NAME,
PolicyKeys.KW_MAXCONN,
PolicyKeys.KW_MAX_MESSAGE_SIZE,
PolicyKeys.KW_MAXCONNPERHOST,
PolicyKeys.KW_MAXCONNPERUSER,
PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT,
PolicyKeys.KW_GROUPS,
PolicyKeys.KW_VHOST_ALIASES
]
allowed_settings_options = [
PolicyKeys.KW_USERS,
PolicyKeys.KW_REMOTE_HOSTS,
PolicyKeys.KW_MAXCONNPERHOST,
PolicyKeys.KW_MAXCONNPERUSER,
PolicyKeys.KW_MAX_FRAME_SIZE,
PolicyKeys.KW_MAX_MESSAGE_SIZE,
PolicyKeys.KW_MAX_SESSION_WINDOW,
PolicyKeys.KW_MAX_SESSIONS,
PolicyKeys.KW_MAX_SENDERS,
PolicyKeys.KW_MAX_RECEIVERS,
PolicyKeys.KW_ALLOW_DYNAMIC_SRC,
PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER,
PolicyKeys.KW_ALLOW_USERID_PROXY,
PolicyKeys.KW_ALLOW_WAYPOINT_LINKS,
PolicyKeys.KW_ALLOW_FALLBACK_LINKS,
PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES,
PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE,
PolicyKeys.KW_SOURCES,
PolicyKeys.KW_TARGETS,
PolicyKeys.KW_SOURCE_PATTERN,
PolicyKeys.KW_TARGET_PATTERN
]
def __init__(self):
"""
Create a validator
"""
pass
def validateNumber(self, val, v_min, v_max, errors):
"""
Range check a numeric int policy value
@param[in] val policy value to check
@param[in] v_min minumum value
@param[in] v_max maximum value. zero disables check
@param[out] errors failure message
@return v_min <= val <= v_max
"""
try:
v_int = int(val)
except Exception as e:
errors.append("Value '%s' does not resolve to an integer." % val)
return False
if v_int < v_min:
errors.append("Value '%s' is below minimum '%s'." % (val, v_min))
return False
if v_max > 0 and v_int > v_max:
errors.append("Value '%s' is above maximum '%s'." % (val, v_max))
return False
return True
def compile_connection_group(self, vhostname, groupname, val, list_out, warnings, errors):
"""
Handle an ingressHostGroups submap.
Each origin value is verified. On a successful run the submap
is replaced parsed lists of HostAddr objects.
@param[in] vhostname vhost name
@param[in] groupname vhost/group name
@param[in] val origin list as text string
@param[out] list_out user inputs replaced with HostAddr objects
@param[out] warnings nonfatal irregularities observed
@param[out] errors descriptions of failure
@return - origins is usable. If True then warnings[] may contain useful
information about fields that are ignored. If False then
warnings[] may contain info and errors[0] will hold the
description of why the origin was rejected.
"""
key = PolicyKeys.KW_REMOTE_HOSTS
# convert val string to list of host specs
if isinstance(val, list):
# ['abc', 'def', 'mytarget']
pass
elif isinstance(val, (PY_STRING_TYPE, PY_TEXT_TYPE)):
val = [x.strip(' ') for x in val.split(PolicyKeys.KC_CONFIG_LIST_SEP)]
else:
errors.append(
"Policy vhost '%s' user group '%s' option '%s' has illegal value '%s'. Type must be 'str' or 'list' but is '%s;" %
(vhostname, groupname, key, val, type(val)))
return False
for coname in val:
try:
coha = HostAddr(coname, PolicyKeys.KC_CONFIG_IP_SEP)
list_out.append(coha)
except Exception as e:
errors.append("Policy vhost '%s' user group '%s' option '%s' connectionOption '%s' failed to translate: '%s'." %
(vhostname, groupname, key, coname, e))
return False
return True
def compile_app_settings(self, vhostname, usergroup, policy_in, policy_out, warnings, errors):
"""
Compile a vhostUserGroupSettings schema from processed json format to local internal format.
@param[in] name vhost name
@param[in] policy_in user config settings
@param[out] policy_out validated Internal format
@param[out] warnings nonfatal irregularities observed
@param[out] errors descriptions of failure
@return - settings are usable. If True then warnings[] may contain useful
information about fields that are ignored. If False then
warnings[] may contain info and errors[0] will hold the
description of why the policy was rejected.
"""
# rulesets may not come through standard config so make nice defaults
policy_out[PolicyKeys.KW_USERS] = ''
policy_out[PolicyKeys.KW_REMOTE_HOSTS] = ''
# DISPATCH-1277 - KW_MAX_FRAME_SIZE must be defaulted to 16384 not 2147483647
policy_out[PolicyKeys.KW_MAX_FRAME_SIZE] = 16384
policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None
policy_out[PolicyKeys.KW_MAX_SESSION_WINDOW] = 2147483647
policy_out[PolicyKeys.KW_MAX_SESSIONS] = 65536
policy_out[PolicyKeys.KW_MAX_SENDERS] = 2147483647
policy_out[PolicyKeys.KW_MAX_RECEIVERS] = 2147483647
policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_SRC] = False
policy_out[PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER] = False
policy_out[PolicyKeys.KW_ALLOW_USERID_PROXY] = False
policy_out[PolicyKeys.KW_ALLOW_WAYPOINT_LINKS] = True
policy_out[PolicyKeys.KW_ALLOW_FALLBACK_LINKS] = True
policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES] = True
policy_out[PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE] = True
policy_out[PolicyKeys.KW_SOURCES] = ''
policy_out[PolicyKeys.KW_TARGETS] = ''
policy_out[PolicyKeys.KW_SOURCE_PATTERN] = ''
policy_out[PolicyKeys.KW_TARGET_PATTERN] = ''
policy_out[PolicyKeys.KW_MAXCONNPERHOST] = None # optional group limit
policy_out[PolicyKeys.KW_MAXCONNPERUSER] = None
cerror = []
user_sources = False
user_targets = False
user_src_pattern = False
user_tgt_pattern = False
for key, val in dict_iteritems(policy_in):
if key not in self.allowed_settings_options:
warnings.append("Policy vhost '%s' user group '%s' option '%s' is ignored." %
(vhostname, usergroup, key))
if key in [PolicyKeys.KW_MAXCONNPERHOST,
PolicyKeys.KW_MAXCONNPERUSER
]:
if not self.validateNumber(val, 0, 65535, cerror):
msg = ("Policy vhost '%s' user group '%s' option '%s' has error '%s'." %
(vhostname, usergroup, key, cerror[0]))
errors.append(msg)
return False
policy_out[key] = int(val)
elif key in [PolicyKeys.KW_MAX_FRAME_SIZE,
PolicyKeys.KW_MAX_MESSAGE_SIZE,
PolicyKeys.KW_MAX_RECEIVERS,
PolicyKeys.KW_MAX_SENDERS,
PolicyKeys.KW_MAX_SESSION_WINDOW,
PolicyKeys.KW_MAX_SESSIONS
]:
if not self.validateNumber(val, 0, 0, cerror):
errors.append("Policy vhost '%s' user group '%s' option '%s' has error '%s'." %
(vhostname, usergroup, key, cerror[0]))
return False
policy_out[key] = int(val)
elif key == PolicyKeys.KW_REMOTE_HOSTS:
# Conection groups are lists of IP addresses that need to be
# converted into binary structures for comparisons.
val_out = []
if not self.compile_connection_group(vhostname, usergroup, val, val_out, warnings, errors):
return False
policy_out[key] = val_out
elif key in [PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER,
PolicyKeys.KW_ALLOW_DYNAMIC_SRC,
PolicyKeys.KW_ALLOW_USERID_PROXY,
PolicyKeys.KW_ALLOW_WAYPOINT_LINKS,
PolicyKeys.KW_ALLOW_FALLBACK_LINKS,
PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES,
PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE
]:
if isinstance(val, (PY_STRING_TYPE, PY_TEXT_TYPE)) and val.lower() in ['true', 'false']:
val = True if val == 'true' else False
if not type(val) is bool:
errors.append("Policy vhost '%s' user group '%s' option '%s' has illegal boolean value '%s'." %
(vhostname, usergroup, key, val))
return False
policy_out[key] = val
elif key in [PolicyKeys.KW_USERS,
PolicyKeys.KW_SOURCES,
PolicyKeys.KW_TARGETS,
PolicyKeys.KW_SOURCE_PATTERN,
PolicyKeys.KW_TARGET_PATTERN,
PolicyKeys.KW_VHOST_ALIASES
]:
# accept a string or list
if isinstance(val, list):
# ['abc', 'def', 'mytarget']
pass
elif isinstance(val, (PY_STRING_TYPE, PY_TEXT_TYPE)):
# 'abc, def, mytarget'
val = [x.strip(' ') for x in val.split(PolicyKeys.KC_CONFIG_LIST_SEP)]
else:
errors.append("Policy vhost '%s' user group '%s' option '%s' has illegal value '%s'. Type must be 'str' or 'list' but is '%s;" %
(vhostname, usergroup, key, val, type(val)))
# deduplicate address lists
val = list(set(val))
# val is CSV string with no white space between values: 'abc,def,mytarget,tmp-${user}'
if key == PolicyKeys.KW_USERS:
# user name list items are literal strings and need no special handling
policy_out[key] = ','.join(val)
else:
# source and target names get special handling for the '${user}' substitution token
# The literal string is translated to a (key, prefix, suffix) set of three strings.
# C code does not have to search for the username token and knows with authority
# how to construct match strings.
# A wildcard is also signaled.
utoken = PolicyKeys.KC_TOKEN_USER
eVal = []
for v in val:
vcount = v.count(utoken)
if vcount > 1:
errors.append("Policy vhost '%s' user group '%s' policy key '%s' item '%s' contains multiple user subtitution tokens" %
(vhostname, usergroup, key, v))
return False
elif vcount == 1:
# a single token is present as a prefix, suffix, or embedded
# construct cChar, S1, S2 encodings to be added to eVal description
if v.startswith(utoken):
# prefix
eVal.append(PolicyKeys.KC_TUPLE_PREFIX)
eVal.append('')
eVal.append(v[v.find(utoken) + len(utoken):])
elif v.endswith(utoken):
# suffix
eVal.append(PolicyKeys.KC_TUPLE_SUFFIX)
eVal.append(v[0:v.find(utoken)])
eVal.append('')
else:
# embedded
if key in [PolicyKeys.KW_SOURCE_PATTERN,
PolicyKeys.KW_TARGET_PATTERN]:
errors.append("Policy vhost '%s' user group '%s' policy key '%s' item '%s' may contain match pattern '%s' as a prefix or a suffix only." %
(vhostname, usergroup, key, v, utoken))
return False
eVal.append(PolicyKeys.KC_TUPLE_EMBED)
eVal.append(v[0:v.find(utoken)])
eVal.append(v[v.find(utoken) + len(utoken):])
else:
# ${user} token is absent
if v == PolicyKeys.KC_TUPLE_WILDCARD:
eVal.append(PolicyKeys.KC_TUPLE_WILDCARD)
eVal.append('')
eVal.append('')
else:
eVal.append(PolicyKeys.KC_TUPLE_ABSENT)
eVal.append(v)
eVal.append('')
policy_out[key] = ','.join(eVal)
if key == PolicyKeys.KW_SOURCES:
user_sources = True
if key == PolicyKeys.KW_TARGETS:
user_targets = True
if key == PolicyKeys.KW_SOURCE_PATTERN:
user_src_pattern = True
if key == PolicyKeys.KW_TARGET_PATTERN:
user_tgt_pattern = True
if user_sources and user_src_pattern:
errors.append("Policy vhost '%s' user group '%s' specifies conflicting 'sources' and 'sourcePattern' attributes. Use only one or the other." % (vhostname, usergroup))
return False
if user_targets and user_tgt_pattern:
errors.append("Policy vhost '%s' user group '%s' specifies conflicting 'targets' and 'targetPattern' attributes. Use only one or the other." % (vhostname, usergroup))
return False
return True
def compile_access_ruleset(self, name, policy_in, policy_out, warnings, errors):
"""
Compile a vhost schema from processed json format to local internal format.
@param[in] name vhost name
@param[in] policy_in raw policy to be validated
@param[out] policy_out validated Internal format
@param[out] warnings nonfatal irregularities observed
@param[out] errors descriptions of failure
@return - policy is usable. If True then warnings[] may contain useful
information about fields that are ignored. If False then
warnings[] may contain info and errors[0] will hold the
description of why the policy was rejected.
"""
cerror = []
# rulesets may not come through standard config so make nice defaults
policy_out[PolicyKeys.KW_MAXCONN] = 65535
policy_out[PolicyKeys.KW_MAXCONNPERHOST] = 65535
policy_out[PolicyKeys.KW_MAXCONNPERUSER] = 65535
policy_out[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT] = False
policy_out[PolicyKeys.KW_GROUPS] = {}
policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None
policy_out[PolicyKeys.KW_VHOST_ALIASES] = []
# validate the options
for key, val in dict_iteritems(policy_in):
if key not in self.allowed_ruleset_options:
warnings.append("Policy vhost '%s' option '%s' is ignored." %
(name, key))
if key in [PolicyKeys.KW_MAXCONN,
PolicyKeys.KW_MAXCONNPERHOST,
PolicyKeys.KW_MAXCONNPERUSER
]:
if not self.validateNumber(val, 0, 65535, cerror):
msg = ("Policy vhost '%s' option '%s' has error '%s'." %
(name, key, cerror[0]))
errors.append(msg)
return False
policy_out[key] = val
elif key in [PolicyKeys.KW_MAX_MESSAGE_SIZE
]:
if not self.validateNumber(val, 0, 0, cerror):
msg = ("Policy vhost '%s' option '%s' has error '%s'." %
(name, key, cerror[0]))
errors.append(msg)
return False
policy_out[key] = val
elif key in [PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]:
if not type(val) is bool:
errors.append("Policy vhost '%s' option '%s' must be of type 'bool' but is '%s'" %
(name, key, type(val)))
return False
policy_out[key] = val
elif key in [PolicyKeys.KW_VHOST_ALIASES]:
# vhost aliases is a CSV string. convert to a list
val0 = [x.strip(' ') for x in val.split(PolicyKeys.KC_CONFIG_LIST_SEP)]
# Reject aliases that duplicate the vhost itself or other aliases
val = []
for vtest in val0:
if vtest == name:
errors.append("Policy vhost '%s' option '%s' value '%s' duplicates vhost name" %
(name, key, vtest))
return False
if vtest in val:
errors.append("Policy vhost '%s' option '%s' value '%s' is duplicated" %
(name, key, vtest))
return False
val.append(vtest)
policy_out[key] = val
elif key in [PolicyKeys.KW_GROUPS]:
if not type(val) is dict:
errors.append("Policy vhost '%s' option '%s' must be of type 'dict' but is '%s'" %
(name, key, type(val)))
return False
for skey, sval in dict_iteritems(val):
newsettings = {}
if not self.compile_app_settings(name, skey, sval, newsettings, warnings, errors):
return False
policy_out[key][skey] = {}
policy_out[key][skey].update(newsettings)
# Verify that each user is in only one group.
# Create user-to-group map for looking up user's group
policy_out[PolicyKeys.RULESET_U2G_MAP] = {}
if PolicyKeys.KW_GROUPS in policy_out:
for group, groupsettings in dict_iteritems(policy_out[PolicyKeys.KW_GROUPS]):
if PolicyKeys.KW_USERS in groupsettings:
users = [x.strip(' ') for x in groupsettings[PolicyKeys.KW_USERS].split(PolicyKeys.KC_CONFIG_LIST_SEP)]
for user in users:
if user in policy_out[PolicyKeys.RULESET_U2G_MAP]:
errors.append("Policy vhost '%s' user '%s' is in multiple user groups '%s' and '%s'" %
(name, user, policy_out[PolicyKeys.RULESET_U2G_MAP][user], group))
return False
else:
policy_out[PolicyKeys.RULESET_U2G_MAP][user] = group
else:
warnings.append("Policy vhost '%s' user group '%s' has no defined users. This policy has no effect" % (name, group))
# Default connections require a default settings
if policy_out[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]:
if PolicyKeys.KW_DEFAULT_SETTINGS not in policy_out[PolicyKeys.KW_GROUPS]:
errors.append("Policy vhost '%s' allows connections by default but default settings are not defined" %
(name))
return False
return True
#
#
class AppStats(object):
"""
Maintain live state and statistics for an vhost.
"""
def __init__(self, id, manager, ruleset):
self.my_id = id
self._manager = manager
self.conn_mgr = PolicyAppConnectionMgr(
ruleset[PolicyKeys.KW_MAXCONN],
ruleset[PolicyKeys.KW_MAXCONNPERUSER],
ruleset[PolicyKeys.KW_MAXCONNPERHOST])
self._cstats = self._manager.get_agent().qd.qd_dispatch_policy_c_counts_alloc()
self._manager.get_agent().add_implementation(self, "vhostStats")
def update_ruleset(self, ruleset):
"""
The parent ruleset has changed.
Propagate settings into the connection manager.
@param ruleset: new ruleset
@return:
"""
self.conn_mgr.update(
ruleset[PolicyKeys.KW_MAXCONN],
ruleset[PolicyKeys.KW_MAXCONNPERHOST],
ruleset[PolicyKeys.KW_MAXCONNPERUSER])
def refresh_entity(self, attributes):
"""Refresh management attributes"""
entitymap = {}
entitymap[PolicyKeys.KW_VHOST_NAME] = self.my_id
entitymap[PolicyKeys.KW_VHOST_DEPRECATED_ID] = self.my_id
entitymap[PolicyKeys.KW_CONNECTIONS_APPROVED] = self.conn_mgr.connections_approved
entitymap[PolicyKeys.KW_CONNECTIONS_DENIED] = self.conn_mgr.connections_denied
entitymap[PolicyKeys.KW_CONNECTIONS_CURRENT] = self.conn_mgr.connections_active
entitymap[PolicyKeys.KW_PER_USER_STATE] = self.conn_mgr.per_user_state
entitymap[PolicyKeys.KW_PER_HOST_STATE] = self.conn_mgr.per_host_state
self._manager.get_agent().qd.qd_dispatch_policy_c_counts_refresh(self._cstats, entitymap)
attributes.update(entitymap)
def can_connect(self, conn_id, user, host, diags, group_max_conn_user, group_max_conn_host):
return self.conn_mgr.can_connect(conn_id, user, host, diags, group_max_conn_user, group_max_conn_host)
def disconnect(self, conn_id, user, host):
self.conn_mgr.disconnect(conn_id, user, host)
def count_other_denial(self):
self.conn_mgr.count_other_denial()
def get_cstats(self):
return self._cstats
#
#
class ConnectionFacts(object):
def __init__(self, user, host, app, conn_name):
self.user = user
self.host = host
self.app = app
self.conn_name = conn_name
#
#
class PolicyLocal(object):
"""
The local policy database.
"""
def __init__(self, manager):
"""
Create instance
@params manager policy manager class
"""
# manager is a class
# It provides access the dispatch system functions
self._manager = manager
# rulesetdb is a map
# key : vhost name
# val : ruleset for this app
# created by configuration
# augmented by policy compiler
self.rulesetdb = {}
# settingsdb is a map
# key : <vhost name>
# val : a map
# key : <user group name>
# val : settings to use for user's connection
# created by configuration
self.settingsdb = {}
# statsdb is a map
# key : <vhost name>
# val : AppStats object
self.statsdb = {}
# _policy_compiler is a function
# validates incoming policy and readies it for internal use
self._policy_compiler = PolicyCompiler()
# _connections is a map
# key : numeric connection id
# val : ConnectionFacts
# Entries created as connection AMQP Opens arrive
# Entries destroyed as sockets closed
self._connections = {}
# _default_vhost is a string
# holds the name of the vhost to use when the
# open.hostname is not found in the rulesetdb
self._default_vhost = ""
# _use_hostname_patterns
# holds policy setting.
# When true policy ruleset definitions are propagated to C code
self.use_hostname_patterns = False
# _max_message_size
# holds global value from policy config object
self._max_message_size = 0
# _vhost_aliases is a map
# key : alias vhost name
# val : actual vhost to which alias refers
self._vhost_aliases = {}
#
# Service interfaces
#
def create_ruleset(self, attributes):
"""
Create or update named policy ruleset.
@param[in] attributes: from config
"""
warnings = []
diag = []
candidate = {}
name = attributes[PolicyKeys.KW_VHOST_NAME]
result = self._policy_compiler.compile_access_ruleset(name, attributes, candidate, warnings, diag)
if not result:
raise PolicyError("Policy '%s' is invalid: %s" % (name, diag[0]))
if len(warnings) > 0:
for warning in warnings:
self._manager.log_warning(warning)
# Reject if any vhost alias name conflicts
if name in self._vhost_aliases:
# hostname is an alias
raise PolicyError(
"Policy is creating vhost '%s' but that name is already an alias for vhost '%s'" % (name, self._vhost_aliases[name]))
for vhost_alias in candidate[PolicyKeys.KW_VHOST_ALIASES]:
# alias is a hostname
if vhost_alias in self.rulesetdb.keys():
raise PolicyError(
"Policy for vhost '%s' defines alias '%s' which conflicts with an existing vhost named '%s'" % (name, vhost_alias, vhost_alias))
if name not in self.rulesetdb:
# Creating new ruleset. Vhost aliases cannot overlap
for vhost_alias in candidate[PolicyKeys.KW_VHOST_ALIASES]:
if vhost_alias in self._vhost_aliases:
raise PolicyError(
"Policy for vhost '%s' alias '%s' conflicts with existing alias for vhost '%s'" % (name, vhost_alias, self._vhost_aliases[vhost_alias]))
else:
# Updating an existing ruleset.
# Vhost aliases still cannot overlap but replacement is allowed
for vhost_alias in candidate[PolicyKeys.KW_VHOST_ALIASES]:
if vhost_alias in self._vhost_aliases and not self._vhost_aliases[vhost_alias] == name:
raise PolicyError(
"Policy for vhost '%s' alias '%s' conflicts with existing alias for vhost '%s'" % (name, vhost_alias, self._vhost_aliases[vhost_alias]))
# Reject if parse tree optimized name collision
# Coincidently add name and aliases to parse tree
if self.use_hostname_patterns:
agent = self._manager.get_agent()
# construct a list of names to be added
tnames = []
tnames.append(name)
tnames += candidate[PolicyKeys.KW_VHOST_ALIASES]
# create a list of names to undo in case a subsequent name does not work
snames = []
for tname in tnames:
if not agent.qd.qd_dispatch_policy_host_pattern_add(agent.dispatch, tname):
# undo the snames list
for sname in snames:
agent.qd.qd_dispatch_policy_host_pattern_del(agent.dispatch, sname)
raise PolicyError("Policy for vhost '%s' alias '%s' optimized pattern conflicts with existing pattern" % (name, tname))
snames.append(tname)
# Names pass administrative approval
if name not in self.rulesetdb:
# add new aliases
for nname in candidate[PolicyKeys.KW_VHOST_ALIASES]:
self._vhost_aliases[nname] = name
if name not in self.statsdb:
self.statsdb[name] = AppStats(name, self._manager, candidate)
self._manager.log_info("Created policy rules for vhost %s" % name)
else:
# remove old aliases
old_aliases = self.rulesetdb[name][PolicyKeys.KW_VHOST_ALIASES]
for oname in old_aliases:
del self._vhost_aliases[oname]
# add new aliases
for nname in candidate[PolicyKeys.KW_VHOST_ALIASES]:
self._vhost_aliases[nname] = name
self.statsdb[name].update_ruleset(candidate)
self._manager.log_info("Updated policy rules for vhost %s" % name)
# TODO: ruleset lock
self.rulesetdb[name] = {}
self.rulesetdb[name].update(candidate)
def policy_delete(self, name):
"""
Delete named policy
@param[in] name vhost name
"""
if name not in self.rulesetdb:
raise PolicyError("Policy '%s' does not exist" % name)
# TODO: ruleset lock
if self.use_hostname_patterns:
agent = self._manager.get_agent()
agent.qd.qd_dispatch_policy_host_pattern_remove(agent.dispatch, name)
anames = self.rulesetdb[name][PolicyKeys.KW_VHOST_ALIASES]
for aname in anames:
agent.qd.qd_dispatch_policy_host_pattern_remove(agent.dispatch, aname)
del self.rulesetdb[name]
#
# db enumerator
#
def policy_db_get_names(self):
"""
Return a list of vhost names in this policy
"""
return dict_keys(self.rulesetdb)
def set_default_vhost(self, name):
"""
Set the default vhost name.
@param name: the name of the default vhost
@return: none
"""
self._default_vhost = name
self._manager.log_info("Policy fallback defaultVhost is defined: '%s'" % name)
def default_vhost_enabled(self):
"""
The default vhost is enabled if the name is not blank and
the vhost is defined in rulesetdb.
@return:
"""
return not self._default_vhost == "" and self._default_vhost in self.rulesetdb
#
# Runtime query interface
#
def lookup_vhost_alias(self, vhost_in):
"""
Resolve given vhost name to vhost settings name.
If the incoming name is a vhost hostname then return the same name.
If the incoming name is a vhost alias hostname then return the containing vhost name.
If a default vhost is defined then return its name.
:param vhost_in: vhost name to test
:return: name of policy settings vhost to be applied or blank if lookup failed.
"""
vhost = vhost_in
if self.use_hostname_patterns:
agent = self._manager.get_agent()
vhost = agent.qd.qd_dispatch_policy_host_pattern_lookup(agent.dispatch, vhost)
# Translate an aliased vhost to a concrete vhost. If no alias then use current vhost.
vhost = self._vhost_aliases.get(vhost, vhost)
# If no usable vhost yet then try default vhost
if vhost not in self.rulesetdb:
vhost = self._default_vhost if self.default_vhost_enabled() else ""
return vhost
def lookup_user(self, user, rhost, vhost_in, conn_name, conn_id):
"""
Lookup function called from C.
Determine if a user on host accessing vhost through AMQP Open is allowed
according to the policy access rules.
If allowed then return the policy vhost settings name. If stats.can_connect
returns true then it has registered and counted the connection.
@param[in] user connection authId
@param[in] rhost connection remote host numeric IP address as string
@param[in] vhost_in vhost user is accessing
@param[in] conn_name connection name used for tracking reports
@param[in] conn_id internal connection id
@return settings user-group name if allowed; "" if not allowed
"""
try:
# choose rule set based on incoming vhost or default vhost
# or potential vhost found by pattern matching
vhost = self.lookup_vhost_alias(vhost_in)
if vhost == "":
self._manager.log_info(
"DENY AMQP Open for user '%s', rhost '%s', vhost '%s': "
"No policy defined for vhost" % (user, rhost, vhost_in))
return ""
if vhost != vhost_in:
self._manager.log_debug(
"AMQP Open for user '%s', rhost '%s', vhost '%s': "
"proceeds using vhost '%s' ruleset" % (user, rhost, vhost_in, vhost))
ruleset = self.rulesetdb[vhost]
# look up the stats
if vhost not in self.statsdb:
msg = (
"DENY AMQP Open for user '%s', rhost '%s', vhost '%s': "
"INTERNAL: Policy is defined but stats are missing" % (user, rhost, vhost))
raise PolicyError(msg)
stats = self.statsdb[vhost]
# Get settings for user in a user group or in default
if user in ruleset[PolicyKeys.RULESET_U2G_MAP]:
usergroup = ruleset[PolicyKeys.RULESET_U2G_MAP][user]
elif "*" in ruleset[PolicyKeys.RULESET_U2G_MAP]:
usergroup = ruleset[PolicyKeys.RULESET_U2G_MAP]["*"]
else:
if ruleset[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]:
usergroup = PolicyKeys.KW_DEFAULT_SETTINGS
else:
self._manager.log_info(
"DENY AMQP Open for user '%s', rhost '%s', vhost '%s': "
"User is not in a user group and unknown users are denied" % (user, rhost, vhost))
stats.count_other_denial()
return ""
groupsettings = ruleset[PolicyKeys.KW_GROUPS][usergroup]
# User in usergroup allowed to connect from rhost?
allowed = False
if PolicyKeys.KW_REMOTE_HOSTS in groupsettings:
# Users are restricted to connecting from a rhost
# defined by the group's remoteHost list
cglist = groupsettings[PolicyKeys.KW_REMOTE_HOSTS]
uhs = HostStruct(rhost)
for cohost in cglist:
if cohost.match_bin(uhs):
allowed = True
break
if not allowed:
self._manager.log_info(
"DENY AMQP Open for user '%s', rhost '%s', vhost '%s': "
"User is not allowed to connect from this network host" % (user, rhost, vhost))
stats.count_other_denial()
return ""
# This user passes administrative approval.
# Now check live connection counts
# Extract optional usergroup connection counts
group_max_conn_user = groupsettings.get(PolicyKeys.KW_MAXCONNPERUSER)
group_max_conn_host = groupsettings.get(PolicyKeys.KW_MAXCONNPERHOST)
diags = []
if not stats.can_connect(conn_name, user, rhost, diags, group_max_conn_user, group_max_conn_host):
for diag in diags:
self._manager.log_info(
"DENY AMQP Open for user '%s', rhost '%s', vhost '%s': "
"%s" % (user, rhost, vhost, diag))
return ""
# Record facts about this connection to use during teardown
facts = ConnectionFacts(user, rhost, vhost, conn_name)
self._connections[conn_id] = facts
# Return success
return usergroup
except Exception as e:
self._manager.log_info(
"DENY AMQP Open lookup_user failed for user '%s', rhost '%s', vhost '%s': "
"Internal error: %s" % (user, rhost, vhost, e))
# return failure
return ""
def lookup_settings(self, vhost_in, groupname, upolicy):
"""
Given a settings name, return the aggregated policy blob.
@param[in] vhost_in: vhost user is accessing
@param[in] groupname: user group name
@param[out] upolicy: dict holding policy values - the settings blob
TODO: make this a c struct
@return if lookup worked
# Note: the upolicy output is a non-nested dict with settings of interest
"""
try:
vhost = self.lookup_vhost_alias(vhost_in)
if vhost != vhost_in:
self._manager.log_debug(
"AMQP Open lookup settings for vhost '%s': "
"proceeds using vhost '%s' ruleset" % (vhost_in, vhost))
if vhost not in self.rulesetdb:
self._manager.log_info(
"lookup_settings fail for vhost '%s', user group '%s': "
"No policy defined for this vhost" % (vhost, groupname))
return False
ruleset = self.rulesetdb[vhost]
if groupname not in ruleset[PolicyKeys.KW_GROUPS]:
self._manager.log_trace(
"lookup_settings fail for vhost '%s', user group '%s': "
"This vhost has no settings for the user group" % (vhost, groupname))
return False
upolicy.update(ruleset[PolicyKeys.KW_GROUPS][groupname])
maxsize = upolicy.get(PolicyKeys.KW_MAX_MESSAGE_SIZE, None)
if maxsize is None:
maxsize = ruleset.get(PolicyKeys.KW_MAX_MESSAGE_SIZE, None)
if maxsize is None:
maxsize = self._max_message_size
upolicy[PolicyKeys.KW_MAX_MESSAGE_SIZE] = maxsize
upolicy[PolicyKeys.KW_CSTATS] = self.statsdb[vhost].get_cstats()
return True
except Exception as e:
return False
def close_connection(self, conn_id):
"""
Close the connection.
@param conn_id:
@return:
"""
try:
if conn_id in self._connections:
facts = self._connections[conn_id]
stats = self.statsdb[facts.app]
stats.disconnect(facts.conn_name, facts.user, facts.host)
del self._connections[conn_id]
except Exception as e:
self._manager.log_trace(
"Policy internal error closing connection id %s. %s" % (conn_id, str(e)))
def set_max_message_size(self, size):
"""
record max message size from policy config object
:param size:
:return:ls
"""
self._max_message_size = size
#
#
def test_load_config(self):
"""
Test function to load a policy.
@return:
"""
ruleset_str = '["vhost", {"hostname": "photoserver", "maxConnections": 50, "maxConnectionsPerUser": 5, "maxConnectionsPerHost": 20, "allowUnknownUser": true, "aliases": "antialias",'
ruleset_str += '"groups": {'
ruleset_str += '"anonymous": { "users": "anonymous", "remoteHosts": "*", "maxFrameSize": 111111, "maxMessageSize": 111111, "maxSessionWindow": 111111, "maxSessions": 1, "maxSenders": 11, "maxReceivers": 11, "allowDynamicSource": false, "allowAnonymousSender": false, "sources": "public", "targets": "" },'
ruleset_str += '"users": { "users": "u1, u2", "remoteHosts": "*", "maxFrameSize": 222222, "maxMessageSize": 222222, "maxSessionWindow": 222222, "maxSessions": 2, "maxSenders": 22, "maxReceivers": 22, "allowDynamicSource": false, "allowAnonymousSender": false, "sources": "public, private", "targets": "public" },'
ruleset_str += '"paidsubscribers": { "users": "p1, p2", "remoteHosts": "*", "maxFrameSize": 333333, "maxMessageSize": 333333, "maxSessionWindow": 333333, "maxSessions": 3, "maxSenders": 33, "maxReceivers": 33, "allowDynamicSource": true, "allowAnonymousSender": false, "sources": "public, private", "targets": "public, private" },'
ruleset_str += '"test": { "users": "zeke, ynot", "remoteHosts": "10.48.0.0-10.48.255.255, 192.168.100.0-192.168.100.255", "maxFrameSize": 444444, "maxMessageSize": 444444, "maxSessionWindow": 444444, "maxSessions": 4, "maxSenders": 44, "maxReceivers": 44, "allowDynamicSource": true, "allowAnonymousSender": true, "sources": "private", "targets": "private" },'
if is_ipv6_enabled():
ruleset_str += '"admin": { "users": "alice, bob", "remoteHosts": "10.48.0.0-10.48.255.255, 192.168.100.0-192.168.100.255, 10.18.0.0-10.18.255.255, 127.0.0.1, ::1", "maxFrameSize": 555555, "maxMessageSize": 555555, "maxSessionWindow": 555555, "maxSessions": 5, "maxSenders": 55, "maxReceivers": 55, "allowDynamicSource": true, "allowAnonymousSender": true, "sources": "public, private, management", "targets": "public, private, management" },'
ruleset_str += '"superuser": { "users": "ellen", "remoteHosts": "72.135.2.9, 127.0.0.1, ::1", "maxFrameSize": 666666, "maxMessageSize": 666666, "maxSessionWindow": 666666, "maxSessions": 6, "maxSenders": 66, "maxReceivers": 66, "allowDynamicSource": false, "allowAnonymousSender": false, "sources": "public, private, management, root", "targets": "public, private, management, root" },'
else:
ruleset_str += '"admin": { "users": "alice, bob", "remoteHosts": "10.48.0.0-10.48.255.255, 192.168.100.0-192.168.100.255, 10.18.0.0-10.18.255.255, 127.0.0.1", "maxFrameSize": 555555, "maxMessageSize": 555555, "maxSessionWindow": 555555, "maxSessions": 5, "maxSenders": 55, "maxReceivers": 55, "allowDynamicSource": true, "allowAnonymousSender": true, "sources": "public, private, management", "targets": "public, private, management" },'
ruleset_str += '"superuser": { "users": "ellen", "remoteHosts": "72.135.2.9, 127.0.0.1", "maxFrameSize": 666666, "maxMessageSize": 666666, "maxSessionWindow": 666666, "maxSessions": 6, "maxSenders": 66, "maxReceivers": 66, "allowDynamicSource": false, "allowAnonymousSender": false, "sources": "public, private, management, root", "targets": "public, private, management, root" },'
ruleset_str += '"$default": { "remoteHosts": "*", "maxFrameSize": 222222, "maxMessageSize": 222222, "maxSessionWindow": 222222, "maxSessions": 2, "maxSenders": 22, "maxReceivers": 22, "allowDynamicSource": false, "allowAnonymousSender": false, "sources": "public, private", "targets": "public" }'
ruleset_str += '}}]'
ruleset = json.loads(ruleset_str)
self.create_ruleset(ruleset[1])