#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

from proton import Message
from system_test import Logger, TestCase, Qdrouterd, main_module, unittest, TIMEOUT, TestTimeout, PollTimeout
from proton.handlers import MessagingHandler
from proton.reactor import Container
from qpid_dispatch_internal.compat import BINARY


class RouterTest(TestCase):

    inter_router_port = None

    @classmethod
    def setUpClass(cls):
        """Start a router"""
        super(RouterTest, cls).setUpClass()

        def router(name, connection):

            config = [
                ('router', {'mode': 'interior', 'id': name, 'allowUnsettledMulticast': 'yes'}),
                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
                ('linkRoute', {'prefix': 'link', 'direction': 'in', 'containerId': 'LRC'}),
                ('linkRoute', {'prefix': 'link', 'direction': 'out', 'containerId': 'LRC'}),
                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
                connection
            ]

            config = Qdrouterd.Config(config)

            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))

        cls.routers = []

        inter_router_port = cls.tester.get_port()

        router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}))
        router('B', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}))

        cls.routers[0].wait_router_connected('B')
        cls.routers[1].wait_router_connected('A')


    def test_01_message_route_truncated_one_router(self):
        test = MessageRouteTruncateTest(self.routers[0].addresses[0],
                                        self.routers[0].addresses[0],
                                        "addr_01")
        test.run()
        self.assertEqual(None, test.error)


    def test_02_message_route_truncated_two_routers(self):
        test = MessageRouteTruncateTest(self.routers[0].addresses[0],
                                        self.routers[1].addresses[0],
                                        "addr_02")
        test.run()
        self.assertEqual(None, test.error)


    def test_03_link_route_truncated_one_router(self):
        test = LinkRouteTruncateTest(self.routers[0].addresses[0],
                                     self.routers[0].addresses[1],
                                     "link.addr_03",
                                     self.routers[0].addresses[0])
        test.run()
        self.assertEqual(None, test.error)


    def test_04_link_route_truncated_two_routers(self):
        test = LinkRouteTruncateTest(self.routers[1].addresses[0],
                                     self.routers[0].addresses[1],
                                     "link.addr_04",
                                     self.routers[1].addresses[0])
        test.run()
        self.assertEqual(None, test.error)


    def test_05_message_route_abort_one_router(self):
        test = MessageRouteAbortTest(self.routers[0].addresses[0],
                                     self.routers[0].addresses[0],
                                     "addr_05")
        test.run()
        if test.error:
            test.logger.dump()
        self.assertEqual(None, test.error)


    def test_06_message_route_abort_two_routers(self):
        test = MessageRouteAbortTest(self.routers[0].addresses[0],
                                     self.routers[1].addresses[0],
                                     "addr_06")
        test.run()
        if test.error:
            test.logger.dump()
        self.assertEqual(None, test.error)


    def test_07_multicast_truncate_one_router(self):
        test = MulticastTruncateTest(self.routers[0].addresses[0],
                                     self.routers[0].addresses[0],
                                     self.routers[0].addresses[0],
                                     "multicast.addr_07")
        test.run()
        self.assertEqual(None, test.error)


class Entity(object):
    def __init__(self, status_code, status_description, attrs):
        self.status_code        = status_code
        self.status_description = status_description
        self.attrs              = attrs

    def __getattr__(self, key):
        return self.attrs[key]


class RouterProxy(object):
    def __init__(self, reply_addr):
        self.reply_addr = reply_addr

    def response(self, msg):
        ap = msg.properties
        return Entity(ap['statusCode'], ap['statusDescription'], msg.body)

    def read_address(self, name):
        ap = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.router.address', 'name': name}
        return Message(properties=ap, reply_to=self.reply_addr)

    def query_addresses(self):
        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.address'}
        return Message(properties=ap, reply_to=self.reply_addr)


