blob: ac0d6a44143b6a71e4a4ab6f43f93e8c5455b4c9 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest, os
from subprocess import PIPE, Popen
import system_test
from system_test import TestCase, Qdrouterd, main_module
class RouterTestPlainSasl(TestCase):
@classmethod
def createSasldb(cls):
pass
@classmethod
def setUpClass(cls):
"""
Tests the sasl_username, sasl_password property of the dispatch router.
Creates two routers (QDR.X and QDR.Y) and sets up PLAIN authentication on QDR.X.
QDR.Y connects to QDR.X by providing a sasl_username and a sasl_password.
"""
super(RouterTestPlainSasl, cls).setUpClass()
# Create a sasl database.
p = Popen(['saslpasswd2', '-c', '-p', '-f', 'qdrouterd.sasldb', '-u', 'domain.com', 'test'],
stdin=PIPE, stdout=PIPE, stderr=PIPE)
result = p.communicate('password')
assert p.returncode == 0, \
"saslpasswd2 exit status %s, output:\n%s" % (p.returncode, result)
# Create a SASL configuration file.
with open('tests-mech-PLAIN.conf', 'w') as sasl_conf:
sasl_conf.write("""
pwcheck_method: auxprop
auxprop_plugin: sasldb
sasldb_path: qdrouterd.sasldb
mech_list: ANONYMOUS DIGEST-MD5 EXTERNAL PLAIN
# The following line stops spurious 'sql_select option missing' errors when cyrus-sql-sasl plugin is installed
sql_select: dummy select
""")
def router(name, connection):
config = [
('router', {'mode': 'interior', 'routerId': 'QDR.%s'%name}),
('fixedAddress', {'prefix': '/closest/', 'fanout': 'single', 'bias': 'closest'}),
('fixedAddress', {'prefix': '/spread/', 'fanout': 'single', 'bias': 'spread'}),
('fixedAddress', {'prefix': '/multicast/', 'fanout': 'multiple'}),
('fixedAddress', {'prefix': '/', 'fanout': 'multiple'}),
] + connection
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
cls.routers = []
x_listener_port = cls.tester.get_port()
y_listener_port = cls.tester.get_port()
router('X', [
('listener', {'addr': '0.0.0.0', 'role': 'inter-router', 'port': x_listener_port,
'saslMechanisms':'PLAIN DIGEST-MD5', 'authenticatePeer': 'yes'}),
# This unauthenticated listener is for qdstat to connect to it.
('listener', {'addr': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(),
'authenticatePeer': 'no'}),
('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.X',
'saslConfigName': 'tests-mech-PLAIN',
'saslConfigPath': os.getcwd()}),
])
router('Y', [
('connector', {'addr': '0.0.0.0', 'role': 'inter-router', 'port': x_listener_port,
# Provide a sasl user name and password to connect to QDR.X
'saslMechanisms': 'PLAIN DIGEST-MD5', 'saslUsername': 'test@domain.com', 'saslPassword': 'password'}),
('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.Y'}),
('listener', {'addr': '0.0.0.0', 'role': 'normal', 'port': y_listener_port}),
])
cls.routers[1].wait_router_connected('QDR.X')
def test_inter_router_plain_exists(self):
"""The setUpClass sets up two routers with SASL PLAIN enabled.
This test makes executes a qdstat -c via an unauthenticated listener to
QDR.X and makes sure that the output has an "inter-router" connection to
QDR.Y whose authentication is PLAIN. This ensures that QDR.Y did not
somehow use SASL ANONYMOUS to connect to QDR.X
"""
p = self.popen(
['qdstat', '-b', str(self.routers[0].addresses[1]), '-c'],
name='qdstat-'+self.id(), stdout=PIPE, expect=None)
out = p.communicate()[0]
assert p.returncode == 0, \
"qdstat exit status %s, output:\n%s" % (p.returncode, out)
self.assertIn("inter-router", out)
self.assertIn("test@domain.com(PLAIN)", out)
if __name__ == '__main__':
unittest.main(main_module())