| ''' |
| Test certifier plugin behaviors |
| ''' |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import hashlib |
| import os |
| import re |
| |
# One-line description of this test file, assigned to the AuTest ``Test`` object.
Test.Summary = "\nTest certifier plugin behaviors\n"

# Nothing below is meaningful unless the plugin under test is available,
# so skip the whole file when certifier.so cannot be found.
Test.SkipUnless(Condition.PluginExists("certifier.so"))
| |
| |
class DynamicCertTest:
    '''
    Verify that the certifier plugin generates a certificate on demand.

    The test checks that no certificate for ``host`` is present in the
    plugin's store, replays HTTPS traffic through Traffic Server, and then
    inspects the generated certificate with ``openssl``.
    '''
    # Replay file used by both the verifier server and the verifier client.
    httpsReplayFile = "replays/https.replay.yaml"
    # Source directory of the CA material copied into the ATS config directory.
    certPathSrc = os.path.join(Test.TestDirectory, "certs")
    # Host name for which a certificate is expected to be generated.
    host = "www.tls.com"
    # Destination of the copied cert directory; filled in by setupTS().
    certPathDest = ""

    def __init__(self):
        '''Create the origin server and the Traffic Server process.'''
        self.setupOriginServer()
        self.setupTS()

    def setupOriginServer(self):
        '''Create the verifier server that replays the origin side of the traffic.'''
        self.server = Test.MakeVerifierServerProcess("verifier-server1", self.httpsReplayFile)

    def setupTS(self):
        '''Create and configure the Traffic Server process with the certifier plugin.'''
        self.ts = Test.MakeATSProcess("ts1", enable_tls=True)
        self.ts.addDefaultSSLFiles()

        # copy over the cert store in which the certs will be generated/stored
        self.certPathDest = os.path.join(self.ts.Variables.CONFIGDIR, "certifier-certs")
        store_dir = os.path.join(self.certPathDest, "store")
        Setup.Copy(self.certPathSrc, self.certPathDest)
        Setup.MakeDir(store_dir)

        self.ts.Disk.records_config.update(
            {
                "proxy.config.diags.debug.enabled": 1,
                "proxy.config.diags.debug.tags": "http|certifier|ssl",
                "proxy.config.ssl.server.cert.path": f'{self.ts.Variables.SSLDir}',
                "proxy.config.ssl.server.private_key.path": f'{self.ts.Variables.SSLDir}',
            })
        self.ts.Disk.ssl_multicert_yaml.AddLines(
            """
ssl_multicert:
  - dest_ip: "*"
    ssl_cert_name: server.pem
    ssl_key_name: server.key
""".split("\n"))
        self.ts.Disk.remap_config.AddLine(f"map / http://127.0.0.1:{self.server.Variables.http_port}/")

        # Point the plugin at the store directory and the CA files copied above.
        ca_cert = os.path.join(self.certPathDest, "ca.cert")
        ca_key = os.path.join(self.certPathDest, "ca.key")
        ca_serial = os.path.join(self.certPathDest, "ca-serial.txt")
        self.ts.Disk.plugin_config.AddLine(
            f'certifier.so -s {store_dir} -m 1000 -c {ca_cert} -k {ca_key} -r {ca_serial}')

        # Verify logs for dynamic generation of certs
        self.ts.Disk.traffic_out.Content += Testers.ContainsExpression(
            "creating shadow certs", "Verify the certifier plugin generates the certificate dynamically.")

    def runHTTPSTraffic(self):
        '''Add a test run that replays the HTTPS traffic through Traffic Server.'''
        tr = Test.AddTestRun("Test dynamic generation of certs")
        tr.AddVerifierClientProcess(
            "client1", self.httpsReplayFile, http_ports=[self.ts.Variables.port], https_ports=[self.ts.Variables.ssl_port])
        tr.Processes.Default.StartBefore(self.server)
        tr.Processes.Default.StartBefore(self.ts)
        tr.StillRunningAfter = self.server
        tr.StillRunningAfter = self.ts

    def verifyCert(self, certPath):
        '''
        Add a test run that dumps the certificate at certPath with openssl and
        checks that its subject and SAN extension name ``self.host``.
        '''
        tr = Test.AddTestRun("Verify the content of the generated cert")
        tr.Processes.Default.Command = f'openssl x509 -in {certPath} -text -noout'
        tr.Processes.Default.ReturnCode = 0

        # Escape the host so its dots match literally instead of acting as
        # regex wildcards.
        host_re = re.escape(self.host)

        # Verify certificate content
        tr.Processes.Default.Streams.All += Testers.ContainsExpression(
            f"Subject: CN ?= ?{host_re}", "Subject should match the host in the request")
        tr.Processes.Default.Streams.All += Testers.ContainsExpression(
            rf"X509v3 extensions:\n.*X509v3 Subject Alternative Name:.*\n.*DNS:{host_re}",
            "Should contain the SAN extension",
            reflags=re.MULTILINE)

    def verifyCertNotExist(self, certPath):
        '''Add a test run asserting that no file exists at certPath.'''
        tr = Test.AddTestRun("Verify the cert doesn't exist in the store")
        # The command is only a placeholder; the actual check is the
        # exists=False expectation registered on the file.
        tr.Processes.Default.Command = "echo verify"
        tr.Disk.File(certPath, exists=False)

    def run(self):
        '''Register the test runs: cert absent, HTTPS traffic, cert content.'''
        # the certifier plugin generates the cert and stores it in a directory
        # named with the first three characters of the md5 hash of the hostname
        hash_prefix = hashlib.md5(self.host.encode('utf-8')).hexdigest()[:3]
        genCertPath = os.path.join(self.certPathDest, 'store', hash_prefix, self.host + ".crt")
        self.verifyCertNotExist(genCertPath)
        self.runHTTPSTraffic()
        self.verifyCert(genCertPath)


