| from __future__ import absolute_import |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from __future__ import absolute_import |
| |
| import os |
| |
| from .common import Test, Skipped, free_tcp_ports, \ |
| MessengerReceiverC, MessengerSenderC, \ |
| ReactorReceiverC, ReactorSenderC, \ |
| isSSLPresent |
| |
| # |
| # Tests that run the apps |
| # |
| |
| class AppTests(Test): |
| |
| def __init__(self, *args): |
| Test.__init__(self, *args) |
| self.is_valgrind = False |
| |
| def default(self, name, value, **kwargs): |
| if self.is_valgrind: |
| default = kwargs.get("valgrind", value) |
| else: |
| default = value |
| return Test.default(self, name, default, **kwargs) |
| |
| @property |
| def iterations(self): |
| return int(self.default("iterations", 2, fast=1, valgrind=2)) |
| |
| @property |
| def send_count(self): |
| return int(self.default("send_count", 17, fast=1, valgrind=2)) |
| |
| @property |
| def target_count(self): |
| return int(self.default("target_count", 5, fast=1, valgrind=2)) |
| |
| @property |
| def send_batch(self): |
| return int(self.default("send_batch", 7, fast=1, valgrind=2)) |
| |
| @property |
| def forward_count(self): |
| return int(self.default("forward_count", 5, fast=1, valgrind=2)) |
| |
| @property |
| def port_count(self): |
| return int(self.default("port_count", 3, fast=1, valgrind=2)) |
| |
| @property |
| def sender_count(self): |
| return int(self.default("sender_count", 3, fast=1, valgrind=2)) |
| |
| def valgrind_test(self): |
| self.is_valgrind = True |
| |
| def setUp(self): |
| self.senders = [] |
| self.receivers = [] |
| |
| def tearDown(self): |
| pass |
| |
| def _do_test(self, iterations=1): |
| verbose = self.verbose |
| |
| for R in self.receivers: |
| R.start( verbose ) |
| |
| for j in range(iterations): |
| for S in self.senders: |
| S.start( verbose ) |
| |
| for S in self.senders: |
| S.wait() |
| #print("SENDER OUTPUT:") |
| #print( S.stdout() ) |
| assert S.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'" |
| % (str(S.cmdline()), |
| S.status(), |
| S.stdout(), |
| S.stderr())) |
| |
| for R in self.receivers: |
| R.wait() |
| #print("RECEIVER OUTPUT") |
| #print( R.stdout() ) |
| assert R.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'" |
| % (str(R.cmdline()), |
| R.status(), |
| R.stdout(), |
| R.stderr())) |
| |
| # |
| # Traffic passing tests based on the Messenger apps |
| # |
| |
| class MessengerTests(AppTests): |
| |
| _timeout = 60 |
| |
| def _ssl_check(self): |
| if not isSSLPresent(): |
| raise Skipped("No SSL libraries found.") |
| if os.name=="nt": |
| raise Skipped("Windows SChannel lacks anonymous cipher support.") |
| |
| def __init__(self, *args): |
| AppTests.__init__(self, *args) |
| |
| def _do_oneway_test(self, receiver, sender, domain="amqp"): |
| """ Send N messages to a receiver. |
| Parameters: |
| iterations - repeat the senders this many times |
| target_count = # of targets to send to. |
| send_count = # messages sent to each target |
| """ |
| iterations = self.iterations |
| send_count = self.send_count |
| target_count = self.target_count |
| |
| send_total = send_count * target_count |
| receive_total = send_total * iterations |
| |
| port = free_tcp_ports()[0] |
| |
| receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)] |
| receiver.receive_count = receive_total |
| receiver.timeout = MessengerTests._timeout |
| self.receivers.append( receiver ) |
| |
| sender.targets = ["%s://0.0.0.0:%s/X%d" % (domain, port, j) for j in range(target_count)] |
| sender.send_count = send_total |
| sender.timeout = MessengerTests._timeout |
| self.senders.append( sender ) |
| |
| self._do_test(iterations) |
| |
| def _do_echo_test(self, receiver, sender, domain="amqp"): |
| """ Send N messages to a receiver, which responds to each. |
| Parameters: |
| iterations - repeat the senders this many times |
| target_count - # targets to send to |
| send_count = # messages sent to each target |
| send_batch - wait for replies after this many messages sent |
| """ |
| iterations = self.iterations |
| send_count = self.send_count |
| target_count = self.target_count |
| send_batch = self.send_batch |
| |
| send_total = send_count * target_count |
| receive_total = send_total * iterations |
| |
| port = free_tcp_ports()[0] |
| |
| receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)] |
| receiver.receive_count = receive_total |
| receiver.send_reply = True |
| receiver.timeout = MessengerTests._timeout |
| self.receivers.append( receiver ) |
| |
| sender.targets = ["%s://0.0.0.0:%s/%dY" % (domain, port, j) for j in range(target_count)] |
| sender.send_count = send_total |
| sender.get_reply = True |
| sender.send_batch = send_batch |
| sender.timeout = MessengerTests._timeout |
| self.senders.append( sender ) |
| |
| self._do_test(iterations) |
| |
| # Removed messenger "relay" tests. The test start-up is faulty: |
| # msgr-recv prints it's -X ready message when it starts to open a |
| # connection but it does not wait for the remote open. The relay |
| # tests depends on mapping a container name from an incoming |
| # connection. They can fail under if the sender starts before the |
| # connection is complete (esp. valgrind with SSL connections) We |
| # could fix the tests but since messenger is deprecated it does |
| # not seem worthwhile. |
| |
| def _do_star_topology_test(self, r_factory, s_factory, domain="amqp"): |
| """ |
| A star-like topology, with a central receiver at the hub, and senders at |
| the spokes. Each sender will connect to each of the ports the receiver is |
| listening on. Each sender will then create N links per each connection. |
| Each sender will send X messages per link, waiting for a response. |
| Parameters: |
| iterations - repeat the senders this many times |
| port_count - # of ports the receiver will listen on. Each sender connects |
| to all ports. |
| sender_count - # of senders |
| target_count - # of targets per connection |
| send_count - # of messages sent to each target |
| send_batch - # of messages to send before waiting for response |
| """ |
| iterations = self.iterations |
| port_count = self.port_count |
| sender_count = self.sender_count |
| target_count = self.target_count |
| send_count = self.send_count |
| send_batch = self.send_batch |
| |
| send_total = port_count * target_count * send_count |
| receive_total = send_total * sender_count * iterations |
| |
| ports = free_tcp_ports(port_count) |
| |
| receiver = r_factory() |
| receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port) for port in ports] |
| receiver.receive_count = receive_total |
| receiver.send_reply = True |
| receiver.timeout = MessengerTests._timeout |
| self.receivers.append( receiver ) |
| |
| for i in range(sender_count): |
| sender = s_factory() |
| sender.targets = ["%s://0.0.0.0:%s/%d" % (domain, port, j) for port in ports for j in range(target_count)] |
| sender.send_count = send_total |
| sender.send_batch = send_batch |
| sender.get_reply = True |
| sender.timeout = MessengerTests._timeout |
| self.senders.append( sender ) |
| |
| self._do_test(iterations) |
| |
| def test_oneway_C(self): |
| self._do_oneway_test(MessengerReceiverC(), MessengerSenderC()) |
| |
| def test_oneway_C_SSL(self): |
| self._ssl_check() |
| self._do_oneway_test(MessengerReceiverC(), MessengerSenderC(), "amqps") |
| |
| def test_echo_C(self): |
| self._do_echo_test(MessengerReceiverC(), MessengerSenderC()) |
| |
| def test_echo_C_SSL(self): |
| self._ssl_check() |
| self._do_echo_test(MessengerReceiverC(), MessengerSenderC(), "amqps") |
| |
| def test_star_topology_C(self): |
| self._do_star_topology_test( MessengerReceiverC, MessengerSenderC ) |
| |
| def test_star_topology_C_SSL(self): |
| self._ssl_check() |
| self._do_star_topology_test( MessengerReceiverC, MessengerSenderC, "amqps" ) |
| |
| def test_oneway_reactor(self): |
| self._do_oneway_test(ReactorReceiverC(), ReactorSenderC()) |