blob: 3466ef191d86970eb436b6fec8fef9710f096a4b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# @private
module Qpid::Proton
module Handler
# Adapter to convert raw proton events for the old {Handler::MessagingHandler}
# used by the Reactor.
class ReactorMessagingAdapter < Adapter
def initialize handler
super
@opts = (handler.options if handler.respond_to?(:options)) || {}
@opts[:prefetch] ||= 10
@opts[:peer_close_is_error] = false unless @opts.include? :peer_close_is_error
[:auto_accept, :auto_settle, :auto_open, :auto_close].each do |k|
@opts[k] = true unless @opts.include? k
end
end
alias dispatch forward
def delegate(method, event)
event.method = method # Update the event with the new method
event.dispatch(@handler) or dispatch(:on_unhandled, event)
end
def delegate_error(method, event)
event.method = method
unless event.dispatch(@handler) || dispatch(:on_error, event)
dispatch(:on_unhandled, event)
event.connection.close(event.context.condition) if @opts[:auto_close]
end
end
def on_container_start(container) delegate(:on_start, Event.new(nil, nil, container)); end
def on_container_stop(container) delegate(:on_stop, Event.new(nil, nil, container)); end
# Define repetative on_xxx_open/close methods for each endpoint type
def self.open_close(endpoint)
on_opening = :"on_#{endpoint}_opening"
on_opened = :"on_#{endpoint}_opened"
on_closing = :"on_#{endpoint}_closing"
on_closed = :"on_#{endpoint}_closed"
on_error = :"on_#{endpoint}_error"
Module.new do
define_method(:"on_#{endpoint}_local_open") do |event|
delegate(on_opened, event) if event.context.remote_open?
end
define_method(:"on_#{endpoint}_remote_open") do |event|
if event.context.local_open?
delegate(on_opened, event)
elsif event.context.local_uninit?
delegate(on_opening, event)
event.context.open if @opts[:auto_open]
end
end
define_method(:"on_#{endpoint}_local_close") do |event|
delegate(on_closed, event) if event.context.remote_closed?
end
define_method(:"on_#{endpoint}_remote_close") do |event|
if event.context.remote_condition
delegate_error(on_error, event)
elsif event.context.local_closed?
delegate(on_closed, event)
elsif @opts[:peer_close_is_error]
Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close")
delegate_error(on_error, event)
else
delegate(on_closing, event)
end
event.context.close if @opts[:auto_close]
end
end
end
# Generate and include open_close modules for each endpoint type
[:connection, :session, :link].each { |endpoint| include open_close(endpoint) }
def on_transport_error(event) delegate_error(:on_transport_error, event); end
def on_transport_closed(event) delegate(:on_transport_closed, event); end
# Add flow control for link opening events
def on_link_local_open(event) super; add_credit(event); end
def on_link_remote_open(event) super; add_credit(event); end
def on_delivery(event)
if event.link.receiver? # Incoming message
d = event.delivery
if d.aborted?
delegate(:on_aborted, event)
d.settle
elsif d.complete?
if d.link.local_closed? && @opts[:auto_accept]
d.release
else
begin
delegate(:on_message, event)
d.accept if @opts[:auto_accept] && !d.settled?
rescue Qpid::Proton::Reject
d.reject
rescue Qpid::Proton::Release
d.release(true)
end
end
end
delegate(:on_settled, event) if d.settled?
add_credit(event)
else # Outgoing message
t = event.tracker
if t.updated?
case t.state
when Qpid::Proton::Delivery::ACCEPTED then delegate(:on_accepted, event)
when Qpid::Proton::Delivery::REJECTED then delegate(:on_rejected, event)
when Qpid::Proton::Delivery::RELEASED then delegate(:on_released, event)
when Qpid::Proton::Delivery::MODIFIED then delegate(:on_modified, event)
end
delegate(:on_settled, event) if t.settled?
t.settle if @opts[:auto_settle]
end
end
end
def on_link_flow(event)
add_credit(event)
l = event.link
delegate(:on_sendable, event) if l.sender? && l.open? && l.credit > 0
end
def add_credit(event)
r = event.receiver
prefetch = @opts[:prefetch]
if r && r.open? && (r.drained == 0) && prefetch && (prefetch > r.credit)
r.flow(prefetch - r.credit)
end
end
end
end
end