Source code for ewoksserver.app.routes.execution.events

import logging
from contextlib import contextmanager
from typing import Generator
from typing import Optional

from ewoksjob.events.readers import EwoksEventReader
from ewoksjob.events.readers import instantiate_reader

from ...config import EwoksSettingsType

logger = logging.getLogger(__name__)


[docs] @contextmanager def reader_context( ewoks_settings: EwoksSettingsType, ) -> Generator[Optional[EwoksEventReader], None, None]: r = _reader(ewoks_settings) try: yield r finally: if r is not None: r.close()
def _reader(ewoks_settings: EwoksSettingsType) -> Optional[EwoksEventReader]: handlers = ewoks_settings.ewoks_execution.handlers argmap = {"uri": "url"} for name in ("Redis", "Sqlite3", None): for handler in handlers: if name is None or name in handler["class"]: arguments = handler.get("arguments", list()) arguments = { argmap.get(arg["name"], arg["name"]): arg["value"] for arg in arguments } return instantiate_reader(**arguments) logger.warning("Configure ewoks event handlers") return None