# blob: 59d88196a32ad7a34b0ca82deb82ca29a7c58eea [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
require 'monitor'
module Qpid
class ChannelBusy< Exception ; end
class ChannelsBusy < Exception ; end
class SessionBusy < Exception ; end
class ConnectionFailed < Exception ; end
class Timeout < Exception ; end
class Connection < Assembler
include MonitorMixin
attr_reader :spec, :attached, :sessions, :thread
attr_accessor :opened, :failed, :close_code
def initialize(sock, args={})
super(sock)
delegate = args[:delegate] || Qpid::Delegate::Client.method(:new)
spec = args[:spec] || nil
@spec = Qpid::Spec010::load(spec)
@track = @spec["track"]
@attached = {}
@sessions = {}
@condition = new_cond
@opened = false
@failed = false
@close_code = [nil, "connection aborted"]
@thread = nil
@channel_max = 65535
@delegate = delegate.call(self, args)
end
def attach(name, ch, delegate, force=false)
synchronize do
ssn = @attached[ch.id]
if ssn
raise ChannelBusy.new(ch, ssn) unless ssn.name == name
else
ssn = @sessions[name]
if ssn.nil?
ssn = Session.new(name, @spec, :delegate => delegate)
@sessions[name] = ssn
elsif ssn.channel
if force
@attached.delete(ssn.channel.id)
ssn.channel = nil
else
raise SessionBusy.new(ssn)
end
end
@attached[ch.id] = ssn
ssn.channel = ch
end
ch.session = ssn
return ssn
end
end
def detach(name, ch)
synchronize do
@attached.delete(ch.id)
ssn = @sessions.delete(name)
if ssn
ssn.channel = nil
ssn.closed
return ssn
end
end
end
def session(name, kwargs = {})
timeout = kwargs[:timeout]
delegate = kwargs[:delegate] || Qpid::Session::Client.method(:new)
# FIXME: Python has cryptic comment about 'ch 0 ?'
channel = (0..@channel_max).detect { |i| ! @attached.key?(i) }
raise ChannelsBusy unless channel
synchronize do
ch = Channel.new(self, channel)
ssn = attach(name, ch, delegate)
ssn.channel.session_attach(name)
if ssn.wait_for(timeout) { ssn.channel }
return ssn
else
detach(name, ch)
raise Timeout
end
end
end
def detach_all
synchronize do
attached.values.each do |ssn|
ssn.exceptions << @close_code unless @close_code[0] == 200
detach(ssn.name, ssn.channel)
end
end
end
def start(timeout=nil)
@delegate.start
@thread = Thread.new { run }
@thread[:name] = 'conn'
synchronize do
unless @condition.wait_for(timeout) { @opened || @failed }
raise Timeout
end
end
if @failed
raise ConnectionFailed.new(@close_code)
end
end
def run
# XXX: we don't really have a good way to exit this loop without
# getting the other end to kill the socket
loop do
begin
seg = read_segment
rescue Qpid::Closed => e
detach_all
break
end
@delegate.received(seg)
end
end
def close(timeout=nil)
return unless @opened
Channel.new(self, 0).connection_close(200)
synchronize do
unless @condition.wait_for(timeout) { ! @opened }
raise Timeout
end
end
@thread.join(timeout)
@thread = nil
end
def signal
synchronize { @condition.signal }
end
def to_s
# FIXME: We'd like to report something like HOST:PORT
return @sock.to_s
end
class Channel < Invoker
attr_reader :id, :connection
attr_accessor :session
def initialize(connection, id)
@connection = connection
@id = id
@session = nil
end
def resolve_method(name)
inst = @connection.spec[name]
if inst.is_a?(Qpid::Spec010::Control)
return invocation(:method, inst)
else
return invocation(:error, nil)
end
end
def invoke(type, args)
ctl = type.create(*args)
sc = StringCodec.new(@connection.spec)
sc.write_control(ctl)
@connection.write_segment(Segment.new(true, true, type.segment_type,
type.track, self.id, sc.encoded))
log = Qpid::logger["qpid.io.ctl"]
log.debug("SENT %s", ctl) if log
end
def to_s
return "#{@connection}[#{@id}]"
end
end
end
end