blob: 8e481027c8ca56019b4471809af78de2de3b75f0 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest as unittest
import os, json
from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT, DIR
from subprocess import PIPE, STDOUT
from proton import ConnectionException
from proton.utils import BlockingConnection, LinkDetached
from qpid_dispatch_internal.policy.policy_util import is_ipv6_enabled
class AbsoluteConnectionCountLimit(TestCase):
    """
    Verify that connections beyond the absolute limit are denied
    """

    @classmethod
    def setUpClass(cls):
        """Start a standalone router whose policy sets maxConnections to 2
        and has vhost policy disabled."""
        super(AbsoluteConnectionCountLimit, cls).setUpClass()
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 2, 'enableVhostPolicy': 'false'})
        ])
        cls.router = cls.tester.qdrouterd('conn-limit-router', config, wait=True)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def test_verify_maximum_connections(self):
        """Open two connections (allowed), then check a third is denied."""
        addr = self.address()
        # Every connection that actually opened is tracked here so that it is
        # closed even when one of the assertions below fails.
        opened = []
        try:
            # two connections should be ok
            denied = False
            try:
                opened.append(BlockingConnection(addr))
                opened.append(BlockingConnection(addr))
            except ConnectionException:
                denied = True
            self.assertFalse(denied)  # assert if connections that should open did not open

            # third connection should be denied
            denied = False
            try:
                opened.append(BlockingConnection(addr))
            except ConnectionException:
                denied = True
            self.assertTrue(denied)  # assert if connection that should not open did open
        finally:
            for connection in opened:
                connection.close()
