| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| require 'test_tools' |
| require 'minitest/unit' |
| require 'socket' |
| |
| # Since ruby 2.5 the default is true, turn it off since we have tests that deliberately |
| # leak exceptions from threads to very they are caught properly from Container#run() |
| (Thread.report_on_exception = false) rescue nil |
| |
| # MessagingHandler that raises in on_error to catch unexpected errors |
| class ExceptionMessagingHandler |
| def on_error(e) raise e; end |
| end |
| |
| class ContainerTest < MiniTest::Test |
| include Qpid::Proton |
| |
| def test_simple() |
| send_handler = Class.new(ExceptionMessagingHandler) do |
| attr_reader :accepted, :sent |
| |
| def initialize() @sent, @accepted = nil; end |
| |
| def on_sendable(sender) |
| unless @sent |
| m = Message.new("hello") |
| m[:foo] = :bar |
| sender.send m |
| end |
| @sent = true |
| end |
| |
| def on_tracker_accept(tracker) |
| @accepted = true |
| tracker.connection.close |
| end |
| end.new |
| |
| receive_handler = Class.new(ExceptionMessagingHandler) do |
| attr_reader :message, :link |
| def on_receiver_open(link) |
| @link = link |
| @link.open |
| @link.flow(1) |
| end |
| |
| def on_message(delivery, message) |
| @message = message; |
| delivery.update Disposition::ACCEPTED |
| delivery.settle |
| end |
| end.new |
| |
| c = ServerContainer.new(__method__, {:handler => receive_handler}) |
| c.connect(c.url, {:handler => send_handler}).open_sender({:name => "testlink"}) |
| c.run |
| |
| assert send_handler.accepted |
| assert_equal "testlink", receive_handler.link.name |
| assert_equal "hello", receive_handler.message.body |
| assert_equal :bar, receive_handler.message[:foo] |
| assert_equal "test_simple", receive_handler.link.connection.container_id |
| end |
| |
| class CloseOnOpenHandler < TestHandler |
| def on_connection_open(c) super; c.close; end |
| end |
| |
| def test_auto_stop_one |
| # A listener and a connection |
| start_stop_handler = Class.new do |
| def on_container_start(c) @start = c; end |
| def on_container_stop(c) @stop = c; end |
| attr_reader :start, :stop |
| end.new |
| c = Container.new(start_stop_handler, __method__) |
| threads = 3.times.collect { Thread.new { c.run } } |
| sleep(0.01) while c.running < 3 |
| assert_equal c, start_stop_handler.start |
| l = c.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new})) |
| c.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} ) |
| threads.each { |t| assert t.join(1) } |
| assert_equal c, start_stop_handler.stop |
| assert_raises(Container::StoppedError) { c.run } |
| end |
| |
| def test_auto_stop_two |
| # Connect between different containers |
| c1, c2 = Container.new("#{__method__}-1"), Container.new("#{__method__}-2") |
| threads = [ Thread.new {c1.run }, Thread.new {c2.run } ] |
| l = c2.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new})) |
| c1.connect(l.url, { :handler => CloseOnOpenHandler.new} ) |
| assert threads.each { |t| t.join(1) } |
| assert_raises(Container::StoppedError) { c1.run } |
| assert_raises(Container::StoppedError) { c2.connect("") } |
| end |
| |
| def test_auto_stop_listener_only |
| c = Container.new(__method__) |
| # Listener only, external close |
| t = Thread.new { c.run } |
| l = c.listen_io(TCPServer.new(0)) |
| l.close |
| assert t.join(1) |
| end |
| |
| def test_stop_empty |
| c = Container.new(__method__) |
| threads = 3.times.collect { Thread.new { c.run } } |
| sleep(0.01) while c.running < 3 |
| assert_nil threads[0].join(0.001) # Not stopped |
| c.stop |
| assert c.stopped |
| assert_raises(Container::StoppedError) { c.connect("") } |
| assert_raises(Container::StoppedError) { c.run } |
| threads.each { |t| assert t.join(1) } |
| end |
| |
| def test_stop |
| c = Container.new(__method__) |
| c.auto_stop = false |
| |
| l = c.listen_io(TCPServer.new(0)) |
| threads = 3.times.collect { Thread.new { c.run } } |
| sleep(0.01) while c.running < 3 |
| l.close |
| assert_nil threads[0].join(0.001) # Not stopped, no auto_stop |
| |
| l = c.listen_io(TCPServer.new(0)) # New listener |
| conn = c.connect("amqp://:#{l.to_io.addr[1]}") |
| c.stop |
| assert c.stopped |
| threads.each { |t| assert t.join(1) } |
| |
| assert_raises(Container::StoppedError) { c.run } |
| assert_equal 0, c.running |
| assert_nil l.condition |
| assert_nil conn.condition |
| end |
| |
| def test_bad_host |
| cont = Container.new(__method__) |
| assert_raises (SocketError) { cont.listen("badlisten.example.com:999") } |
| assert_raises (SocketError) { cont.connect("badconnect.example.com:999") } |
| end |
| |
| # Verify that connection options are sent to the peer |
| def test_connection_options |
| # Note: user, password and sasl_xxx options are tested by ContainerSASLTest below |
| server_handler = Class.new(ExceptionMessagingHandler) do |
| def on_connection_open(c) |
| @connection = c |
| c.open({ |
| :virtual_host => "server.to.client", |
| :properties => { :server => :client }, |
| :offered_capabilities => [ :s1 ], |
| :desired_capabilities => [ :s2 ], |
| :container_id => "box", |
| }) |
| c.close |
| end |
| attr_reader :connection |
| end.new |
| # Transport options set by listener, by Connection#open it is too late |
| cont = ServerContainer.new(__method__, { |
| :handler => server_handler, |
| :idle_timeout => 88, |
| :max_sessions =>1000, |
| :max_frame_size => 8888, |
| }) |
| client = cont.connect(cont.url, |
| {:virtual_host => "client.to.server", |
| :properties => { "foo" => :bar, "str" => "str" }, |
| :offered_capabilities => [:c1 ], |
| :desired_capabilities => [:c2 ], |
| :idle_timeout => 42, |
| :max_sessions =>100, |
| :max_frame_size => 4096, |
| :container_id => "bowl" |
| }) |
| cont.run |
| |
| c = server_handler.connection |
| assert_equal "client.to.server", c.virtual_host |
| assert_equal({ "foo" => :bar, "str" => "str" }, c.properties) |
| assert_equal([:c1], c.offered_capabilities) |
| assert_equal([:c2], c.desired_capabilities) |
| assert_equal 21, c.idle_timeout # Proton divides by 2 |
| assert_equal 100, c.max_sessions |
| assert_equal 4096, c.max_frame_size |
| assert_equal "bowl", c.container_id |
| |
| c = client |
| assert_equal "server.to.client", c.virtual_host |
| assert_equal({ :server => :client }, c.properties) |
| assert_equal([:s1], c.offered_capabilities) |
| assert_equal([:s2], c.desired_capabilities) |
| assert_equal "box", c.container_id |
| assert_equal 8888, c.max_frame_size |
| assert_equal 44, c.idle_timeout # Proton divides by 2 |
| assert_equal 100, c.max_sessions |
| end |
| |
| def test_link_options |
| server_handler = Class.new(ExceptionMessagingHandler) do |
| def initialize() @links = []; end |
| attr_reader :links |
| def on_sender_open(l) @links << l; end |
| def on_receiver_open(l) @links << l; end |
| end.new |
| |
| client_handler = Class.new(ExceptionMessagingHandler) do |
| def on_connection_open(c) |
| @links = []; |
| @links << c.open_sender("s1") |
| @links << c.open_sender({:name => "s2-n", :target => "s2-t", :source => "s2-s"}) |
| @links << c.open_receiver("r1") |
| @links << c.open_receiver({:name => "r2-n", :target => "r2-t", :source => "r2-s"}) |
| c.close |
| end |
| attr_reader :links |
| end.new |
| |
| cont = ServerContainer.new(__method__, {:handler => server_handler }, 1) |
| cont.connect(cont.url, :handler => client_handler) |
| cont.run |
| |
| expect = ["test_link_options/1", "s2-n", "test_link_options/2", "r2-n"] |
| assert_equal expect, server_handler.links.map(&:name) |
| assert_equal expect, client_handler.links.map(&:name) |
| |
| expect = [[nil,"s1"], ["s2-s","s2-t"], ["r1",nil], ["r2-s","r2-t"]] |
| assert_equal expect, server_handler.links.map { |l| [l.remote_source.address, l.remote_target.address] } |
| assert_equal expect, client_handler.links.map { |l| [l.source.address, l.target.address] } |
| end |
| |
| def extract_terminus_options(t) |
| opts = Hash[[:address, :distribution_mode, :durability_mode, :timeout, :expiry_policy].map { |m| [m, t.send(m)] }] |
| opts[:filter] = t.filter.map |
| opts[:capabilities] = t.capabilities.map |
| opts[:dynamic] = t.dynamic? |
| opts |
| end |
| |
| def test_terminus_options |
| opts = { |
| :distribution_mode => Terminus::DIST_MODE_COPY, |
| :durability_mode => Terminus::DELIVERIES, |
| :timeout => 5, |
| :expiry_policy => Terminus::EXPIRE_WITH_LINK, |
| :filter => { :try => 'me' }, |
| :capabilities => { :cap => 'len' }, |
| } |
| src_opts = { :address => "src", :dynamic => true }.update(opts) |
| tgt_opts = { :address => "tgt", :dynamic => false }.update(opts) |
| |
| cont = ServerContainer.new(__method__, {}, 1) |
| c = cont.connect(cont.url) |
| s = c.open_sender({:target => tgt_opts, :source => src_opts }) |
| assert_equal src_opts, extract_terminus_options(s.source) |
| assert_equal tgt_opts, extract_terminus_options(s.target) |
| assert s.source.dynamic? |
| assert !s.target.dynamic? |
| end |
| |
| # Test for time out on connecting to an unresponsive server |
| def test_idle_timeout_server_no_open |
| s = TCPServer.new(0) |
| cont = Container.new(__method__) |
| cont.connect(":#{s.addr[1]}", {:idle_timeout => 0.1, :handler => ExceptionMessagingHandler.new }) |
| ex = assert_raises(Qpid::Proton::Condition) { cont.run } |
| assert_match(/resource-limit-exceeded/, ex.to_s) |
| ensure |
| s.close if s |
| end |
| |
| # Test for time out on unresponsive client |
| def test_idle_timeout_client |
| server = ServerContainerThread.new("#{__method__}.server", {:idle_timeout => 0.1}) |
| client_handler = Class.new(ExceptionMessagingHandler) do |
| def initialize() @ready, @block = Queue.new, Queue.new; end |
| attr_reader :ready, :block |
| def on_connection_open(c) |
| @ready.push nil # Tell the main thread we are now open |
| @block.pop # Block the client so the server will time it out |
| end |
| end.new |
| |
| client = Container.new(nil, "#{__method__}.client") |
| client.connect(server.url, {:handler => client_handler}) |
| client_thread = Thread.new { client.run } |
| client_handler.ready.pop # Wait till the client has connected |
| server.join # Exits when the connection closes from idle-timeout |
| client_handler.block.push nil # Unblock the client |
| ex = assert_raises(Qpid::Proton::Condition) { client_thread.join } |
| assert_match(/resource-limit-exceeded/, ex.to_s) |
| end |
| |
| # Make sure we stop and clean up if an aborted connection causes a handler to raise. |
| # https://issues.apache.org/jira/browse/PROTON-1791 |
| def test_handler_raise |
| cont = ServerContainer.new(__method__, {}, 0) # Don't auto-close the listener |
| client_handler = Class.new(MessagingHandler) do |
| # TestException is < Exception so not handled by default rescue clause |
| def on_connection_open(c) raise TestException.new("Bad Dog"); end |
| end.new |
| threads = 3.times.collect { Thread.new { cont.run } } |
| sleep 0.01 while cont.running < 3 # Wait for all threads to be running |
| sockets = 2.times.collect { TCPSocket.new("", cont.port) } |
| cont.connect_io(sockets[1]) # No exception |
| cont.connect_io(sockets[0], {:handler => client_handler}) # Should stop container |
| |
| threads.each { |t| assert_equal("Bad Dog", assert_raises(TestException) {t.join}.message) } |
| sockets.each { |s| assert s.closed? } |
| assert cont.listener.to_io.closed? |
| assert_raises(Container::StoppedError) { cont.run } |
| assert_raises(Container::StoppedError) { cont.listen "" } |
| end |
| |
| # Check if two time values are "close enough" to be reasonable. |
| def assert_equalish(x, y, delta=0.1) |
| assert_in_delta(x, y, delta) |
| end |
| |
| # Test container doesn't stops only when schedule work is done |
| def test_container_work_queue |
| c = Container.new __method__ |
| delays = [0.1, 0.03, 0.02] |
| a = [] |
| delays.each { |d| c.schedule(d) { a << [d, Time.now] } } |
| start = Time.now |
| c.run |
| delays.sort.each do |d| |
| x = a.shift |
| assert_equal d, x[0] |
| end |
| assert_equalish delays.reduce(:+), Time.now-start |
| end |
| |
| # Test container work queue finishes due tasks on external stop, drops future tasks |
| def test_container_work_queue_stop |
| q = Queue.new |
| c = Container.new __method__ |
| thread = Thread.new { c.run } |
| time = Time.now + 0.01 |
| # Mix good scheduled tasks at time and bad tasks scheduled after 10 secs |
| 10.times do |
| c.schedule(time) { q << true } |
| c.schedule(10) { q << false } |
| end |
| assert_same true, q.pop # First task processed, all others at same time are due |
| # Mix in some immediate tasks |
| 5.times do |
| c.work_queue.add { q << true } # Immediate |
| c.schedule(time) { q << true } |
| c.schedule(10) { q << false } |
| end |
| c.stop |
| thread.join |
| 19.times { assert_same true, q.pop } |
| assert_equal 0, q.size # Tasks with 10 sec delay should be dropped |
| end |
| |
| # Chain schedule calls from other schedule calls |
| def test_container_schedule_chain |
| c = Container.new(__method__) |
| delays = [0.05, 0.02, 0.04] |
| i = delays.each |
| a = [] |
| p = Proc.new { c.schedule(i.next) { a << Time.now; p.call } rescue nil } |
| p.call # Schedule the first, which schedules the second etc. |
| start = Time.now |
| c.run |
| assert_equal 3, a.size |
| assert_equalish delays.reduce(:+), Time.now-start |
| end |
| |
| # Schedule calls from handlers |
| def test_container_schedule_handler |
| h = Class.new() do |
| def initialize() @got = []; end |
| attr_reader :got |
| def record(m) @got << m; end |
| def on_container_start(c) c.schedule(0) {record __method__}; end |
| def on_connection_open(c) c.close; c.container.schedule(0) {record __method__}; end |
| def on_connection_close(c) c.container.schedule(0) {record __method__}; end |
| end.new |
| t = ServerContainerThread.new(__method__, nil, 1, h) |
| t.connect(t.url) |
| t.join |
| assert_equal [:on_container_start, :on_connection_open, :on_connection_open, :on_connection_close, :on_connection_close], h.got |
| end |
| |
| # Raising from container handler should stop container |
| def test_container_handler_raise |
| h = Class.new() do |
| def on_container_start(c) raise "BROKEN"; end |
| end.new |
| c = Container.new(h, __method__) |
| assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s) |
| end |
| |
| # Raising from connection handler should stop container |
| def test_connection_handler_raise |
| h = Class.new() do |
| def on_connection_open(c) raise "BROKEN"; end |
| end.new |
| c = ServerContainer.new(__method__, nil, 1, h) |
| c.connect(c.url) |
| assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s) |
| end |
| |
| # Raising from container schedule should stop container |
| def test_container_schedule_raise |
| c = Container.new(__method__) |
| c.schedule(0) { raise "BROKEN" } |
| assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s) |
| end |
| |
| def test_connection_work_queue |
| cont = ServerContainer.new(__method__, {}, 1) |
| c = cont.connect(cont.url) |
| t = Thread.new { cont.run } |
| q = Queue.new |
| |
| start = Time.now |
| c.work_queue.schedule(0.02) { q << [3, Thread.current] } |
| c.work_queue.add { q << [1, Thread.current] } |
| c.work_queue.schedule(0.04) { q << [4, Thread.current] } |
| c.work_queue.add { q << [2, Thread.current] } |
| |
| assert_equal [1, t], q.pop |
| assert_equal [2, t], q.pop |
| assert_equalish 0.0, Time.now-start |
| assert_equal [3, t], q.pop |
| assert_equal [4, t], q.pop |
| assert_equalish 0.02 + 0.04, Time.now-start |
| |
| c.work_queue.add { c.close } |
| t.join |
| assert_raises(WorkQueue::StoppedError) { c.work_queue.add { } } |
| end |
| |
| # Raising from connection schedule should stop container |
| def test_connection_work_queue_raise |
| c = ServerContainer.new(__method__) |
| c.connect(c.url) |
| c.work_queue.add { raise "BROKEN" } |
| assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s) |
| end |
| end |