class MessageRouteTruncateTest(MessagingHandler):
    def __init__(self, sender_host, receiver_host, address):
        super(MessageRouteTruncateTest, self).__init__()
        self.sender_host      = sender_host
        self.receiver_host    = receiver_host
        self.address          = address

        self.sender_conn   = None
        self.receiver_conn = None
        self.error         = None
        self.sender1       = None
        self.sender2       = None
        self.sender3       = None
        self.receiver      = None
        self.streaming     = False
        self.delivery      = None
        self.data          = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
        self.long_data     = ""

        self.sent_stream   = 0
        self.program       = ['Send_Short_1', 'Send_Long_Truncated', 'Send_Short_2', 'Send_Short_3']
        self.result        = []
        self.expected_result = ['Send_Short_1', 'Aborted_Delivery', '2', '2', '2', '2', '2',
                                '2', '2', '2', '2', '2', 'Send_Short_2', '3', '3', '3', '3',
                                '3', '3', '3', '3', '3', '3', 'Send_Short_3']

    def timeout(self):
        self.error = "Timeout Expired - Unprocessed Ops: %r, Result: %r" % (self.program, self.result)
        self.sender_conn.close()
        self.receiver_conn.close()

    def on_start(self, event):
        self.timer         = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.sender_conn   = event.container.connect(self.sender_host)
        self.receiver_conn = event.container.connect(self.receiver_host)
        self.sender1       = event.container.create_sender(self.sender_conn, self.address, name="S1")
        self.sender2       = event.container.create_sender(self.sender_conn, self.address, name="S2")
        self.sender3       = event.container.create_sender(self.sender_conn, self.address, name="S3")
        self.receiver      = event.container.create_receiver(self.receiver_conn, self.address)

    def stream(self):
        self.sender1.stream(BINARY(self.long_data))
        self.sent_stream += len(self.long_data)
        if self.sent_stream >= 1000000:
            self.streaming = False
            self.sender1.close()
            self.send()

    def send(self):
        next_op = self.program.pop(0) if len(self.program) > 0 else None
        if next_op == 'Send_Short_1':
            m = Message(body="%s" % next_op)
            self.sender1.send(m)
        elif next_op == 'Send_Long_Truncated':
            for i in range(100):
                self.long_data += self.data
            self.delivery  = self.sender1.delivery(self.sender1.delivery_tag())
            self.streaming = True
            self.stream()
        elif next_op == 'Send_Short_2':
            m = Message(body="2")
            for i in range(10):
                self.sender2.send(m)
            m = Message(body="Send_Short_2")
            self.sender2.send(m)
            self.sender2.close()
        elif next_op == 'Send_Short_3':
            m = Message(body="3")
            for i in range(10):
                self.sender3.send(m)
            m = Message(body="%s" % next_op)
            self.sender3.send(m)
            self.sender_conn.close()

    def on_sendable(self, event):
        if event.sender == self.sender1 and self.program[0] == 'Send_Short_1':
            self.send()
        if self.streaming:
            self.stream()

    def on_message(self, event):
        m = event.message
        self.result.append(m.body)
        if m.body == 'Send_Short_1':
            self.send()
        elif m.body == 'Send_Short_2':
            self.send()
        elif m.body == 'Send_Short_3':
            if self.result != self.expected_result:
                self.error = "Expected: %r, Actual: %r" % (self.expected_result, self.result)
            self.receiver_conn.close()
            self.timer.cancel()

    def on_aborted(self, event):
        self.result.append('Aborted_Delivery')
        self.send()

    def run(self):
        Container(self).run()