DynamicCertTest().run()
| |
| |
class ReuseExistingCertTest:
    '''
    Verify that the certifier plugin reuses a certificate it already has.

    The test replays HTTPS traffic through Traffic Server and expects the
    plugin to log that it reused the existing certificate for ``host``.
    '''
    # Replay file used by both the verifier server and the verifier client.
    # Presumably it drives two TLS sessions for the same host -- confirm
    # against the replay file.
    httpsReplayFile = "replays/https-two-sessions.replay.yaml"
    # Source directory of the CA material copied into the ATS config directory.
    certPathSrc = os.path.join(Test.TestDirectory, "certs")
    # Host name whose certificate is expected to be reused.
    host = "www.tls.com"
    # Destination of the copied cert directory; filled in by setupTS().
    certPathDest = ""

    def __init__(self):
        '''Create the origin server and the Traffic Server process.'''
        self.setupOriginServer()
        self.setupTS()

    def setupOriginServer(self):
        '''Create the verifier server that replays the origin side of the traffic.'''
        self.server = Test.MakeVerifierServerProcess("verifier-server2", self.httpsReplayFile)

    def setupTS(self):
        '''Create and configure the Traffic Server process with the certifier plugin.'''
        self.ts = Test.MakeATSProcess("ts2", enable_tls=True)
        self.ts.addDefaultSSLFiles()

        # copy over the cert store in which the certs will be generated/stored
        self.certPathDest = os.path.join(self.ts.Variables.CONFIGDIR, "certifier-certs")
        store_dir = os.path.join(self.certPathDest, "store")
        Setup.Copy(self.certPathSrc, self.certPathDest)
        Setup.MakeDir(store_dir)

        self.ts.Disk.records_config.update(
            {
                "proxy.config.diags.debug.enabled": 1,
                "proxy.config.diags.debug.tags": "http|certifier|ssl",
                "proxy.config.ssl.server.cert.path": f'{self.ts.Variables.SSLDir}',
                "proxy.config.ssl.server.private_key.path": f'{self.ts.Variables.SSLDir}',
            })
        self.ts.Disk.ssl_multicert_yaml.AddLines(
            """
ssl_multicert:
  - dest_ip: "*"
    ssl_cert_name: server.pem
    ssl_key_name: server.key
""".split("\n"))
        self.ts.Disk.remap_config.AddLine(f"map / http://127.0.0.1:{self.server.Variables.http_port}/")

        # Point the plugin at the store directory and the CA files copied above.
        ca_cert = os.path.join(self.certPathDest, "ca.cert")
        ca_key = os.path.join(self.certPathDest, "ca.key")
        ca_serial = os.path.join(self.certPathDest, "ca-serial.txt")
        self.ts.Disk.plugin_config.AddLine(
            f'certifier.so -s {store_dir} -m 1000 -c {ca_cert} -k {ca_key} -r {ca_serial}')

        # Verify logs for reusing existing cert. The host is escaped so its
        # dots match literally instead of acting as regex wildcards.
        self.ts.Disk.traffic_out.Content += Testers.ContainsExpression(
            f"reusing existing cert and context for {re.escape(self.host)}", "Should reuse the existing certificate")

    def runHTTPSTraffic(self):
        '''Add a test run that replays the HTTPS traffic through Traffic Server.'''
        tr = Test.AddTestRun("Test reuse of existing certs")
        tr.AddVerifierClientProcess(
            "client2", self.httpsReplayFile, http_ports=[self.ts.Variables.port], https_ports=[self.ts.Variables.ssl_port])
        tr.Processes.Default.StartBefore(self.server)
        tr.Processes.Default.StartBefore(self.ts)
        tr.StillRunningAfter = self.server
        tr.StillRunningAfter = self.ts

    def run(self):
        '''Register the single test run that replays the traffic.'''
        self.runHTTPSTraffic()


ReuseExistingCertTest().run()