# blob: ab4b0607766f510f7c17afb60b751aced4d98c7d [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import unittest as unittest
import os, json, re, signal
import sys
import time
from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT, DIR, TestTimeout
from system_test import Logger
from subprocess import PIPE, STDOUT
from proton import ConnectionException, Timeout, Url, symbol
from proton.handlers import MessagingHandler
from proton.reactor import Container, ReceiverOption
from proton.utils import BlockingConnection, LinkDetached, SyncRequestResponse
from qpid_dispatch_internal.policy.policy_util import is_ipv6_enabled
from qpid_dispatch_internal.compat import dict_iteritems
from test_broker import FakeBroker
class AbsoluteConnectionCountLimit(TestCase):
    """
    Verify that connections beyond the absolute limit are denied and counted
    """

    @classmethod
    def setUpClass(cls):
        """Start a standalone router configured with policy maxConnections=2."""
        super(AbsoluteConnectionCountLimit, cls).setUpClass()
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 2, 'enableVhostPolicy': 'false'})
        ])
        cls.router = cls.tester.qdrouterd('conn-limit-router', config, wait=True)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK):
        """
        Run qdmanage against the router and return its output.

        :param cmd: qdmanage arguments as one space-separated string.
        :param input: optional text fed to qdmanage on stdin.
        :param expect: expected exit status, forwarded to self.popen.
        :return: combined stdout/stderr text of the qdmanage process.
        :raises Exception: if tearing down the process fails; the qdmanage
            output is appended to the message.
        """
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', re.sub(r'amqp://', 'amqp://u1:password@', self.address()), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def test_verify_maximum_connections(self):
        """Open two connections, expect the third to be denied, then check the counters."""
        addr = self.address()
        opened = []
        try:
            # two connections should be ok
            denied = False
            try:
                opened.append(BlockingConnection(addr))
                opened.append(BlockingConnection(addr))
            except ConnectionException:
                denied = True
            self.assertFalse(denied)  # assert if connections that should open did not open

            # third connection should be denied
            denied = False
            try:
                opened.append(BlockingConnection(addr))
            except ConnectionException:
                denied = True
            self.assertTrue(denied)  # assert if connection that should not open did open
        finally:
            # Always release whatever was opened, including an unexpected third
            # connection, so the qdmanage query below can still connect.
            for conn in opened:
                conn.close()

        policystats = json.loads(self.run_qdmanage('query --type=policy'))
        self.assertEqual(policystats[0]["connectionsDenied"], 1)
        self.assertEqual(policystats[0]["totalDenials"], 1)
