blob: 795e7ea9f8c7e5bc9504fe7aa7b213d34cad6436 [file] [log] [blame]
import asyncio
import typing
import aiohttp
from elasticsearch import AsyncElasticsearch
import plugins.configuration
import plugins.offloader
class Endpoint:
exec: typing.Callable
def __init__(self, executor):
self.exec = executor
class BaseServer:
"""Main server class, base def"""
config: plugins.configuration.Configuration
server: typing.Optional[aiohttp.web.Server]
data: plugins.configuration.InterData
handlers: typing.Dict[str, Endpoint]
database: AsyncElasticsearch
dbpool: asyncio.Queue
runners: plugins.offloader.ExecutorPool