blob: bc6dab81308280f2cc83f8fd51ddfa2a8d4c4ab6 [file] [log] [blame]
#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License
#
# Run the C++ examples and verify that they behave as expected.
# Example executables must be in PATH
import sys, shutil, os, errno
from test_unittest import unittest
from test_subprocess import Popen, TestProcessError, check_output, in_path
import test_subprocess
from os.path import dirname
from string import Template
class Server(test_subprocess.Server):
@property
def addr(self):
return ":%s/example" % self.port
def _cyrusSetup(conf_dir):
"""Write out simple SASL config.tests
"""
saslpasswd = os.getenv('SASLPASSWD')
if saslpasswd:
t = Template("""sasldb_path: ${db}
mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS
""")
abs_conf_dir = os.path.abspath(conf_dir)
shutil.rmtree(abs_conf_dir, True)
os.mkdir(abs_conf_dir)
db = os.path.join(abs_conf_dir,'proton.sasldb')
conf = os.path.join(abs_conf_dir,'proton-server.conf')
with open(conf, 'w') as f:
f.write(t.substitute(db=db))
cmd_template = Template("echo password | ${saslpasswd} -c -p -f ${db} -u proton user")
cmd = cmd_template.substitute(db=db, saslpasswd=saslpasswd)
check_output(cmd, shell=True)
os.environ['PN_SASL_CONFIG_PATH'] = abs_conf_dir
# Globally initialize Cyrus SASL configuration
_cyrusSetup('sasl-conf')
class Broker(Server):
def __init__(self):
super(Broker, self).__init__(["broker", "-a", "//:0"], kill_me=True)
CLIENT_EXPECT="""Twas brillig, and the slithy toves => TWAS BRILLIG, AND THE SLITHY TOVES
Did gire and gymble in the wabe. => DID GIRE AND GYMBLE IN THE WABE.
All mimsy were the borogroves, => ALL MIMSY WERE THE BOROGROVES,
And the mome raths outgrabe. => AND THE MOME RATHS OUTGRABE.
"""
def recv_expect():
return "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)])
class ContainerExampleTest(unittest.TestCase):
"""Run the container examples, verify they behave as expected."""
def test_helloworld(self):
self.assertMultiLineEqual('Hello World!\n', check_output(["helloworld", Broker.addr]))
def test_simple_send_recv(self):
self.assertMultiLineEqual("all messages confirmed\n", check_output(["simple_send", "-a", Broker.addr]))
self.assertMultiLineEqual(recv_expect(), check_output(["simple_recv", "-a", Broker.addr]))
def test_simple_recv_send(self):
recv = Popen(["simple_recv", "-a", Broker.addr])
self.assertMultiLineEqual("all messages confirmed\n", check_output(["simple_send", "-a", Broker.addr]))
self.assertMultiLineEqual(recv_expect(), recv.communicate()[0])
def test_simple_send_direct_recv(self):
recv = Server(["direct_recv", "-a", "//:0"])
self.assertMultiLineEqual("all messages confirmed\n", check_output(["simple_send", "-a", recv.addr]))
self.assertMultiLineEqual(recv_expect(), recv.communicate()[0])
def test_simple_recv_direct_send(self):
send = Server(["direct_send", "-a", "//:0"])
self.assertMultiLineEqual(recv_expect(), check_output(["simple_recv", "-a", send.addr]))
self.assertMultiLineEqual("all messages confirmed\n", send.communicate()[0])
def test_request_response(self):
with Popen(["server", Broker.addr, "example"], kill_me=True) as server:
server.expect("connected to %s" % Broker.addr)
self.assertMultiLineEqual(CLIENT_EXPECT, check_output(["client", "-a", Broker.addr]))
def test_request_response_direct(self):
with Server(["server_direct", "-a", "//:0"], kill_me=True) as server:
self.assertMultiLineEqual(CLIENT_EXPECT, check_output(["client", "-a", server.addr]))
def test_flow_control(self):
want="""success: Example 1: simple credit
success: Example 2: basic drain
success: Example 3: drain without credit
success: Example 4: high/low watermark
"""
self.assertMultiLineEqual(want, check_output(["flow_control", "--quiet"]))
def test_encode_decode(self):
want="""
== Array, list and map of uniform type.
array<int>[int(1), int(2), int(3)]
[ 1 2 3 ]
list[int(1), int(2), int(3)]
[ 1 2 3 ]
map{string(one):int(1), string(two):int(2)}
{ one:1 two:2 }
map{string(z):int(3), string(a):int(4)}
[ z:3 a:4 ]
list[string(a), string(b), string(c)]
== List and map of mixed type values.
list[int(42), string(foo)]
[ 42 foo ]
map{int(4):string(four), string(five):int(5)}
{ 4:four five:5 }
== Insert with stream operators.
array<int>[int(1), int(2), int(3)]
list[int(42), boolean(0), symbol(x)]
map{string(k1):int(42), symbol(k2):boolean(0)}
"""
self.maxDiff = None
self.assertMultiLineEqual(want, check_output(["encode_decode"]))
def test_scheduled_send_03(self):
# Output should be a bunch of "send" lines but can't guarantee exactly how many.
out = check_output(["scheduled_send_03", "-a", Broker.addr+"scheduled_send", "-t", "0.1", "-i", "0.001"]).split()
self.assertTrue(len(out) > 0);
self.assertEqual(["send"]*len(out), out)
def test_scheduled_send(self):
if not in_path("scheduled_send"):
return
out = check_output(["scheduled_send", "-a", Broker.addr+"scheduled_send", "-t", "0.1", "-i", "0.001"]).split()
self.assertTrue(len(out) > 0);
self.assertEqual(["send"]*len(out), out)
def test_message_properties(self):
expect="""using put/get: short=123 string=foo symbol=sym
using coerce: short(as long)=123
props[short]=123
props[string]=foo
props[symbol]=sym
short=42 string=bar
expected conversion_error: "unexpected type, want: uint got: int"
expected conversion_error: "unexpected type, want: uint got: string"
"""
self.assertMultiLineEqual(expect, check_output(["message_properties"]))
def test_multithreaded_client(self):
if not in_path("multithreaded_client"):
return
got = check_output(["multithreaded_client", Broker.addr, "examples", "10"])
self.maxDiff = None
self.assertIn("10 messages sent and received", got);
# Test is unstable, enable when fixed
def skip_test_multithreaded_client_flow_control(self):
if not in_path("multithreaded_client_flow_control"):
return
got = check_output(["multithreaded_client_flow_control", Broker.addr, "examples", "10", "2"])
self.maxDiff = None
self.assertIn("20 messages sent and received", got);
class ContainerExampleSSLTest(unittest.TestCase):
"""Run the SSL container examples, verify they behave as expected."""
def ssl_certs_dir(self):
"""Absolute path to the test SSL certificates"""
return os.path.join(dirname(sys.argv[0]), "ssl-certs")
def test_ssl(self):
# SSL without SASL, VERIFY_PEER_NAME
out = check_output(["ssl", "-c", self.ssl_certs_dir()])
expect = "Server certificate identity CN=test_server\nHello World!"
self.assertIn(expect, out)
def test_ssl_no_name(self):
out = check_output(["ssl", "-c", self.ssl_certs_dir(), "-v", "noname"])
expect = "Outgoing client connection connected via SSL. Server certificate identity CN=test_server\nHello World!"
self.assertIn(expect, out)
def test_ssl_bad_name(self):
# VERIFY_PEER
out = check_output(["ssl", "-c", self.ssl_certs_dir(), "-v", "fail"])
expect = "Expected failure of connection with wrong peer name"
self.assertIn(expect, out)
def test_ssl_client_cert(self):
# SSL with SASL EXTERNAL
expect="""Inbound client certificate identity CN=test_client
Outgoing client connection connected via SSL. Server certificate identity CN=test_server
Hello World!
"""
out = check_output(["ssl_client_cert", self.ssl_certs_dir()])
self.assertIn(expect, out)
if __name__ == "__main__":
with Broker() as b:
Broker.addr = b.addr
unittest.main()