Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
724021b
Added two db upgrades to add rcs notification type and twilio as a rc…
jimleroyer Mar 6, 2026
badabb3
Added draft code for new Twilio provider
jimleroyer Mar 6, 2026
a07de56
Merge remote-tracking branch 'origin/main' into proto/rcs-provider
jimleroyer Mar 10, 2026
3f26de8
Added Twilio library as a dep; added review comments for future me
jimleroyer Mar 10, 2026
8cfb7a1
Updated the migration scripts to include more of the missing db types…
jimleroyer Mar 10, 2026
b5638cd
Added missing RCS checks and permission types
jimleroyer Mar 10, 2026
1f4949f
Added queues pipeline for RCS; added missing RCS types
jimleroyer Mar 10, 2026
96ba14b
Added missing types in function signature
jimleroyer Mar 10, 2026
b2581d4
Added queues config for RCS along with rate config
jimleroyer Mar 10, 2026
d218293
Added missing error type for RCS request error
jimleroyer Mar 10, 2026
ce6d5f9
Added missing template_type value of RCS in db migration script
jimleroyer Mar 10, 2026
32758bd
Added missing provider_details_history entry in the db migration down…
jimleroyer Mar 10, 2026
27f0a0a
Addressing PR's comments by AI on mismatched RCS/SMS coding logic
jimleroyer Mar 10, 2026
8f7aa89
Merge remote-tracking branch 'origin/main' into proto/rcs-provider
jimleroyer Mar 12, 2026
4c1f1aa
Merge remote-tracking branch 'origin/main' into proto/rcs-provider
jimleroyer Mar 13, 2026
5fe4794
Step 4 out of 5 for RCS prototype implementation
jimleroyer Mar 13, 2026
fd7c048
Merge remote-tracking branch 'origin/main' into proto/rcs-provider
jimleroyer Mar 13, 2026
8922aeb
Updated locked dependencies with new Twilio tool library
jimleroyer Mar 13, 2026
275f289
Step 5 with billing related changes for RCS; this is not exhaustive b…
jimleroyer Mar 13, 2026
3e71a73
Formatting
jimleroyer Mar 13, 2026
48b1e43
Removed unused import
jimleroyer Mar 13, 2026
b0a3dd1
Formatting
jimleroyer Mar 13, 2026
4d128e6
Ignore models and twilio related Python type errors via mypy
jimleroyer Mar 13, 2026
f4f30a2
Fixing mypy errors
jimleroyer Mar 13, 2026
7927725
Merge branch 'main' into proto/rcs-provider
jimleroyer Mar 17, 2026
cf4d2ad
Renamed db migrations after merge with main introduced a new conflict…
jimleroyer Mar 17, 2026
db3c2eb
DB migration renamed happened previously but the migration content wa…
jimleroyer Mar 17, 2026
ade5559
I will get this right eventually..
jimleroyer Mar 17, 2026
5e539ee
Will I ever get this right?
jimleroyer Mar 17, 2026
8895d62
Merge remote-tracking branch 'origin/main' into proto/rcs-provider
jimleroyer Mar 30, 2026
8c60185
Re-updating the poetry lock file after merge with the baseline
jimleroyer Mar 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from app.clients.salesforce.salesforce_client import SalesforceClient
from app.clients.sms.aws_pinpoint import AwsPinpointClient
from app.clients.sms.aws_sns import AwsSnsClient
from app.clients.sms.twilio_rcs import TwilioRcsClient
from app.dbsetup import RoutingSQLAlchemy, enable_sqlalchemy_debug_logging
from app.encryption import CryptoSigner
from app.json_provider import NotifyJSONProvider
Expand All @@ -48,51 +49,74 @@
migrate = Migrate()
marshmallow = Marshmallow()
notify_celery = NotifyCelery()

# Provider clients
clients = Clients()
aws_ses_client = AwsSesClient()
aws_sns_client = AwsSnsClient()
aws_pinpoint_client = AwsPinpointClient()
twilio_rcs_client = TwilioRcsClient()

airtable_client = AirtableClient()
salesforce_client = SalesforceClient()
zendesk_client = ZendeskClient()

# Encryption signers
signer_notification = CryptoSigner()
signer_personalisation = CryptoSigner()
signer_complaint = CryptoSigner()
signer_delivery_status = CryptoSigner()
signer_bearer_token = CryptoSigner()
signer_api_key = CryptoSigner()
signer_inbound_sms = CryptoSigner()
zendesk_client = ZendeskClient()

# Metrics and logging
statsd_client = StatsdClient()
metrics_logger = MetricsLogger()
performance_platform_client = PerformancePlatformClient()

# Cache and Redis clients
flask_redis = FlaskRedis()
flask_cache_ops = FlaskRedis(config_prefix="CACHE_OPS")
redis_store = RedisClient()
bounce_rate_client = RedisBounceRate(redis_store)
annual_limit_client = RedisAnnualLimit(redis_store)
metrics_logger = MetricsLogger()


