| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from qpid.tests.messaging.implementation import * |
| from qpid.tests.messaging import VersionTest |
| from mgmt_1 import Mgmt |
| |
| class Policy(object): |
| def __init__(self): |
| self.lines = [] |
| |
| |
| def str(self): |
| return '\n'.join(lines) |
| |
| class AclCtrl(object): |
| def __init__(self, broker, path): |
| self.policy = path |
| self.original = self.read() |
| conn = Connection.establish(broker, protocol='amqp0-10', username='admin', password='admin', sasl_mechanism='PLAIN') |
| self.agent = Mgmt(conn) |
| self.agent.create('queue', 'acl_test_queue') |
| self.lines = [] |
| |
| def deny(self, user=None, *args): |
| self._add_rule('deny', user, *args) |
| return self |
| |
| def allow(self, user=None, *args): |
| self._add_rule('allow', user, *args) |
| return self |
| |
| def apply(self, allow_admin=True): |
| if allow_admin: |
| # admin users needs permission to send a qmf message to |
| # reload policy |
| self.lines.insert(0, 'acl allow admin@QPID all all') |
| self.specify("\n".join(self.lines)) |
| return self |
| |
| def dump(self): |
| print "\n".join(self.lines) |
| return self |
| |
| def clear(self): |
| self.lines = [] |
| return self |
| |
| def _add_rule(self, deny_or_allow, user=None, *args): |
| elements = ['acl', deny_or_allow] |
| if user: |
| elements.append("%s@QPID" % user) |
| else: |
| elements.append('all') |
| if len(args) > 0: |
| for a in args: |
| elements.append(a) |
| else: |
| elements.append('all') |
| self.lines.append(' '.join(elements)) |
| |
| def read(self): |
| f = open(self.policy,'r') |
| content = f.read() |
| f.close() |
| return content |
| |
| def specify(self, acl): |
| f = open(self.policy,'w') |
| f.write(acl) |
| f.close() |
| self.agent.reload_acl_file() |
| |
| def restore(self): |
| self.agent.delete('queue', 'acl_test_queue') |
| self.specify(self.original) |
| self.agent.close() |
| |
| |
| class Acl_AMQP1_Tests (VersionTest): |
| """ |
| Tests for acl when accessing qpidd via AMQP 1.0 |
| """ |
| def auth_session(self, user): |
| conn = Connection.establish(self.config.broker, protocol='amqp1.0', username=user, password=user, sasl_mechanism='PLAIN', container_id=user) |
| return conn.session() |
| |
| def setUp(self): |
| VersionTest.setup(self) |
| self.acl = AclCtrl(self.config.broker, self.config.defines.get("policy_file")) |
| self.alice = self.auth_session('alice') |
| self.bob = self.auth_session('bob') |
| |
| def tearDown(self): |
| self.bob.connection.close() |
| self.alice.connection.close() |
| self.acl.restore() |
| VersionTest.teardown(self) |
| |
| def test_deny_sender_to_exchange(self): |
| self.acl.allow('alice').deny().apply() |
| try: |
| self.ssn.sender("amq.topic") |
| assert False, "anonymous should not be allowed to create sender to amq.topic" |
| except UnauthorizedAccess: pass |
| try: |
| self.bob.sender("amq.topic") |
| assert False, "bob should not be allowed to create sender to amq.topic" |
| except UnauthorizedAccess: pass |
| self.alice.sender("amq.topic") |
| |
| def test_deny_sender_to_queue(self): |
| self.acl.allow('alice').deny().apply() |
| try: |
| self.ssn.sender("acl_test_queue") |
| assert False, "anonymous shound not be allowed to create sender to acl_test_queue" |
| except UnauthorizedAccess: pass |
| try: |
| self.bob.sender("acl_test_queue") |
| assert False, "bob should not be allowed to create sender to acl_test_queue" |
| except UnauthorizedAccess: pass |
| self.alice.sender("acl_test_queue") |
| |
| def test_deny_sender_to_unknown(self): |
| self.acl.allow('alice').deny().apply() |
| try: |
| self.ssn.sender("unknown") |
| assert False, "anonymous should not be allowed to create sender to non-existent node" |
| except UnauthorizedAccess: pass |
| try: |
| self.bob.sender("unknown") |
| assert False, "bob should not be allowed to create sender to unknown" |
| except UnauthorizedAccess: pass |
| try: |
| self.alice.sender("unknown") |
| except NotFound: pass |
| |
| def test_deny_receiver_to_exchange(self): |
| self.acl.allow('alice').deny().apply() |
| try: |
| self.ssn.receiver("amq.topic") |
| assert False, "anonymous should not be allowed to create receiver from amq.topic" |
| except UnauthorizedAccess: pass |
| try: |
| self.bob.receiver("amq.topic") |
| assert False, "bob should not be allowed to create receiver to amq.topic" |
| except UnauthorizedAccess: pass |
| self.alice.receiver("amq.topic") |
| |
| def test_deny_receiver_to_queue(self): |
| self.acl.allow('alice').deny().apply() |
| try: |
| self.ssn.receiver("acl_test_queue") |
| assert False, "anonymous should not be allowed to create receiver from acl_test_queue" |
| except UnauthorizedAccess: pass |
| try: |
| self.bob.receiver("acl_test_queue") |
| assert False, "bob should not be allowed to create receiver to acl_test_queue" |
| except UnauthorizedAccess: pass |
| self.alice.receiver("acl_test_queue") |
| |
| def test_deny_receiver_to_unknown(self): |
| self.acl.allow('alice').deny().apply() |
| try: |
| self.ssn.receiver("I_dont_exist") |
| assert False, "anonymous should not be allowed to create receiver from non-existent node" |
| except UnauthorizedAccess: pass |
| try: |
| self.bob.receiver("unknown") |
| assert False, "bob should not be allowed to create receiver to unknown" |
| except UnauthorizedAccess: pass |
| try: |
| self.alice.receiver("unknown") |
| except NotFound: pass |
| |
| def test_create_for_receiver_from_exchange(self): |
| self.acl.allow('bob', 'access', 'exchange', 'name=amq.topic') |
| self.acl.allow('bob', 'access', 'queue', 'name=amq.topic') |
| self.acl.allow('alice').deny().apply() |
| try: |
| self.ssn.receiver("amq.topic") |
| assert False, "anonymous should not be allowed to create receiver from amq.topic" |
| except UnauthorizedAccess: pass |
| try: |
| self.bob.receiver("amq.topic") |
| assert False, "bob should not be allowed to create receiver from amq.topic without create permission" |
| except UnauthorizedAccess: pass |
| self.alice.receiver("amq.topic") |
| |
| def test_bind_for_receiver_from_exchange(self): |
| self.acl.allow('bob', 'access', 'exchange', 'name=amq.topic') |
| self.acl.allow('bob', 'access', 'queue', 'name=amq.topic') |
| self.acl.allow('bob', 'create', 'queue', 'name=bob*') |
| self.acl.allow('alice').deny().apply() |
| try: |
| self.ssn.receiver("amq.topic") |
| assert False, "anonymous should not be allowed to create receiver from amq.topic" |
| except UnauthorizedAccess: pass |
| try: |
| self.bob.receiver("amq.topic") |
| assert False, "bob should not be allowed to create receiver from amq.topic without bind permission" |
| except UnauthorizedAccess: pass |
| self.alice.receiver("amq.topic") |
| |
| def test_consume_for_receiver_from_exchange(self): |
| self.acl.allow('bob', 'access', 'exchange', 'name=amq.topic') |
| self.acl.allow('bob', 'access', 'queue', 'name=amq.topic') |
| self.acl.allow('bob', 'create', 'queue', 'name=bob*') |
| self.acl.allow('bob', 'bind', 'exchange', 'name=amq.topic') |
| self.acl.allow('alice').deny().apply() |
| try: |
| self.ssn.receiver("amq.topic") |
| assert False, "anonymous should not be allowed to create receiver from amq.topic" |
| except UnauthorizedAccess: pass |
| try: |
| self.bob.receiver("amq.topic") |
| assert False, "bob should not be allowed to create receiver from amq.topic without consume permission" |
| except UnauthorizedAccess: pass |
| self.alice.receiver("amq.topic") |
| |
| def test_required_permissions_for_receiver_from_exchange(self): |
| self.acl.allow('bob', 'access', 'exchange', 'name=amq.topic') |
| self.acl.allow('bob', 'access', 'queue', 'name=amq.topic') |
| self.acl.allow('bob', 'create', 'queue', 'name=bob*') |
| self.acl.allow('bob', 'bind', 'exchange', 'name=amq.topic') |
| self.acl.allow('bob', 'consume', 'queue', 'name=bob*') |
| self.acl.allow('alice').deny().apply() |
| try: |
| self.ssn.receiver("amq.topic") |
| assert False, "anonymous should not be allowed to create receiver from amq.topic" |
| except UnauthorizedAccess: pass |
| self.bob.receiver("amq.topic") |
| try: |
| self.bob.receiver("amq.direct") |
| assert False, "bob should not be allowed to create receiver from amq.direct" |
| except UnauthorizedAccess: pass |
| |
| def test_publish_to_exchange(self): |
| self.acl.allow('bob', 'access', 'exchange', 'name=amq.topic') |
| self.acl.allow('bob', 'access', 'queue', 'name=amq.topic') |
| self.acl.allow('bob', 'publish', 'exchange', 'name=amq.topic', 'routingkey=abc') |
| self.acl.allow('alice').deny().apply() |
| |
| sender = self.bob.sender("amq.topic") |
| sender.send(Message("a message", subject="abc"), sync=True) |
| try: |
| sender.send(Message("another", subject="def"), sync=True) |
| assert False, "bob should not be allowed to send message to amq.topic with subject 'def'" |
| except UnauthorizedAccess: pass |
| sender = self.alice.sender("amq.topic") |
| sender.send(Message("alice's message", subject="abc"), sync=True) |
| sender.send(Message("another from alice", subject="def"), sync=True) |
| |
| def test_publish_to_anonymous_relay(self): |
| self.acl.allow('bob', 'access', 'exchange', 'name=ANONYMOUS-RELAY') |
| self.acl.allow('bob', 'access', 'queue', 'name=acl_test_queue') |
| self.acl.allow('bob', 'access', 'exchange', 'name=acl_test_queue') |
| self.acl.allow('bob', 'publish', 'exchange', 'routingkey=acl_test_queue') |
| self.acl.allow('bob', 'access', 'exchange', 'name=amq.topic') |
| self.acl.allow('bob', 'access', 'queue', 'name=amq.topic') |
| self.acl.allow('bob', 'publish', 'exchange', 'name=amq.topic', 'routingkey=abc') |
| self.acl.allow('bob', 'access', 'exchange', 'name=amq.direct') |
| self.acl.allow('bob', 'access', 'queue', 'name=amq.direct') |
| self.acl.allow('alice').deny().apply() |
| |
| sender = self.bob.sender("<null>") |
| sender.send(Message("a message", properties={'x-amqp-to':'acl_test_queue'}), sync=True) |
| sender.send(Message("another", subject='abc', properties={'x-amqp-to':'amq.topic'}), sync=True) |
| try: |
| # have access permission, but publish not allowed for given key |
| sender.send(Message("a third", subject='def', properties={'x-amqp-to':'amq.topic'}), sync=True) |
| assert False, "bob should not be allowed to send message to amq.topic with key 'def'" |
| except UnauthorizedAccess: pass |
| sender = self.bob.sender("<null>") |
| try: |
| # have access permission, but no publish |
| sender.send(Message("a fourth", subject='abc', properties={'x-amqp-to':'amq.direct'}), sync=True) |
| assert False, "bob should not be allowed to send message to amq.direct" |
| except UnauthorizedAccess: pass |
| sender = self.bob.sender("<null>") |
| try: |
| # have no access permission |
| sender.send(Message("a fiftth", subject='abc', properties={'x-amqp-to':'amq.fanout'}), sync=True) |
| assert False, "bob should not be allowed to send message to amq.fanout" |
| except UnauthorizedAccess: pass |
| sender = self.bob.sender("<null>") |
| try: |
| # have no access permission |
| sender.send(Message("a sixth", properties={'x-amqp-to':'somewhereelse'}), sync=True) |
| assert False, "bob should not be allowed to send message to somewhere else" |
| except UnauthorizedAccess: pass |
| sender = self.alice.sender("<null>") |
| sender.send(Message("alice's message", properties={'x-amqp-to':'abc'}), sync=True) |
| sender.send(Message("another from alice", properties={'x-amqp-to':'def'}), sync=True) |
| |
| def test_resolution_for_sender_to_exchange(self): |
| self.acl.allow('alice', 'access', 'exchange', 'name=amq.topic') |
| self.acl.allow('alice', 'access', 'queue', 'name=amq.topic') |
| self.acl.allow('bob', 'access', 'exchange', 'name=amq.topic') |
| self.acl.deny().apply() |
| try: |
| self.ssn.sender("amq.topic") |
| assert False, "anonymous should not be allowed to create sender to amq.topic" |
| except UnauthorizedAccess: pass |
| self.bob.sender("amq.topic; {node:{type:topic}}") |
| try: |
| self.bob.sender("amq.topic") |
| assert False, "bob should not be allowed to create sender to amq.topic without specifying the node type" |
| except UnauthorizedAccess: pass |
| self.alice.sender("amq.topic; {node:{type:topic}}") |
| self.alice.sender("amq.topic") |
| try: |
| self.alice.sender("amq.direct") |
| assert False, "alice should not be allowed to create sender to amq.direct" |
| except UnauthorizedAccess: pass |
| |
| def test_resolution_for_sender_to_queue(self): |
| self.acl.allow('alice', 'access', 'exchange', 'name=acl_test_queue') |
| self.acl.allow('alice', 'access', 'queue', 'name=acl_test_queue') |
| self.acl.allow('alice', 'publish', 'exchange', 'routingkey=acl_test_queue') |
| self.acl.allow('bob', 'access', 'queue', 'name=acl_test_queue') |
| self.acl.allow('bob', 'publish', 'exchange', 'routingkey=acl_test_queue') |
| self.acl.deny().apply() |
| try: |
| self.ssn.sender("acl_test_queue") |
| assert False, "anonymous should not be allowed to create sender to acl_test_queue" |
| except UnauthorizedAccess: pass |
| self.bob.sender("acl_test_queue; {node:{type:queue}}") |
| try: |
| self.bob.sender("acl_test_queue") |
| assert False, "bob should not be allowed to create sender to acl_test_queue without specifying the node type" |
| except UnauthorizedAccess: pass |
| self.alice.sender("acl_test_queue; {node:{type:queue}}") |
| self.alice.sender("acl_test_queue") |
| |
| def test_resolution_for_receiver_from_exchange(self): |
| self.acl.allow('alice', 'access', 'exchange', 'name=amq.topic') |
| self.acl.allow('alice', 'access', 'queue', 'name=amq.topic') |
| self.acl.allow('alice', 'create', 'queue') |
| self.acl.allow('alice', 'consume', 'queue') |
| self.acl.allow('alice', 'bind', 'exchange', 'name=amq.topic') |
| self.acl.allow('bob', 'access', 'exchange', 'name=amq.topic') |
| self.acl.allow('bob', 'bind', 'exchange', 'name=amq.topic') |
| self.acl.allow('bob', 'create', 'queue', 'name=bob*', 'autodelete=true') |
| self.acl.allow('bob', 'consume', 'queue', 'name=bob*') |
| self.acl.deny().apply() |
| try: |
| self.ssn.receiver("amq.topic") |
| assert False, "anonymous should not be allowed to create receiver from amq.topic" |
| except UnauthorizedAccess: pass |
| self.bob.receiver("amq.topic; {node:{type:topic}}") |
| try: |
| self.bob.receiver("amq.topic") |
| assert False, "bob should not be allowed to create receiver from amq.topic without specifying the node type" |
| except UnauthorizedAccess: pass |
| self.alice.receiver("amq.topic; {node:{type:topic}}") |
| self.alice.receiver("amq.topic") |
| try: |
| self.alice.receiver("amq.direct") |
| assert False, "alice should not be allowed to create receiver from amq.direct" |
| except UnauthorizedAccess: pass |
| |
| def test_resolution_for_receiver_from_queue(self): |
| self.acl.allow('alice', 'access', 'exchange', 'name=acl_test_queue') |
| self.acl.allow('alice', 'access', 'queue', 'name=acl_test_queue') |
| self.acl.allow('alice', 'consume', 'queue', 'name=acl_test_queue') |
| self.acl.allow('bob', 'access', 'queue', 'name=acl_test_queue') |
| self.acl.allow('bob', 'consume', 'queue', 'name=acl_test_queue') |
| self.acl.deny().apply() |
| try: |
| self.ssn.receiver("acl_test_queue") |
| assert False, "anonymous should not be allowed to create receiver from acl_test_queue" |
| except UnauthorizedAccess: pass |
| self.bob.receiver("acl_test_queue; {node:{type:queue}}") |
| try: |
| self.bob.receiver("acl_test_queue") |
| assert False, "bob should not be allowed to create receiver from acl_test_queue without specifying the node type" |
| except UnauthorizedAccess: pass |
| self.alice.receiver("acl_test_queue; {node:{type:queue}}") |
| self.alice.receiver("acl_test_queue") |