Source code for ewoksserver.app.routes.execution.events

import logging
from contextlib import contextmanager
from typing import Generator, Optional

from ewoksjob.events.readers import EwoksEventReader
from ewoksjob.events.readers import instantiate_reader

from ...config import EwoksSettingsType

logger = logging.getLogger(__name__)


[docs]@contextmanager def reader_context( ewoks_settings: EwoksSettingsType, ) -> Generator[Optional[EwoksEventReader], None, None]: r = _reader(ewoks_settings) try: yield r finally: if r is not None: r.close()
def _reader(ewoks_settings: EwoksSettingsType) -> Optional[EwoksEventReader]: cfg = ewoks_settings.ewoks if cfg: handlers = cfg.get("handlers", list()) else: handlers = list() argmap = {"uri": "url"} for name in ("Redis", "Sqlite3", None): for handler in handlers: if name is None or name in handler["class"]: arguments = handler.get("arguments", list()) arguments = { argmap.get(arg["name"], arg["name"]): arg["value"] for arg in arguments } return instantiate_reader(**arguments) logger.warning("Configure ewoks event handlers") return None