Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion bluesky_httpserver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from fastapi.openapi.utils import get_openapi

from .authentication import Mode
from .console_output import CollectPublishedConsoleOutput
from .console_output import CollectPublishedConsoleOutput, ConsoleOutputStream, SystemInfoStream
from .core import PatchedStreamingResponse
from .database.core import purge_expired
from .resources import SERVER_RESOURCES as SR
Expand Down Expand Up @@ -346,6 +346,11 @@ async def purge_expired_sessions_and_api_keys():

SR.set_console_output_loader(CollectPublishedConsoleOutput(rm_ref=RM))
SR.console_output_loader.start()
SR.set_console_output_stream(ConsoleOutputStream(rm_ref=RM))
SR.console_output_stream.start()
SR.console_output_loader.subscribe(SR.console_output_stream.add_message)
SR.set_system_info_stream(SystemInfoStream(rm_ref=RM))
SR.system_info_stream.start()

# Import module with custom code
module_names_str = os.getenv("QSERVER_CUSTOM_MODULES", None)
Expand Down Expand Up @@ -387,6 +392,8 @@ async def purge_expired_sessions_and_api_keys():
async def shutdown_event():
await SR.RM.close()
await SR.console_output_loader.stop()
await SR.console_output_stream.stop()
await SR.system_info_stream.stop()

@lru_cache(1)
def override_get_authenticators():
Expand Down
163 changes: 163 additions & 0 deletions bluesky_httpserver/console_output.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import inspect
import json
import logging
import queue
Expand Down Expand Up @@ -51,6 +52,9 @@ def __init__(self, *, rm_ref):
self._background_task_stopped = asyncio.Event()
self._background_task_stopped.set()

self._callbacks = []
self._callbacks_async = []

@property
def queues_set(self):
"""
Expand All @@ -67,6 +71,22 @@ def text_buffer_uid(self):
async def get_text_buffer(self, n_lines):
return await self._RM.console_monitor.text(n_lines)

def subscribe(self, cb):
"""
Add a function or a coroutine to the list of callbacks. The callbacks must accept
message as a parameter: cb(msg)
"""
if inspect.iscoroutinefunction(cb):
self._callbacks_async.append(cb)
else:
self._callbacks.append(cb)

def unsubscribe(self, cb):
if inspect.iscoroutinefunction(cb):
self._callbacks_async.remove(cb)
else:
self._callbacks.remove(cb)

def get_new_msgs(self, last_msg_uid):
msg_list = []
try:
Expand Down Expand Up @@ -94,6 +114,10 @@ async def _load_msgs_task(self):
try:
msg = await self._RM.console_monitor.next_msg(timeout=0.5)
self._add_message(msg=msg)
for cb in self._callbacks:
cb(msg)
for cb in self._callbacks_async:
await cb(msg)
except self._RM.RequestTimeoutError:
pass
self._background_task_stopped.set()
Expand Down Expand Up @@ -167,3 +191,142 @@ def __init__(self, content_class, *args, **kwargs):

def __del__(self):
del self._content


class ConsoleOutputStream:
def __init__(self, *, rm_ref):
self._queues = {}
self._queue_max_size = 1000

@property
def queues(self):
return self._queues

def add_queue(self, key):
"""
Add a new queue to the dictionary of queues. The key is a reference to the socket for
for connection with the client.
"""
queue = asyncio.Queue(maxsize=self._queue_max_size)
self._queues[key] = queue
return queue

def remove_queue(self, key):
"""
Remove the queue identified by the key from the dictionary of queues.
"""
if key in self._queues:
del self._queues[key]

async def add_message(self, msg):
msg_json = json.dumps(msg)
for q in self._queues.values():
# Protect from overflow. It's ok to discard old messages.
if q.full():
q.get_nowait()
await q.put(msg_json)

def start(self):
pass

async def stop(self):
pass


class SystemInfoStream:
def __init__(self, *, rm_ref):
self._RM = rm_ref
self._queues_status = {}
self._queues_info = {}
self._background_task = None
self._background_task_running = False
self._background_task_stopped = asyncio.Event()
self._background_task_stopped.set()
self._num = 0
self._queue_max_size = 1000

@property
def background_task_running(self):
return self._background_task_running

@property
def queues_status(self):
return self._queues_status

@property
def queues_info(self):
return self._queues_info

def add_queue_status(self, key):
"""
Add a new queue to the dictionary of queues. The key is a reference to the socket for
for connection with the client.
"""
queue = asyncio.Queue(maxsize=self._queue_max_size)
self._queues_status[key] = queue
return queue

def add_queue_info(self, key):
"""
Add a new queue to the dictionary of queues. The key is a reference to the socket for
for connection with the client.
"""
queue = asyncio.Queue(maxsize=self._queue_max_size)
self._queues_info[key] = queue
return queue

def remove_queue_status(self, key):
"""
Remove the queue identified by the key from the dictionary of queues.
"""
if key in self._queues_status:
del self._queues_status[key]

def remove_queue_info(self, key):
"""
Remove the queue identified by the key from the dictionary of queues.
"""
if key in self._queues_info:
del self._queues_info[key]

def _start_background_task(self):
if not self._background_task_running:
self._background_task = asyncio.create_task(self._load_msgs_task())

async def _stop_background_task(self):
self._background_task_running = False
await self._background_task_stopped.wait()

async def _load_msgs_task(self):
self._background_task_stopped.clear()
self._background_task_running = True
while self._background_task_running:
try:
msg = await self._RM.system_info_monitor.next_msg(timeout=0.5)

if isinstance(msg, dict) and "msg" in msg:
msg_json = json.dumps(msg)
# ALL 'info' messages
for q in self._queues_info.values():
# Protect from overflow. It's ok to discard old messages.
if q.full():
q.get_nowait()
await q.put(msg_json)
if isinstance(msg["msg"], dict) and "status" in msg["msg"]:
# ONLY 'status' messages
for q in self._queues_status.values():
# Protect from overflow. It's ok to discard old messages.
if q.full():
q.get_nowait()
await q.put(msg_json)
except self._RM.RequestTimeoutError:
pass
self._background_task_stopped.set()

def start(self):
self._RM.system_info_monitor.enable()
self._start_background_task()

async def stop(self):
await self._stop_background_task()
await self._RM.system_info_monitor.disable_wait()
23 changes: 23 additions & 0 deletions bluesky_httpserver/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ def __init__(self):
self._RM = None
self._custom_code_modules = []
self._console_output_loader = None
self._stop_server = False

def set_RM(self, RM):
self._RM = RM
Expand Down Expand Up @@ -37,5 +38,27 @@ def console_output_loader(self):
def console_output_loader(self, _):
raise RuntimeError("Attempting to set read-only property 'console_output_loader'")

def set_console_output_stream(self, console_output_stream):
self._console_output_stream = console_output_stream

@property
def console_output_stream(self):
return self._console_output_stream

@console_output_stream.setter
def console_output_stream(self, _):
raise RuntimeError("Attempting to set read-only property 'console_output_stream'")

def set_system_info_stream(self, system_info_stream):
self._system_info_stream = system_info_stream

@property
def system_info_stream(self):
return self._system_info_stream

@system_info_stream.setter
def system_info_stream(self, _):
raise RuntimeError("Attempting to set read-only property 'system_info_stream'")


SERVER_RESOURCES = _ServerResources()
97 changes: 96 additions & 1 deletion bluesky_httpserver/routers/core_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import pydantic
from bluesky_queueserver.manager.conversions import simplify_plan_descriptions, spreadsheet_to_plan_list
from fastapi import APIRouter, Depends, File, Form, Request, Security, UploadFile
from fastapi import APIRouter, Depends, File, Form, Request, Security, UploadFile, WebSocket, WebSocketDisconnect
from packaging import version

if version.parse(pydantic.__version__) < version.parse("2.0.0"):
Expand Down Expand Up @@ -1098,3 +1098,98 @@ def console_output_update(payload: dict, principal=Security(get_current_principa
process_exception()

return response


class WebSocketMonitor:
"""
Works for sockets that only send data to clients (not receive).

