| import multiprocessing |
| import os |
| import posixpath |
| import html |
| import threading |
| import base64 |
| from http.server import SimpleHTTPRequestHandler, HTTPServer, HTTPStatus |
| |
| |
| class Unauthorized(Exception): |
| pass |
| |
| |
| class RequestHandler(SimpleHTTPRequestHandler): |
| |
| def get_root_dir(self): |
| authorization = self.headers.get('authorization') |
| if not authorization: |
| if not self.server.anonymous_dir: |
| raise Unauthorized('unauthorized') |
| return self.server.anonymous_dir |
| else: |
| authorization = authorization.split() |
| if len(authorization) != 2 or authorization[0].lower() != 'basic': |
| raise Unauthorized('unauthorized') |
| try: |
| decoded = base64.decodebytes(authorization[1].encode('ascii')) |
| user, password = decoded.decode('ascii').split(':') |
| expected_password, directory = self.server.users[user] |
| if password == expected_password: |
| return directory |
| except: |
| raise Unauthorized('unauthorized') |
| return None |
| |
| def unauthorized(self): |
| shortmsg, longmsg = self.responses[HTTPStatus.UNAUTHORIZED] |
| self.send_response(HTTPStatus.UNAUTHORIZED, shortmsg) |
| self.send_header('Connection', 'close') |
| |
| content = (self.error_message_format % { |
| 'code': HTTPStatus.UNAUTHORIZED, |
| 'message': html.escape(longmsg, quote=False), |
| 'explain': html.escape(longmsg, quote=False) |
| }) |
| body = content.encode('UTF-8', 'replace') |
| self.send_header('Content-Type', self.error_content_type) |
| self.send_header('Content-Length', str(len(body))) |
| self.send_header('WWW-Authenticate', 'Basic realm="{}"'.format(self.server.realm)) |
| self.end_headers() |
| self.end_headers() |
| |
| if self.command != 'HEAD' and body: |
| self.wfile.write(body) |
| |
| def do_GET(self): |
| try: |
| super().do_GET() |
| except Unauthorized: |
| self.unauthorized() |
| |
| def do_HEAD(self): |
| try: |
| super().do_HEAD() |
| except Unauthorized: |
| self.unauthorized() |
| |
| def translate_path(self, path): |
| path = path.split('?', 1)[0] |
| path = path.split('#', 1)[0] |
| path = posixpath.normpath(path) |
| assert(posixpath.isabs(path)) |
| path = posixpath.relpath(path, '/') |
| return os.path.join(self.get_root_dir(), path) |
| |
| |
| class AuthHTTPServer(HTTPServer): |
| def __init__(self, *args, **kwargs): |
| self.users = {} |
| self.anonymous_dir = None |
| self.realm = 'Realm' |
| super().__init__(*args, **kwargs) |
| |
| |
| class SimpleHttpServer(multiprocessing.Process): |
| def __init__(self): |
| self.__stop = multiprocessing.Queue() |
| super().__init__() |
| self.server = AuthHTTPServer(('127.0.0.1', 0), RequestHandler) |
| self.started = False |
| |
| def start(self): |
| self.started = True |
| super().start() |
| |
| def run(self): |
| t = threading.Thread(target=self.server.serve_forever) |
| t.start() |
| self.__stop.get() |
| self.server.shutdown() |
| t.join() |
| |
| def stop(self): |
| if not self.started: |
| return |
| self.__stop.put(None) |
| self.terminate() |
| self.join() |
| |
| def allow_anonymous(self, cwd): |
| self.server.anonymous_dir = cwd |
| |
| def add_user(self, user, password, cwd): |
| self.server.users[user] = (password, cwd) |
| |
| def base_url(self): |
| return 'http://127.0.0.1:{}'.format(self.server.server_port) |