Source code for ewoksserver.app.routes.execution.models

from typing import Optional, List, Union, Dict
from pydantic import BaseModel
from pydantic import Field


[docs]class EwoksExecuteOptions(BaseModel): execute_arguments: Optional[Dict] = Field( title="Workflow execution options", default=None ) worker_options: Optional[Dict] = Field(title="Worker options", default=None)
[docs]class EwoksJobInfo(BaseModel): job_id: Union[str, int] = Field(title="Workflow execution job identifier")
[docs]class EwoksEvent(BaseModel): host_name: str = Field(title="Host where the job was executed") process_id: int = Field(title="Process ID where the job was executed") user_name: str = Field(title="User name under which the job was executed") job_id: str = Field(title="Workflow execution job identifier") binding: Optional[str] = Field(title="Workflow execution engine", default=None) context: str = Field(title="Event context (job, workflow, node)") workflow_id: Optional[str] = Field(title="Workflow identifier", default=None) node_id: Optional[str] = Field(title="Workflow node identifier", default=None) task_id: Optional[str] = Field(title="Workflow task identifier", default=None) type: str = Field(title="Event type (start, end, progress)") time: str = Field(title="Event context send time") error: Optional[bool] = Field(title="Workflow execution failed", default=None) error_message: Optional[str] = Field( title="Workflow execution error message", default=None ) error_traceback: Optional[str] = Field( title="Workflow execution error traceback", default=None ) progress: Optional[int] = Field(title="Task progress in percentage", default=None) task_uri: Optional[str] = Field(title="Workflow task output URI", default=None) input_uris: Optional[List[Dict]] = Field( title="Workflow task input URIs", default=None ) output_uris: Optional[List[Dict]] = Field( title="Workflow task output URIs", default=None )
[docs]class EwoksEventFilter(BaseModel): user_name: Optional[str] = Field( title="User name under which the job was executed", default=None ) job_id: Optional[str] = Field( title="Workflow execution job identifier", default=None ) context: Optional[str] = Field( title="Event context (job, workflow, node)", default=None ) workflow_id: Optional[str] = Field(title="Workflow identifier", default=None) node_id: Optional[str] = Field(title="Workflow node identifier", default=None) task_id: Optional[str] = Field(title="Workflow task identifier", default=None) type: Optional[str] = Field(title="Event type (start, end, progress)", default=None) starttime: Optional[str] = Field(title="Only events after this time", default=None) endtime: Optional[str] = Field(title="Only events before this time", default=None) error: Optional[bool] = Field(title="Workflow execution failed", default=None)
[docs]class EwoksEventList(BaseModel): jobs: List[List[EwoksEvent]] = Field( title="Workflow execution jobs grouped per job ID" )
[docs]class EwoksWorkerList(BaseModel): workers: Optional[List[str]] = Field(title="Available workers for execution")