| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| require 'minitest/autorun' |
| require 'qpid_proton' |
| require 'test_tools' |
| include Qpid::Proton |
| |
| # Records every call, never provokes "on_unhandled" |
| class RecordingHandler < Qpid::Proton::MessagingHandler |
| |
| def initialize(*args) super(*args); @calls = []; end |
| |
| attr_accessor :calls |
| |
| def names() @calls.collect { |c| c[0] }; end |
| |
| def clear() @calls.clear; end |
| |
| def method_missing(name, *args) |
| respond_to_missing?(name) ? (@calls << [name, *args]) : super; |
| end |
| def respond_to_missing?(name, private=false); (/^on_/ =~ name); end |
| def respond_to?(name, all=false) super || respond_to_missing?(name); end # For ruby < 1.9.2 |
| end |
| |
| class NoAutoOpenClose < RecordingHandler |
| def initialize() super; @endpoints = []; end |
| def on_connection_open(x) @connection = x; super; raise StopAutoResponse; end |
| def on_session_open(x) @session = x; super; raise StopAutoResponse; end |
| def on_sender_open(x) @link = x; super; raise StopAutoResponse; end |
| def on_receiver_open(x) @link = x; super; raise StopAutoResponse; end |
| def on_connection_close(x) super; raise StopAutoResponse; end |
| def on_session_close(x) super; raise StopAutoResponse; end |
| def on_sender_close(x) super; raise StopAutoResponse; end |
| def on_receiver_close(x) super; raise StopAutoResponse; end |
| attr_reader :connection, :session, :link |
| end |
| |
| class TestMessagingHandler < MiniTest::Test |
| |
| def test_auto_open_close |
| d = DriverPair.new(RecordingHandler.new, RecordingHandler.new) |
| d.client.connection.open; d.client.connection.open_sender; d.run |
| assert_equal [:on_connection_open, :on_session_open, :on_sender_open, :on_sendable], d.client.handler.names |
| assert_equal [:on_connection_open, :on_session_open, :on_receiver_open], d.server.handler.names |
| d.clear |
| d.client.connection.close; d.run |
| assert_equal [:on_connection_close, :on_transport_close], d.server.handler.names |
| assert_equal [:on_connection_close, :on_transport_close], d.client.handler.names |
| end |
| |
| def test_no_auto_open_close |
| d = DriverPair.new(NoAutoOpenClose.new, NoAutoOpenClose.new) |
| d.client.connection.open; d.run |
| assert_equal [:on_connection_open], d.server.handler.names |
| assert_equal [], d.client.handler.names |
| d.server.connection.open; d.run |
| assert_equal [:on_connection_open], d.client.handler.names |
| assert_equal [:on_connection_open], d.server.handler.names |
| d.clear |
| d.client.connection.open_session; d.run |
| assert_equal [:on_session_open], d.server.handler.names |
| assert_equal [], d.client.handler.names |
| d.clear |
| d.client.connection.close; |
| 3.times { d.process } |
| assert_equal [:on_connection_close], d.server.handler.names |
| assert_equal [], d.client.handler.names |
| d.server.connection.close; d.run |
| assert_equal [:on_connection_close, :on_transport_close], d.client.handler.names |
| assert_equal [:on_connection_close, :on_transport_close], d.server.handler.names |
| end |
| |
| def test_transport_error |
| d = DriverPair.new(RecordingHandler.new, RecordingHandler.new) |
| d.client.connection.open; d.run |
| d.clear |
| d.client.close "stop that"; d.run |
| assert_equal [:on_transport_close], d.client.handler.names |
| assert_equal [:on_transport_error, :on_transport_close], d.server.handler.names |
| assert_equal Condition.new("proton:io", "stop that (connection aborted)"), d.client.transport.condition |
| assert_equal Condition.new("amqp:connection:framing-error", "connection aborted"), d.server.transport.condition |
| end |
| |
| # Close on half-open |
| def test_connection_error |
| d = DriverPair.new(NoAutoOpenClose.new, NoAutoOpenClose.new) |
| d.client.connection.open; d.run |
| d.server.connection.close "bad dog"; d.run |
| d.client.connection.close; d.run |
| assert_equal [:on_connection_open, :on_connection_error, :on_connection_close, :on_transport_close], d.client.handler.names |
| assert_equal "bad dog", d.client.handler.calls[1][1].condition.description |
| assert_equal [:on_connection_open, :on_connection_error, :on_connection_close, :on_transport_close], d.server.handler.names |
| end |
| |
| def test_session_error |
| d = DriverPair.new(RecordingHandler.new, RecordingHandler.new) |
| d.client.connection.open |
| s = d.client.connection.default_session; s.open; d.run |
| assert_equal [:on_connection_open, :on_session_open], d.client.handler.names |
| assert_equal [:on_connection_open, :on_session_open], d.server.handler.names |
| d.clear |
| s.close "bad dog"; d.run |
| assert_equal [:on_session_error, :on_session_close], d.client.handler.names |
| assert_equal [:on_session_error, :on_session_close], d.server.handler.names |
| assert_equal "bad dog", d.server.handler.calls[0][1].condition.description |
| end |
| |
| def test_sender_receiver_error |
| d = DriverPair.new(RecordingHandler.new, RecordingHandler.new) |
| d.client.connection.open |
| s = d.client.connection.open_sender; d.run |
| assert_equal [:on_connection_open, :on_session_open, :on_sender_open, :on_sendable], d.client.handler.names |
| assert_equal [:on_connection_open, :on_session_open, :on_receiver_open], d.server.handler.names |
| d.clear |
| s.close "bad dog"; d.run |
| assert_equal [:on_sender_error, :on_sender_close], d.client.handler.names |
| assert_equal [:on_receiver_error, :on_receiver_close], d.server.handler.names |
| assert_equal "bad dog", d.server.handler.calls[0][1].condition.description |
| end |
| |
| def test_options_off |
| linkopts = {:credit_window=>0, :auto_settle=>false, :auto_accept=>false} |
| d = DriverPair.new(NoAutoOpenClose.new, NoAutoOpenClose.new) |
| d.client.connection.open; d.run |
| assert_equal [[], [:on_connection_open]], d.names |
| d.server.connection.open; d.run |
| assert_equal [[:on_connection_open], [:on_connection_open]], d.names |
| d.clear |
| s = d.client.connection.open_sender(linkopts); d.run |
| assert_equal [[], [:on_session_open, :on_receiver_open]], d.names |
| d.server.handler.session.open # Return session open |
| d.server.handler.link.open(linkopts) # Return link open |
| d.run |
| assert_equal [[:on_session_open, :on_sender_open], [:on_session_open, :on_receiver_open]], d.names |
| d.clear |
| d.server.handler.link.flow(1); d.run |
| assert_equal [[:on_sendable], []], d.names |
| assert_equal 1, s.credit |
| d.clear |
| s.send Message.new("foo"); d.run |
| assert_equal [[], [:on_message]], d.names |
| end |
| |
| def test_message |
| handler_class = Class.new(MessagingHandler) do |
| def on_message(delivery, message) @message = message; end |
| def on_tracker_accept(event) @accepted = true; end |
| attr_accessor :message, :accepted |
| end |
| d = DriverPair.new(handler_class.new, handler_class.new) |
| d.client.connection.open; |
| s = d.client.connection.open_sender; d.run |
| assert_equal 10, s.credit # Default prefetch |
| s.send(Message.new("foo")); d.run |
| assert_equal "foo", d.server.handler.message.body |
| assert d.client.handler.accepted |
| end |
| |
| # Verify on_unhandled is called |
| def test_unhandled |
| handler_class = Class.new(MessagingHandler) do |
| def initialize() super; @unhandled = []; end |
| def on_unhandled(method, *args) @unhandled << method; end |
| attr_accessor :unhandled |
| end |
| d = DriverPair.new(handler_class.new, handler_class.new) |
| d.client.connection.open; d.run |
| assert_equal [:on_connection_open], d.client.handler.unhandled |
| assert_equal [:on_connection_open], d.server.handler.unhandled |
| end |
| |
| # Verify on_error is called |
| def test_on_error |
| handler_class = Class.new(MessagingHandler) do |
| def initialize() super; @error = []; @unhandled = []; end |
| def on_error(condition) @error << condition; end |
| def on_unhandled(method, *args) @unhandled << method; end |
| attr_accessor :error, :unhandled |
| end |
| d = DriverPair.new(handler_class.new, handler_class.new) |
| d.client.connection.open |
| r = d.client.connection.open_receiver; d.run |
| assert_equal [:on_connection_open, :on_session_open, :on_receiver_open], d.client.handler.unhandled |
| assert_equal [:on_connection_open, :on_session_open, :on_sender_open, :on_sendable], d.server.handler.unhandled |
| r.close Condition.new("goof", "oops"); d.run |
| |
| assert_equal [Condition.new("goof", "oops")], d.client.handler.error |
| assert_equal [:on_connection_open, :on_session_open, :on_sender_open, :on_sendable, :on_sender_close], d.server.handler.unhandled |
| assert_equal [Condition.new("goof", "oops")], d.server.handler.error |
| |
| end |
| |
| # Verify on_unhandled is called for errors if there is no on_error |
| def test_unhandled_error |
| handler_class = Class.new(MessagingHandler) do |
| def on_unhandled(method, *args) |
| @error = args[0].condition if method == :on_connection_error; |
| end |
| attr_accessor :error |
| end |
| d = DriverPair.new(handler_class.new, handler_class.new) |
| d.client.connection.open; d.run |
| d.client.connection.close "oops"; d.run |
| assert_equal [Condition.new("error", "oops")]*2, d.collect { |x| x.handler.error } |
| end |
| end |