blob: 7da5ca7aa658fa34d6d13dc93ac4498a7e9e532b [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest, os, time
from subprocess import PIPE, Popen
from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT
from qpid_dispatch.management.client import Node
class RouterTestPlainSaslCommon(TestCase):
@classmethod
def router(cls, name, connection):
config = [
('fixedAddress', {'prefix': '/closest/', 'fanout': 'single', 'bias': 'closest'}),
('fixedAddress', {'prefix': '/spread/', 'fanout': 'single', 'bias': 'spread'}),
('fixedAddress', {'prefix': '/multicast/', 'fanout': 'multiple'}),
('fixedAddress', {'prefix': '/', 'fanout': 'multiple'}),
] + connection
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
@classmethod
def createSaslFiles(cls):
# Create a sasl database.
p = Popen(['saslpasswd2', '-c', '-p', '-f', 'qdrouterd.sasldb', '-u', 'domain.com', 'test'],
stdin=PIPE, stdout=PIPE, stderr=PIPE)
result = p.communicate('password')
assert p.returncode == 0, \
"saslpasswd2 exit status %s, output:\n%s" % (p.returncode, result)
# Create a SASL configuration file.
with open('tests-mech-PLAIN.conf', 'w') as sasl_conf:
sasl_conf.write("""
pwcheck_method: auxprop
auxprop_plugin: sasldb
sasldb_path: qdrouterd.sasldb
mech_list: ANONYMOUS DIGEST-MD5 EXTERNAL PLAIN
# The following line stops spurious 'sql_select option missing' errors when cyrus-sql-sasl plugin is installed
sql_select: dummy select
""")
class RouterTestPlainSasl(RouterTestPlainSaslCommon):
@classmethod
def setUpClass(cls):
"""
Tests the sasl_username, sasl_password property of the dispatch router.
Creates two routers (QDR.X and QDR.Y) and sets up PLAIN authentication on QDR.X.
QDR.Y connects to QDR.X by providing a sasl_username and a sasl_password.
"""
super(RouterTestPlainSasl, cls).setUpClass()
super(RouterTestPlainSasl, cls).createSaslFiles()
cls.routers = []
x_listener_port = cls.tester.get_port()
y_listener_port = cls.tester.get_port()
super(RouterTestPlainSasl, cls).router('X', [
('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': x_listener_port,
'saslMechanisms':'PLAIN', 'authenticatePeer': 'yes'}),
# This unauthenticated listener is for qdstat to connect to it.
('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(),
'authenticatePeer': 'no'}),
('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(),
'saslMechanisms':'PLAIN', 'authenticatePeer': 'yes'}),
('router', {'workerThreads': 1,
'id': 'QDR.X',
'mode': 'interior',
'saslConfigName': 'tests-mech-PLAIN',
'saslConfigPath': os.getcwd()}),
])
super(RouterTestPlainSasl, cls).router('Y', [
('connector', {'host': '0.0.0.0', 'role': 'inter-router', 'port': x_listener_port,
# Provide a sasl user name and password to connect to QDR.X
'saslMechanisms': 'PLAIN',
'saslUsername': 'test@domain.com',
'saslPassword': 'password'}),
('router', {'workerThreads': 1,
'mode': 'interior',
'id': 'QDR.Y'}),
('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': y_listener_port}),
])
cls.routers[1].wait_router_connected('QDR.X')
def test_inter_router_plain_exists(self):
"""The setUpClass sets up two routers with SASL PLAIN enabled.
This test makes executes a qdstat -c via an unauthenticated listener to
QDR.X and makes sure that the output has an "inter-router" connection to
QDR.Y whose authentication is PLAIN. This ensures that QDR.Y did not
somehow use SASL ANONYMOUS to connect to QDR.X
"""
p = self.popen(
['qdstat', '-b', str(self.routers[0].addresses[1]), '-c'],
name='qdstat-'+self.id(), stdout=PIPE, expect=None)
out = p.communicate()[0]
assert p.returncode == 0, \
"qdstat exit status %s, output:\n%s" % (p.returncode, out)
self.assertIn("inter-router", out)
self.assertIn("test@domain.com(PLAIN)", out)
def test_qdstat_connect_sasl(self):
"""
Make qdstat use sasl plain authentication.
"""
p = self.popen(
['qdstat', '-b', str(self.routers[0].addresses[2]), '-c', '--sasl-mechanisms=PLAIN',
'--sasl-username=test@domain.com', '--sasl-password=password'],
name='qdstat-'+self.id(), stdout=PIPE, expect=None)
out = p.communicate()[0]
assert p.returncode == 0, \
"qdstat exit status %s, output:\n%s" % (p.returncode, out)
split_list = out.split()
# There will be 2 connections that have authenticated using SASL PLAIN. One inter-router connection
# and the other connection that this qdstat client is making
self.assertEqual(2, split_list.count("test@domain.com(PLAIN)"))
self.assertEqual(1, split_list.count("inter-router"))
self.assertEqual(1, split_list.count("normal"))
def test_qdstat_connect_sasl_password_file(self):
"""
Make qdstat use sasl plain authentication with client password specified in a file.
"""
password_file = os.getcwd() + '/sasl-client-password-file.txt'
# Create a SASL configuration file.
with open(password_file, 'w') as sasl_client_password_file:
sasl_client_password_file.write("password")
sasl_client_password_file.close()
p = self.popen(
['qdstat', '-b', str(self.routers[0].addresses[2]), '-c', '--sasl-mechanisms=PLAIN',
'--sasl-username=test@domain.com', '--sasl-password-file=' + password_file],
name='qdstat-'+self.id(), stdout=PIPE, expect=None)
out = p.communicate()[0]
assert p.returncode == 0, \
"qdstat exit status %s, output:\n%s" % (p.returncode, out)
split_list = out.split()
# There will be 2 connections that have authenticated using SASL PLAIN. One inter-router connection
# and the other connection that this qdstat client is making
self.assertEqual(2, split_list.count("test@domain.com(PLAIN)"))
self.assertEqual(1, split_list.count("inter-router"))
self.assertEqual(1, split_list.count("normal"))
class RouterTestPlainSaslOverSsl(RouterTestPlainSaslCommon):
@staticmethod
def ssl_file(name):
return os.path.join(DIR, 'ssl_certs', name)
@classmethod
def setUpClass(cls):
"""
Tests the sasl_username, sasl_password property of the dispatch router.
Creates two routers (QDR.X and QDR.Y) and sets up PLAIN authentication on QDR.X.
QDR.Y connects to QDR.X by providing a sasl_username and a sasl_password.
This PLAIN authentication is done over an TLS/SSLv3 connection.
"""
super(RouterTestPlainSaslOverSsl, cls).setUpClass()
super(RouterTestPlainSaslOverSsl, cls).createSaslFiles()
cls.routers = []
x_listener_port = cls.tester.get_port()
y_listener_port = cls.tester.get_port()
super(RouterTestPlainSaslOverSsl, cls).router('X', [
('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': x_listener_port,
'sslProfile':'server-ssl-profile',
'saslMechanisms':'PLAIN', 'authenticatePeer': 'yes'}),
('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(),
'authenticatePeer': 'no'}),
('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(),
'sslProfile':'server-ssl-profile',
'saslMechanisms':'PLAIN', 'authenticatePeer': 'yes'}),
('sslProfile', {'name': 'server-ssl-profile',
'certDb': cls.ssl_file('ca-certificate.pem'),
'certFile': cls.ssl_file('server-certificate.pem'),
'keyFile': cls.ssl_file('server-private-key.pem'),
'password': 'server-password'}),
('router', {'workerThreads': 1,
'id': 'QDR.X',
'mode': 'interior',
'saslConfigName': 'tests-mech-PLAIN',
'saslConfigPath': os.getcwd()}),
])
super(RouterTestPlainSaslOverSsl, cls).router('Y', [
# This router will act like a client. First an SSL connection will be established and then
# we will have SASL plain authentication over SSL.
('connector', {'host': '0.0.0.0', 'role': 'inter-router', 'port': x_listener_port,
'sslProfile': 'client-ssl-profile',
'verifyHostName': 'no',
# Provide a sasl user name and password to connect to QDR.X
'saslMechanisms': 'PLAIN',
'saslUsername': 'test@domain.com',
'saslPassword': 'password'}),
('router', {'workerThreads': 1,
'mode': 'interior',
'id': 'QDR.Y'}),
('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': y_listener_port}),
('sslProfile', {'name': 'client-ssl-profile',
'certDb': cls.ssl_file('ca-certificate.pem'),
'certFile': cls.ssl_file('client-certificate.pem'),
'keyFile': cls.ssl_file('client-private-key.pem'),
'password': 'client-password'}),
])
cls.routers[1].wait_router_connected('QDR.X')
def test_aaa_qdstat_connect_sasl_over_ssl(self):
"""
Make qdstat use sasl plain authentication over ssl.
"""
p = self.popen(
['qdstat', '-b', str(self.routers[0].addresses[2]), '-c',
# The following are SASL args
'--sasl-mechanisms=PLAIN',
'--sasl-username=test@domain.com',
'--sasl-password=password',
# The following are SSL args
'--ssl-disable-peer-name-verify',
'--ssl-trustfile=' + self.ssl_file('ca-certificate.pem'),
'--ssl-certificate=' + self.ssl_file('client-certificate.pem'),
'--ssl-key=' + self.ssl_file('client-private-key.pem'),
'--ssl-password=client-password'],
name='qdstat-'+self.id(), stdout=PIPE, expect=None)
out = p.communicate()[0]
assert p.returncode == 0, \
"qdstat exit status %s, output:\n%s" % (p.returncode, out)
split_list = out.split()
# There will be 2 connections that have authenticated using SASL PLAIN. One inter-router connection
# and the other connection that this qdstat client is making
self.assertEqual(2, split_list.count("test@domain.com(PLAIN)"))
self.assertEqual(1, split_list.count("inter-router"))
self.assertEqual(1, split_list.count("normal"))
def test_inter_router_plain_over_ssl_exists(self):
"""The setUpClass sets up two routers with SASL PLAIN enabled over TLS/SSLv3.
This test makes executes a query for type='org.apache.qpid.dispatch.connection' over
an unauthenticated listener to
QDR.X and makes sure that the output has an "inter-router" connection to
QDR.Y whose authentication is PLAIN. This ensures that QDR.Y did not
somehow use SASL ANONYMOUS to connect to QDR.X
Also makes sure that TLSv1/SSLv3 was used as sslProto
"""
local_node = Node.connect(self.routers[0].addresses[1], timeout=TIMEOUT)
# sslProto should be TLSv1/SSLv3
self.assertEqual(u'TLSv1/SSLv3', local_node.query(type='org.apache.qpid.dispatch.connection').results[0][4])
# role should be inter-router
self.assertEqual(u'inter-router', local_node.query(type='org.apache.qpid.dispatch.connection').results[0][9])
# sasl must be plain
self.assertEqual(u'PLAIN', local_node.query(type='org.apache.qpid.dispatch.connection').results[0][12])
# user must be test@domain.com
self.assertEqual(u'test@domain.com', local_node.query(type='org.apache.qpid.dispatch.connection').results[0][16])
class RouterTestVerifyHostNameYes(RouterTestPlainSaslCommon):
@staticmethod
def ssl_file(name):
return os.path.join(DIR, 'ssl_certs', name)
@classmethod
def setUpClass(cls):
"""
Tests the verifyHostName property of the connector. The hostname on the server certificate we use is
A1.Good.Server.domain.com and the host is 0.0.0.0 on the client router initiating the SSL connection.
Since the host names do not match and the verifyHostName is set to true, the client router
will NOT be able make a successful SSL connection the server router.
"""
super(RouterTestVerifyHostNameYes, cls).setUpClass()
super(RouterTestVerifyHostNameYes, cls).createSaslFiles()
cls.routers = []
x_listener_port = cls.tester.get_port()
y_listener_port = cls.tester.get_port()
super(RouterTestVerifyHostNameYes, cls).router('X', [
('listener', {'addr': '0.0.0.0', 'role': 'inter-router', 'port': x_listener_port,
'sslProfile':'server-ssl-profile',
'saslMechanisms':'PLAIN', 'authenticatePeer': 'yes'}),
# This unauthenticated listener is for qdstat to connect to it.
('listener', {'addr': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(),
'authenticatePeer': 'no'}),
('sslProfile', {'name': 'server-ssl-profile',
'certDb': cls.ssl_file('ca-certificate.pem'),
'certFile': cls.ssl_file('server-certificate.pem'),
'keyFile': cls.ssl_file('server-private-key.pem'),
'password': 'server-password'}),
('router', {'workerThreads': 1,
'routerId': 'QDR.X',
'mode': 'interior',
'saslConfigName': 'tests-mech-PLAIN',
'saslConfigPath': os.getcwd()}),
])
super(RouterTestVerifyHostNameYes, cls).router('Y', [
('connector', {'addr': '127.0.0.1', 'role': 'inter-router', 'port': x_listener_port,
'sslProfile': 'client-ssl-profile',
'verifyHostName': 'yes',
'saslMechanisms': 'PLAIN',
'saslUsername': 'test@domain.com',
'saslPassword': 'password'}),
('router', {'workerThreads': 1,
'mode': 'interior',
'routerId': 'QDR.Y'}),
('listener', {'addr': '0.0.0.0', 'role': 'normal', 'port': y_listener_port}),
('sslProfile', {'name': 'client-ssl-profile',
'certDb': cls.ssl_file('ca-certificate.pem'),
'certFile': cls.ssl_file('client-certificate.pem'),
'keyFile': cls.ssl_file('client-private-key.pem'),
'password': 'client-password'}),
])
cls.routers[0].wait_ports()
cls.routers[1].wait_ports()
try:
# This will time out because there is no inter-router connection
cls.routers[1].wait_connectors(timeout=3)
except:
pass
def test_no_inter_router_connection(self):
"""
Tests to make sure that there are no 'inter-router' connections.
The connection to the other router will not happen because the connection failed
due to setting 'verifyHostName': 'yes'
"""
local_node = Node.connect(self.routers[1].addresses[0], timeout=TIMEOUT)
# There should be only two connections.
# There will be no inter-router connection
self.assertEqual(2, len(local_node.query(type='org.apache.qpid.dispatch.connection').results))
self.assertEqual('in', local_node.query(type='org.apache.qpid.dispatch.connection').results[0][15])
self.assertEqual('normal', local_node.query(type='org.apache.qpid.dispatch.connection').results[0][9])
self.assertEqual('anonymous', local_node.query(type='org.apache.qpid.dispatch.connection').results[0][16])
self.assertEqual('normal', local_node.query(type='org.apache.qpid.dispatch.connection').results[1][9])
self.assertEqual('anonymous', local_node.query(type='org.apache.qpid.dispatch.connection').results[1][16])
class RouterTestVerifyHostNameNo(RouterTestPlainSaslCommon):
@staticmethod
def ssl_file(name):
return os.path.join(DIR, 'ssl_certs', name)
@classmethod
def setUpClass(cls):
"""
Tests the verifyHostName property of the connector. The hostname on the server certificate we use is
A1.Good.Server.domain.com and the host is 0.0.0.0 on the client router initiating the SSL connection.
Since the host names do not match but verifyHostName is set to false, the client router
will be successfully able to make an SSL connection the server router.
"""
super(RouterTestVerifyHostNameNo, cls).setUpClass()
super(RouterTestVerifyHostNameNo, cls).createSaslFiles()
cls.routers = []
x_listener_port = cls.tester.get_port()
y_listener_port = cls.tester.get_port()
super(RouterTestVerifyHostNameNo, cls).router('X', [
('listener', {'addr': '0.0.0.0', 'role': 'inter-router', 'port': x_listener_port,
'sslProfile':'server-ssl-profile',
'saslMechanisms':'PLAIN', 'authenticatePeer': 'yes'}),
# This unauthenticated listener is for qdstat to connect to it.
('listener', {'addr': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(),
'authenticatePeer': 'no'}),
('sslProfile', {'name': 'server-ssl-profile',
'certDb': cls.ssl_file('ca-certificate.pem'),
'certFile': cls.ssl_file('server-certificate.pem'),
'keyFile': cls.ssl_file('server-private-key.pem'),
'password': 'server-password'}),
('router', {'workerThreads': 1,
'routerId': 'QDR.X',
'mode': 'interior',
'saslConfigName': 'tests-mech-PLAIN',
'saslConfigPath': os.getcwd()}),
])
super(RouterTestVerifyHostNameNo, cls).router('Y', [
# This router will act like a client. First an SSL connection will be established and then
# we will have SASL plain authentication over SSL.
('connector', {'addr': '127.0.0.1', 'role': 'inter-router', 'port': x_listener_port,
'sslProfile': 'client-ssl-profile',
# Provide a sasl user name and password to connect to QDR.X
'saslMechanisms': 'PLAIN',
'verifyHostName': 'no',
'saslUsername': 'test@domain.com', 'saslPassword': 'password'}),
('router', {'workerThreads': 1,
'mode': 'interior',
'routerId': 'QDR.Y'}),
('listener', {'addr': '0.0.0.0', 'role': 'normal', 'port': y_listener_port}),
('sslProfile', {'name': 'client-ssl-profile',
'certDb': cls.ssl_file('ca-certificate.pem'),
'certFile': cls.ssl_file('client-certificate.pem'),
'keyFile': cls.ssl_file('client-private-key.pem'),
'password': 'client-password'}),
])
cls.routers[0].wait_ports()
cls.routers[1].wait_ports()
cls.routers[1].wait_router_connected('QDR.X')
def test_inter_router_plain_over_ssl_exists(self):
"""
Tests to make sure that an inter-router connection exists between the routers since verifyHostName is 'no'.
"""
local_node = Node.connect(self.routers[1].addresses[0], timeout=TIMEOUT)
results = local_node.query(type='org.apache.qpid.dispatch.connection').results
self.assertEqual(4, len(results))
search = "QDR.X"
found = False
for N in range(0,3):
if results[N][0] == search:
found = True
break
self.assertTrue(found, "Connection to %s not found" % search)
# sslProto should be TLSv1/SSLv3
self.assertEqual(u'TLSv1/SSLv3', results[N][4])
# role should be inter-router
self.assertEqual(u'inter-router', results[N][9])
# sasl must be plain
self.assertEqual(u'PLAIN', results[N][12])
# user must be test@domain.com
self.assertEqual(u'test@domain.com', results[N][16])
if __name__ == '__main__':
unittest.main(main_module())