#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

import time

from system_test import TestCase, Qdrouterd, TIMEOUT
from system_test import AsyncTestReceiver
from system_test import unittest
from test_broker import FakeBroker

from proton import Disposition
from proton import Message
from proton.utils import BlockingConnection
from proton.utils import SyncRequestResponse
from proton.utils import SendException
from proton.utils import LinkDetached


class LinkRouteLookupTest(TestCase):
    """
    Tests link route address lookup
    """
    # hardcoded values from the router's C code
    QD_TERMINUS_ADDRESS_LOOKUP = '_$qd.addr_lookup'
    PROTOCOL_VERSION = 1
    OPCODE_LINK_ROUTE_LOOKUP = 1
    QCM_ADDR_LOOKUP_OK = 0
    QCM_ADDR_LOOKUP_NOT_FOUND = 3

    def _check_response(self, message):
        self.assertTrue(isinstance(message.properties, dict))
        self.assertEqual(self.PROTOCOL_VERSION, message.properties.get('version'))
        self.assertTrue(message.properties.get('opcode') is not None)
        self.assertTrue(isinstance(message.body, list))
        self.assertEqual(2, len(message.body))
        return (message.properties.get('status'),
                message.body[0],  # is link_route?
                message.body[1])  # has destinations?

    @classmethod
    def setUpClass(cls):
        """Start a router"""
        super(LinkRouteLookupTest, cls).setUpClass()

        def router(name, mode, extra=None):
            config = [
                ('router', {'mode': mode, 'id': name}),
                ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'})
            ]

            if extra:
                config.extend(extra)
            config = Qdrouterd.Config(config)
            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))

        cls.routers = []

        inter_router_port = cls.tester.get_port()
        edge_port_A       = cls.tester.get_port()
        broker_port_A     = cls.tester.get_port()
        broker_port_B     = cls.tester.get_port()

        router('INT.A', 'interior',
               [
                   ('listener', {'role': 'edge',
                                 'port': cls.tester.get_port()}),
                   ('listener', {'role': 'inter-router',
                                 'port': inter_router_port}),
                   ('connector', {'role': 'route-container',
                                  'port': cls.tester.get_port()}),

                   ('linkRoute', {'pattern': 'org.apache.A.#',
                                  'containerId': 'FakeBrokerA',
                                  'direction': 'in'}),
                   ('linkRoute', {'pattern': 'org.apache.A.#',
                                  'containerId': 'FakeBrokerA',
                                  'direction': 'out'})
               ])
        cls.INT_A = cls.routers[-1]
        cls.INT_A.listener = cls.INT_A.addresses[0]
        cls.INT_A.edge_listener = cls.INT_A.addresses[1]
        cls.INT_A.inter_router_listener = cls.INT_A.addresses[2]
        cls.INT_A.broker_connector = cls.INT_A.connector_addresses[0]

        router('INT.B', 'interior',
               [
                   ('listener', {'role': 'edge',
                                 'port': cls.tester.get_port()}),
                   ('connector', {'role': 'inter-router',
                                  'name': 'connectorToA',
                                  'port': inter_router_port}),
                   ('connector', {'role': 'route-container',
                                  'port': cls.tester.get_port()}),

                   ('linkRoute', {'pattern': 'org.apache.B.#',
                                  'containerId': 'FakeBrokerB',
                                  'direction': 'in'}),
                   ('linkRoute', {'pattern': 'org.apache.B.#',
                                  'containerId': 'FakeBrokerB',
                                  'direction': 'out'})
                ])
        cls.INT_B = cls.routers[-1]
        cls.INT_B.edge_listener = cls.INT_B.addresses[1]
        cls.INT_B.broker_connector = cls.INT_B.connector_addresses[1]

        cls.INT_A.wait_router_connected('INT.B')
        cls.INT_B.wait_router_connected('INT.A')

    def _lookup_request(self, lr_address, direction):
        """
        Construct a link route lookup request message
        """
        return Message(body=[lr_address,
                             direction],
                       properties={"version": self.PROTOCOL_VERSION,
                                   "opcode": self.OPCODE_LINK_ROUTE_LOOKUP})

    def test_link_route_lookup_ok(self):
        """
        verify a link route address can be looked up successfully for both
        locally attached and remotely attached containers
        """

        # fire up a fake broker attached to the router local to the edge router
        # wait until both in and out addresses are ready
        fb = FakeBroker(self.INT_A.broker_connector, container_id='FakeBrokerA')
        self.INT_A.wait_address("org.apache.A", containers=1, count=2)

        # create a fake edge and lookup the target address
        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)

        for direction in [True, False]:
            # True = direction inbound (receiver) False = direction outbound (sender)
            rsp = self._check_response(srr.call(self._lookup_request("org.apache.A.foo", direction)))
            self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
            self.assertTrue(rsp[1])
            self.assertTrue(rsp[2])

        # shutdown fake router
        fb.join()

        # now fire up a fake broker attached to the remote router
        fb = FakeBroker(self.INT_B.broker_connector, container_id='FakeBrokerB')
        self.INT_A.wait_address("org.apache.B", remotes=1, count=2)

        for direction in [True, False]:
            rsp = self._check_response(srr.call(self._lookup_request("org.apache.B.foo", direction)))
            self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
            self.assertTrue(rsp[1])
            self.assertTrue(rsp[2])

        fb.join()
        bc.close()

    def test_link_route_lookup_not_found(self):
        """
        verify correct handling of lookup misses
        """
        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)

        rsp = self._check_response(srr.call(self._lookup_request("not.found.address", True)))
        self.assertEqual(self.QCM_ADDR_LOOKUP_NOT_FOUND, rsp[0])

    def test_link_route_lookup_not_link_route(self):
        """
        verify correct handling of matches to mobile addresses
        """
        addr = "not.a.linkroute"
        client = AsyncTestReceiver(self.INT_A.listener, addr)
        self.INT_A.wait_address(addr)

        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)

        rsp = self._check_response(srr.call(self._lookup_request(addr, True)))
        # self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
        self.assertEqual(False, rsp[1])
        bc.close()
        client.stop()

    def test_link_route_lookup_no_dest(self):
        """
        verify correct handling of matches to mobile addresses
        """
        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
        rsp = self._check_response(srr.call(self._lookup_request("org.apache.A.nope", True)))
        self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
        self.assertEqual(True, rsp[1])
        self.assertEqual(False, rsp[2])
        bc.close()

    def _invalid_request_test(self, msg, disposition=Disposition.REJECTED):
        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
        # @TODO(kgiusti) - self.assertRaises does not work here:
        try:
            srr.call(msg)
            self.assertTrue(False)
        except SendException as exc:
            self.assertEqual(disposition, exc.state)
        bc.close()

    def test_link_route_invalid_request(self):
        """
        Test various invalid message content
        """

        # empty message
        self._invalid_request_test(Message())

        # missing properties
        msg = self._lookup_request("ignore", False)
        msg.properties = None
        self._invalid_request_test(msg)

        # missing version
        msg = self._lookup_request("ignore", False)
        del msg.properties['version']
        self._invalid_request_test(msg)

        # invalid version
        msg = self._lookup_request("ignore", False)
        msg.properties['version'] = "blech"
        self._invalid_request_test(msg)

        # unsupported version
        msg = self._lookup_request("ignore", False)
        msg.properties['version'] = 97387187
        self._invalid_request_test(msg)

        # missing opcode
        msg = self._lookup_request("ignore", False)
        del msg.properties['opcode']
        self._invalid_request_test(msg)

        # bad opcode
        msg = self._lookup_request("ignore", False)
        msg.properties['opcode'] = "snarf"
        self._invalid_request_test(msg)

        # bad body
        msg = self._lookup_request("ignore", False)
        msg.body = [71]
        self._invalid_request_test(msg)

    def test_lookup_bad_connection(self):
        """
        Verify that clients connected via non-edge connections fail
        """
        bc = BlockingConnection(self.INT_A.listener, timeout=TIMEOUT)
        self.assertRaises(LinkDetached, SyncRequestResponse, bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
        bc.close()

        bc = BlockingConnection(self.INT_A.inter_router_listener, timeout=TIMEOUT)
        self.assertRaises(LinkDetached, SyncRequestResponse, bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
        bc.close()

        # consuming from the lookup address is forbidden:
        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
        self.assertRaises(LinkDetached, bc.create_receiver, self.QD_TERMINUS_ADDRESS_LOOKUP)
        bc.close()
