blob: 171e88743d237913fd6e93f26c8365058565ec97 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Tools for tests. Only minitest is used.
require 'minitest/autorun'
require 'qpid_proton'
require 'socket'
begin
MiniTest::Test
rescue NameError # For older versions of MiniTest
MiniTest::Test = MiniTest::Unit::TestCase
end
class TestError < RuntimeError; end # Normal error
class TestException < Exception; end # Not caught by default rescue
def wait_port(port, timeout=5)
deadline = Time.now + timeout
begin # Wait for the port to be connectible
TCPSocket.open("", $port).close
rescue Errno::ECONNREFUSED
if Time.now > deadline then
raise TestError, "timed out waiting for port #{port}"
end
sleep(0.1)
retry
end
end
# Handler that records some common events that are checked by tests
class TestHandler < Qpid::Proton::MessagingHandler
attr_reader :errors, :connections, :sessions, :links, :messages
# Pass optional extra handlers and options to the Container
# @param raise_errors if true raise an exception for error events, if false, store them in #errors
def initialize(raise_errors=true)
super()
@raise_errors = raise_errors
@errors, @connections, @sessions, @links, @messages = 5.times.collect { [] }
end
# If the handler has errors, raise a TestError with all the error text
def raise_errors()
return if @errors.empty?
text = ""
while @errors.size > 0
text << @errors.pop + "\n"
end
raise TestError.new("TestHandler has errors:\n #{text}")
end
def on_error(condition)
@errors.push "#{condition}"
raise_errors if @raise_errors
end
def endpoint_open(queue, endpoint)
queue.push(endpoint)
end
def on_connection_open(c)
endpoint_open(@connections, c)
end
def on_session_open(s)
endpoint_open(@sessions, s)
end
def on_receiver_open(l)
endpoint_open(@links, l)
end
def on_sender_open(l)
endpoint_open(@links, l)
end
def on_message(d, m)
@messages.push(m)
end
end
# ListenHandler that closes the Listener after first (or n) accepts
class ListenOnceHandler < Qpid::Proton::Listener::Handler
def initialize(opts, n=1) super(opts); @n=n; end
def on_error(l, e) raise e; end
def on_accept(l) l.close if (@n -= 1).zero?; super; end
end
# Add port/url to Listener, assuming a TCP socket
class Qpid::Proton::Listener
def url() "amqp://:#{port}"; end
end
# A client/server pair of ConnectionDrivers linked by a socket pair
DriverPair = Struct.new(:client, :server) do
def initialize(client_handler, server_handler)
s = Socket.pair(:LOCAL, :STREAM, 0)
self.client = HandlerDriver.new(s[0], client_handler)
self.server = HandlerDriver.new(s[1], server_handler)
server.transport.set_server
end
# Process each driver once, return time of next timed event
def process(now = Time.now, max_time=nil)
t = collect { |d| d.process(now) }.compact.min
t = max_time if max_time && t > max_time
t
end
def active()
can_read = self.select { |d| d.can_read? }
can_write = self.select {|d| d.can_write? }
IO.select(can_read, can_write, [], 0)
end
def names() collect { |x| x.handler.names }; end
def clear() each { |x| x.handler.clear; } end
# Run till there is nothing else to do - not handle waiting for timed events
# but does pass +now+ to process and returns the min returned timed event time
def run(now=Time.now)
t = nil
begin
t = process(now, t)
end while active
t
end
end
# Container that listens on a random port
class ServerContainer < Qpid::Proton::Container
include Qpid::Proton
def initialize(id=nil, listener_opts=nil, n=1, handler=nil)
super handler, id
@listener = listen_io(TCPServer.open(0), ListenOnceHandler.new(listener_opts, n))
end
attr_reader :listener
def port() @listener.port; end
def url() "amqp://:#{port}"; end
end
class ServerContainerThread < ServerContainer
def initialize(*args)
super
@thread = Thread.new { run }
end
attr_reader :thread
def join() @thread.join; end
end