From 70119d43727c362974e230038701321027c44f6c Mon Sep 17 00:00:00 2001 From: Saksham Date: Mon, 2 Feb 2026 17:01:39 +0100 Subject: [PATCH 1/4] feat(comments): Add runner for comments migration separately --- cds_migrator_kit/rdm/cli.py | 34 ++++ cds_migrator_kit/rdm/comments/extract.py | 30 ++++ cds_migrator_kit/rdm/comments/load.py | 200 +++++++++++++++++++++ cds_migrator_kit/rdm/comments/log.py | 42 +++++ cds_migrator_kit/rdm/comments/runner.py | 41 +++++ cds_migrator_kit/rdm/comments/streams.py | 22 +++ scripts/copy_comments_attached_files.py | 42 +++++ scripts/dump_comments_to_migrate.py | 220 +++++++++++++++++++++++ 8 files changed, 631 insertions(+) create mode 100644 cds_migrator_kit/rdm/comments/extract.py create mode 100644 cds_migrator_kit/rdm/comments/load.py create mode 100644 cds_migrator_kit/rdm/comments/log.py create mode 100644 cds_migrator_kit/rdm/comments/runner.py create mode 100644 cds_migrator_kit/rdm/comments/streams.py create mode 100644 scripts/copy_comments_attached_files.py create mode 100644 scripts/dump_comments_to_migrate.py diff --git a/cds_migrator_kit/rdm/cli.py b/cds_migrator_kit/rdm/cli.py index cf429d47..dc295847 100644 --- a/cds_migrator_kit/rdm/cli.py +++ b/cds_migrator_kit/rdm/cli.py @@ -14,6 +14,7 @@ from flask import current_app from flask.cli import with_appcontext +from cds_migrator_kit.rdm.comments.runner import CommentsRunner from cds_migrator_kit.rdm.affiliations.runner import RecordAffiliationsRunner from cds_migrator_kit.rdm.affiliations.streams import AffiliationsStreamDefinition from cds_migrator_kit.rdm.records.streams import ( # UserStreamDefinition, @@ -242,3 +243,36 @@ def dump(slug, title, filepath): with open(filepath, "w") as fp: yaml.safe_dump(streams, fp, default_flow_style=False, sort_keys=False) + +@migration.group() +def comments(): + """Migration CLI for comments.""" + pass + + +@comments.command() +@click.option( + "--dry-run", + is_flag=True, +) +@click.option( + "--filepath", + help="Path to the 
comments metadata json file.", + required=True, +) +@click.option( + "--dirpath", + help="Path to the record-wise comments directory containing attached files.", + required=True, +) +@with_appcontext +def comments_run(filepath, dirpath, dry_run=False): + """Migrate the comments for the records in `filepath`.""" + log_dir = Path(current_app.config["CDS_MIGRATOR_KIT_LOGS_PATH"]) / "comments" + runner = CommentsRunner( + filepath=filepath, + dirpath=dirpath, + log_dir=log_dir, + dry_run=dry_run, + ) + runner.run() diff --git a/cds_migrator_kit/rdm/comments/extract.py b/cds_migrator_kit/rdm/comments/extract.py new file mode 100644 index 00000000..dc3c1b99 --- /dev/null +++ b/cds_migrator_kit/rdm/comments/extract.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2026 CERN. +# +# CDS-Migrator-Kit is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. + +"""CDS-Migrator-Kit comments extract module.""" + +import json +from pathlib import Path + +import click +from invenio_rdm_migrator.extract import Extract + + +class LegacyCommentsExtract(Extract): + """LegacyCommentsExtract.""" + + def __init__(self, filepath, **kwargs): + """Constructor.""" + self.filepath = Path(filepath).absolute() + + def run(self): + """Run.""" + with open(self.filepath, "r") as dump_file: + data = json.load(dump_file) + with click.progressbar(data.items(), label="Processing comments") as metadata: + for recid, comments in metadata: + yield (recid, comments) diff --git a/cds_migrator_kit/rdm/comments/load.py b/cds_migrator_kit/rdm/comments/load.py new file mode 100644 index 00000000..db62d81e --- /dev/null +++ b/cds_migrator_kit/rdm/comments/load.py @@ -0,0 +1,200 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# CDS-RDM is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. 
+ +"""CDS-RDM migration load module.""" + +from flask import current_app + +from invenio_rdm_migrator.load.base import Load + +from cds_migrator_kit.rdm.comments.log import CommentsLogger +from cds_rdm.legacy.resolver import get_pid_by_legacy_recid +from invenio_access.permissions import system_identity +from invenio_requests.proxies import current_requests_service +from invenio_rdm_records.proxies import current_rdm_records_service +from invenio_rdm_records.records.api import RDMParent +from invenio_requests.customizations.event_types import CommentEventType +from invenio_requests.proxies import current_events_service +from invenio_requests.resolvers.registry import ResolverRegistry +from invenio_accounts.models import User +from invenio_db import db +from datetime import datetime + +from invenio_requests.customizations.request_types import CommunitySubmission +from invenio_requests.customizations.event_types import LogEventType + +logger = CommentsLogger.get_logger() + + +class CDSCommentsLoad(Load): + """CDSCommentsLoad.""" + + LEGACY_REPLY_LINK_MAP = {} + """Map of legacy reply ids to RDM comment ids.""" + + def __init__( + self, + config, + less_than_date, + dry_run=False, + ): + """Constructor.""" + self.config = config + self.less_than_date = less_than_date + self.dry_run = dry_run + + def get_oldest_record(self, parent_pid_value): + latest_record = current_rdm_records_service.read_latest( + identity=system_identity, id_=parent_pid_value + ) + search_result = current_rdm_records_service.scan_versions( + identity=system_identity, + id_=latest_record["id"], + ) + record_versions = {hit["versions"]["index"]: hit for hit in search_result} + oldest_version = min(record_versions.keys()) + return record_versions[oldest_version] + + + def create_event(self, request, data, community, record, parent_comment_id=None): + if not parent_comment_id: + print("Creating event for record: ", record['id'], "request: ", request.id, "comment ID: ", data.get("comment_id")) + else: 
+ print("Creating reply event for record: ", record['id'], "request: ", request.id, "comment ID: ", data.get("comment_id"), "parent comment ID: ", parent_comment_id) + # TODO: Only add commment if the version id matches. To be finalized in discussion + # if data.get("version") != record['versions']['index']: + # return + + # Create comment event + comment_payload = { + "payload": { + "content": data.get("content"), + "format": "html", + } + } + + comment_status = data.get("status") + if comment_status == "da": + comment_payload["payload"].update({"content": "comment was deleted by the author.", "event_type": "comment_deleted"}) + elif comment_status == "dm": + comment_payload["payload"].update({"content": "comment was deleted by the moderator.", "event_type": "comment_deleted"}) + + event = current_events_service.record_cls.create({}, request=request.model, request_id=str(request.id), type=CommentEventType) + + if parent_comment_id: + # If it's a reply, 1. set parent comment id, 2. and if a nested reply, in the content add mentioned reply event's deep link + event.parent_id = str(parent_comment_id) + mentioned_event_id = self.LEGACY_REPLY_LINK_MAP.get(data.get("reply_to_id"), None) + if mentioned_event_id: + deep_link = f"
<a href='#event-{mentioned_event_id}'>Link to the reply</a>
" + comment_payload["payload"]["content"] = deep_link + "\n" + comment_payload["payload"]["content"] + + # TODO: Add attached files to the event + for attached_file in data.get("attached_files", []): + print("TODO: Add attached files to the event: ", attached_file) + # if data.get("attached_files"): + # comment_payload["payload"]["files"] = data.get("attached_files") + + event.update(comment_payload) + + user = User.query.filter_by(email=data.get("created_by")).one_or_none() + # TODO: It shouldn't be not found, if so, raise manual migration error? + if user: + event.created_by = ResolverRegistry.resolve_entity_proxy({"user": str(user.id)}, raise_=True) + else: + print("User not found for email: ", data.get("created_by")) + event.created_by = ResolverRegistry.resolve_entity_proxy({"user": "system"}, raise_=True) + + event.model.created = data.get("created_at") # Not changing the updated at because of the context of migration + event.model.version_id = 0 + + event.commit() + db.session.commit() + + current_events_service.indexer.index(event) + return event + + + def create_accepted_community_inclusion_request( + self, + record, + community, + creator_user_id, + comments=None, # TODO: Should an accepted request be created if there are no comments? 
+ ): + # Resolve entities for references + creator_ref = ResolverRegistry.reference_entity({"user": str(creator_user_id)}, raise_=True) + receiver_ref = ResolverRegistry.reference_entity({"community": str(community.id)}, raise_=True) + topic_ref = ResolverRegistry.reference_entity({"record": record['id']}, raise_=True) + + request_item = current_requests_service.create( + system_identity, + data={ + "title": record['metadata']["title"], + }, + request_type=CommunitySubmission, + receiver=receiver_ref, + creator=creator_ref, + topic=topic_ref, + uow=None, + ) + request = request_item._record + request.status = "accepted" + created_at = datetime.fromisoformat(record['created']) + request.model.created = created_at + # request.model.updated = created_at # Not changing the updated to keep the context of migration + + request.commit() + db.session.commit() + + current_requests_service.indexer.index(request) + + for comment_data in comments: + comment_event = self.create_event(request, comment_data, community, record) + for reply in comment_data.get("replies", []): + reply_event = self.create_event(request, reply, community, record, comment_event.id) + self.LEGACY_REPLY_LINK_MAP[reply.get("comment_id")] = reply_event.id + + # Add accepted log action + event = LogEventType(payload=dict(event="accepted")) + _data = dict(payload=event.payload) + log_event = current_events_service.create( + system_identity, request.id, _data, event, uow=None + ) + # TODO: What should be the created for the log event, record's or last comment's? 
To be finalized in discussion + log_event._record.model.created = created_at + + log_event._record.commit() + db.session.commit() + + current_events_service.indexer.index(log_event._record) + + return request + + def _process_legacy_comments_for_recid(self, recid, comments): + """Process the legacy comments for the record.""" + logger.info(f"Processing legacy comments for recid: {recid}") + parent_pid = get_pid_by_legacy_recid(recid) + oldest_record = self.get_oldest_record(parent_pid.pid_value) + parent = RDMParent.pid.resolve(parent_pid.pid_value) + community = parent.communities.default + record_owner_id = parent.access.owned_by.owner_id + request = self.create_accepted_community_inclusion_request(oldest_record, community, record_owner_id, comments) + return request + + def _load(self, entry): + """Use the services to load the entries.""" + if entry: + recid, comments = entry + try: + self._process_legacy_comments_for_recid(recid, comments) + except Exception as ex: + logger.error(ex) + + def _cleanup(self, *args, **kwargs): + """Cleanup the entries.""" + pass diff --git a/cds_migrator_kit/rdm/comments/log.py b/cds_migrator_kit/rdm/comments/log.py new file mode 100644 index 00000000..6c6b2269 --- /dev/null +++ b/cds_migrator_kit/rdm/comments/log.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2026 CERN. +# +# CDS-Migrator-Kit is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. 
+ +"""CDS-Migrator-Kit comments logger module.""" + +import logging + + +class CommentsLogger: + """Migrator comments logger.""" + + @classmethod + def initialize(cls, log_dir): + """Constructor.""" + formatter = logging.Formatter( + fmt="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + logger = logging.getLogger("comments-migrator") + fh = logging.FileHandler(log_dir / "info.log") + logger.setLevel(logging.WARNING) + logger.addHandler(fh) + + # errors to file + fh = logging.FileHandler(log_dir / "error.log") + fh.setLevel(logging.ERROR) + fh.setFormatter(formatter) + logger.addHandler(fh) + + # info to stream/stdout + sh = logging.StreamHandler() + sh.setFormatter(formatter) + sh.setLevel(logging.INFO) + logger.addHandler(sh) + + @classmethod + def get_logger(cls): + """Get migration logger.""" + return logging.getLogger("comments-migrator") diff --git a/cds_migrator_kit/rdm/comments/runner.py b/cds_migrator_kit/rdm/comments/runner.py new file mode 100644 index 00000000..b88bf230 --- /dev/null +++ b/cds_migrator_kit/rdm/comments/runner.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2026 CERN. +# +# CDS-Migrator-Kit is free software; you can redistribute it and/or modify +# it under the terms of the MIT License; see LICENSE file for more details. 
+ +"""CDS-Migrator-Kit comments runner module.""" + +from pathlib import Path + +from invenio_rdm_migrator.streams import Stream + +from cds_migrator_kit.rdm.comments.log import CommentsLogger + + +class CommentsRunner: + """ETL streams runner.""" + + def __init__(self, stream_definition, filepath, log_dir, dry_run): + """Constructor.""" + self.log_dir = Path(log_dir) + self.log_dir.mkdir(parents=True, exist_ok=True) + + CommentsLogger.initialize(self.log_dir) + + self.stream = Stream( + stream_definition.name, + extract=stream_definition.extract_cls(filepath), + transform=stream_definition.transform_cls(), + load=stream_definition.load_cls(dry_run=dry_run), + ) + + def run(self): + """Run comments ETL stream.""" + try: + self.stream.run() + except Exception as e: + CommentsLogger.get_logger().exception( + f"Stream {self.stream.name} failed.", exc_info=1 + ) diff --git a/cds_migrator_kit/rdm/comments/streams.py b/cds_migrator_kit/rdm/comments/streams.py new file mode 100644 index 00000000..e31678b3 --- /dev/null +++ b/cds_migrator_kit/rdm/comments/streams.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2026 CERN. +# +# CDS-Migrator-Kit is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. 
+ +"""CDS-Migrator-Kit comments streams module.""" +from invenio_rdm_migrator.streams import StreamDefinition + +from cds_migrator_kit.rdm.comments.extract import LegacyCommentsExtract +from invenio_rdm_migrator.transform import IdentityTransform + +from .load import CDSCommentsLoad + +CommentsStreamDefinition = StreamDefinition( + name="comments", + extract_cls=LegacyCommentsExtract, + transform_cls=IdentityTransform, + load_cls=CDSCommentsLoad, +) +"""ETL stream for CDS to RDM comments.""" diff --git a/scripts/copy_comments_attached_files.py b/scripts/copy_comments_attached_files.py new file mode 100644 index 00000000..0f60c9a7 --- /dev/null +++ b/scripts/copy_comments_attached_files.py @@ -0,0 +1,42 @@ +""" +This script is used to copy the attached files from the old system to the migration folder in the new eos. + +1. Read the attached files from the old system +2. Copy the attached files to the migration folder in the new eos system +3. Save the attached files mapping to a json file for attaching them to the comments in the new system +""" + +# from invenio.webcomment import get_attached_files +import json +import os +import shutil + +collection = "it_meetings" +environment = "sandbox" + +source_prefix = "/opt/cdsweb/var/data/comments" +destination_prefix = "/eos/media/cds/cds-rdm/{0}/migration/{1}/comments".format(environment, collection) +""" +collection_name/ +|-- comments/ + |-- comments_metadata.json + |-- recid/ + |-- comment_id (or reply_comment_id)/ + |-- This is where the attached file is copied to + |-- ... 
+(We keep the recid folder to avoid confusion with the files folder and in case different comments contain the same file name) +(We keep the comment_id (or reply_comment_id) folder to avoid confusion with the files folder and in case different comments contain the same file name) +""" + +def copy_comments_attached_files(comments_metadata): + for recid in comments_metadata.keys(): + # Copy the whole /comments/{recid} folder to the destination folder + shutil.copytree(os.path.join(source_prefix, recid), os.path.join(destination_prefix, recid)) + print("Copied {} comments directory to {}".format(recid, destination_prefix)) + +# Load the comments metadata to get the recids with comments and the comment IDs +comments_metadata_json_file = os.path.join(destination_prefix, "comments_metadata.json") +with open(comments_metadata_json_file, 'r') as f: + comments_metadata = json.load(f) + +copy_comments_attached_files(comments_metadata) diff --git a/scripts/dump_comments_to_migrate.py b/scripts/dump_comments_to_migrate.py new file mode 100644 index 00000000..9e4eb63f --- /dev/null +++ b/scripts/dump_comments_to_migrate.py @@ -0,0 +1,220 @@ +""" +This script is used to get the comments from the legacy system and save them to a json file. +""" + +""" +1. Find migrated records with comments using https://digital-repositories.web.cern.ch/cds/cds.cern.ch/migration/#verify-if-the-collection-has-commentsdiscussions +2. Extract and sanitize comments (query_retrieve_comments_or_remarks) +3. Map legacy user id to cdsrdm user id +4. Create comments metadata +5. 
Save comments metadata to json file +""" + +from invenio.dbquery import run_sql +from invenio.search_engine import search_pattern +from invenio.webcomment_dblayer import get_comment_to_bibdoc_relations +import json + +collection_queries=[ + '980__a:CNLISSUE -980:DELETED -980:HIDDEN -980__a:DUMMY', + '980__a:CNLARTICLE -980:DELETED -980:HIDDEN -980__a:DUMMY', + '710__5:CN OR 710__5:DD OR 710__5:IT OR 710__5:AS OR 710__5:STS 980:ARTICLE -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL', + '710__5:CN OR 710__5:DD OR 710__5:IT OR 710__5:AS OR 710__5:STS 980:PREPRINT -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL', + '980:INTNOTEITPUBL or 980:INTNOTEASPUBL OR 980:INTNOTEMIPUBL AND 690C:INTNOTE -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL', + '710__5:IT 980:PUBLARDA -980:DELETED -980:HIDDEN -980__a:DUMMY', + '980:REPORT AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY', + '980:PERI AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY', + '980:BROCHURE AND 690C:CERNITBROCHURE -980:DELETED -980:HIDDEN -980__a:DUMMY', + '980:POSTER AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY', + '980:ITCERNTALK -980:DELETED -980:HIDDEN -980__a:DUMMY', + '980:PERI AND 650:"Computing and Computers" -980:DELETED -980:HIDDEN -980__a:DUMMY', + '980:ITUDSPUBSOURCEARCHIVE -980:DELETED -980:HIDDEN -980__a:DUMMY', + '(980__:THESIS OR 980__:Thesis OR 980__:thesis) -980__:DUMMY -980__c:HIDDEN', +] +""" +For thesis: "(980__:THESIS OR 980__:Thesis OR 980__:thesis) -980__:DUMMY -980__c:HIDDEN" +""" +recids_list= [] +for collection_query in collection_queries: + recs = search_pattern(p=collection_query) + recids_list.extend(recs) + +records_with_comments = run_sql( + "SELECT id_bibrec " + "FROM cmtRECORDCOMMENT " + "WHERE id_bibrec IN ({}) " + "GROUP BY id_bibrec " + "HAVING COUNT(*) > 0".format(",".join([str(recid) for recid in recids_list])), +) + +recids_with_comments = [record[0] for record in records_with_comments] + +def 
get_comments_from_legacy(recid, display_order='cmt.date_creation ASC'): + """ + # from invenio.webcomment import query_retrieve_comments_or_remarks + """ + query = """SELECT user.nickname, + user.email, + user.note, + user.last_login, + cmt.id_user, + DATE_FORMAT(cmt.date_creation, '%%%%Y-%%%%m-%%%%d %%%%H:%%%%i:%%%%s') as created, + cmt.body, + cmt.status, + cmt.nb_abuse_reports, + cmt.id, + cmt.round_name, + cmt.restriction, + %(reply_to_column)s as reply_to_id, + cmt.body_format + FROM cmtRECORDCOMMENT cmt LEFT JOIN user ON + user.id=cmt.id_user + WHERE cmt.id_bibrec=%%s + ORDER BY %(display_order)s + """ % { + 'display_order': display_order, + 'reply_to_column': recid > 0 and 'cmt.in_reply_to_id_cmtRECORDCOMMENT' or 'cmt.in_reply_to_id_bskRECORDCOMMENT'} + params = (recid,) + res = run_sql(query, params, with_dict=True) + return res + +# Function to flatten arbitrarily nested comment replies into a 1-level replies list +def flatten_replies(comments_list): + """ + Takes a list of comments (dicts with at least id, reply_to_id), and + returns a list of top-level comments each with direct and indirect (flattened) + replies under a single 'replies' key (1-level nesting). + Each comment dict may have its own 'replies' key (list). + """ + # Build maps for quick lookup + comments_by_id = {c['id']: dict(c, replies=[]) for c in comments_list} + top_level_comments = [] + + for c in comments_list: + parent = c.get('reply_to_id') + if parent is None or parent not in comments_by_id: + # This is a top-level comment + top_level_comments.append(comments_by_id[c['id']]) + else: + # This is a reply; add to parent's replies + comments_by_id[parent]['replies'].append(comments_by_id[c['id']]) + + def collect_all_replies(comment): + """Recursively flattens all indirect replies under this comment. 
""" + flat = [] + queue = list(comment['replies']) + while queue: + reply = queue.pop(0) + flat.append(reply) + # Add all replies to the end of the queue + queue.extend(reply['replies']) + # Remove their nested replies again for full flatten + for r in flat: + r['replies'] = [] + # Sort the replies by creation date + flat.sort(key=lambda x: x['created']) + comment['replies'] = flat + + for comment in top_level_comments: + collect_all_replies(comment) + + return top_level_comments + +comments_metadata = {} +""" +{ + recid: [ # TODO: Can be nested with versions if multiple versions are present (to be finalized in discussion) + { + "comment_id": comment_id, + "content": content, + "status": status, + "user_email": user_email, + "created_at": created_at, + 'attached_files': [{"id": file_id, "path": file_path, "link": file_link}], + "replies": [ + { + "comment_id": reply_comment_id, + "content": reply_comment_content, + "status": reply_comment_status, + "user_email": reply_comment_user_email, + "created_at": reply_comment_created_at, + 'attached_files': [{"id": file_id, "path": file_path, "link": file_link}], + 'reply_to_id': id_of_the_comment_replied_to, + } + ] + } + ] +} +""" + +users_metadata = {} +""" +{ + user_id: (user_id, user_email, user_nickname, user_note, user_last_login), + ..., +} +""" + +for i, recid in enumerate(recids_with_comments): + print("({}/{}) Processing comment for record {} -".format(i+1, len(recids_with_comments), recid)) + comments = get_comments_from_legacy(recid) + + if not comments: + print("No comments found for record {}. Skipping...".format(recid)) + continue + + # Check if the comments list have atleast 1 comment with 'ok' status or 'approved' status to avoid migrating records with spam comments that are already deleted + # For eg.: https://cds.cern.ch/record/1367848/comments?ln=en + if not any(comment['status'] in ['ok', 'ap'] for comment in comments): + print("No comments with atleast one 'ok'/'ap' status found for record {}. 
Skipping...".format(recid)) + continue + + print("Found {} comments for record {} ...".format(len(comments), recid)) + comments_metadata[recid] = [] + + comments_to_file_relations = get_comment_to_bibdoc_relations(recid) + comment_to_version_relations = {} + for relation in comments_to_file_relations: + comment_to_version_relations[relation['id_comment']] = relation['version'] # TODO: Not used for now, but can be used in the future to map the comment to the version + print("Found {} comments to file relations for record {} ...".format(len(comments_to_file_relations), recid)) + + for comment in comments: + users_metadata[comment['id_user']] = (comment['id_user'], comment['email'], comment['nickname'], comment['note'], comment['last_login']) + + # Flatten the reply comments + flattened_comments = flatten_replies(comments) + # Sanitize the comment metadata for RDM + for comment in flattened_comments: + comment_data = { + "comment_id": comment['id'], + "content": comment['body'].replace('\xc2\xa0', ' ').replace('\n', '').strip(), + "status": comment.get('status'), + "user_email": comment.get('email'), + "created_at": comment.get('created'), + "replies": [ + { + "comment_id": reply['id'], + "content": reply['body'].replace('\xc2\xa0', ' ').replace('\n', '').strip(), + "status": reply.get('status'), + "user_email": reply.get('email'), + "created_at": reply.get('created'), + 'reply_to_id': reply.get('reply_to_id'), + } + for reply in comment['replies'] + ] + } + comments_metadata[recid].append(comment_data) + print("({}/{}) Successfully processed comments for record {}!".format(i+1, len(recids_with_comments), recid)) + +with open('comments_metadata.json', 'w') as f: + json.dump(comments_metadata, f) + +with open('users_metadata.json', 'w') as f: + json.dump(users_metadata, f) +""" +This file will be read and then this script (with some tweaks) can be run to find out the missing users in the new system. 
+https://gitlab.cern.ch/cds-team/production_scripts/-/blob/master/cds-rdm/migration/dump_users.py?ref_type=heads + +TODO: Might not be needed as we migrated the whole users table. To be discussed with someone. +TODO: Might need to also fix the script since it returns missing for users migrated as inactive. +""" From a9b2e142b307527127c05ca67fefcc5f4d1c8460 Mon Sep 17 00:00:00 2001 From: Saksham Date: Wed, 4 Feb 2026 17:32:49 +0100 Subject: [PATCH 2/4] feat(comments): Add link in comment content for linked files --- cds_migrator_kit/rdm/cli.py | 5 +- cds_migrator_kit/rdm/comments/extract.py | 4 +- cds_migrator_kit/rdm/comments/load.py | 189 +++++++++++++++-------- cds_migrator_kit/rdm/comments/streams.py | 2 +- scripts/copy_comments_attached_files.py | 12 +- scripts/dump_comments_to_migrate.py | 185 +++++++++++++--------- 6 files changed, 258 insertions(+), 139 deletions(-) diff --git a/cds_migrator_kit/rdm/cli.py b/cds_migrator_kit/rdm/cli.py index dc295847..b0c5dd66 100644 --- a/cds_migrator_kit/rdm/cli.py +++ b/cds_migrator_kit/rdm/cli.py @@ -14,9 +14,10 @@ from flask import current_app from flask.cli import with_appcontext -from cds_migrator_kit.rdm.comments.runner import CommentsRunner from cds_migrator_kit.rdm.affiliations.runner import RecordAffiliationsRunner from cds_migrator_kit.rdm.affiliations.streams import AffiliationsStreamDefinition +from cds_migrator_kit.rdm.comments.runner import CommentsRunner +from cds_migrator_kit.rdm.comments.streams import CommentsStreamDefinition from cds_migrator_kit.rdm.records.streams import ( # UserStreamDefinition, RecordStreamDefinition, ) @@ -244,6 +245,7 @@ def dump(slug, title, filepath): with open(filepath, "w") as fp: yaml.safe_dump(streams, fp, default_flow_style=False, sort_keys=False) + @migration.group() def comments(): """Migration CLI for comments.""" @@ -270,6 +272,7 @@ def comments_run(filepath, dirpath, dry_run=False): """Migrate the comments for the records in `filepath`.""" log_dir = 
Path(current_app.config["CDS_MIGRATOR_KIT_LOGS_PATH"]) / "comments" runner = CommentsRunner( + stream_definition=CommentsStreamDefinition, filepath=filepath, dirpath=dirpath, log_dir=log_dir, diff --git a/cds_migrator_kit/rdm/comments/extract.py b/cds_migrator_kit/rdm/comments/extract.py index dc3c1b99..4ae5a24e 100644 --- a/cds_migrator_kit/rdm/comments/extract.py +++ b/cds_migrator_kit/rdm/comments/extract.py @@ -25,6 +25,8 @@ def run(self): """Run.""" with open(self.filepath, "r") as dump_file: data = json.load(dump_file) - with click.progressbar(data.items(), label="Processing comments") as metadata: + with click.progressbar( + data.items(), label="Processing comments" + ) as metadata: for recid, comments in metadata: yield (recid, comments) diff --git a/cds_migrator_kit/rdm/comments/load.py b/cds_migrator_kit/rdm/comments/load.py index db62d81e..91b993b7 100644 --- a/cds_migrator_kit/rdm/comments/load.py +++ b/cds_migrator_kit/rdm/comments/load.py @@ -7,25 +7,24 @@ """CDS-RDM migration load module.""" -from flask import current_app - -from invenio_rdm_migrator.load.base import Load +from datetime import datetime -from cds_migrator_kit.rdm.comments.log import CommentsLogger from cds_rdm.legacy.resolver import get_pid_by_legacy_recid +from flask import current_app, url_for from invenio_access.permissions import system_identity -from invenio_requests.proxies import current_requests_service +from invenio_accounts.models import User +from invenio_db import db +from invenio_rdm_migrator.load.base import Load from invenio_rdm_records.proxies import current_rdm_records_service from invenio_rdm_records.records.api import RDMParent -from invenio_requests.customizations.event_types import CommentEventType -from invenio_requests.proxies import current_events_service +from invenio_rdm_records.requests import CommunitySubmission +from invenio_requests.customizations.event_types import CommentEventType, LogEventType +from invenio_requests.proxies import 
current_events_service, current_requests_service +from invenio_requests.records.api import RequestEventFormat from invenio_requests.resolvers.registry import ResolverRegistry -from invenio_accounts.models import User -from invenio_db import db -from datetime import datetime -from invenio_requests.customizations.request_types import CommunitySubmission -from invenio_requests.customizations.event_types import LogEventType +from cds_migrator_kit.errors import ManualImportRequired +from cds_migrator_kit.rdm.comments.log import CommentsLogger logger = CommentsLogger.get_logger() @@ -38,14 +37,13 @@ class CDSCommentsLoad(Load): def __init__( self, - config, - less_than_date, + dirpath, dry_run=False, ): """Constructor.""" - self.config = config - self.less_than_date = less_than_date + self.dirpath = dirpath # TODO: To be used later to load the attached files self.dry_run = dry_run + self.all_record_versions = {} def get_oldest_record(self, parent_pid_value): latest_record = current_rdm_records_service.read_latest( @@ -55,61 +53,120 @@ def get_oldest_record(self, parent_pid_value): identity=system_identity, id_=latest_record["id"], ) - record_versions = {hit["versions"]["index"]: hit for hit in search_result} - oldest_version = min(record_versions.keys()) - return record_versions[oldest_version] - + self.all_record_versions = { + str(hit["versions"]["index"]): hit for hit in search_result + } + oldest_version = min( + int(version) for version in self.all_record_versions.keys() + ) + return self.all_record_versions[str(oldest_version)] def create_event(self, request, data, community, record, parent_comment_id=None): if not parent_comment_id: - print("Creating event for record: ", record['id'], "request: ", request.id, "comment ID: ", data.get("comment_id")) + logger.info( + "Creating event for record<{}> request<{}> comment<{}>".format( + record["id"], + request.id, + data.get("comment_id"), + ) + ) else: - print("Creating reply event for record: ", record['id'], "request: 
", request.id, "comment ID: ", data.get("comment_id"), "parent comment ID: ", parent_comment_id) - # TODO: Only add commment if the version id matches. To be finalized in discussion - # if data.get("version") != record['versions']['index']: - # return + logger.info( + "Creating reply event for record<{}> request<{}> comment<{}> parent_comment<{}>".format( + record["id"], + request.id, + data.get("comment_id"), + parent_comment_id, + ) + ) # Create comment event comment_payload = { "payload": { "content": data.get("content"), - "format": "html", + "format": RequestEventFormat.HTML.value, } } comment_status = data.get("status") + event_type = CommentEventType if comment_status == "da": - comment_payload["payload"].update({"content": "comment was deleted by the author.", "event_type": "comment_deleted"}) + comment_payload["payload"].update( + { + "content": "comment was deleted by the author.", + "event_type": "comment_deleted", + } + ) + event_type = LogEventType elif comment_status == "dm": - comment_payload["payload"].update({"content": "comment was deleted by the moderator.", "event_type": "comment_deleted"}) + comment_payload["payload"].update( + { + "content": "comment was deleted by the moderator.", + "event_type": "comment_deleted", + } + ) + event_type = LogEventType + + event = current_events_service.record_cls.create( + {}, request=request.model, request_id=str(request.id), type=event_type + ) - event = current_events_service.record_cls.create({}, request=request.model, request_id=str(request.id), type=CommentEventType) + if data.get("file_relation"): + file_relation = data.get("file_relation") + file_id = file_relation.get("file_id") + version = file_relation.get("version") + record_version = self.all_record_versions.get(str(version), None) + if record_version: + record_url = url_for( + "invenio_app_rdm_records.record_detail", + pid_value=record_version["id"], + preview_file=file_id, + ) + version_link = f"

See related record version {version}

" + comment_payload["payload"]["content"] = ( + version_link + "\n" + comment_payload["payload"]["content"] + ) if parent_comment_id: + logger.info( + "Found parent event<{}> for reply event<{}>. Setting parent_id.".format( + event.id, + parent_comment_id, + ) + ) # If it's a reply, 1. set parent comment id, 2. and if a nested reply, in the content add mentioned reply event's deep link event.parent_id = str(parent_comment_id) - mentioned_event_id = self.LEGACY_REPLY_LINK_MAP.get(data.get("reply_to_id"), None) + mentioned_event_id = self.LEGACY_REPLY_LINK_MAP.get( + data.get("reply_to_id"), None + ) if mentioned_event_id: + logger.info( + "Adding deep link to the content for the deeply nested reply event<{}>.".format( + mentioned_event_id, + event.id, + ) + ) deep_link = f"

Link to the reply

" - comment_payload["payload"]["content"] = deep_link + "\n" + comment_payload["payload"]["content"] + comment_payload["payload"]["content"] = ( + deep_link + "\n" + comment_payload["payload"]["content"] + ) # TODO: Add attached files to the event - for attached_file in data.get("attached_files", []): - print("TODO: Add attached files to the event: ", attached_file) - # if data.get("attached_files"): - # comment_payload["payload"]["files"] = data.get("attached_files") + # https://github.com/CERNDocumentServer/cds-migrator-kit/issues/381 event.update(comment_payload) user = User.query.filter_by(email=data.get("created_by")).one_or_none() - # TODO: It shouldn't be not found, if so, raise manual migration error? if user: - event.created_by = ResolverRegistry.resolve_entity_proxy({"user": str(user.id)}, raise_=True) + event.created_by = ResolverRegistry.resolve_entity_proxy( + {"user": str(user.id)}, raise_=True + ) else: print("User not found for email: ", data.get("created_by")) - event.created_by = ResolverRegistry.resolve_entity_proxy({"user": "system"}, raise_=True) - - event.model.created = data.get("created_at") # Not changing the updated at because of the context of migration + raise ManualImportRequired( + f"User not found for email: {data.get('created_by')}" + ) + event.model.created = data.get("created_at") event.model.version_id = 0 event.commit() @@ -118,23 +175,35 @@ def create_event(self, request, data, community, record, parent_comment_id=None) current_events_service.indexer.index(event) return event - def create_accepted_community_inclusion_request( self, record, community, creator_user_id, - comments=None, # TODO: Should an accepted request be created if there are no comments? + comments=None, ): + """Create an accepted community inclusion request.""" + if not comments: + logger.warning( + f"No comments found for record<{record['id']}>. Skipping request creation." 
+ ) + return None + # Resolve entities for references - creator_ref = ResolverRegistry.reference_entity({"user": str(creator_user_id)}, raise_=True) - receiver_ref = ResolverRegistry.reference_entity({"community": str(community.id)}, raise_=True) - topic_ref = ResolverRegistry.reference_entity({"record": record['id']}, raise_=True) + creator_ref = ResolverRegistry.reference_entity( + {"user": str(creator_user_id)}, raise_=True + ) + receiver_ref = ResolverRegistry.reference_entity( + {"community": str(community.id)}, raise_=True + ) + topic_ref = ResolverRegistry.reference_entity( + {"record": record["id"]}, raise_=True + ) request_item = current_requests_service.create( system_identity, data={ - "title": record['metadata']["title"], + "title": record["metadata"]["title"], }, request_type=CommunitySubmission, receiver=receiver_ref, @@ -144,35 +213,25 @@ def create_accepted_community_inclusion_request( ) request = request_item._record request.status = "accepted" - created_at = datetime.fromisoformat(record['created']) + created_at = datetime.fromisoformat(record["created"]) request.model.created = created_at - # request.model.updated = created_at # Not changing the updated to keep the context of migration request.commit() db.session.commit() current_requests_service.indexer.index(request) + logger.info( + f"Created accepted community submission request<{request.id}> for record<{record['id']}>." 
+ ) for comment_data in comments: comment_event = self.create_event(request, comment_data, community, record) for reply in comment_data.get("replies", []): - reply_event = self.create_event(request, reply, community, record, comment_event.id) + reply_event = self.create_event( + request, reply, community, record, comment_event.id + ) self.LEGACY_REPLY_LINK_MAP[reply.get("comment_id")] = reply_event.id - # Add accepted log action - event = LogEventType(payload=dict(event="accepted")) - _data = dict(payload=event.payload) - log_event = current_events_service.create( - system_identity, request.id, _data, event, uow=None - ) - # TODO: What should be the created for the log event, record's or last comment's? To be finalized in discussion - log_event._record.model.created = created_at - - log_event._record.commit() - db.session.commit() - - current_events_service.indexer.index(log_event._record) - return request def _process_legacy_comments_for_recid(self, recid, comments): @@ -183,7 +242,9 @@ def _process_legacy_comments_for_recid(self, recid, comments): parent = RDMParent.pid.resolve(parent_pid.pid_value) community = parent.communities.default record_owner_id = parent.access.owned_by.owner_id - request = self.create_accepted_community_inclusion_request(oldest_record, community, record_owner_id, comments) + request = self.create_accepted_community_inclusion_request( + oldest_record, community, record_owner_id, comments + ) return request def _load(self, entry): diff --git a/cds_migrator_kit/rdm/comments/streams.py b/cds_migrator_kit/rdm/comments/streams.py index e31678b3..242ff4f5 100644 --- a/cds_migrator_kit/rdm/comments/streams.py +++ b/cds_migrator_kit/rdm/comments/streams.py @@ -7,9 +7,9 @@ """CDS-Migrator-Kit comments streams module.""" from invenio_rdm_migrator.streams import StreamDefinition +from invenio_rdm_migrator.transform import IdentityTransform from cds_migrator_kit.rdm.comments.extract import LegacyCommentsExtract -from invenio_rdm_migrator.transform 
import IdentityTransform from .load import CDSCommentsLoad diff --git a/scripts/copy_comments_attached_files.py b/scripts/copy_comments_attached_files.py index 0f60c9a7..65c21d55 100644 --- a/scripts/copy_comments_attached_files.py +++ b/scripts/copy_comments_attached_files.py @@ -15,7 +15,9 @@ environment = "sandbox" source_prefix = "/opt/cdsweb/var/data/comments" -destination_prefix = "/eos/media/cds/cds-rdm/{0}/migration/{1}/comments".format(environment, collection) +destination_prefix = "/eos/media/cds/cds-rdm/{0}/migration/{1}/comments".format( + environment, collection +) """ collection_name/ |-- comments/ @@ -28,15 +30,19 @@ (We keep the comment_id (or reply_comment_id) folder to avoid confusion with the files folder and in case different comments contain the same file name) """ + def copy_comments_attached_files(comments_metadata): for recid in comments_metadata.keys(): # Copy the whole /comments/{recid} folder to the destination folder - shutil.copytree(os.path.join(source_prefix, recid), os.path.join(destination_prefix, recid)) + shutil.copytree( + os.path.join(source_prefix, recid), os.path.join(destination_prefix, recid) + ) print("Copied {} comments directory to {}".format(recid, destination_prefix)) + # Load the comments metadata to get the recids with comments and the comment IDs comments_metadata_json_file = os.path.join(destination_prefix, "comments_metadata.json") -with open(comments_metadata_json_file, 'r') as f: +with open(comments_metadata_json_file, "r") as f: comments_metadata = json.load(f) copy_comments_attached_files(comments_metadata) diff --git a/scripts/dump_comments_to_migrate.py b/scripts/dump_comments_to_migrate.py index 9e4eb63f..6b5f1aed 100644 --- a/scripts/dump_comments_to_migrate.py +++ b/scripts/dump_comments_to_migrate.py @@ -10,35 +10,42 @@ 5. 
Save comments metadata to json file """ +import json + from invenio.dbquery import run_sql from invenio.search_engine import search_pattern from invenio.webcomment_dblayer import get_comment_to_bibdoc_relations -import json -collection_queries=[ - '980__a:CNLISSUE -980:DELETED -980:HIDDEN -980__a:DUMMY', - '980__a:CNLARTICLE -980:DELETED -980:HIDDEN -980__a:DUMMY', - '710__5:CN OR 710__5:DD OR 710__5:IT OR 710__5:AS OR 710__5:STS 980:ARTICLE -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL', - '710__5:CN OR 710__5:DD OR 710__5:IT OR 710__5:AS OR 710__5:STS 980:PREPRINT -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL', - '980:INTNOTEITPUBL or 980:INTNOTEASPUBL OR 980:INTNOTEMIPUBL AND 690C:INTNOTE -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL', - '710__5:IT 980:PUBLARDA -980:DELETED -980:HIDDEN -980__a:DUMMY', - '980:REPORT AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY', - '980:PERI AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY', - '980:BROCHURE AND 690C:CERNITBROCHURE -980:DELETED -980:HIDDEN -980__a:DUMMY', - '980:POSTER AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY', - '980:ITCERNTALK -980:DELETED -980:HIDDEN -980__a:DUMMY', +collection_queries = [ + "980__a:CNLISSUE -980:DELETED -980:HIDDEN -980__a:DUMMY", + "980__a:CNLARTICLE -980:DELETED -980:HIDDEN -980__a:DUMMY", + "710__5:CN OR 710__5:DD OR 710__5:IT OR 710__5:AS OR 710__5:STS 980:ARTICLE -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL", + "710__5:CN OR 710__5:DD OR 710__5:IT OR 710__5:AS OR 710__5:STS 980:PREPRINT -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL", + "980:INTNOTEITPUBL or 980:INTNOTEASPUBL OR 980:INTNOTEMIPUBL AND 690C:INTNOTE -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL", + "710__5:IT 980:PUBLARDA -980:DELETED -980:HIDDEN -980__a:DUMMY", + "980:REPORT AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY", + "980:PERI AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY", + "980:BROCHURE AND 
690C:CERNITBROCHURE -980:DELETED -980:HIDDEN -980__a:DUMMY", + "980:POSTER AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY", + "980:ITCERNTALK -980:DELETED -980:HIDDEN -980__a:DUMMY", '980:PERI AND 650:"Computing and Computers" -980:DELETED -980:HIDDEN -980__a:DUMMY', - '980:ITUDSPUBSOURCEARCHIVE -980:DELETED -980:HIDDEN -980__a:DUMMY', - '(980__:THESIS OR 980__:Thesis OR 980__:thesis) -980__:DUMMY -980__c:HIDDEN', + "980:ITUDSPUBSOURCEARCHIVE -980:DELETED -980:HIDDEN -980__a:DUMMY", ] """ For thesis: "(980__:THESIS OR 980__:Thesis OR 980__:thesis) -980__:DUMMY -980__c:HIDDEN" """ -recids_list= [] -for collection_query in collection_queries: +recids_list = [] +print("Querying already migrated records...") +for i, collection_query in enumerate(collection_queries): recs = search_pattern(p=collection_query) + print( + "({} / {}) Found {} records for query `{}`".format( + i + 1, len(collection_queries), len(recs), collection_query + ) + ) recids_list.extend(recs) +print("Filtering records with comments...") records_with_comments = run_sql( "SELECT id_bibrec " "FROM cmtRECORDCOMMENT " @@ -48,8 +55,14 @@ ) recids_with_comments = [record[0] for record in records_with_comments] +print( + "Found {} records with comments out of {} migrated records".format( + len(recids_with_comments), len(recids_list) + ) +) + -def get_comments_from_legacy(recid, display_order='cmt.date_creation ASC'): +def get_comments_from_legacy(recid, display_order="cmt.date_creation ASC"): """ # from invenio.webcomment import query_retrieve_comments_or_remarks """ @@ -72,12 +85,16 @@ def get_comments_from_legacy(recid, display_order='cmt.date_creation ASC'): WHERE cmt.id_bibrec=%%s ORDER BY %(display_order)s """ % { - 'display_order': display_order, - 'reply_to_column': recid > 0 and 'cmt.in_reply_to_id_cmtRECORDCOMMENT' or 'cmt.in_reply_to_id_bskRECORDCOMMENT'} + "display_order": display_order, + "reply_to_column": recid > 0 + and "cmt.in_reply_to_id_cmtRECORDCOMMENT" + or 
"cmt.in_reply_to_id_bskRECORDCOMMENT", + } params = (recid,) res = run_sql(query, params, with_dict=True) return res + # Function to flatten arbitrarily nested comment replies into a 1-level replies list def flatten_replies(comments_list): """ @@ -87,59 +104,60 @@ def flatten_replies(comments_list): Each comment dict may have its own 'replies' key (list). """ # Build maps for quick lookup - comments_by_id = {c['id']: dict(c, replies=[]) for c in comments_list} + comments_by_id = {c["id"]: dict(c, replies=[]) for c in comments_list} top_level_comments = [] for c in comments_list: - parent = c.get('reply_to_id') + parent = c.get("reply_to_id") if parent is None or parent not in comments_by_id: # This is a top-level comment - top_level_comments.append(comments_by_id[c['id']]) + top_level_comments.append(comments_by_id[c["id"]]) else: # This is a reply; add to parent's replies - comments_by_id[parent]['replies'].append(comments_by_id[c['id']]) + comments_by_id[parent]["replies"].append(comments_by_id[c["id"]]) def collect_all_replies(comment): - """Recursively flattens all indirect replies under this comment. 
""" + """Recursively flattens all indirect replies under this comment.""" flat = [] - queue = list(comment['replies']) + queue = list(comment["replies"]) while queue: reply = queue.pop(0) flat.append(reply) # Add all replies to the end of the queue - queue.extend(reply['replies']) + queue.extend(reply["replies"]) # Remove their nested replies again for full flatten for r in flat: - r['replies'] = [] + r["replies"] = [] # Sort the replies by creation date - flat.sort(key=lambda x: x['created']) - comment['replies'] = flat + flat.sort(key=lambda x: x["created"]) + comment["replies"] = flat for comment in top_level_comments: collect_all_replies(comment) return top_level_comments + comments_metadata = {} """ { - recid: [ # TODO: Can be nested with versions if multiple versions are present (to be finalized in discussion) + recid: [ { - "comment_id": comment_id, - "content": content, - "status": status, - "user_email": user_email, - "created_at": created_at, - 'attached_files': [{"id": file_id, "path": file_path, "link": file_link}], - "replies": [ + comment_id: comment_id, + content: content, + status: status, + user_email: user_email, + created_at: created_at, + file_relation: {file_id: file_id, version: version}, + replies: [ { - "comment_id": reply_comment_id, - "content": reply_comment_content, - "status": reply_comment_status, - "user_email": reply_comment_user_email, - "created_at": reply_comment_created_at, - 'attached_files': [{"id": file_id, "path": file_path, "link": file_link}], - 'reply_to_id': id_of_the_comment_replied_to, + comment_id: reply_comment_id, + content: reply_comment_content, + status: reply_comment_status, + user_email: reply_comment_user_email, + created_at: reply_comment_created_at, + file_relation: {file_id: file_id, version: version}, + reply_to_id: id_of_the_comment_replied_to, } ] } @@ -156,60 +174,89 @@ def collect_all_replies(comment): """ for i, recid in enumerate(recids_with_comments): - print("({}/{}) Processing comment for record {} 
-".format(i+1, len(recids_with_comments), recid)) + print( + "({}/{}) Processing comments for record<{}>".format( + i + 1, len(recids_with_comments), recid + ) + ) comments = get_comments_from_legacy(recid) if not comments: - print("No comments found for record {}. Skipping...".format(recid)) + print("No comments found for record<{}>. Skipping...".format(recid)) continue # Check if the comments list have atleast 1 comment with 'ok' status or 'approved' status to avoid migrating records with spam comments that are already deleted # For eg.: https://cds.cern.ch/record/1367848/comments?ln=en - if not any(comment['status'] in ['ok', 'ap'] for comment in comments): - print("No comments with atleast one 'ok'/'ap' status found for record {}. Skipping...".format(recid)) + if not any(comment["status"] in ["ok", "ap"] for comment in comments): + print( + "No comments with atleast one 'ok'/'ap' status found for record<{}>. Skipping...".format( + recid + ) + ) continue - print("Found {} comments for record {} ...".format(len(comments), recid)) + print("Found `{}` comment(s) for record<{}>".format(len(comments), recid)) comments_metadata[recid] = [] comments_to_file_relations = get_comment_to_bibdoc_relations(recid) comment_to_version_relations = {} for relation in comments_to_file_relations: - comment_to_version_relations[relation['id_comment']] = relation['version'] # TODO: Not used for now, but can be used in the future to map the comment to the version - print("Found {} comments to file relations for record {} ...".format(len(comments_to_file_relations), recid)) + comment_to_version_relations[relation["id_comment"]] = { + "file_id": relation["id_bibdoc"], + "version": relation["version"], + } + print( + "Found {} comments to file relations for record {} ...".format( + len(comments_to_file_relations), recid + ) + ) for comment in comments: - users_metadata[comment['id_user']] = (comment['id_user'], comment['email'], comment['nickname'], comment['note'], 
comment['last_login']) + users_metadata[comment["id_user"]] = ( + comment["id_user"], + comment["email"], + comment["nickname"], + comment["note"], + comment["last_login"], + ) # Flatten the reply comments flattened_comments = flatten_replies(comments) # Sanitize the comment metadata for RDM for comment in flattened_comments: comment_data = { - "comment_id": comment['id'], - "content": comment['body'].replace('\xc2\xa0', ' ').replace('\n', '').strip(), - "status": comment.get('status'), - "user_email": comment.get('email'), - "created_at": comment.get('created'), + "comment_id": comment["id"], + "content": comment["body"] + .replace("\xc2\xa0", " ") + .replace("\n", "") + .strip(), + "status": comment.get("status"), + "user_email": comment.get("email"), + "created_at": comment.get("created"), + "file_relation": comment_to_version_relations.get(comment["id"], {}), "replies": [ { - "comment_id": reply['id'], - "content": reply['body'].replace('\xc2\xa0', ' ').replace('\n', '').strip(), - "status": reply.get('status'), - "user_email": reply.get('email'), - "created_at": reply.get('created'), - 'reply_to_id': reply.get('reply_to_id'), + "comment_id": reply["id"], + "content": reply["body"] + .replace("\xc2\xa0", " ") + .replace("\n", "") + .strip(), + "status": reply.get("status"), + "user_email": reply.get("email"), + "created_at": reply.get("created"), + "reply_to_id": reply.get("reply_to_id"), + "file_relation": comment_to_version_relations.get(reply["id"], {}), } - for reply in comment['replies'] - ] + for reply in comment["replies"] + ], } comments_metadata[recid].append(comment_data) - print("({}/{}) Successfully processed comments for record {}!".format(i+1, len(recids_with_comments), recid)) + print("Successfully processed comment(s) for record<{}>!!!".format(recid)) -with open('comments_metadata.json', 'w') as f: +with open("comments_metadata.json", "w") as f: json.dump(comments_metadata, f) -with open('users_metadata.json', 'w') as f: +with 
open("users_metadata.json", "w") as f: json.dump(users_metadata, f) """ This file will be read and then this script (with some tweaks) can be run to find out the missing users in the new system. From 8c0a32044c5fb09204430537d1033ee441a82895 Mon Sep 17 00:00:00 2001 From: Saksham Date: Wed, 11 Feb 2026 10:25:54 +0100 Subject: [PATCH 3/4] rdm: comments: Add commenterRunner and UnitOfWork to CommentsRunner.load --- cds_migrator_kit/rdm/cli.py | 36 ++++++- cds_migrator_kit/rdm/comments/extract.py | 20 ++++ cds_migrator_kit/rdm/comments/load.py | 119 +++++++++++++---------- cds_migrator_kit/rdm/comments/runner.py | 34 ++++++- cds_migrator_kit/rdm/comments/streams.py | 14 ++- scripts/dump_comments_to_migrate.py | 13 +-- 6 files changed, 174 insertions(+), 62 deletions(-) diff --git a/cds_migrator_kit/rdm/cli.py b/cds_migrator_kit/rdm/cli.py index b0c5dd66..03f95f36 100644 --- a/cds_migrator_kit/rdm/cli.py +++ b/cds_migrator_kit/rdm/cli.py @@ -16,8 +16,11 @@ from cds_migrator_kit.rdm.affiliations.runner import RecordAffiliationsRunner from cds_migrator_kit.rdm.affiliations.streams import AffiliationsStreamDefinition -from cds_migrator_kit.rdm.comments.runner import CommentsRunner -from cds_migrator_kit.rdm.comments.streams import CommentsStreamDefinition +from cds_migrator_kit.rdm.comments.runner import CommenterRunner, CommentsRunner +from cds_migrator_kit.rdm.comments.streams import ( + CommenterStreamDefinition, + CommentsStreamDefinition, +) from cds_migrator_kit.rdm.records.streams import ( # UserStreamDefinition, RecordStreamDefinition, ) @@ -279,3 +282,32 @@ def comments_run(filepath, dirpath, dry_run=False): dry_run=dry_run, ) runner.run() + + +@comments.command() +@click.option( + "--dry-run", + is_flag=True, +) +@click.option( + "--filepath", + help="Path to the users metadata json file.", + required=True, +) +@click.option( + "--missing-users-filepath", + help="Path to the people.csv file containing person_id for missing users.", + default=None, +) 
+@with_appcontext +def commenters_run(filepath, missing_users_filepath, dry_run=False): + """Pre-create commenters accounts.""" + log_dir = Path(current_app.config["CDS_MIGRATOR_KIT_LOGS_PATH"]) / "comments" + runner = CommenterRunner( + stream_definition=CommenterStreamDefinition, + filepath=filepath, + missing_users_filepath=missing_users_filepath, + log_dir=log_dir, + dry_run=dry_run, + ) + runner.run() diff --git a/cds_migrator_kit/rdm/comments/extract.py b/cds_migrator_kit/rdm/comments/extract.py index 4ae5a24e..5a7c31d1 100644 --- a/cds_migrator_kit/rdm/comments/extract.py +++ b/cds_migrator_kit/rdm/comments/extract.py @@ -30,3 +30,23 @@ def run(self): ) as metadata: for recid, comments in metadata: yield (recid, comments) + + +class LegacyCommentersExtract(Extract): + """LegacyCommentersExtract.""" + + def __init__(self, filepath, **kwargs): + """Constructor.""" + self.filepath = Path(filepath).absolute() + + def run(self): + """Run.""" + with open(self.filepath, "r") as dump_file: + data = json.load(dump_file) + with click.progressbar( + data.items(), label="Processing commenters" + ) as metadata: + for user_data in metadata: + # user_data is a list (from JSON): [user_id, user_email, user_nickname, user_note, user_last_login] + email = user_data[1] + yield {"submitter": email} diff --git a/cds_migrator_kit/rdm/comments/load.py b/cds_migrator_kit/rdm/comments/load.py index 91b993b7..f83f78af 100644 --- a/cds_migrator_kit/rdm/comments/load.py +++ b/cds_migrator_kit/rdm/comments/load.py @@ -7,17 +7,19 @@ """CDS-RDM migration load module.""" +import os from datetime import datetime from cds_rdm.legacy.resolver import get_pid_by_legacy_recid from flask import current_app, url_for from invenio_access.permissions import system_identity from invenio_accounts.models import User -from invenio_db import db +from invenio_db.uow import UnitOfWork from invenio_rdm_migrator.load.base import Load from invenio_rdm_records.proxies import current_rdm_records_service from 
invenio_rdm_records.records.api import RDMParent from invenio_rdm_records.requests import CommunitySubmission +from invenio_records_resources.services.uow import RecordCommitOp from invenio_requests.customizations.event_types import CommentEventType, LogEventType from invenio_requests.proxies import current_events_service, current_requests_service from invenio_requests.records.api import RequestEventFormat @@ -41,10 +43,17 @@ def __init__( dry_run=False, ): """Constructor.""" - self.dirpath = dirpath # TODO: To be used later to load the attached files + self.dirpath = dirpath # The directory path where the attached files are stored self.dry_run = dry_run self.all_record_versions = {} + def get_attached_files_for_comment(self, recid, comment_id): + """Get the attached files for the comment.""" + attached_files_directory = os.path.join(self.dirpath, recid, comment_id) + if os.path.exists(attached_files_directory): + return os.listdir(attached_files_directory) + return [] + def get_oldest_record(self, parent_pid_value): latest_record = current_rdm_records_service.read_latest( identity=system_identity, id_=parent_pid_value @@ -61,24 +70,17 @@ def get_oldest_record(self, parent_pid_value): ) return self.all_record_versions[str(oldest_version)] - def create_event(self, request, data, community, record, parent_comment_id=None): - if not parent_comment_id: - logger.info( - "Creating event for record<{}> request<{}> comment<{}>".format( - record["id"], - request.id, - data.get("comment_id"), - ) - ) - else: - logger.info( - "Creating reply event for record<{}> request<{}> comment<{}> parent_comment<{}>".format( - record["id"], - request.id, - data.get("comment_id"), - parent_comment_id, - ) + def create_event( + self, request, data, community, record, uow, parent_comment_id=None + ): + logger.info( + "Creating event for record ID<{}> request ID<{}> comment ID<{}> parent_comment ID<{}>".format( + record["id"], + request.id, + data.get("comment_id"), + "self" if not 
parent_comment_id else parent_comment_id, ) + ) # Create comment event comment_payload = { @@ -111,6 +113,7 @@ def create_event(self, request, data, community, record, parent_comment_id=None) {}, request=request.model, request_id=str(request.id), type=event_type ) + # If the comment is attached to a record file, add a link to the file in the content if data.get("file_relation"): file_relation = data.get("file_relation") file_id = file_relation.get("file_id") @@ -153,6 +156,14 @@ def create_event(self, request, data, community, record, parent_comment_id=None) # TODO: Add attached files to the event # https://github.com/CERNDocumentServer/cds-migrator-kit/issues/381 + # For now, if attached files are found, raise ManualImportRequired error + attached_files = self.get_attached_files_for_comment( + record["id"], data.get("comment_id") + ) + if attached_files: + raise ManualImportRequired( + f"Attached files found for comment ID<{data.get('comment_id')}>." + ) event.update(comment_payload) @@ -162,17 +173,16 @@ def create_event(self, request, data, community, record, parent_comment_id=None) {"user": str(user.id)}, raise_=True ) else: - print("User not found for email: ", data.get("created_by")) raise ManualImportRequired( f"User not found for email: {data.get('created_by')}" ) event.model.created = data.get("created_at") event.model.version_id = 0 - event.commit() - db.session.commit() + # THere aren't any components to run for creating the event + # Since we are not using the services to create the event, we need to register the commit operation manually for indexing + uow.register(RecordCommitOp(event, indexer=current_events_service.indexer)) - current_events_service.indexer.index(event) return event def create_accepted_community_inclusion_request( @@ -200,37 +210,44 @@ def create_accepted_community_inclusion_request( {"record": record["id"]}, raise_=True ) - request_item = current_requests_service.create( - system_identity, - data={ - "title": 
record["metadata"]["title"], - }, - request_type=CommunitySubmission, - receiver=receiver_ref, - creator=creator_ref, - topic=topic_ref, - uow=None, - ) - request = request_item._record - request.status = "accepted" - created_at = datetime.fromisoformat(record["created"]) - request.model.created = created_at - - request.commit() - db.session.commit() + with UnitOfWork() as uow: + request_item = current_requests_service.create( + system_identity, + data={ + "title": record["metadata"]["title"], + }, + request_type=CommunitySubmission, + receiver=receiver_ref, + creator=creator_ref, + topic=topic_ref, + uow=uow, + ) + request = request_item._record + request.status = "accepted" + created_at = datetime.fromisoformat(record["created"]) + request.model.created = created_at - current_requests_service.indexer.index(request) - logger.info( - f"Created accepted community submission request<{request.id}> for record<{record['id']}>." - ) + logger.info( + f"Created accepted community submission request<{request.id}> for record<{record['id']}>." 
+ ) - for comment_data in comments: - comment_event = self.create_event(request, comment_data, community, record) - for reply in comment_data.get("replies", []): - reply_event = self.create_event( - request, reply, community, record, comment_event.id + for comment_data in comments: + comment_event = self.create_event( + request, comment_data, community, record, uow ) - self.LEGACY_REPLY_LINK_MAP[reply.get("comment_id")] = reply_event.id + for reply in comment_data.get("replies", []): + reply_event = self.create_event( + request, + reply, + community, + record, + uow, + parent_comment_id=comment_event.id, + ) + self.LEGACY_REPLY_LINK_MAP[reply.get("comment_id")] = reply_event.id + + # Commit at the end to rollback if any error occurs not only for the request but also for the comments + uow.commit() return request diff --git a/cds_migrator_kit/rdm/comments/runner.py b/cds_migrator_kit/rdm/comments/runner.py index b88bf230..6f746c2f 100644 --- a/cds_migrator_kit/rdm/comments/runner.py +++ b/cds_migrator_kit/rdm/comments/runner.py @@ -12,12 +12,13 @@ from invenio_rdm_migrator.streams import Stream from cds_migrator_kit.rdm.comments.log import CommentsLogger +from cds_migrator_kit.rdm.users.api import CDSMigrationUserAPI class CommentsRunner: """ETL streams runner.""" - def __init__(self, stream_definition, filepath, log_dir, dry_run): + def __init__(self, stream_definition, filepath, dirpath, log_dir, dry_run): """Constructor.""" self.log_dir = Path(log_dir) self.log_dir.mkdir(parents=True, exist_ok=True) @@ -28,7 +29,7 @@ def __init__(self, stream_definition, filepath, log_dir, dry_run): stream_definition.name, extract=stream_definition.extract_cls(filepath), transform=stream_definition.transform_cls(), - load=stream_definition.load_cls(dry_run=dry_run), + load=stream_definition.load_cls(dirpath=dirpath, dry_run=dry_run), ) def run(self): @@ -39,3 +40,32 @@ def run(self): CommentsLogger.get_logger().exception( f"Stream {self.stream.name} failed.", exc_info=1 ) + + 
+class CommenterRunner: + """ETL streams runner dedicated to pre-create commenters accounts.""" + + def __init__( + self, stream_definition, filepath, missing_users_filepath, log_dir, dry_run + ): + """Constructor.""" + self.log_dir = Path(log_dir) + self.log_dir.mkdir(parents=True, exist_ok=True) + + CommentsLogger.initialize(self.log_dir) + + self.stream = Stream( + stream_definition.name, + extract=stream_definition.extract_cls(filepath), + transform=stream_definition.transform_cls(), + load=stream_definition.load_cls( + dry_run=dry_run, + missing_users_filepath=missing_users_filepath, + logger=CommentsLogger.get_logger(), + user_api_cls=CDSMigrationUserAPI, + ), + ) + + def run(self): + """Run commenters ETL stream.""" + self.stream.run() diff --git a/cds_migrator_kit/rdm/comments/streams.py b/cds_migrator_kit/rdm/comments/streams.py index 242ff4f5..4aba4dc7 100644 --- a/cds_migrator_kit/rdm/comments/streams.py +++ b/cds_migrator_kit/rdm/comments/streams.py @@ -9,7 +9,11 @@ from invenio_rdm_migrator.streams import StreamDefinition from invenio_rdm_migrator.transform import IdentityTransform -from cds_migrator_kit.rdm.comments.extract import LegacyCommentsExtract +from cds_migrator_kit.rdm.comments.extract import ( + LegacyCommentersExtract, + LegacyCommentsExtract, +) +from cds_migrator_kit.users.load import CDSSubmitterLoad from .load import CDSCommentsLoad @@ -20,3 +24,11 @@ load_cls=CDSCommentsLoad, ) """ETL stream for CDS to RDM comments.""" + +CommenterStreamDefinition = StreamDefinition( + name="commenters", + extract_cls=LegacyCommentersExtract, + transform_cls=IdentityTransform, + load_cls=CDSSubmitterLoad, +) +"""ETL stream for CDS to RDM commenters.""" diff --git a/scripts/dump_comments_to_migrate.py b/scripts/dump_comments_to_migrate.py index 6b5f1aed..92ca1dca 100644 --- a/scripts/dump_comments_to_migrate.py +++ b/scripts/dump_comments_to_migrate.py @@ -32,7 +32,7 @@ "980:ITUDSPUBSOURCEARCHIVE -980:DELETED -980:HIDDEN -980__a:DUMMY", ] """ -For 
thesis: "(980__:THESIS OR 980__:Thesis OR 980__:thesis) -980__:DUMMY -980__c:HIDDEN" +For thesis, change the query and re-run this script: "(980__:THESIS OR 980__:Thesis OR 980__:thesis) -980__:DUMMY -980__c:HIDDEN" """ recids_list = [] print("Querying already migrated records...") @@ -198,6 +198,8 @@ def collect_all_replies(comment): print("Found `{}` comment(s) for record<{}>".format(len(comments), recid)) comments_metadata[recid] = [] + # `get_comment_to_bibdoc_relations` is used to find if comments are attached to the record's files (and the version of the files) + # This is not the same as the files attached to the comments comments_to_file_relations = get_comment_to_bibdoc_relations(recid) comment_to_version_relations = {} for relation in comments_to_file_relations: @@ -255,13 +257,12 @@ def collect_all_replies(comment): with open("comments_metadata.json", "w") as f: json.dump(comments_metadata, f) +""" +This file will be read and run by the CommentsRunner to migrate the comments. +""" with open("users_metadata.json", "w") as f: json.dump(users_metadata, f) """ -This file will be read and then this script (with some tweaks) can be run to find out the missing users in the new system. -https://gitlab.cern.ch/cds-team/production_scripts/-/blob/master/cds-rdm/migration/dump_users.py?ref_type=heads - -TODO: Might not be needed as we migrated the whole users table. To be discussed with someone. -TODO: Might need to also fix the script since it returns missing for users migrated as inactive. +This file will be read and run by the CommenterRunner to pre-create the commenters accounts. 
""" From f420de220fdca656bb5989c79a70b6eaa4d1b8dc Mon Sep 17 00:00:00 2001 From: Saksham Date: Thu, 12 Feb 2026 17:17:36 +0100 Subject: [PATCH 4/4] tests: Add tests for comments migration --- cds_migrator_kit/rdm/cli.py | 4 +- cds_migrator_kit/rdm/comments/extract.py | 13 +- cds_migrator_kit/rdm/comments/load.py | 37 ++- cds_migrator_kit/rdm/comments/runner.py | 4 +- scripts/dump_comments_to_migrate.py | 110 +++++++-- .../data/comments/comments_metadata.json | 23 ++ tests/cds-rdm/test_comments_migration.py | 227 ++++++++++++++++++ 7 files changed, 372 insertions(+), 46 deletions(-) create mode 100644 tests/cds-rdm/data/comments/comments_metadata.json create mode 100644 tests/cds-rdm/test_comments_migration.py diff --git a/cds_migrator_kit/rdm/cli.py b/cds_migrator_kit/rdm/cli.py index 03f95f36..5342850f 100644 --- a/cds_migrator_kit/rdm/cli.py +++ b/cds_migrator_kit/rdm/cli.py @@ -300,13 +300,13 @@ def comments_run(filepath, dirpath, dry_run=False): default=None, ) @with_appcontext -def commenters_run(filepath, missing_users_filepath, dry_run=False): +def commenters_run(filepath, users_dir_path, dry_run=False): """Pre-create commenters accounts.""" log_dir = Path(current_app.config["CDS_MIGRATOR_KIT_LOGS_PATH"]) / "comments" runner = CommenterRunner( stream_definition=CommenterStreamDefinition, filepath=filepath, - missing_users_filepath=missing_users_filepath, + missing_users_dir=users_dir_path, log_dir=log_dir, dry_run=dry_run, ) diff --git a/cds_migrator_kit/rdm/comments/extract.py b/cds_migrator_kit/rdm/comments/extract.py index 5a7c31d1..9289ad86 100644 --- a/cds_migrator_kit/rdm/comments/extract.py +++ b/cds_migrator_kit/rdm/comments/extract.py @@ -43,10 +43,11 @@ def run(self): """Run.""" with open(self.filepath, "r") as dump_file: data = json.load(dump_file) - with click.progressbar( - data.items(), label="Processing commenters" - ) as metadata: + with click.progressbar(data) as metadata: for user_data in metadata: - # user_data is a list (from JSON): 
[user_id, user_email, user_nickname, user_note, user_last_login] - email = user_data[1] - yield {"submitter": email} + click.secho( + f"Processing commenters: {user_data['email']}", + fg="green", + bold=True, + ) + yield {"submitter": user_data["email"]} diff --git a/cds_migrator_kit/rdm/comments/load.py b/cds_migrator_kit/rdm/comments/load.py index f83f78af..d131c173 100644 --- a/cds_migrator_kit/rdm/comments/load.py +++ b/cds_migrator_kit/rdm/comments/load.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2024 CERN. +# Copyright (C) 2026 CERN. # # CDS-RDM is free software; you can redistribute it and/or modify it under # the terms of the MIT License; see LICENSE file for more details. @@ -8,8 +8,8 @@ """CDS-RDM migration load module.""" import os -from datetime import datetime +import arrow from cds_rdm.legacy.resolver import get_pid_by_legacy_recid from flask import current_app, url_for from invenio_access.permissions import system_identity @@ -49,7 +49,7 @@ def __init__( def get_attached_files_for_comment(self, recid, comment_id): """Get the attached files for the comment.""" - attached_files_directory = os.path.join(self.dirpath, recid, comment_id) + attached_files_directory = os.path.join(self.dirpath, str(recid), str(comment_id)) if os.path.exists(attached_files_directory): return os.listdir(attached_files_directory) return [] @@ -59,16 +59,12 @@ def get_oldest_record(self, parent_pid_value): identity=system_identity, id_=parent_pid_value ) search_result = current_rdm_records_service.scan_versions( - identity=system_identity, - id_=latest_record["id"], + system_identity, latest_record["id"], ) - self.all_record_versions = { - str(hit["versions"]["index"]): hit for hit in search_result - } - oldest_version = min( - int(version) for version in self.all_record_versions.keys() - ) - return self.all_record_versions[str(oldest_version)] + for hit in search_result.hits: + self.all_record_versions[hit["versions"]["index"]] = hit + oldest_version_index = 
min(self.all_record_versions.keys()) + return self.all_record_versions[oldest_version_index] def create_event( self, request, data, community, record, uow, parent_comment_id=None @@ -166,33 +162,32 @@ def create_event( ) event.update(comment_payload) - - user = User.query.filter_by(email=data.get("created_by")).one_or_none() + user = User.query.filter_by(email=data.get("user_email")).one_or_none() if user: event.created_by = ResolverRegistry.resolve_entity_proxy( {"user": str(user.id)}, raise_=True ) else: raise ManualImportRequired( - f"User not found for email: {data.get('created_by')}" + f"User not found for email: {data.get('user_email')}" ) - event.model.created = data.get("created_at") + created_at = arrow.get(data.get("created_at")).datetime.replace(tzinfo=None) + event.model.created = created_at event.model.version_id = 0 - # THere aren't any components to run for creating the event # Since we are not using the services to create the event, we need to register the commit operation manually for indexing uow.register(RecordCommitOp(event, indexer=current_events_service.indexer)) return event - def create_accepted_community_inclusion_request( + def create_accepted_community_submission_request( self, record, community, creator_user_id, comments=None, ): - """Create an accepted community inclusion request.""" + """Create an accepted community submission request.""" if not comments: logger.warning( f"No comments found for record<{record['id']}>. Skipping request creation." 
@@ -224,7 +219,7 @@ def create_accepted_community_inclusion_request( ) request = request_item._record request.status = "accepted" - created_at = datetime.fromisoformat(record["created"]) + created_at = arrow.get(record["created"]).datetime.replace(tzinfo=None) request.model.created = created_at logger.info( @@ -259,7 +254,7 @@ def _process_legacy_comments_for_recid(self, recid, comments): parent = RDMParent.pid.resolve(parent_pid.pid_value) community = parent.communities.default record_owner_id = parent.access.owned_by.owner_id - request = self.create_accepted_community_inclusion_request( + request = self.create_accepted_community_submission_request( oldest_record, community, record_owner_id, comments ) return request diff --git a/cds_migrator_kit/rdm/comments/runner.py b/cds_migrator_kit/rdm/comments/runner.py index 6f746c2f..b6548dc2 100644 --- a/cds_migrator_kit/rdm/comments/runner.py +++ b/cds_migrator_kit/rdm/comments/runner.py @@ -46,7 +46,7 @@ class CommenterRunner: """ETL streams runner dedicated to pre-create commenters accounts.""" def __init__( - self, stream_definition, filepath, missing_users_filepath, log_dir, dry_run + self, stream_definition, filepath, missing_users_dir, log_dir, dry_run ): """Constructor.""" self.log_dir = Path(log_dir) @@ -60,7 +60,7 @@ def __init__( transform=stream_definition.transform_cls(), load=stream_definition.load_cls( dry_run=dry_run, - missing_users_filepath=missing_users_filepath, + missing_users_dir=missing_users_dir, logger=CommentsLogger.get_logger(), user_api_cls=CDSMigrationUserAPI, ), diff --git a/scripts/dump_comments_to_migrate.py b/scripts/dump_comments_to_migrate.py index 92ca1dca..bebb4821 100644 --- a/scripts/dump_comments_to_migrate.py +++ b/scripts/dump_comments_to_migrate.py @@ -1,5 +1,6 @@ """ This script is used to get the comments from the legacy system and save them to a json file. +It also dumps the users metadata into valid_users.json and missing_users.json files. """ """ @@ -8,14 +9,23 @@ 3. 
Map legacy user id to cdsrdm user id 4. Create comments metadata 5. Save comments metadata to json file +6. Dump the users metadata into valid_users.json and missing_users.json files. """ import json +import os +from invenio.bibcirculation_cern_ldap import get_user_info_from_ldap from invenio.dbquery import run_sql from invenio.search_engine import search_pattern from invenio.webcomment_dblayer import get_comment_to_bibdoc_relations +ENV = "dev" +BASE_OUTPUT_DIR = f"/eos/media/cds/cds-rdm/{ENV}/migration/" +COMMENTS_METADATA_FILEPATH = os.path.join( + BASE_OUTPUT_DIR, "comments", "comments_metadata.json" +) + collection_queries = [ "980__a:CNLISSUE -980:DELETED -980:HIDDEN -980__a:DUMMY", "980__a:CNLARTICLE -980:DELETED -980:HIDDEN -980__a:DUMMY", @@ -165,12 +175,9 @@ def collect_all_replies(comment): } """ -users_metadata = {} +users_metadata = [] """ -{ - user_id: (user_id, user_email, user_nickname, user_note, user_last_login), - ..., -} +[(user_id, user_email, user_nickname, user_note, user_last_login), ...] """ for i, recid in enumerate(recids_with_comments): @@ -214,12 +221,14 @@ def collect_all_replies(comment): ) for comment in comments: - users_metadata[comment["id_user"]] = ( - comment["id_user"], - comment["email"], - comment["nickname"], - comment["note"], - comment["last_login"], + users_metadata.append( + ( + comment["id_user"], + comment["email"], + comment["nickname"], + comment["note"], + comment["last_login"], + ), ) # Flatten the reply comments @@ -255,14 +264,85 @@ def collect_all_replies(comment): comments_metadata[recid].append(comment_data) print("Successfully processed comment(s) for record<{}>!!!".format(recid)) -with open("comments_metadata.json", "w") as f: +with open(COMMENTS_METADATA_FILEPATH, "w") as f: json.dump(comments_metadata, f) """ This file will be read and run by the CommentsRunner to migrate the comments. 
""" -with open("users_metadata.json", "w") as f: - json.dump(users_metadata, f) """ -This file will be read and run by the CommenterRunner to pre-create the commenters accounts. +The following snippet is taken from the `dump_users.py` script in the `production_scripts` repository: +https://gitlab.cern.ch/cds-team/production_scripts/-/blob/master/cds-rdm/migration/dump_users.py?ref_type=heads + +It is used to dump the users metadata as valid_users.json and missing_users.json files. + +After running this script, place the "active_users.json" and "missing_users.json" files in the "cds_migrator_kit/rdm/data/users/" folder along with "people.csv" file. """ + +OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "users") +USERS_FILEPATH = os.path.join(OUTPUT_DIR, "active_users.json") +MISSING_USERS_FILEPATH = os.path.join(OUTPUT_DIR, "missing_users.json") + +if not os.path.exists(OUTPUT_DIR): + os.makedirs(OUTPUT_DIR) + + +def dump_users(): + """ + Dump the users metadata into valid_users.json and missing_users.json files. 
+ """ + + def get_uid_from_ldap_user(ldap_user): + try: + uidNumber = ldap_user["uidNumber"][0] + return uidNumber + except: + return None + + def get_department_from_ldap_user(ldap_user): + try: + department = ldap_user["department"][0] + return department + except: + return None + + def _dump(recs): + valid_users = [] + missing_users = [] + + for rec in recs: + # record example: ( + # 414320 + # joe.doe@mail.com + # ) + + email = rec[1] + ldap_user = get_user_info_from_ldap(email=email) + uidNumber = get_uid_from_ldap_user(ldap_user) + record = { + "id": rec[0], + "email": rec[1], + "displayname": rec[2], + "active": rec[3], + } + if uidNumber: + record["uid"] = uidNumber + department = get_department_from_ldap_user(ldap_user) + if department: + record["department"] = department + else: + print("No department for {}".format(email)) + valid_users.append(record) + else: + missing_users.append(record) + + return valid_users, missing_users + + valid_users, missing_users = _dump(users_metadata) + with open(USERS_FILEPATH, "w") as fp: + json.dump(valid_users, fp, indent=2) + + if missing_users: + print("Missing users found {0}".format(len(missing_users))) + with open(MISSING_USERS_FILEPATH, "w") as fp: + json.dump(missing_users, fp, indent=2) diff --git a/tests/cds-rdm/data/comments/comments_metadata.json b/tests/cds-rdm/data/comments/comments_metadata.json new file mode 100644 index 00000000..c055b654 --- /dev/null +++ b/tests/cds-rdm/data/comments/comments_metadata.json @@ -0,0 +1,23 @@ +{ + "12345": [ + { + "comment_id": 1, + "content": "This is a test comment", + "status": "ok", + "user_email": "submitter13@cern.ch", + "created_at": "2026-01-10 10:00:00", + "file_relation": {}, + "replies": [ + { + "comment_id": 2, + "content": "This is a reply", + "status": "ok", + "user_email": "submitter10@gmail.com", + "created_at": "2026-02-10 11:00:00", + "reply_to_id": 1, + "file_relation": {} + } + ] + } + ] +} \ No newline at end of file diff --git 
a/tests/cds-rdm/test_comments_migration.py b/tests/cds-rdm/test_comments_migration.py new file mode 100644 index 00000000..6e2c8155 --- /dev/null +++ b/tests/cds-rdm/test_comments_migration.py @@ -0,0 +1,227 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2026 CERN. +# +# CDS-RDM is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. + +"""Tests for comments migration workflow.""" + +import json +import os +import tempfile + +import pytest +from invenio_access.permissions import system_identity +from invenio_accounts.models import User +from invenio_db.uow import UnitOfWork +from invenio_rdm_records.proxies import current_rdm_records_service +from invenio_rdm_records.records.api import RDMParent +from invenio_requests.proxies import current_events_service, current_requests_service + +from cds_migrator_kit.rdm.comments.runner import CommenterRunner, CommentsRunner +from cds_migrator_kit.rdm.comments.streams import ( + CommenterStreamDefinition, + CommentsStreamDefinition, +) + +LEGACY_RECD_ID = "12345" + + +@pytest.fixture +def temp_dir(): + """Create a temporary directory for test files.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield tmpdir + + +@pytest.fixture +def migrated_record_with_comments(test_app, community, uploader, db, add_pid): + """Create a migrated RDM record that will have comments and a legacy PID.""" + minimal_record = { + "metadata": { + "title": "Test Record with Comments", + "publication_date": "2026-01-01", + "resource_type": {"id": "publication-article"}, + "creators": [ + { + "person_or_org": { + "type": "personal", + "name": "Test Author", + "given_name": "Test", + "family_name": "Author", + } + } + ], + }, + "access": { + "record": "public", + "files": "public", + }, + "files": { + "enabled": False, + }, + "media_files": { + "enabled": False, + }, + } + + # Create the draft + draft = current_rdm_records_service.create( + system_identity, + 
minimal_record, + ) + + # Publish the record + record = current_rdm_records_service.publish(system_identity, draft.id) + + # Add to community + parent = record._record.parent + parent.communities.add(community) + parent.communities.default = community + parent.commit() + + # Add 'lrecid' legacy PID to the published record + add_pid( + pid_type="lrecid", + pid_value=LEGACY_RECD_ID, + object_uuid=parent.pid.object_uuid, + ) + + current_rdm_records_service.record_cls.index.refresh() + + return record + + +def test_create_users_from_metadata( + temp_dir, + db, +): + """Test creating users from users_metadata.json.""" + # Run commenters runner + log_dir = os.path.join(temp_dir, "logs") + runner = CommenterRunner( + stream_definition=CommenterStreamDefinition, + filepath=os.path.join( + os.path.dirname(__file__), "data", "users", "missing_users.json" + ), + missing_users_dir=os.path.join(os.path.dirname(__file__), "data", "users"), + log_dir=log_dir, + dry_run=False, + ) + runner.run() + + # Verify users were created + user1 = User.query.filter_by(email="submitter13@cern.ch").one_or_none() + user2 = User.query.filter_by(email="submitter10@gmail.com").one_or_none() + + assert user1 is not None + assert user2 is not None + + +def test_create_users_dry_run( + temp_dir, + db, +): + """Test creating users in dry-run mode.""" + # Run commenters runner in dry-run mode + log_dir = os.path.join(temp_dir, "logs") + runner = CommenterRunner( + stream_definition=CommenterStreamDefinition, + filepath=os.path.join( + os.path.dirname(__file__), "data", "users", "missing_users.json" + ), + missing_users_dir=os.path.join(os.path.dirname(__file__), "data", "users"), + log_dir=log_dir, + dry_run=True, + ) + runner.run() + + # Verify users were NOT created in dry-run mode + user1 = User.query.filter_by(email="submitter13@cern.ch").one_or_none() + user2 = User.query.filter_by(email="submitter10@gmail.com").one_or_none() + + # In dry-run mode, users should not be created + # For now, we just 
verify the runner completes without errors + assert user1 is None + assert user2 is None + + +def test_migrate_comments_from_metadata( + temp_dir, + migrated_record_with_comments, + db, +): + """Test migrating comments from comments_metadata.json.""" + # Create users first (required for comments migration) + user1 = User(email="submitter13@cern.ch", active=True) + user2 = User(email="submitter10@gmail.com", active=True) + db.session.add(user1) + db.session.add(user2) + db.session.commit() + + # Create directory structure for attached files + comments_dir = os.path.join(os.path.dirname(__file__), "data", "comments") + os.makedirs(comments_dir, exist_ok=True) + + # Run comments runner + log_dir = os.path.join(temp_dir, "logs") + runner = CommentsRunner( + stream_definition=CommentsStreamDefinition, + filepath=os.path.join( + os.path.dirname(__file__), "data", "comments", "comments_metadata.json" + ), + dirpath=comments_dir, + log_dir=log_dir, + dry_run=False, + ) + runner.run() + + current_requests_service.record_cls.index.refresh() + # Verify request was created + request = current_requests_service.search( + identity=system_identity, + q=f"topic.record:{migrated_record_with_comments['id']}", + ) + assert request.total == 1 + request = next(request.hits) + + current_events_service.record_cls.index.refresh() + # Verify comments were created as request events + comments = current_events_service.search( + identity=system_identity, + request_id=request["id"], + ) + assert comments.total == 2 # 1 comment and 1 reply + + +def test_migrate_comments_dry_run( + temp_dir, + migrated_record_with_comments, +): + """Test migrating comments in dry-run mode.""" + # Create directory structure for attached files + comments_dir = os.path.join(os.path.dirname(__file__), "data", "comments") + os.makedirs(comments_dir, exist_ok=True) + + # Run comments runner in dry-run mode + log_dir = os.path.join(temp_dir, "logs") + runner = CommentsRunner( + 
stream_definition=CommentsStreamDefinition, + filepath=os.path.join( + os.path.dirname(__file__), "data", "comments", "comments_metadata.json" + ), + dirpath=comments_dir, + log_dir=log_dir, + dry_run=True, + ) + runner.run() + + # In dry-run mode, request and comments should not be created + # Verify the runner completes without errors + current_requests_service.record_cls.index.refresh() + request = current_requests_service.search( + identity=system_identity, + q=f"topic.record:{migrated_record_with_comments['id']}", + ) + assert request.total == 0