#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

import unittest, os, json
from subprocess      import PIPE, STDOUT
from proton          import Message, Timeout
from system_test     import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
from proton.handlers import MessagingHandler
from proton.reactor  import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector
from proton.utils    import BlockingConnection
from qpid_dispatch.management.client import Node

import time
import datetime

#====================================================
# Helper classes for all tests.
#====================================================


# Named timers allow test code to distinguish between several
# simultaneous timers, going off at different rates.
class Timeout ( object ):
    def __init__(self, parent, name):
        self.parent = parent
        self.name   = name

    def on_timer_task(self, event):
        self.parent.timeout ( self.name )



#================================================================
#     Setup
#================================================================

class TopologyAdditionTests ( TestCase ):

    @classmethod
    def setUpClass(cls):
        super(TopologyAdditionTests, cls).setUpClass()

        def router ( name, more_config ):

            config = [ ('router',  {'mode': 'interior', 'id': name}),
                       ('address', {'prefix': 'closest',   'distribution': 'closest'}),
                       ('address', {'prefix': 'balanced',  'distribution': 'balanced'}),
                       ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
                     ]      \
                     + more_config

            config = Qdrouterd.Config(config)

            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))

        cls.routers = []

        client_ports = dict()
        client_ports['A'] = cls.tester.get_port()
        client_ports['B'] = cls.tester.get_port()
        client_ports['C'] = cls.tester.get_port()
        client_ports['D'] = cls.tester.get_port()

        cls.inter_router_ports = dict()
        cls.inter_router_ports['A'] = cls.tester.get_port()
        cls.inter_router_ports['B'] = cls.tester.get_port()

        initial_cost = 10
        lower_cost   =  8
        higher_cost  = 12

        # Only routers A and B are set up initially by this class.
        # Routers C and D are started by the test itself.
        router_A_config = [
                            ( 'listener',
                              { 'port': client_ports['A'],
                                'role': 'normal',
                                'stripAnnotations': 'no'
                              }
                            ),
                            ( 'listener',
                              {  'role': 'inter-router',
                                 'port': cls.inter_router_ports['A']
                              }
                            )
                         ]

        router_B_config = [
                            ( 'listener',
                              { 'port': client_ports['B'],
                                'role': 'normal',
                                'stripAnnotations': 'no'
                              }
                            ),
                            ( 'listener',
                              { 'role': 'inter-router',
                                'port': cls.inter_router_ports['B'],
                                'stripAnnotations': 'no'
                              }
                            ),
                            ( 'connector',
                              {  'name': 'AB_connector',
                                 'role': 'inter-router',
                                 'port': cls.inter_router_ports['A'],
                                 'verifyHostname': 'no',
                                 'cost':  initial_cost,
                                 'stripAnnotations': 'no'
                              }
                            )
                         ]

        router ( 'A', router_A_config )
        router ( 'B', router_B_config )

        router_A = cls.routers[0]
        router_B = cls.routers[1]

        router_A.wait_router_connected('B')

        cls.A_addr = router_A.addresses[0]
        cls.B_addr = router_B.addresses[0]



        # The two connections that this router will make, AC and BC,
        # will be lower cost than the direct AB route that the network
        # already has.
        cls.router_C_config = [
                                ( 'listener',
                                  { 'port': client_ports['C'],
                                    'role': 'normal',
                                    'stripAnnotations': 'no'
                                  }
                                ),
                                ( 'connector',
                                  {  'name': 'AC_connector',
                                     'role': 'inter-router',
                                     'port': cls.inter_router_ports['A'],
                                     'verifyHostname': 'no',
                                     'cost':  int(lower_cost / 2),
                                     'stripAnnotations': 'no',
                                     'linkCapacity' : 1000
                                  }
                                ),
                                ( 'connector',
                                  {  'name': 'BC_connector',
                                     'role': 'inter-router',
                                     'port': cls.inter_router_ports['B'],
                                     'verifyHostname': 'no',
                                     'cost':  int(lower_cost / 2),
                                     'stripAnnotations': 'no',
                                     'linkCapacity' : 1000
                                  }
                                )
                              ]

        # The two connections that this router will make, AD and BD,
        # will be higher cost than the other paths the networks already has
        # available to get from A to B.
        cls.router_D_config = [
                                ( 'listener',
                                  { 'port': client_ports['D'],
                                    'role': 'normal',
                                    'stripAnnotations': 'no'
                                  }
                                ),
                                ( 'connector',
                                  {  'name': 'AD_connector',
                                     'role': 'inter-router',
                                     'port': cls.inter_router_ports['A'],
                                     'verifyHostname': 'no',
                                     'cost':  int(higher_cost / 2),
                                     'stripAnnotations': 'no',
                                     'linkCapacity' : 1000
                                  }
                                ),
                                ( 'connector',
                                  {  'name': 'BD_connector',
                                     'role': 'inter-router',
                                     'port': cls.inter_router_ports['B'],
                                     'verifyHostname': 'no',
                                     'cost':  int(higher_cost / 2),
                                     'stripAnnotations': 'no',
                                     'linkCapacity' : 1000
                                  }
                                )
                              ]



    # This method allows test code to add new routers during the test,
    # rather than only at startup like A and B above.
    def addRouter ( self, name, more_config ) :
        config = [ ('router',  {'mode': 'interior', 'id': name}),
                   ('address', {'prefix': 'closest',   'distribution': 'closest'}),
                   ('address', {'prefix': 'balanced',  'distribution': 'balanced'}),
                   ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
                 ]    \
                 + more_config

        config = Qdrouterd.Config(config)

        TopologyAdditionTests.routers.append(TopologyAdditionTests.tester.qdrouterd(name, config, wait=True))


    def test_01_new_route_low_cost ( self ):
        # During the test, test code will add a new router C,
        # connecting A and B with new low-cost links. At that
        # point the test's messages should switch from using
        # route AB to using route ACB.
        # By passing both of these routes to the test, I tell
        # it to expect both of them to be used.
        # If it terminates with the second path remaining unused
        # it will fail.
        #
        # Since this test alters the path that the messages follow,
        # it is OK for some messages to be released rather than
        # delivered. It doesn't always happen - depends on timing.
        initial_expected_trace = [ '0/A', '0/B' ]
        final_expected_trace   = [ '0/A', '0/C', '0/B' ]
        released_ok = True

        test = AddRouter ( self.A_addr,
                           self.B_addr,
                           "closest/01",
                           self,
                           'C',
                           self.router_C_config,
                           [ initial_expected_trace, final_expected_trace ],
                           released_ok
                         )
        test.run()
        self.assertEqual ( None, test.error )



    def test_02_new_route_high_cost ( self ):
        # During the test, test code will add a new router D,
        # connecting A and B with new links. But the links are
        # higher cost than what already exist. The network should
        # ignore them and keep using the lowest cost route that it
        # already has.
        # We communicate this expectation to the test by sending
        # it a single expected trace. The test will fail with
        # error if any other traces are encountered.
        #
        # Since this test does not alter the path that the messages
        # follow, it is *not* OK for any messages to be released
        # rather than delivered.
        only_expected_trace   = [ '0/A', '0/C', '0/B' ]
        released_ok = False

        test = AddRouter ( self.A_addr,
                           self.B_addr,
                           "closest/02",
                           self,
                           'D',
                           self.router_D_config,
                           [ only_expected_trace ],
                           released_ok
                         )
        test.run()
        self.assertEqual ( None, test.error )




