Source code for ewoksserver.app.routes.execution.utils
from typing import Any, Dict
from fastapi.responses import JSONResponse
from fastapi import status
from ewoksjob.client import submit
from ewoksjob.client.local import submit as submit_local
from ...models import EwoksSchedulingType
from ...config import EwoksSettingsType
[docs]
class WorkflowNotReadableResponse(JSONResponse):
def __init__(self, identifier: str):
super().__init__(
{
"message": f"No permission to read workflow '{identifier}'.",
"type": "workflow",
"identifier": identifier,
},
status_code=status.HTTP_403_FORBIDDEN,
)
[docs]
class WorkflowNotFoundResponse(JSONResponse):
def __init__(self, identifier: str):
super().__init__(
{
"message": f"Workflow '{identifier}' is not found.",
"type": "workflow",
"identifier": identifier,
},
status_code=status.HTTP_404_NOT_FOUND,
)
[docs]
def submit_workflow(
workflow,
execute_arguments: Dict[str, Any],
submit_arguments: Dict[str, Any],
settings: EwoksSettingsType,
):
submit_kwargs = {**submit_arguments}
# Workflow execution: position arguments
submit_kwargs["args"] = (workflow,)
# Workflow execution: named arguments
submit_kwargs["kwargs"] = execute_arguments
execinfo = execute_arguments.setdefault("execinfo", dict())
handlers = execinfo.setdefault("handlers", list())
for handler in settings.ewoks_execution.handlers:
if handler not in handlers:
handlers.append(handler)
if settings.ewoks_scheduling.type == EwoksSchedulingType.Local:
return submit_local(**submit_kwargs)
else:
return submit(**submit_kwargs)