Source code for ewoksserver.tests.test_events

import os
from collections import Counter
from datetime import datetime

import pytest
from ewokscore.tests.examples.graphs import get_graph

from .test_execute import upload_graph
from .test_execute import get_events
from .api_versions import ROOT_ALL_VERSIONS


[docs]@pytest.mark.parametrize("root", ROOT_ALL_VERSIONS) def test_get_execution_events(local_exec_client, root): client, sclient = local_exec_client graph_name, expected = upload_graph(root, client) nevents = 0 nevents_per_exec = 2 * (len(expected) + 2) # Test no events (nothing has been executed) response = client.get(f"{root}/execution/events") assert response.status_code == 200 data = response.json() assert data == {"jobs": list()} # Execute workflow response = client.post(f"{root}/execute/{graph_name}") data = response.json() assert response.status_code == 200, data job_id1 = data["job_id"] nevents += nevents_per_exec # Wait until all events have been received over the Socket.IO connection events1 = get_events(sclient, nevents) # Query should return the same a what was recieved over the Socket.IO connection response = client.get(f"{root}/execution/events") assert response.status_code == 200 events = response.json()["jobs"] assert len(events) == 1 assert events[0] == events1 response = client.get(f"{root}/execution/events?job_id={job_id1}") assert response.status_code == 200 events = response.json()["jobs"] assert len(events) == 1 assert events[0] == events1 response = client.get(f"{root}/execution/events?context=job") assert response.status_code == 200 events = response.json()["jobs"] assert len(events) == 1 assert len(events[0]) == 2 dtmid = datetime.now().astimezone() # Execute workflow response = client.post(f"{root}/execute/{graph_name}") data = response.json() assert response.status_code == 200, data job_id2 = data["job_id"] nevents += nevents_per_exec # Wait until all events have been received over the Socket.IO connection events2 = get_events(sclient, nevents_per_exec) response = client.get(f"{root}/execution/events") assert response.status_code == 200 events = response.json()["jobs"] assert len(events) == 2 assert events[0] == events1 assert events[1] == events2 response = client.get(f"{root}/execution/events?job_id={job_id2}") assert response.status_code == 200 events = response.json()["jobs"] assert len(events) == 1 assert events[0] == events2 response = client.get(f"{root}/execution/events?context=job") assert response.status_code == 200 events = response.json()["jobs"] assert len(events) == 2 assert len(events[0]) == 2 assert len(events[1]) == 2 if os.name == "nt": return # TODO: time filtering fails on windows # Test time Query midtime = dtmid.isoformat().replace("+", "%2b") response = client.get(f"{root}/execution/events?endtime={midtime}") assert response.status_code == 200 events = response.json()["jobs"] assert len(events) == 1 assert events[0] == events1 response = client.get(f"{root}/execution/events?starttime={midtime}") assert response.status_code == 200 events = response.json()["jobs"] assert len(events) == 1 assert events[0] == events2
[docs]@pytest.mark.parametrize("root", ROOT_ALL_VERSIONS) def test_get_execution_events_parallel(local_exec_client, root): client, sclient = local_exec_client _, expected = get_graph("demo") nevents = 0 nevents_per_exec = 2 * (len(expected) + 2) # Test no events (nothing has been executed) jobs = client.get(f"{root}/execution/events").json()["jobs"] assert not jobs # Execute workflows in parallel nruns = 3 for _ in range(nruns): response = client.post( f"{root}/execute/demo", json={"execute_arguments": {"inputs": [{"name": "delay", "value": 0.1}]}}, ) data = response.json() assert response.status_code == 200, data nevents += nevents_per_exec # Get events from Socket.IO and REST API events_socketio = get_events(sclient, nevents) events_get = client.get(f"{root}/execution/events").json()["jobs"] # Check that we have all events from the Socket.IO connection nevents = Counter() for event in events_socketio: nevents[event["job_id"]] += 1 assert set(nevents.values()) == {nevents_per_exec} # Check that we have all events from the REST API nevents = Counter() for job in events_get: for event in job: nevents[event["job_id"]] += 1 assert set(nevents.values()) == {nevents_per_exec} # Check whether that events from the REST API are properly grouped per job assert len(events_get) == nruns for job in events_get: assert len({event["job_id"] for event in job}) == 1