Source code for ewoksserver.app.routes.execution.socketio
import logging
import asyncio
import threading
from datetime import datetime
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
import socketio
from socketio.exceptions import ConnectionRefusedError
from . import events
from ...config import EwoksSettings
logger = logging.getLogger(__name__)
[docs]class EwoksEventManager:
"""Asynchronous manager of a Socket.IO application."""
def __init__(self) -> None:
# Disable Socket.IO CORS since it is managed by the CORS middleware
# https://github.com/encode/starlette/issues/1309#issuecomment-953930195
self._sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins=[])
self._app = socketio.ASGIApp(self._sio, socketio_path="")
self._sio.on("connect")(self.connect)
self._sio.on("disconnect")(self.disconnect)
self._ewoks_settings = None
self._stop_event = threading.Event()
self._fetch_events_future: Optional[asyncio.Future] = None
self._counter = 0
self._executor = ThreadPoolExecutor(max_workers=1)
[docs] async def connect(self, *_) -> None:
if self._ewoks_settings.without_events:
raise ConnectionRefusedError("Socket.IO has been disabled")
self._counter += 1
await self._start()
[docs] async def disconnect(self, *_) -> None:
if self._ewoks_settings.without_events:
return
self._counter = max(self._counter - 1, 0)
if self._counter == 0:
await self._stop(timeout=3)
[docs] async def is_running(self) -> bool:
return await self._is_running(self._fetch_events_future)
@staticmethod
async def _is_running(future: Optional[asyncio.Future] = None) -> bool:
return future is not None and not future.done()
async def _start(self) -> None:
if await self.is_running():
return
self._stop_event.clear()
loop = asyncio.get_running_loop()
self._fetch_events_future = loop.run_in_executor(
self._executor, self._fetch_events_main, loop
)
async def _stop(self, timeout: Optional[float] = None) -> None:
future = self._fetch_events_future
if not await self._is_running(future):
return
self._stop_event.set()
await asyncio.wait_for(future, timeout=timeout)
def _fetch_events_main(self, loop) -> None:
try:
with events.reader_context(self._ewoks_settings) as reader:
if reader is None:
raise RuntimeError("Ewoks event handlers not configured")
starttime = datetime.now().astimezone()
for event in reader.wait_events(
starttime=starttime, stop_event=self._stop_event
):
if self._stop_event.is_set():
break
self._emit(loop, event)
except Exception as e:
# TODO: client needs to recieve an error
logger.exception(str(e))
raise
finally:
self._fetch_events_future = None
def _emit(self, loop, event) -> None:
coroutine = self._sio.emit("Executing", event)
future = asyncio.run_coroutine_threadsafe(coroutine, loop)
future.result()
[docs]def create_socketio_app() -> socketio.ASGIApp:
"""Create the ASGI Socket.IO application when needed"""
global _MANAGER
if _MANAGER is not None:
return _MANAGER._app
_MANAGER = EwoksEventManager()
return _MANAGER._app
_MANAGER = None