Source code for ewoksserver.app.routes.execution.utils

from typing import Any
from typing import Dict
from typing import Mapping
from typing import Optional

from ewoksjob.client import submit
from ewoksjob.client.local import submit as submit_local
from fastapi.responses import JSONResponse

from ...config import EwoksSettingsType
from ...models import EwoksSchedulingType
from .. import status


class WorkflowNotReadableResponse(JSONResponse):
    """JSON response with status 403 for a workflow that may not be read."""

    def __init__(self, identifier: str):
        """
        :param identifier: workflow identifier, echoed in the message and
            in the ``identifier`` field of the response body.
        """
        body = {
            "message": f"No permission to read workflow '{identifier}'.",
            "type": "workflow",
            "identifier": identifier,
        }
        super().__init__(body, status_code=status.HTTP_403_FORBIDDEN)
class WorkflowNotFoundResponse(JSONResponse):
    """JSON response with status 404 for a workflow that is not found."""

    def __init__(self, identifier: str):
        """
        :param identifier: workflow identifier, echoed in the message and
            in the ``identifier`` field of the response body.
        """
        body = {
            "message": f"Workflow '{identifier}' is not found.",
            "type": "workflow",
            "identifier": identifier,
        }
        super().__init__(body, status_code=status.HTTP_404_NOT_FOUND)
def submit_workflow(
    workflow,
    client_execute_arguments: Optional[Dict[str, Any]],
    client_submit_arguments: Optional[Dict[str, Any]],
    graph_execute_arguments: Optional[Dict[str, Any]],
    graph_submit_arguments: Optional[Dict[str, Any]],
    settings: EwoksSettingsType,
):
    """Merge the client and graph arguments and submit the workflow.

    :param workflow: passed as the single positional argument of the job.
    :param client_execute_arguments: execute arguments provided by the client.
    :param client_submit_arguments: submit arguments provided by the client.
    :param graph_execute_arguments: execute arguments stored with the graph.
    :param graph_submit_arguments: submit arguments stored with the graph.
    :param settings: server settings; ``ewoks_scheduling.type`` selects the
        submit function and ``ewoks_execution.handlers`` is merged into the
        execute arguments.
    :returns: the return value of the selected submit function.
    """
    execute_kwargs = _merge_execute_arguments(
        client_execute_arguments, graph_execute_arguments, settings
    )
    submit_kwargs = _merge_submit_arguments(
        client_submit_arguments, graph_submit_arguments
    )

    # "args" holds the positional arguments of the workflow execution and
    # "kwargs" the named ones. Both replace any merged value under those keys.
    submit_kwargs.update(args=(workflow,), kwargs=execute_kwargs)

    run_locally = settings.ewoks_scheduling.type == EwoksSchedulingType.Local
    submit_function = submit_local if run_locally else submit
    return submit_function(**submit_kwargs)
def _merge_execute_arguments(
    client_execute_arguments: Optional[Dict[str, Any]],
    graph_execute_arguments: Optional[Dict[str, Any]],
    settings: EwoksSettingsType,
) -> Dict[str, Any]:
    """Client arguments have precedence over graph arguments in case
    merging does not apply. Server configuration arguments can always be merged.

    The ``execinfo["handlers"]`` lists are concatenated without duplicates in
    the order client, graph, server configuration. All other arguments are
    merged recursively with `_merge_mappings`.

    The input dictionaries are not modified: the handlers are extracted from
    shallow copies.

    :param client_execute_arguments: execute arguments from the client or `None`.
    :param graph_execute_arguments: execute arguments from the graph or `None`.
    :param settings: ``settings.ewoks_execution.handlers`` provides the
        handlers from the server configuration.
    :returns: a new dictionary with the merged execute arguments. The key
        ``execinfo["handlers"]`` is only present when there is at least one handler.
    """
    client_arguments, handlers = _detach_handlers(client_execute_arguments)
    graph_arguments, graph_handlers = _detach_handlers(graph_execute_arguments)

    # Handlers from the graph come before the ones from the server configuration
    extra_handlers = graph_handlers + list(settings.ewoks_execution.handlers)
    for handler in extra_handlers:
        # Handlers may be unhashable (e.g. dictionaries) so compare by equality
        if handler not in handlers:
            handlers.append(handler)

    if handlers:
        execinfo = client_arguments.setdefault("execinfo", dict())
        execinfo["handlers"] = handlers

    return _merge_mappings(graph_arguments, client_arguments)


def _detach_handlers(execute_arguments: Optional[Mapping]) -> tuple:
    """Return a copy of the execute arguments without ``execinfo["handlers"]``
    together with a copy of the removed handlers list.

    Only the top-level mapping and the ``"execinfo"`` mapping are copied, which
    is enough to leave `execute_arguments` itself untouched.
    """
    if execute_arguments is None:
        return dict(), list()
    arguments = dict(execute_arguments)
    if "execinfo" not in arguments:
        return arguments, list()
    execinfo = dict(arguments["execinfo"])
    # A missing or `None` handlers entry means "no handlers"
    handlers = list(execinfo.pop("handlers", None) or ())
    arguments["execinfo"] = execinfo
    return arguments, handlers


def _merge_submit_arguments(
    client_submit_arguments: Optional[Dict[str, Any]],
    graph_submit_arguments: Optional[Dict[str, Any]],
):
    """Client arguments have precedence over graph arguments in case
    merging does not apply.

    :returns: a new dictionary, also when both arguments are `None`.
    """
    # `_merge_mappings` treats `None` like an empty mapping
    return _merge_mappings(graph_submit_arguments, client_submit_arguments)


def _merge_mappings(d1: Optional[Mapping], d2: Optional[Mapping]) -> dict:
    """`d2` has precedence over `d1` in case merging does not apply.

    Merging is done like `{**d1, **d2}` but then recursive.

    :returns: a new dictionary. Values that are not merged are not copied.
    """
    if d1 is None:
        merged = dict()
    else:
        merged = dict(d1)
    if not d2:
        return merged
    for key, value2 in d2.items():
        value1 = merged.get(key)
        # Recurse only when both sides are mappings, else `d2` wins
        if isinstance(value1, Mapping) and isinstance(value2, Mapping):
            value2 = _merge_mappings(value1, value2)
        merged[key] = value2
    return merged