blob: 52c05f8ba530cc7c1245a303efea83b022489c22 [file] [log] [blame]
import multiprocessing
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
class SimpleFtpServer(multiprocessing.Process):
def __init__(self):
super().__init__()
self.authorizer = DummyAuthorizer()
handler = FTPHandler
handler.authorizer = self.authorizer
self.server = FTPServer(('127.0.0.1', 0), handler)
def run(self):
self.server.serve_forever()
def stop(self):
self.server.close_all()
self.server.close()
self.terminate()
self.join()
def allow_anonymous(self, cwd):
self.authorizer.add_anonymous(cwd)
def add_user(self, user, password, cwd):
self.authorizer.add_user(user, password, cwd, perm='elradfmwMT')
def base_url(self):
return 'ftp://127.0.0.1:{}'.format(self.server.address[1])