Source code for ewoksserver.app.routes.tasks.discovery
import logging
from typing import Dict
from typing import List
from typing import Optional
from ewoksjob.client import discover_all_tasks
from ewoksjob.client import discover_tasks_from_modules
from ewoksjob.client import get_queues
from ewoksjob.client.local import discover_all_tasks as discover_all_tasks_local
from ewoksjob.client.local import (
discover_tasks_from_modules as discover_tasks_from_modules_local,
)
from ...config import EwoksSettings
from ...models import EwoksSchedulingType
logger = logging.getLogger(__name__)
[docs]
def discover_tasks(
settings: EwoksSettings,
modules: Optional[List[str]] = None,
reload: Optional[bool] = None,
task_type: Optional[str] = None,
worker_options: Optional[Dict] = None,
) -> List[Dict[str, str]]:
"""
:raises ModuleNotFoundError: failed importing tasks.
:raises TimeoutError: timeout when asking a remote worker for tasks.
:raises Exception: any other import or remote error.
"""
if worker_options is None:
kwargs = dict()
else:
kwargs = dict(worker_options)
# Task discovery: position arguments
if modules:
kwargs["args"] = modules
# Task discovery: named arguments
kwargs["kwargs"] = dict()
if reload is not None:
kwargs["kwargs"]["reload"] = reload
if task_type is not None:
kwargs["kwargs"]["task_type"] = task_type
timeout = settings.ewoks_discovery.timeout
if settings.ewoks_scheduling.type == EwoksSchedulingType.Local:
if modules:
future = discover_tasks_from_modules_local(**kwargs)
else:
future = discover_all_tasks_local(**kwargs)
tasks = future.result(timeout=timeout)
else:
tasks = _discover_tasks_in_all_queues(kwargs, timeout=timeout)
for task in tasks:
_set_default_task_properties(task)
return tasks
def _discover_tasks_in_all_queues(
kwargs: Dict, timeout: Optional[float] = None
) -> List[Dict[str, str]]:
discover_from_modules = "args" in kwargs and bool(kwargs["args"])
discover = (
discover_tasks_from_modules if discover_from_modules else discover_all_tasks
)
futures = [discover(**kwargs, queue=queue) for queue in get_queues()]
# Store tasks in a dict to avoid duplicates
task_dict = {}
for future in futures:
# Ignore failures of a single queue to not prevent discovery on other queues
new_tasks = future.result(timeout=timeout)
exc = future.exception()
if exc:
logger.warning(f"Task discovery failed for {future.queue}: {exc}")
continue
if new_tasks is None:
continue
for task in new_tasks:
task_dict[task["task_identifier"]] = task
return list(task_dict.values())
def _set_default_task_properties(task: dict) -> None:
if not task.get("icon"):
task["icon"] = "default.png"
if not task.get("label"):
task_identifier = task.get("task_identifier")
if task_identifier:
task["label"] = task_identifier.split(".")[-1]