# Source code for ewoksserver.app.lifespan

import logging
import os
import shutil
from contextlib import asynccontextmanager
from contextlib import contextmanager
from pprint import pformat
from typing import AsyncGenerator
from typing import Generator

from celery import current_app as current_celery_app
from ewoksjob.client.local import pool_context
from fastapi import FastAPI

from ewoksserver.app.models import EwoksSchedulingType

from .. import resources
from . import config
from .backends import json_backend
from .routes.execution import socketio
from .routes.tasks.discovery import discover_tasks

logger = logging.getLogger(__name__)


@asynccontextmanager
async def fastapi_lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Lifespan context of the FastAPI application.

    Before yielding, the Ewoks settings are resolved and used to configure
    socketio, copy the default resources, set the default execution event
    handler, optionally rediscover tasks and log a settings summary.

    :param app: the FastAPI application; its ``dependency_overrides`` mapping
        is consulted so that an overridden ``get_ewoks_settings`` dependency
        takes precedence over the default one.
    """
    # Honour a dependency override of the settings provider when one is
    # registered, otherwise fall back to the default provider.
    get_ewoks_settings = app.dependency_overrides.get(
        config.get_ewoks_settings, config.get_ewoks_settings
    )
    ewoks_settings = get_ewoks_settings()

    _configure_socketio(ewoks_settings)
    _copy_default_resources(ewoks_settings)
    _enable_execution_events(ewoks_settings)

    # The yield stays inside the execution context so that whatever
    # `_enable_execution` sets up remains active while the application runs.
    with _enable_execution(ewoks_settings):
        _rediscover_tasks(ewoks_settings)
        _print_ewoks_settings(ewoks_settings)
        yield
def _configure_socketio(app_settings: config.EwoksSettings) -> None:
    """Pass the application settings on to ``socketio.configure_socketio``."""
    socketio.configure_socketio(app_settings)


def _copy_default_resources(ewoks_settings: config.EwoksSettings) -> None:
    """Copy the packaged default resources (tasks, icons and workflows)
    into the resource directory.

    Only regular files with an accepted extension are copied, and a file
    that already exists at the destination is never overwritten.
    """
    accepted_extensions = {
        "tasks": [".json"],
        "icons": [".png", ".svg"],
        "workflows": [".json"],
    }
    for resource, resource_ext in accepted_extensions.items():
        root_url = json_backend.root_url(ewoks_settings.resource_directory, resource)
        os.makedirs(root_url, exist_ok=True)

        package_dir = resources.DEFAULT_ROOT / resource
        for filename in os.listdir(package_dir):
            extension = os.path.splitext(filename)[1]
            if extension not in resource_ext:
                continue
            source = package_dir / filename
            if not os.path.isfile(source):
                continue
            destination = root_url / filename
            if os.path.exists(destination):
                # Keep whatever is already present in the resource directory.
                continue
            shutil.copy(source, destination)


def _rediscover_tasks(ewoks_settings: config.EwoksSettings) -> None:
    """Discover tasks and save them as "tasks" resources.

    Does nothing unless ``ewoks_discovery.on_start_up`` is set. A failing
    discovery is logged and treated as "no tasks found".
    """
    if not ewoks_settings.ewoks_discovery.on_start_up:
        return

    try:
        discovered = discover_tasks(ewoks_settings)
    except Exception as ex:
        # Best effort: a discovery failure is logged, not propagated.
        discovered = []
        logger.exception("Task discovery failed: %s", ex)

    tasks_url = json_backend.root_url(ewoks_settings.resource_directory, "tasks")
    for task in discovered:
        json_backend.save_resource(tasks_url, task["task_identifier"], task)


def _enable_execution_events(ewoks_settings: config.EwoksSettings) -> None:
    """Install the default Ewoks event handler when the settings are not
    flagged as configured and no execution handler is defined."""
    if ewoks_settings.configured:
        return

    execution = ewoks_settings.ewoks_execution
    if execution.handlers:
        return

    default_handler = {
        "class": "ewokscore.events.handlers.Sqlite3EwoksEventHandler",
        "arguments": [
            {
                "name": "uri",
                "value": "file:ewoks_events.db",
            }
        ],
    }
    execution.handlers = [default_handler]


@contextmanager
def _enable_execution(
    ewoks_settings: config.EwoksSettings,
) -> Generator[None, None, None]:
    """Context within which workflows can be executed.

    With Celery scheduling the current Celery application is updated with
    the scheduling configuration; for any other scheduling type the context
    is wrapped in ``pool_context()``.
    """
    scheduling = ewoks_settings.ewoks_scheduling

    if scheduling.type == EwoksSchedulingType.Celery:
        current_celery_app.conf.update(scheduling.configuration)
        yield
        return

    with pool_context():
        yield


def _print_ewoks_settings(ewoks_settings: config.EwoksSettings) -> None:
    """Log (INFO level) a summary of the Ewoks settings: resource directory,
    job scheduling, execution and discovery."""
    resource_dir = ewoks_settings.resource_directory or "."
    summary = ["", "", "RESOURCE DIRECTORY:", os.path.abspath(resource_dir)]

    scheduling = ewoks_settings.ewoks_scheduling
    summary.extend(["", "EWOKS JOB SCHEDULING"])
    if scheduling.type is EwoksSchedulingType.Local:
        summary.append("Local workflow execution")
    else:
        summary.append(
            f"Execution using {scheduling.type} with the following config:"
        )
        summary.append(pformat(scheduling.configuration))

    summary.extend(
        [
            "",
            "EWOKS EXECUTION:",
            pformat(ewoks_settings.ewoks_execution.model_dump()),
        ]
    )
    summary.extend(
        [
            "",
            "EWOKS DISCOVERY:",
            pformat(ewoks_settings.ewoks_discovery.model_dump()),
        ]
    )
    summary.append("")

    logger.info("\n".join(summary))