Skip to content

Commit f4b88ab

Browse files
authored
Merge pull request #24 from SAFEHR-data/16-back_to_no_thread
Fixing errors whilst sticking to a single threaded application.
2 parents 7ccae8a + 5f396d7 commit f4b88ab

File tree

17 files changed

+360
-154
lines changed

17 files changed

+360
-154
lines changed

.github/workflows/pytest.yml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# CI workflow: run the test suite with pytest on pushes and PRs to main/dev.
name: Run pytest

on:
  push:
    branches: [main, dev]
  pull_request:
    branches: [main, dev]
    # Also re-run when a PR is (re)opened, updated, marked ready, or drafted.
    types: ["opened", "reopened", "synchronize", "ready_for_review", "draft"]
  # Allow manual runs from the Actions tab.
  workflow_dispatch:

jobs:
  build:
    name: Run pytest
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v5
      - name: Install uv
        uses: astral-sh/setup-uv@v7

      # Install project + dev extras exactly as pinned in the lock file.
      - name: Install dependencies
        run: uv sync --locked --all-extras --dev

      - name: Run the tests
        run: uv run pytest tests

.pre-commit-config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ repos:
1010
- id: ruff
1111
args:
1212
- --fix
13-
- waveform_controller
13+
- ./
1414
- id: ruff-format
1515
# Type-checking python code.
1616
- repo: https://github.com/pre-commit/mirrors-mypy
@@ -23,7 +23,7 @@ repos:
2323
"types-psycopg2",
2424
"types-pika"
2525
]
26-
files: waveform_controller/
26+
files: src/
2727
# ----------
2828
# Formats docstrings to comply with PEP257
2929
- repo: https://github.com/PyCQA/docformatter

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,8 @@ dependencies = [
1010
"psycopg2-binary>=2.9.11",
1111
]
1212

13+
[project.optional-dependencies]
14+
dev = ["pytest>=9.0.2"]
15+
1316
[project.scripts]
14-
emap-extract-waveform = "waveform_controller.controller:receiver"
17+
emap-extract-waveform = "controller:receiver"

settings.env.EXAMPLE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ UDS_USERNAME="inform_user"
55
UDS_PASSWORD="inform"
66
UDS_HOST="localhost"
77
UDS_PORT="5433"
8+
UDS_CONNECT_TIMEOUT="10" # in seconds
9+
UDS_QUERY_TIMEOUT="3000" # in milliseconds
810
SCHEMA_NAME="star_dev"
911
RABBITMQ_USERNAME="my_name"
1012
RABBITMQ_PASSWORD="my_pw"

src/controller.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
"""
2+
A script to receive messages in the waveform queue and write them to stdout,
3+
based on https://www.rabbitmq.com/tutorials/tutorial-one-python
4+
"""
5+
6+
import json
7+
from datetime import datetime, timezone
8+
import logging
9+
import pika
10+
import db as db # type:ignore
11+
import settings as settings # type:ignore
12+
import csv_writer as writer # type:ignore
13+
14+
# Process-wide logging configuration; module logger per stdlib convention.
logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s")
logger = logging.getLogger(__name__)


# Module-level singleton: accessor for the EMAP star database.
# NOTE(review): the query is loaded and the pool opened at import time, so
# importing this module requires the SQL file and a reachable database —
# confirm this side effect is intended.
emap_db = db.starDB()
emap_db.init_query()
emap_db.connect()
21+
22+
23+
class waveform_message:
    """Lightweight container bundling a pika channel, a delivery tag and the
    raw message body for one waveform message.

    NOTE(review): not referenced elsewhere in this module — possibly kept
    for future use; confirm before removing.
    """

    def __init__(self, ch, delivery_tag, body):
        """Capture the channel, delivery tag and payload as attributes."""
        self.ch, self.delivery_tag, self.body = ch, delivery_tag, body
29+
30+
def ack_message(ch, delivery_tag):
    """Acknowledge a delivered message.

    Note that `ch` must be the same pika channel instance via which the
    message being ACKed was retrieved (AMQP protocol constraint).
    """
    if not ch.is_open:
        logger.warning("Attempting to acknowledge a message on a closed channel.")
        return
    ch.basic_ack(delivery_tag)
37+
38+
39+
def reject_message(ch, delivery_tag, requeue):
    """Reject (NACK) a delivered message, optionally requeueing it.

    `ch` must be the channel the message was delivered on.
    """
    if not ch.is_open:
        logger.warning("Attempting to not acknowledge a message on a closed channel.")
        return
    ch.basic_reject(delivery_tag, requeue)
