| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| require 'socket' |
| |
| module Qpid::Proton |
| |
| # Associate an AMQP {Connection} and {Transport} with an {IO} |
| # |
| # - {#read} reads AMQP binary data from the {IO} and generates events |
| # - {#tick} generates timing-related events |
| # - {#event} gets events to be dispatched to {Handler::MessagingHandler}s |
| # - {#write} writes AMQP binary data to the {IO} |
| # |
| # Thread safety: The {ConnectionDriver} is not thread safe but separate |
| # {ConnectionDriver} instances can be processed concurrently. The |
| # {Container} handles multiple connections concurrently in multiple threads. |
| # |
| class ConnectionDriver |
| # Create a {Connection} and {Transport} associated with +io+ |
| # @param io [IO] An {IO} or {IO}-like object that responds |
| # to {IO#read_nonblock} and {IO#write_nonblock} |
| def initialize(io) |
| @impl = Cproton.pni_connection_driver or raise NoMemoryError |
| @io = io |
| @rbuf = "" # String for re-usable read buffer |
| end |
| |
| # @return [Connection] |
| def connection() |
| @connection ||= Connection.wrap(Cproton.pni_connection_driver_connection(@impl)) |
| end |
| |
| # @return [Transport] |
| def transport() |
| @transport ||= Transport.wrap(Cproton.pni_connection_driver_transport(@impl)) |
| end |
| |
| # @return [IO] Allows ConnectionDriver to be passed directly to {IO#select} |
| def to_io() @io; end |
| |
| # @return [Bool] True if the driver can read more data |
| def can_read?() Cproton.pni_connection_driver_read_size(@impl) > 0; end |
| |
| # @return [Bool] True if the driver has data to write |
| def can_write?() Cproton.pni_connection_driver_write_size(@impl) > 0; end |
| |
| # True if the ConnectionDriver has nothing left to do: both sides of the |
| # transport are closed and there are no events to dispatch. |
| def finished?() Cproton.pn_connection_driver_finished(@impl); end |
| |
| # Get the next event to dispatch, nil if no events available |
| def event() |
| e = Cproton.pn_connection_driver_next_event(@impl) |
| Event.new(e) if e |
| end |
| |
| # True if {#event} will return non-nil |
| def event?() Cproton.pn_connection_driver_has_event(@impl); end |
| |
| # Iterator for all available events |
| def each_event() |
| while e = event |
| yield e |
| end |
| end |
| |
| # Non-blocking read from {#io}, generate events for {#event} |
| # IO errors are returned as transport errors by {#event}, not raised |
| def read |
| size = Cproton.pni_connection_driver_read_size(@impl) |
| return if size <= 0 |
| @io.read_nonblock(size, @rbuf) # Use the same string rbuf for reading each time |
| Cproton.pni_connection_driver_read_copy(@impl, @rbuf) unless @rbuf.empty? |
| rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR |
| # Try again later. |
| rescue EOFError # EOF is not an error |
| close_read |
| rescue IOError, SystemCallError => e |
| close e |
| end |
| |
| # Non-blocking write to {#io} |
| # IO errors are returned as transport errors by {#event}, not raised |
| def write |
| data = Cproton.pn_connection_driver_write_buffer(@impl) |
| return unless data && data.size > 0 |
| n = @io.write_nonblock(data) |
| Cproton.pn_connection_driver_write_done(@impl, n) if n > 0 |
| rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR |
| # Try again later. |
| rescue IOError, SystemCallError => e |
| close e |
| end |
| |
| # Handle time-related work, for example idle-timeout events. |
| # May generate events for {#event} and change {#can_read?}, {#can_write?} |
| # |
| # @param [Time] now the current time, defaults to {Time#now}. |
| # |
| # @return [Time] time of the next scheduled event, or nil if there are no |
| # scheduled events. If non-nil you must call {#tick} again no later than |
| # this time. |
| def tick(now=Time.now) |
| transport = Cproton.pni_connection_driver_transport(@impl) |
| ms = Cproton.pn_transport_tick(transport, (now.to_r * 1000).to_i) |
| @next_tick = ms.zero? ? nil : Time.at(ms.to_r / 1000); |
| unless @next_tick |
| idle = Cproton.pn_transport_get_idle_timeout(transport); |
| @next_tick = now + (idle.to_r / 1000) unless idle.zero? |
| end |
| @next_tick |
| end |
| |
| # Time returned by the last call to {#tick} |
| attr_accessor :next_tick |
| |
| # Disconnect the write side of the transport, *without* sending an AMQP |
| # close frame. To close politely, you should use {Connection#close}, the |
| # transport will close itself once the protocol close is complete. |
| # |
| def close_write error=nil |
| set_error error |
| Cproton.pn_connection_driver_write_close(@impl) |
| @io.close_write rescue nil # Allow double-close |
| end |
| |
| # Is the read side of the driver closed? |
| def read_closed?() Cproton.pn_connection_driver_read_closed(@impl); end |
| |
| # Is the write side of the driver closed? |
| def write_closed?() Cproton.pn_connection_driver_read_closed(@impl); end |
| |
| # Disconnect the read side of the transport, without waiting for an AMQP |
| # close frame. See comments on {#close_write} |
| def close_read error=nil |
| set_error error |
| Cproton.pn_connection_driver_read_close(@impl) |
| @io.close_read rescue nil # Allow double-close |
| end |
| |
| # Disconnect both sides of the transport sending/waiting for AMQP close |
| # frames. See comments on {#close_write} |
| def close error=nil |
| close_write error |
| close_read |
| end |
| |
| private |
| |
| def set_error err |
| transport.condition ||= Condition.convert(err, "proton:io") if err |
| end |
| end |
| |
| # A {ConnectionDriver} that feeds raw proton events to a handler. |
| class HandlerDriver < ConnectionDriver |
| # Combine an {IO} with a handler and provide |
| # a simplified way to run the driver via {#process} |
| # |
| # @param io [IO] |
| # @param handler [Handler::MessagingHandler] to receive raw events in {#dispatch} and {#process} |
| def initialize(io, handler) |
| super(io) |
| @handler = handler |
| @adapter = Handler::Adapter.adapt(handler) |
| end |
| |
| # @return [MessagingHandler] The handler dispatched to by {#process} |
| attr_reader :handler |
| |
| # Dispatch all available raw proton events from {#event} to {#handler} |
| def dispatch() |
| each_event do |e| |
| case e.method # Events that affect the driver |
| when :on_transport_tail_closed then close_read |
| when :on_transport_head_closed then close_write |
| when :on_transport_closed then @io.close rescue nil # Allow double-close |
| end |
| e.dispatch @adapter |
| end |
| end |
| |
| # Do {#read}, {#tick}, {#write} and {#dispatch} without blocking. |
| # @param [Time] now the current time |
| # @return [Time] Latest time to call {#process} again for scheduled events, |
| # or nil if there are no scheduled events |
| def process(now=Time.now) |
| read |
| dispatch |
| next_tick = tick(now) |
| dispatch |
| write |
| dispatch |
| return next_tick |
| end |
| end |
| end |