blob: 7125c1795fc75d219af1818049aaed4daa01c5da [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
import os
from proton import *
from . import common
from . import engine
from .common import pump, Skipped
def _sslCertpath(file):
""" Return the full path to the certificate, keyfile, etc.
"""
if os.name=="nt":
if file.find("private-key")!=-1:
# The private key is not in a separate store
return None
# Substitute pkcs#12 equivalent for the CA/key store
if file.endswith(".pem"):
file = file[:-4] + ".p12"
return os.path.join(os.path.dirname(__file__),
"ssl_db/%s" % file)
def _testSaslMech(self, mech, clientUser='user@proton', authUser='user@proton', authzid=None, encrypted=False, authenticated=True):
self.s1.allowed_mechs(mech)
self.c1.open()
self.c2.open()
pump(self.t1, self.t2, 1024)
if encrypted is not None:
assert self.t2.encrypted == encrypted, encrypted
assert self.t1.encrypted == encrypted, encrypted
if authzid is None:
authzid = authUser
assert self.t2.authenticated == authenticated, authenticated
assert self.t1.authenticated == authenticated, authenticated
if authenticated:
# Server
assert self.t2.user == authUser
assert self.s2.user == authUser
assert self.s2.authorization == authzid, self.s2.authorization
assert self.s2.mech == mech.strip()
assert self.s2.outcome == SASL.OK, self.s2.outcome
assert self.c2.state & Endpoint.LOCAL_ACTIVE and self.c2.state & Endpoint.REMOTE_ACTIVE,\
"local_active=%s, remote_active=%s" % (self.c1.state & Endpoint.LOCAL_ACTIVE, self.c1.state & Endpoint.REMOTE_ACTIVE)
# Client
assert self.t1.user == clientUser
assert self.s1.user == clientUser
assert self.s1.mech == mech.strip()
assert self.s1.outcome == SASL.OK, self.s1.outcome
assert self.c1.state & Endpoint.LOCAL_ACTIVE and self.c1.state & Endpoint.REMOTE_ACTIVE,\
"local_active=%s, remote_active=%s" % (self.c1.state & Endpoint.LOCAL_ACTIVE, self.c1.state & Endpoint.REMOTE_ACTIVE)
else:
# Server
assert self.t2.user == None
assert self.s2.user == None
assert self.s2.authorization == None, self.s2.authorization
assert self.s2.outcome != SASL.OK, self.s2.outcome
# Client
assert self.t1.user == clientUser
assert self.s1.user == clientUser
assert self.s1.outcome != SASL.OK, self.s1.outcome
class Test(common.Test):
pass
def consumeAllOuput(t):
stops = 0
while stops<1:
out = t.peek(1024)
l = len(out) if out else 0
t.pop(l)
if l <= 0:
stops += 1
class SaslTest(Test):
def setUp(self):
self.t1 = Transport()
self.s1 = SASL(self.t1)
self.t2 = Transport(Transport.SERVER)
self.t2.max_frame_size = 65536
self.s2 = SASL(self.t2)
def pump(self):
pump(self.t1, self.t2, 1024)
# We have to generate the client frames manually because proton does not
# generate pipelined SASL and AMQP frames together
def testIllegalProtocolLayering(self):
# Server
self.s2.allowed_mechs('ANONYMOUS')
c2 = Connection()
self.t2.bind(c2)
assert self.s2.outcome is None
# Push client bytes into server
self.t2.push(
# SASL
b'AMQP\x03\x01\x00\x00'
# @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
# SASL (again illegally)
b'AMQP\x03\x01\x00\x00'
# @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
# AMQP
b'AMQP\x00\x01\x00\x00'
# @open(16) [container-id="", channel-max=1234]
b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
)
consumeAllOuput(self.t2)
assert self.t2.condition
assert self.t2.closed
assert not c2.state & Endpoint.REMOTE_ACTIVE
def testPipelinedClient(self):
# Server
self.s2.allowed_mechs('ANONYMOUS')
c2 = Connection()
self.t2.bind(c2)
assert self.s2.outcome is None
# Push client bytes into server
self.t2.push(
# SASL
b'AMQP\x03\x01\x00\x00'
# @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
# AMQP
b'AMQP\x00\x01\x00\x00'
# @open(16) [container-id="", channel-max=1234]
b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
)
consumeAllOuput(self.t2)
assert not self.t2.condition
assert self.s2.outcome == SASL.OK
assert c2.state & Endpoint.REMOTE_ACTIVE
def testPipelinedServer(self):
# Client
self.s1.allowed_mechs('ANONYMOUS')
c1 = Connection()
self.t1.bind(c1)
assert self.s1.outcome is None
# Push server bytes into client
# Commented out lines in this test are where the client input processing doesn't
# run after output processing even though there is input waiting
self.t1.push(
# SASL
b'AMQP\x03\x01\x00\x00'
# @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS]]
b'\x00\x00\x00\x1c\x02\x01\x00\x00\x00S@\xc0\x0f\x01\xe0\x0c\x01\xa3\tANONYMOUS'
# @sasl-outcome(68) [code=0]
b'\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00'
# AMQP
b'AMQP\x00\x01\x00\x00'
# @open(16) [container-id="", channel-max=1234]
b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
)
consumeAllOuput(self.t1)
assert self.s1.outcome == SASL.OK
assert c1.state & Endpoint.REMOTE_ACTIVE
def testPipelined2(self):
out1 = self.t1.peek(1024)
self.t1.pop(len(out1))
self.t2.push(out1)
self.s2.allowed_mechs('ANONYMOUS')
c2 = Connection()
c2.open()
self.t2.bind(c2)
out2 = self.t2.peek(1024)
self.t2.pop(len(out2))
self.t1.push(out2)
out1 = self.t1.peek(1024)
assert len(out1) > 0
def testFracturedSASL(self):
""" PROTON-235
"""
assert self.s1.outcome is None
# self.t1.trace(Transport.TRACE_FRM)
out = self.t1.peek(1024)
self.t1.pop(len(out))
self.t1.push(b"AMQP\x03\x01\x00\x00")
out = self.t1.peek(1024)
self.t1.pop(len(out))
self.t1.push(b"\x00\x00\x00")
out = self.t1.peek(1024)
self.t1.pop(len(out))
self.t1.push(b"6\x02\x01\x00\x00\x00S@\xc0\x29\x01\xe0\x26\x04\xa3\x05PLAIN\x0aDIGEST-MD5\x09ANONYMOUS\x08CRAM-MD5")
out = self.t1.peek(1024)
self.t1.pop(len(out))
self.t1.push(b"\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")
out = self.t1.peek(1024)
self.t1.pop(len(out))
while out:
out = self.t1.peek(1024)
self.t1.pop(len(out))
assert self.s1.outcome == SASL.OK, self.s1.outcome
def test_singleton(self):
"""Verify that only a single instance of SASL can exist per Transport"""
transport = Transport()
attr = object()
sasl1 = SASL(transport)
sasl1.my_attribute = attr
sasl2 = transport.sasl()
sasl3 = SASL(transport)
assert sasl1 == sasl2
assert sasl1 == sasl3
assert sasl1.my_attribute == attr
assert sasl2.my_attribute == attr
assert sasl3.my_attribute == attr
transport = Transport()
sasl1 = transport.sasl()
sasl1.my_attribute = attr
sasl2 = SASL(transport)
assert sasl1 == sasl2
assert sasl1.my_attribute == attr
assert sasl2.my_attribute == attr
def testSaslSkipped(self):
"""Verify that the server (with SASL) correctly handles a client without SASL"""
self.t1 = Transport()
self.t2.require_auth(False)
self.pump()
assert self.s2.outcome == None
assert self.t2.condition == None
assert self.t2.authenticated == False
assert self.s1.outcome == None
assert self.t1.condition == None
assert self.t1.authenticated == False
def testSaslSkippedFail(self):
"""Verify that the server (with SASL) correctly handles a client without SASL"""
self.t1 = Transport()
self.t2.require_auth(True)
self.pump()
assert self.s2.outcome == None
assert self.t2.condition != None
assert self.s1.outcome == None
assert self.t1.condition != None
def testMechNotFound(self):
self.c1 = Connection()
self.c1.open()
self.t1.bind(self.c1)
self.s1.allowed_mechs('IMPOSSIBLE')
self.pump()
assert self.t2.authenticated == False
assert self.t1.authenticated == False
assert self.s1.outcome != SASL.OK
assert self.s2.outcome != SASL.OK
class SASLMechTest(Test):
def setUp(self):
self.t1 = Transport()
self.s1 = SASL(self.t1)
self.t2 = Transport(Transport.SERVER)
self.s2 = SASL(self.t2)
self.c1 = Connection()
self.c1.user = 'user@proton'
self.c1.password = 'password'
self.c1.hostname = 'localhost'
self.c2 = Connection()
def testANON(self):
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'ANONYMOUS', authUser='anonymous')
def testCRAMMD5(self):
common.ensureCanTestExtendedSASL()
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'CRAM-MD5')
def testDIGESTMD5(self):
common.ensureCanTestExtendedSASL()
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'DIGEST-MD5')
# PLAIN shouldn't work without encryption without special setting
def testPLAINfail(self):
common.ensureCanTestExtendedSASL()
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'PLAIN', authenticated=False)
# Client won't accept PLAIN even if offered by server without special setting
def testPLAINClientFail(self):
common.ensureCanTestExtendedSASL()
self.s2.allow_insecure_mechs = True
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'PLAIN', authenticated=False)
# PLAIN will only work if both ends are specially set up
def testPLAIN(self):
common.ensureCanTestExtendedSASL()
self.s1.allow_insecure_mechs = True
self.s2.allow_insecure_mechs = True
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'PLAIN')
# SCRAM not supported before Cyrus SASL 2.1.26
# so not universal and hence need a test for support
# to keep it in tests.
# def testSCRAMSHA1(self):
# common.ensureCanTestExtendedSASL()
#
# self.t1.bind(self.c1)
# self.t2.bind(self.c2)
# _testSaslMech(self, 'SCRAM-SHA-1')
def _sslConnection(domain, transport, connection):
transport.bind(connection)
ssl = SSL(transport, domain, None )
return connection
class SSLSASLTest(Test):
def setUp(self):
if not common.isSSLPresent():
raise Skipped("No SSL libraries found.")
self.server_domain = SSLDomain(SSLDomain.MODE_SERVER)
self.client_domain = SSLDomain(SSLDomain.MODE_CLIENT)
self.t1 = Transport()
self.s1 = SASL(self.t1)
self.t2 = Transport(Transport.SERVER)
self.s2 = SASL(self.t2)
self.c1 = Connection()
self.c2 = Connection()
def testSSLPlainSimple(self):
if not SASL.extended():
raise Skipped("Simple SASL server does not support PLAIN")
common.ensureCanTestExtendedSASL()
clientUser = 'user@proton'
mech = 'PLAIN'
self.c1.user = clientUser
self.c1.password = 'password'
self.c1.hostname = 'localhost'
ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
_testSaslMech(self, mech, encrypted=True)
def testSSLPlainAuthzid(self):
if not SASL.extended():
raise Skipped("Simple SASL server does not support PLAIN")
common.ensureCanTestExtendedSASL()
clientUser = 'user@proton'
authzid = 'user2'
mech = 'PLAIN'
self.c1.user = clientUser
self.c1.authorization = authzid
self.c1.password = 'password'
self.c1.hostname = 'localhost'
ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
_testSaslMech(self, mech, authzid=authzid, encrypted=True)
def testSSLPlainSimpleFail(self):
if not SASL.extended():
raise Skipped("Simple SASL server does not support PLAIN")
common.ensureCanTestExtendedSASL()
clientUser = 'usr@proton'
mech = 'PLAIN'
self.c1.user = clientUser
self.c1.password = 'password'
self.c1.hostname = 'localhost'
ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
_testSaslMech(self, mech, clientUser='usr@proton', encrypted=True, authenticated=False)
def testSSLExternalSimple(self):
if os.name=="nt":
extUser = 'O=Client, CN=127.0.0.1'
else:
extUser = 'O=Client,CN=127.0.0.1'
mech = 'EXTERNAL'
self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
_sslCertpath("server-private-key.pem"),
"server-password")
self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
_sslCertpath("ca-certificate.pem") )
self.client_domain.set_credentials(_sslCertpath("client-certificate.pem"),
_sslCertpath("client-private-key.pem"),
"client-password")
self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
_testSaslMech(self, mech, clientUser=None, authUser=extUser, encrypted=True)
def testSSLExternalAuthzid(self):
if os.name=="nt":
extUser = 'O=Client, CN=127.0.0.1'
else:
extUser = 'O=Client,CN=127.0.0.1'
mech = 'EXTERNAL'
authzid = 'user_foo'
self.c1.authorization = authzid
self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
_sslCertpath("server-private-key.pem"),
"server-password")
self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
_sslCertpath("ca-certificate.pem") )
self.client_domain.set_credentials(_sslCertpath("client-certificate.pem"),
_sslCertpath("client-private-key.pem"),
"client-password")
self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
_testSaslMech(self, mech, clientUser=None, authUser=extUser, authzid=authzid, encrypted=True)
def testSSLExternalSimpleFail(self):
mech = 'EXTERNAL'
self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
_sslCertpath("server-private-key.pem"),
"server-password")
self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
_sslCertpath("ca-certificate.pem") )
self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
_testSaslMech(self, mech, clientUser=None, authUser=None, encrypted=None, authenticated=False)
class SASLEventTest(engine.CollectorTest):
def setUp(self):
engine.CollectorTest.setUp(self)
self.t1 = Transport()
self.s1 = SASL(self.t1)
self.t2 = Transport(Transport.SERVER)
self.s2 = SASL(self.t2)
self.c1 = Connection()
self.c1.user = 'user@proton'
self.c1.password = 'password'
self.c1.hostname = 'localhost'
self.c2 = Connection()
self.collector = Collector()
def testNormalAuthenticationClient(self):
common.ensureCanTestExtendedSASL()
self.c1.collect(self.collector)
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'DIGEST-MD5')
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.CONNECTION_REMOTE_OPEN)
def testNormalAuthenticationServer(self):
common.ensureCanTestExtendedSASL()
self.c2.collect(self.collector)
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'DIGEST-MD5')
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.CONNECTION_REMOTE_OPEN)
def testFailedAuthenticationClient(self):
common.ensureCanTestExtendedSASL()
clientUser = "usr@proton"
self.c1.user = clientUser
self.c1.collect(self.collector)
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False)
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.TRANSPORT_ERROR,
Event.TRANSPORT_TAIL_CLOSED,
Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
def testFailedAuthenticationServer(self):
common.ensureCanTestExtendedSASL()
clientUser = "usr@proton"
self.c1.user = clientUser
self.c2.collect(self.collector)
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False)
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.TRANSPORT_ERROR,
Event.TRANSPORT_TAIL_CLOSED,
Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
def testNoMechClient(self):
common.ensureCanTestExtendedSASL()
self.c1.collect(self.collector)
self.s2.allowed_mechs('IMPOSSIBLE')
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'DIGEST-MD5', authenticated=False)
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.TRANSPORT_ERROR,
Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
def testNoMechServer(self):
common.ensureCanTestExtendedSASL()
self.c2.collect(self.collector)
self.s2.allowed_mechs('IMPOSSIBLE')
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'DIGEST-MD5', authenticated=False)
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.TRANSPORT_TAIL_CLOSED,
Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
def testDisallowedMechClient(self):
self.c1.collect(self.collector)
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'IMPOSSIBLE', authenticated=False)
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.TRANSPORT_ERROR,
Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
def testDisallowedMechServer(self):
self.c2.collect(self.collector)
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'IMPOSSIBLE', authenticated=False)
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.TRANSPORT_TAIL_CLOSED,
Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
def testDisallowedPlainClient(self):
self.c1.collect(self.collector)
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'PLAIN', authenticated=False)
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.TRANSPORT_ERROR,
Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
def testDisallowedPlainServer(self):
self.c2.collect(self.collector)
self.t1.bind(self.c1)
self.t2.bind(self.c2)
_testSaslMech(self, 'PLAIN', authenticated=False)
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.TRANSPORT_TAIL_CLOSED,
Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)