| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import json |
| |
| from system_test import unittest |
| |
| from qpid_dispatch_internal.policy.policy_util import HostAddr, is_ipv6_enabled |
| from qpid_dispatch_internal.policy.policy_util import HostStruct |
| from qpid_dispatch_internal.policy.policy_util import PolicyError |
| from qpid_dispatch_internal.policy.policy_util import PolicyAppConnectionMgr |
| from qpid_dispatch_internal.policy.policy_local import PolicyLocal |
| from system_test import TestCase, main_module |
| |
| |
| class PolicyHostAddrTest(TestCase): |
| |
| def expect_deny(self, badhostname, msg): |
| denied = False |
| try: |
| xxx = HostStruct(badhostname) |
| except PolicyError: |
| denied = True |
| self.assertTrue(denied, ("%s" % msg)) |
| |
| def check_hostaddr_match(self, tHostAddr, tString, expectOk=True): |
| # check that the string is a match for the addr |
| # check that the internal struct version matches, too |
| ha = HostStruct(tString) |
| if expectOk: |
| self.assertTrue(tHostAddr.match_str(tString)) |
| self.assertTrue(tHostAddr.match_bin(ha)) |
| else: |
| self.assertFalse(tHostAddr.match_str(tString)) |
| self.assertFalse(tHostAddr.match_bin(ha)) |
| |
| def test_policy_hostaddr_ipv4(self): |
| # Create simple host and range |
| aaa = HostAddr("192.168.1.1") |
| bbb = HostAddr("1.1.1.1,1.1.1.255") |
| # Verify host and range |
| self.check_hostaddr_match(aaa, "192.168.1.1") |
| self.check_hostaddr_match(aaa, "1.1.1.1", False) |
| self.check_hostaddr_match(aaa, "192.168.1.2", False) |
| self.check_hostaddr_match(bbb, "1.1.1.1") |
| self.check_hostaddr_match(bbb, "1.1.1.254") |
| self.check_hostaddr_match(bbb, "1.1.1.0", False) |
| self.check_hostaddr_match(bbb, "1.1.2.0", False) |
| |
| def test_policy_hostaddr_ipv6(self): |
| if not is_ipv6_enabled(): |
| self.skipTest("System IPv6 support is not available") |
| # Create simple host and range |
| aaa = HostAddr("::1") |
| bbb = HostAddr("::1,::ffff") |
| ccc = HostAddr("ffff::0,ffff:ffff::0") |
| # Verify host and range |
| self.check_hostaddr_match(aaa, "::1") |
| self.check_hostaddr_match(aaa, "::2", False) |
| self.check_hostaddr_match(aaa, "ffff:ffff::0", False) |
| self.check_hostaddr_match(bbb, "::1") |
| self.check_hostaddr_match(bbb, "::fffe") |
| self.check_hostaddr_match(bbb, "::1:0", False) |
| self.check_hostaddr_match(bbb, "ffff::0", False) |
| self.check_hostaddr_match(ccc, "ffff::1") |
| self.check_hostaddr_match(ccc, "ffff:fffe:ffff:ffff::ffff") |
| self.check_hostaddr_match(ccc, "ffff:ffff::1", False) |
| self.check_hostaddr_match(ccc, "ffff:ffff:ffff:ffff::ffff", False) |
| |
| def test_policy_hostaddr_ipv4_wildcard(self): |
| aaa = HostAddr("*") |
| self.check_hostaddr_match(aaa, "0.0.0.0") |
| self.check_hostaddr_match(aaa, "127.0.0.1") |
| self.check_hostaddr_match(aaa, "255.254.253.252") |
| |
| def test_policy_hostaddr_ipv6_wildcard(self): |
| if not is_ipv6_enabled(): |
| self.skipTest("System IPv6 support is not available") |
| aaa = HostAddr("*") |
| self.check_hostaddr_match(aaa, "::0") |
| self.check_hostaddr_match(aaa, "::1") |
| self.check_hostaddr_match(aaa, "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff") |
| |
| def test_policy_malformed_hostaddr_ipv4(self): |
| self.expect_deny("0.0.0.0.0", "Name or service not known") |
| self.expect_deny("1.1.1.1,2.2.2.2,3.3.3.3", "arg count") |
| self.expect_deny("9.9.9.9,8.8.8.8", "a > b") |
| |
| def test_policy_malformed_hostaddr_ipv6(self): |
| if not is_ipv6_enabled(): |
| self.skipTest("System IPv6 support is not available") |
| self.expect_deny("1::2::3", "Name or service not known") |
| self.expect_deny("::1,::2,::3", "arg count") |
| self.expect_deny("0:ff:0,0:fe:ffff:ffff::0", "a > b") |
| |
| |
| class QpidDispatch(object): |
| def qd_dispatch_policy_c_counts_alloc(self): |
| return 100 |
| |
| def qd_dispatch_policy_c_counts_refresh(self, cstats, entitymap): |
| pass |
| |
| |
| class MockAgent(object): |
| def __init__(self): |
| self.qd = QpidDispatch() |
| |
| def add_implementation(self, entity, cfg_obj_name): |
| pass |
| |
| |
| class MockPolicyManager(object): |
| def __init__(self): |
| self.agent = MockAgent() |
| self.logs = [] |
| |
| def log_debug(self, text): |
| print("DEBUG: %s" % text) |
| self.logs.append(text) |
| |
| def log_info(self, text): |
| print("INFO: %s" % text) |
| self.logs.append(text) |
| |
| def log_trace(self, text): |
| print("TRACE: %s" % text) |
| self.logs.append(text) |
| |
| def log_error(self, text): |
| print("ERROR: %s" % text) |
| self.logs.append(text) |
| |
| def log_warning(self, text): |
| print("WARNING: %s" % text) |
| self.logs.append(text) |
| |
| def get_agent(self): |
| return self.agent |
| |
| |
| class PolicyFile(TestCase): |
| |
| manager = MockPolicyManager() |
| policy = PolicyLocal(manager) |
| policy.test_load_config() |
| |
| def test_policy1_test_zeke_ok(self): |
| p1 = PolicyFile.policy.lookup_user('zeke', '192.168.100.5', 'photoserver', '192.168.100.5:33333', 1) |
| self.assertTrue(p1 == 'test') |
| upolicy = {} |
| self.assertTrue( |
| PolicyFile.policy.lookup_settings('photoserver', p1, upolicy) |
| ) |
| self.assertTrue(upolicy['maxFrameSize'] == 444444) |
| self.assertTrue(upolicy['maxMessageSize'] == 444444) |
| self.assertTrue(upolicy['maxSessionWindow'] == 444444) |
| self.assertTrue(upolicy['maxSessions'] == 4) |
| self.assertTrue(upolicy['maxSenders'] == 44) |
| self.assertTrue(upolicy['maxReceivers'] == 44) |
| self.assertTrue(upolicy['allowAnonymousSender']) |
| self.assertTrue(upolicy['allowDynamicSource']) |
| self.assertTrue(upolicy['targets'] == 'a,private,') |
| self.assertTrue(upolicy['sources'] == 'a,private,') |
| |
| def test_policy1_test_zeke_bad_IP(self): |
| self.assertTrue( |
| PolicyFile.policy.lookup_user('zeke', '10.18.0.1', 'photoserver', "connid", 2) == '') |
| self.assertTrue( |
| PolicyFile.policy.lookup_user('zeke', '72.135.2.9', 'photoserver', "connid", 3) == '') |
| self.assertTrue( |
| PolicyFile.policy.lookup_user('zeke', '127.0.0.1', 'photoserver', "connid", 4) == '') |
| |
| def test_policy1_test_zeke_bad_app(self): |
| self.assertTrue( |
| PolicyFile.policy.lookup_user('zeke', '192.168.100.5', 'galleria', "connid", 5) == '') |
| |
| def test_policy1_test_users_same_permissions(self): |
| zname = PolicyFile.policy.lookup_user('zeke', '192.168.100.5', 'photoserver', '192.168.100.5:33333', 6) |
| yname = PolicyFile.policy.lookup_user('ynot', '10.48.255.254', 'photoserver', '192.168.100.5:33334', 7) |
| self.assertTrue(zname == yname) |
| |
| def test_policy1_lookup_unknown_application(self): |
| upolicy = {} |
| self.assertFalse( |
| PolicyFile.policy.lookup_settings('unknown', 'doesntmatter', upolicy) |
| ) |
| |
| def test_policy1_lookup_unknown_usergroup(self): |
| upolicy = {} |
| self.assertFalse( |
| PolicyFile.policy.lookup_settings('photoserver', 'unknown', upolicy) |
| ) |
| |
| |
| class PolicyFileApplicationFallback(TestCase): |
| manager = MockPolicyManager() |
| policy = PolicyLocal(manager) |
| policy.test_load_config() |
| |
| def test_bad_app_fallback(self): |
| # Show that with no fallback the user cannot connect |
| self.assertTrue( |
| self.policy.lookup_user('zeke', '192.168.100.5', 'galleria', "connid", 5) == '') |
| |
| # Enable the fallback defaultVhost and show the same user can now connect |
| self.policy.set_default_vhost('photoserver') |
| settingsname = self.policy.lookup_user('zeke', '192.168.100.5', 'galleria', "connid", 5) |
| self.assertTrue(settingsname == 'test') |
| |
| # Show that the fallback settings are returned |
| upolicy = {} |
| self.assertTrue( |
| self.policy.lookup_settings('phony*app*name', settingsname, upolicy) |
| ) |
| self.assertTrue(upolicy['maxFrameSize'] == 444444) |
| self.assertTrue(upolicy['maxMessageSize'] == 444444) |
| self.assertTrue(upolicy['maxSessionWindow'] == 444444) |
| self.assertTrue(upolicy['maxSessions'] == 4) |
| self.assertTrue(upolicy['maxSenders'] == 44) |
| self.assertTrue(upolicy['maxReceivers'] == 44) |
| self.assertTrue(upolicy['allowAnonymousSender']) |
| self.assertTrue(upolicy['allowDynamicSource']) |
| self.assertTrue(upolicy['targets'] == 'a,private,') |
| self.assertTrue(upolicy['sources'] == 'a,private,') |
| |
| # Disable fallback and show failure again |
| self.policy.set_default_vhost('') |
| self.assertTrue( |
| self.policy.lookup_user('zeke', '192.168.100.5', 'galleria', "connid", 5) == '') |
| |
| |
| class PolicyAppConnectionMgrTests(TestCase): |
| |
| def test_policy_app_conn_mgr_fail_by_total(self): |
| stats = PolicyAppConnectionMgr(1, 2, 2) |
| diags = [] |
| self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertTrue(len(diags) == 1) |
| self.assertIn('application connection limit', diags[0]) |
| |
| def test_policy_app_conn_mgr_fail_by_user(self): |
| stats = PolicyAppConnectionMgr(3, 1, 2) |
| diags = [] |
| self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertTrue(len(diags) == 1) |
| self.assertIn('per user', diags[0]) |
| diags = [] |
| self.assertTrue(stats.can_connect('10.10.10.10:10002', 'chuck', '10.10.10.10', diags, 2, None)) |
| self.assertFalse(stats.can_connect('10.10.10.10:10003', 'chuck', '10.10.10.10', diags, 2, None)) |
| |
| def test_policy_app_conn_mgr_fail_by_hosts(self): |
| stats = PolicyAppConnectionMgr(3, 2, 1) |
| diags = [] |
| self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertTrue(len(diags) == 1) |
| self.assertIn('per host', diags[0]) |
| diags = [] |
| self.assertTrue(stats.can_connect('10.10.10.10:10002', 'chuck', '10.10.10.10', diags, None, 2)) |
| self.assertFalse(stats.can_connect('10.10.10.10:10003', 'chuck', '10.10.10.10', diags, None, 2)) |
| |
| def test_policy_app_conn_mgr_fail_by_user_hosts(self): |
| stats = PolicyAppConnectionMgr(3, 1, 1) |
| diags = [] |
| self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertTrue(len(diags) == 2) |
| success = 'per user' in diags[0] or 'per user' in diags[1] |
| self.assertTrue(success) |
| diags = [] |
| self.assertTrue(stats.can_connect('10.10.10.10:10002', 'chuck', '10.10.10.10', diags, 2, 2)) |
| self.assertFalse(stats.can_connect('10.10.10.10:10003', 'chuck', '10.10.10.10', diags, 2, 2)) |
| |
| def test_policy_app_conn_mgr_update(self): |
| stats = PolicyAppConnectionMgr(3, 1, 2) |
| diags = [] |
| self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertTrue(len(diags) == 1) |
| self.assertIn('per user', diags[0]) |
| diags = [] |
| stats.update(3, 2, 2) |
| self.assertTrue(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None)) |
| |
| def test_policy_app_conn_mgr_disconnect(self): |
| stats = PolicyAppConnectionMgr(3, 1, 2) |
| diags = [] |
| self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None)) |
| self.assertTrue(len(diags) == 1) |
| self.assertIn('per user', diags[0]) |
| diags = [] |
| stats.disconnect("10.10.10.10:10000", 'chuck', '10.10.10.10') |
| self.assertTrue(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None)) |
| |
| def test_policy_app_conn_mgr_create_bad_settings(self): |
| denied = False |
| try: |
| stats = PolicyAppConnectionMgr(-3, 1, 2) |
| except PolicyError: |
| denied = True |
| self.assertTrue(denied, "Failed to detect negative setting value.") |
| |
| def test_policy_app_conn_mgr_update_bad_settings(self): |
| denied = False |
| try: |
| stats = PolicyAppConnectionMgr(0, 0, 0) |
| except PolicyError: |
| denied = True |
| self.assertFalse(denied, "Should allow all zeros.") |
| try: |
| stats.update(0, -1, 0) |
| except PolicyError: |
| denied = True |
| self.assertTrue(denied, "Failed to detect negative setting value.") |
| |
| def test_policy_app_conn_mgr_larger_counts(self): |
| stats = PolicyAppConnectionMgr(10000, 10000, 10000) |
| diags = [] |
| for i in range(0, 10000): |
| self.assertTrue(stats.can_connect('1.1.1.1:' + str(i), 'chuck', '1.1.1.1', diags, None, None)) |
| self.assertTrue(len(diags) == 0) |
| self.assertFalse(stats.can_connect('1.1.1.1:10000', 'chuck', '1.1.1.1', diags, None, None)) |
| self.assertTrue(len(diags) == 3) |
| self.assertTrue(stats.connections_active == 10000) |
| self.assertTrue(stats.connections_approved == 10000) |
| self.assertTrue(stats.connections_denied == 1) |
| |
| |
| class PolicyAliases(TestCase): |
| |
| # |
| def test_AliasesRenameOwnVhost(self): |
| config_str = """ |
| [{ |
| "hostname": "$default", |
| "allowUnknownUser": true, |
| "aliases": "$default", |
| "groups": { |
| "$default": { |
| "remoteHosts": "*", |
| "allowDynamicSource": true, |
| "allowAnonymousSender": true, |
| "sources": "$management, examples, q1", |
| "targets": "$management, examples, q1", |
| "maxSessions": 1 |
| } |
| } |
| }] |
| """ |
| manager = MockPolicyManager() |
| policy = PolicyLocal(manager) |
| ruleset = json.loads(config_str) |
| denied = False |
| try: |
| policy.create_ruleset(ruleset[0]) |
| except PolicyError: |
| denied = True |
| self.assertTrue(denied, "Ruleset duplicates vhost and alias but condition not detected.") |
| |
| # |
| def test_SameAliasOnTwoVhosts(self): |
| config_str = """ |
| [{ |
| "hostname": "$default", |
| "aliases": "a,b,c,d,e", |
| "groups": { |
| "$default": { |
| "maxSessions": 1 |
| } |
| } |
| }, |
| { |
| "hostname": "doshormigas", |
| "aliases": "i,h,g,f,e", |
| "groups": { |
| "$default": { |
| "maxSessions": 1 |
| } |
| } |
| }] |
| """ |
| manager = MockPolicyManager() |
| policy = PolicyLocal(manager) |
| ruleset = json.loads(config_str) |
| denied = False |
| try: |
| policy.create_ruleset(ruleset[0]) |
| policy.create_ruleset(ruleset[1]) |
| except PolicyError as e: |
| denied = True |
| self.assertTrue(denied, "Rulesets duplicate same alias in two vhosts but condition not detected.") |
| |
| # |
| def test_AliasConflictsWithVhost(self): |
| config_str = """ |
| [{ |
| "hostname": "$default", |
| "groups": { |
| "$default": { |
| "maxSessions": 1 |
| } |
| } |
| }, |
| { |
| "hostname": "conflict-with-vhost", |
| "aliases": "$default", |
| "groups": { |
| "$default": { |
| "maxSessions": 1 |
| } |
| } |
| }] |
| """ |
| manager = MockPolicyManager() |
| policy = PolicyLocal(manager) |
| ruleset = json.loads(config_str) |
| denied = False |
| try: |
| policy.create_ruleset(ruleset[0]) |
| policy.create_ruleset(ruleset[1]) |
| except PolicyError as e: |
| denied = True |
| self.assertTrue(denied, "Ruleset alias names other vhost but condition not detected.") |
| |
| # |
| def test_AliasOperationalLookup(self): |
| manager = MockPolicyManager() |
| policy = PolicyLocal(manager) |
| policy.test_load_config() |
| |
| # For this test the test config defines vhost 'photoserver'. |
| # This test accesses that vhost using the alias name 'antialias'. |
| settingsname = policy.lookup_user('zeke', '192.168.100.5', 'antialias', "connid", 5) |
| self.assertTrue(settingsname == 'test') |
| |
| upolicy = {} |
| self.assertTrue( |
| policy.lookup_settings('antialias', settingsname, upolicy) |
| ) |
| self.assertTrue(upolicy['maxFrameSize'] == 444444) |
| self.assertTrue(upolicy['sources'] == 'a,private,') |
| |
| |
| if __name__ == '__main__': |
| unittest.main(main_module()) |