#================================================================
#     Tests
#================================================================


#--------------------------------------------------------------
#
# First test
# ------------------
#
# Send some messages through the original A---B router network,
# Then change it to look like this:
#
#                    C
#                   / \
#                  /   \
#                 /     \
#                /       \
#               A -------- B
#
# But the caller controls what costs are assigned to the two
# new links, so only the caller knows whether messages should
# start to flow through the new route ACB or not. It passes
# that knowledge in to us as a list of expected paths.
# This test's job is to make sure that all the expected paths
# get used by messages, and no others get used.
#
#
# Second test
# ------------------
#
# The triangular network from the first test still exists, and
# we will add to it a new router D which also connects A and B.
#
#                    C
#                   / \
#                  /   \
#                 /     \
#                /       \
#               A -------- B
#                \       /
#                 \     /
#                  \   /
#                   \ /
#                    D
# As in the first test, the caller tells us what routes ought
# to be followed, by putting them in the 'expected_traces' arg.
#
#--------------------------------------------------------------

class AddRouter ( MessagingHandler ):
    def __init__ ( self,
                   send_addr,
                   recv_addr,
                   destination,
                   parent,
                   new_router_name,
                   new_router_config,
                   expected_traces,
                   released_ok
                 ):
        super(AddRouter, self).__init__(prefetch=100)
        self.send_addr         = send_addr
        self.recv_addr         = recv_addr
        self.dest              = destination
        self.parent            = parent
        self.new_router_name   = new_router_name
        self.new_router_config = new_router_config
        self.released_ok       = released_ok

        self.error         = None
        self.sender        = None
        self.receiver      = None

        self.n_messages    = 30
        self.n_sent        = 0
        self.n_received    = 0
        self.n_released    = 0
        self.n_accepted    = 0

        self.test_timer    = None
        self.send_timer    = None
        self.timeout_count = 0
        self.reactor       = None
        self.container     = None
        self.finishing     = False

        # The parent sends us a list of the traces we
        # ought to see on messages.
        # Make a little data structure that
        # will keep track of how many times each trace was seen.
        self.expected_trace_counts = list()
        for i in range ( len(expected_traces )) :
            self.expected_trace_counts.append ( [ expected_traces[i], 0 ] )


    def run ( self ) :
        Container(self).run()


    # Close everything and allow the test to terminate.
    def bail ( self, reason_for_bailing ) :
        self.finishing = True
        self.error = reason_for_bailing
        self.receiver.close()
        self.send_conn.close()
        self.recv_conn.close()
        self.test_timer.cancel()
        self.send_timer.cancel()


    # There are two timers. The 'test' timer should only expire if
    # something has gone wrong, in which case it terminates the test.
    # The 'send' timer expires frequently, and every time it goes off
    # we send out a little batch of messages.
    def timeout ( self, name ):

        if self.finishing :
            return

        self.timeout_count += 1
        if name == "test" :
            self.bail ( "Timeout Expired: %d messages received, %d expected." % (self.n_received, self.n_messages) )
        elif name == "send" :
            self.send()
            self.send_timer = self.reactor.schedule(1, Timeout(self, "send"))

            # At T+5, create the new router with link costs as
            # specified by parent. We do it partway into the test
            # so that some messages will flow through the original
            # network, and some will flow through the network with
            # the new router added.
            if self.timeout_count == 5 :
                self.parent.addRouter ( self.new_router_name, self.new_router_config )


    def on_start(self, event):
        self.reactor   = event.reactor
        self.container = event.container

        self.test_timer  = self.reactor.schedule(30, Timeout(self, "test"))
        self.send_timer  = self.reactor.schedule(1, Timeout(self, "send"))

        self.send_conn   = event.container.connect(self.send_addr)
        self.recv_conn   = event.container.connect(self.recv_addr)

        self.sender      = event.container.create_sender(self.send_conn, self.dest)
        self.receiver    = event.container.create_receiver(self.recv_conn, self.dest)
        self.receiver.flow ( self.n_messages )


    #------------------------------------------------------------
    # Sender Side
    #------------------------------------------------------------

    def send(self):

        if self.n_sent >= self.n_messages :
            return

        # Send little bursts of 3 messages every sender-timeout.
        for _ in range(3) :
            msg = Message(body=self.n_sent)
            self.sender.send(msg)
            self.n_sent += 1
            if self.n_sent == self.n_messages :
                return


    # The caller of this tests decides whether it is OK or
    # not OK to have some messages released during the test.
    def on_released ( self, event ) :
        if self.released_ok :
            self.n_released += 1
            self.check_count ( )
        else :
            self.bail ( "a message was released." )


    def on_accepted ( self, event ) :
        self.n_accepted += 1
        self.check_count ( )


    #
    # Do the released plus the accepted messages add up to the number
    # that were sent? If so, bail out with success.
    # Do NOT end the test if the number is still shy of the expected
    # total. The callers of this method just call it every time they
    # get something -- it will be called many times poer test.
    #
    # Pleae Note:
    #   This check is on the 'sender' side of this test, rather than the
    #   'receiver' side, because it is to the sender that we make a
    #   guarantee: namely, that the sender should know the disposition of
    #   all sent messages -- whether they have been accepted by the receiver,
    #   or released by the router network.
    #
    def check_count ( self ) :
        if self.n_accepted + self.n_released == self.n_messages :
            self.finishing = True
            self.finish_test ( )



    #------------------------------------------------------------
    # Receiver Side
    #------------------------------------------------------------

    def on_message(self, event):
        if self.finishing :
            return
        self.n_received += 1
        trace = event.message.annotations [ 'x-opt-qd.trace' ]
        # Introduce flaws for debugging.
        # if self.n_received == 13 :
        #     trace = [ '0/B', '0/A', '0/D' ]
        # if self.n_received == 13 :
        #     self.n_received -= 1
        self.record_trace ( trace )
        self.check_count ( )


    # Compare the trace that came from a message to the list of
    # traces the caller told us to expect. If it is one of the
    # expected traces, count it. Otherwise, fail the test.
    def record_trace ( self, observed_trace ):
        for trace_record in self.expected_trace_counts :
            trace = trace_record [ 0 ]
            if observed_trace == trace :
                trace_record [ 1 ] += 1
                return
        # If we get here, the trace is one we were not expecting. That's bad.
        self.bail ( "Unexpected trace: %s" % observed_trace )


    # Shut down everything and make sure that all of the extected traces
    # have been seen.
    def finish_test ( self ) :
        self.test_timer.cancel()
        self.send_timer.cancel()
        for trace_record in self.expected_trace_counts :
            count = trace_record[1]
            # Deliberate flaw for debugging.
            # count = 0
            if count <= 0 :
                self.bail ( "Trace %s was not seen." % trace_record[0] )
                return

        # success
        self.bail ( None )



if __name__ == '__main__':
    unittest.main(main_module())
