| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License |
| # |
| |
| # This is a test script to run the examples and verify that they behave as expected. |
| |
| import unittest |
| import os, sys, socket, time |
| from random import randrange |
| from subprocess import Popen, PIPE, STDOUT |
| import platform |
| |
| def cmdline(*args): |
| """Adjust executable name args[0] for windows and/or valgrind""" |
| args = list(args) |
| if platform.system() == "Windows": |
| args[0] += ".exe" |
| if "VALGRIND" in os.environ and os.environ["VALGRIND"]: |
| args = [os.environ["VALGRIND"], "-q"] + args |
| return args |
| |
| def background(*args): |
| """Run executable in the backround, return the popen""" |
| p = Popen(cmdline(*args), stdout=PIPE, stderr=sys.stderr) |
| p.args = args # Save arguments for debugging output |
| return p |
| |
| def verify(p): |
| """Wait for executable to exit and verify status.""" |
| try: |
| out, err = p.communicate() |
| except Exception as e: |
| raise Exception("Error running %s: %s", p.args, e) |
| if p.returncode: |
| raise Exception("""%s exit code %s |
| vvvvvvvvvvvvvvvv |
| %s |
| ^^^^^^^^^^^^^^^^ |
| """ % (p.args, p.returncode, out)) |
| if platform.system() == "Windows": |
| # Just \n please |
| if out: |
| out = out.translate(None, '\r') |
| return out |
| |
| def execute(*args): |
| return verify(background(*args)) |
| |
| NULL = open(os.devnull, 'w') |
| |
| def wait_addr(addr, timeout=10): |
| """Wait up to timeout for something to listen on port""" |
| deadline = time.time() + timeout |
| while time.time() < deadline: |
| try: |
| c = socket.create_connection(addr.split(":"), deadline - time.time()) |
| c.close() |
| return |
| except socket.error as e: |
| time.sleep(0.01) |
| raise Exception("Timed out waiting for %s", addr) |
| |
| def pick_addr(): |
| """Pick a new host:port address.""" |
| # TODO aconway 2015-07-14: need a safer way to pick ports. |
| p = randrange(10000, 20000) |
| return "127.0.0.1:%s" % p |
| |
| class Broker(object): |
| """Run the test broker""" |
| |
| @classmethod |
| def get(cls): |
| if not hasattr(cls, "_broker"): |
| cls._broker = Broker() |
| return cls._broker |
| |
| @classmethod |
| def stop(cls): |
| if cls.get() and cls._broker.process: |
| cls._broker.process.kill() |
| cls._broker = None |
| |
| def __init__(self): |
| self.addr = pick_addr() |
| cmd = cmdline("broker", "-a", self.addr) |
| try: |
| self.process = Popen(cmd, stdout=NULL, stderr=sys.stderr) |
| wait_addr(self.addr) |
| self.addr += "/examples" |
| except Exception as e: |
| raise Exception("Error running %s: %s", cmd, e) |
| |
| class ExampleTest(unittest.TestCase): |
| """Run the examples, verify they behave as expected.""" |
| |
| @classmethod |
| def tearDownClass(self): |
| Broker.stop() |
| |
| def test_helloworld(self): |
| b = Broker.get() |
| hw = execute("helloworld", b.addr) |
| self.assertEqual('"Hello World!"\n', hw) |
| |
| def test_helloworld_blocking(self): |
| b = Broker.get() |
| hw = execute("helloworld_blocking", b.addr, b.addr) |
| self.assertEqual('"Hello World!"\n', hw) |
| |
| def test_helloworld_direct(self): |
| addr = pick_addr() |
| hw = execute("helloworld_direct", addr) |
| self.assertEqual('"Hello World!"\n', hw) |
| |
| def test_simple_send_recv(self): |
| b = Broker.get() |
| send = execute("simple_send", "-a", b.addr) |
| self.assertEqual("all messages confirmed\n", send) |
| recv = execute("simple_recv", "-a", b.addr) |
| recv_expect = "simple_recv listening on amqp://%s\n" % (b.addr) |
| recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)]) |
| self.assertEqual(recv_expect, recv) |
| |
| def test_simple_send_direct_recv(self): |
| addr = pick_addr() |
| recv = background("direct_recv", "-a", addr) |
| wait_addr(addr) |
| self.assertEqual("all messages confirmed\n", execute("simple_send", "-a", addr)) |
| recv_expect = "direct_recv listening on amqp://%s\n" % (addr) |
| recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)]) |
| self.assertEqual(recv_expect, verify(recv)) |
| |
| def test_simple_recv_direct_send(self): |
| addr = pick_addr() |
| send = background("direct_send", "-a", addr) |
| wait_addr(addr) |
| recv_expect = "simple_recv listening on amqp://%s\n" % (addr) |
| recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)]) |
| self.assertEqual(recv_expect, execute("simple_recv", "-a", addr)) |
| send_expect = "direct_send listening on amqp://%s\nall messages confirmed\n" % (addr) |
| self.assertEqual(send_expect, verify(send)) |
| |
| CLIENT_EXPECT=""""Twas brillig, and the slithy toves" => "TWAS BRILLIG, AND THE SLITHY TOVES" |
| "Did gire and gymble in the wabe." => "DID GIRE AND GYMBLE IN THE WABE." |
| "All mimsy were the borogroves," => "ALL MIMSY WERE THE BOROGROVES," |
| "And the mome raths outgrabe." => "AND THE MOME RATHS OUTGRABE." |
| """ |
| def test_simple_recv_send(self): |
| # Start receiver first, then run sender""" |
| b = Broker.get() |
| recv = background("simple_recv", "-a", b.addr) |
| self.assertEqual("all messages confirmed\n", execute("simple_send", "-a", b.addr)) |
| recv_expect = "simple_recv listening on amqp://%s\n" % (b.addr) |
| recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)]) |
| self.assertEqual(recv_expect, verify(recv)) |
| |
| def test_request_response(self): |
| b = Broker.get() |
| server = background("server", "-a", b.addr) |
| try: |
| self.assertEqual(execute("client", "-a", b.addr), self.CLIENT_EXPECT) |
| finally: |
| server.kill() |
| |
| def test_request_response(self): |
| b = Broker.get() |
| server = background("server", "-a", b.addr) |
| try: |
| self.assertEqual(execute("sync_client", "-a", b.addr), self.CLIENT_EXPECT) |
| finally: |
| server.kill() |
| |
| def test_request_response_direct(self): |
| addr = pick_addr() |
| server = background("server_direct", "-a", addr+"/examples") |
| wait_addr(addr) |
| try: |
| self.assertEqual(execute("client", "-a", addr+"/examples"), self.CLIENT_EXPECT) |
| finally: |
| server.kill() |
| |
| def test_encode_decode(self): |
| expect=""" |
| == Simple values: int, string, bool |
| Values: int(42), string("foo"), bool(true) |
| Extracted: 42, foo, 1 |
| Encoded as AMQP in 8 bytes |
| Decoded: 42, foo, 1 |
| |
| == Specific AMQP types: byte, long, symbol |
| Values: byte(120), long(123456789123456789), symbol(:bar) |
| Extracted (with conversion) 120, 123456789123456789, bar |
| Extracted (exact) x, 123456789123456789, bar |
| |
| == Array, list and map. |
| Values: array<int>[int(1), int(2), int(3)], list[int(4), int(5)], map{string("one"):int(1), string("two"):int(2)} |
| Extracted: [ 1 2 3 ], [ 4 5 ], { one:1 two:2 } |
| |
| == List and map of mixed type values. |
| Values: list[int(42), string("foo")], map{int(4):string("four"), string("five"):int(5)} |
| Extracted: [ 42 "foo" ], { 4:"four" "five":5 } |
| |
| == Insert with stream operators. |
| Values: array<int>[int(1), int(2), int(3)] |
| Values: list[int(42), bool(false), symbol(:x)] |
| Values: map{string("k1"):int(42), symbol(:"k2"):bool(false)} |
| """ |
| self.maxDiff = None |
| self.assertEqual(expect, execute("encode_decode")) |
| |
| def test_recurring_timer(self): |
| expect="""Tick... |
| Tick... |
| Tock... |
| Tick... |
| Tock... |
| """ |
| self.maxDiff = None |
| self.assertEqual(expect, execute("recurring_timer", "-t", "3")) |
| |
| if __name__ == "__main__": |
| unittest.main() |