Source code for ewoksserver.tests.conftest
import time
from pathlib import Path
from typing import List
from functools import lru_cache
from collections import namedtuple
import pytest
from fastapi.testclient import TestClient
from ewokscore import events
from ewoksjob.tests.conftest import celery_config # noqa F401
from ewoksjob.tests.conftest import celery_includes # noqa F401
from .. import app as newserver
from ..app import config as serverconfig
from ..app.backends.binary_backend import _load_url
from ..app.models import (
EwoksDiscoverySettings,
EwoksExecutionSettings,
EwoksJobSettings,
EwoksSchedulingType,
)
from ..resources import DEFAULT_ROOT
from .socketio_test import SocketIOTestClient
from .api_versions import api_root # noqa F401
from .api_versions import min_api_version # noqa F401
from .api_versions import max_api_version # noqa F401
from .data import resource_filenames
[docs]
@pytest.fixture
def rest_client(tmpdir):
"""Client to the REST server (no execution)."""
app = newserver.create_app()
@lru_cache()
def get_ewoks_settings_for_tests():
return serverconfig.EwoksSettings(
configured=True,
resource_directory=str(tmpdir),
# Disable discovery since this client is used to test manual discovery
ewoks_discovery=EwoksDiscoverySettings(on_start_up=False),
)
app.dependency_overrides[serverconfig.get_ewoks_settings] = (
get_ewoks_settings_for_tests
)
with TestClient(app) as client:
yield client
[docs]
@pytest.fixture()
def ewoks_handlers(tmpdir):
uri = f"file:{tmpdir / 'ewoks_events.db'}"
yield [
{
"class": "ewokscore.events.handlers.Sqlite3EwoksEventHandler",
"arguments": [{"name": "uri", "value": uri}],
}
]
events.cleanup()
[docs]
@pytest.fixture
def local_exec_client(tmpdir, ewoks_handlers):
"""Client to the REST server and Socket.IO (execution with process pool)."""
app = newserver.create_app()
def get_settings_override():
return serverconfig.EwoksSettings(
configured=True,
resource_directory=str(tmpdir),
ewoks_execution=EwoksExecutionSettings(handlers=ewoks_handlers),
)
app.dependency_overrides[serverconfig.get_ewoks_settings] = get_settings_override
with TestClient(app) as client:
with SocketIOTestClient() as sclient:
yield client, sclient
[docs]
@pytest.fixture
def celery_session_registered_worker(celery_session_worker):
# Some server end-points submit a celery task to all queues.
# If there are no queue's registered yet, nothing is submitted.
timeout_seconds = 10
start_time = time.time()
while not celery_session_worker.app.control.inspect().active_queues():
if time.time() - start_time > timeout_seconds:
pytest.fail(
f"Celery worker queues were not registered within {timeout_seconds} seconds."
)
time.sleep(0.1)
[docs]
@pytest.fixture
def celery_exec_client(tmpdir, celery_session_registered_worker, ewoks_handlers):
"""Client to the REST server and Socket.IO (execution with celery)."""
app = newserver.create_app()
def get_settings_override():
return serverconfig.EwoksSettings(
configured=True,
resource_directory=str(tmpdir),
ewoks_scheduling=EwoksJobSettings(
type=EwoksSchedulingType.Celery, configuration=dict()
),
ewoks_execution=EwoksExecutionSettings(handlers=ewoks_handlers),
)
app.dependency_overrides[serverconfig.get_ewoks_settings] = get_settings_override
with TestClient(app) as client:
with SocketIOTestClient() as sclient:
yield client, sclient
[docs]
@pytest.fixture
def celery_discover_timeout_client(
tmpdir, celery_session_registered_worker, ewoks_handlers
):
"""Client to the REST server and Socket.IO (with a very small timeout for discovery)"""
app = newserver.create_app()
def get_settings_override():
return serverconfig.EwoksSettings(
configured=True,
resource_directory=str(tmpdir),
ewoks_scheduling=EwoksJobSettings(
type=EwoksSchedulingType.Celery, configuration=dict()
),
ewoks_execution=EwoksExecutionSettings(handlers=ewoks_handlers),
# Disable discovery since this client is used to test manual discovery timeout
ewoks_discovery=EwoksDiscoverySettings(on_start_up=False, timeout=0.1),
)
app.dependency_overrides[serverconfig.get_ewoks_settings] = get_settings_override
with TestClient(app) as client:
with SocketIOTestClient() as sclient:
yield client, sclient
[docs]
@pytest.fixture
def png_icons():
filenames = resource_filenames()
return [_load_url(filename) for filename in filenames if filename.endswith(".png")]
[docs]
@pytest.fixture
def svg_icons():
filenames = resource_filenames()
return [_load_url(filename) for filename in filenames if filename.endswith(".svg")]
[docs]
@pytest.fixture(scope="session")
def default_icon_identifiers() -> List[Path]:
return [
url.name
for url in (DEFAULT_ROOT / "icons").iterdir()
if not url.name.startswith("__")
]
[docs]
@pytest.fixture(scope="session")
def default_workflow_identifiers() -> List[Path]:
return [
url.stem
for url in (DEFAULT_ROOT / "workflows").iterdir()
if url.suffix == ".json"
]
[docs]
@pytest.fixture(scope="session")
def default_task_identifiers() -> List[Path]:
return [
url.stem for url in (DEFAULT_ROOT / "tasks").iterdir() if url.suffix == ".json"
]
[docs]
@pytest.fixture
def mocked_local_submit(mocker) -> str:
submit_local_mock = mocker.patch(
"ewoksserver.app.routes.execution.utils.submit_local"
)
MockFuture = namedtuple("Future", ["task_id"])
arguments = dict()
task_id = 0
def mocked_submit(*args, **kwargs):
nonlocal task_id
arguments["args"] = args
arguments["kwargs"] = kwargs
task_id += 1
return MockFuture(task_id=task_id)
submit_local_mock.side_effect = mocked_submit
return arguments