Source code for ewoksserver.tests.test_execute

import time

import pytest
from ewokscore.tests.examples.graphs import get_graph

from .api_versions import ROOT_ALL_VERSIONS, ROOT_V1_1_0


[docs]@pytest.mark.parametrize("root", ROOT_ALL_VERSIONS) def test_execute_with_celery(celery_exec_client, root): _test_execute(root, *celery_exec_client)
[docs]@pytest.mark.parametrize("root", ROOT_ALL_VERSIONS) def test_execute_without_celery(local_exec_client, root): _test_execute(root, *local_exec_client)
[docs]@pytest.mark.parametrize("root", ROOT_ALL_VERSIONS) def test_new_client_new_events(local_exec_client, root): client, sclient = local_exec_client _test_execute(root, client, sclient) sclient.disconnect() sclient.connect() time.sleep(1) assert not sclient.get_events()
[docs]@pytest.mark.parametrize("root", ROOT_ALL_VERSIONS) def test_execute_options(rest_client, mocked_local_submit, root): workflow = { "graph": { "id": "myworkflow", "label": "label1", "category": "cat1", "execute_arguments": { "engine": "ppf", "_slurm_spawn_arguments": { "parameters": {"time_limit": 10, "partition": "nice"}, "pre_script": "module load ewoks", }, }, "worker_options": {"queue": "id00"}, }, "nodes": [{"id": "task1"}], } response = rest_client.post(f"{root}/workflows", json=workflow) data = response.json() assert response.status_code == 200, data # Check that the backend uses execute_arguments and worker_options # from the workflow definition response = rest_client.post(f"{root}/execute/myworkflow") expected_submit_arguments = { "args": (), "kwargs": { "args": (workflow,), "kwargs": { "engine": "ppf", "_slurm_spawn_arguments": { "parameters": {"time_limit": 10, "partition": "nice"}, "pre_script": "module load ewoks", }, }, "queue": "id00", }, } assert mocked_local_submit == expected_submit_arguments # Check that the backend merges execute_arguments and worker_options # from the client data = { "execute_arguments": { "engine": "ppf", "_slurm_spawn_arguments": { "parameters": {"time_limit": 20, "partition": "nice"}, "pre_script": "module load ewoks", }, }, "worker_options": {"queue": "id00", "time_limit": 30}, } response = rest_client.post(f"{root}/execute/myworkflow", json=data) expected_submit_arguments = { "args": (), "kwargs": { "args": (workflow,), "kwargs": { "engine": "ppf", "_slurm_spawn_arguments": { "parameters": {"time_limit": 20, "partition": "nice"}, "pre_script": "module load ewoks", }, }, "queue": "id00", "time_limit": 30, }, } assert mocked_local_submit == expected_submit_arguments
def _test_execute(root, client, sclient): graph_name, expected = upload_graph(root, client) response = client.post(f"{root}/execute/{graph_name}") assert response.status_code == 200, response.json() n = 2 * (len(expected) + 2) events = get_events(sclient, n) _assert_events(response, events, expected) return n
[docs]def upload_graph(root, client): graph_name = "acyclic1" graph, expected = get_graph(graph_name) response = client.post(f"{root}/workflows", json=graph) assert response.status_code == 200, response.json() return graph_name, expected
[docs]def get_events(sclient, nevents, timeout=10): t0 = time.time() events = list() while True: events.extend(sclient.get_events()) if len(events) == nevents: break time.sleep(0.1) if time.time() - t0 > timeout: raise TimeoutError(f"Received {len(events)} instead of {nevents}") return events
def _assert_events(response, events, expected): n = 2 * (len(expected) + 2) assert len(events) == n job_id = response.json()["job_id"] for event in events: assert event["job_id"] == job_id if event["node_id"]: assert event["node_id"] in expected
[docs]@pytest.mark.parametrize("root", ROOT_V1_1_0) def test_get_workers_with_celery(celery_exec_client, root): rest_client, _ = celery_exec_client response = rest_client.get(f"{root}/execution/workers") assert response.status_code == 200, response.json() assert len(response.json()["workers"]) == 1
[docs]@pytest.mark.parametrize("root", ROOT_V1_1_0) def test_get_workers_without_celery(rest_client, root): response = rest_client.get(f"{root}/execution/workers") assert response.status_code == 200, response.json() assert response.json()["workers"] is None