class LinkRouteTruncateTest(MessagingHandler):
    def __init__(self, sender_host, receiver_host, address, query_host):
        super(LinkRouteTruncateTest, self).__init__()
        self.sender_host      = sender_host
        self.receiver_host    = receiver_host
        self.address          = address
        self.query_host       = query_host

        self.sender_conn   = None
        self.receiver_conn = None
        self.query_conn    = None
        self.error         = None
        self.sender1       = None
        self.receiver      = None
        self.poll_timer    = None
        self.streaming     = False
        self.delivery      = None
        self.data          = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
        self.long_data     = ""

        self.sent_stream   = 0
        self.program       = ['Send_Short_1', 'Send_Long_Truncated']
        self.result        = []
        self.expected_result = ['Send_Short_1', 'Aborted_Delivery']

    def timeout(self):
        self.error = "Timeout Expired - Unprocessed Ops: %r, Result: %r" % (self.program, self.result)
        self.sender_conn.close()
        self.receiver_conn.close()
        self.query_conn.close()
        if self.poll_timer:
            self.poll_timer.cancel()

    def on_start(self, event):
        self.timer          = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.sender_conn    = event.container.connect(self.sender_host)
        self.receiver_conn  = event.container.connect(self.receiver_host)
        self.query_conn     = event.container.connect(self.query_host)
        self.reply_receiver = event.container.create_receiver(self.query_conn, dynamic=True)
        self.agent_sender   = event.container.create_sender(self.query_conn, "$management")

    def setup_first_links(self, event):
        self.sender1 = event.container.create_sender(self.sender_conn, self.address, name="S1")

    def stream(self):
        self.sender1.stream(BINARY(self.long_data))
        self.sent_stream += len(self.long_data)
        if self.sent_stream >= 1000000:
            self.streaming = False
            self.sender1.close()

    def send(self):
        next_op = self.program.pop(0) if len(self.program) > 0 else None
        if next_op == 'Send_Short_1':
            m = Message(body="%s" % next_op)
            self.sender1.send(m)
        elif next_op == 'Send_Long_Truncated':
            for i in range(100):
                self.long_data += self.data
            self.delivery  = self.sender1.delivery(self.sender1.delivery_tag())
            self.streaming = True
            self.stream()

    def poll_timeout(self):
        self.poll()

    def poll(self):
        request = self.proxy.read_address('Clink')
        self.agent_sender.send(request)

    def on_sendable(self, event):
        if event.sender == self.sender1 and len(self.program) > 0 and self.program[0] == 'Send_Short_1':
            self.send()
        if event.sender == self.sender1 and self.streaming:
            self.stream()

    def on_link_opening(self, event):
        if event.receiver:
            self.receiver = event.receiver
            event.receiver.target.address = self.address
            event.receiver.open()

    def on_link_opened(self, event):
        if event.receiver == self.reply_receiver:
            self.proxy = RouterProxy(self.reply_receiver.remote_source.address)
            self.poll()

    def on_message(self, event):
        if event.receiver == self.reply_receiver:
            response = self.proxy.response(event.message)
            if response.status_code == 200 and (response.remoteCount + response.containerCount) > 0:
                if self.poll_timer:
                    self.poll_timer.cancel()
                    self.poll_timer = None
                self.setup_first_links(event)
            else:
                self.poll_timer = event.reactor.schedule(0.25, PollTimeout(self))
            return

        m = event.message
        self.result.append(m.body)
        if m.body == 'Send_Short_1':
            self.send()

    def on_aborted(self, event):
        self.result.append('Aborted_Delivery')
        if self.result != self.expected_result:
            self.error = "Expected: %r, Actual: %r" % (self.expected_result, self.result)
        self.sender_conn.close()
        self.receiver_conn.close()
        self.query_conn.close()
        self.timer.cancel()

    def run(self):
        container = Container(self)
        container.container_id="LRC"
        container.run()


