| ''' |
| http://www.opensource.org/licenses/mit-license.php |
| |
| Copyright 2007-2011 David Alan Cridland |
| Copyright 2011 Lance Stout |
| Copyright 2012 Tyler L Hobbs |
| |
| Permission is hereby granted, free of charge, to any person obtaining a copy of this |
| software and associated documentation files (the "Software"), to deal in the Software |
| without restriction, including without limitation the rights to use, copy, modify, merge, |
| publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons |
| to whom the Software is furnished to do so, subject to the following conditions: |
| |
| The above copyright notice and this permission notice shall be included in all copies or |
| substantial portions of the Software. |
| |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, |
| INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR |
| PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE |
| FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR |
| OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER |
| DEALINGS IN THE SOFTWARE. |
| ''' |
| # This file was generated by referring test cases from the pure-sasl repo i.e. https://github.com/thobbs/pure-sasl/tree/master/tests/unit |
| # and by refactoring them to cover wrapper functions in sasl_compat.py along with added coverage for functions exclusive to sasl_compat.py. |
| |
| import unittest |
| import base64 |
| import hashlib |
| import hmac |
| import kerberos |
| from mock import patch |
| import six |
| import struct |
| from puresasl import SASLProtocolException, QOP |
| from puresasl.client import SASLError |
| from pyhive.sasl_compat import PureSASLClient, error_catcher |
| |
| |
| class TestPureSASLClient(unittest.TestCase): |
| """Test cases for initialization of SASL client using PureSASLClient class""" |
| |
| def setUp(self): |
| self.sasl_kwargs = {} |
| self.sasl = PureSASLClient('localhost', **self.sasl_kwargs) |
| |
| def test_start_no_mechanism(self): |
| """Test starting SASL authentication with no mechanism.""" |
| success, mechanism, response = self.sasl.start(mechanism=None) |
| self.assertFalse(success) |
| self.assertIsNone(mechanism) |
| self.assertIsNone(response) |
| self.assertEqual(self.sasl.getError(), 'None of the mechanisms listed meet all required properties') |
| |
| def test_start_wrong_mechanism(self): |
| """Test starting SASL authentication with a single unsupported mechanism.""" |
| success, mechanism, response = self.sasl.start(mechanism='WRONG') |
| self.assertFalse(success) |
| self.assertEqual(mechanism, 'WRONG') |
| self.assertIsNone(response) |
| self.assertEqual(self.sasl.getError(), 'None of the mechanisms listed meet all required properties') |
| |
| def test_start_list_of_invalid_mechanisms(self): |
| """Test starting SASL authentication with a list of unsupported mechanisms.""" |
| self.sasl.start(['invalid1', 'invalid2']) |
| self.assertEqual(self.sasl.getError(), 'None of the mechanisms listed meet all required properties') |
| |
| def test_start_list_of_valid_mechanisms(self): |
| """Test starting SASL authentication with a list of supported mechanisms.""" |
| self.sasl.start(['PLAIN', 'DIGEST-MD5', 'CRAM-MD5']) |
| # Validate right mechanism is chosen based on score. |
| self.assertEqual(self.sasl._chosen_mech.name, 'DIGEST-MD5') |
| |
| def test_error_catcher_no_error(self): |
| """Test the error_catcher with no error.""" |
| with error_catcher(self.sasl): |
| result, _, _ = self.sasl.start(mechanism='ANONYMOUS') |
| |
| self.assertEqual(self.sasl.getError(), None) |
| self.assertEqual(result, True) |
| |
| def test_error_catcher_with_error(self): |
| """Test the error_catcher with an error.""" |
| with error_catcher(self.sasl): |
| result, _, _ = self.sasl.start(mechanism='WRONG') |
| |
| self.assertEqual(result, False) |
| self.assertEqual(self.sasl.getError(), 'None of the mechanisms listed meet all required properties') |
| |
| """Assuming Client initilization went well and a mechanism is chosen, Below are the test cases for different mechanims""" |
| |
| class _BaseMechanismTests(unittest.TestCase): |
| """Base test case for SASL mechanisms.""" |
| |
| mechanism = 'ANONYMOUS' |
| sasl_kwargs = {} |
| |
| def setUp(self): |
| self.sasl = PureSASLClient('localhost', mechanism=self.mechanism, **self.sasl_kwargs) |
| self.mechanism_class = self.sasl._chosen_mech |
| |
| def test_init_basic(self, *args): |
| sasl = PureSASLClient('localhost', mechanism=self.mechanism, **self.sasl_kwargs) |
| mech = sasl._chosen_mech |
| self.assertIs(mech.sasl, sasl) |
| |
| def test_step_basic(self, *args): |
| success, response = self.sasl.step(six.b('string')) |
| self.assertTrue(success) |
| self.assertIsInstance(response, six.binary_type) |
| |
| def test_decode_encode(self, *args): |
| self.assertEqual(self.sasl.encode('msg'), (False, None)) |
| self.assertEqual(self.sasl.getError(), '') |
| self.assertEqual(self.sasl.decode('msg'), (False, None)) |
| self.assertEqual(self.sasl.getError(), '') |
| |
| |
| class AnonymousMechanismTest(_BaseMechanismTests): |
| """Test case for the Anonymous SASL mechanism.""" |
| |
| mechanism = 'ANONYMOUS' |
| |
| |
| class PlainTextMechanismTest(_BaseMechanismTests): |
| """Test case for the PlainText SASL mechanism.""" |
| |
| mechanism = 'PLAIN' |
| username = 'user' |
| password = 'pass' |
| sasl_kwargs = {'username': username, 'password': password} |
| |
| def test_step(self): |
| for challenge in (None, '', b'asdf', u"\U0001F44D"): |
| success, response = self.sasl.step(challenge) |
| self.assertTrue(success) |
| self.assertEqual(response, six.b(f'\x00{self.username}\x00{self.password}')) |
| self.assertIsInstance(response, six.binary_type) |
| |
| def test_step_with_authorization_id_or_identity(self): |
| challenge = u"\U0001F44D" |
| identity = 'user2' |
| |
| # Test that we can pass an identity |
| sasl_kwargs = self.sasl_kwargs.copy() |
| sasl_kwargs.update({'identity': identity}) |
| sasl = PureSASLClient('localhost', mechanism=self.mechanism, **sasl_kwargs) |
| success, response = sasl.step(challenge) |
| self.assertTrue(success) |
| self.assertEqual(response, six.b(f'{identity}\x00{self.username}\x00{self.password}')) |
| self.assertIsInstance(response, six.binary_type) |
| self.assertTrue(sasl.complete) |
| |
| # Test that the sasl authorization_id has priority over identity |
| auth_id = 'user3' |
| sasl_kwargs.update({'authorization_id': auth_id}) |
| sasl = PureSASLClient('localhost', mechanism=self.mechanism, **sasl_kwargs) |
| success, response = sasl.step(challenge) |
| self.assertTrue(success) |
| self.assertEqual(response, six.b(f'{auth_id}\x00{self.username}\x00{self.password}')) |
| self.assertIsInstance(response, six.binary_type) |
| self.assertTrue(sasl.complete) |
| |
| def test_decode_encode(self): |
| msg = 'msg' |
| self.assertEqual(self.sasl.decode(msg), (True, msg)) |
| self.assertEqual(self.sasl.encode(msg), (True, msg)) |
| |
| |
| class ExternalMechanismTest(_BaseMechanismTests): |
| """Test case for the External SASL mechanisms""" |
| |
| mechanism = 'EXTERNAL' |
| |
| def test_step(self): |
| self.assertEqual(self.sasl.step(), (True, b'')) |
| |
| def test_decode_encode(self): |
| msg = 'msg' |
| self.assertEqual(self.sasl.decode(msg), (True, msg)) |
| self.assertEqual(self.sasl.encode(msg), (True, msg)) |
| |
| |
| @patch('puresasl.mechanisms.kerberos.authGSSClientStep') |
| @patch('puresasl.mechanisms.kerberos.authGSSClientResponse', return_value=base64.b64encode(six.b('some\x00 response'))) |
| class GSSAPIMechanismTest(_BaseMechanismTests): |
| """Test case for the GSSAPI SASL mechanism.""" |
| |
| mechanism = 'GSSAPI' |
| service = 'GSSAPI' |
| sasl_kwargs = {'service': service} |
| |
| @patch('puresasl.mechanisms.kerberos.authGSSClientWrap') |
| @patch('puresasl.mechanisms.kerberos.authGSSClientUnwrap') |
| def test_decode_encode(self, _inner1, _inner2, authGSSClientResponse, *args): |
| # bypassing step setup by setting qop directly |
| self.mechanism_class.qop = QOP.AUTH |
| msg = b'msg' |
| self.assertEqual(self.sasl.decode(msg), (True, msg)) |
| self.assertEqual(self.sasl.encode(msg), (True, msg)) |
| |
| # Test for behavior with different QOP like data integrity and confidentiality for Kerberos authentication |
| for qop in (QOP.AUTH_INT, QOP.AUTH_CONF): |
| self.mechanism_class.qop = qop |
| with patch('puresasl.mechanisms.kerberos.authGSSClientResponseConf', return_value=1): |
| self.assertEqual(self.sasl.decode(msg), (True, base64.b64decode(authGSSClientResponse.return_value))) |
| self.assertEqual(self.sasl.encode(msg), (True, base64.b64decode(authGSSClientResponse.return_value))) |
| if qop == QOP.AUTH_CONF: |
| with patch('puresasl.mechanisms.kerberos.authGSSClientResponseConf', return_value=0): |
| self.assertEqual(self.sasl.encode(msg), (False, None)) |
| self.assertEqual(self.sasl.getError(), 'Error: confidentiality requested, but not honored by the server.') |
| |
| def test_step_no_user(self, authGSSClientResponse, *args): |
| msg = six.b('whatever') |
| |
| # no user |
| self.assertEqual(self.sasl.step(msg), (True, base64.b64decode(authGSSClientResponse.return_value))) |
| with patch('puresasl.mechanisms.kerberos.authGSSClientResponse', return_value=''): |
| self.assertEqual(self.sasl.step(msg), (True, six.b(''))) |
| |
| username = 'username' |
| # with user; this has to be last because it sets mechanism.user |
| with patch('puresasl.mechanisms.kerberos.authGSSClientStep', return_value=kerberos.AUTH_GSS_COMPLETE): |
| with patch('puresasl.mechanisms.kerberos.authGSSClientUserName', return_value=six.b(username)): |
| self.assertEqual(self.sasl.step(msg), (True, six.b(''))) |
| self.assertEqual(self.mechanism_class.user, six.b(username)) |
| |
| @patch('puresasl.mechanisms.kerberos.authGSSClientUnwrap') |
| def test_step_qop(self, *args): |
| self.mechanism_class._have_negotiated_details = True |
| self.mechanism_class.user = 'user' |
| msg = six.b('msg') |
| self.assertEqual(self.sasl.step(msg), (False, None)) |
| self.assertEqual(self.sasl.getError(), 'Bad response from server') |
| |
| max_len = 100 |
| self.assertLess(max_len, self.sasl.max_buffer) |
| for i, qop in QOP.bit_map.items(): |
| qop_size = struct.pack('!i', i << 24 | max_len) |
| response = base64.b64encode(qop_size) |
| with patch('puresasl.mechanisms.kerberos.authGSSClientResponse', return_value=response): |
| with patch('puresasl.mechanisms.kerberos.authGSSClientWrap') as authGSSClientWrap: |
| self.mechanism_class.complete = False |
| self.assertEqual(self.sasl.step(msg), (True, qop_size)) |
| self.assertTrue(self.mechanism_class.complete) |
| self.assertEqual(self.mechanism_class.qop, qop) |
| self.assertEqual(self.mechanism_class.max_buffer, max_len) |
| |
| args = authGSSClientWrap.call_args[0] |
| out_data = args[1] |
| out = base64.b64decode(out_data) |
| self.assertEqual(out[:4], qop_size) |
| self.assertEqual(out[4:], six.b(self.mechanism_class.user)) |
| |
| |
| class CramMD5MechanismTest(_BaseMechanismTests): |
| """Test case for the CRAM-MD5 SASL mechanism.""" |
| |
| mechanism = 'CRAM-MD5' |
| username = 'user' |
| password = 'pass' |
| sasl_kwargs = {'username': username, 'password': password} |
| |
| def test_step(self): |
| success, response = self.sasl.step(None) |
| self.assertTrue(success) |
| self.assertIsNone(response) |
| challenge = six.b('msg') |
| hash = hmac.HMAC(key=six.b(self.password), digestmod=hashlib.md5) |
| hash.update(challenge) |
| success, response = self.sasl.step(challenge) |
| self.assertTrue(success) |
| self.assertIn(six.b(self.username), response) |
| self.assertIn(six.b(hash.hexdigest()), response) |
| self.assertIsInstance(response, six.binary_type) |
| self.assertTrue(self.sasl.complete) |
| |
| def test_decode_encode(self): |
| msg = 'msg' |
| self.assertEqual(self.sasl.decode(msg), (True, msg)) |
| self.assertEqual(self.sasl.encode(msg), (True, msg)) |
| |
| |
| class DigestMD5MechanismTest(_BaseMechanismTests): |
| """Test case for the DIGEST-MD5 SASL mechanism.""" |
| |
| mechanism = 'DIGEST-MD5' |
| username = 'user' |
| password = 'pass' |
| sasl_kwargs = {'username': username, 'password': password} |
| |
| def test_decode_encode(self): |
| msg = 'msg' |
| self.assertEqual(self.sasl.decode(msg), (True, msg)) |
| self.assertEqual(self.sasl.encode(msg), (True, msg)) |
| |
| def test_step_basic(self, *args): |
| pass |
| |
| def test_step(self): |
| """Test a SASL step with dummy challenge for DIGEST-MD5 mechanism.""" |
| testChallenge = ( |
| b'nonce="rmD6R8aMYVWH+/ih9HGBr3xNGAR6o2DUxpKlgDz6gUQ=",r' |
| b'ealm="example.org",qop="auth,auth-int,auth-conf",cipher="rc4-40,rc' |
| b'4-56,rc4,des,3des",maxbuf=65536,charset=utf-8,algorithm=md5-sess' |
| ) |
| result, response = self.sasl.step(testChallenge) |
| self.assertTrue(result) |
| self.assertIsNotNone(response) |
| |
| def test_step_server_answer(self): |
| """Test a SASL step with a proper server answer for DIGEST-MD5 mechanism.""" |
| sasl_kwargs = {'username': "chris", 'password': "secret"} |
| sasl = PureSASLClient('elwood.innosoft.com', |
| service="imap", |
| mechanism=self.mechanism, |
| mutual_auth=True, |
| **sasl_kwargs) |
| testChallenge = ( |
| b'utf-8,username="chris",realm="elwood.innosoft.com",' |
| b'nonce="OA6MG9tEQGm2hh",nc=00000001,cnonce="OA6MHXh6VqTrRk",' |
| b'digest-uri="imap/elwood.innosoft.com",' |
| b'response=d388dad90d4bbd760a152321f2143af7,qop=auth' |
| ) |
| sasl.step(testChallenge) |
| sasl._chosen_mech.cnonce = b"OA6MHXh6VqTrRk" |
| |
| serverResponse = ( |
| b'rspauth=ea40f60335c427b5527b84dbabcdfffd' |
| ) |
| sasl.step(serverResponse) |
| # assert that step choses the only supported QOP for for DIGEST-MD5 |
| self.assertEqual(self.sasl.qop, QOP.AUTH) |