| #!/usr/bin/python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License |
| # |
| |
| # Run the C++ examples and verify that they behave as expected. |
| # Example executables must be in PATH |
| |
| import sys, shutil, os, errno |
| from test_unittest import unittest |
| from test_subprocess import Popen, TestProcessError, check_output, in_path |
| import test_subprocess |
| from os.path import dirname |
| from string import Template |
| |
| class Server(test_subprocess.Server): |
| @property |
| def addr(self): |
| return ":%s/example" % self.port |
| |
| def _cyrusSetup(conf_dir): |
| """Write out simple SASL config.tests |
| """ |
| saslpasswd = os.getenv('SASLPASSWD') |
| if saslpasswd: |
| t = Template("""sasldb_path: ${db} |
| mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS |
| """) |
| abs_conf_dir = os.path.abspath(conf_dir) |
| shutil.rmtree(abs_conf_dir, True) |
| os.mkdir(abs_conf_dir) |
| db = os.path.join(abs_conf_dir,'proton.sasldb') |
| conf = os.path.join(abs_conf_dir,'proton-server.conf') |
| with open(conf, 'w') as f: |
| f.write(t.substitute(db=db)) |
| cmd_template = Template("echo password | ${saslpasswd} -c -p -f ${db} -u proton user") |
| cmd = cmd_template.substitute(db=db, saslpasswd=saslpasswd) |
| check_output(cmd, shell=True) |
| os.environ['PN_SASL_CONFIG_PATH'] = abs_conf_dir |
| |
| # Globally initialize Cyrus SASL configuration |
| _cyrusSetup('sasl-conf') |
| |
| class Broker(Server): |
| def __init__(self): |
| super(Broker, self).__init__(["broker", "-a", "//:0"], kill_me=True) |
| |
| CLIENT_EXPECT="""Twas brillig, and the slithy toves => TWAS BRILLIG, AND THE SLITHY TOVES |
| Did gire and gymble in the wabe. => DID GIRE AND GYMBLE IN THE WABE. |
| All mimsy were the borogroves, => ALL MIMSY WERE THE BOROGROVES, |
| And the mome raths outgrabe. => AND THE MOME RATHS OUTGRABE. |
| """ |
| |
| def recv_expect(): |
| return "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)]) |
| |
| class ContainerExampleTest(unittest.TestCase): |
| """Run the container examples, verify they behave as expected.""" |
| |
| def test_helloworld(self): |
| self.assertMultiLineEqual('Hello World!\n', check_output(["helloworld", Broker.addr])) |
| |
| def test_simple_send_recv(self): |
| self.assertMultiLineEqual("all messages confirmed\n", check_output(["simple_send", "-a", Broker.addr])) |
| self.assertMultiLineEqual(recv_expect(), check_output(["simple_recv", "-a", Broker.addr])) |
| |
| def test_simple_recv_send(self): |
| recv = Popen(["simple_recv", "-a", Broker.addr]) |
| self.assertMultiLineEqual("all messages confirmed\n", check_output(["simple_send", "-a", Broker.addr])) |
| self.assertMultiLineEqual(recv_expect(), recv.communicate()[0]) |
| |
| def test_simple_send_direct_recv(self): |
| recv = Server(["direct_recv", "-a", "//:0"]) |
| self.assertMultiLineEqual("all messages confirmed\n", check_output(["simple_send", "-a", recv.addr])) |
| self.assertMultiLineEqual(recv_expect(), recv.communicate()[0]) |
| |
| def test_simple_recv_direct_send(self): |
| send = Server(["direct_send", "-a", "//:0"]) |
| self.assertMultiLineEqual(recv_expect(), check_output(["simple_recv", "-a", send.addr])) |
| self.assertMultiLineEqual("all messages confirmed\n", send.communicate()[0]) |
| |
| def test_request_response(self): |
| with Popen(["server", Broker.addr, "example"], kill_me=True) as server: |
| server.expect("connected to %s" % Broker.addr) |
| self.assertMultiLineEqual(CLIENT_EXPECT, check_output(["client", "-a", Broker.addr])) |
| |
| def test_request_response_direct(self): |
| with Server(["server_direct", "-a", "//:0"], kill_me=True) as server: |
| self.assertMultiLineEqual(CLIENT_EXPECT, check_output(["client", "-a", server.addr])) |
| |
| def test_flow_control(self): |
| want="""success: Example 1: simple credit |
| success: Example 2: basic drain |
| success: Example 3: drain without credit |
| success: Example 4: high/low watermark |
| """ |
| self.assertMultiLineEqual(want, check_output(["flow_control", "--quiet"])) |
| |
| def test_encode_decode(self): |
| want=""" |
| == Array, list and map of uniform type. |
| array<int>[int(1), int(2), int(3)] |
| [ 1 2 3 ] |
| list[int(1), int(2), int(3)] |
| [ 1 2 3 ] |
| map{string(one):int(1), string(two):int(2)} |
| { one:1 two:2 } |
| map{string(z):int(3), string(a):int(4)} |
| [ z:3 a:4 ] |
| list[string(a), string(b), string(c)] |
| |
| == List and map of mixed type values. |
| list[int(42), string(foo)] |
| [ 42 foo ] |
| map{int(4):string(four), string(five):int(5)} |
| { 4:four five:5 } |
| |
| == Insert with stream operators. |
| array<int>[int(1), int(2), int(3)] |
| list[int(42), boolean(0), symbol(x)] |
| map{string(k1):int(42), symbol(k2):boolean(0)} |
| """ |
| self.maxDiff = None |
| self.assertMultiLineEqual(want, check_output(["encode_decode"])) |
| |
| def test_scheduled_send_03(self): |
| # Output should be a bunch of "send" lines but can't guarantee exactly how many. |
| out = check_output(["scheduled_send_03", "-a", Broker.addr+"scheduled_send", "-t", "0.1", "-i", "0.001"]).split() |
| self.assertTrue(len(out) > 0); |
| self.assertEqual(["send"]*len(out), out) |
| |
| def test_scheduled_send(self): |
| if not in_path("scheduled_send"): |
| return |
| out = check_output(["scheduled_send", "-a", Broker.addr+"scheduled_send", "-t", "0.1", "-i", "0.001"]).split() |
| self.assertTrue(len(out) > 0); |
| self.assertEqual(["send"]*len(out), out) |
| |
| def test_message_properties(self): |
| expect="""using put/get: short=123 string=foo symbol=sym |
| using coerce: short(as long)=123 |
| props[short]=123 |
| props[string]=foo |
| props[symbol]=sym |
| short=42 string=bar |
| expected conversion_error: "unexpected type, want: uint got: int" |
| expected conversion_error: "unexpected type, want: uint got: string" |
| """ |
| self.assertMultiLineEqual(expect, check_output(["message_properties"])) |
| |
| def test_multithreaded_client(self): |
| if not in_path("multithreaded_client"): |
| return |
| got = check_output(["multithreaded_client", Broker.addr, "examples", "10"]) |
| self.maxDiff = None |
| self.assertIn("10 messages sent and received", got); |
| |
| |
| # Test is unstable, enable when fixed |
| def skip_test_multithreaded_client_flow_control(self): |
| if not in_path("multithreaded_client_flow_control"): |
| return |
| got = check_output(["multithreaded_client_flow_control", Broker.addr, "examples", "10", "2"]) |
| self.maxDiff = None |
| self.assertIn("20 messages sent and received", got); |
| |
| class ContainerExampleSSLTest(unittest.TestCase): |
| """Run the SSL container examples, verify they behave as expected.""" |
| |
| def ssl_certs_dir(self): |
| """Absolute path to the test SSL certificates""" |
| return os.path.join(dirname(sys.argv[0]), "ssl-certs") |
| |
| def test_ssl(self): |
| # SSL without SASL, VERIFY_PEER_NAME |
| out = check_output(["ssl", "-c", self.ssl_certs_dir()]) |
| expect = "Server certificate identity CN=test_server\nHello World!" |
| self.assertIn(expect, out) |
| |
| def test_ssl_no_name(self): |
| out = check_output(["ssl", "-c", self.ssl_certs_dir(), "-v", "noname"]) |
| expect = "Outgoing client connection connected via SSL. Server certificate identity CN=test_server\nHello World!" |
| self.assertIn(expect, out) |
| |
| def test_ssl_bad_name(self): |
| # VERIFY_PEER |
| out = check_output(["ssl", "-c", self.ssl_certs_dir(), "-v", "fail"]) |
| expect = "Expected failure of connection with wrong peer name" |
| self.assertIn(expect, out) |
| |
| def test_ssl_client_cert(self): |
| # SSL with SASL EXTERNAL |
| expect="""Inbound client certificate identity CN=test_client |
| Outgoing client connection connected via SSL. Server certificate identity CN=test_server |
| Hello World! |
| """ |
| out = check_output(["ssl_client_cert", self.ssl_certs_dir()]) |
| self.assertIn(expect, out) |
| |
| if __name__ == "__main__": |
| with Broker() as b: |
| Broker.addr = b.addr |
| unittest.main() |