| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| require 'test_tools' |
| require 'minitest/unit' |
| require 'socket' |
| |
| # Container that listens on a random port |
| class TestContainer < Qpid::Proton::Container |
| |
| def initialize(handler, listener_opts, id) |
| super handler, id |
| @listener = listen_io(TCPServer.open(0), ListenOnceHandler.new(listener_opts)) |
| end |
| attr_reader :listener |
| end |
| |
| class ContainerSASLTest < MiniTest::Test |
| include Qpid::Proton |
| |
| # Handler for test client/server that sets up server and client SASL options |
| class SASLHandler < TestHandler |
| |
| def initialize(url="amqp://", opts=nil) |
| super() |
| @url, @opts = url, opts |
| end |
| |
| def on_container_start(container) |
| @client = container.connect("#{@url}:#{container.listener.port}", @opts) |
| end |
| |
| attr_reader :auth_user |
| |
| def on_connection_open(connection) |
| super |
| if connection == @client |
| connection.close |
| else |
| @auth_user = connection.user |
| end |
| end |
| end |
| |
| # Generate SASL server configuration files and database, initialize proton SASL |
| class SASLConfig |
| include Qpid::Proton |
| attr_reader :conf_dir, :conf_file, :conf_name, :database, :error |
| |
| def initialize() |
| if SASL.extended? # Configure cyrus SASL |
| @conf_dir = File.expand_path('sasl_conf') |
| @conf_name = "proton-server" |
| @database = File.join(@conf_dir, "proton.sasldb") |
| @conf_file = File.join(conf_dir,"#{@conf_name}.conf") |
| Dir::mkdir(@conf_dir) unless File.directory?(@conf_dir) |
| # Same user name in different realms |
| make_user("user", "password", "proton") # proton realm |
| make_user("user", "default_password") # Default realm |
| File.open(@conf_file, 'w') do |f| |
| f.write(" |
| sasldb_path: #{database} |
| mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS |
| ") |
| end |
| # Tell proton library to use the new configuration |
| SASL.config_path = conf_dir |
| SASL.config_name = conf_name |
| end |
| rescue => e |
| @error = e |
| end |
| |
| private |
| |
| SASLPASSWD = (ENV['SASLPASSWD'] or 'saslpasswd2') |
| |
| def make_user(user, password, realm=nil) |
| realm_opt = (realm ? "-u #{realm}" : "") |
| cmd = "echo '#{password}' | #{SASLPASSWD} -c -p -f #{database} #{realm_opt} #{user}" |
| system(cmd) or raise RuntimeError.new("saslpasswd2 failed: #{cmd}") |
| end |
| |
| INSTANCE = SASLConfig.new |
| end |
| |
| def begin_extended_test |
| skip("Extended SASL not enabled") unless SASL.extended? |
| skip("Extended SASL setup error: #{SASLConfig::INSTANCE.error}") if SASLConfig::INSTANCE.error |
| end |
| |
| def test_sasl_anonymous() |
| s = SASLHandler.new("amqp://", {:sasl_allowed_mechs => "ANONYMOUS"}) |
| TestContainer.new(s, {:sasl_allowed_mechs => "ANONYMOUS"}, __method__).run |
| assert_equal "anonymous", s.connections[0].user |
| end |
| |
| def test_sasl_plain_url() |
| begin_extended_test |
| # Use default realm with URL, should authenticate with "default_password" |
| opts = {:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true} |
| s = SASLHandler.new("amqp://user:default_password@", opts) |
| TestContainer.new(s, opts, __method__).run |
| assert_equal(2, s.connections.size) |
| assert_equal("user", s.auth_user) |
| end |
| |
| def test_sasl_plain_options() |
| begin_extended_test |
| # Use default realm with connection options, should authenticate with "default_password" |
| opts = {:sasl_allowed_mechs => "PLAIN",:sasl_allow_insecure_mechs => true, |
| :user => 'user', :password => 'default_password' } |
| s = SASLHandler.new("amqp://", opts) |
| TestContainer.new(s, {:sasl_allowed_mechs => "PLAIN",:sasl_allow_insecure_mechs => true}, __method__).run |
| assert_equal(2, s.connections.size) |
| assert_equal("user", s.auth_user) |
| end |
| |
| # Ensure we don't allow PLAIN if allow_insecure_mechs = true is not explicitly set |
| def test_disallow_insecure() |
| # Don't set allow_insecure_mechs, but try to use PLAIN |
| s = SASLHandler.new("amqp://user:password@", {:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true}) |
| e = assert_raises(TestError) { TestContainer.new(s, {:sasl_allowed_mechs => "PLAIN"}, __method__).run } |
| assert_match(/amqp:unauthorized-access.*Authentication failed/, e.to_s) |
| end |
| end |