blob: b90bdb8e74a816c25edc1df9a593b699b22ca864 [file] [log] [blame]
from __future__ import absolute_import
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
import os
from .common import Test, Skipped, free_tcp_ports, \
MessengerReceiverC, MessengerSenderC, \
ReactorReceiverC, ReactorSenderC, \
isSSLPresent
#
# Tests that run the apps
#
class AppTests(Test):
    """Common scaffolding for tests that drive external sender/receiver apps.

    Subclasses populate ``self.senders`` and ``self.receivers`` and then call
    ``_do_test`` to start the apps, wait for them and check their exit status.
    """

    def __init__(self, *args):
        Test.__init__(self, *args)
        # Flipped to True by valgrind_test(); changes which default is used.
        self.is_valgrind = False

    def default(self, name, value, **kwargs):
        """Look up the setting ``name`` through ``Test.default``.

        While ``is_valgrind`` is set, the ``valgrind`` keyword (when supplied)
        takes the place of ``value`` as the base default. All keyword
        arguments are forwarded to ``Test.default`` unchanged.
        """
        fallback = kwargs.get("valgrind", value) if self.is_valgrind else value
        return Test.default(self, name, fallback, **kwargs)

    def _tunable(self, name, value):
        """Return setting ``name`` as an int, with alternates fast=1, valgrind=2."""
        return int(self.default(name, value, fast=1, valgrind=2))

    @property
    def iterations(self):
        """Number of times the senders are re-run."""
        return self._tunable("iterations", 2)

    @property
    def send_count(self):
        """Number of messages sent to each target."""
        return self._tunable("send_count", 17)

    @property
    def target_count(self):
        """Number of targets to send to."""
        return self._tunable("target_count", 5)

    @property
    def send_batch(self):
        """Number of messages sent before waiting for replies."""
        return self._tunable("send_batch", 7)

    @property
    def forward_count(self):
        """Value of the ``forward_count`` setting."""
        return self._tunable("forward_count", 5)

    @property
    def port_count(self):
        """Number of ports a receiver listens on."""
        return self._tunable("port_count", 3)

    @property
    def sender_count(self):
        """Number of sender apps to launch."""
        return self._tunable("sender_count", 3)

    def valgrind_test(self):
        """Switch this test over to its valgrind defaults."""
        self.is_valgrind = True

    def setUp(self):
        """Start every test with empty sender and receiver lists."""
        self.senders, self.receivers = [], []

    def tearDown(self):
        """Nothing to clean up."""

    @staticmethod
    def _assert_clean_exit(app):
        """Fail with the app's command line and output unless it exited with 0."""
        assert app.status() == 0, (
            "Command '%s' failed status=%d: '%s' '%s'"
            % (str(app.cmdline()), app.status(), app.stdout(), app.stderr()))

    def _do_test(self, iterations=1):
        """Run the configured apps and verify that all of them succeed.

        Receivers are started once. The senders are then started and waited
        for ``iterations`` times, each one being checked as soon as it
        finishes. Finally the receivers are waited for and checked.
        """
        chatty = self.verbose

        for receiver in self.receivers:
            receiver.start(chatty)

        for _ in range(iterations):
            # Launch every sender before waiting on any of them.
            for sender in self.senders:
                sender.start(chatty)
            for sender in self.senders:
                sender.wait()
                self._assert_clean_exit(sender)

        for receiver in self.receivers:
            receiver.wait()
            self._assert_clean_exit(receiver)