The class monitors the status of a socket connection. The property 'is_alive' returns True
until the socket is disconnected. The purpose of the class is to break the loop in the
implementation of the socket that only sends data to a client when the application
is closed. If there is no data to send, the loop continues to run indefinitely and
prevents the application from closing properly. No better solution was found.
"""

def __init__(self, websocket):
self._websocket = websocket
self._is_alive = True
self._task_ref = None

async def _task(self):
while True:
try:
await asyncio.sleep(1)
try:
# The following will raise an exception if the socket is disconnected.
await asyncio.wait_for(self._websocket.receive(), timeout=0.01)
except asyncio.TimeoutError:
# The socket is still connected.
pass
except Exception:
self._is_alive = False
break

def start(self):
self._task_ref = asyncio.create_task(self._task())

@property
def is_alive(self):
return self._is_alive


@router.websocket("/console_output/ws")
async def console_output_ws(websocket: WebSocket):
await websocket.accept()
q = SR.console_output_stream.add_queue(websocket)
wsmon = WebSocketMonitor(websocket)
wsmon.start()
try:
while wsmon.is_alive:
try:
msg = await asyncio.wait_for(q.get(), timeout=1)
await websocket.send_text(msg)
except asyncio.TimeoutError:
pass
except WebSocketDisconnect:
pass
finally:
SR.console_output_stream.remove_queue(websocket)


@router.websocket("/status/ws")
async def status_ws(websocket: WebSocket):
await websocket.accept()
q = SR.system_info_stream.add_queue_status(websocket)
wsmon = WebSocketMonitor(websocket)
wsmon.start()
try:
while wsmon.is_alive:
try:
msg = await asyncio.wait_for(q.get(), timeout=1)
await websocket.send_text(msg)
except asyncio.TimeoutError:
pass
except WebSocketDisconnect:
pass
finally:
SR.system_info_stream.remove_queue_status(websocket)


@router.websocket("/info/ws")
async def info_ws(websocket: WebSocket):
await websocket.accept()
q = SR.system_info_stream.add_queue_info(websocket)
wsmon = WebSocketMonitor(websocket)
wsmon.start()
try:
while wsmon.is_alive:
try:
msg = await asyncio.wait_for(q.get(), timeout=1)
await websocket.send_text(msg)
except asyncio.TimeoutError:
pass
except WebSocketDisconnect:
pass
finally:
SR.system_info_stream.remove_queue_info(websocket)
Loading
Loading