Source code for ewoksserver.app.lifespan
import os
import shutil
import logging
from pprint import pformat
from typing import Generator
from typing import AsyncGenerator
from contextlib import contextmanager
from contextlib import asynccontextmanager

from fastapi import FastAPI
from ewoksjob.client.local import pool_context
from celery import current_app as current_celery_app

from .backends import json_backend
from .. import resources
from . import config
from .routes.execution import socketio
from .routes.tasks.discovery import discover_tasks
logger = logging.getLogger(__name__)
@asynccontextmanager
async def fastapi_lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Lifespan context of the FastAPI application.

    The settings getter is looked up in ``app.dependency_overrides`` and falls
    back to ``config.get_ewoks_settings`` when no override is registered.
    With the resulting settings the socketio layer is configured, the default
    resources are copied, and the default execution event handler is set.
    Then, inside the execution context, tasks are rediscovered, the settings
    summary is logged and control is handed to the application.

    :param app: the FastAPI application whose lifespan is being managed.
    """
    # Honour a dependency override of the settings getter when one is set.
    get_ewoks_settings = app.dependency_overrides.get(
        config.get_ewoks_settings, config.get_ewoks_settings
    )
    ewoks_settings = get_ewoks_settings()

    _configure_socketio(ewoks_settings)
    _copy_default_resources(ewoks_settings)
    _enable_execution_events(ewoks_settings)

    # Yielding inside the ``with`` block keeps the execution context open
    # for as long as the application is running.
    with _enable_execution(ewoks_settings):
        _rediscover_tasks(ewoks_settings)
        _print_ewoks_settings(ewoks_settings)
        yield
def _configure_socketio(app_settings: config.EwoksSettings) -> None:
    """Hand the application settings over to ``socketio.configure_socketio``."""
    socketio.configure_socketio(app_settings)
def _copy_default_resources(ewoks_settings: config.EwoksSettings) -> None:
    """Seed the resource directory with the defaults shipped in the package.

    For every resource category (tasks, icons and workflows) the destination
    directory is created when missing. Each regular file below
    ``resources.DEFAULT_ROOT`` with an accepted extension is then copied
    into it. A file that already exists at the destination is left untouched.
    """
    accepted_extensions = {
        "tasks": [".json"],
        "icons": [".png", ".svg"],
        "workflows": [".json"],
    }
    for category, extensions in accepted_extensions.items():
        target_dir = json_backend.root_url(
            ewoks_settings.resource_directory, category
        )
        os.makedirs(target_dir, exist_ok=True)

        source_dir = resources.DEFAULT_ROOT / category
        for entry in os.listdir(source_dir):
            if os.path.splitext(entry)[1] not in extensions:
                continue
            source_file = source_dir / entry
            if not os.path.isfile(source_file):
                continue
            target_file = target_dir / entry
            if os.path.exists(target_file):
                # Never overwrite a resource that is already present.
                continue
            shutil.copy(source_file, target_file)
def _rediscover_tasks(ewoks_settings: config.EwoksSettings) -> None:
    """Discover tasks and save each of them as a "tasks" resource.

    Nothing happens unless ``ewoks_settings.discover_tasks`` is truthy. Each
    discovered task is saved under its ``"task_identifier"`` value.
    """
    if ewoks_settings.discover_tasks:
        discovered = discover_tasks(ewoks_settings)
        tasks_url = json_backend.root_url(
            ewoks_settings.resource_directory, "tasks"
        )
        for task in discovered:
            json_backend.save_resource(tasks_url, task["task_identifier"], task)
def _enable_execution_events(ewoks_settings: config.EwoksSettings) -> None:
    """Install the default ewoks event handler when none has been configured.

    Settings flagged as ``configured`` are left alone. Otherwise the ``ewoks``
    mapping is created when it is ``None``, and a Sqlite3 event handler is
    registered under ``"handlers"`` unless a non-empty value is already there.
    """
    if ewoks_settings.configured:
        return

    if ewoks_settings.ewoks is None:
        ewoks_settings.ewoks = {}

    ewoks_options = ewoks_settings.ewoks
    if ewoks_options.get("handlers"):
        # Handlers were provided explicitly: keep them.
        return

    uri_argument = {"name": "uri", "value": "file:ewoks_events.db"}
    default_handler = {
        "class": "ewokscore.events.handlers.Sqlite3EwoksEventHandler",
        "arguments": [uri_argument],
    }
    ewoks_options["handlers"] = [default_handler]
@contextmanager
def _enable_execution(
    ewoks_settings: config.EwoksSettings,
) -> Generator[None, None, None]:
    """Context within which workflows can be executed.

    When celery settings are present they are merged into the configuration
    of the current celery application. Without celery settings the body of
    the context runs inside ``pool_context()``.
    """
    celery_options = ewoks_settings.celery
    if celery_options is not None:
        current_celery_app.conf.update(celery_options)
        yield
        return

    # No celery configuration: execute within the local pool context.
    with pool_context():
        yield
def _print_ewoks_settings(ewoks_settings: config.EwoksSettings) -> None:
    """Log a summary of the Ewoks settings at INFO level.

    The summary lists the absolute resource directory (the current directory
    when none is set), followed by the celery and ewoks settings. Each of
    these is pretty-printed or reported as not configured when ``None``.
    """
    resource_dir = ewoks_settings.resource_directory or "."
    report = ["", "", "RESOURCE DIRECTORY:", os.path.abspath(resource_dir)]

    sections = (
        ("CELERY:", ewoks_settings.celery),
        ("EWOKS:", ewoks_settings.ewoks),
    )
    for title, options in sections:
        if options is None:
            description = "Not configured (local workflow execution)"
        else:
            description = pformat(options)
        report.extend(("", title, description))

    report.append("")
    logger.info("\n".join(report))