class LoadPolicyFromFolder(TestCase):
    """
    Verify that specifying a policy folder from the router conf file
    effects loading the policies in that folder.
    This test relies on qdmanage utility.
    """

    @classmethod
    def setUpClass(cls):
        """Generate the policy files from their templates and start the router."""
        super(LoadPolicyFromFolder, cls).setUpClass()

        ipv6_enabled = is_ipv6_enabled()
        policy_config_path = os.path.join(DIR, 'policy-1')
        replacements = {'{IPV6_LOOPBACK}': ', ::1'}
        for f in os.listdir(policy_config_path):
            if not f.endswith(".json.in"):
                continue
            # "name.json.in" is rendered into "name.json" (drop the ".in").
            with open(os.path.join(policy_config_path, f[:-3]), 'w') as outfile:
                with open(os.path.join(policy_config_path, f)) as infile:
                    for line in infile:
                        for src, target in dict_iteritems(replacements):
                            # Without IPv6 the placeholder is simply removed.
                            line = line.replace(src, target if ipv6_enabled else '')
                        outfile.write(line)

        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'})
        ])
        cls.router = cls.tester.qdrouterd('conn-limit-router', config, wait=True)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK):
        """
        Run qdmanage against the router and return its output.

        :param cmd: qdmanage arguments as one space-separated string.
        :param input: optional text fed to qdmanage on stdin.
        :param expect: expected exit status, forwarded to self.popen.
        :return: combined stdout/stderr text of the qdmanage process.
        :raises Exception: if tearing down the process fails; the qdmanage
            output is appended to the message.
        """
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', re.sub(r'amqp://', 'amqp://u1:password@', self.address()), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def _query_vhosts(self):
        """Return the parsed result of a qdmanage vhost query."""
        return json.loads(self.run_qdmanage('query --type=vhost'))

    @staticmethod
    def _find_vhost(rulesets, hostname):
        """Return the first ruleset whose 'hostname' equals hostname, or None."""
        for ruleset in rulesets:
            if ruleset['hostname'] == hostname:
                return ruleset
        return None

    def test_verify_policies_are_loaded(self):
        """The vhost query must report five rulesets after startup."""
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 5)

    def new_policy(self):
        """Return the JSON text used to create vhost 'dispatch-494'."""
        return """
{
"hostname": "dispatch-494",
"maxConnections": 50,
"maxConnectionsPerHost": 20,
"maxConnectionsPerUser": 8,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def updated_policy(self):
        """Return the JSON text used to update vhost 'dispatch-494' (no hostname field)."""
        return """
{
"maxConnections": 500,
"maxConnectionsPerHost": 2,
"maxConnectionsPerUser": 30,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 123,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 222
}
}
}
"""

    def test_verify_policy_add_update_delete(self):
        """Create, update and delete a vhost, checking the query result after each step."""
        # verify current vhost count
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 5)

        # create
        self.run_qdmanage('create --type=vhost --name=dispatch-494 --stdin', input=self.new_policy())
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 6)
        ruleset = self._find_vhost(rulesets, 'dispatch-494')
        self.assertIsNotNone(ruleset)
        self.assertEqual(ruleset['maxConnections'], 50)
        self.assertEqual(ruleset['maxConnectionsPerHost'], 20)
        self.assertEqual(ruleset['maxConnectionsPerUser'], 8)

        # update
        self.run_qdmanage('update --type=vhost --name=dispatch-494 --stdin', input=self.updated_policy())
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 6)
        ruleset = self._find_vhost(rulesets, 'dispatch-494')
        self.assertIsNotNone(ruleset)
        self.assertEqual(ruleset['maxConnections'], 500)
        self.assertEqual(ruleset['maxConnectionsPerHost'], 2)
        self.assertEqual(ruleset['maxConnectionsPerUser'], 30)

        # delete
        self.run_qdmanage('delete --type=vhost --name=dispatch-494')
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 5)
        self.assertIsNone(self._find_vhost(rulesets, 'dispatch-494'))

    def test_repeated_create_delete(self):
        """Create and delete the same vhost ten times in a row."""
        for _ in range(10):
            rulesets = self._query_vhosts()
            self.assertEqual(len(rulesets), 5)

            # create
            self.run_qdmanage('create --type=vhost --name=dispatch-494 --stdin', input=self.new_policy())
            rulesets = self._query_vhosts()
            self.assertEqual(len(rulesets), 6)
            self.assertIsNotNone(self._find_vhost(rulesets, 'dispatch-494'))

            # delete
            self.run_qdmanage('delete --type=vhost --name=dispatch-494')
            rulesets = self._query_vhosts()
            self.assertEqual(len(rulesets), 5)
            self.assertIsNone(self._find_vhost(rulesets, 'dispatch-494'))
class SenderReceiverLimits(TestCase):
    """
    Verify that policy can limit senders and receivers by count.
    """

    @classmethod
    def setUpClass(cls):
        """Start a standalone router that loads its vhost policies from 'policy-3'."""
        super(SenderReceiverLimits, cls).setUpClass()
        policy_config_path = os.path.join(DIR, 'policy-3')
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'})
        ])
        cls.router = cls.tester.qdrouterd('SenderReceiverLimits', config, wait=True)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def test_verify_n_receivers(self):
        """Four receivers must attach; the fifth must be detached."""
        br1 = BlockingConnection(self.address())
        try:
            # four receivers OK
            br1.create_receiver(address="****YES_1of4***")
            br1.create_receiver(address="****YES_20f4****")
            br1.create_receiver(address="****YES_3of4****")
            br1.create_receiver(address="****YES_4of4****")
            # the fifth receiver should be denied
            self.assertRaises(LinkDetached, br1.create_receiver, "****NO****")
        finally:
            # Close even when an expectation above fails so the connection
            # does not linger into the following tests.
            br1.close()

    def test_verify_n_senders(self):
        """Two senders must attach; the third must be detached."""
        bs1 = BlockingConnection(self.address())
        try:
            # two senders OK
            bs1.create_sender(address="****YES_1of2****")
            bs1.create_sender(address="****YES_2of2****")
            # the third sender should be denied
            self.assertRaises(LinkDetached, bs1.create_sender, "****NO****")
        finally:
            bs1.close()

    def test_verify_z_connection_stats(self):
        """Check that the router log reports one sender and one receiver denial."""
        # This test relies on being executed after test_verify_n_receivers and test_verify_n_senders.
        # This test is named to follow those tests alphabetically.
        # It also relies on executing after the router log file has written the policy logs.
        # In some emulated environments the router log file writes may lag test execution.
        # To accommodate the file lag this test may retry reading the log file.
        verified = False
        for tries in range(5):
            with open('../setUpClass/SenderReceiverLimits.log', 'r') as router_log:
                log_lines = router_log.read().split("\n")
            close_lines = [s for s in log_lines if "senders_denied=1, receivers_denied=1" in s]
            verified = len(close_lines) == 1
            if verified:
                break
            print("system_tests_policy, SenderReceiverLimits, test_verify_z_connection_stats: delay to wait for log to be written")
            sys.stdout.flush()
            time.sleep(1)

        if not verified:
            # Dump the policy-related log lines from the last read to help diagnosis.
            deny_lines = [s for s in log_lines if "DENY" in s]
            resources_lines = [s for s in log_lines if "closed with resources" in s]
            logger = Logger(title="Policy SenderReceiverLimits test_verify_z_connection_stats")
            logger.log("Did not see log line containing: 'senders_denied=1, receivers_denied=1'")
            logger.log("Policy DENY events")
            for dl in deny_lines:
                logger.log(" " + dl)
            logger.log("Policy resources report")
            for rl in resources_lines:
                logger.log(" " + rl)
            logger.dump()

        self.assertTrue(verified, msg='Policy did not log sender and receiver denials.')
class PolicyVhostOverride(TestCase):
    """
    Verify that listener policyVhost can override normally discovered vhost.
    Verify that specific vhost and global denial counts are propagated.
    This test conveniently forces the vhost denial statistics to be
    on a named vhost and we know where to find them.
    This test relies on qdmanage utility.
    """

    @classmethod
    def setUpClass(cls):
        """Start a router whose listener forces policyVhost 'override.host.com'."""
        super(PolicyVhostOverride, cls).setUpClass()
        policy_config_path = os.path.join(DIR, 'policy-3')
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port(), 'policyVhost': 'override.host.com'}),
            ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'})
        ])
        cls.router = cls.tester.qdrouterd('PolicyVhostOverride', config, wait=True)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK):
        """
        Run qdmanage against the router and return its output.

        :param cmd: qdmanage arguments as one space-separated string.
        :param input: optional text fed to qdmanage on stdin.
        :param expect: expected exit status, forwarded to self.popen.
        :return: combined stdout/stderr text of the qdmanage process.
        :raises Exception: if tearing down the process fails; the qdmanage
            output is appended to the message.
        """
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', re.sub(r'amqp://', 'amqp://u1:password@', self.address()), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def _check_denial_counts(self, senders_denied, receivers_denied, links_denied):
        """
        Assert the denial counters of vhost 'override.host.com' and of the
        global policy entity.

        :param senders_denied: expected vhostStats 'senderDenied'.
        :param receivers_denied: expected vhostStats 'receiverDenied'.
        :param links_denied: expected policy 'linksDenied' and 'totalDenials'.
        """
        vhoststats = json.loads(self.run_qdmanage('query --type=vhostStats'))
        found_stat = False
        for vhs in vhoststats:
            if vhs["id"] == "override.host.com":
                found_stat = True
                self.assertEqual(vhs["senderDenied"], senders_denied)
                self.assertEqual(vhs["receiverDenied"], receivers_denied)
                break
        self.assertTrue(found_stat, msg="did not find virtual host id 'override.host.com' in stats")

        policystats = json.loads(self.run_qdmanage('query --type=policy'))
        self.assertEqual(policystats[0]["linksDenied"], links_denied)
        self.assertEqual(policystats[0]["totalDenials"], links_denied)

    def test_verify_n_receivers(self):
        """Five receivers must attach; the sixth must be detached and counted."""
        br1 = BlockingConnection(self.address())
        try:
            # five receivers OK
            br1.create_receiver(address="****YES_1of5***")
            br1.create_receiver(address="****YES_20f5****")
            br1.create_receiver(address="****YES_3of5****")
            br1.create_receiver(address="****YES_4of5****")
            br1.create_receiver(address="****YES_5of5****")
            # the sixth receiver should be denied
            self.assertRaises(LinkDetached, br1.create_receiver, "****NO****")
        finally:
            # Close before querying so the connection never outlives the test.
            br1.close()

        self._check_denial_counts(senders_denied=0, receivers_denied=1, links_denied=1)

    def test_verify_n_senders(self):
        """Three senders must attach; the fourth must be detached and counted."""
        bs1 = BlockingConnection(self.address())
        try:
            # three senders OK
            bs1.create_sender(address="****YES_1of3****")
            bs1.create_sender(address="****YES_2of3****")
            bs1.create_sender(address="****YES_3of3****")
            # the fourth sender should be denied
            self.assertRaises(LinkDetached, bs1.create_sender, "****NO****")
        finally:
            bs1.close()

        # The expected totals include the receiver denial produced by
        # test_verify_n_receivers, so this test relies on running after it.
        self._check_denial_counts(senders_denied=1, receivers_denied=1, links_denied=2)
class Capabilities(ReceiverOption):
    """Receiver option that adds one capability symbol to the receiver's source."""

    def __init__(self, value):
        """Remember the capability name to apply later."""
        self.value = value

    def apply(self, receiver):
        """Put the stored value, as a symbol, into the source capabilities of receiver."""
        capability = symbol(self.value)
        source_capabilities = receiver.source.capabilities
        source_capabilities.put_object(capability)
class PolicyTerminusCapabilities(TestCase):
    """
    Verify that receivers whose source carries the 'qd.waypoint_1' or
    'qd.fallback' capability are refused with LinkDetached on listeners
    bound to the 'capabilities*.host.com' policy vhosts.
    """

    @classmethod
    def setUpClass(cls):
        """Start a router with two listeners, each forcing its own policyVhost."""
        super(PolicyTerminusCapabilities, cls).setUpClass()
        policy_config_path = os.path.join(DIR, 'policy-3')
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'}),
            ('listener', {'port': cls.tester.get_port(), 'policyVhost': 'capabilities1.host.com'}),
            ('listener', {'port': cls.tester.get_port(), 'policyVhost': 'capabilities2.host.com'})
        ])
        cls.router = cls.tester.qdrouterd('PolicyTerminusCapabilities', config, wait=True)

    def test_forbid_waypoint(self):
        """A receiver requesting capability 'qd.waypoint_1' must be detached."""
        # NOTE(review): addresses[1] presumably maps to the second listener
        # ('capabilities2.host.com') - confirm against Qdrouterd.addresses.
        br1 = BlockingConnection(self.router.addresses[1])
        try:
            self.assertRaises(LinkDetached, br1.create_receiver, address="ok1", options=Capabilities('qd.waypoint_1'))
        finally:
            # Close even when the expectation fails so the connection is not leaked.
            br1.close()

    def test_forbid_fallback(self):
        """A receiver requesting capability 'qd.fallback' must be detached."""
        br1 = BlockingConnection(self.router.addresses[0])
        try:
            self.assertRaises(LinkDetached, br1.create_receiver, address="ok2", options=Capabilities('qd.fallback'))
        finally:
            br1.close()
