# blob: 8591159f83872989b0221e8a1ab5cb5f85d39820 [file] [log] [blame]
import multiprocessing
import os
import posixpath
import html
import base64
from http.server import SimpleHTTPRequestHandler, HTTPServer, HTTPStatus
class Unauthorized(Exception):
    """Signal that a request carries missing or unacceptable credentials.

    Raised while resolving the directory to serve for a request and turned
    into a ``401 Unauthorized`` response by the request handler.
    """
class RequestHandler(SimpleHTTPRequestHandler):
    """Static-file handler whose served directory depends on the credentials.

    Requests carrying HTTP Basic credentials are served from the directory
    registered for that user in ``self.server.users`` (a mapping of
    ``user -> (password, directory)``).  Requests without an
    ``Authorization`` header are served from ``self.server.anonymous_dir``
    when it is set.  Every other request is answered with
    ``401 Unauthorized``.
    """

    def get_root_dir(self):
        """Return the directory the current request is allowed to read from.

        Returns:
            The anonymous directory when no ``Authorization`` header is
            present, otherwise the directory registered for the
            authenticated user.

        Raises:
            Unauthorized: if anonymous access is not configured, the header
                is not a well-formed ``Basic`` credential, the user is
                unknown, or the password does not match.
        """
        authorization = self.headers.get("authorization")
        if not authorization:
            if not self.server.anonymous_dir:
                raise Unauthorized("unauthorized")
            return self.server.anonymous_dir

        parts = authorization.split()
        if len(parts) != 2 or parts[0].lower() != "basic":
            raise Unauthorized("unauthorized")

        try:
            decoded = base64.decodebytes(parts[1].encode("ascii"))
            # Split on the first colon only: the password part of a Basic
            # credential may itself contain colons.
            user, password = decoded.decode("ascii").split(":", 1)
            expected_password, directory = self.server.users[user]
        except (ValueError, KeyError):
            # ValueError covers non-ASCII input, invalid base64 and a
            # credential without any colon; KeyError is an unknown user.
            raise Unauthorized("unauthorized") from None

        # A wrong password must be rejected explicitly; falling through and
        # returning None would break path translation instead of yielding 401.
        if password != expected_password:
            raise Unauthorized("unauthorized")
        return directory

    def unauthorized(self):
        """Send a ``401 Unauthorized`` response with a Basic auth challenge.

        The body is the standard error page; it is omitted for ``HEAD``
        requests.
        """
        shortmsg, longmsg = self.responses[HTTPStatus.UNAUTHORIZED]
        self.send_response(HTTPStatus.UNAUTHORIZED, shortmsg)
        self.send_header("Connection", "close")
        content = self.error_message_format % {
            "code": HTTPStatus.UNAUTHORIZED,
            "message": html.escape(longmsg, quote=False),
            "explain": html.escape(longmsg, quote=False),
        }
        body = content.encode("UTF-8", "replace")
        self.send_header("Content-Type", self.error_content_type)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("WWW-Authenticate", 'Basic realm="{}"'.format(self.server.realm))
        # Terminate the header section exactly once: a second end_headers()
        # would emit a stray CRLF that the client reads as part of the body.
        self.end_headers()
        if self.command != "HEAD" and body:
            self.wfile.write(body)

    def do_GET(self):
        """Serve a GET request, answering 401 when authorization fails."""
        try:
            super().do_GET()
        except Unauthorized:
            self.unauthorized()

    def do_HEAD(self):
        """Serve a HEAD request, answering 401 when authorization fails."""
        try:
            super().do_HEAD()
        except Unauthorized:
            self.unauthorized()

    def translate_path(self, path):
        """Map a request path onto the root directory chosen for the request.

        The query string and fragment are dropped and the path is normalized
        before being joined to the result of ``get_root_dir()``.

        Raises:
            Unauthorized: propagated from ``get_root_dir()``.
        """
        path = path.split("?", 1)[0]
        path = path.split("#", 1)[0]
        # Anchor at "/" before normalizing so the result is always absolute,
        # even for a malformed request target; this replaces an assert that
        # would vanish under ``python -O``.
        path = posixpath.normpath(posixpath.join("/", path))
        path = posixpath.relpath(path, "/")
        return os.path.join(self.get_root_dir(), path)
class AuthHTTPServer(HTTPServer):
    """HTTP server that carries the access settings its handlers consult.

    Attributes set on construction:
        realm: realm name announced in the Basic authentication challenge.
        anonymous_dir: directory served to requests without credentials,
            or ``None`` when anonymous access is disabled.
        users: mapping of user name to ``(password, directory)``.
    """

    def __init__(self, *args, **kwargs):
        """Install empty access settings, then initialize the base server.

        All arguments are forwarded unchanged to ``HTTPServer``.
        """
        # The settings are put in place before the base initializer runs.
        self.realm = "Realm"
        self.anonymous_dir = None
        self.users = dict()
        super().__init__(*args, **kwargs)
class SimpleHttpServer(multiprocessing.Process):
    """Run an ``AuthHTTPServer`` on the loopback interface in a child process.

    The listening socket is created in ``__init__`` on ``127.0.0.1`` with
    port ``0``; the port actually assigned is reported by ``base_url()``.
    Users and the anonymous directory should be configured before
    ``start()``, since the serving happens in a separate process.
    """

    def __init__(self):
        """Create the server socket; the serving process is not started yet."""
        super().__init__()
        self.server = AuthHTTPServer(("127.0.0.1", 0), RequestHandler)
        self.started = False

    def start(self):
        """Start the child process and remember that it is running."""
        super().start()
        # Only flag as started once the process really exists, so stop()
        # never tries to terminate a process that failed to launch.
        self.started = True

    def run(self):
        """Child-process entry point: serve requests until terminated."""
        self.server.serve_forever()

    def stop(self):
        """Terminate the child process and release the listening socket.

        Does nothing when the server was never started (or was already
        stopped).
        """
        if not self.started:
            return
        self.terminate()
        self.join()
        self.started = False
        # The parent still holds its own handle on the listening socket;
        # close it so it is not leaked and later connections are refused
        # instead of queuing on a socket nobody serves.
        self.server.server_close()

    def allow_anonymous(self, cwd):
        """Serve requests without credentials from directory ``cwd``."""
        self.server.anonymous_dir = cwd

    def add_user(self, user, password, cwd):
        """Register ``user`` with ``password``, served from directory ``cwd``."""
        self.server.users[user] = (password, cwd)

    def base_url(self):
        """Return the ``http://127.0.0.1:<port>`` URL the server listens on."""
        return "http://127.0.0.1:{}".format(self.server.server_port)