class LoadPolicyFromFolder(TestCase):
    """
    Verify that specifying a policy folder from the router conf file
    effects loading the policies in that folder.
    This test relies on qdmanage utility.
    """

    @classmethod
    def setUpClass(cls):
        """Generate the policy files from their templates and start the router.

        Every ``*.json.in`` file in the ``policy-1`` folder is copied to a
        sibling file without the ``.in`` suffix, with the placeholder
        ``{IPV6_LOOPBACK}`` replaced by ``', ::1'`` when IPv6 is enabled and
        by the empty string otherwise.
        """
        super(LoadPolicyFromFolder, cls).setUpClass()

        ipv6_enabled = is_ipv6_enabled()
        policy_config_path = os.path.join(DIR, 'policy-1')
        replacements = {'{IPV6_LOOPBACK}': ', ::1'}
        for f in os.listdir(policy_config_path):
            if not f.endswith(".json.in"):
                continue
            # f[:-3] drops the trailing ".in" to get the generated file name.
            with open(os.path.join(policy_config_path, f[:-3]), 'w') as outfile:
                with open(os.path.join(policy_config_path, f)) as infile:
                    for line in infile:
                        for src, target in replacements.items():
                            line = line.replace(src, target if ipv6_enabled else '')
                        outfile.write(line)

        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'})
        ])
        cls.router = cls.tester.qdrouterd('conn-limit-router', config, wait=True)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK):
        """Run ``qdmanage`` against the router and return its combined output.

        :param cmd: qdmanage arguments as one space-separated string.
        :param input: optional text passed to the process on stdin.
        :param expect: expected exit status, forwarded to ``self.popen``.
        :raises Exception: if ``p.teardown()`` raises; the process output is
            appended to the original error text.
        """
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', 'u1:password@' + self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def _query_vhosts(self):
        """Return the parsed result of a qdmanage vhost query."""
        return json.loads(self.run_qdmanage('query --type=vhost'))

    def _find_vhost(self, rulesets, hostname):
        """Return the first ruleset whose 'hostname' matches, or None."""
        for ruleset in rulesets:
            if ruleset['hostname'] == hostname:
                return ruleset
        return None

    def test_verify_policies_are_loaded(self):
        """The vhost query must report five rulesets."""
        # presumably one per policy file in the policy-1 folder -- confirm
        self.assertEqual(len(self._query_vhosts()), 5)

    def new_policy(self):
        """Return the JSON text used to create vhost 'dispatch-494'."""
        return """
{
"hostname": "dispatch-494",
"maxConnections": 50,
"maxConnectionsPerHost": 20,
"maxConnectionsPerUser": 8,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def updated_policy(self):
        """Return the JSON text used to update vhost 'dispatch-494'."""
        return """
{
"maxConnections": 500,
"maxConnectionsPerHost": 2,
"maxConnectionsPerUser": 30,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 123,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 222
}
}
}
"""

    def test_verify_policy_add_update_delete(self):
        """Create, update and delete a vhost, checking the query each time."""
        # verify current vhost count
        self.assertEqual(len(self._query_vhosts()), 5)

        # create
        self.run_qdmanage('create --type=vhost --name=dispatch-494 --stdin', input=self.new_policy())
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 6)
        ruleset = self._find_vhost(rulesets, 'dispatch-494')
        self.assertTrue(ruleset is not None)
        self.assertEqual(ruleset['maxConnections'], 50)
        self.assertEqual(ruleset['maxConnectionsPerHost'], 20)
        self.assertEqual(ruleset['maxConnectionsPerUser'], 8)

        # update
        self.run_qdmanage('update --type=vhost --name=dispatch-494 --stdin', input=self.updated_policy())
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 6)
        ruleset = self._find_vhost(rulesets, 'dispatch-494')
        self.assertTrue(ruleset is not None)
        self.assertEqual(ruleset['maxConnections'], 500)
        self.assertEqual(ruleset['maxConnectionsPerHost'], 2)
        self.assertEqual(ruleset['maxConnectionsPerUser'], 30)

        # delete
        self.run_qdmanage('delete --type=vhost --name=dispatch-494')
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 5)
        self.assertTrue(self._find_vhost(rulesets, 'dispatch-494') is None)

    def test_repeated_create_delete(self):
        """Create and delete the same vhost ten times in a row."""
        for _ in range(10):
            self.assertEqual(len(self._query_vhosts()), 5)

            # create
            self.run_qdmanage('create --type=vhost --name=dispatch-494 --stdin', input=self.new_policy())
            rulesets = self._query_vhosts()
            self.assertEqual(len(rulesets), 6)
            self.assertTrue(self._find_vhost(rulesets, 'dispatch-494') is not None)

            # delete
            self.run_qdmanage('delete --type=vhost --name=dispatch-494')
            rulesets = self._query_vhosts()
            self.assertEqual(len(rulesets), 5)
            self.assertTrue(self._find_vhost(rulesets, 'dispatch-494') is None)
class SenderReceiverLimits(TestCase):
    """
    Verify that creating more receivers or senders on one connection than
    the policy allows is refused with LinkDetached.
    The limits exercised here (4 receivers, 2 senders) are presumably set by
    the vhost policy files in the 'policy-3' folder -- confirm against them.
    """

    @classmethod
    def setUpClass(cls):
        """Start a standalone router that loads vhost policy from 'policy-3'."""
        super(SenderReceiverLimits, cls).setUpClass()
        policy_config_path = os.path.join(DIR, 'policy-3')
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'})
        ])
        cls.router = cls.tester.qdrouterd('SenderReceiverLimits', config, wait=True)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def test_verify_n_receivers(self):
        """Four receivers open; the fifth must raise LinkDetached."""
        br1 = BlockingConnection(self.address())
        try:
            # n receivers OK
            br1.create_receiver(address="****YES_1of4***")
            br1.create_receiver(address="****YES_20f4****")
            br1.create_receiver(address="****YES_3of4****")
            br1.create_receiver(address="****YES_4of4****")
            # receiver n+1 should be denied
            self.assertRaises(LinkDetached, br1.create_receiver, "****NO****")
        finally:
            # close the connection even when a step above fails
            br1.close()

    def test_verify_n_senders(self):
        """Two senders open; the third must raise LinkDetached."""
        bs1 = BlockingConnection(self.address())
        try:
            # n senders OK
            bs1.create_sender(address="****YES_1of2****")
            bs1.create_sender(address="****YES_2of2****")
            # sender n+1 should be denied
            self.assertRaises(LinkDetached, bs1.create_sender, "****NO****")
        finally:
            # close the connection even when a step above fails
            bs1.close()
class InterrouterLinksAllowed(TestCase):
    """
    Verify that vhost policy does not block inter-router links: after two
    interior routers have been started and become ready, router A's log must
    contain no "link disallowed" entries.
    """

    inter_router_port = None

    @classmethod
    def setUpClass(cls):
        """Start interior routers A and B, wait for both, then stop them."""
        super(InterrouterLinksAllowed, cls).setUpClass()

        policy_dir = os.path.join(DIR, 'policy-5')
        cls.routers = []

        def start_router(router_id, link_entity):
            # Build one router's configuration; link_entity is the
            # inter-router listener or connector specific to that router.
            entities = [
                ('router', {'mode': 'interior', 'id': router_id}),
                ('listener', {'port': cls.tester.get_port()}),
                ('log', {'module': 'DEFAULT', 'enable': 'trace+'}),
                ('policy', {'enableVhostPolicy': 'yes', 'policyDir': policy_dir}),
                link_entity
            ]
            started = cls.tester.qdrouterd(router_id, Qdrouterd.Config(entities), wait=True)
            cls.routers.append(started)

        inter_router_port = cls.tester.get_port()

        listener_on_a = ('listener', {'role': 'inter-router', 'port': inter_router_port})
        start_router('A', listener_on_a)

        connector_on_b = ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port, 'verifyHostName': 'no'})
        start_router('B', connector_on_b)

        # Before DISPATCH-920 routers configured like this never connected,
        # because policy disallowed the links. By the time wait_ready
        # returns, the routers should have attempted the inter-router link.
        for started in cls.routers:
            started.wait_ready()
        for started in cls.routers:
            started.teardown()

    def test_01_router_links_allowed(self):
        """Router A's log must not contain any "link disallowed" line."""
        with open('../setUpClass/A-2.out', 'r') as log_file:
            contents = log_file.read()
        blocked = [entry for entry in contents.split("\n") if "link disallowed" in entry]
        self.assertTrue(len(blocked) == 0, msg='Inter-router links should be allowed but some were blocked by policy.')
