Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 51 additions & 28 deletions osf/management/commands/migrate_notifications_verification.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,85 @@
import time

from django.core.management.base import BaseCommand
from django.db.models import Count
from django.db import connection

from osf.models import NotificationSubscription, NotificationSubscriptionLegacy


class Command(BaseCommand):
help = 'Verify notification migration integrity (duplicates, invalid frequencies, counts, distribution)'
'''
"""
Usage example:
python manage.py migrate_notifications_verification
python manage.py migrate_notifications_verification --duplicates --counts
'''
python manage.py migrate_notifications_verification --duplicates --distribution
python manage.py migrate_notifications_verification --duplicates --exclude-is-digest --output-size=100
"""

help = 'Verify notification migration integrity (duplicates, invalid frequencies, counts and distribution)'

def add_arguments(self, parser):
parser.add_argument('--all', action='store_true', default=False, help='Run all checks')
parser.add_argument('--duplicates', action='store_true', help='Check for duplicate NotificationSubscription entries')
parser.add_argument('--frequencies', action='store_true', help='Check message_frequency values for invalid ones')
parser.add_argument('--counts', action='store_true', help='Compare legacy M2M total with migrated count')
parser.add_argument('--distribution', action='store_true', help='Print breakdown summary')
parser.add_argument('--all', action='store_true', help='Run all checks')
parser.add_argument('--exclude-is-digest', action='store_true', default=False, help='Used along with --duplicates to exclude _is_digest field in unique_together')
parser.add_argument('--output-size', type=int, default=10, help='Used along with other options to set the number of found duplicates for output')

def handle(self, *args, **options):

start = time.time()
flags = {k for k, v in options.items() if v and k in ['duplicates', 'frequencies', 'counts', 'distribution']}
run_all = options['all']
output_size = options['output_size']

run_all = options['all'] or not flags
print('\n================ Notification Migration Verification ================\n')

if not run_all and not flags:
print('\n⚠ No options selected, command will exit ... \n')

# 1. Detect duplicates
if run_all or 'duplicates' in flags:
# 1. Detect duplicates
print('1) Checking duplicate NotificationSubscription entries...')
duplicates = (
NotificationSubscription.objects.values(
'user_id', 'content_type_id', 'object_id', 'notification_type_id'
action_word = 'excludes' if options['exclude_is_digest'] else 'includes'
print(f'1) Checking duplicate NotificationSubscription entries (unique_together {action_word} _is_digest)...')
if options['exclude_is_digest']:
duplicates = (
NotificationSubscription.objects.values(
'user_id', 'content_type_id', 'object_id', 'notification_type_id',
)
.annotate(count=Count('id'))
.filter(count__gt=1)
)
.annotate(count=Count('id'))
.filter(count__gt=1)
)
print(f" → Duplicates found: {duplicates.count()}")
else:
duplicates = (
NotificationSubscription.objects.values(
'user_id', 'content_type_id', 'object_id', 'notification_type_id', '_is_digest',
)
.annotate(count=Count('id'))
.filter(count__gt=1)
)
print(f' → Duplicates found: {duplicates.count()}.')
if duplicates.exists():
print(' Sample (up to 10):')
for d in duplicates[:10]:
print(f' Sample (up to {output_size}):')
for d in duplicates.order_by('-count')[:output_size]:
print(' ', d)
print(' ✔ OK' if not duplicates.exists() else ' ⚠ Needs review')

# 2. Invalid frequencies
if run_all or 'frequencies' in flags:
# 2. Invalid frequencies
print('\n2) Checking invalid message_frequency values...')
valid = {'none', 'daily', 'instantly'}
invalid_freq = NotificationSubscription.objects.exclude(message_frequency__in=valid)

print(f" → Invalid frequency rows: {invalid_freq.count()}")
print(f' → Invalid frequency rows: {invalid_freq.count()}')
if invalid_freq.exists():
print(' Sample (id, freq):')
for row in invalid_freq[:10]:
print(f" {row.id} → {row.message_frequency}")
for row in invalid_freq[:output_size]:
print(f' {row.id} → {row.message_frequency}')
print(' ✔ OK' if not invalid_freq.exists() else ' ⚠ Needs cleanup')

# 3. Compare legacy frequency-based totals vs new subscription count
if run_all or 'counts' in flags:
# 3. Compare legacy frequency-based totals vs new subscription count
print('\n3) Validating total count migrated...')
valid_subscription_ids = NotificationSubscriptionLegacy.objects.filter(event_name__in=['global_reviews', 'global_file_updated', 'file_updated']).values_list('id', flat=True)
with connection.cursor() as cursor:
Expand All @@ -72,26 +93,28 @@ def handle(self, *args, **options):
legacy_total_expanded = none_count + digest_count + transactional_count
new_total = NotificationSubscription.objects.count()

print(f" Legacy M2M total: {legacy_total_expanded}")
print(f" New subscriptions: {new_total}")
print(f' Legacy M2M total: {legacy_total_expanded}')
print(f' New subscriptions: {new_total}')

if legacy_total_expanded == new_total:
print(' ✔ Counts match')
else:
diff = new_total - legacy_total_expanded
print(f" ⚠ Mismatch: difference = {diff} (possibly skipped or duplicates removed)")
print(f' ⚠ Mismatch: difference = {diff} (possibly skipped, duplicates removed or newly created)')

print(' ⚠ Note: this is accurate only right after migration and before any new subscriptions are created.')

if run_all or 'distribution' in flags:
# 4. Distribution summary
print('\n4) Subscription distribution breakdown (top 30):\n')
print(f'\n4) Subscription distribution breakdown (top {output_size}):\n')
dist = (
NotificationSubscription.objects
.values('notification_type_id', 'message_frequency')
.annotate(total=Count('id'))
.order_by('-total')[:30]
.order_by('-total')[:output_size]
)
for row in dist:
print(' ', row)

elapsed = time.time() - start
print(f"\n================ Verification complete in {elapsed:.2f}s ================\n")
print(f'\n================ Verification complete in {elapsed:.2f}s ================\n')
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

class Command(BaseCommand):
help = (
'Remove duplicate NotificationSubscription records, keeping only '
'the highest-id record per (user, content_type, object_id, notification_type).'
'Remove duplicate NotificationSubscription records, keeping only the highest-id record: '
'Default uniqueness: (user, content_type, object_id, notification_type, is_digest); '
'Optional uniqueness with --exclude-is-digest: (user, content_type, object_id, notification_type).'
)

def add_arguments(self, parser):
Expand All @@ -17,39 +18,53 @@ def add_arguments(self, parser):
action='store_true',
help='Show how many rows would be deleted without deleting anything.',
)
parser.add_argument(
'--exclude-is-digest',
action='store_true',
default=False,
help='Whether to exclude _is_digest field in unique_together')

def handle(self, *args, **options):

self.stdout.write('Finding duplicate NotificationSubscription records…')

to_remove = NotificationSubscription.objects.filter(
Exists(
NotificationSubscription.objects.filter(
user_id=OuterRef('user_id'),
content_type_id=OuterRef('content_type_id'),
object_id=OuterRef('object_id'),
notification_type_id=OuterRef('notification_type_id'),
_is_digest=OuterRef('_is_digest'),
id__gt=OuterRef('id'), # keep most recent record
if options['exclude_is_digest']:
to_remove = NotificationSubscription.objects.filter(
Exists(
NotificationSubscription.objects.filter(
user_id=OuterRef('user_id'),
content_type_id=OuterRef('content_type_id'),
object_id=OuterRef('object_id'),
notification_type_id=OuterRef('notification_type_id'),
id__gt=OuterRef('id'), # keep most recent record
)
)
)
else:
to_remove = NotificationSubscription.objects.filter(
Exists(
NotificationSubscription.objects.filter(
user_id=OuterRef('user_id'),
content_type_id=OuterRef('content_type_id'),
object_id=OuterRef('object_id'),
notification_type_id=OuterRef('notification_type_id'),
_is_digest=OuterRef('_is_digest'),
id__gt=OuterRef('id'), # keep most recent record
)
)
)
)

count = to_remove.count()
self.stdout.write(f"Duplicates to remove: {count}")

if options['dry']:
self.stdout.write(
self.style.WARNING('Dry run enabled — no records were deleted.')
)
return

if count == 0:
self.stdout.write(self.style.SUCCESS('No duplicates found.'))
return

with transaction.atomic():
deleted, _ = to_remove.delete()
if options['dry']:
self.stdout.write(self.style.WARNING('Dry run enabled — no records were deleted.'))
return

self.stdout.write(
self.style.SUCCESS(f"Successfully removed {deleted} duplicate records.")
)
if count > 0:
with transaction.atomic():
deleted, _ = to_remove.delete()
self.stdout.write(self.style.SUCCESS(f"Successfully removed {deleted} duplicate records."))
Loading