blob: c49829392cee031b158636f26fe56a5edfbd2b4a [file] [log] [blame]
#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License
#
# Run the C examples and verify that they behave as expected.
# Example executables must be in PATH
import unittest
from test_subprocess import Popen, Server, TestProcessError, check_output
MESSAGES=10
def receive_expect_messages(n=MESSAGES): return ''.join(['{"sequence"=%s}\n'%i for i in range(1, n+1)])
def receive_expect_total(n=MESSAGES): return "%s messages received\n"%n
def receive_expect(n=MESSAGES): return receive_expect_messages(n)+receive_expect_total(n)
def send_expect(n=MESSAGES): return "%s messages sent and acknowledged\n" % n
def send_abort_expect(n=MESSAGES): return "%s messages started and aborted\n" % n
class Broker(Server):
def __init__(self):
super(Broker, self).__init__(["broker", "", "0"], kill_me=True)
class ExampleTest(unittest.TestCase):
def runex(self, name, port, messages=MESSAGES):
"""Run an example with standard arguments, return output"""
return check_output([name, "", port, "xtest", str(messages)])
def startex(self, name, port, messages=MESSAGES):
"""Start an example sub-process with standard arguments"""
return Popen([name, "", port, "xtest", str(messages)])
def test_send_receive(self):
"""Send first then receive"""
with Broker() as b:
self.assertEqual(send_expect(), self.runex("send", b.port))
self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port))
def test_receive_send(self):
"""Start receiving first, then send."""
with Broker() as b:
r = self.startex("receive", b.port)
self.assertEqual(send_expect(), self.runex("send", b.port))
self.assertMultiLineEqual(receive_expect(), r.communicate()[0])
def test_send_direct(self):
"""Send to direct server"""
d = Server(["direct", "", "0"])
self.assertEqual(send_expect(), self.runex("send", d.port))
self.assertMultiLineEqual(receive_expect(), d.communicate()[0])
def test_receive_direct(self):
"""Receive from direct server"""
d = Server(["direct", "", "0"])
self.assertMultiLineEqual(receive_expect(), self.runex("receive", d.port))
self.assertEqual("10 messages sent and acknowledged\n", d.communicate()[0])
def test_send_abort_broker(self):
"""Sending aborted messages to a broker"""
with Broker() as b:
self.assertEqual(send_expect(), self.runex("send", b.port))
self.assertEqual(send_abort_expect(), self.runex("send-abort", b.port))
for i in range(MESSAGES):
self.assertEqual("Message aborted\n", b.stdout.readline())
self.assertEqual(send_expect(), self.runex("send", b.port))
expect = receive_expect_messages(MESSAGES)+receive_expect_messages(MESSAGES)+receive_expect_total(20)
self.assertMultiLineEqual(expect, self.runex("receive", b.port, "20"))
def test_send_abort_direct(self):
"""Send aborted messages to the direct server"""
d = Server(["direct", "", "0", "examples", "20"])
self.assertEqual(send_expect(), self.runex("send", d.port))
self.assertEqual(send_abort_expect(), self.runex("send-abort", d.port))
self.assertEqual(send_expect(), self.runex("send", d.port))
expect = receive_expect_messages() + "Message aborted\n"*MESSAGES + receive_expect_messages()+receive_expect_total(20)
self.maxDiff = None
self.assertMultiLineEqual(expect, d.communicate()[0])
def test_send_ssl_receive(self):
"""Send with SSL, then receive"""
try:
with Broker() as b:
got = self.runex("send-ssl", b.port)
self.assertIn("secure connection:", got)
self.assertIn(send_expect(), got)
self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port))
except TestProcessError as e:
if e.output and e.output.startswith(b"error initializing SSL"):
print("Skipping %s: SSL not available" % self.id())
else:
raise
if __name__ == "__main__":
unittest.main()