#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

from proton import Message, Timeout
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
from system_test import unittest
from proton.handlers import MessagingHandler
from proton.reactor import Container, DynamicNodeProperties
from qpid_dispatch_internal.compat import UNICODE
from qpid_dispatch.management.client import Node


class RouterTest(TestCase):

    inter_router_port = None

    @classmethod
    def setUpClass(cls):
        """Start a router"""
        super(RouterTest, cls).setUpClass()

        def router(name, connection, args=[]):

            config = [
                ('router', {'mode': 'interior', 'id': name}),
                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
                ('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),
                connection
            ]

            config = Qdrouterd.Config(config)

            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=args))

        cls.routers = []
        inter_router_port = cls.tester.get_port()

        router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}), ["-T"])


    def test_01_denied_link(self):
        test = DenyLinkTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/deny")
        test.run()
        self.assertEqual(None, test.error)

    def test_02_discard_deliveries(self):
        test = DiscardTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/discard")
        test.run()
        self.assertEqual(None, test.error)

    def test_03_presettled_source(self):
        test = SourceTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/source_ps", 300, 300)
        test.run()
        self.assertEqual(None, test.error)

    def test_04_unsettled_source(self):
        test = SourceTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/source", 300, 0)
        test.run()
        self.assertEqual(None, test.error)

    def test_05_echo_attach_detach(self):
        test = EchoTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/echo")
        test.run()
        self.assertEqual(None, test.error)


class Timeout(object):
    def __init__(self, parent):
        self.parent = parent

    def on_timer_task(self, event):
        self.parent.timeout()


class DenyLinkTest(MessagingHandler):
    def __init__(self, host, address):
        super(DenyLinkTest, self).__init__(prefetch = 0)
        self.host      = host
        self.address   = address

        self.conn     = None
        self.error    = None
        self.receiver = None
        self.sender   = None
        self.receiver_failed = False
        self.sender_failed   = False

    def timeout(self):
        self.error = "Timeout Expired: receiver_failed=%s sender_failed=%s" %\
                     ("yes" if self.receiver_failed else "no",
                      "yes" if self.sender_failed else "no")
        self.conn.close()

    def on_start(self, event):
        self.timer    = event.reactor.schedule(5.0, Timeout(self))
        self.conn     = event.container.connect(self.host)
        self.receiver = event.container.create_receiver(self.conn, self.address)
        self.sender   = event.container.create_sender(self.conn, self.address)

    def on_link_error(self, event):
        if event.receiver == self.receiver:
            self.receiver_failed = True
        if event.sender == self.sender:
            self.sender_failed = True

        if self.receiver_failed and self.sender_failed:
            self.conn.close()
            self.timer.cancel()

    def run(self):
        Container(self).run()


class DiscardTest(MessagingHandler):
    def __init__(self, host, address):
        super(DiscardTest, self).__init__(prefetch = 0)
        self.host      = host
        self.address   = address

        self.conn     = None
        self.error    = None
        self.sender   = None

        self.count    = 300
        self.sent     = 0
        self.rejected = 0

    def timeout(self):
        self.error = "Timeout Expired: n_sent=%d n_rejected=%d" % (self.sent, self.rejected)
        self.conn.close()

    def on_start(self, event):
        self.timer  = event.reactor.schedule(5.0, Timeout(self))
        self.conn   = event.container.connect(self.host)
        self.sender = event.container.create_sender(self.conn, self.address)

    def on_sendable(self, event):
        while self.sender.credit > 0 and self.sent < self.count:
            msg = Message(body="Discard Test")
            self.sender.send(msg)
            self.sent += 1

    def on_rejected(self, event):
        self.rejected += 1
        self.conn.close()
        self.timer.cancel()

    def on_link_error(self, event):
        if event.receiver == self.receiver:
            self.receiver_failed = True
        if event.sender == self.sender:
            self.sender_failed = True

        if self.receiver_failed and self.sender_failed:
            self.conn.close()
            self.timer.cancel()

    def run(self):
        Container(self).run()


class SourceTest(MessagingHandler):
    def __init__(self, host, address, count, expected_ps):
        super(SourceTest, self).__init__(prefetch = 0)
        self.host        = host
        self.address     = address
        self.expected_ps = expected_ps

        self.conn     = None
        self.error    = None
        self.receiver = None

        self.count          = count
        self.n_credit_given = 0
        self.n_rcvd         = 0
        self.n_rcvd_ps      = 0

    def timeout(self):
        self.error = "Timeout Expired: n_rcvd=%d" % (self.n_rcvd)
        self.conn.close()

    def on_start(self, event):
        self.timer    = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn     = event.container.connect(self.host)
        self.receiver = event.container.create_receiver(self.conn, self.address)
        self.receiver.flow(3)
        self.n_credit_given = 3

    def on_message(self, event):
        dlv = event.delivery
        if dlv.settled:
            self.n_rcvd_ps += 1
        self.n_rcvd += 1
        if self.n_rcvd == self.count:
            self.conn.close()
            self.timer.cancel()
            if self.n_rcvd_ps != self.expected_ps:
                self.error = "Received %d deliveries, %d were settled (expected %d)" %\
                             (self.n_rcvd, self.n_rcvd_ps, self.expected_ps)

        elif self.n_rcvd == self.n_credit_given:
            self.receiver.flow(5)
            self.n_credit_given += 5

    def run(self):
        Container(self).run()


class EchoTest(MessagingHandler):
    def __init__(self, host, address):
        super(EchoTest, self).__init__(prefetch = 0)
        self.host      = host
        self.address   = address

        self.conn     = None
        self.error    = None
        self.action   = "Connecting to router"
        self.receiver = None
        self.sender   = None

    def timeout(self):
        self.error = "Timeout Expired while attempting action: %s" % self.action
        self.conn.close()

    def fail(self, error):
        self.error = error
        self.conn.close()
        self.timer.cancel()

    def on_start(self, event):
        self.timer    = event.reactor.schedule(5.0, Timeout(self))
        self.conn     = event.container.connect(self.host)
        self.receiver = event.container.create_receiver(self.conn, self.address)

    def on_link_opening(self, event):
        if event.sender:
            self.action = "Attaching incoming echoed link"
            self.sender = event.sender
            if event.sender.remote_source.address == self.address:
                event.sender.source.address = self.address
                event.sender.open()
            else:
                self.fail("Incorrect address on incoming sender: got %s, expected %s" %
                          (event.sender.remote_source.address, self.address))

    def on_link_opened(self, event):
        if event.receiver == self.receiver:
            self.action = "Closing the echoed link"
            self.receiver.close()

    def on_link_closed(self, event):
        if event.receiver == self.receiver:
            self.conn.close()
            self.timer.cancel()

    def run(self):
        Container(self).run()


if __name__ == '__main__':
    unittest.main(main_module())