class InterrouterLinksAllowed(TestCase):
    """
    Verify that vhost policy does not block the links of an inter-router
    connection between two interior routers.
    """

    # Port of router A's inter-router listener; assigned in setUpClass.
    inter_router_port = None

    @classmethod
    def setUpClass(cls):
        """Start routers A and B, wait for both to be ready, then stop them."""
        super(InterrouterLinksAllowed, cls).setUpClass()

        policy_config_path = os.path.join(DIR, 'policy-5')

        def router(name, connection):
            """Start an interior router with vhost policy plus the given connection entity."""
            config = [
                ('router', {'mode': 'interior', 'id': name}),
                ('listener', {'port': cls.tester.get_port()}),
                ('policy', {'enableVhostPolicy': 'yes', 'policyDir': policy_config_path}),
                connection
            ]
            config = Qdrouterd.Config(config)
            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))

        cls.routers = []

        # Store the port on the class so the declared attribute reflects the
        # value actually used (previously it stayed None).
        cls.inter_router_port = cls.tester.get_port()
        inter_router_port = cls.inter_router_port

        router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}))
        router('B', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}))

        # With these configs before DISPATCH-920 the routers never connect
        # because the links are disallowed by policy. Before the wait_ready
        # functions complete the routers should have tried the interrouter
        # link.
        cls.routers[0].wait_ready()
        cls.routers[1].wait_ready()
        cls.routers[0].teardown()
        cls.routers[1].teardown()

    def test_01_router_links_allowed(self):
        """Router A's output must not contain any 'link disallowed' line."""
        with open(self.routers[0].outfile + '.out', 'r') as router_log:
            log_lines = router_log.read().split("\n")
        disallow_lines = [s for s in log_lines if "link disallowed" in s]
        self.assertEqual(len(disallow_lines), 0, msg='Inter-router links should be allowed but some were blocked by policy.')
class VhostPolicyNameField(TestCase):
    """
    Verify that vhosts can be created getting the name from
    'id' or from 'hostname'.
    This test relies on qdmanage utility.
    """

    @classmethod
    def setUpClass(cls):
        """Generate the policy files from their templates and start the router."""
        super(VhostPolicyNameField, cls).setUpClass()

        ipv6_enabled = is_ipv6_enabled()
        policy_config_path = os.path.join(DIR, 'policy-1')
        replacements = {'{IPV6_LOOPBACK}': ', ::1'}
        for f in os.listdir(policy_config_path):
            if not f.endswith(".json.in"):
                continue
            # "name.json.in" is rendered into "name.json" (drop the ".in").
            with open(os.path.join(policy_config_path, f[:-3]), 'w') as outfile:
                with open(os.path.join(policy_config_path, f)) as infile:
                    for line in infile:
                        for src, target in dict_iteritems(replacements):
                            # Without IPv6 the placeholder is simply removed.
                            line = line.replace(src, target if ipv6_enabled else '')
                        outfile.write(line)

        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'})
        ])
        cls.router = cls.tester.qdrouterd('vhost-policy-name-field', config, wait=True)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK):
        """
        Run qdmanage against the router and return its output.

        :param cmd: qdmanage arguments as one space-separated string.
        :param input: optional text fed to qdmanage on stdin.
        :param expect: expected exit status, forwarded to self.popen.
        :return: combined stdout/stderr text of the qdmanage process.
        :raises Exception: if tearing down the process fails; the qdmanage
            output is appended to the message.
        """
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', re.sub(r'amqp://', 'amqp://u1:password@', self.address()), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def _query_vhosts(self):
        """Return the parsed result of a qdmanage vhost query."""
        return json.loads(self.run_qdmanage('query --type=vhost'))

    @staticmethod
    def _find_vhost(rulesets, hostname):
        """Return the first ruleset whose 'hostname' equals hostname, or None."""
        for ruleset in rulesets:
            if ruleset['hostname'] == hostname:
                return ruleset
        return None

    def id_policy(self):
        """Return a policy that names the vhost through 'id' (maxConnections 50)."""
        return """
{
"id": "dispatch-918",
"maxConnections": 50,
"maxConnectionsPerHost": 20,
"maxConnectionsPerUser": 8,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def hostname_policy(self):
        """Return a policy that names the vhost through 'hostname' (maxConnections 51)."""
        return """
{
"hostname": "dispatch-918",
"maxConnections": 51,
"maxConnectionsPerHost": 20,
"maxConnectionsPerUser": 8,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def both_policy(self):
        """Return a policy with conflicting 'id' and 'hostname' (maxConnections 52)."""
        return """
{
"id": "isogyre",
"hostname": "dispatch-918",
"maxConnections": 52,
"maxConnectionsPerHost": 20,
"maxConnectionsPerUser": 8,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def neither_policy(self):
        """Return a policy with neither 'id' nor 'hostname' (maxConnections 53)."""
        return """
{
"maxConnections": 53,
"maxConnectionsPerHost": 20,
"maxConnectionsPerUser": 8,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management, neither_policy",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def test_01_id_vs_hostname(self):
        """Create and update a vhost using 'id', 'hostname', both, and neither."""
        # verify current vhost count
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 5)

        # create using 'id'
        self.run_qdmanage('create --type=vhost --name=dispatch-918 --stdin', input=self.id_policy())
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 6)
        ruleset = self._find_vhost(rulesets, 'dispatch-918')
        self.assertIsNotNone(ruleset)
        self.assertEqual(ruleset['maxConnections'], 50)

        # update using 'hostname'
        self.run_qdmanage('update --type=vhost --name=dispatch-918 --stdin', input=self.hostname_policy())
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 6)
        ruleset = self._find_vhost(rulesets, 'dispatch-918')
        self.assertIsNotNone(ruleset)
        self.assertEqual(ruleset['maxConnections'], 51)

        # update 'id' and 'hostname'
        # The assertion lives outside the try block: an AssertionError raised
        # inside it would be caught by "except Exception" and silently lost.
        update_rejected = False
        try:
            self.run_qdmanage('update --type=vhost --name=dispatch-918 --stdin',
                              input=self.both_policy())
        except Exception:
            update_rejected = True
        self.assertTrue(update_rejected, msg="should not be able to update 'id'")

        # update using neither
        self.run_qdmanage('update --type=vhost --name=dispatch-918 --stdin', input=self.neither_policy())
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 6)
        ruleset = self._find_vhost(rulesets, 'dispatch-918')
        self.assertIsNotNone(ruleset)
        self.assertEqual(ruleset['maxConnections'], 53)

        # the rejected update must not have created a vhost named after its 'id'
        self.assertIsNone(self._find_vhost(rulesets, 'isogyre'))
