#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License
#

# This is a test script to run the examples and verify that they behave as expected.

import unittest
import os, sys, socket, time
from  random import randrange
from subprocess import Popen, PIPE, STDOUT
import platform

def cmdline(*args):
    """Adjust executable name args[0] for windows and/or valgrind"""
    args = list(args)
    if platform.system() == "Windows":
        args[0] += ".exe"
    if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
        args = [os.environ["VALGRIND"], "-q"] + args
    return args

def background(*args):
    """Run executable in the backround, return the popen"""
    p = Popen(cmdline(*args), stdout=PIPE, stderr=sys.stderr)
    p.args = args               # Save arguments for debugging output
    return p

def verify(p):
    """Wait for executable to exit and verify status."""
    try:
        out, err = p.communicate()
    except Exception as e:
        raise Exception("Error running %s: %s", p.args, e)
    if p.returncode:
        raise Exception("""%s exit code %s
vvvvvvvvvvvvvvvv
%s
^^^^^^^^^^^^^^^^
""" % (p.args, p.returncode, out))
    if platform.system() == "Windows":
        # Just \n please
        if out:
            out = out.translate(None, '\r')
    return out

def execute(*args):
    return verify(background(*args))

NULL = open(os.devnull, 'w')

def wait_addr(addr, timeout=10):
    """Wait up to timeout for something to listen on port"""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            c = socket.create_connection(addr.split(":"), deadline - time.time())
            c.close()
            return
        except socket.error as e:
            time.sleep(0.01)
    raise Exception("Timed out waiting for %s", addr)

def pick_addr():
    """Pick a new host:port address."""
    # TODO aconway 2015-07-14: need a safer way to pick ports.
    p =  randrange(10000, 20000)
    return "127.0.0.1:%s" % p

class Broker(object):
    """Run the test broker"""

    @classmethod
    def get(cls):
        if not hasattr(cls, "_broker"):
            cls._broker = Broker()
        return cls._broker

    @classmethod
    def stop(cls):
        if cls.get() and cls._broker.process:
            cls._broker.process.kill()
            cls._broker = None

    def __init__(self):
        self.addr = pick_addr()
        cmd = cmdline("broker", "-a", self.addr)
        try:
            self.process = Popen(cmd, stdout=NULL, stderr=sys.stderr)
            wait_addr(self.addr)
            self.addr += "/examples"
        except Exception as e:
            raise Exception("Error running %s: %s", cmd, e)

class ExampleTest(unittest.TestCase):
    """Run the examples, verify they behave as expected."""

    @classmethod
    def tearDownClass(self):
        Broker.stop()

    def test_helloworld(self):
        b = Broker.get()
        hw = execute("helloworld", b.addr)
        self.assertEqual('"Hello World!"\n', hw)

    def test_helloworld_blocking(self):
        b = Broker.get()
        hw = execute("helloworld_blocking", b.addr, b.addr)
        self.assertEqual('"Hello World!"\n', hw)

    def test_helloworld_direct(self):
        addr = pick_addr()
        hw = execute("helloworld_direct", addr)
        self.assertEqual('"Hello World!"\n', hw)

    def test_simple_send_recv(self):
        b = Broker.get()
        send = execute("simple_send", "-a", b.addr)
        self.assertEqual("all messages confirmed\n", send)
        recv = execute("simple_recv", "-a", b.addr)
        recv_expect = "simple_recv listening on amqp://%s\n" % (b.addr)
        recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)])
        self.assertEqual(recv_expect, recv)

    def test_simple_send_direct_recv(self):
        addr = pick_addr()
        recv = background("direct_recv", "-a", addr)
        wait_addr(addr)
        self.assertEqual("all messages confirmed\n", execute("simple_send", "-a", addr))
        recv_expect = "direct_recv listening on amqp://%s\n" % (addr)
        recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)])
        self.assertEqual(recv_expect, verify(recv))

    def test_simple_recv_direct_send(self):
        addr = pick_addr()
        send = background("direct_send", "-a", addr)
        wait_addr(addr)
        recv_expect = "simple_recv listening on amqp://%s\n" % (addr)
        recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)])
        self.assertEqual(recv_expect, execute("simple_recv", "-a", addr))
        send_expect = "direct_send listening on amqp://%s\nall messages confirmed\n" % (addr)
        self.assertEqual(send_expect, verify(send))

    CLIENT_EXPECT=""""Twas brillig, and the slithy toves" => "TWAS BRILLIG, AND THE SLITHY TOVES"
"Did gire and gymble in the wabe." => "DID GIRE AND GYMBLE IN THE WABE."
"All mimsy were the borogroves," => "ALL MIMSY WERE THE BOROGROVES,"
"And the mome raths outgrabe." => "AND THE MOME RATHS OUTGRABE."
"""
    def test_simple_recv_send(self):
        # Start receiver first, then run sender"""
        b = Broker.get()
        recv = background("simple_recv", "-a", b.addr)
        self.assertEqual("all messages confirmed\n", execute("simple_send", "-a", b.addr))
        recv_expect = "simple_recv listening on amqp://%s\n" % (b.addr)
        recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)])
        self.assertEqual(recv_expect, verify(recv))

    def test_request_response(self):
        b = Broker.get()
        server = background("server", "-a", b.addr)
        try:
            self.assertEqual(execute("client", "-a", b.addr), self.CLIENT_EXPECT)
        finally:
            server.kill()

    def test_request_response(self):
        b = Broker.get()
        server = background("server", "-a", b.addr)
        try:
            self.assertEqual(execute("sync_client", "-a", b.addr), self.CLIENT_EXPECT)
        finally:
            server.kill()

    def test_request_response_direct(self):
        addr = pick_addr()
        server = background("server_direct", "-a", addr+"/examples")
        wait_addr(addr)
        try:
            self.assertEqual(execute("client", "-a", addr+"/examples"), self.CLIENT_EXPECT)
        finally:
            server.kill()

    def test_encode_decode(self):
        expect="""
== Simple values: int, string, bool
Values: int(42), string("foo"), bool(true)
Extracted: 42, foo, 1
Encoded as AMQP in 8 bytes
Decoded: 42, foo, 1

== Specific AMQP types: byte, long, symbol
Values: byte(120), long(123456789123456789), symbol(:bar)
Extracted (with conversion) 120, 123456789123456789, bar
Extracted (exact) x, 123456789123456789, bar

== Array, list and map.
Values: array<int>[int(1), int(2), int(3)], list[int(4), int(5)], map{string("one"):int(1), string("two"):int(2)}
Extracted: [ 1 2 3 ], [ 4 5 ], { one:1 two:2 }

== List and map of mixed type values.
Values: list[int(42), string("foo")], map{int(4):string("four"), string("five"):int(5)}
Extracted: [ 42 "foo" ], { 4:"four" "five":5 }

== Insert with stream operators.
Values: array<int>[int(1), int(2), int(3)]
Values: list[int(42), bool(false), symbol(:x)]
Values: map{string("k1"):int(42), symbol(:"k2"):bool(false)}
"""
        self.maxDiff = None
        self.assertEqual(expect, execute("encode_decode"))

    def test_recurring_timer(self):
        expect="""Tick...
Tick...
Tock...
Tick...
Tock...
"""
        self.maxDiff = None
        self.assertEqual(expect, execute("recurring_timer", "-t", "3"))

if __name__ == "__main__":
    unittest.main()
