#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

import os
import threading
import sys
import ssl

try:
    from urllib2 import urlopen, build_opener, HTTPSHandler
    from urllib2 import HTTPError, URLError
    import urllib2 as urllib
except ImportError:
    # python3
    from urllib.request import urlopen, build_opener, HTTPSHandler
    from urllib.error import HTTPError, URLError
    import urllib

from system_test import TIMEOUT, Process, QdManager, retry
from subprocess import PIPE, STDOUT
from system_test import TestCase, Qdrouterd, main_module, DIR
from system_test import unittest


class RouterTestHttp(TestCase):

    @staticmethod
    def ssl_file(name):
        return os.path.join(DIR, 'ssl_certs', name)

    @classmethod
    def setUpClass(cls):
        super(RouterTestHttp, cls).setUpClass()
        # Http listener delete tests will run only if LWS version == 2.4.2
        # OR (if LWS version > 3.0 and LWS version < 4.0)
        cls.skip_delete_http_listener_test = ${SKIP_DELETE_HTTP_LISTENER}

    @classmethod
    def get(cls, url, use_ca=True):
        if sys.version_info >= (2, 7):
            if use_ca:
                http_data = urlopen(url, cafile=cls.ssl_file('ca-certificate.pem'))
            else:
                http_data = urlopen(url)
        else:
            req = urllib.Request(url=url)
            http_data = urlopen(req)
        return http_data.read().decode('utf-8')

    @classmethod
    def get_cert(cls, url):
        context = ssl.create_default_context()
        context.load_cert_chain(cls.ssl_file('client-certificate.pem'),
                                cls.ssl_file('client-private-key.pem'),
                                'client-password')
        context.load_verify_locations(cls.ssl_file('ca-certificate.pem'))
        opener = build_opener(HTTPSHandler(context=context))
        return opener.open(url).read().decode('utf-8')

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception(out if out else str(e))
        return out

    def assert_get(self, url):
        self.assertEqual(u'HTTP test\n', self.get("%s/system_tests_http.txt" % url))

    def assert_get_cert(self, url):
        self.assertEqual(u'HTTP test\n', self.get_cert("%s/system_tests_http.txt" % url))

    def test_listen_error(self):
        """Make sure a router exits if an initial HTTP listener fails, doesn't hang"""
        listen_port = self.get_port()
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'bad'}),
            ('listener', {'port': listen_port, 'maxFrameSize': '2048', 'stripAnnotations': 'no'}),
            ('listener', {'port': listen_port, 'http':True})])
        r = Qdrouterd(name="expect_fail", config=config, wait=False)
        self.assertEqual(1, r.wait())

    def is_get_request_failing(self, url, use_ca=True, use_get_cert=False):
        try:
            if use_get_cert:
                self.get_cert(url)
            else:
                self.get(url, use_ca=use_ca)
            return False
        except Exception as e:
            if "[Errno 111] Connection refused" in str(e):
                return True
            if "[Errno 104] Connection reset by peer" in str(e):
                return True
            raise e

    def test_http_listener_delete(self):
        name = 'delete_listener'
        name_1 = 'delete_listener_1'
        normal_listen_port = self.get_port()
        #
        # Open listeners on two HTTP enabled ports. Delete one of the
        # HTTP listeners and make sure that it is really gone and also
        # make sure that the other HTTP listener is still working.
        #
        http_delete_listen_port_1 = self.get_port()
        http_delete_listen_port_2 = self.get_port()
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'A'}),
            ('listener', {'port': normal_listen_port}),
            ('listener', {'httpRootDir': os.path.dirname(__file__), 'name': name, 'port': http_delete_listen_port_1, 'http': True}),
            ('listener', {'httpRootDir': os.path.dirname(__file__), 'name': name_1, 'port': http_delete_listen_port_2, 'http': True})])
        router = self.qdrouterd(name="expect_fail_1", config=config, wait=True)

        def address():
            return router.addresses[0]

        # Perform a GET request on the http_delete_listen_port_1 just to make
        # sure that it is up and running.
        url_1 = "%s/system_tests_http.txt" % "http://localhost:%d" % http_delete_listen_port_1
        out = self.get(url_1, use_ca=False)

        # Perform a GET request on the http_delete_listen_port_2 just to make
        # sure that it is up and running.
        url_2 = "%s/system_tests_http.txt" % "http://localhost:%d" % http_delete_listen_port_2
        out = self.get(url_2, use_ca=False)

        # Now both http_delete_listen_port_1 and http_delete_listen_port_2
        # are working.

        # Delete the listener on port http_delete_listen_port_1
        long_type = 'org.apache.qpid.dispatch.listener'
        mgmt = QdManager(self, address=address())

        if self.skip_delete_http_listener_test:
            # You are not allowed to delete a http:yes listener
            # Try deleting it and make sure you get an exception.
            try:
                mgmt.delete(long_type, name=name)
            except Exception as e:
                if "BadRequestStatus: HTTP listeners cannot be deleted" in str(e):
                    exception_raised = True
            self.assertTrue(exception_raised)
        else:
            mgmt.delete(long_type, name=name)

            # Once again try to perform a GET request. Now since the listener
            # is gone, the GET will fail.
            ret_val = retry(lambda: self.is_get_request_failing(url_1, use_ca=False), timeout=10, delay=2)
            self.assertTrue(ret_val)

            # HTTP listener on port http_delete_listen_port_1 has been
            # deleted successfully. Make sure that the listener on port
            # http_delete_listen_port_2 is still working.
            out = self.get(url_2, use_ca=False)

    def test_http_get(self):
        config = Qdrouterd.Config([
            ('router', {'id': 'QDR.HTTP'}),
            # httpRoot has been deprecated. We are using it here to test backward compatibility.
            ('listener', {'port': self.get_port(), 'httpRoot': os.path.dirname(__file__)}),
            ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}),
        ])
        r = self.qdrouterd('http-test-router', config)

        def test(port):
            self.assert_get("http://localhost:%d" % port)
            self.assertRaises(HTTPError, urlopen, "http://localhost:%d/nosuch" % port)

        # Sequential calls on multiple ports
        for port in r.ports: test(port)

        # Concurrent calls on multiple ports
        class TestThread(threading.Thread):
            def __init__(self, port):
                threading.Thread.__init__(self)
                self.port, self.ex = port, None
                self.start()
            def run(self):
                try: test(self.port)
                except Exception as e: self.ex = e
        threads = [TestThread(p) for p in r.ports + r.ports]
        for t in threads: t.join()
        for t in threads:
            if t.ex: raise t.ex

        # https not configured
        self.assertRaises(URLError, urlopen, "https://localhost:%d/nosuch" % r.ports[0])

    def test_http_metrics(self):

        if not sys.version_info >= (2, 7):
            return

        config = Qdrouterd.Config([
            ('router', {'id': 'QDR.METRICS'}),
            ('listener', {'port': self.get_port(), 'http': 'yes'}),
            ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}),
        ])
        r = self.qdrouterd('metrics-test-router', config)

        def test(port):
            result = urlopen("http://localhost:%d/metrics" % port, cafile=self.ssl_file('ca-certificate.pem'))
            self.assertEqual(200, result.getcode())
            data = result.read().decode('utf-8')
            assert('connections' in data)
            assert('deliveries_ingress' in data)
            assert('deliveries_delayed_1sec' in data)
            assert('deliveries_delayed_10sec' in data)
            assert('deliveries_redirected_to_fallback' in data)

        # Sequential calls on multiple ports
        for port in r.ports: test(port)

        # Concurrent calls on multiple ports
        class TestThread(threading.Thread):
            def __init__(self, port):
                threading.Thread.__init__(self)
                self.port, self.ex = port, None
                self.start()
            def run(self):
                try: test(self.port)
                except Exception as e: self.ex = e
        threads = [TestThread(p) for p in r.ports + r.ports]
        for t in threads: t.join()
        for t in threads:
            if t.ex: raise t.ex

    def test_http_healthz(self):

        if not sys.version_info >= (2, 7):
            return

        config = Qdrouterd.Config([
            ('router', {'id': 'QDR.HEALTHZ'}),
            ('listener', {'port': self.get_port(), 'http': 'yes'}),
            ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}),
        ])
        r = self.qdrouterd('metrics-test-router', config)

        def test(port):
            result = urlopen("http://localhost:%d/healthz" % port, cafile=self.ssl_file('ca-certificate.pem'))
            self.assertEqual(200, result.getcode())

        # Sequential calls on multiple ports
        for port in r.ports: test(port)

        # Concurrent calls on multiple ports
        class TestThread(threading.Thread):
            def __init__(self, port):
                threading.Thread.__init__(self)
                self.port, self.ex = port, None
                self.start()
            def run(self):
                try: test(self.port)
                except Exception as e: self.ex = e
        threads = [TestThread(p) for p in r.ports + r.ports]
        for t in threads: t.join()
        for t in threads:
            if t.ex: raise t.ex

    def test_https_get(self):
        def http_listener(**kwargs):
            args = dict(kwargs)
            args.update({'port': self.get_port(), 'http': 'yes', 'httpRootDir': os.path.dirname(__file__)})
            return ('listener', args)

        def listener(**kwargs):
            args = dict(kwargs)
            args.update({'port': self.get_port()})
            return ('listener', args)

        name = 'delete-me'
        config = Qdrouterd.Config([
            ('router', {'id': 'QDR.HTTPS'}),
            ('sslProfile', {'name': 'simple-ssl',
                            'caCertFile': self.ssl_file('ca-certificate.pem'),
                            'certFile': self.ssl_file('server-certificate.pem'),
                            'privateKeyFile': self.ssl_file('server-private-key.pem'),
                            'ciphers': 'ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES:!aNULL:!MD5:!DSS',
                            'password': 'server-password'
            }),
            http_listener(sslProfile='simple-ssl', requireSsl=False, authenticatePeer=False),
            http_listener(sslProfile='simple-ssl', requireSsl=True, authenticatePeer=False),
            http_listener(sslProfile='simple-ssl', requireSsl=True, authenticatePeer=True),
            http_listener(name=name, sslProfile='simple-ssl', requireSsl=True, authenticatePeer=True),
            listener(name='mgmt_listener', authenticatePeer=False)])
        # saslMechanisms='EXTERNAL'

        r = self.qdrouterd('https-test-router', config)
        r.wait_ready()

        def address():
            return r.addresses[4]

        self.assert_get("https://localhost:%s" % r.ports[0])
        # requireSsl=false Allows simple-ssl HTTP

        # The following test will be commented out if you are running a version lower than
        # libwebsockets 3.2.0
        ${TEST_OPTION_ALLOW_HTTP_ON_HTTPS_LISTENER}self.assert_get("http://localhost:%s" % r.ports[0])

        self.assert_get("https://localhost:%s" % r.ports[1])
        # requireSsl=True does not allow simple-ssl HTTP
        self.assertRaises(Exception, self.assert_get, "http://localhost:%s" % r.ports[1])

        # authenticatePeer=True requires a client cert
        self.assertRaises((URLError, ssl.SSLError), self.assert_get, "https://localhost:%s" % r.ports[2])

        # could not provide key password on python 2.6
        if sys.version_info >= (2, 7):
            # Provide client cert
            self.assert_get_cert("https://localhost:%d" % r.ports[2])

            # Try a get on the HTTP listener we are going to delete
            self.assert_get_cert("https://localhost:%d" % r.ports[3])

            if not self.skip_delete_http_listener_test:
                # Delete the listener with name 'delete-me'
                long_type = 'org.apache.qpid.dispatch.listener'
                mgmt = QdManager(self, address=address())
                mgmt.delete(long_type, name=name)

                # Make sure that the listener got deleted.
                ret_val = retry(lambda: self.is_get_request_failing("https://localhost:%s/system_tests_http.txt" % r.ports[3], use_get_cert=True), timeout=10, delay=2)
                self.assertTrue(ret_val)

                # Make sure other ports are working normally after the above delete.
                self.assert_get_cert("https://localhost:%d" % r.ports[2])




if __name__ == '__main__':
    unittest.main(main_module())