class PolicyLinkNamePatternTest(TestCase):
    """
    Verify that specifying a policy that generates a warning does
    not cause the router to exit without showing the warning.
    """

    @classmethod
    def setUpClass(cls):
        """Start the router without blocking, then wait up to 5 seconds for it."""
        super(PolicyLinkNamePatternTest, cls).setUpClass()
        listen_port = cls.tester.get_port()
        policy_config_path = os.path.join(DIR, 'policy-7')
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': listen_port}),
            ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'})
        ])

        cls.router = cls.tester.qdrouterd('PolicyLinkNamePatternTest', config, wait=False)
        try:
            cls.router.wait_ready(timeout = 5)
        except Exception:
            # Deliberately best-effort: a router that is not ready is left
            # for the tests themselves to detect.
            pass

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK):
        """
        Run qdmanage against the router and return its output.

        :param cmd: qdmanage arguments as one space-separated string.
        :param input: optional text fed to qdmanage on stdin.
        :param expect: expected exit status, forwarded to self.popen.
        :return: combined stdout/stderr text of the qdmanage process.
        :raises Exception: if tearing down the process fails; the qdmanage
            output is appended to the message.
        """
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', re.sub(r'amqp://', 'amqp://u1:password@', self.address()), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def default_patterns(self):
        """Return a '$default' vhost policy that uses sourcePattern/targetPattern."""
        return """
{
"hostname": "$default",
"maxConnections": 3,
"maxConnectionsPerHost": 3,
"maxConnectionsPerUser": 3,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 1000000,
"maxFrameSize": 222222,
"sourcePattern": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targetPattern": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def disallowed_source(self):
        """Return a policy that specifies both 'sources' and 'sourcePattern'."""
        return """
{
"hostname": "DISPATCH-1993-2",
"maxConnections": 3,
"maxConnectionsPerHost": 3,
"maxConnectionsPerUser": 3,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 1000000,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"sourcePattern": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targetPattern": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def disallowed_target(self):
        """Return a policy that specifies both 'targets' and 'targetPattern'."""
        return """
{
"id": "DISPATCH-1993-3",
"maxConnections": 3,
"maxConnectionsPerHost": 3,
"maxConnectionsPerUser": 3,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 1000000,
"maxFrameSize": 222222,
"sourcePattern": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targetPattern": "public, private, $management",
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def disallowed_source_pattern1(self):
        """Return a policy whose sourcePattern embeds ${user} inside a token ('abc-${user}.xyz')."""
        return """
{
"id": "DISPATCH-1993-3",
"maxConnections": 3,
"maxConnectionsPerHost": 3,
"maxConnectionsPerUser": 3,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 1000000,
"maxFrameSize": 222222,
"sourcePattern": "public, private, $management, abc-${user}.xyz",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targetPattern": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def disallowed_source_pattern2(self):
        """Return a policy whose sourcePattern has ${user} in the middle ('abc/${user}.xyz')."""
        return """
{
"id": "DISPATCH-1993-3",
"maxConnections": 3,
"maxConnectionsPerHost": 3,
"maxConnectionsPerUser": 3,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 1000000,
"maxFrameSize": 222222,
"sourcePattern": "public, private, $management, abc/${user}.xyz",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targetPattern": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def _assert_create_rejected(self, name, policy, expected_fragments):
        """
        Try to create vhost 'name' from 'policy' and require that qdmanage fails.

        :param name: value passed to qdmanage --name.
        :param policy: JSON policy text fed to qdmanage on stdin.
        :param expected_fragments: strings that must all occur in the error text.
        """
        rejected = False
        try:
            self.run_qdmanage('create --type=vhost --name=%s --stdin' % name, input=policy)
        except Exception as e:
            rejected = True
            for fragment in expected_fragments:
                self.assertIn(fragment, str(e))
        self.assertTrue(rejected)

    def test_link_name_parse_tree_patterns(self):
        """Accept a pattern-based $default policy and reject four invalid policies."""
        # update to replace source/target match patterns
        try:
            qdm_out = self.run_qdmanage('update --type=vhost --name=vhost/$default --stdin', input=self.default_patterns())
        except Exception as e:
            self.fail('Error running qdmanage %s' % str(e))
        self.assertNotIn("PolicyError", qdm_out)

        # attempt a create that should be rejected
        self._assert_create_rejected(
            'DISPATCH-1993-2', self.disallowed_source(),
            ["InternalServerErrorStatus: PolicyError: Policy 'DISPATCH-1993-2' is invalid:"])

        # attempt another create that should be rejected
        self._assert_create_rejected(
            'DISPATCH-1993-3', self.disallowed_target(),
            ["InternalServerErrorStatus: PolicyError: Policy 'DISPATCH-1993-3' is invalid:"])

        # attempt another create that should be rejected - name subst must be a whole token
        self._assert_create_rejected(
            'DISPATCH-1993-3', self.disallowed_source_pattern1(),
            ["InternalServerErrorStatus: PolicyError:",
             "Policy 'DISPATCH-1993-3' is invalid:"])

        # attempt another create that should be rejected - name subst must be prefix or suffix
        self._assert_create_rejected(
            'DISPATCH-1993-3', self.disallowed_source_pattern2(),
            ["InternalServerErrorStatus: PolicyError:",
             "Policy 'DISPATCH-1993-3' is invalid:"])
class PolicyHostamePatternTest(TestCase):
    """
    Exercise vhost hostname pattern matching on a router started with
    enableVhostNamePatterns.
    """

    @classmethod
    def setUpClass(cls):
        """Launch the router with policies from 'policy-8' and wait for it."""
        super(PolicyHostamePatternTest, cls).setUpClass()
        listen_port = cls.tester.get_port()
        policy_config_path = os.path.join(DIR, 'policy-8')
        router_entity = ('router', {'mode': 'standalone', 'id': 'QDR.Policy8'})
        listener_entity = ('listener', {'port': listen_port})
        policy_entity = ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true', 'enableVhostNamePatterns': 'true'})
        config = Qdrouterd.Config([router_entity, listener_entity, policy_entity])

        cls.router = cls.tester.qdrouterd('PolicyVhostNamePatternTest', config, wait=True)
        try:
            cls.router.wait_ready(timeout = 5)
        except Exception:
            # best effort; readiness problems surface in the tests
            pass

    def address(self):
        """Give back the router's first address."""
        addresses = self.router.addresses
        return addresses[0]

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK):
        """
        Execute qdmanage with the given space-separated arguments, feeding
        'input' on stdin, and hand back everything it printed. A teardown
        failure is re-raised as Exception with the output appended.
        """
        bus = re.sub(r'amqp://', 'amqp://u1:password@', self.address())
        argv = ['qdmanage'] + cmd.split(' ') + ['--bus', bus, '--indent=-1', '--timeout', str(TIMEOUT)]
        proc = self.popen(argv,
                          stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
                          universal_newlines=True)
        output = proc.communicate(input)[0]
        try:
            proc.teardown()
        except Exception as err:
            raise Exception("%s\n%s" % (err, output))
        return output

    def disallowed_hostname(self):
        """Policy text for hostname pattern '#.#.0.0', which the router should refuse."""
        return """
{
"hostname": "#.#.0.0",
"maxConnections": 3,
"maxConnectionsPerHost": 3,
"maxConnectionsPerUser": 3,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 1000000,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"maxSenders": 22
}
}
}
"""

    def test_hostname_pattern_00_hello(self):
        """Exactly one vhost is reported right after startup."""
        query_output = self.run_qdmanage('query --type=vhost')
        rulesets = json.loads(query_output)
        self.assertEqual(len(rulesets), 1)

    def test_hostname_pattern_01_denied_add(self):
        """Creating vhost '#.#.0.0' must fail with a 'pattern conflicts' error."""
        qdm_out = "<not written>"
        try:
            qdm_out = self.run_qdmanage('create --type=vhost --name=#.#.0.0 --stdin', input=self.disallowed_hostname())
        except Exception as err:
            error_text = str(err)
            self.assertTrue("pattern conflicts" in error_text, msg=('Error running qdmanage %s' % error_text))
        # a successful create would presumably echo the policy, including "222222"
        self.assertNotIn("222222", qdm_out)
class VhostPolicyFromRouterConfig(TestCase):
"""
Verify that connections beyond the vhost limit are denied.
Unlike the global maxConnections limit, opening a connection beyond
the vhost limit does not raise a ConnectionException; instead, the
attempt to create a sync request/response client on that connection
times out.
"""
@classmethod
def setUpClass(cls):
    """Launch a router whose config file defines vhost '0.0.0.0' with maxConnections 2."""
    super(VhostPolicyFromRouterConfig, cls).setUpClass()

    # Group applied to users that match no other group.
    default_group = {
        'users': '*',
        'remoteHosts': '*',
        'sources': '*',
        'targets': '*',
        'allowDynamicSource': True
    }
    # Group for user 'anonymous' with restricted source/target addresses.
    anonymous_group = {
        'users': 'anonymous',
        'remoteHosts': '*',
        'sourcePattern': 'addr/*/queue/*, simpleaddress, queue.${user}',
        'targets': 'addr/*, simpleaddress, queue.${user}',
        'allowDynamicSource': True,
        'allowAnonymousSender': True
    }
    vhost = {
        'hostname': '0.0.0.0', 'maxConnections': 2,
        'allowUnknownUser': 'true',
        'groups': {
            '$default': default_group,
            'anonymous': anonymous_group
        }
    }

    entities = [
        ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
        ('listener', {'port': cls.tester.get_port()}),
        ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}),
        ('vhost', vhost)
    ]
    config = Qdrouterd.Config(entities)
    cls.router = cls.tester.qdrouterd('vhost-conn-limit-router', config, wait=True)
