Source code for ewoksserver.app.lifespan
import logging
import os
import shutil
from contextlib import asynccontextmanager
from contextlib import contextmanager
from pprint import pformat
from typing import Generator
from celery import current_app as current_celery_app
from ewoksjob.client.local import pool_context
from fastapi import FastAPI
from ewoksserver.app.models import EwoksSchedulingType
from .. import resources
from . import config
from .backends import json_backend
from .routes.execution import socketio
from .routes.tasks.discovery import discover_tasks
logger = logging.getLogger(__name__)
[docs]
@asynccontextmanager
async def fastapi_lifespan(app: FastAPI) -> Generator[None, None, None]:
get_ewoks_settings = app.dependency_overrides.get(
config.get_ewoks_settings, config.get_ewoks_settings
)
ewoks_settings = get_ewoks_settings()
_configure_socketio(ewoks_settings)
_copy_default_resources(ewoks_settings)
_enable_execution_events(ewoks_settings)
with _enable_execution(ewoks_settings):
_rediscover_tasks(ewoks_settings)
_print_ewoks_settings(ewoks_settings)
yield
def _configure_socketio(app_settings: config.EwoksSettings) -> None:
socketio.configure_socketio(app_settings)
def _copy_default_resources(ewoks_settings: config.EwoksSettings) -> None:
"""Copy the default resources (tasks, workflows and icon) from the
python package to the resource directory."""
for resource, resource_ext in {
"tasks": [".json"],
"icons": [".png", ".svg"],
"workflows": [".json"],
}.items():
root_url = json_backend.root_url(ewoks_settings.resource_directory, resource)
os.makedirs(root_url, exist_ok=True)
for filename in os.listdir(resources.DEFAULT_ROOT / resource):
_, ext = os.path.splitext(filename)
if ext not in resource_ext:
continue
src = resources.DEFAULT_ROOT / resource / filename
if not os.path.isfile(src):
continue
dest = root_url / filename
if not os.path.exists(dest):
shutil.copy(src, dest)
def _rediscover_tasks(ewoks_settings: config.EwoksSettings) -> None:
if not ewoks_settings.ewoks_discovery.on_start_up:
return
try:
tasks = discover_tasks(ewoks_settings)
except Exception as ex:
tasks = []
logger.exception("Task discovery failed: %s", ex)
root_url = json_backend.root_url(ewoks_settings.resource_directory, "tasks")
for resource in tasks:
json_backend.save_resource(root_url, resource["task_identifier"], resource)
def _enable_execution_events(ewoks_settings: config.EwoksSettings) -> None:
"""Set default ewoks event handler when nothing has been configured"""
if ewoks_settings.configured:
return
if not ewoks_settings.ewoks_execution.handlers:
ewoks_settings.ewoks_execution.handlers = [
{
"class": "ewokscore.events.handlers.Sqlite3EwoksEventHandler",
"arguments": [
{
"name": "uri",
"value": "file:ewoks_events.db",
}
],
}
]
@contextmanager
def _enable_execution(
ewoks_settings: config.EwoksSettings,
) -> Generator[None, None, None]:
"""Ensure workflows can be executed"""
config = ewoks_settings.ewoks_scheduling
if config.type == EwoksSchedulingType.Celery:
current_celery_app.conf.update(config.configuration)
yield
else:
with pool_context():
yield
def _print_ewoks_settings(ewoks_settings: config.EwoksSettings) -> None:
"""Print summary of all Ewoks settings"""
resourcedir = ewoks_settings.resource_directory
if not resourcedir:
resourcedir = "."
lines = ["", "", "RESOURCE DIRECTORY:", os.path.abspath(resourcedir)]
cfg = ewoks_settings.ewoks_scheduling
if cfg.type is EwoksSchedulingType.Local:
lines += ["", "EWOKS JOB SCHEDULING", "Local workflow execution"]
else:
lines += [
"",
"EWOKS JOB SCHEDULING",
f"Execution using {cfg.type} with the following config:",
pformat(cfg.configuration),
]
lines += [
"",
"EWOKS EXECUTION:",
pformat(ewoks_settings.ewoks_execution.model_dump()),
]
lines += [
"",
"EWOKS DISCOVERY:",
pformat(ewoks_settings.ewoks_discovery.model_dump()),
]
lines += [""]
logger.info("\n".join(lines))