blob: ab7234bddeb6cc1a4adeb06537661e0b8c63a71d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# @private
module Qpid::Proton
module Handler
# Adapt raw proton events to {MessagingHandler} events.
class MessagingAdapter < Adapter
def delegate(method, *args)
forward(method, *args) or forward(:on_unhandled, method, *args)
end
def delegate_error(method, context)
unless forward(method, context) || forward(:on_error, context.condition)
forward(:on_unhandled, method, context)
# Close the whole connection on an un-handled error
context.connection.close(context.condition)
end
end
def on_container_start(container) delegate(:on_container_start, container); end
def on_container_stop(container) delegate(:on_container_stop, container); end
# Define repetative on_xxx_open/close methods for session and connection
def self.open_close(endpoint)
Module.new do
define_method(:"on_#{endpoint}_remote_open") do |event|
begin
delegate(:"on_#{endpoint}_open", event.context)
event.context.open if event.context.local_uninit?
rescue StopAutoResponse
end
end
define_method(:"on_#{endpoint}_remote_close") do |event|
delegate_error(:"on_#{endpoint}_error", event.context) if event.context.condition
begin
delegate(:"on_#{endpoint}_close", event.context)
event.context.close if event.context.local_active?
rescue StopAutoResponse
end
end
end
end
# Generate and include open_close modules for each endpoint type
# Using modules so we can override to extend the behavior later in the handler.
[:connection, :session].each { |endpoint| include open_close(endpoint) }
# Link open/close is handled separately because links are split into
# sender and receiver on the messaging API
def on_link_remote_open(event)
if !event.link.local_active? # Copy remote terminus data to local
event.link.source.replace(event.link.remote_source);
event.link.target.replace(event.link.remote_target);
end
delegate(event.link.sender? ? :on_sender_open : :on_receiver_open, event.link)
event.link.open if event.link.local_uninit?
add_credit(event)
rescue StopAutoResponse
end
def on_link_remote_close(event)
s = event.link.sender?
delegate_error(s ? :on_sender_error : :on_receiver_error, event.link) if event.link.condition
delegate(s ? :on_sender_close : :on_receiver_close, event.link)
event.link.close if event.link.local_active?
rescue StopAutoResponse
end
def on_transport_error(event)
delegate_error(:on_transport_error, event.context)
end
def on_transport_closed(event)
delegate(:on_transport_close, event.context) rescue StopAutoResponse
end
# Add flow control for local link open
def on_link_local_open(event) add_credit(event); end
def on_delivery(event)
if event.link.receiver? # Incoming message
d = event.delivery
if d.aborted?
delegate(:on_delivery_abort, d)
elsif d.complete?
if d.link.local_closed? && d.receiver.auto_accept
d.release # Auto release after close
else
begin
delegate(:on_message, d, d.message)
d.accept if d.receiver.auto_accept && d.local_state == 0
rescue Reject
d.reject
rescue Release
d.release
end
end
end
delegate(:on_delivery_settle, d) if d.settled?
add_credit(event)
else # Outgoing message
t = event.tracker
case t.state
when Delivery::ACCEPTED then delegate(:on_tracker_accept, t)
when Delivery::REJECTED then delegate(:on_tracker_reject, t)
when Delivery::RELEASED then delegate(:on_tracker_release, t)
when Delivery::MODIFIED then delegate(:on_tracker_modify, t)
end
delegate(:on_tracker_settle, t) if t.settled?
t.settle if t.sender.auto_settle
end
end
def on_link_flow(event)
add_credit(event)
sender = event.sender
delegate(:on_sendable, sender) if sender && sender.open? && sender.credit > 0
end
def add_credit(event)
return unless (r = event.receiver)
if r.open? && (r.drained == 0) && r.credit_window && (r.credit_window > r.credit)
r.flow(r.credit_window - r.credit)
end
end
end
end
end