def address(self):
    """Give back the router's first address."""
    addresses = self.router.addresses
    return addresses[0]
def test_verify_vhost_maximum_connections(self):
addr = "%s/$management" % self.address()
timeout = 5
# two connections should be ok
denied = False
try:
bc1 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
bc2 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
except ConnectionException:
denied = True
except Timeout:
denied = True
self.assertFalse(denied) # assert connections were opened
# third connection should be denied
denied = False
try:
bc3 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
except ConnectionException:
denied = True
except Timeout:
denied = True
self.assertTrue(denied) # assert if connection that should not open did open
bc1.connection.close()
bc2.connection.close()
def test_vhost_allowed_addresses(self):
target_addr_list = ['addr/something', 'simpleaddress', 'queue.anonymous']
source_addr_list = ['addr/something/queue/one', 'simpleaddress', 'queue.anonymous']
# Attempt to connect to all allowed target addresses
for target_addr in target_addr_list:
sender = SenderAddressValidator("%s/%s" % (self.address(), target_addr))
self.assertFalse(sender.link_error,
msg="target address must be allowed, but it was not [%s]" % target_addr)
# Attempt to connect to all allowed source addresses
for source_addr in source_addr_list:
receiver = ReceiverAddressValidator("%s/%s" % (self.address(), source_addr))
self.assertFalse(receiver.link_error,
msg="source address must be allowed, but it was not [%s]" % source_addr)
def test_vhost_denied_addresses(self):
target_addr_list = ['addr', 'simpleaddress1', 'queue.user']
source_addr_list = ['addr/queue/one', 'simpleaddress1', 'queue.user']
# Attempt to connect to all not allowed target addresses
for target_addr in target_addr_list:
sender = SenderAddressValidator("%s/%s" % (self.address(), target_addr))
self.assertTrue(sender.link_error,
msg="target address must not be allowed, but it was [%s]" % target_addr)
# Attempt to connect to all not allowed source addresses
for source_addr in source_addr_list:
receiver = ReceiverAddressValidator("%s/%s" % (self.address(), source_addr))
self.assertTrue(receiver.link_error,
msg="source address must not be allowed, but it was [%s]" % source_addr)
class VhostPolicyConnLimit(TestCase):
    """
    Verify that connections beyond the vhost limit are allowed
    if override specified in vhost.group.

    The vhost is configured with maxConnectionsPerUser 2 while its
    groups specify maxConnectionsPerUser 3; the test expects three
    connections to open and a fourth to be denied.
    """

    @classmethod
    def setUpClass(cls):
        """Start the router"""
        super(VhostPolicyConnLimit, cls).setUpClass()
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}),
            ('vhost', {
                'hostname': '0.0.0.0', 'maxConnections': 100,
                'maxConnectionsPerUser': 2,
                'allowUnknownUser': 'true',
                'groups': {
                    '$default': {
                        'users': '*',
                        'remoteHosts': '*',
                        'sources': '*',
                        'targets': '*',
                        'allowDynamicSource': True,
                        'maxConnectionsPerUser': 3
                    },
                    'anonymous': {
                        'users': 'anonymous',
                        'remoteHosts': '*',
                        'sourcePattern': 'addr/*/queue/*, simpleaddress, queue.${user}',
                        'targets': 'addr/*, simpleaddress, queue.${user}',
                        'allowDynamicSource': True,
                        'allowAnonymousSender': True,
                        'maxConnectionsPerUser': 3
                    }
                }
            })
        ])
        cls.router = cls.tester.qdrouterd('vhost-policy-conn-limit', config, wait=True)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def test_verify_vhost_maximum_connections_override(self):
        addr = "%s/$management" % self.address()
        timeout = 5
        # Clients that opened successfully; None means "never opened".
        bc1 = None
        bc2 = None
        bc3 = None
        bc4 = None
        try:
            # three connections should be ok
            denied = False
            try:
                bc1 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
                bc2 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
                bc3 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
            except (ConnectionException, Timeout):
                denied = True
            self.assertFalse(denied)  # assert connections were opened

            # fourth connection should be denied
            denied = False
            try:
                bc4 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
            except (ConnectionException, Timeout):
                denied = True
            self.assertTrue(denied)  # assert if connection that should not open did open
        finally:
            # Close whatever did open, even if an assertion above failed,
            # so no connection outlives this test.
            for client in (bc1, bc2, bc3, bc4):
                if client is not None:
                    client.connection.close()
class ClientAddressValidator(MessagingHandler):
    """
    Base client class used to validate vhost policies through
    sender or receiver clients based on allowed target and source
    addresses.
    Implementing classes must provide on_start() implementation
    and create the respective sender or receiver.

    Constructing an instance runs the client: __init__ calls
    Container.run() and, once the constructor returns, link_error
    tells whether the link was refused (or the watchdog fired).
    """
    # Watchdog delay, in seconds, passed to signal.alarm().
    TIMEOUT = 3

    def __init__(self, url):
        """
        Run the client against the given url.
        :param url: address of the source or target to attach to
        """
        super(ClientAddressValidator, self).__init__()
        self.url = Url(url)
        self.container = Container(self)
        self.link_error = False
        # The watchdog has to be armed BEFORE run(): run() does not return
        # until the container is done, so arming it afterwards (as was done
        # previously) never protected the run and left a stray alarm pending
        # after the constructor returned.
        signal.signal(signal.SIGALRM, self.timeout)
        signal.alarm(ClientAddressValidator.TIMEOUT)
        try:
            self.container.run()
        finally:
            # Never leave an alarm pending once this client is finished.
            signal.alarm(0)

    def timeout(self, signum, frame):
        """
        In case router crashes or something goes wrong and client
        is unable to connect, this method will be invoked and
        set the link_error to True
        :param signum:
        :param frame:
        :return:
        """
        self.link_error = True
        self.container.stop()

    def on_link_error(self, event):
        """
        When link was closed by the router due to policy violation.
        :param event:
        :return:
        """
        self.link_error = True
        event.connection.close()
        signal.alarm(0)

    def on_link_opened(self, event):
        """
        When link was opened without error.
        :param event:
        :return:
        """
        event.connection.close()
        signal.alarm(0)
class ReceiverAddressValidator(ClientAddressValidator):
    """
    Validator for vhost policies on source addresses: it attaches
    a receiver to the url it was constructed with.

    Construction is inherited unchanged from ClientAddressValidator.
    """

    def on_start(self, event):
        """
        Open the receiver on the configured url.
        :param event:
        :return:
        """
        container = event.container
        container.create_receiver(self.url)
class SenderAddressValidator(ClientAddressValidator):
    """
    Validator for vhost policies on target addresses: it attaches
    a sender to the url it was constructed with.

    Construction is inherited unchanged from ClientAddressValidator.
    """

    def on_start(self, event):
        """
        Open the sender on the configured url.
        :param event:
        :return:
        """
        container = event.container
        container.create_sender(self.url)