#
# Traffic passing tests based on the Messenger apps
#
class MessengerTests(AppTests):
    """Traffic-passing tests built on the Messenger (and Reactor) apps."""

    # Value assigned to the ``timeout`` attribute of every app that is launched.
    _timeout = 60

    def _ssl_check(self):
        """Raise ``Skipped`` when the SSL variants of the tests cannot run."""
        ssl_available = isSSLPresent()
        if not ssl_available:
            raise Skipped("No SSL libraries found.")
        on_windows = os.name == "nt"
        if on_windows:
            raise Skipped("Windows SChannel lacks anonymous cipher support.")

    def __init__(self, *args):
        AppTests.__init__(self, *args)

    def _do_oneway_test(self, receiver, sender, domain="amqp"):
        """Send N messages to a receiver.

        Parameters (read from the test settings):
            iterations   - repeat the senders this many times
            target_count - # of targets to send to
            send_count   - # of messages sent to each target
        """
        repeats = self.iterations
        per_target = self.send_count
        n_targets = self.target_count
        per_run = per_target * n_targets
        expected = per_run * repeats
        deadline = MessengerTests._timeout

        port = free_tcp_ports()[0]

        receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
        receiver.receive_count = expected
        receiver.timeout = deadline
        self.receivers.append(receiver)

        addresses = []
        for index in range(n_targets):
            addresses.append("%s://0.0.0.0:%s/X%d" % (domain, port, index))
        sender.targets = addresses
        sender.send_count = per_run
        sender.timeout = deadline
        self.senders.append(sender)

        self._do_test(repeats)

    def _do_echo_test(self, receiver, sender, domain="amqp"):
        """Send N messages to a receiver, which responds to each.

        Parameters (read from the test settings):
            iterations   - repeat the senders this many times
            target_count - # of targets to send to
            send_count   - # of messages sent to each target
            send_batch   - wait for replies after this many messages sent
        """
        repeats = self.iterations
        per_target = self.send_count
        n_targets = self.target_count
        batch = self.send_batch
        per_run = per_target * n_targets
        expected = per_run * repeats
        deadline = MessengerTests._timeout

        port = free_tcp_ports()[0]

        receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
        receiver.receive_count = expected
        receiver.send_reply = True
        receiver.timeout = deadline
        self.receivers.append(receiver)

        addresses = []
        for index in range(n_targets):
            addresses.append("%s://0.0.0.0:%s/%dY" % (domain, port, index))
        sender.targets = addresses
        sender.send_count = per_run
        sender.get_reply = True
        sender.send_batch = batch
        sender.timeout = deadline
        self.senders.append(sender)

        self._do_test(repeats)

    # The messenger "relay" tests were removed because their start-up is
    # faulty: msgr-recv prints its -X ready message when it starts to open a
    # connection, but it does not wait for the remote open. The relay tests
    # depend on mapping a container name from an incoming connection, so
    # they can fail if the sender starts before the connection is complete
    # (especially under valgrind with SSL connections). The tests could be
    # fixed, but since messenger is deprecated it does not seem worthwhile.

    def _do_star_topology_test(self, r_factory, s_factory, domain="amqp"):
        """Run a star-like topology: one hub receiver, senders at the spokes.

        Each sender connects to each of the ports the receiver is listening
        on and creates N links per connection. Each sender sends X messages
        per link, waiting for a response.

        Parameters (read from the test settings):
            iterations   - repeat the senders this many times
            port_count   - # of ports the receiver will listen on; each
                           sender connects to all ports
            sender_count - # of senders
            target_count - # of targets per connection
            send_count   - # of messages sent to each target
            send_batch   - # of messages to send before waiting for response
        """
        repeats = self.iterations
        n_ports = self.port_count
        n_senders = self.sender_count
        n_targets = self.target_count
        per_target = self.send_count
        batch = self.send_batch
        per_sender = n_ports * n_targets * per_target
        expected = per_sender * n_senders * repeats
        deadline = MessengerTests._timeout

        ports = free_tcp_ports(n_ports)

        hub = r_factory()
        hub.subscriptions = ["%s://~0.0.0.0:%s" % (domain, p) for p in ports]
        hub.receive_count = expected
        hub.send_reply = True
        hub.timeout = deadline
        self.receivers.append(hub)

        for _ in range(n_senders):
            spoke = s_factory()
            # A fresh list per sender: every port crossed with every target index.
            addresses = []
            for p in ports:
                for index in range(n_targets):
                    addresses.append("%s://0.0.0.0:%s/%d" % (domain, p, index))
            spoke.targets = addresses
            spoke.send_count = per_sender
            spoke.send_batch = batch
            spoke.get_reply = True
            spoke.timeout = deadline
            self.senders.append(spoke)

        self._do_test(repeats)

    def test_oneway_C(self):
        self._do_oneway_test(MessengerReceiverC(), MessengerSenderC())

    def test_oneway_C_SSL(self):
        self._ssl_check()
        self._do_oneway_test(MessengerReceiverC(), MessengerSenderC(),
                             domain="amqps")

    def test_echo_C(self):
        self._do_echo_test(MessengerReceiverC(), MessengerSenderC())

    def test_echo_C_SSL(self):
        self._ssl_check()
        self._do_echo_test(MessengerReceiverC(), MessengerSenderC(),
                           domain="amqps")

    def test_star_topology_C(self):
        self._do_star_topology_test(MessengerReceiverC, MessengerSenderC)

    def test_star_topology_C_SSL(self):
        self._ssl_check()
        self._do_star_topology_test(MessengerReceiverC, MessengerSenderC,
                                    domain="amqps")

    def test_oneway_reactor(self):
        self._do_oneway_test(ReactorReceiverC(), ReactorSenderC())