| import inspect |
| import logging |
| import os |
| import re |
| import subprocess |
| from typing import Dict, Any |
| |
| from pyhttpd.certs import CertificateSpec |
| from pyhttpd.conf import HttpdConf |
| from pyhttpd.env import HttpdTestEnv, HttpdTestSetup |
| |
| log = logging.getLogger(__name__) |
| |
| |
| class H2TestSetup(HttpdTestSetup): |
| |
| def __init__(self, env: 'HttpdTestEnv'): |
| super().__init__(env=env) |
| self.add_source_dir(os.path.dirname(inspect.getfile(H2TestSetup))) |
| self.add_modules(["http2", "proxy_http2", "cgid", "autoindex", "ssl"]) |
| |
| def make(self): |
| super().make() |
| self._add_h2test() |
| self._setup_data_1k_1m() |
| |
| def _add_h2test(self): |
| local_dir = os.path.dirname(inspect.getfile(H2TestSetup)) |
| p = subprocess.run([self.env.apxs, '-c', 'mod_h2test.c'], |
| capture_output=True, |
| cwd=os.path.join(local_dir, 'mod_h2test')) |
| rv = p.returncode |
| if rv != 0: |
| log.error(f"compiling md_h2test failed: {p.stderr}") |
| raise Exception(f"compiling md_h2test failed: {p.stderr}") |
| |
| modules_conf = os.path.join(self.env.server_dir, 'conf/modules.conf') |
| with open(modules_conf, 'a') as fd: |
| # load our test module which is not installed |
| fd.write(f"LoadModule h2test_module \"{local_dir}/mod_h2test/.libs/mod_h2test.so\"\n") |
| |
| def _setup_data_1k_1m(self): |
| s90 = "01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678\n" |
| with open(os.path.join(self.env.gen_dir, "data-1k"), 'w') as f: |
| for i in range(10): |
| f.write(f"{i:09d}-{s90}") |
| with open(os.path.join(self.env.gen_dir, "data-10k"), 'w') as f: |
| for i in range(100): |
| f.write(f"{i:09d}-{s90}") |
| with open(os.path.join(self.env.gen_dir, "data-100k"), 'w') as f: |
| for i in range(1000): |
| f.write(f"{i:09d}-{s90}") |
| with open(os.path.join(self.env.gen_dir, "data-1m"), 'w') as f: |
| for i in range(10000): |
| f.write(f"{i:09d}-{s90}") |
| |
| |
| class H2TestEnv(HttpdTestEnv): |
| |
| @classmethod |
| @property |
| def is_unsupported(cls): |
| mpm_module = f"mpm_{os.environ['MPM']}" if 'MPM' in os.environ else 'mpm_event' |
| return mpm_module == 'mpm_prefork' |
| |
| def __init__(self, pytestconfig=None): |
| super().__init__(pytestconfig=pytestconfig) |
| self.add_httpd_conf([ |
| "H2MinWorkers 1", |
| "H2MaxWorkers 64", |
| "Protocols h2 http/1.1 h2c", |
| ]) |
| self.add_httpd_log_modules(["http2", "proxy_http2", "h2test", "proxy", "proxy_http"]) |
| self.add_cert_specs([ |
| CertificateSpec(domains=[ |
| f"push.{self._http_tld}", |
| f"hints.{self._http_tld}", |
| f"ssl.{self._http_tld}", |
| f"pad0.{self._http_tld}", |
| f"pad1.{self._http_tld}", |
| f"pad2.{self._http_tld}", |
| f"pad3.{self._http_tld}", |
| f"pad8.{self._http_tld}", |
| ]), |
| CertificateSpec(domains=[f"noh2.{self.http_tld}"], key_type='rsa2048'), |
| ]) |
| |
| self.httpd_error_log.set_ignored_lognos([ |
| 'AH02032', |
| 'AH01276', |
| 'AH01630', |
| 'AH00135', |
| 'AH02261', # Re-negotiation handshake failed (our test_101) |
| 'AH03490', # scoreboard full, happens on limit tests |
| 'AH02429', # invalid chars in response header names, see test_h2_200 |
| 'AH02430', # invalid chars in response header values, see test_h2_200 |
| 'AH10373', # SSL errors on uncompleted handshakes, see test_h2_105 |
| 'AH01247', # mod_cgid sometimes freaks out on load tests |
| 'AH01110', # error by proxy reading response |
| 'AH10400', # warning that 'enablereuse' has not effect in certain configs test_h2_600 |
| 'AH00045', # child did not exit in time, SIGTERM was sent |
| ]) |
| self.httpd_error_log.add_ignored_patterns([ |
| re.compile(r'.*malformed header from script \'hecho.py\': Bad header: x.*'), |
| re.compile(r'.*:tls_post_process_client_hello:.*'), |
| # OSSL 3 dropped the function name from the error description. Use the code instead: |
| # 0A0000C1 = no shared cipher -- Too restrictive SSLCipherSuite or using DSA server certificate? |
| re.compile(r'.*SSL Library Error: error:0A0000C1:.*'), |
| re.compile(r'.*:tls_process_client_certificate:.*'), |
| # OSSL 3 dropped the function name from the error description. Use the code instead: |
| # 0A0000C7 = peer did not return a certificate -- No CAs known to server for verification? |
| re.compile(r'.*SSL Library Error: error:0A0000C7:.*'), |
| re.compile(r'.*have incompatible TLS configurations.'), |
| ]) |
| |
| def setup_httpd(self, setup: HttpdTestSetup = None): |
| super().setup_httpd(setup=H2TestSetup(env=self)) |
| |
| |
| class H2Conf(HttpdConf): |
| |
| def __init__(self, env: HttpdTestEnv, extras: Dict[str, Any] = None): |
| super().__init__(env=env, extras=HttpdConf.merge_extras(extras, { |
| f"cgi.{env.http_tld}": [ |
| "SSLOptions +StdEnvVars", |
| "AddHandler cgi-script .py", |
| "<Location \"/h2test/echo\">", |
| " SetHandler h2test-echo", |
| "</Location>", |
| "<Location \"/h2test/delay\">", |
| " SetHandler h2test-delay", |
| "</Location>", |
| "<Location \"/h2test/error\">", |
| " SetHandler h2test-error", |
| "</Location>", |
| ] |
| })) |
| |
| def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=None, |
| ssl_module=None, with_certificates=None): |
| super().start_vhost(domains=domains, port=port, doc_root=doc_root, |
| with_ssl=with_ssl, ssl_module=ssl_module, |
| with_certificates=with_certificates) |
| if f"noh2.{self.env.http_tld}" in domains: |
| protos = ["http/1.1"] |
| elif port == self.env.https_port or with_ssl is True: |
| protos = ["h2", "http/1.1"] |
| else: |
| protos = ["h2c", "http/1.1"] |
| if f"test2.{self.env.http_tld}" in domains: |
| protos = reversed(protos) |
| self.add(f"Protocols {' '.join(protos)}") |
| return self |
| |
| def add_vhost_noh2(self): |
| domains = [f"noh2.{self.env.http_tld}", f"noh2-alias.{self.env.http_tld}"] |
| self.start_vhost(domains=domains, port=self.env.https_port, doc_root="htdocs/noh2") |
| self.add(["Protocols http/1.1", "SSLOptions +StdEnvVars"]) |
| self.end_vhost() |
| self.start_vhost(domains=domains, port=self.env.http_port, doc_root="htdocs/noh2") |
| self.add(["Protocols http/1.1", "SSLOptions +StdEnvVars"]) |
| self.end_vhost() |
| return self |
| |
| def add_vhost_test1(self, proxy_self=False, h2proxy_self=False): |
| return super().add_vhost_test1(proxy_self=proxy_self, h2proxy_self=h2proxy_self) |
| |
| def add_vhost_test2(self): |
| return super().add_vhost_test2() |