#
# Connector policy tests
#
class ConnectorPolicyMisconfiguredClient(FakeBroker):
    '''
    Listener targeted by a misconfigured router connector whose
    policy causes an immediate connection close.  It counts the
    connection events it sees so a test can inspect them.
    '''

    def __init__(self, url, container_id=None):
        # Counters bumped by the on_connection_* callbacks.
        self.connection_error = 0
        self.connection_opened = 0
        self.connection_opening = 0
        # Set once _main() has returned.
        self.main_exited = False
        # NOTE(review): state is created before the base constructor runs,
        # presumably because FakeBroker.__init__ already starts processing
        # events - confirm against test_broker.
        super(ConnectorPolicyMisconfiguredClient, self).__init__(url, container_id)

    def _main(self):
        # Process container events until asked to stop or until processing
        # raises; either way record that the loop is over.
        self._container.timeout = 1.0
        self._container.start()
        while True:
            try:
                self._container.process()
            except:
                self._stop_thread = True
            if self._stop_thread:
                break
        self.main_exited = True

    def join(self):
        # Request a stop (waking the container) unless one is already
        # pending, then wait up to 5 seconds for the thread.
        stop_already_requested = self._stop_thread
        if not stop_already_requested:
            self._stop_thread = True
            self._container.wakeup()
        if self.main_exited:
            return
        self._thread.join(timeout=5)

    def on_start(self, event):
        # Schedule the test timeout, then listen on the configured url.
        self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.acceptor = event.container.listen(self.url)

    def timeout(self):
        # Invoked on behalf of the TestTimeout scheduled in on_start().
        self._error = "Timeout Expired"

    def on_connection_opening(self, event):
        self.connection_opening = self.connection_opening + 1
        super(ConnectorPolicyMisconfiguredClient, self).on_connection_opening(event)

    def on_connection_opened(self, event):
        self.connection_opened = self.connection_opened + 1
        super(ConnectorPolicyMisconfiguredClient, self).on_connection_opened(event)

    def on_connection_error(self, event):
        self.connection_error = self.connection_error + 1
class ConnectorPolicyMisconfigured(TestCase):
    """
    A connector that names a policyVhost for which no vhost policy
    is defined must not be allowed to keep its connection open.
    """
    remoteListenerPort = None

    @classmethod
    def setUpClass(cls):
        """Start the router"""
        super(ConnectorPolicyMisconfigured, cls).setUpClass()
        # Port order matters: the remote listener port is taken first,
        # the router's own listener port second.
        cls.remoteListenerPort = cls.tester.get_port()
        listener_port = cls.tester.get_port()

        # The connector refers to vhost 'nosuch', which is not defined below.
        connector = {
            'name': 'novhost',
            'idleTimeoutSeconds': 120, 'saslMechanisms': 'ANONYMOUS',
            'host': '127.0.0.1', 'role': 'normal',
            'port': cls.remoteListenerPort, 'policyVhost': 'nosuch'
        }
        default_group = {
            'users': '*',
            'remoteHosts': '*',
            'sources': '*',
            'targets': '*',
            'allowDynamicSource': True
        }
        anonymous_group = {
            'users': 'anonymous',
            'remoteHosts': '*',
            'sourcePattern': 'addr/*/queue/*, simpleaddress, queue.${user}',
            'targets': 'addr/*, simpleaddress, queue.${user}',
            'allowDynamicSource': True,
            'allowAnonymousSender': True
        }
        vhost = {
            'hostname': '0.0.0.0', 'maxConnections': 2,
            'allowUnknownUser': 'true',
            'groups': {
                '$default': default_group,
                'anonymous': anonymous_group
            }
        }
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': listener_port}),
            ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}),
            ('connector', connector),
            ('vhost', vhost)
        ])
        cls.router = cls.tester.qdrouterd('connectorPolicyMisconfigured', config, wait=False)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def test_30_connector_policy_misconfigured(self):
        # Listen where the connector points and wait until either a
        # connection error is counted or the client's timeout fires.
        broker = ConnectorPolicyMisconfiguredClient(
            "127.0.0.1:%d" % self.remoteListenerPort, "tc")
        while not (broker.connection_error != 0 or broker._error is not None):
            time.sleep(0.1)
        broker.join()
        self.assertTrue(broker.connection_error == 1)
#
class ConnectorPolicyClient(FakeBroker):
    '''
    This client is targeted by a configured connector whose policy
    allows certain sources and targets.

    The try_*/close_* methods post a request in an attribute and poll
    until _main() has carried it out on the first connection.
    NOTE(review): _main() presumably runs on self._thread, started by
    FakeBroker - confirm against test_broker.
    '''

    def __init__(self, url, container_id=None):
        # Connection event counters.
        self.connection_opening = 0
        self.connection_opened = 0
        self.connection_error = 0
        # True once _main() has returned, for whatever reason.
        self.main_exited = False
        # Links created so far, in creation order.
        self.senders = []
        self.receivers = []
        # Outcome of the most recent link request.
        self.link_error = False
        # Pending requests, consumed by _main().
        self.sender_request = ""
        self.receiver_request = ""
        self.request_in_flight = False
        self.req_close_sender = False
        self.req_close_receiver = False
        self.req_anonymous_sender = False
        super(ConnectorPolicyClient, self).__init__(url, container_id)

    def _service_request(self):
        """Carry out at most one pending request, in fixed priority order."""
        if self.sender_request != "":
            sndr = self._container.create_sender(
                self._connections[0], self.sender_request)
            self.senders.append(sndr)
            # Mark in-flight before clearing the request so a poller never
            # sees "no request and nothing in flight" in between.
            self.request_in_flight = True
            self.sender_request = ""
        elif self.receiver_request != "":
            rcvr = self._container.create_receiver(
                self._connections[0], self.receiver_request)
            self.receivers.append(rcvr)
            self.request_in_flight = True
            self.receiver_request = ""
        elif self.req_close_sender:
            self.senders[0].close()
            self.req_close_sender = False
        elif self.req_close_receiver:
            self.receivers[0].close()
            self.req_close_receiver = False
        elif self.req_anonymous_sender:
            sndr = self._container.create_sender(
                self._connections[0], name="anon")
            self.senders.append(sndr)
            self.request_in_flight = True
            self.req_anonymous_sender = False

    def _main(self):
        """
        Process container events and pending requests until asked to stop
        or until something raises.  main_exited is always set on the way
        out so that pollers and join() can tell the loop is gone.
        """
        self._container.timeout = 1.0
        self._container.start()
        try:
            while True:
                try:
                    self._container.process()
                    if not self.request_in_flight:
                        self._service_request()
                except Exception:
                    # Any failure ends the loop, as before.
                    self._stop_thread = True
                if self._stop_thread:
                    break
        finally:
            self.main_exited = True

    def join(self):
        """Ask _main() to stop and wait up to 5 seconds for its thread."""
        if not self._stop_thread:
            self._stop_thread = True
            self._container.wakeup()
        if not self.main_exited:
            self._thread.join(timeout=5)

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.acceptor = event.container.listen(self.url)

    def timeout(self):
        self._error = "Timeout Expired"

    def on_connection_opening(self, event):
        self.connection_opening += 1
        super(ConnectorPolicyClient, self).on_connection_opening(event)

    def on_connection_opened(self, event):
        self.connection_opened += 1
        super(ConnectorPolicyClient, self).on_connection_opened(event)

    def on_connection_error(self, event):
        self.connection_error += 1

    def on_link_opened(self, event):
        self.request_in_flight = False

    def on_link_error(self, event):
        self.link_error = True
        self.request_in_flight = False

    def _gave_up(self):
        """True when waiting is pointless: timeout fired or _main() is gone."""
        return self._error is not None or self.main_exited

    def _await_link(self, pending):
        """
        Poll until pending() is false, a link error is reported, or there
        is no point waiting any longer.

        Without the main_exited check a dead _main() loop would leave the
        request unserviced and this poll spinning forever.
        :param pending: callable, true while the request is outstanding
        :return: True only if the request completed without a link error,
                 timeout, or loss of the processing loop
        """
        while pending() and not self.link_error and not self._gave_up():
            time.sleep(0.10)
        # Short settle delay kept from the original implementation.
        time.sleep(0.10)
        return not self.link_error and not self._gave_up()

    def try_sender(self, addr):
        """Request a sender to addr; return True if the link came up."""
        self.link_error = False
        self.sender_request = addr
        return self._await_link(
            lambda: self.sender_request == addr or self.request_in_flight)

    def try_receiver(self, addr):
        """Request a receiver from addr; return True if the link came up."""
        self.link_error = False
        self.receiver_request = addr
        return self._await_link(
            lambda: self.receiver_request == addr or self.request_in_flight)

    def close_sender(self):
        """Request closing the first sender and wait until it is handled."""
        self.req_close_sender = True
        # Bounded wait: stop polling if nobody is left to service the request.
        while self.req_close_sender and not self._gave_up():
            time.sleep(0.05)

    def close_receiver(self):
        """Request closing the first receiver and wait until it is handled."""
        self.req_close_receiver = True
        while self.req_close_receiver and not self._gave_up():
            time.sleep(0.05)

    def try_anonymous_sender(self):
        """Request a sender with no address; return True if the link came up."""
        self.link_error = False
        self.req_anonymous_sender = True
        return self._await_link(
            lambda: self.req_anonymous_sender or self.request_in_flight)
class ConnectorPolicySrcTgt(TestCase):
    """
    Verify that a connector that has a vhostPolicy
     * may open the connection
     * may access allowed sources and targets
     * may not access disallowed sources and targets
    """
    remoteListenerPort = None

    @classmethod
    def setUpClass(cls):
        """Start the router"""
        super(ConnectorPolicySrcTgt, cls).setUpClass()
        cls.remoteListenerPort = cls.tester.get_port()
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}),
            ('connector', {'name': 'novhost',
                           'idleTimeoutSeconds': 120, 'saslMechanisms': 'ANONYMOUS',
                           'host': '127.0.0.1', 'role': 'normal',
                           'port': cls.remoteListenerPort, 'policyVhost': 'test'
                           }),
            # Set up the prefix 'node' as a prefix for waypoint addresses
            ('address', {'prefix': 'node', 'waypoint': 'yes'}),
            # Create a pair of default auto-links for 'node.1'
            ('autoLink', {'address': 'node.1', 'containerId': 'container.1', 'direction': 'in'}),
            ('autoLink', {'address': 'node.1', 'containerId': 'container.1', 'direction': 'out'}),
            ('vhost', {
                'hostname': 'test',
                'groups': {
                    '$connector': {
                        'sources': 'test,examples,work*',
                        'targets': 'examples,$management,play*',
                    }
                }
            })
        ])
        cls.router = cls.tester.qdrouterd('ConnectorPolicySrcTgt', config, wait=False)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    @staticmethod
    def _attempt(operation, addr):
        """
        Call operation(addr) and return its result; an exception counts
        as a refused request (False).  Only Exception is caught so that
        KeyboardInterrupt/SystemExit still abort the test run.
        """
        try:
            return operation(addr)
        except Exception:
            return False

    def test_31_connector_policy(self):
        url = "127.0.0.1:%d" % self.remoteListenerPort
        cpc = ConnectorPolicyClient(url, "cpc")
        try:
            while cpc.connection_opened == 0 and cpc._error is None:
                time.sleep(0.1)
            time.sleep(0.05)
            self.assertTrue(cpc.connection_error == 0)  # expect connection to stay up
            self.assertTrue(cpc._error is None)

            # senders that should work
            for addr in ["examples", "$management", "playtime"]:  # allowed targets
                self.assertTrue(self._attempt(cpc.try_sender, addr))

            # senders that should fail
            for addr in ["test", "a/bad/addr"]:  # denied targets
                self.assertFalse(self._attempt(cpc.try_sender, addr))

            # receivers that should work
            for addr in ["examples", "test", "workaholic"]:  # allowed sources
                self.assertTrue(self._attempt(cpc.try_receiver, addr))

            # receivers that should fail
            for addr in ["$management", "a/bad/addr"]:  # denied sources
                self.assertFalse(self._attempt(cpc.try_receiver, addr))

            # anonymous sender should be disallowed
            res = cpc.try_anonymous_sender()
            self.assertFalse(res)

            # waypoint links should be disallowed
            res = cpc.try_sender("node.1")
            self.assertFalse(res)
            res = cpc.try_receiver("node.1")
            self.assertFalse(res)
        finally:
            # Stop the client's processing loop whether the test passed or
            # not, as test_30 does for its client; previously it was left
            # running after the test returned.
            cpc.join()
class ConnectorPolicyNSndrRcvr(TestCase):
    """
    Verify that a connector that has a vhostPolicy is allowed
     * to open the connection
     * is limited to the number of senders and receivers specified in the policy
    """
    remoteListenerPort = None
    # Link limits written into the '$connector' group below.
    MAX_SENDERS = 4
    MAX_RECEIVERS = 3

    @classmethod
    def setUpClass(cls):
        """Start the router"""
        super(ConnectorPolicyNSndrRcvr, cls).setUpClass()
        cls.remoteListenerPort = cls.tester.get_port()
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}),
            ('connector', {'name': 'novhost',
                           'idleTimeoutSeconds': 120, 'saslMechanisms': 'ANONYMOUS',
                           'host': '127.0.0.1', 'role': 'normal',
                           'port': cls.remoteListenerPort, 'policyVhost': 'test'
                           }),
            # Set up the prefix 'node' as a prefix for waypoint addresses
            ('address', {'prefix': 'node', 'waypoint': 'yes'}),
            # Create a pair of default auto-links for 'node.1'
            ('autoLink', {'address': 'node.1', 'containerId': 'container.1', 'direction': 'in'}),
            ('autoLink', {'address': 'node.1', 'containerId': 'container.1', 'direction': 'out'}),
            ('vhost', {
                'hostname': 'test',
                'groups': {
                    '$connector': {
                        'sources': '*',
                        'targets': '*',
                        'maxSenders': cls.MAX_SENDERS,
                        'maxReceivers': cls.MAX_RECEIVERS,
                        'allowAnonymousSender': True,
                        'allowWaypointLinks': True
                    }
                }
            })
        ])
        cls.router = cls.tester.qdrouterd('ConnectorPolicyNSndrRcvr', config, wait=False)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    @staticmethod
    def _attempt(operation, addr):
        """
        Call operation(addr) and return its result; an exception counts
        as a refused request (False).  Only Exception is caught so that
        KeyboardInterrupt/SystemExit still abort the test run.
        """
        try:
            return operation(addr)
        except Exception:
            return False

    def test_32_connector_policy_max_sndr_rcvr(self):
        url = "127.0.0.1:%d" % self.remoteListenerPort
        cpc = ConnectorPolicyClient(url, "cpc")
        try:
            while cpc.connection_opened == 0 and cpc._error is None:
                time.sleep(0.1)
            time.sleep(0.05)
            self.assertTrue(cpc.connection_error == 0)  # expect connection to stay up
            self.assertTrue(cpc._error is None)

            # senders that should work
            # anonymous sender should be allowed
            res = cpc.try_anonymous_sender()     # sender 1
            self.assertTrue(res)

            # waypoint links should be allowed
            res = cpc.try_sender("node.1")       # sender 2
            self.assertTrue(res)
            res = cpc.try_receiver("node.1")     # receiver 1
            self.assertTrue(res)

            # two senders are already open; fill the rest of the quota
            addr = "vermillion"
            for _ in range(self.MAX_SENDERS - 2):
                self.assertTrue(self._attempt(cpc.try_sender, addr))

            # senders that should fail
            for _ in range(2):
                self.assertFalse(self._attempt(cpc.try_sender, addr))

            # receivers that should work (one receiver is already open)
            for _ in range(self.MAX_RECEIVERS - 1):
                self.assertTrue(self._attempt(cpc.try_receiver, addr))

            # receivers that should fail
            for _ in range(2):
                self.assertFalse(self._attempt(cpc.try_receiver, addr))

            # close a sender and verify that only one more may open
            addr = "skyblue"
            cpc.close_sender()
            self.assertTrue(self._attempt(cpc.try_sender, addr))
            # sender that should fail
            self.assertFalse(self._attempt(cpc.try_sender, addr))

            # close a receiver and verify that only one more may open
            cpc.close_receiver()
            self.assertTrue(self._attempt(cpc.try_receiver, addr))
            # receiver that should fail
            self.assertFalse(self._attempt(cpc.try_receiver, addr))
        finally:
            # Stop the client's processing loop whether the test passed or
            # not, as test_30 does for its client; previously it was left
            # running after the test returned.
            cpc.join()