class VhostPolicyNameField(TestCase):
    """
    Verify that vhosts can be created getting the name from
    'id' or from 'hostname'.
    This test relies on qdmanage utility.
    """

    @classmethod
    def setUpClass(cls):
        """Generate the policy files from their templates and start the router.

        Every ``*.json.in`` file in the ``policy-1`` folder is copied to a
        sibling file without the ``.in`` suffix, with the placeholder
        ``{IPV6_LOOPBACK}`` replaced by ``', ::1'`` when IPv6 is enabled and
        by the empty string otherwise.
        """
        super(VhostPolicyNameField, cls).setUpClass()

        ipv6_enabled = is_ipv6_enabled()
        policy_config_path = os.path.join(DIR, 'policy-1')
        replacements = {'{IPV6_LOOPBACK}': ', ::1'}
        for f in os.listdir(policy_config_path):
            if not f.endswith(".json.in"):
                continue
            # f[:-3] drops the trailing ".in" to get the generated file name.
            with open(os.path.join(policy_config_path, f[:-3]), 'w') as outfile:
                with open(os.path.join(policy_config_path, f)) as infile:
                    for line in infile:
                        for src, target in replacements.items():
                            line = line.replace(src, target if ipv6_enabled else '')
                        outfile.write(line)

        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'})
        ])
        cls.router = cls.tester.qdrouterd('vhost-policy-name-field', config, wait=True)

    def address(self):
        """Return the first address of the router under test."""
        return self.router.addresses[0]

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK):
        """Run ``qdmanage`` against the router and return its combined output.

        :param cmd: qdmanage arguments as one space-separated string.
        :param input: optional text passed to the process on stdin.
        :param expect: expected exit status, forwarded to ``self.popen``.
        :raises Exception: if ``p.teardown()`` raises; the process output is
            appended to the original error text.
        """
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', 'u1:password@' + self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def _query_vhosts(self):
        """Return the parsed result of a qdmanage vhost query."""
        return json.loads(self.run_qdmanage('query --type=vhost'))

    def _find_vhost(self, rulesets, hostname):
        """Return the first ruleset whose 'hostname' matches, or None."""
        for ruleset in rulesets:
            if ruleset['hostname'] == hostname:
                return ruleset
        return None

    def id_policy(self):
        """Return a vhost policy that names itself through 'id' only."""
        return """
{
"id": "dispatch-918",
"maxConnections": 50,
"maxConnectionsPerHost": 20,
"maxConnectionsPerUser": 8,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def hostname_policy(self):
        """Return a vhost policy that names itself through 'hostname' only."""
        return """
{
"hostname": "dispatch-918",
"maxConnections": 51,
"maxConnectionsPerHost": 20,
"maxConnectionsPerUser": 8,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def both_policy(self):
        """Return a vhost policy with an 'id' that differs from its 'hostname'."""
        return """
{
"id": "isogyre",
"hostname": "dispatch-918",
"maxConnections": 52,
"maxConnectionsPerHost": 20,
"maxConnectionsPerUser": 8,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def neither_policy(self):
        """Return a vhost policy with neither 'id' nor 'hostname'."""
        return """
{
"maxConnections": 53,
"maxConnectionsPerHost": 20,
"maxConnectionsPerUser": 8,
"allowUnknownUser": true,
"groups": {
"$default": {
"allowAnonymousSender": true,
"maxReceivers": 99,
"users": "*",
"maxSessionWindow": 9999,
"maxFrameSize": 222222,
"sources": "public, private, $management, neither_policy",
"maxMessageSize": 222222,
"allowDynamicSource": true,
"remoteHosts": "*",
"maxSessions": 2,
"targets": "public, private, $management",
"maxSenders": 22
}
}
}
"""

    def test_01_id_vs_hostname(self):
        """Create by 'id', then update by 'hostname', by both, and by neither."""
        # verify current vhost count
        self.assertEqual(len(self._query_vhosts()), 5)

        # create using 'id'
        self.run_qdmanage('create --type=vhost --name=dispatch-918 --stdin', input=self.id_policy())
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 6)
        ruleset = self._find_vhost(rulesets, 'dispatch-918')
        self.assertTrue(ruleset is not None)
        self.assertEqual(ruleset['maxConnections'], 50)

        # update using 'hostname'
        self.run_qdmanage('update --type=vhost --name=dispatch-918 --stdin', input=self.hostname_policy())
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 6)
        ruleset = self._find_vhost(rulesets, 'dispatch-918')
        self.assertTrue(ruleset is not None)
        self.assertEqual(ruleset['maxConnections'], 51)

        # update 'id' and 'hostname'
        # The outcome is recorded in a flag and asserted outside the try, so
        # the except clause cannot swallow the assertion failure itself.
        # NOTE(review): relies on run_qdmanage raising when qdmanage exits
        # with a status other than the expected EXIT_OK -- confirm.
        update_rejected = False
        try:
            self.run_qdmanage('update --type=vhost --name=dispatch-918 --stdin',
                              input=self.both_policy())
        except Exception:
            update_rejected = True
        self.assertTrue(update_rejected)  # should not be able to update 'id'

        # update using neither
        self.run_qdmanage('update --type=vhost --name=dispatch-918 --stdin', input=self.neither_policy())
        rulesets = self._query_vhosts()
        self.assertEqual(len(rulesets), 6)
        ruleset = self._find_vhost(rulesets, 'dispatch-918')
        self.assertTrue(ruleset is not None)
        self.assertEqual(ruleset['maxConnections'], 53)
        # the rejected update must not have left an 'isogyre' vhost behind
        self.assertTrue(self._find_vhost(rulesets, 'isogyre') is None)
if __name__ == '__main__':
    # Script entry point: hand the module named by main_module() to unittest.
    unittest.main(module=main_module())