import os
import logging
import sys
import importlib
from concurrent.futures import ThreadPoolExecutor

import pkg_resources
import tornado.web

from webservice import NexusHandler
from webservice.nexus_tornado.request.handlers import NexusRequestHandler
from webservice.nexus_tornado.request.handlers import NexusAsyncJobHandler
import webservice.algorithms_spark.NexusCalcSparkHandler
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt="%Y-%m-%dT%H:%M:%S",
    stream=sys.stdout)

logger = logging.getLogger(__name__)

class VersionHandler(tornado.web.RequestHandler):
    """Handles GET /version by returning the installed nexusanalysis package version."""

    def get(self):
        self.write(pkg_resources.get_distribution("nexusanalysis").version)

class NexusHandlerManager(object):
    _spark_context = None

    def __init__(self, module_dirs,
                 algorithm_config, tile_service_factory,
                 max_request_threads=1,
                 static_dir=None):
        # Import each algorithm module so its handlers register themselves
        # with NexusHandler.
        for moduleDir in module_dirs:
            logger.info("Loading modules from %s" % moduleDir)
            importlib.import_module(moduleDir)

        logger.info("Running Nexus Initializers")
        NexusHandler.executeInitializers(algorithm_config)

        self._tile_service_factory = tile_service_factory

        logger.info("Initializing request ThreadPool to %s" % max_request_threads)
        self._request_thread_pool = ThreadPoolExecutor(max_request_threads)

        self._static_dir = static_dir
    def get_handlers(self):
        handlers = self._get_legacy_handlers()
        handlers.extend(self._get_restapi_algorithm_handlers())

        handlers.append((r"/version", VersionHandler))

        NexusAsyncJobHandler.start_jobs_cleaner()
        handlers.append((r"/jobs/(.*)", NexusAsyncJobHandler))

        if self._static_dir:
            handlers.append(
                (r'/(.*)', tornado.web.StaticFileHandler,
                 {'path': self._static_dir, "default_filename": "index.html"}))

        return handlers
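    # Each entry above is a Tornado URL-spec tuple of the form
    # (url_pattern, RequestHandler subclass, init kwargs), so the returned list
    # can be passed directly to tornado.web.Application(...); see the usage
    # sketch at the bottom of this module.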
    def _get_legacy_handlers(self):
        return self.__get_tornado_handlers(NexusHandler.AVAILABLE_LEGACY_HANDLERS, lambda x: x)

    def _get_restapi_algorithm_handlers(self):
        def path_spark_to_restapi(s):
            # Drop the trailing 'Spark' suffix and prefix the path with
            # '/algorithms', e.g. '/fooSpark' -> '/algorithms/foo'.
            i_spark = s.find('Spark')
            return '/algorithms' + s[:i_spark]

        return self.__get_tornado_handlers(NexusHandler.AVAILABLE_RESTAPI_HANDLERS, path_spark_to_restapi)
    def __get_tornado_handlers(self, wrappers, path_func):
        handlers = []
        for clazzWrapper in wrappers:
            path = path_func(clazzWrapper.path)
            logger.info("adding request handler for class {} on path {}".format(clazzWrapper, path))
            if issubclass(clazzWrapper, webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler):
                # Spark-based algorithms additionally receive the shared SparkContext.
                spark_context = self._get_spark_context()
                handlers.append((path,
                                 NexusRequestHandler,
                                 dict(clazz=clazzWrapper,
                                      tile_service_factory=self._tile_service_factory,
                                      sc=spark_context,
                                      thread_pool=self._request_thread_pool)))
            else:
                handlers.append((path,
                                 NexusRequestHandler,
                                 dict(clazz=clazzWrapper,
                                      tile_service_factory=self._tile_service_factory,
                                      thread_pool=self._request_thread_pool)))
        return handlers
    def _get_spark_context(self):
        # Lazily create a single SparkContext, shared by all Spark-based handlers.
        if self._spark_context is None:
            from pyspark.sql import SparkSession
            spark = SparkSession.builder.appName("nexus-analysis") \
                .config("spark.scheduler.mode", "FAIR") \
                .config("spark.scheduler.allocation.file", os.path.abspath("webservice/config/spark_pools.xml")) \
                .getOrCreate()
            self._spark_context = spark.sparkContext
        return self._spark_context
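

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module): a minimal
# example of how this manager might be wired into a Tornado application.
# `algorithm_config` and `tile_service_factory` are placeholders for objects
# supplied by the real startup code (typically a config object and a callable
# returning a tile service), and the module list and port are assumptions.
#
#     import tornado.ioloop
#     import tornado.web
#
#     manager = NexusHandlerManager(
#         module_dirs=["webservice.algorithms", "webservice.algorithms_spark"],
#         algorithm_config=algorithm_config,
#         tile_service_factory=tile_service_factory,
#         max_request_threads=4,
#         static_dir="static")
#
#     app = tornado.web.Application(manager.get_handlers())
#     app.listen(8083)
#     tornado.ioloop.IOLoop.current().start()
# ---------------------------------------------------------------------------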