44+
45+
46+
def waveform_callback(ch, method_frame, _header_frame, body):
    """Handle one waveform message from the queue.

    Looks up the patient for the message's location and observation time,
    appends the frame to a csv file, then acknowledges or rejects:
    - missing fields -> reject without requeue (message is dropped),
    - transient database failure -> reject with requeue (retried later),
    - ambiguous/absent patient match -> frame written under placeholder
      identifiers, then rejected without requeue.
    """
    data = json.loads(body)
    try:
        location_string = data["mappedLocationString"]
        observation_timestamp = data["observationTime"]
        source_stream_id = data["sourceStreamId"]
        sampling_rate = data["samplingRate"]
        units = data["unit"]
        waveform_data = data["numericValues"]
        mapped_location_string = data["mappedLocationString"]
    # BUG FIX: a missing dict key raises KeyError, not IndexError — the
    # original handler could never catch a malformed message and would
    # crash the consumer instead of rejecting it.
    except KeyError as e:
        reject_message(ch, method_frame.delivery_tag, False)
        logger.error(
            f"Waveform message {method_frame.delivery_tag} is missing required data {e}."
        )
        return

    # The message timestamp is treated as seconds since the epoch, UTC.
    observation_time = datetime.fromtimestamp(observation_timestamp, tz=timezone.utc)
    lookup_success = True
    try:
        matched_mrn = emap_db.get_row(location_string, observation_time)
    except ValueError as e:
        # Zero or multiple matching rows: still export the frame, but under
        # placeholder identifiers so it can be reconciled later.
        lookup_success = False
        logger.error(f"Ambiguous or non existent match: {e}")
        matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn")
    except ConnectionError as e:
        logger.error(f"Database error, will try again: {e}")
        reject_message(ch, method_frame.delivery_tag, True)
        return

    if writer.write_frame(
        waveform_data,
        source_stream_id,
        observation_timestamp,
        units,
        sampling_rate,
        mapped_location_string,
        matched_mrn[2],  # csn
        matched_mrn[0],  # mrn
    ):
        if lookup_success:
            ack_message(ch, method_frame.delivery_tag)
        else:
            reject_message(ch, method_frame.delivery_tag, False)
    # NOTE(review): if write_frame ever returns False the message is left
    # un-acked and, with prefetch_count=1, consumption stalls — confirm
    # whether a reject-with-requeue is wanted here.
90+
91+
92+
def receiver():
    """Connect to RabbitMQ and consume waveform messages until interrupted.

    Entry point wired up in pyproject.toml (`emap-extract-waveform`).
    """
    credentials = pika.PlainCredentials(
        username=settings.RABBITMQ_USERNAME, password=settings.RABBITMQ_PASSWORD
    )
    parameters = pika.ConnectionParameters(
        credentials=credentials,
        host=settings.RABBITMQ_HOST,
        port=settings.RABBITMQ_PORT,
    )

    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    # One unacknowledged message at a time: frames are processed strictly
    # sequentially on this single-threaded consumer.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue=settings.RABBITMQ_QUEUE,
        on_message_callback=waveform_callback,
        auto_ack=False,
    )

    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Graceful shutdown on Ctrl-C.
        channel.stop_consuming()

    connection.close()

