blob: a6523db2909644225eaa83dcfa1cc91b349f796e [file] [log] [blame]
#--
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#++
module Qpid::Proton::Reactor
class Connector < Qpid::Proton::BaseHandler
attr_accessor :address
attr_accessor :reconnect
attr_accessor :ssl_domain
def initialize(connection)
@connection = connection
@address = nil
@heartbeat = nil
@reconnect = nil
@ssl_domain = nil
end
def on_connection_local_open(event)
self.connect(event.connection)
end
def on_connection_remote_open(event)
if !@reconnect.nil?
@reconnect.reset
@transport = nil
end
end
def on_transport_tail_closed(event)
self.on_transport_closed(event)
end
def on_transport_closed(event)
if !@connection.nil? && !(@connection.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero?
if !@reconnect.nil?
event.transport.unbind
delay = @reconnect.next
if delay == 0
self.connect(@connection)
else
event.reactor.schedule(delay, self)
end
else
@connection = nil
end
end
end
def on_timer_task(event)
self.connect(@connection)
end
def on_connection_remote_close(event)
@connection = nil
end
def connect(connection)
url = @address.next
connection.hostname = "#{url.host}:#{url.port}"
transport = Qpid::Proton::Transport.new
transport.bind(connection)
if !@heartbeat.nil?
transport.idle_timeout = @heartbeat
elsif (url.scheme == "amqps") && !@ssl_domain.nil?
@ssl = Qpid::Proton::SSL.new(transport, @ssl_domain)
@ss.peer_hostname = url.host
elsif !url.username.nil?
sasl = transport.sasl
if url.username == "anonymous"
sasl.mechanisms("ANONYMOUS")
else
sasl.plain(url.username, url.password)
end
end
end
end
end