class VhostPolicyConfigHashPattern(TestCase):
    """
    A vhost whose hostname contains the '#' pattern symbol must
    not crash the router.
    """

    @classmethod
    def setUpClass(cls):
        """Start the router"""
        super(VhostPolicyConfigHashPattern, cls).setUpClass()
        default_group = {
            'users': '*',
            'remoteHosts': '*',
            'sources': '*',
            'targets': '*',
            'allowDynamicSource': True
        }
        vhost = {
            'hostname': '#.example.com', 'maxConnections': 2,
            'allowUnknownUser': 'true',
            'groups': {'$default': default_group}
        }
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true', 'enableVhostNamePatterns': 'true'}),
            ('vhost', vhost)
        ])
        # Do not block on startup here; readiness is checked explicitly so
        # that a failure is recorded instead of aborting the class setup.
        cls.router = cls.tester.qdrouterd('vhost-policy-config-hash-pattern', config, wait=False)
        cls.timed_out = False
        try:
            cls.router.wait_ready(timeout=5)
        except Exception:
            cls.timed_out = True

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def test_vhost_created(self):
        # timed_out is set in setUpClass when the router did not become ready
        self.assertEqual(False, VhostPolicyConfigHashPattern.timed_out)
class PolicyConnectionAliasTest(MessagingHandler):
    """
    Open a connection whose AMQP Open carries a caller-chosen hostname.
    The hostname is expected to be an alias for a vhost; when the alias
    selects the vhost the connection is allowed.  The outcome is left in
    self.error: None on success, otherwise a description of the failure.
    """

    def __init__(self, test_host, target_hostname, send_address, print_to_console=False):
        super(PolicyConnectionAliasTest, self).__init__()
        # Inputs
        self.test_host = test_host              # router listener
        self.target_hostname = target_hostname  # vhost name for AMQP Open
        self.send_address = send_address        # dummy address allowed by policy
        # Proton objects created while the test runs
        self.test_conn = None
        self.dummy_sender = None
        self.dummy_receiver = None
        # Progress and result
        self.error = None
        self.shut_down = False
        self.connection_open_seen = False
        title = "PolicyConnectionAliasTest - use virtual_host '%s'" % (self.target_hostname)
        self.logger = Logger(title=title, print_to_console=print_to_console)
        self.log_unhandled = False

    def timeout(self):
        # Invoked on behalf of the TestTimeout scheduled in on_start().
        self.error = "Timeout Expired"
        self.logger.log("self.timeout " + self.error)
        self._shut_down_test()

    def on_start(self, event):
        self.logger.log("on_start")
        self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        router_address = self.test_host.addresses[0]
        self.test_conn = event.container.connect(router_address,
                                                 virtual_host=self.target_hostname)
        self.logger.log("on_start: done")

    def on_connection_opened(self, event):
        # This event arrives even when the router rejects the connection;
        # a rejected connection is closed right afterwards.  Attach a sender
        # and a receiver: an on_sendable for the sender shows the connection
        # stayed up as expected.
        self.logger.log("on_connection_opened")
        self.connection_open_seen = True
        container = event.container
        self.dummy_sender = container.create_sender(self.test_conn, self.send_address)
        self.dummy_receiver = container.create_receiver(self.test_conn, self.send_address)

    def on_sendable(self, event):
        # Success
        self.logger.log("on_sendable: test is a success")
        self._shut_down_test()

    def on_connection_remote_close(self, event):
        self.logger.log("on_connection_remote_close")
        # Only a close after the open, and before our own shutdown, is a failure.
        if not self.connection_open_seen or self.shut_down:
            return
        self.error = "Policy enforcement fail: expected connection was denied."
        self._shut_down_test()

    def on_unhandled(self, method, *args):
        # Events without a dedicated handler are deliberately ignored.
        pass

    def _shut_down_test(self):
        # Cancel the timeout timer and close the connection, each at most once.
        self.shut_down = True
        pending_timer = self.timer
        if pending_timer:
            pending_timer.cancel()
            self.timer = None
        connection = self.test_conn
        if connection:
            connection.close()
            self.test_conn = None

    def run(self):
        # Run to completion; a container failure is recorded, not raised.
        try:
            Container(self).run()
        except Exception as e:
            self.error = "Container run exception: %s" % (e)
            self.logger.log(self.error)
            self.logger.dump()
class PolicyVhostAlias(TestCase):
    """
    Verify vhost aliases.
     * The policy defines vhost A with alias B.
     * A client opens a connection naming hostname B in its AMQP Open.
     * The connection is expected to succeed under vhost A's settings.
    """

    @classmethod
    def setUpClass(cls):
        """Start the router"""
        super(PolicyVhostAlias, cls).setUpClass()

        def start_router(name, mode):
            # Build the configuration, start the router and remember it.
            default_group = {
                'users': '*',
                'maxConnections': 100,
                'remoteHosts': '*',
                'sources': '*',
                'targets': '*',
                'allowAnonymousSender': 'true',
                'allowWaypointLinks': 'true',
                'allowDynamicSource': 'true'
            }
            sections = [
                ('router', {'mode': mode,
                            'id': name}),
                ('listener', {'role': 'normal',
                              'port': cls.tester.get_port()}),
                ('policy', {'enableVhostPolicy': 'true'}),
                ('vhost', {'hostname': 'A',
                           'allowUnknownUser': 'true',
                           'aliases': 'B',
                           'groups': {'$default': default_group}})
            ]
            started = cls.tester.qdrouterd(name, Qdrouterd.Config(sections), wait=True)
            cls.routers.append(started)
            return cls.routers[-1]

        cls.routers = []
        start_router('A', 'interior')
        cls.INT_A = cls.routers[0]
        cls.INT_A.listener = cls.INT_A.addresses[0]

    def test_100_policy_aliases(self):
        # Connect naming alias "B"; any recorded error fails the test.
        test = PolicyConnectionAliasTest(PolicyVhostAlias.INT_A, "B", "address-B")
        test.run()
        failure = test.error
        if failure is not None:
            test.logger.log("test_100 test error: %s" % (failure))
            test.logger.dump()
        self.assertTrue(test.error is None)
class PolicyVhostMultiTenantBlankHostname(TestCase):
    """
    DISPATCH-1732: a multitenant listener must cope with an Open
    that carries no hostname field.
    """

    @classmethod
    def setUpClass(cls):
        """Start the router"""
        super(PolicyVhostMultiTenantBlankHostname, cls).setUpClass()

        def start_router(name, mode):
            # Build the configuration, start the router and remember it.
            default_group = {
                'users': '*',
                'maxConnections': 100,
                'remoteHosts': '*',
                'sources': '*',
                'targets': '*',
                'allowAnonymousSender': 'true',
                'allowWaypointLinks': 'true',
                'allowDynamicSource': 'true'
            }
            sections = [
                ('router', {'mode': mode,
                            'id': name}),
                ('listener', {'role': 'normal',
                              'multiTenant': 'true',
                              'port': cls.tester.get_port(),
                              'policyVhost': 'myhost'}),
                ('policy', {'enableVhostPolicy': 'true'}),
                ('vhost', {'hostname': 'myhost',
                           'allowUnknownUser': 'true',
                           'groups': {'$default': default_group}})
            ]
            started = cls.tester.qdrouterd(name, Qdrouterd.Config(sections), wait=True)
            cls.routers.append(started)
            return cls.routers[-1]

        cls.routers = []
        start_router('A', 'interior')
        cls.INT_A = cls.routers[0]
        cls.INT_A.listener = cls.INT_A.addresses[0]

    def test_101_policy_alias_blank_vhost(self):
        # Connect with an empty virtual host; any recorded error fails the test.
        test = PolicyConnectionAliasTest(PolicyVhostMultiTenantBlankHostname.INT_A,
                                         "",
                                         "address-blank-101")
        test.run()
        failure = test.error
        if failure is not None:
            test.logger.log("test_101 test error: %s" % (failure))
            test.logger.dump()
        self.assertTrue(test.error is None)
# Run this module's tests when the file is executed as a script.
if __name__ == '__main__':
    unittest.main(main_module())