Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jobs/interactions-update/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 59 additions & 10 deletions jobs/interactions-update/src/interactions_update/job.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
import concurrent.futures
import logging
import os
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from enum import StrEnum

import requests
from dotenv import find_dotenv
from dotenv import load_dotenv
from sqlalchemy import case
from sqlalchemy import select

from interactions_update.database import get_session
from strr_api.enums.enum import InteractionStatus
from strr_api.models import CustomerInteraction
from strr_api.services import AuthService
from strr_api.services.auth_service import AuthService

load_dotenv(find_dotenv())

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class UpdateResult(StrEnum):
"""Job outcomes that do not map to a terminal Notify delivery status."""

MISSING_REFERENCE = "MISSING_REFERENCE"
UNCHANGED = "UNCHANGED"


def fetch_and_update(interaction_id, notify_url, headers, timeout):
"""Worker function to fetch status for a single interaction."""
# We fetch the interaction in a local session or use IDs to avoid thread-safety issues with ORM objects
Expand All @@ -26,7 +38,7 @@ def fetch_and_update(interaction_id, notify_url, headers, timeout):
try:
interaction = session.get(CustomerInteraction, interaction_id)
if not interaction or not interaction.notify_reference:
return False
return UpdateResult.MISSING_REFERENCE

notify_request = f"{notify_url}/notify/{interaction.notify_reference}"
resp = requests.get(notify_request, headers=headers, timeout=timeout)
Expand All @@ -46,8 +58,8 @@ def fetch_and_update(interaction_id, notify_url, headers, timeout):
interaction.provider_reference = str(data.get("id"))
interaction.meta_data = data
session.commit()
return True
return False
return new_status
return UpdateResult.UNCHANGED
finally:
session.close()

Expand Down Expand Up @@ -79,12 +91,30 @@ def run(max_workers=None):
session = next(session_gen)

try:
stmt = select(CustomerInteraction.id).where(
CustomerInteraction.status == InteractionStatus.SENT
)
interaction_ids = session.scalars(stmt).all()
stale_sent_hours = int(os.getenv("STALE_SENT_HOURS", "24"))
stale_threshold = datetime.now(timezone.utc) - timedelta(hours=stale_sent_hours)
stmt = select(
CustomerInteraction.id,
case(
(CustomerInteraction.created_at < stale_threshold, True), else_=False
).label("is_stale"),
).where(CustomerInteraction.status == InteractionStatus.SENT)
sent_interactions = session.execute(stmt).all()
interaction_ids = [row.id for row in sent_interactions]
stale_interaction_ids = [row.id for row in sent_interactions if row.is_stale]
if stale_interaction_ids:
logger.warning(
"strr.interactions.stale_sent_detected stale_sent_count=%s stale_sent_hours=%s interaction_ids=%s",
len(stale_interaction_ids),
stale_sent_hours,
stale_interaction_ids[:25],
)

if not interaction_ids:
logger.info(
"strr.interactions.update_completed sent_count=0 stale_sent_count=%s",
len(stale_interaction_ids),
)
return

token = AuthService.get_service_client_token(**_get_auth_config())
Expand All @@ -93,10 +123,13 @@ def run(max_workers=None):
"Content-Type": "application/json",
}

results = []
# If max_workers is 1, run sequentially used in benchmarking comparison
if max_workers == 1:
for i_id in interaction_ids:
fetch_and_update(i_id, notify_url, headers, notify_timeout)
results.append(
fetch_and_update(i_id, notify_url, headers, notify_timeout)
)
else:
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
Expand All @@ -107,6 +140,22 @@ def run(max_workers=None):
)
for i_id in interaction_ids
]
concurrent.futures.wait(futures)
for future in concurrent.futures.as_completed(futures):
results.append(future.result())

failed_count = results.count(InteractionStatus.FAILED)
delivered_count = results.count(InteractionStatus.DELIVERED)
if failed_count:
logger.warning(
"strr.interactions.failed_detected interaction_status=FAILED failed_count=%s",
failed_count,
)
logger.info(
"strr.interactions.update_completed sent_count=%s delivered_count=%s failed_count=%s stale_sent_count=%s",
len(interaction_ids),
delivered_count,
failed_count,
len(stale_interaction_ids),
)
finally:
session.close()
57 changes: 57 additions & 0 deletions jobs/interactions-update/tests/load/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
import uuid
from datetime import datetime
from datetime import timezone
from types import SimpleNamespace
from unittest.mock import patch

import pytest
import responses
from sqlalchemy import select

from interactions_update.job import UpdateResult
from interactions_update.job import run
from strr_api.enums.enum import ChannelType
from strr_api.enums.enum import InteractionStatus
Expand Down Expand Up @@ -133,6 +136,60 @@ def test_run_failure_502(db_session, setup_bulk_interactions, monkeypatch):
assert interaction.status == InteractionStatus.SENT


@pytest.mark.parametrize("setup_bulk_interactions", [{"records": 1}], indirect=True)
def test_run_logs_stale_sent_interactions(setup_bulk_interactions, monkeypatch):
"""Test that stale SENT interactions are logged for alerting."""

class QueryResult:
"""Small result object matching the SQLAlchemy execute().all() call used by the job."""

def __init__(self, values):
self._values = values

def all(self):
return self._values

class SessionProxy:
"""Return one stale SENT interaction id and one SENT interaction id."""

def __init__(self, interaction_id):
self._interaction_id = interaction_id

def execute(self, _stmt):
return QueryResult(
[SimpleNamespace(id=self._interaction_id, is_stale=True)]
)

def close(self):
return None

interaction_id = setup_bulk_interactions["interaction_ids"][0]

monkeypatch.setenv("STALE_SENT_HOURS", "0")
monkeypatch.setattr(
"interactions_update.job.get_session",
lambda: iter([SessionProxy(interaction_id)]),
)
monkeypatch.setattr(
"interactions_update.job.AuthService.get_service_client_token",
lambda **kwargs: "123",
)
monkeypatch.setattr(
"interactions_update.job.fetch_and_update",
lambda *args: UpdateResult.UNCHANGED,
)

with patch("interactions_update.job.logger.warning") as mock_warning:
run(max_workers=1)

mock_warning.assert_any_call(
"strr.interactions.stale_sent_detected stale_sent_count=%s stale_sent_hours=%s interaction_ids=%s",
1,
0,
[interaction_id],
)


scenario_bulk = {
"records": 1000,
"target_days": [0],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Short-Term Rental Registration Approved

**Registration Number:**&nbsp;&nbsp;&nbsp;&nbsp;{{reg_num}}
**Registration Expiry Date:**&nbsp;&nbsp;&nbsp;&nbsp;{{ expiry_date }}

**Strata Hotel Address:**&nbsp;{{street_address}}
**City:**&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{{city}}
**Postal Code:**&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{{postal_code}}

Your application to register the short-term rental is **approved**.

---

# Terms and Conditions
{% if custom_content %}
{{custom_content | escape }}

{% endif %}
The registration is subject to the terms and conditions set out by the Registrar. Failure to comply may result in the suspension or cancellation of your registration.

---

# Important Next Steps
1. Log in to your [Short Term Rental Registry Dashboard](https://stratahotel.shorttermrental.registry.gov.bc.ca/en-CA/auth/login/).

2. Update each listing for this strata hotel with your **Registration Number**.

3. Keep your business documents up to date for future renewals.

---

[[strr-footer.md]]
2 changes: 1 addition & 1 deletion queue_services/strr-email/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading