Source code for ewoksserver.app.routes.execution.router

from typing import Dict, List, Union, Optional
from collections import OrderedDict
from typing_extensions import Annotated

from fastapi import APIRouter
from fastapi import Body
from fastapi import Path
from fastapi import Depends
from fastapi import status
from fastapi import HTTPException
from fastapi.responses import JSONResponse

from ewoksutils import event_utils
from ewoksjob.client import get_workers, submit
from ewoksjob.client.local import submit as submit_local

from ...backends import json_backend
from ...config import EwoksSettingsType
from ..common import models as common_models
from . import models
from . import events

# One router per REST API version; the names suggest API versions 1.0.0 and 1.1.0.
v1_0_0_router = APIRouter()
v1_1_0_router = APIRouter()


def _workflow_error_response(
    identifier: str, message: str, status_code: int
) -> JSONResponse:
    """Build the JSON error response sent when a workflow cannot be loaded."""
    payload = {
        "message": message,
        "type": "workflow",
        "identifier": identifier,
    }
    return JSONResponse(payload, status_code=status_code)


# POST /execute/{identifier}
#
# Loads the workflow `identifier` from the "workflows" sub-directory of the
# resource directory, merges the execute arguments and worker options stored
# in the workflow with the ones from the request body, appends the event
# handlers configured on the server and submits the job (locally when celery
# is not configured). Responds with the job id, or with a 403/404 JSON error
# when the workflow cannot be read.
#
# NOTE: no docstring on purpose; FastAPI would publish it as the operation
# description in the OpenAPI schema.
@v1_0_0_router.post(
    "/execute/{identifier}",
    summary="Execute workflow",
    response_model=models.EwoksJobInfo,
    response_description="Workflow execution job description",
    status_code=200,
    responses={
        status.HTTP_403_FORBIDDEN: {
            "description": "No permission to read workflow",
            "model": common_models.ResourceIdentifierError,
        },
        status.HTTP_404_NOT_FOUND: {
            "description": "Workflow not found",
            "model": common_models.ResourceIdentifierError,
        },
    },
)
def execute_workflow(
    settings: EwoksSettingsType,
    identifier: Annotated[
        str,
        Path(
            title="Task identifier",
            description="Unique identifier in the task database",
        ),
    ],
    options: Annotated[
        Optional[models.EwoksExecuteOptions], Body(title="Ewoks execute options")
    ] = None,
) -> Dict[str, Union[int, str]]:
    try:
        graph = json_backend.load_resource(
            settings.resource_directory / "workflows", identifier
        )
    except PermissionError:
        return _workflow_error_response(
            identifier,
            f"No permission to read workflow '{identifier}'.",
            status.HTTP_403_FORBIDDEN,
        )
    except FileNotFoundError:
        return _workflow_error_response(
            identifier,
            f"Workflow '{identifier}' is not found.",
            status.HTTP_404_NOT_FOUND,
        )

    # The request body is optional: without it only the workflow's own
    # settings take part in the merge.
    requested_arguments = None if options is None else options.execute_arguments
    requested_worker_options = None if options is None else options.worker_options

    graph_attrs = graph["graph"]
    execute_arguments = json_backend.merge_mappings(
        graph_attrs.get("execute_arguments"), requested_arguments
    )
    submit_kwargs = json_backend.merge_mappings(
        graph_attrs.get("worker_options"), requested_worker_options
    )

    # Workflow execution: positional arguments
    submit_kwargs["args"] = (graph,)
    # Workflow execution: named arguments
    submit_kwargs["kwargs"] = execute_arguments

    server_ewoks_config = settings.ewoks
    if server_ewoks_config:
        # Append the server-side event handlers that are not already present.
        execinfo = execute_arguments.setdefault("execinfo", {})
        job_handlers = execinfo.setdefault("handlers", [])
        for server_handler in server_ewoks_config.get("handlers", []):
            if server_handler in job_handlers:
                continue
            job_handlers.append(server_handler)

    # Without a celery configuration the job is submitted with the local client.
    submit_job = submit_local if settings.celery is None else submit
    future = submit_job(**submit_kwargs)
    return {"job_id": future.task_id}
# GET /execution/events
#
# Reads the ewoks events matching the query filters and groups them per job
# id, keeping the order in which each job id was first seen. Raises a 503
# when the server has no event reader.
#
# NOTE: no docstring on purpose; FastAPI would publish it as the operation
# description in the OpenAPI schema.
@v1_0_0_router.get(
    "/execution/events",
    summary="Get workflow events",
    response_model=models.EwoksEventList,
    response_description="Workflow execution jobs grouped per job ID",
    status_code=200,
    responses={
        status.HTTP_503_SERVICE_UNAVAILABLE: {
            "description": "Server not configured for ewoks events",
            "model": common_models.ResourceIdentifierError,
        },
    },
)
def execute_events(
    settings: EwoksSettingsType,
    filters: Annotated[
        models.EwoksEventFilter, Depends(models.EwoksEventFilter)
    ],  # pydantic model to parse query parameters
) -> Dict[str, List[List[Dict]]]:
    events_per_job = OrderedDict()
    with events.reader_context(settings) as reader:
        if reader is None:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Server not configured for ewoks events",
            )

        # Only the filters that were actually provided are passed on.
        query = filters.model_dump(exclude_none=True)
        for event in reader.get_events(**query):
            job_events = events_per_job.setdefault(event["job_id"], [])
            # When the event schema has an "engine" field, expose it under
            # the "binding" key in the response.
            if "engine" in event_utils.FIELD_TYPES:
                event["binding"] = event.pop("engine")
            job_events.append(event)
    return {"jobs": list(events_per_job.values())}
# GET /execution/workers
#
# Responds with the workers reported by `get_workers()`, or with `None` when
# celery is not configured on the server.
#
# NOTE: no docstring on purpose; FastAPI would publish it as the operation
# description in the OpenAPI schema.
@v1_1_0_router.get(
    "/execution/workers",
    summary="Get workers",
    response_model=models.EwoksWorkerList,
    response_description="List of available workers",
    status_code=200,
)
def workers(settings: EwoksSettingsType) -> Dict[str, Optional[List[str]]]:
    available = None if settings.celery is None else get_workers()
    return {"workers": available}
# The v1.1.0 router also serves every route registered on the v1.0.0 router.
v1_1_0_router.include_router(v1_0_0_router)