Source code for ewoksserver.tests.test_socketio_connection

import time


def test_socketio_connection_local(local_exec_client):
    """Run the shared socket connection checks for the local-execution client.

    ``local_exec_client`` is unpacked as a pair; only its second item (the
    socket client) is used here.
    """
    _unused, socket_client = local_exec_client
    _test_socketio_connection(socket_client)
def test_socketio_connection_celery(celery_exec_client):
    """Run the shared socket connection checks for the celery-execution client.

    ``celery_exec_client`` is unpacked as a pair; only its second item (the
    socket client) is used here.
    """
    _unused, socket_client = celery_exec_client
    _test_socketio_connection(socket_client)
def _test_socketio_connection(sclient): assert sclient.is_running() _assert_eventloop_is_running(True, sclient) sclient.disconnect() _assert_eventloop_is_running(False, sclient) sclient.connect() _assert_eventloop_is_running(True, sclient) def _assert_eventloop_is_running(running, sclient, timeout=3): t0 = time.time() while True: if sclient.is_running() == running: return time.sleep(0.1) if time.time() - t0 > timeout: raise TimeoutError