blob: 13e1fd57098c2c49737be849140fced60c3363c0 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
require 'test_tools'
require 'minitest/unit'
require 'socket'
# Since ruby 2.5 the default is true, turn it off since we have tests that deliberately
# leak exceptions from threads to very they are caught properly from Container#run()
(Thread.report_on_exception = false) rescue nil
# MessagingHandler that raises in on_error to catch unexpected errors
class ExceptionMessagingHandler
def on_error(e) raise e; end
end
class ContainerTest < MiniTest::Test
include Qpid::Proton
def test_simple()
send_handler = Class.new(ExceptionMessagingHandler) do
attr_reader :accepted, :sent
def initialize() @sent, @accepted = nil; end
def on_sendable(sender)
unless @sent
m = Message.new("hello")
m[:foo] = :bar
sender.send m
end
@sent = true
end
def on_tracker_accept(tracker)
@accepted = true
tracker.connection.close
end
end.new
receive_handler = Class.new(ExceptionMessagingHandler) do
attr_reader :message, :link
def on_receiver_open(link)
@link = link
@link.open
@link.flow(1)
end
def on_message(delivery, message)
@message = message;
delivery.update Disposition::ACCEPTED
delivery.settle
end
end.new
c = ServerContainer.new(__method__, {:handler => receive_handler})
c.connect(c.url, {:handler => send_handler}).open_sender({:name => "testlink"})
c.run
assert send_handler.accepted
assert_equal "testlink", receive_handler.link.name
assert_equal "hello", receive_handler.message.body
assert_equal :bar, receive_handler.message[:foo]
assert_equal "test_simple", receive_handler.link.connection.container_id
end
class CloseOnOpenHandler < TestHandler
def on_connection_open(c) super; c.close; end
end
def test_auto_stop_one
# A listener and a connection
start_stop_handler = Class.new do
def on_container_start(c) @start = c; end
def on_container_stop(c) @stop = c; end
attr_reader :start, :stop
end.new
c = Container.new(start_stop_handler, __method__)
threads = 3.times.collect { Thread.new { c.run } }
sleep(0.01) while c.running < 3
assert_equal c, start_stop_handler.start
l = c.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
c.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
threads.each { |t| assert t.join(1) }
assert_equal c, start_stop_handler.stop
assert_raises(Container::StoppedError) { c.run }
end
def test_auto_stop_two
# Connect between different containers
c1, c2 = Container.new("#{__method__}-1"), Container.new("#{__method__}-2")
threads = [ Thread.new {c1.run }, Thread.new {c2.run } ]
l = c2.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
c1.connect(l.url, { :handler => CloseOnOpenHandler.new} )
assert threads.each { |t| t.join(1) }
assert_raises(Container::StoppedError) { c1.run }
assert_raises(Container::StoppedError) { c2.connect("") }
end
def test_auto_stop_listener_only
c = Container.new(__method__)
# Listener only, external close
t = Thread.new { c.run }
l = c.listen_io(TCPServer.new(0))
l.close
assert t.join(1)
end
def test_stop_empty
c = Container.new(__method__)
threads = 3.times.collect { Thread.new { c.run } }
sleep(0.01) while c.running < 3
assert_nil threads[0].join(0.001) # Not stopped
c.stop
assert c.stopped
assert_raises(Container::StoppedError) { c.connect("") }
assert_raises(Container::StoppedError) { c.run }
threads.each { |t| assert t.join(1) }
end
def test_stop
c = Container.new(__method__)
c.auto_stop = false
l = c.listen_io(TCPServer.new(0))
threads = 3.times.collect { Thread.new { c.run } }
sleep(0.01) while c.running < 3
l.close
assert_nil threads[0].join(0.001) # Not stopped, no auto_stop
l = c.listen_io(TCPServer.new(0)) # New listener
conn = c.connect("amqp://:#{l.to_io.addr[1]}")
c.stop
assert c.stopped
threads.each { |t| assert t.join(1) }
assert_raises(Container::StoppedError) { c.run }
assert_equal 0, c.running
assert_nil l.condition
assert_nil conn.condition
end
def test_bad_host
cont = Container.new(__method__)
assert_raises (SocketError) { cont.listen("badlisten.example.com:999") }
assert_raises (SocketError) { cont.connect("badconnect.example.com:999") }
end
# Verify that connection options are sent to the peer
def test_connection_options
# Note: user, password and sasl_xxx options are tested by ContainerSASLTest below
server_handler = Class.new(ExceptionMessagingHandler) do
def on_connection_open(c)
@connection = c
c.open({
:virtual_host => "server.to.client",
:properties => { :server => :client },
:offered_capabilities => [ :s1 ],
:desired_capabilities => [ :s2 ],
:container_id => "box",
})
c.close
end
attr_reader :connection
end.new
# Transport options set by listener, by Connection#open it is too late
cont = ServerContainer.new(__method__, {
:handler => server_handler,
:idle_timeout => 88,
:max_sessions =>1000,
:max_frame_size => 8888,
})
client = cont.connect(cont.url,
{:virtual_host => "client.to.server",
:properties => { "foo" => :bar, "str" => "str" },
:offered_capabilities => [:c1 ],
:desired_capabilities => [:c2 ],
:idle_timeout => 42,
:max_sessions =>100,
:max_frame_size => 4096,
:container_id => "bowl"
})
cont.run
c = server_handler.connection
assert_equal "client.to.server", c.virtual_host
assert_equal({ "foo" => :bar, "str" => "str" }, c.properties)
assert_equal([:c1], c.offered_capabilities)
assert_equal([:c2], c.desired_capabilities)
assert_equal 21, c.idle_timeout # Proton divides by 2
assert_equal 100, c.max_sessions
assert_equal 4096, c.max_frame_size
assert_equal "bowl", c.container_id
c = client
assert_equal "server.to.client", c.virtual_host
assert_equal({ :server => :client }, c.properties)
assert_equal([:s1], c.offered_capabilities)
assert_equal([:s2], c.desired_capabilities)
assert_equal "box", c.container_id
assert_equal 8888, c.max_frame_size
assert_equal 44, c.idle_timeout # Proton divides by 2
assert_equal 100, c.max_sessions
end
def test_link_options
server_handler = Class.new(ExceptionMessagingHandler) do
def initialize() @links = []; end
attr_reader :links
def on_sender_open(l) @links << l; end
def on_receiver_open(l) @links << l; end
end.new
client_handler = Class.new(ExceptionMessagingHandler) do
def on_connection_open(c)
@links = [];
@links << c.open_sender("s1")
@links << c.open_sender({:name => "s2-n", :target => "s2-t", :source => "s2-s"})
@links << c.open_receiver("r1")
@links << c.open_receiver({:name => "r2-n", :target => "r2-t", :source => "r2-s"})
c.close
end
attr_reader :links
end.new
cont = ServerContainer.new(__method__, {:handler => server_handler }, 1)
cont.connect(cont.url, :handler => client_handler)
cont.run
expect = ["test_link_options/1", "s2-n", "test_link_options/2", "r2-n"]
assert_equal expect, server_handler.links.map(&:name)
assert_equal expect, client_handler.links.map(&:name)
expect = [[nil,"s1"], ["s2-s","s2-t"], ["r1",nil], ["r2-s","r2-t"]]
assert_equal expect, server_handler.links.map { |l| [l.remote_source.address, l.remote_target.address] }
assert_equal expect, client_handler.links.map { |l| [l.source.address, l.target.address] }
end
def extract_terminus_options(t)
opts = Hash[[:address, :distribution_mode, :durability_mode, :timeout, :expiry_policy].map { |m| [m, t.send(m)] }]
opts[:filter] = t.filter.map
opts[:capabilities] = t.capabilities.map
opts[:dynamic] = t.dynamic?
opts
end
def test_terminus_options
opts = {
:distribution_mode => Terminus::DIST_MODE_COPY,
:durability_mode => Terminus::DELIVERIES,
:timeout => 5,
:expiry_policy => Terminus::EXPIRE_WITH_LINK,
:filter => { :try => 'me' },
:capabilities => { :cap => 'len' },
}
src_opts = { :address => "src", :dynamic => true }.update(opts)
tgt_opts = { :address => "tgt", :dynamic => false }.update(opts)
cont = ServerContainer.new(__method__, {}, 1)
c = cont.connect(cont.url)
s = c.open_sender({:target => tgt_opts, :source => src_opts })
assert_equal src_opts, extract_terminus_options(s.source)
assert_equal tgt_opts, extract_terminus_options(s.target)
assert s.source.dynamic?
assert !s.target.dynamic?
end
# Test for time out on connecting to an unresponsive server
def test_idle_timeout_server_no_open
s = TCPServer.new(0)
cont = Container.new(__method__)
cont.connect(":#{s.addr[1]}", {:idle_timeout => 0.1, :handler => ExceptionMessagingHandler.new })
ex = assert_raises(Qpid::Proton::Condition) { cont.run }
assert_match(/resource-limit-exceeded/, ex.to_s)
ensure
s.close if s
end
# Test for time out on unresponsive client
def test_idle_timeout_client
server = ServerContainerThread.new("#{__method__}.server", {:idle_timeout => 0.1})
client_handler = Class.new(ExceptionMessagingHandler) do
def initialize() @ready, @block = Queue.new, Queue.new; end
attr_reader :ready, :block
def on_connection_open(c)
@ready.push nil # Tell the main thread we are now open
@block.pop # Block the client so the server will time it out
end
end.new
client = Container.new(nil, "#{__method__}.client")
client.connect(server.url, {:handler => client_handler})
client_thread = Thread.new { client.run }
client_handler.ready.pop # Wait till the client has connected
server.join # Exits when the connection closes from idle-timeout
client_handler.block.push nil # Unblock the client
ex = assert_raises(Qpid::Proton::Condition) { client_thread.join }
assert_match(/resource-limit-exceeded/, ex.to_s)
end
# Make sure we stop and clean up if an aborted connection causes a handler to raise.
# https://issues.apache.org/jira/browse/PROTON-1791
def test_handler_raise
cont = ServerContainer.new(__method__, {}, 0) # Don't auto-close the listener
client_handler = Class.new(MessagingHandler) do
# TestException is < Exception so not handled by default rescue clause
def on_connection_open(c) raise TestException.new("Bad Dog"); end
end.new
threads = 3.times.collect { Thread.new { cont.run } }
sleep 0.01 while cont.running < 3 # Wait for all threads to be running
sockets = 2.times.collect { TCPSocket.new("", cont.port) }
cont.connect_io(sockets[1]) # No exception
cont.connect_io(sockets[0], {:handler => client_handler}) # Should stop container
threads.each { |t| assert_equal("Bad Dog", assert_raises(TestException) {t.join}.message) }
sockets.each { |s| assert s.closed? }
assert cont.listener.to_io.closed?
assert_raises(Container::StoppedError) { cont.run }
assert_raises(Container::StoppedError) { cont.listen "" }
end
# Check if two time values are "close enough" to be reasonable.
def assert_equalish(x, y, delta=0.1)
assert_in_delta(x, y, delta)
end
# Test container doesn't stops only when schedule work is done
def test_container_work_queue
c = Container.new __method__
delays = [0.1, 0.03, 0.02]
a = []
delays.each { |d| c.schedule(d) { a << [d, Time.now] } }
start = Time.now
c.run
delays.sort.each do |d|
x = a.shift
assert_equal d, x[0]
end
assert_equalish delays.reduce(:+), Time.now-start
end
# Test container work queue finishes due tasks on external stop, drops future tasks
def test_container_work_queue_stop
q = Queue.new
c = Container.new __method__
thread = Thread.new { c.run }
time = Time.now + 0.01
# Mix good scheduled tasks at time and bad tasks scheduled after 10 secs
10.times do
c.schedule(time) { q << true }
c.schedule(10) { q << false }
end
assert_same true, q.pop # First task processed, all others at same time are due
# Mix in some immediate tasks
5.times do
c.work_queue.add { q << true } # Immediate
c.schedule(time) { q << true }
c.schedule(10) { q << false }
end
c.stop
thread.join
19.times { assert_same true, q.pop }
assert_equal 0, q.size # Tasks with 10 sec delay should be dropped
end
# Chain schedule calls from other schedule calls
def test_container_schedule_chain
c = Container.new(__method__)
delays = [0.05, 0.02, 0.04]
i = delays.each
a = []
p = Proc.new { c.schedule(i.next) { a << Time.now; p.call } rescue nil }
p.call # Schedule the first, which schedules the second etc.
start = Time.now
c.run
assert_equal 3, a.size
assert_equalish delays.reduce(:+), Time.now-start
end
# Schedule calls from handlers
def test_container_schedule_handler
h = Class.new() do
def initialize() @got = []; end
attr_reader :got
def record(m) @got << m; end
def on_container_start(c) c.schedule(0) {record __method__}; end
def on_connection_open(c) c.close; c.container.schedule(0) {record __method__}; end
def on_connection_close(c) c.container.schedule(0) {record __method__}; end
end.new
t = ServerContainerThread.new(__method__, nil, 1, h)
t.connect(t.url)
t.join
assert_equal [:on_container_start, :on_connection_open, :on_connection_open, :on_connection_close, :on_connection_close], h.got
end
# Raising from container handler should stop container
def test_container_handler_raise
h = Class.new() do
def on_container_start(c) raise "BROKEN"; end
end.new
c = Container.new(h, __method__)
assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
end
# Raising from connection handler should stop container
def test_connection_handler_raise
h = Class.new() do
def on_connection_open(c) raise "BROKEN"; end
end.new
c = ServerContainer.new(__method__, nil, 1, h)
c.connect(c.url)
assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
end
# Raising from container schedule should stop container
def test_container_schedule_raise
c = Container.new(__method__)
c.schedule(0) { raise "BROKEN" }
assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
end
def test_connection_work_queue
cont = ServerContainer.new(__method__, {}, 1)
c = cont.connect(cont.url)
t = Thread.new { cont.run }
q = Queue.new
start = Time.now
c.work_queue.schedule(0.02) { q << [3, Thread.current] }
c.work_queue.add { q << [1, Thread.current] }
c.work_queue.schedule(0.04) { q << [4, Thread.current] }
c.work_queue.add { q << [2, Thread.current] }
assert_equal [1, t], q.pop
assert_equal [2, t], q.pop
assert_equalish 0.0, Time.now-start
assert_equal [3, t], q.pop
assert_equal [4, t], q.pop
assert_equalish 0.02 + 0.04, Time.now-start
c.work_queue.add { c.close }
t.join
assert_raises(WorkQueue::StoppedError) { c.work_queue.add { } }
end
# Raising from connection schedule should stop container
def test_connection_work_queue_raise
c = ServerContainer.new(__method__)
c.connect(c.url)
c.work_queue.add { raise "BROKEN" }
assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
end
end