blob: 2b6a5df2ec2d0da5924da894eb38015f8d9e3368 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os, common
import subprocess
from proton import *
from common import Skipped
class SslTest(common.Test):
def __init__(self, *args):
common.Test.__init__(self, *args)
def setup(self):
try:
self.t_server = Transport()
self.server = SSL(self.t_server)
self.server.init(SSL.MODE_SERVER)
self.t_client = Transport()
self.client = SSL(self.t_client)
self.client.init(SSL.MODE_CLIENT)
except SSLUnavailable, e:
raise Skipped(e)
def teardown(self):
self.t_client = None
self.t_server = None
def _pump(self):
while True:
out_client = self.t_client.output(1024)
out_server = self.t_server.output(1024)
if out_client: self.t_server.input(out_client)
if out_server: self.t_client.input(out_server)
if not out_client and not out_server: break
def _testpath(self, file):
""" Set the full path to the certificate,keyfile, etc. for the test.
"""
return os.path.join(os.path.dirname(__file__),
"ssl_db/%s" % file)
def test_defaults(self):
""" By default, both the server and the client support anonymous
ciphers - they should connect without need for a certificate.
"""
client_conn = Connection()
self.t_client.bind(client_conn)
server_conn = Connection()
self.t_server.bind(server_conn)
# check that no SSL connection exists
assert not self.server.cipher_name()
assert not self.client.protocol_name()
client_conn.open()
server_conn.open()
self._pump()
# now SSL should be active
assert self.server.cipher_name() is not None
assert self.client.protocol_name() is not None
client_conn.close()
server_conn.close()
self._pump()
def test_server_certificate(self):
""" Test that anonymous clients can still connect to a server that has
a certificate configured.
"""
self.server.set_credentials(self._testpath("server-certificate.pem"),
self._testpath("server-private-key.pem"),
"server-password")
client_conn = Connection()
self.t_client.bind(client_conn)
server_conn = Connection()
self.t_server.bind(server_conn)
client_conn.open()
server_conn.open()
self._pump()
assert self.client.protocol_name() is not None
client_conn.close()
server_conn.close()
self._pump()
def test_server_authentication(self):
""" Simple SSL connection with authentication of the server
"""
self.server.set_credentials(self._testpath("server-certificate.pem"),
self._testpath("server-private-key.pem"),
"server-password")
self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
self.client.set_peer_authentication( SSL.VERIFY_PEER )
client_conn = Connection()
self.t_client.bind(client_conn)
server_conn = Connection()
self.t_server.bind(server_conn)
client_conn.open()
server_conn.open()
self._pump()
assert self.client.protocol_name() is not None
client_conn.close()
server_conn.close()
self._pump()
def test_client_authentication(self):
""" Force the client to authenticate.
"""
# note: when requesting client auth, the server _must_ send its
# certificate, so make sure we configure one!
self.server.set_credentials(self._testpath("server-certificate.pem"),
self._testpath("server-private-key.pem"),
"server-password")
self.server.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
self.server.set_peer_authentication( SSL.VERIFY_PEER,
self._testpath("ca-certificate.pem") )
# give the client a certificate, but let's not require server authentication
self.client.set_credentials(self._testpath("client-certificate.pem"),
self._testpath("client-private-key.pem"),
"client-password")
self.client.set_peer_authentication( SSL.ANONYMOUS_PEER )
client_conn = Connection()
self.t_client.bind(client_conn)
server_conn = Connection()
self.t_server.bind(server_conn)
client_conn.open()
server_conn.open()
self._pump()
assert self.client.protocol_name() is not None
client_conn.close()
server_conn.close()
self._pump()
def test_client_server_authentication(self):
""" Require both client and server to mutually identify themselves.
"""
self.server.set_credentials(self._testpath("server-certificate.pem"),
self._testpath("server-private-key.pem"),
"server-password")
self.server.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
self.server.set_peer_authentication( SSL.VERIFY_PEER,
self._testpath("ca-certificate.pem") )
self.client.set_credentials(self._testpath("client-certificate.pem"),
self._testpath("client-private-key.pem"),
"client-password")
self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
self.client.set_peer_authentication( SSL.VERIFY_PEER )
client_conn = Connection()
self.t_client.bind(client_conn)
server_conn = Connection()
self.t_server.bind(server_conn)
client_conn.open()
server_conn.open()
self._pump()
assert self.client.protocol_name() is not None
client_conn.close()
server_conn.close()
self._pump()
def test_server_only_authentication(self):
""" Client verifies server, but server does not verify client.
"""
self.server.set_credentials(self._testpath("server-certificate.pem"),
self._testpath("server-private-key.pem"),
"server-password")
self.server.set_peer_authentication( SSL.ANONYMOUS_PEER )
self.client.set_credentials(self._testpath("client-certificate.pem"),
self._testpath("client-private-key.pem"),
"client-password")
self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
self.client.set_peer_authentication( SSL.VERIFY_PEER )
client_conn = Connection()
self.t_client.bind(client_conn)
server_conn = Connection()
self.t_server.bind(server_conn)
client_conn.open()
server_conn.open()
self._pump()
assert self.client.protocol_name() is not None
client_conn.close()
server_conn.close()
self._pump()