blob: 11efa3a26d393bc26a23ef68685eeea93c365cd5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
require 'socket'
module Qpid::Proton
# Associate an AMQP {Connection} and {Transport} with an {IO}
#
# - {#read} reads AMQP binary data from the {IO} and generates events
# - {#tick} generates timing-related events
# - {#event} gets events to be dispatched to {Handler::MessagingHandler}s
# - {#write} writes AMQP binary data to the {IO}
#
# Thread safety: The {ConnectionDriver} is not thread safe but separate
# {ConnectionDriver} instances can be processed concurrently. The
# {Container} handles multiple connections concurrently in multiple threads.
#
class ConnectionDriver
# Create a {Connection} and {Transport} associated with +io+
# @param io [IO] An {IO} or {IO}-like object that responds
# to {IO#read_nonblock} and {IO#write_nonblock}
def initialize(io)
@impl = Cproton.pni_connection_driver or raise NoMemoryError
@io = io
@rbuf = "" # String for re-usable read buffer
end
# @return [Connection]
def connection()
@connection ||= Connection.wrap(Cproton.pni_connection_driver_connection(@impl))
end
# @return [Transport]
def transport()
@transport ||= Transport.wrap(Cproton.pni_connection_driver_transport(@impl))
end
# @return [IO] Allows ConnectionDriver to be passed directly to {IO#select}
def to_io() @io; end
# @return [Bool] True if the driver can read more data
def can_read?() Cproton.pni_connection_driver_read_size(@impl) > 0; end
# @return [Bool] True if the driver has data to write
def can_write?() Cproton.pni_connection_driver_write_size(@impl) > 0; end
# True if the ConnectionDriver has nothing left to do: both sides of the
# transport are closed and there are no events to dispatch.
def finished?() Cproton.pn_connection_driver_finished(@impl); end
# Get the next event to dispatch, nil if no events available
def event()
e = Cproton.pn_connection_driver_next_event(@impl)
Event.new(e) if e
end
# True if {#event} will return non-nil
def event?() Cproton.pn_connection_driver_has_event(@impl); end
# Iterator for all available events
def each_event()
while e = event
yield e
end
end
# Non-blocking read from {#io}, generate events for {#event}
# IO errors are returned as transport errors by {#event}, not raised
def read
size = Cproton.pni_connection_driver_read_size(@impl)
return if size <= 0
@io.read_nonblock(size, @rbuf) # Use the same string rbuf for reading each time
Cproton.pni_connection_driver_read_copy(@impl, @rbuf) unless @rbuf.empty?
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
# Try again later.
rescue EOFError # EOF is not an error
close_read
rescue IOError, SystemCallError => e
close e
end
# Non-blocking write to {#io}
# IO errors are returned as transport errors by {#event}, not raised
def write
data = Cproton.pn_connection_driver_write_buffer(@impl)
return unless data && data.size > 0
n = @io.write_nonblock(data)
Cproton.pn_connection_driver_write_done(@impl, n) if n > 0
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
# Try again later.
rescue IOError, SystemCallError => e
close e
end
# Handle time-related work, for example idle-timeout events.
# May generate events for {#event} and change {#can_read?}, {#can_write?}
#
# @param [Time] now the current time, defaults to {Time#now}.
#
# @return [Time] time of the next scheduled event, or nil if there are no
# scheduled events. If non-nil you must call {#tick} again no later than
# this time.
def tick(now=Time.now)
transport = Cproton.pni_connection_driver_transport(@impl)
ms = Cproton.pn_transport_tick(transport, (now.to_r * 1000).to_i)
@next_tick = ms.zero? ? nil : Time.at(ms.to_r / 1000);
unless @next_tick
idle = Cproton.pn_transport_get_idle_timeout(transport);
@next_tick = now + (idle.to_r / 1000) unless idle.zero?
end
@next_tick
end
# Time returned by the last call to {#tick}
attr_accessor :next_tick
# Disconnect the write side of the transport, *without* sending an AMQP
# close frame. To close politely, you should use {Connection#close}, the
# transport will close itself once the protocol close is complete.
#
def close_write error=nil
set_error error
Cproton.pn_connection_driver_write_close(@impl)
@io.close_write rescue nil # Allow double-close
end
# Is the read side of the driver closed?
def read_closed?() Cproton.pn_connection_driver_read_closed(@impl); end
# Is the write side of the driver closed?
def write_closed?() Cproton.pn_connection_driver_read_closed(@impl); end
# Disconnect the read side of the transport, without waiting for an AMQP
# close frame. See comments on {#close_write}
def close_read error=nil
set_error error
Cproton.pn_connection_driver_read_close(@impl)
@io.close_read rescue nil # Allow double-close
end
# Disconnect both sides of the transport sending/waiting for AMQP close
# frames. See comments on {#close_write}
def close error=nil
close_write error
close_read
end
private
def set_error err
transport.condition ||= Condition.convert(err, "proton:io") if err
end
end
# A {ConnectionDriver} that feeds raw proton events to a handler.
class HandlerDriver < ConnectionDriver
# Combine an {IO} with a handler and provide
# a simplified way to run the driver via {#process}
#
# @param io [IO]
# @param handler [Handler::MessagingHandler] to receive raw events in {#dispatch} and {#process}
def initialize(io, handler)
super(io)
@handler = handler
@adapter = Handler::Adapter.adapt(handler)
end
# @return [MessagingHandler] The handler dispatched to by {#process}
attr_reader :handler
# Dispatch all available raw proton events from {#event} to {#handler}
def dispatch()
each_event do |e|
case e.method # Events that affect the driver
when :on_transport_tail_closed then close_read
when :on_transport_head_closed then close_write
when :on_transport_closed then @io.close rescue nil # Allow double-close
end
e.dispatch @adapter
end
end
# Do {#read}, {#tick}, {#write} and {#dispatch} without blocking.
# @param [Time] now the current time
# @return [Time] Latest time to call {#process} again for scheduled events,
# or nil if there are no scheduled events
def process(now=Time.now)
read
dispatch
next_tick = tick(now)
dispatch
write
dispatch
return next_tick
end
end
end