PROTON-1385: remove various tests that are skipped / aimed at proton-c API
diff --git a/tests/java/shim/binding/proton/__init__.py b/tests/java/shim/binding/proton/__init__.py
index ecb480f..76bb7d3 100644
--- a/tests/java/shim/binding/proton/__init__.py
+++ b/tests/java/shim/binding/proton/__init__.py
@@ -2728,21 +2728,6 @@
def require_auth(self, bool):
pn_transport_require_auth(self._impl, bool)
- @property
- def authenticated(self):
- return pn_transport_is_authenticated(self._impl)
-
- def require_encryption(self, bool):
- pn_transport_require_encryption(self._impl, bool)
-
- @property
- def encrypted(self):
- return pn_transport_is_encrypted(self._impl)
-
- @property
- def user(self):
- return pn_transport_get_user(self._impl)
-
def bind(self, connection):
"""Assign a connection to the transport"""
self._check(pn_transport_bind(self._impl, connection._impl))
diff --git a/tests/java/shim/cengine.py b/tests/java/shim/cengine.py
index 7afee6b..d72f469 100644
--- a/tests/java/shim/cengine.py
+++ b/tests/java/shim/cengine.py
@@ -875,9 +875,6 @@
def pn_transport():
return wrap(Proton.transport(), pn_transport_wrapper)
-def pn_transport_get_pytracer(trans):
- raise Skipped()
-
def pn_transport_attachments(trans):
return trans.impl.attachments()
diff --git a/tests/java/shim/csasl.py b/tests/java/shim/csasl.py
index b540f82..0308be2 100644
--- a/tests/java/shim/csasl.py
+++ b/tests/java/shim/csasl.py
@@ -59,25 +59,6 @@
Sasl.PN_SASL_TEMP: PN_SASL_TEMP,
}
-def pn_transport_require_auth(transport, require):
- raise Skipped('Not supported in Proton-J')
-
-# TODO: Placeholders
-def pn_transport_is_authenticated(transport):
- raise Skipped('Not supported in Proton-J')
-
-def pn_transport_is_encrypted(transport):
- raise Skipped('Not supported in Proton-J')
-
-def pn_transport_get_user(transport):
- raise Skipped('Not supported in Proton-J')
-
-def pn_connection_set_user(connection, user):
- pass
-
-def pn_connection_set_password(connection, password):
- pass
-
def pn_sasl_allowed_mechs(sasl, mechs):
sasl.setMechanisms(*mechs.split())
diff --git a/tests/java/shim/cssl.py b/tests/java/shim/cssl.py
index d389984..d0efbfc 100644
--- a/tests/java/shim/cssl.py
+++ b/tests/java/shim/cssl.py
@@ -113,9 +113,6 @@
# XXX: session_id
ssl.impl = ssl.transport.impl.ssl(domain, None)
-def pn_ssl_resume_status(ssl):
- raise Skipped()
-
def pn_ssl_get_cipher_name(ssl, size):
name = ssl.impl.getCipherName()
return (bool(name), name)
diff --git a/tests/python/proton_tests/common.py b/tests/python/proton_tests/common.py
index 4c1adf1..93a6a4f 100644
--- a/tests/python/proton_tests/common.py
+++ b/tests/python/proton_tests/common.py
@@ -162,13 +162,6 @@
if SASL.extended():
_cyrusSetup('sasl_conf')
-# TODO: remove all tests that reference this
-def ensureCanTestExtendedSASL():
- if not SASL.extended():
- raise Skipped('Extended SASL not supported')
- if not createdSASLDb:
- raise Skipped("Can't Test Extended SASL: Couldn't create auth db")
-
class DefaultConfig:
defines = {}
diff --git a/tests/python/proton_tests/reactor.py b/tests/python/proton_tests/reactor.py
index 39935f7..ab4492b 100644
--- a/tests/python/proton_tests/reactor.py
+++ b/tests/python/proton_tests/reactor.py
@@ -20,7 +20,7 @@
import time
import sys
-from .common import Test, SkipTest, TestServer, free_tcp_port, ensureCanTestExtendedSASL
+from .common import Test, SkipTest, TestServer, free_tcp_port
from proton.reactor import Container, Reactor, ApplicationEvent, EventInjector
from proton.handlers import CHandshaker, MessagingHandler
from proton import Handler, Url
@@ -405,61 +405,6 @@
assert h.init, "excpected the init"
-class ApplicationEventTest(Test):
- """Test application defined events and handlers."""
-
- class MyTestServer(TestServer):
- def __init__(self):
- super(ApplicationEventTest.MyTestServer, self).__init__()
-
- class MyHandler(Handler):
- def __init__(self, test):
- super(ApplicationEventTest.MyHandler, self).__init__()
- self._test = test
-
- def on_hello(self, event):
- # verify PROTON-1056
- self._test.hello_rcvd = str(event)
-
- def on_goodbye(self, event):
- self._test.goodbye_rcvd = str(event)
-
- def setUp(self):
- import os
- if not hasattr(os, 'pipe'):
- # KAG: seems like Jython doesn't have an os.pipe() method
- raise SkipTest()
- if os.name=="nt":
- # Correct implementation on Windows is complicated
- raise SkipTest("PROTON-1071")
- self.server = ApplicationEventTest.MyTestServer()
- self.server.reactor.handler.add(ApplicationEventTest.MyHandler(self))
- self.event_injector = EventInjector()
- self.hello_event = ApplicationEvent("hello")
- self.goodbye_event = ApplicationEvent("goodbye")
- self.server.reactor.selectable(self.event_injector)
- self.hello_rcvd = None
- self.goodbye_rcvd = None
- self.server.start()
-
- def tearDown(self):
- self.server.stop()
-
- def _wait_for(self, predicate, timeout=10.0):
- deadline = time.time() + timeout
- while time.time() < deadline:
- if predicate():
- break
- time.sleep(0.1)
- assert predicate()
-
- def test_application_events(self):
- self.event_injector.trigger(self.hello_event)
- self._wait_for(lambda: self.hello_rcvd is not None)
- self.event_injector.trigger(self.goodbye_event)
- self._wait_for(lambda: self.goodbye_rcvd is not None)
-
-
class AuthenticationTestHandler(MessagingHandler):
def __init__(self):
super(AuthenticationTestHandler, self).__init__()
diff --git a/tests/python/proton_tests/sasl.py b/tests/python/proton_tests/sasl.py
index 0f5f525..782cbc5 100644
--- a/tests/python/proton_tests/sasl.py
+++ b/tests/python/proton_tests/sasl.py
@@ -101,43 +101,6 @@
def pump(self):
pump(self.t1, self.t2, 1024)
- # We have to generate the client frames manually because proton does not
- # generate pipelined SASL and AMQP frames together
- def testIllegalProtocolLayering(self):
- # TODO: Skip Proton-J for now
- if "java" in sys.platform:
- raise Skipped("Proton-J does not set error condition on protocol layering violation")
-
- # Server
- self.s2.allowed_mechs('ANONYMOUS')
-
- c2 = Connection()
- self.t2.bind(c2)
-
- assert self.s2.outcome is None
-
- # Push client bytes into server
- self.t2.push(str2bin(
- # SASL
- 'AMQP\x03\x01\x00\x00'
- # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
- '\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
- # SASL (again illegally)
- 'AMQP\x03\x01\x00\x00'
- # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
- '\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
- # AMQP
- 'AMQP\x00\x01\x00\x00'
- # @open(16) [container-id="", channel-max=1234]
- '\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
- ))
-
- consumeAllOuput(self.t2)
-
- assert self.t2.condition
- assert self.t2.closed
- assert not c2.state & Endpoint.REMOTE_ACTIVE
-
def testPipelinedClient(self):
# TODO: When PROTON-1136 is fixed then remove this test
if "java" in sys.platform:
@@ -267,345 +230,3 @@
assert sasl1 == sasl2
assert sasl1.my_attribute == attr
assert sasl2.my_attribute == attr
-
- def testSaslSkipped(self):
- """Verify that the server (with SASL) correctly handles a client without SASL"""
- self.t1 = Transport()
- self.t2.require_auth(False)
- self.pump()
- assert self.s2.outcome == None
- assert self.t2.condition == None
- assert self.t2.authenticated == False
- assert self.s1.outcome == None
- assert self.t1.condition == None
- assert self.t1.authenticated == False
-
- def testSaslSkippedFail(self):
- """Verify that the server (with SASL) correctly handles a client without SASL"""
- self.t1 = Transport()
- self.t2.require_auth(True)
- self.pump()
- assert self.s2.outcome == None
- assert self.t2.condition != None
- assert self.s1.outcome == None
- assert self.t1.condition != None
-
- def testMechNotFound(self):
- if "java" in sys.platform:
- raise Skipped("Proton-J does not support checking authentication state")
- self.c1 = Connection()
- self.c1.open()
- self.t1.bind(self.c1)
- self.s1.allowed_mechs('IMPOSSIBLE')
-
- self.pump()
-
- assert self.t2.authenticated == False
- assert self.t1.authenticated == False
- assert self.s1.outcome != SASL.OK
- assert self.s2.outcome != SASL.OK
-
-class SASLMechTest(Test):
- def setUp(self):
- self.t1 = Transport()
- self.s1 = SASL(self.t1)
- self.t2 = Transport(Transport.SERVER)
- self.s2 = SASL(self.t2)
-
- self.c1 = Connection()
- self.c1.user = 'user@proton'
- self.c1.password = 'password'
- self.c1.hostname = 'localhost'
-
- self.c2 = Connection()
-
- def testANON(self):
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'ANONYMOUS', authUser='anonymous')
-
- def testCRAMMD5(self):
- common.ensureCanTestExtendedSASL()
-
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'CRAM-MD5')
-
- def testDIGESTMD5(self):
- common.ensureCanTestExtendedSASL()
-
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5')
-
- # PLAIN shouldn't work without encryption without special setting
- def testPLAINfail(self):
- common.ensureCanTestExtendedSASL()
-
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'PLAIN', authenticated=False)
-
- # Client won't accept PLAIN even if offered by server without special setting
- def testPLAINClientFail(self):
- common.ensureCanTestExtendedSASL()
-
- self.s2.allow_insecure_mechs = True
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'PLAIN', authenticated=False)
-
- # PLAIN will only work if both ends are specially set up
- def testPLAIN(self):
- common.ensureCanTestExtendedSASL()
-
- self.s1.allow_insecure_mechs = True
- self.s2.allow_insecure_mechs = True
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'PLAIN')
-
-# SCRAM not supported before Cyrus SASL 2.1.26
-# so not universal and hance need a test for support
-# to keep it in tests.
-# def testSCRAMSHA1(self):
-# common.ensureCanTestExtendedSASL()
-#
-# self.t1.bind(self.c1)
-# self.t2.bind(self.c2)
-# _testSaslMech(self, 'SCRAM-SHA-1')
-
-def _sslConnection(domain, transport, connection):
- transport.bind(connection)
- ssl = SSL(transport, domain, None )
- return connection
-
-class SSLSASLTest(Test):
- def setUp(self):
- if not common.isSSLPresent():
- raise Skipped("No SSL libraries found.")
-
- self.server_domain = SSLDomain(SSLDomain.MODE_SERVER)
- self.client_domain = SSLDomain(SSLDomain.MODE_CLIENT)
-
- self.t1 = Transport()
- self.s1 = SASL(self.t1)
- self.t2 = Transport(Transport.SERVER)
- self.s2 = SASL(self.t2)
-
- self.c1 = Connection()
- self.c2 = Connection()
-
- def testSSLPlainSimple(self):
- if "java" in sys.platform:
- raise Skipped("Proton-J does not support SSL with SASL")
- if not SASL.extended():
- raise Skipped("Simple SASL server does not support PLAIN")
- common.ensureCanTestExtendedSASL()
-
- clientUser = 'user@proton'
- mech = 'PLAIN'
-
- self.c1.user = clientUser
- self.c1.password = 'password'
- self.c1.hostname = 'localhost'
-
- ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
- ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
-
- _testSaslMech(self, mech, encrypted=True)
-
- def testSSLPlainSimpleFail(self):
- if "java" in sys.platform:
- raise Skipped("Proton-J does not support SSL with SASL")
- if not SASL.extended():
- raise Skipped("Simple SASL server does not support PLAIN")
- common.ensureCanTestExtendedSASL()
-
- clientUser = 'usr@proton'
- mech = 'PLAIN'
-
- self.c1.user = clientUser
- self.c1.password = 'password'
- self.c1.hostname = 'localhost'
-
- ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
- ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
-
- _testSaslMech(self, mech, clientUser='usr@proton', encrypted=True, authenticated=False)
-
- def testSSLExternalSimple(self):
- if "java" in sys.platform:
- raise Skipped("Proton-J does not support SSL with SASL")
-
- if os.name=="nt":
- extUser = 'O=Client, CN=127.0.0.1'
- else:
- extUser = 'O=Client,CN=127.0.0.1'
- mech = 'EXTERNAL'
-
- self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
- _sslCertpath("server-private-key.pem"),
- "server-password")
- self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
- self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
- _sslCertpath("ca-certificate.pem") )
- self.client_domain.set_credentials(_sslCertpath("client-certificate.pem"),
- _sslCertpath("client-private-key.pem"),
- "client-password")
- self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
-
- ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
- ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
-
- _testSaslMech(self, mech, clientUser=None, authUser=extUser, encrypted=True)
-
- def testSSLExternalSimpleFail(self):
- if "java" in sys.platform:
- raise Skipped("Proton-J does not support SSL with SASL")
-
- mech = 'EXTERNAL'
-
- self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
- _sslCertpath("server-private-key.pem"),
- "server-password")
- self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
- self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
- _sslCertpath("ca-certificate.pem") )
- self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
-
- ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
- ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
-
- _testSaslMech(self, mech, clientUser=None, authUser=None, encrypted=None, authenticated=False)
-
-class SASLEventTest(engine.CollectorTest):
- def setUp(self):
- engine.CollectorTest.setUp(self)
- self.t1 = Transport()
- self.s1 = SASL(self.t1)
- self.t2 = Transport(Transport.SERVER)
- self.s2 = SASL(self.t2)
-
- self.c1 = Connection()
- self.c1.user = 'user@proton'
- self.c1.password = 'password'
- self.c1.hostname = 'localhost'
-
- self.c2 = Connection()
-
- self.collector = Collector()
-
- def testNormalAuthenticationClient(self):
- common.ensureCanTestExtendedSASL()
- self.c1.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5')
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.CONNECTION_REMOTE_OPEN)
-
- def testNormalAuthenticationServer(self):
- common.ensureCanTestExtendedSASL()
- self.c2.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5')
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.CONNECTION_REMOTE_OPEN)
-
- def testFailedAuthenticationClient(self):
- common.ensureCanTestExtendedSASL()
- clientUser = "usr@proton"
- self.c1.user = clientUser
- self.c1.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_ERROR,
- Event.TRANSPORT_TAIL_CLOSED,
- Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testFailedAuthenticationServer(self):
- common.ensureCanTestExtendedSASL()
- clientUser = "usr@proton"
- self.c1.user = clientUser
- self.c2.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_ERROR,
- Event.TRANSPORT_TAIL_CLOSED,
- Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testNoMechClient(self):
- common.ensureCanTestExtendedSASL()
- self.c1.collect(self.collector)
- self.s2.allowed_mechs('IMPOSSIBLE')
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_ERROR,
- Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testNoMechServer(self):
- common.ensureCanTestExtendedSASL()
- self.c2.collect(self.collector)
- self.s2.allowed_mechs('IMPOSSIBLE')
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_TAIL_CLOSED,
- Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testDisallowedMechClient(self):
- self.c1.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'IMPOSSIBLE', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_ERROR,
- Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testDisallowedMechServer(self):
- self.c2.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'IMPOSSIBLE', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_TAIL_CLOSED,
- Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testDisallowedPlainClient(self):
- self.c1.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'PLAIN', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_ERROR,
- Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testDisallowedPlainServer(self):
- self.c2.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'PLAIN', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_TAIL_CLOSED,
- Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
diff --git a/tests/python/proton_tests/ssl.py b/tests/python/proton_tests/ssl.py
index 225afa7..1a05780 100644
--- a/tests/python/proton_tests/ssl.py
+++ b/tests/python/proton_tests/ssl.py
@@ -185,66 +185,6 @@
server.connection.close()
self._pump( client, server )
- def test_certificate_fingerprint_and_subfields(self):
- if os.name=="nt":
- raise Skipped("Windows support for certificate fingerprint and subfield not implemented yet")
-
- if "java" in sys.platform:
- raise Skipped("Not yet implemented in Proton-J")
-
- self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.server_domain.set_peer_authentication( SSLDomain.VERIFY_PEER,
- self._testpath("ca-certificate.pem") )
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
-
- # give the client a certificate, but let's not require server authentication
- self.client_domain.set_credentials(self._testpath("client-certificate1.pem"),
- self._testpath("client-private-key1.pem"),
- "client-password")
- self.client_domain.set_peer_authentication( SSLDomain.ANONYMOUS_PEER )
- client = SslTest.SslTestConnection( self.client_domain )
-
- client.connection.open()
- server.connection.open()
- self._pump( client, server )
-
- # Test the subject subfields
- self.assertEqual("Client", server.ssl.get_cert_organization())
- self.assertEqual("Dev", server.ssl.get_cert_organization_unit())
- self.assertEqual("ST", server.ssl.get_cert_state_or_province())
- self.assertEqual("US", server.ssl.get_cert_country())
- self.assertEqual("City", server.ssl.get_cert_locality_or_city())
- self.assertEqual("O=Server,CN=A1.Good.Server.domain.com", client.ssl.get_cert_subject())
- self.assertEqual("O=Client,CN=127.0.0.1,C=US,ST=ST,L=City,OU=Dev", server.ssl.get_cert_subject())
-
- self.assertEqual("03c97341abafe5861d6969c66a190d2846d905d6", server.ssl.get_cert_fingerprint_sha1())
- self.assertEqual("ad86cc76278e69aef3a5c0dda13fc49831622f92d1364ce1ed361ff842b0143a", server.ssl.get_cert_fingerprint_sha256())
- self.assertEqual("9fb3340ecee4471534d60be025358cae33ef2cc9442ca8bb7703a68db68d9ffb7963678292996011fa55a9a2524b84a40a11a2778f25797e78e23cf05623218d",
- server.ssl.get_cert_fingerprint_sha512())
- self.assertEqual("0d4faa84a0bb6846eaec6b7493916b30", server.ssl.get_cert_fingerprint_md5())
-
- # Test the various fingerprint algorithms
- self.assertEqual("f390ddb4dc8a90bcf3528774b622a7f87f7322b5", client.ssl.get_cert_fingerprint_sha1())
- self.assertEqual("c116e902345c99bc01dda14b7a5f1b8ae6a451eddb23e5336c996ba4d12bc122", client.ssl.get_cert_fingerprint_sha256())
- self.assertEqual("8941c8ce00824ab7196bb1952787c90ef7f9bd837cbb0bb4823f57fc89e80033c75adc98b78801928d0035bcd6db6ddc9ab6da026c6548a66ede5c4f43f7e166",
- client.ssl.get_cert_fingerprint_sha512())
- self.assertEqual("d008bf05afbc983a3f98ae56e3eba643", client.ssl.get_cert_fingerprint_md5())
-
- self.assertEqual(None, client.ssl.get_cert_fingerprint(21, SSL.SHA1)) # Should be at least 41
- self.assertEqual(None, client.ssl.get_cert_fingerprint(50, SSL.SHA256)) # Should be at least 65
- self.assertEqual(None, client.ssl.get_cert_fingerprint(128, SSL.SHA512)) # Should be at least 129
- self.assertEqual(None, client.ssl.get_cert_fingerprint(10, SSL.MD5)) # Should be at least 33
- self.assertEqual(None, client.ssl._get_cert_subject_unknown_subfield())
-
- self.assertNotEqual(None, client.ssl.get_cert_fingerprint(50, SSL.SHA1)) # Should be at least 41
- self.assertNotEqual(None, client.ssl.get_cert_fingerprint(70, SSL.SHA256)) # Should be at least 65
- self.assertNotEqual(None, client.ssl.get_cert_fingerprint(130, SSL.SHA512)) # Should be at least 129
- self.assertNotEqual(None, client.ssl.get_cert_fingerprint(35, SSL.MD5)) # Should be at least 33
- self.assertEqual(None, client.ssl._get_cert_fingerprint_unknown_hash_alg())
-
def test_client_authentication(self):
""" Force the client to authenticate.
"""
@@ -496,82 +436,6 @@
assert client.connection.state & Endpoint.REMOTE_UNINIT
assert server.connection.state & Endpoint.REMOTE_UNINIT
- def test_session_resume(self):
- """ Test resume of client session.
- """
- if os.name=="nt":
- raise Skipped("Windows SChannel session resume not yet implemented.")
-
- self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- self.server_domain.set_peer_authentication( SSLDomain.ANONYMOUS_PEER )
-
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER )
-
- # details will be used in initial and subsequent connections to allow session to be resumed
- initial_session_details = SSLSessionDetails("my-session-id")
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain, session_details=initial_session_details )
-
- # bring up the connection and store its state
- client.connection.open()
- server.connection.open()
- self._pump( client, server )
- assert client.ssl.protocol_name() is not None
-
- # cleanly shutdown the connection
- client.connection.close()
- server.connection.close()
- self._pump( client, server )
-
- # destroy the existing clients
- del client
- del server
-
- # now create a new set of connections, use last session id
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- # provide the details of the last session, allowing it to be resumed
- client = SslTest.SslTestConnection( self.client_domain, session_details=initial_session_details )
-
- #client.transport.trace(Transport.TRACE_DRV)
- #server.transport.trace(Transport.TRACE_DRV)
-
- client.connection.open()
- server.connection.open()
- self._pump( client, server )
- assert server.ssl.protocol_name() is not None
- if(API_LANGUAGE=="C"):
- assert client.ssl.resume_status() == SSL.RESUME_REUSED
- else:
- # Java gives no way to check whether a previous session has been resumed
- pass
-
- client.connection.close()
- server.connection.close()
- self._pump( client, server )
-
- # now try to resume using an unknown session-id, expect resume to fail
- # and a new session is negotiated
-
- del client
- del server
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain, session_details=SSLSessionDetails("some-other-session-id") )
-
- client.connection.open()
- server.connection.open()
- self._pump( client, server )
- assert server.ssl.protocol_name() is not None
- if(API_LANGUAGE=="C"):
- assert client.ssl.resume_status() == SSL.RESUME_NEW
-
- client.connection.close()
- server.connection.close()
- self._pump( client, server )
def test_multiple_sessions(self):
""" Test multiple simultaineous active SSL sessions with bi-directional
@@ -612,226 +476,6 @@
s[0].connection.close()
self._pump( s[1], s[0] )
- def test_server_hostname_authentication(self):
- """ Test authentication of the names held in the server's certificate
- against various configured hostnames.
- """
- if os.name=="nt":
- raise Skipped("PROTON-1057: disable temporarily on Windows.")
-
- # Check the CommonName matches (case insensitive).
- # Assumes certificate contains "CN=A1.Good.Server.domain.com"
- self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain )
-
- client.ssl.peer_hostname = "a1.good.server.domain.com"
- assert client.ssl.peer_hostname == "a1.good.server.domain.com"
- self._do_handshake( client, server )
- del server
- del client
- self.tearDown()
-
- # Should fail on CN name mismatch:
- self.setUp()
- self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain )
-
- client.ssl.peer_hostname = "A1.Good.Server.domain.comX"
- self._do_handshake( client, server )
- assert client.transport.closed
- assert server.transport.closed
- assert client.connection.state & Endpoint.REMOTE_UNINIT
- assert server.connection.state & Endpoint.REMOTE_UNINIT
- del server
- del client
- self.tearDown()
-
- # Wildcarded Certificate
- # Assumes:
- # 1) certificate contains Server Alternate Names:
- # "alternate.name.one.com" and "another.name.com"
- # 2) certificate has wildcarded CommonName "*.prefix*.domain.com"
- #
-
- # Pass: match an alternate
- self.setUp()
- self.server_domain.set_credentials(self._testpath("server-wc-certificate.pem"),
- self._testpath("server-wc-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain )
-
- client.ssl.peer_hostname = "alternate.Name.one.com"
- self._do_handshake( client, server )
- del client
- del server
- self.tearDown()
-
- # Pass: match an alternate
- self.setUp()
- self.server_domain.set_credentials(self._testpath("server-wc-certificate.pem"),
- self._testpath("server-wc-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain )
-
- client.ssl.peer_hostname = "ANOTHER.NAME.COM"
- self._do_handshake(client, server)
- del client
- del server
- self.tearDown()
-
- # Pass: match the pattern
- self.setUp()
- self.server_domain.set_credentials(self._testpath("server-wc-certificate.pem"),
- self._testpath("server-wc-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain )
-
- client.ssl.peer_hostname = "SOME.PREfix.domain.COM"
- self._do_handshake( client, server )
- del client
- del server
- self.tearDown()
-
- # Pass: match the pattern
- self.setUp()
- self.server_domain.set_credentials(self._testpath("server-wc-certificate.pem"),
- self._testpath("server-wc-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain )
-
- client.ssl.peer_hostname = "FOO.PREfixZZZ.domain.com"
- self._do_handshake( client, server )
- del client
- del server
- self.tearDown()
-
- # Fail: must match prefix on wildcard
- self.setUp()
- self.server_domain.set_credentials(self._testpath("server-wc-certificate.pem"),
- self._testpath("server-wc-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain )
-
- client.ssl.peer_hostname = "FOO.PREfi.domain.com"
- self._do_handshake( client, server )
- assert client.transport.closed
- assert server.transport.closed
- assert client.connection.state & Endpoint.REMOTE_UNINIT
- assert server.connection.state & Endpoint.REMOTE_UNINIT
- del server
- del client
- self.tearDown()
-
- # Fail: leading wildcards are not optional
- self.setUp()
- self.server_domain.set_credentials(self._testpath("server-wc-certificate.pem"),
- self._testpath("server-wc-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain )
-
- client.ssl.peer_hostname = "PREfix.domain.COM"
- self._do_handshake( client, server )
- assert client.transport.closed
- assert server.transport.closed
- assert client.connection.state & Endpoint.REMOTE_UNINIT
- assert server.connection.state & Endpoint.REMOTE_UNINIT
- self.tearDown()
-
- # Pass: ensure that the user can give an alternate name that overrides
- # the connection's configured hostname
- self.setUp()
- self.server_domain.set_credentials(self._testpath("server-wc-certificate.pem"),
- self._testpath("server-wc-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection(self.server_domain, mode=Transport.SERVER)
- client = SslTest.SslTestConnection(self.client_domain,
- conn_hostname="This.Name.Does.not.Match",
- ssl_peername="alternate.name.one.com")
- self._do_handshake(client, server)
- del client
- del server
- self.tearDown()
-
- # Pass: ensure that the hostname supplied by the connection is used if
- # none has been specified for the SSL instanace
- self.setUp()
- self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection(self.server_domain, mode=Transport.SERVER)
- client = SslTest.SslTestConnection(self.client_domain,
- conn_hostname="a1.good.server.domain.com")
- self._do_handshake(client, server)
- del client
- del server
- self.tearDown()
-
- def test_server_hostname_authentication_2(self):
- """Initially separated from test_server_hostname_authentication
- above to force Windows checking and sidestep PROTON-1057 exclusion.
- """
-
- # Fail for a null peer name.
- self.server_domain.set_credentials(self._testpath("server-wc-certificate.pem"),
- self._testpath("server-wc-private-key.pem"),
- "server-password")
- self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER_NAME )
-
- server = SslTest.SslTestConnection( self.server_domain, mode=Transport.SERVER )
- client = SslTest.SslTestConnection( self.client_domain )
-
- # Next line results in an eventual pn_ssl_set_peer_hostname(client.ssl._ssl, None)
- client.ssl.peer_hostname = None
- self._do_handshake( client, server )
- assert client.transport.closed
- assert server.transport.closed
- assert client.connection.state & Endpoint.REMOTE_UNINIT
- assert server.connection.state & Endpoint.REMOTE_UNINIT
- self.tearDown()
-
def test_singleton(self):
"""Verify that only a single instance of SSL can exist per Transport"""
transport = Transport()
diff --git a/tests/python/proton_tests/transport.py b/tests/python/proton_tests/transport.py
index 5a5cea2..205211b 100644
--- a/tests/python/proton_tests/transport.py
+++ b/tests/python/proton_tests/transport.py
@@ -377,18 +377,3 @@
# server closed
assert self.transport.pending() < 0
-
-class LogTest(Test):
-
- def testTracer(self):
- t = Transport()
- assert t.tracer is None
- messages = []
- def tracer(transport, message):
- messages.append((transport, message))
- t.tracer = tracer
- assert t.tracer is tracer
- t.log("one")
- t.log("two")
- t.log("three")
- assert messages == [(t, "one"), (t, "two"), (t, "three")], messages
diff --git a/tests/python/proton_tests/utils.py b/tests/python/proton_tests/utils.py
index 0766eb9..78157ca 100644
--- a/tests/python/proton_tests/utils.py
+++ b/tests/python/proton_tests/utils.py
@@ -26,7 +26,7 @@
from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton.utils import SyncRequestResponse, BlockingConnection
-from .common import Skipped, ensureCanTestExtendedSASL
+from .common import Skipped
CONNECTION_PROPERTIES={u'connection': u'properties'}
OFFERED_CAPABILITIES = Array(UNDESCRIBED, Data.SYMBOL, symbol("O_one"), symbol("O_two"), symbol("O_three"))
DESIRED_CAPABILITIES = Array(UNDESCRIBED, Data.SYMBOL, symbol("D_one"), symbol("D_two"), symbol("D_three"))
@@ -97,70 +97,3 @@
if conn.remote_desired_capabilities == DESIRED_CAPABILITIES:
self.desired_capabilities_received = True
-class SyncRequestResponseTest(Test):
- """Test SyncRequestResponse"""
-
- def test_request_response(self):
- ensureCanTestExtendedSASL()
- def test(name, address="x"):
- for i in range(5):
- body="%s%s" % (name, i)
- response = client.call(Message(address=address, body=body))
- self.assertEquals(response.address, client.reply_to)
- self.assertEquals(response.body, body)
-
- server = EchoServer(Url(host="127.0.0.1", port=free_tcp_port()), self.timeout)
- server.start()
- server.wait()
- connection = BlockingConnection(server.url, timeout=self.timeout)
- client = SyncRequestResponse(connection)
- try:
- test("foo") # Simple request/resposne
- finally:
- client.connection.close()
- server.join(timeout=self.timeout)
-
-
- def test_connection_properties(self):
- ensureCanTestExtendedSASL()
- server = ConnPropertiesServer(Url(host="127.0.0.1", port=free_tcp_port()), timeout=self.timeout)
- server.start()
- server.wait()
- connection = BlockingConnection(server.url, timeout=self.timeout, properties=CONNECTION_PROPERTIES, offered_capabilities=OFFERED_CAPABILITIES, desired_capabilities=DESIRED_CAPABILITIES)
- client = SyncRequestResponse(connection)
- client.connection.close()
- server.join(timeout=self.timeout)
- self.assertEquals(server.properties_received, True)
- self.assertEquals(server.offered_capabilities_received, True)
- self.assertEquals(server.desired_capabilities_received, True)
-
- def test_allowed_mechs_external(self):
- # All this test does it make sure that if we pass allowed_mechs to BlockingConnection, it is actually used.
- if "java" in sys.platform:
- raise Skipped("")
- port = free_tcp_port()
- server = ConnPropertiesServer(Url(host="127.0.0.1", port=port), timeout=self.timeout)
- server.start()
- server.wait()
- try:
- # This will cause an exception because we are specifying allowed_mechs as EXTERNAL. The SASL handshake will fail because the server is not setup to handle EXTERNAL
- connection = BlockingConnection(server.url, timeout=self.timeout, properties=CONNECTION_PROPERTIES, offered_capabilities=OFFERED_CAPABILITIES, desired_capabilities=DESIRED_CAPABILITIES, allowed_mechs=EXTERNAL)
- self.fail("Expected ConnectionException")
- except ConnectionException as e:
- self.assertTrue('amqp:unauthorized-access' in str(e), "expected unauthorized-access")
- server.join(timeout=self.timeout)
-
- def test_allowed_mechs_anonymous(self):
- # All this test does it make sure that if we pass allowed_mechs to BlockingConnection, it is actually used.
- server = ConnPropertiesServer(Url(host="127.0.0.1", port=free_tcp_port()), timeout=self.timeout)
- server.start()
- server.wait()
- # An ANONYMOUS allowed_mechs will work, anonymous connections are allowed by ConnPropertiesServer
- connection = BlockingConnection(server.url, timeout=self.timeout, properties=CONNECTION_PROPERTIES, offered_capabilities=OFFERED_CAPABILITIES, desired_capabilities=DESIRED_CAPABILITIES, allowed_mechs=ANONYMOUS)
- client = SyncRequestResponse(connection)
- client.connection.close()
- server.join(timeout=self.timeout)
- self.assertEquals(server.properties_received, True)
- self.assertEquals(server.offered_capabilities_received, True)
- self.assertEquals(server.desired_capabilities_received, True)
-