blob: 7eda90b70b193b8f325664924576d1d8ead4d7ff [file] [log] [blame]
import multiprocessing
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
class SimpleFtpServer(multiprocessing.Process):
def __init__(self):
super().__init__()
self.authorizer = DummyAuthorizer()
handler = FTPHandler
handler.authorizer = self.authorizer
self.server = FTPServer(("127.0.0.1", 0), handler)
def run(self):
self.server.serve_forever()
def stop(self):
self.server.close_all()
self.server.close()
self.terminate()
self.join()
def allow_anonymous(self, cwd):
self.authorizer.add_anonymous(cwd)
def add_user(self, user, password, cwd):
self.authorizer.add_user(user, password, cwd, perm="elradfmwMT")
def base_url(self):
return "ftp://127.0.0.1:{}".format(self.server.address[1])