Merged
Changes from 13 commits
20 commits
a21ce7f
First go at FTPS upload. Works!
jeremyestein Dec 11, 2025
210556b
Document that settings should live outside of any repo so they don't …
jeremyestein Dec 12, 2025
1cf2e8c
Control the PIXL install from our pyproject so the dependencies are all
jeremyestein Dec 12, 2025
8f96e8a
Make FTPS upload more usable and check that file to be uploaded is
jeremyestein Dec 12, 2025
b64b299
Convert CSV into parquet with the appropriate decimal array type.
jeremyestein Dec 12, 2025
9edb4ff
Make separate exporter container that runs cron to anonymise and export
jeremyestein Dec 16, 2025
129f8a6
Write a toy hasher so we can develop the rest of the pipeline in the
jeremyestein Dec 18, 2025
e64d155
Merge branch 'dev' into jeremy/pseudon
jeremyestein Dec 18, 2025
e0d64a4
Consistently use Python-style variable names
jeremyestein Dec 18, 2025
7f7dd8a
Move config around to reflect recent container changes
jeremyestein Dec 18, 2025
f6b1a67
Make log message more useful
jeremyestein Dec 18, 2025
00167a9
many env files now
jeremyestein Dec 18, 2025
4ef9683
Document manual pipeline calls
jeremyestein Dec 18, 2025
32ccc74
Fix all but one linting error
jeremyestein Dec 18, 2025
b54d1fb
Remove __init__.py that I had accidentally introduced at the top level,
jeremyestein Dec 18, 2025
9b39a47
Ignore missing import errors
jeremyestein Dec 18, 2025
48e2a83
Example crontab timing value was invalid (too many fields)
jeremyestein Dec 23, 2025
0ceaa39
Apply suggestions from code review
jeremyestein Dec 23, 2025
dfcb845
Clarify pyarrow data type usage
jeremyestein Dec 23, 2025
b4c7405
Add mention of PIXL to readme
jeremyestein Dec 23, 2025
3 changes: 0 additions & 3 deletions .dockerignore

This file was deleted.

3 changes: 3 additions & 0 deletions .gitignore
@@ -11,3 +11,6 @@ wheels/

 # IDEs
 .idea/
+
+# settings files (should not be in the source tree anyway, but just in case)
+*.env
2 changes: 1 addition & 1 deletion .python-version
@@ -1 +1 @@
-3.11
+3.13
18 changes: 14 additions & 4 deletions Dockerfile
@@ -1,10 +1,20 @@
-FROM python:3.14-slim-bookworm
+FROM python:3.13-slim-bookworm AS waveform_base
 LABEL authors="Stephen Thompson, Jeremy Stein"
+# Cron is really small. For the sake of not having to reinstall it all the time,
+# put it on both images even though we only need it on exporter.
+RUN export DEBIAN_FRONTEND=noninteractive && \
+    apt-get update && \
+    apt-get install --yes --no-install-recommends cron && \
+    apt-get autoremove --yes && apt-get clean --yes && rm -rf /var/lib/apt/lists/*
 COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
-WORKDIR /app
 ARG UVCACHE=/root/.cache/uv
-COPY pyproject.toml uv.lock* /app/
+COPY PIXL /PIXL
+WORKDIR /app
+COPY waveform-controller/pyproject.toml waveform-controller/uv.lock /app/
 RUN --mount=type=cache,target=${UVCACHE} uv pip install --system .
-COPY . /app/
+COPY waveform-controller/. /app/
 RUN --mount=type=cache,target=${UVCACHE} uv pip install --system .
+FROM waveform_base AS waveform_controller
 CMD ["emap-extract-waveform"]
+FROM waveform_base AS waveform_exporter
+ENTRYPOINT ["/app/exporter-scripts/entrypoint.sh"]
24 changes: 22 additions & 2 deletions README.md
@@ -39,7 +39,10 @@ Configuration, copy the configuration file to the config directory and edit
 as necessary. Remove the comment telling you not to put secrets in it.

 ```
-cp settings.env.EXAMPLE config/settings.env
+mkdir ../config
+cp config.EXAMPLE/controller.env.EXAMPLE ../config/controller.env
+cp config.EXAMPLE/exporter.env.EXAMPLE ../config/exporter.env
+cp config.EXAMPLE/hasher.env.EXAMPLE ../config/hasher.env
 ```
 If it doesn't already exist you should create a directory named
 `waveform-export` in the parent directory to store the saved waveform
@@ -49,7 +52,7 @@ messages.
 mkdir ../waveform-export
 ```

-Build and start the controller with docker
+Build and start the controller and exporter with docker
 ```
 cd ../waveform-controller
 docker compose build
@@ -67,5 +70,22 @@ Each row of the csv will contain

 `csn, mrn, units, samplingRate, observationTime, waveformData`

+## Perform a parquet conversion (including de-id)
+At the time of writing, the cron pipeline is not set up. This section shows
+how to perform an ad-hoc de-id.
+```
+docker compose run waveform-controller emap-csv-pseudon --csv /waveform-export/original-csv/my_original_csv.csv
+```
+
+## Perform an export
+At the time of writing, the cron pipeline is not set up. This section shows
+how to perform an ad-hoc FTPS upload.
+
+Exported files must be under the WAVEFORM_PSEUDONYMISED_PARQUET directory.
+Files passed in must be given relative to this directory:
+```
+docker compose run --entrypoint "" waveform-exporter emap-send-ftps my_pseudonymised_file.parquet
+```
+
 # Developing
 See [developing docs](docs/develop.md)
Empty file added __init__.py
Empty file.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
 # This is an EXAMPLE file, do not put real secrets in here.
-# Copy it to ./config/settings.env and then DELETE THIS COMMENT.
+# Copy it to ./config/*.env and then DELETE THIS COMMENT.
 UDS_DBNAME="fakeuds"
 UDS_USERNAME="inform_user"
 UDS_PASSWORD="inform"
8 changes: 8 additions & 0 deletions config.EXAMPLE/exporter.env.EXAMPLE
@@ -0,0 +1,8 @@
+# This is an EXAMPLE file, do not put real secrets in here.
+# Copy it to ./config/*.env and then DELETE THIS COMMENT.
+# When does the exporter run
+EXPORTER_CRON_SCHEDULE="14 5 * * *"
+FTPS_HOST=myftps.example.com
+FTPS_PORT=990
+FTPS_USERNAME=my_username
+FTPS_PASSWORD=my_password
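The schedule value is interpolated straight into a crontab line by the exporter entrypoint, and a standard crontab entry takes exactly five time fields before the command. A minimal sketch of a pre-flight check follows. It assumes a hypothetical helper, `cron_field_count_ok`, which is not part of this PR; it only counts fields and does not validate ranges or handle `@daily`-style shortcuts.

```python
def cron_field_count_ok(schedule: str) -> bool:
    """Return True if the schedule has exactly five whitespace-separated fields.

    Hypothetical helper for illustration only: it checks the field count
    (minute, hour, day of month, month, day of week) and nothing else.
    """
    return len(schedule.split()) == 5


if __name__ == "__main__":
    print(cron_field_count_ok("14 5 * * *"))    # five fields
    print(cron_field_count_ok("14 5 * * * *"))  # six fields, one too many
```

A check like this could run before the container installs the crontab, so that a malformed value fails at startup rather than silently never firing.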
6 changes: 6 additions & 0 deletions config.EXAMPLE/hasher.env.EXAMPLE
@@ -0,0 +1,6 @@
+# This is an EXAMPLE file, do not put real secrets in here.
+# Copy it to ./config/*.env and then DELETE THIS COMMENT.
+HASHER_API_AZ_CLIENT_ID=
+HASHER_API_AZ_CLIENT_PASSWORD=
+HASHER_API_AZ_TENANT_ID=
+HASHER_API_AZ_KEY_VAULT_NAME=
2 changes: 0 additions & 2 deletions config/.gitignore

This file was deleted.

39 changes: 35 additions & 4 deletions docker-compose.yml
@@ -1,15 +1,46 @@
 services:
   waveform-controller:
     build:
-      context: .
-      dockerfile: Dockerfile
+      context: ..
+      dockerfile: waveform-controller/Dockerfile
+      target: waveform_controller
       args:
         HTTP_PROXY: ${HTTP_PROXY}
         http_proxy: ${http_proxy}
         HTTPS_PROXY: ${HTTPS_PROXY}
         https_proxy: ${https_proxy}
     # ideally we'd use docker secrets but it's not enabled currently
     env_file:
-      - ./config/settings.env
+      - ../config/controller.env
     volumes:
-      - ../waveform-export:/app/waveform-export
+      - ../waveform-export:/waveform-export
+  waveform-exporter:
+    build:
+      context: ..
+      dockerfile: waveform-controller/Dockerfile
+      target: waveform_exporter
+      args:
+        HTTP_PROXY: ${HTTP_PROXY}
+        http_proxy: ${http_proxy}
+        HTTPS_PROXY: ${HTTPS_PROXY}
+        https_proxy: ${https_proxy}
+    # ideally we'd use docker secrets but it's not enabled currently
+    env_file:
+      - ../config/exporter.env
+    volumes:
+      - ../waveform-export:/waveform-export
+  waveform-hasher:
+    build:
+      context: ../PIXL
+      dockerfile: ./docker/pixl-python/Dockerfile
+      target: hasher_api
+      args:
+        PIXL_PACKAGE_DIR: hasher
+        HTTP_PROXY: ${HTTP_PROXY}
+        http_proxy: ${http_proxy}
+        HTTPS_PROXY: ${HTTPS_PROXY}
+        https_proxy: ${https_proxy}
+    ports:
+      - "127.0.0.1:${HASHER_API_PORT}:8000"
+    env_file:
+      - ../config/hasher.env
16 changes: 16 additions & 0 deletions exporter-scripts/entrypoint.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+# Set up cron schedule according to the environment variable
+if [ -z "$EXPORTER_CRON_SCHEDULE" ]; then
+    echo "You must set EXPORTER_CRON_SCHEDULE when running this container"
+    exit 1
+fi
+set -x
+cat <<EOF | crontab -
+PATH=/usr/local/bin:/usr/bin:/bin
+SHELL=/usr/bin/bash
+$EXPORTER_CRON_SCHEDULE /app/exporter-scripts/scheduled-script.sh
+EOF
+
+# cron scheduler is PID 1 in this container
+exec cron -f
6 changes: 6 additions & 0 deletions exporter-scripts/scheduled-script.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+# Run by the cron scheduler
+# Probably want snakemake instead...
+emap-csv-pseudon --help
+emap-send-ftps --help
9 changes: 8 additions & 1 deletion pyproject.toml
@@ -5,13 +5,20 @@ description = "Add your description here"
 readme = "README.md"
 requires-python = ">=3.11"
 dependencies = [
+    "pandas",
+    #"pyarrow>=22.0",
     "pika>=1.3.2",
     "pre-commit>=4.5.0",
-    "psycopg2-binary>=2.9.11",
+    # need to be compatible with PIXL, which currently pins 2.9.10 (arguably it shouldn't)
+    "psycopg2-binary>=2.9.10",
+    # trick for making a "relative" path, works inside or outside container image
+    "core @ file:///${PROJECT_ROOT}/../PIXL/pixl_core",
 ]

 [project.optional-dependencies]
 dev = ["pytest>=9.0.2"]

 [project.scripts]
 emap-extract-waveform = "controller:receiver"
+emap-csv-pseudon = "pseudon.pseudon:main"
+emap-send-ftps = "exporter.ftps:do_upload"
5 changes: 3 additions & 2 deletions src/controller.py
@@ -66,10 +66,11 @@ def waveform_callback(ch, method_frame, _header_frame, body):
         matched_mrn = emap_db.get_row(location_string, observation_time)
     except ValueError as e:
         lookup_success = False
-        logger.error(f"Ambiguous or non existent match: {e}")
+        logger.error("Ambiguous or non existent match for location %s, obs time %s",
+                     location_string, observation_time, exc_info=True)
         matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn")
     except ConnectionError as e:
-        logger.error(f"Database error, will try again: {e}")
+        logger.error("Database error, will try again", exc_info=True)
         reject_message(ch, method_frame.delivery_tag, True)
         return

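The new log calls in `src/controller.py` pass their arguments lazily and attach the traceback with `exc_info=True`, rather than interpolating the exception into an f-string. A small self-contained sketch of what that produces follows. It assumes a hypothetical `lookup` function standing in for `emap_db.get_row`, and an in-memory stream handler so the output can be inspected.

```python
import io
import logging


def lookup(location: str) -> str:
    """Hypothetical stand-in for a database lookup that finds no match."""
    raise ValueError(f"no rows for {location}")


def log_failed_lookup(location: str) -> str:
    """Log a failed lookup with exc_info=True and return the captured log text."""
    stream = io.StringIO()
    logger = logging.getLogger("exc_info_demo")
    logger.setLevel(logging.ERROR)
    logger.propagate = False  # keep the demo output out of the root logger
    handler = logging.StreamHandler(stream)
    logger.addHandler(handler)
    try:
        lookup(location)
    except ValueError:
        # %s arguments are only formatted if the record is actually emitted
        logger.error("Ambiguous or non existent match for location %s",
                     location, exc_info=True)
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()


if __name__ == "__main__":
    print(log_failed_lookup("bed-1"))
```

The captured text holds the formatted message followed by the full traceback, including the exception's own message, so nothing from the old f-string version is lost.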
12 changes: 9 additions & 3 deletions src/csv_writer.py
@@ -2,6 +2,7 @@

 import csv
 from datetime import datetime
+from locations import WAVEFORM_ORIGINAL_CSV
 from pathlib import Path


@@ -33,12 +34,17 @@ def write_frame(
     """
     observation_datetime = datetime.fromtimestamp(observation_timestamp)

-    out_path = "waveform-export/"
-    Path(out_path).mkdir(exist_ok=True)
+    WAVEFORM_ORIGINAL_CSV.mkdir(exist_ok=True, parents=False)

-    filename = out_path + create_file_name(
+    filename = WAVEFORM_ORIGINAL_CSV / create_file_name(
         source_stream_id, observation_datetime, csn, units
     )
+
+    # write header if is new file
+    if not filename.exists():
+        with open(filename, "w") as fileout:
+            fileout.write("csn,mrn,source_stream_id,units,sampling_rate,timestamp,location,values\n")
+
     with open(filename, "a") as fileout:
         wv_writer = csv.writer(fileout, delimiter=",")
         waveform_data = waveform_data.get("value", "")
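The header-on-first-write logic added to `write_frame` can be exercised in isolation. A minimal sketch follows, assuming a hypothetical `append_row` helper and a temporary directory in place of `WAVEFORM_ORIGINAL_CSV`. The column names are copied from the header string in the diff; the row values are made up.

```python
import csv
import tempfile
from pathlib import Path

HEADER = ["csn", "mrn", "source_stream_id", "units",
          "sampling_rate", "timestamp", "location", "values"]


def append_row(path: Path, row: list) -> None:
    """Append one row, writing the header first if the file does not exist yet."""
    is_new = not path.exists()
    with open(path, "a", newline="") as fileout:
        writer = csv.writer(fileout, delimiter=",")
        if is_new:
            writer.writerow(HEADER)
        writer.writerow(row)


def demo() -> list:
    """Write two rows to a fresh file and return the lines that ended up in it."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "example.csv"
        append_row(path, ["c1", "m1", "s1", "uV", 50, 0.0, "bed-1", "[1, 2]"])
        append_row(path, ["c1", "m1", "s1", "uV", 50, 1.0, "bed-1", "[3, 4]"])
        return path.read_text().splitlines()


if __name__ == "__main__":
    for line in demo():
        print(line)
```

This variant opens the file once in append mode and decides about the header beforehand, which is one possible way to avoid the separate `"w"` open; whether that is preferable in the controller is a judgment call.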
Empty file added src/exporter/__init__.py
Empty file.
45 changes: 45 additions & 0 deletions src/exporter/ftps.py
@@ -0,0 +1,45 @@
+import logging
+import os
+from pathlib import Path
+from typing import BinaryIO
+
+from core.uploader._ftps import _connect_to_ftp, _create_and_set_as_cwd
+import settings
+import argparse
+from locations import WAVEFORM_PSEUDONYMISED_PARQUET
+
+logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s")
+logger = logging.getLogger(__name__)
+
+def do_upload():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("file_to_upload", type=Path, help="file to upload relative to pseudonymised folder")
+    args = parser.parse_args()
+    do_upload_inner(args.file_to_upload)
+
+
+def do_upload_inner(rel_file_to_upload: Path):
+    # Absolute paths override the base path, so disallow that (abspath1 / abspath2 == abspath2)
+    if rel_file_to_upload.is_absolute():
+        raise ValueError("File must be relative to pseudonymised folder")
+    WAVEFORM_PSEUDONYMISED_PARQUET.mkdir(parents=False, exist_ok=True)
+    file_to_upload = (WAVEFORM_PSEUDONYMISED_PARQUET / rel_file_to_upload).resolve(strict=True)
+    # Check that even after evaluating ".." and symlinks, the file is still under the "safe" directory
+    # for upload.
+    if not file_to_upload.is_relative_to(WAVEFORM_PSEUDONYMISED_PARQUET):
+        raise ValueError(f"File {file_to_upload} must be under {WAVEFORM_PSEUDONYMISED_PARQUET}. "
+                         f"If this is unexpected, maybe you are using symlinks or '..' in the path?")
+    if not file_to_upload.exists():
+        raise ValueError(f"File {file_to_upload} does not exist")
+    logger.info("Connecting to FTPS server %s:%s, with username %s", settings.FTPS_HOST, settings.FTPS_PORT, settings.FTPS_USERNAME)
+    ftp = _connect_to_ftp(settings.FTPS_HOST, settings.FTPS_PORT, settings.FTPS_USERNAME, settings.FTPS_PASSWORD)
+    remote_project_dir = "waveform-export"
+    _create_and_set_as_cwd(ftp, remote_project_dir)
+    remote_filename = os.path.basename(file_to_upload)
+    command = f"STOR {remote_filename}"
+    logger.info("Uploading file %s", file_to_upload)
+    with open(file_to_upload, 'rb') as file_to_upload_fh:
+        ftp.storbinary(command, file_to_upload_fh)
+    print("Directory listing: ")
+    ftp.dir()
+    ftp.quit()
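The containment check in `do_upload_inner` relies on two `pathlib` behaviours: joining a base with an absolute path discards the base, and `resolve()` collapses `..` and symlinks before `is_relative_to` is applied. A minimal sketch of the same idea follows, assuming a hypothetical `resolve_under` helper and a temporary directory standing in for `WAVEFORM_PSEUDONYMISED_PARQUET`. Unlike the diff, it also resolves the base directory so that a symlinked base still compares correctly; that is an addition made here for illustration, not something the PR does.

```python
import tempfile
from pathlib import Path


def resolve_under(base: Path, rel: Path) -> Path:
    """Resolve rel against base and insist the result stays inside base."""
    base = base.resolve(strict=True)
    if rel.is_absolute():
        # base / absolute_path == absolute_path, so reject it up front
        raise ValueError("path must be relative to the base directory")
    # strict=True raises FileNotFoundError if the target does not exist
    candidate = (base / rel).resolve(strict=True)
    if not candidate.is_relative_to(base):
        raise ValueError(f"{candidate} is not under {base}")
    return candidate


def demo() -> dict:
    """Try one safe path and two unsafe ones; report what happened to each."""
    results = {}
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        safe = root / "pseudonymised"
        safe.mkdir()
        (safe / "ok.parquet").write_bytes(b"")
        (root / "secret.csv").write_bytes(b"")
        results["inside"] = resolve_under(safe, Path("ok.parquet")).name
        unsafe = [("escape", Path("../secret.csv")), ("absolute", root / "secret.csv")]
        for label, rel in unsafe:
            try:
                resolve_under(safe, rel)
                results[label] = "accepted"
            except ValueError:
                results[label] = "rejected"
    return results


if __name__ == "__main__":
    print(demo())
```

Because `resolve(strict=True)` already raises `FileNotFoundError` for a missing file, a later `exists()` check on the resolved path is never reached for that case.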
6 changes: 6 additions & 0 deletions src/locations.py
@@ -0,0 +1,6 @@
+from pathlib import Path
+
+WAVEFORM_EXPORT_BASE = Path('/waveform-export')
+WAVEFORM_ORIGINAL_CSV = WAVEFORM_EXPORT_BASE / 'original-csv'
+WAVEFORM_ORIGINAL_PARQUET = WAVEFORM_EXPORT_BASE / 'original-parquet'
+WAVEFORM_PSEUDONYMISED_PARQUET = WAVEFORM_EXPORT_BASE / 'pseudonymised'
Empty file added src/pseudon/__init__.py
Empty file.
16 changes: 16 additions & 0 deletions src/pseudon/hashing.py
@@ -0,0 +1,16 @@
+
+from functools import lru_cache
+
+
+@lru_cache
+def do_hash(type_prefix, value: str):
+    """
+    Stub implementation of deidentification function for testing purposes.
+
+    Not that I think this will happen in practice, but we'd want the CSN "1234" to hash to a different
+    value than the MRN "1234", so prefix each value with its type.
+    """
+    SALT = "waveform-exporter"
+    full_value_to_hash = f"{SALT}:{type_prefix}:{value}"
+    hash_str = f"{hash(full_value_to_hash) & 0xFFFFFFFF:08x}"
+    return hash_str
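One caveat for this stub: Python's built-in `hash()` for strings is salted per process unless `PYTHONHASHSEED` is fixed, so the same CSN can map to different values on different runs. If run-to-run stability matters while the real hasher is unavailable, a sketch along these lines may help. It assumes SHA-256 from `hashlib` is an acceptable placeholder, and the name `do_hash_stable` is hypothetical. Like the stub, it is not suitable for real de-identification.

```python
import hashlib
from functools import lru_cache

SALT = "waveform-exporter"


@lru_cache
def do_hash_stable(type_prefix: str, value: str) -> str:
    """Toy hasher whose output is the same in every process (assumption: SHA-256 placeholder)."""
    full_value_to_hash = f"{SALT}:{type_prefix}:{value}"
    digest = hashlib.sha256(full_value_to_hash.encode("utf-8")).hexdigest()
    # keep the stub's 8 hex character output width
    return digest[:8]


if __name__ == "__main__":
    print(do_hash_stable("csn", "1234"))
    print(do_hash_stable("mrn", "1234"))
```

The type prefix is kept so that a CSN and an MRN with the same digits still hash differently, as the stub's docstring intends.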