| import multiprocessing |
| import os |
| import posixpath |
| import html |
| import base64 |
| from http.server import SimpleHTTPRequestHandler, HTTPServer, HTTPStatus |
| |
| |
| class Unauthorized(Exception): |
| pass |
| |
| |
| class RequestHandler(SimpleHTTPRequestHandler): |
| def get_root_dir(self): |
| authorization = self.headers.get("authorization") |
| if not authorization: |
| if not self.server.anonymous_dir: |
| raise Unauthorized("unauthorized") |
| return self.server.anonymous_dir |
| else: |
| authorization = authorization.split() |
| if len(authorization) != 2 or authorization[0].lower() != "basic": |
| raise Unauthorized("unauthorized") |
| try: |
| decoded = base64.decodebytes(authorization[1].encode("ascii")) |
| user, password = decoded.decode("ascii").split(":") |
| expected_password, directory = self.server.users[user] |
| if password == expected_password: |
| return directory |
| except: # noqa |
| raise Unauthorized("unauthorized") |
| return None |
| |
| def unauthorized(self): |
| shortmsg, longmsg = self.responses[HTTPStatus.UNAUTHORIZED] |
| self.send_response(HTTPStatus.UNAUTHORIZED, shortmsg) |
| self.send_header("Connection", "close") |
| |
| content = self.error_message_format % { |
| "code": HTTPStatus.UNAUTHORIZED, |
| "message": html.escape(longmsg, quote=False), |
| "explain": html.escape(longmsg, quote=False), |
| } |
| body = content.encode("UTF-8", "replace") |
| self.send_header("Content-Type", self.error_content_type) |
| self.send_header("Content-Length", str(len(body))) |
| self.send_header("WWW-Authenticate", 'Basic realm="{}"'.format(self.server.realm)) |
| self.end_headers() |
| self.end_headers() |
| |
| if self.command != "HEAD" and body: |
| self.wfile.write(body) |
| |
| def do_GET(self): |
| try: |
| super().do_GET() |
| except Unauthorized: |
| self.unauthorized() |
| |
| def do_HEAD(self): |
| try: |
| super().do_HEAD() |
| except Unauthorized: |
| self.unauthorized() |
| |
| def translate_path(self, path): |
| path = path.split("?", 1)[0] |
| path = path.split("#", 1)[0] |
| path = posixpath.normpath(path) |
| assert posixpath.isabs(path) |
| path = posixpath.relpath(path, "/") |
| return os.path.join(self.get_root_dir(), path) |
| |
| |
| class AuthHTTPServer(HTTPServer): |
| def __init__(self, *args, **kwargs): |
| self.users = {} |
| self.anonymous_dir = None |
| self.realm = "Realm" |
| super().__init__(*args, **kwargs) |
| |
| |
| class SimpleHttpServer(multiprocessing.Process): |
| def __init__(self): |
| super().__init__() |
| self.server = AuthHTTPServer(("127.0.0.1", 0), RequestHandler) |
| self.started = False |
| |
| def start(self): |
| self.started = True |
| super().start() |
| |
| def run(self): |
| self.server.serve_forever() |
| |
| def stop(self): |
| if not self.started: |
| return |
| self.terminate() |
| self.join() |
| |
| def allow_anonymous(self, cwd): |
| self.server.anonymous_dir = cwd |
| |
| def add_user(self, user, password, cwd): |
| self.server.users[user] = (password, cwd) |
| |
| def base_url(self): |
| return "http://127.0.0.1:{}".format(self.server.server_port) |