| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| require 'monitor' |
| |
| module Qpid |
| |
| class ChannelBusy< Exception ; end |
| |
| class ChannelsBusy < Exception ; end |
| |
| class SessionBusy < Exception ; end |
| |
| class ConnectionFailed < Exception ; end |
| |
| class Timeout < Exception ; end |
| |
| class Connection < Assembler |
| |
| include MonitorMixin |
| |
| attr_reader :spec, :attached, :sessions, :thread |
| attr_accessor :opened, :failed, :close_code |
| |
| def initialize(sock, args={}) |
| super(sock) |
| |
| delegate = args[:delegate] || Qpid::Delegate::Client.method(:new) |
| spec = args[:spec] || nil |
| |
| @spec = Qpid::Spec010::load(spec) |
| @track = @spec["track"] |
| |
| @attached = {} |
| @sessions = {} |
| |
| @condition = new_cond |
| @opened = false |
| @failed = false |
| @close_code = [nil, "connection aborted"] |
| |
| @thread = nil |
| |
| @channel_max = 65535 |
| |
| @delegate = delegate.call(self, args) |
| end |
| |
| def attach(name, ch, delegate, force=false) |
| synchronize do |
| ssn = @attached[ch.id] |
| if ssn |
| raise ChannelBusy.new(ch, ssn) unless ssn.name == name |
| else |
| ssn = @sessions[name] |
| if ssn.nil? |
| ssn = Session.new(name, @spec, :delegate => delegate) |
| @sessions[name] = ssn |
| elsif ssn.channel |
| if force |
| @attached.delete(ssn.channel.id) |
| ssn.channel = nil |
| else |
| raise SessionBusy.new(ssn) |
| end |
| end |
| @attached[ch.id] = ssn |
| ssn.channel = ch |
| end |
| ch.session = ssn |
| return ssn |
| end |
| end |
| |
| def detach(name, ch) |
| synchronize do |
| @attached.delete(ch.id) |
| ssn = @sessions.delete(name) |
| if ssn |
| ssn.channel = nil |
| ssn.closed |
| return ssn |
| end |
| end |
| end |
| |
| def session(name, kwargs = {}) |
| timeout = kwargs[:timeout] |
| delegate = kwargs[:delegate] || Qpid::Session::Client.method(:new) |
| |
| # FIXME: Python has cryptic comment about 'ch 0 ?' |
| channel = (0..@channel_max).detect { |i| ! @attached.key?(i) } |
| raise ChannelsBusy unless channel |
| |
| synchronize do |
| ch = Channel.new(self, channel) |
| ssn = attach(name, ch, delegate) |
| ssn.channel.session_attach(name) |
| if ssn.wait_for(timeout) { ssn.channel } |
| return ssn |
| else |
| detach(name, ch) |
| raise Timeout |
| end |
| end |
| end |
| |
| def detach_all |
| synchronize do |
| attached.values.each do |ssn| |
| ssn.exceptions << @close_code unless @close_code[0] == 200 |
| detach(ssn.name, ssn.channel) |
| end |
| end |
| end |
| |
| def start(timeout=nil) |
| @delegate.start |
| @thread = Thread.new { run } |
| @thread[:name] = 'conn' |
| synchronize do |
| unless @condition.wait_for(timeout) { @opened || @failed } |
| raise Timeout |
| end |
| end |
| if @failed |
| raise ConnectionFailed.new(@close_code) |
| end |
| end |
| |
| def run |
| # XXX: we don't really have a good way to exit this loop without |
| # getting the other end to kill the socket |
| loop do |
| begin |
| seg = read_segment |
| rescue Qpid::Closed => e |
| detach_all |
| break |
| end |
| @delegate.received(seg) |
| end |
| end |
| |
| def close(timeout=nil) |
| return unless @opened |
| Channel.new(self, 0).connection_close(200) |
| synchronize do |
| unless @condition.wait_for(timeout) { ! @opened } |
| raise Timeout |
| end |
| end |
| @thread.join(timeout) |
| @thread = nil |
| end |
| |
| def signal |
| synchronize { @condition.signal } |
| end |
| |
| def to_s |
| # FIXME: We'd like to report something like HOST:PORT |
| return @sock.to_s |
| end |
| |
| class Channel < Invoker |
| |
| attr_reader :id, :connection |
| attr_accessor :session |
| |
| def initialize(connection, id) |
| @connection = connection |
| @id = id |
| @session = nil |
| end |
| |
| def resolve_method(name) |
| inst = @connection.spec[name] |
| if inst.is_a?(Qpid::Spec010::Control) |
| return invocation(:method, inst) |
| else |
| return invocation(:error, nil) |
| end |
| end |
| |
| def invoke(type, args) |
| ctl = type.create(*args) |
| sc = StringCodec.new(@connection.spec) |
| sc.write_control(ctl) |
| @connection.write_segment(Segment.new(true, true, type.segment_type, |
| type.track, self.id, sc.encoded)) |
| |
| log = Qpid::logger["qpid.io.ctl"] |
| log.debug("SENT %s", ctl) if log |
| end |
| |
| def to_s |
| return "#{@connection}[#{@id}]" |
| end |
| |
| end |
| |
| end |
| |
| end |