Source code for ewoksserver.app.routes.workflows.descriptions

from typing import Dict, Optional, Iterator

from ...backends import json_backend


_WORKFLOW_KEYWORDS = (
    "id",
    "label",
    "category",
    "keywords",
    "input_schema",
    "ui_schema",
)


[docs]def workflow_descriptions( root: json_backend.ResourceUrlType, keywords: Optional[Dict] = None ) -> Iterator[Dict]: for res in json_backend.resources(root): description = res["graph"] if not _include_resource(description.get("keywords", dict()), keywords): continue yield { key: value for key, value in description.items() if key in _WORKFLOW_KEYWORDS }
def _include_resource(res_keywords: dict, keywords: Optional[Dict] = None) -> bool: if keywords is None: return True return all(res_keywords.get(key) == value for key, value in keywords.items())