src/csv_writer.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""Writes a frame of waveform data to a csv file."""
2+
3+
import csv
from datetime import datetime, timezone
from pathlib import Path
6+
7+
8+
def create_file_name(
    source_stream_id: str, observation_time: datetime, csn: str, units: str
) -> str:
    """Create a unique file name based on the patient contact serial number
    (csn), the date, and the source stream.

    Characters in `units` that are unsafe in file names are substituted
    ("/" -> "p", "%" -> "percent").
    """
    safe_units = units.translate(str.maketrans({"/": "p", "%": "percent"}))
    parts = [observation_time.strftime("%Y-%m-%d"), csn, source_stream_id, safe_units]
    return ".".join(parts) + ".csv"
17+
18+
19+
def write_frame(
    waveform_data: dict,
    source_stream_id: str,
    observation_timestamp: float,
    units: str,
    sampling_rate: int,
    mapped_location_string: str,
    csn: str,
    mrn: str,
) -> bool:
    """Append a frame of waveform data to a csv file (creates the file if it
    doesn't exist).

    :param waveform_data: message payload; only its "value" entry is written.
    :param observation_timestamp: seconds since the epoch (UTC).
    :return: True if write was successful.
    """
    # BUG FIX: interpret the epoch timestamp as UTC. The naive conversion
    # used the machine's local timezone, so the date in the file name could
    # disagree with the UTC observation time the controller used for the
    # database lookup (frames near midnight land in the wrong day's file).
    observation_datetime = datetime.fromtimestamp(observation_timestamp, tz=timezone.utc)

    out_dir = Path("waveform-export")
    out_dir.mkdir(exist_ok=True)

    filename = out_dir / create_file_name(
        source_stream_id, observation_datetime, csn, units
    )
    # newline="" is required by the csv module so row terminators are not
    # translated (avoids blank lines on Windows).
    with open(filename, "a", newline="") as fileout:
        wv_writer = csv.writer(fileout, delimiter=",")
        # Avoid shadowing the parameter: extract the values to write.
        values = waveform_data.get("value", "")

        wv_writer.writerow(
            [
                csn,
                mrn,
                source_stream_id,
                units,
                sampling_rate,
                observation_timestamp,
                mapped_location_string,
                values,
            ]
        )

    return True

src/db.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from datetime import datetime
2+
import psycopg2
3+
from psycopg2 import sql, pool
4+
import logging
5+
6+
import settings as settings # type:ignore
7+
8+
logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s")
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class starDB:
    """Accessor for the EMAP star database: resolves a bed location string
    and an observation time to a single patient row."""

    # Parameterised SQL, loaded by init_query().
    sql_query: str = ""
    # Built at class-definition time from environment-driven settings; both
    # a connect timeout (s) and a per-statement timeout (ms) are applied.
    connection_string: str = "dbname={} user={} password={} host={} port={} connect_timeout={} options='-c statement_timeout={}'".format(
        settings.UDS_DBNAME,  # type:ignore
        settings.UDS_USERNAME,  # type:ignore
        settings.UDS_PASSWORD,  # type:ignore
        settings.UDS_HOST,  # type:ignore
        settings.UDS_PORT,  # type:ignore
        settings.UDS_CONNECT_TIMEOUT,  # type:ignore
        settings.UDS_QUERY_TIMEOUT,  # type:ignore
    )
    # FIX: was annotated ThreadedConnectionPool, but connect() creates a
    # SimpleConnectionPool (single-threaded application).
    connection_pool: pool.SimpleConnectionPool

    def connect(self):
        """Open a single-connection pool to the database."""
        self.connection_pool = pool.SimpleConnectionPool(1, 1, self.connection_string)

    def init_query(self):
        """Load the lookup SQL from disk and bind the schema name."""
        with open("src/sql/mrn_based_on_bed_and_datetime.sql", "r") as file:
            self.sql_query = sql.SQL(file.read())
        self.sql_query = self.sql_query.format(
            schema_name=sql.Identifier(settings.SCHEMA_NAME)
        )

    def get_row(self, location_string: str, observation_datetime: datetime):
        """Return the single row matching the bed at the given time.

        NOTE(review): callers index [0] as mrn and [2] as csn — confirm the
        column order against the SQL file.

        :raises ConnectionError: on a database error (caller may retry).
        :raises ValueError: when zero or multiple rows match.
        """
        parameters = {
            "location_string": location_string,
            "observation_datetime": observation_datetime,
        }
        db_connection = None
        try:
            db_connection = self.connection_pool.getconn()
            # The connection context manager commits/rolls back the
            # transaction (it does NOT close the connection).
            with db_connection:
                with db_connection.cursor() as curs:
                    curs.execute(self.sql_query, parameters)
                    rows = curs.fetchall()
        # FIX: use the canonical DBAPI exception psycopg2.OperationalError.
        except psycopg2.OperationalError as e:
            raise ConnectionError(f"Data base error: {e}")
        finally:
            # BUG FIX: always return the connection to the pool. The original
            # called putconn inside both the `with` and the `except` block, so
            # a failure in getconn() itself hit an unbound `db_connection`
            # (NameError), and error paths could leak the pooled connection.
            if db_connection is not None:
                self.connection_pool.putconn(db_connection)

        if len(rows) != 1:
            raise ValueError(
                f"Wrong number of rows returned from database. {len(rows)} != 1, for {location_string}:{observation_datetime}"
            )

        return rows[0]
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ def get_from_env(env_var, setting_name=None):
1313
get_from_env("UDS_PASSWORD")
1414
get_from_env("UDS_HOST")
1515
get_from_env("UDS_PORT")
16+
get_from_env("UDS_CONNECT_TIMEOUT")
17+
get_from_env("UDS_QUERY_TIMEOUT")
1618
get_from_env("SCHEMA_NAME")
1719
get_from_env("RABBITMQ_USERNAME")
1820
get_from_env("RABBITMQ_PASSWORD")

waveform_controller/sql/mrn_based_on_bed_and_datetime.sql renamed to src/sql/mrn_based_on_bed_and_datetime.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ INNER JOIN {schema_name}.location_visit lv
1414
INNER JOIN {schema_name}.location loc
1515
ON lv.location_id = loc.location_id
1616
WHERE loc.location_string = %(location_string)s
17-
AND hv.valid_from BETWEEN %(start_datetime)s AND %(end_datetime)s
18-
ORDER by hv.valid_from DESC
17+
AND lv.admission_datetime <= %(observation_datetime)s
18+
AND ( lv.discharge_datetime >= %(observation_datetime)s OR lv.discharge_datetime IS NULL )

0 commit comments

Comments
 (0)