# Queues
# TODO: Rework instantiation to decouple redis_store.redis_store and pass it in.\
email_queue = RedisQueue("email")
sms_queue = RedisQueue("sms")
performance_platform_client = PerformancePlatformClient()
document_download_client = DocumentDownloadClient()
salesforce_client = SalesforceClient()
airtable_client = AirtableClient()

clients = Clients()

api_user: Any = LocalProxy(lambda: g.api_user)
authenticated_service: Any = LocalProxy(lambda: g.authenticated_service)

sms_bulk = RedisQueue("sms", process_type="bulk")
sms_normal = RedisQueue("sms", process_type="normal")
sms_priority = RedisQueue("sms", process_type="priority")

rcs_bulk = RedisQueue("rcs", process_type="bulk")
rcs_normal = RedisQueue("rcs", process_type="normal")
rcs_priority = RedisQueue("rcs", process_type="priority")

email_bulk = RedisQueue("email", process_type="bulk")
email_normal = RedisQueue("email", process_type="normal")
email_priority = RedisQueue("email", process_type="priority")

sms_bulk_publish = RedisQueue("sms", process_type="bulk")
sms_normal_publish = RedisQueue("sms", process_type="normal")
sms_priority_publish = RedisQueue("sms", process_type="priority")

rcs_bulk_publish = RedisQueue("rcs", process_type="bulk")
rcs_normal_publish = RedisQueue("rcs", process_type="normal")
rcs_priority_publish = RedisQueue("rcs", process_type="priority")

email_bulk_publish = RedisQueue("email", process_type="bulk")
email_normal_publish = RedisQueue("email", process_type="normal")
email_priority_publish = RedisQueue("email", process_type="priority")

# APIs
document_download_client = DocumentDownloadClient()


def create_app(application, config=None):
from app.config import configs
Expand Down Expand Up @@ -120,6 +144,7 @@ def create_app(application, config=None):
aws_sns_client.init_app(application, statsd_client=statsd_client)
aws_pinpoint_client.init_app(application, statsd_client=statsd_client)
aws_ses_client.init_app(application.config["AWS_REGION"], statsd_client=statsd_client)
twilio_rcs_client.init_app(application, statsd_client=statsd_client)
notify_celery.init_app(application)
NewsletterSubscriber.init_app(application)
LatestNewsletterTemplate.init_app(application)
Expand All @@ -135,7 +160,9 @@ def create_app(application, config=None):
performance_platform_client.init_app(application)
document_download_client.init_app(application)
airtable_client.init_app(application)
clients.init_app(sms_clients=[aws_sns_client, aws_pinpoint_client], email_clients=[aws_ses_client])
clients.init_app(
sms_clients=[aws_sns_client, aws_pinpoint_client], email_clients=[aws_ses_client], rcs_clients=[twilio_rcs_client]
)

if application.config["FF_SALESFORCE_CONTACT"]:
salesforce_client.init_app(application)
Expand Down
15 changes: 15 additions & 0 deletions app/celery/nightly_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ def delete_sms_notifications_older_than_retention():
raise


@notify_celery.task(name="delete-rcs-notifications")
@cronitor("delete-rcs-notifications")
@statsd(namespace="tasks")
def delete_rcs_notifications_older_than_retention():
try:
start = datetime.utcnow()
deleted = delete_notifications_older_than_retention_by_type("rcs")
current_app.logger.info(
"Delete {} job started {} finished {} deleted {} rcs notifications".format("rcs", start, datetime.utcnow(), deleted)
)
except SQLAlchemyError:
current_app.logger.exception("Failed to delete rcs notifications")
raise


@notify_celery.task(name="delete-email-notifications")
@cronitor("delete-email-notifications")
@statsd(namespace="tasks")
Expand Down
44 changes: 44 additions & 0 deletions app/celery/provider_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,50 @@ def _deliver_sms(self, notification_id):
raise NotificationTechnicalFailureException(message)


@notify_celery.task(
bind=True,
name="deliver_rcs",
max_retries=48,
default_retry_delay=300,
rate_limit=Config.CELERY_DELIVER_RCS_RATE_LIMIT,
)
@statsd(namespace="tasks")
def deliver_rcs(self, notification_id):
try:
current_app.logger.info("Start sending RCS for notification id: {}".format(notification_id))
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
raise NoResultFound()
send_to_providers.send_rcs_to_provider(notification)
except InvalidUrlException:
current_app.logger.error(f"Cannot send notification {notification_id}, got an invalid direct file url.")
update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
_check_and_queue_callback_task(notification)
# REVIEW: Catch Twilio related exceptions instead of the AWS related ones.
except (PinpointConflictException, PinpointValidationException) as e:
# As this is due to Pinpoint errors, we are NOT retrying the notification
# We are only warning on the error, and not logging an error
current_app.logger.warning("RCS delivery failed for notification_id {} Pinpoint error: {}".format(notification.id, e))
# PinpointConflictException reasons: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/pinpoint-sms-voice-v2/client/exceptions/ConflictException.html
# PinpointValidationException reasons: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/pinpoint-sms-voice-v2/client/exceptions/ValidationException.html
update_notification_status_by_id(
notification_id, NOTIFICATION_PROVIDER_FAILURE, feedback_reason=e.original_exception.response.get("Reason", "")
)
_check_and_queue_callback_task(notification)
except Exception:
try:
current_app.logger.exception("RCS notification delivery for id: {} failed".format(notification_id))
self.retry(**CeleryParams.retry(None if notification is None else notification.template.process_type))
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. The task send_rcs_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(notification_id)
)
update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
_check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException(message)