class MessageRouteAbortTest(MessagingHandler):
    def __init__(self, sender_host, receiver_host, address):
        super(MessageRouteAbortTest, self).__init__()
        self.sender_host      = sender_host
        self.receiver_host    = receiver_host
        self.address          = address

        self.sender_conn   = None
        self.receiver_conn = None
        self.error         = None
        self.sender1       = None
        self.receiver      = None
        self.delivery      = None
        self.logger        = Logger(title="MessageRouteAbortTest")

        self.program       = [('D', 10), ('D', 20), ('A', 30), ('A', 40), ('D', 50), ('D', 60),
                              ('A', 100), ('D', 110),
                              ('A', 1000), ('A', 1010), ('A', 1020), ('A', 1030), ('A', 1040), ('D', 1050),
                              ('A', 10000), ('A', 10010), ('A', 10020), ('A', 10030), ('A', 10040), ('D', 10050),
                              ('A', 100000), ('A', 100010), ('A', 100020), ('A', 100030), ('A', 100040), ('D', 100050), ('F', 10)]
        self.result        = []
        self.expected_result = [10, 20, 50, 60, 110, 1050, 10050, 100050]

    def timeout(self):
        self.error = "Timeout Expired - Unprocessed Ops: %r, Result: %r" % (self.program, self.result)
        self.logger.log(self.error)
        self.sender_conn.close()
        self.receiver_conn.close()

    def on_start(self, event):
        self.logger.log("on_start")
        self.timer         = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.sender_conn   = event.container.connect(self.sender_host)
        self.receiver_conn = event.container.connect(self.receiver_host)
        self.sender1       = event.container.create_sender(self.sender_conn, self.address, name="S1")
        self.receiver      = event.container.create_receiver(self.receiver_conn, self.address)

    def send(self):
        if self.delivery:
            self.logger.log("send(): Do not send - delivery to be aborted is in flight")
            return

        op, size = self.program.pop(0) if len(self.program) > 0 else (None, None)
        self.logger.log("send - op=%s, size=%s" % (str(op), str(size)))

        if op == None:
            return

        body = ""
        if op == 'F':
            body = "FINISH"
        else:
            bod = str(size)
            bod2 = "0000000000" + bod
            bod3 = "." + bod2[-9:]
            body = bod3 * (size // 10)
        msg = Message(body=body)
        
        if op in 'DF':
            self.logger.log("send(): Send message size: %d" % (size))
            delivery = self.sender1.send(msg)

        if op == 'A':
            self.logger.log("send(): Start aborted message size: %d" % (size))
            self.delivery = self.sender1.delivery(self.sender1.delivery_tag())
            encoded = msg.encode()
            self.sender1.stream(encoded)

    def finish(self):
        if self.result != self.expected_result:
            self.error = "Expected: %r, Actual: %r" % (self.expected_result, self.result)
            self.logger.log(self.error)
        self.sender_conn.close()
        self.receiver_conn.close()
        self.timer.cancel()
        
    def on_sendable(self, event):
        self.logger.log("on_sendable")
        if event.sender == self.sender1:
            if self.delivery:
                self.delivery.abort()
                self.delivery = None
                self.logger.log("on_sendable aborts delivery")
            else:
                self.send()

    def on_message(self, event):
        m = event.message
        if m.body == "FINISH":
            self.finish()
        else:
            self.logger.log("on_message receives len: %d" %(len(m.body)))
            self.result.append(len(m.body))
            self.send()

    def run(self):
        Container(self).run()


class MulticastTruncateTest(MessagingHandler):
    def __init__(self, sender_host, receiver_host1, receiver_host2, address):
        super(MulticastTruncateTest, self).__init__()
        self.sender_host      = sender_host
        self.receiver_host1   = receiver_host1
        self.receiver_host2   = receiver_host2
        self.address          = address
        self.r_attach_count   = 0
        self.senders_created  = False

        self.sender_conn    = None
        self.receiver1_conn = None
        self.receiver2_conn = None
        self.error          = None
        self.sender1        = None
        self.sender2        = None
        self.sender3        = None
        self.receiver1      = None
        self.receiver2      = None
        self.streaming      = False
        self.delivery       = None
        self.data           = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
        self.long_data      = ""

        self.completions     = 0
        self.sent_stream     = 0
        self.program         = ['Send_Short_1', 'Send_Long_Truncated', 'Send_Short_2', 'Send_Short_3']
        self.result1         = []
        self.result2         = []
        self.expected_result = ['Send_Short_1', 'Aborted_Delivery', '2', '2', '2', '2', '2',
                                '2', '2', '2', '2', '2', 'Send_Short_2', '3', '3', '3', '3',
                                '3', '3', '3', '3', '3', '3', 'Send_Short_3']

    def timeout(self):
        self.error = "Timeout Expired - Unprocessed Ops: %r, Result1: %r, Result2: %r" % (self.program, self.result1, self.result2)
        self.sender_conn.close()
        self.receiver1_conn.close()
        self.receiver2_conn.close()

    def on_start(self, event):
        self.timer          = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.sender_conn    = event.container.connect(self.sender_host)
        self.receiver1_conn = event.container.connect(self.receiver_host1)
        self.receiver2_conn = event.container.connect(self.receiver_host2)
        self.receiver1      = event.container.create_receiver(self.receiver1_conn, self.address)
        self.receiver2      = event.container.create_receiver(self.receiver2_conn, self.address)

    def stream(self):
        self.sender1.stream(BINARY(self.long_data))
        self.sent_stream += len(self.long_data)
        if self.sent_stream >= 1000000:
            self.streaming = False
            self.sender1.close()
            self.send()

    def send(self):
        if self.streaming:
            self.stream()
            return
        next_op = self.program.pop(0) if len(self.program) > 0 else None
        if next_op == 'Send_Short_1':
            m = Message(body="%s" % next_op)
            self.sender1.send(m)

        elif next_op == 'Send_Long_Truncated':
            for i in range(100):
                self.long_data += self.data
            self.delivery  = self.sender1.delivery(self.sender1.delivery_tag())
            self.streaming = True
            self.stream()
        elif next_op == 'Send_Short_2':
            m = Message(body="2")
            for i in range(10):
                self.sender2.send(m)
            m = Message(body="Send_Short_2")
            self.sender2.send(m)
            self.sender2.close()
        elif next_op == 'Send_Short_3':
            m = Message(body="3")
            for i in range(10):
                self.sender3.send(m)
            m = Message(body="%s" % next_op)
            self.sender3.send(m)
            self.sender_conn.close()

    def on_sendable(self, event):
        self.send()

    def on_link_opened(self, event):
        if event.receiver == self.receiver1:
            self.r_attach_count += 1

        if event.receiver == self.receiver2:
            self.r_attach_count += 1

        if self.r_attach_count == 2 and not self.senders_created:
            self.senders_created = True
            self.sender1 = event.container.create_sender(self.sender_conn,
                                                         self.address,
                                                         name="S1")
            self.sender2 = event.container.create_sender(self.sender_conn,
                                                         self.address,
                                                         name="S2")
            self.sender3 = event.container.create_sender(self.sender_conn,
                                                         self.address,
                                                         name="S3")
    def on_message(self, event):
        m = event.message
        if event.receiver == self.receiver1:
            self.result1.append(m.body)
        elif event.receiver == self.receiver2:
            self.result2.append(m.body)
        if m.body == 'Send_Short_1':
            self.send()
        elif m.body == 'Send_Short_2':
            self.send()
        elif m.body == 'Send_Short_3':
            self.completions += 1
            if self.completions == 2:
                if self.result1 != self.expected_result or self.result2 != self.expected_result:
                    self.error = "Expected: %r, Actuals: %r, %r" % (self.expected_result, self.result1, self.result2)
                self.receiver1_conn.close()
                self.receiver2_conn.close()
                self.timer.cancel()

    def on_aborted(self, event):
        if event.receiver == self.receiver1:
            self.result1.append('Aborted_Delivery')
        elif event.receiver == self.receiver2:
            self.result2.append('Aborted_Delivery')
        self.send()

    def run(self):
        Container(self).run()


if __name__ == '__main__':
    unittest.main(main_module())
