#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function


import unittest2 as unittest
import os
from subprocess import PIPE, Popen
from system_test import TestCase, Qdrouterd, main_module, SkipIfNeeded
from proton import SASL
from proton.handlers import MessagingHandler
from proton.reactor import Container


class AuthServicePluginTest(TestCase):
    @classmethod
    def createSaslFiles(cls):
        # Create a sasl database.
        p = Popen(['saslpasswd2', '-c', '-p', '-f', 'qdrouterd.sasldb', '-u', 'domain.com', 'test'],
                  stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
        result = p.communicate('password')
        assert p.returncode == 0, \
            "saslpasswd2 exit status %s, output:\n%s" % (p.returncode, result)

        # Create a SASL configuration file.
        with open('tests-mech-PLAIN.conf', 'w') as sasl_conf:
            sasl_conf.write("""
pwcheck_method: auxprop
auxprop_plugin: sasldb
sasldb_path: qdrouterd.sasldb
mech_list: SCRAM-SHA1 PLAIN
# The following line stops spurious 'sql_select option missing' errors when cyrus-sql-sasl plugin is installed
sql_select: dummy select
""")

    @classmethod
    def setUpClass(cls):
        """
        Tests the delegation of sasl auth to an external auth service.

        Creates two routers, one acts as the authe service, the other configures the auth service plugin
        to point at this auth service.

        """
        super(AuthServicePluginTest, cls).setUpClass()

        if not SASL.extended():
            return

        cls.createSaslFiles()

        print('launching auth service...')
        auth_service_port = cls.tester.get_port()
        cls.tester.qdrouterd('auth_service', Qdrouterd.Config([
                     ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': auth_service_port,
                                   'saslMechanisms':'PLAIN', 'authenticatePeer': 'yes'}),
                     ('router', {'workerThreads': 1,
                                 'id': 'auth_service',
                                 'mode': 'standalone',
                                 'saslConfigName': 'tests-mech-PLAIN',
                                 'saslConfigPath': os.getcwd()})
        ])).wait_ready()

        cls.router_port = cls.tester.get_port()
        cls.tester.qdrouterd('router', Qdrouterd.Config([
                     ('authServicePlugin', {'name':'myauth', 'host': '127.0.0.1', 'port': auth_service_port}),
                     ('listener', {'host': '0.0.0.0', 'port': cls.router_port, 'role': 'normal', 'saslPlugin':'myauth', 'saslMechanisms':'PLAIN'}),
                     ('router', {'mode': 'standalone', 'id': 'router'})
        ])).wait_ready()

    @SkipIfNeeded(not SASL.extended(), "Cyrus library not available. skipping test")
    def test_valid_credentials(self):
        """
        Check authentication succeeds when valid credentials are presented.

        """
        test = SimpleConnect("127.0.0.1:%d" % self.router_port, 'test@domain.com', 'password')
        test.run()
        self.assertEqual(True, test.connected)
        self.assertEqual(None, test.error)

    @SkipIfNeeded(not SASL.extended(), "Cyrus library not available. skipping test")
    def test_invalid_credentials(self):
        """
        Check authentication fails when invalid credentials are presented.

        """
        test = SimpleConnect("127.0.0.1:%d" % self.router_port, 'test@domain.com', 'foo')
        test.run()
        self.assertEqual(False, test.connected)
        self.assertEqual('amqp:unauthorized-access', test.error.name)
        self.assertEqual(test.error.description.startswith('Authentication failed'), True)


class AuthServicePluginDeprecatedTest(AuthServicePluginTest):
    @classmethod
    def setUpClass(cls):
        """
        Tests the delegation of sasl auth to an external auth service.

        Creates two routers, one acts as the authe service, the other configures the auth service plugin
        to point at this auth service.

        """
        super(AuthServicePluginTest, cls).setUpClass()

        if not SASL.extended():
            return

        cls.createSaslFiles()

        print('launching auth service...')
        auth_service_port = cls.tester.get_port()
        cls.tester.qdrouterd('auth_service', Qdrouterd.Config([
                     ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': auth_service_port,
                                   'saslMechanisms':'PLAIN', 'authenticatePeer': 'yes'}),
                     ('router', {'workerThreads': 1,
                                 'id': 'auth_service',
                                 'mode': 'standalone',
                                 'saslConfigName': 'tests-mech-PLAIN',
                                 'saslConfigPath': os.getcwd()})
        ])).wait_ready()

        cls.router_port = cls.tester.get_port()
        cls.tester.qdrouterd('router', Qdrouterd.Config([
                     ('authServicePlugin', {'name':'myauth', 'authService': '127.0.0.1:%d' % auth_service_port}),
                     ('listener', {'host': '0.0.0.0', 'port': cls.router_port, 'role': 'normal',
                                   'saslPlugin':'myauth', 'saslMechanisms':'PLAIN'}),
                     ('router', {'mode': 'standalone', 'id': 'router'})
        ])).wait_ready()


class SimpleConnect(MessagingHandler):
    def __init__(self, url, username, password):
        super(SimpleConnect, self).__init__()
        self.url = url
        self.username = username
        self.password = password
        self.connected = False
        self.error = None

    def on_start(self, event):
        event.container.user = self.username
        event.container.password = self.password
        conn = event.container.connect(self.url)

    def on_connection_opened(self, event):
        self.connected = True
        event.connection.close()

    def on_transport_error(self, event):
        self.error = event.transport.condition
        if event.connection:
            event.connection.close()
        else:
            print("ERROR: %s" % self.error)

    def run(self):
        Container(self).run()


if __name__ == '__main__':
    unittest.main(main_module())

