blob: 71a9f3a86ee89fea773606a6a5fc856959f9310c [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
require 'test_tools'
require 'minitest/unit'
require 'socket'
# Container that listens on a random port
class TestContainer < Qpid::Proton::Container
def initialize(handler, lopts=nil, id=nil)
super handler, id
@listener = listen_io(TCPServer.open(0), ListenOnceHandler.new(lopts))
end
def port() @listener.to_io.addr[1]; end
def url() "amqp://:#{port}"; end#
end
class ContainerTest < Minitest::Test
include Qpid::Proton
def test_simple()
send_handler = Class.new(MessagingHandler) do
attr_reader :accepted, :sent
def on_sendable(sender)
sender.send Message.new("foo") unless @sent
@sent = true
end
def on_tracker_accept(tracker)
@accepted = true
tracker.connection.close
end
end.new
receive_handler = Class.new(MessagingHandler) do
attr_reader :message, :link
def on_link_open(link)
@link = link
@link.open
@link.flow(1)
end
def on_message(delivery, message)
@message = message;
delivery.update Disposition::ACCEPTED
delivery.settle
end
end.new
c = TestContainer.new(receive_handler, {}, __method__)
c.connect(c.url, {:handler => send_handler}).open_sender({:name => "testlink"})
c.run
assert send_handler.accepted
assert_equal "testlink", receive_handler.link.name
assert_equal "foo", receive_handler.message.body
assert_equal "test_simple", receive_handler.link.connection.container_id
end
class CloseOnOpenHandler < TestHandler
def on_connection_open(c) super; c.close; end
end
def test_auto_stop_one
# A listener and a connection
start_stop_handler = Class.new do
def on_container_start(c) @start = c; end
def on_container_stop(c) @stop = c; end
attr_reader :start, :stop
end.new
c = Container.new(start_stop_handler, __method__)
threads = 3.times.collect { Thread.new { c.run } }
sleep(0.01) while c.running < 3
assert_equal c, start_stop_handler.start
l = c.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
c.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
threads.each { |t| assert t.join(1) }
assert_equal c, start_stop_handler.stop
assert_raises(Container::StoppedError) { c.run }
end
def test_auto_stop_two
# Connect between different containers
c1, c2 = Container.new("#{__method__}-1"), Container.new("#{__method__}-2")
threads = [ Thread.new {c1.run }, Thread.new {c2.run } ]
l = c2.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
c1.connect(l.url, { :handler => CloseOnOpenHandler.new} )
assert threads.each { |t| t.join(1) }
assert_raises(Container::StoppedError) { c1.run }
assert_raises(Container::StoppedError) { c2.connect("") }
end
def test_auto_stop_listener_only
c = Container.new(__method__)
# Listener only, external close
t = Thread.new { c.run }
l = c.listen_io(TCPServer.new(0))
l.close
assert t.join(1)
end
def test_stop_empty
c = Container.new(__method__)
threads = 3.times.collect { Thread.new { c.run } }
sleep(0.01) while c.running < 3
assert_nil threads[0].join(0.001) # Not stopped
c.stop
assert c.stopped
assert_raises(Container::StoppedError) { c.connect("") }
assert_raises(Container::StoppedError) { c.run }
threads.each { |t| assert t.join(1) }
end
def test_stop
c = Container.new(__method__)
c.auto_stop = false
l = c.listen_io(TCPServer.new(0))
threads = 3.times.collect { Thread.new { c.run } }
sleep(0.01) while c.running < 3
l.close
assert_nil threads[0].join(0.001) # Not stopped, no auto_stop
l = c.listen_io(TCPServer.new(0)) # New listener
conn = c.connect("amqp://:#{l.to_io.addr[1]}")
c.stop
assert c.stopped
threads.each { |t| assert t.join(1) }
assert_raises(Container::StoppedError) { c.run }
assert_equal 0, c.running
assert_nil l.condition
assert_nil conn.condition
end
def test_bad_host
cont = Container.new(__method__)
assert_raises (SocketError) { cont.listen("badlisten.example.com:999") }
assert_raises (SocketError) { c = cont.connect("badconnect.example.com:999") }
end
# Verify that connection options are sent to the peer and available as Connection methods
def test_connection_options
# Note: user, password and sasl_xxx options are tested by ContainerSASLTest below
server_handler = Class.new(MessagingHandler) do
def on_error(e) raise e.inspect; end
def on_connection_open(c)
@connection = c
c.open({
:virtual_host => "server.to.client",
:properties => { :server => :client },
:offered_capabilities => [ :s1 ],
:desired_capabilities => [ :s2 ],
:container_id => "box",
})
c.close
end
attr_reader :connection
end.new
# Transport options must be provided to the listener, by Connection#open it is too late
cont = TestContainer.new(nil, {
:handler => server_handler,
:idle_timeout => 88,
:max_sessions =>1000,
:max_frame_size => 8888,
})
client = cont.connect(cont.url,
{:virtual_host => "client.to.server",
:properties => { :foo => :bar, "str" => "str" },
:offered_capabilities => [:c1 ],
:desired_capabilities => ["c2" ],
:idle_timeout => 42,
:max_sessions =>100,
:max_frame_size => 4096,
:container_id => "bowl"
})
cont.run
c = server_handler.connection
assert_equal "client.to.server", c.virtual_host
assert_equal({ :foo => :bar, :str => "str" }, c.properties)
assert_equal([:c1], c.offered_capabilities)
assert_equal([:c2], c.desired_capabilities)
assert_equal 21, c.idle_timeout # Proton divides by 2
assert_equal 100, c.max_sessions
assert_equal 4096, c.max_frame_size
assert_equal "bowl", c.container_id
c = client
assert_equal "server.to.client", c.virtual_host
assert_equal({ :server => :client }, c.properties)
assert_equal([:s1], c.offered_capabilities)
assert_equal([:s2], c.desired_capabilities)
assert_equal "box", c.container_id
assert_equal 8888, c.max_frame_size
assert_equal 44, c.idle_timeout # Proton divides by 2
assert_equal 100, c.max_sessions
end
end
class ContainerSASLTest < Minitest::Test
include Qpid::Proton
# Handler for test client/server that sets up server and client SASL options
class SASLHandler < TestHandler
def initialize(url="amqp://", opts=nil)
super()
@url, @opts = url, opts
end
def on_container_start(container)
@client = container.connect("#{@url}:#{container.port}", @opts)
end
attr_reader :auth_user
def on_connection_open(connection)
super
if connection == @client
connection.close
else
@auth_user = connection.user
end
end
end
# Generate SASL server configuration files and database, initialize proton SASL
class SASLConfig
include Qpid::Proton
attr_reader :conf_dir, :conf_file, :conf_name, :database
def initialize()
if SASL.extended? # Configure cyrus SASL
@conf_dir = File.expand_path('sasl_conf')
@conf_name = "proton-server"
@database = File.join(@conf_dir, "proton.sasldb")
@conf_file = File.join(conf_dir,"#{@conf_name}.conf")
Dir::mkdir(@conf_dir) unless File.directory?(@conf_dir)
# Same user name in different realms
make_user("user", "password", "proton") # proton realm
make_user("user", "default_password") # Default realm
File.open(@conf_file, 'w') do |f|
f.write("
sasldb_path: #{database}
mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS
")
end
# Tell proton library to use the new configuration
SASL.config_path = conf_dir
SASL.config_name = conf_name
end
end
private
SASLPASSWD = (ENV['SASLPASSWD'] or 'saslpasswd2')
def make_user(user, password, realm=nil)
realm_opt = (realm ? "-u #{realm}" : "")
cmd = "echo '#{password}' | #{SASLPASSWD} -c -p -f #{database} #{realm_opt} #{user}"
system(cmd) or raise RuntimeError.new("saslpasswd2 failed: #{makepw_cmd}")
end
DEFAULT = SASLConfig.new
end
def test_sasl_anonymous()
s = SASLHandler.new("amqp://", {:sasl_allowed_mechs => "ANONYMOUS"})
TestContainer.new(s, {:sasl_allowed_mechs => "ANONYMOUS"}, __method__).run
assert_equal "anonymous", s.connections[0].user
end
def test_sasl_plain_url()
skip unless SASL.extended?
# Use default realm with URL, should authenticate with "default_password"
opts = {:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true}
s = SASLHandler.new("amqp://user:default_password@", opts)
TestContainer.new(s, opts, __method__).run
assert_equal(2, s.connections.size)
assert_equal("user", s.auth_user)
end
def test_sasl_plain_options()
skip unless SASL.extended?
# Use default realm with connection options, should authenticate with "default_password"
opts = {:sasl_allowed_mechs => "PLAIN",:sasl_allow_insecure_mechs => true,
:user => 'user', :password => 'default_password' }
s = SASLHandler.new("amqp://", opts)
TestContainer.new(s, {:sasl_allowed_mechs => "PLAIN",:sasl_allow_insecure_mechs => true}, __method__).run
assert_equal(2, s.connections.size)
assert_equal("user", s.auth_user)
end
# Ensure we don't allow PLAIN if allow_insecure_mechs = true is not explicitly set
def test_disallow_insecure()
# Don't set allow_insecure_mechs, but try to use PLAIN
s = SASLHandler.new("amqp://user:password@", {:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true})
e = assert_raises(TestError) { TestContainer.new(s, {:sasl_allowed_mechs => "PLAIN"}, __method__).run }
assert_match(/amqp:unauthorized-access.*Authentication failed/, e.to_s)
end
end