def _handle_error_with_email_retry(
task: Task, e: Exception, notification_id: int, notification: Optional[Notification], countdown: Optional[None] = None
):
Expand Down
61 changes: 61 additions & 0 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
email_normal,
email_priority,
notify_celery,
rcs_bulk,
rcs_normal,
rcs_priority,
sms_bulk,
sms_normal,
sms_priority,
Expand All @@ -20,6 +23,7 @@
job_complete,
process_job,
save_emails,
save_rcss,
save_smss,
update_in_progress_jobs,
)
Expand Down Expand Up @@ -277,6 +281,9 @@ def recover_expired_notifications():
email_bulk.expire_inflights()
email_normal.expire_inflights()
email_priority.expire_inflights()
rcs_bulk.expire_inflights()
rcs_normal.expire_inflights()
rcs_priority.expire_inflights()


@notify_celery.task(name="beat-inbox-email-normal")
Expand Down Expand Up @@ -385,3 +392,57 @@ def beat_inbox_sms_priority():
save_smss.apply_async((None, list_of_sms_notifications, receipt_id_sms), queue=QueueNames.PRIORITY_DATABASE)
current_app.logger.info(f"Batch saving with Priority: SMS receipt {receipt_id_sms} sent to in-flight.")
receipt_id_sms, list_of_sms_notifications = sms_priority.poll()


@notify_celery.task(name="beat-inbox-rcs-normal")
@statsd(namespace="tasks")
def beat_inbox_rcs_normal():
"""
The function acts as a beat schedule to a list of notifications in the queue.
The post_api will push all the notifications of normal priority into the above list.
The heartbeat with check the list (list#1) until it is non-emtpy and move the notifications in a batch
to another list(list#2). The heartbeat will then call a job that saves list#2 to the DB
and actually sends the sms for each notification saved.
"""
receipt_id_rcs, list_of_rcs_notifications = rcs_normal.poll()

while list_of_rcs_notifications:
save_rcss.apply_async((None, list_of_rcs_notifications, receipt_id_rcs), queue=QueueNames.NORMAL_DATABASE)
current_app.logger.info(f"Batch saving with Normal Priority: RCS receipt {receipt_id_rcs} sent to in-flight.")
receipt_id_rcs, list_of_rcs_notifications = rcs_normal.poll()


@notify_celery.task(name="beat-inbox-rcs-bulk")
@statsd(namespace="tasks")
def beat_inbox_rcs_bulk():
"""
The function acts as a beat schedule to a list of notifications in the queue.
The post_api will push all the notifications of bulk priority into the above list.
The heartbeat with check the list (list#1) until it is non-emtpy and move the notifications in a batch
to another list(list#2). The heartbeat will then call a job that saves list#2 to the DB
and actually sends the sms for each notification saved.
"""
receipt_id_rcs, list_of_rcs_notifications = rcs_bulk.poll()

while list_of_rcs_notifications:
save_rcss.apply_async((None, list_of_rcs_notifications, receipt_id_rcs), queue=QueueNames.BULK_DATABASE)
current_app.logger.info(f"Batch saving with Bulk Priority: RCS receipt {receipt_id_rcs} sent to in-flight.")
receipt_id_rcs, list_of_rcs_notifications = rcs_bulk.poll()


@notify_celery.task(name="beat-inbox-rcs-priority")
@statsd(namespace="tasks")
def beat_inbox_rcs_priority():
"""
The function acts as a beat schedule to a list of notifications in the queue.
The post_api will push all the notifications of priority into the above list.
The heartbeat with check the list (list#1) until it is non-emtpy and move the notifications in a batch
to another list(list#2). The heartbeat will then call a job that saves list#2 to the DB
and actually sends the sms for each notification saved.
"""
receipt_id_rcs, list_of_rcs_notifications = rcs_priority.poll()

while list_of_rcs_notifications:
save_rcss.apply_async((None, list_of_rcs_notifications, receipt_id_rcs), queue=QueueNames.PRIORITY_DATABASE)
current_app.logger.info(f"Batch saving with Priority: RCS receipt {receipt_id_rcs} sent to in-flight.")
receipt_id_rcs, list_of_rcs_notifications = rcs_priority.poll()
Loading
Loading